首页 > 代码库 > EventBus3.0源码分析

EventBus3.0源码分析

简述:

    在项目中,我们大多数开发者可能都使用过EventBus,即使没有使用过但我可以确定Android开发者也听说过这个牛X的库,从诞生到目前EventBus已经更新到3.X版本,可见生命力极强呀。那么这篇博文就从EventBus3.0源码的角度分析一下其内部处理流程。

使用流程:

    注册:

EventBus.getDefault().register(obj)
    订阅(消息接收):

@Subscribe
public void receive(Object event){
}
    发布消息:

EventBus.getDefault().post(event)
    注销:

EventBus.getDefault().unregister(obj)

源码分析:

注册:

EventBus.getDefault().post(event)

   这段代码做了两件事情:① EventBus.getDefault() 创建EventBus对象;② register(this) 方法为this该类对象注册EventBus。 那这两个方法究竟在EventBus中究竟做了哪些工作呢?我们打开EventBus的源码看一下:

   1、EventBus.getDefault()
    源码如下:
public static EventBus getDefault() {
    if (defaultInstance == null) {
        synchronized (EventBus.class) {
            if (defaultInstance == null) {
                defaultInstance = new EventBus();
            }
        }
    }
    return defaultInstance;
}
    看到了吧,EventBus采用单例模式创建EventBus对象,接下来它在构造方法中又做了什么事情呢?
public EventBus() {
    this(DEFAULT_BUILDER);
}
    在构造方法中其调用了有参构造方法:EventBus(EventBusBuilder builder ),我们再跟进去看一看:

EventBus(EventBusBuilder builder) {
    subscriptionsByEventType = new HashMap<>();
    typesBySubscriber = new HashMap<>();
    stickyEvents = new ConcurrentHashMap<>();

    mainThreadPoster = new HandlerPoster(this, Looper.getMainLooper(), 10);
    backgroundPoster = new BackgroundPoster(this);
    asyncPoster = new AsyncPoster(this);

    indexCount = builder.subscriberInfoIndexes != null ? builder.subscriberInfoIndexes.size() : 0;

    //默认情况下参数为(null,false,false)
    subscriberMethodFinder = new SubscriberMethodFinder(builder.subscriberInfoIndexes,
            builder.strictMethodVerification, builder.ignoreGeneratedIndex);

    logSubscriberExceptions = builder.logSubscriberExceptions;
    logNoSubscriberMessages = builder.logNoSubscriberMessages;
    sendSubscriberExceptionEvent = builder.sendSubscriberExceptionEvent;
    sendNoSubscriberEvent = builder.sendNoSubscriberEvent;
    throwSubscriberException = builder.throwSubscriberException;
    eventInheritance = builder.eventInheritance;
    executorService = builder.executorService;
}

    首先,初始化了3个Map,这3个Map有什么用呢?我们再来一一详细说一下:
    ① Map<Class<?>, CopyOnWriteArrayList<Subscription>> subscriptionsByEventType:key:事件类型(如:String类型或者自定义的事件类型),value:该事件的订阅者的list集合。
    ② Map<Object, List<Class<?>>> typesBySubscriber:key:事件的订阅者(如XXXActivity),value:事件类型的运行时类的list集合。
    ③ Map<Class<?>, Object> stickyEvents:
    其次,初始化3个消息发送器如下:
    mainThreadPoster :该类继承自Handler,而且在EventBus中mainThreadPoster属于主线程Handler,这是因为mainThreadPoster就是为处理“订阅者在主线程而接收者在子线程 ”而设计的,所以子线程向主线程发送消息必须使用主线程的Handler。
    backgroundPoster:该类继承自Runnable,重写了run()方法。在run()方法中将子线程中的消息通过EventBus发送到主线程。所以这个消息发送器作用就是处理“订阅者在子线程接收者在主线程”这样的问题。
    asyncPoster:该类继承自Runnable,也重写了run()方法,不过就像名字一样“异步”,也就是说不管订阅者是不是在主线程,接收者都会另外开启一个线程接收消息。
    然后,一个重要的初始化对象为subscriberMethodFinder,这个对象利用反射的方法查找每一个接收消息者的方法(也即是添加了“@Subscribe ”注解的方法)。
    最后就是一些对EventBusBuilder的一些配置信息。 

2、注册register(Object subscriber )
    EventBus的初始化工作已经完毕,我们继续看一下EventBus是怎么进行注册的,在注册过程中又搞了哪些事情?
