文档章节

Spring Batch_JOB重启机制

秋风醉了
 秋风醉了
发布于 2014/11/19 17:27
字数 1838
阅读 3093
收藏 4

Spring Batch_JOB重启机制

在这一篇文章 对于restart做了试验,http://my.oschina.net/xinxingegeya/blog/344817在这片文章里,我们只是当job成功时,重启了job,对于job失败后,重启job有什么效果,我没有演示,下面我们就来演示一下当job失败退出后,再重启job有什么效果。

先做一个 导致job失败的情景,如下的processor :

ThrowExceptionProcessor.java

package com.lyx.batch;

import org.springframework.batch.item.ItemProcessor;

public class ThrowExceptionProcessor implements
		ItemProcessor<People, PeopleDESC> {

	public PeopleDESC process(People item) throws Exception {
		System.out.println("process people desc");
		if ("lyx".equals(item.getFirstName())) {
			throw new InvalidDataException("invalid data");
		}
		return new PeopleDESC(item.getLastName(), item.getFirstName(), Thread
				.currentThread().getName());
	}
}


当判断某条数据符合失败条件后,抛出异常 ,导致job失败。

下面是整个配置文件:spring-batch-failure-restart.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- add people desc job begin -->
	<batch:job id="addPeopleDescJob" restartable="true">
		<batch:step id="addDescStep" parent="abstractStep">
			<batch:tasklet allow-start-if-complete="true"
				start-limit="3">
				<batch:chunk reader="peopleAddDescReader" processor="throwExceptionProcessor"
					writer="addDescPeopleWriter" commit-interval="2" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
	<!-- add people desc job end -->

	<bean id="peopleAddDescReader" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="throwExceptionProcessor" class="com.lyx.batch.ThrowExceptionProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>


运行任务:

AppMain13.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试当任务失败时,重启任务
 * 
 * @author Lenovo
 *
 */
public class AppMain13 {
	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-failure-restart.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}
}


第一次运行的结果:

严重: Encountered an error executing step addDescStep in job addPeopleDescJob

com.lyx.batch.InvalidDataException: invalid data

at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)

十一月 19, 2014 4:56:16 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [FAILED]

任务失败,exitCode=FAILED

程序运行时间: 7028ms

如上,显示job失败,那么失败的job 在spring batch 的meta table里存储了什么信息:

mysql> select * from batch_step_execution \G
*************************** 1. row ***************************
 STEP_EXECUTION_ID: 1
           VERSION: 52
         STEP_NAME: addDescStep
  JOB_EXECUTION_ID: 1
        START_TIME: 2014-11-19 16:56:11
          END_TIME: 2014-11-19 16:56:16
            STATUS: FAILED
      COMMIT_COUNT: 50
        READ_COUNT: 102
      FILTER_COUNT: 0
       WRITE_COUNT: 100
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1
         EXIT_CODE: FAILED
      EXIT_MESSAGE: com.lyx.batch.InvalidDataException: invalid data
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:1)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:126)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:293)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:192)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
        at o
      LAST_UPDATED: 2014-11-19 16:56:16
1 row in set (0.00 sec)

mysql>

这就是step 的运行信息,请注意几个关键的数据,就是

      COMMIT_COUNT: 50
        READ_COUNT: 102
      FILTER_COUNT: 0
       WRITE_COUNT: 100
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1

这些字段表示什么意思,请看这里:http://docs.spring.io/spring-batch/trunk/reference/html/metaDataSchema.html#metaDataBatchStepExecution

好了,我们先做这些工作,先不去修正会导致异常抛出的数据,我们第二次运行这个job


第二次运行的结果:

信息: Executing step: [addDescStep]

process people desc

十一月 19, 2014 5:03:30 下午 org.springframework.batch.core.step.AbstractStep execute

严重: Encountered an error executing step addDescStep in job addPeopleDescJob

com.lyx.batch.InvalidDataException: invalid data


任务失败,exitCode=FAILED

