123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293 |
- <?php
- /**
- * Created by PhpStorm.
- * User: edz
- * Date: 2019-05-07
- * Time: 11:43
- */
- namespace App\RpcClient;
- use Illuminate\Support\Facades\Log;
- use PhpAmqpLib\Connection\AMQPStreamConnection;
- use PhpAmqpLib\Message\AMQPMessage;
- class ProductStockRpcClient
- {
- private $connection;
- private $channel;
- private $callback_queue;
- private $response;
- private $corr_id;
- private $queue = 'rpc_product_stock_queue';
- public function __construct()
- {
- $this->connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST'));
- $this->channel = $this->connection->channel();
- list($this->callback_queue, ,) = $this->channel->queue_declare(
- "",
- false,
- true,
- true,
- false
- );
- $this->channel->basic_consume(
- $this->callback_queue,
- '',
- false,
- true,
- false,
- false,
- array(
- $this,
- 'onResponse'
- )
- );
- }
- public function onResponse($rep)
- {
- if ($rep->get('correlation_id') == $this->corr_id) {
- $this->response = $rep->body;
- }
- }
- /**
- * @param $param
- * $array = [
- ['sku_id' => 1, 'quantity' => -2],
- ['sku_id' => 2, 'quantity' => -1]
- ];
- * @return null
- * @throws \ErrorException
- */
- public function call($param)
- {
- Log::debug('开始调用');
- $this->response = null;
- $this->corr_id = uniqid();
- $msg = new AMQPMessage(
- (string)$param,
- array(
- 'correlation_id' => $this->corr_id,
- 'reply_to' => $this->callback_queue
- )
- );
- Log::debug('生成msg对象'.\GuzzleHttp\json_encode($msg));
- $this->channel->basic_publish($msg, '', $this->queue);
- Log::debug('发送消息');
- while (!$this->response) {
- $this->channel->wait();
- }
- return $this->response;
- }
- public function __destruct()
- {
- // TODO: Implement __destruct() method.
- $this->channel->close();
- $this->connection->close();
- }
- }
|