文档章节

guava eventbus源码解析

天河2018
 天河2018
发布于 07/07 14:30
字数 1104
阅读 92
收藏 3

说在前面

本文转自“天河聊技术”微信公众号

事件驱动模型设计是一种优雅的程序设计方式,实现有很多,原理都是发布与订阅,观察者设计模式实现,java自带的实现、spring ioc的事件驱动模型,还有guava的实现,今天介绍guava eventbus的源码实现,看过这篇文章你自己也可以实现实现一套了。

 

guava event源码解析

先上一个demo实现,了解车的原理之前先上去感受下

/**
 * 事件
 * weifeng.jiang 2018-06-11 19:06
 */
public class HelloEvent {

    private String message;

    public HelloEvent(String message) {
        this.message = message;
    }

    public String getMessage() {
        return message;
    }
}
/**
 * 订阅者
 * weifeng.jiang 2018-06-11 19:11
 */
public class EventListener {

    @Subscribe
    public void listen(HelloEvent helloEvent){
        System.out.println(helloEvent.getMessage());
    }
}
/**
 * 客户端
 * weifeng.jiang 2018-06-11 19:12
 */
public class Main {

    public static void main(String[] args) {
//        创建事件总线
        EventBus eventBus = new EventBus("test");
//        创建订阅者
        EventListener listener = new EventListener();
//       注册订阅者
        eventBus.register(listener);
//        发布事件
        eventBus.post(new HelloEvent("asdasd"));
        eventBus.post(new HelloEvent("asdasdasdas"));
        eventBus.post(new HelloEvent("asdasdasdasd"));
    }
}

实现原理架构图

怎么成为一个订阅者接受事件呢

接受事件的对象应有一个public方法,用@Subscribe注解标记这个方法,将接受事件对象传递给EventBus实例的register(Object)方法,参考图一和图三

 

怎么发布事件呢

只需要调用EventBus实例的post方法,参考图三

 

guava eventbus事件总线有两种,同步的实现EventBus,异步的实现AsyncEventBus,如果订阅者在接收事件后进行长时间的逻辑处理,比如和数据库交互,这时候就需要用异步事件了,如果是简单处理,同步实现就可以。

 

这里以EventBus事件总线同步实现为例进行源码解析。

 

成为订阅者的源码实现

和@Subscribe注解配合使用的还有一个@AllowConcurrentEvents注解,这个注解是可以允许事件并发的执行,看下创建订阅者对象的源码实现,如下

/** Creates a {@code Subscriber} for {@code method} on {@code listener}. 为监听器上的方法创建一个订阅服务器。*/
static Subscriber create(EventBus bus, Object listener, Method method) {
  return isDeclaredThreadSafe(method)
      ? new Subscriber(bus, listener, method)
      : new SynchronizedSubscriber(bus, listener, method);
}

可以允许并发事件,在这个类中

@VisibleForTesting
  static final class SynchronizedSubscriber extends Subscriber {

    private SynchronizedSubscriber(EventBus bus, Object target, Method method) {
      super(bus, target, method);
    }

    @Override
    void invokeSubscriberMethod(Object event) throws InvocationTargetException {
      synchronized (this) {
        super.invokeSubscriberMethod(event);
      }
    }
  }
}

执行事件的时候是同步实现。

 

事件总线订阅源码实现

com.google.common.eventbus.SubscriberRegistry#register

void register(Object listener) {
//    查找所有订阅者,维护了一个key是事件类型,value是定订阅这个事件类型的订阅者集合的一个map
    Multimap<Class<?>, Subscriber> listenerMethods = findAllSubscribers(listener);

    for (Entry<Class<?>, Collection<Subscriber>> entry : listenerMethods.asMap().entrySet()) {
//      获取事件类型
      Class<?> eventType = entry.getKey();
//      获取这个事件类型的订阅者集合
      Collection<Subscriber> eventMethodsInListener = entry.getValue();

//      从缓存中按事件类型查找订阅者集合
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);

      if (eventSubscribers == null) {
//        从缓存中取不到,更新缓存
        CopyOnWriteArraySet<Subscriber> newSet = new CopyOnWriteArraySet<>();
        eventSubscribers =
            MoreObjects.firstNonNull(subscribers.putIfAbsent(eventType, newSet), newSet);
      }

      eventSubscribers.addAll(eventMethodsInListener);
    }
  }

事件和订阅事件的订阅者集合是在com.google.common.eventbus.SubscriberRegistry这里维护的

private final ConcurrentMap<Class<?>, CopyOnWriteArraySet<Subscriber>> subscribers =
    Maps.newConcurrentMap();

到这里,订阅者已经准备好了,准备接受事件了。

 

发布事件源码实现

com.google.common.eventbus.EventBus#post

