123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111 |
- <?php
- /**
- * Created by PhpStorm.
- * User: edz
- * Date: 2019-04-30
- * Time: 17:22
- */
- namespace App\Service;
- use Illuminate\Support\Facades\Log;
- use PhpAmqpLib\Connection\AMQPStreamConnection;
- use PhpAmqpLib\Message\AMQPMessage;
- class RabbitMqUtil
- {
- private static $instance = null;
- protected $connection = null;
- protected $channel = null;
- public function __construct()
- {
- $this->connection = $this->getConnection();
- $this->channel = $this->connection->channel();
- }
- /**
- * 单例实例化入口;
- */
- public static function getInstance()
- {
- if (!self::$instance instanceof self) {
- self::$instance = new self();
- }
- return self::$instance;
- }
- public function getConnection()
- {
- $conn = false;
- if ($this->connection) {
- return $this->connection;
- }
- for ($i = 0; $i < 3; $i++) {
- $connection = $this->createConn();
- if ($connection) {
- $conn = $connection;
- break;
- }
- Log::info("create amqp conn retry=" . $i);
- }
- return $conn;
- }
- public function createConn()
- {
- try {
- $connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST'));
- } catch (\Exception $exception) {
- Log::info("AMQP connection Error" . $exception->getMessage());
- $connection = false;
- }
- return $connection;
- }
- /**
- * 生产消息-分布式任务模式(Work queues)
- * @param $queue
- * @param $data
- */
- public function push($queue, $data)
- {
- try {
- $this->channel->queue_declare($queue, false, true, false, false);
- $msg = new AMQPMessage(
- \GuzzleHttp\json_encode($data),
- array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
- );
- return $this->channel->basic_publish($msg, '', $queue);
- } catch (\Exception $exception) {
- Log::error('发布消息失败,数据:'.\GuzzleHttp\json_encode($data).',错误原因:' . $exception->getMessage());
- }
- }
- /**
- * 生产消息-发布订阅模式(Publish/Subscribe)
- *
- * @param $queue string
- * @param $data array
- */
- public function publish($queue, $data)
- {
- try {
- $this->channel->exchange_declare($queue, 'fanout', false, true, false);
- $msg = new AMQPMessage(
- \GuzzleHttp\json_encode($data),
- array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
- );
- return $this->channel->basic_publish($msg, $queue);
- } catch (\Exception $exception) {
- Log::error('发布订阅消息失败,数据:'.\GuzzleHttp\json_encode($data).',错误原因:' . $exception->getMessage());
- }
- }
- public function __destruct()
- {
- $this->channel->close();
- $this->connection->close();
- self::$instance = null;
- }
- }
|