文档章节

Spring Integration Framework

D
 DoIt
发布于 2017/05/18 09:32
字数 14449
阅读 108
收藏 2


从系统文件中读取配置,持久化数据到数据库,发送信息到外部客户端,发布邮件,FTP日常的快照,执行其它的例行任务。
我们的应用程序要跟文件系统,数据库系统,邮件系统,FTP服务等打交道。
我们还需要考虑部署各种不同的适配器来满足我们应用程序跟外部其它应用集成的需要。

共享文件系统:两个以上应用程序共享一个通用的文件系统,一个可能写入,其它的读取。
这时需要将发送者和接收者解耦。但是它短板在于性能,稳定性,以及对文件系统的依赖。
单数据库:应用程序共享单一数据库。造成网络延迟,连接锁定问题
消息系统:要求发送和接收者解耦,发送消息的程序将一些数据封装发送给一个消息中间体就不需要在关心它了。
一个消费消息的消费者无论何时都可以开始它的工作流。好处在于消息可以在这个过程中被强化,转换,路由,过滤,然后才送达终端处理。

消息模式:消息,信道,转换器
pipes and filters 模式

mkonda$ cat just-spring-titles.txt | grep "Just Spring" | wc -l
这里有三个终端处理 cat,grep,wc
cat 命令显示文件内容,这里的显示不是显示到屏幕上,而是通过管道发送给grep命令,grep命令会获取文件内容并从中查找
Just Spring字符串,然后将结果通过管道传递给wc,由它简单的在屏幕上显示结果数量。

这就是一个管道和过滤模型,其中 | 表示管道。

如果我们了解JMS或者分布式技术,那就应该听说过企业级消息。
我们的应用程序在跟其它应用程序通过网络进行交互可以被看作企业应用程序。
我们需要用有一个应用程序服务器来承载这些应用程序,并暴露服务接口给其它应用程序调用。

Enterprise Integration Pattern:EIP 
传统的编程模型:
比如我们设计一个从外部系统获取交易的系统(比如我们从一个文件中),需要按照如下步骤处理交易:
    需要基于交易的类型(新建,撤销,修正等)对交易进行分类
    然后将交易按照类别进行处理和存储
    当一个交易被存储后,必须有一个审计工具被通知。
我们可以将上面的设计编写到一个组件中。
//Pseudo code
public class TradesLoader {
    private List<Trade> trades = null;
    ....
    public void start(){
        trades = loadTrades(tradesFile);
        (for Trade t: trades){
            processTrade(t);
            persistTrades(t);
            auditNotify(t);
        }
    }
    public void processTrade(Trade trade){
        if (t.getStatus().equalsIgnoreCase("new")) {
            processNewTrade();
        } else if (t.getStatus().equalsIgnoreCase("amend")) {
            processAmmendTrade();
        } else {
            processOtherTrade();
        }
    }
    public void loadTrades(File fromFile){ .. }
    public void persistTrades(Trade trade){ .. }
    public void auditNotify(Trade trade){ .. }
    public static void main(String[] args) {
        new TradesLoader().start();
    }
}
这是一个顺序模型,组件跟整个业务流程高度的耦合。如果我们想添加另外一个工作流,比如我们将对所有大级别交易发送通知
或者创建一个新任务来收集交易模型,我们就需要重新构建这个流程,并写添加更多的if-else语句。

这样我们就看出上面这个TradesLoader组件干的太多了,而不单单是夹在交易。
所以我们需要简化这种设计,让TradesLoader组件感到加载完Trades为止结束。
在上面但进程处理中,TradesLoader首先是获取Trade然后存储到一个事先定义好的内部序列中。
相关的操作比如TradeProcessor,TadePersistor,TradeNotifier都将基于这个序列来完成整个工作流程。

独立的消息模型:
上面的组件应用可以被重构成TradesLoader在从文件中加载了Trades后,发布Trades到一个容器数据结构,在消息模型中,这个容器
被称为destination 或者是 channel,而其它的处理组件都从这里获取Trade。而destination或者channel扮演了一个导管的角色。

Spring Integration 框架是基于Spring的核心组件开发出来的,满足此类模型的编程的优秀框架,它需要我们提供专门的消息容器程序。

基本概念:消息,信道和端点,Messages,Channels和Endpoints
消息是数据的容器,信道是包含这些消息的地址,端点是一些连接到这些信道消费或者发布消息的组件。

消息是在两个应用程序之间携带信息的对象,它形成于一段,在另一端被解析处理,消息的生产者或者发布者发布消息到一个信道,
消息的预订者或者消费者连接该信道然后获取消息,从消息中读取其携带的数据,复原会相应的领域对象,进行相关业务处理。

解析一个消息:
消息由两部分组成:payload和header
payload相当于信的内容,是对它感兴趣的一方需要处理的内容。
header是头信息,相当于信封。

public interface Message<T> {
    T getPayLoad();
    MessageHeaders getHeaders();
}

public final class MessageHeaders implements Map<String, Object>, Serializable { ... }

框架提供了一个Message接口的通用实现GenericMessage
我们可以使用工具类MessageBuilder来生成:
// Create payload object
Account a = new Account();
// creating a map with header properties
Map<String, Object> accountProperties = new HashMap<String, Object>();
// Set our header properties
accountProperties.put("ACCOUNT_EXPIRY","NEVER");
// Use MessageBuilder class to create a Message
// and set header properties
Message<Account> m = MessageBuilder.withPayload(a)
    .setHeader("ACCOUNT_EXPIRY", "NEVER")
    .build()
    
消息信道:
消息数据的容器,被称为信道,表示消息被发送的位置。
框架中我们定义了MessageChannel接口来描述它。
Spring Integration提供了声明信道的模型,我们不需要通过java类来定义它。
<beans
    xmlns:int="http://www.springframework.org/schema/integration"
    xsi:schemaLocation="http://www.springframework.org/schema/integration
    http://www.springframework.org/schema/integration/spring-integration-2.1.xsd"
    ...
    >
    // declaratively creating a channel
    <int:channel id="newAccounts">
</beans>
同时提供了一些具体的实现,QueueChannel,PriorityChannel和RendezvousChannel等。
虽然它们各有不同但是底层的设计原则都是一样的,表现为一个端点地址。

端点Endpoints:
它们从一个输入信道消费消息,或者从一个输出信道发布消息。也可能是只消费消息,或者只发送消息。
框架提供了很多即插即用的端点实现:Transformers,Splitters,Filters,Routers等。
还提供了一些端点适配器来连接像JMS,FTP,JDBC,Twitter等。

比如我们有一个Service Activator endpoint
它是一个通用的端点,用于当一个消息抵达输入信道时,就在一个bean上调用一个方法。

<int:service-activator input-channel="positions-channel" 
                        ref="newPositionProcessor" 
                        method="processNewPosition">
</int:service-activator>

//bean to be invoked
<bean id="newPositionProcessor" class="com.madhusudhan.jsi.basics.NewPositionProcessor" />

这里service-activator 端点会在消息抵达position-channel时会获取消息并调用bean上的processNewPosition方法,
而完成这些所需要的代码,框架已经帮忙实现了。

比如一个交易者需要一个web应用程序生成一个交易,发送到一个JMS目的地,在那里这些交易会被另外一个组件预订,它们会
连接JMS目的地并获取交易并处理它们。

我们可以使用一个适配器inbound-channel-adapter来从一个输入JMS Queue获取消息。
<int:channel id="trades-channel" />
<jms:inbound-channel-adapter id="tradesJmsAdapter"
    connection-factory="connectionFactory"
    destination="tradesQueue"
    channel="trades-channel">
    
    <int:poller fixed-rate="1000" />
</jms:inbound-channel-adapter>

<int:service-activator
    input-channel="trades-channel"
    ref="newTradeProcessor"
    method="processNewTrade">
</int:service-activator>

<bean id="tradesQueue"
    class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="TRADES_QUEUE" />
</bean>

<bean name="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
        <value>tcp://localhost:61616</value>
    </property>
</bean>

<int:service-activator
    input-channel="trades-channel"
    ref="newTradeProcessor"
    method="processNewTrade">
</int:service-activator>
// bean that would be invoked
<bean id="newTradeProcessor" class="com.madhusudhan.jsi.basics.NewTradeProcessor" />


public class NewTradeProcessor {
    public void processNewTrade(Trade t){
        // Process your trade here.
        System.out.println("Message received:"+m.getPayload().toString());
    }
}

测试类:
public class NewTradeProcessorTest {
    private ApplicationContext ctx = null;
    private MessageChannel channel = null;
    // Constructor which instantiates the endpoints
    public NewTradeProcessorTest() {
        ctx = new ClassPathXmlApplicationContext("basics-example-beans.xml");
        channel = ctx.getBean("trades-channel", MessageChannel.class);
    }
    private Trade createNewTrade() {
        Trade t = new Trade();
        t.setId("1234");
        ...
        return t;
    }
    private void sendTrade() {
        Trade trade = createNewTrade();
        Message<Trade> tradeMsg = MessageBuilder.withPayload(trade).build();
        channel.send(tradeMsg, 10000);
        System.out.println("Trade Message published.");
    }
    public static void main(String[] args) {
        NewTradeProcessorTest test = new NewTradeProcessorTest();
        test.sendTrade();
    }
}

通过信道我们可以解耦发送者和接收者,在我们的管道和过滤模型中,Channel是管道。
我们可以通过信道和终端组合出复杂的集成方案。

消息信道:
MessageChannel接口定义两个主要发方法:
boolean send(Message message);
boolean send(Message message, long timeout)
第一个方法执行时必须等到消息发送成功才会返回控制权,当一个消息没有被发送时,第二个方法会接管,抛出异常。

这里的timeout变量可以是0,正值或者负值。如果为负值,则线程会被阻塞直到消息发布成功。
如果是0,发送方法无论成功与否都会立刻返回控制权,如果正值,则会在发送失败时等待相应的时间抛出错误和异常。
它没有定义任何的接收消息的方法,接收消息大量依赖接收语法Point-to-Point(p2p)或者publish/Subscribe(Pub/Sub).

在P2P模式下,只有一个消息接收者接收消息,即使有多个接收者能连接该信道,但是只能有一个接收,而且可以使随机选择。
在Pub/Sub模式下,消息是发送给所有预订了该消息的消费者,就是说消息被拷贝后分发给所有预订者。

这里有个概念叫做message buffering,消息缓存,消息被根据配置按照序列保存到内存或者持久存储区。

