Skip to content

Commit

Permalink
refactor: cleanup grpc context, slim service method
Browse files Browse the repository at this point in the history
  • Loading branch information
cif committed Oct 26, 2024
1 parent b631e27 commit 395cdc9
Show file tree
Hide file tree
Showing 7 changed files with 62 additions and 82 deletions.
4 changes: 1 addition & 3 deletions src/main/kotlin/com/stabledata/Application.kt
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@ import com.stabledata.context.configureAuth
import com.stabledata.context.configureDocsRouting
import com.stabledata.endpoint.configureApplicationRouting
import com.stabledata.endpoint.configureChoresRouting
import com.stabledata.grpc.GrpcContextInterceptor
import com.stabledata.grpc.GrpcService
import com.stabledata.context.GrpcContextInterceptor
import com.stabledata.grpc.SchemaService
import io.grpc.netty.shaded.io.grpc.netty.NettyServerBuilder
import io.grpc.protobuf.services.ProtoReflectionService
Expand Down Expand Up @@ -34,7 +33,6 @@ fun main() {
.forPort(grpcPort)
.intercept(GrpcContextInterceptor())
.intercept(ExceptionHandlingInterceptor())
.addService(GrpcService())
.addService(SchemaService())
.addService(ProtoReflectionService.newInstance())
.build()
Expand Down
1 change: 0 additions & 1 deletion src/main/kotlin/com/stabledata/context/Envelope.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package com.stabledata.context

import com.stabledata.EventAlreadyProcessedException
import com.stabledata.dao.LogsTable
import com.stabledata.grpc.GrpcContextInterceptor
import com.stabledata.uuidString
import io.grpc.Context
import io.ktor.server.application.*
Expand Down
34 changes: 34 additions & 0 deletions src/main/kotlin/com/stabledata/context/GrpcContextInterceptor.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package com.stabledata.context

import io.grpc.*
import io.ktor.http.*

class GrpcContextInterceptor : ServerInterceptor {
override fun <ReqT, RespT> interceptCall(
call: ServerCall<ReqT, RespT>,
headers: Metadata?,
next: ServerCallHandler<ReqT, RespT>?
): ServerCall.Listener<ReqT> {

val tokenKey = Metadata.Key.of(HttpHeaders.Authorization, Metadata.ASCII_STRING_MARSHALLER)
val eventIdKey = Metadata.Key.of(StableEventIdHeader, Metadata.ASCII_STRING_MARSHALLER)
val eventCreatedKey = Metadata.Key.of(StableEventCreatedOnHeader, Metadata.ASCII_STRING_MARSHALLER)

val token: String? = headers?.get(tokenKey)
val eventId: String? = headers?.get(eventIdKey)
val eventCreatedAt: Long? = headers?.get(eventCreatedKey)?.toLong()

val context: Context = Context.current()
.withValue(tokenContext, token)
.withValue(eventIdContext, eventId)
.withValue(eventCreatedAtContext, eventCreatedAt)

return Contexts.interceptCall(context, call, headers, next)
}

companion object {
val tokenContext: Context.Key<Any> = Context.key(HttpHeaders.Authorization)
val eventIdContext: Context.Key<Any> = Context.key(StableEventIdHeader)
val eventCreatedAtContext: Context.Key<Any> = Context.key(StableEventCreatedOnHeader)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package com.stabledata.context

import com.stabledata.Operations
import com.stabledata.dao.LogEntryBuilder
import com.stabledata.grpc.GrpcContextInterceptor
import io.grpc.Context
import io.ktor.server.application.*
import io.ktor.util.pipeline.*
Expand Down
76 changes: 0 additions & 76 deletions src/main/kotlin/com/stabledata/grpc/GrpcService.kt

This file was deleted.

26 changes: 26 additions & 0 deletions src/main/kotlin/com/stabledata/grpc/SchemaService.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.stabledata.grpc

import com.stabledata.context.WriteRequestContext
import com.stabledata.model.Collection
import com.stabledata.model.toMessage
import com.stabledata.workload.schema.createCollectionWorkload
import io.grpc.stub.StreamObserver
import stable.LogEntry.LogEntryMessage
import stable.Schema
import stable.SchemaServiceGrpc


class SchemaService : SchemaServiceGrpc.SchemaServiceImplBase() {
override fun createCollection(
request: Schema.CollectionRequest,
responseObserver: StreamObserver<LogEntryMessage>
) {
val ctx = WriteRequestContext.fromGrpcContext {
Collection.fromMessage(request)
}
val result = createCollectionWorkload(ctx)
val response = result.toMessage()
responseObserver.onNext(response)
responseObserver.onCompleted()
}
}
2 changes: 1 addition & 1 deletion src/test/kotlin/com/stabledata/ApplicationTestHelpers.kt
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.stabledata

import com.stabledata.context.StableEventIdHeader
import com.stabledata.grpc.GrpcContextInterceptor
import com.stabledata.context.GrpcContextInterceptor
import io.grpc.*
import io.grpc.inprocess.InProcessChannelBuilder
import io.grpc.inprocess.InProcessServerBuilder
Expand Down

0 comments on commit 395cdc9

Please sign in to comment.