diff --git a/zinx/utils/config.go b/zinx/utils/config.go index be22dc4..5fe079e 100644 --- a/zinx/utils/config.go +++ b/zinx/utils/config.go @@ -54,7 +54,7 @@ func init() { // 初始化全局配置,并设置默认值 ConfigInstance = &Config{ Name: "ZinxServerApp", - Version: "V0.5", + Version: "V0.6", Host: "0.0.0.0", TcpPort: 7777, MaxConn: 12000, diff --git a/zinx/ziface/imsghandler.go b/zinx/ziface/imsghandler.go new file mode 100644 index 0000000..fa499aa --- /dev/null +++ b/zinx/ziface/imsghandler.go @@ -0,0 +1,8 @@ +package ziface + +type IMsgHandle interface { + // DoMsgHandler 执行对应的路由方法 + DoMsgHandler(request IRequest) + // AddRouter 为消息添加具体的处理逻辑 + AddRouter(msgID uint32, router IRouter) +} diff --git a/zinx/ziface/iserver.go b/zinx/ziface/iserver.go index e346bbe..f0162a8 100644 --- a/zinx/ziface/iserver.go +++ b/zinx/ziface/iserver.go @@ -8,5 +8,5 @@ type IServer interface { // Serve 运行服务器 Serve() // AddRouter 为服务注册路由方法,供客户端连接处理使用 - AddRouter(router IRouter) + AddRouter(msgID uint32, router IRouter) } diff --git a/zinx/znet/connection.go b/zinx/znet/connection.go index 15dbbd7..81fc9b7 100644 --- a/zinx/znet/connection.go +++ b/zinx/znet/connection.go @@ -14,20 +14,20 @@ type Connection struct { // 该连接的关闭状态 IsClosed bool - // 该连接的处理方法 Router - Router ziface.IRouter + // 消息管理模块,绑定该连接的 MsgID 和对应的处理方法 + MsgHandler ziface.IMsgHandle // 该连接的退出消息通知 channel ExitBuffChan chan struct{} } // NewConnection 创建连接的方法 -func NewConnection(conn *net.TCPConn, connID uint32, router ziface.IRouter) ziface.IConnection { +func NewConnection(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection { return &Connection{ Conn: conn, ConnID: connID, IsClosed: false, - Router: router, + MsgHandler: msgHandler, ExitBuffChan: make(chan struct{}, 1), } } @@ -76,12 +76,7 @@ func (c *Connection) StartReader() { msg: msg, // msg 封装在 Request 中 } // 从路由中找到注册绑定的 Conn 对应的 router 调用 - go func(request ziface.IRequest) { - // 执行注册的路由方法 - c.Router.PreHandle(request) - c.Router.Handle(request) - c.Router.PostHandle(request) - }(&req) + go c.MsgHandler.DoMsgHandler(&req) } } diff --git a/zinx/znet/msghandler.go b/zinx/znet/msghandler.go new file mode 100644 index 0000000..ccc0b24 --- /dev/null +++ b/zinx/znet/msghandler.go @@ -0,0 +1,41 @@ +package znet + +import ( + "fmt" + "go-study/zinx/ziface" +) + +type MsgHandle struct { + // 存放每个 MsgID 所对应的处理方法 + Apis map[uint32]ziface.IRouter +} + +// NewMsgHandle 创建一个 MsgHandle +func NewMsgHandle() *MsgHandle { + return &MsgHandle{ + Apis: make(map[uint32]ziface.IRouter), + } +} + +// DoMsgHandler 执行对应的路由方法 +func (mh *MsgHandle) DoMsgHandler(request ziface.IRequest) { + router, ok := mh.Apis[request.GetMsgID()] + if !ok { + fmt.Println("api msgID = ", request.GetMsgID(), " is not found!") + return + } + + // 执行对应的处理方法 + router.PreHandle(request) + router.Handle(request) + router.PostHandle(request) +} + +// AddRouter 为消息添加具体的处理逻辑 +func (mh *MsgHandle) AddRouter(msgID uint32, router ziface.IRouter) { + if _, ok := mh.Apis[msgID]; ok { + panic("repeated api, msgID = " + fmt.Sprint(msgID)) + } + mh.Apis[msgID] = router + fmt.Println("Add api msgID = ", msgID) +} diff --git a/zinx/znet/server.go b/zinx/znet/server.go index 3635ea2..6e9d5d4 100644 --- a/zinx/znet/server.go +++ b/zinx/znet/server.go @@ -16,19 +16,19 @@ type Server struct { IP string // Port 服务器监听的端口 Port int - // Router 路由 - Router ziface.IRouter + // MsgHandler 该服务器的消息管理模块,用来绑定 MsgID 和对应的处理方法 + MsgHandler ziface.IMsgHandle } // NewServer 创建一个服务器句柄 func NewServer() ziface.IServer { utils.ConfigInstance.Reload() return &Server{ - Name: utils.ConfigInstance.Name, - IPVersion: "tcp4", - IP: utils.ConfigInstance.Host, - Port: utils.ConfigInstance.TcpPort, - Router: nil, + Name: utils.ConfigInstance.Name, + IPVersion: "tcp4", + IP: utils.ConfigInstance.Host, + Port: utils.ConfigInstance.TcpPort, + MsgHandler: NewMsgHandle(), } } @@ -72,7 +72,7 @@ func (s *Server) Start() { // 3.2 TODO 设置服务器最大连接数,如果超过最大连接数则关闭此新连接 // 3.3 处理该新连接请求的业务方法,此时 handler 和 conn 应该是绑定的 - dealConn := NewConnection(conn, cid, s.Router) + dealConn := NewConnection(conn, cid, s.MsgHandler) cid++ // 3.4 启动当前连接的业务处理 @@ -99,7 +99,7 @@ func (s *Server) Serve() { } // AddRouter 为当前服务注册一个路由方法,供客户端连接处理使用 -func (s *Server) AddRouter(router ziface.IRouter) { - s.Router = router +func (s *Server) AddRouter(msgID uint32, router ziface.IRouter) { + s.MsgHandler.AddRouter(msgID, router) fmt.Println("Add Router Succ!") } diff --git a/zinx/znet/server_test.go b/zinx/znet/server_test.go index b9aa8d0..ce31bb8 100644 --- a/zinx/znet/server_test.go +++ b/zinx/znet/server_test.go @@ -18,14 +18,30 @@ func (pr *PingRouter) Handle(request ziface.IRequest) { fmt.Println("recv from client: msgID=", request.GetMsgID(), ", data=", string(request.GetData())) // 回写数据 - err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping\n")) + err := request.GetConnection().SendMsg(0, []byte("ping...ping...ping\n")) if err != nil { fmt.Println("call back ping error:", err.Error()) } } -// ClientTest 模拟客户端 -func ClientTest() { +type HelloZinxRouter struct { + BaseRouter +} + +func (h *HelloZinxRouter) Handle(request ziface.IRequest) { + fmt.Println("Call HelloZinxRouter Handle") + // 先读取客户端的数据,再回写ping...ping...ping + fmt.Println("recv from client: msgID=", request.GetMsgID(), ", data=", string(request.GetData())) + + // 回写数据 + err := request.GetConnection().SendMsg(1, []byte("Hello Zinx Router V0.6\n")) + if err != nil { + fmt.Println("call back HelloZinx error:", err.Error()) + } +} + +// ClientTest0 模拟客户端0 +func ClientTest0() { fmt.Println("Client Test... start") // 3s 之后发起测试请求,给服务器端开启服务的机会 time.Sleep(3 * time.Second) @@ -39,7 +55,60 @@ func ClientTest() { for { // 发送封包数据 dp := NewDataPack() - msg, _ := dp.Pack(NewMsgPackage(0, []byte("Zinx V0.5 Client Test Message"))) + msg, _ := dp.Pack(NewMsgPackage(0, []byte("Zinx V0.6 Client1 Test Message"))) + if _, err := conn.Write(msg); err != nil { + fmt.Println("write error:", err) + return + } + + // 服务器回复一个数据,先读出流中的 head 部分 + headData := make([]byte, dp.GetHeadLen()) + if _, err := conn.Read(headData); err != nil { + fmt.Println("read head error:", err) + break + } + + // 拆包,得到 msgID 和 dataLen 放在 msg 中 + msgHead, err := dp.Unpack(headData) + if err != nil { + fmt.Println("unpack head error:", err) + return + } + + if msgHead.GetDataLen() > 0 { + // msg 是有数据的,需要再次读取 dataLen 个字节的数据 + msg := msgHead.(*Message) + msg.data = make([]byte, msg.GetDataLen()) + + // 根据 dataLen 的长度再次从io中读取 + if _, err := conn.Read(msg.data); err != nil { + fmt.Println("read msg data error:", err) + return + } + + fmt.Println("==> Recv Server Msg: ID=", msg.GetMsgID(), ", len=", msg.GetDataLen(), ", data=", string(msg.GetData())) + } + + time.Sleep(1 * time.Second) + } +} + +// ClientTest1 模拟客户端1 +func ClientTest1() { + fmt.Println("Client Test... start") + // 3s 之后发起测试请求,给服务器端开启服务的机会 + time.Sleep(3 * time.Second) + + conn, err := net.Dial("tcp", "127.0.0.1:7777") + if err != nil { + fmt.Println("client dial err:", err) + return + } + + for { + // 发送封包数据 + dp := NewDataPack() + msg, _ := dp.Pack(NewMsgPackage(1, []byte("Zinx V0.6 Client2 Test Message"))) if _, err := conn.Write(msg); err != nil { fmt.Println("write error:", err) return @@ -82,11 +151,15 @@ func TestServer(t *testing.T) { // 创建一个 Server 句柄 s := NewServer() - // 给当前 Zinx 框架添加一个自定义的 Router - s.AddRouter(&PingRouter{}) + // 给当前 Zinx 框架添加自定义的 Router + s.AddRouter(0, &PingRouter{}) + s.AddRouter(1, &HelloZinxRouter{}) - // 启动客户端测试 - go ClientTest() + // 启动客户端测试0 + go ClientTest0() + + // 启动客户端测试1 + go ClientTest1() // 启动服务器 s.Serve()