From 814ae6f67720c254e0011fd985241508b1e0506b Mon Sep 17 00:00:00 2001
From: Daniel Pereira
Date: Mon, 27 Sep 2021 15:56:18 -0400
Subject: [PATCH 1/4] Put try..catch around file reads

---
 .../core/jobs/utils/SparkContextUtils.scala   | 27 ++++++++++++++-----
 1 file changed, 20 insertions(+), 7 deletions(-)

diff --git a/src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala b/src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala
index e5155340..a07b146a 100644
--- a/src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala
+++ b/src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala
@@ -281,7 +281,7 @@ object SparkContextUtils {
       } catch {
         case NonFatal(ex) =>
           println(s"Failed to read resource from '$path': ${ex.getMessage} -- ${ex.getFullStackTraceString}")
-          throw new Exception(s"Failed to read resource from '$path': ${ex.getMessage} -- ${ex.getFullStackTraceString}")
+          ArrayBuffer()
       } finally {
         close(inputStream, path)
       }
@@ -333,7 +333,8 @@ object SparkContextUtils {
         AutoCloseableIterator.wrap(finalLines, () => close(inputStream, s"${file.path}, slice $slice"))
       } catch {
         case NonFatal(e) =>
-          throw new Exception(s"Error on read compressed big file, slice=$slice, file=$file", e)
+          println(s"Error reading compressed big file, slice=$slice, file=$file \n $e")
+          AutoCloseableIterator.empty
       }
     }
   }
@@ -348,13 +349,25 @@ object SparkContextUtils {
         "mapreduce.input.fileinputformat.split.maxsize" -> maxSplitSize.toString))
         .foldLeft(new Configuration()) { case (acc, (k, v)) => acc.set(k, v); acc }

-    def read(file: HadoopFile, conf: Configuration) = sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](conf = conf, fClass = classOf[TextInputFormat],
-      kClass = classOf[LongWritable], vClass = classOf[Text], path = file.path).map(pair => pair._2.toString)
+    def read(file: HadoopFile, conf: Configuration) = {

-    val confUncompressed = confWith(maxBytesPerPartition)
-
-    val union = new UnionRDD(sc, bigFiles.map { file =>
+      try {
+        sc.newAPIHadoopFile[LongWritable, Text, TextInputFormat](
+          conf = conf,
+          fClass = classOf[TextInputFormat],
+          kClass = classOf[LongWritable],
+          vClass = classOf[Text],
+          path = file.path)
+          .map(pair => pair._2.toString)
+      } catch {
+        case NonFatal(e) =>
+          println(s"Error reading file=$file\n $e")
+          sc.emptyRDD[String]
+      }
+    }
+    val confUncompressed = confWith(maxBytesPerPartition)
+    val union: UnionRDD[String] = new UnionRDD(sc, bigFiles.map { file =>
       if (sizeBasedFileHandling.isCompressed(file))
         readCompressedBigFile(file, maxBytesPerPartition, minPartitions, sizeBasedFileHandling)
       else

From 3fd79232745785620f3e26b9c972f8aa16b054b0 Mon Sep 17 00:00:00 2001
From: Daniel Pereira
Date: Tue, 14 Dec 2021 12:34:13 -0400
Subject: [PATCH 2/4] Support EMR

---
 .../scala/ignition/core/jobs/CoreJobRunner.scala | 15 +++++++--------
 tools/cluster.py                                 |  7 +++++--
 2 files changed, 12 insertions(+), 10 deletions(-)

diff --git a/src/main/scala/ignition/core/jobs/CoreJobRunner.scala b/src/main/scala/ignition/core/jobs/CoreJobRunner.scala
index eb1c7014..2c904b5c 100644
--- a/src/main/scala/ignition/core/jobs/CoreJobRunner.scala
+++ b/src/main/scala/ignition/core/jobs/CoreJobRunner.scala
@@ -4,12 +4,16 @@ import org.apache.spark.SparkContext
 import org.apache.spark.sql.SparkSession
 import org.joda.time.{DateTime, DateTimeZone}
 import org.slf4j.{Logger, LoggerFactory}
+import org.joda.time.format.{DateTimeFormat, DateTimeFormatter}

 import scala.concurrent.Future

 object CoreJobRunner {

   val logger: Logger = LoggerFactory.getLogger(getClass)
+  val tagFormat = "yyyy_MM_dd'T'HH_mm_ss'UTC'"
+  val today = DateTime.now().withZone(DateTimeZone.UTC)
+  val todayFormatted = DateTimeFormat.forPattern(tagFormat).print(today)

   case class RunnerContext(sparkContext: SparkContext,
                            sparkSession: SparkSession,
@@ -29,8 +33,8 @@ object CoreJobRunner {
   }

   case class RunnerConfig(setupName: String = "nosetup",
-                          date: DateTime = DateTime.now.withZone(DateTimeZone.UTC),
-                          tag: String = "notag",
+                          date: DateTime = today,
+                          tag: String = todayFormatted,
                           user: String = "nouser",
                           master: String = "local[*]",
                           executorMemory: String = "2G",
@@ -76,11 +80,6 @@ object CoreJobRunner {

     val builder = SparkSession.builder

-    builder.config("spark.executor.memory", config.executorMemory)
-
-    builder.config("spark.eventLog.dir", "file:///media/tmp/spark-events")
-
-    builder.master(config.master)
     builder.appName(appName)

     builder.config("spark.hadoop.mapred.output.committer.class", classOf[DirectOutputCommitter].getName())
@@ -114,7 +113,7 @@ object CoreJobRunner {
     } catch {
       case t: Throwable =>
         t.printStackTrace()
-        System.exit(1) // force exit of all threads
+        throw t
     }

     import scala.concurrent.ExecutionContext.Implicits.global
diff --git a/tools/cluster.py b/tools/cluster.py
index 7c50ccae..966dd987 100755
--- a/tools/cluster.py
+++ b/tools/cluster.py
@@ -423,6 +423,7 @@ def get_assembly_path():
 @arg('--detached', help='Run job in background, requires tmux')
 @arg('--destroy-cluster', help='Will destroy cluster after finishing the job')
 @arg('--extra', action='append', type=str, help='Additional arguments for the job in the format k=v')
+@arg('--disable-propagate-aws-credentials', help='If set, your AWS credentials will not be propagated from your environment to the master')
 @named('run')
 def job_run(cluster_name, job_name, job_mem,
             key_file=default_key_file, disable_tmux=False,
@@ -439,6 +440,7 @@ def job_run(cluster_name, job_name, job_mem,
             region=default_region,
             driver_heap_size=default_driver_heap_size,
             remove_files=True,
+            disable_propagate_aws_credentials=False,
             extra=[]):

     utc_job_date_example = '2014-05-04T13:13:10Z'
@@ -456,14 +458,15 @@ def job_run(cluster_name, job_name, job_mem,
     remote_hook = '{remote_path}/remote_hook.sh'.format(remote_path=remote_path)
     notify_param = 'yes' if notify_on_errors else 'no'
     yarn_param = 'yes' if yarn else 'no'
+    aws_vars = get_aws_keys_str() if not disable_propagate_aws_credentials else ''
     job_date = utc_job_date or datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
     job_tag = job_tag or job_date.replace(':', '_').replace('-', '_').replace('Z', 'UTC')
     runner_extra_args = ' '.join('--runner-extra "%s"' % arg for arg in extra)
     tmux_wait_command = ';(echo Press enter to keep the session open && /bin/bash -c "read -t 5" && sleep 7d)' if not detached else ''
     tmux_arg = ". /etc/profile; . ~/.profile;tmux new-session {detached} -s spark.{job_name}.{job_tag} '{aws_vars} {remote_hook} {job_name} {job_date} {job_tag} {job_user} {remote_control_dir} {spark_mem} {yarn_param} {notify_param} {driver_heap_size} {runner_extra_args} {tmux_wait_command}' >& /tmp/commandoutput".format(
-        aws_vars=get_aws_keys_str(), job_name=job_name, job_date=job_date, job_tag=job_tag, job_user=job_user, remote_control_dir=remote_control_dir, remote_hook=remote_hook, spark_mem=job_mem, detached='-d' if detached else '', yarn_param=yarn_param, notify_param=notify_param, driver_heap_size=driver_heap_size, runner_extra_args=runner_extra_args, tmux_wait_command=tmux_wait_command)
+        aws_vars=aws_vars, job_name=job_name, job_date=job_date, job_tag=job_tag, job_user=job_user, remote_control_dir=remote_control_dir, remote_hook=remote_hook, spark_mem=job_mem, detached='-d' if detached else '', yarn_param=yarn_param, notify_param=notify_param, driver_heap_size=driver_heap_size, runner_extra_args=runner_extra_args, tmux_wait_command=tmux_wait_command)
     non_tmux_arg = ". /etc/profile; . ~/.profile;{aws_vars} {remote_hook} {job_name} {job_date} {job_tag} {job_user} {remote_control_dir} {spark_mem} {yarn_param} {notify_param} {driver_heap_size} {runner_extra_args} >& /tmp/commandoutput".format(
-        aws_vars=get_aws_keys_str(), job_name=job_name, job_date=job_date, job_tag=job_tag, job_user=job_user, remote_control_dir=remote_control_dir, remote_hook=remote_hook, spark_mem=job_mem, yarn_param=yarn_param, notify_param=notify_param, driver_heap_size=driver_heap_size, runner_extra_args=runner_extra_args)
+        aws_vars=aws_vars, job_name=job_name, job_date=job_date, job_tag=job_tag, job_user=job_user, remote_control_dir=remote_control_dir, remote_hook=remote_hook, spark_mem=job_mem, yarn_param=yarn_param, notify_param=notify_param, driver_heap_size=driver_heap_size, runner_extra_args=runner_extra_args)

     if not disable_assembly_build:
From a83046e8f9010d63ecd232dda7e52bae0bf0e801 Mon Sep 17 00:00:00 2001
From: Cláudio Bordoni
Date: Tue, 12 Apr 2022 13:11:12 -0400
Subject: [PATCH 3/4] Add blacklisted apikeys

---
 .../core/jobs/utils/SparkContextUtils.scala | 18 ++++++++++++++----
 1 file changed, 14 insertions(+), 4 deletions(-)

diff --git a/src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala b/src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala
index a07b146a..9bf3c05d 100644
--- a/src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala
+++ b/src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala
@@ -573,13 +573,18 @@ object SparkContextUtils {
     def isSuccessFile(file: HadoopFile): Boolean =
       file.path.endsWith("_SUCCESS") || file.path.endsWith("_FINISHED")

+    // Blacklisted apikeys. These apikeys are for testing only and should not be used in production
+    val blacklist = Set("casasbahia-v2", "pontofrio-v2", "extra-v2", "casasbahia-test")
+
+    def excludeBlacklistedApikeys(file: HadoopFile): Boolean = !blacklist.exists(e => file.path.contains(e))
+
     def excludePatternValidation(file: HadoopFile): Boolean =
-      exclusionPattern.map(pattern => !file.path.matches(pattern)).getOrElse(true)
+      exclusionPattern.forall(pattern => !file.path.matches(pattern))

     def endsWithValidation(file: HadoopFile): Boolean =
-      endsWith.map { pattern =>
+      endsWith.forall { pattern =>
         file.path.endsWith(pattern) || isSuccessFile(file)
-      }.getOrElse(true)
+      }

     def dateValidation(files: WithOptDate[Array[HadoopFile]]): Boolean = {
       val tryDate = files.date
@@ -607,7 +612,12 @@ object SparkContextUtils {
         None
       else {
         val filtered = files.copy(value = files.value
-          .filter(excludePatternValidation).filter(endsWithValidation).filter(predicate))
+          .filter(excludePatternValidation)
+          .filter(endsWithValidation)
+          .filter(predicate)
+          .filter(excludeBlacklistedApikeys)
+        )
+
         if (filtered.value.isEmpty || !dateValidation(filtered))
           None
         else
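The blacklist added above is a plain substring check against the file path. A self-contained sketch of that filter; the HadoopFile case class and the example paths are stand-ins for illustration:

    object BlacklistSketch extends App {
      // Stand-in for the real HadoopFile; only the path matters here.
      case class HadoopFile(path: String)

      val blacklist = Set("casasbahia-v2", "pontofrio-v2", "extra-v2", "casasbahia-test")

      // Keep a file only if its path contains none of the blacklisted apikeys.
      def excludeBlacklistedApikeys(file: HadoopFile): Boolean =
        !blacklist.exists(apikey => file.path.contains(apikey))

      val files = Seq(
        HadoopFile("s3://bucket/some-client/2022/04/12/part-00000.gz"),
        HadoopFile("s3://bucket/casasbahia-test/2022/04/12/part-00000.gz"))

      // Prints only the first path.
      files.filter(excludeBlacklistedApikeys).foreach(f => println(f.path))
    }

The accompanying switch from map(...).getOrElse(true) to forall(...) in the validations is behaviour-preserving: on an empty Option, forall returns true, exactly as getOrElse(true) did.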
From c7626796680ca75e5261aa045256ebdc92d63f27 Mon Sep 17 00:00:00 2001
From: Cláudio Bordoni
Date: Tue, 19 Apr 2022 13:43:14 -0400
Subject: [PATCH 4/4] Log files

---
 src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala b/src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala
index 9bf3c05d..2e7917a1 100644
--- a/src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala
+++ b/src/main/scala/ignition/core/jobs/utils/SparkContextUtils.scala
@@ -686,6 +686,8 @@ object SparkContextUtils {
     val foundFiles = listAndFilterFiles(path, requireSuccess, inclusiveStartDate, startDate, inclusiveEndDate, endDate,
       lastN, ignoreMalformedDates, endsWith, predicate = predicate)

+    logger.info(s"Found files ${foundFiles}")
+
     if (foundFiles.size < minimumFiles)
       throw new Exception(s"Tried with start/end time equal to $startDate/$endDate for path $path but the resulting number of files $foundFiles is less than the required
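The last patch only adds a log line before the minimum-files check. A sketch of that log-then-validate step as a standalone helper; the method name, the Seq[String] signature and the exception wording are assumptions, not the actual listAndFilterFiles types:

    import org.slf4j.{Logger, LoggerFactory}

    object FoundFilesLoggingSketch {
      val logger: Logger = LoggerFactory.getLogger(getClass)

      // Hypothetical wrapper: log what the listing returned, then fail fast
      // when fewer files than required were matched.
      def requireMinimumFiles(foundFiles: Seq[String], minimumFiles: Int, path: String): Seq[String] = {
        logger.info(s"Found files $foundFiles")
        if (foundFiles.size < minimumFiles)
          throw new Exception(
            s"Found only ${foundFiles.size} files for path $path but at least $minimumFiles are required")
        foundFiles
      }
    }

When listings are large, logging foundFiles.size instead of the whole collection keeps the log line manageable.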