Skip to content

Commit

Permalink
Merge pull request #134675 from cockroachdb/blathers/backport-release…
Browse files Browse the repository at this point in the history
…-24.3-134655

release-24.3: backupccl: remove ldr job id references on restoring tables
  • Loading branch information
msbutler authored Nov 8, 2024
2 parents 045ef0a + c8bc9d0 commit d8ebfbe
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 0 deletions.
4 changes: 4 additions & 0 deletions pkg/ccl/backupccl/restore_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -1174,6 +1174,10 @@ func createImportingDescriptors(
// Set the new descriptors' states to offline.
for _, desc := range mutableTables {
desc.SetOffline("restoring")

// Remove any LDR Jobs from the table descriptor, ensuring schema changes
// can be run on the table descriptor.
desc.LDRJobIDs = nil
}
for _, desc := range typesToWrite {
desc.SetOffline("restoring")
Expand Down
31 changes: 31 additions & 0 deletions pkg/ccl/crosscluster/logical/logical_replication_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,37 @@ func TestLogicalStreamIngestionCancelUpdatesProducerJob(t *testing.T) {
replicationutils.WaitForPTSProtectionToNotExist(t, ctx, dbA, s, producerJobID)
}

func TestRestoreFromLDR(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.UnderDeadlock(t)
defer log.Scope(t).Close(t)

ctx := context.Background()
dataDir, dirCleanupFunc := testutils.TempDir(t)
defer dirCleanupFunc()
args := testClusterBaseClusterArgs
args.ServerArgs.ExternalIODir = dataDir
server, s, dbA, dbB := setupLogicalTestServer(t, ctx, args, 1)
defer server.Stopper().Stop(ctx)

dbAURL, cleanup := s.PGUrl(t, serverutils.DBName("a"))
defer cleanup()

var jobBID jobspb.JobID
dbA.Exec(t, "INSERT INTO tab VALUES (1, 'hello')")
dbB.QueryRow(t, "CREATE LOGICAL REPLICATION STREAM FROM TABLE tab ON $1 INTO TABLE tab", dbAURL.String()).Scan(&jobBID)

WaitUntilReplicatedTime(t, s.Clock().Now(), dbB, jobBID)

dbB.Exec(t, "BACKUP DATABASE b INTO 'nodelocal://1/backup'")
dbB.Exec(t, "RESTORE DATABASE b FROM LATEST IN 'nodelocal://1/backup' with new_db_name = 'c'")

// Verify that the index backfill schema changes can run on the restored table.
dbC := sqlutils.MakeSQLRunner(s.SQLConn(t, serverutils.DBName("c")))
dbC.Exec(t, "ALTER TABLE tab ADD COLUMN new_col INT DEFAULT 2")
dbC.CheckQueryResults(t, "SELECT * FROM tab", [][]string{{"1", "hello", "2"}})
}

func TestLogicalStreamIngestionErrors(t *testing.T) {
defer leaktest.AfterTest(t)()
skip.UnderDeadlock(t)
Expand Down

0 comments on commit d8ebfbe

Please sign in to comment.