Welcome

首页 / 软件开发 / .NET编程技术 / 消息队列工具类(MSMQ)

消息队列工具类(MSMQ)2014-06-26 博客园 Aaron所要做的是简化msmq的调用代码以及做到可替代性,实现后,调用消息队列代码变为如下所示:

QueueService srv = QueueService.Instance();//检查存储DTO1的队列是否存在,如不存在则自动建立srv.Prepare<DTO1>();//发送类型为DTO1的消息srv.Send<DTO1>(new DTO1() {p1="1",p2="2" });//发送类型为DTO1的消息,并且将发送的消息Id保存到msgId变量中string msgId=srv.Send<DTO1>(new DTO1() { p1 = "1", p2 = "2" });//接收末尾消息DTO1 msg = srv.Receive<DTO1>();//接收末尾消息,并且将这个消息Id保存在msgId变量中DTO1 msg = srv.Receive<DTO1>(ref msgId);//发送回复消息,并且指定这个回复消息是特定消息ID所专有的回复消息srv.SendResponse<DTO1>(msg, msgId);//接收特定消息ID的回复消息msg=srv.ReceiveResponse<DTO1>(msgId);
主要的地方有2个:

msmq消息大小限制的突破(4M突破)

泛型T对象的序列化、反序列化

突破大小限制

如果大小在4M内,则直接msmq封装(MessageLocation=InQueue)

如果在4M外,则通过网络共享文件来封装(MessageLocation=InNetwork)

泛型T对象的序列化、反序列化

固定住所要传递的对象类型为MessageWrapper

在MessageWrapper内部嵌入用户想要传递的其他对象以及相应的type、module名,这样MessageWrapper就能进行自动xml化以及反xml化了

MessageWrapper代码如下:

public class MessageWrapper{private ShareFileBroker fileBroker;public MessageWrapper(){PersistenceType = MessageLocation.InQueue;fileBroker = new ShareFileBroker(FileService.FileService.Instance());}public string RealObjectType { get; set; }public string RealObjectModule { get; set; }public string RealObjectXml { get; set; }public string NetworkLocation { get; set; }public MessageLocation PersistenceType { get; set; }public void Inject<T>(T obj){this.RealObjectType = typeof(T).FullName;this.RealObjectModule = typeof(T).Module.Name;string xml = SerializeUtils.Serialize2XML(typeof(T), obj);SaveXML(xml);}public T Extract<T>(){Assembly assembly = AppDomain.CurrentDomain.Load(this.RealObjectModule.TrimEnd(".dll".ToCharArray()));Type type = assembly.GetType(this.RealObjectType);string xml = GetXML();return (T)SerializeUtils.DeserializeFromXML(type, xml);}private string GetXML(){string xml = "";if (this.PersistenceType == MessageLocation.InQueue)xml = this.RealObjectXml;else if (this.PersistenceType == MessageLocation.InNetwork)xml = fileBroker.GetContentAndDelete(this.NetworkLocation);return xml;}private void SaveXML(string xml){if (xml.Length > QueueConfiguration.QueueConfiguration.MaxQueueBodyLength){this.NetworkLocation = fileBroker.Save(xml);this.PersistenceType = MessageLocation.InNetwork;}else{this.RealObjectXml = xml;this.PersistenceType = MessageLocation.InQueue;}}}
代码比较简单,就不介绍了。

自省推动进步,视野决定未来。心怀远大理想。为了家庭幸福而努力。A2D科技,服务社会。A2D Framework(Alpha)1. Cache System(本地缓存与分布式缓存共存、支持Memcache和Redis、支持贴标签形式(类似Spring 3.x的Cache形式))

2. Event System(本地事件与分布式事件分发)

3. IoC(自动匹配功能,实例数量限制功能)

4. Sql Dispatcher System(支持ADO.NET及EF)

5. Session System(分布式Session系统)

6. 分布式Command Bus(MSMQ实现,解决4M限制,支持Session的读取)

7. 规则引擎