rabbitmq常用

rabbitmq

#personal/技术储备/rabbitmq

RabbitMQ是一个消息代理 - 一个消息系统的媒介。它可以为你的应用提供一个通用的消息发送和接收平台,并且保证消息在传输过程中的安全。
RabbitMQ中文文档 · RabbitMQ in Chinese

持久化

  • 队列持久化

    1
    channel.queue_declare(queue='hello', durable=True)
  • 消息持久化

    1
    delivery_mode = 2

以上代码只是告诉了RabbitMq要把消息存到硬盘,但从RabbitMq收到消息到保存之间还是有一个很小的间隔时间。因为RabbitMq并不是所有的消息都使用fsync(2)——它有可能只是保存到缓存中,并不一定会写到硬盘中。并不能保证真正的持久化,但已经足够应付我们的简单工作队列。如果你一定要保证持久化,你需要改写你的代码来支持事务

交换机

  1. 扇形交换机fanout

    主要用来一对多。消息发送到多个队列,通过队列绑定到交换机来换成

    1
    2
    3
    4
    5
    6
    7
    8
    # exchange后接交换机名,type表明交换机类型;
    # 每次均需要申明队列名
    ch.exchange_declare(exchange='test', type='fanout')
    ch.queue_declare(queue='a')
    ch.queue_declare(queue='b')
    # 绑定队列到test这个交换机中
    ch.queue_bind(exchange='test', queue='a')
    ch.queue_bind(exchange='test', queue='b')
  2. direct

    交换机将会对绑定键(binding key)和路由键(routing key)进行精确匹配,从而确定消息该分发到哪个队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    channel.exchange_declare(exchange='direct_logs',type='direct')
    channel.basic_publish(exchange='direct_logs',
    routing_key=severity,
    body=message)
    # 处理接收消息的方式和之前差不多,只有一个例外,我们将会为我们感兴趣的每个严重级别分别创建一个新的绑定
    for severity in severities:
    channel.queue_bind(exchange='direct_logs',
    queue=queue_name,
    routing_key=severity)
  3. topic

    一个携带着特定路由键的消息会被主题交换机投递给绑定键与之想匹配的队列

  • * (星号) 用来表示一个单词.
  • # (井号) 用来表示任意数量(零个或多个)单词。
    1
    2
    3
    4
    5
    6
    channel.exchange_declare(exchange='topic_logs',
    type='topic')
    routing_key = sys.argv[1] if len(sys.argv) > 1 else 'anonymous.info'
    channel.basic_publish(exchange='topic_logs',
    routing_key=routing_key,
    body=message)

接受消息

1
2
3
4
5
6
7
8
9
10
channel.exchange_declare(exchange='topic_logs',
type='topic')

result = channel.queue_declare(exclusive=True)
queue_name = result.method.queue
binding_keys = sys.argv[1:]
for binding_key in binding_keys:
channel.queue_bind(exchange='topic_logs',
queue=queue_name,
routing_key=binding_key)

callback

1
2
3
4
5
6
7
8
9
10
11
12
channel.basic_publish(exchange='',
routing_key='task_queue',
body=message,
properties=pika.BasicProperties(
delivery_mode = 2, # make message persistent
))


channel.basic_consume(callback,
queue='task_queue')

channel.start_consuming()