You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

264 lines
7.9 KiB
PHP

This file contains ambiguous Unicode characters!

This file contains ambiguous Unicode characters that may be confused with others in your current locale. If your use case is intentional and legitimate, you can safely ignore this warning. Use the Escape button to highlight these characters.

<?php
/**
* WebSocket服务端类
* User: fantasticbin
* Date: 2020/1/15
*/
namespace server;
use Swoole\WebSocket\Server;
use app\common\controller\JsonReturn;
use app\common\controller\Task;
use think\facade\Cache;
class Http
{
const HOST = '0.0.0.0';
const PORT = 8080;
// WebSocket对象
public $server;
public $fd_table;
public function __construct()
{
// 创建WebSocket服务器监听本机的所有地址并监听 8080 端口
$this->server = new Server(self::HOST, self::PORT);
// 设置响应html文件模式以及定义路径
$this->server->set([
/*'worker_num' => 2,*/
'task_worker_num' => 2,
'dispatch_mode' => 5,
/*'enable_coroutine' => true,
'task_enable_coroutine' => true,*/ // Task 启用协程前提是enable_coroutine必须为true
'enable_static_handler' => true,
'document_root' => dirname(dirname(__FILE__)) . '/public/page'
]);
// 设置异步AIO线程数量
swoole_async_set([
'thread_num' => 10,
]);
// 监听启动
$this->server->on('Start', [$this, 'onStart']);
// 监听worker进程加载
$this->server->on('WorkerStart', [$this, 'onWorkerStart']);
// 监听请求并输出内容
$this->server->on('Request', [$this, 'onRequest']);
// 监听客户端握手成功
$this->server->on('Open', [$this, 'onOpen']);
// 监听客户端发送消息
$this->server->on('Message', [$this, 'onMessage']);
// 监听task任务执行
$this->server->on('Task', [$this, 'onTask']);
// 监听task任务完成
$this->server->on('Finish', [$this, 'onFinish']);
// 监听关闭
$this->server->on('Close', [$this, 'onClose']);
}
/**
* 监听启动,修改主进程名称,便于执行平滑重启
*/
public function onStart()
{
swoole_set_process_name('http_server');
}
/**
* 监听worker进程加载
* @param $server Server WebSocket对象
* @param $worker_id int 进程ID
*/
public function onWorkerStart($server, $worker_id)
{
define('APP_PATH', __DIR__ . '/../application/');
// 加载基础文件
require __DIR__ . '/../thinkphp/base.php';
}
/**
* 监听请求并输出内容
* @param $request object 请求对象
* @param $response object 响应对象
*/
public function onRequest($request, $response)
{
// 过滤图标请求
if ($request->server['request_uri'] == '/favicon.ico') {
$response->status(404);
$response->end();
return;
}
// 把swoole接收的信息转换为thinkphp可识别的
$_SERVER = [];
if (isset($request->server)) {
foreach ($request->server as $k => $v) {
$_SERVER[strtoupper($k)] = $v;
}
}
if (isset($request->header)) {
foreach ($request->header as $k => $v) {
$_SERVER[strtoupper($k)] = $v;
}
}
// swoole对于超全局数组$_SERVER、$_GET、$_POST、define不会释放
$_GET = [];
if (isset($request->get)) {
foreach ($request->get as $k => $v) {
$_GET[$k] = $v;
}
}
$_POST = [];
if (isset($request->post)) {
foreach ($request->post as $k => $v) {
$_POST[$k] = $v;
}
}
// 最好全局保存一下 $this->server 对象以供thinkphp控制器中使用此案例没用到所以省略了
ob_start();
// 执行应用并响应
try {
\think\Container::get('app', [APP_PATH])->run()->send();
} catch (\Exception $e) {
echo $e->getMessage();
// todo
}
$res = ob_get_contents();
ob_end_clean();
$response->header('content-type','text/html; charset=UTF-8');
$response->end($res);
}
/**
* 监听客户端握手成功
* @param $server Server WebSocket对象
* @param $request object 请求对象
*/
public function onOpen($server, $request)
{
$fd = $request->fd;
$token = $request->get['token'] ?? ''; //拿到客户端用户ID
if (!$token) {
$server->push($fd, JsonReturn::error('请先登录!'));
$server->close($fd);
return;
}
$user_data = Cache::get($token);
if (!$user_data) {
$server->push($fd, JsonReturn::error('用户身份无效,请重新登录!'));
$server->close($fd);
return;
}
// 把用户id绑定到fd中
$server->bind($fd, $user_data->id);
// 使用table内存表用来保存客户端fd跟用户ID的关联
if (!isset($this->fd_table)) {
$this->fd_table = new \Swoole\Table(1024);
$this->fd_table->column('fd', $this->fd_table::TYPE_INT, 4);
$this->fd_table->create();
}
$this->fd_table->set('user' . $user_data->id, ['fd' => $fd]);
}
/**
* 监听客户端发送消息
* @param $server Server WebSocket对象
* @param $frame object 客户端数据对象
*/
public function onMessage($server, $frame)
{
$data = json_decode($frame->data, true);
foreach ($data as $key => $value) {
$_POST[$key] = $value;
}
try {
$response = \think\Container::get('think\App', [APP_PATH])->action('index/user/sendMessage', [], 'controller', false);
} catch (\Exception $e) {
$exp_data = json_decode($e->getMessage(), true);
$server->push($frame->fd, JsonReturn::error($exp_data['message']));
return;
}
if (!isset($this->fd_table)) {
$server->push($frame->fd, JsonReturn::error('发送消息暂不可用,请稍后重试!'));
return;
}
if (!$this->fd_table->exist('user' . $response['take_id'])) {
$server->push($frame->fd, JsonReturn::error('对方暂不在线,无法发送消息!后续开放留言功能~'));
return;
}
$friend_fd = $this->fd_table->get('user' . $response['take_id']);
// 发送者返回的数据
$send_response_data = array_merge($response, ['type' => 1]);
// 接受者返回的数据
$take_response_date = array_merge($response, ['type' => 2]);
if ($friend_fd) {
$server->push($friend_fd['fd'], JsonReturn::success('推送消息成功!', $take_response_date));
$server->push($frame->fd, JsonReturn::success('发送消息成功!', $send_response_data));
}
}
/**
* 监听task任务执行
* @param $server Server WebSocket对象
* @param $task object task任务数据对象
* @return string 执行结果
*/
public function onTask($server, $task)
{
// 分发task任务机制让不同的任务走不同的逻辑
$my_task = new Task();
$method = $task->data['method'];
$result = $my_task->$method($task->data['data']);
// 把结果告诉worker
return $result;
}
/**
* 监听task任务完成
* @param $server Server WebSocket对象
* @param $task_id int task任务ID
* @param $data string onTask执行完返回的结果
*/
public function onFinish($server, $task_id, $data)
{}
/**
* 监听关闭
* @param $server Server WebSocket对象
* @param $client_id int 客户端连接ID
*/
public function onClose($server, $client_id)
{
// 关闭时清除用户fd数据
if (isset($this->fd_table)) $this->fd_table->del('user' . $client_id);
}
/**
* 启动服务器
*/
public function start()
{
$this->server->start();
}
}
$http = new Http();
$http->start();