首页 > 代码库 > Quartz定时向阿里云MQ发送数据(TCP模式)

Quartz定时向阿里云MQ发送数据(TCP模式)

根据公司业务需要,向阿里云MQ发送指定数据,由消费端根据收到的数据执行具体业务。整体分为两个项目:生产端(Producer)和消费端(Consumer)。

生产端通过定时任务执行sql向阿里云MQ发送数据,消费端消费指定Topic上的数据

1:定时任务列表:

技术分享

2:生产端表结构:

技术分享

aliasName:定时任务别名;

cronExpression:定时任务轮询规则;

jobGroup:定时任务分组;

jobName:定时任务名称;

jobTrigger:定时任务触发器;

packageUrl:定时任务扫描具体封装类;

excuteSql:任务类中执行的取数SQL脚本,脚本中的占位符 lastPramaryKey 会在执行前被替换为上次记录的最大主键;

lastPramaryKey:最后一次获取数据时最大的主键;

topic:阿里云MQ的topic;

producerId:生产端的Id;

accessKey、securityKey:阿里云账号的 AccessKey 与 SecretKey(密钥);

dataBaseType:操作数据库类型(公司数据库类型比较多,执行脚本时需要根据类型来指定具体的Service;下文代码中 1 表示游戏库,2 表示网站库)

技术分享

3:Java端核心代码,定时任务扫描如下配置的任务类来向阿里云MQ发送数据

