Skip to content
This repository has been archived by the owner on Apr 23, 2024. It is now read-only.

sdk/agent: add event handlers #242

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 54 additions & 3 deletions sdk/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ type Agent struct {

LogWriter io.Writer

OnError func(*Agent, error)
OnConnected func(*Agent)
OnOpened func(*Agent)
OnPaymentReceivedAndConfirmed func(*Agent, state.CloseAgreement)
OnPaymentSentAndConfirmed func(*Agent, state.CloseAgreement)
// TODO: Add closing event when ingestion is implemented.
// OnClosing func(*Agent)
OnClosed func(*Agent)

conn io.ReadWriter
otherEscrowAccount *keypair.FromAddress
otherEscrowAccountSigner *keypair.FromAddress
Expand Down Expand Up @@ -282,11 +291,19 @@ func (a *Agent) handle(m msg.Message, send *msg.Encoder) error {
fmt.Fprintf(a.LogWriter, "handling %v\n", m.Type)
handler := handlerMap[m.Type]
if handler == nil {
return fmt.Errorf("handling message %d: unrecognized message type", m.Type)
err := fmt.Errorf("handling message %d: unrecognized message type", m.Type)
if a.OnError != nil {
a.OnError(a, err)
}
return err
}
err := handler(a, m, send)
if err != nil {
return fmt.Errorf("handling message %d: %w", m.Type, err)
err = fmt.Errorf("handling message %d: %w", m.Type, err)
if a.OnError != nil {
a.OnError(a, err)
}
return err
}
return nil
}
Expand Down Expand Up @@ -314,6 +331,10 @@ func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error {
fmt.Fprintf(a.LogWriter, "other's escrow account: %v\n", a.otherEscrowAccount.Address())
fmt.Fprintf(a.LogWriter, "other's signer: %v\n", a.otherEscrowAccountSigner.Address())

if a.OnConnected != nil {
a.OnConnected(a)
}

return nil
}

Expand All @@ -336,6 +357,15 @@ func (a *Agent) handleOpenRequest(m msg.Message, send *msg.Encoder) error {
if err != nil {
return fmt.Errorf("encoding open to send back: %w", err)
}
// TODO: Remove this trigger of the event handler from here once ingesting
// transactions is added and the event is triggered from there. Note that
// technically the channel isn't open at this point and triggering the event
// here is just a hold over until we can trigger it based on ingestion.
// Triggering here assumes that the other participant, the initiator,
// submits the transaction.
if a.OnOpened != nil {
a.OnOpened(a)
}
return nil
}

Expand All @@ -358,6 +388,11 @@ func (a *Agent) handleOpenResponse(m msg.Message, send *msg.Encoder) error {
if err != nil {
return fmt.Errorf("submitting formation tx: %w", err)
}
// TODO: Move the triggering of this event handler to wherever we end up
// ingesting transactions, and trigger it after the channel becomes opened.
if a.OnOpened != nil {
a.OnOpened(a)
}
return nil
}

Expand All @@ -383,6 +418,9 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error {
}
fmt.Fprintf(a.LogWriter, "payment authorized\n")
err = send.Encode(msg.Message{Type: msg.TypePaymentResponse, PaymentResponse: &payment})
if a.OnPaymentReceivedAndConfirmed != nil {
a.OnPaymentReceivedAndConfirmed(a, payment)
}
if err != nil {
return fmt.Errorf("encoding payment to send back: %w", err)
}
Expand All @@ -395,11 +433,14 @@ func (a *Agent) handlePaymentResponse(m msg.Message, send *msg.Encoder) error {
}

paymentIn := *m.PaymentResponse
_, err := a.channel.ConfirmPayment(paymentIn)
payment, err := a.channel.ConfirmPayment(paymentIn)
if err != nil {
return fmt.Errorf("confirming payment: %w", err)
}
fmt.Fprintf(a.LogWriter, "payment authorized\n")
if a.OnPaymentSentAndConfirmed != nil {
a.OnPaymentSentAndConfirmed(a, payment)
}
return nil
}

