文档章节

Spring Batch_Parallel Steps

秋风醉了
 秋风醉了
发布于 2014/11/14 11:15
字数 944
阅读 1786
收藏 3
点赞 0
评论 2

Spring Batch_Parallel Steps_使用并行的Step

spring 官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html#scalabilityParallelSteps

As long as the application logic that needs to be parallelized can be split into distinct responsibilities, and assigned to individual steps then it can be parallelized in a single process. Parallel Step execution is easy to configure and use, for example, to execute steps (step1,step2) in parallel with step3, you could configure a flow like this:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

The configurable "task-executor" attribute is used to specify which TaskExecutor implementation should be used to execute the individual flows. The default is SyncTaskExecutor, but an asynchronous TaskExecutor is required to run the steps in parallel. Note that the job will ensure that every flow in the split completes before aggregating the exit statuses and transitioning.

 

多个step之间的并行化,可以提高批处理的效率。什么情况下可以应用step之间的并行化,那就要根据具体的业务需求来定。

那我们假设有这样一种场景:有一类数据,分别存在于文件和数据库,数据的内容一样,只是形式不一样,那么我们可以定义并行的step来分别处理来自文件的数据和来自数据库的数据,然后,分别进行同样的processor,然后写入数据库。

 

下面我们就刚才讲的那种场景实现我们并行的step,如下:

spring-batch-split.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
	<!-- 并行的step -->
	<batch:job id="addPeopleDescJob">
		<batch:split id="split1" task-executor="taskExecutor">
			<batch:flow>
				<batch:step id="parallel_step_1">
					<batch:tasklet>
						<batch:chunk reader="peopleAddDescReader_db"
							processor="addDescProcessor" writer="addDescPeopleWriter"
							commit-interval="10" />
					</batch:tasklet>
				</batch:step>
			</batch:flow>
			<batch:flow>
				<batch:step id="parallel_step_2">
					<batch:tasklet>
						<batch:chunk reader="peopleAddDescReader_file"
							processor="addDescProcessor" writer="addDescPeopleWriter"
							commit-interval="10" />
					</batch:tasklet>
				</batch:step>
			</batch:flow>
		</batch:split>
	</batch:job>

	<!-- 从数据库读取数据的reader -->
	<bean id="peopleAddDescReader_db" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!-- 从文件中读取数据的reader -->
	<bean id="lineTokenizer"
		class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
		<property name="delimiter" value="," />
		<property name="names">
			<list>
				<value>firstName</value>
				<value>lastName</value>
			</list>
		</property>
	</bean>
	<bean id="fieldSetMapper"
		class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
		<property name="prototypeBeanName" value="people" />
	</bean>
	<bean id="people" class="com.lyx.batch.People" scope="prototype" />
	<bean id="lineMapper"
		class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
		<property name="lineTokenizer" ref="lineTokenizer" />
		<property name="fieldSetMapper" ref="fieldSetMapper" />
	</bean>
	<bean id="resource" class="org.springframework.core.io.ClassPathResource">
		<constructor-arg index="0" type="java.lang.String"
			value="sample-data.csv" />
	</bean>
	<bean id="peopleAddDescReader_file" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="resource" ref="resource" />
		<property name="encoding" value="utf-8" />
		<property name="lineMapper" ref="lineMapper" />
	</bean>
	<!--从文件读取数据的reader end -->

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>

以上就是step 并行化的主要配置。

AppMain7.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试并行的job
 * 
 * @author Lenovo
 *
 */
public class AppMain7 {

	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-split.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}

}

==================END==================

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 229
博文 577
码字总数 407134
作品 0
朝阳
程序员
加载中

评论(2)

秋风醉了
秋风醉了

引用来自“Ploughs”的评论

您好,按照您这样来配置的,但是报错以下的错误,百度后说如果nextStep在该JOB中还没有配置,也就是说nextStep还不存在的情况下,就会报错。
Caused by: java.lang.IllegalArgumentException: Missing state for [StateTransition: [state=preparedDataSpilt.0.extractNPSFlowDataStep, pattern=*, next=preparedDataSpilt.0.loadUnionFixedLengthStep]]
  at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitions(SimpleFlow.java:283)
  at org.springframework.batch.core.job.flow.support.SimpleFlow.afterPropertiesSet(SimpleFlow.java:128)
  at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean.getObject(SimpleFlowFactoryBean.java:125)
  at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean.getObject(SimpleFlowFactoryBean.java:46)
  at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:142)
  ... 44 more
如果nextStep在该JOB中还没有配置,也就是说nextStep还不存在的情况下,就会报错。
------如果需要nextStep 就配置一下 ,不需要就不要配置啊。
somnusic
somnusic
您好,按照您这样来配置的,但是报错以下的错误,百度后说如果nextStep在该JOB中还没有配置,也就是说nextStep还不存在的情况下,就会报错。
Caused by: java.lang.IllegalArgumentException: Missing state for [StateTransition: [state=preparedDataSpilt.0.extractNPSFlowDataStep, pattern=*, next=preparedDataSpilt.0.loadUnionFixedLengthStep]]
  at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitions(SimpleFlow.java:283)
  at org.springframework.batch.core.job.flow.support.SimpleFlow.afterPropertiesSet(SimpleFlow.java:128)
  at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean.getObject(SimpleFlowFactoryBean.java:125)
  at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean.getObject(SimpleFlowFactoryBean.java:46)
  at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:142)
  ... 44 more
Spring Batch_Step Flow

Spring Batch学习_Step Flow http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html With the ability to group steps together within an owning job comes the nee......

