From 00bc81efe8b1f542073d071fc46ee0e6940ccdd1 Mon Sep 17 00:00:00 2001 From: Sean Owen Date: Fri, 28 Mar 2014 21:50:20 +0000 Subject: [PATCH] Remove use of fastutil and replace with use of java.io, spark.util and Guava classes --- core/pom.xml | 4 --- .../spark/broadcast/HttpBroadcast.scala | 9 +++-- .../spark/partial/GroupedCountEvaluator.scala | 32 +++++++---------- .../main/scala/org/apache/spark/rdd/RDD.scala | 28 +++++++-------- .../spark/scheduler/ReplayListenerBus.scala | 5 ++- .../org/apache/spark/scheduler/Task.scala | 9 ++--- .../apache/spark/serializer/Serializer.scala | 9 ++--- .../apache/spark/storage/BlockManager.scala | 10 +++--- .../spark/storage/BlockObjectWriter.scala | 6 ++-- .../org/apache/spark/util/FileLogger.scala | 5 ++- .../org/apache/spark/util/SizeEstimator.scala | 5 ++- .../spark/util/collection/AppendOnlyMap.scala | 6 ++-- .../collection/ExternalAppendOnlyMap.scala | 5 ++- .../spark/util/collection/OpenHashMap.scala | 2 +- .../spark/util/collection/OpenHashSet.scala | 4 +-- pom.xml | 5 --- project/SparkBuild.scala | 1 - .../spark/streaming/util/RawTextHelper.scala | 34 +++++-------------- .../spark/streaming/util/RawTextSender.scala | 11 +++--- 19 files changed, 66 insertions(+), 124 deletions(-) diff --git a/core/pom.xml b/core/pom.xml index 1f808380817c9..a1bdd8ec68aeb 100644 --- a/core/pom.xml +++ b/core/pom.xml @@ -157,10 +157,6 @@ - - it.unimi.dsi - fastutil - colt colt diff --git a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala index f6a8a8af91e4b..29372f16f2cac 100644 --- a/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala +++ b/core/src/main/scala/org/apache/spark/broadcast/HttpBroadcast.scala @@ -18,11 +18,10 @@ package org.apache.spark.broadcast import java.io.{File, FileOutputStream, ObjectInputStream, ObjectOutputStream, OutputStream} -import java.net.{URI, URL, URLConnection} +import java.io.{BufferedInputStream, BufferedOutputStream} +import java.net.{URL, URLConnection, URI} import java.util.concurrent.TimeUnit -import it.unimi.dsi.fastutil.io.{FastBufferedInputStream, FastBufferedOutputStream} - import org.apache.spark.{HttpServer, Logging, SecurityManager, SparkConf, SparkEnv} import org.apache.spark.io.CompressionCodec import org.apache.spark.storage.{BroadcastBlockId, StorageLevel} @@ -164,7 +163,7 @@ private[spark] object HttpBroadcast extends Logging { if (compress) { compressionCodec.compressedOutputStream(new FileOutputStream(file)) } else { - new FastBufferedOutputStream(new FileOutputStream(file), bufferSize) + new BufferedOutputStream(new FileOutputStream(file), bufferSize) } } val ser = SparkEnv.get.serializer.newInstance() @@ -195,7 +194,7 @@ private[spark] object HttpBroadcast extends Logging { if (compress) { compressionCodec.compressedInputStream(inputStream) } else { - new FastBufferedInputStream(inputStream, bufferSize) + new BufferedInputStream(inputStream, bufferSize) } } val ser = SparkEnv.get.serializer.newInstance() diff --git a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala index 40b70baabcad9..925a3de363701 100644 --- a/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala +++ b/core/src/main/scala/org/apache/spark/partial/GroupedCountEvaluator.scala @@ -22,36 +22,33 @@ import java.util.{HashMap => JHashMap} import scala.collection.JavaConversions.mapAsScalaMap import scala.collection.Map import 
scala.collection.mutable.HashMap +import scala.reflect.ClassTag import cern.jet.stat.Probability -import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} + +import org.apache.spark.util.collection.OpenHashMap /** * An ApproximateEvaluator for counts by key. Returns a map of key to confidence interval. */ -private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Double) - extends ApproximateEvaluator[OLMap[T], Map[T, BoundedDouble]] { +private[spark] class GroupedCountEvaluator[T : ClassTag](totalOutputs: Int, confidence: Double) + extends ApproximateEvaluator[OpenHashMap[T,Long], Map[T, BoundedDouble]] { var outputsMerged = 0 - var sums = new OLMap[T] // Sum of counts for each key + var sums = new OpenHashMap[T,Long]() // Sum of counts for each key - override def merge(outputId: Int, taskResult: OLMap[T]) { + override def merge(outputId: Int, taskResult: OpenHashMap[T,Long]) { outputsMerged += 1 - val iter = taskResult.object2LongEntrySet.fastIterator() - while (iter.hasNext) { - val entry = iter.next() - sums.put(entry.getKey, sums.getLong(entry.getKey) + entry.getLongValue) + taskResult.foreach{ case (key,value) => + sums.changeValue(key, value, _ + value) } } override def currentResult(): Map[T, BoundedDouble] = { if (outputsMerged == totalOutputs) { val result = new JHashMap[T, BoundedDouble](sums.size) - val iter = sums.object2LongEntrySet.fastIterator() - while (iter.hasNext) { - val entry = iter.next() - val sum = entry.getLongValue() - result(entry.getKey) = new BoundedDouble(sum, 1.0, sum, sum) + sums.foreach{ case (key,sum) => + result(key) = new BoundedDouble(sum, 1.0, sum, sum) } result } else if (outputsMerged == 0) { @@ -60,16 +57,13 @@ private[spark] class GroupedCountEvaluator[T](totalOutputs: Int, confidence: Dou val p = outputsMerged.toDouble / totalOutputs val confFactor = Probability.normalInverse(1 - (1 - confidence) / 2) val result = new JHashMap[T, BoundedDouble](sums.size) - val iter = sums.object2LongEntrySet.fastIterator() - while (iter.hasNext) { - val entry = iter.next() - val sum = entry.getLongValue + sums.foreach{ case (key, sum) => val mean = (sum + 1 - p) / p val variance = (sum + 1) * (1 - p) / (p * p) val stdev = math.sqrt(variance) val low = mean - confFactor * stdev val high = mean + confFactor * stdev - result(entry.getKey) = new BoundedDouble(mean, confidence, low, high) + result(key) = new BoundedDouble(mean, confidence, low, high) } result } diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala index 3437b2cac19c2..80270856f8777 100644 --- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala +++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala @@ -25,7 +25,6 @@ import scala.collection.mutable.ArrayBuffer import scala.reflect.{classTag, ClassTag} import com.clearspring.analytics.stream.cardinality.HyperLogLog -import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} import org.apache.hadoop.io.BytesWritable import org.apache.hadoop.io.compress.CompressionCodec import org.apache.hadoop.io.NullWritable @@ -43,6 +42,7 @@ import org.apache.spark.partial.GroupedCountEvaluator import org.apache.spark.partial.PartialResult import org.apache.spark.storage.StorageLevel import org.apache.spark.util.{BoundedPriorityQueue, SerializableHyperLogLog, Utils} +import org.apache.spark.util.collection.OpenHashMap import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler} /** @@ -834,19 +834,16 @@ abstract class RDD[T: ClassTag]( throw new 
SparkException("countByValue() does not support arrays") } // TODO: This should perhaps be distributed by default. - def countPartition(iter: Iterator[T]): Iterator[OLMap[T]] = { - val map = new OLMap[T] - while (iter.hasNext) { - val v = iter.next() - map.put(v, map.getLong(v) + 1L) + def countPartition(iter: Iterator[T]): Iterator[OpenHashMap[T,Long]] = { + val map = new OpenHashMap[T,Long] + iter.foreach{ + t => map.changeValue(t, 1L, _ + 1L) } Iterator(map) } - def mergeMaps(m1: OLMap[T], m2: OLMap[T]): OLMap[T] = { - val iter = m2.object2LongEntrySet.fastIterator() - while (iter.hasNext) { - val entry = iter.next() - m1.put(entry.getKey, m1.getLong(entry.getKey) + entry.getLongValue) + def mergeMaps(m1: OpenHashMap[T,Long], m2: OpenHashMap[T,Long]): OpenHashMap[T,Long] = { + m2.foreach{ case (key, value) => + m1.changeValue(key, value, _ + value) } m1 } @@ -866,11 +863,10 @@ abstract class RDD[T: ClassTag]( if (elementClassTag.runtimeClass.isArray) { throw new SparkException("countByValueApprox() does not support arrays") } - val countPartition: (TaskContext, Iterator[T]) => OLMap[T] = { (ctx, iter) => - val map = new OLMap[T] - while (iter.hasNext) { - val v = iter.next() - map.put(v, map.getLong(v) + 1L) + val countPartition: (TaskContext, Iterator[T]) => OpenHashMap[T,Long] = { (ctx, iter) => + val map = new OpenHashMap[T,Long] + iter.foreach{ + t => map.changeValue(t, 1L, _ + 1L) } map } diff --git a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala index b03665fd56d33..f868e772cf58a 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/ReplayListenerBus.scala @@ -17,11 +17,10 @@ package org.apache.spark.scheduler -import java.io.InputStream +import java.io.{BufferedInputStream, InputStream} import scala.io.Source -import it.unimi.dsi.fastutil.io.FastBufferedInputStream import org.apache.hadoop.fs.{Path, FileSystem} import org.json4s.jackson.JsonMethods._ @@ -62,7 +61,7 @@ private[spark] class ReplayListenerBus( var currentLine = "" try { fileStream = Some(fileSystem.open(path)) - bufferedStream = Some(new FastBufferedInputStream(fileStream.get)) + bufferedStream = Some(new BufferedInputStream(fileStream.get)) compressStream = Some(wrapForCompression(bufferedStream.get)) // Parse each line as an event and post the event to all attached listeners diff --git a/core/src/main/scala/org/apache/spark/scheduler/Task.scala b/core/src/main/scala/org/apache/spark/scheduler/Task.scala index b85b4a50cd93a..a8bcb7dfe2f3c 100644 --- a/core/src/main/scala/org/apache/spark/scheduler/Task.scala +++ b/core/src/main/scala/org/apache/spark/scheduler/Task.scala @@ -17,13 +17,11 @@ package org.apache.spark.scheduler -import java.io.{DataInputStream, DataOutputStream} +import java.io.{ByteArrayOutputStream, DataInputStream, DataOutputStream} import java.nio.ByteBuffer import scala.collection.mutable.HashMap -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream - import org.apache.spark.TaskContext import org.apache.spark.executor.TaskMetrics import org.apache.spark.serializer.SerializerInstance @@ -104,7 +102,7 @@ private[spark] object Task { serializer: SerializerInstance) : ByteBuffer = { - val out = new FastByteArrayOutputStream(4096) + val out = new ByteArrayOutputStream(4096) val dataOut = new DataOutputStream(out) // Write currentFiles @@ -125,8 +123,7 @@ private[spark] object Task { dataOut.flush() val taskBytes = 
serializer.serialize(task).array() out.write(taskBytes) - out.trim() - ByteBuffer.wrap(out.array) + ByteBuffer.wrap(out.toByteArray) } /** diff --git a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala index 9f04dc6e427c0..f2c8f9b6218d6 100644 --- a/core/src/main/scala/org/apache/spark/serializer/Serializer.scala +++ b/core/src/main/scala/org/apache/spark/serializer/Serializer.scala @@ -17,11 +17,9 @@ package org.apache.spark.serializer -import java.io.{EOFException, InputStream, OutputStream} +import java.io.{ByteArrayOutputStream, EOFException, InputStream, OutputStream} import java.nio.ByteBuffer -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream - import org.apache.spark.SparkEnv import org.apache.spark.annotation.DeveloperApi import org.apache.spark.util.{ByteBufferInputStream, NextIterator} @@ -73,10 +71,9 @@ trait SerializerInstance { def serializeMany[T](iterator: Iterator[T]): ByteBuffer = { // Default implementation uses serializeStream - val stream = new FastByteArrayOutputStream() + val stream = new ByteArrayOutputStream() serializeStream(stream).writeAll(iterator) - val buffer = ByteBuffer.allocate(stream.position.toInt) - buffer.put(stream.array, 0, stream.position.toInt) + val buffer = ByteBuffer.allocate(stream.size()).put(stream.toByteArray) buffer.flip() buffer } diff --git a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala index df9bb4044e37a..f14017051fa07 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockManager.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockManager.scala @@ -17,7 +17,7 @@ package org.apache.spark.storage -import java.io.{File, InputStream, OutputStream} +import java.io.{File, InputStream, OutputStream, BufferedOutputStream, ByteArrayOutputStream} import java.nio.{ByteBuffer, MappedByteBuffer} import scala.collection.mutable.{ArrayBuffer, HashMap} @@ -26,7 +26,6 @@ import scala.concurrent.duration._ import scala.util.Random import akka.actor.{ActorSystem, Cancellable, Props} -import it.unimi.dsi.fastutil.io.{FastBufferedOutputStream, FastByteArrayOutputStream} import sun.nio.ch.DirectBuffer import org.apache.spark.{Logging, MapOutputTracker, SecurityManager, SparkConf, SparkEnv, SparkException} @@ -992,7 +991,7 @@ private[spark] class BlockManager( outputStream: OutputStream, values: Iterator[Any], serializer: Serializer = defaultSerializer) { - val byteStream = new FastBufferedOutputStream(outputStream) + val byteStream = new BufferedOutputStream(outputStream) val ser = serializer.newInstance() ser.serializeStream(wrapForCompression(blockId, byteStream)).writeAll(values).close() } @@ -1002,10 +1001,9 @@ private[spark] class BlockManager( blockId: BlockId, values: Iterator[Any], serializer: Serializer = defaultSerializer): ByteBuffer = { - val byteStream = new FastByteArrayOutputStream(4096) + val byteStream = new ByteArrayOutputStream(4096) dataSerializeStream(blockId, byteStream, values, serializer) - byteStream.trim() - ByteBuffer.wrap(byteStream.array) + ByteBuffer.wrap(byteStream.toByteArray) } /** diff --git a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala index 696b930a26b9e..a2687e6be4e34 100644 --- a/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala +++ b/core/src/main/scala/org/apache/spark/storage/BlockObjectWriter.scala @@ -17,11 +17,9 @@ package 
org.apache.spark.storage -import java.io.{FileOutputStream, File, OutputStream} +import java.io.{BufferedOutputStream, FileOutputStream, File, OutputStream} import java.nio.channels.FileChannel -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream - import org.apache.spark.Logging import org.apache.spark.serializer.{SerializationStream, Serializer} @@ -119,7 +117,7 @@ private[spark] class DiskBlockObjectWriter( ts = new TimeTrackingOutputStream(fos) channel = fos.getChannel() lastValidPosition = initialPosition - bs = compressStream(new FastBufferedOutputStream(ts, bufferSize)) + bs = compressStream(new BufferedOutputStream(ts, bufferSize)) objOut = serializer.newInstance().serializeStream(bs) initialized = true this diff --git a/core/src/main/scala/org/apache/spark/util/FileLogger.scala b/core/src/main/scala/org/apache/spark/util/FileLogger.scala index 0080a8b342b05..68a12e8ed67d7 100644 --- a/core/src/main/scala/org/apache/spark/util/FileLogger.scala +++ b/core/src/main/scala/org/apache/spark/util/FileLogger.scala @@ -17,12 +17,11 @@ package org.apache.spark.util -import java.io._ +import java.io.{FileOutputStream, BufferedOutputStream, PrintWriter, IOException} import java.net.URI import java.text.SimpleDateFormat import java.util.Date -import it.unimi.dsi.fastutil.io.FastBufferedOutputStream import org.apache.hadoop.fs.{FSDataOutputStream, Path} import org.apache.spark.{Logging, SparkConf} @@ -100,7 +99,7 @@ private[spark] class FileLogger( hadoopDataStream.get } - val bstream = new FastBufferedOutputStream(dstream, outputBufferSize) + val bstream = new BufferedOutputStream(dstream, outputBufferSize) val cstream = if (compress) compressionCodec.compressedOutputStream(bstream) else bstream new PrintWriter(cstream) } diff --git a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala index b955612ca7749..08465575309c6 100644 --- a/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala +++ b/core/src/main/scala/org/apache/spark/util/SizeEstimator.scala @@ -27,9 +27,8 @@ import java.util.concurrent.ConcurrentHashMap import scala.collection.mutable.ArrayBuffer -import it.unimi.dsi.fastutil.ints.IntOpenHashSet - import org.apache.spark.Logging +import org.apache.spark.util.collection.OpenHashSet /** * Estimates the sizes of Java objects (number of bytes of memory they occupy), for use in @@ -207,7 +206,7 @@ private[spark] object SizeEstimator extends Logging { // Estimate the size of a large array by sampling elements without replacement. var size = 0.0 val rand = new Random(42) - val drawn = new IntOpenHashSet(ARRAY_SAMPLE_SIZE) + val drawn = new OpenHashSet[Int](ARRAY_SAMPLE_SIZE) for (i <- 0 until ARRAY_SAMPLE_SIZE) { var index = 0 do { diff --git a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala index 025492b177a77..7839fb8e414d0 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/AppendOnlyMap.scala @@ -18,6 +18,7 @@ package org.apache.spark.util.collection import java.util.{Arrays, Comparator} +import com.google.common.hash.Hashing import org.apache.spark.annotation.DeveloperApi @@ -199,11 +200,8 @@ class AppendOnlyMap[K, V](initialCapacity: Int = 64) /** * Re-hash a value to deal better with hash functions that don't differ in the lower bits. 
- * We use the Murmur Hash 3 finalization step that's also used in fastutil. */ - private def rehash(h: Int): Int = { - it.unimi.dsi.fastutil.HashCommon.murmurHash3(h) - } + private def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt() /** Double the table's size and re-hash everything */ protected def growTable() { diff --git a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala index dd01ae821f705..d615767284c0b 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/ExternalAppendOnlyMap.scala @@ -17,14 +17,13 @@ package org.apache.spark.util.collection -import java.io._ +import java.io.{InputStream, BufferedInputStream, FileInputStream, File, Serializable, EOFException} import java.util.Comparator import scala.collection.mutable import scala.collection.mutable.ArrayBuffer import com.google.common.io.ByteStreams -import it.unimi.dsi.fastutil.io.FastBufferedInputStream import org.apache.spark.{Logging, SparkEnv} import org.apache.spark.annotation.DeveloperApi @@ -350,7 +349,7 @@ class ExternalAppendOnlyMap[K, V, C]( private class DiskMapIterator(file: File, blockId: BlockId, batchSizes: ArrayBuffer[Long]) extends Iterator[(K, C)] { private val fileStream = new FileInputStream(file) - private val bufferedStream = new FastBufferedInputStream(fileStream, fileBufferSize) + private val bufferedStream = new BufferedInputStream(fileStream, fileBufferSize) // An intermediate stream that reads from exactly one batch // This guards against pre-fetching and other arbitrary behavior of higher level streams diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala index 62f99f3981793..487791963d01d 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashMap.scala @@ -30,7 +30,7 @@ import org.apache.spark.annotation.DeveloperApi * Under the hood, it uses our OpenHashSet implementation. */ @DeveloperApi -class OpenHashMap[K >: Null : ClassTag, @specialized(Long, Int, Double) V: ClassTag]( +class OpenHashMap[K : ClassTag, @specialized(Long, Int, Double) V: ClassTag]( initialCapacity: Int) extends Iterable[(K, V)] with Serializable { diff --git a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala index 148c12e64d2ce..19af4f8cbe428 100644 --- a/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala +++ b/core/src/main/scala/org/apache/spark/util/collection/OpenHashSet.scala @@ -18,6 +18,7 @@ package org.apache.spark.util.collection import scala.reflect._ +import com.google.common.hash.Hashing /** * A simple, fast hash set optimized for non-null insertion-only use case, where keys are never @@ -256,9 +257,8 @@ class OpenHashSet[@specialized(Long, Int) T: ClassTag]( /** * Re-hash a value to deal better with hash functions that don't differ in the lower bits. - * We use the Murmur Hash 3 finalization step that's also used in fastutil. 
*/ - private def hashcode(h: Int): Int = it.unimi.dsi.fastutil.HashCommon.murmurHash3(h) + private def hashcode(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt() private def nextPowerOf2(n: Int): Int = { val highBit = Integer.highestOneBit(n) diff --git a/pom.xml b/pom.xml index c03bb35c99442..5f66cbe768592 100644 --- a/pom.xml +++ b/pom.xml @@ -348,11 +348,6 @@ - - it.unimi.dsi - fastutil - 6.4.4 - colt colt diff --git a/project/SparkBuild.scala b/project/SparkBuild.scala index 21163760e6277..a6058bba3d211 100644 --- a/project/SparkBuild.scala +++ b/project/SparkBuild.scala @@ -331,7 +331,6 @@ object SparkBuild extends Build { "org.spark-project.akka" %% "akka-slf4j" % akkaVersion excludeAll(excludeNetty), "org.spark-project.akka" %% "akka-testkit" % akkaVersion % "test", "org.json4s" %% "json4s-jackson" % "3.2.6" excludeAll(excludeScalap), - "it.unimi.dsi" % "fastutil" % "6.4.4", "colt" % "colt" % "1.2.0", "org.apache.mesos" % "mesos" % "0.13.0", "commons-net" % "commons-net" % "2.2", diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala index bd1df55cf70f5..dcbe6bd6e4c08 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextHelper.scala @@ -19,40 +19,22 @@ package org.apache.spark.streaming.util import org.apache.spark.SparkContext import org.apache.spark.SparkContext._ -import it.unimi.dsi.fastutil.objects.{Object2LongOpenHashMap => OLMap} +import org.apache.spark.util.collection.OpenHashMap import scala.collection.JavaConversions.mapAsScalaMap private[streaming] object RawTextHelper { - /** - * Splits lines and counts the words in them using specialized object-to-long hashmap - * (to avoid boxing-unboxing overhead of Long in java/scala HashMap) + /** + * Splits lines and counts the words. 
*/ def splitAndCountPartitions(iter: Iterator[String]): Iterator[(String, Long)] = { - val map = new OLMap[String] - var i = 0 - var j = 0 - while (iter.hasNext) { - val s = iter.next() - i = 0 - while (i < s.length) { - j = i - while (j < s.length && s.charAt(j) != ' ') { - j += 1 - } - if (j > i) { - val w = s.substring(i, j) - val c = map.getLong(w) - map.put(w, c + 1) - } - i = j - while (i < s.length && s.charAt(i) == ' ') { - i += 1 - } - } + val map = new OpenHashMap[String,Long] + val tokenized = iter.flatMap(_.split(" ").filterNot(_.isEmpty)) + tokenized.foreach{ s => + map.changeValue(s, 1L, _ + 1L) } - map.toIterator.map{case (k, v) => (k, v)} + map.iterator } /** diff --git a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala index 684b38e8b3102..a7850812bd612 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/RawTextSender.scala @@ -17,14 +17,12 @@ package org.apache.spark.streaming.util -import java.io.IOException +import java.io.{ByteArrayOutputStream, IOException} import java.net.ServerSocket import java.nio.ByteBuffer import scala.io.Source -import it.unimi.dsi.fastutil.io.FastByteArrayOutputStream - import org.apache.spark.{SparkConf, Logging} import org.apache.spark.serializer.KryoSerializer import org.apache.spark.util.IntParam @@ -45,16 +43,15 @@ object RawTextSender extends Logging { // Repeat the input data multiple times to fill in a buffer val lines = Source.fromFile(file).getLines().toArray - val bufferStream = new FastByteArrayOutputStream(blockSize + 1000) + val bufferStream = new ByteArrayOutputStream(blockSize + 1000) val ser = new KryoSerializer(new SparkConf()).newInstance() val serStream = ser.serializeStream(bufferStream) var i = 0 - while (bufferStream.position < blockSize) { + while (bufferStream.size < blockSize) { serStream.writeObject(lines(i)) i = (i + 1) % lines.length } - bufferStream.trim() - val array = bufferStream.array + val array = bufferStream.toByteArray val countBuf = ByteBuffer.wrap(new Array[Byte](4)) countBuf.putInt(array.length)
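For reference, a minimal self-contained sketch of the replacement patterns this patch applies, assuming only that Spark's org.apache.spark.util.collection.OpenHashMap and Guava are on the classpath (both are, after this change). The object name, helper methods, and sample data below are illustrative and not part of the patch.

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer

import com.google.common.hash.Hashing

import org.apache.spark.util.collection.OpenHashMap

object FastutilReplacementSketch {

  // Word counting with OpenHashMap.changeValue, the pattern that replaces
  // fastutil's Object2LongOpenHashMap in GroupedCountEvaluator, RDD.countByValue()
  // and RawTextHelper: start a new key at 1L, otherwise bump the existing count.
  def countWords(lines: Iterator[String]): Iterator[(String, Long)] = {
    val counts = new OpenHashMap[String, Long]()
    lines.flatMap(_.split(" ")).filter(_.nonEmpty).foreach { w =>
      counts.changeValue(w, 1L, _ + 1L)
    }
    counts.iterator
  }

  // Guava's MurmurHash3 over an Int, standing in for fastutil's
  // HashCommon.murmurHash3 in AppendOnlyMap and OpenHashSet.
  def rehash(h: Int): Int = Hashing.murmur3_32().hashInt(h).asInt()

  // java.io.ByteArrayOutputStream in place of FastByteArrayOutputStream:
  // toByteArray copies exactly the bytes written, and the wrapped ByteBuffer
  // is already positioned for reading (no trim() or flip() needed).
  def toByteBuffer(chunks: Array[Byte]*): ByteBuffer = {
    val out = new ByteArrayOutputStream(4096)
    chunks.foreach(chunk => out.write(chunk))
    ByteBuffer.wrap(out.toByteArray)
  }

  def main(args: Array[String]): Unit = {
    countWords(Iterator("to be or not to be")).foreach(println) // (to,2), (be,2), (or,1), (not,1) in some order
    println(rehash(42))
    println(toByteBuffer("spark".getBytes("UTF-8")).remaining()) // 5
  }
}

changeValue gives the same insert-or-update counting as the getLong/put pair on Object2LongOpenHashMap, and because OpenHashMap is specialized on Long values it keeps the no-boxing property that was the original reason for pulling in fastutil.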