Skip to content

Latest commit

 

History

History
189 lines (132 loc) · 13.8 KB

XX54-tuning-brave_and_foolish.asciidoc

File metadata and controls

189 lines (132 loc) · 13.8 KB

Hadoop Tuning

Cluster Sizing Rules of thumb

Let’s define

  • ram_total == total cluster ram = ram_per_machine * num_machines

  • cpu_bw_per_slot == actual CPU bandwidth per mapreduce slot for a highly cpu-intensive job ~= actual CPU bandwidth / cores per machine

  • disk_bw_per_slot == actual disk bandwidth per mapreduce slot ~= actual disk bandwidth / cores per machine

  • ntwk_bw_per_slot == actual ntwk bandwidth per mapreduce slot ~= actual ntwk bandwidth / cores per machine

  • X = amount of data sent to the reducers in the largest job the customer expects to run performantly.

  • max_data_under_management == amount of data stored on HDFS, not accounting for replication

Consider a job that reads X data into mapper, sends X data to the reducer, and emits X data from that reducer (put another way: one of the following stages will be limiting, in which case increasing the amount of data the other stages handle won’t slow down the job.)

  • A: mapper stage

    • reading X data from hdfs disk for mapper via local datanode

    • processing X data in mapper (not limiting)

    • spilling X data to scratch disk (parallelized with mapper reading)

    • sending X data over network to reducer (parallelized with mapper reading)

    • writing X data to scratch disk in first-pass merge sorting (parallelized with mapper reading)

  • B: follow-on merge sort stage

    • reading/writing (Q-2)*X data to scratch disk in follow-on merge sort passes, where Q is the total number of merge-sort passes required (somewhat parallelized with above). It goes as the log of the reduce size over the reducer child process ram size.

  • C: output stage

    • reading X data from scratch disk for reducer

    • processing X data in reducer (not limiting)

    • writing X data to hdfs disk via local datanode (not limiting)

    • sending/receiving 2*X data over network for replication (parallelized with reducing)

    • writing 2*X data to disk for replication by local datanode (parallelized with reducing)

* A well-configured cluster has disk_bw_per_slot as the limiting bottleneck.
  - ntwk_bw_per_slot >= disk_bw_per_slot in-rack
  - ntwk_bw_per_slot >=  80% disk_bw_per_slot between pairs of machines in different racks under full load.
  - cpu_bw_per_slot >= disk_bw_per_slot -- it's actually really hard to find the practical job where this isn't the case
* Thus stage A takes X / disk_bw_per_slot amount of time
* And stage C takes 3*X / disk_bw_per_slot amount of time
* If ram_total >= 40% X, the number of merge passes won't much exceed the runtime of the mappers. A job that sends more than two or three times ram_total to its reducers starts to really suck.

Implications:

  • ram_total = 40% of the largest amount of data they care to regularly process

  • cpu = size each machine so its cpu is faster than its disk, and not faster

  • network = size each machine so its network is faster than its disk, and not too much faster

  • cross-rack switches = size the cross-rack switches so actual bandwidth between pairs of machines in different racks under full load isn’t too much slower than the in-rack bandwidth (say, 80%)

  • the bigjob you’ve just sized the cluster against should take about (5 * X / disk_bw_per_machine) to run

  • total cluster disk capacity = size to 5 * max_data_under_management — factor of 3 for replication, another factor of 1.5 for computing by-production, and then bump it up for overhead

    • scratch space volume capacity >= 2 * X — I assume that X is much less than max_data_under_management, so scratch space fits in the overhead. It’s really nice to have the scratch disks and the hdfs disks held separate.

Top-line Performance/Sanity Checks

  • The first wave of Mappers should start simultaneously.

  • In general, all a job’s full block Map attempts should take roughly the same amount of time.

  • The full map phase should take around ‘average_Map_task_time*(number_of_Map_tasks/number_of_Mapper_slots+1)’

  • Very few non-local Map tasks.

  • Number of spills equals number of Map tasks (unless there are Combiners).

  • If there are Combiners, the Reducer input data should be much less than the Mapper output data (TODO: check this).

  • Record counts and data sizes for Mapper input, Reducer input and Reducer output should correspond to your conception of what the job is doing.

  • Map tasks are full speed (data rate matches your measured baseline)

  • Most Map tasks process a full block of data.

  • Processing stage of Reduce attempts should be full speed.

  • Not too many Merge passes in Reducers.

  • Shuffle and sort time is explained by the number of Merge passes.

  • Commit phase should be brief.

  • Total job runtime is not much more than the combined Map phase and Reduce phase runtimes.

  • Reducers generally process the same amount of data.

  • Most Reducers process at least enough data to be worth it. *

