Zinx-V0.5

This commit is contained in:
fantasticbin 2025-09-14 15:52:04 +08:00
parent 0e6a026d47
commit 5fcd797aa7
10 changed files with 238 additions and 32 deletions

View File

@ -54,7 +54,7 @@ func init() {
// 初始化全局配置,并设置默认值
ConfigInstance = &Config{
Name: "ZinxServerApp",
Version: "V0.4",
Version: "V0.5",
Host: "0.0.0.0",
TcpPort: 7777,
MaxConn: 12000,

View File

@ -13,6 +13,8 @@ type IConnection interface {
GetConnID() uint32
// RemoteAddr 获取远程客户端地址信息
RemoteAddr() net.Addr
// SendMsg 发送数据,将数据发送给远程的客户端
SendMsg(msgID uint32, data []byte) error
}
// HandleFunc 定义一个统一处理连接业务的方法

10
zinx/ziface/idatapack.go Normal file
View File

@ -0,0 +1,10 @@
package ziface
type IDataPack interface {
// GetHeadLen 获取包头长度方法
GetHeadLen() uint32
// Pack 封包方法(压缩数据)
Pack(msg IMessage) ([]byte, error)
// Unpack 拆包方法(解压数据)
Unpack(binaryData []byte) (IMessage, error)
}

15
zinx/ziface/imessage.go Normal file
View File

@ -0,0 +1,15 @@
package ziface
type IMessage interface {
// GetMsgID 获取消息ID
GetMsgID() uint32
// GetDataLen 获取消息长度
GetDataLen() uint32
// GetData 获取消息内容
GetData() []byte
// SetMsgID 设置消息ID
SetMsgID(uint32)
// SetData 设置消息内容
SetData([]byte)
}

View File

@ -3,6 +3,8 @@ package ziface
type IRequest interface {
// GetConnection 获取请求连接信息
GetConnection() IConnection
// GetMsgID 获取请求消息ID
GetMsgID() uint32
// GetData 获取请求消息数据
GetData() []byte
}

View File

@ -39,18 +39,41 @@ func (c *Connection) StartReader() {
defer c.Stop()
for {
// 读取客户端数据到buf中最大512字节
buf := make([]byte, 512)
cnt, err := c.Conn.Read(buf)
if err != nil {
fmt.Println("recv buf err", err)
// 创建拆包解包对象
dp := NewDataPack()
// 读取客户端的 Msg Head 二进制流 8 字节
headData := make([]byte, dp.GetHeadLen())
if _, err := c.Conn.Read(headData); err != nil {
fmt.Println("read msg head error:", err)
c.ExitBuffChan <- struct{}{}
continue
}
// 拆包,得到 MsgID 和 DataLen 放在 msg 中
msg, err := dp.Unpack(headData)
if err != nil {
fmt.Println("unpack error:", err)
c.ExitBuffChan <- struct{}{}
continue
}
// 根据 DataLen 再读取 Data放在 msg.Data 中
var data []byte
if msg.GetDataLen() > 0 {
data = make([]byte, msg.GetDataLen())
if _, err := c.Conn.Read(data); err != nil {
fmt.Println("read msg data error:", err)
c.ExitBuffChan <- struct{}{}
continue
}
}
msg.SetData(data)
// 得到当前客户端请求的 Request 请求数据
req := Request{
conn: c,
data: buf[:cnt],
msg: msg, // msg 封装在 Request 中
}
// 从路由中找到注册绑定的 Conn 对应的 router 调用
go func(request ziface.IRequest) {
@ -109,3 +132,27 @@ func (c *Connection) GetConnID() uint32 {
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}
// SendMsg 发送数据,将数据发送给远程的客户端
func (c *Connection) SendMsg(msgID uint32, data []byte) error {
if c.IsClosed {
return fmt.Errorf("connection closed when send msg")
}
// 将 data 封包,并且发送
dp := NewDataPack()
binaryMsg, err := dp.Pack(NewMsgPackage(msgID, data))
if err != nil {
fmt.Println("Pack error msg id =", msgID)
return fmt.Errorf("pack error msg id =%d, err => %s", msgID, err.Error())
}
// 写回客户端
if _, err := c.Conn.Write(binaryMsg); err != nil {
fmt.Println("Write msg id", msgID, "error:", err)
c.ExitBuffChan <- struct{}{}
return fmt.Errorf("write msg id =%d, err => %s", msgID, err.Error())
}
return nil
}

71
zinx/znet/datapack.go Normal file
View File

@ -0,0 +1,71 @@
package znet
import (
"bytes"
"encoding/binary"
"errors"
"go-study/zinx/utils"
"go-study/zinx/ziface"
)
type DataPack struct{}
func NewDataPack() *DataPack {
return &DataPack{}
}
// GetHeadLen 获取包头长度方法
func (dp *DataPack) GetHeadLen() uint32 {
// ID uint32(4字节) + DataLen uint32(4字节)
return 8
}
// Pack 封包方法(压缩数据)
func (dp *DataPack) Pack(msg ziface.IMessage) ([]byte, error) {
// 创建一个存放二进制数据的缓冲
dataBuff := bytes.NewBuffer([]byte{})
// 写 ID
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetMsgID()); err != nil {
return nil, err
}
// 写 DataLen
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetDataLen()); err != nil {
return nil, err
}
// 写 Data
if err := binary.Write(dataBuff, binary.LittleEndian, msg.GetData()); err != nil {
return nil, err
}
return dataBuff.Bytes(), nil
}
// Unpack 拆包方法(解压数据)
func (dp *DataPack) Unpack(binaryData []byte) (ziface.IMessage, error) {
// 创建一个从输入二进制数据的 ioReader
dataBuff := bytes.NewReader(binaryData)
// 只解压 head 信息,得到 ID 和 DataLen
msg := &Message{}
// 读 ID
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.id); err != nil {
return nil, err
}
// 读 DataLen
if err := binary.Read(dataBuff, binary.LittleEndian, &msg.dataLen); err != nil {
return nil, err
}
// 判断 DataLen 是否超出我们允许的最大包长度
if utils.ConfigInstance.MaxPacketSize > 0 && msg.dataLen > utils.ConfigInstance.MaxPacketSize {
return nil, errors.New("too large msg data received")
}
// 这里只需把 head 信息拆包出来即可,然后通过 head 中的长度再从 conn 读取一次数据
return msg, nil
}

45
zinx/znet/message.go Normal file
View File

@ -0,0 +1,45 @@
package znet
type Message struct {
// 消息 ID
id uint32
// 消息长度
dataLen uint32
// 消息内容
data []byte
}
// NewMsgPackage 创建一个 Message 消息包
func NewMsgPackage(id uint32, data []byte) *Message {
return &Message{
id: id,
dataLen: uint32(len(data)),
data: data,
}
}
// GetMsgID 获取消息 ID
func (m *Message) GetMsgID() uint32 {
return m.id
}
// GetDataLen 获取消息长度
func (m *Message) GetDataLen() uint32 {
return m.dataLen
}
// GetData 获取消息内容
func (m *Message) GetData() []byte {
return m.data
}
// SetMsgID 设置消息 ID
func (m *Message) SetMsgID(id uint32) {
m.id = id
}
// SetData 设置消息内容(自动同步长度)
func (m *Message) SetData(data []byte) {
m.data = data
m.dataLen = uint32(len(data))
}

View File

@ -8,7 +8,7 @@ type Request struct {
// 已经和客户端建立好的连接
conn ziface.IConnection
// 客户端请求的数据
data []byte
msg ziface.IMessage
}
// GetConnection 获取请求连接信息
@ -16,7 +16,12 @@ func (req *Request) GetConnection() ziface.IConnection {
return req.conn
}
// GetMsgID 获取请求消息ID
func (req *Request) GetMsgID() uint32 {
return req.msg.GetMsgID()
}
// GetData 获取请求消息数据
func (req *Request) GetData() []byte {
return req.data
return req.msg.GetData()
}

View File

@ -12,27 +12,15 @@ type PingRouter struct {
BaseRouter
}
func (pr *PingRouter) PreHandle(request ziface.IRequest) {
fmt.Println("Call Router PreHandle")
_, err := request.GetConnection().GetTCPConnection().Write([]byte("before ping...\n"))
if err != nil {
fmt.Println("call back before ping error")
}
}
func (pr *PingRouter) Handle(request ziface.IRequest) {
fmt.Println("Call PingRouter Handle")
_, err := request.GetConnection().GetTCPConnection().Write([]byte("ping...ping...ping\n"))
if err != nil {
fmt.Println("call back ping error")
}
}
// 先读取客户端的数据再回写ping...ping...ping
fmt.Println("recv from client: msgID=", request.GetMsgID(), ", data=", string(request.GetData()))
func (pr *PingRouter) PostHandle(request ziface.IRequest) {
fmt.Println("Call Router PostHandle")
_, err := request.GetConnection().GetTCPConnection().Write([]byte("after ping...\n"))
// 回写数据
err := request.GetConnection().SendMsg(1, []byte("ping...ping...ping\n"))
if err != nil {
fmt.Println("call back after ping error")
fmt.Println("call back ping error:", err.Error())
}
}
@ -49,20 +37,41 @@ func ClientTest() {
}
for {
_, err := conn.Write([]byte("hello Zinx"))
if err != nil {
// 发送封包数据
dp := NewDataPack()
msg, _ := dp.Pack(NewMsgPackage(0, []byte("Zinx V0.5 Client Test Message")))
if _, err := conn.Write(msg); err != nil {
fmt.Println("write error:", err)
return
}
buf := make([]byte, 512)
cnt, err := conn.Read(buf)
// 服务器回复一个数据,先读出流中的 head 部分
headData := make([]byte, dp.GetHeadLen())
if _, err := conn.Read(headData); err != nil {
fmt.Println("read head error:", err)
break
}
// 拆包,得到 msgID 和 dataLen 放在 msg 中
msgHead, err := dp.Unpack(headData)
if err != nil {
fmt.Println("read buf error:", err)
fmt.Println("unpack head error:", err)
return
}
fmt.Printf("server call back: %s, cnt = %d\n", string(buf[:cnt]), cnt)
if msgHead.GetDataLen() > 0 {
// msg 是有数据的,需要再次读取 dataLen 个字节的数据
msg := msgHead.(*Message)
msg.data = make([]byte, msg.GetDataLen())
// 根据 dataLen 的长度再次从io中读取
if _, err := conn.Read(msg.data); err != nil {
fmt.Println("read msg data error:", err)
return
}
fmt.Println("==> Recv Server Msg: ID=", msg.GetMsgID(), ", len=", msg.GetDataLen(), ", data=", string(msg.GetData()))
}
time.Sleep(1 * time.Second)
}