首页 > 代码库 > QQ聊天

QQ聊天

 

   一、服务器,好了,废话不多说,我们先来看看服务器部分,我这里用到线程池,至于为什么用线程池,不知道的童鞋可以去我的另一篇blog看看:http://blog.csdn.net/weidi1989/article/details/7930820。当一个用户连接上之后,我们马上将该用户的socket丢入已经建好的线程池中去处理,这样可以很快腾出时间来接受下一个用户的连接,而线程池中的这个线程又分支为两个线程,一个是读消息线程,一个是写消息线程,当然,因为我这个聊天是用来转发消息的,所以还以单例模式建了一个Map用来存放每个用户的写消息线程(如果用户多的话,这是相当消耗资源的),以便在转发消息的时候,通过Map的key就可以取出对应用户的写消息线程,从而达到转发消息的目的。具体下面再说

 

[java] view plaincopy
 
  1. /** 
  2.  * 服务器,接受用户登录、离线、转发消息 
  3.  *  
  4.  * @author way 
  5.  *  
  6.  */  
  7. public class Server {  
  8.     private ExecutorService executorService;// 线程池  
  9.     private ServerSocket serverSocket = null;  
  10.     private Socket socket = null;  
  11.     private boolean isStarted = true;//是否循环等待  
  12.   
  13.     public Server() {  
  14.         try {  
  15.             // 创建线程池,池中具有(cpu个数*50)条线程  
  16.             executorService = Executors.newFixedThreadPool(Runtime.getRuntime()  
  17.                     .availableProcessors() * 50);  
  18.             serverSocket = new ServerSocket(Constants.SERVER_PORT);  
  19.         } catch (IOException e) {  
  20.             e.printStackTrace();  
  21.             quit();  
  22.         }  
  23.     }  
  24.   
  25.     public void start() {  
  26.         System.out.println(MyDate.getDateCN() + " 服务器已启动...");  
  27.         try {  
  28.             while (isStarted) {  
  29.                 socket = serverSocket.accept();  
  30.                 String ip = socket.getInetAddress().toString();  
  31.                 System.out.println(MyDate.getDateCN() + " 用户:" + ip + " 已建立连接");  
  32.                 // 为支持多用户并发访问,采用线程池管理每一个用户的连接请求  
  33.                 if (socket.isConnected())  
  34.                     executorService.execute(new SocketTask(socket));// 添加到线程池  
  35.             }  
  36.             if (socket != null)//循环结束后,记得关闭socket,释放资源  
  37.                 socket.close();  
  38.             if (serverSocket != null)  
  39.                 serverSocket.close();  
  40.         } catch (IOException e) {  
  41.             e.printStackTrace();  
  42.             // isStarted = false;  
  43.         }  
  44.     }  
  45.   
  46.     private final class SocketTask implements Runnable {  
  47.         private Socket socket = null;  
  48.         private InputThread in;  
  49.         private OutputThread out;  
  50.         private OutputThreadMap map;  
  51.   
  52.         public SocketTask(Socket socket) {  
  53.             this.socket = socket;  
  54.             map = OutputThreadMap.getInstance();  
  55.         }  
  56.   
  57.         @Override  
  58.         public void run() {  
  59.             out = new OutputThread(socket, map);//  
  60.             // 先实例化写消息线程,(把对应用户的写线程存入map缓存器中)  
  61.             in = new InputThread(socket, out, map);// 再实例化读消息线程  
  62.             out.setStart(true);  
  63.             in.setStart(true);  
  64.             in.start();  
  65.             out.start();  
  66.         }  
  67.     }  
  68.   
  69.     /** 
  70.      * 退出 
  71.      */  
  72.     public void quit() {  
  73.         try {  
  74.             this.isStarted = false;  
  75.             serverSocket.close();  
  76.         } catch (IOException e) {  
  77.             e.printStackTrace();  
  78.         }  
  79.     }  
  80.   
  81.     public static void main(String[] args) {  
  82.         new Server().start();  
  83.     }  
  84. }  

 


二、服务器写消息线程,接下来,我们来看看写消息线程,很简单的一段代码,有注释,我就不多说了:

 

