使用Golang实现NSQ操作指南

发表时间: 2024-03-08 22:48

生产者

package mainimport (	"fmt"	"github.com/nsqio/go-nsq"	"io"	"net/http"	"time")// CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build main.govar producer *nsq.Producer// 初始化生产者func initProducer(str string) (err error) {	config := nsq.NewConfig()	producer, err = nsq.NewProducer(str, config)	if err != nil {		fmt.Printf("create producer failed, err:%v\n", err)		return err	}	return nil}func receiveMessage(w http.ResponseWriter, r *http.Request) {	body, err := io.ReadAll(r.Body)	if err != nil {		println(err)		return	}	fmt.Printf("topic %s => %s \n", "orders", string(body))	err = producer.Publish("orders", body)	if err != nil {		fmt.Printf("publish msg to nsq failed, err:%v\n", err)	}	_, err = w.Write([]byte("{\"code\":0,\"msg\":\"success\"}"))	if err != nil {		return	}}func main() {	nsqAddress := "127.0.0.1:4150"	err := initProducer(nsqAddress)	if err != nil {		fmt.Printf("init producer failed, err:%v\n", err)		return	}	server := &http.Server{		Addr:         "127.0.0.1:8080",		ReadTimeout:  2 * time.Second,		WriteTimeout: 2 * time.Second,	}	mux := http.NewServeMux()	mux.HandleFunc("/messagePush", receiveMessage)	server.Handler = mux	err = server.ListenAndServe()	if err != nil {		fmt.Printf("start , err:%v\n", err)		return	}}

消费者

// nsq_consumer/main.gopackage mainimport (	"fmt"	"os"	"os/signal"	"syscall"	"time"	"github.com/nsqio/go-nsq")// NSQ Consumer Demo// MyHandler 是一个消费者类型type MyHandler struct {	Title string}// HandleMessage 是需要实现的处理消息的方法func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {	fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))	return}// 初始化消费者func initConsumer(topic string, channel string, address string) (err error) {	config := nsq.NewConfig()	config.LookupdPollInterval = 15 * time.Second	c, err := nsq.NewConsumer(topic, channel, config)	if err != nil {		fmt.Printf("create consumer failed, err:%v\n", err)		return	}	consumer := &MyHandler{		Title: "consumer",	}	c.AddHandler(consumer)	// if err := c.ConnectToNSQD(address); err != nil { // 直接连NSQD	if err := c.ConnectToNSQLookupd(address); err != nil { // 通过lookupd查询		return err	}	return nil}func main() {	err := initConsumer("order", "three", "127.0.0.1:4161")	if err != nil {		fmt.Printf("init consumer failed, err:%v\n", err)		return	}	c := make(chan os.Signal)        // 定义一个信号的通道	signal.Notify(c, syscall.SIGINT) // 转发键盘中断信号到c	<-c                              // 阻塞}