From e964739b1e8f763dc391e2216656af8b5207e9fe Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Va=C5=A1ko?= Date: Tue, 22 Oct 2024 09:54:36 +0200 Subject: [PATCH 1/2] fix: Add streamID into events --- internal/pkg/service/stream/api/service/source.go | 2 ++ .../stream/sink/type/tablesink/keboola/bridge/event.go | 5 +++++ 2 files changed, 7 insertions(+) diff --git a/internal/pkg/service/stream/api/service/source.go b/internal/pkg/service/stream/api/service/source.go index 18a43be5c7..044fc8bcd7 100644 --- a/internal/pkg/service/stream/api/service/source.go +++ b/internal/pkg/service/stream/api/service/source.go @@ -58,6 +58,7 @@ func (s *service) CreateSource(ctx context.Context, d dependencies.BranchRequest formatMsg, bridge.Params{ ProjectID: d.ProjectID(), + BranchID: d.Branch().BranchID, SourceID: source.SourceID, SourceName: source.Name, }, @@ -190,6 +191,7 @@ func (s *service) DeleteSource(ctx context.Context, d dependencies.SourceRequest formatMsg, bridge.Params{ ProjectID: d.ProjectID(), + BranchID: d.Branch().BranchID, SourceID: source.SourceID, SourceName: source.Name, }, diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go index 3b2cb7273a..b2a70f5298 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go @@ -32,6 +32,7 @@ const ( type Params struct { ProjectID keboola.ProjectID + BranchID keboola.BranchID SourceID key.SourceID SourceName string SinkID key.SinkID @@ -75,6 +76,7 @@ func (b *Bridge) SendSliceUploadEvent( formatMsg, Params{ ProjectID: sliceKey.ProjectID, + BranchID: sliceKey.BranchID, SourceID: sliceKey.SourceID, SinkID: sliceKey.SinkID, Stats: stats, @@ -126,6 +128,7 @@ func (b *Bridge) SendFileImportEvent( formatMsg, Params{ ProjectID: fileKey.ProjectID, + BranchID: fileKey.BranchID, SourceID: fileKey.SourceID, SinkID: fileKey.SinkID, Stats: stats, @@ -157,7 +160,9 @@ func SendEvent( Duration: client.DurationSeconds(duration), Results: map[string]any{ "projectId": params.ProjectID, + "branchId": params.BranchID, "sourceId": params.SourceID, + "streamId": params.SourceID.String(), "sinkId": params.SinkID, }, } From b363d1985320de422ffe2b82b279bc0cc2a42d4e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Martin=20Va=C5=A1ko?= Date: Tue, 22 Oct 2024 10:34:27 +0200 Subject: [PATCH 2/2] fix: Add SourceKey into event as composite key --- internal/pkg/service/stream/api/service/source.go | 2 ++ .../stream/sink/type/tablesink/keboola/bridge/event.go | 5 ++++- .../sink/type/tablesink/keboola/bridge/event_test.go | 8 ++++---- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/internal/pkg/service/stream/api/service/source.go b/internal/pkg/service/stream/api/service/source.go index 044fc8bcd7..e95c0d82ec 100644 --- a/internal/pkg/service/stream/api/service/source.go +++ b/internal/pkg/service/stream/api/service/source.go @@ -60,6 +60,7 @@ func (s *service) CreateSource(ctx context.Context, d dependencies.BranchRequest ProjectID: d.ProjectID(), BranchID: d.Branch().BranchID, SourceID: source.SourceID, + SourceKey: source.SourceKey, SourceName: source.Name, }, ) @@ -193,6 +194,7 @@ func (s *service) DeleteSource(ctx context.Context, d dependencies.SourceRequest ProjectID: d.ProjectID(), BranchID: d.Branch().BranchID, SourceID: source.SourceID, + SourceKey: source.SourceKey, SourceName: source.Name, }, ) diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go index b2a70f5298..2077c3f033 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event.go @@ -34,6 +34,7 @@ type Params struct { ProjectID keboola.ProjectID BranchID keboola.BranchID SourceID key.SourceID + SourceKey key.SourceKey SourceName string SinkID key.SinkID Stats statistics.Value @@ -78,6 +79,7 @@ func (b *Bridge) SendSliceUploadEvent( ProjectID: sliceKey.ProjectID, BranchID: sliceKey.BranchID, SourceID: sliceKey.SourceID, + SourceKey: sliceKey.SourceKey, SinkID: sliceKey.SinkID, Stats: stats, }, @@ -130,6 +132,7 @@ func (b *Bridge) SendFileImportEvent( ProjectID: fileKey.ProjectID, BranchID: fileKey.BranchID, SourceID: fileKey.SourceID, + SourceKey: fileKey.SourceKey, SinkID: fileKey.SinkID, Stats: stats, }, @@ -162,7 +165,7 @@ func SendEvent( "projectId": params.ProjectID, "branchId": params.BranchID, "sourceId": params.SourceID, - "streamId": params.SourceID.String(), + "streamId": params.SourceKey.String(), "sinkId": params.SinkID, }, } diff --git a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event_test.go b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event_test.go index 1548b8958f..978f67ba42 100644 --- a/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event_test.go +++ b/internal/pkg/service/stream/sink/type/tablesink/keboola/bridge/event_test.go @@ -52,7 +52,7 @@ func TestBridge_SendSliceUploadEvent_OkEvent(t *testing.T) { "duration": 3, "message": "Slice upload done.", "params": "null", - "results": "{\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T20:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600}}", + "results": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T20:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}", "type": "info" }`, body) } @@ -84,7 +84,7 @@ func TestBridge_SendSliceUploadEvent_ErrorEvent(t *testing.T) { "duration": 3, "message": "Slice upload failed.", "params": "null", - "results": "{\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\"}", + "results": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}", "type": "error" }`, body) } @@ -139,7 +139,7 @@ func TestBridge_SendFileImportEvent_OkEvent(t *testing.T) { "duration": 3, "message": "File import done.", "params": "null", - "results": "{\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T01:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600}}", + "results": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T01:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}", "type": "info" }`, body) } @@ -171,7 +171,7 @@ func TestBridge_SendFileImportEvent_ErrorEvent(t *testing.T) { "duration": 3, "message": "File import failed.", "params": "null", - "results": "{\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\"}", + "results": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}", "type": "error" }`, body) }