[java] view plaincopy
 
  1. /** 
  2.  * 写消息线程 
  3.  *  
  4.  * @author way 
  5.  *  
  6.  */  
  7. public class OutputThread extends Thread {  
  8.     private OutputThreadMap map;  
  9.     private ObjectOutputStream oos;  
  10.     private TranObject object;  
  11.     private boolean isStart = true;// 循环标志位  
  12.     private Socket socket;  
  13.   
  14.     public OutputThread(Socket socket, OutputThreadMap map) {  
  15.         try {  
  16.             this.socket = socket;  
  17.             this.map = map;  
  18.             oos = new ObjectOutputStream(socket.getOutputStream());// 在构造器里面实例化对象输出流  
  19.         } catch (IOException e) {  
  20.             e.printStackTrace();  
  21.         }  
  22.     }  
  23.   
  24.     public void setStart(boolean isStart) {//用于外部关闭写线程  
  25.         this.isStart = isStart;  
  26.     }  
  27.   
  28.     // 调用写消息线程,设置了消息之后,唤醒run方法,可以节约资源  
  29.     public void setMessage(TranObject object) {  
  30.         this.object = object;  
  31.         synchronized (this) {  
  32.             notify();  
  33.         }  
  34.     }  
  35.   
  36.     @Override  
  37.     public void run() {  
  38.         try {  
  39.             while (isStart) {  
  40.                 // 没有消息写出的时候,线程等待  
  41.                 synchronized (this) {  
  42.                     wait();  
  43.                 }  
  44.                 if (object != null) {  
  45.                     oos.writeObject(object);  
  46.                     oos.flush();  
  47.                 }  
  48.             }  
  49.             if (oos != null)// 循环结束后,关闭流,释放资源  
  50.                 oos.close();  
  51.             if (socket != null)  
  52.                 socket.close();  
  53.         } catch (InterruptedException e) {  
  54.             e.printStackTrace();  
  55.         } catch (IOException e) {  
  56.             e.printStackTrace();  
  57.         }  
  58.     }  
  59. }  

 


 三、服务器写消息线程缓存器,接下来让我们看一下那个写消息线程缓存器的庐山真面目:

 

[java] view plaincopy
 
  1. /** 
  2.  * 存放写线程的缓存器 
  3.  *  
  4.  * @author way 
  5.  */  
  6. public class OutputThreadMap {  
  7.     private HashMap<Integer, OutputThread> map;  
  8.     private static OutputThreadMap instance;  
  9.   
  10.     // 私有构造器,防止被外面实例化改对像  
  11.     private OutputThreadMap() {  
  12.         map = new HashMap<Integer, OutputThread>();  
  13.     }  
  14.   
  15.     // 单例模式像外面提供该对象  
  16.     public synchronized static OutputThreadMap getInstance() {  
  17.         if (instance == null) {  
  18.             instance = new OutputThreadMap();  
  19.         }  
  20.         return instance;  
  21.     }  
  22.   
  23.     // 添加写线程的方法  
  24.     public synchronized void add(Integer id, OutputThread out) {  
  25.         map.put(id, out);  
  26.     }  
  27.   
  28.     // 移除写线程的方法  
  29.     public synchronized void remove(Integer id) {  
  30.         map.remove(id);  
  31.     }  
  32.   
  33.     // 取出写线程的方法,群聊的话,可以遍历取出对应写线程  
  34.     public synchronized OutputThread getById(Integer id) {  
  35.         return map.get(id);  
  36.     }  
  37.   
  38.     // 得到所有写线程方法,用于向所有在线用户发送广播  
  39.     public synchronized List<OutputThread> getAll() {  
  40.         List<OutputThread> list = new ArrayList<OutputThread>();  
  41.         for (Map.Entry<Integer, OutputThread> entry : map.entrySet()) {  
  42.             list.add(entry.getValue());  
  43.         }  
  44.         return list;  
  45.     }  
  46. }  

 


四、服务器读消息线程,接下来是读消息线程,这里包括两个部分,一部分是读消息,另一部分是处理消息,我以分开的形式贴出代码,虽然我是写在一个类里面的:

 

