Zinx-V0.2

This commit is contained in:
fantasticbin 2025-09-13 21:50:54 +08:00
parent f5bfa7b513
commit 3b181b7b41
3 changed files with 142 additions and 19 deletions

View File

@ -0,0 +1,19 @@
package ziface
import "net"
type IConnection interface {
// Start 启动连接
Start()
// Stop 停止连接
Stop()
// GetTCPConnection 获取当前连接的 TCP Socket 套接字
GetTCPConnection() *net.TCPConn
// GetConnID 获取当前连接 ID
GetConnID() uint32
// RemoteAddr 获取远程客户端地址信息
RemoteAddr() net.Addr
}
// HandleFunc 定义一个统一处理连接业务的方法
type HandleFunc func(*net.TCPConn, []byte, int) error

105
zinx/znet/connection.go Normal file
View File

@ -0,0 +1,105 @@
package znet
import (
"fmt"
"go-study/zinx/ziface"
"net"
)
type Connection struct {
// 该连接的 TCP Socket 套接字
Conn *net.TCPConn
// 连接 ID全局唯一
ConnID uint32
// 该连接的关闭状态
IsClosed bool
// 该连接的处理方法
HandleFunc ziface.HandleFunc
// 该连接的退出消息通知 channel
ExitBuffChan chan struct{}
}
// NewConnection 创建连接的方法
func NewConnection(conn *net.TCPConn, connID uint32, handleFunc ziface.HandleFunc) ziface.IConnection {
return &Connection{
Conn: conn,
ConnID: connID,
IsClosed: false,
HandleFunc: handleFunc,
ExitBuffChan: make(chan struct{}, 1),
}
}
// StartReader 处理连接读数据的业务方法
func (c *Connection) StartReader() {
fmt.Println("Reader Goroutine is running")
defer fmt.Println("connID =", c.ConnID, "Reader is exit, remote addr is", c.RemoteAddr().String())
defer c.Stop()
for {
// 读取客户端数据到buf中最大512字节
buf := make([]byte, 512)
cnt, err := c.Conn.Read(buf)
if err != nil {
fmt.Println("recv buf err", err)
c.ExitBuffChan <- struct{}{}
continue
}
// 调用当前连接业务(这里执行的是当前连接绑定的 handle 方法)
if err := c.HandleFunc(c.Conn, buf, cnt); err != nil {
fmt.Println("ConnID =", c.ConnID, "handle is error", err)
c.ExitBuffChan <- struct{}{}
continue
}
}
}
// Start 启动连接,让当前连接开始工作
func (c *Connection) Start() {
// 启动当前连接的读数据业务
go c.StartReader()
for {
select {
case <-c.ExitBuffChan:
// 得到退出消息,不再阻塞
return
}
}
}
// Stop 停止连接,结束当前连接状态
func (c *Connection) Stop() {
if c.IsClosed {
return
}
c.IsClosed = true
// TODO 如果用户注册了该连接的关闭回调业务,那么在此刻应该显式调用
// 关闭 Socket 连接
_ = c.Conn.Close()
// 通知从缓冲队列读取数据的业务,该连接已经关闭
c.ExitBuffChan <- struct{}{}
// 关闭该连接的全部管道
close(c.ExitBuffChan)
}
// GetTCPConnection 获取当前连接的 TCP Socket 套接字
func (c *Connection) GetTCPConnection() *net.TCPConn {
return c.Conn
}
// GetConnID 获取当前连接 ID
func (c *Connection) GetConnID() uint32 {
return c.ConnID
}
// RemoteAddr 获取远程客户端地址信息
func (c *Connection) RemoteAddr() net.Addr {
return c.Conn.RemoteAddr()
}

View File

@ -48,6 +48,9 @@ func (s *Server) Start() {
fmt.Println("start Zinx server ", s.Name, " succ, now listening...")
// TODO 应该有一个自动生成 ID 的方法
cid := uint32(0)
// 3. 阻塞等待客户端连接,处理客户端连接业务(读写)
for {
// 3.1 阻塞等待客户端连接AcceptTCP 会阻塞
@ -59,26 +62,12 @@ func (s *Server) Start() {
// 3.2 TODO 设置服务器最大连接数,如果超过最大连接数则关闭此新连接
// 3.3 TODO 处理该新连接请求的业务方法,此时 handler 和 conn 应该是绑定的
// 3.3 处理该新连接请求的业务方法,此时 handler 和 conn 应该是绑定的
dealConn := NewConnection(conn, cid, CallbackToClient)
cid++
// 这里暂时做一个最大 512 字节的回显服务
go func() {
// 不断的循环,从客户端获取数据
for {
buf := make([]byte, 512)
cnt, err := conn.Read(buf)
if err != nil {
fmt.Println("recv buf err", err)
continue
}
// 回显
if _, err := conn.Write(buf[:cnt]); err != nil {
fmt.Println("write back buf err", err)
continue
}
}
}()
// 3.4 启动当前连接的业务处理
go dealConn.Start()
}
}()
}
@ -99,3 +88,13 @@ func (s *Server) Serve() {
// 阻塞,否则主 Go 程退出listener 会退出
select {}
}
// CallbackToClient 定义当前客户端连接的业务处理方法
func CallbackToClient(conn *net.TCPConn, data []byte, cnt int) error {
// 回显业务
if _, err := conn.Write(data[:cnt]); err != nil {
fmt.Println("write back buf err", err)
return fmt.Errorf("CallbackToClient error: %v", err)
}
return nil
}