文档章节

Spring Batch_JOB重启机制

秋风醉了
 秋风醉了
发布于 2014/11/19 17:27
字数 1838
阅读 2941
收藏 4

Spring Batch_JOB重启机制

在这一篇文章 对于restart做了试验,http://my.oschina.net/xinxingegeya/blog/344817在这片文章里,我们只是当job成功时,重启了job,对于job失败后,重启job有什么效果,我没有演示,下面我们就来演示一下当job失败退出后,再重启job有什么效果。

先做一个 导致job失败的情景,如下的processor :

ThrowExceptionProcessor.java

package com.lyx.batch;

import org.springframework.batch.item.ItemProcessor;

public class ThrowExceptionProcessor implements
		ItemProcessor<People, PeopleDESC> {

	public PeopleDESC process(People item) throws Exception {
		System.out.println("process people desc");
		if ("lyx".equals(item.getFirstName())) {
			throw new InvalidDataException("invalid data");
		}
		return new PeopleDESC(item.getLastName(), item.getFirstName(), Thread
				.currentThread().getName());
	}
}


当判断某条数据符合失败条件后,抛出异常 ,导致job失败。

下面是整个配置文件:spring-batch-failure-restart.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- add people desc job begin -->
	<batch:job id="addPeopleDescJob" restartable="true">
		<batch:step id="addDescStep" parent="abstractStep">
			<batch:tasklet allow-start-if-complete="true"
				start-limit="3">
				<batch:chunk reader="peopleAddDescReader" processor="throwExceptionProcessor"
					writer="addDescPeopleWriter" commit-interval="2" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
	<!-- add people desc job end -->

	<bean id="peopleAddDescReader" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="throwExceptionProcessor" class="com.lyx.batch.ThrowExceptionProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>


运行任务:

AppMain13.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试当任务失败时,重启任务
 * 
 * @author Lenovo
 *
 */
public class AppMain13 {
	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-failure-restart.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}
}


第一次运行的结果:

严重: Encountered an error executing step addDescStep in job addPeopleDescJob

com.lyx.batch.InvalidDataException: invalid data

at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)

十一月 19, 2014 4:56:16 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [FAILED]

任务失败,exitCode=FAILED

程序运行时间: 7028ms

如上,显示job失败,那么失败的job 在spring batch 的meta table里存储了什么信息:

mysql> select * from batch_step_execution \G
*************************** 1. row ***************************
 STEP_EXECUTION_ID: 1
           VERSION: 52
         STEP_NAME: addDescStep
  JOB_EXECUTION_ID: 1
        START_TIME: 2014-11-19 16:56:11
          END_TIME: 2014-11-19 16:56:16
            STATUS: FAILED
      COMMIT_COUNT: 50
        READ_COUNT: 102
      FILTER_COUNT: 0
       WRITE_COUNT: 100
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1
         EXIT_CODE: FAILED
      EXIT_MESSAGE: com.lyx.batch.InvalidDataException: invalid data
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:1)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:126)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:293)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:192)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
        at o
      LAST_UPDATED: 2014-11-19 16:56:16
1 row in set (0.00 sec)

mysql>

这就是step 的运行信息,请注意几个关键的数据,就是

      COMMIT_COUNT: 50
        READ_COUNT: 102
      FILTER_COUNT: 0
       WRITE_COUNT: 100
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1

这些字段表示什么意思,请看这里:http://docs.spring.io/spring-batch/trunk/reference/html/metaDataSchema.html#metaDataBatchStepExecution

好了,我们先做这些工作,先不去修正会导致异常抛出的数据,我们第二次运行这个job


第二次运行的结果:

信息: Executing step: [addDescStep]

process people desc

十一月 19, 2014 5:03:30 下午 org.springframework.batch.core.step.AbstractStep execute

严重: Encountered an error executing step addDescStep in job addPeopleDescJob

com.lyx.batch.InvalidDataException: invalid data


任务失败,exitCode=FAILED

