首页 > 代码库 > IBM MQ 学习
IBM MQ 学习
1 import java.io.IOException; 2 import java.util.HashMap; 3 import java.util.Map; 4 5 import com.ibm.mq.MQC; 6 import com.ibm.mq.MQEnvironment; 7 import com.ibm.mq.MQException; 8 import com.ibm.mq.MQGetMessageOptions; 9 import com.ibm.mq.MQMessage; 10 import com.ibm.mq.MQPutMessageOptions; 11 import com.ibm.mq.MQQueue; 12 import com.ibm.mq.MQQueueManager; 13 14 public class CLIENT_MQ{ 15 //定义队列管理器和队列的名称 16 private static final String qmName = "MQ_SERVICE";//MQ的队列管理器名称 ; 17 //private static final String qName = "MIDDLE_SEND_QUEUE"; //MQ远程队列的名称 18 private static MQQueueManager qMgr;//队列管理器 19 public static void init(){ 20 //设置环境: 21 //MQEnvironment中包含控制MQQueueManager对象中的环境的构成的静态变量,MQEnvironment的值的设定会在MQQueueManager的构造函数加载的时候起作用, 22 //因此必须在建立MQQueueManager对象之前设定MQEnvironment中的值. 23 MQEnvironment.hostname="10.172.12.156"; //MQ服务器的IP地址 24 MQEnvironment.channel="SERVICE_JAVA"; //通道类型:服务器连接 25 MQEnvironment.CCSID=1381;//437 //服务器MQ服务使用的编码1381代表GBK、1208代表UTF(Coded Character Set Identifier:CCSID) 26 MQEnvironment.port=1456; //MQ端口 27 try { 28 //定义并初始化队列管理器对象并连接 29 //MQQueueManager可以被多线程共享,但是从MQ获取信息的时候是同步的,任何时候只有一个线程可以和MQ通信。 30 qMgr = new MQQueueManager(qmName); 31 } catch (MQException e) { 32 // TODO Auto-generated catch block 33 System.out.println("初使化MQ出错"); 34 e.printStackTrace(); 35 } 36 } 37 /** 38 * 往MQ发送消息 39 * @param message 40 * @return 41 */ 42 public static Map<String,Object> sendMessage(Object message,String qName){ 43 Map<String,Object> map=new HashMap<String,Object>(); 44 try{ 45 //设置将要连接的队列属性 46 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface 47 //(except for completion code constants and error code constants). 48 //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default. 49 //MQOO_OUTPUT:Open the queue to put messages. 50 /*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/ 51 //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 52 /*以下选项可适合远程队列与本地队列*/ 53 //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //发送时使用 54 //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用 55 int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; 56 //连接队列 57 //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues. 58 //The inquire and set capabilities are inherited from MQManagedObject. 59 /*关闭了就重新打开*/ 60 if(qMgr==null || !qMgr.isConnected()){ 61 qMgr = new MQQueueManager(qmName); 62 } 63 MQQueue queue = qMgr.accessQueue(qName, openOptions); 64 //定义一个简单的消息 65 MQMessage putMessage = new MQMessage(); 66 map.put("messageId",putMessage); 67 //String uuid=java.util.UUID.randomUUID().toString(); 68 //将数据放入消息缓冲区 69 putMessage.writeObject(message); 70 //设置写入消息的属性(默认属性) 71 MQPutMessageOptions pmo = new MQPutMessageOptions(); 72 73 //将消息写入队列 74 queue.put(putMessage,pmo); 75 map.put("message",message.toString()); 76 queue.close(); 77 }catch (MQException ex) { 78 System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode); 79 ex.printStackTrace(); 80 }catch (IOException ex) { 81 System.out.println("An error occurred whilst writing to the message buffer: " + ex); 82 }catch(Exception ex){ 83 ex.printStackTrace(); 84 }finally{ 85 try { 86 qMgr.disconnect(); 87 } catch (MQException e) { 88 e.printStackTrace(); 89 } 90 } 91 return map; 92 } 93 94 95 96 97 98 /** 99 * 处理完消息回放到MQ队列 100 * @param message 101 * @return 102 */ 103 public static Map<String,Object> sendReplyMessage(Object message,String qName,MQMessage mqMessage){ 104 Map<String,Object> map=new HashMap<String,Object>(); 105 try{ 106 //设置将要连接的队列属性 107 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface 108 //(except for completion code constants and error code constants). 109 //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default. 110 //MQOO_OUTPUT:Open the queue to put messages. 111 /*目标为远程队列,所有这里不可以用MQOO_INPUT_AS_Q_DEF属性*/ 112 //int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 113 /*以下选项可适合远程队列与本地队列*/ 114 //int openOptions = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; //发送时使用 115 //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用 116 int openOptions = MQC.MQOO_OUTPUT | MQC.MQOO_FAIL_IF_QUIESCING; 117 //连接队列 118 //MQQueue provides inquire, set, put and get operations for WebSphere MQ queues. 119 //The inquire and set capabilities are inherited from MQManagedObject. 120 /*关闭了就重新打开*/ 121 if(qMgr==null || !qMgr.isConnected()){ 122 qMgr = new MQQueueManager(qmName); 123 } 124 MQQueue queue = qMgr.accessQueue(qName, openOptions); 125 //定义一个简单的消息 126 MQMessage putMessage = new MQMessage(); 127 putMessage.messageId=mqMessage.messageId; 128 map.put("messageId",putMessage); 129 //String uuid=java.util.UUID.randomUUID().toString(); 130 //将数据放入消息缓冲区 131 putMessage.writeObject(message); 132 //设置写入消息的属性(默认属性) 133 MQPutMessageOptions pmo = new MQPutMessageOptions(); 134 135 //将消息写入队列 136 queue.put(putMessage,pmo); 137 map.put("message",message.toString()); 138 queue.close(); 139 }catch (MQException ex) { 140 System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode); 141 ex.printStackTrace(); 142 }catch (IOException ex) { 143 System.out.println("An error occurred whilst writing to the message buffer: " + ex); 144 }catch(Exception ex){ 145 ex.printStackTrace(); 146 }finally{ 147 try { 148 qMgr.disconnect(); 149 } catch (MQException e) { 150 e.printStackTrace(); 151 } 152 } 153 return map; 154 } 155 156 157 158 159 /** 160 * 从队列中去获取消息,如果队列中没有消息,就会发生异常,不过没有关系,有TRY...CATCH,如果是第三方程序调用方法,如果无返回则说明无消息 161 * 第三方可以将该方法放于一个无限循环的while(true){...}之中,不需要设置等待,因为在该方法内部在没有消息的时候会自动等待。 162 * @return 163 */ 164 public static String getMessage(String qName,MQMessage mqMessage){ 165 String message=""; 166 try{ 167 //设置将要连接的队列属性 168 // Note. The MQC interface defines all the constants used by the WebSphere MQ Java programming interface 169 //(except for completion code constants and error code constants). 170 //MQOO_INPUT_AS_Q_DEF:Open the queue to get messages using the queue-defined default. 171 //MQOO_OUTPUT:Open the queue to put messages. 172 //int qOptioin = MQC.MQOO_INQUIRE | MQC.MQOO_OUTPUT; 发送时使用 173 //int qOptioin = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 接收时使用 174 175 int openOptions = MQC.MQOO_INPUT_AS_Q_DEF | MQC.MQOO_OUTPUT; 176 MQMessage retrieve = new MQMessage(); 177 //设置取出消息的属性(默认属性) 178 //Set the put message options.(设置放置消息选项) 179 MQGetMessageOptions gmo = new MQGetMessageOptions(); 180 181 gmo.options = gmo.options + MQC.MQGMO_SYNCPOINT;//Get messages under sync point control(在同步点控制下获取消息) 182 gmo.options = gmo.options + MQC.MQGMO_WAIT; // Wait if no messages on the Queue(如果在队列上没有消息则等待) 183 gmo.options = gmo.options + MQC.MQGMO_FAIL_IF_QUIESCING;// Fail if Qeue Manager Quiescing(如果队列管理器停顿则失败) 184 gmo.waitInterval = 3000 ; // Sets the time limit for the wait.(设置等待的毫秒时间限制) 185 /*关闭了就重新打开*/ 186 if(qMgr==null || !qMgr.isConnected()){ 187 qMgr = new MQQueueManager(qmName); 188 } 189 MQQueue queue = qMgr.accessQueue(qName, openOptions); 190 191 MQMessage retrievedMessage = new MQMessage(); 192 //从队列中取出对应messageId的消息 193 retrieve.messageId = mqMessage.messageId; 194 // 从队列中取出消息 195 queue.get(retrieve, gmo); 196 197 198 Object obj = retrieve.readObject(); 199 message=obj.toString();//解决中文乱码问题 200 /* 201 202 //int size = rcvMessage.getMessageLength(); 203 //byte[] p = new byte[size]; 204 //rcvMessage.readFully(p); 205 206 int len=retrieve.getDataLength(); 207 byte[] str = new byte[len]; 208 retrieve.readFully(str,0,len); 209 message = new String(str);//readUTF(); 210 */ 211 212 queue.close(); 213 }catch (MQException ex) { 214 int reason=ex.reasonCode; 215 if(reason==2033)//no messages 216 { 217 message="nomessage"; 218 }else{ 219 System.out.println("A WebSphere MQ error occurred : Completion code " + ex.completionCode + " Reason code " + ex.reasonCode); 220 } 221 }catch (IOException ex) { 222 System.out.println("An error occurred whilst writing to the message buffer: " + ex); 223 }catch(Exception ex){ 224 ex.printStackTrace(); 225 }finally{ 226 try { 227 qMgr.disconnect(); 228 } catch (MQException e) { 229 e.printStackTrace(); 230 } 231 } 232 return message; 233 } 234 235 236 237 public static void main(String args[]) { 238 init(); 239 Map<String,Object> map = new HashMap<String,Object>(); 240 map=sendMessage("{name: test get message id 123}","SERVICE_TRANSFER_QUEUE"); 241 MQMessage mqMessage = (MQMessage)map.get("messageId"); 242 outSys("传输消息:",mqMessage.messageId.toString()); 243 244 outSys("接收传输队列:",getMessage("SERVICE_TRANSFER_QUEUE",mqMessage)); 245 Map<String,Object> reply_map = new HashMap<String,Object>(); 246 reply_map=sendReplyMessage("{name: local queue 008}","SERVICE_RECEIVE_QUEUE",mqMessage); 247 outSys("放入正常队列:",reply_map.get("message").toString()); 248 249 outSys("接收正常队列:",getMessage("SERVICE_RECEIVE_QUEUE",mqMessage)); 250 251 252 } 253 254 255 256 public static void outSys(String display,String val){ 257 System.out.println(display+val); 258 } 259 260 }
IBM MQ 学习
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。