
Proposal: Add filter predicate to MetricReader (push-down predicate) #3324

Closed
asafm opened this issue Mar 19, 2023 · 15 comments · Fixed by #3566
Labels: area:sdk (Related to the SDK), triaged-needmoreinfo (The issue is triaged; the OTel community needs more information to decide), spec:metrics (Related to the specification/metrics directory)

Comments

@asafm
Contributor

asafm commented Mar 19, 2023

What are you trying to achieve?
As a user, I would like to define a predicate (programmatically, at first) on MetricReader that allows me to read only a portion of the metrics available in the SDK metric producer or any other MetricProducer.

This will be marked as experimental in the spec.

What did you expect to see?

A new operation in MetricReader: setPredicate(predicate)

Where predicate will be a function matching the following interface:
(instrumentationScope, instrumentName, instrumentDescription, instrumentKind, attributes, unit) -> boolean
The metric reader will provide that function to every registered MetricProducer when calling collect(). Each producer will use that function to determine whether to produce a metric for a given attribute set of an instrument. A true value means the data point will be included in the result of the metric producer, and thereby handled by whatever the reader does with the data point.

A new optional parameter to the Produce() operation of MetricProducer:
produce(predicate), where predicate is defined as above.
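A minimal sketch of how the proposed predicate and `produce(predicate)` could fit together, assuming a toy in-memory producer written in Python. `ToyProducer`, `InstrumentInfo`, `InstrumentKind`, and `accept_all` are hypothetical names for illustration only, not part of any OpenTelemetry SDK API.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Callable, Dict, List, Mapping, Optional, Tuple


class InstrumentKind(Enum):
    COUNTER = "counter"
    UP_DOWN_COUNTER = "up_down_counter"
    HISTOGRAM = "histogram"
    OBSERVABLE_COUNTER = "observable_counter"
    OBSERVABLE_GAUGE = "observable_gauge"


@dataclass(frozen=True)
class InstrumentInfo:
    """Hypothetical descriptor of an instrument (names as seen after views)."""
    scope: str
    name: str
    description: str
    kind: InstrumentKind
    unit: str


# (instrumentationScope, instrumentName, instrumentDescription,
#  instrumentKind, attributes, unit) -> bool, as in the proposal
Predicate = Callable[[str, str, str, InstrumentKind, Mapping[str, str], str], bool]
AttributeKey = Tuple[Tuple[str, str], ...]  # sorted, hashable attribute set


def accept_all(scope, name, description, kind, attributes, unit) -> bool:
    return True


@dataclass
class ToyProducer:
    """Hypothetical in-memory MetricProducer keeping cumulative sums per series."""
    storage: Dict[InstrumentInfo, Dict[AttributeKey, float]] = field(default_factory=dict)

    def record(self, inst: InstrumentInfo, attributes: Mapping[str, str], value: float) -> None:
        key = tuple(sorted(attributes.items()))
        series = self.storage.setdefault(inst, {})
        series[key] = series.get(key, 0.0) + value

    def produce(self, predicate: Optional[Predicate] = None) -> List[Tuple[str, Dict[str, str], float]]:
        """Return (instrument name, attributes, value) for every accepted series."""
        predicate = predicate or accept_all
        points = []
        for inst, series in self.storage.items():
            for key, value in series.items():
                attrs = dict(key)
                # The output point is only built when the predicate accepts the series.
                if predicate(inst.scope, inst.name, inst.description, inst.kind, attrs, inst.unit):
                    points.append((inst.name, attrs, value))
        return points


topic_bytes = InstrumentInfo("pulsar", "pulsar.messaging.topic.bytes.in",
                             "Bytes received per topic", InstrumentKind.COUNTER, "By")
producer = ToyProducer()
producer.record(topic_bytes, {"topic": "t1"}, 10)
producer.record(topic_bytes, {"topic": "t2"}, 20)

only_t1 = producer.produce(lambda scope, name, desc, kind, attrs, unit: attrs.get("topic") == "t1")
print(only_t1)  # [('pulsar.messaging.topic.bytes.in', {'topic': 't1'}, 10.0)]
```

The predicate receives the same six fields listed in the proposal; a series it rejects never gets an output point allocated.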

Additional context.

I'm working on designing the integration of OpenTelemetry Metrics into Apache Pulsar.
Pulsar today supports up to 100k topics (partitions, in Kafka terms) in a single broker, and in the near future will support 1M topics per broker (possible because Pulsar persists messages into virtual append-only files in Apache BookKeeper, as opposed to actual file descriptors on disk).

With 100k topics in a single broker and 50 instruments per topic, this means 5M (5,000k) attribute sets overall. As you can imagine, that is too much, in terms of both response body size and cost. I planned two ways to solve it, and the second one is the reason for this proposal:

  1. Introduce new instruments that aggregate topic metrics to a new level called Metric Topic Groups (groups of topics), which should have reasonable cardinality.
  2. Given that most users will rely on the Metric Groups instruments, I would like to be able to filter out all topic-level instruments by default. When a user is alerted that something is wrong with a given topic group, they will want to dynamically "turn on" the topic-level instruments for the topics in that group.
    This can be achieved by decorating the registered MetricProducer and filtering the result list, but this means we pay the memory and CPU cost of collecting that many attribute sets (which can be quite a lot, as shown above). In latency-sensitive applications such as Apache Pulsar, every millisecond of latency counts.
    Hence, I wanted to be able to provide a filter function that the SDK metric producer will use to determine which (instrument, attributes) pairs to collect or skip, whether the instrument is synchronous or asynchronous (for a skipped pair, the callback's record() call will do nothing).
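The difference between decorating the producer and pushing the predicate down can be sketched with hypothetical classes that count allocated output points. This is a simplified Python model (the predicate takes only an instrument name and a topic), not the SDK's actual collection path.

```python
from typing import Callable, Dict, List, Tuple

Point = Tuple[str, str, float]          # (instrument name, topic, value)
Predicate = Callable[[str, str], bool]  # simplified: (instrument name, topic) -> bool


class CountingProducer:
    """Hypothetical producer that counts how many output points it allocates."""

    def __init__(self, series: Dict[Tuple[str, str], float]) -> None:
        self.series = series
        self.allocated = 0

    def produce(self, predicate: Predicate = lambda name, topic: True) -> List[Point]:
        out = []
        for (name, topic), value in self.series.items():
            if predicate(name, topic):  # push-down: skip before allocating
                out.append((name, topic, value))
                self.allocated += 1
        return out


class FilteringDecorator:
    """Wraps a producer and filters its complete output after the fact."""

    def __init__(self, inner: CountingProducer, predicate: Predicate) -> None:
        self.inner = inner
        self.predicate = predicate

    def produce(self) -> List[Point]:
        everything = self.inner.produce()  # allocates a point for every series
        return [p for p in everything if self.predicate(p[0], p[1])]


series = {("pulsar.messaging.topic.bytes.in", f"topic-{i}"): float(i) for i in range(1000)}


def wanted(name: str, topic: str) -> bool:
    return topic in {"topic-1", "topic-2"}


decorated = CountingProducer(series)
via_decorator = FilteringDecorator(decorated, wanted).produce()

pushed = CountingProducer(series)
via_pushdown = pushed.produce(wanted)

print(via_decorator == via_pushdown, decorated.allocated, pushed.allocated)  # True 1000 2
```

Both paths return the same points, but the decorator materializes a point for every series before discarding most of them.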

I believe this is also useful for other purposes. Imagine an application that wishes to expose some instruments and attributes via a REST API in a certain format. It can create a metric reader and set the filter to obtain those values. The alternative is to record the values manually in memory and expose them via asynchronous instruments (which is not possible for histograms).
This REST API could also back a nice UI showing the current metrics of the system.

Also, the Prometheus /metrics endpoint today supports this kind of dynamic filtering via query parameters (name[]). This proposal would make it more efficient and more accurate (with delta temporality, the count is reset upon collect, but the filtered-out data is then discarded further down the pipeline).
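As a sketch of how a `/metrics` endpoint could turn its `name[]` query parameters into a reader predicate, assuming the values are compared directly to instrument names. Real Prometheus exporters translate names (for example dots to underscores, plus unit or `_total` suffixes), which this ignores. `predicate_from_query` is a hypothetical helper; `urlsplit` and `parse_qs` come from the Python standard library.

```python
from typing import Callable
from urllib.parse import parse_qs, urlsplit


def predicate_from_query(url: str) -> Callable[..., bool]:
    """Build a hypothetical reader predicate from Prometheus-style name[] parameters."""
    names = set(parse_qs(urlsplit(url).query).get("name[]", []))

    def predicate(scope, name, description, kind, attributes, unit) -> bool:
        # With no name[] parameter, everything is exported, like an unfiltered scrape.
        return not names or name in names

    return predicate


only_topic_bytes = predicate_from_query(
    "http://broker:8080/metrics?name[]=pulsar.messaging.topic.bytes.in"
)
```

Because the predicate is built per request, each scrape can ask for a different subset without any reconfiguration of the SDK.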

@asafm asafm added the spec:metrics Related to the specification/metrics directory label Mar 19, 2023
@pirgeo
Member

pirgeo commented Mar 21, 2023

Does this lead to the same output as basically turning off all the (instrument, attribute_set) tuples that are not desirable? Maybe I am missing the point here, since I have never worked with Pulsar, but why record them in the first place if you intend to disable them later?

@asafm
Contributor Author

asafm commented Mar 21, 2023

Turning off --> delete?
Think of it as DEBUG log-level. You record logs that in most cases you don't need, right? But in some cases you need to turn on DEBUG level to better understand what goes on, and then you turn it off.

Apache Pulsar is a messaging system like Kafka. It has topics, but unlike Kafka, it supports a huge number of topics, resulting in 10k to 100k topics per broker.

Topic groups are the key to reducing cardinality while still being able to monitor Pulsar.

You will have instruments for topic level granularity, each having 100k attribute sets (1 per topic), and instruments for topic group level granularity (1 per group), each having 1k attribute sets.

By default, topic level will be filtered out, and you will see the group-level instruments. Once you see a group misbehave, you turn it on to export only that group's topics. This will be controlled dynamically by the user through the predicate provided to the metric reader.

Once you have understood and solved the issue, you can turn off the metrics for that topic group, and you can even decide to monitor a specific topic.
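The "DEBUG level" toggle described above could be modeled as a mutable predicate object handed to the reader once and changed at runtime. This is a hypothetical sketch: `DebugMetricsSwitch`, the `pulsar.messaging.topic.` name prefix, and the `group`/`topic` attribute keys are assumptions for illustration.

```python
import threading
from typing import Mapping

TOPIC_PREFIX = "pulsar.messaging.topic."  # hypothetical naming convention


class DebugMetricsSwitch:
    """Hypothetical runtime switch usable as a reader predicate.

    Instruments outside TOPIC_PREFIX (group and namespace level) are always
    exported; topic-level series only for enabled groups or individual topics.
    """

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._groups: frozenset = frozenset()
        self._topics: frozenset = frozenset()

    def enable_group(self, group: str) -> None:
        with self._lock:
            self._groups = self._groups | {group}

    def disable_group(self, group: str) -> None:
        with self._lock:
            self._groups = self._groups - {group}

    def enable_topic(self, topic: str) -> None:
        with self._lock:
            self._topics = self._topics | {topic}

    def disable_topic(self, topic: str) -> None:
        with self._lock:
            self._topics = self._topics - {topic}

    def __call__(self, scope, name, description, kind, attributes: Mapping[str, str], unit) -> bool:
        if not name.startswith(TOPIC_PREFIX):
            return True
        # The sets are immutable and replaced wholesale under the lock,
        # so the collect path can read them without locking.
        return attributes.get("group") in self._groups or attributes.get("topic") in self._topics


switch = DebugMetricsSwitch()
switch.enable_group("g1")  # e.g. after an alert on topic group g1
```

Using immutable sets keeps the per-series check on the collect path lock-free, while toggles from an admin endpoint stay serialized.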

@pirgeo
Member

pirgeo commented Mar 21, 2023

I see. What is the difference when compared to Views?

@jack-berg
Member

I think this could make sense. Couple of thoughts:

  1. Would be good to see a performance comparison between reading all the data and applying the predicate in the reader, versus pushing the predicate down into the SDK. It may be the case that even with a large number of series, it doesn't make an impactful difference. For example, with the Java SDK prototype that reuses objects, allocations for reading metrics are brought to nearly zero. There could be some extra CPU overhead for reading all the metrics when only some are needed, but there's also additional CPU overhead for evaluating the predicate on every series. A benchmark would clear things up.
  2. I think for completeness the predicate would need to accept all the parameters that describe each point, including scope, instrument name, type, description, unit, point kind, and attributes. The instrument name and description should be the names after any views have been applied, rather than before.
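A benchmark for point 1 could have roughly this shape. It is a Python stand-in that only illustrates the two strategies being compared; its timings say nothing about the Java SDK or its object-reuse prototype, and the series counts are arbitrary assumptions. Both strategies evaluate the predicate on every series, which is the CPU overhead mentioned above.

```python
import functools
import timeit
from typing import Callable, Dict, List, Tuple

Series = Dict[Tuple[str, str], float]
Predicate = Callable[[str, str], bool]
Point = Tuple[str, str, float]


def make_series(n_instruments: int, n_topics: int) -> Series:
    return {(f"inst-{i}", f"topic-{t}"): 1.0 for i in range(n_instruments) for t in range(n_topics)}


def read_all_then_filter(series: Series, predicate: Predicate) -> List[Point]:
    # Reader-side filtering: materialize every point, then drop the unwanted ones.
    points = [(name, topic, value) for (name, topic), value in series.items()]
    return [p for p in points if predicate(p[0], p[1])]


def push_down(series: Series, predicate: Predicate) -> List[Point]:
    # Push-down: evaluate the predicate first and materialize only accepted points.
    return [(name, topic, value) for (name, topic), value in series.items() if predicate(name, topic)]


if __name__ == "__main__":
    series = make_series(n_instruments=5, n_topics=10_000)
    enabled = {f"topic-{t}" for t in range(100)}

    def is_enabled(name: str, topic: str) -> bool:
        return topic in enabled

    for strategy in (read_all_then_filter, push_down):
        seconds = timeit.timeit(functools.partial(strategy, series, is_enabled), number=10)
        print(f"{strategy.__name__}: {seconds:.3f}s for 10 collections")
```

A real comparison would also need to measure retained memory, not only CPU time, since that is where the two approaches are expected to differ most.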

@asafm
Contributor Author

asafm commented Mar 22, 2023

I see. What is the difference when compared to Views?

  1. Views are not dynamic. They are dictated on initialization.
  2. Views are scoped to an instrument, not a single attribute set. You can either drop the entire instrument or not.

@asafm
Contributor Author

asafm commented Mar 22, 2023

  1. Would be good to see a performance comparison between reading all the data and applying the predicate in the reader, versus pushing the predicate down into the SDK. It may be the case that even with a large number of series, it doesn't make an impactful difference. For example, with the Java SDK prototype that reuses objects, allocations for reading metrics are brought to nearly zero. There could be some extra CPU overhead for reading all the metrics when only some are needed, but there's also additional CPU overhead for evaluating the predicate on every series. A benchmark would clear things up.

Totally agree. I was planning to do such a comparison, once I reach that phase.
In zero-allocation SDKs, such as the branch you started working on, the lasting cost would probably be high memory usage. If by default in Pulsar I filter out 90% (I only export the topic metric group level instruments), and in problematic times I open several groups, each time a different group, I will end up with, say, 10% of the attribute result objects stored in memory. Without the push-down filter, I will need 100% of the attribute result objects and then filter out 90% of them. So memory usage is almost 10x higher. This is why I think it makes sense to have this filter (performance-wise).

  2. I think for completeness the predicate would need to accept all the parameters that describe each point, including scope, instrument name, type, description, unit, point kind, and attributes. The instrument name and description should be the names after any views have been applied, rather than before.

Awesome suggestion. I'm revising my proposal to include it.

@pirgeo
Member

pirgeo commented Mar 22, 2023

Views are not dynamic. They are dictated on initialization.

Got it! Nicer to use, and probably a whole lot harder to implement 😄

Views are scoped to an instrument, not a single attribute set. You can either drop the entire instrument or not.

Two more questions:

  • Would you use the same instrument for two different cases (e.g. for reporting metrics on one topic and reporting metrics on one topic group)? If this is the case, it feels like comparing apples to oranges to me - but I might be lacking the context here. If this is not the case, I don't see the problem - then you do want to drop the whole instrument, right?
  • If you don't drop the whole instrument, but only allow a few metrics streams for a certain instrument, then you cannot rely on the aggregate of these metrics anymore. E.g. if you record a counter in 5 places with different attributes, and then only export 3 of the metric streams, you cannot say "the overall sum is the sum of these three exported metrics" since you would be missing the two that were not exported. But, I guess that is what you are actually after since you don't always want all data, is that right?

@asafm
Contributor Author

asafm commented Mar 23, 2023

@pirgeo
Each aggregation level has its own instrument:
pulsar.messaging.namespace.bytes.in
pulsar.messaging.metricGroup.bytes.in
pulsar.messaging.topic.bytes.in

metricGroup will likely have normal cardinality, so it would probably not be filtered out.
pulsar.messaging.topic.bytes.in - here I expect most of the filtering to happen.

Even if you filter out most topics, you still have the metricGroup level.
In the rare cases where you filter out metricGroup, you still have the namespace level.
So you never lose data, so to speak.

Of course, you need to be aware that if you explicitly configured Pulsar to filter out metrics, some will not appear there :)

@arminru arminru added the area:sdk and triaged-needmoreinfo labels Apr 3, 2023
@arminru
Member

arminru commented Apr 3, 2023

@asafm Would you like to discuss this topic on the spec call tomorrow to move this proposal forward?

@asafm
Contributor Author

asafm commented Apr 4, 2023

@arminru I would, but unfortunately I can't today. I had a lengthy conversation with @jack-berg and @jmacd, and I think they understood the use case. If either of them is present, they may be able to present the idea. If not, once the holiday season ends here, I will join the next meeting for sure.

@asafm
Contributor Author

asafm commented Apr 30, 2023

@arminru I will try to join this Tuesday

@jmacd
Contributor

jmacd commented May 9, 2023

@asafm I understand that you are working out a way to offer debug-level metrics and have arrived at an approach where the reader supports a configurable predicate allowing you to skip through the data.

This led me to think through other possible ways to accomplish this, and I like your approach. As long as we're imagining, though, I think it's worth comparing alternatives.

One alternative is to let the SDK output high-cardinality data, send it to a collector, where (somehow) the level of aggregation could be dynamically controlled. Although we can find ways to describe varying-dimension data in OTLP, we are likely to run into problems at most consumers.

A similar alternative could output high-cardinality data to an intermediate representation, then re-process that data to reduce it to the intended level of detail at runtime. Although this might be more efficient than collector-based re-aggregation, it is still more expensive than what you're considering.

One alternative might be to try to register dynamic views, but especially for UpDownCounter instruments (like counting items in a queue), the finest intended granularity (i.e., most dimensions) has to be aggregated from the start.

Your solution to the problems associated with varying-dimensions is to establish what I may call "dormant views", which are view configurations that are generally being calculated but not exported most of the time. When a user is diagnosing a specific problem, they could enable a specific dormant view and/or enter a predicate to report specific sets of attributes to a sporadically-reported timeseries used for debugging. You can turn the debug-level data on and off, without disturbing the ordinary group-level timeseries.

All the side effects associated with the SDK's Collect() would still take place when a predicate skips the output. All the callbacks are evaluated. If the reader is configured with Delta temporality, there will be gaps with missing data, if cumulative there will be gaps with average data.

I support this as an experimental option for SDKs to implement. I am wary of using "Filter" to describe this action of skipping data; I've used "skip" and "predicate" to avoid that word because it is associated with reducing dimensionality in a View, and this is a separate mechanism.

@asafm
Contributor Author

asafm commented May 11, 2023

One alternative is to let the SDK output high-cardinality data, send it to a collector, where (somehow) the level of aggregation could be dynamically controlled. Although we can find ways to describe varying-dimension data in OTLP, we are likely to run into problems at most consumers.

I thought about it at first, but I saw several hurdles:

  1. The amount of traffic that would be shipped from the application to the collector. I was worried about timeouts, the memory required by the shipper, etc.
  2. User experience. I wanted to give Pulsar users the best UX. If I were to tell them they need to configure aggregation in OTel, which is not 1–5 lines of configuration but a lot, and probably done per instrument, it would make their life very difficult. I wanted operating it to feel as close as possible to changing INFO to DEBUG in log4j.xml.
  3. Some folks may have decided to ship directly to a vendor without going through a collector/agent. For them, it will be painful, as I'm forcing them to install an agent and go through configuration hurdles.
  4. There is a good chance they will be using more than one collector. This means they would need to write or use a mechanism to dynamically deploy a configuration change to a fleet of agents whenever they need to see the "DEBUG" metrics, and maybe restart them if they don't support dynamic configuration. That's not a good UX.

A similar alternative could output high-cardinality data to an intermediate representation, then re-process that data to reduce it to the intended level of detail at runtime. Although this might be more efficient than collector-based re-aggregation, it is still more expensive than what you're considering.

I hope I understood your intention correctly. I had that idea initially, but it failed. The idea was to have only topic-level instruments, i.e. pulsar_messaging_received_size would count the size of received messages per topic, so the attribute sets would be topic-level sets. Then, at the MetricReader level, I would decorate the given MetricProducer with my own, so each time the reader calls collect on the producer, I would call collect on the original producer and then aggregate the returned data points, effectively transforming topic-level attribute sets into group-level attribute sets. The problem in Pulsar is that topics can move between brokers. Say group G has T1 and T2, and so far T1=10 and T2=20, so G=30. On the next collect, T1 has moved away, and since it's an asynchronous instrument, I will only get T2=20, which means G=20. So I decreased G from 30 to 20, although it's defined as a Counter, which means I violated its premise of always increasing.
So I decided to codify the group-level metrics, i.e. have pulsar_messaging_topic_received_size and pulsar_messaging_group_received_size. This way the group counter will not decrease when the T1 attribute set stops being reported, since it has its own counter and is not a function of the topic counters.
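The failure mode above can be reproduced in a few lines of Python. `group_value` and `GroupCounter` are hypothetical helpers that model, respectively, re-aggregating asynchronous topic values at read time and keeping a dedicated group-level counter.

```python
from typing import Dict


def group_value(topic_values: Dict[str, int], topic_to_group: Dict[str, str], group: str) -> int:
    """Re-aggregate the cumulative topic values reported on this broker into one group value."""
    return sum(v for t, v in topic_values.items() if topic_to_group.get(t) == group)


topic_to_group = {"T1": "G", "T2": "G"}
before = group_value({"T1": 10, "T2": 20}, topic_to_group, "G")
# T1 moved to another broker, so the asynchronous callback only reports T2 now.
after = group_value({"T2": 20}, topic_to_group, "G")
print(before, after)  # 30 20: the "counter" went down


class GroupCounter:
    """Hypothetical dedicated group-level counter (pulsar_messaging_group_received_size)."""

    def __init__(self) -> None:
        self.value = 0

    def add(self, amount: int) -> None:
        if amount < 0:
            raise ValueError("counters only accept non-negative increments")
        self.value += amount


group_counter = GroupCounter()
group_counter.add(10)  # bytes received on T1
group_counter.add(20)  # bytes received on T2
# T1 moving away subtracts nothing; later traffic on T2 keeps the counter growing.
group_counter.add(5)
print(group_counter.value)  # 35
```

The dedicated counter is updated at record time, so a topic leaving the broker cannot make it go backwards.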

One alternative might be to try to register dynamic views, but especially for UpDownCounter instruments (like counting items in a queue), the finest intended granularity (i.e., most dimensions) has to be aggregated from the start.

Dynamic views are an option I explored (I spent a lot of time on this...). The issue there was this: suppose we have the pulsar_messaging_received_size instrument at topic-level granularity. You can dynamically define a view for it that changes the attributes from (group, topic) to (group). This means the values are recorded in memory at group level. Once you decide to dynamically switch that view back to (group, topic), you're basically counting from 0, so you lose the "on/off" switch I wanted, which shows the metrics as they were. The values will not match what was recorded before, and this will ruin the graph displayed in Grafana (suddenly dropping to 0).

Your solution to the problems associated with varying-dimensions is to establish what I may call "dormant views", which are view configurations that are generally being calculated but not exported most of the time. When a user is diagnosing a specific problem, they could enable a specific dormant view and/or enter a predicate to report specific sets of attributes to a sporadically-reported timeseries used for debugging. You can turn the debug-level data on and off, without disturbing the ordinary group-level timeseries.

Basically yes. I can define two views for each instrument: one similar to the default one, and another that changes the name from topic to group level, thereby creating a new instrument, and changes the attributes to be group-level. The one thing I'd need for this is something like export=false for a view: keep counting and keep the storage for that view, but don't export it, and it has to be dynamic. Specifically for me, defining so many views is too cumbersome, so I'd prefer to code it, even if I had that option.
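The contrast between dynamically changing a view's attributes and a "dormant" view with an export switch can be sketched with hypothetical in-memory storages. `Storage`, `DynamicView`, and `DormantView` are assumptions for illustration, and the `export` flag models the export=false idea proposed here, not an existing SDK option.

```python
from collections import defaultdict
from typing import Dict, Mapping, Tuple


class Storage:
    """Hypothetical per-view storage: sums keyed by the view's attribute keys."""

    def __init__(self, keys: Tuple[str, ...]) -> None:
        self.keys = keys
        self.sums: Dict[Tuple[str, ...], float] = defaultdict(float)

    def record(self, attributes: Mapping[str, str], value: float) -> None:
        self.sums[tuple(attributes[k] for k in self.keys)] += value


class DynamicView:
    """Changing the attribute keys at runtime means starting a fresh storage."""

    def __init__(self, keys: Tuple[str, ...]) -> None:
        self.storage = Storage(keys)

    def set_keys(self, keys: Tuple[str, ...]) -> None:
        self.storage = Storage(keys)  # previously accumulated values are gone

    def record(self, attributes: Mapping[str, str], value: float) -> None:
        self.storage.record(attributes, value)

    def collect(self) -> Dict[Tuple[str, ...], float]:
        return dict(self.storage.sums)


class DormantView:
    """Always accumulates; the export flag only controls what collect returns."""

    def __init__(self, keys: Tuple[str, ...]) -> None:
        self.storage = Storage(keys)
        self.export = False

    def record(self, attributes: Mapping[str, str], value: float) -> None:
        self.storage.record(attributes, value)

    def collect(self) -> Dict[Tuple[str, ...], float]:
        return dict(self.storage.sums) if self.export else {}


attrs = {"group": "G", "topic": "T1"}

dynamic = DynamicView(("group",))
dynamic.record(attrs, 10)
dynamic.set_keys(("group", "topic"))  # "turn on" topic-level detail
dynamic.record(attrs, 1)
print(dynamic.collect())  # {('G', 'T1'): 1.0}: the topic series restarts from 0

dormant = DormantView(("group", "topic"))
dormant.record(attrs, 10)
print(dormant.collect())  # {}
dormant.export = True
dormant.record(attrs, 1)
print(dormant.collect())  # {('G', 'T1'): 11.0}
```

The dormant view pays for storage all the time, but turning it on shows the full cumulative history instead of a reset to 0.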

All the side effects associated with the SDK's Collect() would still take place when a predicate skips the output. All the callbacks are evaluated. If the reader is configured with Delta temporality, there will be gaps with missing data, if cumulative there will be gaps with average data.

Not all side effects. Yes, callbacks will be evaluated, but the most important thing is saved here: there is no need to allocate memory for the result objects. So if those 100k topics remain filtered out most of the time, the memory required by collect is quite low.

I support this as an experimental option for SDKs to implement. I am wary of using "Filter" to describe this action of skipping data; I've used "skip" and "predicate" to avoid that word because it is associated with reducing dimensionality in a View, and this is a separate mechanism.

I agree it should be experimental and I will edit the issue to reflect that.
You are correct that it is confusing to use the word filter.
I will change it to predicate.

@asafm
Contributor Author

asafm commented May 11, 2023

What is the next step for this proposal? Write a PR?

@asafm
Contributor Author

asafm commented May 29, 2023

@arminru @jmacd : What is the next step for this proposal? Write a PR?

5 participants