网站备案取名,营销型集团网站建设,登录我的博客,免费分站网站安装think-swoole安装 composer require php-amqplib/php-amqplib,以支持rabbitMq使用安装rabbitMq延迟队列插件 安装 rabbitmq_delayed_message_exchange 插件#xff0c;按照以下步骤操作#xff1a;
下载插件#xff1a;https://github.com/rabbitmq/rabbitmq-delayed-…安装think-swoole安装 composer require php-amqplib/php-amqplib,以支持rabbitMq使用安装rabbitMq延迟队列插件 安装 rabbitmq_delayed_message_exchange 插件按照以下步骤操作
下载插件https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases以下路由不一定是一样的
将插件复制到 RabbitMQ 插件目录 将下载的插件文件复制到 RabbitMQ 插件目录。
sudo cp rabbitmq_delayed_message_exchange-3.8.9.ez /usr/lib/rabbitmq/lib/rabbitmq_server-version/plugins/
将 version 替换为您的 RabbitMQ 服务器版本。
启用插件 使用 RabbitMQ 命令行工具启用插件。
sudo rabbitmq-plugins enable rabbitmq_delayed_message_exchange
重启 RabbitMQ 重启 RabbitMQ 服务器以应用更改。
sudo systemctl restart rabbitmq-server config目录创建 rabbitmq.php 文件,内容如下
return [host 服务器地址,port 端口,user 账户,password 密码,vhost /,exchange delayed_exchange,exchange_type direct, // 交换机类型如 direct、fanout、topicexchange_arguments [x-delayed-type direct], // 延迟交换机参数
];
创建 RabbitMQService 类 class RabbitMQService
{protected $connection;protected $channel;public function __construct(){$config config(rabbitmq);$this-connection new AMQPStreamConnection($config[host],$config[port],$config[user],$config[password],$config[vhost]);$this-channel $this-connection-channel();$this-channel-exchange_declare($config[exchange],x-delayed-message, // 指定延迟交换机类型false,true,false,false,false,new AMQPTable([x-delayed-type $config[exchange_type]]) // 设置延迟交换机的底层类型);}public function publish($message, $queue, $delay 0){// 声明队列$this-channel-queue_declare($queue, false, true, false, false);// 绑定队列到交换机$this-channel-queue_bind($queue, config(rabbitmq.exchange));// 设置延迟头部信息$headers new AMQPTable([x-delay $delay // 延迟时间单位为毫秒]);// 创建消息$msg new AMQPMessage($message, [delivery_mode AMQPMessage::DELIVERY_MODE_PERSISTENT, // 持久化消息]);$msg-set(application_headers, $headers); // 正确设置头信息// 发布消息到交换机$this-channel-basic_publish($msg, config(rabbitmq.exchange));}public function consume($queue, $callback){$this-channel-queue_declare($queue, false, true, false, false);$this-channel-basic_consume($queue, , false, true, false, false, $callback);while ($this-channel-is_consuming()) {$this-channel-wait();}}public function __destruct(){$this-channel-close();$this-connection-close();}
}
创建 RabbitMqUseService 类文件 class RabbitMqUseService
{// 消费队列public static function consumption(){$rabbitMQ new RabbitMQService();$rabbitMQ-consume(queue, function ($msg){Log::error(消费队列.$msg-body);$con json_decode($msg-body,true);$class $con[class];Log::error(class-.$class);if(class_exists($class)){$obj new $class;$obj-handle($con[body]);}});}/*** param $obj* param $data* param $delay* return void*/public static function push($obj,$data,$delay 0){$rabbitMQ new RabbitMQService();$class get_class($obj);// 构造消息体$message json_encode([class $class, // 类名body $data // 具体数据]);$rabbitMQ-publish($message, queue, $delay * 1000);var_dump(已加入);}public function test(){self::push(new TestJob(),[nametest],10);}
}
配置消费任务
新建文件类 RabbitConsumptionHandleclass RabbitConsumptionHandle
{public function handle(){RabbitMqUseService::consumption();}
}在app/event.php listen 中引入listen [swoole.init [RabbitConsumptionHandle::class]]
新增队列 RabbitMqUseService::push(new \app\job\TestJob(),[a1,b2]);