PHP使用enqueue/amqp-lib实现rabbitmq任务处理

 更新时间:2024年03月11日 11:58:28   作者:huaweichenai  
这篇文章主要为大家详细介绍了PHP如何使用enqueue/amqp-lib实现rabbitmq任务处理,文中的示例代码讲解详细,感兴趣的小伙伴可以学习一下

一:拓展安装

composer require enqueue/amqp-lib

文档地址:https://github.com/php-enqueue/enqueue-dev/blob/master/docs/transport/amqp_lib.md

二:方法介绍

1:连接rabbitmq

$factory = new AmqpConnectionFactory([
    'host' => '192.168.6.88',//host
    'port' => '5672',//端口
    'vhost' => '/',//虚拟主机
    'user' => 'admin',//账号
    'pass' => 'admin',//密码
]);
$context = $factory->createContext();

2:声明主题

//声明并创建主题
$exchangeName = 'exchange';
$fooTopic = $context->createTopic($exchangeName);
$fooTopic->setType(AmqpTopic::TYPE_FANOUT);
$context->declareTopic($fooTopic);
 
//删除主题
$context->deleteTopic($fooTopic);

3:声明队列

//声明并创建队列
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$context->declareQueue($fooQueue);
 
//删除队列
$context->deleteQueue($fooQueue);

4:将队列绑定到主题

$context->bind(new AmqpBind($fooTopic, $fooQueue));

5:发送消息

//向队列发送消息
$message = $context->createMessage('Hello world!');
$context->createProducer()->send($fooQueue, $message);
 
//向队列发送优先消息
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue(queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
//设置队列的最大优先级
$fooQueue->setArguments(['x-max-priority' => 10]);
$context->declareQueue($fooQueue);
 
$message = $context->createMessage('Hello world!');
 
$context->createProducer()
    ->setPriority(5) //设置优先级,优先级越高,消息越快到达消费者
    ->send($fooQueue, $message);
 
//向队列发送延时消息
$message = $context->createMessage('Hello world!');
 
$context->createProducer()
    ->setDelayStrategy(new RabbitMqDlxDelayStrategy())
    ->setDeliveryDelay(5000) //消息延时5秒
    ->send($fooQueue, $message);

6:消费消息【接收消息】

//消费消息
$consumer = $context->createConsumer($fooQueue);
 
$message = $consumer->receive();
 
// process a message
//业务代码
 
$consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
// $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
 
 
//订阅消费者
$fooConsumer = $context->createConsumer($fooQueue);
 
$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
    // process message
    //业务代码
    $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
    // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
 
    return true;
});
$subscriptionConsumer->consume();
 
//清除队列消息
$queueName = 'rabbitmq';
$queue = $context->createQueue($queueName);
$context->purgeQueue($queue);

三:简单实现 

1:发送消息

//连接rabbitmq
$factory = new AmqpConnectionFactory([
    'host' => '192.168.6.88',
    'port' => '5672',
    'vhost' => '/',
    'user' => 'admin',
    'pass' => 'admin',
    'persisted' => false,
]);
 
$context = $factory->createContext();
//声明主题
$exchangeName = 'exchange';
$fooTopic = $context->createTopic($exchangeName);
$fooTopic->setType(AmqpTopic::TYPE_FANOUT);
$context->declareTopic($fooTopic);
 
//声明队列
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
$fooQueue->addFlag(AmqpQueue::FLAG_DURABLE);
$context->declareQueue($fooQueue);
 
//将队列绑定到主题
$context->bind(new AmqpBind($fooTopic, $fooQueue));
 
//发送消息到队列
$message = $context->createMessage('Hello world!');
 
$context->createProducer()->send($fooQueue, $message);

2:消费消息

$factory = new AmqpConnectionFactory([
    'host' => '192.168.6.88',
    'port' => '5672',
    'vhost' => '/',
    'user' => 'admin',
    'pass' => 'admin',
    'persisted' => false,
]);
$context = $factory->createContext();
 
 
$queueName = 'rabbitmq';
$fooQueue = $context->createQueue($queueName);
 
 
 
