文档章节

Spring Integration

慕容若冰
 慕容若冰
发布于 2016/12/09 22:34
字数 839
阅读 480
收藏 1
点赞 0
评论 0
  1. Spring Integration主要解决的问题是不同系统之间交互的问题,通过异步消息驱动来达到系统交互时系统之间的松耦合。
  2. Spring Integration主要由Message,Channel和Message EndPoint组成。
  3. Message:用来在不同部分之间传递的数据。由消息体(payload)和消息头(header)。消息体可以是任何数据类型(xml,json,Java对象);消息头的元数据就是解释消息体的内容。
  4. Channel:消息发送者发送消息到通道,消息接收者从通道接收消息。
  5. Message EndPoint:是真正处理xiao消息的组件,还可以控制通道的路由。可用的消息端点有:

(1)Channel Adapter:是一种连接外部或传输协议的端点,分为入站(inbound)和出站(outbound),通道适配器是单向的,入站通道适配器只支持接收消息,出站通道适配器只支持输出消息。

      6. Spring Integration Java DSL :Spring Integration 提供IntegrationFlow来定义系统继承流程,

        通过IntegrationFlows和IntegrationFlowBuilder来实现使用Fluent API来定义流程。

        一个简单的流程定义如下:

@Bean
public IntegrationFlow demoFlow() {
   return IntegrationFlows.from("input")//从Channel input获取消息
         .<SyndEntry, String> transform(Integer::parseInt)//将消息转换成整数
         .get();//获取集成流程并注册为Bean
}

完整示例:

package com.integration;

import com.rometools.rome.feed.synd.SyndEntry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.file.Files;
import org.springframework.integration.dsl.mail.Mail;
import org.springframework.integration.feed.inbound.FeedEntryMessageSource;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.scheduling.PollerMetadata;

import java.io.File;
import java.io.IOException;

import static java.lang.System.getProperty;

@SpringBootApplication
public class IntegrationApplication {

   //自动获得资源
   @Value("https://spring.io/blog.atom")
   Resource resource;

   public static void main(String[] args) {
      SpringApplication.run(IntegrationApplication.class, args);
   }

   //配置默认的轮询方式
   @Bean(name = PollerMetadata.DEFAULT_POLLER)
   public PollerMetadata poller() {
      return Pollers.fixedRate(500).get();
   }

   @Bean
   public FeedEntryMessageSource feedMessageSource() throws IOException {
      FeedEntryMessageSource messageSource = new
            FeedEntryMessageSource(resource.getURL(), "news");
      return messageSource;
   }

   @Bean
   public IntegrationFlow myFlow() throws IOException {
      return IntegrationFlows.from(feedMessageSource())
            //通过route来选择路由,消息体(payload)的类型为SyndEntry,判断条件类型为String
            //判断的值通过payload获得的分类(Categroy)
            //不同的分类值转向不同的消息通道
            .<SyndEntry, String>route(payload -> payload.getCategories().get(0).getName(),
                  mapping -> mapping.channelMapping("releases",
                        "releasesChannel")
            .channelMapping("engineering",
                  "engineeringChannel")
            .channelMapping("news",
                  "newsChannel"))
            .get();//获得IntegrationFlow实体,配置为Spring的Bean
   }

   //releases流程
   @Bean
   public IntegrationFlow releasesFlow() {
      return IntegrationFlows.from(MessageChannels.queue("releasesChannel",
            10))   //从releasesChannel获取数据
            .<SyndEntry, String>transform( //transform方法做数据转换,payload类型为SynEntry,将其转换为字符串类型,
                  // 并自定义数据的格式
                  payload -> "《" + payload.getTitle() + "》" +
                        payload.getLink() + getProperty("line.separator"))
            //handle方法处理file的出站适配器,Files类是由Spring Integration Java DSL提供的
            // Fluent API 用来构造文件输出的适配器
            .handle(Files.outboundAdapter(new File("g:/springblog"))
            .fileExistsMode(FileExistsMode.APPEND)
            .charset("UTF-8")
            .fileNameGenerator(message -> "releases.txt")
            .get())
            .get();
   }

   //engineering流程,与releases流程相同
   @Bean
   public IntegrationFlow engineeringFlow() {
      return IntegrationFlows.from(MessageChannels.queue("engineeringChannel", 10))
            .<SyndEntry, String> transform(
                  e -> "《" + e.getTitle() + "》" + e.getLink() +
                        getProperty("line.separator"))
            .handle(Files.outboundAdapter(new File("g:/springblog"))
            .fileExistsMode(FileExistsMode.APPEND)
            .charset("UTF-8")
            .fileNameGenerator(message -> "engineering.txt")
            .get())
            .get();
   }

