Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Z/update state change entry encoder #42

Draft
wants to merge 37 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
74280d1
Updates to data handler
superzordon Mar 1, 2024
565dd80
Reduce sleep time for consumer
superzordon Mar 1, 2024
a65a03a
Add local docker compose file
superzordon Mar 1, 2024
72883ae
Update with additional speed enhancements and logging
superzordon Mar 5, 2024
7f13e8c
Update logs
superzordon Mar 6, 2024
6d2dc4e
Update data handler to utilize database transactions
superzordon Mar 7, 2024
96dde61
Update handler procedure
superzordon Mar 8, 2024
ed76f54
Updates to consumer
superzordon Mar 8, 2024
039fa0a
Remove savepoint release
superzordon Mar 8, 2024
432c2bf
Remove logging
superzordon Mar 8, 2024
9bee163
Release savepoints when they're no longer needed
superzordon Mar 9, 2024
0f7c17c
Updates to transaction creation
superzordon Mar 10, 2024
752bfe9
Update consumer logging
superzordon Mar 13, 2024
72f55f9
Update mempool sync routine
superzordon Mar 14, 2024
87f6bd9
Updates to consumer logic
superzordon Mar 15, 2024
ab38954
Updates to logging for consumer
superzordon Mar 16, 2024
ba1ada9
Updates to logging
superzordon Mar 16, 2024
40555d0
Update consumer
superzordon Mar 17, 2024
5b0fa64
Updates to consumer logic
superzordon Mar 19, 2024
16c2087
Updates to dockerfile
superzordon Mar 27, 2024
5ee27d4
Update dockerfile
superzordon Mar 27, 2024
ac0036c
Fix association block height
superzordon Mar 30, 2024
39f8928
Update consumer logging
superzordon Apr 2, 2024
845e114
Empty commit to trigger build
superzordon Apr 3, 2024
5d05f45
Empty commit to trigger build
superzordon Apr 3, 2024
3f9f0f9
Add logging
superzordon Apr 3, 2024
613ea29
Empty commit to trigger build
superzordon Apr 3, 2024
2f2f751
Empty commit to trigger build
superzordon Apr 3, 2024
ea64415
Empty commit to trigger build
superzordon Apr 4, 2024
64cb783
Empty commit to trigger build
superzordon Apr 4, 2024
962bf53
Empty commit to trigger build
superzordon Apr 4, 2024
4bfdf9a
Updates to new message entry and module
superzordon Apr 12, 2024
f467421
Update new message encoding
superzordon Apr 16, 2024
fc10c59
Updates
superzordon May 2, 2024
c933712
Empty commit to trigger build
superzordon May 24, 2024
a79f660
Fix docker compose make command
superzordon May 25, 2024
9fec50a
Empty commit to trigger build
superzordon May 31, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions .env
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
STATE_CHANGE_DIR=/tmp/state-changes-test
CONSUMER_PROGRESS_DIR=/tmp/consumer-progress-test
STATE_CHANGE_DIR=/tmp/state-change-files/state-changes
CONSUMER_PROGRESS_DIR=/tmp/consumer-progress
DB_HOST=localhost
DB_PORT=5432
DB_PORT=5430
DB_USERNAME=postgres
DB_PASSWORD=postgres
READONLY_USER_PASSWORD=postgres
LOG_QUERIES=true
LOG_QUERIES=false
THREAD_LIMIT=10
BATCH_BYTES=500000
CALCULATE_EXPLORER_STATISTICS=true
CALCULATE_EXPLORER_STATISTICS=false
28 changes: 18 additions & 10 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
FROM alpine:latest AS daodao
FROM alpine:latest AS builder

RUN apk update
RUN apk upgrade
Expand All @@ -10,6 +10,8 @@ COPY postgres-data-handler/go.mod postgres-data-handler/
COPY postgres-data-handler/go.sum postgres-data-handler/
COPY core/go.mod core/
COPY core/go.sum core/
COPY state-consumer/go.mod state-consumer/
COPY state-consumer/go.sum state-consumer/

WORKDIR /postgres-data-handler/src/postgres-data-handler

Expand All @@ -27,16 +29,22 @@ COPY core/cmd ../core/cmd
COPY core/lib ../core/lib
COPY core/migrate ../core/migrate

COPY state-consumer/consumer ../state-consumer/consumer

RUN go mod tidy

# Install Delve debugger, specifying the installation path explicitly
ENV GOPATH=/root/go
RUN go install github.com/go-delve/delve/cmd/dlv@latest

