Skip to content

wukong hadoop tuning experiments

Philip (flip) Kromer edited this page May 9, 2012 · 1 revision

wukong-tuning -- Hadoop Cluster Tuning Plan

Metrics

script name 
            
total       	map   	GB      	input size              
total       	map   	GB      	output size             
total       	reduce	GB      	output size             
            
/task       	map   	GB      	input                   
/task       	map   	GB      	output                  
/task       	reduce	GB      	output                  
            
            	map   	        	function (eg FILTER)    
            	reduce	        	function (eg DISTINCT)  
            
total       	job   	duration
            
10/50/90/max	map   	time    	end                     
10/50/90/max	map   	duration	run                     
10/50/90/max	reduce	time    	end                     
10/50/90/max	reduce	duration	run                     
            
total       	reduce	slots   
total       	reduce	tasks   
total       	map   	slots   
total       	map   	tasks   
total       	map   	tasks   	non-data-local map tasks
            
/machine    	reduce	slots   	                        
/machine    	map   	slots   	                        
            
            	input 	        	path names              
            	input 	        	filesystem              
            	output	        	filesystem              
            	      	        	machine size            
            	      	machines	count of machines       
            	map   	GB      	child heap size         
            	reduce	GB      	child heap size         

Comparison

Write

size=4096 # 4 GB
src_file=/mnt/tmp/junk-src-$size
out_file=/mnt/tmp/junk-out-$size
#
time dd if=/dev/rand  of=$out_file   bs=1024k count=$size
time dd if=$src_file  of=/dev/null   bs=1024k count=$size
time dd if=$src_file  of=$out_file   bs=1024k count=$size

Important parameters

from Job Configuration:

mapred.job.shuffle.merge.percent float The memory threshold for fetched map outputs before an in-memory merge is started, expressed as a percentage of memory allocated to storing map outputs in memory. Since map outputs that can't fit in memory can be stalled, setting this high may decrease parallelism between the fetch and merge. Conversely, values as high as 1.0 have been effective for reduces whose input can fit entirely in memory. This parameter influences only the frequency of in-memory merges during the shuffle. io.sort.factor int Specifies the number of segments on disk to be merged at the same time. It limits the number of open files and compression codecs during the merge. If the number of files exceeds this limit, the merge will proceed in several passes. Though this limit also applies to the map, most jobs should be configured so that hitting this limit is unlikely there. mapred.inmem.merge.threshold int The number of sorted map outputs fetched into memory before being merged to disk. Like the spill thresholds in the preceding note, this is not defining a unit of partition, but a trigger. In practice, this is usually set very high (1000) or disabled (0), since merging in-memory segments is often less expensive than merging from disk (see notes following this table). This threshold influences only the frequency of in-memory merges during the shuffle. mapred.job.shuffle.merge.percent float The memory threshold for fetched map outputs before an in-memory merge is started, expressed as a percentage of memory allocated to storing map outputs in memory. Since map outputs that can't fit in memory can be stalled, setting this high may decrease parallelism between the fetch and merge. Conversely, values as high as 1.0 have been effective for reduces whose input can fit entirely in memory. This parameter influences only the frequency of in-memory merges during the shuffle. mapred.job.shuffle.input.buffer.percent float The percentage of memory- relative to the maximum heapsize as typically specified in mapred.child.java.opts- that can be allocated to storing map outputs during the shuffle. Though some memory should be set aside for the framework, in general it is advantageous to set this high enough to store large and numerous map outputs. mapred.job.reduce.input.buffer.percent float The percentage of memory relative to the maximum heapsize in which map outputs may be retained during the reduce. When the reduce begins, map outputs will be merged to disk until those that remain are under the resource limit this defines. By default, all map outputs are merged to disk before the reduce begins to maximize the memory available to the reduce. For less memory-intensive reduces, this should be increased to avoid trips to disk.

Split

  • number of map tasks:
    • mapred.max.split.size, mapred.min.split.size, size of file, size of HDFS block (dfs.block.size)

Map

  • spills
    • io.sort.mb -- size of the output buffer. Increase if your output size exceeds input size.

    • io.sort.spill.percent and io.sort.record.percent -- turn down for skinny rows

      • at io.sort.mb = 250, 85% = 212.5 (data) and 2,457,600 16-byte records. Spill happens at 80% of 212.5, or 170. If your data is larger than this, or you are outputting more records than this, you'll spill.
      • for 128 MB, io.sort.mb should be at least 190 MB
    • mapred.local.dir -- stripe across many local drives

Turn up block size for large files. Note that this will lead to spill.

