
feat: add parquet unified schema and cleanup logging messages
Adds a unified parquet stream for all log messages with a message
matcher for necessary endpoint and connection node messages.
Logging fields were cleaned up for consistency and status code
logging uses a 0 instead of "" to avoid mixing types.

Closes #874, #882
bbangert committed Apr 27, 2017
1 parent ec46786 commit ec271c8
Showing 4 changed files with 103 additions and 7 deletions.
4 changes: 2 additions & 2 deletions autopush/web/webpush.py
@@ -413,11 +413,11 @@ def post(self,
user_data = subscription["user_data"]
self._client_info.update(
message_id=notification.message_id,
- uaid=hasher(user_data.get("uaid")),
+ uaid_hash=hasher(user_data.get("uaid")),
channel_id=user_data.get("chid"),
router_key=user_data["router_type"],
message_size=len(notification.data or ""),
- ttl=notification.ttl,
+ message_ttl=notification.ttl,
version=notification.version
)

10 changes: 5 additions & 5 deletions autopush/websocket.py
@@ -1199,7 +1199,7 @@ def send_register_finish(self, result, endpoint, chid):
self.sendJSON(msg)
self.ps.metrics.increment("updates.client.register",
tags=self.base_tags)
- self.log.info(format="Register", channelID=chid,
+ self.log.info(format="Register", channel_id=chid,
endpoint=endpoint,
uaid_hash=self.ps.uaid_hash,
user_agent=self.ps.user_agent,
@@ -1279,7 +1279,7 @@ def ver_filter(notif):
if found:
msg = found[0]
size = len(msg.data) if msg.data else 0
- self.log.info(format="Ack", router_key="webpush", channelID=chid,
+ self.log.info(format="Ack", router_key="webpush", channel_id=chid,
message_id=version, message_source="direct",
message_size=size, uaid_hash=self.ps.uaid_hash,
user_agent=self.ps.user_agent, code=code,
@@ -1293,7 +1293,7 @@ def ver_filter(notif):
if found:
msg = found[0]
size = len(msg.data) if msg.data else 0
- self.log.info(format="Ack", router_key="webpush", channelID=chid,
+ self.log.info(format="Ack", router_key="webpush", channel_id=chid,
message_id=version, message_source="stored",
message_size=size, uaid_hash=self.ps.uaid_hash,
user_agent=self.ps.user_agent, code=code,
@@ -1348,13 +1348,13 @@ def _handle_simple_ack(self, chid, version, code):
self.ps.direct_updates[chid] <= version:
del self.ps.direct_updates[chid]
self.log.info(format="Ack", router_key="simplepush",
- channelID=chid, message_id=version,
+ channel_id=chid, message_id=version,
message_source="direct",
uaid_hash=self.ps.uaid_hash,
user_agent=self.ps.user_agent, code=code,
**self.ps.raw_agent)
return
- self.log.info(format="Ack", router_key="simplepush", channelID=chid,
+ self.log.info(format="Ack", router_key="simplepush", channel_id=chid,
self.log.info(format="Ack", router_key="simplepush", channel_id=chid,
message_id=version, message_source="stored",
uaid_hash=self.ps.uaid_hash,
user_agent=self.ps.user_agent, code=code,
50 changes: 50 additions & 0 deletions parquet_schemas/README.md
@@ -0,0 +1,50 @@
Overview
---
These files are appended to the hindsight configurations that produce parquet
files in s3.

Structure
---
These files are organized by log stream. Log streams are named
`${app}.${type}.${source}`, where `app` is always `autopush`, `type` is
`autopush` or `autoendpoint`, and `source` is `docker.autopush` or
`docker.autoendpoint`, though `source` may change based on logging
configuration. For example, the endpoint node's stream is
`autopush.autoendpoint.docker.autoendpoint`.

For each log stream there can be a `${log_stream}.cfg` file, or a directory
`${log_stream}/` which contains .cfg files. All cfg files must contain a
`parquet_schema`, and are interpreted as lua. If no cfg file is specified for a
stream, then a fallback schema is used. Files without a `.cfg` extension are
ignored.

Providing a file for a stream indicates that there is only one schema for the
stream, and generally only requires specifying `parquet_schema`. This schema is
applied to the message after PII is scrubbed. A string-to-string map of mozlog
`Fields` is provided, and all values are also copied outside the map for use as
schema columns, with field names converted to lowercase, `.`s replaced with
`_`s, and a `fields_` prefix added.
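
For example, the mozlog field `user_agent` is copied to the column
`fields_user_agent`. A minimal single-stream cfg might look like the following
sketch (the column set is illustrative, borrowed from the push schema below):

    parquet_schema = [=[
    message Log {
        required int64 Timestamp;
        optional binary fields_uaid_hash (UTF8);
        optional binary fields_user_agent (UTF8);
    }
    ]=]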

Providing a directory for a stream indicates that the stream is going to be
split into multiple parquet schemas, so `message_matcher` and
`s3_path_dimensions` should be specified. These fields access the message
before PII is scrubbed, so metadata is referenced as `Hostname`, mozlog
metadata is referenced as `Fields[Hostname]`, and mozlog fields are accessed
like `Fields[Fields.agent]`.
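
For example, a matcher on a mozlog field uses the pre-scrub layout (a sketch;
the `hello` message value is one of those matched in `push.cfg` below):

    message_matcher = message_matcher .. " && Fields[Fields.message] == 'hello'"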

The default value of `message_matcher` matches the log stream for the file, so
it is preferable to extend the matcher using the lua `..` operator for string
concatenation. In order to keep logs that don't match any specific schema, a
fallback cfg should be provided that negates all the other message matchers in
the directory, and uses the default mozlog parquet schema.
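
A hypothetical `fallback.cfg` next to `push.cfg` might negate its matchers
along these lines (a sketch; only two of the matched message values are shown):

    -- hypothetical fallback.cfg: keep what push.cfg does not claim
    message_matcher = message_matcher .. " && Fields[Fields.message] != 'Ack' && Fields[Fields.message] != 'hello'"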

Date and Hour are special fields that are extracted from the mozlog `Timestamp`
value, for use in `s3_path_dimensions`. `s3_path_dimensions` is a list of
partition names mapped to a hindsight `read_message()` source. It is standard
to add a partition between `log` and `date` called `type` that matches the name
of the cfg file. The default value partitions logs by log stream (heka message
`Type`), `Date`, and `Hour` with:

    s3_path_dimensions = {
        {name="log", source="Type"},
        {name="date", source="Fields[Date]"},
        {name="hour", source="Fields[Hour]"},
    }
46 changes: 46 additions & 0 deletions parquet_schemas/autopush/push.cfg
@@ -0,0 +1,46 @@
-- vim:ft=lua
message_matcher = message_matcher .. " && (Fields[Fields.message] == 'Successful delivery' || Fields[Fields.message] == 'Router miss, message stored.' || Fields[Fields.message] == 'Request timings'"
message_matcher = message_matcher .. " || Fields[Fields.message] == 'hello' || Fields[Fields.message] == 'Register' || Fields[Fields.message] == 'Unregister' || Fields[Fields.message] == 'Ack' || Fields[Fields.message] == 'Nack')"

s3_path_dimensions = {
    {name="log", source="Type"},
    {name="date", source="Fields[Date]"},
    {name="hour", source="Fields[Hour]"},
}

parquet_schema = [=[
message Log {
    required int64 Timestamp;
    optional binary Type (UTF8);
    optional binary Hostname (UTF8);
    optional binary Logger (UTF8);
    optional binary EnvVersion (UTF8);
    optional int64 Severity;
    optional int64 Pid;
    optional int64 fields_code;
    optional binary fields_remote_ip (UTF8);
    optional binary fields_uaid_hash (UTF8);
    optional binary fields_message (UTF8);
    optional boolean fields_error;
    optional binary fields_user_agent (UTF8);
    optional binary fields_channel_id (UTF8);
    optional binary fields_jwt_aud (UTF8);
    optional binary fields_jwt_crypto_key (UTF8);
    optional binary fields_jwt_sub (UTF8);
    optional binary fields_message_id (UTF8);
    optional int64 fields_message_size;
    optional binary fields_message_source (UTF8);
    optional int64 fields_message_ttl;
    optional int64 fields_request_time;
    optional int64 fields_route_time;
    optional int64 fields_validation_time;
    optional binary fields_router_key (UTF8);
    optional int64 fields_status_code;
    optional binary fields_ua_browser_family (UTF8);
    optional binary fields_ua_browser_ver (UTF8);
    optional binary fields_ua_os_family (UTF8);
    optional binary fields_ua_os_ver (UTF8);
    optional binary fields_system (UTF8);
}
]=]
