文档章节

Spring Batch_Intercepting Step Execution

秋风醉了
 秋风醉了
发布于 2014/11/19 15:11
字数 1307
阅读 454
收藏 0

Spring Batch_Intercepting Step Execution_配置SkipListener

关于配置skip:http://my.oschina.net/xinxingegeya/blog/346244

先看一下StepListener.java 接口的继承关系图:


StepExecutionListener

StepExecutionListener represents the most generic listener for Step execution. It allows for notification before a Step is started and after it has ends, whether it ended normally or failed


SkipListener

ItemReadListener, ItemProcessListener, and ItemWriteListner all provide mechanisms for being notified of errors, but none will inform you that a record has actually been skipped. onWriteError, for example, will be called even if an item is retried and successful. For this reason, there is a separate interface for tracking skipped items:

public interface SkipListener<T,S> extends StepListener {
    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);
}

onSkipInRead will be called whenever an item is skipped while reading. It should be noted that rollbacks may cause the same item to be registered as skipped more than once. onSkipInWrite will be called when an item is skipped while writing. Because the item has been read successfully (and not skipped), it is also provided the item itself as an argument.

SkipListeners and Transactions

One of the most common use cases for a SkipListener is to log out a skipped item, so that another batch process or even human process can be used to evaluate and fix the issue leading to the skip. Because there are many cases in which the original transaction may be rolled back, Spring Batch makes two guarantees:


The appropriate skip method (depending on when the error happened) will only be called once per item.

The SkipListener will always be called just before the transaction is committed. This is to ensure that any transactional resources call by the listener are not rolled back by a failure within the ItemWriter.


ChunkListener

A chunk is defined as the items processed within the scope of a transaction. Committing a transaction, at each commit interval, commits a 'chunk'. A ChunkListener can be useful to perform logic before a chunk begins processing or after a chunk has completed successfully:

public interface ChunkListener extends StepListener {
    void beforeChunk();
    void afterChunk();
}

The beforeChunk method is called after the transaction is started, but before read is called on the ItemReader. Conversely, afterChunk is called after the chunk has been committed (and not at all if there is a rollback).


上面就例举了几个Listener,那么这些listener 都怎么用:以SkipListener 为例,先来了解一下SkipListener 接口:


我们选择一个SkipListenerSupport ,通过继承 它实现我们自定义逻辑的 SkipListener。如下:

MySkipListener.java

package com.lyx.batch3;

import org.springframework.batch.core.listener.SkipListenerSupport;

import com.lyx.batch.People;
import com.lyx.batch.PeopleDESC;

public class MySkipListener extends SkipListenerSupport<People, PeopleDESC> {

	@Override
	public void onSkipInRead(Throwable t) {
		// TODO Auto-generated method stub
		super.onSkipInRead(t);
		System.out.println(">>>>>>>>>>>>>>skip in read>>>>>>>>>>>>>>");
	}

	@Override
	public void onSkipInWrite(PeopleDESC item, Throwable t) {
		// TODO Auto-generated method stub
		super.onSkipInWrite(item, t);
		System.out.println(">>>>>>>>>>>>>>skip in write>>>>>>>>>>>>>>");
		System.out.println(">>>>=" + item.toString());
	}

	/**
	 * 当processor抛出 skip include包含的异常时
	 */
	@Override
	public void onSkipInProcess(People item, Throwable t) {
		// TODO Auto-generated method stub
		super.onSkipInProcess(item, t);
		System.out.println(">>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>");
		System.out.println(">>>>=" + item.toString());
	}
}


以下是配置文件:

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- add people desc job begin -->
	<batch:job id="addPeopleDescJob">
		<batch:step id="addDescStep" parent="abstractStep">
			<batch:tasklet>
				<batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
					writer="addDescPeopleWriter" commit-interval="2" skip-limit="40">
					<batch:skippable-exception-classes>
						<!--batch:include配置允许发生的异常 -->
						<batch:include class="com.lyx.batch.InvalidDataException" />
					</batch:skippable-exception-classes>
					<batch:listeners>
						<!-- 这里可以配置多个listener -->
						<batch:listener ref="sampleSkipListener" />
					</batch:listeners>
				</batch:chunk>
			</batch:tasklet>
		</batch:step>
		<!-- 在job的运行期间,可以监视job -->
		<batch:listeners>
			<batch:listener ref="sampleListener" />
		</batch:listeners>
	</batch:job>
	<!-- add people desc job end -->
	<bean id="sampleListener" class="com.lyx.batch3.SampleJobExecutionListener" />
	<bean id="sampleSkipListener" class="com.lyx.batch3.MySkipListener" />

	<bean id="peopleAddDescReader" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="allowSkipProcessor" class="com.lyx.batch.AllowSkipProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>
	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>


主要的配置为:

