-
Notifications
You must be signed in to change notification settings - Fork 360
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[WX-1410] Sanitize 4 byte UTF-8 characters before inserting into METADATA_ENTRY #7414
Changes from all commits
efe8ca0
5235ace
9a9521f
e591bca
afc8865
68567c3
7190a69
02f41e2
14ceb40
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,10 +6,11 @@ | |
import cromwell.core.Mailbox.PriorityMailbox | ||
import cromwell.core.WorkflowId | ||
import cromwell.core.instrumentation.InstrumentationPrefixes | ||
import cromwell.services.metadata.MetadataEvent | ||
import cromwell.services.metadata.{MetadataEvent, MetadataValue} | ||
import cromwell.services.metadata.MetadataService._ | ||
import cromwell.services.metadata.impl.MetadataStatisticsRecorder.MetadataStatisticsRecorderSettings | ||
import cromwell.services.{EnhancedBatchActor, MetadataServicesStore} | ||
import wdl.util.StringUtil | ||
|
||
import scala.concurrent.duration._ | ||
import scala.util.{Failure, Success} | ||
|
@@ -18,7 +19,8 @@ | |
override val flushRate: FiniteDuration, | ||
override val serviceRegistryActor: ActorRef, | ||
override val threshold: Int, | ||
metadataStatisticsRecorderSettings: MetadataStatisticsRecorderSettings | ||
metadataStatisticsRecorderSettings: MetadataStatisticsRecorderSettings, | ||
metadataKeysToClean: List[String] | ||
) extends EnhancedBatchActor[MetadataWriteAction](flushRate, batchSize) | ||
with ActorLogging | ||
with MetadataDatabaseAccess | ||
|
@@ -27,9 +29,10 @@ | |
private val statsRecorder = MetadataStatisticsRecorder(metadataStatisticsRecorderSettings) | ||
|
||
override def process(e: NonEmptyVector[MetadataWriteAction]) = instrumentedProcess { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Small recommendation for performance reasons:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I believe this is resolved by passing in the |
||
val cleanedMetadataWriteActions = if (metadataKeysToClean.isEmpty) e else sanitizeInputs(e) | ||
val empty = (Vector.empty[MetadataEvent], List.empty[(Iterable[MetadataEvent], ActorRef)]) | ||
|
||
val (putWithoutResponse, putWithResponse) = e.foldLeft(empty) { | ||
val (putWithoutResponse, putWithResponse) = cleanedMetadataWriteActions.foldLeft(empty) { | ||
case ((putEvents, putAndRespondEvents), action: PutMetadataAction) => | ||
(putEvents ++ action.events, putAndRespondEvents) | ||
case ((putEvents, putAndRespondEvents), action: PutMetadataActionAndRespond) => | ||
|
@@ -46,7 +49,7 @@ | |
case Success(_) => | ||
putWithResponse foreach { case (ev, replyTo) => replyTo ! MetadataWriteSuccess(ev) } | ||
case Failure(cause) => | ||
val (outOfTries, stillGood) = e.toVector.partition(_.maxAttempts <= 1) | ||
val (outOfTries, stillGood) = cleanedMetadataWriteActions.toVector.partition(_.maxAttempts <= 1) | ||
|
||
handleOutOfTries(outOfTries, cause) | ||
handleEventsToReconsider(stillGood) | ||
|
@@ -55,6 +58,23 @@ | |
dbAction.map(_ => allPutEvents.size) | ||
} | ||
|
||
def sanitizeInputs( | ||
metadataWriteActions: NonEmptyVector[MetadataWriteAction] | ||
): NonEmptyVector[MetadataWriteAction] = | ||
metadataWriteActions.map { metadataWriteAction => | ||
val metadataEvents = | ||
metadataWriteAction.events.map { event => | ||
event.value match { | ||
case Some(eventVal) => event.copy(value = Option(MetadataValue(StringUtil.cleanUtf8mb4(eventVal.value)))) | ||
case None => event | ||
Check warning on line 69 in services/src/main/scala/cromwell/services/metadata/impl/WriteMetadataActor.scala Codecov / codecov/patchservices/src/main/scala/cromwell/services/metadata/impl/WriteMetadataActor.scala#L66-L69
|
||
} | ||
} | ||
metadataWriteAction match { | ||
case action: PutMetadataAction => action.copy(events = metadataEvents) | ||
case actionAndResp: PutMetadataActionAndRespond => actionAndResp.copy(events = metadataEvents) | ||
Check warning on line 74 in services/src/main/scala/cromwell/services/metadata/impl/WriteMetadataActor.scala Codecov / codecov/patchservices/src/main/scala/cromwell/services/metadata/impl/WriteMetadataActor.scala#L73-L74
|
||
} | ||
} | ||
|
||
private def countActionsByWorkflow(writeActions: Vector[MetadataWriteAction]): Map[WorkflowId, Int] = | ||
writeActions.flatMap(_.events).groupBy(_.key.workflowId).map { case (k, v) => k -> v.size } | ||
|
||
|
@@ -106,9 +126,18 @@ | |
flushRate: FiniteDuration, | ||
serviceRegistryActor: ActorRef, | ||
threshold: Int, | ||
statisticsRecorderSettings: MetadataStatisticsRecorderSettings | ||
statisticsRecorderSettings: MetadataStatisticsRecorderSettings, | ||
metadataKeysToClean: List[String] | ||
): Props = | ||
Props(new WriteMetadataActor(dbBatchSize, flushRate, serviceRegistryActor, threshold, statisticsRecorderSettings)) | ||
Props( | ||
new WriteMetadataActor(dbBatchSize, | ||
flushRate, | ||
serviceRegistryActor, | ||
threshold, | ||
statisticsRecorderSettings, | ||
metadataKeysToClean | ||
) | ||
) | ||
.withDispatcher(ServiceDispatcher) | ||
.withMailbox(PriorityMailbox) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,6 +8,7 @@ import cromwell.core.ExecutionEvent | |
import cromwell.core.logging.JobLogger | ||
import mouse.all._ | ||
import PipelinesUtilityConversions._ | ||
import wdl.util.StringUtil | ||
|
||
import scala.language.postfixOps | ||
|
||
|
@@ -67,7 +68,7 @@ trait PipelinesUtilityConversions { | |
// characters (typically emoji). Some databases have trouble storing these; replace them with the standard | ||
// "unknown character" unicode symbol. | ||
val name = Option(event.getContainerStopped) match { | ||
case Some(_) => cleanUtf8mb4(event.getDescription) | ||
case Some(_) => StringUtil.cleanUtf8mb4(event.getDescription) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we use the config to include this metadata in what we're sanitizing rather than cleaning it here? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Followup: in theory we could, but we would need to check every single
Leaving this comment in case anyone else has this idea, but I'm OK leaving this as-is for now. Would be great to add a comment explaining this context, though (we sanitize other metadata values elsewhere, but are handing this one differently). We'll also need to keep in mind that we may need to do something different for Batch. I'll create a ticket in that epic. |
||
case _ => event.getDescription | ||
} | ||
|
||
|
@@ -101,9 +102,4 @@ object PipelinesUtilityConversions { | |
None | ||
} | ||
} | ||
|
||
lazy val utf8mb4Regex = "[\\x{10000}-\\x{FFFFF}]" | ||
lazy val utf8mb3Replacement = "\uFFFD" // This is the standard char for replacing invalid/unknown unicode chars | ||
def cleanUtf8mb4(in: String): String = | ||
in.replaceAll(utf8mb4Regex, utf8mb3Replacement) | ||
} |
This file was deleted.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Great comment!