-
Notifications
You must be signed in to change notification settings - Fork 3.8k
/
Copy pathmultitenant_upgrade.go
318 lines (266 loc) · 12.4 KB
/
multitenant_upgrade.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
// Copyright 2021 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.
package tests
import (
"context"
gosql "database/sql"
"time"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
"github.com/cockroachdb/cockroach/pkg/roachprod/install"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/version"
"github.com/stretchr/testify/require"
)
// registerMultiTenantUpgrade adds the "multitenant-upgrade" test to the given
// registry. The test's cluster spec is built with r.MakeClusterSpec(2), and its
// Run hook forwards to runMultiTenantUpgrade with the build version reported
// by the test.
func registerMultiTenantUpgrade(r registry.Registry) {
	run := func(ctx context.Context, t test.Test, c cluster.Cluster) {
		buildVersion := t.BuildVersion()
		runMultiTenantUpgrade(ctx, t, c, *buildVersion)
	}

	spec := registry.TestSpec{
		Name:              "multitenant-upgrade",
		Cluster:           r.MakeClusterSpec(2),
		Owner:             registry.OwnerKV,
		NonReleaseBlocker: false,
		Run:               run,
	}
	r.Add(spec)
}
// runMultiTenantUpgrade exercises upgrading tenants and their host cluster.
//
// v is the version of the binary under test; the "previous" binary is the one
// returned by PredecessorVersion(v). The host (KV) cluster is started on node 1
// and every tenant server is created via createTenantNode with node 2.
//
// Sketch of the test:
//
//   - Host{Binary: Prev, Cluster: Prev}: Start host cluster.
//   - Tenant11{Binary: Prev, Cluster: Prev}: Create tenant 11 and verify it works.
//   - Host{Binary: Cur, Cluster: Prev}: Upgrade host cluster (don't finalize).
//   - Tenant11{Binary: Prev, Cluster: Prev}: Verify tenant 11 still works.
//   - Tenant12{Binary: Prev, Cluster: Prev}: Create tenant 12 and verify it works.
//   - Tenant13{Binary: Cur, Cluster: Prev}: Create tenant 13 and verify it works.
//   - Tenant11{Binary: Cur, Cluster: Prev}: Upgrade tenant 11 binary and verify it works.
//   - Tenant11{Binary: Cur, Cluster: Cur}: Run the version upgrade for tenant 11.
//     This is supported but not necessarily desirable. Exercise it just to
//     show that it doesn't explode. This will verify new guard-rails when
//     and if they are added.
//   - Host{Binary: Cur, Cluster: Cur}: Finalize the upgrade on the host.
//   - Tenant12{Binary: Cur, Cluster: Prev}: Upgrade the tenant 12 binary.
//   - Tenant12{Binary: Cur, Cluster: Cur}: Run the version upgrade for tenant 12.
//   - Tenant12{Binary: Cur, Cluster: Cur}: Restart tenant 12 and make sure it still works.
//   - Tenant13{Binary: Cur, Cluster: Cur}: Run the version upgrade for tenant 13.
//   - Tenant13{Binary: Cur, Cluster: Cur}: Restart tenant 13 and make sure it still works.
//   - Tenant14{Binary: Cur, Cluster: Cur}: Create tenant 14 and verify it works.
//   - Tenant14{Binary: Cur, Cluster: Cur}: Restart tenant 14 and make sure it still works.
func runMultiTenantUpgrade(ctx context.Context, t test.Test, c cluster.Cluster, v version.Version) {
	predecessor, err := PredecessorVersion(v)
	require.NoError(t, err)

	// NOTE(review): the empty version string presumably selects the binary
	// under test -- confirm against uploadVersion.
	currentBinary := uploadVersion(ctx, t, c, c.All(), "")
	predecessorBinary := uploadVersion(ctx, t, c, c.All(), predecessor)

	kvNodes := c.Node(1)

	settings := install.MakeClusterSettings(install.BinaryOption(predecessorBinary), install.SecureOption(true))
	c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, kvNodes)

	const tenant11HTTPPort, tenant11SQLPort = 8011, 20011
	const tenant11ID = 11
	runner := sqlutils.MakeSQLRunner(c.Conn(ctx, t.L(), 1))
	// We'll sometimes have to wait out the backoff of the host cluster
	// auto-update loop (at the time of writing 30s), plus some migrations may be
	// genuinely long-running.
	runner.SucceedsSoonDuration = 5 * time.Minute
	runner.Exec(t, `SELECT crdb_internal.create_tenant($1)`, tenant11ID)

	// Remember the host's pre-upgrade cluster version; tenants that have not
	// been migrated yet are expected to report exactly this value.
	var initialVersion string
	runner.QueryRow(t, "SHOW CLUSTER SETTING version").Scan(&initialVersion)

	const tenantNode = 2
	tenant11 := createTenantNode(ctx, t, c, kvNodes, tenant11ID, tenantNode, tenant11HTTPPort, tenant11SQLPort)
	tenant11.start(ctx, t, c, predecessorBinary)
	defer tenant11.stop(ctx, t, c)

	t.Status("checking that a client can connect to the tenant 11 server")
	verifySQL(t, tenant11.pgURL,
		mkStmt(`CREATE TABLE foo (id INT PRIMARY KEY, v STRING)`),
		mkStmt(`INSERT INTO foo VALUES($1, $2)`, 1, "bar"),
		mkStmt(`SELECT * FROM foo LIMIT 1`).
			withResults([][]string{{"1", "bar"}}))

	verifySQL(t, tenant11.pgURL,
		mkStmt("SHOW CLUSTER SETTING version").
			withResults([][]string{{initialVersion}}),
	)

	t.Status("preserving downgrade option on host server")
	{
		// Pin the host at its current cluster version so that restarting it with
		// the new binary does not finalize the upgrade.
		s := runner.QueryStr(t, `SHOW CLUSTER SETTING version`)
		runner.Exec(
			t,
			`SET CLUSTER SETTING cluster.preserve_downgrade_option = $1`, s[0][0],
		)
	}

	t.Status("upgrading host server")
	c.Stop(ctx, t.L(), option.DefaultStopOpts(), kvNodes)
	settings.Binary = currentBinary
	c.Start(ctx, t.L(), option.DefaultStartOpts(), settings, kvNodes)
	time.Sleep(time.Second)

	t.Status("checking the pre-upgrade sql server still works after the KV binary upgrade")

	verifySQL(t, tenant11.pgURL,
		mkStmt(`SELECT * FROM foo LIMIT 1`).
			withResults([][]string{{"1", "bar"}}))

	t.Status("creating a new tenant 12")

	const tenant12HTTPPort, tenant12SQLPort = 8012, 20012
	const tenant12ID = 12
	runner.Exec(t, `SELECT crdb_internal.create_tenant($1)`, tenant12ID)

	t.Status("starting tenant 12 server with older binary")

	tenant12 := createTenantNode(ctx, t, c, kvNodes, tenant12ID, tenantNode, tenant12HTTPPort, tenant12SQLPort)
	tenant12.start(ctx, t, c, predecessorBinary)
	defer tenant12.stop(ctx, t, c)

	t.Status("verifying that the tenant 12 server works and is at the earlier version")

	verifySQL(t, tenant12.pgURL,
		mkStmt(`CREATE TABLE foo (id INT PRIMARY KEY, v STRING)`),
		mkStmt(`INSERT INTO foo VALUES($1, $2)`, 1, "bar"),
		mkStmt(`SELECT * FROM foo LIMIT 1`).
			withResults([][]string{{"1", "bar"}}),
		mkStmt("SHOW CLUSTER SETTING version").
			withResults([][]string{{initialVersion}}),
	)

	t.Status("creating a new tenant 13")

	const tenant13HTTPPort, tenant13SQLPort = 8013, 20013
	const tenant13ID = 13
	runner.Exec(t, `SELECT crdb_internal.create_tenant($1)`, tenant13ID)

	t.Status("starting tenant 13 server with new binary")

	tenant13 := createTenantNode(ctx, t, c, kvNodes, tenant13ID, tenantNode, tenant13HTTPPort, tenant13SQLPort)
	tenant13.start(ctx, t, c, currentBinary)
	defer tenant13.stop(ctx, t, c)

	t.Status("verifying that the tenant 13 server works and is at the earlier version")

	verifySQL(t, tenant13.pgURL,
		mkStmt(`CREATE TABLE foo (id INT PRIMARY KEY, v STRING)`),
		mkStmt(`INSERT INTO foo VALUES($1, $2)`, 1, "bar"),
		mkStmt(`SELECT * FROM foo LIMIT 1`).
			withResults([][]string{{"1", "bar"}}),
		mkStmt("SHOW CLUSTER SETTING version").
			withResults([][]string{{initialVersion}}),
	)

	t.Status("stopping the tenant 11 server ahead of upgrading")
	tenant11.stop(ctx, t, c)

	t.Status("starting the tenant 11 server with the current binary")
	tenant11.start(ctx, t, c, currentBinary)

	t.Status("verify tenant 11 server works with the new binary")
	{
		verifySQL(t, tenant11.pgURL,
			mkStmt(`SELECT * FROM foo LIMIT 1`).
				withResults([][]string{{"1", "bar"}}),
			mkStmt("SHOW CLUSTER SETTING version").
				withResults([][]string{{initialVersion}}))
	}

	// Note that this is exercising a path we likely want to eliminate in the
	// future where the tenant is upgraded before the KV nodes.
	t.Status("migrating the tenant 11 to the current version before kv is finalized")

	verifySQL(t, tenant11.pgURL,
		mkStmt(`SELECT * FROM foo LIMIT 1`).
			withResults([][]string{{"1", "bar"}}),
		mkStmt("SHOW CLUSTER SETTING version").
			withResults([][]string{{initialVersion}}),
		mkStmt("SET CLUSTER SETTING version = crdb_internal.node_executable_version()"),
		mkStmt("SELECT version = crdb_internal.node_executable_version() FROM [SHOW CLUSTER SETTING version]").
			withResults([][]string{{"true"}}),
	)

	t.Status("finalizing the kv server")
	// Clearing the downgrade pin lets the host finalize; retry until the host
	// reports that its cluster version matches the binary's version.
	runner.Exec(t, `SET CLUSTER SETTING cluster.preserve_downgrade_option = DEFAULT`)
	runner.CheckQueryResultsRetry(t,
		"SELECT version = crdb_internal.node_executable_version() FROM [SHOW CLUSTER SETTING version]",
		[][]string{{"true"}})

	t.Status("stopping the tenant 12 server ahead of upgrading")

	tenant12.stop(ctx, t, c)

	t.Status("starting the tenant 12 server with the current binary")

	tenant12.start(ctx, t, c, currentBinary)

	t.Status("verify tenant 12 server works with the new binary")

	verifySQL(t, tenant12.pgURL,
		mkStmt(`SELECT * FROM foo LIMIT 1`).
			withResults([][]string{{"1", "bar"}}),
		mkStmt("SHOW CLUSTER SETTING version").
			withResults([][]string{{initialVersion}}))

	// Upgrade the tenant created in the mixed version state to the final version.
	t.Status("migrating tenant 12 to the current version")

	verifySQL(t, tenant12.pgURL,
		mkStmt("SET CLUSTER SETTING version = crdb_internal.node_executable_version()"),
		mkStmt("SELECT version = crdb_internal.node_executable_version() FROM [SHOW CLUSTER SETTING version]").
			withResults([][]string{{"true"}}))

	t.Status("restarting the tenant 12 server to check it works after a restart")

	tenant12.stop(ctx, t, c)
	tenant12.start(ctx, t, c, currentBinary)

	t.Status("verify tenant 12 server works with the new binary after restart")
	verifySQL(t, tenant12.pgURL,
		mkStmt(`SELECT * FROM foo LIMIT 1`).
			withResults([][]string{{"1", "bar"}}),
		mkStmt("SELECT version = crdb_internal.node_executable_version() FROM [SHOW CLUSTER SETTING version]").
			withResults([][]string{{"true"}}))

	// Upgrade the tenant created in the mixed version state to the final version.
	t.Status("migrating tenant 13 to the current version")

	verifySQL(t, tenant13.pgURL,
		mkStmt(`SELECT * FROM foo LIMIT 1`).
			withResults([][]string{{"1", "bar"}}),
		mkStmt("SET CLUSTER SETTING version = crdb_internal.node_executable_version()"),
		mkStmt("SELECT version = crdb_internal.node_executable_version() FROM [SHOW CLUSTER SETTING version]").
			withResults([][]string{{"true"}}),
		mkStmt(`SELECT * FROM foo LIMIT 1`).
			withResults([][]string{{"1", "bar"}}))

	t.Status("restarting the tenant 13 server to check it works after a restart")

	tenant13.stop(ctx, t, c)
	tenant13.start(ctx, t, c, currentBinary)

	t.Status("verify tenant 13 server works with the new binary after restart")
	verifySQL(t, tenant13.pgURL,
		mkStmt(`SELECT * FROM foo LIMIT 1`).
			withResults([][]string{{"1", "bar"}}),
		mkStmt("SELECT version = crdb_internal.node_executable_version() FROM [SHOW CLUSTER SETTING version]").
			withResults([][]string{{"true"}}))

	t.Status("creating tenant 14 at the new version")

	const tenant14HTTPPort, tenant14SQLPort = 8014, 20014
	const tenant14ID = 14
	runner.Exec(t, `SELECT crdb_internal.create_tenant($1)`, tenant14ID)

	t.Status("verifying the tenant 14 works and has the proper version")

	tenant14 := createTenantNode(ctx, t, c, kvNodes, tenant14ID, tenantNode, tenant14HTTPPort, tenant14SQLPort)
	tenant14.start(ctx, t, c, currentBinary)
	defer tenant14.stop(ctx, t, c)

	verifySQL(t, tenant14.pgURL,
		mkStmt(`CREATE TABLE foo (id INT PRIMARY KEY, v STRING)`),
		mkStmt(`INSERT INTO foo VALUES($1, $2)`, 1, "bar"),
		mkStmt(`SELECT * FROM foo LIMIT 1`).
			withResults([][]string{{"1", "bar"}}),
		mkStmt("SELECT version = crdb_internal.node_executable_version() FROM [SHOW CLUSTER SETTING version]").
			withResults([][]string{{"true"}}))

	t.Status("restarting the tenant 14 server to check it works after a restart")
	// Restart tenant 14 itself: the verification below connects to tenant 14,
	// so bouncing any other tenant here would leave the restart path untested.
	tenant14.stop(ctx, t, c)
	tenant14.start(ctx, t, c, currentBinary)

	t.Status("verifying the post-upgrade tenant works and has the proper version")
	verifySQL(t, tenant14.pgURL,
		mkStmt(`SELECT * FROM foo LIMIT 1`).
			withResults([][]string{{"1", "bar"}}),
		mkStmt("SELECT version = crdb_internal.node_executable_version() FROM [SHOW CLUSTER SETTING version]").
			withResults([][]string{{"true"}}))
}
// sqlVerificationStmt describes one SQL statement for verifySQL to run,
// together with its placeholder arguments and, optionally, the rows it is
// expected to return.
type sqlVerificationStmt struct {
	// stmt is the SQL text handed to the runner.
	stmt string
	// args are the placeholder arguments passed alongside stmt.
	args []interface{}
	// optionalResults, when non-nil, is the exact result set (rows of string
	// columns) the statement must produce; when nil, verifySQL only executes
	// the statement and does not inspect any rows.
	optionalResults [][]string
}
// withResults returns a copy of s whose expected result set is res. The
// receiver is passed by value, so the statement it was called on is left
// unchanged.
func (s sqlVerificationStmt) withResults(res [][]string) sqlVerificationStmt {
	return sqlVerificationStmt{
		stmt:            s.stmt,
		args:            s.args,
		optionalResults: res,
	}
}
// mkStmt builds a sqlVerificationStmt for the given SQL text and placeholder
// arguments. No expected results are attached; chain withResults to add them.
func mkStmt(stmt string, args ...interface{}) sqlVerificationStmt {
	var out sqlVerificationStmt
	out.stmt = stmt
	out.args = args
	return out
}
// verifySQL opens a "postgres" database handle for url and runs stmts against
// it in order. A statement whose optionalResults is nil is only executed; any
// other statement is queried and its rows must equal optionalResults. The test
// is failed via t if the handle cannot be opened; the handle is closed before
// returning.
func verifySQL(t test.Test, url string, stmts ...sqlVerificationStmt) {
	conn, openErr := gosql.Open("postgres", url)
	if openErr != nil {
		t.Fatal(openErr)
	}
	// Best-effort close: the error is deliberately ignored.
	defer func() { _ = conn.Close() }()

	sqlRunner := sqlutils.MakeSQLRunner(conn)
	for i := range stmts {
		cur := stmts[i]
		// Only a nil expectation means "execute without checking"; an empty but
		// non-nil expectation still goes through the query-and-compare path.
		if cur.optionalResults == nil {
			sqlRunner.Exec(t, cur.stmt, cur.args...)
			continue
		}
		rows := sqlRunner.QueryStr(t, cur.stmt, cur.args...)
		require.Equal(t, cur.optionalResults, rows)
	}
}