Skip to content

Commit

Permalink
crosscluster/physical: add crdb_internal.setup_read_from_standby
Browse files Browse the repository at this point in the history
Previously, we had no way of replicating the catalog from a source
tenant into a destination tenant, so that the destination tenant can
read from the soruce tenant. This patch adds support for extracting and
setting up a reader catalog using crdb_internal.setup_read_from_standby

Release note: None
Epic: CRDB-37521
Fixes: cockroachdb#129439
  • Loading branch information
dt authored and fqazi committed Aug 22, 2024
1 parent 9a7545b commit 625f22a
Show file tree
Hide file tree
Showing 7 changed files with 375 additions and 0 deletions.
12 changes: 12 additions & 0 deletions pkg/ccl/crosscluster/physical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,17 @@ go_library(
"//pkg/settings/cluster",
"//pkg/spanconfig",
"//pkg/sql",
"//pkg/sql/catalog",
"//pkg/sql/catalog/catsessiondata",
"//pkg/sql/catalog/colinfo",
"//pkg/sql/catalog/dbdesc",
"//pkg/sql/catalog/descpb",
"//pkg/sql/catalog/descs",
"//pkg/sql/catalog/funcdesc",
"//pkg/sql/catalog/nstree",
"//pkg/sql/catalog/schemadesc",
"//pkg/sql/catalog/tabledesc",
"//pkg/sql/catalog/typedesc",
"//pkg/sql/clusterunique",
"//pkg/sql/execinfra",
"//pkg/sql/execinfrapb",
Expand All @@ -76,6 +85,7 @@ go_library(
"//pkg/util/humanizeutil",
"//pkg/util/log",
"//pkg/util/metric",
"//pkg/util/mon",
"//pkg/util/protoutil",
"//pkg/util/retry",
"//pkg/util/span",
Expand Down Expand Up @@ -108,6 +118,7 @@ go_test(
"stream_ingestion_job_test.go",
"stream_ingestion_manager_test.go",
"stream_ingestion_processor_test.go",
"stream_reader_catalog_test.go",
],
data = glob(["testdata/**"]),
embed = [":physical"],
Expand Down Expand Up @@ -184,6 +195,7 @@ go_test(
"//pkg/util/randutil",
"//pkg/util/rangedesc",
"//pkg/util/span",
"//pkg/util/stop",
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"@com_github_cockroachdb_datadriven//:datadriven",
Expand Down
191 changes: 191 additions & 0 deletions pkg/ccl/crosscluster/physical/stream_ingest_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,23 @@ import (
"github.com/cockroachdb/cockroach/pkg/ccl/utilccl"
"github.com/cockroachdb/cockroach/pkg/jobs"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/kv"
"github.com/cockroachdb/cockroach/pkg/multitenant/mtinfopb"
"github.com/cockroachdb/cockroach/pkg/repstream"
"github.com/cockroachdb/cockroach/pkg/repstream/streampb"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/catsessiondata"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/dbdesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/funcdesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/nstree"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/schemadesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/typedesc"
"github.com/cockroachdb/cockroach/pkg/sql/clusterunique"
"github.com/cockroachdb/cockroach/pkg/sql/isql"
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
Expand All @@ -29,6 +42,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/syntheticprivilege"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/errors"
)

Expand Down Expand Up @@ -109,3 +123,180 @@ func getReplicationStatsAndStatus(
func init() {
repstream.GetStreamIngestManagerHook = newStreamIngestManagerWithPrivilegesCheck
}

// SetupReaderCatalog implements streaming.StreamIngestManager interface.
func (r *streamIngestManagerImpl) SetupReaderCatalog(
ctx context.Context, from, to roachpb.TenantName, asOf hlc.Timestamp,
) error {
execCfg := r.evalCtx.Planner.ExecutorConfig().(*sql.ExecutorConfig)
var fromID, toID roachpb.TenantID
if err := execCfg.InternalDB.Txn(ctx, func(ctx context.Context, txn isql.Txn) error {
fromTenant, err := sql.GetTenantRecordByName(ctx, r.evalCtx.Settings, txn, from)
if err != nil {
return err
}
fromID, err = roachpb.MakeTenantID(fromTenant.ID)
if err != nil {
return err
}
toTenant, err := sql.GetTenantRecordByName(ctx, r.evalCtx.Settings, txn, to)
if err != nil {
return err
}
toID, err = roachpb.MakeTenantID(toTenant.ID)
if err != nil {
return err
}
if fromTenant.DataState != mtinfopb.DataStateReady {
if fromTenant.PhysicalReplicationConsumerJobID == 0 {
return errors.Newf("cannot copy catalog from tenant %s in state %s", from, fromTenant.DataState)
}
job, err := r.jobRegistry.LoadJobWithTxn(ctx, fromTenant.PhysicalReplicationConsumerJobID, txn)
if err != nil {
return errors.Wrap(err, "loading tenant replication job")
}
progress := job.Progress()
replicatedTime := replicationutils.ReplicatedTimeFromProgress(&progress)
if asOf.IsEmpty() {
asOf = replicatedTime
} else if replicatedTime.Less(asOf) {
return errors.Newf("timestamp is not replicated yet")
}
} else if asOf.IsEmpty() {
asOf = execCfg.Clock.Now()
}
if toTenant.ServiceMode != mtinfopb.ServiceModeNone {
return errors.Newf("tenant %s must have service stopped to enable reader mode",
toTenant.Name)
}

return nil
}); err != nil {
return err
}

if fromID.Equal(roachpb.SystemTenantID) || toID.Equal(roachpb.SystemTenantID) {
return errors.New("cannot revert the system tenant")
}

extracted, err := getCatalogForTenantAsOf(ctx, execCfg, fromID, asOf)
if err != nil {
return err
}

m := mon.NewUnlimitedMonitor(ctx, mon.Options{
Name: "tenant_reader",
Res: mon.MemoryResource,
Settings: execCfg.Settings,
})
// Inherit session data, so that we can run validation.
dsdp := catsessiondata.NewDescriptorSessionDataStackProvider(r.evalCtx.SessionDataStack)
writeDescs := descs.NewBareBonesCollectionFactory(execCfg.Settings, keys.MakeSQLCodec(toID)).
NewCollection(ctx, descs.WithMonitor(m), descs.WithDescriptorSessionDataProvider(dsdp))
return execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
// Reset any state between txn retries.
defer writeDescs.ReleaseAll(ctx)
// Resolve any existing descriptors within the tenant, which
// will be use to compute old values for writing.
existingDescriptors, err := writeDescs.GetAllFromStorageUnvalidated(ctx, txn)
if err != nil {
return err
}
b := txn.NewBatch()
if err := extracted.ForEachDescriptor(func(desc catalog.Descriptor) error {
if desc.GetParentID() == keys.SystemDatabaseID ||
desc.GetID() == keys.SystemDatabaseID {
return nil
}
// If there is an existing descriptor with the same ID, we should
// determine the old bytes in storage for the upsert.
var existingRawBytes []byte
if existingDesc := existingDescriptors.LookupDescriptor(desc.GetID()); existingDesc != nil {
existingRawBytes = existingDesc.GetRawBytesInStorage()
}
var mut catalog.MutableDescriptor
switch t := desc.DescriptorProto().GetUnion().(type) {
case *descpb.Descriptor_Table:
t.Table.Version = 1
mutBuilder := tabledesc.NewBuilder(t.Table)
if existingRawBytes != nil {
mutBuilder.SetRawBytesInStorage(existingRawBytes)
}
mutTbl := mutBuilder.BuildCreatedMutable().(*tabledesc.Mutable)
mut = mutTbl
// Convert any physical tables into external row tables.
// Note: Materialized views will be converted, but their
// view definition will be wiped.
if mutTbl.IsPhysicalTable() {
mutTbl.ViewQuery = ""
mutTbl.SetExternalRowData(&descpb.ExternalRowData{TenantID: fromID, TableID: desc.GetID(), AsOf: asOf})
}
case *descpb.Descriptor_Database:
t.Database.Version = 1
mutBuilder := dbdesc.NewBuilder(t.Database)
if existingRawBytes != nil {
mutBuilder.SetRawBytesInStorage(existingRawBytes)
}
mut = mutBuilder.BuildCreatedMutable()
case *descpb.Descriptor_Schema:
t.Schema.Version = 1
mutBuilder := schemadesc.NewBuilder(t.Schema)
if existingRawBytes != nil {
mutBuilder.SetRawBytesInStorage(existingRawBytes)
}
mut = mutBuilder.BuildCreatedMutable()
case *descpb.Descriptor_Function:
t.Function.Version = 1
mutBuilder := funcdesc.NewBuilder(t.Function)
if existingRawBytes != nil {
mutBuilder.SetRawBytesInStorage(existingRawBytes)
}
mut = mutBuilder.BuildCreatedMutable()
case *descpb.Descriptor_Type:
t.Type.Version = 1
mutBuilder := typedesc.NewBuilder(t.Type)
if existingRawBytes != nil {
mutBuilder.SetRawBytesInStorage(existingRawBytes)
}
mut = mutBuilder.BuildCreatedMutable()
}
return errors.Wrapf(writeDescs.WriteDescToBatch(ctx, true, mut, b),
"unable to create replicated descriptor: %d %T", mut.GetID(), mut)
}); err != nil {
return err
}
if err := extracted.ForEachNamespaceEntry(func(e nstree.NamespaceEntry) error {
if e.GetParentID() == keys.SystemDatabaseID ||
e.GetID() == keys.SystemDatabaseID {
return nil
}
return errors.Wrapf(writeDescs.UpsertNamespaceEntryToBatch(ctx, true, e, b), "namespace entry %v", e)
}); err != nil {
return err
}
return errors.Wrap(txn.Run(ctx, b), "running batch")
})
}

// getCatalogForTenantAsOf reads the descriptors from a given tenant
// at the given timestamp.
func getCatalogForTenantAsOf(
ctx context.Context, execCfg *sql.ExecutorConfig, tenantID roachpb.TenantID, asOf hlc.Timestamp,
) (all nstree.Catalog, _ error) {
cf := descs.NewBareBonesCollectionFactory(execCfg.Settings, keys.MakeSQLCodec(tenantID))
err := execCfg.DB.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
err := txn.SetFixedTimestamp(ctx, asOf)
if err != nil {
return err
}
descs := cf.NewCollection(ctx)
defer descs.ReleaseAll(ctx)
all, err = descs.GetAllFromStorageUnvalidated(ctx, txn)
if err != nil {
return err
}

return nil
})
return all, err
}
119 changes: 119 additions & 0 deletions pkg/ccl/crosscluster/physical/stream_reader_catalog_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
// Copyright 2024 The Cockroach Authors.
//
// Licensed as a CockroachDB Enterprise file under the Cockroach Community
// License (the "License"); you may not use this file except in compliance with
// the License. You may obtain a copy of the License at
//
// https://github.com/cockroachdb/cockroach/blob/master/licenses/CCL.txt

