首页 > 代码库 > springbatch操作DB
springbatch操作DB
一、需求分析
使用Spring Batch对DB进行读写操作: 从一个表中读取数据, 然后批量的插入另外一张表中.
二、代码实现
1. 代码结构图:
2. applicationContext.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd"> <!-- 配置spring扫描范围 --> <context:component-scan base-package="com.zdp" /> <!-- 配置数据源 --> <bean id="dataSource" class="com.mchange.v2.c3p0.ComboPooledDataSource" abstract="false" scope="singleton"> <property name="driverClass" value="org.gjt.mm.mysql.Driver" />
(注: 原文的配置文件在此处被截断, dataSource 的其余属性以及下面说明的各个 bean 的定义缺失。)
base-package: 扫描spring注解
jobLauncher: 启动Job
jobRepository: 为Job提供持久化操作
transactionManager: 提供事务管理操作
3. springBatch.xml
<?xml version="1.0" encoding="UTF-8"?> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:batch="http://www.springframework.org/schema/batch" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:tx="http://www.springframework.org/schema/tx" xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.1.xsd http://www.springframework.org/schema/batch http://www.springframework.org/schema/batch/spring-batch-2.1.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context-3.1.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-3.1.xsd"> <!-- 引入spring核心配置文件 --> <import resource="applicationContext.xml"/> <batch:job id="ledgerJob"> <!-- 监听job运行状态 --> <batch:listeners> <batch:listener ref="appJobExecutionListener" /> </batch:listeners> <batch:step id="step"> <!-- 添加事务控制 --> <batch:tasklet transaction-manager="transactionManager"> <batch:listeners> <batch:listener ref="itemFailureLoggerListener" /> </batch:listeners> <!-- commit-interval: 批量提交的条数; skip-limit: 指允许跳过记录数 --> <batch:chunk reader="ledgerReader" writer="ledgerWriter" commit-interval="1000" skip-limit="1000"> <batch:skippable-exception-classes> <batch:include class="java.lang.Exception"/> <!-- 出现exception或其子类, Job仍然会往后执行 --> <batch:exclude class="java.io.FileNotFoundException"/> <!-- 出现这个异常, Job会立刻停止 --> </batch:skippable-exception-classes> </batch:chunk> </batch:tasklet> </batch:step> </batch:job> <!-- 从ledger表读取数据 --> <bean id="ledgerReader" class="org.springframework.batch.item.database.JdbcCursorItemReader"> <property name="dataSource" ref="dataSource" /> <property name="sql" value="select * from ledger" />
(注: 原文的配置文件在此处被截断, ledgerReader 的其余属性及后续配置缺失。)
4. AppJobExecutionListener.java
/**
 * Job-level listener that logs when a batch job starts and how it ended.
 * Registered under the bean name "appJobExecutionListener".
 */
@Component("appJobExecutionListener")
public class AppJobExecutionListener implements JobExecutionListener {

    private final static Logger logger = Logger.getLogger(AppJobExecutionListener.class);

    /**
     * Logs the outcome of a finished job execution.
     * Only the COMPLETED and FAILED statuses are reported; any other status is ignored.
     *
     * @param jobExecution the execution that has just ended
     */
    @Override
    public void afterJob(JobExecution jobExecution) {
        if (jobExecution.getStatus() == BatchStatus.COMPLETED) {
            logger.info("Job completed: " + jobExecution.getJobId());
        } else if (jobExecution.getStatus() == BatchStatus.FAILED) {
            logger.info("Job failed: " + jobExecution.getJobId());
        }
    }

    /**
     * Logs that a job execution is about to run.
     * This callback fires before any step has executed, so the COMPLETED/FAILED
     * checks that used to be copied here from {@link #afterJob} could never match
     * and the method never logged anything.
     *
     * @param jobExecution the execution that is about to start
     */
    @Override
    public void beforeJob(JobExecution jobExecution) {
        logger.info("Job started: " + jobExecution.getJobId());
    }
}
5. ItemFailureLoggerListener.java
/** * 检查是读出错还是写出错 */ @Component("itemFailureLoggerListener") public class ItemFailureLoggerListener extends ItemListenerSupport<Object, Object> { private final static Logger LOG = Logger.getLogger(ItemFailureLoggerListener.class); public void onReadError(Exception ex) { LOG.error("Encountered error on read", ex); } public void onWriteError(Exception ex, Object item) { LOG.error("Encountered error on write", ex); } }
6. Ledger.java
/**
 * Plain serializable value object holding one ledger row.
 * The fields correspond to the columns read by LedgerRowMapper and written by LedgerDaoImpl.
 */
