connection = $this->getConnection(); $this->channel = $this->connection->channel(); } /** * 单例实例化入口; */ public static function getInstance() { if (!self::$instance instanceof self) { self::$instance = new self(); } return self::$instance; } public function getConnection() { $conn = false; if ($this->connection) { return $this->connection; } for ($i = 0; $i < 3; $i++) { $connection = $this->createConn(); if ($connection) { $conn = $connection; break; } Log::info("create amqp conn retry=" . $i); } return $conn; } public function createConn() { try { $connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST')); } catch (\Exception $exception) { Log::info("AMQP connection Error" . $exception->getMessage()); $connection = false; } return $connection; } /** * 生产消息-分布式任务模式(Work queues) * @param $queue * @param $data */ public function push($queue, $data) { try { $this->channel->queue_declare($queue, false, true, false, false); $msg = new AMQPMessage( \GuzzleHttp\json_encode($data), array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); return $this->channel->basic_publish($msg, '', $queue); } catch (\Exception $exception) { Log::error('发布消息失败,数据:'.\GuzzleHttp\json_encode($data).',错误原因:' . $exception->getMessage()); } } /** * 生产消息-发布订阅模式(Publish/Subscribe) * * @param $queue string * @param $data array */ public function publish($queue, $data) { try { $this->channel->exchange_declare($queue, 'fanout', false, true, false); $msg = new AMQPMessage( \GuzzleHttp\json_encode($data), array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) ); return $this->channel->basic_publish($msg, $queue); } catch (\Exception $exception) { Log::error('发布订阅消息失败,数据:'.\GuzzleHttp\json_encode($data).',错误原因:' . $exception->getMessage()); } } public function __destruct() { $this->channel->close(); $this->connection->close(); self::$instance = null; } }