NETOBSERV-578: removing arrays from pipeline interfaces #319
Conversation
New image: ["quay.io/netobserv/flowlogs-pipeline:1e0449c"]. It will expire after two weeks.
// copy input entry before transform to avoid alteration on parallel stages
outputEntry := inputEntry.Copy()

// TODO: for efficiency and maintainability, maybe each case in the switch below should be an individual implementation of Transformer
for _, rule := range n.Rules {
	switch rule.Type {
	case api.TransformNetworkOperationName("AddRegExIf"):
The logic behind this change is that the result of this invocation is only known at runtime, while the value of api.OpAddRegexIf is known at compile time. That allows a much more efficient implementation of the switch, which matters especially here because it is invoked for each flow, for each rule.
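A minimal sketch of the difference, using simplified stand-in constants (the real constants live in the api package; the names and values below are assumptions for illustration only):

```go
package transform

// Before the change, each case re-computed its label at runtime:
//
//	case api.TransformNetworkOperationName("AddRegExIf"):
//
// After the change, the cases are compile-time string constants, so the
// compiler can generate an efficient switch with no per-flow, per-rule
// function calls.
const (
	opAddRegexIf = "add_regex_if" // assumed value, for illustration only
	opAddSubnet  = "add_subnet"
)

func applyRule(ruleType string) string {
	switch ruleType {
	case opAddRegexIf: // constant case, resolved at compile time
		return "AddRegExIf"
	case opAddSubnet:
		return "AddSubnet"
	default:
		return "unknown rule"
	}
}
```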
pkg/pipeline/ingest/metrics.go
-	batchSizeSummary = operational.DefineMetric(
+	flowsProcessed = operational.DefineMetric(
 		"ingest_flows_processed", // This is intentionally named to emphasize its utility for flows counting, despite being a batch size distribution
-		"Provides number of flows processed, batches processed, and batch size stats (in number of flows)",
-		operational.TypeSummary,
+		"Provides number of flows processed",
+		operational.TypeCounter,
 		"stage",
 	)
-	batchSizeBytesSummary = operational.DefineMetric(
-		"ingest_batch_size_bytes",
-		"Ingested batch size distribution, in bytes",
-		operational.TypeSummary,
@jotak This PR reverts part of your previous work, since we don't batch anymore at ingest time.
Another option would be to keep this batch size summary and apply it at the gRPC interceptor, so we know the batch sizes from the client. For the Kafka use case, the summary would simply report that every batch has size 1.
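For context, a hypothetical sketch of that interceptor option. The metric name and the flowBatch accessor are assumptions; the real ingest request type would expose its records differently:

```go
package ingest

import (
	"context"

	"github.com/prometheus/client_golang/prometheus"
	"google.golang.org/grpc"
)

// batchSize records how many flows a client packs into each request.
// Name and help text are illustrative, not the project's actual metric.
var batchSize = prometheus.NewSummary(prometheus.SummaryOpts{
	Name: "ingest_batch_size_flows",
	Help: "Client batch size distribution, in number of flows",
})

// flowBatch is a stand-in for however the real request exposes its records.
type flowBatch interface{ Records() []interface{} }

// batchSizeInterceptor observes the size of every incoming batch before the
// pipeline splits it into individual entries.
func batchSizeInterceptor(ctx context.Context, req interface{},
	info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
	if b, ok := req.(flowBatch); ok {
		batchSize.Observe(float64(len(b.Records())))
	}
	return handler(ctx, req)
}

func newServer() *grpc.Server {
	prometheus.MustRegister(batchSize)
	return grpc.NewServer(grpc.UnaryInterceptor(batchSizeInterceptor))
}
```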
(Another option: just keep different metrics, flowsProcessed and ingestBytes for the Kafka ingest, and batchSizeSummary and batchSizeBytesSummary for the gRPC ingest.)
The idea behind these metrics was to get stats such as the effective size of the agent's batches. As you said, it doesn't work with Kafka because kafkago hides this detail, but it's still interesting for gRPC.
Since they measure the ingesters' input and not their output, I think removing the batches on output shouldn't affect them.
About using different metric names with or without Kafka: the drawback is that they're harder to consume, since we'd need to create more Prometheus queries and Grafana dashboards. We need a compromise somewhere anyway; personally, I'd like to stick with the previous metrics.
Sounds reasonable. Bringing them back.
It seems this PR is not possible unless we reimplement the connection tracking stage. @ronensc do you have an estimation of how difficult it would be to reimplement the function

func (ct *conntrackImpl) Extract(flowLogs []config.GenericMap) []config.GenericMap

as:

func (ct *conntrackImpl) Extract(flowLogs config.GenericMap) config.GenericMap

Currently it seems it's not able to detect endConnections.
Not only the connection tracking, but also the aggregation and timebased-topk stages would need changes. In each of these we save some state and do some computation after batches of flow logs. If the flow logs now come one at a time, we have to make sure that our implementation only runs every so often, and not after each call to Extract().
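One way to picture that constraint: a single-entry Extract that keeps accumulating state and only emits results once a flush interval has elapsed. This is a hypothetical sketch, not the real conntrack implementation; statefulExtractor and its interval field are made up for illustration (note that it still has to return a slice on flush, which hints at why the strict single-in/single-out signature above is awkward):

```go
package extract

import (
	"sync"
	"time"

	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

// statefulExtractor accumulates flow logs one at a time and only produces
// output periodically, instead of after every call.
type statefulExtractor struct {
	mu        sync.Mutex
	pending   []config.GenericMap
	lastFlush time.Time
	interval  time.Duration
}

// Extract updates internal state with a single flow log; it returns nil on
// most calls and a batch of results only when the flush interval elapsed.
func (e *statefulExtractor) Extract(flowLog config.GenericMap) []config.GenericMap {
	e.mu.Lock()
	defer e.mu.Unlock()
	e.pending = append(e.pending, flowLog)
	if time.Since(e.lastFlush) < e.interval {
		return nil // not time yet: just keep the state
	}
	out := e.pending
	e.pending = nil
	e.lastFlush = time.Now()
	return out
}
```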
@jotak @ronensc @KalmanMeth @eranra reopening this PR as I found a compromise solution. I added an intermediate batcher for those parts of the API that really need to work in batches, like the connection tracking and the timebased-topk. This improves overall performance and memory, especially in FLP configurations that do not use extractors. At the same time, it gives us more room for a later reimplementation of the extractors that avoids batching.
}
// TODO: replace batcher by rewriting the different extractor implementations
// to keep the status while processing flows one by one
utils.Batcher(utils.ExitChannel(), b.batchMaxLen, b.batchTimeout, in,
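For readers outside the diff, a minimal sketch of what such a batcher could look like. The call above is truncated, so the trailing action callback and the exact channel types are assumptions:

```go
package utils

import (
	"time"

	"github.com/netobserv/flowlogs-pipeline/pkg/config"
)

// Batcher groups single entries from "in" into slices and hands each slice
// to "action", flushing when maxLen entries have accumulated or when the
// timeout elapses. Sketch only; the real signature may differ.
func Batcher(
	closeCh <-chan struct{},
	maxLen int,
	timeout time.Duration,
	in <-chan config.GenericMap,
	action func([]config.GenericMap),
) {
	batch := make([]config.GenericMap, 0, maxLen)
	flush := func() {
		if len(batch) > 0 {
			action(batch)
			batch = make([]config.GenericMap, 0, maxLen)
		}
	}
	ticker := time.NewTicker(timeout)
	defer ticker.Stop()
	for {
		select {
		case <-closeCh:
			flush()
			return
		case entry, ok := <-in:
			if !ok {
				flush()
				return
			}
			batch = append(batch, entry)
			if len(batch) >= maxLen {
				flush()
			}
		case <-ticker.C:
			// time-based flush so slow streams don't hold entries forever
			flush()
		}
	}
}
```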
I didn't provide unit tests for Batcher, as it is already tested as an internal component in the conntrack integration tests.
Nice idea to build the Batcher in this way for all the stages that need it!
LGTM, currently deployed and working as expected on the LLC cluster.
I'll let it run and compare after some hours 👍
Thanks @mariomac
This PR removes the batching logic in the Kafka ingester and processes flows one by one in each pipeline stage.
While the CPU usage doesn't seem to noticeably decrease, it greatly improves memory consumption (and simplifies some parts of the code).