Skip to content

Commit

Permalink
Convert drive-kotlin to an agent plugin.
Browse files Browse the repository at this point in the history
    $ ftl serve ./drive-kotlin/src/main/kotlin/com/squareup/ftldemo
    ...

    $ ftl list
    com.squareup.ftldemo.notifyCustomer
    com.squareup.ftldemo.makePizza
    com.squareup.ftldemo.pay
    $ ftl call com.squareup.ftldemo.notifyCustomer '{"recipient": "Dhanji"}'
    {"message":"OK"}
  • Loading branch information
alecthomas committed Mar 14, 2023
1 parent 4cbdf3f commit 619dd2a
Show file tree
Hide file tree
Showing 12 changed files with 271 additions and 32 deletions.
8 changes: 2 additions & 6 deletions cmd/ftl/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package main

import (
"context"
"net"
"os"
"os/signal"
"syscall"
Expand Down Expand Up @@ -68,13 +67,10 @@ func main() {

func dialAgent(ctx context.Context) func() (ftlv1.AgentServiceClient, error) {
return func() (ftlv1.AgentServiceClient, error) {
conn, err := grpc.DialContext(ctx, "",
conn, err := grpc.DialContext(ctx, cli.Socket.String(),
// grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
conn, err := socket.Dial(ctx, cli.Socket)
return conn, errors.WithStack(err)
}))
grpc.WithContextDialer(socket.Dialer))
if err != nil {
return nil, errors.WithStack(err)
}
Expand Down
10 changes: 5 additions & 5 deletions common/gen/xyz/block/ftl/v1/ftl.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

32 changes: 24 additions & 8 deletions common/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/TBD54566975/ftl/common/exec"
ftlv1 "github.com/TBD54566975/ftl/common/gen/xyz/block/ftl/v1"
"github.com/TBD54566975/ftl/common/log"
"github.com/TBD54566975/ftl/common/socket"
)

// PingableClient is a gRPC client that can be pinged.
Expand Down Expand Up @@ -87,11 +88,17 @@ func Spawn[Client PingableClient](
return nil, nil, errors.WithStack(err)
}

// Find a free port.
addr, err := allocatePort()
if err != nil {
return nil, nil, errors.WithStack(err)
}

// Start the plugin process.
socket := filepath.Join(workingDir, filepath.Base(exe)+".sock")
logger.Info("Spawning plugin", "dir", dir, "exe", exe, "socket", socket)
pluginSocket := socket.Socket{Network: "tcp", Addr: addr.String()}
logger.Info("Spawning plugin", "dir", dir, "exe", exe, "socket", pluginSocket)
cmd := exec.Command(ctx, dir, exe)
cmd.Env = append(cmd.Env, "FTL_PLUGIN_SOCKET="+socket)
cmd.Env = append(cmd.Env, "FTL_PLUGIN_SOCKET="+pluginSocket.String())
cmd.Env = append(cmd.Env, "FTL_WORKING_DIR="+workingDir)
cmd.Env = append(cmd.Env, opts.envars...)
if err = cmd.Start(); err != nil {
Expand All @@ -116,8 +123,9 @@ func Spawn[Client PingableClient](
}

conn, err := grpc.DialContext(
ctx, "unix://"+socket,
ctx, pluginSocket.String(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(socket.Dialer),
)
if err != nil {
return nil, nil, errors.WithStack(err)
Expand Down Expand Up @@ -151,6 +159,15 @@ func Spawn[Client PingableClient](
return plugin, cmdCtx, nil
}

func allocatePort() (*net.TCPAddr, error) {
l, err := net.ListenTCP("tcp", &net.TCPAddr{IP: net.IPv4(127, 0, 0, 1)})
if err != nil {
return nil, errors.Wrap(err, "failed to allocate port")
}
_ = l.Close()
return l.Addr().(*net.TCPAddr), nil //nolint:forcetypeassert
}

func cleanup(logger *slog.Logger, pidFile string) error {
pidb, err := ioutil.ReadFile(pidFile)
if os.IsNotExist(err) {
Expand All @@ -170,8 +187,8 @@ func cleanup(logger *slog.Logger, pidFile string) error {
}

type serveCli struct {
LogConfig log.Config `embed:"" group:"Logging:"`
Socket string `help:"Socket to listen on." env:"FTL_PLUGIN_SOCKET" required:""`
LogConfig log.Config `embed:"" group:"Logging:"`
Socket socket.Socket `help:"Socket to listen on." env:"FTL_PLUGIN_SOCKET" required:""`
kong.Plugins
}

Expand Down Expand Up @@ -214,8 +231,7 @@ func Start[Impl any, Iface any, Config any](
svc, err := create(ctx, config)
kctx.FatalIfErrorf(err)

_ = os.Remove(cli.Socket)
l, err := (&net.ListenConfig{}).Listen(ctx, "unix", cli.Socket)
l, err := (&net.ListenConfig{}).Listen(ctx, cli.Socket.Network, cli.Socket.Addr)
kctx.FatalIfErrorf(err)
gs := grpc.NewServer(
grpc.ChainUnaryInterceptor(log.UnaryGRPCInterceptor(logger)),
Expand Down
10 changes: 2 additions & 8 deletions common/protos/xyz/block/ftl/v1/ftl.proto
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ syntax = "proto3";
package xyz.block.ftl.v1;

option go_package = "github.com/TBD54566975/ftl/common/gen/xyz/block/ftl/v1;ftlv1";
option java_multiple_files = true;

message PingRequest {}
message PingResponse {}
Expand Down Expand Up @@ -54,15 +55,8 @@ service AgentService {
// DriveService is the service that provides language-specific development and
// deployment functionality.
//
// The DriveService is responsible for hot reloading the ModuleService when code
// The DriveService is responsible for hot reloading the when code
// changes, and passing Verb calls through.
//
// Each implementation of DriveService:
//
// - MUST serve the gRPC service on a Unix socket at the path specified by the
// FTL_DRIVE_SOCKET environment variable.
// - Serve Verbs from the path specified by the FTL_DRIVE_ROOT environment
// variable.
service DriveService {
// Ping the Drive for readiness.
rpc Ping(PingRequest) returns (PingResponse);
Expand Down
14 changes: 14 additions & 0 deletions common/socket/socket.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package socket

import (
"context"
"fmt"
"net"
"net/url"
"os"
Expand All @@ -14,6 +15,10 @@ type Socket struct {
Addr string
}

func (s Socket) String() string {
return fmt.Sprintf("%s://%s", s.Network, s.Addr)
}

func (s *Socket) UnmarshalText(b []byte) error {
tmp, err := Parse(string(b))
if err != nil {
Expand All @@ -23,6 +28,15 @@ func (s *Socket) UnmarshalText(b []byte) error {
return nil
}

// Dialer is a convenience function.
func Dialer(ctx context.Context, addr string) (net.Conn, error) {
sock, err := Parse(addr)
if err != nil {
return nil, errors.WithStack(err)
}
return Dial(ctx, sock)
}

// Dial a Socket.
func Dial(ctx context.Context, s Socket) (net.Conn, error) {
conn, err := (&net.Dialer{}).DialContext(ctx, s.Network, s.Addr)
Expand Down
97 changes: 95 additions & 2 deletions drive-kotlin/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
<kotlin.code.style>official</kotlin.code.style>
<kotlin.compiler.jvmTarget>1.8</kotlin.compiler.jvmTarget>
<kotlin.compiler.incremental>true</kotlin.compiler.incremental>

<!-- gRPC -->
<grpc.version>1.53.0</grpc.version>
<grpc.kotlin.version>1.3.0</grpc.kotlin.version>
<protobuf.version>3.22.2</protobuf.version>
</properties>

<repositories>
Expand Down Expand Up @@ -64,8 +69,8 @@
<artifactId>maven-compiler-plugin</artifactId>
<version>3.11.0</version>
<configuration>
<source>9</source>
<target>9</target>
<source>16</source>
<target>16</target>
</configuration>
</plugin>
<plugin>
Expand All @@ -90,6 +95,62 @@
</execution>
</executions>
</plugin>

<!-- gRPC -->
<plugin>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.0</version> <!-- consider handling this version via properties as well -->
<executions>
<execution>
<phase>initialize</phase>
<goals>
<goal>detect</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<executions>
<execution>
<id>compile</id>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc.version}:exe:${os.detected.classifier}</pluginArtifact>
<protocPlugins>
<protocPlugin>
<id>grpc-kotlin</id>
<groupId>io.grpc</groupId>
<artifactId>protoc-gen-grpc-kotlin</artifactId>
<version>${grpc.kotlin.version}</version>
<classifier>jdk8</classifier>
<mainClass>io.grpc.kotlin.generator.GeneratorRunner</mainClass>
</protocPlugin>
</protocPlugins>
</configuration>
</execution>
<execution>
<id>compile-kt</id>
<goals>
<goal>compile-custom</goal>
</goals>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protobuf.version}:exe:${os.detected.classifier}</protocArtifact>
<outputDirectory>${project.build.directory}/generated-sources/protobuf/kotlin</outputDirectory>
<pluginId>kotlin</pluginId>
</configuration>
</execution>
</executions>
</plugin>

</plugins>
</build>

Expand Down Expand Up @@ -139,6 +200,38 @@
<version>4.8.157</version>
</dependency>

<!-- gRPC -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-kotlin-stub</artifactId>
<version>${grpc.kotlin.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-kotlin</artifactId>
<version>${protobuf.version}</version>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${grpc.version}</version>
</dependency>

<!-- test deps -->
<dependency>
<groupId>org.jetbrains.kotlin</groupId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package xyz.block.ftl.control

import com.google.gson.Gson
import com.google.protobuf.ByteString
import io.grpc.Status
import io.grpc.netty.NettyServerBuilder
import xyz.block.ftl.Context
import xyz.block.ftl.drive.verb.VerbDeck
import xyz.block.ftl.v1.*
import java.net.SocketAddress


class ControlChannelServer(val deck: VerbDeck) : DriveServiceGrpcKt.DriveServiceCoroutineImplBase() {
private val gson = Gson()

override suspend fun ping(request: PingRequest): PingResponse {
return PingResponse.getDefaultInstance()
}

override suspend fun call(request: CallRequest): CallResponse {
val cassette = deck.lookupFullyQualifiedName(request.verb) ?: throw Status.NOT_FOUND.asException()
val req = gson.fromJson<Any>(request.body.toStringUtf8(), cassette.argumentType.java)
var resp: Any
try {
resp = cassette.dispatch(Context(http = null), req) // TODO: do something with Context
} catch (e: Exception) {
return CallResponse.newBuilder()
.setError(
CallResponse.Error.newBuilder()
.setMessage(e.message ?: "no error message")
.build()
)
.build()
}
return CallResponse.newBuilder()
.setBody(ByteString.copyFromUtf8(gson.toJson(resp)))
.build()
}

override suspend fun list(request: ListRequest): ListResponse {
return ListResponse.newBuilder()
.addAllVerbs(deck.list().map { deck.fullyQualifiedName(it) })
.build()
}

override suspend fun fileChange(request: FileChangeRequest): FileChangeResponse {
return FileChangeResponse.getDefaultInstance()
}
}

/**
* Start DriveService on the given socket.
*/
fun startControlChannelServer(socket: SocketAddress, controlChannel: ControlChannelServer) {
val server = NettyServerBuilder
.forAddress(socket)
.addService(controlChannel)
.build()
// TODO: Terminate the process if this fails to startup.
server.start()
}
24 changes: 24 additions & 0 deletions drive-kotlin/src/main/kotlin/xyz/block/ftl/control/Socket.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package xyz.block.ftl.control

import java.net.InetSocketAddress
import java.net.SocketAddress
import java.net.UnixDomainSocketAddress

/**
* Parses a socket URI in the form unix://PATH or tcp://HOST:PORT
*/
fun parseSocket(socket: String): SocketAddress {
val schema = socket.split("://", limit = 2)
return when (schema[0]) {
// TODO(aat) Remove this.
// Unix sockets are effectively unusable in gRPC, unfortunately, as
// they require either kqueue or epoll native implementations
"unix" -> UnixDomainSocketAddress.of(schema[1])
"tcp" -> {
val hostPort = schema[1].split(":", limit = 2)
return InetSocketAddress(hostPort[0], hostPort[1].toInt())
}

else -> throw RuntimeException("unsupported socket type ${schema[0]}")
}
}
Loading

0 comments on commit 619dd2a

Please sign in to comment.