diff --git a/Makefile b/Makefile index c059a3dc..a8f6552b 100644 --- a/Makefile +++ b/Makefile @@ -10,7 +10,7 @@ all: capnp clean: clean-capnp clean-mocks -capnp: capnp-anchor capnp-pubsub capnp-cluster capnp-channel capnp-proc capnp-iostream capnp-wasm +capnp: capnp-anchor capnp-pubsub capnp-cluster capnp-channel capnp-process capnp-iostream # N.B.: compiling capnp schemas requires having capnproto.org/go/capnp/v3 installed # on the GOPATH. @@ -30,20 +30,16 @@ capnp-channel: @mkdir -p internal/api/channel @capnp compile -I$(GOPATH)/src/capnproto.org/go/capnp/std -ogo:internal/api/channel --src-prefix=api api/channel.capnp -capnp-proc: - @mkdir -p internal/api/proc - @capnp compile -I$(GOPATH)/src/capnproto.org/go/capnp/std -ogo:internal/api/proc --src-prefix=api api/proc.capnp +capnp-process: + @mkdir -p internal/api/process + @capnp compile -I$(GOPATH)/src/capnproto.org/go/capnp/std -ogo:internal/api/process --src-prefix=api api/process.capnp capnp-iostream: @mkdir -p internal/api/iostream @capnp compile -I$(GOPATH)/src/capnproto.org/go/capnp/std -ogo:internal/api/iostream --src-prefix=api api/iostream.capnp -capnp-wasm: - @mkdir -p internal/api/wasm - @capnp compile -I$(GOPATH)/src/capnproto.org/go/capnp/std -ogo:internal/api/wasm --src-prefix=api api/wasm.capnp - -clean-capnp: clean-capnp-anchor clean-capnp-pubsub clean-capnp-cluster clean-capnp-channel clean-capnp-proc clean-capnp-iostream clean-capnp-wasm +clean-capnp: clean-capnp-anchor clean-capnp-pubsub clean-capnp-cluster clean-capnp-channel clean-capnp-process clean-capnp-iostream clean-capnp-wasm clean-capnp-anchor: @rm -rf internal/api/anchor @@ -57,15 +53,12 @@ clean-capnp-cluster: clean-capnp-channel: @rm -rf internal/api/channel -clean-capnp-proc: - @rm -rf internal/api/proc +clean-capnp-process: + @rm -rf internal/api/process clean-capnp-iostream: @rm -rf internal/api/iostream -clean-capnp-wasm: - @rm -rf internal/api/wasm - mocks: clean-mocks # This roundabout call to 'go generate' allows us to: diff --git a/api/process.capnp b/api/process.capnp index d5621e6a..bad2c9d7 100644 --- a/api/process.capnp +++ b/api/process.capnp @@ -7,23 +7,24 @@ $Go.import("github.com/wetware/ww/internal/api/process"); interface Executor { - spawn @0 (binary :Data, entryfunction :Text) -> (process :Process); + spawn @0 (byteCode :Data, entryPoint :Text = "run") -> (process :Process); # spawn a WASM based process from the binary module with the target # entry function - - using IOStream = import "iostream.capnp"; } interface Process { - start @0 () -> (); # start the process - stop @1 () -> (); # TODO: provide a signal such as SIGTERM, SIGKILL... - wait @2 () -> (error :Text); # wait for an started process to finish - close @3 () -> (); # close should always be called after running a process - - input @4 () -> (stdin :IOStream.Stream); - # the resulting stream can be used to provide input to the process - output @5(stdout :IOStream.Stream, stderr :IOStream.Stream) -> (); - # receives an stream to provide stdout and stderr to + start @0 () -> (); + stop @1 () -> (); + wait @2 () -> (error :Error); +} - using IOStream = import "iostream.capnp"; +struct Error { + union { + none @0 :Void; + msg @1 :Text; + exitErr :group { + code @2 :UInt32; + module @3 :Text; + } + } } diff --git a/go.mod b/go.mod index 460ef669..90d4a939 100644 --- a/go.mod +++ b/go.mod @@ -36,11 +36,12 @@ require ( github.com/lthibault/go-libp2p-inproc-transport v0.4.0 github.com/multiformats/go-multistream v0.4.1 github.com/stretchr/testify v1.8.1 - github.com/tetratelabs/wazero v1.0.0-pre.4 + github.com/tetratelabs/wazero v1.0.0-rc.1 github.com/thejerf/suture/v4 v4.0.2 github.com/wetware/casm v0.0.0-20230224203443-f715090fc92c golang.org/x/sync v0.1.0 gopkg.in/alexcesaro/statsd.v2 v2.0.0 + lukechampine.com/blake3 v1.1.7 ) require ( @@ -150,6 +151,5 @@ require ( golang.org/x/time v0.3.0 // indirect golang.org/x/tools v0.6.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - lukechampine.com/blake3 v1.1.7 // indirect nhooyr.io/websocket v1.8.7 // indirect ) diff --git a/go.sum b/go.sum index 8c28bd47..0802875a 100644 --- a/go.sum +++ b/go.sum @@ -487,8 +487,8 @@ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= -github.com/tetratelabs/wazero v1.0.0-pre.4 h1:RBJQT5OzmORkSp6MmZDWoFEr0zXjk4pmvMKAdeUnsaI= -github.com/tetratelabs/wazero v1.0.0-pre.4/go.mod h1:u8wrFmpdrykiFK0DFPiFm5a4+0RzsdmXYVtijBKqUVo= +github.com/tetratelabs/wazero v1.0.0-rc.1 h1:ytecMV5Ue0BwezjKh/cM5yv1Mo49ep2R2snSsQUyToc= +github.com/tetratelabs/wazero v1.0.0-rc.1/go.mod h1:wYx2gNRg8/WihJfSDxA1TIL8H+GkfLYm+bIfbblu9VQ= github.com/thejerf/suture/v4 v4.0.2 h1:VxIH/J8uYvqJY1+9fxi5GBfGRkRZ/jlSOP6x9HijFQc= github.com/thejerf/suture/v4 v4.0.2/go.mod h1:g0e8vwskm9tI0jRjxrnA6lSr0q6OfPdWJVX7G5bVWRs= github.com/tinylib/msgp v1.1.5 h1:2gXmtWueD2HefZHQe1QOy9HVzmFrLOVvsXwXBQ0ayy0= diff --git a/internal/api/cluster/cluster.capnp.go b/internal/api/cluster/cluster.capnp.go index 533d3ab2..2eeb72df 100644 --- a/internal/api/cluster/cluster.capnp.go +++ b/internal/api/cluster/cluster.capnp.go @@ -25,7 +25,7 @@ func (c Host) View(ctx context.Context, params func(Host_view_Params) error) (Ho Method: capnp.Method{ InterfaceID: 0x957cbefc645fd307, MethodID: 0, - InterfaceName: "api/cluster.capnp:Host", + InterfaceName: "cluster.capnp:Host", MethodName: "view", }, } @@ -41,7 +41,7 @@ func (c Host) PubSub(ctx context.Context, params func(Host_pubSub_Params) error) Method: capnp.Method{ InterfaceID: 0x957cbefc645fd307, MethodID: 1, - InterfaceName: "api/cluster.capnp:Host", + InterfaceName: "cluster.capnp:Host", MethodName: "pubSub", }, } @@ -57,7 +57,7 @@ func (c Host) Root(ctx context.Context, params func(Host_root_Params) error) (Ho Method: capnp.Method{ InterfaceID: 0x957cbefc645fd307, MethodID: 2, - InterfaceName: "api/cluster.capnp:Host", + InterfaceName: "cluster.capnp:Host", MethodName: "root", }, } @@ -73,7 +73,7 @@ func (c Host) Debug(ctx context.Context, params func(Host_debug_Params) error) ( Method: capnp.Method{ InterfaceID: 0x957cbefc645fd307, MethodID: 3, - InterfaceName: "api/cluster.capnp:Host", + InterfaceName: "cluster.capnp:Host", MethodName: "debug", }, } @@ -89,7 +89,7 @@ func (c Host) Executor(ctx context.Context, params func(Host_executor_Params) er Method: capnp.Method{ InterfaceID: 0x957cbefc645fd307, MethodID: 4, - InterfaceName: "api/cluster.capnp:Host", + InterfaceName: "cluster.capnp:Host", MethodName: "executor", }, } @@ -202,7 +202,7 @@ func Host_Methods(methods []server.Method, s Host_Server) []server.Method { Method: capnp.Method{ InterfaceID: 0x957cbefc645fd307, MethodID: 0, - InterfaceName: "api/cluster.capnp:Host", + InterfaceName: "cluster.capnp:Host", MethodName: "view", }, Impl: func(ctx context.Context, call *server.Call) error { @@ -214,7 +214,7 @@ func Host_Methods(methods []server.Method, s Host_Server) []server.Method { Method: capnp.Method{ InterfaceID: 0x957cbefc645fd307, MethodID: 1, - InterfaceName: "api/cluster.capnp:Host", + InterfaceName: "cluster.capnp:Host", MethodName: "pubSub", }, Impl: func(ctx context.Context, call *server.Call) error { @@ -226,7 +226,7 @@ func Host_Methods(methods []server.Method, s Host_Server) []server.Method { Method: capnp.Method{ InterfaceID: 0x957cbefc645fd307, MethodID: 2, - InterfaceName: "api/cluster.capnp:Host", + InterfaceName: "cluster.capnp:Host", MethodName: "root", }, Impl: func(ctx context.Context, call *server.Call) error { @@ -238,7 +238,7 @@ func Host_Methods(methods []server.Method, s Host_Server) []server.Method { Method: capnp.Method{ InterfaceID: 0x957cbefc645fd307, MethodID: 3, - InterfaceName: "api/cluster.capnp:Host", + InterfaceName: "cluster.capnp:Host", MethodName: "debug", }, Impl: func(ctx context.Context, call *server.Call) error { @@ -250,7 +250,7 @@ func Host_Methods(methods []server.Method, s Host_Server) []server.Method { Method: capnp.Method{ InterfaceID: 0x957cbefc645fd307, MethodID: 4, - InterfaceName: "api/cluster.capnp:Host", + InterfaceName: "cluster.capnp:Host", MethodName: "executor", }, Impl: func(ctx context.Context, call *server.Call) error { @@ -1105,45 +1105,45 @@ func (p Host_executor_Results_Future) Executor() process.Executor { return process.Executor(p.Future.Field(0, nil).Client()) } -const schema_fcf6ac08e448a6ac = "x\xda\x8cRKh\x13Q\x14\xbdw\xde\x8c\xd3L\x8c" + - "\xe1%\x11\x8b\x82\x95\xda\x85\xcd\xa2\xda\x8a\x08\x01\xc9 " + - "\x88\xb5\x06\xcc$\x88.D\x9d\xa4C)\xa4&df" + - "\xaa\x82\x9b\x0a\x01E\x88((\xb8qU]H\xd1\x85" + - "\x88\xd8b\x16\x82\x1bw\xfe\xc0\x82\xbaJ\x04\xa1-\xa2" + - "D(D\x9e\xbc\x89\x93\x8c\xd1~v\x0f\xee9\xe7\xdd" + - "s\xce\xdd\x93 \xaa8\x188\x16\x01A[\x906\xb0" + - "\xca\xd3\xa5\xda\xce]\xd7.\x03\xdd\x82\x00\xa2\x0c\xb07" + - "\xa0lE\x10\xd9\x8f\xec\xf7o\xdb\xcb\x0bW=\x93\xba" + - "\xaf\x97O\xea\xef\x12\xa5\xf2\xcd\x93\xd7\x9b\x13\x09\xf9\xe8" + - "\x13\x1fa\xb8\xe6\x8b\x032\xf9\xed\x99\xd1F\xe5\xd2-" + - "\xa0!\xc2f\xee\x0fW\xbbf~6\x000,)\xb3" + - "\xe1\x80\xc2\xf1>\xe50\x86\xdf\xf3'\x13nKs\xcb" + - ";\xa6\xeez\xe5\x9e+\x07\xb9\xdcK\x85\xcbU\x9f\xd9" + - "\xe9\xa3/\xc4i\xcf&_\x9b;\xf6\x8b\xc1\x87\xe4l" + - "w\xc5K}\xadD9u\xde\xa1\xa6\x12\x1f\xfa\x17#" + - "\xe9\x8a\x87\xfaK\x89q\xaa\xff\xd0\x89\xe9\xd2\xa9\xb9W" + - "^jMqL,:\xd4\xa5M\xbe\xba\xbd|\xe5\xa3" + - "\x17\x10\xf0\x0fq\xc0f?\x07\xdcy\xd0\x90\xec\xde'" + - "5\x8f\xf6>\x7f\x14\x81\x01\x83\x08\xd3\x0b\xe3\xbb\xb39" + - "\xdb$\x96Q\x1c\xc8\xea\x85s\x85\xd8p\xde\xb4\x06\x8a" + - "\xf9\xbc\xd5\x97\xd4\x8b:\x990W\x04\x8d\x1a\x19{\x8c" + - "\xa3d}\x15\xd4\xe4\xb8q\xbe/e\x98\xb6\x9c\xb3L" + - "M$\"\x80\x88\x004\x10\x05\xd0\xba\x08j\x11\x01\x83" + - "\x1c\x84!\x91\x00b\x08\xb0\xa5\x85\xaeV\xbc)\x96D" + - "\xd4\"D\x02h\xa5\x8dn\xcd\xf4F\x14\x04Z\x92\xb1" + - "m\x19\xddp\xe8\xc5\x18\x08tBF\xa1uI\xe8&" + - "Ku\xce;.#i\xdd\x12\xba\x85\xd1#C \xd0" + - "\x032\x8a\xad\x8a\xd0\xbd\x03:8\x02\x02\xed\x97\x9d\xd5" + - "U\x8c\x17\xecL\xda\xce\xa8\x18\xe4\xd1\xa9\xd8\xe3\x84\xa3" + - "\"3.\x18Y\xdb\xca\x17\x01@\xc5$\xb6\xbd\x89\x1d" + - "9\xb9@'\xab\x9ce\x827\xab\x11\x00m#A\xad" + - "[\xf8K\x11){cO\xdd\x9b==\xf0\x08\x00\x91" + - "z\xa2\xfbo\x0d\xebk4e\x98A\xbb\xa3,\xef\x02" + - "\x0ej\xccp\x16\xf8\xb73\xb2\x92\xaf8\xff}\x95\xcf" + - "\x9d\x9b[\xebP8\x08)\xeb\x9d/\xfb\xaa\xfbC\xd5" + - "\xb5\\7[\xe9K\x19=f\xa7\xa3X[\xf5Oy" + - "HY,?\xb9\xed\xcb\xe3\xe4\xe7u\xea&\xf5 \xb7" + - "\xf4;\x00\x00\xff\xff\xae1_h" +const schema_fcf6ac08e448a6ac = "x\xda\x8cRKh\x13Q\x14\xbd\xf7\xbd\x893\xa3\xb6" + + "\xf5e\xba(\x8a\xbfZ\xc1vQ\xac\"b@:]" + + "\x14\xdb\xdaE\xa6\xa5\xe8B\xd4$\x1dJ!5!3" + + "S\x15\xdcT\x08(BDAA\x17\xaej\x17Rt" + + "!\"Z\xc8B\\\xb9T\x17\x16D7\xa9+-\xa2" + + "T\x88\x04\x9e\xbcIf\xf2\x92Zt}\xcf=\xe7\x9e" + + "s\xee\xc1eb*}-\x9f\xb7\x01\xb1^G6\xf1" + + "\xe2\xf3\xd5\x95}\x07n\\\x05f \x80\xa2\x02\x18\x13" + + "\xdaoP\xf8\xcf\xd4\x8f\xef\xbb\x0a_\xaf\xd7\x07\x87\x8f" + + "i\x04A\xe1k\xefG\xf3\x85\xdb\xa7oV'\x11\x14" + + "\xa3\x9db\x84\xc6^\xad\x1f\x90\xab\xef\xceMV\x8aW" + + "\xee\x00k\xa5|qa\xa8\xa4-\xfe\xaa\x00\xa01\xa0" + + "\xdd7\x865\x81\x1f\xd4N\xa0\xc1t\x15\x80\x93\xbb\x91" + + "\xa5\xf2\x9e\xb9\x072]Y\xeb\x14t\xa8\x0b\xba\xd2\x0b" + + "o\xfc\xe4+e^:q\xbf.N\xecV\xda\x1e\xd3" + + "\xf3\x1dEyS\xd77\x8bM\xe6o\x8e\x8d~\xe8\xfe" + + "\xd6>^\x94<\x1c\xd1\xb7\x0b\x0f[\x06O\xcd\xe7\xcf" + + ",\xbdi\xf0\xa0W=\xf8\xab\xab\xad\xfa\x9aW\xbe\xf6" + + "Q\x06\x0c\xe8Q\x01\x18\xf6\x01\xf7\x1eU\"^\xe7\xb3" + + "\x15\x89{ZhsX\xe0\xa9\xb4\xe7\xb8v\xae\x97\xa4" + + "\x12\xd9\x0b\xd9\xd8P\xc6q{s\x99\x8c\xdb\xd5\x1fO" + + "\xe4\x123N\x08P%\xc0\xa4\x9d\xf4\xa6\xba\xaa\x00\x08" + + "\x00\xd2|v\xda\xbe\xd85f;^\xdau\xc0R\xa8" + + "\x02\xa0 \x00k\xe9\x01\xb04\x8aV;\xc16\x01\xc2" + + "\xa8B\x011\x0a\x18\xea`\xc0C\x1d7\x8eh\xb5\xd3" + + "\x08@\x18,\x06\x8d\xb2[=@X^\xc5\xba=\x0c" + + "\x82`\x97c@\xd8\x8c\x8a$\xfc\x19\x0cRd\x09\xb1" + + "7\xa1\"\x0d\xdf\x06\x83r\xd8\xf0! \xec\xb8\x8aJ" + + "X\x07\x06\x95\xb3\xbe\x11 \xac[\xf5\xcf6\xb1?\xeb" + + "%\xc7\xbd\xa4\x89m\",\x13w\xfb\x91\x98\xc8\xedK" + + "v\xcas39\x0001\x8eu_T\xca'\x00\xf9" + + "\x19\xa9i\xd7\x913\x1a\x01\xb0\xb6R\xb4:H\x03\x1b" + + "2\xfe\xd6\x9b{\xf8\xf2l\xef\x13\x00D&EF\x9a" + + "\xa3o\xee\x8e\xae\xeb.(\x076R\xf6aS\xb6\xaf" + + "\xbc\xbe\xa4\xbf\x9a\x11\xa2t\x03U\xff\xa5\xfe\xf5\x11\x02" + + "\x84\x8cw.\x17\xf4\xd2\xd1h\xa9\xd9\xa6\xccW\x8d\xbf" + + "\xc6\x88\x0d\xf9\xc5\xea\x8c\xb5\x96\x90\xf1Xfv\xc7\x97" + + "\xa7\xf1O\xff\xc1Y{k\xf8\x13\x00\x00\xff\xff}\xa0" + + "H\x90" func init() { schemas.Register(schema_fcf6ac08e448a6ac, diff --git a/internal/api/process/process.capnp.go b/internal/api/process/process.capnp.go index f28d3b73..f8c71371 100644 --- a/internal/api/process/process.capnp.go +++ b/internal/api/process/process.capnp.go @@ -10,7 +10,7 @@ import ( server "capnproto.org/go/capnp/v3/server" context "context" fmt "fmt" - iostream "github.com/wetware/ww/internal/api/iostream" + strconv "strconv" ) type Executor capnp.Client @@ -23,7 +23,7 @@ func (c Executor) Spawn(ctx context.Context, params func(Executor_spawn_Params) Method: capnp.Method{ InterfaceID: 0xaf2e5ebaa58175d2, MethodID: 0, - InterfaceName: "api/process.capnp:Executor", + InterfaceName: "process.capnp:Executor", MethodName: "spawn", }, } @@ -128,7 +128,7 @@ func Executor_Methods(methods []server.Method, s Executor_Server) []server.Metho Method: capnp.Method{ InterfaceID: 0xaf2e5ebaa58175d2, MethodID: 0, - InterfaceName: "api/process.capnp:Executor", + InterfaceName: "process.capnp:Executor", MethodName: "spawn", }, Impl: func(ctx context.Context, call *server.Call) error { @@ -212,35 +212,35 @@ func (s Executor_spawn_Params) Message() *capnp.Message { func (s Executor_spawn_Params) Segment() *capnp.Segment { return capnp.Struct(s).Segment() } -func (s Executor_spawn_Params) Binary() ([]byte, error) { +func (s Executor_spawn_Params) ByteCode() ([]byte, error) { p, err := capnp.Struct(s).Ptr(0) return []byte(p.Data()), err } -func (s Executor_spawn_Params) HasBinary() bool { +func (s Executor_spawn_Params) HasByteCode() bool { return capnp.Struct(s).HasPtr(0) } -func (s Executor_spawn_Params) SetBinary(v []byte) error { +func (s Executor_spawn_Params) SetByteCode(v []byte) error { return capnp.Struct(s).SetData(0, v) } -func (s Executor_spawn_Params) Entryfunction() (string, error) { +func (s Executor_spawn_Params) EntryPoint() (string, error) { p, err := capnp.Struct(s).Ptr(1) - return p.Text(), err + return p.TextDefault("run"), err } -func (s Executor_spawn_Params) HasEntryfunction() bool { +func (s Executor_spawn_Params) HasEntryPoint() bool { return capnp.Struct(s).HasPtr(1) } -func (s Executor_spawn_Params) EntryfunctionBytes() ([]byte, error) { +func (s Executor_spawn_Params) EntryPointBytes() ([]byte, error) { p, err := capnp.Struct(s).Ptr(1) - return p.TextBytes(), err + return p.TextBytesDefault("run"), err } -func (s Executor_spawn_Params) SetEntryfunction(v string) error { - return capnp.Struct(s).SetText(1, v) +func (s Executor_spawn_Params) SetEntryPoint(v string) error { + return capnp.Struct(s).SetNewText(1, v) } // Executor_spawn_Params_List is a list of Executor_spawn_Params. @@ -355,7 +355,7 @@ func (c Process) Start(ctx context.Context, params func(Process_start_Params) er Method: capnp.Method{ InterfaceID: 0xda23f0d3a8250633, MethodID: 0, - InterfaceName: "api/process.capnp:Process", + InterfaceName: "process.capnp:Process", MethodName: "start", }, } @@ -371,7 +371,7 @@ func (c Process) Stop(ctx context.Context, params func(Process_stop_Params) erro Method: capnp.Method{ InterfaceID: 0xda23f0d3a8250633, MethodID: 1, - InterfaceName: "api/process.capnp:Process", + InterfaceName: "process.capnp:Process", MethodName: "stop", }, } @@ -387,7 +387,7 @@ func (c Process) Wait(ctx context.Context, params func(Process_wait_Params) erro Method: capnp.Method{ InterfaceID: 0xda23f0d3a8250633, MethodID: 2, - InterfaceName: "api/process.capnp:Process", + InterfaceName: "process.capnp:Process", MethodName: "wait", }, } @@ -398,54 +398,6 @@ func (c Process) Wait(ctx context.Context, params func(Process_wait_Params) erro ans, release := capnp.Client(c).SendCall(ctx, s) return Process_wait_Results_Future{Future: ans.Future()}, release } -func (c Process) Close(ctx context.Context, params func(Process_close_Params) error) (Process_close_Results_Future, capnp.ReleaseFunc) { - s := capnp.Send{ - Method: capnp.Method{ - InterfaceID: 0xda23f0d3a8250633, - MethodID: 3, - InterfaceName: "api/process.capnp:Process", - MethodName: "close", - }, - } - if params != nil { - s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} - s.PlaceArgs = func(s capnp.Struct) error { return params(Process_close_Params(s)) } - } - ans, release := capnp.Client(c).SendCall(ctx, s) - return Process_close_Results_Future{Future: ans.Future()}, release -} -func (c Process) Input(ctx context.Context, params func(Process_input_Params) error) (Process_input_Results_Future, capnp.ReleaseFunc) { - s := capnp.Send{ - Method: capnp.Method{ - InterfaceID: 0xda23f0d3a8250633, - MethodID: 4, - InterfaceName: "api/process.capnp:Process", - MethodName: "input", - }, - } - if params != nil { - s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 0} - s.PlaceArgs = func(s capnp.Struct) error { return params(Process_input_Params(s)) } - } - ans, release := capnp.Client(c).SendCall(ctx, s) - return Process_input_Results_Future{Future: ans.Future()}, release -} -func (c Process) Output(ctx context.Context, params func(Process_output_Params) error) (Process_output_Results_Future, capnp.ReleaseFunc) { - s := capnp.Send{ - Method: capnp.Method{ - InterfaceID: 0xda23f0d3a8250633, - MethodID: 5, - InterfaceName: "api/process.capnp:Process", - MethodName: "output", - }, - } - if params != nil { - s.ArgsSize = capnp.ObjectSize{DataSize: 0, PointerCount: 2} - s.PlaceArgs = func(s capnp.Struct) error { return params(Process_output_Params(s)) } - } - ans, release := capnp.Client(c).SendCall(ctx, s) - return Process_output_Results_Future{Future: ans.Future()}, release -} // String returns a string that identifies this capability for debugging // purposes. Its format should not be depended on: in particular, it @@ -519,12 +471,6 @@ type Process_Server interface { Stop(context.Context, Process_stop) error Wait(context.Context, Process_wait) error - - Close(context.Context, Process_close) error - - Input(context.Context, Process_input) error - - Output(context.Context, Process_output) error } // Process_NewServer creates a new Server from an implementation of Process_Server. @@ -543,14 +489,14 @@ func Process_ServerToClient(s Process_Server) Process { // This can be used to create a more complicated Server. func Process_Methods(methods []server.Method, s Process_Server) []server.Method { if cap(methods) == 0 { - methods = make([]server.Method, 0, 6) + methods = make([]server.Method, 0, 3) } methods = append(methods, server.Method{ Method: capnp.Method{ InterfaceID: 0xda23f0d3a8250633, MethodID: 0, - InterfaceName: "api/process.capnp:Process", + InterfaceName: "process.capnp:Process", MethodName: "start", }, Impl: func(ctx context.Context, call *server.Call) error { @@ -562,7 +508,7 @@ func Process_Methods(methods []server.Method, s Process_Server) []server.Method Method: capnp.Method{ InterfaceID: 0xda23f0d3a8250633, MethodID: 1, - InterfaceName: "api/process.capnp:Process", + InterfaceName: "process.capnp:Process", MethodName: "stop", }, Impl: func(ctx context.Context, call *server.Call) error { @@ -574,7 +520,7 @@ func Process_Methods(methods []server.Method, s Process_Server) []server.Method Method: capnp.Method{ InterfaceID: 0xda23f0d3a8250633, MethodID: 2, - InterfaceName: "api/process.capnp:Process", + InterfaceName: "process.capnp:Process", MethodName: "wait", }, Impl: func(ctx context.Context, call *server.Call) error { @@ -582,42 +528,6 @@ func Process_Methods(methods []server.Method, s Process_Server) []server.Method }, }) - methods = append(methods, server.Method{ - Method: capnp.Method{ - InterfaceID: 0xda23f0d3a8250633, - MethodID: 3, - InterfaceName: "api/process.capnp:Process", - MethodName: "close", - }, - Impl: func(ctx context.Context, call *server.Call) error { - return s.Close(ctx, Process_close{call}) - }, - }) - - methods = append(methods, server.Method{ - Method: capnp.Method{ - InterfaceID: 0xda23f0d3a8250633, - MethodID: 4, - InterfaceName: "api/process.capnp:Process", - MethodName: "input", - }, - Impl: func(ctx context.Context, call *server.Call) error { - return s.Input(ctx, Process_input{call}) - }, - }) - - methods = append(methods, server.Method{ - Method: capnp.Method{ - InterfaceID: 0xda23f0d3a8250633, - MethodID: 5, - InterfaceName: "api/process.capnp:Process", - MethodName: "output", - }, - Impl: func(ctx context.Context, call *server.Call) error { - return s.Output(ctx, Process_output{call}) - }, - }) - return methods } @@ -672,57 +582,6 @@ func (c Process_wait) AllocResults() (Process_wait_Results, error) { return Process_wait_Results(r), err } -// Process_close holds the state for a server call to Process.close. -// See server.Call for documentation. -type Process_close struct { - *server.Call -} - -// Args returns the call's arguments. -func (c Process_close) Args() Process_close_Params { - return Process_close_Params(c.Call.Args()) -} - -// AllocResults allocates the results struct. -func (c Process_close) AllocResults() (Process_close_Results, error) { - r, err := c.Call.AllocResults(capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Process_close_Results(r), err -} - -// Process_input holds the state for a server call to Process.input. -// See server.Call for documentation. -type Process_input struct { - *server.Call -} - -// Args returns the call's arguments. -func (c Process_input) Args() Process_input_Params { - return Process_input_Params(c.Call.Args()) -} - -// AllocResults allocates the results struct. -func (c Process_input) AllocResults() (Process_input_Results, error) { - r, err := c.Call.AllocResults(capnp.ObjectSize{DataSize: 0, PointerCount: 1}) - return Process_input_Results(r), err -} - -// Process_output holds the state for a server call to Process.output. -// See server.Call for documentation. -type Process_output struct { - *server.Call -} - -// Args returns the call's arguments. -func (c Process_output) Args() Process_output_Params { - return Process_output_Params(c.Call.Args()) -} - -// AllocResults allocates the results struct. -func (c Process_output) AllocResults() (Process_output_Results, error) { - r, err := c.Call.AllocResults(capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Process_output_Results(r), err -} - // Process_List is a list of Process. type Process_List = capnp.CapList[Process] @@ -1104,22 +963,28 @@ func (s Process_wait_Results) Message() *capnp.Message { func (s Process_wait_Results) Segment() *capnp.Segment { return capnp.Struct(s).Segment() } -func (s Process_wait_Results) Error() (string, error) { +func (s Process_wait_Results) Error() (Error, error) { p, err := capnp.Struct(s).Ptr(0) - return p.Text(), err + return Error(p.Struct()), err } func (s Process_wait_Results) HasError() bool { return capnp.Struct(s).HasPtr(0) } -func (s Process_wait_Results) ErrorBytes() ([]byte, error) { - p, err := capnp.Struct(s).Ptr(0) - return p.TextBytes(), err +func (s Process_wait_Results) SetError(v Error) error { + return capnp.Struct(s).SetPtr(0, capnp.Struct(v).ToPtr()) } -func (s Process_wait_Results) SetError(v string) error { - return capnp.Struct(s).SetText(0, v) +// NewError sets the error field to a newly +// allocated Error struct, preferring placement in s's segment. +func (s Process_wait_Results) NewError() (Error, error) { + ss, err := NewError(capnp.Struct(s).Segment()) + if err != nil { + return Error{}, err + } + err = capnp.Struct(s).SetPtr(0, capnp.Struct(ss).ToPtr()) + return ss, err } // Process_wait_Results_List is a list of Process_wait_Results. @@ -1138,528 +1003,244 @@ func (f Process_wait_Results_Future) Struct() (Process_wait_Results, error) { p, err := f.Future.Ptr() return Process_wait_Results(p.Struct()), err } - -type Process_close_Params capnp.Struct - -// Process_close_Params_TypeID is the unique identifier for the type Process_close_Params. -const Process_close_Params_TypeID = 0x86e3410d1abd406b - -func NewProcess_close_Params(s *capnp.Segment) (Process_close_Params, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Process_close_Params(st), err -} - -func NewRootProcess_close_Params(s *capnp.Segment) (Process_close_Params, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Process_close_Params(st), err +func (p Process_wait_Results_Future) Error() Error_Future { + return Error_Future{Future: p.Future.Field(0, nil)} } -func ReadRootProcess_close_Params(msg *capnp.Message) (Process_close_Params, error) { - root, err := msg.Root() - return Process_close_Params(root.Struct()), err -} - -func (s Process_close_Params) String() string { - str, _ := text.Marshal(0x86e3410d1abd406b, capnp.Struct(s)) - return str -} - -func (s Process_close_Params) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { - return capnp.Struct(s).EncodeAsPtr(seg) -} - -func (Process_close_Params) DecodeFromPtr(p capnp.Ptr) Process_close_Params { - return Process_close_Params(capnp.Struct{}.DecodeFromPtr(p)) -} +type Error capnp.Struct +type Error_exitErr Error +type Error_Which uint16 -func (s Process_close_Params) ToPtr() capnp.Ptr { - return capnp.Struct(s).ToPtr() -} -func (s Process_close_Params) IsValid() bool { - return capnp.Struct(s).IsValid() -} - -func (s Process_close_Params) Message() *capnp.Message { - return capnp.Struct(s).Message() -} - -func (s Process_close_Params) Segment() *capnp.Segment { - return capnp.Struct(s).Segment() -} - -// Process_close_Params_List is a list of Process_close_Params. -type Process_close_Params_List = capnp.StructList[Process_close_Params] - -// NewProcess_close_Params creates a new list of Process_close_Params. -func NewProcess_close_Params_List(s *capnp.Segment, sz int32) (Process_close_Params_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}, sz) - return capnp.StructList[Process_close_Params](l), err -} - -// Process_close_Params_Future is a wrapper for a Process_close_Params promised by a client call. -type Process_close_Params_Future struct{ *capnp.Future } - -func (f Process_close_Params_Future) Struct() (Process_close_Params, error) { - p, err := f.Future.Ptr() - return Process_close_Params(p.Struct()), err -} - -type Process_close_Results capnp.Struct - -// Process_close_Results_TypeID is the unique identifier for the type Process_close_Results. -const Process_close_Results_TypeID = 0xd93c9aa0627bc93c - -func NewProcess_close_Results(s *capnp.Segment) (Process_close_Results, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Process_close_Results(st), err -} - -func NewRootProcess_close_Results(s *capnp.Segment) (Process_close_Results, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Process_close_Results(st), err -} - -func ReadRootProcess_close_Results(msg *capnp.Message) (Process_close_Results, error) { - root, err := msg.Root() - return Process_close_Results(root.Struct()), err -} - -func (s Process_close_Results) String() string { - str, _ := text.Marshal(0xd93c9aa0627bc93c, capnp.Struct(s)) - return str -} - -func (s Process_close_Results) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { - return capnp.Struct(s).EncodeAsPtr(seg) -} - -func (Process_close_Results) DecodeFromPtr(p capnp.Ptr) Process_close_Results { - return Process_close_Results(capnp.Struct{}.DecodeFromPtr(p)) -} - -func (s Process_close_Results) ToPtr() capnp.Ptr { - return capnp.Struct(s).ToPtr() -} -func (s Process_close_Results) IsValid() bool { - return capnp.Struct(s).IsValid() -} - -func (s Process_close_Results) Message() *capnp.Message { - return capnp.Struct(s).Message() -} - -func (s Process_close_Results) Segment() *capnp.Segment { - return capnp.Struct(s).Segment() -} - -// Process_close_Results_List is a list of Process_close_Results. -type Process_close_Results_List = capnp.StructList[Process_close_Results] - -// NewProcess_close_Results creates a new list of Process_close_Results. -func NewProcess_close_Results_List(s *capnp.Segment, sz int32) (Process_close_Results_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}, sz) - return capnp.StructList[Process_close_Results](l), err -} +const ( + Error_Which_none Error_Which = 0 + Error_Which_msg Error_Which = 1 + Error_Which_exitErr Error_Which = 2 +) -// Process_close_Results_Future is a wrapper for a Process_close_Results promised by a client call. -type Process_close_Results_Future struct{ *capnp.Future } +func (w Error_Which) String() string { + const s = "nonemsgexitErr" + switch w { + case Error_Which_none: + return s[0:4] + case Error_Which_msg: + return s[4:7] + case Error_Which_exitErr: + return s[7:14] -func (f Process_close_Results_Future) Struct() (Process_close_Results, error) { - p, err := f.Future.Ptr() - return Process_close_Results(p.Struct()), err + } + return "Error_Which(" + strconv.FormatUint(uint64(w), 10) + ")" } -type Process_input_Params capnp.Struct - -// Process_input_Params_TypeID is the unique identifier for the type Process_input_Params. -const Process_input_Params_TypeID = 0xb72541d950858a60 +// Error_TypeID is the unique identifier for the type Error. +const Error_TypeID = 0xd6be5a33d8c2c538 -func NewProcess_input_Params(s *capnp.Segment) (Process_input_Params, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Process_input_Params(st), err +func NewError(s *capnp.Segment) (Error, error) { + st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 1}) + return Error(st), err } -func NewRootProcess_input_Params(s *capnp.Segment) (Process_input_Params, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Process_input_Params(st), err +func NewRootError(s *capnp.Segment) (Error, error) { + st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 8, PointerCount: 1}) + return Error(st), err } -func ReadRootProcess_input_Params(msg *capnp.Message) (Process_input_Params, error) { +func ReadRootError(msg *capnp.Message) (Error, error) { root, err := msg.Root() - return Process_input_Params(root.Struct()), err + return Error(root.Struct()), err } -func (s Process_input_Params) String() string { - str, _ := text.Marshal(0xb72541d950858a60, capnp.Struct(s)) +func (s Error) String() string { + str, _ := text.Marshal(0xd6be5a33d8c2c538, capnp.Struct(s)) return str } -func (s Process_input_Params) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { +func (s Error) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { return capnp.Struct(s).EncodeAsPtr(seg) } -func (Process_input_Params) DecodeFromPtr(p capnp.Ptr) Process_input_Params { - return Process_input_Params(capnp.Struct{}.DecodeFromPtr(p)) +func (Error) DecodeFromPtr(p capnp.Ptr) Error { + return Error(capnp.Struct{}.DecodeFromPtr(p)) } -func (s Process_input_Params) ToPtr() capnp.Ptr { +func (s Error) ToPtr() capnp.Ptr { return capnp.Struct(s).ToPtr() } -func (s Process_input_Params) IsValid() bool { - return capnp.Struct(s).IsValid() -} - -func (s Process_input_Params) Message() *capnp.Message { - return capnp.Struct(s).Message() -} -func (s Process_input_Params) Segment() *capnp.Segment { - return capnp.Struct(s).Segment() +func (s Error) Which() Error_Which { + return Error_Which(capnp.Struct(s).Uint16(0)) } - -// Process_input_Params_List is a list of Process_input_Params. -type Process_input_Params_List = capnp.StructList[Process_input_Params] - -// NewProcess_input_Params creates a new list of Process_input_Params. -func NewProcess_input_Params_List(s *capnp.Segment, sz int32) (Process_input_Params_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}, sz) - return capnp.StructList[Process_input_Params](l), err -} - -// Process_input_Params_Future is a wrapper for a Process_input_Params promised by a client call. -type Process_input_Params_Future struct{ *capnp.Future } - -func (f Process_input_Params_Future) Struct() (Process_input_Params, error) { - p, err := f.Future.Ptr() - return Process_input_Params(p.Struct()), err -} - -type Process_input_Results capnp.Struct - -// Process_input_Results_TypeID is the unique identifier for the type Process_input_Results. -const Process_input_Results_TypeID = 0xf589dc1668ea3d8f - -func NewProcess_input_Results(s *capnp.Segment) (Process_input_Results, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 1}) - return Process_input_Results(st), err -} - -func NewRootProcess_input_Results(s *capnp.Segment) (Process_input_Results, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 1}) - return Process_input_Results(st), err -} - -func ReadRootProcess_input_Results(msg *capnp.Message) (Process_input_Results, error) { - root, err := msg.Root() - return Process_input_Results(root.Struct()), err -} - -func (s Process_input_Results) String() string { - str, _ := text.Marshal(0xf589dc1668ea3d8f, capnp.Struct(s)) - return str -} - -func (s Process_input_Results) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { - return capnp.Struct(s).EncodeAsPtr(seg) -} - -func (Process_input_Results) DecodeFromPtr(p capnp.Ptr) Process_input_Results { - return Process_input_Results(capnp.Struct{}.DecodeFromPtr(p)) -} - -func (s Process_input_Results) ToPtr() capnp.Ptr { - return capnp.Struct(s).ToPtr() -} -func (s Process_input_Results) IsValid() bool { +func (s Error) IsValid() bool { return capnp.Struct(s).IsValid() } -func (s Process_input_Results) Message() *capnp.Message { +func (s Error) Message() *capnp.Message { return capnp.Struct(s).Message() } -func (s Process_input_Results) Segment() *capnp.Segment { +func (s Error) Segment() *capnp.Segment { return capnp.Struct(s).Segment() } -func (s Process_input_Results) Stdin() iostream.Stream { - p, _ := capnp.Struct(s).Ptr(0) - return iostream.Stream(p.Interface().Client()) -} +func (s Error) SetNone() { + capnp.Struct(s).SetUint16(0, 0) -func (s Process_input_Results) HasStdin() bool { - return capnp.Struct(s).HasPtr(0) } -func (s Process_input_Results) SetStdin(v iostream.Stream) error { - if !v.IsValid() { - return capnp.Struct(s).SetPtr(0, capnp.Ptr{}) +func (s Error) Msg() (string, error) { + if capnp.Struct(s).Uint16(0) != 1 { + panic("Which() != msg") } - seg := s.Segment() - in := capnp.NewInterface(seg, seg.Message().AddCap(capnp.Client(v))) - return capnp.Struct(s).SetPtr(0, in.ToPtr()) -} - -// Process_input_Results_List is a list of Process_input_Results. -type Process_input_Results_List = capnp.StructList[Process_input_Results] - -// NewProcess_input_Results creates a new list of Process_input_Results. -func NewProcess_input_Results_List(s *capnp.Segment, sz int32) (Process_input_Results_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 1}, sz) - return capnp.StructList[Process_input_Results](l), err -} - -// Process_input_Results_Future is a wrapper for a Process_input_Results promised by a client call. -type Process_input_Results_Future struct{ *capnp.Future } - -func (f Process_input_Results_Future) Struct() (Process_input_Results, error) { - p, err := f.Future.Ptr() - return Process_input_Results(p.Struct()), err -} -func (p Process_input_Results_Future) Stdin() iostream.Stream { - return iostream.Stream(p.Future.Field(0, nil).Client()) -} - -type Process_output_Params capnp.Struct - -// Process_output_Params_TypeID is the unique identifier for the type Process_output_Params. -const Process_output_Params_TypeID = 0xf5c2d7ad2dde5570 - -func NewProcess_output_Params(s *capnp.Segment) (Process_output_Params, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 2}) - return Process_output_Params(st), err + p, err := capnp.Struct(s).Ptr(0) + return p.Text(), err } -func NewRootProcess_output_Params(s *capnp.Segment) (Process_output_Params, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 2}) - return Process_output_Params(st), err +func (s Error) HasMsg() bool { + if capnp.Struct(s).Uint16(0) != 1 { + return false + } + return capnp.Struct(s).HasPtr(0) } -func ReadRootProcess_output_Params(msg *capnp.Message) (Process_output_Params, error) { - root, err := msg.Root() - return Process_output_Params(root.Struct()), err +func (s Error) MsgBytes() ([]byte, error) { + p, err := capnp.Struct(s).Ptr(0) + return p.TextBytes(), err } -func (s Process_output_Params) String() string { - str, _ := text.Marshal(0xf5c2d7ad2dde5570, capnp.Struct(s)) - return str +func (s Error) SetMsg(v string) error { + capnp.Struct(s).SetUint16(0, 1) + return capnp.Struct(s).SetText(0, v) } -func (s Process_output_Params) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { - return capnp.Struct(s).EncodeAsPtr(seg) -} +func (s Error) ExitErr() Error_exitErr { return Error_exitErr(s) } -func (Process_output_Params) DecodeFromPtr(p capnp.Ptr) Process_output_Params { - return Process_output_Params(capnp.Struct{}.DecodeFromPtr(p)) +func (s Error) SetExitErr() { + capnp.Struct(s).SetUint16(0, 2) } -func (s Process_output_Params) ToPtr() capnp.Ptr { - return capnp.Struct(s).ToPtr() -} -func (s Process_output_Params) IsValid() bool { +func (s Error_exitErr) IsValid() bool { return capnp.Struct(s).IsValid() } -func (s Process_output_Params) Message() *capnp.Message { +func (s Error_exitErr) Message() *capnp.Message { return capnp.Struct(s).Message() } -func (s Process_output_Params) Segment() *capnp.Segment { +func (s Error_exitErr) Segment() *capnp.Segment { return capnp.Struct(s).Segment() } -func (s Process_output_Params) Stdout() iostream.Stream { - p, _ := capnp.Struct(s).Ptr(0) - return iostream.Stream(p.Interface().Client()) +func (s Error_exitErr) Code() uint32 { + return capnp.Struct(s).Uint32(4) } -func (s Process_output_Params) HasStdout() bool { - return capnp.Struct(s).HasPtr(0) +func (s Error_exitErr) SetCode(v uint32) { + capnp.Struct(s).SetUint32(4, v) } -func (s Process_output_Params) SetStdout(v iostream.Stream) error { - if !v.IsValid() { - return capnp.Struct(s).SetPtr(0, capnp.Ptr{}) - } - seg := s.Segment() - in := capnp.NewInterface(seg, seg.Message().AddCap(capnp.Client(v))) - return capnp.Struct(s).SetPtr(0, in.ToPtr()) +func (s Error_exitErr) Module() (string, error) { + p, err := capnp.Struct(s).Ptr(0) + return p.Text(), err } -func (s Process_output_Params) Stderr() iostream.Stream { - p, _ := capnp.Struct(s).Ptr(1) - return iostream.Stream(p.Interface().Client()) +func (s Error_exitErr) HasModule() bool { + return capnp.Struct(s).HasPtr(0) } -func (s Process_output_Params) HasStderr() bool { - return capnp.Struct(s).HasPtr(1) +func (s Error_exitErr) ModuleBytes() ([]byte, error) { + p, err := capnp.Struct(s).Ptr(0) + return p.TextBytes(), err } -func (s Process_output_Params) SetStderr(v iostream.Stream) error { - if !v.IsValid() { - return capnp.Struct(s).SetPtr(1, capnp.Ptr{}) - } - seg := s.Segment() - in := capnp.NewInterface(seg, seg.Message().AddCap(capnp.Client(v))) - return capnp.Struct(s).SetPtr(1, in.ToPtr()) +func (s Error_exitErr) SetModule(v string) error { + return capnp.Struct(s).SetText(0, v) } -// Process_output_Params_List is a list of Process_output_Params. -type Process_output_Params_List = capnp.StructList[Process_output_Params] +// Error_List is a list of Error. +type Error_List = capnp.StructList[Error] -// NewProcess_output_Params creates a new list of Process_output_Params. -func NewProcess_output_Params_List(s *capnp.Segment, sz int32) (Process_output_Params_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 2}, sz) - return capnp.StructList[Process_output_Params](l), err +// NewError creates a new list of Error. +func NewError_List(s *capnp.Segment, sz int32) (Error_List, error) { + l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 8, PointerCount: 1}, sz) + return capnp.StructList[Error](l), err } -// Process_output_Params_Future is a wrapper for a Process_output_Params promised by a client call. -type Process_output_Params_Future struct{ *capnp.Future } +// Error_Future is a wrapper for a Error promised by a client call. +type Error_Future struct{ *capnp.Future } -func (f Process_output_Params_Future) Struct() (Process_output_Params, error) { +func (f Error_Future) Struct() (Error, error) { p, err := f.Future.Ptr() - return Process_output_Params(p.Struct()), err -} -func (p Process_output_Params_Future) Stdout() iostream.Stream { - return iostream.Stream(p.Future.Field(0, nil).Client()) -} - -func (p Process_output_Params_Future) Stderr() iostream.Stream { - return iostream.Stream(p.Future.Field(1, nil).Client()) -} - -type Process_output_Results capnp.Struct - -// Process_output_Results_TypeID is the unique identifier for the type Process_output_Results. -const Process_output_Results_TypeID = 0xeafb60603769c851 - -func NewProcess_output_Results(s *capnp.Segment) (Process_output_Results, error) { - st, err := capnp.NewStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Process_output_Results(st), err -} - -func NewRootProcess_output_Results(s *capnp.Segment) (Process_output_Results, error) { - st, err := capnp.NewRootStruct(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}) - return Process_output_Results(st), err -} - -func ReadRootProcess_output_Results(msg *capnp.Message) (Process_output_Results, error) { - root, err := msg.Root() - return Process_output_Results(root.Struct()), err -} - -func (s Process_output_Results) String() string { - str, _ := text.Marshal(0xeafb60603769c851, capnp.Struct(s)) - return str -} - -func (s Process_output_Results) EncodeAsPtr(seg *capnp.Segment) capnp.Ptr { - return capnp.Struct(s).EncodeAsPtr(seg) -} - -func (Process_output_Results) DecodeFromPtr(p capnp.Ptr) Process_output_Results { - return Process_output_Results(capnp.Struct{}.DecodeFromPtr(p)) -} - -func (s Process_output_Results) ToPtr() capnp.Ptr { - return capnp.Struct(s).ToPtr() -} -func (s Process_output_Results) IsValid() bool { - return capnp.Struct(s).IsValid() -} - -func (s Process_output_Results) Message() *capnp.Message { - return capnp.Struct(s).Message() -} - -func (s Process_output_Results) Segment() *capnp.Segment { - return capnp.Struct(s).Segment() -} - -// Process_output_Results_List is a list of Process_output_Results. -type Process_output_Results_List = capnp.StructList[Process_output_Results] - -// NewProcess_output_Results creates a new list of Process_output_Results. -func NewProcess_output_Results_List(s *capnp.Segment, sz int32) (Process_output_Results_List, error) { - l, err := capnp.NewCompositeList(s, capnp.ObjectSize{DataSize: 0, PointerCount: 0}, sz) - return capnp.StructList[Process_output_Results](l), err + return Error(p.Struct()), err } +func (p Error_Future) ExitErr() Error_exitErr_Future { return Error_exitErr_Future{p.Future} } -// Process_output_Results_Future is a wrapper for a Process_output_Results promised by a client call. -type Process_output_Results_Future struct{ *capnp.Future } +// Error_exitErr_Future is a wrapper for a Error_exitErr promised by a client call. +type Error_exitErr_Future struct{ *capnp.Future } -func (f Process_output_Results_Future) Struct() (Process_output_Results, error) { +func (f Error_exitErr_Future) Struct() (Error_exitErr, error) { p, err := f.Future.Ptr() - return Process_output_Results(p.Struct()), err -} - -const schema_9a51e53177277763 = "x\xda\x8cToH$u\x18~\xdf\xf9+\xe8\"?" + - "W$\x0d\xda\x0f\xadT\xd6\xfa7\xa8\x16eGA\x04" + - "\xbf\xb4#\xf4\xd1\xdaq\xddp\xc8f\xa6\xf9\xc3&%" + - "%db A!\x91\x11\xd9\x87\xc5>di\x19Q" + - "!Q\x10\xd4\xb7n\xef\x8f\xe8\x1dw\xa0\x1c\x1c~\x90" + - "\xe3\xe0>\x1c\xf7a\x8e\xdf\x8c3;\xe7\xde\xae\xf7m" + - "w\x9ew\x9e\xf7}\x9e\xf7y\xa7\xf7}F\xe2\xfab" + - "o4\x02#\x7f\xc4\x0b\xee[\xd2nGl\xf8\xe8c" + - " \x1d\x08\xc0\x89\x00\x03\xab|\x1a\x81s\x9f\xcf\xad\x7f" + - "3j\xe7\xbe\xf6\x11\x1e)4O!\x8c/\xf2\x19@" + - "\xb7\xec,\x94~\x7f\xbd\xfbG -\xac\x9b/>S" + - "\xec\xbb)\xaf\x01`\xbc\xc4\x1f\xc4\xb7y\x11 \xbe\xc9" + - "\x8f\xc5/\xd3_n\xee\x93\xc5\xec\xfep\xe7\xaf\x91>" + - "\xbb~\x9f\xaf\xfe/ol\xb5\xbd\xfa\x07\x90'\xc3>" + - "%~\x9c\xf6\xd9\xf6\xfa\x9c|q\xc9\xfai\xa6\xef\x9f" + - "\xc8\xab\x17\xfcWo-\xfd%\xdcpz\xca\x11\xe47" + - "\xbe\x9f\"\xdf\x8f\xf5&\xd7w\xba\xf6\"\xc8\xb7\xfc\x08" + - "E\x06\xff{oj}mp?\x82,\xfb\xc8\x80\xd0" + - "\xf9\xdd\xc5\xdbO\x1fT\xe9y\x87/\xc7\xe7\xa9\x8a\x81" + - "9~\x09\xe3\xaa@\x05\xc9\xff\xaa/\xe5r\xf7\x8f#" + - "<\xb20Ny\xc6&{\xb6\xda\x7f\xd88\x89 C" + - "\x827U\xcb\xe7\x9f\xad\x1e\x0f5\xde9\x95\xcaP\xa8" + - "S\x18\xa1RSB\x11\xd0\xfdt\xe8x\xa6\xed\xda\xf2" + - "\xdd\xa8\xe7+~\xc1\xaa@\xbd0^\xbb\x9e\xda\xdc\xfb" + - ";(\xf0\x18~\xf1\x0bv=\x86\xa3\x9f\xaf6\x1c\x8e" + - "\xab\xf7\"\xcd\xdb\xc54\xc2\x9f\xaeb\xa8=\x86\xa9\xe7" + - "\xd9\x82eu\xe7\x15C3\xd2YS\xcf{\xfffu" + - "\xab\x90\xccd\x15Sy\xdb\xaaWXTT;9\x91" + - ")X\xce\xacm\xc9\x1c\xcb\x01p\x08@b\xfd\x00r" + - "\x03\x8br+\x83\x89\x82i\xea&6\x01\x83M\x80!" + - "\x1bSa\x1b}\xb7\x90wlV7\xb3\x882\xc7\xf2" + - "\x00\xa13\x18\xa4\x81\x90~`\x08/&,C)j" + - "\x12f\x11\xeb\x0d\xa6j\x86cW)\xe0\xce\xf6\xd4\xcd" + - "n\x8f/9\xe1K\x00\x88\x8a\x18\xa9\x88\xf8\xc0\xf0\x89" + - "\x91Tb\x01\x88\x04\xeaNa\xd9\xba\x11\xdas\xae\x8f" + - "\xd9\xc4\xc3\xd36=\x8aP1\xedp\xd8\xa0\xb0\xd6\x02" + - "\xcf\xabC\x8b\x1a\xfe\x84gx\x90\x13\x0c.\x85lS" + - "\xc3K\"b\x18`\x0c.\x8f|\xd9\x05\x0cY\x11\x91" + - "\x09O\x0e\x83\xcf\x03Y\xa0\x98#\"\x1b~K0\xb8" + - "1\xa2R\xceI\x11\xb9\xf0\xfe1\x888\x91)6*" + - "\"\x1f\x86\x1a\x83\x9b\"\xaf\xa4\x81!)1\xe1\xe9\x97" + - "\xb0\x99\x1a+a3\xb5M\xc2\x84\xa7V\xc2\x84\xb7t" + - "\x093\xbac{?\xa2\x19\xe1\xaaM\xf2\xcb*\xab?" + - "w\x93g\x17T;N~\xeehz\x820=\x97\x06" + - "\x90\x93,\xca\xbd\x0c\x12\xc4V\xa4\x0fS&\x80\xfc\x02" + - "\x8b\xf2\xcb\x0cf\xa6TM1\xe70\x06\x0c\xc6\x00\xdd" + - "\x82f\x9bso:\x1a$\xf2\xb6\xaakU\xf7\xc3\xd5" + - "\x0a}\xa0\xa7\xd69Z\xf6\xb4\xaa!q\x0f\x87w\xae" + - "\xc2!\x1fc\xc8\xd7\xc3" + + "\xa5\x00\x19b\xc8\x10\x16Z\xe1R\xb7\x19 K\x0cY" + + "\xc2\xa8\x0b\xefw\xf1\x06\xc7\x9e\xdf\x88\xe7^\x08\xa2n" + + "\x93\xc7\x91\xb4\xb8Ed\x81H\xe4N\x11\xc9\x0c\x87," + + "0\x14\x03\x83\x84\x99\xb1\xe7\x04\xccLhb@\xbe\xbc" + + "\x10\xd4\xbbq\xa8<@ZAq\xbaDL\x9c\xb0\xc1Fo\x0f\xc3" + + "\x7f\x03q\xcc\xd4\x0e\xdb\xc5\x84\xa0\x86\xbcq\xb2\x86\xbc" + + "\xf1j\xd7$w8\xfe\x7f3wO\xdb\xf3\x95\xed\xb7" + + "\"\xb3\xf6\x83\xb0\x8f\x9f\x1fo\xb80k\x0f@T/" + + "\x12\xc9\x0a\x87|\x92A\xbf|%\x0e\x9e\x0e\x97\x02\"" + + "B\x8e\x18r\x04\x1d\xb4cu\xc5\x0b\x1b\xc4\xdb\xf1\xc0" + + "X\x12\x98\xb5U\xb7\xbd\xbf\xaf\x06\x93\xb7\xa2\xff\x02\x00" + + "\x00\xff\xff\xa8.}\xb7" func init() { schemas.Register(schema_9a51e53177277763, - 0x86e3410d1abd406b, + 0x84fc61ad6ff505d7, 0x9d6074459fa0602b, 0xaf2e5ebaa58175d2, - 0xb72541d950858a60, 0xbb4f16b0a7d2d09b, 0xc53168b273d497ee, 0xd22f75df06c187e8, + 0xd6be5a33d8c2c538, 0xd72ab4a0243047ac, - 0xd93c9aa0627bc93c, 0xda23f0d3a8250633, - 0xeafb60603769c851, 0xeea7ae19b02f5d47, 0xf20b3dea95929312, - 0xf589dc1668ea3d8f, - 0xf5c2d7ad2dde5570, 0xf9694ae208dbb3e3) } diff --git a/internal/cmd/cluster/ls.go b/internal/cmd/cluster/ls.go index 02d7c2d6..0ed600e9 100644 --- a/internal/cmd/cluster/ls.go +++ b/internal/cmd/cluster/ls.go @@ -16,7 +16,11 @@ func list() *cli.Command { Name: "ls", Usage: "list anchor elements", Flags: []cli.Flag{ - &boolFlag, + &cli.BoolFlag{ + Name: "json", + Usage: "print results as json", + EnvVars: []string{"WW_FMT_JSON"}, + }, }, Before: setup(), Action: ls(), @@ -47,7 +51,7 @@ func render(it cluster.Iterator, consume func(routing.Record) error) error { } func formatter(c *cli.Context) func(routing.Record) error { - if c.Bool(_json) { + if c.Bool("json") { return jsonFormatter(c) } diff --git a/internal/cmd/cluster/process.go b/internal/cmd/cluster/process.go index c1f16209..9c82edc4 100644 --- a/internal/cmd/cluster/process.go +++ b/internal/cmd/cluster/process.go @@ -1,59 +1,29 @@ package cluster import ( - "bytes" - "encoding/json" - "errors" - "fmt" "io" "os" "github.com/urfave/cli/v2" - process "github.com/wetware/ww/pkg/process/client" ) -const ( - _module = "module" - _func = "function" -) - -var runError = errors.New("Run failed.") - func run() *cli.Command { return &cli.Command{ - Name: "run", - Usage: "run a WASM module", - Flags: []cli.Flag{ - &cli.StringFlag{ - Name: _module, - Aliases: []string{"m"}, - Usage: "path to the file containing compiled WASM module", - Required: true, - }, - &cli.StringFlag{ - Name: _func, - Aliases: []string{"f"}, - Usage: "name of the function to run within the WASM module", - Required: true, - }, - &boolFlag, - }, - Before: setup(), - After: teardown(), - Action: runAction(), + Name: "run", + Usage: "compile and run a WASM module", + ArgsUsage: " (defaults to stdin)", + Before: setup(), + After: teardown(), + Action: runAction(), } } func runAction() cli.ActionFunc { return func(c *cli.Context) error { - - var stdin io.Reader = os.Stdin - var stdout, stderr io.Writer - ctx := c.Context + // Load the name of the entry function and the WASM file containing the module to run - entryFunction := c.String(_func) - binary, err := os.ReadFile(c.String(_module)) + src, err := bytecode(c) if err != nil { return err } @@ -61,64 +31,23 @@ func runAction() cli.ActionFunc { // Obtain an executor and spawn a process executor, release := node.Executor(ctx) defer release() - proc := process.MakeProcess(ctx, logger, executor, binary, entryFunction) - defer proc.Close(ctx) - - // Select the output - if c.Bool(_json) { - stdout = new(bytes.Buffer) - stderr = new(bytes.Buffer) - } else { - stdout = os.Stdout - stderr = os.Stderr - } - // Run the process - errs := proc.Run(ctx, stdin, stdout, stderr) + proc, release := executor.Spawn(ctx, src) + defer release() - // Output the results - if c.Bool(_json) { - err = outputToJSON(stdout.(*bytes.Buffer), stderr.(*bytes.Buffer), errs) - } else { - err = outputToLog(errs) + if err := proc.Start(ctx); err != nil { + return err } + defer proc.Stop(ctx) - return err + return proc.Wait(ctx) } } -type results struct { - Stdout string `json:"stdout"` - Stderr string `json:"stderr"` - Errs []string `json:"errors"` -} - -func outputToJSON(output *bytes.Buffer, errorOutput *bytes.Buffer, errs []error) error { - var err error - errStrings := make([]string, len(errs)) - for i, e := range errs { - errStrings[i] = e.Error() - } - results := results{ - Stdout: output.String(), - Stderr: errorOutput.String(), - Errs: errStrings, +func bytecode(c *cli.Context) ([]byte, error) { + if c.Args().Len() > 0 { + return os.ReadFile(c.Args().First()) // file path } - content, err := json.Marshal(results) - if err != nil { - return err - } - fmt.Println(string(content)) - return nil -} -func outputToLog(errs []error) error { - var err error - if errs != nil && len(errs) > 0 { - for _, err := range errs { - logger.Error(err) - } - err = runError - } - return err + return io.ReadAll(c.App.Reader) // stdin } diff --git a/internal/cmd/cluster/shared_options.go b/internal/cmd/cluster/shared_options.go deleted file mode 100644 index 85d459e1..00000000 --- a/internal/cmd/cluster/shared_options.go +++ /dev/null @@ -1,15 +0,0 @@ -package cluster - -import ( - "github.com/urfave/cli/v2" -) - -const _json = "json" - -var ( - boolFlag = cli.BoolFlag{ - Name: _json, - Usage: "print results as json", - EnvVars: []string{"WW_FMT_JSON"}, - } -) diff --git a/pkg/client/client.go b/pkg/client/client.go index 880a1854..5ec72621 100644 --- a/pkg/client/client.go +++ b/pkg/client/client.go @@ -9,8 +9,8 @@ import ( casm "github.com/wetware/casm/pkg" "github.com/wetware/casm/pkg/cluster" "github.com/wetware/casm/pkg/debug" - "github.com/wetware/ww/internal/api/process" "github.com/wetware/ww/pkg/host" + "github.com/wetware/ww/pkg/process" "github.com/wetware/ww/pkg/pubsub" ) diff --git a/pkg/host/host.go b/pkg/host/host.go index d7f11e20..7c84398b 100644 --- a/pkg/host/host.go +++ b/pkg/host/host.go @@ -15,6 +15,7 @@ import ( process_api "github.com/wetware/ww/internal/api/process" pubsub_api "github.com/wetware/ww/internal/api/pubsub" "github.com/wetware/ww/pkg/anchor" + "github.com/wetware/ww/pkg/process" "github.com/wetware/ww/pkg/pubsub" ) @@ -58,9 +59,9 @@ func (h Host) Debug(ctx context.Context) (debug.Debugger, capnp.ReleaseFunc) { return debug.Debugger(f.Debugger()), release } -func (h Host) Executor(ctx context.Context) (process_api.Executor, capnp.ReleaseFunc) { +func (h Host) Executor(ctx context.Context) (process.Executor, capnp.ReleaseFunc) { f, release := api.Host(h).Executor(ctx, nil) - return f.Executor(), release + return process.Executor(f.Executor()), release } /*---------------------------* @@ -86,7 +87,7 @@ type DebugProvider interface { } type ExecutorProvider interface { - Executor() process_api.Executor + Executor() process.Executor } // Server provides the Host capability. @@ -146,8 +147,8 @@ func (s Server) Debug(_ context.Context, call api.Host_debug) error { func (s Server) Executor(_ context.Context, call api.Host_executor) error { res, err := call.AllocResults() if err == nil { - executor := s.ExecutorProvider.Executor() - err = res.SetExecutor(executor) + e := s.ExecutorProvider.Executor() + err = res.SetExecutor(process_api.Executor(e)) } return err } diff --git a/pkg/process/client/process.go b/pkg/process/client/process.go deleted file mode 100644 index 1f244159..00000000 --- a/pkg/process/client/process.go +++ /dev/null @@ -1,196 +0,0 @@ -package client - -import ( - "context" - "errors" - "io" - - capnp "capnproto.org/go/capnp/v3" - "github.com/lthibault/log" - - iostream_api "github.com/wetware/ww/internal/api/iostream" - api "github.com/wetware/ww/internal/api/process" - "github.com/wetware/ww/pkg/iostream" -) - -// Process is the local implementation of the process used to interact with -// an underlying process capability. -type Process struct { - capability api.Process // underlying process capability - logger log.Logger - releases []capnp.ReleaseFunc // pending release functions - - outputDone chan error -} - -// MakeProcess is the default constructor for Process. -func MakeProcess(ctx context.Context, logger log.Logger, executor api.Executor, binary []byte, entryFunction string) *Process { - future, release := executor.Spawn(ctx, func(e api.Executor_spawn_Params) error { - var err error - - if err = e.SetBinary(binary); err != nil { - return err - } - return e.SetEntryfunction(entryFunction) - }) - - proc := &Process{ - logger: logger, - capability: future.Process(), - releases: make([]capnp.ReleaseFunc, 0), - - outputDone: make(chan error, 1), - } - - proc.addRelease(release) - - return proc -} - -// addRelease adds a release function to the list of pending releases -// that are called by p.release(). -func (p *Process) addRelease(release capnp.ReleaseFunc) { - p.releases = append(p.releases, release) -} - -// bindOutput binds the provided output to the stream that will be provided -// by the server process, as well as notifying either p.outputDone or p.outputErr. -func (p *Process) bindOutput(ctx context.Context, output io.Writer, errorOutput io.Writer) { - select { - case p.outputDone <- p.provideOutput(ctx, output, errorOutput): - break - case <-ctx.Done(): - break - } -} - -// Cap returns the underlying process capability. -func (p *Process) Cap() api.Process { - return p.capability -} - -// in returns the input stream of the process capability -func (p *Process) in(ctx context.Context) (iostream_api.Stream, error) { - f, release := p.capability.Input(ctx, nil) - p.addRelease(release) - if err := waitForFuncOrCancel(ctx, f.Done); err != nil { - return iostream_api.Stream{}, err - } - return f.Stdin(), nil -} - -// provideInput to the remote process server. -func (p *Process) provideInput(ctx context.Context, input io.Reader) error { - inputProvider := iostream_api.Provider(iostream.NewProvider(input)) - inputStream, err := p.in(ctx) - if err != nil { - return err - } - f, release := inputProvider.Provide(ctx, func(p iostream_api.Provider_provide_Params) error { - return p.SetStream(inputStream) - }) - defer release() - if err = waitForFuncOrCancel(ctx, f.Done); err != nil { - return err - } - return nil -} - -// provideOutput calls the remote process.Output providing method, and waits for it to finish. -func (p *Process) provideOutput(ctx context.Context, stdout io.Writer, stderr io.Writer) error { - outputStream := iostream.New(stdout) - defer outputStream.Close(ctx) // It should be closed by the provider, but just in case - errorStream := iostream.New(stderr) - defer errorStream.Close(ctx) - f, release := p.capability.Output(ctx, func(params api.Process_output_Params) error { - if err := params.SetStdout(iostream_api.Stream(outputStream)); err != nil { - return err - } - return params.SetStderr(iostream_api.Stream(errorStream)) - }) - p.addRelease(release) - - return waitForFuncOrCancel(ctx, f.Done) -} - -// release calls every pending release function. -func (p *Process) release() { - for _, release := range p.releases { - defer release() - } -} - -// Run the process with the given input and wait for it to finish. -func (p *Process) Run(ctx context.Context, input io.Reader, stdout io.Writer, stderr io.Writer) []error { - if err := p.Start(ctx, input, stdout, stderr); err != nil { - return []error{err} - } - return p.Wait(ctx) -} - -// Start the process by binding the remote and local outputs as well as calling -// the Start method of the process capability. -func (p *Process) Start(ctx context.Context, stdin io.Reader, stdout io.Writer, stderr io.Writer) error { - if stdin == nil || stdout == nil { - return errors.New("Process input and output cannot be nil.") - } - go p.provideInput(ctx, stdin) - go p.bindOutput(ctx, stdout, stderr) - - start, release := p.capability.Start(ctx, nil) - p.addRelease(release) - return waitForFuncOrCancel(ctx, start.Done) -} - -// waitForOutput waits until all the output from the remote process run is received. -func (p *Process) waitForOutput(ctx context.Context) error { - var err error - select { - case err = <-p.outputDone: - break - case <-ctx.Done(): - err = ctx.Err() - break - } - - return err -} - -// waitForProcess waits for the process to finish running. -func (p *Process) waitForProcess(ctx context.Context) error { - wait, release := p.capability.Wait(ctx, nil) - p.addRelease(release) - return waitForFuncOrCancel(ctx, wait.Done) -} - -// Wait until the process finishes and all I/O operations are finished. -// Returns the actual error output produced by the process and a slice of wetware errors. -func (p *Process) Wait(ctx context.Context) []error { - errs := make([]error, 0) - if err := p.waitForProcess(ctx); err != nil { - errs = append(errs, err) - } - if err := p.waitForOutput(ctx); err != nil { - errs = append(errs, err) - } - return errs -} - -// Close the process by calling all pending releases and closing the underlying -// process capability. -func (p *Process) Close(ctx context.Context) { - p.release() - p.capability.Close(ctx, nil) -} - -// waitForFuncOrCancel waits until either the channel returned by the function -// produces a value or the context ends. It returns nil in the former case and -// the cause of the context cancelation in the latter. -func waitForFuncOrCancel(ctx context.Context, function func() <-chan struct{}) error { - select { - case <-ctx.Done(): - return ctx.Err() - case <-function(): - return nil - } -} diff --git a/pkg/process/errors.go b/pkg/process/errors.go new file mode 100644 index 00000000..f658e69a --- /dev/null +++ b/pkg/process/errors.go @@ -0,0 +1,9 @@ +package process + +type Error struct { + Message string +} + +func (err Error) Error() string { + return err.Message +} diff --git a/pkg/process/errors/errors.go b/pkg/process/errors/errors.go deleted file mode 100644 index 021415e1..00000000 --- a/pkg/process/errors/errors.go +++ /dev/null @@ -1,20 +0,0 @@ -package error - -import ( - "errors" - "fmt" -) - -var Nil = errors.New("") - -var errorMap = map[string]error{ - Nil.Error(): Nil, -} - -func FromString(errString string) error { - err, found := errorMap[errString] - if !found { - err = fmt.Errorf("Undefined: %s", errString) - } - return err -} diff --git a/pkg/process/executor.go b/pkg/process/executor.go index b33522d7..21e0d684 100644 --- a/pkg/process/executor.go +++ b/pkg/process/executor.go @@ -2,111 +2,111 @@ package process import ( "context" + "encoding/hex" "fmt" capnp "capnproto.org/go/capnp/v3" - "github.com/lthibault/log" "github.com/tetratelabs/wazero" - "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" + "lukechampine.com/blake3" + wasm "github.com/tetratelabs/wazero/api" api "github.com/wetware/ww/internal/api/process" ) -const start = "_start" // start function called in every WASM module +// ByteCode is a representation of arbitrary executable data. +type ByteCode []byte -// Executor contains a WASM runtime and can spawn processes in it. -type Executor struct { - logger log.Logger - runtime wazero.Runtime +// Hash returns the BLAKE3-256 hash of the byte code. It is +// suitbale for use as a secure checksum. +func (b ByteCode) Hash() [32]byte { + return blake3.Sum256(b) } -// Executor provides the Executor capability. -func (e Executor) Executor() api.Executor { - return api.Executor_ServerToClient(e) +// Executor is a capability that can spawn processes. +type Executor api.Executor + +func (ex Executor) AddRef() Executor { + return Executor(capnp.Client(ex).AddRef()) } -// NewExecutor is the default constructor for Executor. -func NewExecutor(ctx context.Context, logger log.Logger) Executor { - r := wazero.NewRuntimeWithConfig(ctx, wazero.NewRuntimeConfig()) - wasi_snapshot_preview1.MustInstantiate(ctx, r) +func (ex Executor) Release() { + capnp.Client(ex).Release() +} - return Executor{logger: logger, runtime: r} +func (ex Executor) Spawn(ctx context.Context, src []byte) (Proc, capnp.ReleaseFunc) { + f, release := api.Executor(ex).Spawn(ctx, func(ps api.Executor_spawn_Params) error { + return ps.SetByteCode(src) + }) + return Proc(f.Process()), release } -// Close the executor runtime. Spawned processes should be inidividually closed -// calling Process.Close(). -func (e Executor) Close(ctx context.Context) error { - return e.runtime.Close(ctx) +// Server is the main Executor implementation. It spawns WebAssembly- +// based processes. The zero-value Server panics. +type Server struct { + Runtime wazero.Runtime +} + +// Executor provides the Executor capability. +func (wx Server) Executor() Executor { + return Executor(api.Executor_ServerToClient(wx)) } // Spawn a process by creating a process server and converting it into // a capability as a response to the call. -func (e Executor) Spawn(ctx context.Context, call api.Executor_spawn) error { - binary, err := call.Args().Binary() - if err != nil { - return err - } - entryFunction, err := call.Args().Entryfunction() +func (wx Server) Spawn(ctx context.Context, call api.Executor_spawn) error { + res, err := call.AllocResults() if err != nil { return err } - proc, err := e.spawnProcess(ctx, binary, entryFunction) + + mod, err := wx.loadModule(ctx, call.Args()) if err != nil { return err } - res, err := call.AllocResults() - if err != nil { - return err + + p, err := wx.mkproc(ctx, mod, call.Args()) + if err == nil { + err = res.SetProcess(api.Process_ServerToClient(p)) } - err = res.SetProcess(api.Process_ServerToClient(proc)) return err } -// spawnProcess creates and returns a Process that will run in e.runtime. -func (e Executor) spawnProcess(ctx context.Context, binary []byte, entryFunction string) (*Process, error) { - modId := moduleId(binary) + randomId() // TODO mikel - procIo := newIo() +func (wx Server) mkproc(ctx context.Context, mod wasm.Module, args api.Executor_spawn_Params) (*process, error) { + name, err := args.EntryPoint() + if err != nil { + return nil, err + } - config := wazero. - NewModuleConfig(). - WithName(modId). - WithStdin(procIo.inR). - WithStdout(procIo.outW). - WithStderr(procIo.errW) - - instance := e.runtime.Module(modId) - if instance == nil { - module, err := e.runtime.CompileModule(ctx, binary) - if err != nil { - return nil, err - } - instance, err = e.runtime.InstantiateModule(ctx, module, config) - if err != nil { - return nil, err - } - // instance.ExportedFunction(start).Call(ctx) + var proc process + if proc.fn = mod.ExportedFunction(name); proc.fn == nil { + err = fmt.Errorf("module %s: %s not found", mod.Name(), name) } - function := instance.ExportedFunction(entryFunction) - if function == nil { - return nil, fmt.Errorf("function %s not found in module %s", entryFunction, modId) + return &proc, err +} + +func (wx Server) loadModule(ctx context.Context, args api.Executor_spawn_Params) (wasm.Module, error) { + bc, err := args.ByteCode() + if err != nil { + return nil, err } - runContext, runCancel := context.WithCancel(context.TODO()) + hash := ByteCode(bc).Hash() + name := hex.EncodeToString(hash[:]) + + config := wazero. + NewModuleConfig(). + WithName(name) - proc := Process{ - function: function, - id: processId(modId, entryFunction), - io: procIo, - logger: e.logger, - releaseFuncs: make([]capnp.ReleaseFunc, 0), + if mod := wx.Runtime.Module(name); mod != nil { + return mod, nil + } - exitWaiters: make([]chan struct{}, 0), - runDone: make(chan error, 1), - runContext: runContext, - runCancel: runCancel, + module, err := wx.Runtime.CompileModule(ctx, bc) + if err != nil { + return nil, err } - return &proc, nil + return wx.Runtime.InstantiateModule(ctx, module, config) } diff --git a/pkg/process/executor_test.go b/pkg/process/executor_test.go new file mode 100644 index 00000000..9ab3141c --- /dev/null +++ b/pkg/process/executor_test.go @@ -0,0 +1,47 @@ +package process_test + +import ( + "context" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" + "github.com/tetratelabs/wazero/sys" + "github.com/wetware/ww/pkg/process" +) + +func TestExecutor(t *testing.T) { + t.Parallel() + + r := wazero.NewRuntime(context.Background()) + wasi_snapshot_preview1.MustInstantiate(context.Background(), r) + + exec := process.Server{Runtime: r}.Executor() + defer exec.Release() + + proc, release := exec.Spawn(context.Background(), testdata()) + defer release() + + err := proc.Start(context.Background()) + require.NoError(t, err, "should start process") + + err = proc.Wait(context.Background()) + require.Error(t, err, "should return an error from process") + + ee, ok := err.(*sys.ExitError) + require.True(t, ok, "should return sys.ExitError") + assert.Equal(t, uint32(99), ee.ExitCode()) + assert.NotEmpty(t, ee.ModuleName()) +} + +func testdata() []byte { + b, err := os.ReadFile("testdata/main.wasm") + if err != nil { + panic(err) + } + + return b +} diff --git a/pkg/process/io.go b/pkg/process/io.go deleted file mode 100644 index a023b1a7..00000000 --- a/pkg/process/io.go +++ /dev/null @@ -1,50 +0,0 @@ -package process - -import ( - "context" - "io" - - iostream_api "github.com/wetware/ww/internal/api/iostream" - "github.com/wetware/ww/pkg/iostream" -) - -// processIo constains all the required components for a process to -// run. -type processIo struct { - inR *io.PipeReader - inW *io.PipeWriter - outR *io.PipeReader - outW *io.PipeWriter - errR *io.PipeReader - errW *io.PipeWriter - - in iostream_api.Stream - out iostream_api.Provider - err iostream_api.Provider -} - -// newIo is the default constructor fo Io. -func newIo() processIo { - inR, inW := io.Pipe() - outR, outW := io.Pipe() - errR, errW := io.Pipe() - - return processIo{ - inR: inR, - inW: inW, - outR: outR, - outW: outW, - errR: errR, - errW: errW, - - in: iostream_api.Stream(iostream.New(inW)), - out: iostream_api.Provider(iostream.NewProvider(outR)), - err: iostream_api.Provider(iostream.NewProvider(errR)), - } -} - -func (pio processIo) closeWriters(ctx context.Context) { - defer pio.outW.Close() - defer pio.errW.Close() - defer pio.in.Close(ctx, nil) -} diff --git a/pkg/process/process.go b/pkg/process/process.go index 99c09791..724999cc 100644 --- a/pkg/process/process.go +++ b/pkg/process/process.go @@ -2,206 +2,191 @@ package process import ( "context" - "crypto/md5" - "encoding/hex" + "errors" "fmt" - "math/rand" - "time" + "sync/atomic" capnp "capnproto.org/go/capnp/v3" - "github.com/lthibault/log" - wazero_api "github.com/tetratelabs/wazero/api" + wasm "github.com/tetratelabs/wazero/api" + "github.com/tetratelabs/wazero/sys" - iostream_api "github.com/wetware/ww/internal/api/iostream" + casm "github.com/wetware/casm/pkg" api "github.com/wetware/ww/internal/api/process" - proc_errors "github.com/wetware/ww/pkg/process/errors" ) -// Process represents the execution of a function in a WASM module. -type Process struct { - function wazero_api.Function // entry function - id string // process id - io processIo - logger log.Logger - - exitWaiters []chan struct{} // list of channels waiting for process exit - releaseFuncs []capnp.ReleaseFunc // pending releases - runCancel context.CancelFunc // cancellation call for runContext - runContext context.Context // context for the process runtime - runDone chan error // channel containing result of run -} +var ( + ErrRunning = errors.New("running") + ErrNotStarted = errors.New("not started") +) -// addExitWaiter returns a channel that will produce a value when -// the proces exits. -func (p *Process) addExitWaiter() chan struct{} { - exit := make(chan struct{}, 1) - p.exitWaiters = append(p.exitWaiters, exit) - return exit -} +type Proc api.Process -// addRelease adds a release function to the pending releases. -func (p *Process) addRelease(release capnp.ReleaseFunc) { - p.releaseFuncs = append(p.releaseFuncs, release) +func (p Proc) AddRef() Proc { + return Proc(api.Process(p).AddRef()) } -// Close should always be called after the process is done. -func (p *Process) Close(ctx context.Context, call api.Process_close) error { - return p.close(ctx) -} +func (p Proc) Release() { + capnp.Client(p).Release() -// close performs any missing cleanup operation including potentially -// cancelling a running process, notifying all exitWaiters and calling -// pending release functions. -func (p *Process) close(ctx context.Context) error { - defer p.release() - defer p.exit(ctx) - defer p.runCancel() - return nil } -// exit will notify all exit waiters. -func (p *Process) exit(ctx context.Context) { - for _, exit := range p.exitWaiters { - select { - case exit <- struct{}{}: - continue - case <-ctx.Done(): - return - } - } -} +func (p Proc) Start(ctx context.Context) error { + f, release := api.Process(p).Start(ctx, nil) + defer release() -// Id of the process. -func (p *Process) Id() string { - return p.id + return casm.Future(f).Await(ctx) } -// Input returns the input stream of the process. -func (p *Process) Input(ctx context.Context, call api.Process_input) error { - results, err := call.AllocResults() - if err != nil { - return err - } - err = results.SetStdin(iostream_api.Stream(p.io.in)) - return err -} +func (p Proc) Stop(ctx context.Context) error { + f, release := api.Process(p).Stop(ctx, nil) + defer release() -// Output provides an stream with the process output and returns the -// contents of the process stderr after it finishes. -func (p *Process) Output(ctx context.Context, call api.Process_output) error { - var err error + return casm.Future(f).Await(ctx) +} - call.Go() - outputStream := call.Args().Stdout() - f, release := p.io.out.Provide(ctx, func(p iostream_api.Provider_provide_Params) error { - return p.SetStream(outputStream) - }) +func (p Proc) Wait(ctx context.Context) error { + f, release := api.Process(p).Wait(ctx, nil) defer release() - errorStream := call.Args().Stderr() - f, release = p.io.err.Provide(ctx, func(p iostream_api.Provider_provide_Params) error { - return p.SetStream(errorStream) - }) - defer release() + res, err := f.Struct() + if err != nil { + return err + } - exit := p.addExitWaiter() - select { - case <-ctx.Done(): - err = ctx.Err() - break - case <-exit: - case <-f.Done(): - break + e, err := res.Error() + if err != nil { + return err } - return err -} + switch e.Which() { + case api.Error_Which_none: + return nil -// run the process and write to p.runDone after it is done. -func (p *Process) run(ctx context.Context) { - var err error - - // send the signal after the process finishes running - defer func() { - select { - case p.runDone <- err: - break - case <-ctx.Done(): - p.runCancel() + case api.Error_Which_exitErr: + mod, err := e.ExitErr().Module() + if err != nil { + return err } - }() - - defer p.io.closeWriters(ctx) + return sys.NewExitError(mod, e.ExitErr().Code()) + } - _, err = p.function.Call(p.runContext) + return fmt.Errorf("unknown error type: %d", e.Which()) } -// release calls all pending release functions. -func (p *Process) release() { - for _, releaseFunc := range p.releaseFuncs { - defer releaseFunc() - } +// process is the main implementation of the Process capability. +type process struct { + fn wasm.Function + handle procHandle } // Stop calls the runtime cancellation function. -func (p *Process) Stop(ctx context.Context, call api.Process_stop) error { - p.runCancel() - return nil +func (p *process) Stop(context.Context, api.Process_stop) error { + state := p.handle.Load() + if state.Err == nil { + state.Cancel() + } + + return state.Err } // Start the process in the background. -func (p *Process) Start(ctx context.Context, call api.Process_start) error { - go p.run(ctx) +func (p *process) Start(_ context.Context, call api.Process_start) error { + state := p.handle.Load() + if state.Err != ErrNotStarted { + return state.Err + } + + p.handle.Exec(p.fn) return nil } // Wait for the process to finish running. -func (p *Process) Wait(ctx context.Context, call api.Process_wait) error { +func (p *process) Wait(ctx context.Context, call api.Process_wait) error { + state := p.handle.Load() + if state.Err == ErrNotStarted { + return state.Err + } + results, err := call.AllocResults() if err != nil { - err = p.wait(ctx) - if err == nil { - results.SetError(proc_errors.Nil.Error()) - } else { - results.SetError(err.Error()) - } + return err } - return err -} -// wait for the process to finish running. -func (p *Process) wait(ctx context.Context) error { - var err error + call.Go() + select { - case err = <-p.runDone: - break case <-ctx.Done(): - break + return ctx.Err() + case <-state.Ctx.Done(): + return p.handle.Bind(results) } - return err } -// moduleId retuns a shortened md5hash of the module -func moduleId(binary []byte) string { - hash := md5.Sum(binary) - return hex.EncodeToString(hash[:])[:6] +// procHandle encapsulates all the runtime state of a process. Its +// methods are safe for concurrent access. +type procHandle atomic.Pointer[state] + +// Exec sets the current state to ErrRunning, calls the function, and +// then sets the current state to the resulting error. +func (as *procHandle) Exec(fn wasm.Function) { + ctx, cancel := context.WithCancel(context.Background()) + + // set "running" state + (*atomic.Pointer[state])(as).Store(&state{ + Ctx: ctx, + Cancel: cancel, + Err: ErrRunning, + }) + + go func() { + defer cancel() + + // block until function call completes + _, err := fn.Call(ctx) + + // call entrypoint function & set "finished" state + (*atomic.Pointer[state])(as).Store(&state{ + Ctx: ctx, + Cancel: cancel, + Err: err, + }) + }() } -// processId returns a unique ID for a module -func processId(moduleId string, funcName string) string { - return fmt.Sprintf("%s:%s", moduleId, funcName) +// Bind the error from the entrypoint function to the results struct. +// Callers MUST NOT call Bind until the function has returned. +func (as *procHandle) Bind(res api.Process_wait_Results) error { + state := as.Load() + if state.Err == nil { + return nil + } + + e, err := res.NewError() + if err != nil { + return err + } + + ee := state.Err.(*sys.ExitError) + e.SetExitErr() + e.ExitErr().SetCode(ee.ExitCode()) + return e.ExitErr().SetModule(ee.ModuleName()) } -// randomId produces a 6 character random string -func randomId() string { - rand.Seed(time.Now().Unix()) - charset := "abcdefghijklmnopqrstuvwxyz" - length := 6 +// Load the current state atomically. The resulting resulting state +// defaults to ErrNotStarted. +func (as *procHandle) Load() state { + if s := (*atomic.Pointer[state])(as).Load(); s != nil { + return *s + } - id := make([]byte, length) - for i := 0; i < length; i++ { - id[i] = charset[rand.Intn(len(charset))] + return state{ + Cancel: func() {}, + Err: ErrNotStarted, } +} - return string(id) +type state struct { + Ctx context.Context + Cancel context.CancelFunc + Err error } diff --git a/pkg/process/testdata/main.go b/pkg/process/testdata/main.go new file mode 100644 index 00000000..50622ece --- /dev/null +++ b/pkg/process/testdata/main.go @@ -0,0 +1,14 @@ +package main + +import "os" + +/* + build with: tinygo build -o pkg/process/testdata/main.wasm -target=wasi -scheduler=none pkg/process/testdata/main.go +*/ + +//export run +func run() { + os.Exit(99) +} + +func main() {} diff --git a/pkg/process/testdata/main.wasm b/pkg/process/testdata/main.wasm new file mode 100755 index 00000000..457ee8b8 Binary files /dev/null and b/pkg/process/testdata/main.wasm differ diff --git a/pkg/process/testdata/radio.go b/pkg/process/testdata/radio.go deleted file mode 100644 index 3c56ef2e..00000000 --- a/pkg/process/testdata/radio.go +++ /dev/null @@ -1,42 +0,0 @@ -package main - -import ( - "io" - "os" -) - -var _ping = []byte("Ping\n") - -//export echo -func echo() { - io.Copy(io.MultiWriter(os.Stdout, os.Stderr), os.Stdin) -} - -//export echoErr -func echoErr() { - io.Copy(os.Stderr, os.Stdin) -} - -//export echoOut -func echoOut() { - io.Copy(os.Stdout, os.Stdin) -} - -//export ping -func ping() { - io.MultiWriter(os.Stdout, os.Stderr).Write(_ping) -} - -//export pingErr -func pingErr() { - os.Stdout.Write(_ping) -} - -//export pingOut -func pingOut() { - os.Stdout.Write(_ping) -} - -func main() { - -} diff --git a/pkg/process/testdata/radio.wasm b/pkg/process/testdata/radio.wasm deleted file mode 100644 index f6137462..00000000 Binary files a/pkg/process/testdata/radio.wasm and /dev/null differ diff --git a/pkg/server/config.go b/pkg/server/config.go index 79de789b..d41105b9 100644 --- a/pkg/server/config.go +++ b/pkg/server/config.go @@ -6,6 +6,8 @@ import ( "go.uber.org/fx" + "github.com/tetratelabs/wazero" + "github.com/tetratelabs/wazero/imports/wasi_snapshot_preview1" casm "github.com/wetware/casm/pkg" "github.com/wetware/casm/pkg/cluster" "github.com/wetware/casm/pkg/cluster/pulse" @@ -56,6 +58,27 @@ func (rc ClusterConfig) routingTable() cluster.RoutingTable { return rc.RoutingTable } +type RuntimeConfig struct { + fx.In + + Ctx context.Context `optional:"true"` + Config wazero.RuntimeConfig `optional:"true"` +} + +func (rc RuntimeConfig) New() wazero.Runtime { + if rc.Ctx == nil { + rc.Ctx = context.Background() + } + + if rc.Config == nil { + rc.Config = wazero.NewRuntimeConfig() + } + + r := wazero.NewRuntimeWithConfig(rc.Ctx, rc.Config) + wasi_snapshot_preview1.MustInstantiate(rc.Ctx, r) + return r +} + type DebugConfig struct { fx.In diff --git a/pkg/server/server.go b/pkg/server/server.go index c8e242de..b17d8da7 100644 --- a/pkg/server/server.go +++ b/pkg/server/server.go @@ -54,6 +54,7 @@ type Joiner struct { fx.In Cluster ClusterConfig `optional:"true"` + Runtime RuntimeConfig `optional:"true"` Debugger DebugConfig `optional:"true"` } @@ -70,7 +71,7 @@ func (j Joiner) Join(vat casm.Vat, r Router) (*Node, error) { PubSubProvider: j.pubsub(vat.Logger, r), AnchorProvider: j.anchor(), DebugProvider: j.Debugger.New(), - ExecutorProvider: j.executor(vat.Logger), + ExecutorProvider: j.executor(), }) return &Node{ @@ -90,6 +91,8 @@ func (j Joiner) anchor() anchor.Server { return anchor.Root() } -func (j Joiner) executor(logger log.Logger) process.Executor { - return process.NewExecutor(context.TODO(), logger) +func (j Joiner) executor() process.Server { + return process.Server{ + Runtime: j.Runtime.New(), + } }