Skip to content
This repository has been archived by the owner on Jul 13, 2023. It is now read-only.

Commit

Permalink
feat: add parquet unified schema and cleanup logging messages
Browse files Browse the repository at this point in the history
Adds a unified parquet stream for all log messages with a message
matcher for necessary endpoint and connection node messages.
Logging fields were cleaned up for consistency and status code
logging uses a 0 instead of "" to avoid mixing types.

Closes #874, #882
  • Loading branch information
bbangert committed Apr 27, 2017
1 parent 3882ef7 commit 63d2981
Show file tree
Hide file tree
Showing 7 changed files with 109 additions and 13 deletions.
2 changes: 1 addition & 1 deletion autopush/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def _init_info(self):
remote_ip=self.request.headers.get('x-forwarded-for',
self.request.remote_ip),
authorization=self.request.headers.get('authorization', ""),
message_ttl=self.request.headers.get('ttl', ""),
message_ttl=self.request.headers.get('ttl', None),
uri=self.request.uri,
python_version=sys.version,
)
Expand Down
8 changes: 4 additions & 4 deletions autopush/web/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -269,17 +269,17 @@ def _router_fail_err(self, fail):
self.log.failure(
format=fmt,
failure=fail, status_code=exc.status_code,
errno=exc.errno or "",
errno=exc.errno or 0,
client_info=self._client_info) # pragma nocover
if 200 <= exc.status_code < 300:
self.log.info(format="Success", status_code=exc.status_code,
logged_status=exc.logged_status or "",
logged_status=exc.logged_status or 0,
client_info=self._client_info)
elif 400 <= exc.status_code < 500:
self.log.info(format="Client error",
status_code=exc.status_code,
logged_status=exc.logged_status or "",
errno=exc.errno or "",
logged_status=exc.logged_status or 0,
errno=exc.errno or 0,
client_info=self._client_info)
self._router_response(exc)

