Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update TranscationReceipt to include decoding of events #141

Merged
merged 6 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ require (
github.com/hashicorp/golang-lru v1.0.2
github.com/hyperledger/firefly-common v1.4.8
github.com/hyperledger/firefly-signer v1.1.13
github.com/hyperledger/firefly-transaction-manager v1.3.14
github.com/hyperledger/firefly-transaction-manager v1.3.15-0.20240611130821-adc7fa2f78e2
github.com/sirupsen/logrus v1.9.3
github.com/spf13/cobra v1.8.0
github.com/stretchr/testify v1.8.4
Expand Down
6 changes: 4 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,10 @@ github.com/hyperledger/firefly-common v1.4.8 h1:0o1Qp1c5YzQo8nbnX+gAo9SVd2tR4Z9U
github.com/hyperledger/firefly-common v1.4.8/go.mod h1:dXewcVMFNON2SvQ1UPvu64OWUt77+M3p8qy61lT1kE4=
github.com/hyperledger/firefly-signer v1.1.13 h1:eiHjc6HPRG8AzXUCUgm51qqX1I9BokiuiiqJ89XwK4M=
github.com/hyperledger/firefly-signer v1.1.13/go.mod h1:pK6kivzBFSue3zpJSQpH67VasnLLbwBJOBUNv0zHbRA=
github.com/hyperledger/firefly-transaction-manager v1.3.14 h1:qK5wFQhEkZosPd/rvlcVHiSc+5ZCl+LEgpKk2a+9wKw=
github.com/hyperledger/firefly-transaction-manager v1.3.14/go.mod h1:N3BoHh8+dWG710oQKuNiXmJNEOBBeLTsQ8GpZ41vhog=
github.com/hyperledger/firefly-transaction-manager v1.3.15-0.20240610222246-001555528484 h1:HEhw5XoN0NSqfADwUTImWvJjAgkPiRwyHCJpGSgR9EM=
github.com/hyperledger/firefly-transaction-manager v1.3.15-0.20240610222246-001555528484/go.mod h1:N3BoHh8+dWG710oQKuNiXmJNEOBBeLTsQ8GpZ41vhog=
github.com/hyperledger/firefly-transaction-manager v1.3.15-0.20240611130821-adc7fa2f78e2 h1:mcE7cTLB9yNlQ0UmTzBZyseu8XwVjcgCD0dNMcRq7fw=
github.com/hyperledger/firefly-transaction-manager v1.3.15-0.20240611130821-adc7fa2f78e2/go.mod h1:N3BoHh8+dWG710oQKuNiXmJNEOBBeLTsQ8GpZ41vhog=
github.com/imdario/mergo v0.3.11/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/imdario/mergo v0.3.16 h1:wwQJbIsHYGMUyLSPrEq1CT16AhnhNJQ51+4fdHUnCl4=
github.com/imdario/mergo v0.3.16/go.mod h1:WBLT9ZmE3lPoWsEzCh9LPo3TiwVN+ZKEjmz+hD27ysY=
Expand Down
35 changes: 35 additions & 0 deletions internal/ethereum/blocklistener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -896,6 +896,41 @@ func TestBlockListenerProcessNonStandardHashRejectedWhenNotInHederaCompatibility

}

func TestBlockListenerProcessNonStandardHashRejectedWhenWrongSizeForHedera(t *testing.T) {

_, c, mRPC, done := newTestConnector(t)
bl := c.blockListener
bl.blockPollingInterval = 1 * time.Microsecond
bl.hederaCompatibilityMode = true

block1003Hash := ethtypes.MustNewHexBytes0xPrefix("0xef")

mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_blockNumber").Return(nil).Run(func(args mock.Arguments) {
hbh := args[1].(*ethtypes.HexInteger)
*hbh = *ethtypes.NewHexInteger64(1000)
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_newBlockFilter").Return(nil).Run(func(args mock.Arguments) {
hbh := args[1].(*string)
*hbh = "filter_id1"
})
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", "filter_id1").Return(nil).Run(func(args mock.Arguments) {
hbh := args[1].(*[]ethtypes.HexBytes0xPrefix)
*hbh = []ethtypes.HexBytes0xPrefix{
block1003Hash,
}
}).Once()
mRPC.On("CallRPC", mock.Anything, mock.Anything, "eth_getFilterChanges", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
go done() // Close after we've processed the log
})

