Skip to content

Commit

Permalink
roachtest: fix c2c roachtests that read job payload
Browse files Browse the repository at this point in the history
In cockroachdb#98608 some changes were made to the c2c roachtests
that caused them to break. This change fixes the tests
by querying the correct virutal table.

Fixes: cockroachdb#98973
Fixes: cockroachdb#98969
Fixes: cockroachdb#98962
Informs: cockroachdb#98669

Release note: None
  • Loading branch information
adityamaru committed Mar 22, 2023
1 parent b26b338 commit c70bc16
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 17 deletions.
1 change: 0 additions & 1 deletion pkg/cmd/roachtest/tests/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,6 @@ go_library(
"@com_github_aws_aws_sdk_go_v2_service_rds//:rds",
"@com_github_aws_aws_sdk_go_v2_service_rds//types",
"@com_github_aws_aws_sdk_go_v2_service_secretsmanager//:secretsmanager",
"@com_github_cockroachdb_apd_v3//:apd",
"@com_github_cockroachdb_cockroach_go_v2//crdb",
"@com_github_cockroachdb_errors//:errors",
"@com_github_cockroachdb_ttycolor//:ttycolor",
Expand Down
35 changes: 19 additions & 16 deletions pkg/cmd/roachtest/tests/cluster_to_cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"sort"
"time"

"github.com/cockroachdb/apd/v3"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/clusterstats"
"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
Expand Down Expand Up @@ -472,7 +471,7 @@ func (sp *replicationTestSpec) stopReplicationStream(ingestionJob int, cutoverTi
err := retry.ForDuration(time.Minute*5, func() error {
var status string
var payloadBytes []byte
sp.setup.dst.sysSQL.QueryRow(sp.t, `SELECT status, payload FROM crdb_internal.jobs WHERE job_id = $1`,
sp.setup.dst.sysSQL.QueryRow(sp.t, `SELECT status, payload FROM crdb_internal.system_jobs WHERE id = $1`,
ingestionJob).Scan(&status, &payloadBytes)
if jobs.Status(status) == jobs.StatusFailed {
payload := &jobspb.Payload{}
Expand Down Expand Up @@ -723,28 +722,32 @@ func (c *streamIngesitonJobInfo) GetError() string { return c.status }
var _ jobInfo = (*streamIngesitonJobInfo)(nil)

func getStreamIngestionJobInfo(db *gosql.DB, jobID int) (jobInfo, error) {
var status, errMsg string
var decimalHighWater *apd.Decimal
var finished hlc.Timestamp
var status string
var payloadBytes []byte
var progressBytes []byte
if err := db.QueryRow(
`SELECT status, high_water_timestamp, error, finished FROM crdb_internal.jobs WHERE job_id = $1`, jobID,
).Scan(&status, &decimalHighWater, &errMsg, &finished); err != nil {
`SELECT status, payload, progress FROM crdb_internal.system_jobs WHERE id = $1`, jobID,
).Scan(&status, &payloadBytes, &progressBytes); err != nil {
return nil, err
}
var payload jobspb.Payload
if err := protoutil.Unmarshal(payloadBytes, &payload); err != nil {
return nil, err
}
var progress jobspb.Progress
if err := protoutil.Unmarshal(progressBytes, &progress); err != nil {
return nil, err
}
var highwaterTime time.Time
if decimalHighWater != nil {
var err error
highwaterTimeHLC, err := hlc.DecimalToHLC(decimalHighWater)
if err != nil {
return nil, err
}
highwaterTime = highwaterTimeHLC.GoTime()
highwater := progress.GetHighWater()
if highwater != nil {
highwaterTime = highwater.GoTime()
}
return &streamIngesitonJobInfo{
status: status,
errMsg: errMsg,
errMsg: payload.Error,
highwaterTime: highwaterTime,
finishedTime: time.UnixMicro(finished.GoTime().UnixMicro()),
finishedTime: time.UnixMicro(payload.FinishedMicros),
}, nil
}

Expand Down

0 comments on commit c70bc16

Please sign in to comment.