Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[JSON-API] Websockets fix for matchedQueries #11361

Merged
merged 2 commits into from
Oct 26, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,82 @@ abstract class AbstractWebsocketServiceIntegrationTest
).toJson
}

"matchedQueries should be correct for multiqueries with per-query offsets" in withHttpService {
(uri, _, _, _) =>
import spray.json._

val (party, headers) = getUniquePartyAndAuthHeaders("Alice")
val initialCreate = initialIouCreate(uri, party, headers)

//initial query without offset
val query =
"""[
{"templateIds": ["Iou:Iou"], "query": {"currency": "USD"}}
]"""

@nowarn("msg=pattern var evtsWrapper .* is never used")
def resp(
iouCid: domain.ContractId,
kill: UniqueKillSwitch,
): Sink[JsValue, Future[domain.Offset]] = {
val dslSyntax = Consume.syntax[JsValue]
import dslSyntax._
Consume
.interpret(
for {
evtsWrapper @ ContractDelta(Vector((ctid, _)), Vector(), None) <- readOne
_ = (ctid: String) shouldBe (iouCid.unwrap: String)

_ = {
akshayshirahatti-da marked this conversation as resolved.
Show resolved Hide resolved
inside(evtsWrapper) { case JsObject(obj) =>
inside(obj get "events") {
case Some(
JsArray(
Vector(
Created(IouAmount(amt), MatchedQueries(NumList(ix), _))
)
)
) =>
//matchedQuery should be 0 for the initial query supplied
Set((amt, ix)) should ===(Set((BigDecimal("999.99"), Vector(BigDecimal(0)))))
}
}
}
ContractDelta(Vector(), _, Some(offset)) <- readOne

_ = kill.shutdown()
_ <- drain

} yield offset
)
}

for {
creation <- initialCreate
_ = creation._1 shouldBe a[StatusCodes.Success]
iouCid = getContractId(getResult(creation._2))
jwt = jwtForParties(List(party.unwrap), List(), testId)
(kill, source) = singleClientQueryStream(jwt, uri, query)
.viaMat(KillSwitches.single)(Keep.right)
.preMaterialize()
lastSeen <- source via parseResp runWith resp(iouCid, kill)

//construct a new multiquery with one of them having an offset while the other doesn't
multiquery = s"""[
{"templateIds": ["Iou:Iou"], "query": {"currency": "USD"}, "offset": "${lastSeen.unwrap}"},
{"templateIds": ["Iou:Iou"]}
]"""

clientMsg <- singleClientQueryStream(jwt, uri, multiquery)
.take(1)
.runWith(collectResultsAsTextMessageSkipOffsetTicks)
} yield inside(clientMsg) { case Vector(result) =>
//we should expect to have matchedQueries [1] to indicate a match for the new template query only.
result should include(s"""$iouCid""")
result should include(""""matchedQueries":[1]""")
}
}

"query should receive deltas as contracts are archived/created" in withHttpService {
(uri, _, _, _) =>
import spray.json._
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ object WebSocketService {
import util.Collections._

val indexedOffsets: Vector[Option[domain.Offset]] =
request.queries.map(_.offset).toVector
request.queriesWithPos.map { case (q, _) => q.offset }.toVector

def matchesOffset(queryIndex: Int, maybeEventOffset: Option[domain.Offset]): Boolean = {
import domain.Offset.`Offset ordering`
Expand All @@ -283,31 +283,31 @@ object WebSocketService {
}

def fn(
q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), Int)]]
q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), (Int, Int))]]
)(a: domain.ActiveContract[LfV], o: Option[domain.Offset]): Option[Positive] = {
q.get(a.templateId).flatMap { preds =>
preds.collect(Function unlift { case ((_, p), ix) =>
preds.collect(Function unlift { case ((_, p), (ix, pos)) =>
val matchesPredicate = p(a.payload)
(matchesPredicate && matchesOffset(ix, o)).option(ix)
(matchesPredicate && matchesOffset(ix, o)).option(pos)
})
}
}

def dbQueriesPlan(
q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), Int)]]
q: Map[RequiredPkg, NonEmptyList[((ValuePredicate, LfV => Boolean), (Int, Int))]]
)(implicit
sjd: dbbackend.SupportedJdbcDriver.TC
): (Seq[(domain.TemplateId.RequiredPkg, doobie.Fragment)], Map[Int, Int]) = {
val annotated = q.toSeq.flatMap { case (tpid, nel) =>
nel.toVector.map { case ((vp, _), pos) => (tpid, vp.toSqlWhereClause, pos) }
nel.toVector.map { case ((vp, _), (_, pos)) => (tpid, vp.toSqlWhereClause, pos) }
}
val posMap = annotated.iterator.zipWithIndex.map { case ((_, _, pos), ix) =>
(ix, pos)
}.toMap
(annotated map { case (tpid, sql, _) => (tpid, sql) }, posMap)
}

