文档章节

Spring Integration

慕容若冰
 慕容若冰
发布于 2016/12/09 22:34
字数 839
阅读 783
收藏 1
  1. Spring Integration主要解决的问题是不同系统之间交互的问题,通过异步消息驱动来达到系统交互时系统之间的松耦合。
  2. Spring Integration主要由Message,Channel和Message EndPoint组成。
  3. Message:用来在不同部分之间传递的数据。由消息体(payload)和消息头(header)。消息体可以是任何数据类型(xml,json,Java对象);消息头的元数据就是解释消息体的内容。
  4. Channel:消息发送者发送消息到通道,消息接收者从通道接收消息。
  5. Message EndPoint:是真正处理xiao消息的组件,还可以控制通道的路由。可用的消息端点有:

(1)Channel Adapter:是一种连接外部或传输协议的端点,分为入站(inbound)和出站(outbound),通道适配器是单向的,入站通道适配器只支持接收消息,出站通道适配器只支持输出消息。

      6. Spring Integration Java DSL :Spring Integration 提供IntegrationFlow来定义系统继承流程,

        通过IntegrationFlows和IntegrationFlowBuilder来实现使用Fluent API来定义流程。

        一个简单的流程定义如下:

@Bean
public IntegrationFlow demoFlow() {
   return IntegrationFlows.from("input")//从Channel input获取消息
         .<SyndEntry, String> transform(Integer::parseInt)//将消息转换成整数
         .get();//获取集成流程并注册为Bean
}

完整示例:

package com.integration;

import com.rometools.rome.feed.synd.SyndEntry;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.core.io.Resource;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.channel.MessageChannels;
import org.springframework.integration.dsl.core.Pollers;
import org.springframework.integration.dsl.file.Files;
import org.springframework.integration.dsl.mail.Mail;
import org.springframework.integration.feed.inbound.FeedEntryMessageSource;
import org.springframework.integration.file.support.FileExistsMode;
import org.springframework.integration.scheduling.PollerMetadata;

import java.io.File;
import java.io.IOException;

import static java.lang.System.getProperty;

@SpringBootApplication
public class IntegrationApplication {

   //自动获得资源
   @Value("https://spring.io/blog.atom")
   Resource resource;

   public static void main(String[] args) {
      SpringApplication.run(IntegrationApplication.class, args);
   }

   //配置默认的轮询方式
   @Bean(name = PollerMetadata.DEFAULT_POLLER)
   public PollerMetadata poller() {
      return Pollers.fixedRate(500).get();
   }

   @Bean
   public FeedEntryMessageSource feedMessageSource() throws IOException {
      FeedEntryMessageSource messageSource = new
            FeedEntryMessageSource(resource.getURL(), "news");
      return messageSource;
   }

   @Bean
   public IntegrationFlow myFlow() throws IOException {
      return IntegrationFlows.from(feedMessageSource())
            //通过route来选择路由,消息体(payload)的类型为SyndEntry,判断条件类型为String
            //判断的值通过payload获得的分类(Categroy)
            //不同的分类值转向不同的消息通道
            .<SyndEntry, String>route(payload -> payload.getCategories().get(0).getName(),
                  mapping -> mapping.channelMapping("releases",
                        "releasesChannel")
            .channelMapping("engineering",
                  "engineeringChannel")
            .channelMapping("news",
                  "newsChannel"))
            .get();//获得IntegrationFlow实体,配置为Spring的Bean
   }

   //releases流程
   @Bean
   public IntegrationFlow releasesFlow() {
      return IntegrationFlows.from(MessageChannels.queue("releasesChannel",
            10))   //从releasesChannel获取数据
            .<SyndEntry, String>transform( //transform方法做数据转换,payload类型为SynEntry,将其转换为字符串类型,
                  // 并自定义数据的格式
                  payload -> "《" + payload.getTitle() + "》" +
                        payload.getLink() + getProperty("line.separator"))
            //handle方法处理file的出站适配器,Files类是由Spring Integration Java DSL提供的
            // Fluent API 用来构造文件输出的适配器
            .handle(Files.outboundAdapter(new File("g:/springblog"))
            .fileExistsMode(FileExistsMode.APPEND)
            .charset("UTF-8")
            .fileNameGenerator(message -> "releases.txt")
            .get())
            .get();
   }

