Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/bloombits: use atomic type #26993

Merged
merged 1 commit into from
Mar 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions core/bloombits/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ type Matcher struct {
retrievals chan chan *Retrieval // Retriever processes waiting for task allocations
deliveries chan *Retrieval // Retriever processes waiting for task response deliveries

running uint32 // Atomic flag whether a session is live or not
running atomic.Bool // Atomic flag whether a session is live or not
}

// NewMatcher creates a new pipeline for retrieving bloom bit streams and doing
Expand Down Expand Up @@ -146,10 +146,10 @@ func (m *Matcher) addScheduler(idx uint) {
// channel is closed.
func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uint64) (*MatcherSession, error) {
// Make sure we're not creating concurrent sessions
if atomic.SwapUint32(&m.running, 1) == 1 {
if m.running.Swap(true) {
return nil, errors.New("matcher already running")
}
defer atomic.StoreUint32(&m.running, 0)
defer m.running.Store(false)

// Initiate a new matching round
session := &MatcherSession{
Expand Down
12 changes: 6 additions & 6 deletions core/bloombits/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, in
}
}
// Track the number of retrieval requests made
var requested uint32
var requested atomic.Uint32

// Start the matching session for the filter and the retriever goroutines
quit := make(chan struct{})
Expand Down Expand Up @@ -208,15 +208,15 @@ func testMatcher(t *testing.T, filter [][]bloomIndexes, start, blocks uint64, in
session.Close()
close(quit)

if retrievals != 0 && requested != retrievals {
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested, retrievals)
if retrievals != 0 && requested.Load() != retrievals {
t.Errorf("filter = %v blocks = %v intermittent = %v: request count mismatch, have #%v, want #%v", filter, blocks, intermittent, requested.Load(), retrievals)
}
return requested
return requested.Load()
}

// startRetrievers starts a batch of goroutines listening for section requests
// and serving them.
func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *uint32, batch int) {
func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *atomic.Uint32, batch int) {
requests := make(chan chan *Retrieval)

for i := 0; i < 10; i++ {
Expand All @@ -238,7 +238,7 @@ func startRetrievers(session *MatcherSession, quit chan struct{}, retrievals *ui
for i, section := range task.Sections {
if rand.Int()%4 != 0 { // Handle occasional missing deliveries
task.Bitsets[i] = generateBitset(task.Bit, section)
atomic.AddUint32(retrievals, 1)
retrievals.Add(1)
}
}
request <- task
Expand Down
6 changes: 3 additions & 3 deletions core/bloombits/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,13 @@ func testScheduler(t *testing.T, clients int, fetchers int, requests int) {
fetch := make(chan *request, 16)
defer close(fetch)

var delivered uint32
var delivered atomic.Uint32
for i := 0; i < fetchers; i++ {
go func() {
defer fetchPend.Done()

for req := range fetch {
atomic.AddUint32(&delivered, 1)
delivered.Add(1)

f.deliver([]uint64{
req.section + uint64(requests), // Non-requested data (ensure it doesn't go out of bounds)
Expand Down Expand Up @@ -97,7 +97,7 @@ func testScheduler(t *testing.T, clients int, fetchers int, requests int) {
}
pend.Wait()

if have := atomic.LoadUint32(&delivered); int(have) != requests {
if have := delivered.Load(); int(have) != requests {
t.Errorf("request count mismatch: have %v, want %v", have, requests)
}
}