Skip to content

Commit

Permalink
Initialise the chan in the event source
Browse files Browse the repository at this point in the history
  • Loading branch information
peter-hazell committed Nov 7, 2024
1 parent 9a44d5a commit 5366ab7
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 16 deletions.
3 changes: 0 additions & 3 deletions provider-service/internal/outputhandlers/http_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,7 @@ func (hws HttpWebhookSink) In() chan<- any {

func (hws HttpWebhookSink) SendEvents() {
logger := common.LoggerFromContext(hws.context)
logger.Info("in SendEvents")
for data := range hws.in {
logger.Info("In range loop")
var err error
switch object := data.(type) {
case base.StreamMessage:
Expand All @@ -68,7 +66,6 @@ func (hws HttpWebhookSink) SendEvents() {
}

}
logger.Info("Stopped reading from input channel")
}

func (hws HttpWebhookSink) buildRequest(bodyBytes []byte) (*http.Request, error) {
Expand Down
22 changes: 9 additions & 13 deletions provider-service/internal/vai/event_data_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ func NewVAIEventDataSource(ctx context.Context, provider string, namespace strin
RunsSubscription: runsSubscription,
PipelineJobClient: *pipelineJobClient,
Logger: logger,
out: make(chan any),
}

go func() {
Expand All @@ -117,7 +118,7 @@ func (s VAIEventDataSource) subscribe(ctx context.Context) error {
s.Logger.Info("subscribing to pubsub...")

err := s.RunsSubscription.Receive(ctx, func(ctx context.Context, m *pubsub.Message) {
s.Logger.Info("event received")
s.Logger.Info(fmt.Sprintf("message received from Pub/Sub with ID: %s", m.ID))
logEntry := VAILogEntry{}
err := json.Unmarshal(m.Data, &logEntry)
if err != nil {
Expand All @@ -142,7 +143,13 @@ func (s VAIEventDataSource) subscribe(ctx context.Context) error {
}

select {
case s.out <- s.logAndReturnStreamMessage(event, m):
case s.out <- StreamMessage{
RunCompletionEventData: *event,
OnCompleteHandlers: OnCompleteHandlers{
OnSuccessHandler: func() { m.Ack() },
OnFailureHandler: func() { m.Nack() },
},
}:
case <-ctx.Done():
s.Logger.Info("stopped reading from pubsub")
return
Expand All @@ -157,17 +164,6 @@ func (s VAIEventDataSource) subscribe(ctx context.Context) error {
return nil
}

func (s VAIEventDataSource) logAndReturnStreamMessage(rced *common.RunCompletionEventData, pubsubMessage *pubsub.Message) StreamMessage {
s.Logger.Info("Constructing stream message")
return StreamMessage{
RunCompletionEventData: *rced,
OnCompleteHandlers: OnCompleteHandlers{
OnSuccessHandler: func() { pubsubMessage.Ack() },
OnFailureHandler: func() { pubsubMessage.Nack() },
},
}
}

func (s VAIEventDataSource) runCompletionEventDataForRun(ctx context.Context, runId string) *common.RunCompletionEventData {
job, err := s.PipelineJobClient.GetPipelineJob(ctx, &aiplatformpb.GetPipelineJobRequest{
Name: s.ProviderConfig.pipelineJobName(runId),
Expand Down

0 comments on commit 5366ab7

Please sign in to comment.