$fooConsumer = $context->createConsumer($fooQueue);
 
$subscriptionConsumer = $context->createSubscriptionConsumer();
$subscriptionConsumer->subscribe($fooConsumer, function(Message $message, Consumer $consumer) {
    // process message
    //业务代码
    $consumer->acknowledge($message);//ack应答,通知rabbitmq成功,删除对应任务
    // $consumer->reject($message);ack应答,通知rabbitmq失败,不删除对应任务
 
    return true;
});
$subscriptionConsumer->consume();

到此这篇关于PHP使用enqueue/amqp-lib实现rabbitmq任务处理的文章就介绍到这了,更多相关PHP rabbitmq任务处理内容请搜索脚本之家以前的文章或继续浏览下面的相关文章希望大家以后多多支持脚本之家!

相关文章

  • PHP中使用foreach和引用导致程序BUG的问题介绍

    PHP中使用foreach和引用导致程序BUG的问题介绍

    PHP 引用有些类似 C 语言指针, 但一些重要的特性和 C 语言指针不一样, 如果不注意, 会导致程序 BUG. foreach 操作的是数组或对象的拷贝, 但 PHP5, 可以使用引用操作对象元素本身
    2012-09-09
  • PHP curl 获取响应的状态码的方法

    PHP curl 获取响应的状态码的方法

    PHP curl可以从服务器端模拟一个http请求,例如抓取网页、模拟登陆等,想要获取状态码,需要在执行curl_exec后再通过curl_getinfo来获取
    2014-01-01
  • 简单谈谈php浮点数精确运算

    简单谈谈php浮点数精确运算

    如果用php的+-*/计算浮点数的时候,可能会遇到一些计算结果错误的问题,所以基本上大部分语言都提供了精准计算的类库或函数库,比如php有BC高精确度函数库,下面我们介绍一下一些常用的BC高精确度函数使用。
    2016-03-03
  • php中的钩子理解及应用实例分析

    php中的钩子理解及应用实例分析

    这篇文章主要介绍了php中的钩子理解及应用,结合实例形式详细分析了php钩子的概念、功能、实现与使用方法,需要的朋友可以参考下
    2019-08-08
  • 分享3个php获取日历的函数

    分享3个php获取日历的函数

    本文给大家汇总了3个php获取日历的函数,分别是php获取指定日期的月份的日历、获取指定日期所在月的开始日期与结束日期、获取当前星期的日期范围,都是比较常用的方法,有需要的小伙伴可以参考下。
    2015-09-09
  • header与缓冲区之间的深层次分析

    header与缓冲区之间的深层次分析

    实际的开发中,大家是否听说过在header之前不能有任何的实际输出。甚至有的认为header函数必须写在代码的最前面。可是你是否试验过header函数之前输出东西?下来让我们更深层次的了解一下
    2016-07-07
  • PHP+ACCESS 文章管理程序代码

    PHP+ACCESS 文章管理程序代码

    花了一天的时间,写了一个PHP操作ACCESS的演示整站示例程序(不包括分页和二级分类)
    2010-06-06
  • 在Mac上编译安装PHP7的开发环境

    在Mac上编译安装PHP7的开发环境

    这篇文章主要介绍了在Mac上编译安装PHP7的开发环境的相关资料,需要的朋友可以参考下
    2015-07-07
  • CentOS6.5 编译安装lnmp环境

    CentOS6.5 编译安装lnmp环境

    这篇文章主要介绍了CentOS6.5 编译安装lnmp环境的相关资料及方法,需要的朋友可以参考下
    2014-12-12
  • php的ajax框架xajax入门与试用介绍

    php的ajax框架xajax入门与试用介绍

    xajax功能很简单,但很灵活!~它不象其它一些大的框架,功能确实强大,但执行速度不敢恭维。。功能虽多,但不够灵活。api多,学起来简直如同学习一门新的语言。
    2010-12-12

最新评论