Files
channel/filter.go

50 lines
1.2 KiB
Go

package channel
// FilterChanges returns a channel that forwards a value from source only when
// it differs from the value received immediately before it, as decided by cmp.
//
// The first value read from source is always forwarded, and cmp is not called
// for it. Each later value is compared against the previously received value
// (whether or not that one was forwarded); it is forwarded when cmp reports
// the two as not equal.
//
// The actual channel plumbing is delegated to FilterSuccessive.
func FilterChanges[T any](source <-chan T, cmp EqualityComparator[T]) <-chan T {
	// Track the previous value by value plus a presence flag rather than by
	// pointer, so no per-element heap allocation is needed and a zero-valued
	// first element is still recognised as "seen".
	var (
		prev    T
		hasPrev bool
	)
	return FilterSuccessive(source, func(newValue T) bool {
		changed := !hasPrev || !cmp(prev, newValue)
		prev, hasPrev = newValue, true
		return changed
	})
}
// FilterSuccessive returns a channel carrying, in arrival order, exactly those
// values from source for which filter returns true.
//
// The returned channel has the same buffer capacity as source. A single
// goroutine reads source one value at a time, calls filter on each value in
// turn, and closes the returned channel once source has been closed and
// drained. That goroutine blocks while the returned channel is full, so the
// caller is expected to keep receiving until the channel is closed.
func FilterSuccessive[T any](source <-chan T, filter FilterFunc[T]) <-chan T {
	kept := make(chan T, cap(source))

	forward := func() {
		defer close(kept)
		for {
			item, open := <-source
			if !open {
				return
			}
			if !filter(item) {
				continue
			}
			kept <- item
		}
	}

	go forward()
	return kept
}
// Filter is shorthand for FilterPreserveOrderWithRunner using the runner
// returned by getDefaultRunner: it returns a channel of the values from source
// for which filter reports true.
func Filter[T any](source <-chan T, filter FilterFunc[T]) <-chan T {
	runner := getDefaultRunner()
	return FilterPreserveOrderWithRunner(source, runner, filter)
}
// FilterPreserveOrderWithRunner returns a channel of the values from source
// for which filter reports true.
//
// It is built as a three-stage pipeline:
//  1. MapPreserveOrderWithRunner evaluates filter on every value using runner
//     and pairs each value with the result (presumably this is where work may
//     run concurrently while input order is kept — confirm against that
//     function's documentation).
//  2. FilterSuccessive drops the pairs whose result is false.
//  3. MapSuccessive unwraps the surviving pairs back to plain values.
func FilterPreserveOrderWithRunner[T any](source <-chan T, runner Runner, filter FilterFunc[T]) <-chan T {
	// verdict couples a value with the predicate's decision for it.
	type verdict struct {
		item T
		keep bool
	}

	evaluate := func(item T) verdict {
		return verdict{item: item, keep: filter(item)}
	}
	isKept := func(v verdict) bool { return v.keep }
	unwrap := func(v verdict) T { return v.item }

	evaluated := MapPreserveOrderWithRunner(source, runner, evaluate)
	survivors := FilterSuccessive(evaluated, isKept)
	return MapSuccessive(survivors, unwrap)
}