From 35d2773ed078853faffa2ce5fd51bea03cd037dd Mon Sep 17 00:00:00 2001 From: Elizabeth Worstell Date: Mon, 8 Jan 2024 19:01:40 -0800 Subject: [PATCH] feat: provide Kotlin API for db connections --- examples/kotlin/pom.xml | 5 ++ integration/integration_test.go | 78 ++++++++++++++++++- integration/testdata/database/DbTest.kt | 35 +++++++++ .../src/main/kotlin/xyz/block/ftl/Database.kt | 63 +++++++++++++++ kotlin-runtime/scaffolding/pom.xml | 5 ++ protos/xyz/block/ftl/v1/schema/schema.proto | 48 ++++++------ 6 files changed, 208 insertions(+), 26 deletions(-) create mode 100644 integration/testdata/database/DbTest.kt create mode 100644 kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/Database.kt diff --git a/examples/kotlin/pom.xml b/examples/kotlin/pom.xml index 160a69efbd..6f821fd795 100644 --- a/examples/kotlin/pom.xml +++ b/examples/kotlin/pom.xml @@ -34,6 +34,11 @@ ftl-runtime ${ftl.version} + + org.postgresql + postgresql + 42.7.1 + diff --git a/integration/integration_test.go b/integration/integration_test.go index c500897bf6..aaad9cb66d 100644 --- a/integration/integration_test.go +++ b/integration/integration_test.go @@ -5,7 +5,9 @@ package integration import ( "bytes" "context" + "database/sql" "encoding/json" + "errors" "fmt" "os" "path/filepath" @@ -14,10 +16,10 @@ import ( "testing" "time" - "errors" - "connectrpc.com/connect" "github.com/alecthomas/assert/v2" + _ "github.com/amacneil/dbmate/v2/pkg/driver/postgres" + _ "github.com/jackc/pgx/v5/stdlib" // SQL driver "golang.org/x/exp/maps" "github.com/TBD54566975/ftl/backend/common/exec" @@ -26,6 +28,7 @@ import ( ftlv1 "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1" "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1/ftlv1connect" schemapb "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1/schema" + "github.com/TBD54566975/scaffolder" ) const integrationTestTimeout = time.Second * 60 @@ -90,6 +93,12 @@ func TestIntegration(t *testing.T) { assert.True(t, regexp.MustCompile(`^Hello, Alice!`).MatchString(message), "%q does not match %q", message, `^Hello, Alice!`) }), }}, + {name: "UseKotlinDbConn", assertions: assertions{ + setUpKotlinModuleDb(filepath.Join(modulesDir, "ftl-module-echo3")), + run(".", "ftl", "deploy", filepath.Join(modulesDir, "ftl-module-echo3")), + call("dbtest", "create", obj{"data": "Hello"}, func(t testing.TB, resp obj) {}), + validateKotlinModuleDb(), + }}, {name: "SchemaGenerateJS", assertions: assertions{ run(".", "ftl", "schema", "generate", "integration/testdata/schema-generate", "build/schema-generate"), filesExist(file{"build/schema-generate/test.txt", "olleh"}), @@ -221,6 +230,71 @@ func call[Resp any](module, verb string, req obj, onResponse func(t testing.TB, } } +func setUpKotlinModuleDb(dir string) assertion { + os.Setenv("FTL_POSTGRES_DSN_dbtest_testdb", "postgres://postgres:secret@localhost:54320/testdb?sslmode=disable") + return func(t testing.TB, ic itContext) error { + db, err := sql.Open("pgx", "postgres://postgres:secret@localhost:54320/ftl?sslmode=disable") + assert.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + if err != nil { + t.Fatal(err) + } + }) + + err = db.Ping() + assert.NoError(t, err) + + var exists bool + query := `SELECT EXISTS(SELECT datname FROM pg_catalog.pg_database WHERE datname = $1);` + err = db.QueryRow(query, "testdb").Scan(&exists) + assert.NoError(t, err) + if !exists { + db.Exec("CREATE DATABASE testdb;") + } + + // add DbTest.kt with a new verb that uses the db + err = scaffolder.Scaffold( + filepath.Join(ic.rootDir, "integration/testdata/database"), + filepath.Join(dir, "src/main/kotlin/ftl/dbtest"), + ic, + ) + assert.NoError(t, err) + + return nil + } +} + +func validateKotlinModuleDb() assertion { + return func(t testing.TB, ic itContext) error { + db, err := sql.Open("pgx", "postgres://postgres:secret@localhost:54320/testdb?sslmode=disable") + assert.NoError(t, err) + t.Cleanup(func() { + err := db.Close() + if err != nil { + t.Fatal(err) + } + }) + + err = db.Ping() + assert.NoError(t, err) + + rows, err := db.Query("SELECT data FROM requests") + assert.NoError(t, err) + + for rows.Next() { + var data string + err := rows.Scan(&data) + assert.NoError(t, err) + if data == "Hello" { + return nil + } + } + + return errors.New("data not found") + } +} + type itContext struct { context.Context tmpDir string diff --git a/integration/testdata/database/DbTest.kt b/integration/testdata/database/DbTest.kt new file mode 100644 index 0000000000..cc074307e5 --- /dev/null +++ b/integration/testdata/database/DbTest.kt @@ -0,0 +1,35 @@ +package ftl.dbtest + +import xyz.block.ftl.Context +import xyz.block.ftl.Verb +import xyz.block.ftl.Database + +data class DbRequest(val data: String?) +data class DbResponse(val message: String? = "ok") + +val db = Database("testdb") + +class DbTest { + @Verb + fun create(context: Context, req: DbRequest): DbResponse { + persistRequest(req) + return DbResponse() + } + + fun persistRequest(req: DbRequest) { + db.conn { + it.prepareStatement( + """ + CREATE TABLE IF NOT EXISTS requests + ( + data TEXT, + created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'), + updated_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc') + ); + """ + ).execute() + it.prepareStatement("INSERT INTO requests (data) VALUES ('${req.data}');") + .execute() + } + } +} diff --git a/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/Database.kt b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/Database.kt new file mode 100644 index 0000000000..185272d55b --- /dev/null +++ b/kotlin-runtime/ftl-runtime/src/main/kotlin/xyz/block/ftl/Database.kt @@ -0,0 +1,63 @@ +package xyz.block.ftl + +import xyz.block.ftl.logging.Logging +import java.net.URI +import java.sql.Connection +import java.sql.DriverManager + +private const val FTL_DSN_VAR_PREFIX = "FTL_POSTGRES_DSN" + +/** + * `Database` is a simple wrapper around the JDBC driver manager that provides a connection to the database specified + * by the FTL_POSTGRES_DSN__ environment variable. + */ +class Database(private val name: String) { + private val logger = Logging.logger(Database::class) + private val moduleName: String = Thread.currentThread().stackTrace[2]?.let { + val components = it.className.split(".") + require(components.first() == "ftl") { + "Expected Database to be declared in package ftl., but was $it" + } + + return@let components[1] + } ?: throw IllegalStateException("Could not determine module name from Database declaration") + + fun conn(block: (c: Connection) -> R): R { + return try { + val envVar = listOf(FTL_DSN_VAR_PREFIX, moduleName, name).joinToString("_") + val dsn = System.getenv(envVar) + require(dsn != null) { "$envVar environment variable not set" } + + DriverManager.getConnection(dsnToJdbcUrl(dsn)).use { + block(it) + } + } catch (e: Exception) { + logger.error("Could not connect to database", e) + throw e + } + } + + private fun dsnToJdbcUrl(dsn: String): String { + val uri = URI(dsn) + val scheme = uri.scheme ?: throw IllegalArgumentException("Missing scheme in DSN.") + val userInfo = uri.userInfo?.split(":") ?: throw IllegalArgumentException("Missing userInfo in DSN.") + val user = userInfo.firstOrNull() ?: throw IllegalArgumentException("Missing user in userInfo.") + val password = if (userInfo.size > 1) userInfo[1] else "" + val host = uri.host ?: throw IllegalArgumentException("Missing host in DSN.") + val port = if (uri.port != -1) uri.port.toString() else throw IllegalArgumentException("Missing port in DSN.") + val database = uri.path.trimStart('/') + val parameters = uri.query?.replace("&", "?") ?: "" + + val jdbcScheme = when (scheme) { + "postgres" -> "jdbc:postgresql" + else -> throw IllegalArgumentException("Unsupported scheme: $scheme") + } + + val jdbcUrl = "$jdbcScheme://$host:$port/$database?$parameters" + return if (user.isNotBlank() && password.isNotBlank()) { + "$jdbcUrl&user=$user&password=$password" + } else { + jdbcUrl + } + } +} diff --git a/kotlin-runtime/scaffolding/pom.xml b/kotlin-runtime/scaffolding/pom.xml index ea1f5fe2cb..a5a0a61720 100644 --- a/kotlin-runtime/scaffolding/pom.xml +++ b/kotlin-runtime/scaffolding/pom.xml @@ -35,6 +35,11 @@ ftl-runtime ${ftl.version} + + org.postgresql + postgresql + 42.7.1 + diff --git a/protos/xyz/block/ftl/v1/schema/schema.proto b/protos/xyz/block/ftl/v1/schema/schema.proto index c3a290dc49..9a1934d474 100644 --- a/protos/xyz/block/ftl/v1/schema/schema.proto +++ b/protos/xyz/block/ftl/v1/schema/schema.proto @@ -8,6 +8,30 @@ import "xyz/block/ftl/v1/schema/runtime.proto"; option go_package = "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1/schema;schemapb"; option java_multiple_files = true; +message DataRef { + optional Position pos = 1; + string name = 2; + string module = 3; +} + +message SinkRef { + optional Position pos = 1; + string name = 2; + string module = 3; +} + +message SourceRef { + optional Position pos = 1; + string name = 2; + string module = 3; +} + +message VerbRef { + optional Position pos = 1; + string name = 2; + string module = 3; +} + message Array { optional Position pos = 1; Type element = 2; @@ -114,30 +138,6 @@ message Position { int64 column = 3; } -message DataRef { - optional Position pos = 1; - string name = 2; - string module = 3; -} - -message SinkRef { - optional Position pos = 1; - string name = 2; - string module = 3; -} - -message SourceRef { - optional Position pos = 1; - string name = 2; - string module = 3; -} - -message VerbRef { - optional Position pos = 1; - string name = 2; - string module = 3; -} - message Schema { optional Position pos = 1; repeated Module modules = 2;