一个菜鸟驿站!

RabbitMq初体验(二)

PHP 2018-07-17 浏览(2368) 评论(0)
- N +

文章目录 [+]

上篇文章,我们开始输出Hello World

对于一个程序员来说,Hello World是一个多么熟悉的字眼。

生产者(消息发送方)

5.png

生产者(消息发送方)发送一条消息之后,然后退出。

首先引入autoload.php

require_once __DIR__ . '/vendor/autoload.php';

现在我们能够创建一个连接服务器的Connection

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel ();

发送消息前,我们必须声明一个队列为我们发送准备,然后我们可以向队列发送消息:

$channel->queue_declare ( 'hello' , false , false , false , false );
$msg = new AMQPMessage('Hello world');
$channel->basic_publish ($msg,'','hello');
echo "[×] Send 'Hello World!'\n";

然后我们关闭队列,以及关闭连接:

$channel->close();
$connection->close();

消费者(接收方,用来处理任务)

消费者RabbitMQ接收推送来的消息,我们会保持运行监听并且进行操作(下边的例子只是打印而已)。

6.png

和上边一样,都需要引入autoload.php:

require_once __DIR__ . '/vendor/autoload.php';

设置与发布程序相同,我们打开一个连接和通道,并声明将要消耗的队列。注:这里与发送队列匹配

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel ();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";

注意,我们也在这里声明队列。因为我们可能在发布之前启动消费者,我们希望在我们尝试从它那里得到消息之前确定队列是否存在。

我们将告诉服务器从队列中发送消息。我们将定义一个PHP可调用,它将接收服务器端发送的消息。

请记住,消息是服务器异步发送到客户机的。

$callback = function ($msg){
    echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}

当调用basic_consume,我们的代码会阻塞。当我们收到消息时,我们的回调函数将通过接收到的消息传递。

然后我们关闭队列,以及关闭连接:

$channel->close();
$connection->close();

完整代码

生产者(produce.php)

<?php
require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;

$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel ();
//然后声明一个队列
$channel->queue_declare ( 'hello' , false , false , false , false );
$msg = new AMQPMessage('Hello world');
$channel->basic_publish ($msg,'','hello');
echo "[×] Send 'Hello World!'\n";

//最后关闭通道和连接
$channel->close ();
$connection->close ();

消费者(consumption.php)

<?php

require_once __DIR__ . '/vendor/autoload.php';

use PhpAmqpLib\Connection\AMQPStreamConnection;


$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel ();

$channel->queue_declare ( 'hello' , false , false , false , false );
echo "[×] Waiting for message,to Exit press Ctrl+C \n";


$callback = function ($msg){
    echo " [x] Received ", $msg->body, "\n";
};
$channel->basic_consume('hello', '', false, true, false, false, $callback);

while(count($channel->callbacks)) {
    $channel->wait();
}


$channel->close ();
$connection->close ();


运行测试

生产者

php produce.php

消费者

php consumption.php

模拟队列.jpg


了解如何构建一个简单的工作队列,你可以阅读下一章:未完待续


标签:
作者:猫巷

,

评论列表 (0)条评论

发表评论

召唤伊斯特瓦尔