Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[discussion] make procedure instruction execution from view/Call RO #298

Merged
merged 4 commits into from
Sep 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cmd/kwil-cli/cmds/database/query.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func queryCmd() *cobra.Command {
}

results := res.ExportString()
printTable(results)
printTableTrunc(results)

return nil
})
Expand Down
46 changes: 46 additions & 0 deletions cmd/kwil-cli/cmds/database/result_printer.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,49 @@ func printTable(data []map[string]string) {
fmt.Println("|")
}
}

// printTableTrunc prints a table, truncating values that are too long.
func printTableTrunc(data []map[string]string) {
if len(data) == 0 {
fmt.Println("No data to display.")
return
}

headers := make([]string, 0)
colWidths := make(map[string]int)
for _, row := range data {
for k, v := range row {
if _, exists := colWidths[k]; !exists {
headers = append(headers, k)
colWidths[k] = len(k)
}
if len(v) > colWidths[k] {
colWidths[k] = len(v)
}
if colWidths[k] > 32 {
colWidths[k] = 32
}
}
}

for _, h := range headers {
fmt.Printf("| %-*s ", colWidths[h], h)
}
fmt.Println("|")

for _, h := range headers {
fmt.Printf("|-%s-", strings.Repeat("-", colWidths[h]))
}
fmt.Println("|")

for _, row := range data {
for _, h := range headers {
val := row[h]
if len(val) > 32 {
val = val[:29] + "..."
}
fmt.Printf("| %-*s ", colWidths[h], val)
}
fmt.Println("|")
}
}
23 changes: 1 addition & 22 deletions pkg/abci/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,13 +74,8 @@ func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, com
snapshotter: snapshotter,

valAddrToKey: make(map[string][]byte),
valUpdates: make([]*validators.Validator, 0),

log: log.NewNoOp(),

commitSemaphore: make(chan struct{}, 1), // max concurrency for a BeginBlock->Commit+Apply sequence is 1

// state: appState{height, ...}, // TODO
}

