package niri

import (
	"bytes"
	"context"
	"encoding/json"
	"io"
	"net"
	"os"
)

// notNull reports whether c is a non-nil client.
func notNull(c *Client) bool {
	return c != nil
}

// parseEntryAsClient tries to turn a directory entry into a Client.
//
// It returns nil for directories and for entries that
// GetClientBySocketPath rejects with an error.
//
// NOTE(review): entry.Name() is only the final path element, not a full
// path — confirm that GetClientBySocketPath resolves it against the
// intended directory.
func parseEntryAsClient(entry os.DirEntry) *Client {
	if entry.IsDir() {
		return nil
	}
	client, err := GetClientBySocketPath(entry.Name())
	if err != nil {
		return nil
	}
	return client
}

// readSocket writes body to the Unix socket at the given path and decodes
// a single JSON value from the reply into a Response[T].
//
// Connection and decoding failures are reported through errResponse
// instead of a separate error return. The connection is closed before
// returning.
func readSocket[T any](socket string, body io.Reader) Response[T] {
	r, err := readSocketRaw(socket, body)
	if err != nil {
		return errResponse[T](err)
	}
	defer r.Close()

	var resp Response[T]
	if err := json.NewDecoder(r).Decode(&resp); err != nil {
		return errResponse[T](err)
	}
	return resp
}

// subscribeSocket writes body to the Unix socket and returns a channel of
// the JSON values subsequently streamed back, decoded as T.
//
// The returned channel is closed when the underlying stream ends or when
// ctx is cancelled, whichever comes first. An error is returned only if
// the connection or the initial write fails.
func subscribeSocket[T any](ctx context.Context, socket string, body io.Reader) (<-chan T, error) {
	ch, err := readSocketGeneric[T](ctx, socket, body)
	if err != nil {
		return nil, err
	}

	out := make(chan T, 10)
	go func() {
		defer close(out)
		for {
			select {
			case event, ok := <-ch:
				if !ok {
					// Upstream is closed. Without this check a closed
					// channel would yield zero values forever.
					return
				}
				// Never block on a consumer that has gone away.
				select {
				case out <- event:
				case <-ctx.Done():
					return
				}
			case <-ctx.Done():
				return
			}
		}
	}()
	return out, nil
}

// readSocketGeneric writes body to the Unix socket and decodes the reply
// as a stream of JSON values of type T, delivered on the returned channel.
//
// A message that is valid JSON but cannot be unmarshalled into T is
// skipped. The stream ends, and the channel is closed, on EOF, on any
// read or JSON syntax error (the decoder cannot recover from those), or
// when ctx is cancelled. The connection is closed when the stream ends.
func readSocketGeneric[T any](ctx context.Context, socket string, body io.Reader) (<-chan T, error) {
	r, err := readSocketRaw(socket, body)
	if err != nil {
		return nil, err
	}

	out := make(chan T, 10)
	go func() {
		defer close(out)
		defer r.Close()

		// Closing the connection is the only way to interrupt a Decode
		// that is blocked waiting for the next message.
		stop := context.AfterFunc(ctx, func() { r.Close() })
		defer stop()

		dec := json.NewDecoder(r)
		for ctx.Err() == nil {
			// Frame the next message first. Errors at this level (EOF,
			// I/O failure, malformed JSON) are sticky inside
			// json.Decoder, so retrying would spin forever.
			var raw json.RawMessage
			if err := dec.Decode(&raw); err != nil {
				return
			}

			value := new(T)
			if err := json.Unmarshal(raw, value); err != nil {
				// Well-formed JSON that does not fit T: drop this
				// message and keep the stream alive.
				continue
			}

			select {
			case out <- *value:
			case <-ctx.Done():
				return
			}
		}
	}()
	return out, nil
}

// readSocketRaw dials the Unix socket at the given path, copies body to it
// and returns the open connection for reading the reply.
//
// The caller owns the returned connection and must close it. If writing
// body fails, the connection is closed and the error is returned.
func readSocketRaw(socket string, body io.Reader) (io.ReadCloser, error) {
	conn, err := net.Dial("unix", socket)
	if err != nil {
		return nil, err
	}
	if _, err := io.Copy(conn, body); err != nil {
		conn.Close()
		return nil, err
	}
	return conn, nil
}

// asJsonReader returns a reader over the JSON encoding of v followed by a
// newline.
//
// If v cannot be marshalled, the returned reader yields no bytes and
// fails with the marshalling error on the first Read, so the failure
// surfaces where the reader is consumed instead of being dropped.
func asJsonReader(v any) io.Reader {
	data, err := json.Marshal(v)
	if err != nil {
		pr, pw := io.Pipe()
		// Reads from pr now return err immediately; nothing blocks.
		pw.CloseWithError(err)
		return pr
	}
	return bytes.NewReader(append(data, '\n'))
}

// reencodeJson converts data to a *T by marshalling it to JSON and
// unmarshalling the result into a new T.
//
// It returns the encoding error if data cannot be marshalled, or the
// decoding error if the JSON does not fit T.
func reencodeJson[T any](data any) (*T, error) {
	encoded, err := json.Marshal(data)
	if err != nil {
		return nil, err
	}
	value := new(T)
	if err := json.Unmarshal(encoded, value); err != nil {
		return nil, err
	}
	return value, nil
}