Skip to content

Commit

Permalink
materialize-boilerplate: fix race condition in extended logger
Browse files Browse the repository at this point in the history
Materializations can send acknowledgement messages concurrently with reading
load or flush messages. This would show up occasionally as a panic due to
concurrent access of variables in the "waiting for documents" part of the
extendedLogger event handler.

This adds a mutex to guard that concurrent access.
  • Loading branch information
williamhbaker committed Jan 10, 2025
1 parent c94068f commit e2d6ba1
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 0 deletions.
5 changes: 5 additions & 0 deletions materialize-boilerplate/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ func (l *extendedLogger) handler() func(transactionsEvent) {
var stopLoadLogger func(func())
var stopStoreLogger func(func())
var stopWaitingForDocsLogger func(func())
var waitingForDocsMu sync.Mutex
var loadPhaseStarted bool
var ackDelayActive bool
var recovery = true
Expand All @@ -164,15 +165,19 @@ func (l *extendedLogger) handler() func(transactionsEvent) {
round++
l.be.round++
case sentAcknowledged:
waitingForDocsMu.Lock()
if !loadPhaseStarted {
stopWaitingForDocsLogger = l.logAsync(l.waitingForDocsLogFn(round))
}
waitingForDocsMu.Unlock()
case readLoad, readFlush:
waitingForDocsMu.Lock()
if !l.waitingForDocsStart.IsZero() {
stopWaitingForDocsLogger(l.finishedWaitingForDocsLogFn(round))
}
l.waitingForDocsStart = time.Time{}
loadPhaseStarted = true
waitingForDocsMu.Unlock()
}

// Start and stop other loggers, resetting counters as needed.
Expand Down
38 changes: 38 additions & 0 deletions materialize-boilerplate/logging_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package boilerplate

import (
"sync"
"testing"

log "github.com/sirupsen/logrus"
)

func TestExtendedLoggerWaitingForDocsRace(t *testing.T) {
// TODO(whb): If we ever start running our tests with the -race flag
// enabled, this outer loop won't be necessary. As is, running the enclosed
// sequence ~100 times or so will reliably produce a panic unless sufficient
// synchronization is provided in the extended logger event handler.
for idx := 0; idx < 100; idx++ {
be := newBindingEvents()
logger := newExtendedLogger(loggerAtLevel{lvl: log.InfoLevel}, be)
handler := logger.handler()

handler(sentStartedCommit)

var wg sync.WaitGroup
wg.Add(2)
go func() {
for idx := 0; idx < 10; idx++ {
handler(readLoad)
}
wg.Done()
}()

go func() {
handler(sentAcknowledged)
wg.Done()
}()

wg.Wait()
}
}

0 comments on commit e2d6ba1

Please sign in to comment.