package znet import ( "fmt" "go-study/zinx/ziface" "net" ) type Connection struct { // 该连接的 TCP Socket 套接字 Conn *net.TCPConn // 连接 ID,全局唯一 ConnID uint32 // 该连接的关闭状态 IsClosed bool // 消息管理模块,绑定该连接的 MsgID 和对应的处理方法 MsgHandler ziface.IMsgHandle // 该连接的退出消息通知 channel ExitBuffChan chan struct{} // 无缓冲管道,用于读、写两个 Goroutine 之间的消息通信 msgChan chan []byte } // NewConnection 创建连接的方法 func NewConnection(conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection { return &Connection{ Conn: conn, ConnID: connID, IsClosed: false, MsgHandler: msgHandler, ExitBuffChan: make(chan struct{}, 1), msgChan: make(chan []byte), } } // StartReader 处理连接读数据的业务方法 func (c *Connection) StartReader() { fmt.Println("Reader Goroutine is running") defer fmt.Println("connID =", c.ConnID, "Reader is exit, remote addr is", c.RemoteAddr().String()) defer c.Stop() for { // 创建拆包解包对象 dp := NewDataPack() // 读取客户端的 Msg Head 二进制流 8 字节 headData := make([]byte, dp.GetHeadLen()) if _, err := c.Conn.Read(headData); err != nil { fmt.Println("read msg head error:", err) c.ExitBuffChan <- struct{}{} continue } // 拆包,得到 MsgID 和 DataLen 放在 msg 中 msg, err := dp.Unpack(headData) if err != nil { fmt.Println("unpack error:", err) c.ExitBuffChan <- struct{}{} continue } // 根据 DataLen 再读取 Data,放在 msg.Data 中 var data []byte if msg.GetDataLen() > 0 { data = make([]byte, msg.GetDataLen()) if _, err := c.Conn.Read(data); err != nil { fmt.Println("read msg data error:", err) c.ExitBuffChan <- struct{}{} continue } } msg.SetData(data) // 得到当前客户端请求的 Request 请求数据 req := Request{ conn: c, msg: msg, // msg 封装在 Request 中 } // 从路由中找到注册绑定的 Conn 对应的 router 调用 go c.MsgHandler.DoMsgHandler(&req) } } // StartWriter 处理连接写数据的业务方法 func (c *Connection) StartWriter() { fmt.Println("Writer Goroutine is running") defer fmt.Println(c.RemoteAddr().String(), "conn Writer exit!") for { select { case data := <-c.msgChan: // 有数据要写回客户端 if _, err := c.Conn.Write(data); err != nil { fmt.Println("Send data error:", err) return } case <-c.ExitBuffChan: // 连接已经关闭 return } } } // Start 启动连接,让当前连接开始工作 func (c *Connection) Start() { // 启动当前连接的读数据业务 go c.StartReader() // 启动当前连接的写数据业务 go c.StartWriter() for { select { case <-c.ExitBuffChan: // 得到退出消息,不再阻塞 return } } } // Stop 停止连接,结束当前连接状态 func (c *Connection) Stop() { if c.IsClosed { return } c.IsClosed = true // TODO 如果用户注册了该连接的关闭回调业务,那么在此刻应该显式调用 // 关闭 Socket 连接 _ = c.Conn.Close() // 通知从缓冲队列读取数据的业务,该连接已经关闭 c.ExitBuffChan <- struct{}{} // 关闭该连接的全部管道 close(c.ExitBuffChan) } // GetTCPConnection 获取当前连接的 TCP Socket 套接字 func (c *Connection) GetTCPConnection() *net.TCPConn { return c.Conn } // GetConnID 获取当前连接 ID func (c *Connection) GetConnID() uint32 { return c.ConnID } // RemoteAddr 获取远程客户端地址信息 func (c *Connection) RemoteAddr() net.Addr { return c.Conn.RemoteAddr() } // SendMsg 发送数据,将数据发送给远程的客户端 func (c *Connection) SendMsg(msgID uint32, data []byte) error { if c.IsClosed { return fmt.Errorf("connection closed when send msg") } // 将 data 封包,并且发送 dp := NewDataPack() binaryMsg, err := dp.Pack(NewMsgPackage(msgID, data)) if err != nil { fmt.Println("Pack error msg id =", msgID) return fmt.Errorf("pack error msg id =%d, err => %s", msgID, err.Error()) } // 写回客户端 c.msgChan <- binaryMsg return nil }