diff --git a/cmd/kwil-cli/cmds/database/query.go b/cmd/kwil-cli/cmds/database/query.go index 80e1f673a..858363a8f 100644 --- a/cmd/kwil-cli/cmds/database/query.go +++ b/cmd/kwil-cli/cmds/database/query.go @@ -30,7 +30,7 @@ func queryCmd() *cobra.Command { } results := res.ExportString() - printTable(results) + printTableTrunc(results) return nil }) diff --git a/cmd/kwil-cli/cmds/database/result_printer.go b/cmd/kwil-cli/cmds/database/result_printer.go index 997a84fb7..6298e7949 100644 --- a/cmd/kwil-cli/cmds/database/result_printer.go +++ b/cmd/kwil-cli/cmds/database/result_printer.go @@ -59,3 +59,49 @@ func printTable(data []map[string]string) { fmt.Println("|") } } + +// printTableTrunc prints a table, truncating values that are too long. +func printTableTrunc(data []map[string]string) { + if len(data) == 0 { + fmt.Println("No data to display.") + return + } + + headers := make([]string, 0) + colWidths := make(map[string]int) + for _, row := range data { + for k, v := range row { + if _, exists := colWidths[k]; !exists { + headers = append(headers, k) + colWidths[k] = len(k) + } + if len(v) > colWidths[k] { + colWidths[k] = len(v) + } + if colWidths[k] > 32 { + colWidths[k] = 32 + } + } + } + + for _, h := range headers { + fmt.Printf("| %-*s ", colWidths[h], h) + } + fmt.Println("|") + + for _, h := range headers { + fmt.Printf("|-%s-", strings.Repeat("-", colWidths[h])) + } + fmt.Println("|") + + for _, row := range data { + for _, h := range headers { + val := row[h] + if len(val) > 32 { + val = val[:29] + "..." 
+ } + fmt.Printf("| %-*s ", colWidths[h], val) + } + fmt.Println("|") + } +} diff --git a/pkg/abci/abci.go b/pkg/abci/abci.go index 88941716e..5badeb715 100644 --- a/pkg/abci/abci.go +++ b/pkg/abci/abci.go @@ -74,13 +74,8 @@ func NewAbciApp(database DatasetsModule, vldtrs ValidatorModule, kv KVStore, com snapshotter: snapshotter, valAddrToKey: make(map[string][]byte), - valUpdates: make([]*validators.Validator, 0), log: log.NewNoOp(), - - commitSemaphore: make(chan struct{}, 1), // max concurrency for a BeginBlock->Commit+Apply sequence is 1 - - // state: appState{height, ...}, // TODO } for _, opt := range opts { @@ -126,15 +121,6 @@ type AbciApp struct { log log.Logger - // Consensus method requests from cometbft are synchronous, but a portion of - // the work of Commit is launched in a goroutine, so we block a subsequent - // BeginBlock from starting new changes. We do this by acquiring a semaphore - // with max concurrency of 1 at the start of BeginBlock, and releasing it - // when the changes from Commit have finished applying. A mutex is rarely - // held for longer than the duration of a local function, while a waitgroup - // does not provide atomic Wait/Add semantics that fit here. 
- commitSemaphore chan struct{} - // Expected AppState after bootstrapping the node with a given snapshot, // state gets updated with the bootupState after bootstrapping bootupState appState @@ -150,8 +136,6 @@ func (a *AbciApp) BeginBlock(req abciTypes.RequestBeginBlock) abciTypes.Response logger := a.log.With(zap.String("stage", "ABCI BeginBlock"), zap.Int("height", int(req.Header.Height))) logger.Debug("begin block") - a.commitSemaphore <- struct{}{} // peg in (until Commit is applied), there will only be at most one waiter - err := a.committer.Begin(context.Background()) if err != nil { panic(newFatalError("BeginBlock", &req, err.Error())) @@ -473,12 +457,7 @@ func (a *AbciApp) Commit() abciTypes.ResponseCommit { panic(newFatalError("Commit", nil, fmt.Sprintf("failed to increment block height: %v", err))) } - err = a.committer.Commit(ctx, func(err error) { - if err != nil { - panic(newFatalError("Commit", nil, fmt.Sprintf("failed to commit atomic commit: %v", err))) - } - <-a.commitSemaphore // peg out (from BeginBlock) - }) + err = a.committer.Commit(ctx) if err != nil { panic(newFatalError("Commit", nil, fmt.Sprintf("failed to commit atomic commit: %v", err))) } diff --git a/pkg/abci/interfaces.go b/pkg/abci/interfaces.go index 30af37c1a..f2c39666f 100644 --- a/pkg/abci/interfaces.go +++ b/pkg/abci/interfaces.go @@ -60,7 +60,7 @@ type AtomicCommitter interface { ClearWal(ctx context.Context) error Begin(ctx context.Context) error ID(ctx context.Context) ([]byte, error) - Commit(ctx context.Context, applyCallback func(error)) error + Commit(ctx context.Context) error } // KVStore is an interface for a basic key-value store diff --git a/pkg/engine/dataset/dataset.go b/pkg/engine/dataset/dataset.go index e4f85c18b..3d2cfd9ca 100644 --- a/pkg/engine/dataset/dataset.go +++ b/pkg/engine/dataset/dataset.go @@ -151,7 +151,12 @@ func (d *Dataset) Call(ctx context.Context, action string, args []any, opts *TxO return nil, ErrCallerNotOwner } - return d.executeOnce(ctx, 
proc, args, d.getExecutionOpts(proc, opts)...) + if len(args) != len(proc.Args) { + return nil, fmt.Errorf("expected %d args, got %d", len(proc.Args), len(args)) + } + + execOpts := append(d.getExecutionOpts(proc, opts), execution.CommittedOnly()) + return d.engine.ExecuteProcedure(ctx, proc.Name, args, execOpts...) } // getProcedure gets a procedure. If the procedure is not found, it returns an error. diff --git a/pkg/engine/dataset/parse.go b/pkg/engine/dataset/parse.go index 79faf025a..5a757b69f 100644 --- a/pkg/engine/dataset/parse.go +++ b/pkg/engine/dataset/parse.go @@ -147,6 +147,7 @@ func parseAction(procedure *types.Procedure) (engineProcedure *execution.Procedu var procedureInstructions []*execution.InstructionExecution loaderInstructions = []*execution.InstructionExecution{} + mut := procedure.IsMutative() for _, stmt := range procedure.Statements { parsedStmt, err := actparser.Parse(stmt) if err != nil { @@ -158,7 +159,7 @@ func parseAction(procedure *types.Procedure) (engineProcedure *execution.Procedu switch stmtType := parsedStmt.(type) { case *actparser.DMLStmt: - stmtInstructions, stmtLoaderInstructions, err = convertDml(stmtType) + stmtInstructions, stmtLoaderInstructions, err = convertDml(stmtType, mut) case *actparser.ExtensionCallStmt: stmtInstructions, stmtLoaderInstructions, err = convertExtensionExecute(stmtType) case *actparser.ActionCallStmt: @@ -185,9 +186,8 @@ func parseAction(procedure *types.Procedure) (engineProcedure *execution.Procedu }, loaderInstructions, nil } -func convertDml(dml *actparser.DMLStmt) (procedureInstructions []*execution.InstructionExecution, loaderInstructions []*execution.InstructionExecution, err error) { +func convertDml(dml *actparser.DMLStmt, mut bool) (procedureInstructions, loaderInstructions []*execution.InstructionExecution, err error) { uniqueName := randomHash.getRandomHash() - loadOp := &execution.InstructionExecution{ Instruction: execution.OpDMLPrepare, Args: []any{ @@ -202,11 +202,17 @@ func 
convertDml(dml *actparser.DMLStmt) (procedureInstructions []*execution.Inst uniqueName, }, } + if !mut { // i.e. may be executed with read-only Query + // The entire statement would be unused for mutative statements since + // they always use a prepared statement. Executions doing a read-only + // query for uncommitted data will not use the prepared statement. + procedureOp.Args = append(procedureOp.Args, dml.Statement) + } return []*execution.InstructionExecution{procedureOp}, []*execution.InstructionExecution{loadOp}, nil } -func convertExtensionExecute(ext *actparser.ExtensionCallStmt) (procedureInstructions []*execution.InstructionExecution, loaderInstructions []*execution.InstructionExecution, err error) { +func convertExtensionExecute(ext *actparser.ExtensionCallStmt) (procedureInstructions, loaderInstructions []*execution.InstructionExecution, err error) { var setters []*execution.InstructionExecution var args []string @@ -235,7 +241,7 @@ func convertExtensionExecute(ext *actparser.ExtensionCallStmt) (procedureInstruc return append(setters, procedureOp), []*execution.InstructionExecution{}, nil } -func convertProcedureCall(action *actparser.ActionCallStmt) (procedureInstructions []*execution.InstructionExecution, loaderInstructions []*execution.InstructionExecution, err error) { +func convertProcedureCall(action *actparser.ActionCallStmt) (procedureInstructions, loaderInstructions []*execution.InstructionExecution, err error) { var setters []*execution.InstructionExecution var args []string diff --git a/pkg/engine/db/db.go b/pkg/engine/db/db.go index 7dcc6e632..2d9606143 100644 --- a/pkg/engine/db/db.go +++ b/pkg/engine/db/db.go @@ -11,6 +11,7 @@ package db import ( "context" "io" + "sync" "github.com/kwilteam/kwil-db/pkg/engine/sqlanalyzer" "github.com/kwilteam/kwil-db/pkg/engine/sqlparser" @@ -19,6 +20,14 @@ import ( type DB struct { Sqldb SqlDB + + // caches metadata from QueryUnsafe. + // this is a really bad practice, but works. 
+ // essentially, we cache the metadata the first time it is retrieved, during schema + // deployment. This prevents the need from calling QueryUnsafe again + metadataCache map[metadataType][]*metadata + + mu sync.RWMutex } func (d *DB) Close() error { @@ -78,7 +87,8 @@ func (d *DB) Savepoint() (sql.Savepoint, error) { func NewDB(ctx context.Context, sqldb SqlDB) (*DB, error) { db := &DB{ - Sqldb: sqldb, + Sqldb: sqldb, + metadataCache: make(map[metadataType][]*metadata), } err := db.initMetadataTable(ctx) diff --git a/pkg/engine/db/metadata.go b/pkg/engine/db/metadata.go index 6dbdeed38..210d8f1ce 100644 --- a/pkg/engine/db/metadata.go +++ b/pkg/engine/db/metadata.go @@ -67,6 +67,13 @@ func (d *DB) storeMetadata(ctx context.Context, meta *metadata) error { } func (d *DB) getMetadata(ctx context.Context, metaType metadataType) ([]*metadata, error) { + d.mu.Lock() + defer d.mu.Unlock() + cached, ok := d.metadataCache[metaType] + if ok { + return cached, nil + } + results, err := d.Sqldb.QueryUnsafe(ctx, selectMetadataStatement, map[string]interface{}{ "$type": metaType, }) @@ -102,6 +109,8 @@ func (d *DB) getMetadata(ctx context.Context, metaType metadataType) ([]*metadat }) } + d.metadataCache[metaType] = metas + return metas, nil } diff --git a/pkg/engine/db/upgrade_test.go b/pkg/engine/db/upgrade_test.go index 5fb951ee8..ca5733b64 100644 --- a/pkg/engine/db/upgrade_test.go +++ b/pkg/engine/db/upgrade_test.go @@ -97,11 +97,13 @@ func Test_UpgradingProcedures(t *testing.T) { &tc.storedProcedure, }, } - database := db.DB{ - Sqldb: datastore, - } ctx := context.Background() + database, err := db.NewDB(ctx, datastore) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + returnedProcedures, err := database.ListProcedures(ctx) if err != nil { t.Fatalf("unexpected error: %v", err) diff --git a/pkg/engine/execution/context.go b/pkg/engine/execution/context.go index 0309b2569..d7a562cfa 100644 --- a/pkg/engine/execution/context.go +++ 
b/pkg/engine/execution/context.go @@ -24,9 +24,11 @@ type executionContext struct { datasetID string lastDmlResult []map[string]any - // mustBeNonMutative is true if the execution context must be non-mutative. - // if true, execution of mutative statements will fail. - mustBeNonMutative bool + // nonMutative indicates the execution will be non-mutative. + nonMutative bool + // committedReadOnly indicates the execution is from a read-only call that + // should reflect only committed changes. + committedReadOnly bool } func (ec *executionContext) contextualVariables() map[string]any { @@ -69,7 +71,13 @@ func WithDatasetID(dataset string) ExecutionOpt { func NonMutative() ExecutionOpt { return func(ec *executionContext) { - ec.mustBeNonMutative = true + ec.nonMutative = true + } +} + +func CommittedOnly() ExecutionOpt { + return func(ec *executionContext) { + ec.committedReadOnly = true } } diff --git a/pkg/engine/execution/instructions.go b/pkg/engine/execution/instructions.go index 3069b21c5..b759b38fd 100644 --- a/pkg/engine/execution/instructions.go +++ b/pkg/engine/execution/instructions.go @@ -1,6 +1,7 @@ package execution import ( + "errors" "fmt" "strings" ) @@ -89,7 +90,27 @@ func evalDMLPrepare(ctx *procedureContext, eng *Engine, args ...any) error { } func evalDMLExecute(ctx *procedureContext, eng *Engine, args ...any) error { - if len(args) != 1 { + nonMut := ctx.nonMutative + if ctx.committedReadOnly { + if !nonMut { + return errors.New("procedure cannot be mutative in read-only contexty") + } + if len(args) != 2 { + return fmt.Errorf("%w: dml execute requires 2 arguments, got %d", ErrIncorrectNumArgs, len(args)) + } + stmt, ok := args[1].(string) + if !ok { + return fmt.Errorf("%w: expected string, got %T", ErrIncorrectInputType, args[0]) + } + var err error + ctx.lastDmlResult, err = eng.db.Query(ctx.ctx, stmt, ctx.values) + if err != nil { + return fmt.Errorf("failed to execute statement: %w", err) + } + return nil + } + + if (!nonMut && len(args) != 
1) && (nonMut && len(args) != 2) { // may be an extra one if it's non-mutative return fmt.Errorf("%w: dml execute requires 1 argument, got %d", ErrIncorrectNumArgs, len(args)) } @@ -103,12 +124,7 @@ func evalDMLExecute(ctx *procedureContext, eng *Engine, args ...any) error { return fmt.Errorf("%w: '%s'", ErrUnknownPreparedStatement, stmtName) } - ctxMut := ctx.mustBeNonMutative - if ctxMut { - fmt.Print("mutative") - } - - if ctx.mustBeNonMutative && preparedStmt.IsMutative() { + if nonMut && preparedStmt.IsMutative() { return fmt.Errorf("%w: '%s'", ErrMutativeStatement, stmtName) } diff --git a/pkg/engine/execution/interface.go b/pkg/engine/execution/interface.go index 18c045729..27e58c4e8 100644 --- a/pkg/engine/execution/interface.go +++ b/pkg/engine/execution/interface.go @@ -14,7 +14,10 @@ type Initializer interface { // Datastore is an interface for a datastore, usually a sqlite DB. type Datastore interface { + // Prepare will be used for RW execution. Prepare(ctx context.Context, query string) (PreparedStatement, error) + // Query will be used for RO execution. 
+ Query(ctx context.Context, stmt string, args map[string]any) ([]map[string]any, error) } type PreparedStatement interface { diff --git a/pkg/engine/execution/mock_test.go b/pkg/engine/execution/mock_test.go index 420559264..193e10427 100644 --- a/pkg/engine/execution/mock_test.go +++ b/pkg/engine/execution/mock_test.go @@ -24,6 +24,10 @@ func (m *mockDatastore) Prepare(ctx context.Context, uery string) (execution.Pre }, nil } +func (m *mockDatastore) Query(ctx context.Context, stmt string, args map[string]any) ([]map[string]any, error) { + return []map[string]any{}, nil +} + type mockPreparedStatement struct { mutative bool } diff --git a/pkg/sessions/session.go b/pkg/sessions/session.go index 302baab79..8e2fccb74 100644 --- a/pkg/sessions/session.go +++ b/pkg/sessions/session.go @@ -126,12 +126,12 @@ func (a *AtomicCommitter) Begin(ctx context.Context) (err error) { // Commit commits the atomic session. // It can be given a callback function to handle any errors that occur during the apply phase (which proceeds asynchronously) after this function returns. 
-func (a *AtomicCommitter) Commit(ctx context.Context, applyCallback func(error)) (err error) { +func (a *AtomicCommitter) Commit(ctx context.Context) (err error) { a.mu.Lock() + defer a.mu.Unlock() // if no session in progress, then return without cancelling if a.phase != CommitPhaseCommit { - a.mu.Unlock() return phaseError("Commit", CommitPhaseCommit, a.phase) } a.phase = CommitPhaseApply @@ -141,14 +141,12 @@ func (a *AtomicCommitter) Commit(ctx context.Context, applyCallback func(error)) // if session is in progress but the committer is closed, then cancel the session if a.closed { - a.mu.Unlock() return ErrClosed } // begin the commit in the WAL err = a.wal.WriteBegin(ctx) if err != nil { - a.mu.Unlock() return err } @@ -156,25 +154,18 @@ func (a *AtomicCommitter) Commit(ctx context.Context, applyCallback func(error)) // any changes to the WAL err = a.endCommit(ctx) if err != nil { - a.mu.Unlock() return err } // commit the WAL err = a.wal.WriteCommit(ctx) if err != nil { - a.mu.Unlock() return err } - go func() { - err2 := a.apply(ctx) - applyCallback(err2) - a.phase = CommitPhaseNone - a.mu.Unlock() - }() + a.phase = CommitPhaseNone - return nil + return a.apply(ctx) } // ID returns a deterministic identifier representative of all state changes that have occurred in the session. 
diff --git a/pkg/sessions/sessions_test.go b/pkg/sessions/sessions_test.go index 90a98b699..b580012ac 100644 --- a/pkg/sessions/sessions_test.go +++ b/pkg/sessions/sessions_test.go @@ -156,15 +156,7 @@ func Test_Session(t *testing.T) { return err } - applyErr := make(chan error) - err = committer.Commit(ctx, func(err error) { - applyErr <- err - }) - if err != nil { - return err - } - - err = <-applyErr + err = committer.Commit(ctx) if err != nil { return err } diff --git a/pkg/sql/client/client.go b/pkg/sql/client/client.go index 3ea774ed1..08c809fa7 100644 --- a/pkg/sql/client/client.go +++ b/pkg/sql/client/client.go @@ -18,6 +18,7 @@ import ( "encoding/json" "fmt" "io" + "sync" "github.com/kwilteam/kwil-db/pkg/log" "github.com/kwilteam/kwil-db/pkg/sql" @@ -31,6 +32,8 @@ type SqliteClient struct { // log is self-explanatory. log log.Logger + + writerMtx sync.Mutex } func NewSqliteStore(name string, opts ...SqliteOpts) (*SqliteClient, error) { @@ -71,6 +74,8 @@ func WrapSqliteConn(conn *sqlite.Connection, logger log.Logger) (*SqliteClient, // Execute executes a statement. func (s *SqliteClient) Execute(ctx context.Context, stmt string, args map[string]any) error { + s.writerMtx.Lock() + defer s.writerMtx.Unlock() return s.conn.Execute(stmt, args) } @@ -106,7 +111,8 @@ func (s *SqliteClient) Prepare(stmt string) (sql.Statement, error) { // It SHOULD be read-only, but there is nothing forcing it to be. 
use with caution // This should get deleted once we redo the engine func (w *SqliteClient) QueryUnsafe(ctx context.Context, query string, args map[string]any) ([]map[string]any, error) { - + w.writerMtx.Lock() + defer w.writerMtx.Unlock() stmt, err := w.Prepare(query) if err != nil { return nil, err @@ -172,6 +178,8 @@ func ResultsfromReader(reader io.Reader) ([]map[string]any, error) { } func (s *SqliteClient) CreateSession() (sql.Session, error) { + s.writerMtx.Lock() + defer s.writerMtx.Unlock() sess, err := s.conn.CreateSession() if err != nil { return nil, err @@ -183,17 +191,25 @@ func (s *SqliteClient) CreateSession() (sql.Session, error) { } func (s *SqliteClient) ApplyChangeset(reader io.Reader) error { + s.writerMtx.Lock() + defer s.writerMtx.Unlock() return s.conn.ApplyChangeset(reader) } func (s *SqliteClient) CheckpointWal() error { + s.writerMtx.Lock() + defer s.writerMtx.Unlock() return s.conn.CheckpointWal() } func (s *SqliteClient) DisableForeignKey() error { + s.writerMtx.Lock() + defer s.writerMtx.Unlock() return s.conn.DisableForeignKey() } func (s *SqliteClient) EnableForeignKey() error { + s.writerMtx.Lock() + defer s.writerMtx.Unlock() return s.conn.EnableForeignKey() } diff --git a/scripts/publish/dockerhub b/scripts/publish/dockerhub index e15ec7a0f..efa999c9b 100755 --- a/scripts/publish/dockerhub +++ b/scripts/publish/dockerhub @@ -7,4 +7,4 @@ set -eu echo Building kwild for multiarch and pushing to dockerhub, tag: ${TAG} -docker buildx build --platform linux/amd64,linux/arm64/v8 -t kwildb/kwil:${TAG} --push -f ./build/package/docker/kwild.dockerfile . \ No newline at end of file +docker buildx build --platform linux/amd64,linux/arm64/v8 -t kwildb/kwil:${TAG} --push -f ./build/package/docker/kwild.auto-init.dockerfile . \ No newline at end of file