Skip to content

Commit

Permalink
Fix data columns sampling (#14263)
Browse files Browse the repository at this point in the history
* Fix the obvious...

* Data columns sampling: Modify logging.

* `waitForChainStart`: Make it thread-safe — only wait once.

* Sampling: Wait for chain start before running the sampling.

Reason: `newDataColumnSampler1D` needs `s.ctxMap`.
`s.ctxMap` is only set when chain is started.

Previously `waitForChainStart` was only called in `s.registerHandlers`, itself called in a goroutine.

==> We had a race condition here: sometimes `newDataColumnSampler1D` was called after `s.ctxMap` was set, sometimes not.

* Address Nishant's comments.

* Sampling: Improve logging.

* `waitForChainStart`: Remove `chainIsStarted` check.
  • Loading branch information
nalepae committed Nov 25, 2024
1 parent dcd5225 commit 1dc6a40
Show file tree
Hide file tree
Showing 7 changed files with 30 additions and 32 deletions.
40 changes: 20 additions & 20 deletions beacon-chain/sync/data_columns_sampling.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,6 @@ func (d *dataColumnSampler1D) refreshPeerInfo() {
}
}

log.WithField("columnFromPeer", d.columnFromPeer).Debug("Peer info refreshed")

columnWithNoPeers := make([]uint64, 0)
for column, peers := range d.peerFromColumn {
if len(peers) == 0 {
Expand Down Expand Up @@ -228,7 +226,7 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
return
}

if coreTime.PeerDASIsActive(data.Slot) {
if !coreTime.PeerDASIsActive(data.Slot) {
// We do not trigger sampling if peerDAS is not active yet.
return
}
Expand All @@ -249,22 +247,12 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
// Randomize columns for sample selection.
randomizedColumns := randomizeColumns(d.nonCustodyColumns)
samplesCount := min(params.BeaconConfig().SamplesPerSlot, uint64(len(d.nonCustodyColumns))-params.BeaconConfig().NumberOfColumns/2)
ok, _, err = d.incrementalDAS(ctx, data.BlockRoot, randomizedColumns, samplesCount)

// TODO: Use the first output of `incrementalDAS` as input of the fork choice rule.
_, _, err = d.incrementalDAS(ctx, data.BlockRoot, randomizedColumns, samplesCount)
if err != nil {
log.WithError(err).Error("Failed to run incremental DAS")
}

if ok {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", data.BlockRoot),
"columns": randomizedColumns,
}).Debug("Data column sampling successful")
} else {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", data.BlockRoot),
"columns": randomizedColumns,
}).Warning("Data column sampling failed")
}
}

// incrementalDAS samples data columns from active peers using incremental DAS.
Expand All @@ -280,17 +268,28 @@ func (d *dataColumnSampler1D) incrementalDAS(
firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary.

start := time.Now()

for round := 1; ; /*No exit condition */ round++ {
if extendedSampleCount > uint64(len(columns)) {
// We already tried to sample all possible columns, this is the unhappy path.
log.WithField("root", fmt.Sprintf("%#x", root)).Warning("Some columns are still missing after sampling all possible columns")
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"round": round - 1,
}).Warning("Some columns are still missing after trying to sample all possible columns")
return false, roundSummaries, nil
}

// Get the columns to sample for this round.
columnsToSample := columns[firstColumnToSample:extendedSampleCount]
columnsToSampleCount := extendedSampleCount - firstColumnToSample

log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"columns": columnsToSample,
"round": round,
}).Debug("Start data columns sampling")

// Sample data columns from peers in parallel.
retrievedSamples := d.sampleDataColumns(ctx, root, columnsToSample)

Expand All @@ -311,7 +310,8 @@ func (d *dataColumnSampler1D) incrementalDAS(
// All columns were correctly sampled, this is the happy path.
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"roundsNeeded": round,
"neededRounds": round,
"duration": time.Since(start),
}).Debug("All columns were successfully sampled")
return true, roundSummaries, nil
}
Expand Down Expand Up @@ -429,14 +429,14 @@ func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"requestedColumns": sortedSliceFromMap(requestedColumns),
}).Debug("All requested columns were successfully sampled from peer")
}).Debug("Sampled columns from peer successfully")
} else {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"requestedColumns": sortedSliceFromMap(requestedColumns),
"retrievedColumns": sortedSliceFromMap(retrievedColumns),
}).Debug("Some requested columns were not sampled from peer")
}).Debug("Sampled columns from peer with some errors")
}