public void register(Object subscriber) {
    Class<?> subscriberClass = subscriber.getClass();
    List<SubscriberMethod> subscriberMethods = subscriberMethodFinder
            .findSubscriberMethods(subscriberClass);
    synchronized (this) {
        for (SubscriberMethod subscriberMethod : subscriberMethods) {
            subscribe(subscriber, subscriberMethod);
        }
    }
}
     在该方法中首先取得注册者的运行时类对象,拿到运行时类对象后通过注册者注册方法查找器SubscriberMethodFinder利用反射的方法找到注册者类中所有的接收信息的方法,也即是所有添加了注解“Subscribe ”的方法。最后进行通过方法subscribe(subscriber, subscriberMethod)为每一个接收消息的方法进行注册。流程大致就是这样的,首先 我们先看一下findSubscriberMethods这个方法:

 /**
     * 方法描述:获取该运行时类的所有@Subscribe注解的所有方法
     *
     * @param subscriberClass @Subscribe注解所属类的运行时类对象
     * @return 注册EventBus类中@Subscribe注解的所有方法的List集合
     */
    List<SubscriberMethod> findSubscriberMethods(Class<?> subscriberClass) {
        //先从缓存中查找是否存在订阅者的方法
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods != null) {
            return subscriberMethods;
        }

        if (ignoreGeneratedIndex) {
            //使用反射方法拿到订阅者中的订阅方法
            subscriberMethods = findUsingReflection(subscriberClass);
        } else {
            //使用apt处理器拿到订阅者中的订阅方法
            subscriberMethods = findUsingInfo(subscriberClass);
        }
        if (subscriberMethods.isEmpty()) {
            throw new EventBusException("Subscriber " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        } else {
            //将查到的方法放到缓存中,以便下次使用
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
            return subscriberMethods;
        }
    }
    这个方法很重要,在初次使用EventBus3.0的时候也容易出错的一个点:就是在订阅事件的方法上没有添加@Subscribe 注解,所以会碰到下面这个异常:

Caused by: org.greenrobot.eventbus.EventBusException: Subscriber class XXX and its super classes have no public methods with the @Subscribe annotation
    说到这我们还是没有最终看到EventBus是怎么进行注册的,OK,回过头来我们继续看注册

private void subscribe(Object subscriber, SubscriberMethod subscriberMethod) {
    Class<?> eventType = subscriberMethod.eventType;
    Log.e(TAG, eventType.getSimpleName());
    //将订阅类subscriber转化为EventBus新的订阅类Subscription
    Subscription newSubscription = new Subscription(subscriber, subscriberMethod);
    CopyOnWriteArrayList<Subscription> subscriptions = subscriptionsByEventType.get(eventType);
    if (subscriptions == null) {
        subscriptions = new CopyOnWriteArrayList<>();
        subscriptionsByEventType.put(eventType, subscriptions);
    } else {
        if (subscriptions.contains(newSubscription)) {
            throw new EventBusException("Subscriber " + subscriber.getClass() + " already registered to event "
                    + eventType);
        }
    }

    /**
    * 方法描述:对CopyOnWriteArrayList中的Subscription根据优先级的高低重新进行排序
    */
    int size = subscriptions.size();
    for (int i = 0; i <= size; i++) {
        if (i == size || subscriberMethod.priority > subscriptions.get(i).subscriberMethod.priority) {
            subscriptions.add(i, newSubscription);
            break;
        }
    }
    /**
    * 方法描述:将开发者注册EventBus的类的运行时类添加到subscribedEvents中,并且把该运行时类添加到
    * typesBySubscriber中
    */
    List<Class<?>> subscribedEvents = typesBySubscriber.get(subscriber);
    Log.e(TAG, "typesBySubscriber的");
    if (subscribedEvents == null) {
        subscribedEvents = new ArrayList<>();
        typesBySubscriber.put(subscriber, subscribedEvents);
    }
    subscribedEvents.add(eventType);

    if (subscriberMethod.sticky) {
        if (eventInheritance) {
            // Existing sticky events of all subclasses of eventType have to be considered.
            // Note: Iterating over all events may be inefficient with lots of sticky events,
            // thus data structure should be changed to allow a more efficient lookup
            // (e.g. an additional map storing sub classes of super classes: Class -> List<Class>).
            Set<Map.Entry<Class<?>, Object>> entries = stickyEvents.entrySet();
            for (Map.Entry<Class<?>, Object> entry : entries) {
                Class<?> candidateEventType = entry.getKey();
                if (eventType.isAssignableFrom(candidateEventType)) {
                    Object stickyEvent = entry.getValue();
                    checkPostStickyEventToSubscription(newSubscription, stickyEvent);
                }
            }
        } else {
            Object stickyEvent = stickyEvents.get(eventType);
            checkPostStickyEventToSubscription(newSubscription, stickyEvent);
        }
    }
}
    上面的这段代码虽然很多,但主要做了几件事情:① 将注册的订阅者封装为新的Subscription类 ②将订阅者存储到Map集合subscriptionsByEventType当中 ③对消息事件接收者根据优先级进行重排序 ④添加粘性消息事件

