From 291ab7c0513db5644915cee4daee048136e955fb Mon Sep 17 00:00:00 2001 From: Ben Bangert Date: Thu, 27 Apr 2017 09:28:13 -0700 Subject: [PATCH] feat: add parquet unified schema and cleanup logging messages Adds a unified parquet stream for all log messages with a message matcher for necessary endpoint and connection node messages. Logging fields were cleaned up for consistency and status code logging uses a 0 instead of "" to avoid mixing types. Closes #874, #882 --- autopush/web/base.py | 8 ++--- autopush/web/webpush.py | 4 +-- autopush/websocket.py | 10 +++---- parquet_schemas/README.md | 50 +++++++++++++++++++++++++++++++ parquet_schemas/autopush/push.cfg | 46 ++++++++++++++++++++++++++++ 5 files changed, 107 insertions(+), 11 deletions(-) create mode 100644 parquet_schemas/README.md create mode 100644 parquet_schemas/autopush/push.cfg diff --git a/autopush/web/base.py b/autopush/web/base.py index 759789d8..9fa44798 100644 --- a/autopush/web/base.py +++ b/autopush/web/base.py @@ -269,17 +269,17 @@ def _router_fail_err(self, fail): self.log.failure( format=fmt, failure=fail, status_code=exc.status_code, - errno=exc.errno or "", + errno=exc.errno or 0, client_info=self._client_info) # pragma nocover if 200 <= exc.status_code < 300: self.log.info(format="Success", status_code=exc.status_code, - logged_status=exc.logged_status or "", + logged_status=exc.logged_status or 0, client_info=self._client_info) elif 400 <= exc.status_code < 500: self.log.info(format="Client error", status_code=exc.status_code, - logged_status=exc.logged_status or "", - errno=exc.errno or "", + logged_status=exc.logged_status or 0, + errno=exc.errno or 0, client_info=self._client_info) self._router_response(exc) diff --git a/autopush/web/webpush.py b/autopush/web/webpush.py index 57c51805..f99bc95b 100644 --- a/autopush/web/webpush.py +++ b/autopush/web/webpush.py @@ -413,11 +413,11 @@ def post(self, user_data = subscription["user_data"] self._client_info.update( message_id=notification.message_id, - uaid=hasher(user_data.get("uaid")), + uaid_hash=hasher(user_data.get("uaid")), channel_id=user_data.get("chid"), router_key=user_data["router_type"], message_size=len(notification.data or ""), - ttl=notification.ttl, + message_ttl=notification.ttl, version=notification.version ) diff --git a/autopush/websocket.py b/autopush/websocket.py index 93be35dc..55205a5c 100644 --- a/autopush/websocket.py +++ b/autopush/websocket.py @@ -1199,7 +1199,7 @@ def send_register_finish(self, result, endpoint, chid): self.sendJSON(msg) self.ps.metrics.increment("updates.client.register", tags=self.base_tags) - self.log.info(format="Register", channelID=chid, + self.log.info(format="Register", channel_id=chid, endpoint=endpoint, uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, @@ -1279,7 +1279,7 @@ def ver_filter(notif): if found: msg = found[0] size = len(msg.data) if msg.data else 0 - self.log.info(format="Ack", router_key="webpush", channelID=chid, + self.log.info(format="Ack", router_key="webpush", channel_id=chid, message_id=version, message_source="direct", message_size=size, uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, code=code, @@ -1293,7 +1293,7 @@ def ver_filter(notif): if found: msg = found[0] size = len(msg.data) if msg.data else 0 - self.log.info(format="Ack", router_key="webpush", channelID=chid, + self.log.info(format="Ack", router_key="webpush", channel_id=chid, message_id=version, message_source="stored", message_size=size, uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, code=code, @@ -1348,13 +1348,13 @@ def _handle_simple_ack(self, chid, version, code): self.ps.direct_updates[chid] <= version: del self.ps.direct_updates[chid] self.log.info(format="Ack", router_key="simplepush", - channelID=chid, message_id=version, + channel_id=chid, message_id=version, message_source="direct", uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, code=code, **self.ps.raw_agent) return - self.log.info(format="Ack", router_key="simplepush", channelID=chid, + self.log.info(format="Ack", router_key="simplepush", channel_id=chid, message_id=version, message_source="stored", uaid_hash=self.ps.uaid_hash, user_agent=self.ps.user_agent, code=code, diff --git a/parquet_schemas/README.md b/parquet_schemas/README.md new file mode 100644 index 00000000..a00e34e1 --- /dev/null +++ b/parquet_schemas/README.md @@ -0,0 +1,50 @@ +Overview +--- +These files are appended to the hindsight configurations that produce parquet +files in s3. + +Structure +--- +These files are organized by log stream. Log streams are named +`${app}.${type}.${source}`, where `app` is always `autopush`, `type` is +`autopush` or `autoendpoint`, and `source` is `docker.autopush` or +`docker.autoendpoint`, but may change based on logging configuration. + +For each log stream there can be a `${log_stream}.cfg` file, or a directory +`${log_stream}/` which contains .cfg files. All cfg files must contain a +`parquet_schema`, and are interpreted as lua. If no cfg file is specified for a +stream, then a fallback schema is used. Files without a `.cfg` extension are +ignore.d + +Providing a file for a stream indicates that there is only one schema for the +stream, and generally only requires specifying `parquet_schema`. This field +accesses the message after PII is scrubbed. A string to string map of mozlog +`Fields` is provided, and all values are also copied outside the map for use as +schema columns, with field names modified by converting to lowercase, replacing +`.`s with `_`, and adding a prefix of `fields_`. + +Providing a directory for a stream indicates that the stream is going to be +split into multiple parquet schemas, so `message_matcher` and +`s3_path_dimensions` should be specified. These fields access the message +before PII is scrubbed, so metadata is referenced as `Hostname`, mozlog +metadata is referenced as `Fields[Hostname]`, and mozlog fields are accessed +like `Fields[Fields.agent]`. + +The default value of `message_matcher` will match the log stream for the file, +so extending the matcher is preferrable using the lua `..` operator for string +concatenation. In order to keep logs that don't match specific schemas, a +fallback cfg should be provided that negates all the other message matchers in +the directory, and uses the default mozlog parquet schema. + +Date and Hour are special fields that are extracted from the mozlog `Timestamp` +value, for use in `s3_path_dimesions`. `s3_path_dimensions` is a list of +partition names mapped to a hindsight `read_message()` source. It is standard +to add a partition between `log` and `date` called `type` that matches the name +of the cfg file. The default value partitions logs by log stream (heka message +`Type`), `Date`, and `Hour` with: + + s3_path_dimensions = { + {name="log", source="Type"}, + {name="date", source="Fields[Date]"}, + {name="hour", source="Fields[Hour]"}, + } diff --git a/parquet_schemas/autopush/push.cfg b/parquet_schemas/autopush/push.cfg new file mode 100644 index 00000000..dec8569f --- /dev/null +++ b/parquet_schemas/autopush/push.cfg @@ -0,0 +1,46 @@ +-- vim:ft=lua +message_matcher = message_matcher .. " && (Fields[Fields.message] == 'Successful delivery' || Fields[Fields.message] == 'Router miss, message stored.' || Fields[Fields.message] == 'Request timings'" +message_matcher = message_matcher .. " || Fields[Fields.message] == 'hello' || Fields[Fields.message] == 'Register' || Fields[Fields.message] == 'Unregister' || Fields[Fields.message] == 'Ack' || Fields[Fields.message] == 'Nack')" + +s3_path_dimensions = { + {name="log", source="Type"}, + {name="date", source="Fields[Date]"}, + {name="hour", source="Fields[Hour]"}, +} + +parquet_schema = [=[ +message Log { + required int64 Timestamp; + optional binary Type (UTF8); + optional binary Hostname (UTF8); + optional binary Logger (UTF8); + optional binary EnvVersion (UTF8); + optional int64 Severity; + optional int64 Pid; + + optional int64 fields_code; + optional binary fields_remote_ip (UTF8); + optional binary fields_uaid_hash (UTF8); + optional binary fields_message (UTF8); + optional boolean fields_error; + optional binary fields_user_agent (UTF8); + optional binary fields_channel_id (UTF8); + optional binary fields_jwt_aud (UTF8); + optional binary fields_jwt_crypto_key (UTF8); + optional binary fields_jwt_sub (UTF8); + optional binary fields_message_id (UTF8); + optional int64 fields_message_size; + optional binary fields_message_source (UTF8); + optional int64 fields_message_ttl; + optional int64 fields_request_time; + optional int64 fields_route_time; + optional int64 fields_validation_time; + optional binary fields_router_key (UTF8); + optional int64 fields_status_code; + optional binary fields_ua_browser_family (UTF8); + optional binary fields_ua_browser_ver (UTF8); + optional binary fields_ua_os_family (UTF8); + optional binary fields_ua_os_ver (UTF8); + optional binary fields_system (UTF8); +} +]=]