Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(processor): Fix panic from nil items #413

Merged
merged 7 commits into from
Nov 29, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 17 additions & 4 deletions pkg/mimicry/p2p/execution/event_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,18 +152,31 @@ func (t TransactionExporter) Shutdown(ctx context.Context) error {
}

func (p *Peer) ExportTransactions(ctx context.Context, items []*TransactionHashItem) error {
if len(items) == 0 {
return nil
}

go func() {
hashes := make([]common.Hash, len(items))
seenMap := map[common.Hash]time.Time{}
var hashes []common.Hash

seenMap := make(map[common.Hash]time.Time, len(items))

for _, item := range items {
if item == nil {
continue
}

for i, item := range items {
exists := p.sharedCache.Transaction.Get(item.Hash.String())
if exists == nil {
hashes[i] = item.Hash
hashes = append(hashes, item.Hash)
seenMap[item.Hash] = item.Seen
}
}

if len(hashes) == 0 {
return
}

txs, err := p.client.GetPooledTransactions(ctx, hashes)
if err != nil {
p.log.WithError(err).Warn("Failed to get pooled transactions")
Expand Down
1 change: 1 addition & 0 deletions pkg/mimicry/p2p/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,7 @@ func (p *Peer) Start(ctx context.Context) (<-chan error, error) {
// I think we can get away with much higher as long as it doesn't go above the
// max client message size.
processor.WithMaxExportBatchSize(50000),
processor.WithWorkers(1),
)
if err != nil {
return nil, err
Expand Down
32 changes: 29 additions & 3 deletions pkg/processor/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,14 @@ func (bvp *BatchItemProcessor[T]) Write(ctx context.Context, s []*T) error {
prepared := []*TraceableItem[T]{}

for _, i := range s[start:end] {
if i == nil {
bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1))

bvp.log.Warnf("Attempted to write a nil item. This item has been dropped. This probably shouldn't happen and is likely a bug.")

continue
}

item := &TraceableItem[T]{
item: i,
}
Expand Down Expand Up @@ -284,8 +292,18 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa
defer cancel()
}

// NOTE(review): this pre-sizes items to len(itemsBatch) and then `continue`s
// past nil entries, which leaves a nil *T hole at that index instead of a
// shorter slice — so the exporter can still receive nils despite upstream
// filtering. Prefer make([]*T, 0, len(itemsBatch)) with append, mirroring
// the filtering done in Write.
items := make([]*T, len(itemsBatch))

for i, item := range itemsBatch {
if item == nil {
bvp.log.Warnf("Attempted to export a nil item. This item has been dropped. This probably shouldn't happen and is likely a bug.")

continue
}

items[i] = item.item
}

Expand All @@ -298,10 +316,10 @@ func (bvp *BatchItemProcessor[T]) exportWithTimeout(ctx context.Context, itemsBa
bvp.metrics.ObserveExportDuration(bvp.name, duration)

if err != nil {
bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(itemsBatch)))
bvp.metrics.IncItemsFailedBy(bvp.name, float64(len(items)))
} else {
bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(itemsBatch)))
bvp.metrics.ObserveBatchSize(bvp.name, float64(len(itemsBatch)))
bvp.metrics.IncItemsExportedBy(bvp.name, float64(len(items)))
bvp.metrics.ObserveBatchSize(bvp.name, float64(len(items)))
}

for _, item := range itemsBatch {
Expand Down Expand Up @@ -427,6 +445,14 @@ func (bvp *BatchItemProcessor[T]) batchBuilder(ctx context.Context) {

return
case item := <-bvp.queue:
if item == nil {
bvp.metrics.IncItemsDroppedBy(bvp.name, float64(1))

bvp.log.Warnf("Attempted to build a batch with a nil item. This item has been dropped. This probably shouldn't happen and is likely a bug.")

continue
}

batch = append(batch, item)

if len(batch) >= bvp.o.MaxExportBatchSize {
Expand Down
139 changes: 139 additions & 0 deletions pkg/processor/batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -784,3 +784,142 @@ func TestBatchItemProcessorQueueSize(t *testing.T) {

require.Equal(t, float64(itemsToExport-maxQueueSize), *metric.Counter.Value, "Dropped count should be equal to the number of items that exceeded the queue size")
}

// TestBatchItemProcessorNilItem verifies that the processor survives both a
// nil item inside a batch and a nil batch without panicking, and that it
// keeps accepting valid items afterwards.
func TestBatchItemProcessorNilItem(t *testing.T) {
	exporter := testBatchExporter[TestItem]{}
	m := NewMetrics("test")

	proc, err := NewBatchItemProcessor[TestItem](
		&exporter,
		"processor",
		nullLogger(),
		WithBatchTimeout(10*time.Millisecond),
		WithMaxQueueSize(5),
		WithMaxExportBatchSize(5),
		WithWorkers(1),
		WithShippingMethod(ShippingMethodSync),
		WithMetrics(m),
	)
	require.NoError(t, err)

	proc.Start(context.Background())

	// A batch containing a single nil item must be dropped, not exported.
	require.NoError(t, proc.Write(context.Background(), []*TestItem{nil}))

	// A nil batch must be a no-op.
	require.NoError(t, proc.Write(context.Background(), nil))

	// Allow the worker time to drain the queue.
	time.Sleep(500 * time.Millisecond)

	// The processor must still be alive and able to accept a real item.
	require.NoError(t, proc.Write(context.Background(), []*TestItem{{name: "test"}}))
}

// TestBatchItemProcessorNilExporter verifies that a processor constructed
// with a nil exporter surfaces an error on Write instead of panicking.
func TestBatchItemProcessorNilExporter(t *testing.T) {
	m := NewMetrics("test")

	proc, err := NewBatchItemProcessor[TestItem](
		nil,
		"processor",
		nullLogger(),
		WithBatchTimeout(10*time.Millisecond),
		WithMaxQueueSize(5),
		WithMaxExportBatchSize(5),
		WithWorkers(1),
		WithShippingMethod(ShippingMethodSync),
		WithMetrics(m),
	)
	require.NoError(t, err)

	proc.Start(context.Background())

	// With no exporter behind it, a synchronous write must fail.
	require.Error(t, proc.Write(context.Background(), []*TestItem{{name: "test"}}))
}

// TestBatchItemProcessorNilExporterAfterProcessing verifies that losing the
// exporter after items have already been exported causes subsequent writes
// to fail, while shutdown still completes without panicking.
func TestBatchItemProcessorNilExporterAfterProcessing(t *testing.T) {
	m := NewMetrics("test")
	exporter := &testBatchExporter[TestItem]{}

	proc, err := NewBatchItemProcessor[TestItem](
		exporter,
		"processor",
		nullLogger(),
		WithBatchTimeout(500*time.Millisecond),
		WithMaxQueueSize(5),
		WithMaxExportBatchSize(5),
		WithWorkers(1),
		WithShippingMethod(ShippingMethodAsync),
		WithMetrics(m),
	)
	require.NoError(t, err)

	proc.Start(context.Background())

	// First write goes through while the exporter is still present.
	require.NoError(t, proc.Write(context.Background(), []*TestItem{{name: "test"}}))

	// Let the async worker flush the batch.
	time.Sleep(1000 * time.Millisecond)

	// Simulate the exporter disappearing out from under the processor.
	proc.e = nil

	// Subsequent writes must now fail...
	require.Error(t, proc.Write(context.Background(), []*TestItem{{name: "test"}}))

	// ...but shutdown must still complete cleanly.
	require.NotPanics(t, func() {
		require.NoError(t, proc.Shutdown(context.Background()))
	})
}

// TestBatchItemProcessorNilItemAfterQueue verifies that a nil item injected
// directly into the internal queue (bypassing Write's filtering) is dropped
// by the batch builder, while valid items before and after it are exported.
func TestBatchItemProcessorNilItemAfterQueue(t *testing.T) {
	m := NewMetrics("test")
	exporter := &testBatchExporter[TestItem]{}

	proc, err := NewBatchItemProcessor[TestItem](
		exporter,
		"processor",
		nullLogger(),
		WithBatchTimeout(500*time.Millisecond),
		WithMaxQueueSize(5),
		WithMaxExportBatchSize(5),
		WithWorkers(1),
		WithShippingMethod(ShippingMethodAsync),
		WithMetrics(m),
	)
	require.NoError(t, err)

	proc.Start(context.Background())

	item := &TestItem{name: "test"}

	// A valid item written before the nil injection.
	require.NoError(t, proc.Write(context.Background(), []*TestItem{item}))

	// Bypass Write's filtering and push a nil straight onto the queue.
	proc.queue <- nil

	// A valid item written after the nil proves the builder kept running.
	require.NoError(t, proc.Write(context.Background(), []*TestItem{item}))

	// Give the async worker time to drain everything.
	time.Sleep(1000 * time.Millisecond)

	// Shutdown must not panic after handling the nil entry.
	require.NotPanics(t, func() {
		require.NoError(t, proc.Shutdown(context.Background()))
	})

	// Both valid items — and only those — must have been exported.
	require.Equal(t, 2, exporter.len())
}
Loading