
Spring Batch_Intercepting Step Execution

秋风醉了
Published 2014/11/19 15:11

Spring Batch_Intercepting Step Execution_Configuring a SkipListener

For how to configure skip, see: http://my.oschina.net/xinxingegeya/blog/346244

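First, look at the inheritance hierarchy of the StepListener.java interface. StepListener is a marker interface; StepExecutionListener, ChunkListener, ItemReadListener, ItemProcessListener, ItemWriteListener, and SkipListener all extend it: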


StepExecutionListener

StepExecutionListener represents the most generic listener for Step execution. It allows for notification before a Step is started and after it has ended, whether it ended normally or failed.


SkipListener

ItemReadListener, ItemProcessListener, and ItemWriteListener all provide mechanisms for being notified of errors, but none will inform you that a record has actually been skipped. onWriteError, for example, will be called even if an item is retried and successful. For this reason, there is a separate interface for tracking skipped items:

public interface SkipListener<T,S> extends StepListener {
    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);
}

onSkipInRead is called whenever an item is skipped while reading. Note that rollbacks may cause the same item to be registered as skipped more than once. onSkipInWrite is called when an item is skipped while writing. Because the item was read successfully (and not skipped), the item itself is also passed as an argument. onSkipInProcess likewise receives the item that was skipped during processing.

SkipListeners and Transactions

One of the most common use cases for a SkipListener is to log out a skipped item, so that another batch process or even human process can be used to evaluate and fix the issue leading to the skip. Because there are many cases in which the original transaction may be rolled back, Spring Batch makes two guarantees:


The appropriate skip method (depending on when the error happened) will only be called once per item.

The SkipListener will always be called just before the transaction is committed. This is to ensure that any transactional resources called by the listener are not rolled back by a failure within the ItemWriter.
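
The two guarantees above can be pictured with a small toy model. This is only a sketch under stated assumptions: SkipBuffer and WriteSkipCallback are hypothetical names invented for illustration, they are not Spring Batch classes, and the real framework is more involved. The idea shown is that skips seen during a chunk (possibly several times for the same item because of rollbacks) are buffered and reported once per item right before the commit.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model only: none of these types are Spring Batch classes.
class SkipNotificationSketch {

    /** Hypothetical stand-in for the onSkipInWrite callback. */
    interface WriteSkipCallback<S> {
        void onSkipInWrite(S item, Throwable cause);
    }

    /** Buffers skips seen during a chunk and reports each item once, right before commit. */
    static final class SkipBuffer<S> {
        private final Map<S, Throwable> pending = new LinkedHashMap<>();

        // May be called several times for the same item when a rollback replays the chunk.
        void registerSkip(S item, Throwable cause) {
            pending.putIfAbsent(item, cause);
        }

        // Called when the chunk is about to commit: notify once per item, then clear.
        void flushBeforeCommit(WriteSkipCallback<S> callback) {
            pending.forEach(callback::onSkipInWrite);
            pending.clear();
        }
    }

    /** Registers the same failing item twice and returns what the callback saw. */
    static List<String> demo() {
        SkipBuffer<String> buffer = new SkipBuffer<>();
        List<String> notified = new ArrayList<>();
        RuntimeException cause = new IllegalStateException("bad row");

        buffer.registerSkip("item-2", cause); // first attempt fails
        buffer.registerSkip("item-2", cause); // replay after a rollback fails again
        buffer.flushBeforeCommit((item, t) -> notified.add(item + ": " + t.getMessage()));
        return notified;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Although the item is registered twice, the callback list ends up with a single entry for it, which is the behaviour a listener that writes skipped items to a table or file relies on.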


ChunkListener

A chunk is defined as the items processed within the scope of a transaction. Committing a transaction, at each commit interval, commits a 'chunk'. A ChunkListener can be useful to perform logic before a chunk begins processing or after a chunk has completed successfully:

public interface ChunkListener extends StepListener {
    void beforeChunk();
    void afterChunk();
}

The beforeChunk method is called after the transaction is started, but before read is called on the ItemReader. Conversely, afterChunk is called after the chunk has been committed (and not at all if there is a rollback).
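
To make the ordering concrete, here is a hand-written loop that mimics the description above. It is a sketch, not Spring Batch's chunk processing: ChunkCallbacks is a hypothetical stand-in for the two ChunkListener methods, the "transaction" is just a sequence of strings, and a rolled-back chunk is simply abandoned (a real step would retry, skip, or fail).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Toy model only: a hand-written loop, not Spring Batch's chunk processing.
class ChunkLoopSketch {

    /** Hypothetical stand-in mirroring the two ChunkListener callbacks quoted above. */
    interface ChunkCallbacks {
        void beforeChunk();
        void afterChunk();
    }

    /**
     * Walks the items in chunks of commitInterval and records the order of events.
     * A chunk containing failOn is "rolled back", so afterChunk is not invoked for it.
     */
    static List<String> run(List<Integer> items, int commitInterval, Integer failOn) {
        List<String> events = new ArrayList<>();
        ChunkCallbacks callbacks = new ChunkCallbacks() {
            @Override public void beforeChunk() { events.add("beforeChunk"); }
            @Override public void afterChunk() { events.add("afterChunk"); }
        };

        for (int start = 0; start < items.size(); start += commitInterval) {
            int end = Math.min(start + commitInterval, items.size());
            List<Integer> chunk = items.subList(start, end);

            events.add("begin tx");
            callbacks.beforeChunk();            // after the transaction starts, before any read
            events.add("read/write " + chunk);

            if (failOn != null && chunk.contains(failOn)) {
                events.add("rollback");         // no afterChunk on rollback
                continue;
            }
            events.add("commit");
            callbacks.afterChunk();             // only after a successful commit
        }
        return events;
    }

    public static void main(String[] args) {
        // Five items, commit interval 2, item 3 fails.
        run(Arrays.asList(1, 2, 3, 4, 5), 2, 3).forEach(System.out::println);
    }
}
```

With five items and a commit interval of 2, the model opens three chunks; the chunk that contains the failing item records a rollback and never reaches afterChunk.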


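The above lists only a few of the listeners. So how are they used? Taking SkipListener as an example, first get familiar with the SkipListener interface (quoted in the SkipListener section above).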


We pick SkipListenerSupport and extend it to implement a SkipListener with our own logic, as follows:

MySkipListener.java

package com.lyx.batch3;

import org.springframework.batch.core.listener.SkipListenerSupport;

import com.lyx.batch.People;
import com.lyx.batch.PeopleDESC;

public class MySkipListener extends SkipListenerSupport<People, PeopleDESC> {

	// Called when an item is skipped because reading it failed; no item is available here.
	@Override
	public void onSkipInRead(Throwable t) {
		super.onSkipInRead(t);
		System.out.println(">>>>>>>>>>>>>>skip in read>>>>>>>>>>>>>>");
	}

	// Called when an item is skipped because writing it failed.
	@Override
	public void onSkipInWrite(PeopleDESC item, Throwable t) {
		super.onSkipInWrite(item, t);
		System.out.println(">>>>>>>>>>>>>>skip in write>>>>>>>>>>>>>>");
		System.out.println(">>>>=" + item.toString());
	}

	/**
	 * Called when the processor throws an exception listed under skip include.
	 */
	@Override
	public void onSkipInProcess(People item, Throwable t) {
		super.onSkipInProcess(item, t);
		System.out.println(">>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>");
		System.out.println(">>>>=" + item.toString());
	}
}


The configuration file is as follows:

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- Package scanning -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- add people desc job begin -->
	<batch:job id="addPeopleDescJob">
		<batch:step id="addDescStep" parent="abstractStep">
			<batch:tasklet>
				<batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
					writer="addDescPeopleWriter" commit-interval="2" skip-limit="40">
					<batch:skippable-exception-classes>
						<!-- batch:include lists the exceptions that may be skipped -->
						<batch:include class="com.lyx.batch.InvalidDataException" />
					</batch:skippable-exception-classes>
					<batch:listeners>
						<!-- Multiple listeners can be configured here -->
						<batch:listener ref="sampleSkipListener" />
					</batch:listeners>
				</batch:chunk>
			</batch:tasklet>
		</batch:step>
		<!-- Listeners that monitor the job while it runs -->
		<batch:listeners>
			<batch:listener ref="sampleListener" />
		</batch:listeners>
	</batch:job>
	<!-- add people desc job end -->
	<bean id="sampleListener" class="com.lyx.batch3.SampleJobExecutionListener" />
	<bean id="sampleSkipListener" class="com.lyx.batch3.MySkipListener" />

	<bean id="peopleAddDescReader" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="allowSkipProcessor" class="com.lyx.batch.AllowSkipProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!-- Tomcat JDBC pool data source configuration -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- Spring Batch jobRepository configuration -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- Spring transaction manager -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>
	<!-- batch launcher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>


The key part of the configuration is:

<!-- add people desc job begin -->
<batch:job id="addPeopleDescJob">
	<batch:step id="addDescStep" parent="abstractStep">
		<batch:tasklet>
			<batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
				writer="addDescPeopleWriter" commit-interval="2" skip-limit="40">
				<batch:skippable-exception-classes>
					<!-- batch:include lists the exceptions that may be skipped -->
					<batch:include class="com.lyx.batch.InvalidDataException" />
				</batch:skippable-exception-classes>
				<batch:listeners>
					<!-- Multiple listeners can be configured here -->
					<batch:listener ref="sampleSkipListener" />
				</batch:listeners>
			</batch:chunk>
		</batch:tasklet>
	</batch:step>
	<!-- Listeners that monitor the job while it runs -->
	<batch:listeners>
		<batch:listener ref="sampleListener" />
	</batch:listeners>
</batch:job>
<!-- add people desc job end -->
<bean id="sampleListener" class="com.lyx.batch3.SampleJobExecutionListener" />
<bean id="sampleSkipListener" class="com.lyx.batch3.MySkipListener" />
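
The skip-limit="40" attribute and the batch:include element together decide whether a failure is skipped. The following is a minimal sketch of that decision, assuming the limit means "at most 40 skips per step". It is a toy model rather than Spring Batch's own skip policy: SkipLimitSketch, its nested InvalidDataException (a stand-in for com.lyx.batch.InvalidDataException), and the use of IllegalStateException to signal an exhausted limit are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: it mimics the decision described by the configuration above;
// it is not Spring Batch's own skip policy class.
class SkipLimitSketch {

    /** Hypothetical stand-in for com.lyx.batch.InvalidDataException. */
    static class InvalidDataException extends RuntimeException {
        InvalidDataException(String message) { super(message); }
    }

    private final List<Class<? extends Throwable>> skippable;
    private final int skipLimit;

    SkipLimitSketch(int skipLimit, List<Class<? extends Throwable>> skippable) {
        this.skipLimit = skipLimit;
        this.skippable = skippable;
    }

    /** Builds a policy matching the XML: limit 40, InvalidDataException included. */
    static SkipLimitSketch configured() {
        List<Class<? extends Throwable>> types = new ArrayList<>();
        types.add(InvalidDataException.class);
        return new SkipLimitSketch(40, types);
    }

    /**
     * Returns true when the failure may be skipped and false when its type is not
     * listed as skippable; throws once the limit is already used up.
     */
    boolean shouldSkip(Throwable failure, int skipsSoFar) {
        boolean included = skippable.stream().anyMatch(type -> type.isInstance(failure));
        if (!included) {
            return false; // not listed under skippable-exception-classes
        }
        if (skipsSoFar >= skipLimit) {
            throw new IllegalStateException("skip limit of " + skipLimit + " exceeded", failure);
        }
        return true;
    }

    public static void main(String[] args) {
        SkipLimitSketch policy = configured();
        System.out.println(policy.shouldSkip(new InvalidDataException("bad row"), 0));
        System.out.println(policy.shouldSkip(new RuntimeException("other failure"), 0));
    }
}
```

In this model an included exception is skipped until 40 skips have been counted, while any other exception is never skipped and would therefore fail the step.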


Run it:

AppMain12.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Tests the step listener and skip listener
 * 
 * @author Lenovo
 *
 */
public class AppMain12 {
	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // start time

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-exception-listener.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成"); // "Job completed normally"
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode()); // "Job failed"
		}

		long endTime = System.currentTimeMillis(); // end time
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms"); // "Program run time"
	}
}


Output:

.........................................................................

process people desc

skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!

process people desc

skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!

>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>

>>>>=firstName: lyx, lastName: lyx

>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>

>>>>=firstName: lyx, lastName: lyx

process people desc

skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!

>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>

>>>>=firstName: lyx, lastName: lyx

job success.........

十一月 19, 2014 3:00:06 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

任务正常完成

程序运行时间: 8779ms

Summary: with a skip listener, each skip can be captured as it happens, so the skipped data can be recorded for further handling.
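
As a sketch of what "recording the skipped data" could look like instead of printing to the console, the listener below keeps one line per skip in memory. To stay compilable without Spring Batch on the classpath, SkipCallbacks is a hypothetical stand-in with the same three methods as SkipListener; in the real project the class would extend SkipListenerSupport as MySkipListener does. The record format is an assumption, and a real job would more likely write to a file or table.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Sketch only: SkipCallbacks is a hypothetical stand-in with the same three
// methods as SkipListener, so the example compiles without Spring Batch.
class RecordingSkipSketch {

    interface SkipCallbacks<T, S> {
        void onSkipInRead(Throwable t);
        void onSkipInProcess(T item, Throwable t);
        void onSkipInWrite(S item, Throwable t);
    }

    /** Keeps one line per skip so the data can be reviewed or reprocessed later. */
    static final class RecordingSkipListener<T, S> implements SkipCallbacks<T, S> {
        private final List<String> records = new ArrayList<>();

        @Override
        public void onSkipInRead(Throwable t) {
            // No item is available when the read itself failed.
            records.add("READ|<no item>|" + t.getMessage());
        }

        @Override
        public void onSkipInProcess(T item, Throwable t) {
            records.add("PROCESS|" + item + "|" + t.getMessage());
        }

        @Override
        public void onSkipInWrite(S item, Throwable t) {
            records.add("WRITE|" + item + "|" + t.getMessage());
        }

        /** Read-only view of everything recorded so far. */
        List<String> records() {
            return Collections.unmodifiableList(records);
        }
    }

    /** Simulates one process skip and one read skip and returns the recorded lines. */
    static List<String> demo() {
        RecordingSkipListener<String, String> listener = new RecordingSkipListener<>();
        listener.onSkipInProcess("firstName: lyx, lastName: lyx",
                new IllegalArgumentException("invalid data"));
        listener.onSkipInRead(new IllegalStateException("unparseable row"));
        return listener.records();
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```

The recorded lines can then be handed to another batch process or to a person for review once the step has finished.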

====================END====================

© Copyright belongs to the author
