首页 > 代码库 > 观察者模式实际应用:监听线程,意外退出线程后自动重启

观察者模式实际应用:监听线程,意外退出线程后自动重启

摘要:  观察者模式,定义对象之间的一种一对多的依赖关系,当对象的状态发生改变时,所有依赖于它的对象都得到通知并且被自动更新。观察者模式在JDK中有现成的实现,java.util.Obserable。

  首先说下需求:通过ftp上传约定格式的文件到服务器指定目录下,应用程序能实时监控该目录下文件变化,如果上传的文件格式符合要求,将将按照每一行读取解析再写入到数据库,解析完之后再将文件改名。(这个是原先已经实现了的功能,请看我的一篇文章java利用WatchService实时监控某个目录下的文件变化并按行解析(注:附源代码))

但项目上线一段时间后,发现再利用FileZilla登陆上传文件,文件不能被解析,而重启tomcat之后再上传,又能解析,于是判定是监控指定目录的那个线程挂掉了,导致上传后的文件不能被检测到,故也不能被解析。之后查看日志也最终验证了我推断。

  所以关键的问题就是:如何监听线程,当意外退出线程后进行自动重启,这也是本文所要利用观察者模式实现的。

下面请看实现过程(尤其见红色注解部分):

  1、web.xml监听器配置文件监控监听器,初始化创建一个监控指定目录的线程  

<?xml version="1.0" encoding="UTF-8"?><web-app version="2.5" xmlns="http://java.sun.com/xml/ns/javaee"    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"    xsi:schemaLocation="http://java.sun.com/xml/ns/javaee     http://java.sun.com/xml/ns/javaee/web-app_2_5.xsd">    <context-param>        <param-name>contextConfigLocation</param-name>        <param-value>classpath:root-context.xml</param-value>    </context-param>    <filter>        <filter-name>CharacterEncodingFilter</filter-name>        <filter-class>org.springframework.web.filter.CharacterEncodingFilter</filter-class>        <init-param>            <param-name>encoding</param-name>            <param-value>UTF-8</param-value>        </init-param>        <init-param>            <param-name>forceEncoding</param-name>            <param-value>true</param-value>        </init-param>    </filter>    <filter-mapping>        <filter-name>CharacterEncodingFilter</filter-name>        <url-pattern>/*</url-pattern>    </filter-mapping>    <filter>        <filter-name>sitemesh</filter-name>        <filter-class>com.opensymphony.sitemesh.webapp.SiteMeshFilter</filter-class>    </filter>    <filter-mapping>        <filter-name>sitemesh</filter-name>        <url-pattern>/*</url-pattern>    </filter-mapping>    <servlet>        <servlet-name>appServlet</servlet-name>        <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>        <init-param>            <param-name>contextConfigLocation</param-name>            <param-value>classpath:servlet-context.xml</param-value>        </init-param>        <load-on-startup>1</load-on-startup>    </servlet>    <servlet-mapping>        <servlet-name>appServlet</servlet-name>        <url-pattern>/</url-pattern>    </servlet-mapping>        <!-- 配置spring监听器 -->    <listener>        <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>    </listener>    <!-- 配置监控文件变化监听器 -->    <listener>        <listener-class>com.zealer.ad.listener.ThreadStartUpListenser</listener-class>    </listener>    <listener>        <listener-class>com.zealer.ad.listener.SessionLifecycleListener</listener-class>    </listener>            <jsp-config>      <taglib>       <taglib-uri>/tag</taglib-uri>       <taglib-location>/WEB-INF/tag/tag.tld</taglib-location>      </taglib>    </jsp-config>    <welcome-file-list>        <welcome-file>index.jsp</welcome-file>    </welcome-file-list>        <session-config>        <session-timeout>45</session-timeout>    </session-config></web-app>

  2、编写一个观察者实现类,用于监听“监控指定目录线程”,当“监控指定目录线程”挂掉后,自动重启该线程

    package com.zealer.ad.listener;            import java.util.Observable;    import java.util.Observer;
