From 783496952ff87b6a864a5e51d3da1658fd9437b9 Mon Sep 17 00:00:00 2001 From: Chris Larsen Date: Mon, 29 Jul 2024 21:03:20 -0700 Subject: [PATCH 1/2] eval: add a rewrite module to handle queries with namespaces. If configured via `atlas.eval.stream.rewrite-url`, data source URIs are sent to a rewriting service. Data sources whose rewrites return an error are dropped and the caller is notified via a diagnostic message; valid rewrites continue through the stream. --- .../eval/stream/DataSourceRewriter.scala | 130 +++++++++ .../atlas/eval/stream/EvaluatorImpl.scala | 27 +- .../atlas/eval/stream/HostSource.scala | 2 +- .../netflix/atlas/eval/stream/package.scala | 3 + .../eval/stream/DataSourceRewriterSuite.scala | 272 ++++++++++++++++++ .../atlas/eval/stream/EvaluatorSuite.scala | 96 +++++++ build.sbt | 4 +- 7 files changed, 529 insertions(+), 5 deletions(-) create mode 100644 atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala create mode 100644 atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala new file mode 100644 index 000000000..b5e883741 --- /dev/null +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala @@ -0,0 +1,130 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.netflix.atlas.eval.stream + +import com.netflix.atlas.eval.stream.Evaluator.DataSource +import com.netflix.atlas.eval.stream.Evaluator.DataSources +import com.netflix.atlas.eval.stream.HostSource.unzipIfNeeded +import com.netflix.atlas.json.Json +import com.netflix.atlas.pekko.DiagnosticMessage +import com.netflix.atlas.pekko.PekkoHttpClient +import com.typesafe.config.Config +import com.typesafe.scalalogging.StrictLogging +import org.apache.pekko.NotUsed +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.model.ContentTypes +import org.apache.pekko.http.scaladsl.model.HttpEntity +import org.apache.pekko.http.scaladsl.model.HttpMethods +import org.apache.pekko.http.scaladsl.model.HttpRequest +import org.apache.pekko.http.scaladsl.model.StatusCodes +import org.apache.pekko.stream.scaladsl.Flow + +import java.io.ByteArrayOutputStream +import scala.jdk.CollectionConverters.CollectionHasAsScala +import scala.util.Failure +import scala.util.Success +import scala.util.Using + +class DataSourceRewriter(config: Config, implicit val system: ActorSystem) extends StrictLogging { + + private val (enabled, rewriteUrl) = { + val enabled = config.hasPath("atlas.eval.stream.rewrite-url") + val url = if (enabled) config.getString("atlas.eval.stream.rewrite-url") else "" + if (enabled) { + logger.info(s"Rewriting enabled with url: ${url}") + } else { + logger.info("Rewriting is disabled") + } + (enabled, url) + } + + private val client = PekkoHttpClient + .create("datasource-rewrite", system) + .superPool[List[DataSource]]() + + def rewrite(context: StreamContext): Flow[DataSources, DataSources, NotUsed] = { + rewrite(client, context) + } + + def rewrite( + client: SuperPoolClient, + context: StreamContext + ): Flow[DataSources, DataSources, NotUsed] = { + if (!enabled) { + return Flow[DataSources] + } + + Flow[DataSources] + .map(_.sources().asScala.toList) + .map(dsl => constructRequest(dsl) -> dsl) + .via(client) + .flatMapConcat { + case (Success(resp), dsl) => + unzipIfNeeded(resp) + .map(_.utf8String) + .map { body => + resp.status match { + case StatusCodes.OK => + val rewrites = List.newBuilder[DataSource] + Json + .decode[List[Rewrite]](body) + .zip(dsl) + .map { + case (r, ds) => + if (!r.status.equals("OK")) { + val msg = + DiagnosticMessage.error(s"failed rewrite of ${ds.uri()}: ${r.message}") + context.dsLogger(ds, msg) + } else { + rewrites += new DataSource(ds.id, ds.step(), r.rewrite) + } + } + .toArray + // NOTE: We're assuming that the number of items returned will be the same as the + // number of uris sent to the rewrite service. If they differ, data sources may be + // mapped to IDs and steps incorrectly. + DataSources.of(rewrites.result().toArray: _*) + case _ => + logger.error( + "Exception from rewrite service. 
status={}, resp={}", + resp.status, + body + ) + throw new RuntimeException(body) + } + } + case (Failure(ex), _) => + throw ex + } + } + + private[stream] def constructRequest(dss: List[DataSource]): HttpRequest = { + val baos = new ByteArrayOutputStream + Using(Json.newJsonGenerator(baos)) { json => + json.writeStartArray() + dss.foreach(s => json.writeString(s.uri())) + json.writeEndArray() + } + HttpRequest( + uri = rewriteUrl, + method = HttpMethods.POST, + entity = HttpEntity(ContentTypes.`application/json`, baos.toByteArray) + ) + } + +} + +case class Rewrite(status: String, rewrite: String, original: String, message: String) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala index e184247b6..ba66a5113 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala @@ -60,6 +60,7 @@ import com.netflix.atlas.eval.stream.Evaluator.MessageEnvelope import com.netflix.atlas.json.Json import com.netflix.atlas.json.JsonSupport import com.netflix.atlas.pekko.ClusterOps +import com.netflix.atlas.pekko.DiagnosticMessage import com.netflix.atlas.pekko.StreamOps import com.netflix.atlas.pekko.ThreadPools import com.netflix.spectator.api.Registry @@ -86,8 +87,11 @@ private[stream] abstract class EvaluatorImpl( private val logger = LoggerFactory.getLogger(getClass) + // Calls out to a rewrite service in case URIs need mutating to pick the proper backend. + private[stream] var dataSourceRewriter = new DataSourceRewriter(config, system) + // Cached context instance used for things like expression validation. - private val validationStreamContext = newStreamContext() + private val validationStreamContext = newStreamContext(new ThrowingDSLogger) // Timeout for DataSources unique operator: emit repeating DataSources after timeout exceeds private val uniqueTimeout: Long = config.getDuration("atlas.eval.stream.unique-timeout").toMillis @@ -129,7 +133,13 @@ private[stream] abstract class EvaluatorImpl( } protected def validateImpl(ds: DataSource): Unit = { - validationStreamContext.validateDataSource(ds).get + val future = Source + .single(DataSources.of(ds)) + .via(dataSourceRewriter.rewrite(validationStreamContext)) + .map(_.sources().asScala.map(validationStreamContext.validateDataSource).map(_.get)) + .toMat(Sink.head)(Keep.right) + .run() + Await.result(future, 60.seconds) } protected def writeInputToFileImpl(uri: String, file: Path, duration: Duration): Unit = { @@ -212,6 +222,7 @@ private[stream] abstract class EvaluatorImpl( def createStreamsFlow: Flow[DataSources, MessageEnvelope, NotUsed] = { val (logSrc, context) = createStreamContextSource Flow[DataSources] + .via(dataSourceRewriter.rewrite(context)) .map(dss => groupByHost(dss)) // Emit empty DataSource if no more DataSource for a host, so that the sub-stream get the info .via(new FillRemovedKeysWith[String, DataSources](_ => DataSources.empty())) @@ -581,4 +592,16 @@ private[stream] abstract class EvaluatorImpl( private def isPrintable(c: Int): Boolean = { c >= 32 && c < 127 } + + private class ThrowingDSLogger extends DataSourceLogger { + + override def apply(ds: DataSource, msg: JsonSupport): Unit = { + msg match { + case dsg: DiagnosticMessage => + throw new IllegalArgumentException(dsg.message) + } + } + + override def close(): Unit = {} + } } diff --git 
a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/HostSource.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/HostSource.scala index 357009424..0f44617dd 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/HostSource.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/HostSource.scala @@ -103,7 +103,7 @@ private[stream] object HostSource extends StrictLogging { Source.empty[ByteString] } - private def unzipIfNeeded(res: HttpResponse): Source[ByteString, Any] = { + def unzipIfNeeded(res: HttpResponse): Source[ByteString, Any] = { val isCompressed = res.headers.contains(`Content-Encoding`(HttpEncodings.gzip)) val dataBytes = res.entity.withoutSizeLimit().dataBytes if (isCompressed) dataBytes.via(Compression.gunzip()) else dataBytes diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/package.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/package.scala index bb68511fc..d065fd840 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/package.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/package.scala @@ -30,5 +30,8 @@ package object stream { type SimpleClient = Flow[HttpRequest, Try[HttpResponse], NotUsed] + type SuperPoolClient = + Flow[(HttpRequest, List[DataSource]), (Try[HttpResponse], List[DataSource]), NotUsed] + type SourcesAndGroups = (DataSources, EddaSource.Groups) } diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala new file mode 100644 index 000000000..066e4d13b --- /dev/null +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala @@ -0,0 +1,272 @@ +/* + * Copyright 2014-2024 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package com.netflix.atlas.eval.stream + +import com.fasterxml.jackson.databind.JsonMappingException +import com.netflix.atlas.eval.stream.Evaluator.DataSource +import com.netflix.atlas.eval.stream.Evaluator.DataSources +import com.netflix.atlas.json.Json +import com.netflix.atlas.json.JsonSupport +import com.netflix.atlas.pekko.PekkoHttpClient +import com.typesafe.config.Config +import com.typesafe.config.ConfigFactory +import com.typesafe.config.ConfigValueFactory +import munit.FunSuite +import org.apache.pekko.NotUsed +import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.http.scaladsl.model.ContentTypes +import org.apache.pekko.http.scaladsl.model.HttpEntity +import org.apache.pekko.http.scaladsl.model.HttpRequest +import org.apache.pekko.http.scaladsl.model.HttpResponse +import org.apache.pekko.http.scaladsl.model.StatusCode +import org.apache.pekko.stream.Materializer +import org.apache.pekko.stream.scaladsl.Flow +import org.apache.pekko.stream.scaladsl.Keep +import org.apache.pekko.stream.scaladsl.Sink +import org.apache.pekko.stream.scaladsl.Source +import org.apache.pekko.testkit.TestKitBase + +import java.time.Duration +import scala.concurrent.Await +import scala.concurrent.Future +import scala.concurrent.duration.DurationInt +import scala.util.Failure +import scala.util.Success +import scala.util.Try + +class DataSourceRewriterSuite extends FunSuite with TestKitBase { + + val dss = DataSources.of( + new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq"), + new DataSource( + "bar", + Duration.ofSeconds(60), + "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and" + ) + ) + + var config: Config = _ + var logger: MockLogger = _ + var ctx: StreamContext = null + + override implicit def system: ActorSystem = ActorSystem("Test") + + override def beforeEach(context: BeforeEach): Unit = { + config = ConfigFactory + .load() + .withValue( + "atlas.eval.stream.rewrite-url", + ConfigValueFactory.fromAnyRef("http://localhost/api/v1/rewrite") + ) + logger = new MockLogger() + ctx = new StreamContext(config, Materializer(system), dsLogger = logger) + } + + test("rewrite: Disabled") { + config = ConfigFactory.load() + val obtained = rewrite(dss, null) + assertEquals(obtained, dss) + } + + test("rewrite: OK") { + val client = mockClient( + StatusCode.int2StatusCode(200), + Map( + "http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite( + "OK", + "http://localhost/api/v1/graph?q=name,foo,:eq", + "http://localhost/api/v1/graph?q=name,foo,:eq", + "" + ), + "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and" -> Rewrite( + "OK", + "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq", + "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and", + "" + ) + ) + ) + val expected = DataSources.of( + new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq"), + new DataSource( + "bar", + Duration.ofSeconds(60), + "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq" + ) + ) + val obtained = rewrite(dss, client) + assertEquals(obtained, expected) + ctx.dsLogger.asInstanceOf[MockLogger].assertSize(0) + } + + test("parseRewrites: Bad URI in datasources") { + val client = mockClient( + StatusCode.int2StatusCode(200), + Map( + "http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite( + "OK", + "http://localhost/api/v1/graph?q=name,foo,:eq", + "http://localhost/api/v1/graph?q=name,foo,:eq", + "" + ), + 
"http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and" -> Rewrite( + "NOT_FOUND", + "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq", + "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq", + "No namespace found for seg" + ) + ) + ) + val expected = DataSources.of( + new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq") + ) + val obtained = rewrite(dss, client) + assertEquals(obtained, expected) + ctx.dsLogger.asInstanceOf[MockLogger].assertSize(1) + } + + test("parseRewrites: Malformed response JSON") { + val client = mockClient( + StatusCode.int2StatusCode(200), + Map( + "http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite( + "OK", + "http://localhost/api/v1/graph?q=name,foo,:eq", + "http://localhost/api/v1/graph?q=name,foo,:eq", + "" + ), + "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and" -> Rewrite( + "OK", + "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq", + "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and", + "" + ) + ), + malformed = true + ) + intercept[JsonMappingException] { + rewrite(dss, client) + } + } + + test("parseRewrites: 500") { + val client = mockClient( + StatusCode.int2StatusCode(500), + Map.empty + ) + intercept[RuntimeException] { + rewrite(dss, client) + } + } + + test("parseRewrites: Missing a rewrite") { + val client = mockClient( + StatusCode.int2StatusCode(200), + Map( + "http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite( + "OK", + "http://localhost/api/v1/graph?q=name,foo,:eq", + "http://localhost/api/v1/graph?q=name,foo,:eq", + "" + ) + ) + ) + val expected = DataSources.of( + new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq") + ) + val obtained = rewrite(dss, client) + assertEquals(obtained, expected) + ctx.dsLogger.asInstanceOf[MockLogger].assertSize(1) + } + + def mockClient( + status: StatusCode, + response: Map[String, Rewrite], + returnEx: Option[Exception] = None, + malformed: Boolean = false + ): SuperPoolClient = { + returnEx + .map { ex => + PekkoHttpClient.create(Failure(ex)).superPool[List[DataSource]]() + } + .getOrElse { + new MockClient(status, response, malformed).superPool[List[DataSource]]() + } + } + + def rewrite(dss: DataSources, client: SuperPoolClient): DataSources = { + val rewriter = new DataSourceRewriter(config, system) + val future = Source + .single(dss) + .via( + rewriter.rewrite(client, ctx) + ) + .toMat(Sink.head)(Keep.right) + .run() + Await.result(future, 30.seconds) + } + + class MockClient(status: StatusCode, response: Map[String, Rewrite], malformed: Boolean = false) + extends PekkoHttpClient { + + override def singleRequest(request: HttpRequest): Future[HttpResponse] = ??? 
+ + override def superPool[C]( + config: PekkoHttpClient.ClientConfig + ): Flow[(HttpRequest, C), (Try[HttpResponse], C), NotUsed] = { + Flow[(HttpRequest, C)] + .flatMapConcat { + case (req, context) => + req.entity.withoutSizeLimit().dataBytes.map { body => + val uris = Json.decode[List[String]](body.toArray) + val rewrites = uris.map { uri => + response.get(uri) match { + case Some(r) => r + case None => Rewrite("NOT_FOUND", uri, uri, "Empty") + } + } + + val json = + if (malformed) Json.encode(rewrites).substring(0, 25) else Json.encode(rewrites) + + val httpResp = + HttpResponse( + status, + entity = HttpEntity(ContentTypes.`application/json`, json) + ) + Success(httpResp) -> context + } + } + } + } + + class MockLogger extends DataSourceLogger { + + var tuples: List[(DataSource, JsonSupport)] = Nil + + override def apply(ds: DataSource, msg: JsonSupport): Unit = { + tuples = (ds, msg) :: tuples + } + + override def close(): Unit = { + tuples = Nil + } + + def assertSize(n: Int): Unit = { + assertEquals(tuples.size, n) + } + } +} diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala index c4ab5efc0..be42ece78 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala @@ -35,6 +35,7 @@ import com.netflix.atlas.eval.model.LwcHeartbeat import com.netflix.atlas.eval.model.LwcMessages import com.netflix.atlas.eval.model.LwcSubscription import com.netflix.atlas.eval.model.TimeSeriesMessage +import com.netflix.atlas.eval.stream.Evaluator.DataSources import com.netflix.atlas.eval.stream.Evaluator.MessageEnvelope import com.netflix.atlas.json.Json import com.netflix.atlas.json.JsonSupport @@ -45,10 +46,14 @@ import com.typesafe.config.ConfigValueFactory import nl.jqno.equalsverifier.EqualsVerifier import nl.jqno.equalsverifier.Warning import munit.FunSuite +import org.apache.pekko.NotUsed import java.nio.file.Path import scala.concurrent.Await +import scala.concurrent.Future import scala.concurrent.Promise +import scala.concurrent.duration.DurationInt +import scala.jdk.CollectionConverters.SetHasAsScala import scala.util.Success import scala.util.Using @@ -365,6 +370,56 @@ class EvaluatorSuite extends FunSuite { testError(ds1, msg) } + def testRewrite(skipOne: Boolean = false, throwEx: Boolean = false): Unit = { + val evaluator = new Evaluator(config, registry, system) + evaluator.dataSourceRewriter = new UTRewriter(skipOne, throwEx) + + val baseUri = "resource:///gc-pause.dat" + val uri = s"$baseUri?q=name,jvm.gc.pause,:eq,:dist-max,(,nf.asg,nf.node,),:by" + val ds1 = Evaluator.DataSources.of(ds("one", uri), ds("two", uri)) + val sourceRef = EvaluationFlows.stoppableSource( + Source + .single(ds1) + .via(Flow.fromProcessor(() => evaluator.createStreamsProcessor())) + ) + + val oneCount = new AtomicInteger() + val twoCount = new AtomicInteger() + val sink = Sink.foreach[Evaluator.MessageEnvelope] { msg => + msg.id match { + case "one" => oneCount.incrementAndGet() + case "two" => + if (skipOne && msg.message.isInstanceOf[DiagnosticMessage]) twoCount.incrementAndGet() + else twoCount.incrementAndGet() + } + if (oneCount.get() > 0 && twoCount.get() > 0) sourceRef.stop() + } + + val future = sourceRef.source + .toMat(sink)(Keep.right) + .run() + + if (throwEx) { + intercept[RuntimeException] { + Await.result(future, 1.minute) + } + } else { + Await.result(future, 
1.minute) + } + } + + test("create processor, rewrites ok") { + testRewrite() + } + + test("create processor, one of two failed rewrite") { + testRewrite(skipOne = true) + } + + test("create processor, rewrite call failed") { + testRewrite(throwEx = true) + } + test("processor handles multiple steps") { val evaluator = new Evaluator(config, registry, system) @@ -604,6 +659,18 @@ class EvaluatorSuite extends FunSuite { ) } + test("validate: invalid rewrite") { + val evaluator = new Evaluator(config, registry, system) + evaluator.dataSourceRewriter = new UTRewriter(true) + val ds = new Evaluator.DataSource( + "test", + s"synthetic://test/?q=nf.ns,none,:eq,name,foo,:eq,:and" + ) + intercept[IllegalArgumentException] { + evaluator.validate(ds) + } + } + private def invalidHiResQuery(expr: String): Unit = { val evaluator = new Evaluator(config, registry, system) val ds = new Evaluator.DataSource( @@ -791,4 +858,33 @@ class EvaluatorSuite extends FunSuite { val result = Await.result(future, scala.concurrent.duration.Duration.Inf) assertEquals(result.size, 10) } + + class UTRewriter(skipOne: Boolean = false, throwEx: Boolean = false) + extends DataSourceRewriter(config, system) { + + override def rewrite(context: StreamContext): Flow[DataSources, DataSources, NotUsed] = { + if (throwEx) { + Flow[DataSources] + .map(_ => throw new RuntimeException("test")) + } else if (skipOne) { + // mimic dropping one. + Flow[DataSources] + .map { dss => + val dsl = dss.sources().asScala.toList + dsl.size match { + case 1 => + val msg = DiagnosticMessage.error(s"failed rewrite") + context.dsLogger(dsl(0), msg) + DataSources.empty() + case 2 => + val msg = DiagnosticMessage.error(s"failed rewrite") + context.dsLogger(dsl(1), msg) + DataSources.of(dsl(0)) + } + } + } else { + Flow[DataSources] + } + } + } } diff --git a/build.sbt b/build.sbt index 26a4b7948..1b7df2295 100644 --- a/build.sbt +++ b/build.sbt @@ -68,10 +68,10 @@ lazy val `atlas-eval` = project .configure(BuildSettings.profile) .dependsOn(`atlas-pekko`, `atlas-chart`, `atlas-core`) .settings(libraryDependencies ++= Seq( + Dependencies.equalsVerifier % "test", Dependencies.pekkoHttpTestkit % "test", Dependencies.pekkoStreamTestkit % "test", - Dependencies.pekkoTestkit % "test", - Dependencies.equalsVerifier % "test" + Dependencies.pekkoTestkit % "test" )) lazy val `atlas-jmh` = project From 6cb347a7e4c53fa19b6a151e4fcf1f8550cee8cf Mon Sep 17 00:00:00 2001 From: Chris Larsen Date: Wed, 14 Aug 2024 15:18:08 -0700 Subject: [PATCH 2/2] eval: Add caching and continuous retries to the DataSourceRewriter. Some uses, e.g. the atlas-stream case, entail updating the data sources for a flow when new subscriptions are added or existing ones are dropped. If the rewrite backend is unavailable, previously cached rewrites for the individual queries are returned instead. Calls to the rewrite service are retried with backoff until a successful response is received, at which point the cache is updated and the valid rewrites are forwarded downstream. 
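For reference, the wire contract is unchanged from the first patch: the rewriter POSTs a JSON array of the data source URIs and expects a JSON array of rewrite results in the same order. A sketch of the exchange, with illustrative host names and queries taken from the test fixtures:

    POST /api/v1/rewrite
    ["http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and"]

    HTTP/1.1 200 OK
    [
      {
        "status": "OK",
        "rewrite": "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq",
        "original": "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and",
        "message": ""
      }
    ]

Entries with a status other than OK are dropped and reported through the data source logger; OK entries now also update the cache, keyed by the original URI, before being forwarded downstream.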
--- .../eval/stream/DataSourceRewriter.scala | 123 +++++++++- .../atlas/eval/stream/EvaluatorImpl.scala | 4 +- .../eval/stream/DataSourceRewriterSuite.scala | 223 ++++++++++++++---- .../atlas/eval/stream/EvaluatorSuite.scala | 102 +++++++- 4 files changed, 397 insertions(+), 55 deletions(-) diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala index b5e883741..5bd74bb18 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/DataSourceRewriter.scala @@ -20,7 +20,10 @@ import com.netflix.atlas.eval.stream.Evaluator.DataSources import com.netflix.atlas.eval.stream.HostSource.unzipIfNeeded import com.netflix.atlas.json.Json import com.netflix.atlas.pekko.DiagnosticMessage +import com.netflix.atlas.pekko.OpportunisticEC.ec import com.netflix.atlas.pekko.PekkoHttpClient +import com.netflix.atlas.pekko.StreamOps +import com.netflix.spectator.api.Registry import com.typesafe.config.Config import com.typesafe.scalalogging.StrictLogging import org.apache.pekko.NotUsed @@ -30,15 +33,25 @@ import org.apache.pekko.http.scaladsl.model.HttpEntity import org.apache.pekko.http.scaladsl.model.HttpMethods import org.apache.pekko.http.scaladsl.model.HttpRequest import org.apache.pekko.http.scaladsl.model.StatusCodes +import org.apache.pekko.stream.scaladsl.BroadcastHub import org.apache.pekko.stream.scaladsl.Flow +import org.apache.pekko.stream.scaladsl.Keep +import org.apache.pekko.stream.scaladsl.RetryFlow +import org.apache.pekko.stream.scaladsl.Source import java.io.ByteArrayOutputStream +import java.util.concurrent.ConcurrentHashMap +import scala.concurrent.duration.DurationInt import scala.jdk.CollectionConverters.CollectionHasAsScala import scala.util.Failure import scala.util.Success import scala.util.Using -class DataSourceRewriter(config: Config, implicit val system: ActorSystem) extends StrictLogging { +class DataSourceRewriter( + config: Config, + registry: Registry, + implicit val system: ActorSystem +) extends StrictLogging { private val (enabled, rewriteUrl) = { val enabled = config.hasPath("atlas.eval.stream.rewrite-url") @@ -55,21 +68,84 @@ class DataSourceRewriter(config: Config, implicit val system: ActorSystem) exten .create("datasource-rewrite", system) .superPool[List[DataSource]]() - def rewrite(context: StreamContext): Flow[DataSources, DataSources, NotUsed] = { - rewrite(client, context) + private val rewriteCache = new ConcurrentHashMap[String, String]() + + private val rewriteSuccess = registry.counter("atlas.eval.stream.rewrite.success") + private val rewriteFailures = registry.createId("atlas.eval.stream.rewrite.failures") + private val rewriteCacheHits = registry.counter("atlas.eval.stream.rewrite.cache", "id", "hits") + + private val rewriteCacheMisses = + registry.counter("atlas.eval.stream.rewrite.cache", "id", "misses") + + def rewrite( + context: StreamContext, + keepRetrying: Boolean = true + ): Flow[DataSources, DataSources, NotUsed] = { + rewrite(client, context, keepRetrying) } def rewrite( client: SuperPoolClient, - context: StreamContext + context: StreamContext, + keepRetrying: Boolean ): Flow[DataSources, DataSources, NotUsed] = { if (!enabled) { return Flow[DataSources] } + val (cachedQueue, cachedSource) = StreamOps + .blockingQueue[DataSources](registry, "cachedRewrites", 1) + .toMat(BroadcastHub.sink(1))(Keep.both) + .run() + var 
sentCacheData = false + + val retryFlow = RetryFlow + .withBackoff( + minBackoff = 100.milliseconds, + maxBackoff = 5.second, + randomFactor = 0.35, + maxRetries = if (keepRetrying) -1 else 0, + flow = httpFlow(client, context) + ) { + case (original, resp) => + resp match { + case Success(_) => None + case Failure(ex) => + val (request, dsl) = original + logger.debug("Retrying the rewrite request due to error", ex) + if (!sentCacheData) { + if (!cachedQueue.offer(returnFromCache(dsl))) { + // note that this should never happen. + logger.error("Unable to send cached results to queue.") + } else { + sentCacheData = true + } + } + Some(request -> dsl) + } + + } + .watchTermination() { (_, f) => + f.onComplete { _ => + cachedQueue.complete() + } + } + Flow[DataSources] .map(_.sources().asScala.toList) .map(dsl => constructRequest(dsl) -> dsl) + .via(retryFlow) + .filter(_.isSuccess) + .map { + // reset the cached flag + sentCacheData = false + _.get + } + .merge(cachedSource) + } + + private[stream] def httpFlow(client: SuperPoolClient, context: StreamContext) = { + Flow[(HttpRequest, List[DataSource])] .via(client) .flatMapConcat { case (Success(resp), dsl) => @@ -89,6 +165,7 @@ class DataSourceRewriter(config: Config, implicit val system: ActorSystem) exten DiagnosticMessage.error(s"failed rewrite of ${ds.uri()}: ${r.message}") context.dsLogger(ds, msg) } else { + rewriteCache.put(ds.uri(), r.rewrite) rewrites += new DataSource(ds.id, ds.step(), r.rewrite) } } @@ -96,21 +173,51 @@ class DataSourceRewriter(config: Config, implicit val system: ActorSystem) exten // NOTE: We're assuming that the number of items returned will be the same as the // number of uris sent to the rewrite service. If they differ, data sources may be // mapped to IDs and steps incorrectly. - DataSources.of(rewrites.result().toArray: _*) + rewriteSuccess.increment() + Success(DataSources.of(rewrites.result().toArray: _*)) case _ => logger.error( - "Exception from rewrite service. status={}, resp={}", + "Error from rewrite service. status={}, resp={}", resp.status, body ) - throw new RuntimeException(body) + registry + .counter( + rewriteFailures.withTags("status", resp.status.toString(), "exception", "NA") + ) + .increment() + Failure( + new RuntimeException( + s"Error from rewrite service. 
status=${resp.status}, resp=$body" + ) + ) } } case (Failure(ex), _) => - throw ex + logger.error("Failure from rewrite service", ex) + registry + .counter( + rewriteFailures.withTags("status", "0", "exception", ex.getClass.getSimpleName) + ) + .increment() + Source.single(Failure(ex)) } } + private[stream] def returnFromCache(dsl: List[DataSource]): DataSources = { + val rewrites = dsl.flatMap { ds => + val rewrite = rewriteCache.get(ds.uri()) + if (rewrite == null) { + rewriteCacheMisses.increment() + None + } else { + rewriteCacheHits.increment() + Some(new DataSource(ds.id, ds.step(), rewrite)) + } + } + DataSources.of(rewrites: _*) + } + private[stream] def constructRequest(dss: List[DataSource]): HttpRequest = { val baos = new ByteArrayOutputStream Using(Json.newJsonGenerator(baos)) { json => diff --git a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala index ba66a5113..c9e8935d7 100644 --- a/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala +++ b/atlas-eval/src/main/scala/com/netflix/atlas/eval/stream/EvaluatorImpl.scala @@ -88,7 +88,7 @@ private[stream] abstract class EvaluatorImpl( private val logger = LoggerFactory.getLogger(getClass) // Calls out to a rewrite service in case URIs need mutating to pick the proper backend. - private[stream] var dataSourceRewriter = new DataSourceRewriter(config, system) + private[stream] var dataSourceRewriter = new DataSourceRewriter(config, registry, system) // Cached context instance used for things like expression validation. private val validationStreamContext = newStreamContext(new ThrowingDSLogger) @@ -135,7 +135,7 @@ private[stream] abstract class EvaluatorImpl( protected def validateImpl(ds: DataSource): Unit = { val future = Source .single(DataSources.of(ds)) - .via(dataSourceRewriter.rewrite(validationStreamContext)) + .via(dataSourceRewriter.rewrite(validationStreamContext, false)) .map(_.sources().asScala.map(validationStreamContext.validateDataSource).map(_.get)) .toMat(Sink.head)(Keep.right) .run() diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala index 066e4d13b..e9aafc407 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/DataSourceRewriterSuite.scala @@ -21,6 +21,7 @@ import com.netflix.atlas.eval.stream.Evaluator.DataSources import com.netflix.atlas.json.Json import com.netflix.atlas.json.JsonSupport import com.netflix.atlas.pekko.PekkoHttpClient +import com.netflix.spectator.api.NoopRegistry import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigValueFactory @@ -32,6 +33,7 @@ import org.apache.pekko.http.scaladsl.model.HttpEntity import org.apache.pekko.http.scaladsl.model.HttpRequest import org.apache.pekko.http.scaladsl.model.HttpResponse import org.apache.pekko.http.scaladsl.model.StatusCode +import org.apache.pekko.http.scaladsl.model.StatusCodes import org.apache.pekko.stream.Materializer import org.apache.pekko.stream.scaladsl.Flow import org.apache.pekko.stream.scaladsl.Keep @@ -84,20 +86,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { test("rewrite: OK") { val client = mockClient( StatusCode.int2StatusCode(200), - Map( - "http://localhost/api/v1/graph?q=name,foo,:eq" -> 
Rewrite( - "OK", - "http://localhost/api/v1/graph?q=name,foo,:eq", - "http://localhost/api/v1/graph?q=name,foo,:eq", - "" - ), - "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and" -> Rewrite( - "OK", - "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq", - "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and", - "" - ) - ) + okRewrite() ) val expected = DataSources.of( new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq"), @@ -112,7 +101,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { ctx.dsLogger.asInstanceOf[MockLogger].assertSize(0) } - test("parseRewrites: Bad URI in datasources") { + test("rewrite: Bad URI in datasources") { val client = mockClient( StatusCode.int2StatusCode(200), Map( @@ -138,7 +127,7 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { ctx.dsLogger.asInstanceOf[MockLogger].assertSize(1) } - test("parseRewrites: Malformed response JSON") { + test("rewrite: Malformed response JSON") { val client = mockClient( StatusCode.int2StatusCode(200), Map( @@ -162,17 +151,18 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { } } - test("parseRewrites: 500") { + test("rewrite: 500") { val client = mockClient( StatusCode.int2StatusCode(500), Map.empty ) - intercept[RuntimeException] { - rewrite(dss, client) - } + val expected = DataSources.of() + val obtained = rewrite(dss, client) + assertEquals(obtained, expected) + ctx.dsLogger.asInstanceOf[MockLogger].assertSize(0) } - test("parseRewrites: Missing a rewrite") { + test("rewrite: Missing a rewrite") { val client = mockClient( StatusCode.int2StatusCode(200), Map( @@ -192,6 +182,97 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { ctx.dsLogger.asInstanceOf[MockLogger].assertSize(1) } + test("rewrite: source changes with good, bad, good") { + val client = new MockClient( + List( + StatusCode.int2StatusCode(200), + StatusCode.int2StatusCode(500), + StatusCode.int2StatusCode(200) + ), + List( + okRewrite(true), + Map.empty, + okRewrite() + ) + ).superPool[List[DataSource]]() + + val rewriter = new DataSourceRewriter(config, new NoopRegistry(), system) + val future = Source + .fromIterator(() => + List( + DataSources.of( + new DataSource( + "foo", + Duration.ofSeconds(60), + "http://localhost/api/v1/graph?q=name,foo,:eq" + ) + ), + dss, + dss + ).iterator + ) + .via(rewriter.rewrite(client, ctx, true)) + .grouped(3) + .toMat(Sink.head)(Keep.right) + .run() + val res = Await.result(future, 30.seconds) + assertEquals(res(0), expectedRewrite(true)) + assertEquals(res(1), expectedRewrite(true)) + assertEquals(res(2), expectedRewrite()) + } + + test("rewrite: retry initial flow with 500s") { + val client = new MockClient( + List( + StatusCode.int2StatusCode(500), + StatusCode.int2StatusCode(500), + StatusCode.int2StatusCode(200) + ), + List( + Map.empty, + Map.empty, + okRewrite() + ) + ).superPool[List[DataSource]]() + + val rewriter = new DataSourceRewriter(config, new NoopRegistry(), system) + val future = Source + .single(dss) + .via(rewriter.rewrite(client, ctx, true)) + .grouped(3) + .toMat(Sink.head)(Keep.right) + .run() + val res = Await.result(future, 30.seconds) + assertEquals(res(0), DataSources.of()) + assertEquals(res(1), expectedRewrite()) + } + + test("rewrite: retry initial flow with 500, exception, ok") { + val client = new MockClient( + List( + StatusCode.int2StatusCode(500), + StatusCodes.custom(0, "no conn", "no conn", false, true), + 
StatusCode.int2StatusCode(200) + ), + List( + Map.empty, + Map.empty, + okRewrite() + ) + ).superPool[List[DataSource]]() + + val rewriter = new DataSourceRewriter(config, new NoopRegistry(), system) + val future = Source + .single(dss) + .via(rewriter.rewrite(client, ctx, true)) + .grouped(3) + .toMat(Sink.head)(Keep.right) + .run() + val res = Await.result(future, 30.seconds) + assertEquals(res(0), DataSources.of()) + assertEquals(res(1), expectedRewrite()) + } + def mockClient( status: StatusCode, response: Map[String, Rewrite], @@ -203,24 +284,64 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { PekkoHttpClient.create(Failure(ex)).superPool[List[DataSource]]() } .getOrElse { - new MockClient(status, response, malformed).superPool[List[DataSource]]() + new MockClient(List(status), List(response), malformed).superPool[List[DataSource]]() } } def rewrite(dss: DataSources, client: SuperPoolClient): DataSources = { - val rewriter = new DataSourceRewriter(config, system) + val rewriter = new DataSourceRewriter(config, new NoopRegistry(), system) val future = Source .single(dss) - .via( - rewriter.rewrite(client, ctx) - ) + .via(rewriter.rewrite(client, ctx, true)) .toMat(Sink.head)(Keep.right) .run() Await.result(future, 30.seconds) } - class MockClient(status: StatusCode, response: Map[String, Rewrite], malformed: Boolean = false) - extends PekkoHttpClient { + def okRewrite(dropSecond: Boolean = false): Map[String, Rewrite] = { + val builder = Map.newBuilder[String, Rewrite] + builder += + "http://localhost/api/v1/graph?q=name,foo,:eq" -> Rewrite( + "OK", + "http://localhost/api/v1/graph?q=name,foo,:eq", + "http://localhost/api/v1/graph?q=name,foo,:eq", + "" + ) + + if (!dropSecond) { + builder += "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and" -> Rewrite( + "OK", + "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq", + "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and", + "" + ) + } + + builder.result() + } + + def expectedRewrite(dropSecond: Boolean = false): DataSources = { + val ds1 = + new DataSource("foo", Duration.ofSeconds(60), "http://localhost/api/v1/graph?q=name,foo,:eq") + val ds2 = new DataSource( + "bar", + Duration.ofSeconds(60), + "http://atlas-seg.prod.netflix.net/api/v1/graph?q=name,bar,:eq" + ) + if (dropSecond) { + DataSources.of(ds1) + } else { + DataSources.of(ds1, ds2) + } + } + + class MockClient( + status: List[StatusCode], + response: List[Map[String, Rewrite]], + malformed: Boolean = false + ) extends PekkoHttpClient { + + var called = 0 override def singleRequest(request: HttpRequest): Future[HttpResponse] = ??? 
@@ -231,23 +352,39 @@ class DataSourceRewriterSuite extends FunSuite with TestKitBase { .flatMapConcat { case (req, context) => req.entity.withoutSizeLimit().dataBytes.map { body => - val uris = Json.decode[List[String]](body.toArray) - val rewrites = uris.map { uri => - response.get(uri) match { - case Some(r) => r - case None => Rewrite("NOT_FOUND", uri, uri, "Empty") - } - } + val httpResp = status(called) match { + case status if status.intValue() == 0 => null + + case status if status.intValue() != 200 => + HttpResponse( + status, + entity = HttpEntity(ContentTypes.`application/json`, "{\"error\":\"whoops\"}") + ) - val json = - if (malformed) Json.encode(rewrites).substring(0, 25) else Json.encode(rewrites) + case status => + val uris = Json.decode[List[String]](body.toArray) + val rewrites = uris.map { uri => + response(called).get(uri) match { + case Some(r) => r + case None => Rewrite("NOT_FOUND", uri, uri, "Empty") + } + } - val httpResp = - HttpResponse( - status, - entity = HttpEntity(ContentTypes.`application/json`, json) - ) - Success(httpResp) -> context + val json = + if (malformed) Json.encode(rewrites).substring(0, 25) else Json.encode(rewrites) + + HttpResponse( + status, + entity = HttpEntity(ContentTypes.`application/json`, json) + ) + } + + called += 1 + if (httpResp == null) { + Failure(new RuntimeException("no conn")) -> context + } else { + Success(httpResp) -> context + } } } } diff --git a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala index be42ece78..35004c643 100644 --- a/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala +++ b/atlas-eval/src/test/scala/com/netflix/atlas/eval/stream/EvaluatorSuite.scala @@ -21,11 +21,15 @@ import java.time.Duration import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger import org.apache.pekko.actor.ActorSystem +import org.apache.pekko.stream.scaladsl.BroadcastHub import org.apache.pekko.stream.scaladsl.Flow import org.apache.pekko.stream.scaladsl.Keep import org.apache.pekko.stream.scaladsl.Sink import org.apache.pekko.stream.scaladsl.Source +import org.apache.pekko.stream.scaladsl.StreamConverters import com.netflix.atlas.chart.util.SrcPath +import com.netflix.atlas.core.model.FilterExpr.Filter +import com.netflix.atlas.core.util.Streams import com.netflix.atlas.eval.model.ArrayData import com.netflix.atlas.eval.model.LwcDatapoint import com.netflix.atlas.eval.model.LwcDiagnosticMessage @@ -40,19 +44,23 @@ import com.netflix.atlas.eval.stream.Evaluator.MessageEnvelope import com.netflix.atlas.json.Json import com.netflix.atlas.json.JsonSupport import com.netflix.atlas.pekko.DiagnosticMessage +import com.netflix.atlas.pekko.StreamOps import com.netflix.spectator.api.DefaultRegistry +import com.netflix.spectator.api.NoopRegistry import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigValueFactory import nl.jqno.equalsverifier.EqualsVerifier import nl.jqno.equalsverifier.Warning import munit.FunSuite import org.apache.pekko.NotUsed +import org.apache.pekko.util.ByteString import java.nio.file.Path import scala.concurrent.Await import scala.concurrent.Future import scala.concurrent.Promise import scala.concurrent.duration.DurationInt +import scala.jdk.CollectionConverters.CollectionHasAsScala import scala.jdk.CollectionConverters.SetHasAsScala import scala.util.Success import scala.util.Using @@ -370,6 +378,83 @@ class EvaluatorSuite extends 
FunSuite { testError(ds1, msg) } + // TODO - these datasources changes tests are ignored as they are currently relying + // on a sleep since making it deterministic will be a challenge. But they do show how + // the last datasources wins. Previous streams are stopped. + def dataSourcesChanges(state: Int): Unit = { + val evaluator = new Evaluator(config, registry, system) + + val baseUri = "resource:///gc-pause.dat" + val uri = s"$baseUri?q=name,jvm.gc.pause,:eq,:dist-max,(,nf.asg,nf.node,),:by" + + def getSources: List[DataSources] = { + state match { + // duplicates + case 0 => + List( + Evaluator.DataSources.of(ds("one", uri), ds("two", uri)), + Evaluator.DataSources.of(ds("one", uri), ds("two", uri)) + ) + // disjoint + case 1 => + List( + Evaluator.DataSources.of(ds("one", uri)), + Evaluator.DataSources.of(ds("two", uri)) + ) + // overlap + case 2 => + List( + Evaluator.DataSources.of(ds("one", uri)), + Evaluator.DataSources.of(ds("one", uri), ds("two", uri)) + ) + case x => + throw new IllegalArgumentException(s"Haven't setup a test for ${x}") + } + } + val sourceRef = EvaluationFlows.stoppableSource( + Source + .fromIterator(() => getSources.iterator) + .via(Flow.fromProcessor(() => evaluator.createStreamsProcessor())) + ) + + val oneCount = new AtomicInteger() + val twoCount = new AtomicInteger() + val sink = Sink.foreach[Evaluator.MessageEnvelope] { msg => + if (msg.message().isInstanceOf[TimeSeriesMessage]) { + msg.id match { + case "one" => oneCount.incrementAndGet() + case "two" => twoCount.incrementAndGet() + } + } + } + + val future = sourceRef.source + .toMat(sink)(Keep.right) + .run() + + // TODO - We should have a better trigger. Can't trigger off DPs as if duplication breaks, + // we don't want to stop at 264 and miss dupes. + Thread.sleep(5000) + sourceRef.stop() + + Await.result(future, 1.minute) + // NOTE: Last source wins. So in this case, one is never processed. + assertEquals(oneCount.get(), if (state == 1) 0 else 255) + assertEquals(twoCount.get(), 255) + } + + test("datasources changes, duplicates".ignore) { + dataSourcesChanges(0) + } + + test("datasources changes, disjoint".ignore) { + dataSourcesChanges(1) + } + + test("datasources changes, overlap".ignore) { + dataSourcesChanges(2) + } + def testRewrite(skipOne: Boolean = false, throwEx: Boolean = false): Unit = { val evaluator = new Evaluator(config, registry, system) evaluator.dataSourceRewriter = new UTRewriter(skipOne, throwEx) @@ -859,10 +944,23 @@ class EvaluatorSuite extends FunSuite { assertEquals(result.size, 10) } + def getMessages(file: String, filter: Option[String] = None): Seq[ByteString] = { + Files + .readAllLines(Paths.get(file)) + .asScala + .filter(!_.isBlank) + .filter(line => filter.forall(line.contains)) + .map(ByteString(_)) + .toSeq + } + class UTRewriter(skipOne: Boolean = false, throwEx: Boolean = false) - extends DataSourceRewriter(config, system) { + extends DataSourceRewriter(config, registry, system) { - override def rewrite(context: StreamContext): Flow[DataSources, DataSources, NotUsed] = { + override def rewrite( + context: StreamContext, + keepRetrying: Boolean = true + ): Flow[DataSources, DataSources, NotUsed] = { if (throwEx) { Flow[DataSources] .map(_ => throw new RuntimeException("test"))
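To put the pieces together, here is a minimal sketch of enabling the rewriter end to end. The endpoint URL is illustrative and assumes a service implementing the request/response contract described above; when `atlas.eval.stream.rewrite-url` is not set, the rewrite stage is an identity flow.

    import com.netflix.atlas.eval.stream.Evaluator
    import com.netflix.spectator.api.DefaultRegistry
    import com.typesafe.config.ConfigFactory
    import com.typesafe.config.ConfigValueFactory
    import org.apache.pekko.actor.ActorSystem
    import org.apache.pekko.stream.scaladsl.Flow
    import org.apache.pekko.stream.scaladsl.Source

    implicit val system: ActorSystem = ActorSystem("eval")

    // Point the evaluator at the rewrite service (illustrative URL).
    val config = ConfigFactory
      .load()
      .withValue(
        "atlas.eval.stream.rewrite-url",
        ConfigValueFactory.fromAnyRef("http://localhost/api/v1/rewrite")
      )
    val evaluator = new Evaluator(config, new DefaultRegistry(), system)

    // A namespaced query that the service may rewrite to a different backend.
    val ds = new Evaluator.DataSource(
      "id",
      java.time.Duration.ofSeconds(60),
      "http://localhost/api/v1/graph?q=nf.ns,seg.prod,:eq,name,bar,:eq,:and"
    )

    Source
      .single(Evaluator.DataSources.of(ds))
      .via(Flow.fromProcessor(() => evaluator.createStreamsProcessor()))
      .runForeach(env => println(env.message()))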