diff --git a/internal/pkg/service/stream/api/service/source.go b/internal/pkg/service/stream/api/service/source.go index a3a7cc4c37..3ba1e9291c 100644 --- a/internal/pkg/service/stream/api/service/source.go +++ b/internal/pkg/service/stream/api/service/source.go @@ -65,7 +65,9 @@ func (s *service) CreateSource(ctx context.Context, d dependencies.BranchRequest formatMsg, bridge.Params{ ProjectID: d.ProjectID(), + BranchID: d.Branch().BranchID, SourceID: source.SourceID, + SourceKey: source.SourceKey, SourceName: source.Name, }, ) @@ -209,7 +211,9 @@ func (s *service) DeleteSource(ctx context.Context, d dependencies.SourceRequest formatMsg, bridge.Params{ ProjectID: d.ProjectID(), + BranchID: d.Branch().BranchID, SourceID: source.SourceID, + SourceKey: source.SourceKey, SourceName: source.Name, }, ) diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go index 3b2cb7273a..2077c3f033 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go @@ -32,7 +32,9 @@ const ( type Params struct { ProjectID keboola.ProjectID + BranchID keboola.BranchID SourceID key.SourceID + SourceKey key.SourceKey SourceName string SinkID key.SinkID Stats statistics.Value @@ -75,7 +77,9 @@ func (b *Bridge) SendSliceUploadEvent( formatMsg, Params{ ProjectID: sliceKey.ProjectID, + BranchID: sliceKey.BranchID, SourceID: sliceKey.SourceID, + SourceKey: sliceKey.SourceKey, SinkID: sliceKey.SinkID, Stats: stats, }, @@ -126,7 +130,9 @@ func (b *Bridge) SendFileImportEvent( formatMsg, Params{ ProjectID: fileKey.ProjectID, + BranchID: fileKey.BranchID, SourceID: fileKey.SourceID, + SourceKey: fileKey.SourceKey, SinkID: fileKey.SinkID, Stats: stats, }, @@ -157,7 +163,9 @@ func SendEvent( Duration: client.DurationSeconds(duration), Results: map[string]any{ "projectId": params.ProjectID, + "branchId": params.BranchID, "sourceId": params.SourceID, + "streamId": params.SourceKey.String(), "sinkId": params.SinkID, }, } diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event_test.go index 1548b8958f..978f67ba42 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event_test.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event_test.go @@ -52,7 +52,7 @@ func TestBridge_SendSliceUploadEvent_OkEvent(t *testing.T) { "duration": 3, "message": "Slice upload done.", "params": "null", - "results": "{\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T20:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600}}", + "results": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T20:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}", "type": "info" }`, body) } @@ -84,7 +84,7 @@ func TestBridge_SendSliceUploadEvent_ErrorEvent(t *testing.T) { "duration": 3, "message": "Slice upload failed.", "params": "null", - "results": "{\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\"}", + "results": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}", "type": "error" }`, body) } @@ -139,7 +139,7 @@ func TestBridge_SendFileImportEvent_OkEvent(t *testing.T) { "duration": 3, "message": "File import done.", "params": "null", - "results": "{\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T01:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600}}", + "results": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T01:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}", "type": "info" }`, body) } @@ -171,7 +171,7 @@ func TestBridge_SendFileImportEvent_ErrorEvent(t *testing.T) { "duration": 3, "message": "File import failed.", "params": "null", - "results": "{\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\"}", + "results": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}", "type": "error" }`, body) }