客户端选择信道依赖于分发模式(P2P,Pub/Sub)和缓冲或非缓冲情景。

这里有两个接口来处理接收端:
PollableChannel 和 SubscribleChannel 它们两个都扩展子MessageChannel,故自动继承了Send方法。

点到点模式:
消费者仅需要使用任意的PollableChannel接口实现即可:
public interface PollableChannel extends MessageChannel {
    // This call blocks the thread until it receives a message
    Message<?> receive();
    // This call will wait for a specified timeout before
    // throwing a failure if message is not available
    Message<?> receive(long timeout);
}
框架提供了该接口的三个实现:QueueChannel,PriorityChannel和RendezvousChannel
QueueChannel具备消息缓冲功能,PriorityChannel和RendezvousChannel是QueueChannel更出色的实现版本。
它们扩展了P2P和缓冲特性。
Spring会在应用程序启动时创建它们。

public class QueueChannelTest{

    private ApplicationContext ctx=null;
    private MessageChannel qChannel=null;
    
    public QueueChannelTest(){
        ctx=new ClassPathXmlApplicationContext("channels-beans.xml");
        qChannel = ctx.getBean("q-channel",MessageChannel.class);
    }
    
    public void receive(){
        // This method receives a message, however it blocks
        // indefinitely until it finds a message
        //Message m =((QueueChannel)qChannel).receive();
        // This method receives a message, however it exists
        // within the 10 seconds even if doesn't find a message
        Message m = ((QueueChannel) qChannel).receive(10000);
        System.out.println("Payload: " + m.getPayload());
    }
}

channels-beans.xml
<int:channel id="q-channel">
    <int:queue capacity="10" />
</int:channel>
-------------------------------------------------------------------------------------------------------------------
Pub/Sub模型:
使用SubscribableChannel,每个消息都要被广播给所有注册的订阅者。
public interface SubscribableChannel extends MessageChannel{
    //to subscribe a MessageHandler for handling the message
    boolean subscribe(MessageHandler handler);
    
    //unsubscribe
    boolean unsubscribe(MessageHandler handler);
}

public interface MessageHandler{
    //this method is invoked when a fresh message appears on the channel
    void handleMessage(Message<?> message) throws MessagingException;
}

public class ChannelTest{
    private ApplicationContext ctx=null;
    private MessageChannel pubSubChannel=null;
    
    public ChannelTest(){
        ctx=new ClassPathXmlApplicationContext("channels-beans.xml");
        pubSubChannel = ctx.getBean("pubsub-channel",MessageChannel.class);
    }
    
    public void subscribe(){
        ((PublishSubscribeChannel)pubSubChannel).subscribe(new TradeMessageHandler());
    }
    
    class TradeMessageHandler implements MessageHandler{
        public void handleMessage(Message<?> message) throws MessagingException{
            System.out.println("Handling Message:"+ message);
        }
    }
}

<int:publish-subscribe-channel id="pubsub-channel"/>
--------------------------------------------------------------------------------------------------------------
Queue Channel:
该信道展现出点对点特性,只有一个消费者可以接收到消息,但是可以创建多个消费者。
该信道同时还支持缓冲消息,因为它使用一个序列结构将消息保存到内存中。

<int:channel id="newAccounts">
    <int:queue capacity="100"/>
</int:channel>
定义了一个100个元素的序列信道。如果省略则会创建一个无限容量的信道,capacity 的会被设置为Integer.MAX_VALUE。

在没有客户消费消息时,消息序列可能会被塞满,消息发送者也会被阻塞,直到序列有空间可用或者超时发生。

QueueChannel实现的事First In First Out(FIFO)顺序。数据会被保存到java.util.concurrent.LinkedBlockingQueue.
QueueChannel 还提供了一个purge方法来净化序列,用MessageSelector来预定义条件
public List<Message<?>> purge(MessageSelector selector){}
如果给purge方法传入null参数,将会清空整个序列。
-------------------------------------------------------------------------------------------------------------
Priority Channel:
属于QueueChannel的一个子类,只是添加了一个消息优先级设置。如果我们需要发送高优先级消息,那么使用PriorityChannel是不错的选择。
我们使用MessageHeader的PRIORITY属性设置优先级。
public void publishPriorityTrade(Trade t){
    Message<Trade> tradeMsg = MessageBuilder.withPayload(t).
        setHeader(MessageHeades.PRIORITY,10).build();
        
    priorityChannel.send(tradeMsg,10000);
        
    System.out.println("The Message is published successfully");
}

<int:channel id="newAccounts">
    <int:priority-queue capacity="10" />
</int:channel>

如果我们需要进一步的控制优先级,我们需要通过实现Comparator<Message<?>> 来提供比较实现。
public class AccountComparator implements Comparator<Message<Account>>{
    @Override
    public int compare(Message<Account> msg1,Message<Account> msg2){
        Account a1 = (Account)msg1.getPayload();
        Account a2 = (Account)msg2.getPayload();
        
        Integer i1 = a1.getAccountType();
        Integer i2 = a1.getAccountType();
        
        return i1.compareTo(i2);
    }
}
我们需要让框架直到我们定义的比较器,
<int:channel id="newAccounts">
    <int:priority-queue capacity="10" comparator="accountComparator" />
</int:channel>

<bean id="accountComparator" class="com.madhusudhan.jsi.channels.AccountComparator"/>

-------------------------------------------------------------------------------------------------------------
Rendezvous Channel:
也是QueueChannel的一个子类,展示的是点对点特性。它实现的是一个零容量的序列。
在底层它使用SynchronousQueue数据结构,这就意味着任何时间点都只能有一个消息存在于信道中。
当消息生产者发送一个消息给它时,它会被锁定直到消息被消费者消费掉。
同样的,消费者也会被锁定等待消息出现在信道中。

<int:channel id="newAccounts">
    <int:rendezvous-queue/>
</int:channel>
当我们希望接收一个请求回复时,RendezvousChannel是个理想的信道。 客户端推送一个在消息头中添加要求回复的头信息的请求,
public void sendTradeToRendezvous(Trade t) {
    Message<Trade> tradeMsg = MessageBuilder.withPayload(t).
        etHeader(MessageHeaders.REPLY_CHANNEL, "replyChannel").build();
    rendezvousChannel.send(tradeMsg, 10000);
    System.out.println(t.getStatus() + " Trade published to a Rendezvous channel");
}

-----------------------------------------------------------------------------------------------
PublishSubscribe Channel:
如果我们需要将消息发送给多个消费者,则需要使用SubscribeChannel, 这里没有定义receive 方法。
因为信息接收时有MessageHandler来处理的。
<int:publish-subscribe-channel id="newAccounts" />

public class PubSubTest{
    MessageHandler handler = new TradeMessageHandler();
    private ApplicationContext ctx = null;
    private PublishSubscribeChannel pubSubChannel = null;
    ...
    // subscribe to the channel
    public void subscribe() {
        boolean handlerAdded = pubSubChannel.subscribe(handler);
        System.out.println("Handler added?" + handlerAdded);
    }
    // Unsubscribe using the same channel and handler references.
    public void unsubscribe() {
        boolean handlerRemoved = pubSubChannel.unsubscribe(handler);
        System.out.println("Handler removed?" + handlerRemoved);
    }
    //Handler to handle the messages
    class TradeMessageHandler implements MessageHandler {
        public void handleMessage(Message<?> message) throws MessagingException {
            System.out.println("Handling Message:" + message);
        }
    }
}
当消息出现在信道时,它会调用注册的handler并传入消息,进行处理。
---------------------------------------------------------------------------------------------------------
Direct Channel:
混合了P2P和Pub/Sub特色,它实现了SubscribableChannel接口,所以我们需要一个MessageHandler的具体实现来订阅它。
消息可以被订阅它的处理器处理,但是只有一个订阅者会获取消息,呈现了P2P特点。
即使你注册了多个订阅者,信道也只会交付给它们中的一个。 
该框架用于循环广播策略从众多预订者中选择一个接收消息。

消息的生产者和消费者都运行在同一个线程中,对于跨域多个资源事务的企业级应用来说非常有用。

<int:channel id="newAccounts"/>

如果多个处理器订阅一个信道,有两个问题,要选择哪个处理器处理消息和如果选择了一个处理器不能处理消息问题。
一个是load-balancer另一个是failover 属性。
load-balancer 标记选择一个合适的加载策略来选择处理器。
failover属性是布尔标记,如果设置为true,如果被选中的处理抛出异常,它会让后续的处理器处理消息,默认值为true。

因为DirectChannel将处理订阅者的任务委托给MessageDispatcher,

<int:channel id="newAccounts">
    <dispatcher failover="false" load-balancer="round-robin"/>
</int:channel>

load-balancer默认设置为round-robin,如果要忽略加载平衡策略,则直接可以将load-balancer的值设置为 none
------------------------------------------------------------------------------------------------------------
Executor Channel:
实现SubscribableChannel接口,类似DirectChannel,除了是由java.uti.concurrent.Executor 实例来派发消息。
在DirectChannel实现里,消息发送线程从头到尾完全掌控,而在ExecutorChannel中发送线程发送完就结束了。
消息消费是由派发器独立的线程处理,派发器通过消费者调用消息处理的执行器。
<int:channel id="newAccounts">
    <int:dispatcher task-executor="accountsExecutor"/>
</int:channel>
// define the executor
<bean id="accountsExecutor" class="com.madhusudhan.jsi.channels.AccountsExecutor"/>

默认设置:
<int:channel id="newAccounts">
    <int:dispatcher load-balancer="none" failover="false" task-executor="accountsExecutor"/>
</int:channel>

-------------------------------------------------------------------------------------------------------------
Null Channel:
是PollableChannel,主要用于测试目的。其发送方法总是返回true,指定的操作都是成功的。而接收方法总是获取一个空的消息。
底层代码没有创建任何序列,但是send操作返回true,receive返回null。
// This is the framework class implementation
public class NullChannel implements PollableChannel {
    // send will always return true
    public boolean send(Message<?> message) {
        if (logger.isDebugEnabled()) {
            logger.debug("message sent to null channel: " + message);
        }
        return true;
    }
    // receive will return null always
    public Message<?> receive() {
        if (logger.isDebugEnabled()) {
            logger.debug("receive called on null channel");
        }
        return null;
    }
...
}

总之,消息信道是分隔生产者和消费者的主要组件。

