wzq 5 lat temu
rodzic
commit
017d6b6732

+ 108 - 0
app/Console/Commands/UpdateMessageRule.php

@@ -0,0 +1,108 @@
+<?php
+/**
+ * Created by PhpStorm.
+ * User: Administrator
+ * Date: 2019/6/13
+ * Time: 18:07
+ */
+
+namespace App\Console\Commands;
+
+use App\Repositories\MessageRuleRepository;
+use Illuminate\Console\Command;
+use Illuminate\Support\Facades\Log;
+use PhpAmqpLib\Connection\AMQPStreamConnection;
+
+class UpdateMessageRule extends Command
+{
+    /**
+     * The name and signature of the console command.
+     *
+     * @var string
+     */
+    protected $signature = 'message:update_status';
+
+    /**
+     * The console command description.
+     *
+     * @var string
+     */
+    protected $description = '更新规则消息状态';
+
+    protected $channel;
+    protected $connection;
+
+    protected $queue = 'update_status_message_rule';
+    /**
+     * Create a new command instance.
+     *
+     * @return void
+     */
+    public function __construct(MessageRuleRepository $messageRuleRepository)
+    {
+        parent::__construct();
+        $this->messageRuleRepository = $messageRuleRepository;
+        $this->connection = $this->getConnection();
+        $this->channel = $this->connection->channel();
+        $this->channel->exchange_declare($this->queue, 'fanout', false, true, false);
+    }
+
+    public function getConnection()
+    {
+        $conn = false;
+        if ($this->connection) {
+            return $this->connection;
+        }
+        for ($i = 0; $i < 3; $i++) {
+            $connection = $this->createConn();
+            if ($connection) {
+                $conn = $connection;
+                break;
+            }
+            Log::info("create amqp conn retry=" . $i);
+        }
+        return $conn;
+    }
+
+    public function createConn()
+    {
+        try {
+            $connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST'));
+        } catch (\Exception $exception) {
+            Log::info("AMQP connection Error" . $exception->getMessage());
+            $connection = false;
+        }
+        return $connection;
+    }
+
+    /**
+     * Execute the console command.
+     *
+     * @return mixed
+     */
+    public function handle()
+    {
+        Log::info('发送消息结果');
+        $queue_name = $this->queue;
+        $this->channel->queue_declare($queue_name, false, true, false, false);
+        $this->channel->queue_bind($queue_name, $this->queue);
+        $callback = function ($msg) {
+            Log::info($msg->body);
+            $data = \GuzzleHttp\json_decode($msg->body, true);
+            $this->line('收到数据' . $msg->body);
+            $res = $this->messageRuleRepository->updateStatus($data);
+            if($res){
+                $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
+            }
+        };
+        $this->channel->basic_consume($queue_name, '', false, false, false, false, $callback);
+
+        while (count($this->channel->callbacks)) {
+            $this->channel->wait();
+        }
+        $this->channel->close();
+        $this->connection->close();
+
+
+    }
+}

+ 2 - 0
app/Console/Kernel.php

@@ -4,6 +4,7 @@ namespace App\Console;
 
 use App\Console\Commands\AddMessageRule;
 use App\Console\Commands\Apollo;
+use App\Console\Commands\UpdateMessageRule;
 use Illuminate\Console\Scheduling\Schedule;
 use Laravel\Lumen\Console\Kernel as ConsoleKernel;
 
@@ -16,6 +17,7 @@ class Kernel extends ConsoleKernel
      */
     protected $commands = [
         AddMessageRule::class,
+        UpdateMessageRule::class,
         Apollo::class
     ];
 

+ 29 - 0
app/Repositories/MessageRuleRepository.php

@@ -243,4 +243,33 @@ class MessageRuleRepository
             ]);
         }
     }
+
+    /**
+     * 更新消息规则
+     */
+    public function updateStatus($data)
+    {
+        Log::debug('更新消息规则收到数据:'.json_encode($data));
+        $message = $this->messageRule->find($data['id']);
+        if(!$message || $message->message_status != 1){
+            Log::error('更新消息规则状态失败:'.$data['id']);
+            return false;
+        }
+
+        $message->message_status = 2;
+        $message->sent_count = $data['num'];
+
+        DB::beginTransaction();
+        try{
+            $message->save();
+
+            DB::commit();
+            return true;
+
+        }catch (QueryException $exception){
+            DB::rollBack();
+            Log::error('更新消息规则状态:'.$data['id'].$exception->getMessage());
+            return false;
+        }
+    }
 }