package znet import ( "fmt" "go-study/zinx/utils" "go-study/zinx/ziface" ) type MsgHandle struct { // 存放每个 MsgID 所对应的处理方法 Apis map[uint32]ziface.IRouter // 业务工作 Worker 池的数量 WorkerPoolSize uint32 // 业务工作 Worker 队列 TaskQueue []chan ziface.IRequest } // NewMsgHandle 创建一个 MsgHandle func NewMsgHandle() *MsgHandle { return &MsgHandle{ Apis: make(map[uint32]ziface.IRouter), WorkerPoolSize: utils.ConfigInstance.WorkerPoolSize, TaskQueue: make([]chan ziface.IRequest, utils.ConfigInstance.WorkerPoolSize), } } // DoMsgHandler 执行对应的路由方法 func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) { router, ok := mh.Apis[request.GetMsgID()] if !ok { fmt.Println("api msgID = ", request.GetMsgID(), " is not found!") return } // 执行对应的处理方法 router.PreHandle(request) router.Handle(request) router.PostHandle(request) } // AddRouter 为消息添加具体的处理逻辑 func (mh *MsgHandle) AddRouter(msgID uint32, router ziface.IRouter) { if _, ok := mh.Apis[msgID]; ok { panic("repeated api, msgID = " + fmt.Sprint(msgID)) } mh.Apis[msgID] = router fmt.Println("Add api msgID = ", msgID) } // StartOneWorker 启动一个工作池(Worker),每个 Worker 都有一个消息队列 func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) { fmt.Println("Worker ID = ", workerID, " is started.") // 不断等待队列中的消息 for { select { // 有消息则取出队列的 Request,并执行绑定的业务方法 case request := <-taskQueue: mh.DoMsgHandler(request) } } } // StartWorkerPool 启动工作池 func (mh *MsgHandle) StartWorkerPool() { // 根据 WorkerPoolSize 分别开启 Worker,每个 Worker 用一个 goroutine 来承载 for i := 0; i < int(mh.WorkerPoolSize); i++ { // 一个 Worker 被启动 // 1. 创建当前 Worker 的消息队列 mh.TaskQueue[i] = make(chan ziface.IRequest, utils.ConfigInstance.MaxWorkerTaskLen) // 2. 启动当前 Worker,阻塞等待消息从消息队列中传递进来 go mh.StartOneWorker(i, mh.TaskQueue[i]) } } // SendMsgToTaskQueue 将消息交给 TaskQueue 处理 func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) { // 1. 根据 ConnID 来分配当前的请求应该由哪个 Worker 负责处理 // 轮询的平均分配法则 workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize fmt.Println("Add ConnID=", request.GetConnection().GetConnID(), " request MsgID=", request.GetMsgID(), " to WorkerID=", workerID) // 2. 将请求消息发送给对应的 Worker 的 TaskQueue 即可 mh.TaskQueue[workerID] <- request }