Skip to content

Commit

Permalink
Changes to ensure matchedQueries are returned correctly when queries …
Browse files Browse the repository at this point in the history
…contain a mix of offsets and no offsets.

CHANGELOG_BEGIN
CHANGELOG_END
  • Loading branch information
akshayshirahatti-da committed Oct 22, 2021
1 parent e6da1f7 commit efe1826
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,112 @@ abstract class AbstractWebsocketServiceIntegrationTest
).toJson
}

"query should return matchedQueries appropriately" in withHttpService { (uri, _, _, _) =>
import spray.json._

val (party, headers) = getUniquePartyAndAuthHeaders("Alice")
val initialCreate = initialIouCreate(uri, party, headers)

val query =
"""[
{"templateIds": ["Iou:Iou"], "query": {"currency": "USD"}}
]"""

@nowarn("msg=pattern var evtsWrapper .* is never used")
def resp(
iouCid: domain.ContractId,
kill: UniqueKillSwitch,
): Sink[JsValue, Future[ShouldHaveEnded]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
Consume
.interpret(
for {
ContractDelta(Vector((ctid, _)), Vector(), None) <- readOne
_ = (ctid: String) shouldBe (iouCid.unwrap: String)
_ <- liftF(
postJsonRequest(
uri.withPath(Uri.Path("/v1/exercise")),
exercisePayload(domain.ContractId(ctid)),
headers,
) map { case (statusCode, _) =>
statusCode.isSuccess shouldBe true
}
)

ContractDelta(Vector(), _, Some(offset)) <- readOne

(preOffset, consumedCtid) = (offset, ctid)
evtsWrapper @ ContractDelta(
Vector((fstId, fst), (sndId, snd)),
Vector(observeConsumed),
Some(lastSeenOffset),
) <- readOne
(liveStartOffset, msgCount) = {
observeConsumed.contractId should ===(consumedCtid)
Set(fstId, sndId, consumedCtid) should have size 3
inside(evtsWrapper) { case JsObject(obj) =>
inside(obj get "events") {
case Some(
JsArray(
Vector(
Archived(_, _),
Created(IouAmount(amt1), MatchedQueries(NumList(ixes1), _)),
Created(IouAmount(amt2), MatchedQueries(NumList(ixes2), _)),
)
)
) =>
Set((amt1, ixes1), (amt2, ixes2)) should ===(
Set(
(BigDecimal("42.42"), Vector(BigDecimal(0))),
(BigDecimal("957.57"), Vector(BigDecimal(0))),
)
)
}
}
(preOffset, 2)
}

_ = kill.shutdown()
heartbeats <- drain
hbCount = (heartbeats.iterator.map(heartbeatOffset).toSet + lastSeenOffset).size - 1
} yield
// don't count empty events block if lastSeenOffset does not change
ShouldHaveEnded(
liveStartOffset = liveStartOffset,
msgCount = msgCount + hbCount,
lastSeenOffset = lastSeenOffset,
)
)
}

def queryWithAndWithoutOffsets(offset: domain.Offset) =
s"""[
{"templateIds": ["Iou:Iou"], "query": {"currency": "USD"}, "offset": "${offset.toString}"},
{"templateIds": ["Iou:Iou"]}
]"""
for {
creation <- initialCreate
_ = creation._1 shouldBe a[StatusCodes.Success]
iouCid = getContractId(getResult(creation._2))
jwt = jwtForParties(List(party.unwrap), List(), testId)
(kill, source) = singleClientQueryStream(jwt, uri, query)
.viaMat(KillSwitches.single)(Keep.right)
.preMaterialize()
lastState <- source via parseResp runWith resp(iouCid, kill)
liveOffset = inside(lastState) { case ShouldHaveEnded(liveStart, 2, lastSeen) =>
lastSeen.unwrap should be > liveStart.unwrap
liveStart
}
clientMsg <- singleClientQueryStream(jwt, uri, queryWithAndWithoutOffsets(liveOffset))
.take(3)
.runWith(collectResultsAsTextMessageSkipOffsetTicks)
} yield inside(clientMsg) { case _ +: result +: _ =>
//we should expect to have matchedQueries [1] to indicate a match for the new template query
result should include(s""""matchedQueries":[1]""")
}
}

