Skip to content

Commit

Permalink
var rename
Browse files Browse the repository at this point in the history
Signed-off-by: Tim Vaillancourt <[email protected]>
  • Loading branch information
timvaillancourt committed Nov 23, 2024
1 parent 0b51839 commit 4bc537f
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 34 deletions.
2 changes: 1 addition & 1 deletion go/vt/discovery/healthcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,7 @@ func NewHealthCheck(ctx context.Context, retryDelay, healthCheckTimeout time.Dur
if c == "" {
continue
}
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultConcurrency))
topoWatchers = append(topoWatchers, NewTopologyWatcher(ctx, topoServer, hc, filters, c, refreshInterval, refreshKnownTablets, topo.DefaultReadConcurrency))
}

hc.topoWatchers = topoWatchers
Expand Down
26 changes: 4 additions & 22 deletions go/vt/topo/keyspace.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,44 +22,26 @@ import (
"sort"
"sync"

"github.com/spf13/pflag"
"golang.org/x/sync/errgroup"

"vitess.io/vitess/go/constants/sidecar"
"vitess.io/vitess/go/event"
"vitess.io/vitess/go/sqlescape"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/servenv"
"vitess.io/vitess/go/vt/vterrors"

"vitess.io/vitess/go/event"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/topo/events"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/events"
"vitess.io/vitess/go/vt/vterrors"
)

// This file contains keyspace utility functions.

// Default concurrency to use in order to avoid overhwelming the topo server.
var DefaultConcurrency = 32

// shardKeySuffix is the suffix of a shard key.
// The full key looks like this:
// /vitess/global/keyspaces/customer/shards/80-/Shard
const shardKeySuffix = "Shard"

func registerFlags(fs *pflag.FlagSet) {
fs.IntVar(&DefaultConcurrency, "topo_read_concurrency", DefaultConcurrency, "Concurrency of topo reads.")
}

func init() {
servenv.OnParseFor("vtcombo", registerFlags)
servenv.OnParseFor("vtctld", registerFlags)
servenv.OnParseFor("vtgate", registerFlags)
servenv.OnParseFor("vtorc", registerFlags)
}

// KeyspaceInfo is a meta struct that contains metadata to give the
// data more context and convenience. This is the main way we interact
// with a keyspace.
Expand Down Expand Up @@ -228,7 +210,7 @@ func (ts *Server) FindAllShardsInKeyspace(ctx context.Context, keyspace string,
opt = &FindAllShardsInKeyspaceOptions{}
}
if opt.Concurrency <= 0 {
opt.Concurrency = DefaultConcurrency
opt.Concurrency = DefaultReadConcurrency
}

// Unescape the keyspace name as this can e.g. come from the VSchema where
Expand Down
4 changes: 4 additions & 0 deletions go/vt/topo/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,9 @@ var (

FlagBinaries = []string{"vttablet", "vtctl", "vtctld", "vtcombo", "vtgate",
"vtorc", "vtbackup"}

// Default read concurrency to use in order to avoid overhwelming the topo server.
DefaultReadConcurrency = 32
)

