RabbitMqUtil.php 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: edz
  5. * Date: 2019-04-30
  6. * Time: 17:22
  7. */
  8. namespace App\Service;
  9. use Illuminate\Support\Facades\Log;
  10. use PhpAmqpLib\Connection\AMQPStreamConnection;
  11. use PhpAmqpLib\Message\AMQPMessage;
  12. class RabbitMqUtil
  13. {
  14. protected $connection;
  15. protected $queue;
  16. protected $channel;
  17. public function __construct()
  18. {
  19. $this->connection = $this->getConnection();
  20. $this->channel = $this->connection->channel();
  21. }
  22. public function getConnection()
  23. {
  24. $conn = false;
  25. if ($this->connection) {
  26. return $this->connection;
  27. }
  28. for ($i = 0; $i < 3; $i++) {
  29. $connection = $this->createConn();
  30. if ($connection) {
  31. $conn = $connection;
  32. break;
  33. }
  34. Log::info("create amqp conn retry=" . $i);
  35. }
  36. return $conn;
  37. }
  38. public function createConn()
  39. {
  40. try {
  41. $connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST'));
  42. } catch (\Exception $exception) {
  43. Log::info("AMQP connection Error" . $exception->getMessage());
  44. $connection = false;
  45. }
  46. return $connection;
  47. }
  48. /**
  49. * 生产消息-分布式任务模式(Work queues)
  50. * @param $queue
  51. * @param $data
  52. */
  53. public function push($queue, $data)
  54. {
  55. try {
  56. $this->channel->queue_declare($queue, false, true, false, false);
  57. $msg = new AMQPMessage(
  58. \GuzzleHttp\json_encode($data),
  59. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  60. );
  61. return $this->channel->basic_publish($msg, '', $queue);
  62. } catch (\Exception $exception) {
  63. Log::error('发布消息失败,数据:'.\GuzzleHttp\json_encode($data).',错误原因:' . $exception->getMessage());
  64. }
  65. }
  66. /**
  67. * 生产消息-发布订阅模式(Publish/Subscribe)
  68. *
  69. * @param $queue string
  70. * @param $data array
  71. */
  72. public function publish($queue, $data)
  73. {
  74. try {
  75. $this->channel->exchange_declare($queue, 'fanout', false, true, false);
  76. $msg = new AMQPMessage(
  77. \GuzzleHttp\json_encode($data),
  78. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  79. );
  80. return $this->channel->basic_publish($msg, $queue);
  81. } catch (\Exception $exception) {
  82. Log::error('发布订阅消息失败,数据:'.\GuzzleHttp\json_encode($data).',错误原因:' . $exception->getMessage());
  83. }
  84. }
  85. public function __destruct()
  86. {
  87. // TODO: Implement __destruct() method.
  88. $this->channel->close();
  89. $this->connection->close();
  90. }
  91. }