Skip to content

Commit

Permalink
Merge pull request #2113 from keboola/fix-add-streamid-into-events
Browse files Browse the repository at this point in the history
fix: add branchId and streamid into events
  • Loading branch information
jachym-tousek-keboola authored Oct 25, 2024
2 parents b63dd04 + b363d19 commit 16b56a5
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 4 deletions.
4 changes: 4 additions & 0 deletions internal/pkg/service/stream/api/service/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,9 @@ func (s *service) CreateSource(ctx context.Context, d dependencies.BranchRequest
formatMsg,
bridge.Params{
ProjectID: d.ProjectID(),
BranchID: d.Branch().BranchID,
SourceID: source.SourceID,
SourceKey: source.SourceKey,
SourceName: source.Name,
},
)
Expand Down Expand Up @@ -209,7 +211,9 @@ func (s *service) DeleteSource(ctx context.Context, d dependencies.SourceRequest
formatMsg,
bridge.Params{
ProjectID: d.ProjectID(),
BranchID: d.Branch().BranchID,
SourceID: source.SourceID,
SourceKey: source.SourceKey,
SourceName: source.Name,
},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ const (

type Params struct {
ProjectID keboola.ProjectID
BranchID keboola.BranchID
SourceID key.SourceID
SourceKey key.SourceKey
SourceName string
SinkID key.SinkID
Stats statistics.Value
Expand Down Expand Up @@ -75,7 +77,9 @@ func (b *Bridge) SendSliceUploadEvent(
formatMsg,
Params{
ProjectID: sliceKey.ProjectID,
BranchID: sliceKey.BranchID,
SourceID: sliceKey.SourceID,
SourceKey: sliceKey.SourceKey,
SinkID: sliceKey.SinkID,
Stats: stats,
},
Expand Down Expand Up @@ -126,7 +130,9 @@ func (b *Bridge) SendFileImportEvent(
formatMsg,
Params{
ProjectID: fileKey.ProjectID,
BranchID: fileKey.BranchID,
SourceID: fileKey.SourceID,
SourceKey: fileKey.SourceKey,
SinkID: fileKey.SinkID,
Stats: stats,
},
Expand Down Expand Up @@ -157,7 +163,9 @@ func SendEvent(
Duration: client.DurationSeconds(duration),
Results: map[string]any{
"projectId": params.ProjectID,
"branchId": params.BranchID,
"sourceId": params.SourceID,
"streamId": params.SourceKey.String(),
"sinkId": params.SinkID,
},
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestBridge_SendSliceUploadEvent_OkEvent(t *testing.T) {
"duration": 3,
"message": "Slice upload done.",
"params": "null",
"results": "{\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T20:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600}}",
"results": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T20:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}",
"type": "info"
}`, body)
}
Expand Down Expand Up @@ -84,7 +84,7 @@ func TestBridge_SendSliceUploadEvent_ErrorEvent(t *testing.T) {
"duration": 3,
"message": "Slice upload failed.",
"params": "null",
"results": "{\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\"}",
"results": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}",
"type": "error"
}`, body)
}
Expand Down Expand Up @@ -139,7 +139,7 @@ func TestBridge_SendFileImportEvent_OkEvent(t *testing.T) {
"duration": 3,
"message": "File import done.",
"params": "null",
"results": "{\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T01:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600}}",
"results": "{\"branchId\":456,\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"statistics\":{\"compressedSize\":52428800,\"firstRecordAt\":\"2000-01-01T01:00:00.000Z\",\"lastRecordAt\":\"2000-01-02T01:00:00.000Z\",\"recordsCount\":123,\"slicesCount\":1,\"stagingSize\":26214400,\"uncompressedSize\":104857600},\"streamId\":\"123/456/my-source\"}",
"type": "info"
}`, body)
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func TestBridge_SendFileImportEvent_ErrorEvent(t *testing.T) {
"duration": 3,
"message": "File import failed.",
"params": "null",
"results": "{\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\"}",
"results": "{\"branchId\":456,\"error\":\"some error\",\"projectId\":123,\"sinkId\":\"my-sink\",\"sourceId\":\"my-source\",\"streamId\":\"123/456/my-source\"}",
"type": "error"
}`, body)
}
Expand Down

0 comments on commit 16b56a5

Please sign in to comment.