美团Leaf 源码阅读(一)

美团Leaf 分布式ID生成器源码分析(一)

There are no two identical leaves in the world.
世界上没有两片完全相同的树叶。
— 莱布尼茨

Leaf 最早期需求是各个业务线的订单ID生成需求。在美团早期,有的业务直接通过DB自增的方式生成ID,有的业务通过redis缓存来生成ID,也有的业务直接用UUID这种方式来生成ID。以上的方式各自有各自的问题,因此我们决定实现一套分布式ID生成服务来满足需求。具体Leaf 设计文档见: leaf 美团分布式ID生成服务

官方代码仓库: Leaf

工程目录结构

项目分为两个模块: leaf-serverleaf-core,下面分开进行介绍

leaf-server

leaf-server 主要作用是使用spring-boot框架对外提供服务接口.

leaf-server结构

leaf-core

leaf-core 是核心代码,提供两种生成的ID的方式,包括号段模式和snowflake模式.

leaf-core

源码分析

相关分析代码已上传到github: 美团 LEAF

核心代码都在leaf-core中.

SegmentIDGenImpl分析

  1. 查看IDGenServiceTest
    IDGenServiceTest
  2. Config ID Gen
  3. 执行init方法
    • 执行init方法,从数据库中获取所有的tag,并保留在内存中.
    • 定时从数据库中获取最新数据
  4. 获取Id
        @Override
    public Result get(final String key) {
        // 必须在 SegmentIDGenImpl 初始化后执行. init()方法
        if (!initOK) {
            return new Result(EXCEPTION_ID_IDCACHE_INIT_FALSE, Status.EXCEPTION);
        }
        // 通过缓存获取SegmentBuffer
        if (cache.containsKey(key)) {
    
            // 从缓存中获取对应key的 SegmentBuffer
            SegmentBuffer buffer = cache.get(key);
    
            // SegmentBuffer 没有初始化,则先进行初始化.
            if (!buffer.isInitOk()) {
                synchronized (buffer) {
                    // 双重判断,避免重复执行SegmentBuffer的初始化操作.
                    if (!buffer.isInitOk()) {
                        try {
                            updateSegmentFromDb(key, buffer.getCurrent());
                            logger.info("Init buffer. Update leafkey {} {} from db", key, buffer.getCurrent());
                            buffer.setInitOk(true);
                        } catch (Exception e) {
                            logger.warn("Init buffer {} exception", buffer.getCurrent(), e);
                        }
                    }
                }
            }
            return getIdFromSegmentBuffer(cache.get(key));
        }
        return new Result(EXCEPTION_ID_KEY_NOT_EXISTS, Status.EXCEPTION);
    }
    
  5. updateSegmentFromDb
     public void updateSegmentFromDb(String key, Segment segment) {
        StopWatch sw = new Slf4JStopWatch();
        SegmentBuffer buffer = segment.getBuffer();
    
        LeafAlloc leafAlloc;
    
        if (!buffer.isInitOk()) {
            // 第一次初始化
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setStep(leafAlloc.getStep());
    
            //leafAlloc中的step为DB中的step
            buffer.setMinStep(leafAlloc.getStep());
        } else if (buffer.getUpdateTimestamp() == 0) {
            // 第二次,需要准备next Segment
            // 第二号段,设置updateTimestamp
            leafAlloc = dao.updateMaxIdAndGetLeafAlloc(key);
            buffer.setUpdateTimestamp(System.currentTimeMillis());
    
            //leafAlloc中的step为DB中的step
            buffer.setMinStep(leafAlloc.getStep());
        } else {
            // 三次以上 动态设置 nextStep
            long duration = System.currentTimeMillis() - buffer.getUpdateTimestamp();
            int nextStep = buffer.getStep();
    
            /**
             *  动态调整step
             *  1) duration < 15 分钟 : step 变为原来的2倍. 最大为 MAX_STEP
             *  2) 15分钟 < duration < 30分钟 : nothing
             *  3) duration > 30 分钟 : 缩小step ,最小为DB中配置的步数
             */
            // 15分钟
            if (duration < SEGMENT_DURATION) {
                if (nextStep * 2 > MAX_STEP) {
                    //do nothing
                } else {
                    // 步数 * 2
                    nextStep = nextStep * 2;
                }
                // 15分 < duration < 30
            } else if (duration < SEGMENT_DURATION * 2) {
                //do nothing with nextStep
            } else {
                // duration > 30 步数缩小一半,但是大于最小步数(数据库中配置的步数)
                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }
    
            logger.info("leafKey[{}], step[{}], duration[{}mins], nextStep[{}]", key, buffer.getStep(), String.format("%.2f",((double)duration / (1000 * 60))), nextStep);
            LeafAlloc temp = new LeafAlloc();
    
            temp.setKey(key);
            temp.setStep(nextStep);
            // 更新maxId by CustomStep (nextStep)
            leafAlloc = dao.updateMaxIdByCustomStepAndGetLeafAlloc(temp);
    
            // 更新 updateTimestamp
            buffer.setUpdateTimestamp(System.currentTimeMillis());
            // 设置 buffer的step
            buffer.setStep(nextStep);
            //leafAlloc的step为DB中的step
            buffer.setMinStep(leafAlloc.getStep());
        }
    
        // must set value before set max TODO
        // 暂时还未想通,这里为什么这样写.
        // 已经向作者提交了issue.(https://github.com/Meituan-Dianping/Leaf/issues/16)
        long value = leafAlloc.getMaxId() - buffer.getStep();
        segment.getValue().set(value);
    
        segment.setMax(leafAlloc.getMaxId());
        segment.setStep(buffer.getStep());
        sw.stop("updateSegmentFromDb", key + " " + segment);
    }
    
  6. getIdFromSegmentBuffer
     public Result getIdFromSegmentBuffer(final SegmentBuffer buffer) {
        while (true) {
            try {
                // 获取buffer的读锁
                buffer.rLock().lock();
                // 获取当前的号段
                final Segment segment = buffer.getCurrent();
    
                if (    // nextReady is false (下一个号段没有初始化.)
                        !buffer.isNextReady()
                        // idle = max - currentValue (当前号段下发的值到达设置的阈值 0.9 )
                        && (segment.getIdle() < 0.9 * segment.getStep())
                        // buffer 中的 threadRunning字段. 代表是否已经提交线程池运行.(是否有其他线程已经开始进行另外号段的初始化工作.
                        // 使用 CAS 进行更新. buffer 在任意时刻,只会有一个线程进行异步更新另外一个号段.
                        && buffer.getThreadRunning().compareAndSet(false, true)
                ) {
                    // 放入线程池进行异步更新.
                    service.execute(new Runnable() {
                        @Override
                        public void run() {
                            Segment next = buffer.getSegments()[buffer.nextPos()];
                            boolean updateOk = false;
                            try {
                                updateSegmentFromDb(buffer.getKey(), next);
    
                                // 更新成功,设置标记位为true
                                updateOk = true;
                                logger.info("update segment {} from db {}", buffer.getKey(), next);
                            } catch (Exception e) {
                                logger.warn(buffer.getKey() + " updateSegmentFromDb exception", e);
                            } finally {
                                if (updateOk) {
                                    // 获取buffer 的写锁
                                    buffer.wLock().lock();
                                    // next准备完成
                                    buffer.setNextReady(true);
                                    // next运行标记位设置为false
                                    buffer.getThreadRunning().set(false);
                                    buffer.wLock().unlock();
                                } else {
                                    buffer.getThreadRunning().set(false);
                                }
                            }
                        }
                    });
                }
    
                // 获取value
                long value = segment.getValue().getAndIncrement();
    
                // value < 当前号段的最大值,则返回改值
                if (value < segment.getMax()) {
                    return new Result(value, Status.SUCCESS);
                }
            } finally {
                buffer.rLock().unlock();
            }
    
            // 等待下一个号段执行完成,执行代码在-> execute()
            // buffer.setNextReady(true);
            // buffer.getThreadRunning().set(false);
            waitAndSleep(buffer);
    
    
            try {
                // buffer 级别加写锁.
                buffer.wLock().lock();
                final Segment segment = buffer.getCurrent();
                // 获取value -> 为什么重复获取value, 多线程执行时,在进行waitAndSleep() 后,
                // current segment可能会被修改. 直接进行一次判断,提高速度,并且防止出错(在交换Segment前进行一次检查).
                long value = segment.getValue().getAndIncrement();
                if (value < segment.getMax()) {
                    return new Result(value, Status.SUCCESS);
                }
    
                // 执行到这里, 其他的线程没有进行号段的调换,并且当前号段所有号码已经下发完成.
                // 判断nextReady是否为true.
                if (buffer.isNextReady()) {
                    // 调换segment
                    buffer.switchPos();
                    // 调换完成后, 设置nextReady为false
                    buffer.setNextReady(false);
                } else {
                    // 进入这里的条件
                    // 1. 当前号段获取到的值大于maxValue
                    // 2. 另外一个号段还没有准备好
                    // 3. 等待时长大于waitAndSleep中的时间.
                    logger.error("Both two segments in {} are not ready!", buffer);
                    return new Result(EXCEPTION_ID_TWO_SEGMENTS_ARE_NULL, Status.EXCEPTION);
                }
            } finally {
                // finally代码块中释放写锁.
                buffer.wLock().unlock();
            }
        }
    }
    
  7. waitAndSleep
        /**
     * 等待下一个号段执行完成
     * buffer.setNextReady(true);
     * buffer.getThreadRunning().set(false);
     * @param buffer
     */
    private void waitAndSleep(SegmentBuffer buffer) {
        int roll = 0;
        while (buffer.getThreadRunning().get()) {
            roll += 1;
            if(roll > 10000) {
                try {
                    Thread.sleep(10);
                    break;
                } catch (InterruptedException e) {
                    logger.warn("Thread {} Interrupted",Thread.currentThread().getName());
                    break;
                }
            }
        }
    }
    

