Skip to content

Commit

Permalink
Merge #69747
Browse files Browse the repository at this point in the history
69747: workload: synchronise concurrent access to preparedStatements map r=[otan,tbg,erikgrinaker] a=stevendanna

After 60dd572, the TPCC workload started to fail with

```
fatal error: concurrent map writes
```

because it attempts to start multiple sq runners using the same
MultiConnPool. This fix allows for that by putting a mutex around the
map accesses.

Fixes #69758
Fixes #69757
Fixes #69755
Fixes #69751
Fixes #69750
Fixes #69749
Fixes #69748
Fixes #69745
Fixes #69744
Fixes #69743
Fixes #69742
Fixes #69741
Fixes #69740
Fixes #69739
Fixes #69738
Fixes #69735
Fixes #69734

Release justification: Fixes critical bug in TPCC workload.

Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Sep 2, 2021
2 parents 73b0f14 + cb84f32 commit a46e3a2
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 9 deletions.
1 change: 1 addition & 0 deletions pkg/workload/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ go_library(
"//pkg/util/bufalloc",
"//pkg/util/encoding/csv",
"//pkg/util/log",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/workload/histogram",
"@com_github_cockroachdb_errors//:errors",
Expand Down
30 changes: 23 additions & 7 deletions pkg/workload/pgx_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"sync/atomic"

"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/jackc/pgx/v4"
"github.com/jackc/pgx/v4/pgxpool"
"golang.org/x/sync/errgroup"
Expand All @@ -25,9 +26,13 @@ type MultiConnPool struct {
Pools []*pgxpool.Pool
// Atomic counter used by Get().
counter uint32
// preparedStatements is a map from name to SQL. The statements in the map
// are prepared whenever a new connection is acquired from the pool.
preparedStatements map[string]string

mu struct {
syncutil.RWMutex
// preparedStatements is a map from name to SQL. The statements in the map
// are prepared whenever a new connection is acquired from the pool.
preparedStatements map[string]string
}
}

// MultiConnPoolCfg encapsulates the knobs passed to NewMultiConnPool.
Expand Down Expand Up @@ -66,9 +71,9 @@ func (p pgxLogger) Log(
func NewMultiConnPool(
ctx context.Context, cfg MultiConnPoolCfg, urls ...string,
) (*MultiConnPool, error) {
m := &MultiConnPool{
preparedStatements: map[string]string{},
}
m := &MultiConnPool{}
m.mu.preparedStatements = map[string]string{}

connsPerURL := distribute(cfg.MaxTotalConnections, len(urls))
maxConnsPerPool := cfg.MaxConnsPerPool
if maxConnsPerPool == 0 {
Expand All @@ -90,7 +95,9 @@ func NewMultiConnPool(
connCfg.ConnConfig.Logger = pgxLogger{}
connCfg.MaxConns = int32(numConns)
connCfg.BeforeAcquire = func(ctx context.Context, conn *pgx.Conn) bool {
for name, sql := range m.preparedStatements {
m.mu.RLock()
defer m.mu.RUnlock()
for name, sql := range m.mu.preparedStatements {
// Note that calling `Prepare` with a name that has already been
// prepared is idempotent and short-circuits before doing any
// communication to the server.
Expand Down Expand Up @@ -150,6 +157,15 @@ func NewMultiConnPool(
return m, nil
}

// AddPreparedStatement adds the given sql statement to the map of
// statements that will be prepared when a new connection is retrieved
// from the pool.
func (m *MultiConnPool) AddPreparedStatement(name string, statement string) {
m.mu.Lock()
defer m.mu.Unlock()
m.mu.preparedStatements[name] = statement
}

// Get returns one of the pools, in round-robin manner.
func (m *MultiConnPool) Get() *pgxpool.Pool {
if len(m.Pools) == 1 {
Expand Down
3 changes: 1 addition & 2 deletions pkg/workload/sql_runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,9 @@ func (sr *SQLRunner) Init(
for i, s := range sr.stmts {
stmtName := fmt.Sprintf("%s-%d", name, i+1)
s.preparedName = stmtName
mcp.preparedStatements[stmtName] = s.sql
mcp.AddPreparedStatement(stmtName, s.sql)
}
}

sr.mcp = mcp
sr.initialized = true
return nil
Expand Down

0 comments on commit a46e3a2

Please sign in to comment.