Skip to content

Commit

Permalink
sql: more directly init the timestamps in EvalCtx
Browse files Browse the repository at this point in the history
... and also make session.newPlanner() not need an Executor any more.

The initialization of the timestamp fields in the EvalContext was pretty
scattered. I've centralized it now in the ctor. The places where these
fields are not initialized are now more visible.

This commit does not change any behavior. As a reminder, there's three
timestamps that power SQL builtin functions:
- evalCtx.StmtTimestamp powers statement_timestamp()
- evalCtx.TxnTimestamp powers now(), current_timestamp() and
transaction_timestamp()
- txnProto.OrigTimestamp powers cluster_logical_timestamp()

TxnTimestamp is reset by the Executor whenever a new SQL transaction
starts. It is *not* reset on transaction restarts, automatic or
user-directed.
StmtTimestamp is reset before planning/preparing/executing each statement.

A while ago I believe a couple of us agreed that the fact that
TxnTimestamp is not reset on restarts was a mistake. Resetting it would
bring now() closer to cluster_logical_timestamp(), guaranteeing that for
conflicting txns, the results of now() would be ordered as the
transactions were. This remanins to be done in another PR.

release note: none
  • Loading branch information
andreimatei committed Jan 16, 2018
1 parent 6385024 commit a46b0a0
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 56 deletions.
3 changes: 2 additions & 1 deletion pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func (s *Session) ProcessCopyData(
cf := s.copyFrom
buf := cf.buf

evalCtx := s.extendedEvalCtx()
evalCtx := s.extendedEvalCtx(
s.TxnState.mu.txn, s.TxnState.sqlTimestamp, s.execCfg.Clock.PhysicalTime())

switch msg {
case copyMsgData:
Expand Down
29 changes: 22 additions & 7 deletions pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,13 @@ func (e *Executor) Start(
ctx = log.WithLogTag(ctx, "startup", nil)
startupSession := NewSession(ctx, SessionArgs{}, e, nil, startupMemMetrics)
startupSession.StartUnlimitedMonitor()
if err := e.virtualSchemas.init(ctx, startupSession.newPlanner(e, nil)); err != nil {
if err := e.virtualSchemas.init(
ctx, startupSession.newPlanner(
nil, /* txn */
time.Time{}, /* txnTimestamp */
e.cfg.Clock.PhysicalTime(), /* stmtTimestamp */
e.reCache),
); err != nil {
log.Fatal(ctx, err)
}
startupSession.Finish(e)
Expand Down Expand Up @@ -516,7 +522,12 @@ func (e *Executor) Prepare(
txn.Proto().OrigTimestamp = e.cfg.Clock.Now()
}

planner := session.newPlanner(e, txn)
planner := session.newPlanner(
txn,
session.TxnState.sqlTimestamp,
e.cfg.Clock.PhysicalTime(), /* stmtTimestamp */
e.reCache,
)
planner.semaCtx.Placeholders.SetTypeHints(placeholderHints)
planner.extendedEvalCtx.PrepareOnly = true
planner.extendedEvalCtx.ActiveMemAcc = &prepared.memAcc
Expand Down Expand Up @@ -1793,18 +1804,17 @@ func (e *Executor) execStmtInOpenTxn(

var p *planner
runInParallel := parallelize && !txnState.implicitTxn
stmtTs := e.cfg.Clock.PhysicalTime()
if runInParallel {
// Create a new planner from the Session to execute the statement, since
// we're executing in parallel.
p = session.newPlanner(e, txnState.mu.txn)
p = session.newPlanner(txnState.mu.txn, txnState.sqlTimestamp, stmtTs, e.reCache)
} else {
// We're not executing in parallel. We can use the cached planner on the
// session.
p = &session.planner
session.resetPlanner(p, e, txnState.mu.txn)
session.resetPlanner(p, txnState.mu.txn, txnState.sqlTimestamp, stmtTs, e.reCache)
}
p.extendedEvalCtx.SetTxnTimestamp(txnState.sqlTimestamp)
p.extendedEvalCtx.SetStmtTimestamp(e.cfg.Clock.PhysicalTime())
p.semaCtx.Placeholders.Assign(pinfo)
p.extendedEvalCtx.Placeholders = &p.semaCtx.Placeholders
p.asOfSystemTime = asOfSystemTime
Expand Down Expand Up @@ -2569,7 +2579,12 @@ func isAsOf(session *Session, stmt tree.Statement, max hlc.Timestamp) (*hlc.Time
return nil, nil
}

evalCtx := session.extendedEvalCtx()
// the evalCtx is not needed for the purposes of evaluating AOST. See #16424.
evalCtx := session.extendedEvalCtx(
nil, /* txn */
time.Time{}, /* txnTimestamp */
time.Time{}, /* stmtTimestamp */
)
ts, err := EvalAsOfTimestamp(&evalCtx.EvalContext, asOf, max)
return &ts, err
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/parallel_stmts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,19 @@ func planQuery(
t *testing.T, s serverutils.TestServerInterface, sql string,
) (*planner, planNode, func()) {
kvDB := s.KVClient().(*client.DB)
now := s.Clock().Now()
physicalNow := s.Clock().PhysicalTime()
txn := client.NewTxn(kvDB, s.NodeID())
txn.Proto().OrigTimestamp = s.Clock().Now()
txn.Proto().OrigTimestamp = now
p := makeInternalPlanner("plan", txn, security.RootUser, &MemoryMetrics{})
p.session.tables.leaseMgr = s.LeaseManager().(*LeaseManager)
p.session.dataMutator.SetDatabase("test")
// HACK: the internal planner is not really meant to execute more than one
// statement, but here we neede to set the session database, so we did that by
// accessing the session directly. That requires us to reset the planner, so
// that its copy of the session variable is reset.
p.session.resetPlanner(p, nil /* executor*/, txn)
p.session.resetPlanner(
p, txn, physicalNow /* txnTimestamp */, physicalNow /* stmtTimestamp */, nil /* reCache */)

stmts, err := p.parser.Parse(sql)
if err != nil {
Expand Down
23 changes: 3 additions & 20 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,14 @@ func makeInternalPlanner(
-1, noteworthyInternalMemoryUsageBytes/5)
s.TxnState.mon.Start(ctx, &s.mon, mon.BoundAccount{})

p := s.newPlanner(nil /* executor */, txn)

var ts time.Time
if txn != nil {
if txn.Proto().OrigTimestamp == (hlc.Timestamp{}) {
panic("makeInternalPlanner called with a transaction without timestamps")
}
ts := txn.Proto().OrigTimestamp.GoTime()
p.extendedEvalCtx.SetTxnTimestamp(ts)
p.extendedEvalCtx.SetStmtTimestamp(ts)
ts = txn.Proto().OrigTimestamp.GoTime()
}
p := s.newPlanner(txn, ts /* txnTimestamp */, ts /* stmtTimestamp */, nil /* reCache */)

p.extendedEvalCtx.Placeholders = &p.semaCtx.Placeholders
p.extendedEvalCtx.Tables = &s.tables
Expand Down Expand Up @@ -316,21 +314,6 @@ func (p *planner) DistLoader() *DistLoader {
return &DistLoader{distSQLPlanner: p.extendedEvalCtx.DistSQLPlanner}
}

// setTxn resets the current transaction in the planner and
// initializes the timestamps used by SQL built-in functions from
// the new txn object, if any.
func (p *planner) setTxn(txn *client.Txn) {
p.txn = txn
p.extendedEvalCtx.Txn = txn
if txn != nil {
p.extendedEvalCtx.SetClusterTimestamp(txn.OrigTimestamp())
} else {
p.extendedEvalCtx.SetTxnTimestamp(time.Time{})
p.extendedEvalCtx.SetStmtTimestamp(time.Time{})
p.extendedEvalCtx.SetClusterTimestamp(hlc.Timestamp{})
}
}

// makeInternalPlan initializes a planNode from a SQL statement string.
// Close() must be called on the returned planNode after use.
// This function changes the planner's placeholder map. It is the caller's
Expand Down
14 changes: 7 additions & 7 deletions pkg/sql/sem/tree/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -2071,7 +2071,7 @@ type EvalContext struct {
TxnTimestamp time.Time
// The cluster timestamp. Needs to be stable for the lifetime of the
// transaction. Used for cluster_logical_timestamp().
clusterTimestamp hlc.Timestamp
ClusterTimestamp hlc.Timestamp

// Placeholders relates placeholder names to their type and, later, value.
// This pointer should always be set to the location of the PlaceholderInfo
Expand Down Expand Up @@ -2171,20 +2171,20 @@ func (ctx *EvalContext) GetStmtTimestamp() time.Time {
func (ctx *EvalContext) GetClusterTimestamp() *DDecimal {
// TODO(knz): a zero timestamp should never be read, even during
// Prepare. This will need to be addressed.
if !ctx.PrepareOnly && ctx.clusterTimestamp == (hlc.Timestamp{}) {
if !ctx.PrepareOnly && ctx.ClusterTimestamp == (hlc.Timestamp{}) {
panic("zero cluster timestamp in EvalContext")
}

return TimestampToDecimal(ctx.clusterTimestamp)
return TimestampToDecimal(ctx.ClusterTimestamp)
}

// GetClusterTimestampRaw exposes the clusterTimestamp field. Also see
// GetClusterTimestampRaw exposes the ClusterTimestamp field. Also see
// GetClusterTimestamp().
func (ctx *EvalContext) GetClusterTimestampRaw() hlc.Timestamp {
if !ctx.PrepareOnly && ctx.clusterTimestamp == (hlc.Timestamp{}) {
if !ctx.PrepareOnly && ctx.ClusterTimestamp == (hlc.Timestamp{}) {
panic("zero cluster timestamp in EvalContext")
}
return ctx.clusterTimestamp
return ctx.ClusterTimestamp
}

// HasPlaceholders returns true if this EvalContext's placeholders have been
Expand Down Expand Up @@ -2255,7 +2255,7 @@ func (ctx *EvalContext) SetStmtTimestamp(ts time.Time) {

// SetClusterTimestamp sets the corresponding timestamp in the EvalContext.
func (ctx *EvalContext) SetClusterTimestamp(ts hlc.Timestamp) {
ctx.clusterTimestamp = ts
ctx.ClusterTimestamp = ts
}

// GetLocation returns the session timezone.
Expand Down
56 changes: 37 additions & 19 deletions pkg/sql/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -557,8 +558,15 @@ func (s *Session) Ctx() context.Context {
return s.context
}

func (s *Session) resetPlanner(p *planner, e *Executor, txn *client.Txn) {
func (s *Session) resetPlanner(
p *planner,
txn *client.Txn,
txnTimestamp time.Time,
stmtTimestamp time.Time,
reCache *tree.RegexpCache,
) {
p.session = s
p.txn = txn
// phaseTimes is an array, not a slice, so this performs a copy-by-value.
p.phaseTimes = s.phaseTimes
p.stmt = nil
Expand All @@ -568,19 +576,17 @@ func (s *Session) resetPlanner(p *planner, e *Executor, txn *client.Txn) {
p.semaCtx.Location = &s.data.Location
p.semaCtx.SearchPath = s.data.SearchPath

p.extendedEvalCtx = s.extendedEvalCtx()
p.extendedEvalCtx = s.extendedEvalCtx(txn, txnTimestamp, stmtTimestamp)
p.extendedEvalCtx.Planner = p
if e != nil {
p.extendedEvalCtx.ClusterID = e.cfg.ClusterID()
p.extendedEvalCtx.NodeID = e.cfg.NodeID.Get()
p.extendedEvalCtx.ReCache = e.reCache
if s.execCfg != nil {
p.extendedEvalCtx.ClusterID = s.execCfg.ClusterID()
p.extendedEvalCtx.NodeID = s.execCfg.NodeID.Get()
}
p.extendedEvalCtx.ReCache = reCache

p.sessionDataMutator = s.dataMutator
p.preparedStatements = &s.PreparedStatements
p.autoCommit = false

p.setTxn(txn)
}

// FinishPlan releases the resources that were consumed by the currently active
Expand Down Expand Up @@ -609,15 +615,19 @@ func (s *Session) FinishPlan() {
// newPlanner creates a planner inside the scope of the given Session. The
// statement executed by the planner will be executed in txn. The planner
// should only be used to execute one statement.
func (s *Session) newPlanner(e *Executor, txn *client.Txn) *planner {
func (s *Session) newPlanner(
txn *client.Txn, txnTimestamp time.Time, stmtTimestamp time.Time, reCache *tree.RegexpCache,
) *planner {
p := &planner{}
s.resetPlanner(p, e, txn)
s.resetPlanner(p, txn, txnTimestamp, stmtTimestamp, reCache)
return p
}

// extendedEvalCtx creates an evaluation context from the Session's current
// configuration.
func (s *Session) extendedEvalCtx() extendedEvalContext {
func (s *Session) extendedEvalCtx(
txn *client.Txn, txnTimestamp time.Time, stmtTimestamp time.Time,
) extendedEvalContext {
var evalContextTestingKnobs tree.EvalContextTestingKnobs
var st *cluster.Settings
var statusServer serverpb.StatusServer
Expand All @@ -628,16 +638,24 @@ func (s *Session) extendedEvalCtx() extendedEvalContext {
st = s.execCfg.Settings
statusServer = s.execCfg.StatusServer
}
var clusterTs hlc.Timestamp
if txn != nil {
clusterTs = txn.OrigTimestamp()
}
return extendedEvalContext{
EvalContext: tree.EvalContext{
SessionData: s.data,
ApplicationName: s.dataMutator.ApplicationName(),
TxnState: getTransactionState(&s.TxnState),
TxnReadOnly: s.TxnState.readOnly,
Settings: st,
CtxProvider: s,
Mon: &s.TxnState.mon,
TestingKnobs: evalContextTestingKnobs,
Txn: txn,
SessionData: s.data,
ApplicationName: s.dataMutator.ApplicationName(),
TxnState: getTransactionState(&s.TxnState),
TxnReadOnly: s.TxnState.readOnly,
Settings: st,
CtxProvider: s,
Mon: &s.TxnState.mon,
TestingKnobs: evalContextTestingKnobs,
StmtTimestamp: stmtTimestamp,
TxnTimestamp: txnTimestamp,
ClusterTimestamp: clusterTs,
},
SessionMutator: s.dataMutator,
VirtualSchemas: &s.virtualSchemas,
Expand Down

0 comments on commit a46b0a0

Please sign in to comment.