RabbitMqUtil.php 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: edz
  5. * Date: 2019-04-30
  6. * Time: 17:22
  7. */
  8. namespace App\Service;
  9. use Illuminate\Support\Facades\Log;
  10. use PhpAmqpLib\Connection\AMQPStreamConnection;
  11. use PhpAmqpLib\Message\AMQPMessage;
  12. class RabbitMqUtil
  13. {
  14. private static $instance = null;
  15. protected $connection = null;
  16. protected $channel = null;
  17. public function __construct()
  18. {
  19. $this->connection = $this->getConnection();
  20. $this->channel = $this->connection->channel();
  21. }
  22. /**
  23. * 单例实例化入口;
  24. */
  25. public static function getInstance()
  26. {
  27. if (!self::$instance instanceof self) {
  28. self::$instance = new self();
  29. }
  30. return self::$instance;
  31. }
  32. public function getConnection()
  33. {
  34. $conn = false;
  35. if ($this->connection) {
  36. return $this->connection;
  37. }
  38. for ($i = 0; $i < 3; $i++) {
  39. $connection = $this->createConn();
  40. if ($connection) {
  41. $conn = $connection;
  42. break;
  43. }
  44. Log::info("create amqp conn retry=" . $i);
  45. }
  46. return $conn;
  47. }
  48. public function createConn()
  49. {
  50. try {
  51. $connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST'));
  52. } catch (\Exception $exception) {
  53. Log::info("AMQP connection Error" . $exception->getMessage());
  54. $connection = false;
  55. }
  56. return $connection;
  57. }
  58. /**
  59. * 生产消息-分布式任务模式(Work queues)
  60. * @param $queue
  61. * @param $data
  62. */
  63. public function push($queue, $data)
  64. {
  65. try {
  66. $this->channel->queue_declare($queue, false, true, false, false);
  67. $msg = new AMQPMessage(
  68. \GuzzleHttp\json_encode($data),
  69. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  70. );
  71. return $this->channel->basic_publish($msg, '', $queue);
  72. } catch (\Exception $exception) {
  73. Log::error('发布消息失败,数据:'.\GuzzleHttp\json_encode($data).',错误原因:' . $exception->getMessage());
  74. }
  75. }
  76. /**
  77. * 生产消息-发布订阅模式(Publish/Subscribe)
  78. *
  79. * @param $queue string
  80. * @param $data array
  81. */
  82. public function publish($queue, $data)
  83. {
  84. try {
  85. $this->channel->exchange_declare($queue, 'fanout', false, true, false);
  86. $msg = new AMQPMessage(
  87. \GuzzleHttp\json_encode($data),
  88. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  89. );
  90. return $this->channel->basic_publish($msg, $queue);
  91. } catch (\Exception $exception) {
  92. Log::error('发布订阅消息失败,数据:'.\GuzzleHttp\json_encode($data).',错误原因:' . $exception->getMessage());
  93. }
  94. }
  95. public function __destruct()
  96. {
  97. $this->channel->close();
  98. $this->connection->close();
  99. self::$instance = null;
  100. }
  101. }