文档章节

Spring Batch_JOB执行流程分析

秋风醉了
 秋风醉了
发布于 2014/11/21 15:38
字数 1530
阅读 5323
收藏 5
点赞 2
评论 1

Spring Batch_JOB执行流程分析

基于以下job的配置,

<batch:job id="addPeopleDescJob">
    <batch:step id="addDescStep" parent="abstractStep">
        <batch:tasklet>
            <batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
                writer="addDescPeopleWriter" commit-interval="2">
            </batch:chunk>
        </batch:tasklet>
    </batch:step>
</batch:job>

debug代码

JobExecution result = launcher.run(job,
		jobParametersBuilder.toJobParameters());

这是启动job的方法,如下是方法的具体实现:

SimpleJobLauncher.java run方法的具体实现(删除了部分代码)

@Override
public JobExecution run(final Job job, final JobParameters jobParameters)
		throws JobExecutionAlreadyRunningException, JobRestartException, JobInstanceAlreadyCompleteException,
		JobParametersInvalidException {
	final JobExecution jobExecution;
	jobExecution = jobRepository.createJobExecution(job.getName(), jobParameters);
	try {
		taskExecutor.execute(new Runnable() {
			@Override
			public void run() {
				try {
					job.execute(jobExecution);
				}
				catch (Throwable t) {
					rethrow(t);
				}
			}
			private void rethrow(Throwable t) {
			}
		});
	}
	return jobExecution;
}

看taskExecutor.execute 方法的主要逻辑,就是把 job execute 封装进一个抽象的任务内,通过taskExecutor 执行 ,taskExecutor 是一个什么类型——org.springframework.core.task.SyncTaskExecutor,一个同步的任务执行类。这样就可以明确的知道每个chunk的处理都是在一个单线程内,循环往复的处理每个commit-interval。

taskExecutor.execute(new Runnable() {
	@Override
	public void run() {	
		job.execute(jobExecution);	
	}
});

 

继续走代码,那接下来执行的方法是job.execute(jobExecution);那先来看job 是什么类型的对象,

那么就是他了,FlowJob: [name=addPeopleDescJob]。进入job.execute ,

他就直接跳进了 org.springframework.batch.core.job.AbstractJob.execute(JobExecution execution)方法,这是FlowJob的父类,FlowJob 显然没有覆写该方法。

AbstractJob.execute(JobExecution execution)

@Override
public final void execute(JobExecution execution) {
	doExecute(execution);
}

这是方法的主要执行逻辑,那么这个doExecute方法就是FlowJob实现父类的抽象方法,完成 job的执行的任务。

FlowJob.doExecute()

/**
 * @see AbstractJob#doExecute(JobExecution)
 */
@Override
protected void doExecute(final JobExecution execution) throws JobExecutionException {
   try {
      JobFlowExecutor executor = new JobFlowExecutor(getJobRepository(),
            new SimpleStepHandler(getJobRepository()), execution);
      executor.updateJobExecutionStatus(flow.start(executor).getStatus());
   }
   catch (FlowExecutionException e) {
      if (e.getCause() instanceof JobExecutionException) {
         throw (JobExecutionException) e.getCause();
      }
      throw new JobExecutionException("Flow execution ended unexpectedly", e);
   }
}

>>>>>>>>>>>>>>>>>>>>>>>>>>>

继续debug

job是任务的任务的抽象表示,完成的具体任务还要在step中,那么接下来就是step的执行了,step是如何执行的?

我们的代码最终调到这org.springframework.batch.core.step.AbstractStep.execute (StepExecution stepExecution) ,step的抽象类。这个方法的主要逻辑如下:

@Override
public final void execute(StepExecution stepExecution)
		throws JobInterruptedException, UnexpectedJobExecutionException {
	doExecute(stepExecution);
}

这个step又是什么类型的对象——TaskletStep: [name=addDescStep],其父类为AbstractStep。

继续看TaskletStep.doExecute(stepExecution); 

@Override
protected void doExecute(StepExecution stepExecution) throws Exception {
	stepExecution.getExecutionContext().put(TASKLET_TYPE_KEY, tasklet.getClass().getName());
	stepExecution.getExecutionContext().put(STEP_TYPE_KEY, this.getClass().getName());

	stream.update(stepExecution.getExecutionContext());
	getJobRepository().updateExecutionContext(stepExecution);

	// Shared semaphore per step execution, so other step executions can run
	// in parallel without needing the lock
	final Semaphore semaphore = createSemaphore();

	stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {

		@Override
		public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
				throws Exception {

			StepExecution stepExecution = chunkContext.getStepContext().getStepExecution();

			// Before starting a new transaction, check for
			// interruption.
			interruptionPolicy.checkInterrupted(stepExecution);

			RepeatStatus result;
			try {
				result = new TransactionTemplate(transactionManager, transactionAttribute)
				.execute(new ChunkTransactionCallback(chunkContext, semaphore));
			}
			catch (UncheckedTransactionException e) {
				// Allow checked exceptions to be thrown inside callback
				throw (Exception) e.getCause();
			}

			chunkListener.afterChunk(chunkContext);

			// Check for interruption after transaction as well, so that
			// the interrupted exception is correctly propagated up to
			// caller
			interruptionPolicy.checkInterrupted(stepExecution);

			return result;
		}

	});

}