Performance Comparison Worksheet

(TODO: DL Make a table comparing performance baseline figures on AWS and fixed hardware. reference clusters.)

Memory

Here’s a plausible configuration for a 16-GB physical machine with 8 cores:

  `mapred.tasktracker.reduce.tasks.maximum`   = 2
  `mapred.tasktracker.map.tasks.maximum`      = 5
  `mapred.child.java.opts`                    = 2 GB
  `mapred.map.child.java.opts`                = blank (inherits mapred.child.java.opts)
  `mapred.reduce.child.java.opts`             = blank (inherits mapred.child.java.opts)

  total mappers' heap size                    = 10   GB (5 * 2GB)
  total reducers' heap size                   =  4   GB (2 * 2GB)
  datanode heap size                          =  0.5 GB
  tasktracker heap size                       =  0.5 GB
  .....                                         ...
  total                                       = 15   GB on a 16 GB machine
  • It’s rare that you need to increase the tasktracker heap at all. With both the TT and DN daemons, just monitor them under load; as long as the heap healthily exceeds their observed usage you’re fine.

  • If you find that most of your time is spent in reduce, you can grant the reducers more RAM with mapred.reduce.child.java.opts (in which case lower the child heap size setting for the mappers to compensate).

    • It’s standard practice to disable swap — you’re better off OOM’ing footnote[OOM = Out of Memory error, causing the kernel to start killing processes outright] than swapping. If you do not disable swap, make sure to reduce the swappiness sysctl (5 is reasonable). Also consider setting overcommit_memory (1) and overcommit_ratio (100). Your sysadmin might get angry when you suggest these changes — on a typical server, OOM errors cause pagers to go off. A misanthropically funny T-shirt, or whiskey, will help establish your bona fides.

    • io.sort.mb default X, recommended at least 1.25 * typical output size (so for a 128MB block size, 160). It’s reasonable to devote up to 70% of the child heap size to this value.

  • io.sort.factor: default X, recommended io.sort.mb * 0.x5 * (seeks/s) / (thruput MB/s)

    • you want transfer time to dominate seek time; too many input streams and the disk will spend more time switching among them than reading them.

    • you want the CPU well-fed: too few input streams and the merge sort will run the sort buffers dry.

    • My laptop does 76 seeks/s and has 56 MB/s throughput, so with io.sort.mb = 320 I’d set io.sort.factor to 27.

    • A server that does 100 seeks/s with 100 MB/s throughput and a 160MB sort buffer should set io.sort.factor to 80.

  • io.sort.record.percent default X, recommended X (but adjust for certain jobs)

  • mapred.reduce.parallel.copies: default X, recommended to be in the range of sqrt(Nw*Nm) to Nw*Nm/2 You should see the shuffle/copy phase of your reduce tasks speed up.

  • mapred.job.reuse.jvm.num.tasks default 1, recommended 10. If a job requires a fresh JVM for each process, you can override that in its jobconf. Going to -1 (reuse unlimited times) can fill up the dist if your input format uses "delete on exit" temporary files (as for example the S3 filesystem does), with little additional speedup.

  • You never want Java to be doing stop-the-world garbage collection, but for large JVM heap sizes (above 4GB) they can become especially dangerous. If a full garbage collect takes too long, sockets can time out, causing loads to increase, causing garbage collects to happen, causing…​ trouble, as you can guess.

  • Given the number of files and amount of data you’re storing, I would set the NN heap size agressively - at least 4GB to start, and keep an eye on it. Having the NN run out of memory is Not Good. Always make sure the secondary name node has the same heap setting as the name node.

