文章目录- 1. 为什么做订阅分布?
- 2. Redis中的订阅发布
- 3. Redis生产者消费者
- 4. Redis中订阅发布
- 5. Java Jedis踩过的坑
为什么做订阅分布?
随着业务复杂, 业务的项目依赖关系增强, 使用消息队列帮助系统
降低耦合度.
- 订阅分布本身也是一种生产者消费者模式, 订阅者是消费者, 发布者是生产者.
- 订阅发布模式, 发布者发布消息后, 只要有订阅方, 则多个订阅方会收到同样的消息
- 生产者消费者模式, 生产者往队列里放入消息, 由多个消费者对一条消息进行抢占.
- 订阅分布模式可以将一些不着急完成的工作放到其他进程或者线程中进行离线处理.
Redis中的订阅发布
Redis中的订阅发布模式, 当没有订阅者时, 消息会被直接丢弃(Redis不会持久化保存消息)
Redis生产者消费者
生产者使用Redis中的list数据结构进行实现, 将待处理的消息塞入到消息队列中.
| 1234567891011 | class Producer(object): def __init__(self, host="localhost", port=6379):self._conn = redis.StrictRedis(host=host, port=port)self.key = "test_key"self.value = "test_value_{id}" def produce(self):for id in xrange(5):msg = self.value.format(id=id)self._conn.lpush(self.key, msg) |
消费者使用
redis中brpop进行实现, brpop会从list头部消息, 并能够设置超时等待时间.
| 12345678910111213141516171819202122232425 | class Consumer(object): def __init__(self, host="localhost", port=6379):self._conn = redis.StrictRedis(host=host, port=port)self.key = "test_key" def consume(self, timeout=0):# timeout=0 表示会无线阻塞, 直到获得消息while True:msg = self._conn.brpop(self.key, timeout=timeout)process(msg) def process(msg):print msg if __name__ == "__main__":consumer = Consumer()consumer.consume()# 输出结果("test_key", "test_value_1")("test_key", "test_value_2")("test_key", "test_value_3")("test_key", "test_value_4")("test_key", "test_value_5") |
Redis中订阅发布
在Redis Pubsub中, 一个频道(channel)相当于一个消息队列
| 1234567891011 | class Publisher(object): def __init__(self, host, port):self._conn = redis.StrictRedis(host=host, port=port)self.channel = "test_channel"self.value = "test_value_{id}" def pub(self):for id in xrange(5):msg = self.value.format(id=id)self._conn.publish(self.channel, msg) |
其中
get_message使用了
select IO多路复用来检查socket连接是否是否可读.
| 1234567891011121314151617181920212223 | class Subscriber(object): def __init__(self, host="localhost", port=6379):self._conn = redis.StrictRedis(host=host, port=port)self._pubsub = self._conn.pubsub() # 生成pubsub对象self.channel = "test_channel"self._pubsub.subscribe(self.channel) def sub(self):while True:msg = self._pubsub.get_message()if msg and isinstance(msg.get("data"), basestring):process(msg.get("data")) def close(self):self._pubsub.close() # 输出结果test_value_1test_value_2test_value_3test_value_4test_value_5 |
Java Jedis踩过的坑
在Jedis中订阅方处理是采用同步的方式, 看源码中PubSub模块的process函数在
do-while循环中, 会等到当前消息处理完毕才能够处理下一条消息, 这样会导致当入队列消息量过大的时候, redis链接被强制关闭.解决方案: 将整个处理函数改为异步的方式.
下面关于Redis的文章您也可能喜欢,不妨参考下:Ubuntu 14.04下Redis安装及简单???试 http://www.linuxidc.com/Linux/2014-05/101544.htmRedis主从复制基本配置 http://www.linuxidc.com/Linux/2015-03/115610.htmRedis集群明细文档 http://www.linuxidc.com/Linux/2013-09/90118.htmUbuntu 12.10下安装Redis(图文详解)+ Jedis连接Redis http://www.linuxidc.com/Linux/2013-06/85816.htmRedis系列-安装部署维护篇 http://www.linuxidc.com/Linux/2012-12/75627.htmCentOS 6.3安装Redis http://www.linuxidc.com/Linux/2012-12/75314.htmRedis安装部署学习笔记 http://www.linuxidc.com/Linux/2014-07/104306.htmRedis配置文件redis.conf 详解 http://www.linuxidc.com/Linux/2013-11/92524.htm
Redis 的详细介绍:请点这里
Redis 的下载地址:请点这里
本文永久更新链接地址