===============================================================================
Endpoints 端点
消息终端是从消息框架中分离业务逻辑的组件。它们对于隐藏消息细节非常重要。
它们负责连接应用程序组件到消息信道来发送和接收消息。
Spring Integration提供的终端有Service Activator,Channel Adapter,Message Bridge,Gateway,Transformer,Filter,Router等。

首先我们需要在配置文件中引入相应的命名空间定义:
xmlns:int="http://www.springframework.org/schema/integration"
xsi:schemaLocation="http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration-2.1.xsd"

Service Activator:
是一个通用的端点,它在一个消息抵达某个信道时调用某个bean上定义的方法执行。
如果这个方法有返回值,该返回值将会被发送到一个输出信道,前提是已经配置了输出信道。

使用service-activator 元素配置activator,并设置input-channel和ref的bean。
<int:service-activator input-channel="positions-channel" ref="newTradeActivator" method="processNewPosition">
</int:service-activator>

<bean id="newTradeActivator" class="com.madhusudhan.jsi.endpoints.common.NewTradeActivator" />
通过上面的定义,任何到达positions-channel的消息都会被传递给NewTradeActivator 并通过由method属性指定的方法来处理它。
如果目标bean只有一个方法,那么这里method属性就没必要设置。框架会自动将该唯一方法解析为服务方法并调用它。
NewTradeActivator类作为服务的入口
public class NewTradeActivator {
    Position position = ..
    public void processNewPosition(Position t) {
        System.out.println("Method invoked to process the new Position"+t);
        // process the position..
        // ...
    }
}
该方法返回非空值时,返回值会被包裹在一个Message中并发送给一个output-channel.
如果你想在处理完后发送一个回复给其它信道,我们就给方法定义返回值。
// Return value will be wrapped in a Message
// and sent to an output channel
public Position processNewPosition(Position t) {
    System.out.println("Method invoked to process the new Position"+t);
    // process the position..
    // ...
    return position;
}
我们可以没有定义output-channel,只要我们的处理方法带有返回值,框架将会使用消息头中名为replyChannel的属性来发送回复。
如果在消息头中没有发现replyChannel属性,则会抛出异常。

服务方法可以有Message或者一个Java对象作为参数,如果使用一个Java对象作为参数,消息中的负载就会被取出来传递给message。
因为传入消息是一个java对象,该模式将不会绑定我们的Spring API,所以是个更好的选项。
上例中一个Position对象被包裹到一个Message中传递给信道。
----------------------------------------------------------------------------------------------------------
Message Bridge:

消息桥是一个简单的端点,它连接不同的消息模型或者适配器。
常用的消息桥有绑定一个P2P模型信道到一个Pub/Sub模型。
在P2P模型中,一个PollableChannel被端点使用,反之,whereas,一个PublishSubscribableChannel用于Pub/Sub模型。

在配置文件中使用bridge元素定义它,
<int:publish-subscribe-channel id="trades-in-channel" />

<int:channel id="trades-out-channel">
    <int:queue capacity="10" />
</int:channel>

<!-- Bridges pub/sub channel to a p2p channel -->
<int:bridge input-channel="trades-in-channel" output-channel="trades-out-channel" />
上例中,bridge会获取来自输入信道的消息发布到输出信道中。
输入信道是PublishSubscribeChannel,而输出信道是QueueChannel

要完成上面的设计,需要一个service activator挂到输出信道上。
在消息通过桥端点到达输出信道时,PositionReceiver 会被调用。
<int:service-activator input-channel="trades-out-channel" ref="positionReceiver"/>

<bean id="positionReceiver" class="com.madhusudhan.jsi.endpoints.PositionReceiver"/>
---------------------------------------------------------------------------------------------------------

Message Enricher:
一个消息增强组件可以给一个输入的消息添加额外的信息并发送给下一个消费者。
比如一个交易由一些编码信息,比如安全ID,或者客户账户代码组成。
为了保持可信度我们可能在不同的阶段为其附加不同的信息数据。
框架提供了两种方式来增强消息:Header Enricher和Payload Enricher

Header Enricher:
<int:header-enricher
    input-channel="in-channel"
    output-channel="out-channel">
    <int:header name="SRC_SYSTEM" value="BBG" />
    <int:header name="TARGET_SYSTEM" value="LOCAL" />
</int:header-enricher>

我们还可以设置一些预定义的属性,比如priority,reply-channel,error-channel等。
<int:header-enricher id="maxi-enricher" input-channel="in-channel" output-channel="out-channel">
    <int:priority value="10"/>
    <int:error-channel ref="myErrorChannel"/>
    <int:correlation-id value="APP_OWN_ID"/>
    <int:reply-channel value="reply-channel"/>
    <int:header name="SRC_SYSTEM" value="BBG" />
</int:header-enricher>

<int:publish-subscribe-channel id="myErrorChannel" />

框架还支持通过允许header-enricher的头属性来引用一个bean来使用payload设置头属性。
<int:header-enricher id="pojo-enricher"
    input-channel="in-channel"
    output-channel="out-channel">
    <int:header name="ID" ref="tradeEnricher" method="enrichHeader"/>
</int:header-enricher>

这里的ID是在TradeEnricher的帮助下从payload中获取数据。
public class TradeEnricher {
    public String enrichHeader(Message m) {
        Trade t = (Trade)m.getPayload();
        return t.getId()+"SRC";
    }
}

Payload Enricher:
使用enricher标签定义添加或者增强到payload的部分的内容。
<int:enricher input-channel="in-channel"
    request-channel="enricher-req-channel"
    output-channel="stdout">
    <int:property name="price" expression="payload.price" />
    <int:property name="instrument" expression="payload.instrument" />
</int:enricher>
<int:service-activator input-channel="enricher-req-channel" ref="tradeEnricher">
</int:service-activator>
<bean id="enricherBean" class="com.madhusudhan.jsi.endpoints.enricher.Enricher" />
<bean id="tradeEnricher" class="com.madhusudhan.jsi.endpoints.enricher.PriceEnricher" />

同样的enricher需要input-channel中的消息,然后将它传递给request-channel并等待回复。
应该有一些其他组件监听request-channel 来增强消息。
在增强了消息的payload后,该组件会发布回复给回复信道。回复信道是通过header属性声明在消息内部的。
一旦增强器enricher获得了回复,它会通过表达式将增强数据设置给属性。

上面的配置中,Price被传入in-channel,Price消息没有任何数据,enricher将它传递给enricher-req-channel并等待回复。
获取消息并增强其信息后返回Price,返回值被发布到reply-channel. enricher一旦接收到来自reply-channel的消息就继续处理,
添加额外的属性,比如price和instrument到消息并发送它们到output-channel。

public void publishPrice(){
    //Create a Price object with no values
    Price p = new Price();
    
    //note the reply-channel as header property
    Message<Price> msg = MessageBuilder.withPayload(p)
        .setHeader(MessageHeaders.REPLY_CHANNEL, "reply-channel")
        .build();
    
    channel.send(msg,10000);
    System.out.println("Price Message published");
}

public class PriceEnricher{
    public Price enrichHeader(Message m){
        Price p = (Price)m.getPayload();
        p.setInstrument("IBM");
        p.setPrice(111.11);
        return p;
    }
}

Enricher 组件符合Gateway模式。
------------------------------------------------------------------------------
Gateway:网关
有两种类型的网关,同步网关和异步网关
同步网关中,消息调用会被阻塞直到消息处理完成,而异步网关,消息的调用时非阻塞的。

Synchronous Gateway:
写一个网关的第一步就是定义一个接口来描述跟消息系统的交互方法。
比如我们定义ITradeGateway接口包含一个单一的processTrade方法
public interface ITradeGateway {
    public Trade processTrade(Trade t);
}
配置网关:
<int:gateway id="tradeGateway"
    default-request-channel="trades-in-channel"
    default-reply-channel="trades-out-channel"
    service-interface="com.madhusudhan.jsi.endpoints.gateway.ITradeGateway" />
    
上面配置在应用程序上下文加载时,会用默认请求和回复信道创建一个网关端点,网关有一个service-interface属性
该属性执行我们的ITradeGateway接口。框架的GatewayProxyFactoryBean 为服务接口创建一个代理(所以你不用为其提供任何实现)
代理能够使用提供的信道来处理客户端输入和输出的请求。

所以如果客户端调用一个processTrade方法,也是由代理来完成。
它发布一个带有Trade对象的消息到trades-in-channel,代理会阻塞调用直到从trades-out-channel收到回复为止。
该回复会被传回客户端。会有另外一个组件来获取来trades-in-channel的消息根据业务需求进行处理。

客户端代码:
public GatewayEndpointTest() {
    ...
    public GatewayEndpointTest() {
        ctx = new ClassPathXmlApplicationContext("endpoints-gateway-beans.xml");
        // obtain our service interface
        tradeGateway = ctx.getBean("tradeGateway",ITradeGateway.class);
    }
    public void publishTrade(Trade t) {
        // call the method to publish the trade!
        Trade it = tradeGateway.processTrade(t);
        System.out.println("Trade Message published (Reply)."+it.getStatus());
    }
    public static void main(String[] args) {
        GatewayEndpointTest test = new GatewayEndpointTest();
        Trade t = new Trade();
        test.publishTrade(t);
    }
}

我们从应用程序上下文中获取一个tradeGateway bean并调用processTrade方法,完全不依赖于消息框架。
为了完成这个实例,我们可以配置一个Service Activator来从trades-in-channel(跟代理发布消息是同一个信道)中获取消息,并传递回复给trades-out-channel(跟代理监听回复的信道相同)
<int:service-activator
    input-channel="trades-in-channel"
    output-channel="trades-out-channel"
    ref="tradeProcessor"
    method="receiveTrade" >
</int:service-activator>
<bean id="tradeProcessor" class="com.madhusudhan.jsi.endpoints.gateway.TradeProcessor" />

public class TradeProcessor {
    public Trade receiveTrade(Trade t) {
        System.out.println("Received the Trade via Gateway:"+t);
        t.setStatus("PROCESSED");
        return t;
    }
}

Asynchronous Gateway:
要获取异步效果,那么服务接口的返回值需要改变,现在我们让它返回一个Future对象。
import java.util.concurrent.Future;
public interface ITradeGatewayAsync {
    public Future<Trade> processTrade(Trade t);
}

public void publishTrade(Trade t) {
    Future<Trade> f = tradeGateway.processTrade(t);
    try {
        Trade ft = f.get();
    } catch (Exception e) { .. }
}

