From 0de667e4951633b2442c253ae7abbb9e5b7e6e5e Mon Sep 17 00:00:00 2001 From: Bruno Ballekens <30588687+brunoballekens@users.noreply.github.com> Date: Fri, 1 Oct 2021 16:12:46 +0200 Subject: [PATCH] Elasticsearch: Handle failed Future when invoking endpoint in Source (#2739) --- .../impl/ElasticsearchSourceStage.scala | 26 ++++++--- .../impl/ElasticsearchSourcStageTest.scala | 54 +++++++++++++++++++ 2 files changed, 73 insertions(+), 7 deletions(-) create mode 100644 elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourcStageTest.scala diff --git a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourceStage.scala b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourceStage.scala index 058dd94785..bf93b5d1f8 100644 --- a/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourceStage.scala +++ b/elasticsearch/src/main/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourceStage.scala @@ -158,7 +158,7 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( request, settings.connection ) - .map { + .flatMap { case HttpResponse(StatusCodes.OK, _, responseEntity, _) => Unmarshal(responseEntity) .to[String] @@ -169,6 +169,9 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( .invoke(new RuntimeException(s"Request failed for POST $uri, got $status with body: $body")) } } + .recover { + case cause: Throwable => failureHandler.invoke(cause) + } } else { log.debug("Fetching next scroll") @@ -187,16 +190,22 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( request, settings.connection ) - .map { + .flatMap { case HttpResponse(StatusCodes.OK, _, responseEntity, _) => Unmarshal(responseEntity) .to[String] .map(json => responseHandler.invoke(json)) case HttpResponse(status, _, responseEntity, _) => - Unmarshal(responseEntity).to[String].map { body => - failureHandler - .invoke(new RuntimeException(s"Request failed for POST $uri, got $status with body: $body")) - } + Unmarshal(responseEntity) + .to[String] + .map { body => + failureHandler.invoke( + new RuntimeException(s"Request failed for POST $uri, got $status with body: $body") + ) + } + } + .recover { + case cause: Throwable => failureHandler.invoke(cause) } } } catch { @@ -304,7 +313,7 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( ElasticsearchApi .executeRequest(request, settings.connection) - .map { + .flatMap { case HttpResponse(StatusCodes.OK, _, responseEntity, _) => Unmarshal(responseEntity) .to[String] @@ -317,6 +326,9 @@ private[elasticsearch] final class ElasticsearchSourceLogic[T]( .invoke(Failure(new RuntimeException(s"Request failed for POST $uri, got $status with body: $body"))) } } + .recover { + case cause: Throwable => failureHandler.invoke(cause) + } } } diff --git a/elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourcStageTest.scala b/elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourcStageTest.scala new file mode 100644 index 0000000000..75621b0fef --- /dev/null +++ b/elasticsearch/src/test/scala/akka/stream/alpakka/elasticsearch/impl/ElasticsearchSourcStageTest.scala @@ -0,0 +1,54 @@ +/* + * Copyright (C) 2016-2020 Lightbend Inc. + */ + +package akka.stream.alpakka.elasticsearch.impl + +import akka.actor.ActorSystem +import akka.http.scaladsl.{Http, HttpExt} +import akka.stream.Materializer +import akka.stream.alpakka.elasticsearch._ +import akka.stream.alpakka.testkit.scaladsl.LogCapturing +import akka.stream.scaladsl.{Keep, Source} +import akka.stream.testkit.scaladsl.TestSink +import akka.testkit.TestKit +import org.scalatest.BeforeAndAfterAll +import org.scalatest.wordspec.AnyWordSpecLike + +import scala.concurrent.ExecutionContext.Implicits.global + +class ElasticsearchSourcStageTest + extends TestKit(ActorSystem("elasticsearchSourceStagetest")) + with AnyWordSpecLike + with BeforeAndAfterAll + with LogCapturing { + + implicit val mat: Materializer = Materializer(system) + implicit val http: HttpExt = Http() + + "ElasticsearchSourceStage" when { + "client cannot connect to ES" should { + "stop the stream" in { + val downstream = Source + .fromGraph( + new impl.ElasticsearchSourceStage[String]( + ElasticsearchParams.V7("es-simple-flow-index"), + Map("query" -> """{ "match_all":{}}"""), + ElasticsearchSourceSettings(ElasticsearchConnectionSettings("http://wololo:9202")), + (json: String) => ScrollResponse(Some(json), None) + ) + ) + .toMat(TestSink.probe)(Keep.right) + .run() + + downstream.request(1) + downstream.expectError() + } + } + } + + override def afterAll(): Unit = { + super.afterAll() + TestKit.shutdownActorSystem(system) + } +}