<!-- add people desc job begin -->
<batch:job id="addPeopleDescJob">
	<batch:step id="addDescStep" parent="abstractStep">
		<batch:tasklet>
			<batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
				writer="addDescPeopleWriter" commit-interval="2" skip-limit="40">
				<batch:skippable-exception-classes>
					<!--batch:include配置允许发生的异常 -->
					<batch:include class="com.lyx.batch.InvalidDataException" />
				</batch:skippable-exception-classes>
				<batch:listeners>
					<!-- 这里可以配置多个listener -->
					<batch:listener ref="sampleSkipListener" />
				</batch:listeners>
			</batch:chunk>
		</batch:tasklet>
	</batch:step>
	<!-- 在job的运行期间,可以监视job -->
	<batch:listeners>
		<batch:listener ref="sampleListener" />
	</batch:listeners>
</batch:job>
<!-- add people desc job end -->
<bean id="sampleListener" class="com.lyx.batch3.SampleJobExecutionListener" />
<bean id="sampleSkipListener" class="com.lyx.batch3.MySkipListener" />


运行:

AppMain12.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试step listener skip listener
 * 
 * @author Lenovo
 *
 */
public class AppMain12 {
	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-exception-listener.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}
}


运行结果:

.........................................................................

process people desc

skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!

process people desc

skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!

>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>

>>>>=firstName: lyx, lastName: lyx

>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>

>>>>=firstName: lyx, lastName: lyx

process people desc

skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!

>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>

>>>>=firstName: lyx, lastName: lyx

job success.........

十一月 19, 2014 3:00:06 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

任务正常完成

程序运行时间: 8779ms

总结:通过skip listener可以看到当skip发生时,可以通过listener捕捉到该事件的发生,从而把skip的数据记录下来,以便做进一步的处理。

====================END====================

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 241
博文 566
码字总数 417505
作品 0
朝阳
程序员
私信 提问
加载中

评论(3)

开源中国首席之首席
开源中国首席之首席
沙发
开源中国首席之首席
开源中国首席之首席
看见了
开源中国首席之首席
开源中国首席之首席
很好
Spring Batch_Intercepting Job Execution

Spring BatchIntercepting Job Execution配置JobExecutionListener 关于spring batch skip 的配置请看:http://my.oschina.net/xinxingegeya/blog/346244 在job的运行期间,我们可以捕捉job......

秋风醉了
2014/11/19
0
0
Spring Batch_Configuring a Step for Restart

Spring Batch_Configuring a Step for Restart spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html#stepRestart 当一个普通的 job 处于complete ......

秋风醉了
2014/11/14
0
0
Spring Batch_JOB重启机制

Spring Batch_JOB重启机制 在这一篇文章 对于restart做了试验,http://my.oschina.net/xinxingegeya/blog/344817在这片文章里,我们只是当job成功时,重启了job,对于job失败后,重启job有...

秋风醉了
2014/11/19
0
0
Spring Batch学习笔记——steps之间共享数据

名词说明: 上下文: 执行: 执行上下文: 案例: 警告:一旦steps共享数据,这些数据就会把这些steps连接起来。努力使steps独立。如果你实在是不能独立他们,才使用下面的技术。你应该把数据...

xhujinjun
2015/04/02
0
0
Spring Batch_Step Flow

Spring Batch学习_Step Flow http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html With the ability to group steps together within an owning job comes the nee......

秋风醉了
2014/10/31
0
0

没有更多内容

加载失败,请刷新页面

加载更多

TiDB 3.0 Beta Release Notes

2019 年 1 月 19 日,TiDB 发布 3.0 Beta 版,对应 master branch 的 TiDB-Ansible。相比 2.1 版本,该版本对系统稳定性、优化器、统计信息以及执行引擎做了很多改进。 TiDB 新特性 支持 Vi...

TiDB
25分钟前
2
0
从拼多多优惠券事件看到的一些反思

本文由云+社区发表 作者:颜国平 摘要:最近几年,电商行业飞速发展,各种创业公司犹如雨后春笋大量涌现,商家通过各种活动形式的补贴来获取用户、培养用户的消费习惯,即将到来的“ 购物狂欢...

腾讯云加社区
26分钟前
4
0
记录一次BUG

1: 请求注册页面时生成一个UUID, 并且将UUID隐藏在页面中并且添加到session中去。 2: 发送短信时获取手机号和UUID,将两者发送至服务器 3: 先判断UUID和session中UUID是否一致。 不一致就...

专业写BUG的程序员
31分钟前
1
0
阿里云漏洞提示:phpMyAdmin <=4.8.1 checkPageValidity函数缺陷可导致GETSHELL

如题: phpMyAdmin <=4.8.1 后台checkPageValidity函数缺陷可导致GETSHELL 即:checkPageValidity函数对外部输入过滤不严,可导致本地包含任意文件。进一步地攻击者可通过注入代码到特定文件...

408582708
31分钟前
1
0
PyTorch可视化理解卷积神经网络

摘要: 神经网络工具像一个黑匣子,无法知道它的中间是如何处理的。本文使用图片加代码的形式讲解CNN网络,并对每层的输出进行可视化,便于初学者理解,可以动手实践下哦! 如今,机器已经能...

阿里云官方博客
43分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部