NSQ由3个进程组成:
- nsqd: 接收,维护队列和分发消息给客户端的daemon进程
- nsqlookupd: 管理拓扑信息并提供最终一致性的发现服务
- nsqadmin: 用于实时监控集群运行并提供管理命令的管理网站平台。
我们先从nsqlookupd开始。
1. 程序入口
nsqlookup的入口函数在apps/nsqlookupd/nsqlookupd.go这个文件中。
//apps/nsqlookupd/nsqlookupd.go
func main() {
prg := &program{}
if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
log.Fatal(err)
}
}
这里用到了github.com/judwhite/go-svc/svc
管理进程。实际工作中调用的是Init,Start,Stop三个函数。
- Init函数判断了当前的操作系统环境,如果是windwos系统的话,就会将修改工作目录??梢圆慰?code>https://github.com/judwhite/go-svc首页的例子。
- Start函数实现了主体功能,接下来会具体分析。
- Stop函数接受外界的signal,如果收到syscall.SIGINT和syscall.SIGTERM信号,就会被执行。
2. Stop函数
先易后难,先解读一下Stop函数。Stop函数调用Exit函数,关闭了tcp服务和http服务,然后等两个服务关闭之后,程序结束?!暗攘礁龇窆乇铡闭飧龆魃婕暗絞oroutine同步,nsq通过WaitGroup(参考Goroutine同步)实现。
//nsqlookupd/nsqlookupd.go
func (l *NSQLookupd) Exit() {
if l.tcpListener != nil {
l.tcpListener.Close()
}
if l.httpListener != nil {
l.httpListener.Close()
}
l.waitGroup.Wait()
}
//internal/util/wait_group_wrapper.go
func (w *WaitGroupWrapper) Wrap(cb func()) {
w.Add(1)
go func() {
cb()
w.Done()
}()
}
其中cb函数以tcp服务为例,当间接检测到tcp已经close时,退出for循环,cb执行结束,waitGroup计数器减一。
这里通过error的值判断tcpListener是否关闭的方式,值得关注一下。
//internal/protocol/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))
for {
clientConn, err := listener.Accept()
if err != nil {
if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
runtime.Gosched()
continue
}
// theres no direct way to detect this error because it is not exposed
if !strings.Contains(err.Error(), "use of closed network connection") {
l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
}
break
}
go handler.Handle(clientConn)
}
l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}
3. Start函数
Start函数实现了主要的功能。首先是读配置,然后初始化nsqlookupd,最后启动了tcp服务和http服务。
其中NSQLookupd.DB中维护了所有的消息的生产者信息。
3.1 tcp服务
tcp协议格式: 4字节的size,4字节的协议版本号(V1),之后的都是数据。
[x][x][x][x][x][x][x][x][x][x][x][x]...
| (int32) || (int32) || (binary)
| 4-byte || 4-byte || N-byte
------------------------------------...
size frame ID data
tcp解包和处理的部分代码为nsqlookupd/tcp.go和nsqlookupd/lookup_protocol_v1.go。需要注意的是,producer与nsqlookupd维持了一个长连接。tcp头域的8个字节只有第一次连接时才会发送。
其中IOLoop中这几行代码,会持续的从tcp连接中读取数据包。
//nsqlookupd/lookup_protocol_v1.go
client := NewClientV1(conn)
reader := bufio.NewReader(client)
for {
line, err = reader.ReadString('\n')
......
tcp服务支持4种操作PING,IDENTIFY,REGISTER,UNREGISTER。
PING用来维持连接,IDENTIFY用来nsqlookupd和producer之间交换身份信息和端口配置信息,REGISTER和UNREGISTER分别是注册和删除producer(通过NSQLookupd.DB)
3.2 http服务
http服务支持一系列接口。
有两点比较有趣:
- nsq实现了一个装饰器decorator,是的,效果和python里的装饰器一样!使用如下:
//nsqlookupd/http.go
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))
Decorator实现方式如下:
//internal/http_api/api_response.go
type Decorator func(APIHandler) APIHandler
func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
decorated := f
for _, decorate := range ds {
decorated = decorate(decorated)
}
return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
decorated(w, req, ps)
}
}
- 有个接口叫"/topic/tombstone",tombstone是什么意思呢?字面上是墓碑的意思。在这里的意思,引用官网的一段话:
However, it gets a bit more complicated when a topic is no longer produced on a subset of nodes. Because of the way consumers query nsqlookupd and connect to all producers you enter into race conditions with attempting to remove the information from the cluster and consumers discovering that node and reconnecting (thus pushing updates that the topic is still produced on that node). The solution in these cases is to use “tombstones”. A tombstone in nsqlookupd context is producer specific and lasts for a configurable --tombstone-lifetime time. During that window the producer will not be listed in /lookup queries, allowing the node to delete the topic, propagate that information to nsqlookupd (which then removes the tombstoned producer), and prevent any consumer from re-discovering that node.
如果要下掉某个topic的部分节点,因为消费者会查询nsqlookup然后去连所有的生产者,会产生一个问题:一方面,nsqlookupd会去删除集群中相关的信息,另一方面在下掉这部分生产者之后,消费者不会立刻更新生产者的信息,还是会继续重新连接生产者,这会促使生产者继续生产。解决的办法就是使用"tombstones"。生产者会存在tombstone-lifetime的时间。在那个时间窗口里面,消费者去/lookup的时候,看不到这个生产者,允许这个生产者节点删除这个topic,同时将这个信息传给nsqlookupd,然后删除被tombstoned的节点,阻止消费者重连这个生产者节点。