个人网站做接口可以么,wordpress 外网访问 登录路由器,城阳建设局网站,全球域名最贵的100个域名1、RabbitMQ简介
rabbitmq是一个开源的消息中间件#xff0c;主要有以下用途#xff0c;分别是#xff1a;
应用解耦#xff1a;通过使用RabbitMQ#xff0c;不同的应用程序之间可以通过消息进行通信#xff0c;从而降低应用程序之间的直接依赖性#xff0c;提高系统的…1、RabbitMQ简介
rabbitmq是一个开源的消息中间件主要有以下用途分别是
应用解耦通过使用RabbitMQ不同的应用程序之间可以通过消息进行通信从而降低应用程序之间的直接依赖性提高系统的可维护性、扩展性和容错性。异步提速通过将耗时的操作转化为异步执行可以提高系统的响应速度和吞吐量提升用户体验。削峰填谷在高峰时段RabbitMQ可以缓存大量的消息从而避免系统崩溃并在低峰时段处理这些消息提高系统的稳定性。消息分发RabbitMQ可以将消息分发到多个消费者进行处理从而提高系统的灵活性和处理能力。
了解rabbitmq的设计架构对理解mq如何使用有很大的帮助。 一个非常重要的点mq中的生产者从来不是直接将消息发送到队列中的而是将消息发送到了mq的交换机中(上图中的exchange为交换机) 甚至生产者都不知道这条消息将被发送到哪个队列中。
交换机是个怎样的设计呢他的一侧连接生产者从生产者接收消息另外一侧连接队列将消息push进队列中将消息push进一个队列还是多个队列还是抛弃这些策略是由交换机的类型决定的对于交换机的使用后面详细介绍。
2、RabbitMQ安装
rabiitmq的安装最简单的一种方式为运行mq的docker镜像一行命令搞定
# latest RabbitMQ 3.13
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.13-management
执行命令后可以看到如下打印则代表RabbitMQ启动成功 镜像启动成功后可以通过ip:15672打开mq控制台
http://xxx.xx.xxx.xx:15672/#/ mq安装完成后下面就可以进行实践啦。
3、默认模式读、写mq rabbitmq官方的库github.com/rabbitmq/amqp091-go 生产者侧代码
package mainimport (contextfmttimeamqp github.com/rabbitmq/amqp091-go
)func Send(msg string) error {// 连接rabbitmqconn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)if err ! nil {fmt.Println(connect error:, err)return err}defer conn.Close()// 创建通道ch, err : conn.Channel()if err ! nil {fmt.Println(channel error:, err)return err}defer ch.Close()// 创建队列,使用默认的交换机q, err : ch.QueueDeclare(lp_default, // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // noWaitnil, // arguments)if err ! nil {fmt.Println(queue declare error:, err)return err}ctx, cancel : context.WithTimeout(context.Background(), 5*time.Second)defer cancel()fmt.Println(q.Name)// body : Hello World!err ch.PublishWithContext(ctx,, // exchange,默认交换机q.Name, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: text/plain,Body: []byte(msg),})if err ! nil {fmt.Println(publish error:, err)return err}return nil
}func main() {Send(Hello world)
}运行上面代码后可以在rabbitmq的客户端 看到这个队列 点击队列进入队列详情 第一个框中显式了队列详情可以看出这个队列绑定的是默认的交换机。
第二个框点击后可以看到队列中的消息详情。
消费者
package mainimport (fmtamqp github.com/rabbitmq/amqp091-go
)func main() {conn, err : amqp.Dial(amqp://guest:guestlocalhost:5672/)if err ! nil {fmt.Println(connect error:, err)return}defer conn.Close()ch, err : conn.Channel()if err ! nil {fmt.Println(Channel error:, err)return}defer ch.Close()q, err : ch.QueueDeclare(lp_default, // nametrue, // durablefalse, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments)if err ! nil {fmt.Println(Queue Declare error:, err)return}msgs, err : ch.Consume(q.Name, // queue, // consumertrue, // auto-ackfalse, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err ! nil {fmt.Println(Consume error:, err)return}var forever chan struct{}go func() {for d : range msgs {fmt.Printf(Received a message: %s\n, d.Body)}}()fmt.Printf( [*] Waiting for messages. To exit press CTRLC)-forever
}代码运行记录
liupeng192 default % go run recive.go[*] Waiting for messages. To exit press CTRLCReceived a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
Received a message: Hello world
在以上消费端代码中如果代码在处理消息的过程中出现异常导致了程序退出这样正在处理的这条消息就会丢失为了避免这种情况的发生rabbitmq设计了消息应答的机制我们修改上面程序将auto-ack参数设置为false当处理完消息后使用d.Ack(false)发送消息应答。 msgs, err : ch.Consume(q.Name, // queue, // consumerfalse, // auto-ack设置为false取消自动应答false, // exclusivefalse, // no-localfalse, // no-waitnil, // args)if err ! nil {fmt.Println(Consume error:, err)return}var forever chan struct{}go func() {for d : range msgs {fmt.Printf(Received a message: %s\n, d.Body)d.Ack(false) // 手动应答}}()fmt.Printf( [*] Waiting for messages. To exit press CTRLC)-forever
如果忘记了进行消息应答消息会被重新发入调度队列这样就会吃掉越来越多的内存。
但是当rabbitmq的服务down掉后队列中的消息仍然会丢失为了保证在这种情况下消息仍然能够不丢失我们需要做两件事队列不丢失消息不丢失代码如下
队列持久化
q, err : ch.QueueDeclare(hello, // nametrue, // durable设置队列持久化false, // delete when unusedfalse, // exclusivefalse, // no-waitnil, // arguments
)
failOnError(err, Failed to declare a queue)
消息持久化
将DeliveryMode设置为amqp.Persistent
err ch.PublishWithContext(ctx,, // exchange,默认交换机q.Name, // routing keyfalse, // mandatoryfalse, // immediateamqp.Publishing{ContentType: text/plain,Body: []byte(msg),DeliveryMode: amqp.Persistent,})
以上就是默认读写rabbitmq的方法后面再介绍其他几种使用方式。