From 4f75e2906bc543a98c36b78dac2fbb353c7325d2 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Sat, 21 Aug 2021 03:37:04 +0000 Subject: [PATCH 1/2] sdk/agent: make open/pay/close ops consistently async --- examples/console/main.go | 7 ++++ sdk/agent/agent.go | 79 ++++++++++++++++++++++++++-------------- 2 files changed, 59 insertions(+), 27 deletions(-) diff --git a/examples/console/main.go b/examples/console/main.go index 416eda25..9a5af3e2 100644 --- a/examples/console/main.go +++ b/examples/console/main.go @@ -153,6 +153,7 @@ func run() error { fmt.Fprintf(os.Stdout, "open - open a channel with asset\n") fmt.Fprintf(os.Stdout, "deposit - deposit asset into escrow account\n") fmt.Fprintf(os.Stdout, "pay - pay amount of asset to peer\n") + fmt.Fprintf(os.Stdout, "declareclose - declare to close the channel\n") fmt.Fprintf(os.Stdout, "close - close the channel\n") fmt.Fprintf(os.Stdout, "exit - exit the application\n") case "listen": @@ -179,6 +180,12 @@ func run() error { fmt.Fprintf(os.Stdout, "error: %v\n", err) continue } + case "declareclose": + err := agent.DeclareClose() + if err != nil { + fmt.Fprintf(os.Stdout, "error: %v\n", err) + continue + } case "close": err := agent.Close() if err != nil { diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 5d0a98f7..47cb9ca1 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -53,10 +53,6 @@ type Agent struct { channel *state.Channel conn io.ReadWriter - - // closeSignal is not nil if closing or closed, and the chan is closed once - // the payment channel is closed. - closeSignal chan struct{} } // Channel returns the channel the agent is managing. The channel will be nil if @@ -142,33 +138,36 @@ func (a *Agent) Payment(paymentAmount string) error { return nil } -// Close kicks off the close process by synchronously submitting a tx to the -// network to begin the close process, then synchronously coordinating with the -// remote participant to coordinate the close, then synchronously submitting the -// final close tx either after the observation period is waited out. -func (a *Agent) Close() error { +// DeclareClose kicks off the close process by synchronously submitting a tx to +// the network to begin the close process, then asynchronously coordinating with +// the remote participant to coordinate the close. If the participant responds +// the agent will automatically submit the final close tx that can be submitted +// immediately. If no closed notification occurs before the observation period, +// manually submit the close by calling Close. +func (a *Agent) DeclareClose() error { if a.conn == nil { return fmt.Errorf("not connected") } if a.channel == nil { return fmt.Errorf("no channel") } - a.closeSignal = make(chan struct{}) - // Submit declaration tx - declTx, closeTx, err := a.channel.CloseTxs() + + // Submit declaration tx. + declTx, _, err := a.channel.CloseTxs() if err != nil { return fmt.Errorf("building declaration tx: %w", err) } declHash, err := declTx.HashHex(a.NetworkPassphrase) if err != nil { - return fmt.Errorf("hashing close tx: %w", err) + return fmt.Errorf("hashing decl tx: %w", err) } - fmt.Fprintln(a.LogWriter, "submitting declaration", declHash) + fmt.Fprintln(a.LogWriter, "submitting declaration:", declHash) err = a.Submitter.SubmitTx(declTx) if err != nil { return fmt.Errorf("submitting declaration tx: %w", err) } - // Revising agreement to close early + + // Attempt revising the close agreement to close early. fmt.Fprintln(a.LogWriter, "proposing a revised close for immediate submission") ca, err := a.channel.ProposeClose() if err != nil { @@ -182,22 +181,31 @@ func (a *Agent) Close() error { if err != nil { return fmt.Errorf("error: sending the close proposal: %w\n", err) } + + return nil +} + +// Close closes the channel. The close must have been declared first either by +// calling DeclareClose or by the other participant. If the close fails it may +// be because the channel is already closed, or the participant has submitted +// the same close which is already queued but not yet processed, or the +// observation period has not yet passed since the close was declared. +func (a *Agent) Close() error { + _, closeTx, err := a.channel.CloseTxs() + if err != nil { + return fmt.Errorf("building close tx: %w", err) + } closeHash, err := closeTx.HashHex(a.NetworkPassphrase) if err != nil { return fmt.Errorf("hashing close tx: %w", err) } - fmt.Fprintln(a.LogWriter, "waiting observation period to submit delayed close tx", closeHash) - select { - case <-a.closeSignal: - fmt.Fprintln(a.LogWriter, "aborting sending delayed close tx", closeHash) - return nil - case <-time.After(a.ObservationPeriodTime): - } - fmt.Fprintln(a.LogWriter, "submitting delayed close tx", closeHash) + fmt.Fprintln(a.LogWriter, "submitting close tx:", closeHash) err = a.Submitter.SubmitTx(closeTx) if err != nil { - return fmt.Errorf("submitting declaration tx: %w", err) + fmt.Fprintln(a.LogWriter, "error submitting close tx:", closeHash, ",", err) + return fmt.Errorf("submitting close tx %s: %w", closeHash, err) } + fmt.Fprintln(a.LogWriter, "submitted close tx:", closeHash) return nil } @@ -369,6 +377,7 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("no channel") } + // Agree to the close and send it back to requesting participant. closeIn := *m.CloseRequest close, err := a.channel.ConfirmClose(closeIn) if err != nil { @@ -382,6 +391,22 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("encoding close to send back: %v\n", err) } fmt.Fprintln(a.LogWriter, "close ready") + + // Submit the close immediately since it is valid immediately. + _, closeTx, err := a.channel.CloseTxs() + if err != nil { + return fmt.Errorf("building close tx: %w", err) + } + hash, err := closeTx.HashHex(a.NetworkPassphrase) + if err != nil { + return fmt.Errorf("hashing close tx: %w", err) + } + fmt.Fprintln(a.LogWriter, "submitting close", hash) + err = a.Submitter.SubmitTx(closeTx) + if err != nil { + return fmt.Errorf("submitting close tx: %w", err) + } + fmt.Fprintln(a.LogWriter, "close successful") return nil } @@ -390,12 +415,15 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("no channel") } + // Store updated agreement from other participant. closeIn := *m.CloseResponse _, err := a.channel.ConfirmClose(closeIn) if err != nil { return fmt.Errorf("confirming close: %v\n", err) } fmt.Fprintln(a.LogWriter, "close ready") + + // Submit the close immediately since it is valid immediately. _, closeTx, err := a.channel.CloseTxs() if err != nil { return fmt.Errorf("building close tx: %w", err) @@ -410,8 +438,5 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("submitting close tx: %w", err) } fmt.Fprintln(a.LogWriter, "close successful") - if a.closeSignal != nil { - close(a.closeSignal) - } return nil } From a70276c01a07681cff190cb1e26f2cfeffe45f67 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Mon, 23 Aug 2021 18:01:00 +0000 Subject: [PATCH 2/2] Remove synchronously Co-authored-by: Alec Charbonneau <30449853+acharb@users.noreply.github.com> --- sdk/agent/agent.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 47cb9ca1..48dc197a 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -138,10 +138,10 @@ func (a *Agent) Payment(paymentAmount string) error { return nil } -// DeclareClose kicks off the close process by synchronously submitting a tx to -// the network to begin the close process, then asynchronously coordinating with -// the remote participant to coordinate the close. If the participant responds -// the agent will automatically submit the final close tx that can be submitted +// DeclareClose kicks off the close process by submitting a tx to the network to +// begin the close process, then asynchronously coordinating with the remote +// participant to coordinate the close. If the participant responds the agent +// will automatically submit the final close tx that can be submitted // immediately. If no closed notification occurs before the observation period, // manually submit the close by calling Close. func (a *Agent) DeclareClose() error {