发布消息:

    我们已经分析完了EventBus的注册过程,接下来我们再来分析一下EventBus的事件发送过程。

EventBus.getDefault().post(event);
    那么这段代码是如何实现消息的发送呢?继续源码看一下:

public void post(Object event) {
    PostingThreadState postingState = currentPostingThreadState.get();
    List<Object> eventQueue = postingState.eventQueue;
    eventQueue.add(event);//将该事件添加到事件队列当中
    //事件没有分发则开始分发
    if (!postingState.isPosting) {
        //判断消息接收者是否在主线程 
        postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();  
        postingState.isPosting = true;
        if (postingState.canceled) {
            throw new EventBusException("Internal error. Abort state was not reset");
        }
        try {
            //循环发送消息
            while (!eventQueue.isEmpty()) {
                postSingleEvent(eventQueue.remove(0), postingState);
            }
        } finally {
            postingState.isPosting = false;
            postingState.isMainThread = false;
        }
    }
}
    从上面的代码中可以得知,待发送的消息首先存储到一个消息list集合当中,然后再不断的循环发送消息。发送消息时利用的方法是postSingleEvent(Object event, PostingThreadState postingState ),OK,我们继续跟进:
private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
    Class<?> eventClass = event.getClass();
    boolean subscriptionFound = false;
    //默认情况下Event事件允许继承,即默认情况下eventInheritance==true
    if (eventInheritance) {
        //查找event事件及event子类事件
        List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
        int countTypes = eventTypes.size();
        for (int h = 0; h < countTypes; h++) {
            Class<?> clazz = eventTypes.get(h);
            subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
        }
    } else {
        subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
    }
    if (!subscriptionFound) {
      ...
    }
}
   在这个方法中并没有真正的看到消息的分发,而是查找了待分发事件消息及其子类或者是待分发消息接口及其子类的所有事件(默认情况下我们定义的消息事件是允许继承的。 我们在项目中起初可能考虑的不是很全面,再到后来不可预料的需求到来时我们可能会继续改事件的一种情况,看到这不得不说EventBus真心考虑周全呀)。然后调用postSingleEventForEventType(event, postingState, eventClass)方法查找该事件及其子类事件的订阅者,如果没有找到就发送空消息并打印日志。好吧,很失望,到现在依然没有看到对消息事件进行分发。那我们继续跟进:postSingleEventForEventType(event, postingState, eventClass);

private boolean postSingleEventForEventType(Object event, PostingThreadState postingState,
                                            Class<?> eventClass) {
    CopyOnWriteArrayList<Subscription> subscriptions;
    synchronized (this) {
        //从Map中取出所有订阅了eventClass事件的所有订阅者
        subscriptions = subscriptionsByEventType.get(eventClass);
    }
    //如果该事件的订阅者存在则向每一个订阅者发布消息事件
    if (subscriptions != null && !subscriptions.isEmpty()) {
        for (Subscription subscription : subscriptions) {
            postingState.event = event;
            postingState.subscription = subscription;
            boolean aborted = false;
            try {
                postToSubscription(subscription, event, postingState.isMainThread);
                aborted = postingState.canceled;
            } finally {
                postingState.event = null;
                postingState.subscription = null;
                postingState.canceled = false;
            }
            if (aborted) {
                break;
            }
        }
        return true;
    }
    return false;
}
    好吧,小眼一瞄仍然没有对消息进行分发,而是查找事件的所有订阅者然后对所有订阅者进行了一层封装,封装成PostingThreadState。那我们还是继续吧,我们跟进postToSubscription(subscription, event, postingState.isMainThread)这个方法:

private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
    switch (subscription.subscriberMethod.threadMode) {
        case POSTING:
            invokeSubscriber(subscription, event);
            break;
        case MAIN:
            if (isMainThread) {
                invokeSubscriber(subscription, event);
            } else {
                mainThreadPoster.enqueue(subscription, event);
            }
            break;
        case BACKGROUND:
            if (isMainThread) {
                backgroundPoster.enqueue(subscription, event);
            } else {
                invokeSubscriber(subscription, event);
            }
            break;
        case ASYNC:
            asyncPoster.enqueue(subscription, event);
            break;
        default:
            throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
    }
}

    看到这是不是有一种“复行数十步,豁然开朗”的感觉。是的,在这个方法中我们终于看到了对消息事件进行4中不同情况下的分发了。根据消息接收的threadMode分别进行了不同的处理:
    POSTING:EventBus默认情况下的threadMode类型,这里意思就是如果消息发布和消息接收在同一线程情况下就直接调用invokeSubscriber(subscription, event)对消息进行发送。这种情况下事件传递是同步完成的,事件传递完成时,所有的订阅者将已经被调用一次了。这个ThreadMode意味着最小的开销,因为它完全避免了线程的切换。
    MAIN:消息接收在主线程中进行(此种情况适合进行UI操作),如果消息发布也在主线程就直接调用invokeSubscriber(subscription, event)对消息进行发送(这种情况是POSTING的情况),如果消息发布不在主线程中进行,那么调用mainThreadPoster.enqueue(subscription, event)进行处理。他是怎么处理的呢?我们跟进去瞧瞧:
final class HandlerPoster extends Handler {

    private final PendingPostQueue queue;
    private final int maxMillisInsideHandleMessage;
    private final EventBus eventBus;
    private boolean handlerActive;

    //默认情况下EventBus创建HandlerPoster的Looper为MainLooper,最大maxMillisInsideHandleMessage==10ms
    HandlerPoster(EventBus eventBus, Looper looper, int maxMillisInsideHandleMessage) {
        super(looper);
        this.eventBus = eventBus;
        this.maxMillisInsideHandleMessage = maxMillisInsideHandleMessage;
        queue = new PendingPostQueue();
    }

    /**
    * 方法描述:将订阅者与消息实体之间的映射存到队列PendingPostQueue当中
    *
    * @param subscription 订阅者
    * @param event        消息事件
    */
    void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

    @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                //同步取出消息队列
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                //消息发送超时
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }
}
    为了说明问题,我们整个类贴出来:由于是主线程向子线程发送消息所以Looper采用的是主线程Looper,Handler也就是主线程Handler,其内部维护了一个PendingPost的对象池,这样做也是为了提高内存利用率,这也不是重点,我们直接看重点,在enqueue(Subscription subscription, Object event)方法中利用HandlerPoster 发送空消息,HandlerPoster也重写了handleMessage方法,在handleMessage方法中又调用eventBus.invokeSubscriber(pendingPost)进行消息发送,我们跟进去之后发现最终还是调用了invokeSubscriber(subscription, event)对消息进行发送。
     BACKGROUND:这种情况是消息接收在子线程(此种模式下适合在接收者方法中做IO等耗时操作)。那么如果消息发布也在某个子线程中进行的就直接调用invokeSubscriber(subscription, event)对消息进行发送,如果消息发布在主线程当中应该尽可能快的将消息发送出去以免造成主线程阻塞,所以这时候就交给backgroundPoster去处理。它是怎么处理的呢?我们进去看一看:
final class BackgroundPoster implements Runnable {

    ....
    private volatile boolean executorRunning;

    BackgroundPoster(EventBus eventBus) {
        this.eventBus = eventBus;
        queue = new PendingPostQueue();
    }

    public void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
                ...
                eventBus.getExecutorService().execute(this);
            }
        }
    }

    @Override
    public void run() {
        try {
             while (true) {
                    ...
             eventBus.invokeSubscriber(pendingPost);
            }
        } finally {
            executorRunning = false;
        }
    }
}
     backgroundPoster对Runnable进行了重写,而且和HandlerPoster一样也采用了对象池提高效率,当然重点是其开启了线程池处理消息的发送,这也是避免阻塞主线程的举措。当然其最终还是调用了invokeSubscriber()-----》invokeSubscriber(subscription, event)方法。
    ASYNC:这种情况是消息接收在子线程(如果消息发布在子线程中进行,那么该子线程既不同于消息发布的子线程,又不在主线程,而是接收消息是一个独立于主线程又不同于消息发布的子线程)。由于在这种模式下每一个新添加的任务都会在线程池中开辟一个新线程执行,所以并发量更高效。而且最终还是会调用invokeSubscriber(subscription, event)方法对消息进行分发。
    既然4种模式下均是调用了invokeSubscriber(subscription, event)方法,那我们最后再看一下这个方法:
void invokeSubscriber(Subscription subscription, Object event) {
    try {
        subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
    } catch (InvocationTargetException e) {
        handleSubscriberException(subscription, event, e.getCause());
    } catch (IllegalAccessException e) {
        throw new IllegalStateException("Unexpected exception", e);
    }
}
   看到了吧,这个方法中就是利用反射将消息发送给每一个消息的订阅者。到此我们就完整的看完了EventBus的工作流程及主要代码的分析过程。真心不容易呀,已经被累跪啦!

EventBus3.0源码分析