val query = (gacr: domain.SearchForeverQuery, ix: Int) =>
val query = (gacr: domain.SearchForeverQuery, pos: Int, ix: Int) =>
for {
res <-
gacr.templateIds.toList
Expand All @@ -322,10 +322,11 @@ object WebSocketService {
)
(resolved, unresolved) = res
q = prepareFilters(resolved, gacr.query, lookupType): CompiledQueries
} yield (resolved, unresolved, q transform ((_, p) => NonEmptyList((p, ix))))
} yield (resolved, unresolved, q transform ((_, p) => NonEmptyList((p, (ix, pos)))))
for {
res <-
request.queries.zipWithIndex
request.queriesWithPos.zipWithIndex //index is used to ensure matchesOffset works properly
.map { case ((q, pos), ix) => (q, pos, ix) }
.foldMapM(query.tupled)
(resolved, unresolved, q) = res
} yield StreamPredicate(
Expand Down Expand Up @@ -363,8 +364,8 @@ object WebSocketService {
request: SearchForeverRequest,
): Option[SearchForeverRequest] = {
import scalaz.std.list
val withoutOffset = request.queries.toList.filter(_.offset.isEmpty)
list.toNel(withoutOffset).map(SearchForeverRequest(_))
val withoutOffset = request.queriesWithPos.toList.filter { case (q, _) => q.offset.isEmpty }
list.toNel(withoutOffset).map(SearchForeverRequest)
}

override def adjustRequest(
Expand All @@ -373,9 +374,9 @@ object WebSocketService {
): SearchForeverRequest =
prefix.fold(request)(prefix =>
request.copy(
queries = request.queries.map(query =>
query.copy(offset = query.offset.orElse(Some(prefix.offset)))
)
queriesWithPos = request.queriesWithPos.map {
_ leftMap (q => q.copy(offset = q.offset.orElse(Some(prefix.offset))))
}
)
)

Expand All @@ -388,8 +389,8 @@ object WebSocketService {
prefix: Option[domain.StartingOffset],
request: SearchForeverRequest,
): Option[domain.StartingOffset] =
request.queries
.map(_.offset)
request.queriesWithPos
.map { case (q, _) => q.offset }
.minimumBy1(identity)
.map(domain.StartingOffset(_))

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ object domain extends com.daml.fetchcontracts.domain.Aliases {
)

final case class SearchForeverRequest(
queries: NonEmptyList[SearchForeverQuery]
queriesWithPos: NonEmptyList[(SearchForeverQuery, Int)]
)

final case class PartyDetails(identifier: Party, displayName: Option[String], isLocal: Boolean)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,9 +288,10 @@ object JsonProtocol extends DefaultJsonProtocol with ExtraFormats {

implicit val SearchForeverRequestFormat: RootJsonReader[domain.SearchForeverRequest] = {
case multi @ JsArray(_) =>
domain.SearchForeverRequest(multi.convertTo[NonEmptyList[domain.SearchForeverQuery]])
val queriesWithPos = multi.convertTo[NonEmptyList[domain.SearchForeverQuery]].zipWithIndex
domain.SearchForeverRequest(queriesWithPos)
case single =>
domain.SearchForeverRequest(NonEmptyList(single.convertTo[domain.SearchForeverQuery]))
domain.SearchForeverRequest(NonEmptyList((single.convertTo[domain.SearchForeverQuery], 0)))
}

implicit val CommandMetaFormat: RootJsonFormat[domain.CommandMeta] = jsonFormat1(
Expand Down