Skip to content

Commit

Permalink
Get facia tool to publish a message onto the SNS topic (#1528)
Browse files Browse the repository at this point in the history
* Get facia tool to publish a message onto the SNS topic
* Create new SNSTopic file
  • Loading branch information
silvacb authored Oct 23, 2023
1 parent 256f5c8 commit d4675c5
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 7 deletions.
5 changes: 3 additions & 2 deletions app/Components.scala
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ class AppComponents(context: Context, val config: ApplicationConfiguration)
val containerService = new ContainerService(containers)
val collectionService = new CollectionService(frontsApi, containerService)
val faciaPressQueue = new FaciaPressQueue(config)
val faciaPress = new FaciaPress(faciaPressQueue, configAgent)
val faciaPressTopic = new FaciaPressTopic(config)
val faciaPress = new FaciaPress(faciaPressQueue, faciaPressTopic, configAgent)
val updateActions = new UpdateActions(faciaApiIO, frontsApi, config, configAgent, structuredLogger)
val updateManager = new UpdateManager(updateActions, configAgent, s3FrontsApi)
val cloudwatch = new CloudWatch(config, awsEndpoints)
Expand All @@ -92,7 +93,7 @@ class AppComponents(context: Context, val config: ApplicationConfiguration)
val defaults = new DefaultsController(acl, isDev, this)
val faciaCapiProxy = new FaciaContentApiProxy(capi, this)
val faciaTool = new FaciaToolController(acl, frontsApi, collectionService, faciaApiIO, updateActions, breakingNewsUpdate,
structuredLogger, faciaPress, faciaPressQueue, configAgent, s3FrontsApi, this)
structuredLogger, faciaPress, faciaPressQueue, faciaPressTopic, configAgent, s3FrontsApi, this)
val front = new FrontController(acl, structuredLogger, updateManager, press, this)
val pandaAuth = new PandaAuthController(this)
val status = new StatusController(this)
Expand Down
1 change: 1 addition & 0 deletions app/conf/Configuration.scala
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ class ApplicationConfiguration(val playConfiguration: PlayConfiguration, val isP
object faciatool {
lazy val breakingNewsFront = "breaking-news"
lazy val frontPressToolQueue = getString("frontpress.sqs.tool_queue_url")
lazy val frontPressToolTopic = getString("faciatool.sns.tool_topic_arn")
lazy val publishEventsQueue = getMandatoryString("publish_events.queue_url")
lazy val showTestContainers = getBoolean("faciatool.show_test_containers").getOrElse(false)
lazy val stsRoleToAssume = getString("faciatool.sts.role.to.assume").getOrElse(stsRoleToAssumeFromProperties)
Expand Down
3 changes: 3 additions & 0 deletions app/controllers/FaciaToolController.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ class FaciaToolController(
val structuredLogger: StructuredLogger,
val faciaPress: FaciaPress,
val faciaPressQueue: FaciaPressQueue,
val FaciaPressTopic: FaciaPressTopic,
val configAgent: ConfigAgent,
val s3FrontsApi: S3FrontsApi,
val deps: BaseFaciaControllerComponents
Expand Down Expand Up @@ -246,11 +247,13 @@ class FaciaToolController(

def pressLivePath(path: String) = AccessAPIAuthAction { request =>
faciaPressQueue.enqueue(PressJob(FrontPath(path), Live, forceConfigUpdate = Option(true)))
FaciaPressTopic.publish(PressJob(FrontPath(path), Live, forceConfigUpdate = Option(true)))
NoCache(Ok)
}

def pressDraftPath(path: String) = AccessAPIAuthAction { request =>
faciaPressQueue.enqueue(PressJob(FrontPath(path), Draft, forceConfigUpdate = Option(true)))
FaciaPressTopic.publish(PressJob(FrontPath(path), Draft, forceConfigUpdate = Option(true)))
NoCache(Ok)
}

Expand Down
30 changes: 28 additions & 2 deletions app/services/FaciaPress.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
package services

import com.amazonaws.regions.Regions
import com.amazonaws.services.sns.AmazonSNSAsyncClientBuilder
import com.amazonaws.services.sns.model.PublishResult
import com.amazonaws.services.sqs.AmazonSQSAsyncClientBuilder
import com.amazonaws.services.sqs.model.SendMessageResult
import com.gu.facia.api.models.faciapress.{Draft, FrontPath, Live, PressJob}
import conf.ApplicationConfiguration
import metrics.FaciaToolMetrics.{EnqueuePressFailure, EnqueuePressSuccess}
import logging.Logging
import metrics.FaciaToolMetrics.{EnqueuePressFailure, EnqueuePressSuccess}

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
Expand Down Expand Up @@ -49,7 +51,29 @@ class FaciaPressQueue(val config: ApplicationConfiguration) {
}
}

class FaciaPress(val faciaPressQueue: FaciaPressQueue, val configAgent: ConfigAgent) extends Logging {
class FaciaPressTopic(val config: ApplicationConfiguration) {
val maybeTopic = config.faciatool.frontPressToolTopic map { topicArn =>
val credentials = config.aws.cmsFrontsAccountCredentials
JsonMessageTopic[PressJob](AmazonSNSAsyncClientBuilder.standard()
.withCredentials(credentials)
.withRegion(Regions.EU_WEST_1).build(),
topicArn
)
}

def publish(job: PressJob): Future[PublishResult] = {
maybeTopic match {
case Some(topicArn) =>
topicArn.send(job)

case None =>
Future.failed(new RuntimeException("Could not publish job."))
}
}

}

class FaciaPress(val faciaPressQueue: FaciaPressQueue, val faciaPressTopic: FaciaPressTopic, val configAgent: ConfigAgent) extends Logging {
def press(pressCommand: PressCommand): Future[List[SendMessageResult]] = {
configAgent.refreshAndReturn() flatMap { _ =>
val paths: Set[String] = for {
Expand All @@ -67,6 +91,7 @@ class FaciaPress(val faciaPressQueue: FaciaPressQueue, val configAgent: ConfigAg
case Success(_) =>
EnqueuePressSuccess.increment()
}
val fut2 = Future.traverse(paths)(path => faciaPressTopic.publish(PressJob(FrontPath(path), Live, forceConfigUpdate = pressCommand.forceConfigUpdate)))
fut
} else {
Future.successful(Set.empty)
Expand All @@ -82,6 +107,7 @@ class FaciaPress(val faciaPressQueue: FaciaPressQueue, val configAgent: ConfigAg
case Success(_) =>
EnqueuePressSuccess.increment()
}
val fut2 = Future.traverse(paths)(path => faciaPressTopic.publish(PressJob(FrontPath(path), Draft, forceConfigUpdate = pressCommand.forceConfigUpdate)))
fut
} else Future.successful(Set.empty)

Expand Down
47 changes: 47 additions & 0 deletions app/services/SNSTopic.scala.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package services

import java.util.concurrent.{Future => JavaFuture}

import com.amazonaws.handlers.AsyncHandler
import play.api.libs.json.{Json, Writes}

import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}
import com.amazonaws.services.sns.AmazonSNSAsync
import com.amazonaws.services.sns.model.PublishResult
import com.amazonaws.services.sns.model.PublishRequest
import logging.Logging

object SNSTopics {
implicit class RichAmazonSNSAsyncClient(client: AmazonSNSAsync) {
private def createHandler[A <: com.amazonaws.AmazonWebServiceRequest, B]() = {
val promise = Promise[B]()

val handler = new AsyncHandler[A, B] {
override def onSuccess(request: A, result: B): Unit = promise.complete(Success(result))

override def onError(exception: Exception): Unit = promise.complete(Failure(exception))
}

(promise.future, handler)
}

private def asFuture[A <: com.amazonaws.AmazonWebServiceRequest, B](f: AsyncHandler[A, B] => JavaFuture[B]) = {
val (future, handler) = createHandler[A, B]()
f(handler)
future
}

def publishMessageFuture(topicArn: String, message: String): Future[PublishResult] =
asFuture[PublishRequest, PublishResult](client.publishAsync(topicArn, message, _))
}
}

case class JsonMessageTopic[A](client: AmazonSNSAsync, topicArn: String)
(implicit executionContext: ExecutionContext) extends Logging {
import SNSTopics._

def send(a: A)(implicit writes: Writes[A]): Future[PublishResult] = {
client.publishMessageFuture(topicArn: String, Json.stringify(Json.toJson(a)))
}
}
4 changes: 1 addition & 3 deletions app/services/SQSQueues.scala
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
package services

import java.util.concurrent.{Future => JavaFuture}

import com.amazonaws.handlers.AsyncHandler
import com.amazonaws.services.sqs.AmazonSQSAsync
import com.amazonaws.services.sqs.model._
import play.api.libs.json.{Json, Writes}

import java.util.concurrent.{Future => JavaFuture}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.util.{Failure, Success}

Expand Down Expand Up @@ -34,7 +33,6 @@ object SQSQueues {
asFuture[SendMessageRequest, SendMessageResult](client.sendMessageAsync(request, _))
}
}

/** Utility class for SQS queues that use JSON to serialize their messages */
case class JsonMessageQueue[A](client: AmazonSQSAsync, queueUrl: String)
(implicit executionContext: ExecutionContext) {
Expand Down
1 change: 1 addition & 0 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ libraryDependencies ++= Seq(
"com.amazonaws" % "aws-java-sdk-core" % awsVersion,
"com.amazonaws" % "aws-java-sdk-cloudwatch" % awsVersion,
"com.amazonaws" % "aws-java-sdk-s3" % awsVersion,
"com.amazonaws" % "aws-java-sdk-sns" % awsVersion,
"com.amazonaws" % "aws-java-sdk-sqs" % awsVersion,
"com.amazonaws" % "aws-java-sdk-ssm" % awsVersion,
"com.amazonaws" % "aws-java-sdk-sts" % awsVersion,
Expand Down
2 changes: 2 additions & 0 deletions conf/application.conf
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,8 @@ PROD {
publish_events.queue_url="https://sqs.eu-west-1.amazonaws.com/163592447864/publish-events-PROD"
}

faciatool.sns.tool_topic_arn="arn:aws:sns:eu-west-1:163592447864:facia-CODE-FrontsUpdateSNSTopic-RepwK3g95V3w"

faciatool.show_test_containers=true

include "local.conf"
Expand Down

0 comments on commit d4675c5

Please sign in to comment.