connection = $this->getConnection(); $this->channel = $this->connection->channel(); $this->channel->exchange_declare($this->queue, 'fanout', false, true, false); } public function getConnection() { $conn = false; if ($this->connection) { return $this->connection; } for ($i = 0; $i < 3; $i++) { $connection = $this->createConn(); if ($connection) { $conn = $connection; break; } Log::info("create amqp conn retry=" . $i); } return $conn; } public function createConn() { try { $connection = new AMQPStreamConnection(env('MQ_HOST'), env('MQ_PORT'), env('MQ_USERNAME'), env('MQ_PWD'), env('MQ_VHOST')); } catch (\Exception $exception) { Log::info("AMQP connection Error" . $exception->getMessage()); $connection = false; } return $connection; } /** * Execute the console command. * * @return mixed */ public function handle() { $queue_name = $this->queue; $this->channel->queue_declare($queue_name, false, true, false, false); $this->channel->queue_bind($queue_name, $this->queue); $callback = function ($msg) { $this->line(date('Y-m-d H:i:S').$msg->body); $data = \GuzzleHttp\json_decode($msg->body, true); $this->line('接收一条消息收到数据' . $msg->body); $res = $this->send($data); if ($res) { $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); } }; $this->channel->basic_consume($queue_name, '', false, false, false, false, $callback); while (count($this->channel->callbacks)) { $this->channel->wait(); } $this->channel->close(); $this->connection->close(); } private function send($data) { // 初始化virus域名 $client = new Client([ 'timeout' => 3 ]); try { // 签名设置 $signKey = [ 'app_id' => config('customer.VIRUS_APP_ID'), 'behavior_id' => $data['behavior_id'], 'target_id' => (string) $data['target_id'], 'action_id' => (string) $data['action_id'], 'behavior_flag' => $data['behavior_flag'] ]; ksort($signKey); $signKey = urldecode(http_build_query($signKey)); $sign = md5(config('customer.VIRUS_APP_SECRET') . $signKey); Log::debug('签名:'.$sign); $json = []; if($data['behavior_flag'] == 'publish'){ $json = [ 'sign' => $sign, 'app_id' => config('customer.VIRUS_APP_ID'), 'behavior_id' => $data['behavior_id'], 'behavior_flag' => $data['behavior_flag'], 'post_id' => $data['post_id'], 'post_type' => $data['post_type'], 'post_desc' => $data['post_desc'], 'post_cover' => $data['post_cover'], 'target_id' => (string) $data['target_id'], 'action_id' => (string) $data['action_id'], 'extra' => [], ]; }elseif($data['behavior_flag'] == 'comment'){ $json = [ 'sign' => $sign, 'app_id' => config('customer.VIRUS_APP_ID'), 'behavior_id' => $data['behavior_id'], 'behavior_flag' => $data['behavior_flag'], 'post_id' => $data['post_id'], 'post_type' => $data['post_type'], 'post_author_uid' => $data['post_author_uid'], 'post_desc' => $data['post_desc'], 'post_cover' => $data['post_cover'], 'comment_id' => $data['comment_id'], 'comment_content' => $data['comment_content'], 'parent_comment_id' => $data['parent_comment_id'], 'parent_comment_content' => $data['parent_comment_content'], 'parent_comment_uid' => $data['parent_comment_uid'], 'parent_comment_time' => $data['parent_comment_time'], 'reply_uid' => $data['reply_uid'], 'reply_username' => $data['reply_username'], 'target_id' => (string) $data['target_id'], 'action_id' => (string) $data['action_id'], 'extra' => [], ]; }elseif(in_array($data['behavior_flag'], ['read', 'like', 'collect', 'forward'])){ $json = [ 'sign' => $sign, 'app_id' => config('customer.VIRUS_APP_ID'), 'behavior_id' => $data['behavior_id'], 'behavior_flag' => $data['behavior_flag'], 'behavior_value' => $data['behavior_value'], 'post_id' => $data['post_id'], 'post_type' => $data['post_type'], 'post_author_uid' => $data['post_author_uid'], 'post_desc' => $data['post_desc'], 'post_cover' => $data['post_cover'], 'target_id' => (string) $data['target_id'], 'action_id' => (string) $data['action_id'], 'extra' => [], ]; }else{ Log::debug('行为类型错误'.json_encode($data)); return true; } $response = $client->request('POST', config('customer.VIRUS_URL').'/v2/record/add', [ 'headers' => [ 'Content-Type' => 'application/json' ], 'json' => $json ]); Log::debug('virus添加数据成功:'.$response->getBody()->getContents()); return true; } catch (RequestException $e) { Log::debug('virus添加数据失败:'.$e->getMessage()); } } }