文档章节

Spring Batch_Intercepting Step Execution

秋风醉了
 秋风醉了
发布于 2014/11/19 15:11
字数 1307
阅读 713
收藏 0

精选30+云产品,助力企业轻松上云!>>>

Spring Batch_Intercepting Step Execution_配置SkipListener

关于配置skip:http://my.oschina.net/xinxingegeya/blog/346244

先看一下StepListener.java 接口的继承关系图:


StepExecutionListener

StepExecutionListener represents the most generic listener for Step execution. It allows for notification before a Step is started and after it has ends, whether it ended normally or failed


SkipListener

ItemReadListener, ItemProcessListener, and ItemWriteListner all provide mechanisms for being notified of errors, but none will inform you that a record has actually been skipped. onWriteError, for example, will be called even if an item is retried and successful. For this reason, there is a separate interface for tracking skipped items:

public interface SkipListener<T,S> extends StepListener {
    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);
}

onSkipInRead will be called whenever an item is skipped while reading. It should be noted that rollbacks may cause the same item to be registered as skipped more than once. onSkipInWrite will be called when an item is skipped while writing. Because the item has been read successfully (and not skipped), it is also provided the item itself as an argument.

SkipListeners and Transactions

One of the most common use cases for a SkipListener is to log out a skipped item, so that another batch process or even human process can be used to evaluate and fix the issue leading to the skip. Because there are many cases in which the original transaction may be rolled back, Spring Batch makes two guarantees:


The appropriate skip method (depending on when the error happened) will only be called once per item.

The SkipListener will always be called just before the transaction is committed. This is to ensure that any transactional resources call by the listener are not rolled back by a failure within the ItemWriter.


ChunkListener

A chunk is defined as the items processed within the scope of a transaction. Committing a transaction, at each commit interval, commits a 'chunk'. A ChunkListener can be useful to perform logic before a chunk begins processing or after a chunk has completed successfully:

public interface ChunkListener extends StepListener {
    void beforeChunk();
    void afterChunk();
}

The beforeChunk method is called after the transaction is started, but before read is called on the ItemReader. Conversely, afterChunk is called after the chunk has been committed (and not at all if there is a rollback).


上面就例举了几个Listener,那么这些listener 都怎么用:以SkipListener 为例,先来了解一下SkipListener 接口:


我们选择一个SkipListenerSupport ,通过继承 它实现我们自定义逻辑的 SkipListener。如下:

MySkipListener.java

package com.lyx.batch3;

import org.springframework.batch.core.listener.SkipListenerSupport;

import com.lyx.batch.People;
import com.lyx.batch.PeopleDESC;

public class MySkipListener extends SkipListenerSupport<People, PeopleDESC> {

	@Override
	public void onSkipInRead(Throwable t) {
		// TODO Auto-generated method stub
		super.onSkipInRead(t);
		System.out.println(">>>>>>>>>>>>>>skip in read>>>>>>>>>>>>>>");
	}

	@Override
	public void onSkipInWrite(PeopleDESC item, Throwable t) {
		// TODO Auto-generated method stub
		super.onSkipInWrite(item, t);
		System.out.println(">>>>>>>>>>>>>>skip in write>>>>>>>>>>>>>>");
		System.out.println(">>>>=" + item.toString());
	}

	/**
	 * 当processor抛出 skip include包含的异常时
	 */
	@Override
	public void onSkipInProcess(People item, Throwable t) {
		// TODO Auto-generated method stub
		super.onSkipInProcess(item, t);
		System.out.println(">>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>");
		System.out.println(">>>>=" + item.toString());
	}
}


以下是配置文件:

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- add people desc job begin -->
	<batch:job id="addPeopleDescJob">
		<batch:step id="addDescStep" parent="abstractStep">
			<batch:tasklet>
				<batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
					writer="addDescPeopleWriter" commit-interval="2" skip-limit="40">
					<batch:skippable-exception-classes>
						<!--batch:include配置允许发生的异常 -->
						<batch:include class="com.lyx.batch.InvalidDataException" />
					</batch:skippable-exception-classes>
					<batch:listeners>
						<!-- 这里可以配置多个listener -->
						<batch:listener ref="sampleSkipListener" />
					</batch:listeners>
				</batch:chunk>
			</batch:tasklet>
		</batch:step>
		<!-- 在job的运行期间,可以监视job -->
		<batch:listeners>
			<batch:listener ref="sampleListener" />
		</batch:listeners>
	</batch:job>
	<!-- add people desc job end -->
	<bean id="sampleListener" class="com.lyx.batch3.SampleJobExecutionListener" />
	<bean id="sampleSkipListener" class="com.lyx.batch3.MySkipListener" />

	<bean id="peopleAddDescReader" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="allowSkipProcessor" class="com.lyx.batch.AllowSkipProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>
	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>


主要的配置为:

