Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
89747: restore: only read from manifests with startTime < AOST r=stevendanna a=msbutler

Previously, all backup manifests in the chain could be added to a restoreSpanEntry during makeSimpleImportSpans, even if the the backup manifest's startTime was greater than the user's AOST. This caused an unecessary amount of data to get read from external storage and filtered out in the restore processor.

This small patch prevents these newer manifests from being considered in makeSimpleImportSpans and thus unecessarily read from during the distsql flow.

Release note: None

89888: tenantcostserver, filetable: replace some `*.executor`s with `InternalExecutorFactory` r=ajwerner a=ZhouXing19

This is part of migrating the existing usages of the internal executor to the new internal executor interfaces.
The changes are based on this logic: trace up the `txn` in the exec functions of the internal executor, find out where 
they are from, and ensure that an IE is created along with this txn via `planner.WithInternalExecutor` or `sqlutil.InternalExecutorFactory.TxnWithExecutor`.

The new interfaces allows the internal executor bound to the outer txn, along with txn-related metadata.

Release note: None

link Epic CRDB-19135

89899: kvserver: improve consistency check fail message r=erikgrinaker a=pavelkalinnikov

Epic: none
Touches #21128

Release note (ops change): Consistency check failure message is made more informative, and suggests a few actions that operators should/may do in the unlikely event it occurs.

Co-authored-by: Michael Butler <[email protected]>
Co-authored-by: Jane Xing <[email protected]>
Co-authored-by: Pavel Kalinnikov <[email protected]>
  • Loading branch information
4 people committed Oct 17, 2022
4 parents 9281a67 + d7d87a7 + 60a2a80 + 442bf90 commit 669e9a5
Show file tree
Hide file tree
Showing 19 changed files with 201 additions and 152 deletions.
10 changes: 7 additions & 3 deletions pkg/ccl/backupccl/backupinfo/manifest_handling.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,19 +656,20 @@ func WriteTableStatistics(
return cloud.WriteFile(ctx, exportStore, BackupStatisticsFileName, bytes.NewReader(statsBuf))
}

