文档章节

EventBus源码学习笔记(二)

小灰灰Blog
 小灰灰Blog
发布于 2017/04/08 21:02
字数 2145
阅读 41
收藏 0

EventBus深入学习二

开始研究源码的设计思路,从Listener注册出发,EventBus 如何维护监听者信息,到Publisher发送消息,消息以怎样的渠道分发给所有的Listener, 顺序如何保证,传递性如何保证,出现异常如何处理,找不到监听者怎么处理等等

EventBus

这个类相当于一个中转站,Publisher 调用它的 post(Object) 来推送事件;然后将事件一次推送给注册的Listener

1. 注册关系的维护

在初始化s时, EventBus对象会维护一个 private final SubscriberRegistry subscribers = new SubscriberRegistry(this); 实例, 这个就是维护订阅关系的核心类

注册方法如下

 /**
   * Registers all subscriber methods on {@code object} to receive events.
   *
   * @param object  object whose subscriber methods should be registered.
   */
  public void register(Object object) {
    subscribers.register(object);
  }

接着我们看下这个类的具体实现

SubscriberRegistry.java

/**
   * All registered subscribers, indexed by event type.
   *
   * <p>The {@link CopyOnWriteArraySet} values make it easy and relatively lightweight to get an
   * immutable snapshot of all current subscribers to an event without any locking.
   */
  private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
      Maps.newConcurrentMap();
      

  /**
   * Registers all subscriber methods on the given listener object.
   */
  void register(Object listener) {
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
        eventSubscribers = MoreObjects.firstNonNull(
            subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

subscribers : - 对象初始化时创建 - 维护的是EventType -> Listener的映射关系,value为一个集合,说明一个事件可以推送给多个Listener - 监听者,可以有可以监听多个不同类型的事件

注册流程: - 根据注册的对象,将其中所有的回调方法都捞出来 - 将上步的结果塞入 subscribers 集合中; key为 Listener的类名

注册流程详解

注册目的就是发布消息后, EventBus 可以将这个Event传递” Listener(即订阅方)

为了实现上面的目的,如果要我们自己实现,会怎么做?

  • 将类中,所有包含@Subscribe 注解的方法捞出来
  • 方法的第一个参数就是 Event, 因为注册的目的是为了实现回调, 所以封装一个类,包含这个Listener对象的引用 + 要执行的方法

上面注册的实际实现和上面的步骤差不多

  1. 获取所有包含注解的方法

    • 实际的代码如下

      private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
      Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
      Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
      for (Class<?> supertype : supertypes) {
        for (Method method : supertype.getDeclaredMethods()) {
          if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
            // TODO(cgdecker): Should check for a generic parameter type and error out
            Class<?>[] parameterTypes = method.getParameterTypes();
            checkArgument(parameterTypes.length == 1,
                "Method %s has @Subscribe annotation but has %s parameters."
                    + "Subscriber methods must have exactly 1 parameter.",
                method, parameterTypes.length);
      
            MethodIdentifier ident = new MethodIdentifier(method);
            if (!identifiers.containsKey(ident)) {
              identifiers.put(ident, method);
            }
          }
        }
      }
      return ImmutableList.copyOf(identifiers.values());
      }
      
    • 看下上面的实现,非常有意思的是,不仅将改对象中的所有@Subscribe注解的方法捞出来,连父类中的也不放过;就是这个TypeToken.of(clazz).getTypes().rawTypes();

    • 从上面的限定,也可以看出,对于回调方法是有要求的: 有且只能有一个参数, checkArgument(parameterTypes.length == 1,xxx)

    • 过滤重载的回调方法(这点比较有意思,搞了个Map, key由方法名+方法参数确认(MethodIdentifier的equals方法重写了), 而不是直接用集合的contains方法, 请注意其中的区别)

    • method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic() 这个判断条件的后一个可以参考http://www.xue163.com/2122/1/21224778.html

  2. 将上面的方法转换为Map, 看这个 Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    • key为事件类型Event.class; value为一个包含 Listener, Method, EventBus 实例的对象 Subscriber
  3. 将上面的map塞入subscribers 集合

    • subscribers 集合包含的是所有的 (事件 -> 监听者回调集合)Event -> Set<Listener.Method>
    • 简单的迭代即可实现塞数据了
    • 根据 subscribers 的数据结构,其实可以看到,一个Listener对象,如果注册多次,最终的效果其实是一样的,这个监听者,并不会被调用多次; 如果一个Lisntener类,有多个对象,则注册后,每个对象的回调都会执行到;
      • 实现原因: Set 集合存储 Subscriber对象
      • Subscriber 的 hashcode & equals 方法没有重写

到此, 注册完毕;注销的方法和上面差不多,唯一的区别是最后一个是向 subscribers 塞数据,一个是从其中删数据而已

题外话

   如果我们想获取工程中所有包含某个注解的类可以怎么办?
   
   - 如果是用spring的话, 可以考虑  `ApplicationContext.getBeansWithAnnotation()`
   
   获取工程中,所有包含某个注解的方法,除了上面的主动注册,有什么其他的方法?

###2. 推送事件

发布方,调用EventBus.post(Object) 方法实现消息的推送

预测

正式开始之前,我们可以先预测一下,当发布方调用了这个方法之后,会执行那些action

  1. 根据事件类型,可以从注册关系表subscribers中获取出所有的监听者,以及对应的回调方法, 放在一个集合中
  2. 因为监听的先后顺序可能有要求,那么将上面的集合进行排序
  3. 循环遍历上面,依次执行

上面是正向的操作流程,接着一些异常情况和边界也需要考虑下

  1. 如果一个事件找不到订阅者如何处理
  2. 如果某个监听者执行完毕之后,希望其之后的监听者都不能接受这个事件(类似web应用中的拦截器,如果被拦截了,如果被拦截了,后面的拦截器也没必要继续执行)
  3. 某个监听者很遗憾的抛出了个异常,会不会整调链路都挂掉

深入解读

带着上面的臆测,来实际看下EventBus自己是怎么玩的

 /**
   * 将事件推送给所有的监听者,不管监听者是否正常处理,都是正确返回
   * Posts an event to all registered subscribers.  This method will return
   * successfully after the event has been posted to all subscribers, and
   * regardless of any exceptions thrown by subscribers.
   *
   * 如果一个事件没有监听者,且该事件不是 DeadEvent, 则转为 DeadEvent并重新推送
   * <p>If no subscribers have been subscribed for {@code event}'s class, and
   * {@code event} is not already a {@link DeadEvent}, it will be wrapped in a
   * DeadEvent and reposted.
   *
   * @param event  event to post.
   */
  public void post(Object event) {
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }

上面的解释比较清楚, 基本上核心的推送就是 dispatcher.dispatch(event, eventSubscribers);

实际的使用的是 PerThreadQueuedDispatcher 推送代码如下,逻辑比较清晰,将Event塞入队列, 然后将队列中的所有消息依次推送给所有的订阅者

  • 可以参考下面的设计,优雅的避免重复推送的问题
  • 考虑为什么要先写入队列,然后再依次推送队列中的事件
/**
* Per-thread queue of events to dispatch.
*/
private final ThreadLocal<Queue<Event>> queue =
   new ThreadLocal<Queue<Event>>() {
     @Override
     protected Queue<Event> initialValue() {
       return Queues.newArrayDeque();
     }
   };

/**
* Per-thread dispatch state, used to avoid reentrant event dispatching.
*/
private final ThreadLocal<Boolean> dispatching =
   new ThreadLocal<Boolean>() {
     @Override
     protected Boolean initialValue() {
       return false;
     }
   };

@Override
void dispatch(Object event, Iterator<Subscriber> subscribers) {
 checkNotNull(event);
 checkNotNull(subscribers);
 Queue<Event> queueForThread = queue.get();
 queueForThread.offer(new Event(event, subscribers));

 if (!dispatching.get()) {
   dispatching.set(true);
   try {
     Event nextEvent;
     while ((nextEvent = queueForThread.poll()) != null) {
       while (nextEvent.subscribers.hasNext()) {
         nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
       }
     }
   } finally {
     dispatching.remove();
     queue.remove();
   }
 }
}

最终真正执行推送Event的是这个方法 com.google.common.eventbus.Subscriber#dispatchEvent

/**
   * Dispatches {@code event} to this subscriber using the proper executor.
   */
  final void dispatchEvent(final Object event) {
    executor.execute(new Runnable() {
      @Override
      public void run() {
        try {
          invokeSubscriberMethod(event);
        } catch (InvocationTargetException e) {
          bus.handleSubscriberException(e.getCause(), context(event));
        }
      }
    });
  }
  
  /**
   * Invokes the subscriber method. This method can be overridden to make the invocation
   * synchronized.
   */
  @VisibleForTesting
  void invokeSubscriberMethod(Object event) throws InvocationTargetException {
    try {
      method.invoke(target, checkNotNull(event));
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
  }

3. 图解

上面从源码的角度,对整个流程顺了一遍,下面的图对几个主要的类结构进行了抽取,并对上面的几个方法进行了简要的说明

图一, 将上面说明的几个类属性 + 方法进行了说明

输入图片说明

图二, 对逻辑进行列举

输入图片说明

输入图片说明

4. 新技能

1.根据class,获取所有超类集合 (EventBus的实际使用中,Event的超类集合都塞入了缓存,加快查询速度)

TypeToken.of(concreteClass).getTypes().rawTypes());

