文档章节

Spring Batch 快速入门

慕容若冰
 慕容若冰
发布于 2016/12/09 14:09
字数 1084
阅读 117
收藏 3
点赞 0
评论 0
  1. Spring Batch 是用来处理大量数据操作的框架,主要用来读取大量数据,然后进行一定处理后输出成指定的形式。
  2. Spring Batch 主要组成部分
  • JobRepository:用来注册Job的容器
  • JobLauncher:用来启动Job的接口
  • Job:实际执行的任务,包含一个或多个Step
  • Step:Step包含ItemReader,ItemProcessor,ItemWriter
  • ItemReader:读取数据的接口
  • ItemProcessor:处理数据的接口
  • ItemWriter:输出数据的接口

以上Spring Batch 主要组成部分只需要注册成Spring的Bean即可,在配置类上使用@EnableBatchProcession注解开启批处理支持

package com.springbatch.config;

import com.springbatch.batch.CsvBeanValidator;
import com.springbatch.batch.CsvItemProcessor;
import com.springbatch.batch.CsvJobListener;
import com.springbatch.entity.Person;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.launch.support.RunIdIncrementer;
import org.springframework.batch.core.launch.support.SimpleJobLauncher;
import org.springframework.batch.core.repository.JobRepository;
import org.springframework.batch.core.repository.support.JobRepositoryFactoryBean;
import org.springframework.batch.item.ItemProcessor;
import org.springframework.batch.item.ItemReader;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.database.BeanPropertyItemSqlParameterSourceProvider;
import org.springframework.batch.item.database.JdbcBatchItemWriter;
import org.springframework.batch.item.file.FlatFileItemReader;
import org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper;
import org.springframework.batch.item.file.mapping.DefaultLineMapper;
import org.springframework.batch.item.file.transform.DelimitedLineTokenizer;
import org.springframework.batch.item.validator.Validator;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.ClassPathResource;
import org.springframework.transaction.PlatformTransactionManager;

import javax.sql.DataSource;

/**Batch配置类
 * Created by Administrator on 2016/12/8.
 */
@Configuration
//开启批处理支持
@EnableBatchProcessing
public class CsvBathConfig {

    //ItemReader定义
    @Bean
    public ItemReader<Person> reader() throws Exception {
        //使用FlatFileItemReader读取文件
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        //设置csv文件路径
        reader.setResource(new ClassPathResource("people.csv"));
        //对csv文件的数据和领域模型类做对应映射
        reader.setLineMapper(new DefaultLineMapper<Person>(){{
           setLineTokenizer(new DelimitedLineTokenizer(){{
               setNames(new String[]{"name", "age", "nation", "address"});
           }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>(){{
                setTargetType(Person.class);
            }});
        }});
        return reader;
    }

    //ItemProcessor定义
    @Bean
    public ItemProcessor<Person, Person> processor() {
        //采用自定义的ItemProcessor的实现
        CsvItemProcessor processor = new CsvItemProcessor();
        //指定自定义检验器
        processor.setValidator(csvBeanValidator());
        return processor;
    }

    //ItemWriter定义
    @Bean
    public ItemWriter<Person> writer(DataSource dataSource) {//自动注入dataSource
        //使用jdbc批处理的JdbcBatchItemWriter写数据到数据库
        JdbcBatchItemWriter<Person> writer = new JdbcBatchItemWriter<Person>();
        writer.setItemSqlParameterSourceProvider(new
                BeanPropertyItemSqlParameterSourceProvider<Person>());
        //要执行批处理的sql语句
        String sql = "insert into person " + "(name,age,nation,address) " +
                "values (:name, :age, :nation, :address)";
        writer.setSql(sql);
        writer.setDataSource(dataSource);
        return writer;
    }

    @Bean
    public JobRepository jobRepository(DataSource dataSource,
              PlatformTransactionManager transactionManager) throws Exception {
        JobRepositoryFactoryBean jobRepositoryFactoryBean = new
                JobRepositoryFactoryBean();
        jobRepositoryFactoryBean.setDataSource(dataSource);
        jobRepositoryFactoryBean.setTransactionManager(transactionManager);
        jobRepositoryFactoryBean.setDatabaseType("mysql");
        return jobRepositoryFactoryBean.getObject();
    }

    @Bean
    public SimpleJobLauncher jobLauncher(DataSource dataSource,
               PlatformTransactionManager transactionManager) throws Exception {
        SimpleJobLauncher jobLauncher = new SimpleJobLauncher();
        jobLauncher.setJobRepository(jobRepository(dataSource, transactionManager));
        return jobLauncher;
    }

    @Bean
    public Job importJob(JobBuilderFactory jobs, Step s1) {
        return jobs.get("importJob")
                .incrementer(new RunIdIncrementer())
                .flow(s1)
                .end()
                .listener(csvJobListener())
                .build();
    }

    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory, ItemReader<Person>
                      reader, ItemWriter<Person> writer,
                      ItemProcessor<Person, Person> processor) {
        return stepBuilderFactory
                .get("step1")
                .<Person, Person>chunk(65000)//每次提交65000条数据
                .reader(reader)
                .processor(processor)
                .writer(writer)
                .build();
    }

