文档章节

Spring Batch_Parallel Steps

秋风醉了
 秋风醉了
发布于 2014/11/14 11:15
字数 944
阅读 2.1K
收藏 3

精选30+云产品,助力企业轻松上云!>>>

Spring Batch_Parallel Steps_使用并行的Step

spring 官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html#scalabilityParallelSteps

As long as the application logic that needs to be parallelized can be split into distinct responsibilities, and assigned to individual steps then it can be parallelized in a single process. Parallel Step execution is easy to configure and use, for example, to execute steps (step1,step2) in parallel with step3, you could configure a flow like this:

<job id="job1">
    <split id="split1" task-executor="taskExecutor" next="step4">
        <flow>
            <step id="step1" parent="s1" next="step2"/>
            <step id="step2" parent="s2"/>
        </flow>
        <flow>
            <step id="step3" parent="s3"/>
        </flow>
    </split>
    <step id="step4" parent="s4"/>
</job>

<beans:bean id="taskExecutor" class="org.spr...SimpleAsyncTaskExecutor"/>

The configurable "task-executor" attribute is used to specify which TaskExecutor implementation should be used to execute the individual flows. The default is SyncTaskExecutor, but an asynchronous TaskExecutor is required to run the steps in parallel. Note that the job will ensure that every flow in the split completes before aggregating the exit statuses and transitioning.

 

多个step之间的并行化,可以提高批处理的效率。什么情况下可以应用step之间的并行化,那就要根据具体的业务需求来定。

那我们假设有这样一种场景:有一类数据,分别存在于文件和数据库,数据的内容一样,只是形式不一样,那么我们可以定义并行的step来分别处理来自文件的数据和来自数据库的数据,然后,分别进行同样的processor,然后写入数据库。

 

下面我们就刚才讲的那种场景实现我们并行的step,如下:

spring-batch-split.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
	<!-- 并行的step -->
	<batch:job id="addPeopleDescJob">
		<batch:split id="split1" task-executor="taskExecutor">
			<batch:flow>
				<batch:step id="parallel_step_1">
					<batch:tasklet>
						<batch:chunk reader="peopleAddDescReader_db"
							processor="addDescProcessor" writer="addDescPeopleWriter"
							commit-interval="10" />
					</batch:tasklet>
				</batch:step>
			</batch:flow>
			<batch:flow>
				<batch:step id="parallel_step_2">
					<batch:tasklet>
						<batch:chunk reader="peopleAddDescReader_file"
							processor="addDescProcessor" writer="addDescPeopleWriter"
							commit-interval="10" />
					</batch:tasklet>
				</batch:step>
			</batch:flow>
		</batch:split>
	</batch:job>

	<!-- 从数据库读取数据的reader -->
	<bean id="peopleAddDescReader_db" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!-- 从文件中读取数据的reader -->
	<bean id="lineTokenizer"
		class="org.springframework.batch.item.file.transform.DelimitedLineTokenizer">
		<property name="delimiter" value="," />
		<property name="names">
			<list>
				<value>firstName</value>
				<value>lastName</value>
			</list>
		</property>
	</bean>
	<bean id="fieldSetMapper"
		class="org.springframework.batch.item.file.mapping.BeanWrapperFieldSetMapper">
		<property name="prototypeBeanName" value="people" />
	</bean>
	<bean id="people" class="com.lyx.batch.People" scope="prototype" />
	<bean id="lineMapper"
		class="org.springframework.batch.item.file.mapping.DefaultLineMapper">
		<property name="lineTokenizer" ref="lineTokenizer" />
		<property name="fieldSetMapper" ref="fieldSetMapper" />
	</bean>
	<bean id="resource" class="org.springframework.core.io.ClassPathResource">
		<constructor-arg index="0" type="java.lang.String"
			value="sample-data.csv" />
	</bean>
	<bean id="peopleAddDescReader_file" class="org.springframework.batch.item.file.FlatFileItemReader">
		<property name="resource" ref="resource" />
		<property name="encoding" value="utf-8" />
		<property name="lineMapper" ref="lineMapper" />
	</bean>
	<!--从文件读取数据的reader end -->

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>

以上就是step 并行化的主要配置。

AppMain7.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试并行的job
 * 
 * @author Lenovo
 *
 */
public class AppMain7 {

	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-split.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}

}

==================END==================

秋风醉了
粉丝 253
博文 530
码字总数 404430
作品 0
朝阳
程序员
私信 提问
加载中
此博客有 2 条评论,请先登录后再查看。
Spring Batch_Multi-threaded Step

Spring Batch_Multi-threaded Step spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html The simplest way to start parallel processing is to ad......

秋风醉了
2014/11/13
1.7K
0
Spring Batch 3.0 知识点总结

来自官网总结 https://docs.spring.io/spring-batch/3.0.x/reference/htmlsingle/ Spring Batch架构 Spring Batch组件 运行模型 JobLauncher Job Step 重试机制 RetryTemplate 输入输出 Item......

peipei巴比
03/31
0
0
Spring Batch_Step Flow

Spring Batch学习_Step Flow http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html With the ability to group steps together within an owning job comes the nee......

秋风醉了
2014/10/31
465
0
Spring Batch并行与扩展

概述 Spring Batch提供了多种方式用于处理并行,提高性能。主要分为2大类: - 单个进程,多线程 - 多个进程 因此,可以细分为以下几类: - 多线程Step(Multi-thread Step,single process) -...

osc_q50is30g
2018/07/24
7
0
SpringBatch系列入门之Tasklet

1、概要 Spring Batch是一个开源的批量处理框架,Spring Batch提供了类和API来读写资源,管理事务,作业处理统计、重启、以及分区技术处理大量数据。在Spring Batch中,一个作业任务可以由多...

稻草鸟人
06/02
9
0

没有更多内容

加载失败,请刷新页面

加载更多

要求jQuery在执行某些操作之前等待所有图像加载的官方方式

问题: In jQuery when you do this: 在jQuery中,当您执行以下操作时: $(function() { alert("DOM is loaded, but images not necessarily all loaded");}); It waits for the DOM t......

法国红酒甜
昨天
11
0
实现Map按值排序

Map按照值排序,需要自定义比较器,实现Comparator接口,实现compare方法。 public class SortByVlue {public static void main(String[] args) {Map<String, Long> map = new HashMap<......

游人未归
昨天
16
0
定天气爬虫加定时发送天气邮件

今天无聊,在家研究个爬虫玩玩 主要用到以下几个库: request 请求资源 iconv-lite转码,有的网站html格式不是utf-8 cheerio类似jq,操作html,获取相关爬虫数据 nodemailer 发送邮件,例如q...

莫西摩西
昨天
14
0
还在为大屏分辨率困扰?图扑提供响应式(自适应)可视化大屏

前言 数据可视化在当下信息时代已经成为炙手可热的话题,而 B/S 化趋势,也使得许多大屏应用上在网页端出现,今天给大家分享一套不一样风格的大屏页面,与传统深蓝色不同,这次采用了暗红色设...

xhload3d
昨天
30
0
如何妙用Spring 数据绑定机制

前言 在剖析完 Spring Boot 返回统一数据格式是怎样实现的?文章之后,一直觉得有必要说明一下 Spring's Data Binding Mechanism 「Spring 数据绑定机制」。 默认情况下,Spring 只知道如何转...

码农小胖哥
2019/12/27
25
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部