Spring Batch: Using JdbcPagingItemReader with a Multi-threaded Step
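The reader we use most often is JdbcCursorItemReader, which reads rows one at a time through a database cursor. According to the official Spring Batch documentation, however, it is not thread-safe. Here we use JdbcPagingItemReader instead. It reads from the database page by page, and the class is thread-safe, so we can run it in a multi-threaded step and improve the job's throughput.
The main configuration file is shown below: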
<beans xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:batch="http://www.springframework.org/schema/batch"
    xmlns:context="http://www.springframework.org/schema/context"
    xsi:schemaLocation="http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans-4.0.xsd
        http://www.springframework.org/schema/batch
        http://www.springframework.org/schema/batch/spring-batch.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd">

    <!-- component scanning -->
    <context:component-scan base-package="com.lyx.batch" />

    <bean id="exceptionHandler" class="com.lyx.batch.ExceptionListener" />

    <batch:step id="abstractStep" abstract="true">
        <batch:listeners>
            <batch:listener ref="exceptionHandler" />
        </batch:listeners>
    </batch:step>

    <bean id="abstractJdbcPagingItemReader" abstract="true"
        class="org.springframework.batch.item.database.JdbcPagingItemReader">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- add people desc job begin -->
    <batch:job id="addPeopleDescJob">
        <batch:step id="addDescStep" parent="abstractStep">
            <batch:tasklet>
                <batch:chunk reader="peopleAddDescReader" processor="addDescProcessor"
                    writer="addDescPeopleWriter" commit-interval="2" />
            </batch:tasklet>
        </batch:step>
    </batch:job>
    <!-- add people desc job end -->

    <!-- paging reader begin -->
    <bean id="peopleAddDescReader" parent="abstractJdbcPagingItemReader" scope="step">
        <property name="queryProvider">
            <bean class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
                <property name="dataSource" ref="dataSource" />
                <property name="selectClause" value="select person_id, first_name, last_name" />
                <property name="fromClause" value="from people" />
                <property name="whereClause"
                    value="where ( first_name like :first_name or last_name like :last_name ) " />
                <property name="sortKey" value="person_id" />
            </bean>
        </property>
        <property name="parameterValues">
            <map>
                <entry key="first_name" value="#{jobParameters['first_name']}" />
                <entry key="last_name" value="#{jobParameters['last_name']}" />
            </map>
        </property>
        <!-- page size, i.e. the LIMIT of each page query -->
        <property name="pageSize" value="2" />
        <property name="rowMapper" ref="peopleRowMapper" />
    </bean>
    <!-- paging reader end -->

    <bean id="peopleRowMapper" class="com.lyx.batch.PeopleRowMapper" />
    <bean id="addDescProcessor" class="com.lyx.batch.AddPeopleDescProcessor" />
    <bean id="addDescPeopleWriter" class="com.lyx.batch.AddDescPeopleWriter" />

    <!-- Tomcat JDBC pool data source -->
    <bean id="dataSource" class="org.apache.tomcat.jdbc.pool.DataSource"
        destroy-method="close">
        <property name="poolProperties">
            <bean class="org.apache.tomcat.jdbc.pool.PoolProperties">
                <property name="driverClassName" value="com.mysql.jdbc.Driver" />
                <property name="url" value="jdbc:mysql://localhost:3306/test" />
                <property name="username" value="root" />
                <property name="password" value="034039" />
            </bean>
        </property>
    </bean>

    <!-- Spring Batch jobRepository -->
    <batch:job-repository id="jobRepository"
        data-source="dataSource" transaction-manager="transactionManager"
        isolation-level-for-create="REPEATABLE_READ" table-prefix="BATCH_"
        max-varchar-length="1000" />

    <!-- Spring transaction manager -->
    <bean id="transactionManager"
        class="org.springframework.jdbc.datasource.DataSourceTransactionManager">
        <property name="dataSource" ref="dataSource" />
    </bean>

    <!-- batch launcher -->
    <bean id="jobLauncher"
        class="org.springframework.batch.core.launch.support.SimpleJobLauncher">
        <property name="jobRepository" ref="jobRepository" />
    </bean>
</beans>
The key part of the configuration is the paging reader:
<!-- paging reader begin -->
<bean id="peopleAddDescReader" parent="abstractJdbcPagingItemReader" scope="step">
    <property name="queryProvider">
        <bean class="org.springframework.batch.item.database.support.SqlPagingQueryProviderFactoryBean">
            <property name="dataSource" ref="dataSource" />
            <property name="selectClause" value="select person_id, first_name, last_name" />
            <property name="fromClause" value="from people" />
            <property name="whereClause"
                value="where ( first_name like :first_name or last_name like :last_name ) " />
            <property name="sortKey" value="person_id" />
        </bean>
    </property>
    <property name="parameterValues">
        <map>
            <entry key="first_name" value="#{jobParameters['first_name']}" />
            <entry key="last_name" value="#{jobParameters['last_name']}" />
        </map>
    </property>
    <!-- page size, i.e. the LIMIT of each page query -->
    <property name="pageSize" value="2" />
    <property name="rowMapper" ref="peopleRowMapper" />
</bean>
<!-- paging reader end -->
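To make the roles of sortKey and pageSize more concrete, here is a minimal in-memory sketch of paging by sort key. It is not Spring Batch code: the class KeysetPagingSketch and its methods fetchPage and readAllPages are hypothetical names, and the "table" is just a list of person_id values that are assumed to have already passed the where clause. The idea it illustrates is that the first page takes the first pageSize rows ordered by the sort key, and each later page takes the rows whose key is greater than the last key already read.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration of paging by sort key; not part of Spring Batch.
class KeysetPagingSketch {

    // Simulates: SELECT person_id FROM people [WHERE person_id > :lastKey]
    //            ORDER BY person_id LIMIT :pageSize
    static List<Integer> fetchPage(List<Integer> table, Integer lastKey, int pageSize) {
        return table.stream()
                .filter(id -> lastKey == null || id > lastKey) // first page has no lower bound
                .sorted()
                .limit(pageSize)
                .collect(Collectors.toList());
    }

    // Reads page after page until an empty page signals the end of the data.
    static List<List<Integer>> readAllPages(List<Integer> table, int pageSize) {
        List<List<Integer>> pages = new ArrayList<>();
        Integer lastKey = null;
        while (true) {
            List<Integer> page = fetchPage(table, lastKey, pageSize);
            if (page.isEmpty()) {
                break;
            }
            pages.add(page);
            lastKey = page.get(page.size() - 1); // remember where this page ended
        }
        return pages;
    }

    public static void main(String[] args) {
        List<Integer> table = Arrays.asList(160, 157, 159, 158, 161);
        System.out.println(readAllPages(table, 2)); // prints [[157, 158], [159, 160], [161]]
    }
}
```

This is also why the sort key should be unique (person_id here): if two rows shared the same key value, a page boundary falling between them could cause a row to be skipped.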
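The other classes can be found in the earlier articles of this series. The class below differs from the earlier version because I added debug output to it.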
PeopleRowMapper.java
package com.lyx.batch;

import java.sql.ResultSet;
import java.sql.SQLException;

import org.springframework.jdbc.core.RowMapper;

public class PeopleRowMapper implements RowMapper<People> {

    public People mapRow(ResultSet rs, int rowNum) throws SQLException {
        People p = new People();
        // Debug output: shows which rows the reader actually fetched
        System.out.println("-----------------------person_id-" + rs.getInt("person_id"));
        p.setId(rs.getInt("person_id"));
        p.setFirstName(rs.getString("first_name"));
        p.setLastName(rs.getString("last_name"));
        return p;
    }
}
Running the program
AppMain14.java
package com.lyx.batch;

import org.springframework.batch.core.ExitStatus;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.JobExecution;
import org.springframework.batch.core.JobParametersBuilder;
import org.springframework.batch.core.JobParametersInvalidException;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.batch.core.repository.JobExecutionAlreadyRunningException;
import org.springframework.batch.core.repository.JobInstanceAlreadyCompleteException;
import org.springframework.batch.core.repository.JobRestartException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.support.ClassPathXmlApplicationContext;

/**
 * Tests the paging reader.
 *
 * @author Lenovo
 */
public class AppMain14 {

    public static void main(String[] args)
            throws JobExecutionAlreadyRunningException, JobRestartException,
            JobInstanceAlreadyCompleteException, JobParametersInvalidException {

        long startTime = System.currentTimeMillis(); // start time

        @SuppressWarnings("resource")
        ApplicationContext context = new ClassPathXmlApplicationContext(
                new String[] { "classpath:spring-batch-paging.xml" });

        // These values are bound to :first_name and :last_name in the where clause
        JobParametersBuilder jobParametersBuilder = new JobParametersBuilder();
        jobParametersBuilder.addString("first_name", "%JOHN%");
        jobParametersBuilder.addString("last_name", "%DOE%");

        Job job = (Job) context.getBean("addPeopleDescJob");
        JobLauncher launcher = (JobLauncher) context.getBean("jobLauncher");
        JobExecution result = launcher.run(job, jobParametersBuilder.toJobParameters());

        ExitStatus es = result.getExitStatus();
        if (es.getExitCode().equals(ExitStatus.COMPLETED.getExitCode())) {
            System.out.println("任务正常完成"); // "Job completed normally"
        } else {
            System.out.println("任务失败,exitCode=" + es.getExitCode()); // "Job failed"
        }

        long endTime = System.currentTimeMillis(); // end time
        System.out.println("程序运行时间: " + (endTime - startTime) + "ms"); // "Program run time"
    }
}
Output:
-----------------------person_id-157
-----------------------person_id-158
process people desc
process people desc
write people desc
write people desc
任务正常完成
程序运行时间: 11929ms
十一月 22, 2014 12:29:50 上午 org.springframework.batch.core.launch.support.SimpleJobLauncher run
信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{first_name=%JOHN%, last_name=%DOE%}] and the following status: [COMPLETED]
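The job completed successfully. What matters more, though, is how JdbcPagingItemReader is implemented: why it is thread-safe and how it manages to read in pages. That is what we ultimately care about, but I will not analyze it here and will leave it for the next article.
So far the job has run in a single thread. Next we configure a multi-threaded step. The configuration is simple: define an asynchronous Spring task executor and have the step's tasklet use it, so that the step's chunks are processed on separate threads.
See the following configuration: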
<bean id="taskExecutor" class="org.springframework.core.task.SimpleAsyncTaskExecutor" />

<!-- add people desc job begin -->
<batch:job id="addPeopleDescJob">
    <batch:step id="addDescStep" parent="abstractStep">
        <batch:tasklet task-executor="taskExecutor">
            <batch:chunk reader="peopleAddDescReader" processor="addDescProcessor"
                writer="addDescPeopleWriter" commit-interval="2" />
        </batch:tasklet>
    </batch:step>
</batch:job>
<!-- add people desc job end -->
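The taskExecutor here is an asynchronous task executor: SimpleAsyncTaskExecutor starts a new thread for each task it is given. For the implementation details, see its source code.
Now let's run the multi-threaded step.
Output: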
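As a rough mental model of what happens when several threads share one reader, consider the sketch below. It is a simplified simulation and not Spring Batch's actual implementation: the classes SharedPagingReaderSketch and PagingReader are hypothetical, the "database" is just the ids 1..total, and a fixed thread pool from java.util.concurrent stands in for the Spring task executor. It shows that when read() is guarded by a lock, every item is handed to exactly one thread even though pages are loaded lazily.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical simulation of several step threads sharing one paging reader.
class SharedPagingReaderSketch {

    /** Pages through the ids 1..total, loading pageSize ids at a time. */
    static class PagingReader {
        private final int total;
        private final int pageSize;
        private List<Integer> page = new ArrayList<>();
        private int indexInPage = 0;
        private int lastKey = 0;

        PagingReader(int total, int pageSize) {
            this.total = total;
            this.pageSize = pageSize;
        }

        // synchronized: only one thread at a time may advance the position or load a page
        synchronized Integer read() {
            if (indexInPage >= page.size()) {
                page = loadPage();
                indexInPage = 0;
                if (page.isEmpty()) {
                    return null; // no more data
                }
            }
            return page.get(indexInPage++);
        }

        // Stands in for the page query: ids greater than lastKey, at most pageSize of them
        private List<Integer> loadPage() {
            List<Integer> next = new ArrayList<>();
            for (int id = lastKey + 1; id <= total && next.size() < pageSize; id++) {
                next.add(id);
            }
            if (!next.isEmpty()) {
                lastKey = next.get(next.size() - 1);
            }
            return next;
        }
    }

    // Lets `threads` workers drain one shared reader; returns the items read, sorted.
    static List<Integer> readWithThreads(int total, int pageSize, int threads) {
        PagingReader reader = new PagingReader(total, pageSize);
        List<Integer> seen = Collections.synchronizedList(new ArrayList<>());
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                Integer item;
                while ((item = reader.read()) != null) {
                    seen.add(item); // in a real step this is where process/write would happen
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        List<Integer> sorted = new ArrayList<>(seen);
        Collections.sort(sorted); // arrival order varies between runs, so sort for display
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(readWithThreads(10, 2, 4)); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```

Two practical points follow from this model. The order in which items are processed is no longer deterministic, so the processor and writer must not rely on it. Also, the JdbcPagingItemReader Javadoc advises setting saveState to false when the reader is used from multiple threads, which means a failed step cannot resume from where it stopped.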
-----------------------person_id-157
-----------------------person_id-158
process people desc
process people desc
write people desc
write people desc
十一月 22, 2014 1:01:35 上午 org.springframework.batch.core.launch.support.SimpleJobLauncher run
信息: Job: [FlowJob: [name=addPeopleDescJob]] completed with the following parameters: [{first_name=%JOHN%, last_name=%DOE%}] and the following status: [COMPLETED]
任务正常完成
程序运行时间: 8577ms
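It succeeded. You may have noticed that the run time is lower than in the single-threaded run above (8577 ms versus 11929 ms). Keep in mind that only two rows were read and each configuration was run once, so most of the measured time is application startup; a meaningful comparison needs a much larger data set and repeated runs.
How JdbcPagingItemReader is implemented, why it is thread-safe, and how its paging strategy works will be covered in the next article.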
=================================END=================================