   //news流程
   @Bean
   public IntegrationFlow newsFlow() {
      return IntegrationFlows.from(MessageChannels.queue("newsChannel", 10))
            .<SyndEntry, String> transform(
                  payload -> "《" + payload.getTitle() + "》" +
                        payload.getLink() + getProperty("line.separator"))
            //提供enrichHeaders方法增加消息头的信息
            .enrichHeaders(
                  Mail.headers()
                  .subject("来自Spring的新闻")
                  .to("guang2_tan@126.com")
                  .from("guang2_tan@126.com"))
            //邮件发送的相关信息通过Spring Integration Java DSL提供的Mail的headers方法来构造
            //使用handle方法自定义邮件发送的出站适配器,使用Spring Integration Java DSL提供的Mail.outboundAdapter来构造
            //使用guang2_tan@126.com向自己发送邮件
            .handle(Mail.outboundAdapter("smtp.126.com")
            .port(25)
            .protocol("smtp")
            .credentials("guang2_tan@126.com", "abc1234")
                  .javaMailProperties(p -> p.put("mail.debug", "false")),
                  e -> e.id("smtpOut"))
            .get();
   }
}

可能会出现:javax.mail.AuthenticationFailedException: 550

解决办法:可能邮箱没有开通pop/stmp协议

© 著作权归作者所有

共有 人打赏支持
慕容若冰
粉丝 0
博文 43
码字总数 9774
作品 0
广州
程序员

暂无相关文章

Sqoop

1.Sqoop: 《=》 SQL to Hadoop 背景 1)场景:数据在RDBMS中,我们如何使用Hive或者Hadoop来进行数据分析呢? 1) RDBMS ==> Hadoop(广义) 2) Hadoop ==> RDBMS 2)原来可以通过MapReduce I...

GordonNemo ⋅ 44分钟前 ⋅ 0

全量构建和增量构建的区别

1.全量构建每次更新时都需要更新整个数据集,增量构建只对需要更新的时间范围进行更新,所以计算量会较小。 2.全量构建查询时不需要合并不同Segment,增量构建查询时需要合并不同Segment的结...

无精疯 ⋅ 54分钟前 ⋅ 0

如何将S/4HANA系统存储的图片文件用Java程序保存到本地

我在S/4HANA的事务码MM02里为Material维护图片文件作为附件: 通过如下简单的ABAP代码即可将图片文件的二进制内容读取出来: REPORT zgos_api.DATA ls_appl_object TYPE gos_s_obj.DA...

JerryWang_SAP ⋅ 今天 ⋅ 0

云计算的选择悖论如何对待?

导读 人们都希望在工作和生活中有所选择。但心理学家的调查研究表明,在多种选项中进行选择并不一定会使人们更快乐,甚至不会产生更好的决策。心理学家Barry Schwartz称之为“选择悖论”。云...

问题终结者 ⋅ 今天 ⋅ 0

637. Average of Levels in Binary Tree - LeetCode

Question 637. Average of Levels in Binary Tree Solution 思路:定义一个map,层数作为key,value保存每层的元素个数和所有元素的和,遍历这个树,把map里面填值,遍历结束后,再遍历这个map,把每...

yysue ⋅ 今天 ⋅ 0

IDEA配置和使用

版本控制 svn IDEA版本控制工具不能使用 VCS-->Enable Version Control Integration File-->Settings-->Plugins 搜索Subversion,勾选SVN和Git插件 删除.idea文件夹重新生成项目 安装SVN客户......

bithup ⋅ 今天 ⋅ 0

PE格式第三讲扩展,VA,RVA,FA的概念

作者:IBinary 出处:http://www.cnblogs.com/iBinary/ 版权所有,欢迎保留原文链接进行转载:) 一丶VA概念 VA (virtual Address) 虚拟地址的意思 ,比如随便打开一个PE,找下它的虚拟地址 这边...

simpower ⋅ 今天 ⋅ 0

180623-SpringBoot之logback配置文件

SpringBoot配置logback 项目的日志配置属于比较常见的case了,之前接触和使用的都是Spring结合xml的方式,引入几个依赖,然后写个 logback.xml 配置文件即可,那么在SpringBoot中可以怎么做?...

小灰灰Blog ⋅ 今天 ⋅ 0

冒泡排序

原理:比较两个相邻的元素,将值大的元素交换至右端。 思路:依次比较相邻的两个数,将小数放在前面,大数放在后面。即在第一趟:首先比较第1个和第2个数,将小数放前,大数放后。然后比较第...

人觉非常君 ⋅ 今天 ⋅ 0

Vagrant setup

安装软件 brew cask install virtualboxbrew cask install vagrant 创建project mkdir -p mst/vmcd mst/vmvagrant init hashicorp/precise64vagrant up hashicorp/precise64是一个box......

遥借东风 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部