Guava EventBus实现事件发布订阅实例及简单源码解读

原创
2014/01/22 10:50
阅读数 3.2K

   发布订阅者模式是23种设计模式之外的一种很常用的设计模式,各种框架的listener都利用了这种模式,比如Quartz、Spring、Servlet。利用Google EventBus能使得发布订阅模式的实现更加简单,它能使事件发布者和事件订阅者完全解耦,事件发布者完全不需要知道有哪些事件订阅者,事件派发由EventBus帮我们完成。具体比较请参考: Google Code EventBus

   下面是简单的使用,导入Maven依赖:

<dependency>
	<groupId>org.apache.camel</groupId>
	<artifactId>camel-guava-eventbus</artifactId>
	<version>2.10.0</version>
</dependency>

   Guava EventBus本身依赖于Guava库中的Cache、Collections等组件,不细说。

   开始代码。先声明订阅事件:

public abstract class PurchaseEvent {
	
	String item;

	public PurchaseEvent(String item) {
		this.item = item;
	}
	
}

public class CreditPurchaseEvent extends PurchaseEvent {
	int amount;
	String cardNumber;

	public CreditPurchaseEvent(String item, int amount, String cardNumber) {
		super(item);
		this.amount = amount;
		this.cardNumber = cardNumber;
	}
	
}

public class CashPurchaseEvent extends PurchaseEvent {
	
	int amount;

	public CashPurchaseEvent(String item, int amount) {
		super(item);
		this.amount = amount;
	}
	
}

   然后是事件订阅者(Subscriber),Subscriber中包含的被@Subscribe注解的方法叫做EventHandler,一个Subscriber可以有一个或者多个EventHandler,每个handler方法都有且仅有一个参数:

import com.google.common.eventbus.Subscribe;

public class CashPurchaseSubscriber {
	
	@Subscribe
	public void handleCashPurchase(CashPurchaseEvent cpe) {
		System.out.println(getClass().getSimpleName() + " handle CashPurchaseEvent: " + cpe);
	}

}

import com.google.common.eventbus.Subscribe;

public class CreditPurchaseSubscriber {
	
	@Subscribe
	public void handle(CreditPurchaseEvent cpe) {
		System.out.println(getClass().getSimpleName() + " handle CreditPurchaseEvent: " + cpe);
	}

}

import com.google.common.eventbus.Subscribe;

public class PurchaseSubscriber {
	
	@Subscribe
	public void handlePurchase(PurchaseEvent pe) {
		System.out.println(getClass().getSimpleName() + " handle " + pe);
	}

}

import com.google.common.eventbus.Subscribe;

public class MultiHandlerSubscriber {
	
	@Subscribe
	public void handleCashPurchase(CashPurchaseEvent cpe) {
		System.out.println(getClass().getSimpleName() + " handle CashPurchaseEvent: " + cpe);
	}
	
	@Subscribe
	public void handle(CreditPurchaseEvent cpe) {
		System.out.println(getClass().getSimpleName() + " handle CreditPurchaseEvent: " + cpe);
	}
	
	@Subscribe
	public void handlePurchase(PurchaseEvent pe) {
		System.out.println(getClass().getSimpleName() + " handle generice PurchaseEvent: " + pe);
	}

}

   最后是Test类:声明EventBus并注册Subscriber,然后发布事件:

import com.google.common.eventbus.EventBus;

public class Test {
	
	public static void main(String[] args) {
		EventBus eventBus = new EventBus();
		
		CashPurchaseSubscriber cashPurchaseSubscriber = new CashPurchaseSubscriber();
		CreditPurchaseSubscriber creditPurchaseSubscriber = new CreditPurchaseSubscriber();
		MultiHandlerSubscriber multiHandlerSubscriber = new MultiHandlerSubscriber();
		
		eventBus.register(cashPurchaseSubscriber);
		eventBus.register(creditPurchaseSubscriber);
		eventBus.register(multiHandlerSubscriber);
		
		eventBus.post(new CashPurchaseEvent("bread", 3));
		eventBus.post(new CreditPurchaseEvent("book", 10, "10023A1223"));
	}

}

   输出如下:

