Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add closed_at to ledger entry changes tables #208

Merged
merged 5 commits into from
Nov 2, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/export_account_signers.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,10 @@ the export_ledger_entry_changes command.`,
numFailures := 0
totalNumBytes := 0
numSigners := 0
var header xdr.LedgerHeaderHistoryEntry
for _, acc := range accounts {
if acc.AccountSignersChanged() {
transformed, err := transform.TransformSigners(acc)
transformed, err := transform.TransformSigners(acc, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not json transform account signer: %v", err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_accounts.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,9 @@ the export_ledger_entry_changes command.`,
outFile := mustOutFile(path)
numFailures := 0
totalNumBytes := 0
var header xdr.LedgerHeaderHistoryEntry
for _, acc := range accounts {
transformed, err := transform.TransformAccount(acc)
transformed, err := transform.TransformAccount(acc, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not json transform account: %v", err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_claimable_balances.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ var claimableBalancesCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, balance := range balances {
transformed, err := transform.TransformClaimableBalance(balance)
var header xdr.LedgerHeaderHistoryEntry
chowbao marked this conversation as resolved.
Show resolved Hide resolved
transformed, err := transform.TransformClaimableBalance(balance, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform balance %+v: %v", balance, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_config_setting.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ var configSettingCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, setting := range settings {
transformed, err := transform.TransformConfigSetting(setting)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformConfigSetting(setting, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform config setting %+v: %v", setting, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_contract_code.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ var codeCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, code := range codes {
transformed, err := transform.TransformContractCode(code)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformContractCode(code, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform contract code %+v: %v", code, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_contract_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ var dataCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, data := range datas {
var header xdr.LedgerHeaderHistoryEntry
TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData)
transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase)
transformed, err, ok := TransformContractData.TransformContractData(data, env.NetworkPassphrase, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform contract data %+v: %v", data, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ var expirationCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, expiration := range expirations {
transformed, err := transform.TransformExpiration(expiration)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformExpiration(expiration, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform expiration %+v: %v", expiration, err))
numFailures += 1
Expand Down
38 changes: 19 additions & 19 deletions cmd/export_ledger_entry_changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,13 +112,13 @@ be exported.`,
if !exports["export-accounts"] {
continue
}
for _, change := range changes {
for i, change := range changes.Changes {
if changed, err := change.AccountChangedExceptSigners(); err != nil {
cmdLogger.LogError(fmt.Errorf("unable to identify changed accounts: %v", err))
continue
} else if changed {

acc, err := transform.TransformAccount(change)
acc, err := transform.TransformAccount(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming account entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -127,7 +127,7 @@ be exported.`,
transformedOutputs["accounts"] = append(transformedOutputs["accounts"], acc)
}
if change.AccountSignersChanged() {
signers, err := transform.TransformSigners(change)
signers, err := transform.TransformSigners(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming account signers from %d :%s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -142,8 +142,8 @@ be exported.`,
if !exports["export-balances"] {
continue
}
for _, change := range changes {
balance, err := transform.TransformClaimableBalance(change)
for i, change := range changes.Changes {
balance, err := transform.TransformClaimableBalance(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming balance entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -155,8 +155,8 @@ be exported.`,
if !exports["export-offers"] {
continue
}
for _, change := range changes {
offer, err := transform.TransformOffer(change)
for i, change := range changes.Changes {
offer, err := transform.TransformOffer(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming offer entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -168,8 +168,8 @@ be exported.`,
if !exports["export-trustlines"] {
continue
}
for _, change := range changes {
trust, err := transform.TransformTrustline(change)
for i, change := range changes.Changes {
trust, err := transform.TransformTrustline(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming trustline entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -181,8 +181,8 @@ be exported.`,
if !exports["export-pools"] {
continue
}
for _, change := range changes {
pool, err := transform.TransformPool(change)
for i, change := range changes.Changes {
pool, err := transform.TransformPool(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming liquidity pool entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -194,9 +194,9 @@ be exported.`,
if !exports["export-contract-data"] {
continue
}
for _, change := range changes {
for i, change := range changes.Changes {
TransformContractData := transform.NewTransformContractDataStruct(transform.AssetFromContractData, transform.ContractBalanceFromContractData)
contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase)
contractData, err, _ := TransformContractData.TransformContractData(change, env.NetworkPassphrase, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming contract data entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -214,8 +214,8 @@ be exported.`,
if !exports["export-contract-code"] {
continue
}
for _, change := range changes {
contractCode, err := transform.TransformContractCode(change)
for i, change := range changes.Changes {
contractCode, err := transform.TransformContractCode(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming contract code entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -227,8 +227,8 @@ be exported.`,
if !exports["export-config-settings"] {
continue
}
for _, change := range changes {
configSettings, err := transform.TransformConfigSetting(change)
for i, change := range changes.Changes {
configSettings, err := transform.TransformConfigSetting(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming config settings entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand All @@ -240,8 +240,8 @@ be exported.`,
if !exports["export-expiration"] {
continue
}
for _, change := range changes {
expiration, err := transform.TransformExpiration(change)
for i, change := range changes.Changes {
expiration, err := transform.TransformExpiration(change, changes.LedgerHeaders[i])
if err != nil {
entry, _, _, _ := utils.ExtractEntryFromChange(change)
cmdLogger.LogError(fmt.Errorf("error transforming expiration entry last updated at %d: %s", entry.LastModifiedLedgerSeq, err))
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_liquidity_pools.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@ the export_ledger_entry_changes command.`,
numFailures := 0
totalNumBytes := 0
for _, pool := range pools {
transformed, err := transform.TransformPool(pool)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformPool(pool, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform pool %+v: %v", pool, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_offers.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ var offersCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, offer := range offers {
transformed, err := transform.TransformOffer(offer)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformOffer(offer, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not transform offer %+v: %v", offer, err))
numFailures += 1
Expand Down
3 changes: 2 additions & 1 deletion cmd/export_trustlines.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ var trustlinesCmd = &cobra.Command{
numFailures := 0
totalNumBytes := 0
for _, trust := range trustlines {
transformed, err := transform.TransformTrustline(trust)
var header xdr.LedgerHeaderHistoryEntry
transformed, err := transform.TransformTrustline(trust, header)
if err != nil {
cmdLogger.LogError(fmt.Errorf("could not json transform trustline %+v: %v", trust, err))
numFailures += 1
Expand Down
26 changes: 14 additions & 12 deletions internal/input/changes.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,14 @@ var (
ExtractBatch = extractBatch
)

type LedgerChanges struct {
Changes []ingest.Change
LedgerHeaders []xdr.LedgerHeaderHistoryEntry
}

// ChangeBatch represents the changes in a batch of ledgers represented by the range [BatchStart, BatchEnd)
type ChangeBatch struct {
Changes map[xdr.LedgerEntryType][]ingest.Change
Changes map[xdr.LedgerEntryType]LedgerChanges
BatchStart uint32
BatchEnd uint32
}
Expand Down Expand Up @@ -92,7 +97,7 @@ func extractBatch(
xdr.LedgerEntryTypeConfigSetting,
xdr.LedgerEntryTypeExpiration}

changes := map[xdr.LedgerEntryType][]ingest.Change{}
changesClosedAt := map[xdr.LedgerEntryType]LedgerChanges{}
ctx := context.Background()
for seq := batchStart; seq <= batchEnd; {
changeCompactors := map[xdr.LedgerEntryType]*ingest.ChangeCompactor{}
Expand All @@ -107,19 +112,13 @@ func extractBatch(

// if this ledger is available, we process its changes and move on to the next ledger by incrementing seq.
// Otherwise, nothing is incremented, and we try again on the next iteration of the loop
var header xdr.LedgerHeaderHistoryEntry
if seq <= latestLedger {
changeReader, err := ingest.NewLedgerChangeReader(ctx, core, env.NetworkPassphrase, seq)
if err != nil {
logger.Fatal(fmt.Sprintf("unable to create change reader for ledger %d: ", seq), err)
}
// TODO: Add in ledger_closed_at; Update changeCompactors to also save ledger close time.
// AddChange is from the go monorepo so it might be easier to just add a addledgerclose func after it
//txReader := changeReader.LedgerTransactionReader

//closeTime, err := utils.TimePointToUTCTimeStamp(txReader.GetHeader().Header.ScpValue.CloseTime)
//if err != nil {
// logger.Fatal(fmt.Sprintf("unable to read close time for ledger %d: ", seq), err)
//}
header = changeReader.LedgerTransactionReader.GetHeader()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice find :) was this method easy to find? Or would it be helpful to add to the ingest documentation for ease of use in the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It wasn't necessarily obvious because of all the different reader nesting. But when you know it exists it is easy. Probably would be useful in the ingest documentation. It would be nice to get a reader/backend to xdr.* mapping like {Reader: [xdr.TxMeta, xdr.CloseMeta, xdr.LedgerHeader, etc...]} without having to look through the xdr

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's a good idea. Let's discuss with the rest of platform whether it makes sense to change the backend xdr mapping to be clearer.


for {
change, err := changeReader.Read()
Expand All @@ -145,14 +144,17 @@ func extractBatch(

for dataType, compactor := range changeCompactors {
for _, change := range compactor.GetChanges() {
changes[dataType] = append(changes[dataType], change)
dataTypeChanges := changesClosedAt[dataType]
dataTypeChanges.Changes = append(dataTypeChanges.Changes, change)
dataTypeChanges.LedgerHeaders = append(dataTypeChanges.LedgerHeaders, header)
changesClosedAt[dataType] = dataTypeChanges
}
}

}

return ChangeBatch{
Changes: changes,
Changes: changesClosedAt,
BatchStart: batchStart,
BatchEnd: batchEnd,
}
Expand Down
13 changes: 9 additions & 4 deletions internal/input/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package input
import (
"testing"

"github.com/stellar/go/ingest"
"github.com/stellar/go/ingest/ledgerbackend"
"github.com/stellar/go/support/log"
"github.com/stellar/stellar-etl/internal/utils"
"github.com/stretchr/testify/assert"

"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
)

Expand Down Expand Up @@ -114,8 +114,13 @@ func TestSendBatchToChannel(t *testing.T) {
}

func wrapLedgerEntry(entryType xdr.LedgerEntryType, entry xdr.LedgerEntry) ChangeBatch {
changes := map[xdr.LedgerEntryType][]ingest.Change{
entryType: {{Type: entry.Data.Type, Post: &entry}},
changes := map[xdr.LedgerEntryType]LedgerChanges{
entryType: {
Changes: []ingest.Change{
{Type: entry.Data.Type, Post: &entry},
},
LedgerHeaders: []xdr.LedgerHeaderHistoryEntry{},
},
}
return ChangeBatch{
Changes: changes,
Expand All @@ -128,7 +133,7 @@ func mockExtractBatch(
env utils.EnvironmentDetails, logger *utils.EtlLogger) ChangeBatch {
log.Errorf("mock called")
return ChangeBatch{
Changes: map[xdr.LedgerEntryType][]ingest.Change{},
Changes: map[xdr.LedgerEntryType]LedgerChanges{},
BatchStart: batchStart,
BatchEnd: batchEnd,
}
Expand Down
11 changes: 10 additions & 1 deletion internal/transform/account.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
)

// TransformAccount converts an account from the history archive ingestion system into a form suitable for BigQuery
func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) {
func TransformAccount(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) (AccountOutput, error) {
ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange)
if err != nil {
return AccountOutput{}, err
Expand Down Expand Up @@ -76,6 +76,13 @@ func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) {

outputLastModifiedLedger := uint32(ledgerEntry.LastModifiedLedgerSeq)

closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime)
if err != nil {
return AccountOutput{}, err
}

ledgerSequence := header.Header.LedgerSeq

transformedAccount := AccountOutput{
AccountID: outputID,
Balance: utils.ConvertStroopValueToReal(outputBalance),
Expand All @@ -98,6 +105,8 @@ func TransformAccount(ledgerChange ingest.Change) (AccountOutput, error) {
NumSponsoring: uint32(accountEntry.NumSponsoring()),
LedgerEntryChange: uint32(changeType),
Deleted: outputDeleted,
ClosedAt: closedAt,
LedgerSequence: uint32(ledgerSequence),
}
return transformedAccount, nil
}
12 changes: 11 additions & 1 deletion internal/transform/account_signer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (

"github.com/guregu/null"
"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"
"github.com/stellar/stellar-etl/internal/utils"
)

// TransformSigners converts account signers from the history archive ingestion system into a form suitable for BigQuery
func TransformSigners(ledgerChange ingest.Change) ([]AccountSignerOutput, error) {
func TransformSigners(ledgerChange ingest.Change, header xdr.LedgerHeaderHistoryEntry) ([]AccountSignerOutput, error) {
var signers []AccountSignerOutput

ledgerEntry, changeType, outputDeleted, err := utils.ExtractEntryFromChange(ledgerChange)
Expand All @@ -23,6 +24,13 @@ func TransformSigners(ledgerChange ingest.Change) ([]AccountSignerOutput, error)
return signers, fmt.Errorf("could not extract signer data from ledger entry of type: %+v", ledgerEntry.Data.Type)
}

closedAt, err := utils.TimePointToUTCTimeStamp(header.Header.ScpValue.CloseTime)
if err != nil {
return signers, err
}

ledgerSequence := header.Header.LedgerSeq

sponsors := accountEntry.SponsorPerSigner()
for signer, weight := range accountEntry.SignerSummary() {
var sponsor null.String
Expand All @@ -38,6 +46,8 @@ func TransformSigners(ledgerChange ingest.Change) ([]AccountSignerOutput, error)
LastModifiedLedger: outputLastModifiedLedger,
LedgerEntryChange: uint32(changeType),
Deleted: outputDeleted,
ClosedAt: closedAt,
LedgerSequence: uint32(ledgerSequence),
})
}
sort.Slice(signers, func(a, b int) bool { return signers[a].Weight < signers[b].Weight })
Expand Down
Loading
Loading