Skip to content

Commit

Permalink
#1708 Add serializer for Cortex messages
Browse files Browse the repository at this point in the history
  • Loading branch information
To-om committed Dec 14, 2020
1 parent 5cb2192 commit 1668df5
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import org.thp.thehive.connector.cortex.controllers.v0.Conversion._
import org.thp.thehive.connector.cortex.models._
import org.thp.thehive.connector.cortex.services.ActionOps._
import org.thp.thehive.connector.cortex.services.Conversion._
import org.thp.thehive.connector.cortex.services.CortexActor.CheckJob
import org.thp.thehive.controllers.v0.Conversion._
import org.thp.thehive.models._
import org.thp.thehive.services.AlertOps._
Expand Down
Original file line number Diff line number Diff line change
@@ -1,39 +1,39 @@
package org.thp.thehive.connector.cortex.services

import java.util.Date

import akka.actor._
import akka.pattern.pipe

import javax.inject.Inject
import org.thp.client.ApplicationError
import org.thp.cortex.dto.v0.{JobStatus, JobType, OutputJob => CortexJob}
import org.thp.cortex.dto.v0.{JobStatus, JobType, OutputJob}
import org.thp.scalligraph.EntityId
import org.thp.scalligraph.auth.AuthContext
import play.api.Logger
import play.api.libs.json.{Json, OFormat}

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._

object CortexActor {
final case class CheckJob(
jobId: Option[EntityId],
cortexJobId: String,
actionId: Option[EntityId],
cortexId: String,
authContext: AuthContext
)

final private case object CheckJobs
final private case object CheckJobsKey
final private case object FirstCheckJobs
}

sealed trait CortexActorMessage
case class RemoteJob(job: OutputJob) extends CortexActorMessage
case class CheckJob(
jobId: Option[EntityId],
cortexJobId: String,
actionId: Option[EntityId],
cortexId: String,
authContext: AuthContext
) extends CortexActorMessage

private case object CheckJobs extends CortexActorMessage
private case object CheckJobsKey
private case object FirstCheckJobs extends CortexActorMessage
// FIXME Add serializer
/**
* This actor is primarily used to check Job statuses on regular
* ticks using the provided client for each job
*/
class CortexActor @Inject() (connector: Connector, jobSrv: JobSrv, actionSrv: ActionSrv) extends Actor with Timers {
import CortexActor._
implicit val ec: ExecutionContext = context.dispatcher
lazy val logger: Logger = Logger(getClass)

Expand Down Expand Up @@ -66,35 +66,36 @@ class CortexActor @Inject() (connector: Connector, jobSrv: JobSrv, actionSrv: Ac
.getReport(cortexJobId, 1.second)
.recover { // this is a workaround for a timeout bug in Cortex
case ApplicationError(500, body) if (body \ "type").asOpt[String].contains("akka.pattern.AskTimeoutException") =>
CortexJob(cortexJobId, "", "", "", new Date, None, None, JobStatus.InProgress, None, None, "", "", None, JobType.analyzer)
OutputJob(cortexJobId, "", "", "", new Date, None, None, JobStatus.InProgress, None, None, "", "", None, JobType.analyzer)
}
.map(RemoteJob)
.pipeTo(self)
()
}
}

