文档章节

Camel In Action 读书笔记 (8)

vidy_tu
 vidy_tu
发布于 2013/06/16 16:36
字数 1059
阅读 2.1K
收藏 3

第8章Enterprise integration patterns是core Camel的最后一章了,第二章已经介绍了一部分camel在eip中的应用。这一章全部都是讲eip.

看来eip确实是camel的核心,camel确实是基于eip的。

这一章包含了5方面的EIP问题:

The Aggregator EIP 消息合并
The Splitter EIP  消息分拆
The Routing Slip EIP 根据消息标签进行路由(也是一种路由)
The Dynamic Router EIP 动态路由
The Load Balancer EIP 负载均衡

The Aggregator EIP

消息合并模式,如下图所示,只有当接收到三个标识相同的消息合并完成后再发送出去。1为消息标识,ABC为消息内容。

image

 

关于消息合并我们需要关注三个方面:

1.Correlation identifier

消息合并的标识,即如何确定这几个消息是一组的。如上图标识的1,2

2.Completion condition

消息合并完成的条件,即多少个消息,多长时间内的消息合并。如上图3个消息合并完成。

3.Aggregation strategy

消息合并的策略,即消息是如何合并的。这个需要自己实现接口AggregationStrategy。

public void configure() throws Exception {
    from("direct:start")
        .log("Sending ${body}")
        .aggregate(xpath(
/order/@customer), new MyAggregationStrategy()) //定义合并标示
           .completionSize(2).completionTimeout(5000)  //定义合并完成条件消息数量为2,等待时间为5s
            .log("Sending out ${body}")
            .to("mock:result");
}

消息合并的是需要等待的,默认情况下Camel是将收到的消息缓存在内存中,在生产中为了消息的安全,可能需要将收到的消息持久化。

Camel提供了接口AggregationRepository帮助我们实现消息持久化,如下。

AggregationRepository myRepo = new
    HawtDBAggregationRepository("myrepo", "data/myrepo.dat");
from("file://target/inbox")
    .log("Consuming ${file:name}")

.aggregate(constant(true), new MyAggregationStrategy())
    .aggregationRepository(myRepo)
    .completionSize(3)
    .log("Sending out ${body}")
    .to("mock:result");
HawtDB是Camel自带的一个嵌入式数据库。

The Splitter EIP 

消息分拆模式与前一种模式刚好相反,是将接收到的消息按一定的规则分拆成多个消息发出去,如下图所示。

image

Camel对消息的分拆也提供了多种方法:

如果消息本身就是一个集合,可以用DSL中的split对消息分拆,示例如下:

public class SplitterABCTest extends CamelTestSupport {
    public void testSplitABC() throws Exception {
        MockEndpoint mock = getMockEndpoint("mock:split");
        mock.expectedBodiesReceived("A", "B", "C");
        List<String> body = new ArrayList<String>();
        body.add("A");
        body.add("B");
        body.add("C");
        template.sendBody("direct:start", body);
        assertMockEndpointsSatisfied();
    }
    protected RouteBuilder createRouteBuilder() throws Exception {
        return new RouteBuilder() {
            public void configure() throws Exception {
                from("direct:start")
                    .split(body())                   
                        .log("Split line ${body}")
                        .to("mock:split");
            }
        };
    }
}

如果消息本身是一个对象我们也可能通过bean的形式对消息分拆,示例如下:
public void configure() throws Exception {
    from("direct:start")
        .split().method(CustomerService.class, "splitDepartments")
            .to("log:split")
            .to("mock:split");
}

最后对大消息的分拆可能采用流的形式如.split(body().tokenize("\n")).streaming()

The Routing Slip EIP

有些特殊的消息我们需要根据消息来动态的路由到一个多个结点。这是我们可以采用routing Slip的模式,示例如下:

public void testRoutingSlip() throws Exception {
    getMockEndpoint("mock:a").expectedMessageCount(1);
    getMockEndpoint("mock:b").expectedMessageCount(0);
    getMockEndpoint("mock:c").expectedMessageCount(1);
    template.sendBodyAndHeader("direct:start", "Hello World",
                               "mySlip", "mock:a,mock:c");
    assertMockEndpointsSatisfied();
}

