From 9bc35d20895d7533b70cbb3cb9c8b5c7084a5ca3 Mon Sep 17 00:00:00 2001 From: Tordarus Date: Wed, 15 Jan 2025 12:31:59 +0100 Subject: [PATCH] initial commit --- chan_io.go | 32 ++++++++++++++ each.go | 34 +++++++++++++++ filter.go | 39 +++++++++++++++++ find.go | 38 +++++++++++++++++ flat.go | 46 ++++++++++++++++++++ forward.go | 11 +++++ go.mod | 3 ++ group.go | 94 ++++++++++++++++++++++++++++++++++++++++ internal_stuff.go | 12 ++++++ limited_runner.go | 28 ++++++++++++ map.go | 86 +++++++++++++++++++++++++++++++++++++ merge.go | 28 ++++++++++++ of.go | 99 ++++++++++++++++++++++++++++++++++++++++++ result.go | 102 ++++++++++++++++++++++++++++++++++++++++++++ runner.go | 8 ++++ tee.go | 37 ++++++++++++++++ timeout.go | 30 +++++++++++++ to.go | 84 ++++++++++++++++++++++++++++++++++++ unlimited_runner.go | 19 +++++++++ utils.go | 15 +++++++ 20 files changed, 845 insertions(+) create mode 100644 chan_io.go create mode 100644 each.go create mode 100644 filter.go create mode 100644 find.go create mode 100644 flat.go create mode 100644 forward.go create mode 100644 go.mod create mode 100644 group.go create mode 100644 internal_stuff.go create mode 100644 limited_runner.go create mode 100644 map.go create mode 100644 merge.go create mode 100644 of.go create mode 100644 result.go create mode 100644 runner.go create mode 100644 tee.go create mode 100644 timeout.go create mode 100644 to.go create mode 100644 unlimited_runner.go create mode 100644 utils.go diff --git a/chan_io.go b/chan_io.go new file mode 100644 index 0000000..c3e048e --- /dev/null +++ b/chan_io.go @@ -0,0 +1,32 @@ +package channel + +import ( + "fmt" + "io" + "time" +) + +// WriteInto writes all given values into the channel ch. +// Is is a shorthand for Forward(ch, AsChan(values...)) +func WriteInto[T any](ch chan<- T, values ...T) { + Forward(ch, Of(values...)) +} + +// WriteIntoDelayed writes all given values into the channel ch. +// It sleeps after every write for the given amount of time. +// It is a shorthand for Forward(ch, AsChanDelayed(time, values...)) +func WriteIntoDelayed[T any](ch chan<- T, delay time.Duration, values ...T) { + Forward(ch, OfDelayed(delay, values...)) +} + +// WriteIntoWriter reads all values from ch and writes them via fmt.Fprintln to all writers +func WriteIntoWriter[T any](ch <-chan T, writers ...io.Writer) { + w := io.MultiWriter(writers...) + EachSuccessive(ch, func(value T) { + if err, ok := any(value).(error); ok { + fmt.Fprintln(w, err.Error()) + return + } + fmt.Fprintln(w, value) + }) +} diff --git a/each.go b/each.go new file mode 100644 index 0000000..8c7503f --- /dev/null +++ b/each.go @@ -0,0 +1,34 @@ +package channel + +import "sync" + +// Each consumes all values and calls f for each of them. +// It blocks until source is closed +func Each[T any](source <-chan T, f func(T)) { + EachWithRunner(source, getDefaultRunner(), f) +} + +// Each consumes all values and calls f for each of them. +// It blocks until source is closed +func EachWithRunner[T any](source <-chan T, runner Runner, f func(T)) { + wg := &sync.WaitGroup{} + + for value := range source { + value := value + wg.Add(1) + runner.Run(func() { + defer wg.Done() + f(value) + }) + } + + wg.Wait() +} + +// EachSuccessive consumes all values and calls f for each of them. +// It blocks until source is closed +func EachSuccessive[T any](source <-chan T, f func(T)) { + for value := range source { + f(value) + } +} diff --git a/filter.go b/filter.go new file mode 100644 index 0000000..d99059d --- /dev/null +++ b/filter.go @@ -0,0 +1,39 @@ +package channel + +func FilterSuccessive[T any](source <-chan T, filter func(T) bool) <-chan T { + out := make(chan T, cap(source)) + + go func() { + defer close(out) + for value := range source { + if filter(value) { + out <- value + } + } + }() + + return out +} + +func Filter[T any](source <-chan T, filter func(T) bool) <-chan T { + return FilterPreserveOrderWithRunner(source, getDefaultRunner(), filter) +} + +func FilterPreserveOrderWithRunner[T any](source <-chan T, runner Runner, filter func(T) bool) <-chan T { + type FilteredValue[T any] struct { + Value T + Filter bool + } + + mappedValues := MapPreserveOrderWithRunner(source, runner, func(value T) FilteredValue[T] { + return FilteredValue[T]{Value: value, Filter: filter(value)} + }) + + filteredValues := FilterSuccessive(mappedValues, func(filteredValue FilteredValue[T]) bool { + return filteredValue.Filter + }) + + return MapSuccessive(filteredValues, func(filteredValue FilteredValue[T]) T { + return filteredValue.Value + }) +} diff --git a/find.go b/find.go new file mode 100644 index 0000000..006fdcc --- /dev/null +++ b/find.go @@ -0,0 +1,38 @@ +package channel + +import "context" + +func FindFirst[T any](source <-chan T) *T { + for v := range source { + return &v + } + return nil +} + +func FindFirstAndCancel[T any](source <-chan T, cancel context.CancelFunc) *T { + defer cancel() + for v := range source { + return &v + } + return nil +} + +func FindLast[T any](source <-chan T) *T { + var last *T = new(T) + found := false + + for v := range source { + *last = v + found = true + } + + if !found { + return nil + } + + return last +} + +func HasAny[T any](source <-chan T) bool { + return FindFirst(source) != nil +} diff --git a/flat.go b/flat.go new file mode 100644 index 0000000..5ce1f25 --- /dev/null +++ b/flat.go @@ -0,0 +1,46 @@ +package channel + +func FlatSlice[T any](source <-chan []T) <-chan T { + out := make(chan T, cap(source)) + + go func() { + defer close(out) + for slice := range source { + for _, v := range slice { + out <- v + } + } + }() + + return out +} + +func FlatMap[K comparable, V, T any](source <-chan map[K]V, unmapper func(key K, value V) T) <-chan T { + out := make(chan T, cap(source)) + + go func() { + defer close(out) + for slice := range source { + for k, v := range slice { + out <- unmapper(k, v) + } + } + }() + + return out +} + +func FlatChan[T any](source <-chan <-chan T) <-chan T { + out := make(chan T, cap(source)) + + go func() { + defer close(out) + for ch := range source { + for v := range ch { + out <- v + } + } + }() + + return out +} diff --git a/forward.go b/forward.go new file mode 100644 index 0000000..5808504 --- /dev/null +++ b/forward.go @@ -0,0 +1,11 @@ +package channel + +// Forward reads all values from all sources and sends them to target. +// It blocks until all values are forwarded and the out channel was closed. +// Use with go keyword for non-blocking behavior +func Forward[T any](target chan<- T, sources ...<-chan T) { + for value := range Merge(sources...) { + target <- value + } + close(target) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e3abc3d --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.tordarus.net/tordarus/channel + +go 1.23 diff --git a/group.go b/group.go new file mode 100644 index 0000000..4078259 --- /dev/null +++ b/group.go @@ -0,0 +1,94 @@ +package channel + +import ( + "time" +) + +// GroupByTime groups all incoming values from source using the grouper function +// and sends them to the returned channel after the given amount of time. +// This can useful for summing or averaging values from a channel on a fixed interval +func GroupByTime[T, G any](source <-chan T, duration time.Duration, grouper func(current G, value T) G) <-chan G { + out := make(chan G, cap(source)) + + go func() { + defer close(out) + + ticker := time.NewTicker(duration) + defer ticker.Stop() + + current := *new(G) + changed := false + for { + select { + case value, ok := <-source: + if !ok { + if changed { + out <- current + } + return + } + current = grouper(current, value) + changed = true + case <-ticker.C: + group := current + out <- group + current = *new(G) + changed = false + } + } + }() + + return out +} + +// GroupByAmount groups all incoming values from source using the grouper function +// and sends them to the returned channel after the given amount of values were grouped. +// This can useful for summing or averaging values from a channel for a given amount of values +func GroupByAmount[T, G any](source <-chan T, amount int, grouper func(current G, value T) G) <-chan G { + out := make(chan G, cap(source)) + + go func() { + defer close(out) + + current := *new(G) + currentAmount := 0 + for value := range source { + currentAmount++ + current = grouper(current, value) + + if currentAmount%amount == 0 { + group := current + out <- group + current = *new(G) + } + } + }() + + return out +} + +// GroupByValue groups all incoming values from source using the grouper function +// and sends them to the returned channel after valueFunc returns true for a given value. +// This can useful for summing or averaging values from a channel for a given amount of values +func GroupByValue[T, G any](source <-chan T, valueFunc func(T) bool, grouper func(current G, value T) G) <-chan G { + out := make(chan G, cap(source)) + + go func() { + defer close(out) + + current := *new(G) + currentAmount := 0 + for value := range source { + currentAmount++ + current = grouper(current, value) + + if valueFunc(value) { + group := current + out <- group + current = *new(G) + } + } + }() + + return out +} diff --git a/internal_stuff.go b/internal_stuff.go new file mode 100644 index 0000000..9ab9010 --- /dev/null +++ b/internal_stuff.go @@ -0,0 +1,12 @@ +package channel + +import "runtime" + +type mapEntry[K comparable, V any] struct { + Key K + Value V +} + +func getDefaultRunner() Runner { + return NewLimitedRunner(runtime.NumCPU()) +} diff --git a/limited_runner.go b/limited_runner.go new file mode 100644 index 0000000..c571649 --- /dev/null +++ b/limited_runner.go @@ -0,0 +1,28 @@ +package channel + +// LimitedRunner is a Runner which runs its methods +// in a pre-defined amount of routines +type LimitedRunner struct { + limiter chan struct{} +} + +var _ Runner = &LimitedRunner{} + +// NewLimitedRunner returns a new LimitedRunner with the given amount +// of allowed routines +func NewLimitedRunner(routineLimit int) *LimitedRunner { + return &LimitedRunner{ + limiter: make(chan struct{}, routineLimit), + } +} + +// Run blocks if the limit is currently exceeded. +// It blocks until a routine becomes available again. +// For non-blocking behavior, use go syntax +func (r *LimitedRunner) Run(f func()) { + r.limiter <- struct{}{} + go func() { + defer func() { <-r.limiter }() + f() + }() +} diff --git a/map.go b/map.go new file mode 100644 index 0000000..b993582 --- /dev/null +++ b/map.go @@ -0,0 +1,86 @@ +package channel + +import "sync" + +// MapPreserveOrder applies mapper to all I's coming from source and sends their return values to out while preserving input order. +// All mappings will be done as concurrently as possible using as many threads as there are CPU cores +func MapPreserveOrder[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) { + return MapPreserveOrderWithRunner(source, getDefaultRunner(), mapper) +} + +// MapPreserveOrderWithRunner behaves like MapPreserveOrder but uses runner to spawn its routines +func MapPreserveOrderWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O { + out := make(chan O, cap(source)) + outchannels := make(chan chan O, cap(source)) + + // start routine for each incoming value + go func(in <-chan I, outchannels chan chan O) { + defer close(outchannels) + for inputValue := range in { + inputValue := inputValue + outCh := make(chan O) + outchannels <- outCh + + runner.Run(func() { + defer close(outCh) + outCh <- mapper(inputValue) + }) + } + }(source, outchannels) + + // gather all results in incoming order + go func(out chan<- O, outchannels chan chan O) { + defer close(out) + for ch := range outchannels { + for outputValue := range ch { + out <- outputValue + } + } + }(out, outchannels) + + return out +} + +// Map applies mapper to all I's coming from source and sends their return values to out. +// All mappings will be done as concurrently as possible using as many threads as there are CPU cores +func Map[I, O any](source <-chan I, mapper func(I) O) <-chan O { + return MapWithRunner(source, getDefaultRunner(), mapper) +} + +// MapWithRunner behaves like Map but uses runner to spawn its routines +func MapWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O { + out := make(chan O, cap(source)) + + go func() { + defer close(out) + wg := &sync.WaitGroup{} + + for value := range source { + value := value + wg.Add(1) + runner.Run(func() { + defer wg.Done() + out <- mapper(value) + }) + } + + wg.Wait() + }() + + return out +} + +// MapSuccessive applies mapper to all I's coming from source and sends their return values to out while preserving input order. +// All mappings will be done successively in a single thread +func MapSuccessive[I, O any](source <-chan I, mapper func(I) O) <-chan O { + out := make(chan O, cap(source)) + + go func() { + defer close(out) + for value := range source { + out <- mapper(value) + } + }() + + return out +} diff --git a/merge.go b/merge.go new file mode 100644 index 0000000..a0a5661 --- /dev/null +++ b/merge.go @@ -0,0 +1,28 @@ +package channel + +import "sync" + +// Merge returns a channel in which all values of all incoming channels are sent to. +// The values will be sent in the same order as they are received +func Merge[T any](channels ...<-chan T) <-chan T { + out := make(chan T, determineBufferSize(channels)) + + var wg sync.WaitGroup + wg.Add(len(channels)) + + for _, ch := range channels { + go func(ch <-chan T) { + for v := range ch { + out <- v + } + wg.Done() + }(ch) + } + + go func() { + wg.Wait() + close(out) + }() + + return out +} diff --git a/of.go b/of.go new file mode 100644 index 0000000..e3a2f4c --- /dev/null +++ b/of.go @@ -0,0 +1,99 @@ +package channel + +import ( + "context" + "iter" + "time" +) + +// Of returns a channel containing all values +func Of[T any](values ...T) <-chan T { + return OfDelayed(0, values...) +} + +// OfDelayed behaves like Of but with a pre-defined delay between each value +func OfDelayed[T any](delay time.Duration, values ...T) <-chan T { + return OfDelayedFunc(func(value T) time.Duration { return delay }, values...) +} + +// OfDelayedFunc behaves like OfDelayed but accepts a function to determine the delay +func OfDelayedFunc[T any](delayFunc func(value T) time.Duration, values ...T) <-chan T { + out := make(chan T, len(values)) + + go func(out chan T, values []T) { + defer close(out) + for i, value := range values { + out <- value + if i < len(values)-1 { + time.Sleep(delayFunc(value)) + } + } + }(out, values) + + return out +} + +// OfFunc returns a channel containing the return values of successively calling f +// It closes the channel as soon as ctx is done +func OfFunc[T any](ctx context.Context, buffer int, f func() T) <-chan T { + out := make(chan T, buffer) + + go func() { + defer close(out) + + for ctx.Err() == nil { + select { + case out <- f(): + case <-ctx.Done(): + return + } + } + }() + + return out +} + +// OfMap returns a channel containing the return values of the unmapper function +// applied to any key-value pair in m +// The order is random +func OfMap[K comparable, V, T any](m map[K]V, unmapper func(K, V) T) <-chan T { + out := make(chan T, len(m)) + + go func() { + defer close(out) + for k, v := range m { + out <- unmapper(k, v) + } + }() + + return out +} + +// OfSeq returns a channel containing all values provided by the iterator +func OfSeq[T any](seq iter.Seq[T], buffer int) <-chan T { + out := make(chan T, buffer) + + go func() { + defer close(out) + for v := range seq { + out <- v + } + }() + + return out +} + +// OfSeq2 returns a channel containing the return values of the unmapper function +// when provided with the values of the iterator +func OfSeq2[K comparable, V, T any](seq iter.Seq2[K, V], buffer int, unmapper func(K, V) T) <-chan T { + out := make(chan T, buffer) + + go func() { + defer close(out) + for key, value := range seq { + out <- unmapper(key, value) + } + }() + + return out +} diff --git a/result.go b/result.go new file mode 100644 index 0000000..c5ab05d --- /dev/null +++ b/result.go @@ -0,0 +1,102 @@ +package channel + +type Result[T any] struct { + value *T + err error +} + +func ResultOf[T any](value T, err error) Result[T] { + if err != nil { + return Result[T]{value: nil, err: err} + } + + return Result[T]{value: &value, err: nil} +} + +func WrapResultOutputFunc[I, O any](f func(I) (O, error)) func(I) Result[O] { + return func(i I) Result[O] { return ResultOf(f(i)) } +} + +func WrapResultFunc[I, O any](f func(I) (O, error)) func(Result[I]) Result[O] { + resFunc := WrapResultOutputFunc(f) + nilValue := *new(O) + return func(r Result[I]) Result[O] { + v, err := r.Get() + if err != nil { + return ResultOf(nilValue, err) + } + return resFunc(v) + } +} + +func (r Result[T]) Success() bool { + return r.err == nil +} + +func (r Result[T]) Fail() bool { + return !r.Success() +} + +func (r Result[T]) GetOrDefault(defaultValue T) T { + if r.Fail() { + return defaultValue + } + + return *r.value +} + +func (r Result[T]) Get() (T, error) { + if r.err != nil { + return *new(T), r.err + } + return *r.value, r.err +} + +func (r Result[T]) GetUnsafe() T { + if r.err != nil { + panic(r.err) + } + return *r.value +} + +func (r Result[T]) Err() error { + return r.err +} + +func FilterSuccess[T any](source <-chan Result[T]) <-chan T { + succeeded := Filter(source, Result[T].Success) + + return MapSuccessive(succeeded, func(r Result[T]) T { + v, _ := r.Get() + return v + }) +} + +func FilterFail[T any](source <-chan Result[T]) <-chan T { + failed := Filter(source, Result[T].Fail) + + return MapSuccessive(failed, func(r Result[T]) T { + v, _ := r.Get() + return v + }) +} + +func FilterResults[T any](source <-chan Result[T]) (succeeded <-chan T, failed <-chan error) { + succ := make(chan T, cap(source)) + fail := make(chan error, cap(source)) + + go func() { + defer close(succ) + defer close(fail) + + for r := range source { + if r.Fail() { + fail <- r.Err() + continue + } + succ <- r.GetUnsafe() + } + }() + + return succ, fail +} diff --git a/runner.go b/runner.go new file mode 100644 index 0000000..7de9e36 --- /dev/null +++ b/runner.go @@ -0,0 +1,8 @@ +package channel + +// Runner is any runnable environment +type Runner interface { + // Run runs f in the Runners environment + // It might be blocking or non-blocking depending on Runners implementation + Run(f func()) +} diff --git a/tee.go b/tee.go new file mode 100644 index 0000000..60c5cd0 --- /dev/null +++ b/tee.go @@ -0,0 +1,37 @@ +package channel + +// Tee returns 2 channels which both receive all values from source. +// It's basically a copy function for channels +func Tee[T any](source <-chan T) (<-chan T, <-chan T) { + outs := TeeMany(source, 2) + return outs[0], outs[1] +} + +// TeeMany returns a given amount of channels which all receive all values from source. +// It's basically a copy function for channels +func TeeMany[T any](source <-chan T, amount int) []<-chan T { + outputs := make([]chan T, amount) + for i := range outputs { + outputs[i] = make(chan T, cap(source)) + } + + go func() { + defer func() { + for _, out := range outputs { + close(out) + } + }() + + for value := range source { + for _, out := range outputs { + out <- value + } + } + }() + + readOnlyOutputs := make([]<-chan T, 0, amount) + for _, out := range outputs { + readOnlyOutputs = append(readOnlyOutputs, out) + } + return readOnlyOutputs +} diff --git a/timeout.go b/timeout.go new file mode 100644 index 0000000..203955e --- /dev/null +++ b/timeout.go @@ -0,0 +1,30 @@ +package channel + +import "time" + +// CloseOnTimeout returns a channel which receives all values from the source. +// If no value was received in the given timeout duration, the returned channel will be closed. +// The input channel will not be closed. +func CloseOnTimeout[T any](source <-chan T, timeout time.Duration) <-chan T { + output := make(chan T, cap(source)) + + go func() { + defer close(output) + + for { + timer := time.NewTimer(timeout) + + select { + case value, ok := <-source: + if !ok { + return + } + output <- value + case <-timer.C: + return + } + } + }() + + return output +} diff --git a/to.go b/to.go new file mode 100644 index 0000000..86d8768 --- /dev/null +++ b/to.go @@ -0,0 +1,84 @@ +package channel + +import "container/list" + +// ToSlice returns a slice containing all values read from ch +func ToSlice[T any](ch <-chan T) []T { + s := make([]T, 0, cap(ch)) + EachSuccessive(ch, func(value T) { s = append(s, value) }) + return s +} + +// ToSliceContinuous returns a slice containing all values read from ch. +// The returned slice will be a pointer slice to a continuous block of memory. +// All values will be copied. +func ToSliceContinuous[T any](ch <-chan *T) []*T { + values := make([]T, 0, cap(ch)) + pointers := make([]*T, 0, cap(ch)) + EachSuccessive(ch, func(value *T) { + pointers = append(pointers, value) + + if value != nil { + values = append(values, *value) + } + }) + return pointers +} + +// ToSliceDeref returns a slice containing all values read from ch. +// The returned slice will be a dereferenced and continuous block of memory. +// Nil pointers are ignored. +func ToSliceDeref[T any](ch <-chan *T) []T { + s := make([]T, 0, cap(ch)) + EachSuccessive(ch, func(value *T) { + if value != nil { + s = append(s, *value) + } + }) + return s +} + +// ToList returns a list.List containing all values read from ch +func ToList[T any](ch <-chan T) *list.List { + l := list.New() + EachSuccessive(ch, func(value T) { l.PushBack(value) }) + return l +} + +// ToMap returns a map containing all values read from ch. +// The map key-value pairs are determined by f which will be called as concurrently as possible +// to build the resulting map +func ToMap[T any, K comparable, V any](ch <-chan T, f func(T) (K, V)) map[K]V { + return ToMapWithRunner(ch, getDefaultRunner(), f) +} + +// ToMap returns a map containing all values read from ch. +// The map key-value pairs are determined by f which will be called as concurrently as possible +// to build the resulting map +func ToMapWithRunner[T any, K comparable, V any](ch <-chan T, runner Runner, f func(T) (K, V)) map[K]V { + map2entry := func(t T) mapEntry[K, V] { + k, v := f(t) + return mapEntry[K, V]{Key: k, Value: v} + } + + map2kv := func(e mapEntry[K, V]) (K, V) { return e.Key, e.Value } + + return ToMapSuccessive(MapWithRunner(ch, runner, map2entry), map2kv) +} + +// ToMapSuccessive returns a map containing all values read from ch. +// The map key-value pairs are determined by f +func ToMapSuccessive[T any, K comparable, V any](ch <-chan T, f func(T) (K, V)) map[K]V { + m := map[K]V{} + EachSuccessive(ch, func(value T) { + k, v := f(value) + m[k] = v + }) + return m +} + +// ToStructMap returns a struct{} map containing all values read from ch as keys. +// It is a shorthand for ToMap(ch, func(value T) (T, struct{}) { return value, struct{}{} }) +func ToStructMap[T comparable](ch <-chan T) map[T]struct{} { + return ToMap(ch, func(value T) (T, struct{}) { return value, struct{}{} }) +} diff --git a/unlimited_runner.go b/unlimited_runner.go new file mode 100644 index 0000000..58e05c3 --- /dev/null +++ b/unlimited_runner.go @@ -0,0 +1,19 @@ +package channel + +// UnlimitedRunner is a Runner which runs each method +// in its own routine +type UnlimitedRunner struct { +} + +var _ Runner = &UnlimitedRunner{} + +// NewUnlimitedRunner returns a new LimitedRunner with the given amount +// of allowed routines +func NewUnlimitedRunner() *UnlimitedRunner { + return &UnlimitedRunner{} +} + +// Run always returns immediately +func (r *UnlimitedRunner) Run(f func()) { + go f() +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..1bd3227 --- /dev/null +++ b/utils.go @@ -0,0 +1,15 @@ +package channel + +func determineBufferSize[T any](channels []<-chan T) int { + if len(channels) == 0 { + return 0 + } + + maxBufSize := 0 + for _, ch := range channels { + if cap(ch) > maxBufSize { + maxBufSize = cap(ch) + } + } + return maxBufSize +}