Metrics reporting system for native parallel batch ingestion #10352

jihoonson opened this issue Sep 3, 2020 · 4 comments

Motivation

Currently, the Parallel task doesn't provide any metrics, so when something goes wrong you have to read through task logs. This is harder for the Parallel task because you first need to find out which subtask went wrong from the supervisor task logs and then read through that subtask's logs. Even once you locate the logs of the bad subtask, they may not tell you much: task logs contain only limited information about what the task has been doing, which you have to interpret in place of actual metrics. We have already found such metrics useful for streaming ingestion; batch ingestion could benefit similarly.

Proposed changes

To make debugging easier, native parallel batch ingestion should provide useful metrics. These metrics will be exposed via both the task reporting system and the metrics emitter.

Task reports

Both live and complete task reports will be provided. Live reports are available while the ingestion task is running, and complete task reports become available once the task is done.

The subtask report will include metrics for bytes in/out, rows in/filtered/unparseable/out, disk spills, fetch time, and errors. The supervisor task report will include per-phase metrics, which are mostly averages of the subtask metrics.
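
For concreteness, here is a minimal sketch of what such a per-subtask report could carry. The SubTaskMetricsSnapshot class and its field names are hypothetical placeholders used for illustration only; they are not part of this proposal's API or of existing Druid code.

```java
import java.util.List;

// Illustrative only: a hypothetical per-subtask metrics snapshot covering the
// fields listed above. The class and field names are placeholders.
public class SubTaskMetricsSnapshot
{
  public final long bytesIn;
  public final long bytesOut;
  public final long rowsIn;
  public final long rowsFiltered;
  public final long rowsUnparseable;
  public final long rowsOut;
  public final long diskSpills;
  public final long fetchTimeMs;
  public final List<String> errors;

  public SubTaskMetricsSnapshot(
      long bytesIn,
      long bytesOut,
      long rowsIn,
      long rowsFiltered,
      long rowsUnparseable,
      long rowsOut,
      long diskSpills,
      long fetchTimeMs,
      List<String> errors
  )
  {
    this.bytesIn = bytesIn;
    this.bytesOut = bytesOut;
    this.rowsIn = rowsIn;
    this.rowsFiltered = rowsFiltered;
    this.rowsUnparseable = rowsUnparseable;
    this.rowsOut = rowsOut;
    this.diskSpills = diskSpills;
    this.fetchTimeMs = fetchTimeMs;
    this.errors = errors;
  }
}
```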

Live reports

The live reports of the supervisor task will include:

  • Complete phases
    • Phase duration
    • Complete status (number of succeeded/failed subtasks)
    • Errors of the last N failed subtasks
    • Average/min/max duration of succeeded subtasks
    • Average/min/max bytes/rows in/out of subtasks
    • Total bytes in/out of subtasks
    • Total rows in/filtered/unparseable/out of subtasks
    • Average/min/max number of disk spills of subtasks
    • Average/min/max fetch time of subtasks
    • Average/min/max number of created segments of subtasks
    • Total number of created segments
  • Current phase
    • Runtime of current phase
    • Progress (number of succeeded/failed/expected to succeed subtasks)
    • Errors of recently failed subtasks
    • Average/min/max duration of succeeded subtasks
    • Min/max bytes/rows in/out of succeeded subtasks
    • Moving average of running total of bytes/rows in/out of running subtasks
    • Running total of bytes in/out of subtasks (succeeded + running)
    • Running total of rows in/filtered/unparseable/out of subtasks (succeeded + running)
    • Average/min/max number of disk spills and spill time of succeeded subtasks
    • Average/min/max fetch time of succeeded subtasks
    • Average/min/max number of created segments of succeeded subtasks
    • Running total of number of created segments
  • Remaining phases
    • Remaining phase names

The live reports of subtasks will include:

  • Total bytes/rows in/out per emission period
  • Running total of bytes/rows in/filtered/unparseable/out
  • Running total of number of disk spills, average spill time
  • Fetch time

Complete reports

The complete reports of the supervisor task will include:

  • Duration of the supervisor task
  • Segment metrics
    • Total number of segments published
    • Average/min/max number of segments published per interval
    • Average/min/max number of rows per segment
  • Total bytes in/out
  • Total rows in/filtered/unparseable/out
  • Supervisor task error if any
  • Per-phase metrics
    • Phase duration
    • Total bytes in/out
    • Total rows in/filtered/unparseable/out
    • Number of succeeded/failed subtasks
    • Errors of the N most recently failed subtasks
    • Average/min/max runtime of subtasks
    • Average/min/max number of total disk spills and spill time of subtasks
    • Average/min/max fetch time of subtasks

The complete reports of the subtasks will include:

  • Total bytes in/out
  • Total rows in/filtered/unparseable/out
  • Total number of disk spills and spill time
  • Fetch time
  • Error if failed

Metrics

All of the task metrics above will also be emitted via the metrics emitter.

MiddleManagers will additionally emit the following metrics; a sketch of how this emission could look follows the list.

  • Shuffle bytes per emission period
  • Shuffle requests per emission period
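
Below is a minimal sketch of how such emission could look, following the ServiceEmitter / ServiceMetricEvent pattern used elsewhere in Druid. The ShuffleMetricsEmitter class, the supervisorTaskId dimension, and the metric names are illustrative placeholders (metric names have not been decided yet).

```java
import org.apache.druid.java.util.emitter.service.ServiceEmitter;
import org.apache.druid.java.util.emitter.service.ServiceMetricEvent;

// Illustrative sketch: a MiddleManager-side shuffle module could emit the two
// metrics above once per emission period. Metric names are placeholders.
public class ShuffleMetricsEmitter
{
  private final ServiceEmitter emitter;

  public ShuffleMetricsEmitter(ServiceEmitter emitter)
  {
    this.emitter = emitter;
  }

  public void emit(String supervisorTaskId, long shuffleBytes, long shuffleRequests)
  {
    emitter.emit(
        ServiceMetricEvent.builder()
                          .setDimension("supervisorTaskId", supervisorTaskId)
                          .build("ingest/shuffle/bytes", shuffleBytes)       // placeholder metric name
    );
    emitter.emit(
        ServiceMetricEvent.builder()
                          .setDimension("supervisorTaskId", supervisorTaskId)
                          .build("ingest/shuffle/requests", shuffleRequests) // placeholder metric name
    );
  }
}
```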

Live reporting system for Parallel task

[Diagrams: live reporting system (two figures)]

  • Subtasks periodically send their live reports to the supervisor task (see the sketch after this list)
    • A failure to send metrics can cause the subtask to fail, so sends need retries.
    • Subtasks can report their current status, along with metrics, directly to the supervisor task.
  • The supervisor task can use those reports as a heartbeat signal
    • If a report is missed, the supervisor task checks with the Overlord to see whether the subtask has failed. If the subtask is still alive, the missing report is noted in the supervisor task's live report. If the subtask has died, the supervisor task issues a new subtask as a retry.
    • The supervisor task immediately retries subtasks that report failure; a subtask cannot succeed after reporting a failure.
    • When subtasks report success, the supervisor task verifies with the Overlord that they did succeed, since a subtask can still fail even after reporting success.
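
A rough sketch of the subtask-side reporting loop described above is shown below. LiveReportSender, LiveReportClient, and MetricsProvider are hypothetical names, SubTaskMetricsSnapshot is the sketch from the task-report section, and the retry count and period are arbitrary example values; only the overall flow (periodic push with retries, giving up after a few attempts so the supervisor task notices the missing report) reflects the proposal.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Illustrative sketch of the periodic live-report push from a subtask to its
// supervisor task. All class names are hypothetical.
public class LiveReportSender
{
  private static final int MAX_RETRIES = 3;

  private final ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
  private final LiveReportClient client;          // hypothetical HTTP client for the supervisor task
  private final MetricsProvider metricsProvider;  // hypothetical source of the current metrics snapshot

  public LiveReportSender(LiveReportClient client, MetricsProvider metricsProvider)
  {
    this.client = client;
    this.metricsProvider = metricsProvider;
  }

  public void start(long periodMillis)
  {
    exec.scheduleAtFixedRate(this::sendWithRetries, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
  }

  private void sendWithRetries()
  {
    SubTaskMetricsSnapshot snapshot = metricsProvider.snapshot();
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
      try {
        client.pushLiveReport(snapshot);  // e.g. POST to the supervisor task's live-report endpoint
        return;
      }
      catch (Exception e) {
        // On the last attempt, give up for this period. The supervisor task will
        // notice the missing report and check with the Overlord whether this
        // subtask is still alive.
        if (attempt == MAX_RETRIES) {
          return;
        }
      }
    }
  }

  public void stop()
  {
    exec.shutdownNow();
  }

  // Hypothetical collaborators, defined here only to keep the sketch self-contained.
  public interface LiveReportClient
  {
    void pushLiveReport(SubTaskMetricsSnapshot snapshot) throws Exception;
  }

  public interface MetricsProvider
  {
    SubTaskMetricsSnapshot snapshot();
  }
}
```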

Complete reporting system for Parallel task

  • The final report is pushed both to deep storage and to the supervisor task (see the sketch below)
    • TaskReportFileWriter will be used to push reports to deep storage in the MiddleManager and Indexer
    • TaskRunnerListener can be used to send reports to the supervisor task in the Peon and Indexer
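
The sketch below shows this dual push in its simplest form. The two interfaces are hypothetical stand-ins; the actual TaskReportFileWriter and TaskRunnerListener APIs are not reproduced here.

```java
import java.util.Map;

// Illustrative only: the final report is pushed to two destinations. The interfaces
// are hypothetical stand-ins for a TaskReportFileWriter-based path (deep storage)
// and a TaskRunnerListener-based path (supervisor task).
public class CompleteReportPusher
{
  public interface DeepStorageReportWriter
  {
    void write(String taskId, Map<String, Object> report);
  }

  public interface SupervisorTaskNotifier
  {
    void sendCompleteReport(String supervisorTaskId, String subTaskId, Map<String, Object> report);
  }

  private final DeepStorageReportWriter writer;
  private final SupervisorTaskNotifier notifier;

  public CompleteReportPusher(DeepStorageReportWriter writer, SupervisorTaskNotifier notifier)
  {
    this.writer = writer;
    this.notifier = notifier;
  }

  public void push(String supervisorTaskId, String subTaskId, Map<String, Object> report)
  {
    writer.write(subTaskId, report);                                   // deep storage (MiddleManager / Indexer)
    notifier.sendCompleteReport(supervisorTaskId, subTaskId, report);  // supervisor task (Peon / Indexer)
  }
}
```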

Rationale

Rationale for the list of metrics

Live reports and metrics are mostly useful for debugging. The new metrics should be able to answer these questions.

How is my ingestion going?

  • How long has my supervisor task been running?
  • How long does each subtask run for?
  • How many phases are left to run in my parallel ingestion?
  • How many subtasks are left to run in the current phase?
  • How much data is each subtask processing?

Why is my ingestion slow?

  • Are there any intermittent subtask failures?
  • Is each subtask processing too much data?
  • Is each subtask spilling to disk too many times?
  • Is shuffle slow?

What was the final state of my successful ingestion?

  • How many segments did my ingestion create?
  • What was the total size of the created segments?
  • How many subtask failures occurred during my ingestion, and what were they?
  • How long did my ingestion take?

Why did my ingestion fail?

  • Were there any subtask failures? If so, what were the error messages?
  • Did the parallel task fail? If so, what was the error message?

Why does my ingestion not create segments?

  • How many rows were in published segments?
  • How many rows were filtered out?
  • How many rows were unparseable?

Why is my query slow after ingestion?

  • Are there too few rows per segment?
  • Are there too many segments per time chunk?

More HTTP connections for live reporting system

In the proposed live reporting system, each subtask needs to talk to its supervisor task over HTTP, which will result in more HTTP connections between tasks. However, I would like to go with this approach for now instead of making connections between MiddleManagers, because:

  • Currently, there is another API in the supervisor task which every subtask calls directly to allocate new segments in dynamic partitioning. This API call cannot be delegated to the MiddleManager without introducing a new async API framework.
  • Even though there is already a supervisor task API called by every subtask, I would say the number of HTTP connections won't be that large in most cases. The maximum number of HTTP connections is maxNumConcurrentSubTasks * druid.global.http.numConnections (20 by default); for example, 200 concurrent subtasks * 20 connections is at most 4,000 connections. In general, maxNumConcurrentSubTasks doesn't go beyond 200 even in large clusters.
  • The concern with a high number of connections is that they could affect query performance in the data-node model, where a MiddleManager and a Historical live on the same machine. However, every subtask already makes connections to the supervisor task in dynamic partitioning, and I haven't heard of any problems from that yet.
  • If we did observe problems with a very large number of HTTP connections, there would still be workarounds.
    • We can adjust maxNumConcurrentSubTasks or druid.global.http.numConnections. For druid.global.http.numConnections in particular, I'm not sure why it defaults to 20 for Peons and MiddleManagers; we should consider lowering it.
    • Another workaround is using the Indexer. The problem doesn't exist there since connections are made between Indexers rather than between tasks.

Additional memory pressure in the supervisor task

The supervisor task will track metrics per phase, not per subtask (except for the error messages of failed subtasks). The metrics for each phase are computed by aggregating subtask metrics whenever subtasks send reports. As a result, the supervisor task only needs to keep roughly 20 metrics per phase in memory, which shouldn't be large.
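
For illustration, the kind of running aggregate the supervisor task could keep per phase and per metric (for example, bytes in or fetch time) is sketched below. The class is hypothetical; the point is that constant-size state (count, sum, min, max) is enough to produce the average/min/max values in the reports, regardless of how many subtasks report.

```java
// Illustrative sketch: a running aggregate kept per phase and per metric.
// Only four long fields are stored, with no per-subtask history.
public class RunningMetricAggregate
{
  private long count;
  private long sum;
  private long min = Long.MAX_VALUE;
  private long max = Long.MIN_VALUE;

  public synchronized void add(long value)
  {
    count++;
    sum += value;
    min = Math.min(min, value);
    max = Math.max(max, value);
  }

  public synchronized double getAverage()
  {
    return count == 0 ? 0.0 : (double) sum / count;
  }

  public synchronized long getMin()
  {
    return count == 0 ? 0 : min;
  }

  public synchronized long getMax()
  {
    return count == 0 ? 0 : max;
  }
}
```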

Operational impact

As described above, there will be two changes: more HTTP connections between Peons and additional memory usage in the supervisor task. Neither is expected to have a significant operational impact.

Test plan

The live and complete reports should be covered by integration tests. I will also test the metrics on our internal cluster.

jihoonson (Contributor, Author) commented:

I haven't come up with metric names yet. I will add them to this proposal later.

liran-funaro (Contributor) commented:

I'm delighted to see this proposal. I think adding these metrics is essential for users who need to analyze issues during ingestion.
Currently, to evaluate #10001, parsing logs has been the only method we could use to analyze resource consumption in our experiments and in production.
I hope this proposal will gain the attention it deserves.

mghosh4 (Contributor) commented Oct 28, 2020:

Thanks @jihoonson for working on this. This will be super useful. I had a few things that I wanted to draw your attention to:

  1. We have scenarios in our setup where maxNumConcurrentSubTasks can be as large as 700 or so. That said, I do believe 20 numConnections is highly unnecessary, considering these tasks will not receive any queries (batch ingestion). Do you foresee any issues at this scale?
  2. Another alternative design I can think of is that the tasks continue to report their metrics to the Overlord (as a heartbeat), and the supervisor task polls the Overlord as it already does to check health status. This would mean slightly higher memory requirements on the Overlord side, since it might have to store this information, but the Overlord could probably store aggregates for most metrics. What other downsides do you see? One advantage is that it does not add any new HTTP connection requirements.

loquisgon commented:

Putting the communication among tasks, Indexers, and the supervisor task on a streaming (async) system is an alternative to synchronous communication over HTTP.
