在golang中操作nsq

NSQ介绍
NSQ是Go语言编写的一个开源的实时分布式内存消息队列,其性能十分优异。 NSQ的优势有以下优势:
- NSQ提倡分布式和分散的拓扑,没有单点故障,支持容错和高可用性,并提供可靠的消息交付保证
- NSQ支持横向扩展,没有任何集中式代理。
- NSQ易于配置和部署,并且内置了管理界面。
NSQ的应用场景
通常来说,消息队列都适用以下场景。
NSQ的几个组件
- nsqd:一个负责接收、排队、转发消息到客户端的守护进程
- nsqlookupd:管理拓扑信息, 用于收集nsqd上报的topic和channel,并提供最终一致性的发现服务的守护进程
- nsqadmin:一套Web用户界面,可实时查看集群的统计数据和执行相应的管理任务
- utilities:基础功能、数据流处理工具,如nsq_stat、nsq_tail、nsq_to_file、nsq_to_http、nsq_to_nsq、to_nsq
好了,以上内容皆为copy,这些东西大家自己去看官网百科就行了。
正题

Producer
package main
import (
"github.com/nsqio/go-nsq"
"log"
"os"
"os/signal"
"syscall"
"time"
)
const nsqAddress = "127.0.0.1:4150"
func main(){
config:=nsq.NewConfig()
producer,err:=nsq.NewProducer(nsqAddress,config)
if err!=nil {
log.Printf(err.Error())
}
err=producer.Publish("Mytopic",[]byte("Hello I'm the first topic!"))
if err!=nil {
log.Println(err.Error())
}
time.Sleep(3*time.Second)
}
comsumer
package main
import (
"fmt"
"github.com/nsqio/go-nsq"
"log"
"os"
"os/signal"
"syscall"
"time"
)
const nsqAddress = "127.0.0.1:4150"
type MyHandler struct {
Title string
}
// HandleMessage 是需要实现的处理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
return
}
func main(){
config:=nsq.NewConfig()
config.LookupdPollInterval = 15 * time.Second
c,err:=nsq.NewConsumer("Mytopic","topic",config)
if err != nil {
log.Println(err.Error())
}
coms:=&MyHandler{Title: "Msfcode"}
c.AddHandler(coms)
err=c.ConnectToNSQD(nsqAddress)
if err!=nil {
log.Println(err.Error())
}
s := make(chan os.Signal) // 定义一个信号的通道
signal.Notify(s, syscall.SIGINT) // 转发键盘中断信号到c
<-s // 阻塞
}
坑
nsqd启动的时候参数是 --nsqlookupd-tcp-address=xxxx http不行 启动nsqadmin使用的是http那个 nsqadmin --nsqlookupd-http-address=xxxxxxx 创建nsq消费者的时候需要直接实现处理消息的方法类似:
type MyHandler struct {
Title string
}
// HandleMessage 是需要实现的处理消息的方法
func (m *MyHandler) HandleMessage(msg *nsq.Message) (err error) {
fmt.Printf("%s recv from %v, msg:%v\n", m.Title, msg.NSQDAddress, string(msg.Body))
return
}

在消费者代码中,必须设置消费者的Handler通过AddHandler设置handler,然后通过nsqlookupd查找nsqd进行消费,这里也可以直连nsqd进行消费,不过直连消费只可以接收到连接之后生产者才生产的信息

消费者代码中nsqlookupd地址最好使用4161那个http地址,4160会爆提示http1.x什么来着
Reference


本文系作者 @孤独常伴 原创发布在 L0ne1y。未经许可,禁止转载。