case cortexJob: CortexJob if cortexJob.status == JobStatus.Success || cortexJob.status == JobStatus.Failure =>
checkedJobs.find(_.cortexJobId == cortexJob.id) match {
case Some(CheckJob(Some(jobId), cortexJobId, _, cortexId, authContext)) if cortexJob.`type` == JobType.analyzer =>
logger.info(s"Job $cortexJobId in cortex $cortexId has finished with status ${cortexJob.status}, updating job $jobId")
jobSrv.finished(cortexId, jobId, cortexJob)(authContext)
context.become(receive(checkedJobs.filterNot(_.cortexJobId == cortexJob.id), failuresCount))
case RemoteJob(job) if job.status == JobStatus.Success || job.status == JobStatus.Failure =>
checkedJobs.find(_.cortexJobId == job.id) match {
case Some(CheckJob(Some(jobId), cortexJobId, _, cortexId, authContext)) if job.`type` == JobType.analyzer =>
logger.info(s"Job $cortexJobId in cortex $cortexId has finished with status ${job.status}, updating job $jobId")
jobSrv.finished(cortexId, jobId, job)(authContext)
context.become(receive(checkedJobs.filterNot(_.cortexJobId == job.id), failuresCount))

case Some(CheckJob(_, cortexJobId, Some(actionId), cortexId, authContext)) if cortexJob.`type` == JobType.responder =>
logger.info(s"Job $cortexJobId in cortex $cortexId has finished with status ${cortexJob.status}, updating action $actionId")
actionSrv.finished(actionId, cortexJob)(authContext)
context.become(receive(checkedJobs.filterNot(_.cortexJobId == cortexJob.id), failuresCount))
case Some(CheckJob(_, cortexJobId, Some(actionId), cortexId, authContext)) if job.`type` == JobType.responder =>
logger.info(s"Job $cortexJobId in cortex $cortexId has finished with status ${job.status}, updating action $actionId")
actionSrv.finished(actionId, job)(authContext)
context.become(receive(checkedJobs.filterNot(_.cortexJobId == job.id), failuresCount))

case Some(_) =>
logger.error(s"CortexActor received job output $cortexJob but with unknown type ${cortexJob.`type`}")
logger.error(s"CortexActor received job output $job but with unknown type ${job.`type`}")

case None =>
logger.error(s"CortexActor received job output $cortexJob but did not have it in state $checkedJobs")
logger.error(s"CortexActor received job output $job but did not have it in state $checkedJobs")
}
case cortexJob: CortexJob if cortexJob.status == JobStatus.InProgress || cortexJob.status == JobStatus.Waiting =>
logger.info(s"CortexActor received ${cortexJob.status} from client, retrying in ${connector.refreshDelay}")
case RemoteJob(job) if job.status == JobStatus.InProgress || job.status == JobStatus.Waiting =>
logger.info(s"CortexActor received ${job.status} from client, retrying in ${connector.refreshDelay}")

case _: CortexJob =>
case _: RemoteJob =>
logger.warn(s"CortexActor received JobStatus.Unknown from client, retrying in ${connector.refreshDelay}")

case Status.Failure(e) if failuresCount < connector.maxRetryOnError =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import org.thp.scalligraph.{EntityId, EntityIdOrName, NotFoundError}
import org.thp.thehive.connector.cortex.controllers.v0.Conversion._
import org.thp.thehive.connector.cortex.models._
import org.thp.thehive.connector.cortex.services.Conversion._
import org.thp.thehive.connector.cortex.services.CortexActor.CheckJob
import org.thp.thehive.connector.cortex.services.JobOps._
import org.thp.thehive.controllers.v0.Conversion._
import org.thp.thehive.models._
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package org.thp.thehive.connector.cortex.services

import akka.serialization.Serializer
import org.thp.cortex.dto.v0.OutputJob
import org.thp.scalligraph.EntityIdOrName
import org.thp.scalligraph.auth.{AuthContext, AuthContextImpl, Permission}
import play.api.libs.json.{JsPath, Json, OFormat, Reads, Writes}
import play.api.libs.functional.syntax._

import java.io.NotSerializableException

object CortexSerializer {
implicit val authContextReads: Reads[AuthContext] =
((JsPath \ "userId").read[String] and
(JsPath \ "userName").read[String] and
(JsPath \ "organisation").read[String].map(EntityIdOrName.apply) and
(JsPath \ "requestId").read[String] and
(JsPath \ "permissions").read[Set[String]].map(Permission.apply))(AuthContextImpl.apply _)

implicit val authContextWrites: Writes[AuthContext] = Writes[AuthContext] { authContext =>
Json.obj(
"userId" -> authContext.userId,
"userName" -> authContext.userName,
"organisation" -> authContext.organisation.toString,
"requestId" -> authContext.requestId,
"permissions" -> authContext.permissions
)
}
implicit val format: OFormat[CheckJob] = Json.format[CheckJob]
}

class CortexSerializer extends Serializer {
import CortexSerializer._
override def identifier: Int = -414525848

override def includeManifest: Boolean = false

override def toBinary(o: AnyRef): Array[Byte] =
o match {
case CheckJobs => Array(0)
case FirstCheckJobs => Array(1)
case RemoteJob(job) => 2.toByte +: Json.toJson(job).toString.getBytes
case cj: CheckJob => 3.toByte +: Json.toJson(cj).toString().getBytes
case _ => throw new NotSerializableException
}

override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
bytes(0) match {
case 0 => CheckJobs
case 1 => FirstCheckJobs
case 2 => RemoteJob(Json.parse(bytes.tail).as[OutputJob])
case 3 => Json.parse(bytes.tail).as[CheckJob]
case _ => throw new NotSerializableException
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package org.thp.thehive.connector.misp.services

import akka.serialization.Serializer

import java.io.NotSerializableException

class MispSerializer extends Serializer {
override def identifier: Int = -222314660

Expand All @@ -12,12 +14,14 @@ class MispSerializer extends Serializer {
case Synchro => Array(0)
case EndOfSynchro(None) => Array(1)
case EndOfSynchro(Some(error)) => 2.toByte +: error.getBytes()
case _ => throw new NotSerializableException
}

override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
bytes(0) match {
case 0 => Synchro
case 1 => EndOfSynchro(None)
case 2 => EndOfSynchro(Some(new String(bytes.tail)))
case _ => throw new NotSerializableException
}
}
3 changes: 1 addition & 2 deletions thehive/app/org/thp/thehive/models/SchemaUpdaterActor.scala
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,12 @@ class DatabaseProvider @Inject() (
sealed trait SchemaUpdaterMessage
case class RequestDB(databaseInstanceId: String) extends SchemaUpdaterMessage
case object DBReady extends SchemaUpdaterMessage
case object Update extends SchemaUpdaterMessage

class SchemaUpdaterActor @Inject() (theHiveSchema: TheHiveSchemaDefinition, database: Database) extends Actor {

lazy val logger: Logger = Logger(getClass)

final case object Update

implicit val ec: ExecutionContext = context.dispatcher
var originalConnectionIds: Set[String] = Set.empty

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,15 @@ class SchemaUpdaterSerializer extends Serializer {
o match {
case RequestDB(instanceId) => 0.toByte +: instanceId.getBytes
case DBReady => Array(1)
case Update => Array(2)
case _ => throw new NotSerializableException
}

override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef =
bytes(0) match {
case 0 => RequestDB(new String(bytes.tail))
case 1 => DBReady
case 2 => Update
case _ => throw new NotSerializableException
}
}

0 comments on commit 1668df5

Please sign in to comment.