Skip to content

Commit

Permalink
major refactoring to Cleanable impl to register & run cleansweep more…
Browse files Browse the repository at this point in the history
… efficiently

Lifespan initialization after shipping is rewritten to be more reliable

CleanWebDriver now has more resilient clean up impl

all printing in integration tests are either removed or replaced with logging

default console log level changed to WARN

Simplify NOTSerializable

aggregate validateBeforeAndAfterAll

instancesShouldBeClean condition is improved to check Compound types

MAX_TOTAL_MEMORY set to 8

TestHelper lifespan changed to use Hadoop ShutdownHookManager

reorganize Lifespan impls

several unit test no longer relies on manual check

Lifespan of Session changed to TaskOrJVM

test-reports now also list failed tests at the end

add --webdriver-loglevel=ERROR into phantomJS cli option

update session progress before & after driver dispatching

Remove a lock in Cleanable that may cause deadlock

upgrade sizzle to the latest

BroadcastWrapper won't run cleanup if SparkContext is stopped

2 shell/maven has been removed for being inefficient: set -e, --fail-at-end

add envvar to bypass bazelbuild/rules_closure#351 in ubuntu 22.04+

proxy conf of PhantomJS & HtmlUnit selenium drivers are unified
  • Loading branch information
Peng Cheng committed May 20, 2022
1 parent 763e894 commit 2846d28
Show file tree
Hide file tree
Showing 76 changed files with 3,561 additions and 2,943 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object ConflictDetection {

def conflicts: Seq[Try[Unit]] = {

val allObj = Cleanable.getTyped[ConflictDetection]
val allObj = Cleanable.All.typed[ConflictDetection]

val allResourceIDs: Map[String, Seq[Any]] = allObj
.map {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package com.tribbloids.spookystuff

import java.util.UUID
import java.util.concurrent.TimeUnit
import com.esotericsoftware.kryo.Kryo
import com.tribbloids.spookystuff.conf.{Dir, SpookyConf}
import com.tribbloids.spookystuff.dsl._
import com.tribbloids.spookystuff.doc._
import com.tribbloids.spookystuff.dsl._
import com.tribbloids.spookystuff.metrics.SpookyMetrics
import com.tribbloids.spookystuff.row.FetchedRow
import org.apache.spark.SerializableWritable
import org.apache.spark.serializer.KryoRegistrator
import org.apache.spark.sql.catalyst.ScalaReflection.universe.TypeTag

import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.collection.immutable.ListMap
import scala.concurrent.duration.FiniteDuration

Expand All @@ -37,7 +36,6 @@ class SpookyKryoRegistrator extends KryoRegistrator {
//used by broadcast & accumulator
classOf[SpookyConf],
classOf[Dir.Conf],
classOf[SerializableWritable[_]],
classOf[SpookyContext],
classOf[SpookyMetrics],
//used by Expressions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,6 @@ trait Action extends ActionLike with TraceAPI {
case ae: ActionException => ae
case _ => new ActionException(message, e)
}
// ex.setStackTrace(e.getStackTrace)
throw ex
}

Expand Down Expand Up @@ -99,7 +98,7 @@ trait Action extends ActionLike with TraceAPI {
}
}

protected[actions] def withDriversDuring[T](session: Session)(f: => T): T = {
protected[actions] def withTimeoutDuring[T](session: Session)(f: => T): T = {

var baseStr = s"[${session.taskContextOpt.map(_.partitionId()).getOrElse(0)}]+> ${this.toString}"
this match {
Expand All @@ -121,7 +120,7 @@ trait Action extends ActionLike with TraceAPI {
}

final def exe(session: Session): Seq[DocOption] = {
withDriversDuring(session) {
withTimeoutDuring(session) {
doExe(session)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ object ClusterRetry {
e
)
} else logger.warn(s"Failover on ${e.getClass.getSimpleName}: Cluster-wise retries has depleted")
logger.info("\t\\-->", e)
logger.debug("\t\\-->", e)
}

pages
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,27 +2,26 @@ package com.tribbloids.spookystuff.actions

import com.tribbloids.spookystuff.Const
import com.tribbloids.spookystuff.session.Session
import com.tribbloids.spookystuff.utils.TimeoutConf

import com.tribbloids.spookystuff.utils.Timeout

trait Timed extends Action {

var _timeout: TimeoutConf = _
var _timeout: Timeout = _

def in(timeout: TimeoutConf): this.type = {
def in(timeout: Timeout): this.type = {
this._timeout = timeout
this
}

def timeout(session: Session): TimeoutConf = {
def timeout(session: Session): Timeout = {
val base =
if (this._timeout == null) session.spooky.spookyConf.remoteResourceTimeout
else this._timeout

base
}

def hardTerminateTimeout(session: Session): TimeoutConf = {
def hardTerminateTimeout(session: Session): Timeout = {
val original = timeout(session)
original.copy(max = original.max + Const.hardTerminateOverhead)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ package com.tribbloids.spookystuff.conf

import com.tribbloids.spookystuff.session.{DriverLike, Session}
import com.tribbloids.spookystuff.utils.CachingUtils.ConcurrentMap
import com.tribbloids.spookystuff.utils.lifespan.{Cleanable, Lifespan}
import com.tribbloids.spookystuff.utils.lifespan.Cleanable.Lifespan
import com.tribbloids.spookystuff.utils.lifespan.Cleanable
import com.tribbloids.spookystuff.{DriverStatus, SpookyContext}
import org.apache.spark.TaskContext

Expand All @@ -38,7 +39,8 @@ sealed abstract class DriverFactory[D <: DriverLike] extends Serializable {
// release all Drivers that belong to a session
def release(session: Session): Unit

def driverLifespan(session: Session): Lifespan = Lifespan.TaskOrJVM(ctxFactory = () => session.lifespan.ctx)
def driverLifespan(session: Session): Lifespan =
Lifespan.TaskOrJVM(ctxFactory = () => session.lifespan.ctx).forShipping

def deployGlobally(spooky: SpookyContext): Unit = {}
}
Expand Down Expand Up @@ -80,7 +82,7 @@ object DriverFactory {

final def destroy(driver: D, tcOpt: Option[TaskContext]): Unit = {
driver match {
case v: Cleanable => v.tryClean()
case v: Cleanable => v.clean()
case _ =>
}
}
Expand All @@ -107,11 +109,11 @@ object DriverFactory {
override def dispatch(session: Session): D = {

val ls = driverLifespan(session)
val taskLocalOpt = taskLocals.get(ls.batchIDs)
val taskLocalOpt = taskLocals.get(ls.registeredID)

def newDriver: D = {
val fresh = delegate.create(session)
taskLocals.put(ls.batchIDs, new DriverStatus(fresh))
taskLocals.put(ls.registeredID, new DriverStatus(fresh))
fresh
}

Expand Down Expand Up @@ -144,34 +146,12 @@ object DriverFactory {
override def release(session: Session): Unit = {

val ls = driverLifespan(session)
val statusOpt = taskLocals.get(ls.batchIDs)
val statusOpt = taskLocals.get(ls.registeredID)
statusOpt.foreach { status =>
status.isBusy = false
}
}

override def deployGlobally(spooky: SpookyContext): Unit = delegate.deployGlobally(spooky)
}

////just for debugging
////a bug in this driver has caused it unusable in Firefox 32
//object FirefoxDriverFactory extends DriverFactory {
//
// val baseCaps = new DesiredCapabilities
// // baseCaps.setJavascriptEnabled(true); //< not really needed: JS enabled by default
// // baseCaps.setCapability(CapabilityType.SUPPORTS_FINDING_BY_CSS,true)
//
// // val FirefoxRootPath = "/usr/lib/phantomjs/"
// // baseCaps.setCapability("webdriver.firefox.bin", "firefox");
// // baseCaps.setCapability("webdriver.firefox.profile", "WebDriver");
//
// override def newInstance(capabilities: Capabilities, spooky: SpookyContext): WebDriver = {
// val newCap = baseCaps.merge(capabilities)
//
// Utils.retry(Const.DFSInPartitionRetry) {
// Utils.withDeadline(spooky.distributedResourceTimeout) {new FirefoxDriver(newCap)}
// }
// }
//}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.tribbloids.spookystuff.conf

import com.tribbloids.spookystuff.SpookyContext
import com.tribbloids.spookystuff.session.{PythonDriver, Session}
import com.tribbloids.spookystuff.utils.lifespan.Lifespan
import com.tribbloids.spookystuff.utils.lifespan.Cleanable.Lifespan

case class PythonDriverFactory(
getExecutable: SpookyContext => String
Expand All @@ -18,5 +18,7 @@ case class PythonDriverFactory(

object PythonDriverFactory {

object _3 extends PythonDriverFactory((_: SpookyContext) => "python3")
lazy val python3 = "python3"

object _3 extends PythonDriverFactory((_: SpookyContext) => python3)
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.tribbloids.spookystuff.conf
import com.tribbloids.spookystuff.dsl._
import com.tribbloids.spookystuff.row.Sampler
import com.tribbloids.spookystuff.session._
import com.tribbloids.spookystuff.utils.TimeoutConf
import com.tribbloids.spookystuff.utils.Timeout
import org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel

Expand Down Expand Up @@ -46,8 +46,8 @@ case class SpookyConf(
var errorDump: Boolean = true,
var errorScreenshot: Boolean = true,
var errorDumpFilePath: ByDoc[String] = FilePaths.UUIDName(FilePaths.Hierarchical),
var remoteResourceTimeout: TimeoutConf = TimeoutConf(60.seconds),
var DFSTimeout: TimeoutConf = TimeoutConf(40.seconds),
var remoteResourceTimeout: Timeout = Timeout(60.seconds),
var DFSTimeout: Timeout = Timeout(40.seconds),
var failOnDFSRead: Boolean = false,
var defaultJoinType: JoinType = JoinType.Inner,
var defaultFlattenSampler: Sampler[Any] = identity,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,19 +2,20 @@ package com.tribbloids.spookystuff.python.ref

import com.tribbloids.spookystuff.SpookyContext
import com.tribbloids.spookystuff.session.PythonDriver
import com.tribbloids.spookystuff.utils.lifespan.{Lifespan, LocalCleanable}
import com.tribbloids.spookystuff.utils.lifespan.Cleanable.Lifespan
import com.tribbloids.spookystuff.utils.lifespan.LocalCleanable

trait BindedRef extends PyRef with LocalCleanable {

def driverTemplate: PythonDriver

@transient var _driver: PythonDriver = _
def driver = this.synchronized {
def driver: PythonDriver = this.synchronized {
Option(_driver).getOrElse {
val v = new PythonDriver(
driverTemplate.pythonExe,
driverTemplate.autoImports,
_lifespan = new Lifespan.JVM(nameOpt = Some(this.getClass.getSimpleName))
_lifespan = Lifespan.JVM.apply(nameOpt = Some(this.getClass.getSimpleName))
)
_driver = v
v
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,11 @@ import com.tribbloids.spookystuff.SpookyContext
import com.tribbloids.spookystuff.python.PyConverter
import com.tribbloids.spookystuff.session.{PythonDriver, Session}
import com.tribbloids.spookystuff.utils.CachingUtils.ConcurrentMap
import com.tribbloids.spookystuff.utils.TreeThrowable
import com.tribbloids.spookystuff.utils.lifespan.Cleanable
import org.apache.spark.ml.dsl.utils._

import scala.util.Try

trait PyRef extends Cleanable {

Expand Down Expand Up @@ -103,9 +105,11 @@ trait PyRef extends Cleanable {

override protected def cleanImpl(): Unit = {

bindings.foreach { binding =>
binding.clean(true)
}
TreeThrowable.&&&(
bindings.map { binding =>
Try(binding.clean(true))
}
)
}
}

Expand All @@ -114,8 +118,8 @@ object PyRef {
object ROOT extends PyRef {}

def sanityCheck(): Unit = {
val subs = Cleanable.getTyped[PyBinding]
val refSubs = Cleanable.getTyped[PyRef].map(_.bindings)
val subs = Cleanable.All.typed[PyBinding]
val refSubs = Cleanable.All.typed[PyRef].map(_.bindings)
assert(
subs.intersect(refSubs).size <= refSubs.size, {
"INTERNAL ERROR: dangling tree!"
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ object ConflictDetection {

def conflicts: Seq[Try[Unit]] = {

val allObj = Cleanable.getTyped[ConflictDetection]
val allObj = Cleanable.All.typed[ConflictDetection]

val allResourceIDs: Map[String, Seq[Any]] = allObj
.map {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package com.tribbloids.spookystuff.session

import com.tribbloids.spookystuff.conf.Python
import com.tribbloids.spookystuff.driver.PythonProcess
import com.tribbloids.spookystuff.utils.lifespan.Lifespan
import com.tribbloids.spookystuff.utils.{BypassingRule, CommonUtils, SpookyUtils}
import com.tribbloids.spookystuff.utils.lifespan.Cleanable.Lifespan
import com.tribbloids.spookystuff.utils.{BypassingRule, CommonConst, CommonUtils, SpookyUtils}
import com.tribbloids.spookystuff.{PyException, PyInterpretationException, SpookyContext}
import org.apache.commons.io.FileUtils
import org.apache.spark.ml.dsl.utils.DSLUtils
Expand Down Expand Up @@ -94,12 +94,10 @@ class PythonDriver(
|import os
|from __future__ import print_function
""".trim.stripMargin,
override val _lifespan: Lifespan = Lifespan.TaskOrJVM()
override val _lifespan: Lifespan = Lifespan.TaskOrJVM().forShipping
) extends PythonProcess(pythonExe)
with DriverLike {

import scala.concurrent.duration._

/**
* NOT thread safe
*/
Expand Down Expand Up @@ -143,10 +141,10 @@ class PythonDriver(

override def cleanImpl(): Unit = {
Try {
CommonUtils.retry(5) {
CommonUtils.retry(CommonConst.driverClosingRetries) {
try {
if (process.isAlive) {
CommonUtils.withTimeout(3.seconds) {
CommonUtils.withTimeout(CommonConst.driverClosingTimeout) {
try {
this._interpret("exit()")
} catch {
Expand Down
Loading

0 comments on commit 2846d28

Please sign in to comment.