Kafka系列1----Rebalance过程(1)

1.Coordinator介绍

Coordinator简单的总结一下就是负责协调组内partition分配,以及Group的管理,每个Broker上都有一个GroupCoordinator的实例
负载均衡的过程涉及以下的几个概念

  • group member:一个消费组类的成员
  • group leader:一个消费组的leader,负责分配partition
  • coodinator:协调者
    涉及以下几个请求:
  • GroupCoordinatorRequest(GCR)
  • JoinGroupRequest(JGR)
  • SyncGroupRequest(SGR)

2.主要流程

主要的流程如下:

  1. 发送GCR请求寻找Coordinator:这个过程主要会向集群中负载最小的broker发起请求,等待成功返回后,那么该Broker将作为Coordinator,尝试连接该Coordinator
  2. 发送JGR请求加入该组:当成功找到Coordinator后,那么就要发起加入group的请求,表示该consumer是该组的成员,Coordinator会接收到该请求,会给集群分配一个Leader(通常是第一个),让其负责partition的分配
  3. 发送SGR请求:JGR请求成功后,如果发现当前Consumer是leader,那么会进行partition的分配,并发起SGR请求将分配结果发送给Coordinator;如果不是leader,那么也会发起SGR请求,不过分配结果为空

流程图如下


Coordinator.png

3.具体实现

具体的过程是在ConsumerCoordinator.poll方法里实现的

    public void poll(long now) {
        invokeCompletedOffsetCommitCallbacks();
        //手动指定分区的模式不会有rebalance过程
        if (subscriptions.partitionsAutoAssigned() && coordinatorUnknown()) {
            ensureCoordinatorReady();//GCR
            now = time.milliseconds();
        }

        if (needRejoin()) {
            if (subscriptions.hasPatternSubscription())
                client.ensureFreshMetadata();

            ensureActiveGroup();// JGR和SCR
            now = time.milliseconds();
        }

        pollHeartbeat(now);
        maybeAutoCommitOffsetsAsync(now);
    }

先看下ensureCoordinatorReady方法

    protected synchronized boolean ensureCoordinatorReady(long startTimeMs, long timeoutMs) {
        long remainingMs = timeoutMs;

        while (coordinatorUnknown()) {//没找到coordinator 或者 coordinator 已经挂了
            //具体获取Coordinator的过程
            RequestFuture<Void> future = lookupCoordinator();
            client.poll(future, remainingMs);

            if (future.failed()) {
                if (future.isRetriable()) {
                    remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
                    if (remainingMs <= 0)
                        break;
                    client.awaitMetadataUpdate(remainingMs);
                } else
                    throw future.exception();
            } else if (coordinator != null && client.connectionFailed(coordinator)) {
                // we found the coordinator, but the connection has failed, so mark
                // it dead and backoff before retrying discovery
                coordinatorDead();
                time.sleep(retryBackoffMs);
            }

            remainingMs = timeoutMs - (time.milliseconds() - startTimeMs);
            if (remainingMs <= 0)
                break;
        }
        return !coordinatorUnknown();
    }

再看下lookupCoordinator方法

    protected synchronized RequestFuture<Void> lookupCoordinator() {
        if (findCoordinatorFuture == null) {
            // 从集群中找出一个负载最小的broker节点
            Node node = this.client.leastLoadedNode();
            if (node == null) {
                return RequestFuture.noBrokersAvailable();
            } else
                findCoordinatorFuture = sendGroupCoordinatorRequest(node);
        }
        return findCoordinatorFuture;
    }
    private RequestFuture<Void> sendGroupCoordinatorRequest(Node node) {
        GroupCoordinatorRequest.Builder requestBuilder =
                new GroupCoordinatorRequest.Builder(this.groupId);
        return client.send(node, requestBuilder)
                     .compose(new GroupCoordinatorResponseHandler());
    }

上面发送了GCR请求,请求参数就一个groupId,并且设置回调GroupCoordinatorResponseHandler,成功后会调用GroupCoordinatorResponseHandler的onSuccess方法

    private class GroupCoordinatorResponseHandler extends RequestFutureAdapter<ClientResponse, Void> {
        @Override
        public void onSuccess(ClientResponse resp, RequestFuture<Void> future) {
            log.debug("Received GroupCoordinator response {} for group {}", resp, groupId);

            GroupCoordinatorResponse groupCoordinatorResponse = (GroupCoordinatorResponse) resp.responseBody();
            Errors error = Errors.forCode(groupCoordinatorResponse.errorCode());
            clearFindCoordinatorFuture();
            if (error == Errors.NONE) {
                synchronized (AbstractCoordinator.this) {
                    //成功后会返回broker节点的信息,那么consumer就与coordinator 建立连接
                    AbstractCoordinator.this.coordinator = new Node(
                            Integer.MAX_VALUE - groupCoordinatorResponse.node().id(),
                            groupCoordinatorResponse.node().host(),
                            groupCoordinatorResponse.node().port());
                    client.tryConnect(coordinator);
                    heartbeat.resetTimeouts(time.milliseconds());
                }
                future.complete(null);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(error);
            }
        }
    }

