Skip to content

Commit

Permalink
[discussion] make procedure instruction execution from view/Call RO (#…
Browse files Browse the repository at this point in the history
…298)

* add a mutex to SqliteClient for write concurrency control

* make abci Commit guarantee state is committed

* hotfix for concurrent read-schema issue

* execution: commitedOnly on read-only sqlite conn

---------

Co-authored-by: brennan lamey <[email protected]>
  • Loading branch information
jchappelow and brennanjl committed Feb 26, 2024
1 parent d8a8337 commit f6820fb
Show file tree
Hide file tree
Showing 17 changed files with 156 additions and 69 deletions.
2 changes: 1 addition & 1 deletion cmd/kwil-cli/cmds/database/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func queryCmd() *cobra.Command {
}

results := res.ExportString()
printTable(results)
printTableTrunc(results)

return nil
})
Expand Down
46 changes: 46 additions & 0 deletions cmd/kwil-cli/cmds/database/result_printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,49 @@ func printTable(data []map[string]string) {
fmt.Println("|")
}
}

// printTableTrunc prints a table, truncating values that are too long.
func printTableTrunc(data []map[string]string) {
if len(data) == 0 {
fmt.Println("No data to display.")
return
}

headers := make([]string, 0)
colWidths := make(map[string]int)
for _, row := range data {
for k, v := range row {
if _, exists := colWidths[k]; !exists {
headers = append(headers, k)
colWidths[k] = len(k)
}
if len(v) > colWidths[k] {
colWidths[k] = len(v)
}
if colWidths[k] > 32 {
colWidths[k] = 32
}
}
}

for _, h := range headers {
fmt.Printf("| %-*s ", colWidths[h], h)
}
fmt.Println("|")

for _, h := range headers {
fmt.Printf("|-%s-", strings.Repeat("-", colWidths[h]))
}
fmt.Println("|")

for _, row := range data {
for _, h := range headers {
val := row[h]
if len(val) > 32 {
val = val[:29] + "..."
}
fmt.Printf("| %-*s ", colWidths[h], val)
}
fmt.Println("|")
}
}
23 changes: 1 addition & 22 deletions pkg/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,8 @@ func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, com
snapshotter: snapshotter,

valAddrToKey: make(map[string][]byte),
valUpdates: make([]*validators.Validator, 0),

log: log.NewNoOp(),

commitSemaphore: make(chan struct{}, 1), // max concurrency for a BeginBlock->Commit+Apply sequence is 1

// state: appState{height, ...}, // TODO
}

