package observer import "reflect" // Subject 被观察者 type Subject struct { observers []Observer in chan any } // Observer 观察者 chan type Observer chan any // NewSubject 获取被观察者实例 func NewSubject() Subject { return Subject{in: make(chan any)} } // Attach 观察者绑定 func (s *Subject) Attach(obs ...Observer) { s.observers = append(s.observers, obs...) } // Detach 观察者解绑 func (s *Subject) Detach(obs Observer) { for i, o := range s.observers { if o == obs { s.observers = append(s.observers[:i], s.observers[i+1:]...) close(obs) break } } } // Notify 通知观察者 func (s *Subject) Notify(data any) { // 这里可考虑不需要使用协程运行 go s.fanOut(s.in, s.observers) s.in <- data } // fanOut 扇出模式实现 func (s *Subject) fanOut(ch <-chan interface{}, out []Observer) { // 绑定输入 chan 的 reflect.SelectCase cases := []reflect.SelectCase{ {Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}, } go func() { defer func() { // 退出时关闭所有的输出 chan for _, o := range out { close(o) } }() for { _, value, ok := reflect.Select(cases) // 从输入 chan 中读取数据 if !ok { // 输入 channel 被关闭 return } // 输入 channel 接收到数据 for _, o := range out { o <- value.Interface() // 放入到输出 chan 中,同步方式 } } }() }