bl.checkStartedLocked()

c.WaitClosed()

mRPC.AssertExpectations(t)

}

func TestBlockListenerProcessNonStandardHashAcceptedWhenInHederaCompatbilityMode(t *testing.T) {

_, c, mRPC, done := newTestConnector(t)
Expand Down
145 changes: 145 additions & 0 deletions internal/ethereum/event_enricher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
// Copyright © 2024 Kaleido, Inl.c.
//
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package ethereum

import (
"bytes"
"context"

"github.com/hyperledger/firefly-common/pkg/fftypes"
"github.com/hyperledger/firefly-common/pkg/log"
"github.com/hyperledger/firefly-signer/pkg/abi"
"github.com/hyperledger/firefly-signer/pkg/ethtypes"
"github.com/hyperledger/firefly-transaction-manager/pkg/ffcapi"
)

type eventEnricher struct {
connector *ethConnector
methods []*abi.Entry
extractSigner bool
}

func (ee *eventEnricher) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLog *logJSONRPC) (_ *ffcapi.Event, matched bool, decoded bool, err error) {

// Check the block for this event is at our high water mark, as we might have rewound for other listeners
blockNumber := ethLog.BlockNumber.BigInt().Int64()
transactionIndex := ethLog.TransactionIndex.BigInt().Int64()
logIndex := ethLog.LogIndex.BigInt().Int64()
protoID := getEventProtoID(blockNumber, transactionIndex, logIndex)

// Apply a post-filter check to the event
topicMatches := len(ethLog.Topics) > 0 && bytes.Equal(ethLog.Topics[0], f.Topic0)
addrMatches := f.Address == nil || bytes.Equal(ethLog.Address[:], f.Address[:])
if !topicMatches || !addrMatches {
log.L(ctx).Debugf("skipping event '%s' topicMatches=%t addrMatches=%t", protoID, topicMatches, addrMatches)
return nil, matched, decoded, nil
}
matched = true

log.L(ctx).Infof("detected event '%s'", protoID)
data, decoded := ee.decodeLogData(ctx, f.Event, ethLog.Topics, ethLog.Data)

info := eventInfo{
logJSONRPC: *ethLog,
}

var timestamp *fftypes.FFTime
if ee.connector.eventBlockTimestamps {
bi, err := ee.connector.getBlockInfoByHash(ctx, ethLog.BlockHash.String())
if bi == nil || err != nil {
log.L(ctx).Errorf("Failed to get block info timestamp for block '%s': %v", ethLog.BlockHash, err)
return nil, matched, decoded, err // This is an error condition, rather than just something we cannot enrich
}
timestamp = fftypes.UnixTime(bi.Timestamp.BigInt().Int64())
}

if len(ee.methods) > 0 || ee.extractSigner {
txInfo, err := ee.connector.getTransactionInfo(ctx, ethLog.TransactionHash)
if txInfo == nil || err != nil {
if txInfo == nil {
log.L(ctx).Errorf("Failed to get transaction info for TX '%s': transaction hash not found", ethLog.TransactionHash)
} else {
log.L(ctx).Errorf("Failed to get transaction info for TX '%s': %v", ethLog.TransactionHash, err)
}
return nil, matched, decoded, err // This is an error condition, rather than just something we cannot enrich
}
if ee.extractSigner {
info.InputSigner = txInfo.From
}
if len(ee.methods) > 0 {
ee.matchMethod(ctx, ee.methods, txInfo, &info)
}
}

signature := f.Signature
return &ffcapi.Event{
ID: ffcapi.EventID{
Signature: signature,
BlockHash: ethLog.BlockHash.String(),
TransactionHash: ethLog.TransactionHash.String(),
BlockNumber: fftypes.FFuint64(blockNumber),
TransactionIndex: fftypes.FFuint64(transactionIndex),
LogIndex: fftypes.FFuint64(logIndex),
Timestamp: timestamp,
},
Info: &info,
Data: data,
}, matched, decoded, nil
}

func (ee *eventEnricher) decodeLogData(ctx context.Context, event *abi.Entry, topics []ethtypes.HexBytes0xPrefix, data ethtypes.HexBytes0xPrefix) (*fftypes.JSONAny, bool) {
var b []byte
v, err := event.DecodeEventDataCtx(ctx, topics, data)
if err == nil {
b, err = ee.connector.serializer.SerializeJSONCtx(ctx, v)
}
if err != nil {
log.L(ctx).Errorf("Failed to process event log: %s", err)
return nil, false
}
return fftypes.JSONAnyPtrBytes(b), true
}

