ContentFeedCreate.php 2.7 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697
  1. <?php
  2. namespace App\Console\Commands;
  3. use App\Repositories\FeedRepositories;
  4. use Illuminate\Console\Command;
  5. use Illuminate\Support\Facades\Log;
  6. use PhpAmqpLib\Connection\AMQPStreamConnection;
  7. class ContentFeedCreate extends Command
  8. {
  9. /**
  10. * The name and signature of the console command.
  11. *
  12. * @var string
  13. */
  14. protected $signature = 'content:feed';
  15. /**
  16. * The console command description.
  17. *
  18. * @var string
  19. */
  20. protected $description = 'virus通知异步处理-feed生成,帖子数据统计';
  21. protected $connection;
  22. protected $channel;
  23. protected $queue = 'content_feed_queue';
  24. /**
  25. * Create a new command instance.
  26. *
  27. * @return void
  28. */
  29. public function __construct(FeedRepositories $feedRepositories)
  30. {
  31. parent::__construct();
  32. $this->feedRepositories = $feedRepositories;
  33. }
  34. public function getConnection()
  35. {
  36. $conn = false;
  37. if ($this->connection) {
  38. return $this->connection;
  39. }
  40. for ($i = 0; $i < 3; $i++) {
  41. $connection = $this->createConn();
  42. if ($connection) {
  43. $conn = $connection;
  44. break;
  45. }
  46. Log::info("create amqp conn retry=" . $i);
  47. }
  48. return $conn;
  49. }
  50. public function createConn()
  51. {
  52. try {
  53. $connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST'));
  54. } catch (\Exception $exception) {
  55. Log::info("AMQP connection Error" . $exception->getMessage());
  56. $connection = false;
  57. }
  58. return $connection;
  59. }
  60. /**
  61. * Execute the console command.
  62. *
  63. * @return mixed
  64. */
  65. public function handle()
  66. {
  67. $this->connection = $this->getConnection();
  68. $this->channel = $this->connection->channel();
  69. $this->channel->queue_declare($this->queue, false, true, false, false);
  70. $callback = function ($msg) {
  71. $param = \GuzzleHttp\json_decode($msg->body,true);
  72. $this->line(date('Y-m-d H:i:s').'收到消息:'.$msg->body);
  73. $this->feedRepositories->contentCreate($param);
  74. $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
  75. };
  76. $this->channel->basic_qos(null, 1, null);
  77. $this->channel->basic_consume($this->queue, '', false, false, false, false, $callback);
  78. while (count($this->channel->callbacks)) {
  79. $this->channel->wait();
  80. }
  81. $this->channel->close();
  82. $this->connection->close();
  83. }
  84. }