Skip to content

Commit

Permalink
Support rollback nodes in PostCommitValidation (#9501)
Browse files Browse the repository at this point in the history
* Support rollback nodes in PostCommitValidation

With that and the other in-flight PRs, #9400 now passes against Daml
on SQL.

changelog_begin
changelog_end

* Update ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/PostCommitValidation.scala

Co-authored-by: Stefano Baghino <[email protected]>

* Update ledger/participant-integration-api/src/main/scala/platform/store/appendonlydao/events/PostCommitValidation.scala

Co-authored-by: Stefano Baghino <[email protected]>

* Update ledger/participant-integration-api/src/main/scala/platform/store/dao/events/PostCommitValidation.scala

Co-authored-by: Stefano Baghino <[email protected]>

* Update ledger/participant-integration-api/src/main/scala/platform/store/dao/events/PostCommitValidation.scala

Co-authored-by: Stefano Baghino <[email protected]>

* Apply suggestions from code review

Co-authored-by: Miklos <[email protected]>

* fmt

changelog_begin
changelog_end

Co-authored-by: Stefano Baghino <[email protected]>
Co-authored-by: Miklos <[email protected]>
  • Loading branch information
3 people authored Apr 27, 2021
1 parent a335ee8 commit c567680
Show file tree
Hide file tree
Showing 3 changed files with 295 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -122,29 +122,23 @@ private[appendonlydao] object PostCommitValidation {
transaction: CommittedTransaction,
divulged: Set[ContractId],
): Set[ContractId] = {
val (createdInTransaction, referred) =
transaction.fold((Set.empty[ContractId], Set.empty[ContractId])) {
case ((created, ids), (_, c: Create)) =>
(created + c.coid, ids)
case ((created, ids), (_, e: Exercise)) if !divulged(e.targetCoid) =>
(created, ids + e.targetCoid)
case ((created, ids), (_, f: Fetch)) if !divulged(f.coid) =>
(created, ids + f.coid)
case ((created, ids), (_, l: LookupByKey)) =>
(created, l.result.filterNot(divulged).fold(ids)(ids + _))
case ((created, ids), _) => (created, ids)
}
referred.diff(createdInTransaction)
transaction.inputContracts.diff(divulged)
}

private def validateKeyUsages(
transaction: CommittedTransaction
)(implicit connection: Connection): Option[RejectionReason] =
transaction
.fold[Result](Right(State.empty(data))) {
case (Right(state), (_, node)) => validateKeyUsages(node, state)
case (rejection, _) => rejection
}
.foldInExecutionOrder[Result](Right(State.empty(data)))(
exerciseBegin = (acc, _, exe) => {
val newAcc = acc.flatMap(validateKeyUsages(exe, _))
(newAcc, true)
},
exerciseEnd = (acc, _, _) => acc,
rollbackBegin = (acc, _, _) => (acc.map(_.beginRollback()), true),
rollbackEnd = (acc, _, _) => acc.map(_.endRollback()),
leaf = (acc, _, leaf) => acc.flatMap(validateKeyUsages(leaf, _)),
)
.fold(Some(_), _ => None)

private def validateKeyUsages(
Expand All @@ -170,18 +164,49 @@ private[appendonlydao] object PostCommitValidation {

private type Result = Either[RejectionReason, State]

/** The active ledger key state during validation.
* After a rollback node, we restore the state at the
* beginning of the rollback.
*
* @param contracts Active contracts created in
* the current transaction that have a key indexed
* by a hash of their key.
* @param removed Hashes of contract keys that are known to
* to be archived. Note that a later create with the same
* key will remove the entry again.
*/
private final case class ActiveState(
contracts: Map[Hash, ContractId],
removed: Set[Hash],
) {
def add(key: Key, id: ContractId): ActiveState =
copy(
contracts = contracts.updated(key.hash, id),
removed = removed - key.hash,
)

def remove(key: Key): ActiveState =
copy(
contracts = contracts - key.hash,
removed = removed + key.hash,
)
}

/** Represents the state of an ongoing validation.
* It must be carried over as the transaction is
* validated one node at a time in pre-order
* traversal for this to make sense.
*
* @param contracts All contracts created as part of the current transaction
* @param removed Ensures indexed contracts are not referred to by key if they are removed in the current transaction
* @param data Data about committed contracts for post-commit validation purposes
* @param currentState The current active ledger state.
* @param rollbackStack Stack of states at the beginning of rollback nodes so we can
* restore the state at the end of the rollback. The most recent rollback
* comes first.
* @param data Data about committed contracts for post-commit validation purposes.
* This is never changed during the traversal of the transaction.
*/
private final case class State(
private val contracts: Map[Hash, ContractId],
private val removed: Set[Hash],
private val currentState: ActiveState,
private val rollbackStack: List[ActiveState],
private val data: PostCommitValidationData,
) {

Expand All @@ -204,29 +229,42 @@ private[appendonlydao] object PostCommitValidation {
else Left(MismatchingLookup(expectation, result))
}

private def add(key: Key, id: ContractId): State =
def beginRollback(): State =
copy(
contracts = contracts.updated(key.hash, id),
removed = removed - key.hash,
rollbackStack = currentState :: rollbackStack
)

def endRollback(): State = rollbackStack match {
case Nil =>
throw new IllegalStateException(
"Internal error: rollback ended but rollbackStack was empty"
)
case head :: tail =>
copy(
currentState = head,
rollbackStack = tail,
)
}

private def add(key: Key, id: ContractId): State =
copy(currentState = currentState.add(key, id))

private def remove(key: Key): State =
copy(
contracts = contracts - key.hash,
removed = removed + key.hash,
currentState = currentState.remove(key)
)

private def lookup(key: Key)(implicit connection: Connection): Option[ContractId] =
contracts.get(key.hash).orElse {
if (removed(key.hash)) None
currentState.contracts.get(key.hash).orElse {
if (currentState.removed(key.hash)) None
else data.lookupContractKeyGlobally(key)
}

}

private object State {
def empty(data: PostCommitValidationData): State =
State(Map.empty, Set.empty, data)
State(ActiveState(Map.empty, Set.empty), Nil, data)
}

private[events] val DuplicateKey: RejectionReason =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,29 +122,23 @@ private[dao] object PostCommitValidation {
transaction: CommittedTransaction,
divulged: Set[ContractId],
): Set[ContractId] = {
val (createdInTransaction, referred) =
transaction.fold((Set.empty[ContractId], Set.empty[ContractId])) {
case ((created, ids), (_, c: Create)) =>
(created + c.coid, ids)
case ((created, ids), (_, e: Exercise)) if !divulged(e.targetCoid) =>
(created, ids + e.targetCoid)
case ((created, ids), (_, f: Fetch)) if !divulged(f.coid) =>
(created, ids + f.coid)
case ((created, ids), (_, l: LookupByKey)) =>
(created, l.result.filterNot(divulged).fold(ids)(ids + _))
case ((created, ids), _) => (created, ids)
}
referred.diff(createdInTransaction)
transaction.inputContracts.diff(divulged)
}

private def validateKeyUsages(
transaction: CommittedTransaction
)(implicit connection: Connection): Option[RejectionReason] =
transaction
.fold[Result](Right(State.empty(data))) {
case (Right(state), (_, node)) => validateKeyUsages(node, state)
case (rejection, _) => rejection
}
.foldInExecutionOrder[Result](Right(State.empty(data)))(
exerciseBegin = (acc, _, exe) => {
val newAcc = acc.flatMap(validateKeyUsages(exe, _))
(newAcc, true)
},
exerciseEnd = (acc, _, _) => acc,
rollbackBegin = (acc, _, _) => (acc.map(_.beginRollback()), true),
rollbackEnd = (acc, _, _) => acc.map(_.endRollback()),
leaf = (acc, _, leaf) => acc.flatMap(validateKeyUsages(leaf, _)),
)
.fold(Some(_), _ => None)

private def validateKeyUsages(
Expand All @@ -170,18 +164,49 @@ private[dao] object PostCommitValidation {

private type Result = Either[RejectionReason, State]

/** The active ledger key state during validation.
* After a rollback node, we restore the state at the
* beginning of the rollback.
*
* @param contracts Active contracts created in
* the current transaction that have a key indexed
* by a hash of their key.
* @param removed Hashes of contract keys that are known to
* to be archived. Note that a later create with the same
* key will remove the entry again.
*/
private final case class ActiveState(
contracts: Map[Hash, ContractId],
removed: Set[Hash],
) {
def add(key: Key, id: ContractId): ActiveState =
copy(
contracts = contracts.updated(key.hash, id),
removed = removed - key.hash,
)

def remove(key: Key): ActiveState =
copy(
contracts = contracts - key.hash,
removed = removed + key.hash,
)
}

/** Represents the state of an ongoing validation.
* It must be carried over as the transaction is
* validated one node at a time in pre-order
* traversal for this to make sense.
*
* @param contracts All contracts created as part of the current transaction
* @param removed Ensures indexed contracts are not referred to by key if they are removed in the current transaction
* @param data Data about committed contracts for post-commit validation purposes
* @param currentState The current active ledger state.
* @param rollbackStack Stack of states at the beginning of rollback nodes so we can
* restore the state at the end of the rollback. The most recent rollback
* comes first.
* @param data Data about committed contracts for post-commit validation purposes.
* This is never changed durng the traversal of the transaction.
*/
private final case class State(
private val contracts: Map[Hash, ContractId],
private val removed: Set[Hash],
private val currentState: ActiveState,
private val rollbackStack: List[ActiveState],
private val data: PostCommitValidationData,
) {

Expand All @@ -204,29 +229,40 @@ private[dao] object PostCommitValidation {
else Left(MismatchingLookup(expectation, result))
}

private def add(key: Key, id: ContractId): State =
def beginRollback(): State =
copy(
contracts = contracts.updated(key.hash, id),
removed = removed - key.hash,
rollbackStack = currentState :: rollbackStack
)

def endRollback(): State = rollbackStack match {
case Nil =>
throw new IllegalStateException("Internal error: rollback end but rollbackStack was empty")
case head :: tail =>
copy(
currentState = head,
rollbackStack = tail,
)
}

private def add(key: Key, id: ContractId): State =
copy(currentState = currentState.add(key, id))

private def remove(key: Key): State =
copy(
contracts = contracts - key.hash,
removed = removed + key.hash,
currentState = currentState.remove(key)
)

private def lookup(key: Key)(implicit connection: Connection): Option[ContractId] =
contracts.get(key.hash).orElse {
if (removed(key.hash)) None
currentState.contracts.get(key.hash).orElse {
if (currentState.removed(key.hash)) None
else data.lookupContractKeyGlobally(key)
}

}

private object State {
def empty(data: PostCommitValidationData): State =
State(Map.empty, Set.empty, data)
State(ActiveState(Map.empty, Set.empty), Nil, data)
}

private[events] val DuplicateKey: RejectionReason =
Expand Down
Loading

0 comments on commit c567680

Please sign in to comment.