Welcome

首页 / 软件开发 / C# / 分布式EventBus的Socket实现:发布订阅

分布式EventBus的Socket实现:发布订阅2014-04-08 cnblogs Aaron在这篇文章中,EventBus实现 - 发布订阅 - XML加载 所适用的范围只是本机的事件传播,要是牵 涉到多台服务器之间的事件传播就不行了,解决办法有用msmq解决的,Node.js解决的,也有用redis的 发布订阅解决的,这次用C# socket来实现,能实现立刻推送事件到所有订阅了相关event的server上。

这次的子系统适用的场景如下:

主要分2个部分:各个server使用的Event Bus Broker以及Event Bus Server。

Broker与Server之间的通信协议就3个:ME、Subscribe、Publish。分别代表:我的名字是、我要订 阅的事件是、我触发事件。

Event Bus Server是基于SuperSocket开源组件写的,非常方便。实现了3个对应的命令类(建议大 家先看看SuperSocket的文档),如下:

public class ME : CommandBase<EventDispatcherBusSession, StringRequestInfo>{public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo){session.AssociatedServerIdentity = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);Console.WriteLine(string.Format("[{0}]: connected.", session.AssociatedServerIdentity));}}public class Publish : CommandBase<EventDispatcherBusSession, StringRequestInfo>{public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo){string evtClassPath = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);string evtXml = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[1]);foreach(EventDispatcherBusSession s in session.AppServer.GetAllSessions()){if (s.AssociatedServerIdentity == session.AssociatedServerIdentity)continue;if (s.SubscribedEventClasses.Contains(evtClassPath)){s.Send("Publish " + requestInfo.Body);//forward onlyConsole.WriteLine(string.Format("Forwarding publish command from {1} to server: [{0}]", s.AssociatedServerIdentity, session.AssociatedServerIdentity));}}}}public class Subscribe : CommandBase<EventDispatcherBusSession, StringRequestInfo>{public override void ExecuteCommand(EventDispatcherBusSession session, StringRequestInfo requestInfo){string classPath = EventBase.Helper.RestoreSpecialCharacters(requestInfo.Parameters[0]);if (session.SubscribedEventClasses.Contains(classPath))return;session.SubscribedEventClasses.Add(classPath);string msg = "";msg+=string.Format("[{0}], now subscribed these events:
", session.AssociatedServerIdentity);foreach(string evtClass in session.SubscribedEventClasses)msg += string.Format("{0}
", evtClass);Console.WriteLine(msg);}}