
[Feature] Setup dashboard for Airflow monitoring #10341

Closed · 2 of 3 tasks
wu-sheng opened this issue Feb 4, 2023 · 81 comments

Labels: backend (OAP backend related.), feature (New feature), good first issue (Good first issue for beginners)

Comments

@wu-sheng (Member) commented Feb 4, 2023

Search before asking

  • I had searched in the issues and found no similar feature requirement.

Description

This is an open issue for new contributors. Apache Airflow is a widely used workflow scheduler. We are encouraging someone new to the community to add a new catalog level (workflow) for Airflow.

Metrics

Airflow exposes metrics through StatsD; see https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/metrics.html.
We could use StatsD + the OpenTelemetry StatsD receiver (https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/statsdreceiver/README.md) + the OpenTelemetry OTLP exporter to ship the metrics to the SkyWalking OTEL receiver.
Then use MAL to build metrics as well as a dashboard for them. Note that a new layer and a new UI menu should be added; a collector sketch follows.
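As a rough sketch of that pipeline on the collector side (not from this issue; the ports, the OAP host name, and the aggregation interval are assumptions):

receivers:
  statsd:
    endpoint: 0.0.0.0:8125            # where Airflow's statsd client sends metrics (assumed port)
    aggregation_interval: 30s         # flush interval for aggregated statsd metrics
exporters:
  otlp:
    endpoint: skywalking-oap:11800    # SkyWalking OAP gRPC/OTEL receiver (assumed host name)
    tls:
      insecure: true
service:
  pipelines:
    metrics:
      receivers: [statsd]
      exporters: [otlp]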

Logging

Airflow supports Fluentd to ship logs; see https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/logging-architecture.html. SkyWalking already has FluentD setup support, so we should be able to receive and catalog the logs; a minimal shipping sketch follows.
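For illustration only, a minimal Fluent Bit pipeline (YAML form) that tails Airflow log files and posts them to the OAP's HTTP log endpoint might look like the sketch below; the paths and the host are assumptions, and in practice the records would still need to be reshaped into SkyWalking's native log JSON by a filter, which is omitted here:

service:
  flush: 5
pipeline:
  inputs:
    - name: tail
      path: /opt/airflow/logs/*.log   # assumed Airflow log location
      tag: airflow
  outputs:
    - name: http
      match: airflow
      host: skywalking-oap            # assumed OAP host
      port: 12800
      uri: /v3/logs                   # OAP native HTTP log endpoint
      format: json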

Additionally, Task Logs seem interesting. We could use LAL (Log Analysis Language) to group the logs by task name (or ID) by treating tasks as endpoints (a SkyWalking concept); a rule sketch follows.
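A minimal LAL rule sketch for that grouping idea (the layer name, the log line shape, and the regex are all assumptions):

rules:
  - name: airflow-task-logs
    layer: AIRFLOW        # hypothetical new layer
    dsl: |
      filter {
        text {
          // assumed log line shape: "... dag_id=tutorial task_id=print_date ..."
          regexp $/.*dag_id=(?<dag>\S+)\s+task_id=(?<task>\S+).*/$
        }
        extractor {
          // treat the task as an endpoint, named /dag/<dag>/task/<task>
          endpoint "/dag/" + parsed.dag + "/task/" + parsed.task
        }
      }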

Use case

Add more observability for the Airflow server.

Related issues

No response

Are you willing to submit a PR?

  • Yes I am willing to submit a PR!

Code of Conduct

wu-sheng added the backend (OAP backend related.), feature (New feature), and good first issue (Good first issue for beginners) labels on Feb 4, 2023
@mufiye (Contributor) commented Feb 5, 2023

I have done one issue about the agent, and I'm interested in the OAP. I think I can do this task; could you please assign it to me?

@wu-sheng (Member Author) commented Feb 5, 2023

Assigned. Good luck.

@mufiye (Contributor) commented Feb 5, 2023

> Assigned. Good luck.

Ok, I will do my best.

@wu-sheng (Member Author) commented Feb 5, 2023

@mufiye You could take one step at a time. Make metrics available for Airflow first, then move on to logs.

@mufiye (Contributor) commented Feb 5, 2023

> @mufiye You could take one step at a time. Make metrics available for Airflow first, then move on to logs.

Ok, I will do the metrics part first. And I think I can refer to other similar issues about how to add a metrics dashboard, such as issue #9677.

@mufiye (Contributor) commented Feb 12, 2023

Hello, @wu-sheng. I find that all the data the opentelemetry collector receives composes the metadata into the "metrics name" and has no "tag attributes", e.g. "airflow.ti.finish.tutorial.templated.up_for_reschedule", "airflow.ti.finish.tutorial.print_date.shutdown". I have no idea how to write the MAL rules to process these data.

@wu-sheng (Member Author)

What tag do you need? A tag is not required. For describing the Airflow server, that could be set through the otel collector, like we did for the MySQL metrics.

@mufiye (Contributor) commented Feb 12, 2023

> What tag do you need? A tag is not required. For describing the Airflow server, that could be set through the otel collector, like we did for the MySQL metrics.

I mean that all the info is contained in the "metrics name", such as <job_name>, <job_id>, <dag_id>, <task_id>, <operator_name>, and so on, but I have no way to filter and process it. Or should I just not consider these metrics? Because of the StatsD data format, the collected OTel data will not have "key value pair" tag attributes.

@wu-sheng (Member Author)

Does the original statsd data have this metadata?

@mufiye (Contributor) commented Feb 12, 2023

This metadata composes the name, such as "local_task_job.task_exit.<job_id>.<dag_id>.<task_id>.<return_code>"; some raw examples follow.
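For illustration, the raw StatsD lines look roughly like this (the IDs and values here are hypothetical):

airflow.local_task_job.task_exit.123.tutorial.print_date.0:1|c
airflow.ti.finish.tutorial.templated.up_for_reschedule:1|c
airflow.dagrun.duration.success.tutorial:438|ms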

@wu-sheng (Member Author)

@potiuk Do you have time to help?
We want to monitor Airflow with metrics that carry job/dag/task IDs to group metrics, rather than just metrics for the whole Airflow server.

@wu-sheng (Member Author)

> This metadata composes the name, such as "local_task_job.task_exit.<job_id>.<dag_id>.<task_id>.<return_code>".

OK, if it has them, we could write a small MAL script (groovy-based) to split this metadata.
But meanwhile, I think how to match these metrics is a little challenging. @kezhenxu94 @hanahmily @wankai123 What do you suggest? Do we have to write a regex-based analysis?

@wu-sheng (Member Author)

@mufiye Did you check the OTEL-side configurations? Is there a way to change their style? Otherwise, we may need to build a statsd receiver.

@mufiye (Contributor) commented Feb 12, 2023

> @mufiye Did you check the OTEL-side configurations? Is there a way to change their style? Otherwise, we may need to build a statsd receiver.

I think maybe a processor of the otel collector can do this; I need to check this part.

@kezhenxu94 (Member)

> What tag do you need? A tag is not required. For describing the Airflow server, that could be set through the otel collector, like we did for the MySQL metrics.

> I mean that all the info is contained in the "metrics name", such as <job_name>, <job_id>, <dag_id>, <task_id>, <operator_name>, and so on, but I have no way to filter and process it. Or should I just not consider these metrics? Because of the StatsD data format, the collected OTel data will not have "key value pair" tag attributes.

I suspect you have mixed up the concepts of "airflow job" and "opentelemetry job"? We use the OpenTelemetry job name to distinguish data sources.

As for metric names like "local_task_job.task_exit.<job_id>.<dag_id>.<task_id>.<return_code>", I think you should split them in an OpenTelemetry processor and move the metadata into tags before sending them to the OAP. Anyway, I'll take a look at Airflow's docs to see what the case is.

@mufiye (Contributor) commented Feb 12, 2023

> I suspect you have mixed up the concepts of "airflow job" and "opentelemetry job"? We use the OpenTelemetry job name to distinguish data sources.
>
> As for metric names like "local_task_job.task_exit.<job_id>.<dag_id>.<task_id>.<return_code>", I think you should split them in an OpenTelemetry processor and move the metadata into tags before sending them to the OAP. Anyway, I'll take a look at Airflow's docs to see what the case is.

I think I said something wrong. StatsD itself can carry "key value pair" tags in the message, but Airflow only uses the name to hold this metadata. I think using an OpenTelemetry processor to process the data may be a feasible method. About the "job", it is just part of the original Airflow metric name.

@potiuk (Member) commented Feb 13, 2023

Not much time to help (catching up with some stuff), but for what it is worth: Airflow's statsd output is not the "best" for SkyWalking to consume. Unfortunately you would indeed need to parse the metric name, and while I am not sure how an OTEL processor might handle it, a regexp approach might be a good idea.

However, just to give you some perspective: Airflow's metrics are evolving.

Quite recently (coming in the next version of Airflow, most likely 2.6), @hussein-awala improved the statsd metrics with DataDog metadata tags (apache/airflow#28961), and maybe, rather than focusing on pure statsd metrics, you could integrate those; see the example line after this comment.

Also, a bit more long term: in Airflow we have already approved OpenTelemetry support, https://cwiki.apache.org/confluence/display/AIRFLOW/AIP-49+OpenTelemetry+Support+for+Apache+Airflow, and we even have a chance to progress with the implementation. @feruzzi is looking into the integration and is even adding better support for testing Airflow's statsd metrics in Breeze (the Airflow development environment) with Grafana and Prometheus (apache/airflow#29449).

So maybe it could be nice teamwork.
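For reference, the DataDog-style tagging mentioned above moves the metadata out of the metric name and into |#key:value annotations, which the otel-collector statsd receiver can parse into attributes. A hypothetical line:

airflow.ti.finish:1|c|#dag_id:tutorial,task_id:print_date,state:success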

@mufiye (Contributor) commented Feb 13, 2023

I have done the research and started a new discussion in opentelemetry-collector-contrib. @wu-sheng @kezhenxu94
I have tried the metrics transform processor, the transform processor, and the attributes processor.
The metrics transform processor's combine function can process counter and gauge data, but it cannot process summary and histogram data: it reports errors in the terminal when processing histogram data, and for summary data the datapoint part of the result is null.
With the transform processor, I can use functions like replace_pattern to make the metric name concise, e.g. "ti.finish.<dag_id>.<task_id>." to "ti.finish", but I have no idea how to add the <dag_id> and <task_id> to the attributes; maybe the Split function to split the name by "." plus the set function, but that does not work.
Could anyone give me some suggestions for my next step? I think I can try to process the counter, gauge, and most of the timer data (converted to summary or histogram in OTel) in the StatsD -> otel collector -> SkyWalking case. Or does anyone familiar with opentelemetry-collector-contrib have a solution for splitting metric names into key-value pair attributes?

@wu-sheng (Member Author)

Could you use set(target, value) and replace_match to achieve this? Only this time replace_match would use a regex to extract the value for the specific key, such as dag_id, e.g. matching only the text after the 2nd dot.

@mufiye (Contributor) commented Feb 13, 2023

> Could you use set(target, value) and replace_match to achieve this? Only this time replace_match would use a regex to extract the value for the specific key, such as dag_id, e.g. matching only the text after the 2nd dot.

I tried it, but it doesn't work. Because the third argument of replace_match is a string and it will replace the key, we can't get the dag_id and use it as the third argument of the replace_match function. I have also tried the Split function before, but it returns a string array and the transform processor does not provide an index operation for this array.

@wu-sheng (Member Author)

Are you considering this too complex? In the transform process, you should be able to hardcode most of them, right?

> it will replace the key

The tag key is static and hard-coded, such as task_id as a key.

replace_match would change everything, so it may not be a good fit. Split should be good. You should have an expression to get the array and then use the index.

And I found the index-related docs: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md#lists

@mufiye (Contributor) commented Feb 13, 2023

> Are you considering this too complex? In the transform process, you should be able to hardcode most of them, right?

> it will replace the key

I think I just said something wrong; I meant to say that it will change the value of the relative key.

@mufiye (Contributor) commented Feb 13, 2023

> replace_match would change everything, so it may not be a good fit. Split should be good. You should have an expression to get the array and then use the index.
>
> And I found the index-related docs: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md#lists

I think we cannot get a single string from the array. The doc says that "the grammar does not provide an accessor to individual list entries".

@wu-sheng (Member Author)

We could use this (replacing ConvertCase with another function) to set the metric name without the parameters.

metrics:
  set(metric.name, ConvertCase(metric.name, "snake"))

https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/pkg/ottl/README.md#convert-metric-name-to-snake-case

Meanwhile, we could set(attributes["job_id"], replace(metric.name, xxx)). Of course, the set(attributes...) must run first, otherwise the metadata is lost.

Could you check what I am missing?

@mufiye (Contributor) commented Feb 13, 2023

> Meanwhile, we could set(attributes["job_id"], replace(metric.name, xxx)). Of course, the set(attributes...) must run first, otherwise the metadata is lost.

Yes, you are right. And I have tried this before with the config below. The most important thing, I think, is how to compute the attribute value in set(attributes["job_id"], replace(metric.name, xxx)).

processors:
  transform:
    metric_statements:
      - context: resource
        statements:
      - context: datapoint
        statements:
          - set(attributes["job_id"], metric.name)
      - context: metric
        statements:
          - replace_match(name, "system.*.cpu", "system.cpu")
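One way the config above might be completed so that the metadata survives: copy the metric name into a datapoint attribute, cut it down with a regex capture group via replace_pattern, and only then collapse the metric name. A sketch for the ti.finish.<dag_id>.<task_id>.<state> family (the regexes are assumptions):

processors:
  transform:
    metric_statements:
      - context: datapoint
        statements:
          # copy the full name, then keep only the captured field
          - set(attributes["dag_id"], metric.name)
          - replace_pattern(attributes["dag_id"], "^ti\\.finish\\.([^.]+)\\..*$", "$$1")
          - set(attributes["task_id"], metric.name)
          - replace_pattern(attributes["task_id"], "^ti\\.finish\\.[^.]+\\.([^.]+)\\..*$", "$$1")
      - context: metric
        statements:
          # runs after the datapoint statements above, so the metadata is already saved
          - replace_pattern(name, "^ti\\.finish\\..*$", "ti.finish")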

@wu-sheng (Member Author)

I just read https://opentelemetry.io/docs/reference/specification/metrics/data-model/#exponentialhistogram; it seems it is just the typical Prometheus histogram setup in practice.

Back to your question:

> Should I make this exponentialHistogram type be supported, or use the summary type for the timer metric?

We should transfer this to our histogram, I think. You need to get the bucket conversion from exponentialHistogram to histogram right.
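For reference, the spec linked above derives the bucket boundaries from the scale, and the conversion has to map these boundaries onto SkyWalking's explicit buckets:

\[
\text{base} = 2^{2^{-\text{scale}}}, \qquad \text{bucket}_i = \left(\text{base}^{\,i},\ \text{base}^{\,i+1}\right]
\]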

@mufiye (Contributor) commented Feb 22, 2023

> We should transfer this to our histogram, I think. You need to get the bucket conversion from exponentialHistogram to histogram right.

I will try to do that later. And there are some other essential points that need to be discussed.

  1. There are some labels in Airflow metric names, such as <job_name>, <task_id>, <dag_id>, <operator_name>, and <pool_name>, which represent components in Airflow. To which level should I map these components? In Airflow concepts, a dag contains lots of tasks to be run, a pool is where tasks run, an operator_name is one kind of task, and a job, I think, is a larger concept than a task because it also covers the scheduler job. Should we classify all these components as endpoints?
  2. As before, we transform a "delta counter metric" into a "gauge metric", so we can represent a monotonic delta metric as the value for one specific period of time. For example, the ti_successes metric describes "Overall task instance successes", so we can read the gauge metric as "the successful task instances in this period". But some nonmonotonic delta counter metrics, such as "dag_processing.processes", which means the number of currently running DAG parsing processes, can be negative in a period (I saw this in my test). How should we handle this kind of metric? Maybe just show the values as they are, to show the trend.

@wu-sheng (Member Author)

About <1>, the easiest way is:

  • The pool seems to be a running env; we could catalog it as an instance, named pool:xxx. Is the pool shared among tasks?
  • The job, dag, and operator_name could be various endpoints as running processes. We could name them following /job/xxxx, /dag/yyy. Does this make sense?

@wu-sheng (Member Author)

> But some nonmonotonic delta counter metrics, such as "dag_processing.processes", which means the number of currently running DAG parsing processes, can be negative in a period (I saw this in my test). How should we handle this kind of metric?

How could the process count be negative? What does it mean originally? The number of currently running DAG parsing processes should logically be 0 or positive.

@mufiye (Contributor) commented Feb 22, 2023

> How could the process count be negative? What does it mean originally? The number of currently running DAG parsing processes should logically be 0 or positive.

Because the total number, i.e. the sum of the delta values, is what means "currently running DAG parsing processes", a single delta value can be negative. By "originally" I mean we just show the gauge values, whether they are negative or positive.

@wu-sheng (Member Author)

Then, in this case, it seems we never get the absolute value, do we? Does it report the absolute value somehow?

@mufiye (Contributor) commented Feb 22, 2023

> Then, in this case, it seems we never get the absolute value, do we? Does it report the absolute value somehow?

Sorry, I don't get it; could you explain your perspective more?

@wu-sheng (Member Author)

If a time-series value is a delta, let's say (-5, 4, 3, 1, -4), then unless we know the initial value is 10 (or whatever it is), we can't know the exact process count (to use your example).

So, do we have that number, or do we have the total number of processes? If not, we can only see the trend.

@mufiye (Contributor) commented Feb 22, 2023

> If a time-series value is a delta, let's say (-5, 4, 3, 1, -4), then unless we know the initial value is 10 (or whatever it is), we can't know the exact process count (to use your example).
>
> So, do we have that number, or do we have the total number of processes? If not, we can only see the trend.

I think we can't get the total number of processes unless we add up every delta value.

@wu-sheng (Member Author)

There is no "total" concept. That is my point about the delta issue: we are never able to find out the initial value.

Could you check how this works with statsd? For example, check and try apache/airflow#29449?

@mufiye (Contributor) commented Feb 22, 2023

> There is no "total" concept. That is my point about the delta issue: we are never able to find out the initial value.
>
> Could you check how this works with statsd? For example, check and try apache/airflow#29449?

Do you mean to check how Airflow collects metrics and sends out the statsd data?

@wu-sheng (Member Author)

I am thinking about how they visualize this type, so I think we could try this on Prometheus/Grafana.
AFAIK, we can only show this value as a trend; I don't know whether there is something we missed.

@mufiye (Contributor) commented Feb 22, 2023

> I am thinking about how they visualize this type, so I think we could try this on Prometheus/Grafana. AFAIK, we can only show this value as a trend; I don't know whether there is something we missed.

Ok, I get it. I will check how they use their metrics.

@mufiye (Contributor) commented Feb 22, 2023

> About <1>, the easiest way is:
>
> • The pool seems to be a running env; we could catalog it as an instance, named pool:xxx. Is the pool shared among tasks?
> • The job, dag, and operator_name could be various endpoints as running processes. We could name them following /job/xxxx, /dag/yyy. Does this make sense?

I think tasks and pools have an inclusion relation, but the others do not. Furthermore, from the metric name we cannot tell which task runs in which pool. Maybe making all these components the same level is the only way.

@wu-sheng (Member Author)

I don't know this as well as you do.
Pick a way you prefer, and we can discuss the details when the dashboards are out. Adjusting these is not hard.
Don't worry; PRs always take time.

@mufiye (Contributor) commented Feb 24, 2023

> I am thinking about how they visualize this type, so I think we could try this on Prometheus/Grafana.
> AFAIK, we can only show this value as a trend; I don't know whether there is something we missed.

I think it is because of the counter definition in Prometheus: a counter is a cumulative metric that represents a single monotonically increasing counter whose value can only increase or be reset to zero on restart.

@wu-sheng (Member Author)

If Prometheus can identify/use it as a counter, why can't we? We converted it to a delta because it isn't cumulative.
What is missing here?

@mufiye (Contributor) commented Feb 24, 2023

> If Prometheus can identify/use it as a counter, why can't we? We converted it to a delta because it isn't cumulative. What is missing here?

In my opinion, they always do the accumulation for counter metrics, whether the metrics have been stored or not. But we cannot do the accumulation for metrics that have already been stored.
I have not verified how Prometheus processes counter metrics because I have not learned Go yet; that's in my future plans.

@wu-sheng (Member Author)

> In my opinion, they always do the accumulation for counter metrics, whether the metrics have been stored or not. But we cannot do the accumulation for metrics that have already been stored.

If you could push a counter to the OAP, we could work on that. Your previous context was that there is only a delta.

@mufiye (Contributor) commented Feb 24, 2023

> If you could push a counter to the OAP, we could work on that. Your previous context was that there is only a delta.

I think I can only push a "delta type counter" to the OAP via the otel collector. Maybe we could support accumulating a "delta type counter"? It may be complicated, but I can try to do it.
Or I could just show the data trend from the "delta type counter" data and do the dashboard first.

@wu-sheng (Member Author)

I think you need to check what a delta counter is. A counter increases or resets. How does delta apply to this case?

@mufiye (Contributor) commented Feb 24, 2023

> I think you need to check what a delta counter is. A counter increases or resets. How does delta apply to this case?

I think dag_processing.processes does not meet the Prometheus counter definition; it can decrease. I'm sure, because I tested it. This is the PR I found.

@wu-sheng (Member Author)

That is my point in asking. Focus only on this metric: whether they show it, and how they show it.

@mufiye (Contributor) commented Feb 24, 2023

> That is my point in asking. Focus only on this metric: whether they show it, and how they show it.

Ok, I get it.

@wu-sheng (Member Author) commented Mar 8, 2023

@mufiye Any update or blockers?

@mufiye (Contributor) commented Mar 8, 2023

> @mufiye Any update or blockers?

I think I have to pause here temporarily. I am preparing to find an internship now and have had no time to continue this issue in the last two weeks. You can unassign this issue from me.
I think the next step is to add one MAL function to the meter analyzer, then write the MAL rule and build the dashboard. If anyone takes over this task, I can also provide support, such as the otel collector config file; a rough sketch of the rule-file side follows.
I'm sorry for this situation.
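For whoever picks this up, a skeleton of what the OAP-side rule file might look like, modeled loosely on the existing otel-rules; the file name, the layer, the label, and the metric names are assumptions, not a working config:

# config/otel-rules/airflow.yaml (hypothetical)
expSuffix: tag({tags -> tags.job_name = 'airflow::' + tags.job_name}).service(['job_name'], Layer.AIRFLOW)   # Layer.AIRFLOW is the hypothetical new layer
metricPrefix: meter_airflow
metricsRules:
  - name: ti_successes      # overall task instance successes in the period
    exp: airflow_ti_successes.sum(['job_name'])
  - name: ti_failures
    exp: airflow_ti_failures.sum(['job_name'])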

@wu-sheng (Member Author) commented Mar 8, 2023

Got it. Thanks for the feedback.
Take your time for your own interests; that always matters most.
