From 8b43aa76c25dd5b83088feb91dc3771af561e438 Mon Sep 17 00:00:00 2001 From: scwf Date: Wed, 12 Nov 2014 15:00:25 -0800 Subject: [PATCH 1/2] first version --- .../org/apache/spark/ExternalResource.scala | 29 ++++++++++++ .../spark/ExternalResourceManager.scala | 46 +++++++++++++++++++ .../scala/org/apache/spark/SparkContext.scala | 5 ++ 3 files changed, 80 insertions(+) create mode 100644 core/src/main/scala/org/apache/spark/ExternalResource.scala create mode 100644 core/src/main/scala/org/apache/spark/ExternalResourceManager.scala diff --git a/core/src/main/scala/org/apache/spark/ExternalResource.scala b/core/src/main/scala/org/apache/spark/ExternalResource.scala new file mode 100644 index 0000000000000..fdfb398bddfd2 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ExternalResource.scala @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +abstract class ExternalResource[T] extends Serializable with Logging{ + // initialize a resource T + def initialize: Unit + + // stop the resource + def stop: Unit + + // return this resource + def getResource: T +} diff --git a/core/src/main/scala/org/apache/spark/ExternalResourceManager.scala b/core/src/main/scala/org/apache/spark/ExternalResourceManager.scala new file mode 100644 index 0000000000000..600a4088e4f37 --- /dev/null +++ b/core/src/main/scala/org/apache/spark/ExternalResourceManager.scala @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +import scala.collection.mutable.{HashMap, Stack} + + +class ExternalResourceManager { + // resource name -> resource class name + val resourceNameByClass = new HashMap[String, String] + + // resources, a Map of resource name -> resource stack + val resources = new HashMap[String, Stack[ExternalResource]] + + def addResources(resourceName: String, className: String): Unit = { + resourceNameByClass(resourceName) = className + resources(resourceName) = new Stack[ExternalResource] + } + + // ask for a resource + def askResource(name: String): ExternalResource = synchronized { + assert(resources.get(name) != None, s"resources stack for $name is None") + resources.get(name).get.pop() + } + + // return resource + def retResource(name: String, resource: ExternalResource): Unit = synchronized { + assert(resources.get(name) != None, s"resources stack for $name is None") + resources.get(name).get.push(resource) + } +} diff --git a/core/src/main/scala/org/apache/spark/SparkContext.scala b/core/src/main/scala/org/apache/spark/SparkContext.scala index 03ea672c813d1..1e8f97676ca22 100644 --- a/core/src/main/scala/org/apache/spark/SparkContext.scala +++ b/core/src/main/scala/org/apache/spark/SparkContext.scala @@ -925,6 +925,11 @@ class SparkContext(config: SparkConf) extends SparkStatusAPI with Logging { bc } + // is it OK for the second argument here to be a string? + def registerExternalResource(name: String, externalResource: String): Unit = { + + } + /** * Add a file to be downloaded with this Spark job on every node. 
* The `path` passed can be either a local file, a file in HDFS (or other Hadoop-supported From c47dd3420a59119fcb43c58570077739aad73773 Mon Sep 17 00:00:00 2001 From: scwf Date: Sat, 15 Nov 2014 17:11:52 +0800 Subject: [PATCH 2/2] test case --- .../org/apache/spark/ExternalResource.scala | 16 +-- .../spark/ExternalResourceManager.scala | 23 ---- .../scala/org/apache/spark/TestExternal.scala | 114 ++++++++++++++++++ 3 files changed, 122 insertions(+), 31 deletions(-) create mode 100644 core/src/main/scala/org/apache/spark/TestExternal.scala diff --git a/core/src/main/scala/org/apache/spark/ExternalResource.scala b/core/src/main/scala/org/apache/spark/ExternalResource.scala index fdfb398bddfd2..9d317aa417620 100644 --- a/core/src/main/scala/org/apache/spark/ExternalResource.scala +++ b/core/src/main/scala/org/apache/spark/ExternalResource.scala @@ -17,13 +17,13 @@ package org.apache.spark -abstract class ExternalResource[T] extends Serializable with Logging{ - // initialize a resource T - def initialize: Unit +case class ExternalResource[T]( + name: String, + shared: Boolean = false, + params: Seq[_], + init: (Int, Seq[_]) => T = null, // Initialization function + term: (Int, T, Seq[_]) => Unit = null, // Termination function + partitionAffined: Boolean = false, // partition specification preferred + expiration: Int = -1) extends Serializable { - // stop the resource - def stop: Unit - - // return this resource - def getResource: T } diff --git a/core/src/main/scala/org/apache/spark/ExternalResourceManager.scala b/core/src/main/scala/org/apache/spark/ExternalResourceManager.scala index 600a4088e4f37..c247d72a92b15 100644 --- a/core/src/main/scala/org/apache/spark/ExternalResourceManager.scala +++ b/core/src/main/scala/org/apache/spark/ExternalResourceManager.scala @@ -19,28 +19,5 @@ package org.apache.spark import scala.collection.mutable.{HashMap, Stack} - class ExternalResourceManager { - // resource name -> resource class name - val resourceNameByClass = new 
HashMap[String, String] - - // resources, a Map of resource name -> resource stack - val resources = new HashMap[String, Stack[ExternalResource]] - - def addResources(resourceName: String, className: String): Unit = { - resourceNameByClass(resourceName) = className - resources(resourceName) = new Stack[ExternalResource] - } - - // ask for a resource - def askResource(name: String): ExternalResource = synchronized { - assert(resources.get(name) != None, s"resources stack for $name is None") - resources.get(name).get.pop() - } - - // return resource - def retResource(name: String, resource: ExternalResource): Unit = synchronized { - assert(resources.get(name) != None, s"resources stack for $name is None") - resources.get(name).get.push(resource) - } } diff --git a/core/src/main/scala/org/apache/spark/TestExternal.scala b/core/src/main/scala/org/apache/spark/TestExternal.scala new file mode 100644 index 0000000000000..db537691fdfbc --- /dev/null +++ b/core/src/main/scala/org/apache/spark/TestExternal.scala @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.spark + +import java.sql.{DriverManager, Connection} + +object TestExternal { + def main (args: Array[String]) { + val sparkConf = new SparkConf().setAppName("test external") + val sc = new SparkContext(sparkConf) + args(0) match { + case "1" => test1(sc) + case "2" => test2(sc) + case _ => println("error paras") + } + sc.stop() + } + + def test1(sc: SparkContext) = { + def init(a: Int, b: Seq[Any]): String = { + b(2).asInstanceOf[String] + } + def term(a: Int, b: String, c: Seq[Any]): Unit = { + + } + val external = new ExternalResource[String]("test external", true, Seq(1, 2, "wf"), init _, term _) + + val wf = sc.broadcast(external) + + sc.parallelize(1 to 40, 4).foreachPartition { iter => + + val external = wf.value + + val seq = external.params + + val init = external.init + + println(init(1, seq)) + } + } + + def test2(sc: SparkContext) = { + + val driver = "com.mysql.jdbc.Driver" + val url = "jdbc:mysql://127.0.0.1/mysql" + val username = "ken" + val password = "km" + var myparams=Array(driver, url, username, password) + + def myinit(split: Int, params: Seq[_]): Connection = { + require(params.size > 3, s"parameters error, current param size: " + params.size) + val p = params + val driver = p(0).toString + val url = p(1).toString + val username = p(2).toString + val password = p(3).toString + + var connection:Connection = null + try { + val loader = Option(Thread.currentThread().getContextClassLoader + ).getOrElse(getClass.getClassLoader) + val x = Class.forName(driver, true, loader) + println(x.toString) + println("get driver class " + x) + connection = DriverManager.getConnection(url, username, password) + } catch { + case e: Throwable => e.printStackTrace + } + connection + } + + def myterm(split: Int, conn: Any, params: Seq[_]) = { + require(Option(conn) != None, "Connection error") + try{ + val c = conn.asInstanceOf[Connection] + c.close() + }catch { + case e: Throwable => e.printStackTrace + } + } + + val external = new 
ExternalResource[Connection]("mysql ExtRsc test", false, myparams, myinit _ , + myterm _) + + val wf = sc.broadcast(external) + sc.addJar("file:///home/kf/Downloads/mysql-connector-java-5.1.18.jar") + sc.parallelize(1 to 40, 4).foreachPartition { iter => + + val external = wf.value + + val seq = external.params + + val init = external.init + + println(init(1, seq)) + } + } + +}