Skip to content
This repository has been archived by the owner on Apr 23, 2024. It is now read-only.

sdk/agent: make open/pay/close ops consistently async #241

Merged
merged 3 commits into from
Aug 23, 2021
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions examples/console/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,7 @@ func run() error {
fmt.Fprintf(os.Stdout, "open - open a channel with asset\n")
fmt.Fprintf(os.Stdout, "deposit <amount> - deposit asset into escrow account\n")
fmt.Fprintf(os.Stdout, "pay <amount> - pay amount of asset to peer\n")
fmt.Fprintf(os.Stdout, "declareclose - declare to close the channel\n")
fmt.Fprintf(os.Stdout, "close - close the channel\n")
fmt.Fprintf(os.Stdout, "exit - exit the application\n")
case "listen":
Expand All @@ -179,6 +180,12 @@ func run() error {
fmt.Fprintf(os.Stdout, "error: %v\n", err)
continue
}
case "declareclose":
err := agent.DeclareClose()
if err != nil {
fmt.Fprintf(os.Stdout, "error: %v\n", err)
continue
}
case "close":
err := agent.Close()
if err != nil {
Expand Down
79 changes: 52 additions & 27 deletions sdk/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,6 @@ type Agent struct {
channel *state.Channel

conn io.ReadWriter

// closeSignal is not nil if closing or closed, and the chan is closed once
// the payment channel is closed.
closeSignal chan struct{}
}

// Channel returns the channel the agent is managing. The channel will be nil if
Expand Down Expand Up @@ -142,33 +138,36 @@ func (a *Agent) Payment(paymentAmount string) error {
return nil
}

// Close kicks off the close process by synchronously submitting a tx to the
// network to begin the close process, then synchronously coordinating with the
// remote participant to coordinate the close, then synchronously submitting the
// final close tx either after the observation period is waited out.
func (a *Agent) Close() error {
// DeclareClose kicks off the close process by synchronously submitting a tx to
leighmcculloch marked this conversation as resolved.
Show resolved Hide resolved
// the network to begin the close process, then asynchronously coordinating with
// the remote participant to coordinate the close. If the participant responds
// the agent will automatically submit the final close tx that can be submitted
// immediately. If no closed notification occurs before the observation period,
// manually submit the close by calling Close.
func (a *Agent) DeclareClose() error {
if a.conn == nil {
return fmt.Errorf("not connected")
}
if a.channel == nil {
return fmt.Errorf("no channel")
}
a.closeSignal = make(chan struct{})
// Submit declaration tx
declTx, closeTx, err := a.channel.CloseTxs()

// Submit declaration tx.
declTx, _, err := a.channel.CloseTxs()
if err != nil {
return fmt.Errorf("building declaration tx: %w", err)
}
declHash, err := declTx.HashHex(a.NetworkPassphrase)
if err != nil {
return fmt.Errorf("hashing close tx: %w", err)
return fmt.Errorf("hashing decl tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "submitting declaration", declHash)
fmt.Fprintln(a.LogWriter, "submitting declaration:", declHash)
err = a.Submitter.SubmitTx(declTx)
if err != nil {
return fmt.Errorf("submitting declaration tx: %w", err)
}
// Revising agreement to close early

// Attempt revising the close agreement to close early.
fmt.Fprintln(a.LogWriter, "proposing a revised close for immediate submission")
ca, err := a.channel.ProposeClose()
if err != nil {
Expand All @@ -182,22 +181,31 @@ func (a *Agent) Close() error {
if err != nil {
return fmt.Errorf("error: sending the close proposal: %w\n", err)
}

return nil
}

// Close closes the channel. The close must have been declared first either by
// calling DeclareClose or by the other participant. If the close fails it may
// be because the channel is already closed, or the participant has submitted
// the same close which is already queued but not yet processed, or the
// observation period has not yet passed since the close was declared.
func (a *Agent) Close() error {
_, closeTx, err := a.channel.CloseTxs()
if err != nil {
return fmt.Errorf("building close tx: %w", err)
}
closeHash, err := closeTx.HashHex(a.NetworkPassphrase)
if err != nil {
return fmt.Errorf("hashing close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "waiting observation period to submit delayed close tx", closeHash)
select {
case <-a.closeSignal:
fmt.Fprintln(a.LogWriter, "aborting sending delayed close tx", closeHash)
return nil
case <-time.After(a.ObservationPeriodTime):
}
fmt.Fprintln(a.LogWriter, "submitting delayed close tx", closeHash)
fmt.Fprintln(a.LogWriter, "submitting close tx:", closeHash)
err = a.Submitter.SubmitTx(closeTx)
if err != nil {
return fmt.Errorf("submitting declaration tx: %w", err)
fmt.Fprintln(a.LogWriter, "error submitting close tx:", closeHash, ",", err)
return fmt.Errorf("submitting close tx %s: %w", closeHash, err)
}
fmt.Fprintln(a.LogWriter, "submitted close tx:", closeHash)
return nil
}

Expand Down Expand Up @@ -369,6 +377,7 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error {
return fmt.Errorf("no channel")
}

// Agree to the close and send it back to requesting participant.
closeIn := *m.CloseRequest
close, err := a.channel.ConfirmClose(closeIn)
if err != nil {
Expand All @@ -382,6 +391,22 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error {
return fmt.Errorf("encoding close to send back: %v\n", err)
}
fmt.Fprintln(a.LogWriter, "close ready")

// Submit the close immediately since it is valid immediately.
_, closeTx, err := a.channel.CloseTxs()
if err != nil {
return fmt.Errorf("building close tx: %w", err)
}
hash, err := closeTx.HashHex(a.NetworkPassphrase)
if err != nil {
return fmt.Errorf("hashing close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "submitting close", hash)
err = a.Submitter.SubmitTx(closeTx)
if err != nil {
return fmt.Errorf("submitting close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "close successful")
return nil
}

Expand All @@ -390,12 +415,15 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error {
return fmt.Errorf("no channel")
}

// Store updated agreement from other participant.
closeIn := *m.CloseResponse
_, err := a.channel.ConfirmClose(closeIn)
if err != nil {
return fmt.Errorf("confirming close: %v\n", err)
}
fmt.Fprintln(a.LogWriter, "close ready")

// Submit the close immediately since it is valid immediately.
_, closeTx, err := a.channel.CloseTxs()
if err != nil {
return fmt.Errorf("building close tx: %w", err)
Expand All @@ -410,8 +438,5 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error {
return fmt.Errorf("submitting close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "close successful")
if a.closeSignal != nil {
close(a.closeSignal)
}
return nil
}