Dubbo之降级Mock源码分析

限流,熔断,降级,

通俗的讲,降级可以形容为一些事件A的触发导致发生取代原有事件B的事件C,而熔断和限流就是这些事件A的其中之一。
熔断是异常情况发生的?;ご胧热绲饔霉┯ι?的短信API失败,改为调用供应商2的短信API。
限流是防止异常情况发生的措施,比如抢票时候抛出的提示。
一句口诀,消费者(使用)熔断,提供者(使用)限流。

上面也讲了熔断和限流是触发降级其中两个事件,那么在dubbo中我们如何触发降级(mock)?

  1. 通过服务治理触发(dubbo-admin)
  2. 配置接口mock参数触发

Dubbo中降级的使用

这边的讲解从通过配置接口mock参数触发降级的方式入手,其实服务治理也是相当于通过Override机制给接口增加了mock参数配置。

mock表达式格式

[force/fail] (return xxx |throw xxx | fail | true |default | {mockClassName} ) | false 

如果表达式为false,表示不启用降级功能
如果表达式以force为前缀,表示强制降级
如果表达以fail为前缀或者没有前缀,表示失败降级

去除force或fail前缀后,剩下的表达式表示具体的降级逻辑,逻辑如下

mock表达式 作用
true/default/fail 调用接口+Mock类对应的方法
{mockClass} 调用${mockClass}对应方法
return xxx 解析xxx为方法返回类型的对象返回
throw namespace...xxxException 抛出对应xxxException异常

其中对于xxx,支持基本类型,Collection,Map以及自定义对象

具体示例如下

示例 类型
empty 返回各种类型的空实现
null 直接返回null
true/false 返回对应布尔类型
`abc`,abc 根据方法返回类型解析,可能是String或枚举
{...} 根据方法返回类型解析,可能是map或自定义对象
[...] 根据方法返回类型解析,可以是Collection中的任意一种

如何配置mock表达式

mock表达式的配置方式一共有3种

  • 通过xml或@Reference的mock字段
  • Dubbo Admin控制台
    -在zk设置Override动态配置

后两种的原理都是对zk节点进行操作。

首先讲下第一种配置方式

<dubbo:reference id="demoService" check="false" interface="com.alibaba.dubbo.demo.DemoService" mock="return scj">
            <dubbo:parameter key="sayHello.mock" value="return scj" />
 </dubbo:reference>

因为spring对xml的配置是不支持引号,方括号,尖括号等符号的,满足的格式如以下正则

private static final Pattern PATTERN_NAME_HAS_SYMBOL = Pattern.compile("[:*,\\s/\\-._0-9a-zA-Z]+");

所以通过项目中配置的mock行为是有限的,因此在项目中推荐配置失败降级,通过一个Mock类来实现我们的降级逻辑。

而通过Dubbo-admin控制台去配置mock,更适合强制降级的场景。

通过zk触发强制降级示例代码如下

create -e /dubbo/com.alibaba.dubbo.demo.DemoService/configurators/override%3a%2f%2f10.111.27.41%3a20880%2fcom.alibaba.dubbo.demo.DemoService%3fsayHello.mock%3dforce%3areturn+%60scj+mock+hhh%60%26category%3dconfigurators 1

具体这个配置是如何生效的,可以看下我写的ZookeeperRegistry这篇文章。

源码分析

首先是找入口,入口类为MockClusterWrapper

public class MockClusterWrapper implements Cluster {

    private Cluster cluster;

    public MockClusterWrapper(Cluster cluster) {
        this.cluster = cluster;
    }

    @Override
    public <T> Invoker<T> join(Directory<T> directory) throws RpcException {
        return new MockClusterInvoker<T>(directory,
                this.cluster.join(directory));
    }

}

这个类是一个SPI自动包装类,包装(AOP)的mock逻辑会强制触发。

MockClusterWrapper的join方法会返回MockClusterInvoker,MockClusterInvoker会包装实际cluster生成的ClusterInvoker,以此来实现对mock逻辑的注入。

我们看下MockClusterInvoker#invoke方法增加了什么逻辑。

