文档章节

Spring Batch_Intercepting Step Execution

秋风醉了
 秋风醉了
发布于 2014/11/19 15:11
字数 1307
阅读 444
收藏 0
点赞 0
评论 3

Spring Batch_Intercepting Step Execution_配置SkipListener

关于配置skip:http://my.oschina.net/xinxingegeya/blog/346244

先看一下StepListener.java 接口的继承关系图:


StepExecutionListener

StepExecutionListener represents the most generic listener for Step execution. It allows for notification before a Step is started and after it has ends, whether it ended normally or failed


SkipListener

ItemReadListener, ItemProcessListener, and ItemWriteListner all provide mechanisms for being notified of errors, but none will inform you that a record has actually been skipped. onWriteError, for example, will be called even if an item is retried and successful. For this reason, there is a separate interface for tracking skipped items:

public interface SkipListener<T,S> extends StepListener {
    void onSkipInRead(Throwable t);
    void onSkipInProcess(T item, Throwable t);
    void onSkipInWrite(S item, Throwable t);
}

onSkipInRead will be called whenever an item is skipped while reading. It should be noted that rollbacks may cause the same item to be registered as skipped more than once. onSkipInWrite will be called when an item is skipped while writing. Because the item has been read successfully (and not skipped), it is also provided the item itself as an argument.

SkipListeners and Transactions

One of the most common use cases for a SkipListener is to log out a skipped item, so that another batch process or even human process can be used to evaluate and fix the issue leading to the skip. Because there are many cases in which the original transaction may be rolled back, Spring Batch makes two guarantees:


The appropriate skip method (depending on when the error happened) will only be called once per item.

The SkipListener will always be called just before the transaction is committed. This is to ensure that any transactional resources call by the listener are not rolled back by a failure within the ItemWriter.


ChunkListener

A chunk is defined as the items processed within the scope of a transaction. Committing a transaction, at each commit interval, commits a 'chunk'. A ChunkListener can be useful to perform logic before a chunk begins processing or after a chunk has completed successfully:

public interface ChunkListener extends StepListener {
    void beforeChunk();
    void afterChunk();
}

The beforeChunk method is called after the transaction is started, but before read is called on the ItemReader. Conversely, afterChunk is called after the chunk has been committed (and not at all if there is a rollback).


上面就例举了几个Listener,那么这些listener 都怎么用:以SkipListener 为例,先来了解一下SkipListener 接口:


我们选择一个SkipListenerSupport ,通过继承 它实现我们自定义逻辑的 SkipListener。如下:

MySkipListener.java

package com.lyx.batch3;

import org.springframework.batch.core.listener.SkipListenerSupport;

import com.lyx.batch.People;
import com.lyx.batch.PeopleDESC;

public class MySkipListener extends SkipListenerSupport<People, PeopleDESC> {

	@Override
	public void onSkipInRead(Throwable t) {
		// TODO Auto-generated method stub
		super.onSkipInRead(t);
		System.out.println(">>>>>>>>>>>>>>skip in read>>>>>>>>>>>>>>");
	}

	@Override
	public void onSkipInWrite(PeopleDESC item, Throwable t) {
		// TODO Auto-generated method stub
		super.onSkipInWrite(item, t);
		System.out.println(">>>>>>>>>>>>>>skip in write>>>>>>>>>>>>>>");
		System.out.println(">>>>=" + item.toString());
	}

	/**
	 * 当processor抛出 skip include包含的异常时
	 */
	@Override
	public void onSkipInProcess(People item, Throwable t) {
		// TODO Auto-generated method stub
		super.onSkipInProcess(item, t);
		System.out.println(">>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>");
		System.out.println(">>>>=" + item.toString());
	}
}


以下是配置文件:

<beans xmlns="http://www.springframework.org/schema/beans"
	xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
	xmlns:context="http://www.springframework.org/schema/context"
	xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
		http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
		http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
	<!-- 包的扫描 -->
	<context:component-scan base-package="com.lyx.batch" />

	<bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

	<batch:step id="abstractStep" abstract="true">
		<batch:listeners>
			<batch:listener ref="exceptionHandler" />
		</batch:listeners>
	</batch:step>
	<bean id="abstractCursorReader" abstract="true"
		class="org.springframework.batch.item.database.JdbcCursorItemReader">
		<property name="dataSource" ref="dataSource" />
	</bean>

	<!-- add people desc job begin -->
	<batch:job id="addPeopleDescJob">
		<batch:step id="addDescStep" parent="abstractStep">
			<batch:tasklet>
				<batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
					writer="addDescPeopleWriter" commit-interval="2" skip-limit="40">
					<batch:skippable-exception-classes>
						<!--batch:include配置允许发生的异常 -->
						<batch:include class="com.lyx.batch.InvalidDataException" />
					</batch:skippable-exception-classes>
					<batch:listeners>
						<!-- 这里可以配置多个listener -->
						<batch:listener ref="sampleSkipListener" />
					</batch:listeners>
				</batch:chunk>
			</batch:tasklet>
		</batch:step>
		<!-- 在job的运行期间,可以监视job -->
		<batch:listeners>
			<batch:listener ref="sampleListener" />
		</batch:listeners>
	</batch:job>
	<!-- add people desc job end -->
	<bean id="sampleListener" class="com.lyx.batch3.SampleJobExecutionListener" />
	<bean id="sampleSkipListener" class="com.lyx.batch3.MySkipListener" />

	<bean id="peopleAddDescReader" parent="abstractCursorReader"
		scope="step">
		<property name="sql">
			<value><![CDATA[select first_name ,last_name from people where 
			first_name like ? or last_name like ?]]></value>
		</property>
		<property name="rowMapper" ref="peopleRowMapper" />
		<property name="preparedStatementSetter" ref="preparedStatementSetter" />
		<property name="fetchSize" value="20" />
	</bean>
	<bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
	<bean id="preparedStatementSetter" class="com.lyx.batch.PeoplePreparedStatementSetter" />
	<bean id="allowSkipProcessor" class="com.lyx.batch.AllowSkipProcessor" />
	<bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

	<!--tomcat jdbc pool数据源配置 -->
	<bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
		destroy-method="close">
		<property name="poolProperties">
			<bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
				<property name="driverClassName" value="com.mysql.jdbc.Driver" />
				<property name="url" value="jdbc:mysql://localhost:3306/test" />
				<property name="username" value="root" />
				<property name="password" value="034039" />
			</bean>
		</property>
	</bean>

	<!-- spring batch 配置jobRepository -->
	<batch:job-repository id="jobRepository"
		data-source="dataSource" transaction-manager="transactionManager"
		isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
		max-varchar-length="1000" />
	<!-- spring的事务管理器 -->
	<bean id="transactionManager"
		class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
		<property name="dataSource" ref="dataSource" />
	</bean>
	<!-- batch luncher -->
	<bean id="jobLauncher"
		class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
		<property name="jobRepository" ref="jobRepository" />
	</bean>
</beans>


主要的配置为:

<!-- add people desc job begin -->
<batch:job id="addPeopleDescJob">
	<batch:step id="addDescStep" parent="abstractStep">
		<batch:tasklet>
			<batch:chunk reader="peopleAddDescReader" processor="allowSkipProcessor"
				writer="addDescPeopleWriter" commit-interval="2" skip-limit="40">
				<batch:skippable-exception-classes>
					<!--batch:include配置允许发生的异常 -->
					<batch:include class="com.lyx.batch.InvalidDataException" />
				</batch:skippable-exception-classes>
				<batch:listeners>
					<!-- 这里可以配置多个listener -->
					<batch:listener ref="sampleSkipListener" />
				</batch:listeners>
			</batch:chunk>
		</batch:tasklet>
	</batch:step>
	<!-- 在job的运行期间,可以监视job -->
	<batch:listeners>
		<batch:listener ref="sampleListener" />
	</batch:listeners>
</batch:job>
<!-- add people desc job end -->
<bean id="sampleListener" class="com.lyx.batch3.SampleJobExecutionListener" />
<bean id="sampleSkipListener" class="com.lyx.batch3.MySkipListener" />


运行:

AppMain12.java

package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * 测试step listener skip listener
 * 
 * @author Lenovo
 *
 */
public class AppMain12 {
	public static void main(String[] args)
			throws JobExecutionAlreadyRunningException, JobRestartException,
			JobInstanceAlreadyCompleteException, JobParametersInvalidException {

		long startTime = System.currentTimeMillis(); // 获取开始时间

		@SuppressWarnings("resource")
		ApplicationContext context = new ClassPathXmlApplicationContext(
				new String[] { "classpath:spring-batch-exception-listener.xml" });
		JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
		Job job = (Job) context.getBean("addPeopleDescJob");
		JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
		JobExecution result = launcher.run(job,
				jobParametersBuilder.toJobParameters());
		ExitStatus es = result.getExitStatus();
		if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
			System.out.println("任务正常完成");
		} else {
			System.out.println("任务失败,exitCode=" + es.getExitCode());
		}

		long endTime = System.currentTimeMillis(); // 获取结束时间
		System.out.println("程序运行时间: " + (endTime - startTime) + "ms");
	}
}


运行结果:

.........................................................................

process people desc

skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!

process people desc

skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!

>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>

>>>>=firstName: lyx, lastName: lyx

>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>

>>>>=firstName: lyx, lastName: lyx

process people desc

skip invalid data!!!!!!!!!!!!!!!!!!!!!!!!

>>>>>>>>>>>>>>skip in process>>>>>>>>>>>>>>

>>>>=firstName: lyx, lastName: lyx

job success.........

十一月 19, 2014 3:00:06 下午 org.springframework.batch.core.launch.support.SimpleJobLauncher run

信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{}] and the following status: [COMPLETED]

任务正常完成

程序运行时间: 8779ms

总结:通过skip listener可以看到当skip发生时,可以通过listener捕捉到该事件的发生,从而把skip的数据记录下来,以便做进一步的处理。

====================END====================

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 229
博文 577
码字总数 407134
作品 0
朝阳
程序员
加载中

评论(3)

开源中国首席之首席
开源中国首席之首席
沙发
开源中国首席之首席
开源中国首席之首席
看见了
开源中国首席之首席
开源中国首席之首席
很好
Spring Batch_Intercepting Job Execution

Spring BatchIntercepting Job Execution配置JobExecutionListener 关于spring batch skip 的配置请看:http://my.oschina.net/xinxingegeya/blog/346244 在job的运行期间,我们可以捕捉job......

秋风醉了
2014/11/19
0
0
Spring Batch_Configuring a Step for Restart

Spring Batch_Configuring a Step for Restart spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html#stepRestart 当一个普通的 job 处于complete ......

秋风醉了
2014/11/14
0
0
Spring Batch_JOB重启机制

Spring Batch_JOB重启机制 在这一篇文章 对于restart做了试验,http://my.oschina.net/xinxingegeya/blog/344817在这片文章里,我们只是当job成功时,重启了job,对于job失败后,重启job有...

秋风醉了
2014/11/19
0
0
Spring Batch_JobParameters

Spring BatchJobParameters spring batch的JobParameters是设置job运行的参数,同时也具有标志job的作用,就是判断两个job是不是同一个job( "how is one JobInstance distinguished from a...

秋风醉了
2014/11/12
0
0
Spring Batch学习笔记——steps之间共享数据

名词说明: 上下文: 执行: 执行上下文: 案例: 警告:一旦steps共享数据,这些数据就会把这些steps连接起来。努力使steps独立。如果你实在是不能独立他们,才使用下面的技术。你应该把数据...

xhujinjun
2015/04/02
0
0
Spring Batch_Multi-threaded Step

Spring Batch_Multi-threaded Step spring官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html The simplest way to start parallel processing is to ad......

秋风醉了
2014/11/13
0
0
Spring Batch_Step Flow

Spring Batch学习_Step Flow http://docs.spring.io/spring-batch/trunk/reference/html/configureStep.html With the ability to group steps together within an owning job comes the nee......

秋风醉了
2014/10/31
0
0
Spring Batch_Parallel Steps

Spring BatchParallel Steps使用并行的Step spring 官方文档:http://docs.spring.io/spring-batch/trunk/reference/html/scalability.html#scalabilityParallelSteps As long as the applic......

秋风醉了
2014/11/14
0
2
Spring Batch_JOB执行流程分析

Spring Batch_JOB执行流程分析 基于以下job的配置, debug代码 这是启动job的方法,如下是方法的具体实现: SimpleJobLauncher.java run方法的具体实现(删除了部分代码) 看taskExecutor.ex...

秋风醉了
2014/11/21
0
1
Spring Batch_实现Tasklet

Spring Batch_实现Tasklet Chunk-oriented processing is not the only way to process in a Step. What if a Step must consist as a simple stored procedure call? You could implement ......

秋风醉了
2014/12/01
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

idea tomcat 远程调试

tomcat 配置 编辑文件${tomcat_home}/bin/catalina.sh,在文件开头添加如下代码。    CATALINA_OPTS="-Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=7829" Idea端配......

qwfys
今天
1
0
遍历目录下的文件每250M打包一个文件

#!/usr/bin/env python # -*- utf-8 -*- # @Time : 2018/7/20 0020 下午 10:16 # @Author : 陈元 # @Email : abcmeabc@163.com # @file : tarFile.py import os import tarfile import thr......

寻爱的小草
今天
1
0
expect同步文件&expect指定host和要同步的文件&构建文件分发系统&批量远程执行命令

20.31 expect脚本同步文件 expect通过与rsync结合,可以在一台机器上把文件自动同步到多台机器上 编写脚本 [root@linux-5 ~]# cd /usr/local/sbin[root@linux-5 sbin]# vim 4.expect#!/...

影夜Linux
今天
1
0
SpringBoot | 第九章:Mybatis-plus的集成和使用

前言 本章节开始介绍数据访问方面的相关知识点。对于后端开发者而言,和数据库打交道是每天都在进行的,所以一个好用的ORM框架是很有必要的。目前,绝大部分公司都选择MyBatis框架作为底层数...

oKong
今天
12
0
win10 上安装解压版mysql

1.效果 2. 下载MySQL 压缩版 下载地址: https://downloads.mysql.com/archives/community/ 3. 配置 3.1 将下载的文件解压到合适的位置 我最终将myql文件 放在:D:\develop\mysql 最终放的位...

Lucky_Me
今天
2
0
linux服务器修改mtu值优化cpu

一、jumbo frames 相关 1、什么是jumbo frames Jumbo frames 是指比标准Ethernet Frames长的frame,即比1518/1522 bit大的frames,Jumbo frame的大小是每个设备厂商规定的,不属于IEEE标准;...

问题终结者
今天
1
0
expect脚本同步文件expect脚本指定host和要同步的文件 构建文件分发系统批量远程执行命令

expect脚本同步文件 在一台机器上把文件同步到多台机器上 自动同步文件 vim 4.expect [root@yong-01 sbin]# vim 4.expect#!/usr/bin/expectset passwd "20655739"spawn rsync -av ro...

lyy549745
今天
1
0
36.rsync下 日志 screen

10.32/10.33 rsync通过服务同步 10.34 linux系统日志 10.35 screen工具 10.32/10.33 rsync通过服务同步: rsync还可以通过服务的方式同步。那需要开启一个服务,他的架构是cs架构,客户端服务...

王鑫linux
今天
1
0
matplotlib 保存图片时的参数

简单绘图 import matplotlib.pyplot as pltplt.plot(range(10)) 保存为csv格式,放大后依然很清晰 plt.savefig('t1.svg') 普通保存放大后会有点模糊文件大小20多k plt.savefig('t5.p...

阿豪boy
今天
3
0
java 8 复合Lambda 表达式

comparator 比较器复合 //排序Comparator.comparing(Apple::getWeight);List<Apple> list = Stream.of(new Apple(1, "a"), new Apple(2, "b"), new Apple(3, "c")) .collect(......

Canaan_
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部