Skip to content
This repository has been archived by the owner on Apr 23, 2024. It is now read-only.

Commit

Permalink
logger resilience
Browse files Browse the repository at this point in the history
  • Loading branch information
leighmcculloch committed Aug 26, 2021
1 parent 9388e31 commit f8743b4
Showing 1 changed file with 37 additions and 27 deletions.
64 changes: 37 additions & 27 deletions sdk/agent/agent.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,16 @@ type Agent struct {
channel *state.Channel
}

// logWriter returns a writer that can be used for logging regardles of whether
// there is a LogWriter defined for the agent. If none is defined for the agent
// logs are discarded.
func (a *Agent) logWriter() io.Writer {
if a.LogWriter == nil {
return io.Discard
}
return a.LogWriter
}

// Channel returns the channel the agent is managing. The channel will be nil if
// the agent has not established a connection or coordinated a channel with
// another participant.
Expand All @@ -74,7 +84,7 @@ func (a *Agent) Channel() *state.Channel {

// hello sends a hello message to the remote participant over the connection.
func (a *Agent) hello() error {
enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter))
enc := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter()))
err := enc.Encode(msg.Message{
Type: msg.TypeHello,
Hello: &msg.Hello{
Expand Down Expand Up @@ -140,7 +150,7 @@ func (a *Agent) Open() error {
if err != nil {
return fmt.Errorf("proposing open: %w", err)
}
enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter))
enc := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter()))
err = enc.Encode(msg.Message{
Type: msg.TypeOpenRequest,
OpenRequest: &open,
Expand Down Expand Up @@ -169,7 +179,7 @@ func (a *Agent) Payment(paymentAmount string) error {
}
ca, err := a.channel.ProposePayment(amountValue)
if errors.Is(err, state.ErrUnderfunded) {
fmt.Fprintf(a.LogWriter, "local is underfunded for this payment based on cached account balances, checking escrow account...\n")
fmt.Fprintf(a.logWriter(), "local is underfunded for this payment based on cached account balances, checking escrow account...\n")
var balance int64
balance, err = a.BalanceCollector.GetBalance(a.channel.LocalEscrowAccount().Address, a.channel.OpenAgreement().Details.Asset)
if err != nil {
Expand All @@ -181,7 +191,7 @@ func (a *Agent) Payment(paymentAmount string) error {
if err != nil {
return fmt.Errorf("proposing payment %d: %w", amountValue, err)
}
enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter))
enc := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter()))
err = enc.Encode(msg.Message{
Type: msg.TypePaymentRequest,
PaymentRequest: &ca,
Expand Down Expand Up @@ -215,19 +225,19 @@ func (a *Agent) DeclareClose() error {
if err != nil {
return fmt.Errorf("hashing decl tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "submitting declaration:", declHash)
fmt.Fprintln(a.logWriter(), "submitting declaration:", declHash)
err = a.Submitter.SubmitTx(declTx)
if err != nil {
return fmt.Errorf("submitting declaration tx: %w", err)
}

// Attempt revising the close agreement to close early.
fmt.Fprintln(a.LogWriter, "proposing a revised close for immediate submission")
fmt.Fprintln(a.logWriter(), "proposing a revised close for immediate submission")
ca, err := a.channel.ProposeClose()
if err != nil {
return fmt.Errorf("proposing the close: %w", err)
}
enc := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter))
enc := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter()))
err = enc.Encode(msg.Message{
Type: msg.TypeCloseRequest,
CloseRequest: &ca,
Expand All @@ -253,19 +263,19 @@ func (a *Agent) Close() error {
if err != nil {
return fmt.Errorf("hashing close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "submitting close tx:", closeHash)
fmt.Fprintln(a.logWriter(), "submitting close tx:", closeHash)
err = a.Submitter.SubmitTx(closeTx)
if err != nil {
fmt.Fprintln(a.LogWriter, "error submitting close tx:", closeHash, ",", err)
fmt.Fprintln(a.logWriter(), "error submitting close tx:", closeHash, ",", err)
return fmt.Errorf("submitting close tx %s: %w", closeHash, err)
}
fmt.Fprintln(a.LogWriter, "submitted close tx:", closeHash)
fmt.Fprintln(a.logWriter(), "submitted close tx:", closeHash)
return nil
}

func (a *Agent) receive() error {
recv := msg.NewDecoder(io.TeeReader(a.conn, a.LogWriter))
send := msg.NewEncoder(io.MultiWriter(a.conn, a.LogWriter))
recv := msg.NewDecoder(io.TeeReader(a.conn, a.logWriter()))
send := msg.NewEncoder(io.MultiWriter(a.conn, a.logWriter()))
m := msg.Message{}
err := recv.Decode(&m)
if err != nil {
Expand All @@ -282,13 +292,13 @@ func (a *Agent) receiveLoop() {
for {
err := a.receive()
if err != nil {
fmt.Fprintf(a.LogWriter, "error receiving: %v\n", err)
fmt.Fprintf(a.logWriter(), "error receiving: %v\n", err)
}
}
}

func (a *Agent) handle(m msg.Message, send *msg.Encoder) error {
fmt.Fprintf(a.LogWriter, "handling %v\n", m.Type)
fmt.Fprintf(a.logWriter(), "handling %v\n", m.Type)
handler := handlerMap[m.Type]
if handler == nil {
err := fmt.Errorf("handling message %d: unrecognized message type", m.Type)
Expand Down Expand Up @@ -328,8 +338,8 @@ func (a *Agent) handleHello(m msg.Message, send *msg.Encoder) error {
a.otherEscrowAccount = &h.EscrowAccount
a.otherEscrowAccountSigner = &h.Signer

fmt.Fprintf(a.LogWriter, "other's escrow account: %v\n", a.otherEscrowAccount.Address())
fmt.Fprintf(a.LogWriter, "other's signer: %v\n", a.otherEscrowAccountSigner.Address())
fmt.Fprintf(a.logWriter(), "other's escrow account: %v\n", a.otherEscrowAccount.Address())
fmt.Fprintf(a.logWriter(), "other's signer: %v\n", a.otherEscrowAccountSigner.Address())

if a.OnConnected != nil {
a.OnConnected(a)
Expand All @@ -349,7 +359,7 @@ func (a *Agent) handleOpenRequest(m msg.Message, send *msg.Encoder) error {
if err != nil {
return fmt.Errorf("confirming open: %w", err)
}
fmt.Fprintf(a.LogWriter, "open authorized\n")
fmt.Fprintf(a.logWriter(), "open authorized\n")
err = send.Encode(msg.Message{
Type: msg.TypeOpenResponse,
OpenResponse: &open,
Expand Down Expand Up @@ -379,7 +389,7 @@ func (a *Agent) handleOpenResponse(m msg.Message, send *msg.Encoder) error {
if err != nil {
return fmt.Errorf("confirming open: %w", err)
}
fmt.Fprintf(a.LogWriter, "open authorized\n")
fmt.Fprintf(a.logWriter(), "open authorized\n")
formationTx, err := a.channel.OpenTx()
if err != nil {
return fmt.Errorf("building formation tx: %w", err)
Expand All @@ -404,7 +414,7 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error {
paymentIn := *m.PaymentRequest
payment, err := a.channel.ConfirmPayment(paymentIn)
if errors.Is(err, state.ErrUnderfunded) {
fmt.Fprintf(a.LogWriter, "remote is underfunded for this payment based on cached account balances, checking their escrow account...\n")
fmt.Fprintf(a.logWriter(), "remote is underfunded for this payment based on cached account balances, checking their escrow account...\n")
var balance int64
balance, err = a.BalanceCollector.GetBalance(a.channel.RemoteEscrowAccount().Address, a.channel.OpenAgreement().Details.Asset)
if err != nil {
Expand All @@ -416,7 +426,7 @@ func (a *Agent) handlePaymentRequest(m msg.Message, send *msg.Encoder) error {
if err != nil {
return fmt.Errorf("confirming payment: %w", err)
}
fmt.Fprintf(a.LogWriter, "payment authorized\n")
fmt.Fprintf(a.logWriter(), "payment authorized\n")
err = send.Encode(msg.Message{Type: msg.TypePaymentResponse, PaymentResponse: &payment})
if a.OnPaymentReceivedAndConfirmed != nil {
a.OnPaymentReceivedAndConfirmed(a, payment)
Expand All @@ -437,7 +447,7 @@ func (a *Agent) handlePaymentResponse(m msg.Message, send *msg.Encoder) error {
if err != nil {
return fmt.Errorf("confirming payment: %w", err)
}
fmt.Fprintf(a.LogWriter, "payment authorized\n")
fmt.Fprintf(a.logWriter(), "payment authorized\n")
if a.OnPaymentSentAndConfirmed != nil {
a.OnPaymentSentAndConfirmed(a, payment)
}
Expand All @@ -462,7 +472,7 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error {
if err != nil {
return fmt.Errorf("encoding close to send back: %v\n", err)
}
fmt.Fprintln(a.LogWriter, "close ready")
fmt.Fprintln(a.logWriter(), "close ready")

// Submit the close immediately since it is valid immediately.
_, closeTx, err := a.channel.CloseTxs()
Expand All @@ -473,12 +483,12 @@ func (a *Agent) handleCloseRequest(m msg.Message, send *msg.Encoder) error {
if err != nil {
return fmt.Errorf("hashing close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "submitting close", hash)
fmt.Fprintln(a.logWriter(), "submitting close", hash)
err = a.Submitter.SubmitTx(closeTx)
if err != nil {
return fmt.Errorf("submitting close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "close successful")
fmt.Fprintln(a.logWriter(), "close successful")
// TODO: Move the triggering of this event handler to wherever we end up
// ingesting transactions, and trigger it after the channel becomes closed.
if a.OnClosed != nil {
Expand All @@ -498,7 +508,7 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error {
if err != nil {
return fmt.Errorf("confirming close: %v\n", err)
}
fmt.Fprintln(a.LogWriter, "close ready")
fmt.Fprintln(a.logWriter(), "close ready")

// Submit the close immediately since it is valid immediately.
_, closeTx, err := a.channel.CloseTxs()
Expand All @@ -509,12 +519,12 @@ func (a *Agent) handleCloseResponse(m msg.Message, send *msg.Encoder) error {
if err != nil {
return fmt.Errorf("hashing close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "submitting close", hash)
fmt.Fprintln(a.logWriter(), "submitting close", hash)
err = a.Submitter.SubmitTx(closeTx)
if err != nil {
return fmt.Errorf("submitting close tx: %w", err)
}
fmt.Fprintln(a.LogWriter, "close successful")
fmt.Fprintln(a.logWriter(), "close successful")
// TODO: Move the triggering of this event handler to wherever we end up
// ingesting transactions, and trigger it after the channel becomes closed.
if a.OnClosed != nil {
Expand Down

0 comments on commit f8743b4

Please sign in to comment.