diff --git a/zinx/utils/config.go b/zinx/utils/config.go index dd33ce2..1888ad8 100644 --- a/zinx/utils/config.go +++ b/zinx/utils/config.go @@ -25,6 +25,10 @@ type Config struct { MaxPacketSize uint32 // MaxConn 最大连接数 MaxConn int + // WorkerPoolSize 业务工作 Worker 池的数量 + WorkerPoolSize uint32 + // MaxWorkerTaskLen 每个 Worker 对应负责的任务队列最大任务存储数量 + MaxWorkerTaskLen uint32 } // ConfigInstance 定义一个全局的配置实例 @@ -53,12 +57,14 @@ func (c *Config) Reload() { func init() { // 初始化全局配置,并设置默认值 ConfigInstance = &Config{ - Name: "ZinxServerApp", - Version: "V0.7", - Host: "0.0.0.0", - TcpPort: 7777, - MaxConn: 12000, - MaxPacketSize: 4096, + Name: "ZinxServerApp", + Version: "V0.8", + Host: "0.0.0.0", + TcpPort: 7777, + MaxConn: 12000, + MaxPacketSize: 4096, + WorkerPoolSize: 10, + MaxWorkerTaskLen: 1024, } // 加载配置文件 diff --git a/zinx/ziface/imsghandler.go b/zinx/ziface/imsghandler.go index fa499aa..6028cb4 100644 --- a/zinx/ziface/imsghandler.go +++ b/zinx/ziface/imsghandler.go @@ -5,4 +5,8 @@ type IMsgHandle interface { DoMsgHandler(request IRequest) // AddRouter 为消息添加具体的处理逻辑 AddRouter(msgID uint32, router IRouter) + // StartWorkerPool 启动工作池 + StartWorkerPool() + // SendMsgToTaskQueue 将消息交给 TaskQueue 处理 + SendMsgToTaskQueue(request IRequest) } diff --git a/zinx/znet/connection.go b/zinx/znet/connection.go index ac99b6e..4d853f7 100644 --- a/zinx/znet/connection.go +++ b/zinx/znet/connection.go @@ -2,6 +2,7 @@ package znet import ( "fmt" + "go-study/zinx/utils" "go-study/zinx/ziface" "net" ) @@ -78,8 +79,14 @@ func (c *Connection) StartReader() { conn: c, msg: msg, // msg 封装在 Request 中 } - // 从路由中找到注册绑定的 Conn 对应的 router 调用 - go c.MsgHandler.DoMsgHandler(&req) + + if utils.ConfigInstance.WorkerPoolSize > 0 { + // 已经开启了工作池机制,将消息交给 Worker 处理 + c.MsgHandler.SendMsgToTaskQueue(&req) + } else { + // 从路由中找到注册绑定的 Conn 对应的 router 调用 + go c.MsgHandler.DoMsgHandler(&req) + } } } diff --git a/zinx/znet/msghandler.go b/zinx/znet/msghandler.go index ccc0b24..ff0e6b7 100644 --- a/zinx/znet/msghandler.go +++ b/zinx/znet/msghandler.go @@ -2,18 +2,25 @@ package znet import ( "fmt" + "go-study/zinx/utils" "go-study/zinx/ziface" ) type MsgHandle struct { // 存放每个 MsgID 所对应的处理方法 Apis map[uint32]ziface.IRouter + // 业务工作 Worker 池的数量 + WorkerPoolSize uint32 + // 业务工作 Worker 队列 + TaskQueue []chan ziface.IRequest } // NewMsgHandle 创建一个 MsgHandle func NewMsgHandle() *MsgHandle { return &MsgHandle{ - Apis: make(map[uint32]ziface.IRouter), + Apis: make(map[uint32]ziface.IRouter), + WorkerPoolSize: utils.ConfigInstance.WorkerPoolSize, + TaskQueue: make([]chan ziface.IRequest, utils.ConfigInstance.WorkerPoolSize), } } @@ -39,3 +46,40 @@ func (mh *MsgHandle) AddRouter(msgID uint32, router ziface.IRouter) { mh.Apis[msgID] = router fmt.Println("Add api msgID = ", msgID) } + +// StartOneWorker 启动一个工作池(Worker),每个 Worker 都有一个消息队列 +func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) { + fmt.Println("Worker ID = ", workerID, " is started.") + // 不断等待队列中的消息 + for { + select { + // 有消息则取出队列的 Request,并执行绑定的业务方法 + case request := <-taskQueue: + mh.DoMsgHandler(request) + } + } +} + +// StartWorkerPool 启动工作池 +func (mh *MsgHandle) StartWorkerPool() { + // 根据 WorkerPoolSize 分别开启 Worker,每个 Worker 用一个 goroutine 来承载 + for i := 0; i < int(mh.WorkerPoolSize); i++ { + // 一个 Worker 被启动 + // 1. 创建当前 Worker 的消息队列 + mh.TaskQueue[i] = make(chan ziface.IRequest, utils.ConfigInstance.MaxWorkerTaskLen) + // 2. 启动当前 Worker,阻塞等待消息从消息队列中传递进来 + go mh.StartOneWorker(i, mh.TaskQueue[i]) + } +} + +// SendMsgToTaskQueue 将消息交给 TaskQueue 处理 +func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) { + // 1. 根据 ConnID 来分配当前的请求应该由哪个 Worker 负责处理 + // 轮询的平均分配法则 + workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize + + fmt.Println("Add ConnID=", request.GetConnection().GetConnID(), " request MsgID=", request.GetMsgID(), " to WorkerID=", workerID) + + // 2. 将请求消息发送给对应的 Worker 的 TaskQueue 即可 + mh.TaskQueue[workerID] <- request +} diff --git a/zinx/znet/server.go b/zinx/znet/server.go index 6e9d5d4..f5f99e4 100644 --- a/zinx/znet/server.go +++ b/zinx/znet/server.go @@ -41,6 +41,9 @@ func (s *Server) Start() { utils.ConfigInstance.MaxPacketSize) go func() { + // 0. 启动消息队列及 worker 工作池 + s.MsgHandler.StartWorkerPool() + // 1. 获取一个 TCP 的 Addr addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port)) if err != nil {