Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

services/horizon: Batch account data creation, updates and deletions #3956

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 14 additions & 25 deletions services/horizon/internal/actions/account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,29 +121,20 @@ var (
},
}

data1 = xdr.LedgerEntry{
LastModifiedLedgerSeq: 100,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeData,
Data: &xdr.DataEntry{
AccountId: xdr.MustAddress(accountOne),
DataName: "test data",
// This also tests if base64 encoding is working as 0 is invalid UTF-8 byte
DataValue: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
},
},
data1 = history.Data{
AccountID: accountOne,
Name: "test data",
// This also tests if base64 encoding is working as 0 is invalid UTF-8 byte
Value: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
LastModifiedLedger: 1234,
}

data2 = xdr.LedgerEntry{
LastModifiedLedgerSeq: 100,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeData,
Data: &xdr.DataEntry{
AccountId: xdr.MustAddress(accountTwo),
DataName: "test data2",
DataValue: []byte{10, 11, 12, 13, 14, 15, 16, 17, 18, 19},
},
},
data2 = history.Data{
AccountID: accountTwo,
Name: "test data2",
Value: []byte{10, 11, 12, 13, 14, 15, 16, 17, 18, 19},
LastModifiedLedger: 1234,
Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
}

accountSigners = []history.AccountSigner{
Expand Down Expand Up @@ -437,9 +428,7 @@ func TestGetAccountsHandlerPageResultsByAsset(t *testing.T) {
tt.Assert.NoError(err)
}

_, err = q.InsertAccountData(tt.Ctx, data1)
assert.NoError(t, err)
_, err = q.InsertAccountData(tt.Ctx, data2)
err = q.UpsertAccountData(tt.Ctx, []history.Data{data1, data2})
assert.NoError(t, err)

var assetType, code, issuer string
Expand Down Expand Up @@ -485,7 +474,7 @@ func TestGetAccountsHandlerPageResultsByAsset(t *testing.T) {
tt.Assert.Len(result.Balances, 2)
tt.Assert.Len(result.Signers, 2)

_, ok := result.Data[string(data2.Data.Data.DataName)]
_, ok := result.Data[data2.Name]
tt.Assert.True(ok)
}

Expand Down
54 changes: 18 additions & 36 deletions services/horizon/internal/actions_data_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"testing"

"github.com/guregu/null"
"github.com/stretchr/testify/assert"

"github.com/stellar/go/services/horizon/internal/db2/history"
Expand All @@ -14,35 +15,21 @@ import (
)

var (
data1 = xdr.LedgerEntry{
LastModifiedLedgerSeq: 100,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeData,
Data: &xdr.DataEntry{
AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"),
DataName: "name1",
// This also tests if base64 encoding is working as 0 is invalid UTF-8 byte
DataValue: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
},
},
Ext: xdr.LedgerEntryExt{
V: 1,
V1: &xdr.LedgerEntryExtensionV1{
SponsoringId: xdr.MustAddressPtr("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
},
},
data1 = history.Data{
LastModifiedLedger: 100,
AccountID: "GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB",
Name: "name1",
// This also tests if base64 encoding is working as 0 is invalid UTF-8 byte
Value: []byte{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},

Sponsor: null.StringFrom("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML"),
}

data2 = xdr.LedgerEntry{
LastModifiedLedgerSeq: 100,
Data: xdr.LedgerEntryData{
Type: xdr.LedgerEntryTypeData,
Data: &xdr.DataEntry{
AccountId: xdr.MustAddress("GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"),
DataName: "name ",
DataValue: []byte("it got spaces!"),
},
},
data2 = history.Data{
LastModifiedLedger: 100,
AccountID: "GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB",
Name: "name ",
Value: []byte("it got spaces!"),
}
)

Expand All @@ -64,13 +51,8 @@ func TestDataActions_Show(t *testing.T) {
}, 0, 0, 0, 0, 0)
ht.Assert.NoError(err)

rows, err := q.InsertAccountData(ht.Ctx, data1)
assert.NoError(t, err)
ht.Assert.Equal(int64(1), rows)

rows, err = q.InsertAccountData(ht.Ctx, data2)
err = q.UpsertAccountData(ht.Ctx, []history.Data{data1, data2})
assert.NoError(t, err)
ht.Assert.Equal(int64(1), rows)

prefix := "/accounts/GAOQJGUAB7NI7K7I62ORBXMN3J4SSWQUQ7FOEPSDJ322W2HMCNWPHXFB"
result := map[string]string{}
Expand All @@ -82,14 +64,14 @@ func TestDataActions_Show(t *testing.T) {
ht.Assert.NoError(err)
decoded, err := base64.StdEncoding.DecodeString(result["value"])
ht.Assert.NoError(err)
ht.Assert.Equal([]byte(data1.Data.Data.DataValue), decoded)
ht.Assert.Equal([]byte(data1.Value), decoded)
ht.Assert.Equal("GC3C4AKRBQLHOJ45U4XG35ESVWRDECWO5XLDGYADO6DPR3L7KIDVUMML", result["sponsor"])
}

// raw
w = ht.Get(prefix+"/data/name1", test.RequestHelperRaw)
if ht.Assert.Equal(200, w.Code) {
ht.Assert.Equal([]byte(data1.Data.Data.DataValue), w.Body.Bytes())
ht.Assert.Equal([]byte(data1.Value), w.Body.Bytes())
}

result = map[string]string{}
Expand All @@ -102,7 +84,7 @@ func TestDataActions_Show(t *testing.T) {

decoded, err := base64.StdEncoding.DecodeString(result["value"])
ht.Assert.NoError(err)
ht.Assert.Equal([]byte(data2.Data.Data.DataValue), decoded)
ht.Assert.Equal([]byte(data2.Value), decoded)
ht.Assert.Equal("", result["sponsor"])
}

Expand Down
116 changes: 46 additions & 70 deletions services/horizon/internal/db2/history/account_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,13 @@ func (q *Q) GetAccountDataByAccountID(ctx context.Context, id string) ([]Data, e
}

// GetAccountDataByKeys loads a row from the `accounts_data` table, selected by multiple keys.
func (q *Q) GetAccountDataByKeys(ctx context.Context, keys []xdr.LedgerKeyData) ([]Data, error) {
func (q *Q) GetAccountDataByKeys(ctx context.Context, keys []AccountDataKey) ([]Data, error) {
var data []Data
lkeys := make([]string, 0, len(keys))
for _, key := range keys {
lkey, err := ledgerKeyDataToString(key)
lkey, err := accountDataKeyToString(key)
if err != nil {
return nil, errors.Wrap(err, "Error running ledgerKeyTrustLineToString")
return nil, errors.Wrap(err, "Error running accountDataKeyToString")
}
lkeys = append(lkeys, lkey)
}
Expand All @@ -55,94 +55,70 @@ func (q *Q) GetAccountDataByKeys(ctx context.Context, keys []xdr.LedgerKeyData)
return data, err
}

func ledgerKeyDataToString(data xdr.LedgerKeyData) (string, error) {
ledgerKey := &xdr.LedgerKey{}
err := ledgerKey.SetData(data.AccountId, string(data.DataName))
func accountDataKeyToString(key AccountDataKey) (string, error) {
var aid xdr.AccountId
err := aid.SetAddress(key.AccountID)
if err != nil {
return "", err
}
var ledgerKey xdr.LedgerKey
if err = ledgerKey.SetData(aid, key.DataName); err != nil {
return "", errors.Wrap(err, "Error running ledgerKey.SetData")
}
key, err := ledgerKey.MarshalBinary()
lKey, err := ledgerKey.MarshalBinary()
if err != nil {
return "", errors.Wrap(err, "Error running MarshalBinaryCompress")
}

return base64.StdEncoding.EncodeToString(key), nil
return base64.StdEncoding.EncodeToString(lKey), nil
}

func dataEntryToLedgerKeyString(entry xdr.LedgerEntry) (string, error) {
ledgerKey := entry.LedgerKey()
key, err := ledgerKey.MarshalBinary()
if err != nil {
return "", errors.Wrap(err, "Error running MarshalBinaryCompress")
}
// UpsertAccountData upserts a batch of data in the account_Data table.
func (q *Q) UpsertAccountData(ctx context.Context, data []Data) error {
var ledgerKey, accountID, name, value, lastModifiedLedger, sponsor []interface{}

return base64.StdEncoding.EncodeToString(key), nil
}

// InsertAccountData creates a row in the accounts_data table.
// Returns number of rows affected and error.
func (q *Q) InsertAccountData(ctx context.Context, entry xdr.LedgerEntry) (int64, error) {
data := entry.Data.MustData()

// Add lkey only when inserting rows
key, err := dataEntryToLedgerKeyString(entry)
if err != nil {
return 0, errors.Wrap(err, "Error running dataEntryToLedgerKeyString")
}

sql := sq.Insert("accounts_data").
Columns("ledger_key", "account_id", "name", "value", "last_modified_ledger", "sponsor").
Values(
key,
data.AccountId.Address(),
data.DataName,
AccountDataValue(data.DataValue),
entry.LastModifiedLedgerSeq,
ledgerEntrySponsorToNullString(entry),
)

result, err := q.Exec(ctx, sql)
if err != nil {
return 0, err
for _, d := range data {
key, err := accountDataKeyToString(AccountDataKey{
AccountID: d.AccountID,
DataName: d.Name,
})
if err != nil {
return err
}
ledgerKey = append(ledgerKey, key)
accountID = append(accountID, d.AccountID)
name = append(name, d.Name)
value = append(value, d.Value)
lastModifiedLedger = append(lastModifiedLedger, d.LastModifiedLedger)
sponsor = append(sponsor, d.Sponsor)
}

return result.RowsAffected()
}

// UpdateAccountData updates a row in the accounts_data table.
// Returns number of rows affected and error.
func (q *Q) UpdateAccountData(ctx context.Context, entry xdr.LedgerEntry) (int64, error) {
data := entry.Data.MustData()

key, err := dataEntryToLedgerKeyString(entry)
if err != nil {
return 0, errors.Wrap(err, "Error running dataEntryToLedgerKeyString")
}
sql := sq.Update("accounts_data").
SetMap(map[string]interface{}{
"value": AccountDataValue(data.DataValue),
"last_modified_ledger": entry.LastModifiedLedgerSeq,
"sponsor": ledgerEntrySponsorToNullString(entry),
}).
Where(sq.Eq{"ledger_key": key})
result, err := q.Exec(ctx, sql)
if err != nil {
return 0, err
upsertFields := []upsertField{
{"ledger_key", "character varying(150)", ledgerKey},
{"account_id", "character varying(56)", accountID},
{"name", "character varying(64)", name},
{"value", "character varying(90)", value},
{"last_modified_ledger", "integer", lastModifiedLedger},
{"sponsor", "text", sponsor},
}

return result.RowsAffected()
return q.upsertRows(ctx, "accounts_data", "ledger_key", upsertFields)
}

// RemoveAccountData deletes a row in the accounts_data table.
// Returns number of rows affected and error.
func (q *Q) RemoveAccountData(ctx context.Context, key xdr.LedgerKeyData) (int64, error) {
lkey, err := ledgerKeyDataToString(key)
if err != nil {
return 0, errors.Wrap(err, "Error running ledgerKeyDataToString")
func (q *Q) RemoveAccountData(ctx context.Context, keys []AccountDataKey) (int64, error) {
lkeys := make([]string, 0, len(keys))
for _, key := range keys {
lkey, err := accountDataKeyToString(key)
if err != nil {
return 0, errors.Wrap(err, "Error running accountDataKeyToString")
}
lkeys = append(lkeys, lkey)
}

sql := sq.Delete("accounts_data").
Where(sq.Eq{"ledger_key": lkey})
Where(sq.Eq{"ledger_key": lkeys})
result, err := q.Exec(ctx, sql)
if err != nil {
return 0, err
Expand Down

This file was deleted.

This file was deleted.

Loading