Skip to content

Commit

Permalink
Create a flyteadmin event sink filter (flyteorg#341)
Browse files Browse the repository at this point in the history
* moved events to top-level and merged with flyteidl events

Signed-off-by: Daniel Rammer <[email protected]>

* fixed events imports on tests

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* added error message length validation and truncating if necessary.

Signed-off-by: Daniel Rammer <[email protected]>

* fixed lint issues

Signed-off-by: Daniel Rammer <[email protected]>

* added admin event error message truncating

Signed-off-by: Daniel Rammer <[email protected]>

* began adding oppo bloom filter to admin event sink

Signed-off-by: Daniel Rammer <[email protected]>

* removed truncating error message - didn't actually fix anything

Signed-off-by: Daniel Rammer <[email protected]>

* added oppobloom filter to admin event sink

Signed-off-by: Daniel Rammer <[email protected]>

* moved EventRecorder and EventSink mocks to the mocks package

Signed-off-by: Daniel Rammer <[email protected]>

* updated tests to use fastcheck.Filter mock

Signed-off-by: Daniel Rammer <[email protected]>

* resolved merge conflicts with go.mod and go.sum

Signed-off-by: Daniel Rammer <[email protected]>

* moved idFromMessage to separate func

Signed-off-by: Daniel Rammer <[email protected]>

* updated go.mod with newest flytestdlib version to include filter mocks

Signed-off-by: Daniel Rammer <[email protected]>

* added another level of prometheus scope to better clarify metric

Signed-off-by: Daniel Rammer <[email protected]>

* exported IDFromMessage and added explicit unit tests for it

Signed-off-by: Daniel Rammer <[email protected]>

* added comment on IDFromMessage function

Signed-off-by: Daniel Rammer <[email protected]>
  • Loading branch information
hamersaw authored Oct 21, 2021
1 parent 99c4dbc commit 7b09010
Show file tree
Hide file tree
Showing 40 changed files with 1,414 additions and 123 deletions.
158 changes: 158 additions & 0 deletions flytepropeller/events/admin_eventsink.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
package events

import (
"context"
"fmt"

admin2 "github.com/flyteorg/flyteidl/clients/go/admin"

"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/service"
"github.com/flyteorg/flytepropeller/events/errors"
"github.com/flyteorg/flytestdlib/fastcheck"
"github.com/flyteorg/flytestdlib/logger"
"github.com/flyteorg/flytestdlib/promutils"
"github.com/golang/protobuf/proto"
"golang.org/x/time/rate"
)

// adminEventSink is the EventSink implementation returned by NewAdminEventSink. It forwards
// workflow, node and task execution events to FlyteAdmin over gRPC.
type adminEventSink struct {
// adminClient is the FlyteAdmin gRPC client used to issue the Create*Event requests.
adminClient service.AdminServiceClient
// filter records the IDs of events that were sent successfully; Sink consults it first so
// an event whose ID is already present is not sent again.
filter fastcheck.Filter
// rateLimiter gates every outgoing request; Sink fails with ResourceExhausted when it
// does not allow the call.
rateLimiter *rate.Limiter
// cfg is the configuration the sink was constructed with.
cfg *Config
}

// NewAdminEventSink builds an EventSink that delivers events to FlyteAdmin through gRPC.
//
// The sink throttles outgoing requests with a rate limiter created from config.Rate and
// config.Capacity, and uses filter to skip events it has already delivered. The returned
// error is always nil.
func NewAdminEventSink(ctx context.Context, adminClient service.AdminServiceClient, config *Config, filter fastcheck.Filter) (EventSink, error) {
	sink := &adminEventSink{
		adminClient: adminClient,
		filter:      filter,
		rateLimiter: rate.NewLimiter(rate.Limit(config.Rate), config.Capacity),
		cfg:         config,
	}

	logger.Infof(ctx, "Created new AdminEventSink to Admin service")
	return sink, nil
}

// Sink sends a single event to the FlyteAdmin service through gRPC.
//
// The event is identified with IDFromMessage. If the filter already contains that ID the
// event is treated as delivered and nil is returned without contacting Admin. Otherwise
// the call is subject to the rate limiter and, once Admin accepts the event, its ID is
// added to the filter.
//
// Returns:
//   - an error wrapping the IDFromMessage failure when no ID can be derived;
//   - an *errors.EventError with code ResourceExhausted when the rate limiter rejects
//     the call;
//   - the Admin client error, passed through errors.WrapError, when the request fails;
//   - nil when the event was sent or had already been sent.
func (s *adminEventSink) Sink(ctx context.Context, message proto.Message) error {
	// Pass the message itself rather than message.String() so the string form is only
	// produced if the logger actually formats this line.
	logger.Debugf(ctx, "AdminEventSink received a new event %s", message)

	// Short-circuit if event has already been sent
	id, err := IDFromMessage(message)
	if err != nil {
		// Keep the underlying cause so callers can see why the ID could not be built.
		return fmt.Errorf("failed to parse message id [%v]: %w", message, err)
	}

	if s.filter.Contains(ctx, id) {
		logger.Debugf(ctx, "event '%s' has already been sent", string(id))
		return nil
	}

	// Validate submission with rate limiter before sending the admin event
	if !s.rateLimiter.Allow() {
		return &errors.EventError{
			Code:    errors.ResourceExhausted,
			Cause:   fmt.Errorf("Admin EventSink throttling admin traffic"),
			Message: "Resource Exhausted",
		}
	}

	switch eventMessage := message.(type) {
	case *event.WorkflowExecutionEvent:
		_, err = s.adminClient.CreateWorkflowEvent(ctx, &admin.WorkflowExecutionEventRequest{
			Event: eventMessage,
		})
	case *event.NodeExecutionEvent:
		_, err = s.adminClient.CreateNodeEvent(ctx, &admin.NodeExecutionEventRequest{
			Event: eventMessage,
		})
	case *event.TaskExecutionEvent:
		_, err = s.adminClient.CreateTaskEvent(ctx, &admin.TaskExecutionEventRequest{
			Event: eventMessage,
		})
	default:
		return fmt.Errorf("unknown event type [%s]", eventMessage.String())
	}
	if err != nil {
		return errors.WrapError(err)
	}

	// Only remember the event once Admin has accepted it, so failed sends can be retried.
	s.filter.Add(ctx, id)
	return nil
}

// Close is a no-op: it performs no cleanup and always returns nil.
// NOTE(review): the previous comment said this closes the gRPC client connection, but
// nothing is closed here; confirm the admin client connection is closed elsewhere.
func (s *adminEventSink) Close() error {
return nil
}

// IDFromMessage generates an ID which uniquely represents the admin event entity and its
// associated phase. The ID is a colon-separated string returned as bytes:
//
//   - workflow events: project:domain:name:phase
//   - node events:     project:domain:name:nodeID:phase
//   - task events:     project:domain:name:nodeID:taskName:taskVersion:phase:phaseVersion
//
// Fields are read through the generated protobuf getters, which tolerate nil receivers,
// so an event with a missing identifier yields empty components instead of a nil-pointer
// panic. Any other message type (including a nil message) returns an error.
//
// NOTE(review): the task ID has no retry-attempt component; confirm that events from
// different attempts of the same task are meant to share an ID.
func IDFromMessage(message proto.Message) ([]byte, error) {
	var id string
	switch eventMessage := message.(type) {
	case *event.WorkflowExecutionEvent:
		wid := eventMessage.GetExecutionId()
		id = fmt.Sprintf("%s:%s:%s:%d", wid.GetProject(), wid.GetDomain(), wid.GetName(), eventMessage.GetPhase())
	case *event.NodeExecutionEvent:
		nid := eventMessage.GetId()
		wid := nid.GetExecutionId()
		id = fmt.Sprintf("%s:%s:%s:%s:%d", wid.GetProject(), wid.GetDomain(), wid.GetName(), nid.GetNodeId(), eventMessage.GetPhase())
	case *event.TaskExecutionEvent:
		tid := eventMessage.GetTaskId()
		nid := eventMessage.GetParentNodeExecutionId()
		wid := nid.GetExecutionId()
		id = fmt.Sprintf("%s:%s:%s:%s:%s:%s:%d:%d", wid.GetProject(), wid.GetDomain(), wid.GetName(), nid.GetNodeId(), tid.GetName(), tid.GetVersion(), eventMessage.GetPhase(), eventMessage.GetPhaseVersion())
	default:
		// %v on the interface value is safe for a nil message, unlike calling String() on it.
		return nil, fmt.Errorf("unknown event type [%v]", message)
	}

	return []byte(id), nil
}

// initializeAdminClientFromConfig builds a FlyteAdmin clientset from the admin
// configuration obtained via admin2.GetConfig and returns its admin service client.
// A clientset build failure is returned wrapped.
func initializeAdminClientFromConfig(ctx context.Context) (client service.AdminServiceClient, err error) {
	adminCfg := admin2.GetConfig(ctx)
	builder := admin2.NewClientsetBuilder().WithConfig(adminCfg)

	clientset, err := builder.Build(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize clientset. Error: %w", err)
	}

	return clientset.AdminClient(), nil
}

// ConstructEventSink creates the EventSink selected by config.Type:
//
//   - EventSinkLog:   a log sink
//   - EventSinkFile:  a file sink writing to config.FilePath
//   - EventSinkAdmin: an admin sink backed by a client built from the admin configuration
//     and an oppo bloom filter whose metrics are registered under the "admin"/"filter"
//     sub-scopes of scope
//
// Any other type falls back to the stdout sink. Errors from building the admin client or
// the filter are returned unchanged.
func ConstructEventSink(ctx context.Context, config *Config, scope promutils.Scope) (EventSink, error) {
	if config.Type == EventSinkLog {
		return NewLogSink()
	}
	if config.Type == EventSinkFile {
		return NewFileSink(config.FilePath)
	}
	if config.Type != EventSinkAdmin {
		return NewStdoutSink()
	}

	adminClient, err := initializeAdminClientFromConfig(ctx)
	if err != nil {
		return nil, err
	}

	filterScope := scope.NewSubScope("admin").NewSubScope("filter")
	filter, err := fastcheck.NewOppoBloomFilter(50000, filterScope)
	if err != nil {
		return nil, err
	}

	return NewAdminEventSink(ctx, adminClient, config, filter)
}
68 changes: 68 additions & 0 deletions flytepropeller/events/admin_eventsink_integration_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// +build integration
// Add this tag to your project settings if you want to pick it up.

package events

import (
	"context"
	"fmt"
	netUrl "net/url"
	"testing"
	"time"

	"github.com/flyteorg/flyteidl/clients/go/admin"
	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/flyteorg/flyteidl/gen/pb-go/flyteidl/event"
	"github.com/flyteorg/flytestdlib/config"
	"github.com/flyteorg/flytestdlib/fastcheck"
	"github.com/flyteorg/flytestdlib/promutils"
	"github.com/golang/protobuf/ptypes"
	"github.com/stretchr/testify/assert"
)

// Package-level fixtures for the admin event sink integration test.
var (
// u is the address of the admin service under test. The parse error is discarded; if
// parsing failed u would be nil and the dereference below would panic at package init.
// NOTE(review): "localhost:8089" has no scheme prefix — confirm it parses into the
// endpoint form the admin client expects.
u, _ = netUrl.Parse("localhost:8089")
// adminServiceConfig points the admin client at u over an insecure connection, with a
// one-second per-retry timeout and a single retry.
adminServiceConfig = admin.Config{
Endpoint: config.URL{URL: *u},
UseInsecureConnection: true,
PerRetryTimeout: config.Duration{1 * time.Second},
MaxRetries: 1,
}
)

// TestAdminEventSinkTimeout sends one workflow event to a live admin service.
//
// To run this test and see whether the deadline is working, pick an existing successful
// execution from your admin database:
//   select * from executions;
// Then delete all the events from it:
//   delete from execution_events where execution_name = 'ikuy55mn0y';
// Then run this:
//   begin work; lock table executions in ACCESS EXCLUSIVE mode; SELECT pg_sleep(20); commit work;
// This will lock your table so that admin can't read it, causing the grpc call to time out.
// On timeout, you should get a deadline exceeded error. Otherwise, you should get an error to
// the effect of "Invalid phase change from SUCCEEDED to RUNNING" or something like that.
// Lastly, be sure to port forward admin, or change the url above to the dns name if running
// in-cluster.
func TestAdminEventSinkTimeout(t *testing.T) {
	ctx := context.Background()
	fmt.Println(u.Scheme)

	adminClient := admin.InitializeAdminClient(ctx, adminServiceConfig)

	eventSinkConfig := &Config{
		Rate:     1,
		Capacity: 1,
	}

	// NewAdminEventSink requires a filter for already-sent events; build one the same
	// way ConstructEventSink does, registered under a throwaway test scope.
	filter, err := fastcheck.NewOppoBloomFilter(50000, promutils.NewTestScope())
	if err != nil {
		t.Fatalf("failed to create event filter: %v", err)
	}

	eventSink, err := NewAdminEventSink(ctx, adminClient, eventSinkConfig, filter)
	if err != nil {
		// Stop here: eventSink is not usable if construction failed.
		t.Fatalf("failed to create admin event sink: %v", err)
	}

	wfEvent := &event.WorkflowExecutionEvent{
		Phase:      core.WorkflowExecution_RUNNING,
		OccurredAt: ptypes.TimestampNow(),
		ExecutionId: &core.WorkflowExecutionIdentifier{
			Project: "flyteexamples",
			Domain:  "development",
			Name:    "ikuy55mn0y",
		},
		ProducerId:   "testproducer",
		OutputResult: &event.WorkflowExecutionEvent_OutputUri{"s3://blah/blah/blah"},
	}

	err = eventSink.Sink(ctx, wfEvent)
	assert.NoError(t, err)
}
Loading

0 comments on commit 7b09010

Please sign in to comment.