diff --git a/zinx/utils/config.go b/zinx/utils/config.go index 1888ad8..48417f3 100644 --- a/zinx/utils/config.go +++ b/zinx/utils/config.go @@ -29,6 +29,8 @@ type Config struct { WorkerPoolSize uint32 // MaxWorkerTaskLen 每个 Worker 对应负责的任务队列最大任务存储数量 MaxWorkerTaskLen uint32 + // MaxMsgChanLen 缓冲消息队列最大长度 + MaxMsgChanLen uint32 } // ConfigInstance 定义一个全局的配置实例 @@ -58,13 +60,14 @@ func init() { // 初始化全局配置,并设置默认值 ConfigInstance = &Config{ Name: "ZinxServerApp", - Version: "V0.8", + Version: "V0.9", Host: "0.0.0.0", TcpPort: 7777, MaxConn: 12000, MaxPacketSize: 4096, WorkerPoolSize: 10, MaxWorkerTaskLen: 1024, + MaxMsgChanLen: 1024, } // 加载配置文件 diff --git a/zinx/ziface/iconnection.go b/zinx/ziface/iconnection.go index 1f06fa7..26cd73e 100644 --- a/zinx/ziface/iconnection.go +++ b/zinx/ziface/iconnection.go @@ -13,9 +13,8 @@ type IConnection interface { GetConnID() uint32 // RemoteAddr 获取远程客户端地址信息 RemoteAddr() net.Addr - // SendMsg 发送数据,将数据发送给远程的客户端 + // SendMsg 发送数据,将数据发送给远程的客户端(无缓冲) SendMsg(msgID uint32, data []byte) error + // SendBuffMsg 发送数据,将数据发送给远程的客户端(有缓冲) + SendBuffMsg(msgID uint32, data []byte) error } - -// HandleFunc 定义一个统一处理连接业务的方法 -type HandleFunc func(*net.TCPConn, []byte, int) error diff --git a/zinx/ziface/iconnmanager.go b/zinx/ziface/iconnmanager.go new file mode 100644 index 0000000..9f1ac6f --- /dev/null +++ b/zinx/ziface/iconnmanager.go @@ -0,0 +1,14 @@ +package ziface + +type IConnManager interface { + // Add 添加连接 + Add(conn IConnection) + // Remove 删除连接 + Remove(conn IConnection) + // Get 根据连接ID获取连接 + Get(connID uint32) (IConnection, error) + // Len 获取当前连接总数 + Len() int + // ClearConn 清除并终止所有连接 + ClearConn() +} diff --git a/zinx/ziface/iserver.go b/zinx/ziface/iserver.go index f0162a8..91e1293 100644 --- a/zinx/ziface/iserver.go +++ b/zinx/ziface/iserver.go @@ -9,4 +9,14 @@ type IServer interface { Serve() // AddRouter 为服务注册路由方法,供客户端连接处理使用 AddRouter(msgID uint32, router IRouter) + // GetConnMgr 取得连接管理器 + GetConnMgr() IConnManager + // SetOnConnStart 注册该服务器的连接创建时的钩子函数 + SetOnConnStart(func(conn IConnection)) + // SetOnConnStop 注册该服务器的连接断开时的钩子函数 + SetOnConnStop(func(conn IConnection)) + // CallOnConnStart 调用连接创建时的钩子函数 + CallOnConnStart(conn IConnection) + // CallOnConnStop 调用连接断开时的钩子函数 + CallOnConnStop(conn IConnection) } diff --git a/zinx/znet/connection.go b/zinx/znet/connection.go index 4d853f7..71062bf 100644 --- a/zinx/znet/connection.go +++ b/zinx/znet/connection.go @@ -8,6 +8,8 @@ import ( ) type Connection struct { + // 当前连接属于哪个服务器 + TcpServer ziface.IServer // 该连接的 TCP Socket 套接字 Conn *net.TCPConn // 连接 ID,全局唯一 @@ -22,18 +24,26 @@ type Connection struct { ExitBuffChan chan struct{} // 无缓冲管道,用于读、写两个 Goroutine 之间的消息通信 msgChan chan []byte + // 有缓冲管道,用于读、写两个 Goroutine 之间的消息通信 + msgBuffChan chan []byte } // NewConnection 创建连接的方法 -func NewConnection(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection { - return &Connection{ +func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection { + c := &Connection{ + TcpServer: server, Conn: conn, ConnID: connID, IsClosed: false, MsgHandler: msgHandler, ExitBuffChan: make(chan struct{}, 1), msgChan: make(chan []byte), + msgBuffChan: make(chan []byte, utils.ConfigInstance.MaxMsgChanLen), } + + // 将新创建的连接添加到连接管理器中 + c.TcpServer.GetConnMgr().Add(c) + return c } // StartReader 处理连接读数据的业务方法 @@ -103,6 +113,20 @@ func (c *Connection) StartWriter() { fmt.Println("Send data error:", err) return } + case data, ok := <-c.msgBuffChan: + // 针对有缓冲通道需要进行数据处理 + if ok { + // 有数据要写回客户端 + if _, err := c.Conn.Write(data); err != nil { + fmt.Println("Send buff data error:", err) + return + } + } else { + // 读取不到数据,msgBuffChan 被关闭了 + fmt.Println("msgBuffChan is closed") + break + } + case <-c.ExitBuffChan: // 连接已经关闭 return @@ -117,6 +141,9 @@ func (c *Connection) Start() { // 启动当前连接的写数据业务 go c.StartWriter() + // 按照用户传递进来的创建连接时需要处理的业务,执行钩子方法 + c.TcpServer.CallOnConnStart(c) + for { select { case <-c.ExitBuffChan: @@ -132,17 +159,22 @@ func (c *Connection) Stop() { return } c.IsClosed = true + fmt.Println("Conn Stop()...ConnID =", c.ConnID) - // TODO 如果用户注册了该连接的关闭回调业务,那么在此刻应该显式调用 + // 如果用户注册了该连接的关闭回调业务,那么在此刻应该显式调用 + c.TcpServer.CallOnConnStop(c) // 关闭 Socket 连接 _ = c.Conn.Close() - // 通知从缓冲队列读取数据的业务,该连接已经关闭 c.ExitBuffChan <- struct{}{} + // 将连接从连接管理器中删除 + c.TcpServer.GetConnMgr().Remove(c) + // 关闭该连接的全部管道 close(c.ExitBuffChan) + close(c.msgBuffChan) } // GetTCPConnection 获取当前连接的 TCP Socket 套接字 @@ -179,3 +211,22 @@ func (c *Connection) SendMsg(msgID uint32, data []byte) error { return nil } + +func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error { + if c.IsClosed { + return fmt.Errorf("connection closed when send buff msg") + } + + // 将 data 封包,并且发送 + dp := NewDataPack() + binaryMsg, err := dp.Pack(NewMsgPackage(msgID, data)) + if err != nil { + fmt.Println("Pack error msg id =", msgID) + return fmt.Errorf("pack error msg id =%d, err => %s", msgID, err.Error()) + } + + // 写回客户端 + c.msgBuffChan <- binaryMsg + + return nil +} diff --git a/zinx/znet/connmanager.go b/zinx/znet/connmanager.go new file mode 100644 index 0000000..548ca60 --- /dev/null +++ b/zinx/znet/connmanager.go @@ -0,0 +1,76 @@ +package znet + +import ( + "fmt" + "go-study/zinx/ziface" + "sync" +) + +type ConnManager struct { + // 管理的连接集合 + connections map[uint32]ziface.IConnection + // 保护连接集合的读写锁 + connLock sync.RWMutex +} + +// NewConnManager 创建一个连接管理器 +func NewConnManager() *ConnManager { + return &ConnManager{ + connections: make(map[uint32]ziface.IConnection), + } +} + +// Add 添加连接 +func (cm *ConnManager) Add(conn ziface.IConnection) { + cm.connLock.Lock() + defer cm.connLock.Unlock() + + // 将连接添加到连接集合中 + cm.connections[conn.GetConnID()] = conn + + fmt.Println("connection add to ConnManager successfully: connID =", conn.GetConnID(), " now conn num =", cm.Len()) +} + +// Remove 删除连接 +func (cm *ConnManager) Remove(conn ziface.IConnection) { + cm.connLock.Lock() + defer cm.connLock.Unlock() + + // 删除连接 + delete(cm.connections, conn.GetConnID()) + + fmt.Println("connection remove from ConnManager successfully: connID =", conn.GetConnID(), " now conn num =", cm.Len()) +} + +// Get 根据连接ID获取连接 +func (cm *ConnManager) Get(connID uint32) (ziface.IConnection, error) { + cm.connLock.RLock() + defer cm.connLock.RUnlock() + + if conn, ok := cm.connections[connID]; ok { + return conn, nil + } else { + return nil, fmt.Errorf("connection not found: connID = %d", connID) + } +} + +// Len 获取当前连接总数 +func (cm *ConnManager) Len() int { + return len(cm.connections) +} + +// ClearConn 清除并终止所有连接 +func (cm *ConnManager) ClearConn() { + cm.connLock.Lock() + defer cm.connLock.Unlock() + + // 遍历并停止所有连接 + for connID, conn := range cm.connections { + // 停止连接 + conn.Stop() + // 删除连接 + delete(cm.connections, connID) + } + + fmt.Println("Clear all connections successfully: now conn num =", cm.Len()) +} diff --git a/zinx/znet/server.go b/zinx/znet/server.go index f5f99e4..47a5d30 100644 --- a/zinx/znet/server.go +++ b/zinx/znet/server.go @@ -16,8 +16,16 @@ type Server struct { IP string // Port 服务器监听的端口 Port int - // MsgHandler 该服务器的消息管理模块,用来绑定 MsgID 和对应的处理方法 - MsgHandler ziface.IMsgHandle + + // msgHandler 该服务器的消息管理模块,用来绑定 MsgID 和对应的处理方法 + msgHandler ziface.IMsgHandle + // connMgr 该服务器的连接管理器 + connMgr ziface.IConnManager + + // onConnStart 该服务器的连接创建时 Hook 函数 + onConnStart func(conn ziface.IConnection) + // onConnStop 该服务器的连接断开时的 Hook 函数 + onConnStop func(conn ziface.IConnection) } // NewServer 创建一个服务器句柄 @@ -28,7 +36,8 @@ func NewServer() ziface.IServer { IPVersion: "tcp4", IP: utils.ConfigInstance.Host, Port: utils.ConfigInstance.TcpPort, - MsgHandler: NewMsgHandle(), + msgHandler: NewMsgHandle(), + connMgr: NewConnManager(), } } @@ -42,7 +51,7 @@ func (s *Server) Start() { go func() { // 0. 启动消息队列及 worker 工作池 - s.MsgHandler.StartWorkerPool() + s.msgHandler.StartWorkerPool() // 1. 获取一个 TCP 的 Addr addr, err := net.ResolveTCPAddr(s.IPVersion, fmt.Sprintf("%s:%d", s.IP, s.Port)) @@ -72,10 +81,14 @@ func (s *Server) Start() { continue } - // 3.2 TODO 设置服务器最大连接数,如果超过最大连接数则关闭此新连接 + // 3.2 设置服务器最大连接数,如果超过最大连接数则关闭此新连接 + if s.connMgr.Len() >= utils.ConfigInstance.MaxConn { + _ = conn.Close() + continue + } // 3.3 处理该新连接请求的业务方法,此时 handler 和 conn 应该是绑定的 - dealConn := NewConnection(conn, cid, s.MsgHandler) + dealConn := NewConnection(s, conn, cid, s.msgHandler) cid++ // 3.4 启动当前连接的业务处理 @@ -88,7 +101,8 @@ func (s *Server) Start() { func (s *Server) Stop() { fmt.Println("[STOP] Zinx server, name ", s.Name) - // TODO 将需要清理的连接信息或者其他信息一并停止或者清理 + // 将需要清理的连接信息或者其他信息一并停止或者清理 + s.connMgr.ClearConn() } // Serve 运行服务器 @@ -103,6 +117,37 @@ func (s *Server) Serve() { // AddRouter 为当前服务注册一个路由方法,供客户端连接处理使用 func (s *Server) AddRouter(msgID uint32, router ziface.IRouter) { - s.MsgHandler.AddRouter(msgID, router) + s.msgHandler.AddRouter(msgID, router) fmt.Println("Add Router Succ!") } + +// GetConnMgr 获取连接管理器 +func (s *Server) GetConnMgr() ziface.IConnManager { + return s.connMgr +} + +// SetOnConnStart 设置该服务器的连接创建时的钩子函数 +func (s *Server) SetOnConnStart(hookFunc func(conn ziface.IConnection)) { + s.onConnStart = hookFunc +} + +// SetOnConnStop 设置该服务器的连接断开时的钩子函数 +func (s *Server) SetOnConnStop(hookFunc func(conn ziface.IConnection)) { + s.onConnStop = hookFunc +} + +// CallOnConnStart 调用连接创建时的钩子函数 +func (s *Server) CallOnConnStart(conn ziface.IConnection) { + if s.onConnStart != nil { + fmt.Println("-----> Call OnConnStart()...") + s.onConnStart(conn) + } +} + +// CallOnConnStop 调用连接断开时的钩子函数 +func (s *Server) CallOnConnStop(conn ziface.IConnection) { + if s.onConnStop != nil { + fmt.Println("-----> Call OnConnStop()...") + s.onConnStop(conn) + } +} diff --git a/zinx/znet/server_test.go b/zinx/znet/server_test.go index ce31bb8..316d618 100644 --- a/zinx/znet/server_test.go +++ b/zinx/znet/server_test.go @@ -40,6 +40,20 @@ func (h *HelloZinxRouter) Handle(request ziface.IRequest) { } } +// DoConnectionBegin 创建连接之后执行的钩子函数 +func DoConnectionBegin(conn ziface.IConnection) { + fmt.Println("DoConnectionBegin is Called ... ") + if err := conn.SendMsg(2, []byte("DoConnection BEGIN")); err != nil { + fmt.Println("DoConnectionBegin error:", err) + } +} + +// DoConnectionLost 连接断开之前执行的钩子函数 +func DoConnectionLost(conn ziface.IConnection) { + fmt.Println("DoConnectionLost is Called ... ") + fmt.Println("conn ID =", conn.GetConnID(), " is lost...") +} + // ClientTest0 模拟客户端0 func ClientTest0() { fmt.Println("Client Test... start") @@ -151,6 +165,10 @@ func TestServer(t *testing.T) { // 创建一个 Server 句柄 s := NewServer() + // 设置连接创建和销毁的钩子函数 + s.SetOnConnStart(DoConnectionBegin) + s.SetOnConnStop(DoConnectionLost) + // 给当前 Zinx 框架添加自定义的 Router s.AddRouter(0, &PingRouter{}) s.AddRouter(1, &HelloZinxRouter{})