for _, opt := range opts {
Expand Down Expand Up @@ -126,15 +121,6 @@ type AbciApp struct {

log log.Logger

// Consensus method requests from cometbft are synchronous, but a portion of
// the work of Commit is launched in a goroutine, so we block a subsequent
// BeginBlock from starting new changes. We do this by acquiring a semaphore
// with max concurrency of 1 at the start of BeginBlock, and releasing it
// when the changes from Commit have finished applying. A mutex is rarely
// held for longer than the duration of a local function, while a waitgroup
// does not provide atomic Wait/Add semantics that fit here.
commitSemaphore chan struct{}

// Expected AppState after bootstrapping the node with a given snapshot,
// state gets updated with the bootupState after bootstrapping
bootupState appState
Expand All @@ -150,8 +136,6 @@ func (a *AbciApp) BeginBlock(req abciTypes.RequestBeginBlock) abciTypes.Response
logger := a.log.With(zap.String("stage", "ABCI BeginBlock"), zap.Int("height", int(req.Header.Height)))
logger.Debug("begin block")

a.commitSemaphore <- struct{}{} // peg in (until Commit is applied), there will only be at most one waiter

err := a.committer.Begin(context.Background())
if err != nil {
panic(newFatalError("BeginBlock", &req, err.Error()))
Expand Down Expand Up @@ -473,12 +457,7 @@ func (a *AbciApp) Commit() abciTypes.ResponseCommit {
panic(newFatalError("Commit", nil, fmt.Sprintf("failed to increment block height: %v", err)))
}

err = a.committer.Commit(ctx, func(err error) {
if err != nil {
panic(newFatalError("Commit", nil, fmt.Sprintf("failed to commit atomic commit: %v", err)))
}
<-a.commitSemaphore // peg out (from BeginBlock)
})
err = a.committer.Commit(ctx)
if err != nil {
panic(newFatalError("Commit", nil, fmt.Sprintf("failed to commit atomic commit: %v", err)))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/abci/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type AtomicCommitter interface {
ClearWal(ctx context.Context) error
Begin(ctx context.Context) error
ID(ctx context.Context) ([]byte, error)
Commit(ctx context.Context, applyCallback func(error)) error
Commit(ctx context.Context) error
}

// KVStore is an interface for a basic key-value store
Expand Down
7 changes: 6 additions & 1 deletion pkg/engine/dataset/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ func (d *Dataset) Call(ctx context.Context, action string, args []any, opts *TxO
return nil, ErrCallerNotOwner
}

return d.executeOnce(ctx, proc, args, d.getExecutionOpts(proc, opts)...)
if len(args) != len(proc.Args) {
return nil, fmt.Errorf("expected %d args, got %d", len(proc.Args), len(args))
}

execOpts := append(d.getExecutionOpts(proc, opts), execution.CommittedOnly())
return d.engine.ExecuteProcedure(ctx, proc.Name, args, execOpts...)
}

// getProcedure gets a procedure. If the procedure is not found, it returns an error.
Expand Down
16 changes: 11 additions & 5 deletions pkg/engine/dataset/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func parseAction(procedure *types.Procedure) (engineProcedure *execution.Procedu
var procedureInstructions []*execution.InstructionExecution
loaderInstructions = []*execution.InstructionExecution{}

mut := procedure.IsMutative()
for _, stmt := range procedure.Statements {
parsedStmt, err := actparser.Parse(stmt)
if err != nil {
Expand All @@ -158,7 +159,7 @@ func parseAction(procedure *types.Procedure) (engineProcedure *execution.Procedu

switch stmtType := parsedStmt.(type) {
case *actparser.DMLStmt:
stmtInstructions, stmtLoaderInstructions, err = convertDml(stmtType)
stmtInstructions, stmtLoaderInstructions, err = convertDml(stmtType, mut)
case *actparser.ExtensionCallStmt:
stmtInstructions, stmtLoaderInstructions, err = convertExtensionExecute(stmtType)
case *actparser.ActionCallStmt:
Expand All @@ -185,9 +186,8 @@ func parseAction(procedure *types.Procedure) (engineProcedure *execution.Procedu
}, loaderInstructions, nil
}

func convertDml(dml *actparser.DMLStmt) (procedureInstructions []*execution.InstructionExecution, loaderInstructions []*execution.InstructionExecution, err error) {
func convertDml(dml *actparser.DMLStmt, mut bool) (procedureInstructions, loaderInstructions []*execution.InstructionExecution, err error) {
uniqueName := randomHash.getRandomHash()

loadOp := &execution.InstructionExecution{
Instruction: execution.OpDMLPrepare,
Args: []any{
Expand All @@ -202,11 +202,17 @@ func convertDml(dml *actparser.DMLStmt) (procedureInstructions []*execution.Inst
uniqueName,
},
}
if !mut { // i.e. may be executed with read-only Query
// The entire statement would be unused for mutative statements since
// they always use a prepared statement. Executions doing a read-only
// query for uncommittted data will not use the prepared statement.
procedureOp.Args = append(procedureOp.Args, dml.Statement)
}

return []*execution.InstructionExecution{procedureOp}, []*execution.InstructionExecution{loadOp}, nil
}

func convertExtensionExecute(ext *actparser.ExtensionCallStmt) (procedureInstructions []*execution.InstructionExecution, loaderInstructions []*execution.InstructionExecution, err error) {
func convertExtensionExecute(ext *actparser.ExtensionCallStmt) (procedureInstructions, loaderInstructions []*execution.InstructionExecution, err error) {
var setters []*execution.InstructionExecution

var args []string
Expand Down Expand Up @@ -235,7 +241,7 @@ func convertExtensionExecute(ext *actparser.ExtensionCallStmt) (procedureInstruc
return append(setters, procedureOp), []*execution.InstructionExecution{}, nil
}

func convertProcedureCall(action *actparser.ActionCallStmt) (procedureInstructions []*execution.InstructionExecution, loaderInstructions []*execution.InstructionExecution, err error) {
func convertProcedureCall(action *actparser.ActionCallStmt) (procedureInstructions, loaderInstructions []*execution.InstructionExecution, err error) {
var setters []*execution.InstructionExecution

var args []string
Expand Down
12 changes: 11 additions & 1 deletion pkg/engine/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package db
import (
"context"
"io"
"sync"

"github.com/kwilteam/kwil-db/pkg/engine/sqlanalyzer"
"github.com/kwilteam/kwil-db/pkg/engine/sqlparser"
Expand All @@ -19,6 +20,14 @@ import (

type DB struct {
Sqldb SqlDB

// caches metadata from QueryUnsafe.
// this is a really bad practice, but works.
// essentially, we cache the metadata the first time it is retrieved, during schema
// deployment. This prevents the need from calling QueryUnsafe again
metadataCache map[metadataType][]*metadata

mu sync.RWMutex
}

func (d *DB) Close() error {
Expand Down Expand Up @@ -78,7 +87,8 @@ func (d *DB) Savepoint() (sql.Savepoint, error) {

func NewDB(ctx context.Context, sqldb SqlDB) (*DB, error) {
db := &DB{
Sqldb: sqldb,
Sqldb: sqldb,
metadataCache: make(map[metadataType][]*metadata),
}

err := db.initMetadataTable(ctx)
Expand Down
9 changes: 9 additions & 0 deletions pkg/engine/db/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ func (d *DB) storeMetadata(ctx context.Context, meta *metadata) error {
}

func (d *DB) getMetadata(ctx context.Context, metaType metadataType) ([]*metadata, error) {
d.mu.Lock()
defer d.mu.Unlock()
cached, ok := d.metadataCache[metaType]
if ok {
return cached, nil
}

results, err := d.Sqldb.QueryUnsafe(ctx, selectMetadataStatement, map[string]interface{}{
"$type": metaType,
})
Expand Down Expand Up @@ -102,6 +109,8 @@ func (d *DB) getMetadata(ctx context.Context, metaType metadataType) ([]*metadat
})
}

d.metadataCache[metaType] = metas

return metas, nil
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/engine/db/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@ func Test_UpgradingProcedures(t *testing.T) {
&tc.storedProcedure,
},
}
database := db.DB{
Sqldb: datastore,
}

ctx := context.Background()
database, err := db.NewDB(ctx, datastore)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

returnedProcedures, err := database.ListProcedures(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand Down
16 changes: 12 additions & 4 deletions pkg/engine/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ type executionContext struct {
datasetID string
lastDmlResult []map[string]any

// mustBeNonMutative is true if the execution context must be non-mutative.
// if true, execution of mutative statements will fail.
mustBeNonMutative bool
// nonMutative indicates the execution will be non-mutative.
nonMutative bool
// committedReadOnly indicates the execution is from a read-only call that
// should reflect only scommited changes.
committedReadOnly bool
}

func (ec *executionContext) contextualVariables() map[string]any {
Expand Down Expand Up @@ -69,7 +71,13 @@ func WithDatasetID(dataset string) ExecutionOpt {

func NonMutative() ExecutionOpt {
return func(ec *executionContext) {
ec.mustBeNonMutative = true
ec.nonMutative = true
}
}

func CommittedOnly() ExecutionOpt {
return func(ec *executionContext) {
ec.committedReadOnly = true
}
}

Expand Down
30 changes: 23 additions & 7 deletions pkg/engine/execution/instructions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package execution

import (
"errors"
"fmt"
"strings"
)
Expand Down Expand Up @@ -89,7 +90,27 @@ func evalDMLPrepare(ctx *procedureContext, eng *Engine, args ...any) error {
}

func evalDMLExecute(ctx *procedureContext, eng *Engine, args ...any) error {
if len(args) != 1 {
nonMut := ctx.nonMutative
if ctx.committedReadOnly {
if !nonMut {
return errors.New("procedure cannot be mutative in read-only contexty")
}
if len(args) != 2 {
return fmt.Errorf("%w: dml execute requires 2 arguments, got %d", ErrIncorrectNumArgs, len(args))
}
stmt, ok := args[1].(string)
if !ok {
return fmt.Errorf("%w: expected string, got %T", ErrIncorrectInputType, args[0])
}
var err error
ctx.lastDmlResult, err = eng.db.Query(ctx.ctx, stmt, ctx.values)
if err != nil {
return fmt.Errorf("failed to execute statement: %w", err)
}
return nil
}

if (!nonMut && len(args) != 1) && (nonMut && len(args) != 2) { // may be an extra one if it's non-mutative
return fmt.Errorf("%w: dml execute requires 1 argument, got %d", ErrIncorrectNumArgs, len(args))
}

Expand All @@ -103,12 +124,7 @@ func evalDMLExecute(ctx *procedureContext, eng *Engine, args ...any) error {
return fmt.Errorf("%w: '%s'", ErrUnknownPreparedStatement, stmtName)
}

ctxMut := ctx.mustBeNonMutative
if ctxMut {
fmt.Print("mutative")
}

if ctx.mustBeNonMutative && preparedStmt.IsMutative() {
if nonMut && preparedStmt.IsMutative() {
return fmt.Errorf("%w: '%s'", ErrMutativeStatement, stmtName)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/engine/execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ type Initializer interface {

// Datastore is an interface for a datastore, usually a sqlite DB.
type Datastore interface {
// Prepare will be used for RW execution.
Prepare(ctx context.Context, query string) (PreparedStatement, error)
// Query will be used for RO execution.
Query(ctx context.Context, stmt string, args map[string]any) ([]map[string]any, error)
}

type PreparedStatement interface {
Expand Down
4 changes: 4 additions & 0 deletions pkg/engine/execution/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func (m *mockDatastore) Prepare(ctx context.Context, uery string) (execution.Pre
}, nil
}

func (m *mockDatastore) Query(ctx context.Context, stmt string, args map[string]any) ([]map[string]any, error) {
return []map[string]any{}, nil
}

type mockPreparedStatement struct {
mutative bool
}
Expand Down
Loading

0 comments on commit f6820fb

Please sign in to comment.