Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
118907: build: update patch with bug fix to runtime grunning r=sumeerbhola,rickystewart a=aadityasondhi

In the previous version of this patch, we missed two entry points into, and one exit point out of, the grunning state of a goroutine. This caused `grunning.Time()` to be non-monotonic.

This new patch adds those missing integration points.

Fixes #95529.

Release note: None

119356: jobs: remove double-load from metrics poller r=msbutler a=stevendanna

All commits but the last are from #119355.

Previously, this code loaded the job twice, once via LoadJobWithTxn
and again via the call to Unpaused. Here, we re-arrange the code so
that it only loads it once.

Epic: none
Release note: None

119480: opt: reduce allocations for filter implication with disjunctions r=mgartner a=mgartner

This commit reduces allocations in `Implicator.FiltersImplyPredicate` by
using stack-allocated arrays when there are five or fewer adjacent
disjunctions in the filter or partial index predicate. This should cover
the most common cases of filters and predicates with disjunctions.

Epic: None

Release note: None


119519: ttljob: skip TestSpanToQueryBoundsCompositeKeys under stress r=fqazi a=annrpom

This patch skips TestSpanToQueryBoundsCompositeKeys under stress/stressrace as it often hits OOM issues under these conditions.

Epic: none

Release note: None

Co-authored-by: Aaditya Sondhi <[email protected]>
Co-authored-by: Steven Danna <[email protected]>
Co-authored-by: Marcus Gartner <[email protected]>
Co-authored-by: Annie Pompa <[email protected]>
  • Loading branch information
