diff --git a/cromwell.example.backends/cromwell.examples.conf b/cromwell.example.backends/cromwell.examples.conf index 4ca250d1202..00f99d2ff5a 100644 --- a/cromwell.example.backends/cromwell.examples.conf +++ b/cromwell.example.backends/cromwell.examples.conf @@ -498,6 +498,11 @@ services { # # count against this limit. # metadata-read-row-number-safety-threshold = 1000000 # + # # Remove any UTF-8 mb4 (4 byte) characters from metadata keys in the list. + # # These characters (namely emojis) will cause metadata writing to fail in database collations + # # that do not support 4 byte UTF-8 characters. + # metadata-keys-to-sanitize-utf8mb4 = ["submittedFiles:workflow", "commandLine"] + # # metadata-write-statistics { # # Not strictly necessary since the 'metadata-write-statistics' section itself is enough for statistics to be recorded. # # However, this can be set to 'false' to disable statistics collection without deleting the section. diff --git a/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala index 5b45d7bb1e0..bcf6346b601 100644 --- a/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala +++ b/services/src/main/scala/cromwell/services/metadata/impl/MetadataServiceActor.scala @@ -110,12 +110,14 @@ case class MetadataServiceActor(serviceConfig: Config, globalConfig: Config, ser val metadataWriteStatisticsConfig = MetadataStatisticsRecorderSettings( serviceConfig.as[Option[Config]]("metadata-write-statistics") ) + val metadataKeysToClean = serviceConfig.getOrElse[List[String]]("metadata-keys-to-sanitize-utf8mb4", List()) val writeActor = context.actorOf( WriteMetadataActor.props(dbBatchSize, dbFlushRate, serviceRegistryActor, LoadConfig.MetadataWriteThreshold, - metadataWriteStatisticsConfig + metadataWriteStatisticsConfig, + metadataKeysToClean ), "WriteMetadataActor" ) diff --git a/services/src/main/scala/cromwell/services/metadata/impl/WriteMetadataActor.scala b/services/src/main/scala/cromwell/services/metadata/impl/WriteMetadataActor.scala index 16bf344ac6f..82d2ee2a12f 100644 --- a/services/src/main/scala/cromwell/services/metadata/impl/WriteMetadataActor.scala +++ b/services/src/main/scala/cromwell/services/metadata/impl/WriteMetadataActor.scala @@ -6,10 +6,11 @@ import cromwell.core.Dispatcher.ServiceDispatcher import cromwell.core.Mailbox.PriorityMailbox import cromwell.core.WorkflowId import cromwell.core.instrumentation.InstrumentationPrefixes -import cromwell.services.metadata.MetadataEvent +import cromwell.services.metadata.{MetadataEvent, MetadataValue} import cromwell.services.metadata.MetadataService._ import cromwell.services.metadata.impl.MetadataStatisticsRecorder.MetadataStatisticsRecorderSettings import cromwell.services.{EnhancedBatchActor, MetadataServicesStore} +import wdl.util.StringUtil import scala.concurrent.duration._ import scala.util.{Failure, Success} @@ -18,7 +19,8 @@ class WriteMetadataActor(override val batchSize: Int, override val flushRate: FiniteDuration, override val serviceRegistryActor: ActorRef, override val threshold: Int, - metadataStatisticsRecorderSettings: MetadataStatisticsRecorderSettings + metadataStatisticsRecorderSettings: MetadataStatisticsRecorderSettings, + metadataKeysToClean: List[String] ) extends EnhancedBatchActor[MetadataWriteAction](flushRate, batchSize) with ActorLogging with MetadataDatabaseAccess @@ -27,9 +29,10 @@ class WriteMetadataActor(override val batchSize: Int, private val statsRecorder = MetadataStatisticsRecorder(metadataStatisticsRecorderSettings) override def process(e: NonEmptyVector[MetadataWriteAction]) = instrumentedProcess { + val cleanedMetadataWriteActions = if (metadataKeysToClean.isEmpty) e else sanitizeInputs(e) val empty = (Vector.empty[MetadataEvent], List.empty[(Iterable[MetadataEvent], ActorRef)]) - val (putWithoutResponse, putWithResponse) = e.foldLeft(empty) { + val (putWithoutResponse, putWithResponse) = cleanedMetadataWriteActions.foldLeft(empty) { case ((putEvents, putAndRespondEvents), action: PutMetadataAction) => (putEvents ++ action.events, putAndRespondEvents) case ((putEvents, putAndRespondEvents), action: PutMetadataActionAndRespond) => @@ -46,7 +49,7 @@ class WriteMetadataActor(override val batchSize: Int, case Success(_) => putWithResponse foreach { case (ev, replyTo) => replyTo ! MetadataWriteSuccess(ev) } case Failure(cause) => - val (outOfTries, stillGood) = e.toVector.partition(_.maxAttempts <= 1) + val (outOfTries, stillGood) = cleanedMetadataWriteActions.toVector.partition(_.maxAttempts <= 1) handleOutOfTries(outOfTries, cause) handleEventsToReconsider(stillGood) @@ -55,6 +58,23 @@ class WriteMetadataActor(override val batchSize: Int, dbAction.map(_ => allPutEvents.size) } + def sanitizeInputs( + metadataWriteActions: NonEmptyVector[MetadataWriteAction] + ): NonEmptyVector[MetadataWriteAction] = + metadataWriteActions.map { metadataWriteAction => + val metadataEvents = + metadataWriteAction.events.map { event => + event.value match { + case Some(eventVal) => event.copy(value = Option(MetadataValue(StringUtil.cleanUtf8mb4(eventVal.value)))) + case None => event + } + } + metadataWriteAction match { + case action: PutMetadataAction => action.copy(events = metadataEvents) + case actionAndResp: PutMetadataActionAndRespond => actionAndResp.copy(events = metadataEvents) + } + } + private def countActionsByWorkflow(writeActions: Vector[MetadataWriteAction]): Map[WorkflowId, Int] = writeActions.flatMap(_.events).groupBy(_.key.workflowId).map { case (k, v) => k -> v.size } @@ -106,9 +126,18 @@ object WriteMetadataActor { flushRate: FiniteDuration, serviceRegistryActor: ActorRef, threshold: Int, - statisticsRecorderSettings: MetadataStatisticsRecorderSettings + statisticsRecorderSettings: MetadataStatisticsRecorderSettings, + metadataKeysToClean: List[String] ): Props = - Props(new WriteMetadataActor(dbBatchSize, flushRate, serviceRegistryActor, threshold, statisticsRecorderSettings)) + Props( + new WriteMetadataActor(dbBatchSize, + flushRate, + serviceRegistryActor, + threshold, + statisticsRecorderSettings, + metadataKeysToClean + ) + ) .withDispatcher(ServiceDispatcher) .withMailbox(PriorityMailbox) } diff --git a/services/src/test/scala/cromwell/services/metadata/impl/WriteMetadataActorBenchmark.scala b/services/src/test/scala/cromwell/services/metadata/impl/WriteMetadataActorBenchmark.scala index 1a4783dee87..fd2fb5b3d4e 100644 --- a/services/src/test/scala/cromwell/services/metadata/impl/WriteMetadataActorBenchmark.scala +++ b/services/src/test/scala/cromwell/services/metadata/impl/WriteMetadataActorBenchmark.scala @@ -46,7 +46,7 @@ class WriteMetadataActorBenchmark extends TestKitSuite with AnyFlatSpecLike with it should "provide good throughput" taggedAs IntegrationTest in { val writeActor = - TestFSMRef(new WriteMetadataActor(1000, 5.seconds, registry, Int.MaxValue, MetadataStatisticsDisabled) { + TestFSMRef(new WriteMetadataActor(1000, 5.seconds, registry, Int.MaxValue, MetadataStatisticsDisabled, List()) { override val metadataDatabaseInterface: MetadataSlickDatabase = dataAccess }) diff --git a/services/src/test/scala/cromwell/services/metadata/impl/WriteMetadataActorSpec.scala b/services/src/test/scala/cromwell/services/metadata/impl/WriteMetadataActorSpec.scala index 9b9e4f75ccd..f8371928cd0 100644 --- a/services/src/test/scala/cromwell/services/metadata/impl/WriteMetadataActorSpec.scala +++ b/services/src/test/scala/cromwell/services/metadata/impl/WriteMetadataActorSpec.scala @@ -38,7 +38,7 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc it should "process jobs in the correct batch sizes" in { val registry = TestProbe().ref - val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue) { + val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List()) { override val metadataDatabaseInterface = mockDatabaseInterface(0) }) @@ -71,9 +71,10 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc failuresBetweenSuccessValues foreach { failureRate => it should s"succeed metadata writes and respond to all senders even with $failureRate failures between each success" in { val registry = TestProbe().ref - val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue) { - override val metadataDatabaseInterface = mockDatabaseInterface(failureRate) - }) + val writeActor = + TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List()) { + override val metadataDatabaseInterface = mockDatabaseInterface(failureRate) + }) def metadataEvent(index: Int, probe: ActorRef) = PutMetadataActionAndRespond(List( @@ -111,7 +112,7 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc it should s"fail metadata writes and respond to all senders with failures" in { val registry = TestProbe().ref - val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue) { + val writeActor = TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List()) { override val metadataDatabaseInterface = mockDatabaseInterface(100) }) @@ -146,6 +147,90 @@ class WriteMetadataActorSpec extends TestKitSuite with AnyFlatSpecLike with Matc writeActor.stop() } + it should s"test removing emojis from metadata works as expected" in { + val registry = TestProbe().ref + val writeActor = + TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List("metadata_key")) { + override val metadataDatabaseInterface = mockDatabaseInterface(100) + }) + + def metadataEvent(index: Int, probe: ActorRef) = PutMetadataActionAndRespond( + List( + MetadataEvent(MetadataKey(WorkflowId.randomId(), None, "metadata_key"), MetadataValue(s"🎉_$index")) + ), + probe + ) + + val probes = (0 until 43) + .map { _ => + val probe = TestProbe() + probe + } + .zipWithIndex + .map { case (probe, index) => + probe -> metadataEvent(index, probe.ref) + } + + val metadataWriteActions = probes.map(probe => probe._2).toVector + val metadataWriteActionNE = NonEmptyVector(metadataWriteActions.head, metadataWriteActions.tail) + + val sanitizedWriteActions = writeActor.underlyingActor.sanitizeInputs(metadataWriteActionNE) + + sanitizedWriteActions.map { writeAction => + writeAction.events.map { event => + if (event.value.getOrElse(fail("Removed value from metadata event")).value.matches("[\\x{10000}-\\x{FFFFF}]")) { + fail("Metadata event contains emoji") + } + + if (!event.value.getOrElse(fail("Removed value from metadata event")).value.contains("\uFFFD")) { + fail("Incorrect character used to replace emoji") + } + } + } + } + + it should s"test removing emojis from metadata which doesn't contain emojis returns the string" in { + val registry = TestProbe().ref + val writeActor = + TestFSMRef(new BatchSizeCountingWriteMetadataActor(10, 10.millis, registry, Int.MaxValue, List("metadata_key")) { + override val metadataDatabaseInterface = mockDatabaseInterface(100) + }) + + def metadataEvent(index: Int, probe: ActorRef) = PutMetadataActionAndRespond( + List( + MetadataEvent(MetadataKey(WorkflowId.randomId(), None, "metadata_key"), MetadataValue(s"hello_$index")) + ), + probe + ) + + val probes = (0 until 43) + .map { _ => + val probe = TestProbe() + probe + } + .zipWithIndex + .map { case (probe, index) => + probe -> metadataEvent(index, probe.ref) + } + + val metadataWriteActions = probes.map(probe => probe._2).toVector + val metadataWriteActionNE = NonEmptyVector(metadataWriteActions.head, metadataWriteActions.tail) + + val sanitizedWriteActions = writeActor.underlyingActor.sanitizeInputs(metadataWriteActionNE) + + sanitizedWriteActions.map { writeAction => + writeAction.events.map { event => + if (event.value.getOrElse(fail("Removed value from metadata event")).value.matches("[\\x{10000}-\\x{FFFFF}]")) { + fail("Metadata event contains emoji") + } + + if (event.value.getOrElse(fail("Removed value from metadata event")).value.contains("\uFFFD")) { + fail("Incorrectly replaced character in metadata event") + } + } + } + } + // Mock database interface. // A customizable number of failures occur between each success def mockDatabaseInterface(failuresBetweenEachSuccess: Int) = new MetadataSqlDatabase with SqlDatabase { @@ -382,8 +467,15 @@ object WriteMetadataActorSpec { class BatchSizeCountingWriteMetadataActor(override val batchSize: Int, override val flushRate: FiniteDuration, override val serviceRegistryActor: ActorRef, - override val threshold: Int - ) extends WriteMetadataActor(batchSize, flushRate, serviceRegistryActor, threshold, MetadataStatisticsDisabled) { + override val threshold: Int, + val metadataKeysToClean: List[String] + ) extends WriteMetadataActor(batchSize, + flushRate, + serviceRegistryActor, + threshold, + MetadataStatisticsDisabled, + metadataKeysToClean + ) { var batchSizes: Vector[Int] = Vector.empty var failureCount: Int = 0 diff --git a/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesUtilityConversions.scala b/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesUtilityConversions.scala index a1e08007b05..486a7ad28b1 100644 --- a/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesUtilityConversions.scala +++ b/supportedBackends/google/pipelines/v2beta/src/main/scala/cromwell/backend/google/pipelines/v2beta/PipelinesUtilityConversions.scala @@ -8,6 +8,7 @@ import cromwell.core.ExecutionEvent import cromwell.core.logging.JobLogger import mouse.all._ import PipelinesUtilityConversions._ +import wdl.util.StringUtil import scala.language.postfixOps @@ -67,7 +68,7 @@ trait PipelinesUtilityConversions { // characters (typically emoji). Some databases have trouble storing these; replace them with the standard // "unknown character" unicode symbol. val name = Option(event.getContainerStopped) match { - case Some(_) => cleanUtf8mb4(event.getDescription) + case Some(_) => StringUtil.cleanUtf8mb4(event.getDescription) case _ => event.getDescription } @@ -101,9 +102,4 @@ object PipelinesUtilityConversions { None } } - - lazy val utf8mb4Regex = "[\\x{10000}-\\x{FFFFF}]" - lazy val utf8mb3Replacement = "\uFFFD" // This is the standard char for replacing invalid/unknown unicode chars - def cleanUtf8mb4(in: String): String = - in.replaceAll(utf8mb4Regex, utf8mb3Replacement) } diff --git a/supportedBackends/google/pipelines/v2beta/src/test/scala/cromwell/backend/google/pipelines/v2beta/PipelinesUtilityConversionsSpec.scala b/supportedBackends/google/pipelines/v2beta/src/test/scala/cromwell/backend/google/pipelines/v2beta/PipelinesUtilityConversionsSpec.scala deleted file mode 100644 index eeef482a740..00000000000 --- a/supportedBackends/google/pipelines/v2beta/src/test/scala/cromwell/backend/google/pipelines/v2beta/PipelinesUtilityConversionsSpec.scala +++ /dev/null @@ -1,26 +0,0 @@ -package cromwell.backend.google.pipelines.v2beta - -import org.scalatest.flatspec.AnyFlatSpec -import org.scalatest.matchers.should.Matchers - -class PipelinesUtilityConversionsSpec extends AnyFlatSpec with Matchers { - behavior of "PipelinesUtilityConversions" - - it should "not modify strings that contain only ascii characters" in { - val input = "hi there!?" - PipelinesUtilityConversions.cleanUtf8mb4(input) shouldBe input - } - - it should "not modify strings with 3-byte unicode characters" in { - val input = "Here is my non-ascii character: \u1234 Do you like it?" - PipelinesUtilityConversions.cleanUtf8mb4(input) shouldBe input - } - - it should "replace 4-byte unicode characters" in { - val cry = new String(Character.toChars(Integer.parseInt("1F62D", 16))) - val barf = new String(Character.toChars(Integer.parseInt("1F92E", 16))) - val input = s"When I try to put an emoji in the database it $barf and then I $cry" - val cleaned = "When I try to put an emoji in the database it \uFFFD and then I \uFFFD" - PipelinesUtilityConversions.cleanUtf8mb4(input) shouldBe cleaned - } -} diff --git a/wdl/model/draft2/src/test/scala/wdl/util/StringUtilSpec.scala b/wdl/model/draft2/src/test/scala/wdl/util/StringUtilSpec.scala index 7c466aca30e..b3ccadc9fc6 100644 --- a/wdl/model/draft2/src/test/scala/wdl/util/StringUtilSpec.scala +++ b/wdl/model/draft2/src/test/scala/wdl/util/StringUtilSpec.scala @@ -70,4 +70,21 @@ class StringUtilSpec extends AnyFlatSpec with CromwellTimeoutSpec with Matchers } } + it should "not modify strings that contain only ascii characters" in { + val input = "hi there!?" + StringUtil.cleanUtf8mb4(input) shouldBe input + } + + it should "not modify strings with 3-byte unicode characters" in { + val input = "Here is my non-ascii character: \u1234 Do you like it?" + StringUtil.cleanUtf8mb4(input) shouldBe input + } + + it should "replace 4-byte unicode characters" in { + val cry = new String(Character.toChars(Integer.parseInt("1F62D", 16))) + val barf = new String(Character.toChars(Integer.parseInt("1F92E", 16))) + val input = s"When I try to put an emoji in the database it $barf and then I $cry" + val cleaned = "When I try to put an emoji in the database it \uFFFD and then I \uFFFD" + StringUtil.cleanUtf8mb4(input) shouldBe cleaned + } } diff --git a/wom/src/main/scala/wdl/util/StringUtil.scala b/wom/src/main/scala/wdl/util/StringUtil.scala index f89c4fd73da..03821457be5 100644 --- a/wom/src/main/scala/wdl/util/StringUtil.scala +++ b/wom/src/main/scala/wdl/util/StringUtil.scala @@ -8,6 +8,8 @@ import scala.annotation.tailrec * WOMmy TaskDefinition. That should get straightened out. */ object StringUtil { val Ws = Pattern.compile("[\\ \\t]+") + val utf8mb4Regex = "[\\x{10000}-\\x{FFFFF}]" + val utf8mb3Replacement = "\uFFFD" // This is the standard char for replacing /** * 1) Remove all leading newline chars @@ -63,4 +65,12 @@ object StringUtil { start(0) } + + /** + * Remove all utf8mb4 exclusive characters (emoji) from the given string. + * @param in String to clean + * @return String with all utf8mb4 exclusive characters removed + */ + def cleanUtf8mb4(in: String): String = + in.replaceAll(utf8mb4Regex, utf8mb3Replacement) }