Skip to content

Commit

Permalink
Merge branch 'main' into noto-atom
Browse files Browse the repository at this point in the history
  • Loading branch information
awrichar authored Dec 6, 2024
2 parents f75a345 + 203a166 commit 31af7f5
Show file tree
Hide file tree
Showing 44 changed files with 711 additions and 139 deletions.
6 changes: 3 additions & 3 deletions core/go/internal/components/txmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,11 +97,11 @@ type TXManager interface {
GetPreparedTransactionByID(ctx context.Context, dbTX *gorm.DB, id uuid.UUID) (*pldapi.PreparedTransaction, error)
QueryPreparedTransactions(ctx context.Context, dbTX *gorm.DB, jq *query.QueryJSON) ([]*pldapi.PreparedTransaction, error)
CallTransaction(ctx context.Context, result any, tx *pldapi.TransactionCall) (err error)
UpsertABI(ctx context.Context, dbTX *gorm.DB, a abi.ABI) (*pldapi.StoredABI, error)
UpsertABI(ctx context.Context, dbTX *gorm.DB, a abi.ABI) (func(), *pldapi.StoredABI, error)

// These functions for use of the private TX manager for chaining private transactions.

PrepareInternalPrivateTransaction(ctx context.Context, dbTX *gorm.DB, tx *pldapi.TransactionInput, submitMode pldapi.SubmitMode) (*ValidatedTransaction, error)
PrepareInternalPrivateTransaction(ctx context.Context, dbTX *gorm.DB, tx *pldapi.TransactionInput, submitMode pldapi.SubmitMode) (func(), *ValidatedTransaction, error)
UpsertInternalPrivateTxsFinalizeIDs(ctx context.Context, dbTX *gorm.DB, txis []*ValidatedTransaction) error
WritePreparedTransactions(ctx context.Context, dbTX *gorm.DB, prepared []*PrepareTransactionWithRefs) (err error)
WritePreparedTransactions(ctx context.Context, dbTX *gorm.DB, prepared []*PrepareTransactionWithRefs) (postCommit func(), err error)
}
4 changes: 3 additions & 1 deletion core/go/internal/domainmgr/domain.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,9 +154,11 @@ func (d *domain) processDomainConfig(confRes *prototk.ConfigureDomainResponse) (
}
stream.Sources = append(stream.Sources, blockindexer.EventStreamSource{ABI: eventsABI})

if _, err := d.dm.txManager.UpsertABI(d.ctx, d.dm.persistence.DB(), eventsABI); err != nil {
postCommit, _, err := d.dm.txManager.UpsertABI(d.ctx, d.dm.persistence.DB(), eventsABI)
if err != nil {
return nil, err
}
postCommit() // we didn't actually use a coordinated TX to call immediately
}

// We build a stream name in a way assured to result in a new stream if the ABI changes
Expand Down
4 changes: 2 additions & 2 deletions core/go/internal/domainmgr/domain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ func TestDomainInitStates(t *testing.T) {

}
func mockUpsertABIOk(mc *mockComponents) {
mc.txManager.On("UpsertABI", mock.Anything, mock.Anything, mock.Anything).Return(&pldapi.StoredABI{
mc.txManager.On("UpsertABI", mock.Anything, mock.Anything, mock.Anything).Return(func() {}, &pldapi.StoredABI{
Hash: tktypes.Bytes32(tktypes.RandBytes(32)),
}, nil)
}
Expand Down Expand Up @@ -380,7 +380,7 @@ func TestDomainInitUpsertEventsABIFail(t *testing.T) {
}
]`,
}, func(mc *mockComponents) {
mc.txManager.On("UpsertABI", mock.Anything, mock.Anything, mock.Anything).Return(nil, fmt.Errorf("pop"))
mc.txManager.On("UpsertABI", mock.Anything, mock.Anything, mock.Anything).Return(nil, nil, fmt.Errorf("pop"))
})
defer done()
assert.Regexp(t, "pop", *td.d.initError.Load())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,17 @@ func (rsw *receivedPreparedTransactionWriter) runBatch(ctx context.Context, dbTX

preparedTransactions[i] = receivedPreparedTransactionWriteOperation.PreparedTransaction
}
if err := rsw.txMgr.WritePreparedTransactions(ctx, dbTX, preparedTransactions); err != nil {
postCommit, err := rsw.txMgr.WritePreparedTransactions(ctx, dbTX, preparedTransactions)
if err != nil {
log.L(ctx).Errorf("Error persisting prepared transactions: %s", err)
return nil, nil, err
}
// We don't actually provide any result, so just build an array of nil results
return nil, make([]flushwriter.Result[*receivedPreparedTransactionWriterNoResult], len(values)), nil
return func(err error) {
if err == nil {
postCommit()
}
}, make([]flushwriter.Result[*receivedPreparedTransactionWriterNoResult], len(values)), nil

}

Expand Down
3 changes: 2 additions & 1 deletion core/go/internal/privatetxnmgr/sequencer_dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,12 +78,13 @@ func (s *Sequencer) DispatchTransactions(ctx context.Context, dispatchableTransa
})
case preparedTransaction.Inputs.Intent == prototk.TransactionSpecification_SEND_TRANSACTION && hasPrivateTransaction && !hasPublicTransaction:
log.L(ctx).Infof("Result of transaction %s is a chained private transaction", preparedTransaction.ID)
validatedPrivateTx, err := s.components.TxManager().PrepareInternalPrivateTransaction(ctx, s.components.Persistence().DB(), preparedTransaction.PreparedPrivateTransaction, pldapi.SubmitModeAuto)
preparePostCommit, validatedPrivateTx, err := s.components.TxManager().PrepareInternalPrivateTransaction(ctx, s.components.Persistence().DB(), preparedTransaction.PreparedPrivateTransaction, pldapi.SubmitModeAuto)
if err != nil {
log.L(ctx).Errorf("Error preparing transaction %s: %s", preparedTransaction.ID, err)
// TODO: this is just an error situation for one transaction - this function is a batch function
return err
}
preparePostCommit() // we didn't use a coordinated TX to call immediately
dispatchBatch.PrivateDispatches = append(dispatchBatch.PrivateDispatches, validatedPrivateTx)
case preparedTransaction.Inputs.Intent == prototk.TransactionSpecification_PREPARE_TRANSACTION && (hasPublicTransaction || hasPrivateTransaction):
log.L(ctx).Infof("Result of transaction %s is a prepared transaction public=%t private=%t", preparedTransaction.ID, hasPublicTransaction, hasPrivateTransaction)
Expand Down
10 changes: 6 additions & 4 deletions core/go/internal/privatetxnmgr/syncpoints/dispatch.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func (s *syncPoints) PersistDeployDispatchBatch(ctx context.Context, dispatchBat
return err
}

func (s *syncPoints) writeDispatchOperations(ctx context.Context, dbTX *gorm.DB, dispatchOperations []*dispatchOperation) (pubTXCbs []func(), err error) {
func (s *syncPoints) writeDispatchOperations(ctx context.Context, dbTX *gorm.DB, dispatchOperations []*dispatchOperation) (postCommits []func(), err error) {

// For each operation in the batch, we need to call the baseledger transaction manager to allocate its nonce
// which it can only guaranteed to be gapless and unique if it is done during the database transaction that inserts the dispatch record.
Expand All @@ -137,7 +137,7 @@ func (s *syncPoints) writeDispatchOperations(ctx context.Context, dbTX *gorm.DB,
log.L(ctx).Errorf("Error submitting public transactions: %s", err)
return nil, err
}
pubTXCbs = append(pubTXCbs, pubTXCb)
postCommits = append(postCommits, pubTXCb)

//TODO this results in an `INSERT` for each dispatchSequence
//Would it be more efficient to pass an array for the whole flush?
Expand Down Expand Up @@ -183,10 +183,12 @@ func (s *syncPoints) writeDispatchOperations(ctx context.Context, dbTX *gorm.DB,
if len(op.preparedTransactions) > 0 {
log.L(ctx).Debugf("Writing prepared transactions locally %d", len(op.preparedTransactions))

if err := s.txMgr.WritePreparedTransactions(ctx, dbTX, op.preparedTransactions); err != nil {
txPostCommit, err := s.txMgr.WritePreparedTransactions(ctx, dbTX, op.preparedTransactions)
if err != nil {
log.L(ctx).Errorf("Error persisting prepared transactions: %s", err)
return nil, err
}
postCommits = append(postCommits, txPostCommit)
}

if len(op.preparedTxnDistributions) == 0 {
Expand Down Expand Up @@ -235,5 +237,5 @@ func (s *syncPoints) writeDispatchOperations(ctx context.Context, dbTX *gorm.DB,
}

}
return pubTXCbs, nil
return postCommits, nil
}
5 changes: 3 additions & 2 deletions core/go/internal/txmgr/block_indexing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,9 @@ func TestPublicConfirmWithErrorDecodeRealDB(t *testing.T) {
})
defer done()

abiRef, err := txm.storeABI(ctx, txm.p.DB(), testABI)
postCommit, abiRef, err := txm.storeABI(ctx, txm.p.DB(), testABI)
require.NoError(t, err)
postCommit()

txID, err = txm.SendTransaction(ctx, &pldapi.TransactionInput{
TransactionBase: pldapi.TransactionBase{
Expand All @@ -111,7 +112,7 @@ func TestPublicConfirmWithErrorDecodeRealDB(t *testing.T) {
})
require.NoError(t, err)

postCommit, err := txm.blockIndexerPreCommit(ctx, txm.p.DB(), []*pldapi.IndexedBlock{},
postCommit, err = txm.blockIndexerPreCommit(ctx, txm.p.DB(), []*pldapi.IndexedBlock{},
[]*blockindexer.IndexedTransactionNotify{txi})
require.NoError(t, err)
postCommit()
Expand Down
23 changes: 12 additions & 11 deletions core/go/internal/txmgr/persisted_abi.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,25 @@ func (tm *txManager) getABIByHash(ctx context.Context, dbTX *gorm.DB, hash tktyp
return pa, nil
}

func (tm *txManager) storeABI(ctx context.Context, dbTX *gorm.DB, a abi.ABI) (*tktypes.Bytes32, error) {
pa, err := tm.UpsertABI(ctx, dbTX, a)
func (tm *txManager) storeABI(ctx context.Context, dbTX *gorm.DB, a abi.ABI) (func(), *tktypes.Bytes32, error) {
postCommit, pa, err := tm.UpsertABI(ctx, dbTX, a)
if err != nil {
return nil, err
return nil, nil, err
}
return &pa.Hash, err
return postCommit, &pa.Hash, err
}

func (tm *txManager) UpsertABI(ctx context.Context, dbTX *gorm.DB, a abi.ABI) (*pldapi.StoredABI, error) {
func (tm *txManager) UpsertABI(ctx context.Context, dbTX *gorm.DB, a abi.ABI) (func(), *pldapi.StoredABI, error) {
hash, err := tktypes.ABISolDefinitionHash(ctx, a)
if err != nil {
return nil, i18n.WrapError(ctx, err, msgs.MsgTxMgrInvalidABI)
return nil, nil, i18n.WrapError(ctx, err, msgs.MsgTxMgrInvalidABI)
}

// If cached, nothing to do (note must not cache until written for this to be true)
pa, existing := tm.abiCache.Get(*hash)
if existing {
log.L(ctx).Debugf("ABI %s already cached", hash)
return pa, nil
return func() {}, pa, nil
}

// Grab all the error definitions for reverse lookup
Expand Down Expand Up @@ -135,12 +135,13 @@ func (tm *txManager) UpsertABI(ctx context.Context, dbTX *gorm.DB, a abi.ABI) (*
Error
}
if err != nil {
return nil, err
return nil, nil, err
}
// Now we can cache it
pa = &pldapi.StoredABI{Hash: *hash, ABI: a}
tm.abiCache.Set(*hash, pa)
return pa, err
return func() {
// Caching must only be done post-commit of the DB transaction
tm.abiCache.Set(*hash, pa)
}, pa, err
}

func (tm *txManager) queryABIs(ctx context.Context, jq *query.QueryJSON) ([]*pldapi.StoredABI, error) {
Expand Down
4 changes: 2 additions & 2 deletions core/go/internal/txmgr/persisted_abi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestUpsertABIBadData(t *testing.T) {
ctx, txm, done := newTestTransactionManager(t, false)
defer done()

_, err := txm.UpsertABI(ctx, txm.p.DB(), abi.ABI{{Inputs: abi.ParameterArray{{Type: "wrong"}}}})
_, _, err := txm.UpsertABI(ctx, txm.p.DB(), abi.ABI{{Inputs: abi.ParameterArray{{Type: "wrong"}}}})
assert.Regexp(t, "PD012201", err)

}
Expand All @@ -95,7 +95,7 @@ func TestUpsertABIFail(t *testing.T) {
})
defer done()

_, err := txm.storeABI(ctx, txm.p.DB(), abi.ABI{{Type: abi.Function, Name: "get"}})
_, _, err := txm.storeABI(ctx, txm.p.DB(), abi.ABI{{Type: abi.Function, Name: "get"}})
assert.Regexp(t, "pop", err)

}
8 changes: 5 additions & 3 deletions core/go/internal/txmgr/persisted_receipt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ func TestFinalizeTransactionsInsertOkOffChain(t *testing.T) {
},
ABI: exampleABI,
})
assert.NoError(t, err)
require.NoError(t, err)

err = txm.p.DB().Transaction(func(tx *gorm.DB) error {
return txm.FinalizeTransactions(ctx, tx, []*components.ReceiptInput{
Expand Down Expand Up @@ -355,8 +355,9 @@ func TestDecodeCall(t *testing.T) {
ctx, txm, done := newTestTransactionManager(t, true)
defer done()

_, err := txm.storeABI(ctx, txm.p.DB(), sampleABI)
postCommit, _, err := txm.storeABI(ctx, txm.p.DB(), sampleABI)
require.NoError(t, err)
postCommit()

validCall, err := sampleABI.Functions()["set"].EncodeCallDataJSON([]byte(`[12345]`))
require.NoError(t, err)
Expand Down Expand Up @@ -390,8 +391,9 @@ func TestDecodeEvent(t *testing.T) {
ctx, txm, done := newTestTransactionManager(t, true)
defer done()

_, err := txm.storeABI(ctx, txm.p.DB(), sampleABI)
postCommit, _, err := txm.storeABI(ctx, txm.p.DB(), sampleABI)
require.NoError(t, err)
postCommit()

validTopic0 := tktypes.Bytes32(sampleABI.Events()["Updated"].SignatureHashBytes())
validTopic1, err := (&abi.ParameterArray{{Type: "uint256"}}).EncodeABIDataJSON([]byte(`["12345"]`))
Expand Down
14 changes: 10 additions & 4 deletions core/go/internal/txmgr/prepared_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,11 @@ var preparedTransactionFilters = filters.FieldMap{
"created": filters.TimestampField("created"),
}

func (tm *txManager) WritePreparedTransactions(ctx context.Context, dbTX *gorm.DB, prepared []*components.PrepareTransactionWithRefs) (err error) {
func (tm *txManager) WritePreparedTransactions(ctx context.Context, dbTX *gorm.DB, prepared []*components.PrepareTransactionWithRefs) (postCommit func(), err error) {

var preparedTxInserts []*preparedTransaction
var preparedTxStateInserts []*preparedTransactionState
var postCommits []func()
for _, p := range prepared {
dbPreparedTx := &preparedTransaction{
ID: p.ID,
Expand All @@ -83,16 +84,17 @@ func (tm *txManager) WritePreparedTransactions(ctx context.Context, dbTX *gorm.D
Metadata: p.Metadata,
}
// We do the work for the ABI validation etc. before we insert the TX
resolved, err := tm.resolveNewTransaction(ctx, dbTX, p.Transaction, pldapi.SubmitModePrepare)
txPostCommit, resolved, err := tm.resolveNewTransaction(ctx, dbTX, p.Transaction, pldapi.SubmitModePrepare)
if err == nil {
p.Transaction.ABI = nil // move to the reference
p.Transaction.ABIReference = resolved.Function.ABIReference
p.Transaction.Function = resolved.Function.Definition.String()
dbPreparedTx.Transaction, err = json.Marshal(p.Transaction)
}
if err != nil {
return err
return nil, err
}
postCommits = append(postCommits, txPostCommit)
preparedTxInserts = append(preparedTxInserts, dbPreparedTx)
for i, stateID := range p.States.Spent {
preparedTxStateInserts = append(preparedTxStateInserts, &preparedTransactionState{
Expand Down Expand Up @@ -149,7 +151,11 @@ func (tm *txManager) WritePreparedTransactions(ctx context.Context, dbTX *gorm.D
Error
}

return err
return func() {
for _, pc := range postCommits {
pc()
}
}, err

}

Expand Down
11 changes: 7 additions & 4 deletions core/go/internal/txmgr/prepared_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func TestPreparedTransactionRealDB(t *testing.T) {
testSchemaID := schemas[0].ID()

// Create the parent TX
parentTx, err := txm.resolveNewTransaction(ctx, txm.p.DB(), &pldapi.TransactionInput{
postCommit, parentTx, err := txm.resolveNewTransaction(ctx, txm.p.DB(), &pldapi.TransactionInput{
TransactionBase: pldapi.TransactionBase{
From: "me",
IdempotencyKey: "parent_txn",
Expand All @@ -114,6 +114,7 @@ func TestPreparedTransactionRealDB(t *testing.T) {
ABI: abi.ABI{{Type: abi.Function, Name: "doThing1"}},
}, pldapi.SubmitModeAuto)
require.NoError(t, err)
postCommit()
_, err = txm.insertTransactions(ctx, txm.p.DB(), []*components.ValidatedTransaction{parentTx}, false)
require.NoError(t, err)

Expand Down Expand Up @@ -149,12 +150,14 @@ func TestPreparedTransactionRealDB(t *testing.T) {
Metadata: tktypes.RawJSON(`{"some":"data"}`),
}

storedABI, err := txm.UpsertABI(ctx, txm.p.DB(), childFnABI)
postCommit, storedABI, err := txm.UpsertABI(ctx, txm.p.DB(), childFnABI)
require.NoError(t, err)
postCommit()

// Write the prepared TX it results in
err = txm.WritePreparedTransactions(ctx, txm.p.DB(), []*components.PrepareTransactionWithRefs{ptInsert})
postCommit, err = txm.WritePreparedTransactions(ctx, txm.p.DB(), []*components.PrepareTransactionWithRefs{ptInsert})
require.NoError(t, err)
postCommit()

// Query it back
pt, err := txm.GetPreparedTransactionByID(ctx, txm.p.DB(), *parentTx.Transaction.ID)
Expand Down Expand Up @@ -190,7 +193,7 @@ func TestWritePreparedTransactionsBadTX(t *testing.T) {
ctx, txm, done := newTestTransactionManager(t, false)
defer done()

err := txm.WritePreparedTransactions(ctx, txm.p.DB(), []*components.PrepareTransactionWithRefs{{
_, err := txm.WritePreparedTransactions(ctx, txm.p.DB(), []*components.PrepareTransactionWithRefs{{
Transaction: &pldapi.TransactionInput{},
}})
assert.Regexp(t, "PD012211", err)
Expand Down
6 changes: 5 additions & 1 deletion core/go/internal/txmgr/rpcmodule.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,11 @@ func (tm *txManager) rpcStoreABI() rpcserver.RPCHandler {
return rpcserver.RPCMethod1(func(ctx context.Context,
a abi.ABI,
) (*tktypes.Bytes32, error) {
return tm.storeABI(ctx, tm.p.DB(), a)
postCommit, abiHashRef, err := tm.storeABI(ctx, tm.p.DB(), a)
if err == nil {
postCommit()
}
return abiHashRef, err
})
}

Expand Down
Loading

0 comments on commit 31af7f5

Please sign in to comment.