return retrievedColumns
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,11 @@ go_test(
"@com_github_crate_crypto_go_kzg_4844//:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enode:go_default_library",
"@com_github_ethereum_go_ethereum//p2p/enr:go_default_library",
"@com_github_libp2p_go_libp2p//:go_default_library",
"@com_github_libp2p_go_libp2p//core:go_default_library",
"@com_github_libp2p_go_libp2p//core/crypto:go_default_library",
"@com_github_libp2p_go_libp2p//core/network:go_default_library",
"@com_github_libp2p_go_libp2p//core/peer:go_default_library",
"@com_github_libp2p_go_libp2p//p2p/net/swarm/testing:go_default_library",
"@com_github_paulbellamy_ratecounter//:go_default_library",
"@com_github_sirupsen_logrus//:go_default_library",
"@com_github_sirupsen_logrus//hooks/test:go_default_library",
Expand Down
4 changes: 2 additions & 2 deletions beacon-chain/sync/initial-sync/blocks_fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,11 +16,11 @@ import (
GoKZG "github.com/crate-crypto/go-kzg-4844"
"github.com/ethereum/go-ethereum/p2p/enode"
"github.com/ethereum/go-ethereum/p2p/enr"
"github.com/libp2p/go-libp2p"
libp2pcore "github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
swarmt "github.com/libp2p/go-libp2p/p2p/net/swarm/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/kzg"
mock "github.com/prysmaticlabs/prysm/v5/beacon-chain/blockchain/testing"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas"
Expand Down Expand Up @@ -1485,7 +1485,7 @@ func createAndConnectPeer(
require.NoError(t, err)

// Create the peer.
peer := p2ptest.NewTestP2P(t, swarmt.OptPeerPrivateKey(privateKey))
peer := p2ptest.NewTestP2P(t, libp2p.Identity(privateKey))

// Create a call counter.
countFromRequest := make(map[string]int, len(peerParams.toRespond))
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_data_column_sidecars_by_root.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
"requested": requestedColumnsList,
"custodiedCount": len(custodiedColumnsList),
"requestedCount": len(requestedColumnsList),
}).Debug("Received data column sidecar by root request")
}).Debug("Data column sidecar by root request received")

// Subscribe to the data column feed.
rootIndexChan := make(chan filesystem.RootIndexPair)
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/sync/rpc_send_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,6 @@ func SendDataColumnSidecarByRoot(
}

// Send the request to the peer.
log.WithField("topic", topic).Debug("Sending data column sidecar request")
stream, err := p2pApi.Send(ctx, req, topic, pid)
if err != nil {
return nil, errors.Wrap(err, "send")
Expand Down
12 changes: 6 additions & 6 deletions beacon-chain/sync/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,12 +253,6 @@ func (s *Service) Start() {

// Update sync metrics.
async.RunEvery(s.ctx, syncMetricsInterval, s.updateMetrics)

// Run data column sampling
if params.PeerDASEnabled() {
s.sampler = newDataColumnSampler1D(s.cfg.p2p, s.cfg.clock, s.ctxMap, s.cfg.stateNotifier)
go s.sampler.Run(s.ctx)
}
}

// Stop the regular sync service.
Expand Down Expand Up @@ -359,6 +353,12 @@ func (s *Service) startTasksPostInitialSync() {
// Start the fork watcher.
go s.forkWatcher()

// Start data columns sampling if peerDAS is enabled.
if params.PeerDASEnabled() {
s.sampler = newDataColumnSampler1D(s.cfg.p2p, s.cfg.clock, s.ctxMap, s.cfg.stateNotifier)
go s.sampler.Run(s.ctx)
}

case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
}
Expand Down
1 change: 0 additions & 1 deletion beacon-chain/verification/initializer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@ import (
"github.com/prysmaticlabs/prysm/v5/network/forks"
ethpb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"github.com/prysmaticlabs/prysm/v5/time/slots"
log "github.com/sirupsen/logrus"
)

// Forkchoicer represents the forkchoice methods that the verifiers need.
Expand Down

0 comments on commit 1dc6a40

Please sign in to comment.