-------------------------------------------------------------------------------
Delayer:延迟器
用于在发送者和接收者之间引入延迟。
<int:delayer default-delay="5000"
    input-channel="in-channel"
    output-channel="out-channel">
</int:delayer>

所有的进入in-channel的消息将会被延迟5秒后传送给out-channel,如果default-delay设置为0或者负数时,会立刻转发消息。

我们还可以通过消息的header字段来定义每个消息的延迟周期,为此我们需要使用delay-header-name来让框架知道。
<int:delayer default-delay="5000"
    input-channel="prices-in-channel"
    output-channel="prices-out-channel"
    delay-header-name="MSG_DELAY">
</int:delayer>
上面配置指定所有具有MSG_DELAY属性的消息都将按照自己设置的头字段值进行延迟,没有该属性设置的消息按照系统配置的默认值延迟。

----------------------------------------------------------------------------------
Spring 表达式:
Spring集成支持Spring表达式(SpEL)定义。
我们可以用表达式来在消息头和负载中求取属性值。
<int:header-enricher id="enricher"
    input-channel="in-channel" output-channel="out-channel">
    <int:header name="TARGET_SYSTEM" expression="headers.TARGET_SYSTEM"/>
</int:header-enricher>
这里headers 属性引用MessageHeaders,所以我们可以使用headers.property_name语法来查属性值。
类似的,消息的payload可以作为payload属性来用,所以我们可以使用点号查询payload对象属性值。
<int:enricher input-channel="in-channel"
    request-channel="enricher-req-channel"
    output-channel="stdout">
    <int:property name="price" expression="payload.price"/>
</int:enricher>

另外端点比如Transformer,Filter,Service Activator以及Splitter都支持SpEL。
----------------------------------------------------------------------------------------
脚本支持:
可以使用框架扩展来支持脚本语言,我们可以使用框架支持的语言编写脚本,然后被端点调用。
事实上我们可以使用实现了JSR-223的任何语言。Groovy,Python/Jython,Ruby/JRuby,JavaScript等。

下面例子是端点从in-channel中获取消息然后传递给position-transformer.groovy脚本:
<int:transformer
    input-channel="in-channel"
    output-channel="stdout">
    <int-script:script lang="groovy"
        location="/home/mkonda/justspring/jsi/position-transformer.groovy"/>
</int:transformer>
在该脚本的执行上下文中,脚本可以通过headers和payload变量访问消息的MessageHeaders和Payload。
我们还可以直接将脚本以CDATA元素的形态直接嵌入到配置文件中。
------------------------------------------------------------------------------------------
Consumers:消费者
我们的信道有两种一种是pollable可轮询的,一种是subscribable可订阅的,同样的我们的终端消费者也可分为
Polling Consumer和Event-Driven客户。

轮询消费者基于轮询配置为消息轮询信道,它是由客户程序驱动的。
事件驱动消费者预订了一个可预订信道,当消息到达时可以被异步通知。

Polling Consumers:
其特点是定时的为消息进行轮询,框架提供了PollingConsumer类来完成这项工作。
在实例化它是需要给构造函数传入一个可轮询信道和一个消息处理器。
消息处理器是一个处理发布到该信道的消息的接口定义。

private MessageHandler positionsHandler = null;
private QueueChannel positionsChannel = null;
...
// Instantiating a PollingConsumer
PollingConsumer consumer = new PollingConsumer(positionsChannel, positionsHandler);


Message m = channel.receive();//or other receive methods
System.out.println("Payload: " + m.getPayload());

消息处理器:
public class PositionsHandler implements MessageHandler {
    public void handleMessage(Message<?> message) throws MessagingException {
        System.out.println("Handling a message: "+ message.getPayload().toString());
    }
}


public class PositionsPollingConsumer {
    private PollingConsumer consumer = null;
    private PositionsHandler positionsHandler = null;
    public PositionsPollingConsumer(ApplicationContext ctx, QueueChannel positionsChannel) {
        //instance of handler
        positionsHandler = new PositionsHandler();
        // now create the framework's consumer
        consumer = new PollingConsumer(positionsChannel, positionsHandler);
        //You must set the context, or else an error will be thrown
        consumer.setBeanFactory(ctx);
    }
    public void startConsumer() {
        consumer.start();
    }
}
调用:
PositionsPollingConsumer ppc = new PositionsPollingConsumer(ctx, positionsChannel);
ppc.startConsumer();


使用触发器轮询:
上面例子的调用不是轮询方式的,我们需要它按时方式轮询,则需要使用框架的Triggers来完成。
框架为我们提供了两种类型的触发器:PeriodicTrigger 和 CronTrigger
PeriodicTrigger是按照固定的时间间隔轮询
CronTrigger则是基于Unix的cron表达式进行轮询,在任务计划需要复杂需求时,该方式更加灵活。

一旦选中了触发器,我们就需要实例化它并将它安装到consumer,我们还可以设置它的initialDelay和fixedRate来进一步控制轮询。
PeriodicTrigger periodicTrigger = new PeriodicTrigger(2000);
// let the polling kick in after half a second
periodicTrigger.setInitialDelay(500);
// fixed rate polling?
periodicTrigger.setFixedRate(false);
其中initialDelay设置轮询只有在超过时间周期开始轮询
fixedRate是一个布尔变量,指定轮询是否需要在规定的时间间隔之内,如果设置为true,则如果当前的消息处理超过了轮询周期,
则轮询会处理下一个消息。

CronTrigger可以让consumer做更多的轮询,比如我们需要在工作日的午夜叫醒工作,使用corn表达式可以这样复杂的情型。
Cron表达式表示为由空格分开的字段,有六个字段,每个字段表示时间不同的内容。声明一个表达式表现我们的时间需求
// start polling all weekdays at exactly one minute past midnight
String cronExpression="* 01 00 * * MON-FRI";
cronTrigger = new CronTrigger(cronExpression);

-------------------------------------------------------------------------------------------------------
Event-Driven Consumers:
订阅消息的消费者被归类为Event-Driven Consumers,框架定义该类型消费者为EventDrivenConsumer
它的基本特征是等待某人分发到达信道的消息,SubscribableChannel信道支持这类消费。

private EventDrivenConsumer consumer = null;
private PositionsHandler positionsHandler = null;
private ApplicationContext ctx = null;
public PositionsEventDrivenConsumer(ApplicationContext ctx,PublishSubscribeChannel positionsChannel) {
    positionsHandler = new PositionsHandler();
    // instantiate the event driven consumer
    consumer = new EventDrivenConsumer(positionsChannel, positionsHandler);
    consumer.setBeanFactory(ctx);
}

public void startConsumer() {
    // EventDrivenConsumer exposes start method
    consumer.start();
}

=====================================================================================================
Transformer:转换器
并不是所有的程序都能理解它们消费的数据,比如一个生产者使用Java对象作为它的负载制造一条消息,而消息消费者对非Java对象
数据感兴趣,比如XML或者名值对。为了帮助消息生产者和消费者进行交流,我们就需要为他们定义转换器。
框架提供了转换器组件,比如Object-to-String,Object-to-Map等。

框架内建的转换器:
比如对象到字符串,map,或者JSON格式,框架直接提供。

String转换器:
<int:object-to-string-transformer
    input-channel="in-channel"
    output-channel="stdout">
</int:object-to-string-transformer>
<int-stream:stdout-channel-adapter id="stdout"/>
这里需要注意需要作为payload的POJO对象重写toString()方法,以避免出现意外结果。

Map Transformer:
Object --〉Map:
<int:object-to-map-transformer
    input-channel="in-channel"
    output-channel="stdout">
</int:object-to-map-transformer>
<int-stream:stdout-channel-adapter id="stdout"/>

Map--〉Object:
<int:map-to-object-transformer
    input-channel="in-channel"
    output-channel="stdout">
</int:map-to-object-transformer>

序列化和反序列化转换器:
JMS的消息在发送时必须序列化并在接收时反序列化,Payload序列化,即将POJO转换为字节数组,反序列化是将字节数组转换为POJO对象。
<int:payload-serializing-transformer
    input-channel="trades-in-channel"
    output-channel="trades-out-channel">
</int:payload-serializing-transformer>

<int:payload-deserializing-transformer
    input-channel="trades-out-channel"
    output-channel="stdout">
</int:payload-deserializing-transformer>

<int-stream:stdout-channel-adapter id="stdout"/>


JSON 转换器:
<int:object-to-json-transformer
    input-channel="trades-in-channel"
    output-channel="stdout">
</int:object-to-json-transformer>

<int:json-to-object-transformer
    input-channel="trades-in-channel"
    output-channel="trades-out-channel"
    type="com.madhusudhan.jsi.domain.Trade">
</int:json-to-object-transformer>

XML转换器:使用了Spring的Object-to-XML(OXM)框架。
 org.springframework.oxm.Marshaller 和 org.springframework.oxm.Unmarshaller
我们需要XML命名空间来访问XML转换器,
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
    ...
    xmlns:int-xml="http://www.springframework.org/schema/integration/xml"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
    ....
    http://www.springframework.org/schema/integration/xml
    http://www.springframework.org/schema/integration/xml/
    spring-integration-xml-2.1.xsd">
</beans>
声明了空间后,我们就可以使用marshalling-transformer元素来读取一个输入信道的消息。
格式化成XML格式然后传递回给输出信道。
<int-xml:marshalling-transformer
    input-channel="trades-in-channel"
    output-channel="stdout"
    marshaller="marshaller"
    result-type="StringResult">
</int-xml:marshalling-transformer>
<bean id="marshaller" class="org.springframework.oxm.castor.CastorMarshaller" />
这里设置的可选参数result-type决定者结果类型。
框架有两个内建的结果类型:javax.xml.transfor.dom.DOMResult 和 org.springframework.xml.transform.StringResult
DOMResult是默认结果类型。
如果我们希望定义自己的结果类型:

<int-xml:marshalling-transformer
    input-channel="trades-in-channel-xml"
    output-channel="trades-out-channel-xml"
    marshaller="marshaller"
    result-factory="tradeResultFactory">
</int-xml:marshalling-transformer>
<bean id="tradeResultFactory" class="com.madhusudhan.jsi.transformers.builtin.TradeResultFactory" />

这里TradeResultFactory 有一个方法createResult实现,它继承自ResultFactory
public class TradeResultFactory implements ResultFactory {
    public Result createResult(Object payload) {
        System.out.println("Creating result ->"+payload);
        //create your own implementation of Result
        return new TradeResult();
    }
}