上面就是TaskletStep.doExecute的完整代码,其中主要的逻辑是:

stepOperations.iterate(new StepContextRepeatCallback(stepExecution) {
	@Override
	public RepeatStatus doInChunkContext(RepeatContext repeatContext, ChunkContext chunkContext)
			throws Exception {

		RepeatStatus result;
		try {
			result = new TransactionTemplate(transactionManager, transactionAttribute)
			.execute(new ChunkTransactionCallback(chunkContext, semaphore));
		}

		return result;
	}

});

stepOperations.iterate() 方法传入StepContextRepeatCallback的一个匿名对象。

继续debug,转了一圈又回到这个回调方法上StepContextRepeatCallback.doInChunkContext(),

result = new TransactionTemplate(transactionManager, transactionAttribute)
	.execute(new ChunkTransactionCallback(chunkContext, semaphore));

new TransactionTemplate().execute()方法忽略过去,在这个方法里面最终还要调用ChunkTransactionCallback.doInTransaction() 的回调方法

@Override
public RepeatStatus doInTransaction(TransactionStatus status) {
	RepeatStatus result = RepeatStatus.CONTINUABLE;
	result = tasklet.execute(contribution, chunkContext);
	return result;
}

这个tasklet 的类型是 :org.springframework.batch.core.step.item.ChunkOrientedTasklet ,定义step的执行策略。ChunkOrientedTasklet.execute() 方法:

@Override
public RepeatStatus execute(StepContribution contribution, ChunkContext chunkContext) throws Exception {

	@SuppressWarnings("unchecked")
	Chunk<I> inputs = (Chunk<I>) chunkContext.getAttribute(INPUTS_KEY);
	if (inputs == null) {
		inputs = chunkProvider.provide(contribution);
		if (buffering) {
			chunkContext.setAttribute(INPUTS_KEY, inputs);
		}
	}

	chunkProcessor.process(contribution, inputs);
	chunkProvider.postProcess(contribution, inputs);

	// Allow a message coming back from the processor to say that we
	// are not done yet
	if (inputs.isBusy()) {
		logger.debug("Inputs still busy");
		return RepeatStatus.CONTINUABLE;
	}

	chunkContext.removeAttribute(INPUTS_KEY);
	chunkContext.setComplete();

	logger.debug("Inputs not busy, ended: " + inputs.isEnd());
	return RepeatStatus.continueIf(!inputs.isEnd());

}

Chunk<I> inputs,就是要读入的数据,他是怎么来的 :

inputs = chunkProvider.provide(contribution);

chunkProvider 是 org.springframework.batch.core.step.item.SimpleChunkProvider 的类型,provide方法:

@Override
public Chunk<I> provide(final StepContribution contribution) throws Exception {

	final Chunk<I> inputs = new Chunk<I>();
	repeatOperations.iterate(new RepeatCallback() {

		@Override
		public RepeatStatus doInIteration(final RepeatContext context) throws Exception {
			I item = null;
			try {
				item = read(contribution, inputs);
			}
			catch (SkipOverflowException e) {
				// read() tells us about an excess of skips by throwing an
				// exception
				return RepeatStatus.FINISHED;
			}
			if (item == null) {
				inputs.setEnd();
				return RepeatStatus.FINISHED;
			}
			inputs.add(item);
			contribution.incrementReadCount();
			return RepeatStatus.CONTINUABLE;
		}

	});

	return inputs;

}

在 provide 方法内,逐条读取一块数据(通过jdbc游标来读取一条或通过分页来读取多条)item = read(contribution, inputs);然后放入inputs 数据块chunk inputs 。数据读完了(此commit内,根据commit-interval的配置,读取配置的次数或方法返回null表示数据读取完成)接下来就要处理inputs 数据了,到chunkProcessor.process (contribution, inputs) :

