package niri import ( "bytes" "context" "encoding/json" "errors" "io" "net" "os" ) func notNull(c *Client) bool { return c != nil } func parseEntryAsClient(entry os.DirEntry) *Client { if entry.IsDir() { return nil } client, err := GetClientBySocketPath(entry.Name()) if err != nil { return nil } return client } func readSocket[T any](socket string, body io.Reader) Response[T] { r, err := readSocketRaw(socket, body) if err != nil { return errResponse[T](err) } defer r.Close() value := new(Response[T]) if err := json.NewDecoder(r).Decode(value); err != nil { return errResponse[T](err) } return *value } func subscribeSocket[T any](ctx context.Context, socket string, body io.Reader) (<-chan T, error) { ch, err := readSocketGeneric[T](ctx, socket, body) if err != nil { return nil, err } out := make(chan T, 10) go func() { defer close(out) for { select { case event := <-ch: out <- event case <-ctx.Done(): return } } }() return out, nil } func readSocketGeneric[T any](ctx context.Context, socket string, body io.Reader) (<-chan T, error) { r, err := readSocketRaw(socket, body) if err != nil { return nil, err } out := make(chan T, 10) go func() { defer close(out) defer r.Close() for ctx.Err() == nil { value := new(T) if err := json.NewDecoder(r).Decode(value); err != nil { if errors.Is(err, io.EOF) { return } else { continue } } out <- *value } }() return out, nil } func readSocketRaw(socket string, body io.Reader) (io.ReadCloser, error) { conn, err := net.Dial("unix", socket) if err != nil { return nil, err } if _, err := io.Copy(conn, body); err != nil { conn.Close() return nil, err } return conn, nil } func asJsonReader(v any) io.Reader { data, _ := json.Marshal(v) return bytes.NewReader(append(data, '\n')) }