func init() {
Expand All @@ -193,6 +196,7 @@ func registerTopoFlags(fs *pflag.FlagSet) {
fs.StringVar(&topoImplementation, "topo_implementation", topoImplementation, "the topology implementation to use")
fs.StringVar(&topoGlobalServerAddress, "topo_global_server_address", topoGlobalServerAddress, "the address of the global topology server")
fs.StringVar(&topoGlobalRoot, "topo_global_root", topoGlobalRoot, "the path of the global topology data in the global topology server")
fs.IntVar(&DefaultReadConcurrency, "topo_read_concurrency", DefaultReadConcurrency, "Maximum concurrency of topo reads.")
}

// RegisterFactory registers a Factory for an implementation for a Server.
Expand Down
6 changes: 3 additions & 3 deletions go/vt/topo/shard.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,13 +661,13 @@ func (ts *Server) GetTabletsByShardCell(ctx context.Context, keyspace, shard str
// Divide the concurrency limit by the number of cells. If there are more
// cells than the limit, default to concurrency of 1.
cellConcurrency := 1
if len(cells) < DefaultConcurrency {
cellConcurrency = DefaultConcurrency / len(cells)
if len(cells) < DefaultReadConcurrency {
cellConcurrency = DefaultReadConcurrency / len(cells)
}

mu := sync.Mutex{}
eg, ctx := errgroup.WithContext(ctx)
eg.SetLimit(DefaultConcurrency)
eg.SetLimit(DefaultReadConcurrency)

tablets := make([]*TabletInfo, 0, len(cells))
var kss *KeyspaceShard
Expand Down
29 changes: 29 additions & 0 deletions go/vt/topo/stats_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"context"
"time"

"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
Expand All @@ -33,6 +35,11 @@ var (
"TopologyConnOperations timings",
[]string{"Operation", "Cell"})

topoStatsConnWaitTimings = stats.NewMultiTimings(
"TopologyConnWaits",
"TopologyConnWait timings",
[]string{"Operation", "Cell"})

topoStatsConnErrors = stats.NewCountersWithMultiLabels(
"TopologyConnErrors",
"TopologyConnErrors errors per operation",
Expand All @@ -46,6 +53,7 @@ type StatsConn struct {
cell string
conn Conn
readOnly bool
readSem *semaphore.Weighted
}

// NewStatsConn returns a StatsConn
Expand All @@ -54,13 +62,19 @@ func NewStatsConn(cell string, conn Conn) *StatsConn {
cell: cell,
conn: conn,
readOnly: false,
readSem: semaphore.NewWeighted(int64(DefaultReadConcurrency)),
}
}

// ListDir is part of the Conn interface
func (st *StatsConn) ListDir(ctx context.Context, dirPath string, full bool) ([]DirEntry, error) {
startTime := time.Now()
statsKey := []string{"ListDir", st.cell}
if err := st.readSem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer st.readSem.Release(1)
topoStatsConnWaitTimings.Record(statsKey, startTime)
defer topoStatsConnTimings.Record(statsKey, startTime)
res, err := st.conn.ListDir(ctx, dirPath, full)
if err != nil {
Expand Down Expand Up @@ -106,6 +120,11 @@ func (st *StatsConn) Update(ctx context.Context, filePath string, contents []byt
func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version, error) {
startTime := time.Now()
statsKey := []string{"Get", st.cell}
if err := st.readSem.Acquire(ctx, 1); err != nil {
return nil, nil, err
}
defer st.readSem.Release(1)
topoStatsConnWaitTimings.Record(statsKey, startTime)
defer topoStatsConnTimings.Record(statsKey, startTime)
bytes, version, err := st.conn.Get(ctx, filePath)
if err != nil {
Expand All @@ -119,6 +138,11 @@ func (st *StatsConn) Get(ctx context.Context, filePath string) ([]byte, Version,
func (st *StatsConn) GetVersion(ctx context.Context, filePath string, version int64) ([]byte, error) {
startTime := time.Now()
statsKey := []string{"GetVersion", st.cell}
if err := st.readSem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer st.readSem.Release(1)
topoStatsConnWaitTimings.Record(statsKey, startTime)
defer topoStatsConnTimings.Record(statsKey, startTime)
bytes, err := st.conn.GetVersion(ctx, filePath, version)
if err != nil {
Expand All @@ -132,6 +156,11 @@ func (st *StatsConn) GetVersion(ctx context.Context, filePath string, version in
func (st *StatsConn) List(ctx context.Context, filePathPrefix string) ([]KVInfo, error) {
startTime := time.Now()
statsKey := []string{"List", st.cell}
if err := st.readSem.Acquire(ctx, 1); err != nil {
return nil, err
}
defer st.readSem.Release(1)
topoStatsConnWaitTimings.Record(statsKey, startTime)
defer topoStatsConnTimings.Record(statsKey, startTime)
bytes, err := st.conn.List(ctx, filePathPrefix)
if err != nil {
Expand Down
12 changes: 5 additions & 7 deletions go/vt/topo/tablet.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,19 +26,17 @@ import (

"golang.org/x/sync/semaphore"

"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/vt/key"

"vitess.io/vitess/go/event"
"vitess.io/vitess/go/netutil"
"vitess.io/vitess/go/protoutil"
"vitess.io/vitess/go/trace"
"vitess.io/vitess/go/vt/key"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/topo/events"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vterrors"
)

// IsTrivialTypeChange returns if this db type be trivially reassigned
Expand Down Expand Up @@ -497,7 +495,7 @@ func (ts *Server) GetTabletMap(ctx context.Context, tabletAliases []*topodatapb.
returnErr error
)

concurrency := DefaultConcurrency
concurrency := DefaultReadConcurrency
if opt != nil && opt.Concurrency > 0 {
concurrency = opt.Concurrency
}
Expand Down
4 changes: 3 additions & 1 deletion go/vt/vtorc/logic/tablet_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,9 @@ func refreshTabletsUsing(loader func(tabletAlias string), forceRefresh bool) {
}

func refreshTabletsInCell(ctx context.Context, cell string, loader func(tabletAlias string), forceRefresh bool) {
tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{Concurrency: topo.DefaultConcurrency})
tablets, err := ts.GetTabletsByCell(ctx, cell, &topo.GetTabletsByCellOptions{
Concurrency: topo.DefaultReadConcurrency,
})
if err != nil {
log.Errorf("Error fetching topo info for cell %v: %v", cell, err)
return
Expand Down

0 comments on commit 4bc537f

Please sign in to comment.