文档章节

Spring Batch_异步并发的processor && writer

秋风醉了
 秋风醉了
发布于 2014/11/13 17:10
字数 1206
阅读 400
收藏 1
点赞 0
评论 0

Spring Batch_异步并发的processor && writer

普通的配置一个job,在这个demo中:http://my.oschina.net/xinxingegeya/blog/343190

job的reader是通过游标读取,commit-interval="2"表示每读取两条数据,就要进行process,process完成之后就要进行write,process和write是同步进行的,也就是说

必须process两条之后才能进行write,这两者不能异步进行。无疑,当process过程处理时间过长时,会拖慢整个过程的效率。还有process过程是single thread进行处理的,一个线程中处理两条数据

比用两个线程处理两条数据效率要慢的多(当处理一条数据花费的时间比较多时),这样会拖慢process过程的效率。

那么如何提高整个批处理过程的效率?

  1. 对于proceess和write过程异步化

  2. 在process过程使用多线程处理数据

主要的代码和配置:

spring-batch-async.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<batch:job id="addPeopleDescJob">
		<batch:step id="addDescStep" parent="abstractStep">
			<batch:tasklet>
				<batch:chunk reader="peopleAddDescReader" processor="asyncProcessor"
					writer="asynWriter" commit-interval="2" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
	<bean id="peopleAddDescReader" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>

	<!--配置异步并发的processor和wirter -->
	<bean id="asyncProcessor" class="com.lyx.batch.AsyncPeopleAddDescItemProcessor">
		<property name="taskExecutor" ref="asyncExecutor" />
	</bean>
	<bean id="asyncExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
	<bean id="asynWriter" class="com.lyx.batch.AsyncPeopleAddDescItemWriter" />
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>

AsyncPeopleAddDescItemProcessor.java

package com.lyx.batch;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

public class AsyncPeopleAddDescItemProcessor implements
		ItemProcessor<People, Future<PeopleDESC>> {

	/**
	 * SyncTaskExecutor会在当前线程执行完客户提交给它的任务,即它是以同步方式完成任务的执行的。
	 * 
	 */
	private TaskExecutor taskExecutor = new SyncTaskExecutor();

	public Future<PeopleDESC> process(final People item) throws Exception {
		FutureTask<PeopleDESC> task = new FutureTask<PeopleDESC>(
				new Callable<PeopleDESC>() {
					public PeopleDESC call() throws Exception {
						Thread.sleep(2000);
						System.out.println("process people desc");
						return new PeopleDESC(item.getLastName(),
								item.getFirstName(), Thread.currentThread()
										.getName());
					}
				});
		this.taskExecutor.execute(task);
		return task;
	}

	public TaskExecutor getTaskExecutor() {
		return this.taskExecutor;
	}

	public void setTaskExecutor(TaskExecutor taskExecutor) {
		this.taskExecutor = taskExecutor;
	}

}

AsyncPeopleAddDescItemWriter.java

package com.lyx.batch;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Future;

import javax.sql.DataSource;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class AsyncPeopleAddDescItemWriter implements
		ItemWriter<Future<PeopleDESC>> {

	private JdbcTemplate jdbcTemplate;

	@Autowired
	public void setDataSource(DataSource dataSource) {
		this.jdbcTemplate = new JdbcTemplate(dataSource);
	}

	public void write(List<? extends Future<PeopleDESC>> items)
			throws Exception {
		LinkedList<Future<PeopleDESC>> linklist = new LinkedList<Future<PeopleDESC>>(
				items);

		Future<PeopleDESC> future;

		// the head of this linklist
		while ((future = linklist.poll()) != null) {
			if (future.isDone()) {
				if (!future.isCancelled()) {
					System.out.println("write people desc");
					PeopleDESC peopleDESC = future.get();
					this.jdbcTemplate
							.update("insert into ok_people (first_name, last_name, batch_desc) values (?, ?, ?)",
									peopleDESC.getFirstName(),
									peopleDESC.getLastName(),
									peopleDESC.getDesc());
				}
			} else {
				linklist.addLast(future);
			}
		}
	}
}

 