Expand Down
2 changes: 1 addition & 1 deletion autopush/web/simplepush.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ def put(self, subscription, version, data):
# type: (Dict[str, Any], str, str) -> Deferred
user_data = subscription["user_data"]
self._client_info.update(
uaid=hasher(user_data.get("uaid")),
uaid_hash=hasher(user_data.get("uaid")),
channel_id=user_data.get("chid"),
message_id=version,
router_key=user_data["router_type"]
Expand Down
4 changes: 2 additions & 2 deletions autopush/web/webpush.py
Original file line number Diff line number Diff line change
Expand Up @@ -416,11 +416,11 @@ def post(self,
encoding = notification.headers.get('encoding', '')
self._client_info.update(
message_id=notification.message_id,
uaid=hasher(user_data.get("uaid")),
uaid_hash=hasher(user_data.get("uaid")),
channel_id=user_data.get("chid"),
router_key=user_data["router_type"],
message_size=len(notification.data or ""),
ttl=notification.ttl,
message_ttl=notification.ttl,
version=notification.version,
encoding=encoding,
)
Expand Down
10 changes: 5 additions & 5 deletions autopush/websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -1199,7 +1199,7 @@ def send_register_finish(self, result, endpoint, chid):
self.sendJSON(msg)
self.ps.metrics.increment("updates.client.register",
tags=self.base_tags)
self.log.info(format="Register", channelID=chid,
self.log.info(format="Register", channel_id=chid,
endpoint=endpoint,
uaid_hash=self.ps.uaid_hash,
user_agent=self.ps.user_agent,
Expand Down Expand Up @@ -1279,7 +1279,7 @@ def ver_filter(notif):
if found:
msg = found[0]
size = len(msg.data) if msg.data else 0
self.log.info(format="Ack", router_key="webpush", channelID=chid,
self.log.info(format="Ack", router_key="webpush", channel_id=chid,
message_id=version, message_source="direct",
message_size=size, uaid_hash=self.ps.uaid_hash,
user_agent=self.ps.user_agent, code=code,
Expand All @@ -1293,7 +1293,7 @@ def ver_filter(notif):
if found:
msg = found[0]
size = len(msg.data) if msg.data else 0
self.log.info(format="Ack", router_key="webpush", channelID=chid,
self.log.info(format="Ack", router_key="webpush", channel_id=chid,
message_id=version, message_source="stored",
message_size=size, uaid_hash=self.ps.uaid_hash,
user_agent=self.ps.user_agent, code=code,
Expand Down Expand Up @@ -1348,13 +1348,13 @@ def _handle_simple_ack(self, chid, version, code):
self.ps.direct_updates[chid] <= version:
del self.ps.direct_updates[chid]
self.log.info(format="Ack", router_key="simplepush",
channelID=chid, message_id=version,
channel_id=chid, message_id=version,
message_source="direct",
uaid_hash=self.ps.uaid_hash,
user_agent=self.ps.user_agent, code=code,
**self.ps.raw_agent)
return
self.log.info(format="Ack", router_key="simplepush", channelID=chid,
self.log.info(format="Ack", router_key="simplepush", channel_id=chid,
message_id=version, message_source="stored",
uaid_hash=self.ps.uaid_hash,
user_agent=self.ps.user_agent, code=code,
Expand Down
50 changes: 50 additions & 0 deletions parquet_schemas/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
Overview
---
These files are appended to the hindsight configurations that produce parquet
files in s3.

Structure
---
These files are organized by log stream. Log streams are named
`${app}.${type}.${source}`, where `app` is always `autopush`, `type` is
`autopush` or `autoendpoint`, and `source` is `docker.autopush` or
`docker.autoendpoint`, but may change based on logging configuration.

For each log stream there can be a `${log_stream}.cfg` file, or a directory
`${log_stream}/` which contains .cfg files. All cfg files must contain a
`parquet_schema`, and are interpreted as lua. If no cfg file is specified for a
stream, then a fallback schema is used. Files without a `.cfg` extension are
ignored.

Providing a file for a stream indicates that there is only one schema for the
stream, and generally only requires specifying `parquet_schema`. This field
accesses the message after PII is scrubbed. A string to string map of mozlog
`Fields` is provided, and all values are also copied outside the map for use as
schema columns, with field names modified by converting to lowercase, replacing
`.`s with `_`, and adding a prefix of `fields_`.

Providing a directory for a stream indicates that the stream is going to be
split into multiple parquet schemas, so `message_matcher` and
`s3_path_dimensions` should be specified. These fields access the message
before PII is scrubbed, so metadata is referenced as `Hostname`, mozlog
metadata is referenced as `Fields[Hostname]`, and mozlog fields are accessed
like `Fields[Fields.agent]`.

The default value of `message_matcher` will match the log stream for the file,
so extending the matcher using the lua `..` operator for string concatenation
is preferable. In order to keep logs that don't match specific schemas, a
fallback cfg should be provided that negates all the other message matchers in
the directory, and uses the default mozlog parquet schema.

Date and Hour are special fields that are extracted from the mozlog `Timestamp`
value, for use in `s3_path_dimensions`. `s3_path_dimensions` is a list of
partition names mapped to a hindsight `read_message()` source. It is standard
to add a partition between `log` and `date` called `type` that matches the name
of the cfg file. The default value partitions logs by log stream (heka message
`Type`), `Date`, and `Hour` with:

s3_path_dimensions = {
{name="log", source="Type"},
{name="date", source="Fields[Date]"},
{name="hour", source="Fields[Hour]"},
}
46 changes: 46 additions & 0 deletions parquet_schemas/autopush/push.cfg
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
-- vim:ft=lua
message_matcher = message_matcher .. " && (Fields[Fields.message] == 'Successful delivery' || Fields[Fields.message] == 'Router miss, message stored.' || Fields[Fields.message] == 'Request timings'"
message_matcher = message_matcher .. " || Fields[Fields.message] == 'hello' || Fields[Fields.message] == 'Register' || Fields[Fields.message] == 'Unregister' || Fields[Fields.message] == 'Ack' || Fields[Fields.message] == 'Nack')"

s3_path_dimensions = {
{name="log", source="Type"},
{name="date", source="Fields[Date]"},
{name="hour", source="Fields[Hour]"},
}

parquet_schema = [=[
message Log {
required int64 Timestamp;
optional binary Type (UTF8);
optional binary Hostname (UTF8);
optional binary Logger (UTF8);
optional binary EnvVersion (UTF8);
optional int64 Severity;
optional int64 Pid;
optional int64 fields_code;
optional int64 fields_errno;
optional binary fields_remote_ip (UTF8);
optional binary fields_uaid_hash (UTF8);
optional binary fields_message (UTF8);
optional boolean fields_error;
optional binary fields_user_agent (UTF8);
optional binary fields_channel_id (UTF8);
optional binary fields_jwt_aud (UTF8);
optional binary fields_jwt_crypto_key (UTF8);
optional binary fields_jwt_sub (UTF8);
optional binary fields_message_id (UTF8);
optional int64 fields_message_size;
optional binary fields_message_source (UTF8);
optional int64 fields_message_ttl;
optional int64 fields_request_time;
optional int64 fields_route_time;
optional int64 fields_validation_time;
optional binary fields_router_key (UTF8);
optional int64 fields_status_code;
optional binary fields_ua_browser_family (UTF8);
optional binary fields_ua_browser_ver (UTF8);
optional binary fields_ua_os_family (UTF8);
optional binary fields_ua_os_ver (UTF8);
}
]=]

0 comments on commit 63d2981

Please sign in to comment.