前言
在一次正常的RPC调用发起之前,做为服务提供方必须先保证自己的服务已经启动,并且监听某个端口来接收Consumer发来的请求,Dubbo中称这个暴露服务的动作为export。Dubbo支持以多种协议对外提供服务,默认采用Dubbo协议。后续对于Dubbo服务提供方的逻辑解析,将分成服务暴露和接收并响应请求两个部分。下面首先来看下服务暴露的过程。
服务端接口初始化
还是从官方的Demo开始,看下服务提供端的启动过程,这里还是使用最常用的Springboot加Dubbo的方式:
public class Application {
public static void main(String[] args) throws Exception {
AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(ProviderConfiguration.class);
context.start();
System.in.read();
}
@Configuration
@EnableDubbo(scanBasePackages = "org.apache.dubbo.demo.provider")
@PropertySource("classpath:/spring/dubbo-provider.properties")
static class ProviderConfiguration {
@Bean
public RegistryConfig registryConfig() {
RegistryConfig registryConfig = new RegistryConfig();
registryConfig.setAddress("zookeeper://127.0.0.1:2181");
return registryConfig;
}
}
}
@Service
public class DemoServiceImpl implements DemoService {
private static final Logger logger = LoggerFactory.getLogger(DemoServiceImpl.class);
@Override
public String sayHello(String name) {
logger.info("Hello " + name + ", request from consumer: " + RpcContext.getContext().getRemoteAddress());
return "Hello " + name + ", response from provider: " + RpcContext.getContext().getLocalAddress();
}
@Override
public CompletableFuture<String> sayHelloAsync(String name) {
return null;
}
}
要暴露一个Dubbo服务,至少需要做两件事,首先需要一个接口的实现类,就是上面代码中的DemoServiceImpl
。这个类和我们Spring中普通的Bean实现类没有任何区别,但是要注意的是@Service注解不是org.springframework.stereotype.Service
,而是org.apache.dubbo.config.annotation.Service
。就是说,如果要把一个Spring Bean暴露成远程服务,实现类只需要更换一个注解。
当然仅仅加个注解肯定是不够的,必须要在Spring启动时加载Dubbo相关配置。这样,在Dubbo启动的时候就会扫描到这个加注解的类。这个就是@EnableDubbo注解的作用。关于这个注解在之前消费端初始化的时候已经详细讲过了,这里不再重复,直接进到处理@Service注解的地方。
@Service注解处理
对Dubbo @Service注解的处理逻辑在DubboComponentScanRegistrar
类里面的registerServiceAnnotationBeanPostProcessor()
方法:
private void registerServiceAnnotationBeanPostProcessor(Set<String> packagesToScan, BeanDefinitionRegistry registry) {
BeanDefinitionBuilder builder = rootBeanDefinition(ServiceAnnotationBeanPostProcessor.class);
builder.addConstructorArgValue(packagesToScan);
builder.setRole(BeanDefinition.ROLE_INFRASTRUCTURE);
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
BeanDefinitionReaderUtils.registerWithGeneratedName(beanDefinition, registry);
}
这里的逻辑相对简单,只是初始化了一个ServiceAnnotationBeanPostProcessor
并注册成Spring Bean,将需要扫描的路径做为构造参数传给它。这是一个BeanDefinitionRegistryPostProcessor
,Spring在获取所有Bean定义之后会回调它的postProcessBeanDefinitionRegistry()
方法。方法实现如下:
@Override
public void postProcessBeanDefinitionRegistry(BeanDefinitionRegistry registry) throws BeansException {
//@since 2.7.5 注册Dubbo生命周期Listener
registerBeans(registry, DubboBootstrapApplicationListener.class);
// 扫描路径下@Service注解的类
Set<String> resolvedPackagesToScan = resolvePackagesToScan(packagesToScan);
// 找到这些类并注册Bean
if (!CollectionUtils.isEmpty(resolvedPackagesToScan)) {
registerServiceBeans(resolvedPackagesToScan, registry);
} else {
if (logger.isWarnEnabled()) {
logger.warn("packagesToScan is empty , ServiceBean registry will be ignored!");
}
}
}
在上面的回调方法里,通过扫描指定的路径找到所有@Service注解的类,这个是通过Spring提供的ClassPathBeanDefinitionScanner
类来实现的,最终会将扫描到的类注册成Bean,然后返回扫描到的所有BeanDefinition。对于上面的Demo,这一步首先会把DemoServiceImpl
注册成Spring Bean,然后调用registerServiceBean()
方法做Dubbo相关的后续处理:
private void registerServiceBean(BeanDefinitionHolder beanDefinitionHolder, BeanDefinitionRegistry registry,
DubboClassPathBeanDefinitionScanner scanner) {
//实现类,即DemoServiceImpl.class
Class<?> beanClass = resolveClass(beanDefinitionHolder);
//获取@Service注解的详细定义
Annotation service = findServiceAnnotation(beanClass);
//@Service注解的属性
AnnotationAttributes serviceAnnotationAttributes = getAnnotationAttributes(service, false, false);
//获取该类要暴露的接口,根据@Service的interfaceClass/interfaceName属性,如果都没配置,则获取DemoServiceImpl类实现的第一个接口
Class<?> interfaceClass = resolveServiceInterfaceClass(serviceAnnotationAttributes, beanClass);
String annotatedServiceBeanName = beanDefinitionHolder.getBeanName();
//构造BeanDefinition
AbstractBeanDefinition serviceBeanDefinition =
buildServiceBeanDefinition(service, serviceAnnotationAttributes, interfaceClass, annotatedServiceBeanName);
// ServiceBean Bean name
String beanName = generateServiceBeanName(serviceAnnotationAttributes, interfaceClass);
if (scanner.checkCandidate(beanName, serviceBeanDefinition)) { // check duplicated candidate bean
//注册
registry.registerBeanDefinition(beanName, serviceBeanDefinition);
} else {
//log warning
}
}
上面的逻辑中,找到@Service注解的类后会看它配置的接口,如果没有配置,则默认使用实现的第一个接口做为它要暴露的服务接口。Bean的构建的核心逻辑在buildServiceBeanDefinition()
方法中。
private AbstractBeanDefinition buildServiceBeanDefinition(Annotation serviceAnnotation,
AnnotationAttributes serviceAnnotationAttributes,
Class<?> interfaceClass,
String annotatedServiceBeanName) {
//1、实际注册的是一个ServiceBean
BeanDefinitionBuilder builder = rootBeanDefinition(ServiceBean.class);
AbstractBeanDefinition beanDefinition = builder.getBeanDefinition();
MutablePropertyValues propertyValues = beanDefinition.getPropertyValues();
String[] ignoreAttributeNames = of("provider", "monitor", "application", "module", "registry", "protocol",
"interface", "interfaceName", "parameters");
//2、设置注解上配置的属性到ServiceBean的属性中
propertyValues.addPropertyValues(new AnnotationPropertyValuesAdapter(serviceAnnotation, environment, ignoreAttributeNames));
//3、ref属性设置为真实实现类的Bean
addPropertyReference(builder, "ref", annotatedServiceBeanName);
// Set interface
builder.addPropertyValue("interface", interfaceClass.getName());
builder.addPropertyValue("parameters", convertParameters(serviceAnnotationAttributes.getStringArray("parameters")));
// Add methods parameters
List<MethodConfig> methodConfigs = convertMethodConfigs(serviceAnnotationAttributes.get("methods"));
if (!methodConfigs.isEmpty()) {
builder.addPropertyValue("methods", methodConfigs);
}
// 设置其它属性
/**
* Add {@link org.apache.dubbo.config.ProviderConfig} Bean reference
*/
String providerConfigBeanName = serviceAnnotationAttributes.getString("provider");
if (StringUtils.hasText(providerConfigBeanName)) {
addPropertyReference(builder, "provider", providerConfigBeanName);
}
/**
* Add {@link org.apache.dubbo.config.MonitorConfig} Bean reference
*/
String monitorConfigBeanName = serviceAnnotationAttributes.getString("monitor");
if (StringUtils.hasText(monitorConfigBeanName)) {
addPropertyReference(builder, "monitor", monitorConfigBeanName);
}
/**
* Add {@link org.apache.dubbo.config.ApplicationConfig} Bean reference
*/
String applicationConfigBeanName = serviceAnnotationAttributes.getString("application");
if (StringUtils.hasText(applicationConfigBeanName)) {
addPropertyReference(builder, "application", applicationConfigBeanName);
}
/**
* Add {@link org.apache.dubbo.config.ModuleConfig} Bean reference
*/
String moduleConfigBeanName = serviceAnnotationAttributes.getString("module");
if (StringUtils.hasText(moduleConfigBeanName)) {
addPropertyReference(builder, "module", moduleConfigBeanName);
}
/**
* Add {@link org.apache.dubbo.config.RegistryConfig} Bean reference
*/
String[] registryConfigBeanNames = serviceAnnotationAttributes.getStringArray("registry");
List<RuntimeBeanReference> registryRuntimeBeanReferences = toRuntimeBeanReferences(registryConfigBeanNames);
if (!registryRuntimeBeanReferences.isEmpty()) {
builder.addPropertyValue("registries", registryRuntimeBeanReferences);
}
/**
* Add {@link org.apache.dubbo.config.ProtocolConfig} Bean reference
*/
String[] protocolConfigBeanNames = serviceAnnotationAttributes.getStringArray("protocol");
List<RuntimeBeanReference> protocolRuntimeBeanReferences = toRuntimeBeanReferences(protocolConfigBeanNames);
if (!protocolRuntimeBeanReferences.isEmpty()) {
builder.addPropertyValue("protocols", protocolRuntimeBeanReferences);
}
return builder.getBeanDefinition();
}
上面的逻辑中会定义一个类型为ServiceBean的Spring Bean,它的ref属性指向真实实现类的Bean,其它的逻辑就是设置其它配置,如各种配置类。ServiceBean的定义如下,大部分属性和方法都是从ServiceConfig中继承的。最重要的方法就是export(),会将自己暴露为一个远程服务。
public class ServiceBean<T> extends ServiceConfig<T> implements InitializingBean, DisposableBean,
ApplicationContextAware, BeanNameAware, ApplicationEventPublisherAware {
// ...
}
接口初始化总结
在Spring Boot应用初始化的时候,通过@EnableDubbo注解,Dubbo首先初始化了应用配置,这个跟Consumer端是一样的。然后扫描声明路径下的所有包含@Service注解的类,首先将这些类定义成Spring的Bean,然后为每个类对应注册一个类型为ServiceBean的Spring Bean。所以最终是ServiceBean暴露为远程服务,当调用过来时,会将调用转到ref属性指向的真实实现类。下面看下ServiceBean的exported()方法是怎么触发的。
服务暴露
在Spring初始化的时候,@EnableDubbo注解同时会注册一个DubboBootstrapApplicationListener
监听Spring上下文事件,当收到ContextRefreshedEvent
的时候就会调用DubboBootstrap.start()
方法来完成Dubbo应用的初始化和启动。在start()方法的实现中,会获取所有的ServiceBean并且调用它的export()方法。
public synchronized void export() {
if (!shouldExport()) {
return;
}
//判断DubboBootstrap有没有初始化,使用Spring的话,再Spring上下文初始化后已经初始化了
if (bootstrap == null) {
bootstrap = DubboBootstrap.getInstance();
bootstrap.init();
}
//检查配置参数
checkAndUpdateSubConfigs();
//初始化 serviceMetadata
serviceMetadata.setVersion(version);
serviceMetadata.setGroup(group);
serviceMetadata.setDefaultGroup(group);
serviceMetadata.setServiceType(getInterfaceClass());
serviceMetadata.setServiceInterfaceName(getInterface());
serviceMetadata.setTarget(getRef());
if (shouldDelay()) {
//如果是延迟export,加到定时调度器中
DELAY_EXPORT_EXECUTOR.schedule(this::doExport, getDelay(), TimeUnit.MILLISECONDS);
} else {
//export服务
doExport();
}
//发送ServiceConfigExportedEvent
exported();
}
Dubbo支持启动时和启动后延时一段时间再暴露服务,同时服务暴露成功后会触发一个Event。doExport()方法最终会调用doExportUrls()。
private void doExportUrls() {
//1、保存当前Service到ServiceRepository
ServiceRepository repository = ApplicationModel.getServiceRepository();
ServiceDescriptor serviceDescriptor = repository.registerService(getInterfaceClass());
repository.registerProvider(
getUniqueServiceName(),
ref,
serviceDescriptor,
this,
serviceMetadata
);
//2、获取注册中心地址
List<URL> registryURLs = ConfigValidationUtils.loadRegistries(this, true);
//3、针对配置的每个协议,暴露服务
for (ProtocolConfig protocolConfig : protocols) {
String pathKey = URL.buildKey(getContextPath(protocolConfig)
.map(p -> p + "/" + path)
.orElse(path), group, version);
// In case user specified path, register service one more time to map it to path.
repository.registerService(pathKey, interfaceClass);
serviceMetadata.setServiceKey(pathKey);
doExportUrlsFor1Protocol(protocolConfig, registryURLs);
}
}
1、ServiceRepository可以看作是一个本地的缓存,存储了所有provider和consumer的引用,方便dubbo其它??槭褂?。key是group+interfaceName+version
2、获取注册中心的地址,如果@Service没有配置具体的注册中心,则默认使用应用的注册中心,Demo中这里获取的地址格式类似于:registry://127.0.0.1:2181/org.apache.dubbo.registry.RegistryService?application=dubbo-demo-annotation-provider®istry=zookeeper
3、一个接口可以使用不同的协议暴露,如果没配置的话就使用应用全局的protocol的配置,即使用dubbo协议
private void doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) {
String name = protocolConfig.getName();
if (StringUtils.isEmpty(name)) {
name = DUBBO;
}
//1.追加url参数
Map<String, String> map = new HashMap<String, String>();
map.put(SIDE_KEY, PROVIDER_SIDE);
ServiceConfig.appendRuntimeParameters(map);
AbstractConfig.appendParameters(map, getMetrics());
AbstractConfig.appendParameters(map, getApplication());
AbstractConfig.appendParameters(map, getModule());
// remove 'default.' prefix for configs from ProviderConfig
// appendParameters(map, provider, Constants.DEFAULT_KEY);
AbstractConfig.appendParameters(map, provider);
AbstractConfig.appendParameters(map, protocolConfig);
AbstractConfig.appendParameters(map, this);
MetadataReportConfig metadataReportConfig = getMetadataReportConfig();
if (metadataReportConfig != null && metadataReportConfig.isValid()) {
map.putIfAbsent(METADATA_KEY, REMOTE_METADATA_STORAGE_TYPE);
}
//2. 方法级别 得参数配置
if (CollectionUtils.isNotEmpty(getMethods())) {
for (MethodConfig method : getMethods()) {
AbstractConfig.appendParameters(map, method, method.getName());
String retryKey = method.getName() + ".retry";
if (map.containsKey(retryKey)) {
String retryValue = map.remove(retryKey);
if ("false".equals(retryValue)) {
map.put(method.getName() + ".retries", "0");
}
}
List<ArgumentConfig> arguments = method.getArguments();
if (CollectionUtils.isNotEmpty(arguments)) {
for (ArgumentConfig argument : arguments) {
// convert argument type
if (argument.getType() != null && argument.getType().length() > 0) {
Method[] methods = interfaceClass.getMethods();
// visit all methods
if (methods.length > 0) {
for (int i = 0; i < methods.length; i++) {
String methodName = methods[i].getName();
// target the method, and get its signature
if (methodName.equals(method.getName())) {
Class<?>[] argtypes = methods[i].getParameterTypes();
// one callback in the method
if (argument.getIndex() != -1) {
if (argtypes[argument.getIndex()].getName().equals(argument.getType())) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
} else {
// multiple callbacks in the method
for (int j = 0; j < argtypes.length; j++) {
Class<?> argclazz = argtypes[j];
if (argclazz.getName().equals(argument.getType())) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + j);
if (argument.getIndex() != -1 && argument.getIndex() != j) {
throw new IllegalArgumentException("Argument config error : the index attribute and type attribute not match :index :" + argument.getIndex() + ", type:" + argument.getType());
}
}
}
}
}
}
}
} else if (argument.getIndex() != -1) {
AbstractConfig.appendParameters(map, argument, method.getName() + "." + argument.getIndex());
} else {
throw new IllegalArgumentException("Argument config must set index or type attribute.eg: <dubbo:argument index='0' .../> or <dubbo:argument type=xxx .../>");
}
}
}
} // end of methods for
}
//3. 判断是否是GenericService
if (ProtocolUtils.isGeneric(generic)) {
map.put(GENERIC_KEY, generic);
map.put(METHODS_KEY, ANY_VALUE);
} else {
String revision = Version.getVersion(interfaceClass, version);
if (revision != null && revision.length() > 0) {
map.put(REVISION_KEY, revision);
}
String[] methods = Wrapper.getWrapper(interfaceClass).getMethodNames();
if (methods.length == 0) {
logger.warn("No method found in service interface " + interfaceClass.getName());
map.put(METHODS_KEY, ANY_VALUE);
} else {
map.put(METHODS_KEY, StringUtils.join(new HashSet<String>(Arrays.asList(methods)), ","));
}
}
// 4. 访问Token
if(ConfigUtils.isEmpty(token) && provider != null) {
token = provider.getToken();
}
if (!ConfigUtils.isEmpty(token)) {
if (ConfigUtils.isDefault(token)) {
map.put(TOKEN_KEY, UUID.randomUUID().toString());
} else {
map.put(TOKEN_KEY, token);
}
}
//init serviceMetadata attachments
serviceMetadata.getAttachments().putAll(map);
//5. 获取服务得host和port配置
String host = findConfigedHosts(protocolConfig, registryURLs, map);
Integer port = findConfigedPorts(protocolConfig, name, map);
URL url = new URL(name, host, port, getContextPath(protocolConfig).map(p -> p + "/" + path).orElse(path), map);
// 6.可支持自定义配置类
if (ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.hasExtension(url.getProtocol())) {
url = ExtensionLoader.getExtensionLoader(ConfiguratorFactory.class)
.getExtension(url.getProtocol()).getConfigurator(url).configure(url);
}
//7. 获取export的范围
String scope = url.getParameter(SCOPE_KEY);
// 8. 如果是'none'的话跳过
if (!SCOPE_NONE.equalsIgnoreCase(scope)) {
// 9. 如果scope != 'remote',则先在本地暴露服务
if (!SCOPE_REMOTE.equalsIgnoreCase(scope)) {
exportLocal(url);
}
// 10. 如果scope != 'local', 则暴露远程服务
if (!SCOPE_LOCAL.equalsIgnoreCase(scope)) {
if (CollectionUtils.isNotEmpty(registryURLs)) {
//11. 遍历注册中心
for (URL registryURL : registryURLs) {
//12. 如果设置的protocol是injvm,跳过
if (LOCAL_PROTOCOL.equalsIgnoreCase(url.getProtocol())) {
continue;
}
url = url.addParameterIfAbsent(DYNAMIC_KEY, registryURL.getParameter(DYNAMIC_KEY));
URL monitorUrl = ConfigValidationUtils.loadMonitor(this, registryURL);
if (monitorUrl != null) {
url = url.addParameterAndEncoded(MONITOR_KEY, monitorUrl.toFullString());
}
if (logger.isInfoEnabled()) {
if (url.getParameter(REGISTER_KEY, true)) {
logger.info("Register dubbo service " + interfaceClass.getName() + " url " + url + " to registry " + registryURL);
} else {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
}
// 13. 支持用户自定义proxy产生Invoker
String proxy = url.getParameter(PROXY_KEY);
if (StringUtils.isNotEmpty(proxy)) {
registryURL = registryURL.addParameter(PROXY_KEY, proxy);
}
//14. 通过Proxy获取Invoker
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, registryURL.addParameterAndEncoded(EXPORT_KEY, url.toFullString()));
//15. 将Invoker封装一层,wrapperInvoker 提供getServiceConfig()接口
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
//16. 调用Protocol的export方法
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
} else {
if (logger.isInfoEnabled()) {
logger.info("Export dubbo service " + interfaceClass.getName() + " to url " + url);
}
//17. 没有注册中心的情况,直接使用url生成Invoker
Invoker<?> invoker = PROXY_FACTORY.getInvoker(ref, (Class) interfaceClass, url);
DelegateProviderMetaDataInvoker wrapperInvoker = new DelegateProviderMetaDataInvoker(invoker, this);
Exporter<?> exporter = PROTOCOL.export(wrapperInvoker);
exporters.add(exporter);
}
/**
* @since 2.7.0
* ServiceData Store
*/
WritableMetadataService metadataService = WritableMetadataService.getExtension(url.getParameter(METADATA_KEY, DEFAULT_METADATA_STORAGE_TYPE));
if (metadataService != null) {
metadataService.publishServiceDefinition(url);
}
}
}
this.urls.add(url);
}
前两步和Consumer差不多,都是解析配置参数,然后添加到url中
第3步,GenericService这个之前解释过,所有服务默认都实现了这个接口,这里处理的是如果服务只实现了GenericService的情况
第4步,生成访问Token
第6步,用户可自定义一个ConfiguratorFactory实现,通过它来设置参数,这样就不需要所有参数都通过注解或者配置文件来写死了
第7步,处理scope配置,none表示不暴露服务,local表示仅在jvm中暴露服务,无法远程访问,remote表示仅可远程访问,不能采用injvm的方式调用。
第9步,从上一步scope的处理可以看出,除非用户显式配置,否则一定会将服务暴露到本地,比如Demo中的服务url是dubbo://172.21.141.241:20880/org.apache.dubbo.demo.DemoService?xxxx
,则会暴露一个injvm://127.0.0.1/org.apache.dubbo.demo.DemoService?xxxx
。这是为了一个服务内部也通过远程调用的方式在访问自己暴露的接口的时候,Dubbo直接会将这次访问变成进程内调用,exportLocal的逻辑下一篇再讲
第11步,如果配置了注册中心,则会使用注册中心的地址生成一个Invoker,同样是调用的Proxy获取到Invoker,跟Consumer的区别就是这里的Invoker封装的是对本地方法的调用,而Consumer端的Invoker封装的是远程调用
第16步,根据Protocol调用具体export()方法,默认就是调用的DubboProtocol的方法
总结
服务提供方暴露服务比消费方引用远程接口逻辑稍微要复杂一些,所以分成2篇文章来讲解,这一篇主要讲Spring在初始化的时候如何发现Dubbo服务并暴露,下一篇会分解代理及Invoker的生成。