AppMain5.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * spring batch 的异步任务执行 reader processor writer之间的异步执行 程序运行时间: 108884ms
 * 
 * @author Lenovo
 *
 */
public class AppMain5 {

	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-async.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}

}

以上就是主要的关于异步并发的代码。

如果使用异步并发的效果是非常明显的,前提条件是处理一条数据花费时间过长,如果不是很长,在极短的时间内就能处理完,使用异步并发和不使用异步并发差别不是很明显。

下面是我没有使用异步并发做测试的类,在process过程中thread.sleep(),2秒钟的时间,同时,异步并发的process也是thread.sleep()两秒的时间。运行一下看看时间上的差别:

(处理的数据是100条,前者是没有使用异步并发,后者使用异步并发)

 * 程序运行时间: 213657ms 程序运行时间: 108884ms

AddPeopleDescProcessor.java

package com.lyx.batch;

import org.springframework.batch.item.ItemProcessor;

public class AddPeopleDescProcessor implements
		ItemProcessor<People, PeopleDESC> {

	public PeopleDESC process(People item) throws Exception {
		Thread.sleep(2000);
		System.out.println("process people desc");
		return new PeopleDESC(item.getLastName(), item.getFirstName(), Thread
				.currentThread().getName());
	}

}

AddDescPeopleWriter.java

package com.lyx.batch;

import java.util.List;

import javax.sql.DataSource;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class AddDescPeopleWriter implements ItemWriter<PeopleDESC> {

	private JdbcTemplate jdbcTemplate;

	@Autowired
	public void setDataSource(DataSource dataSource) {
		this.jdbcTemplate = new JdbcTemplate(dataSource);
	}

	public void write(List<? extends PeopleDESC> items) throws Exception {
		for (PeopleDESC peopleDESC : items) {
			System.out.println("write people desc");
			this.jdbcTemplate
					.update("insert into ok_people (first_name, last_name, batch_desc) values (?, ?, ?)",
							peopleDESC.getFirstName(),
							peopleDESC.getLastName(), peopleDESC.getDesc());
		}
	}

}

=============END=============

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 228
博文 579
码字总数 407314
作品 0
东城
程序员
Spring Batch_官网DEMO实现

Spring Batch_官网DEMO实现 http://spring.io/guides/gs/batch-processing/ 使用spring xml方式实现了spring batch官网的demo,现在把具体的代码贴出来,具体的细节配置还要参考官网的说明。...

秋风醉了
2014/11/03
0
0
Spring Batch_JdbcCursorItemReader

Spring Batch_JdbcCursorItemReader 该demo主要完成的功能就是把数据从表people读出来处理加上people的desc属性然后保存到表ok_people。对应的People.java和PeopleDESC.java 对应的数据库表 ...

秋风醉了
2014/11/11
0
0
Spring Batch学习笔记——steps之间共享数据

名词说明: 上下文: 执行: 执行上下文: 案例: 警告:一旦steps共享数据,这些数据就会把这些steps连接起来。努力使steps独立。如果你实在是不能独立他们,才使用下面的技术。你应该把数据...

xhujinjun
2015/04/02
0
0
Spring Batch_ItemReaders and ItemWriters

Spring Batch_ItemReaders and ItemWriters All batch processing can be described in its most simple form as reading in large amounts of data, performing some type of calculation o......

秋风醉了
2014/11/03
0
0
Spring Batch 之 skip讲解(九)

前面的文章跟大家一起讨论了Spring Batch的概念,处理流程,以及SpringBatch处理文件、DB的一些简单实例。接下来的讨论,主要是关于Spring Batch的一些高级应用处理和实际开发中需要注意的一...

长平狐
2012/08/27
666
0
数据处理---Spring Batch之进阶

有了前面两篇的基础之后,我们一起来聊聊更高级的关于Spring Batch的话题。比如Job 配置;自定义各种Listener;控制步骤流程;自定义Reader,Processor,Writer;并行处理。这些东西在Spring ...

hugolyl
2016/07/06
0
0
Spring Batch_Configuring Skip Logic

Spring Batch_Configuring Skip Logic spring 官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html#configuringSkip 当spring batch 处理过程中,可能要......

秋风醉了
2014/11/19
0
0
Spring-Batch处理MySQL数据后存到CSV文件

1 介绍 用Spring Batch实现了个简单的需求,从MySQL中读取用户表数据,根据生日计算年龄,将结果输出到csv文件。 1.1 准备表及数据 2 实现 2.1 项目目录 2.2 实体类 2.3 ItemReader TestUse...

yysue
05/26
0
0
Spring Batch_Parallel Steps

Spring BatchParallel Steps使用并行的Step spring 官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html#scalabilityParallelSteps As long as the applic......

秋风醉了
2014/11/14
0
2
spring batch的java config实践

背景 在后台服务开发中, 经常要用到多线程技术进行加速执行, 每家公司都有内部多线程的框架, 这些框架不是文档不规范, 就是只能适用特定场景. 基于这些原因, spring batch带来了更易用, 性能...

涛声依旧啊
2017/11/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

垃圾回收算法

一 如何判断对象可以回收 1 引用计数法 思路大概为:给对象添加一个引用计数器,每当有一个地方引用它时,计数器值加1;当引用失效时,计数器减1;任何时刻计算器为0的对象就是不可能再被使用...

sen_ye
2分钟前
0
0
Activiti简介(学习总结一)

一、介绍 activiti是使用命令模式设计基于bpmn2.0的一款开源工作流引擎。 工作流简单举例:提交请假申请->经理审批->结束。这就是一个简单流程。activiti支持用户自定义流程。配置各个流程对...

沙shasha
2分钟前
0
0
VCL界面控件DevExpress VCL Controls发布v18.1.3|附下载

DevExpress VCL Controls是 Devexpress公司旗下最老牌的用户界面套包。所包含的控件有:数据录入,图表,数据分析,导航,布局,网格,日程管理,样式,打印和工作流等,让您快速开发出完美、...

Miss_Hello_World
3分钟前
0
0
加米谷大数据培训:云计算、大数据和人工智能之间的关系

一般谈云计算的时候会提到大数据、谈人工智能的时候会提大数据、谈人工智能的时候会提云计算……感觉三者之间相辅相成又不可分割。 一、云计算最初的目标 云计算最初的目标是对资源的管理,管...

加米谷大数据
8分钟前
1
0
java集合元素的默认大小

当底层实现涉及到扩容时,容器或重新分配一段更大的连续内存(如果是离散分配则不需要重新分配,离散分配都是插入新元素时动态分配内存),要将容器原来的数据全部复制到新的内存上,这无疑使...

竹叶青出于蓝
10分钟前
1
0
Java快速开发平台,JEECG 3.7.7闪电版本发布,增加多套主流UI代码生成器模板

JEECG 3.7.7 闪电版本发布,提供5套主流UI代码生成器模板 导读 ⊙平台性能优化,速度闪电般提升 ⊙提供5套新的主流UI代码生成器模板(Bootstrap表单+BootstrapTable列表\ ElementUI列表表单)...

Jeecg
14分钟前
0
0
export 和 module.export 的区别

在浏览器端 js 里面,为了解决各模块变量冲突等问题,往往借助于 js 的闭包把左右模块相关的代码都包装在一个匿名函数里。而 Nodejs 编写模块相当的自由,开发者只需要关注 require,exports,...

孟飞阳
16分钟前
1
0
技术教育的兴起

技术教育的兴起 作者: 阮一峰 1、 有一年,我在台湾环岛旅行。 花莲的海边,我遇到一对台湾青年夫妻,带着女儿在海滩上玩。我们聊了起来。 当时,我还在高校当老师。他们问我,是否觉得台湾...

吕伯文
16分钟前
0
0
Linux服务器下的HTTP抓包分析

说到抓包分析,最简单的办法莫过于在客户端直接安装一个Wireshark或者Fiddler了,但是有时候由于客户端开发人员(可能是第三方)知识欠缺或者其它一些原因,无法顺利的在客户端进行抓包分析,...

mylxsw
21分钟前
0
0
mybatis3-javaapi

sqlSessionFactoryBuilder->sqlSessionFactory->sqlSession<-rowbound<-resultHandler myBatis uses a Java enumeration wrapper for transaction isolation levels, called TransactionIsol......

writeademo
24分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部