-
Notifications
You must be signed in to change notification settings - Fork 3.9k
/
Copy pathconnection.go
139 lines (128 loc) · 4.44 KB
/
connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
// Copyright 2018 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package workload
import (
"fmt"
"net/url"
"runtime"
"strings"
"time"
"github.com/spf13/pflag"
)
// ConnFlags is helper of common flags that are relevant to QueryLoads.
type ConnFlags struct {
*pflag.FlagSet
DBOverride string
Concurrency int
Method string // Method for issuing queries; see SQLRunner.
ConnHealthCheckPeriod time.Duration
MaxConnIdleTime time.Duration
MaxConnLifetime time.Duration
MaxConnLifetimeJitter time.Duration
MinConns int
WarmupConns int
}
// NewConnFlags returns an initialized ConnFlags.
func NewConnFlags(genFlags *Flags) *ConnFlags {
c := &ConnFlags{}
c.FlagSet = pflag.NewFlagSet(`conn`, pflag.ContinueOnError)
c.StringVar(&c.DBOverride, `db`, ``,
`Override for the SQL database to use. If empty, defaults to the generator name`)
c.IntVar(&c.Concurrency, `concurrency`, 2*runtime.GOMAXPROCS(0),
`Number of concurrent workers`)
c.StringVar(&c.Method, `method`, `cache_statement`, `SQL issue method (cache_statement, cache_describe, describe_exec, exec, simple_protocol)`)
c.DurationVar(&c.ConnHealthCheckPeriod, `conn-healthcheck-period`, 30*time.Second, `Interval that health checks are run on connections`)
c.IntVar(&c.MinConns, `min-conns`, 0, `Minimum number of connections to attempt to keep in the pool`)
c.DurationVar(&c.MaxConnIdleTime, `max-conn-idle-time`, 150*time.Second, `Max time an idle connection will be kept around`)
c.DurationVar(&c.MaxConnLifetime, `max-conn-lifetime`, 300*time.Second, `Max connection lifetime`)
c.DurationVar(&c.MaxConnLifetimeJitter, `max-conn-lifetime-jitter`, 150*time.Second, `Jitter max connection lifetime by this amount`)
c.IntVar(&c.WarmupConns, `warmup-conns`, 0, `Number of connections to warmup in each connection pool`)
genFlags.AddFlagSet(c.FlagSet)
if genFlags.Meta == nil {
genFlags.Meta = make(map[string]FlagMeta)
}
for _, k := range []string{
`concurrency`,
`conn-healthcheck-period`,
`db`,
`max-conn-idle-time`,
`max-conn-lifetime-jitter`,
`max-conn-lifetime`,
`method`,
`min-conns`,
`warmup-conns`,
} {
v, ok := genFlags.Meta[k]
if !ok {
v = FlagMeta{}
}
v.RuntimeOnly = true
genFlags.Meta[k] = v
}
return c
}
// SanitizeUrls verifies that the give SQL connection strings have the correct
// SQL database set, rewriting them in place if necessary. This database name is
// returned.
func SanitizeUrls(gen Generator, dbOverride string, urls []string) (string, error) {
dbName := gen.Meta().Name
if dbOverride != `` {
dbName = dbOverride
}
for i := range urls {
parsed, err := url.Parse(urls[i])
if err != nil {
return "", err
}
if d := strings.TrimPrefix(parsed.Path, `/`); d != `` && d != dbName {
return "", fmt.Errorf(`%s specifies database %q, but database %q is expected`,
urls[i], d, dbName)
}
parsed.Path = dbName
q := parsed.Query()
q.Set("application_name", gen.Meta().Name)
parsed.RawQuery = q.Encode()
switch parsed.Scheme {
case "postgres", "postgresql":
urls[i] = parsed.String()
default:
return ``, fmt.Errorf(`unsupported scheme: %s`, parsed.Scheme)
}
}
return dbName, nil
}
// SetDefaultIsolationLevel configures the provided URLs with the specified
// default transaction isolation level, if any.
func SetDefaultIsolationLevel(urls []string, isoLevel string) error {
if isoLevel == "" {
return nil
}
// As a convenience, replace underscores with spaces. This allows users of the
// workload tool to pass --isolation-level=read_committed instead of needing
// to pass --isolation-level="read committed".
isoLevel = strings.ReplaceAll(isoLevel, "_", " ")
// NOTE: validation of the isolation level value is done by the server during
// connection establishment.
return setUrlParam(urls, "default_transaction_isolation", isoLevel)
}
// setUrlParam sets the given parameter to the given value in the provided URLs.
func setUrlParam(urls []string, param, value string) error {
for i := range urls {
parsed, err := url.Parse(urls[i])
if err != nil {
return err
}
q := parsed.Query()
q.Set(param, value)
parsed.RawQuery = q.Encode()
urls[i] = parsed.String()
}
return nil
}