## build postgres data handler backend
RUN GOOS=linux go build -mod=mod -a -installsuffix cgo -o bin/postgres-data-handler main.go
#
## create tiny image
#FROM alpine:latest
##
#RUN apk add --update vips-dev
##
#COPY --from=daodao /daodao/src/daodao-backend/bin/daodao-backend /daodao/bin/daodao-backend
#ENTRYPOINT ["/daodao/bin/daodao-backend"]
ENTRYPOINT ["/postgres-data-handler/src/postgres-data-handler/bin/postgres-data-handler"]

# Install runtime dependencies
RUN apk add --no-cache vips-dev

# Expose the port Delve will listen on
EXPOSE 2345

# Set the entry point to start the application under Delve's control
ENTRYPOINT ["/root/go/bin/dlv", "--listen=:2345", "--headless=true", "--api-version=2", "--accept-multiclient", "exec", "/postgres-data-handler/src/postgres-data-handler/bin/postgres-data-handler"]
8 changes: 8 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
dev:
go run .

dev-env:
docker compose -f local.docker-compose.yml build && docker compose -f local.docker-compose.yml up

dev-env-down:
docker compose -f local.docker-compose.yml down --volumes
6 changes: 3 additions & 3 deletions entries/access_group.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func AccessGroupEncoderToPGStruct(accessGroupEntry *lib.AccessGroupEntry, keyByt

// PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
// based on the operation type and executes it.
func AccessGroupBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
func AccessGroupBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
Expand All @@ -69,7 +69,7 @@ func AccessGroupBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, para
}

// bulkInsertAccessGroupEntry inserts a batch of access_group entries into the database.
func bulkInsertAccessGroupEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
func bulkInsertAccessGroupEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
Expand All @@ -94,7 +94,7 @@ func bulkInsertAccessGroupEntry(entries []*lib.StateChangeEntry, db *bun.DB, ope
}

// bulkDeletePostEntry deletes a batch of access_group entries from the database.
func bulkDeleteAccessGroupEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
func bulkDeleteAccessGroupEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

Expand Down
6 changes: 3 additions & 3 deletions entries/access_group_member.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func AccessGroupMemberEncoderToPGStruct(accessGroupMemberEntry *lib.AccessGroupM

// PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
// based on the operation type and executes it.
func AccessGroupMemberBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
func AccessGroupMemberBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
Expand All @@ -74,7 +74,7 @@ func AccessGroupMemberBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB
}

// bulkInsertAccessGroupMemberEntry inserts a batch of access_group_member entries into the database.
func bulkInsertAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
func bulkInsertAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
Expand All @@ -99,7 +99,7 @@ func bulkInsertAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db *bun.D
}

// bulkDeletePostEntry deletes a batch of access_group_member entries from the database.
func bulkDeleteAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
func bulkDeleteAccessGroupMemberEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

Expand Down
6 changes: 3 additions & 3 deletions entries/balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func BalanceEntryEncoderToPGStruct(balanceEntry *lib.BalanceEntry, keyBytes []by

// PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
// based on the operation type and executes it.
func BalanceBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
func BalanceBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
Expand All @@ -63,7 +63,7 @@ func BalanceBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *
}

// bulkInsertBalanceEntry inserts a batch of balance entries into the database.
func bulkInsertBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
func bulkInsertBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
Expand All @@ -88,7 +88,7 @@ func bulkInsertBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operati
}

// bulkDeletePostEntry deletes a batch of balance entries from the database.
func bulkDeleteBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
func bulkDeleteBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

Expand Down
6 changes: 3 additions & 3 deletions entries/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func BlockEncoderToPGStruct(block *lib.MsgDeSoBlock, keyBytes []byte) *PGBlockEn

// PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
// based on the operation type and executes it.
func BlockBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
func BlockBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
Expand All @@ -62,7 +62,7 @@ func BlockBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *li
}

// bulkInsertUtxoOperationsEntry inserts a batch of user_association entries into the database.
func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// If this block is a part of the initial sync, skip it - it will be handled by the utxo operations.
if operationType == lib.DbOperationTypeInsert {
return nil
Expand Down Expand Up @@ -106,7 +106,7 @@ func bulkInsertBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operation
}

// bulkDeleteBlockEntry deletes a batch of block entries from the database.
func bulkDeleteBlockEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
func bulkDeleteBlockEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

