[monitoring] Adding influxDB 2.x version support #274 #584
June 18th weekly call summary:
Fixes #274
There are too many if statements in the code (e.g. if influxdb1 or if influxdb2).
This is not the way to go. The logic for influxdb1 and influxdb2 must be encapsulated in the respective timeseries DB backend; the rest of the code should only be adapted to call that logic.
Please have a look at the work which was done for the elasticsearch backend to get an idea:
#164
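A minimal sketch of the encapsulation being asked for here. All class and method names below (BaseTimeseriesClient, Influx1Client, Influx2Client, build_read_query, read_metric) are hypothetical and chosen for illustration only; they are not the project's actual backend API.

```python
from abc import ABC, abstractmethod


class BaseTimeseriesClient(ABC):
    """Hypothetical common interface shared by every backend."""

    @abstractmethod
    def build_read_query(self, key, days):
        """Return the backend-specific query string."""


class Influx1Client(BaseTimeseriesClient):
    def build_read_query(self, key, days):
        # InfluxQL syntax stays inside the InfluxDB 1.x backend
        return f"SELECT * FROM {key} WHERE time >= now() - {days}"


class Influx2Client(BaseTimeseriesClient):
    def __init__(self, bucket):
        self.bucket = bucket

    def build_read_query(self, key, days):
        # Flux syntax stays inside the InfluxDB 2.x backend
        return (
            f'from(bucket: "{self.bucket}")'
            f" |> range(start: -{days})"
            f' |> filter(fn: (r) => r["_measurement"] == "{key}")'
        )


def read_metric(client, key, days="7d"):
    # Calling code is backend-agnostic: no `if influxdb1` / `if influxdb2`
    return client.build_read_query(key, days)
```

With this shape, supporting another backend means adding one class, and the calling code does not change.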
In our call today, @praptisharma28 walked me through the code and helped me understand the changes. In this process, we found some areas for improvement.
    timezone=settings.TIME_ZONE
):
    bucket = self.bucket
    measurement = params.get('measurement')
params.get('measurement') is equal to params.get('key'). Let's avoid duplicating values that are already available to the method.
'start_date': start_date,
'end_date': end_date,
'measurement': self.config_dict.get('measurement', self.metric.key),
'field_name': fields or self.config_dict.get('field_name'),
If you are not using field_name in the query, then please remove it.
I used pdb, got:
(Pdb) params
{'field_name': None, 'key': 'test_metric', 'time': '2024-06-25', 'days': '7d', 'content_type': 'openwisp_users.user', 'object_id': '4f5f77d3-3b30-4eca-97c6-1301217a4edc', 'start_date': None, 'end_date': None, 'measurement': 'test_metric'}
But if I remove it, I lose the summary of the uptime chart. Charts like mostly reachable, unreachable, and partially reachable are lost.
params.update({
    'start_date': start_date,
    'end_date': end_date,
    'measurement': self.config_dict.get('measurement', self.metric.key),
This duplicates key, let's remove it.
return timeseries_db._get_top_fields(
    query=q,
    default_query=self._default_query,
    query=self.get_query(),
    chart_type=self.type,
    group_map=self._get_group_map(params['days']),
    number=number,
The InfluxDB 2 client does not accept the default_query keyword argument. We need to ensure that this works on par with InfluxDB 1 and add any required tests for this method.
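One way such a parity test could look, as a rough sketch. The two classes below are hypothetical stand-ins, not the real backends, and the parameter list is only assumed from the keyword arguments visible in the diff above; the actual signatures may differ.

```python
import inspect


class Influx1Client:
    # Hypothetical stand-in for the InfluxDB 1.x backend
    def _get_top_fields(self, default_query, query, params, chart_type,
                        group_map, number, time, get_fields=True):
        return []


class Influx2Client:
    # Hypothetical stand-in for the InfluxDB 2.x backend
    def _get_top_fields(self, default_query, query, params, chart_type,
                        group_map, number, time, get_fields=True):
        return []


def parameter_names(method):
    # Names of the arguments a method accepts, excluding `self`
    return [name for name in inspect.signature(method).parameters if name != "self"]


def signatures_match(first, second, method_name):
    # True when both classes expose the method with identical parameter names
    return parameter_names(getattr(first, method_name)) == parameter_names(
        getattr(second, method_name)
    )
```

A test asserting signatures_match(Influx1Client, Influx2Client, "_get_top_fields") would fail as soon as one backend stops accepting default_query. It only checks the call signature, so tests on the returned data are still needed.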
points = summary = timeseries_db._get_top_fields(
    default_query=self._default_query,
    chart_type=self.type,
    group_map=self.GROUP_MAP,
    number=self.top_fields,
    params=self._get_query_params(self.DEFAULT_TIME),
    time=time,
    query=self.query,
    get_fields=False,
These changes are wrong. Please revert them to what was here before.
summary = timeseries_db.get_list_query(summary_query)
points = timeseries_db.get_list_query(data_query, key=self.metric.key)
summary = timeseries_db.get_list_query(
    summary_query, key=self.metric.key
- summary_query, key=self.metric.key
+ summary_query,
I don't see the key argument being used in the timeseries_db.get_list_query method. Remove it if it is not needed.
for point in points:
    time_value = point.get('time') or point.get('_time')
Let's avoid doing this and rename the time field in the flux query.
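A small sketch of that suggestion, using Flux's rename() function to expose _time as time at query level. The helper name with_renamed_time, the bucket name, and the sample record are illustrative assumptions; whether records reach the calling code as plain dictionaries depends on how the backend converts query results.

```python
def with_renamed_time(flux_query):
    # Append Flux's rename() so records expose "time" instead of "_time"
    return flux_query.rstrip() + '\n  |> rename(columns: {_time: "time"})'


base_query = (
    'from(bucket: "mybucket")\n'
    '  |> range(start: -7d)\n'
    '  |> filter(fn: (r) => r["_measurement"] == "ping")'
)
query = with_renamed_time(base_query)

# A record as the calling code might then see it (illustrative values)
point = {"time": "2024-06-25T00:00:00Z", "loss": 0.0}
time_value = point["time"]  # no fallback to point.get('_time') needed
```

The rename happens once in the query, so the Python side can read the same key for both backends.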
if not time_value:
    logger.warning(f"Point missing time value: {point}")
    continue
Will this code ever get executed? Isn't every point in the timeseries DB mapped to a timestamp?
if decimal_places and isinstance(value, (int, float)):
if decimal_places is not None and value is not None:
Can you please verify the error you were getting here?
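For reference while verifying, a short sketch of where the two conditions in the diff above disagree. The function names first_condition and second_condition are made up for this comparison, and the sample values are illustrative.

```python
def first_condition(value, decimal_places):
    # First line of the diff: truthiness check plus a numeric type check
    return bool(decimal_places and isinstance(value, (int, float)))


def second_condition(value, decimal_places):
    # Second line of the diff: explicit None checks only
    return decimal_places is not None and value is not None


cases = [(1.2345, 2), (1.2345, 0), ("n/a", 2), (None, 2), (1.2345, None)]
for value, places in cases:
    print(repr(value), places, first_condition(value, places), second_condition(value, places))
```

The two conditions differ in exactly two of these cases: decimal_places=0 is skipped by the first and accepted by the second, and a non-numeric value such as a string is skipped by the first but passes the second, where a later round() call would raise TypeError.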
def get_ping_data_query(self, bucket, start, stop, device_ids):
    device_filter = ' or '.join([f'r["object_id"] == "{id}"' for id in device_ids])
    query = f'''
    from(bucket: "{bucket}")
    |> range(start: {start}, stop: {stop})
    |> filter(fn: (r) => r["_measurement"] == "ping")
    |> filter(fn: (r) => r["_field"] == "loss" or r["_field"] == "reachable" or r["_field"] == "rtt_avg" or r["_field"] == "rtt_max" or r["_field"] == "rtt_min")
    |> filter(fn: (r) => r["content_type"] == "config.device")
    |> filter(fn: (r) => {device_filter})
    |> aggregateWindow(every: v.windowPeriod, fn: mean, createEmpty: false)
    |> yield(name: "mean")
    '''
    return query
Please move this query to queries.py and use the get_query() method to generate the final query.
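A rough sketch of what that could look like, assuming a template-per-query layout. The chart_query dictionary, build_device_filter, and render_query are hypothetical names; render_query merely stands in for the project's get_query(), whose real signature is not shown in this thread. The field filters and aggregateWindow step from the query above are left out for brevity.

```python
# Hypothetical layout for queries.py: one named template per chart query
chart_query = {
    "ping": {
        "flux": (
            'from(bucket: "{bucket}")\n'
            "  |> range(start: {start}, stop: {stop})\n"
            '  |> filter(fn: (r) => r["_measurement"] == "ping")\n'
            '  |> filter(fn: (r) => r["content_type"] == "{content_type}")\n'
            "  |> filter(fn: (r) => {device_filter})\n"
        )
    }
}


def build_device_filter(device_ids):
    # OR together one equality check per device ID
    return " or ".join(f'r["object_id"] == "{device_id}"' for device_id in device_ids)


def render_query(name, **params):
    # Look up the template and fill in its placeholders
    return chart_query[name]["flux"].format(**params)


final_query = render_query(
    "ping",
    bucket="mybucket",
    start="-7d",
    stop="now()",
    content_type="config.device",
    device_filter=build_device_filter(["id-1", "id-2"]),
)
```

Keeping the template in one place means the backend method only supplies parameters, which matches how the other chart queries are organised.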
bucket = self.bucket

# Start building the Flux query
flux_query = f'from(bucket:"{bucket}")'
@praptisharma28 why are you building the query like this here? Why aren't you using the queries defined in queries.py?
This looks wrong. Charts for different metrics are handled differently. That's why there are individual queries for each chart.
My bad, I got confused between the get_list_queries and read methods. The current implementation is okay.
Fixes #274
Checks: