When Soul-bootstrap starts, the following line appears in the log output:
load plugin:[divide] [org.dromara.soul.plugin.divide.DividePlugin]
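A good place to start is the DividePlugin class.
Reading the source shows that DividePlugin implements the doExecute method of AbstractSoulPlugin. In AbstractSoulPlugin, doExecute is declared abstract and is called from execute(), which is the template method design pattern.
The source of execute() is as follows: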
@Override
public Mono<Void> execute(final ServerWebExchange exchange, final SoulPluginChain chain) {
    String pluginName = named();
    // (1) look up this plugin's data in the cache
    final PluginData pluginData = BaseDataCache.getInstance().obtainPluginData(pluginName);
    if (pluginData != null && pluginData.getEnabled()) {
        // fetch the SelectorData configured for this plugin
        final Collection<SelectorData> selectors = BaseDataCache.getInstance().obtainSelectorData(pluginName);
        if (CollectionUtils.isEmpty(selectors)) {
            return handleSelectorIsNull(pluginName, exchange, chain);
        }
        // match a selector against the request
        final SelectorData selectorData = matchSelector(exchange, selectors);
        if (Objects.isNull(selectorData)) {
            return handleSelectorIsNull(pluginName, exchange, chain);
        }
        selectorLog(selectorData, pluginName);
        final List<RuleData> rules = BaseDataCache.getInstance().obtainRuleData(selectorData.getId());
        if (CollectionUtils.isEmpty(rules)) {
            return handleRuleIsNull(pluginName, exchange, chain);
        }
        RuleData rule;
        if (selectorData.getType() == SelectorTypeEnum.FULL_FLOW.getCode()) {
            // get last
            rule = rules.get(rules.size() - 1);
        } else {
            rule = matchRule(exchange, rules);
        }
        if (Objects.isNull(rule)) {
            return handleRuleIsNull(pluginName, exchange, chain);
        }
        ruleLog(rule, pluginName);
        // template method: delegate to the subclass implementation
        return doExecute(exchange, chain, selectorData, rule);
    }
    // (2) plugin missing or disabled: pass the request on to the next plugin
    return chain.execute(exchange);
}
(1) Plugin naming rule: plugin names are the enum values defined in PluginEnum.
(2) chain.execute(exchange) uses the chain of responsibility design pattern:
@Override
public Mono<Void> execute(final ServerWebExchange exchange) {
    return Mono.defer(() -> {
        if (this.index < plugins.size()) {
            SoulPlugin plugin = plugins.get(this.index++);
            Boolean skip = plugin.skip(exchange);
            if (skip) {
                return this.execute(exchange);
            }
            return plugin.execute(exchange, this);
        }
        return Mono.empty();
    });
}
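To make the index-based chain easier to see in isolation, here is a minimal synchronous sketch of the same idea. It is not Soul's code: SketchPlugin, SketchChain, NamedPlugin and ChainDemo are hypothetical names, Reactor's Mono is left out, and the plugin names and request path are only illustrative.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for SoulPlugin (no Reactor, String instead of ServerWebExchange).
interface SketchPlugin {
    boolean skip(String request);

    void execute(String request, SketchChain chain, List<String> trace);
}

// Hypothetical stand-in for the plugin chain: it walks the plugin list with an index.
final class SketchChain {
    private final List<SketchPlugin> plugins;
    private int index;

    SketchChain(List<SketchPlugin> plugins) {
        this.plugins = plugins;
    }

    void execute(String request, List<String> trace) {
        if (index < plugins.size()) {
            SketchPlugin plugin = plugins.get(index++);
            if (plugin.skip(request)) {
                // Skipped plugins are bypassed by moving straight to the next one.
                execute(request, trace);
                return;
            }
            // The plugin decides whether to call chain.execute(...) again.
            plugin.execute(request, this, trace);
        }
    }
}

// A plugin that records its name and then hands the request on to the chain.
final class NamedPlugin implements SketchPlugin {
    private final String name;
    private final boolean skip;

    NamedPlugin(String name, boolean skip) {
        this.name = name;
        this.skip = skip;
    }

    @Override
    public boolean skip(String request) {
        return skip;
    }

    @Override
    public void execute(String request, SketchChain chain, List<String> trace) {
        trace.add(name);
        chain.execute(request, trace);
    }
}

final class ChainDemo {
    static List<String> run() {
        List<SketchPlugin> plugins = new ArrayList<>();
        plugins.add(new NamedPlugin("global", false));
        plugins.add(new NamedPlugin("dubbo", true));   // skipped for this request
        plugins.add(new NamedPlugin("divide", false));
        List<String> trace = new ArrayList<>();
        new SketchChain(plugins).execute("/order/findById", trace);
        return trace;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [global, divide]
    }
}
```

Each plugin only knows about the chain, not about its neighbours, which is why a plugin such as DividePlugin can finish its work by simply returning chain.execute(exchange).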
Now for the main topic of this post: doExecute(exchange, chain, selectorData, rule)
@Override
protected Mono<Void> doExecute(final ServerWebExchange exchange, final SoulPluginChain chain, final SelectorData selector, final RuleData rule) {
    final SoulContext soulContext = exchange.getAttribute(Constants.CONTEXT);
    assert soulContext != null;
    final DivideRuleHandle ruleHandle = GsonUtils.getInstance().fromJson(rule.getHandle(), DivideRuleHandle.class);
    final List<DivideUpstream> upstreamList = UpstreamCacheManager.getInstance().findUpstreamListBySelectorId(selector.getId());
    if (CollectionUtils.isEmpty(upstreamList)) {
        log.error("divide upstream configuration error: {}", rule.toString());
        Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
        return WebFluxResultUtils.result(exchange, error);
    }
    final String ip = Objects.requireNonNull(exchange.getRequest().getRemoteAddress()).getAddress().getHostAddress();
    DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
    if (Objects.isNull(divideUpstream)) {
        log.error("divide has no upstream");
        Object error = SoulResultWrap.error(SoulResultEnum.CANNOT_FIND_URL.getCode(), SoulResultEnum.CANNOT_FIND_URL.getMsg(), null);
        return WebFluxResultUtils.result(exchange, error);
    }
    // set the http url
    String domain = buildDomain(divideUpstream);
    String realURL = buildRealURL(domain, soulContext, exchange);
    exchange.getAttributes().put(Constants.HTTP_URL, realURL);
    // set the http timeout
    exchange.getAttributes().put(Constants.HTTP_TIME_OUT, ruleHandle.getTimeout());
    exchange.getAttributes().put(Constants.HTTP_RETRY, ruleHandle.getRetry());
    return chain.execute(exchange);
}
The key step is this one: DivideUpstream divideUpstream = LoadBalanceUtils.selector(upstreamList, ruleHandle.getLoadBalance(), ip);
As the name suggests, this is where load balancing happens.
Its implementation:
public static DivideUpstream selector(final List<DivideUpstream> upstreamList, final String algorithm, final String ip) {
    LoadBalance loadBalance = ExtensionLoader.getExtensionLoader(LoadBalance.class).getJoin(algorithm);
    return loadBalance.select(upstreamList, ip);
}
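The default implementation is random.
ExtensionLoader.getExtensionLoader(LoadBalance.class) goes through the SPI mechanism: LOADERS caches ExtensionLoader instances, keyed by class.
getJoin(algorithm) then looks up the load balancer object in cachedInstances, using the value of algorithm as the key.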
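The two-level caching described above can be sketched roughly as follows. This is a simplified illustration rather than Soul's ExtensionLoader: SketchExtensionLoader, SketchBalance, RandomBalance and LoaderDemo are hypothetical names, and the register() method is an assumption that stands in for however the real loader discovers its SPI implementations.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Hypothetical, simplified loader: one loader per extension interface, one instance per name.
final class SketchExtensionLoader<T> {
    // First level: extension interface -> loader, so each interface gets exactly one loader.
    private static final Map<Class<?>, SketchExtensionLoader<?>> LOADERS = new ConcurrentHashMap<>();

    // Assumption: factories are registered by hand here instead of being discovered via SPI.
    private final Map<String, Supplier<? extends T>> factories = new ConcurrentHashMap<>();

    // Second level: extension name (for example "random") -> cached instance.
    private final Map<String, T> cachedInstances = new ConcurrentHashMap<>();

    private SketchExtensionLoader() {
    }

    @SuppressWarnings("unchecked")
    static <T> SketchExtensionLoader<T> getExtensionLoader(Class<T> type) {
        return (SketchExtensionLoader<T>) LOADERS.computeIfAbsent(type, k -> new SketchExtensionLoader<T>());
    }

    void register(String name, Supplier<? extends T> factory) {
        factories.put(name, factory);
    }

    T getJoin(String name) {
        // Create the instance on first use, then keep returning the cached one.
        return cachedInstances.computeIfAbsent(name, k -> {
            Supplier<? extends T> factory = factories.get(k);
            if (factory == null) {
                throw new IllegalArgumentException("no extension named " + k);
            }
            return factory.get();
        });
    }
}

// Hypothetical extension interface and implementation used only for the demo.
interface SketchBalance {
    String algorithm();
}

final class RandomBalance implements SketchBalance {
    @Override
    public String algorithm() {
        return "random";
    }
}

final class LoaderDemo {
    static boolean cachesLoaderAndInstance() {
        SketchExtensionLoader<SketchBalance> first = SketchExtensionLoader.getExtensionLoader(SketchBalance.class);
        first.register("random", RandomBalance::new);
        SketchExtensionLoader<SketchBalance> second = SketchExtensionLoader.getExtensionLoader(SketchBalance.class);
        // Same loader for the same class, same instance for the same name.
        return first == second && first.getJoin("random") == second.getJoin("random");
    }

    public static void main(String[] args) {
        System.out.println(cachesLoaderAndInstance()); // prints true
    }
}
```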
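Where does this random value get written?
It can be set on the admin page (by changing the load balancing strategy).
The load balancers also use the template method pattern:
an implementation only needs to provide the doSelect() method.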
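As a rough sketch of what that template method arrangement could look like: the base class handles the trivial cases and subclasses supply only doSelect(). All names here (Upstream, SketchAbstractLoadBalance, WeightedRandomBalance, IpHashBalance) are hypothetical, and the empty-list and single-element shortcuts in select() are an assumption about the shared logic, not a copy of Soul's implementation.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Random;

// Hypothetical, simplified stand-in for DivideUpstream.
final class Upstream {
    final String url;
    final int weight; // assumed to be non-negative

    Upstream(String url, int weight) {
        this.url = url;
        this.weight = weight;
    }
}

// Template method: select() fixes the overall flow, doSelect() is the only extension point.
abstract class SketchAbstractLoadBalance {
    public final Upstream select(List<Upstream> upstreams, String ip) {
        if (upstreams == null || upstreams.isEmpty()) {
            return null;
        }
        if (upstreams.size() == 1) {
            return upstreams.get(0);
        }
        return doSelect(upstreams, ip);
    }

    protected abstract Upstream doSelect(List<Upstream> upstreams, String ip);
}

// One possible strategy: pick an upstream at random, proportionally to its weight.
final class WeightedRandomBalance extends SketchAbstractLoadBalance {
    private final Random random = new Random();

    @Override
    protected Upstream doSelect(List<Upstream> upstreams, String ip) {
        int total = 0;
        for (Upstream u : upstreams) {
            total += u.weight;
        }
        if (total <= 0) {
            // No usable weights: fall back to a uniform pick.
            return upstreams.get(random.nextInt(upstreams.size()));
        }
        int offset = random.nextInt(total);
        for (Upstream u : upstreams) {
            offset -= u.weight;
            if (offset < 0) {
                return u;
            }
        }
        return upstreams.get(upstreams.size() - 1);
    }
}

// A custom strategy: the same client IP (assumed non-null) always maps to the same upstream.
final class IpHashBalance extends SketchAbstractLoadBalance {
    @Override
    protected Upstream doSelect(List<Upstream> upstreams, String ip) {
        return upstreams.get(Math.floorMod(ip.hashCode(), upstreams.size()));
    }
}

final class LoadBalanceDemo {
    public static void main(String[] args) {
        List<Upstream> upstreams = Arrays.asList(
                new Upstream("127.0.0.1:8188", 50),
                new Upstream("127.0.0.1:8189", 50));
        System.out.println(new WeightedRandomBalance().select(upstreams, "10.0.0.1").url);
        System.out.println(new IpHashBalance().select(upstreams, "10.0.0.1").url);
    }
}
```

A custom load balancer along these lines would only need its own doSelect(); how it gets registered under a name for getJoin(algorithm) depends on the SPI configuration and is not covered by this sketch.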
Next goals:
Write and plug in a custom load balancer
Try integrating dubbo