Skip to content
This repository has been archived by the owner on Apr 23, 2024. It is now read-only.

sdk/agent: make open/pay/close ops consistently async #241

Merged
merged 3 commits into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions examples/console/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func run() error {
fmt.Fprintf(os.Stdout, "open - open a channel with asset\n")
fmt.Fprintf(os.Stdout, "deposit <amount> - deposit asset into escrow account\n")
fmt.Fprintf(os.Stdout, "pay <amount> - pay amount of asset to peer\n")
fmt.Fprintf(os.Stdout, "declareclose - declare to close the channel\n")
fmt.Fprintf(os.Stdout, "close - close the channel\n")
fmt.Fprintf(os.Stdout, "exit - exit the application\n")
case "listen":
Expand All @@ -179,6 +180,12 @@ func run() error {
fmt.Fprintf(os.Stdout, "error: %v\n", err)
continue
}
case "declareclose":
err := agent.DeclareClose()
if err != nil {
fmt.Fprintf(os.Stdout, "error: %v\n", err)
continue
}
case "close":
err := agent.Close()
if err != nil {
Expand Down
79 changes: 52 additions & 27 deletions sdk/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ type Agent struct {
channel *state.Channel

conn io.ReadWriter

// closeSignal is not nil if closing or closed, and the chan is closed once
// the payment channel is closed.
closeSignal chan struct{}
}

// Channel returns the channel the agent is managing. The channel will be nil if
Expand Down Expand Up @@ -142,33 +138,36 @@ func (a *Agent) Payment(paymentAmount string) error {
return nil
}

// Close kicks off the close process by synchronously submitting a tx to the
// network to begin the close process, then synchronously coordinating with the
// remote participant to coordinate the close, then synchronously submitting the
// final close tx either after the observation period is waited out.
func (a *Agent) Close() error {
// DeclareClose kicks off the close process by submitting a tx to the network to
// begin the close process, then asynchronously coordinating with the remote
// participant to coordinate the close. If the participant responds the agent
// will automatically submit the final close tx that can be submitted
// immediately. If no closed notification occurs before the observation period,
// manually submit the close by calling Close.
func (a *Agent) DeclareClose() error {
if a.conn == nil {
return fmt.Errorf("not connected")
}
if a.channel == nil {
return fmt.Errorf("no channel")
}
a.closeSignal = make(chan struct{})
// Submit declaration tx
declTx, closeTx, err := a.channel.CloseTxs()

// Submit declaration tx.
declTx, _, err := a.channel.CloseTxs()
if err != nil {
return fmt.Errorf("building declaration tx: %w", err)
}
declHash, err := declTx.HashHex(a.NetworkPassphrase)
if err != nil {
return fmt.Errorf("hashing close tx: %w", err)
return fmt.Errorf("hashing decl tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "submitting declaration", declHash)
fmt.Fprintln(a.LogWriter, "submitting declaration:", declHash)
err = a.Submitter.SubmitTx(declTx)
if err != nil {
return fmt.Errorf("submitting declaration tx: %w", err)
}
// Revising agreement to close early

// Attempt revising the close agreement to close early.
fmt.Fprintln(a.LogWriter, "proposing a revised close for immediate submission")
ca, err := a.channel.ProposeClose()
if err != nil {
Expand All @@ -182,22 +181,31 @@ func (a *Agent) Close() error {
if err != nil {
return fmt.Errorf("error: sending the close proposal: %w\n", err)
}

return nil
}

// Close closes the channel. The close must have been declared first either by
// calling DeclareClose or by the other participant. If the close fails it may
// be because the channel is already closed, or the participant has submitted
// the same close which is already queued but not yet processed, or the
// observation period has not yet passed since the close was declared.
func (a *Agent) Close() error {
_, closeTx, err := a.channel.CloseTxs()
if err != nil {
return fmt.Errorf("building close tx: %w", err)
}
closeHash, err := closeTx.HashHex(a.NetworkPassphrase)
if err != nil {
return fmt.Errorf("hashing close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "waiting observation period to submit delayed close tx", closeHash)
select {
case <-a.closeSignal:
fmt.Fprintln(a.LogWriter, "aborting sending delayed close tx", closeHash)
return nil
case <-time.After(a.ObservationPeriodTime):
}
fmt.Fprintln(a.LogWriter, "submitting delayed close tx", closeHash)
fmt.Fprintln(a.LogWriter, "submitting close tx:", closeHash)
err = a.Submitter.SubmitTx(closeTx)
if err != nil {
return fmt.Errorf("submitting declaration tx: %w", err)
fmt.Fprintln(a.LogWriter, "error submitting close tx:", closeHash, ",", err)
return fmt.Errorf("submitting close tx %s: %w", closeHash, err)
}
fmt.Fprintln(a.LogWriter, "submitted close tx:", closeHash)
return nil
}

Expand Down Expand Up @@ -369,6 +377,7 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error {
return fmt.Errorf("no channel")
}

// Agree to the close and send it back to requesting participant.
closeIn := *m.CloseRequest
close, err := a.channel.ConfirmClose(closeIn)
if err != nil {
Expand All @@ -382,6 +391,22 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error {
return fmt.Errorf("encoding close to send back: %v\n", err)
}
fmt.Fprintln(a.LogWriter, "close ready")

// Submit the close immediately since it is valid immediately.
_, closeTx, err := a.channel.CloseTxs()
if err != nil {
return fmt.Errorf("building close tx: %w", err)
}
hash, err := closeTx.HashHex(a.NetworkPassphrase)
if err != nil {
return fmt.Errorf("hashing close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "submitting close", hash)
err = a.Submitter.SubmitTx(closeTx)
if err != nil {
return fmt.Errorf("submitting close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "close successful")
return nil
}

Expand All @@ -390,12 +415,15 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error {
return fmt.Errorf("no channel")
}

// Store updated agreement from other participant.
closeIn := *m.CloseResponse
_, err := a.channel.ConfirmClose(closeIn)
if err != nil {
return fmt.Errorf("confirming close: %v\n", err)
}
fmt.Fprintln(a.LogWriter, "close ready")

// Submit the close immediately since it is valid immediately.
_, closeTx, err := a.channel.CloseTxs()
if err != nil {
return fmt.Errorf("building close tx: %w", err)
Expand All @@ -410,8 +438,5 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error {
return fmt.Errorf("submitting close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "close successful")
if a.closeSignal != nil {
close(a.closeSignal)
}
return nil
}