264 lines
7.9 KiB
PHP
264 lines
7.9 KiB
PHP
|
<?php
|
|||
|
/**
|
|||
|
* WebSocket服务端类
|
|||
|
* User: fantasticbin
|
|||
|
* Date: 2020/1/15
|
|||
|
*/
|
|||
|
namespace server;
|
|||
|
|
|||
|
use Swoole\WebSocket\Server;
|
|||
|
use app\common\controller\JsonReturn;
|
|||
|
use app\common\controller\Task;
|
|||
|
use think\facade\Cache;
|
|||
|
|
|||
|
class Http
|
|||
|
{
|
|||
|
const HOST = '0.0.0.0';
|
|||
|
const PORT = 8080;
|
|||
|
|
|||
|
// WebSocket对象
|
|||
|
public $server;
|
|||
|
public $fd_table;
|
|||
|
|
|||
|
public function __construct()
|
|||
|
{
|
|||
|
// 创建WebSocket服务器,监听本机的所有地址并监听 8080 端口
|
|||
|
$this->server = new Server(self::HOST, self::PORT);
|
|||
|
|
|||
|
// 设置响应html文件模式以及定义路径
|
|||
|
$this->server->set([
|
|||
|
/*'worker_num' => 2,*/
|
|||
|
'task_worker_num' => 2,
|
|||
|
'dispatch_mode' => 5,
|
|||
|
/*'enable_coroutine' => true,
|
|||
|
'task_enable_coroutine' => true,*/ // Task 启用协程,前提是enable_coroutine必须为true
|
|||
|
'enable_static_handler' => true,
|
|||
|
'document_root' => dirname(dirname(__FILE__)) . '/public/page'
|
|||
|
]);
|
|||
|
|
|||
|
// 设置异步AIO线程数量
|
|||
|
swoole_async_set([
|
|||
|
'thread_num' => 10,
|
|||
|
]);
|
|||
|
|
|||
|
// 监听启动
|
|||
|
$this->server->on('Start', [$this, 'onStart']);
|
|||
|
// 监听worker进程加载
|
|||
|
$this->server->on('WorkerStart', [$this, 'onWorkerStart']);
|
|||
|
// 监听请求并输出内容
|
|||
|
$this->server->on('Request', [$this, 'onRequest']);
|
|||
|
// 监听客户端握手成功
|
|||
|
$this->server->on('Open', [$this, 'onOpen']);
|
|||
|
// 监听客户端发送消息
|
|||
|
$this->server->on('Message', [$this, 'onMessage']);
|
|||
|
// 监听task任务执行
|
|||
|
$this->server->on('Task', [$this, 'onTask']);
|
|||
|
// 监听task任务完成
|
|||
|
$this->server->on('Finish', [$this, 'onFinish']);
|
|||
|
// 监听关闭
|
|||
|
$this->server->on('Close', [$this, 'onClose']);
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* 监听启动,修改主进程名称,便于执行平滑重启
|
|||
|
*/
|
|||
|
public function onStart()
|
|||
|
{
|
|||
|
swoole_set_process_name('http_server');
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* 监听worker进程加载
|
|||
|
* @param $server Server WebSocket对象
|
|||
|
* @param $worker_id int 进程ID
|
|||
|
*/
|
|||
|
public function onWorkerStart($server, $worker_id)
|
|||
|
{
|
|||
|
define('APP_PATH', __DIR__ . '/../application/');
|
|||
|
// 加载基础文件
|
|||
|
require __DIR__ . '/../thinkphp/base.php';
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* 监听请求并输出内容
|
|||
|
* @param $request object 请求对象
|
|||
|
* @param $response object 响应对象
|
|||
|
*/
|
|||
|
public function onRequest($request, $response)
|
|||
|
{
|
|||
|
// 过滤图标请求
|
|||
|
if ($request->server['request_uri'] == '/favicon.ico') {
|
|||
|
$response->status(404);
|
|||
|
$response->end();
|
|||
|
return;
|
|||
|
}
|
|||
|
|
|||
|
// 把swoole接收的信息转换为thinkphp可识别的
|
|||
|
$_SERVER = [];
|
|||
|
if (isset($request->server)) {
|
|||
|
foreach ($request->server as $k => $v) {
|
|||
|
$_SERVER[strtoupper($k)] = $v;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
if (isset($request->header)) {
|
|||
|
foreach ($request->header as $k => $v) {
|
|||
|
$_SERVER[strtoupper($k)] = $v;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// swoole对于超全局数组:$_SERVER、$_GET、$_POST、define不会释放
|
|||
|
$_GET = [];
|
|||
|
if (isset($request->get)) {
|
|||
|
foreach ($request->get as $k => $v) {
|
|||
|
$_GET[$k] = $v;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
$_POST = [];
|
|||
|
if (isset($request->post)) {
|
|||
|
foreach ($request->post as $k => $v) {
|
|||
|
$_POST[$k] = $v;
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
// 最好全局保存一下 $this->server 对象以供thinkphp控制器中使用,此案例没用到所以省略了
|
|||
|
|
|||
|
ob_start();
|
|||
|
// 执行应用并响应
|
|||
|
try {
|
|||
|
\think\Container::get('app', [APP_PATH])->run()->send();
|
|||
|
} catch (\Exception $e) {
|
|||
|
echo $e->getMessage();
|
|||
|
// todo
|
|||
|
}
|
|||
|
$res = ob_get_contents();
|
|||
|
ob_end_clean();
|
|||
|
$response->header('content-type','text/html; charset=UTF-8');
|
|||
|
$response->end($res);
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* 监听客户端握手成功
|
|||
|
* @param $server Server WebSocket对象
|
|||
|
* @param $request object 请求对象
|
|||
|
*/
|
|||
|
public function onOpen($server, $request)
|
|||
|
{
|
|||
|
$fd = $request->fd;
|
|||
|
$token = $request->get['token'] ?? ''; //拿到客户端用户ID
|
|||
|
if (!$token) {
|
|||
|
$server->push($fd, JsonReturn::error('请先登录!'));
|
|||
|
$server->close($fd);
|
|||
|
return;
|
|||
|
}
|
|||
|
|
|||
|
$user_data = Cache::get($token);
|
|||
|
if (!$user_data) {
|
|||
|
$server->push($fd, JsonReturn::error('用户身份无效,请重新登录!'));
|
|||
|
$server->close($fd);
|
|||
|
return;
|
|||
|
}
|
|||
|
|
|||
|
// 把用户id绑定到fd中
|
|||
|
$server->bind($fd, $user_data->id);
|
|||
|
|
|||
|
// 使用table内存表用来保存客户端fd跟用户ID的关联
|
|||
|
if (!isset($this->fd_table)) {
|
|||
|
$this->fd_table = new \Swoole\Table(1024);
|
|||
|
$this->fd_table->column('fd', $this->fd_table::TYPE_INT, 4);
|
|||
|
$this->fd_table->create();
|
|||
|
}
|
|||
|
|
|||
|
$this->fd_table->set('user' . $user_data->id, ['fd' => $fd]);
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* 监听客户端发送消息
|
|||
|
* @param $server Server WebSocket对象
|
|||
|
* @param $frame object 客户端数据对象
|
|||
|
*/
|
|||
|
public function onMessage($server, $frame)
|
|||
|
{
|
|||
|
$data = json_decode($frame->data, true);
|
|||
|
foreach ($data as $key => $value) {
|
|||
|
$_POST[$key] = $value;
|
|||
|
}
|
|||
|
|
|||
|
try {
|
|||
|
$response = \think\Container::get('think\App', [APP_PATH])->action('index/user/sendMessage', [], 'controller', false);
|
|||
|
} catch (\Exception $e) {
|
|||
|
$exp_data = json_decode($e->getMessage(), true);
|
|||
|
$server->push($frame->fd, JsonReturn::error($exp_data['message']));
|
|||
|
return;
|
|||
|
}
|
|||
|
|
|||
|
if (!isset($this->fd_table)) {
|
|||
|
$server->push($frame->fd, JsonReturn::error('发送消息暂不可用,请稍后重试!'));
|
|||
|
return;
|
|||
|
}
|
|||
|
|
|||
|
if (!$this->fd_table->exist('user' . $response['take_id'])) {
|
|||
|
$server->push($frame->fd, JsonReturn::error('对方暂不在线,无法发送消息!后续开放留言功能~'));
|
|||
|
return;
|
|||
|
}
|
|||
|
|
|||
|
$friend_fd = $this->fd_table->get('user' . $response['take_id']);
|
|||
|
|
|||
|
// 发送者返回的数据
|
|||
|
$send_response_data = array_merge($response, ['type' => 1]);
|
|||
|
// 接受者返回的数据
|
|||
|
$take_response_date = array_merge($response, ['type' => 2]);
|
|||
|
if ($friend_fd) {
|
|||
|
$server->push($friend_fd['fd'], JsonReturn::success('推送消息成功!', $take_response_date));
|
|||
|
$server->push($frame->fd, JsonReturn::success('发送消息成功!', $send_response_data));
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* 监听task任务执行
|
|||
|
* @param $server Server WebSocket对象
|
|||
|
* @param $task object task任务数据对象
|
|||
|
* @return string 执行结果
|
|||
|
*/
|
|||
|
public function onTask($server, $task)
|
|||
|
{
|
|||
|
// 分发task任务机制,让不同的任务,走不同的逻辑
|
|||
|
$my_task = new Task();
|
|||
|
$method = $task->data['method'];
|
|||
|
$result = $my_task->$method($task->data['data']);
|
|||
|
// 把结果告诉worker
|
|||
|
return $result;
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* 监听task任务完成
|
|||
|
* @param $server Server WebSocket对象
|
|||
|
* @param $task_id int task任务ID
|
|||
|
* @param $data string onTask执行完返回的结果
|
|||
|
*/
|
|||
|
public function onFinish($server, $task_id, $data)
|
|||
|
{}
|
|||
|
|
|||
|
/**
|
|||
|
* 监听关闭
|
|||
|
* @param $server Server WebSocket对象
|
|||
|
* @param $client_id int 客户端连接ID
|
|||
|
*/
|
|||
|
public function onClose($server, $client_id)
|
|||
|
{
|
|||
|
// 关闭时清除用户fd数据
|
|||
|
if (isset($this->fd_table)) $this->fd_table->del('user' . $client_id);
|
|||
|
}
|
|||
|
|
|||
|
/**
|
|||
|
* 启动服务器
|
|||
|
*/
|
|||
|
public function start()
|
|||
|
{
|
|||
|
$this->server->start();
|
|||
|
}
|
|||
|
}
|
|||
|
|
|||
|
$http = new Http();
|
|||
|
$http->start();
|