文档章节

Spring Batch_异步并发的processor && writer

秋风醉了
 秋风醉了
发布于 2014/11/13 17:10
字数 1206
阅读 2.7K
收藏 2

「深度学习福利」大神带你进阶工程师,立即查看>>>

Spring Batch_异步并发的processor && writer

普通的配置一个job,在这个demo中:http://my.oschina.net/xinxingegeya/blog/343190

job的reader是通过游标读取,commit-interval="2"表示每读取两条数据,就要进行process,process完成之后就要进行write,process和write是同步进行的,也就是说

必须process两条之后才能进行write,这两者不能异步进行。无疑,当process过程处理时间过长时,会拖慢整个过程的效率。还有process过程是single thread进行处理的,一个线程中处理两条数据

比用两个线程处理两条数据效率要慢的多(当处理一条数据花费的时间比较多时),这样会拖慢process过程的效率。

那么如何提高整个批处理过程的效率?

  1. 对于proceess和write过程异步化

  2. 在process过程使用多线程处理数据

主要的代码和配置:

spring-batch-async.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<batch:job id="addPeopleDescJob">
		<batch:step id="addDescStep" parent="abstractStep">
			<batch:tasklet>
				<batch:chunk reader="peopleAddDescReader" processor="asyncProcessor"
					writer="asynWriter" commit-interval="2" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
	<bean id="peopleAddDescReader" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>

	<!--配置异步并发的processor和wirter -->
	<bean id="asyncProcessor" class="com.lyx.batch.AsyncPeopleAddDescItemProcessor">
		<property name="taskExecutor" ref="asyncExecutor" />
	</bean>
	<bean id="asyncExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
	<bean id="asynWriter" class="com.lyx.batch.AsyncPeopleAddDescItemWriter" />
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>

AsyncPeopleAddDescItemProcessor.java

package com.lyx.batch;

import java.util.concurrent.Callable;
import java.util.concurrent.Future;
import java.util.concurrent.FutureTask;

import org.springframework.batch.item.ItemProcessor;
import org.springframework.core.task.SyncTaskExecutor;
import org.springframework.core.task.TaskExecutor;

public class AsyncPeopleAddDescItemProcessor implements
		ItemProcessor<People, Future<PeopleDESC>> {

	/**
	 * SyncTaskExecutor会在当前线程执行完客户提交给它的任务,即它是以同步方式完成任务的执行的。
	 * 
	 */
	private TaskExecutor taskExecutor = new SyncTaskExecutor();

	public Future<PeopleDESC> process(final People item) throws Exception {
		FutureTask<PeopleDESC> task = new FutureTask<PeopleDESC>(
				new Callable<PeopleDESC>() {
					public PeopleDESC call() throws Exception {
						Thread.sleep(2000);
						System.out.println("process people desc");
						return new PeopleDESC(item.getLastName(),
								item.getFirstName(), Thread.currentThread()
										.getName());
					}
				});
		this.taskExecutor.execute(task);
		return task;
	}

	public TaskExecutor getTaskExecutor() {
		return this.taskExecutor;
	}

	public void setTaskExecutor(TaskExecutor taskExecutor) {
		this.taskExecutor = taskExecutor;
	}

}

AsyncPeopleAddDescItemWriter.java

package com.lyx.batch;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.Future;

import javax.sql.DataSource;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class AsyncPeopleAddDescItemWriter implements
		ItemWriter<Future<PeopleDESC>> {

	private JdbcTemplate jdbcTemplate;

	@Autowired
	public void setDataSource(DataSource dataSource) {
		this.jdbcTemplate = new JdbcTemplate(dataSource);
	}

	public void write(List<? extends Future<PeopleDESC>> items)
			throws Exception {
		LinkedList<Future<PeopleDESC>> linklist = new LinkedList<Future<PeopleDESC>>(
				items);

		Future<PeopleDESC> future;

		// the head of this linklist
		while ((future = linklist.poll()) != null) {
			if (future.isDone()) {
				if (!future.isCancelled()) {
					System.out.println("write people desc");
					PeopleDESC peopleDESC = future.get();
					this.jdbcTemplate
							.update("insert into ok_people (first_name, last_name, batch_desc) values (?, ?, ?)",
									peopleDESC.getFirstName(),
									peopleDESC.getLastName(),
									peopleDESC.getDesc());
				}
			} else {
				linklist.addLast(future);
			}
		}
	}
}

 

AppMain5.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * spring batch 的异步任务执行 reader processor writer之间的异步执行 程序运行时间: 108884ms
 * 
 * @author Lenovo
 *
 */
public class AppMain5 {

	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-async.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}

}

以上就是主要的关于异步并发的代码。

如果使用异步并发的效果是非常明显的,前提条件是处理一条数据花费时间过长,如果不是很长,在极短的时间内就能处理完,使用异步并发和不使用异步并发差别不是很明显。

下面是我没有使用异步并发做测试的类,在process过程中thread.sleep(),2秒钟的时间,同时,异步并发的process也是thread.sleep()两秒的时间。运行一下看看时间上的差别:

(处理的数据是100条,前者是没有使用异步并发,后者使用异步并发)

 * 程序运行时间: 213657ms 程序运行时间: 108884ms

AddPeopleDescProcessor.java

package com.lyx.batch;

import org.springframework.batch.item.ItemProcessor;

public class AddPeopleDescProcessor implements
		ItemProcessor<People, PeopleDESC> {

	public PeopleDESC process(People item) throws Exception {
		Thread.sleep(2000);
		System.out.println("process people desc");
		return new PeopleDESC(item.getLastName(), item.getFirstName(), Thread
				.currentThread().getName());
	}

}

AddDescPeopleWriter.java

package com.lyx.batch;

import java.util.List;

import javax.sql.DataSource;

import org.springframework.batch.item.ItemWriter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class AddDescPeopleWriter implements ItemWriter<PeopleDESC> {

	private JdbcTemplate jdbcTemplate;

	@Autowired
	public void setDataSource(DataSource dataSource) {
		this.jdbcTemplate = new JdbcTemplate(dataSource);
	}

	public void write(List<? extends PeopleDESC> items) throws Exception {
		for (PeopleDESC peopleDESC : items) {
			System.out.println("write people desc");
			this.jdbcTemplate
					.update("insert into ok_people (first_name, last_name, batch_desc) values (?, ?, ?)",
							peopleDESC.getFirstName(),
							peopleDESC.getLastName(), peopleDESC.getDesc());
		}
	}

}

=============END=============

秋风醉了
粉丝 253
博文 530
码字总数 404430
作品 0
朝阳
程序员
私信 提问
加载中
请先登录后再评论。
用vertx实现高吞吐量的站点计数器

工具:vertx,redis,mongodb,log4j 源代码地址:https://github.com/jianglibo/visitrank 先看架构图: 如果你不熟悉vertx,请先google一下。我这里将vertx当作一个容器,上面所有的圆圈要...

jianglibo
2014/04/03
4.4K
3
Promises/A 和 when() 实现--When.js

When.js 是 cujojs 的轻量级的 Promises/A 和 when() 实现,从 wire.js 的异步核心和 cujojs 的 IOC 容器派生而来。包含很多其他有用的 Promiss 相关概念,例如联合多个 promiss、mapping 和...

匿名
2013/02/15
7.5K
0
REST/HTTP 工具包--Spray

Spray 是一个开源的 REST/HTTP 工具包和底层网络 IO 包,基于 Scala 和 Akka 构建。轻量级、异步、非堵塞、基于 actor 模式、模块化和可测试是 spray 的特点。 示例代码: val responses: F...

匿名
2013/02/20
7.2K
0
WSGI Web服务器--UV-Web

uv-web是一个轻量级的支持高并发的WSGI Web服务器,基于libuv构建,部分代码源于开源项目bjoern,本质是python的C扩展,所以适用于部署绝大部分 python web应用(如 Django) 特性 兼容 HTTP 1...

Jone.x
2013/03/04
1.8K
0
django-c10k-demo

这是一个演示程序,用来实现同时 10000 个并发连接到 Django 。涉及的概念包括:the C10k problem, the WebSocket protocol, the Django web framework, and Python's upcoming asynchronou......

匿名
2013/03/27
1.7K
0

没有更多内容

加载失败,请刷新页面

加载更多

Hacker News 简讯 2020-08-15

最后更新时间: 2020-08-15 04:01 Welders set off Beirut blast while securing explosives - (maritime-executive.com) 焊工在固定炸药的同时引爆了贝鲁特爆炸 得分:215 | 评论:209 Factor......

FalconChen
今天
24
0
OSChina 周六乱弹 —— 老椅小猫秋乡梦 梦里石台堆小鱼

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @小小编辑 :《MOM》- 蜡笔小心 《MOM》- 蜡笔小心 手机党少年们想听歌,请使劲儿戳(这里) @狄工 :腾讯又在裁员了,35岁以上清退,抖音看到...

小小编辑
今天
71
1
构建高性能队列,你不得不知道的底层知识!

前言 本文收录于专辑:http://dwz.win/HjK,点击解锁更多数据结构与算法的知识。 你好,我是彤哥。 上一节,我们一起学习了如何将递归改写为非递归,其中,用到的数据结构主要是栈。 栈和队列...

彤哥读源码
今天
17
0
Anaconda下安装keras和tensorflow

Anaconda下安装keras和tensorflow 一、下载并安装Anaconda: Anaconda下载 安装步骤: 如果是多用户操作系统选择All Users,单用户选择Just Me 选择合适的安装路径 然后勾选这个,自动配置环境...

Atlantis-Brook
今天
15
0
滴滴ElasticSearch千万级TPS写入性能翻倍技术剖析

桔妹导读:滴滴ElasticSearch平台承接了公司内部所有使用ElasticSearch的业务,包括核心搜索、RDS从库、日志检索、安全数据分析、指标数据分析等等。平台规模达到了3000+节点,5PB 的数据存储...

滴滴技术
今天
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部