文档章节

JdbcPagingItemReader多线程的Step

秋风醉了
 秋风醉了
发布于 2014/11/22 01:07
字数 1184
阅读 3357
收藏 8
点赞 0
评论 8

JdbcPagingItemReader多线程的Step

我们最经常使用的就是 JdbcCursorItemReader,使用游标的方式逐条数据的读取。但是从spring 官方文档我们知道 ,他不是线程安全的。在这里,我们使用 JdbcPagingItemReader从数据库读取数据,并且是分页的读,而且这个类是线程安全的,那么我们就可以使用多线程的Step,从而提高JOB的执行效率。

下面是主要的配置文件:

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractJdbcPagingItemReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcPagingItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- add people desc job begin -->
	<batch:job id="addPeopleDescJob">
		<batch:step id="addDescStep" parent="abstractStep">
			<batch:tasklet>
				<batch:chunk reader="peopleAddDescReader" processor="addDescProcessor"
					writer="addDescPeopleWriter" commit-interval="2" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
	<!-- add people desc job end -->

	<!-- 使用分页的reader begin -->
	<bean id="peopleAddDescReader" parent="abstractJdbcPagingItemReader"
		scope="step">
		<property name="queryProvider">
			<bean
				class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
				<property name="dataSource" ref="dataSource" />
				<property name="selectClause" value="select person_id, first_name, last_name" />
				<property name="fromClause" value="from people" />
				<property name="whereClause"
					value="where ( first_name like :first_name or last_name like :last_name ) " />
				<property name="sortKey" value="person_id" />
			</bean>
		</property>
		<property name="parameterValues">
			<map>
				<entry key="first_name" value="#{jobParameters['first_name']}" />
				<entry key="last_name" value="#{jobParameters['last_name']}" />
			</map>
		</property>
		<!-- 配置limit的大小 -->
		<property name="pageSize" value="2" />
		<property name="rowMapper" ref="peopleRowMapper" />
	</bean>
	<!-- 使用分页的reader end -->

	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>

主要的配置就是:

<!-- 使用分页的reader begin -->
<bean id="peopleAddDescReader" parent="abstractJdbcPagingItemReader"
	scope="step">
	<property name="queryProvider">
		<bean
			class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
			<property name="dataSource" ref="dataSource" />
			<property name="selectClause" value="select person_id, first_name, last_name" />
			<property name="fromClause" value="from people" />
			<property name="whereClause"
				value="where ( first_name like :first_name or last_name like :last_name ) " />
			<property name="sortKey" value="person_id" />
		</bean>
	</property>
	<property name="parameterValues">
		<map>
			<entry key="first_name" value="#{jobParameters['first_name']}" />
			<entry key="last_name" value="#{jobParameters['last_name']}" />
		</map>
	</property>
	<!-- 配置limit的大小 -->
	<property name="pageSize" value="2" />
	<property name="rowMapper" ref="peopleRowMapper" />
</bean>
<!-- 使用分页的reader end -->

其他类的从前面的文章找出处,下面是我为了调试与前面不同的类

PeopleRowMapper.java

package com.lyx.batch;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

public class PeopleRowMapper implements RowMapper<People> {

	public People mapRow(ResultSet rs, int rowNum) throws SQLException {
		People p = new People();
		System.out.println("-----------------------person_id-"
				+ rs.getInt("person_id"));
		p.setId(rs.getInt("person_id"));
		p.setFirstName(rs.getString("first_name"));
		p.setLastName(rs.getString("last_name"));
		return p;
	}

}

 

运行程序AppMain14.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试 使用分页的 reader
 * 
 * @author Lenovo
 *
 */
public class AppMain14 {

	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-paging.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		jobParametersBuilder.addString("first_name", "%JOHN%");
		jobParametersBuilder.addString("last_name", "%DOE%");
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}

}

运行结果:

-----------------------person_id-157

-----------------------person_id-158

process people desc

process people desc

write people desc

write people desc

任务正常完成

程序运行时间: 11929ms

十一月 22, 2014 12:29:50 上午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{first_name=%JOHN%, last_name=%DOE%}] and the following status: [COMPLETED]

最后是成功了。其实更重要的是JdbcPagingItemReader 的实现方式和源码。为什么他是线程安全的,为什么他能分页读,这才是我们最终关心的。