public class SendPrimaryKeyListToMqTask implements Job{        private final Logger logger = LoggerFactory.getLogger(SendPrimaryKeyListToMqTask.class);        public void execute(JobExecutionContext context) throws JobExecutionException{        JobDataMap data = context.getJobDetail().getJobDataMap();        ScheduleJob scheduleJob = (ScheduleJob)data.get("jobParam");            //最后一次获取数据时最大的主键        int lastPramaryKey = scheduleJob.getLastPramaryKey();                //执行sql        String excuteSql = scheduleJob.getExcuteSql();        excuteSql = excuteSql.replace("lastPramaryKey", String.valueOf(lastPramaryKey));                //操作数据库类型(数据库配置)        int dataBaseType = scheduleJob.getDataBaseType();                //从游戏库获取数据        LinkedList<ExcuteResultData> resultData = http://www.mamicode.com/new LinkedList<ExcuteResultData>();        if( dataBaseType == 1 ){            GameService gameService = (GameService)SpringBeanFactory.getBean(GameService.class);            resultData = gameService.getExcuteResultData(excuteSql);        //从网站库获取数据        }else if( dataBaseType == 2 ){             SiteService siteService = (SiteService)SpringBeanFactory.getBean(SiteService.class);             resultData = siteService.getExcuteResultData(excuteSql);        }            if ( resultData.size() > 0 ){            scheduleJob.setPrimaryKeyList(resultData);            QuartzService quartzService = (QuartzService)SpringBeanFactory.getBean(QuartzService.class);            //将数据集中最大的主键更新            scheduleJob.setLastPramaryKey(resultData.getLast().getPrimaryKey());            quartzService.updateLastPramaryKey(scheduleJob);                String topic = scheduleJob.getTopic();            String producerId = scheduleJob.getProducerId();            String ak = scheduleJob.getAccessKey();            String sk = scheduleJob.getSecurityKey();                        //添加日志            ScheduleJobLog scjLog = new ScheduleJobLog();            scjLog.setDataSize(resultData.size());     
       scjLog.setJobName(scheduleJob.getJobName());            scjLog.setTopic(topic);            int scjLogId = quartzService.addMqScheduleJobLog(scjLog);            //消费端根据此日志主键更新日志状态            scheduleJob.setScjLogId(scjLogId);                        Properties properties = new Properties();            properties.put("ProducerId", producerId);            properties.put("AccessKey", ak);            properties.put("SecretKey", sk);            Producer producer = ONSFactory.createProducer(properties);            producer.start();                        Message msg = new Message(topic, "PRIMARY_KEY_" + String.valueOf(scjLogId), ObjectsTranscoder.serialize(scheduleJob));            msg.setKey("PRIMARY_KEY_" + String.valueOf(scjLogId));            SendResult sendResult = producer.send(msg);            if ( ( sendResult != null ) && ( sendResult.getMessageId() != null ) ){                scjLog.setMessageId(sendResult.getMessageId());                scjLog.setStatus(2);                quartzService.updateMqScheduleJobLog(scjLog);            }                        producer.shutdown();                logger.debug("=====>任务名称:" + scheduleJob.getJobName());            logger.debug("=====>发送条数:" + resultData.size());            logger.debug("=====>发送主键内容:" + resultData.toString());        }    }}

4:消费端表结构:

技术分享

5:消费端Java核心代码(通过监听器来做):

import java.util.List;import java.util.Properties;import javax.servlet.ServletContextEvent;import javax.servlet.ServletContextListener;import org.springframework.web.context.WebApplicationContext;import org.springframework.web.context.support.WebApplicationContextUtils;import com.aliyun.openservices.ons.api.Action;import com.aliyun.openservices.ons.api.ConsumeContext;import com.aliyun.openservices.ons.api.Consumer;import com.aliyun.openservices.ons.api.Message;import com.aliyun.openservices.ons.api.MessageListener;import com.aliyun.openservices.ons.api.ONSFactory;import com.aliyun.openservices.ons.api.PropertyKeyConst;import com.odao.common.utils.ObjectsTranscoder;import com.odao.entity.ScheduleJob;import com.odao.entity.ScheduleJobLog;import com.odao.service.consumer.ConsumerService;import com.odao.service.message.MessageService;/** * 阿里云游戏、网站 主键数据集消费监听器 */public class ConsumePrimaryKeyFromMqListener implements ServletContextListener {        @Override    public void contextInitialized(ServletContextEvent sce) {        WebApplicationContext appctx = WebApplicationContextUtils.getWebApplicationContext(sce.getServletContext());        MessageService messageService = (MessageService) appctx.getBean(MessageService.class);        List<ScheduleJob> consumeList = messageService.getScheduleJobList();        for(final ScheduleJob sjc : consumeList){                        String topic = sjc.getTopic();            String consumerId= sjc.getConsumerId();            String ak = sjc.getAccessKey();            String sk = sjc.getSecurityKey();                        Properties properties = new Properties();            properties.put(PropertyKeyConst.ConsumerId,consumerId);            properties.put(PropertyKeyConst.AccessKey,ak);            properties.put(PropertyKeyConst.SecretKey,sk);            //properties.put(PropertyKeyConst.ONSAddr,"http://onsaddr-internal.aliyun.com:8080/rocketmq/nsaddr4client-internal");                        Consumer consumer = 
ONSFactory.createConsumer(properties);                        consumer.subscribe(topic, "*", new MessageListener() {                @Override                public Action consume(Message message, ConsumeContext context) {                    ScheduleJob scheduleJob = (ScheduleJob) ObjectsTranscoder.deserialize(message.getBody());                    if( scheduleJob !=null ){                        //更新消息状态为3:消费消息成功                        ScheduleJobLog scjLog = new ScheduleJobLog();                        scjLog.setStatus(3);                        scjLog.setMqScheduleJobLogId(scheduleJob.getScjLogId());                        messageService.updateMqScheduleJobLog(scjLog);                        try {                            ConsumerService consumerService = (ConsumerService) Class.forName(sjc.getImplementClass()).newInstance();                            boolean isSuccess = consumerService.consume(scheduleJob.getPrimaryKeyList());                            if(isSuccess){                                //更新消息状态为4:业务逻辑处理成功                                scjLog.setStatus(4);                                messageService.updateMqScheduleJobLog(scjLog);                            }                        } catch (InstantiationException e) {                            e.printStackTrace();                        } catch (IllegalAccessException e) {                            e.printStackTrace();                        } catch (ClassNotFoundException e) {                            e.printStackTrace();                        }                    }                    return Action.CommitMessage;                }            });                        consumer.start();        }    }        @Override    public void contextDestroyed(ServletContextEvent sce) {            }}

 

Quartz定时向阿里云MQ发送数据(TCP模式)