Skip to content

Commit

Permalink
refactor: fix RPC_REQUEST_STANDARD_NAME proto issues
Browse files Browse the repository at this point in the history
  • Loading branch information
alecthomas committed Nov 29, 2024
1 parent 657e9ef commit 8acee20
Show file tree
Hide file tree
Showing 44 changed files with 2,007 additions and 2,026 deletions.
122 changes: 49 additions & 73 deletions backend/controller/console/console.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"time"

"connectrpc.com/connect"
"github.com/alecthomas/types/optional"
"google.golang.org/protobuf/types/known/durationpb"
"google.golang.org/protobuf/types/known/timestamppb"

"github.com/alecthomas/types/optional"

"github.com/TBD54566975/ftl/backend/controller/admin"
"github.com/TBD54566975/ftl/backend/controller/dal"
dalmodel "github.com/TBD54566975/ftl/backend/controller/dal/model"
Expand Down Expand Up @@ -471,7 +472,7 @@ func addRefToSetMap(m map[schema.RefKey]map[schema.RefKey]bool, key schema.RefKe
m[key][value.ToRefKey()] = true
}

func (c *ConsoleService) GetEvents(ctx context.Context, req *connect.Request[pbconsole.EventsQuery]) (*connect.Response[pbconsole.GetEventsResponse], error) {
func (c *ConsoleService) GetEvents(ctx context.Context, req *connect.Request[pbconsole.GetEventsRequest]) (*connect.Response[pbconsole.GetEventsResponse], error) {
query, err := eventsQueryProtoToDAL(req.Msg)
if err != nil {
return nil, err
Expand Down Expand Up @@ -554,113 +555,88 @@ func (c *ConsoleService) StreamEvents(ctx context.Context, req *connect.Request[
}
}

func eventsQueryProtoToDAL(pb *pbconsole.EventsQuery) ([]timeline.TimelineFilter, error) {
var query []timeline.TimelineFilter
func eventsQueryProtoToDAL(query *pbconsole.GetEventsRequest) ([]timeline.TimelineFilter, error) {
var result []timeline.TimelineFilter

if pb.Order == pbconsole.EventsQuery_ORDER_DESC {
query = append(query, timeline.FilterDescending())
if query.Order == pbconsole.GetEventsRequest_ORDER_DESC {
result = append(result, timeline.FilterDescending())
}

for _, filter := range pb.Filters {
switch filter := filter.Filter.(type) {
case *pbconsole.EventsQuery_Filter_Deployments:
deploymentKeys := make([]model.DeploymentKey, 0, len(filter.Deployments.Deployments))
for _, deployment := range filter.Deployments.Deployments {
for _, filter := range query.Filters {
switch f := filter.Filter.(type) {
case *pbconsole.GetEventsRequest_Filter_Deployments:
deploymentKeys := make([]model.DeploymentKey, 0, len(f.Deployments.Deployments))
for _, deployment := range f.Deployments.Deployments {
deploymentKey, err := model.ParseDeploymentKey(deployment)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
deploymentKeys = append(deploymentKeys, deploymentKey)
}
query = append(query, timeline.FilterDeployments(deploymentKeys...))
result = append(result, timeline.FilterDeployments(deploymentKeys...))

case *pbconsole.EventsQuery_Filter_Requests:
requestKeys := make([]model.RequestKey, 0, len(filter.Requests.Requests))
for _, request := range filter.Requests.Requests {
case *pbconsole.GetEventsRequest_Filter_Requests:
requestKeys := make([]model.RequestKey, 0, len(f.Requests.Requests))
for _, request := range f.Requests.Requests {
requestKey, err := model.ParseRequestKey(request)
if err != nil {
return nil, connect.NewError(connect.CodeInvalidArgument, err)
}
requestKeys = append(requestKeys, requestKey)
}
query = append(query, timeline.FilterRequests(requestKeys...))

case *pbconsole.EventsQuery_Filter_EventTypes:
eventTypes := make([]timeline.EventType, 0, len(filter.EventTypes.EventTypes))
for _, eventType := range filter.EventTypes.EventTypes {
switch eventType {
case pbconsole.EventType_EVENT_TYPE_CALL:
eventTypes = append(eventTypes, timeline.EventTypeCall)
case pbconsole.EventType_EVENT_TYPE_LOG:
eventTypes = append(eventTypes, timeline.EventTypeLog)
case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT_CREATED:
eventTypes = append(eventTypes, timeline.EventTypeDeploymentCreated)
case pbconsole.EventType_EVENT_TYPE_DEPLOYMENT_UPDATED:
eventTypes = append(eventTypes, timeline.EventTypeDeploymentUpdated)
case pbconsole.EventType_EVENT_TYPE_INGRESS:
eventTypes = append(eventTypes, timeline.EventTypeIngress)
case pbconsole.EventType_EVENT_TYPE_CRON_SCHEDULED:
eventTypes = append(eventTypes, timeline.EventTypeCronScheduled)
case pbconsole.EventType_EVENT_TYPE_ASYNC_EXECUTE:
eventTypes = append(eventTypes, timeline.EventTypeAsyncExecute)
case pbconsole.EventType_EVENT_TYPE_PUBSUB_PUBLISH:
eventTypes = append(eventTypes, timeline.EventTypePubSubPublish)
case pbconsole.EventType_EVENT_TYPE_PUBSUB_CONSUME:
eventTypes = append(eventTypes, timeline.EventTypePubSubConsume)
default:
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown event type %v", eventType))
}
result = append(result, timeline.FilterRequests(requestKeys...))

case *pbconsole.GetEventsRequest_Filter_EventTypes:
var types []timeline.EventType
for _, t := range f.EventTypes.EventTypes {
types = append(types, timeline.EventType(t))
}
query = append(query, timeline.FilterTypes(eventTypes...))
result = append(result, timeline.FilterTypes(types...))

case *pbconsole.EventsQuery_Filter_LogLevel:
level := log.Level(filter.LogLevel.LogLevel)
case *pbconsole.GetEventsRequest_Filter_LogLevel:
level := log.Level(f.LogLevel.LogLevel)
if level < log.Trace || level > log.Error {
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown log level %v", filter.LogLevel.LogLevel))
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown log level %v", f.LogLevel.LogLevel))
}
query = append(query, timeline.FilterLogLevel(level))
result = append(result, timeline.FilterLogLevel(level))

case *pbconsole.EventsQuery_Filter_Time:
case *pbconsole.GetEventsRequest_Filter_Time:
var newerThan, olderThan time.Time
if filter.Time.NewerThan != nil {
newerThan = filter.Time.NewerThan.AsTime()
if f.Time.NewerThan != nil {
newerThan = f.Time.NewerThan.AsTime()
}
if filter.Time.OlderThan != nil {
olderThan = filter.Time.OlderThan.AsTime()
if f.Time.OlderThan != nil {
olderThan = f.Time.OlderThan.AsTime()
}
query = append(query, timeline.FilterTimeRange(olderThan, newerThan))
result = append(result, timeline.FilterTimeRange(olderThan, newerThan))

case *pbconsole.EventsQuery_Filter_Id:
case *pbconsole.GetEventsRequest_Filter_Id:
var lowerThan, higherThan int64
if filter.Id.LowerThan != nil {
lowerThan = *filter.Id.LowerThan
if f.Id.LowerThan != nil {
lowerThan = *f.Id.LowerThan
}
if filter.Id.HigherThan != nil {
higherThan = *filter.Id.HigherThan
if f.Id.HigherThan != nil {
higherThan = *f.Id.HigherThan
}
query = append(query, timeline.FilterIDRange(lowerThan, higherThan))
case *pbconsole.EventsQuery_Filter_Call:
result = append(result, timeline.FilterIDRange(lowerThan, higherThan))

case *pbconsole.GetEventsRequest_Filter_Call:
var sourceModule optional.Option[string]
if filter.Call.SourceModule != nil {
sourceModule = optional.Some(*filter.Call.SourceModule)
if f.Call.SourceModule != nil {
sourceModule = optional.Some(*f.Call.SourceModule)
}
var destVerb optional.Option[string]
if filter.Call.DestVerb != nil {
destVerb = optional.Some(*filter.Call.DestVerb)
if f.Call.DestVerb != nil {
destVerb = optional.Some(*f.Call.DestVerb)
}
query = append(query, timeline.FilterCall(sourceModule, filter.Call.DestModule, destVerb))
case *pbconsole.EventsQuery_Filter_Module:
var verb optional.Option[string]
if filter.Module.Verb != nil {
verb = optional.Some(*filter.Module.Verb)
}
query = append(query, timeline.FilterModule(filter.Module.Module, verb))
result = append(result, timeline.FilterCall(sourceModule, f.Call.DestModule, destVerb))

default:
return nil, connect.NewError(connect.CodeInvalidArgument, fmt.Errorf("unknown filter %T", filter))
return nil, fmt.Errorf("unknown filter type %T", f)
}
}
return query, nil

return result, nil
}

func eventDALToProto(event timeline.Event) *pbconsole.Event {
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -693,7 +693,7 @@ func (s *Service) Ping(ctx context.Context, req *connect.Request[ftlv1.PingReque
}

// GetModuleContext retrieves config, secrets and DSNs for a module.
func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.ModuleContextRequest], resp *connect.ServerStream[ftlv1.GetModuleContextResponse]) error {
func (s *Service) GetModuleContext(ctx context.Context, req *connect.Request[ftlv1.GetModuleContextRequest], resp *connect.ServerStream[ftlv1.GetModuleContextResponse]) error {
name := req.Msg.Module

// Initialize checksum to -1; a zero checksum does occur when the context contains no settings
Expand Down
2 changes: 1 addition & 1 deletion backend/controller/encryption/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestEncryptionForLogs(t *testing.T) {
// confirm that we can read an event for that call
func(t testing.TB, ic in.TestContext) {
in.Infof("Read Logs")
resp, err := ic.Console.GetEvents(ic.Context, connect.NewRequest(&pbconsole.EventsQuery{
resp, err := ic.Console.GetEvents(ic.Context, connect.NewRequest(&pbconsole.GetEventsRequest{
Limit: 10,
}))
assert.NoError(t, err, "could not get events")
Expand Down
1 change: 0 additions & 1 deletion backend/protos/buf.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,5 @@ lint:
except:
- RPC_REQUEST_RESPONSE_UNIQUE
# Remove these exceptions once they're fixed
- RPC_REQUEST_STANDARD_NAME
- ENUM_ZERO_VALUE_SUFFIX
- PACKAGE_VERSION_SUFFIX
Loading

0 comments on commit 8acee20

Please sign in to comment.