import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import com.zealer.ad.task.WatchFilePathTask; public class ObserverListener implements Observer{ private Log log = LogFactory.getLog(ObserverListener.class); @Override public void update(Observable o, Object arg) { log.info("WatchFilePathTask挂掉"); WatchFilePathTask run = new WatchFilePathTask(); run.addObserver(this); new Thread(run).start(); log.info("WatchFilePathTask重启"); } }

  3、编写一个ThreadStartUpListenser类,实现ServletContextListener,tomcat启动时创建后台线程

package com.zealer.ad.listener;import javax.servlet.ServletContextEvent;import javax.servlet.ServletContextListener;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.springframework.stereotype.Component;import com.zealer.ad.task.WatchFilePathTask;@Componentpublic class ThreadStartUpListenser implements ServletContextListener{    private static WatchFilePathTask r = new WatchFilePathTask();    private Log log = LogFactory.getLog(ThreadStartUpListenser.class);        @Override    public void contextDestroyed(ServletContextEvent paramServletContextEvent)    {//        r.interrupt();             }    @Override    public void contextInitialized(ServletContextEvent paramServletContextEvent)    {            ObserverListener listen = new ObserverListener();
       //给“监控指定目录下的线程”(被观察者),添加一个观察者 r.addObserver(listen);
new Thread(r).start();// r.start(); log.info("ImportUserFromFileTask is started!"); }}

  4、创建指定目录文件变化监控类WatchFilePathTask

package com.zealer.ad.task;import java.io.File;import java.io.FileFilter;import java.nio.file.FileSystems;import java.nio.file.Path;import java.nio.file.StandardWatchEventKinds;import java.nio.file.WatchEvent;import java.nio.file.WatchKey;import java.nio.file.WatchService;import java.util.Observable;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.joda.time.DateTime;import com.zealer.ad.util.ConfigUtils;import com.zealer.ad.util.SpringUtils;/** * 指定目录文件变化监控类 * @author cancer * */public class WatchFilePathTask extends Observable implements Runnable{    private Log log = LogFactory.getLog(WatchFilePathTask.class);        private static final String filePath = ConfigUtils.getInstance().getValue("userfile_path");        private WatchService watchService;        // 此方法一经调用,立马可以通知观察者,在本例中是监听线程    public void doBusiness()
  {
if(true)
     {
super.setChanged(); }
     //通知观察者,重启线程 notifyObservers(); } @Override
public void run() { try { //获取监控服务 watchService = FileSystems.getDefault().newWatchService(); log.debug("获取监控服务"+watchService); Path path = FileSystems.getDefault().getPath(filePath); log.debug("@@@:Path:"+path); final String todayFormat = DateTime.now().toString("yyyyMMdd"); File existFiles = new File(filePath); //启动时检查是否有未解析的符合要求的文件 if(existFiles.isDirectory()) { File[] matchFile = existFiles.listFiles(new FileFilter() { @Override public boolean accept(File pathname) { if((todayFormat+".txt").equals(pathname.getName())) { return true; } else { return false; } } }); if(null != matchFile) { for (File file : matchFile) { //找到符合要求的文件,开始解析 ImportUserFromFileTask task = (ImportUserFromFileTask) SpringUtils.getApplicationContext().getBean("importUserFromFileTask"); task.setFileName(file.getAbsolutePath()); task.start(); } } } //注册监控服务,监控新增事件 WatchKey key = path.register(watchService, StandardWatchEventKinds.ENTRY_CREATE); while (true) { key = watchService.take(); for (WatchEvent<?> event : key.pollEvents()) { //获取目录下新增的文件名 String fileName = event.context().toString(); //检查文件名是否符合要求 if((todayFormat+".txt").equals(fileName)) { String filePath = path.toFile().getAbsolutePath()+File.separator+fileName; log.info("import filePath:"+filePath); //启动线程导入用户数据 ImportUserFromFileTask task = (ImportUserFromFileTask) SpringUtils.getApplicationContext().getBean("importUserFromFileTask");//new ImportUserFromFileTask(filePath); task.setFileName(filePath); task.start(); log.debug("启动线程导入用户数据"+task); } } key.reset(); } }
     catch (Exception e) { e.printStackTrace(); System.out.println("已经到这里来了"); doBusiness();//在抛出异常时调用,通知观察者,让其重启线程 } }}

  5、创建解析用户文件及导入数据库线程,由WatchFilePathTask启动

package com.zealer.ad.task;import com.zealer.ad.entity.AutoPutUser;import com.zealer.ad.entity.Bmsuser;import com.zealer.ad.service.AutoPutUserService;import org.apache.commons.logging.Log;import org.apache.commons.logging.LogFactory;import org.joda.time.DateTime;import java.io.BufferedReader;import java.io.File;import java.io.FileInputStream;import java.io.InputStreamReader;import java.util.Date;import javax.annotation.Resource;/** * 解析用户文件及入库线程,由WatchFilePathTask启动 * @author cancer * */public class ImportUserFromFileTask extends Thread {    private Log log = LogFactory.getLog(ImportUserFromFileTask.class);    private String fileName;    @Resource(name = "autoPutUserService")    private AutoPutUserService autoPutUserService;    @Override    public void run() {        File file = new File(fileName);        if (file.exists() && file.isFile()) {            log.debug(":@@@准备开始休眠10秒钟:" + file);            //休眠十分钟,防止文件过大还没完全拷贝到指定目录下,这里的线程就开始读取文件            try {                sleep(10000);            } catch (InterruptedException e1) {                e1.printStackTrace();            }            InputStreamReader read;            try {                read = new InputStreamReader(new FileInputStream(file), "UTF-8");                BufferedReader bufferedReader = new BufferedReader(read);                String lineTxt = null;                int count = 0;                Boolean f = false;                while ((lineTxt = bufferedReader.readLine()) != null) {                    if ((null == lineTxt) || "".equals(lineTxt)) {                        continue;                    }                    if (lineTxt.startsWith("‘")) {                        lineTxt = lineTxt.substring(1, lineTxt.length());                    }                    //解析分隔符为‘, ‘                    String[] lines = lineTxt.split("‘, ‘");                    int length = lines.length;                    if (length < 2) {                        continue;                    }                    Bmsuser bmsuser = new Bmsuser();                    bmsuser.setName(lines[0]);if (!"".equals(lines[1])) {                        bmsuser.setCity(lines[1]);                    }            //根据唯一索引已经存在的数据则不插入                    f = autoPutUserService.insertIgnore(bmsuser);                    if (f) {                        count++;                    }                }                //汇总数据                AutoPutUser autoPutUser = new AutoPutUser();                autoPutUser.setTotalCount(autoPutUserService.getUserCount());                autoPutUser.setCount(count);                autoPutUser.setCountDate(new Date(System.currentTimeMillis()));                String today = DateTime.now().toString("yyyy-MM-dd");                Integer oldCount = autoPutUserService.getOldCount(today);                //如果今天导入过了就更新否则插入                if (!oldCount.equals(0)) {                    autoPutUserService.updateUserData(autoPutUser, today,                        oldCount);                } else {                    autoPutUserService.gatherUserData(autoPutUser);                }                //注意:要关闭流                read.close();            } catch (Exception e) {                log.error(e.getMessage(), e);            }            File newFile = new File(file.getPath() +                    System.currentTimeMillis() + ".complate");            file.renameTo(newFile);        } else {            log.error(fileName + " file is not exists");        }    }    public String getFileName() {        return fileName;    }    public void setFileName(String fileName) {        this.fileName = fileName;    }    public AutoPutUserService getAutoPutUserService() {        return autoPutUserService;    }    public void setAutoPutUserService(AutoPutUserService autoPutUserService) {        this.autoPutUserService = autoPutUserService;    }}

 

 

附带:

1、sql脚本

CREATE TABLE `bmsuser` (  `id` int(255) unsigned NOT NULL AUTO_INCREMENT,  `name` varchar(32) DEFAULT NULL ,  `city` varchar(32) DEFAULT NULL COMMENT ,  PRIMARY KEY (`bmsid`),  UNIQUE KEY `bbLoginName` (`bbLoginName`)) ENGINE=InnoDB  DEFAULT CHARSET=utf8;

2、文件格式,命名为yyyyMMdd.txt

‘张三‘, ‘深圳‘

 

观察者模式实际应用:监听线程,意外退出线程后自动重启