Welcome

首页 / 软件开发 / 数据结构与算法 / enode框架入门:消息的重试机制的设计思路

enode框架入门:消息的重试机制的设计思路2013-11-26 博客园 netfocus上一篇文章,简单介绍了enode框架中消息队列的设计思路,本文介绍一下enode框架中关系消息的重试机 制的设计思路。

对于一个EDA架构为基础的框架,核心就是消息驱动,然后基于最终一致性的原则。所 以,非常重要的一点是,如果消息一次执行不成功,那该怎么办?我能想到的对策就是消息的重试。我发现, 这篇文章比较难写,因为感觉要把复杂的事情清晰的表达出来,感觉确实不容易。说到重试,那什么是消息的 重试呢?怎么重试呢?我这里提到的重试是指,一个消息,从消息队列取出来后,要处理,但是处理失败了, 然后要重新尝试再处理该消息;怎么重试?这个问题比较复杂,不能用简单的一两句话来说明。

上面 说到,如果消息处理失败要再重试,其实是一个比较粗的回答。因为比如一个消息在处理的时候总共有5个步 骤,如果前2步都成功,但是第3步失败了,那重试的时候,前2步还需要再执行吗?我的想法是,在能办到的 情况下,就不要再做前2步操作了,而是直接从第3步开始重试。所以说,这种做法相当于是“哪里跌倒,哪里 继续”;

那么怎么重试呢?

经过分析,我们发现整个enode框架中需要重试的点是非常多的, 比如command产生的event要发送到队列时,如果失败那需要重试;比如event持久化时失败了,也需要重试, 等等。所以,显而易见,我们应该设计一个可以被重用的重试服务,提供对某些特定的重试场景的支持。

我们先来想一下,我们希望有什么样的重试功能。以“event持久化时失败”为例,如果这一步失败, 我们希望立马对这个步骤重试几次,比如3次,如果3次内成功了,那就成功了,继续往下做下面的逻辑;如果 还是失败了呢?我们难道就放弃了吗?实际上,我们不能放弃,因为一般如果事件持久化失败很有可能是由于 网络问题或eventstore有什么问题,而且如果我们就这样放弃了,那很可能整个业务逻辑的流程就被中断了, 这样就无法做到数据的最终一致性了。所以,因为这种暂时的IO问题导致的失败,我们不能随便就放弃重试, 应该在尝试几次重试仍失败时采取必要的手段,可以在IO恢复时,能自动再处理该消息;但是我们又不能使用 当前线程无限制的重试下去,因为这样就导致没办法处理其他的消息了;所以我们自然就能想到:我们应该在 消息重试几次仍失败时,将该消息放入一个专门的重试队列,然后有另外一个独立的线程会定时从该队列取出 要重试的消息,然后重试这些消息;这样,当IO恢复时,这些消息就能很快被成功处理了;

另外一个 问题,那这种专门的重试队列需要支持消息持久化吗?不用,我们只需要内存队列就行了,因为当一个消息还 没被完全成功处理前,是不会从message store删除的;所以,就算机器重启了,该消息还是能在该机器重启 后被处理的;而当该机器没重启时,该专门重试的内存队列会不断地以独立的线程定时重试该消息;

那这种专门的重试队列需要多少个呢?理论上我们可以为每个需要重试的点都设计一个重试队列来支持,但是 这样一方面过于复杂,而且线程多了还会影响系统的性能;所以我们需要权衡一下,只对同一个阶段中要做的 所有的事情设计一个重试队列,该阶段中这些要做的事情中有任何一步失败,就都放到该阶段对应的重试队列 里;

还有一个问题,如果一个消息在某一次重试时成功了,但是我们希望在成功后继续对该消息做后 续的步骤,该如何实现呢?这个问题初想想感觉比较麻烦,因为我们可能已经没有了该消息的一些上下文环境 。最重要的是,我们如何知道该消息重试成功后接下来该做什么呢?而且就算知道接下来要做什么了,但是要 是我们在做这个下一步的步骤时,要是又失败了呢?是不是也要重试呢?所以,我们发现这里很关键。

经过我的一些思考,我发现,如果一个消息在某个阶段要被处理多个步骤,且有些步骤之间有条件依 赖,比如只有在第2步处理的结果是成功时,我们才有必要做后面的3步;正常情况,如果一切顺利,那就是一 步步从上往下的去做;但是因为考虑到任何一步可能都会出问题,而且我们希望在任何一步失败然后重试成功 后,能继续后续的步骤。所以,基于这些特征,我觉得我们可以设计一种类似回调函数的机制,当某个逻辑执 行成功后,执行回调函数,我们可以在回调函数中存放接下来要做的逻辑;显然,我觉得我们需要某种递归的 数据结构;为了支持上面这种类似回调函数的需求,我设计了如下的一个数据结构:

/// <summary>一个数据结构,封装了一段要执行的逻辑以及一些相关的上下文信息/// </summary>public class ActionInfo{/// <summary>表示某个Action的名字/// </summary>public string Name { get; private set; }/// <summary>表示某个Action,封装了一段逻辑/// </summary>public Func<object, bool> Action { get; private set; }/// <summary>表示Action执行时所需要的数据信息/// </summary>public object Data { get; private set; }/// <summary>表示Action执行成功后,要执行的下一个Action的信息,这里体现出递归/// </summary>public ActionInfo Next { get; private set; }public ActionInfo(string name, Func<object, bool> action, object data, ActionInfo next){if (action == null){throw new ArgumentNullException("action");}Name = name;Action = action;Data = data;Next = next;}}