首页 > 代码库 > IBM MQ 学习

IBM MQ 学习

  1 import java.io.IOException;  
  2 import java.util.HashMap;  
  3 import java.util.Map;  
  4   
  5 import com.ibm.mq.MQC;  
  6 import com.ibm.mq.MQEnvironment;  
  7 import com.ibm.mq.MQException;  
  8 import com.ibm.mq.MQGetMessageOptions;  
  9 import com.ibm.mq.MQMessage;  
 10 import com.ibm.mq.MQPutMessageOptions;  
 11 import com.ibm.mq.MQQueue;  
 12 import com.ibm.mq.MQQueueManager;  
 13     
 14 public class CLIENT_MQ{    
 15      //定义队列管理器和队列的名称    
 16      private static final String qmName = "MQ_SERVICE";//MQ的队列管理器名称 ;     
 17      //private static final String qName = "MIDDLE_SEND_QUEUE"; //MQ远程队列的名称    
 18      private static MQQueueManager qMgr;//队列管理器  
 19      public  static void init(){    
 20          //设置环境:    
 21          //MQEnvironment中包含控制MQQueueManager对象中的环境的构成的静态变量,MQEnvironment的值的设定会在MQQueueManager的构造函数加载的时候起作用,    
 22          //因此必须在建立MQQueueManager对象之前设定MQEnvironment中的值.    
 23          MQEnvironment.hostname="10.172.12.156";          //MQ服务器的IP地址          
 24          MQEnvironment.channel="SERVICE_JAVA";           //通道类型:服务器连接  
 25          MQEnvironment.CCSID=1381;//437                    //服务器MQ服务使用的编码1381代表GBK、1208代表UTF(Coded Character Set Identifier:CCSID)    
 26          MQEnvironment.port=1456;                       //MQ端口    
 27          try {    
 28              //定义并初始化队列管理器对象并连接     
 29              //MQQueueManager可以被多线程共享,但是从MQ获取信息的时候是同步的,任何时候只有一个线程可以和MQ通信。    
 30             qMgr = new MQQueueManager(qmName);    
 31         } catch (MQException e) {    
 32             // TODO Auto-generated catch block    
 33             System.out.println("初使化MQ出错");    
 34             e.printStackTrace();    
 35         }     
 36      }    
 37      /**  
 38       * 往MQ发送消息  
 39       * @param message  
 40       * @return  
 41       */    
 42      public static Map<String,Object> sendMessage(Object message,String qName){      
 43          Map<String,Object> map=new HashMap<String,Object>();  
 44          try{       
 45              //设置将要连接的队列属性    
 46              // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface     
 47              //(except for completion code constants and error code constants).    
 48              //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.    
 49              //MQOO_OUTPUT:Open the queue to put messages.    
 50              /*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/    
 51              //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;    
 52              /*以下选项可适合远程队列与本地队列*/    
 53              //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //发送时使用  
 54              //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用  
 55              int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;    
 56              //连接队列     
 57              //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.     
 58              //The inquire and set capabilities are inherited from MQManagedObject.     
 59              /*关闭了就重新打开*/    
 60             if(qMgr==null || !qMgr.isConnected()){    
 61                 qMgr = new MQQueueManager(qmName);    
 62             }    
 63              MQQueue queue = qMgr.accessQueue(qName, openOptions);              
 64              //定义一个简单的消息    
 65              MQMessage putMessage = new MQMessage();  
 66              map.put("messageId",putMessage);  
 67              //String uuid=java.util.UUID.randomUUID().toString();  
 68              //将数据放入消息缓冲区    
 69              putMessage.writeObject(message);      
 70              //设置写入消息的属性(默认属性)    
 71              MQPutMessageOptions pmo = new MQPutMessageOptions();   
 72               
 73              //将消息写入队列     
 74              queue.put(putMessage,pmo);   
 75              map.put("message",message.toString());  
 76              queue.close();    
 77          }catch (MQException ex) {     
 78              System.out.println("A WebSphere MQ error occurred : Completion code "   + ex.completionCode + " Reason code " + ex.reasonCode);     
 79              ex.printStackTrace();    
 80          }catch (IOException ex) {     
 81              System.out.println("An error occurred whilst writing to the message buffer: " + ex);     
 82          }catch(Exception ex){    
 83              ex.printStackTrace();    
 84          }finally{    
 85              try {    
 86                 qMgr.disconnect();    
 87             } catch (MQException e) {    
 88                 e.printStackTrace();    
 89             }    
 90           }    
 91          return map;    
 92      }    
 93        
 94        
 95         
 96   
 97        
 98      /**  
 99       * 处理完消息回放到MQ队列  
100       * @param message  
101       * @return  
102       */    
103      public static Map<String,Object> sendReplyMessage(Object message,String qName,MQMessage mqMessage){      
104          Map<String,Object> map=new HashMap<String,Object>();  
105          try{       
106              //设置将要连接的队列属性    
107              // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface     
108              //(except for completion code constants and error code constants).    
109              //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.    
110              //MQOO_OUTPUT:Open the queue to put messages.    
111              /*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/    
112              //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;    
113              /*以下选项可适合远程队列与本地队列*/    
114              //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //发送时使用  
115              //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用  
116              int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING;    
117              //连接队列     
118              //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues.     
119              //The inquire and set capabilities are inherited from MQManagedObject.     
120              /*关闭了就重新打开*/    
121             if(qMgr==null || !qMgr.isConnected()){    
122                 qMgr = new MQQueueManager(qmName);    
123             }    
124              MQQueue queue = qMgr.accessQueue(qName, openOptions);              
125              //定义一个简单的消息    
126              MQMessage putMessage = new MQMessage();  
127              putMessage.messageId=mqMessage.messageId;  
128              map.put("messageId",putMessage);  
129              //String uuid=java.util.UUID.randomUUID().toString();  
130              //将数据放入消息缓冲区    
131              putMessage.writeObject(message);      
132              //设置写入消息的属性(默认属性)    
133              MQPutMessageOptions pmo = new MQPutMessageOptions();   
134               
135              //将消息写入队列     
136              queue.put(putMessage,pmo);   
137              map.put("message",message.toString());  
138              queue.close();    
139          }catch (MQException ex) {     
140              System.out.println("A WebSphere MQ error occurred : Completion code "   + ex.completionCode + " Reason code " + ex.reasonCode);     
141              ex.printStackTrace();    
142          }catch (IOException ex) {     
143              System.out.println("An error occurred whilst writing to the message buffer: " + ex);     
144          }catch(Exception ex){    
145              ex.printStackTrace();    
146          }finally{    
147              try {    
148                 qMgr.disconnect();    
149             } catch (MQException e) {    
150                 e.printStackTrace();    
151             }    
152           }    
153          return map;    
154      }    
155        
156        
157        
158        
159      /**  
160       * 从队列中去获取消息,如果队列中没有消息,就会发生异常,不过没有关系,有TRY...CATCH,如果是第三方程序调用方法,如果无返回则说明无消息  
161       * 第三方可以将该方法放于一个无限循环的while(true){...}之中,不需要设置等待,因为在该方法内部在没有消息的时候会自动等待。  
162       * @return  
163       */    
164      public static String getMessage(String qName,MQMessage mqMessage){    
165          String message="";    
166          try{                
167              //设置将要连接的队列属性    
168              // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface     
169              //(except for completion code constants and error code constants).    
170              //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default.    
171              //MQOO_OUTPUT:Open the queue to put messages.    
172              //int qOptioin = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; 发送时使用  
173              //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用  
174                
175              int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT;    
176              MQMessage retrieve = new MQMessage();    
177              //设置取出消息的属性(默认属性)    
178              //Set the put message options.(设置放置消息选项)     
179              MQGetMessageOptions gmo = new MQGetMessageOptions();     
180                
181              gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;//Get messages under sync point control(在同步点控制下获取消息)     
182              gmo.options = gmo.options + MQC.MQGMO_WAIT;  // Wait if no messages on the Queue(如果在队列上没有消息则等待)     
183              gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if Qeue Manager Quiescing(如果队列管理器停顿则失败)     
184              gmo.waitInterval = 3000 ;  // Sets the time limit for the wait.(设置等待的毫秒时间限制)     
185              /*关闭了就重新打开*/    
186             if(qMgr==null || !qMgr.isConnected()){    
187                 qMgr = new MQQueueManager(qmName);    
188             }    
189              MQQueue queue = qMgr.accessQueue(qName, openOptions);    
190                
191              MQMessage retrievedMessage = new MQMessage();  
192              //从队列中取出对应messageId的消息  
193              retrieve.messageId = mqMessage.messageId;   
194              // 从队列中取出消息  
195              queue.get(retrieve, gmo);    
196                
197               
198              Object obj = retrieve.readObject();  
199              message=obj.toString();//解决中文乱码问题  
200              /* 
201       
202              //int size = rcvMessage.getMessageLength(); 
203              //byte[] p = new byte[size]; 
204              //rcvMessage.readFully(p); 
205               
206              int len=retrieve.getDataLength(); 
207              byte[] str = new byte[len]; 
208               retrieve.readFully(str,0,len); 
209               message = new String(str);//readUTF();     
210              */  
211              
212              queue.close();    
213          }catch (MQException ex) {  
214              int reason=ex.reasonCode;  
215              if(reason==2033)//no messages  
216              {  
217                 message="nomessage";  
218              }else{  
219                 System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode);     
220              }  
221          }catch (IOException ex) {     
222              System.out.println("An error occurred whilst writing to the message buffer: " + ex);     
223          }catch(Exception ex){    
224              ex.printStackTrace();    
225          }finally{    
226             try {    
227                 qMgr.disconnect();    
228             } catch (MQException e) {    
229                 e.printStackTrace();    
230             }    
231          }    
232          return message;    
233      }    
234   
235   
236        
237      public static void main(String args[]) {    
238          init();  
239          Map<String,Object> map = new HashMap<String,Object>();  
240          map=sendMessage("{name: test get message id 123}","SERVICE_TRANSFER_QUEUE");  
241          MQMessage mqMessage = (MQMessage)map.get("messageId");  
242          outSys("传输消息:",mqMessage.messageId.toString());    
243            
244          outSys("接收传输队列:",getMessage("SERVICE_TRANSFER_QUEUE",mqMessage));  
245          Map<String,Object> reply_map = new HashMap<String,Object>();  
246          reply_map=sendReplyMessage("{name: local queue 008}","SERVICE_RECEIVE_QUEUE",mqMessage);  
247          outSys("放入正常队列:",reply_map.get("message").toString());  
248            
249          outSys("接收正常队列:",getMessage("SERVICE_RECEIVE_QUEUE",mqMessage));  
250            
251            
252      }  
253        
254        
255        
256      public static void outSys(String display,String val){  
257          System.out.println(display+val);  
258      }  
259        
260 }   

 

IBM MQ 学习