Skip to content

Commit

Permalink
Merge branch 'main' of github.com:frumioj/go-capnproto2
Browse files Browse the repository at this point in the history
  • Loading branch information
frumioj committed Jan 28, 2022
2 parents 84a7de0 + 461296e commit 9c70849
Show file tree
Hide file tree
Showing 28 changed files with 2,338 additions and 649 deletions.
5 changes: 4 additions & 1 deletion .github/FUNDING.yml
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
github: zombiezen
github:
- lthibault
- zenhack
- zombiezen
161 changes: 95 additions & 66 deletions answer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"sync"

"capnproto.org/go/capnp/v3/internal/errors"
"capnproto.org/go/capnp/v3/internal/syncutil"
)

// A Promise holds the result of an RPC call. Only one of Fulfill,
Expand Down Expand Up @@ -50,10 +51,10 @@ type Promise struct {
// join state.
joined chan struct{}

// signals is the set of resolved channels to close on resolution.
// Has at least one element if the promise is unresolved or pending,
// nil if resolved or joined.
signals []chan<- struct{}
// signals is a list of callbacks to invoke on resolution. Has at least
// one element if the promise is unresolved or pending, nil if resolved
// or joined.
signals []func()

// caller is the hook to make pipelined calls with. Set to nil once
// the promise leaves the unresolved state.
Expand Down Expand Up @@ -104,11 +105,12 @@ func NewPromise(m Method, pc PipelineCaller) *Promise {
p := &Promise{
method: m,
resolved: resolved,
signals: []chan<- struct{}{resolved},
signals: []func(){func() { close(resolved) }},
caller: pc,
clientsRefs: 1,
}
p.ans.f.promise = p
p.ans.metadata = *NewMetadata()
return p
}

Expand Down Expand Up @@ -180,6 +182,18 @@ func (p *Promise) Reject(e error) {
p.resolve(Ptr{}, e)
}

// Resolve resolves the promise.
//
// If e != nil, then this is equivalent to p.Reject(e).
// Otherwise, it is equivalent to p.Fulfill(r).
func (p *Promise) Resolve(r Ptr, e error) {
if e != nil {
p.Reject(e)
} else {
p.Fulfill(r)
}
}

// resolve moves p into the resolved state from unresolved or pending
// join. The caller must be holding onto p.mu.
func (p *Promise) resolve(r Ptr, e error) {
Expand All @@ -193,19 +207,19 @@ func (p *Promise) resolve(r Ptr, e error) {
if p.ongoingCalls > 0 {
p.callsStopped = make(chan struct{})
}
p.mu.Unlock()
res := resolution{p.method, r, e}
for path, row := range p.clients {
t := path.transform()
for i := range row {
row[i].promise.Fulfill(res.client(t))
row[i].promise = nil
syncutil.Without(&p.mu, func() {
res := resolution{p.method, r, e}
for path, row := range p.clients {
t := path.transform()
for i := range row {
row[i].promise.Fulfill(res.client(t))
row[i].promise = nil
}
}
}
if p.callsStopped != nil {
<-p.callsStopped
}
p.mu.Lock()
if p.callsStopped != nil {
<-p.callsStopped
}
})
}

// Move p into resolved state.
Expand All @@ -216,8 +230,8 @@ func (p *Promise) resolve(r Ptr, e error) {
}
p.callsStopped = nil
p.result, p.err = r, e
for _, ch := range p.signals {
close(ch)
for _, f := range p.signals {
f()
}
p.signals = nil
}
Expand Down Expand Up @@ -247,24 +261,24 @@ traversal:
case parent.isPendingResolution():
// Wait for resolution. Next traversal iteration will be resolved.
r := parent.resolved
parent.mu.Unlock()
if p.joined == nil {
p.joined = make(chan struct{})
}
p.mu.Unlock()
<-r
p.mu.Lock()
parent.mu.Lock()
syncutil.Without(&parent.mu, func() {
if p.joined == nil {
p.joined = make(chan struct{})
}
syncutil.Without(&p.mu, func() {
<-r
})
})
case parent.isPendingJoin():
j := parent.joined
parent.mu.Unlock()
if p.joined == nil {
p.joined = make(chan struct{})
}
p.mu.Unlock()
<-j
p.mu.Lock()
parent.mu.Lock()
syncutil.Without(&parent.mu, func() {
if p.joined == nil {
p.joined = make(chan struct{})
}
syncutil.Without(&p.mu, func() {
<-j
})
})
case parent.isResolved():
r, e := parent.result, parent.err
parent.mu.Unlock()
Expand All @@ -284,9 +298,9 @@ traversal:
if p.joined == nil {
p.joined = make(chan struct{})
}
p.mu.Unlock()
<-p.callsStopped
p.mu.Lock()
syncutil.Without(&p.mu, func() {
<-p.callsStopped
})
p.callsStopped = nil
}
if p.joined != nil {
Expand Down Expand Up @@ -359,7 +373,8 @@ type PipelineCaller interface {
// An Answer is a deferred result of a client call. Conceptually, this is a
// future. It is safe to use from multiple goroutines.
type Answer struct {
f Future
f Future
metadata Metadata
}

// ErrorAnswer returns a Answer that always returns error e.
Expand All @@ -381,6 +396,7 @@ func ImmediateAnswer(m Method, s Struct) *Answer {
result: s.ToPtr(),
}
p.ans.f.promise = p
p.ans.metadata = *NewMetadata()
return &p.ans
}

Expand All @@ -389,6 +405,12 @@ func (ans *Answer) Future() *Future {
return &ans.f
}

// Metadata returns a metadata map where callers can store information
// about the answer
func (ans *Answer) Metadata() *Metadata {
return &ans.metadata
}

// Done returns a channel that is closed when the answer's call is finished.
func (ans *Answer) Done() <-chan struct{} {
return ans.f.Done()
Expand Down Expand Up @@ -445,12 +467,12 @@ traversal:
caller := p.caller
p.mu.Unlock()
ans, release := caller.PipelineSend(ctx, transform, s)
p.mu.Lock()
p.ongoingCalls--
if p.ongoingCalls == 0 && p.callsStopped != nil {
close(p.callsStopped)
}
p.mu.Unlock()
syncutil.With(&p.mu, func() {
p.ongoingCalls--
if p.ongoingCalls == 0 && p.callsStopped != nil {
close(p.callsStopped)
}
})
return ans, release
case p.isPendingResolution():
// Block new calls until resolved.
Expand Down Expand Up @@ -503,12 +525,12 @@ traversal:
caller := p.caller
p.mu.Unlock()
pcall := caller.PipelineRecv(ctx, transform, r)
p.mu.Lock()
p.ongoingCalls--
if p.ongoingCalls == 0 && p.callsStopped != nil {
close(p.callsStopped)
}
p.mu.Unlock()
syncutil.With(&p.mu, func() {
p.ongoingCalls--
if p.ongoingCalls == 0 && p.callsStopped != nil {
close(p.callsStopped)
}
})
return pcall
case p.isPendingResolution():
// Block new calls until resolved.
Expand Down Expand Up @@ -589,9 +611,9 @@ traversal:
switch {
case p.isPendingJoin():
j := p.joined
p.mu.Unlock()
<-j
p.mu.Lock()
syncutil.Without(&p.mu, func() {
<-j
})
case p.isJoined():
q := p.next
p.mu.Unlock()
Expand All @@ -608,7 +630,7 @@ traversal:
if row := p.clients[cpath]; len(row) > 0 {
return row[0].client
}
c, pr := NewPromisedClient(pipelineClient{
c, pr := NewPromisedClient(PipelineClient{
p: p,
transform: ft,
})
Expand All @@ -619,9 +641,9 @@ traversal:
p.mu.Unlock()
return c
case p.isPendingResolution():
p.mu.Unlock()
<-p.resolved
p.mu.Lock()
syncutil.Without(&p.mu, func() {
<-p.resolved
})
fallthrough
case p.isResolved():
r := p.resolution()
Expand All @@ -645,34 +667,41 @@ func (f *Future) Field(off uint16, def []byte) *Future {
}
}

// pipelineClient implements ClientHook by calling to the pipeline's answer.
type pipelineClient struct {
// PipelineClient implements ClientHook by calling to the pipeline's answer.
type PipelineClient struct {
p *Promise
transform []PipelineOp
}

func (pc pipelineClient) Send(ctx context.Context, s Send) (*Answer, ReleaseFunc) {
func (pc PipelineClient) Answer() *Answer {
return pc.p.Answer()
}

func (pc PipelineClient) Transform() []PipelineOp {
return pc.transform
}

func (pc PipelineClient) Send(ctx context.Context, s Send) (*Answer, ReleaseFunc) {
return pc.p.ans.PipelineSend(ctx, pc.transform, s)
}

func (pc pipelineClient) Recv(ctx context.Context, r Recv) PipelineCaller {
func (pc PipelineClient) Recv(ctx context.Context, r Recv) PipelineCaller {
return pc.p.ans.PipelineRecv(ctx, pc.transform, r)
}

func (pc pipelineClient) Brand() Brand {
func (pc PipelineClient) Brand() Brand {
select {
case <-pc.p.resolved:
pc.p.mu.Lock()
r := pc.p.resolution()
pc.p.mu.Unlock()
return r.client(pc.transform).State().Brand
default:
// TODO(someday): allow people to obtain the underlying answer.
return Brand{}
return Brand{Value: pc}
}
}

func (pc pipelineClient) Shutdown() {
func (pc PipelineClient) Shutdown() {
}

// A PipelineOp describes a step in transforming a pipeline.
Expand Down
3 changes: 2 additions & 1 deletion canonical.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ func canonicalPtr(dst *Segment, p Ptr) (Ptr, error) {
}
return ll.ToPtr(), nil
case interfacePtrType:
return Ptr{}, newError("cannot canonicalize interface")
iface := NewInterface(dst, p.Interface().Capability())
return iface.ToPtr(), nil
default:
panic("unreachable")
}
Expand Down
18 changes: 18 additions & 0 deletions canonical_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,24 @@ func TestCanonicalize(t *testing.T) {
t.Errorf("Canonicalize(two pointers to zero structs) =\n%s\n; want\n%s", hex.Dump(b), hex.Dump(want))
}
}
{
// pointer to interface
_, seg, _ := NewMessage(SingleSegment(nil))
s, _ := NewStruct(seg, ObjectSize{PointerCount: 2})
iface := NewInterface(seg, 1)
s.SetPtr(0, iface.ToPtr())
b, err := Canonicalize(s)
if err != nil {
t.Fatal("Canonicalize(pointer to interface):", err)
}
want := ([]byte{
0, 0, 0, 0, 0, 0, 1, 0,
3, 0, 0, 0, 1, 0, 0, 0,
})
if !bytes.Equal(b, want) {
t.Errorf("Canonicalize(pointer to interface) =\n%s\n; want\n%s", hex.Dump(b), hex.Dump(want))
}
}
{
// int list
_, seg, _ := NewMessage(SingleSegment(nil))
Expand Down
Loading

0 comments on commit 9c70849

Please sign in to comment.