86 lines
2.6 KiB
Go
86 lines
2.6 KiB
Go
package znet
|
||
|
||
import (
|
||
"fmt"
|
||
"go-study/zinx/utils"
|
||
"go-study/zinx/ziface"
|
||
)
|
||
|
||
type MsgHandle struct {
|
||
// 存放每个 MsgID 所对应的处理方法
|
||
Apis map[uint32]ziface.IRouter
|
||
// 业务工作 Worker 池的数量
|
||
WorkerPoolSize uint32
|
||
// 业务工作 Worker 队列
|
||
TaskQueue []chan ziface.IRequest
|
||
}
|
||
|
||
// NewMsgHandle 创建一个 MsgHandle
|
||
func NewMsgHandle() *MsgHandle {
|
||
return &MsgHandle{
|
||
Apis: make(map[uint32]ziface.IRouter),
|
||
WorkerPoolSize: utils.ConfigInstance.WorkerPoolSize,
|
||
TaskQueue: make([]chan ziface.IRequest, utils.ConfigInstance.WorkerPoolSize),
|
||
}
|
||
}
|
||
|
||
// DoMsgHandler 执行对应的路由方法
|
||
func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) {
|
||
router, ok := mh.Apis[request.GetMsgID()]
|
||
if !ok {
|
||
fmt.Println("api msgID = ", request.GetMsgID(), " is not found!")
|
||
return
|
||
}
|
||
|
||
// 执行对应的处理方法
|
||
router.PreHandle(request)
|
||
router.Handle(request)
|
||
router.PostHandle(request)
|
||
}
|
||
|
||
// AddRouter 为消息添加具体的处理逻辑
|
||
func (mh *MsgHandle) AddRouter(msgID uint32, router ziface.IRouter) {
|
||
if _, ok := mh.Apis[msgID]; ok {
|
||
panic("repeated api, msgID = " + fmt.Sprint(msgID))
|
||
}
|
||
mh.Apis[msgID] = router
|
||
fmt.Println("Add api msgID = ", msgID)
|
||
}
|
||
|
||
// StartOneWorker 启动一个工作池(Worker),每个 Worker 都有一个消息队列
|
||
func (mh *MsgHandle) StartOneWorker(workerID int, taskQueue chan ziface.IRequest) {
|
||
fmt.Println("Worker ID = ", workerID, " is started.")
|
||
// 不断等待队列中的消息
|
||
for {
|
||
select {
|
||
// 有消息则取出队列的 Request,并执行绑定的业务方法
|
||
case request := <-taskQueue:
|
||
mh.DoMsgHandler(request)
|
||
}
|
||
}
|
||
}
|
||
|
||
// StartWorkerPool 启动工作池
|
||
func (mh *MsgHandle) StartWorkerPool() {
|
||
// 根据 WorkerPoolSize 分别开启 Worker,每个 Worker 用一个 goroutine 来承载
|
||
for i := 0; i < int(mh.WorkerPoolSize); i++ {
|
||
// 一个 Worker 被启动
|
||
// 1. 创建当前 Worker 的消息队列
|
||
mh.TaskQueue[i] = make(chan ziface.IRequest, utils.ConfigInstance.MaxWorkerTaskLen)
|
||
// 2. 启动当前 Worker,阻塞等待消息从消息队列中传递进来
|
||
go mh.StartOneWorker(i, mh.TaskQueue[i])
|
||
}
|
||
}
|
||
|
||
// SendMsgToTaskQueue 将消息交给 TaskQueue 处理
|
||
func (mh *MsgHandle) SendMsgToTaskQueue(request ziface.IRequest) {
|
||
// 1. 根据 ConnID 来分配当前的请求应该由哪个 Worker 负责处理
|
||
// 轮询的平均分配法则
|
||
workerID := request.GetConnection().GetConnID() % mh.WorkerPoolSize
|
||
|
||
fmt.Println("Add ConnID=", request.GetConnection().GetConnID(), " request MsgID=", request.GetMsgID(), " to WorkerID=", workerID)
|
||
|
||
// 2. 将请求消息发送给对应的 Worker 的 TaskQueue 即可
|
||
mh.TaskQueue[workerID] <- request
|
||
}
|