程序运行时间: 3233ms

十一月 19, 2014 5:03:30 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [FAILED]


如上所示,任务失败,再来看一下失败的任务在 spring batch meta table 里存储了什么信息:

mysql> select * from batch_step_execution where step_execution_id = 2 \G
*************************** 1. row ***************************
 STEP_EXECUTION_ID: 2
           VERSION: 2
         STEP_NAME: addDescStep
  JOB_EXECUTION_ID: 2
        START_TIME: 2014-11-19 17:03:30
          END_TIME: 2014-11-19 17:03:30
            STATUS: FAILED
      COMMIT_COUNT: 0
        READ_COUNT: 2
      FILTER_COUNT: 0
       WRITE_COUNT: 0
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1
         EXIT_CODE: FAILED
      EXIT_MESSAGE: com.lyx.batch.InvalidDataException: invalid data
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:1)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:126)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:293)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:192)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
        at o
      LAST_UPDATED: 2014-11-19 17:03:30
1 row in set (0.00 sec)

mysql>


要注意这些数据:

      COMMIT_COUNT: 0
        READ_COUNT: 2
      FILTER_COUNT: 0
       WRITE_COUNT: 0
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1

看到了没,这里说明了读出来的数据条数为 2 , 除了回滚的次数为 1 外,其他为 0 ;


第三次运行的结果:

在第三次运行前,我们把数据库里的数据修正,再运行

update people set first_name = 'hello',last_name = 'DOE' where first_name = 'lyx';

修正完成,那么第三次运行

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

任务正常完成

程序运行时间: 3960ms


好的,运行成功了,最重要的就是此时 spring batch在meta table 里存的数据,再来看一下:

mysql> select * from batch_step_execution where step_execution_id = 3 \G
*************************** 1. row ***************************
 STEP_EXECUTION_ID: 3
           VERSION: 14
         STEP_NAME: addDescStep
  JOB_EXECUTION_ID: 3
        START_TIME: 2014-11-19 17:11:40
          END_TIME: 2014-11-19 17:11:42
            STATUS: COMPLETED
      COMMIT_COUNT: 12
        READ_COUNT: 23
      FILTER_COUNT: 0
       WRITE_COUNT: 23
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 0
         EXIT_CODE: COMPLETED
      EXIT_MESSAGE:
      LAST_UPDATED: 2014-11-19 17:11:42
1 row in set (0.00 sec)

mysql>

通过对这三次运行结果的分析,我们可以知道spring batch 对失败的job进行restart ,不是从头开始处理数据,而是从出错的事务边界内第一条记录重复执行的,这样便确保了数据完整性。

当 job 运行成功后(运行成功后也没有必要进行restart),如果 restart 一个job,spring batch就会从第一条记录开始读数据,处理数据,导致数据被重复处理。

 

batch_step_execution 表字段含义

  • STEP_EXECUTION_ID: Primary key that uniquely identifies this execution. The value of this column should be obtainable by calling the getId method of the StepExecution object.

  • VERSION: See above section.

  • STEP_NAME: The name of the step to which this execution belongs.

  • JOB_EXECUTION_ID: Foreign key from the BATCH_JOB_EXECUTION table indicating the JobExecution to which this StepExecution belongs. There may be only one StepExecution for a given JobExecution for a given Step name.

  • START_TIME: Timestamp representing the time the execution was started.

  • END_TIME: Timestamp representing the time the execution was finished, regardless of success or failure. An empty value in this column even though the job is not currently running indicates that there has been some type of error and the framework was unable to perform a last save before failing.

  • STATUS: Character string representing the status of the execution. This may be COMPLETED, STARTED, etc. The object representation of this column is the BatchStatus enumeration.

  • COMMIT_COUNT: The number of times in which the step has committed a transaction during this execution.

  • READ_COUNT: The number of items read during this execution.

  • FILTER_COUNT: The number of items filtered out of this execution.

  • WRITE_COUNT: The number of items written and committed during this execution.

  • READ_SKIP_COUNT: The number of items skipped on read during this execution.

  • WRITE_SKIP_COUNT: The number of items skipped on write during this execution.

  • PROCESS_SKIP_COUNT: The number of items skipped during processing during this execution.

  • ROLLBACK_COUNT: The number of rollbacks during this execution. Note that this count includes each time rollback occurs, including rollbacks for retry and those in the skip recovery procedure.

  • EXIT_CODE: Character string representing the exit code of the execution. In the case of a command line job, this may be converted into a number.

  • EXIT_MESSAGE: Character string representing a more detailed description of how the job exited. In the case of failure, this might include as much of the stack trace as is possible.

  • LAST_UPDATED: Timestamp representing the last time this execution was persisted.