分析完GCR请求之后,在看下发起JGR和CGR请求的ensureActiveGroup方法

    public void ensureActiveGroup() {
        //又调用了一次方法,判断是否已经找到Coordinator
        ensureCoordinatorReady();
        //开启心跳线程
        startHeartbeatThreadIfNeeded();
        //JGR和SGR请求的真正处理的地方
        joinGroupIfNeeded();
    }
    void joinGroupIfNeeded() {
        while (needRejoin() || rejoinIncomplete()) {
            ensureCoordinatorReady();

            // 提交offset、触发监听器、重置订阅关系
            if (needsJoinPrepare) {
                onJoinPrepare(generation.generationId, generation.memberId);
                needsJoinPrepare = false;
            }
            // JGR和SGR
            RequestFuture<ByteBuffer> future = initiateJoinGroup();
            client.poll(future);
            resetJoinGroupFuture();

            if (future.succeeded()) {
                needsJoinPrepare = true;
                // 成功之后执行的操作
                onJoinComplete(generation.generationId, generation.memberId, generation.protocol, future.value());
            } else {
                RuntimeException exception = future.exception();
                if (exception instanceof UnknownMemberIdException ||
                        exception instanceof RebalanceInProgressException ||
                        exception instanceof IllegalGenerationException)
                    continue;
                else if (!future.isRetriable())
                    throw exception;
                time.sleep(retryBackoffMs);
            }
        }
    }

needRejoin方法返回rejoinNeeded的值,表示是否需要重新发起JCR请求,这个后面会讲到
rejoinIncomplete=>joinFuture != null ,joinFuture是发起JCR请求后返回的futrue,在完成之后,会将其设置为null,joinFuture != null即表示在请处理当中,则执行循环(joinFuture!=null表示请求已经在执行了,但是为什么还需要重试?)

initiateJoinGroup方法如下

    private synchronized RequestFuture<ByteBuffer> initiateJoinGroup() {
        if (joinFuture == null) {
            disableHeartbeatThread();

            state = MemberState.REBALANCING;
            joinFuture = sendJoinGroupRequest();
            joinFuture.addListener(new RequestFutureListener<ByteBuffer>() {
                @Override
                public void onSuccess(ByteBuffer value) {
                    synchronized (AbstractCoordinator.this) {
                        state = MemberState.STABLE;//改变当前组的状态
                        if (heartbeatThread != null)
                            heartbeatThread.enable();
                    }
                }
            });
        }
        return joinFuture;
    }
    private RequestFuture<ByteBuffer> sendJoinGroupRequest() {
        if (coordinatorUnknown())
            return RequestFuture.coordinatorNotAvailable();
        JoinGroupRequest.Builder requestBuilder = new JoinGroupRequest.Builder(
                groupId,
                this.sessionTimeoutMs,
                this.generation.memberId,
                protocolType(),
                metadata()).setRebalanceTimeout(this.rebalanceTimeoutMs);
        return client.send(coordinator, requestBuilder)
                .compose(new JoinGroupResponseHandler());
    }
private class JoinGroupResponseHandler extends CoordinatorResponseHandler<JoinGroupResponse, ByteBuffer> {
        @Override
        public void handle(JoinGroupResponse joinResponse, RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(joinResponse.errorCode());
            if (error == Errors.NONE) {
                sensors.joinLatency.record(response.requestLatencyMs());

                synchronized (AbstractCoordinator.this) {
                    if (state != MemberState.REBALANCING) {
                        future.raise(new UnjoinedGroupException());
                    } else {
                        AbstractCoordinator.this.generation = new Generation(joinResponse.generationId(),
                                joinResponse.memberId(), joinResponse.groupProtocol());
                        AbstractCoordinator.this.rejoinNeeded = false;
                        if (joinResponse.isLeader()) {
                            onJoinLeader(joinResponse).chain(future);
                        } else {
                            onJoinFollower().chain(future);
                        }
                    }
                }
            } else if (error == Errors.GROUP_LOAD_IN_PROGRESS) {
                ....
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
               ....
            } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
               ....
            } else if (error == Errors.INCONSISTENT_GROUP_PROTOCOL || error == Errors.INVALID_SESSION_TIMEOUT  || error == Errors.INVALID_GROUP_ID) {
                ....
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                ....
            } else {
                ....
            }
        }
    }

