diff --git a/app/Components.scala b/app/Components.scala index 8ddf911ee0f..2e081ee4b50 100644 --- a/app/Components.scala +++ b/app/Components.scala @@ -78,7 +78,8 @@ class AppComponents(context: Context, val config: ApplicationConfiguration) val containerService = new ContainerService(containers) val collectionService = new CollectionService(frontsApi, containerService) val faciaPressQueue = new FaciaPressQueue(config) - val faciaPress = new FaciaPress(faciaPressQueue, configAgent) + val faciaPressTopic = new FaciaPressTopic(config) + val faciaPress = new FaciaPress(faciaPressQueue, faciaPressTopic, configAgent) val updateActions = new UpdateActions(faciaApiIO, frontsApi, config, configAgent, structuredLogger) val updateManager = new UpdateManager(updateActions, configAgent, s3FrontsApi) val cloudwatch = new CloudWatch(config, awsEndpoints) @@ -92,7 +93,7 @@ class AppComponents(context: Context, val config: ApplicationConfiguration) val defaults = new DefaultsController(acl, isDev, this) val faciaCapiProxy = new FaciaContentApiProxy(capi, this) val faciaTool = new FaciaToolController(acl, frontsApi, collectionService, faciaApiIO, updateActions, breakingNewsUpdate, - structuredLogger, faciaPress, faciaPressQueue, configAgent, s3FrontsApi, this) + structuredLogger, faciaPress, faciaPressQueue, faciaPressTopic, configAgent, s3FrontsApi, this) val front = new FrontController(acl, structuredLogger, updateManager, press, this) val pandaAuth = new PandaAuthController(this) val status = new StatusController(this) diff --git a/app/conf/Configuration.scala b/app/conf/Configuration.scala index 001ad477e49..c762b343457 100644 --- a/app/conf/Configuration.scala +++ b/app/conf/Configuration.scala @@ -228,6 +228,7 @@ class ApplicationConfiguration(val playConfiguration: PlayConfiguration, val isP object faciatool { lazy val breakingNewsFront = "breaking-news" lazy val frontPressToolQueue = getString("frontpress.sqs.tool_queue_url") + lazy val frontPressToolTopic = getString("faciatool.sns.tool_topic_arn") lazy val publishEventsQueue = getMandatoryString("publish_events.queue_url") lazy val showTestContainers = getBoolean("faciatool.show_test_containers").getOrElse(false) lazy val stsRoleToAssume = getString("faciatool.sts.role.to.assume").getOrElse(stsRoleToAssumeFromProperties) diff --git a/app/controllers/FaciaToolController.scala b/app/controllers/FaciaToolController.scala index 3538c0e7ce9..80a62404b99 100644 --- a/app/controllers/FaciaToolController.scala +++ b/app/controllers/FaciaToolController.scala @@ -29,6 +29,7 @@ class FaciaToolController( val structuredLogger: StructuredLogger, val faciaPress: FaciaPress, val faciaPressQueue: FaciaPressQueue, + val FaciaPressTopic: FaciaPressTopic, val configAgent: ConfigAgent, val s3FrontsApi: S3FrontsApi, val deps: BaseFaciaControllerComponents @@ -246,11 +247,13 @@ class FaciaToolController( def pressLivePath(path: String) = AccessAPIAuthAction { request => faciaPressQueue.enqueue(PressJob(FrontPath(path), Live, forceConfigUpdate = Option(true))) + FaciaPressTopic.publish(PressJob(FrontPath(path), Live, forceConfigUpdate = Option(true))) NoCache(Ok) } def pressDraftPath(path: String) = AccessAPIAuthAction { request => faciaPressQueue.enqueue(PressJob(FrontPath(path), Draft, forceConfigUpdate = Option(true))) + FaciaPressTopic.publish(PressJob(FrontPath(path), Draft, forceConfigUpdate = Option(true))) NoCache(Ok) } diff --git a/app/services/FaciaPress.scala b/app/services/FaciaPress.scala index 67008b3349f..6add09e6cf2 100644 --- a/app/services/FaciaPress.scala +++ b/app/services/FaciaPress.scala @@ -1,12 +1,14 @@ package services import com.amazonaws.regions.Regions +import com.amazonaws.services.sns.AmazonSNSAsyncClientBuilder +import com.amazonaws.services.sns.model.PublishResult import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder import com.amazonaws.services.sqs.model.SendMessageResult import com.gu.facia.api.models.faciapress.{Draft, FrontPath, Live, PressJob} import conf.ApplicationConfiguration -import metrics.FaciaToolMetrics.{EnqueuePressFailure, EnqueuePressSuccess} import logging.Logging +import metrics.FaciaToolMetrics.{EnqueuePressFailure, EnqueuePressSuccess} import scala.concurrent.ExecutionContext.Implicits.global import scala.concurrent.Future @@ -49,7 +51,29 @@ class FaciaPressQueue(val config: ApplicationConfiguration) { } } -class FaciaPress(val faciaPressQueue: FaciaPressQueue, val configAgent: ConfigAgent) extends Logging { +class FaciaPressTopic(val config: ApplicationConfiguration) { + val maybeTopic = config.faciatool.frontPressToolTopic map { topicArn => + val credentials = config.aws.cmsFrontsAccountCredentials + JsonMessageTopic[PressJob](AmazonSNSAsyncClientBuilder.standard() + .withCredentials(credentials) + .withRegion(Regions.EU_WEST_1).build(), + topicArn + ) + } + + def publish(job: PressJob): Future[PublishResult] = { + maybeTopic match { + case Some(topicArn) => + topicArn.send(job) + + case None => + Future.failed(new RuntimeException("Could not publish job.")) + } + } + +} + +class FaciaPress(val faciaPressQueue: FaciaPressQueue, val faciaPressTopic: FaciaPressTopic, val configAgent: ConfigAgent) extends Logging { def press(pressCommand: PressCommand): Future[List[SendMessageResult]] = { configAgent.refreshAndReturn() flatMap { _ => val paths: Set[String] = for { @@ -67,6 +91,7 @@ class FaciaPress(val faciaPressQueue: FaciaPressQueue, val configAgent: ConfigAg case Success(_) => EnqueuePressSuccess.increment() } + val fut2 = Future.traverse(paths)(path => faciaPressTopic.publish(PressJob(FrontPath(path), Live, forceConfigUpdate = pressCommand.forceConfigUpdate))) fut } else { Future.successful(Set.empty) @@ -82,6 +107,7 @@ class FaciaPress(val faciaPressQueue: FaciaPressQueue, val configAgent: ConfigAg case Success(_) => EnqueuePressSuccess.increment() } + val fut2 = Future.traverse(paths)(path => faciaPressTopic.publish(PressJob(FrontPath(path), Draft, forceConfigUpdate = pressCommand.forceConfigUpdate))) fut } else Future.successful(Set.empty) diff --git a/app/services/SNSTopic.scala.scala b/app/services/SNSTopic.scala.scala new file mode 100644 index 00000000000..00201e79ecf --- /dev/null +++ b/app/services/SNSTopic.scala.scala @@ -0,0 +1,47 @@ +package services + +import java.util.concurrent.{Future => JavaFuture} + +import com.amazonaws.handlers.AsyncHandler +import play.api.libs.json.{Json, Writes} + +import scala.concurrent.{ExecutionContext, Future, Promise} +import scala.util.{Failure, Success} +import com.amazonaws.services.sns.AmazonSNSAsync +import com.amazonaws.services.sns.model.PublishResult +import com.amazonaws.services.sns.model.PublishRequest +import logging.Logging + +object SNSTopics { + implicit class RichAmazonSNSAsyncClient(client: AmazonSNSAsync) { + private def createHandler[A <: com.amazonaws.AmazonWebServiceRequest, B]() = { + val promise = Promise[B]() + + val handler = new AsyncHandler[A, B] { + override def onSuccess(request: A, result: B): Unit = promise.complete(Success(result)) + + override def onError(exception: Exception): Unit = promise.complete(Failure(exception)) + } + + (promise.future, handler) + } + + private def asFuture[A <: com.amazonaws.AmazonWebServiceRequest, B](f: AsyncHandler[A, B] => JavaFuture[B]) = { + val (future, handler) = createHandler[A, B]() + f(handler) + future + } + + def publishMessageFuture(topicArn: String, message: String): Future[PublishResult] = + asFuture[PublishRequest, PublishResult](client.publishAsync(topicArn, message, _)) + } +} + +case class JsonMessageTopic[A](client: AmazonSNSAsync, topicArn: String) + (implicit executionContext: ExecutionContext) extends Logging { + import SNSTopics._ + + def send(a: A)(implicit writes: Writes[A]): Future[PublishResult] = { + client.publishMessageFuture(topicArn: String, Json.stringify(Json.toJson(a))) + } + } diff --git a/app/services/SQSQueues.scala b/app/services/SQSQueues.scala index 377ee23db57..82190270e98 100644 --- a/app/services/SQSQueues.scala +++ b/app/services/SQSQueues.scala @@ -1,12 +1,11 @@ package services -import java.util.concurrent.{Future => JavaFuture} - import com.amazonaws.handlers.AsyncHandler import com.amazonaws.services.sqs.AmazonSQSAsync import com.amazonaws.services.sqs.model._ import play.api.libs.json.{Json, Writes} +import java.util.concurrent.{Future => JavaFuture} import scala.concurrent.{ExecutionContext, Future, Promise} import scala.util.{Failure, Success} @@ -34,7 +33,6 @@ object SQSQueues { asFuture[SendMessageRequest, SendMessageResult](client.sendMessageAsync(request, _)) } } - /** Utility class for SQS queues that use JSON to serialize their messages */ case class JsonMessageQueue[A](client: AmazonSQSAsync, queueUrl: String) (implicit executionContext: ExecutionContext) { diff --git a/build.sbt b/build.sbt index f5f9b5da1f8..4c686bbfeee 100644 --- a/build.sbt +++ b/build.sbt @@ -85,6 +85,7 @@ libraryDependencies ++= Seq( "com.amazonaws" % "aws-java-sdk-core" % awsVersion, "com.amazonaws" % "aws-java-sdk-cloudwatch" % awsVersion, "com.amazonaws" % "aws-java-sdk-s3" % awsVersion, + "com.amazonaws" % "aws-java-sdk-sns" % awsVersion, "com.amazonaws" % "aws-java-sdk-sqs" % awsVersion, "com.amazonaws" % "aws-java-sdk-ssm" % awsVersion, "com.amazonaws" % "aws-java-sdk-sts" % awsVersion, diff --git a/conf/application.conf b/conf/application.conf index 03a15c25e38..45db94d8af1 100644 --- a/conf/application.conf +++ b/conf/application.conf @@ -122,6 +122,8 @@ PROD { publish_events.queue_url="https://sqs.eu-west-1.amazonaws.com/163592447864/publish-events-PROD" } +faciatool.sns.tool_topic_arn="arn:aws:sns:eu-west-1:163592447864:facia-CODE-FrontsUpdateSNSTopic-RepwK3g95V3w" + faciatool.show_test_containers=true include "local.conf"