封装多进程模式的实现

This commit is contained in:
fantasticbin 2025-06-16 21:07:33 +08:00
parent 18033415c2
commit ed4190e944

View File

@ -18,25 +18,52 @@ var (
child = flag.Bool("child", false, "is child process") child = flag.Bool("child", false, "is child process")
) )
func main() { // Server 封装服务器功能的结构体
flag.Parse() type Server struct {
concurrency int // 并发级别
address string // 监听地址
handler func(net.Conn) // 连接处理函数
isPrefork bool // 是否使用prefork模式
isChild bool // 是否是子进程
children []*exec.Cmd // 子进程列表
}
// NewServer 创建新的服务器
func NewServer(address string, concurrency int, isPrefork bool, isChild bool) *Server {
return &Server{
concurrency: concurrency,
address: address,
isPrefork: isPrefork,
isChild: isChild,
children: make([]*exec.Cmd, concurrency),
}
}
// SetHandler 设置连接处理函数
func (s *Server) SetHandler(handler func(net.Conn)) {
s.handler = handler
}
// Run 启动服务器
func (s *Server) Run() error {
var ln net.Listener var ln net.Listener
var err error var err error
if *prefork { // 如果要启动子进程模式 if s.isPrefork {
ln = doPrefork(*c) ln = s.doPrefork()
} else { } else {
ln, err = net.Listen("tcp", ":8972") ln, err = net.Listen("tcp", s.address)
if err != nil { if err != nil {
panic(err) return err
} }
} }
start(ln) // 处理 net.Listener s.start(ln)
return nil
} }
func start(ln net.Listener) { // start 处理连接
func (s *Server) start(ln net.Listener) {
log.Println("started") log.Println("started")
for { for {
conn, e := ln.Accept() conn, e := ln.Accept()
@ -51,17 +78,13 @@ func start(ln net.Listener) {
return return
} }
go func() { go s.handler(conn)
_, err := io.Copy(conn, conn) // 实现 echo 协议,将收到的东西原样返回
if err != nil {
log.Printf("copy err: %v", err)
}
}()
} }
} }
func doPrefork(c int) net.Listener { // doPrefork 实现prefork模式
if *child { func (s *Server) doPrefork() net.Listener {
if s.isChild {
// 子进程:从文件描述符恢复监听器 // 子进程:从文件描述符恢复监听器
listener, err := net.FileListener(os.NewFile(3, "")) listener, err := net.FileListener(os.NewFile(3, ""))
if err != nil { if err != nil {
@ -83,7 +106,7 @@ func doPrefork(c int) net.Listener {
} }
// 主进程:创建监听器并将其传递给子进程 // 主进程:创建监听器并将其传递给子进程
addr, err := net.ResolveTCPAddr("tcp", ":8972") addr, err := net.ResolveTCPAddr("tcp", s.address)
if err != nil { if err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -100,9 +123,6 @@ func doPrefork(c int) net.Listener {
log.Fatal(err) log.Fatal(err)
} }
// 启动子进程并持续监控
children := make([]*exec.Cmd, c)
// 处理信号以优雅关闭 // 处理信号以优雅关闭
sigCh := make(chan os.Signal, 1) sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
@ -111,8 +131,8 @@ func doPrefork(c int) net.Listener {
go func() { go func() {
<-sigCh <-sigCh
// 收到信号后通知所有子进程优雅关闭 // 收到信号后通知所有子进程优雅关闭
for _, child := range children { for _, child := range s.children {
if child.Process != nil { if child != nil && child.Process != nil {
if err := child.Process.Signal(syscall.SIGTERM); err != nil { if err := child.Process.Signal(syscall.SIGTERM); err != nil {
log.Fatal(err) log.Fatal(err)
} }
@ -128,7 +148,7 @@ func doPrefork(c int) net.Listener {
cmd.Stdout = os.Stdout cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr cmd.Stderr = os.Stderr
cmd.ExtraFiles = []*os.File{fl} cmd.ExtraFiles = []*os.File{fl}
children[i] = cmd s.children[i] = cmd
if err := cmd.Start(); err != nil { if err := cmd.Start(); err != nil {
log.Fatalf("启动子进程 %d 失败: %v", i, err) log.Fatalf("启动子进程 %d 失败: %v", i, err)
@ -142,11 +162,28 @@ func doPrefork(c int) net.Listener {
} }
}() }()
} }
// 启动和监控子进程 // 启动和监控子进程
for i := range children { for i := range s.children {
startChildProcess(i) startChildProcess(i)
} }
// 主进程保持运行,但不处理连接 // 主进程保持运行,但不处理连接
select {} // 阻塞主进程 select {} // 阻塞主进程
} }
func main() {
flag.Parse()
server := NewServer(":8972", *c, *prefork, *child)
server.SetHandler(func(conn net.Conn) {
_, err := io.Copy(conn, conn) // 实现 echo 协议
if err != nil {
log.Printf("copy err: %v", err)
}
})
if err := server.Run(); err != nil {
panic(err)
}
}