   //engineering流程,与releases流程相同
   @Bean
   public IntegrationFlow engineeringFlow() {
      return IntegrationFlows.from(MessageChannels.queue("engineeringChannel", 10))
            .<SyndEntry, String> transform(
                  e -> "《" + e.getTitle() + "》" + e.getLink() +
                        getProperty("line.separator"))
            .handle(Files.outboundAdapter(new File("g:/springblog"))
            .fileExistsMode(FileExistsMode.APPEND)
            .charset("UTF-8")
            .fileNameGenerator(message -> "engineering.txt")
            .get())
            .get();
   }

   //news流程
   @Bean
   public IntegrationFlow newsFlow() {
      return IntegrationFlows.from(MessageChannels.queue("newsChannel", 10))
            .<SyndEntry, String> transform(
                  payload -> "《" + payload.getTitle() + "》" +
                        payload.getLink() + getProperty("line.separator"))
            //提供enrichHeaders方法增加消息头的信息
            .enrichHeaders(
                  Mail.headers()
                  .subject("来自Spring的新闻")
                  .to("guang2_tan@126.com")
                  .from("guang2_tan@126.com"))
            //邮件发送的相关信息通过Spring Integration Java DSL提供的Mail的headers方法来构造
            //使用handle方法自定义邮件发送的出站适配器,使用Spring Integration Java DSL提供的Mail.outboundAdapter来构造
            //使用guang2_tan@126.com向自己发送邮件
            .handle(Mail.outboundAdapter("smtp.126.com")
            .port(25)
            .protocol("smtp")
            .credentials("guang2_tan@126.com", "abc1234")
                  .javaMailProperties(p -> p.put("mail.debug", "false")),
                  e -> e.id("smtpOut"))
            .get();
   }
}

可能会出现:javax.mail.AuthenticationFailedException: 550

解决办法:可能邮箱没有开通pop/stmp协议

© 著作权归作者所有

共有 人打赏支持
慕容若冰
粉丝 0
博文 44
码字总数 9774
作品 0
广州
程序员

暂无文章

搬瓦工镜像站bwh1.net被DNS污染,国内打不开搬瓦工官网

今天下午(2018年10月17日),继搬瓦工主域名bandwagonhost.com被污染后,这个国内的镜像地址bwh1.net也被墙了。那么目前应该怎么访问搬瓦工官网呢? 消息来源:搬瓦工优惠网->搬瓦工镜像站b...

flyzy2005
今天
1
0
SpringBoot自动配置

本篇介绍下,如何通过springboot的自动配置,将公司项目内的依赖jar,不需要扫描路径,依赖jar的情况下,就能将jar内配置了@configuration注解的类,创建到IOC里面 介绍下开发环境 JDK版本1.8 spr...

贺小五
今天
3
0
命令行新建Maven多项目

参考地址 # DgroupId 可以理解为包名# DartifactId 可以理解为项目名mvn archetype:generate -DgroupId=cn.modfun -DartifactId=scaffold -DarchetypeArtifactId=maven-archetype-quickst......

阿白
今天
1
0
OSChina 周四乱弹 —— 上帝对我单身年限的惩罚越来越长了

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @达尔文:分享张卫健的单曲《身体健康》 《身体健康》- 张卫健 手机党少年们想听歌,请使劲儿戳(这里) 昨天是重阳节咯, 可惜小小编辑总是晚...

小小编辑
今天
12
1
django rest framework 外键序列化方法与问题总结

django rest framework 外键序列化方法与问题总结 当借口中需要出现一对多关系的时候,我们可以用rest_framwork的序列化功能来处理,代码如下. # models.pyfrom django.db import modelscl...

_Change_
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部