@Override
public final void process(StepContribution contribution, Chunk<I> inputs) throws Exception {

	// Allow temporary state to be stored in the user data field
	initializeUserData(inputs);

	// If there is no input we don't have to do anything more
	if (isComplete(inputs)) {
		return;
	}

	// Make the transformation, calling remove() on the inputs iterator if
	// any items are filtered. Might throw exception and cause rollback.
	Chunk<O> outputs = transform(contribution, inputs);

	// Adjust the filter count based on available data
	contribution.incrementFilterCount(getFilterCount(inputs, outputs));

	// Adjust the outputs if necessary for housekeeping purposes, and then
	// write them out...
	write(contribution, inputs, getAdjustedOutputs(inputs, outputs));

}

在chunkProcessor.process 方法内定义了 两个步骤,一个是transform,也就是process读入的数据集合,一个是write。代码跳到transform这:

protected Chunk<O> transform(StepContribution contribution, Chunk<I> inputs) throws Exception {
	Chunk<O> outputs = new Chunk<O>();
	for (Chunk<I>.ChunkIterator iterator = inputs.iterator(); iterator.hasNext();) {
		final I item = iterator.next();
		O output;
		try {
			output = doProcess(item);
		}
		catch (Exception e) {
			/*
			 * For a simple chunk processor (no fault tolerance) we are done
			 * here, so prevent any more processing of these inputs.
			 */
			inputs.clear();
			throw e;
		}
		if (output != null) {
			outputs.add(output);
		}
		else {
			iterator.remove();
		}
	}
	return outputs;
}

对于 inputs 集合 ,遍历处理后得到 outputs 集合。

output = doProcess(item);

上面这行代码就是要调用自定义的process 方法进行处理。然后看write 方法是如何写入数据的,

protected void write(StepContribution contribution, Chunk<I> inputs, Chunk<O> outputs) throws Exception {
	try {
		doWrite(outputs.getItems());
	}
	catch (Exception e) {
		/*
		 * For a simple chunk processor (no fault tolerance) we are done
		 * here, so prevent any more processing of these inputs.
		 */
		inputs.clear();
		throw e;
	}
	contribution.incrementWriteCount(outputs.size());
}

在write 方法内调用了 doWrite() 方法 ,然后调用自定义的 write方法把数据写入。

上面就是 在一次 commit-interval 内的主要过程和主要的逻辑代码。那么 spring batch 是如何重复commit-interval 的呢 (通过RepeatTemplate)?以后再详细说来。

 

总结:reader,processor和writer过程中数据的处理逻辑

在course reader中,通过游标逐条的读取数据,形成一个inputs chunk,相当于一个集合,然后逐条的处理inputs chunk,进行item process,形成一个outpus chunk,由outputs chunk得到一个集合outputs.getItems(),直接处理这个集合,也就是write方法的参数是一个集合类型。

================END================

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 223
博文 581
码字总数 411013
作品 0
东城
程序员
加载中

评论(1)

d
debugable
写的真不错,正好对reader,processor,writer流程不太理解,通过这个源码现在清楚了,不过还有两个问题,1.JdbcPagingItemReader为什么使用CopyOnWriteArrayList类来存储分页读入的数据?如果reader对应一个writer,使用普通ArrayList不可以吗?
2.分页读取的数据,比如页大小2000,commit-interval设置为200,这样是不是在writer处理完所有两千条数据后,reader读入内存的CopyOnWriteArrayList才会清空?
Spring-Batch批处理框架

官方地址:http://spring.io/guides/gs/batch-processing/ Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的...

boonya ⋅ 2016/08/19 ⋅ 0

Spring Batch 之 框架流程简单介绍(二)

Spring Batch流程介绍: 上图描绘了Spring Batch的执行过程。说明如下: 每个Batch都会包含一个Job。Job就像一个容器,这个容器里装了若干Step,Batch中实际干活的也就是这些Step,至于Step干...

长平狐 ⋅ 2012/08/27 ⋅ 0

批次调度系统--batch-scheduler

batch-scheduler项目简介 这是一个企业级批次调度系统, 在成熟的spring框架基础上,实现ETL调度服务. 权限管理部分,采用asofdate hauth项目. batch-schduler与常见的任务调度系统侧重点不同,...

zhanwei ⋅ 2017/06/22 ⋅ 1

SpringBatch job run twice every time, but I want to run only once

Change application.properties spring.batch.job.enabled=false, 缺省值为true @EnableAutoConfiguration for Spring Batch, 缺省情况下会创建一个job, 在context中所有的jobs都会被执行。......

JerryNing ⋅ 04/29 ⋅ 0

hzwy23/batch-scheduler

Wiki Page oschina 托管地址 github 托管地址 在下Java水平有限,项目中出现错误和不规范的地方,请不吝赐教,感激不尽 项目主要功能图: batch-scheduler 项目简介 这是一个企业级批次调度系统,...

hzwy23 ⋅ 2017/07/11 ⋅ 0

数据处理---Spring Batch之进阶