XPath 转换器:
使用XPath表达式解码XML payload,要求输入信道传入XML负载的消息。
<int-xml:xpath-transformer
    input-channel="trades-in-channel"
    output-channel="stdout"
    xpath-expression="/trade/@status">
    <int:poller fixed-rate="1000" />
</int-xml:xpath-transformer>

我们可以通过如下方式创建XML格式payload消息发布到trades-in-channel信道:
private String createNewTradeXml() {
    return "<trade status='NEW' account='B12D45' direction='BUY'/>";
}


自定义转换器:
public class TradeMapTransformer {
    public Map<String, String> transform(Trade t) {
        Map<String,String> tradeNameValuesMap = new HashMap<String,String>();
        tradeNameValuesMap.put("TRADE_ID", t.getId());
        tradeNameValuesMap.put("TRADE_ACCOUNT", t.getAccount());
        ...
        return tradeNameValuesMap;
    }
}

接下来就是让框架知道我们定义的转换类:
<int:transformer input-channel="trades-in-channel"
    output-channel="trades-out-channel" ref="tradeMapTransformer">
</int:transformer>
<bean id="tradeMapTransformer" class="com.madhusudhan.jsi.transformers.custom.TradeMapTransformer" />

这里需要为transformer元素声明的内容有:
一个输入信道,一个输出信道,还有就是转换器实现类。


String转换器:
POJS-to-String:
public class PojoToStringTransformer {
    private final String tradeString = "TRADE_ID=%s,
        TRADE_ACCOUNT=%s,
        TRADE_SECURITY=%s,
        TRADE_DIRECTION=%s,
        TRADE_STATUS=%s" ;
    public String transform ( Trade t ) {
        return String.format( tradeString,
            t.getId(),
            t.getAccount(),
            t.getSecurity(),
            t.getDirection(),
            t.getStatus() ) ;
    }
}

配置转换器:
<int:transformer input-channel="trades-in-channel"
    output-channel="trades-out-channel"
    ref="pojoToStringTransformer">
</int:transformer>
<bean id="pojoToStringTransformer" class="com.madhusudhan.jsi.transformers.custom.PojoToStringTransformer" />
------------------------------------------------------------------------------------------------------


使用标签:
我们可以使用框架的@Transformer 声明标签来引入转换器bean。
component-scan允许容器在transformers包里扫描声明标记了的bean。
此时AnnotatedTradeMapTransformer 类会被实例化:
@Component
public class AnnotatedTradeMapTransformer {
    @Transformer
    public Map<String, String> transform(Trade t) {
        Map<String,String> tradeNameValuesMap =
        new HashMap<String,String>();
        ....
        return tradeNameValuesMap;
    }
}

@Transformer 标记的方法会被调用。

<context:component-scan
base-package="com.madhusudhan.jsi.flow.transformer" />

=========================================================================
工作流组件:

消息应用有时需要一些额外的组件,比如路由,聚合aggregation,排序sequencing等。
一个应用程序可能有特定的条件来路由信息到多个信道或者分解信息然后聚合他们做更深入的处理。

Spring Integration框架提供了Filters,Routes,Aggregators,和 Splitters等可以直接使用的组件。

Filters:
消费者有不同的消息消费需求,Spring Integration框架使用Filters和配置的条件来决定哪个应用程序应该接收消息。

<int:filter input-channel="in-channel"
    output-channel="out-channel"
    ref="newTradeFilter"
    method="isNewTrade">
</int:filter>
<bean id="newTradeFilter" class="com.madhusudhan.jsi.flow.ex1.NewTradeFilter" />

public class NewTradeFilter {
    public boolean isNewTrade(Message<?> message) {
        Trade t = (Trade)message.getPayload();
        return (t.getStatus().equalsIgnoreCase("new"));
    }
}

使用框架的MessageSelector:
<int:filter input-channel="in-channel"
    output-channel="out-channel"
    ref="cancelTradeFilter">
</int:filter>
<bean id="cancelTradeFilter" class="com.madhusudhan.jsi.flow.filters.CancelTradeFilter" />

public class CancelTradeFilter implements MessageSelector{
    public boolean accept(Message<?> message) {
        Trade t = (Trade)message.getPayload();
        return (t.getStatus().equalsIgnoreCase("cancel"));
    }
}
非公用的Filter我们可以采用内嵌声明:
<int:filter input-channel="in-channel" output-channel="out-channel">
    <!-- Inner Bean -->
    <bean class="com.madhusudhan.jsi.flow.filters.NewTradeFilter" />
</int:filter>

使用标签:
@Component
public class AnnotatedNewTradeFilter {
    @Filter
    public boolean isTradeCancelled(Message<?> message) {
        Trade t = (Trade)message.getPayload();
        return (t.getStatus().equalsIgnoreCase("cancel"));
    }
}


<context:component-scan base-package="com.madhusudhan.jsi.flow.filter"/>
<int:filter input-channel="in-channel"
    output-channel="stdout"
    ref="annotatedNewTradeFilter" >
</int:filter>

----------------------------------------------------------------------
丢弃消息:
框架允许在消息不符合过滤条件时,过滤器抛出异常或者转发另外的信道。
为了能抛出异常,我们添加throw-exception-on-rejection属性到过滤器元素。
<int:filter input-channel="all-trades-in-channel"
    output-channel="cancel-trades-out-channel"
    ref="cancelTradeFilter"
    throw-exception-on-rejection="true">
</int:filter>

或者我在过滤器中装配一个信道来接收这些被丢弃的消息,我们使用discard-channel属性来设置丢弃信道
<int:filter input-channel="all-trades-in-channel"
    output-channel="cancel-trades-out-channel"
    ref="cancelTradeFilter"
    discard-channel="non-cancel-trades-hospital-channel">
</int:filter>
----------------------------------------------------------------------------
路由器:
工作流有一个需求就是根据特定条件将消息发送给一个或者多个信道。
一个路由器组件可以用于分配消息到多个目的地。
路由器会从一个信道获取消息并基于payload或者headers内容重新投递到相关的信道。

filter和router不同,Filter基于简单的布尔测试决定消息是否被发送,Router基于内容来决定如何转发消息。
在Filter里,消息只有两个方向可以去,继续向前传递或者被丢弃,使用Filter时,消息一个消息可能会也可能不会出现在输出信道中。
而使用Router,则单个消息可以被发送给一个或者多个信道。

框架提供了一些内建的路由器:基于消息负载内容的PayloadTypeRouter,和基于消息header值的HeaderValueRouter。

PayloadTypeRouter:基于payload的类型决定将消息路由到哪个信道。
路由组件会附加到一个输入信道,将获取消息负载类型,并据此分配它们到特定类型需求的其它信道。

假设我们有一个信道从外部应用流入Account和Trade消息,我们希望分离他们到两个不同的信道,Accounts进入accounts-channel
,Trades进入trades-channel

为此我们可以在输入信道all-in-channel中装配一个payload-type-router组件,然后使用其mapping属性来设置期待的类型和其相应的
分配信道。
<int:payload-type-router input-channel="all-in-channel">
    <int:mapping type="com.madhusudhan.jsi.flow.router.Trade" channel="trades-channel" />
    <int:mapping type="com.madhusudhan.jsi.flow.router.Account" channel="accounts-channel" />
</int:payload-type-route

HeaderValueRouter:
根据消息头属性来决定路由逻辑:
 <int:header-value-router input-channel="all-in-channel"
    header-name="status"
    default-output-channel="no-matches-channel">
    <int:mapping value="NEW" channel="new-trades-channel" />
    <int:mapping value="CANCEL" channel="cancel-trades-channel" />
</int:header-value-router>


自定义路由器:
我们定义路由时需要获取消息并解析消息的负载或者头信息,然后根据结果返回相应的信道名称。
public class BigTradeRouter {
    public String bigTrade(Message<Trade> message){
        Trade t = message.getPayload();
        // check if the trade is a big one and if it is
        // send it to a separate channel to handle them
        if(t.getQuantity() > 1000000)
            return "big-trades-channel";
        // else send a normal channel
        return "normal-trades-channel";
    }
}
在配置文件中配置自定义的路由器:
<int:router input-channel="all-in-channel"
    ref="bigTradeRouter"
    method="bigTrade"
    default-output-channel="non-matches-channel"/>
<!-- The custom router -->
<bean id="bigTradeRouter" class="com.madhusudhan.jsi.flow.router.BigTradeRouter"/>
--------------------------------------------------------------------------------------

Recipient List Router 接收表路由器
分配给某信道的消息定义在一个接收列表中:
比如下面的 一个Trade消息被分配到三个子流信道:persistor-channel 来保存所有输入的Trade消息
trades-channel用于处理Trade, audit-channel则用于审计目的。
<int:recipient-list-router input-channel="all-in-channel">
    <int:recipient channel="persistor-channel"/>
    <int:recipient channel="trades-channel"/>
    <int:recipient channel="audit-channel"/>
</int:recipient-list-router>
----------------------------------------------------------------------------------------
Unqualified Messages:不合格消息处理
对于一些无法通过特定路由逻辑的消息,框架既可以抛出异常也可以把它们推到一个默认的信道里。
default-output-channel

<int:payload-type-router
    input-channel="all-in-channel"
    default-output-channel="non-matches-channel">
...
</int:payload-type-router>

resolution-required 属性用在路由器上跟默认的输出信道关联。它会基于信道的ID来解析任意消息信道。
如果resolution-required设置为true,但是default-output-channel没有,则会抛出异常。

使用路由器标签:@Router
@Component
public class AnnotatedBigTradeRouter {
    @Router
    public String bigTrade(Message<Trade> message) {
        Trade t = message.getPayload();
        if (t.getQuantity() > 10000)
            return "big-trades-channel";
        return "trades-stdout";
    }
}

<context:component-scan base-package="com.madhusudhan.jsi.flow.router" />
<int:router id="annonatedRouter" input-channel="in-channel" default-output-channel="no-matches-channel"
    ref="annotatedBigTradeRouter">
</int:router>
----------------------------------------------------------------------------------------------

Splitters:
通常用于切分消息到小块,用于更小的自定客户逻辑。
比如将一个大的消息负载切分成小块来并行处理。
框架提供了splitter元素来定义它。
我们可以通过自定义POJO实现自定义逻辑或者扩展框架提供的AbstractMessageSplitter抽象类实现其splitMessage()方法
来定义切割器。

