diff --git a/zinx/ziface/iconnection.go b/zinx/ziface/iconnection.go new file mode 100644 index 0000000..4265350 --- /dev/null +++ b/zinx/ziface/iconnection.go @@ -0,0 +1,19 @@ +package ziface + +import "net" + +type IConnection interface { + // Start 启动连接 + Start() + // Stop 停止连接 + Stop() + // GetTCPConnection 获取当前连接的 TCP Socket 套接字 + GetTCPConnection() *net.TCPConn + // GetConnID 获取当前连接 ID + GetConnID() uint32 + // RemoteAddr 获取远程客户端地址信息 + RemoteAddr() net.Addr +} + +// HandleFunc 定义一个统一处理连接业务的方法 +type HandleFunc func(*net.TCPConn, []byte, int) error diff --git a/zinx/znet/connection.go b/zinx/znet/connection.go new file mode 100644 index 0000000..3e91d2f --- /dev/null +++ b/zinx/znet/connection.go @@ -0,0 +1,105 @@ +package znet + +import ( + "fmt" + "go-study/zinx/ziface" + "net" +) + +type Connection struct { + // 该连接的 TCP Socket 套接字 + Conn *net.TCPConn + // 连接 ID,全局唯一 + ConnID uint32 + // 该连接的关闭状态 + IsClosed bool + + // 该连接的处理方法 + HandleFunc ziface.HandleFunc + + // 该连接的退出消息通知 channel + ExitBuffChan chan struct{} +} + +// NewConnection 创建连接的方法 +func NewConnection(conn *net.TCPConn, connID uint32, handleFunc ziface.HandleFunc) ziface.IConnection { + return &Connection{ + Conn: conn, + ConnID: connID, + IsClosed: false, + HandleFunc: handleFunc, + ExitBuffChan: make(chan struct{}, 1), + } +} + +// StartReader 处理连接读数据的业务方法 +func (c *Connection) StartReader() { + fmt.Println("Reader Goroutine is running") + defer fmt.Println("connID =", c.ConnID, "Reader is exit, remote addr is", c.RemoteAddr().String()) + defer c.Stop() + + for { + // 读取客户端数据到buf中,最大512字节 + buf := make([]byte, 512) + cnt, err := c.Conn.Read(buf) + if err != nil { + fmt.Println("recv buf err", err) + c.ExitBuffChan <- struct{}{} + continue + } + // 调用当前连接业务(这里执行的是当前连接绑定的 handle 方法) + if err := c.HandleFunc(c.Conn, buf, cnt); err != nil { + fmt.Println("ConnID =", c.ConnID, "handle is error", err) + c.ExitBuffChan <- struct{}{} + continue + } + } +} + +// Start 启动连接,让当前连接开始工作 +func (c *Connection) Start() { + // 启动当前连接的读数据业务 + go c.StartReader() + + for { + select { + case <-c.ExitBuffChan: + // 得到退出消息,不再阻塞 + return + } + } +} + +// Stop 停止连接,结束当前连接状态 +func (c *Connection) Stop() { + if c.IsClosed { + return + } + c.IsClosed = true + + // TODO 如果用户注册了该连接的关闭回调业务,那么在此刻应该显式调用 + + // 关闭 Socket 连接 + _ = c.Conn.Close() + + // 通知从缓冲队列读取数据的业务,该连接已经关闭 + c.ExitBuffChan <- struct{}{} + + // 关闭该连接的全部管道 + close(c.ExitBuffChan) +} + +// GetTCPConnection 获取当前连接的 TCP Socket 套接字 +func (c *Connection) GetTCPConnection() *net.TCPConn { + return c.Conn +} + +// GetConnID 获取当前连接 ID +func (c *Connection) GetConnID() uint32 { + return c.ConnID +} + +// RemoteAddr 获取远程客户端地址信息 +func (c *Connection) RemoteAddr() net.Addr { + return c.Conn.RemoteAddr() +} diff --git a/zinx/znet/server.go b/zinx/znet/server.go index 6dee915..a6b43df 100644 --- a/zinx/znet/server.go +++ b/zinx/znet/server.go @@ -48,6 +48,9 @@ func (s *Server) Start() { fmt.Println("start Zinx server ", s.Name, " succ, now listening...") + // TODO 应该有一个自动生成 ID 的方法 + cid := uint32(0) + // 3. 阻塞等待客户端连接,处理客户端连接业务(读写) for { // 3.1 阻塞等待客户端连接,AcceptTCP 会阻塞 @@ -59,26 +62,12 @@ func (s *Server) Start() { // 3.2 TODO 设置服务器最大连接数,如果超过最大连接数则关闭此新连接 - // 3.3 TODO 处理该新连接请求的业务方法,此时 handler 和 conn 应该是绑定的 + // 3.3 处理该新连接请求的业务方法,此时 handler 和 conn 应该是绑定的 + dealConn := NewConnection(conn, cid, CallbackToClient) + cid++ - // 这里暂时做一个最大 512 字节的回显服务 - go func() { - // 不断的循环,从客户端获取数据 - for { - buf := make([]byte, 512) - cnt, err := conn.Read(buf) - if err != nil { - fmt.Println("recv buf err", err) - continue - } - - // 回显 - if _, err := conn.Write(buf[:cnt]); err != nil { - fmt.Println("write back buf err", err) - continue - } - } - }() + // 3.4 启动当前连接的业务处理 + go dealConn.Start() } }() } @@ -99,3 +88,13 @@ func (s *Server) Serve() { // 阻塞,否则主 Go 程退出,listener 会退出 select {} } + +// CallbackToClient 定义当前客户端连接的业务处理方法 +func CallbackToClient(conn *net.TCPConn, data []byte, cnt int) error { + // 回显业务 + if _, err := conn.Write(data[:cnt]); err != nil { + fmt.Println("write back buf err", err) + return fmt.Errorf("CallbackToClient error: %v", err) + } + return nil +}