Skip to content

Commit

Permalink
Protect cache integrity during reads
Browse files Browse the repository at this point in the history
During an invalid cache state, the client will disconnect, then attempt
reconnect to each endpoint. During this process reads against the cache
is are not prohibited. This means a client could be reading stale data
from the cache.

During reconnect we use the meta db.cacheMutex (not the cache mutex) to
control resetting the db's cache. This patch leverages that to guard
reads to the cache based on the same mutex. Additionally, it tries to
ensure that the cache will be in a consistent state when the read takes
place. The db.cacheMutex is not held for the entire reconnect process,
so we need to make some attempt to wait for a signal that a reconnect is
complete... a best effort attempt to give the client an accurate cache
read.

Signed-off-by: Tim Rozet <[email protected]>
  • Loading branch information
trozet committed Nov 3, 2021
1 parent 8b93f8d commit dab66f1
Showing 1 changed file with 35 additions and 2 deletions.
37 changes: 35 additions & 2 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"reflect"
"strings"
"sync"
"time"

"github.com/cenkalti/backoff/v4"
"github.com/cenkalti/rpc2"
Expand Down Expand Up @@ -1088,13 +1089,41 @@ func (o *ovsdbClient) Close() {
o.rpcClient.Close()
}

func isCacheConsistent(db *database) bool {
db.cacheMutex.RLock()
defer db.cacheMutex.RUnlock()
return !db.deferUpdates
}

// best effort to ensure cache is in a good state
func waitForCacheConsistent(db *database) {
if !hasMonitors(db) {
return
}
startTime := time.Now()
// TODO(trozet) make this an exponential backoff
for !isCacheConsistent(db) || time.Since(startTime) > 20*time.Second {
time.Sleep(50 * time.Millisecond)
}
}

func hasMonitors(db *database) bool {
db.monitorsMutex.Lock()
defer db.monitorsMutex.Unlock()
return len(db.monitors) > 0
}

// Client API interface wrapper functions
// We add this wrapper to allow users to access the API directly on the
// client object

//Get implements the API interface's Get function
func (o *ovsdbClient) Get(model model.Model) error {
return o.primaryDB().api.Get(model)
primaryDB := o.primaryDB()
waitForCacheConsistent(primaryDB)
primaryDB.cacheMutex.RLock()
defer primaryDB.cacheMutex.RUnlock()
return primaryDB.api.Get(model)
}

//Create implements the API interface's Create function
Expand All @@ -1104,7 +1133,11 @@ func (o *ovsdbClient) Create(models ...model.Model) ([]ovsdb.Operation, error) {

//List implements the API interface's List function
func (o *ovsdbClient) List(result interface{}) error {
return o.primaryDB().api.List(result)
primaryDB := o.primaryDB()
waitForCacheConsistent(primaryDB)
primaryDB.cacheMutex.RLock()
defer primaryDB.cacheMutex.RUnlock()
return primaryDB.api.List(result)
}

//Where implements the API interface's Where function
Expand Down

0 comments on commit dab66f1

Please sign in to comment.