首页 > 代码库 > MSMQ发送和接收
MSMQ发送和接收
?
using System; using System.IO; using System.Messaging; namespace YuRen.PaiWei.BaLianGui { /// <summary> /// MQ文件发送 /// 所有异常都将抛出请做好异常处理 /// </summary> public static class MSMQModel { /// <summary> /// 向队列中发送对象 /// </summary> /// <param name="FilePath">文件路径</param> /// <param name="MQPath">队列地址</param> /// <returns>bool</returns> public static bool Send( string FilePath, string MQPath) { try { MQModel mm = new MQModel(MQPath); Message message = new Message(); message.Formatter = new BinaryMessageFormatter(); message.Label = Path.GetFileName(FilePath); byte [] streams = FileChange.ConvertToBinary(FilePath); if (streams != null ) { message.Body=streams; mm.Inqueue.Send(message); mm.Inqueue.Dispose(); return true ; } } catch (Exception ex) { throw ex; } return false ; } /// <summary> /// 向队列中取出对象并删除对象,接收只能接收本地的队列信息,如要接收远程队列的信息请先获取该服务器的权限否则将报错。 /// 成功返回文件路径否则返回NULL /// </summary> /// <param name="FilePath">对象保存路径</param> /// <param name="MQPath">队列地址</param> /// <returns>string文件存放路径</returns> public static string Receive( string FilePath, string MQPath) { try { MQModel mm = new MQModel(MQPath); Message message = new Message(); message = mm.Inqueue.Receive(); mm.Inqueue.Dispose(); if (message != null ) { if (FileChange.BinaryToConver(FilePath + @"\" + message.Label, ( byte [])message.Body)) { return FilePath + @"\" + message.Label; } } } catch (Exception ex) { throw ex; } return null ; } /// <summary> /// 创建消息队列(创建前请先确定队列不存在,不支持创建远程队列) /// </summary> /// <param name="transactional">是否启用事务</param> /// <param name="MqPath">MQ队列地址</param> /// <returns></returns> public static void Create( bool transactional, string MqPath) { try { MQModel mm = new MQModel(transactional, MqPath); } catch (Exception ex) { throw ex; } } } /// <summary> /// 二进制流和文件之间相互转化 /// </summary> class FileChange { /// <summary> /// 将文件转换成二进制 /// </summary> /// <param name="Path">文件路径</param> /// <returns>byte[]</returns> public static byte [] ConvertToBinary( string Path) { FileStream stream = new FileInfo(Path).OpenRead(); byte [] buffer = new byte [stream.Length]; try { stream.Read(buffer, 0, Convert.ToInt32(stream.Length)); return buffer; } catch (Exception ex) { throw ex; } finally { stream.Close(); } } /// <summary> /// 将二进制转换成文件 /// </summary> /// <param name="FilePath">文件保存路径</param> /// <param name="br">二进制流</param> public static bool BinaryToConver( string FilePath, byte [] br) { FileStream fstream = File.Create(FilePath, br.Length); try { fstream.Write(br, 0, br.Length); //二进制转换成文件 return true ; } catch (Exception ex) { throw ex; } finally { fstream.Close(); } } } /// <summary> /// 消息队列管理器 /// </summary> class MQModel : IDisposable { //队列 private MessageQueue MSMQqueue = null ; //队列链接字符串 private string path; /// <summary> /// 实例化消息队列(队列已存在) /// </summary> /// <param name="MqPath">链接队列字符串</param> public MQModel( string MqPath) { path = MqPath; try { MSMQqueue = new MessageQueue(path); } catch (Exception ex) { throw ex; } } /// <summary> /// 实例化消息队列(队列不存在,先创建后实例化) /// </summary> /// <param name="transactional">是否启动事务</param> /// <param name="MqPath">链接队列字符串</param> public MQModel( bool transactional, string MqPath) { path = MqPath; try { Create(transactional); } catch (Exception ex) { throw ex; } } /// <summary> /// 获取消息队列实例 /// </summary> public MQModel Inqueue { get { return this ; } } /// <summary> /// 创建队列MessageQueue.Create不支持创建远程队列 /// </summary> /// <param name="transactional">是否启用事务</param> /// <returns>bool</returns> public bool Create( bool transactional) { try { if (MessageQueue.Create(path, transactional) != null ) { return true ; } else { return false ; } } catch (Exception ex) { throw ex; } } /// <summary> /// 发送消息队列 /// </summary> /// <param name="message">Message消息</param> /// <returns></returns> public void Send(Message message) { try { MSMQqueue.Send(message); //MSMQqueue.Send(message, MessageQueueTransactionType.Single);//启用事务 } catch (Exception ex) { throw ex; } } /// <summary> /// 接收消息队列信息并删除 /// </summary> /// <returns>Message</returns> public Message Receive() { try { MSMQqueue.Formatter = new BinaryMessageFormatter(); Message message = new Message(); message = MSMQqueue.Receive(TimeSpan.FromMilliseconds(1)); //message = MSMQqueue.Receive(TimeSpan.FromMilliseconds(10), MessageQueueTransactionType.Single);//启用事务 return message; } catch (Exception ex) //队列中无信息时报错 { if (ex.Message != "请求操作的超时时间已到。" ) { throw ex; } } return null ; } /// <summary> /// 清空队列实例 /// </summary> public void Dispose() { if (MSMQqueue != null ) { MSMQqueue.Close(); MSMQqueue.Dispose(); MSMQqueue = null ; } } } } |
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。