Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
134347: jobs: don't redact job ID in log tags; don't redact internal executor opName r=rafiss a=rafiss

This will assist with debugging.

fixes #132113
Release note: None

134370: quantize: add rabitq quantization r=mw5h a=andy-kimball

This commit implements RaBitQ quantization according to the algorithm described in this paper:

    "RaBitQ: Quantizing High-Dimensional Vectors with a Theoretical Error Bound
    for Approximate Nearest Neighbor Search" by Jianyang Gao & Cheng Long.

The RaBitQ quantization method provides good accuracy, produces compact codes, provides practical error bounds, is easy to implement, and can be accelerated with fast SIMD instructions. RaBitQ quantization codes use only 1 bit per dimension in the original vector.

Epic: CRDB-42943

Release note: None

Co-authored-by: Rafi Shamim <[email protected]>
Co-authored-by: Andrew Kimball <[email protected]>
  • Loading branch information
3 people committed Nov 7, 2024
3 parents d1af1a8 + 7b7e6c5 + 1ec9302 commit f77bcd0
Show file tree
Hide file tree
Showing 58 changed files with 1,252 additions and 161 deletions.
1 change: 1 addition & 0 deletions pkg/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2301,6 +2301,7 @@ GO_TARGETS = [
"//pkg/sql/vecindex/internal:internal_test",
"//pkg/sql/vecindex/quantize:quantize",
"//pkg/sql/vecindex/quantize:quantize_test",
"//pkg/sql/vecindex/testutils:testutils",
"//pkg/sql/vtable:vtable",
"//pkg/sql:sql",
"//pkg/sql:sql_test",
Expand Down
53 changes: 27 additions & 26 deletions pkg/ccl/backupccl/system_schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// clusterBackupInclusion is an enum that specifies whether a system table
Expand Down Expand Up @@ -107,7 +108,7 @@ func defaultSystemTableRestoreFunc(
ctx context.Context, _ customRestoreFuncDeps, txn isql.Txn, systemTableName, tempTableName string,
) error {
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true", systemTableName)
opName := systemTableName + "-data-deletion"
opName := redact.Sprintf("%s-data-deletion", systemTableName)
log.Eventf(ctx, "clearing data from system table %s with query %q",
systemTableName, deleteQuery)

Expand All @@ -118,7 +119,7 @@ func defaultSystemTableRestoreFunc(

restoreQuery := fmt.Sprintf("INSERT INTO system.%s (SELECT * FROM %s);",
systemTableName, tempTableName)
opName = systemTableName + "-data-insert"
opName = redact.Sprintf("%s-data-insert", systemTableName)
if _, err := txn.Exec(ctx, opName, txn.KV(), restoreQuery); err != nil {
return errors.Wrapf(err, "inserting data to system.%s", systemTableName)
}
Expand Down Expand Up @@ -151,7 +152,7 @@ func tenantSettingsTableRestoreFunc(

func queryTableRowCount(ctx context.Context, txn isql.Txn, tableName string) (int64, error) {
countQuery := fmt.Sprintf("SELECT count(1) FROM %s", tableName)
row, err := txn.QueryRow(ctx, fmt.Sprintf("count-%s", tableName), txn.KV(), countQuery)
row, err := txn.QueryRow(ctx, redact.Sprintf("count-%s", tableName), txn.KV(), countQuery)
if err != nil {
return 0, errors.Wrapf(err, "counting rows in %q", tableName)
}
Expand Down Expand Up @@ -180,7 +181,7 @@ func usersRestoreFunc(
}

deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true", systemTableName)
opName := systemTableName + "-data-deletion"
opName := redact.Sprintf("%s-data-deletion", systemTableName)
log.Eventf(ctx, "clearing data from system table %s with query %q",
systemTableName, deleteQuery)

Expand Down Expand Up @@ -226,7 +227,7 @@ func usersRestoreFunc(

restoreQuery := fmt.Sprintf("INSERT INTO system.%s VALUES ($1, $2, $3, $4)",
systemTableName)
opName = systemTableName + "-data-insert"
opName = redact.Sprintf("%s-data-insert", systemTableName)
if _, err := txn.Exec(ctx, opName, txn.KV(), restoreQuery, username, password, isRole, id); err != nil {
return errors.Wrapf(err, "inserting data to system.%s", systemTableName)
}
Expand All @@ -253,12 +254,12 @@ func roleMembersRestoreFunc(
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true", systemTableName)
log.Eventf(ctx, "clearing data from system table %s with query %q", systemTableName, deleteQuery)

_, err = txn.Exec(ctx, systemTableName+"-data-deletion", txn.KV(), deleteQuery)
_, err = txn.Exec(ctx, redact.Sprintf("%s-data-deletion", systemTableName), txn.KV(), deleteQuery)
if err != nil {
return errors.Wrapf(err, "deleting data from system.%s", systemTableName)
}

roleMembers, err := txn.QueryBufferedEx(ctx, systemTableName+"-query-all-rows",
roleMembers, err := txn.QueryBufferedEx(ctx, redact.Sprintf("%s-query-all-rows", systemTableName),
txn.KV(), sessiondata.NodeUserSessionDataOverride,
fmt.Sprintf(`SELECT * FROM %s`, tempTableName),
)
Expand All @@ -273,7 +274,7 @@ VALUES ($1, $2, $3, (SELECT user_id FROM system.users WHERE username = $1), (SEL
role := tree.MustBeDString(roleMember[0])
member := tree.MustBeDString(roleMember[1])
isAdmin := tree.MustBeDBool(roleMember[2])
if _, err := txn.ExecEx(ctx, systemTableName+"-data-insert",
if _, err := txn.ExecEx(ctx, redact.Sprintf("%s-data-insert", systemTableName),
txn.KV(), sessiondata.NodeUserSessionDataOverride,
restoreQuery, role, member, isAdmin,
); err != nil {
Expand Down Expand Up @@ -301,7 +302,7 @@ func roleOptionsRestoreFunc(
}

deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true", systemTableName)
opName := systemTableName + "-data-deletion"
opName := redact.Sprintf("%s-data-deletion", systemTableName)
log.Eventf(ctx, "clearing data from system table %s with query %q",
systemTableName, deleteQuery)

Expand Down Expand Up @@ -349,7 +350,7 @@ func roleOptionsRestoreFunc(

restoreQuery := fmt.Sprintf("INSERT INTO system.%s VALUES ($1, $2, $3, $4)",
systemTableName)
opName = systemTableName + "-data-insert"
opName = redact.Sprintf("%s-data-insert", systemTableName)
if _, err := txn.Exec(ctx, opName, txn.KV(), restoreQuery, username, option, val, id); err != nil {
return errors.Wrapf(err, "inserting data to system.%s", systemTableName)
}
Expand All @@ -374,12 +375,12 @@ func systemPrivilegesRestoreFunc(
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true", systemTableName)
log.Eventf(ctx, "clearing data from system table %s with query %q", systemTableName, deleteQuery)

_, err = txn.Exec(ctx, systemTableName+"-data-deletion", txn.KV(), deleteQuery)
_, err = txn.Exec(ctx, redact.Sprintf("%s-data-deletion", systemTableName), txn.KV(), deleteQuery)
if err != nil {
return errors.Wrapf(err, "deleting data from system.%s", systemTableName)
}

systemPrivilegesRows, err := txn.QueryBufferedEx(ctx, systemTableName+"-query-all-rows",
systemPrivilegesRows, err := txn.QueryBufferedEx(ctx, redact.Sprintf("%s-query-all-rows", systemTableName),
txn.KV(), sessiondata.NodeUserSessionDataOverride,
fmt.Sprintf(`SELECT * FROM %s`, tempTableName),
)
Expand All @@ -397,7 +398,7 @@ VALUES ($1, $2, $3, $4, (
))`,
systemTableName, username.PublicRole, username.PublicRoleID)
for _, row := range systemPrivilegesRows {
if _, err := txn.ExecEx(ctx, systemTableName+"-data-insert",
if _, err := txn.ExecEx(ctx, redact.Sprintf("%s-data-insert", systemTableName),
txn.KV(), sessiondata.NodeUserSessionDataOverride,
restoreQuery, row[0], row[1], row[2], row[3],
); err != nil {
Expand Down Expand Up @@ -425,12 +426,12 @@ func systemDatabaseRoleSettingsRestoreFunc(
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true", systemTableName)
log.Eventf(ctx, "clearing data from system table %s with query %q", systemTableName, deleteQuery)

_, err = txn.Exec(ctx, systemTableName+"-data-deletion", txn.KV(), deleteQuery)
_, err = txn.Exec(ctx, redact.Sprintf("%s-data-deletion", systemTableName), txn.KV(), deleteQuery)
if err != nil {
return errors.Wrapf(err, "deleting data from system.%s", systemTableName)
}

databaseRoleSettingsRows, err := txn.QueryBufferedEx(ctx, systemTableName+"-query-all-rows",
databaseRoleSettingsRows, err := txn.QueryBufferedEx(ctx, redact.Sprintf("%s-query-all-rows", systemTableName),
txn.KV(), sessiondata.NodeUserSessionDataOverride,
fmt.Sprintf(`SELECT * FROM %s`, tempTableName),
)
Expand All @@ -448,7 +449,7 @@ VALUES ($1, $2, $3, (
))`,
systemTableName, username.EmptyRole, username.EmptyRoleID)
for _, row := range databaseRoleSettingsRows {
if _, err := txn.ExecEx(ctx, systemTableName+"-data-insert",
if _, err := txn.ExecEx(ctx, redact.Sprintf("%s-data-insert", systemTableName),
txn.KV(), sessiondata.NodeUserSessionDataOverride,
restoreQuery, row[0], row[1], row[2],
); err != nil {
Expand Down Expand Up @@ -476,12 +477,12 @@ func systemExternalConnectionsRestoreFunc(
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true", systemTableName)
log.Eventf(ctx, "clearing data from system table %s with query %q", systemTableName, deleteQuery)

_, err = txn.Exec(ctx, systemTableName+"-data-deletion", txn.KV(), deleteQuery)
_, err = txn.Exec(ctx, redact.Sprintf("%s-data-deletion", systemTableName), txn.KV(), deleteQuery)
if err != nil {
return errors.Wrapf(err, "deleting data from system.%s", systemTableName)
}

externalConnectionsRows, err := txn.QueryBufferedEx(ctx, systemTableName+"-query-all-rows",
externalConnectionsRows, err := txn.QueryBufferedEx(ctx, redact.Sprintf("%s-query-all-rows", systemTableName),
txn.KV(), sessiondata.NodeUserSessionDataOverride,
fmt.Sprintf(`SELECT * FROM %s`, tempTableName),
)
Expand All @@ -493,7 +494,7 @@ func systemExternalConnectionsRestoreFunc(
INSERT INTO system.%s (connection_name, created, updated, connection_type, connection_details, owner, owner_id)
VALUES ($1, $2, $3, $4, $5, $6, (SELECT user_id FROM system.users WHERE username = $6))`, systemTableName)
for _, row := range externalConnectionsRows {
if _, err := txn.ExecEx(ctx, systemTableName+"-data-insert",
if _, err := txn.ExecEx(ctx, redact.Sprintf("%s-data-insert", systemTableName),
txn.KV(), sessiondata.NodeUserSessionDataOverride,
restoreQuery, row[0], row[1], row[2], row[3], row[4], row[5],
); err != nil {
Expand Down Expand Up @@ -547,7 +548,7 @@ func systemTenantSettingsTableRestoreFunc(
systemTableName, tempTableName string,
) error {
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE true", systemTableName)
opName := systemTableName + "-data-deletion"
opName := redact.Sprintf("%s-data-deletion", systemTableName)
log.Eventf(ctx, "clearing data from system table %s with query %q",
systemTableName, deleteQuery)

Expand All @@ -559,7 +560,7 @@ func systemTenantSettingsTableRestoreFunc(
restoreQuery := fmt.Sprintf(
"INSERT INTO system.%s (SELECT * FROM %s WHERE NOT (tenant_id = 0 AND name = 'version'));",
systemTableName, tempTableName)
opName = systemTableName + "-data-insert"
opName = redact.Sprintf("%s-data-insert", systemTableName)
if _, err := txn.Exec(ctx, opName, txn.KV(), restoreQuery); err != nil {
return errors.Wrapf(err, "inserting data to system.%s", systemTableName)
}
Expand All @@ -576,7 +577,7 @@ func settingsRestoreFunc(
systemTableName, tempTableName string,
) error {
deleteQuery := fmt.Sprintf("DELETE FROM system.%s WHERE name <> 'version'", systemTableName)
opName := systemTableName + "-data-deletion"
opName := redact.Sprintf("%s-data-deletion", systemTableName)
log.Eventf(ctx, "clearing data from system table %s with query %q",
systemTableName, deleteQuery)

Expand All @@ -587,7 +588,7 @@ func settingsRestoreFunc(

restoreQuery := fmt.Sprintf("INSERT INTO system.%s (SELECT * FROM %s WHERE name <> 'version');",
systemTableName, tempTableName)
opName = systemTableName + "-data-insert"
opName = redact.Sprintf("%s-data-insert", systemTableName)
if _, err := txn.Exec(ctx, opName, txn.KV(), restoreQuery); err != nil {
return errors.Wrapf(err, "inserting data to system.%s", systemTableName)
}
Expand Down Expand Up @@ -891,7 +892,7 @@ func rekeySystemTable(
}
fmt.Fprintf(&q, "ELSE %s END)::%s", colName, typ)
if _, err := txn.Exec(
ctx, fmt.Sprintf("remap-%s", tempTableName), txn.KV(), q.String(),
ctx, redact.Sprintf("remap-%s", tempTableName), txn.KV(), q.String(),
); err != nil {
return errors.Wrapf(err, "remapping IDs %s", tempTableName)
}
Expand All @@ -904,15 +905,15 @@ func rekeySystemTable(
// ID system tables that we do not restore directly, and thus have no entry
// in our remapping, but the configuration of them (comments, zones, etc) is
// expected to be restored.
if _, err := txn.Exec(ctx, fmt.Sprintf("remap-remove-%s", tempTableName), txn.KV(),
if _, err := txn.Exec(ctx, redact.Sprintf("remap-remove-%s", tempTableName), txn.KV(),
fmt.Sprintf("DELETE FROM %s WHERE %s >= 50 AND %s < %d", tempTableName, colName, colName, offset),
); err != nil {
return errors.Wrapf(err, "remapping IDs %s", tempTableName)
}

// Now slide remapped the IDs back down by offset, to their intended values.
if _, err := txn.Exec(ctx,
fmt.Sprintf("remap-%s-deoffset", tempTableName),
redact.Sprintf("remap-%s-deoffset", tempTableName),
txn.KV(),
fmt.Sprintf("UPDATE %s SET %s = (%s::int - %d)::%s WHERE %s::int >= %d", tempTableName, colName, colName, offset, typ, colName, offset),
); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions pkg/ccl/crosscluster/logical/udf_row_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

type applierDecision string
Expand Down Expand Up @@ -280,7 +281,7 @@ func (aq *applierQuerier) applyDecision(

func (aq *applierQuerier) execParsed(
ctx context.Context,
opName string,
opName redact.RedactableString,
txn *kv.Txn,
ie isql.Executor,
o sessiondata.InternalExecutorOverride,
Expand All @@ -296,7 +297,7 @@ func (aq *applierQuerier) execParsed(

func (aq *applierQuerier) queryRowExParsed(
ctx context.Context,
opName string,
opName redact.RedactableString,
txn *kv.Txn,
ie isql.Executor,
o sessiondata.InternalExecutorOverride,
Expand Down
1 change: 1 addition & 0 deletions pkg/cloud/userfile/filetable/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,6 @@ go_library(
"//pkg/util/ioctx",
"//pkg/util/log",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
],
)
29 changes: 23 additions & 6 deletions pkg/cloud/userfile/filetable/file_table_read_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/ioctx"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/errors"
"github.com/cockroachdb/redact"
)

// ChunkDefaultSize is the default size of each chunk a file will be broken into
Expand All @@ -42,10 +43,10 @@ type FileToTableExecutorRows struct {
// FileToTableSystemExecutor is the interface which defines the methods for the
// SQL query executor used by the FileToTableSystem
type FileToTableSystemExecutor interface {
Query(ctx context.Context, opName, query string,
Query(ctx context.Context, opName redact.RedactableString, query string,
user username.SQLUsername,
qargs ...interface{}) (*FileToTableExecutorRows, error)
Exec(ctx context.Context, opName, query string,
Exec(ctx context.Context, opName redact.RedactableString, query string,
user username.SQLUsername,
qargs ...interface{}) error
}
Expand All @@ -67,7 +68,11 @@ func MakeInternalFileToTableExecutor(db isql.DB) *InternalFileToTableExecutor {

// Query implements the FileToTableSystemExecutor interface.
func (i *InternalFileToTableExecutor) Query(
ctx context.Context, opName, query string, user username.SQLUsername, qargs ...interface{},
ctx context.Context,
opName redact.RedactableString,
query string,
user username.SQLUsername,
qargs ...interface{},
) (*FileToTableExecutorRows, error) {
result := FileToTableExecutorRows{}
var err error
Expand All @@ -81,7 +86,11 @@ func (i *InternalFileToTableExecutor) Query(

// Exec implements the FileToTableSystemExecutor interface.
func (i *InternalFileToTableExecutor) Exec(
ctx context.Context, opName, query string, user username.SQLUsername, qargs ...interface{},
ctx context.Context,
opName redact.RedactableString,
query string,
user username.SQLUsername,
qargs ...interface{},
) error {
_, err := i.ie.ExecEx(ctx, opName, nil,
sessiondata.InternalExecutorOverride{User: user}, query, qargs...)
Expand All @@ -104,7 +113,11 @@ func MakeSQLConnFileToTableExecutor(executor cloud.SQLConnI) *SQLConnFileToTable

// Query implements the FileToTableSystemExecutor interface.
func (i *SQLConnFileToTableExecutor) Query(
ctx context.Context, _, query string, _ username.SQLUsername, qargs ...interface{},
ctx context.Context,
_ redact.RedactableString,
query string,
_ username.SQLUsername,
qargs ...interface{},
) (*FileToTableExecutorRows, error) {
result := FileToTableExecutorRows{}

Expand All @@ -118,7 +131,11 @@ func (i *SQLConnFileToTableExecutor) Query(

// Exec implements the FileToTableSystemExecutor interface.
func (i *SQLConnFileToTableExecutor) Exec(
ctx context.Context, _, query string, _ username.SQLUsername, qargs ...interface{},
ctx context.Context,
_ redact.RedactableString,
query string,
_ username.SQLUsername,
qargs ...interface{},
) error {
return i.executor.Exec(ctx, query, qargs...)
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import (
"github.com/cockroachdb/errors"
"github.com/cockroachdb/errors/oserror"
"github.com/cockroachdb/logtags"
"github.com/cockroachdb/redact"
)

// adoptedJobs represents the epoch and cancellation of a job id being run by
Expand Down Expand Up @@ -1617,8 +1618,7 @@ func (r *Registry) stepThroughStateMachine(
return errors.NewAssertionErrorWithWrappedErrf(jobErr,
"job %d: resuming with non-nil error", job.ID())
}
resumeCtx := logtags.AddTag(ctx, "job",
fmt.Sprintf("%s id=%d", jobType, job.ID()))
resumeCtx := logtags.AddTag(ctx, "job", redact.Sprintf("%s id=%d", jobType, job.ID()))
// Adding all tags as pprof labels (including the one we just added for job
// type and id).
resumeCtx, undo := pprofutil.SetProfilerLabelsFromCtxTags(resumeCtx)
Expand Down
1 change: 1 addition & 0 deletions pkg/kv/kvserver/protectedts/ptstorage/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ go_test(
"//pkg/util/randutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_redact//:redact",
"@com_github_stretchr_testify//require",
],
)
Expand Down
Loading

0 comments on commit f77bcd0

Please sign in to comment.