nsq源码解读之nsqlookupd

NSQ由3个进程组成:

  • nsqd: 接收,维护队列和分发消息给客户端的daemon进程
  • nsqlookupd: 管理拓扑信息并提供最终一致性的发现服务
  • nsqadmin: 用于实时监控集群运行并提供管理命令的管理网站平台。
    我们先从nsqlookupd开始。

1. 程序入口

nsqlookup的入口函数在apps/nsqlookupd/nsqlookupd.go这个文件中。

//apps/nsqlookupd/nsqlookupd.go
func main() {
    prg := &program{}
    if err := svc.Run(prg, syscall.SIGINT, syscall.SIGTERM); err != nil {
        log.Fatal(err)
    }
}

这里用到了github.com/judwhite/go-svc/svc管理进程。实际工作中调用的是Init,Start,Stop三个函数。

  • Init函数判断了当前的操作系统环境,如果是windwos系统的话,就会将修改工作目录??梢圆慰?code>https://github.com/judwhite/go-svc首页的例子。
  • Start函数实现了主体功能,接下来会具体分析。
  • Stop函数接受外界的signal,如果收到syscall.SIGINT和syscall.SIGTERM信号,就会被执行。

2. Stop函数

先易后难,先解读一下Stop函数。Stop函数调用Exit函数,关闭了tcp服务和http服务,然后等两个服务关闭之后,程序结束?!暗攘礁龇窆乇铡闭飧龆魃婕暗絞oroutine同步,nsq通过WaitGroup(参考Goroutine同步)实现。

//nsqlookupd/nsqlookupd.go
func (l *NSQLookupd) Exit() {
    if l.tcpListener != nil {
        l.tcpListener.Close()
    }

    if l.httpListener != nil {
        l.httpListener.Close()
    }
    l.waitGroup.Wait()
}

//internal/util/wait_group_wrapper.go
func (w *WaitGroupWrapper) Wrap(cb func()) {
    w.Add(1)
    go func() {
        cb()
        w.Done()
    }()
}

其中cb函数以tcp服务为例,当间接检测到tcp已经close时,退出for循环,cb执行结束,waitGroup计数器减一。
这里通过error的值判断tcpListener是否关闭的方式,值得关注一下。

//internal/protocol/tcp_server.go
func TCPServer(listener net.Listener, handler TCPHandler, l app.Logger) {
    l.Output(2, fmt.Sprintf("TCP: listening on %s", listener.Addr()))

    for {
        clientConn, err := listener.Accept()
        if err != nil {
            if nerr, ok := err.(net.Error); ok && nerr.Temporary() {
                l.Output(2, fmt.Sprintf("NOTICE: temporary Accept() failure - %s", err))
                runtime.Gosched()
                continue
            }
            // theres no direct way to detect this error because it is not exposed
            if !strings.Contains(err.Error(), "use of closed network connection") {
                l.Output(2, fmt.Sprintf("ERROR: listener.Accept() - %s", err))
            }
            break
        }
        go handler.Handle(clientConn)
    }

    l.Output(2, fmt.Sprintf("TCP: closing %s", listener.Addr()))
}

3. Start函数

Start函数实现了主要的功能。首先是读配置,然后初始化nsqlookupd,最后启动了tcp服务和http服务。
其中NSQLookupd.DB中维护了所有的消息的生产者信息。

3.1 tcp服务

tcp协议格式: 4字节的size,4字节的协议版本号(V1),之后的都是数据。

[x][x][x][x][x][x][x][x][x][x][x][x]...
|  (int32) ||  (int32) || (binary)
|  4-byte  ||  4-byte  || N-byte
------------------------------------...
    size      frame ID     data

tcp解包和处理的部分代码为nsqlookupd/tcp.go和nsqlookupd/lookup_protocol_v1.go。需要注意的是,producer与nsqlookupd维持了一个长连接。tcp头域的8个字节只有第一次连接时才会发送。
其中IOLoop中这几行代码,会持续的从tcp连接中读取数据包。

//nsqlookupd/lookup_protocol_v1.go
client := NewClientV1(conn)
reader := bufio.NewReader(client)
for {
    line, err = reader.ReadString('\n')
......

tcp服务支持4种操作PING,IDENTIFY,REGISTER,UNREGISTER。
PING用来维持连接,IDENTIFY用来nsqlookupd和producer之间交换身份信息和端口配置信息,REGISTER和UNREGISTER分别是注册和删除producer(通过NSQLookupd.DB)

3.2 http服务

http服务支持一系列接口
有两点比较有趣:

  1. nsq实现了一个装饰器decorator,是的,效果和python里的装饰器一样!使用如下:
//nsqlookupd/http.go
router.Handle("GET", "/ping", http_api.Decorate(s.pingHandler, log, http_api.PlainText))

Decorator实现方式如下:

//internal/http_api/api_response.go
type Decorator func(APIHandler) APIHandler
func Decorate(f APIHandler, ds ...Decorator) httprouter.Handle {
    decorated := f
    for _, decorate := range ds {
        decorated = decorate(decorated)
    }
    return func(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
        decorated(w, req, ps)
    }
}
  1. 有个接口叫"/topic/tombstone",tombstone是什么意思呢?字面上是墓碑的意思。在这里的意思,引用官网的一段话:

However, it gets a bit more complicated when a topic is no longer produced on a subset of nodes. Because of the way consumers query nsqlookupd and connect to all producers you enter into race conditions with attempting to remove the information from the cluster and consumers discovering that node and reconnecting (thus pushing updates that the topic is still produced on that node). The solution in these cases is to use “tombstones”. A tombstone in nsqlookupd context is producer specific and lasts for a configurable --tombstone-lifetime time. During that window the producer will not be listed in /lookup queries, allowing the node to delete the topic, propagate that information to nsqlookupd (which then removes the tombstoned producer), and prevent any consumer from re-discovering that node.

如果要下掉某个topic的部分节点,因为消费者会查询nsqlookup然后去连所有的生产者,会产生一个问题:一方面,nsqlookupd会去删除集群中相关的信息,另一方面在下掉这部分生产者之后,消费者不会立刻更新生产者的信息,还是会继续重新连接生产者,这会促使生产者继续生产。解决的办法就是使用"tombstones"。生产者会存在tombstone-lifetime的时间。在那个时间窗口里面,消费者去/lookup的时候,看不到这个生产者,允许这个生产者节点删除这个topic,同时将这个信息传给nsqlookupd,然后删除被tombstoned的节点,阻止消费者重连这个生产者节点。

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,172评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,346评论 3 389
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 159,788评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,299评论 1 288
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,409评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,467评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,476评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,262评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,699评论 1 307
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,994评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,167评论 1 343
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,827评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,499评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,149评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,387评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,028评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,055评论 2 352

推荐阅读更多精彩内容