自定义实现,一般需要我们定义一个简单的POJO,然后实现切分算法。
比如一个正常的Trade和一些加密数据被传入,需求是获取加密数据到另外的对象EncryptedTrade
新创建的EncryptedTrade和原来的Trade都将发送给输出信道。
我们就可以使用切分器来处理。
Trade和EncryptedTrade都继承自ITrade,其中ITrade只是一个没有任何内容定义的标记接口。

public class Trade implements ITrade{
    private String encryptedMsg = null;
    ...
    public String getEncryptedMsg() {
        return encryptedMsg;
    }
    public void setEncryptedMsg(String encryptedMsg) {
        this.encryptedMsg = encryptedMsg;
    }
    ...
}


public class EncryptedTrade implements ITrade{
    private String encryptedMsg = null;
    public EncryptedTrade(String encryptedMsg) {
        this.encryptedMsg = encryptedMsg;
    }
    public String getEncryptedMsg() { ... }
    public void setEncryptedMsg(String encryptedMsg) { ... }
}
其切分过程是从Trade对象中获取encryptedMessage用于构造EncryptedMessage对象。
我们定义CustomEncryptedTradeSplitter封装上面的逻辑实现:
public class CustomEncryptedTradeSplitter{
    public List<ITrade> splitMyMessageToTrades(Message<?> message) {
        List<ITrade> trades = new ArrayList<ITrade>();
        TradeImpl t = (TradeImpl)message.getPayload();
        //Create a new object from the payload
        EncryptedTrade et = new EncryptedTrade(t.getEncryptedMsg());
        trades.add(t);
        trades.add(et);
        System.out.println("Splitting message done, list: "+trades);
        return trades;
    }
}

<!-- Custom splitter -->
<int:splitter input-channel="all-in-channel"
    ref="customEncryptedMessageSplitter"
    method="splitMyMessageToTrades"
    output-channel="all-trades-out-channel">
</int:splitter>
<bean id="customEncryptedMessageSplitter"
    class="com.madhusudhan.jsi.flow.splitter.CustomEncryptedTradeSplitter" />
-------------------------------------------------------------------------------
使用AbstractMessageSplitter抽象类定义:
public class EncryptedTradeSplitter extends AbstractMessageSplitter{
    @Override
    protected Object splitMessage(Message<?> message) {
        ....
        return trades;
    }
}

<int:splitter input-channel="in-channel"
    ref="encryptedMessageSplitter"
    output-channel="out-channel">
</int:splitter>
<bean id="encryptedMessageSplitter"
    class="com.madhusudhan.jsi.flow.splitter.EncryptedTradeSplitter" />

标签 @Splitter 
声明让框架知道我们定义了一个用于切分器的bean。
然后我们必须为方法提供@Splitter标签:
@Component
public class AnnonatedEncryptedTradeSplitter{
    @Splitter
    public List<ITrade> splitMyMessageToTrades(Message<?> message) {
    ..
    }
}
该方法返回一个对象集合,每个对象都会被包裹在Message中作为payload