public Result invoke(Invocation invocation) throws RpcException {
        Result result = null;
        //获取directoryUrl,会包括客户端和Configuration节点url的合并url
        //从method.xxx获取参数,如果没有,直接从xxx参数获取
        String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim();
        if (value.length() == 0 || value.equalsIgnoreCase("false")) {
            //如果没有mock配置,不走mock逻辑
            result = this.invoker.invoke(invocation);
        } else if (value.startsWith("force")) {
            //force开头,强制进行mock //这个force只能通过override触发
            if (logger.isWarnEnabled()) {
                logger.info("force-mock: " + invocation.getMethodName() + " force-mock enabled , url : " + directory.getUrl());
            }
            //force:direct mock
            result = doMockInvoke(invocation, null);
        } else {
            //不是force开头,调用失败走mock逻辑
            try {
                result = this.invoker.invoke(invocation);
            } catch (RpcException e) {
                if (e.isBiz()) {
                    throw e;
                } else {
                    if (logger.isWarnEnabled()) {
                        logger.warn("fail-mock: " + invocation.getMethodName() + " fail-mock enabled , url : " + directory.getUrl(), e);
                    }
                    //不是force开头,会在调用失败后,走mock逻辑
                    result = doMockInvoke(invocation, e);
                }
            }
        }
        return result;
    }

MockClusterInvoker#invoke方法首先会通过mock配置是否为force开头或者是否为false来判断走强制降级,失败降级还是不走降级逻辑。

  • 强制Mock,直接调用mock逻辑,屏蔽对提供者调用
  • 失败Mock,会先进行集群调用,出现异常后,再走mock逻辑

mock逻辑入口为doMockInvoke方法,在该方法内封装了一个隐藏逻辑。之前我们都认为触发mock是通过dubbo的override机制,但是看了这个方法后发现,我们可以在zk的下增加一个mock协议的提供者,也是能够触发mock的,并且后者的优先级大于前者。

private Result doMockInvoke(Invocation invocation, RpcException e) {
        Result result = null;
        Invoker<T> minvoker;

        //如果有mock协议的provider 优先使用mock provider
        List<Invoker<T>> mockInvokers = selectMockInvoker(invocation);
        if (CollectionUtils.isEmpty(mockInvokers)) {
            //使用override后的url生成mockinvoker
            minvoker = (Invoker<T>) new MockInvoker(directory.getUrl());
        } else {
            //使用mock provider
            minvoker = mockInvokers.get(0);
        }
        try {
            result = minvoker.invoke(invocation);
        } catch (RpcException me) {
            if (me.isBiz()) {
                result = new RpcResult(me.getCause());
            } else {
                throw new RpcException(me.getCode(), getMockExceptionMessage(e, me), me.getCause());
            }
        } catch (Throwable me) {
            throw new RpcException(getMockExceptionMessage(e, me), me.getCause());
        }
        return result;
    }

不管是通过何种方式设置的mock逻辑,最终在doMockInvoke我们都会拿到一个MockInvoker,关于Mock表达式的解析,mock对象的返回等逻辑都封装在这个Invoker里。

MockInvoker的源码解析如下,和我上面列的mock表达式配置基本一致。

final public class MockInvoker<T> implements Invoker<T> {
    private final static ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
    private final static Map<String, Invoker<?>> mocks = new ConcurrentHashMap<String, Invoker<?>>();
    private final static Map<String, Throwable> throwables = new ConcurrentHashMap<String, Throwable>();

    private final URL url;

    public MockInvoker(URL url) {
        this.url = url;
    }

    public static Object parseMockValue(String mock) throws Exception {
        return parseMockValue(mock, null);
    }

