RabbitMqUtil.php 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: edz
  5. * Date: 2019-04-30
  6. * Time: 17:22
  7. */
  8. namespace App\Service;
  9. use PhpAmqpLib\Connection\AMQPStreamConnection;
  10. use PhpAmqpLib\Message\AMQPMessage;
  /**
   * Thin helper around php-amqplib for publishing JSON messages to RabbitMQ.
   *
   * A connection and a channel are opened in the constructor and closed in the
   * destructor, so one instance corresponds to one broker connection.
   */
  class RabbitMqUtil
  {
      /**
       * Number of times getConnection() calls createConn() before giving up.
       */
      const MAX_CONNECT_ATTEMPTS = 3;

      /**
       * Open broker connection; null until the constructor has run.
       *
       * @var AMQPStreamConnection|false|null
       */
      protected $connection;

      /**
       * Not read or written anywhere in this class; kept because it is
       * protected and subclasses may rely on it.
       *
       * @var mixed
       */
      protected $queue;

      /**
       * Channel obtained from the connection; null until the constructor has run.
       *
       * @var \PhpAmqpLib\Channel\AMQPChannel|null
       */
      protected $channel;

      /**
       * Connect to the broker and open a channel.
       *
       * @throws \RuntimeException when no connection could be established
       *                           (previously this surfaced as a fatal
       *                           "call to a member function channel() on bool").
       */
      public function __construct()
      {
          $connection = $this->getConnection();
          if (!$connection) {
              throw new \RuntimeException(
                  'Unable to connect to the AMQP broker after ' . self::MAX_CONNECT_ATTEMPTS . ' attempts'
              );
          }

          $this->connection = $connection;
          $this->channel = $connection->channel();
      }

      /**
       * Return the already-open connection, or try to create one.
       *
       * createConn() is attempted up to MAX_CONNECT_ATTEMPTS times; every
       * failed attempt is logged with its zero-based attempt index.
       *
       * @return AMQPStreamConnection|false the connection, or false when every attempt failed
       */
      public function getConnection()
      {
          if ($this->connection) {
              return $this->connection;
          }

          for ($attempt = 0; $attempt < self::MAX_CONNECT_ATTEMPTS; $attempt++) {
              $connection = $this->createConn();
              if ($connection) {
                  return $connection;
              }
              // Fully qualified: a bare `Log` inside this namespace would
              // resolve to App\Service\Log, which this file never imports.
              \Illuminate\Support\Facades\Log::info("create amqp conn retry=" . $attempt);
          }

          return false;
      }

      /**
       * Make a single connection attempt using the MQ_* environment settings.
       *
       * NOTE(review): env() is read directly here; Laravel documents that env()
       * should only be called from config files when the configuration is
       * cached - confirm this project does not run `config:cache`, or move
       * these values into a config file.
       *
       * @return AMQPStreamConnection|false the new connection, or false if an exception was thrown
       */
      public function createConn()
      {
          try {
              return new AMQPStreamConnection(
                  env('MQ_HOST'),
                  env('MQ_PORT'),
                  env('MQ_USERNAME'),
                  env('MQ_PWD'),
                  env('MQ_VHOST')
              );
          } catch (\Exception $exception) {
              \Illuminate\Support\Facades\Log::info("AMQP connection Error" . $exception->getMessage());
          }

          return false;
      }

      /**
       * Produce a message - distributed task mode (work queues).
       *
       * Declares $queue as a durable, non-exclusive, non-auto-delete queue and
       * publishes the JSON-encoded payload to it through the default exchange.
       *
       * @param string $queue queue name, also used as the routing key
       * @param mixed  $data  payload; must be JSON-encodable
       *
       * @return mixed whatever basic_publish() returns (php-amqplib documents no return value)
       *
       * @throws \InvalidArgumentException when $data cannot be JSON-encoded
       */
      public function push($queue, $data)
      {
          $this->channel->queue_declare($queue, false, true, false, false);

          return $this->channel->basic_publish($this->buildMessage($data), '', $queue);
      }

      /**
       * Produce a message - publish/subscribe mode.
       *
       * Declares a durable, non-auto-delete fanout exchange named $queue and
       * publishes the JSON-encoded payload to it.
       *
       * @param string $queue name of the fanout exchange (parameter name kept for compatibility)
       * @param mixed  $data  payload; must be JSON-encodable
       *
       * @return mixed whatever basic_publish() returns (php-amqplib documents no return value)
       *
       * @throws \InvalidArgumentException when $data cannot be JSON-encoded
       */
      public function publish($queue, $data)
      {
          $this->channel->exchange_declare($queue, 'fanout', false, true, false);

          return $this->channel->basic_publish($this->buildMessage($data), $queue);
      }

      /**
       * Close the channel and the connection, if they were opened.
       *
       * Each close is attempted independently and failures are deliberately
       * ignored: a destructor must not throw, and a broken channel should not
       * prevent the connection from being closed.
       */
      public function __destruct()
      {
          foreach ([$this->channel, $this->connection] as $resource) {
              if (!$resource) {
                  continue;
              }
              try {
                  $resource->close();
              } catch (\Exception $exception) {
                  // Best-effort cleanup; nothing useful can be done here.
              }
          }
      }

      /**
       * Wrap a payload in a persistent AMQP message with a JSON body.
       *
       * @param mixed $data payload; must be JSON-encodable
       *
       * @return AMQPMessage
       */
      private function buildMessage($data)
      {
          return new AMQPMessage(
              $this->encodePayload($data),
              ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]
          );
      }

      /**
       * JSON-encode a payload, failing loudly instead of returning false.
       *
       * Uses the native encoder so the class no longer depends on the Guzzle
       * HTTP client's helper; the error message and the (base) exception type
       * match what that helper produced.
       *
       * @param mixed $data value to encode
       *
       * @return string JSON text
       *
       * @throws \InvalidArgumentException when encoding fails
       */
      private function encodePayload($data)
      {
          $json = \json_encode($data);
          if (\json_last_error() !== JSON_ERROR_NONE) {
              throw new \InvalidArgumentException('json_encode error: ' . \json_last_error_msg());
          }

          return $json;
      }
  }