RabbitMqUtil.php 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: edz
  5. * Date: 2019-04-30
  6. * Time: 17:22
  7. */
  8. namespace App\Service;
  9. use Illuminate\Support\Facades\Log;
  10. use PhpAmqpLib\Connection\AMQPStreamConnection;
  11. use PhpAmqpLib\Message\AMQPMessage;
  12. class RabbitMqUtil
  13. {
  14. protected $connection;
  15. protected $queue;
  16. protected $channel;
  17. public function __construct()
  18. {
  19. $this->connection = $this->getConnection();
  20. $this->channel = $this->connection->channel();
  21. }
  22. public function getConnection()
  23. {
  24. $conn = false;
  25. if ($this->connection) {
  26. return $this->connection;
  27. }
  28. for ($i = 0; $i < 3; $i++) {
  29. $connection = $this->createConn();
  30. if ($connection) {
  31. $conn = $connection;
  32. break;
  33. }
  34. Log::info("create amqp conn retry=" . $i);
  35. }
  36. return $conn;
  37. }
  38. public function createConn()
  39. {
  40. try {
  41. $connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST'));
  42. } catch (\Exception $exception) {
  43. Log::info("AMQP connection Error" . $exception->getMessage());
  44. $connection = false;
  45. }
  46. return $connection;
  47. }
  48. /**
  49. * 生产消息-分布式任务模式(Work queues)
  50. * @param $queue
  51. * @param $data
  52. */
  53. public function push($queue,$data){
  54. $this->channel->queue_declare($queue, false, true, false, false);
  55. $msg = new AMQPMessage(
  56. \GuzzleHttp\json_encode($data),
  57. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  58. );
  59. return $this->channel->basic_publish($msg, '', $queue);
  60. }
  61. /**
  62. * 生产消息-发布订阅模式(Publish/Subscribe)
  63. *
  64. * @param $queue
  65. * @param $data
  66. */
  67. public function publish($queue,$data){
  68. $this->channel->exchange_declare($queue, 'fanout', false, true, false);
  69. $msg = new AMQPMessage(
  70. \GuzzleHttp\json_encode($data),
  71. array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT)
  72. );
  73. return $this->channel->basic_publish($msg, $queue);
  74. }
  75. public function __destruct()
  76. {
  77. // TODO: Implement __destruct() method.
  78. $this->channel->close();
  79. $this->connection->close();
  80. }
  81. }