86815: insights: report high contention time r=matthewtodd a=matthewtodd

This change aims to align with the logic for selecting transactions with
"high contention time"[^1] in the transaction insights UI, but in practice,
it's a best effort:

Because tracing is sampled, and because a transaction with "high" contention
may be composed of many statements, each with only a little contention,
there's no guarantee that a highly contended transaction will be composed of
highly contended statements.

But, on balance, the best effort is still valuable, and we will call out
actionable statements with high contention.
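
As a rough sketch, the per-statement check added here (the real change is in
the `pkg/sql/sqlstats/insights/problems.go` hunk below) looks like this:

```go
// Flag a statement as having high contention time when its recorded
// contention meets or exceeds the insights latency threshold. Since the
// check is per statement, a highly contended transaction whose contention is
// spread thinly across many statements may not surface any flagged statement.
if stmt.Contention != nil && *stmt.Contention >= LatencyThreshold.Get(&p.st.SV) {
	result = append(result, Problem_HighContentionTime)
}
```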

[^1]: Note that the protobuf enum name change is safe here as we haven't
  yet deployed this code.

Release justification: Category 2: Bug fixes and low-risk updates to new
functionality.

Release note: None

86846: storage: additional `TestMVCCHistories` cases for scan/get r=jbowens a=erikgrinaker

Touches #86655.

Release justification: non-production code changes

Release note: None

86862: acceptance: fix spurious docker test failure `unexpected extra event &{0 die} (after [])` r=rickystewart,renatolabs a=knz

Needed for #86049.

The root cause of the issue is that the docker containers for the DB nodes were crashing, because the test framework **was incorrectly deleting the data/log directory before shutting down the nodes**.

However, this root cause was mostly hidden by another mistake: the assertion that checks the health of the nodes was not doing its job properly in most cases, so it believed the nodes were healthy when in fact they were not.
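
Judging from the `dockercluster.go` diff below, the trailing "no unexpected
events" check in `Assert` reused a near-zero timeout, so a late `die` event
from a crashed node was usually never observed. A sketch of the corrected tail
of that check:

```go
// At the end, leave Docker a bit more time to report a final event, if any;
// the old code waited only ~50ms here and routinely missed a crash's `die`.
const waitForLastDockerEvent = 1 * time.Second
if cur := filter(l.events, waitForLastDockerEvent); cur != nil {
	t.Fatalf("unexpected extra event %v (after %v)", cur, events)
}
```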

This PR fixes both problems.
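
Concretely, the ordering fix looks roughly like the following sketch (names as
in the `util_docker.go` diff below; assume `ctx`, `t`, `cfg`, `err`, and the
one-shot arguments are in scope as in that test helper):

```go
// Start the cluster only after the host directories it mounts exist, and tear
// it down before those directories are cleaned up.
l := StartCluster(ctx, t, cfg).(*cluster.DockerCluster)

var preserveLogs bool
defer func() {
	// Check node health and stop the cluster first...
	l.AssertAndStop(ctx, t)
	// ...and only then remove the docker volumes. Removing them earlier makes
	// the nodes crash and report abnormal termination even when the test
	// otherwise succeeds.
	l.Cleanup(ctx, preserveLogs)
}()

err = l.OneShot(
	ctx, acceptanceImage, types.ImagePullOptions{}, containerConfig, hostConfig,
	platforms.DefaultSpec(), "docker-"+name,
)
preserveLogs = err != nil
```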

Informs #58955.
Informs #58951.
Informs #58218.

Release justification: non-production code changes

86865: ttl: remove unused const expiredRowsPerRange r=knz a=ecwall

fixes #86838

Release justification: Linter fix.
Release note: None

Co-authored-by: Matthew Todd <[email protected]>
Co-authored-by: Erik Grinaker <[email protected]>
Co-authored-by: Raphael 'kena' Poss <[email protected]>
Co-authored-by: Evan Wall <[email protected]>
5 people committed Aug 25, 2022
5 parents b5e32df + 4d85218 + d0e02db + 9b4fd92 + dd89551 commit 6b3cd4b
Showing 10 changed files with 733 additions and 35 deletions.
2 changes: 1 addition & 1 deletion pkg/acceptance/adapter_test.go
@@ -39,7 +39,7 @@ func TestDockerC(t *testing.T) {
}

func TestDockerCSharp(t *testing.T) {
skip.WithIssue(t, 58218, "flaky test")
skip.WithIssue(t, 86852, "test not up to date anymore")
s := log.Scope(t)
defer s.Close(t)

6 changes: 3 additions & 3 deletions pkg/acceptance/cluster/docker.go
@@ -287,9 +287,9 @@ func (c *Container) Restart(ctx context.Context, timeout *time.Duration) error {
return nil
}

// Wait waits for a running container to exit.
func (c *Container) Wait(ctx context.Context, condition container.WaitCondition) error {
waitOKBodyCh, errCh := c.cluster.client.ContainerWait(ctx, c.id, condition)
// WaitUntilNotRunning waits for a running container to exit.
func (c *Container) WaitUntilNotRunning(ctx context.Context) error {
waitOKBodyCh, errCh := c.cluster.client.ContainerWait(ctx, c.id, container.WaitConditionNotRunning)
select {
case err := <-errCh:
return err
49 changes: 35 additions & 14 deletions pkg/acceptance/cluster/dockercluster.go
@@ -129,6 +129,7 @@ type DockerCluster struct {
stopper *stop.Stopper
monitorCtx context.Context
monitorCtxCancelFunc func()
monitorDone chan struct{}
clusterID string
networkID string
networkName string
@@ -191,7 +192,7 @@ func CreateDocker(

func (l *DockerCluster) expectEvent(c *Container, msgs ...string) {
for index, ctr := range l.Nodes {
if c.id != ctr.id {
if ctr.Container == nil || c.id != ctr.id {
continue
}
for _, status := range msgs {
@@ -237,7 +238,7 @@ func (l *DockerCluster) OneShot(
if err := l.oneshot.Start(ctx); err != nil {
return err
}
return l.oneshot.Wait(ctx, container.WaitConditionNotRunning)
return l.oneshot.WaitUntilNotRunning(ctx)
}

// stopOnPanic is invoked as a deferred function in Start in order to attempt
@@ -374,7 +375,7 @@ func (l *DockerCluster) initCluster(ctx context.Context) {
// and it'll get in the way of future runs.
l.vols = c
maybePanic(c.Start(ctx))
maybePanic(c.Wait(ctx, container.WaitConditionNotRunning))
maybePanic(c.WaitUntilNotRunning(ctx))
}

// cockroachEntryPoint returns the value to be used as
@@ -544,6 +545,7 @@ func (l *DockerCluster) processEvent(ctx context.Context, event events.Message)
// If there's currently a oneshot container, ignore any die messages from
// it because those are expected.
if l.oneshot != nil && event.ID == l.oneshot.id && event.Status == eventDie {
log.Infof(ctx, "Docker event was: the oneshot container terminated")
return true
}

@@ -585,7 +587,9 @@ func (l *DockerCluster) processEvent(ctx context.Context, event events.Message)
return false
}

func (l *DockerCluster) monitor(ctx context.Context) {
func (l *DockerCluster) monitor(ctx context.Context, monitorDone chan struct{}) {
defer close(monitorDone)

if log.V(1) {
log.Infof(ctx, "events monitor starts")
defer log.Infof(ctx, "events monitor exits")
@@ -603,6 +607,9 @@
})
for {
select {
case <-l.monitorCtx.Done():
log.Infof(ctx, "monitor shutting down")
return false
case err := <-errq:
log.Infof(ctx, "event stream done, resetting...: %s", err)
// Sometimes we get a random string-wrapped EOF error back.
@@ -640,7 +647,8 @@ func (l *DockerCluster) Start(ctx context.Context) {

log.Infof(ctx, "starting %d nodes", len(l.Nodes))
l.monitorCtx, l.monitorCtxCancelFunc = context.WithCancel(context.Background())
go l.monitor(ctx)
l.monitorDone = make(chan struct{})
go l.monitor(ctx, l.monitorDone)
var wg sync.WaitGroup
wg.Add(len(l.Nodes))
for _, node := range l.Nodes {
@@ -661,7 +669,6 @@ func (l *DockerCluster) Start(ctx context.Context) {
// the cluster (restart, kill, ...). In the event of a mismatch, the passed
// Tester receives a fatal error.
func (l *DockerCluster) Assert(ctx context.Context, t testing.TB) {
const almostZero = 50 * time.Millisecond
filter := func(ch chan Event, wait time.Duration) *Event {
select {
case act := <-ch:
@@ -673,17 +680,28 @@ func (l *DockerCluster) Assert(ctx context.Context, t testing.TB) {

var events []Event
for {
// The expected event channel is buffered and should contain
// all expected events already.
const almostZero = 15 * time.Millisecond
exp := filter(l.expectedEvents, almostZero)
if exp == nil {
break
}
act := filter(l.events, 15*time.Second)
t.Logf("expecting event: %v", exp)
// l.events is connected to the docker controller and may
// receive events more slowly.
const waitForDockerEvent = 15 * time.Second
act := filter(l.events, waitForDockerEvent)
t.Logf("got event: %v", act)
if act == nil || *exp != *act {
t.Fatalf("expected event %v, got %v (after %v)", exp, act, events)
}
events = append(events, *exp)
}
if cur := filter(l.events, almostZero); cur != nil {
// At the end, we leave docker a bit more time to report a final event,
// if any.
const waitForLastDockerEvent = 1 * time.Second
if cur := filter(l.events, waitForLastDockerEvent); cur != nil {
t.Fatalf("unexpected extra event %v (after %v)", cur, events)
}
if log.V(2) {
@@ -713,13 +731,9 @@ func (l *DockerCluster) stop(ctx context.Context) {
if l.monitorCtxCancelFunc != nil {
l.monitorCtxCancelFunc()
l.monitorCtxCancelFunc = nil
<-l.monitorDone
}

if l.vols != nil {
maybePanic(l.vols.Kill(ctx))
maybePanic(l.vols.Remove(ctx))
l.vols = nil
}
for i, n := range l.Nodes {
if n.Container == nil {
continue
Expand All @@ -742,8 +756,14 @@ func (l *DockerCluster) stop(ctx context.Context) {
log.Infof(ctx, "~~~ node %d CRASHED ~~~~", i)
}
maybePanic(n.Remove(ctx))
n.Container = nil
}

if l.vols != nil {
maybePanic(l.vols.Kill(ctx))
maybePanic(l.vols.Remove(ctx))
l.vols = nil
}
l.Nodes = nil

if l.networkID != "" {
maybePanic(
@@ -878,6 +898,7 @@ func (l *DockerCluster) Cleanup(ctx context.Context, preserveLogs bool) {
}
for _, v := range volumes {
if preserveLogs && v.Name() == "logs" {
log.Infof(ctx, "preserving log directory: %s", l.volumesDir)
continue
}
if err := os.RemoveAll(filepath.Join(l.volumesDir, v.Name())); err != nil {
52 changes: 37 additions & 15 deletions pkg/acceptance/util_docker.go
@@ -22,6 +22,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/build/bazel"
"github.com/cockroachdb/cockroach/pkg/security/username"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/containerd/containerd/platforms"
"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
@@ -69,19 +70,6 @@ func testDocker(
) error {
var err error
RunDocker(t, func(t *testing.T) {
cfg := cluster.TestConfig{
Name: name,
Duration: *flagDuration,
}
for i := 0; i < num; i++ {
cfg.Nodes = append(cfg.Nodes, cluster.NodeConfig{Stores: []cluster.StoreConfig{{}}})
}
l := StartCluster(ctx, t, cfg).(*cluster.DockerCluster)
defer l.AssertAndStop(ctx, t)

if len(l.Nodes) > 0 {
containerConfig.Env = append(containerConfig.Env, "PGHOST="+l.Hostname(0))
}
var pwd string
pwd, err = os.Getwd()
if err != nil {
@@ -121,12 +109,46 @@
}()
hostConfig.Binds = append(hostConfig.Binds, interactivetestsDir+":/mnt/interactive_tests")
}

// Prepare the docker cluster.
// We need to do this "under" the directory preparation above so as
// to prevent the test from crashing because the directory gets
// deleted before the container shutdown assertions get a chance to run.
cfg := cluster.TestConfig{
Name: name,
Duration: *flagDuration,
}
for i := 0; i < num; i++ {
cfg.Nodes = append(cfg.Nodes, cluster.NodeConfig{Stores: []cluster.StoreConfig{{}}})
}
l := StartCluster(ctx, t, cfg).(*cluster.DockerCluster)

var preserveLogs bool
defer func() {
// Check the final health of the cluster nodes and
// stop the cluster after that.
l.AssertAndStop(ctx, t)

// Note: we must be careful to clean up the volumes *after*
// the cluster has been shut down (in the `AssertAndStop` call).
// Otherwise, the directory removal will cause the cluster nodes
// to crash and report abnormal termination, even when the test
// succeeds otherwise.
log.Infof(ctx, "cleaning up docker volume")
l.Cleanup(ctx, preserveLogs)
}()

if len(l.Nodes) > 0 {
containerConfig.Env = append(containerConfig.Env, "PGHOST="+l.Hostname(0))
}

log.Infof(ctx, "starting one-shot container")
err = l.OneShot(
ctx, acceptanceImage, types.ImagePullOptions{}, containerConfig, hostConfig,
platforms.DefaultSpec(), "docker-"+name,
)
preserveLogs := err != nil
l.Cleanup(ctx, preserveLogs)
log.Infof(ctx, "one-shot container terminated: %v", err)
preserveLogs = err != nil
})
return err
}
2 changes: 1 addition & 1 deletion pkg/sql/sqlstats/insights/insights.proto
@@ -30,7 +30,7 @@ enum Problem {
SuboptimalPlan = 2;

// This statement was slow because of contention.
HighWaitTime = 3;
HighContentionTime = 3;

// This statement was slow because of being retried multiple times, again due
// to contention. The "high" threshold may be configured by the
4 changes: 4 additions & 0 deletions pkg/sql/sqlstats/insights/problems.go
@@ -21,6 +21,10 @@ func (p *problems) examine(stmt *Statement) (result []Problem) {
result = append(result, Problem_SuboptimalPlan)
}

if stmt.Contention != nil && *stmt.Contention >= LatencyThreshold.Get(&p.st.SV) {
result = append(result, Problem_HighContentionTime)
}

if stmt.Retries >= HighRetryCountThreshold.Get(&p.st.SV) {
result = append(result, Problem_HighRetryCount)
}
9 changes: 9 additions & 0 deletions pkg/sql/sqlstats/insights/problems_test.go
@@ -13,6 +13,7 @@ package insights
import (
"context"
"testing"
"time"

"github.com/cockroachdb/cockroach/pkg/settings/cluster"
"github.com/stretchr/testify/require"
@@ -22,8 +23,11 @@ func TestProblems(t *testing.T) {
ctx := context.Background()
st := cluster.MakeTestingClusterSettings()
p := &problems{st: st}
LatencyThreshold.Override(ctx, &st.SV, 100*time.Millisecond)
HighRetryCountThreshold.Override(ctx, &st.SV, 10)

var latencyThreshold = LatencyThreshold.Get(&st.SV)

testCases := []struct {
name string
statement *Statement
@@ -39,6 +43,11 @@
statement: &Statement{IndexRecommendations: []string{"THIS IS AN INDEX RECOMMENDATION"}},
problems: []Problem{Problem_SuboptimalPlan},
},
{
name: "high contention time",
statement: &Statement{Contention: &latencyThreshold},
problems: []Problem{Problem_HighContentionTime},
},
{
name: "high retry count",
statement: &Statement{Retries: 10},
1 change: 0 additions & 1 deletion pkg/sql/ttl/ttljob/ttljob_test.go
@@ -420,7 +420,6 @@ func TestRowLevelTTLJobMultipleNodes(t *testing.T) {
tableName,
)
const rowsPerRange = 10
const expiredRowsPerRange = rowsPerRange / 2
splitPoints := make([]serverutils.SplitPoint, len(splitAts))
for i, splitAt := range splitAts {
newLeaseHolderIdx := (leaseHolderIdx + 1 + i) % numNodes