文档章节

Guava EventBus源码解析

S
 SaintTinyBoy
发布于 06/24 17:25
字数 1953
阅读 42
收藏 0

一、EventBus使用场景示例

Guava EventBus是事件发布/订阅框架,采用观察者模式,通过解耦发布者和订阅者简化事件(消息)的传递。这有点像简化版的MQ,除去了Broker,由EventBus托管了订阅&发布。EventBus能使用在异步场景中,例如数据库状态的更新、发送邮件、更新日志等。在我们的系统中,主要用于任务执行完毕之后数据库中对应数据状态的变更。示例代码如下:


public class TaskEventBus {

    /**
     * 使用同步方式,触发事件执行的线程即是调用线程,例如main()中调用post方法,事件的执行线程即Main
     */
    //private static EventBus eventBus = new EventBus();
    /**
     * 使用异步方式,事件执行交由指定线程池完成,例如executor即是事件的具体执行者,会有4个线程负责执行事件
     */
    private static ExecutorService executor = Executors.newFixedThreadPool(4);
    private static EventBus eventBus = new AsyncEventBus("taskEventBus",executor );

    /**
     * 注册监听器(订阅事件)
     * @param listener
     */
    public static void registerListener(Listener listener) {
        eventBus.register(listener);
    }

    /**
     * 触发事件,获取对应监听器执行某些操作
     * @param event
     */
    public static void notifyListener(Event event) {
        eventBus.post(event);
    }

    public static void close() {
        executor.shutdown();
    }
}

public class SubStoreTaskSuccessEvent extends SubStoreTaskEvent{

    public SubStoreTaskSuccessEvent(SubStoreTask task) {
        super(task);
    }
}

public class SubStoreTaskFailEvent extends SubStoreTaskEvent{

    public SubStoreTaskFailEvent(SubStoreTask task) {
        super(task);
    }
}

public class SubStoreTaskListener implements Listener{

    @Subscribe
    public void onSuccess(SubStoreTaskSuccessEvent obj) {
        obj.getEvent().doSuccess();
    }
    @Subscribe
    public void onFail(SubStoreTaskFailEvent obj) {
        obj.getEvent().doFail();
    }
}
@AllArgsConstructor
public class SubStoreTask {

    private String taskName;

    public void doFail() {
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+" "+SubStoreTask.class.getName() + taskName + " Fail "+System.currentTimeMillis());
    }
    public void doSuccess() {
        try {
            TimeUnit.MILLISECONDS.sleep(500);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName()+" "+SubStoreTask.class.getName() + taskName + " Success "+System.currentTimeMillis());
    }
}
public class TaskInvoker {
    public static void main(String[] args) {

        TaskEventBus.registerListener(new SubStoreTaskListener());
        SubStoreTask task = new SubStoreTask("A");
        TaskEventBus.notifyListener(new SubStoreTaskSuccessEvent(task));
        TaskEventBus.notifyListener(new SubStoreTaskSuccessEvent(task));
        TaskEventBus.notifyListener(new SubStoreTaskFailEvent(task));
        TaskEventBus.close();
    }
}

 

二、EventBus源码分析

2.1 整体结构&概念

EventBus是同步模式,异步模式使用AsyncEventBus。

EventBus主要有以下几个概念:

1.SubscriberRegistry:管理所有的事件订阅者

2.SubscriberExceptionHandler:处理事件时发生异常的处理器,可以由开发者自己实现

3.Executor:事件的执行线程,EventBus采用默认的线程池,AsyncEventBus需要开发者自己指定线程池

4.Dispatcher:订阅&事件 的派发器,处理某一个事件和对应的订阅者,指导订阅者该如何消费事件。具体实现有

ImmediateDispatcher:从名字来看这个事件分发器是立即发布当前的这个事件,实际情况也是如此,当用户调用dispatch(...)方法时,它不用任何队列缓存事件,而是立即向该事件的订阅者发布事件

LegacyAsyncDispatcher:从名字来看这个事件分发器应该是要废弃掉的(官方的说法是:but is otherwise not especially useful.),与PerThreadQueuedDispatcher相比,它是采用了一个全局队列来存储所有{到达的事件:订阅者}对,它不确保投放的顺序和事件到达的顺序是一致的。这也是作者诟病既然不能保证顺序,用队列就是多此一举。

PerThreadQueuedDispatcher::从名字可以看出来这个事件分发器是以线程为单位进行的,它为每个线程申请了一个队列,这个队列存放了所有投递到该线程的事件及其订阅者们,并且按照事件到达的顺序依次进行投放。

2.2 如何注册

使用 eventBus.register(listener); 一行代码就实现订阅者的注册。源码位于SubscriberRegistry.java。通过register实现订阅者的注册。这里使用了MultiMap & CopyOnWriteArraySet 数据结构,前者维护<key,List<value>>,后者在每次添加新的订阅者时都会复制一份,具体用法原理百度之。