这里我们使用的还是单线程的方式运行的该job ,下面我们来配置多线程的step。配置很简单。就是配置一个异步的spring task executor,使用该异步 task executor 来运行我们的job。

 

看如下的配置:

<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
<!-- add people desc job begin -->
<batch:job id="addPeopleDescJob">
	<batch:step id="addDescStep" parent="abstractStep">
		<batch:tasklet task-executor="taskExecutor">
			<batch:chunk reader="peopleAddDescReader" processor="addDescProcessor"
				writer="addDescPeopleWriter" commit-interval="2" />
		</batch:tasklet>
	</batch:step>
</batch:job>
<!-- add people desc job end -->

这里的 taskExecutor 就是一个异步的 task executor。

下面运行一下多线程的step。运行结果:

-----------------------person_id-157

-----------------------person_id-158

process people desc

process people desc

write people desc

write people desc

十一月 22, 2014 1:01:35 上午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{first_name=%JOHN%, last_name=%DOE%}] and the following status: [COMPLETED]

任务正常完成

程序运行时间: 8577ms

成功,好的,你是否注意到和上面的单线程的step 比,是不是程序运行的时间要少了。

关于 JdbcPagingItemReader 的实现方式和其线程安全性,如何分页,JdbcPagingItemReader的分页策略我们在下面文章道来。

==============END==============

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 222
博文 581
码字总数 411013
作品 0
东城
程序员
加载中

评论(8)

沈浩z
可以了 呵呵
沈浩z
可以了 呵呵
沈浩z
能加qq 1439226817
沈浩z
能加qq 1439226817
沈浩z
spring-batch-paging.xml 数据好像没有去 okpeople 表? 是什么原因额?
秋风醉了
秋风醉了

引用来自“沈浩z”的评论

<bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

这两个类怎么没有?
http://git.oschina.net/xinxingegeya/spring-batch-showcase
沈浩z
<bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

这两个类怎么没有?
沈浩z
能不能给代码?
JdbcPagingItemReader分页和线程安全

JdbcPagingItemReader分页和线程安全 通过源码来分析其分页和线程安全。 http://my.oschina.net/xinxingegeya/blog/347227 http://my.oschina.net/xinxingegeya/blog/344117 多线程的Step中的......

秋风醉了 ⋅ 2014/11/24 ⋅ 1

Spring batch 中分区partitioner,使用异步线程怎么使用JdbcPagingItemReader

如下是报错信息: org.springframework.batch.item.ItemStreamException: Failed to initialize the reader at org.springframework.batch.item.support.AbstractItemCountingItemStreamItem......

龍sky腾 ⋅ 01/15 ⋅ 0

spring batch 分页读取数据

使用org.springframework.batch.item.database.JdbcPagingItemReader 读取数据, 配置如下:

chenlei65368 ⋅ 2013/07/25 ⋅ 0

springBatch的一次实战案例

<bean id="initGoldBeanCustRepeatCheckDataTasklet" class="com.heng.group.biz.job.globalcustomer.check.InitGoldBeanCustRepeatCheckDataTasklet"/> <bean id="pendingCheckRepeatDataRe......

清风少年 ⋅ 2013/08/05 ⋅ 0

SpringBatch企业批处理框架Reader的使用

SpringBatch是解决企业数据逻辑较简单,重复性高,大数据量而设计的.从他提供的各种Reader就能看出来.起码我是这样理解的.最适合做的如:数据清洗,数据分析后转移,或者定时的和其他系统交互的地...

震秦 ⋅ 2012/10/20 ⋅ 7

spring batch 分页查询 oracle数据库 提示列名无效

spring batch 分页查询 oracle数据库 提示列名无效 没有提示具体哪列 spring batch使用的是2.1.8 使用org.springframework.batch.item.database.JdbcPagingItemReader,org.springframework.......

无知的小孩 ⋅ 2016/10/10 ⋅ 0

数据处理---Spring Batch之进阶

有了前面两篇的基础之后,我们一起来聊聊更高级的关于Spring Batch的话题。比如Job 配置;自定义各种Listener;控制步骤流程;自定义Reader,Processor,Writer;并行处理。这些东西在Spring ...

hugolyl ⋅ 2016/07/06 ⋅ 0

spring batch的java config实践

背景 在后台服务开发中, 经常要用到多线程技术进行加速执行, 每家公司都有内部多线程的框架, 这些框架不是文档不规范, 就是只能适用特定场景. 基于这些原因, spring batch带来了更易用, 性能...