Expand Down
8 changes: 4 additions & 4 deletions entries/dao_coin_limit_order.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type DaoCoinLimitOrderEntry struct {
OperationType uint8 `bun:",nullzero"`
FillType uint8 `bun:",nullzero"`
BlockHeight uint32 `bun:",nullzero"`
IsDaoCoinConst bool `bun:",nullzero"`
IsDaoCoinConst bool
BadgerKey []byte `pg:",pk,use_zero"`
}

Expand Down Expand Up @@ -48,7 +48,7 @@ func DaoCoinLimitOrderEncoderToPGStruct(daoCoinLimitOrder *lib.DAOCoinLimitOrder

// DaoCoinLimitOrderBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
// based on the operation type and executes it.
func DaoCoinLimitOrderBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
func DaoCoinLimitOrderBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
Expand All @@ -65,7 +65,7 @@ func DaoCoinLimitOrderBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB
}

// bulkInsertDaoCoinLimitOrderEntry inserts a batch of post_association entries into the database.
func bulkInsertDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
func bulkInsertDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
Expand All @@ -89,7 +89,7 @@ func bulkInsertDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db *bun.D
}

// bulkDeletePostEntry deletes a batch of post_association entries from the database.
func bulkDeleteDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
func bulkDeleteDaoCoinLimitOrderEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

Expand Down
6 changes: 3 additions & 3 deletions entries/derived_key.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func DerivedKeyEncoderToPGStruct(derivedKeyEntry *lib.DerivedKeyEntry, keyBytes

// PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
// based on the operation type and executes it.
func DerivedKeyBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
func DerivedKeyBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
Expand All @@ -73,7 +73,7 @@ func DerivedKeyBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, param
}

// bulkInsertDerivedKeyEntry inserts a batch of derived_key entries into the database.
func bulkInsertDerivedKeyEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
func bulkInsertDerivedKeyEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
Expand Down Expand Up @@ -101,7 +101,7 @@ func bulkInsertDerivedKeyEntry(entries []*lib.StateChangeEntry, db *bun.DB, oper
}

// bulkDeletePostEntry deletes a batch of derived_key entries from the database.
func bulkDeleteDerivedKeyEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
func bulkDeleteDerivedKeyEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

Expand Down
6 changes: 3 additions & 3 deletions entries/deso_balance.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func DesoBalanceEncoderToPGStruct(desoBalanceEntry *lib.DeSoBalanceEntry, keyByt

// PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
// based on the operation type and executes it.
func DesoBalanceBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
func DesoBalanceBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
Expand All @@ -53,7 +53,7 @@ func DesoBalanceBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, para
}

// bulkInsertDiamondEntry inserts a batch of diamond entries into the database.
func bulkInsertDesoBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
func bulkInsertDesoBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
Expand All @@ -77,7 +77,7 @@ func bulkInsertDesoBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, ope
}

// bulkDeletePostEntry deletes a batch of diamond entries from the database.
func bulkDeleteDesoBalanceEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
func bulkDeleteDesoBalanceEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

Expand Down
6 changes: 3 additions & 3 deletions entries/diamond.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func DiamondEncoderToPGStruct(diamondEntry *lib.DiamondEntry, keyBytes []byte, p

// PostBatchOperation is the entry point for processing a batch of post entries. It determines the appropriate handler
// based on the operation type and executes it.
func DiamondBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *lib.DeSoParams) error {
func DiamondBatchOperation(entries []*lib.StateChangeEntry, db bun.IDB, params *lib.DeSoParams) error {
// We check before we call this function that there is at least one operation type.
// We also ensure before this that all entries have the same operation type.
operationType := entries[0].OperationType
Expand All @@ -58,7 +58,7 @@ func DiamondBatchOperation(entries []*lib.StateChangeEntry, db *bun.DB, params *
}

// bulkInsertDiamondEntry inserts a batch of diamond entries into the database.
func bulkInsertDiamondEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
func bulkInsertDiamondEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType, params *lib.DeSoParams) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)
// Create a new array to hold the bun struct.
Expand All @@ -82,7 +82,7 @@ func bulkInsertDiamondEntry(entries []*lib.StateChangeEntry, db *bun.DB, operati
}

// bulkDeletePostEntry deletes a batch of diamond entries from the database.
func bulkDeleteDiamondEntry(entries []*lib.StateChangeEntry, db *bun.DB, operationType lib.StateSyncerOperationType) error {
func bulkDeleteDiamondEntry(entries []*lib.StateChangeEntry, db bun.IDB, operationType lib.StateSyncerOperationType) error {
// Track the unique entries we've inserted so we don't insert the same entry twice.
uniqueEntries := consumer.UniqueEntries(entries)

Expand Down
Loading