Skip to content

Commit

Permalink
Check that primary-py-file and primary-r-file are mutually exclusive
Browse files Browse the repository at this point in the history
Add return types and change GroupedData.toDF to private[sql]
  • Loading branch information
Davies Liu committed Apr 7, 2015
1 parent 55808e4 commit 59266d1
Show file tree
Hide file tree
Showing 7 changed files with 15 additions and 8 deletions.
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/api/r/RBackend.scala
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ private[spark] class RBackend {
}

private[spark] object RBackend extends Logging {
def main(args: Array[String]) {
def main(args: Array[String]): Unit = {
if (args.length < 1) {
System.err.println("Usage: RBackend <tempFilePath>")
System.exit(-1)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/api/r/RRDD.scala
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ private abstract class BaseRRDD[T: ClassTag, U: ClassTag](
val stream = new BufferedOutputStream(output, bufferSize)

new Thread("writer for R") {
override def run() {
override def run(): Unit = {
try {
SparkEnv.set(env)
val dataOut = new DataOutputStream(stream)
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/scala/org/apache/spark/deploy/RRunner.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import org.apache.spark.util.RedirectThread
* subprocess and then has it connect back to the JVM to access system properties etc.
*/
object RRunner {
def main(args: Array[String]) {
def main(args: Array[String]): Unit = {
val rFile = PythonRunner.formatPath(args(0))

val otherArgs = args.slice(1, args.length)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.types.NumericType
@Experimental
class GroupedData protected[sql](df: DataFrame, groupingExprs: Seq[Expression]) {

private[this] implicit def toDF(aggExprs: Seq[NamedExpression]): DataFrame = {
private[sql] implicit def toDF(aggExprs: Seq[NamedExpression]): DataFrame = {
val namedGroupingExprs = groupingExprs.map {
case expr: NamedExpression => expr
case expr: Expression => Alias(expr, expr.prettyString)()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,7 @@ private[r] object SQLUtils {
case expr: Expression => Alias(expr, expr.simpleString)()
}
}
// TODO(davies): use internal API
val toDF = gd.getClass.getDeclaredMethods.filter(f => f.getName == "toDF").head
toDF.setAccessible(true)
toDF.invoke(gd, aggExprs).asInstanceOf[DataFrame]
gd.toDF(aggExprs)
}

def dfToRowRDD(df: DataFrame): JavaRDD[Array[Byte]] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,11 @@ class ApplicationMasterArguments(val args: Array[String]) {
}
}

if (primaryPyFile != null && primaryRFile != null) {
System.err.println("Cannot have primary-py-file and primary-r-file at the same time")
System.exit(-1)
}

userArgs = userArgsBuffer.readOnly
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,11 @@ private[spark] class ClientArguments(args: Array[String], sparkConf: SparkConf)
throw new IllegalArgumentException(getUsageMessage(args))
}
}

if (primaryPyFile != null && primaryRFile != null) {
throw new IllegalArgumentException("Cannot have primary-py-file and primary-r-file" +
" at the same time")
}
}

private def getUsageMessage(unknownParam: List[String] = null): String = {
Expand Down

0 comments on commit 59266d1

Please sign in to comment.