Skip to content

Commit

Permalink
Merge pull request cockroachdb#21441 from andreimatei/evalctx-ts
Browse files Browse the repository at this point in the history
sql: more directly init the timestamps in EvalCtx
  • Loading branch information
andreimatei authored Jan 16, 2018
2 parents 6385024 + a46b0a0 commit 4b23c09
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 56 deletions.
3 changes: 2 additions & 1 deletion pkg/sql/copy.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,8 @@ func (s *Session) ProcessCopyData(
cf := s.copyFrom
buf := cf.buf

evalCtx := s.extendedEvalCtx()
evalCtx := s.extendedEvalCtx(
s.TxnState.mu.txn, s.TxnState.sqlTimestamp, s.execCfg.Clock.PhysicalTime())

switch msg {
case copyMsgData:
Expand Down
29 changes: 22 additions & 7 deletions pkg/sql/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,13 @@ func (e *Executor) Start(
ctx = log.WithLogTag(ctx, "startup", nil)
startupSession := NewSession(ctx, SessionArgs{}, e, nil, startupMemMetrics)
startupSession.StartUnlimitedMonitor()
if err := e.virtualSchemas.init(ctx, startupSession.newPlanner(e, nil)); err != nil {
if err := e.virtualSchemas.init(
ctx, startupSession.newPlanner(
nil, /* txn */
time.Time{}, /* txnTimestamp */
e.cfg.Clock.PhysicalTime(), /* stmtTimestamp */
e.reCache),
); err != nil {
log.Fatal(ctx, err)
}
startupSession.Finish(e)
Expand Down Expand Up @@ -516,7 +522,12 @@ func (e *Executor) Prepare(
txn.Proto().OrigTimestamp = e.cfg.Clock.Now()
}

planner := session.newPlanner(e, txn)
planner := session.newPlanner(
txn,
session.TxnState.sqlTimestamp,
e.cfg.Clock.PhysicalTime(), /* stmtTimestamp */
e.reCache,
)
planner.semaCtx.Placeholders.SetTypeHints(placeholderHints)
planner.extendedEvalCtx.PrepareOnly = true
planner.extendedEvalCtx.ActiveMemAcc = &prepared.memAcc
Expand Down Expand Up @@ -1793,18 +1804,17 @@ func (e *Executor) execStmtInOpenTxn(

var p *planner
runInParallel := parallelize && !txnState.implicitTxn
stmtTs := e.cfg.Clock.PhysicalTime()
if runInParallel {
// Create a new planner from the Session to execute the statement, since
// we're executing in parallel.
p = session.newPlanner(e, txnState.mu.txn)
p = session.newPlanner(txnState.mu.txn, txnState.sqlTimestamp, stmtTs, e.reCache)
} else {
// We're not executing in parallel. We can use the cached planner on the
// session.
p = &session.planner
session.resetPlanner(p, e, txnState.mu.txn)
session.resetPlanner(p, txnState.mu.txn, txnState.sqlTimestamp, stmtTs, e.reCache)
}
p.extendedEvalCtx.SetTxnTimestamp(txnState.sqlTimestamp)
p.extendedEvalCtx.SetStmtTimestamp(e.cfg.Clock.PhysicalTime())
p.semaCtx.Placeholders.Assign(pinfo)
p.extendedEvalCtx.Placeholders = &p.semaCtx.Placeholders
p.asOfSystemTime = asOfSystemTime
Expand Down Expand Up @@ -2569,7 +2579,12 @@ func isAsOf(session *Session, stmt tree.Statement, max hlc.Timestamp) (*hlc.Time
return nil, nil
}

evalCtx := session.extendedEvalCtx()
// the evalCtx is not needed for the purposes of evaluating AOST. See #16424.
evalCtx := session.extendedEvalCtx(
nil, /* txn */
time.Time{}, /* txnTimestamp */
time.Time{}, /* stmtTimestamp */
)
ts, err := EvalAsOfTimestamp(&evalCtx.EvalContext, asOf, max)
return &ts, err
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/sql/parallel_stmts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,16 +315,19 @@ func planQuery(
t *testing.T, s serverutils.TestServerInterface, sql string,
) (*planner, planNode, func()) {
kvDB := s.KVClient().(*client.DB)
now := s.Clock().Now()
physicalNow := s.Clock().PhysicalTime()
txn := client.NewTxn(kvDB, s.NodeID())
txn.Proto().OrigTimestamp = s.Clock().Now()
txn.Proto().OrigTimestamp = now
p := makeInternalPlanner("plan", txn, security.RootUser, &MemoryMetrics{})
p.session.tables.leaseMgr = s.LeaseManager().(*LeaseManager)
p.session.dataMutator.SetDatabase("test")
// HACK: the internal planner is not really meant to execute more than one
// statement, but here we neede to set the session database, so we did that by
// accessing the session directly. That requires us to reset the planner, so
// that its copy of the session variable is reset.
p.session.resetPlanner(p, nil /* executor*/, txn)
p.session.resetPlanner(
p, txn, physicalNow /* txnTimestamp */, physicalNow /* stmtTimestamp */, nil /* reCache */)

stmts, err := p.parser.Parse(sql)
if err != nil {
Expand Down
23 changes: 3 additions & 20 deletions pkg/sql/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -256,16 +256,14 @@ func makeInternalPlanner(
-1, noteworthyInternalMemoryUsageBytes/5)
s.TxnState.mon.Start(ctx, &s.mon, mon.BoundAccount{})

p := s.newPlanner(nil /* executor */, txn)

var ts time.Time
if txn != nil {
if txn.Proto().OrigTimestamp == (hlc.Timestamp{}) {
panic("makeInternalPlanner called with a transaction without timestamps")
}
ts := txn.Proto().OrigTimestamp.GoTime()
p.extendedEvalCtx.SetTxnTimestamp(ts)
p.extendedEvalCtx.SetStmtTimestamp(ts)
ts = txn.Proto().OrigTimestamp.GoTime()
}
p := s.newPlanner(txn, ts /* txnTimestamp */, ts /* stmtTimestamp */, nil /* reCache */)

p.extendedEvalCtx.Placeholders = &p.semaCtx.Placeholders
p.extendedEvalCtx.Tables = &s.tables
Expand Down Expand Up @@ -316,21 +314,6 @@ func (p *planner) DistLoader() *DistLoader {
return &DistLoader{distSQLPlanner: p.extendedEvalCtx.DistSQLPlanner}
}

// setTxn resets the current transaction in the planner and
// initializes the timestamps used by SQL built-in functions from
// the new txn object, if any.
func (p *planner) setTxn(txn *client.Txn) {
p.txn = txn
p.extendedEvalCtx.Txn = txn
if txn != nil {
p.extendedEvalCtx.SetClusterTimestamp(txn.OrigTimestamp())
} else {
p.extendedEvalCtx.SetTxnTimestamp(time.Time{})
p.extendedEvalCtx.SetStmtTimestamp(time.Time{})
p.extendedEvalCtx.SetClusterTimestamp(hlc.Timestamp{})
}
}

// makeInternalPlan initializes a planNode from a SQL statement string.
// Close() must be called on the returned planNode after use.
// This function changes the planner's placeholder map. It is the caller's
Expand Down
14 changes: 7 additions & 7 deletions pkg/sql/sem/tree/eval.go
Original file line number Diff line number Diff line change
Expand Up @@ -2071,7 +2071,7 @@ type EvalContext struct {
TxnTimestamp time.Time
// The cluster timestamp. Needs to be stable for the lifetime of the
// transaction. Used for cluster_logical_timestamp().
clusterTimestamp hlc.Timestamp
ClusterTimestamp hlc.Timestamp

// Placeholders relates placeholder names to their type and, later, value.
// This pointer should always be set to the location of the PlaceholderInfo
Expand Down Expand Up @@ -2171,20 +2171,20 @@ func (ctx *EvalContext) GetStmtTimestamp() time.Time {
func (ctx *EvalContext) GetClusterTimestamp() *DDecimal {
// TODO(knz): a zero timestamp should never be read, even during
// Prepare. This will need to be addressed.
if !ctx.PrepareOnly && ctx.clusterTimestamp == (hlc.Timestamp{}) {
if !ctx.PrepareOnly && ctx.ClusterTimestamp == (hlc.Timestamp{}) {
panic("zero cluster timestamp in EvalContext")
}

return TimestampToDecimal(ctx.clusterTimestamp)
return TimestampToDecimal(ctx.ClusterTimestamp)
}

// GetClusterTimestampRaw exposes the clusterTimestamp field. Also see
// GetClusterTimestampRaw exposes the ClusterTimestamp field. Also see
// GetClusterTimestamp().
func (ctx *EvalContext) GetClusterTimestampRaw() hlc.Timestamp {
if !ctx.PrepareOnly && ctx.clusterTimestamp == (hlc.Timestamp{}) {
if !ctx.PrepareOnly && ctx.ClusterTimestamp == (hlc.Timestamp{}) {
panic("zero cluster timestamp in EvalContext")
}
return ctx.clusterTimestamp
return ctx.ClusterTimestamp
}

// HasPlaceholders returns true if this EvalContext's placeholders have been
Expand Down Expand Up @@ -2255,7 +2255,7 @@ func (ctx *EvalContext) SetStmtTimestamp(ts time.Time) {

// SetClusterTimestamp sets the corresponding timestamp in the EvalContext.
func (ctx *EvalContext) SetClusterTimestamp(ts hlc.Timestamp) {
ctx.clusterTimestamp = ts
ctx.ClusterTimestamp = ts
}

// GetLocation returns the session timezone.
Expand Down
56 changes: 37 additions & 19 deletions pkg/sql/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/contextutil"
"github.com/cockroachdb/cockroach/pkg/util/duration"
"github.com/cockroachdb/cockroach/pkg/util/envutil"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/mon"
"github.com/cockroachdb/cockroach/pkg/util/retry"
Expand Down Expand Up @@ -557,8 +558,15 @@ func (s *Session) Ctx() context.Context {
return s.context
}

func (s *Session) resetPlanner(p *planner, e *Executor, txn *client.Txn) {
func (s *Session) resetPlanner(
p *planner,
txn *client.Txn,
txnTimestamp time.Time,
stmtTimestamp time.Time,
reCache *tree.RegexpCache,
) {
p.session = s
p.txn = txn
// phaseTimes is an array, not a slice, so this performs a copy-by-value.
p.phaseTimes = s.phaseTimes
p.stmt = nil
Expand All @@ -568,19 +576,17 @@ func (s *Session) resetPlanner(p *planner, e *Executor, txn *client.Txn) {
p.semaCtx.Location = &s.data.Location
p.semaCtx.SearchPath = s.data.SearchPath

p.extendedEvalCtx = s.extendedEvalCtx()
p.extendedEvalCtx = s.extendedEvalCtx(txn, txnTimestamp, stmtTimestamp)
p.extendedEvalCtx.Planner = p
if e != nil {
p.extendedEvalCtx.ClusterID = e.cfg.ClusterID()
p.extendedEvalCtx.NodeID = e.cfg.NodeID.Get()
p.extendedEvalCtx.ReCache = e.reCache
if s.execCfg != nil {
p.extendedEvalCtx.ClusterID = s.execCfg.ClusterID()
p.extendedEvalCtx.NodeID = s.execCfg.NodeID.Get()
}
p.extendedEvalCtx.ReCache = reCache

p.sessionDataMutator = s.dataMutator
p.preparedStatements = &s.PreparedStatements
p.autoCommit = false

p.setTxn(txn)
}

// FinishPlan releases the resources that were consumed by the currently active
Expand Down Expand Up @@ -609,15 +615,19 @@ func (s *Session) FinishPlan() {
// newPlanner creates a planner inside the scope of the given Session. The
// statement executed by the planner will be executed in txn. The planner
// should only be used to execute one statement.
func (s *Session) newPlanner(e *Executor, txn *client.Txn) *planner {
func (s *Session) newPlanner(
txn *client.Txn, txnTimestamp time.Time, stmtTimestamp time.Time, reCache *tree.RegexpCache,
) *planner {
p := &planner{}
s.resetPlanner(p, e, txn)
s.resetPlanner(p, txn, txnTimestamp, stmtTimestamp, reCache)
return p
}

// extendedEvalCtx creates an evaluation context from the Session's current
// configuration.
func (s *Session) extendedEvalCtx() extendedEvalContext {
func (s *Session) extendedEvalCtx(
txn *client.Txn, txnTimestamp time.Time, stmtTimestamp time.Time,
) extendedEvalContext {
var evalContextTestingKnobs tree.EvalContextTestingKnobs
var st *cluster.Settings
var statusServer serverpb.StatusServer
Expand All @@ -628,16 +638,24 @@ func (s *Session) extendedEvalCtx() extendedEvalContext {
st = s.execCfg.Settings
statusServer = s.execCfg.StatusServer
}
var clusterTs hlc.Timestamp
if txn != nil {
clusterTs = txn.OrigTimestamp()
}
return extendedEvalContext{
EvalContext: tree.EvalContext{
SessionData: s.data,
ApplicationName: s.dataMutator.ApplicationName(),
TxnState: getTransactionState(&s.TxnState),
TxnReadOnly: s.TxnState.readOnly,
Settings: st,
CtxProvider: s,
Mon: &s.TxnState.mon,
TestingKnobs: evalContextTestingKnobs,
Txn: txn,
SessionData: s.data,
ApplicationName: s.dataMutator.ApplicationName(),
TxnState: getTransactionState(&s.TxnState),
TxnReadOnly: s.TxnState.readOnly,
Settings: st,
CtxProvider: s,
Mon: &s.TxnState.mon,
TestingKnobs: evalContextTestingKnobs,
StmtTimestamp: stmtTimestamp,
TxnTimestamp: txnTimestamp,
ClusterTimestamp: clusterTs,
},
SessionMutator: s.dataMutator,
VirtualSchemas: &s.virtualSchemas,
Expand Down

0 comments on commit 4b23c09

Please sign in to comment.