? EventBus是Android平台上一个发布/订阅事件总线,使用EventBus可以方便的在不同的组件中进行消息通信,避免不同组件之间的耦合。EventBus或者类似的事件总线基本上是各个项目中的标配。本文主要基于3.2.0版本介绍EventBus的实现方式。
EventBus的基本流程
? 如上图所示,EventBus的核心架构是通过post()方法把Event交给EventBus,由EventBus根据事件的类型,分发给Subscriber。Subscriber即订阅者,指的是通过EventBus的register()方法在EventBus中注册的对象,可以是Activity、Fragment也可以是其它对象。
? EventBus的流程比较简单,主要分析几个问题。
- EventBus的注册流程
- EventBus分发事件的流程
- EventBus怎么切换线程
EventBus如何存储订阅关系
? 在开始之前,先介绍一下EventBus存储订阅关系的几个Map,了解几个Map存储的数据和作用后,更容易理解EventBus的流程。
- subscriptionsByEventType:
private final Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType;
变量声明如上所示,是一个保存Event类型和对应的订阅对象和方法的Map,key对应的是Event的Class对象,value是对应的Subscription的列表。subscriptionsByEventType的作用是根据Event的类型,拿到对应的订阅者列表及其方法,然后通过反射的形式进行调用。简单来说,subscriptionsByEventType保存的是某个Event有哪些订阅者。
final class Subscription {
final Object subscriber;
final SubscriberMethod subscriberMethod;
...
}
public class SubscriberMethod {
final Method method;
final ThreadMode threadMode;
final Class<?> eventType;
final int priority;
final boolean sticky;
/** Used for efficient comparison */
String methodString;
...
}
Subscription是表示订阅的实体类,从上面两个类的代码可以看出,Subscription用来保存订阅者以及对应的方法,包括方法的对象、线程、事件类型、优先级等等属性。
- typesBySubscriber:
private final Map<Object, List<Class<?>>> typesBySubscriber;
typesBySubscriber的key是Subscriber,value对应的是订阅的Event的列表。typesBySubscriber的作用是在注册成订阅者或者取消注册的时候,可以根据Subscriber拿到对应的Event列表,然后在subscriptionsByEventType中中方便的移除相应的订阅。与subscriptionsByEventType相反,typesBySubscriber保存的是某个订阅者,订阅了哪些Event。
EventBus的注册流程
? EventBus注册的方式很简单,通过register(this)和unregister(this)两个方法进行注册和反注册,通过注解的方式标识具体的方法及处理的线程等。
public void register(Object subscriber) {
Class<?> subscriberClass = subscriber.getClass();
List<SubscriberMethod> subscriberMethods = subscriberMethodFinder.findSubscriberMethods(subscriberClass);
synchronized (this) {
for (SubscriberMethod subscriberMethod : subscriberMethods) {
subscribe(subscriber, subscriberMethod);
}
}
}
register()方法比较好理解,就是找到subscriber的订阅的方法,然后挨个进行注册。问题就在于如何找到订阅方法。
遍历订阅者的方法
List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
if (subscriberMethods != null) {
return subscriberMethods;
}
if (ignoreGeneratedIndex) {
subscriberMethods = findUsingReflection(subscriberClass);
} else {
subscriberMethods = findUsingInfo(subscriberClass);
}
if (subscriberMethods.isEmpty()) {
throw new EventBusException("Subscriber " + subscriberClass
+ " and its super classes have no public methods with the @Subscribe annotation");
} else {
METHOD_CACHE.put(subscriberClass, subscriberMethods);
return subscriberMethods;
}
}
在SubscriberMethodFinder中维护了缓存METHOD_CACHE,保存订阅者对应的订阅方法,提高再次订阅的效率。然后查找订阅者的方法。EventBus3.0中新增了索引机制,如果在gradle中配置了索引,则会在编译的时候就遍历订阅者的方法,降低EventBus注册占用的时间,提升EventBus的性能。
private List<SubscriberMethod> findUsingInfo(Class<?> subscriberClass) {
FindState findState = prepareFindState();
findState.initForSubscriber(subscriberClass);
while (findState.clazz != null) {
findState.subscriberInfo = getSubscriberInfo(findState);
if (findState.subscriberInfo != null) {
SubscriberMethod[] array = findState.subscriberInfo.getSubscriberMethods();
for (SubscriberMethod subscriberMethod : array) {
if (findState.checkAdd(subscriberMethod.method, subscriberMethod.eventType)) {
findState.subscriberMethods.add(subscriberMethod);
}
}
} else {
findUsingReflectionInSingleClass(findState);
}
findState.moveToSuperclass();
}
return getMethodsAndRelease(findState);
}
通过内部类FindState来遍历方法,getSubscriberInfo(findState)方法通过SubscriberInfoIndex获取订阅者信息,如果配置了索引,就会获取到subscriberInfo,调用subscriberInfo.getSubscriberMethods()可以获取到订阅的方法数组。如果没有配置索引功能则返回null,会通过反射的方式去遍历方法。最后从findState获取订阅方法的List返回,并重置findState。
private void findUsingReflectionInSingleClass(FindState findState) {
Method[] methods;
try {
// This is faster than getMethods, especially when subscribers are fat classes like Activities
methods = findState.clazz.getDeclaredMethods();
} catch (Throwable th) {
...
}
for (Method method : methods) {
int modifiers = method.getModifiers();
if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
Class<?>[] parameterTypes = method.getParameterTypes();
if (parameterTypes.length == 1) {
Subscribe subscribeAnnotation = method.getAnnotation(Subscribe.class);
if (subscribeAnnotation != null) {
Class<?> eventType = parameterTypes[0];
if (findState.checkAdd(method, eventType)) {
ThreadMode threadMode = subscribeAnnotation.threadMode();
findState.subscriberMethods.add(new SubscriberMethod(method, eventType, threadMode,
subscribeAnnotation.priority(), subscribeAnnotation.sticky()));
}
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException("@Subscribe method " + methodName +
"must have exactly 1 parameter but has " + parameterTypes.length);
}
} else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
String methodName = method.getDeclaringClass().getName() + "." + method.getName();
throw new EventBusException(methodName +
" is a illegal @Subscribe method: must be public, non-static, and non-abstract");
}
}
}
这里通过getDeclaredMethods拿到类的方法,根据方法的类型、参数、注解依次进行筛选,找到符合条件的方法,将方法的相关信息添加到findState中。
遍历完订阅方法之后,对订阅方法进行注册。
注册订阅方法
private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
Class<?> eventType = subscriberMethod.eventType;
Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
if (subscriptions == null) {
subscriptions = new CopyOnWriteArrayList<>();
subscriptionsByEventType.put(eventType, subscriptions);
} else {
if (subscriptions.contains(newSubscription)) {
throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
+ eventType);
}
}
int size = subscriptions.size();
for (int i = 0; i <= size; i++) {
if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
subscriptions.add(i, newSubscription);
break;
}
}
List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
if (subscribedEvents == null) {
subscribedEvents = new ArrayList<>();
typesBySubscriber.put(subscriber, subscribedEvents);
}
subscribedEvents.add(eventType);
if (subscriberMethod.sticky) {
if (eventInheritance) {
// Existing sticky events of all subclasses of eventType have to be considered.
// Note: Iterating over all events may be inefficient with lots of sticky events,
// thus data structure should be changed to allow a more efficient lookup
// (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
for (Map.Entry<Class<?>, Object> entry : entries) {
Class<?> candidateEventType = entry.getKey();
if (eventType.isAssignableFrom(candidateEventType)) {
Object stickyEvent = entry.getValue();
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
} else {
Object stickyEvent = stickyEvents.get(eventType);
checkPostStickyEventToSubscription(newSubscription, stickyEvent);
}
}
}
订阅的方法中根据subscriber和subscriberMethod构造了一个Subscription对象,然后根据优先级保存到subscriptionsByEventType中,在发送事件的时候取出依次分发。把subscriber和事件类型添加到typesBySubscriber中,方便取消注册。如果是方法设置粘性事件属性的话,就立刻分发现存的对应的粘性事件。
分发事件的流程
public void post(Object event) {
PostingThreadState postingState = currentPostingThreadState.get();
List<Object> eventQueue = postingState.eventQueue;
eventQueue.add(event);
if (!postingState.isPosting) {
postingState.isMainThread = isMainThread();
postingState.isPosting = true;
if (postingState.canceled) {
throw new EventBusException("Internal error. Abort state was not reset");
}
try {
while (!eventQueue.isEmpty()) {
postSingleEvent(eventQueue.remove(0), postingState);
}
} finally {
postingState.isPosting = false;
postingState.isMainThread = false;
}
}
}
把Event放进队列中,然后调用postSingleEvent()方法逐个分发。
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
Class<?> eventClass = event.getClass();
boolean subscriptionFound = false;
if (eventInheritance) {
List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
int countTypes = eventTypes.size();
for (int h = 0; h < countTypes; h++) {
Class<?> clazz = eventTypes.get(h);
subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
}
} else {
subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
}
...
}
调用lookupAllEventTypes()方法遍历Event的所有接口和父类,然后再调用postSingleEventForEventType()分发。通过subscriptionsByEventType拿到之前保存的事件的订阅关系列表,然后再调用postToSubscription()方法分发。
private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
switch (subscription.subscriberMethod.threadMode) {
case POSTING:
invokeSubscriber(subscription, event);
break;
case MAIN:
if (isMainThread) {
invokeSubscriber(subscription, event);
} else {
mainThreadPoster.enqueue(subscription, event);
}
break;
case MAIN_ORDERED:
if (mainThreadPoster != null) {
mainThreadPoster.enqueue(subscription, event);
} else {
// temporary: technically not correct as poster not decoupled from subscriber
invokeSubscriber(subscription, event);
}
break;
case BACKGROUND:
if (isMainThread) {
backgroundPoster.enqueue(subscription, event);
} else {
invokeSubscriber(subscription, event);
}
break;
case ASYNC:
asyncPoster.enqueue(subscription, event);
break;
default:
throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
}
}
根据ThreadMode属性进行线程的切换,比如常用的MAIN模式,如果post的线程是主线程,就直接通过反射的方式调用订阅者的订阅方法。如果是在子线程中调用的post方法,就把这个订阅事件加入到mainThreadPoster的队列中等待处理,mainThreadPoster通过Handler的方式进行主线程切换。
EventBus怎么切换线程
- 切换到主线程
public class HandlerPoster extends Handler implements Poster {
private final PendingPostQueue queue;
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
synchronized (this) {
queue.enqueue(pendingPost);
if (!handlerActive) {
handlerActive = true;
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
}
}
}
@Override
public void handleMessage(Message msg) {
boolean rescheduled = false;
try {
...
while (true) {
PendingPost pendingPost = queue.poll();
if (pendingPost == null) {
synchronized (this) {
// Check again, this time in synchronized
pendingPost = queue.poll();
if (pendingPost == null) {
handlerActive = false;
return;
}
}
}
eventBus.invokeSubscriber(pendingPost);
long timeInMethod = SystemClock.uptimeMillis() - started;
if (timeInMethod >= maxMillisInsideHandleMessage) {
if (!sendMessage(obtainMessage())) {
throw new EventBusException("Could not send handler message");
}
rescheduled = true;
return;
}
}
切换到主线程由HandlerPoster类实现,在enqueue()方法之后,调用sendMessage触发handleMessage方法,在handleMessage中反射调用订阅方法。为了避免主线程占用时间过长,超过10ms会重新通过sendMessage方法重新安排剩下的反射调用。
- 切换到子线程
class AsyncPoster implements Runnable, Poster {
private final PendingPostQueue queue;
private final EventBus eventBus;
AsyncPoster(EventBus eventBus) {
this.eventBus = eventBus;
queue = new PendingPostQueue();
}
public void enqueue(Subscription subscription, Object event) {
PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
queue.enqueue(pendingPost);
eventBus.getExecutorService().execute(this);
}
@Override
public void run() {
PendingPost pendingPost = queue.poll();
if(pendingPost == null) {
throw new IllegalStateException("No pending post available");
}
eventBus.invokeSubscriber(pendingPost);
}
}
切换到子线程通过AsyncPoster和BackGroudPoster实现,原理是一样的,继承Runnable类,通过线程池执行。在run方法中反射调用订阅方法。