    public static Object parseMockValue(String mock, Type[] returnTypes) throws Exception {
        Object value = null;
        if ("empty".equals(mock)) {
            //如果是empty value等于对应类型空实现 具体逻辑见getEmptyObject
            value = ReflectUtils.getEmptyObject(returnTypes != null && returnTypes.length > 0 ? (Class<?>) returnTypes[0] : null);
        } else if ("null".equals(mock)) {
            value = null;
        } else if ("true".equals(mock)) {
            value = true;
        } else if ("false".equals(mock)) {
            value = false;
        } else if (mock.length() >= 2 && (mock.startsWith("\"") && mock.endsWith("\"")
                || mock.startsWith("\'") && mock.endsWith("\'"))) {
            //去除前后引号
            value = mock.subSequence(1, mock.length() - 1);
        } else if (returnTypes != null && returnTypes.length > 0 && returnTypes[0] == String.class) {
            //返回类型为String类型
            value = mock;
        } else if (StringUtils.isNumeric(mock, false)) {
            //如果是数字
            value = JSON.parse(mock);
        } else if (mock.startsWith("{")) {
            //{开头 Json反序列化解析为Map
            value = JSON.parseObject(mock, Map.class);
        } else if (mock.startsWith("[")) {
            //[开头 Json反序列化解析为List
            value = JSON.parseObject(mock, List.class);
        } else {
            //其他 不做处理
            value = mock;
        }
        //如果returnTypes不为空 进一步做处理
        if (ArrayUtils.isNotEmpty(returnTypes)) {
            //returnTypes[0]为目标类型  returnTypes[1]为目标类型的泛型类型
            value = PojoUtils.realize(value, (Class<?>) returnTypes[0], returnTypes.length > 1 ? returnTypes[1] : null);
        }
        return value;
    }

    @Override
    public Result invoke(Invocation invocation) throws RpcException {
        //获取针对方法的mock配置
        String mock = getUrl().getParameter(invocation.getMethodName() + "." + Constants.MOCK_KEY);
        if (invocation instanceof RpcInvocation) {
            ((RpcInvocation) invocation).setInvoker(this);
        }
        //如果针对方法没有mock配置 取接口级别的mock配置
        if (StringUtils.isBlank(mock)) {
            mock = getUrl().getParameter(Constants.MOCK_KEY);
        }

        //如果没有mock配置抛出异常
        if (StringUtils.isBlank(mock)) {
            throw new RpcException(new IllegalAccessException("mock can not be null. url :" + url));
        }
        //标准化mock配置 方便下面匹配
        mock = normalizeMock(URL.decode(mock));
        //return 开头
        if (mock.startsWith(Constants.RETURN_PREFIX)) {
            //去除return
            mock = mock.substring(Constants.RETURN_PREFIX.length()).trim();
            try {
                //获取这个接口的返回类型
                Type[] returnTypes = RpcUtils.getReturnTypes(invocation);
                //mock配置反序列化为对应类型的value
                Object value = parseMockValue(mock, returnTypes);
                return new RpcResult(value);
            } catch (Exception ew) {
                throw new RpcException("mock return invoke error. method :" + invocation.getMethodName()
                        + ", mock:" + mock + ", url: " + url, ew);
            }
        } else if (mock.startsWith(Constants.THROW_PREFIX)) { //throw 开头
            //去除throw
            mock = mock.substring(Constants.THROW_PREFIX.length()).trim();
            //如果mock为空 ,降级为抛出RpcException
            if (StringUtils.isBlank(mock)) {
                throw new RpcException("mocked exception for service degradation.");
            } else { // user customized class
                //反序列化为用户自定义异常
                Throwable t = getThrowable(mock);
                //用RpcException包装自定义异常抛出
                //注意异常类型设置为业务异常
                throw new RpcException(RpcException.BIZ_EXCEPTION, t);
            }
        } else { //impl mock
            try {
                //default或者具体接口实现类,默认使用接口Mock这个类实现mock逻辑
                Invoker<T> invoker = getInvoker(mock);
                return invoker.invoke(invocation);
            } catch (Throwable t) {
                throw new RpcException("Failed to create mock implementation class " + mock, t);
            }
        }
    }

    /**
     * 生成mock异常对象
     * @param throwstr
     * @return
     */
    public static Throwable getThrowable(String throwstr) {
        //缓存逻辑
        Throwable throwable = throwables.get(throwstr);
        if (throwable != null) {
            return throwable;
        }

        try {
            Throwable t;
            //反射获取异常class对象
            Class<?> bizException = ReflectUtils.forName(throwstr);
            Constructor<?> constructor;
            //反射获取异常 参数为string的构造函数
            constructor = ReflectUtils.findConstructor(bizException, String.class);
            //创建异常对象
            t = (Throwable) constructor.newInstance(new Object[]{"mocked exception for service degradation."});
            //缓存大小限制1000
            if (throwables.size() < 1000) {
                throwables.put(throwstr, t);
            }
            return t;
        } catch (Exception e) {
            //如果反射出现异常,降级为直接返回RpcException
            throw new RpcException("mock throw error :" + throwstr + " argument error.", e);
        }
    }

