Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/dutydb: delete cancelled queries #1595

Merged
merged 2 commits into from
Jan 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 60 additions & 5 deletions core/dutydb/memory.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,11 +172,15 @@ func (db *MemDB) Store(_ context.Context, duty core.Duty, unsignedSet core.Unsig

// AwaitBeaconBlock implements core.DutyDB, see its godoc.
func (db *MemDB) AwaitBeaconBlock(ctx context.Context, slot int64) (*spec.VersionedBeaconBlock, error) {
db.mu.Lock()
cancel := make(chan struct{})
defer close(cancel)
response := make(chan *spec.VersionedBeaconBlock, 1)

db.mu.Lock()
db.proQueries = append(db.proQueries, proQuery{
Key: slot,
Response: response,
Cancel: cancel,
})
db.resolveProQueriesUnsafe()
db.mu.Unlock()
Expand All @@ -193,11 +197,15 @@ func (db *MemDB) AwaitBeaconBlock(ctx context.Context, slot int64) (*spec.Versio

// AwaitBlindedBeaconBlock implements core.DutyDB, see its godoc.
func (db *MemDB) AwaitBlindedBeaconBlock(ctx context.Context, slot int64) (*eth2api.VersionedBlindedBeaconBlock, error) {
db.mu.Lock()
cancel := make(chan struct{})
defer close(cancel)
response := make(chan *eth2api.VersionedBlindedBeaconBlock, 1)

db.mu.Lock()
db.builderProQueries = append(db.builderProQueries, builderProQuery{
Key: slot,
Response: response,
Cancel: cancel,
})
db.resolveBuilderProQueriesUnsafe()
db.mu.Unlock()
Expand All @@ -214,14 +222,18 @@ func (db *MemDB) AwaitBlindedBeaconBlock(ctx context.Context, slot int64) (*eth2

// AwaitAttestation implements core.DutyDB, see its godoc.
func (db *MemDB) AwaitAttestation(ctx context.Context, slot int64, commIdx int64) (*eth2p0.AttestationData, error) {
db.mu.Lock()
cancel := make(chan struct{})
defer close(cancel)
response := make(chan *eth2p0.AttestationData, 1) // Instance of one so resolving never blocks

db.mu.Lock()
db.attQueries = append(db.attQueries, attQuery{
Key: attKey{
Slot: slot,
CommIdx: commIdx,
},
Response: response,
Cancel: cancel,
})
db.resolveAttQueriesUnsafe()
db.mu.Unlock()
Expand All @@ -240,14 +252,18 @@ func (db *MemDB) AwaitAttestation(ctx context.Context, slot int64, commIdx int64
// and attestation when available.
func (db *MemDB) AwaitAggAttestation(ctx context.Context, slot int64, attestationRoot eth2p0.Root,
) (*eth2p0.Attestation, error) {
db.mu.Lock()
cancel := make(chan struct{})
defer close(cancel)
response := make(chan core.AggregatedAttestation, 1) // Instance of one so resolving never blocks

db.mu.Lock()
db.aggQueries = append(db.aggQueries, aggQuery{
Key: aggKey{
Slot: slot,
Root: attestationRoot,
},
Response: response,
Cancel: cancel,
})
db.resolveAggQueriesUnsafe()
db.mu.Unlock()
Expand Down Expand Up @@ -275,15 +291,19 @@ func (db *MemDB) AwaitAggAttestation(ctx context.Context, slot int64, attestatio
// AwaitSyncContribution blocks and returns the sync committee contribution data for the slot and
// the subcommittee and the beacon block root when available.
func (db *MemDB) AwaitSyncContribution(ctx context.Context, slot, subcommIdx int64, beaconBlockRoot eth2p0.Root) (*altair.SyncCommitteeContribution, error) {
db.mu.Lock()
cancel := make(chan struct{})
defer close(cancel)
response := make(chan *altair.SyncCommitteeContribution, 1) // Instance of one so resolving never blocks

db.mu.Lock()
db.contribQueries = append(db.contribQueries, contribQuery{
Key: contribKey{
Slot: slot,
SubcommIdx: subcommIdx,
Root: beaconBlockRoot,
},
Response: response,
Cancel: cancel,
})
db.resolveContribQueriesUnsafe()
db.mu.Unlock()
Expand Down Expand Up @@ -528,6 +548,10 @@ func (db *MemDB) storeBlindedBeaconBlockUnsafe(unsignedData core.UnsignedData) e
func (db *MemDB) resolveAttQueriesUnsafe() {
var unresolved []attQuery
for _, query := range db.attQueries {
if cancelled(query.Cancel) {
continue // Drop cancelled queries.
}

value, ok := db.attDuties[query.Key]
if !ok {
unresolved = append(unresolved, query)
Expand All @@ -545,6 +569,10 @@ func (db *MemDB) resolveAttQueriesUnsafe() {
func (db *MemDB) resolveProQueriesUnsafe() {
var unresolved []proQuery
for _, query := range db.proQueries {
if cancelled(query.Cancel) {
continue // Drop cancelled queries.
}

value, ok := db.proDuties[query.Key]
if !ok {
unresolved = append(unresolved, query)
Expand All @@ -562,6 +590,10 @@ func (db *MemDB) resolveProQueriesUnsafe() {
func (db *MemDB) resolveAggQueriesUnsafe() {
var unresolved []aggQuery
for _, query := range db.aggQueries {
if cancelled(query.Cancel) {
continue // Drop cancelled queries.
}

value, ok := db.aggDuties[query.Key]
if !ok {
unresolved = append(unresolved, query)
Expand All @@ -579,6 +611,10 @@ func (db *MemDB) resolveAggQueriesUnsafe() {
func (db *MemDB) resolveBuilderProQueriesUnsafe() {
var unresolved []builderProQuery
for _, query := range db.builderProQueries {
if cancelled(query.Cancel) {
continue // Drop cancelled queries.
}

value, ok := db.builderProDuties[query.Key]
if !ok {
unresolved = append(unresolved, query)
Expand All @@ -596,6 +632,10 @@ func (db *MemDB) resolveBuilderProQueriesUnsafe() {
func (db *MemDB) resolveContribQueriesUnsafe() {
var unresolved []contribQuery
for _, query := range db.contribQueries {
if cancelled(query.Cancel) {
continue // Drop cancelled queries.
}

contribution, ok := db.contribDuties[query.Key]
if !ok {
unresolved = append(unresolved, query)
Expand Down Expand Up @@ -668,28 +708,43 @@ type contribKey struct {
type attQuery struct {
Key attKey
Response chan<- *eth2p0.AttestationData
Cancel <-chan struct{}
}

// proQuery is a waiting proQuery with a response channel.
type proQuery struct {
Key int64
Response chan<- *spec.VersionedBeaconBlock
Cancel <-chan struct{}
}

// aggQuery is a waiting aggQuery with a response channel.
type aggQuery struct {
Key aggKey
Response chan<- core.AggregatedAttestation
Cancel <-chan struct{}
}

// builderProQuery is a waiting builderProQuery with a response channel.
type builderProQuery struct {
Key int64
Response chan<- *eth2api.VersionedBlindedBeaconBlock
Cancel <-chan struct{}
}

// contribQuery is a waiting contribQuery with a response channel.
type contribQuery struct {
Key contribKey
Response chan<- *altair.SyncCommitteeContribution
Cancel <-chan struct{}
}

// cancelled returns true if channel has been closed.
func cancelled(cancel <-chan struct{}) bool {
select {
case <-cancel:
return true
Comment on lines +745 to +746
Copy link
Contributor

@xenowits xenowits Jan 6, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case _, ok := <-cancel:
   return !ok

ok is false if there are no more values to receive and the channel is closed. https://go.dev/tour/concurrency/4

default:
return false
}
}
82 changes: 82 additions & 0 deletions core/dutydb/memory_internal_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
// Copyright © 2022 Obol Labs Inc.
//
// This program is free software: you can redistribute it and/or modify it
// under the terms of the GNU General Public License as published by the Free
// Software Foundation, either version 3 of the License, or (at your option)
// any later version.
//
// This program is distributed in the hope that it will be useful, but WITHOUT
// ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
// FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for
// more details.
//
// You should have received a copy of the GNU General Public License along with
// this program. If not, see <http://www.gnu.org/licenses/>.

package dutydb

import (
"context"
"testing"

eth2p0 "github.com/attestantio/go-eth2-client/spec/phase0"
"github.com/stretchr/testify/require"

"github.com/obolnetwork/charon/core"
)

func TestCancelledQueries(t *testing.T) {
ctx := context.Background()

db := NewMemDB(noopDeadliner{})
db.Shutdown()

const slot = 99

// Enqueue queries of each type.
_, err := db.AwaitAttestation(ctx, slot, 0)
require.ErrorContains(t, err, "shutdown")

_, err = db.AwaitAggAttestation(ctx, slot, eth2p0.Root{})
require.ErrorContains(t, err, "shutdown")

_, err = db.AwaitBeaconBlock(ctx, slot)
require.ErrorContains(t, err, "shutdown")

_, err = db.AwaitBlindedBeaconBlock(ctx, slot)
require.ErrorContains(t, err, "shutdown")

_, err = db.AwaitSyncContribution(ctx, slot, 0, eth2p0.Root{})
require.ErrorContains(t, err, "shutdown")

// Ensure all queries are preset.
require.NotEmpty(t, db.contribQueries)
require.NotEmpty(t, db.attQueries)
require.NotEmpty(t, db.proQueries)
require.NotEmpty(t, db.aggQueries)
require.NotEmpty(t, db.builderProQueries)

// Resolve queries
db.resolveAggQueriesUnsafe()
db.resolveAttQueriesUnsafe()
db.resolveContribQueriesUnsafe()
db.resolveProQueriesUnsafe()
db.resolveBuilderProQueriesUnsafe()

// Ensure all queries are gone.
require.Empty(t, db.contribQueries)
require.Empty(t, db.attQueries)
require.Empty(t, db.proQueries)
require.Empty(t, db.aggQueries)
require.Empty(t, db.builderProQueries)
}

type noopDeadliner struct{}

func (t noopDeadliner) Add(duty core.Duty) bool {
return true
}

func (t noopDeadliner) C() <-chan core.Duty {
return make(chan core.Duty)
}