    @Bean
    public CsvJobListener csvJobListener() {
        return new CsvJobListener();
    }

    @Bean
    public Validator<Person> csvBeanValidator() {
        return new CsvBeanValidator<Person>();
    }

}

3.Job监听

    用于监听Job的执行情况,需定义一个类实现JobExecutionListener,并在定义Job的Bean上绑定该监听器

定义:

public class CsvJobListener implements JobExecutionListener {

    long startTime;
    long endTime;

    @Override
    public void beforeJob(JobExecution jobExecution) {
        startTime = System.currentTimeMillis();
        System.out.println("任务处理开始");
    }

    @Override
    public void afterJob(JobExecution jobExecution) {
        endTime = System.currentTimeMillis();
        System.out.println("任务处理结束");
        System.out.println("耗时:" + (endTime - startTime) + "ms");
    }
}

注册:

@Bean
public Job importJob(JobBuilderFactory jobs, Step s1) {
    return jobs.get("importJob")
            .incrementer(new RunIdIncrementer())
            .flow(s1)
            .end()
            .listener(csvJobListener())
            .build();
}
@Bean
public CsvJobListener csvJobListener() {
    return new CsvJobListener();
}

4.数据读取

    Spring Batch 提供了大量的ItemReader实现

5.数据处理和校验

  • 通过ItemProcessor接口实现来完成
/**
 * 数据处理
 * Created by Administrator on 2016/12/8.
 */
public class CsvItemProcessor extends ValidatingItemProcessor<Person> {

    @Override
    public Person process(Person item) throws ValidationException {
        super.process(item);//调用自定义校验器

        //简单数据处理
        if (item.getNation().equals("汉族")) {
            item.setNation("01");
        } else {
            item.setNation("02");
        }
        return item;
    }
}

 

/**
 * 数据校验
 * Created by Administrator on 2016/12/8.
 */
public class CsvBeanValidator<T> implements Validator<T>, InitializingBean {

    private javax.validation.Validator validator;

    //Validator初始化
    @Override
    public void afterPropertiesSet() throws Exception {
        ValidatorFactory validatorFactory = Validation.buildDefaultValidatorFactory();
        validator = validatorFactory.usingContext().getValidator();
    }

    @Override
    public void validate(T value) throws ValidationException {
        //使用Validator的validate方法校验
        Set<ConstraintViolation<T>> constraintViolations = validator.validate(value);
        if (constraintViolations.size() > 0 ) {
            StringBuilder message = new StringBuilder();
            for (ConstraintViolation<T> constraintViolation : constraintViolations) {
                message.append(constraintViolation.getMessage() + "\n");
            }
            throw new ValidationException(message.toString());
        }
    }
}

定义ItemProcessor时把自定义校验器设置进去

