首页 > 代码库 > Spring Integration - 自动轮询发送手机短信

Spring Integration - 自动轮询发送手机短信

 

Spring Integration 配置

<?xml version="1.0" encoding="UTF-8"?><beans xmlns="http://www.springframework.org/schema/beans"    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:int="http://www.springframework.org/schema/integration"    xmlns:int-jpa="http://www.springframework.org/schema/integration/jpa"    xmlns:int-jms="http://www.springframework.org/schema/integration/jms"    xmlns:context="http://www.springframework.org/schema/context"    xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration-2.2.xsd            http://www.springframework.org/schema/integration/jms http://www.springframework.org/schema/integration/jms/spring-integration-jms.xsd            http://www.springframework.org/schema/context    http://www.springframework.org/schema/context/spring-context.xsd                        http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd            http://www.springframework.org/schema/integration/jpa http://www.springframework.org/schema/integration/jpa/spring-integration-jpa-2.2.xsd            ">        <int-jpa:inbound-channel-adapter        auto-startup="true" entity-manager="em"        send-timeout="60000" channel="process.channel"        expect-single-result="true"        jpa-query="SELECT sysdate FROM dual">        <int:poller fixed-delay="60000" />    </int-jpa:inbound-channel-adapter>            <int:channel id="process.channel">        <int:queue capacity="1"/>            </int:channel>    <int:chain input-channel="process.channel">                    <int-jpa:retrieving-outbound-gateway entity-manager="em" jpa-query="SELECT sp FROM SmsMessage sp Where sp.tatus is null order by sp.requestOn,sp.id"/>                            <int:splitter ref="process.processSplitter" method="split"/>                <int:service-activator ref="process.smsSenderService"            method="send" />                    <int:poller fixed-delay="5000" receive-timeout="-1"/>                </int:chain>                <bean id="process.smsSenderService" class="com.yd.core.service.SmsSenderService" />                <bean id="process.processSplitter" class="com.yd.core.service.PaymentProcessSplitter" /></beans>

Job Worker

import org.springframework.context.ApplicationContext;import org.springframework.integration.MessageChannel;import org.springframework.integration.support.MessageBuilder;public class JobWorker implements Runnable {    private static final int DEFAULT_WAIT_TIME = 3000;    @Override    public void run() {        while (true) {            try {                LoggerUtil.getJobLogger().info("JobWorker, Ready for take job run request.");                JobRunnerRequest jobRequest = JobManagerService.getJobManager().takeRequest();                while (jobRequest == null) {                    LoggerUtil.getJobLogger().warn("JobWorker, jobRequest is null, will try to get the job requet again.");                    Thread.sleep(DEFAULT_WAIT_TIME);                    jobRequest = JobManagerService.getJobManager().takeRequest();                }                LoggerUtil.getJobLogger().info("JobWorker, Received a job run request.");                MessageChannel channel = findChannel(jobRequest.getJobChannelId());                if (channel != null) {                    channel.send(MessageBuilder.withPayload(jobRequest.getJobMessagePayload()).build());                    LoggerUtil.getJobLogger().info("JobWorker, Completed to sned message to job channel");                }            }            catch (Exception ex) {                LoggerUtil.getJobLogger().warn("JobWorker, Completed to sned message to job channel");            }        }    }    private MessageChannel findChannel(String jobChannelId) {        ApplicationContext context = ApplicationContextProvider.getContext();        if (context == null) {            LoggerUtil.getJobLogger().error(String.format("JobWorker, Cannot get the application context, to startup job %s", jobChannelId));            return null;        }        Object channel = context.getBean(jobChannelId);        if (channel instanceof MessageChannel) {            return (MessageChannel) channel;        }        else {            LoggerUtil.getJobLogger().error(String.format("JobWorker, Cannot get the message bean, to startup job %s", jobChannelId));            return null;        }    }}

JobManagerService

import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;public final class JobManagerService {    private BlockingQueue<JobRunnerRequest> jobRequestQueue = new LinkedBlockingQueue<JobRunnerRequest>();    private static volatile  JobManagerService jobManagerInstnce;    private static Object objSyncLocker = new Object();    private JobManagerService() {    }    private void startupWorker() {        new Thread(new JobWorker()).start();    }    public static JobManagerService getJobManager() {        if (jobManagerInstnce == null) {            synchronized (objSyncLocker) {                if (jobManagerInstnce == null) {                    jobManagerInstnce = new JobManagerService();                    jobManagerInstnce.startupWorker();                }            }        }        return jobManagerInstnce;    }    public void addRequest(JobRunnerRequest request) {        try {            jobRequestQueue.put(request);        }        catch (InterruptedException e) {            LoggerUtil.getJobLogger().warn(e.getMessage(), e);        }    }    public JobRunnerRequest takeRequest() {        try {            return jobRequestQueue.take();        }        catch (InterruptedException e) {            LoggerUtil.getJobLogger().warn(e.getMessage(), e);            return null;        }    }}

 

ApplicatonContextProvider

import org.springframework.context.ApplicationContext;import org.springframework.context.ApplicationContextAware;public class ApplicationContextProvider implements ApplicationContextAware {    private static volatile ApplicationContext ctx;    public static ApplicationContext getContext() {        return ctx;    }    private static synchronized void setContext(ApplicationContext applicationContext) {        ctx = applicationContext;    }    @Override    public void setApplicationContext(ApplicationContext applicationContext){        setContext(applicationContext);    }}