招标网站排行榜,小型教育网站的开发建设开题报告,网络公司电话,深圳市住房城乡建设局网站RabbitMQ的exchange#xff0c;即交换机有不同的类型#xff1a;
1.direct Exchange(直接交换机) 匹配路由键#xff0c;只有完全匹配消息才会被转发 2.Fanout Excange#xff08;扇出交换机#xff09; 将消息发送至所有的队列 3.Topic Exchange(主题交换机) 将路由按模…RabbitMQ的exchange即交换机有不同的类型
1.direct Exchange(直接交换机) 匹配路由键只有完全匹配消息才会被转发 2.Fanout Excange扇出交换机 将消息发送至所有的队列 3.Topic Exchange(主题交换机) 将路由按模式匹配此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词符号“*”匹配不多不少一个词。因此“abc.#”能够匹配到“abc.def.ghi”但是“abc.*” 只会匹配到“abc.def”。 4.Header Exchange 在绑定Exchange和Queue的时候指定一组键值对header为键根据请求消息中携带的header进行路由
RabbitMQ六种工作模式 六种模式分别为Hello world、Work queues工作队列、Publish/Subscribe发布订阅、Routing路由、Topics主题、RPC远程调用除了RPC模式外其余的模式都是从简单的使用到更为灵活的使用基本的代码框架都是差不多的只是在不同的模式下达到的效果不同它们各有各的特点在实际使用中应该根据需求来选择具体的模式而不是简单粗暴的选择最“高端”的模式。
1. Hello world模式也叫simple (简单模式) Hello world模式是最简单的一种模式一个producer发送message另一个consumer接收message。 2. Work queues模式工作模式 Work queues模式即工作队列模式也称为Task queues模式任务队列模式这个模式的特点在于同一个queue可以允许多个consumer从中获取massageRabbitMQ默认会从queue中依次循环的给不同的consumer发送message。一个生产者生产信息多个消费者进行消费但是一条消息只能消费一次 3. Publish/Subscribe模式发布订阅模式相当于广播 相对于工作/任务模式中的一个message只能发送给一个consumer使用发布订阅模式会将一个message同时发送给多个consumer使用其实就是producer将message广播给所有的consumer。 生产者首先投递消息到交换机订阅了这个交换机的队列就会收到生产者投递的消息。
使用fanout交换机类型传递到 exchange 的消息将会转发到所有与其绑定的 queue 上。
不需要指定 routing_key 即使指定了也是无效。 需要提前将 exchange 和 queue 绑定一个 exchange 可以绑定多个 queue一个queue可以绑定多个exchange。 需要先启动 订阅者此模式下的队列是 consumer 随机生成的发布者 仅仅发布消息到 exchange 由 exchange 转发消息至 queue。如果不先启动订阅者则发布者发布的消息订阅者是无法事后接收到的。 发布者
import pika # 链接mq需要pika模块
import jsonuser_info pika.PlainCredentials(tester,test1234)
connection pika.BlockingConnection(pika.ConnectionParameters(192.168.1.117,5672,/,user_info))
channel connection.channel()# 声明exchange由exchange指定消息在哪个队列传递如不存在则创建。durable True 代表exchange持久化存储False 非持久化存储channel.exchange_declare(exchangelogs,exchange_typefanout,)
for i in range(0,10):message json.dumps({消息ID:1000%s%i,},ensure_asciiFalse)channel.basic_publish(exchangelogs,routing_key,bodybytes(message,encodingutf8),)print(message)connection.close()
接收者
import pika
import jsonuser_info pika.PlainCredentials(tester,test1234)
connection pika.BlockingConnection(pika.ConnectionParameters(192.168.1.117,5672,/,user_info))
channel connection.channel()# 创建临时队列,队列名传空字符或不设置将创建唯一的临时queueconsumer关闭后队列自动删除
result channel.queue_declare(,exclusiveTrue)
queue_name result.method.queue
print(temp queue name:,queue_name)
channel.queue_bind(exchangelogs,queuequeue_name,)def callback(ch, method, properties, body):print([x] Received %r % str(body,encodingutf8))# 如果basic_consume中auto_ack为False则这里要手动进行应答channel.basic_ack(delivery_tagmethod.delivery_tag) # 手动应答print(手动应答队列中消息)channel.basic_consume(queuequeue_name, # 接收指定queue的消息on_message_callbackcallback, # 设置收到消息的回调函数auto_ackFalse) # 指定为True表示消息接收到后自动给消息发送方回复确认已收到消息False表示不自动确认需要在callback中手工确认print([*] Waiting for message. To exit press CTRLC)# 一直处于等待接收消息的状态如果没收到消息就一直处于阻塞状态收到消息就调用上面的回调函数
channel.start_consuming()4. Routing模式路由模式相当于组播 路由模式中exchange类型为direct与发布订阅模式相似但是不同之处在于发布订阅模式将message不加区分广播给所有的绑定queue但是路由模式中允许queue在绑定exchange时同时指定 routing_key exchange就只会发送message到与 routing_key 匹配的queue中其他的所有message都将被丢弃。当然也允许多个queue指定相同的 routing_key 此时效果就相当于fanout类型的发布订阅模式了。 生产者生产消息投递到direct交换机中扇出交换机会根据消息携带的routing Key匹配相应的队列
生产者
import pika # 链接mq需要pika模块
import sysuser_info pika.PlainCredentials(tester,test1234)
connection pika.BlockingConnection(pika.ConnectionParameters(192.168.1.117,5672,/,user_info))
channel connection.channel()channel.exchange_declare(exchangedirect-logs,exchange_typedirect, # 类型为directdurable True,)severity sys.argv[1] if len(sys.argv)1 else info # 定义消息严重级别
message .join(sys.argv[2:]) or Hello World!channel.basic_publish(exchangedirect-logs,routing_keyseverity, # 把消息发送到一组队列这一组队列按routing_key分组bodymessage)
print( [x] Sent %r:%r % (severity,message))
connection.close()
消费者
import pika
import sysuser_info pika.PlainCredentials(tester,test1234)
connection pika.BlockingConnection(pika.ConnectionParameters(192.168.1.117,5672,/,user_info))
channel connection.channel()# 创建临时队列,队列名传空字符或不设置将创建唯一的临时queueconsumer关闭后队列自动删除
result channel.queue_declare(,exclusiveTrue)
queue_name result.method.queueseverities sys.argv[1:] # 可以输入多个级别
if not severities:sys.stderr.write(Usage: %s [info] [warning] [error]\n % sys.argv[0])sys.exit(1)for severity in severities: # 循环绑定routing_keychannel.queue_bind(exchangedirect-logs,queuequeue_name,routing_keyseverity,)def callback(ch, method, properties, body):print([x] Received %r % body)# 如果basic_consume中auto_ack为False则这里要手动进行应答channel.basic_ack(delivery_tagmethod.delivery_tag) # 手动应答print(手动应答队列中消息)channel.basic_consume(queuequeue_name, # 接收指定queue的消息on_message_callbackcallback, # 设置收到消息的回调函数auto_ackFalse) # 指定为True表示消息接收到后自动给消息发送方回复确认已收到消息False表示不自动确认需要在callback中手工确认channel.start_consuming()
运行结果 5. Topics模式主题模式 主题模式的exchange类型为topic相较于路由模式主题模式更加灵活区别就在于它的routing_key可以带通配符 * 匹配一个单词和 # 匹配0个或多个单词每个单词以点号分隔但注意routing_key的总大小不能超过255个字节。
如果一个message同时匹配了多个queue中的routing_key那这几个queue都会收到这个message如果一个message同时匹配了一个queue中的多个routing_key那这个queue也只会接收一次这条message如果一个message没有匹配上任何routing_key那么这个message将被丢弃。
如果routing_key定义为 # 就只有这一个通配符那么这个queue将接收所有message就像exchange类型为fanout的发布订阅模式一样如果routing_key两个通配符都没有使用那么这个queue将会接收固定routing_key的message就像exchange类型为direct的路由模式一样。
producer端从代码上讲producer的代码与路由模式没什么区别只不过在routing_key的传值上需要注意与想要发送到的queue进行匹配。 生产者生产消息投递到topic交换机中上面是完全匹配路由键而主题模式是模糊匹配只要有合适规则的路由就会投递给消费者。
生产者
import pika # 链接mq需要pika模块
import sysuser_info pika.PlainCredentials(tester,test1234)
connection pika.BlockingConnection(pika.ConnectionParameters(192.168.1.117,5672,/,user_info))
channel connection.channel()channel.exchange_declare(exchangetopic-logs,exchange_typetopic, # 类型为directdurable True,)routing_key sys.argv[1] if len(sys.argv)1 else anonymous.info # 定义消息严重级别message .join(sys.argv[2:]) or Hello World!channel.basic_publish(exchangetopic-logs,routing_keyrouting_key, # 把消息发送到一组队列这一组队列按routing_key分组bodymessage)
print( [x] Sent %r:%r % (routing_key,message))
connection.close()
消费者
import pika
import sysuser_info pika.PlainCredentials(tester,test1234)
connection pika.BlockingConnection(pika.ConnectionParameters(192.168.1.117,5672,/,user_info))
channel connection.channel()channel.exchange_declare(exchangetopic-logs,exchange_typetopic,durableTrue,)# 创建临时队列,队列名传空字符或不设置将创建唯一的临时queueconsumer关闭后队列自动删除
result channel.queue_declare(,exclusiveTrue)
queue_name result.method.queuebinding_keys sys.argv[1:] # 可以输入多个级别
if not binding_keys:sys.stderr.write(Usage: %s [binding_key]...\n % sys.argv[0])sys.exit(1)for binding_key in binding_keys: # 循环绑定routing_keychannel.queue_bind(exchangetopic-logs,queuequeue_name,routing_keybinding_key,)def callback(ch, method, properties, body):print([x] Received %r % body)# 如果basic_consume中auto_ack为False则这里要手动进行应答channel.basic_ack(delivery_tagmethod.delivery_tag) # 手动应答print(手动应答队列中消息)channel.basic_consume(queuequeue_name, # 接收指定queue的消息on_message_callbackcallback, # 设置收到消息的回调函数auto_ackFalse) # 指定为True表示消息接收到后自动给消息发送方回复确认已收到消息False表示不自动确认需要在callback中手工确认print([*] Waiting for message. To exit press CTRLC)
channel.start_consuming()
运行结果可以使用*#等进行过滤 6. RPC模式 RPC远程调用Remote Procedure Call模式其实就是使用消息队列处理请求的一种方式通常请求接收到后会立即执行且多个请求是并行执行的如果一次性来了太多请求达到了服务端处理请求的瓶颈就会影响性能但是如果使用消息队列的方式最大的一点好处是可以不用立即处理请求而是将请求放入消息队列服务端只需要根据自己的状态从消息队列中获取并处理请求即可。
producer端RPC模式的客户端producer需要使用到两个queue一个用于发送request消息此queue通常在服务端声明和创建一个用于接收response消息。另外需要特别注意的一点是需要为每个request消息指定一个uuidcorrelation_id属性类似请求id用于识别返回的response消息是否属于对应的request。 客户端client
import pika
import uuidclass FibonacciRpcClient(object):def __init__(self):self.connection pika.BlockingConnection(pika.ConnectionParameters(192.168.1.117, 5672, /, pika.PlainCredentials(tester,test1234)))self.channel self.connection.channel()result self.channel.queue_declare(,exclusiveTrue) # 随机生成一个临时的唯一的queueself.callback_queue result.method.queue # 这个临时唯一queue的名字# 注意这个临时queue不是用于发送消息的是用于接收消息的这个queue名字# 会传给server端server端用这个Queue发送消息也就是客户端指定了服务器端要使用的queueself.channel.basic_consume(on_message_callbackself.on_response,auto_ackFalse,queueself.callback_queue,) # 这是客户端发送完请求后接收服务器端返回消息的配置注意queue就是上面生成的临时queuedef on_response(self,ch,method,props,body):if self.corr_id props.correlation_id:self.response bodych.basic_ack(delivery_tagmethod.delivery_tag)print(手动应答成功)def call(self,n):self.response Noneself.corr_id str(uuid.uuid4())self.channel.basic_publish(exchange,routing_keyrpc_queue,propertiespika.BasicProperties(reply_toself.callback_queue,correlation_idself.corr_id,# 这个参数是用来标识本次请求如果客户端发送多个请求每个请求有不同的uuid以此进行区分类似cookie),bodystr(n))while self.response is None:self.connection.process_data_events() # 以非阻塞的方式去检查有没有新消息return int(self.response)fibonacci_rpc FibonacciRpcClient()print([x] Requesting fib(7))
response fibonacci_rpc.call(7)
print([.] Got %r % response)
服务器端server
import pikauser_info pika.PlainCredentials(tester,test1234)
connection pika.BlockingConnection(pika.ConnectionParameters(192.168.1.117,5672,/,user_info,))channel connection.channel()channel.queue_declare(queuerpc_queue)def fib(n):if n 0:return 0elif n 1:return 1else:return fib(n-1) fib(n-2)def on_request(ch,method,props,body):n int(body)print([.] fib(%s) % n)response fib(n)ch.basic_publish(exchange,routing_keyprops.reply_to,propertiespika.BasicProperties(correlation_idprops.correlation_id),bodystr(response))ch.basic_ack(delivery_tagmethod.delivery_tag)channel.basic_qos(prefetch_count1)
channel.basic_consume(on_message_callbackon_request, queuerpc_queue)print([x] Awaiting RPC requests)
channel.start_consuming()
要注意的是作为RPC模式client端一开始是消息发送方即发布者server端是消费者当server端收到消息后经过处理要将处理结果再返回给client端此时server端就是发布者client端就是消费者并且server端发布时使用的queue是client端指定的即client端生成的临时queue。
correlation_id主要是为了在异步处理中客户端发送多个请求服务器端返回的响应因处理速度不同可能响应的顺序也不同为了区分不同的请求的响应使用此标志。