initial commit
This commit is contained in:
		
							
								
								
									
										32
									
								
								chan_io.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										32
									
								
								chan_io.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,32 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"fmt"
 | 
			
		||||
	"io"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// WriteInto writes all given values into the channel ch.
 | 
			
		||||
// Is is a shorthand for Forward(ch, AsChan(values...))
 | 
			
		||||
func WriteInto[T any](ch chan<- T, values ...T) {
 | 
			
		||||
	Forward(ch, Of(values...))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WriteIntoDelayed writes all given values into the channel ch.
 | 
			
		||||
// It sleeps after every write for the given amount of time.
 | 
			
		||||
// It is a shorthand for Forward(ch, AsChanDelayed(time, values...))
 | 
			
		||||
func WriteIntoDelayed[T any](ch chan<- T, delay time.Duration, values ...T) {
 | 
			
		||||
	Forward(ch, OfDelayed(delay, values...))
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// WriteIntoWriter reads all values from ch and writes them via fmt.Fprintln to all writers
 | 
			
		||||
func WriteIntoWriter[T any](ch <-chan T, writers ...io.Writer) {
 | 
			
		||||
	w := io.MultiWriter(writers...)
 | 
			
		||||
	EachSuccessive(ch, func(value T) {
 | 
			
		||||
		if err, ok := any(value).(error); ok {
 | 
			
		||||
			fmt.Fprintln(w, err.Error())
 | 
			
		||||
			return
 | 
			
		||||
		}
 | 
			
		||||
		fmt.Fprintln(w, value)
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										34
									
								
								each.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										34
									
								
								each.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,34 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
import "sync"
 | 
			
		||||
 | 
			
		||||
// Each consumes all values and calls f for each of them.
 | 
			
		||||
// It blocks until source is closed
 | 
			
		||||
func Each[T any](source <-chan T, f func(T)) {
 | 
			
		||||
	EachWithRunner(source, getDefaultRunner(), f)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Each consumes all values and calls f for each of them.
 | 
			
		||||
// It blocks until source is closed
 | 
			
		||||
func EachWithRunner[T any](source <-chan T, runner Runner, f func(T)) {
 | 
			
		||||
	wg := &sync.WaitGroup{}
 | 
			
		||||
 | 
			
		||||
	for value := range source {
 | 
			
		||||
		value := value
 | 
			
		||||
		wg.Add(1)
 | 
			
		||||
		runner.Run(func() {
 | 
			
		||||
			defer wg.Done()
 | 
			
		||||
			f(value)
 | 
			
		||||
		})
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	wg.Wait()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// EachSuccessive consumes all values and calls f for each of them.
 | 
			
		||||
// It blocks until source is closed
 | 
			
		||||
func EachSuccessive[T any](source <-chan T, f func(T)) {
 | 
			
		||||
	for value := range source {
 | 
			
		||||
		f(value)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										39
									
								
								filter.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										39
									
								
								filter.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,39 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
func FilterSuccessive[T any](source <-chan T, filter func(T) bool) <-chan T {
 | 
			
		||||
	out := make(chan T, cap(source))
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
		for value := range source {
 | 
			
		||||
			if filter(value) {
 | 
			
		||||
				out <- value
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func Filter[T any](source <-chan T, filter func(T) bool) <-chan T {
 | 
			
		||||
	return FilterPreserveOrderWithRunner(source, getDefaultRunner(), filter)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func FilterPreserveOrderWithRunner[T any](source <-chan T, runner Runner, filter func(T) bool) <-chan T {
 | 
			
		||||
	type FilteredValue[T any] struct {
 | 
			
		||||
		Value  T
 | 
			
		||||
		Filter bool
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	mappedValues := MapPreserveOrderWithRunner(source, runner, func(value T) FilteredValue[T] {
 | 
			
		||||
		return FilteredValue[T]{Value: value, Filter: filter(value)}
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	filteredValues := FilterSuccessive(mappedValues, func(filteredValue FilteredValue[T]) bool {
 | 
			
		||||
		return filteredValue.Filter
 | 
			
		||||
	})
 | 
			
		||||
 | 
			
		||||
	return MapSuccessive(filteredValues, func(filteredValue FilteredValue[T]) T {
 | 
			
		||||
		return filteredValue.Value
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										38
									
								
								find.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								find.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,38 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
import "context"
 | 
			
		||||
 | 
			
		||||
func FindFirst[T any](source <-chan T) *T {
 | 
			
		||||
	for v := range source {
 | 
			
		||||
		return &v
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func FindFirstAndCancel[T any](source <-chan T, cancel context.CancelFunc) *T {
 | 
			
		||||
	defer cancel()
 | 
			
		||||
	for v := range source {
 | 
			
		||||
		return &v
 | 
			
		||||
	}
 | 
			
		||||
	return nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func FindLast[T any](source <-chan T) *T {
 | 
			
		||||
	var last *T = new(T)
 | 
			
		||||
	found := false
 | 
			
		||||
 | 
			
		||||
	for v := range source {
 | 
			
		||||
		*last = v
 | 
			
		||||
		found = true
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	if !found {
 | 
			
		||||
		return nil
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return last
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func HasAny[T any](source <-chan T) bool {
 | 
			
		||||
	return FindFirst(source) != nil
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										46
									
								
								flat.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										46
									
								
								flat.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,46 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
func FlatSlice[T any](source <-chan []T) <-chan T {
 | 
			
		||||
	out := make(chan T, cap(source))
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
		for slice := range source {
 | 
			
		||||
			for _, v := range slice {
 | 
			
		||||
				out <- v
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func FlatMap[K comparable, V, T any](source <-chan map[K]V, unmapper func(key K, value V) T) <-chan T {
 | 
			
		||||
	out := make(chan T, cap(source))
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
		for slice := range source {
 | 
			
		||||
			for k, v := range slice {
 | 
			
		||||
				out <- unmapper(k, v)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func FlatChan[T any](source <-chan <-chan T) <-chan T {
 | 
			
		||||
	out := make(chan T, cap(source))
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
		for ch := range source {
 | 
			
		||||
			for v := range ch {
 | 
			
		||||
				out <- v
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										11
									
								
								forward.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										11
									
								
								forward.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,11 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
// Forward reads all values from all sources and sends them to target.
 | 
			
		||||
// It blocks until all values are forwarded and the out channel was closed.
 | 
			
		||||
// Use with go keyword for non-blocking behavior
 | 
			
		||||
func Forward[T any](target chan<- T, sources ...<-chan T) {
 | 
			
		||||
	for value := range Merge(sources...) {
 | 
			
		||||
		target <- value
 | 
			
		||||
	}
 | 
			
		||||
	close(target)
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										94
									
								
								group.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										94
									
								
								group.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,94 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// GroupByTime groups all incoming values from source using the grouper function
 | 
			
		||||
// and sends them to the returned channel after the given amount of time.
 | 
			
		||||
// This can useful for summing or averaging values from a channel on a fixed interval
 | 
			
		||||
func GroupByTime[T, G any](source <-chan T, duration time.Duration, grouper func(current G, value T) G) <-chan G {
 | 
			
		||||
	out := make(chan G, cap(source))
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
 | 
			
		||||
		ticker := time.NewTicker(duration)
 | 
			
		||||
		defer ticker.Stop()
 | 
			
		||||
 | 
			
		||||
		current := *new(G)
 | 
			
		||||
		changed := false
 | 
			
		||||
		for {
 | 
			
		||||
			select {
 | 
			
		||||
			case value, ok := <-source:
 | 
			
		||||
				if !ok {
 | 
			
		||||
					if changed {
 | 
			
		||||
						out <- current
 | 
			
		||||
					}
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				current = grouper(current, value)
 | 
			
		||||
				changed = true
 | 
			
		||||
			case <-ticker.C:
 | 
			
		||||
				group := current
 | 
			
		||||
				out <- group
 | 
			
		||||
				current = *new(G)
 | 
			
		||||
				changed = false
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GroupByAmount groups all incoming values from source using the grouper function
 | 
			
		||||
// and sends them to the returned channel after the given amount of values were grouped.
 | 
			
		||||
// This can useful for summing or averaging values from a channel for a given amount of values
 | 
			
		||||
func GroupByAmount[T, G any](source <-chan T, amount int, grouper func(current G, value T) G) <-chan G {
 | 
			
		||||
	out := make(chan G, cap(source))
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
 | 
			
		||||
		current := *new(G)
 | 
			
		||||
		currentAmount := 0
 | 
			
		||||
		for value := range source {
 | 
			
		||||
			currentAmount++
 | 
			
		||||
			current = grouper(current, value)
 | 
			
		||||
 | 
			
		||||
			if currentAmount%amount == 0 {
 | 
			
		||||
				group := current
 | 
			
		||||
				out <- group
 | 
			
		||||
				current = *new(G)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// GroupByValue groups all incoming values from source using the grouper function
 | 
			
		||||
// and sends them to the returned channel after valueFunc returns true for a given value.
 | 
			
		||||
// This can useful for summing or averaging values from a channel for a given amount of values
 | 
			
		||||
func GroupByValue[T, G any](source <-chan T, valueFunc func(T) bool, grouper func(current G, value T) G) <-chan G {
 | 
			
		||||
	out := make(chan G, cap(source))
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
 | 
			
		||||
		current := *new(G)
 | 
			
		||||
		currentAmount := 0
 | 
			
		||||
		for value := range source {
 | 
			
		||||
			currentAmount++
 | 
			
		||||
			current = grouper(current, value)
 | 
			
		||||
 | 
			
		||||
			if valueFunc(value) {
 | 
			
		||||
				group := current
 | 
			
		||||
				out <- group
 | 
			
		||||
				current = *new(G)
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										12
									
								
								internal_stuff.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										12
									
								
								internal_stuff.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,12 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
import "runtime"
 | 
			
		||||
 | 
			
		||||
type mapEntry[K comparable, V any] struct {
 | 
			
		||||
	Key   K
 | 
			
		||||
	Value V
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func getDefaultRunner() Runner {
 | 
			
		||||
	return NewLimitedRunner(runtime.NumCPU())
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										28
									
								
								limited_runner.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								limited_runner.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,28 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
// LimitedRunner is a Runner which runs its methods
 | 
			
		||||
// in a pre-defined amount of routines
 | 
			
		||||
type LimitedRunner struct {
 | 
			
		||||
	limiter chan struct{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ Runner = &LimitedRunner{}
 | 
			
		||||
 | 
			
		||||
// NewLimitedRunner returns a new LimitedRunner with the given amount
 | 
			
		||||
// of allowed routines
 | 
			
		||||
func NewLimitedRunner(routineLimit int) *LimitedRunner {
 | 
			
		||||
	return &LimitedRunner{
 | 
			
		||||
		limiter: make(chan struct{}, routineLimit),
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Run blocks if the limit is currently exceeded.
 | 
			
		||||
// It blocks until a routine becomes available again.
 | 
			
		||||
// For non-blocking behavior, use go syntax
 | 
			
		||||
func (r *LimitedRunner) Run(f func()) {
 | 
			
		||||
	r.limiter <- struct{}{}
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer func() { <-r.limiter }()
 | 
			
		||||
		f()
 | 
			
		||||
	}()
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										86
									
								
								map.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										86
									
								
								map.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,86 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
import "sync"
 | 
			
		||||
 | 
			
		||||
// MapPreserveOrder applies mapper to all I's coming from source and sends their return values to out while preserving input order.
 | 
			
		||||
// All mappings will be done as concurrently as possible using as many threads as there are CPU cores
 | 
			
		||||
func MapPreserveOrder[I, O any](source <-chan I, mapper func(I) O) (out <-chan O) {
 | 
			
		||||
	return MapPreserveOrderWithRunner(source, getDefaultRunner(), mapper)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// MapPreserveOrderWithRunner behaves like MapPreserveOrder but uses runner to spawn its routines
 | 
			
		||||
func MapPreserveOrderWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O {
 | 
			
		||||
	out := make(chan O, cap(source))
 | 
			
		||||
	outchannels := make(chan chan O, cap(source))
 | 
			
		||||
 | 
			
		||||
	// start routine for each incoming value
 | 
			
		||||
	go func(in <-chan I, outchannels chan chan O) {
 | 
			
		||||
		defer close(outchannels)
 | 
			
		||||
		for inputValue := range in {
 | 
			
		||||
			inputValue := inputValue
 | 
			
		||||
			outCh := make(chan O)
 | 
			
		||||
			outchannels <- outCh
 | 
			
		||||
 | 
			
		||||
			runner.Run(func() {
 | 
			
		||||
				defer close(outCh)
 | 
			
		||||
				outCh <- mapper(inputValue)
 | 
			
		||||
			})
 | 
			
		||||
		}
 | 
			
		||||
	}(source, outchannels)
 | 
			
		||||
 | 
			
		||||
	// gather all results in incoming order
 | 
			
		||||
	go func(out chan<- O, outchannels chan chan O) {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
		for ch := range outchannels {
 | 
			
		||||
			for outputValue := range ch {
 | 
			
		||||
				out <- outputValue
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}(out, outchannels)
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Map applies mapper to all I's coming from source and sends their return values to out.
 | 
			
		||||
// All mappings will be done as concurrently as possible using as many threads as there are CPU cores
 | 
			
		||||
func Map[I, O any](source <-chan I, mapper func(I) O) <-chan O {
 | 
			
		||||
	return MapWithRunner(source, getDefaultRunner(), mapper)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// MapWithRunner behaves like Map but uses runner to spawn its routines
 | 
			
		||||
func MapWithRunner[I, O any](source <-chan I, runner Runner, mapper func(I) O) <-chan O {
 | 
			
		||||
	out := make(chan O, cap(source))
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
		wg := &sync.WaitGroup{}
 | 
			
		||||
 | 
			
		||||
		for value := range source {
 | 
			
		||||
			value := value
 | 
			
		||||
			wg.Add(1)
 | 
			
		||||
			runner.Run(func() {
 | 
			
		||||
				defer wg.Done()
 | 
			
		||||
				out <- mapper(value)
 | 
			
		||||
			})
 | 
			
		||||
		}
 | 
			
		||||
 | 
			
		||||
		wg.Wait()
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// MapSuccessive applies mapper to all I's coming from source and sends their return values to out while preserving input order.
 | 
			
		||||
// All mappings will be done successively in a single thread
 | 
			
		||||
func MapSuccessive[I, O any](source <-chan I, mapper func(I) O) <-chan O {
 | 
			
		||||
	out := make(chan O, cap(source))
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
		for value := range source {
 | 
			
		||||
			out <- mapper(value)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										28
									
								
								merge.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										28
									
								
								merge.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,28 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
import "sync"
 | 
			
		||||
 | 
			
		||||
// Merge returns a channel in which all values of all incoming channels are sent to.
 | 
			
		||||
// The values will be sent in the same order as they are received
 | 
			
		||||
func Merge[T any](channels ...<-chan T) <-chan T {
 | 
			
		||||
	out := make(chan T, determineBufferSize(channels))
 | 
			
		||||
 | 
			
		||||
	var wg sync.WaitGroup
 | 
			
		||||
	wg.Add(len(channels))
 | 
			
		||||
 | 
			
		||||
	for _, ch := range channels {
 | 
			
		||||
		go func(ch <-chan T) {
 | 
			
		||||
			for v := range ch {
 | 
			
		||||
				out <- v
 | 
			
		||||
			}
 | 
			
		||||
			wg.Done()
 | 
			
		||||
		}(ch)
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		wg.Wait()
 | 
			
		||||
		close(out)
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										99
									
								
								of.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										99
									
								
								of.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,99 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
import (
 | 
			
		||||
	"context"
 | 
			
		||||
	"iter"
 | 
			
		||||
	"time"
 | 
			
		||||
)
 | 
			
		||||
 | 
			
		||||
// Of returns a channel containing all values
 | 
			
		||||
func Of[T any](values ...T) <-chan T {
 | 
			
		||||
	return OfDelayed(0, values...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OfDelayed behaves like Of but with a pre-defined delay between each value
 | 
			
		||||
func OfDelayed[T any](delay time.Duration, values ...T) <-chan T {
 | 
			
		||||
	return OfDelayedFunc(func(value T) time.Duration { return delay }, values...)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OfDelayedFunc behaves like OfDelayed but accepts a function to determine the delay
 | 
			
		||||
func OfDelayedFunc[T any](delayFunc func(value T) time.Duration, values ...T) <-chan T {
 | 
			
		||||
	out := make(chan T, len(values))
 | 
			
		||||
 | 
			
		||||
	go func(out chan T, values []T) {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
		for i, value := range values {
 | 
			
		||||
			out <- value
 | 
			
		||||
			if i < len(values)-1 {
 | 
			
		||||
				time.Sleep(delayFunc(value))
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}(out, values)
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OfFunc returns a channel containing the return values of successively calling f
 | 
			
		||||
// It closes the channel as soon as ctx is done
 | 
			
		||||
func OfFunc[T any](ctx context.Context, buffer int, f func() T) <-chan T {
 | 
			
		||||
	out := make(chan T, buffer)
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
 | 
			
		||||
		for ctx.Err() == nil {
 | 
			
		||||
			select {
 | 
			
		||||
			case out <- f():
 | 
			
		||||
			case <-ctx.Done():
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OfMap returns a channel containing the return values of the unmapper function
 | 
			
		||||
// applied to any key-value pair in m
 | 
			
		||||
// The order is random
 | 
			
		||||
func OfMap[K comparable, V, T any](m map[K]V, unmapper func(K, V) T) <-chan T {
 | 
			
		||||
	out := make(chan T, len(m))
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
		for k, v := range m {
 | 
			
		||||
			out <- unmapper(k, v)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OfSeq returns a channel containing all values provided by the iterator
 | 
			
		||||
func OfSeq[T any](seq iter.Seq[T], buffer int) <-chan T {
 | 
			
		||||
	out := make(chan T, buffer)
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
		for v := range seq {
 | 
			
		||||
			out <- v
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// OfSeq2 returns a channel containing the return values of the unmapper function
 | 
			
		||||
// when provided with the values of the iterator
 | 
			
		||||
func OfSeq2[K comparable, V, T any](seq iter.Seq2[K, V], buffer int, unmapper func(K, V) T) <-chan T {
 | 
			
		||||
	out := make(chan T, buffer)
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(out)
 | 
			
		||||
		for key, value := range seq {
 | 
			
		||||
			out <- unmapper(key, value)
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return out
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										102
									
								
								result.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										102
									
								
								result.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,102 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
type Result[T any] struct {
 | 
			
		||||
	value *T
 | 
			
		||||
	err   error
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func ResultOf[T any](value T, err error) Result[T] {
 | 
			
		||||
	if err != nil {
 | 
			
		||||
		return Result[T]{value: nil, err: err}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return Result[T]{value: &value, err: nil}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func WrapResultOutputFunc[I, O any](f func(I) (O, error)) func(I) Result[O] {
 | 
			
		||||
	return func(i I) Result[O] { return ResultOf(f(i)) }
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func WrapResultFunc[I, O any](f func(I) (O, error)) func(Result[I]) Result[O] {
 | 
			
		||||
	resFunc := WrapResultOutputFunc(f)
 | 
			
		||||
	nilValue := *new(O)
 | 
			
		||||
	return func(r Result[I]) Result[O] {
 | 
			
		||||
		v, err := r.Get()
 | 
			
		||||
		if err != nil {
 | 
			
		||||
			return ResultOf(nilValue, err)
 | 
			
		||||
		}
 | 
			
		||||
		return resFunc(v)
 | 
			
		||||
	}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r Result[T]) Success() bool {
 | 
			
		||||
	return r.err == nil
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r Result[T]) Fail() bool {
 | 
			
		||||
	return !r.Success()
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r Result[T]) GetOrDefault(defaultValue T) T {
 | 
			
		||||
	if r.Fail() {
 | 
			
		||||
		return defaultValue
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	return *r.value
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r Result[T]) Get() (T, error) {
 | 
			
		||||
	if r.err != nil {
 | 
			
		||||
		return *new(T), r.err
 | 
			
		||||
	}
 | 
			
		||||
	return *r.value, r.err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r Result[T]) GetUnsafe() T {
 | 
			
		||||
	if r.err != nil {
 | 
			
		||||
		panic(r.err)
 | 
			
		||||
	}
 | 
			
		||||
	return *r.value
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func (r Result[T]) Err() error {
 | 
			
		||||
	return r.err
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func FilterSuccess[T any](source <-chan Result[T]) <-chan T {
 | 
			
		||||
	succeeded := Filter(source, Result[T].Success)
 | 
			
		||||
 | 
			
		||||
	return MapSuccessive(succeeded, func(r Result[T]) T {
 | 
			
		||||
		v, _ := r.Get()
 | 
			
		||||
		return v
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func FilterFail[T any](source <-chan Result[T]) <-chan T {
 | 
			
		||||
	failed := Filter(source, Result[T].Fail)
 | 
			
		||||
 | 
			
		||||
	return MapSuccessive(failed, func(r Result[T]) T {
 | 
			
		||||
		v, _ := r.Get()
 | 
			
		||||
		return v
 | 
			
		||||
	})
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
func FilterResults[T any](source <-chan Result[T]) (succeeded <-chan T, failed <-chan error) {
 | 
			
		||||
	succ := make(chan T, cap(source))
 | 
			
		||||
	fail := make(chan error, cap(source))
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(succ)
 | 
			
		||||
		defer close(fail)
 | 
			
		||||
 | 
			
		||||
		for r := range source {
 | 
			
		||||
			if r.Fail() {
 | 
			
		||||
				fail <- r.Err()
 | 
			
		||||
				continue
 | 
			
		||||
			}
 | 
			
		||||
			succ <- r.GetUnsafe()
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return succ, fail
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										8
									
								
								runner.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										8
									
								
								runner.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,8 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
// Runner is any runnable environment
 | 
			
		||||
type Runner interface {
 | 
			
		||||
	// Run runs f in the Runners environment
 | 
			
		||||
	// It might be blocking or non-blocking depending on Runners implementation
 | 
			
		||||
	Run(f func())
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										37
									
								
								tee.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										37
									
								
								tee.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,37 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
// Tee returns 2 channels which both receive all values from source.
 | 
			
		||||
// It's basically a copy function for channels
 | 
			
		||||
func Tee[T any](source <-chan T) (<-chan T, <-chan T) {
 | 
			
		||||
	outs := TeeMany(source, 2)
 | 
			
		||||
	return outs[0], outs[1]
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// TeeMany returns a given amount of channels which all receive all values from source.
 | 
			
		||||
// It's basically a copy function for channels
 | 
			
		||||
func TeeMany[T any](source <-chan T, amount int) []<-chan T {
 | 
			
		||||
	outputs := make([]chan T, amount)
 | 
			
		||||
	for i := range outputs {
 | 
			
		||||
		outputs[i] = make(chan T, cap(source))
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer func() {
 | 
			
		||||
			for _, out := range outputs {
 | 
			
		||||
				close(out)
 | 
			
		||||
			}
 | 
			
		||||
		}()
 | 
			
		||||
 | 
			
		||||
		for value := range source {
 | 
			
		||||
			for _, out := range outputs {
 | 
			
		||||
				out <- value
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	readOnlyOutputs := make([]<-chan T, 0, amount)
 | 
			
		||||
	for _, out := range outputs {
 | 
			
		||||
		readOnlyOutputs = append(readOnlyOutputs, out)
 | 
			
		||||
	}
 | 
			
		||||
	return readOnlyOutputs
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										30
									
								
								timeout.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										30
									
								
								timeout.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,30 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
import "time"
 | 
			
		||||
 | 
			
		||||
// CloseOnTimeout returns a channel which receives all values from the source.
 | 
			
		||||
// If no value was received in the given timeout duration, the returned channel will be closed.
 | 
			
		||||
// The input channel will not be closed.
 | 
			
		||||
func CloseOnTimeout[T any](source <-chan T, timeout time.Duration) <-chan T {
 | 
			
		||||
	output := make(chan T, cap(source))
 | 
			
		||||
 | 
			
		||||
	go func() {
 | 
			
		||||
		defer close(output)
 | 
			
		||||
 | 
			
		||||
		for {
 | 
			
		||||
			timer := time.NewTimer(timeout)
 | 
			
		||||
 | 
			
		||||
			select {
 | 
			
		||||
			case value, ok := <-source:
 | 
			
		||||
				if !ok {
 | 
			
		||||
					return
 | 
			
		||||
				}
 | 
			
		||||
				output <- value
 | 
			
		||||
			case <-timer.C:
 | 
			
		||||
				return
 | 
			
		||||
			}
 | 
			
		||||
		}
 | 
			
		||||
	}()
 | 
			
		||||
 | 
			
		||||
	return output
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										84
									
								
								to.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										84
									
								
								to.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,84 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
import "container/list"
 | 
			
		||||
 | 
			
		||||
// ToSlice returns a slice containing all values read from ch
 | 
			
		||||
func ToSlice[T any](ch <-chan T) []T {
 | 
			
		||||
	s := make([]T, 0, cap(ch))
 | 
			
		||||
	EachSuccessive(ch, func(value T) { s = append(s, value) })
 | 
			
		||||
	return s
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToSliceContinuous returns a slice containing all values read from ch.
 | 
			
		||||
// The returned slice will be a pointer slice to a continuous block of memory.
 | 
			
		||||
// All values will be copied.
 | 
			
		||||
func ToSliceContinuous[T any](ch <-chan *T) []*T {
 | 
			
		||||
	values := make([]T, 0, cap(ch))
 | 
			
		||||
	pointers := make([]*T, 0, cap(ch))
 | 
			
		||||
	EachSuccessive(ch, func(value *T) {
 | 
			
		||||
		pointers = append(pointers, value)
 | 
			
		||||
 | 
			
		||||
		if value != nil {
 | 
			
		||||
			values = append(values, *value)
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
	return pointers
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToSliceDeref returns a slice containing all values read from ch.
 | 
			
		||||
// The returned slice will be a dereferenced and continuous block of memory.
 | 
			
		||||
// Nil pointers are ignored.
 | 
			
		||||
func ToSliceDeref[T any](ch <-chan *T) []T {
 | 
			
		||||
	s := make([]T, 0, cap(ch))
 | 
			
		||||
	EachSuccessive(ch, func(value *T) {
 | 
			
		||||
		if value != nil {
 | 
			
		||||
			s = append(s, *value)
 | 
			
		||||
		}
 | 
			
		||||
	})
 | 
			
		||||
	return s
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToList returns a list.List containing all values read from ch
 | 
			
		||||
func ToList[T any](ch <-chan T) *list.List {
 | 
			
		||||
	l := list.New()
 | 
			
		||||
	EachSuccessive(ch, func(value T) { l.PushBack(value) })
 | 
			
		||||
	return l
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToMap returns a map containing all values read from ch.
 | 
			
		||||
// The map key-value pairs are determined by f which will be called as concurrently as possible
 | 
			
		||||
// to build the resulting map
 | 
			
		||||
func ToMap[T any, K comparable, V any](ch <-chan T, f func(T) (K, V)) map[K]V {
 | 
			
		||||
	return ToMapWithRunner(ch, getDefaultRunner(), f)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToMap returns a map containing all values read from ch.
 | 
			
		||||
// The map key-value pairs are determined by f which will be called as concurrently as possible
 | 
			
		||||
// to build the resulting map
 | 
			
		||||
func ToMapWithRunner[T any, K comparable, V any](ch <-chan T, runner Runner, f func(T) (K, V)) map[K]V {
 | 
			
		||||
	map2entry := func(t T) mapEntry[K, V] {
 | 
			
		||||
		k, v := f(t)
 | 
			
		||||
		return mapEntry[K, V]{Key: k, Value: v}
 | 
			
		||||
	}
 | 
			
		||||
 | 
			
		||||
	map2kv := func(e mapEntry[K, V]) (K, V) { return e.Key, e.Value }
 | 
			
		||||
 | 
			
		||||
	return ToMapSuccessive(MapWithRunner(ch, runner, map2entry), map2kv)
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToMapSuccessive returns a map containing all values read from ch.
 | 
			
		||||
// The map key-value pairs are determined by f
 | 
			
		||||
func ToMapSuccessive[T any, K comparable, V any](ch <-chan T, f func(T) (K, V)) map[K]V {
 | 
			
		||||
	m := map[K]V{}
 | 
			
		||||
	EachSuccessive(ch, func(value T) {
 | 
			
		||||
		k, v := f(value)
 | 
			
		||||
		m[k] = v
 | 
			
		||||
	})
 | 
			
		||||
	return m
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// ToStructMap returns a struct{} map containing all values read from ch as keys.
 | 
			
		||||
// It is a shorthand for ToMap(ch, func(value T) (T, struct{}) { return value, struct{}{} })
 | 
			
		||||
func ToStructMap[T comparable](ch <-chan T) map[T]struct{} {
 | 
			
		||||
	return ToMap(ch, func(value T) (T, struct{}) { return value, struct{}{} })
 | 
			
		||||
}
 | 
			
		||||
							
								
								
									
										19
									
								
								unlimited_runner.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										19
									
								
								unlimited_runner.go
									
									
									
									
									
										Normal file
									
								
							@ -0,0 +1,19 @@
 | 
			
		||||
package channel
 | 
			
		||||
 | 
			
		||||
// UnlimitedRunner is a Runner which runs each method
 | 
			
		||||
// in its own routine
 | 
			
		||||
type UnlimitedRunner struct {
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
var _ Runner = &UnlimitedRunner{}
 | 
			
		||||
 | 
			
		||||
// NewUnlimitedRunner returns a new LimitedRunner with the given amount
 | 
			
		||||
// of allowed routines
 | 
			
		||||
func NewUnlimitedRunner() *UnlimitedRunner {
 | 
			
		||||
	return &UnlimitedRunner{}
 | 
			
		||||
}
 | 
			
		||||
 | 
			
		||||
// Run always returns immediately
 | 
			
		||||
func (r *UnlimitedRunner) Run(f func()) {
 | 
			
		||||
	go f()
 | 
			
		||||
}
 | 
			
		||||
		Reference in New Issue
	
	Block a user