看到进来的时候,会判断joinFuture == null,这个为了防止在请求还没处理完的时候,又重复执行了加入组的操作,那么外面joinFuture != null会继续执行下面的代码,应该只有一种可能,就是请求失败了,重新执行client.poll(future);这个操作获取结果
sendJoinGroupRequest才是发送请求的地方,同GCR,看下handler的回调方法,response会返回是否leader的标志,按照一开始说的,leader需要通过SGR请求把分配结果发送给Coodinator,而follower不需要该参数
那么onJoinLeader和onJoinFollower方法就很好猜了,发送SGR请求,直接看对应的handler。不过onJoinLeader里会分配好partition

    private class SyncGroupResponseHandler extends CoordinatorResponseHandler<SyncGroupResponse, ByteBuffer> {
        @Override
        public void handle(SyncGroupResponse syncResponse,
                           RequestFuture<ByteBuffer> future) {
            Errors error = Errors.forCode(syncResponse.errorCode());
            if (error == Errors.NONE) {
                sensors.syncLatency.record(response.requestLatencyMs());
                future.complete(syncResponse.memberAssignment());//这是分配的结果,将其设置到future中,在onJoinComplete中使用
            } else {
                requestRejoin();
                if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                    ...
                } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                    ...
                } else if (error == Errors.UNKNOWN_MEMBER_ID  || error == Errors.ILLEGAL_GENERATION) {
                    ....
                } else if (error == Errors.GROUP_COORDINATOR_NOT_AVAILABLE || error == Errors.NOT_COORDINATOR_FOR_GROUP) {
                    ....
                } else {
                    ....
                }
            }
        }
    }

返回没什么说的,看下失败的情况,requestRejoin这个方法,会把rejoinNeeded设置为true,那么在外面循环的条件就是rejoinNeeded为true,就会再次执行
这里有种情况就是一个leader和一个follower,follower先发起了请求但是leader还没有将分配结果发送出去,那么follower这时是取不到分配结果的,那么会请求失败,设置为true之后,重新执行这个过程

SGR和JGR请求成功之后,会执行onJoinComplete方法

    protected void onJoinComplete(int generation,String memberId,String assignmentStrategy,ByteBuffer assignmentBuffer) {
        // 只有leader才负责数据的变化
        if (!isLeader)
            assignmentSnapshot = null;
        
        PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
        if (assignor == null)
            throw new IllegalStateException("Coordinator selected invalid assignment protocol: " + assignmentStrategy);
        //这个assignmentBuffer就是SyncGroupResponseHandler往future设置的值
        Assignment assignment = ConsumerProtocol.deserializeAssignment(assignmentBuffer);

        // set the flag to refresh last committed offsets
        subscriptions.needRefreshCommits();

        // 更新leader分配的partition
        subscriptions.assignFromSubscribed(assignment.partitions());

        // check if the assignment contains some topics that were not in the original
        // subscription, if yes we will obey what leader has decided and add these topics
        // into the subscriptions as long as they still match the subscribed pattern
        //
        // TODO this part of the logic should be removed once we allow regex on leader assign
        Set<String> addedTopics = new HashSet<>();
        for (TopicPartition tp : subscriptions.assignedPartitions()) {
            if (!joinedSubscription.contains(tp.topic()))
                addedTopics.add(tp.topic());
        }

        if (!addedTopics.isEmpty()) {
            Set<String> newSubscription = new HashSet<>(subscriptions.subscription());
            Set<String> newJoinedSubscription = new HashSet<>(joinedSubscription);
            newSubscription.addAll(addedTopics);
            newJoinedSubscription.addAll(addedTopics);

            this.subscriptions.subscribeFromPattern(newSubscription);
            this.joinedSubscription = newJoinedSubscription;
        }

        // update the metadata and enforce a refresh to make sure the fetcher can start
        // fetching data in the next iteration
        this.metadata.setTopics(subscriptions.groupSubscription());
        client.ensureFreshMetadata();

        // give the assignor a chance to update internal state based on the received assignment
        assignor.onAssignment(assignment);

        // reschedule the auto commit starting from now
        this.nextAutoCommitDeadline = time.milliseconds() + autoCommitIntervalMs;

        // execute the user's callback after rebalance
        ConsumerRebalanceListener listener = subscriptions.listener();
        log.info("Setting newly assigned partitions {} for group {}", subscriptions.assignedPartitions(), groupId);
        try {
            Set<TopicPartition> assigned = new HashSet<>(subscriptions.assignedPartitions());
            listener.onPartitionsAssigned(assigned);//执行监听
        } catch (WakeupException | InterruptException e) {
            throw e;
        } catch (Exception e) {
            log.error("User provided listener {} for group {} failed on partition assignment",
                    listener.getClass().getName(), groupId, e);
        }
    }
最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,128评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,316评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,737评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,283评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,384评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,458评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,467评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,251评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,688评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,980评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,155评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,818评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,492评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,142评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,382评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,020评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,044评论 2 352

推荐阅读更多精彩内容