相关代码已上传到github: 美团 LEAF

技术重点解析

  1. volatile 修饰变量提升可见性
  2. 使用读写锁ReadWriteLock,提升并发读下的读取速度
  3. 使用Atomic变量 ,利用CAS机制保证原子性, 提高并发能力.
    if (    // nextReady is false (下一个号段没有初始化.)
                        !buffer.isNextReady()
                        // idle = max - currentValue (当前号段下发的值到达设置的阈值 0.9 )
                        && (segment.getIdle() < 0.9 * segment.getStep())
                        // buffer 中的 threadRunning字段. 代表是否已经提交线程池运行.(是否有其他线程已经开始进行另外号段的初始化工作.
                        // 使用 CAS 进行更新. buffer 在任意时刻,只会有一个线程进行异步更新另外一个号段.
                        && buffer.getThreadRunning().compareAndSet(false, true)
                ) {
                ...
                }
    
  4. 动态调整step来适应不同的请求速度.
     /**
             *  动态调整step
             *  1) duration < 15 分钟 : step 变为原来的2倍. 最大为 MAX_STEP
             *  2) 15分钟 < duration < 30分钟 : nothing
             *  3) duration > 30 分钟 : 缩小step ,最小为DB中配置的步数
             */
            // 15分钟
            if (duration < SEGMENT_DURATION) {
                if (nextStep * 2 > MAX_STEP) {
                    //do nothing
                } else {
                    // 步数 * 2
                    nextStep = nextStep * 2;
                }
                // 15分 < duration < 30
            } else if (duration < SEGMENT_DURATION * 2) {
                //do nothing with nextStep
            } else {
                // duration > 30 步数缩小一半,但是大于最小步数(数据库中配置的步数)
                nextStep = nextStep / 2 >= buffer.getMinStep() ? nextStep / 2 : nextStep;
            }
    
  5. 使用事务,保证update 操作和select 操作的原子性.
         /**
      * 使用事务保证这两步的原子性(事务的隔离机制)
      * 根据数据库中对应tag的step来更新max_value,同时获取 tag的信息
      * 1. UPDATE leaf_alloc SET max_id = max_id + step WHERE biz_tag = #{tag}
      * 2. SELECT biz_tag, max_id, step FROM leaf_alloc WHERE biz_tag = #{tag}
      * @param tag
      * @return
      */
     LeafAlloc updateMaxIdAndGetLeafAlloc(String tag);
    
        @Override
    public LeafAlloc updateMaxIdAndGetLeafAlloc(String tag) {
        SqlSession sqlSession = sqlSessionFactory.openSession();
        try {
            sqlSession.update("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.updateMaxId", tag);
            LeafAlloc result = sqlSession.selectOne("com.sankuai.inf.leaf.segment.dao.IDAllocMapper.getLeafAlloc", tag);
            sqlSession.commit();
            return result;
        } finally {
            sqlSession.close();
        }
    }
    

文章链接: www.blackchen.site/meituan-leaf-1
作者: BlackChen

最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351

推荐阅读更多精彩内容