diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala similarity index 95% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala index eb9c07e9cf61f..fb33507768e6d 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/FileSegment.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/FileSegment.scala @@ -14,6 +14,6 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util private[streaming] case class FileSegment (path: String, offset: Long, length: Int) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala similarity index 98% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala index efb12b82ae949..a1826959bb7da 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/HdfsUtils.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/HdfsUtils.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataInputStream, FSDataOutputStream, Path} diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala similarity index 95% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala index 4a4578707917b..0bfed99132866 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogManager.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogManager.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util import java.nio.ByteBuffer @@ -25,9 +25,8 @@ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.Path import org.apache.hadoop.fs.permission.FsPermission import org.apache.spark.Logging -import org.apache.spark.streaming.storage.WriteAheadLogManager._ -import org.apache.spark.streaming.util.{Clock, SystemClock} import org.apache.spark.util.Utils +import WriteAheadLogManager._ /** * This class manages write ahead log files. @@ -35,8 +34,8 @@ import org.apache.spark.util.Utils * - Recovers the log files and the reads the recovered records upon failures. * - Cleans up old log files. * - * Uses [[org.apache.spark.streaming.storage.WriteAheadLogWriter]] to write - * and [[org.apache.spark.streaming.storage.WriteAheadLogReader]] to read. + * Uses [[org.apache.spark.streaming.util.WriteAheadLogWriter]] to write + * and [[org.apache.spark.streaming.util.WriteAheadLogReader]] to read. * *@param logDirectory Directory when rotating log files will be created. * @param hadoopConf Hadoop configuration for reading/writing log files. @@ -199,7 +198,7 @@ private[streaming] class WriteAheadLogManager( } } -private[storage] object WriteAheadLogManager { +private[util] object WriteAheadLogManager { case class LogInfo(startTime: Long, endTime: Long, path: String) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala similarity index 93% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala index 912c4308aa8e5..16ad8279528aa 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogRandomReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogRandomReader.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util import java.io.Closeable import java.nio.ByteBuffer @@ -23,7 +23,7 @@ import org.apache.hadoop.conf.Configuration /** * A random access reader for reading write ahead log files written using - * [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. Given the file segment info, + * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. Given the file segment info, * this reads the record (bytebuffer) from the log file. */ private[streaming] class WriteAheadLogRandomReader(path: String, conf: Configuration) diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala similarity index 94% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala index 28b5d352cee01..adc2160fdf130 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogReader.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogReader.scala @@ -14,9 +14,9 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util -import java.io.{EOFException, Closeable} +import java.io.{Closeable, EOFException} import java.nio.ByteBuffer import org.apache.hadoop.conf.Configuration @@ -24,7 +24,7 @@ import org.apache.spark.Logging /** * A reader for reading write ahead log files written using - * [[org.apache.spark.streaming.storage.WriteAheadLogWriter]]. This reads + * [[org.apache.spark.streaming.util.WriteAheadLogWriter]]. This reads * the records (bytebuffers) in the log file sequentially and return them as an * iterator of bytebuffers. */ diff --git a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala similarity index 97% rename from streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala rename to streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala index d4e417cc21faa..ddbb989165f2e 100644 --- a/streaming/src/main/scala/org/apache/spark/streaming/storage/WriteAheadLogWriter.scala +++ b/streaming/src/main/scala/org/apache/spark/streaming/util/WriteAheadLogWriter.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util import java.io._ import java.net.URI @@ -24,7 +24,6 @@ import scala.util.Try import org.apache.hadoop.conf.Configuration import org.apache.hadoop.fs.{FSDataOutputStream, FileSystem} -import org.apache.spark.streaming.storage.FileSegment /** * A writer for writing byte-buffers to a write ahead log file. diff --git a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala similarity index 99% rename from streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala rename to streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala index 726393b3dbc86..577fc81d0688f 100644 --- a/streaming/src/test/scala/org/apache/spark/streaming/storage/WriteAheadLogSuite.scala +++ b/streaming/src/test/scala/org/apache/spark/streaming/util/WriteAheadLogSuite.scala @@ -14,7 +14,7 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.spark.streaming.storage +package org.apache.spark.streaming.util import java.io.{DataInputStream, File, FileInputStream, RandomAccessFile} import java.nio.ByteBuffer @@ -22,18 +22,16 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.concurrent.duration._ import scala.language.implicitConversions +import scala.language.postfixOps import scala.util.Random -import org.scalatest.{BeforeAndAfter, FunSuite} -import org.scalatest.concurrent.Eventually._ - +import WriteAheadLogSuite._ import com.google.common.io.Files import org.apache.commons.io.FileUtils import org.apache.hadoop.conf.Configuration - -import org.apache.spark.streaming.util.ManualClock import org.apache.spark.util.Utils -import WriteAheadLogSuite._ +import org.scalatest.{BeforeAndAfter, FunSuite} +import org.scalatest.concurrent.Eventually._ /** * This testsuite tests all classes related to write ahead logs.