void register(Object listener) {
   //根据订阅者找到对应的<EventType,订阅者集合>
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

   //新的订阅者添加到集合中
    for (Map.Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
      Class<?> eventType = entry.getKey();
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<Subscriber>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

 这里有个坑:@Subscribe所annotate的method的参数,不能支持泛型。因为在运行的时候,因为Type Erasure导致拿不到"真正"的parameterType,所以代码写成这样是会出问题的

public class SubStoreTaskListener implements Listener<SubStoreTaskEvent> {

   //解析时SubStoreTaskEvent会泛型参数,实际是Object类型,导致注册上的eventType是Object.class,不是SubStoreTaskEvent.class
    @Override
    @Subscribe
    public void onSuccess(SubStoreTaskEvent obj) {
        obj.getEvent().doSuccess();
    }

    @Override
    @Subscribe
    public void onFail(SubStoreTaskEvent obj) {
        obj.getEvent().doFail();
    }
}

 

//根据订阅者找到其实现了@SubScribe的方法,建立<事件类型,订阅者>的映射关系
private Multimap<Class<?>, Subscriber> findAllSubscribers(Object listener) {
    Multimap<Class<?>, Subscriber> methodsInListener = HashMultimap.create();
    Class<?> clazz = listener.getClass();
    for (Method method : getAnnotatedMethods(clazz)) {
      Class<?>[] parameterTypes = method.getParameterTypes();
      Class<?> eventType = parameterTypes[0];
      methodsInListener.put(eventType, Subscriber.create(bus, listener, method));
    }
    return methodsInListener;
  }

 private static ImmutableList<Method> getAnnotatedMethods(Class<?> clazz) {
    return subscriberMethodsCache.getUnchecked(clazz);
  }

 //这个执行结果会放入到subscriberMethodsCache本地缓存中,下次同样的Class不用再反射解析,直接从缓存中拿
 private static ImmutableList<Method> getAnnotatedMethodsNotCached(Class<?> clazz) {
    Set<? extends Class<?>> supertypes = TypeToken.of(clazz).getTypes().rawTypes();
    Map<MethodIdentifier, Method> identifiers = Maps.newHashMap();
    for (Class<?> supertype : supertypes) {
      for (Method method : supertype.getDeclaredMethods()) {
        //遍历订阅者的每个方法,检查方法上是否有@SubScribe注解
        if (method.isAnnotationPresent(Subscribe.class) && !method.isSynthetic()) {
          Class<?>[] parameterTypes = method.getParameterTypes();
          //注解方法只能有一个参数
          checkArgument(
              parameterTypes.length == 1,
              "Method %s has @Subscribe annotation but has %s parameters."
                  + "Subscriber methods must have exactly 1 parameter.",
              method,
              parameterTypes.length);

          MethodIdentifier ident = new MethodIdentifier(method);
          if (!identifiers.containsKey(ident)) {
            identifiers.put(ident, method);
          }
        }
      }
    }
    return ImmutableList.copyOf(identifiers.values());
  }

如何消费

使用eventBus.post(event);将事件发布,通知订阅者执行处理逻辑。

public void post(Object event) {
    //SubscriberRegistry根据事件取出对应的订阅者
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
      //交给对应的Dispatcher处理
      dispatcher.dispatch(event, eventSubscribers);
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }

因为Dispatcher有三种实现,在AsyncEventBus中使用LegacyAsyncDispatcher,在EventBus中使用PerThreadQueuedDispatcher(关键使用ThreadLocal<Queue> ThreadLocal<Boolean>)

在PerThreadQueuedDispatcher分发模型下,它会为每个线程申请一个事件队列,该线程所发布的事件都会存入此队列下,因此保证了线程内的事件发布顺序一致性,而且在分发过程中,按照广度优先原则:事件A的所有订阅者都处理完毕时,再发布下一个事件来进行发布。如下图所示:

[PerThreadQueuedDispatcher] 
/**
     * Per-thread queue of events to dispatch.
     */
    private final ThreadLocal<Queue<Event>> queue =
        new ThreadLocal<Queue<Event>>() {
          @Override
          protected Queue<Event> initialValue() {
            return Queues.newArrayDeque();
          }
        };

    /**
     * Per-thread dispatch state, used to avoid reentrant event dispatching.
     * 为了防止线程重复发布事件,即多次递归调用了post(...)方法
     *在第一次进入的时候,将dispatching变量设置true,下次就不会再重入进来。
     *该模型保证了同一线程下事件是按序发布的,而且当一个事件的订阅者都接收到消息时,才会发布下一个事件。
     */
    private final ThreadLocal<Boolean> dispatching =
        new ThreadLocal<Boolean>() {
          @Override
          protected Boolean initialValue() {
            return false;
          }
        };

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      checkNotNull(subscribers);
      Queue<Event> queueForThread = queue.get();
      queueForThread.offer(new Event(event, subscribers));

     //防止多次重入分发逻辑,线程第一次进入时将此变量设置为true,下次无法再次进入。
      if (!dispatching.get()) {
        dispatching.set(true);
        try {
          Event nextEvent;
          while ((nextEvent = queueForThread.poll()) != null) {
            while (nextEvent.subscribers.hasNext()) {
              nextEvent.subscribers.next().dispatchEvent(nextEvent.event);
            }
          }
        } finally {
          dispatching.remove();
          queue.remove();
        }
      }
    }

    private static final class Event {
      private final Object event;
      private final Iterator<Subscriber> subscribers;

LegacyAsyncDispatcher分发模型下,因为采用全局的ConcurrentLinkedQueue,多个线程会同时写入该队列,事件的发布顺序无法保障和事件的到达顺序一致,而且也无法保证事件是按照广度优先的策略发布的,即A事件的所有订阅者都收到消息时才发布下一个事件。 

[LegacyAsyncDispatcher] 
private final ConcurrentLinkedQueue<EventWithSubscriber> queue =
        Queues.newConcurrentLinkedQueue();

    @Override
    void dispatch(Object event, Iterator<Subscriber> subscribers) {
      checkNotNull(event);
      while (subscribers.hasNext()) {
        queue.add(new EventWithSubscriber(event, subscribers.next()));
      }

      EventWithSubscriber e;
      while ((e = queue.poll()) != null) {
        e.subscriber.dispatchEvent(e.event);
      }
    }

    private static final class EventWithSubscriber {
      private final Object event;
      private final Subscriber subscriber;

      private EventWithSubscriber(Object event, Subscriber subscriber) {
        this.event = event;
        this.subscriber = subscriber;
      }
    }
  }

 

 

 

© 著作权归作者所有

共有 人打赏支持
S
粉丝 0
博文 24
码字总数 27286
作品 0
成都
guava eventbus源码解析

说在前面 本文转自“天河聊技术”微信公众号 事件驱动模型设计是一种优雅的程序设计方式,实现有很多,原理都是发布与订阅,观察者设计模式实现,java自带的实现、spring ioc的事件驱动模型,...

天河2018
07/07
0
0
为JFinal添加event消息事件

在之前使用spring mvc的时候,在复杂的下单和支付中有一部分功能使用的Spring事件驱动模型去完成!具体优点不啰嗦了,发现涛哥有篇文章讲得比较详细:[详解Spring事件驱动模型][1] 最初准备基...

如梦技术
2015/04/28
0
20
Guava库学习:学习Guava EventBus(一)EventBus

在软件开发过程中,对象信息的分享以及相互直接的协作是必须的,困难在于确保对象之间的沟通是有效完成的,而不是拥有成本高度耦合的组件。当对象对其他组件的责任有太多的细节时,它被认为是...

Realfighter
2014/12/29
0
0
Android EventBus二三事

废话很多的前言 EventBus,也即事件总线。在[wiki][event_monitor]上有关于Event Monitor的一个说法: Event monitoring makes use of a logical bus to transport event occurrences from so......

苦辛味
2014/09/21
0
0
【死磕Sharding-jdbc】—–路由&执行

原文作者:阿飞Javaer 原文链接:https://www.jianshu.com/p/09efada2d086 继续以模块中的为基础,剖析分库分表简单查询SQL实现--,即如何执行简单的查询SQL,接下来的分析以执行SQL语句为例...

飞哥-Javaer
05/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

day96-20180923-英语流利阅读-待学习

英国王子也不看好人工智能,理由却和霍金不同 Daniel 2018-09-23 1.今日导读 2016 年 3 月 9 日至 15 日,世界围棋冠军李世石与谷歌研发的计算机围棋程序 AlphaGo 进行人机大战并以 1 比 4 ...

飞鱼说编程
18分钟前
0
0
今天在码云遇到一个很有意思的人 for Per.js

今天在码云遇到一个很有意思的人,他在我的Per.js项目下面评论了一句,大意为“你试试这句代码,看看速度到底是你快还是Vue快”【当然,这个评论被我手残不小心删掉了...】。 然后我就试了,...

Skyogo
22分钟前
21
0
Java -------- 首字母相关排序总结

Java 字符串数组首字母排序 字符串数组按首字母排序:(区分大小写) String[] strings = new String[]{"ba","aa","CC","Ba","DD","ee","dd"}; Arrays.sort(strings); for (int i ...

切切歆语
24分钟前
0
0
还在用 Git 的 -f 参数强推仓库,你这是在作死!

最近,美国一个程序员因为同事不写注释,代码不规范,最严重的是天天使用 git push -f 参数强行覆盖仓库,该程序员忍无可忍向四名同事开抢,其中一人情况危急!!! 不写注释、代码不规范是一...

红薯
38分钟前
312
0
NPM报错终极大法

所有的错误基本上都跟node的版本相关 直接删除系统中的node 重新安装 sudo rm -rf /usr/local/{bin/{node,npm},lib/node_modules/npm,lib/node,share/man/*/node.*} 重新安装 $ n lts$ npm...

lilugirl
42分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部