ACMurthy: "Applications should ensure that each reduce should process at least 1-2 GB of data, and at most 5-10GB of data, in most scenarios"

  • other

    • mapred.map.tasks.speculative.execution
    • mapred.reduce.tasks.speculative.execution
    • mapred.job.reuse.jvm.num.tasks = -1 -- to the number of times to reuse a JVM for the same map or reduce transform -or to -1 to reuse without limits. This reduces JVM startup/teardown times.
  • map-side merge

    • io.sort.factor -- number of streams to merge at once
  • copy

    • mapred.compress.map.output and mapred.map.output.compression.codec

    • tracker.http.threads > (# reducers) * (mapred.reduce.parallel.copies)

    • dfs.namenode.handler.count

    • mapred.job.tracker.handler.count

    • dfs.datanode.handler.count

    • dfs.datanode.max.xceivers / dfs.datanode.max.xcievers -- limit on the number of files. Increase to 4096. It's misspelled, so we set both.

    • dfs.datanode.socket.write.timeout

Reduce

  • merge
    • mapred.reduce.parallel.copies -- number of threads to copy inputs
    • mapred.job.shuffle.input.buffer.percent
    • merge starts when mapred.job.shuffle.merge.percent OR mapred.inmem.merge.threshold is exceeded
    • mapred.job.reduce.input.buffer.percent -- keep sorted input in memory
    • mapred.reduce.copy.backoff

Memory

  • mapred.tasktracker.map.tasks.maximum
  • mapred.tasktracker.reduce.tasks.maximum
  • mapred.child.java.opts, mapred.reduce.child.java.opts, mapred.map.child.java.opts

Experiments:

ex1: constant input, sampled map output, distinct

1x m1.xlarge master, 5x m1.xlarge worker, 6 map (1200m) 2 red (2400m) java: -Xmx2400m -Xss128k -XX:+UseCompressedOops -XX:MaxNewSize=200m -server

daemon heap 1000, jobtracker heap 3072

Used 360GB input data, emitted X/60 sample of the data:

    twitter_out_04	    20742179859	        19.3 GB
    twitter_out_05	    26011764400	        24.2 GB
    twitter_out_07	    36689819407	        34.2 GB
    twitter_out_10	    54037083442	        50.3 GB
    twitter_out_12	    65942075457	        61.4 GB
    twitter_out_15	    83782104433	        78.0 GB
    twitter_out_18	   101422765493	        94.5 GB
    twitter_out_20	   113325576268	       105.5 GB
    twitter_out_30	   173331942369	       161.4 GB

ex2:

io.sort.mb 250 io.sort.factor 25
mapred.inmem.merge.threshold 0
mapred.job.shuffle.merge.percent 0.66 mapred.job.shuffle.input.buffer.percent 0.7 mapred.job.reduce.input.buffer.percent 0.0 mapred.job.reduce.markreset.buffer.percent 0.3

  • ex 1: 0005 19gb DISTINCT mapred.job.reduce.input.buffer.percent 0
  • ex 2: 0007 106gb DISTINCT mapred.job.reduce.input.buffer.percent 0.5
  • ex 3: 0008 106gb DISTINCT mapred.job.reduce.input.buffer.percent 0.7
  • ex 4: 0009 19gb GROUP/FLATTEN
  • ex 5: 0010 7gb GROUP/FLATTEN
  • ex 6: 0011 50gb GROUP/FLATTEN
  • ex 7: 0012 50GB DISTINCT
  • ex 8: 0013 50GB DISTINCT
  • ex 9: 0014 106GB GROUP/FLATTEN
  • ex10: 0015 106GB DISTINCT
  • ex11: 0016 333GB DISTINCT

io.sort.mb 200 io.sort.factor 10
mapred.inmem.merge.threshold 0 mapred.job.shuffle.merge.percent 0.66 mapred.job.shuffle.input.buffer.percent 0.70 mapred.job.reduce.input.buffer.percent 0.70 mapred.reduce.parallel.copies 30

Ratio of map tasks to reduce tasks => how many map spills


HDFS Write Speed

http://old.nabble.com/HDFS-read-write-speeds,-and-read-optimization-td22980463.html

For comparison, on a 1400 node cluster, I can checksum 100 TB in 
around 10 minutes, which means I'm seeing read averages of roughly 166
GB/sec. For writes with replication of 3, I see roughly 40-50 minutes 
to write 100TB, so roughly 33 GB/sec average. Of course the peaks are 
much higher. Each node has 4 SATA disks, dual quad core, and 8 GB of ram.

-- Owen

Pig config variables

    pig.tmpfilecompression          true
    pig.tmpfilecompression.codec    org.apache.hadoop.io.compress.SnappyCodec

    # Specifies the size, in bytes, of data to be processed by a single map. Smaller files are combined untill this size is reached.
    # Set to 64MB (1/2 our HDFS block size)
    pig.maxCombinedSplitSize        67108864

    # Should Pig combine small files so that they are processed as a single map? Processing input (either user input or intermediate input) from multiple small files can be inefficient because a separate map has to be created for each file.
    pig.splitCombination


    If neither "set default parallel" nor the PARALLEL clause are used, Pig sets the number of reducers using a heuristic based on the size of the input data. You can set the values for these properties:

    # Defines the number of input bytes per reduce; default value is 1000*1000*1000 (1GB).
    pig.exec.reducers.bytes.per.reducer

    # Defines the upper bound on the number of reducers; default is 999.
    pig.exec.reducers.max

    # The formula, shown below, is very simple and will improve over time. The
    # computed value takes all inputs within the script into account and applies the
    # computed value to all the jobs within Pig script.
    # 
    # #reducers = MIN (pig.exec.reducers.max, total input size (in bytes) / bytes per reducer)

Clone this wiki locally