秋风醉了
2014/10/31
0
0
Spring Batch_Multi-threaded Step

Spring Batch_Multi-threaded Step spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html The simplest way to start parallel processing is to ad......

秋风醉了
2014/11/13
0
0
Spring Batch学习笔记——steps之间共享数据

名词说明: 上下文: 执行: 执行上下文: 案例: 警告:一旦steps共享数据,这些数据就会把这些steps连接起来。努力使steps独立。如果你实在是不能独立他们,才使用下面的技术。你应该把数据...

xhujinjun
2015/04/02
0
0
Spring Batch_官网DEMO实现

Spring Batch_官网DEMO实现 http://spring.io/guides/gs/batch-processing/ 使用spring xml方式实现了spring batch官网的demo,现在把具体的代码贴出来,具体的细节配置还要参考官网的说明。...

秋风醉了
2014/11/03
0
0
Spring Batch_Configuring a Step for Restart

Spring Batch_Configuring a Step for Restart spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html#stepRestart 当一个普通的 job 处于complete ......

秋风醉了
2014/11/14
0
0
Duplicate entry '0' for key 'PRIMARY'_Spring Batch

Duplicate entry '0' for key 'PRIMARY'Spring Batch 在运行一个job成功后,在运行一次,会出现这个异常Duplicate entry '0' for key 'PRIMARY',也就是主键冲突。 在truncate 表之后执行以下...

秋风醉了
2014/11/12
0
1
Spring Batch_JobParameters

Spring BatchJobParameters spring batch的JobParameters是设置job运行的参数,同时也具有标志job的作用,就是判断两个job是不是同一个job( "how is one JobInstance distinguished from a...

秋风醉了
2014/11/12
0
0
Spring Batch_JOB重启机制

Spring Batch_JOB重启机制 在这一篇文章 对于restart做了试验,http://my.oschina.net/xinxingegeya/blog/344817在这片文章里,我们只是当job成功时,重启了job,对于job失败后,重启job有...

秋风醉了
2014/11/19
0
0
Spring Batch_Configuring Skip Logic

Spring Batch_Configuring Skip Logic spring 官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html#configuringSkip 当spring batch 处理过程中,可能要......

秋风醉了
2014/11/19
0
0
Spring Batch_实现Tasklet

Spring Batch_实现Tasklet Chunk-oriented processing is not the only way to process in a Step. What if a Step must consist as a simple stored procedure call? You could implement ......

秋风醉了
2014/12/01
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

gRPC学习笔记

gRPC编程流程 1. proto文件定义 proto文件用于定义需要通过gRPC生成的接口,可以理解为接口定义文档 2. 通过构建工具生成服务基类代码-Maven或Gradle 3. 服务端开发 服务端实现类须实现通过构...

OSC_fly
9分钟前
0
0
Docker Mac (三) Dockerfile 及命令

Dockerfile 最近学习docker的时候,遇到一件怪事,关于docker镜像可能会被破坏,还不知道它会有此措施 所以需要了解构建Dockerfile的正确方法 Dockerfile是由一系列命令和参数构成的脚本,这些命...

___大侠
36分钟前
0
0
NetCat Tutorials

Hacking with Netcat part 1: The Basics Hacking with Netcat part 2: Bind and reverse shells Hacking with Netcat part 3: Advanced Techniques 10 Introduction to Netcat - pdf NetCat......

zungyiu
36分钟前
0
0
Android Studio+NDK+Cmake 移植FFmpeg-4.0.2命令行工具

一、编译 参考大神的帖子,亲测一次编译成功:https://blog.csdn.net/bobcat_kay/article/details/80889398 鉴于以前查文档的经验,这里附上编写例子的时间:2018年7月22日 我用的是ubantu,...

她叫我小渝
36分钟前
0
0
mysql创建数据库

登录MYSQL mysql -u root -p 脚本创建数据库WeChat,并制定默认的字符集是utf8mb4。 CREATE DATABASE Wechat DEFAULT CHARSET utf8mb4 COLLATE utf8mb4_general_ci; 授权 grant all......

niithub
51分钟前
0
0
svn: Unable to connect to a repository URL 的解决方案

错误图示: 解决办法:清除本地保存的授权信息; 1:右键点击本地文件夹,选择设置; TortoiseSVN -> Settings 2:在弹出的对话框中选择 Saved Data, 右侧选择:授权地方清理所有。 然后点确...

宁哥实战课堂
今天
1
0
sleep与wait的区别

Thread.sleep(XXX)方法消耗CPU吗? 这个知识点是我之前认识一直有错误的一个知识点,在我以前的认识里面,我一直认为Thread.sleep(1000)的这一秒钟的时间内,线程的休眠是一直占用着CPU的时间...

码代码的小司机
今天
1
0
20位活跃在Github上的国内技术大牛 leij 何小鹏 亚信

本文列举了20位在Github上非常活跃的国内大牛,看看其中是不是很多熟悉的面孔? 1. lifesinger(玉伯) Github主页: https://github.com/lifesinger 微博:@ 玉伯也叫射雕 玉伯(王保平),...

海博1600
今天
1
0
Mybatis收集配置

一、Mybatis取Clob数据 1、Mapper.xml配置 <resultMap type="com.test.User" id="user"> <result column="id" property="id"/> <result column="json_data" property="jsonData" ......

星痕2018
今天
1
0
centos7设置以多用户模式启动

1、旧版本linux系统修改inittab文件,在新版本执行vi /etc/inittab 会有以下提示 # inittab is no longer used when using systemd. # # ADDING CONFIGURATION HERE WILL HAVE NO EFFECT ON......

haha360
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部