2.获取类中标有注解的方法

private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz, Class annotationClz) {
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
    for (Class<?> supertype : supertypes) {
      for (Method method : supertype.getDeclaredMethods()) {
        if (method.isAnnotationPresent(annotationClz) && !method.isSynthetic()) {
          // TODO(cgdecker): Should check for a generic parameter type and error out
          Class<?>[] parameterTypes = method.getParameterTypes();
          
          MethodIdentifier ident = new MethodIdentifier(method);
          if (!identifiers.containsKey(ident)) {
            identifiers.put(ident, method);
          }
        }
      }
    }
    return ImmutableList.copyOf(identifiers.values());
  }
  
  private static final class MethodIdentifier {

    private final String name;
    private final List<Class<?>> parameterTypes;

    MethodIdentifier(Method method) {
      this.name = method.getName();
      this.parameterTypes = Arrays.asList(method.getParameterTypes());
    }

    @Override
    public int hashCode() {
      return Objects.hashCode(name, parameterTypes);
    }

    @Override
    public boolean equals(@Nullable Object o) {
      if (o instanceof MethodIdentifier) {
        MethodIdentifier ident = (MethodIdentifier) o;
        return name.equals(ident.name) && parameterTypes.equals(ident.parameterTypes);
      }
      return false;
    }
  }

© 著作权归作者所有

小灰灰Blog
粉丝 195
博文 204
码字总数 363562
作品 0
武汉
程序员
私信 提问
Guava库学习:学习Guava EventBus(一)EventBus

在软件开发过程中,对象信息的分享以及相互直接的协作是必须的,困难在于确保对象之间的沟通是有效完成的,而不是拥有成本高度耦合的组件。当对象对其他组件的责任有太多的细节时,它被认为是...

Realfighter
2014/12/29
0
0
为JFinal添加event消息事件

在之前使用spring mvc的时候,在复杂的下单和支付中有一部分功能使用的Spring事件驱动模型去完成!具体优点不啰嗦了,发现涛哥有篇文章讲得比较详细:[详解Spring事件驱动模型][1] 最初准备基...

如梦技术
2015/04/28
0
20
guava eventbus源码解析

说在前面 本文转自“天河聊技术”微信公众号 事件驱动模型设计是一种优雅的程序设计方式,实现有很多,原理都是发布与订阅,观察者设计模式实现,java自带的实现、spring ioc的事件驱动模型,...

天河2018
2018/07/07
0
0
Guava学习笔记:EventBus

  EventBus是Guava的事件处理机制,是设计模式中的观察者模式(生产/消费者编程模型)的优雅实现。对于事件监听和发布订阅模式,EventBus是一个非常优雅和简单解决方案,我们不用创建复杂的...

pior
2015/04/01
0
0
Android框架之路——EventBus的使用

一、简介 EventBus是由greenrobot 组织贡献的一个Android事件发布/订阅轻量级框架。EventBus是一个Android端优化的publish/subscribe消息总线,简化了应用程序内各组件间、组件与后台线程间的...

天王盖地虎626
02/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

《生活的难题》的读后感3600字范文

《生活的难题》的读后感3600字范文: 假如我们对丑的事物也能够像对待美的事物那般抱持一种开放、接纳的心态,拥有相同的感受力,那么我们便会发现它们都是充满意义的,而这种认识会使生活变...

原创小博客
12分钟前
1
0
Linux learn(四)

7. Linux磁盘与文件系统管理 文件系统通常会将数据放在不同的区块,权限与属性放置到inode中,至于实际数据则放到datab lock区块中,另外,还有一个超级区块(superblock)会记录整个文件系统...

lazy~
20分钟前
1
0
微信公众号开发(四)

微信公众号开发时常需要一个用户授权绑定的过程。关于微信公众号的用户绑定,一般有如下两种实现方式: (1)通过发送短信验证码的方式; (2)使用用户登录时,向后端传递openid的方式。 使...

织梦之魂
今天
3
0
设计模式-工厂模式

工厂模式 工厂模式(Factory Pattern)是 Java 中最常用的设计模式之一。这种类型的设计模式属于创建型模式,它提供了一种创建对象的最佳方式。 在工厂模式中,我们在创建对象时不会对客户端...

HOT_POT
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部