package main
import (
	"fmt"
	"sync"
)
// producer starts the pipeline: it emits each of nums, in order, on the
// returned channel and closes the channel after the last value.
//
// The sends happen in a separate goroutine on an unbuffered channel, so the
// caller must drain the result for that goroutine to finish.
func producer(nums ...int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for _, n := range nums {
			// Send the loop variable; the original sent an undefined `i`.
			out <- n
		}
	}()
	return out
}
// square is a pipeline stage: for every value received from inCh it sends the
// value multiplied by itself on the returned channel.
//
// The returned channel is closed once inCh has been closed and drained.
func square(inCh <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range inCh {
			out <- n * n
		}
	}()
	return out
}
// merge fans in any number of input channels into one output channel.
//
// One goroutine per input forwards its values to the returned channel. The
// returned channel is closed after every input has been closed and drained.
// How values from different inputs interleave is not deterministic.
func merge(cs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup

	// collect forwards everything from one input, then reports completion.
	collect := func(in <-chan int) {
		defer wg.Done()
		for n := range in {
			out <- n
		}
	}

	wg.Add(len(cs))
	// FAN-IN
	for _, c := range cs {
		go collect(c)
	}

	// Wrong approach: waiting inline here is a bug that deadlocks, because the
	// collectors block writing to out while the caller is not reading it yet:
	//
	//	wg.Wait()
	//	close(out)
	//
	// Correct approach: wait and close from a separate goroutine.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
funcmain() {
in := producer(1, 2, 3, 4)
// FAN-OUT
c1 := square(in)
c2 := square(in)
c3 := square(in)
// consumerfor ret := range merge(c1, c2, c3) {
fmt.Printf("%3d ", ret)
}
fmt.Println()
}
// hi_simple.go

package main
import (
"fmt"
)
// producer emits the integers 0, 1, ..., n-1 in order on the returned channel
// and closes the channel afterwards.
//
// The sends happen in a separate goroutine on an unbuffered channel, so the
// caller must drain the result for that goroutine to finish.
func producer(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- i
		}
	}()
	return out
}
// square is a deliberately slow pipeline stage: for every value received from
// inCh it sends the value multiplied by itself on the returned channel, then
// sleeps for one second to simulate expensive work.
//
// The returned channel is closed once inCh has been closed and drained.
func square(inCh <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range inCh {
			out <- n * n
			// simulate a time-consuming computation
			time.Sleep(time.Second)
		}
	}()
	return out
}
funcmain() {
in := producer(10)
ch := square(in)
// consumerfor _ = range ch {
}
}
// hi_fan.go

package main
import (
"sync""time"
)
// producer feeds the pipeline with the integers 0 through n-1, in ascending
// order, and closes the returned channel when done.
//
// Values are sent from a separate goroutine over an unbuffered channel, so the
// goroutine only finishes if the caller keeps receiving.
func producer(n int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for i := 0; i < n; i++ {
			out <- i
		}
	}()
	return out
}
// square is a slow worker stage: each value read from inCh is squared and sent
// on the returned channel, followed by a one-second sleep that stands in for
// real work.
//
// The returned channel is closed once inCh has been closed and drained.
// Several square workers may share one inCh; each value is then handled by
// exactly one of them.
func square(inCh <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range inCh {
			out <- n * n
			// simulate a time-consuming computation
			time.Sleep(time.Second)
		}
	}()
	return out
}
// merge combines the given input channels into a single output channel.
//
// A forwarding goroutine is started for each input. Once all of them have
// drained their (closed) inputs, the returned channel is closed. The relative
// order of values coming from different inputs is not deterministic.
func merge(cs ...<-chan int) <-chan int {
	out := make(chan int)
	var wg sync.WaitGroup

	// collect copies one input to out and marks itself done afterwards.
	collect := func(in <-chan int) {
		defer wg.Done()
		for n := range in {
			out <- n
		}
	}

	wg.Add(len(cs))
	// FAN-IN
	for _, c := range cs {
		go collect(c)
	}

	// Wrong approach: waiting inline here is a bug that deadlocks, because the
	// collectors block writing to out while the caller is not reading it yet:
	//
	//	wg.Wait()
	//	close(out)
	//
	// Correct approach: wait and close from a separate goroutine.
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}
funcmain() {
in := producer(10)
// FAN-OUT
c1 := square(in)
c2 := square(in)
c3 := square(in)
// consumerfor _ = range merge(c1, c2, c3) {
}
}
➜ awesome git:(master) ✗ time go run hi_simple.go
go run hi_simple.go 0.17s user 0.18s system 3% cpu 10.389 total
➜ awesome git:(master) ✗
➜ awesome git:(master) ✗ time go run hi_fan.go
go run hi_fan.go 0.17s user 0.16s system 7% cpu 4.288 total
// hi_simple.go

// square is the pipeline stage without any artificial delay: every value
// received from inCh is multiplied by itself and sent on the returned channel.
//
// The returned channel is closed once inCh has been closed and drained.
func square(inCh <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range inCh {
			out <- n * n
		}
	}()
	return out
}
funcmain() {
in := producer(10000000)
ch := square(in)
// consumerfor _ = range ch {
}
}
// hi_fan.go

package main
import (
"sync"
)
// square is the worker stage without any artificial delay: each value read
// from inCh is squared and sent on the returned channel.
//
// The returned channel is closed once inCh has been closed and drained.
// Several square workers may share one inCh; each value is then handled by
// exactly one of them.
func square(inCh <-chan int) <-chan int {
	out := make(chan int)
	go func() {
		defer close(out)
		for n := range inCh {
			out <- n * n
		}
	}()
	return out
}
funcmain() {
in := producer(10000000)
// FAN-OUT
c1 := square(in)
c2 := square(in)
c3 := square(in)
// consumerfor _ = range merge(c1, c2, c3) {
}
}
结果,可以跑多次,结果近似:
➜ awesome git:(master) ✗ time go run hi_simple.go
go run hi_simple.go 9.96s user 5.93s system 168% cpu 9.424 total
➜ awesome git:(master) ✗ time go run hi_fan.go
go run hi_fan.go 23.35s user 11.51s system 297% cpu 11.737 total