Handlers and threads

  • dfs.namenode.handler.count: default X, recommended: (0.1 to 1) * size of cluster, depending on how many blocks your HDFS holds.

  • tasktracker.http.threads default X, recommended X

  • Set mapred.reduce.tasks so that all your reduce slots are utilized — If you typically only run one job at a time on the cluster, that means set it to the number of reduce slots. (You can adjust this per-job too). Roughly speaking: keep number of reducers * reducer memory within a factor of two of your reduce data size.

  • dfs.datanode.handler.count: controls how many connections the datanodes can maintain. It’s set to 3 — you need to account for the constant presence of the flume connections. I think this may be causing the datanode problems. Something like 8-10 is appropriate.

  • You’ve increased dfs.datanode.max.xcievers to 8k, which is good.

  • io.file.buffer.size: default X recommended 65536; always use a multiple of 4096.

Storage

  • mapred.system.dir: default X recommended /hadoop/mapred/system Note that this is a path on the HDFS, not the filesystem).

  • Ensure the HDFS data dirs (dfs.name.dir, dfs.data.dir and fs.checkpoint.dir), and the mapreduce local scratch dirs (mapred.system.dir) include all your data volumes (and are off the root partition). The more volumes to write to the better. Include all the volumes in all of the preceding. If you have a lot of volumes, you’ll need to ensure they’re all attended to; have 0.5-2x the number of cores as physical volumes.

    • HDFS-3652 — don’t name your dirs /data1/hadoop/nn, name them /data1/hadoop/nn1 ( final element differs).

  • Solid-state drives are unjustifiable from a cost perspective. Though they’re radically better on seek they don’t improve performance on bulk transfer, which is what limits Hadoop. Use regular disks.

  • Do not construct a RAID partition for Hadoop — it is happiest with a large JBOD. (There’s no danger to having hadoop sit on top of a RAID volume; you’re just hurting performance).

  • We use xfs; I’d avoid ext3.

  • Set the noatime option (turns off tracking of last-access-time) — otherwise the OS updates the disk on every read.

  • Increase the ulimits for open file handles (nofile) and number of processes (nproc) to a large number for the hdfs and mapred users: we use 32768 and 50000.

    • be aware: you need to fix the ulimit for root (?instead ? as well?)

  • dfs.blockreport.intervalMsec: default 3_600_000 (1 hour); recommended 21_600_000 (6 hours) for a large cluster.

    • 100_000 blocks per data node for a good ratio of CPU to disk

Other

  • mapred.map.output.compression.codec: default XX, recommended ``. Enable Snappy codec for intermediate task output.

    • mapred.compress.map.output

    • mapred.output.compress

    • mapred.output.compression.type

    • mapred.output.compression.codec

  • mapred.reduce.slowstart.completed.maps: default X, recommended 0.2 for a single-purpose cluster, 0.8 for a multi-user cluster. Controls how long, as a fraction of the full map run, the reducers should wait to start. Set this too high, and you use the network poorly — reducers will be waiting to copy all their data. Set this too low, and you will hog all the reduce slots.

  • mapred.map.tasks.speculative.execution: default: true, recommended: true. Speculative execution (FIXME: explain). So this setting makes jobs finish faster, but makes cluster utilization higher; the tradeoff is typically worth it, especially in a development environment. Disable this for any map-only job that writes to a database or has side effects besides its output. Also disable this if the map tasks are expensive and your cluster utilization is high.

  • mapred.reduce.tasks.speculative.execution: default false, recommended: false.

  • (hadoop log location): default /var/log/hadoop, recommended /var/log/hadoop (usually). As long as the root partition isn’t under heavy load, store the logs on the root partition. Check the Jobtracker however — it typically has a much larger log volume than the others, and low disk utilization otherwise. In other words: use the disk with the least competition.

  • fs.trash.interval default 1440 (one day), recommended 2880 (two days). I’ve found that files are either a) so huge I want them gone immediately, or b) of no real concern. A setting of two days lets you to realize in the afternoon today that you made a mistake in the morning yesterday,

  • Unless you have a ton of people using the cluster, increase the amount of time the jobtracker holds log and job info; it’s nice to be able to look back a couple days at least. Also increase mapred.jobtracker.completeuserjobs.maximum to a larger value. These are just for politeness to the folks writing jobs.

    • mapred.userlog.retain.hours

    • mapred.jobtracker.retirejob.interval

    • mapred.jobtracker.retirejob.check

    • mapred.jobtracker.completeuserjobs.maximum

    • mapred.job.tracker.retiredjobs.cache

    • mapred.jobtracker.restart.recover

  • Bump mapreduce.job.counters.limit — it’s not configurable per-job.