connection) {
            return $this->connection;
        }

        // Make up to three attempts; the first successful one wins. A log
        // entry is written after every failed attempt.
        // NOTE(review): within the visible code $conn is only assigned inside
        // this loop — confirm it is initialised earlier in the method,
        // otherwise it is undefined when all three attempts fail.
        for ($i = 0; $i < 3; $i++) {
            $connection = $this->createConn();
            if ($connection) {
                $conn = $connection;
                break;
            }
            Log::info("create amqp conn retry=" . $i);
        }

        return $conn;
    }

    /**
     * Open a new AMQP connection using the MQ_* environment settings.
     *
     * Connection failures are logged and reported through the return value
     * instead of being thrown.
     *
     * NOTE(review): env() is called directly here; if the application runs
     * with a cached configuration these calls may return null — consider
     * reading the values through config() instead. Verify against deployment.
     *
     * @return AMQPStreamConnection|false the open connection, or false when it could not be created
     */
    public function createConn()
    {
        try {
            return new AMQPStreamConnection(
                env('MQ_HOST'),
                env('MQ_PORT'),
                env('MQ_USERNAME'),
                env('MQ_PWD'),
                env('MQ_VHOST')
            );
        } catch (\Exception $exception) {
            Log::info("AMQP connection Error" . $exception->getMessage());

            return false;
        }
    }

    /**
     * Execute the console command.
     *
     * Declares the durable fanout exchange "virus_", binds the durable queue
     * "virus_-community-service" to it and consumes messages one at a time.
     * Each message body is JSON-decoded and handed to
     * productCartRepository->deleteCarts(); the message is acknowledged once
     * that call has returned.
     *
     * Returns 1 without consuming when no broker connection is available.
     * Messages whose body is not valid JSON are rejected without requeue so a
     * single bad payload cannot crash-loop the consumer. The channel and the
     * connection are closed even when consuming ends with an exception.
     *
     * @return mixed
     */
    public function handle()
    {
        // Listen on the virus queue
        $this->queue = 'virus_';
        $this->connection = $this->getConnection();

        // createConn() reports failure with false, so the connection may be
        // unusable here; bail out instead of calling ->channel() on it.
        if (!$this->connection) {
            Log::error('AMQP connection unavailable, virus consumer not started');
            $this->line(date('Y-m-d H:i:s') . ' AMQP connection unavailable, virus consumer not started');

            return 1;
        }

        $this->channel = null;

        try {
            $this->channel = $this->connection->channel();

            // Arguments: passive=false, durable=true, auto_delete=false.
            $this->channel->exchange_declare($this->queue, 'fanout', false, true, false);

            $queue_name = $this->queue.'-community-service';
            // Arguments: passive=false, durable=true, exclusive=false, auto_delete=false.
            $this->channel->queue_declare($queue_name, false, true, false, false);
            $this->channel->queue_bind($queue_name, $this->queue);

            $callback = function ($msg) {
                $channel = $msg->delivery_info['channel'];
                $deliveryTag = $msg->delivery_info['delivery_tag'];

                $this->line(date('Y-m-d H:i:s')."收到数据---------$msg->body-----------");

                try {
                    $data = \GuzzleHttp\json_decode($msg->body, true);
                } catch (\InvalidArgumentException $exception) {
                    // A malformed body can never be processed; redelivering it
                    // would only fail again, so reject it without requeue.
                    Log::error('virus consumer rejected malformed message: ' . $exception->getMessage());
                    $channel->basic_reject($deliveryTag, false);

                    return;
                }

                $result = $this->productCartRepository->deleteCarts($data);

                // Arrays cannot be interpolated into a string; encode them so
                // printing the result cannot fail before the ack is sent.
                $printable = is_array($result) ? json_encode($result, JSON_UNESCAPED_UNICODE) : $result;
                $this->line(date('Y-m-d H:i:s')."返回---------$printable-----------");

                $channel->basic_ack($deliveryTag);
            };

            // Prefetch a single unacknowledged message at a time.
            $this->channel->basic_qos(null, 1, null);
            // no_ack=false: messages must be acknowledged explicitly by the callback.
            $this->channel->basic_consume($queue_name, '', false, false, false, false, $callback);

            while (count($this->channel->callbacks)) {
                $this->channel->wait();
            }
        } finally {
            // Release broker resources on normal exit and on failure alike.
            if ($this->channel) {
                $this->channel->close();
            }
            $this->connection->close();
        }
    }
}