56130: opt: updated normalization rules for folding IS expressions with NULL predicate r=mgartner a=jayshrivastava

opt: updated normalization rules for folding IS expressions with NULL predicate

Previously, for statements such as `SELECT (foo,bar) IS DISTINCT FROM NULL FROM a_table`,
the expression `(foo,bar) IS DISTINCT FROM NULL` would not be normalized to `true`.
Similarly, if `IS NOT DISTINCT FROM NULL` were used, the expression would not be
normalized to `false`. These expressions were previously normalized only when the
tuple or array in the statement contained nothing but constants. With this commit,
normalization is applied whenever any array or tuple appears in this position.
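
For illustration only, here is a minimal, hand-rolled Go sketch of the folding behavior
described above. It is not the actual optimizer rule (CockroachDB's normalization rules
are generated from optgen definitions); the `expr` type and function are invented solely
to show when the fold applies.

```
package sketch

// expr is a toy expression node used only for this illustration; it is not a
// CockroachDB type.
type expr struct {
	kind string // e.g. "tuple", "array", "column", "const"
}

// foldIsDistinctFromNull mirrors the behavior described in the commit message:
// for any tuple or array operand, `IS DISTINCT FROM NULL` folds to true and
// `IS NOT DISTINCT FROM NULL` folds to false, whether or not the elements are
// constants. Other operands are left alone.
func foldIsDistinctFromNull(operand expr, negated bool) (value bool, folded bool) {
	if operand.kind != "tuple" && operand.kind != "array" {
		return false, false // no fold; leave the expression as-is
	}
	if negated {
		return false, true // IS NOT DISTINCT FROM NULL => false
	}
	return true, true // IS DISTINCT FROM NULL => true
}
```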

Release note: None

56217: sql: clean up uses of Statement r=RaduBerinde a=RaduBerinde

This change cleans up the use of `sql.Statement` and reduces some
allocations. Specifically:

 - we create a `Statement` lower in the stack (in `execStmtInOpenState`),
   and pass only what we need in the higher layers;

 - we change various functions to take a `tree.Statement` rather than
   an entire `Statement` when possible;

 - we move down the `withStatement` context allocation, so that it is
   avoided in the implicit transaction state transition;

 - we store a copy rather than a pointer to the Statement in the
   planner;

 - we avoid directly using `stmt` fields from `func()` declarations
   that escape (see the sketch after this list);

 - we populate `Statement.AnonymizedStr` upfront. The anonymized
   string is always needed (to update statement stats).
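
As a side note on the escaping-closure bullet above, this is a minimal, self-contained Go
sketch of the allocation concern; the `Statement` type below is a stand-in, not the real
`sql.Statement` struct. Referencing a field through the struct variable inside a closure
that outlives the call forces the whole struct onto the heap, while copying the field into
a local first lets only that value escape.

```
package sketch

import "fmt"

// Statement is a stand-in for sql.Statement; only the shape matters here.
type Statement struct {
	SQL           string
	AnonymizedStr string
}

// callbacks outlive the calls below, so anything the closures reference
// escapes to the heap.
var callbacks []func()

// capturesWhole references a field through stmt inside the closure, which
// makes the entire Statement escape.
func capturesWhole(stmt Statement) {
	callbacks = append(callbacks, func() { fmt.Println(stmt.SQL) })
}

// capturesField copies the one field it needs into a local first, so only
// that string escapes rather than the whole struct.
func capturesField(stmt Statement) {
	sql := stmt.SQL
	callbacks = append(callbacks, func() { fmt.Println(sql) })
}
```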

```
name                               old time/op    new time/op    delta
EndToEnd/kv-read/EndToEnd             153µs ± 1%     154µs ± 2%    ~     (p=0.486 n=4+4)
EndToEnd/kv-read-no-prep/EndToEnd     216µs ± 1%     217µs ± 1%    ~     (p=0.886 n=4+4)
EndToEnd/kv-read-const/EndToEnd       111µs ± 1%     113µs ± 1%  +1.01%  (p=0.029 n=4+4)

name                               old alloc/op   new alloc/op   delta
EndToEnd/kv-read/EndToEnd            25.8kB ± 1%    25.5kB ± 1%    ~     (p=0.114 n=4+4)
EndToEnd/kv-read-no-prep/EndToEnd    32.2kB ± 1%    31.9kB ± 1%    ~     (p=0.686 n=4+4)
EndToEnd/kv-read-const/EndToEnd      21.0kB ± 1%    20.7kB ± 2%    ~     (p=0.200 n=4+4)

name                               old allocs/op  new allocs/op  delta
EndToEnd/kv-read/EndToEnd               252 ± 1%       250 ± 0%  -0.99%  (p=0.029 n=4+4)
EndToEnd/kv-read-no-prep/EndToEnd       332 ± 0%       330 ± 1%    ~     (p=0.229 n=4+4)
EndToEnd/kv-read-const/EndToEnd         214 ± 0%       212 ± 0%  -0.93%  (p=0.029 n=4+4)
```

Release note: None

56243: liveness: introduce GetLivenessesFromKV r=irfansharif a=irfansharif

Now that we always create a liveness record on startup (#53805), we can simply
fetch all records from KV when we want an up-to-date view of every node that has
ever been part of the cluster. We add a helper to do just that, which we'll rely
on when introducing long-running migrations (#56107).

It's a bit unfortunate that we're adding to the liveness API without changing the
underlying look-aside cache structure, but reading up-to-date records directly from
KV is the direction we're hoping to move in over time. The TODO added in [1]
outlines what the future holds.

We'll also expose the GetLivenessesFromKV API we introduced earlier to pkg/sql.
We'll rely on it when plumbing the liveness instance into the migration manager
process (prototyped in #56107).
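
Purely as an illustration of the intended usage (the types below are stand-ins, not the
actual pkg/sql or livenesspb definitions), a consumer such as the migration manager could
use the new API roughly like this:

```
package sketch

import "context"

// Liveness mirrors the few livenesspb.Liveness fields used in this sketch.
type Liveness struct {
	NodeID int32
	Epoch  int64
}

// kvLivenessReader captures just the method introduced by this commit.
type kvLivenessReader interface {
	GetLivenessesFromKV(ctx context.Context) ([]Liveness, error)
}

// everNodeIDs returns the ID of every node that has ever been part of the
// cluster, reading straight from KV instead of the in-memory cache.
func everNodeIDs(ctx context.Context, nl kvLivenessReader) ([]int32, error) {
	records, err := nl.GetLivenessesFromKV(ctx)
	if err != nil {
		return nil, err
	}
	ids := make([]int32, 0, len(records))
	for _, rec := range records {
		ids = append(ids, rec.NodeID)
	}
	return ids, nil
}
```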

It should be noted that this is a meatier dependency on node liveness from pkg/sql
than we have today; currently the only uses are in DistSQL planning and in jobs [2].
As it relates to our multi-tenancy work, the real use of this API will happen only
on the system tenant: the system tenant alone has the privilege to set cluster
settings (or at least the version setting specifically), which is what the
migration manager will be wired into.

[1]: d631239
[2]: #48795

Release note: None

---

The first commit is from #56221 and can be ignored here.


Co-authored-by: Jayant Shrivastava <[email protected]>
Co-authored-by: Radu Berinde <[email protected]>
Co-authored-by: irfan sharif <[email protected]>
4 people committed Nov 3, 2020
4 parents 384f315 + 651b88c + fb101b0 + b2790ab commit 0653b92
Showing 30 changed files with 428 additions and 207 deletions.
13 changes: 10 additions & 3 deletions pkg/jobs/helpers.go
@@ -11,6 +2,8 @@
package jobs

import (
"context"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/jobs/jobspb"
"github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
@@ -28,7 +30,7 @@ var FakeNodeID = func() *base.NodeIDContainer {
}()

// FakeNodeLiveness allows simulating liveness failures without the full
// storage.NodeLiveness machinery.
// liveness.NodeLiveness machinery.
type FakeNodeLiveness struct {
mu struct {
syncutil.Mutex
@@ -62,7 +64,7 @@ func NewFakeNodeLiveness(nodeCount int) *FakeNodeLiveness {
// ModuleTestingKnobs implements base.ModuleTestingKnobs.
func (*FakeNodeLiveness) ModuleTestingKnobs() {}

// Self implements the implicit storage.NodeLiveness interface. It uses NodeID
// Self implements the implicit liveness.NodeLiveness interface. It uses NodeID
// as the node ID. On every call, a nonblocking send is performed over nl.ch to
// allow tests to execute a callback.
func (nl *FakeNodeLiveness) Self() (livenesspb.Liveness, bool) {
@@ -75,7 +77,7 @@ func (nl *FakeNodeLiveness) Self() (livenesspb.Liveness, bool) {
return *nl.mu.livenessMap[FakeNodeID.Get()], true
}

// GetLivenesses implements the implicit storage.NodeLiveness interface.
// GetLivenesses implements the implicit liveness.NodeLiveness interface.
func (nl *FakeNodeLiveness) GetLivenesses() (out []livenesspb.Liveness) {
select {
case nl.GetLivenessesCalledCh <- struct{}{}:
@@ -89,6 +91,11 @@ func (nl *FakeNodeLiveness) GetLivenesses() (out []livenesspb.Liveness) {
return out
}

// GetLivenessesFromKV implements the implicit liveness.NodeLiveness interface.
func (nl *FakeNodeLiveness) GetLivenessesFromKV(context.Context) ([]livenesspb.Liveness, error) {
return nil, errors.New("FakeNodeLiveness.GetLivenessesFromKV is unimplemented")
}

// IsLive is unimplemented.
func (nl *FakeNodeLiveness) IsLive(roachpb.NodeID) (bool, error) {
return false, errors.New("FakeNodeLiveness.IsLive is unimplemented")
55 changes: 55 additions & 0 deletions pkg/kv/kvserver/liveness/client_test.go
@@ -13,6 +1,7 @@ package liveness_test
import (
"context"
"fmt"
"sort"
"strings"
"testing"
"time"
@@ -83,6 +84,60 @@ func TestNodeLivenessAppearsAtStart(t *testing.T) {
}
}

// TestGetLivenessesFromKV verifies that fetching liveness records from KV
// directly retrieves all the records we expect.
func TestGetLivenessesFromKV(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx := context.Background()
tc := testcluster.StartTestCluster(t, 3, base.TestClusterArgs{})
defer tc.Stopper().Stop(ctx)

// At this point StartTestCluster has waited for all nodes to become live.

// Verify that each server sees the same set of liveness records in KV.
for i := 0; i < tc.NumServers(); i++ {
nodeID := tc.Server(i).NodeID()
nl := tc.Server(i).NodeLiveness().(*liveness.NodeLiveness)

if live, err := nl.IsLive(nodeID); err != nil {
t.Fatal(err)
} else if !live {
t.Fatalf("node %d not live", nodeID)
}

livenesses, err := nl.GetLivenessesFromKV(ctx)
assert.Nil(t, err)
assert.Equal(t, len(livenesses), tc.NumServers())

var nodeIDs []roachpb.NodeID
for _, liveness := range livenesses {
nodeIDs = append(nodeIDs, liveness.NodeID)

// We expect epoch=1 as nodes first create a liveness record at epoch=0,
// and then increment it during their first heartbeat.
if liveness.Epoch != 1 {
t.Fatalf("expected epoch=1, got epoch=%d", liveness.Epoch)
}
if !liveness.Membership.Active() {
t.Fatalf("expected membership=active, got membership=%s", liveness.Membership)
}
}

sort.Slice(nodeIDs, func(i, j int) bool {
return nodeIDs[i] < nodeIDs[j]
})
for i := range nodeIDs {
expNodeID := roachpb.NodeID(i + 1) // Node IDs are 1-indexed.
if nodeIDs[i] != expNodeID {
t.Fatalf("expected nodeID=%d, got %d", expNodeID, nodeIDs[i])
}
}
}

}

func TestNodeLivenessStatusMap(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
34 changes: 34 additions & 0 deletions pkg/kv/kvserver/liveness/liveness.go
@@ -999,6 +999,40 @@ func (nl *NodeLiveness) GetLivenesses() []livenesspb.Liveness {
return livenesses
}

// GetLivenessesFromKV returns a slice containing the liveness records of all
// nodes that have ever been a part of the cluster. The records are read from
// the KV layer in a KV transaction. This is in contrast to GetLivenesses above,
// which consults a (possibly stale) in-memory cache.
func (nl *NodeLiveness) GetLivenessesFromKV(ctx context.Context) ([]livenesspb.Liveness, error) {
kvs, err := nl.db.Scan(ctx, keys.NodeLivenessPrefix, keys.NodeLivenessKeyMax, 0)
if err != nil {
return nil, errors.Wrap(err, "unable to get liveness")
}

var results []livenesspb.Liveness
for _, kv := range kvs {
if kv.Value == nil {
return nil, errors.AssertionFailedf("missing liveness record")
}
var liveness livenesspb.Liveness
if err := kv.Value.GetProto(&liveness); err != nil {
return nil, errors.Wrap(err, "invalid liveness record")
}

livenessRec := Record{
Liveness: liveness,
raw: kv.Value.TagAndDataBytes(),
}

// Update our cache with the liveness record we just found.
nl.maybeUpdate(ctx, livenessRec)

results = append(results, liveness)
}

return results, nil
}

// GetLiveness returns the liveness record for the specified nodeID. If the
// liveness record is not found (due to gossip propagation delays or due to the
// node not existing), we surface that to the caller. The record returned also
22 changes: 10 additions & 12 deletions pkg/sql/app_stats.go
@@ -184,7 +184,7 @@ func (a *appStats) recordStatement(

// Get the statistics object.
s, stmtID := a.getStatsForStmt(
stmt, implicitTxn,
stmt.AnonymizedStr, implicitTxn,
err, createIfNonExistent,
)

@@ -236,19 +236,14 @@ func (a *appStats) recordStatement(
// stat object is returned or not, we always return the correct stmtID
// for the given stmt.
func (a *appStats) getStatsForStmt(
stmt *Statement, implicitTxn bool, err error, createIfNonexistent bool,
anonymizedStmt string, implicitTxn bool, err error, createIfNonexistent bool,
) (*stmtStats, roachpb.StmtID) {
// Extend the statement key with various characteristics, so
// that we use separate buckets for the different situations.
key := stmtKey{
failed: err != nil,
implicitTxn: implicitTxn,
}
if stmt.AnonymizedStr != "" {
// Use the cached anonymized string.
key.anonymizedStmt = stmt.AnonymizedStr
} else {
key.anonymizedStmt = anonymizeStmt(stmt.AST)
anonymizedStmt: anonymizedStmt,
failed: err != nil,
implicitTxn: implicitTxn,
}

// We first try and see if we can get by without creating a new entry for this
@@ -362,6 +357,9 @@ func (a *appStats) Add(other *appStats) {
}

func anonymizeStmt(ast tree.Statement) string {
if ast == nil {
return ""
}
return tree.AsStringWithFlags(ast, tree.FmtHideConstants)
}

@@ -446,14 +444,14 @@ func (a *appStats) recordTransaction(
// sample logical plan for its corresponding fingerprint. We use
// `logicalPlanCollectionPeriod` to assess how frequently to sample logical
// plans.
func (a *appStats) shouldSaveLogicalPlanDescription(stmt *Statement, implicitTxn bool) bool {
func (a *appStats) shouldSaveLogicalPlanDescription(anonymizedStmt string, implicitTxn bool) bool {
if !sampleLogicalPlans.Get(&a.st.SV) {
return false
}
// We don't know yet if we will hit an error, so we assume we don't. The worst
// that can happen is that for statements that always error out, we will
// always save the tree plan.
stats, _ := a.getStatsForStmt(stmt, implicitTxn, nil /* error */, false /* createIfNonexistent */)
stats, _ := a.getStatsForStmt(anonymizedStmt, implicitTxn, nil /* error */, false /* createIfNonexistent */)
if stats == nil {
// Save logical plan the first time we see new statement fingerprint.
return true
37 changes: 17 additions & 20 deletions pkg/sql/conn_executor.go
@@ -773,16 +773,16 @@ func (ex *connExecutor) closeWrapper(ctx context.Context, recovered interface{})

// If there's a statement currently being executed, we'll report
// on it.
if ex.curStmt != nil {
if ex.curStmtAST != nil {
// A warning header guaranteed to go to stderr.
log.Shoutf(ctx, log.Severity_ERROR,
"a SQL panic has occurred while executing the following statement:\n%s",
// For the log message, the statement is not anonymized.
truncateStatementStringForTelemetry(ex.curStmt.String()))
truncateStatementStringForTelemetry(ex.curStmtAST.String()))

// Embed the statement in the error object for the telemetry
// report below. The statement gets anonymized.
panicErr = WithAnonymizedStatement(panicErr, ex.curStmt)
panicErr = WithAnonymizedStatement(panicErr, ex.curStmtAST)
}

// Report the panic to telemetry in any case.
@@ -1095,9 +1095,9 @@ type connExecutor struct {
IdleInTransactionSessionTimeout timeout
}

// curStmt is the statement that's currently being prepared or executed, if
// curStmtAST is the statement that's currently being prepared or executed, if
// any. This is printed by high-level panic recovery.
curStmt tree.Statement
curStmtAST tree.Statement

sessionID ClusterWideID

@@ -1360,7 +1360,7 @@ func (ex *connExecutor) run(
defer ex.server.cfg.SessionRegistry.deregister(ex.sessionID)

for {
ex.curStmt = nil
ex.curStmtAST = nil
if err := ctx.Err(); err != nil {
return err
}
@@ -1424,7 +1424,7 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
res = ex.clientComm.CreateEmptyQueryResult(pos)
return nil
}
ex.curStmt = tcmd.AST
ex.curStmtAST = tcmd.AST

stmtRes := ex.clientComm.CreateStatementResult(
tcmd.AST,
@@ -1438,10 +1438,8 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
ex.implicitTxn(),
)
res = stmtRes
curStmt := Statement{Statement: tcmd.Statement}

stmtCtx := withStatement(ctx, ex.curStmt)
ev, payload, err = ex.execStmt(stmtCtx, curStmt, stmtRes, nil /* pinfo */)
ev, payload, err = ex.execStmt(ctx, tcmd.Statement, nil /* prepared */, nil /* pinfo */, stmtRes)
return err
}()
// Note: we write to ex.statsCollector.phaseTimes, instead of ex.phaseTimes,
@@ -1487,7 +1485,7 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
if log.ExpensiveLogEnabled(ctx, 2) {
log.VEventf(ctx, 2, "portal resolved to: %s", portal.Stmt.AST.String())
}
ex.curStmt = portal.Stmt.AST
ex.curStmtAST = portal.Stmt.AST

pinfo := &tree.PlaceholderInfo{
PlaceholderTypesInfo: tree.PlaceholderTypesInfo{
@@ -1525,9 +1523,9 @@ func (ex *connExecutor) execCmd(ctx context.Context) error {
}

case PrepareStmt:
ex.curStmt = tcmd.AST
ex.curStmtAST = tcmd.AST
res = ex.clientComm.CreatePrepareResult(pos)
stmtCtx := withStatement(ctx, ex.curStmt)
stmtCtx := withStatement(ctx, ex.curStmtAST)
ev, payload = ex.execPrepare(stmtCtx, tcmd)
case DescribeStmt:
descRes := ex.clientComm.CreateDescribeResult(pos)
@@ -1786,9 +1784,8 @@ func (ex *connExecutor) setTxnRewindPos(ctx context.Context, pos CmdPos) {
// stmtDoesntNeedRetry returns true if the given statement does not need to be
// retried when performing automatic retries. This means that the results of the
// statement do not change with retries.
func (ex *connExecutor) stmtDoesntNeedRetry(stmt tree.Statement) bool {
wrap := Statement{Statement: parser.Statement{AST: stmt}}
return isSavepoint(wrap) || isSetTransaction(wrap)
func (ex *connExecutor) stmtDoesntNeedRetry(ast tree.Statement) bool {
return isSavepoint(ast) || isSetTransaction(ast)
}

func stateToTxnStatusIndicator(s fsm.State) TransactionStatusIndicator {
@@ -2180,7 +2177,7 @@ func (ex *connExecutor) resetPlanner(
ctx context.Context, p *planner, txn *kv.Txn, stmtTS time.Time,
) {
p.txn = txn
p.stmt = nil
p.stmt = Statement{}

p.cancelChecker.Reset(ctx)

@@ -2213,7 +2210,7 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
implicitTxn = os.ImplicitTxn.Get()
}

err := ex.machine.ApplyWithPayload(withStatement(ex.Ctx(), ex.curStmt), ev, payload)
err := ex.machine.ApplyWithPayload(withStatement(ex.Ctx(), ex.curStmtAST), ev, payload)
if err != nil {
if errors.HasType(err, (*fsm.TransitionNotFoundError)(nil)) {
panic(err)
@@ -2310,14 +2307,14 @@ func (ex *connExecutor) txnStateTransitionsApplyWrapper(
//
// If an error is returned, it is to be considered a query execution error.
func (ex *connExecutor) initStatementResult(
ctx context.Context, res RestrictedCommandResult, stmt *Statement, cols colinfo.ResultColumns,
ctx context.Context, res RestrictedCommandResult, ast tree.Statement, cols colinfo.ResultColumns,
) error {
for _, c := range cols {
if err := checkResultType(c.Typ); err != nil {
return err
}
}
if stmt.AST.StatementType() == tree.Rows {
if ast.StatementType() == tree.Rows {
// Note that this call is necessary even if cols is nil.
res.SetColumns(ctx, cols)
}