Skip to content

Commit

Permalink
zipper: add ieNotBoundToTxn to InternalInflightTraceZipper
Browse files Browse the repository at this point in the history
Release note: None
  • Loading branch information
ZhouXing19 committed Nov 15, 2022
1 parent 298e301 commit a00f7f5
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 15 deletions.
13 changes: 8 additions & 5 deletions pkg/jobs/adopt.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,11 +165,13 @@ func getProcessQuery(
}

// processClaimedJobs processes all jobs currently claimed by the registry.
func (r *Registry) processClaimedJobs(ctx context.Context, s sqlliveness.Session) error {
func (r *Registry) processClaimedJobs(
ctx context.Context, s sqlliveness.Session, ieNotBoundToTxn sqlutil.InternalExecutor,
) error {
query, args := getProcessQuery(ctx, s, r)

it, err := r.ex.QueryIteratorEx(
ctx, "select-running/get-claimed-jobs", nil,
it, err := ieNotBoundToTxn.QueryIteratorEx(
ctx, "select-running/get-claimed-jobs", nil, /* txn */
sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, query, args...,
)
if err != nil {
Expand Down Expand Up @@ -247,8 +249,9 @@ func (r *Registry) resumeJob(ctx context.Context, jobID jobspb.JobID, s sqlliven
resumeQuery := resumeQueryWithBackoff
args := []interface{}{jobID, s.ID().UnsafeBytes(),
r.clock.Now().GoTime(), r.RetryInitialDelay(), r.RetryMaxDelay()}
row, err := r.ex.QueryRowEx(
ctx, "get-job-row", nil,
ieNotBoundToTxn := r.internalExecutorFactory.MakeInternalExecutorWithoutTxn()
row, err := ieNotBoundToTxn.QueryRowEx(
ctx, "get-job-row", nil, /* txn */
sessiondata.InternalExecutorOverride{User: username.NodeUserName()}, resumeQuery, args...,
)
if err != nil {
Expand Down
16 changes: 9 additions & 7 deletions pkg/jobs/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -742,14 +742,14 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
// removeClaimsFromDeadSessions queries the jobs table for non-terminal
// jobs and nullifies their claims if the claims are owned by known dead sessions.
removeClaimsFromDeadSessions := func(ctx context.Context, s sqlliveness.Session) {
if err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := r.internalExecutorFactory.TxnWithExecutor(ctx, r.db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
// Run the expiration transaction at low priority to ensure that it does
// not contend with foreground reads. Note that the adoption and cancellation
// queries also use low priority so they will interact nicely.
if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil {
return errors.WithAssertionFailure(err)
}
_, err := r.ex.ExecEx(
_, err := ie.ExecEx(
ctx, "expire-sessions", txn,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
removeClaimsForDeadSessionsQuery,
Expand Down Expand Up @@ -792,14 +792,14 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
// removeClaimsFromJobs queries the jobs table for non-terminal jobs and
// nullifies their claims if the claims are owned by the current session.
removeClaimsFromSession := func(ctx context.Context, s sqlliveness.Session) {
if err := r.db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
if err := r.internalExecutorFactory.TxnWithExecutor(ctx, r.db, nil /* sessionData */, func(ctx context.Context, txn *kv.Txn, ie sqlutil.InternalExecutor) error {
// Run the expiration transaction at low priority to ensure that it does
// not contend with foreground reads. Note that the adoption and cancellation
// queries also use low priority so they will interact nicely.
if err := txn.SetUserPriority(roachpb.MinUserPriority); err != nil {
return errors.WithAssertionFailure(err)
}
_, err := r.ex.ExecEx(
_, err := ie.ExecEx(
ctx, "remove-claims-for-session", txn,
sessiondata.InternalExecutorOverride{User: username.RootUserName()},
removeClaimsForSessionQuery, s.ID().UnsafeBytes(),
Expand All @@ -825,7 +825,8 @@ func (r *Registry) Start(ctx context.Context, stopper *stop.Stopper) error {
r.cancelAllAdoptedJobs()
return
}
if err := r.processClaimedJobs(ctx, s); err != nil {
ieNotBoundToTxn := r.internalExecutorFactory.MakeInternalExecutorWithoutTxn()
if err := r.processClaimedJobs(ctx, s, ieNotBoundToTxn); err != nil {
log.Errorf(ctx, "error processing claimed jobs: %s", err)
}
})
Expand Down Expand Up @@ -953,7 +954,8 @@ const expiredJobsQuery = "SELECT id, payload, status, created FROM system.jobs "
func (r *Registry) cleanupOldJobsPage(
ctx context.Context, olderThan time.Time, minID jobspb.JobID, pageSize int,
) (done bool, maxID jobspb.JobID, retErr error) {
it, err := r.ex.QueryIterator(ctx, "gc-jobs", nil /* txn */, expiredJobsQuery, olderThan, minID, pageSize)
ieNotBoundToTxn := r.internalExecutorFactory.MakeInternalExecutorWithoutTxn()
it, err := ieNotBoundToTxn.QueryIterator(ctx, "gc-jobs", nil /* txn */, expiredJobsQuery, olderThan, minID, pageSize)
if err != nil {
return false, 0, err
}
Expand Down Expand Up @@ -993,7 +995,7 @@ func (r *Registry) cleanupOldJobsPage(
log.VEventf(ctx, 2, "attempting to clean up %d expired job records", len(toDelete.Array))
const stmt = `DELETE FROM system.jobs WHERE id = ANY($1)`
var nDeleted int
if nDeleted, err = r.ex.Exec(
if nDeleted, err = ieNotBoundToTxn.Exec(
ctx, "gc-jobs", nil /* txn */, stmt, toDelete,
); err != nil {
log.Warningf(ctx, "error cleaning up %d jobs: %v", len(toDelete.Array), err)
Expand Down
6 changes: 3 additions & 3 deletions pkg/util/tracing/zipper/zipper.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ type InflightTraceZipper interface {
type InternalInflightTraceZipper struct {
traceStrBuf *bytes.Buffer
nodeTraceCollection *tracingpb.TraceCollection
ie sqlutil.InternalExecutor
ieNotBoundToTxn sqlutil.InternalExecutor
z *memzipper.Zipper
}

Expand All @@ -78,7 +78,7 @@ func (i *InternalInflightTraceZipper) getZipper() *memzipper.Zipper {
func (i *InternalInflightTraceZipper) Zip(
ctx context.Context, traceID int64,
) (zipBytes []byte, retErr error) {
it, err := i.ie.QueryIterator(ctx, "internal-zipper", nil, fmt.Sprintf(inflightTracesQuery, traceID))
it, err := i.ieNotBoundToTxn.QueryIterator(ctx, "internal-zipper", nil, fmt.Sprintf(inflightTracesQuery, traceID))
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -192,7 +192,7 @@ func MakeInternalExecutorInflightTraceZipper(
t := &InternalInflightTraceZipper{
traceStrBuf: &bytes.Buffer{},
nodeTraceCollection: nil,
ie: ie,
ieNotBoundToTxn: ie,
}
t.z = &memzipper.Zipper{}
t.z.Init()
Expand Down

0 comments on commit a00f7f5

Please sign in to comment.