diff --git a/dev b/dev index 0ecc418e1c40..51c0d44c3582 100755 --- a/dev +++ b/dev @@ -8,7 +8,7 @@ fi set -euo pipefail # Bump this counter to force rebuilding `dev` on all machines. -DEV_VERSION=53 +DEV_VERSION=54 THIS_DIR=$(cd "$(dirname "$0")" && pwd) BINARY_DIR=$THIS_DIR/bin/dev-versions

diff --git a/docs/RFCS/20220628_invisible_index.md b/docs/RFCS/20220628_invisible_index.md new file mode 100644 index 000000000000..bb55f13a8553 --- /dev/null +++ b/docs/RFCS/20220628_invisible_index.md @@ -0,0 +1,428 @@

- Feature Name: Invisible Index
- Status: in-progress
- Start Date: 2022-06-28
- Authors: Wenyi Hu
- RFC PR: https://github.com/cockroachdb/cockroach/pull/83531
- Cockroach Issue: https://github.com/cockroachdb/cockroach/issues/72576,
  https://github.com/cockroachdb/cockroach/issues/82363

# Summary

An invisible index is an index that is maintained up-to-date but is ignored by
the optimizer unless it is explicitly selected with [index
hinting](https://www.cockroachlabs.com/docs/v22.1/table-expressions#force-index-selection)
or is needed for constraint purposes.

The main purpose of this RFC is to introduce the feature, document
implementation decisions, and propose a technical design.

# Motivation: use cases

### 1. Roll out new indexes with more confidence.
When you create a new index, every query can immediately start picking it,
which can have an immediate effect on the workload. Some users with large
production deployments are therefore concerned that introducing a new index
could significantly affect their applications.

With invisible indexes, you can introduce the index as invisible first. In a
new session, you can run your workload against it and observe the impact of the
new index by turning `optimizer_use_not_visible_indexes` on or by using index
hinting. If the index does turn out to be useful, you can then change it to
visible in your database.

Note that this only makes it safer to observe the impact; the maintenance cost
associated with an index during inserts, upserts, updates, and deletes is still
incurred.

### 2. Drop indexes with less risk.
A question that comes up frequently about indexes is whether an index is
actually useful for queries or whether it is just sitting around and wasting
maintenance cost. Currently, the only way to test this is to drop the index and
recreate it if it turns out to be useful. However, once the table gets large,
recreating the index can become very expensive.

With invisible indexes, you can mark the index as invisible first, wait a few
weeks to measure the impact, and then drop the index if no drop in performance
is observed. If the index turns out to be needed, you can easily change it back
to visible without the cost of rebuilding it.

Note that an invisible index reduces the risk associated with dropping the
index but does not eliminate it. First, just because an index is not used
during the observation period does not mean it will not be used in the future.
Second, invisible indexes are still used behind the scenes by the optimizer for
constraint checks and are still maintained (more details below). In those
cases, you cannot expect the database to behave exactly as it would if the
index were dropped.
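As a sketch of these first two use cases, using the syntax proposed later in
this RFC (the table and index names here are hypothetical):

```sql
-- Stage a new index without exposing it to the optimizer.
CREATE INDEX orders_customer_idx ON orders (customer_id) NOT VISIBLE;

-- In a separate session, exercise the workload against it, either by
-- hinting the index explicitly or by enabling the session variable.
SELECT count(*) FROM orders@orders_customer_idx WHERE customer_id = 42;
SET optimizer_use_not_visible_indexes = true;

-- If the index helps, expose it to all queries; conversely, an index
-- suspected to be unused can be hidden first rather than dropped outright.
ALTER INDEX orders@orders_customer_idx VISIBLE;
ALTER INDEX orders@orders_created_at_idx NOT VISIBLE;
```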
### 3. Debugging.
If queries suddenly start using an index unexpectedly and it is causing
performance issues, you can change the index to invisible as a short-term
solution. You can then investigate the problem using
`optimizer_use_not_visible_indexes` or index hinting in a new session. Once the
issue has been resolved, the index can be made visible again.

### 4. Make indexes only available to specific queries.
If you know that certain queries have problems and that an index would help,
you can create the index as invisible and make it available only to the queries
you choose via index hinting. This leaves the rest of your application
unaffected.

# Implementation Decisions
### Conclusion:
- Users can create an invisible index or change an existing index to be invisible.
- By default, indexes are visible.
- Primary indexes cannot be invisible.
- Constraints cannot be created with invisible indexes. Creating unique or foreign key constraints with invisible indexes is not supported.
- Partial invisible indexes and inverted invisible indexes are both supported and behave as expected.
- Queries can be instructed to use invisible indexes explicitly through [index
  hinting](https://www.cockroachlabs.com/docs/v22.1/table-expressions#force-index-selection).
- The session variable `optimizer_use_not_visible_indexes` can be set to true to tell the optimizer to treat invisible indexes as if they were visible. By default, `optimizer_use_not_visible_indexes` is false.

The following points are where behavior might be unexpected, because making an index invisible is not exactly the same as dropping it:
- Force index (index hinting) with an invisible index is allowed and overrides the invisible index feature.
  - If the index were dropped instead, the hint would throw an error.
- Invisible indexes are treated as visible while policing unique or foreign key constraints. In other words, the invisible index feature is temporarily disabled during any constraint check.
  - If the index were dropped, the query plan for the constraint check could be different and could lead to a full table scan.

### 1. What types of indexes can be invisible?
Primary indexes cannot be invisible. Any secondary index, including a unique
index, can be invisible.

In [MySQL](https://dev.mysql.com/doc/refman/8.0/en/invisible-indexes.html), all
indexes other than primary keys can be invisible. In MySQL, a table with no
explicit primary key uses the first unique index on NOT NULL columns as an
implicit primary key, and neither implicit nor explicit primary indexes are
allowed to be invisible. In CRDB, however, a table with no explicit primary key
creates a new rowid column, and creating a unique index on NOT NULL columns
does not change the primary key.

### 2. Can constraints be invisible? Can constraints be created with invisible indexes?
No. Constraints cannot be invisible, and they cannot be created with invisible
indexes. An invisible constraint would mean that the constraint could be on and
off at different times. Since constraints are enforced at insert time, allowing
invisible constraints could lead to corrupted indexes.

One might think that creating a unique constraint with an invisible index is
similar to creating a unique constraint without an index (which CRDB currently
supports). But they have very different semantics. First, creating a constraint
without an index is not user-friendly and exists for multi-tenant testing
purposes. Second, a constraint created with an invisible index still has an
index; the index is merely ignored by the optimizer.

Overall, only indexes can be invisible.
Creating a unique constraint or a foreign key constraint with invisible indexes
is not supported. This behavior aligns with MySQL.

This restriction leads to an issue in the parser: creating an invisible unique
index inside a `CREATE TABLE` definition is supported by the grammar rule, but
the parser will throw an error. This is because the parser does a round trip in
`pkg/testutils/sqlutils/pretty.go`. In sql.y, creating a unique index in a
`CREATE TABLE` statement returns a new structure,
`tree.UniqueConstraintTableDef`. However, creating unique constraints with
not-visible indexes is not supported by the parser. When the parser does a
round trip for such a statement, it formats it to a pretty statement using the
unique constraint definition, but since the parser does not support the
unique-constraint-with-not-visible-index syntax, it fails while parsing the
pretty statement. Since logictest also performs a round-trip check in
`pkg/sql/logictest/logic.go`, logictest would also fail. Creating a unique
index inside a `CREATE TABLE` definition will still work in a cluster. This is
a known issue; see https://github.com/cockroachdb/cockroach/pull/65825 for more
details.

### 3. Should Force Index with invisible index be allowed?
Using index hinting with invisible indexes is allowed and is part of the
feature design. Although this may lead to behavior that differs from dropping
the index, it offers more flexibility. For example, the fallback for some
queries might be a full table scan, which may be too expensive.

In MySQL, index hinting with invisible indexes
[errors](https://dev.mysql.com/doc/refman/8.0/en/invisible-indexes.html).
Instead, MySQL supports this through the session variable
[optimizer_use_not_visible_indexes](https://dev.mysql.com/doc/refman/8.0/en/switchable-optimizations.html#optflag_use-invisible-indexes):
users can instruct specific queries to use invisible indexes by setting this
session variable to true for only those queries.

### 4. Are invisible indexes still maintained and up-to-date?
Yes. Just like any other index, an invisible index consumes maintenance cost
and resources. Regardless of visibility, indexes are kept up-to-date on insert,
delete, upsert, and update.

This behavior aligns with [MySQL](https://dev.mysql.com/doc/refman/8.0/en/invisible-indexes.html).

### 5. Are unique constraints with invisible indexes still in effect?
Yes. Regardless of index visibility, unique indexes still prevent duplicate
values when inserting or updating data. Creating a foreign key constraint
requires a unique index or constraint on the parent table, and foreign key
constraints are still enforced even if the unique index on the parent table
becomes invisible: if a column in a child table references a column in the
parent table, that value must exist in the parent table.

This behavior aligns with [MySQL](https://dev.mysql.com/doc/refman/8.0/en/invisible-indexes.html).
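A brief sketch of the two answers above, with hypothetical table names (the
error comments are illustrative, not exact messages):

```sql
CREATE TABLE parent (id INT PRIMARY KEY, code INT NOT NULL);
CREATE UNIQUE INDEX parent_code_idx ON parent (code);
CREATE TABLE child (id INT PRIMARY KEY, parent_code INT REFERENCES parent (code));

-- Hiding the index does not suspend maintenance or enforcement.
ALTER INDEX parent@parent_code_idx NOT VISIBLE;

INSERT INTO parent VALUES (1, 10), (2, 10);  -- still fails: duplicate value in the unique index
INSERT INTO child VALUES (1, 999);           -- still fails: referenced row missing in parent
```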
### 6. Scope of Invisibility: to what extent should the optimizer ignore invisible indexes? Should constraint checks use or ignore invisible indexes?
Consider the following situation. Creating a child table requires the parent
table to have a unique index on the FK columns. What happens if that unique
index is invisible? What happens if the unique index is changed to invisible
after the child table has been created? Consider another case: what happens if
INSERT ON CONFLICT is performed on an invisible unique index?

The first option would be to ignore the invisible index completely. However,
this means that an insert on the child table may require a full table scan to
police the foreign key check. The same situation applies if the parent table
performs a delete or update, or if the child table performs an insert, upsert,
or update. This would not only lead to performance issues; the unique
constraint was also necessary to create the child table or to perform INSERT ON
CONFLICT in the first place. If the index becomes invisible, does it really
make sense to still allow these operations? Overall, this option is not viable.

The second option would be to throw an error when an operation requires a
constraint check using an invisible index. The justification would be that if
someone wants to test the impact of dropping an index, this is the behavior
they would expect.

However, if someone wants to drop an index, it does not make sense for them to
still want a foreign key constraint on it or to perform `INSERT ON CONFLICT`.
In addition, this would make the feature much more limited. As described in the
motivation section, there are use cases beyond testing the impact of dropping
an index; for example, the feature is also helpful for a staged rollout of a
new index, and throwing an error on `INSERT ON CONFLICT` could lead to
confusion there.

The only remaining option is to allow the optimizer to still use invisible
indexes while policing foreign key or unique constraints. This has an obvious
drawback: users can no longer expect dropping an index to behave exactly the
same as marking it invisible. We will document this well and log messages in
situations where the two cannot be expected to behave the same. On the bright
side, this is the more standard behavior, based on MySQL and Oracle.

We should log warning messages in situations where users cannot expect an invisible index to be equivalent to a dropped index.
We will log this message:
- if users change an existing visible index to invisible, or if users drop an invisible index
- if the invisible index may be used to police a constraint check
  - when the invisible index is unique
  - or when the invisible index is on a child table and the first column stored by the index is part of the FK constraint.

**Conclusion**

The optimizer will treat all invisible indexes as if they were visible for any
unique or foreign key constraint purposes.

### 7. How to observe the impact of invisible indexes?
- SQL statements or queries will have different execution plans.
  - You can see this using `EXPLAIN` (a sketch follows below).
  - If you want to know whether invisible indexes are used for a constraint check,
    you can use `EXPLAIN (VERBOSE)` and check whether `disabled not visible index
    feature` is set as part of the scan flags.
- Queries or workloads will have different performance.
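A hedged sketch of what that observation could look like (hypothetical schema;
plan output is paraphrased in the comments):

```sql
-- While orders_customer_idx is NOT VISIBLE, the plan falls back to another
-- access path, e.g. a full scan of the primary index.
EXPLAIN SELECT * FROM orders WHERE customer_id = 42;

-- An explicit hint overrides invisibility, so the plan scans the hinted index.
EXPLAIN SELECT * FROM orders@orders_customer_idx WHERE customer_id = 42;

-- For statements that trigger a constraint check, EXPLAIN (VERBOSE) should show
-- "disabled not visible index feature" among the scan flags when an invisible
-- index is used to police the constraint.
EXPLAIN (VERBOSE) INSERT INTO order_items VALUES (1, 42);
```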
# Technical Design
## 1. How does the optimizer support this feature?
As discussed above, to fully support the invisible index feature, we need to
ignore invisible indexes unless they are used for a constraint check or during
force index.

First, let's set aside the cases where the invisible index feature must be
disabled and focus on how the optimizer ignores invisible indexes in general.

During exploration, the optimizer explores every possible query plan using
transformation rules. While constructing equivalent memo groups, the optimizer
enumerates the indexes on a given Scan operator's table using `ForEach` in
`pkg/sql/opt/xform/scan_index_iter.go`. This is where we can hide the index
from the optimizer: while enumerating each index, the optimizer can check
whether the index is invisible and ignore it if it is. The optimizer
effectively ignores invisible indexes by never constructing query plans that
use them.

Second, consider what happens when force index is used with an invisible
index. Force index overrides the invisible index feature, so we just need to
check whether the force index flag is set before ignoring invisible indexes
during exploration.

Third, consider how to disable the invisible index feature during constraint
checks. During optbuild, we construct a scan expression on a given table using
`buildScan` in `pkg/sql/opt/optbuilder/select.go`. We can add a flag to
`ScanPrivate` to indicate whether the Scan expression was built for a
constraint check. When the factory constructs the scan expression, this flag is
passed along as a scan operator property.

When the optimizer enumerates indexes on a given Scan operator in
`pkg/sql/opt/xform/scan_index_iter.go`, it can then check whether the scan was
built for a constraint check before ignoring the invisible index.

### Foreign key constraint check will be needed:
  - When a parent table performs an `UPDATE` or `DELETE` operation, an FK check on the child table is needed.
  - When a child table performs an `INSERT`, `UPSERT`, or `UPDATE` operation, an FK check on the parent table is needed.
  - There may be different foreign key actions: `[UPDATE | DELETE] [ON CASCADE | SET DEFAULT | SET NULL | NO ACTION | RESTRICT | ON CONSTRAINT]`.
### Unique constraint check will be needed:
  - When `INSERT [ON CONFLICT DO NOTHING | DO UPDATE SET | ON CONSTRAINT | DISTINCT ON]`
  - When `UPSERT`, `UPDATE`

(A sketch of statements that trigger these checks is shown below.)
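For example, assuming the same kind of parent/child schema as earlier (a unique
index on `parent (code)` and a foreign key from `child (parent_code)`), the
statements below are the ones that would trigger these checks; per the
conclusion of section 6, the checks may use invisible indexes:

```sql
UPDATE parent SET code = code + 1 WHERE id = 1;   -- FK check against child
DELETE FROM parent WHERE id = 2;                  -- FK check against child

INSERT INTO child VALUES (3, 10);                 -- FK check against parent
UPSERT INTO child VALUES (3, 11);                 -- FK check against parent
UPDATE child SET parent_code = 12 WHERE id = 3;   -- FK check against parent

INSERT INTO parent VALUES (4, 10)
    ON CONFLICT (code) DO NOTHING;                -- unique constraint check
```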
## 2. Syntax
### a. CREATE INDEX, CREATE TABLE, ALTER INDEX statements
#### Create Index Statements
```sql
CREATE [UNIQUE | INVERTED] INDEX [CONCURRENTLY] [IF NOT EXISTS] [index_name]
  ON table_name ( column_name [ASC | DESC] [, ...] )
  [USING HASH] [STORING ( stored_columns )]
  [PARTITION BY partition_by_clause]
  [WITH storage_parameters] [WHERE predicate]
  [VISIBLE | NOT VISIBLE]
```
- Example

```sql
CREATE INDEX a ON b.c (d) VISIBLE
CREATE INDEX a ON b.c (d) NOT VISIBLE

CREATE INDEX a ON b (c) WITH (fillfactor = 100, y_bounds = 50) VISIBLE
CREATE INDEX a ON b (c) WITH (fillfactor = 100, y_bounds = 50) NOT VISIBLE

CREATE INDEX geom_idx ON t USING GIST(geom) VISIBLE
CREATE INDEX geom_idx ON t USING GIST(geom) NOT VISIBLE

CREATE UNIQUE INDEX IF NOT EXISTS a ON b (c) WHERE d > 3 VISIBLE
CREATE UNIQUE INDEX IF NOT EXISTS a ON b (c) WHERE d > 3 NOT VISIBLE
```

#### Create Table Statements
```sql
CREATE [[GLOBAL | LOCAL] {TEMPORARY | TEMP}] TABLE [IF NOT EXISTS] table_name ( [table_element_list] )
```

```sql
table_element_list: index_def
[UNIQUE | INVERTED] INDEX [index_name] ( column_name [ASC | DESC] [, ...] )
  [USING HASH] [{STORING | INCLUDE | COVERING} ( stored_columns )]
  [PARTITION BY partition_by_clause]
  [WITH storage_parameters] [WHERE predicate]
  [VISIBLE | NOT VISIBLE]
```

- Example:
```sql
CREATE TABLE a (b INT8, c STRING, INDEX (b ASC, c DESC) STORING (c) VISIBLE)
CREATE TABLE a (b INT8, c STRING, INDEX (b ASC, c DESC) STORING (c) NOT VISIBLE)

CREATE TABLE a (b INT, UNIQUE INDEX foo (b) WHERE c > 3 VISIBLE)
CREATE TABLE a (b INT, UNIQUE INDEX foo (b) WHERE c > 3 NOT VISIBLE)
```

#### ALTER INDEX Statements
```sql
ALTER INDEX [IF EXISTS] table_name@index_name [VISIBLE | NOT VISIBLE]
```

```sql
ALTER INDEX a@b VISIBLE
ALTER INDEX a@b NOT VISIBLE
```

### b. SHOW INDEX Statements
A new column needs to be added to the output of the following SQL statements:
```sql
SHOW INDEX FROM (table_name)
SHOW INDEXES FROM (table_name)
SHOW KEYS FROM (table_name)

SHOW INDEX FROM DATABASE (database_name)
SHOW INDEXES FROM DATABASE (database_name)
SHOW KEYS FROM DATABASE (database_name)
```

```
table_name index_name non_unique seq_in_index column_name direction storing implicit visible
```

### c. Tables that store index information
A new column needs to be added to `crdb_internal.table_indexes` and `information_schema.statistics`.

`crdb_internal.table_indexes`
```
descriptor_id descriptor_name index_id index_name index_type is_unique is_inverted is_sharded ***is_visible*** shard_bucket_count created_at
```

`information_schema.statistics`
```
table_catalog table_schema table_name non_unique index_schema index_name seq_in_index column_name COLLATION cardinality direction storing implicit ***is_visible***
```

## 3. Alternative Syntax Considered
### a. CREATE INDEX, CREATE TABLE, ALTER INDEX statements
The invisible index feature introduces four new pieces of user-facing syntax.
Since PostgreSQL does not support an invisible index feature yet, we use MySQL
and Oracle as references for standardized syntax.

The two options discussed are `NOT VISIBLE` and `INVISIBLE`.

- Reason why `NOT VISIBLE` is good: CRDB already supports a similar feature,
invisible columns, which uses `NOT VISIBLE` for its syntax. For why the
invisible column feature chose `NOT VISIBLE` over `INVISIBLE`, see
https://github.com/cockroachdb/cockroach/pull/26644 for more information.
- Reason why `INVISIBLE` is good: MySQL and Oracle both support `INVISIBLE`.

**Conclusion**: we have decided that being consistent internally with what CRDB
already has is more important than being consistent with other database engines.

There has been discussion about supporting `INVISIBLE` as an alias, but this
could lead to more issues:
1. If we support `INVISIBLE` as an alias for the invisible index feature, we
   would have to support it as an alias for the invisible column feature as
   well, and there are technical issues in the grammar that make this difficult.
2. Users migrating from another database to CRDB would need to rewrite their
   SQL anyway.
3. It might lead to confusion when users try to create invisible columns or
   indexes. Overall, supporting `INVISIBLE` as an alias does not seem to provide
   a large benefit.


### b. SHOW INDEX Statements
The three options are `is_hidden`, `is_visible`, and `visible`.

- Reason why `is_hidden` is good: the invisible column feature uses `is_hidden` for [`SHOW COLUMNS`](https://www.cockroachlabs.com/docs/stable/show-columns.html).
- Reason why `visible` is good: it is more consistent with what we chose for the first syntax --- VISIBLE | NOT VISIBLE. MySQL also uses [`visible`](https://dev.mysql.com/doc/refman/8.0/en/show-index.html).
- Reason why `is_visible` is bad: it is less consistent with other columns in [`SHOW INDEX`](https://www.cockroachlabs.com/docs/stable/show-index.html) such as `storing, implicit, non_unique`.

**Conclusion**: `visible`; it is more important to stay consistent with the first user-facing syntax.

### c. Tables that store index information: `crdb_internal.table_indexes` and `information_schema.statistics`
The three options are `hidden`, `is_visible`, and `visibility`.

- Reason why `hidden` is good: the invisible column feature uses `hidden` for `table_columns`.
- Reason why `is_visible` is good: MySQL uses `is_visible`. Also, it is more consistent with other columns in `table_indexes`, such as is_unique, is_inverted, is_sharded.
- Reason why `visibility` is good: Oracle uses `visibility`.

- **Conclusion**: `is_visible`; it is more important to stay consistent with the second user-facing syntax.

### d. Index Descriptor

We are also introducing another field in the index descriptor (just for
internal use). The options are `Hidden`, `Invisible`, or `NotVisible`. The
invisible column feature uses `Hidden` in the column descriptor. Using visible
or visibility would be odd here because the default boolean value is false (and
by default an index should be visible).

- **Conclusion**: `NotVisible`. Since we chose `visible` for the invisible index features above, choosing `NotVisible` or `Invisible` here is more consistent. `NotVisible` is preferred because we are trying to stay away from the keyword `Invisible` to avoid confusion with the first user-facing syntax.

For more context on how this conclusion was drawn, please see https://github.com/cockroachdb/cockroach/pull/83388 and this RFC PR's discussion.

# Fine-Grained Control of Index Visibility
As of now, the plan is to introduce the general invisible index feature first.
The design and implementation details for fine-grained control of index
visibility will be discussed in the future.

Later on, we want to extend this feature and allow more fine-grained control
of index visibility by introducing the following features.

1. Indexes are not restricted to just being visible or invisible; users can
   experiment with different levels of visibility. In other words, instead of
   using a boolean invisible flag, users can set a float invisible flag between
   0.0 and 100.0. The index would be made invisible only to a corresponding
   fraction of queries. Related:
   https://github.com/cockroachdb/cockroach/issues/72576#issuecomment-1034301996

2. Different sessions or users can set different index visibility.
   Related: https://github.com/cockroachdb/cockroach/issues/82363

3. We can consider introducing another session variable or another type of
   index that provides exactly the same behaviour as dropping an index.

diff --git a/docs/generated/settings/settings-for-tenants.txt b/docs/generated/settings/settings-for-tenants.txt index 04a31da8019e..abcdfc7eea6a 100644 --- a/docs/generated/settings/settings-for-tenants.txt +++ b/docs/generated/settings/settings-for-tenants.txt @@ -294,4 +294,4 @@ trace.jaeger.agent string the address of a Jaeger agent to receive traces using trace.opentelemetry.collector string address of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :.
If no port is specified, 4317 will be used. trace.span_registry.enabled boolean true if set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collector string the address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -version version 22.1-62 set the active cluster version in the format '.' +version version 1000022.1-62 set the active cluster version in the format '.' diff --git a/docs/generated/settings/settings.html b/docs/generated/settings/settings.html index 746c9ab3dba4..3b5b70516311 100644 --- a/docs/generated/settings/settings.html +++ b/docs/generated/settings/settings.html @@ -228,6 +228,6 @@ trace.opentelemetry.collectorstringaddress of an OpenTelemetry trace collector to receive traces using the otel gRPC protocol, as :. If no port is specified, 4317 will be used. trace.span_registry.enabledbooleantrueif set, ongoing traces can be seen at https:///#/debug/tracez trace.zipkin.collectorstringthe address of a Zipkin instance to receive traces, as :. If no port is specified, 9411 will be used. -versionversion22.1-62set the active cluster version in the format '.' +versionversion1000022.1-62set the active cluster version in the format '.' diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index 2d40d27a41e5..4427a90cf0e8 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -1727,6 +1727,7 @@ GO_TARGETS = [ "//pkg/sql/syntheticprivilege:syntheticprivilege_test", "//pkg/sql/tests:tests", "//pkg/sql/tests:tests_test", + "//pkg/sql/ttl/ttlbase:ttlbase", "//pkg/sql/ttl/ttljob:ttljob", "//pkg/sql/ttl/ttljob:ttljob_test", "//pkg/sql/ttl/ttlschedule:ttlschedule", @@ -2752,6 +2753,7 @@ GET_X_DATA_TARGETS = [ "//pkg/sql/storageparam/tablestorageparam:get_x_data", "//pkg/sql/syntheticprivilege:get_x_data", "//pkg/sql/tests:get_x_data", + "//pkg/sql/ttl/ttlbase:get_x_data", "//pkg/sql/ttl/ttljob:get_x_data", "//pkg/sql/ttl/ttlschedule:get_x_data", "//pkg/sql/types:get_x_data", diff --git a/pkg/ccl/backupccl/backup_test.go b/pkg/ccl/backupccl/backup_test.go index d46e1637b9ce..dde171b6f341 100644 --- a/pkg/ccl/backupccl/backup_test.go +++ b/pkg/ccl/backupccl/backup_test.go @@ -8228,7 +8228,7 @@ func TestManifestTooNew(t *testing.T) { require.NoError(t, protoutil.Unmarshal(manifestData, &backupManifest)) // Bump the version and write it back out to make it look newer. - backupManifest.ClusterVersion = roachpb.Version{Major: 99, Minor: 1} + backupManifest.ClusterVersion = roachpb.Version{Major: math.MaxInt32, Minor: 1} manifestData, err = protoutil.Marshal(&backupManifest) require.NoError(t, err) require.NoError(t, os.WriteFile(manifestPath, manifestData, 0644 /* perm */)) @@ -8238,7 +8238,7 @@ func TestManifestTooNew(t *testing.T) { require.NoError(t, os.WriteFile(manifestPath+backupinfo.BackupManifestChecksumSuffix, checksum, 0644 /* perm */)) // Verify we reject it. - sqlDB.ExpectErr(t, "backup from version 99.1 is newer than current version", `RESTORE DATABASE r1 FROM 'nodelocal://0/too_new'`) + sqlDB.ExpectErr(t, "backup from version 2147483647.1 is newer than current version", `RESTORE DATABASE r1 FROM 'nodelocal://0/too_new'`) // Bump the version down and write it back out to make it look older. 
backupManifest.ClusterVersion = roachpb.Version{Major: 20, Minor: 2, Internal: 2} diff --git a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant index 4108928e244e..3d6473ca810a 100644 --- a/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant +++ b/pkg/ccl/logictestccl/testdata/logic_test/crdb_internal_tenant @@ -360,7 +360,7 @@ select crdb_internal.get_vmodule() · query T -select regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''); +select regexp_replace(regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''), '10000', ''); ---- 22.1 @@ -453,7 +453,7 @@ select * from crdb_internal.gossip_alerts # Anyone can see the executable version. query T -select regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''); +select regexp_replace(regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''), '10000', ''); ---- 22.1 diff --git a/pkg/clusterversion/cockroach_versions.go b/pkg/clusterversion/cockroach_versions.go index 0df42e33dbab..328224f65e51 100644 --- a/pkg/clusterversion/cockroach_versions.go +++ b/pkg/clusterversion/cockroach_versions.go @@ -154,7 +154,7 @@ type Key int // //go:generate stringer -type=Key const ( - _ Key = iota - 1 // want first named one to start at zero + invalidVersionKey Key = iota - 1 // want first named one to start at zero // V21_2 is CockroachDB v21.2. It's used for all v21.2.x patch releases. V21_2 @@ -302,8 +302,8 @@ const TODOPreV21_2 = V21_2 // previously referenced a < 22.1 version until that check/gate can be removed. const TODOPreV22_1 = V22_1 -// versionsSingleton lists all historical versions here in chronological order, -// with comments describing what backwards-incompatible features were +// rawVersionsSingleton lists all historical versions here in chronological +// order, with comments describing what backwards-incompatible features were // introduced. // // A roachpb.Version has the colloquial form MAJOR.MINOR[.PATCH][-INTERNAL], @@ -319,7 +319,11 @@ const TODOPreV22_1 = V22_1 // Such clusters would need to be wiped. As a result, do not bump the major or // minor version until we are absolutely sure that no new migrations will need // to be added (i.e., when cutting the final release candidate). -var versionsSingleton = keyedVersions{ +// +// rawVersionsSingleton is converted to versionsSingleton below, by adding a +// large number to every major if building from master, so as to ensure that +// master builds cannot be upgraded to release-branch builds. +var rawVersionsSingleton = keyedVersions{ { // V21_2 is CockroachDB v21.2. It's used for all v21.2.x patch releases. Key: V21_2, @@ -479,6 +483,36 @@ var versionsSingleton = keyedVersions{ // ************************************************* } +const ( + // unstableVersionsAbove is a cluster version Key above which any upgrades in + // this version are considered unstable development-only versions if it is not + // negative, and upgrading to them should permanently move a cluster to + // development versions. On master it should be the minted version of the last + // release, while on release branches it can be set to invalidVersionKey to + // disable marking any versions as development versions. + unstableVersionsAbove = V22_1 + + // finalVersion should be set on a release branch to the minted final cluster + // version key, e.g. to V22_2 on the release-22.2 branch once it is minted. 
+ // Setting it has the effect of ensuring no versions are subsequently added. + finalVersion = invalidVersionKey +) + +var versionsSingleton = func() keyedVersions { + if unstableVersionsAbove > invalidVersionKey { + const devOffset = 1000000 + // Throw every version above the last release (which will be none on a release + // branch) 1 million major versions into the future, so any "upgrade" to a + // release branch build will be a downgrade and thus blocked. + for i := range rawVersionsSingleton { + if rawVersionsSingleton[i].Key > unstableVersionsAbove { + rawVersionsSingleton[i].Major += devOffset + } + } + } + return rawVersionsSingleton +}() + // TODO(irfansharif): clusterversion.binary{,MinimumSupported}Version // feels out of place. A "cluster version" and a "binary version" are two // separate concepts. @@ -497,11 +531,12 @@ var ( ) func init() { - const isReleaseBranch = false - if isReleaseBranch { - if binaryVersion != ByKey(V21_2) { - panic("unexpected cluster version greater than release's binary version") + if finalVersion > invalidVersionKey { + if binaryVersion != ByKey(finalVersion) { + panic("binary version does not match final version") } + } else if binaryVersion.Internal == 0 { + panic("a non-upgrade cluster version must be the final version") } } diff --git a/pkg/clusterversion/key_string.go b/pkg/clusterversion/key_string.go index 540216642d36..3a25cd0a7915 100644 --- a/pkg/clusterversion/key_string.go +++ b/pkg/clusterversion/key_string.go @@ -8,6 +8,7 @@ func _() { // An "invalid array index" compiler error signifies that the constant values have changed. // Re-run the stringer command to generate them again. var x [1]struct{} + _ = x[invalidVersionKey - -1] _ = x[V21_2-0] _ = x[Start22_1-1] _ = x[ProbeRequest-2] @@ -47,13 +48,14 @@ func _() { _ = x[NoNonMVCCAddSSTable-36] } -const _Key_name = "V21_2Start22_1ProbeRequestEnableSpanConfigStoreEnableNewStoreRebalancerV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRoleIDSequenceAddSystemUserIDColumnSystemUsersIDColumnIsBackfilledSetSystemUsersUserIDColumnNotNullSQLSchemaTelemetryScheduledJobsSchemaChangeSupportsCreateFunctionDeleteRequestReturnKeyPebbleFormatPrePebblev1MarkedRoleOptionsTableHasIDColumnRoleOptionsIDColumnIsBackfilledSetRoleOptionsUserIDColumnNotNullUseDelRangeInGCJobWaitedForDelRangeInGCJobRangefeedUseOneStreamPerNodeNoNonMVCCAddSSTable" +const _Key_name = 
"invalidVersionKeyV21_2Start22_1ProbeRequestEnableSpanConfigStoreEnableNewStoreRebalancerV22_1Start22_2LocalTimestampsPebbleFormatSplitUserKeysMarkedCompactedEnsurePebbleFormatVersionRangeKeysEnablePebbleFormatVersionRangeKeysTrigramInvertedIndexesRemoveGrantPrivilegeMVCCRangeTombstonesUpgradeSequenceToBeReferencedByIDSampledStmtDiagReqsAddSSTableTombstonesSystemPrivilegesTableEnablePredicateProjectionChangefeedAlterSystemSQLInstancesAddLocalitySystemExternalConnectionsTableAlterSystemStatementStatisticsAddIndexRecommendationsRoleIDSequenceAddSystemUserIDColumnSystemUsersIDColumnIsBackfilledSetSystemUsersUserIDColumnNotNullSQLSchemaTelemetryScheduledJobsSchemaChangeSupportsCreateFunctionDeleteRequestReturnKeyPebbleFormatPrePebblev1MarkedRoleOptionsTableHasIDColumnRoleOptionsIDColumnIsBackfilledSetRoleOptionsUserIDColumnNotNullUseDelRangeInGCJobWaitedForDelRangeInGCJobRangefeedUseOneStreamPerNodeNoNonMVCCAddSSTable" -var _Key_index = [...]uint16{0, 5, 14, 26, 47, 71, 76, 85, 100, 140, 174, 208, 230, 250, 269, 302, 321, 341, 362, 397, 431, 461, 514, 528, 549, 580, 613, 644, 678, 700, 729, 756, 787, 820, 838, 862, 890, 909} +var _Key_index = [...]uint16{0, 17, 22, 31, 43, 64, 88, 93, 102, 117, 157, 191, 225, 247, 267, 286, 319, 338, 358, 379, 414, 448, 478, 531, 545, 566, 597, 630, 661, 695, 717, 746, 773, 804, 837, 855, 879, 907, 926} func (i Key) String() string { + i -= -1 if i < 0 || i >= Key(len(_Key_index)-1) { - return "Key(" + strconv.FormatInt(int64(i), 10) + ")" + return "Key(" + strconv.FormatInt(int64(i+-1), 10) + ")" } return _Key_name[_Key_index[i]:_Key_index[i+1]] } diff --git a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go index 8ea19348e216..aeab508e8a01 100644 --- a/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go +++ b/pkg/kv/kvclient/kvcoord/dist_sender_rangefeed.go @@ -225,6 +225,8 @@ type PartialRangeFeed struct { CreatedTime time.Time LastValueReceived time.Time Resolved hlc.Timestamp + NumErrs int + LastErr error } // ActiveRangeFeedIterFn is an iterator function which is passed PartialRangeFeed structure. @@ -280,6 +282,14 @@ func (a *activeRangeFeed) onRangeEvent( a.RangeID = rangeID } +func (a *activeRangeFeed) setLastError(err error) { + a.Lock() + defer a.Unlock() + a.LastErr = errors.Wrapf(err, "disconnect at %s: checkpoint %s/-%s", + timeutil.Now().Format(time.RFC3339), a.Resolved, timeutil.Since(a.Resolved.GoTime())) + a.NumErrs++ +} + // rangeFeedRegistry is responsible for keeping track of currently executing // range feeds. type rangeFeedRegistry struct { @@ -389,6 +399,8 @@ func (ds *DistSender) partialRangeFeed( startAfter.Forward(maxTS) if err != nil { + active.setLastError(err) + if log.V(1) { log.Infof(ctx, "RangeFeed %s disconnected with last checkpoint %s ago: %v", span, timeutil.Since(startAfter.GoTime()), err) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go index cfb40e9a7295..bb1b92eb7380 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator.go @@ -87,6 +87,20 @@ var leaseRebalancingAggressiveness = settings.RegisterFloatSetting( settings.NonNegativeFloat, ) +// recoveryStoreSelector controls the strategy for choosing a store to recover +// replicas to: either to any valid store ("good") or to a store that has low +// range count ("best"). 
With this set to "good", recovering from a dead node or +// from a decommissioning node can be faster, because nodes can send replicas to +// more target stores (instead of multiple nodes sending replicas to a few +// stores with a low range count). +var recoveryStoreSelector = settings.RegisterStringSetting( + settings.SystemOnly, + "kv.allocator.recovery_store_selector", + "if set to 'good', the allocator may recover replicas to any valid store, if set "+ + "to 'best' it will pick one of the most ideal stores", + "good", +) + // AllocatorAction enumerates the various replication adjustments that may be // recommended by the allocator. type AllocatorAction int @@ -850,14 +864,64 @@ type decisionDetails struct { Existing string `json:",omitempty"` } +// CandidateSelector is an interface to select a store from a list of +// candidates. +type CandidateSelector interface { + selectOne(cl candidateList) *candidate +} + +// BestCandidateSelector in used to choose the best store to allocate. +type BestCandidateSelector struct { + randGen allocatorRand +} + +// NewBestCandidateSelector returns a CandidateSelector for choosing the best +// candidate store. +func (a *Allocator) NewBestCandidateSelector() CandidateSelector { + return &BestCandidateSelector{a.randGen} +} + +func (s *BestCandidateSelector) selectOne(cl candidateList) *candidate { + return cl.selectBest(s.randGen) +} + +// GoodCandidateSelector is used to choose a random store out of the stores that +// are good enough. +type GoodCandidateSelector struct { + randGen allocatorRand +} + +// NewGoodCandidateSelector returns a CandidateSelector for choosing a random store +// out of the stores that are good enough. +func (a *Allocator) NewGoodCandidateSelector() CandidateSelector { + return &GoodCandidateSelector{a.randGen} +} + +func (s *GoodCandidateSelector) selectOne(cl candidateList) *candidate { + return cl.selectGood(s.randGen) +} + func (a *Allocator) allocateTarget( ctx context.Context, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + replicaStatus ReplicaStatus, targetType TargetReplicaType, ) (roachpb.ReplicationTarget, string, error) { candidateStoreList, aliveStoreCount, throttled := a.StorePool.GetStoreList(storepool.StoreFilterThrottled) + // If the replica is alive we are upreplicating, and in that case we want to + // allocate new replicas on the best possible store. Otherwise, the replica is + // dead or decommissioned, and we want to recover the missing replica as soon + // as possible, and therefore any store that is good enough will be + // considered. + var selector CandidateSelector + if replicaStatus == Alive || recoveryStoreSelector.Get(&a.StorePool.St.SV) == "best" { + selector = a.NewBestCandidateSelector() + } else { + selector = a.NewGoodCandidateSelector() + } + target, details := a.AllocateTargetFromList( ctx, candidateStoreList, @@ -865,6 +929,7 @@ func (a *Allocator) allocateTarget( existingVoters, existingNonVoters, a.ScorerOptions(ctx), + selector, // When allocating a *new* replica, we explicitly disregard nodes with any // existing replicas. 
This is important for multi-store scenarios as // otherwise, stores on the nodes that have existing replicas are simply @@ -902,8 +967,9 @@ func (a *Allocator) AllocateVoter( ctx context.Context, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + replicaStatus ReplicaStatus, ) (roachpb.ReplicationTarget, string, error) { - return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, VoterTarget) + return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, VoterTarget) } // AllocateNonVoter returns a suitable store for a new allocation of a @@ -913,8 +979,9 @@ func (a *Allocator) AllocateNonVoter( ctx context.Context, conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + replicaStatus ReplicaStatus, ) (roachpb.ReplicationTarget, string, error) { - return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, NonVoterTarget) + return a.allocateTarget(ctx, conf, existingVoters, existingNonVoters, replicaStatus, NonVoterTarget) } // AllocateTargetFromList returns a suitable store for a new allocation of a @@ -926,6 +993,7 @@ func (a *Allocator) AllocateTargetFromList( conf roachpb.SpanConfig, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, options ScorerOptions, + selector CandidateSelector, allowMultipleReplsPerNode bool, targetType TargetReplicaType, ) (roachpb.ReplicationTarget, string) { @@ -967,7 +1035,7 @@ func (a *Allocator) AllocateTargetFromList( ) log.VEventf(ctx, 3, "allocate %s: %s", targetType, candidates) - if target := candidates.selectGood(a.randGen); target != nil { + if target := selector.selectOne(candidates); target != nil { log.VEventf(ctx, 3, "add target: %s", target) details := decisionDetails{Target: target.compactString()} detailsBytes, err := json.Marshal(details) @@ -1101,7 +1169,7 @@ func (a Allocator) RemoveTarget( ) log.VEventf(ctx, 3, "remove %s: %s", targetType, rankedCandidates) - if bad := rankedCandidates.selectBad(a.randGen); bad != nil { + if bad := rankedCandidates.selectWorst(a.randGen); bad != nil { for _, exist := range existingReplicas { if exist.StoreID == bad.store.StoreID { log.VEventf(ctx, 3, "remove target: %s", bad) diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go index cb79b1017424..6df62e184ae3 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer.go @@ -778,6 +778,23 @@ func (cl candidateList) best() candidateList { return cl } +// good returns all the elements in a sorted (by score reversed) candidate list +// that share the highest diversity score and are valid. +func (cl candidateList) good() candidateList { + cl = cl.onlyValidAndHealthyDisk() + if len(cl) <= 1 { + return cl + } + for i := 1; i < len(cl); i++ { + if cl[i].necessary == cl[0].necessary && + scoresAlmostEqual(cl[i].diversityScore, cl[0].diversityScore) { + continue + } + return cl[:i] + } + return cl +} + // worst returns all the elements in a sorted (by score reversed) candidate list // that share the lowest constraint score (for instance, the set of candidates // that result in the lowest diversity score for the range, or the set of @@ -836,9 +853,9 @@ func (cl candidateList) betterThan(c candidate) candidateList { return cl } -// selectGood randomly chooses a good candidate store from a sorted (by score -// reversed) candidate list using the provided random generator. 
-func (cl candidateList) selectGood(randGen allocatorRand) *candidate { +// selectBest randomly chooses one of the best candidate stores from a sorted +// (by score reversed) candidate list using the provided random generator. +func (cl candidateList) selectBest(randGen allocatorRand) *candidate { cl = cl.best() if len(cl) == 0 { return nil @@ -858,9 +875,26 @@ func (cl candidateList) selectGood(randGen allocatorRand) *candidate { return best } -// selectBad randomly chooses a bad candidate store from a sorted (by score +// selectGood randomly chooses a good candidate store from a sorted (by score // reversed) candidate list using the provided random generator. -func (cl candidateList) selectBad(randGen allocatorRand) *candidate { +func (cl candidateList) selectGood(randGen allocatorRand) *candidate { + cl = cl.good() + if len(cl) == 0 { + return nil + } + if len(cl) == 1 { + return &cl[0] + } + randGen.Lock() + r := randGen.Intn(len(cl)) + randGen.Unlock() + c := &cl[r] + return c +} + +// selectWorst randomly chooses one of the worst candidate stores from a sorted +// (by score reversed) candidate list using the provided random generator. +func (cl candidateList) selectWorst(randGen allocatorRand) *candidate { cl = cl.worst() if len(cl) == 0 { return nil @@ -1570,7 +1604,7 @@ func bestRebalanceTarget( if len(option.candidates) == 0 { continue } - target := option.candidates.selectGood(randGen) + target := option.candidates.selectBest(randGen) if target == nil { continue } diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go index f6d6e1111ab9..f0d49c02eb34 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_scorer_test.go @@ -27,6 +27,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/kr/pretty" + "github.com/stretchr/testify/require" ) type storeScore struct { @@ -95,9 +96,8 @@ func TestOnlyValidAndHealthyDisk(t *testing.T) { } } -// TestSelectGoodPanic is a basic regression test against a former panic in -// selectGood when called with just invalid/full stores. -func TestSelectGoodPanic(t *testing.T) { +// TestNilSelection verifies selection with just invalid/full stores. +func TestNilSelection(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -107,12 +107,11 @@ func TestSelectGoodPanic(t *testing.T) { }, } allocRand := makeAllocatorRand(rand.NewSource(0)) - if good := cl.selectGood(allocRand); good != nil { - t.Errorf("cl.selectGood() got %v, want nil", good) - } + require.Nil(t, cl.selectBest(allocRand)) + require.Nil(t, cl.selectGood(allocRand)) } -// TestCandidateSelection tests select{good,bad} and {best,worst}constraints. +// TestCandidateSelection tests select{Best,Good,Worst} and {best,good,worst}constraints. 
func TestCandidateSelection(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) @@ -153,60 +152,76 @@ func TestCandidateSelection(t *testing.T) { } testCases := []struct { - candidates []scoreTuple - best []scoreTuple - worst []scoreTuple - good scoreTuple - bad scoreTuple + candidates []scoreTuple + best []scoreTuple + good []scoreTuple + worst []scoreTuple + bestChosen scoreTuple + goodChosen scoreTuple + worstChosen scoreTuple }{ { - candidates: []scoreTuple{{0, 0}}, - best: []scoreTuple{{0, 0}}, - worst: []scoreTuple{{0, 0}}, - good: scoreTuple{0, 0}, - bad: scoreTuple{0, 0}, - }, - { - candidates: []scoreTuple{{0, 0}, {0, 1}}, - best: []scoreTuple{{0, 0}, {0, 1}}, - worst: []scoreTuple{{0, 0}, {0, 1}}, - good: scoreTuple{0, 0}, - bad: scoreTuple{0, 1}, - }, - { - candidates: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, - best: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, - worst: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, - good: scoreTuple{0, 1}, - bad: scoreTuple{0, 2}, - }, - { - candidates: []scoreTuple{{1, 0}, {0, 1}}, - best: []scoreTuple{{1, 0}}, - worst: []scoreTuple{{0, 1}}, - good: scoreTuple{1, 0}, - bad: scoreTuple{0, 1}, - }, - { - candidates: []scoreTuple{{1, 0}, {0, 1}, {0, 2}}, - best: []scoreTuple{{1, 0}}, - worst: []scoreTuple{{0, 1}, {0, 2}}, - good: scoreTuple{1, 0}, - bad: scoreTuple{0, 2}, - }, - { - candidates: []scoreTuple{{1, 0}, {1, 1}, {0, 2}}, - best: []scoreTuple{{1, 0}, {1, 1}}, - worst: []scoreTuple{{0, 2}}, - good: scoreTuple{1, 0}, - bad: scoreTuple{0, 2}, - }, - { - candidates: []scoreTuple{{1, 0}, {1, 1}, {0, 2}, {0, 3}}, - best: []scoreTuple{{1, 0}, {1, 1}}, - worst: []scoreTuple{{0, 2}, {0, 3}}, - good: scoreTuple{1, 0}, - bad: scoreTuple{0, 3}, + candidates: []scoreTuple{{0, 0}}, + best: []scoreTuple{{0, 0}}, + good: []scoreTuple{{0, 0}}, + worst: []scoreTuple{{0, 0}}, + bestChosen: scoreTuple{0, 0}, + goodChosen: scoreTuple{0, 0}, + worstChosen: scoreTuple{0, 0}, + }, + { + candidates: []scoreTuple{{0, 0}, {0, 1}}, + best: []scoreTuple{{0, 0}, {0, 1}}, + good: []scoreTuple{{0, 0}, {0, 1}}, + worst: []scoreTuple{{0, 0}, {0, 1}}, + bestChosen: scoreTuple{0, 0}, + goodChosen: scoreTuple{0, 1}, + worstChosen: scoreTuple{0, 1}, + }, + { + candidates: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, + best: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, + good: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, + worst: []scoreTuple{{0, 0}, {0, 1}, {0, 2}}, + bestChosen: scoreTuple{0, 0}, + goodChosen: scoreTuple{0, 0}, + worstChosen: scoreTuple{0, 1}, + }, + { + candidates: []scoreTuple{{1, 0}, {0, 1}}, + best: []scoreTuple{{1, 0}}, + good: []scoreTuple{{1, 0}}, + worst: []scoreTuple{{0, 1}}, + bestChosen: scoreTuple{1, 0}, + goodChosen: scoreTuple{1, 0}, + worstChosen: scoreTuple{0, 1}, + }, + { + candidates: []scoreTuple{{1, 0}, {0, 1}, {0, 2}}, + best: []scoreTuple{{1, 0}}, + good: []scoreTuple{{1, 0}}, + worst: []scoreTuple{{0, 1}, {0, 2}}, + bestChosen: scoreTuple{1, 0}, + goodChosen: scoreTuple{1, 0}, + worstChosen: scoreTuple{0, 2}, + }, + { + candidates: []scoreTuple{{1, 0}, {1, 1}, {0, 2}}, + best: []scoreTuple{{1, 0}, {1, 1}}, + good: []scoreTuple{{1, 0}, {1, 1}}, + worst: []scoreTuple{{0, 2}}, + bestChosen: scoreTuple{1, 0}, + goodChosen: scoreTuple{1, 1}, + worstChosen: scoreTuple{0, 2}, + }, + { + candidates: []scoreTuple{{1, 0}, {1, 1}, {0, 2}, {0, 3}}, + best: []scoreTuple{{1, 0}, {1, 1}}, + good: []scoreTuple{{1, 0}, {1, 1}}, + worst: []scoreTuple{{0, 2}, {0, 3}}, + bestChosen: scoreTuple{1, 0}, + goodChosen: scoreTuple{1, 0}, + worstChosen: scoreTuple{0, 3}, }, } 
@@ -218,6 +233,11 @@ func TestCandidateSelection(t *testing.T) { t.Errorf("expected:%s actual:%s diff:%v", formatter(e), formatter(a), pretty.Diff(e, a)) } }) + t.Run(fmt.Sprintf("good-%s", formatter(cl)), func(t *testing.T) { + if a, e := cl.good(), genCandidates(tc.good, 1); !reflect.DeepEqual(a, e) { + t.Errorf("expected:%s actual:%s diff:%v", formatter(e), formatter(a), pretty.Diff(e, a)) + } + }) t.Run(fmt.Sprintf("worst-%s", formatter(cl)), func(t *testing.T) { // Shifting the ids is required to match the end of the list. if a, e := cl.worst(), genCandidates( @@ -227,24 +247,34 @@ func TestCandidateSelection(t *testing.T) { t.Errorf("expected:%s actual:%s diff:%v", formatter(e), formatter(a), pretty.Diff(e, a)) } }) - t.Run(fmt.Sprintf("good-%s", formatter(cl)), func(t *testing.T) { + t.Run(fmt.Sprintf("select-best-%s", formatter(cl)), func(t *testing.T) { + best := cl.selectBest(allocRand) + if best == nil { + t.Fatalf("no 'best' candidate found") + } + actual := scoreTuple{int(best.diversityScore + 0.5), best.rangeCount} + if actual != tc.bestChosen { + t.Errorf("expected:%v actual:%v", tc.bestChosen, actual) + } + }) + t.Run(fmt.Sprintf("select-good-%s", formatter(cl)), func(t *testing.T) { good := cl.selectGood(allocRand) if good == nil { - t.Fatalf("no good candidate found") + t.Fatalf("no 'good' candidate found") } actual := scoreTuple{int(good.diversityScore + 0.5), good.rangeCount} - if actual != tc.good { - t.Errorf("expected:%v actual:%v", tc.good, actual) + if actual != tc.goodChosen { + t.Errorf("expected:%v actual:%v", tc.goodChosen, actual) } }) - t.Run(fmt.Sprintf("bad-%s", formatter(cl)), func(t *testing.T) { - bad := cl.selectBad(allocRand) - if bad == nil { - t.Fatalf("no bad candidate found") + t.Run(fmt.Sprintf("select-worst-%s", formatter(cl)), func(t *testing.T) { + worst := cl.selectWorst(allocRand) + if worst == nil { + t.Fatalf("no 'worst' candidate found") } - actual := scoreTuple{int(bad.diversityScore + 0.5), bad.rangeCount} - if actual != tc.bad { - t.Errorf("expected:%v actual:%v", tc.bad, actual) + actual := scoreTuple{int(worst.diversityScore + 0.5), worst.rangeCount} + if actual != tc.worstChosen { + t.Errorf("expected:%v actual:%v", tc.worstChosen, actual) } }) } diff --git a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go index 8699c700d5bb..3fb2113e9430 100644 --- a/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go +++ b/pkg/kv/kvserver/allocator/allocatorimpl/allocator_test.go @@ -559,6 +559,7 @@ func TestAllocatorSimpleRetrieval(t *testing.T) { ctx, simpleSpanConfig, nil /* existingVoters */, nil, /* existingNonVoters */ + Dead, ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -579,6 +580,7 @@ func TestAllocatorNoAvailableDisks(t *testing.T) { ctx, simpleSpanConfig, nil /* existingVoters */, nil, /* existingNonVoters */ + Dead, ) if !roachpb.Empty(result) { t.Errorf("expected nil result: %+v", result) @@ -594,64 +596,84 @@ func TestAllocatorReadAmpCheck(t *testing.T) { ctx := context.Background() type testCase struct { - name string - stores []*roachpb.StoreDescriptor - conf roachpb.SpanConfig - expectedAddTarget roachpb.StoreID - enforcement StoreHealthEnforcement + name string + stores []*roachpb.StoreDescriptor + conf roachpb.SpanConfig + // The expected store to add when replicas are alive. The allocator should + // pick one of the best stores, with low range count. 
+ expectedTargetIfAlive roachpb.StoreID + // The expected store to add when a replica is dead or decommissioning. The + // allocator should pick a store that is good enough, ignoring the range + // count. + expectedTargetIfDead roachpb.StoreID + enforcement StoreHealthEnforcement } tests := []testCase{ { - name: "ignore read amp on allocation when StoreHealthNoAction enforcement", + name: "ignore read amp on allocation when StoreHealthNoAction enforcement", + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), // NB: All stores have high read amp, this should be ignored and // allocate to the store with the lowest range count. - stores: allStoresHighReadAmp, - conf: emptySpanConfig(), - expectedAddTarget: roachpb.StoreID(3), - enforcement: StoreHealthNoAction, + expectedTargetIfAlive: roachpb.StoreID(3), + // Recovery of a dead node can pick any valid store, not necessarily the + // one with the lowest range count. + expectedTargetIfDead: roachpb.StoreID(2), + enforcement: StoreHealthNoAction, }, { name: "ignore read amp on allocation when storeHealthLogOnly enforcement", // NB: All stores have high read amp, this should be ignored and // allocate to the store with the lowest range count. - stores: allStoresHighReadAmp, - conf: emptySpanConfig(), - expectedAddTarget: roachpb.StoreID(3), - enforcement: StoreHealthLogOnly, + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedTargetIfAlive: roachpb.StoreID(3), + // Recovery of a dead node can pick any valid store, not necessarily the + // one with the lowest range count. + expectedTargetIfDead: roachpb.StoreID(2), + enforcement: StoreHealthLogOnly, }, { name: "ignore read amp on allocation when StoreHealthBlockRebalanceTo enforcement", // NB: All stores have high read amp, this should be ignored and // allocate to the store with the lowest range count. - stores: allStoresHighReadAmp, - conf: emptySpanConfig(), - expectedAddTarget: roachpb.StoreID(3), - enforcement: StoreHealthBlockRebalanceTo, + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedTargetIfAlive: roachpb.StoreID(3), + // Recovery of a dead node can pick any valid store, not necessarily the + // one with the lowest range count. + expectedTargetIfDead: roachpb.StoreID(2), + enforcement: StoreHealthBlockRebalanceTo, }, { name: "don't allocate to stores when all have high read amp and StoreHealthBlockAll", // NB: All stores have high read amp (limit + 1), none are above the watermark, select the lowest range count. - stores: allStoresHighReadAmp, - conf: emptySpanConfig(), - expectedAddTarget: roachpb.StoreID(3), - enforcement: StoreHealthBlockAll, + stores: allStoresHighReadAmp, + conf: emptySpanConfig(), + expectedTargetIfAlive: roachpb.StoreID(3), + // Recovery of a dead node can pick any valid store, not necessarily the + // one with the lowest range count. + expectedTargetIfDead: roachpb.StoreID(2), + enforcement: StoreHealthBlockAll, }, { name: "allocate to store below the mean when all have high read amp and StoreHealthBlockAll", // NB: All stores have high read amp, however store 1 is below the watermark mean read amp. 
- stores: allStoresHighReadAmpSkewed, - conf: emptySpanConfig(), - expectedAddTarget: roachpb.StoreID(1), - enforcement: StoreHealthBlockAll, + stores: allStoresHighReadAmpSkewed, + conf: emptySpanConfig(), + expectedTargetIfAlive: roachpb.StoreID(1), + expectedTargetIfDead: roachpb.StoreID(1), + enforcement: StoreHealthBlockAll, }, { name: "allocate to lowest range count store without high read amp when StoreHealthBlockAll enforcement", // NB: Store 1, 2 and 3 have high read amp and are above the watermark, the lowest range count (4) // should be selected. - stores: threeStoresHighReadAmpAscRangeCount, - conf: emptySpanConfig(), - expectedAddTarget: roachpb.StoreID(4), - enforcement: StoreHealthBlockAll, + stores: threeStoresHighReadAmpAscRangeCount, + conf: emptySpanConfig(), + expectedTargetIfAlive: roachpb.StoreID(4), + expectedTargetIfDead: roachpb.StoreID(4), + enforcement: StoreHealthBlockAll, }, } @@ -661,22 +683,45 @@ func TestAllocatorReadAmpCheck(t *testing.T) { for i, test := range tests { t.Run(fmt.Sprintf("%d_%s", i+1, test.name), func(t *testing.T) { - stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, false /* deterministic */) + stopper, g, _, a, _ := CreateTestAllocator(ctx, 10, true /* deterministic */) defer stopper.Stop(ctx) sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(test.stores, t) // Enable read disk health checking in candidate exclusion. l0SublevelsThresholdEnforce.Override(ctx, &a.StorePool.St.SV, int64(test.enforcement)) + + // Allocate a voter where all replicas are alive (e.g. up-replicating a valid range). add, _, err := a.AllocateVoter( ctx, test.conf, nil, nil, + Alive, + ) + require.NoError(t, err) + require.Truef(t, + chk(add, test.expectedTargetIfAlive), + "the addition target %+v from AllocateVoter doesn't match expectation", + add) + + // Allocate a voter where we have a dead (or decommissioning) replica. + add, _, err = a.AllocateVoter( + ctx, + test.conf, + nil, + nil, + // Dead and Decommissioning should behave the same here, use either. 
+ func() ReplicaStatus { + if i%2 == 0 { + return Dead + } + return Decommissioning + }(), ) require.NoError(t, err) require.Truef(t, - chk(add, test.expectedAddTarget), + chk(add, test.expectedTargetIfDead), "the addition target %+v from AllocateVoter doesn't match expectation", add) }) @@ -695,6 +740,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { ctx, multiDCConfigSSD, nil /* existingVoters */, nil, /* existingNonVoters */ + Dead, ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -706,6 +752,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { NodeID: result1.NodeID, StoreID: result1.StoreID, }}, nil, /* existingNonVoters */ + Dead, ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -729,6 +776,7 @@ func TestAllocatorTwoDatacenters(t *testing.T) { StoreID: result2.StoreID, }, }, nil, /* existingNonVoters */ + Dead, ) if err == nil { t.Errorf("expected error on allocation without available stores: %+v", result3) @@ -762,6 +810,7 @@ func TestAllocatorExistingReplica(t *testing.T) { StoreID: 2, }, }, nil, /* existingNonVoters */ + Dead, ) if err != nil { t.Fatalf("Unable to perform allocation: %+v", err) @@ -865,6 +914,7 @@ func TestAllocatorMultipleStoresPerNode(t *testing.T) { { result, _, err := a.AllocateVoter( ctx, emptySpanConfig(), tc.existing, nil, + Dead, ) if e, a := tc.expectTargetAllocate, !roachpb.Empty(result); e != a { t.Errorf( @@ -2920,7 +2970,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) { // Allocate the voting replica first, before the non-voter. This is the // order in which we'd expect the allocator to repair a given range. See // TestAllocatorComputeAction. - voterTarget, _, err := a.AllocateVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters) + voterTarget, _, err := a.AllocateVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters, Dead) if test.shouldVoterAllocFail { require.Errorf(t, err, "expected voter allocation to fail; got %v as a valid target instead", voterTarget) } else { @@ -2929,7 +2979,7 @@ func TestAllocatorConstraintsAndVoterConstraints(t *testing.T) { test.existingVoters = append(test.existingVoters, replicas(voterTarget.StoreID)...) 
} - nonVoterTarget, _, err := a.AllocateNonVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters) + nonVoterTarget, _, err := a.AllocateNonVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters, Dead) if test.shouldNonVoterAllocFail { require.Errorf(t, err, "expected non-voter allocation to fail; got %v as a valid target instead", nonVoterTarget) } else { @@ -3003,7 +3053,7 @@ func TestAllocatorAllocateTargetLocality(t *testing.T) { StoreID: storeID, } } - targetStore, details, err := a.AllocateVoter(ctx, emptySpanConfig(), existingRepls, nil) + targetStore, details, err := a.AllocateVoter(ctx, emptySpanConfig(), existingRepls, nil, Dead) if err != nil { t.Fatal(err) } @@ -3469,7 +3519,7 @@ func TestAllocatorNonVoterAllocationExcludesVoterNodes(t *testing.T) { sg := gossiputil.NewStoreGossiper(g) sg.GossipStores(test.stores, t) - result, _, err := a.AllocateNonVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters) + result, _, err := a.AllocateNonVoter(ctx, test.conf, test.existingVoters, test.existingNonVoters, Dead) if test.shouldFail { require.Error(t, err) require.Regexp(t, test.expError, err) diff --git a/pkg/kv/kvserver/allocator_impl_test.go b/pkg/kv/kvserver/allocator_impl_test.go index 04d174baa883..ba454a8bcf01 100644 --- a/pkg/kv/kvserver/allocator_impl_test.go +++ b/pkg/kv/kvserver/allocator_impl_test.go @@ -252,14 +252,14 @@ func TestAllocatorThrottled(t *testing.T) { defer stopper.Stop(ctx) // First test to make sure we would send the replica to purgatory. - _, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil) + _, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead) if _, ok := IsPurgatoryError(err); !ok { t.Fatalf("expected a purgatory error, got: %+v", err) } // Second, test the normal case in which we can allocate to the store. 
gossiputil.NewStoreGossiper(g).GossipStores(singleStore, t) - result, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil) + result, _, err := a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead) if err != nil { t.Fatalf("unable to perform allocation: %+v", err) } @@ -276,7 +276,7 @@ func TestAllocatorThrottled(t *testing.T) { } storeDetail.ThrottledUntil = timeutil.Now().Add(24 * time.Hour) a.StorePool.DetailsMu.Unlock() - _, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil) + _, _, err = a.AllocateVoter(ctx, simpleSpanConfig, []roachpb.ReplicaDescriptor{}, nil, allocatorimpl.Dead) if _, ok := IsPurgatoryError(err); ok { t.Fatalf("expected a non purgatory error, got: %+v", err) } diff --git a/pkg/kv/kvserver/client_migration_test.go b/pkg/kv/kvserver/client_migration_test.go index ddb17fca56a6..6efacf3226ad 100644 --- a/pkg/kv/kvserver/client_migration_test.go +++ b/pkg/kv/kvserver/client_migration_test.go @@ -55,7 +55,7 @@ func TestStorePurgeOutdatedReplicas(t *testing.T) { t.Run(fmt.Sprintf("with-initial-version=%t", withInitialVersion), func(t *testing.T) { const numStores = 3 ctx := context.Background() - migrationVersion := roachpb.Version{Major: 42} + migrationVersion := roachpb.Version{Major: 1000042} storeKnobs := &kvserver.StoreTestingKnobs{ DisableEagerReplicaRemoval: true, diff --git a/pkg/kv/kvserver/kvserverpb/raft.proto b/pkg/kv/kvserver/kvserverpb/raft.proto index 1b02b41d473b..f5cbd37383a6 100644 --- a/pkg/kv/kvserver/kvserverpb/raft.proto +++ b/pkg/kv/kvserver/kvserverpb/raft.proto @@ -155,6 +155,15 @@ message SnapshotRequest { reserved 2; } + // QueueName indicates the source of the snapshot. Snapshots are prioritized + // within a queue and round-robin selected between queues for both the sending + // and receiving side. + enum QueueName { + OTHER = 0; + REPLICATE_QUEUE = 1; + RAFT_SNAPSHOT_QUEUE = 2; + } + message Header { // The replica state at the time the snapshot was generated. Note // that ReplicaState.Desc differs from the above range_descriptor @@ -170,12 +179,16 @@ message SnapshotRequest { int64 range_size = 3; // The priority of the snapshot. + // Deprecated, prefer sender_queue_priority. + // TODO(abaptist): Remove this field for v23.1. Priority priority = 6; // The strategy of the snapshot. Strategy strategy = 7; // The type of the snapshot. + // Deprecated, prefer sender_queue_name. + // TODO(abaptist): Remove this field for v23.1. Type type = 9; // Whether the snapshot uses the unreplicated RaftTruncatedState or not. @@ -190,6 +203,14 @@ message SnapshotRequest { // TODO(irfansharif): Remove this in v22.1. bool deprecated_unreplicated_truncated_state = 8; + // The sending queue's name, to be utilized to ensure fairness across + // different snapshot sending sources. + SnapshotRequest.QueueName sender_queue_name = 10; + + // The sending queue's priority, to be utilized to prioritize snapshots + // from a particular sending source. + double sender_queue_priority = 11; + reserved 1, 4; } @@ -237,6 +258,12 @@ message DelegateSnapshotRequest { // The priority of the snapshot. SnapshotRequest.Priority priority = 5; + // The sending queue's name. + SnapshotRequest.QueueName sender_queue_name = 9; + + // The sending queue's priority. + double sender_queue_priority = 10; + // The type of the snapshot. 
SnapshotRequest.Type type = 6; diff --git a/pkg/kv/kvserver/multiqueue/multi_queue.go b/pkg/kv/kvserver/multiqueue/multi_queue.go index 813c07bd3a6a..588c4556de46 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue.go @@ -120,8 +120,8 @@ type Permit struct { } // tryRunNextLocked will run the next task in order round-robin through the -// queues and in priority order within a queue. It will return true if it ran a -// task. The MultiQueue.mu lock must be held before calling this func. +// queues and in priority order within a queue. +// MultiQueue.mu lock must be held before calling this function. func (m *MultiQueue) tryRunNextLocked() { // If no permits are left, then we can't run anything. if m.remainingRuns <= 0 { @@ -130,7 +130,7 @@ func (m *MultiQueue) tryRunNextLocked() { for i := 0; i < len(m.outstanding); i++ { // Start with the next queue in order and iterate through all empty queues. - // If all queues are empty then return false signaling that nothing was run. + // If all queues are empty then return, as there is nothing to run. index := (m.lastQueueIndex + i + 1) % len(m.outstanding) if m.outstanding[index].Len() > 0 { task := heap.Pop(&m.outstanding[index]).(*Task) @@ -142,7 +142,7 @@ func (m *MultiQueue) tryRunNextLocked() { } } -// Add returns a Task that must be closed (calling Task.Close) to +// Add returns a Task that must be closed (calling m.Release(..)) to // release the Permit. The number of types is expected to // be relatively small and not be changing over time. func (m *MultiQueue) Add(queueType int, priority float64) *Task { @@ -166,10 +166,7 @@ func (m *MultiQueue) Add(queueType int, priority float64) *Task { } heap.Push(&m.outstanding[pos], &newTask) - // Once we are done adding a task, signal the main loop in case it finished - // all its work and was waiting for more work. We are holding the mu lock when - // signaling, so we guarantee that it will not be able to respond to the - // signal until after we release the lock. + // Once we are done adding a task, attempt to signal the next waiting task. m.tryRunNextLocked() return &newTask @@ -184,21 +181,25 @@ func (m *MultiQueue) Cancel(task *Task) { // Task will track its position within the queue. queueIdx := m.mapping[task.queueType] ok := m.outstanding[queueIdx].tryRemove(task) + // Close the permit channel so that waiters stop blocking. + if ok { + close(task.permitC) + return + } // If we get here, we are racing with the task being started. The concern is // that the caller may also call MultiQueue.Release since the task was // started. Either we get the permit or the caller, so we guarantee only one // release will be called. - if !ok { - select { - case p, ok := <-task.permitC: - // Only release if the channel is open, and we can get the permit. - if ok { - m.releaseLocked(p) - } - default: - // If we are not able to get the permit, this means the permit has already - // been given to the caller, and they must call Release on it. + select { + case p, ok := <-task.permitC: + // Only release if the channel is open, and we can get the permit. + if ok { + close(task.permitC) + m.releaseLocked(p) } + default: + // If we are not able to get the permit, this means the permit has already + // been given to the caller, and they must call Release on it. 
} } diff --git a/pkg/kv/kvserver/multiqueue/multi_queue_test.go b/pkg/kv/kvserver/multiqueue/multi_queue_test.go index ce7e676b3348..8e693a1e210f 100644 --- a/pkg/kv/kvserver/multiqueue/multi_queue_test.go +++ b/pkg/kv/kvserver/multiqueue/multi_queue_test.go @@ -121,6 +121,100 @@ func TestMultiQueueRemove(t *testing.T) { verifyOrder(t, queue, a3, b3, c3, a2, c2) } +func TestMultiQueueCancelOne(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + queue := NewMultiQueue(1) + task := queue.Add(1, 1) + queue.Cancel(task) +} + +func TestMultiQueueCancelInProgress(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + queue := NewMultiQueue(1) + + const a = 1 + const b = 2 + const c = 3 + + a3 := queue.Add(a, 5.0) + a2 := queue.Add(a, 4.0) + b1 := queue.Add(b, 1.1) + b2 := queue.Add(b, 2.1) + c3 := queue.Add(c, 2.2) + b3 := queue.Add(b, 6.1) + + queue.Cancel(b2) + queue.Cancel(b1) + + started := 0 + completed := 0 + startTask := func(task *Task) (*Permit, bool) { + select { + case permit, ok := <-task.GetWaitChan(): + if ok { + started++ + return permit, true + } + case <-time.After(time.Second): + t.Fatalf(`should not wait for task on queue %d with priority %f to start`, + task.queueType, task.priority, + ) + } + return nil, false + } + + completeTask := func(task *Task, permit *Permit) { + releaseStarted := make(chan struct{}) + releaseFinished := make(chan struct{}) + go func() { + close(releaseStarted) + queue.Release(permit) + close(releaseFinished) + }() + <-releaseStarted + select { + case <-releaseFinished: + completed++ + case <-time.After(time.Second): + t.Fatalf(`should not wait for task on queue %d with priority %f to complete`, + task.queueType, task.priority, + ) + } + } + + // Execute a3. + a3Permit, ok := startTask(a3) + require.True(t, ok) + completeTask(a3, a3Permit) + + // Cancel b3 before starting. Should not be able to get permit. + queue.Cancel(b3) + _, ok = startTask(b3) + require.False(t, ok) + + // Now, should be able to execute c3 immediately. + c3Permit, ok := startTask(c3) + require.True(t, ok) + + // A and C started + require.Equal(t, 2, started) + // A completed + require.Equal(t, 1, completed) + + // Complete c3 and cancel after completion. + completeTask(c3, c3Permit) + queue.Cancel(c3) + + // Start a2, which is the final item and also should not block to start. + startTask(a2) + + require.Equal(t, 3, started) + require.Equal(t, 2, completed) +} + // TestMultiQueueStress calls Add from multiple threads. It chooses different // names and different priorities for the requests. The goal is simply to make // sure that all the requests are serviced and nothing hangs or fails. 
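To make the new multiqueue semantics above easier to follow, here is a minimal usage sketch of the API exactly as it appears in this diff (NewMultiQueue, Add, GetWaitChan, Release, Cancel). The queue IDs and priorities below are illustrative only; they are not the values used by the replicate or Raft snapshot queues.

```go
// Sketch only: exercises the multiqueue API surface shown in this diff.
package multiqueuedemo

import "github.com/cockroachdb/cockroach/pkg/kv/kvserver/multiqueue"

func exampleUsage() {
	// One concurrent permit: at most one task holds a Permit at a time.
	q := multiqueue.NewMultiQueue(1)

	// Two logical queues; within a queue, higher priority runs first, and
	// tryRunNextLocked round-robins between non-empty queues.
	high := q.Add(1 /* queueType */, 10.0 /* priority */)
	low := q.Add(2 /* queueType */, 1.0 /* priority */)

	// Wait for the permit. The channel is closed (ok == false) if the task
	// was cancelled before it ever started.
	if permit, ok := <-high.GetWaitChan(); ok {
		// ... do the work (e.g. send a snapshot) ...
		q.Release(permit)
	}

	// Cancel a task that is no longer needed. Per the Cancel changes above,
	// exactly one of Cancel or Release ends up returning the permit.
	q.Cancel(low)
}
```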
diff --git a/pkg/kv/kvserver/raft_snapshot_queue.go b/pkg/kv/kvserver/raft_snapshot_queue.go index 61419699ecec..c3f1d848325a 100644 --- a/pkg/kv/kvserver/raft_snapshot_queue.go +++ b/pkg/kv/kvserver/raft_snapshot_queue.go @@ -140,7 +140,7 @@ func (rq *raftSnapshotQueue) processRaftSnapshot( } } - err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY) + err := repl.sendSnapshot(ctx, repDesc, snapType, kvserverpb.SnapshotRequest_RECOVERY, kvserverpb.SnapshotRequest_RAFT_SNAPSHOT_QUEUE, raftSnapshotPriority) // NB: if the snapshot fails because of an overlapping replica on the // recipient which is also waiting for a snapshot, the "smart" thing is to diff --git a/pkg/kv/kvserver/replica_command.go b/pkg/kv/kvserver/replica_command.go index 4d669222f666..d32d3e80bf6c 100644 --- a/pkg/kv/kvserver/replica_command.go +++ b/pkg/kv/kvserver/replica_command.go @@ -981,13 +981,15 @@ func (r *Replica) ChangeReplicas( return nil, errors.New("must disable replicate queue to use ChangeReplicas manually") } } - return r.changeReplicasImpl(ctx, desc, priority, reason, details, chgs) + return r.changeReplicasImpl(ctx, desc, priority, kvserverpb.SnapshotRequest_OTHER, 0.0, reason, details, chgs) } func (r *Replica) changeReplicasImpl( ctx context.Context, desc *roachpb.RangeDescriptor, priority kvserverpb.SnapshotRequest_Priority, + senderName kvserverpb.SnapshotRequest_QueueName, + senderQueuePriority float64, reason kvserverpb.RangeLogEventReason, details string, chgs roachpb.ReplicationChanges, @@ -1054,7 +1056,7 @@ func (r *Replica) changeReplicasImpl( _ = roachpb.ReplicaSet.LearnerDescriptors var err error desc, err = r.initializeRaftLearners( - ctx, desc, priority, reason, details, adds, roachpb.LEARNER, + ctx, desc, priority, senderName, senderQueuePriority, reason, details, adds, roachpb.LEARNER, ) if err != nil { return nil, err @@ -1100,7 +1102,7 @@ func (r *Replica) changeReplicasImpl( // disruption to foreground traffic. See // https://github.com/cockroachdb/cockroach/issues/63199 for an example. desc, err = r.initializeRaftLearners( - ctx, desc, priority, reason, details, adds, roachpb.NON_VOTER, + ctx, desc, priority, senderName, senderQueuePriority, reason, details, adds, roachpb.NON_VOTER, ) if err != nil { return nil, err @@ -1654,6 +1656,8 @@ func (r *Replica) initializeRaftLearners( ctx context.Context, desc *roachpb.RangeDescriptor, priority kvserverpb.SnapshotRequest_Priority, + senderName kvserverpb.SnapshotRequest_QueueName, + senderQueuePriority float64, reason kvserverpb.RangeLogEventReason, details string, targets []roachpb.ReplicationTarget, @@ -1799,7 +1803,9 @@ func (r *Replica) initializeRaftLearners( // orphaned learner. Second, this tickled some bugs in etcd/raft around // switching between StateSnapshot and StateProbe. Even if we worked through // these, it would be susceptible to future similar issues. 
- if err := r.sendSnapshot(ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority); err != nil { + if err := r.sendSnapshot( + ctx, rDesc, kvserverpb.SnapshotRequest_INITIAL, priority, senderName, senderQueuePriority, + ); err != nil { return nil, err } } @@ -2584,6 +2590,8 @@ func (r *Replica) sendSnapshot( recipient roachpb.ReplicaDescriptor, snapType kvserverpb.SnapshotRequest_Type, priority kvserverpb.SnapshotRequest_Priority, + senderQueueName kvserverpb.SnapshotRequest_QueueName, + senderQueuePriority float64, ) (retErr error) { defer func() { // Report the snapshot status to Raft, which expects us to do this once we @@ -2631,13 +2639,15 @@ func (r *Replica) sendSnapshot( // Create new delegate snapshot request with only required metadata. delegateRequest := &kvserverpb.DelegateSnapshotRequest{ - RangeID: r.RangeID, - CoordinatorReplica: sender, - RecipientReplica: recipient, - Priority: priority, - Type: snapType, - Term: status.Term, - DelegatedSender: sender, + RangeID: r.RangeID, + CoordinatorReplica: sender, + RecipientReplica: recipient, + Priority: priority, + SenderQueueName: senderQueueName, + SenderQueuePriority: senderQueuePriority, + Type: snapType, + Term: status.Term, + DelegatedSender: sender, } err = contextutil.RunWithTimeout( ctx, "delegate-snapshot", sendSnapshotTimeout, func(ctx context.Context) error { @@ -2777,10 +2787,12 @@ func (r *Replica) followerSendSnapshot( Snapshot: snap.RaftSnap, }, }, - RangeSize: rangeSize, - Priority: req.Priority, - Strategy: kvserverpb.SnapshotRequest_KV_BATCH, - Type: req.Type, + RangeSize: rangeSize, + Priority: req.Priority, + SenderQueueName: req.SenderQueueName, + SenderQueuePriority: req.SenderQueuePriority, + Strategy: kvserverpb.SnapshotRequest_KV_BATCH, + Type: req.Type, } newBatchFn := func() storage.Batch { return r.store.Engine().NewUnindexedBatch(true /* writeOnly */) @@ -3226,6 +3238,7 @@ func (r *Replica) relocateOne( existingVoters, existingNonVoters, r.store.allocator.ScorerOptions(ctx), + r.store.allocator.NewBestCandidateSelector(), // NB: Allow the allocator to return target stores that might be on the // same node as an existing replica. This is to ensure that relocations // that require "lateral" movement of replicas within a node can succeed. diff --git a/pkg/kv/kvserver/replicate_queue.go b/pkg/kv/kvserver/replicate_queue.go index cdd55b16f6a7..592d31ea2eb7 100644 --- a/pkg/kv/kvserver/replicate_queue.go +++ b/pkg/kv/kvserver/replicate_queue.go @@ -774,7 +774,7 @@ func (rq *replicateQueue) processOneChange( // unavailability; see: _ = execChangeReplicasTxn - action, _ := rq.allocator.ComputeAction(ctx, conf, desc) + action, allocatorPrio := rq.allocator.ComputeAction(ctx, conf, desc) log.VEventf(ctx, 1, "next replica action: %s", action) switch action { @@ -787,13 +787,13 @@ func (rq *replicateQueue) processOneChange( // Add replicas. 
case allocatorimpl.AllocatorAddVoter: requeue, err := rq.addOrReplaceVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun, + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, allocatorPrio, dryRun, ) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) return requeue, err case allocatorimpl.AllocatorAddNonVoter: requeue, err := rq.addOrReplaceNonVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, dryRun, + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, -1 /* removeIdx */, allocatorimpl.Alive, allocatorPrio, dryRun, ) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) return requeue, err @@ -821,7 +821,7 @@ func (rq *replicateQueue) processOneChange( deadVoterReplicas[0], voterReplicas) } requeue, err := rq.addOrReplaceVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun) + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, allocatorPrio, dryRun) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) return requeue, err case allocatorimpl.AllocatorReplaceDeadNonVoter: @@ -836,7 +836,7 @@ func (rq *replicateQueue) processOneChange( deadNonVoterReplicas[0], nonVoterReplicas) } requeue, err := rq.addOrReplaceNonVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, dryRun) + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Dead, allocatorPrio, dryRun) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) return requeue, err @@ -854,7 +854,7 @@ func (rq *replicateQueue) processOneChange( decommissioningVoterReplicas[0], voterReplicas) } requeue, err := rq.addOrReplaceVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun) + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, allocatorPrio, dryRun) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) if err != nil { return requeue, decommissionPurgatoryError{err} @@ -872,7 +872,7 @@ func (rq *replicateQueue) processOneChange( decommissioningNonVoterReplicas[0], nonVoterReplicas) } requeue, err := rq.addOrReplaceNonVoters( - ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, dryRun) + ctx, repl, liveVoterReplicas, liveNonVoterReplicas, removeIdx, allocatorimpl.Decommissioning, allocatorPrio, dryRun) rq.metrics.trackResultByAllocatorAction(action, err, dryRun) if err != nil { return requeue, decommissionPurgatoryError{err} @@ -919,6 +919,7 @@ func (rq *replicateQueue) processOneChange( repl, voterReplicas, nonVoterReplicas, + allocatorPrio, canTransferLeaseFrom, scatter, dryRun, @@ -963,6 +964,7 @@ func (rq *replicateQueue) addOrReplaceVoters( liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, removeIdx int, replicaStatus allocatorimpl.ReplicaStatus, + allocatorPriority float64, dryRun bool, ) (requeue bool, _ error) { desc, conf := repl.DescAndSpanConfig() @@ -991,7 +993,7 @@ func (rq *replicateQueue) addOrReplaceVoters( // we're removing it (i.e. dead or decommissioning). If we left the replica in // the slice, the allocator would not be guaranteed to pick a replica that // fills the gap removeRepl leaves once it's gone. 
- newVoter, details, err := rq.allocator.AllocateVoter(ctx, conf, remainingLiveVoters, remainingLiveNonVoters) + newVoter, details, err := rq.allocator.AllocateVoter(ctx, conf, remainingLiveVoters, remainingLiveNonVoters, replicaStatus) if err != nil { return false, err } @@ -1023,7 +1025,7 @@ func (rq *replicateQueue) addOrReplaceVoters( oldPlusNewReplicas, roachpb.ReplicaDescriptor{NodeID: newVoter.NodeID, StoreID: newVoter.StoreID}, ) - _, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters) + _, _, err := rq.allocator.AllocateVoter(ctx, conf, oldPlusNewReplicas, remainingLiveNonVoters, replicaStatus) if err != nil { // It does not seem possible to go to the next odd replica state. Note // that AllocateVoter returns an allocatorError (a PurgatoryError) @@ -1082,6 +1084,7 @@ func (rq *replicateQueue) addOrReplaceVoters( ops, desc, kvserverpb.SnapshotRequest_RECOVERY, + allocatorPriority, kvserverpb.ReasonRangeUnderReplicated, details, dryRun, @@ -1101,12 +1104,13 @@ func (rq *replicateQueue) addOrReplaceNonVoters( liveVoterReplicas, liveNonVoterReplicas []roachpb.ReplicaDescriptor, removeIdx int, replicaStatus allocatorimpl.ReplicaStatus, + allocatorPrio float64, dryRun bool, ) (requeue bool, _ error) { desc, conf := repl.DescAndSpanConfig() existingNonVoters := desc.Replicas().NonVoterDescriptors() - newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, conf, liveVoterReplicas, liveNonVoterReplicas) + newNonVoter, details, err := rq.allocator.AllocateNonVoter(ctx, conf, liveVoterReplicas, liveNonVoterReplicas, replicaStatus) if err != nil { return false, err } @@ -1138,6 +1142,7 @@ func (rq *replicateQueue) addOrReplaceNonVoters( ops, desc, kvserverpb.SnapshotRequest_RECOVERY, + allocatorPrio, kvserverpb.ReasonRangeUnderReplicated, details, dryRun, @@ -1326,6 +1331,7 @@ func (rq *replicateQueue) removeVoter( roachpb.MakeReplicationChanges(roachpb.REMOVE_VOTER, removeVoter), desc, kvserverpb.SnapshotRequest_UNKNOWN, // unused + 0.0, // unused kvserverpb.ReasonRangeOverReplicated, details, dryRun, @@ -1369,7 +1375,8 @@ func (rq *replicateQueue) removeNonVoter( repl, roachpb.MakeReplicationChanges(roachpb.REMOVE_NON_VOTER, target), desc, - kvserverpb.SnapshotRequest_UNKNOWN, + kvserverpb.SnapshotRequest_UNKNOWN, // unused + 0.0, // unused kvserverpb.ReasonRangeOverReplicated, details, dryRun, @@ -1429,6 +1436,7 @@ func (rq *replicateQueue) removeDecommissioning( roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target), desc, kvserverpb.SnapshotRequest_UNKNOWN, // unused + 0.0, // unused kvserverpb.ReasonStoreDecommissioning, "", dryRun, ); err != nil { return false, err @@ -1475,6 +1483,7 @@ func (rq *replicateQueue) removeDead( roachpb.MakeReplicationChanges(targetType.RemoveChangeType(), target), desc, kvserverpb.SnapshotRequest_UNKNOWN, // unused + 0.0, // unused kvserverpb.ReasonStoreDead, "", dryRun, @@ -1488,6 +1497,7 @@ func (rq *replicateQueue) considerRebalance( ctx context.Context, repl *Replica, existingVoters, existingNonVoters []roachpb.ReplicaDescriptor, + allocatorPrio float64, canTransferLeaseFrom func(ctx context.Context, repl *Replica) bool, scatter, dryRun bool, ) (requeue bool, _ error) { @@ -1574,6 +1584,7 @@ func (rq *replicateQueue) considerRebalance( chgs, desc, kvserverpb.SnapshotRequest_REBALANCE, + allocatorPrio, kvserverpb.ReasonRebalance, details, dryRun, @@ -1794,6 +1805,7 @@ func (rq *replicateQueue) changeReplicas( chgs roachpb.ReplicationChanges, desc *roachpb.RangeDescriptor, priority 
kvserverpb.SnapshotRequest_Priority, + allocatorPriority float64, reason kvserverpb.RangeLogEventReason, details string, dryRun bool, @@ -1804,7 +1816,10 @@ func (rq *replicateQueue) changeReplicas( // NB: this calls the impl rather than ChangeReplicas because // the latter traps tests that try to call it while the replication // queue is active. - if _, err := repl.changeReplicasImpl(ctx, desc, priority, reason, details, chgs); err != nil { + if _, err := repl.changeReplicasImpl( + ctx, desc, priority, kvserverpb.SnapshotRequest_REPLICATE_QUEUE, allocatorPriority, reason, + details, chgs, + ); err != nil { return err } rangeUsageInfo := rangeUsageInfoForRepl(repl) diff --git a/pkg/kv/kvserver/stores.go b/pkg/kv/kvserver/stores.go index fde593a1c248..60cdd1b6d8fc 100644 --- a/pkg/kv/kvserver/stores.go +++ b/pkg/kv/kvserver/stores.go @@ -13,6 +13,7 @@ package kvserver import ( "context" "fmt" + math "math" "sync" "unsafe" @@ -364,7 +365,7 @@ func SynthesizeClusterVersionFromEngines( origin string } - maxPossibleVersion := roachpb.Version{Major: 999999} // Sort above any real version. + maxPossibleVersion := roachpb.Version{Major: math.MaxInt32} // Sort above any real version. minStoreVersion := originVersion{ Version: maxPossibleVersion, origin: "(no store)", diff --git a/pkg/sql/conn_executor_exec.go b/pkg/sql/conn_executor_exec.go index ade924808a02..e0512349e242 100644 --- a/pkg/sql/conn_executor_exec.go +++ b/pkg/sql/conn_executor_exec.go @@ -2296,6 +2296,7 @@ func (ex *connExecutor) recordTransactionFinish( RowsRead: ex.extraTxnState.rowsRead, RowsWritten: ex.extraTxnState.rowsWritten, BytesRead: ex.extraTxnState.bytesRead, + Priority: ex.state.priority, } if ex.server.cfg.TestingKnobs.OnRecordTxnFinish != nil { diff --git a/pkg/sql/crdb_internal.go b/pkg/sql/crdb_internal.go index d8a7ee973a36..ec22b92699a9 100644 --- a/pkg/sql/crdb_internal.go +++ b/pkg/sql/crdb_internal.go @@ -5664,7 +5664,9 @@ CREATE TABLE crdb_internal.active_range_feeds ( range_start STRING, range_end STRING, resolved STRING, - last_event_utc INT + last_event_utc INT, + num_errs INT, + last_err STRING );`, populate: func(ctx context.Context, p *planner, _ catalog.DatabaseDescriptor, addRow func(...tree.Datum) error) error { return p.execCfg.DistSender.ForEachActiveRangeFeed( @@ -5675,6 +5677,12 @@ CREATE TABLE crdb_internal.active_range_feeds ( } else { lastEvent = tree.NewDInt(tree.DInt(rf.LastValueReceived.UTC().UnixNano())) } + var lastErr tree.Datum + if rf.LastErr == nil { + lastErr = tree.DNull + } else { + lastErr = tree.NewDString(rf.LastErr.Error()) + } return addRow( tree.NewDInt(tree.DInt(rfCtx.ID)), @@ -5688,6 +5696,8 @@ CREATE TABLE crdb_internal.active_range_feeds ( tree.NewDString(keys.PrettyPrint(nil /* valDirs */, rf.Span.EndKey)), tree.NewDString(rf.Resolved.AsOfSystemTime()), lastEvent, + tree.NewDInt(tree.DInt(rf.NumErrs)), + lastErr, ) }, ) @@ -6344,7 +6354,7 @@ CREATE TABLE crdb_internal.%s ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING, exec_node_ids INT[] NOT NULL, @@ -6456,7 +6466,7 @@ func populateExecutionInsights( tree.NewDString(insight.Statement.PlanGist), tree.NewDInt(tree.DInt(insight.Statement.RowsRead)), tree.NewDInt(tree.DInt(insight.Statement.RowsWritten)), - tree.NewDFloat(tree.DFloat(insight.Transaction.UserPriority)), + tree.NewDString(insight.Transaction.UserPriority), tree.NewDInt(tree.DInt(insight.Statement.Retries)), autoRetryReason, 
execNodeIDs, diff --git a/pkg/sql/crdb_internal_test.go b/pkg/sql/crdb_internal_test.go index 6b184003a87f..3208886a1118 100644 --- a/pkg/sql/crdb_internal_test.go +++ b/pkg/sql/crdb_internal_test.go @@ -923,7 +923,7 @@ func TestIsAtLeastVersion(t *testing.T) { errorRE string }{ {version: "21.2", expected: "true"}, - {version: "99.2", expected: "false"}, + {version: "1000099.2", expected: "false"}, {version: "foo", errorRE: ".*invalid version.*"}, } { query := fmt.Sprintf("SELECT crdb_internal.is_at_least_version('%s')", tc.version) diff --git a/pkg/sql/logictest/testdata/logic_test/crdb_internal b/pkg/sql/logictest/testdata/logic_test/crdb_internal index 3e790d4f2350..46f1f3a07cbc 100644 --- a/pkg/sql/logictest/testdata/logic_test/crdb_internal +++ b/pkg/sql/logictest/testdata/logic_test/crdb_internal @@ -534,7 +534,7 @@ select crdb_internal.get_vmodule() query T select regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''); ---- -22.1 +1000022.1 query ITTT colnames select node_id, component, field, regexp_replace(regexp_replace(value, '^\d+$', ''), e':\\d+', ':') as value from crdb_internal.node_runtime_info @@ -693,7 +693,7 @@ select * from crdb_internal.node_inflight_trace_spans query T select regexp_replace(crdb_internal.node_executable_version()::string, '(-\d+)?$', ''); ---- -22.1 +1000022.1 user root diff --git a/pkg/sql/logictest/testdata/logic_test/create_statements b/pkg/sql/logictest/testdata/logic_test/create_statements index 7c528e201e62..2bb5809dbd04 100644 --- a/pkg/sql/logictest/testdata/logic_test/create_statements +++ b/pkg/sql/logictest/testdata/logic_test/create_statements @@ -43,7 +43,9 @@ CREATE TABLE crdb_internal.active_range_feeds ( range_start STRING NULL, range_end STRING NULL, resolved STRING NULL, - last_event_utc INT8 NULL + last_event_utc INT8 NULL, + num_errs INT8 NULL, + last_err STRING NULL ) CREATE TABLE crdb_internal.active_range_feeds ( id INT8 NULL, tags STRING NULL, @@ -55,7 +57,9 @@ CREATE TABLE crdb_internal.active_range_feeds ( range_start STRING NULL, range_end STRING NULL, resolved STRING NULL, - last_event_utc INT8 NULL + last_event_utc INT8 NULL, + num_errs INT8 NULL, + last_err STRING NULL ) {} {} CREATE TABLE crdb_internal.backward_dependencies ( descriptor_id INT8 NULL, @@ -261,7 +265,7 @@ CREATE TABLE crdb_internal.cluster_execution_insights ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT8 NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, @@ -285,7 +289,7 @@ CREATE TABLE crdb_internal.cluster_execution_insights ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT8 NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, @@ -977,7 +981,7 @@ CREATE TABLE crdb_internal.node_execution_insights ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT8 NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, @@ -1001,7 +1005,7 @@ CREATE TABLE crdb_internal.node_execution_insights ( plan_gist STRING NOT NULL, rows_read INT8 NOT NULL, rows_written INT8 NOT NULL, - priority FLOAT8 NOT NULL, + priority STRING NOT NULL, retries INT8 NOT NULL, last_retry_reason STRING NULL, exec_node_ids INT8[] NOT NULL, diff --git a/pkg/sql/sqlstats/insights/insights.proto 
b/pkg/sql/sqlstats/insights/insights.proto index 4b053493b94a..eebfbe115386 100644 --- a/pkg/sql/sqlstats/insights/insights.proto +++ b/pkg/sql/sqlstats/insights/insights.proto @@ -56,7 +56,7 @@ message Transaction { [(gogoproto.customname) = "FingerprintID", (gogoproto.customtype) = "github.com/cockroachdb/cockroach/pkg/roachpb.TransactionFingerprintID", (gogoproto.nullable) = false]; - double user_priority = 3; + string user_priority = 3; } message Statement { diff --git a/pkg/sql/sqlstats/insights/integration/insights_test.go b/pkg/sql/sqlstats/insights/integration/insights_test.go index 129aa003ecc0..2c4aa4da337d 100644 --- a/pkg/sql/sqlstats/insights/integration/insights_test.go +++ b/pkg/sql/sqlstats/insights/integration/insights_test.go @@ -123,6 +123,112 @@ func TestInsightsIntegration(t *testing.T) { }, 1*time.Second) } +func TestInsightsPriorityIntegration(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + const appName = "TestInsightsPriorityIntegration" + + // Start the cluster. (One node is sufficient; the outliers system is currently in-memory only.) + ctx := context.Background() + settings := cluster.MakeTestingClusterSettings() + args := base.TestClusterArgs{ServerArgs: base.TestServerArgs{Settings: settings}} + tc := testcluster.StartTestCluster(t, 1, args) + defer tc.Stopper().Stop(ctx) + conn := tc.ServerConn(0) + + // Enable detection by setting a latencyThreshold > 0. + latencyThreshold := 50 * time.Millisecond + insights.LatencyThreshold.Override(ctx, &settings.SV, latencyThreshold) + + _, err := conn.ExecContext(ctx, "SET SESSION application_name=$1", appName) + require.NoError(t, err) + + _, err = conn.Exec("CREATE TABLE t (id string, s string);") + require.NoError(t, err) + + queryDelayInSeconds := 2 * latencyThreshold.Seconds() + // Execute a "long-running" statement, running longer than our latencyThreshold. 
+ _, err = conn.ExecContext(ctx, "SELECT pg_sleep($1)", queryDelayInSeconds) + require.NoError(t, err) + + var priorities = []struct { + setPriorityQuery string + query string + queryNoValues string + expectedPriorityValue string + }{ + { + setPriorityQuery: "SET TRANSACTION PRIORITY LOW", + query: "INSERT INTO t(id, s) VALUES ('test', 'originalValue')", + queryNoValues: "INSERT INTO t(id, s) VALUES ('_', '_')", + expectedPriorityValue: "low", + }, + { + setPriorityQuery: "SET TRANSACTION PRIORITY NORMAL", + query: "UPDATE t set s = 'updatedValue' where id = 'test'", + queryNoValues: "UPDATE t SET s = '_' WHERE id = '_'", + expectedPriorityValue: "normal", + }, + { + setPriorityQuery: "SELECT 1", // use a dummy query to validate default scenario + query: "UPDATE t set s = 'updatedValue'", + queryNoValues: "UPDATE t SET s = '_'", + expectedPriorityValue: "normal", + }, + { + setPriorityQuery: "SET TRANSACTION PRIORITY HIGH", + query: "DELETE FROM t WHERE t.s = 'originalValue'", + queryNoValues: "DELETE FROM t WHERE t.s = '_'", + expectedPriorityValue: "high", + }, + } + + for _, p := range priorities { + testutils.SucceedsWithin(t, func() error { + tx, errTxn := conn.BeginTx(ctx, &gosql.TxOptions{}) + require.NoError(t, errTxn) + + _, errTxn = tx.ExecContext(ctx, p.setPriorityQuery) + require.NoError(t, errTxn) + + _, errTxn = tx.ExecContext(ctx, p.query) + require.NoError(t, errTxn) + + _, errTxn = tx.ExecContext(ctx, "select pg_sleep(.1);") + require.NoError(t, errTxn) + errTxn = tx.Commit() + require.NoError(t, errTxn) + return nil + }, 2*time.Second) + + testutils.SucceedsWithin(t, func() error { + row := conn.QueryRowContext(ctx, "SELECT "+ + "query, "+ + "priority "+ + "FROM crdb_internal.node_execution_insights where "+ + "app_name = $1 and query = $2 ", appName, p.queryNoValues) + + var query, priority string + err = row.Scan(&query, &priority) + + if err != nil { + return err + } + + if query != p.queryNoValues { + return fmt.Errorf("expected '%s', but was %s", p.queryNoValues, query) + } + + if priority != p.expectedPriorityValue { + return fmt.Errorf("expected '%s', but was %s", p.expectedPriorityValue, priority) + } + + return nil + }, 2*time.Second) + } +} + func TestInsightsIntegrationForContention(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) diff --git a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go index d5faea4db507..4c5056637097 100644 --- a/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go +++ b/pkg/sql/sqlstats/ssmemstorage/ss_mem_writer.go @@ -313,7 +313,8 @@ func (s *Container) RecordTransaction( s.insights.ObserveTransaction(value.SessionID, &insights.Transaction{ ID: value.TransactionID, - FingerprintID: key}) + FingerprintID: key, + UserPriority: value.Priority.String()}) return nil } diff --git a/pkg/sql/sqlstats/ssprovider.go b/pkg/sql/sqlstats/ssprovider.go index 650781052435..eb4d7b92a66a 100644 --- a/pkg/sql/sqlstats/ssprovider.go +++ b/pkg/sql/sqlstats/ssprovider.go @@ -235,4 +235,5 @@ type RecordedTxnStats struct { RowsRead int64 RowsWritten int64 BytesRead int64 + Priority roachpb.UserPriority } diff --git a/pkg/sql/ttl/ttlbase/BUILD.bazel b/pkg/sql/ttl/ttlbase/BUILD.bazel new file mode 100644 index 000000000000..b3cb815e5a72 --- /dev/null +++ b/pkg/sql/ttl/ttlbase/BUILD.bazel @@ -0,0 +1,12 @@ +load("//build/bazelutil/unused_checker:unused.bzl", "get_x_data") +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "ttlbase", + srcs = ["ttl_helpers.go"], + 
importpath = "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase", + visibility = ["//visibility:public"], + deps = ["//pkg/sql/lexbase"], +) + +get_x_data(name = "get_x_data") diff --git a/pkg/sql/ttl/ttlbase/ttl_helpers.go b/pkg/sql/ttl/ttlbase/ttl_helpers.go new file mode 100644 index 000000000000..a7d1dac2818a --- /dev/null +++ b/pkg/sql/ttl/ttlbase/ttl_helpers.go @@ -0,0 +1,52 @@ +// Copyright 2022 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +package ttlbase + +import ( + "bytes" + "time" + + "github.com/cockroachdb/cockroach/pkg/sql/lexbase" +) + +// DefaultAOSTDuration is the default duration to use in the AS OF SYSTEM TIME +// clause used in the SELECT query. +const DefaultAOSTDuration = -time.Second * 30 + +// SelectTemplate is the format string used to build SELECT queries for the +// TTL job. +const SelectTemplate = `SELECT %[1]s FROM [%[2]d AS tbl_name] +AS OF SYSTEM TIME %[3]s +WHERE %[4]s <= $1 +%[5]s%[6]s +ORDER BY %[1]s +LIMIT %[7]v` + +// DeleteTemplate is the format string used to build DELETE queries for the +// TTL job. +const DeleteTemplate = `DELETE FROM [%d AS tbl_name] +WHERE %s <= $1 +AND (%s) IN (%s)` + +// MakeColumnNamesSQL converts columns into an escape string +// for an order by clause, e.g.: +// {"a", "b"} => a, b +// {"escape-me", "b"} => "escape-me", b +func MakeColumnNamesSQL(columns []string) string { + var b bytes.Buffer + for i, pkColumn := range columns { + if i > 0 { + b.WriteString(", ") + } + lexbase.EncodeRestrictedSQLIdent(&b, pkColumn, lexbase.EncNoFlags) + } + return b.String() +} diff --git a/pkg/sql/ttl/ttljob/BUILD.bazel b/pkg/sql/ttl/ttljob/BUILD.bazel index 7b43ed3e990c..9edce1de6384 100644 --- a/pkg/sql/ttl/ttljob/BUILD.bazel +++ b/pkg/sql/ttl/ttljob/BUILD.bazel @@ -30,7 +30,6 @@ go_library( "//pkg/sql/catalog/descs", "//pkg/sql/execinfra", "//pkg/sql/execinfrapb", - "//pkg/sql/lexbase", "//pkg/sql/physicalplan", "//pkg/sql/rowenc", "//pkg/sql/rowexec", @@ -39,6 +38,7 @@ go_library( "//pkg/sql/sessiondatapb", "//pkg/sql/sqltelemetry", "//pkg/sql/sqlutil", + "//pkg/sql/ttl/ttlbase", "//pkg/sql/types", "//pkg/util/ctxgroup", "//pkg/util/log", @@ -79,6 +79,7 @@ go_test( "//pkg/sql/parser", "//pkg/sql/randgen", "//pkg/sql/sem/tree", + "//pkg/sql/ttl/ttlbase", "//pkg/sql/types", "//pkg/testutils", "//pkg/testutils/serverutils", diff --git a/pkg/sql/ttl/ttljob/ttljob.go b/pkg/sql/ttl/ttljob/ttljob.go index ab988b9be69f..d8220efa644c 100644 --- a/pkg/sql/ttl/ttljob/ttljob.go +++ b/pkg/sql/ttl/ttljob/ttljob.go @@ -31,6 +31,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/physicalplan" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqltelemetry" + "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase" "github.com/cockroachdb/cockroach/pkg/sql/types" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -103,7 +104,7 @@ func (t rowLevelTTLResumer) Resume(ctx context.Context, execCtx interface{}) err details := t.job.Details().(jobspb.RowLevelTTLDetails) - aostDuration := -time.Second * 30 + aostDuration := ttlbase.DefaultAOSTDuration if knobs.AOSTDuration != nil { aostDuration = *knobs.AOSTDuration } diff --git 
a/pkg/sql/ttl/ttljob/ttljob_query_builder.go b/pkg/sql/ttl/ttljob/ttljob_query_builder.go index 547388ecacc0..aba6791ebd7d 100644 --- a/pkg/sql/ttl/ttljob/ttljob_query_builder.go +++ b/pkg/sql/ttl/ttljob/ttljob_query_builder.go @@ -11,7 +11,6 @@ package ttljob import ( - "bytes" "context" "fmt" "time" @@ -20,11 +19,11 @@ import ( "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" - "github.com/cockroachdb/cockroach/pkg/sql/lexbase" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sessiondata" "github.com/cockroachdb/cockroach/pkg/sql/sessiondatapb" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase" "github.com/cockroachdb/errors" ) @@ -91,8 +90,8 @@ func makeSelectQueryBuilder( cachedArgs: cachedArgs, isFirst: true, - pkColumnNamesSQL: makeColumnNamesSQL(pkColumns), - endPKColumnNamesSQL: makeColumnNamesSQL(pkColumns[:len(endPK)]), + pkColumnNamesSQL: ttlbase.MakeColumnNamesSQL(pkColumns), + endPKColumnNamesSQL: ttlbase.MakeColumnNamesSQL(pkColumns[:len(endPK)]), } } @@ -117,7 +116,7 @@ func (b *selectQueryBuilder) buildQuery() string { var filterClause string if !b.isFirst { // After the first query, we always want (col1, ...) > (cursor_col_1, ...) - filterClause = fmt.Sprintf(" AND (%s) > (", b.pkColumnNamesSQL) + filterClause = fmt.Sprintf("AND (%s) > (", b.pkColumnNamesSQL) for i := range b.pkColumns { if i > 0 { filterClause += ", " @@ -129,7 +128,7 @@ func (b *selectQueryBuilder) buildQuery() string { filterClause += ")" } else if len(startPK) > 0 { // For the the first query, we want (col1, ...) >= (cursor_col_1, ...) - filterClause = fmt.Sprintf(" AND (%s) >= (", makeColumnNamesSQL(b.pkColumns[:len(startPK)])) + filterClause = fmt.Sprintf("AND (%s) >= (", ttlbase.MakeColumnNamesSQL(b.pkColumns[:len(startPK)])) for i := range startPK { if i > 0 { filterClause += ", " @@ -142,11 +141,7 @@ func (b *selectQueryBuilder) buildQuery() string { } return fmt.Sprintf( - `SELECT %[1]s FROM [%[2]d AS tbl_name] -AS OF SYSTEM TIME %[3]s -WHERE %[4]s <= $1%[5]s%[6]s -ORDER BY %[1]s -LIMIT %[7]d`, + ttlbase.SelectTemplate, b.pkColumnNamesSQL, b.tableID, tree.MustMakeDTimestampTZ(b.aost, time.Microsecond), @@ -254,7 +249,7 @@ func makeDeleteQueryBuilder( } func (b *deleteQueryBuilder) buildQuery(numRows int) string { - columnNamesSQL := makeColumnNamesSQL(b.pkColumns) + columnNamesSQL := ttlbase.MakeColumnNamesSQL(b.pkColumns) var placeholderStr string for i := 0; i < numRows; i++ { if i > 0 { @@ -271,7 +266,7 @@ func (b *deleteQueryBuilder) buildQuery(numRows int) string { } return fmt.Sprintf( - `DELETE FROM [%d AS tbl_name] WHERE %s <= $1 AND (%s) IN (%s)`, + ttlbase.DeleteTemplate, b.tableID, b.ttlExpr, columnNamesSQL, @@ -316,18 +311,3 @@ func (b *deleteQueryBuilder) run( ) return int64(rowCount), err } - -// makeColumnNamesSQL converts columns into an escape string -// for an order by clause, e.g.: -// {"a", "b"} => a, b -// {"escape-me", "b"} => "escape-me", b -func makeColumnNamesSQL(columns []string) string { - var b bytes.Buffer - for i, pkColumn := range columns { - if i > 0 { - b.WriteString(", ") - } - lexbase.EncodeRestrictedSQLIdent(&b, pkColumn, lexbase.EncNoFlags) - } - return b.String() -} diff --git a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go index fc518f116814..bbb3707d61b4 100644 --- 
a/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go +++ b/pkg/sql/ttl/ttljob/ttljob_query_builder_test.go @@ -16,6 +16,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/catalog/colinfo" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" "github.com/stretchr/testify/require" @@ -56,7 +57,8 @@ func TestSelectQueryBuilder(t *testing.T) { { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) >= ($4, $5) AND (col1, col2) < ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) >= ($4, $5) AND (col1, col2) < ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -72,7 +74,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -88,7 +91,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -117,6 +121,7 @@ LIMIT 2`, expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' WHERE crdb_internal_expiration <= $1 + ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -130,7 +135,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -145,7 +151,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -175,7 +182,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1) >= ($3) AND (col1) < ($2) +WHERE crdb_internal_expiration <= $1 +AND (col1) >= ($3) AND (col1) < ($2) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -191,7 +199,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($3, $4) AND (col1) < ($2) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($3, $4) AND (col1) < ($2) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -207,7 +216,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($3, $4) AND (col1) < ($2) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($3, $4) AND (col1) < ($2) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ 
@@ -237,7 +247,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) < ($2, $3) +WHERE crdb_internal_expiration <= $1 + AND (col1, col2) < ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -252,7 +263,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -268,7 +280,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($4, $5) AND (col1, col2) < ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -298,7 +311,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) >= ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) >= ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -313,7 +327,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -328,7 +343,8 @@ LIMIT 2`, { expectedQuery: `SELECT col1, col2 FROM [1 AS tbl_name] AS OF SYSTEM TIME '2000-01-01 13:30:45+00:00' -WHERE crdb_internal_expiration <= $1 AND (col1, col2) > ($2, $3) +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) > ($2, $3) ORDER BY col1, col2 LIMIT 2`, expectedArgs: []interface{}{ @@ -382,7 +398,9 @@ func TestDeleteQueryBuilder(t *testing.T) { {tree.NewDInt(10), tree.NewDInt(15)}, {tree.NewDInt(12), tree.NewDInt(16)}, }, - expectedQuery: `DELETE FROM [1 AS tbl_name] WHERE crdb_internal_expiration <= $1 AND (col1, col2) IN (($2, $3), ($4, $5))`, + expectedQuery: `DELETE FROM [1 AS tbl_name] +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) IN (($2, $3), ($4, $5))`, expectedArgs: []interface{}{ mockTime, tree.NewDInt(10), tree.NewDInt(15), @@ -401,7 +419,9 @@ func TestDeleteQueryBuilder(t *testing.T) { {tree.NewDInt(12), tree.NewDInt(16)}, {tree.NewDInt(12), tree.NewDInt(18)}, }, - expectedQuery: `DELETE FROM [1 AS tbl_name] WHERE crdb_internal_expiration <= $1 AND (col1, col2) IN (($2, $3), ($4, $5), ($6, $7))`, + expectedQuery: `DELETE FROM [1 AS tbl_name] +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) IN (($2, $3), ($4, $5), ($6, $7))`, expectedArgs: []interface{}{ mockTime, tree.NewDInt(10), tree.NewDInt(15), @@ -415,7 +435,9 @@ func TestDeleteQueryBuilder(t *testing.T) { {tree.NewDInt(112), tree.NewDInt(116)}, {tree.NewDInt(112), tree.NewDInt(118)}, }, - expectedQuery: `DELETE FROM [1 AS tbl_name] WHERE crdb_internal_expiration <= $1 AND (col1, col2) IN (($2, $3), ($4, $5), ($6, $7))`, + expectedQuery: `DELETE FROM [1 AS tbl_name] +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) IN (($2, $3), ($4, $5), ($6, $7))`, expectedArgs: []interface{}{ mockTime, tree.NewDInt(110), tree.NewDInt(115), @@ -427,7 +449,9 @@ func 
TestDeleteQueryBuilder(t *testing.T) { rows: []tree.Datums{ {tree.NewDInt(1210), tree.NewDInt(1215)}, }, - expectedQuery: `DELETE FROM [1 AS tbl_name] WHERE crdb_internal_expiration <= $1 AND (col1, col2) IN (($2, $3))`, + expectedQuery: `DELETE FROM [1 AS tbl_name] +WHERE crdb_internal_expiration <= $1 +AND (col1, col2) IN (($2, $3))`, expectedArgs: []interface{}{ mockTime, tree.NewDInt(1210), tree.NewDInt(1215), @@ -464,7 +488,7 @@ func TestMakeColumnNamesSQL(t *testing.T) { for _, tc := range testCases { t.Run(tc.expected, func(t *testing.T) { - require.Equal(t, tc.expected, makeColumnNamesSQL(tc.cols)) + require.Equal(t, tc.expected, ttlbase.MakeColumnNamesSQL(tc.cols)) }) } } diff --git a/pkg/sql/ttl/ttlschedule/BUILD.bazel b/pkg/sql/ttl/ttlschedule/BUILD.bazel index b22e534a08f7..6ec47978d9ee 100644 --- a/pkg/sql/ttl/ttlschedule/BUILD.bazel +++ b/pkg/sql/ttl/ttlschedule/BUILD.bazel @@ -13,6 +13,7 @@ go_library( "//pkg/scheduledjobs", "//pkg/security/username", "//pkg/sql", + "//pkg/sql/catalog", "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/descs", "//pkg/sql/pgwire/pgcode", @@ -20,6 +21,7 @@ go_library( "//pkg/sql/sem/tree", "//pkg/sql/sqlerrors", "//pkg/sql/sqlutil", + "//pkg/sql/ttl/ttlbase", "//pkg/util/metric", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/sql/ttl/ttlschedule/ttlschedule.go b/pkg/sql/ttl/ttlschedule/ttlschedule.go index 4d9589de875c..d7f9c0a40f5f 100644 --- a/pkg/sql/ttl/ttlschedule/ttlschedule.go +++ b/pkg/sql/ttl/ttlschedule/ttlschedule.go @@ -20,6 +20,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/scheduledjobs" "github.com/cockroachdb/cockroach/pkg/security/username" "github.com/cockroachdb/cockroach/pkg/sql" + "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode" @@ -27,6 +28,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/sql/sqlerrors" "github.com/cockroachdb/cockroach/pkg/sql/sqlutil" + "github.com/cockroachdb/cockroach/pkg/sql/ttl/ttlbase" "github.com/cockroachdb/cockroach/pkg/util/metric" "github.com/cockroachdb/cockroach/pkg/util/timeutil" "github.com/cockroachdb/errors" @@ -209,6 +211,33 @@ func (s rowLevelTTLExecutor) GetCreateScheduleStatement( return fmt.Sprintf(`ALTER TABLE %s WITH (ttl = 'on', ...)`, tn.FQString()), nil } +func makeTTLJobDescription(tableDesc catalog.TableDescriptor, tn *tree.TableName) string { + pkColumns := tableDesc.GetPrimaryIndex().IndexDesc().KeyColumnNames + pkColumnNamesSQL := ttlbase.MakeColumnNamesSQL(pkColumns) + selectQuery := fmt.Sprintf( + ttlbase.SelectTemplate, + pkColumnNamesSQL, + tableDesc.GetID(), + fmt.Sprintf("'%v'", ttlbase.DefaultAOSTDuration), + "", + fmt.Sprintf("AND (%s) > ()", pkColumnNamesSQL), + fmt.Sprintf(" AND (%s) < ()", pkColumnNamesSQL), + "", + ) + deleteQuery := fmt.Sprintf( + ttlbase.DeleteTemplate, + tableDesc.GetID(), + "", + pkColumnNamesSQL, + "", + ) + return fmt.Sprintf(`ttl for %s +-- for each range, iterate to find rows: +%s +-- then delete with: +%s`, tn.FQString(), selectQuery, deleteQuery) +} + func createRowLevelTTLJob( ctx context.Context, createdByInfo *jobs.CreatedByInfo, @@ -226,7 +255,7 @@ func createRowLevelTTLJob( return 0, err } record := jobs.Record{ - Description: fmt.Sprintf("ttl for %s", tn.FQString()), + Description: makeTTLJobDescription(tableDesc, tn), Username: username.NodeUserName(), Details: 
jobspb.RowLevelTTLDetails{ TableID: ttlArgs.TableID, diff --git a/pkg/upgrade/upgrades/precondition_before_starting_an_upgrade_external_test.go b/pkg/upgrade/upgrades/precondition_before_starting_an_upgrade_external_test.go index c2cf38c85d25..413bc086e754 100644 --- a/pkg/upgrade/upgrades/precondition_before_starting_an_upgrade_external_test.go +++ b/pkg/upgrade/upgrades/precondition_before_starting_an_upgrade_external_test.go @@ -14,6 +14,7 @@ import ( "context" gosql "database/sql" "encoding/hex" + "strings" "testing" "github.com/cockroachdb/cockroach/pkg/base" @@ -107,7 +108,7 @@ func TestPreconditionBeforeStartingAnUpgrade(t *testing.T) { "There exists invalid descriptors as listed below. Fix these descriptors before attempting to upgrade again.\n"+ "Invalid descriptor: defaultdb.public.t (104) because 'relation \"t\" (104): invalid depended-on-by relation back reference: referenced descriptor ID 53: referenced descriptor not found'\n"+ "Invalid descriptor: defaultdb.public.temp_tbl (104) because 'no matching name info found in non-dropped relation \"t\"'", - err.Error()) + strings.ReplaceAll(err.Error(), "1000022", "22")) // The cluster version should remain at `v0`. tdb.CheckQueryResults(t, "SHOW CLUSTER SETTING version", [][]string{{v0.String()}}) }) diff --git a/pkg/upgrade/upgrades/wait_for_del_range_in_gc_job_test.go b/pkg/upgrade/upgrades/wait_for_del_range_in_gc_job_test.go index f8c98867e5cd..41e6101f5258 100644 --- a/pkg/upgrade/upgrades/wait_for_del_range_in_gc_job_test.go +++ b/pkg/upgrade/upgrades/wait_for_del_range_in_gc_job_test.go @@ -94,7 +94,7 @@ SELECT count(*) WHERE job_type = 'SCHEMA CHANGE GC' AND status = 'paused'`, [][]string{{"2"}}) - tdb.ExpectErr(t, `verifying precondition for version 22.1-\d+: `+ + tdb.ExpectErr(t, `verifying precondition for version \d*22.1-\d+: `+ `paused GC jobs prevent upgrading GC job behavior: \[\d+ \d+]`, "SET CLUSTER SETTING version = crdb_internal.node_executable_version()")
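Returning to the TTL changes earlier in this diff: as a worked example of the templates factored out into ttlbase, the sketch below renders the SELECT template the same way the query builder does, producing the query shape asserted in the ttljob_query_builder_test.go expectations. The column names, table ID, and cursor bounds are illustrative, not taken from a real table.

```go
package main

import "fmt"

// selectTemplate mirrors ttlbase.SelectTemplate from this diff:
// %[1]s = PK columns, %[2]d = table ID, %[3]s = AOST timestamp,
// %[4]s = TTL expression, %[5]s/%[6]s = optional start/end bound filters,
// %[7]v = batch size (LIMIT).
const selectTemplate = `SELECT %[1]s FROM [%[2]d AS tbl_name]
AS OF SYSTEM TIME %[3]s
WHERE %[4]s <= $1
%[5]s%[6]s
ORDER BY %[1]s
LIMIT %[7]v`

func main() {
	q := fmt.Sprintf(
		selectTemplate,
		"col1, col2",                  // PK columns (illustrative)
		1,                             // table ID (illustrative)
		"'2000-01-01 13:30:45+00:00'", // AS OF SYSTEM TIME
		"crdb_internal_expiration",    // TTL expression
		"AND (col1, col2) > ($2, $3)", // resume-after-cursor filter
		"",                            // no end-of-range filter
		2,                             // batch size
	)
	fmt.Println(q)
	// The output has the same shape as the expected queries in the
	// ttljob_query_builder_test.go changes above.
}
```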