// Generated by `wit-bindgen-wrpc-go` 0.10.0. DO NOT EDIT! package handler import ( bytes "bytes" context "context" binary "encoding/binary" errors "errors" fmt "fmt" io "io" slog "log/slog" math "math" sync "sync" atomic "sync/atomic" wrpc "wrpc.io/go" ) type Req struct { Numbers wrpc.Receiver[[]uint64] Bytes io.ReadCloser } func (v *Req) String() string { return "Req" } func (v *Req) WriteToIndex(w wrpc.ByteWriter) (func(wrpc.IndexWriter) error, error) { writes := make(map[uint32]func(wrpc.IndexWriter) error, 2) slog.Debug("writing field", "name", "numbers") write0, err := func(v wrpc.Receiver[[]uint64], w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { slog.Debug("writing stream `stream::pending` status byte") if err := w.WriteByte(0); err != nil { return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) } return func(w wrpc.IndexWriter) (err error) { defer func() { slog.Debug("closing stream writer") if cErr := v.Close(); cErr != nil { if err == nil { err = fmt.Errorf("failed to close pending stream: %w", cErr) } else { slog.Warn("failed to close pending stream", "err", cErr) } } }() var wg sync.WaitGroup var wgErr atomic.Value var total uint32 for { var end bool slog.Debug("receiving outgoing pending stream contents") chunk, err := v.Receive() n := len(chunk) if n == 0 || err == io.EOF { end = true slog.Debug("outgoing pending stream reached EOF") } else if err != nil { return fmt.Errorf("failed to receive outgoing pending stream chunk: %w", err) } if n > math.MaxUint32 { return fmt.Errorf("outgoing pending stream chunk length of %d overflows a 32-bit integer", n) } if math.MaxUint32-uint32(n) < total { return errors.New("total outgoing pending stream element count would overflow a 32-bit unsigned integer") } slog.Debug("writing pending stream chunk length", "len", n) if err = wrpc.WriteUint32(uint32(n), w); err != nil { return fmt.Errorf("failed to write pending stream chunk length of %d: %w", n, err) } for _, v := range chunk { slog.Debug("writing pending stream element", "i", total) write, err := (func(wrpc.IndexWriter) error)(nil), func(v uint64, w io.Writer) (err error) { b := make([]byte, binary.MaxVarintLen64) i := binary.PutUvarint(b, uint64(v)) slog.Debug("writing u64") _, err = w.Write(b[:i]) return err }(v, w) if err != nil { return fmt.Errorf("failed to write pending stream chunk element %d: %w", total, err) } if write != nil { wg.Add(1) w, err := w.Index(total) if err != nil { return fmt.Errorf("failed to index nested stream writer: %w", err) } go func() { defer wg.Done() if err := write(w); err != nil { wgErr.Store(err) } }() } total++ } if end { if err := w.WriteByte(0); err != nil { return fmt.Errorf("failed to write pending stream end byte: %w", err) } wg.Wait() err := wgErr.Load() if err == nil { return nil } return err.(error) } } }, nil }(v.Numbers, w) if err != nil { return nil, fmt.Errorf("failed to write `numbers` field: %w", err) } if write0 != nil { writes[0] = write0 } slog.Debug("writing field", "name", "bytes") write1, err := func(v io.ReadCloser, w interface { io.ByteWriter io.Writer }) (write func(wrpc.IndexWriter) error, err error) { slog.Debug("writing byte stream `stream::pending` status byte") if err = w.WriteByte(0); err != nil { return nil, fmt.Errorf("failed to write `stream::pending` byte: %w", err) } return func(w wrpc.IndexWriter) (err error) { defer func() { slog.Debug("closing byte list stream writer") if cErr := v.Close(); cErr != nil { if err == nil { err = fmt.Errorf("failed to close pending byte stream: %w", cErr) } else { slog.Warn("failed to close pending byte stream", "err", cErr) } } }() chunk := make([]byte, 8096) for { var end bool slog.Debug("reading pending byte stream contents") n, err := v.Read(chunk) if err == io.EOF { end = true slog.Debug("pending byte stream reached EOF") } else if err != nil { return fmt.Errorf("failed to read pending byte stream chunk: %w", err) } if n > math.MaxUint32 { return fmt.Errorf("pending byte stream chunk length of %d overflows a 32-bit integer", n) } if n > 0 { slog.Debug("writing pending byte stream chunk length", "len", n) if err := wrpc.WriteUint32(uint32(n), w); err != nil { return fmt.Errorf("failed to write pending byte stream chunk length of %d: %w", n, err) } _, err = w.Write(chunk[:n]) if err != nil { return fmt.Errorf("failed to write pending byte stream chunk contents: %w", err) } } if end { if err := w.WriteByte(0); err != nil { return fmt.Errorf("failed to write pending byte stream end byte: %w", err) } return nil } } }, nil }(v.Bytes, w) if err != nil { return nil, fmt.Errorf("failed to write `bytes` field: %w", err) } if write1 != nil { writes[1] = write1 } if len(writes) > 0 { return func(w wrpc.IndexWriter) error { var wg sync.WaitGroup var wgErr atomic.Value for index, write := range writes { wg.Add(1) w, err := w.Index(index) if err != nil { return fmt.Errorf("failed to index nested record writer: %w", err) } write := write go func() { defer wg.Done() if err := write(w); err != nil { wgErr.Store(err) } }() } wg.Wait() err := wgErr.Load() if err == nil { return nil } return err.(error) }, nil } return nil, nil } func Echo(ctx__ context.Context, wrpc__ wrpc.Invoker, r *Req) (r0__ wrpc.Receiver[[]uint64], r1__ io.ReadCloser, writeErrs__ <-chan error, err__ error) { var buf__ bytes.Buffer var writeCount__ uint32 write0__, err__ := (r).WriteToIndex(&buf__) if err__ != nil { err__ = fmt.Errorf("failed to write `r` parameter: %w", err__) return } if write0__ != nil { writeCount__++ } writes__ := make(map[uint32]func(wrpc.IndexWriter) error, uint(writeCount__)) if write0__ != nil { writes__[0] = write0__ } var w__ wrpc.IndexWriteCloser var r__ wrpc.IndexReadCloser w__, r__, err__ = wrpc__.Invoke(ctx__, "wrpc-examples:streams/handler", "echo", buf__.Bytes(), wrpc.NewSubscribePath().Index(0), wrpc.NewSubscribePath().Index(1), ) if err__ != nil { err__ = fmt.Errorf("failed to invoke `echo`: %w", err__) return } defer func() { if err := r__.Close(); err != nil { slog.ErrorContext(ctx__, "failed to close reader", "instance", "wrpc-examples:streams/handler", "name", "echo", "err", err) } }() if writeCount__ > 0 { writeErrCh__ := make(chan error, uint(writeCount__)) writeErrs__ = writeErrCh__ var wg__ sync.WaitGroup for index, write := range writes__ { wg__.Add(1) w, err := w__.Index(index) if err != nil { if cErr := w__.Close(); cErr != nil { slog.DebugContext(ctx__, "failed to close outgoing stream", "instance", "wrpc-examples:streams/handler", "name", "echo", "err", cErr) } err__ = fmt.Errorf("failed to index param writer at index `%v`: %w", index, err) return } write := write go func() { defer wg__.Done() if err := write(w); err != nil { writeErrCh__ <- err } }() } go func() { wg__.Wait() close(writeErrCh__) }() } if cErr__ := w__.Close(); cErr__ != nil { slog.DebugContext(ctx__, "failed to close outgoing stream", "instance", "wrpc-examples:streams/handler", "name", "echo", "err", cErr__) } r0__, err__ = func(r wrpc.IndexReadCloser, path ...uint32) (wrpc.Receiver[[]uint64], error) { slog.Debug("reading stream status byte") status, err := r.ReadByte() if err != nil { return nil, fmt.Errorf("failed to read stream status byte: %w", err) } switch status { case 0: if len(path) > 0 { var err error r, err = r.Index(path...) if err != nil { return nil, fmt.Errorf("failed to index nested stream reader: %w", err) } } var total uint32 return wrpc.NewDecodeReceiver(r, func(r wrpc.IndexReadCloser) ([]uint64, error) { slog.Debug("reading pending stream chunk length") n, err := func(r io.ByteReader) (uint32, error) { var x uint32 var s uint8 for i := 0; i < 5; i++ { slog.Debug("reading u32 byte", "i", i) b, err := r.ReadByte() if err != nil { if i > 0 && err == io.EOF { err = io.ErrUnexpectedEOF } return x, fmt.Errorf("failed to read u32 byte: %w", err) } if s == 28 && b > 0x0f { return x, errors.New("varint overflows a 32-bit integer") } if b < 0x80 { return x | uint32(b)< 0 && err == io.EOF { err = io.ErrUnexpectedEOF } return x, fmt.Errorf("failed to read u64 byte: %w", err) } if s == 63 && b > 0x01 { return x, errors.New("varint overflows a 64-bit integer") } if b < 0x80 { return x | uint64(b)< 0 && err == io.EOF { err = io.ErrUnexpectedEOF } return nil, fmt.Errorf("failed to read list length byte: %w", err) } if s == 28 && b > 0x0f { return nil, errors.New("list length overflows a 32-bit integer") } if b < 0x80 { x = x | uint32(b)< 0 && err == io.EOF { err = io.ErrUnexpectedEOF } return x, fmt.Errorf("failed to read u64 byte: %w", err) } if s == 63 && b > 0x01 { return x, errors.New("varint overflows a 64-bit integer") } if b < 0x80 { return x | uint64(b)< 0 { var err error r, err = r.Index(path...) if err != nil { return nil, fmt.Errorf("failed to index nested byte stream reader: %w", err) } } return wrpc.NewByteStreamReader(r), nil case 1: slog.Debug("reading ready byte stream contents") buf, err := func(r interface { io.ByteReader io.Reader }) ([]byte, error) { var x uint32 var s uint for i := 0; i < 5; i++ { slog.Debug("reading byte list length", "i", i) b, err := r.ReadByte() if err != nil { if i > 0 && err == io.EOF { err = io.ErrUnexpectedEOF } return nil, fmt.Errorf("failed to read byte list length byte: %w", err) } if s == 28 && b > 0x0f { return nil, errors.New("byte list length overflows a 32-bit integer") } if b < 0x80 { x = x | uint32(b)<