路由定义为:from("direct:start").routingSlip("mySlip", ";");

关于Routing Slip的形式也有多种方式:

各split一样可以采用dsl的形式,bean的形式同时camel还是提供了注解@RoutingSlip。

The Dynamic Router EIP

动态路由和Routing Slip一样也是不同的消息路由到不同的节点,如下。

public class DynamicRouterBean {
public String route(String body,
    @Header(Exchange.SLIP_ENDPOINT) String previous) {    
    return whereToGo(body, previous);                   
}
private String whereToGo(String body, String previous) {
    if (previous == null) {
        return "mock://a";
    } else if ("mock://a".equals(previous)) {
        return "language://simple:Bye ${body}";
    } else {
        return null;                           
    }
}
基本可以同上,也提供了注解的形式@DynamicRouter。

The Load Balancer EIP

负载均衡就不用细说了,Camel提供了对负载均衡的支持,示例如下:

from("direct:start")
    .loadBalance().roundRobin()
        .to("seda:a").to("seda:b")
    .end();

关于负载均衡的策略Camel也提供了6种策略:Random,Round robin,Sticky,Topic,Failover,Custom。

更详细的信息请查询手册。

小结:至此Camel 的核心部分已经介绍完了,后面会再写一篇关于Camel的事务控制(第9章),额外介绍下Camel关于路由质量的监控BAM组件的使用,

其他的章节就不再介绍了。

© 著作权归作者所有

vidy_tu

vidy_tu

粉丝 34
博文 18
码字总数 7631
作品 0
武汉
程序员
私信 提问
加载中

评论(7)

vidy_tu
vidy_tu 博主

引用来自“西夏一品堂”的评论

<dynamicRouter ignoreInvalidEndpoints="true">
    <method ref="slipRouter" method="route"/>
  </dynamicRouter>
camel为啥要死循环去执行这个method
routingSlip 是一次返回一个和多个endpoint , dynamicRouter 是一次返回一个endpoint ,当返回为空时退出 package local.camel; import org.apache.camel.Exchange; import org.apache.camel.ProducerTemplate; import org.apache.camel.builder.RouteBuilder; import org.apache.camel.impl.DefaultCamelContext; import org.apache.camel.model.language.MethodCallExpression; public class RoteTest { public static int cnt = 0; public static void main(String[] args) throws Exception { DefaultCamelContext context = new DefaultCamelContext(); RouteBuilder rb = new RouteBuilder() { @Override public void configure() throws Exception { from("direct://a").routingSlip(new MethodCallExpression(RoteTest.class, "slip")); from("direct://b").bean(RoteTest.class, "log"); from("direct://c").bean(RoteTest.class, "log"); from("direct://d").dynamicRouter(new MethodCallExpression(RoteTest.class, "dynamic")); } }; rb.addRoutesToCamelContext(context); context.start(); ProducerTemplate template =context.createProducerTemplate(1); template.sendBody("direct://a", "Hello"); //template.sendBody("direct://d", "Hello"); context.shutdown(); } public void log(String str){ System.out.println(str); } public String slip(){ return "direct://b,direct://c"; } public String dynamic(){ if(cnt < 3){ ++cnt; return "direct://b"; } return null; } }
西夏一品堂
西夏一品堂
<dynamicRouter ignoreInvalidEndpoints="true">
    <method ref="slipRouter" method="route"/>
  </dynamicRouter>
camel为啥要死循环去执行这个method
西夏一品堂
西夏一品堂
routingSlip 和 DynamicRouter有什么区别?
vidy_tu
vidy_tu 博主

引用来自“夕水溪下”的评论

引用来自“滄海一夢”的评论

引用来自“夕水溪下”的评论

这书我记得没中文版本吧

没有,好像有个网友自己翻译了一张,开始也在想是翻译,还是写个笔记,觉得翻译工程太大了,所以就写了这个系列。

你可以翻译后然后写本书,哈

没那勇气,也没那毅力。呵呵。够用就好。
lateron
lateron

引用来自“滄海一夢”的评论

引用来自“夕水溪下”的评论

这书我记得没中文版本吧

没有,好像有个网友自己翻译了一张,开始也在想是翻译,还是写个笔记,觉得翻译工程太大了,所以就写了这个系列。

你可以翻译后然后写本书,哈
vidy_tu
vidy_tu 博主

引用来自“夕水溪下”的评论

这书我记得没中文版本吧

没有,好像有个网友自己翻译了一张,开始也在想是翻译,还是写个笔记,觉得翻译工程太大了,所以就写了这个系列。
lateron
lateron
这书我记得没中文版本吧
Camel In Action 读书笔记 (5)

接下来说说第四章, 第四章介绍如何在Camel中使用普通bean,关于bean的使用方式前面已有介绍:javaDSL和SpringDSL两种方式。 下面讲下bean的Camel中的使用模式。 The Service Activator patt...

vidy_tu
2013/06/09
611
0
Camel In Action 读书笔记 (6)

今天继续学习第五章,Error handling. Camel作为一个集成框架,需要与外部系统对接。外部系统的不稳定因素导致其异常处理也相对复杂些。 Camel将异常分为两大类: 一是 irrecoverable error(...

vidy_tu
2013/06/11
1.1K
1
Camel In Action 读书笔记 (2)

在1.4中介绍了Camel的整体架构,以及Camel中的一些概念. 架构图 关于各个概念不在此详述. Camel的主要流程: 1:创建CamelContext CamelContext context = new DefaultCamelContext(); 2.添加C...

vidy_tu
2013/06/01
1.6K
0
Apache Camel 2.17.1 发布,路由以及媒介引擎

Apache Camel 2.17.1 发布了,一些提升及新特性: [CAMEL-9574] - Be able to force one-way operation when using camel-cxf transport [CAMEL-9883] - Add a SpringCache based idempotent......

淡漠悠然
2016/05/09
1.2K
2
Apache Camel 3.0.0 发布,十年来首个主要版本

Apache Camel 3.0.0 发布了,这是自 2009 年 8 月 25 日发布以来,该项目十年来第一个主要版本。 Camel 是一个基于规则的路由以及媒介引擎,它提供了一个基于 POJO 的企业集成模式的实现,开...

oschina
2019/12/03
7.6K
9

没有更多内容

加载失败,请刷新页面

加载更多

随机梯度下降

本文首发自公众号:RAIS,点击直接关注。 前言 本系列文章为 《Deep Learning》 读书笔记,可以参看原书一起阅读,效果更佳。 梯度下降算法 在机器学习或深度学习中,模型的训练在一般都会被...

我是任玉琢
28分钟前
20
0
Navicat for MySQL下载安装和破解教程

1.进https://navicatformysql.en.softonic.com/官网 2.第二步 3.第三步等待下载完成 4.第四步双击 二,破解 1.链接:https://pan.baidu.com/s/1CjV7JVzi7pVqlxKMQ3S8wg 密码:hk59 下载后解压...

osc_zgt6zhsy
28分钟前
23
0
Modbus协议和应用开发介绍

因业务需要了解Modbus协议的使用,因此对Modbus的协议,以及相应的C#处理应用进行了解,针对协议的几种方式(RTU、ASCII、TCPIP)进行了封装,以及对Modbus的各种功能码的特点进行了详细的了...

osc_9yulefcq
30分钟前
17
0
golang设置代理

golang.org打不开,设置一下代理. 打开 MODULE: set GO111MODULE=on //windowsexport GO111MODULE=on //linux 设置代理: // set GOPROXY=https://proxy.golang.orgset GOPROXY=https://mi......

漫步海边小路
30分钟前
19
0
01.ElasticSearch (RestFul Api 基本操作)

创建操作 创建索引(库) #number_of_shards 设置分片#number_of_replicas 设置备份PUT 索引{ "settings": { "number_of_shards": 1, "number_of_replicas": 0 }} 创建映射类......

Pole丶逐
31分钟前
20
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部