Skip to content

Commit

Permalink
[discussion] make procedure instruction execution from view/Call RO (#…
Browse files Browse the repository at this point in the history
…298)

* add a mutex to SqliteClient for write concurrency control

* make abci Commit guarantee state is committed

* hotfix for concurrent read-schema issue

* execution: commitedOnly on read-only sqlite conn

---------

Co-authored-by: brennan lamey <[email protected]>
  • Loading branch information
jchappelow and brennanjl committed Sep 14, 2023
1 parent 19e8d26 commit d44a842
Show file tree
Hide file tree
Showing 11 changed files with 83 additions and 63 deletions.
23 changes: 1 addition & 22 deletions pkg/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,8 @@ func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, com
snapshotter: snapshotter,

valAddrToKey: make(map[string][]byte),
valUpdates: make([]*validators.Validator, 0),

log: log.NewNoOp(),

commitSemaphore: make(chan struct{}, 1), // max concurrency for a BeginBlock->Commit+Apply sequence is 1

// state: appState{height, ...}, // TODO
}

for _, opt := range opts {
Expand Down Expand Up @@ -128,15 +123,6 @@ type AbciApp struct {

log log.Logger

// Consensus method requests from cometbft are synchronous, but a portion of
// the work of Commit is launched in a goroutine, so we block a subsequent
// BeginBlock from starting new changes. We do this by acquiring a semaphore
// with max concurrency of 1 at the start of BeginBlock, and releasing it
// when the changes from Commit have finished applying. A mutex is rarely
// held for longer than the duration of a local function, while a waitgroup
// does not provide atomic Wait/Add semantics that fit here.
commitSemaphore chan struct{}

// Expected AppState after bootstrapping the node with a given snapshot,
// state gets updated with the bootupState after bootstrapping
bootupState appState
Expand All @@ -152,8 +138,6 @@ func (a *AbciApp) BeginBlock(req abciTypes.RequestBeginBlock) abciTypes.Response
logger := a.log.With(zap.String("stage", "ABCI BeginBlock"), zap.Int("height", int(req.Header.Height)))
logger.Debug("begin block")

a.commitSemaphore <- struct{}{} // peg in (until Commit is applied), there will only be at most one waiter

err := a.committer.Begin(context.Background())
if err != nil {
panic(newFatalError("BeginBlock", &req, err.Error()))
Expand Down Expand Up @@ -473,12 +457,7 @@ func (a *AbciApp) Commit() abciTypes.ResponseCommit {
panic(newFatalError("Commit", nil, fmt.Sprintf("failed to increment block height: %v", err)))
}

err = a.committer.Commit(ctx, func(err error) {
if err != nil {
panic(newFatalError("Commit", nil, fmt.Sprintf("failed to commit atomic commit: %v", err)))
}
<-a.commitSemaphore // peg out (from BeginBlock)
})
err = a.committer.Commit(ctx)
if err != nil {
panic(newFatalError("Commit", nil, fmt.Sprintf("failed to commit atomic commit: %v", err)))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/abci/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type AtomicCommitter interface {
ClearWal(ctx context.Context) error
Begin(ctx context.Context) error
ID(ctx context.Context) ([]byte, error)
Commit(ctx context.Context, applyCallback func(error)) error
Commit(ctx context.Context) error
}

// KVStore is an interface for a basic key-value store
Expand Down
7 changes: 6 additions & 1 deletion pkg/engine/dataset/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ func (d *Dataset) Call(ctx context.Context, action string, args []any, opts *TxO
return nil, ErrCallerNotOwner
}

return d.executeOnce(ctx, proc, args, d.getExecutionOpts(proc, opts)...)
if len(args) != len(proc.Args) {
return nil, fmt.Errorf("expected %d args, got %d", len(proc.Args), len(args))
}

execOpts := append(d.getExecutionOpts(proc, opts), execution.CommittedOnly())
return d.engine.ExecuteProcedure(ctx, proc.Name, args, execOpts...)
}

// getProcedure gets a procedure. If the procedure is not found, it returns an error.
Expand Down
16 changes: 11 additions & 5 deletions pkg/engine/dataset/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func parseAction(procedure *types.Procedure) (engineProcedure *execution.Procedu
var procedureInstructions []*execution.InstructionExecution
loaderInstructions = []*execution.InstructionExecution{}

mut := procedure.IsMutative()
for _, stmt := range procedure.Statements {
parsedStmt, err := actparser.Parse(stmt)
if err != nil {
Expand All @@ -158,7 +159,7 @@ func parseAction(procedure *types.Procedure) (engineProcedure *execution.Procedu

switch stmtType := parsedStmt.(type) {
case *actparser.DMLStmt:
stmtInstructions, stmtLoaderInstructions, err = convertDml(stmtType)
stmtInstructions, stmtLoaderInstructions, err = convertDml(stmtType, mut)
case *actparser.ExtensionCallStmt:
stmtInstructions, stmtLoaderInstructions, err = convertExtensionExecute(stmtType)
case *actparser.ActionCallStmt:
Expand All @@ -185,9 +186,8 @@ func parseAction(procedure *types.Procedure) (engineProcedure *execution.Procedu
}, loaderInstructions, nil
}

func convertDml(dml *actparser.DMLStmt) (procedureInstructions []*execution.InstructionExecution, loaderInstructions []*execution.InstructionExecution, err error) {
func convertDml(dml *actparser.DMLStmt, mut bool) (procedureInstructions, loaderInstructions []*execution.InstructionExecution, err error) {
uniqueName := randomHash.getRandomHash()

loadOp := &execution.InstructionExecution{
Instruction: execution.OpDMLPrepare,
Args: []any{
Expand All @@ -202,11 +202,17 @@ func convertDml(dml *actparser.DMLStmt) (procedureInstructions []*execution.Inst
uniqueName,
},
}
if !mut { // i.e. may be executed with read-only Query
// The entire statement would be unused for mutative statements since
// they always use a prepared statement. Executions doing a read-only
// query for committed-only data will not use the prepared statement.
procedureOp.Args = append(procedureOp.Args, dml.Statement)
}

return []*execution.InstructionExecution{procedureOp}, []*execution.InstructionExecution{loadOp}, nil
}

func convertExtensionExecute(ext *actparser.ExtensionCallStmt) (procedureInstructions []*execution.InstructionExecution, loaderInstructions []*execution.InstructionExecution, err error) {
func convertExtensionExecute(ext *actparser.ExtensionCallStmt) (procedureInstructions, loaderInstructions []*execution.InstructionExecution, err error) {
var setters []*execution.InstructionExecution

var args []string
Expand Down Expand Up @@ -235,7 +241,7 @@ func convertExtensionExecute(ext *actparser.ExtensionCallStmt) (procedureInstruc
return append(setters, procedureOp), []*execution.InstructionExecution{}, nil
}

func convertProcedureCall(action *actparser.ActionCallStmt) (procedureInstructions []*execution.InstructionExecution, loaderInstructions []*execution.InstructionExecution, err error) {
func convertProcedureCall(action *actparser.ActionCallStmt) (procedureInstructions, loaderInstructions []*execution.InstructionExecution, err error) {
var setters []*execution.InstructionExecution

var args []string
Expand Down
16 changes: 12 additions & 4 deletions pkg/engine/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ type executionContext struct {
datasetID string
lastDmlResult []map[string]any

// mustBeNonMutative is true if the execution context must be non-mutative.
// if true, execution of mutative statements will fail.
mustBeNonMutative bool
// nonMutative indicates the execution will be non-mutative.
nonMutative bool
// committedReadOnly indicates the execution is from a read-only call that
// should reflect only committed changes.
committedReadOnly bool
}

func (ec *executionContext) contextualVariables() map[string]any {
Expand Down Expand Up @@ -69,7 +71,13 @@ func WithDatasetID(dataset string) ExecutionOpt {

func NonMutative() ExecutionOpt {
return func(ec *executionContext) {
ec.mustBeNonMutative = true
ec.nonMutative = true
}
}

// CommittedOnly returns an ExecutionOpt that sets the committedReadOnly flag
// on the execution context it is applied to.
func CommittedOnly() ExecutionOpt {
	markCommittedOnly := func(execCtx *executionContext) {
		execCtx.committedReadOnly = true
	}
	return markCommittedOnly
}

Expand Down
30 changes: 23 additions & 7 deletions pkg/engine/execution/instructions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package execution

import (
"errors"
"fmt"
"strings"
)
Expand Down Expand Up @@ -89,7 +90,27 @@ func evalDMLPrepare(ctx *procedureContext, eng *Engine, args ...any) error {
}

func evalDMLExecute(ctx *procedureContext, eng *Engine, args ...any) error {
if len(args) != 1 {
nonMut := ctx.nonMutative
if ctx.committedReadOnly {
if !nonMut {
return errors.New("procedure cannot be mutative in read-only contexty")
}
if len(args) != 2 {
return fmt.Errorf("%w: dml execute requires 2 arguments, got %d", ErrIncorrectNumArgs, len(args))
}
stmt, ok := args[1].(string)
if !ok {
return fmt.Errorf("%w: expected string, got %T", ErrIncorrectInputType, args[0])
}
var err error
ctx.lastDmlResult, err = eng.db.Query(ctx.ctx, stmt, ctx.values)
if err != nil {
return fmt.Errorf("failed to execute statement: %w", err)
}
return nil
}

if (!nonMut && len(args) != 1) && (nonMut && len(args) != 2) { // may be an extra one if it's non-mutative
return fmt.Errorf("%w: dml execute requires 1 argument, got %d", ErrIncorrectNumArgs, len(args))
}

Expand All @@ -103,12 +124,7 @@ func evalDMLExecute(ctx *procedureContext, eng *Engine, args ...any) error {
return fmt.Errorf("%w: '%s'", ErrUnknownPreparedStatement, stmtName)
}

ctxMut := ctx.mustBeNonMutative
if ctxMut {
fmt.Print("mutative")
}

if ctx.mustBeNonMutative && preparedStmt.IsMutative() {
if nonMut && preparedStmt.IsMutative() {
return fmt.Errorf("%w: '%s'", ErrMutativeStatement, stmtName)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/engine/execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ type Initializer interface {

// Datastore is an interface for a datastore, usually a sqlite DB.
// It exposes one entry point for read-write execution (Prepare) and one for
// read-only execution (Query).
type Datastore interface {
// Prepare builds a PreparedStatement from the given query text.
// Prepare will be used for RW (read-write) execution.
Prepare(ctx context.Context, query string) (PreparedStatement, error)
// Query takes the statement text together with its args map and returns a
// slice of maps as the result (presumably one map per row - confirm
// against the implementation).
// Query will be used for RO (read-only) execution.
Query(ctx context.Context, stmt string, args map[string]any) ([]map[string]any, error)
}

type PreparedStatement interface {
Expand Down
4 changes: 4 additions & 0 deletions pkg/engine/execution/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func (m *mockDatastore) Prepare(ctx context.Context, uery string) (execution.Pre
}, nil
}

// Query is a stub implementation: it ignores all of its arguments and always
// reports an empty (non-nil) result set with no error.
func (m *mockDatastore) Query(ctx context.Context, stmt string, args map[string]any) ([]map[string]any, error) {
	noRows := make([]map[string]any, 0)
	return noRows, nil
}

type mockPreparedStatement struct {
mutative bool
}
Expand Down
17 changes: 4 additions & 13 deletions pkg/sessions/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,12 @@ func (a *AtomicCommitter) Begin(ctx context.Context) (err error) {

// Commit commits the atomic session.
// The apply phase runs synchronously before this function returns, and any error from it is returned to the caller.
func (a *AtomicCommitter) Commit(ctx context.Context, applyCallback func(error)) (err error) {
func (a *AtomicCommitter) Commit(ctx context.Context) (err error) {
a.mu.Lock()
defer a.mu.Unlock()

// if no session in progress, then return without cancelling
if a.phase != CommitPhaseCommit {
a.mu.Unlock()
return phaseError("Commit", CommitPhaseCommit, a.phase)
}
a.phase = CommitPhaseApply
Expand All @@ -141,40 +141,31 @@ func (a *AtomicCommitter) Commit(ctx context.Context, applyCallback func(error))

// if session is in progress but the committer is closed, then cancel the session
if a.closed {
a.mu.Unlock()
return ErrClosed
}

// begin the commit in the WAL
err = a.wal.WriteBegin(ctx)
if err != nil {
a.mu.Unlock()
return err
}

// tell all committables to finish phase 1, and submit
// any changes to the WAL
err = a.endCommit(ctx)
if err != nil {
a.mu.Unlock()
return err
}

// commit the WAL
err = a.wal.WriteCommit(ctx)
if err != nil {
a.mu.Unlock()
return err
}

go func() {
err2 := a.apply(ctx)
applyCallback(err2)
a.phase = CommitPhaseNone
a.mu.Unlock()
}()
a.phase = CommitPhaseNone

return nil
return a.apply(ctx)
}

// ID returns a deterministic identifier representative of all state changes that have occurred in the session.
Expand Down
10 changes: 1 addition & 9 deletions pkg/sessions/sessions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,15 +156,7 @@ func Test_Session(t *testing.T) {
return err
}

applyErr := make(chan error)
err = committer.Commit(ctx, func(err error) {
applyErr <- err
})
if err != nil {
return err
}

err = <-applyErr
err = committer.Commit(ctx)
if err != nil {
return err
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/sql/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"encoding/json"
"fmt"
"io"
"sync"

"github.com/kwilteam/kwil-db/pkg/log"
"github.com/kwilteam/kwil-db/pkg/sql"
Expand All @@ -31,6 +32,8 @@ type SqliteClient struct {

// log is self-explanatory.
log log.Logger

writerMtx sync.Mutex
}

func NewSqliteStore(name string, opts ...SqliteOpts) (*SqliteClient, error) {
Expand Down Expand Up @@ -71,6 +74,8 @@ func WrapSqliteConn(conn *sqlite.Connection, logger log.Logger) (*SqliteClient,

// Execute executes stmt with the given args on the underlying connection.
// It holds writerMtx for the duration of the call, so it is serialized with
// every other SqliteClient method that acquires the same mutex.
// The ctx argument is not used by this method.
func (s *SqliteClient) Execute(ctx context.Context, stmt string, args map[string]any) error {
s.writerMtx.Lock()
defer s.writerMtx.Unlock()
return s.conn.Execute(stmt, args)
}

Expand Down Expand Up @@ -106,7 +111,8 @@ func (s *SqliteClient) Prepare(stmt string) (sql.Statement, error) {
// It SHOULD be read-only, but there is nothing forcing it to be. use with caution
// This should get deleted once we redo the engine
func (w *SqliteClient) QueryUnsafe(ctx context.Context, query string, args map[string]any) ([]map[string]any, error) {

w.writerMtx.Lock()
defer w.writerMtx.Unlock()
stmt, err := w.Prepare(query)
if err != nil {
return nil, err
Expand Down Expand Up @@ -172,6 +178,8 @@ func ResultsfromReader(reader io.Reader) ([]map[string]any, error) {
}

func (s *SqliteClient) CreateSession() (sql.Session, error) {
s.writerMtx.Lock()
defer s.writerMtx.Unlock()
sess, err := s.conn.CreateSession()
if err != nil {
return nil, err
Expand All @@ -183,17 +191,25 @@ func (s *SqliteClient) CreateSession() (sql.Session, error) {
}

// ApplyChangeset forwards reader to the underlying connection's
// ApplyChangeset and returns its error. writerMtx is held for the duration of
// the call so it is serialized with the other mutex-guarded methods.
func (s *SqliteClient) ApplyChangeset(reader io.Reader) error {
s.writerMtx.Lock()
defer s.writerMtx.Unlock()
return s.conn.ApplyChangeset(reader)
}

// CheckpointWal calls CheckpointWal on the underlying connection and returns
// its error. writerMtx is held for the duration of the call so it is
// serialized with the other mutex-guarded methods.
func (s *SqliteClient) CheckpointWal() error {
s.writerMtx.Lock()
defer s.writerMtx.Unlock()
return s.conn.CheckpointWal()
}

// DisableForeignKey calls DisableForeignKey on the underlying connection and
// returns its error (presumably turning off foreign key enforcement - confirm
// against the sqlite package). writerMtx is held for the duration of the call.
func (s *SqliteClient) DisableForeignKey() error {
s.writerMtx.Lock()
defer s.writerMtx.Unlock()
return s.conn.DisableForeignKey()
}

// EnableForeignKey calls EnableForeignKey on the underlying connection and
// returns its error (presumably turning foreign key enforcement back on -
// confirm against the sqlite package). writerMtx is held for the duration of
// the call.
func (s *SqliteClient) EnableForeignKey() error {
s.writerMtx.Lock()
defer s.writerMtx.Unlock()
return s.conn.EnableForeignKey()
}

0 comments on commit d44a842

Please sign in to comment.