public class Ledger implements Serializable {

    private static final long serialVersionUID = 1L;

    // Row identifier (ID column).
    private int id;
    // RECEIPT_DATE column.
    private Date receiptDate;
    // MEMBER_NAME column.
    private String memberName;
    // CHECK_NUMBER column.
    private String checkNumber;
    // CHECK_DATE column.
    private Date checkDate;
    // PAYMENT_TYPE column.
    private String paymentType;
    // DEPOSIT_AMOUNT column.
    private double depositAmount;
    // PAYMENT_AMOUNT column.
    private double paymentAmount;
    // COMMENTS column.
    private String comments;

    // getters and setters for the fields above (not shown in this listing)
}
7. LedgerRowMapper.java
/**
 * Maps one row of the ledger result set to a {@link Ledger} object.
 * Registered under the bean name "ledgerRowMapper".
 */
@Component("ledgerRowMapper")
public class LedgerRowMapper implements RowMapper<Ledger> {

    /**
     * Builds a {@link Ledger} from the current row of the result set.
     *
     * @param rs     result set positioned on the row to map
     * @param rowNum index of the current row (unused)
     * @return a new Ledger populated from the row's columns
     * @throws SQLException if a column cannot be read
     */
    @Override
    public Ledger mapRow(ResultSet rs, int rowNum) throws SQLException {
        Ledger ledger = new Ledger();
        ledger.setId(rs.getInt("ID"));
        ledger.setReceiptDate(rs.getDate("RECEIPT_DATE"));
        ledger.setMemberName(rs.getString("MEMBER_NAME"));
        // Read the check number from its own column; it was previously filled
        // from MEMBER_NAME, which copied the member name into the check number.
        ledger.setCheckNumber(rs.getString("CHECK_NUMBER"));
        ledger.setCheckDate(rs.getDate("CHECK_DATE"));
        ledger.setPaymentType(rs.getString("PAYMENT_TYPE"));
        ledger.setDepositAmount(rs.getDouble("DEPOSIT_AMOUNT"));
        ledger.setPaymentAmount(rs.getDouble("PAYMENT_AMOUNT"));
        ledger.setComments(rs.getString("COMMENTS"));
        return ledger;
    }
}
8. LedgerDao.java
/**
 * Persistence contract for ledger records.
 */
public interface LedgerDao {

    /**
     * Stores a single ledger record.
     *
     * @param item the record to store
     */
    void save(Ledger item);
}
9. LedgerDaoImpl.java
/**
 * JDBC implementation of {@link LedgerDao} that inserts ledger records
 * into the ledger_temp table through a {@code JdbcTemplate}.
 */
@Repository
public class LedgerDaoImpl implements LedgerDao {

    // The table name is lower case to match the CREATE TABLE statement for ledger_temp;
    // MySQL table names can be case-sensitive depending on the server's platform/settings.
    private static final String SAVE_SQL = "INSERT INTO ledger_temp (RECEIPT_DATE, MEMBER_NAME, CHECK_NUMBER, CHECK_DATE, PAYMENT_TYPE, DEPOSIT_AMOUNT, PAYMENT_AMOUNT, COMMENTS) VALUES(?,?,?,?,?,?,?,?)";

    @Autowired
    private JdbcTemplate jdbcTemplate;

    /**
     * Inserts one ledger record. The ID column is not supplied in the statement.
     *
     * @param item the record to insert
     */
    @Override
    public void save(final Ledger item) {
        jdbcTemplate.update(SAVE_SQL, new PreparedStatementSetter() {
            public void setValues(PreparedStatement stmt) throws SQLException {
                setNullableDate(stmt, 1, item.getReceiptDate());
                stmt.setString(2, item.getMemberName());
                stmt.setString(3, item.getCheckNumber());
                setNullableDate(stmt, 4, item.getCheckDate());
                stmt.setString(5, item.getPaymentType());
                stmt.setDouble(6, item.getDepositAmount());
                stmt.setDouble(7, item.getPaymentAmount());
                stmt.setString(8, item.getComments());
            }
        });
    }