for _, opt := range opts {
Expand Down Expand Up @@ -126,15 +121,6 @@ type AbciApp struct {

log log.Logger

// Consensus method requests from cometbft are synchronous, but a portion of
// the work of Commit is launched in a goroutine, so we block a subsequent
// BeginBlock from starting new changes. We do this by acquiring a semaphore
// with max concurrency of 1 at the start of BeginBlock, and releasing it
// when the changes from Commit have finished applying. A mutex is rarely
// held for longer than the duration of a local function, while a waitgroup
// does not provide atomic Wait/Add semantics that fit here.
commitSemaphore chan struct{}

// Expected AppState after bootstrapping the node with a given snapshot,
// state gets updated with the bootupState after bootstrapping
bootupState appState
Expand All @@ -150,8 +136,6 @@ func (a *AbciApp) BeginBlock(req abciTypes.RequestBeginBlock) abciTypes.Response
logger := a.log.With(zap.String("stage", "ABCI BeginBlock"), zap.Int("height", int(req.Header.Height)))
logger.Debug("begin block")

a.commitSemaphore <- struct{}{} // peg in (until Commit is applied), there will only be at most one waiter

err := a.committer.Begin(context.Background())
if err != nil {
panic(newFatalError("BeginBlock", &req, err.Error()))
Expand Down Expand Up @@ -473,12 +457,7 @@ func (a *AbciApp) Commit() abciTypes.ResponseCommit {
panic(newFatalError("Commit", nil, fmt.Sprintf("failed to increment block height: %v", err)))
}

err = a.committer.Commit(ctx, func(err error) {
if err != nil {
panic(newFatalError("Commit", nil, fmt.Sprintf("failed to commit atomic commit: %v", err)))
}
<-a.commitSemaphore // peg out (from BeginBlock)
})
err = a.committer.Commit(ctx)
if err != nil {
panic(newFatalError("Commit", nil, fmt.Sprintf("failed to commit atomic commit: %v", err)))
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/abci/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type AtomicCommitter interface {
ClearWal(ctx context.Context) error
Begin(ctx context.Context) error
ID(ctx context.Context) ([]byte, error)
Commit(ctx context.Context, applyCallback func(error)) error
Commit(ctx context.Context) error
}

// KVStore is an interface for a basic key-value store
Expand Down
7 changes: 6 additions & 1 deletion pkg/engine/dataset/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,12 @@ func (d *Dataset) Call(ctx context.Context, action string, args []any, opts *TxO
return nil, ErrCallerNotOwner
}

return d.executeOnce(ctx, proc, args, d.getExecutionOpts(proc, opts)...)
if len(args) != len(proc.Args) {
return nil, fmt.Errorf("expected %d args, got %d", len(proc.Args), len(args))
}

execOpts := append(d.getExecutionOpts(proc, opts), execution.CommittedOnly())
return d.engine.ExecuteProcedure(ctx, proc.Name, args, execOpts...)
}

// getProcedure gets a procedure. If the procedure is not found, it returns an error.
Expand Down
16 changes: 11 additions & 5 deletions pkg/engine/dataset/parse.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func parseAction(procedure *types.Procedure) (engineProcedure *execution.Procedu
var procedureInstructions []*execution.InstructionExecution
loaderInstructions = []*execution.InstructionExecution{}

mut := procedure.IsMutative()
for _, stmt := range procedure.Statements {
parsedStmt, err := actparser.Parse(stmt)
if err != nil {
Expand All @@ -158,7 +159,7 @@ func parseAction(procedure *types.Procedure) (engineProcedure *execution.Procedu

switch stmtType := parsedStmt.(type) {
case *actparser.DMLStmt:
stmtInstructions, stmtLoaderInstructions, err = convertDml(stmtType)
stmtInstructions, stmtLoaderInstructions, err = convertDml(stmtType, mut)
case *actparser.ExtensionCallStmt:
stmtInstructions, stmtLoaderInstructions, err = convertExtensionExecute(stmtType)
case *actparser.ActionCallStmt:
Expand All @@ -185,9 +186,8 @@ func parseAction(procedure *types.Procedure) (engineProcedure *execution.Procedu
}, loaderInstructions, nil
}

func convertDml(dml *actparser.DMLStmt) (procedureInstructions []*execution.InstructionExecution, loaderInstructions []*execution.InstructionExecution, err error) {
func convertDml(dml *actparser.DMLStmt, mut bool) (procedureInstructions, loaderInstructions []*execution.InstructionExecution, err error) {
uniqueName := randomHash.getRandomHash()

loadOp := &execution.InstructionExecution{
Instruction: execution.OpDMLPrepare,
Args: []any{
Expand All @@ -202,11 +202,17 @@ func convertDml(dml *actparser.DMLStmt) (procedureInstructions []*execution.Inst
uniqueName,
},
}
if !mut { // i.e. may be executed with read-only Query
// The entire statement would be unused for mutative statements since
// they always use a prepared statement. Executions doing a read-only
// query for uncommittted data will not use the prepared statement.
procedureOp.Args = append(procedureOp.Args, dml.Statement)
}

return []*execution.InstructionExecution{procedureOp}, []*execution.InstructionExecution{loadOp}, nil
}

func convertExtensionExecute(ext *actparser.ExtensionCallStmt) (procedureInstructions []*execution.InstructionExecution, loaderInstructions []*execution.InstructionExecution, err error) {
func convertExtensionExecute(ext *actparser.ExtensionCallStmt) (procedureInstructions, loaderInstructions []*execution.InstructionExecution, err error) {
var setters []*execution.InstructionExecution

var args []string
Expand Down Expand Up @@ -235,7 +241,7 @@ func convertExtensionExecute(ext *actparser.ExtensionCallStmt) (procedureInstruc
return append(setters, procedureOp), []*execution.InstructionExecution{}, nil
}

func convertProcedureCall(action *actparser.ActionCallStmt) (procedureInstructions []*execution.InstructionExecution, loaderInstructions []*execution.InstructionExecution, err error) {
func convertProcedureCall(action *actparser.ActionCallStmt) (procedureInstructions, loaderInstructions []*execution.InstructionExecution, err error) {
var setters []*execution.InstructionExecution

var args []string
Expand Down
12 changes: 11 additions & 1 deletion pkg/engine/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ package db
import (
"context"
"io"
"sync"

"github.com/kwilteam/kwil-db/pkg/engine/sqlanalyzer"
"github.com/kwilteam/kwil-db/pkg/engine/sqlparser"
Expand All @@ -19,6 +20,14 @@ import (

type DB struct {
Sqldb SqlDB

// caches metadata from QueryUnsafe.
// this is a really bad practice, but works.
// essentially, we cache the metadata the first time it is retrieved, during schema
// deployment. This prevents the need from calling QueryUnsafe again
metadataCache map[metadataType][]*metadata

mu sync.RWMutex
}

func (d *DB) Close() error {
Expand Down Expand Up @@ -78,7 +87,8 @@ func (d *DB) Savepoint() (sql.Savepoint, error) {

func NewDB(ctx context.Context, sqldb SqlDB) (*DB, error) {
db := &DB{
Sqldb: sqldb,
Sqldb: sqldb,
metadataCache: make(map[metadataType][]*metadata),
}

err := db.initMetadataTable(ctx)
Expand Down
9 changes: 9 additions & 0 deletions pkg/engine/db/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ func (d *DB) storeMetadata(ctx context.Context, meta *metadata) error {
}

func (d *DB) getMetadata(ctx context.Context, metaType metadataType) ([]*metadata, error) {
d.mu.Lock()
defer d.mu.Unlock()
cached, ok := d.metadataCache[metaType]
if ok {
return cached, nil
}

results, err := d.Sqldb.QueryUnsafe(ctx, selectMetadataStatement, map[string]interface{}{
"$type": metaType,
})
Expand Down Expand Up @@ -102,6 +109,8 @@ func (d *DB) getMetadata(ctx context.Context, metaType metadataType) ([]*metadat
})
}

d.metadataCache[metaType] = metas

return metas, nil
}

Expand Down
8 changes: 5 additions & 3 deletions pkg/engine/db/upgrade_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,13 @@ func Test_UpgradingProcedures(t *testing.T) {
&tc.storedProcedure,
},
}
database := db.DB{
Sqldb: datastore,
}

ctx := context.Background()
database, err := db.NewDB(ctx, datastore)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}

returnedProcedures, err := database.ListProcedures(ctx)
if err != nil {
t.Fatalf("unexpected error: %v", err)
Expand Down
16 changes: 12 additions & 4 deletions pkg/engine/execution/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ type executionContext struct {
datasetID string
lastDmlResult []map[string]any

// mustBeNonMutative is true if the execution context must be non-mutative.
// if true, execution of mutative statements will fail.
mustBeNonMutative bool
// nonMutative indicates the execution will be non-mutative.
nonMutative bool
// committedReadOnly indicates the execution is from a read-only call that
// should reflect only scommited changes.
committedReadOnly bool
}

func (ec *executionContext) contextualVariables() map[string]any {
Expand Down Expand Up @@ -69,7 +71,13 @@ func WithDatasetID(dataset string) ExecutionOpt {

func NonMutative() ExecutionOpt {
return func(ec *executionContext) {
ec.mustBeNonMutative = true
ec.nonMutative = true
}
}

func CommittedOnly() ExecutionOpt {
return func(ec *executionContext) {
ec.committedReadOnly = true
}
}

Expand Down
30 changes: 23 additions & 7 deletions pkg/engine/execution/instructions.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package execution

import (
"errors"
"fmt"
"strings"
)
Expand Down Expand Up @@ -89,7 +90,27 @@ func evalDMLPrepare(ctx *procedureContext, eng *Engine, args ...any) error {
}

func evalDMLExecute(ctx *procedureContext, eng *Engine, args ...any) error {
if len(args) != 1 {
nonMut := ctx.nonMutative
if ctx.committedReadOnly {
if !nonMut {
return errors.New("procedure cannot be mutative in read-only contexty")
}
if len(args) != 2 {
return fmt.Errorf("%w: dml execute requires 2 arguments, got %d", ErrIncorrectNumArgs, len(args))
}
stmt, ok := args[1].(string)
if !ok {
return fmt.Errorf("%w: expected string, got %T", ErrIncorrectInputType, args[0])
}
var err error
ctx.lastDmlResult, err = eng.db.Query(ctx.ctx, stmt, ctx.values)
if err != nil {
return fmt.Errorf("failed to execute statement: %w", err)
}
return nil
}

if (!nonMut && len(args) != 1) && (nonMut && len(args) != 2) { // may be an extra one if it's non-mutative
return fmt.Errorf("%w: dml execute requires 1 argument, got %d", ErrIncorrectNumArgs, len(args))
}

Expand All @@ -103,12 +124,7 @@ func evalDMLExecute(ctx *procedureContext, eng *Engine, args ...any) error {
return fmt.Errorf("%w: '%s'", ErrUnknownPreparedStatement, stmtName)
}

ctxMut := ctx.mustBeNonMutative
if ctxMut {
fmt.Print("mutative")
}

if ctx.mustBeNonMutative && preparedStmt.IsMutative() {
if nonMut && preparedStmt.IsMutative() {
return fmt.Errorf("%w: '%s'", ErrMutativeStatement, stmtName)
}

Expand Down
3 changes: 3 additions & 0 deletions pkg/engine/execution/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@ type Initializer interface {

// Datastore is an interface for a datastore, usually a sqlite DB.
type Datastore interface {
// Prepare will be used for RW execution.
Prepare(ctx context.Context, query string) (PreparedStatement, error)
// Query will be used for RO execution.
Query(ctx context.Context, stmt string, args map[string]any) ([]map[string]any, error)
}

type PreparedStatement interface {
Expand Down
4 changes: 4 additions & 0 deletions pkg/engine/execution/mock_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@ func (m *mockDatastore) Prepare(ctx context.Context, uery string) (execution.Pre
}, nil
}

func (m *mockDatastore) Query(ctx context.Context, stmt string, args map[string]any) ([]map[string]any, error) {
return []map[string]any{}, nil
}

type mockPreparedStatement struct {
mutative bool
}
Expand Down
Loading