Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core/bloombits: use single channel for shutdown #20878

Merged
merged 5 commits into from
Jul 29, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 24 additions & 35 deletions core/bloombits/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,6 @@ func (m *Matcher) Start(ctx context.Context, begin, end uint64, results chan uin
session := &MatcherSession{
matcher: m,
quit: make(chan struct{}),
kill: make(chan struct{}),
ctx: ctx,
}
for _, scheduler := range m.schedulers {
Expand Down Expand Up @@ -386,10 +385,8 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
requests = make(map[uint][]uint64) // Per-bit list of section requests, ordered by section number
unallocs = make(map[uint]struct{}) // Bits with pending requests but not allocated to any retriever
retrievers chan chan uint // Waiting retrievers (toggled to nil if unallocs is empty)
)
var (
allocs int // Number of active allocations to handle graceful shutdown requests
shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
allocs int // Number of active allocations to handle graceful shutdown requests
shutdown = session.quit // Shutdown request channel, will gracefully wait for pending requests
)

// assign is a helper method fo try to assign a pending bit an actively
Expand All @@ -409,15 +406,12 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
for {
select {
case <-shutdown:
// Graceful shutdown requested, wait until all pending requests are honoured
// Shutdown requested. No more retrievers can be allocated,
// but we still need to wait until all pending requests have returned.
shutdown = nil
if allocs == 0 {
return
}
shutdown = nil

case <-session.kill:
// Pending requests not honoured in time, hard terminate
return

case req := <-dist:
// New retrieval request arrived to be distributed to some fetcher process
Expand Down Expand Up @@ -499,8 +493,9 @@ func (m *Matcher) distributor(dist chan *request, session *MatcherSession) {
assign(result.Bit)
}
}
// If we're in the process of shutting down, terminate
if allocs == 0 && shutdown == nil {

// End the session when all pending deliveries have arrived.
if shutdown == nil && allocs == 0 {
return
}
}
Expand All @@ -514,7 +509,6 @@ type MatcherSession struct {

closer sync.Once // Sync object to ensure we only ever close once
quit chan struct{} // Quit channel to request pipeline termination
kill chan struct{} // Term channel to signal non-graceful forced shutdown

ctx context.Context // Context used by the light client to abort filtering
err atomic.Value // Global error to track retrieval failures deep in the chain
Expand All @@ -529,7 +523,6 @@ func (s *MatcherSession) Close() {
s.closer.Do(func() {
// Signal termination and wait for all goroutines to tear down
close(s.quit)
time.AfterFunc(time.Second, func() { close(s.kill) })
s.pend.Wait()
})
}
Expand All @@ -542,10 +535,10 @@ func (s *MatcherSession) Error() error {
return nil
}

// AllocateRetrieval assigns a bloom bit index to a client process that can either
// allocateRetrieval assigns a bloom bit index to a client process that can either
// immediately request and fetch the section contents assigned to this bit or wait
// a little while for more sections to be requested.
func (s *MatcherSession) AllocateRetrieval() (uint, bool) {
func (s *MatcherSession) allocateRetrieval() (uint, bool) {
fetcher := make(chan uint)

select {
Expand All @@ -557,9 +550,9 @@ func (s *MatcherSession) AllocateRetrieval() (uint, bool) {
}
}

// PendingSections returns the number of pending section retrievals belonging to
// pendingSections returns the number of pending section retrievals belonging to
// the given bloom bit index.
func (s *MatcherSession) PendingSections(bit uint) int {
func (s *MatcherSession) pendingSections(bit uint) int {
fetcher := make(chan uint)

select {
Expand All @@ -571,9 +564,9 @@ func (s *MatcherSession) PendingSections(bit uint) int {
}
}

// AllocateSections assigns all or part of an already allocated bit-task queue
// allocateSections assigns all or part of an already allocated bit-task queue
// to the requesting process.
func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 {
func (s *MatcherSession) allocateSections(bit uint, count int) []uint64 {
fetcher := make(chan *Retrieval)

select {
Expand All @@ -589,14 +582,10 @@ func (s *MatcherSession) AllocateSections(bit uint, count int) []uint64 {
}
}

// DeliverSections delivers a batch of section bit-vectors for a specific bloom
// deliverSections delivers a batch of section bit-vectors for a specific bloom
// bit index to be injected into the processing pipeline.
func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets [][]byte) {
select {
case <-s.kill:
ucwong marked this conversation as resolved.
Show resolved Hide resolved
return
case s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}:
}
func (s *MatcherSession) deliverSections(bit uint, sections []uint64, bitsets [][]byte) {
s.matcher.deliveries <- &Retrieval{Bit: bit, Sections: sections, Bitsets: bitsets}
}

// Multiplex polls the matcher session for retrieval tasks and multiplexes it into
Expand All @@ -608,31 +597,31 @@ func (s *MatcherSession) DeliverSections(bit uint, sections []uint64, bitsets []
func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan *Retrieval) {
for {
// Allocate a new bloom bit index to retrieve data for, stopping when done
bit, ok := s.AllocateRetrieval()
bit, ok := s.allocateRetrieval()
if !ok {
return
}
// Bit allocated, throttle a bit if we're below our batch limit
if s.PendingSections(bit) < batch {
if s.pendingSections(bit) < batch {
select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.AllocateSections(bit, 0)
s.DeliverSections(bit, []uint64{}, [][]byte{})
s.allocateSections(bit, 0)
s.deliverSections(bit, []uint64{}, [][]byte{})
return

case <-time.After(wait):
// Throttling up, fetch whatever's available
}
}
// Allocate as much as we can handle and request servicing
sections := s.AllocateSections(bit, batch)
sections := s.allocateSections(bit, batch)
request := make(chan *Retrieval)

select {
case <-s.quit:
// Session terminating, we can't meaningfully service, abort
s.DeliverSections(bit, sections, make([][]byte, len(sections)))
s.deliverSections(bit, sections, make([][]byte, len(sections)))
return

case mux <- request:
Expand All @@ -644,7 +633,7 @@ func (s *MatcherSession) Multiplex(batch int, wait time.Duration, mux chan chan
s.err.Store(result.Error)
s.Close()
}
s.DeliverSections(result.Bit, result.Sections, result.Bitsets)
s.deliverSections(result.Bit, result.Sections, result.Bitsets)
}
}
}