使用Golang实现NSQ操作指南
发表时间: 2024-03-08 22:48
package main

import (
	"fmt"
	"io"
	"net/http"
	"time"

	"github.com/nsqio/go-nsq"
)

// Cross-compile for Linux with:
//
//	CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build main.go

// ordersTopic is the NSQ topic every pushed message is published to.
const ordersTopic = "orders"

// producer is the shared NSQ producer used by the HTTP handler. It is
// assigned by initProducer, which main calls before the server starts.
var producer *nsq.Producer

// initProducer creates the package-level NSQ producer.
//
// str is the address passed to nsq.NewProducer (main passes the nsqd
// address "127.0.0.1:4150"). The returned error wraps the failure reported
// by nsq.NewProducer; it is left to the caller to log it.
func initProducer(str string) (err error) {
	config := nsq.NewConfig()
	producer, err = nsq.NewProducer(str, config)
	if err != nil {
		return fmt.Errorf("create producer for %q: %w", str, err)
	}
	return nil
}

// writeJSON sends a pre-encoded JSON payload with the given HTTP status.
func writeJSON(w http.ResponseWriter, status int, payload string) {
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	if _, err := w.Write([]byte(payload)); err != nil {
		// The client has most likely gone away; nothing more can be sent.
		fmt.Printf("write response failed, err:%v\n", err)
	}
}

// receiveMessage reads the request body and publishes it unchanged to the
// "orders" topic.
//
// Responses:
//   - 200 {"code":0,"msg":"success"} once the message has been published.
//   - 400 when the request body cannot be read.
//   - 500 when publishing to NSQ fails, so callers never see a success
//     reply for a message that was not queued.
func receiveMessage(w http.ResponseWriter, r *http.Request) {
	body, err := io.ReadAll(r.Body)
	if err != nil {
		fmt.Printf("read request body failed, err:%v\n", err)
		writeJSON(w, http.StatusBadRequest, "{\"code\":1,\"msg\":\"read body failed\"}")
		return
	}
	fmt.Printf("topic %s => %s \n", ordersTopic, string(body))

	if err = producer.Publish(ordersTopic, body); err != nil {
		fmt.Printf("publish msg to nsq failed, err:%v\n", err)
		writeJSON(w, http.StatusInternalServerError, "{\"code\":1,\"msg\":\"publish failed\"}")
		return
	}
	writeJSON(w, http.StatusOK, "{\"code\":0,\"msg\":\"success\"}")
}

// main creates the NSQ producer and serves POSTed messages on
// 127.0.0.1:8080/messagePush until the HTTP server stops.
func main() {
	nsqAddress := "127.0.0.1:4150"
	if err := initProducer(nsqAddress); err != nil {
		fmt.Printf("init producer failed, err:%v\n", err)
		return
	}
	// Stop the producer when main returns.
	defer producer.Stop()

	mux := http.NewServeMux()
	mux.HandleFunc("/messagePush", receiveMessage)

	server := &http.Server{
		Addr:         "127.0.0.1:8080",
		Handler:      mux,
		ReadTimeout:  2 * time.Second,
		WriteTimeout: 2 * time.Second,
	}

	// ListenAndServe blocks and always returns a non-nil error when it stops.
	if err := server.ListenAndServe(); err != nil {
		fmt.Printf("start , err:%v\n", err)
	}
}
// nsq_consumer/main.gopackage mainimport ( "fmt" "os" "os/signal" "syscall" "time" "github.com/nsqio/go-nsq")// NSQ Consumer Demo// MyHandler 是一个消费者类型type MyHandler struct { Title string}// HandleMessage 是需要实现的处理消息的方法func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) { fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body)) return}// 初始化消费者func initConsumer(topic string, channel string, address string) (err error) { config := nsq.NewConfig() config.LookupdPollInterval = 15 * time.Second c, err := nsq.NewConsumer(topic, channel, config) if err != nil { fmt.Printf("create consumer failed, err:%v\n", err) return } consumer := &MyHandler{ Title: "consumer", } c.AddHandler(consumer) // if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询 return err } return nil}func main() { err := initConsumer("order", "three", "127.0.0.1:4161") if err != nil { fmt.Printf("init consumer failed, err:%v\n", err) return } c := make(chan os.Signal) // 定义一个信号的通道 signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c <-c // 阻塞}