//ItemProcessor定义
@Bean
    public ItemProcessor<Person, Person> processor() {
    //采用自定义的ItemProcessor的实现
    CsvItemProcessor processor = new CsvItemProcessor();
    //指定自定义检验器
    processor.setValidator(csvBeanValidator());
    return processor;
}
@Bean
public Validator<Person> csvBeanValidator() {
    return new CsvBeanValidator<Person>();
}

6.数据输出,Spring Batch 提供了大量的ItemWriter的实现

7.计划任务

    只需在普通计划任务方法中只需JobLauncher的run方法即可

8.参数后置绑定

    可以在JobParameters中绑定参数,定义Bean时使用@StepScope注解,然后通过@Value注入此参数

参数设置:

@RequestMapping("/imp")
public String imp(String fileName) throws Exception {
    String path = fileName + ".csv";
    jobParameters = new JobParametersBuilder()
            .addLong("time", System.currentTimeMillis())
            .addString("input.file.name", path)
            .toJobParameters();
    jobLauncher.run(importJob, jobParameters);
    return "ok";
}

定义Bean:

@Configuration
@EnableBatchProcessing
public class TriggerBatchConfig {
    //ItemReader定义
    @Bean
    @StepScope
    public FlatFileItemReader<Person> reader(@Value("#{jobParameters['input.file.name']}")
                                                         String pathToFile) throws Exception {
        //使用FlatFileItemReader读取文件
        FlatFileItemReader<Person> reader = new FlatFileItemReader<Person>();
        //设置csv文件路径
        reader.setResource(new ClassPathResource(pathToFile));
        //对csv文件的数据和领域模型类做对应映射
        reader.setLineMapper(new DefaultLineMapper<Person>(){{
            setLineTokenizer(new DelimitedLineTokenizer(){{
                setNames(new String[]{"name", "age", "nation", "address"});
            }});
            setFieldSetMapper(new BeanWrapperFieldSetMapper<Person>(){{
                setTargetType(Person.class);
            }});
        }});
        return reader;
    }

源码:https://git.oschina.net/NeedLoser/Spring-Boot.git 的springbatch项目

© 著作权归作者所有

共有 人打赏支持
慕容若冰
粉丝 0
博文 44
码字总数 9774
作品 0
广州
程序员
Spring Boot实战与原理分析

Spring Boot实战与原理分析 1:Spring Boot概述与课程概要介绍 2:Spring4 快速入门 3:Spring4 扩展分析(一) 4:Spring4 扩展分析(二) 5:Spring Boot 快速入门 6:Spring Boot 配置分析...

刘宗泽
06/26
0
0
spring boot框架学习2-spring boot核心(1)

本节主要: 1:解析spring boot入口和@SpringBootApplication源码详解 SpringBootApplication包含: @SpringBootConfiguration @ComponentScan @EnableAutoConfiguration 本文是《凯哥陪你学......

凯哥java
2017/10/27
0
0
spring boot框架学习3-spring boot核心(2)

本节主要: 1:解析spring boot入口和@SpringBootApplication源码详解 SpringBootApplication包含: @SpringBootConfiguration @ComponentScan @EnableAutoConfiguration 本文是《凯哥陪你学......

凯哥java
2017/10/26
0
0
Spring Security4实战与原理分析视频课程( 扩展+自定义)

Spring Security概述与课程概要介绍 Spring Security快速入门(基于XML) Spring Security快速入门(基于XML) URL匹配详解 自定义登陆 配置退出 Ajax登陆退出 JDBC认证 层级角色关系 认证体...

刘宗泽
06/26
0
0
Spring Batch_官网DEMO实现

Spring Batch_官网DEMO实现 http://spring.io/guides/gs/batch-processing/ 使用spring xml方式实现了spring batch官网的demo,现在把具体的代码贴出来,具体的细节配置还要参考官网的说明。...

秋风醉了
2014/11/03
0
0
SpringCloud SpringBoot 分布式微服务云架构 构建RESTful API

首先,回顾并详细说明一下在快速入门中使用的、、注解。如果您对Spring MVC不熟悉并且还没有尝试过快速入门案例,建议先看一下快速入门的内容。 :修饰class,用来创建处理http请求的对象 :...

itcloud
07/09
0
0
【第2章 Spring快速入门】2.2 WEB应用之HelloWorld

上一节:【第2章 Spring快速入门】2.1 Java应用之HelloWorld 本章节主要介绍在WEB开发过程中,Spring是怎么使用的?上一节中我们已经讲到在普通的Java Project中如何使用Spring容器了,那么在...

陶邦仁
2015/07/07
0
0
Spring Batch_Multi-threaded Step

Spring Batch_Multi-threaded Step spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html The simplest way to start parallel processing is to ad......

秋风醉了
2014/11/13
0
0
Spring Batch_Configuring a Step for Restart

Spring Batch_Configuring a Step for Restart spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html#stepRestart 当一个普通的 job 处于complete ......

秋风醉了
2014/11/14
0
0
Duplicate entry '0' for key 'PRIMARY'_Spring Batch

Duplicate entry '0' for key 'PRIMARY'Spring Batch 在运行一个job成功后,在运行一次,会出现这个异常Duplicate entry '0' for key 'PRIMARY',也就是主键冲突。 在truncate 表之后执行以下...

秋风醉了
2014/11/12
0
1

没有更多内容

加载失败,请刷新页面

加载更多

下一页

DUBBO 详细介绍

摘要: 主要核心部件: Remoting: 网络通信框架,实现了 sync-over-async 和 request-response 消息机制. RPC: 一个远程过程调用的抽象,支持负载均衡、容灾和集群功能 Registry: 服务目录框架...

明理萝
11分钟前
0
1
4 个快速的 Python 编译器 for 2018

简评:Python 和其他的解释型语言一样经常被吐槽性能不行,所以开发人员为了提升性能创建了不少编译器,本文则选取其中的四个做了基准测试。 Python 其实是一种相当快的语言,但它并不像编译...

极光推送
14分钟前
0
0
spring boot注册多个MQ服务器的问题

关于注册到多个MQ源的文章已经有很多了,这里记录一下声明queue的坑; 如果使用注册bean的方式声明queue,会导致声明的queue同时被注册到所有的MQ源上; //如果使用下面的声明方式,que...

placeholder
16分钟前
0
0
Java面试基础篇——第九篇:BIO,NIO,AIO的区别

现在IO模型主要分三类:BIO(同步阻塞IO),NIO(同步非阻塞IO),AIO()。 先来看看BIO。 1. BIO 服务端接受到请求后,要指派或新建一个线程去处理客户端的IO请求,直到收到断开连接的指令。这么做...

developlee的潇洒人生
21分钟前
0
0
@RequestMapping @ResponseBody 和 @RequestBody 用法与区别

1.@RequestMapping 国际惯例先介绍什么是@RequestMapping,@RequestMapping 是一个用来处理请求地址映射的注解,可用于类或方法上。用于类上,表示类中的所有响应请求的方法都是以该地址作为...

特拉仔
23分钟前
1
0
基于 HTML5 结合互联网+ 的 3D 隧道

前言 目前,物资采购和人力成本是隧道业发展的两大瓶颈。比如依靠民间借贷,融资成本很高;采购价格不透明,没有增值税发票;还有项目管控和供应链管理的问题。成本在不断上升,利润在不断下...

xhload3d
25分钟前
0
0
济南小程序热度分析

原文链接:http://www.jnqianle.cn/company/2072.html

tianma3798
26分钟前
1
0
大数据软件

beats 采集 kafka spark hive es grafana zeppelin

ArlenXu
28分钟前
0
0
Mac item2常用快捷键

标签 新建标签:command + t 关闭标签:command + w 切换标签:command + 数字 command + 左右方向键 切换全屏:command + enter 查找:command + f 分屏 水平分屏:command + d 垂直分屏:c...

说回答
31分钟前
0
0
mac常用软件

1.excel for mac http://www.pc6.com/mac/114205.html

小黑202
31分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部