Skip to content

Commit

Permalink
Add logs for sequencer (#14)
Browse files Browse the repository at this point in the history
* add log statistics

* add reset and batch number

* total duration with milliseconds

* add batch gas used statistics
  • Loading branch information
giskook authored Oct 12, 2023
1 parent 2c7466e commit 6fff3f4
Show file tree
Hide file tree
Showing 4 changed files with 214 additions and 0 deletions.
38 changes: 38 additions & 0 deletions sequencer/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"math/big"
"strconv"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -332,7 +333,10 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {

tx := f.worker.GetBestFittingTx(f.batch.remainingResources)
metrics.WorkerProcessingTime(time.Since(start))
metrics.GetLogStatistics().CumulativeTiming(metrics.GetTx, time.Since(start))

if tx != nil {
metrics.GetLogStatistics().CumulativeCounting(metrics.TxCounter)
log.Debugf("processing tx: %s", tx.Hash.Hex())

// reset the count of effective GasPrice process attempts (since the tx may have been tried to be processed before)
Expand All @@ -344,20 +348,25 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {
if err != nil {
if err == ErrEffectiveGasPriceReprocess {
log.Info("reprocessing tx because of effective gas price calculation: %s", tx.Hash.Hex())
metrics.GetLogStatistics().CumulativeCounting(metrics.ReprocessingTxCounter)
continue
} else {
log.Errorf("failed to process transaction in finalizeBatches, Err: %v", err)
metrics.GetLogStatistics().CumulativeCounting(metrics.FailTxCounter)
break
}
}
metrics.GetLogStatistics().CumulativeValue(metrics.BatchGas, int64(tx.Gas))
break
}

f.sharedResourcesMux.Unlock()
} else {
// wait for new txs
// log.Debugf("no transactions to be processed. Sleeping for %v", f.cfg.SleepDuration.Duration)
if f.cfg.SleepDuration.Duration > 0 {
time.Sleep(f.cfg.SleepDuration.Duration)
metrics.GetLogStatistics().CumulativeCounting(metrics.GetTxPauseCounter)
}
}

Expand All @@ -369,10 +378,18 @@ func (f *finalizer) finalizeBatches(ctx context.Context) {

if f.isDeadlineEncountered() {
log.Infof("closing batch %d because deadline was encountered.", f.batch.batchNumber)
metrics.GetLogStatistics().SetTag(metrics.BatchCloseReason, "deadline")
f.finalizeBatch(ctx)
log.Infof(metrics.GetLogStatistics().Summary())
metrics.GetLogStatistics().ResetStatistics()
metrics.GetLogStatistics().UpdateTimestamp(metrics.NewRound, time.Now())
} else if f.isBatchFull() || f.isBatchAlmostFull() {
log.Infof("closing batch %d because it's almost full.", f.batch.batchNumber)
metrics.GetLogStatistics().SetTag(metrics.BatchCloseReason, "full")
f.finalizeBatch(ctx)
log.Infof(metrics.GetLogStatistics().Summary())
metrics.GetLogStatistics().ResetStatistics()
metrics.GetLogStatistics().UpdateTimestamp(metrics.NewRound, time.Now())
}

if err := ctx.Err(); err != nil {
Expand Down Expand Up @@ -412,8 +429,10 @@ func (f *finalizer) isBatchFull() bool {
// finalizeBatch retries to until successful closes the current batch and opens a new one, potentially processing forced batches between the batch is closed and the resulting new empty batch
func (f *finalizer) finalizeBatch(ctx context.Context) {
start := time.Now()
metrics.GetLogStatistics().SetTag(metrics.FinalizeBatchNumber, strconv.Itoa(int(f.batch.batchNumber)))
defer func() {
metrics.ProcessingTime(time.Since(start))
metrics.GetLogStatistics().CumulativeTiming(metrics.FinalizeBatchTiming, time.Since(start))
}()

var err error
Expand Down Expand Up @@ -495,6 +514,7 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
}

// Reprocess full batch as sanity check
tsReprocessFullBatch := time.Now()
if f.cfg.SequentialReprocessFullBatch {
// Do the full batch reprocess now
_, err := f.reprocessFullBatch(ctx, f.batch.batchNumber, f.batch.initialStateRoot, f.batch.stateRoot)
Expand All @@ -508,14 +528,18 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
_, _ = f.reprocessFullBatch(ctx, f.batch.batchNumber, f.batch.initialStateRoot, f.batch.stateRoot)
}()
}
metrics.GetLogStatistics().CumulativeTiming(metrics.FinalizeBatchReprocessFullBatch, time.Since(tsReprocessFullBatch))

// Close the current batch
tsCloseBatch := time.Now()
err = f.closeBatch(ctx)
if err != nil {
return nil, fmt.Errorf("failed to close batch, err: %w", err)
}
metrics.GetLogStatistics().CumulativeTiming(metrics.FinalizeBatchCloseBatch, time.Since(tsCloseBatch))

// Metadata for the next batch
tsOpenBatch := time.Now()
stateRoot := f.batch.stateRoot
lastBatchNumber := f.batch.batchNumber

Expand Down Expand Up @@ -544,6 +568,7 @@ func (f *finalizer) newWIPBatch(ctx context.Context) (*WipBatch, error) {
f.processRequest.GlobalExitRoot = batch.globalExitRoot
f.processRequest.Transactions = make([]byte, 0, 1)
}
metrics.GetLogStatistics().CumulativeTiming(metrics.FinalizeBatchOpenBatch, time.Since(tsOpenBatch))

return batch, err
}
Expand All @@ -558,6 +583,9 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) (errW
start := time.Now()
defer func() {
metrics.ProcessingTime(time.Since(start))
if tx != nil {
metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingTxTiming, time.Since(start))
}
}()

if f.batch.isEmpty() {
Expand Down Expand Up @@ -624,6 +652,7 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) (errW
}

log.Infof("processTransaction: single tx. Batch.BatchNumber: %d, BatchNumber: %d, OldStateRoot: %s, txHash: %s, GER: %s", f.batch.batchNumber, f.processRequest.BatchNumber, f.processRequest.OldStateRoot, hashStr, f.processRequest.GlobalExitRoot.String())
tsCommit := time.Now()
processBatchResponse, err := f.executor.ProcessBatch(ctx, f.processRequest, true)
if err != nil && errors.Is(err, runtime.ErrExecutorDBError) {
log.Errorf("failed to process transaction: %s", err)
Expand All @@ -643,10 +672,15 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) (errW
log.Errorf("failed to update status to invalid in the pool for tx: %s, err: %s", tx.Hash.String(), err)
} else {
metrics.TxProcessed(metrics.TxProcessedLabelInvalid, 1)
metrics.GetLogStatistics().CumulativeCounting(metrics.ProcessingInvalidTxCounter)
}
return nil, err
}
if tx != nil {
metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingTxCommit, time.Since(tsCommit))
}