func (ee *eventEnricher) matchMethod(ctx context.Context, methods []*abi.Entry, txInfo *txInfoJSONRPC, info *eventInfo) {
if len(txInfo.Input) < 4 {
log.L(ctx).Debugf("No function selector available for TX '%s'", txInfo.Hash)
return
}
functionID := txInfo.Input[0:4]
var method *abi.Entry
for _, m := range methods {
if bytes.Equal(m.FunctionSelectorBytes(), functionID) {
method = m
break
}
}
if method == nil {
log.L(ctx).Debugf("Function selector '%s' for TX '%s' does not match any of the supplied methods", functionID.String(), txInfo.Hash)
return
}
info.InputMethod = method.String()
v, err := method.DecodeCallDataCtx(ctx, txInfo.Input)
var b []byte
if err == nil {
b, err = ee.connector.serializer.SerializeJSONCtx(ctx, v)
}
if err != nil {
log.L(ctx).Warnf("Failed to decode input for TX '%s' using '%s'", txInfo.Hash, info.InputMethod)
return
}
info.InputArgs = fftypes.JSONAnyPtrBytes(b)
}
109 changes: 7 additions & 102 deletions internal/ethereum/event_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package ethereum

import (
"bytes"
"context"
"encoding/json"
"math/big"
Expand Down Expand Up @@ -59,6 +58,7 @@ type listener struct {
id *fftypes.UUID
c *ethConnector
es *eventStream
ee *eventEnricher
hwmMux sync.Mutex // Protects checkpoint of an individual listener. May hold ES lock when taking this, must NOT attempt to obtain ES lock while holding this
hwmBlock int64
config listenerConfig
Expand Down Expand Up @@ -239,124 +239,29 @@ func (l *listener) listenerCatchupLoop() {
}
}

func (l *listener) decodeLogData(ctx context.Context, event *abi.Entry, topics []ethtypes.HexBytes0xPrefix, data ethtypes.HexBytes0xPrefix) *fftypes.JSONAny {
var b []byte
v, err := event.DecodeEventDataCtx(ctx, topics, data)
if err == nil {
b, err = l.c.serializer.SerializeJSONCtx(ctx, v)
}
if err != nil {
log.L(ctx).Errorf("Failed to process event log: %s", err)
return nil
}
return fftypes.JSONAnyPtrBytes(b)
}

func (l *listener) matchMethod(ctx context.Context, methods []*abi.Entry, txInfo *txInfoJSONRPC, info *eventInfo) {
if len(txInfo.Input) < 4 {
log.L(ctx).Debugf("No function selector available for TX '%s'", txInfo.Hash)
return
}
functionID := txInfo.Input[0:4]
var method *abi.Entry
for _, m := range methods {
if bytes.Equal(m.FunctionSelectorBytes(), functionID) {
method = m
break
}
}
if method == nil {
log.L(ctx).Debugf("Function selector '%s' for TX '%s' does not match any of the supplied methods", functionID.String(), txInfo.Hash)
return
}
info.InputMethod = method.String()
v, err := method.DecodeCallDataCtx(ctx, txInfo.Input)
var b []byte
if err == nil {
b, err = l.c.serializer.SerializeJSONCtx(ctx, v)
}
if err != nil {
log.L(ctx).Warnf("Failed to decode input for TX '%s' using '%s'", txInfo.Hash, info.InputMethod)
return
}
info.InputArgs = fftypes.JSONAnyPtrBytes(b)
}

func (l *listener) filterEnrichEthLog(ctx context.Context, f *eventFilter, ethLog *logJSONRPC) (*ffcapi.ListenerEvent, bool, error) {

// Check the block for this event is at our high water mark, as we might have rewound for other listeners
blockNumber := ethLog.BlockNumber.BigInt().Int64()
transactionIndex := ethLog.TransactionIndex.BigInt().Int64()
logIndex := ethLog.LogIndex.BigInt().Int64()
protoID := getEventProtoID(blockNumber, transactionIndex, logIndex)
if blockNumber < l.hwmBlock {
log.L(ctx).Debugf("Listener %s already delivered event '%s' hwm=%d", l.id, protoID, l.hwmBlock)
log.L(ctx).Debugf("Listener %s already delivered event '%s' hwm=%d", l.id, getEventProtoID(blockNumber, transactionIndex, logIndex), l.hwmBlock)
return nil, false, nil
}

// Apply a post-filter check to the event
topicMatches := len(ethLog.Topics) > 0 && bytes.Equal(ethLog.Topics[0], f.Topic0)
addrMatches := f.Address == nil || bytes.Equal(ethLog.Address[:], f.Address[:])
if !topicMatches || !addrMatches {
log.L(ctx).Debugf("Listener %s skipping event '%s' topicMatches=%t addrMatches=%t", l.id, protoID, topicMatches, addrMatches)
return nil, false, nil
e, matched, _, err := l.ee.filterEnrichEthLog(ctx, f, ethLog)
if !matched || err != nil {
return nil, false, err
}

log.L(ctx).Infof("Listener %s detected event '%s'", l.id, protoID)
data := l.decodeLogData(ctx, f.Event, ethLog.Topics, ethLog.Data)

info := eventInfo{
logJSONRPC: *ethLog,
}

var timestamp *fftypes.FFTime
if l.c.eventBlockTimestamps {
bi, err := l.c.getBlockInfoByHash(ctx, ethLog.BlockHash.String())
if bi == nil || err != nil {
log.L(ctx).Errorf("Failed to get block info timestamp for block '%s': %v", ethLog.BlockHash, err)
return nil, false, err // This is an error condition, rather than just something we cannot enrich
}
timestamp = fftypes.UnixTime(bi.Timestamp.BigInt().Int64())
}

if len(l.config.options.Methods) > 0 || l.config.options.Signer {
txInfo, err := l.c.getTransactionInfo(ctx, ethLog.TransactionHash)
if txInfo == nil || err != nil {
if txInfo == nil {
log.L(ctx).Errorf("Failed to get transaction info for TX '%s': transaction hash not found", ethLog.TransactionHash)
} else {
log.L(ctx).Errorf("Failed to get transaction info for TX '%s': %v", ethLog.TransactionHash, err)
}
return nil, false, err // This is an error condition, rather than just something we cannot enrich
}
if l.config.options.Signer {
info.InputSigner = txInfo.From
}
if len(l.config.options.Methods) > 0 {
l.matchMethod(ctx, l.config.options.Methods, txInfo, &info)
}
}

signature := f.Signature
e.ID.ListenerID = l.id
return &ffcapi.ListenerEvent{
Checkpoint: &listenerCheckpoint{
Block: blockNumber,
TransactionIndex: transactionIndex,
LogIndex: logIndex,
},
Event: &ffcapi.Event{
ID: ffcapi.EventID{
ListenerID: l.id,
Signature: signature,
BlockHash: ethLog.BlockHash.String(),
TransactionHash: ethLog.TransactionHash.String(),
BlockNumber: fftypes.FFuint64(blockNumber),
TransactionIndex: fftypes.FFuint64(transactionIndex),
LogIndex: fftypes.FFuint64(logIndex),
Timestamp: timestamp,
},
Info: &info,
Data: data,
},
Event: e,
}, true, nil
}
6 changes: 4 additions & 2 deletions internal/ethereum/event_listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,9 @@ func TestDecodeLogDataFail(t *testing.T) {
err := json.Unmarshal([]byte(abiTransferEvent), &abiEvent)
assert.NoError(t, err)

res := l.decodeLogData(l.es.ctx, abiEvent, []ethtypes.HexBytes0xPrefix{}, nil)
res, decoded := l.ee.decodeLogData(l.es.ctx, abiEvent, []ethtypes.HexBytes0xPrefix{}, nil)
assert.Nil(t, res)
assert.False(t, decoded)

}

Expand All @@ -405,8 +406,9 @@ func TestSerializeEventDataFail(t *testing.T) {
err := json.Unmarshal([]byte(abiTransferEvent), &abiEvent)
assert.NoError(t, err)

res := l.decodeLogData(l.es.ctx, abiEvent, []ethtypes.HexBytes0xPrefix{}, nil)
res, decoded := l.ee.decodeLogData(l.es.ctx, abiEvent, []ethtypes.HexBytes0xPrefix{}, nil)
assert.Nil(t, res)
assert.False(t, decoded)

}

Expand Down
Loading
Loading