文档章节

Spring Batch_使用多线程运行一组JOB

秋风醉了
 秋风醉了
发布于 2014/11/12 14:02
字数 1630
阅读 594
收藏 1
点赞 0
评论 0

Spring Batch_使用多线程运行一组JOB

主要思路:在spring batch中,一个job会完成一个任务,处理一个数据集,有时这个数据集会很大,导致运行时间很长(虽然做了各种优化,数据库访问的优化,代码的优化等等),但是我想如果把这个数据集分成几块,配置几个相同的job来完成同一个任务,每个job处理其中一个数据块。这样不是也能提高效率,节省时间吗?

那么我们就来实验一下,看看可操作性。

如何给给一个大的数据集分块:可以利用limit。通过limit 构造两个sql语句,通过jobParameters 动态传递给运行中的job,那么job的item reader就会读取特定sql 语句查询上来的数据,然后进行处理。

下面我的spring batch的配置文件:

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">


	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<batch:job id="addPeopleDescJob_1">
		<batch:step id="addDescStep_1" parent="abstractStep">
			<batch:tasklet>
				<batch:chunk reader="peopleAddDescReader_1" processor="addDescProcessor"
					writer="addDescPeopleWriter" commit-interval="2" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
	<bean id="peopleAddDescReader_1" parent="abstractCursorReader"
		scope="step">
		<property name="sql" value="#{jobParameters['sql1']}" />
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>


	<batch:job id="addPeopleDescJob_2">
		<batch:step id="addDescStep_2" parent="abstractStep">
			<batch:tasklet>
				<batch:chunk reader="peopleAddDescReader_2" processor="addDescProcessor"
					writer="addDescPeopleWriter" commit-interval="2" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
	<bean id="peopleAddDescReader_2" parent="abstractCursorReader"
		scope="step">
		<property name="sql" value="#{jobParameters['sql2']}" />
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>


	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>

可以看到有两个job -addPeopleDescJob_1 和 addPeopleDescJob_2,每个job的reader 是不一样的,不一样的地方在 sql参数的不一样,是通过job parameter 动态传递进来的。

下面是AppMain4.java

package com.lyx.batch;

