先决条件

已安装PHP,Erlang和RabbitMQ。

安装PHP环境下使用的RabbitMQ第三方库——php-amqplib

使用composer安装php-amqplib库。

生产者代码

<?php
require 'vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;
use PhpAmqpLibMessageAMQPMessage;

$conf = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'admin',
    'password' => 'admin',
    'vhost' => '/',
];
$exchangeName = 'testExch';     //交换机名称
$queueName = 'testQue';         //队列名称
$routingKey = 'testRoute';      //路由关键字(也可以省略)

//建立生产者与mq之间的连接
$conn = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['password'], $conf['vhost']);

$channel = $conn->channel();    //在已连接基础上建立生产者与mq之间的通道
$channel->exchange_declare($exchangeName, 'direct', false, true, false);    //声明初始化交换机
$channel->queue_declare($queueName, false, true, false, false);     //声明初始化一条队列
$channel->queue_bind($queueName, $exchangeName, $routingKey);       //将队列与某个交换机进行绑定,并使用路由关键字

for ($i=1; $i<=20; $i++){
    $msgBody = json_encode(["name" => "WCW", "no" => $i]);
    $msg = new AMQPMessage($msgBody, ['content_type' => 'text/plain', 'delivery_mode' => 2]);   //构建消息
    $ret = $channel->basic_publish($msg, $exchangeName, $routingKey);     //发布消息到某个交换机
}

$channel->close();
$conn->close();

消费者代码

<?php
require 'vendor/autoload.php';

use PhpAmqpLibConnectionAMQPStreamConnection;

$conf = [
    'host' => 'localhost',
    'port' => 5672,
    'user' => 'admin',
    'password' => 'admin',
    'vhost' => '/',
];
$exchangeName = 'testExch';     //交换机名
$queueName = 'testQue';         //队列名称
$routingKey = 'testRoute';      //路由关键字(也可以省略)

//建立生产者与mq之间的连接
$conn = new AMQPStreamConnection($conf['host'], $conf['port'], $conf['user'], $conf['password'], $conf['vhost']);

$channel = $conn->channel();    //在已连接基础上建立生产者与mq之间的通道
$channel->queue_declare($queueName, false, true, false, false);     //声明初始化一条队列

//回调函数,数据处理
$callback = function($msg) {
    echo " Received: ", $msg->body, "n";
};
$channel->basic_consume($queueName, '', false, true, false, false, $callback);      //消费接收消息

//监听消息,一有消息,立马就处理
while(count($channel->callbacks)) {
    $channel->wait();
}

效果测试

执行生产者程序:

执行消费者程序:

消息接收成功!

 

至此。转载请注明出处,记得扫码打赏支持哦,谢谢!

 

内容来源于网络如有侵权请私信删除