Skip to content

Commit

Permalink
Merge branch 'master' into deprecate_span_cache
Browse files Browse the repository at this point in the history
  • Loading branch information
rauljordan authored Apr 21, 2020
2 parents 2654dee + 8cba109 commit 666a5c6
Show file tree
Hide file tree
Showing 6 changed files with 116 additions and 3 deletions.
2 changes: 1 addition & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ http_archive(

go_repository(
name = "com_github_ethereum_go_ethereum",
commit = "861ae1b1875c17d86a6a5d68118708ab2b099658",
commit = "0beb54b2147b3473a4c55e5ce6f02643ce403b14",
importpath = "github.com/ethereum/go-ethereum",
# Note: go-ethereum is not bazel-friendly with regards to cgo. We have a
# a fork that has resolved these issues by disabling HID/USB support and
Expand Down
6 changes: 5 additions & 1 deletion beacon-chain/state/stategen/migrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ func (s *State) MigrateToCold(ctx context.Context, finalizedSlot uint64, finaliz
if !s.beaconDB.HasState(ctx, r) {
recoveredArchivedState, err := s.ComputeStateUpToSlot(ctx, stateSummary.Slot)
if err != nil {
return err
// For whatever reason if node fails to generate archived state of a certain slot,
// a node should just skip that slot rather than fail to whole process block routine.
// Missing an archived point of a certain slot is less of a deal than failing process block.
log.Warnf("Unable to generate archived state: %v", err)
continue
}
if err := s.beaconDB.SaveState(ctx, recoveredArchivedState.Copy(), r); err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion beacon-chain/state/stategen/replay.go
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ func (s *State) lastSavedState(ctx context.Context, slot uint64) (*state.BeaconS

lastSaved, err := s.beaconDB.HighestSlotStatesBelow(ctx, slot+1)
if err != nil {
return nil, errUnknownState
return nil, err
}

// Given this is used to query canonical state. There should only be one saved canonical block of a given slot.
Expand Down
2 changes: 2 additions & 0 deletions slasher/db/iface/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type ReadOnlyDatabase interface {
// MinMaxSpan related methods.
EpochSpansMap(ctx context.Context, epoch uint64) (map[uint64]detectionTypes.Span, error)
EpochSpanByValidatorIndex(ctx context.Context, validatorIdx uint64, epoch uint64) (detectionTypes.Span, error)
EpochsSpanByValidatorsIndices(ctx context.Context, validatorIndices []uint64, maxEpoch uint64) (map[uint64]map[uint64]detectionTypes.Span, error)

// ProposerSlashing related methods.
ProposalSlashingsByStatus(ctx context.Context, status types.SlashingStatus) ([]*ethpb.ProposerSlashing, error)
Expand Down Expand Up @@ -64,6 +65,7 @@ type WriteAccessDatabase interface {
SaveEpochSpansMap(ctx context.Context, epoch uint64, spanMap map[uint64]detectionTypes.Span) error
SaveValidatorEpochSpan(ctx context.Context, validatorIdx uint64, epoch uint64, spans detectionTypes.Span) error
SaveCachedSpansMaps(ctx context.Context) error
SaveEpochsSpanByValidatorsIndices(ctx context.Context, epochsSpans map[uint64]map[uint64]detectionTypes.Span) error
DeleteEpochSpans(ctx context.Context, validatorIdx uint64) error
DeleteValidatorSpanByEpoch(ctx context.Context, validatorIdx uint64, epoch uint64) error

Expand Down
62 changes: 62 additions & 0 deletions slasher/db/kv/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,68 @@ func (db *Store) EpochSpanByValidatorIndex(ctx context.Context, validatorIdx uin
return spans, err
}

// EpochsSpanByValidatorsIndices accepts validator indices and epoch and
// returns all their previous corresponding spans for slashing detection epoch=> validator index => spammap.
// Returns empty map if no values exists and error on db error.
func (db *Store) EpochsSpanByValidatorsIndices(ctx context.Context, validatorIndices []uint64, maxEpoch uint64) (map[uint64]map[uint64]types.Span, error) {
ctx, span := trace.StartSpan(ctx, "slasherDB.EpochsSpanByValidatorsIndices")
defer span.End()

var err error
epochsSpanMap := make(map[uint64]map[uint64]types.Span)
err = db.view(func(tx *bolt.Tx) error {
b := tx.Bucket(validatorsMinMaxSpanBucket)
epoch := maxEpoch
epochBucket := b.Bucket(bytesutil.Bytes8(epoch))

for epochBucket != nil {
valSpans := make(map[uint64]types.Span, len(validatorIndices))
for _, v := range validatorIndices {
enc := epochBucket.Get(bytesutil.Bytes8(v))
value, err := unmarshalSpan(ctx, enc)
if err != nil {
return err
}
valSpans[v] = value
}
epochsSpanMap[epoch] = valSpans
if epoch == 0 {
break
}
epoch--
epochBucket = b.Bucket(bytesutil.Bytes8(epoch))
}
return nil
})
return epochsSpanMap, err
}

// SaveEpochsSpanByValidatorsIndices accepts epochs span maps by validator indices and
// writes them to db.
// Returns error on db write error.
func (db *Store) SaveEpochsSpanByValidatorsIndices(ctx context.Context, epochsSpans map[uint64]map[uint64]types.Span) error {
ctx, span := trace.StartSpan(ctx, "slasherDB.SaveEpochsSpanByValidatorsIndices")
defer span.End()

err := db.update(func(tx *bolt.Tx) error {
b := tx.Bucket(validatorsMinMaxSpanBucket)
for epoch, indicesSpanMaps := range epochsSpans {
epochBucket, err := b.CreateBucketIfNotExists(bytesutil.Bytes8(epoch))
if err != nil {
return err
}
for idx, v := range indicesSpanMaps {
enc := marshalSpan(v)
if err := epochBucket.Put(bytesutil.Bytes8(idx), enc); err != nil {
return err
}
}
}
return nil
})
return err
}

// SaveValidatorEpochSpan accepts validator index epoch and spans returns.
// it reads the epoch spans from cache, updates it and save it back to cache
// if caching is enabled.
Expand Down
45 changes: 45 additions & 0 deletions slasher/db/kv/spanner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -265,3 +265,48 @@ func TestValidatorSpanMap_SaveCachedSpansMaps(t *testing.T) {
}
}
}

func TestStore_ReadWriteEpochsSpanByValidatorsIndices(t *testing.T) {
app := cli.App{}
set := flag.NewFlagSet("test", 0)
db := setupDB(t, cli.NewContext(&app, set, nil))
defer teardownDB(t, db)
ctx := context.Background()

for _, tt := range spanTests {
err := db.SaveEpochSpansMap(ctx, tt.epoch, tt.spanMap)
if err != nil {
t.Fatalf("Save validator span map failed: %v", err)
}
}
res, err := db.EpochsSpanByValidatorsIndices(ctx, []uint64{1, 2, 3}, 3)
if err != nil {
t.Fatal(err)
}
if len(res) != len(spanTests) {
t.Errorf("Wanted map of %d elemets, received map of %d elements", len(spanTests), len(res))
}
for _, tt := range spanTests {
if !reflect.DeepEqual(res[tt.epoch], tt.spanMap) {
t.Errorf("Wanted span map to be equal to: %v , received span map: %v ", spanTests[0].spanMap, res[1])
}
}
teardownDB(t, db)
db = setupDB(t, cli.NewContext(&app, set, nil))
if err := db.SaveEpochsSpanByValidatorsIndices(ctx, res); err != nil {
t.Fatal(err)
}
res, err = db.EpochsSpanByValidatorsIndices(ctx, []uint64{1, 2, 3}, 3)
if err != nil {
t.Fatal(err)
}
if len(res) != len(spanTests) {
t.Errorf("Wanted map of %d elemets, received map of %d elements", len(spanTests), len(res))
}
for _, tt := range spanTests {
if !reflect.DeepEqual(res[tt.epoch], tt.spanMap) {
t.Errorf("Wanted span map to be equal to: %v , received span map: %v ", spanTests[0].spanMap, res[1])
}
}

}

0 comments on commit 666a5c6

Please sign in to comment.