程序运行时间: 3233ms

十一月 19, 2014 5:03:30 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [FAILED]


如上所示,任务失败,再来看一下失败的任务在 spring batch meta table 里存储了什么信息:

mysql> select * from batch_step_execution where step_execution_id = 2 \G
*************************** 1. row ***************************
 STEP_EXECUTION_ID: 2
           VERSION: 2
         STEP_NAME: addDescStep
  JOB_EXECUTION_ID: 2
        START_TIME: 2014-11-19 17:03:30
          END_TIME: 2014-11-19 17:03:30
            STATUS: FAILED
      COMMIT_COUNT: 0
        READ_COUNT: 2
      FILTER_COUNT: 0
       WRITE_COUNT: 0
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1
         EXIT_CODE: FAILED
      EXIT_MESSAGE: com.lyx.batch.InvalidDataException: invalid data
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:1)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:126)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:293)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:192)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
        at o
      LAST_UPDATED: 2014-11-19 17:03:30
1 row in set (0.00 sec)

mysql>


要注意这些数据:

      COMMIT_COUNT: 0
        READ_COUNT: 2
      FILTER_COUNT: 0
       WRITE_COUNT: 0
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1

看到了没,这里说明了读出来的数据条数为 2 , 除了回滚的次数为 1 外,其他为 0 ;


第三次运行的结果:

在第三次运行前,我们把数据库里的数据修正,再运行

update people set first_name = 'hello',last_name = 'DOE' where first_name = 'lyx';

修正完成,那么第三次运行

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

任务正常完成

程序运行时间: 3960ms


好的,运行成功了,最重要的就是此时 spring batch在meta table 里存的数据,再来看一下:

mysql> select * from batch_step_execution where step_execution_id = 3 \G
*************************** 1. row ***************************
 STEP_EXECUTION_ID: 3
           VERSION: 14
         STEP_NAME: addDescStep
  JOB_EXECUTION_ID: 3
        START_TIME: 2014-11-19 17:11:40
          END_TIME: 2014-11-19 17:11:42
            STATUS: COMPLETED
      COMMIT_COUNT: 12
        READ_COUNT: 23
      FILTER_COUNT: 0
       WRITE_COUNT: 23
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 0
         EXIT_CODE: COMPLETED
      EXIT_MESSAGE:
      LAST_UPDATED: 2014-11-19 17:11:42
1 row in set (0.00 sec)

mysql>

通过对这三次运行结果的分析,我们可以知道spring batch 对失败的job进行restart ,不是从头开始处理数据,而是从出错的事务边界内第一条记录重复执行的,这样便确保了数据完整性。

当 job 运行成功后(运行成功后也没有必要进行restart),如果 restart 一个job,spring batch就会从第一条记录开始读数据,处理数据,导致数据被重复处理。

 

batch_step_execution 表字段含义

  • STEP_EXECUTION_ID: Primary key that uniquely identifies this execution. The value of this column should be obtainable by calling the getId method of the StepExecution object.

  • VERSION: See above section.

  • STEP_NAME: The name of the step to which this execution belongs.

  • JOB_EXECUTION_ID: Foreign key from the BATCH_JOB_EXECUTION table indicating the JobExecution to which this StepExecution belongs. There may be only one StepExecution for a given JobExecution for a given Step name.

  • START_TIME: Timestamp representing the time the execution was started.

  • END_TIME: Timestamp representing the time the execution was finished, regardless of success or failure. An empty value in this column even though the job is not currently running indicates that there has been some type of error and the framework was unable to perform a last save before failing.

  • STATUS: Character string representing the status of the execution. This may be COMPLETED, STARTED, etc. The object representation of this column is the BatchStatus enumeration.

  • COMMIT_COUNT: The number of times in which the step has committed a transaction during this execution.

  • READ_COUNT: The number of items read during this execution.

  • FILTER_COUNT: The number of items filtered out of this execution.

  • WRITE_COUNT: The number of items written and committed during this execution.

  • READ_SKIP_COUNT: The number of items skipped on read during this execution.

  • WRITE_SKIP_COUNT: The number of items skipped on write during this execution.

  • PROCESS_SKIP_COUNT: The number of items skipped during processing during this execution.

  • ROLLBACK_COUNT: The number of rollbacks during this execution. Note that this count includes each time rollback occurs, including rollbacks for retry and those in the skip recovery procedure.

  • EXIT_CODE: Character string representing the exit code of the execution. In the case of a command line job, this may be converted into a number.

  • EXIT_MESSAGE: Character string representing a more detailed description of how the job exited. In the case of failure, this might include as much of the stack trace as is possible.

  • LAST_UPDATED: Timestamp representing the last time this execution was persisted.

