commit 9bc35d20895d7533b70cbb3cb9c8b5c7084a5ca3 Author: Tordarus Date: Wed Jan 15 12:31:59 2025 +0100 initial commit diff --git a/chan_io.go b/chan_io.go new file mode 100644 index 0000000..c3e048e --- /dev/null +++ b/chan_io.go @@ -0,0 +1,32 @@ +package channel + +import ( + "fmt" + "io" + "time" +) + +// WriteInto writes all given values into the channel ch. +// Is is a shorthand for Forward(ch, AsChan(values...)) +func WriteInto[T any](ch chan<- T, values ...T) { + Forward(ch, Of(values...)) +} + +// WriteIntoDelayed writes all given values into the channel ch. +// It sleeps after every write for the given amount of time. +// It is a shorthand for Forward(ch, AsChanDelayed(time, values...)) +func WriteIntoDelayed[T any](ch chan<- T, delay time.Duration, values ...T) { + Forward(ch, OfDelayed(delay, values...)) +} + +// WriteIntoWriter reads all values from ch and writes them via fmt.Fprintln to all writers +func WriteIntoWriter[T any](ch <-chan T, writers ...io.Writer) { + w := io.MultiWriter(writers...) + EachSuccessive(ch, func(value T) { + if err, ok := any(value).(error); ok { + fmt.Fprintln(w, err.Error()) + return + } + fmt.Fprintln(w, value) + }) +} diff --git a/each.go b/each.go new file mode 100644 index 0000000..8c7503f --- /dev/null +++ b/each.go @@ -0,0 +1,34 @@ +package channel + +import "sync" + +// Each consumes all values and calls f for each of them. +// It blocks until source is closed +func Each[T any](source <-chan T, f func(T)) { + EachWithRunner(source, getDefaultRunner(), f) +} + +// Each consumes all values and calls f for each of them. +// It blocks until source is closed +func EachWithRunner[T any](source <-chan T, runner Runner, f func(T)) { + wg := &sync.WaitGroup{} + + for value := range source { + value := value + wg.Add(1) + runner.Run(func() { + defer wg.Done() + f(value) + }) + } + + wg.Wait() +} + +// EachSuccessive consumes all values and calls f for each of them. +// It blocks until source is closed +func EachSuccessive[T any](source <-chan T, f func(T)) { + for value := range source { + f(value) + } +} diff --git a/filter.go b/filter.go new file mode 100644 index 0000000..d99059d --- /dev/null +++ b/filter.go @@ -0,0 +1,39 @@ +package channel + +func FilterSuccessive[T any](source <-chan T, filter func(T) bool) <-chan T { + out := make(chan T, cap(source)) + + go func() { + defer close(out) + for value := range source { + if filter(value) { + out <- value + } + } + }() + + return out +} + +func Filter[T any](source <-chan T, filter func(T) bool) <-chan T { + return FilterPreserveOrderWithRunner(source, getDefaultRunner(), filter) +} + +func FilterPreserveOrderWithRunner[T any](source <-chan T, runner Runner, filter func(T) bool) <-chan T { + type FilteredValue[T any] struct { + Value T + Filter bool + } + + mappedValues := MapPreserveOrderWithRunner(source, runner, func(value T) FilteredValue[T] { + return FilteredValue[T]{Value: value, Filter: filter(value)} + }) + + filteredValues := FilterSuccessive(mappedValues, func(filteredValue FilteredValue[T]) bool { + return filteredValue.Filter + }) + + return MapSuccessive(filteredValues, func(filteredValue FilteredValue[T]) T { + return filteredValue.Value + }) +} diff --git a/find.go b/find.go new file mode 100644 index 0000000..006fdcc --- /dev/null +++ b/find.go @@ -0,0 +1,38 @@ +package channel + +import "context" + +func FindFirst[T any](source <-chan T) *T { + for v := range source { + return &v + } + return nil +} + +func FindFirstAndCancel[T any](source <-chan T, cancel context.CancelFunc) *T { + defer cancel() + for v := range source { + return &v + } + return nil +} + +func FindLast[T any](source <-chan T) *T { + var last *T = new(T) + found := false + + for v := range source { + *last = v + found = true + } + + if !found { + return nil + } + + return last +} + +func HasAny[T any](source <-chan T) bool { + return FindFirst(source) != nil +} diff --git a/flat.go b/flat.go new file mode 100644 index 0000000..5ce1f25 --- /dev/null +++ b/flat.go @@ -0,0 +1,46 @@ +package channel + +func FlatSlice[T any](source <-chan []T) <-chan T { + out := make(chan T, cap(source)) + + go func() { + defer close(out) + for slice := range source { + for _, v := range slice { + out <- v + } + } + }() + + return out +} + +func FlatMap[K comparable, V, T any](source <-chan map[K]V, unmapper func(key K, value V) T) <-chan T { + out := make(chan T, cap(source)) + + go func() { + defer close(out) + for slice := range source { + for k, v := range slice { + out <- unmapper(k, v) + } + } + }() + + return out +} + +func FlatChan[T any](source <-chan <-chan T) <-chan T { + out := make(chan T, cap(source)) + + go func() { + defer close(out) + for ch := range source { + for v := range ch { + out <- v + } + } + }() + + return out +} diff --git a/forward.go b/forward.go new file mode 100644 index 0000000..5808504 --- /dev/null +++ b/forward.go @@ -0,0 +1,11 @@ +package channel + +// Forward reads all values from all sources and sends them to target. +// It blocks until all values are forwarded and the out channel was closed. +// Use with go keyword for non-blocking behavior +func Forward[T any](target chan<- T, sources ...<-chan T) { + for value := range Merge(sources...) { + target <- value + } + close(target) +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..e3abc3d --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module git.tordarus.net/tordarus/channel + +go 1.23 diff --git a/group.go b/group.go new file mode 100644 index 0000000..4078259 --- /dev/null +++ b/group.go @@ -0,0 +1,94 @@ +package channel + +import ( + "time" +) + +// GroupByTime groups all incoming values from source using the grouper function +// and sends them to the returned channel after the given amount of time. +// This can useful for summing or averaging values from a channel on a fixed interval +func GroupByTime[T, G any](source <-chan T, duration time.Duration, grouper func(current G, value T) G) <-chan G { + out := make(chan G, cap(source)) + + go func() { + defer close(out) + + ticker := time.NewTicker(duration) + defer ticker.Stop() + + current := *new(G) + changed := false + for { + select { + case value, ok := <-source: + if !ok { + if changed { + out <- current + } + return + } + current = grouper(current, value) + changed = true + case <-ticker.C: + group := current + out <- group + current = *new(G) + changed = false + } + } + }() + + return out +} + +// GroupByAmount groups all incoming values from source using the grouper function +// and sends them to the returned channel after the given amount of values were grouped. +// This can useful for summing or averaging values from a channel for a given amount of values +func GroupByAmount[T, G any](source <-chan T, amount int, grouper func(current G, value T) G) <-chan G { + out := make(chan G, cap(source)) + + go func() { + defer close(out) + + current := *new(G) + currentAmount := 0 + for value := range source { + currentAmount++ + current = grouper(current, value) + + if currentAmount%amount == 0 { + group := current + out <- group + current = *new(G) + } + } + }() + + return out +} + +// GroupByValue groups all incoming values from source using the grouper function +// and sends them to the returned channel after valueFunc returns true for a given value. +// This can useful for summing or averaging values from a channel for a given amount of values +func GroupByValue[T, G any](source <-chan T, valueFunc func(T) bool, grouper func(current G, value T) G) <-chan G { + out := make(chan G, cap(source)) + + go func() { + defer close(out) + + current := *new(G) + currentAmount := 0 + for value := range source { + currentAmount++ + current = grouper(current, value) + + if valueFunc(value) { + group := current + out <- group + current = *new(G) + } + } + }() + + return out +} diff --git a/internal_stuff.go b/internal_stuff.go new file mode 100644 index 0000000..9ab9010 --- /dev/null +++ b/internal_stuff.go @@ -0,0 +1,12 @@ +package channel + +import "runtime" + +type mapEntry[K comparable, V any] struct { + Key K + Value V +} + +func getDefaultRunner() Runner { + return NewLimitedRunner(runtime.NumCPU()) +} diff --git a/limited_runner.go b/limited_runner.go new file mode 100644 index 0000000..c571649 --- /dev/null +++ b/limited_runner.go @@ -0,0 +1,28 @@ +package channel + +// LimitedRunner is a Runner which runs its methods +// in a pre-defined amount of routines +type LimitedRunner struct { + limiter chan struct{} +} + +var _ Runner = &LimitedRunner{} + +// NewLimitedRunner returns a new LimitedRunner with the given amount +// of allowed routines +func NewLimitedRunner(routineLimit int) *LimitedRunner { + return &LimitedRunner{ + limiter: make(chan struct{}, routineLimit), + } +} + +// Run blocks if the limit is currently exceeded. +// It blocks until a routine becomes available again. +// For non-blocking behavior, use go syntax +func (r *LimitedRunner) Run(f func()) { + r.limiter <- struct{}{} + go func() { + defer func() { <-r.limiter }() + f() + }() +} diff --git a/map.go b/map.go new file mode 100644 index 0000000..b993582 --- /dev/null +++ b/map.go @@ -0,0 +1,86 @@ +package channel + +import "sync" + +// MapPreserveOrder applies mapper to all I's coming from source and sends their return values to out while preserving input order. +// All mappings will be done as concurrently as possible using as many threads as there are CPU cores +func MapPreserveOrder[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) { + return MapPreserveOrderWithRunner(source, getDefaultRunner(), mapper) +} + +// MapPreserveOrderWithRunner behaves like MapPreserveOrder but uses runner to spawn its routines +func MapPreserveOrderWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O { + out := make(chan O, cap(source)) + outchannels := make(chan chan O, cap(source)) + + // start routine for each incoming value + go func(in <-chan I, outchannels chan chan O) { + defer close(outchannels) + for inputValue := range in { + inputValue := inputValue + outCh := make(chan O) + outchannels <- outCh + + runner.Run(func() { + defer close(outCh) + outCh <- mapper(inputValue) + }) + } + }(source, outchannels) + + // gather all results in incoming order + go func(out chan<- O, outchannels chan chan O) { + defer close(out) + for ch := range outchannels { + for outputValue := range ch { + out <- outputValue + } + } + }(out, outchannels) + + return out +} + +// Map applies mapper to all I's coming from source and sends their return values to out. +// All mappings will be done as concurrently as possible using as many threads as there are CPU cores +func Map[I, O any](source <-chan I, mapper func(I) O) <-chan O { + return MapWithRunner(source, getDefaultRunner(), mapper) +} + +// MapWithRunner behaves like Map but uses runner to spawn its routines +func MapWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O { + out := make(chan O, cap(source)) + + go func() { + defer close(out) + wg := &sync.WaitGroup{} + + for value := range source { + value := value + wg.Add(1) + runner.Run(func() { + defer wg.Done() + out <- mapper(value) + }) + } + + wg.Wait() + }() + + return out +} + +// MapSuccessive applies mapper to all I's coming from source and sends their return values to out while preserving input order. +// All mappings will be done successively in a single thread +func MapSuccessive[I, O any](source <-chan I, mapper func(I) O) <-chan O { + out := make(chan O, cap(source)) + + go func() { + defer close(out) + for value := range source { + out <- mapper(value) + } + }() + + return out +} diff --git a/merge.go b/merge.go new file mode 100644 index 0000000..a0a5661 --- /dev/null +++ b/merge.go @@ -0,0 +1,28 @@ +package channel + +import "sync" + +// Merge returns a channel in which all values of all incoming channels are sent to. +// The values will be sent in the same order as they are received +func Merge[T any](channels ...<-chan T) <-chan T { + out := make(chan T, determineBufferSize(channels)) + + var wg sync.WaitGroup + wg.Add(len(channels)) + + for _, ch := range channels { + go func(ch <-chan T) { + for v := range ch { + out <- v + } + wg.Done() + }(ch) + } + + go func() { + wg.Wait() + close(out) + }() + + return out +} diff --git a/of.go b/of.go new file mode 100644 index 0000000..e3a2f4c --- /dev/null +++ b/of.go @@ -0,0 +1,99 @@ +package channel + +import ( + "context" + "iter" + "time" +) + +// Of returns a channel containing all values +func Of[T any](values ...T) <-chan T { + return OfDelayed(0, values...) +} + +// OfDelayed behaves like Of but with a pre-defined delay between each value +func OfDelayed[T any](delay time.Duration, values ...T) <-chan T { + return OfDelayedFunc(func(value T) time.Duration { return delay }, values...) +} + +// OfDelayedFunc behaves like OfDelayed but accepts a function to determine the delay +func OfDelayedFunc[T any](delayFunc func(value T) time.Duration, values ...T) <-chan T { + out := make(chan T, len(values)) + + go func(out chan T, values []T) { + defer close(out) + for i, value := range values { + out <- value + if i < len(values)-1 { + time.Sleep(delayFunc(value)) + } + } + }(out, values) + + return out +} + +// OfFunc returns a channel containing the return values of successively calling f +// It closes the channel as soon as ctx is done +func OfFunc[T any](ctx context.Context, buffer int, f func() T) <-chan T { + out := make(chan T, buffer) + + go func() { + defer close(out) + + for ctx.Err() == nil { + select { + case out <- f(): + case <-ctx.Done(): + return + } + } + }() + + return out +} + +// OfMap returns a channel containing the return values of the unmapper function +// applied to any key-value pair in m +// The order is random +func OfMap[K comparable, V, T any](m map[K]V, unmapper func(K, V) T) <-chan T { + out := make(chan T, len(m)) + + go func() { + defer close(out) + for k, v := range m { + out <- unmapper(k, v) + } + }() + + return out +} + +// OfSeq returns a channel containing all values provided by the iterator +func OfSeq[T any](seq iter.Seq[T], buffer int) <-chan T { + out := make(chan T, buffer) + + go func() { + defer close(out) + for v := range seq { + out <- v + } + }() + + return out +} + +// OfSeq2 returns a channel containing the return values of the unmapper function +// when provided with the values of the iterator +func OfSeq2[K comparable, V, T any](seq iter.Seq2[K, V], buffer int, unmapper func(K, V) T) <-chan T { + out := make(chan T, buffer) + + go func() { + defer close(out) + for key, value := range seq { + out <- unmapper(key, value) + } + }() + + return out +} diff --git a/result.go b/result.go new file mode 100644 index 0000000..c5ab05d --- /dev/null +++ b/result.go @@ -0,0 +1,102 @@ +package channel + +type Result[T any] struct { + value *T + err error +} + +func ResultOf[T any](value T, err error) Result[T] { + if err != nil { + return Result[T]{value: nil, err: err} + } + + return Result[T]{value: &value, err: nil} +} + +func WrapResultOutputFunc[I, O any](f func(I) (O, error)) func(I) Result[O] { + return func(i I) Result[O] { return ResultOf(f(i)) } +} + +func WrapResultFunc[I, O any](f func(I) (O, error)) func(Result[I]) Result[O] { + resFunc := WrapResultOutputFunc(f) + nilValue := *new(O) + return func(r Result[I]) Result[O] { + v, err := r.Get() + if err != nil { + return ResultOf(nilValue, err) + } + return resFunc(v) + } +} + +func (r Result[T]) Success() bool { + return r.err == nil +} + +func (r Result[T]) Fail() bool { + return !r.Success() +} + +func (r Result[T]) GetOrDefault(defaultValue T) T { + if r.Fail() { + return defaultValue + } + + return *r.value +} + +func (r Result[T]) Get() (T, error) { + if r.err != nil { + return *new(T), r.err + } + return *r.value, r.err +} + +func (r Result[T]) GetUnsafe() T { + if r.err != nil { + panic(r.err) + } + return *r.value +} + +func (r Result[T]) Err() error { + return r.err +} + +func FilterSuccess[T any](source <-chan Result[T]) <-chan T { + succeeded := Filter(source, Result[T].Success) + + return MapSuccessive(succeeded, func(r Result[T]) T { + v, _ := r.Get() + return v + }) +} + +func FilterFail[T any](source <-chan Result[T]) <-chan T { + failed := Filter(source, Result[T].Fail) + + return MapSuccessive(failed, func(r Result[T]) T { + v, _ := r.Get() + return v + }) +} + +func FilterResults[T any](source <-chan Result[T]) (succeeded <-chan T, failed <-chan error) { + succ := make(chan T, cap(source)) + fail := make(chan error, cap(source)) + + go func() { + defer close(succ) + defer close(fail) + + for r := range source { + if r.Fail() { + fail <- r.Err() + continue + } + succ <- r.GetUnsafe() + } + }() + + return succ, fail +} diff --git a/runner.go b/runner.go new file mode 100644 index 0000000..7de9e36 --- /dev/null +++ b/runner.go @@ -0,0 +1,8 @@ +package channel + +// Runner is any runnable environment +type Runner interface { + // Run runs f in the Runners environment + // It might be blocking or non-blocking depending on Runners implementation + Run(f func()) +} diff --git a/tee.go b/tee.go new file mode 100644 index 0000000..60c5cd0 --- /dev/null +++ b/tee.go @@ -0,0 +1,37 @@ +package channel + +// Tee returns 2 channels which both receive all values from source. +// It's basically a copy function for channels +func Tee[T any](source <-chan T) (<-chan T, <-chan T) { + outs := TeeMany(source, 2) + return outs[0], outs[1] +} + +// TeeMany returns a given amount of channels which all receive all values from source. +// It's basically a copy function for channels +func TeeMany[T any](source <-chan T, amount int) []<-chan T { + outputs := make([]chan T, amount) + for i := range outputs { + outputs[i] = make(chan T, cap(source)) + } + + go func() { + defer func() { + for _, out := range outputs { + close(out) + } + }() + + for value := range source { + for _, out := range outputs { + out <- value + } + } + }() + + readOnlyOutputs := make([]<-chan T, 0, amount) + for _, out := range outputs { + readOnlyOutputs = append(readOnlyOutputs, out) + } + return readOnlyOutputs +} diff --git a/timeout.go b/timeout.go new file mode 100644 index 0000000..203955e --- /dev/null +++ b/timeout.go @@ -0,0 +1,30 @@ +package channel + +import "time" + +// CloseOnTimeout returns a channel which receives all values from the source. +// If no value was received in the given timeout duration, the returned channel will be closed. +// The input channel will not be closed. +func CloseOnTimeout[T any](source <-chan T, timeout time.Duration) <-chan T { + output := make(chan T, cap(source)) + + go func() { + defer close(output) + + for { + timer := time.NewTimer(timeout) + + select { + case value, ok := <-source: + if !ok { + return + } + output <- value + case <-timer.C: + return + } + } + }() + + return output +} diff --git a/to.go b/to.go new file mode 100644 index 0000000..86d8768 --- /dev/null +++ b/to.go @@ -0,0 +1,84 @@ +package channel + +import "container/list" + +// ToSlice returns a slice containing all values read from ch +func ToSlice[T any](ch <-chan T) []T { + s := make([]T, 0, cap(ch)) + EachSuccessive(ch, func(value T) { s = append(s, value) }) + return s +} + +// ToSliceContinuous returns a slice containing all values read from ch. +// The returned slice will be a pointer slice to a continuous block of memory. +// All values will be copied. +func ToSliceContinuous[T any](ch <-chan *T) []*T { + values := make([]T, 0, cap(ch)) + pointers := make([]*T, 0, cap(ch)) + EachSuccessive(ch, func(value *T) { + pointers = append(pointers, value) + + if value != nil { + values = append(values, *value) + } + }) + return pointers +} + +// ToSliceDeref returns a slice containing all values read from ch. +// The returned slice will be a dereferenced and continuous block of memory. +// Nil pointers are ignored. +func ToSliceDeref[T any](ch <-chan *T) []T { + s := make([]T, 0, cap(ch)) + EachSuccessive(ch, func(value *T) { + if value != nil { + s = append(s, *value) + } + }) + return s +} + +// ToList returns a list.List containing all values read from ch +func ToList[T any](ch <-chan T) *list.List { + l := list.New() + EachSuccessive(ch, func(value T) { l.PushBack(value) }) + return l +} + +// ToMap returns a map containing all values read from ch. +// The map key-value pairs are determined by f which will be called as concurrently as possible +// to build the resulting map +func ToMap[T any, K comparable, V any](ch <-chan T, f func(T) (K, V)) map[K]V { + return ToMapWithRunner(ch, getDefaultRunner(), f) +} + +// ToMap returns a map containing all values read from ch. +// The map key-value pairs are determined by f which will be called as concurrently as possible +// to build the resulting map +func ToMapWithRunner[T any, K comparable, V any](ch <-chan T, runner Runner, f func(T) (K, V)) map[K]V { + map2entry := func(t T) mapEntry[K, V] { + k, v := f(t) + return mapEntry[K, V]{Key: k, Value: v} + } + + map2kv := func(e mapEntry[K, V]) (K, V) { return e.Key, e.Value } + + return ToMapSuccessive(MapWithRunner(ch, runner, map2entry), map2kv) +} + +// ToMapSuccessive returns a map containing all values read from ch. +// The map key-value pairs are determined by f +func ToMapSuccessive[T any, K comparable, V any](ch <-chan T, f func(T) (K, V)) map[K]V { + m := map[K]V{} + EachSuccessive(ch, func(value T) { + k, v := f(value) + m[k] = v + }) + return m +} + +// ToStructMap returns a struct{} map containing all values read from ch as keys. +// It is a shorthand for ToMap(ch, func(value T) (T, struct{}) { return value, struct{}{} }) +func ToStructMap[T comparable](ch <-chan T) map[T]struct{} { + return ToMap(ch, func(value T) (T, struct{}) { return value, struct{}{} }) +} diff --git a/unlimited_runner.go b/unlimited_runner.go new file mode 100644 index 0000000..58e05c3 --- /dev/null +++ b/unlimited_runner.go @@ -0,0 +1,19 @@ +package channel + +// UnlimitedRunner is a Runner which runs each method +// in its own routine +type UnlimitedRunner struct { +} + +var _ Runner = &UnlimitedRunner{} + +// NewUnlimitedRunner returns a new LimitedRunner with the given amount +// of allowed routines +func NewUnlimitedRunner() *UnlimitedRunner { + return &UnlimitedRunner{} +} + +// Run always returns immediately +func (r *UnlimitedRunner) Run(f func()) { + go f() +} diff --git a/utils.go b/utils.go new file mode 100644 index 0000000..1bd3227 --- /dev/null +++ b/utils.go @@ -0,0 +1,15 @@ +package channel + +func determineBufferSize[T any](channels []<-chan T) int { + if len(channels) == 0 { + return 0 + } + + maxBufSize := 0 + for _, ch := range channels { + if cap(ch) > maxBufSize { + maxBufSize = cap(ch) + } + } + return maxBufSize +}