Skip to content

Commit

Permalink
[WX-1410] Sanitize 4 byte UTF-8 characters before inserting into META…
Browse files Browse the repository at this point in the history
…DATA_ENTRY (#7414)
  • Loading branch information
rsaperst authored May 2, 2024
1 parent f9372f9 commit 9a411cf
Show file tree
Hide file tree
Showing 9 changed files with 172 additions and 47 deletions.
5 changes: 5 additions & 0 deletions cromwell.example.backends/cromwell.examples.conf
Original file line number Diff line number Diff line change
Expand Up @@ -498,6 +498,11 @@ services {
# # count against this limit.
# metadata-read-row-number-safety-threshold = 1000000
#
# # Replace any UTF-8 mb4 (4 byte) characters in the values of the metadata keys in the list with the
# # Unicode replacement character (U+FFFD). These characters (mostly emoji) cause metadata writes to fail
# # in databases whose character set or collation does not support 4 byte UTF-8 characters.
# metadata-keys-to-sanitize-utf8mb4 = ["submittedFiles:workflow", "commandLine"]
#
# metadata-write-statistics {
# # Not strictly necessary since the 'metadata-write-statistics' section itself is enough for statistics to be recorded.
# # However, this can be set to 'false' to disable statistics collection without deleting the section.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,12 +110,14 @@ case class MetadataServiceActor(serviceConfig: Config, globalConfig: Config, ser
val metadataWriteStatisticsConfig = MetadataStatisticsRecorderSettings(
serviceConfig.as[Option[Config]]("metadata-write-statistics")
)
val metadataKeysToClean = serviceConfig.getOrElse[List[String]]("metadata-keys-to-sanitize-utf8mb4", List())
val writeActor = context.actorOf(
WriteMetadataActor.props(dbBatchSize,
dbFlushRate,
serviceRegistryActor,
LoadConfig.MetadataWriteThreshold,
metadataWriteStatisticsConfig
metadataWriteStatisticsConfig,
metadataKeysToClean
),
"WriteMetadataActor"
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ import cromwell.core.Dispatcher.ServiceDispatcher
import cromwell.core.Mailbox.PriorityMailbox
import cromwell.core.WorkflowId
import cromwell.core.instrumentation.InstrumentationPrefixes
import cromwell.services.metadata.MetadataEvent
import cromwell.services.metadata.{MetadataEvent, MetadataValue}
import cromwell.services.metadata.MetadataService._
import cromwell.services.metadata.impl.MetadataStatisticsRecorder.MetadataStatisticsRecorderSettings
import cromwell.services.{EnhancedBatchActor, MetadataServicesStore}
import wdl.util.StringUtil

import scala.concurrent.duration._
import scala.util.{Failure, Success}
Expand All @@ -18,7 +19,8 @@ class WriteMetadataActor(override val batchSize: Int,
override val flushRate: FiniteDuration,
override val serviceRegistryActor: ActorRef,
override val threshold: Int,
metadataStatisticsRecorderSettings: MetadataStatisticsRecorderSettings
metadataStatisticsRecorderSettings: MetadataStatisticsRecorderSettings,
metadataKeysToClean: List[String]
) extends EnhancedBatchActor[MetadataWriteAction](flushRate, batchSize)
with ActorLogging
with MetadataDatabaseAccess
Expand All @@ -27,9 +29,10 @@ class WriteMetadataActor(override val batchSize: Int,
private val statsRecorder = MetadataStatisticsRecorder(metadataStatisticsRecorderSettings)

override def process(e: NonEmptyVector[MetadataWriteAction]) = instrumentedProcess {
val cleanedMetadataWriteActions = if (metadataKeysToClean.isEmpty) e else sanitizeInputs(e)
val empty = (Vector.empty[MetadataEvent], List.empty[(Iterable[MetadataEvent], ActorRef)])

val (putWithoutResponse, putWithResponse) = e.foldLeft(empty) {
val (putWithoutResponse, putWithResponse) = cleanedMetadataWriteActions.foldLeft(empty) {
case ((putEvents, putAndRespondEvents), action: PutMetadataAction) =>
(putEvents ++ action.events, putAndRespondEvents)
case ((putEvents, putAndRespondEvents), action: PutMetadataActionAndRespond) =>
Expand All @@ -46,7 +49,7 @@ class WriteMetadataActor(override val batchSize: Int,
case Success(_) =>
putWithResponse foreach { case (ev, replyTo) => replyTo ! MetadataWriteSuccess(ev) }
case Failure(cause) =>
val (outOfTries, stillGood) = e.toVector.partition(_.maxAttempts <= 1)
val (outOfTries, stillGood) = cleanedMetadataWriteActions.toVector.partition(_.maxAttempts <= 1)

handleOutOfTries(outOfTries, cause)
handleEventsToReconsider(stillGood)
Expand All @@ -55,6 +58,23 @@ class WriteMetadataActor(override val batchSize: Int,
dbAction.map(_ => allPutEvents.size)
}

/**
  * Replaces 4-byte UTF-8 characters in the values of the configured metadata keys.
  *
  * Only events whose metadata key is listed in `metadataKeysToClean` are rewritten; all other events,
  * and events that carry no value, are passed through unchanged. The order of actions and of the
  * events inside each action is preserved.
  *
  * @param metadataWriteActions the batch of write actions to sanitize
  * @return a batch of the same size whose selected event values have been run through
  *         `StringUtil.cleanUtf8mb4`
  */
def sanitizeInputs(
  metadataWriteActions: NonEmptyVector[MetadataWriteAction]
): NonEmptyVector[MetadataWriteAction] = {
  // Built once per batch so the per-event membership test is a set lookup instead of a list scan.
  val keysToClean = metadataKeysToClean.toSet

  def sanitizeEvent(event: MetadataEvent): MetadataEvent =
    event.value match {
      case Some(eventVal) if keysToClean.contains(event.key.key) =>
        // copy() instead of MetadataValue(...) so that only the string payload changes.
        // NOTE(review): assumes MetadataValue is a case class carrying more than `value`
        // (e.g. a value type) that must survive sanitizing - confirm against its definition.
        event.copy(value = Option(eventVal.copy(value = StringUtil.cleanUtf8mb4(eventVal.value))))
      case _ => event
    }

  metadataWriteActions.map { metadataWriteAction =>
    val metadataEvents = metadataWriteAction.events.map(sanitizeEvent)
    metadataWriteAction match {
      case action: PutMetadataAction => action.copy(events = metadataEvents)
      case actionAndResp: PutMetadataActionAndRespond => actionAndResp.copy(events = metadataEvents)
    }
  }
}

// Tallies, per workflow id, how many metadata events are contained in the given write actions.
private def countActionsByWorkflow(writeActions: Vector[MetadataWriteAction]): Map[WorkflowId, Int] = {
  val eventsByWorkflow = writeActions.flatMap(action => action.events).groupBy(event => event.key.workflowId)
  eventsByWorkflow.map { case (workflowId, workflowEvents) => (workflowId, workflowEvents.size) }
}

Expand Down Expand Up @@ -106,9 +126,18 @@ object WriteMetadataActor {
flushRate: FiniteDuration,
serviceRegistryActor: ActorRef,
threshold: Int,
statisticsRecorderSettings: MetadataStatisticsRecorderSettings
statisticsRecorderSettings: MetadataStatisticsRecorderSettings,
metadataKeysToClean: List[String]
): Props =
Props(new WriteMetadataActor(dbBatchSize, flushRate, serviceRegistryActor, threshold, statisticsRecorderSettings))
Props(
new WriteMetadataActor(dbBatchSize,
flushRate,
serviceRegistryActor,
threshold,
statisticsRecorderSettings,
metadataKeysToClean
)
)
.withDispatcher(ServiceDispatcher)
.withMailbox(PriorityMailbox)
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class WriteMetadataActorBenchmark extends TestKitSuite with AnyFlatSpecLike with

it should "provide good throughput" taggedAs IntegrationTest in {
val writeActor =
TestFSMRef(new WriteMetadataActor(1000, 5.seconds, registry, Int.MaxValue, MetadataStatisticsDisabled) {
TestFSMRef(new WriteMetadataActor(1000, 5.seconds, registry, Int.MaxValue, MetadataStatisticsDisabled, List()) {
override val metadataDatabaseInterface: MetadataSlickDatabase = dataAccess
})

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc

it should "process jobs in the correct batch sizes" in {
val registry = TestProbe().ref
val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue) {
val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List()) {
override val metadataDatabaseInterface = mockDatabaseInterface(0)
})

Expand Down Expand Up @@ -71,9 +71,10 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc
failuresBetweenSuccessValues foreach { failureRate =>
it should s"succeed metadata writes and respond to all senders even with $failureRate failures between each success" in {
val registry = TestProbe().ref
val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue) {
override val metadataDatabaseInterface = mockDatabaseInterface(failureRate)
})
val writeActor =
TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List()) {
override val metadataDatabaseInterface = mockDatabaseInterface(failureRate)
})

def metadataEvent(index: Int, probe: ActorRef) =
PutMetadataActionAndRespond(List(
Expand Down Expand Up @@ -111,7 +112,7 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc

it should s"fail metadata writes and respond to all senders with failures" in {
val registry = TestProbe().ref
val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue) {
val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List()) {
override val metadataDatabaseInterface = mockDatabaseInterface(100)
})

Expand Down Expand Up @@ -146,6 +147,90 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc
writeActor.stop()
}

// Every event value starts with an emoji under a key that is configured for sanitizing, so after
// sanitizeInputs each value must contain the U+FFFD replacement and no 4-byte character at all.
it should s"test removing emojis from metadata works as expected" in {
  val registry = TestProbe().ref
  val writeActor =
    TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List("metadata_key")) {
      override val metadataDatabaseInterface = mockDatabaseInterface(100)
    })

  def metadataEvent(index: Int, probe: ActorRef) = PutMetadataActionAndRespond(
    List(
      MetadataEvent(MetadataKey(WorkflowId.randomId(), None, "metadata_key"), MetadataValue(s"🎉_$index"))
    ),
    probe
  )

  val probes = (0 until 43)
    .map { _ =>
      val probe = TestProbe()
      probe
    }
    .zipWithIndex
    .map { case (probe, index) =>
      probe -> metadataEvent(index, probe.ref)
    }

  val metadataWriteActions = probes.map(probe => probe._2).toVector
  val metadataWriteActionNE = NonEmptyVector(metadataWriteActions.head, metadataWriteActions.tail)

  val sanitizedWriteActions = writeActor.underlyingActor.sanitizeInputs(metadataWriteActionNE)

  // String.matches only succeeds when the WHOLE string matches, so it can never detect an emoji that is
  // merely contained in a longer value. Search for an occurrence anywhere in the value instead.
  val fourByteChar = "[\\x{10000}-\\x{10FFFF}]".r

  sanitizedWriteActions.toVector.foreach { writeAction =>
    writeAction.events.foreach { event =>
      val sanitizedValue = event.value.getOrElse(fail("Removed value from metadata event")).value

      if (fourByteChar.findFirstIn(sanitizedValue).isDefined) {
        fail("Metadata event contains emoji")
      }

      if (!sanitizedValue.contains("\uFFFD")) {
        fail("Incorrect character used to replace emoji")
      }
    }
  }

  // Stop the actor like the other tests in this spec do, so it does not outlive the test.
  writeActor.stop()
}

// Values without any 4-byte character must come out of sanitizeInputs without a U+FFFD replacement,
// even though their key is configured for sanitizing.
it should s"test removing emojis from metadata which doesn't contain emojis returns the string" in {
  val registry = TestProbe().ref
  val writeActor =
    TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List("metadata_key")) {
      override val metadataDatabaseInterface = mockDatabaseInterface(100)
    })

  def metadataEvent(index: Int, probe: ActorRef) = PutMetadataActionAndRespond(
    List(
      MetadataEvent(MetadataKey(WorkflowId.randomId(), None, "metadata_key"), MetadataValue(s"hello_$index"))
    ),
    probe
  )

  val probes = (0 until 43)
    .map { _ =>
      val probe = TestProbe()
      probe
    }
    .zipWithIndex
    .map { case (probe, index) =>
      probe -> metadataEvent(index, probe.ref)
    }

  val metadataWriteActions = probes.map(probe => probe._2).toVector
  val metadataWriteActionNE = NonEmptyVector(metadataWriteActions.head, metadataWriteActions.tail)

  val sanitizedWriteActions = writeActor.underlyingActor.sanitizeInputs(metadataWriteActionNE)

  // String.matches only succeeds when the WHOLE string matches, so it can never detect an emoji that is
  // merely contained in a longer value. Search for an occurrence anywhere in the value instead.
  val fourByteChar = "[\\x{10000}-\\x{10FFFF}]".r

  sanitizedWriteActions.toVector.foreach { writeAction =>
    writeAction.events.foreach { event =>
      val sanitizedValue = event.value.getOrElse(fail("Removed value from metadata event")).value

      if (fourByteChar.findFirstIn(sanitizedValue).isDefined) {
        fail("Metadata event contains emoji")
      }

      if (sanitizedValue.contains("\uFFFD")) {
        fail("Incorrectly replaced character in metadata event")
      }
    }
  }

  // Stop the actor like the other tests in this spec do, so it does not outlive the test.
  writeActor.stop()
}

// Mock database interface.
// A customizable number of failures occur between each success
def mockDatabaseInterface(failuresBetweenEachSuccess: Int) = new MetadataSqlDatabase with SqlDatabase {
Expand Down Expand Up @@ -382,8 +467,15 @@ object WriteMetadataActorSpec {
class BatchSizeCountingWriteMetadataActor(override val batchSize: Int,
override val flushRate: FiniteDuration,
override val serviceRegistryActor: ActorRef,
override val threshold: Int
) extends WriteMetadataActor(batchSize, flushRate, serviceRegistryActor, threshold, MetadataStatisticsDisabled) {
override val threshold: Int,
val metadataKeysToClean: List[String]
) extends WriteMetadataActor(batchSize,
flushRate,
serviceRegistryActor,
threshold,
MetadataStatisticsDisabled,
metadataKeysToClean
) {

var batchSizes: Vector[Int] = Vector.empty
var failureCount: Int = 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import cromwell.core.ExecutionEvent
import cromwell.core.logging.JobLogger
import mouse.all._
import PipelinesUtilityConversions._
import wdl.util.StringUtil

import scala.language.postfixOps

Expand Down Expand Up @@ -67,7 +68,7 @@ trait PipelinesUtilityConversions {
// characters (typically emoji). Some databases have trouble storing these; replace them with the standard
// "unknown character" unicode symbol.
val name = Option(event.getContainerStopped) match {
case Some(_) => cleanUtf8mb4(event.getDescription)
case Some(_) => StringUtil.cleanUtf8mb4(event.getDescription)
case _ => event.getDescription
}

Expand Down Expand Up @@ -101,9 +102,4 @@ object PipelinesUtilityConversions {
None
}
}

lazy val utf8mb4Regex = "[\\x{10000}-\\x{FFFFF}]"
lazy val utf8mb3Replacement = "\uFFFD" // This is the standard char for replacing invalid/unknown unicode chars
def cleanUtf8mb4(in: String): String =
in.replaceAll(utf8mb4Regex, utf8mb3Replacement)
}

This file was deleted.

17 changes: 17 additions & 0 deletions wdl/model/draft2/src/test/scala/wdl/util/StringUtilSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -70,4 +70,21 @@ class StringUtilSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers
}
}

it should "not modify strings that contain only ascii characters" in {
  // Nothing in a pure-ASCII string is a 4-byte character, so cleaning must hand it back unchanged.
  val asciiOnly = "hi there!?"
  val result = StringUtil.cleanUtf8mb4(asciiOnly)
  result shouldBe asciiOnly
}

it should "not modify strings with 3-byte unicode characters" in {
  // U+1234 lies inside the Basic Multilingual Plane (3 bytes in UTF-8) and must be left alone.
  val withThreeByteChar = "Here is my non-ascii character: \u1234 Do you like it?"
  val result = StringUtil.cleanUtf8mb4(withThreeByteChar)
  result shouldBe withThreeByteChar
}

it should "replace 4-byte unicode characters" in {
  // Builds the one-character string for a code point given in hexadecimal.
  def fromHexCodePoint(hex: String): String = new String(Character.toChars(Integer.parseInt(hex, 16)))

  val cryingFace = fromHexCodePoint("1F62D")
  val vomitingFace = fromHexCodePoint("1F92E")
  val withEmoji = s"When I try to put an emoji in the database it $vomitingFace and then I $cryingFace"
  // Each emoji is expected to collapse into a single U+FFFD replacement character.
  val expected = "When I try to put an emoji in the database it \uFFFD and then I \uFFFD"

  StringUtil.cleanUtf8mb4(withEmoji) shouldBe expected
}
}
10 changes: 10 additions & 0 deletions wom/src/main/scala/wdl/util/StringUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import scala.annotation.tailrec
* WOMmy TaskDefinition. That should get straightened out. */
object StringUtil {
val Ws = Pattern.compile("[\\ \\t]+")
// Matches any single code point outside the Basic Multilingual Plane, i.e. U+10000 through U+10FFFF.
// These are the code points that take 4 bytes in UTF-8 (typically emoji). The upper bound has to be
// U+10FFFF, the last valid Unicode code point; stopping at U+FFFFF would let plane 16 characters through.
val utf8mb4Regex = "[\\x{10000}-\\x{10FFFF}]"
// U+FFFD REPLACEMENT CHARACTER: this is the standard char for replacing invalid/unknown unicode chars.
val utf8mb3Replacement = "\uFFFD"

/**
* 1) Remove all leading newline chars
Expand Down Expand Up @@ -63,4 +65,12 @@ object StringUtil {

start(0)
}

/**
  * Replace every character matched by `utf8mb4Regex` (utf8mb4 exclusive characters such as emoji)
  * in the given string with `utf8mb3Replacement`.
  * @param in String to clean
  * @return Copy of `in` in which each matched character has been replaced; everything else is unchanged
  */
def cleanUtf8mb4(in: String): String = {
  val fourByteChars = Pattern.compile(utf8mb4Regex)
  fourByteChars.matcher(in).replaceAll(utf8mb3Replacement)
}
}

0 comments on commit 9a411cf

Please sign in to comment.