// LoadBackupManifests reads and returns the BackupManifests at the
// ExternalStorage locations in `uris`.
// LoadBackupManifestsAtTime reads and returns the BackupManifests at the
// ExternalStorage locations in `uris`. Only manifests with a startTime < AsOf are returned.
//
// The caller is responsible for shrinking `mem` by the returned size once they
// are done with the returned manifests.
func LoadBackupManifests(
func LoadBackupManifestsAtTime(
ctx context.Context,
mem *mon.BoundAccount,
uris []string,
user username.SQLUsername,
makeExternalStorageFromURI cloud.ExternalStorageFromURIFactory,
encryption *jobspb.BackupEncryptionOptions,
kmsEnv cloud.KMSEnv,
asOf hlc.Timestamp,
) ([]backuppb.BackupManifest, int64, error) {
ctx, sp := tracing.ChildSpan(ctx, "backupinfo.LoadBackupManifests")
defer sp.Finish()
Expand All @@ -686,6 +687,9 @@ func LoadBackupManifests(
if err != nil {
return nil, 0, errors.Wrapf(err, "failed to read backup descriptor")
}
if !asOf.IsEmpty() && asOf.Less(desc.StartTime) {
break
}
reserved += memSize
backupManifests[i] = desc
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ func loadBackupSQLDescs(
encryption *jobspb.BackupEncryptionOptions,
kmsEnv cloud.KMSEnv,
) ([]backuppb.BackupManifest, backuppb.BackupManifest, []catalog.Descriptor, int64, error) {
backupManifests, sz, err := backupinfo.LoadBackupManifests(ctx, mem, details.URIs,
p.User(), p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, encryption, kmsEnv)
backupManifests, sz, err := backupinfo.LoadBackupManifestsAtTime(ctx, mem, details.URIs,
p.User(), p.ExecCfg().DistSQLSrv.ExternalStorageFromURI, encryption, kmsEnv, details.EndTime)
if err != nil {
return nil, backuppb.BackupManifest{}, nil, 0, err
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/ccl/multitenantccl/tenantcostserver/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ go_library(
"//pkg/sql/pgwire/pgerror",
"//pkg/sql/sem/tree",
"//pkg/sql/sessiondata",
"//pkg/sql/sqlutil",
"//pkg/util/buildutil",
"//pkg/util/log",
"//pkg/util/metric",
Expand Down Expand Up @@ -55,6 +56,7 @@ go_test(
"//pkg/security/securitytest",
"//pkg/server",
"//pkg/sql",
"//pkg/sql/sqlutil",
"//pkg/testutils",
"//pkg/testutils/metrictestutils",
"//pkg/testutils/serverutils",
Expand Down
14 changes: 8 additions & 6 deletions pkg/ccl/multitenantccl/tenantcostserver/configure.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/sessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/errors"
)

Expand All @@ -26,18 +27,19 @@ import (
func (s *instance) ReconfigureTokenBucket(
ctx context.Context,
txn *kv.Txn,
ie sqlutil.InternalExecutor,
tenantID roachpb.TenantID,
availableRU float64,
refillRate float64,
maxBurstRU float64,
asOf time.Time,
asOfConsumedRequestUnits float64,
) error {
if err := s.checkTenantID(ctx, txn, tenantID); err != nil {
if err := s.checkTenantID(ctx, txn, ie, tenantID); err != nil {
return err
}
h := makeSysTableHelper(ctx, s.executor, txn, tenantID)
state, err := h.readTenantState()
h := makeSysTableHelper(ctx, tenantID)
state, err := h.readTenantState(txn, ie)
if err != nil {
return err
}
Expand All @@ -47,17 +49,17 @@ func (s *instance) ReconfigureTokenBucket(
ctx, tenantID, availableRU, refillRate, maxBurstRU, asOf, asOfConsumedRequestUnits,
now, state.Consumption.RU,
)
if err := h.updateTenantState(state); err != nil {
if err := h.updateTenantState(state, ie, txn); err != nil {
return err
}
return nil
}

// checkTenantID verifies that the tenant exists and is active.
func (s *instance) checkTenantID(
ctx context.Context, txn *kv.Txn, tenantID roachpb.TenantID,
ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor, tenantID roachpb.TenantID,
) error {
row, err := s.executor.QueryRowEx(
row, err := ie.QueryRowEx(
ctx, "check-tenant", txn, sessiondata.NodeUserSessionDataOverride,
`SELECT active FROM system.tenants WHERE id = $1`, tenantID.ToUint64(),
)
Expand Down
12 changes: 6 additions & 6 deletions pkg/ccl/multitenantccl/tenantcostserver/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ import (
"github.com/cockroachdb/cockroach/pkg/server"
"github.com/cockroachdb/cockroach/pkg/settings"
"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/util/metric"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
)

type instance struct {
db *kv.DB
executor *sql.InternalExecutor
ief sqlutil.InternalExecutorFactory
metrics Metrics
timeSource timeutil.TimeSource
settings *cluster.Settings
Expand All @@ -43,12 +43,12 @@ var instanceInactivity = settings.RegisterDurationSetting(
func newInstance(
settings *cluster.Settings,
db *kv.DB,
executor *sql.InternalExecutor,
ief sqlutil.InternalExecutorFactory,
timeSource timeutil.TimeSource,
) *instance {
res := &instance{
db: db,
executor: executor,
ief: ief,
timeSource: timeSource,
settings: settings,
}
Expand All @@ -67,8 +67,8 @@ func init() {
server.NewTenantUsageServer = func(
settings *cluster.Settings,
db *kv.DB,
executor *sql.InternalExecutor,
ief sqlutil.InternalExecutorFactory,
) multitenant.TenantUsageServer {
return newInstance(settings, db, executor, timeutil.DefaultTimeSource{})
return newInstance(settings, db, ief, timeutil.DefaultTimeSource{})
}
}
20 changes: 15 additions & 5 deletions pkg/ccl/multitenantccl/tenantcostserver/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/multitenant"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/sqlutil"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/testutils/metrictestutils"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
Expand Down Expand Up @@ -83,7 +84,8 @@ func (ts *testState) start(t *testing.T) {
ts.tenantUsage = tenantcostserver.NewInstance(
ts.s.ClusterSettings(),
ts.kvDB,
ts.s.InternalExecutor().(*sql.InternalExecutor), ts.clock,
ts.s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory),
ts.clock,
)
ts.metricsReg = metric.NewRegistry()
ts.metricsReg.AddMetricStruct(ts.tenantUsage.Metrics())
Expand Down Expand Up @@ -241,12 +243,20 @@ func (ts *testState) configure(t *testing.T, d *datadriven.TestData) string {
if err := yaml.UnmarshalStrict([]byte(d.Input), &args); err != nil {
d.Fatalf(t, "failed to parse request yaml: %v", err)
}
if err := ts.kvDB.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
ief := ts.s.InternalExecutorFactory().(sqlutil.InternalExecutorFactory)
if err := ief.TxnWithExecutor(context.Background(), ts.kvDB, nil /* sessionData */, func(
ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor,
) error {
return ts.tenantUsage.ReconfigureTokenBucket(
ctx, txn,
ctx,
txn,
ie,
roachpb.MakeTenantID(tenantID),
args.AvailableRU, args.RefillRate, args.MaxBurstRU,
time.Time{}, 0,
args.AvailableRU,
args.RefillRate,
args.MaxBurstRU,
time.Time{},
0,
)
}); err != nil {
d.Fatalf(t, "reconfigure error: %v", err)
Expand Down
Loading

0 comments on commit 669e9a5

Please sign in to comment.