有了前面两篇的基础之后,我们一起来聊聊更高级的关于Spring Batch的话题。比如Job 配置;自定义各种Listener;控制步骤流程;自定义Reader,Processor,Writer;并行处理。这些东西在Spring ...

hugolyl ⋅ 2016/07/06 ⋅ 0

Spring Batch_JOB重启机制

Spring Batch_JOB重启机制 在这一篇文章 对于restart做了试验,http://my.oschina.net/xinxingegeya/blog/344817在这片文章里,我们只是当job成功时,重启了job,对于job失败后,重启job有...

秋风醉了 ⋅ 2014/11/19 ⋅ 0

两个分布式batch调度方案的比较

现在有两个方案,目的都是想提供一个分布式的任务调度执行框架. 背景:若干业务应用,均独立部署,需要做集群. 方案A: 一个调度中心(基于quartz),负责维护和调度job.某个job到点执行时,发送消息...

ice_stream ⋅ 2015/09/02 ⋅ 1

大数据批处理框架 Spring Batch全面解析

如今微服务架构讨论的如火如荼。但在企业架构里除了大量的OLTP交易外,还存在海量的批处理交易。在诸如银行的金融机构中,每天有3-4万笔的批处理作业需要处理。针对OLTP,业界有大量的开源框...

huojiao2006 ⋅ 2017/01/09 ⋅ 0

Spring Batch 之 Spring Batch 简介(一)

Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的Spring框架为基础,使开发者更容易的访问和利用企业级服务。...

长平狐 ⋅ 2012/08/27 ⋅ 1

没有更多内容

加载失败,请刷新页面

加载更多

下一页

火狐浏览器各版本下载及插件httprequest

各版本下载地址:http://ftp.mozilla.org/pub/mozilla.org//firefox/releases/ httprequest插件截至57版本可用

xiaoge2016 ⋅ 21分钟前 ⋅ 0

Java学习路径及练手项目合集

Java学习路径及练手项目合集

颖伙虫 ⋅ 36分钟前 ⋅ 0

Docker系列教程28-实战:使用Docker Compose运行ELK

原文:http://www.itmuch.com/docker/28-docker-compose-in-action-elk/,转载请说明出处。 ElasticSearch【存储】 Logtash【日志聚合器】 Kibana【界面】 答案: version: '2'services: ...

周立_ITMuch ⋅ 今天 ⋅ 0

使用快嘉sdkg极速搭建接口模拟系统

在具体项目研发过程中,一旦前后端双方约定好接口,前端和app同事就会希望后台同事可以尽快提供可供对接的接口方便调试,而对后台同事来说定好接口还仅是个开始、设计流程,实现业务逻辑,编...

fastjrun ⋅ 今天 ⋅ 0

PXE/KickStart 无人值守安装

导言 作为中小公司的运维,经常会遇到一些机械式的重复工作,例如:有时公司同时上线几十甚至上百台服务器,而且需要我们在短时间内完成系统安装。 常规的办法有什么? 光盘安装系统 ===> 一...

kangvcar ⋅ 昨天 ⋅ 0

使用Puppeteer撸一个爬虫

Puppeteer是什么 puppeteer是谷歌chrome团队官方开发的一个无界面(Headless)chrome工具。Chrome Headless将成为web应用自动化测试的行业标杆。所以我们很有必要来了解一下它。所谓的无头浏...

小草先森 ⋅ 昨天 ⋅ 0

Java Done Right

* 表示难度较大或理论性较强。 ** 表示难度更大或理论性更强。 【Java语言本身】 基础语法,面向对象,顺序编程,并发编程,网络编程,泛型,注解,lambda(Java8),module(Java9),var(...

风华神使 ⋅ 昨天 ⋅ 0

Linux系统日志

linux 系统日志 /var/log/messages /etc/logrotate.conf 日志切割配置文件 https://my.oschina.net/u/2000675/blog/908189 logrotate 使用详解 dmesg 命令 /var/log/dmesg 日志 last命令,调......

Linux学习笔记 ⋅ 昨天 ⋅ 0

MVC——统一报文格式的异常处理响应

在我们写controller层的时候,常常会有这样的困惑,如果需要返回一个数据是,可能为了统一回去构造一个类似下列的数据格式: { status:true, msg:"保存成功!", data:[]} 而且在写...

alexzhu592 ⋅ 昨天 ⋅ 0

android -------- 打开本地浏览器或指定浏览器加载,打电话,打开第三方app

开发中常常有打开本地浏览器加载url或者指定浏览器加载, 还有打开第三方app, 如 打开高德地图 百度地图等 在Android程序中我们可以通过发送隐式Intent来启动系统默认的浏览器。 如果手机本身...

切切歆语 ⋅ 昨天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部