public void post(Object event) {
//    获取事件的订阅者集合
    Iterator<Subscriber> eventSubscribers = subscribers.getSubscribers(event);
    if (eventSubscribers.hasNext()) {
//      转发事件
      dispatcher.dispatch(event, eventSubscribers);
//      如果不是死亡事件,重新包装成死亡事件重新发布
    } else if (!(event instanceof DeadEvent)) {
      // the event had no subscribers and was not itself a DeadEvent
      post(new DeadEvent(this, event));
    }
  }
Iterator<Subscriber> getSubscribers(Object event) {
//    获取事件类型类的超类集合
    ImmutableSet<Class<?>> eventTypes = flattenHierarchy(event.getClass());

    List<Iterator<Subscriber>> subscriberIterators =
        Lists.newArrayListWithCapacity(eventTypes.size());

    for (Class<?> eventType : eventTypes) {
//      获取事件类型的订阅者集合
      CopyOnWriteArraySet<Subscriber> eventSubscribers = subscribers.get(eventType);
      if (eventSubscribers != null) {
        // eager no-copy snapshot
        subscriberIterators.add(eventSubscribers.iterator());
      }
    }

    return Iterators.concat(subscriberIterators.iterator());
  }

事件转发器有三种实现

第一种是立即转发,实时性比较高,其他两种都是队列实现。

执行订阅方法都是异步实现

final void dispatchEvent(final Object event) {
  executor.execute(
      new Runnable() {
        @Override
        public void run() {
          try {
            invokeSubscriberMethod(event);
          } catch (InvocationTargetException e) {
            bus.handleSubscriberException(e.getCause(), context(event));
          }
        }
      });
}

 

说到最后

本次源码解析到这里,仅供参考。

 

© 著作权归作者所有

共有 人打赏支持
天河2018
粉丝 28
博文 56
码字总数 72707
作品 0
郑州
私信 提问
为JFinal添加event消息事件

在之前使用spring mvc的时候,在复杂的下单和支付中有一部分功能使用的Spring事件驱动模型去完成!具体优点不啰嗦了,发现涛哥有篇文章讲得比较详细:[详解Spring事件驱动模型][1] 最初准备基...

如梦技术
2015/04/28
0
20
Guava库学习:学习Guava EventBus(一)EventBus

在软件开发过程中,对象信息的分享以及相互直接的协作是必须的,困难在于确保对象之间的沟通是有效完成的,而不是拥有成本高度耦合的组件。当对象对其他组件的责任有太多的细节时,它被认为是...

Realfighter
2014/12/29
0
0
Android EventBus二三事

废话很多的前言 EventBus,也即事件总线。在[wiki][event_monitor]上有关于Event Monitor的一个说法: Event monitoring makes use of a logical bus to transport event occurrences from so......

苦辛味
2014/09/21
0
0
【死磕Sharding-jdbc】—–路由&执行

原文作者:阿飞Javaer 原文链接:https://www.jianshu.com/p/09efada2d086 继续以模块中的为基础,剖析分库分表简单查询SQL实现--,即如何执行简单的查询SQL,接下来的分析以执行SQL语句为例...

飞哥-Javaer
05/03
0
0
走近Guava(六): 事件总线EventBus

EventBus: 创建EventBus实例: EventBus eventBus = new EventBus();//或者EventBus eventBus = new EventBus(TradeAccountEvent.class.getName());//带标识符,用于日志记录 订阅事件: 模拟......

ihaolin
2014/04/05
0
0

没有更多内容

加载失败,请刷新页面

加载更多

php获取客户端IP

php获取客户端IP 首先先阅读关于IP真实性安全的文章:如何正確的取得使用者 IP? 「任何從客戶端取得的資料都是不可信任的!」 HTTP_CLIENT_IP头是有的,但未成标准,不一定服务器都实现。 ...

DrChenXX
昨天
0
0
. The valid characters are defined in RFC 7230 and RFC 问题

通过这里的回答,我们可以知道: Tomcat在 7.0.73, 8.0.39, 8.5.7 版本后,添加了对于http头的验证。 具体来说,就是添加了些规则去限制HTTP头的规范性 参考这里 具体来说: org.apache.tom...

west_coast
昨天
1
0
刷leetcode第704题-二分查找

今天双十一买的算法书到货了,路上刷到有人说的这个题,借(chao)鉴(xi)一下别人的思路,这个是C++标准库里面的经典方法,思路精巧,优雅好品味 int search(int* nums, int numsSize, in...

锟斤拷烫烫烫
昨天
2
0
【分享实录】BANCOR算法详解及代码实现

1 活动基本信息 1)主题:【区块链技术工坊22期】BANCOR算法详解及代码实现 2)议题: BANCOR算法的特点和优劣势 BANCOR算法和举例 如何加入BANCOR.NETWORK交易所 如何开发自己的BANCOR去中心...

HiBlock
昨天
2
0
微信小程序(2)

开始看微信小程序的教程了。刚刚看完官方教程的视图层部分。这里摘录一些自己认为的部分关键点。 1.直接修改数值无法重新渲染,需要使用setData()方法; 2.列表渲染中:wx:key用于保持项目在...

MKjy
昨天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部