5 people committed Feb 22, 2024
5 parents 0232a67 + 6af173f + faeda85 + 4198fcd + eabc7c2 commit 9e52705
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 61 deletions.
16 changes: 8 additions & 8 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,13 @@ load(
go_download_sdk(
name = "go_sdk",
sdks = {
"darwin_amd64": ("go1.21.5.darwin-amd64.tar.gz", "9a9af12fddf7315db68ceeb2980d9cecbf058d50356787bc1680417d9c5892eb"),
"darwin_arm64": ("go1.21.5.darwin-arm64.tar.gz", "7a5c1f1ca0831980ea06e7383a7e56af7665f8863bc923e19fd050c9fd4866a9"),
"linux_amd64": ("go1.21.5.linux-amd64.tar.gz", "7656c08487ea9d8817047be5698bf96a787c195e237d62c2d605ad754c55022d"),
"linux_arm64": ("go1.21.5.linux-arm64.tar.gz", "5ca30901c990538e21a8adf4a0945b013ecae34cfbfc3fbac893ac24c2c33c0c"),
"windows_amd64": ("go1.21.5.windows-amd64.tar.gz", "f32ad2e23031dc7e43db29dce7bf0cb1bba095760f96972ace094159f6bd2986"),
"darwin_amd64": ("go1.21.5.darwin-amd64.tar.gz", "182c3e597111d474b29d72fbfee1ddb2915acd39b7c3e534a78eaddfa767089d"),
"darwin_arm64": ("go1.21.5.darwin-arm64.tar.gz", "1894e6f86814cd882c51bb5a73cf1312551c734bd4ac9fb33866df2a50e1d1a0"),
"linux_amd64": ("go1.21.5.linux-amd64.tar.gz", "ff04d463bf9da03cfe6aefeff6039312c701f74ab291162b937b31026f262228"),
"linux_arm64": ("go1.21.5.linux-arm64.tar.gz", "9a09561064580e837af46bc314536b450bd2546985955ddf1e70699a4abe2fb6"),
"windows_amd64": ("go1.21.5.windows-amd64.tar.gz", "93375cd3e51b811297a52a0e7a9d98b126da3ae02daaabb5d423e1c496275f06"),
},
urls = ["https://storage.googleapis.com/public-bazel-artifacts/go/20240201-224012/{}"],
urls = ["https://storage.googleapis.com/public-bazel-artifacts/go/20240220-195058/{}"],
version = "1.21.5",
)

Expand Down Expand Up @@ -659,8 +659,8 @@ go_download_sdk(
# able to provide additional diagnostic information such as the expected version of OpenSSL.
experiments = ["boringcrypto"],
sdks = {
"linux_amd64": ("go1.21.5fips.linux-amd64.tar.gz", "33bc9129c5a9ebc3a5f6deb3e651f05216a035479404c849d443d0930aec2b6a"),
"linux_amd64": ("go1.21.5fips.linux-amd64.tar.gz", "68fefc15b4328244fe78f1269e8856a2a2cd52f1fb9f451bb8bc0c23d590fe14"),
},
urls = ["https://storage.googleapis.com/public-bazel-artifacts/go/20240201-224012/{}"],
urls = ["https://storage.googleapis.com/public-bazel-artifacts/go/20240220-195058/{}"],
version = "1.21.5fips",
)
12 changes: 6 additions & 6 deletions build/bazelutil/distdir_files.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -1202,12 +1202,12 @@ DISTDIR_FILES = {
"https://storage.googleapis.com/public-bazel-artifacts/c-deps/20230718-202534/libproj_foreign.macos.20230718-202534.tar.gz": "96771a33542beb72067afcafaeb790134014e56798fa4cbe291894c4ebf8b68d",
"https://storage.googleapis.com/public-bazel-artifacts/c-deps/20230718-202534/libproj_foreign.macosarm.20230718-202534.tar.gz": "b2c60ffe1f50c6e81ba906f773b95d3a6699538d57e71749579552f4211a1e3e",
"https://storage.googleapis.com/public-bazel-artifacts/c-deps/20230718-202534/libproj_foreign.windows.20230718-202534.tar.gz": "16de1e76ee8de4bd144dc57bfde05385d086943ca1b64cc246055c8b0cd71c65",
"https://storage.googleapis.com/public-bazel-artifacts/go/20240201-224012/go1.21.5.darwin-amd64.tar.gz": "9a9af12fddf7315db68ceeb2980d9cecbf058d50356787bc1680417d9c5892eb",
"https://storage.googleapis.com/public-bazel-artifacts/go/20240201-224012/go1.21.5.darwin-arm64.tar.gz": "7a5c1f1ca0831980ea06e7383a7e56af7665f8863bc923e19fd050c9fd4866a9",
"https://storage.googleapis.com/public-bazel-artifacts/go/20240201-224012/go1.21.5.linux-amd64.tar.gz": "7656c08487ea9d8817047be5698bf96a787c195e237d62c2d605ad754c55022d",
"https://storage.googleapis.com/public-bazel-artifacts/go/20240201-224012/go1.21.5.linux-arm64.tar.gz": "5ca30901c990538e21a8adf4a0945b013ecae34cfbfc3fbac893ac24c2c33c0c",
"https://storage.googleapis.com/public-bazel-artifacts/go/20240201-224012/go1.21.5.windows-amd64.tar.gz": "f32ad2e23031dc7e43db29dce7bf0cb1bba095760f96972ace094159f6bd2986",
"https://storage.googleapis.com/public-bazel-artifacts/go/20240201-224012/go1.21.5fips.linux-amd64.tar.gz": "33bc9129c5a9ebc3a5f6deb3e651f05216a035479404c849d443d0930aec2b6a",
"https://storage.googleapis.com/public-bazel-artifacts/go/20240220-195058/go1.21.5.darwin-amd64.tar.gz": "182c3e597111d474b29d72fbfee1ddb2915acd39b7c3e534a78eaddfa767089d",
"https://storage.googleapis.com/public-bazel-artifacts/go/20240220-195058/go1.21.5.darwin-arm64.tar.gz": "1894e6f86814cd882c51bb5a73cf1312551c734bd4ac9fb33866df2a50e1d1a0",
"https://storage.googleapis.com/public-bazel-artifacts/go/20240220-195058/go1.21.5.linux-amd64.tar.gz": "ff04d463bf9da03cfe6aefeff6039312c701f74ab291162b937b31026f262228",
"https://storage.googleapis.com/public-bazel-artifacts/go/20240220-195058/go1.21.5.linux-arm64.tar.gz": "9a09561064580e837af46bc314536b450bd2546985955ddf1e70699a4abe2fb6",
"https://storage.googleapis.com/public-bazel-artifacts/go/20240220-195058/go1.21.5.windows-amd64.tar.gz": "93375cd3e51b811297a52a0e7a9d98b126da3ae02daaabb5d423e1c496275f06",
"https://storage.googleapis.com/public-bazel-artifacts/go/20240220-195058/go1.21.5fips.linux-amd64.tar.gz": "68fefc15b4328244fe78f1269e8856a2a2cd52f1fb9f451bb8bc0c23d590fe14",
"https://storage.googleapis.com/public-bazel-artifacts/java/railroad/rr-1.63-java8.zip": "d2791cd7a44ea5be862f33f5a9b3d40aaad9858455828ebade7007ad7113fb41",
"https://storage.googleapis.com/public-bazel-artifacts/js/rules_jest-v0.18.4.tar.gz": "d3bb833f74b8ad054e6bff5e41606ff10a62880cc99e4d480f4bdfa70add1ba7",
"https://storage.googleapis.com/public-bazel-artifacts/js/rules_js-v1.26.1.tar.gz": "08061ba5e5e7f4b1074538323576dac819f9337a0c7d75aee43afc8ae7cb6e18",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,10 @@ index 26dcf0bd52..01b84268db 100644
error at each collection, summarizing the amount of memory collected and the
length of the pause. The format of this line is subject to change. Included in
diff --git a/src/runtime/malloc.go b/src/runtime/malloc.go
index b2026ad0dc..6db0ca52f9 100644
index 44479cc2be..a24ce64a24 100644
--- a/src/runtime/malloc.go
+++ b/src/runtime/malloc.go
@@ -1274,7 +1274,7 @@ func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
@@ -1270,7 +1270,7 @@ func mallocgc(size uintptr, typ *_type, needzero bool) unsafe.Pointer {
// Returns the G for which the assist credit was accounted.
func deductAssistCredit(size uintptr) *g {
var assistG *g
Expand All @@ -272,10 +272,34 @@ index b2026ad0dc..6db0ca52f9 100644
assistG = getg()
if assistG.m.curg != nil {
diff --git a/src/runtime/proc.go b/src/runtime/proc.go
index afb33c1e8b..ccdaaab3e8 100644
index afb33c1e8b..390b99be13 100644
--- a/src/runtime/proc.go
+++ b/src/runtime/proc.go
@@ -1073,7 +1073,18 @@ func casgstatus(gp *g, oldval, newval uint32) {
@@ -1004,6 +1004,11 @@ func casfrom_Gscanstatus(gp *g, oldval, newval uint32) {
dumpgstatus(gp)
throw("casfrom_Gscanstatus: gp->status is not in scan state")
}
+ // We're transitioning into the running state, record the timestamp for
+ // subsequent use.
+ if newval == _Grunning {
+ gp.lastsched = nanotime()
+ }
releaseLockRank(lockRankGscan)
}

@@ -1019,6 +1024,11 @@ func castogscanstatus(gp *g, oldval, newval uint32) bool {
r := gp.atomicstatus.CompareAndSwap(oldval, newval)
if r {
acquireLockRank(lockRankGscan)
+ // We're transitioning out of running, record how long we were in the
+ // state.
+ if oldval == _Grunning {
+ gp.runningnanos += nanotime() - gp.lastsched
+ }
}
return r

@@ -1073,7 +1083,18 @@ func casgstatus(gp *g, oldval, newval uint32) {
}
}

Expand All @@ -294,23 +318,23 @@ index afb33c1e8b..ccdaaab3e8 100644
// Track every gTrackingPeriod time a goroutine transitions out of running.
if casgstatusAlwaysTrack || gp.trackingSeq%gTrackingPeriod == 0 {
gp.tracking = true
@@ -1094,7 +1105,6 @@ func casgstatus(gp *g, oldval, newval uint32) {
@@ -1094,7 +1115,6 @@ func casgstatus(gp *g, oldval, newval uint32) {
// We transitioned out of runnable, so measure how much
// time we spent in this state and add it to
// runnableTime.
- now := nanotime()
gp.runnableTime += now - gp.trackingStamp
gp.trackingStamp = 0
case _Gwaiting:
@@ -1107,7 +1117,6 @@ func casgstatus(gp *g, oldval, newval uint32) {
@@ -1107,7 +1127,6 @@ func casgstatus(gp *g, oldval, newval uint32) {
// a more representative estimate of the absolute value.
// gTrackingPeriod also represents an accurate sampling period
// because we can only enter this state from _Grunning.
- now := nanotime()
sched.totalMutexWaitTime.Add((now - gp.trackingStamp) * gTrackingPeriod)
gp.trackingStamp = 0
}
@@ -1118,12 +1127,10 @@ func casgstatus(gp *g, oldval, newval uint32) {
@@ -1118,12 +1137,10 @@ func casgstatus(gp *g, oldval, newval uint32) {
break
}
// Blocking on a lock. Write down the timestamp.
Expand All @@ -323,7 +347,17 @@ index afb33c1e8b..ccdaaab3e8 100644
gp.trackingStamp = now
case _Grunning:
// We're transitioning into running, so turn off
@@ -3646,6 +3653,14 @@ func dropg() {
@@ -1174,6 +1191,9 @@ func casGToPreemptScan(gp *g, old, new uint32) {
acquireLockRank(lockRankGscan)
for !gp.atomicstatus.CompareAndSwap(_Grunning, _Gscan|_Gpreempted) {
}
+ // We're transitioning out of running, record how long we were in the
+ // state.
+ gp.runningnanos += nanotime() - gp.lastsched
}

// casGFromPreempted attempts to transition gp from _Gpreempted to
@@ -3646,6 +3666,14 @@ func dropg() {
setGNoWB(&gp.m.curg, nil)
}

Expand All @@ -338,7 +372,7 @@ index afb33c1e8b..ccdaaab3e8 100644
// checkTimers runs any timers for the P that are ready.
// If now is not 0 it is the current time.
// It returns the passed time or the current time if now was passed as 0.
@@ -3880,6 +3895,8 @@ func goexit0(gp *g) {
@@ -3880,6 +3908,8 @@ func goexit0(gp *g) {
gp.param = nil
gp.labels = nil
gp.timer = nil
Expand Down
68 changes: 38 additions & 30 deletions pkg/jobs/metricspoller/job_statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
const pausedJobsCountQuery = string(`
SELECT job_type, count(*)
FROM system.jobs
WHERE status = '` + jobs.StatusPaused + `'
WHERE status = '` + jobs.StatusPaused + `'
GROUP BY job_type`)

// updatePausedMetrics counts the number of paused jobs per job type.
Expand Down Expand Up @@ -163,42 +163,50 @@ func processJobPTSRecord(
ptsStats map[jobspb.Type]*ptsStat,
txn isql.Txn,
) error {
j, err := execCfg.JobRegistry.LoadJobWithTxn(ctx, jobspb.JobID(jobID), txn)
var stats *ptsStat
defer func() {
if stats != nil {
stats.numRecords++
if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) {
stats.oldest = rec.Timestamp
}
}
}()

err := execCfg.JobRegistry.UpdateJobWithTxn(ctx, jobspb.JobID(jobID), txn,
func(txn isql.Txn, md jobs.JobMetadata, ju *jobs.JobUpdater) error {
p := md.Payload
jobType, err := p.CheckType()
if err != nil {
return err
}
stats = ptsStats[jobType]
if stats == nil {
stats = &ptsStat{}
ptsStats[jobType] = stats
}

// If MaximumPTSAge is set on the job payload, verify if PTS record
// timestamp is fresh enough.
if p.MaximumPTSAge > 0 &&
rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) {
stats.expired++
ptsExpired := errors.Newf(
"protected timestamp records %s as of %s (age %s) exceeds job configured limit of %s",
rec.ID, rec.Timestamp, timeutil.Since(rec.Timestamp.GoTime()), p.MaximumPTSAge)
log.Warningf(ctx, "job %d canceled due to %s", jobID, ptsExpired)
return ju.CancelRequestedWithReason(ctx, md, ptsExpired)
}
return nil
})
if err != nil {
if jobs.HasJobNotFoundError(err) {
return nil // nolint:returnerrcheck -- job maybe deleted when we run; just keep going.
}
return err
}
p := j.Payload()
jobType, err := p.CheckType()
if err != nil {
return err
}
stats := ptsStats[jobType]
if stats == nil {
stats = &ptsStat{}
ptsStats[jobType] = stats
}
stats.numRecords++
if stats.oldest.IsEmpty() || rec.Timestamp.Less(stats.oldest) {
stats.oldest = rec.Timestamp
}

// If MaximumPTSAge is set on the job payload, verify if PTS record
// timestamp is fresh enough.
if p.MaximumPTSAge > 0 &&
rec.Timestamp.GoTime().Add(p.MaximumPTSAge).Before(timeutil.Now()) {
stats.expired++
ptsExpired := errors.Newf(
"protected timestamp records %s as of %s (age %s) exceeds job configured limit of %s",
rec.ID, rec.Timestamp, timeutil.Since(rec.Timestamp.GoTime()), p.MaximumPTSAge)
if err := j.WithTxn(txn).CancelRequestedWithReason(ctx, ptsExpired); err != nil {
return err
}
log.Warningf(ctx, "job %d canceled due to %s", jobID, ptsExpired)
}
return nil

}

func updateJobPTSMetrics(
Expand Down
14 changes: 6 additions & 8 deletions pkg/sql/opt/partialidx/implicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -403,8 +403,9 @@ func (im *Implicator) orExprImpliesPredicate(e *memo.OrExpr, pred opt.ScalarExpr
//
// We must flatten all adjacent ORs in order to handle cases such as:
// (a OR b) => ((a OR b) OR c)
eFlat := flattenOrExpr(e)
predFlat := flattenOrExpr(pt)
var eScratch, predScratch [5]opt.ScalarExpr
eFlat := flattenOrExpr(e, eScratch[:0])
predFlat := flattenOrExpr(pt, predScratch[:0])
for i := range eFlat {
eChildImpliesAnyPredChild := false
for j := range predFlat {
Expand Down Expand Up @@ -777,8 +778,8 @@ func (im *Implicator) simplifyScalarExpr(e opt.ScalarExpr, exactMatches exprSet)
}
}

// flattenOrExpr returns a list of ScalarExprs that are all adjacent via
// disjunctions to the input OrExpr.
// flattenOrExpr appends all ScalarExprs that are adjacent to the input OrExpr
// via disjunctions to the "ors" slice, and returns the updated slice.
//
// For example, the input:
//
Expand All @@ -787,9 +788,7 @@ func (im *Implicator) simplifyScalarExpr(e opt.ScalarExpr, exactMatches exprSet)
// Results in:
//
// [a, (b AND c), d, e]
func flattenOrExpr(or *memo.OrExpr) []opt.ScalarExpr {
ors := make([]opt.ScalarExpr, 0, 2)

func flattenOrExpr(or *memo.OrExpr, ors []opt.ScalarExpr) []opt.ScalarExpr {
var collect func(e opt.ScalarExpr)
collect = func(e opt.ScalarExpr) {
if and, ok := e.(*memo.OrExpr); ok {
Expand All @@ -800,7 +799,6 @@ func flattenOrExpr(or *memo.OrExpr) []opt.ScalarExpr {
}
}
collect(or)

return ors
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/sql/ttl/ttljob/ttljob_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/sql/ttl/ttljob"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/skip"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
Expand Down Expand Up @@ -224,6 +225,9 @@ func TestSpanToQueryBoundsCompositeKeys(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

skip.UnderStress(t)
skip.UnderStressRace(t)

testCases := []struct {
desc string
// tablePKValues are PK values initially inserted into the table.
Expand Down

0 comments on commit 9e52705

Please sign in to comment.