Skip to content

Commit

Permalink
Refactor CompositeIndexingSuite so that it can run for both Single an…
Browse files Browse the repository at this point in the history
…d Batch sink
  • Loading branch information
olivergrabinski committed Aug 3, 2023
1 parent 75f28b1 commit 3053b0b
Showing 1 changed file with 116 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import ch.epfl.bluebrain.nexus.delta.kernel.search.Pagination.FromPagination
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.BlazegraphClientSetup
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.BlazegraphClient
import ch.epfl.bluebrain.nexus.delta.plugins.blazegraph.client.SparqlQueryResponseType.SparqlNTriples
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeIndexingSuite.{batchConfig, Album, Band, Music}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.config.CompositeViewsConfig.SinkConfig.SinkConfig
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeIndexingSuite._
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.CompositeViewDef.ActiveViewDef
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.indexing.Queries.{batchQuery, singleQuery}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewProjection.{ElasticSearchProjection, SparqlProjection}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.CompositeViewSource.{CrossProjectSource, ProjectSource, RemoteProjectSource}
import ch.epfl.bluebrain.nexus.delta.plugins.compositeviews.model.{permissions, CompositeView, CompositeViewSource, CompositeViewValue}
Expand All @@ -31,6 +34,7 @@ import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.JsonLdContext.keywords
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.context.{ContextValue, RemoteContextResolution}
import ch.epfl.bluebrain.nexus.delta.rdf.jsonld.encoder.JsonLdEncoder
import ch.epfl.bluebrain.nexus.delta.rdf.query.SparqlQuery.SparqlConstructQuery
import ch.epfl.bluebrain.nexus.delta.rdf.syntax.{iriStringContextSyntax, jsonOpsSyntax}
import ch.epfl.bluebrain.nexus.delta.sdk.implicits._
import ch.epfl.bluebrain.nexus.delta.sdk.model.BaseUri
import ch.epfl.bluebrain.nexus.delta.sdk.model.search.{Sort, SortList}
Expand All @@ -46,37 +50,78 @@ import ch.epfl.bluebrain.nexus.delta.sourcing.state.GraphResource
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.Elem.SuccessElem
import ch.epfl.bluebrain.nexus.delta.sourcing.stream._
import ch.epfl.bluebrain.nexus.delta.sourcing.stream.pipes.{DiscardMetadata, FilterByType, FilterDeprecated}
import ch.epfl.bluebrain.nexus.testkit.TestHelpers
import ch.epfl.bluebrain.nexus.testkit.bio.ResourceFixture.TaskFixture
import ch.epfl.bluebrain.nexus.testkit.bio._
import ch.epfl.bluebrain.nexus.testkit.{IOFixedClock, TestHelpers}
import fs2.Stream
import io.circe.generic.extras.Configuration
import io.circe.generic.extras.semiauto.deriveConfiguredEncoder
import io.circe.generic.semiauto.deriveEncoder
import io.circe.syntax._
import io.circe.{Encoder, Json}
import monix.bio.{Task, UIO}
import monix.execution.Scheduler
import munit.AnyFixture

import java.time.Instant
import java.util.UUID
import scala.concurrent.duration._
import scala.concurrent.duration.DurationInt

class CompositeIndexingSuite
extends BioSuite
with CompositeIndexingSuite.Fixture
class SingleCompositeIndexingSuite extends CompositeIndexingSuite(SinkConfig.Single, singleQuery)
class BatchCompositeIndexingSuite extends CompositeIndexingSuite(SinkConfig.Batch, batchQuery)

trait CompositeIndexingFixture extends BioSuite {

implicit private val baseUri: BaseUri = BaseUri("http://localhost", Label.unsafe("v1"))
implicit private val rcr: RemoteContextResolution = RemoteContextResolution.never

private val queryConfig = QueryConfig(10, RefreshStrategy.Delay(10.millis))
val batchConfig = BatchConfig(2, 50.millis)
private val compositeConfig =
CompositeViewsFixture.config.copy(
blazegraphBatch = batchConfig,
elasticsearchBatch = batchConfig
)

type Result = (ElasticSearchClient, BlazegraphClient, CompositeProjections, CompositeSpaces.Builder)

private def resource(sinkConfig: SinkConfig): Resource[Task, Result] = {
(Doobie.resource(), ElasticSearchClientSetup.resource(), BlazegraphClientSetup.resource()).parMapN {
case (xas, esClient, bgClient) =>
val compositeRestartStore = new CompositeRestartStore(xas)
val projections =
CompositeProjections(compositeRestartStore, xas, queryConfig, batchConfig, 3.seconds)
val spacesBuilder =
CompositeSpaces.Builder("delta", esClient, bgClient, compositeConfig.copy(sinkConfig = sinkConfig))(
baseUri,
rcr
)
(esClient, bgClient, projections, spacesBuilder)
}
}

def suiteLocalFixture(name: String, sinkConfig: SinkConfig): TaskFixture[Result] =
ResourceFixture.suiteLocal(name, resource(sinkConfig))

def compositeIndexing(sinkConfig: SinkConfig): ResourceFixture.TaskFixture[Result] =
suiteLocalFixture("compositeIndexing", sinkConfig)

}

