浏览代码

Merge branch 'v0.2' into develop

wzq 5 年之前
父节点
当前提交
4df323ba13
共有 4 个文件被更改,包括 175 次插入1 次删除
  1. 80 0
      app/Console/Commands/AddMessageRule.php
  2. 7 1
      app/Console/Kernel.php
  3. 87 0
      app/Service/RabbitMqUtil.php
  4. 1 0
      composer.json

+ 80 - 0
app/Console/Commands/AddMessageRule.php

@@ -0,0 +1,80 @@
+<?php
+/**
+ * Created by PhpStorm.
+ * User: Administrator
+ * Date: 2019/6/13
+ * Time: 9:13
+ */
+
+namespace App\Console\Commands;
+
+use App\Models\MessageRule;
+use App\Service\RabbitMqUtil;
+use Illuminate\Console\Command;
+use Illuminate\Database\QueryException;
+use Illuminate\Support\Carbon;
+use Illuminate\Support\Facades\DB;
+use Illuminate\Support\Facades\Log;
+use PhpAmqpLib\Connection\AMQPStreamConnection;
+
+class AddMessageRule extends Command
+{
+    /**
+     * The name and signature of the console command.
+     *
+     * @var string
+     */
+    protected $signature = 'message:add';
+
+    /**
+     * The console command description.
+     *
+     * @var string
+     */
+    protected $description = '添加发送消息';
+
+    /**
+     * Create a new command instance.
+     *
+     * @return void
+     */
+    public function __construct(MessageRule $messageRule, RabbitMqUtil $rabbitMqUtil)
+    {
+        parent::__construct();
+        $this->messageRule = $messageRule;
+        $this->rabbitMqUtil = $rabbitMqUtil;
+    }
+
+    /**
+     * Execute the console command.
+     *
+     * @return mixed
+     */
+    public function handle()
+    {
+        $this->line("开始添加发送消息");
+
+        $this->messageRule
+            ->where('message_status', 0)
+            ->where('send_time', '<', Carbon::now()->toDateTimeString())
+            ->whereNotNUll('send_time')
+            ->chunk(100, function($messages){
+                foreach($messages as $message){
+                    DB::beginTransaction();
+                    try{
+                        $this->rabbitMqUtil->push('add_message_rule', $message);
+                        $message->message_status = 1;
+                        $message->save();
+                        DB::commit();
+                        Log::info('添加发送消息成功:'.$message->id);
+                    }catch (QueryException $exception){
+                        DB::rollBack();
+                        Log::error('添加发送消息:'.$exception->getMessage());
+                    }
+                }
+            });
+
+        $this->line("添加发送消息结束");
+
+    }
+}

+ 7 - 1
app/Console/Kernel.php

@@ -2,6 +2,7 @@
 
 namespace App\Console;
 
+use App\Console\Commands\AddMessageRule;
 use App\Console\Commands\Apollo;
 use Illuminate\Console\Scheduling\Schedule;
 use Laravel\Lumen\Console\Kernel as ConsoleKernel;
@@ -14,6 +15,7 @@ class Kernel extends ConsoleKernel
      * @var array
      */
     protected $commands = [
+        AddMessageRule::class,
         Apollo::class
     ];
 
@@ -25,6 +27,10 @@ class Kernel extends ConsoleKernel
      */
     protected function schedule(Schedule $schedule)
     {
-        //
+        $path = storage_path('logs/'.date('Y-m-d').'-schedule.log');
+
+        $schedule->command('message:add')
+            ->everyMinute()
+            ->withoutOverlapping()->appendOutputTo($path);
     }
 }

+ 87 - 0
app/Service/RabbitMqUtil.php

@@ -0,0 +1,87 @@
+<?php
+/**
+ * Created by PhpStorm.
+ * User: edz
+ * Date: 2019-04-30
+ * Time: 17:22
+ */
+namespace App\Service;
+
+use PhpAmqpLib\Connection\AMQPStreamConnection;
+use PhpAmqpLib\Message\AMQPMessage;
+
+class RabbitMqUtil
+{
+    protected $connection;
+    protected $queue;
+    protected $channel;
+    public function __construct()
+    {
+        $this->connection = $this->getConnection();
+        $this->channel = $this->connection->channel();
+    }
+
+    public function getConnection()
+    {
+        $conn = false;
+        if ($this->connection) {
+            return $this->connection;
+        }
+        for ($i = 0; $i < 3; $i++) {
+            $connection = $this->createConn();
+            if ($connection) {
+                $conn = $connection;
+                break;
+            }
+            Log::info("create amqp conn retry=" . $i);
+        }
+        return $conn;
+    }
+
+    public function createConn()
+    {
+        try {
+            $connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST'));
+        } catch (\Exception $exception) {
+            Log::info("AMQP connection Error" . $exception->getMessage());
+            $connection = false;
+        }
+        return $connection;
+    }
+
+    /**
+     * 生产消息-分布式任务模式(Work queues)
+     * @param $queue
+     * @param $data
+     */
+    public function push($queue,$data){
+        $this->channel->queue_declare($queue, false, true, false, false);
+        $msg = new AMQPMessage(
+            \GuzzleHttp\json_encode($data),
+            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
+        );
+        return $this->channel->basic_publish($msg, '', $queue);
+    }
+
+    /**
+     * 生产消息-发布订阅模式(Publish/Subscribe)
+     *
+     * @param $queue
+     * @param $data
+     */
+    public function publish($queue,$data){
+        $this->channel->exchange_declare($queue, 'fanout', false, true, false);
+        $msg = new AMQPMessage(
+            \GuzzleHttp\json_encode($data),
+            array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
+        );
+        return $this->channel->basic_publish($msg, $queue);
+    }
+
+    public function __destruct()
+    {
+        // TODO: Implement __destruct() method.
+        $this->channel->close();
+        $this->connection->close();
+    }
+}

+ 1 - 0
composer.json

@@ -16,6 +16,7 @@
         "tymon/jwt-auth": "1.0.0-rc.4.1",
         "multilinguals/apollo-client": "^0.1.2",
         "vlucas/phpdotenv": "^3.3",
+        "php-amqplib/php-amqplib": "^2.9",
         "guzzlehttp/guzzle": "^6.3"
     },
     "require-dev": {