From 9a81b898851fc56990a1ed50e75752bec4749beb Mon Sep 17 00:00:00 2001 From: wangwenbin Date: Sat, 23 Dec 2023 14:33:52 +0800 Subject: [PATCH] =?UTF-8?q?=E7=BB=83=E4=B9=A0=E4=BB=A3=E7=A0=81push?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- cyclic_barrier/h2o.go | 96 ++++++++++++++++++++++++++++++++++++++ cyclic_barrier/h2o_test.go | 24 ++++++++++ go.mod | 9 ++++ go.sum | 6 +++ observer/observer.go | 71 ++++++++++++++++++++++++++++ observer/observer_test.go | 31 ++++++++++++ routine/pool.go | 63 +++++++++++++++++++++++++ routine/pool_test.go | 32 +++++++++++++ 8 files changed, 332 insertions(+) create mode 100644 cyclic_barrier/h2o.go create mode 100644 cyclic_barrier/h2o_test.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 observer/observer.go create mode 100644 observer/observer_test.go create mode 100644 routine/pool.go create mode 100644 routine/pool_test.go diff --git a/cyclic_barrier/h2o.go b/cyclic_barrier/h2o.go new file mode 100644 index 0000000..9bcda64 --- /dev/null +++ b/cyclic_barrier/h2o.go @@ -0,0 +1,96 @@ +package cyclic_barrier + +import ( + "context" + "fmt" + "github.com/marusama/cyclicbarrier" + "golang.org/x/sync/semaphore" + "math/rand" + "sync" + "time" +) + +const ( + H2OHydrogenNum = 2 // 氢原子数量 + H2OOxygenNum = 1 // 氧原子数量 +) + +// H2O 水分子结构体 +type H2O struct { + semaH *semaphore.Weighted + semaO *semaphore.Weighted + cb cyclicbarrier.CyclicBarrier + wg sync.WaitGroup +} + +func NewH2O() *H2O { + return &H2O{ + semaH: semaphore.NewWeighted(H2OHydrogenNum), // 氢原子的信号量 + semaO: semaphore.NewWeighted(H2OOxygenNum), // 氧原子的信号量 + cb: cyclicbarrier.New(H2OHydrogenNum + H2OOxygenNum), // 循环栅栏,用来控制合成 + } +} + +func (h *H2O) hydrogen(releaseHydrogen func()) { + if err := h.semaH.Acquire(context.Background(), 1); err != nil { // 占用氢原子空槽 + fmt.Println("Hydrogen Acquire err:", err) + } + + releaseHydrogen() + if err := h.cb.Await(context.Background()); err != nil { // 等待栅栏放行 + fmt.Println("Hydrogen Await err:", err) + } + + h.semaH.Release(1) // 释放氢原子空槽 +} + +func (h *H2O) oxygen(releaseOxygen func()) { + if err := h.semaO.Acquire(context.Background(), 1); err != nil { // 占用氧原子空槽 + fmt.Println("Oxygen Acquire err:", err) + } + + releaseOxygen() + if err := h.cb.Await(context.Background()); err != nil { // 等待栅栏放行 + fmt.Println("Oxygen Await err:", err) + } + + h.semaO.Release(1) // 释放氧原子空槽 +} + +func (h *H2O) Gen(num uint) <-chan string { + // 计算总共需要生成的原子数量 + sum := H2OHydrogenNum + H2OOxygenNum + numInt := int(num) + // 用来存放生成的原子 + ch := make(chan string, numInt*sum) + releaseHydrogen := func() { + ch <- "H" + } + releaseOxygen := func() { + ch <- "O" + } + + // 使用 WaitGroup 等待所有的 goroutine 完成 + h.wg.Add(numInt * sum) + for i := 0; i < numInt*H2OHydrogenNum; i++ { + go func() { + time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + h.hydrogen(releaseHydrogen) + h.wg.Done() + }() + } + for i := 0; i < numInt*H2OOxygenNum; i++ { + go func() { + time.Sleep(time.Duration(rand.Intn(100)) * time.Millisecond) + h.oxygen(releaseOxygen) + h.wg.Done() + }() + } + + go func() { + h.wg.Wait() + close(ch) + }() + + return ch +} diff --git a/cyclic_barrier/h2o_test.go b/cyclic_barrier/h2o_test.go new file mode 100644 index 0000000..7df5948 --- /dev/null +++ b/cyclic_barrier/h2o_test.go @@ -0,0 +1,24 @@ +package cyclic_barrier + +import ( + "sort" + "testing" +) + +func TestH2O(t *testing.T) { + h2o := NewH2O() + ch := h2o.Gen(10) + + result := make([]string, 0, 10) + molecular := make([]string, 0, 3) + for c := range ch { + molecular = append(molecular, c) + if len(molecular) == 3 { + sort.Strings(molecular) + result = append(result, molecular[0]+molecular[1]+molecular[2]) + molecular = molecular[:0] + } + } + + t.Log(result) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..79f28fa --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module go-study + +go 1.20 + +require ( + github.com/brildum/testify v0.0.0-20151105045740-d05693e2e501 // indirect + github.com/marusama/cyclicbarrier v1.1.0 // indirect + golang.org/x/sync v0.5.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..01d178a --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/brildum/testify v0.0.0-20151105045740-d05693e2e501 h1:ZWEmkmb/iJoRKLR0YuRP9wnEeF54NT3iUiGNTm/VQrs= +github.com/brildum/testify v0.0.0-20151105045740-d05693e2e501/go.mod h1:Zpjn5ClomJALLjz5sjBsNW79y3MarfpDcZaSJqbsIwk= +github.com/marusama/cyclicbarrier v1.1.0 h1:ol/AG+sjvh5yz832avbNjaowoerBuD3AgozxL+aD9u0= +github.com/marusama/cyclicbarrier v1.1.0/go.mod h1:5u93l83cy51YXdz6eKq6kO9+9mGAooB6DHMAxcSuWwQ= +golang.org/x/sync v0.5.0 h1:60k92dhOjHxJkrqnwsfl8KuaHbn/5dl0lUPUklKo3qE= +golang.org/x/sync v0.5.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= diff --git a/observer/observer.go b/observer/observer.go new file mode 100644 index 0000000..2c6f6c6 --- /dev/null +++ b/observer/observer.go @@ -0,0 +1,71 @@ +package observer + +import "reflect" + +// Subject 被观察者 +type Subject struct { + observers []Observer + in chan any +} + +// Observer 观察者 chan +type Observer chan any + +// NewSubject 获取被观察者实例 +func NewSubject() Subject { + return Subject{in: make(chan any)} +} + +// Attach 观察者绑定 +func (s *Subject) Attach(observer ...Observer) { + s.observers = append(s.observers, observer...) +} + +// Detach 观察者解绑 +func (s *Subject) Detach(observer Observer) { + for i, obs := range s.observers { + if obs == observer { + s.observers = append(s.observers[:i], s.observers[i+1:]...) + close(observer) + break + } + } +} + +// Notify 通知观察者 +func (s *Subject) Notify(data any) { + go s.fanOut(s.in, s.observers) + s.in <- data +} + +// fanOut 扇出模式实现 +func (s *Subject) fanOut(ch <-chan interface{}, out []Observer) { + var cases []reflect.SelectCase + // 添加输入 chan 的 reflect.SelectCase + cases = append(cases, reflect.SelectCase{ + Dir: reflect.SelectRecv, + Chan: reflect.ValueOf(ch), + }) + + go func() { + defer func() { + // 退出时关闭所有的输出 chan + for _, o := range out { + close(o) + } + }() + + for { + _, value, ok := reflect.Select(cases) // 从输入 chan 中读取数据 + if !ok { + // 输入 channel 被关闭 + return + } + + // 输入 channel 接收到数据 + for _, o := range out { + o <- value.Interface() // 放入到输出 chan 中,同步方式 + } + } + }() +} diff --git a/observer/observer_test.go b/observer/observer_test.go new file mode 100644 index 0000000..d457759 --- /dev/null +++ b/observer/observer_test.go @@ -0,0 +1,31 @@ +package observer + +import ( + "fmt" + "testing" +) + +func TestObserver(t *testing.T) { + sendEmail := make(Observer) + notifyWelcome := make(Observer) + + userRegister := NewSubject() + userRegister.Attach(sendEmail, notifyWelcome) + + go func() { + for data := range sendEmail { + fmt.Println("The send email service receive data: ", data) + } + }() + + go func() { + for data := range notifyWelcome { + fmt.Println("The notify welcome service receive data: ", data) + } + }() + + newUser1 := "fantasticbin" + newUser2 := "gan" + userRegister.Notify(newUser1) + userRegister.Notify(newUser2) +} diff --git a/routine/pool.go b/routine/pool.go new file mode 100644 index 0000000..0bb2809 --- /dev/null +++ b/routine/pool.go @@ -0,0 +1,63 @@ +package routine + +import ( + "fmt" + "runtime/debug" + "sync" +) + +type Pool[T any] struct { + taskQueue chan T + taskFn func(T) + workers int + wg sync.WaitGroup +} + +func NewPool[T any](workers, capacity int, taskFn func(T)) *Pool[T] { + pool := &Pool[T]{ + taskQueue: make(chan T, capacity), + taskFn: func(t T) { + defer func() { + // 处理协程运行中出现panic的情况 + if r := recover(); r != nil { + fmt.Printf("Panic: %v\n %s", r, string(debug.Stack())) + } + }() + + taskFn(t) + }, + workers: workers, + } + pool.wg.Add(workers) + + return pool +} + +// Start 启动任务 +func (p *Pool[T]) Start() { + for i := 0; i < p.workers; i++ { + go func() { + defer p.wg.Done() + + for { + task, ok := <-p.taskQueue + if !ok { + return + } + + p.taskFn(task) + } + }() + } +} + +// Push 提交任务 +func (p *Pool[T]) Push(task T) { + p.taskQueue <- task +} + +// Wait 挂起当前协程 +func (p *Pool[T]) Wait() { + close(p.taskQueue) + p.wg.Wait() +} diff --git a/routine/pool_test.go b/routine/pool_test.go new file mode 100644 index 0000000..09cb3ee --- /dev/null +++ b/routine/pool_test.go @@ -0,0 +1,32 @@ +package routine + +import ( + "runtime" + "sync/atomic" + "testing" + + "github.com/brildum/testify/assert" +) + +func TestPool(t *testing.T) { + num := runtime.NumCPU() + var sum atomic.Int32 + pool := NewPool(num, num, func(num int32) { + if num < 0 { + panic("unable to handle negative numbers") + } + + sum.Add(num) + }) + pool.Start() + + for i := int32(1000); i >= -1; i-- { + pool.Push(i) + } + + pool.Wait() + + if assert.Equal(t, int32(500500), sum.Load()) { + t.Log("the sum value is right") + } +}