noticeRuleRepository = $noticeRuleRepository;
        $this->connection = $this->getConnection();
        $this->channel = $this->connection->channel();
        $this->channel->exchange_declare($this->queue, 'fanout', false, true, false);
    }

    /**
     * Return the AMQP connection, creating it on first use.
     *
     * An already-set $this->connection is reused as-is. Otherwise createConn()
     * is called up to three times, with a log entry after every failed attempt.
     *
     * @return AMQPStreamConnection|false the connection, or false when all attempts failed
     */
    public function getConnection()
    {
        if ($this->connection) {
            return $this->connection;
        }

        $conn = false;
        for ($i = 0; $i < 3; $i++) {
            $connection = $this->createConn();
            if ($connection) {
                $conn = $connection;
                break;
            }
            Log::info("create amqp conn retry=" . $i);
        }

        return $conn;
    }

    /**
     * Open a single AMQP stream connection from the MQ_* environment settings.
     *
     * Connection failures are logged rather than thrown, so callers must check
     * the return value.
     *
     * @return AMQPStreamConnection|false the new connection, or false on failure
     */
    public function createConn()
    {
        try {
            $connection = new AMQPStreamConnection(
                env('MQ_HOST'),
                env('MQ_PORT'),
                env('MQ_USERNAME'),
                env('MQ_PWD'),
                env('MQ_VHOST')
            );
        } catch (\Exception $exception) {
            Log::info("AMQP connection Error" . $exception->getMessage());
            $connection = false;
        }

        return $connection;
    }

    /**
     * Execute the console command.
     *
     * Declares the queue, binds it to the exchange of the same name and consumes
     * messages until the channel has no consumer callbacks left. Each message body
     * is decoded as JSON and handed to the notice rule repository:
     *  - a truthy result acknowledges the message;
     *  - a falsy result leaves the message unacknowledged (and logs a warning);
     *  - a body that is not valid JSON is rejected without requeue, so a single
     *    malformed message can no longer crash the consumer over and over.
     *
     * @return mixed
     */
    public function handle()
    {
        Log::info('发送消息结果');
        $queue_name = $this->queue;
        // queue_declare(queue, passive, durable, exclusive, auto_delete)
        $this->channel->queue_declare($queue_name, false, true, false, false);
        $this->channel->queue_bind($queue_name, $this->queue);

        $callback = function ($msg) {
            Log::info($msg->body);
            $channel = $msg->delivery_info['channel'];
            $deliveryTag = $msg->delivery_info['delivery_tag'];

            try {
                $data = \GuzzleHttp\json_decode($msg->body, true);
            } catch (\InvalidArgumentException $exception) {
                // Retrying a malformed body can never succeed, so drop it instead of
                // letting the exception escape wait() and kill the consumer.
                Log::error('AMQP message is not valid JSON, rejecting: ' . $exception->getMessage());
                $channel->basic_reject($deliveryTag, false);

                return;
            }

            $this->line('收到数据' . $msg->body);
            $res = $this->noticeRuleRepository->updateStatus($data);
            if ($res) {
                $channel->basic_ack($deliveryTag);
            } else {
                // Behaviour kept from the original: no ack on failure. The log entry
                // makes the otherwise silent unacknowledged delivery visible.
                Log::warning('updateStatus returned a falsy result; AMQP message left unacknowledged');
            }
        };

        // basic_consume(queue, consumer_tag, no_local, no_ack, exclusive, nowait, callback);
        // no_ack=false means every delivery must be acknowledged explicitly.
        $this->channel->basic_consume($queue_name, '', false, false, false, false, $callback);

        while (count($this->channel->callbacks)) {
            $this->channel->wait();
        }

        $this->channel->close();
        $this->connection->close();
    }
}