Skip to content

Commit

Permalink
streamingest: ALTER VIRTUAL CLUSTER RESET DATA
Browse files Browse the repository at this point in the history
This enables resetting a virtual cluster's data to a prior timestamp.

This is possible if the prior timestamp is still retained in the mvcc
history of the virtual cluster, the virtual cluster has stopped service,
and is run by a user with the MANAGEVIRTUALCLUSTER (or admin) privilage
in the system tenant.

Revisions of data in the system tenant newer than the target time to
which it is being reset are destoryed, reverting the tenant to the state
it was in as of the time reverted to. Destroyed revisions are not
recoverable; once a tenant has been reset to a timestamp, it cannot be
'unreset' back to a higher timestmap.

Release note (cluster virtualization): Added a new 'flashback' command
to revert a virtual cluster to an earlier state using ALTER VIRTUAL
CLUSTER RESET DATA.

Epic: CRDB-34233.
  • Loading branch information
dt committed Jan 9, 2024
1 parent 5d6cf56 commit 7c18055
Show file tree
Hide file tree
Showing 3 changed files with 60 additions and 3 deletions.
56 changes: 56 additions & 0 deletions pkg/ccl/streamingccl/streamingest/alter_replication_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (

const (
alterReplicationJobOp = "ALTER VIRTUAL CLUSTER REPLICATION"
alterTenantResetOp = "ALTER VIRTUAL CLUSTER RESET"
createReplicationOp = "CREATE VIRTUAL CLUSTER FROM REPLICATION"
)

Expand Down Expand Up @@ -349,6 +350,61 @@ func alterTenantOptions(

}

func alterTenantResetHook(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (sql.PlanHookRowFn, colinfo.ResultColumns, []sql.PlanNode, bool, error) {
alterTenantStmt, ok := stmt.(*tree.AlterTenantReset)
if !ok {
return nil, nil, nil, false, nil
}
if !p.ExecCfg().Codec.ForSystemTenant() {
return nil, nil, nil, false, pgerror.Newf(pgcode.InsufficientPrivilege, "only the system tenant can alter tenant")
}

timestamp, err := asof.EvalSystemTimeExpr(ctx, &p.ExtendedEvalContext().Context, p.SemaCtx(), alterTenantStmt.Timestamp,
alterTenantResetOp, asof.ReplicationCutover)
if err != nil {
return nil, nil, nil, false, err
}

fn := func(ctx context.Context, _ []sql.PlanNode, resultsCh chan<- tree.Datums) error {
if err := sql.CanManageTenant(ctx, p); err != nil {
return err
}
if err := utilccl.CheckEnterpriseEnabled(p.ExecCfg().Settings, alterTenantResetOp); err != nil {
return err
}

tenInfo, err := p.LookupTenantInfo(ctx, alterTenantStmt.TenantSpec, alterTenantResetOp)
if err != nil {
return err
}
return revertTenantToTimestamp(ctx, &p.ExtendedEvalContext().Context, tenInfo.Name, timestamp, p.ExtendedEvalContext().SessionID)
}
return fn, nil, nil, false, nil
}

func alterTenantResetHookTypeCheck(
ctx context.Context, stmt tree.Statement, p sql.PlanHookState,
) (bool, colinfo.ResultColumns, error) {
alterStmt, ok := stmt.(*tree.AlterTenantReset)
if !ok {
return false, nil, nil
}
if err := exprutil.TypeCheck(
ctx, alterTenantResetOp, p.SemaCtx(), exprutil.TenantSpec{TenantSpec: alterStmt.TenantSpec},
); err != nil {
return false, nil, err
}
if _, err := asof.TypeCheckSystemTimeExpr(
ctx, p.SemaCtx(), alterStmt.Timestamp, alterTenantResetOp,
); err != nil {
return false, nil, err
}
return true, nil, nil
}

func init() {
sql.AddPlanHook("alter replication job", alterReplicationJobHook, alterReplicationJobTypeCheck)
sql.AddPlanHook("alter virtual cluster reset", alterTenantResetHook, alterTenantResetHookTypeCheck)
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func TestTenantStreamingCreationErrors(t *testing.T) {
})
t.Run("destination tenant revert timestamp must match resume timestamp", func(t *testing.T) {
sysSQL.Exec(t, "CREATE TENANT bat")
sysSQL.Exec(t, "SELECT crdb_internal.unsafe_revert_tenant_to_timestamp('bat', cluster_logical_timestamp())")
sysSQL.Exec(t, "ALTER VIRTUAL CLUSTER bat RESET DATA TO SYSTEM TIME cluster_logical_timestamp()")
sysSQL.ExpectErr(t, "doesn't match last revert timestamp",
"CREATE TENANT bat FROM REPLICATION OF source ON $1 WITH RESUME TIMESTAMP = cluster_logical_timestamp()", srcPgURL.String())
})
Expand Down Expand Up @@ -251,7 +251,7 @@ func TestTenantStreamingFailback(t *testing.T) {
sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f STOP SERVICE")
waitUntilTenantServerStopped(t, serverA.SystemLayer(), "f")
t.Logf("starting replication g->f")
sqlA.Exec(t, fmt.Sprintf("SELECT crdb_internal.unsafe_revert_tenant_to_timestamp('f', %s)", ts1))
sqlA.Exec(t, "ALTER VIRTUAL CLUSTER f RESET DATA TO SYSTEM TIME ($1::decimal)", ts1)
sqlA.Exec(t, fmt.Sprintf("CREATE VIRTUAL CLUSTER f FROM REPLICATION OF g ON $1 WITH RESUME TIMESTAMP = '%s'", ts1), serverBURL.String())
_, consumerFJobID := replicationtestutils.GetStreamJobIds(t, ctx, sqlA, roachpb.TenantName("f"))
t.Logf("waiting for f@%s", ts2)
Expand Down Expand Up @@ -281,7 +281,7 @@ func TestTenantStreamingFailback(t *testing.T) {
sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g STOP SERVICE")
waitUntilTenantServerStopped(t, serverB.SystemLayer(), "g")
t.Logf("starting replication f->g")
sqlB.Exec(t, fmt.Sprintf("SELECT crdb_internal.unsafe_revert_tenant_to_timestamp('g', %s)", ts3))
sqlB.Exec(t, "ALTER VIRTUAL CLUSTER g RESET DATA TO SYSTEM TIME ($1::decimal)", ts3)
sqlB.Exec(t, fmt.Sprintf("CREATE VIRTUAL CLUSTER g FROM REPLICATION OF f ON $1 WITH RESUME TIMESTAMP = '%s'", ts3), serverAURL.String())
_, consumerGJobID = replicationtestutils.GetStreamJobIds(t, ctx, sqlB, roachpb.TenantName("g"))
t.Logf("waiting for g@%s", ts3)
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/opaque.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,7 @@ func init() {
&tree.AlterBackup{},
&tree.AlterBackupSchedule{},
&tree.AlterTenantReplication{},
&tree.AlterTenantReset{},
&tree.Backup{},
&tree.ShowBackup{},
&tree.Restore{},
Expand Down

0 comments on commit 7c18055

Please sign in to comment.