go-study/prefork/prefork.go

190 lines
3.9 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"errors"
"flag"
"io"
"log"
"net"
"os"
"os/exec"
"os/signal"
"syscall"
)
var (
c = flag.Int("c", 10, "concurrency level, default 10")
prefork = flag.Bool("prefork", false, "use prefork mode")
child = flag.Bool("child", false, "is child process")
)
// Server 封装服务器功能的结构体
type Server struct {
concurrency int // 并发级别
address string // 监听地址
handler func(net.Conn) // 连接处理函数
isPrefork bool // 是否使用prefork模式
isChild bool // 是否是子进程
children []*exec.Cmd // 子进程列表
}
// NewServer 创建新的服务器
func NewServer(address string, concurrency int, isPrefork bool, isChild bool) *Server {
return &Server{
concurrency: concurrency,
address: address,
isPrefork: isPrefork,
isChild: isChild,
children: make([]*exec.Cmd, concurrency),
}
}
// SetHandler 设置连接处理函数
func (s *Server) SetHandler(handler func(net.Conn)) {
s.handler = handler
}
// Run 启动服务器
func (s *Server) Run() error {
var ln net.Listener
var err error
if s.isPrefork {
ln = s.doPrefork()
} else {
ln, err = net.Listen("tcp", s.address)
if err != nil {
return err
}
}
s.start(ln)
return nil
}
// start 处理连接
func (s *Server) start(ln net.Listener) {
log.Println("started")
for {
conn, e := ln.Accept()
if e != nil {
var ne net.Error
if errors.As(e, &ne) && ne.Temporary() {
log.Printf("accept temp err: %v", ne)
continue
}
log.Printf("accept err: %v", e)
return
}
go s.handler(conn)
}
}
// doPrefork 实现prefork模式
func (s *Server) doPrefork() net.Listener {
if s.isChild {
// 子进程:从文件描述符恢复监听器
listener, err := net.FileListener(os.NewFile(3, ""))
if err != nil {
log.Fatal(err)
}
// 优雅关闭处理
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM)
go func() {
<-sigCh
err := listener.Close()
if err != nil {
return
}
}()
return listener
}
// 主进程:创建监听器并将其传递给子进程
addr, err := net.ResolveTCPAddr("tcp", s.address)
if err != nil {
log.Fatal(err)
}
tcpListener, err := net.ListenTCP("tcp", addr)
if err != nil {
log.Fatal(err)
}
fl, err := tcpListener.File()
if err != nil {
log.Fatal(err)
}
// 主进程不需要保持监听
if err := tcpListener.Close(); err != nil {
log.Fatal(err)
}
// 处理信号以优雅关闭
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
// 启动子进程管理协程
go func() {
<-sigCh
// 收到信号后通知所有子进程优雅关闭
for _, child := range s.children {
if child != nil && child.Process != nil {
if err := child.Process.Signal(syscall.SIGTERM); err != nil {
log.Fatal(err)
}
}
}
os.Exit(0)
}()
// 声明子进程启动函数以便递归调用
var startChildProcess func(int)
startChildProcess = func(i int) {
cmd := exec.Command(os.Args[0], "-prefork", "-child")
cmd.Stdout = os.Stdout
cmd.Stderr = os.Stderr
cmd.ExtraFiles = []*os.File{fl}
s.children[i] = cmd
if err := cmd.Start(); err != nil {
log.Fatalf("启动子进程 %d 失败: %v", i, err)
}
// 在单独的协程中等待子进程结束并重启
go func() {
if err := cmd.Wait(); err != nil {
log.Printf("子进程 %d 异常退出: %v正在重启...", i, err)
startChildProcess(i) // 递归重启
}
}()
}
// 启动和监控子进程
for i := range s.children {
startChildProcess(i)
}
// 主进程保持运行,但不处理连接
select {} // 阻塞主进程
}
func main() {
flag.Parse()
server := NewServer(":8972", *c, *prefork, *child)
server.SetHandler(func(conn net.Conn) {
_, err := io.Copy(conn, conn) // 实现 echo 协议
if err != nil {
log.Printf("copy err: %v", err)
}
})
if err := server.Run(); err != nil {
panic(err)
}
}