233 lines
5.8 KiB
Go
233 lines
5.8 KiB
Go
package znet
|
||
|
||
import (
|
||
"fmt"
|
||
"go-study/zinx/utils"
|
||
"go-study/zinx/ziface"
|
||
"net"
|
||
)
|
||
|
||
type Connection struct {
|
||
// 当前连接属于哪个服务器
|
||
TcpServer ziface.IServer
|
||
// 该连接的 TCP Socket 套接字
|
||
Conn *net.TCPConn
|
||
// 连接 ID,全局唯一
|
||
ConnID uint32
|
||
// 该连接的关闭状态
|
||
IsClosed bool
|
||
|
||
// 消息管理模块,绑定该连接的 MsgID 和对应的处理方法
|
||
MsgHandler ziface.IMsgHandle
|
||
|
||
// 该连接的退出消息通知 channel
|
||
ExitBuffChan chan struct{}
|
||
// 无缓冲管道,用于读、写两个 Goroutine 之间的消息通信
|
||
msgChan chan []byte
|
||
// 有缓冲管道,用于读、写两个 Goroutine 之间的消息通信
|
||
msgBuffChan chan []byte
|
||
}
|
||
|
||
// NewConnection 创建连接的方法
|
||
func NewConnection(server ziface.IServer, conn *net.TCPConn, connID uint32, msgHandler ziface.IMsgHandle) *Connection {
|
||
c := &Connection{
|
||
TcpServer: server,
|
||
Conn: conn,
|
||
ConnID: connID,
|
||
IsClosed: false,
|
||
MsgHandler: msgHandler,
|
||
ExitBuffChan: make(chan struct{}, 1),
|
||
msgChan: make(chan []byte),
|
||
msgBuffChan: make(chan []byte, utils.ConfigInstance.MaxMsgChanLen),
|
||
}
|
||
|
||
// 将新创建的连接添加到连接管理器中
|
||
c.TcpServer.GetConnMgr().Add(c)
|
||
return c
|
||
}
|
||
|
||
// StartReader 处理连接读数据的业务方法
|
||
func (c *Connection) StartReader() {
|
||
fmt.Println("Reader Goroutine is running")
|
||
defer fmt.Println("connID =", c.ConnID, "Reader is exit, remote addr is", c.RemoteAddr().String())
|
||
defer c.Stop()
|
||
|
||
for {
|
||
// 创建拆包解包对象
|
||
dp := NewDataPack()
|
||
|
||
// 读取客户端的 Msg Head 二进制流 8 字节
|
||
headData := make([]byte, dp.GetHeadLen())
|
||
if _, err := c.Conn.Read(headData); err != nil {
|
||
fmt.Println("read msg head error:", err)
|
||
c.ExitBuffChan <- struct{}{}
|
||
continue
|
||
}
|
||
|
||
// 拆包,得到 MsgID 和 DataLen 放在 msg 中
|
||
msg, err := dp.Unpack(headData)
|
||
if err != nil {
|
||
fmt.Println("unpack error:", err)
|
||
c.ExitBuffChan <- struct{}{}
|
||
continue
|
||
}
|
||
|
||
// 根据 DataLen 再读取 Data,放在 msg.Data 中
|
||
var data []byte
|
||
if msg.GetDataLen() > 0 {
|
||
data = make([]byte, msg.GetDataLen())
|
||
if _, err := c.Conn.Read(data); err != nil {
|
||
fmt.Println("read msg data error:", err)
|
||
c.ExitBuffChan <- struct{}{}
|
||
continue
|
||
}
|
||
}
|
||
msg.SetData(data)
|
||
|
||
// 得到当前客户端请求的 Request 请求数据
|
||
req := Request{
|
||
conn: c,
|
||
msg: msg, // msg 封装在 Request 中
|
||
}
|
||
|
||
if utils.ConfigInstance.WorkerPoolSize > 0 {
|
||
// 已经开启了工作池机制,将消息交给 Worker 处理
|
||
c.MsgHandler.SendMsgToTaskQueue(&req)
|
||
} else {
|
||
// 从路由中找到注册绑定的 Conn 对应的 router 调用
|
||
go c.MsgHandler.DoMsgHandler(&req)
|
||
}
|
||
}
|
||
}
|
||
|
||
// StartWriter 处理连接写数据的业务方法
|
||
func (c *Connection) StartWriter() {
|
||
fmt.Println("Writer Goroutine is running")
|
||
defer fmt.Println(c.RemoteAddr().String(), "conn Writer exit!")
|
||
|
||
for {
|
||
select {
|
||
case data := <-c.msgChan:
|
||
// 有数据要写回客户端
|
||
if _, err := c.Conn.Write(data); err != nil {
|
||
fmt.Println("Send data error:", err)
|
||
return
|
||
}
|
||
case data, ok := <-c.msgBuffChan:
|
||
// 针对有缓冲通道需要进行数据处理
|
||
if ok {
|
||
// 有数据要写回客户端
|
||
if _, err := c.Conn.Write(data); err != nil {
|
||
fmt.Println("Send buff data error:", err)
|
||
return
|
||
}
|
||
} else {
|
||
// 读取不到数据,msgBuffChan 被关闭了
|
||
fmt.Println("msgBuffChan is closed")
|
||
break
|
||
}
|
||
|
||
case <-c.ExitBuffChan:
|
||
// 连接已经关闭
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// Start 启动连接,让当前连接开始工作
|
||
func (c *Connection) Start() {
|
||
// 启动当前连接的读数据业务
|
||
go c.StartReader()
|
||
// 启动当前连接的写数据业务
|
||
go c.StartWriter()
|
||
|
||
// 按照用户传递进来的创建连接时需要处理的业务,执行钩子方法
|
||
c.TcpServer.CallOnConnStart(c)
|
||
|
||
for {
|
||
select {
|
||
case <-c.ExitBuffChan:
|
||
// 得到退出消息,不再阻塞
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
// Stop 停止连接,结束当前连接状态
|
||
func (c *Connection) Stop() {
|
||
if c.IsClosed {
|
||
return
|
||
}
|
||
c.IsClosed = true
|
||
fmt.Println("Conn Stop()...ConnID =", c.ConnID)
|
||
|
||
// 如果用户注册了该连接的关闭回调业务,那么在此刻应该显式调用
|
||
c.TcpServer.CallOnConnStop(c)
|
||
|
||
// 关闭 Socket 连接
|
||
_ = c.Conn.Close()
|
||
// 通知从缓冲队列读取数据的业务,该连接已经关闭
|
||
c.ExitBuffChan <- struct{}{}
|
||
|
||
// 将连接从连接管理器中删除
|
||
c.TcpServer.GetConnMgr().Remove(c)
|
||
|
||
// 关闭该连接的全部管道
|
||
close(c.ExitBuffChan)
|
||
close(c.msgBuffChan)
|
||
}
|
||
|
||
// GetTCPConnection 获取当前连接的 TCP Socket 套接字
|
||
func (c *Connection) GetTCPConnection() *net.TCPConn {
|
||
return c.Conn
|
||
}
|
||
|
||
// GetConnID 获取当前连接 ID
|
||
func (c *Connection) GetConnID() uint32 {
|
||
return c.ConnID
|
||
}
|
||
|
||
// RemoteAddr 获取远程客户端地址信息
|
||
func (c *Connection) RemoteAddr() net.Addr {
|
||
return c.Conn.RemoteAddr()
|
||
}
|
||
|
||
// SendMsg 发送数据,将数据发送给远程的客户端
|
||
func (c *Connection) SendMsg(msgID uint32, data []byte) error {
|
||
if c.IsClosed {
|
||
return fmt.Errorf("connection closed when send msg")
|
||
}
|
||
|
||
// 将 data 封包,并且发送
|
||
dp := NewDataPack()
|
||
binaryMsg, err := dp.Pack(NewMsgPackage(msgID, data))
|
||
if err != nil {
|
||
fmt.Println("Pack error msg id =", msgID)
|
||
return fmt.Errorf("pack error msg id =%d, err => %s", msgID, err.Error())
|
||
}
|
||
|
||
// 写回客户端
|
||
c.msgChan <- binaryMsg
|
||
|
||
return nil
|
||
}
|
||
|
||
func (c *Connection) SendBuffMsg(msgID uint32, data []byte) error {
|
||
if c.IsClosed {
|
||
return fmt.Errorf("connection closed when send buff msg")
|
||
}
|
||
|
||
// 将 data 封包,并且发送
|
||
dp := NewDataPack()
|
||
binaryMsg, err := dp.Pack(NewMsgPackage(msgID, data))
|
||
if err != nil {
|
||
fmt.Println("Pack error msg id =", msgID)
|
||
return fmt.Errorf("pack error msg id =%d, err => %s", msgID, err.Error())
|
||
}
|
||
|
||
// 写回客户端
|
||
c.msgBuffChan <- binaryMsg
|
||
|
||
return nil
|
||
}
|