Spring Batch_JOB重启机制
Spring Batch_JOB重启机制
秋风醉了 发表于3年前
Spring Batch_JOB重启机制
  • 发表于 3年前
  • 阅读 2780
  • 收藏 4
  • 点赞 1
  • 评论 0
摘要: Spring Batch_JOB重启机制

Spring Batch_JOB重启机制

在这一篇文章 对于restart做了试验,http://my.oschina.net/xinxingegeya/blog/344817在这片文章里,我们只是当job成功时,重启了job,对于job失败后,重启job有什么效果,我没有演示,下面我们就来演示一下当job失败退出后,再重启job有什么效果。

先做一个 导致job失败的情景,如下的processor :

ThrowExceptionProcessor.java

package com.lyx.batch;

import org.springframework.batch.item.ItemProcessor;

public class ThrowExceptionProcessor implements
		ItemProcessor<People, PeopleDESC> {

	public PeopleDESC process(People item) throws Exception {
		System.out.println("process people desc");
		if ("lyx".equals(item.getFirstName())) {
			throw new InvalidDataException("invalid data");
		}
		return new PeopleDESC(item.getLastName(), item.getFirstName(), Thread
				.currentThread().getName());
	}
}


当判断某条数据符合失败条件后,抛出异常 ,导致job失败。

下面是整个配置文件:spring-batch-failure-restart.xml

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- add people desc job begin -->
	<batch:job id="addPeopleDescJob" restartable="true">
		<batch:step id="addDescStep" parent="abstractStep">
			<batch:tasklet allow-start-if-complete="true"
				start-limit="3">
				<batch:chunk reader="peopleAddDescReader" processor="throwExceptionProcessor"
					writer="addDescPeopleWriter" commit-interval="2" />
			</batch:tasklet>
		</batch:step>
	</batch:job>
	<!-- add people desc job end -->

	<bean id="peopleAddDescReader" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="throwExceptionProcessor" class="com.lyx.batch.ThrowExceptionProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>


运行任务:

AppMain13.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试当任务失败时,重启任务
 * 
 * @author Lenovo
 *
 */
public class AppMain13 {
	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-failure-restart.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}
}


第一次运行的结果:

严重: Encountered an error executing step addDescStep in job addPeopleDescJob

com.lyx.batch.InvalidDataException: invalid data

at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)

十一月 19, 2014 4:56:16 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [FAILED]

任务失败,exitCode=FAILED

程序运行时间: 7028ms

如上,显示job失败,那么失败的job 在spring batch 的meta table里存储了什么信息:

mysql> select * from batch_step_execution \G
*************************** 1. row ***************************
 STEP_EXECUTION_ID: 1
           VERSION: 52
         STEP_NAME: addDescStep
  JOB_EXECUTION_ID: 1
        START_TIME: 2014-11-19 16:56:11
          END_TIME: 2014-11-19 16:56:16
            STATUS: FAILED
      COMMIT_COUNT: 50
        READ_COUNT: 102
      FILTER_COUNT: 0
       WRITE_COUNT: 100
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1
         EXIT_CODE: FAILED
      EXIT_MESSAGE: com.lyx.batch.InvalidDataException: invalid data
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:1)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:126)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:293)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:192)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
        at o
      LAST_UPDATED: 2014-11-19 16:56:16
1 row in set (0.00 sec)

mysql>

这就是step 的运行信息,请注意几个关键的数据,就是

      COMMIT_COUNT: 50
        READ_COUNT: 102
      FILTER_COUNT: 0
       WRITE_COUNT: 100
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1

这些字段表示什么意思,请看这里:http://docs.spring.io/spring-batch/trunk/reference/html/metaDataSchema.html#metaDataBatchStepExecution

好了,我们先做这些工作,先不去修正会导致异常抛出的数据,我们第二次运行这个job


第二次运行的结果:

信息: Executing step: [addDescStep]

process people desc

十一月 19, 2014 5:03:30 下午 org.springframework.batch.core.step.AbstractStep execute

严重: Encountered an error executing step addDescStep in job addPeopleDescJob

com.lyx.batch.InvalidDataException: invalid data


任务失败,exitCode=FAILED

程序运行时间: 3233ms

十一月 19, 2014 5:03:30 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [FAILED]


如上所示,任务失败,再来看一下失败的任务在 spring batch meta table 里存储了什么信息:

mysql> select * from batch_step_execution where step_execution_id = 2 \G
*************************** 1. row ***************************
 STEP_EXECUTION_ID: 2
           VERSION: 2
         STEP_NAME: addDescStep
  JOB_EXECUTION_ID: 2
        START_TIME: 2014-11-19 17:03:30
          END_TIME: 2014-11-19 17:03:30
            STATUS: FAILED
      COMMIT_COUNT: 0
        READ_COUNT: 2
      FILTER_COUNT: 0
       WRITE_COUNT: 0
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1
         EXIT_CODE: FAILED
      EXIT_MESSAGE: com.lyx.batch.InvalidDataException: invalid data
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:11)
        at com.lyx.batch.ThrowExceptionProcessor.process(ThrowExceptionProcessor.java:1)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.doProcess(SimpleChunkProcessor.java:126)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.transform(SimpleChunkProcessor.java:293)
        at org.springframework.batch.core.step.item.SimpleChunkProcessor.process(SimpleChunkProcessor.java:192)
        at org.springframework.batch.core.step.item.ChunkOrientedTasklet.execute(ChunkOrientedTasklet.java:75)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:406)
        at org.springframework.batch.core.step.tasklet.TaskletStep$ChunkTransactionCallback.doInTransaction(TaskletStep.java:330)
        at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:133)
        at o
      LAST_UPDATED: 2014-11-19 17:03:30
1 row in set (0.00 sec)

mysql>


要注意这些数据:

      COMMIT_COUNT: 0
        READ_COUNT: 2
      FILTER_COUNT: 0
       WRITE_COUNT: 0
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 1

看到了没,这里说明了读出来的数据条数为 2 , 除了回滚的次数为 1 外,其他为 0 ;


第三次运行的结果:

在第三次运行前,我们把数据库里的数据修正,再运行

update people set first_name = 'hello',last_name = 'DOE' where first_name = 'lyx';

修正完成,那么第三次运行

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

任务正常完成

程序运行时间: 3960ms


好的,运行成功了,最重要的就是此时 spring batch在meta table 里存的数据,再来看一下:

mysql> select * from batch_step_execution where step_execution_id = 3 \G
*************************** 1. row ***************************
 STEP_EXECUTION_ID: 3
           VERSION: 14
         STEP_NAME: addDescStep
  JOB_EXECUTION_ID: 3
        START_TIME: 2014-11-19 17:11:40
          END_TIME: 2014-11-19 17:11:42
            STATUS: COMPLETED
      COMMIT_COUNT: 12
        READ_COUNT: 23
      FILTER_COUNT: 0
       WRITE_COUNT: 23
   READ_SKIP_COUNT: 0
  WRITE_SKIP_COUNT: 0
PROCESS_SKIP_COUNT: 0
    ROLLBACK_COUNT: 0
         EXIT_CODE: COMPLETED
      EXIT_MESSAGE:
      LAST_UPDATED: 2014-11-19 17:11:42
1 row in set (0.00 sec)

mysql>

通过对这三次运行结果的分析,我们可以知道spring batch 对失败的job进行restart ,不是从头开始处理数据,而是从出错的事务边界内第一条记录重复执行的,这样便确保了数据完整性。

当 job 运行成功后(运行成功后也没有必要进行restart),如果 restart 一个job,spring batch就会从第一条记录开始读数据,处理数据,导致数据被重复处理。

 

batch_step_execution 表字段含义

  • STEP_EXECUTION_ID: Primary key that uniquely identifies this execution. The value of this column should be obtainable by calling the getId method of the StepExecution object.

  • VERSION: See above section.

  • STEP_NAME: The name of the step to which this execution belongs.

  • JOB_EXECUTION_ID: Foreign key from the BATCH_JOB_EXECUTION table indicating the JobExecution to which this StepExecution belongs. There may be only one StepExecution for a given JobExecution for a given Step name.

  • START_TIME: Timestamp representing the time the execution was started.

  • END_TIME: Timestamp representing the time the execution was finished, regardless of success or failure. An empty value in this column even though the job is not currently running indicates that there has been some type of error and the framework was unable to perform a last save before failing.

  • STATUS: Character string representing the status of the execution. This may be COMPLETED, STARTED, etc. The object representation of this column is the BatchStatus enumeration.

  • COMMIT_COUNT: The number of times in which the step has committed a transaction during this execution.

  • READ_COUNT: The number of items read during this execution.

  • FILTER_COUNT: The number of items filtered out of this execution.

  • WRITE_COUNT: The number of items written and committed during this execution.

  • READ_SKIP_COUNT: The number of items skipped on read during this execution.

  • WRITE_SKIP_COUNT: The number of items skipped on write during this execution.

  • PROCESS_SKIP_COUNT: The number of items skipped during processing during this execution.

  • ROLLBACK_COUNT: The number of rollbacks during this execution. Note that this count includes each time rollback occurs, including rollbacks for retry and those in the skip recovery procedure.

  • EXIT_CODE: Character string representing the exit code of the execution. In the case of a command line job, this may be converted into a number.

  • EXIT_MESSAGE: Character string representing a more detailed description of how the job exited. In the case of failure, this might include as much of the stack trace as is possible.

  • LAST_UPDATED: Timestamp representing the last time this execution was persisted.

=======================END=======================

共有 人打赏支持
粉丝 215
博文 608
码字总数 434545
×
秋风醉了
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: