Multi-threaded Step with JdbcPagingItemReader
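The reader we use most often is JdbcCursorItemReader, which reads rows one at a time through a database cursor. As the Spring Batch documentation points out, however, it is not thread-safe. Here we use JdbcPagingItemReader instead: it reads from the database page by page and is thread-safe, so the step can run on multiple threads and the job can finish faster.
Below is the main configuration file: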
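To make the thread-safety point concrete, here is a simplified plain-Java model of a paging reader. It is not Spring Batch's source code: the class PagingReaderModel, its methods, and the in-memory list standing in for the people table are all hypothetical. What it illustrates is the general idea: all mutable reader state (the current page, the position in it, the last sort key handed out) sits behind one lock, and every new page is a fresh bounded query of the form "sort key greater than the last key, ordered by sort key, limited to the page size". A cursor reader, by contrast, keeps a single open ResultSet whose position all threads would be advancing at once.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Simplified model of a thread-safe paging reader (hypothetical, not Spring Batch code). */
public class PagingReaderModel {

    private final List<Integer> table;   // stand-in for "people", sorted by person_id
    private final int pageSize;

    private final Object lock = new Object(); // guards all mutable state below
    private List<Integer> page = Collections.emptyList();
    private int current = 0;                  // position inside the current page
    private Integer lastKey = null;           // last person_id handed out

    public PagingReaderModel(List<Integer> sortedIds, int pageSize) {
        this.table = new ArrayList<>(sortedIds);
        this.pageSize = pageSize;
    }

    // Simulates: SELECT ... WHERE person_id > :lastKey ORDER BY person_id LIMIT :pageSize
    private List<Integer> fetchPage(Integer afterKey) {
        List<Integer> result = new ArrayList<>();
        for (Integer id : table) {
            if (afterKey == null || id > afterKey) {
                result.add(id);
                if (result.size() == pageSize) {
                    break;
                }
            }
        }
        return result;
    }

    /** Returns the next id, or null once the data is exhausted; safe to call from many threads. */
    public Integer read() {
        synchronized (lock) {
            if (current >= page.size()) {
                page = fetchPage(lastKey); // the next page starts after the last key read
                current = 0;
                if (page.isEmpty()) {
                    return null;
                }
            }
            Integer id = page.get(current++);
            lastKey = id;
            return id;
        }
    }

    /** Drains the reader from several threads and returns every id that was read. */
    public static List<Integer> readConcurrently(PagingReaderModel reader, int threads) {
        List<Integer> seen = Collections.synchronizedList(new ArrayList<Integer>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.execute(() -> {
                Integer id;
                while ((id = reader.read()) != null) {
                    seen.add(id);
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }
}
```

Draining this model from several threads yields every id exactly once, which is the property a multi-threaded step relies on from its reader.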
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">
    <!-- package scanning -->
    <context:component-scan base-package="com.lyx.batch" />
    <bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />
    <batch:step id="abstractStep" abstract="true">
        <batch:listeners>
            <batch:listener ref="exceptionHandler" />
        </batch:listeners>
    </batch:step>
    <bean id="abstractJdbcPagingItemReader" abstract="true"
        class="org.springframework.batch.item.database.JdbcPagingItemReader">
        <property name="dataSource" ref="dataSource" />
    </bean>
    <!-- add people desc job begin -->
    <batch:job id="addPeopleDescJob">
        <batch:step id="addDescStep" parent="abstractStep">
            <batch:tasklet>
                <batch:chunk reader="peopleAddDescReader" processor="addDescProcessor"
                    writer="addDescPeopleWriter" commit-interval="2" />
            </batch:tasklet>
        </batch:step>
    </batch:job>
    <!-- add people desc job end -->
    <!-- paging reader begin -->
    <bean id="peopleAddDescReader" parent="abstractJdbcPagingItemReader"
        scope="step">
        <property name="queryProvider">
            <bean
                class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
                <property name="dataSource" ref="dataSource" />
                <property name="selectClause" value="select person_id, first_name, last_name" />
                <property name="fromClause" value="from people" />
                <property name="whereClause"
                    value="where ( first_name like :first_name or last_name like :last_name ) " />
                <property name="sortKey" value="person_id" />
            </bean>
        </property>
        <property name="parameterValues">
            <map>
                <entry key="first_name" value="#{jobParameters['first_name']}" />
                <entry key="last_name" value="#{jobParameters['last_name']}" />
            </map>
        </property>
        <!-- page size (the LIMIT of each page query) -->
        <property name="pageSize" value="2" />
        <property name="rowMapper" ref="peopleRowMapper" />
    </bean>
    <!-- paging reader end -->
    <bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
    <bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
    <bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />
    <!-- Tomcat JDBC pool data source -->
    <bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
        destroy-method="close">
        <property name="poolProperties">
            <bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
                <property name="driverClassName" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://localhost:3306/test" />
                <property name="username" value="root" />
                <property name="password" value="034039" />
            </bean>
        </property>
    </bean>
    <!-- Spring Batch jobRepository -->
    <batch:job-repository id="jobRepository"
        data-source="dataSource" transaction-manager="transactionManager"
        isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
        max-varchar-length="1000" />
    <!-- Spring transaction manager -->
    <bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>
    <!-- batch launcher -->
    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
</beans>
The key part of the configuration is the paging reader:
<!-- paging reader begin -->
<bean id="peopleAddDescReader" parent="abstractJdbcPagingItemReader"
    scope="step">
    <property name="queryProvider">
        <bean
            class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
            <property name="dataSource" ref="dataSource" />
            <property name="selectClause" value="select person_id, first_name, last_name" />
            <property name="fromClause" value="from people" />
            <property name="whereClause"
                value="where ( first_name like :first_name or last_name like :last_name ) " />
            <property name="sortKey" value="person_id" />
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="first_name" value="#{jobParameters['first_name']}" />
            <entry key="last_name" value="#{jobParameters['last_name']}" />
        </map>
    </property>
    <!-- page size (the LIMIT of each page query) -->
    <property name="pageSize" value="2" />
    <property name="rowMapper" ref="peopleRowMapper" />
</bean>
<!-- paging reader end -->
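The selectClause, fromClause, whereClause, sortKey and pageSize properties are combined into one SQL statement per page. The sketch below only approximates the shape of the MySQL statements; the helper class PagingSqlSketch is hypothetical, and the exact SQL that SqlPagingQueryProviderFactoryBean generates (including how it binds the last sort-key value, shown here as a `?` placeholder) may differ between Spring Batch versions. The first page has no lower bound on person_id; each later page restarts after the largest person_id already read, which is why the sort key must be unique.

```java
import java.util.Locale;

/** Hypothetical helper approximating the shape of MySQL paging queries (not Spring's exact SQL). */
public class PagingSqlSketch {

    /** Removes a leading keyword such as "select", so clauses work with or without it. */
    public static String stripKeyword(String keyword, String clause) {
        String trimmed = clause.trim();
        if (trimmed.toLowerCase(Locale.ROOT).startsWith(keyword + " ")) {
            return trimmed.substring(keyword.length()).trim();
        }
        return trimmed;
    }

    /** First page: no lower bound on the sort key. */
    public static String firstPage(String select, String from, String where,
                                   String sortKey, int pageSize) {
        return "SELECT " + stripKeyword("select", select)
                + " FROM " + stripKeyword("from", from)
                + " WHERE " + stripKeyword("where", where)
                + " ORDER BY " + sortKey + " ASC LIMIT " + pageSize;
    }

    /** Later pages: only rows whose sort key is greater than the last one read (bound to "?"). */
    public static String nextPage(String select, String from, String where,
                                  String sortKey, int pageSize) {
        return "SELECT " + stripKeyword("select", select)
                + " FROM " + stripKeyword("from", from)
                + " WHERE (" + stripKeyword("where", where) + ") AND " + sortKey + " > ?"
                + " ORDER BY " + sortKey + " ASC LIMIT " + pageSize;
    }

    public static void main(String[] args) {
        // Values copied from the peopleAddDescReader bean above
        String select = "select person_id, first_name, last_name";
        String from = "from people";
        String where = "where ( first_name like :first_name or last_name like :last_name ) ";
        System.out.println(firstPage(select, from, where, "person_id", 2));
        System.out.println(nextPage(select, from, where, "person_id", 2));
    }
}
```

Because every page is an independent, bounded query keyed on person_id, no database cursor has to stay open between reads.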
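The other classes can be found in the previous articles of this series. Below are the classes that differ from those articles, changed for debugging.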
PeopleRowMapper.java
package com.lyx.batch;
import java.sql.ResultSet;
import java.sql.SQLException;
import org.springframework.jdbc.core.RowMapper;
public class PeopleRowMapper implements RowMapper<People> {
    @Override
    public People mapRow(ResultSet rs, int rowNum) throws SQLException {
        People p = new People();
        // debug output: shows which rows the page queries actually returned
        System.out.println("-----------------------person_id-"
                + rs.getInt("person_id"));
        p.setId(rs.getInt("person_id"));
        p.setFirstName(rs.getString("first_name"));
        p.setLastName(rs.getString("last_name"));
        return p;
    }
}
Launcher program AppMain14.java
package com.lyx.batch;
import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;
/**
 * Test the paging reader.
 *
 * @author Lenovo
 */
public class AppMain14 {
    public static void main(String[] args)
            throws JobExecutionAlreadyRunningException, JobRestartException,
            JobInstanceAlreadyCompleteException, JobParametersInvalidException {
        long startTime = System.currentTimeMillis(); // start time (includes context startup)
        @SuppressWarnings("resource")
        ApplicationContext context = new ClassPathXmlApplicationContext(
                new String[] { "classpath:spring-batch-paging.xml" });
        // LIKE patterns bound to :first_name and :last_name in the reader's whereClause
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addString("first_name", "%JOHN%");
        jobParametersBuilder.addString("last_name", "%DOE%");
        Job job = (Job) context.getBean("addPeopleDescJob");
        JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
        JobExecution result = launcher.run(job,
                jobParametersBuilder.toJobParameters());
        ExitStatus es = result.getExitStatus();
        if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
            System.out.println("Job completed normally");
        } else {
            System.out.println("Job failed, exitCode=" + es.getExitCode());
        }
        long endTime = System.currentTimeMillis(); // end time
        System.out.println("Elapsed time: " + (endTime - startTime) + "ms");
    }
}
Output:
-----------------------person_id-157
-----------------------person_id-158
process people desc
process people desc
write people desc
write people desc
Job completed normally
Elapsed time: 11929ms
Nov 22, 2014 12:29:50 AM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{first_name=%JOHN%, last_name=%DOE%}] and the following status: [COMPLETED]
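It works. What matters more, though, is how JdbcPagingItemReader is implemented: why it is thread-safe and how it reads page by page. That is what we ultimately care about.
So far the job has run with a single-threaded step. Next we configure a multi-threaded step, which is simple: define an asynchronous Spring TaskExecutor and set it on the step's tasklet, so that the step's chunks are executed on that executor's threads.
See the configuration below: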
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />
<!-- add people desc job begin -->
<batch:job id="addPeopleDescJob">
    <batch:step id="addDescStep" parent="abstractStep">
        <batch:tasklet task-executor="taskExecutor">
            <batch:chunk reader="peopleAddDescReader" processor="addDescProcessor"
                writer="addDescPeopleWriter" commit-interval="2" />
        </batch:tasklet>
    </batch:step>
</batch:job>
<!-- add people desc job end -->
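Here taskExecutor is an asynchronous TaskExecutor: SimpleAsyncTaskExecutor starts a new thread for each task instead of reusing pooled threads. With it set on the tasklet, the step processes several chunks concurrently; the number of chunks in flight is capped by the tasklet's throttle-limit attribute, which defaults to 4. Note also that the JdbcPagingItemReader Javadoc advises setting saveState to false when the reader is used from multiple threads, which means the step cannot be restarted from where it stopped.
Now run the multi-threaded step. Output: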
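What the task executor changes can be modelled in plain Java: several worker threads share one thread-safe reader, and each repeatedly reads up to commit-interval items, processes them, and writes them as one chunk. This is a sketch under stated assumptions, not how Spring Batch is implemented internally: MultiThreadedStepSketch and its run method are hypothetical, a fixed pool of throttleLimit threads stands in for SimpleAsyncTaskExecutor plus the throttle limit, and a synchronized iterator stands in for JdbcPagingItemReader.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

/** Hypothetical model of a multi-threaded chunk-oriented step (not Spring Batch code). */
public class MultiThreadedStepSketch {

    /** Stand-in for a thread-safe ItemReader: each item is handed out once, then null. */
    static final class SynchronizedReader<T> {
        private final Iterator<T> it;

        SynchronizedReader(List<T> items) {
            this.it = new ArrayList<>(items).iterator();
        }

        synchronized T read() {
            return it.hasNext() ? it.next() : null;
        }
    }

    /** Runs read -> process -> write in chunks of commitInterval on throttleLimit threads. */
    public static <I, O> List<List<O>> run(List<I> input, Function<I, O> processor,
                                           int commitInterval, int throttleLimit) {
        SynchronizedReader<I> reader = new SynchronizedReader<>(input);
        List<List<O>> writtenChunks = new CopyOnWriteArrayList<>(); // stand-in for the writer
        ExecutorService executor = Executors.newFixedThreadPool(throttleLimit);
        for (int t = 0; t < throttleLimit; t++) {
            executor.execute(() -> {
                while (true) {
                    // 1. read up to commitInterval items (Spring Batch runs each chunk in one transaction)
                    List<I> items = new ArrayList<>();
                    I item;
                    while (items.size() < commitInterval && (item = reader.read()) != null) {
                        items.add(item);
                    }
                    if (items.isEmpty()) {
                        return; // reader exhausted: this worker stops
                    }
                    // 2. process each item
                    List<O> processed = new ArrayList<>();
                    for (I in : items) {
                        processed.add(processor.apply(in));
                    }
                    // 3. write the whole chunk at once
                    writtenChunks.add(processed);
                }
            });
        }
        executor.shutdown();
        try {
            executor.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return writtenChunks;
    }
}
```

Each item ends up in exactly one chunk, but chunks from different threads may be written in any order, so a multi-threaded step gives up ordering across chunks.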
-----------------------person_id-157
-----------------------person_id-158
process people desc
process people desc
write people desc
write people desc
Nov 22, 2014 1:01:35 AM org.springframework.batch.core.launch.support.SimpleJobLauncher run
INFO: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{first_name=%JOHN%, last_name=%DOE%}] and the following status: [COMPLETED]
Job completed normally
Elapsed time: 8577ms
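It succeeded, and the elapsed time (8577 ms) is lower than in the single-threaded run above (11929 ms). Treat this single comparison with caution, though: the timer also covers ApplicationContext startup, and only two rows matched, which is a single chunk with commit-interval="2", so there was almost no work for extra threads to share. Measuring the real speed-up needs a larger data set and repeated runs.
How JdbcPagingItemReader is implemented, why it is thread-safe, and what paging strategy it uses will be covered in the next article.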
==============END==============