Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Handle incoming receiverAnswer cap descriptors. #209

Merged
merged 24 commits into from
Jan 3, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
0bb07cc
Cleanup: Stop passing around a superfluous []ClientState
zenhack Dec 31, 2021
d6f577d
Add a Promise.Resolve helper method.
zenhack Dec 29, 2021
d3afa77
Attach a capnp.Promise to answer table entries.
zenhack Dec 29, 2021
f1e859d
WIP: handle receiverAnswer in recvCap.
zenhack Dec 29, 2021
f7fdaa7
Add some missing format specifiers.
zenhack Dec 29, 2021
94607e6
WIP test using receiverAnswer
zenhack Dec 29, 2021
c243a76
Fill in the rest of the test.
zenhack Dec 30, 2021
8accd1b
Remember to wrap the args in a struct.
zenhack Dec 30, 2021
79a966a
Get rid of unnecessary server.Policy.
zenhack Dec 30, 2021
e9af70b
Update comments re: my current understanding of wrt disembargos
zenhack Dec 30, 2021
b82c609
Fix wrong variable name in comments.
zenhack Dec 31, 2021
41a9e65
Support arbitrary client metadata, avoid linear seach in sendCap.
zenhack Jan 1, 2022
932cf51
Add wrappers around metadata access for export ids.
zenhack Jan 1, 2022
ad37c20
Merge branch 'client-metadata' into HEAD
zenhack Jan 1, 2022
8ff6ea0
Fix build error as a result of merge.
zenhack Jan 1, 2022
1c27e3b
Fuss with recvCap.
zenhack Jan 1, 2022
12e636d
recvCap: Return local = true for receiverAnswer
zenhack Jan 2, 2022
6cb59e8
Remember to lock metadata.
zenhack Jan 2, 2022
9565bc5
Simplify how we determine whether to embargo
zenhack Jan 2, 2022
2dd7336
Merge branch 'isLocalClient' into handle-receiverAnswer
zenhack Jan 2, 2022
20ca4d0
Add another receiverAnswer test.
zenhack Jan 2, 2022
0e79743
Merge remote-tracking branch 'origin/main' into handle-receiverAnswer
zenhack Jan 3, 2022
778d67a
Merge remote-tracking branch 'origin/main' into handle-receiverAnswer
zenhack Jan 3, 2022
e4e389e
Check for finish before ans.promise.Resolve
zenhack Jan 3, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions answer.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,18 @@ func (p *Promise) Reject(e error) {
p.resolve(Ptr{}, e)
}

// Resolve resolves the promise.
//
// If e != nil, then this is equivalent to p.Reject(e).
// Otherwise, it is equivalent to p.Fulfill(r).
func (p *Promise) Resolve(r Ptr, e error) {
if e != nil {
p.Reject(e)
} else {
p.Fulfill(r)
}
}

// resolve moves p into the resolved state from unresolved or pending
// join. The caller must be holding onto p.mu.
func (p *Promise) resolve(r Ptr, e error) {
Expand Down
576 changes: 288 additions & 288 deletions internal/aircraftlib/aircraft.capnp.go

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions internal/demo/books/books.capnp.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 27 additions & 3 deletions rpc/answer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ type answer struct {
// pcall is the PipelineCaller returned by RecvCall. It will be set
// to nil once results are ready.
pcall capnp.PipelineCaller
// promise is a promise wrapping pcall. It will be resolved, and
// subsequently set to nil, once the results are ready.
promise *capnp.Promise

// pcalls is added to for every pending RecvCall and subtracted from
// for every RecvCall return (delivery acknowledgement). This is used
Expand Down Expand Up @@ -109,10 +112,13 @@ func (c *Conn) newReturn(ctx context.Context) (rpccp.Return, func() error, capnp
// setPipelineCaller sets ans.pcall to pcall if the answer has not
// already returned. The caller MUST NOT be holding onto ans.c.mu
// or the sender lock.
func (ans *answer) setPipelineCaller(pcall capnp.PipelineCaller) {
//
// This also sets ans.promise to a new promise, wrapping pcall.
func (ans *answer) setPipelineCaller(m capnp.Method, pcall capnp.PipelineCaller) {
syncutil.With(&ans.c.mu, func() {
if ans.flags&resultsReady == 0 {
ans.pcall = pcall
ans.promise = capnp.NewPromise(m, pcall)
}
})
}
Expand Down Expand Up @@ -202,11 +208,12 @@ func (ans *answer) Return(e error) {
//
// The caller must be holding onto ans.c.mu and the sender lock.
// The result's capability table must have been extracted into
// ans.resultsCapTable before calling sendReturn. Only one of
// ans.resultCapTable before calling sendReturn. Only one of
// sendReturn or sendException should be called.
func (ans *answer) sendReturn() (releaseList, error) {
ans.pcall = nil
ans.flags |= resultsReady

var err error
ans.exportRefs, err = ans.c.fillPayloadCapTable(ans.results, ans.resultCapTable)
if err != nil {
Expand All @@ -218,6 +225,18 @@ func (ans *answer) sendReturn() (releaseList, error) {
case <-ans.c.bgctx.Done():
default:
fin := ans.flags&finishReceived != 0
if ans.promise != nil {
if fin {
// Can't use ans.result after a finish, but it's
// ok to return an error if the finish comes in
// before the return. Possible enhancement: use
// the cancel variant of return.
ans.promise.Reject(failedf("Received finish before return"))
} else {
ans.promise.Resolve(ans.results.Content())
}
ans.promise = nil
}
ans.c.mu.Unlock()
if err := ans.sendMsg(); err != nil {
ans.c.report(failedf("send return: %w", err))
Expand All @@ -244,13 +263,18 @@ func (ans *answer) sendReturn() (releaseList, error) {
//
// The caller must be holding onto ans.c.mu and the sender lock.
// The result's capability table must have been extracted into
// ans.resultsCapTable before calling sendException. Only one of
// ans.resultCapTable before calling sendException. Only one of
// sendReturn or sendException should be called.
func (ans *answer) sendException(e error) releaseList {
ans.err = e
ans.pcall = nil
ans.flags |= resultsReady

if ans.promise != nil {
ans.promise.Reject(e)
ans.promise = nil
}

select {
case <-ans.c.bgctx.Done():
default:
Expand Down
5 changes: 5 additions & 0 deletions rpc/internal/testcapnp/test.capnp
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,8 @@ interface PingPong {
interface StreamTest {
push @0 (data :Data) -> stream;
}

interface CapArgsTest {
call @0 (cap :Capability);
self @1 () -> (self :CapArgsTest);
}
Loading