文档章节

Spring Batch_Parallel Steps

秋风醉了
 秋风醉了
发布于 2014/11/14 11:15
字数 944
阅读 1833
收藏 3

Spring Batch_Parallel Steps_使用并行的Step

spring 官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html#scalabilityParallelSteps

As long as the application logic that needs to be parallelized can be split into distinct responsibilities, and assigned to individual steps then it can be parallelized in a single process. Parallel Step execution is easy to configure and use, for example, to execute steps (step1,step2) in parallel with step3, you could configure a flow like this:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

The configurable "task-executor" attribute is used to specify which TaskExecutor implementation should be used to execute the individual flows. The default is SyncTaskExecutor, but an asynchronous TaskExecutor is required to run the steps in parallel. Note that the job will ensure that every flow in the split completes before aggregating the exit statuses and transitioning.

 

多个step之间的并行化,可以提高批处理的效率。什么情况下可以应用step之间的并行化,那就要根据具体的业务需求来定。

那我们假设有这样一种场景:有一类数据,分别存在于文件和数据库,数据的内容一样,只是形式不一样,那么我们可以定义并行的step来分别处理来自文件的数据和来自数据库的数据,然后,分别进行同样的processor,然后写入数据库。

 

下面我们就刚才讲的那种场景实现我们并行的step,如下:

spring-batch-split.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
	<!-- 并行的step -->
	<batch:job id="addPeopleDescJob">
		<batch:split id="split1" task-executor="taskExecutor">
			<batch:flow>
				<batch:step id="parallel_step_1">
					<batch:tasklet>
						<batch:chunk reader="peopleAddDescReader_db"
							processor="addDescProcessor" writer="addDescPeopleWriter"
							commit-interval="10" />
					</batch:tasklet>
				</batch:step>
			</batch:flow>
			<batch:flow>
				<batch:step id="parallel_step_2">
					<batch:tasklet>
						<batch:chunk reader="peopleAddDescReader_file"
							processor="addDescProcessor" writer="addDescPeopleWriter"
							commit-interval="10" />
					</batch:tasklet>
				</batch:step>
			</batch:flow>
		</batch:split>
	</batch:job>

	<!-- 从数据库读取数据的reader -->
	<bean id="peopleAddDescReader_db" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!-- 从文件中读取数据的reader -->
	<bean id="lineTokenizer"
		class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
		<property name="delimiter" value="," />
		<property name="names">
			<list>
				<value>firstName</value>
				<value>lastName</value>
			</list>
		</property>
	</bean>
	<bean id="fieldSetMapper"
		class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
		<property name="prototypeBeanName" value="people" />
	</bean>
	<bean id="people" class="com.lyx.batch.People" scope="prototype" />
	<bean id="lineMapper"
		class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
		<property name="lineTokenizer" ref="lineTokenizer" />
		<property name="fieldSetMapper" ref="fieldSetMapper" />
	</bean>
	<bean id="resource" class="org.springframework.core.io.ClassPathResource">
		<constructor-arg index="0" type="java.lang.String"
			value="sample-data.csv" />
	</bean>
	<bean id="peopleAddDescReader_file" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="resource" ref="resource" />
		<property name="encoding" value="utf-8" />
		<property name="lineMapper" ref="lineMapper" />
	</bean>
	<!--从文件读取数据的reader end -->

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>

以上就是step 并行化的主要配置。

AppMain7.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试并行的job
 * 
 * @author Lenovo
 *
 */
public class AppMain7 {

	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-split.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}

}

==================END==================

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 241
博文 566
码字总数 417505
作品 0
朝阳
程序员
私信 提问
加载中

评论(2)

秋风醉了
秋风醉了

引用来自“Ploughs”的评论

您好,按照您这样来配置的,但是报错以下的错误,百度后说如果nextStep在该JOB中还没有配置,也就是说nextStep还不存在的情况下,就会报错。
Caused by: java.lang.IllegalArgumentException: Missing state for [StateTransition: [state=preparedDataSpilt.0.extractNPSFlowDataStep, pattern=*, next=preparedDataSpilt.0.loadUnionFixedLengthStep]]
  at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitions(SimpleFlow.java:283)
  at org.springframework.batch.core.job.flow.support.SimpleFlow.afterPropertiesSet(SimpleFlow.java:128)
  at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean.getObject(SimpleFlowFactoryBean.java:125)
  at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean.getObject(SimpleFlowFactoryBean.java:46)
  at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:142)
  ... 44 more
