channel/map.go

58 lines
1.6 KiB
Go

package channel
import "runtime"
// Map applies mapper to all I's coming from in and sends their return values to out while preserving input order.
// All mappings will be done as concurrently as possible using as many threads as there are CPU cores
func Map[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) {
return MapWithRunner(source, NewLimitedRunner(runtime.NumCPU()), mapper)
}
// MapWithRunner behaves like Map but uses runner to spawn its routines.
//
// Every value received from source is passed to mapper inside a function
// started through runner.Run, and the results are sent to the returned channel
// in the order the inputs were received. The returned channel has the same
// capacity as source and is closed once source has been closed and every
// result has been delivered.
//
// A result channel is queued before its mapping is started, and the queue has
// capacity cap(source), so no more than cap(source)+1 mappings are ever
// started without their result having been collected, whatever the runner
// allows.
func MapWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O {
	out := make(chan O, cap(source))
	// pending carries one result channel per input, queued in input order.
	pending := make(chan chan O, cap(source))

	// Dispatcher: start a routine for each incoming value.
	go func() {
		defer close(pending)
		for inputValue := range source {
			inputValue := inputValue // per-iteration copy for the closure (required before Go 1.22)

			// Capacity 1 holds the single result, so the routine can finish as
			// soon as mapper returns instead of staying blocked until the
			// gatherer reaches it (or forever, if the consumer stops reading).
			result := make(chan O, 1)

			// Queue the channel before starting the work so that the queue
			// order is exactly the input order.
			pending <- result
			runner.Run(func() {
				defer close(result)
				result <- mapper(inputValue)
			})
		}
	}()

	// Gatherer: forward all results in incoming order.
	go func() {
		defer close(out)
		for result := range pending {
			// Ranging (rather than a single receive) also copes with a channel
			// that is closed without a value, e.g. should the runner recover
			// from a panic in mapper.
			for outputValue := range result {
				out <- outputValue
			}
		}
	}()

	return out
}
// MapSuccessive applies mapper to every value received from source and sends
// the results to the returned channel, preserving input order.
//
// All mappings are performed one after another in a single goroutine. The
// returned channel has the same capacity as source and is closed once source
// has been closed and drained.
func MapSuccessive[I, O any](source <-chan I, mapper func(I) O) <-chan O {
	results := make(chan O, cap(source))

	go func(sink chan<- O) {
		defer close(sink)
		for {
			item, open := <-source
			if !open {
				return
			}
			sink <- mapper(item)
		}
	}(results)

	return results
}