VirusRule.php 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Administrator
  5. * Date: 2019-05-13
  6. * Time: 10:52
  7. */
  8. namespace App\Console\Commands;
  9. use Illuminate\Console\Command;
  10. use Illuminate\Support\Facades\Log;
  11. use PhpAmqpLib\Connection\AMQPStreamConnection;
  /**
   * Console command that consumes messages from the "virus_" fanout exchange
   * and passes each decoded payload to the product-cart repository.
   *
   * The command declares the exchange, declares and binds a durable queue
   * named "virus_-community-service", and then blocks while waiting for
   * deliveries, acknowledging each message after it has been handled.
   */
  class VirusRule extends Command
  {
      /**
       * Maximum number of connection attempts made by getConnection().
       */
      const CONNECT_ATTEMPTS = 3;

      /**
       * The name and signature of the console command.
       *
       * @var string
       */
      protected $signature = 'virus:rule';

      /**
       * The console command description.
       *
       * @var string
       */
      protected $description = '订阅VIRUS事件通知';

      /**
       * Open AMQP connection, or null while none has been established.
       *
       * @var AMQPStreamConnection|null
       */
      protected $connection;

      /**
       * Channel opened on the connection by handle().
       *
       * @var \PhpAmqpLib\Channel\AMQPChannel|null
       */
      protected $channel;

      /**
       * Exchange name; also used as the prefix of the queue name.
       *
       * Declared explicitly because handle() assigns it, and creating
       * properties dynamically is deprecated since PHP 8.2.
       *
       * @var string|null
       */
      protected $queue;

      /**
       * Create a new command instance.
       *
       * @return void
       */
      public function __construct()
      {
          parent::__construct();
      }

      /**
       * Return the cached connection, or try to open a new one.
       *
       * Up to CONNECT_ATTEMPTS attempts are made; each failed attempt is logged.
       *
       * @return AMQPStreamConnection|false The connection, or false when every attempt failed.
       */
      public function getConnection()
      {
          if ($this->connection) {
              return $this->connection;
          }

          for ($attempt = 0; $attempt < self::CONNECT_ATTEMPTS; $attempt++) {
              $connection = $this->createConn();
              if ($connection) {
                  return $connection;
              }
              Log::info("create amqp conn retry=" . $attempt);
          }

          return false;
      }

      /**
       * Open a single AMQP connection using the MQ_* environment settings.
       *
       * NOTE(review): env() returns null outside config files once the Laravel
       * configuration is cached; consider reading these values through config().
       *
       * @return AMQPStreamConnection|false The connection, or false when it could not be opened.
       */
      public function createConn()
      {
          try {
              return new AMQPStreamConnection(
                  env('MQ_HOST'),
                  env('MQ_PORT'),
                  env('MQ_USERNAME'),
                  env('MQ_PWD'),
                  env('MQ_VHOST')
              );
          } catch (\Exception $exception) {
              Log::info("AMQP connection Error" . $exception->getMessage());

              return false;
          }
      }

      /**
       * Execute the console command.
       *
       * @return int 0 when the consumer loop ended normally, 1 when the command could not start.
       */
      public function handle()
      {
          // Nothing in this class assigns productCartRepository; it has to be
          // supplied by a subclass or by the caller. Fail fast rather than
          // crashing on the first delivered message and leaving it unacked.
          $repository = isset($this->productCartRepository) ? $this->productCartRepository : null;
          if ($repository === null) {
              $this->error('productCartRepository is not set; cannot process messages.');

              return 1;
          }

          // Listen on the virus queue.
          $this->queue = 'virus_';

          $connection = $this->getConnection();
          if (!$connection) {
              // getConnection() returns false after exhausting its attempts.
              $this->error('Unable to connect to the AMQP broker.');

              return 1;
          }
          $this->connection = $connection;

          try {
              $this->channel = $this->connection->channel();
              $this->channel->exchange_declare($this->queue, 'fanout', false, true, false);

              $queue_name = $this->queue . '-community-service';
              $this->channel->queue_declare($queue_name, false, true, false, false);
              $this->channel->queue_bind($queue_name, $this->queue);

              $callback = function ($msg) use ($repository) {
                  $channel = $msg->delivery_info['channel'];
                  $tag = $msg->delivery_info['delivery_tag'];

                  $this->line(date('Y-m-d H:i:s') . "收到数据---------{$msg->body}-----------");

                  $data = json_decode($msg->body, true);
                  if (json_last_error() !== JSON_ERROR_NONE) {
                      // A malformed payload can never succeed: reject it without
                      // requeueing so it is not redelivered in an endless loop.
                      Log::error('Discarding malformed AMQP message: ' . json_last_error_msg());
                      $channel->basic_reject($tag, false);

                      return;
                  }

                  $result = $repository->deleteCarts($data);
                  $printable = $this->stringify($result);
                  $this->line(date('Y-m-d H:i:s') . "返回---------{$printable}-----------");

                  $channel->basic_ack($tag);
              };

              // Take one unacknowledged message at a time.
              $this->channel->basic_qos(null, 1, null);
              $this->channel->basic_consume($queue_name, '', false, false, false, false, $callback);

              while (count($this->channel->callbacks)) {
                  $this->channel->wait();
              }
          } finally {
              // Release the channel and connection even when consuming fails.
              $this->closeQuietly();
          }

          return 0;
      }

      /**
       * Render a repository result for console output.
       *
       * Scalars, null and objects with __toString() are cast exactly as string
       * interpolation would; anything else is JSON-encoded so that printing it
       * cannot raise an "Array to string conversion" error.
       *
       * @param mixed $value
       * @return string
       */
      private function stringify($value)
      {
          $castable = $value === null
              || is_scalar($value)
              || (is_object($value) && method_exists($value, '__toString'));

          if ($castable) {
              return (string) $value;
          }

          $encoded = json_encode($value, JSON_UNESCAPED_UNICODE);

          return $encoded === false ? gettype($value) : $encoded;
      }

      /**
       * Close the channel and the connection, logging close failures instead
       * of letting them replace an exception that is already propagating.
       *
       * @return void
       */
      private function closeQuietly()
      {
          foreach ([$this->channel, $this->connection] as $resource) {
              if (!$resource) {
                  continue;
              }
              try {
                  $resource->close();
              } catch (\Exception $exception) {
                  Log::info('AMQP close error: ' . $exception->getMessage());
              }
          }

          // Drop the closed handles so a later getConnection() opens a fresh one.
          $this->channel = null;
          $this->connection = null;
      }
  }