"query should receive deltas as contracts are archived/created" in withHttpService {
(uri, _, _, _) =>
import spray.json._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -283,31 +283,31 @@ object WebSocketService {
}

def fn(
q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), Int)]]
q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), (Int, Int))]]
)(a: domain.ActiveContract[LfV], o: Option[domain.Offset]): Option[Positive] = {
q.get(a.templateId).flatMap { preds =>
preds.collect(Function unlift { case ((_, p), ix) =>
preds.collect(Function unlift { case ((_, p), (ix, pos)) =>
val matchesPredicate = p(a.payload)
(matchesPredicate && matchesOffset(ix, o)).option(ix)
(matchesPredicate && matchesOffset(ix, o)).option(pos)
})
}
}

def dbQueriesPlan(
q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), Int)]]
q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), (Int, Int))]]
)(implicit
sjd: dbbackend.SupportedJdbcDriver.TC
): (Seq[(domain.TemplateId.RequiredPkg, doobie.Fragment)], Map[Int, Int]) = {
val annotated = q.toSeq.flatMap { case (tpid, nel) =>
nel.toVector.map { case ((vp, _), pos) => (tpid, vp.toSqlWhereClause, pos) }
nel.toVector.map { case ((vp, _), (_, pos)) => (tpid, vp.toSqlWhereClause, pos) }
}
val posMap = annotated.iterator.zipWithIndex.map { case ((_, _, pos), ix) =>
(ix, pos)
}.toMap
(annotated map { case (tpid, sql, _) => (tpid, sql) }, posMap)
}

val query = (gacr: domain.SearchForeverQuery, ix: Int) =>
val query = (gacr: domain.SearchForeverQuery, ix: Int, pos: Int) =>
for {
res <-
gacr.templateIds.toList
Expand All @@ -322,10 +322,12 @@ object WebSocketService {
)
(resolved, unresolved) = res
q = prepareFilters(resolved, gacr.query, lookupType): CompiledQueries
} yield (resolved, unresolved, q transform ((_, p) => NonEmptyList((p, ix))))
} yield (resolved, unresolved, q transform ((_, p) => NonEmptyList((p, (ix, pos)))))
for {
res <-
request.queries.zipWithIndex
.zip(request.pos)
.map(x => (x._1._1, x._1._2, x._2))
.foldMapM(query.tupled)
(resolved, unresolved, q) = res
} yield StreamPredicate(
Expand Down Expand Up @@ -363,8 +365,12 @@ object WebSocketService {
request: SearchForeverRequest,
): Option[SearchForeverRequest] = {
import scalaz.std.list
val withoutOffset = request.queries.toList.filter(_.offset.isEmpty)
list.toNel(withoutOffset).map(SearchForeverRequest(_))
val (withoutOffset, ixes) =
request.queries.zip(request.pos).toList.filter { case (q, _) => q.offset.isEmpty }.unzip
for {
q <- list.toNel(withoutOffset)
ix <- list.toNel(ixes)
} yield SearchForeverRequest(q, ix)
}

override def adjustRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,8 @@ object domain extends com.daml.fetchcontracts.domain.Aliases {
)

final case class SearchForeverRequest(
queries: NonEmptyList[SearchForeverQuery]
queries: NonEmptyList[SearchForeverQuery],
pos: NonEmptyList[Int],
)

final case class PartyDetails(identifier: Party, displayName: Option[String], isLocal: Boolean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,14 @@ object JsonProtocol extends DefaultJsonProtocol with ExtraFormats {

implicit val SearchForeverRequestFormat: RootJsonReader[domain.SearchForeverRequest] = {
case multi @ JsArray(_) =>
domain.SearchForeverRequest(multi.convertTo[NonEmptyList[domain.SearchForeverQuery]])
val (queries, pos) =
multi.convertTo[NonEmptyList[domain.SearchForeverQuery]].zipWithIndex.unzip
domain.SearchForeverRequest(queries, pos)
case single =>
domain.SearchForeverRequest(NonEmptyList(single.convertTo[domain.SearchForeverQuery]))
domain.SearchForeverRequest(
NonEmptyList(single.convertTo[domain.SearchForeverQuery]),
NonEmptyList(0),
)
}

implicit val CommandMetaFormat: RootJsonFormat[domain.CommandMeta] = jsonFormat1(
Expand Down

0 comments on commit efe1826

Please sign in to comment.