前言
众所周知,开发低耦合系统是软件开发的终极目标之一。低耦合的系统更加容易扩展,低耦合的模块更加容易复用,更易于维护和管理。我们知道,消息队列的主要功能就是收发消息,但是它的作用不仅仅只是解决应用之间的通信问题这么简单。消息队列作为常用的中间件,经常被用来对系统解耦,对模块解耦。增强系统的可扩展性和模块的可复用性。
除了对用于对系统、模块解耦,消息队列还有以下几种通途:
- 服务异步处理
 
- 流量控制
 
- 作为发布 / 订阅系统实现一个微服务级系统间的观察者模式
 
- 连接流计算任务和数据
 
- 用于将消息广播给大量接收者
 
事物的存在总会有对立的一面,引入消息队列可能会带来延迟问题、产生数据不一致的问题、增加系统复杂度的问题等等。
EDA 架构之生产者与消费者模式
事件驱动架构(Event Driven Architecture, EDA)
EDA 架构原理
事件驱动架构由事件发起者和事件使用者组成。事件的发起者检测或感知事件,并以消息的形式来表示事件。它并不知道事件的使用者或事件引起的结果。
检测到事件后,系统会通过事件通道从事件发起者传输给事件使用者,而事件处理平台则会在该通道中以异步方式处理事件。事件发生时,需要通知事件使用者。他们可能会处理事件,也可能只是受事件的影响。
事件处理平台将对事件做出正确响应,并将活动下发给相应的事件使用者。通过这种下发活动,我们就可以看到事件的结果。
检测到事件后,系统会通过事件通道从事件发起者传输给事件使用者,而事件处理平台则会在该通道中以异步方式处理事件。事件发生时,需要通知事件使用者。他们可能会处理事件,也可能只是受事件的影响。
事件处理平台将对事件做出正确响应,并将活动下发给相应的事件使用者。通过这种下发活动,我们就可以看到事件的结果。
生产者-消费者模型
操作系统中常见的 EDA 架构就是生产者-消费者模型。消息队列常用来作为生产者和消费者之间的缓冲带,平衡生产者和消费者的处理能同时对服务进行解耦。有了这层缓冲带,生产者和消费者可能都不知道对方的存在。

以下为生产者-消费者模型的简单实现,(内存消息队列)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47
   | import time
  from queue import Queue from random import randint from threading import Thread
  class Producer(Thread):     def __init__(self, queue):         super().__init__()         self.queue = queue
      def run(self):         while True:             productA = randint(0, 10)             productB = randint(90, 100)             print('Produce A「number」: {}, Produce B「number」: {}'.format(productA, productB))             self.queue.put((productA, productB))             time.sleep(2)
  class Consumer(Thread):     def __init__(self, queue):         super().__init__()         self.queue = queue
      def run(self):         while True:                          products_tuple = self.queue.get(block=True)             print(f'Consume products: {products_tuple[0]} & {products_tuple[1]}')             time.sleep(randint(0, 10))
  def main():     queue = Queue()     producer = Producer(queue)     consumer = Consumer(queue)
      producer.start()     consumer.start()
  main() """ Produce A「number」: 8, Produce B「number」: 95 Consume products: 8 & 95 Produce A「number」: 4, Produce B「number」: 92 Consume products: 4 & 92 Produce A「number」: 9, Produce B「number」: 90 ... """
   | 
 
基于ZeroMQ PubSub模式的观察者模式实例


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
   |  import time import zmq
  def publisher1():     context = zmq.Context()     socket = context.socket(zmq.PUB)     socket.bind("tcp://*:5555")
      while True:         count = 99
          while True:             time.sleep(1)             socket.send_string('publisher1 pushes event %d' % count)             print('push event %d' % count)             count += 1
  if __name__ == "__main__":     publisher1()
 
  | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
   | 
  import time import zmq
  def publisher2():     context = zmq.Context()     socket = context.socket(zmq.PUB)     socket.bind("tcp://*:5556")
      while True:         count = 1
          while True:             time.sleep(1)             socket.send_string('publisher2 pushes event %d' % count)             print('push event %d' % count)             count += 1
  if __name__ == "__main__":     publisher2()
 
  | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
   | 
  import zmq
  def subscriber1():     context = zmq.Context()     socket = context.socket(zmq.SUB)     socket.connect('tcp://127.0.0.1:5555')     socket.connect('tcp://127.0.0.1:5556')     socket.setsockopt_string(zmq.SUBSCRIBE, '')
      while True:         message = socket.recv()         print('message: %s' % message)
  if __name__ == "__main__":     subscriber1()
 
  | 
 
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
   | 
  import zmq
  def subscriber2():     context = zmq.Context()     socket = context.socket(zmq.SUB)     socket.connect('tcp://127.0.0.1:5555')     socket.connect('tcp://127.0.0.1:5556')     socket.setsockopt_string(zmq.SUBSCRIBE, '')
      while True:         message = socket.recv()         print('message: %s' % message)
  if __name__ == "__main__":     subscriber2()
 
  | 
 
秒杀系统的架构设计与消息队列
某秒杀系统的主要处理步骤如下:
- 风险控制
 
- 库存锁定
 
- 生成订单
 
- 短信通知
 
- 更新统计数据
 
使用消息队列进行异步处理
由于秒杀成功的关键取决于风险控制、库存锁定这两步骤,所以 server 端处理了这两步之后可以给 client 端返回结果了,后续的步骤可放入消息队列中异步执行。不一定要在秒杀请求中完成。集中资源处理关键步骤(同步),碎片时间(全部秒杀请求处理结束)处理次要步骤(异步)。

使用消息队列进行流量控制(削峰)
秒杀开始后,将超过 server 端处理上限(短时间内)的秒杀请求放入消息队列中,后续有能力处理时再对消息队列中消费请求进行处理。对于超时的请求可以直接丢弃(秒杀失败)。

参考