Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Commit

Permalink
Use jsonpb to marshal CloudEvents Data (#424)
Browse files Browse the repository at this point in the history
* Use jsonpb to marshal CloudEvents Data

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* update unit tests

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* fix unit tests

Signed-off-by: Haytham Abuelfutuh <[email protected]>

* lint

Signed-off-by: Haytham Abuelfutuh <[email protected]>
  • Loading branch information
EngHabu authored May 31, 2022
1 parent b551784 commit a330c37
Show file tree
Hide file tree
Showing 8 changed files with 113 additions and 72 deletions.
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package implementations

import (
"bytes"
"context"
"fmt"
"reflect"
"time"

"github.com/golang/protobuf/jsonpb"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"

"github.com/flyteorg/flyteadmin/pkg/async/notifications/implementations"
Expand Down Expand Up @@ -74,7 +77,18 @@ func (p *Publisher) Publish(ctx context.Context, notificationType string, msg pr
event.SetTime(eventTime)
event.SetExtension(jsonSchemaURLKey, jsonSchemaURL)

if err := event.SetData(cloudevents.ApplicationJSON, &msg); err != nil {
// Explicitly jsonpb marshal the proto. Otherwise, event.SetData will use json.Marshal which doesn't work well
// with proto oneof fields.
marshaler := jsonpb.Marshaler{}
buf := bytes.NewBuffer([]byte{})
err := marshaler.Marshal(buf, msg)
if err != nil {
p.systemMetrics.PublishError.Inc()
logger.Errorf(ctx, "Failed to jsonpb marshal [%v] with error: %v", msg, err)
return fmt.Errorf("failed to jsonpb marshal [%v] with error: %w", msg, err)
}

if err := event.SetData(cloudevents.ApplicationJSON, buf.Bytes()); err != nil {
p.systemMetrics.PublishError.Inc()
logger.Errorf(ctx, "Failed to encode message [%v] with error: %v", msg, err)
return err
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package implementations

import (
"context"
"encoding/json"
"errors"
"fmt"
"testing"
"time"

"github.com/golang/protobuf/jsonpb"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
Expand Down Expand Up @@ -174,9 +175,9 @@ func TestNewCloudEventsPublisher_EventTypes(t *testing.T) {
assert.Equal(t, cloudEvent.Source(), cloudEventSource)
assert.Equal(t, cloudEvent.Extensions(), map[string]interface{}{jsonSchemaURLKey: jsonSchemaURL})

e, err := json.Marshal(event)
e, err := (&jsonpb.Marshaler{}).MarshalToString(event)
assert.Nil(t, err)
assert.Equal(t, cloudEvent.Data(), e)
assert.Equal(t, string(cloudEvent.Data()), e)
cnt++
}
}
Expand Down
2 changes: 2 additions & 0 deletions flyteadmin/pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -1367,10 +1367,12 @@ func (m *ExecutionManager) CreateWorkflowEvent(ctx context.Context, request admi
return nil, err
}
}

if err := m.eventPublisher.Publish(ctx, proto.MessageName(&request), &request); err != nil {
m.systemMetrics.PublishEventError.Inc()
logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err)
}

go func() {
if err := m.cloudEventPublisher.Publish(ctx, proto.MessageName(&request), &request); err != nil {
m.systemMetrics.PublishEventError.Inc()
Expand Down
44 changes: 27 additions & 17 deletions flyteadmin/pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ import (
"context"
"strconv"

cloudeventInterfaces "github.com/flyteorg/flyteadmin/pkg/async/cloudevent/interfaces"

"github.com/flyteorg/flytestdlib/promutils/labeled"

eventWriter "github.com/flyteorg/flyteadmin/pkg/async/events/interfaces"
Expand Down Expand Up @@ -53,14 +55,15 @@ type nodeExecutionMetrics struct {
}

type NodeExecutionManager struct {
db repoInterfaces.Repository
config runtimeInterfaces.Configuration
storagePrefix []string
storageClient *storage.DataStore
metrics nodeExecutionMetrics
urlData dataInterfaces.RemoteURLInterface
eventPublisher notificationInterfaces.Publisher
dbEventWriter eventWriter.NodeExecutionEventWriter
db repoInterfaces.Repository
config runtimeInterfaces.Configuration
storagePrefix []string
storageClient *storage.DataStore
metrics nodeExecutionMetrics
urlData dataInterfaces.RemoteURLInterface
eventPublisher notificationInterfaces.Publisher
cloudEventPublisher cloudeventInterfaces.Publisher
dbEventWriter eventWriter.NodeExecutionEventWriter
}

type updateNodeExecutionStatus int
Expand Down Expand Up @@ -293,6 +296,12 @@ func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admi
logger.Infof(ctx, "error publishing event [%+v] with err: [%v]", request.RequestId, err)
}

go func() {
if err := m.cloudEventPublisher.Publish(ctx, proto.MessageName(&request), &request); err != nil {
logger.Infof(ctx, "error publishing cloud event [%+v] with err: [%v]", request.RequestId, err)
}
}()

return &admin.NodeExecutionEventResponse{}, nil
}

Expand Down Expand Up @@ -546,7 +555,7 @@ func (m *NodeExecutionManager) GetNodeExecutionData(

func NewNodeExecutionManager(db repoInterfaces.Repository, config runtimeInterfaces.Configuration,
storagePrefix []string, storageClient *storage.DataStore, scope promutils.Scope, urlData dataInterfaces.RemoteURLInterface,
eventPublisher notificationInterfaces.Publisher,
eventPublisher notificationInterfaces.Publisher, cloudEventPublisher cloudeventInterfaces.Publisher,
eventWriter eventWriter.NodeExecutionEventWriter) interfaces.NodeExecutionInterface {
metrics := nodeExecutionMetrics{
Scope: scope,
Expand All @@ -570,13 +579,14 @@ func NewNodeExecutionManager(db repoInterfaces.Repository, config runtimeInterfa
"overall count of publish event errors when invoking publish()"),
}
return &NodeExecutionManager{
db: db,
config: config,
storagePrefix: storagePrefix,
storageClient: storageClient,
metrics: metrics,
urlData: urlData,
eventPublisher: eventPublisher,
dbEventWriter: eventWriter,
db: db,
config: config,
storagePrefix: storagePrefix,
storageClient: storageClient,
metrics: metrics,
urlData: urlData,
eventPublisher: eventPublisher,
dbEventWriter: eventWriter,
cloudEventPublisher: cloudEventPublisher,
}
}
Loading

0 comments on commit a330c37

Please sign in to comment.