behaviorRecordRepositories = $behaviorRecordRepositories; } public function getConnection() { $conn = false; if ($this->connection) { return $this->connection; } for ($i = 0; $i < 3; $i++) { $connection = $this->createConn(); if ($connection) { $conn = $connection; break; } Log::info("create amqp conn retry=" . $i); } return $conn; } public function createConn() { try { $connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST')); } catch (\Exception $exception) { Log::info("AMQP connection Error" . $exception->getMessage()); $connection = false; } return $connection; } /** * Execute the console command. * * @return mixed */ public function handle() { $this->connection = $this->getConnection(); $this->channel = $this->connection->channel(); $this->channel->queue_declare($this->PRODUCT_STOCK_MYSQL_QUEUE, false, true, false, false); $callback = function ($msg) { $param = \GuzzleHttp\json_decode($msg->body,true); $this->line(date('Y-m-d H:i:s').'开始记录账本'); $row = $this->behaviorRecordRepositories->addRecord($param); if($row){ $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } }; $this->channel->basic_qos(null, 1, null); $this->channel->basic_consume($this->PRODUCT_STOCK_MYSQL_QUEUE, '', false, false, false, false, $callback); while (count($this->channel->callbacks)) { $this->channel->wait(); } $this->channel->close(); $this->connection->close(); } }