=======================END=======================

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 239
博文 572
码字总数 416654
作品 0
朝阳
程序员
私信 提问
Spring Batch_Configuring a Step for Restart

Spring Batch_Configuring a Step for Restart spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html#stepRestart 当一个普通的 job 处于complete ......

秋风醉了
2014/11/14
0
0
Spring Batch 之 框架流程简单介绍(二)

Spring Batch流程介绍: 上图描绘了Spring Batch的执行过程。说明如下: 每个Batch都会包含一个Job。Job就像一个容器,这个容器里装了若干Step,Batch中实际干活的也就是这些Step,至于Step干...

长平狐
2012/08/27
927
0
Spring Batch管理平台

Spring Batch框架出现,降低了开发人员自行编写批处理框架的难度,同时,也让开发人员更加集中于业务逻辑的处理。但是,Spring Batch框架的掌握并非易事,主要集中在:1、技术资料太少;2、S...

我可梦
2016/04/11
2
0
Spring Batch_JobParameters

Spring BatchJobParameters spring batch的JobParameters是设置job运行的参数,同时也具有标志job的作用,就是判断两个job是不是同一个job( "how is one JobInstance distinguished from a...

秋风醉了
2014/11/12
0
0
Duplicate entry '0' for key 'PRIMARY'_Spring Batch

Duplicate entry '0' for key 'PRIMARY'Spring Batch 在运行一个job成功后,在运行一次,会出现这个异常Duplicate entry '0' for key 'PRIMARY',也就是主键冲突。 在truncate 表之后执行以下...

秋风醉了
2014/11/12
0
1

没有更多内容

加载失败,请刷新页面

加载更多

初探Spring Cloud(一)

1. 什么是Spring Cloud? Spring提供了一系列工具,可以帮助开发人员迅速搭建分布式系统中的公共组件(比如:配置管理,服务发现,断路器,智能路由,微代理,控制总线,一次性令牌,全局锁,...

__HuWei
11分钟前
0
0
Mac配置ssh免密钥登录

Mac终端每次使用ssh -p 22 user@ip登录很是麻烦,下面介绍配置ssh免密钥登录: cd ~/.ssh下创建conf文件,写入以下配置: Host test HostName ip Port 22 IdentityFile /Users/t/key/test.p...

littlemesieV
26分钟前
1
0
Spark2.0操作ES

ES提供了支持包来方便的操作ES。首先添加ES的依赖maven: <dependency> <groupId>org.elasticsearch</groupId> <artifactId>elasticsearch-spark-20_2.11</artifactId> <version>6.2.0</ver......

守望者之父
28分钟前
1
0
专业术语

1、防御性编程 DruidDataSource类有一个init方法,我们在spring中配置druid时,都会指定 init-method='init'. 而且DruidDataSource也在其他地方,诸如getConnection()方法里作了防御性编程, 也就...

still5656
29分钟前
1
0
微信开发--测试账号相关

1、微信公众平台选择代码开发后,其平台上的部分功能会不可用 可在开发者工具中选择开通测试账号,进行相关的功能调试(测试账号出微信支付不可调试外,其他功能基本上都可以调试) 2、测试账号...

Code辉
35分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部