
Spring Batch_Parallel Steps

秋风醉了
Published 2014/11/14 11:15

Spring Batch_Parallel Steps_Using Parallel Steps

Official Spring documentation: http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html#scalabilityParallelSteps

As long as the application logic that needs to be parallelized can be split into distinct responsibilities, and assigned to individual steps then it can be parallelized in a single process. Parallel Step execution is easy to configure and use, for example, to execute steps (step1,step2) in parallel with step3, you could configure a flow like this:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

The configurable "task-executor" attribute is used to specify which TaskExecutor implementation should be used to execute the individual flows. The default is SyncTaskExecutor, but an asynchronous TaskExecutor is required to run the steps in parallel. Note that the job will ensure that every flow in the split completes before aggregating the exit statuses and transitioning.
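The following plain-JDK sketch illustrates what the split does. It is not Spring Batch's implementation. An ExecutorService plays the role of the task executor. Flow 1 runs step1 then step2, flow 2 runs step3 at the same time, and step4 starts only after both flows have been joined. The class name, the string statuses and the rule "any FAILED flow fails the split" are simplifications chosen for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Plain-JDK model of <split>; not Spring Batch code, all names are illustrative. */
class SplitSketch {

    // Order in which the simulated steps ran; safe to append to from several threads.
    static final List<String> log = new CopyOnWriteArrayList<>();

    /** A simulated step: records its name and reports success. */
    static String step(String name) {
        log.add(name);
        return "COMPLETED";
    }

    /** Simplified aggregation rule for this sketch: one FAILED flow fails the whole split. */
    static String aggregate(List<String> flowStatuses) {
        return flowStatuses.contains("FAILED") ? "FAILED" : "COMPLETED";
    }

    /** Flow 1 = step1 -> step2 and flow 2 = step3 run in parallel; step4 runs afterwards. */
    static String runJob() {
        log.clear();
        ExecutorService taskExecutor = Executors.newFixedThreadPool(2); // stands in for task-executor
        try {
            CompletableFuture<String> flow1 = CompletableFuture.supplyAsync(() -> {
                step("step1");          // steps inside one flow still run in order
                return step("step2");
            }, taskExecutor);
            CompletableFuture<String> flow2 =
                    CompletableFuture.supplyAsync(() -> step("step3"), taskExecutor);

            // join() blocks until each flow has finished, so nothing below runs early.
            String splitStatus = aggregate(Arrays.asList(flow1.join(), flow2.join()));
            step("step4");              // the split's next="step4" transition
            return splitStatus;
        } finally {
            taskExecutor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(runJob() + " " + new ArrayList<>(log));
    }
}
```

The relative order of step1 and step3 changes from run to run, but step4 is always last. A single-thread pool would make the flows run one after another. That is roughly what happens when the split keeps the default SyncTaskExecutor.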

 

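Running several steps in parallel can shorten the overall run time of a batch job. Whether steps can run in parallel depends on the business requirements. In practice, the steps must be independent of each other, so that none of them needs the output of another.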

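Suppose the same kind of data lives both in a file and in a database. The content is identical and only the storage format differs. We can define two parallel steps, one for the file data and one for the database data. Both steps run their items through the same processor and then write them to the database.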
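In this scenario the two parallel steps share one processor and one writer. In the configuration that follows, both chunks point at the same singleton addDescProcessor and addDescPeopleWriter beans. Those beans are therefore called from two threads at once, so they should be stateless or thread-safe. The sketch below is plain Java with hypothetical names: Person, ADD_DESC, SharedWriter, and an in-memory queue standing in for the database table. The description format is made up, since the code of AddPeopleDescProcessor is not shown.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

/** Two parallel "steps" sharing one processor and one writer (plain Java, hypothetical names). */
class SharedWriterSketch {

    /** Hypothetical stand-in for com.lyx.batch.People. */
    static final class Person {
        final String firstName;
        final String lastName;
        final String desc;

        Person(String firstName, String lastName, String desc) {
            this.firstName = firstName;
            this.lastName = lastName;
            this.desc = desc;
        }
    }

    /** Shared processor: it keeps no state, so concurrent calls cannot interfere. */
    static final UnaryOperator<Person> ADD_DESC =
            p -> new Person(p.firstName, p.lastName, p.firstName + " " + p.lastName);

    /** Shared writer: the concurrent queue plays the role of the target table. */
    static final class SharedWriter {
        final Queue<Person> table = new ConcurrentLinkedQueue<>();

        void write(List<Person> chunk) {
            table.addAll(chunk); // safe from several threads, unlike a plain ArrayList
        }
    }

    /** One "step": process every item from its source, then write them. */
    static void runStep(List<Person> source, SharedWriter writer) {
        List<Person> processed = new ArrayList<>();
        for (Person p : source) {
            processed.add(ADD_DESC.apply(p));
        }
        writer.write(processed);
    }

    /** Runs the database source and the file source in parallel against the same writer. */
    static List<Person> run(List<Person> fromDb, List<Person> fromFile) {
        SharedWriter writer = new SharedWriter();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Void> dbStep = CompletableFuture.runAsync(() -> runStep(fromDb, writer), pool);
            CompletableFuture<Void> fileStep = CompletableFuture.runAsync(() -> runStep(fromFile, writer), pool);
            CompletableFuture.allOf(dbStep, fileStep).join(); // wait for both steps
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(writer.table);
    }

    public static void main(String[] args) {
        List<Person> fromDb = Arrays.asList(new Person("Jill", "Doe", null), new Person("Joe", "Doe", null));
        List<Person> fromFile = Arrays.asList(new Person("Justin", "Doe", null));
        for (Person p : run(fromDb, fromFile)) {
            System.out.println(p.desc);
        }
    }
}
```

If the writer instead appended to an unsynchronized list or kept per-run counters in fields, the two steps could corrupt each other's state.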

 

Below, the scenario just described is implemented with parallel steps:

spring-batch-split.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- component scanning -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
	<!-- parallel steps -->
	<batch:job id="addPeopleDescJob">
		<batch:split id="split1" task-executor="taskExecutor">
			<batch:flow>
				<batch:step id="parallel_step_1">
					<batch:tasklet>
						<batch:chunk reader="peopleAddDescReader_db"
							processor="addDescProcessor" writer="addDescPeopleWriter"
							commit-interval="10" />
					</batch:tasklet>
				</batch:step>
			</batch:flow>
			<batch:flow>
				<batch:step id="parallel_step_2">
					<batch:tasklet>
						<batch:chunk reader="peopleAddDescReader_file"
							processor="addDescProcessor" writer="addDescPeopleWriter"
							commit-interval="10" />
					</batch:tasklet>
				</batch:step>
			</batch:flow>
		</batch:split>
	</batch:job>

	<!-- reader that reads the data from the database -->
	<bean id="peopleAddDescReader_db" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name, last_name from people where
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!-- reader that reads the data from the file -->
	<bean id="lineTokenizer"
		class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
		<property name="delimiter" value="," />
		<property name="names">
			<list>
				<value>firstName</value>
				<value>lastName</value>
			</list>
		</property>
	</bean>
	<bean id="fieldSetMapper"
		class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
		<property name="prototypeBeanName" value="people" />
	</bean>
	<bean id="people" class="com.lyx.batch.People" scope="prototype" />
	<bean id="lineMapper"
		class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
		<property name="lineTokenizer" ref="lineTokenizer" />
		<property name="fieldSetMapper" ref="fieldSetMapper" />
	</bean>
	<bean id="resource" class="org.springframework.core.io.ClassPathResource">
		<constructor-arg index="0" type="java.lang.String"
			value="sample-data.csv" />
	</bean>
	<bean id="peopleAddDescReader_file" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="resource" ref="resource" />
		<property name="encoding" value="utf-8" />
		<property name="lineMapper" ref="lineMapper" />
	</bean>
	<!-- end of the file reader -->

	<!-- Tomcat JDBC pool data source -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- Spring Batch jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- Spring transaction manager -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch launcher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>
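The file reader in this configuration has two parts. A DelimitedLineTokenizer splits each line on "," and names the fields firstName and lastName. A BeanWrapperFieldSetMapper then fills a new prototype-scoped people bean for every line. The plain-Java sketch below mimics that two-stage mapping for a line such as Jill,Doe. The People class is a hypothetical stand-in for com.lyx.batch.People. Unlike the real tokenizer, the sketch does a naive split with no support for quoted fields.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Tokenize-then-map sketch of the file reader's line mapping (plain Java, hypothetical names). */
class LineMapperSketch {

    /** Hypothetical stand-in for com.lyx.batch.People with JavaBean setters. */
    static class People {
        private String firstName;
        private String lastName;

        public void setFirstName(String firstName) { this.firstName = firstName; }
        public void setLastName(String lastName) { this.lastName = lastName; }
        public String getFirstName() { return firstName; }
        public String getLastName() { return lastName; }
    }

    // Same field names as the lineTokenizer's "names" property.
    static final String[] NAMES = {"firstName", "lastName"};

    /** Stage 1: split on "," and pair each token with its configured name. */
    static Map<String, String> tokenize(String line) {
        String[] tokens = line.split(",", -1); // -1 keeps trailing empty fields
        if (tokens.length != NAMES.length) {
            throw new IllegalArgumentException(
                    "Expected " + NAMES.length + " fields but got " + tokens.length + ": " + line);
        }
        Map<String, String> fieldSet = new LinkedHashMap<>();
        for (int i = 0; i < NAMES.length; i++) {
            fieldSet.put(NAMES[i], tokens[i]);
        }
        return fieldSet;
    }

    /** Stage 2: copy each named field onto a fresh object, like the prototype-scoped bean. */
    static People mapFieldSet(Map<String, String> fieldSet) {
        People people = new People(); // a new instance per line
        people.setFirstName(fieldSet.get("firstName"));
        people.setLastName(fieldSet.get("lastName"));
        return people;
    }

    static People mapLine(String line) {
        return mapFieldSet(tokenize(line));
    }

    public static void main(String[] args) {
        People p = mapLine("Jill,Doe");
        System.out.println(p.getFirstName() + " " + p.getLastName()); // Jill Doe
    }
}
```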

The XML above is the main configuration needed to run steps in parallel.
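Both steps use commit-interval="10". Each step reads up to 10 items, passes them through the processor, and hands them to the writer as one chunk. Spring Batch commits one transaction per chunk. Below is a minimal plain-Java sketch of that loop. It uses an Iterator as the reader and counts "commits" instead of running real transactions.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

/** Plain-Java model of a chunk-oriented step; the "commit" is only counted here. */
class ChunkLoopSketch {

    static <I, O> int run(Iterator<I> reader, Function<I, O> processor,
                          Consumer<List<O>> writer, int commitInterval) {
        int commits = 0;
        while (reader.hasNext()) {
            // 1. Read up to commitInterval items.
            List<I> items = new ArrayList<>(commitInterval);
            while (items.size() < commitInterval && reader.hasNext()) {
                items.add(reader.next());
            }
            // 2. Process each item; in this sketch a null result filters the item out.
            List<O> processed = new ArrayList<>(items.size());
            for (I item : items) {
                O out = processor.apply(item);
                if (out != null) {
                    processed.add(out);
                }
            }
            // 3. Write the chunk in one call; this is where a real step would commit.
            if (!processed.isEmpty()) {
                writer.accept(processed);
            }
            commits++;
        }
        return commits;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 1; i <= 25; i++) {
            ids.add(i);
        }
        List<Integer> chunkSizes = new ArrayList<>();
        int commits = run(ids.iterator(),
                (Integer id) -> "person-" + id,
                (List<String> chunk) -> chunkSizes.add(chunk.size()),
                10);
        System.out.println(commits + " commits, chunk sizes " + chunkSizes); // 3 commits, chunk sizes [10, 10, 5]
    }
}
```

With 25 input items the writer receives chunks of 10, 10 and 5 items. A failure in one chunk would therefore only roll back that chunk's writes.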

AppMain7.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParameters;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Runs the job that contains the parallel steps.
 * 
 * @author Lenovo
 *
 */
public class AppMain7 {

	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // start time

		// try-with-resources closes the context (and the connection pool) when the job is done
		try (ClassPathXmlApplicationContext context = new ClassPathXmlApplicationContext(
				"classpath:spring-batch-split.xml")) {
			Job job = context.getBean("addPeopleDescJob", Job.class);
			JobLauncher launcher = context.getBean("jobLauncher", JobLauncher.class);

			// A unique parameter makes every launch a new job instance; with empty parameters,
			// a second run of a completed job throws JobInstanceAlreadyCompleteException.
			JobParameters params = new JobParametersBuilder()
					.addLong("run.time", System.currentTimeMillis())
					.toJobParameters();

			// SimpleJobLauncher runs synchronously by default, so this returns after the split finishes.
			JobExecution result = launcher.run(job, params);
			ExitStatus es = result.getExitStatus();
			if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
				System.out.println("Job completed successfully");
			} else {
				System.out.println("Job failed, exitCode=" + es.getExitCode());
			}
		}

		long endTime = System.currentTimeMillis(); // end time
		System.out.println("Elapsed time: " + (endTime - startTime) + "ms");
	}

}
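Why does the launch parameter matter? Spring Batch identifies a job instance by the job name plus its identifying parameters, and this configuration persists that information in the BATCH_ tables in MySQL. Suppose the job is launched with empty JobParameters. Once one run has completed, every later launch maps to the same, already completed instance and is rejected with JobInstanceAlreadyCompleteException. The toy model below shows the idea. It is plain Java, not the real JobRepository, and run.time is a hypothetical parameter name.

```java
import java.util.Collections;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Toy model of job-instance identity; not the real JobRepository. */
class JobInstanceSketch {

    // Keys of instances that have already completed (the real repository keeps this in the database).
    private final Set<String> completed = new HashSet<>();

    /** Identity = job name + parameters; TreeMap gives the parameters a stable order. */
    static String instanceKey(String jobName, Map<String, ?> params) {
        return jobName + new TreeMap<String, Object>(params);
    }

    /** Simulates a successful launch and refuses to rerun a completed instance. */
    String launch(String jobName, Map<String, ?> params) {
        String key = instanceKey(jobName, params);
        if (!completed.add(key)) {
            throw new IllegalStateException("A job instance already exists and is complete: " + key);
        }
        return "COMPLETED";
    }

    public static void main(String[] args) {
        JobInstanceSketch repository = new JobInstanceSketch();
        repository.launch("addPeopleDescJob", Collections.<String, Object>emptyMap());
        try {
            // Same job name and same (empty) parameters: same instance, already complete.
            repository.launch("addPeopleDescJob", Collections.<String, Object>emptyMap());
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
        // A unique value per launch gives a new instance, so this run is accepted.
        System.out.println(repository.launch("addPeopleDescJob",
                Collections.singletonMap("run.time", System.currentTimeMillis())));
    }
}
```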

==================END==================

© Copyright belongs to the author


Comments (2)

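秋风醉了

Quoting the comment from "Ploughs":

Hello, I configured it the same way you did, but I get the error below. According to what I found on Baidu, this error is thrown when nextStep has not been configured in the job, i.e. when nextStep does not exist.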
Caused by: java.lang.IllegalArgumentException: Missing state for [StateTransition: [state=preparedDataSpilt.0.extractNPSFlowDataStep, pattern=*, next=preparedDataSpilt.0.loadUnionFixedLengthStep]]
  at org.springframework.batch.core.job.flow.support.SimpleFlow.initializeTransitions(SimpleFlow.java:283)
  at org.springframework.batch.core.job.flow.support.SimpleFlow.afterPropertiesSet(SimpleFlow.java:128)
  at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean.getObject(SimpleFlowFactoryBean.java:125)
  at org.springframework.batch.core.configuration.xml.SimpleFlowFactoryBean.getObject(SimpleFlowFactoryBean.java:46)
  at org.springframework.beans.factory.support.FactoryBeanRegistrySupport.doGetObjectFromFactoryBean(FactoryBeanRegistrySupport.java:142)
  ... 44 more
> If nextStep has not been configured in the job, i.e. nextStep does not exist, the error is thrown.
------If you need nextStep, define it; if you don't, don't configure next at all.
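The error in this comment comes from SimpleFlow.initializeTransitions. It fails when a transition names a next state that the flow does not contain. The state names in the trace are prefixed with the split id and the flow index (preparedDataSpilt.0.). This suggests that a next attribute inside a flow is resolved only against steps in that same flow. The sketch below is a simplified, plain-Java version of that check with hypothetical step ids. It is not Spring Batch's code.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Simplified "every next target must exist in this flow" check (plain Java, hypothetical ids). */
class FlowTransitionCheck {

    /** Keys are step ids in one flow; values are the next step id, or null for the last step. */
    static void validate(Map<String, String> nextByStep) {
        for (Map.Entry<String, String> e : nextByStep.entrySet()) {
            String next = e.getValue();
            if (next != null && !nextByStep.containsKey(next)) {
                throw new IllegalArgumentException(
                        "Missing state for transition " + e.getKey() + " -> " + next);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> flow = new LinkedHashMap<>();
        flow.put("step1", "step2"); // next="step2"
        flow.put("step2", null);    // last step in the flow
        validate(flow);             // passes

        Map<String, String> broken = new LinkedHashMap<>();
        broken.put("extractStep", "loadStep"); // next points at a step not defined in this flow
        try {
            validate(broken);
        } catch (IllegalArgumentException ex) {
            System.out.println(ex.getMessage()); // Missing state for transition extractStep -> loadStep
        }
    }
}
```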