[java] view plaincopy
 
  1. /** 
  2.  * 读消息线程和处理方法 
  3.  *  
  4.  * @author way 
  5.  *  
  6.  */  
  7. public class InputThread extends Thread {  
  8.     private Socket socket;// socket对象  
  9.     private OutputThread out;// 传递进来的写消息线程,因为我们要给用户回复消息啊  
  10.     private OutputThreadMap map;//写消息线程缓存器  
  11.     private ObjectInputStream ois;//对象输入流  
  12.     private boolean isStart = true;//是否循环读消息  
  13.   
  14.     public InputThread(Socket socket, OutputThread out, OutputThreadMap map) {  
  15.         this.socket = socket;  
  16.         this.out = out;  
  17.         this.map = map;  
  18.         try {  
  19.             ois = new ObjectInputStream(socket.getInputStream());//实例化对象输入流  
  20.         } catch (IOException e) {  
  21.             e.printStackTrace();  
  22.         }  
  23.   
  24.     }  
  25.   
  26.     public void setStart(boolean isStart) {//提供接口给外部关闭读消息线程  
  27.         this.isStart = isStart;  
  28.     }  
  29.   
  30.     @Override  
  31.     public void run() {  
  32.         try {  
  33.             while (isStart) {  
  34.                 // 读取消息  
  35.                 readMessage();  
  36.             }  
  37.             if (ois != null)  
  38.                 ois.close();  
  39.             if (socket != null)  
  40.                 socket.close();  
  41.         } catch (ClassNotFoundException e) {  
  42.             e.printStackTrace();  
  43.         } catch (IOException e) {  
  44.             e.printStackTrace();  
  45.         }  
  46.   
  47.     }  

 


五、服务器消息处理,下面是处理消息的方法,由于比较麻烦以及各种纠结,我就与读消息线程分开贴,显得稍微简洁一点:

 

 

 

[java] view plaincopy
 
  1. /** 
  2.      * 读消息以及处理消息,抛出异常 
  3.      *  
  4.      * @throws IOException 
  5.      * @throws ClassNotFoundException 
  6.      */  
  7.     public void readMessage() throws IOException, ClassNotFoundException {  
  8.         Object readObject = ois.readObject();// 从流中读取对象  
  9.         UserDao dao = UserDaoFactory.getInstance();// 通过dao模式管理后台  
  10.         if (readObject != null && readObject instanceof TranObject) {  
  11.             TranObject read_tranObject = (TranObject) readObject;// 转换成传输对象  
  12.             switch (read_tranObject.getType()) {  
  13.             case REGISTER:// 如果用户是注册  
  14.                 User registerUser = (User) read_tranObject.getObject();  
  15.                 int registerResult = dao.register(registerUser);  
  16.                 System.out.println(MyDate.getDateCN() + " 新用户注册:"  
  17.                         + registerResult);  
  18.                 // 给用户回复消息  
  19.                 TranObject<User> register2TranObject = new TranObject<User>(  
  20.                         TranObjectType.REGISTER);  
  21.                 User register2user = new User();  
  22.                 register2user.setId(registerResult);  
  23.                 register2TranObject.setObject(register2user);  
  24.                 out.setMessage(register2TranObject);  
  25.                 break;  
  26.             case LOGIN:  
  27.                 User loginUser = (User) read_tranObject.getObject();  
  28.                 ArrayList<User> list = dao.login(loginUser);  
  29.                 TranObject<ArrayList<User>> login2Object = new TranObject<ArrayList<User>>(  
  30.                         TranObjectType.LOGIN);  
  31.                 if (list != null) {// 如果登录成功  
  32.                     TranObject<User> onObject = new TranObject<User>(  
  33.                             TranObjectType.LOGIN);  
  34.                     User login2User = new User();  
  35.                     login2User.setId(loginUser.getId());  
  36.                     onObject.setObject(login2User);  
  37.                     for (OutputThread onOut : map.getAll()) {  
  38.                         onOut.setMessage(onObject);// 广播一下用户上线  
  39.                     }  
  40.                     map.add(loginUser.getId(), out);// 先广播,再把对应用户id的写线程存入map中,以便转发消息时调用  
  41.                     login2Object.setObject(list);// 把好友列表加入回复的对象中  
  42.                 } else {  
  43.                     login2Object.setObject(null);  
  44.                 }  
  45.                 out.setMessage(login2Object);// 同时把登录信息回复给用户  
  46.   
  47.                 System.out.println(MyDate.getDateCN() + " 用户:"  
  48.                         + loginUser.getId() + " 上线了");  
  49.                 break;  
  50.             case LOGOUT:// 如果是退出,更新数据库在线状态,同时群发告诉所有在线用户  
  51.                 User logoutUser = (User) read_tranObject.getObject();  
  52.                 int offId = logoutUser.getId();  
  53.                 System.out  
  54.                         .println(MyDate.getDateCN() + " 用户:" + offId + " 下线了");  
  55.                 dao.logout(offId);  
  56.                 isStart = false;// 结束自己的读循环  
  57.                 map.remove(offId);// 从缓存的线程中移除  
  58.                 out.setMessage(null);// 先要设置一个空消息去唤醒写线程  
  59.                 out.setStart(false);// 再结束写线程循环  
  60.   
  61.                 TranObject<User> offObject = new TranObject<User>(  
  62.                         TranObjectType.LOGOUT);  
  63.                 User logout2User = new User();  
  64.                 logout2User.setId(logoutUser.getId());  
  65.                 offObject.setObject(logout2User);  
  66.                 for (OutputThread offOut : map.getAll()) {// 广播用户下线消息  
  67.                     offOut.setMessage(offObject);  
  68.                 }  
  69.                 break;  
  70.             case MESSAGE:// 如果是转发消息(可添加群发)  
  71.                 // 获取消息中要转发的对象id,然后获取缓存的该对象的写线程  
  72.                 int id2 = read_tranObject.getToUser();  
  73.                 OutputThread toOut = map.getById(id2);  
  74.                 if (toOut != null) {// 如果用户在线  
  75.                     toOut.setMessage(read_tranObject);  
  76.                 } else {// 如果为空,说明用户已经下线,回复用户  
  77.                     TextMessage text = new TextMessage();  
  78.                     text.setMessage("亲!对方不在线哦,您的消息将暂时保存在服务器");  
  79.                     TranObject<TextMessage> offText = new TranObject<TextMessage>(  
  80.                             TranObjectType.MESSAGE);  
  81.                     offText.setObject(text);  
  82.                     offText.setFromUser(0);  
  83.                     out.setMessage(offText);  
  84.                 }  
  85.                 break;  
  86.             case REFRESH:  
  87.                 List<User> refreshList = dao.refresh(read_tranObject  
  88.                         .getFromUser());  
  89.                 TranObject<List<User>> refreshO = new TranObject<List<User>>(  
  90.                         TranObjectType.REFRESH);  
  91.                 refreshO.setObject(refreshList);  
  92.                 out.setMessage(refreshO);  
  93.                 break;  
  94.             default:  
  95.                 break;  
  96.             }  
  97.         }  
  98.     }  

 


好了,服务器的核心代码就这么一些了,很简单吧?是的,因为我们还有很多事情没有去做,比如说心跳监测用户是否一直在线,如果不在线,就释放资源等,这些都是商业项目中必须要考虑到的问题,至于这个通过心跳监测用户是否在线,我说说我的一些想法吧:由客户端定时给服务器发送一个心跳包(最好是空包,节约流量),服务器也定时去监测那个心跳包,如果有3次未收到客户端的心跳包,就判断该用户已经掉线,释放资源,至于这次数和时间间隔,就随情况而定了。如果有什么更好的其他建议,欢迎给我留言,谢谢。

 

 

 

六、消息传输对象,下面,我们来看看,这个超级消息对象和定义好的消息类型:

 

[java] view plaincopy
 
  1. /** 
  2.  * 传输的对象,直接通过Socket传输的最大对象 
  3.  *  
  4.  * @author way 
  5.  */  
  6. public class TranObject<T> implements Serializable {  
  7.     /** 
  8.      *  
  9.      */  
  10.     private static final long serialVersionUID = 1L;  
  11.   
  12.     private TranObjectType type;// 发送的消息类型  
  13.   
  14.     private int fromUser;// 来自哪个用户  
  15.     private int toUser;// 发往哪个用户  
  16.   
  17.     private T object;// 传输的对象,这个对象我们可以自定义任何  
  18.     private List<Integer> group;// 群发给哪些用户  
  19.   
  20. get...set...  

 

 

 

[java] view plaincopy
 
  1. /** 
  2.  * 传输对象类型 
  3.  *  
  4.  * @author way 
  5.  *  
  6.  */  
  7. public enum TranObjectType {  
  8.     REGISTER, // 注册  
  9.     LOGIN, // 用户登录  
  10.     LOGOUT, // 用户退出登录  
  11.     FRIENDLOGIN, // 好友上线  
  12.     FRIENDLOGOUT, // 好友下线  
  13.     MESSAGE, // 用户发送消息  
  14.     UNCONNECTED, // 无法连接  
  15.     FILE, // 传输文件  
  16.     REFRESH,//刷新好友列表  
  17. }  

 

 

 

 

 

七、客户端,然后是客户端部分了,其实跟服务器差不多,只是没有建立线程池了,因为没有必要,是吧?然后实例化写线程和读线程没有先后顺序,这也勉强算一个区别吧~呵呵

 

[java] view plaincopy
 
  1. /** 
  2.  * 客户端 
  3.  *  
  4.  * @author way 
  5.  *  
  6.  */  
  7. public class Client {  
  8.   
  9.     private Socket client;  
  10.     private ClientThread clientThread;  
  11.     private String ip;  
  12.     private int port;  
  13.   
  14.     public Client(String ip, int port) {  
  15.         this.ip = ip;  
  16.         this.port = port;  
  17.     }  
  18.   
  19.     public boolean start() {  
  20.         try {  
  21.             client = new Socket();  
  22.             // client.connect(new InetSocketAddress(Constants.SERVER_IP,  
  23.             // Constants.SERVER_PORT), 3000);  
  24.             client.connect(new InetSocketAddress(ip, port), 3000);  
  25.             if (client.isConnected()) {  
  26.                 // System.out.println("Connected..");  
  27.                 clientThread = new ClientThread(client);  
  28.                 clientThread.start();  
  29.             }  
  30.         } catch (IOException e) {  
  31.             e.printStackTrace();  
  32.             return false;  
  33.         }  
  34.         return true;  
  35.     }  
  36.   
  37.     // 直接通过client得到读线程  
  38.     public ClientInputThread getClientInputThread() {  
  39.         return clientThread.getIn();  
  40.     }  
  41.   
  42.     // 直接通过client得到写线程  
  43.     public ClientOutputThread getClientOutputThread() {  
  44.         return clientThread.getOut();  
  45.     }  
  46.   
  47.     // 直接通过client停止读写消息  
  48.     public void setIsStart(boolean isStart) {  
  49.         clientThread.getIn().setStart(isStart);  
  50.         clientThread.getOut().setStart(isStart);  
  51.     }  
  52.       
  53.     public class ClientThread extends Thread {  
  54.   
  55.         private ClientInputThread in;  
  56.         private ClientOutputThread out;  
  57.   
  58.         public ClientThread(Socket socket) {  
  59.             in = new ClientInputThread(socket);  
  60.             out = new ClientOutputThread(socket);  
  61.         }  
  62.   
  63.         public void run() {  
  64.             in.setStart(true);  
  65.             out.setStart(true);  
  66.             in.start();  
  67.             out.start();  
  68.         }  
  69.   
  70.         // 得到读消息线程  
  71.         public ClientInputThread getIn() {  
  72.             return in;  
  73.         }  
  74.   
  75.         // 得到写消息线程  
  76.         public ClientOutputThread getOut() {  
  77.             return out;  
  78.         }  
  79.     }  
  80. }  

 


八、客户端写消息线程,先看看客户端写消息线程吧:

 

[java] view plaincopy
 
  1. /** 
  2.  * 客户端写消息线程 
  3.  *  
  4.  * @author way 
  5.  *  
  6.  */  
  7. public class ClientOutputThread extends Thread {  
  8.     private Socket socket;  
  9.     private ObjectOutputStream oos;  
  10.     private boolean isStart = true;  
  11.     private TranObject msg;  
  12.   
  13.     public ClientOutputThread(Socket socket) {  
  14.         this.socket = socket;  
  15.         try {  
  16.             oos = new ObjectOutputStream(socket.getOutputStream());  
  17.         } catch (IOException e) {  
  18.             e.printStackTrace();  
  19.         }  
  20.     }  
  21.   
  22.     public void setStart(boolean isStart) {  
  23.         this.isStart = isStart;  
  24.     }  
  25.   
  26.     // 这里处理跟服务器是一样的  
  27.     public void setMsg(TranObject msg) {  
  28.         this.msg = msg;  
  29.         synchronized (this) {  
  30.             notify();  
  31.         }  
  32.     }  
  33.   
  34.     @Override  
  35.     public void run() {  
  36.         try {  
  37.             while (isStart) {  
  38.                 if (msg != null) {  
  39.                     oos.writeObject(msg);  
  40.                     oos.flush();  
  41.                     if (msg.getType() == TranObjectType.LOGOUT) {// 如果是发送下线的消息,就直接跳出循环  
  42.                         break;  
  43.                     }  
  44.                     synchronized (this) {  
  45.                         wait();// 发送完消息后,线程进入等待状态  
  46.                     }  
  47.                 }  
  48.             }  
  49.             oos.close();// 循环结束后,关闭输出流和socket  
  50.             if (socket != null)  
  51.                 socket.close();  
  52.         } catch (InterruptedException e) {  
  53.             e.printStackTrace();  
  54.         } catch (IOException e) {  
  55.             e.printStackTrace();  
  56.         }  
  57.     }  
  58.   
  59. }  

 


九、客户端读消息线程,然后是客户端读消息线程,这里又有一个要注意的地方,我们收到消息的时候,是不是要告诉用户?如何告诉呢?接口监听貌似是一个很好的办法,神马?不知道接口监听?你会用Android的setOnClickListener不?这就是android封装好的点击事件监听,不懂的话,可以好好看看,理解一下,其实也不难:

 

 

 

[java] view plaincopy
 
    1. /** 
    2.  * 客户端读消息线程 
    3.  *  
    4.  * @author way 
    5.  *  
    6.  */  
    7. public class ClientInputThread extends Thread {  
    8.     private Socket socket;  
    9.     private TranObject msg;  
    10.     private boolean isStart = true;  
    11.     private ObjectInputStream ois;  
    12.     private MessageListener messageListener;// 消息监听接口对象  
    13.   
    14.     public ClientInputThread(Socket socket) {  
    15.         this.socket = socket;  
    16.         try {  
    17.             ois = new ObjectInputStream(socket.getInputStream());  
    18.         } catch (IOException e) {  
    19.             e.printStackTrace();  
    20.         }  
    21.     }  
    22.   
    23.     /** 
    24.      * 提供给外部的消息监听方法 
    25.      *  
    26.      * @param messageListener 
    27.      *            消息监听接口对象 
    28.      */  
    29.     public void setMessageListener(MessageListener messageListener) {  
    30.         this.messageListener = messageListener;  
    31.     }  
    32.   
    33.     public void setStart(boolean isStart) {  
    34.         this.isStart = isStart;  
    35.     }  
    36.   
    37.     @Override  
    38.     public void run() {  
    39.         try {  
    40.             while (isStart) {  
    41.                 msg = (TranObject) ois.readObject();  
    42.                 // 每收到一条消息,就调用接口的方法,并传入该消息对象,外部在实现接口的方法时,就可以及时处理传入的消息对象了  
    43.                 // 我不知道我有说明白没有?  
    44.                 messageListener.Message(msg);  
    45.             }  
    46.             ois.close();  
    47.             if (socket != null)  
    48.                 socket.close();  
    49.         } catch (ClassNotFoundException e) {  
    50.             e.printStackTrace();  
    51.         } catch (IOException e) {  
    52.             e.printStackTrace();  
    53.         }  
    54.     }  
    55.       
    56.     /** 
    57.      * 消息监听接口 
    58.      *  
    59.      * @author way 
    60.      *  
    61.      */  
    62.     public interface MessageListener {  
    63.         public void Message(TranObject msg);  
    64.     }  
    65. }  

 

QQ聊天