ProductStockRpcClient.php 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. <?php
  2. /**
  3. * Created by PhpStorm.
  4. * User: edz
  5. * Date: 2019-05-07
  6. * Time: 11:43
  7. */
  8. namespace App\RpcClient;
  9. use Illuminate\Support\Facades\Log;
  10. use PhpAmqpLib\Connection\AMQPStreamConnection;
  11. use PhpAmqpLib\Message\AMQPMessage;
  12. class ProductStockRpcClient
  13. {
  14. private $connection;
  15. private $channel;
  16. private $callback_queue;
  17. private $response;
  18. private $corr_id;
  19. private $queue = 'rpc_product_stock_queue';
  20. public function __construct()
  21. {
  22. $this->connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST'));
  23. $this->channel = $this->connection->channel();
  24. list($this->callback_queue, ,) = $this->channel->queue_declare(
  25. "",
  26. false,
  27. true,
  28. true,
  29. false
  30. );
  31. $this->channel->basic_consume(
  32. $this->callback_queue,
  33. '',
  34. false,
  35. true,
  36. false,
  37. false,
  38. array(
  39. $this,
  40. 'onResponse'
  41. )
  42. );
  43. }
  44. public function onResponse($rep)
  45. {
  46. if ($rep->get('correlation_id') == $this->corr_id) {
  47. $this->response = $rep->body;
  48. }
  49. }
  50. /**
  51. * @param $param
  52. * $array = [
  53. ['sku_id' => 1, 'quantity' => -2],
  54. ['sku_id' => 2, 'quantity' => -1]
  55. ];
  56. * @return null
  57. * @throws \ErrorException
  58. */
  59. public function call($param)
  60. {
  61. Log::debug('开始调用');
  62. $this->response = null;
  63. $this->corr_id = uniqid();
  64. $msg = new AMQPMessage(
  65. (string)$param,
  66. array(
  67. 'correlation_id' => $this->corr_id,
  68. 'reply_to' => $this->callback_queue
  69. )
  70. );
  71. Log::debug('生成msg对象'.\GuzzleHttp\json_encode($msg));
  72. $this->channel->basic_publish($msg, '', $this->queue);
  73. Log::debug('发送消息');
  74. while (!$this->response) {
  75. $this->channel->wait();
  76. }
  77. return $this->response;
  78. }
  79. public function __destruct()
  80. {
  81. // TODO: Implement __destruct() method.
  82. $this->channel->close();
  83. $this->connection->close();
  84. }
  85. }