Expand Down Expand Up @@ -438,6 +479,11 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error {
return fmt.Errorf("submitting close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "close successful")
// TODO: Move the triggering of this event handler to wherever we end up
// ingesting transactions, and trigger it after the channel becomes closed.
if a.OnClosed != nil {
a.OnClosed(a)
}
return nil
}

Expand Down Expand Up @@ -469,5 +515,10 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error {
return fmt.Errorf("submitting close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "close successful")
// TODO: Move the triggering of this event handler to wherever we end up
// ingesting transactions, and trigger it after the channel becomes closed.
if a.OnClosed != nil {
a.OnClosed(a)
}
return nil
}
80 changes: 78 additions & 2 deletions sdk/agent/agent_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,12 @@ func TestAgent_openPaymentClose(t *testing.T) {

// Setup the local agent.
localVars := struct {
submittedTx *txnbuild.Transaction
submittedTx *txnbuild.Transaction
err error
connected bool
opened bool
closed bool
lastPaymentAgreement state.CloseAgreement
}{}
localAgent := &Agent{
ObservationPeriodTime: 20 * time.Second,
Expand All @@ -60,11 +65,37 @@ func TestAgent_openPaymentClose(t *testing.T) {
EscrowAccountKey: localEscrow.FromAddress(),
EscrowAccountSigner: localSigner,
LogWriter: io.Discard,
OnError: func(a *Agent, err error) {
localVars.err = err
},
OnConnected: func(a *Agent) {
localVars.connected = true
},
OnOpened: func(a *Agent) {
localVars.opened = true
},
OnPaymentReceivedAndConfirmed: func(a *Agent, ca state.CloseAgreement) {
localVars.lastPaymentAgreement = ca
},
OnPaymentSentAndConfirmed: func(a *Agent, ca state.CloseAgreement) {
localVars.lastPaymentAgreement = ca
},
// TODO: Test when ingestion is added to
// OnClosing: func(a *Agent) {
// },
OnClosed: func(a *Agent) {
localVars.closed = true
},
}

// Setup the remote agent.
remoteVars := struct {
submittedTx *txnbuild.Transaction
submittedTx *txnbuild.Transaction
err error
connected bool
opened bool
closed bool
lastPaymentAgreement state.CloseAgreement
}{}
remoteAgent := &Agent{
ObservationPeriodTime: 20 * time.Second,
Expand All @@ -84,6 +115,27 @@ func TestAgent_openPaymentClose(t *testing.T) {
EscrowAccountKey: remoteEscrow.FromAddress(),
EscrowAccountSigner: remoteSigner,
LogWriter: io.Discard,
OnError: func(a *Agent, err error) {
remoteVars.err = err
},
OnConnected: func(a *Agent) {
remoteVars.connected = true
},
OnOpened: func(a *Agent) {
remoteVars.opened = true
},
OnPaymentReceivedAndConfirmed: func(a *Agent, ca state.CloseAgreement) {
remoteVars.lastPaymentAgreement = ca
},
OnPaymentSentAndConfirmed: func(a *Agent, ca state.CloseAgreement) {
remoteVars.lastPaymentAgreement = ca
},
// TODO: Test when ingestion is added to
// OnClosing: func(a *Agent) {
// },
OnClosed: func(a *Agent) {
remoteVars.closed = true
},
}

// Connect the two agents.
Expand All @@ -110,6 +162,10 @@ func TestAgent_openPaymentClose(t *testing.T) {
err = localAgent.receive()
require.NoError(t, err)

// Expect connected event.
assert.True(t, localVars.connected)
assert.True(t, remoteVars.connected)

// Open the channel.
err = localAgent.Open()
require.NoError(t, err)
Expand All @@ -118,6 +174,10 @@ func TestAgent_openPaymentClose(t *testing.T) {
err = localAgent.receive()
require.NoError(t, err)

// Expect opened event.
assert.True(t, localVars.opened)
assert.True(t, remoteVars.opened)

// Expect the open tx to have been submitted.
openTx, err := localAgent.channel.OpenTx()
require.NoError(t, err)
Expand All @@ -132,6 +192,12 @@ func TestAgent_openPaymentClose(t *testing.T) {
err = localAgent.receive()
require.NoError(t, err)

// Expect payment events.
assert.Equal(t, int64(2), localVars.lastPaymentAgreement.Details.IterationNumber)
assert.Equal(t, int64(50_0000000), localVars.lastPaymentAgreement.Details.Balance)
assert.Equal(t, int64(2), remoteVars.lastPaymentAgreement.Details.IterationNumber)
assert.Equal(t, int64(50_0000000), remoteVars.lastPaymentAgreement.Details.Balance)

// Make another payment.
err = remoteAgent.Payment("20.0")
require.NoError(t, err)
Expand All @@ -140,6 +206,12 @@ func TestAgent_openPaymentClose(t *testing.T) {
err = remoteAgent.receive()
require.NoError(t, err)

// Expect payment events.
assert.Equal(t, int64(3), localVars.lastPaymentAgreement.Details.IterationNumber)
assert.Equal(t, int64(30_0000000), localVars.lastPaymentAgreement.Details.Balance)
assert.Equal(t, int64(3), remoteVars.lastPaymentAgreement.Details.IterationNumber)
assert.Equal(t, int64(30_0000000), remoteVars.lastPaymentAgreement.Details.Balance)

// Expect no txs to have been submitted for payments.
assert.Nil(t, localVars.submittedTx)
assert.Nil(t, remoteVars.submittedTx)
Expand Down Expand Up @@ -167,4 +239,8 @@ func TestAgent_openPaymentClose(t *testing.T) {
assert.Equal(t, localCloseTx, remoteCloseTx)
assert.Equal(t, localCloseTx, localVars.submittedTx)
assert.Equal(t, remoteCloseTx, remoteVars.submittedTx)

// Expect closed event.
assert.True(t, localVars.closed)
assert.True(t, remoteVars.closed)
}