=======================END=======================

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 232
博文 574
码字总数 405033
作品 0
朝阳
程序员
Spring Batch_Configuring a Step for Restart

Spring Batch_Configuring a Step for Restart spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html#stepRestart 当一个普通的 job 处于complete ......

秋风醉了
2014/11/14
0
0
Spring Batch_JobParameters

Spring BatchJobParameters spring batch的JobParameters是设置job运行的参数,同时也具有标志job的作用,就是判断两个job是不是同一个job( "how is one JobInstance distinguished from a...

秋风醉了
2014/11/12
0
0
Spring Batch_使用多线程运行一组JOB

Spring Batch_使用多线程运行一组JOB 主要思路:在spring batch中,一个job会完成一个任务,处理一个数据集,有时这个数据集会很大,导致运行时间很长(虽然做了各种优化,数据库访问的优化,...

秋风醉了
2014/11/12
0
0
Duplicate entry '0' for key 'PRIMARY'_Spring Batch

Duplicate entry '0' for key 'PRIMARY'Spring Batch 在运行一个job成功后,在运行一次,会出现这个异常Duplicate entry '0' for key 'PRIMARY',也就是主键冲突。 在truncate 表之后执行以下...

秋风醉了
2014/11/12
0
1
Spring-Batch批处理框架

官方地址:http://spring.io/guides/gs/batch-processing/ Spring Batch是一个轻量级的,完全面向Spring的批处理框架,可以应用于企业级大量的数据处理系统。Spring Batch以POJO和大家熟知的...

boonya
2016/08/19
469
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

JS:异步 - 面试惨案

为什么会写这篇文章,很明显不符合我的性格的东西,原因是前段时间参与了一个面试,对于很多程序员来说,面试时候多么的鸦雀无声,事后心里就有多么的千军万马。去掉最开始毕业干了一年的Jav...

xmqywx
今天
0
0
Win10 64位系统,PHP 扩展 curl插件

执行:1. 拷贝php安装目录下,libeay32.dll、ssleay32.dll 、 libssh2.dll 到 C:\windows\system32 目录。2. 拷贝php/ext目录下, php_curl.dll 到 C:\windows\system32 目录; 3. p...

放飞E梦想O
今天
0
0
谈谈神秘的ES6——(五)解构赋值【对象篇】

上一节课我们了解了有关数组的解构赋值相关内容,这节课,我们接着,来讲讲对象的解构赋值。 解构不仅可以用于数组,还可以用于对象。 let { foo, bar } = { foo: "aaa", bar: "bbb" };fo...

JandenMa
今天
1
0
OSChina 周一乱弹 —— 有人要给本汪介绍妹子啦

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @莱布妮子 :分享水木年华的单曲《中学时代》@小小编辑 手机党少年们想听歌,请使劲儿戳(这里) @须臾时光:夏天还在做最后的挣扎,但是晚上...

小小编辑
今天
21
5
centos7安装redis及开机启动

配置编译环境: sudo yum install gcc-c++ 下载源码: wget http://download.redis.io/releases/redis-3.2.8.tar.gz 解压源码: tar -zxvf redis-3.2.8.tar.gz 进入到解压目录: cd redis-3......

hotsmile
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部