如果nextStep在该JOB中还没有配置,也就是说nextStep还不存在的情况下,就会报错。
------如果需要nextStep 就配置一下 ,不需要就不要配置啊。
somnusic
somnusic
您好,按照您这样来配置的,但是报错以下的错误,百度后说如果nextStep在该JOB中还没有配置,也就是说nextStep还不存在的情况下,就会报错。
Caused by: java.lang.IllegalArgumentException: Missing state for [StateTransition: [state=preparedDataSpilt.0.extractNPSFlowDataStep, pattern=*, next=preparedDataSpilt.0.loadUnionFixedLengthStep]]
  at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitions(SimpleFlow.java:283)
  at org.springframework.batch.core.job.flow.support.SimpleFlow.afterPropertiesSet(SimpleFlow.java:128)
  at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean.getObject(SimpleFlowFactoryBean.java:125)
  at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean.getObject(SimpleFlowFactoryBean.java:46)
  at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:142)
  ... 44 more
Spring Batch_Step Flow

Spring Batch学习_Step Flow http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html With the ability to group steps together within an owning job comes the nee......

秋风醉了
2014/10/31
0
0
Spring Batch_Multi-threaded Step

Spring Batch_Multi-threaded Step spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html The simplest way to start parallel processing is to ad......

秋风醉了
2014/11/13
0
0
Spring Tool Suite 3.8.2 发布

Spring Tool Suite 3.8.2 发布了。 SpringSource Tool Suite 基于 Spring IDE ,提供了其它的一些特性,如 基于Spring dm Server 的osgi 开发,及其它一些 Spring 项目的支持,如Spring Roo,...

达尔文
2016/10/08
5.3K
2
Spring Batch 2.2.0.M1 发布

SpringBatch,作为一个 Spring 组件,提供了通过使用 Spring 的 依赖注入(dependency injection) 来处理批处理的条件。下图是 Spring Batch 的体系结构 SpringSource 发布了 Spring Batch 2...

红薯
2013/01/30
840
0
Spring Batch学习笔记——steps之间共享数据

名词说明: 上下文: 执行: 执行上下文: 案例: 警告:一旦steps共享数据,这些数据就会把这些steps连接起来。努力使steps独立。如果你实在是不能独立他们,才使用下面的技术。你应该把数据...

xhujinjun
2015/04/02
0
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周一乱弹 —— 白掌柜说了卖货不卖身

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @爱漫爱 :这是一场修行分享羽肿的单曲《Moony》 手机党少年们想听歌,请使劲儿戳(这里) @clouddyy :开不开心? 开心呀, 我又不爱睡懒觉…...

小小编辑
今天
15
3
大数据教程(11.7)hadoop2.9.1平台上仓库工具hive1.2.2搭建

上一篇文章介绍了hive2.3.4的搭建,然而这个版本已经不能稳定的支持mapreduce程序。本篇博主将分享hive1.2.2工具搭建全过程。先说明:本节就直接在上一节的hadoop环境中搭建了! 一、下载apa...

em_aaron
今天
5
0
开始看《JSP&Servlet学习笔记》

1:WEB应用简介。其中1.2.1对Web容器的工作流程写得不错 2:编写Servlet。搞清楚了Java的Web目录结构,以及Web.xml的一些配置作用。特别是讲了@WebServlet标签 3:请求与响应。更细致的讲了从...

max佩恩
今天
5
0
mysql分区功能详细介绍,以及实例

一,什么是数据库分区 前段时间写过一篇关于mysql分表的的文章,下面来说一下什么是数据库分区,以mysql为例。mysql数据库中的数据是以文件的形势存在磁盘上的,默认放在/mysql/data下面(可...

吴伟祥
今天
5
0
SQL语句查询

1.1 排序 通过order by语句,可以将查询出的结果进行排序。放置在select语句的最后。 格式: SELECT * FROM 表名 ORDER BY 排序字段ASC|DESC; ASC 升序 (默认) DESC 降序 1.查询所有商品信息,...

stars永恒
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部