<!-- add people desc job begin -->
<batch:job id="addPeopleDescJob">
	<batch:step id="addDescStep" parent="abstractStep">
		<batch:tasklet>
			<batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
				writer="addDescPeopleWriter" commit-interval="2" skip-limit="40">
				<batch:skippable-exception-classes>
					<!--batch:include配置允许发生的异常 -->
					<batch:include class="com.lyx.batch.InvalidDataException" />
				</batch:skippable-exception-classes>
				<batch:listeners>
					<!-- 这里可以配置多个listener -->
					<batch:listener ref="sampleSkipListener" />
				</batch:listeners>
			</batch:chunk>
		</batch:tasklet>
	</batch:step>
	<!-- 在job的运行期间,可以监视job -->
	<batch:listeners>
		<batch:listener ref="sampleListener" />
	</batch:listeners>
</batch:job>
<!-- add people desc job end -->
<bean id="sampleListener" class="com.lyx.batch3.SampleJobExecutionListener" />
<bean id="sampleSkipListener" class="com.lyx.batch3.MySkipListener" />


运行:

AppMain12.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试step listener skip listener
 * 
 * @author Lenovo
 *
 */
public class AppMain12 {
	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-exception-listener.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}
}


运行结果:

.........................................................................

process people desc

skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!

process people desc

skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!

>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>

>>>>=firstName: lyx, lastName: lyx

>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>

>>>>=firstName: lyx, lastName: lyx

process people desc

skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!

>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>

>>>>=firstName: lyx, lastName: lyx

job success.........

十一月 19, 2014 3:00:06 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

任务正常完成

程序运行时间: 8779ms

总结:通过skip listener可以看到当skip发生时,可以通过listener捕捉到该事件的发生,从而把skip的数据记录下来,以便做进一步的处理。

====================END====================

秋风醉了
粉丝 253
博文 530
码字总数 404430
作品 0
朝阳
程序员
私信 提问
加载中
此博客有 3 条评论,请先登录后再查看。
SpringBatch系列入门之Tasklet

1、概要 Spring Batch是一个开源的批量处理框架,Spring Batch提供了类和API来读写资源,管理事务,作业处理统计、重启、以及分区技术处理大量数据。在Spring Batch中,一个作业任务可以由多...

稻草鸟人
06/02
9
0
Spring Batch_Intercepting Job Execution

Spring BatchIntercepting Job Execution配置JobExecutionListener 关于spring batch skip 的配置请看:http://my.oschina.net/xinxingegeya/blog/346244 在job的运行期间,我们可以捕捉job......

秋风醉了
2014/11/19
645
0
Spring Batch(4)——Item概念及使用代码

在 批处理概念 中介绍一个标准的批处理分为 Job 和 Step。本文将结合代码介绍在中、、的实际使用。 Reader 是指从各种各样的外部输入中获取数据,框架为获取各种类型的文件已经预定义了常规的...

随风溜达的向日葵
2019/07/08
1.4K
0
Spring Batch_JOB重启机制

Spring Batch_JOB重启机制 在这一篇文章 对于restart做了试验,http://my.oschina.net/xinxingegeya/blog/344817在这片文章里,我们只是当job成功时,重启了job,对于job失败后,重启job有...

秋风醉了
2014/11/19
4.5K
0
Spring Batch_Configuring a Step for Restart

Spring Batch_Configuring a Step for Restart spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html#stepRestart 当一个普通的 job 处于complete ......

秋风醉了
2014/11/14
1.9K
0

没有更多内容

加载失败,请刷新页面

加载更多

丛书【数据库面试笔试宝典】已在京东、淘宝、天猫等各大电子商城销售

丛书【数据库面试笔试宝典】已在京东、淘宝和天猫开售,一共5本,丛书豆瓣连接:https://book.douban.com/series/46055。 数据库面试笔试系列书籍一共包含5本,分别为《Oracle程序员面试笔试...

小麦苗
06/30
29
0
代码优化-多态代替IF条件判断

场景描述 在开发的场景中,常常会遇到打折的业务需求,每个用户对应的等级,他们的打折情况也是不一样的。例如普通会员打9折,青铜会员打8.5折,黄金会员打8折等等。在一般开发中最简单的就是...

小码农薛尧
2019/12/01
24
0
面试官:CAP都搞不清楚,别跟我说你懂微服务!

CAP理论指在一个分布式系统中,一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)这三个要素最多只能同时实现两点,不可能三者兼顾。 让我们想象一个简单的...

lucifer210
07/08
27
0
聊起车联网技术时,我们可能想说什么

1981年世界上第一个车载导航系统被集成在Honda汽车上,30多年来,随着芯片、通信和互联网技术的快速发展,汽车软件发生着巨大的变化。 随着越来越多的传感器,摄像头,自动驾驶等新技术被集成...

曲奇泡芙
2019/03/10
15
0
开篇初衷

本科的时候看过一本书,叫做《读大学,究竟读什么》,里面的开篇是这样说的:“大学的路上有很多陷阱与诱惑,这些坑有人掉进去之后又爬了出来,觉得很不好意思,然后走了,所以不会在这个坑旁...

pcwlkpzc
2019/01/12
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部