go-study/zinx/znet/connection.go
2025-09-14 18:35:26 +08:00

249 lines
6.3 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package znet
import (
"context"
"fmt"
"go-study/zinx/utils"
"go-study/zinx/ziface"
"net"
)
type Connection struct {
// 当前连接属于哪个服务器
TcpServer ziface.IServer
// 该连接的 TCP Socket 套接字
Conn *net.TCPConn
// 连接 ID全局唯一
ConnID uint32
// 该连接的关闭状态
IsClosed bool
// 消息管理模块,绑定该连接的 MsgID 和对应的处理方法
MsgHandler ziface.IMsgHandle
// 该连接的退出消息通知 channel
ExitBuffChan chan struct{}
// 无缓冲管道,用于读、写两个 Goroutine 之间的消息通信
msgChan chan []byte
// 有缓冲管道,用于读、写两个 Goroutine 之间的消息通信
msgBuffChan chan []byte
// 当前连接的上下文,用于连接属性传递和生命周期管理
// Zinx 源代码中并没有使用该字段,而是通过一个 map[string]interface{} 属性集合来实现的
ctx context.Context
}
// NewConnection 创建连接的方法
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
c := &Connection{
TcpServer: server,
Conn: conn,
ConnID: connID,
IsClosed: false,
MsgHandler: msgHandler,
ExitBuffChan: make(chan struct{}, 1),
msgChan: make(chan []byte),
msgBuffChan: make(chan []byte, utils.ConfigInstance.MaxMsgChanLen),
}
// 将新创建的连接添加到连接管理器中
c.TcpServer.GetConnMgr().Add(c)
return c
}
// StartReader 处理连接读数据的业务方法
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println("connID =", c.ConnID, "Reader is exit, remote addr is", c.RemoteAddr().String())
defer c.Stop()
for {
// 创建拆包解包对象
dp := NewDataPack()
// 读取客户端的 Msg Head 二进制流 8 字节
headData := make([]byte, dp.GetHeadLen())
if _, err := c.Conn.Read(headData); err != nil {
fmt.Println("read msg head error:", err)
c.ExitBuffChan <- struct{}{}
continue
}
// 拆包,得到 MsgID 和 DataLen 放在 msg 中
msg, err := dp.Unpack(headData)
if err != nil {
fmt.Println("unpack error:", err)
c.ExitBuffChan <- struct{}{}
continue
}
// 根据 DataLen 再读取 Data放在 msg.Data 中
var data []byte
if msg.GetDataLen() > 0 {
data = make([]byte, msg.GetDataLen())
if _, err := c.Conn.Read(data); err != nil {
fmt.Println("read msg data error:", err)
c.ExitBuffChan <- struct{}{}
continue
}
}
msg.SetData(data)
// 得到当前客户端请求的 Request 请求数据
req := Request{
conn: c,
msg: msg, // msg 封装在 Request 中
}
if utils.ConfigInstance.WorkerPoolSize > 0 {
// 已经开启了工作池机制,将消息交给 Worker 处理
c.MsgHandler.SendMsgToTaskQueue(&req)
} else {
// 从路由中找到注册绑定的 Conn 对应的 router 调用
go c.MsgHandler.DoMsgHandler(&req)
}
}
}
// StartWriter 处理连接写数据的业务方法
func (c *Connection) StartWriter() {
fmt.Println("Writer Goroutine is running")
defer fmt.Println(c.RemoteAddr().String(), "conn Writer exit!")
for {
select {
case data := <-c.msgChan:
// 有数据要写回客户端
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send data error:", err)
return
}
case data, ok := <-c.msgBuffChan:
// 针对有缓冲通道需要进行数据处理
if ok {
// 有数据要写回客户端
if _, err := c.Conn.Write(data); err != nil {
fmt.Println("Send buff data error:", err)
return
}
} else {
// 读取不到数据msgBuffChan 被关闭了
fmt.Println("msgBuffChan is closed")
break
}
case <-c.ExitBuffChan:
// 连接已经关闭
return
}
}
}
// Start 启动连接,让当前连接开始工作
func (c *Connection) Start() {
// 启动当前连接的读数据业务
go c.StartReader()
// 启动当前连接的写数据业务
go c.StartWriter()
// 按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
c.TcpServer.CallOnConnStart(c)
for {
select {
case <-c.ExitBuffChan:
// 得到退出消息,不再阻塞
return
}
}
}
// Stop 停止连接,结束当前连接状态
func (c *Connection) Stop() {
if c.IsClosed {
return
}
c.IsClosed = true
fmt.Println("Conn Stop()...ConnID =", c.ConnID)
// 如果用户注册了该连接的关闭回调业务,那么在此刻应该显式调用
c.TcpServer.CallOnConnStop(c)
// 关闭 Socket 连接
_ = c.Conn.Close()
// 通知从缓冲队列读取数据的业务,该连接已经关闭
c.ExitBuffChan <- struct{}{}
// 将连接从连接管理器中删除
c.TcpServer.GetConnMgr().Remove(c)
// 关闭该连接的全部管道
close(c.ExitBuffChan)
close(c.msgBuffChan)
}
// GetTCPConnection 获取当前连接的 TCP Socket 套接字
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
// GetConnID 获取当前连接 ID
func (c *Connection) GetConnID() uint32 {
return c.ConnID
}
// RemoteAddr 获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
// SendMsg 发送数据,将数据发送给远程的客户端
func (c *Connection) SendMsg(msgID uint32, data []byte) error {
if c.IsClosed {
return fmt.Errorf("connection closed when send msg")
}
// 将 data 封包,并且发送
dp := NewDataPack()
binaryMsg, err := dp.Pack(NewMsgPackage(msgID, data))
if err != nil {
fmt.Println("Pack error msg id =", msgID)
return fmt.Errorf("pack error msg id =%d, err => %s", msgID, err.Error())
}
// 写回客户端
c.msgChan <- binaryMsg
return nil
}
// SendBuffMsg 发送有缓冲数据,将数据发送给远程的客户端
func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error {
if c.IsClosed {
return fmt.Errorf("connection closed when send buff msg")
}
// 将 data 封包,并且发送
dp := NewDataPack()
binaryMsg, err := dp.Pack(NewMsgPackage(msgID, data))
if err != nil {
fmt.Println("Pack error msg id =", msgID)
return fmt.Errorf("pack error msg id =%d, err => %s", msgID, err.Error())
}
// 写回客户端
c.msgBuffChan <- binaryMsg
return nil
}
// SetContext 设置当前连接的上下文
func (c *Connection) SetContext(ctx context.Context) {
c.ctx = ctx
}
// GetContext 获取当前连接的上下文
func (c *Connection) GetContext() context.Context {
return c.ctx
}