chore: influxdb_client/client/write: strip .0 on float values #181

Merged: 1 commit merged on Jan 21, 2021

@rogpeppe rogpeppe commented Jan 8, 2021

Proposed Changes

Floating point whole numbers are encoded with a trailing .0
which is unnecessary (integers are encoded with a trailing i character),
is different from other line-protocol encoders, and makes encoded
entries longer than necessary, so strip that suffix when it's seen.
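
As a rough illustration of the change described above, here is a minimal sketch in Python. The helper name `strip_point_zero` is hypothetical and used only for this example; it is not necessarily the name used in the patch.

```python
def strip_point_zero(value):
    """Format a float for line protocol, dropping a trailing '.0' (hypothetical helper)."""
    s = str(value)
    if s.endswith('.0'):
        s = s[:-2]
    return s


# Whole-number floats lose the suffix; other values are unchanged.
print(strip_point_zero(25.0))   # 25
print(strip_point_zero(25.5))   # 25.5
# An integer field stays distinguishable because it carries the 'i' suffix.
print(f"{25}i")                 # 25i
```

Because integer fields are always written with the `i` suffix, a bare `25` can still only be read as a float field.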

Checklist

  • `pytest tests` completes successfully

@bednar bednar left a comment

Hi @rogpeppe,

Thanks for your PR. One more thing that we need to fix before merging is dataframe_serializer.py.

The following code works correctly:

point.py

"""Point data structure to represent LineProtocol."""


import math
from builtins import int
from datetime import datetime, timedelta
from decimal import Decimal
from numbers import Integral

from pytz import UTC
from six import iteritems

from influxdb_client.client.util.date_utils import get_date_helper
from influxdb_client.domain.write_precision import WritePrecision

EPOCH = UTC.localize(datetime.utcfromtimestamp(0))
DEFAULT_WRITE_PRECISION = WritePrecision.NS
_ESCAPE_MEASUREMENT = str.maketrans({'\\': '\\\\', ',': r'\,', ' ': r'\ ', '\n': '\\n', '\t': '\\t', '\r': '\\r'})
_ESCAPE_KEY = str.maketrans({'\\': '\\\\', ',': r'\,', ' ': r'\ ', '=': r'\=', '\n': '\\n', '\t': '\\t', '\r': '\\r'})
_ESCAPE_STRING = str.maketrans({'\"': r"\"", "\\": r"\\"})


class Point(object):
    """
    Point defines the values that will be written to the database.

    Ref: http://bit.ly/influxdata-point
    """

    @staticmethod
    def measurement(measurement):
        """Create a new Point with specified measurement name."""
        p = Point(measurement)
        return p

    @staticmethod
    def from_dict(dictionary: dict, write_precision: WritePrecision = DEFAULT_WRITE_PRECISION):
        """Initialize point from 'dict' structure."""
        point = Point(dictionary['measurement'])
        if 'tags' in dictionary:
            for tag_key, tag_value in dictionary['tags'].items():
                point.tag(tag_key, tag_value)
        for field_key, field_value in dictionary['fields'].items():
            point.field(field_key, field_value)
        if 'time' in dictionary:
            point.time(dictionary['time'], write_precision=write_precision)
        return point

    def __init__(self, measurement_name):
        """Initialize defaults."""
        self._tags = {}
        self._fields = {}
        self._name = measurement_name
        self._time = None
        self._write_precision = DEFAULT_WRITE_PRECISION

    def time(self, time, write_precision=DEFAULT_WRITE_PRECISION):
        """
        Specify timestamp for DataPoint with declared precision.

        If time doesn't have specified timezone we assume that timezone is UTC.

        Examples::
            Point.measurement("h2o").field("val", 1).time("2009-11-10T23:00:00.123456Z")
            Point.measurement("h2o").field("val", 1).time(1257894000123456000)
            Point.measurement("h2o").field("val", 1).time(datetime(2009, 11, 10, 23, 0, 0, 123456))
            Point.measurement("h2o").field("val", 1).time(1257894000123456000, write_precision=WritePrecision.NS)


        :param time: the timestamp for your data
        :param write_precision: sets the precision for the supplied time values
        :return: this point
        """
        self._write_precision = write_precision
        self._time = time
        return self

    def tag(self, key, value):
        """Add tag with key and value."""
        self._tags[key] = value
        return self

    def field(self, field, value):
        """Add field with key and value."""
        self._fields[field] = value
        return self

    def to_line_protocol(self):
        """Create LineProtocol."""
        _measurement = _escape_key(self._name, _ESCAPE_MEASUREMENT)
        _tags = _append_tags(self._tags)
        _fields = _append_fields(self._fields)
        if not _fields:
            return ""
        _time = _append_time(self._time, self._write_precision)

        return f"{_measurement}{_tags}{_fields}{_time}"

    @property
    def write_precision(self):
        """Get precision."""
        return self._write_precision


def _append_tags(tags):
    _return = []
    for tag_key, tag_value in sorted(iteritems(tags)):

        if tag_value is None:
            continue

        tag = _escape_key(tag_key)
        value = _escape_tag_value(tag_value)
        if tag != '' and value != '':
            _return.append(f'{tag}={value}')

    return f"{',' if _return else ''}{','.join(_return)} "


def _append_fields(fields):
    _return = []

    for field, value in sorted(iteritems(fields)):
        if value is None:
            continue

        if isinstance(value, (float, Decimal)):
            if not math.isfinite(value):
                continue
            s = trailing_zeros(value)
            _return.append(f'{_escape_key(field)}={s}')
        elif isinstance(value, int) and not isinstance(value, bool):
            _return.append(f'{_escape_key(field)}={str(value)}i')
        elif isinstance(value, bool):
            _return.append(f'{_escape_key(field)}={str(value).lower()}')
        elif isinstance(value, str):
            _return.append(f'{_escape_key(field)}="{_escape_string(value)}"')
        else:
            raise ValueError(f'Type: "{type(value)}" of field: "{field}" is not supported.')

    return f"{','.join(_return)}"


def trailing_zeros(value):
    s = str(value)
    # It's common to represent whole numbers as floats
    # and the trailing ".0" that Python produces is unnecessary
    # in line-protocol, inconsistent with other line-protocol encoders,
    # and takes more space than needed, so trim it off.
    if s.endswith('.0'):
        s = s[:-2]
    return s


def _append_time(time, write_precision):
    if time is None:
        return ''
    return f" {int(_convert_timestamp(time, write_precision))}"


def _escape_key(tag, escape_list=None):
    if escape_list is None:
        escape_list = _ESCAPE_KEY
    return str(tag).translate(escape_list)


def _escape_tag_value(value):
    ret = _escape_key(value)
    if ret.endswith('\\'):
        ret += ' '
    return ret


def _escape_string(value):
    return str(value).translate(_ESCAPE_STRING)


def _convert_timestamp(timestamp, precision=DEFAULT_WRITE_PRECISION):
    date_helper = get_date_helper()
    if isinstance(timestamp, Integral):
        return timestamp  # assume precision is correct if timestamp is int

    if isinstance(timestamp, str):
        timestamp = date_helper.parse_date(timestamp)

    if isinstance(timestamp, timedelta) or isinstance(timestamp, datetime):

        if isinstance(timestamp, datetime):
            if not timestamp.tzinfo:
                timestamp = UTC.localize(timestamp)
            else:
                timestamp = timestamp.astimezone(UTC)
            timestamp = timestamp - EPOCH

        ns = date_helper.to_nanoseconds(timestamp)

        if precision is None or precision == WritePrecision.NS:
            return ns
        elif precision == WritePrecision.US:
            return ns / 1e3
        elif precision == WritePrecision.MS:
            return ns / 1e6
        elif precision == WritePrecision.S:
            return ns / 1e9

    raise ValueError(timestamp)
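
To sanity-check the float formatting used by `_append_fields` above, the following standalone sketch copies `trailing_zeros` and runs it on a few edge cases. The chosen inputs are illustrative assumptions, not test cases taken from the PR.

```python
from decimal import Decimal


def trailing_zeros(value):
    # Same logic as the helper in point.py above, copied so the sketch runs standalone.
    s = str(value)
    if s.endswith('.0'):
        s = s[:-2]
    return s


# Only an exact ".0" suffix is removed; zeros in the integer part are kept.
assert trailing_zeros(100.0) == '100'
assert trailing_zeros(-3.0) == '-3'
assert trailing_zeros(0.5) == '0.5'
# Large floats use exponent notation in Python, so there is no ".0" to strip.
assert trailing_zeros(1e16) == '1e+16'
# Decimal values keep their own precision unless they end in exactly ".0".
assert trailing_zeros(Decimal('2.0')) == '2'
assert trailing_zeros(Decimal('2.50')) == '2.50'
```

The check is a plain string-suffix test, so it never alters values that Python prints in another form.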

dataframe_serializer.py

"""
Functions for serializing a Pandas DataFrame.

Much of the code here is inspired by that in the aioinflux package found here: https://github.com/gusutabopb/aioinflux
"""

import re
from functools import reduce
from itertools import chain

from influxdb_client.client.write.point import _ESCAPE_KEY, _ESCAPE_STRING, _ESCAPE_MEASUREMENT, trailing_zeros


def _replace(data_frame):
    from ...extras import np

    # string columns
    obj_cols = {k for k, v in dict(data_frame.dtypes).items() if v is np.dtype('O')}

    # number columns
    other_cols = set(data_frame.columns) - obj_cols

    obj_nans = (f'{k}=nan' for k in obj_cols)
    other_nans = (f'{k}=nani?' for k in other_cols)

    replacements = [
        ('|'.join(chain(obj_nans, other_nans)), ''),
        (',{2,}', ','),
        ('|'.join([', ,', ', ', ' ,']), ' '),
    ]

    return replacements


def _itertuples(data_frame):
    cols = [data_frame.iloc[:, k] for k in range(len(data_frame.columns))]
    return zip(data_frame.index, *cols)


def _is_nan(x):
    return x != x


def _any_not_nan(p, indexes):
    return any(map(lambda inx: not _is_nan(p[inx]), indexes))


def data_frame_to_list_of_points(data_frame, point_settings, **kwargs):
    """Serialize DataFrame into LineProtocols."""
    from ...extras import pd, np
    if not isinstance(data_frame, pd.DataFrame):
        raise TypeError('Must be DataFrame, but type was: {0}.'
                        .format(type(data_frame)))

    if 'data_frame_measurement_name' not in kwargs:
        raise TypeError('"data_frame_measurement_name" is a Required Argument')

    if isinstance(data_frame.index, pd.PeriodIndex):
        data_frame.index = data_frame.index.to_timestamp()
    else:
        data_frame.index = pd.to_datetime(data_frame.index)

    if data_frame.index.tzinfo is None:
        data_frame.index = data_frame.index.tz_localize('UTC')

    measurement_name = str(kwargs.get('data_frame_measurement_name')).translate(_ESCAPE_MEASUREMENT)
    data_frame_tag_columns = kwargs.get('data_frame_tag_columns')
    data_frame_tag_columns = set(data_frame_tag_columns or [])

    tags = []
    fields = []
    fields_indexes = []
    keys = []

    if point_settings.defaultTags:
        for key, value in point_settings.defaultTags.items():
            data_frame[key] = value
            data_frame_tag_columns.add(key)

    for index, (key, value) in enumerate(data_frame.dtypes.items()):
        key = str(key)
        keys.append(key.translate(_ESCAPE_KEY))
        key_format = f'{{keys[{index}]}}'

        index_value = index + 1
        if key in data_frame_tag_columns:
            tags.append({'key': key, 'value': f"{key_format}={{str(p[{index_value}]).translate(_ESCAPE_KEY)}}"})
        elif issubclass(value.type, np.integer):
            fields.append(f"{key_format}={{p[{index_value}]}}i")
            fields_indexes.append(index_value)
        elif issubclass(value.type, np.bool_):
            fields.append(f"{key_format}={{p[{index_value}]}}")
            fields_indexes.append(index_value)
        elif issubclass(value.type, np.floating):
            fields.append(f"{key_format}={{trailing_zeros(p[{index_value}])}}")
            fields_indexes.append(index_value)
        else:
            fields.append(f"{key_format}=\"{{str(p[{index_value}]).translate(_ESCAPE_STRING)}}\"")
            fields_indexes.append(index_value)

    tags.sort(key=lambda x: x['key'])
    tags = ','.join(map(lambda y: y['value'], tags))

    fmt = ('{measurement_name}', f'{"," if tags else ""}', tags,
           ' ', ','.join(fields), ' {p[0].value}')
    f = eval("lambda p: f'{}'".format(''.join(fmt)),
             {'measurement_name': measurement_name, '_ESCAPE_KEY': _ESCAPE_KEY, '_ESCAPE_STRING': _ESCAPE_STRING,
              'keys': keys, 'trailing_zeros': trailing_zeros})

    for k, v in dict(data_frame.dtypes).items():
        if k in data_frame_tag_columns:
            data_frame[k].replace('', np.nan, inplace=True)

    isnull = data_frame.isnull().any(axis=1)

    if isnull.any():
        rep = _replace(data_frame)
        lp = (reduce(lambda a, b: re.sub(*b, a), rep, f(p))
              for p in filter(lambda x: _any_not_nan(x, fields_indexes), _itertuples(data_frame)))
        return list(lp)
    else:
        return list(map(f, _itertuples(data_frame)))
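
The serializer above builds an f-string template per column and compiles it once with `eval`, passing `trailing_zeros` in the globals so that float columns are formatted without the `.0` suffix. The sketch below shows that technique without pandas. The row layout, key names, and measurement name are assumptions made for illustration only.

```python
def trailing_zeros(value):
    s = str(value)
    if s.endswith('.0'):
        s = s[:-2]
    return s


# Hypothetical row layout: p[0] is the timestamp, p[1] a tag,
# p[2] a float field and p[3] an integer field.
keys = ['location', 'temperature', 'count']
tags = "{keys[0]}={p[1]}"
fields = ["{keys[1]}={trailing_zeros(p[2])}", "{keys[2]}={p[3]}i"]

template = ''.join(('{measurement_name}', ',', tags, ' ', ','.join(fields), ' {p[0]}'))
# Names supplied in the globals dict (plus builtins) are what the generated f-string can see.
to_line = eval("lambda p: f'{}'".format(template),
               {'measurement_name': 'h2o', 'keys': keys, 'trailing_zeros': trailing_zeros})

print(to_line((1257894000, 'coyote_creek', 21.0, 3)))
# h2o,location=coyote_creek temperature=21,count=3i 1257894000
```

Compiling the template once keeps the per-row work to a single function call, which is presumably why the helper has to be exposed through the `eval` globals rather than called directly.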

Regards

@rogpeppe rogpeppe force-pushed the rog-001-strip-point-zero branch from 043ac52 to e14fc73 on January 21, 2021 11:28
codecov bot commented Jan 21, 2021

Codecov Report

Merging #181 (e14fc73) into master (5e6569c) will increase coverage by 0.01%.
The diff coverage is 100.00%.

@@            Coverage Diff             @@
##           master     #181      +/-   ##
==========================================
+ Coverage   89.81%   89.82%   +0.01%     
==========================================
  Files          26       26              
  Lines        1963     1966       +3     
==========================================
+ Hits         1763     1766       +3     
  Misses        200      200              

Impacted Files                          Coverage Δ
influxdb_client/client/write/point.py   98.40% <100.00%> (+0.03%) ⬆️

Δ = absolute <relative> (impact), ø = not affected, ? = missing data

@bednar bednar left a comment

Thanks for the PR 👍

@bednar bednar merged commit e584676 into influxdata:master Jan 21, 2021
@bednar bednar added this to the 1.14.0 milestone Jan 21, 2021