首页 > 代码库 > Java调用MQ队列

Java调用MQ队列

IBM MQ 6.0中设置两个队列,(远程队列、通道之类都不设置)。

队列管理器是XIR_QM_1502

队列名称是ESBREQ

IP地址是10.23.117.134(远程的一台电脑,跟我的电脑不在一个局域网内)

端口1414

CCSID 1208


MQ配置可以参考这个,有配图http://wenku.baidu.com/view/06d108d0360cba1aa811daa3.html

程序如下,发送线程两个,接收线程一个。接收完毕后就结束。


[java] view plaincopy
  1. /* 
  2.  * 创建日期 2012-7-10 
  3.  * 
  4.  * TODO 要更改此生成的文件的模板,请转至 
  5.  * 窗口 - 首选项 - Java - 代码样式 - 代码模板 
  6.  */  
  7. package yerasel;  
  8.   
  9. /** 
  10.  * @author Fenglb E-mail:56553655@163.com 
  11.  * @version 创建时间:2009-4-30 下午04:13:38 类说明 
  12.  */  
  13.   
  14. import java.io.IOException;  
  15. import com.ibm.mq.MQC;  
  16. import com.ibm.mq.MQEnvironment;  
  17. import com.ibm.mq.MQException;  
  18. import com.ibm.mq.MQGetMessageOptions;  
  19. import com.ibm.mq.MQMessage;  
  20. import com.ibm.mq.MQPutMessageOptions;  
  21. import com.ibm.mq.MQQueue;  
  22. import com.ibm.mq.MQQueueManager;  
  23.   
  24. interface SomeConstants {  
  25.     String qManager = "XIR_QM_1502";//"XIR_QM"; //QueueManager name  
  26.     String qName = "ESBREQ";// Queue Name  
  27.     String strIP = "10.23.117.134";//"10.24.28.139";//"10.24.28.102";  
  28.     int iPort = 1502;//1414;  
  29.     String strChl = "SYSTEM.DEF.SVRCONN";// Server-Connection Channel  
  30.     int iCCSID = 1208;  
  31. }  
  32.   
  33. class Sender implements Runnable, SomeConstants {  
  34.     public void run() {  
  35.         sendMessage();  
  36.     }  
  37.   
  38.     public void sendMessage() {  
  39.   
  40.         String name = Thread.currentThread().getName();  
  41.         System.out.println("进入线程" + name);  
  42.   
  43.         MQQueueManager qMgr = null;  
  44.         // configure connection parameters  
  45.   
  46.         MQEnvironment.hostname = strIP;  
  47.         // Server name or IP  
  48.         MQEnvironment.port = iPort;  
  49.         MQEnvironment.channel = strChl;  
  50.         MQEnvironment.CCSID = iCCSID;  
  51.   
  52.         // java程序连接mq的方式有两种,一是客户机方式,一是绑定方式,  
  53.         // 默认是客户机方式,当mq部署在本地的时候,就需要用绑定方式  
  54.         // 本机IP是10.24.28.139连接10.23.117.134的时候不需要下句  
  55.         //MQEnvironment.properties.put(MQC.TRANSPORT_PROPERTY,  
  56.         //MQC.TRANSPORT_MQSERIES_BINDINGS);  
  57.   
  58.         // Create a connection to the QueueManager  
  59.         System.out.println(name + " Connecting to queue manager: " + qManager);  
  60.         try {  
  61.             qMgr = new MQQueueManager(qManager);  
  62.             // Set up the options on the queue we wish to open  
  63.             int openOptions = MQC.MQMT_REQUEST | MQC.MQPMO_NEW_MSG_ID  
  64.                     | MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING  
  65.                     | MQC.MQOO_INPUT_AS_Q_DEF;  
  66.             // Now specify the queue that we wish to open and the open options  
  67.             System.out.println(name + " Accessing queue: " + qName);  
  68.             MQQueue queue = qMgr.accessQueue(qName, openOptions);  
  69.             // Define a simple WebSphere MQ Message ...  
  70.   
  71.             // Specify the default put message options  
  72.             MQPutMessageOptions pmo = new MQPutMessageOptions();  
  73.   
  74.             // Put the message to the queue  
  75.             System.out.println(name + " Sending a message...");  
  76.   
  77.             MQMessage msg = new MQMessage();  
  78.             msg.messageId = "MSGID".getBytes();  
  79.             msg.messageType = MQC.MQMT_REQUEST;  
  80.             msg.replyToQueueName = "ESBREQ";  
  81.   
  82.             // 在此测试一下 mq 的传输次列  
  83.             for (int j = 1; j < 5; j++) {  
  84.                 msg.messageSequenceNumber = j;  
  85.                 // write some text in UTF8 format  
  86.                 try {  
  87.                     String str = "Salemetsizbe Yerasel";  
  88.                     str = str + " " + j;  
  89.                     msg.writeUTF(str);  
  90.                     queue.put(msg, pmo);  
  91.                     msg.clearMessage();  
  92.                     System.out.println(name + " putting the message... " + j);  
  93.                 } catch (MQException mqe) {  
  94.                     mqe.printStackTrace();  
  95.                     break;  
  96.                 } catch (IOException e1) {  
  97.                     e1.printStackTrace();  
  98.                 }  
  99.             }  
  100.             qMgr.commit();  
  101.             System.out.println(name + " Done!");  
  102.             System.out.println("==========");  
  103.             System.out.println("");  
  104.         } catch (MQException e) {  
  105.             e.printStackTrace();  
  106.         }  
  107.     }  
  108. }  
  109.   
  110. class Receiver implements Runnable, SomeConstants {  
  111.   
  112.     public void run() {  
  113.         recvMessage();  
  114.     }  
  115.   
  116.     public void recvMessage() {  
  117.   
  118.         String name = Thread.currentThread().getName();  
  119.           
  120.         try {  
  121.             Thread.sleep(1000);  
  122.             MQQueueManager qMgr = null;  
  123.   
  124.               
  125.             System.out.println("进入线程" + name);  
  126.   
  127.             System.out.println(name + " Connecting to queue manager: "  
  128.                     + qManager);  
  129.             qMgr = new MQQueueManager(qManager);  
  130.             // 设置将要连接的队列属性  
  131.             // Note. The MQC interface defines all the constants used by the  
  132.             // WebSphere MQ Java programming interface  
  133.             // (except for completion code constants and error code constants).  
  134.             // MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the  
  135.             // queue-defined default.  
  136.             // MQOO_OUTPUT:Open the queue to put messages.  
  137.             int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT  
  138.                     | MQC.MQOO_INQUIRE;  
  139.   
  140.             // Now get the message back again. First define a WebSphere MQ  
  141.             // message to receive the data  
  142.             MQMessage rcvMessage = new MQMessage();  
  143.   
  144.             // Specify default get message options  
  145.             MQGetMessageOptions gmo = new MQGetMessageOptions();  
  146.             gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;// Get messages  
  147.                                                             // under sync point  
  148.                                                             // control(在同步点控制下获取消息)  
  149.             gmo.options = gmo.options + MQC.MQGMO_WAIT; // Wait if no messages  
  150.                                                         // on the  
  151.                                                         // Queue(如果在队列上没有消息则等待)  
  152.             gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if  
  153.                                                                     // Qeue  
  154.                                                                     // Manager  
  155.                                                                     // Quiescing(如果队列管理器停顿则失败)  
  156.             gmo.waitInterval = 1000; // Sets the time limit for the  
  157.                                         // wait.(设置等待的毫秒时间限制)  
  158.   
  159.             System.out.println(name + " Accessing queue: " + qName);  
  160.             MQQueue queue = qMgr.accessQueue(qName, openOptions);  
  161.             int depth = 0;  
  162.   
  163.             // Get the message off the queue.  
  164.             System.out.println("... " + name + " getting the message back again");  
  165.             for (;;) {  
  166.                 try {  
  167.                     queue.get(rcvMessage, gmo);  
  168.                     System.out.println(" ID: "  
  169.                             + (new String(rcvMessage.messageId)).trim()  
  170.                             + " Num: " + rcvMessage.messageSequenceNumber  
  171.                             + " Type: " + rcvMessage.messageType + " Flag: "  
  172.                             + rcvMessage.messageFlags);  
  173.                     // And display the message text...  
  174.                     String msgText = rcvMessage.readUTF();  
  175.                     System.out.println("The message is: " + msgText);  
  176.                     rcvMessage.clearMessage();  
  177.   
  178.                     // Break if no MSG left in queue  
  179.                     depth = queue.getCurrentDepth();  
  180.                     if (depth == 0)  
  181.                         break;  
  182.   
  183.                 } catch (MQException mqe) {  
  184.                     mqe.printStackTrace();  
  185.                     break;  
  186.                     // null;  
  187.                 } catch (IOException e) {  
  188.                     e.printStackTrace();  
  189.                 }  
  190.             }  
  191.             // Close the queue  
  192.             System.out.println(name + " Closing the queue");  
  193.             queue.close();  
  194.             // Disconnect from the QueueManager  
  195.             System.out.println(name + " Disconnecting from the Queue Manager");  
  196.             qMgr.disconnect();  
  197.             System.out.println(name + " Done!");  
  198.             System.out.println("==========");  
  199.             System.out.println("");  
  200.         } catch (MQException ex) {  
  201.             System.out  
  202.                     .println("A WebSphere MQ Error occured : Completion Code "  
  203.                             + ex.completionCode + " Reason Code "  
  204.                             + ex.reasonCode + ex.getMessage());  
  205.         } catch (InterruptedException e1) {  
  206.             e1.printStackTrace();  
  207.         }  
  208.     }  
  209. }  
  210.   
  211. public class MQTest {  
  212.   
  213.     public static void main(String args[]) {  
  214.   
  215.         /* 
  216.          * MQTest first = new MQTest(); first.sendMessage(); 
  217.          * first.recvMessage(); 
  218.          */  
  219.         Sender sender = new Sender();  
  220.         Thread senderThread = new Thread(sender);  
  221.         senderThread.start();  
  222.         senderThread.setName("Sender");  
  223.           
  224.         Thread senderThread2 = new Thread(sender);  
  225.         senderThread2.start();  
  226.         senderThread2.setName("Sender2");  
  227.           
  228.         Receiver recv = new Receiver();  
  229.         Thread recvThread = new Thread(recv);  
  230.         recvThread.start();  
  231.         recvThread.setName("Receiver");  
  232.   
  233.         // Receiver recv = new Receiver();  
  234.         // new Thread(recv).start();  
  235.   
  236.     }  
  237.   
  238. }  


运行结果如下:

进入线程Sender2
进入线程Sender
Sender2 Connecting to queue manager: XIR_QM_1502
Sender Connecting to queue manager: XIR_QM_1502
Sender2 Accessing queue: ESBREQ
Sender2 Sending a message...
Sender Accessing queue: ESBREQ
Sender Sending a message...
Sender2 putting the message... 1
Sender putting the message... 1
Sender2 putting the message... 2
Sender putting the message... 2
Sender2 putting the message... 3
Sender putting the message... 3
Sender2 putting the message... 4
Sender putting the message... 4
Sender2 Done!
==========


Sender Done!
==========


进入线程Receiver
Receiver Connecting to queue manager: XIR_QM_1502
Receiver Accessing queue: ESBREQ
... Receiver getting the message back again
 ID: MSGID Num: 1 Type: 1 Flag: 0
The message is: Salemetsizbe Yerasel 1
 ID: MSGID Num: 1 Type: 1 Flag: 0
The message is: Salemetsizbe Yerasel 1
 ID: MSGID Num: 1 Type: 1 Flag: 0
The message is: Salemetsizbe Yerasel 2
 ID: MSGID Num: 1 Type: 1 Flag: 0
The message is: Salemetsizbe Yerasel 2
 ID: MSGID Num: 1 Type: 1 Flag: 0
The message is: Salemetsizbe Yerasel 3
 ID: MSGID Num: 1 Type: 1 Flag: 0
The message is: Salemetsizbe Yerasel 3
 ID: MSGID Num: 1 Type: 1 Flag: 0
The message is: Salemetsizbe Yerasel 4
 ID: MSGID Num: 1 Type: 1 Flag: 0
The message is: Salemetsizbe Yerasel 4
Receiver Closing the queue
Receiver Disconnecting from the Queue Manager
Receiver Done!
==========