文章目录 [+]
前言
最近在重构公司的项目,从建表到迁移数据,都是一个非常重要的,也是需要非常严谨的,但是数据量太大,而且就是字段什么的不一致,那么我们作为PHP程序员,就用php的方法解决吧,于是用到了RabbitMQ,如果没有安装的话,参考我以前文章就行,这里不多介绍。
Composer安装laravel-queue-rabbitmq
Laravel添加rabbitMq:
composer require vladimir-yuldashev/laravel-queue-rabbitmq
composer update
在config/app.php
文件中,providers
中添加:
VladimirYuldashev\LaravelQueueRabbitMQ\LaravelQueueRabbitMQServiceProvider::class,
在app/config/queue.php
文件中,connections
数组中添加:
'rabbitmq' => [ 'driver' => 'rabbitmq', 'host' => env('RABBITMQ_HOST', '127.0.0.1'), 'port' => env('RABBITMQ_PORT', 5672), 'vhost' => env('RABBITMQ_VHOST', '/'), 'login' => env('RABBITMQ_LOGIN', 'guest'), 'password' => env('RABBITMQ_PASSWORD', 'guest'), 'queue' => env('RABBITMQ_QUEUE'), // name of the default queue, 'exchange_declare' => env('RABBITMQ_EXCHANGE_DECLARE', true), // create the exchange if not exists 'queue_declare_bind' => env('RABBITMQ_QUEUE_DECLARE_BIND', true), // create the queue if not exists and bind to the exchange 'queue_params' => [ 'passive' => env('RABBITMQ_QUEUE_PASSIVE', false), 'durable' => env('RABBITMQ_QUEUE_DURABLE', true), 'exclusive' => env('RABBITMQ_QUEUE_EXCLUSIVE', false), 'auto_delete' => env('RABBITMQ_QUEUE_AUTODELETE', false), ], 'exchange_params' => [ 'name' => env('RABBITMQ_EXCHANGE_NAME', null), 'type' => env('RABBITMQ_EXCHANGE_TYPE', 'direct'), // more info at http://www.rabbitmq.com/tutorials/amqp-concepts.html 'passive' => env('RABBITMQ_EXCHANGE_PASSIVE', false), 'durable' => env('RABBITMQ_EXCHANGE_DURABLE', true), // the exchange will survive server restarts 'auto_delete' => env('RABBITMQ_EXCHANGE_AUTODELETE', false), ], ],
在.env
文件中,PS:如果有设置了QUEUE_DRIVER
的话,就改为rabbitmq:
QUEUE_DRIVER=rabbitmq RABBITMQ_HOST=127.0.0.1 RABBITMQ_PORT=5672 RABBITMQ_VHOST=/ RABBITMQ_LOGIN=guest RABBITMQ_PASSWORD=guest RABBITMQ_QUEUE=queue_name
测试rabbitmq在laravel中的使用
消费者(接收方),用来处理任务
创建一个任务类:
php artisan make:job Queue
执行之后生成一个Queue.php
,因为只要针对公司业务,就不写所谓的测试代码了,本代码是将接收的数据,然后对应表的字段填充进去,进行添加 :
<?php namespace App\Jobs; use App\Models\Article; use Illuminate\Bus\Queueable; use Illuminate\Queue\SerializesModels; use Illuminate\Queue\InteractsWithQueue; use Illuminate\Contracts\Queue\ShouldQueue; use Illuminate\Foundation\Bus\Dispatchable; class Queue implements ShouldQueue { use Dispatchable, InteractsWithQueue, Queueable, SerializesModels; protected $data; /** * Create a new job instance. * * @return void */ public function __construct($data) { $this->data = $data; } /** * Execute the job. * * @return void */ public function handle() { try{ $article = new Article(); $article->id = $this->data[ 'NewsId' ] ? : ''; $article->title = $this->data[ 'NewsTitle' ] ? : ''; $article->author = $this->data[ 'NewsAuthor' ] ? : ''; $article->content = $this->data[ 'NewsMatter' ] ? : ''; $article->intro = mb_substr ( $article->content , 0 , 500 ); $article->read = $this->data[ 'NewsNum' ] ? : 0; $article->tag = $this->data[ 'NewsTag' ] ? : ''; $article->category_id = $this->data['NewsClass'] ? : 0; $article->project_id = $this->data[ 'NewsProject' ] ? : 0; if($this->data[ 'NewsState' ] == 0){//原来的0是隐藏,现在的0是显示,所以换一下 $article->status = "1"; }else if($this->data[ 'NewsState' ] == 1){ $article->status = "0"; } $article->sort = $this->data[ 'NewsSort' ] ? : 0; $article->top = $this->data[ 'NewsTop' ] ? : 0; $article->video = "{}"; $result = $article->save (); echo json_encode ([ 'code' => 200, 'msg' => $result ]); }catch (\Exception $exception){ echo json_encode ([ 'code' => 0, 'msg' => '错误' ]); \Illuminate\Support\Facades\Log::error ( $this->data['NewsId'] . '------->' . $exception->getMessage () ); } } }
生成者(消息发送方)
创建一个Controller(如以下命令),这里为了迎合公司项目开发规范,就不使用命令生成了:
php artisan make:controller FactoryController
代码如下,本代码是将一个表中的数据读取出来,然后发送消息:
<?php namespace App\Api\Controllers; use App\Api\DingoController; class FactoryController extends DingoController { /** * 文章填充数据队列生产者,消息发送方 */ public function article () { $newsId = CostaNews::pluck ( 'NewsId' ); try { foreach ( $newsId as $k => $v ) { $data = CostaNews::find ( $v ); if ( $data ) { $data = $data->toArray (); $this->dispatch ( new Queue( $data ) );//分配队列 } } echo 'ok'; } catch ( \Exception $exception ) { throw new \Exception ( $exception->getMessage () ); } } }
添加路由
在routes/web.php
中修改路由:
Route::get ( 'factory/article' , 'FactoryController@article' );
执行之后便会产生队列:
执行队列:
php artisan queue:work rabbitmq
就这样,用队列去迁移数据完成。
stream_socket_client(): unable to connect to tcp://127.0.0.1:5672 (Connecti on refused)
amqp拓展和sockets拓展都安装了的....
发表评论