ContentFeedDelete.php 2.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: Administrator
  5. * Date: 2019/7/17
  6. * Time: 9:33
  7. */
  8. namespace App\Console\Commands;
  9. use App\Repositories\FeedRepositories;
  10. use Illuminate\Console\Command;
  11. use Illuminate\Support\Facades\Log;
  12. use PhpAmqpLib\Connection\AMQPStreamConnection;
  /**
   * Long-running console consumer: binds a durable queue to the
   * `cancel_follow` fanout exchange and, for every message received, passes the
   * decoded payload to FeedRepositories::contentFeedDelete().
   */
  class ContentFeedDelete extends Command
  {
      /**
       * The name and signature of the console command.
       *
       * @var string
       */
      protected $signature = 'content:feed_delete';

      /**
       * The console command description.
       *
       * @var string
       */
      protected $description = '取消关注删除对应feed';

      /**
       * Channel opened on the broker connection; null until handle() runs.
       *
       * @var mixed
       */
      protected $channel;

      /**
       * Broker connection; null until handle() runs, false if connecting failed.
       *
       * @var AMQPStreamConnection|false|null
       */
      protected $connection;

      /**
       * Name of the fanout exchange; also the prefix of the consumed queue name.
       *
       * @var string
       */
      protected $queue = 'cancel_follow';

      /**
       * Repository that performs the actual feed deletion.
       *
       * @var FeedRepositories
       */
      protected $feedRepositories;

      /**
       * Create a new command instance.
       *
       * No broker I/O happens here. Connecting is deferred to handle() so that
       * merely constructing the command cannot fail when the broker is down
       * (previously channel() was invoked on `false` after the retries ran out).
       *
       * @param FeedRepositories $feedRepositories
       * @return void
       */
      public function __construct(FeedRepositories $feedRepositories)
      {
          parent::__construct();
          $this->feedRepositories = $feedRepositories;
      }

      /**
       * Return the current connection, or try up to three times to create one.
       *
       * @return AMQPStreamConnection|false  false when every attempt failed
       */
      public function getConnection()
      {
          if ($this->connection) {
              return $this->connection;
          }

          for ($attempt = 0; $attempt < 3; $attempt++) {
              $connection = $this->createConn();
              if ($connection) {
                  return $connection;
              }
              Log::info("create amqp conn retry=" . $attempt);
          }

          return false;
      }

      /**
       * Make a single connection attempt using the MQ_* environment settings.
       *
       * NOTE(review): env() is read directly here; if the application caches its
       * configuration these calls may return null - confirm and consider moving
       * the values behind config().
       *
       * @return AMQPStreamConnection|false  false when the attempt threw
       */
      public function createConn()
      {
          try {
              return new AMQPStreamConnection(
                  env('MQ_HOST'),
                  env('MQ_PORT'),
                  env('MQ_USERNAME'),
                  env('MQ_PWD'),
                  env('MQ_VHOST')
              );
          } catch (\Exception $exception) {
              // A failed attempt is an error condition, not routine information.
              Log::error("AMQP connection Error" . $exception->getMessage());
              return false;
          }
      }

      /**
       * Execute the console command.
       *
       * Declares the exchange and queue, then blocks consuming messages until the
       * channel has no registered callbacks left. The channel and connection are
       * closed on every exit path, including when a handler throws.
       *
       * @return mixed  1 when the broker is unreachable, 0 otherwise
       */
      public function handle()
      {
          Log::info('取消关注删除对应feed');

          $this->connection = $this->getConnection();
          if (!$this->connection) {
              $this->error('Unable to connect to the AMQP broker after 3 attempts.');
              return 1;
          }

          try {
              $this->channel = $this->connection->channel();
              $this->channel->exchange_declare($this->queue, 'fanout', false, true, false);

              $queue_name = $this->queue . '_delete_feed';
              $this->channel->queue_declare($queue_name, false, true, false, false);
              $this->channel->queue_bind($queue_name, $this->queue);

              $callback = function ($msg) {
                  $this->processMessage($msg);
              };
              // Fourth argument (no_ack) is false: each message is acknowledged
              // explicitly once it has been processed.
              $this->channel->basic_consume($queue_name, '', false, false, false, false, $callback);

              while (count($this->channel->callbacks)) {
                  $this->channel->wait();
              }
          } finally {
              $this->shutdown();
          }

          return 0;
      }

      /**
       * Handle one delivered message: decode it, delete the feed entries, ack.
       *
       * A body that is not valid JSON is logged and rejected without requeue so
       * it cannot be redelivered and crash the consumer again. Exceptions from
       * the repository still propagate and leave the message unacknowledged.
       *
       * @param mixed $msg  delivered message exposing `body` and `delivery_info`
       * @return void
       */
      private function processMessage($msg)
      {
          // Lower-case "s" is seconds; upper-case "S" is the day-of-month ordinal suffix.
          $this->line(date('Y-m-d H:i:s') . $msg->body);

          $channel = $msg->delivery_info['channel'];
          $deliveryTag = $msg->delivery_info['delivery_tag'];

          try {
              $data = \GuzzleHttp\json_decode($msg->body, true);
          } catch (\InvalidArgumentException $exception) {
              Log::error('content:feed_delete dropped malformed message: ' . $exception->getMessage());
              $channel->basic_reject($deliveryTag, false);
              return;
          }

          $this->line('收到数据' . $msg->body);
          $this->feedRepositories->contentFeedDelete($data);
          $channel->basic_ack($deliveryTag);
      }

      /**
       * Close the channel, then the connection, ignoring failures on close so a
       * close error never hides the exception that ended the consume loop.
       *
       * @return void
       */
      private function shutdown()
      {
          foreach ([$this->channel, $this->connection] as $resource) {
              if (!$resource) {
                  continue;
              }
              try {
                  $resource->close();
              } catch (\Exception $exception) {
                  Log::info('AMQP close failed: ' . $exception->getMessage());
              }
          }

          $this->channel = null;
          $this->connection = null;
      }
  }