    @SuppressWarnings("unchecked")
    private Invoker<T> getInvoker(String mockService) {
        //缓存
        Invoker<T> invoker = (Invoker<T>) mocks.get(mockService);
        if (invoker != null) {
            return invoker;
        }

        //反射得到接口类class对象
        Class<T> serviceType = (Class<T>) ReflectUtils.forName(url.getServiceInterface());
        //得到接口mock类对象
        T mockObject = (T) getMockObject(mockService, serviceType);
        //生成代理
        invoker = proxyFactory.getInvoker(mockObject, serviceType, url);
        //缓存大小1000
        if (mocks.size() < 10000) {
            mocks.put(mockService, invoker);
        }
        return invoker;
    }

    /**
     * 获取serviceType对应mock实现对象
     * @param mockService
     * @param serviceType
     * @return
     */
    @SuppressWarnings("unchecked")
    public static Object getMockObject(String mockService, Class serviceType) {
        //如果mock配置为default 对应mock实现类为 接口名+Mock
        //否则为指定了具体实现类
        if (ConfigUtils.isDefault(mockService)) {
            mockService = serviceType.getName() + "Mock";
        }

        //反射得到mock实现类class对象
        //注意这边如果找不到 会抛出运行时异常
        Class<?> mockClass = ReflectUtils.forName(mockService);
        //判断是否实现了接口
        if (!serviceType.isAssignableFrom(mockClass)) {
            throw new IllegalStateException("The mock class " + mockClass.getName() +
                    " not implement interface " + serviceType.getName());
        }

        try {
            //创建实例
            return mockClass.newInstance();
        } catch (InstantiationException e) {
            throw new IllegalStateException("No default constructor from mock class " + mockClass.getName(), e);
        } catch (IllegalAccessException e) {
            throw new IllegalStateException(e);
        }
    }


    /**
     * Normalize mock string:
     *
     * <ol>
     * <li>return => return null</li>
     * <li>fail => default</li>
     * <li>force => default</li>
     * <li>fail:throw/return foo => throw/return foo</li>
     * <li>force:throw/return foo => throw/return foo</li>
     * </ol>
     *
     * @param mock mock string
     * @return normalized mock string
     */
    public static String normalizeMock(String mock) {
        if (mock == null) {
            return mock;
        }

        mock = mock.trim();

        if (mock.length() == 0) {
            return mock;
        }

        if (Constants.RETURN_KEY.equalsIgnoreCase(mock)) {
            return Constants.RETURN_PREFIX + "null";
        }

        if (ConfigUtils.isDefault(mock) || "fail".equalsIgnoreCase(mock) || "force".equalsIgnoreCase(mock)) {
            return "default";
        }

        if (mock.startsWith(Constants.FAIL_PREFIX)) {
            mock = mock.substring(Constants.FAIL_PREFIX.length()).trim();
        }

        if (mock.startsWith(Constants.FORCE_PREFIX)) {
            mock = mock.substring(Constants.FORCE_PREFIX.length()).trim();
        }

        if (mock.startsWith(Constants.RETURN_PREFIX) || mock.startsWith(Constants.THROW_PREFIX)) {
            mock = mock.replace('`', '"');
        }

        return mock;
    }

    @Override
    public URL getUrl() {
        return this.url;
    }

    @Override
    public boolean isAvailable() {
        return true;
    }

    @Override
    public void destroy() {
        //do nothing
    }

    @Override
    public Class<T> getInterface() {
        //FIXME
        return null;
    }
}

总结

  • Dubbo中的mock是用来做降级功能,通过SPI包装类来强制植入mock逻辑
  • mock分为强制mock和失败mock
  • 推荐在客户端配置失败mock逻辑,通过服务治理设置强制mock
  • 服务治理一般通过override让客户端mock逻辑生效,实际上也支持配置mock协议的provider让mock生效,并且后者优先级更高
最后编辑于
?著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 214,100评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 91,308评论 3 388
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事?!?“怎么了?”我有些...
    开封第一讲书人阅读 159,718评论 0 349
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 57,275评论 1 287
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,376评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,454评论 1 292
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,464评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,248评论 0 269
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,686评论 1 306
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,974评论 2 328
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,150评论 1 342
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,817评论 4 337
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,484评论 3 322
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,140评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,374评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,012评论 2 365
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,041评论 2 351