
Commit

Remove use of fastutil and replace with use of java.io, spark.util and Guava classes
srowen committed Apr 11, 2014
1 parent 98225a6 commit 00bc81e
Showing 19 changed files with 66 additions and 124 deletions.
4 changes: 0 additions & 4 deletions core/pom.xml
@@ -157,10 +157,6 @@
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>it.unimi.dsi</groupId>
<artifactId>fastutil</artifactId>
</dependency>
<dependency>
<groupId>colt</groupId>
<artifactId>colt</artifactId>
core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala
@@ -18,11 +18,10 @@
package org.apache.spark.broadcast

import java.io.{File, FileOutputStream, ObjectInputStream, ObjectOutputStream, OutputStream}
import java.net.{URI, URL, URLConnection}
import java.io.{BufferedInputStream, BufferedOutputStream}
import java.net.{URL, URLConnection, URI}
import java.util.concurrent.TimeUnit

import it.unimi.dsi.fastutil.io.{FastBufferedInputStream, FastBufferedOutputStream}

import org.apache.spark.{HttpServer, Logging, SecurityManager, SparkConf, SparkEnv}
import org.apache.spark.io.CompressionCodec
import org.apache.spark.storage.{BroadcastBlockId, StorageLevel}
@@ -164,7 +163,7 @@ private[spark] object HttpBroadcast extends Logging {
if (compress) {
compressionCodec.compressedOutputStream(new FileOutputStream(file))
} else {
new FastBufferedOutputStream(new FileOutputStream(file), bufferSize)
new BufferedOutputStream(new FileOutputStream(file), bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()
@@ -195,7 +194,7 @@
if (compress) {
compressionCodec.compressedInputStream(inputStream)
} else {
new FastBufferedInputStream(inputStream, bufferSize)
new BufferedInputStream(inputStream, bufferSize)
}
}
val ser = SparkEnv.get.serializer.newInstance()
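For reference, a minimal sketch (not part of the commit) of how java.io's buffered streams stand in for the removed FastBufferedOutputStream/FastBufferedInputStream; the file path and buffer size below are illustrative assumptions:

import java.io.{BufferedInputStream, BufferedOutputStream, File, FileInputStream, FileOutputStream}

val bufferSize = 65536                               // assumed value; Spark derives it from a config setting
val file = new File("/tmp/broadcast-demo")           // hypothetical file
val out = new BufferedOutputStream(new FileOutputStream(file), bufferSize)
out.write(Array[Byte](1, 2, 3))
out.close()                                          // close() flushes the buffer before closing the file

val in = new BufferedInputStream(new FileInputStream(file), bufferSize)
val firstByte = in.read()                            // reads from the in-memory buffer when possible
in.close()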
core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala
@@ -22,36 +22,33 @@ import java.util.{HashMap => JHashMap}
import scala.collection.JavaConversions.mapAsScalaMap
import scala.collection.Map
import scala.collection.mutable.HashMap
import scala.reflect.ClassTag

import cern.jet.stat.Probability
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}

import org.apache.spark.util.collection.OpenHashMap

/**
* An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval.
*/
private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] {
private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: Double)
extends ApproximateEvaluator[OpenHashMap[T,Long], Map[T, BoundedDouble]] {

var outputsMerged = 0
var sums = new OLMap[T] // Sum of counts for each key
var sums = new OpenHashMap[T,Long]() // Sum of counts for each key

override def merge(outputId: Int, taskResult: OLMap[T]) {
override def merge(outputId: Int, taskResult: OpenHashMap[T,Long]) {
outputsMerged += 1
val iter = taskResult.object2LongEntrySet.fastIterator()
while (iter.hasNext) {
val entry = iter.next()
sums.put(entry.getKey, sums.getLong(entry.getKey) + entry.getLongValue)
taskResult.foreach{ case (key,value) =>
sums.changeValue(key, value, _ + value)
}
}

override def currentResult(): Map[T, BoundedDouble] = {
if (outputsMerged == totalOutputs) {
val result = new JHashMap[T, BoundedDouble](sums.size)
val iter = sums.object2LongEntrySet.fastIterator()
while (iter.hasNext) {
val entry = iter.next()
val sum = entry.getLongValue()
result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum)
sums.foreach{ case (key,sum) =>
result(key) = new BoundedDouble(sum, 1.0, sum, sum)
}
result
} else if (outputsMerged == 0) {
@@ -60,16 +57,13 @@ private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Dou
val p = outputsMerged.toDouble / totalOutputs
val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2)
val result = new JHashMap[T, BoundedDouble](sums.size)
val iter = sums.object2LongEntrySet.fastIterator()
while (iter.hasNext) {
val entry = iter.next()
val sum = entry.getLongValue
sums.foreach{ case (key, sum) =>
val mean = (sum + 1 - p) / p
val variance = (sum + 1) * (1 - p) / (p * p)
val stdev = math.sqrt(variance)
val low = mean - confFactor * stdev
val high = mean + confFactor * stdev
result(entry.getKey) = new BoundedDouble(mean, confidence, low, high)
result(key) = new BoundedDouble(mean, confidence, low, high)
}
result
}
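As a side note, a minimal sketch of the two OpenHashMap operations the rewritten evaluator relies on (changeValue for merge-or-insert, foreach over key/value pairs); the String keys and counts are illustrative assumptions:

import org.apache.spark.util.collection.OpenHashMap

val sums = new OpenHashMap[String, Long]()     // stands in for fastutil's Object2LongOpenHashMap
Seq("a", "b", "a").foreach { key =>
  sums.changeValue(key, 1L, _ + 1L)            // insert 1 for a new key, otherwise increment the count
}
sums.foreach { case (key, count) =>            // iterates entries as (key, value) tuples
  println(s"$key -> $count")
}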
28 changes: 12 additions & 16 deletions core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -25,7 +25,6 @@ import scala.collection.mutable.ArrayBuffer
import scala.reflect.{classTag, ClassTag}

import com.clearspring.analytics.stream.cardinality.HyperLogLog
import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap}
import org.apache.hadoop.io.BytesWritable
import org.apache.hadoop.io.compress.CompressionCodec
import org.apache.hadoop.io.NullWritable
@@ -43,6 +42,7 @@ import org.apache.spark.partial.GroupedCountEvaluator
import org.apache.spark.partial.PartialResult
import org.apache.spark.storage.StorageLevel
import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils}
import org.apache.spark.util.collection.OpenHashMap
import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler}

/**
@@ -834,19 +834,16 @@ abstract class RDD[T: ClassTag](
throw new SparkException("countByValue() does not support arrays")
}
// TODO: This should perhaps be distributed by default.
def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = {
val map = new OLMap[T]
while (iter.hasNext) {
val v = iter.next()
map.put(v, map.getLong(v) + 1L)
def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = {
val map = new OpenHashMap[T,Long]
iter.foreach{
t => map.changeValue(t, 1L, _ + 1L)
}
Iterator(map)
}
def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = {
val iter = m2.object2LongEntrySet.fastIterator()
while (iter.hasNext) {
val entry = iter.next()
m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue)
def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = {
m2.foreach{ case (key, value) =>
m1.changeValue(key, value, _ + value)
}
m1
}
@@ -866,11 +863,10 @@
if (elementClassTag.runtimeClass.isArray) {
throw new SparkException("countByValueApprox() does not support arrays")
}
val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) =>
val map = new OLMap[T]
while (iter.hasNext) {
val v = iter.next()
map.put(v, map.getLong(v) + 1L)
val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T,Long] = { (ctx, iter) =>
val map = new OpenHashMap[T,Long]
iter.foreach{
t => map.changeValue(t, 1L, _ + 1L)
}
map
}
core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala
@@ -17,11 +17,10 @@

package org.apache.spark.scheduler

import java.io.InputStream
import java.io.{BufferedInputStream, InputStream}

import scala.io.Source

import it.unimi.dsi.fastutil.io.FastBufferedInputStream
import org.apache.hadoop.fs.{Path, FileSystem}
import org.json4s.jackson.JsonMethods._

@@ -62,7 +61,7 @@ private[spark] class ReplayListenerBus(
var currentLine = "<not started>"
try {
fileStream = Some(fileSystem.open(path))
bufferedStream = Some(new FastBufferedInputStream(fileStream.get))
bufferedStream = Some(new BufferedInputStream(fileStream.get))
compressStream = Some(wrapForCompression(bufferedStream.get))

// Parse each line as an event and post the event to all attached listeners
9 changes: 3 additions & 6 deletions core/src/main/scala/org/apache/spark/scheduler/Task.scala
@@ -17,13 +17,11 @@

package org.apache.spark.scheduler

import java.io.{DataInputStream, DataOutputStream}
import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.nio.ByteBuffer

import scala.collection.mutable.HashMap

import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream

import org.apache.spark.TaskContext
import org.apache.spark.executor.TaskMetrics
import org.apache.spark.serializer.SerializerInstance
@@ -104,7 +102,7 @@
serializer: SerializerInstance)
: ByteBuffer = {

val out = new FastByteArrayOutputStream(4096)
val out = new ByteArrayOutputStream(4096)
val dataOut = new DataOutputStream(out)

// Write currentFiles
@@ -125,8 +123,7 @@
dataOut.flush()
val taskBytes = serializer.serialize(task).array()
out.write(taskBytes)
out.trim()
ByteBuffer.wrap(out.array)
ByteBuffer.wrap(out.toByteArray)
}

/**
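For context, a minimal sketch of the ByteArrayOutputStream idiom that replaces FastByteArrayOutputStream's trim()/array pattern; the payload written here is an illustrative assumption:

import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.ByteBuffer

val out = new ByteArrayOutputStream(4096)      // 4096 is only the initial capacity; the buffer grows as needed
val dataOut = new DataOutputStream(out)
dataOut.writeLong(42L)                         // stand-in for the serialized task bytes
dataOut.flush()
// toByteArray copies exactly the bytes written, so no trim() call is needed before wrapping
val buffer: ByteBuffer = ByteBuffer.wrap(out.toByteArray)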
core/src/main/scala/org/apache/spark/serializer/Serializer.scala
@@ -17,11 +17,9 @@

package org.apache.spark.serializer

import java.io.{EOFException, InputStream, OutputStream}
import java.io.{ByteArrayOutputStream, EOFException, InputStream, OutputStream}
import java.nio.ByteBuffer

import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream

import org.apache.spark.SparkEnv
import org.apache.spark.annotation.DeveloperApi
import org.apache.spark.util.{ByteBufferInputStream, NextIterator}
@@ -73,10 +71,9 @@

def serializeMany[T](iterator: Iterator[T]): ByteBuffer = {
// Default implementation uses serializeStream
val stream = new FastByteArrayOutputStream()
val stream = new ByteArrayOutputStream()
serializeStream(stream).writeAll(iterator)
val buffer = ByteBuffer.allocate(stream.position.toInt)
buffer.put(stream.array, 0, stream.position.toInt)
val buffer = ByteBuffer.wrap(stream.toByteArray)
buffer.flip()
buffer
}
10 changes: 4 additions & 6 deletions core/src/main/scala/org/apache/spark/storage/BlockManager.scala
@@ -17,7 +17,7 @@

package org.apache.spark.storage

import java.io.{File, InputStream, OutputStream}
import java.io.{File, InputStream, OutputStream, BufferedOutputStream, ByteArrayOutputStream}
import java.nio.{ByteBuffer, MappedByteBuffer}

import scala.collection.mutable.{ArrayBuffer, HashMap}
@@ -26,7 +26,6 @@ import scala.concurrent.duration._
import scala.util.Random

import akka.actor.{ActorSystem, Cancellable, Props}
import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream}
import sun.nio.ch.DirectBuffer

import org.apache.spark.{Logging, MapOutputTracker, SecurityManager, SparkConf, SparkEnv, SparkException}
@@ -992,7 +991,7 @@
outputStream: OutputStream,
values: Iterator[Any],
serializer: Serializer = defaultSerializer) {
val byteStream = new FastBufferedOutputStream(outputStream)
val byteStream = new BufferedOutputStream(outputStream)
val ser = serializer.newInstance()
ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close()
}
@@ -1002,10 +1001,9 @@
blockId: BlockId,
values: Iterator[Any],
serializer: Serializer = defaultSerializer): ByteBuffer = {
val byteStream = new FastByteArrayOutputStream(4096)
val byteStream = new ByteArrayOutputStream(4096)
dataSerializeStream(blockId, byteStream, values, serializer)
byteStream.trim()
ByteBuffer.wrap(byteStream.array)
ByteBuffer.wrap(byteStream.toByteArray)
}

/**
core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala
@@ -17,11 +17,9 @@

package org.apache.spark.storage

import java.io.{FileOutputStream, File, OutputStream}
import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream}
import java.nio.channels.FileChannel

import it.unimi.dsi.fastutil.io.FastBufferedOutputStream

import org.apache.spark.Logging
import org.apache.spark.serializer.{SerializationStream, Serializer}

@@ -119,7 +117,7 @@ private[spark] class DiskBlockObjectWriter(
ts = new TimeTrackingOutputStream(fos)
channel = fos.getChannel()
lastValidPosition = initialPosition
bs = compressStream(new FastBufferedOutputStream(ts, bufferSize))
bs = compressStream(new BufferedOutputStream(ts, bufferSize))
objOut = serializer.newInstance().serializeStream(bs)
initialized = true
this
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/util/FileLogger.scala
@@ -17,12 +17,11 @@

package org.apache.spark.util

import java.io._
import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException}
import java.net.URI
import java.text.SimpleDateFormat
import java.util.Date

import it.unimi.dsi.fastutil.io.FastBufferedOutputStream
import org.apache.hadoop.fs.{FSDataOutputStream, Path}

import org.apache.spark.{Logging, SparkConf}
@@ -100,7 +99,7 @@ private[spark] class FileLogger(
hadoopDataStream.get
}

val bstream = new FastBufferedOutputStream(dstream, outputBufferSize)
val bstream = new BufferedOutputStream(dstream, outputBufferSize)
val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream
new PrintWriter(cstream)
}
5 changes: 2 additions & 3 deletions core/src/main/scala/org/apache/spark/util/SizeEstimator.scala
@@ -27,9 +27,8 @@ import java.util.concurrent.ConcurrentHashMap

import scala.collection.mutable.ArrayBuffer

import it.unimi.dsi.fastutil.ints.IntOpenHashSet

import org.apache.spark.Logging
import org.apache.spark.util.collection.OpenHashSet

/**
* Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in
@@ -207,7 +206,7 @@ private[spark] object SizeEstimator extends Logging {
// Estimate the size of a large array by sampling elements without replacement.
var size = 0.0
val rand = new Random(42)
val drawn = new IntOpenHashSet(ARRAY_SAMPLE_SIZE)
val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE)
for (i <- 0 until ARRAY_SAMPLE_SIZE) {
var index = 0
do {
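For reference, a minimal sketch of the OpenHashSet[Int] calls the sampling loop depends on (add, contains, size); the bounds below are illustrative assumptions:

import scala.util.Random
import org.apache.spark.util.collection.OpenHashSet

val rand = new Random(42)
val drawn = new OpenHashSet[Int](100)          // stands in for fastutil's IntOpenHashSet
while (drawn.size < 10) {                      // draw 10 distinct indices without replacement
  val index = rand.nextInt(1000)
  if (!drawn.contains(index)) {
    drawn.add(index)
  }
}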
core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala
@@ -18,6 +18,7 @@
package org.apache.spark.util.collection

import java.util.{Arrays, Comparator}
import com.google.common.hash.Hashing

import org.apache.spark.annotation.DeveloperApi

@@ -199,11 +200,8 @@

/**
* Re-hash a value to deal better with hash functions that don't differ in the lower bits.
* We use the Murmur Hash 3 finalization step that's also used in fastutil.
*/
private def rehash(h: Int): Int = {
it.unimi.dsi.fastutil.HashCommon.murmurHash3(h)
}
private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()

/** Double the table's size and re-hash everything */
protected def growTable() {
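A minimal sketch of the Guava call that now provides the MurmurHash 3 style bit mixing for AppendOnlyMap's rehash; the key and table size below are illustrative assumptions:

import com.google.common.hash.Hashing

// Mix an Int hash code so that keys differing only in the upper bits
// still spread across the lower bits used as the table index.
def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()

val key = "some key"                           // hypothetical key
val mask = 64 - 1                              // assumes a table capacity of 64 (a power of two)
val bucket = rehash(key.hashCode) & mask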