MultiHandlerSubscriber handle generice PurchaseEvent: org.jxq.guava_eventbus_demo.eventbus.event.CashPurchaseEvent@a18aa2
CashPurchaseSubscriber handle CashPurchaseEvent: org.jxq.guava_eventbus_demo.eventbus.event.CashPurchaseEvent@a18aa2
MultiHandlerSubscriber handle CashPurchaseEvent: org.jxq.guava_eventbus_demo.eventbus.event.CashPurchaseEvent@a18aa2
MultiHandlerSubscriber handle generice PurchaseEvent: org.jxq.guava_eventbus_demo.eventbus.event.CreditPurchaseEvent@194ca6c
CreditPurchaseSubscriber handle CreditPurchaseEvent: org.jxq.guava_eventbus_demo.eventbus.event.CreditPurchaseEvent@194ca6c
MultiHandlerSubscriber handle CreditPurchaseEvent: org.jxq.guava_eventbus_demo.eventbus.event.CreditPurchaseEvent@194ca6c

   下面开始简单地阅读下EventBus这个小组件的源码,首先进入EventBus类的,它有两个关键实例属性如下:

  /**
   * All registered event handlers, indexed by event type.
   */
  private final SetMultimap<Class<?>, EventHandler> handlersByType =
      Multimaps.newSetMultimap(new ConcurrentHashMap<Class<?>, Collection<EventHandler>>(),
          new Supplier<Set<EventHandler>>() {
            @Override
            public Set<EventHandler> get() {
              return new CopyOnWriteArraySet<EventHandler>();
            }
          });
 
  /**
   * Strategy for finding handler methods in registered objects.  Currently,
   * only the {@link AnnotatedHandlerFinder} is supported, but this is
   * encapsulated for future expansion.
   */
  private final HandlerFindingStrategy finder = new AnnotatedHandlerFinder();

   handlersByType将会保存事件类型与Handler的映射,而finder将负责查找Subscriber对象中的EventHandler,这里只提供了一种查找策略AnnotatedHandlerFinder,即根据@Subscriber注解查找。

   然后进入EventBus.register(Object)方法:

  public void register(Object object) {
    handlersByType.putAll(finder.findAllHandlers(object));
  }

   再看下AnnotationHandlerFinder的findAllHandlers(object)方法,object就是Subscriber对象:

  @Override
  public Multimap<Class<?>, EventHandler> findAllHandlers(Object listener) {
    Multimap<Class<?>, EventHandler> methodsInListener =
        HashMultimap.create();
    Class clazz = listener.getClass();
    while (clazz != null) {
      for (Method method : clazz.getMethods()) {
        Subscribe annotation = method.getAnnotation(Subscribe.class);

        if (annotation != null) {
          Class<?>[] parameterTypes = method.getParameterTypes();
          if (parameterTypes.length != 1) {
            throw new IllegalArgumentException(
                "Method " + method + " has @Subscribe annotation, but requires " +
                parameterTypes.length + " arguments.  Event handler methods " +
                "must require a single argument.");
          }
          Class<?> eventType = parameterTypes[0];
          EventHandler handler = makeHandler(listener, method);

          methodsInListener.put(eventType, handler);
        }
      }
      clazz = clazz.getSuperclass();
    }
    return methodsInListener;
  }

   代码很简单,就是通过反射读取Subscriber对象对应的类中带有@Subscriber注解的方法,并检查下是否有且仅包含一个参数,如果不是就抛IllegalArgumentException。这里关注下clazz = clazz.getSuperclass(),即如果一个子Subscriber继承另一个父Subscriber,则注册子Subscriber中的EventHandler时,也会递归地注册它的父Subscriber中的EventHandler。makeHandler(listener, method)将Subscriber对象(这里又叫做listener)和它的带@Subscribe注解的Method对象封装成EventHandler标准事件处理对象:

  private static EventHandler makeHandler(Object listener, Method method) {
    EventHandler wrapper;
    if (methodIsDeclaredThreadSafe(method)) {
      wrapper = new EventHandler(listener, method);
    } else {
      wrapper = new SynchronizedEventHandler(listener, method);
    }
    return wrapper;
  }

   注册好了之后所有事件类型和EventHandler的映射关系就存在上面说的handlersByType中了。接下来看下post(Object event)方法:

  public void post(Object event) {
    Set<Class<?>> dispatchTypes = flattenHierarchy(event.getClass());

    boolean dispatched = false;
    for (Class<?> eventType : dispatchTypes) {
      Set<EventHandler> wrappers = getHandlersForEventType(eventType);

      if (wrappers != null && !wrappers.isEmpty()) {
        dispatched = true;
        for (EventHandler wrapper : wrappers) {
          enqueueEvent(event, wrapper);
        }
      }
    }

    if (!dispatched && !(event instanceof DeadEvent)) {
      post(new DeadEvent(this, event));
    }

    dispatchQueuedEvents();
  }

   代码先获取事件类型,注意如果是子事件比如最上面代码示例中的CreditPurchaseEvent,那么触发子事件的同时也会连带触发父事件比如PurchaseEvent。然后将事件和对应的EventHandler入队:

  /**
   * Queue the {@code event} for dispatch during
   * {@link #dispatchQueuedEvents()}. Events are queued in-order of occurrence
   * so they can be dispatched in the same order.
   */
  protected void enqueueEvent(Object event, EventHandler handler) {
    eventsToDispatch.get().offer(new EventWithHandler(event, handler));
  }

   最后通过dispatchQueuedEvents()处理队列中的事件:

 /**
   * Drain the queue of events to be dispatched. As the queue is being drained,
   * new events may be posted to the end of the queue.
   */
  protected void dispatchQueuedEvents() {
    // don't dispatch if we're already dispatching, that would allow reentrancy
    // and out-of-order events. Instead, leave the events to be dispatched
    // after the in-progress dispatch is complete.
    if (isDispatching.get()) {
      return;
    }

    isDispatching.set(true);
    try {
      while (true) {
        EventWithHandler eventWithHandler = eventsToDispatch.get().poll();
        if (eventWithHandler == null) {
          break;
        }

        dispatch(eventWithHandler.event, eventWithHandler.handler);
      }
    } finally {
      isDispatching.set(false);
    }
  }

   isDispatching是一个ThreadLocal<Boolean>实例变量,标识当前是否已经在进行事件分派处理,如果是则直接返回。

  dispatch(eventWithHandler.event, eventHandler.handler)代码如下:

  /**
   * Dispatches {@code event} to the handler in {@code wrapper}.  This method
   * is an appropriate override point for subclasses that wish to make
   * event delivery asynchronous.
   *
   * @param event  event to dispatch.
   * @param wrapper  wrapper that will call the handler.
   */
  protected void dispatch(Object event, EventHandler wrapper) {
    try {
      wrapper.handleEvent(event);
    } catch (InvocationTargetException e) {
      logger.log(Level.SEVERE,
          "Could not dispatch event: " + event + " to handler " + wrapper, e);
    }
  }

   wrapper.handlerEvent反射调用事件处理方法:

  public void handleEvent(Object event) throws InvocationTargetException {
    try {
      method.invoke(target, new Object[] { event });
    } catch (IllegalArgumentException e) {
      throw new Error("Method rejected target/argument: " + event, e);
    } catch (IllegalAccessException e) {
      throw new Error("Method became inaccessible: " + event, e);
    } catch (InvocationTargetException e) {
      if (e.getCause() instanceof Error) {
        throw (Error) e.getCause();
      }
      throw e;
    }
  }

   target就是上文的listener或者Subscriber对象。

   总体来说还是很简单清晰的!

展开阅读全文
加载中
点击加入讨论🔥(4) 发布并加入讨论🔥
打赏
4 评论
8 收藏
0
分享
返回顶部
顶部