go-study/observer/observer.go

72 lines
1.4 KiB
Go
Raw Normal View History

2023-12-23 14:33:52 +08:00
package observer
import "reflect"
// Subject 被观察者
type Subject struct {
observers []Observer
in chan any
}
// Observer 观察者 chan
type Observer chan any
// NewSubject 获取被观察者实例
func NewSubject() Subject {
return Subject{in: make(chan any)}
}
// Attach 观察者绑定
func (s *Subject) Attach(observer ...Observer) {
s.observers = append(s.observers, observer...)
}
// Detach 观察者解绑
func (s *Subject) Detach(observer Observer) {
for i, obs := range s.observers {
if obs == observer {
s.observers = append(s.observers[:i], s.observers[i+1:]...)
close(observer)
break
}
}
}
// Notify 通知观察者
func (s *Subject) Notify(data any) {
go s.fanOut(s.in, s.observers)
s.in <- data
}
// fanOut 扇出模式实现
func (s *Subject) fanOut(ch <-chan interface{}, out []Observer) {
var cases []reflect.SelectCase
// 添加输入 chan 的 reflect.SelectCase
cases = append(cases, reflect.SelectCase{
Dir: reflect.SelectRecv,
Chan: reflect.ValueOf(ch),
})
go func() {
defer func() {
// 退出时关闭所有的输出 chan
for _, o := range out {
close(o)
}
}()
for {
_, value, ok := reflect.Select(cases) // 从输入 chan 中读取数据
if !ok {
// 输入 channel 被关闭
return
}
// 输入 channel 接收到数据
for _, o := range out {
o <- value.Interface() // 放入到输出 chan 中,同步方式
}
}
}()
}