channel/utils.go

60 lines
1.3 KiB
Go

package channel
func determineBufferSize[T any](channels []<-chan T) int {
if len(channels) == 0 {
return 0
}
maxBufSize := 0
for _, ch := range channels {
if cap(ch) > maxBufSize {
maxBufSize = cap(ch)
}
}
return maxBufSize
}
// Flush consumes all values and discards them immediately.
// It blocks until all sources are closed
func Flush[T any](sources ...<-chan T) {
for range Merge(sources...) {
}
}
// Each consumes all values and calls f for each of them.
// It blocks until source is closed
func Each[T any](source <-chan T, f func(T)) {
for value := range source {
f(value)
}
}
// Tee returns 2 channels which both receive all values from source.
// It's basically a copy function for channels
func Tee[T any](source <-chan T) (<-chan T, <-chan T) {
outs := TeeMany(source, 2)
return outs[0], outs[1]
}
// TeeMany returns a given amount of channels which all receive all values from source.
// It's basically a copy function for channels
func TeeMany[T any](source <-chan T, amount int) []<-chan T {
outs := make([]chan T, amount)
go func() {
defer func() {
for _, out := range outs {
close(out)
}
}()
for value := range source {
for _, out := range outs {
out <- value
}
}
}()
return (interface{}(outs)).([]<-chan T)
}