sql: make SQL statements operate on a read snapshot
Previously, all individual KV reads performed by a SQL statement were
able to observe the most recent KV writes that it performed itself.

This is in violation of PostgreSQL's dialect semantics, which mandate
that statements can only observe data as per a read snapshot taken at
the instant a statement begins execution.

Moreover, this invalid behavior causes a real observable bug: a
statement that reads and writes to the same table may never complete,
as the read part may become able to consume the rows that it itself
writes. Or worse, it could cause logical operations to be performed
multiple times: https://en.wikipedia.org/wiki/Halloween_Problem
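
To make the failure mode concrete, here is a minimal sketch in plain Go. It does not use CockroachDB; the "table" is just a slice, and the function names and the `maxSteps` safety cap are assumptions made for illustration. The first function scans the live table and so keeps consuming rows it has just appended; the second fixes the set of visible rows when the statement starts.

```go
package main

import "fmt"

// scanSeeingOwnWrites models the buggy behavior: the scan iterates over the
// live table, so rows appended by the statement are themselves scanned.
// maxSteps is a safety cap for this demo only; without it the loop would
// never terminate, because every iteration adds one more row to read.
func scanSeeingOwnWrites(table []float64, maxSteps int) []float64 {
	for i := 0; i < len(table) && i < maxSteps; i++ {
		table = append(table, table[i]+0.1)
	}
	return table
}

// scanFromSnapshot models the intended behavior: the set of rows to read is
// fixed when the statement starts, so each original row is processed once.
func scanFromSnapshot(table []float64) []float64 {
	visible := len(table) // rows visible at statement start
	for i := 0; i < visible; i++ {
		table = append(table, table[i]+0.1)
	}
	return table
}

func main() {
	fmt.Println("own writes visible:", len(scanSeeingOwnWrites([]float64{1, 2, 3}, 100)), "rows")
	fmt.Println("read snapshot:     ", len(scanFromSnapshot([]float64{1, 2, 3})), "rows")
}
```

In the first variant the row count is bounded only by the artificial cap; in the second it is exactly twice the initial count.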

This patch fixes it (partially) by exploiting the new KV `Step()` API
which decouples the read and write sequence numbers.
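
The idea of decoupling read and write sequence numbers can be sketched with a toy in-memory transaction. Everything below (`toyTxn`, its fields, and the method signatures) is a hypothetical stand-in written for this illustration, not the actual `client.Txn` API: writes are stamped with an increasing sequence number, and when stepping is on, reads only see writes at or below the sequence number recorded by the last `Step()`.

```go
package main

import "fmt"

// write is one buffered KV write, stamped with the sequence number at which
// it was performed.
type write struct {
	key string
	val int
	seq int
}

// toyTxn is a hypothetical stand-in for a KV transaction.
type toyTxn struct {
	writeSeq int  // sequence number of the latest write
	readSeq  int  // sequence number reads are pinned to when stepping
	stepping bool // whether step-wise execution is enabled
	writes   []write
}

// Put records a write at a fresh write sequence number.
func (t *toyTxn) Put(key string, val int) {
	t.writeSeq++
	t.writes = append(t.writes, write{key, val, t.writeSeq})
}

// Get returns the newest value for key that is visible to reads. Without
// stepping, reads see the latest write; with stepping, reads only see writes
// made before the last Step.
func (t *toyTxn) Get(key string) (int, bool) {
	limit := t.writeSeq
	if t.stepping {
		limit = t.readSeq
	}
	for i := len(t.writes) - 1; i >= 0; i-- {
		if w := t.writes[i]; w.key == key && w.seq <= limit {
			return w.val, true
		}
	}
	return 0, false
}

// Step advances the read snapshot to include all writes performed so far.
func (t *toyTxn) Step() { t.readSeq = t.writeSeq }

func main() {
	txn := &toyTxn{stepping: true}
	txn.Put("a", 1)
	_, visible := txn.Get("a")
	fmt.Println("visible before Step:", visible)
	txn.Step()
	_, visible = txn.Get("a")
	fmt.Println("visible after Step: ", visible)
}
```

In this model, calling `Step()` once at the start of each statement gives every read in the statement the same snapshot, regardless of how many writes the statement performs.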

The fix is not complete however; additional sequence points must also
be introduced prior to FK existence checks and cascading actions. See
[#42864](#42864) and
[#33475](#33475) for
details.

For now, this patch excludes any mutation that 1) involves a foreign
key and 2) does not use the new CBO-driven FK logic, from the
new (fixed) semantics. When a mutation involves a FK without CBO
involvement, the previous (broken) semantics still apply.

Release note (bug fix): SQL mutation statements that target tables
with no foreign key relationships now correctly read data as per the
state of the database when the statement started execution. This is
required for compatibility with PostgreSQL and to ensure deterministic
behavior when certain operations are parallelized. Prior to this fix,
a statement [could incorrectly operate multiple
times](https://en.wikipedia.org/wiki/Halloween_Problem) on data that
it was itself writing, and potentially never terminate. This fix is
limited to tables without FK relationships, and to certain operations
on tables with FK relationships; in other cases, the fix is not
active and the bug is still present. A full fix will be provided
in a later release.
knz committed Jan 15, 2020
1 parent e58c722 commit fda15ca
Showing 36 changed files with 468 additions and 98 deletions.
4 changes: 2 additions & 2 deletions pkg/internal/client/txn.go
@@ -1189,8 +1189,8 @@ func (txn *Txn) Active() bool {
return txn.mu.sender.Active()
}

-// Step enables step-wise execution in the transaction, or
-// performs a step if step-wise execution is already enabled.
// Step performs a sequencing step. Step-wise execution must be
// already enabled.
//
// In step-wise execution, reads operate at a snapshot established at
// the last step, instead of the latest write if not yet enabled.
5 changes: 5 additions & 0 deletions pkg/sql/alter_index.go
@@ -44,6 +44,11 @@ func (p *planner) AlterIndex(ctx context.Context, n *tree.AlterIndex) (planNode,
return &alterIndexNode{n: n, tableDesc: tableDesc, indexDesc: indexDesc}, nil
}

// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
// This is because ALTER INDEX performs multiple KV operations on descriptors
// and expects to see its own writes.
func (n *alterIndexNode) ReadingOwnWrites() {}

func (n *alterIndexNode) startExec(params runParams) error {
// Commands can either change the descriptor directly (for
// alterations that don't require a backfill) or add a mutation to
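
The `ReadingOwnWrites()` method added in this hunk has an empty body, which suggests a marker-interface pattern: a node opts in simply by having the method, and the executor can detect this with a type assertion. The sketch below shows that pattern in isolation. The interface definitions and the `needsOwnWrites` helper are simplified assumptions for illustration; the real `planNodeReadingOwnWrites` definition and its call site are not shown in this excerpt.

```go
package main

import "fmt"

// planNode is a simplified, hypothetical stand-in for the SQL plan node
// interface.
type planNode interface {
	Name() string
}

// planNodeReadingOwnWrites is a marker interface: it carries no behavior and
// only signals that the node expects to observe its own KV writes.
type planNodeReadingOwnWrites interface {
	ReadingOwnWrites()
}

// alterIndexNode opts in by declaring the empty marker method.
type alterIndexNode struct{}

func (alterIndexNode) Name() string      { return "alter index" }
func (alterIndexNode) ReadingOwnWrites() {}

// scanNode does not opt in.
type scanNode struct{}

func (scanNode) Name() string { return "scan" }

// needsOwnWrites reports whether a node has opted out of read-snapshot
// semantics (hypothetical helper).
func needsOwnWrites(n planNode) bool {
	_, ok := n.(planNodeReadingOwnWrites)
	return ok
}

func main() {
	for _, n := range []planNode{alterIndexNode{}, scanNode{}} {
		fmt.Printf("%s reads own writes: %v\n", n.Name(), needsOwnWrites(n))
	}
}
```

A marker method keeps the opt-in next to each node's definition, which is why the same five-line addition repeats across the DDL files in this commit.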
5 changes: 5 additions & 0 deletions pkg/sql/alter_sequence.go
@@ -42,6 +42,11 @@ func (p *planner) AlterSequence(ctx context.Context, n *tree.AlterSequence) (pla
return &alterSequenceNode{n: n, seqDesc: seqDesc}, nil
}

// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
// This is because ALTER SEQUENCE performs multiple KV operations on descriptors
// and expects to see its own writes.
func (n *alterSequenceNode) ReadingOwnWrites() {}

func (n *alterSequenceNode) startExec(params runParams) error {
desc := n.seqDesc

5 changes: 5 additions & 0 deletions pkg/sql/alter_table.go
@@ -90,6 +90,11 @@ func (p *planner) AlterTable(ctx context.Context, n *tree.AlterTable) (planNode,
}, nil
}

// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
// This is because ALTER TABLE performs multiple KV operations on descriptors
// and expects to see its own writes.
func (n *alterTableNode) ReadingOwnWrites() {}

func (n *alterTableNode) startExec(params runParams) error {
// Commands can either change the descriptor directly (for
// alterations that don't require a backfill) or add a mutation to
47 changes: 44 additions & 3 deletions pkg/sql/conn_executor_exec.go
@@ -359,8 +359,40 @@ func (ex *connExecutor) execStmtInOpenState(
discardRows = s.DiscardRows
}

-// For regular statements (the ones that get to this point), we don't return
-// any event unless an an error happens.
// For regular statements (the ones that get to this point), we
// don't return any event unless an error happens.

// The first order of business is to create a sequencing point. As
// per PostgreSQL's dialect specs, the "read" part of statements
// always see the data as per a snapshot of the database taken the
// instant the statement begins to run. In particular a mutation
// does not see its own writes. If a query contains multiple
// mutations using CTEs (WITH) or a read part following a mutation,
// all still operate on the same read snapshot.
//
// (To communicate data between CTEs and a main query, the result
// set / RETURNING can be used instead. However this is not relevant
// here.)
//
// This is not the only place where a sequencing point is placed. There
// are also sequencing points after every stage of constraint checks
// and cascading actions at the _end_ of a statement's execution.
//
// TODO(knz): At the time of this writing CockroachDB performs
// cascading actions and the corresponding FK existence checks
// interleaved with mutations. This is incorrect; the correct
// behavior, as described in issue
// https://github.com/cockroachdb/cockroach/issues/33475, is to
// execute cascading actions no earlier than after all the "main
// effects" of the current statement (including all its CTEs) have
// completed. There should be a sequence point between the end of
// the main execution and the start of the cascading actions, as
// well as in between every stage of cascading actions.
// This TODO can be removed when the cascading code is reorganized
// accordingly and the missing call to Step() is introduced.
if err := ex.state.mu.txn.Step(); err != nil {
return makeErrEvent(err)
}

p := &ex.planner
stmtTS := ex.server.cfg.Clock.PhysicalTime()
@@ -423,6 +455,15 @@ func (ex *connExecutor) execStmtInOpenState(
}

txn := ex.state.mu.txn

// Re-enable stepping mode after the execution has completed.
// This is necessary because until https://github.com/cockroachdb/cockroach/issues/33475 is fixed
// any mutation with FK work unconditionally disables
// stepping mode when it starts.
if _, err := txn.ConfigureStepping(client.SteppingEnabled); err != nil {
return makeErrEvent(err)
}

if !os.ImplicitTxn.Get() && txn.IsSerializablePushAndRefreshNotPossible() {
rc, canAutoRetry := ex.getRewindTxnCapability()
if canAutoRetry {
@@ -553,7 +594,7 @@ func (ex *connExecutor) checkTableTwoVersionInvariant(ctx context.Context) error

// Create a new transaction to retry with a higher timestamp than the
// timestamps used in the retry loop above.
-ex.state.mu.txn = client.NewTxn(ctx, ex.transitionCtx.db, ex.transitionCtx.nodeID)
ex.state.mu.txn = client.NewTxnWithSteppingEnabled(ctx, ex.transitionCtx.db, ex.transitionCtx.nodeID)
if err := ex.state.mu.txn.SetUserPriority(userPriority); err != nil {
return err
}
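
Taken together, the hunks above in conn_executor_exec.go describe a per-statement flow: take a sequencing step before the statement runs, then re-enable stepping afterwards in case the statement turned it off (as mutations with legacy FK work do, per the comment). The sketch below models that flow with a toy transaction. The types, the simplified `ConfigureStepping` signature, and the choice to re-enable stepping even when the statement fails are assumptions for illustration only.

```go
package main

import (
	"errors"
	"fmt"
)

type steppingMode int

const (
	steppingDisabled steppingMode = iota
	steppingEnabled
)

// toyTxn is a hypothetical stand-in that only tracks the stepping mode and
// how many sequencing steps were taken.
type toyTxn struct {
	mode  steppingMode
	steps int
}

// Step takes a sequencing step; in this toy it fails if stepping is off.
func (t *toyTxn) Step() error {
	if t.mode != steppingEnabled {
		return errors.New("stepping is not enabled")
	}
	t.steps++
	return nil
}

// ConfigureStepping switches the mode and returns the previous one.
func (t *toyTxn) ConfigureStepping(m steppingMode) steppingMode {
	prev := t.mode
	t.mode = m
	return prev
}

// execStmt runs one statement: step first so that reads use a snapshot taken
// at statement start, then restore stepping once the statement has run.
func execStmt(txn *toyTxn, stmt func(*toyTxn) error) error {
	if err := txn.Step(); err != nil {
		return err
	}
	err := stmt(txn)
	txn.ConfigureStepping(steppingEnabled)
	return err
}

func main() {
	txn := &toyTxn{mode: steppingEnabled}
	// Models a mutation with legacy FK work, which disables stepping.
	fkMutation := func(t *toyTxn) error {
		t.ConfigureStepping(steppingDisabled)
		return nil
	}
	for i := 0; i < 2; i++ {
		if err := execStmt(txn, fkMutation); err != nil {
			fmt.Println("error:", err)
			return
		}
	}
	fmt.Println("steps taken:", txn.steps)
}
```

Without the re-enabling call, the second statement's `Step()` would fail in this model, because the first statement left stepping disabled.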
72 changes: 71 additions & 1 deletion pkg/sql/conn_executor_test.go
@@ -25,6 +25,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/security"
"github.com/cockroachdb/cockroach/pkg/sql"
"github.com/cockroachdb/cockroach/pkg/sql/row"
"github.com/cockroachdb/cockroach/pkg/sql/tests"
"github.com/cockroachdb/cockroach/pkg/storage"
"github.com/cockroachdb/cockroach/pkg/storage/storagebase"
@@ -91,7 +92,7 @@ func TestSessionFinishRollsBackTxn(t *testing.T) {
params, _ := tests.CreateTestServerParams()
params.Knobs.SQLExecutor = aborter.executorKnobs()
s, mainDB, _ := serverutils.StartServer(t, params)
-defer s.Stopper().Stop(context.TODO())
defer s.Stopper().Stop(context.Background())
{
pgURL, cleanup := sqlutils.PGUrl(
t, s.ServingSQLAddr(), "TestSessionFinishRollsBackTxn", url.User(security.RootUser))
@@ -348,6 +349,75 @@ CREATE TABLE t.test (k INT PRIMARY KEY, v TEXT);
}
}

func TestHalloweenProblemAvoidance(t *testing.T) {
defer leaktest.AfterTest(t)()

// Populate a sufficiently large number of rows. We want at least as
// many rows as an insert can batch in its output buffer (to force a
// buffer flush), plus as many rows as a fetcher can fetch at once
// (to force a read buffer update), plus some more.
//
// Instead of customizing the working set size of the test up to the
// default settings for the SQL package, we scale down the config
// of the SQL package to the test. The reason for this is that
// race-enabled builds are very slow and the default batch sizes
// would cause the test duration to exceed the timeout.
//
// We are also careful to override these defaults before starting
// the server, so as to not risk updating them concurrently with
// some background SQL activity.
const smallerKvBatchSize = 10
defer row.TestingSetKVBatchSize(smallerKvBatchSize)()
const smallerInsertBatchSize = 5
defer sql.TestingSetInsertBatchSize(smallerInsertBatchSize)()
numRows := smallerKvBatchSize + smallerInsertBatchSize + 10

params, _ := tests.CreateTestServerParams()
params.Insecure = true
s, db, _ := serverutils.StartServer(t, params)
defer s.Stopper().Stop(context.TODO())

if _, err := db.Exec(`
CREATE DATABASE t;
CREATE TABLE t.test (x FLOAT);
`); err != nil {
t.Fatal(err)
}

if _, err := db.Exec(
`INSERT INTO t.test(x) SELECT generate_series(1, $1)::FLOAT`,
numRows); err != nil {
t.Fatal(err)
}

// Now slightly modify the values in duplicate rows.
// We choose a float +0.1 to ensure that none of the derived
// values become duplicate of already-present values.
if _, err := db.Exec(`
INSERT INTO t.test(x)
-- the if ensures that no row is processed two times.
SELECT IF(x::INT::FLOAT = x,
x,
crdb_internal.force_error(
'NOOPE', 'insert saw its own writes: ' || x::STRING || ' (it is halloween today)')::FLOAT)
+ 0.1
FROM t.test
`); err != nil {
t.Fatal(err)
}

// Finally verify that no row has been operated on more than once.
row := db.QueryRow(`SELECT count(DISTINCT x) FROM t.test`)
var cnt int
if err := row.Scan(&cnt); err != nil {
t.Fatal(err)
}

if cnt != 2*numRows {
t.Fatalf("expected %d rows in final table, got %d", 2*numRows, cnt)
}
}
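
The detection logic of this test can be modeled without a database. With the batch sizes above, numRows is 10 + 5 + 10 = 25, so a correct run ends with 2 * 25 = 50 distinct values. The sketch below is a simplified model with hypothetical helper names: `derive` plays the role of the SQL `IF(...)` expression (it only checks that x is integer-valued, which is what the SQL condition amounts to for these inputs), and `insertSelect` plays the role of the `INSERT ... SELECT`, with a flag choosing between snapshot reads and reads that see the statement's own writes.

```go
package main

import (
	"fmt"
	"math"
)

// derive mirrors the guard in the test query: an original row holds an
// integer-valued float, so any non-integer input must be a row written by
// the running statement itself.
func derive(x float64) (float64, error) {
	if math.Trunc(x) != x {
		return 0, fmt.Errorf("insert saw its own writes: %v", x)
	}
	return x + 0.1, nil
}

// seed returns the initial rows 1, 2, ..., n.
func seed(n int) []float64 {
	rows := make([]float64, n)
	for i := range rows {
		rows[i] = float64(i + 1)
	}
	return rows
}

// insertSelect appends derive(x) for every row it reads. With snapshot set,
// only rows present at statement start are read; otherwise the scan also
// reaches rows appended by this call.
func insertSelect(table []float64, snapshot bool) ([]float64, error) {
	visible := len(table) // rows present when the statement starts
	for i := 0; i < len(table); i++ {
		if snapshot && i >= visible {
			break
		}
		v, err := derive(table[i])
		if err != nil {
			return table, err
		}
		table = append(table, v)
	}
	return table, nil
}

// countDistinct counts distinct values, like SELECT count(DISTINCT x).
func countDistinct(rows []float64) int {
	seen := make(map[float64]struct{}, len(rows))
	for _, r := range rows {
		seen[r] = struct{}{}
	}
	return len(seen)
}

func main() {
	const numRows = 10 + 5 + 10
	rows, err := insertSelect(seed(numRows), true)
	fmt.Println("snapshot reads:", countDistinct(rows), "distinct rows, err =", err)
	_, err = insertSelect(seed(numRows), false)
	fmt.Println("own writes visible: err =", err)
}
```

The guard turns a silent double-processing bug into an immediate error, and the final distinct count catches any case where the guard did not fire but a row was still processed twice.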

func TestAppNameStatisticsInitialization(t *testing.T) {
defer leaktest.AfterTest(t)()

2 changes: 1 addition & 1 deletion pkg/sql/copy.go
@@ -292,7 +292,7 @@ func (c *copyMachine) preparePlanner(ctx context.Context) func(context.Context,
stmtTs := c.txnOpt.stmtTimestamp
autoCommit := false
if txn == nil {
-txn = client.NewTxn(ctx, c.p.execCfg.DB, c.p.execCfg.NodeID.Get())
txn = client.NewTxnWithSteppingEnabled(ctx, c.p.execCfg.DB, c.p.execCfg.NodeID.Get())
txnTs = c.p.execCfg.Clock.PhysicalTime()
stmtTs = txnTs
autoCommit = true
5 changes: 5 additions & 0 deletions pkg/sql/create_index.go
@@ -79,6 +79,11 @@ func MakeIndexDescriptor(n *tree.CreateIndex) (*sqlbase.IndexDescriptor, error)
return &indexDesc, nil
}

// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
// This is because CREATE INDEX performs multiple KV operations on descriptors
// and expects to see its own writes.
func (n *createIndexNode) ReadingOwnWrites() {}

func (n *createIndexNode) startExec(params runParams) error {
_, dropped, err := n.tableDesc.FindIndexByName(string(n.n.Name))
if err == nil {
5 changes: 5 additions & 0 deletions pkg/sql/create_sequence.go
@@ -44,6 +44,11 @@ func (p *planner) CreateSequence(ctx context.Context, n *tree.CreateSequence) (p
}, nil
}

// ReadingOwnWrites implements the planNodeReadingOwnWrites interface.
// This is because CREATE SEQUENCE performs multiple KV operations on descriptors
// and expects to see its own writes.
func (n *createSequenceNode) ReadingOwnWrites() {}

func (n *createSequenceNode) startExec(params runParams) error {
// TODO(arul): Allow temporary sequences once temp tables work for regular tables.
if n.n.Temporary {