connection = $this->getConnection(); $this->channel = $this->connection->channel(); } public function getConnection() { $conn = false; if ($this->connection) { return $this->connection; } for ($i = 0; $i < 3; $i++) { $connection = $this->createConn(); if ($connection) { $conn = $connection; break; } Log::info("create amqp conn retry=" . $i); } return $conn; } public function createConn() { try { $connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST')); } catch (\Exception $exception) { Log::info("AMQP connection Error" . $exception->getMessage()); $connection = false; } return $connection; } /** * 生产消息-分布式任务模式(Work queues) * @param $queue * @param $data */ public function push($queue,$data){ $this->channel->queue_declare($queue, false, true, false, false); $msg = new AMQPMessage( \GuzzleHttp\json_encode($data), array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); return $this->channel->basic_publish($msg, '', $queue); } /** * 生产消息-发布订阅模式(Publish/Subscribe) * * @param $queue * @param $data */ public function publish($queue,$data){ $this->channel->exchange_declare($queue, 'fanout', false, true, false); $msg = new AMQPMessage( \GuzzleHttp\json_encode($data), array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); return $this->channel->basic_publish($msg, $queue); } public function __destruct() { // TODO: Implement __destruct() method. $this->channel->close(); $this->connection->close(); } }