涛声依旧啊 ⋅ 2017/11/03 ⋅ 0

Debugging with GDB 用GDB调试多线程程序

Debugging with GDB http://www.delorie.com/gnu/docs/gdb/gdb_25.html GDB调试多线程程序总结 一直对GDB多线程调试接触不多,最近因为工作有了一些接触,简单作点记录吧。 先介绍一下GDB多线...

AlphaJay ⋅ 2011/07/22 ⋅ 0

多线程下写日志

鄙人最近遇到了一个奇特的线上事故,记录一下,以备记忆。 鄙人所在的部门负责给公司提供各种基础库,即基础架构部门。最近某别的部门用本部门提供的支持多线程版本的日志库后,出现这样一个...

alexstocks ⋅ 2014/10/31 ⋅ 2

没有更多内容

加载失败,请刷新页面

加载更多

下一页

RabbitMQ学习以及与Spring的集成(三)

本文介绍RabbitMQ与Spring的简单集成以及消息的发送和接收。 在RabbitMQ的Spring配置文件中,首先需要增加命名空间。 xmlns:rabbit="http://www.springframework.org/schema/rabbit" 其次是模...

onedotdot ⋅ 27分钟前 ⋅ 0

JAVA实现仿微信红包分配规则

最近过年发红包拜年成为一种新的潮流,作为程序猿对算法的好奇远远要大于对红包的好奇,这里介绍一种自己想到的一种随机红包分配策略,还请大家多多指教。 算法介绍 一、红包金额限制 对于微...

小致dad ⋅ 39分钟前 ⋅ 0

Python 数电表格格式化 xlutils xlwt xlrd的使用

需要安装 xlutils xlwt xlrd 格式化前 格式化后 代码 先copy读取的表格,然后按照一定的规则修改,将昵称中的学号提取出来替换昵称即可 from xlrd import open_workbookfrom xlutils.copy ...

阿豪boy ⋅ 今天 ⋅ 0

面试题:使用rand5()生成rand7()

前言 读研究生这3 年,思维与本科相比变化挺大的,这几年除了看论文、设计方案,更重要的是学会注重先思考、再实现,感觉更加成熟吧,不再像个小P孩,人年轻时总会心高气傲。有1 道面试题:给...

初雪之音 ⋅ 今天 ⋅ 0

Docker Toolbox Looks like something went wrong

Docker Toolbox 重新安装后提示错误:Looks like something went wrong in step ´Checking if machine default exists´ 控制面板-->程序与应用-->启用或关闭windows功能:找到Hyper-V,如果处......

随你疯 ⋅ 今天 ⋅ 0

Guacamole 远程桌面

本文将Apache的guacamole服务的部署和应用,http://guacamole.apache.org/doc/gug/ 该链接下有全部相关知识的英文文档,如果水平ok,可以去这里仔细查看。 一、简介 Apache Guacamole 是无客...

千里明月 ⋅ 今天 ⋅ 0

nagios 安装

Nagios简介:监控网络并排除网络故障的工具:nagios,Ntop,OpenVAS,OCS,OSSIM等开源监控工具。 可以实现对网络上的服务器进行全面的监控,包括服务(apache、mysql、ntp、ftp、disk、qmail和h...

寰宇01 ⋅ 今天 ⋅ 0

AngularDart注意事项

默认情况下创建Dart项目应出现以下列表: 有时会因为不知明的原因导致列表项缺失: 此时可以通过以下步骤解决: 1.创建项目涉及到的包:stagehand 2.执行pub global activate stagehand或pub...

scooplol ⋅ 今天 ⋅ 0

Java Web如何操作Cookie的添加修改和删除

创建Cookie对象 Cookie cookie = new Cookie("id", "1"); 修改Cookie值 cookie.setValue("2"); 设置Cookie有效期和删除Cookie cookie.setMaxAge(24*60*60); // Cookie有效时间 co......

二营长意大利炮 ⋅ 今天 ⋅ 0

【每天一个JQuery特效】淡入淡出显示或隐藏窗口

我是JQuery新手爱好者,有时间就练练代码,防止手生,争取每天一个JQuery练习,在这个博客记录下学习的笔记。 本特效主要采用fadeIn()和fadeOut()方法显示淡入淡出的显示效果显示或隐藏元...

Rhymo-Wu ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部