package physical

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/stop"
"github.com/stretchr/testify/require"
)

// TestSetupReaderCatalog validates creating a reader catalog
// using crdb_internal.setup_read_from_standby.
func TestSetupReaderCatalog(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()

ts := serverutils.StartServerOnly(t, base.TestServerArgs{
DefaultTestTenant: base.TestControlsTenantsExplicitly,
Insecure: true,
},
)
defer ts.Stopper().Stop(ctx)

// Create the src tenant and insert data into it.
srcID, err := roachpb.MakeTenantID(4)
require.NoError(t, err)
srcStopper := stop.NewStopper()
srcTenant, err := ts.TenantController().StartTenant(ctx, base.TestTenantArgs{
TenantName: "src",
TenantID: srcID,
Stopper: srcStopper,
ForceInsecure: true,
})
require.NoError(t, err)
srcConn := srcTenant.SQLConn(t)
srcRunner := sqlutils.MakeSQLRunner(srcConn)

stmts := []string{
"CREATE DATABASE db1",
"CREATE SCHEMA db1.sc1",
"CREATE SEQUENCE sq1",
"CREATE TYPE IF NOT EXISTS status AS ENUM ('open', 'closed', 'inactive')",
"CREATE TABLE t1(n int default nextval('sq1'), val status)",
"INSERT INTO t1(val) VALUES('open')",
"INSERT INTO t1(val) VALUES('closed')",
"INSERT INTO t1(val) VALUES('inactive')",
"CREATE VIEW v1 AS (SELECT n from t1)",
}

for _, stmt := range stmts {
srcRunner.Exec(t, stmt)
}
defer srcTenant.AppStopper().Stop(ctx)

// Create the tenant to replicate into.
destName := roachpb.TenantName("dest")
createDest := func() (serverutils.ApplicationLayerInterface, *stop.Stopper) {
destID, err := roachpb.MakeTenantID(5)
require.NoError(t, err)
destStopper := stop.NewStopper()
destStopperTenant, err := ts.TenantController().StartTenant(ctx, base.TestTenantArgs{
TenantName: destName,
TenantID: destID,
Stopper: destStopper,
ForceInsecure: true,
})
require.NoError(t, err)
return destStopperTenant, destStopper
}
_, destStopper := createDest()
destStopper.Stop(ctx)

systemConn := ts.SQLConn(t)
systemRunner := sqlutils.MakeSQLRunner(systemConn)
systemRunner.Exec(t, "ALTER VIRTUAL CLUSTER dest STOP SERVICE ")

destName = ""
// Setup the reader catalog.
systemTenant := ts.SQLConn(t)
now := ts.Clock().Now()
_, err = systemTenant.Exec("SELECT * FROM crdb_internal.setup_read_from_standby('src', 'dest', $1);", now.WallTime)
require.NoError(t, err)

// Confirm that data is readable.
dest, destStopper := createDest()
defer destStopper.Stop(ctx)

destConn := dest.SQLConn(t)
destRunner := sqlutils.MakeSQLRunner(destConn)

// compareQueries executes the same query on both catalogs
// and expects the same results.
compareQueries := func(query string) {
expectedResults := srcRunner.QueryStr(t, fmt.Sprintf("SELECT * FROM (%s) AS OF SYSTEM TIME %s", query, now.AsOfSystemTime()))
destRunner.CheckQueryResults(t, query, expectedResults)
}

// Validate basic queries execute correctly, and we can
// read data within tables.
compareQueries("SELECT * FROM t1 ORDER BY n")
compareQueries("SELECT * FROM v1 ORDER BY 1")
}
7 changes: 7 additions & 0 deletions pkg/sql/catalog/tabledesc/structured.go
Original file line number Diff line number Diff line change
Expand Up @@ -2331,6 +2331,13 @@ func (desc *Mutable) SetOffline(reason string) {
desc.OfflineReason = reason
}

func (desc *Mutable) SetExternalRowData(ext *descpb.ExternalRowData) {
// Do not set materialized views for sequences, they will
// handle this on their own.
desc.IsMaterializedView = !desc.IsSequence()
desc.External = ext
}

// IsLocalityRegionalByRow implements the TableDescriptor interface.
func (desc *wrapper) IsLocalityRegionalByRow() bool {
return desc.LocalityConfig.GetRegionalByRow() != nil
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sem/builtins/fixed_oids.go
Original file line number Diff line number Diff line change
Expand Up @@ -2607,6 +2607,7 @@ var builtinOidsArray = []string{
2639: `crdb_internal.start_replication_stream_for_tables(req: bytes) -> bytes`,
2640: `crdb_internal.clear_query_plan_cache() -> void`,
2641: `crdb_internal.clear_table_stats_cache() -> void`,
2642: `crdb_internal.setup_read_from_standby(src_tenant: string, dst_tenant: string, ts: decimal) -> decimal`,
}

var builtinOidsBySignature map[string]oid.Oid
Expand Down
Loading

0 comments on commit 625f22a

Please sign in to comment.