package observer import "reflect" // Subject 被观察者 type Subject struct { observers []Observer in chan any } // Observer 观察者 chan type Observer chan any // NewSubject 获取被观察者实例 func NewSubject() Subject { return Subject{in: make(chan any)} } // Attach 观察者绑定 func (s *Subject) Attach(observer ...Observer) { s.observers = append(s.observers, observer...) } // Detach 观察者解绑 func (s *Subject) Detach(observer Observer) { for i, obs := range s.observers { if obs == observer { s.observers = append(s.observers[:i], s.observers[i+1:]...) close(observer) break } } } // Notify 通知观察者 func (s *Subject) Notify(data any) { go s.fanOut(s.in, s.observers) s.in <- data } // fanOut 扇出模式实现 func (s *Subject) fanOut(ch <-chan interface{}, out []Observer) { var cases []reflect.SelectCase // 添加输入 chan 的 reflect.SelectCase cases = append(cases, reflect.SelectCase{ Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch), }) go func() { defer func() { // 退出时关闭所有的输出 chan for _, o := range out { close(o) } }() for { _, value, ok := reflect.Select(cases) // 从输入 chan 中读取数据 if !ok { // 输入 channel 被关闭 return } // 输入 channel 接收到数据 for _, o := range out { o <- value.Interface() // 放入到输出 chan 中,同步方式 } } }() }