PHP使用enqueue/amqp-lib实现rabbitmq任务处理

一:拓展安装

composer require enqueue/amqp-lib

文档地址:https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md

二:方法介绍

1:连接rabbitmq

$factory = new AmqpConnectionFactory([
    'host' => '192.168.6.88',//host
    'port' => '5672',//端口
    'vhost' => '/',//虚拟主机
    'user' => 'admin',//账号
    'pass' => 'admin',//密码
]);
$context = $factory->createContext();

2:声明主题

//声明并创建主题
$exchangeName = 'exchange';
$fooTopic = $context->createTopic($exchangeName);
$fooTopic->setType(AmqpTopic::TYPE_FANOUT);
$context->declareTopic($fooTopic);
 
//删除主题
$context->deleteTopic($fooTopic);

3:声明队列

//声明并创建队列
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$context->declareQueue($fooQueue);
 
//删除队列
$context->deleteQueue($fooQueue);

4:将队列绑定到主题

$context->bind(new AmqpBind($fooTopic, $fooQueue));

5:发送消息

//向队列发送消息
$message = $context->createMessage('Hello world!');
$context->createProducer()->send($fooQueue, $message);
 
//向队列发送优先消息
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue(queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
//设置队列的最大优先级
$fooQueue->setArguments(['x-max-priority' => 10]);
$context->declareQueue($fooQueue);
 
$message = $context->createMessage('Hello world!');
 
$context->createProducer()
    ->setPriority(5) //设置优先级,优先级越高,消息越快到达消费者
    ->send($fooQueue, $message);
 
//向队列发送延时消息
$message = $context->createMessage('Hello world!');
 
$context->createProducer()
    ->setDelayStrategy(new RabbitMqDlxDelayStrategy())
    ->setDeliveryDelay(5000) //消息延时5秒
    ->send($fooQueue, $message);

6:消费消息【接收消息】

//消费消息
$consumer = $context->createConsumer($fooQueue);
 
$message = $consumer->receive();
 
// process a message
//业务代码
 
$consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
// $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
 
 
//订阅消费者
$fooConsumer = $context->createConsumer($fooQueue);
 
$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
    // process message
    //业务代码
    $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
    // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
 
    return true;
});
$subscriptionConsumer->consume();
 
//清除队列消息
$queueName = 'rabbitmq';
$queue = $context->createQueue($queueName);
$context->purgeQueue($queue);

三:简单实现 

1:发送消息

//连接rabbitmq
$factory = new AmqpConnectionFactory([
    'host' => '192.168.6.88',
    'port' => '5672',
    'vhost' => '/',
    'user' => 'admin',
    'pass' => 'admin',
    'persisted' => false,
]);
 
$context = $factory->createContext();
//声明主题
$exchangeName = 'exchange';
$fooTopic = $context->createTopic($exchangeName);
$fooTopic->setType(AmqpTopic::TYPE_FANOUT);
$context->declareTopic($fooTopic);
 
//声明队列
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$context->declareQueue($fooQueue);
 
//将队列绑定到主题
$context->bind(new AmqpBind($fooTopic, $fooQueue));
 
//发送消息到队列
$message = $context->createMessage('Hello world!');
 
$context->createProducer()->send($fooQueue, $message);

2:消费消息

$factory = new AmqpConnectionFactory([
    'host' => '192.168.6.88',
    'port' => '5672',
    'vhost' => '/',
    'user' => 'admin',
    'pass' => 'admin',
    'persisted' => false,
]);
$context = $factory->createContext();
 
 
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
 
 
 
$fooConsumer = $context->createConsumer($fooQueue);
 
$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
    // process message
    //业务代码
    $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
    // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
 
    return true;
});
$subscriptionConsumer->consume();

到此这篇关于PHP使用enqueue/amqp-lib实现rabbitmq任务处理的文章就介绍到这了,更多相关PHP rabbitmq任务处理内容请搜索恩蓝小号以前的文章或继续浏览下面的相关文章希望大家以后多多支持恩蓝小号!

原创文章,作者:EMGJB,如若转载,请注明出处:http://www.wangzhanshi.com/n/574.html

(0)
EMGJB的头像EMGJB
上一篇 2024年12月17日 17:53:01
下一篇 2024年12月17日 17:53:03

相关推荐

  • 没有php5apache2_4.dll的解决方法

    没有“php5apache2_4.dll”是因为PHP下载的版本不对,其解决办法就是去官网下载“php 5.5 thread safe”版本的PHP即可。 apache配置 php…

    2025年1月2日
  • php7中如何创建扩展

    将实现如下功能: <?phpecho say(); ?> 输出内容: $ php ./test.php $ he…

    php 2025年1月1日
  • php7新功能有哪些

    PHP7 是最令人期待的,是 PHP 编程语言的主要功能。PHP7 是在2015年12月3日发布的,被誉为可以开发和交付移动到企业和云端的 Web 应用程序的革命。 PHP7 新功…

    php 2025年1月1日
  • php7中普通变量和静态变量有什么不同

    静态变量是什么 是属于静态存储方式,但是属于静态存储方式的量不一定就是静态变量,例如外部变量虽属于静态存储方式,但不一定是静态变量,必须由static加以定义后才能成为静态外部变量…

    php 2025年1月1日
  • php使用swoole实现TCP服务

    这里以在Yii框架下示例 一:swoole配置TCP ‘swoole’ => [ // 日志文件路径 ‘log_file’ => ‘@console/log/swool…

    php 2024年12月17日
  • HashTable在PHP7中的应用

    先来简单回顾一下PHP5的Hashtable: PHP5的实现中, Hashtable的核心是存储了一个个指向zval指针的指针, 也就是zval**(我遇到不少的同学问为什么是z…

    php 2025年1月1日
  • PHP7连接数据库以及增删查改的方法

    用mysqli方法 实现以下功能(php7): 1、连接MySQL数据库服务器; 2、创建一个名为test的数据库; 3、在该数据库内创建一个名为“testTable”的数据表,数…

    2025年1月1日
  • PHP7中的数据类型有哪些

    PHP中变量名→zval,变量值→zend_value。其变量内存是通过引用计数管理的,在PHP7中引用计数在value结构中。 变量类型: 头文件在PHP源码 /zend/zen…

    2025年1月1日
  • 实例探索PHP只读属性改变游戏规则的特性

    正文 只读属性是一个改变游戏规则的新特性。它允许您声明只能在初始化期间设置且此后无法修改的属性。 只读属性就像是代码中的恒久不变的守护者,确保一切保持原样。它们可以用于各种目的,例…

    php 2024年12月17日
  • PHP7中的异常处理和错误处理

    PHP7中异常与错误处理与之前版本对比 先上代码 ECHO PHP_VERSION.PHP_EOL; function add (int $left,int $right){ re…

    php 2025年1月1日

发表回复

登录后才能评论