tsProcessResponse := time.Now()
oldStateRoot := f.batch.stateRoot
if len(processBatchResponse.Responses) > 0 && tx != nil {
errWg, err = f.handleProcessTransactionResponse(ctx, tx, processBatchResponse, oldStateRoot)
Expand All @@ -660,6 +694,10 @@ func (f *finalizer) processTransaction(ctx context.Context, tx *TxTracker) (errW
f.batch.localExitRoot = processBatchResponse.NewLocalExitRoot
log.Infof("processTransaction: data loaded in memory. batch.batchNumber: %d, batchNumber: %d, result.NewStateRoot: %s, result.NewLocalExitRoot: %s, oldStateRoot: %s", f.batch.batchNumber, f.processRequest.BatchNumber, processBatchResponse.NewStateRoot.String(), processBatchResponse.NewLocalExitRoot.String(), oldStateRoot.String())

if tx != nil {
metrics.GetLogStatistics().CumulativeTiming(metrics.ProcessingTxResponse, time.Since(tsProcessResponse))
}

return nil, nil
}

Expand Down
40 changes: 40 additions & 0 deletions sequencer/metrics/logstatistics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package metrics

import (
"time"
)

type LogTag string

type LogStatistics interface {
CumulativeCounting(tag LogTag)
CumulativeValue(tag LogTag, value int64)
CumulativeTiming(tag LogTag, duration time.Duration)
SetTag(tag LogTag, value string)
Summary() string
ResetStatistics()

UpdateTimestamp(tag LogTag, tm time.Time)
}

const (
TxCounter LogTag = "TxCounter"
GetTx LogTag = "GetTx"
GetTxPauseCounter LogTag = "GetTxPauseCounter"
BatchCloseReason LogTag = "BatchCloseReason"
ReprocessingTxCounter LogTag = "ReProcessingTxCounter"
FailTxCounter LogTag = "FailTxCounter"
NewRound LogTag = "NewRound"
BatchGas LogTag = "BatchGas"

ProcessingTxTiming LogTag = "ProcessingTxTiming"
ProcessingInvalidTxCounter LogTag = "ProcessingInvalidTxCounter"
ProcessingTxCommit LogTag = "ProcessingTxCommit"
ProcessingTxResponse LogTag = "ProcessingTxResponse"

FinalizeBatchTiming LogTag = "FinalizeBatchTiming"
FinalizeBatchNumber LogTag = "FinalizeBatchNumber"
FinalizeBatchReprocessFullBatch LogTag = "FinalizeBatchReprocessFullBatch"
FinalizeBatchCloseBatch LogTag = "FinalizeBatchCloseBatch"
FinalizeBatchOpenBatch LogTag = "FinalizeBatchOpenBatch"
)
85 changes: 85 additions & 0 deletions sequencer/metrics/logstatisticsimpl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package metrics

import (
"strconv"
"sync"
"time"
)

var instance *logStatisticsInstance
var once sync.Once

func GetLogStatistics() LogStatistics {
once.Do(func() {
instance = &logStatisticsInstance{}
instance.init()
})
return instance
}

type logStatisticsInstance struct {
timestamp map[LogTag]time.Time
statistics map[LogTag]int64 // value maybe the counter or time.Duration(ms)
tags map[LogTag]string
}

func (l *logStatisticsInstance) init() {
l.timestamp = make(map[LogTag]time.Time)
l.statistics = make(map[LogTag]int64)
l.tags = make(map[LogTag]string)
}

func (l *logStatisticsInstance) CumulativeCounting(tag LogTag) {
l.statistics[tag]++
}

func (l *logStatisticsInstance) CumulativeValue(tag LogTag, value int64) {
l.statistics[tag] += value
}

func (l *logStatisticsInstance) CumulativeTiming(tag LogTag, duration time.Duration) {
l.statistics[tag] += duration.Milliseconds()
}

func (l *logStatisticsInstance) SetTag(tag LogTag, value string) {
l.tags[tag] = value
}

func (l *logStatisticsInstance) UpdateTimestamp(tag LogTag, tm time.Time) {
l.timestamp[tag] = tm
}

func (l *logStatisticsInstance) ResetStatistics() {
l.statistics = make(map[LogTag]int64)
l.tags = make(map[LogTag]string)
}

func (l *logStatisticsInstance) Summary() string {
batchTotalDuration := "-"
if key, ok := l.timestamp[NewRound]; ok {
batchTotalDuration = strconv.Itoa(int(time.Since(key).Milliseconds()))
}
processTxTiming := "ProcessTx<" + strconv.Itoa(int(l.statistics[ProcessingTxTiming])) + "ms, " +
"Commit<" + strconv.Itoa(int(l.statistics[ProcessingTxCommit])) + "ms>, " +
"ProcessResponse<" + strconv.Itoa(int(l.statistics[ProcessingTxResponse])) + "ms>>, "

finalizeBatchTiming := "FinalizeBatch<" + strconv.Itoa(int(l.statistics[FinalizeBatchTiming])) + "ms, " +
"ReprocessFullBatch<" + strconv.Itoa(int(l.statistics[FinalizeBatchReprocessFullBatch])) + "ms>, " +
"CloseBatch<" + strconv.Itoa(int(l.statistics[FinalizeBatchCloseBatch])) + "ms>, " +
"OpenBatch<" + strconv.Itoa(int(l.statistics[FinalizeBatchOpenBatch])) + "ms>>, "

result := "Batch<" + l.tags[FinalizeBatchNumber] + ">, " +
"TotalDuration<" + batchTotalDuration + "ms>, " +
"GasUsed<" + strconv.Itoa(int(l.statistics[BatchGas])) + ">, " +
"Tx<" + strconv.Itoa(int(l.statistics[TxCounter])) + ">, " +
"GetTx<" + strconv.Itoa(int(l.statistics[GetTx])) + "ms>, " +
"GetTxPause<" + strconv.Itoa(int(l.statistics[GetTxPauseCounter])) + ">, " +
"ReprocessTx<" + strconv.Itoa(int(l.statistics[ReprocessingTxCounter])) + ">, " +
"FailTx<" + strconv.Itoa(int(l.statistics[FailTxCounter])) + ">, " +
"InvalidTx<" + strconv.Itoa(int(l.statistics[ProcessingInvalidTxCounter])) + ">, " +
processTxTiming +
finalizeBatchTiming +
"BatchCloseReason<" + l.tags[BatchCloseReason] + ">"

return result
}
51 changes: 51 additions & 0 deletions sequencer/metrics/logstatisticsimpl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package metrics

import (
"testing"
"time"
)

func Test_logStatisticsInstance_Summary(t *testing.T) {
type fields struct {
timestamp map[LogTag]time.Time
statistics map[LogTag]int64
tags map[LogTag]string
}
tests := []struct {
name string
fields fields
want string
}{
// TODO: Add test cases.
{"1", fields{
timestamp: map[LogTag]time.Time{NewRound: time.Now().Add(-time.Second)},
statistics: map[LogTag]int64{
BatchGas: 111111,
TxCounter: 10,
GetTx: time.Second.Milliseconds(),
GetTxPauseCounter: 2,
ReprocessingTxCounter: 3,
FailTxCounter: 1,
ProcessingInvalidTxCounter: 2,
ProcessingTxTiming: time.Second.Milliseconds() * 30,
ProcessingTxCommit: time.Second.Milliseconds() * 10,
ProcessingTxResponse: time.Second.Milliseconds() * 15,
FinalizeBatchTiming: time.Second.Milliseconds() * 50,
FinalizeBatchReprocessFullBatch: time.Second.Milliseconds() * 20,
FinalizeBatchCloseBatch: time.Second.Milliseconds() * 10,
FinalizeBatchOpenBatch: time.Second.Milliseconds() * 10,
},
tags: map[LogTag]string{BatchCloseReason: "deadline", FinalizeBatchNumber: "123"},
}, "test"},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := &logStatisticsInstance{
timestamp: tt.fields.timestamp,
statistics: tt.fields.statistics,
tags: tt.fields.tags,
}
t.Log(l.Summary())
})
}
}

0 comments on commit 6fff3f4

Please sign in to comment.