    /**
     * Binds a date parameter, writing SQL NULL when the value is absent.
     * Both date columns are declared nullable, so a row may legitimately carry
     * no date; dereferencing it unconditionally raised a NullPointerException.
     */
    private static void setNullableDate(PreparedStatement stmt, int index, java.util.Date value)
            throws SQLException {
        if (value == null) {
            stmt.setNull(index, java.sql.Types.DATE);
        } else {
            stmt.setDate(index, new java.sql.Date(value.getTime()));
        }
    }
}
10. LedgerWriter.java
/**
 * Item writer that persists each ledger of a chunk through {@link LedgerDao}.
 * Registered under the bean name "ledgerWriter".
 */
@Component("ledgerWriter")
public class LedgerWriter implements ItemWriter<Ledger> {

    @Autowired
    private LedgerDao ledgerDao;

    /**
     * Saves the given ledgers one at a time, in list order.
     *
     * @param ledgers the chunk of records to write
     * @throws Exception if saving any record fails
     */
    @Override
    public void write(List<? extends Ledger> ledgers) throws Exception {
        java.util.Iterator<? extends Ledger> remaining = ledgers.iterator();
        while (remaining.hasNext()) {
            ledgerDao.save(remaining.next());
        }
    }
}
11. QuartzLedgerJob.java
/**
 * Scheduled entry point that launches the ledger batch job.
 * Registered under the bean name "quartzLedgerJob".
 */
@Component("quartzLedgerJob")
public class QuartzLedgerJob {

    private static final Logger LOG = LoggerFactory.getLogger(QuartzLedgerJob.class);

    /** Number of times {@link #execute()} has launched the job. */
    private static long counter = 0L;

    @Autowired
    private JobLauncher jobLauncher;

    @Autowired
    private Job ledgerJob;

    @Autowired
    JobParametersBuilder jobParameterBulider;

    /**
     * Runs the ledger job once and logs the elapsed time and the run count.
     *
     * @throws Exception if launching the job fails
     */
    public void execute() throws Exception {
        // Spring Batch does not allow a job instance that completed successfully
        // to be executed again. Whether a failed instance may be re-run is
        // controlled by the job's restartable setting (true by default).
        // As a workaround, the current time is added as a job parameter so that,
        // with all other parameters equal, every run is a different job instance.
        LOG.debug("start...");

        StopWatch timer = new StopWatch();
        timer.start();

        jobLauncher.run(ledgerJob, jobParameterBulider.addDate("date", new Date()).toJobParameters());

        timer.stop();
        String elapsed = timer.prettyPrint();
        counter += 1;
        LOG.debug("Time elapsed:{},Execute quartz ledgerJob:{}", elapsed, counter);
    }
}
12. StartQuartz.java
/**
 * Starts the scheduled batch processing.
 * Requirement: periodically read data from table ledger and batch-write it into table ledger_temp.
 */
public class StartQuartz {

    /** Classpath location of the Spring Batch configuration to load. */
    private static final String CONTEXT_LOCATION = "/com/zdp/resources/springBatch.xml";

    /**
     * Loads the Spring application context from the classpath.
     *
     * @param args command-line arguments (unused)
     * @throws FileNotFoundException declared by the original signature
     */
    public static void main(String[] args) throws FileNotFoundException {
        new ClassPathXmlApplicationContext(CONTEXT_LOCATION);
    }
}
13. sql:
-- Source table read by the batch job.
CREATE TABLE ledger (
    ID             INT(10) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    RECEIPT_DATE   DATE,
    MEMBER_NAME    VARCHAR(10),
    CHECK_NUMBER   VARCHAR(10),
    CHECK_DATE     DATE,
    PAYMENT_TYPE   VARCHAR(10),
    DEPOSIT_AMOUNT DOUBLE(10,3),
    PAYMENT_AMOUNT DOUBLE(10,3),
    COMMENTS       VARCHAR(100)
);

-- Target table written by the batch job; same columns as ledger.
CREATE TABLE ledger_temp (
    ID             INT(10) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    RECEIPT_DATE   DATE,
    MEMBER_NAME    VARCHAR(10),
    CHECK_NUMBER   VARCHAR(10),
    CHECK_DATE     DATE,
    PAYMENT_TYPE   VARCHAR(10),
    DEPOSIT_AMOUNT DOUBLE(10,3),
    PAYMENT_AMOUNT DOUBLE(10,3),
    COMMENTS       VARCHAR(100)
);
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。