diff --git a/zinx/utils/config.go b/zinx/utils/config.go index f54273e..be22dc4 100644 --- a/zinx/utils/config.go +++ b/zinx/utils/config.go @@ -54,7 +54,7 @@ func init() { // 初始化全局配置,并设置默认值 ConfigInstance = &Config{ Name: "ZinxServerApp", - Version: "V0.4", + Version: "V0.5", Host: "0.0.0.0", TcpPort: 7777, MaxConn: 12000, diff --git a/zinx/ziface/iconnection.go b/zinx/ziface/iconnection.go index 4265350..1f06fa7 100644 --- a/zinx/ziface/iconnection.go +++ b/zinx/ziface/iconnection.go @@ -13,6 +13,8 @@ type IConnection interface { GetConnID() uint32 // RemoteAddr 获取远程客户端地址信息 RemoteAddr() net.Addr + // SendMsg 发送数据,将数据发送给远程的客户端 + SendMsg(msgID uint32, data []byte) error } // HandleFunc 定义一个统一处理连接业务的方法 diff --git a/zinx/ziface/idatapack.go b/zinx/ziface/idatapack.go new file mode 100644 index 0000000..4505942 --- /dev/null +++ b/zinx/ziface/idatapack.go @@ -0,0 +1,10 @@ +package ziface + +type IDataPack interface { + // GetHeadLen 获取包头长度方法 + GetHeadLen() uint32 + // Pack 封包方法(压缩数据) + Pack(msg IMessage) ([]byte, error) + // Unpack 拆包方法(解压数据) + Unpack(binaryData []byte) (IMessage, error) +} diff --git a/zinx/ziface/imessage.go b/zinx/ziface/imessage.go new file mode 100644 index 0000000..2f6f5e1 --- /dev/null +++ b/zinx/ziface/imessage.go @@ -0,0 +1,15 @@ +package ziface + +type IMessage interface { + // GetMsgID 获取消息ID + GetMsgID() uint32 + // GetDataLen 获取消息长度 + GetDataLen() uint32 + // GetData 获取消息内容 + GetData() []byte + + // SetMsgID 设置消息ID + SetMsgID(uint32) + // SetData 设置消息内容 + SetData([]byte) +} diff --git a/zinx/ziface/irequest.go b/zinx/ziface/irequest.go index 5a6511c..fa31185 100644 --- a/zinx/ziface/irequest.go +++ b/zinx/ziface/irequest.go @@ -3,6 +3,8 @@ package ziface type IRequest interface { // GetConnection 获取请求连接信息 GetConnection() IConnection + // GetMsgID 获取请求消息ID + GetMsgID() uint32 // GetData 获取请求消息数据 GetData() []byte } diff --git a/zinx/znet/connection.go b/zinx/znet/connection.go index 77d0e37..15dbbd7 100644 --- a/zinx/znet/connection.go +++ b/zinx/znet/connection.go @@ -39,18 +39,41 @@ func (c *Connection) StartReader() { defer c.Stop() for { - // 读取客户端数据到buf中,最大512字节 - buf := make([]byte, 512) - cnt, err := c.Conn.Read(buf) - if err != nil { - fmt.Println("recv buf err", err) + // 创建拆包解包对象 + dp := NewDataPack() + + // 读取客户端的 Msg Head 二进制流 8 字节 + headData := make([]byte, dp.GetHeadLen()) + if _, err := c.Conn.Read(headData); err != nil { + fmt.Println("read msg head error:", err) c.ExitBuffChan <- struct{}{} continue } + // 拆包,得到 MsgID 和 DataLen 放在 msg 中 + msg, err := dp.Unpack(headData) + if err != nil { + fmt.Println("unpack error:", err) + c.ExitBuffChan <- struct{}{} + continue + } + + // 根据 DataLen 再读取 Data,放在 msg.Data 中 + var data []byte + if msg.GetDataLen() > 0 { + data = make([]byte, msg.GetDataLen()) + if _, err := c.Conn.Read(data); err != nil { + fmt.Println("read msg data error:", err) + c.ExitBuffChan <- struct{}{} + continue + } + } + msg.SetData(data) + + // 得到当前客户端请求的 Request 请求数据 req := Request{ conn: c, - data: buf[:cnt], + msg: msg, // msg 封装在 Request 中 } // 从路由中找到注册绑定的 Conn 对应的 router 调用 go func(request ziface.IRequest) { @@ -109,3 +132,27 @@ func (c *Connection) GetConnID() uint32 { func (c *Connection) RemoteAddr() net.Addr { return c.Conn.RemoteAddr() } + +// SendMsg 发送数据,将数据发送给远程的客户端 +func (c *Connection) SendMsg(msgID uint32, data []byte) error { + if c.IsClosed { + return fmt.Errorf("connection closed when send msg") + } + + // 将 data 封包,并且发送 + dp := NewDataPack() + binaryMsg, err := dp.Pack(NewMsgPackage(msgID, data)) + if err != nil { + fmt.Println("Pack error msg id =", msgID) + return fmt.Errorf("pack error msg id =%d, err => %s", msgID, err.Error()) + } + + // 写回客户端 + if _, err := c.Conn.Write(binaryMsg); err != nil { + fmt.Println("Write msg id", msgID, "error:", err) + c.ExitBuffChan <- struct{}{} + return fmt.Errorf("write msg id =%d, err => %s", msgID, err.Error()) + } + + return nil +} diff --git a/zinx/znet/datapack.go b/zinx/znet/datapack.go new file mode 100644 index 0000000..53a8c60 --- /dev/null +++ b/zinx/znet/datapack.go @@ -0,0 +1,71 @@ +package znet + +import ( + "bytes" + "encoding/binary" + "errors" + "go-study/zinx/utils" + "go-study/zinx/ziface" +) + +type DataPack struct{} + +func NewDataPack() *DataPack { + return &DataPack{} +} + +// GetHeadLen 获取包头长度方法 +func (dp *DataPack) GetHeadLen() uint32 { + // ID uint32(4字节) + DataLen uint32(4字节) + return 8 +} + +// Pack 封包方法(压缩数据) +func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) { + // 创建一个存放二进制数据的缓冲 + dataBuff := bytes.NewBuffer([]byte{}) + + // 写 ID + if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgID()); err != nil { + return nil, err + } + + // 写 DataLen + if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil { + return nil, err + } + + // 写 Data + if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil { + return nil, err + } + + return dataBuff.Bytes(), nil +} + +// Unpack 拆包方法(解压数据) +func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) { + // 创建一个从输入二进制数据的 ioReader + dataBuff := bytes.NewReader(binaryData) + + // 只解压 head 信息,得到 ID 和 DataLen + msg := &Message{} + + // 读 ID + if err := binary.Read(dataBuff, binary.LittleEndian, &msg.id); err != nil { + return nil, err + } + + // 读 DataLen + if err := binary.Read(dataBuff, binary.LittleEndian, &msg.dataLen); err != nil { + return nil, err + } + + // 判断 DataLen 是否超出我们允许的最大包长度 + if utils.ConfigInstance.MaxPacketSize > 0 && msg.dataLen > utils.ConfigInstance.MaxPacketSize { + return nil, errors.New("too large msg data received") + } + + // 这里只需把 head 信息拆包出来即可,然后通过 head 中的长度再从 conn 读取一次数据 + return msg, nil +} diff --git a/zinx/znet/message.go b/zinx/znet/message.go new file mode 100644 index 0000000..9830bb3 --- /dev/null +++ b/zinx/znet/message.go @@ -0,0 +1,45 @@ +package znet + +type Message struct { + // 消息 ID + id uint32 + // 消息长度 + dataLen uint32 + // 消息内容 + data []byte +} + +// NewMsgPackage 创建一个 Message 消息包 +func NewMsgPackage(id uint32, data []byte) *Message { + return &Message{ + id: id, + dataLen: uint32(len(data)), + data: data, + } +} + +// GetMsgID 获取消息 ID +func (m *Message) GetMsgID() uint32 { + return m.id +} + +// GetDataLen 获取消息长度 +func (m *Message) GetDataLen() uint32 { + return m.dataLen +} + +// GetData 获取消息内容 +func (m *Message) GetData() []byte { + return m.data +} + +// SetMsgID 设置消息 ID +func (m *Message) SetMsgID(id uint32) { + m.id = id +} + +// SetData 设置消息内容(自动同步长度) +func (m *Message) SetData(data []byte) { + m.data = data + m.dataLen = uint32(len(data)) +} diff --git a/zinx/znet/request.go b/zinx/znet/request.go index 562d183..aa8087a 100644 --- a/zinx/znet/request.go +++ b/zinx/znet/request.go @@ -8,7 +8,7 @@ type Request struct { // 已经和客户端建立好的连接 conn ziface.IConnection // 客户端请求的数据 - data []byte + msg ziface.IMessage } // GetConnection 获取请求连接信息 @@ -16,7 +16,12 @@ func (req *Request) GetConnection() ziface.IConnection { return req.conn } +// GetMsgID 获取请求消息ID +func (req *Request) GetMsgID() uint32 { + return req.msg.GetMsgID() +} + // GetData 获取请求消息数据 func (req *Request) GetData() []byte { - return req.data + return req.msg.GetData() } diff --git a/zinx/znet/server_test.go b/zinx/znet/server_test.go index 3c65db9..b9aa8d0 100644 --- a/zinx/znet/server_test.go +++ b/zinx/znet/server_test.go @@ -12,27 +12,15 @@ type PingRouter struct { BaseRouter } -func (pr *PingRouter) PreHandle(request ziface.IRequest) { - fmt.Println("Call Router PreHandle") - _, err := request.GetConnection().GetTCPConnection().Write([]byte("before ping...\n")) - if err != nil { - fmt.Println("call back before ping error") - } -} - func (pr *PingRouter) Handle(request ziface.IRequest) { fmt.Println("Call PingRouter Handle") - _, err := request.GetConnection().GetTCPConnection().Write([]byte("ping...ping...ping\n")) - if err != nil { - fmt.Println("call back ping error") - } -} + // 先读取客户端的数据,再回写ping...ping...ping + fmt.Println("recv from client: msgID=", request.GetMsgID(), ", data=", string(request.GetData())) -func (pr *PingRouter) PostHandle(request ziface.IRequest) { - fmt.Println("Call Router PostHandle") - _, err := request.GetConnection().GetTCPConnection().Write([]byte("after ping...\n")) + // 回写数据 + err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping\n")) if err != nil { - fmt.Println("call back after ping error") + fmt.Println("call back ping error:", err.Error()) } } @@ -49,20 +37,41 @@ func ClientTest() { } for { - _, err := conn.Write([]byte("hello Zinx")) - if err != nil { + // 发送封包数据 + dp := NewDataPack() + msg, _ := dp.Pack(NewMsgPackage(0, []byte("Zinx V0.5 Client Test Message"))) + if _, err := conn.Write(msg); err != nil { fmt.Println("write error:", err) return } - buf := make([]byte, 512) - cnt, err := conn.Read(buf) + // 服务器回复一个数据,先读出流中的 head 部分 + headData := make([]byte, dp.GetHeadLen()) + if _, err := conn.Read(headData); err != nil { + fmt.Println("read head error:", err) + break + } + + // 拆包,得到 msgID 和 dataLen 放在 msg 中 + msgHead, err := dp.Unpack(headData) if err != nil { - fmt.Println("read buf error:", err) + fmt.Println("unpack head error:", err) return } - fmt.Printf("server call back: %s, cnt = %d\n", string(buf[:cnt]), cnt) + if msgHead.GetDataLen() > 0 { + // msg 是有数据的,需要再次读取 dataLen 个字节的数据 + msg := msgHead.(*Message) + msg.data = make([]byte, msg.GetDataLen()) + + // 根据 dataLen 的长度再次从io中读取 + if _, err := conn.Read(msg.data); err != nil { + fmt.Println("read msg data error:", err) + return + } + + fmt.Println("==> Recv Server Msg: ID=", msg.GetMsgID(), ", len=", msg.GetDataLen(), ", data=", string(msg.GetData())) + } time.Sleep(1 * time.Second) }