每个子消息都会被印上相同的集合ID:CORRELATION_ID
在SEQUENCE_SIZE参数设置母消息要被切分成子消息的个数。
在每个子消息上印上单独的SEQUENCE_NUMBER
Received a message:[Payload=Trade [...][Headers={sequenceNumber=1,
correlationId=18c9eee1-4795-4378-b70e-d236027d0c30, ..., sequenceSize=2}]
Received a message:[Payload=EncryptedTrade[...]][Headers={sequenceNumber=2,
correlationId=18c9eee1-4795-4378-b70e-d236027d0c30, ..., sequenceSize=2}]

其中correlationId相同说明这两个子消息相互关联,表示CORRELATION_ID。
sequenceSize 说明由几个子消息组成,由SEQUENCE_SIZE 头属性指定的,。
sequenceNumber 表示每个子消息的序列码。

以上三个属性将在消息聚合时发挥重要作用。
----------------------------------------------------------------------------------
Aggregation 消息聚合
装配多个消息来创建一个父消息,与Splitter是一个相反的过程。
在装配开始之前要求所有的参与装配的子消息必须都到达。装配它们要基于相关性和其发布策略。
public class TradeAggregator {
    public ITrade aggregateTrade(List<ITrade> childTrades) {
        ...
    }
}


<int:aggregator input-channel="in-channel
    output-channel="agg-channel"
    ref="tradeAggregator"
    method="aggregateTrade">
</int:aggregator>

<bean id="tradeAggregator" class="com.madhusudhan.jsi.flow.aggregator.TradeAggregator" />

<!-- Splitter that would cut the messages for aggregator to re-build -->
<int:splitter input-channel="in-channel" ref="customSplitter"
    output-channel="out-channel">
</int:splitter>
<bean id="customSplitter" class="com.madhusudhan.jsi.flow.splitter.CustomEncryptedTradeSplitter" />

-------------------------------------------------------------------------------------------------
策略:
聚合不会单独工作,这种成对算法叫做策略。 这对聚合行为非常重要。

一个父消息被切分成大量的子消息,接下来聚合器需要等待这些子消息全部到达,然后才能聚合它们再次成为一个父消息。
聚合器会遵循某种算法来开始和结束它的工作。这种算法是以correlation和release策略的算法提供给聚合器的。
聚合器使用这些策略来追踪流入的子消息并对它们进行聚合处理。

Correlation Strategy:关联策略
定义了用于分组消息的键。默认分组是基于CORRELATION_ID ,所有具有相同CORRELATION_ID的消息会被放到同一个篮子里等待聚合。

框架提供了HeaderAttributeCorrelationStrategy可以直接使用,也可以自定义自己的策略。
定义自己的策略可以通过实现CorrelationStrategy接口或者自己创建自己的POJO。
public class MyCorrelationStrategy implements CorrelationStrategy {
    public Object getCorrelationKey(Message<?> message) {
    // implement your own correlation key here
    // return ..
    }
}

<int:aggregator input-channel="all-trades-out-channel"
    output-channel="agg-channel"
    ref="tradeAggregator"
    method="aggregateTrade"
    correlation-strategy="myCorrelationStrategy">
</int:aggregator>
<bean id="myCorrelationStrategy" class="com.madhusudhan.jsi.flow.aggregator.MyCorrelationStrategy" />

如果是自己定义的POJO则需要在配置时指定方法名:并且要求方法输入Message<?>参数,返回一个Object对象。
<int:aggregator input-channel="all-trades-out-channel"
    ...
    correlation-strategy="myCorrelationStrategy"
    correlation-strategy-method="fetchCorrelationKey">
</int:aggregator>


Release Strategy:发布策略
该策略指定集合后的信息在那个点上被发送或者发布。
除了设置了release-on-expire标记情况外,它会等待信号发送。

默认的策略是 SequenceSizeReleaseStrategy,它实现了ReleaseStrategy接口。
它会检查通过SEQUENCE_SIZE分组的消息,比如SEQUENCE_SIZE是10,只有当接收所有10个消息并且序列码在1~10返回内时,
策略会触发一个信号给聚合器。

跟CorrelationStrategy 类似,我们可以通过实现ReleaseStrategy或者创建自己的POJO类来实现。
public class MyReleaseStrategy implements ReleaseStrategy {
    public boolean canRelease(MessageGroup group) {
        // implement your strategy here
        return false;
    }
}

<int:aggregator input-channel="in-channel"
    output-channel="agg-channel"
    ref="tradeAggregator" method="aggregateTrade"
    correlation-strategy="myCorrelationStrategy"
    correlation-strategy-method="fetchCorrelationKey"
    release-strategy="myReleaseStrategy">
</int:aggregator>
<bean id="myReleaseStrategy" class="com.madhusudhan.jsi.flow.aggregator.MyReleaseStrategy" />

如果自定义POJO则需要给实现方法输入一个java.util.List对象,返回一个布尔型返回值。
使用release-strategy 和 release-strategy-method 配置:
 <int:aggregator input-channel="in-channel"
    ...
    release-strategy="myReleaseStrategy"
    release-strategy-method="signalRelease">
</int:aggregator>
--------------------------------------------------------------------------------------------
消息存储:
聚合器会暂存消息直到相关联的消息都收到。即使有一子消息没收到,聚合都不能发布(除非存储过期)
这就要求聚合器有一个地方存储消息。
框架提供了相应的选项来该聚合器提供消息存储。

有两种存储选项:内存或者外部数据库

内存存储是默认的,它会通过java.util.Map收集消息存储到内存中。
框架提供了message-store 属性来引用相应的消息存储。
如果是默认的内存存储,则可以忽略。
下列配置是一个数据库存储:
<int:aggregator input-channel="all-trades-out-channel" output-channel="agg-channel"
    ....
    message-store="mySqlStore">
</int:aggregator>

<bean id="mySqlStore" class="org.springframework.integration.jdbc.JdbcMessageStore">
    <property name="dataSource" ref="mySqlDataSource"/>
</bean>
------------------------------------------------------------------------------------------
Resequencer 排序:
消息系统一个重要的特征就是消息的顺序。
尽管排序会伤害性能,但是某些情况下还是需要强调排序的,比如在灾难恢复时,要求消息按照原来的顺序回复。
Resequencer组件能够对接收的消息进行排序。
它会作用于SEQUENCE_NUMBER头字段来追踪顺序。
如果我们将其 release-partial-sequences 标记设置为true,它会在已收到消息就发布,而不会等待所有的分组消息成员都到达。
<int:resequencer input-channel="all-in-reseq-channel"
    output-channel="reseq-channel"
    release-partial-sequences="true">
</int:resequencer>
=================================================================================================
Adapter 适配器:
框架提供了很多可以直接使用的适配器,它们分为进项适配器和出项适配器。
Inbound adapters获取文件或者数据库结果集,Outbound Adapter则是从信道中获取消息然后转换它们成为文件传输到一个文件系统中,
或者 转换成数据库记录保存的数据库中。

这一切的基础就是文件适配器,其它适配器工作方式与之类似。

File Adapters:
用于从不同的文件系统中获取或者拷贝文件,然后转换成框架的Message,发布到一个信道,反之。
框架支持使用文件空间的声明模型,它还提供了一些类来读取和写入文件操作,但是我们推荐使用命名空间。

使用Namespace
file 提供了很多直接使用的元素定义。
在使用之前我们需要首先引入该Namespace

<?xml version="1.0" encoding="UTF-8"?>
<beans
    ....
    xmlns:file="http://www.springframework.org/schema/integration/file"
    xsi:schemaLocation=
    http://www.springframework.org/schema/integration/file
    http://www.springframework.org/schema/integration/file/spring-integrationfile-2.1.xsd">
...
</beans>

框架为file提供了两个适配器用于读取和写入文件。
inbound-channeladapter 元素用于读取文件并将它们作为File 负责的消息发布到一个信道。
outbound-channel-adapter 元素用于从信道中获取消息的负载的File,将其写入到文件系统。

<!-- Adapter using namespace -->
<file:inbound-channel-adapter id="fileAdapter"
    directory="/Users/mkonda/dev/ws/" channel="files-out-channel">
    <int:poller fixed-rate="1000" />
</file:inbound-channel-adapter>
上面的配置设置了适配器以一秒为频次读取指定文件内容,发布到files-out-channel信道。

<int-stream:stdout-channel-adapter id="files-out-channel" />
这里简单设置了输出适配器,从信道files-out-channel中获取消息然后在控制台打印他们。

File 适配器有一些参数可以设置:
组织重复读取文件的设置: prevent-duplicates 该标记设置只作用于每次会话,如果读取重启则设置失效。

<file:inbound-channel-adapter id="fileAdapter"
    directory="/Users/mkonda/dev/ws/"
    channel="files-out-channel"
    prevent-duplicates="true">
    <int:poller fixed-rate="1000" />
</file:inbound-channel-adapter>

过滤:使用FileListFilter接口的实现 来做。框架提供了一个AcceptOnceFileListFilter,它在当前会话中只接收一个文件一次。
我们可以通过实现FileListFilter来自定义更多过滤:
public class PositionsFilter implements FileListFilter<Position> {
    public List<Position> filterFiles(Position[] files) {
        List<Position> filteredList = new ArrayList<Position>();
        // implement your filtering logic here
        return filteredList;
    }
}

为了防止重复读取文件,我们可以使用filename-pattern 和 filename-regex属性来阻止:
<file:inbound-channel-adapter id="positionsAdapter"
    ...
    filename-pattern="*.pos">
    ...
</file:inbound-channel-adapter>
这样适配器只获取扩展名为pos的文件,我们还可以通过正则表达式加以规范。
<file:inbound-channel-adapter id="positionsAdapter"
    directory="/Users/mkonda/dev/ws/"
    ...
    filename-regex="[ABC]_positions.pos">
</file:inbound-channel-adapter>

文件锁定:
我们可以使用框架提供了FileLocker接口实现来锁定文件防止其它进程访问。
<file:inbound-channel-adapter id="positionsAdapter"
    directory="/Users/mkonda/dev/ws/" channel="positions-files-channel"
    prevent-duplicates="true" filename-regex="[ABC]_positions.pos">
    <file:nio-locker/>
    <int:poller fixed-rate="1000" />
</file:inbound-channel-adapter>

我们可以添加自己的自定义锁:
<file:inbound-channel-adapter id="positionsAdapter"
    directory="/Users/mkonda/dev/ws/" channel="positions-files-channel"
    prevent-duplicates="true" filename-regex="[ABC]_positions.pos">
    <!-- use custom locker -->
    <file:locker ref="positionsLocker"/>
    <int:poller fixed-rate="1000" />
</file:inbound-channel-adapter>
<bean id="positionsLocker" class="com.madhusudhan.jsi.adapters.PositionsFileLocker"/>
------------------------------------------------------------------------------------
独立文件读取器:
框架提供FileReadingMessageSource 类,它实现了框架的MessageSource接口,定义了receive()方法。
这是所有需要轮询消息的基础接口,返回值是Message对象,该对象包含java.io.File作为负载。
public class StandaloneFileAdapterTest {
    // set the directory from where the files need to be picked up
    File directory = new File("/Users/mkonda/dev/ws");
    public void startStandaloneAdatper() {
        FileReadingMessageSource src = new FileReadingMessageSource();
        src.setDirectory(directory);
        Message<File> msg = src.receive();
        System.out.println("Received:"+msg);
    }
    public static void main(String[] args) {
        StandaloneFileAdapterTest test = new StandaloneFileAdapterTest();
        test.startAdatper();
    }
}

// declaring the framework's class as a bean
<bean id="positionsReader" class="org.springframework.integration.file.FileReadingMessageSource">
    <property name="directory" value="/Users/mkonda/dev/ws/" />
</bean>

private void startAdapterUsingDeclaredBeanRef() {
    ctx = new ClassPathXmlApplicationContext("adapters-file-beans.xml");
    fileReader = ctx.getBean("fileReader", FileReadingMessageSource.class);
    // now you got the instance, poll for msgs
    Message<File> msg = fileReader.receive();
    System.out.println("Message received from the bean:" + msg);
}
-----------------------------------------------------------------------------------------------
Outbound Adapter:出项适配器
filewriter 适配器就是从信道中获取消息然后把它们写入文件系统。

在file命名空间下使用outbound-channel-adapter 元素来指定出项适配器。
<file:outbound-channel-adapter
    directory="/Users/mkonda/dev/ws/tmp"
    channel="positions-file-channel"/>
    
这里定义的适配器从positions-file-channel 信道获取消息,然后写入到 directory属性指定的目录中。

综合入项和出项适配器:
<file:inbound-channel-adapter id="inAdapter"
    directory="/Users/mkonda/dev/ws/"
    channel="file-channel">
    <int:poller fixed-rate="1000"/>
</file:inbound-channel-adapter>

<file:outbound-channel-adapter id="outAdapter"
    channel="file-channel"
    directory="/Users/mkonda/dev/ws/tmp"/>
-----------------------------------------------------------------------------------------
独立的文件适配器:
使用standalone 类更加直观,使用一个要写入文件的位置目录实例化 FileWritingMessageHandler 

// set the directory
File directory = new File("/Users/mkonda/dev/ws/tmp");
..
private void startStandaloneWriter() {
    // fetch the channel for incoming feed
    outChannel = ctx.getBean("files-channel",PublishSubscribeChannel.class);
    
    handler = new FileWritingMessageHandler(directory);
    // subscribe to the incoming feed
    outChannel.subscribe(handler);
}
------------------------------------------------------------------------------------------
FTP 适配器:
我们使用File Transfer Protocol来进行远程文件获取和本地文件上载。
我们的框架提供了inbound和outbound信道适配器。

输入信道适配器连接到一个FTP Server来获取远程文件并将它们作为消息Message<File>的负载。
输出信道适配器连接信道,消费消息,并把消息的负载写入到远程服务器目录。

这两个适配器都可以使用ftp命名空间下的inbound-channel-adapter 和 outbound-channeladapter
配置,在配置它们时一个前提条件是连接配置。

Session Factory:会话工厂
适配器应该要知道它所要连接的服务器的详细细节,包括用户名和密码。
框架的DefaultFtpSessionConnectionFactory类提供了这些内容。
我们需要在配置文件中声明它,并设置相应的属性值。
然后将该bean引用给session-factory属性。
<bean name="sessionFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
    <property name="host" value="ftp.madhusudhan.com"/>
    <property name="username" value="jsi"/>
    <property name="password" value="******"/>
    ...
</bean>

有了会话工厂,我们就可以定义FTP适配器了。
Inbound FTP 适配器
使用会话工厂提供的连接远程文件系统,并轮询文件。如果发现文件,它会获取文件并以其作为消息的负载创建消息Message<File>
然后发送给指定的信道进行进一步处理。
<?xml version="1.0" encoding="UTF-8"?>
<beans
    ....
    xmlns:ftp="http://www.springframework.org/schema/integration/ftp"
    xsi:schemaLocation=
    "http://www.springframework.org/schema/integration/ftp
    http://www.springframework.org/schema/integration/ftp/spring-integration
    ftp-2.1.xsd
    ....">
    
    <ftp:inbound-channel-adapter channel="positions-channel"
        session-factory="sessionFactory"
        remote-directory="/feeds/systems/positions/"
        local-directory="/feeds/in/positions/">
        <int:poller fixed-rate="1000"/>
    </ftp:inbound-channel-adapter>
    <bean name="sessionFactory" class="org.springframework.integration.ftp.session.DefaultFtpSessionFactory">
    ...
    </bean>
</beans>

组件使用Session 工厂连接远程服务器,从remote-directory指定的文件目录获取文件,并把它们包裹成消息发布到指定信道。
我们可以通过 filename-pattern, filename-regex等设置获取文件的规则。
这里的local-directory指定的目录是组件在开始轮询远程文件之前会先检查本地这个目录。
一旦所有的本地文件都被发布了,才会开始对远程文件进行轮询和传输。

我们可以定义自己的文件过滤类,使用filter属性指定到配置文件中。

Outbound FTP 适配器:
用于创建一个通过FTP发布消息到远程文件系统的终端。
<ftp:outbound-channel-adapter channel="positions-channel"
    remote-directory="/feeds/systems/positions/"
    session-factory="connectionFactory">
    <int:poller fixed-rate="1000" />
</ftp:outbound-channel-adapter>

-----------------------------------------------------------------------------------------------
Caching Session:缓存会话
框架在双向信道适配器器上创建一个FTP 会话池来优化网络访问。
我们可以通过属性cache-session设置为false,来关闭它。
<ftp:outbound-channel-adapter channel="positions-channel"
cache-sessions="false"
...
</ftp:outbound-channel-adapter>
-----------------------------------------------------------------------------------------------
JMS 适配器:
框架提供了输入和输出适配器来跨外部消息系统接收和发送消息。
输入适配器可以从一个JMS目标地(topic 或者queue)获取消息,然后发布他们到本地信道。
输出适配器可以将本地信道负载转换成JMS消息发布到JMS目的地(topic或者queue).
相关的适配器元素定义在jms命名空间下:

Inbound 适配器:接收消息
从消息系统接收消息可能很复杂,取决于是由消费客户端驱动还是消息提供者驱动。
客户端会基于某种规则轮询消息,服务器也会在消息到达时直接发送给客户端(也就是消息驱动或者事件驱动)。

同步消费者:
inbound-channel-adapter负责从JMS服务器获取消息,通过对端点的配置来连接JMS Server,获取消息,发布消息到一个本地的信道。
这是一种拉消费,在底层,它使用JmsTemplate的receive方法来拉消息。我们也可以提供一个JmsTemplate实例或者同时提供
connectionFactory和destination。
<jms:inbound-channel-adapter id="positionsJmsAdapter"
    connection-factory="connectionFactory"
    destination="positionsQueue"
    channel="positions-channel">
    <int:poller fixed-rate="1000" />
</jms:inbound-channel-adapter>

<!-- destination on ActiveMQ -->
<bean id="positionsQueue" class="org.apache.activemq.command.ActiveMQQueue">
    <constructor-arg value="POSITIONS_QUEUE" />
</bean>
<!-- connection factory for ActiveMQ -->
<bean name="connectionFactory" class="org.apache.activemq.ActiveMQConnectionFactory">
    <property name="brokerURL">
        <value>tcp://localhost:61616</value>
    </property>
</bean>
连接工厂封装连接外部JMS提供者的详细信息连接信息,简单的定义为一个bean,并使用适配器的connection-factory属性
装配到适配器。如果你把该bean的名字定义成connectionFactory,则不需要指定它,适配器会自动找该名字的bean注入。
我们使用ActiveMQ作为提供者,brokerURL指向本地的ActiveMQ服务器。
这里需要注意的是JMS Destination对象,它实质上是JMS技术里的一个Queue,适配器连接本地ActiveMQ服务,检查POSITIONS_QUEUE
获取发现的消息,发布它到本地应用程序信道positions-channel.

Message-Driven Consumers:消息驱动消费
该情形是服务端基于订阅情况来驱动的消费类型。message-driven-channel-adapter 元素定义它,消费者需要一个
Spring MessageListener容器或者一个connectionFactory和destination的组合。
<!-- Event Driven consumer-->
<jms:message-driven-channel-adapter id="msgDrivenPositionsAdapter"
    connection-factory="connectionFactory"
    destination="positionsQueue"
    channel="positions-channel">
</jms:message-driven-channel-adapter>

这里需要将Spring的对象转换为JMS Message,或者将Message转换回Spring对象, extract-payload属性用于转换消息的Payload

关于消息的Payload转换:
我们需要使用转换器从JMS Message中取出payload,然后放到本地message中。框架为我们提供了SimpleMessageConerter
它会将内容转换为我们需要的消息payload,
如果JMS是一个TextMessage,会转换为String,如果是ByteMessage,它会转换为bytes。
需要注意的是只有属性extract-payload设置为true时,转换器才会启动,默认为true。
我们还可以通过message-converter属性指定自定义的转换器。
<jms:message-driven-channel-adapter
    id="msgDrivenPositionsAdapter"
    ...
    message-converter="positionsConverter">
</jms:message-driven-channel-adapter>
<bean id="positionsConverter" class="com.madhusudhan.jsi.adapters.jms.PositionsConverter">
-----------------------------------------------------------------------------------------------------
发布消息:输出适配器
它的任务是从信道获取消息并发布到JMS Queue或者Topic中。
<jms:outbound-channel-adapter channel="positions-channel"
    connection-factory="connectionFactory"
    destination="positionsQueue">
    <int:poller fixed-rate="1000"/>
</jms:outbound-channel-adapter>
底层我们使用JmsSendingMessageHandler,相反的当 extract-payload设置为true时,适配器会转换信道的payload到JMS Message 内容。
==============================================================================================================

JDBC 适配器:
同样分为进项适配器和出向适配器,进向适配器从数据库中获取数据并将结果集作为Message负载发布到本地信道。
出向适配器读取信道的消息数据保存到数据库中。

进向JDBC适配器:
负责读取数据集然后转换成消息,jdbc命名空间下inbound-channel-adapter用于创建这类端点。
适配器提供一个SQL查询和一个目的信道,同时还定义一个Datasource实例,用它来提供相关数据库的连接设置。
<jdbc:inbound-channel-adapter channel="resultset-channel"
    data-source="mySqlDatasource"
    query="SELECT * FROM ACCOUNTS A
    where A.STATUS='NEW' and POLLED='N'">
    <int:poller fixed-rate="1000"/>
</jdbc:inbound-channel-adapter>
上面的设置使用query查询ACCOUNTS表状态为NEW的数据然后转换为Message发布到resultset-channel信道。
这里会将整个查询结果集List作为一个消息的payload,记录的类型依赖于我们行映射策略。

有时候我们不希望轮询结果包含重复的结果,框架提供了update语句追加到每次查询上。
我们每次拉数据时,我们会使用select查询中的特定设置更新记录,来避免获取以更新的数据。

比如我们只希望获取新创建的Account记录,所以我们可以更新名为POLLED的一列。
<jdbc:inbound-channel-adapter channel="resultset-channel"
    data-source="mySqlDatasource"
    query="SELECT * FROM ACCOUNTS A where A.STATUS='NEW' and POLLED='N'"
    update="UPDATE ACCONTS set POLLED='Y' where ACCOUNT_ID in (:ACCOUNT_ID)">
    <int:poller fixed-rate="1000" />
</jdbc:inbound-channel-adapter>
--------------------------------------------------------------------------------------------------
出向JDBC适配器:
用于在数据库中执行SQL查询,而这查询语句是有从输入信道的消息里获取的内容构建的。
所以,出向适配器监听消息信道,获取消息,抽取消息中相关的值,构造查询语句并在数据库上执行该语句。

比如每个出现在信道trade-persistence-channel的Trade消息应该被保存。
正常情况下,我们会写一个消息消费终端来获取每个消息然后用持久化机制来讲消息保存到数据库。
然而,Spring Integration替我们干了这些。
我们只需要配置一个出向适配器:
<jdbc:outbound-channel-adapter
    channel="trades-persistence-channel"
    data-source="mySqlDatasource"
    query="insert into TRADE t(ID,ACCOUNT,INSTRUMENT)
    values(:payload[TRADE_ID], :payload[TRADE_ACCOUNT],:payload[TRADE_INSTRUMENT])">
</jdbc:outbound-channel-adapter>
这里比较有趣的是query参数的设置,我们可以使用payload键标记来规范参数。
每一个输入的消息都会有一个Map类型的payload负载,从这里面我们可以查询ID,ACCOUNT等
同时我们还可以使用headers这个Map值。
qyery="insert into TRADE t(ID,ACCOUNT,INSTRUMENT,EXPIRY)
    values(:payload[TRADE_ID], :payload[TRADE_ACCOUNT],
    :payload[TRADE_INSTRUMENT], :headers[EXPIRY])">
    
Map消息的创建代码:
public Message<Map<String, Object>> createTradeMessage(){
    Map<String, Object> tradeMap = new HashMap<String, Object>();
    tradeMap.put("ID", "1929303d");
    tradeMap.put("ACCOUNT", "ACC12345");
    //..
    // Create a Msg using MessageBuilder
    Message<Map<String, Object>> tradeMsg = MessageBuilder.withPayload(tradeMap).build();
    return tradeMsg;
}
如此,只要消息一到达persistence-channel,它就会被适配器获取并自动在数据库上执行构造的SQL。
 

© 著作权归作者所有

D
粉丝 3
博文 105
码字总数 248750
作品 0
海淀
高级程序员
私信 提问
Spring Integration 4.1 Milestone 1 发布

Spring Integration 4.1 Milestone 1 发布,现已提供在 Milestone Repository,可以使用 Maven 和 Gradle 安装升级,也可以直接下载。 Spring Integration 4.1 Milestone 1 包括一些新特性,...

oschina
2014/09/06
998
0
Spring IO Platform 1.1.5/2.0.1 发布

Spring IO Platform 1.1.5/2.0.1 发布,现已提供在 repo.spring.io 和 Maven Central。Spring IO Platform 1.1.5 是个维护版本,主要包括最新的维护版本: Spring Batch 3.0.6.RELEASE Sprin......

oschina
2015/12/19
1K
2
Spring Boot 1.4.7 发布

Spring Boot 1.4.7 发布了,该版本包含 30 多项更改,主要是依赖性升级和文档改进。部分如下: Update to Spring Integration 4.3.10 Upgrade to Spring Framework 4.3.9.RELEASE Upgrade to...

淡漠悠然
2017/06/08
1K
5
Spring Integration 4.2 M1 发布

Spring Integration 4.2 Milestone 1 发布,此版本更新内容如下: A significant overhaul of the JMX support in the framework, providing performance and other improvements when JMX ......

oschina
2015/05/29
1K
5
Spring IO Platform Brussels SR10 和 Cairo SR1 发布

Spring IO Platform Brussels-SR10 和 Spring IO Platform Cairo-SR1 均已发布,可从 repo.spring.io 和 Maven Central 获取更新。 两者都对许多项目进行了升级,其中有 Spring IO Platform...

局长
2018/05/11
679
0

没有更多内容

加载失败,请刷新页面

加载更多

【JAVA基础☞内部存储和GC】Java方法区和永久代

Java方法区和永久代 这里只讨论HotSpot虚拟机,这也是目前使用的最多的JVM。Sun JDK7 HotSpot虚拟机的内存模型如下图所示: 1、什么是方法区 在Java虚拟机中,方法区是可供各线程共享的运行时...

卯金刀GG
5分钟前
0
0
Spring Boot Actuator

编者注 由于开发一个对性能要求很强的后台应用,突然发现部署到aliyun发生问题,而普通笔记本没有任何问题,则需要持续一段时间的监控定位函数执行时间,分析过程,并添加健康检查的可视化内...

抢小孩糖吃
6分钟前
0
0
Zabbix监控ActiveMQ

当我们在线上使用了ActiveMQ 后,我们需要对一些参数进行监控,比如 消息是否有阻塞,哪个消息队列阻塞了,总的消息数是多少等等。下面我们就通过 Zabbix 结合 Python 脚本来实现对 ActiveMQ...

SEOwhywhy
18分钟前
1
0
非webpack require.js + vue + vueRouter + iView 实现按需加载

适合一个人开发的时候,在整个php框架下,又想单页,又可以直接后端assign变量穿透到模板。又不想写接口搞前后分离脚手架一大堆npm 包, 在php模板下 引入require.js <!DOCTYPE html><html...

一箭落旄头
33分钟前
7
0
新特性解读 | MySQL 8.0 窗口函数详解

原创作者: 杨涛涛 背景 一直以来,MySQL 只有针对聚合函数的汇总类功能,比如MAX, AVG 等,没有从 SQL 层针对聚合类每组展开处理的功能。不过 MySQL 开放了 UDF 接口,可以用 C 来自己写UDF...

爱可生
39分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部