import javax.sql.DataSource;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class AppMain4 {

	private static JdbcTemplate jdbcTemplate;

	@Autowired
	public void setDataSource(DataSource dataSource) {
		jdbcTemplate = new JdbcTemplate(dataSource);
	}

	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		final ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch4.xml" });
		final JobLauncher launcher = (JobLauncher) context
				.getBean("jobLauncher");

		int rowCount = jdbcTemplate.queryForObject(
				"select count(*) from people where "
						+ "first_name like '%JOHN%' or last_name like '%DOE%'",
				Integer.class);

		final String sql1;
		final String sql2;
		int mid = (rowCount - 1) >>> 1;
		if ((rowCount & 1) == 0) { // 偶数
			sql1 = "select first_name ,last_name from people where "
					+ "first_name like ? or last_name like ? limit 0," + mid;
			sql2 = "select first_name ,last_name from people where "
					+ "first_name like ? or last_name like ? order by person_id desc limit 0,"
					+ mid;
		} else { // 奇数
			sql1 = "select first_name ,last_name from people where "
					+ "first_name like ? or last_name like ? limit 0," + mid;
			sql2 = "select first_name ,last_name from people where "
					+ "first_name like ? or last_name like ? order by person_id desc limit 0,"
					+ (mid + 1);
		}

		Thread thread_1 = new Thread(new Runnable() {
			public void run() {
				long t1 = System.currentTimeMillis(); // 获取开始时间

				// TODO Auto-generated method stub
				JobParametersBuilder job1 = new JobParametersBuilder();
				job1.addString("sql1", sql1);
				Job task1 = (Job) context.getBean("addPeopleDescJob_1");

				try {
					JobExecution result1 = launcher.run(task1,
							job1.toJobParameters());
					ExitStatus es1 = result1.getExitStatus();
					if (es1.getExitCode().equals(
							ExitStatus.COMPLETED.getExitCode())) {
						System.out.println("job1任务正常完成");
					} else {
						System.out.println("job1任务失败,exitCode="
								+ es1.getExitCode());
					}
				} catch (JobExecutionAlreadyRunningException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (JobRestartException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (JobInstanceAlreadyCompleteException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (JobParametersInvalidException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}
				long t2 = System.currentTimeMillis(); // 获取结束时间
				System.out.println(Thread.currentThread().getName() + "运行时间: "
						+ (t2 - t1) + "ms");
			}
		});

		thread_1.start();

		Thread thread_2 = new Thread(new Runnable() {
			public void run() {
				long t1 = System.currentTimeMillis();
				// TODO Auto-generated method stub
				JobParametersBuilder job2 = new JobParametersBuilder();
				// 设置JobParameter
				job2.addString("sql2", sql2);
				Job task2 = (Job) context.getBean("addPeopleDescJob_2");
				try {
					JobExecution result2 = launcher.run(task2,
							job2.toJobParameters());
					ExitStatus es2 = result2.getExitStatus();
					if (es2.getExitCode().equals(
							ExitStatus.COMPLETED.getExitCode())) {
						System.out.println("job2任务正常完成");
					} else {
						System.out.println("job2任务失败,exitCode="
								+ es2.getExitCode());
					}
				} catch (JobExecutionAlreadyRunningException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (JobRestartException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (JobInstanceAlreadyCompleteException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				} catch (JobParametersInvalidException e) {
					// TODO Auto-generated catch block
					e.printStackTrace();
				}

				long t2 = System.currentTimeMillis(); // 获取结束时间
				System.out.println(Thread.currentThread().getName() + "运行时间: "
						+ (t2 - t1) + "ms");
			}
		});
		thread_2.start();
		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}
}

PeoplePreparedStatementSetter.java

package com.lyx.batch;

import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.springframework.jdbc.core.PreparedStatementSetter;

public class PeoplePreparedStatementSetter implements PreparedStatementSetter {

	public void setValues(PreparedStatement ps) throws SQLException {
		// TODO Auto-generated method stub
		ps.setString(1, "%JOHN%");
		ps.setString(2, "%DOE%");
		// ps.setInt(3, 1);
		// ps.setInt(4, 100);
	}
}

 

运行结果:

job1任务正常完成

Thread-3运行时间: 4573ms

 

job2任务正常完成

Thread-4运行时间: 4627ms

看到每个线程的运行时间都在4秒多。

 

再看一下在一个线程中运行一组Job的情况:

AppMain3.java

package com.lyx.batch;

import javax.sql.DataSource;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
import org.springframework.jdbc.core.JdbcTemplate;
import org.springframework.stereotype.Component;

@Component
public class AppMain3 {

	private static JdbcTemplate jdbcTemplate;

	@Autowired
	public void setDataSource(DataSource dataSource) {
		jdbcTemplate = new JdbcTemplate(dataSource);
	}

	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch4.xml" });
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");

		int rowCount = jdbcTemplate.queryForObject(
				"select count(*) from people where "
						+ "first_name like '%JOHN%' or last_name like '%DOE%'",
				Integer.class);

		String sql1 = null;
		String sql2 = null;
		int mid = (rowCount - 1) >>> 1;
		if ((rowCount & 1) == 0) { // 偶数
			sql1 = "select first_name ,last_name from people where "
					+ "first_name like ? or last_name like ? limit 0," + mid;
			sql2 = "select first_name ,last_name from people where "
					+ "first_name like ? or last_name like ? order by person_id desc limit 0,"
					+ mid;
		} else { // 奇数
			sql1 = "select first_name ,last_name from people where "
					+ "first_name like ? or last_name like ? limit 0," + mid;
			sql2 = "select first_name ,last_name from people where "
					+ "first_name like ? or last_name like ? order by person_id desc limit 0,"
					+ (mid + 1);
		}

		JobParametersBuilder job1 = new JobParametersBuilder();
		job1.addString("sql1", sql1);
		Job task1 = (Job) context.getBean("addPeopleDescJob_1");

		JobExecution result1 = launcher.run(task1, job1.toJobParameters());
		ExitStatus es1 = result1.getExitStatus();
		if (es1.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("job1任务正常完成");
		} else {
			System.out.println("job1任务失败,exitCode=" + es1.getExitCode());
		}

		JobParametersBuilder job2 = new JobParametersBuilder();
		// 设置JobParameter
		job2.addString("sql2", sql2);

		Job task2 = (Job) context.getBean("addPeopleDescJob_2");
		JobExecution result2 = launcher.run(task2, job2.toJobParameters());
		ExitStatus es2 = result2.getExitStatus();
		if (es2.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("job2任务正常完成");
		} else {
			System.out.println("job2任务失败,exitCode=" + es2.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}
}

运行结果:

job1任务正常完成

job2任务正常完成

程序运行时间: 8706ms

结果你也看到了,是不是多线程运行一组job效率更高。但是用多线程,配置一组相同的job带来的问题我没有预计到,虽然提高了效率,但可能给job的重试和重启还有job的管理带来了问题。

==============END==============

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 222
博文 581
码字总数 411013
作品 0
东城
程序员
JdbcPagingItemReader多线程的Step

JdbcPagingItemReader多线程的Step 我们最经常使用的就是 JdbcCursorItemReader,使用游标的方式逐条数据的读取。但是从spring 官方文档我们知道 ,他不是线程安全的。在这里,我们使用 Jdbc...

秋风醉了 ⋅ 2014/11/22 ⋅ 8

spring batch的java config实践

背景 在后台服务开发中, 经常要用到多线程技术进行加速执行, 每家公司都有内部多线程的框架, 这些框架不是文档不规范, 就是只能适用特定场景. 基于这些原因, spring batch带来了更易用, 性能...

涛声依旧啊 ⋅ 2017/11/03 ⋅ 0

大数据批处理框架 Spring Batch全面解析

如今微服务架构讨论的如火如荼。但在企业架构里除了大量的OLTP交易外,还存在海量的批处理交易。在诸如银行的金融机构中,每天有3-4万笔的批处理作业需要处理。针对OLTP,业界有大量的开源框...

huojiao2006 ⋅ 2017/01/09 ⋅ 0

SpringBatch job run twice every time, but I want to run only once

Change application.properties spring.batch.job.enabled=false, 缺省值为true @EnableAutoConfiguration for Spring Batch, 缺省情况下会创建一个job, 在context中所有的jobs都会被执行。......

JerryNing ⋅ 04/29 ⋅ 0

Spring Batch_JobParameters

Spring BatchJobParameters spring batch的JobParameters是设置job运行的参数,同时也具有标志job的作用,就是判断两个job是不是同一个job( "how is one JobInstance distinguished from a...

秋风醉了 ⋅ 2014/11/12 ⋅ 0

Spring Batch_异步并发的processor && writer

Spring Batch_异步并发的processor && writer 普通的配置一个job,在这个demo中:http://my.oschina.net/xinxingegeya/blog/343190 job的reader是通过游标读取,commit-interval="2"表示每读...

秋风醉了 ⋅ 2014/11/13 ⋅ 0

Spring Batch_Configuring a Step for Restart

Spring Batch_Configuring a Step for Restart spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html#stepRestart 当一个普通的 job 处于complete ......

秋风醉了 ⋅ 2014/11/14 ⋅ 0

Spring Batch_JOB重启机制

Spring Batch_JOB重启机制 在这一篇文章 对于restart做了试验,http://my.oschina.net/xinxingegeya/blog/344817在这片文章里,我们只是当job成功时,重启了job,对于job失败后,重启job有...

秋风醉了 ⋅ 2014/11/19 ⋅ 0

spring batch的一个问题

最近一个项目中遇到这样一个问题使用spring batch 将一个数据库中的表定时迁移到 另一个数据库中,我看文档上有 text--》db 等方式,但是没有说 db--->db 再加上在配置的时候有如下一个配置就...

灯光下的宁静 ⋅ 2015/07/20 ⋅ 3

Duplicate entry '0' for key 'PRIMARY'_Spring Batch

Duplicate entry '0' for key 'PRIMARY'Spring Batch 在运行一个job成功后,在运行一次,会出现这个异常Duplicate entry '0' for key 'PRIMARY',也就是主键冲突。 在truncate 表之后执行以下...

秋风醉了 ⋅ 2014/11/12 ⋅ 1

没有更多内容

加载失败,请刷新页面

加载更多

下一页

熊掌号收录比例对于网站原创数据排名的影响[图]

从去年下半年开始,我在写博客了,因为我觉得业余写写博客也还是很不错的,但是从2017年下半年开始,百度已经推出了原创保护功能和熊掌号平台,为此,我也提交了不少以前的老数据,而这些历史...

原创小博客 ⋅ 29分钟前 ⋅ 0

LVM讲解、磁盘故障小案例

LVM LVM就是动态卷管理,可以将多个硬盘和硬盘分区做成一个逻辑卷,并把这个逻辑卷作为一个整体来统一管理,动态对分区进行扩缩空间大小,安全快捷方便管理。 1.新建分区,更改类型为8e 即L...

蛋黄Yolks ⋅ 47分钟前 ⋅ 0

Hadoop Yarn调度器的选择和使用

一、引言 Yarn在Hadoop的生态系统中担任了资源管理和任务调度的角色。在讨论其构造器之前先简单了解一下Yarn的架构。 上图是Yarn的基本架构,其中ResourceManager是整个架构的核心组件,它负...

p柯西 ⋅ 58分钟前 ⋅ 0

uWSGI + Django @ Ubuntu

创建 Django App Project 创建后, 可以看到路径下有一个wsgi.py的问题 uWSGI运行 直接命令行运行 利用如下命令, 可直接访问 uwsgi --http :8080 --wsgi-file dj/wsgi.py 配置文件 & 运行 [u...

袁祾 ⋅ 今天 ⋅ 0

JVM堆的理解

在JVM中,我们经常提到的就是堆了,堆确实很重要,其实,除了堆之外,还有几个重要的模块,看下图: 大 多数情况下,我们并不需要关心JVM的底层,但是如果了解它的话,对于我们系统调优是非常...

不羁之后 ⋅ 昨天 ⋅ 0

推荐:并发情况下:Java HashMap 形成死循环的原因

在淘宝内网里看到同事发了贴说了一个CPU被100%的线上故障,并且这个事发生了很多次,原因是在Java语言在并发情况下使用HashMap造成Race Condition,从而导致死循环。这个事情我4、5年前也经历...

码代码的小司机 ⋅ 昨天 ⋅ 1

聊聊spring cloud gateway的RetryGatewayFilter

序 本文主要研究一下spring cloud gateway的RetryGatewayFilter GatewayAutoConfiguration spring-cloud-gateway-core-2.0.0.RC2-sources.jar!/org/springframework/cloud/gateway/config/G......

go4it ⋅ 昨天 ⋅ 0

创建新用户和授予MySQL中的权限教程

导读 MySQL是一个开源数据库管理软件,可帮助用户存储,组织和以后检索数据。 它有多种选项来授予特定用户在表和数据库中的细微的权限 - 本教程将简要介绍一些选项。 如何创建新用户 在MySQL...

问题终结者 ⋅ 昨天 ⋅ 0

android -------- 颜色的半透明效果配置

最近有朋友问我 Android 背景颜色的半透明效果配置,我网上看资料,总结了一下, 开发中也是常常遇到的,所以来写篇博客 常用的颜色值格式有: RGB ARGB RRGGBB AARRGGBB 这4种 透明度 透明度...

切切歆语 ⋅ 昨天 ⋅ 0

CentOS开机启动subversion

建立自启动脚本: vim /etc/init.d/subversion 输入如下内容: #!/bin/bash## subversion startup script for the server## chkconfig: 2345 90 10# description: start the subve......

随风而飘 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部