文章目录 [+]
继上篇文章,我们开始输出Hello World
对于一个程序员来说,Hello World是一个多么熟悉的字眼。
生产者(消息发送方)
生产者(消息发送方)发送一条消息之后,然后退出。
首先引入autoload.php:
require_once __DIR__ . '/vendor/autoload.php';
现在我们能够创建一个连接服务器的Connection:
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel ();
发送消息前,我们必须声明一个队列为我们发送准备,然后我们可以向队列发送消息:
$channel->queue_declare ( 'hello' , false , false , false , false );
$msg = new AMQPMessage('Hello world');
$channel->basic_publish ($msg,'','hello');
echo "[×] Send 'Hello World!'\n";
然后我们关闭队列,以及关闭连接:
$channel->close();
$connection->close();
消费者(接收方,用来处理任务)
消费者从RabbitMQ接收推送来的消息,我们会保持运行监听并且进行操作(下边的例子只是打印而已)。
和上边一样,都需要引入autoload.php:
require_once __DIR__ . '/vendor/autoload.php';
设置与发布程序相同,我们打开一个连接和通道,并声明将要消耗的队列。注:这里与发送队列匹配。
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
$connection = new AMQPStreamConnection('localhost',5672,'guest','guest');
$channel = $connection->channel ();
$channel->queue_declare('hello', false, false, false, false);
echo ' [*] Waiting for messages. To exit press CTRL+C', "\n";
注意,我们也在这里声明队列。因为我们可能在发布之前启动消费者,我们希望在我们尝试从它那里得到消息之前确定队列是否存在。
我们将告诉服务器从队列中发送消息。我们将定义一个PHP可调用,它将接收服务器端发送的消息。
请记住,消息是服务器异步发送到客户机的。
$callback = function ($msg){ echo " [x] Received ", $msg->body, "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); }
当调用basic_consume,我们的代码会阻塞。当我们收到消息时,我们的回调函数将通过接收到的消息传递。
然后我们关闭队列,以及关闭连接:
$channel->close();
$connection->close();
完整代码
生产者(produce.php)
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; use PhpAmqpLib\Message\AMQPMessage; $connection = new AMQPStreamConnection('localhost',5672,'guest','guest'); $channel = $connection->channel (); //然后声明一个队列 $channel->queue_declare ( 'hello' , false , false , false , false ); $msg = new AMQPMessage('Hello world'); $channel->basic_publish ($msg,'','hello'); echo "[×] Send 'Hello World!'\n"; //最后关闭通道和连接 $channel->close (); $connection->close ();
消费者(consumption.php)
<?php require_once __DIR__ . '/vendor/autoload.php'; use PhpAmqpLib\Connection\AMQPStreamConnection; $connection = new AMQPStreamConnection('localhost',5672,'guest','guest'); $channel = $connection->channel (); $channel->queue_declare ( 'hello' , false , false , false , false ); echo "[×] Waiting for message,to Exit press Ctrl+C \n"; $callback = function ($msg){ echo " [x] Received ", $msg->body, "\n"; }; $channel->basic_consume('hello', '', false, true, false, false, $callback); while(count($channel->callbacks)) { $channel->wait(); } $channel->close (); $connection->close ();
运行测试
生产者
php produce.php
消费者
php consumption.php
了解如何构建一个简单的工作队列,你可以阅读下一章:未完待续
发表评论