Skip to content

Commit

Permalink
Add example that reads a local file, writes to a DFS path provided by…
Browse files Browse the repository at this point in the history
… the user, reads the file back from the DFS, and compares word counts on the local and DFS versions. Useful for verifying DFS correctness.

Author: RJ Nowling <[email protected]>

Closes #3347 from rnowling/dfs_read_write_test and squashes the following commits:

af8ccb7 [RJ Nowling] Don't use java.io.File since DFS may not be POSIX-compatible
b0ef9ea [RJ Nowling] Fix string style
07c6132 [RJ Nowling] Fix string style
7d9a8df [RJ Nowling] Fix string style
f74c160 [RJ Nowling] Fix else statement style
b9edf12 [RJ Nowling] Fix spark wc style
44415b9 [RJ Nowling] Fix local wc style
94a4691 [RJ Nowling] Fix space
df59b65 [RJ Nowling] Fix if statements
1b314f0 [RJ Nowling] Add scaladoc
a931d70 [RJ Nowling] Fix import order
0c89558 [RJ Nowling] Add example that reads a local file, writes to a DFS path provided by the user, reads the file back from the DFS, and compares word counts on the local and DFS versions. Useful for verifying DFS correctness.
  • Loading branch information
rnowling authored and Andrew Or committed Jun 19, 2015
1 parent 0c32fc1 commit a985803
Showing 1 changed file with 138 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.examples

import java.io.File

import scala.io.Source._

import org.apache.spark.{SparkContext, SparkConf}
import org.apache.spark.SparkContext._

/**
 * Simple test for reading and writing to a distributed
 * file system. This example does the following:
 *
 * 1. Reads local file
 * 2. Computes word count on local file
 * 3. Writes local file to a DFS
 * 4. Reads the file back from the DFS
 * 5. Computes word count on the file using Spark
 * 6. Compares the word count results
 *
 * Usage: localFile dfsDir
 */
object DFSReadWriteTest {

  // Populated by parseArgs() before main() does any work.
  private var localFilePath: File = new File(".")
  private var dfsDirPath: String = ""

  /** Number of command-line arguments the program requires. */
  private val NPARAMS = 2

  /**
   * Reads every line of the given local file into memory.
   *
   * The lines are materialised into a list before the underlying source is
   * closed, so the file handle is released even if reading fails part-way.
   *
   * @param filename path of the local file to read
   * @return the file's lines, in order
   */
  private def readFile(filename: String): List[String] = {
    val source = fromFile(filename)
    try {
      source.getLines().toList
    } finally {
      source.close()
    }
  }

  /** Prints the command-line usage text to standard output. */
  private def printUsage(): Unit = {
    val usage: String = "DFS Read-Write Test\n" +
      "\n" +
      "Usage: localFile dfsDir\n" +
      "\n" +
      "localFile - (string) local file to use in test\n" +
      "dfsDir - (string) DFS directory for read/write tests\n"

    println(usage)
  }

  /**
   * Validates the command-line arguments and stores them in
   * `localFilePath` and `dfsDirPath`.
   *
   * Prints usage and exits with status 1 if the argument count is wrong or
   * if the first argument is not an existing regular file.
   *
   * @param args raw command-line arguments: local file path, then DFS directory
   */
  private def parseArgs(args: Array[String]): Unit = {
    if (args.length != NPARAMS) {
      printUsage()
      System.exit(1)
    }

    val localArg = args(0)
    localFilePath = new File(localArg)

    if (!localFilePath.exists) {
      System.err.println(s"Given path ($localArg) does not exist.\n")
      printUsage()
      System.exit(1)
    }

    if (!localFilePath.isFile) {
      System.err.println(s"Given path ($localArg) is not a file.\n")
      printUsage()
      System.exit(1)
    }

    dfsDirPath = args(1)
  }

  /**
   * Computes the total number of words in the given lines without Spark.
   *
   * Lines are split on single spaces and then on tabs; empty tokens are
   * dropped. Words are grouped and counted per distinct word, and the
   * per-word counts are summed.
   *
   * @param fileContents lines of text to count words in
   * @return total number of non-empty tokens across all lines
   */
  def runLocalWordCount(fileContents: List[String]): Int = {
    fileContents
      .flatMap(_.split(" "))
      .flatMap(_.split("\t"))
      .filter(_.size > 0)
      .groupBy(identity)
      .mapValues(_.size)
      .values
      .sum
  }

  /**
   * Entry point: counts words locally, round-trips the file through the DFS
   * with Spark, counts words again and reports whether the totals agree.
   *
   * @param args local file path followed by the DFS directory to write into
   */
  def main(args: Array[String]): Unit = {
    parseArgs(args)

    println("Performing local word count")
    val fileContents = readFile(localFilePath.toString())
    val localWordCount = runLocalWordCount(fileContents)

    println("Creating SparkConf")
    val conf = new SparkConf().setAppName("DFS Read Write Test")

    println("Creating SparkContext")
    val sc = new SparkContext(conf)

    // Stop the context even when the write or the read-back job throws, so a
    // failing DFS does not leave the SparkContext running.
    val dfsWordCount = try {
      println("Writing local file to DFS")
      val dfsFilename = dfsDirPath + "/dfs_read_write_test"
      val fileRDD = sc.parallelize(fileContents)
      fileRDD.saveAsTextFile(dfsFilename)

      println("Reading file from DFS and running Word Count")
      val readFileRDD = sc.textFile(dfsFilename)

      // Same tokenisation as runLocalWordCount so the totals are comparable.
      readFileRDD
        .flatMap(_.split(" "))
        .flatMap(_.split("\t"))
        .filter(_.size > 0)
        .map(w => (w, 1))
        .countByKey()
        .values
        .sum
    } finally {
      sc.stop()
    }

    if (localWordCount == dfsWordCount) {
      println(s"Success! Local Word Count ($localWordCount) " +
        s"and DFS Word Count ($dfsWordCount) agree.")
    } else {
      println(s"Failure! Local Word Count ($localWordCount) " +
        s"and DFS Word Count ($dfsWordCount) disagree.")
    }
  }
}

2 comments on commit a985803

@davies
Copy link
Contributor

@davies davies commented on a985803 Jun 19, 2015

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@andrewor14 This breaks master (Scala style check).

@JoshRosen
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's either revert this or push a hotfix ASAP.

Please sign in to comment.