abstract class CompositeIndexingSuite(sinkConfig: SinkConfig, query: SparqlConstructQuery)
extends CompositeIndexingFixture
with TestHelpers
with Fixtures
with JsonAssertions
with TextAssertions {

override def munitFixtures: Seq[AnyFixture[_]] = List(compositeIndexing)
private val fixture = compositeIndexing(sinkConfig)

override def munitFixtures: Seq[AnyFixture[_]] = List(fixture)

implicit private val patienceConfig: PatienceConfig = PatienceConfig(10.seconds, 100.millis)

private val prefix = "delta"
private lazy val (esClient, bgClient, projections, spacesBuilder) = compositeIndexing()
private lazy val (esClient, bgClient, projections, spacesBuilder) = fixture()

// Data to index
private val museId = iri"http://music.com/muse"
Expand All @@ -95,30 +140,6 @@ class CompositeIndexingSuite
private val theGatewayId = iri"http://music.com/the_getaway"
private val theGateway = Album(theGatewayId, "The Getaway", redHotId)

private val query = SparqlConstructQuery.unsafe(
"""
|prefix music: <http://music.com/>
|CONSTRUCT {
| ?alias music:name ?bandName ;
| music:genre ?bandGenre ;
| music:start ?bandStartYear ;
| music:album ?albumId .
| ?albumId music:title ?albumTitle .
|} WHERE {
| VALUES ?id { {resource_id} } .
| BIND( IRI(concat(str(?id), '/', 'alias')) AS ?alias ) .
|
| ?id music:name ?bandName ;
| music:start ?bandStartYear;
| music:genre ?bandGenre .
| OPTIONAL {
| ?id ^music:by ?albumId .
| ?albumId music:title ?albumTitle .
| }
|}
|""".stripMargin
)

private val project1 = ProjectRef.unsafe("org", "proj")
private val project2 = ProjectRef.unsafe("org", "proj2")
private val project3 = ProjectRef.unsafe("org", "proj3")
Expand Down Expand Up @@ -183,7 +204,8 @@ class CompositeIndexingSuite
private val mainCompleted = Ref.unsafe[Task, Map[ProjectRef, Int]](Map.empty)
private val rebuildCompleted = Ref.unsafe[Task, Map[ProjectRef, Int]](Map.empty)

private def resetCompleted = mainCompleted.set(Map.empty) >> rebuildCompleted.set(Map.empty)
private def resetCompleted = mainCompleted.set(Map.empty) >> rebuildCompleted.set(Map.empty)

private def increment(map: Ref[Task, Map[ProjectRef, Int]], project: ProjectRef) =
map.update(_.updatedWith(project)(_.map(_ + 1).orElse(Some(1))))

Expand Down Expand Up @@ -532,43 +554,17 @@ class CompositeIndexingSuite
} yield ()
}

private val checkQuery = SparqlConstructQuery.unsafe("CONSTRUCT {?s ?p ?o} WHERE {?s ?p ?o} ORDER BY ?s")
private val checkQuery = SparqlConstructQuery.unsafe("CONSTRUCT {?s ?p ?o} WHERE {?s ?p ?o} ORDER BY ?s")

private def checkBlazegraphTriples(namespace: String, expected: String) =
bgClient
.query(Set(namespace), checkQuery, SparqlNTriples)
.map(_.value.toString)
.map(_.equalLinesUnordered(expected))
}

