From 4f75e2906bc543a98c36b78dac2fbb353c7325d2 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Sat, 21 Aug 2021 03:37:04 +0000 Subject: [PATCH 01/30] sdk/agent: make open/pay/close ops consistently async --- examples/console/main.go | 7 ++++ sdk/agent/agent.go | 79 ++++++++++++++++++++++++++-------------- 2 files changed, 59 insertions(+), 27 deletions(-) diff --git a/examples/console/main.go b/examples/console/main.go index 416eda25..9a5af3e2 100644 --- a/examples/console/main.go +++ b/examples/console/main.go @@ -153,6 +153,7 @@ func run() error { fmt.Fprintf(os.Stdout, "open - open a channel with asset\n") fmt.Fprintf(os.Stdout, "deposit - deposit asset into escrow account\n") fmt.Fprintf(os.Stdout, "pay - pay amount of asset to peer\n") + fmt.Fprintf(os.Stdout, "declareclose - declare to close the channel\n") fmt.Fprintf(os.Stdout, "close - close the channel\n") fmt.Fprintf(os.Stdout, "exit - exit the application\n") case "listen": @@ -179,6 +180,12 @@ func run() error { fmt.Fprintf(os.Stdout, "error: %v\n", err) continue } + case "declareclose": + err := agent.DeclareClose() + if err != nil { + fmt.Fprintf(os.Stdout, "error: %v\n", err) + continue + } case "close": err := agent.Close() if err != nil { diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 5d0a98f7..47cb9ca1 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -53,10 +53,6 @@ type Agent struct { channel *state.Channel conn io.ReadWriter - - // closeSignal is not nil if closing or closed, and the chan is closed once - // the payment channel is closed. - closeSignal chan struct{} } // Channel returns the channel the agent is managing. The channel will be nil if @@ -142,33 +138,36 @@ func (a *Agent) Payment(paymentAmount string) error { return nil } -// Close kicks off the close process by synchronously submitting a tx to the -// network to begin the close process, then synchronously coordinating with the -// remote participant to coordinate the close, then synchronously submitting the -// final close tx either after the observation period is waited out. -func (a *Agent) Close() error { +// DeclareClose kicks off the close process by synchronously submitting a tx to +// the network to begin the close process, then asynchronously coordinating with +// the remote participant to coordinate the close. If the participant responds +// the agent will automatically submit the final close tx that can be submitted +// immediately. If no closed notification occurs before the observation period, +// manually submit the close by calling Close. +func (a *Agent) DeclareClose() error { if a.conn == nil { return fmt.Errorf("not connected") } if a.channel == nil { return fmt.Errorf("no channel") } - a.closeSignal = make(chan struct{}) - // Submit declaration tx - declTx, closeTx, err := a.channel.CloseTxs() + + // Submit declaration tx. + declTx, _, err := a.channel.CloseTxs() if err != nil { return fmt.Errorf("building declaration tx: %w", err) } declHash, err := declTx.HashHex(a.NetworkPassphrase) if err != nil { - return fmt.Errorf("hashing close tx: %w", err) + return fmt.Errorf("hashing decl tx: %w", err) } - fmt.Fprintln(a.LogWriter, "submitting declaration", declHash) + fmt.Fprintln(a.LogWriter, "submitting declaration:", declHash) err = a.Submitter.SubmitTx(declTx) if err != nil { return fmt.Errorf("submitting declaration tx: %w", err) } - // Revising agreement to close early + + // Attempt revising the close agreement to close early. fmt.Fprintln(a.LogWriter, "proposing a revised close for immediate submission") ca, err := a.channel.ProposeClose() if err != nil { @@ -182,22 +181,31 @@ func (a *Agent) Close() error { if err != nil { return fmt.Errorf("error: sending the close proposal: %w\n", err) } + + return nil +} + +// Close closes the channel. The close must have been declared first either by +// calling DeclareClose or by the other participant. If the close fails it may +// be because the channel is already closed, or the participant has submitted +// the same close which is already queued but not yet processed, or the +// observation period has not yet passed since the close was declared. +func (a *Agent) Close() error { + _, closeTx, err := a.channel.CloseTxs() + if err != nil { + return fmt.Errorf("building close tx: %w", err) + } closeHash, err := closeTx.HashHex(a.NetworkPassphrase) if err != nil { return fmt.Errorf("hashing close tx: %w", err) } - fmt.Fprintln(a.LogWriter, "waiting observation period to submit delayed close tx", closeHash) - select { - case <-a.closeSignal: - fmt.Fprintln(a.LogWriter, "aborting sending delayed close tx", closeHash) - return nil - case <-time.After(a.ObservationPeriodTime): - } - fmt.Fprintln(a.LogWriter, "submitting delayed close tx", closeHash) + fmt.Fprintln(a.LogWriter, "submitting close tx:", closeHash) err = a.Submitter.SubmitTx(closeTx) if err != nil { - return fmt.Errorf("submitting declaration tx: %w", err) + fmt.Fprintln(a.LogWriter, "error submitting close tx:", closeHash, ",", err) + return fmt.Errorf("submitting close tx %s: %w", closeHash, err) } + fmt.Fprintln(a.LogWriter, "submitted close tx:", closeHash) return nil } @@ -369,6 +377,7 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("no channel") } + // Agree to the close and send it back to requesting participant. closeIn := *m.CloseRequest close, err := a.channel.ConfirmClose(closeIn) if err != nil { @@ -382,6 +391,22 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("encoding close to send back: %v\n", err) } fmt.Fprintln(a.LogWriter, "close ready") + + // Submit the close immediately since it is valid immediately. + _, closeTx, err := a.channel.CloseTxs() + if err != nil { + return fmt.Errorf("building close tx: %w", err) + } + hash, err := closeTx.HashHex(a.NetworkPassphrase) + if err != nil { + return fmt.Errorf("hashing close tx: %w", err) + } + fmt.Fprintln(a.LogWriter, "submitting close", hash) + err = a.Submitter.SubmitTx(closeTx) + if err != nil { + return fmt.Errorf("submitting close tx: %w", err) + } + fmt.Fprintln(a.LogWriter, "close successful") return nil } @@ -390,12 +415,15 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("no channel") } + // Store updated agreement from other participant. closeIn := *m.CloseResponse _, err := a.channel.ConfirmClose(closeIn) if err != nil { return fmt.Errorf("confirming close: %v\n", err) } fmt.Fprintln(a.LogWriter, "close ready") + + // Submit the close immediately since it is valid immediately. _, closeTx, err := a.channel.CloseTxs() if err != nil { return fmt.Errorf("building close tx: %w", err) @@ -410,8 +438,5 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("submitting close tx: %w", err) } fmt.Fprintln(a.LogWriter, "close successful") - if a.closeSignal != nil { - close(a.closeSignal) - } return nil } From 764dca2202c3b882f7b07980014d5ee92df6739b Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Sat, 21 Aug 2021 04:20:51 +0000 Subject: [PATCH 02/30] sdk/agent: add event handlers (wip) --- sdk/agent/agent.go | 50 ++++++++++++++++++++++++++++++++++++---------- 1 file changed, 39 insertions(+), 11 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 47cb9ca1..d6930504 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -50,6 +50,14 @@ type Agent struct { LogWriter io.Writer + HandleHelloError func(*Agent, error) + HandleOpenError func(*Agent, error) + HandleOpenConfirmed func(*Agent, *state.OpenAgreement) + HandlePaymentError func(*Agent, error) + HandlePaymentConfirmed func(*Agent, *state.CloseAgreement) + HandleCloseError func(*Agent, error) + HandleCloseConfirmed func(*Agent, *state.CloseAgreement) + channel *state.Channel conn io.ReadWriter @@ -233,22 +241,42 @@ func (a *Agent) handle(m msg.Message, send *msg.Encoder) error { if handler == nil { return fmt.Errorf("unrecognized message type %v", m.Type) } - err := handler(a, m, send) - if err != nil { - return fmt.Errorf("handling message type %v: %w", m.Type, err) - } + handler(a, m, send) return nil } var handlerMap = map[msg.Type]func(*Agent, msg.Message, *msg.Encoder) error{ - msg.TypeHello: (*Agent).handleHello, - msg.TypeOpenRequest: (*Agent).handleOpenRequest, - msg.TypeOpenResponse: (*Agent).handleOpenResponse, - msg.TypePaymentRequest: (*Agent).handlePaymentRequest, - msg.TypePaymentResponse: (*Agent).handlePaymentResponse, - msg.TypeCloseRequest: (*Agent).handleCloseRequest, - msg.TypeCloseResponse: (*Agent).handleCloseResponse, + msg.TypeHello: handleMessageAndErrAndSuccess((*Agent).handleHello, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), + msg.TypeOpenRequest: handleMessageAndErrAndSuccess((*Agent).handleOpenRequest, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), + msg.TypeOpenResponse: handleMessageAndErrAndSuccess((*Agent).handleOpenResponse, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), + msg.TypePaymentRequest: handleMessageAndErrAndSuccess((*Agent).handlePaymentRequest, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), + msg.TypePaymentResponse: handleMessageAndErrAndSuccess((*Agent).handlePaymentResponse, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), + msg.TypeCloseRequest: handleMessageAndErrAndSuccess((*Agent).handleCloseRequest, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), + msg.TypeCloseResponse: handleMessageAndErrAndSuccess((*Agent).handleCloseResponse, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), +} + +func handleMessageAndErrAndSuccess( + handleMessage func(*Agent, msg.Message, *msg.Encoder) error, + handleErr func(*Agent, error), + handleSuccess func(*Agent), +) func(*Agent, msg.Message, *msg.Encoder) error { + return func(a *Agent, m msg.Message, e *msg.Encoder) error { + err := handleMessage(a, m, e) + if err != nil { + handleErr(a, err) + return err + } + handleSuccess(a) + return nil + } } +func (a *Agent) handleHelloError func(*Agent, error) +func (a *Agent) handleOpenError func(*Agent, error) +func (a *Agent) handleOpenConfirmed func(*Agent, *state.OpenAgreement) +func (a *Agent) handlePaymentError func(*Agent, error) +func (a *Agent) handlePaymentConfirmed func(*Agent, *state.CloseAgreement) +func (a *Agent) handleCloseError func(*Agent, error) +func (a *Agent) handleCloseConfirmed func(*Agent, *state.CloseAgreement) func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { if a.channel != nil { From 442b88b455d99cd8d10c925065be12e1bd406ae8 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Mon, 23 Aug 2021 23:10:09 +0000 Subject: [PATCH 03/30] 1 --- sdk/agent/agent.go | 70 +++++++++++++++++++--------------------------- 1 file changed, 29 insertions(+), 41 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index d6930504..0edba524 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -50,13 +50,11 @@ type Agent struct { LogWriter io.Writer - HandleHelloError func(*Agent, error) - HandleOpenError func(*Agent, error) - HandleOpenConfirmed func(*Agent, *state.OpenAgreement) - HandlePaymentError func(*Agent, error) - HandlePaymentConfirmed func(*Agent, *state.CloseAgreement) - HandleCloseError func(*Agent, error) - HandleCloseConfirmed func(*Agent, *state.CloseAgreement) + OnError func(*Agent, error) + OnInitialized func(*Agent) + OnOpened func(*Agent) + OnPaymentConfirmed func(*Agent, *state.CloseAgreement) + OnCloseConfirmed func(*Agent, *state.CloseAgreement) channel *state.Channel @@ -146,10 +144,10 @@ func (a *Agent) Payment(paymentAmount string) error { return nil } -// DeclareClose kicks off the close process by synchronously submitting a tx to -// the network to begin the close process, then asynchronously coordinating with -// the remote participant to coordinate the close. If the participant responds -// the agent will automatically submit the final close tx that can be submitted +// DeclareClose kicks off the close process by submitting a tx to the network to +// begin the close process, then asynchronously coordinating with the remote +// participant to coordinate the close. If the participant responds the agent +// will automatically submit the final close tx that can be submitted // immediately. If no closed notification occurs before the observation period, // manually submit the close by calling Close. func (a *Agent) DeclareClose() error { @@ -241,42 +239,29 @@ func (a *Agent) handle(m msg.Message, send *msg.Encoder) error { if handler == nil { return fmt.Errorf("unrecognized message type %v", m.Type) } - handler(a, m, send) + err := handler(a, m, send) + if err != nil { + if a.OnError != nil { + a.OnError(a, err) + } + return fmt.Errorf("handling message %d: %w", m.Type, err) + } return nil } var handlerMap = map[msg.Type]func(*Agent, msg.Message, *msg.Encoder) error{ - msg.TypeHello: handleMessageAndErrAndSuccess((*Agent).handleHello, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), - msg.TypeOpenRequest: handleMessageAndErrAndSuccess((*Agent).handleOpenRequest, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), - msg.TypeOpenResponse: handleMessageAndErrAndSuccess((*Agent).handleOpenResponse, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), - msg.TypePaymentRequest: handleMessageAndErrAndSuccess((*Agent).handlePaymentRequest, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), - msg.TypePaymentResponse: handleMessageAndErrAndSuccess((*Agent).handlePaymentResponse, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), - msg.TypeCloseRequest: handleMessageAndErrAndSuccess((*Agent).handleCloseRequest, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), - msg.TypeCloseResponse: handleMessageAndErrAndSuccess((*Agent).handleCloseResponse, (*Agent).HandleHelloError, (*Agent).HandleCloseConfirmed), + msg.TypeHello: (*Agent).handleHello, + msg.TypeOpenRequest: (*Agent).handleOpenRequest, + msg.TypeOpenResponse: (*Agent).handleOpenResponse, + msg.TypePaymentRequest: (*Agent).handlePaymentRequest, + msg.TypePaymentResponse: (*Agent).handlePaymentResponse, + msg.TypeCloseRequest: (*Agent).handleCloseRequest, + msg.TypeCloseResponse: (*Agent).handleCloseResponse, } -func handleMessageAndErrAndSuccess( - handleMessage func(*Agent, msg.Message, *msg.Encoder) error, - handleErr func(*Agent, error), - handleSuccess func(*Agent), -) func(*Agent, msg.Message, *msg.Encoder) error { - return func(a *Agent, m msg.Message, e *msg.Encoder) error { - err := handleMessage(a, m, e) - if err != nil { - handleErr(a, err) - return err - } - handleSuccess(a) - return nil - } +func isInitiator(self, other *keypair.FromAddress) bool { + return self.Address() > other.Address() } -func (a *Agent) handleHelloError func(*Agent, error) -func (a *Agent) handleOpenError func(*Agent, error) -func (a *Agent) handleOpenConfirmed func(*Agent, *state.OpenAgreement) -func (a *Agent) handlePaymentError func(*Agent, error) -func (a *Agent) handlePaymentConfirmed func(*Agent, *state.CloseAgreement) -func (a *Agent) handleCloseError func(*Agent, error) -func (a *Agent) handleCloseConfirmed func(*Agent, *state.CloseAgreement) func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { if a.channel != nil { @@ -300,7 +285,7 @@ func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { a.channel = state.NewChannel(state.Config{ NetworkPassphrase: a.NetworkPassphrase, MaxOpenExpiry: a.MaxOpenExpiry, - Initiator: a.EscrowAccountKey.Address() > h.EscrowAccount.Address(), + Initiator: isInitiator(a.EscrowAccountKey, &h.EscrowAccount), LocalEscrowAccount: &state.EscrowAccount{ Address: a.EscrowAccountKey, SequenceNumber: escrowAccountSeqNum, @@ -312,6 +297,9 @@ func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { LocalSigner: a.EscrowAccountSigner, RemoteSigner: &h.Signer, }) + if a.OnInitialized != nil { + a.OnInitialized(a) + } return nil } From 392b170c4927a738e677bc40844f01ec40aa0ad9 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Mon, 23 Aug 2021 23:12:11 +0000 Subject: [PATCH 04/30] sdk/agent: move logic for determining initiator to a func --- sdk/agent/agent.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 48dc197a..48a15e2a 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -250,6 +250,10 @@ var handlerMap = map[msg.Type]func(*Agent, msg.Message, *msg.Encoder) error{ msg.TypeCloseResponse: (*Agent).handleCloseResponse, } +func isInitiator(self, other *keypair.FromAddress) bool { + return self.Address() > other.Address() +} + func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { if a.channel != nil { return fmt.Errorf("extra hello received when channel already setup") @@ -272,7 +276,7 @@ func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { a.channel = state.NewChannel(state.Config{ NetworkPassphrase: a.NetworkPassphrase, MaxOpenExpiry: a.MaxOpenExpiry, - Initiator: a.EscrowAccountKey.Address() > h.EscrowAccount.Address(), + Initiator: isInitiator(a.EscrowAccountKey, &h.EscrowAccount), LocalEscrowAccount: &state.EscrowAccount{ Address: a.EscrowAccountKey, SequenceNumber: escrowAccountSeqNum, From fe5db7e5d301f858527eb2f943ed33eb5ba2977b Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Mon, 23 Aug 2021 23:32:30 +0000 Subject: [PATCH 05/30] wip --- sdk/agent/agent.go | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 0edba524..4a659fd2 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -321,6 +321,15 @@ func (a *Agent) handleOpenRequest(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("encoding open to send back: %w", err) } + // TODO: Remove this trigger of the event handler from here once ingesting + // transactions is added and the event is triggered from there. Note that + // technically the channel isn't open at this point and triggering the event + // here is just a hold over until we can trigger it based on ingestion. + // Triggering here assumes that the other participant, the initiator, + // submits the transaction. + if a.OnOpened != nil { + a.OnOpened(a) + } return nil } @@ -343,6 +352,11 @@ func (a *Agent) handleOpenResponse(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("submitting formation tx: %w", err) } + // TODO: Move the triggering of this event handler to wherever we end up + // ingesting transactions, and trigger it after the channel becomes opened. + if a.OnOpened != nil { + a.OnOpened(a) + } return nil } From 1969226417159e09db89f32dd737674f5c523dc1 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Mon, 23 Aug 2021 23:33:07 +0000 Subject: [PATCH 06/30] sdk/agent: add todo to remove logic --- sdk/agent/agent.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 48dc197a..287d3c45 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -338,6 +338,8 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error { paymentIn := *m.PaymentRequest payment, err := a.channel.ConfirmPayment(paymentIn) if errors.Is(err, state.ErrUnderfunded) { + // TODO: Remove this logic once the agent is ingesting transactions and + // updating account balance that way. fmt.Fprintf(a.LogWriter, "remote is underfunded for this payment based on cached account balances, checking their escrow account...\n") var balance int64 balance, err = a.BalanceCollector.GetBalance(a.channel.RemoteEscrowAccount().Address, a.channel.OpenAgreement().Details.Asset) From 89a7d309cf334ac42a4f89f032f3d159c4e164dc Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 00:44:29 +0000 Subject: [PATCH 07/30] wip --- sdk/agent/agent.go | 23 ++++++++++++++++++----- 1 file changed, 18 insertions(+), 5 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index a9be6936..6e405ca3 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -50,11 +50,11 @@ type Agent struct { LogWriter io.Writer - OnError func(*Agent, error) - OnInitialized func(*Agent) - OnOpened func(*Agent) - OnPaymentConfirmed func(*Agent, *state.CloseAgreement) - OnCloseConfirmed func(*Agent, *state.CloseAgreement) + OnError func(*Agent, error) + OnInitialized func(*Agent) + OnOpened func(*Agent) + OnPayment func(*Agent, state.CloseAgreement) + OnClosed func(*Agent) channel *state.Channel @@ -387,6 +387,9 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("encoding payment to send back: %w", err) } + if a.OnPayment != nil { + a.OnPayment(a, payment) + } return nil } @@ -439,6 +442,11 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("submitting close tx: %w", err) } fmt.Fprintln(a.LogWriter, "close successful") + // TODO: Move the triggering of this event handler to wherever we end up + // ingesting transactions, and trigger it after the channel becomes closed. + if a.OnClosed != nil { + a.OnClosed(a) + } return nil } @@ -470,5 +478,10 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("submitting close tx: %w", err) } fmt.Fprintln(a.LogWriter, "close successful") + // TODO: Move the triggering of this event handler to wherever we end up + // ingesting transactions, and trigger it after the channel becomes closed. + if a.OnClosed != nil { + a.OnClosed(a) + } return nil } From 32a8da7968ff6985f0352a7b0d330305dd852532 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 06:00:58 +0000 Subject: [PATCH 08/30] wip --- sdk/agent/agent.go | 104 ++++++++++++++++++++++++++------------------- 1 file changed, 60 insertions(+), 44 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 6e405ca3..0cdf1032 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -50,15 +50,16 @@ type Agent struct { LogWriter io.Writer - OnError func(*Agent, error) - OnInitialized func(*Agent) - OnOpened func(*Agent) - OnPayment func(*Agent, state.CloseAgreement) - OnClosed func(*Agent) - - channel *state.Channel - - conn io.ReadWriter + OnError func(*Agent, error) + OnConnected func(*Agent) + OnOpened func(*Agent) + OnPayment func(*Agent, state.CloseAgreement) + OnClosed func(*Agent) + + channel *state.Channel + otherEscrowAccount *keypair.FromAddress + otherEscrowAccountSigner *keypair.FromAddress + conn io.ReadWriter } // Channel returns the channel the agent is managing. The channel will be nil if @@ -84,14 +85,53 @@ func (a *Agent) hello() error { return nil } +func isInitiator(self, other *keypair.FromAddress) bool { + return self.Address() > other.Address() +} + +func (a *Agent) initChannel() error { + if a.channel != nil { + return fmt.Errorf("channel already created") + } + escrowAccountSeqNum, err := a.SequenceNumberCollector.GetSequenceNumber(a.EscrowAccountKey) + if err != nil { + return err + } + otherEscrowAccountSeqNum, err := a.SequenceNumberCollector.GetSequenceNumber(a.otherEscrowAccount) + if err != nil { + return err + } + a.channel = state.NewChannel(state.Config{ + NetworkPassphrase: a.NetworkPassphrase, + MaxOpenExpiry: a.MaxOpenExpiry, + Initiator: isInitiator(a.EscrowAccountKey, a.otherEscrowAccount), + LocalEscrowAccount: &state.EscrowAccount{ + Address: a.EscrowAccountKey, + SequenceNumber: escrowAccountSeqNum, + }, + RemoteEscrowAccount: &state.EscrowAccount{ + Address: a.otherEscrowAccount, + SequenceNumber: otherEscrowAccountSeqNum, + }, + LocalSigner: a.EscrowAccountSigner, + RemoteSigner: a.otherEscrowAccountSigner, + }) + return nil +} + // Open kicks off the open process which will continue after the function // returns. func (a *Agent) Open() error { if a.conn == nil { return fmt.Errorf("not connected") } - if a.channel == nil { - return fmt.Errorf("no channel") + if a.channel != nil { + return fmt.Errorf("channel already exists") + } + + err := a.initChannel() + if err != nil { + return fmt.Errorf("init channel: %w", err) } open, err := a.channel.ProposeOpen(state.OpenParams{ ObservationPeriodTime: a.ObservationPeriodTime, @@ -259,10 +299,6 @@ var handlerMap = map[msg.Type]func(*Agent, msg.Message, *msg.Encoder) error{ msg.TypeCloseResponse: (*Agent).handleCloseResponse, } -func isInitiator(self, other *keypair.FromAddress) bool { - return self.Address() > other.Address() -} - func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { if a.channel != nil { return fmt.Errorf("extra hello received when channel already setup") @@ -272,40 +308,20 @@ func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { fmt.Fprintf(a.LogWriter, "other's signer: %v\n", h.Signer.Address()) fmt.Fprintf(a.LogWriter, "other's escrow account: %v\n", h.EscrowAccount.Address()) - escrowAccountSeqNum, err := a.SequenceNumberCollector.GetSequenceNumber(a.EscrowAccountKey) - if err != nil { - return err - } - otherEscrowAccountSeqNum, err := a.SequenceNumberCollector.GetSequenceNumber(&h.EscrowAccount) - if err != nil { - return err - } - fmt.Fprintf(a.LogWriter, "escrow account seq: %v\n", escrowAccountSeqNum) - fmt.Fprintf(a.LogWriter, "other's escrow account seq: %v\n", otherEscrowAccountSeqNum) - a.channel = state.NewChannel(state.Config{ - NetworkPassphrase: a.NetworkPassphrase, - MaxOpenExpiry: a.MaxOpenExpiry, - Initiator: isInitiator(a.EscrowAccountKey, &h.EscrowAccount), - LocalEscrowAccount: &state.EscrowAccount{ - Address: a.EscrowAccountKey, - SequenceNumber: escrowAccountSeqNum, - }, - RemoteEscrowAccount: &state.EscrowAccount{ - Address: &h.EscrowAccount, - SequenceNumber: otherEscrowAccountSeqNum, - }, - LocalSigner: a.EscrowAccountSigner, - RemoteSigner: &h.Signer, - }) - if a.OnInitialized != nil { - a.OnInitialized(a) + a.otherEscrowAccount = &h.EscrowAccount + a.otherEscrowAccountSigner = &h.Signer + + if a.OnConnected != nil { + a.OnConnected(a) } + return nil } func (a *Agent) handleOpenRequest(m msg.Message, send *msg.Encoder) error { - if a.channel == nil { - return fmt.Errorf("no channel") + err := a.initChannel() + if err != nil { + return fmt.Errorf("init channel: %w", err) } openIn := *m.OpenRequest From 4474a6349cc09ab135a0584e56f02c17b88eef99 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 06:06:56 +0000 Subject: [PATCH 09/30] sdk/agent: move channel creation into open process --- sdk/agent/agent.go | 91 +++++++++++++++++++++++++++------------------- 1 file changed, 53 insertions(+), 38 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 7eaedee4..a4508102 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -50,9 +50,10 @@ type Agent struct { LogWriter io.Writer - channel *state.Channel - - conn io.ReadWriter + conn io.ReadWriter + otherEscrowAccount *keypair.FromAddress + otherEscrowAccountSigner *keypair.FromAddress + channel *state.Channel } // Channel returns the channel the agent is managing. The channel will be nil if @@ -78,14 +79,52 @@ func (a *Agent) hello() error { return nil } +func isInitiator(self, other *keypair.FromAddress) bool { + return self.Address() > other.Address() +} + +func (a *Agent) initChannel() error { + if a.channel != nil { + return fmt.Errorf("channel already created") + } + escrowAccountSeqNum, err := a.SequenceNumberCollector.GetSequenceNumber(a.EscrowAccountKey) + if err != nil { + return err + } + otherEscrowAccountSeqNum, err := a.SequenceNumberCollector.GetSequenceNumber(a.otherEscrowAccount) + if err != nil { + return err + } + a.channel = state.NewChannel(state.Config{ + NetworkPassphrase: a.NetworkPassphrase, + MaxOpenExpiry: a.MaxOpenExpiry, + Initiator: isInitiator(a.EscrowAccountKey, a.otherEscrowAccount), + LocalEscrowAccount: &state.EscrowAccount{ + Address: a.EscrowAccountKey, + SequenceNumber: escrowAccountSeqNum, + }, + RemoteEscrowAccount: &state.EscrowAccount{ + Address: a.otherEscrowAccount, + SequenceNumber: otherEscrowAccountSeqNum, + }, + LocalSigner: a.EscrowAccountSigner, + RemoteSigner: a.otherEscrowAccountSigner, + }) + return nil +} + // Open kicks off the open process which will continue after the function // returns. func (a *Agent) Open() error { if a.conn == nil { return fmt.Errorf("not connected") } - if a.channel == nil { - return fmt.Errorf("no channel") + if a.channel != nil { + return fmt.Errorf("channel already exists") + } + err := a.initChannel() + if err != nil { + return fmt.Errorf("init channel: %w", err) } open, err := a.channel.ProposeOpen(state.OpenParams{ ObservationPeriodTime: a.ObservationPeriodTime, @@ -250,50 +289,26 @@ var handlerMap = map[msg.Type]func(*Agent, msg.Message, *msg.Encoder) error{ msg.TypeCloseResponse: (*Agent).handleCloseResponse, } -func isInitiator(self, other *keypair.FromAddress) bool { - return self.Address() > other.Address() -} - func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { if a.channel != nil { return fmt.Errorf("extra hello received when channel already setup") } - h := *m.Hello + h := m.Hello - fmt.Fprintf(a.LogWriter, "other's signer: %v\n", h.Signer.Address()) fmt.Fprintf(a.LogWriter, "other's escrow account: %v\n", h.EscrowAccount.Address()) - escrowAccountSeqNum, err := a.SequenceNumberCollector.GetSequenceNumber(a.EscrowAccountKey) - if err != nil { - return err - } - otherEscrowAccountSeqNum, err := a.SequenceNumberCollector.GetSequenceNumber(&h.EscrowAccount) - if err != nil { - return err - } - fmt.Fprintf(a.LogWriter, "escrow account seq: %v\n", escrowAccountSeqNum) - fmt.Fprintf(a.LogWriter, "other's escrow account seq: %v\n", otherEscrowAccountSeqNum) - a.channel = state.NewChannel(state.Config{ - NetworkPassphrase: a.NetworkPassphrase, - MaxOpenExpiry: a.MaxOpenExpiry, - Initiator: isInitiator(a.EscrowAccountKey, &h.EscrowAccount), - LocalEscrowAccount: &state.EscrowAccount{ - Address: a.EscrowAccountKey, - SequenceNumber: escrowAccountSeqNum, - }, - RemoteEscrowAccount: &state.EscrowAccount{ - Address: &h.EscrowAccount, - SequenceNumber: otherEscrowAccountSeqNum, - }, - LocalSigner: a.EscrowAccountSigner, - RemoteSigner: &h.Signer, - }) + a.otherEscrowAccount = &h.EscrowAccount + + fmt.Fprintf(a.LogWriter, "other's signer: %v\n", h.Signer.Address()) + a.otherEscrowAccountSigner = &h.Signer + return nil } func (a *Agent) handleOpenRequest(m msg.Message, send *msg.Encoder) error { - if a.channel == nil { - return fmt.Errorf("no channel") + err := a.initChannel() + if err != nil { + return fmt.Errorf("init channel: %w", err) } openIn := *m.OpenRequest From e50696b16b3b90d9002a0a02cda96385a7891783 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 17:05:31 +0000 Subject: [PATCH 10/30] make the initiator the open proposer --- sdk/agent/agent.go | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index a4508102..ff48f30b 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -79,11 +79,7 @@ func (a *Agent) hello() error { return nil } -func isInitiator(self, other *keypair.FromAddress) bool { - return self.Address() > other.Address() -} - -func (a *Agent) initChannel() error { +func (a *Agent) initChannel(initiator bool) error { if a.channel != nil { return fmt.Errorf("channel already created") } @@ -98,7 +94,7 @@ func (a *Agent) initChannel() error { a.channel = state.NewChannel(state.Config{ NetworkPassphrase: a.NetworkPassphrase, MaxOpenExpiry: a.MaxOpenExpiry, - Initiator: isInitiator(a.EscrowAccountKey, a.otherEscrowAccount), + Initiator: initiator, LocalEscrowAccount: &state.EscrowAccount{ Address: a.EscrowAccountKey, SequenceNumber: escrowAccountSeqNum, @@ -122,7 +118,7 @@ func (a *Agent) Open() error { if a.channel != nil { return fmt.Errorf("channel already exists") } - err := a.initChannel() + err := a.initChannel(true) if err != nil { return fmt.Errorf("init channel: %w", err) } @@ -306,7 +302,7 @@ func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { } func (a *Agent) handleOpenRequest(m msg.Message, send *msg.Encoder) error { - err := a.initChannel() + err := a.initChannel(false) if err != nil { return fmt.Errorf("init channel: %w", err) } From 521424944115da8058508dbb3e16a72483697b88 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 18:47:15 +0000 Subject: [PATCH 11/30] Reduce changes --- sdk/agent/agent.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index ac2e3e48..f3007946 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -56,10 +56,10 @@ type Agent struct { OnPayment func(*Agent, state.CloseAgreement) OnClosed func(*Agent) - channel *state.Channel + conn io.ReadWriter otherEscrowAccount *keypair.FromAddress otherEscrowAccountSigner *keypair.FromAddress - conn io.ReadWriter + channel *state.Channel } // Channel returns the channel the agent is managing. The channel will be nil if From 85d1a53e1b9f4876f96421fa71035a4d93f3b0b7 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 19:03:26 +0000 Subject: [PATCH 12/30] on error in more cases --- sdk/agent/agent.go | 24 ++++++++++++++++-------- 1 file changed, 16 insertions(+), 8 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index f3007946..d7689d5e 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -268,20 +268,28 @@ func (a *Agent) loop() { } } -func (a *Agent) handle(m msg.Message, send *msg.Encoder) error { +func (a *Agent) handle(m msg.Message, send *msg.Encoder) (err error) { fmt.Fprintf(a.LogWriter, "handling %v\n", m.Type) + + defer func() { + if err != nil && a.OnError != nil { + a.OnError(a, err) + } + }() + handler := handlerMap[m.Type] if handler == nil { - return fmt.Errorf("unrecognized message type %v", m.Type) + err = fmt.Errorf("unrecognized message type %v", m.Type) + return } - err := handler(a, m, send) + + err = handler(a, m, send) if err != nil { - if a.OnError != nil { - a.OnError(a, err) - } - return fmt.Errorf("handling message %d: %w", m.Type, err) + err = fmt.Errorf("handling message %d: %w", m.Type, err) + return } - return nil + + return } var handlerMap = map[msg.Type]func(*Agent, msg.Message, *msg.Encoder) error{ From 3430c58362a50118231d45b7cd6bf0860cf44a2a Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 19:38:00 +0000 Subject: [PATCH 13/30] simplify --- sdk/agent/agent.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index d7689d5e..7f6ba27b 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -268,28 +268,25 @@ func (a *Agent) loop() { } } -func (a *Agent) handle(m msg.Message, send *msg.Encoder) (err error) { +func (a *Agent) handle(m msg.Message, send *msg.Encoder) error { fmt.Fprintf(a.LogWriter, "handling %v\n", m.Type) - - defer func() { - if err != nil && a.OnError != nil { - a.OnError(a, err) - } - }() - handler := handlerMap[m.Type] if handler == nil { err = fmt.Errorf("unrecognized message type %v", m.Type) - return + if a.OnError != nil { + a.OnError(a, err) + } + return err } - err = handler(a, m, send) if err != nil { err = fmt.Errorf("handling message %d: %w", m.Type, err) - return + if a.OnError != nil { + a.OnError(a, err) + } + return err } - - return + return nil } var handlerMap = map[msg.Type]func(*Agent, msg.Message, *msg.Encoder) error{ From 487092fbbf45d806fd5d2c5fda556b94d23a737a Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 19:38:39 +0000 Subject: [PATCH 14/30] simplify --- sdk/agent/agent.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 7f6ba27b..a0e36d18 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -272,13 +272,13 @@ func (a *Agent) handle(m msg.Message, send *msg.Encoder) error { fmt.Fprintf(a.LogWriter, "handling %v\n", m.Type) handler := handlerMap[m.Type] if handler == nil { - err = fmt.Errorf("unrecognized message type %v", m.Type) + err := fmt.Errorf("unrecognized message type %v", m.Type) if a.OnError != nil { a.OnError(a, err) } return err } - err = handler(a, m, send) + err := handler(a, m, send) if err != nil { err = fmt.Errorf("handling message %d: %w", m.Type, err) if a.OnError != nil { From b1d02484604eb761bd706f8b8dc21af83fdb422a Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 19:39:35 +0000 Subject: [PATCH 15/30] consistent error message --- sdk/agent/agent.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index a0e36d18..1ec0cbf8 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -272,7 +272,7 @@ func (a *Agent) handle(m msg.Message, send *msg.Encoder) error { fmt.Fprintf(a.LogWriter, "handling %v\n", m.Type) handler := handlerMap[m.Type] if handler == nil { - err := fmt.Errorf("unrecognized message type %v", m.Type) + err := fmt.Errorf("handling message %d: unrecognized message type", m.Type) if a.OnError != nil { a.OnError(a, err) } From 2a3f943352659c703623ec4884d55fdb2f102c19 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 19:40:27 +0000 Subject: [PATCH 16/30] log error --- examples/console/main.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/console/main.go b/examples/console/main.go index 9a5af3e2..88446453 100644 --- a/examples/console/main.go +++ b/examples/console/main.go @@ -114,6 +114,9 @@ func run() error { EscrowAccountKey: escrowAccountKey.FromAddress(), EscrowAccountSigner: signerKey, LogWriter: os.Stderr, + OnError: func(a *agent.Agent, err error) { + fmt.Fprintf(os.Stderr, "error: %v\n", err) + }, } tx, err := txbuild.CreateEscrow(txbuild.CreateEscrowParams{ From 42329620b6affc9b798da1a9c92b49801c59738a Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 19:41:59 +0000 Subject: [PATCH 17/30] group --- sdk/agent/agent.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index ff48f30b..f2e2ff34 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -292,12 +292,12 @@ func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { h := m.Hello - fmt.Fprintf(a.LogWriter, "other's escrow account: %v\n", h.EscrowAccount.Address()) a.otherEscrowAccount = &h.EscrowAccount - - fmt.Fprintf(a.LogWriter, "other's signer: %v\n", h.Signer.Address()) a.otherEscrowAccountSigner = &h.Signer + fmt.Fprintf(a.LogWriter, "other's escrow account: %v\n", a.otherEscrowAccount.Address()) + fmt.Fprintf(a.LogWriter, "other's signer: %v\n", a.otherEscrowAccountSigner.Address()) + return nil } From 6ddb5670462bc1ef0088064194becb1330c8bd53 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 22:14:50 +0000 Subject: [PATCH 18/30] event handling --- examples/console/main.go | 22 +++++++++++++++++++++- sdk/agent/agent.go | 24 +++++++++++++++--------- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/examples/console/main.go b/examples/console/main.go index 88446453..23ab639d 100644 --- a/examples/console/main.go +++ b/examples/console/main.go @@ -10,6 +10,7 @@ import ( "github.com/stellar/experimental-payment-channels/sdk/agent" "github.com/stellar/experimental-payment-channels/sdk/horizon" + "github.com/stellar/experimental-payment-channels/sdk/state" "github.com/stellar/experimental-payment-channels/sdk/submit" "github.com/stellar/experimental-payment-channels/sdk/txbuild" "github.com/stellar/go/amount" @@ -115,7 +116,26 @@ func run() error { EscrowAccountSigner: signerKey, LogWriter: os.Stderr, OnError: func(a *agent.Agent, err error) { - fmt.Fprintf(os.Stderr, "error: %v\n", err) + fmt.Fprintf(os.Stderr, "agent error: %v\n", err) + }, + OnConnected: func(a *agent.Agent) { + fmt.Fprintf(os.Stderr, "agent connected\n") + }, + OnOpened: func(a *agent.Agent) { + fmt.Fprintf(os.Stderr, "agent channel opened\n") + }, + OnPaymentReceivedAndConfirmed: func(a *agent.Agent, ca state.CloseAgreement) { + fmt.Fprintf(os.Stderr, "agent channel received payment: iteration=%d balance=%d", ca.Details.IterationNumber, ca.Details.Balance) + }, + OnPaymentSentAndConfirmed: func(a *agent.Agent, ca state.CloseAgreement) { + fmt.Fprintf(os.Stderr, "agent channel sent payment and other participant confirmed: iteration=%d balance=%d", ca.Details.IterationNumber, ca.Details.Balance) + }, + // TODO: Add when ingestion is added to agent. + // OnClosing: func(a *agent.Agent) { + // fmt.Fprintf(os.Stderr, "agent channel closing\n") + // }, + OnClosed: func(a *agent.Agent) { + fmt.Fprintf(os.Stderr, "agent channel closed\n") }, } diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 40adc1d2..91de843f 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -50,11 +50,14 @@ type Agent struct { LogWriter io.Writer - OnError func(*Agent, error) - OnConnected func(*Agent) - OnOpened func(*Agent) - OnPayment func(*Agent, state.CloseAgreement) - OnClosed func(*Agent) + OnError func(*Agent, error) + OnConnected func(*Agent) + OnOpened func(*Agent) + OnPaymentReceivedAndConfirmed func(*Agent, state.CloseAgreement) + OnPaymentSentAndConfirmed func(*Agent, state.CloseAgreement) + // TODO: Add closing event when ingestion is implemented. + // OnClosing func(*Agent) + OnClosed func(*Agent) conn io.ReadWriter otherEscrowAccount *keypair.FromAddress @@ -400,13 +403,13 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("confirming payment: %w", err) } fmt.Fprintf(a.LogWriter, "payment authorized\n") + if a.OnPaymentReceivedAndConfirmed != nil { + defer a.OnPaymentReceivedAndConfirmed(a, payment) + } err = send.Encode(msg.Message{Type: msg.TypePaymentResponse, PaymentResponse: &payment}) if err != nil { return fmt.Errorf("encoding payment to send back: %w", err) } - if a.OnPayment != nil { - a.OnPayment(a, payment) - } return nil } @@ -416,11 +419,14 @@ func (a *Agent) handlePaymentResponse(m msg.Message, send *msg.Encoder) error { } paymentIn := *m.PaymentResponse - _, err := a.channel.ConfirmPayment(paymentIn) + payment, err := a.channel.ConfirmPayment(paymentIn) if err != nil { return fmt.Errorf("confirming payment: %w", err) } fmt.Fprintf(a.LogWriter, "payment authorized\n") + if a.OnPaymentSentAndConfirmed != nil { + a.OnPaymentSentAndConfirmed(a, payment) + } return nil } From d653992d4c213e7230fadf4ee69b097642a1eb26 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 22:24:11 +0000 Subject: [PATCH 19/30] remove defer --- sdk/agent/agent.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 91de843f..a409d742 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -403,10 +403,10 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("confirming payment: %w", err) } fmt.Fprintf(a.LogWriter, "payment authorized\n") + err = send.Encode(msg.Message{Type: msg.TypePaymentResponse, PaymentResponse: &payment}) if a.OnPaymentReceivedAndConfirmed != nil { - defer a.OnPaymentReceivedAndConfirmed(a, payment) + a.OnPaymentReceivedAndConfirmed(a, payment) } - err = send.Encode(msg.Message{Type: msg.TypePaymentResponse, PaymentResponse: &payment}) if err != nil { return fmt.Errorf("encoding payment to send back: %w", err) } From c514fab57b0065f68a8491a16e0c8c25a584e73a Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Tue, 24 Aug 2021 23:27:48 +0000 Subject: [PATCH 20/30] wip --- sdk/agent/agent_test.go | 135 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 sdk/agent/agent_test.go diff --git a/sdk/agent/agent_test.go b/sdk/agent/agent_test.go new file mode 100644 index 00000000..a988b275 --- /dev/null +++ b/sdk/agent/agent_test.go @@ -0,0 +1,135 @@ +package agent + +import ( + "strings" + "testing" + "time" + + "github.com/stellar/experimental-payment-channels/sdk/agent" + "github.com/stellar/experimental-payment-channels/sdk/state" + "github.com/stellar/experimental-payment-channels/sdk/txbuild" + "github.com/stellar/go/keypair" + "github.com/stellar/go/network" + "github.com/stellar/go/txnbuild" +) + +type sequenceNumberCollector func(accountID *keypair.FromAddress) (int64, error) + +func (f sequenceNumberCollector) GetSequenceNumber(accountID *keypair.FromAddress) (int64, error) { + return f(accountID) +} + +type balanceCollectorFunc func(accountID *keypair.FromAddress, asset state.Asset) (int64, error) + +func (f balanceCollectorFunc) GetBalance(accountID *keypair.FromAddress, asset state.Asset) (int64, error) { + return f(accountID, asset) +} + +type submitterFunc func(tx *txnbuild.Transaction) error + +func (f submitterFunc) SubmitTx(tx *txnbuild.Transaction) error { + return f(tx) +} + +func TestAgent_openPaymentClose(t *testing.T) { + localEscrow := keypair.MustRandom() + localSigner := keypair.MustRandom() + + remoteEscrow := keypair.MustRandom() + remoteSigner := keypair.MustRandom() + + // Fields that the test will write to when the agent triggers events or + // logs. + var ( + logs strings.Builder + submittedTx *txnbuild.Transaction + err error + connected, opened, closed bool + lastPaymentAgreement state.CloseAgreement + ) + + localAgent := &Agent{ + ObservationPeriodTime: 20 * time.Second, + ObservationPeriodLedgerGap: 1, + MaxOpenExpiry: 5 * time.Minute, + NetworkPassphrase: network.TestNetworkPassphrase, + SequenceNumberCollector: sequenceNumberCollector(func(accountID *keypair.FromAddress) (int64, error) { + return 1, nil + }), + BalanceCollector: balanceCollectorFunc(func(accountID *keypair.FromAddress, asset state.Asset) (int64, error) { + return 100_0000000, nil + }), + Submitter: submitterFunc(func(tx *txnbuild.Transaction) error { + submittedTx = tx + return nil + }), + EscrowAccountKey: localEscrow.FromAddress(), + EscrowAccountSigner: localSigner, + LogWriter: &logs, + OnError: func(a *agent.Agent, e error) { + err = e + }, + OnConnected: func(a *agent.Agent) { + connected = true + }, + OnOpened: func(a *agent.Agent) { + opened = true + }, + OnPaymentReceivedAndConfirmed: func(a *agent.Agent, ca state.CloseAgreement) { + lastPaymentAgreement = ca + }, + OnPaymentSentAndConfirmed: func(a *agent.Agent, ca state.CloseAgreement) { + lastPaymentAgreement = ca + }, + // TODO: Test when ingestion is added to agent. + // OnClosing: func(a *agent.Agent) { + // }, + OnClosed: func(a *agent.Agent) { + closed = true + }, + } + + remoteAgent := &Agent{ + ObservationPeriodTime: 20 * time.Second, + ObservationPeriodLedgerGap: 1, + MaxOpenExpiry: 5 * time.Minute, + NetworkPassphrase: network.TestNetworkPassphrase, + SequenceNumberCollector: sequenceNumberCollector(func(accountID *keypair.FromAddress) (int64, error) { + return 1, nil + }), + BalanceCollector: balanceCollectorFunc(func(accountID *keypair.FromAddress, asset state.Asset) (int64, error) { + return 100_0000000, nil + }), + Submitter: submitterFunc(func(tx *txnbuild.Transaction) error { + submittedTx = tx + return nil + }), + EscrowAccountKey: localEscrow.FromAddress(), + EscrowAccountSigner: localSigner, + LogWriter: &logs, + OnError: func(a *agent.Agent, e error) { + err = e + }, + OnConnected: func(a *agent.Agent) { + connected = true + }, + OnOpened: func(a *agent.Agent) { + opened = true + }, + OnPaymentReceivedAndConfirmed: func(a *agent.Agent, ca state.CloseAgreement) { + lastPaymentAgreement = ca + }, + OnPaymentSentAndConfirmed: func(a *agent.Agent, ca state.CloseAgreement) { + lastPaymentAgreement = ca + }, + // TODO: Test when ingestion is added to agent. + // OnClosing: func(a *agent.Agent) { + // }, + OnClosed: func(a *agent.Agent) { + closed = true + }, + } + + // TODO + +} From 7a03c0e8ed17d4752d539ac6be31cd1b0e1e4f6d Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Thu, 26 Aug 2021 00:23:16 +0000 Subject: [PATCH 21/30] wip --- sdk/agent/agent.go | 38 +++++++--- sdk/agent/agent_test.go | 164 ++++++++++++++++++++++++++++------------ sdk/agent/tcp.go | 4 +- 3 files changed, 145 insertions(+), 61 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index a409d742..0a9339d6 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -168,7 +168,17 @@ func (a *Agent) Payment(paymentAmount string) error { return fmt.Errorf("parsing amount %s: %w", paymentAmount, err) } ca, err := a.channel.ProposePayment(amountValue) - if err != nil { + if errors.Is(err, state.ErrUnderfunded) { + // TODO: Remove this logic once the agent is ingesting transactions and + // updating account balance that way. + fmt.Fprintf(a.LogWriter, "local is underfunded for this payment based on cached account balances, checking escrow account...\n") + var balance int64 + balance, err = a.BalanceCollector.GetBalance(a.channel.LocalEscrowAccount().Address, a.channel.OpenAgreement().Details.Asset) + if err != nil { + return err + } + a.channel.UpdateLocalEscrowAccountBalance(balance) + } else if err != nil { return fmt.Errorf("proposing payment %d: %w", amountValue, err) } enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) @@ -253,20 +263,26 @@ func (a *Agent) Close() error { return nil } -func (a *Agent) loop() { - var err error +func (a *Agent) receive() error { recv := msg.NewDecoder(io.TeeReader(a.conn, a.LogWriter)) send := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) + m := msg.Message{} + err := recv.Decode(&m) + if err != nil { + return fmt.Errorf("reading and decoding: %v\n", err) + } + err = a.handle(m, send) + if err != nil { + return fmt.Errorf("handling message: %v\n", err) + } + return nil +} + +func (a *Agent) receiveLoop() { for { - m := msg.Message{} - err = recv.Decode(&m) - if err != nil { - fmt.Fprintf(a.LogWriter, "error reading: %v\n", err) - break - } - err = a.handle(m, send) + err := a.receive() if err != nil { - fmt.Fprintf(a.LogWriter, "error handling message: %v\n", err) + fmt.Fprintf(a.LogWriter, "error receiving: %v\n", err) } } } diff --git a/sdk/agent/agent_test.go b/sdk/agent/agent_test.go index a988b275..7d1e0a7e 100644 --- a/sdk/agent/agent_test.go +++ b/sdk/agent/agent_test.go @@ -1,16 +1,18 @@ package agent import ( + "bytes" + "io" "strings" "testing" "time" - "github.com/stellar/experimental-payment-channels/sdk/agent" "github.com/stellar/experimental-payment-channels/sdk/state" - "github.com/stellar/experimental-payment-channels/sdk/txbuild" "github.com/stellar/go/keypair" "github.com/stellar/go/network" "github.com/stellar/go/txnbuild" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) type sequenceNumberCollector func(accountID *keypair.FromAddress) (int64, error) @@ -34,20 +36,19 @@ func (f submitterFunc) SubmitTx(tx *txnbuild.Transaction) error { func TestAgent_openPaymentClose(t *testing.T) { localEscrow := keypair.MustRandom() localSigner := keypair.MustRandom() - remoteEscrow := keypair.MustRandom() remoteSigner := keypair.MustRandom() - // Fields that the test will write to when the agent triggers events or - // logs. - var ( - logs strings.Builder - submittedTx *txnbuild.Transaction - err error - connected, opened, closed bool - lastPaymentAgreement state.CloseAgreement - ) - + // Setup the local agent. + localVars := struct { + logs strings.Builder + submittedTx *txnbuild.Transaction + err error + connected bool + opened bool + closed bool + lastPaymentAgreement state.CloseAgreement + }{} localAgent := &Agent{ ObservationPeriodTime: 20 * time.Second, ObservationPeriodLedgerGap: 1, @@ -60,35 +61,45 @@ func TestAgent_openPaymentClose(t *testing.T) { return 100_0000000, nil }), Submitter: submitterFunc(func(tx *txnbuild.Transaction) error { - submittedTx = tx + localVars.submittedTx = tx return nil }), EscrowAccountKey: localEscrow.FromAddress(), EscrowAccountSigner: localSigner, - LogWriter: &logs, - OnError: func(a *agent.Agent, e error) { - err = e + LogWriter: &localVars.logs, + OnError: func(a *Agent, err error) { + localVars.err = err }, - OnConnected: func(a *agent.Agent) { - connected = true + OnConnected: func(a *Agent) { + localVars.connected = true }, - OnOpened: func(a *agent.Agent) { - opened = true + OnOpened: func(a *Agent) { + localVars.opened = true }, - OnPaymentReceivedAndConfirmed: func(a *agent.Agent, ca state.CloseAgreement) { - lastPaymentAgreement = ca + OnPaymentReceivedAndConfirmed: func(a *Agent, ca state.CloseAgreement) { + localVars.lastPaymentAgreement = ca }, - OnPaymentSentAndConfirmed: func(a *agent.Agent, ca state.CloseAgreement) { - lastPaymentAgreement = ca + OnPaymentSentAndConfirmed: func(a *Agent, ca state.CloseAgreement) { + localVars.lastPaymentAgreement = ca }, - // TODO: Test when ingestion is added to agent. - // OnClosing: func(a *agent.Agent) { + // TODO: Test when ingestion is added to + // OnClosing: func(a *Agent) { // }, - OnClosed: func(a *agent.Agent) { - closed = true + OnClosed: func(a *Agent) { + localVars.closed = true }, } + // Setup the remote agent. + remoteVars := struct { + logs strings.Builder + submittedTx *txnbuild.Transaction + err error + connected bool + opened bool + closed bool + lastPaymentAgreement state.CloseAgreement + }{} remoteAgent := &Agent{ ObservationPeriodTime: 20 * time.Second, ObservationPeriodLedgerGap: 1, @@ -101,35 +112,92 @@ func TestAgent_openPaymentClose(t *testing.T) { return 100_0000000, nil }), Submitter: submitterFunc(func(tx *txnbuild.Transaction) error { - submittedTx = tx + remoteVars.submittedTx = tx return nil }), - EscrowAccountKey: localEscrow.FromAddress(), - EscrowAccountSigner: localSigner, - LogWriter: &logs, - OnError: func(a *agent.Agent, e error) { - err = e + EscrowAccountKey: remoteEscrow.FromAddress(), + EscrowAccountSigner: remoteSigner, + LogWriter: &remoteVars.logs, + OnError: func(a *Agent, err error) { + remoteVars.err = err }, - OnConnected: func(a *agent.Agent) { - connected = true + OnConnected: func(a *Agent) { + remoteVars.connected = true }, - OnOpened: func(a *agent.Agent) { - opened = true + OnOpened: func(a *Agent) { + remoteVars.opened = true }, - OnPaymentReceivedAndConfirmed: func(a *agent.Agent, ca state.CloseAgreement) { - lastPaymentAgreement = ca + OnPaymentReceivedAndConfirmed: func(a *Agent, ca state.CloseAgreement) { + remoteVars.lastPaymentAgreement = ca }, - OnPaymentSentAndConfirmed: func(a *agent.Agent, ca state.CloseAgreement) { - lastPaymentAgreement = ca + OnPaymentSentAndConfirmed: func(a *Agent, ca state.CloseAgreement) { + remoteVars.lastPaymentAgreement = ca }, - // TODO: Test when ingestion is added to agent. - // OnClosing: func(a *agent.Agent) { + // TODO: Test when ingestion is added to + // OnClosing: func(a *Agent) { // }, - OnClosed: func(a *agent.Agent) { - closed = true + OnClosed: func(a *Agent) { + remoteVars.closed = true }, } - // TODO + type ReadWriter struct { + io.Reader + io.Writer + } + + // Connect the two agents. + localMsgs := bytes.Buffer{} + remoteMsgs := bytes.Buffer{} + localAgent.conn = ReadWriter{ + Reader: &remoteMsgs, + Writer: &localMsgs, + } + remoteAgent.conn = ReadWriter{ + Reader: &localMsgs, + Writer: &remoteMsgs, + } + err := localAgent.hello() + require.NoError(t, err) + err = remoteAgent.receive() + require.NoError(t, err) + err = remoteAgent.hello() + require.NoError(t, err) + err = localAgent.receive() + require.NoError(t, err) + + // Open the channel. + err = localAgent.Open() + require.NoError(t, err) + err = remoteAgent.receive() + require.NoError(t, err) + err = localAgent.receive() + require.NoError(t, err) + + // Make a payment. + err = localAgent.Payment("50.0") + require.NoError(t, err) + err = remoteAgent.receive() + require.NoError(t, err) + err = localAgent.receive() + require.NoError(t, err) + + // Make another payment. + err = remoteAgent.Payment("20.0") + require.NoError(t, err) + err = localAgent.receive() + require.NoError(t, err) + err = remoteAgent.receive() + require.NoError(t, err) + + // Close. + err = localAgent.DeclareClose() + require.NoError(t, err) + err = remoteAgent.receive() + require.NoError(t, err) + err = localAgent.receive() + require.NoError(t, err) + assert.NotNil(t, localVars.submittedTx) + assert.NotNil(t, remoteVars.submittedTx) } diff --git a/sdk/agent/tcp.go b/sdk/agent/tcp.go index 34ed07bf..73daf66f 100644 --- a/sdk/agent/tcp.go +++ b/sdk/agent/tcp.go @@ -23,7 +23,7 @@ func (a *Agent) ServeTCP(addr string) error { if err != nil { return fmt.Errorf("sending hello: %w", err) } - go a.loop() + go a.receiveLoop() return nil } @@ -42,6 +42,6 @@ func (a *Agent) ConnectTCP(addr string) error { if err != nil { return fmt.Errorf("sending hello: %w", err) } - go a.loop() + go a.receiveLoop() return nil } From dd5548217c763254af71b83204ef86204a18ec56 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Thu, 26 Aug 2021 04:13:51 +0000 Subject: [PATCH 22/30] remove todos --- sdk/agent/agent.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 0a9339d6..3c5bd3ea 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -169,8 +169,6 @@ func (a *Agent) Payment(paymentAmount string) error { } ca, err := a.channel.ProposePayment(amountValue) if errors.Is(err, state.ErrUnderfunded) { - // TODO: Remove this logic once the agent is ingesting transactions and - // updating account balance that way. fmt.Fprintf(a.LogWriter, "local is underfunded for this payment based on cached account balances, checking escrow account...\n") var balance int64 balance, err = a.BalanceCollector.GetBalance(a.channel.LocalEscrowAccount().Address, a.channel.OpenAgreement().Details.Asset) @@ -404,8 +402,6 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error { paymentIn := *m.PaymentRequest payment, err := a.channel.ConfirmPayment(paymentIn) if errors.Is(err, state.ErrUnderfunded) { - // TODO: Remove this logic once the agent is ingesting transactions and - // updating account balance that way. fmt.Fprintf(a.LogWriter, "remote is underfunded for this payment based on cached account balances, checking their escrow account...\n") var balance int64 balance, err = a.BalanceCollector.GetBalance(a.channel.RemoteEscrowAccount().Address, a.channel.OpenAgreement().Details.Asset) From 9388e31c67cf95f5a141d420cc7c7c2e1a5c3eab Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Thu, 26 Aug 2021 06:15:01 +0000 Subject: [PATCH 23/30] test passing --- sdk/agent/agent.go | 4 ++- sdk/agent/agent_test.go | 61 ++++++++++++++++++++++++++++++++++------- 2 files changed, 54 insertions(+), 11 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 3c5bd3ea..1d163622 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -176,7 +176,9 @@ func (a *Agent) Payment(paymentAmount string) error { return err } a.channel.UpdateLocalEscrowAccountBalance(balance) - } else if err != nil { + ca, err = a.channel.ProposePayment(amountValue) + } + if err != nil { return fmt.Errorf("proposing payment %d: %w", amountValue, err) } enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) diff --git a/sdk/agent/agent_test.go b/sdk/agent/agent_test.go index 7d1e0a7e..d6140038 100644 --- a/sdk/agent/agent_test.go +++ b/sdk/agent/agent_test.go @@ -3,7 +3,6 @@ package agent import ( "bytes" "io" - "strings" "testing" "time" @@ -41,7 +40,6 @@ func TestAgent_openPaymentClose(t *testing.T) { // Setup the local agent. localVars := struct { - logs strings.Builder submittedTx *txnbuild.Transaction err error connected bool @@ -66,7 +64,6 @@ func TestAgent_openPaymentClose(t *testing.T) { }), EscrowAccountKey: localEscrow.FromAddress(), EscrowAccountSigner: localSigner, - LogWriter: &localVars.logs, OnError: func(a *Agent, err error) { localVars.err = err }, @@ -92,7 +89,6 @@ func TestAgent_openPaymentClose(t *testing.T) { // Setup the remote agent. remoteVars := struct { - logs strings.Builder submittedTx *txnbuild.Transaction err error connected bool @@ -117,7 +113,6 @@ func TestAgent_openPaymentClose(t *testing.T) { }), EscrowAccountKey: remoteEscrow.FromAddress(), EscrowAccountSigner: remoteSigner, - LogWriter: &remoteVars.logs, OnError: func(a *Agent, err error) { remoteVars.err = err }, @@ -141,12 +136,11 @@ func TestAgent_openPaymentClose(t *testing.T) { }, } + // Connect the two agents. type ReadWriter struct { io.Reader io.Writer } - - // Connect the two agents. localMsgs := bytes.Buffer{} remoteMsgs := bytes.Buffer{} localAgent.conn = ReadWriter{ @@ -166,6 +160,10 @@ func TestAgent_openPaymentClose(t *testing.T) { err = localAgent.receive() require.NoError(t, err) + // Expect connected event. + assert.True(t, localVars.connected) + assert.True(t, remoteVars.connected) + // Open the channel. err = localAgent.Open() require.NoError(t, err) @@ -174,6 +172,16 @@ func TestAgent_openPaymentClose(t *testing.T) { err = localAgent.receive() require.NoError(t, err) + // Expect opened event. + assert.True(t, localVars.opened) + assert.True(t, remoteVars.opened) + + // Expect the open tx to have been submitted. + openTx, err := localAgent.channel.OpenTx() + require.NoError(t, err) + assert.Equal(t, openTx, localVars.submittedTx) + localVars.submittedTx = nil + // Make a payment. err = localAgent.Payment("50.0") require.NoError(t, err) @@ -182,6 +190,12 @@ func TestAgent_openPaymentClose(t *testing.T) { err = localAgent.receive() require.NoError(t, err) + // Expect payment events. + assert.Equal(t, int64(2), localVars.lastPaymentAgreement.Details.IterationNumber) + assert.Equal(t, int64(50_0000000), localVars.lastPaymentAgreement.Details.Balance) + assert.Equal(t, int64(2), remoteVars.lastPaymentAgreement.Details.IterationNumber) + assert.Equal(t, int64(50_0000000), remoteVars.lastPaymentAgreement.Details.Balance) + // Make another payment. err = remoteAgent.Payment("20.0") require.NoError(t, err) @@ -190,14 +204,41 @@ func TestAgent_openPaymentClose(t *testing.T) { err = remoteAgent.receive() require.NoError(t, err) - // Close. + // Expect payment events. + assert.Equal(t, int64(3), localVars.lastPaymentAgreement.Details.IterationNumber) + assert.Equal(t, int64(30_0000000), localVars.lastPaymentAgreement.Details.Balance) + assert.Equal(t, int64(3), remoteVars.lastPaymentAgreement.Details.IterationNumber) + assert.Equal(t, int64(30_0000000), remoteVars.lastPaymentAgreement.Details.Balance) + + // Expect no txs to have been submitted for payments. + assert.Nil(t, localVars.submittedTx) + assert.Nil(t, remoteVars.submittedTx) + + // Declare the close, and start negotiating for an early close. err = localAgent.DeclareClose() require.NoError(t, err) + + // Expect the declaration tx to have been submitted. + localDeclTx, _, err := localAgent.channel.CloseTxs() + require.NoError(t, err) + assert.Equal(t, localDeclTx, localVars.submittedTx) + + // Receive the declaration at the remote and complete negotiation. err = remoteAgent.receive() require.NoError(t, err) err = localAgent.receive() require.NoError(t, err) - assert.NotNil(t, localVars.submittedTx) - assert.NotNil(t, remoteVars.submittedTx) + // Expect the close tx to have been submitted. + _, localCloseTx, err := localAgent.channel.CloseTxs() + require.NoError(t, err) + _, remoteCloseTx, err := remoteAgent.channel.CloseTxs() + require.NoError(t, err) + assert.Equal(t, localCloseTx, remoteCloseTx) + assert.Equal(t, localCloseTx, localVars.submittedTx) + assert.Equal(t, remoteCloseTx, remoteVars.submittedTx) + + // Expect closed event. + assert.True(t, localVars.closed) + assert.True(t, remoteVars.closed) } From f8743b4519f81e01e3f51e1fe874a233d15da533 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Thu, 26 Aug 2021 06:17:19 +0000 Subject: [PATCH 24/30] logger resilience --- sdk/agent/agent.go | 64 +++++++++++++++++++++++++++------------------- 1 file changed, 37 insertions(+), 27 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 1d163622..9710d5c6 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -65,6 +65,16 @@ type Agent struct { channel *state.Channel } +// logWriter returns a writer that can be used for logging regardles of whether +// there is a LogWriter defined for the agent. If none is defined for the agent +// logs are discarded. +func (a *Agent) logWriter() io.Writer { + if a.LogWriter == nil { + return io.Discard + } + return a.LogWriter +} + // Channel returns the channel the agent is managing. The channel will be nil if // the agent has not established a connection or coordinated a channel with // another participant. @@ -74,7 +84,7 @@ func (a *Agent) Channel() *state.Channel { // hello sends a hello message to the remote participant over the connection. func (a *Agent) hello() error { - enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) + enc := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter())) err := enc.Encode(msg.Message{ Type: msg.TypeHello, Hello: &msg.Hello{ @@ -140,7 +150,7 @@ func (a *Agent) Open() error { if err != nil { return fmt.Errorf("proposing open: %w", err) } - enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) + enc := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter())) err = enc.Encode(msg.Message{ Type: msg.TypeOpenRequest, OpenRequest: &open, @@ -169,7 +179,7 @@ func (a *Agent) Payment(paymentAmount string) error { } ca, err := a.channel.ProposePayment(amountValue) if errors.Is(err, state.ErrUnderfunded) { - fmt.Fprintf(a.LogWriter, "local is underfunded for this payment based on cached account balances, checking escrow account...\n") + fmt.Fprintf(a.logWriter(), "local is underfunded for this payment based on cached account balances, checking escrow account...\n") var balance int64 balance, err = a.BalanceCollector.GetBalance(a.channel.LocalEscrowAccount().Address, a.channel.OpenAgreement().Details.Asset) if err != nil { @@ -181,7 +191,7 @@ func (a *Agent) Payment(paymentAmount string) error { if err != nil { return fmt.Errorf("proposing payment %d: %w", amountValue, err) } - enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) + enc := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter())) err = enc.Encode(msg.Message{ Type: msg.TypePaymentRequest, PaymentRequest: &ca, @@ -215,19 +225,19 @@ func (a *Agent) DeclareClose() error { if err != nil { return fmt.Errorf("hashing decl tx: %w", err) } - fmt.Fprintln(a.LogWriter, "submitting declaration:", declHash) + fmt.Fprintln(a.logWriter(), "submitting declaration:", declHash) err = a.Submitter.SubmitTx(declTx) if err != nil { return fmt.Errorf("submitting declaration tx: %w", err) } // Attempt revising the close agreement to close early. - fmt.Fprintln(a.LogWriter, "proposing a revised close for immediate submission") + fmt.Fprintln(a.logWriter(), "proposing a revised close for immediate submission") ca, err := a.channel.ProposeClose() if err != nil { return fmt.Errorf("proposing the close: %w", err) } - enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) + enc := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter())) err = enc.Encode(msg.Message{ Type: msg.TypeCloseRequest, CloseRequest: &ca, @@ -253,19 +263,19 @@ func (a *Agent) Close() error { if err != nil { return fmt.Errorf("hashing close tx: %w", err) } - fmt.Fprintln(a.LogWriter, "submitting close tx:", closeHash) + fmt.Fprintln(a.logWriter(), "submitting close tx:", closeHash) err = a.Submitter.SubmitTx(closeTx) if err != nil { - fmt.Fprintln(a.LogWriter, "error submitting close tx:", closeHash, ",", err) + fmt.Fprintln(a.logWriter(), "error submitting close tx:", closeHash, ",", err) return fmt.Errorf("submitting close tx %s: %w", closeHash, err) } - fmt.Fprintln(a.LogWriter, "submitted close tx:", closeHash) + fmt.Fprintln(a.logWriter(), "submitted close tx:", closeHash) return nil } func (a *Agent) receive() error { - recv := msg.NewDecoder(io.TeeReader(a.conn, a.LogWriter)) - send := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) + recv := msg.NewDecoder(io.TeeReader(a.conn, a.logWriter())) + send := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter())) m := msg.Message{} err := recv.Decode(&m) if err != nil { @@ -282,13 +292,13 @@ func (a *Agent) receiveLoop() { for { err := a.receive() if err != nil { - fmt.Fprintf(a.LogWriter, "error receiving: %v\n", err) + fmt.Fprintf(a.logWriter(), "error receiving: %v\n", err) } } } func (a *Agent) handle(m msg.Message, send *msg.Encoder) error { - fmt.Fprintf(a.LogWriter, "handling %v\n", m.Type) + fmt.Fprintf(a.logWriter(), "handling %v\n", m.Type) handler := handlerMap[m.Type] if handler == nil { err := fmt.Errorf("handling message %d: unrecognized message type", m.Type) @@ -328,8 +338,8 @@ func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { a.otherEscrowAccount = &h.EscrowAccount a.otherEscrowAccountSigner = &h.Signer - fmt.Fprintf(a.LogWriter, "other's escrow account: %v\n", a.otherEscrowAccount.Address()) - fmt.Fprintf(a.LogWriter, "other's signer: %v\n", a.otherEscrowAccountSigner.Address()) + fmt.Fprintf(a.logWriter(), "other's escrow account: %v\n", a.otherEscrowAccount.Address()) + fmt.Fprintf(a.logWriter(), "other's signer: %v\n", a.otherEscrowAccountSigner.Address()) if a.OnConnected != nil { a.OnConnected(a) @@ -349,7 +359,7 @@ func (a *Agent) handleOpenRequest(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("confirming open: %w", err) } - fmt.Fprintf(a.LogWriter, "open authorized\n") + fmt.Fprintf(a.logWriter(), "open authorized\n") err = send.Encode(msg.Message{ Type: msg.TypeOpenResponse, OpenResponse: &open, @@ -379,7 +389,7 @@ func (a *Agent) handleOpenResponse(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("confirming open: %w", err) } - fmt.Fprintf(a.LogWriter, "open authorized\n") + fmt.Fprintf(a.logWriter(), "open authorized\n") formationTx, err := a.channel.OpenTx() if err != nil { return fmt.Errorf("building formation tx: %w", err) @@ -404,7 +414,7 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error { paymentIn := *m.PaymentRequest payment, err := a.channel.ConfirmPayment(paymentIn) if errors.Is(err, state.ErrUnderfunded) { - fmt.Fprintf(a.LogWriter, "remote is underfunded for this payment based on cached account balances, checking their escrow account...\n") + fmt.Fprintf(a.logWriter(), "remote is underfunded for this payment based on cached account balances, checking their escrow account...\n") var balance int64 balance, err = a.BalanceCollector.GetBalance(a.channel.RemoteEscrowAccount().Address, a.channel.OpenAgreement().Details.Asset) if err != nil { @@ -416,7 +426,7 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("confirming payment: %w", err) } - fmt.Fprintf(a.LogWriter, "payment authorized\n") + fmt.Fprintf(a.logWriter(), "payment authorized\n") err = send.Encode(msg.Message{Type: msg.TypePaymentResponse, PaymentResponse: &payment}) if a.OnPaymentReceivedAndConfirmed != nil { a.OnPaymentReceivedAndConfirmed(a, payment) @@ -437,7 +447,7 @@ func (a *Agent) handlePaymentResponse(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("confirming payment: %w", err) } - fmt.Fprintf(a.LogWriter, "payment authorized\n") + fmt.Fprintf(a.logWriter(), "payment authorized\n") if a.OnPaymentSentAndConfirmed != nil { a.OnPaymentSentAndConfirmed(a, payment) } @@ -462,7 +472,7 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("encoding close to send back: %v\n", err) } - fmt.Fprintln(a.LogWriter, "close ready") + fmt.Fprintln(a.logWriter(), "close ready") // Submit the close immediately since it is valid immediately. _, closeTx, err := a.channel.CloseTxs() @@ -473,12 +483,12 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("hashing close tx: %w", err) } - fmt.Fprintln(a.LogWriter, "submitting close", hash) + fmt.Fprintln(a.logWriter(), "submitting close", hash) err = a.Submitter.SubmitTx(closeTx) if err != nil { return fmt.Errorf("submitting close tx: %w", err) } - fmt.Fprintln(a.LogWriter, "close successful") + fmt.Fprintln(a.logWriter(), "close successful") // TODO: Move the triggering of this event handler to wherever we end up // ingesting transactions, and trigger it after the channel becomes closed. if a.OnClosed != nil { @@ -498,7 +508,7 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("confirming close: %v\n", err) } - fmt.Fprintln(a.LogWriter, "close ready") + fmt.Fprintln(a.logWriter(), "close ready") // Submit the close immediately since it is valid immediately. _, closeTx, err := a.channel.CloseTxs() @@ -509,12 +519,12 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("hashing close tx: %w", err) } - fmt.Fprintln(a.LogWriter, "submitting close", hash) + fmt.Fprintln(a.logWriter(), "submitting close", hash) err = a.Submitter.SubmitTx(closeTx) if err != nil { return fmt.Errorf("submitting close tx: %w", err) } - fmt.Fprintln(a.LogWriter, "close successful") + fmt.Fprintln(a.logWriter(), "close successful") // TODO: Move the triggering of this event handler to wherever we end up // ingesting transactions, and trigger it after the channel becomes closed. if a.OnClosed != nil { From 6c2750b48bc84246615f2cb39ccd79090b5c5ec9 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Thu, 26 Aug 2021 06:19:49 +0000 Subject: [PATCH 25/30] Revert "logger resilience" This reverts commit f8743b4519f81e01e3f51e1fe874a233d15da533. --- sdk/agent/agent.go | 64 +++++++++++++++++++--------------------------- 1 file changed, 27 insertions(+), 37 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 9710d5c6..1d163622 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -65,16 +65,6 @@ type Agent struct { channel *state.Channel } -// logWriter returns a writer that can be used for logging regardles of whether -// there is a LogWriter defined for the agent. If none is defined for the agent -// logs are discarded. -func (a *Agent) logWriter() io.Writer { - if a.LogWriter == nil { - return io.Discard - } - return a.LogWriter -} - // Channel returns the channel the agent is managing. The channel will be nil if // the agent has not established a connection or coordinated a channel with // another participant. @@ -84,7 +74,7 @@ func (a *Agent) Channel() *state.Channel { // hello sends a hello message to the remote participant over the connection. func (a *Agent) hello() error { - enc := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter())) + enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) err := enc.Encode(msg.Message{ Type: msg.TypeHello, Hello: &msg.Hello{ @@ -150,7 +140,7 @@ func (a *Agent) Open() error { if err != nil { return fmt.Errorf("proposing open: %w", err) } - enc := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter())) + enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) err = enc.Encode(msg.Message{ Type: msg.TypeOpenRequest, OpenRequest: &open, @@ -179,7 +169,7 @@ func (a *Agent) Payment(paymentAmount string) error { } ca, err := a.channel.ProposePayment(amountValue) if errors.Is(err, state.ErrUnderfunded) { - fmt.Fprintf(a.logWriter(), "local is underfunded for this payment based on cached account balances, checking escrow account...\n") + fmt.Fprintf(a.LogWriter, "local is underfunded for this payment based on cached account balances, checking escrow account...\n") var balance int64 balance, err = a.BalanceCollector.GetBalance(a.channel.LocalEscrowAccount().Address, a.channel.OpenAgreement().Details.Asset) if err != nil { @@ -191,7 +181,7 @@ func (a *Agent) Payment(paymentAmount string) error { if err != nil { return fmt.Errorf("proposing payment %d: %w", amountValue, err) } - enc := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter())) + enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) err = enc.Encode(msg.Message{ Type: msg.TypePaymentRequest, PaymentRequest: &ca, @@ -225,19 +215,19 @@ func (a *Agent) DeclareClose() error { if err != nil { return fmt.Errorf("hashing decl tx: %w", err) } - fmt.Fprintln(a.logWriter(), "submitting declaration:", declHash) + fmt.Fprintln(a.LogWriter, "submitting declaration:", declHash) err = a.Submitter.SubmitTx(declTx) if err != nil { return fmt.Errorf("submitting declaration tx: %w", err) } // Attempt revising the close agreement to close early. - fmt.Fprintln(a.logWriter(), "proposing a revised close for immediate submission") + fmt.Fprintln(a.LogWriter, "proposing a revised close for immediate submission") ca, err := a.channel.ProposeClose() if err != nil { return fmt.Errorf("proposing the close: %w", err) } - enc := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter())) + enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) err = enc.Encode(msg.Message{ Type: msg.TypeCloseRequest, CloseRequest: &ca, @@ -263,19 +253,19 @@ func (a *Agent) Close() error { if err != nil { return fmt.Errorf("hashing close tx: %w", err) } - fmt.Fprintln(a.logWriter(), "submitting close tx:", closeHash) + fmt.Fprintln(a.LogWriter, "submitting close tx:", closeHash) err = a.Submitter.SubmitTx(closeTx) if err != nil { - fmt.Fprintln(a.logWriter(), "error submitting close tx:", closeHash, ",", err) + fmt.Fprintln(a.LogWriter, "error submitting close tx:", closeHash, ",", err) return fmt.Errorf("submitting close tx %s: %w", closeHash, err) } - fmt.Fprintln(a.logWriter(), "submitted close tx:", closeHash) + fmt.Fprintln(a.LogWriter, "submitted close tx:", closeHash) return nil } func (a *Agent) receive() error { - recv := msg.NewDecoder(io.TeeReader(a.conn, a.logWriter())) - send := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter())) + recv := msg.NewDecoder(io.TeeReader(a.conn, a.LogWriter)) + send := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter)) m := msg.Message{} err := recv.Decode(&m) if err != nil { @@ -292,13 +282,13 @@ func (a *Agent) receiveLoop() { for { err := a.receive() if err != nil { - fmt.Fprintf(a.logWriter(), "error receiving: %v\n", err) + fmt.Fprintf(a.LogWriter, "error receiving: %v\n", err) } } } func (a *Agent) handle(m msg.Message, send *msg.Encoder) error { - fmt.Fprintf(a.logWriter(), "handling %v\n", m.Type) + fmt.Fprintf(a.LogWriter, "handling %v\n", m.Type) handler := handlerMap[m.Type] if handler == nil { err := fmt.Errorf("handling message %d: unrecognized message type", m.Type) @@ -338,8 +328,8 @@ func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { a.otherEscrowAccount = &h.EscrowAccount a.otherEscrowAccountSigner = &h.Signer - fmt.Fprintf(a.logWriter(), "other's escrow account: %v\n", a.otherEscrowAccount.Address()) - fmt.Fprintf(a.logWriter(), "other's signer: %v\n", a.otherEscrowAccountSigner.Address()) + fmt.Fprintf(a.LogWriter, "other's escrow account: %v\n", a.otherEscrowAccount.Address()) + fmt.Fprintf(a.LogWriter, "other's signer: %v\n", a.otherEscrowAccountSigner.Address()) if a.OnConnected != nil { a.OnConnected(a) @@ -359,7 +349,7 @@ func (a *Agent) handleOpenRequest(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("confirming open: %w", err) } - fmt.Fprintf(a.logWriter(), "open authorized\n") + fmt.Fprintf(a.LogWriter, "open authorized\n") err = send.Encode(msg.Message{ Type: msg.TypeOpenResponse, OpenResponse: &open, @@ -389,7 +379,7 @@ func (a *Agent) handleOpenResponse(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("confirming open: %w", err) } - fmt.Fprintf(a.logWriter(), "open authorized\n") + fmt.Fprintf(a.LogWriter, "open authorized\n") formationTx, err := a.channel.OpenTx() if err != nil { return fmt.Errorf("building formation tx: %w", err) @@ -414,7 +404,7 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error { paymentIn := *m.PaymentRequest payment, err := a.channel.ConfirmPayment(paymentIn) if errors.Is(err, state.ErrUnderfunded) { - fmt.Fprintf(a.logWriter(), "remote is underfunded for this payment based on cached account balances, checking their escrow account...\n") + fmt.Fprintf(a.LogWriter, "remote is underfunded for this payment based on cached account balances, checking their escrow account...\n") var balance int64 balance, err = a.BalanceCollector.GetBalance(a.channel.RemoteEscrowAccount().Address, a.channel.OpenAgreement().Details.Asset) if err != nil { @@ -426,7 +416,7 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("confirming payment: %w", err) } - fmt.Fprintf(a.logWriter(), "payment authorized\n") + fmt.Fprintf(a.LogWriter, "payment authorized\n") err = send.Encode(msg.Message{Type: msg.TypePaymentResponse, PaymentResponse: &payment}) if a.OnPaymentReceivedAndConfirmed != nil { a.OnPaymentReceivedAndConfirmed(a, payment) @@ -447,7 +437,7 @@ func (a *Agent) handlePaymentResponse(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("confirming payment: %w", err) } - fmt.Fprintf(a.logWriter(), "payment authorized\n") + fmt.Fprintf(a.LogWriter, "payment authorized\n") if a.OnPaymentSentAndConfirmed != nil { a.OnPaymentSentAndConfirmed(a, payment) } @@ -472,7 +462,7 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("encoding close to send back: %v\n", err) } - fmt.Fprintln(a.logWriter(), "close ready") + fmt.Fprintln(a.LogWriter, "close ready") // Submit the close immediately since it is valid immediately. _, closeTx, err := a.channel.CloseTxs() @@ -483,12 +473,12 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("hashing close tx: %w", err) } - fmt.Fprintln(a.logWriter(), "submitting close", hash) + fmt.Fprintln(a.LogWriter, "submitting close", hash) err = a.Submitter.SubmitTx(closeTx) if err != nil { return fmt.Errorf("submitting close tx: %w", err) } - fmt.Fprintln(a.logWriter(), "close successful") + fmt.Fprintln(a.LogWriter, "close successful") // TODO: Move the triggering of this event handler to wherever we end up // ingesting transactions, and trigger it after the channel becomes closed. if a.OnClosed != nil { @@ -508,7 +498,7 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("confirming close: %v\n", err) } - fmt.Fprintln(a.logWriter(), "close ready") + fmt.Fprintln(a.LogWriter, "close ready") // Submit the close immediately since it is valid immediately. _, closeTx, err := a.channel.CloseTxs() @@ -519,12 +509,12 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("hashing close tx: %w", err) } - fmt.Fprintln(a.logWriter(), "submitting close", hash) + fmt.Fprintln(a.LogWriter, "submitting close", hash) err = a.Submitter.SubmitTx(closeTx) if err != nil { return fmt.Errorf("submitting close tx: %w", err) } - fmt.Fprintln(a.logWriter(), "close successful") + fmt.Fprintln(a.LogWriter, "close successful") // TODO: Move the triggering of this event handler to wherever we end up // ingesting transactions, and trigger it after the channel becomes closed. if a.OnClosed != nil { From e9f9c38a311ddbe81847e0fa1cf8f99ebd8560af Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Thu, 26 Aug 2021 06:20:42 +0000 Subject: [PATCH 26/30] ignore logs --- sdk/agent/agent_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/sdk/agent/agent_test.go b/sdk/agent/agent_test.go index d6140038..73a6eaa6 100644 --- a/sdk/agent/agent_test.go +++ b/sdk/agent/agent_test.go @@ -64,6 +64,7 @@ func TestAgent_openPaymentClose(t *testing.T) { }), EscrowAccountKey: localEscrow.FromAddress(), EscrowAccountSigner: localSigner, + LogWriter: io.Discard, OnError: func(a *Agent, err error) { localVars.err = err }, @@ -113,6 +114,7 @@ func TestAgent_openPaymentClose(t *testing.T) { }), EscrowAccountKey: remoteEscrow.FromAddress(), EscrowAccountSigner: remoteSigner, + LogWriter: io.Discard, OnError: func(a *Agent, err error) { remoteVars.err = err }, From 1d73c0e837f4dad2791dd29e1cb908e30c8fdc8c Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Thu, 26 Aug 2021 06:25:01 +0000 Subject: [PATCH 27/30] sdk/agent: repair failing payment due to stale balance data --- sdk/agent/agent.go | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index f2e2ff34..cc78e03f 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -159,6 +159,16 @@ func (a *Agent) Payment(paymentAmount string) error { return fmt.Errorf("parsing amount %s: %w", paymentAmount, err) } ca, err := a.channel.ProposePayment(amountValue) + if errors.Is(err, state.ErrUnderfunded) { + fmt.Fprintf(a.LogWriter, "local is underfunded for this payment based on cached account balances, checking escrow account...\n") + var balance int64 + balance, err = a.BalanceCollector.GetBalance(a.channel.LocalEscrowAccount().Address, a.channel.OpenAgreement().Details.Asset) + if err != nil { + return err + } + a.channel.UpdateLocalEscrowAccountBalance(balance) + ca, err = a.channel.ProposePayment(amountValue) + } if err != nil { return fmt.Errorf("proposing payment %d: %w", amountValue, err) } @@ -353,8 +363,6 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error { paymentIn := *m.PaymentRequest payment, err := a.channel.ConfirmPayment(paymentIn) if errors.Is(err, state.ErrUnderfunded) { - // TODO: Remove this logic once the agent is ingesting transactions and - // updating account balance that way. fmt.Fprintf(a.LogWriter, "remote is underfunded for this payment based on cached account balances, checking their escrow account...\n") var balance int64 balance, err = a.BalanceCollector.GetBalance(a.channel.RemoteEscrowAccount().Address, a.channel.OpenAgreement().Details.Asset) From 19d17c710280013d41749367e935b0013d7b6ae6 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Thu, 26 Aug 2021 06:35:03 +0000 Subject: [PATCH 28/30] strip event handlers from change --- sdk/agent/agent.go | 57 ++----------------------------- sdk/agent/agent_test.go | 76 ----------------------------------------- 2 files changed, 3 insertions(+), 130 deletions(-) diff --git a/sdk/agent/agent.go b/sdk/agent/agent.go index 1d163622..33a11b2f 100644 --- a/sdk/agent/agent.go +++ b/sdk/agent/agent.go @@ -50,15 +50,6 @@ type Agent struct { LogWriter io.Writer - OnError func(*Agent, error) - OnConnected func(*Agent) - OnOpened func(*Agent) - OnPaymentReceivedAndConfirmed func(*Agent, state.CloseAgreement) - OnPaymentSentAndConfirmed func(*Agent, state.CloseAgreement) - // TODO: Add closing event when ingestion is implemented. - // OnClosing func(*Agent) - OnClosed func(*Agent) - conn io.ReadWriter otherEscrowAccount *keypair.FromAddress otherEscrowAccountSigner *keypair.FromAddress @@ -291,19 +282,11 @@ func (a *Agent) handle(m msg.Message, send *msg.Encoder) error { fmt.Fprintf(a.LogWriter, "handling %v\n", m.Type) handler := handlerMap[m.Type] if handler == nil { - err := fmt.Errorf("handling message %d: unrecognized message type", m.Type) - if a.OnError != nil { - a.OnError(a, err) - } - return err + return fmt.Errorf("handling message %d: unrecognized message type", m.Type) } err := handler(a, m, send) if err != nil { - err = fmt.Errorf("handling message %d: %w", m.Type, err) - if a.OnError != nil { - a.OnError(a, err) - } - return err + return fmt.Errorf("handling message %d: %w", m.Type, err) } return nil } @@ -331,10 +314,6 @@ func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error { fmt.Fprintf(a.LogWriter, "other's escrow account: %v\n", a.otherEscrowAccount.Address()) fmt.Fprintf(a.LogWriter, "other's signer: %v\n", a.otherEscrowAccountSigner.Address()) - if a.OnConnected != nil { - a.OnConnected(a) - } - return nil } @@ -357,15 +336,6 @@ func (a *Agent) handleOpenRequest(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("encoding open to send back: %w", err) } - // TODO: Remove this trigger of the event handler from here once ingesting - // transactions is added and the event is triggered from there. Note that - // technically the channel isn't open at this point and triggering the event - // here is just a hold over until we can trigger it based on ingestion. - // Triggering here assumes that the other participant, the initiator, - // submits the transaction. - if a.OnOpened != nil { - a.OnOpened(a) - } return nil } @@ -388,11 +358,6 @@ func (a *Agent) handleOpenResponse(m msg.Message, send *msg.Encoder) error { if err != nil { return fmt.Errorf("submitting formation tx: %w", err) } - // TODO: Move the triggering of this event handler to wherever we end up - // ingesting transactions, and trigger it after the channel becomes opened. - if a.OnOpened != nil { - a.OnOpened(a) - } return nil } @@ -418,9 +383,6 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error { } fmt.Fprintf(a.LogWriter, "payment authorized\n") err = send.Encode(msg.Message{Type: msg.TypePaymentResponse, PaymentResponse: &payment}) - if a.OnPaymentReceivedAndConfirmed != nil { - a.OnPaymentReceivedAndConfirmed(a, payment) - } if err != nil { return fmt.Errorf("encoding payment to send back: %w", err) } @@ -433,14 +395,11 @@ func (a *Agent) handlePaymentResponse(m msg.Message, send *msg.Encoder) error { } paymentIn := *m.PaymentResponse - payment, err := a.channel.ConfirmPayment(paymentIn) + _, err := a.channel.ConfirmPayment(paymentIn) if err != nil { return fmt.Errorf("confirming payment: %w", err) } fmt.Fprintf(a.LogWriter, "payment authorized\n") - if a.OnPaymentSentAndConfirmed != nil { - a.OnPaymentSentAndConfirmed(a, payment) - } return nil } @@ -479,11 +438,6 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("submitting close tx: %w", err) } fmt.Fprintln(a.LogWriter, "close successful") - // TODO: Move the triggering of this event handler to wherever we end up - // ingesting transactions, and trigger it after the channel becomes closed. - if a.OnClosed != nil { - a.OnClosed(a) - } return nil } @@ -515,10 +469,5 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error { return fmt.Errorf("submitting close tx: %w", err) } fmt.Fprintln(a.LogWriter, "close successful") - // TODO: Move the triggering of this event handler to wherever we end up - // ingesting transactions, and trigger it after the channel becomes closed. - if a.OnClosed != nil { - a.OnClosed(a) - } return nil } diff --git a/sdk/agent/agent_test.go b/sdk/agent/agent_test.go index 73a6eaa6..16c7b616 100644 --- a/sdk/agent/agent_test.go +++ b/sdk/agent/agent_test.go @@ -41,11 +41,6 @@ func TestAgent_openPaymentClose(t *testing.T) { // Setup the local agent. localVars := struct { submittedTx *txnbuild.Transaction - err error - connected bool - opened bool - closed bool - lastPaymentAgreement state.CloseAgreement }{} localAgent := &Agent{ ObservationPeriodTime: 20 * time.Second, @@ -65,37 +60,11 @@ func TestAgent_openPaymentClose(t *testing.T) { EscrowAccountKey: localEscrow.FromAddress(), EscrowAccountSigner: localSigner, LogWriter: io.Discard, - OnError: func(a *Agent, err error) { - localVars.err = err - }, - OnConnected: func(a *Agent) { - localVars.connected = true - }, - OnOpened: func(a *Agent) { - localVars.opened = true - }, - OnPaymentReceivedAndConfirmed: func(a *Agent, ca state.CloseAgreement) { - localVars.lastPaymentAgreement = ca - }, - OnPaymentSentAndConfirmed: func(a *Agent, ca state.CloseAgreement) { - localVars.lastPaymentAgreement = ca - }, - // TODO: Test when ingestion is added to - // OnClosing: func(a *Agent) { - // }, - OnClosed: func(a *Agent) { - localVars.closed = true - }, } // Setup the remote agent. remoteVars := struct { submittedTx *txnbuild.Transaction - err error - connected bool - opened bool - closed bool - lastPaymentAgreement state.CloseAgreement }{} remoteAgent := &Agent{ ObservationPeriodTime: 20 * time.Second, @@ -115,27 +84,6 @@ func TestAgent_openPaymentClose(t *testing.T) { EscrowAccountKey: remoteEscrow.FromAddress(), EscrowAccountSigner: remoteSigner, LogWriter: io.Discard, - OnError: func(a *Agent, err error) { - remoteVars.err = err - }, - OnConnected: func(a *Agent) { - remoteVars.connected = true - }, - OnOpened: func(a *Agent) { - remoteVars.opened = true - }, - OnPaymentReceivedAndConfirmed: func(a *Agent, ca state.CloseAgreement) { - remoteVars.lastPaymentAgreement = ca - }, - OnPaymentSentAndConfirmed: func(a *Agent, ca state.CloseAgreement) { - remoteVars.lastPaymentAgreement = ca - }, - // TODO: Test when ingestion is added to - // OnClosing: func(a *Agent) { - // }, - OnClosed: func(a *Agent) { - remoteVars.closed = true - }, } // Connect the two agents. @@ -162,10 +110,6 @@ func TestAgent_openPaymentClose(t *testing.T) { err = localAgent.receive() require.NoError(t, err) - // Expect connected event. - assert.True(t, localVars.connected) - assert.True(t, remoteVars.connected) - // Open the channel. err = localAgent.Open() require.NoError(t, err) @@ -174,10 +118,6 @@ func TestAgent_openPaymentClose(t *testing.T) { err = localAgent.receive() require.NoError(t, err) - // Expect opened event. - assert.True(t, localVars.opened) - assert.True(t, remoteVars.opened) - // Expect the open tx to have been submitted. openTx, err := localAgent.channel.OpenTx() require.NoError(t, err) @@ -192,12 +132,6 @@ func TestAgent_openPaymentClose(t *testing.T) { err = localAgent.receive() require.NoError(t, err) - // Expect payment events. - assert.Equal(t, int64(2), localVars.lastPaymentAgreement.Details.IterationNumber) - assert.Equal(t, int64(50_0000000), localVars.lastPaymentAgreement.Details.Balance) - assert.Equal(t, int64(2), remoteVars.lastPaymentAgreement.Details.IterationNumber) - assert.Equal(t, int64(50_0000000), remoteVars.lastPaymentAgreement.Details.Balance) - // Make another payment. err = remoteAgent.Payment("20.0") require.NoError(t, err) @@ -206,12 +140,6 @@ func TestAgent_openPaymentClose(t *testing.T) { err = remoteAgent.receive() require.NoError(t, err) - // Expect payment events. - assert.Equal(t, int64(3), localVars.lastPaymentAgreement.Details.IterationNumber) - assert.Equal(t, int64(30_0000000), localVars.lastPaymentAgreement.Details.Balance) - assert.Equal(t, int64(3), remoteVars.lastPaymentAgreement.Details.IterationNumber) - assert.Equal(t, int64(30_0000000), remoteVars.lastPaymentAgreement.Details.Balance) - // Expect no txs to have been submitted for payments. assert.Nil(t, localVars.submittedTx) assert.Nil(t, remoteVars.submittedTx) @@ -239,8 +167,4 @@ func TestAgent_openPaymentClose(t *testing.T) { assert.Equal(t, localCloseTx, remoteCloseTx) assert.Equal(t, localCloseTx, localVars.submittedTx) assert.Equal(t, remoteCloseTx, remoteVars.submittedTx) - - // Expect closed event. - assert.True(t, localVars.closed) - assert.True(t, remoteVars.closed) } From f23ad1e9a78ce967739761880e6124248283c804 Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Thu, 26 Aug 2021 06:39:05 +0000 Subject: [PATCH 29/30] remove event handlers --- examples/console/main.go | 23 ----------------------- 1 file changed, 23 deletions(-) diff --git a/examples/console/main.go b/examples/console/main.go index 23ab639d..9a5af3e2 100644 --- a/examples/console/main.go +++ b/examples/console/main.go @@ -10,7 +10,6 @@ import ( "github.com/stellar/experimental-payment-channels/sdk/agent" "github.com/stellar/experimental-payment-channels/sdk/horizon" - "github.com/stellar/experimental-payment-channels/sdk/state" "github.com/stellar/experimental-payment-channels/sdk/submit" "github.com/stellar/experimental-payment-channels/sdk/txbuild" "github.com/stellar/go/amount" @@ -115,28 +114,6 @@ func run() error { EscrowAccountKey: escrowAccountKey.FromAddress(), EscrowAccountSigner: signerKey, LogWriter: os.Stderr, - OnError: func(a *agent.Agent, err error) { - fmt.Fprintf(os.Stderr, "agent error: %v\n", err) - }, - OnConnected: func(a *agent.Agent) { - fmt.Fprintf(os.Stderr, "agent connected\n") - }, - OnOpened: func(a *agent.Agent) { - fmt.Fprintf(os.Stderr, "agent channel opened\n") - }, - OnPaymentReceivedAndConfirmed: func(a *agent.Agent, ca state.CloseAgreement) { - fmt.Fprintf(os.Stderr, "agent channel received payment: iteration=%d balance=%d", ca.Details.IterationNumber, ca.Details.Balance) - }, - OnPaymentSentAndConfirmed: func(a *agent.Agent, ca state.CloseAgreement) { - fmt.Fprintf(os.Stderr, "agent channel sent payment and other participant confirmed: iteration=%d balance=%d", ca.Details.IterationNumber, ca.Details.Balance) - }, - // TODO: Add when ingestion is added to agent. - // OnClosing: func(a *agent.Agent) { - // fmt.Fprintf(os.Stderr, "agent channel closing\n") - // }, - OnClosed: func(a *agent.Agent) { - fmt.Fprintf(os.Stderr, "agent channel closed\n") - }, } tx, err := txbuild.CreateEscrow(txbuild.CreateEscrowParams{ From a8fbd45d02387d49765009b5cd987fe5eae6023e Mon Sep 17 00:00:00 2001 From: Leigh McCulloch <351529+leighmcculloch@users.noreply.github.com> Date: Thu, 26 Aug 2021 06:40:33 +0000 Subject: [PATCH 30/30] fix fmt --- sdk/agent/agent_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sdk/agent/agent_test.go b/sdk/agent/agent_test.go index 16c7b616..f29d398f 100644 --- a/sdk/agent/agent_test.go +++ b/sdk/agent/agent_test.go @@ -40,7 +40,7 @@ func TestAgent_openPaymentClose(t *testing.T) { // Setup the local agent. localVars := struct { - submittedTx *txnbuild.Transaction + submittedTx *txnbuild.Transaction }{} localAgent := &Agent{ ObservationPeriodTime: 20 * time.Second, @@ -64,7 +64,7 @@ func TestAgent_openPaymentClose(t *testing.T) { // Setup the remote agent. remoteVars := struct { - submittedTx *txnbuild.Transaction + submittedTx *txnbuild.Transaction }{} remoteAgent := &Agent{ ObservationPeriodTime: 20 * time.Second,