connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST')); $this->channel = $this->connection->channel(); list($this->callback_queue, ,) = $this->channel->queue_declare( "", false, true, true, false ); $this->channel->basic_consume( $this->callback_queue, '', false, true, false, false, array( $this, 'onResponse' ) ); } public function onResponse($rep) { if ($rep->get('correlation_id') == $this->corr_id) { $this->response = $rep->body; } } /** * @param $param * $array = [ ['sku_id' => 1, 'quantity' => -2], ['sku_id' => 2, 'quantity' => -1] ]; * @return null * @throws \ErrorException */ public function call($param) { Log::debug('开始调用'); $this->response = null; $this->corr_id = uniqid(); $msg = new AMQPMessage( (string)$param, array( 'correlation_id' => $this->corr_id, 'reply_to' => $this->callback_queue ) ); Log::debug('生成msg对象'.\GuzzleHttp\json_encode($msg)); $this->channel->basic_publish($msg, '', $this->queue); Log::debug('发送消息'); while (!$this->response) { $this->channel->wait(); } return $this->response; } public function __destruct() { // TODO: Implement __destruct() method. $this->channel->close(); $this->connection->close(); } }