singleflight: spruce up singleflight
This patch improves singleflight with a couple of features that most callers
previously implemented by hand. A flight's context can now optionally be made
not to respond to the cancelation of the caller's ctx, flights now run in
stopper tasks, and stopper quiescence cancels the flights' contexts. Callers
were already doing some of these things, but inconsistently.

Also, the flights now get a tracing span, and callers that join an existing
flight get a copy of the recording of the flight's leader. We've wanted this
multiple times when debugging, for example in the context of table descriptor
lease acquisition and range descriptor resolution. The interface for getting
results out of DoChan() had to change a bit as a result.

Release note: None
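
For orientation, here is a minimal sketch of the new caller-facing API as it
appears at the call sites in this diff (NewGroup, DoChan taking a ctx and
DoOpts, and the future it returns). The group name, key, and result value are
illustrative only; the sketch assumes nothing beyond the API surface visible
in the hunks below.

package main

import (
    "context"
    "fmt"

    "github.com/cockroachdb/cockroach/pkg/util/stop"
    "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
)

func main() {
    ctx := context.Background()
    stopper := stop.NewStopper()
    defer stopper.Stop(ctx)

    // Groups are now constructed explicitly; NoTags opts out of tagging
    // the flight's tracing span with the key.
    g := singleflight.NewGroup("example group", singleflight.NoTags)

    future, leader := g.DoChan(ctx, "some-key",
        singleflight.DoOpts{
            // Run the flight as a stopper task; stopper quiescence cancels
            // the flight's context.
            Stop: stopper,
            // Decouple the flight's context from this caller's cancelation,
            // since other callers may join the flight and wait on its result.
            InheritCancelation: false,
        },
        func(ctx context.Context) (interface{}, error) {
            // The expensive work shared by all callers goes here.
            return 42, nil
        })
    fmt.Println("leader:", leader)

    // Waiting still respects this caller's ctx, even though the flight
    // itself no longer does.
    res := future.WaitForResult(ctx)
    if res.Err != nil {
        panic(res.Err)
    }
    fmt.Println("value:", res.Val.(int))
}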
andreimatei committed Dec 14, 2022
1 parent d2e0051 commit 5129578
Showing 36 changed files with 1,035 additions and 518 deletions.
34 changes: 17 additions & 17 deletions pkg/bench/rttanalysis/testdata/benchmark_expectations
@@ -50,27 +50,27 @@ exp,benchmark
 9,Grant/grant_all_on_1_table
 9,Grant/grant_all_on_2_tables
 9,Grant/grant_all_on_3_tables
-11,GrantRole/grant_1_role
-15,GrantRole/grant_2_roles
+13,GrantRole/grant_1_role
+18,GrantRole/grant_2_roles
 3,ORMQueries/activerecord_type_introspection_query
 5,ORMQueries/django_table_introspection_1_table
 5,ORMQueries/django_table_introspection_4_tables
 5,ORMQueries/django_table_introspection_8_tables
-1,ORMQueries/has_column_privilege_using_attnum
-1,ORMQueries/has_column_privilege_using_column_name
-0,ORMQueries/has_schema_privilege_1
-0,ORMQueries/has_schema_privilege_3
-0,ORMQueries/has_schema_privilege_5
-1,ORMQueries/has_sequence_privilege_1
-3,ORMQueries/has_sequence_privilege_3
-5,ORMQueries/has_sequence_privilege_5
-1,ORMQueries/has_table_privilege_1
-3,ORMQueries/has_table_privilege_3
-5,ORMQueries/has_table_privilege_5
+5,ORMQueries/has_column_privilege_using_attnum
+5,ORMQueries/has_column_privilege_using_column_name
+5,ORMQueries/has_schema_privilege_1
+15,ORMQueries/has_schema_privilege_3
+25,ORMQueries/has_schema_privilege_5
+5,ORMQueries/has_sequence_privilege_1
+15,ORMQueries/has_sequence_privilege_3
+25,ORMQueries/has_sequence_privilege_5
+5,ORMQueries/has_table_privilege_1
+15,ORMQueries/has_table_privilege_3
+25,ORMQueries/has_table_privilege_5
 85,ORMQueries/hasura_column_descriptions
 85,ORMQueries/hasura_column_descriptions_8_tables
 5,ORMQueries/hasura_column_descriptions_modified
-5,ORMQueries/information_schema._pg_index_position
+9,ORMQueries/information_schema._pg_index_position
 4,ORMQueries/pg_attribute
 4,ORMQueries/pg_class
 6,ORMQueries/pg_is_other_temp_schema
@@ -82,8 +82,8 @@ exp,benchmark
 9,Revoke/revoke_all_on_1_table
 9,Revoke/revoke_all_on_2_tables
 9,Revoke/revoke_all_on_3_tables
-9,RevokeRole/revoke_1_role
-11,RevokeRole/revoke_2_roles
+10,RevokeRole/revoke_1_role
+12,RevokeRole/revoke_2_roles
 1,SystemDatabaseQueries/select_system.users_with_empty_database_Name
 1,SystemDatabaseQueries/select_system.users_with_schema_Name
 1,SystemDatabaseQueries/select_system.users_without_schema_Name
@@ -95,5 +95,5 @@ exp,benchmark
 12,Truncate/truncate_2_column_2_rows
 1,VirtualTableQueries/select_crdb_internal.invalid_objects_with_1_fk
 1,VirtualTableQueries/select_crdb_internal.tables_with_1_fk
-3,VirtualTableQueries/virtual_table_cache_with_point_lookups
+11,VirtualTableQueries/virtual_table_cache_with_point_lookups
 7,VirtualTableQueries/virtual_table_cache_with_schema_change
57 changes: 29 additions & 28 deletions pkg/ccl/kvccl/kvtenantccl/connector.go
@@ -71,7 +71,7 @@ type Connector struct {
     rpcContext      *rpc.Context
     rpcRetryOptions retry.Options
     rpcDialTimeout  time.Duration // for testing
-    rpcDial         singleflight.Group
+    rpcDial         *singleflight.Group
     defaultZoneCfg  *zonepb.ZoneConfig
     addrs           []string
 
@@ -142,6 +142,7 @@ func NewConnector(cfg kvtenant.ConnectorConfig, addrs []string) *Connector {
         tenantID:        cfg.TenantID,
         AmbientContext:  cfg.AmbientCtx,
         rpcContext:      cfg.RPCContext,
+        rpcDial:         singleflight.NewGroup("dial tenant connector", singleflight.NoTags),
         rpcRetryOptions: cfg.RPCRetryOptions,
         defaultZoneCfg:  cfg.DefaultZoneConfig,
         addrs:           addrs,
@@ -673,41 +674,41 @@ func (c *Connector) withClient(
 // blocks until either a connection is successfully established or the provided
 // context is canceled.
 func (c *Connector) getClient(ctx context.Context) (*client, error) {
+    ctx = c.AnnotateCtx(ctx)
     c.mu.RLock()
     if client := c.mu.client; client != nil {
         c.mu.RUnlock()
         return client, nil
     }
-    ch, _ := c.rpcDial.DoChan("dial", func() (interface{}, error) {
-        dialCtx := c.AnnotateCtx(context.Background())
-        dialCtx, cancel := c.rpcContext.Stopper.WithCancelOnQuiesce(dialCtx)
-        defer cancel()
-        var client *client
-        err := c.rpcContext.Stopper.RunTaskWithErr(dialCtx, "kvtenant.Connector: dial",
-            func(ctx context.Context) error {
-                var err error
-                client, err = c.dialAddrs(ctx)
-                return err
-            })
-        if err != nil {
-            return nil, err
-        }
-        c.mu.Lock()
-        defer c.mu.Unlock()
-        c.mu.client = client
-        return client, nil
-    })
+    future, _ := c.rpcDial.DoChan(ctx,
+        "dial",
+        singleflight.DoOpts{
+            Stop:               c.rpcContext.Stopper,
+            InheritCancelation: false,
+        },
+        func(ctx context.Context) (interface{}, error) {
+            var client *client
+            err := c.rpcContext.Stopper.RunTaskWithErr(ctx, "kvtenant.Connector: dial",
+                func(ctx context.Context) error {
+                    var err error
+                    client, err = c.dialAddrs(ctx)
+                    return err
+                })
+            if err != nil {
+                return nil, err
+            }
+            c.mu.Lock()
+            defer c.mu.Unlock()
+            c.mu.client = client
+            return client, nil
+        })
     c.mu.RUnlock()
 
-    select {
-    case res := <-ch:
-        if res.Err != nil {
-            return nil, res.Err
-        }
-        return res.Val.(*client), nil
-    case <-ctx.Done():
-        return nil, ctx.Err()
+    res := future.WaitForResult(ctx)
+    if res.Err != nil {
+        return nil, res.Err
     }
+    return res.Val.(*client), nil
 }
 
 // dialAddrs attempts to dial each of the configured addresses in a retry loop.
2 changes: 1 addition & 1 deletion pkg/kv/bulk/sst_batcher_test.go
@@ -290,7 +290,7 @@ func runTestImport(t *testing.T, batchSizeValue int64) {
     // populate it after the first split but before the second split.
     ds := s.DistSenderI().(*kvcoord.DistSender)
     mockCache := rangecache.NewRangeCache(s.ClusterSettings(), ds,
-        func() int64 { return 2 << 10 }, s.Stopper(), s.TracerI().(*tracing.Tracer))
+        func() int64 { return 2 << 10 }, s.Stopper())
     for _, k := range []int{0, split1} {
         ent, err := ds.RangeDescriptorCache().Lookup(ctx, keys.MustAddr(key(k)))
         require.NoError(t, err)
3 changes: 1 addition & 2 deletions pkg/kv/kvclient/kvcoord/dist_sender.go
@@ -454,8 +454,7 @@ func NewDistSender(cfg DistSenderConfig) *DistSender {
     getRangeDescCacheSize := func() int64 {
         return rangeDescriptorCacheSize.Get(&ds.st.SV)
     }
-    ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize,
-        cfg.RPCContext.Stopper, cfg.AmbientCtx.Tracer)
+    ds.rangeCache = rangecache.NewRangeCache(ds.st, rdb, getRangeDescCacheSize, cfg.RPCContext.Stopper)
     if tf := cfg.TestingKnobs.TransportFactory; tf != nil {
         ds.transportFactory = tf
     } else {
2 changes: 1 addition & 1 deletion pkg/kv/kvclient/kvcoord/dist_sender_test.go
@@ -5047,7 +5047,7 @@ func TestSendToReplicasSkipsStaleReplicas(t *testing.T) {
     getRangeDescCacheSize := func() int64 {
         return 1 << 20
     }
-    rc := rangecache.NewRangeCache(st, nil /* db */, getRangeDescCacheSize, stopper, tr)
+    rc := rangecache.NewRangeCache(st, nil /* db */, getRangeDescCacheSize, stopper)
     rc.Insert(ctx, roachpb.RangeInfo{
         Desc: tc.initialDesc,
         Lease: roachpb.Lease{
1 change: 0 additions & 1 deletion pkg/kv/kvclient/rangecache/BUILD.bazel
@@ -18,7 +18,6 @@ go_library(
         "//pkg/util/stop",
         "//pkg/util/syncutil",
         "//pkg/util/syncutil/singleflight",
-        "//pkg/util/tracing",
         "@com_github_biogo_store//llrb",
         "@com_github_cockroachdb_errors//:errors",
         "@com_github_cockroachdb_logtags//:logtags",
72 changes: 27 additions & 45 deletions pkg/kv/kvclient/rangecache/range_cache.go
@@ -31,7 +31,6 @@ import (
     "github.com/cockroachdb/cockroach/pkg/util/stop"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil/singleflight"
-    "github.com/cockroachdb/cockroach/pkg/util/tracing"
     "github.com/cockroachdb/errors"
     "github.com/cockroachdb/logtags"
 )
@@ -124,7 +123,6 @@ type RangeDescriptorDB interface {
 type RangeCache struct {
     st      *cluster.Settings
     stopper *stop.Stopper
-    tracer  *tracing.Tracer
     // RangeDescriptorDB is used to retrieve range descriptors from the
     // database, which will be cached by this structure.
     db RangeDescriptorDB
@@ -140,7 +138,7 @@ type RangeCache struct {
     // lookup requests for the same inferred range descriptor to be
     // multiplexed onto the same database lookup. See makeLookupRequestKey
     // for details on this inference.
-    lookupRequests singleflight.Group
+    lookupRequests *singleflight.Group
 
     // coalesced, if not nil, is sent on every time a request is coalesced onto
     // another in-flight one. Used by tests to block until a lookup request is
@@ -223,13 +221,12 @@ func makeLookupRequestKey(
 // NewRangeCache returns a new RangeCache which uses the given RangeDescriptorDB
 // as the underlying source of range descriptors.
 func NewRangeCache(
-    st *cluster.Settings,
-    db RangeDescriptorDB,
-    size func() int64,
-    stopper *stop.Stopper,
-    tracer *tracing.Tracer,
+    st *cluster.Settings, db RangeDescriptorDB, size func() int64, stopper *stop.Stopper,
 ) *RangeCache {
-    rdc := &RangeCache{st: st, db: db, stopper: stopper, tracer: tracer}
+    rdc := &RangeCache{
+        st: st, db: db, stopper: stopper,
+        lookupRequests: singleflight.NewGroup("range lookup", "lookup"),
+    }
     rdc.rangeCache.cache = cache.NewOrderedCache(cache.Config{
         Policy: cache.CacheLRU,
         ShouldEvict: func(n int, _, _ interface{}) bool {
@@ -797,23 +794,14 @@ func (rc *RangeCache) tryLookup(
         }
     }
 
-    // Fork a context with a new span before reqCtx is captured by the DoChan
-    // closure below; the parent span might get finished by the time the closure
-    // starts. In the "leader" case, the closure will take ownership of the new
-    // span.
-    reqCtx, reqSpan := tracing.EnsureChildSpan(ctx, rc.tracer, "range lookup")
-    resC, leader := rc.lookupRequests.DoChan(requestKey, func() (interface{}, error) {
-        defer reqSpan.Finish()
-        var lookupRes lookupResult
-        if err := rc.stopper.RunTaskWithErr(reqCtx, "rangecache: range lookup", func(ctx context.Context) (err error) {
-            // Clear the context's cancelation. This request services potentially many
-            // callers waiting for its result, and using the flight's leader's
-            // cancelation doesn't make sense.
-            ctx, cancel := rc.stopper.WithCancelOnQuiesce(
-                logtags.WithTags(context.Background(), logtags.FromContext(ctx)))
-            defer cancel()
-            ctx = tracing.ContextWithSpan(ctx, reqSpan)
-
+    future, leader := rc.lookupRequests.DoChan(ctx,
+        requestKey,
+        singleflight.DoOpts{
+            Stop:               rc.stopper,
+            InheritCancelation: false,
+        },
+        func(ctx context.Context) (interface{}, error) {
+            var lookupRes lookupResult
             // Attempt to perform the lookup by reading from a follower. If the
             // result is too old for the leader of this group, then we'll fall back
             // to reading from the leaseholder. Note that it's possible that the
@@ -823,21 +811,24 @@
             // in that goroutine re-fetching.
             lookupRes.consistency = ReadFromFollower
             {
+                var err error
                 lookupRes.EvictionToken, err = tryLookupImpl(ctx, rc, key, lookupRes.consistency, useReverseScan)
-                shouldReturnError := err != nil && !errors.Is(err, errFailedToFindNewerDescriptor)
+                if err != nil && !errors.Is(err, errFailedToFindNewerDescriptor) {
+                    return nil, err
+                }
                 gotFreshResult := err == nil && !lookupResultIsStale(lookupRes)
-                if shouldReturnError || gotFreshResult {
-                    return err
+                if gotFreshResult {
+                    return lookupRes, nil
                 }
             }
+            var err error
             lookupRes.consistency = ReadFromLeaseholder
             lookupRes.EvictionToken, err = tryLookupImpl(ctx, rc, key, lookupRes.consistency, useReverseScan)
-            return err
-        }); err != nil {
-            return nil, err
-        }
-        return lookupRes, nil
-    })
+            if err != nil {
+                return nil, err
+            }
+            return lookupRes, nil
+        })
 
 // We must use DoChan above so that we can always unlock this mutex. This must
 // be done *after* the request has been added to the lookupRequests group, or
@@ -849,19 +840,10 @@
         if rc.coalesced != nil {
             rc.coalesced <- struct{}{}
         }
-        // In the leader case, the callback takes ownership of reqSpan. If we're not
-        // the leader, we've created the span for no reason and have to finish it.
-        reqSpan.Finish()
     }
 
     // Wait for the inflight request.
-    var res singleflight.Result
-    select {
-    case res = <-resC:
-    case <-ctx.Done():
-        return EvictionToken{}, errors.Wrap(ctx.Err(), "aborted during range descriptor lookup")
-    }
-
+    res := future.WaitForResult(ctx)
     var s string
     if res.Err != nil {
         s = res.Err.Error()