server = new Server(self::HOST, self::PORT); // 设置响应html文件模式以及定义路径 $this->server->set([ /*'worker_num' => 2,*/ 'task_worker_num' => 2, 'dispatch_mode' => 5, /*'enable_coroutine' => true, 'task_enable_coroutine' => true,*/ // Task 启用协程,前提是enable_coroutine必须为true 'enable_static_handler' => true, 'document_root' => dirname(dirname(__FILE__)) . '/public/page' ]); // 设置异步AIO线程数量 swoole_async_set([ 'thread_num' => 10, ]); // 监听启动 $this->server->on('Start', [$this, 'onStart']); // 监听worker进程加载 $this->server->on('WorkerStart', [$this, 'onWorkerStart']); // 监听请求并输出内容 $this->server->on('Request', [$this, 'onRequest']); // 监听客户端握手成功 $this->server->on('Open', [$this, 'onOpen']); // 监听客户端发送消息 $this->server->on('Message', [$this, 'onMessage']); // 监听task任务执行 $this->server->on('Task', [$this, 'onTask']); // 监听task任务完成 $this->server->on('Finish', [$this, 'onFinish']); // 监听关闭 $this->server->on('Close', [$this, 'onClose']); } /** * 监听启动,修改主进程名称,便于执行平滑重启 */ public function onStart() { swoole_set_process_name('http_server'); } /** * 监听worker进程加载 * @param $server Server WebSocket对象 * @param $worker_id int 进程ID */ public function onWorkerStart($server, $worker_id) { define('APP_PATH', __DIR__ . '/../application/'); // 加载基础文件 require __DIR__ . '/../thinkphp/base.php'; } /** * 监听请求并输出内容 * @param $request object 请求对象 * @param $response object 响应对象 */ public function onRequest($request, $response) { // 过滤图标请求 if ($request->server['request_uri'] == '/favicon.ico') { $response->status(404); $response->end(); return; } // 把swoole接收的信息转换为thinkphp可识别的 $_SERVER = []; if (isset($request->server)) { foreach ($request->server as $k => $v) { $_SERVER[strtoupper($k)] = $v; } } if (isset($request->header)) { foreach ($request->header as $k => $v) { $_SERVER[strtoupper($k)] = $v; } } // swoole对于超全局数组:$_SERVER、$_GET、$_POST、define不会释放 $_GET = []; if (isset($request->get)) { foreach ($request->get as $k => $v) { $_GET[$k] = $v; } } $_POST = []; if (isset($request->post)) { foreach ($request->post as $k => $v) { $_POST[$k] = $v; } } // 最好全局保存一下 $this->server 对象以供thinkphp控制器中使用,此案例没用到所以省略了 ob_start(); // 执行应用并响应 try { \think\Container::get('app', [APP_PATH])->run()->send(); } catch (\Exception $e) { echo $e->getMessage(); // todo } $res = ob_get_contents(); ob_end_clean(); $response->header('content-type','text/html; charset=UTF-8'); $response->end($res); } /** * 监听客户端握手成功 * @param $server Server WebSocket对象 * @param $request object 请求对象 */ public function onOpen($server, $request) { $fd = $request->fd; $token = $request->get['token'] ?? ''; //拿到客户端用户ID if (!$token) { $server->push($fd, JsonReturn::error('请先登录!')); $server->close($fd); return; } $user_data = Cache::get($token); if (!$user_data) { $server->push($fd, JsonReturn::error('用户身份无效,请重新登录!')); $server->close($fd); return; } // 把用户id绑定到fd中 $server->bind($fd, $user_data->id); // 使用table内存表用来保存客户端fd跟用户ID的关联 if (!isset($this->fd_table)) { $this->fd_table = new \Swoole\Table(1024); $this->fd_table->column('fd', $this->fd_table::TYPE_INT, 4); $this->fd_table->create(); } $this->fd_table->set('user' . $user_data->id, ['fd' => $fd]); } /** * 监听客户端发送消息 * @param $server Server WebSocket对象 * @param $frame object 客户端数据对象 */ public function onMessage($server, $frame) { $data = json_decode($frame->data, true); foreach ($data as $key => $value) { $_POST[$key] = $value; } try { $response = \think\Container::get('think\App', [APP_PATH])->action('index/user/sendMessage', [], 'controller', false); } catch (\Exception $e) { $exp_data = json_decode($e->getMessage(), true); $server->push($frame->fd, JsonReturn::error($exp_data['message'])); return; } if (!isset($this->fd_table)) { $server->push($frame->fd, JsonReturn::error('发送消息暂不可用,请稍后重试!')); return; } if (!$this->fd_table->exist('user' . $response['take_id'])) { $server->push($frame->fd, JsonReturn::error('对方暂不在线,无法发送消息!后续开放留言功能~')); return; } $friend_fd = $this->fd_table->get('user' . $response['take_id']); // 发送者返回的数据 $send_response_data = array_merge($response, ['type' => 1]); // 接受者返回的数据 $take_response_date = array_merge($response, ['type' => 2]); if ($friend_fd) { $server->push($friend_fd['fd'], JsonReturn::success('推送消息成功!', $take_response_date)); $server->push($frame->fd, JsonReturn::success('发送消息成功!', $send_response_data)); } } /** * 监听task任务执行 * @param $server Server WebSocket对象 * @param $task object task任务数据对象 * @return string 执行结果 */ public function onTask($server, $task) { // 分发task任务机制,让不同的任务,走不同的逻辑 $my_task = new Task(); $method = $task->data['method']; $result = $my_task->$method($task->data['data']); // 把结果告诉worker return $result; } /** * 监听task任务完成 * @param $server Server WebSocket对象 * @param $task_id int task任务ID * @param $data string onTask执行完返回的结果 */ public function onFinish($server, $task_id, $data) {} /** * 监听关闭 * @param $server Server WebSocket对象 * @param $client_id int 客户端连接ID */ public function onClose($server, $client_id) { // 关闭时清除用户fd数据 if (isset($this->fd_table)) $this->fd_table->del('user' . $client_id); } /** * 启动服务器 */ public function start() { $this->server->start(); } } $http = new Http(); $http->start();