From 38cc052835a54b9cce78fb1b437038b264598c0c Mon Sep 17 00:00:00 2001 From: Louis Thibault Date: Wed, 29 Mar 2023 18:51:34 -0400 Subject: [PATCH] Remove iostream. --- Makefile | 11 +- api/iostream.capnp | 17 - internal/api/iostream/iostream.capnp.go | 512 ------------------------ pkg/iostream/iostream.go | 210 ---------- pkg/iostream/iostream_test.go | 107 ----- 5 files changed, 2 insertions(+), 855 deletions(-) delete mode 100644 api/iostream.capnp delete mode 100644 internal/api/iostream/iostream.capnp.go delete mode 100644 pkg/iostream/iostream.go delete mode 100644 pkg/iostream/iostream_test.go diff --git a/Makefile b/Makefile index a8f6552b..73546b2b 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ all: capnp clean: clean-capnp clean-mocks -capnp: capnp-anchor capnp-pubsub capnp-cluster capnp-channel capnp-process capnp-iostream +capnp: capnp-anchor capnp-pubsub capnp-cluster capnp-channel capnp-process # N.B.: compiling capnp schemas requires having capnproto.org/go/capnp/v3 installed # on the GOPATH. @@ -34,12 +34,8 @@ capnp-process: @mkdir -p internal/api/process @capnp compile -I$(GOPATH)/src/capnproto.org/go/capnp/std -ogo:internal/api/process --src-prefix=api api/process.capnp -capnp-iostream: - @mkdir -p internal/api/iostream - @capnp compile -I$(GOPATH)/src/capnproto.org/go/capnp/std -ogo:internal/api/iostream --src-prefix=api api/iostream.capnp - -clean-capnp: clean-capnp-anchor clean-capnp-pubsub clean-capnp-cluster clean-capnp-channel clean-capnp-process clean-capnp-iostream clean-capnp-wasm +clean-capnp: clean-capnp-anchor clean-capnp-pubsub clean-capnp-cluster clean-capnp-channel clean-capnp-process clean-capnp-wasm clean-capnp-anchor: @rm -rf internal/api/anchor @@ -56,9 +52,6 @@ clean-capnp-channel: clean-capnp-process: @rm -rf internal/api/process -clean-capnp-iostream: - @rm -rf internal/api/iostream - mocks: clean-mocks # This roundabout call to 'go generate' allows us to: diff --git a/api/iostream.capnp b/api/iostream.capnp deleted file mode 100644 index 22528dac..00000000 --- a/api/iostream.capnp +++ /dev/null @@ -1,17 +0,0 @@ -using Go = import "/go.capnp"; - -@0x89c985e63e991441; - -$Go.package("iostream"); -$Go.import("github.com/wetware/ww/internal/api/iostream"); - -using Chan = import "channel.capnp"; - - -# Stream can send bytes to a remote vat. -interface Stream extends(Chan.SendCloser(Data)) {} - -# Provider can make a Stream available to a remote vat. -interface Provider { - provide @0 (stream :Stream); -} diff --git a/internal/api/iostream/iostream.capnp.go b/internal/api/iostream/iostream.capnp.go deleted file mode 100644 index 0d5af85e..00000000 --- a/internal/api/iostream/iostream.capnp.go +++ /dev/null @@ -1,512 +0,0 @@ -// Code generated by capnpc-go. DO NOT EDIT. - -package iostream - -import ( - capnp "capnproto.org/go/capnp/v3" - text "capnproto.org/go/capnp/v3/encoding/text" - fc "capnproto.org/go/capnp/v3/flowcontrol" - schemas "capnproto.org/go/capnp/v3/schemas" - server "capnproto.org/go/capnp/v3/server" - stream "capnproto.org/go/capnp/v3/std/capnp/stream" - context "context" - fmt "fmt" - channel "github.com/wetware/ww/internal/api/channel" -) - -type Stream capnp.Client - -// Stream_TypeID is the unique identifier for the type Stream. -const Stream_TypeID = 0x800fee1ed6b441e2 - -func (c Stream) Send(ctx context.Context, params func(channel.Sender_send_Params) error) (stream.StreamResult_Future, capnp.ReleaseFunc) { - s := capnp.Send{ - Method: capnp.Method{ - InterfaceID: 0xe8bbed1438ea16ee, - MethodID: 0, - InterfaceName: "channel.capnp:Sender", - MethodName: "send", - }, - } - if params != nil { - s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 1} - s.PlaceArgs = func(s capnp.Struct) error { return params(channel.Sender_send_Params(s)) } - } - ans, release := capnp.Client(c).SendCall(ctx, s) - return stream.StreamResult_Future{Future: ans.Future()}, release -} -func (c Stream) Close(ctx context.Context, params func(channel.Closer_close_Params) error) (channel.Closer_close_Results_Future, capnp.ReleaseFunc) { - s := capnp.Send{ - Method: capnp.Method{ - InterfaceID: 0xfad0e4b80d3779c3, - MethodID: 0, - InterfaceName: "channel.capnp:Closer", - MethodName: "close", - }, - } - if params != nil { - s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} - s.PlaceArgs = func(s capnp.Struct) error { return params(channel.Closer_close_Params(s)) } - } - ans, release := capnp.Client(c).SendCall(ctx, s) - return channel.Closer_close_Results_Future{Future: ans.Future()}, release -} - -// String returns a string that identifies this capability for debugging -// purposes. Its format should not be depended on: in particular, it -// should not be used to compare clients. Use IsSame to compare clients -// for equality. -func (c Stream) String() string { - return fmt.Sprintf("%T(%v)", c, capnp.Client(c)) -} - -// AddRef creates a new Client that refers to the same capability as c. -// If c is nil or has resolved to null, then AddRef returns nil. -func (c Stream) AddRef() Stream { - return Stream(capnp.Client(c).AddRef()) -} - -// Release releases a capability reference. If this is the last -// reference to the capability, then the underlying resources associated -// with the capability will be released. -// -// Release will panic if c has already been released, but not if c is -// nil or resolved to null. -func (c Stream) Release() { - capnp.Client(c).Release() -} - -// Resolve blocks until the capability is fully resolved or the Context -// expires. -func (c Stream) Resolve(ctx context.Context) error { - return capnp.Client(c).Resolve(ctx) -} - -func (c Stream) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { - return capnp.Client(c).EncodeAsPtr(seg) -} - -func (Stream) DecodeFromPtr(p capnp.Ptr) Stream { - return Stream(capnp.Client{}.DecodeFromPtr(p)) -} - -// IsValid reports whether c is a valid reference to a capability. -// A reference is invalid if it is nil, has resolved to null, or has -// been released. -func (c Stream) IsValid() bool { - return capnp.Client(c).IsValid() -} - -// IsSame reports whether c and other refer to a capability created by the -// same call to NewClient. This can return false negatives if c or other -// are not fully resolved: use Resolve if this is an issue. If either -// c or other are released, then IsSame panics. -func (c Stream) IsSame(other Stream) bool { - return capnp.Client(c).IsSame(capnp.Client(other)) -} - -// Update the flowcontrol.FlowLimiter used to manage flow control for -// this client. This affects all future calls, but not calls already -// waiting to send. Passing nil sets the value to flowcontrol.NopLimiter, -// which is also the default. -func (c Stream) SetFlowLimiter(lim fc.FlowLimiter) { - capnp.Client(c).SetFlowLimiter(lim) -} - -// Get the current flowcontrol.FlowLimiter used to manage flow control -// for this client. -func (c Stream) GetFlowLimiter() fc.FlowLimiter { - return capnp.Client(c).GetFlowLimiter() -} // A Stream_Server is a Stream with a local implementation. -type Stream_Server interface { - Send(context.Context, channel.Sender_send) error - - Close(context.Context, channel.Closer_close) error -} - -// Stream_NewServer creates a new Server from an implementation of Stream_Server. -func Stream_NewServer(s Stream_Server) *server.Server { - c, _ := s.(server.Shutdowner) - return server.New(Stream_Methods(nil, s), s, c) -} - -// Stream_ServerToClient creates a new Client from an implementation of Stream_Server. -// The caller is responsible for calling Release on the returned Client. -func Stream_ServerToClient(s Stream_Server) Stream { - return Stream(capnp.NewClient(Stream_NewServer(s))) -} - -// Stream_Methods appends Methods to a slice that invoke the methods on s. -// This can be used to create a more complicated Server. -func Stream_Methods(methods []server.Method, s Stream_Server) []server.Method { - if cap(methods) == 0 { - methods = make([]server.Method, 0, 2) - } - - methods = append(methods, server.Method{ - Method: capnp.Method{ - InterfaceID: 0xe8bbed1438ea16ee, - MethodID: 0, - InterfaceName: "channel.capnp:Sender", - MethodName: "send", - }, - Impl: func(ctx context.Context, call *server.Call) error { - return s.Send(ctx, channel.Sender_send{call}) - }, - }) - - methods = append(methods, server.Method{ - Method: capnp.Method{ - InterfaceID: 0xfad0e4b80d3779c3, - MethodID: 0, - InterfaceName: "channel.capnp:Closer", - MethodName: "close", - }, - Impl: func(ctx context.Context, call *server.Call) error { - return s.Close(ctx, channel.Closer_close{call}) - }, - }) - - return methods -} - -// Stream_List is a list of Stream. -type Stream_List = capnp.CapList[Stream] - -// NewStream creates a new list of Stream. -func NewStream_List(s *capnp.Segment, sz int32) (Stream_List, error) { - l, err := capnp.NewPointerList(s, sz) - return capnp.CapList[Stream](l), err -} - -type Provider capnp.Client - -// Provider_TypeID is the unique identifier for the type Provider. -const Provider_TypeID = 0xec225e7f00ef3b55 - -func (c Provider) Provide(ctx context.Context, params func(Provider_provide_Params) error) (Provider_provide_Results_Future, capnp.ReleaseFunc) { - s := capnp.Send{ - Method: capnp.Method{ - InterfaceID: 0xec225e7f00ef3b55, - MethodID: 0, - InterfaceName: "iostream.capnp:Provider", - MethodName: "provide", - }, - } - if params != nil { - s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 1} - s.PlaceArgs = func(s capnp.Struct) error { return params(Provider_provide_Params(s)) } - } - ans, release := capnp.Client(c).SendCall(ctx, s) - return Provider_provide_Results_Future{Future: ans.Future()}, release -} - -// String returns a string that identifies this capability for debugging -// purposes. Its format should not be depended on: in particular, it -// should not be used to compare clients. Use IsSame to compare clients -// for equality. -func (c Provider) String() string { - return fmt.Sprintf("%T(%v)", c, capnp.Client(c)) -} - -// AddRef creates a new Client that refers to the same capability as c. -// If c is nil or has resolved to null, then AddRef returns nil. -func (c Provider) AddRef() Provider { - return Provider(capnp.Client(c).AddRef()) -} - -// Release releases a capability reference. If this is the last -// reference to the capability, then the underlying resources associated -// with the capability will be released. -// -// Release will panic if c has already been released, but not if c is -// nil or resolved to null. -func (c Provider) Release() { - capnp.Client(c).Release() -} - -// Resolve blocks until the capability is fully resolved or the Context -// expires. -func (c Provider) Resolve(ctx context.Context) error { - return capnp.Client(c).Resolve(ctx) -} - -func (c Provider) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { - return capnp.Client(c).EncodeAsPtr(seg) -} - -func (Provider) DecodeFromPtr(p capnp.Ptr) Provider { - return Provider(capnp.Client{}.DecodeFromPtr(p)) -} - -// IsValid reports whether c is a valid reference to a capability. -// A reference is invalid if it is nil, has resolved to null, or has -// been released. -func (c Provider) IsValid() bool { - return capnp.Client(c).IsValid() -} - -// IsSame reports whether c and other refer to a capability created by the -// same call to NewClient. This can return false negatives if c or other -// are not fully resolved: use Resolve if this is an issue. If either -// c or other are released, then IsSame panics. -func (c Provider) IsSame(other Provider) bool { - return capnp.Client(c).IsSame(capnp.Client(other)) -} - -// Update the flowcontrol.FlowLimiter used to manage flow control for -// this client. This affects all future calls, but not calls already -// waiting to send. Passing nil sets the value to flowcontrol.NopLimiter, -// which is also the default. -func (c Provider) SetFlowLimiter(lim fc.FlowLimiter) { - capnp.Client(c).SetFlowLimiter(lim) -} - -// Get the current flowcontrol.FlowLimiter used to manage flow control -// for this client. -func (c Provider) GetFlowLimiter() fc.FlowLimiter { - return capnp.Client(c).GetFlowLimiter() -} // A Provider_Server is a Provider with a local implementation. -type Provider_Server interface { - Provide(context.Context, Provider_provide) error -} - -// Provider_NewServer creates a new Server from an implementation of Provider_Server. -func Provider_NewServer(s Provider_Server) *server.Server { - c, _ := s.(server.Shutdowner) - return server.New(Provider_Methods(nil, s), s, c) -} - -// Provider_ServerToClient creates a new Client from an implementation of Provider_Server. -// The caller is responsible for calling Release on the returned Client. -func Provider_ServerToClient(s Provider_Server) Provider { - return Provider(capnp.NewClient(Provider_NewServer(s))) -} - -// Provider_Methods appends Methods to a slice that invoke the methods on s. -// This can be used to create a more complicated Server. -func Provider_Methods(methods []server.Method, s Provider_Server) []server.Method { - if cap(methods) == 0 { - methods = make([]server.Method, 0, 1) - } - - methods = append(methods, server.Method{ - Method: capnp.Method{ - InterfaceID: 0xec225e7f00ef3b55, - MethodID: 0, - InterfaceName: "iostream.capnp:Provider", - MethodName: "provide", - }, - Impl: func(ctx context.Context, call *server.Call) error { - return s.Provide(ctx, Provider_provide{call}) - }, - }) - - return methods -} - -// Provider_provide holds the state for a server call to Provider.provide. -// See server.Call for documentation. -type Provider_provide struct { - *server.Call -} - -// Args returns the call's arguments. -func (c Provider_provide) Args() Provider_provide_Params { - return Provider_provide_Params(c.Call.Args()) -} - -// AllocResults allocates the results struct. -func (c Provider_provide) AllocResults() (Provider_provide_Results, error) { - r, err := c.Call.AllocResults(capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Provider_provide_Results(r), err -} - -// Provider_List is a list of Provider. -type Provider_List = capnp.CapList[Provider] - -// NewProvider creates a new list of Provider. -func NewProvider_List(s *capnp.Segment, sz int32) (Provider_List, error) { - l, err := capnp.NewPointerList(s, sz) - return capnp.CapList[Provider](l), err -} - -type Provider_provide_Params capnp.Struct - -// Provider_provide_Params_TypeID is the unique identifier for the type Provider_provide_Params. -const Provider_provide_Params_TypeID = 0xbc8b7aa049d95800 - -func NewProvider_provide_Params(s *capnp.Segment) (Provider_provide_Params, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 1}) - return Provider_provide_Params(st), err -} - -func NewRootProvider_provide_Params(s *capnp.Segment) (Provider_provide_Params, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 1}) - return Provider_provide_Params(st), err -} - -func ReadRootProvider_provide_Params(msg *capnp.Message) (Provider_provide_Params, error) { - root, err := msg.Root() - return Provider_provide_Params(root.Struct()), err -} - -func (s Provider_provide_Params) String() string { - str, _ := text.Marshal(0xbc8b7aa049d95800, capnp.Struct(s)) - return str -} - -func (s Provider_provide_Params) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { - return capnp.Struct(s).EncodeAsPtr(seg) -} - -func (Provider_provide_Params) DecodeFromPtr(p capnp.Ptr) Provider_provide_Params { - return Provider_provide_Params(capnp.Struct{}.DecodeFromPtr(p)) -} - -func (s Provider_provide_Params) ToPtr() capnp.Ptr { - return capnp.Struct(s).ToPtr() -} -func (s Provider_provide_Params) IsValid() bool { - return capnp.Struct(s).IsValid() -} - -func (s Provider_provide_Params) Message() *capnp.Message { - return capnp.Struct(s).Message() -} - -func (s Provider_provide_Params) Segment() *capnp.Segment { - return capnp.Struct(s).Segment() -} -func (s Provider_provide_Params) Stream() Stream { - p, _ := capnp.Struct(s).Ptr(0) - return Stream(p.Interface().Client()) -} - -func (s Provider_provide_Params) HasStream() bool { - return capnp.Struct(s).HasPtr(0) -} - -func (s Provider_provide_Params) SetStream(v Stream) error { - if !v.IsValid() { - return capnp.Struct(s).SetPtr(0, capnp.Ptr{}) - } - seg := s.Segment() - in := capnp.NewInterface(seg, seg.Message().AddCap(capnp.Client(v))) - return capnp.Struct(s).SetPtr(0, in.ToPtr()) -} - -// Provider_provide_Params_List is a list of Provider_provide_Params. -type Provider_provide_Params_List = capnp.StructList[Provider_provide_Params] - -// NewProvider_provide_Params creates a new list of Provider_provide_Params. -func NewProvider_provide_Params_List(s *capnp.Segment, sz int32) (Provider_provide_Params_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 1}, sz) - return capnp.StructList[Provider_provide_Params](l), err -} - -// Provider_provide_Params_Future is a wrapper for a Provider_provide_Params promised by a client call. -type Provider_provide_Params_Future struct{ *capnp.Future } - -func (f Provider_provide_Params_Future) Struct() (Provider_provide_Params, error) { - p, err := f.Future.Ptr() - return Provider_provide_Params(p.Struct()), err -} -func (p Provider_provide_Params_Future) Stream() Stream { - return Stream(p.Future.Field(0, nil).Client()) -} - -type Provider_provide_Results capnp.Struct - -// Provider_provide_Results_TypeID is the unique identifier for the type Provider_provide_Results. -const Provider_provide_Results_TypeID = 0xfcf105dc8b5ae862 - -func NewProvider_provide_Results(s *capnp.Segment) (Provider_provide_Results, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Provider_provide_Results(st), err -} - -func NewRootProvider_provide_Results(s *capnp.Segment) (Provider_provide_Results, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Provider_provide_Results(st), err -} - -func ReadRootProvider_provide_Results(msg *capnp.Message) (Provider_provide_Results, error) { - root, err := msg.Root() - return Provider_provide_Results(root.Struct()), err -} - -func (s Provider_provide_Results) String() string { - str, _ := text.Marshal(0xfcf105dc8b5ae862, capnp.Struct(s)) - return str -} - -func (s Provider_provide_Results) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { - return capnp.Struct(s).EncodeAsPtr(seg) -} - -func (Provider_provide_Results) DecodeFromPtr(p capnp.Ptr) Provider_provide_Results { - return Provider_provide_Results(capnp.Struct{}.DecodeFromPtr(p)) -} - -func (s Provider_provide_Results) ToPtr() capnp.Ptr { - return capnp.Struct(s).ToPtr() -} -func (s Provider_provide_Results) IsValid() bool { - return capnp.Struct(s).IsValid() -} - -func (s Provider_provide_Results) Message() *capnp.Message { - return capnp.Struct(s).Message() -} - -func (s Provider_provide_Results) Segment() *capnp.Segment { - return capnp.Struct(s).Segment() -} - -// Provider_provide_Results_List is a list of Provider_provide_Results. -type Provider_provide_Results_List = capnp.StructList[Provider_provide_Results] - -// NewProvider_provide_Results creates a new list of Provider_provide_Results. -func NewProvider_provide_Results_List(s *capnp.Segment, sz int32) (Provider_provide_Results_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}, sz) - return capnp.StructList[Provider_provide_Results](l), err -} - -// Provider_provide_Results_Future is a wrapper for a Provider_provide_Results promised by a client call. -type Provider_provide_Results_Future struct{ *capnp.Future } - -func (f Provider_provide_Results_Future) Struct() (Provider_provide_Results, error) { - p, err := f.Future.Ptr() - return Provider_provide_Results(p.Struct()), err -} - -const schema_89c985e63e991441 = "x\xdat\xd0?HBQ\x14\x06\xf0\xef\xbcs\x9f\xaf" + - "E\xe4r\x0d\x9b\x12\xa25I\xa2\xc5 \xff,\x11D" + - "\xbc[\x04\xd5\x10\xbc\xcaA\xc8\x94\xf7,(\x10kh" + - "\x08\x89\xe6\x9aZ\"Z\xa3\xb1\xa9\xad\xb1%\x08\x9a\x1a" + - "\"!\x8a\xa6\x96\xe8\xc53Q\x11\xda\xce\xbd\x1c~\xf7" + - "\xbb\xdfh?eD2|\x1c\x82\xa1g\xcc\x90\xff\x9c" + - "\xbd~\x18|\x8f\xecAF\xd8\xcfFO&_\x0e\xee" + - "\x0e\x01R#|\xa5\xc6\xd9\x02T\x92-\x95\xe4\x18\xf0" + - "\xb3\xf88}\xb6[\xbf\x91\x03\x04\x98d\x01c\x9aS" + - "\x04RK\x9c\x06}-L|\xd4V\x86\xdez\xa5\x1d" + - "\xbeU\xfbM\xa9\xcaS\xea<\x98\xfc\xd5\xd7\xe5\xfa\x93" + - "\xf9\xf9\x8d\xa6%\x02\xea\x88s\x84K\xbfP\xf2*n" + - "\xde)Rb\xcd)o\x96S\xf3\xf1\xe6\xd1&\xb2\xd9" + - "\xd4\x82\xc8\x9fmD\xab\xa7\xf7\x17\x0ddHR\\\x0b" + - "\xa3\xeb\x0a\x90\x14\x0b\xb6\x82u\xa20\x8c6\xc8-\xd0" + - "vK\xdb\x85\xf5\xbc\x9b(\xff\x0d\xc3i\xdbq\x9d\xa2" + - "\xa7\x05\x0b@\x10 \xc3)@\xf71\xe9\xa8A\xe9V" + - "\x1c\xd9\xa9\x0aD\x12\xd4\x86\x8d\x1e\x186\x91\x16lv" + - "\x0a\xeb\xfa\xae\xcc\xc1\x90\xa6Uk=\x9e!\x9b:\x92" + - "\xf8/\xe2\\\xde\xdb\xda\xa8x\xf8\x0d\x00\x00\xff\xff\xb0" + - "=\x85\xba" - -func init() { - schemas.Register(schema_89c985e63e991441, - 0x800fee1ed6b441e2, - 0xbc8b7aa049d95800, - 0xec225e7f00ef3b55, - 0xfcf105dc8b5ae862) -} diff --git a/pkg/iostream/iostream.go b/pkg/iostream/iostream.go deleted file mode 100644 index 78dc48ab..00000000 --- a/pkg/iostream/iostream.go +++ /dev/null @@ -1,210 +0,0 @@ -package iostream - -import ( - "bytes" - "context" - "errors" - "io" - "unsafe" - - "capnproto.org/go/capnp/v3" - casm "github.com/wetware/casm/pkg" - chan_api "github.com/wetware/ww/internal/api/channel" - "github.com/wetware/ww/internal/api/iostream" - "github.com/wetware/ww/pkg/csp" -) - -var ErrClosed = errors.New("closed") - -// Provider is the read end of a Unix byte-stream. It works by -// setting a StreamWriter as a callback, which is invoked whenever -// new data becomes available. -type Provider iostream.Provider - -func (p Provider) AddRef() Provider { - return Provider(capnp.Client(p).AddRef()) -} - -func (p Provider) Release() { - capnp.Client(p).Release() -} - -// NewProvider wraps r in a StreamReader and sets the supplied policy. -// If r implements io.Closer, it will be called automatically when -// the returned StreamReader shuts down. -func NewProvider(r io.Reader) Provider { - sr := &sreader{Reader: r} - return Provider(iostream.Provider_ServerToClient(sr)) -} - -// Provide assigns the supplied StreamWriter as the destination for -// incoming bytes in the stream. The dst parameter is effectively -// a callback. Only the first call to Provide will be honored, and -// any subsequent calls will return an error. This includes calls -// made by remote vats. -// -// Calling SetWriter transfers ownership of w to p, and w will be -// closed when p is closed, or the underlying stream fails. Thus, -// one SHOULD NOT call any of w's methods after SetWriter returns. -// -// Callers SHOULD enforce the following invariant on dst: after a -// call to Provide returns, all references to dst are owned by p. -// In practice, callers MAY relax this invariant when either: -// -// (a) References not owned by p are released before the future -// returned by Provide() is resolved. -// -// (b) The consumer behind 'dst' does not distinguish between -// normal and erroneous stream termination. -func (p Provider) Provide(ctx context.Context, s Stream) (casm.Future, capnp.ReleaseFunc) { - stream := func(ps iostream.Provider_provide_Params) error { - return ps.SetStream(iostream.Stream(s)) - } - - f, release := iostream.Provider(p).Provide(ctx, stream) - return casm.Future(f), release -} - -// Stream is the write end of a Unix byte-stream. It provides -// push semantics for transmitting streams of abitrary bytes. -// It is important to note that Stream MAY arbitrarily segment -// bytes. Applications MAY implement their own framing. -type Stream iostream.Provider - -func (s Stream) AddRef() Stream { - return Stream(capnp.Client(s).AddRef()) -} - -func (s Stream) Release() { - capnp.Client(s).Release() -} - -// New wraps the supplied WriteCloser in a StreamWriter. -// Callers MUST call the returned StreamWriter's Close() method -// before releasing the client, to signal graceful termination. -// If the StreamWriter is released before a call to Close returns, -// the downstream consumer SHALL interpret this as ErrUnexpectedEOF. -// -// If w implements io.Closer, it will be closed before the call to -// StreamWriter.Close() resolves, or after the last client reference -// is released, whichever comes first. -func New(w io.Writer) Stream { - s := &swriter{Writer: w} - return Stream(iostream.Stream_ServerToClient(s)) -} - -// Write the bytes to the underlying stream. Contrary to Go's io.Write, -// s.Write will return after all bytes have been written to the stream, -// or an error occurs (whichever happens first). -func (s Stream) Write(ctx context.Context, b []byte) (casm.Future, capnp.ReleaseFunc) { - f, release := iostream.Stream(s).Send(ctx, csp.Data(b)) - return casm.Future(f), release -} - -// WriteString is a convenience method that casts data to bytes before -// calling Write. -func (s Stream) WriteString(ctx context.Context, data string) (casm.Future, capnp.ReleaseFunc) { - return s.Write(ctx, *(*[]byte)(unsafe.Pointer(&data))) -} - -// Close the underlying stream, signalling successful termination to any -// downstream consumers. Close MUST be called when terminating, even if -// a previous write has failed. We may relax this rule in the future. -func (s Stream) Close(ctx context.Context) error { - f, release := iostream.Stream(s).Close(ctx, nil) - defer release() - - _, err := f.Struct() - return err -} - -// Writer returns an io.Writer translates calls to its Write() method -// into calls to s.Write(). The supplied context is implicitly passed -// to all s.Write() calls. Callers MAY implement per-write timeouts by -// repeatedly calling s.Writer() with a fresh context. -func (s Stream) Writer(ctx context.Context) io.Writer { - return writerFunc(func(b []byte) (int, error) { - f, release := s.Write(ctx, b) - defer release() - - return len(b), f.Await(ctx) - }) -} - -/* - Server implementations -*/ - -// sreader is the server type for StreamReader. -type sreader struct { - io.Reader -} - -func (p *sreader) Shutdown() { - if c, ok := p.Reader.(io.Closer); ok { - _ = c.Close() - } -} - -func (p *sreader) Provide(ctx context.Context, call iostream.Provider_provide) (err error) { - callback := Stream(call.Args().Stream()) - - // stream terminated gracefully? - if err = stream(callback.Writer(ctx), p); err == nil { - err = callback.Close(ctx) - } - - return err -} - -// swriter is the server type for StreamWriter. It wraps an io.Writer and -// exports a Send method, thereby satisfying the channel.Sender capability -// interface. -type swriter struct { - closed bool - io.Writer -} - -func (s *swriter) Shutdown() { _ = s.close() } - -func (s *swriter) Send(_ context.Context, call chan_api.Sender_send) error { - if s.closed { - return ErrClosed - } - - ptr, err := call.Args().Value() - if err == nil { - // Don't close the underlying writer here. Certain writer implementations, - // such as net.Conn, may produce temporary errors. - err = stream(s, bytes.NewReader(ptr.Data())) - } - - return err -} - -func (s *swriter) Close(context.Context, chan_api.Closer_close) error { - s.closed = true - return s.close() -} - -func (s *swriter) close() (err error) { - if c, ok := s.Writer.(io.Closer); ok { - err = c.Close() - s.Writer = nil // DEFENSIVE: prevent writer from being closed twice - } - - return -} - -func stream(w io.Writer, r io.Reader) error { - _, err := io.Copy(w, r) - return err -} - -// writerFunc is a function type that implements io.Writer. -type writerFunc func([]byte) (int, error) - -// Write calls the function with b as its argument. -func (write writerFunc) Write(b []byte) (int, error) { - return write(b) -} diff --git a/pkg/iostream/iostream_test.go b/pkg/iostream/iostream_test.go deleted file mode 100644 index 8b8782b2..00000000 --- a/pkg/iostream/iostream_test.go +++ /dev/null @@ -1,107 +0,0 @@ -package iostream_test - -import ( - "bytes" - "context" - "io" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/wetware/ww/pkg/iostream" -) - -func TestStream(t *testing.T) { - t.Parallel() - - buf := &mockCloser{Buffer: new(bytes.Buffer)} - stream := iostream.New(buf) - defer stream.Release() - - stream.AddRef().Release() // for test coverage - - f, release := stream.WriteString(context.TODO(), "hello, world!") - defer release() - - require.NoError(t, f.Err(), "should write") - assert.Equal(t, "hello, world!", buf.String()) - - err := stream.Close(context.TODO()) - require.NoError(t, err, "should close") - - buf.Reset() - f, release = stream.WriteString(context.TODO(), "should fail") - defer release() - - require.ErrorIs(t, f.Err(), iostream.ErrClosed, - "write to closed stream should fail") - assert.Zero(t, buf.Len(), - "data should not be written to closed stream") - assert.True(t, buf.Closed, - "writer should have been closed") -} - -func TestProvider(t *testing.T) { - const bufferSize = 2048 - const s1, s2 = "hello, world!\n", "hello again, world!\n" - rc, wc := io.Pipe() // Client Reader/Writer - rs, ws := io.Pipe() // Server Reader/Writer, could be replaced with bytes.Buffer - - defer rc.Close() - defer wc.Close() - defer rs.Close() - defer ws.Close() - - // Write s1 to the client writer - go func() { - n, err := wc.Write([]byte(s1)) - require.NoError(t, err) - assert.Equal(t, n, len(s1)) - }() - - // Provide the client reader to the server writer - go func() { - p := iostream.NewProvider(rc) - defer p.Release() - p.AddRef().Release() // For test coverage - _, release := p.Provide(context.TODO(), iostream.New(ws)) - defer release() - }() - - // Check the server reader for s1 - b1 := make([]byte, bufferSize) - n, err := rs.Read(b1) - require.NoError(t, err) - assert.Equal(t, n, len(s1)) - assert.Equal(t, string(b1[0:n]), s1) - - // Write s2 to the client writer - go func() { - n, err := wc.Write([]byte(s2)) - require.NoError(t, err) - assert.Equal(t, n, len(s2)) - }() - - // Check server reader for s2 - n, err = rs.Read(b1) - require.NoError(t, err) - assert.Equal(t, n, len(s2)) - assert.Equal(t, string(b1[0:n]), s2) - - // Closing the client reader should make the provider close the - // server pipe writer (and therefore reader) - rc.Close() - _, err = rs.Read(make([]byte, 0)) - assert.NotNil(t, err) - assert.Equal(t, err.Error(), "EOF") -} - -type mockCloser struct { - Closed bool - *bytes.Buffer -} - -func (mc *mockCloser) Close() error { - mc.Closed = true - return nil -}