object CompositeIndexingSuite extends IOFixedClock {

implicit private val baseUri: BaseUri = BaseUri("http://localhost", Label.unsafe("v1"))
implicit private val rcr: RemoteContextResolution = RemoteContextResolution.never

private val queryConfig = QueryConfig(10, RefreshStrategy.Delay(10.millis))
private val batchConfig = BatchConfig(2, 50.millis)
private val compositeConfig =
CompositeViewsFixture.config.copy(blazegraphBatch = batchConfig, elasticsearchBatch = batchConfig)

type Result = (ElasticSearchClient, BlazegraphClient, CompositeProjections, CompositeSpaces.Builder)

private def resource()(implicit s: Scheduler, cl: ClassLoader): Resource[Task, Result] = {
(Doobie.resource(), ElasticSearchClientSetup.resource(), BlazegraphClientSetup.resource()).parMapN {
case (xas, esClient, bgClient) =>
val compositeRestartStore = new CompositeRestartStore(xas)
val projections =
CompositeProjections(compositeRestartStore, xas, queryConfig, batchConfig, 3.seconds)
val spacesBuilder = CompositeSpaces.Builder("delta", esClient, bgClient, compositeConfig)(baseUri, rcr)
(esClient, bgClient, projections, spacesBuilder)
}
}

def suiteLocalFixture(name: String)(implicit s: Scheduler, cl: ClassLoader): TaskFixture[Result] =
ResourceFixture.suiteLocal(name, resource())
}

trait Fixture { self: BioSuite =>
val compositeIndexing: ResourceFixture.TaskFixture[Result] = suiteLocalFixture("compositeIndexing")
}
object CompositeIndexingSuite {

private val ctxIri = ContextValue(iri"http://music.com/context")

Expand All @@ -581,24 +577,29 @@ object CompositeIndexingSuite extends IOFixedClock {

sealed trait Music extends Product with Serializable {
def id: Iri

def tpe: Iri

def label: String
}

final case class Band(id: Iri, name: String, start: Int, genre: Set[String]) extends Music {
override val tpe: Iri = iri"http://music.com/Band"
override val label: String = name
}

object Band {
implicit val bandEncoder: Encoder.AsObject[Band] =
deriveConfiguredEncoder[Band].mapJsonObject(_.add("@type", "Band".asJson))
implicit val bandJsonLdEncoder: JsonLdEncoder[Band] =
JsonLdEncoder.computeFromCirce((b: Band) => b.id, ctxIri)
}
final case class Album(id: Iri, title: String, by: Iri) extends Music {

final case class Album(id: Iri, title: String, by: Iri) extends Music {
override val tpe: Iri = iri"http://music.com/Album"
override val label: String = title
}

object Album {
implicit val albumEncoder: Encoder.AsObject[Album] =
deriveConfiguredEncoder[Album].mapJsonObject(_.add("@type", "Album".asJson))
Expand All @@ -607,10 +608,60 @@ object CompositeIndexingSuite extends IOFixedClock {
}

final case class Metadata(uuid: UUID)

object Metadata {
implicit private val encoderMetadata: Encoder.AsObject[Metadata] = deriveEncoder
implicit val jsonLdEncoderMetadata: JsonLdEncoder[Metadata] = JsonLdEncoder.computeFromCirce(ctxIri)

}

}

object Queries {
val batchQuery: SparqlConstructQuery = SparqlConstructQuery.unsafe(
"""
|prefix music: <http://music.com/>
|CONSTRUCT {
| ?alias music:name ?bandName ;
| music:genre ?bandGenre ;
| music:start ?bandStartYear ;
| music:album ?albumId .
| ?albumId music:title ?albumTitle .
|} WHERE {
| VALUES ?id { {resource_id} } .
| BIND( IRI(concat(str(?id), '/', 'alias')) AS ?alias ) .
|
| ?id music:name ?bandName ;
| music:start ?bandStartYear;
| music:genre ?bandGenre .
| OPTIONAL {
| ?id ^music:by ?albumId .
| ?albumId music:title ?albumTitle .
| }
|}
|""".stripMargin
)

val singleQuery: SparqlConstructQuery = SparqlConstructQuery.unsafe(
"""
|prefix music: <http://music.com/>
|CONSTRUCT {
| ?id music:name ?bandName ;
| music:genre ?bandGenre ;
| music:start ?bandStartYear ;
| music:album ?albumId .
| ?albumId music:title ?albumTitle .
|} WHERE {
| BIND( {resource_id} AS ?id ) .
|
| ?id music:name ?bandName ;
| music:start ?bandStartYear;
| music:genre ?bandGenre .
| OPTIONAL {
| ?id ^music:by ?albumId .
| ?albumId music:title ?albumTitle .
| }
|}
|""".stripMargin
)
}

0 comments on commit 3053b0b

Please sign in to comment.