Skip to content

Commit

Permalink
feat: provide Kotlin API for db connections
Browse files Browse the repository at this point in the history
  • Loading branch information
worstell committed Jan 12, 2024
1 parent f9853f7 commit 35d2773
Show file tree
Hide file tree
Showing 6 changed files with 208 additions and 26 deletions.
5 changes: 5 additions & 0 deletions examples/kotlin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@
<artifactId>ftl-runtime</artifactId>
<version>${ftl.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.1</version>
</dependency>
</dependencies>

<build>
Expand Down
78 changes: 76 additions & 2 deletions integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ package integration
import (
"bytes"
"context"
"database/sql"
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
Expand All @@ -14,10 +16,10 @@ import (
"testing"
"time"

"errors"

"connectrpc.com/connect"
"github.com/alecthomas/assert/v2"
_ "github.com/amacneil/dbmate/v2/pkg/driver/postgres"
_ "github.com/jackc/pgx/v5/stdlib" // SQL driver
"golang.org/x/exp/maps"

"github.com/TBD54566975/ftl/backend/common/exec"
Expand All @@ -26,6 +28,7 @@ import (
ftlv1 "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1/ftlv1connect"
schemapb "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1/schema"
"github.com/TBD54566975/scaffolder"
)

const integrationTestTimeout = time.Second * 60
Expand Down Expand Up @@ -90,6 +93,12 @@ func TestIntegration(t *testing.T) {
assert.True(t, regexp.MustCompile(`^Hello, Alice!`).MatchString(message), "%q does not match %q", message, `^Hello, Alice!`)
}),
}},
{name: "UseKotlinDbConn", assertions: assertions{
setUpKotlinModuleDb(filepath.Join(modulesDir, "ftl-module-echo3")),
run(".", "ftl", "deploy", filepath.Join(modulesDir, "ftl-module-echo3")),
call("dbtest", "create", obj{"data": "Hello"}, func(t testing.TB, resp obj) {}),
validateKotlinModuleDb(),
}},
{name: "SchemaGenerateJS", assertions: assertions{
run(".", "ftl", "schema", "generate", "integration/testdata/schema-generate", "build/schema-generate"),
filesExist(file{"build/schema-generate/test.txt", "olleh"}),
Expand Down Expand Up @@ -221,6 +230,71 @@ func call[Resp any](module, verb string, req obj, onResponse func(t testing.TB,
}
}

func setUpKotlinModuleDb(dir string) assertion {
os.Setenv("FTL_POSTGRES_DSN_dbtest_testdb", "postgres://postgres:secret@localhost:54320/testdb?sslmode=disable")
return func(t testing.TB, ic itContext) error {
db, err := sql.Open("pgx", "postgres://postgres:secret@localhost:54320/ftl?sslmode=disable")
assert.NoError(t, err)
t.Cleanup(func() {
err := db.Close()
if err != nil {
t.Fatal(err)
}
})

err = db.Ping()
assert.NoError(t, err)

var exists bool
query := `SELECT EXISTS(SELECT datname FROM pg_catalog.pg_database WHERE datname = $1);`
err = db.QueryRow(query, "testdb").Scan(&exists)
assert.NoError(t, err)
if !exists {
db.Exec("CREATE DATABASE testdb;")
}

// add DbTest.kt with a new verb that uses the db
err = scaffolder.Scaffold(
filepath.Join(ic.rootDir, "integration/testdata/database"),
filepath.Join(dir, "src/main/kotlin/ftl/dbtest"),
ic,
)
assert.NoError(t, err)

return nil
}
}

func validateKotlinModuleDb() assertion {
return func(t testing.TB, ic itContext) error {
db, err := sql.Open("pgx", "postgres://postgres:secret@localhost:54320/testdb?sslmode=disable")
assert.NoError(t, err)
t.Cleanup(func() {
err := db.Close()
if err != nil {
t.Fatal(err)
}
})

err = db.Ping()
assert.NoError(t, err)

rows, err := db.Query("SELECT data FROM requests")
assert.NoError(t, err)

for rows.Next() {
var data string
err := rows.Scan(&data)
assert.NoError(t, err)
if data == "Hello" {
return nil
}
}

return errors.New("data not found")
}
}

type itContext struct {
context.Context
tmpDir string
Expand Down
35 changes: 35 additions & 0 deletions integration/testdata/database/DbTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package ftl.dbtest

import xyz.block.ftl.Context
import xyz.block.ftl.Verb
import xyz.block.ftl.Database

data class DbRequest(val data: String?)
data class DbResponse(val message: String? = "ok")

val db = Database("testdb")

class DbTest {
@Verb
fun create(context: Context, req: DbRequest): DbResponse {
persistRequest(req)
return DbResponse()
}

fun persistRequest(req: DbRequest) {
db.conn {
it.prepareStatement(
"""
CREATE TABLE IF NOT EXISTS requests
(
data TEXT,
created_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc'),
updated_at TIMESTAMPTZ NOT NULL DEFAULT (NOW() AT TIME ZONE 'utc')
);
"""
).execute()
it.prepareStatement("INSERT INTO requests (data) VALUES ('${req.data}');")
.execute()
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package xyz.block.ftl

import xyz.block.ftl.logging.Logging
import java.net.URI
import java.sql.Connection
import java.sql.DriverManager

private const val FTL_DSN_VAR_PREFIX = "FTL_POSTGRES_DSN"

/**
* `Database` is a simple wrapper around the JDBC driver manager that provides a connection to the database specified
* by the FTL_POSTGRES_DSN_<moduleName>_<dbName> environment variable.
*/
class Database(private val name: String) {
private val logger = Logging.logger(Database::class)
private val moduleName: String = Thread.currentThread().stackTrace[2]?.let {
val components = it.className.split(".")
require(components.first() == "ftl") {
"Expected Database to be declared in package ftl.<module>, but was $it"
}

return@let components[1]
} ?: throw IllegalStateException("Could not determine module name from Database declaration")

fun <R> conn(block: (c: Connection) -> R): R {
return try {
val envVar = listOf(FTL_DSN_VAR_PREFIX, moduleName, name).joinToString("_")
val dsn = System.getenv(envVar)
require(dsn != null) { "$envVar environment variable not set" }

DriverManager.getConnection(dsnToJdbcUrl(dsn)).use {
block(it)
}
} catch (e: Exception) {
logger.error("Could not connect to database", e)
throw e
}
}

private fun dsnToJdbcUrl(dsn: String): String {
val uri = URI(dsn)
val scheme = uri.scheme ?: throw IllegalArgumentException("Missing scheme in DSN.")
val userInfo = uri.userInfo?.split(":") ?: throw IllegalArgumentException("Missing userInfo in DSN.")
val user = userInfo.firstOrNull() ?: throw IllegalArgumentException("Missing user in userInfo.")
val password = if (userInfo.size > 1) userInfo[1] else ""
val host = uri.host ?: throw IllegalArgumentException("Missing host in DSN.")
val port = if (uri.port != -1) uri.port.toString() else throw IllegalArgumentException("Missing port in DSN.")
val database = uri.path.trimStart('/')
val parameters = uri.query?.replace("&", "?") ?: ""

val jdbcScheme = when (scheme) {
"postgres" -> "jdbc:postgresql"
else -> throw IllegalArgumentException("Unsupported scheme: $scheme")
}

val jdbcUrl = "$jdbcScheme://$host:$port/$database?$parameters"
return if (user.isNotBlank() && password.isNotBlank()) {
"$jdbcUrl&user=$user&password=$password"
} else {
jdbcUrl
}
}
}
5 changes: 5 additions & 0 deletions kotlin-runtime/scaffolding/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,11 @@
<artifactId>ftl-runtime</artifactId>
<version>${ftl.version}</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.7.1</version>
</dependency>
</dependencies>

<build>
Expand Down
48 changes: 24 additions & 24 deletions protos/xyz/block/ftl/v1/schema/schema.proto
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,30 @@ import "xyz/block/ftl/v1/schema/runtime.proto";
option go_package = "github.com/TBD54566975/ftl/protos/xyz/block/ftl/v1/schema;schemapb";
option java_multiple_files = true;

message DataRef {
optional Position pos = 1;
string name = 2;
string module = 3;
}

message SinkRef {
optional Position pos = 1;
string name = 2;
string module = 3;
}

message SourceRef {
optional Position pos = 1;
string name = 2;
string module = 3;
}

message VerbRef {
optional Position pos = 1;
string name = 2;
string module = 3;
}

message Array {
optional Position pos = 1;
Type element = 2;
Expand Down Expand Up @@ -114,30 +138,6 @@ message Position {
int64 column = 3;
}

message DataRef {
optional Position pos = 1;
string name = 2;
string module = 3;
}

message SinkRef {
optional Position pos = 1;
string name = 2;
string module = 3;
}

message SourceRef {
optional Position pos = 1;
string name = 2;
string module = 3;
}

message VerbRef {
optional Position pos = 1;
string name = 2;
string module = 3;
}

message Schema {
optional Position pos = 1;
repeated Module modules = 2;
Expand Down

0 comments on commit 35d2773

Please sign in to comment.