diff --git a/Cargo.lock b/Cargo.lock index a07efe292d4e53..d823098e167ce9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1382,12 +1382,6 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e88a8acf291dafb59c2d96e8f59828f3838bb1a70398823ade51a84de6a6deed" -[[package]] -name = "fallible-iterator" -version = "0.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" - [[package]] name = "fast-math" version = "0.1.1" @@ -2466,17 +2460,6 @@ version = "2.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" -[[package]] -name = "md-5" -version = "0.9.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7b5a279bb9607f9f53c22d496eade00d138d1bdcccd07d74650387cf94942a15" -dependencies = [ - "block-buffer 0.9.0", - "digest 0.9.0", - "opaque-debug 0.3.0", -] - [[package]] name = "memchr" version = "2.4.1" @@ -3032,24 +3015,6 @@ dependencies = [ "indexmap", ] -[[package]] -name = "phf" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b9fc3db1018c4b59d7d582a739436478b6035138b6aecbce989fc91c3e98409f" -dependencies = [ - "phf_shared", -] - -[[package]] -name = "phf_shared" -version = "0.10.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6796ad771acdc0123d2a88dc428b5e38ef24456743ddb1744ed628f9815c096" -dependencies = [ - "siphasher", -] - [[package]] name = "pickledb" version = "0.4.1" @@ -3118,62 +3083,6 @@ version = "0.2.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4596b6d070b27117e987119b4dac604f3c58cfb0b191112e24771b2faeac1a6" -[[package]] -name = "postgres" -version = "0.19.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eb76d6535496f633fa799bb872ffb4790e9cbdedda9d35564ca0252f930c0dd5" -dependencies = [ - "bytes 1.1.0", - "fallible-iterator", - "futures 0.3.18", - "log 0.4.14", - "tokio", - "tokio-postgres", -] - -[[package]] -name = "postgres-derive" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c857dd221cb0e7d8414b894a0ce29eae44d453dda0baa132447878e75e701477" -dependencies = [ - "proc-macro2 1.0.32", - "quote 1.0.10", - "syn 1.0.81", -] - -[[package]] -name = "postgres-protocol" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b145e6a4ed52cb316a27787fc20fe8a25221cb476479f61e4e0327c15b98d91a" -dependencies = [ - "base64 0.13.0", - "byteorder", - "bytes 1.1.0", - "fallible-iterator", - "hmac 0.11.0", - "md-5", - "memchr", - "rand 0.8.4", - "sha2", - "stringprep", -] - -[[package]] -name = "postgres-types" -version = "0.2.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "04619f94ba0cc80999f4fc7073607cb825bc739a883cb6d20900fc5e009d6b0d" -dependencies = [ - "bytes 1.1.0", - "chrono", - "fallible-iterator", - "postgres-derive", - "postgres-protocol", -] - [[package]] name = "ppv-lite86" version = "0.2.15" @@ -4332,12 +4241,6 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2a30f10c911c0355f80f1c2faa8096efc4a58cdf8590b954d5b395efa071c711" -[[package]] -name = "siphasher" -version = "0.3.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "533494a8f9b724d33625ab53c6c4800f7cc445895924a8ef649222dcb76e938b" - [[package]] name = "slab" version = "0.4.5" @@ -4494,31 +4397,6 @@ dependencies = [ "thiserror", ] -[[package]] -name = "solana-accountsdb-plugin-postgres" -version = "1.9.7" -dependencies = [ - "bs58 0.4.0", - "chrono", - "crossbeam-channel", - "log 0.4.14", - "postgres", - "postgres-types", - "serde", - "serde_derive", - "serde_json", - "solana-account-decoder", - "solana-accountsdb-plugin-interface", - "solana-logger 1.9.7", - "solana-measure", - "solana-metrics", - "solana-runtime", - "solana-sdk", - "solana-transaction-status", - "thiserror", - "tokio-postgres", -] - [[package]] name = "solana-address-lookup-table-program" version = "1.9.7" @@ -6441,16 +6319,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "stringprep" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ee348cb74b87454fff4b551cbf727025810a004f88aeacae7f85b87f4e9a1c1" -dependencies = [ - "unicode-bidi", - "unicode-normalization", -] - [[package]] name = "strsim" version = "0.8.0" @@ -6885,29 +6753,6 @@ dependencies = [ "tokio", ] -[[package]] -name = "tokio-postgres" -version = "0.7.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b6c8b33df661b548dcd8f9bf87debb8c56c05657ed291122e1188698c2ece95" -dependencies = [ - "async-trait", - "byteorder", - "bytes 1.1.0", - "fallible-iterator", - "futures 0.3.18", - "log 0.4.14", - "parking_lot 0.11.2", - "percent-encoding 2.1.0", - "phf", - "pin-project-lite", - "postgres-protocol", - "postgres-types", - "socket2", - "tokio", - "tokio-util", -] - [[package]] name = "tokio-reactor" version = "0.1.12" diff --git a/Cargo.toml b/Cargo.toml index f26ae0f1d1e2e1..2cdca26d7ffa20 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,6 @@ members = [ "accountsdb-plugin-interface", "accountsdb-plugin-manager", - "accountsdb-plugin-postgres", "accounts-cluster-bench", "bench-streamer", "bench-tps", diff --git a/accountsdb-plugin-postgres/Cargo.toml b/accountsdb-plugin-postgres/Cargo.toml deleted file mode 100644 index 2892b1780382c1..00000000000000 --- a/accountsdb-plugin-postgres/Cargo.toml +++ /dev/null @@ -1,39 +0,0 @@ -[package] -authors = ["Solana Maintainers "] -edition = "2021" -name = "solana-accountsdb-plugin-postgres" -description = "The Solana AccountsDb plugin for PostgreSQL database." -version = "1.9.7" -repository = "https://github.com/solana-labs/solana" -license = "Apache-2.0" -homepage = "https://solana.com/" -documentation = "https://docs.rs/solana-validator" - -[lib] -crate-type = ["cdylib", "rlib"] - -[dependencies] -bs58 = "0.4.0" -chrono = { version = "0.4.11", features = ["serde"] } -crossbeam-channel = "0.5" -log = "0.4.14" -postgres = { version = "0.19.2", features = ["with-chrono-0_4"] } -postgres-types = { version = "0.2.2", features = ["derive"] } -serde = "1.0.130" -serde_derive = "1.0.103" -serde_json = "1.0.72" -solana-accountsdb-plugin-interface = { path = "../accountsdb-plugin-interface", version = "=1.9.7" } -solana-logger = { path = "../logger", version = "=1.9.7" } -solana-measure = { path = "../measure", version = "=1.9.7" } -solana-metrics = { path = "../metrics", version = "=1.9.7" } -solana-runtime = { path = "../runtime", version = "=1.9.7" } -solana-sdk = { path = "../sdk", version = "=1.9.7" } -solana-transaction-status = { path = "../transaction-status", version = "=1.9.7" } -thiserror = "1.0.30" -tokio-postgres = "0.7.4" - -[dev-dependencies] -solana-account-decoder = { path = "../account-decoder", version = "=1.9.7" } - -[package.metadata.docs.rs] -targets = ["x86_64-unknown-linux-gnu"] diff --git a/accountsdb-plugin-postgres/README.md b/accountsdb-plugin-postgres/README.md deleted file mode 100644 index e43c3273636126..00000000000000 --- a/accountsdb-plugin-postgres/README.md +++ /dev/null @@ -1,5 +0,0 @@ -This is an example implementing the AccountsDb plugin for PostgreSQL database. -Please see the `src/accountsdb_plugin_postgres.rs` for the format of the plugin's configuration file. - -To create the schema objects for the database, please use `scripts/create_schema.sql`. -`scripts/drop_schema.sql` can be used to tear down the schema objects. \ No newline at end of file diff --git a/accountsdb-plugin-postgres/scripts/create_schema.sql b/accountsdb-plugin-postgres/scripts/create_schema.sql deleted file mode 100644 index 732bcd6783c164..00000000000000 --- a/accountsdb-plugin-postgres/scripts/create_schema.sql +++ /dev/null @@ -1,201 +0,0 @@ -/** - * This plugin implementation for PostgreSQL requires the following tables - */ --- The table storing accounts - - -CREATE TABLE account ( - pubkey BYTEA PRIMARY KEY, - owner BYTEA, - lamports BIGINT NOT NULL, - slot BIGINT NOT NULL, - executable BOOL NOT NULL, - rent_epoch BIGINT NOT NULL, - data BYTEA, - write_version BIGINT NOT NULL, - updated_on TIMESTAMP NOT NULL -); - --- The table storing slot information -CREATE TABLE slot ( - slot BIGINT PRIMARY KEY, - parent BIGINT, - status VARCHAR(16) NOT NULL, - updated_on TIMESTAMP NOT NULL -); - --- Types for Transactions - -Create TYPE "TransactionErrorCode" AS ENUM ( - 'AccountInUse', - 'AccountLoadedTwice', - 'AccountNotFound', - 'ProgramAccountNotFound', - 'InsufficientFundsForFee', - 'InvalidAccountForFee', - 'AlreadyProcessed', - 'BlockhashNotFound', - 'InstructionError', - 'CallChainTooDeep', - 'MissingSignatureForFee', - 'InvalidAccountIndex', - 'SignatureFailure', - 'InvalidProgramForExecution', - 'SanitizeFailure', - 'ClusterMaintenance', - 'AccountBorrowOutstanding', - 'WouldExceedMaxAccountCostLimit', - 'WouldExceedMaxBlockCostLimit', - 'UnsupportedVersion', - 'InvalidWritableAccount', - 'WouldExceedMaxAccountDataCostLimit', - 'TooManyAccountLocks', - 'AddressLookupTableNotFound', - 'InvalidAddressLookupTableOwner', - 'InvalidAddressLookupTableData', - 'InvalidAddressLookupTableIndex', - 'InvalidRentPayingAccount' -); - -CREATE TYPE "TransactionError" AS ( - error_code "TransactionErrorCode", - error_detail VARCHAR(256) -); - -CREATE TYPE "CompiledInstruction" AS ( - program_id_index SMALLINT, - accounts SMALLINT[], - data BYTEA -); - -CREATE TYPE "InnerInstructions" AS ( - index SMALLINT, - instructions "CompiledInstruction"[] -); - -CREATE TYPE "TransactionTokenBalance" AS ( - account_index SMALLINT, - mint VARCHAR(44), - ui_token_amount DOUBLE PRECISION, - owner VARCHAR(44) -); - -Create TYPE "RewardType" AS ENUM ( - 'Fee', - 'Rent', - 'Staking', - 'Voting' -); - -CREATE TYPE "Reward" AS ( - pubkey VARCHAR(44), - lamports BIGINT, - post_balance BIGINT, - reward_type "RewardType", - commission SMALLINT -); - -CREATE TYPE "TransactionStatusMeta" AS ( - error "TransactionError", - fee BIGINT, - pre_balances BIGINT[], - post_balances BIGINT[], - inner_instructions "InnerInstructions"[], - log_messages TEXT[], - pre_token_balances "TransactionTokenBalance"[], - post_token_balances "TransactionTokenBalance"[], - rewards "Reward"[] -); - -CREATE TYPE "TransactionMessageHeader" AS ( - num_required_signatures SMALLINT, - num_readonly_signed_accounts SMALLINT, - num_readonly_unsigned_accounts SMALLINT -); - -CREATE TYPE "TransactionMessage" AS ( - header "TransactionMessageHeader", - account_keys BYTEA[], - recent_blockhash BYTEA, - instructions "CompiledInstruction"[] -); - -CREATE TYPE "TransactionMessageAddressTableLookup" AS ( - account_key BYTEA, - writable_indexes SMALLINT[], - readonly_indexes SMALLINT[] -); - -CREATE TYPE "TransactionMessageV0" AS ( - header "TransactionMessageHeader", - account_keys BYTEA[], - recent_blockhash BYTEA, - instructions "CompiledInstruction"[], - address_table_lookups "TransactionMessageAddressTableLookup"[] -); - -CREATE TYPE "LoadedAddresses" AS ( - writable BYTEA[], - readonly BYTEA[] -); - -CREATE TYPE "LoadedMessageV0" AS ( - message "TransactionMessageV0", - loaded_addresses "LoadedAddresses" -); - --- The table storing transactions -CREATE TABLE transaction ( - slot BIGINT NOT NULL, - signature BYTEA NOT NULL, - is_vote BOOL NOT NULL, - message_type SMALLINT, -- 0: legacy, 1: v0 message - legacy_message "TransactionMessage", - v0_loaded_message "LoadedMessageV0", - signatures BYTEA[], - message_hash BYTEA, - meta "TransactionStatusMeta", - updated_on TIMESTAMP NOT NULL, - CONSTRAINT transaction_pk PRIMARY KEY (slot, signature) -); - --- The table storing block metadata -CREATE TABLE block ( - slot BIGINT PRIMARY KEY, - blockhash VARCHAR(44), - rewards "Reward"[], - block_time BIGINT, - block_height BIGINT, - updated_on TIMESTAMP NOT NULL -); - -/** - * The following is for keeping historical data for accounts and is not required for plugin to work. - */ --- The table storing historical data for accounts -CREATE TABLE account_audit ( - pubkey BYTEA, - owner BYTEA, - lamports BIGINT NOT NULL, - slot BIGINT NOT NULL, - executable BOOL NOT NULL, - rent_epoch BIGINT NOT NULL, - data BYTEA, - write_version BIGINT NOT NULL, - updated_on TIMESTAMP NOT NULL -); - -CREATE INDEX account_audit_account_key ON account_audit (pubkey, write_version); - -CREATE FUNCTION audit_account_update() RETURNS trigger AS $audit_account_update$ - BEGIN - INSERT INTO account_audit (pubkey, owner, lamports, slot, executable, rent_epoch, data, write_version, updated_on) - VALUES (OLD.pubkey, OLD.owner, OLD.lamports, OLD.slot, - OLD.executable, OLD.rent_epoch, OLD.data, OLD.write_version, OLD.updated_on); - RETURN NEW; - END; - -$audit_account_update$ LANGUAGE plpgsql; - -CREATE TRIGGER account_update_trigger AFTER UPDATE OR DELETE ON account - FOR EACH ROW EXECUTE PROCEDURE audit_account_update(); diff --git a/accountsdb-plugin-postgres/scripts/drop_schema.sql b/accountsdb-plugin-postgres/scripts/drop_schema.sql deleted file mode 100644 index 448564f93399cf..00000000000000 --- a/accountsdb-plugin-postgres/scripts/drop_schema.sql +++ /dev/null @@ -1,26 +0,0 @@ -/** - * Script for cleaning up the schema for PostgreSQL used for the AccountsDb plugin. - */ - -DROP TRIGGER account_update_trigger ON account; -DROP FUNCTION audit_account_update; -DROP TABLE account_audit; -DROP TABLE account; -DROP TABLE slot; -DROP TABLE transaction; -DROP TABLE block; - -DROP TYPE "TransactionError" CASCADE; -DROP TYPE "TransactionErrorCode" CASCADE; -DROP TYPE "LoadedMessageV0" CASCADE; -DROP TYPE "LoadedAddresses" CASCADE; -DROP TYPE "TransactionMessageV0" CASCADE; -DROP TYPE "TransactionMessage" CASCADE; -DROP TYPE "TransactionMessageHeader" CASCADE; -DROP TYPE "TransactionMessageAddressTableLookup" CASCADE; -DROP TYPE "TransactionStatusMeta" CASCADE; -DROP TYPE "RewardType" CASCADE; -DROP TYPE "Reward" CASCADE; -DROP TYPE "TransactionTokenBalance" CASCADE; -DROP TYPE "InnerInstructions" CASCADE; -DROP TYPE "CompiledInstruction" CASCADE; diff --git a/accountsdb-plugin-postgres/scripts/postgresql.conf b/accountsdb-plugin-postgres/scripts/postgresql.conf deleted file mode 100644 index 53b089ab21eb3b..00000000000000 --- a/accountsdb-plugin-postgres/scripts/postgresql.conf +++ /dev/null @@ -1,802 +0,0 @@ -# This a reference configuration file for the PostgreSQL database version 14. - -# ----------------------------- -# PostgreSQL configuration file -# ----------------------------- -# -# This file consists of lines of the form: -# -# name = value -# -# (The "=" is optional.) Whitespace may be used. Comments are introduced with -# "#" anywhere on a line. The complete list of parameter names and allowed -# values can be found in the PostgreSQL documentation. -# -# The commented-out settings shown in this file represent the default values. -# Re-commenting a setting is NOT sufficient to revert it to the default value; -# you need to reload the server. -# -# This file is read on server startup and when the server receives a SIGHUP -# signal. If you edit the file on a running system, you have to SIGHUP the -# server for the changes to take effect, run "pg_ctl reload", or execute -# "SELECT pg_reload_conf()". Some parameters, which are marked below, -# require a server shutdown and restart to take effect. -# -# Any parameter can also be given as a command-line option to the server, e.g., -# "postgres -c log_connections=on". Some parameters can be changed at run time -# with the "SET" SQL command. -# -# Memory units: B = bytes Time units: us = microseconds -# kB = kilobytes ms = milliseconds -# MB = megabytes s = seconds -# GB = gigabytes min = minutes -# TB = terabytes h = hours -# d = days - - -#------------------------------------------------------------------------------ -# FILE LOCATIONS -#------------------------------------------------------------------------------ - -# The default values of these variables are driven from the -D command-line -# option or PGDATA environment variable, represented here as ConfigDir. - -data_directory = '/var/lib/postgresql/14/main' # use data in another directory - # (change requires restart) - -hba_file = '/etc/postgresql/14/main/pg_hba.conf' # host-based authentication file - # (change requires restart) -ident_file = '/etc/postgresql/14/main/pg_ident.conf' # ident configuration file - # (change requires restart) - -# If external_pid_file is not explicitly set, no extra PID file is written. -external_pid_file = '/var/run/postgresql/14-main.pid' # write an extra PID file - # (change requires restart) - - -#------------------------------------------------------------------------------ -# CONNECTIONS AND AUTHENTICATION -#------------------------------------------------------------------------------ - -# - Connection Settings - - -#listen_addresses = 'localhost' # what IP address(es) to listen on; - # comma-separated list of addresses; - # defaults to 'localhost'; use '*' for all - # (change requires restart) -listen_addresses = '*' -port = 5433 # (change requires restart) -max_connections = 200 # (change requires restart) -#superuser_reserved_connections = 3 # (change requires restart) -unix_socket_directories = '/var/run/postgresql' # comma-separated list of directories - # (change requires restart) -#unix_socket_group = '' # (change requires restart) -#unix_socket_permissions = 0777 # begin with 0 to use octal notation - # (change requires restart) -#bonjour = off # advertise server via Bonjour - # (change requires restart) -#bonjour_name = '' # defaults to the computer name - # (change requires restart) - -# - TCP settings - -# see "man tcp" for details - -#tcp_keepalives_idle = 0 # TCP_KEEPIDLE, in seconds; - # 0 selects the system default -#tcp_keepalives_interval = 0 # TCP_KEEPINTVL, in seconds; - # 0 selects the system default -#tcp_keepalives_count = 0 # TCP_KEEPCNT; - # 0 selects the system default -#tcp_user_timeout = 0 # TCP_USER_TIMEOUT, in milliseconds; - # 0 selects the system default - -#client_connection_check_interval = 0 # time between checks for client - # disconnection while running queries; - # 0 for never - -# - Authentication - - -#authentication_timeout = 1min # 1s-600s -#password_encryption = scram-sha-256 # scram-sha-256 or md5 -#db_user_namespace = off - -# GSSAPI using Kerberos -#krb_server_keyfile = 'FILE:${sysconfdir}/krb5.keytab' -#krb_caseins_users = off - -# - SSL - - -ssl = on -#ssl_ca_file = '' -ssl_cert_file = '/etc/ssl/certs/ssl-cert-snakeoil.pem' -#ssl_crl_file = '' -#ssl_crl_dir = '' -ssl_key_file = '/etc/ssl/private/ssl-cert-snakeoil.key' -#ssl_ciphers = 'HIGH:MEDIUM:+3DES:!aNULL' # allowed SSL ciphers -#ssl_prefer_server_ciphers = on -#ssl_ecdh_curve = 'prime256v1' -#ssl_min_protocol_version = 'TLSv1.2' -#ssl_max_protocol_version = '' -#ssl_dh_params_file = '' -#ssl_passphrase_command = '' -#ssl_passphrase_command_supports_reload = off - - -#------------------------------------------------------------------------------ -# RESOURCE USAGE (except WAL) -#------------------------------------------------------------------------------ - -# - Memory - - -shared_buffers = 1GB # min 128kB - # (change requires restart) -#huge_pages = try # on, off, or try - # (change requires restart) -#huge_page_size = 0 # zero for system default - # (change requires restart) -#temp_buffers = 8MB # min 800kB -#max_prepared_transactions = 0 # zero disables the feature - # (change requires restart) -# Caution: it is not advisable to set max_prepared_transactions nonzero unless -# you actively intend to use prepared transactions. -#work_mem = 4MB # min 64kB -#hash_mem_multiplier = 1.0 # 1-1000.0 multiplier on hash table work_mem -#maintenance_work_mem = 64MB # min 1MB -#autovacuum_work_mem = -1 # min 1MB, or -1 to use maintenance_work_mem -#logical_decoding_work_mem = 64MB # min 64kB -#max_stack_depth = 2MB # min 100kB -#shared_memory_type = mmap # the default is the first option - # supported by the operating system: - # mmap - # sysv - # windows - # (change requires restart) -dynamic_shared_memory_type = posix # the default is the first option - # supported by the operating system: - # posix - # sysv - # windows - # mmap - # (change requires restart) -#min_dynamic_shared_memory = 0MB # (change requires restart) - -# - Disk - - -#temp_file_limit = -1 # limits per-process temp file space - # in kilobytes, or -1 for no limit - -# - Kernel Resources - - -#max_files_per_process = 1000 # min 64 - # (change requires restart) - -# - Cost-Based Vacuum Delay - - -#vacuum_cost_delay = 0 # 0-100 milliseconds (0 disables) -#vacuum_cost_page_hit = 1 # 0-10000 credits -#vacuum_cost_page_miss = 2 # 0-10000 credits -#vacuum_cost_page_dirty = 20 # 0-10000 credits -#vacuum_cost_limit = 200 # 1-10000 credits - -# - Background Writer - - -#bgwriter_delay = 200ms # 10-10000ms between rounds -#bgwriter_lru_maxpages = 100 # max buffers written/round, 0 disables -#bgwriter_lru_multiplier = 2.0 # 0-10.0 multiplier on buffers scanned/round -#bgwriter_flush_after = 512kB # measured in pages, 0 disables - -# - Asynchronous Behavior - - -#backend_flush_after = 0 # measured in pages, 0 disables -effective_io_concurrency = 1000 # 1-1000; 0 disables prefetching -#maintenance_io_concurrency = 10 # 1-1000; 0 disables prefetching -#max_worker_processes = 8 # (change requires restart) -#max_parallel_workers_per_gather = 2 # taken from max_parallel_workers -#max_parallel_maintenance_workers = 2 # taken from max_parallel_workers -#max_parallel_workers = 8 # maximum number of max_worker_processes that - # can be used in parallel operations -#parallel_leader_participation = on -#old_snapshot_threshold = -1 # 1min-60d; -1 disables; 0 is immediate - # (change requires restart) - - -#------------------------------------------------------------------------------ -# WRITE-AHEAD LOG -#------------------------------------------------------------------------------ - -# - Settings - - -wal_level = minimal # minimal, replica, or logical - # (change requires restart) -fsync = off # flush data to disk for crash safety - # (turning this off can cause - # unrecoverable data corruption) -synchronous_commit = off # synchronization level; - # off, local, remote_write, remote_apply, or on -#wal_sync_method = fsync # the default is the first option - # supported by the operating system: - # open_datasync - # fdatasync (default on Linux and FreeBSD) - # fsync - # fsync_writethrough - # open_sync -full_page_writes = off # recover from partial page writes -#wal_log_hints = off # also do full page writes of non-critical updates - # (change requires restart) -#wal_compression = off # enable compression of full-page writes -#wal_init_zero = on # zero-fill new WAL files -#wal_recycle = on # recycle WAL files -#wal_buffers = -1 # min 32kB, -1 sets based on shared_buffers - # (change requires restart) -#wal_writer_delay = 200ms # 1-10000 milliseconds -#wal_writer_flush_after = 1MB # measured in pages, 0 disables -#wal_skip_threshold = 2MB - -#commit_delay = 0 # range 0-100000, in microseconds -#commit_siblings = 5 # range 1-1000 - -# - Checkpoints - - -#checkpoint_timeout = 5min # range 30s-1d -#checkpoint_completion_target = 0.9 # checkpoint target duration, 0.0 - 1.0 -#checkpoint_flush_after = 256kB # measured in pages, 0 disables -#checkpoint_warning = 30s # 0 disables -max_wal_size = 1GB -min_wal_size = 80MB - -# - Archiving - - -#archive_mode = off # enables archiving; off, on, or always - # (change requires restart) -#archive_command = '' # command to use to archive a logfile segment - # placeholders: %p = path of file to archive - # %f = file name only - # e.g. 'test ! -f /mnt/server/archivedir/%f && cp %p /mnt/server/archivedir/%f' -#archive_timeout = 0 # force a logfile segment switch after this - # number of seconds; 0 disables - -# - Archive Recovery - - -# These are only used in recovery mode. - -#restore_command = '' # command to use to restore an archived logfile segment - # placeholders: %p = path of file to restore - # %f = file name only - # e.g. 'cp /mnt/server/archivedir/%f %p' -#archive_cleanup_command = '' # command to execute at every restartpoint -#recovery_end_command = '' # command to execute at completion of recovery - -# - Recovery Target - - -# Set these only when performing a targeted recovery. - -#recovery_target = '' # 'immediate' to end recovery as soon as a - # consistent state is reached - # (change requires restart) -#recovery_target_name = '' # the named restore point to which recovery will proceed - # (change requires restart) -#recovery_target_time = '' # the time stamp up to which recovery will proceed - # (change requires restart) -#recovery_target_xid = '' # the transaction ID up to which recovery will proceed - # (change requires restart) -#recovery_target_lsn = '' # the WAL LSN up to which recovery will proceed - # (change requires restart) -#recovery_target_inclusive = on # Specifies whether to stop: - # just after the specified recovery target (on) - # just before the recovery target (off) - # (change requires restart) -#recovery_target_timeline = 'latest' # 'current', 'latest', or timeline ID - # (change requires restart) -#recovery_target_action = 'pause' # 'pause', 'promote', 'shutdown' - # (change requires restart) - - -#------------------------------------------------------------------------------ -# REPLICATION -#------------------------------------------------------------------------------ - -# - Sending Servers - - -# Set these on the primary and on any standby that will send replication data. - -max_wal_senders = 0 # max number of walsender processes - # (change requires restart) -#max_replication_slots = 10 # max number of replication slots - # (change requires restart) -#wal_keep_size = 0 # in megabytes; 0 disables -#max_slot_wal_keep_size = -1 # in megabytes; -1 disables -#wal_sender_timeout = 60s # in milliseconds; 0 disables -#track_commit_timestamp = off # collect timestamp of transaction commit - # (change requires restart) - -# - Primary Server - - -# These settings are ignored on a standby server. - -#synchronous_standby_names = '' # standby servers that provide sync rep - # method to choose sync standbys, number of sync standbys, - # and comma-separated list of application_name - # from standby(s); '*' = all -#vacuum_defer_cleanup_age = 0 # number of xacts by which cleanup is delayed - -# - Standby Servers - - -# These settings are ignored on a primary server. - -#primary_conninfo = '' # connection string to sending server -#primary_slot_name = '' # replication slot on sending server -#promote_trigger_file = '' # file name whose presence ends recovery -#hot_standby = on # "off" disallows queries during recovery - # (change requires restart) -#max_standby_archive_delay = 30s # max delay before canceling queries - # when reading WAL from archive; - # -1 allows indefinite delay -#max_standby_streaming_delay = 30s # max delay before canceling queries - # when reading streaming WAL; - # -1 allows indefinite delay -#wal_receiver_create_temp_slot = off # create temp slot if primary_slot_name - # is not set -#wal_receiver_status_interval = 10s # send replies at least this often - # 0 disables -#hot_standby_feedback = off # send info from standby to prevent - # query conflicts -#wal_receiver_timeout = 60s # time that receiver waits for - # communication from primary - # in milliseconds; 0 disables -#wal_retrieve_retry_interval = 5s # time to wait before retrying to - # retrieve WAL after a failed attempt -#recovery_min_apply_delay = 0 # minimum delay for applying changes during recovery - -# - Subscribers - - -# These settings are ignored on a publisher. - -#max_logical_replication_workers = 4 # taken from max_worker_processes - # (change requires restart) -#max_sync_workers_per_subscription = 2 # taken from max_logical_replication_workers - - -#------------------------------------------------------------------------------ -# QUERY TUNING -#------------------------------------------------------------------------------ - -# - Planner Method Configuration - - -#enable_async_append = on -#enable_bitmapscan = on -#enable_gathermerge = on -#enable_hashagg = on -#enable_hashjoin = on -#enable_incremental_sort = on -#enable_indexscan = on -#enable_indexonlyscan = on -#enable_material = on -#enable_memoize = on -#enable_mergejoin = on -#enable_nestloop = on -#enable_parallel_append = on -#enable_parallel_hash = on -#enable_partition_pruning = on -#enable_partitionwise_join = off -#enable_partitionwise_aggregate = off -#enable_seqscan = on -#enable_sort = on -#enable_tidscan = on - -# - Planner Cost Constants - - -#seq_page_cost = 1.0 # measured on an arbitrary scale -#random_page_cost = 4.0 # same scale as above -#cpu_tuple_cost = 0.01 # same scale as above -#cpu_index_tuple_cost = 0.005 # same scale as above -#cpu_operator_cost = 0.0025 # same scale as above -#parallel_setup_cost = 1000.0 # same scale as above -#parallel_tuple_cost = 0.1 # same scale as above -#min_parallel_table_scan_size = 8MB -#min_parallel_index_scan_size = 512kB -#effective_cache_size = 4GB - -#jit_above_cost = 100000 # perform JIT compilation if available - # and query more expensive than this; - # -1 disables -#jit_inline_above_cost = 500000 # inline small functions if query is - # more expensive than this; -1 disables -#jit_optimize_above_cost = 500000 # use expensive JIT optimizations if - # query is more expensive than this; - # -1 disables - -# - Genetic Query Optimizer - - -#geqo = on -#geqo_threshold = 12 -#geqo_effort = 5 # range 1-10 -#geqo_pool_size = 0 # selects default based on effort -#geqo_generations = 0 # selects default based on effort -#geqo_selection_bias = 2.0 # range 1.5-2.0 -#geqo_seed = 0.0 # range 0.0-1.0 - -# - Other Planner Options - - -#default_statistics_target = 100 # range 1-10000 -#constraint_exclusion = partition # on, off, or partition -#cursor_tuple_fraction = 0.1 # range 0.0-1.0 -#from_collapse_limit = 8 -#jit = on # allow JIT compilation -#join_collapse_limit = 8 # 1 disables collapsing of explicit - # JOIN clauses -#plan_cache_mode = auto # auto, force_generic_plan or - # force_custom_plan - - -#------------------------------------------------------------------------------ -# REPORTING AND LOGGING -#------------------------------------------------------------------------------ - -# - Where to Log - - -#log_destination = 'stderr' # Valid values are combinations of - # stderr, csvlog, syslog, and eventlog, - # depending on platform. csvlog - # requires logging_collector to be on. - -# This is used when logging to stderr: -#logging_collector = off # Enable capturing of stderr and csvlog - # into log files. Required to be on for - # csvlogs. - # (change requires restart) - -# These are only used if logging_collector is on: -#log_directory = 'log' # directory where log files are written, - # can be absolute or relative to PGDATA -#log_filename = 'postgresql-%Y-%m-%d_%H%M%S.log' # log file name pattern, - # can include strftime() escapes -#log_file_mode = 0600 # creation mode for log files, - # begin with 0 to use octal notation -#log_rotation_age = 1d # Automatic rotation of logfiles will - # happen after that time. 0 disables. -#log_rotation_size = 10MB # Automatic rotation of logfiles will - # happen after that much log output. - # 0 disables. -#log_truncate_on_rotation = off # If on, an existing log file with the - # same name as the new log file will be - # truncated rather than appended to. - # But such truncation only occurs on - # time-driven rotation, not on restarts - # or size-driven rotation. Default is - # off, meaning append to existing files - # in all cases. - -# These are relevant when logging to syslog: -#syslog_facility = 'LOCAL0' -#syslog_ident = 'postgres' -#syslog_sequence_numbers = on -#syslog_split_messages = on - -# This is only relevant when logging to eventlog (Windows): -# (change requires restart) -#event_source = 'PostgreSQL' - -# - When to Log - - -#log_min_messages = warning # values in order of decreasing detail: - # debug5 - # debug4 - # debug3 - # debug2 - # debug1 - # info - # notice - # warning - # error - # log - # fatal - # panic - -#log_min_error_statement = error # values in order of decreasing detail: - # debug5 - # debug4 - # debug3 - # debug2 - # debug1 - # info - # notice - # warning - # error - # log - # fatal - # panic (effectively off) - -#log_min_duration_statement = -1 # -1 is disabled, 0 logs all statements - # and their durations, > 0 logs only - # statements running at least this number - # of milliseconds - -#log_min_duration_sample = -1 # -1 is disabled, 0 logs a sample of statements - # and their durations, > 0 logs only a sample of - # statements running at least this number - # of milliseconds; - # sample fraction is determined by log_statement_sample_rate - -#log_statement_sample_rate = 1.0 # fraction of logged statements exceeding - # log_min_duration_sample to be logged; - # 1.0 logs all such statements, 0.0 never logs - - -#log_transaction_sample_rate = 0.0 # fraction of transactions whose statements - # are logged regardless of their duration; 1.0 logs all - # statements from all transactions, 0.0 never logs - -# - What to Log - - -#debug_print_parse = off -#debug_print_rewritten = off -#debug_print_plan = off -#debug_pretty_print = on -#log_autovacuum_min_duration = -1 # log autovacuum activity; - # -1 disables, 0 logs all actions and - # their durations, > 0 logs only - # actions running at least this number - # of milliseconds. -#log_checkpoints = off -#log_connections = off -#log_disconnections = off -#log_duration = off -#log_error_verbosity = default # terse, default, or verbose messages -#log_hostname = off -log_line_prefix = '%m [%p] %q%u@%d ' # special values: - # %a = application name - # %u = user name - # %d = database name - # %r = remote host and port - # %h = remote host - # %b = backend type - # %p = process ID - # %P = process ID of parallel group leader - # %t = timestamp without milliseconds - # %m = timestamp with milliseconds - # %n = timestamp with milliseconds (as a Unix epoch) - # %Q = query ID (0 if none or not computed) - # %i = command tag - # %e = SQL state - # %c = session ID - # %l = session line number - # %s = session start timestamp - # %v = virtual transaction ID - # %x = transaction ID (0 if none) - # %q = stop here in non-session - # processes - # %% = '%' - # e.g. '<%u%%%d> ' -#log_lock_waits = off # log lock waits >= deadlock_timeout -#log_recovery_conflict_waits = off # log standby recovery conflict waits - # >= deadlock_timeout -#log_parameter_max_length = -1 # when logging statements, limit logged - # bind-parameter values to N bytes; - # -1 means print in full, 0 disables -#log_parameter_max_length_on_error = 0 # when logging an error, limit logged - # bind-parameter values to N bytes; - # -1 means print in full, 0 disables -#log_statement = 'none' # none, ddl, mod, all -#log_replication_commands = off -#log_temp_files = -1 # log temporary files equal or larger - # than the specified size in kilobytes; - # -1 disables, 0 logs all temp files -log_timezone = 'Etc/UTC' - - -#------------------------------------------------------------------------------ -# PROCESS TITLE -#------------------------------------------------------------------------------ - -cluster_name = '14/main' # added to process titles if nonempty - # (change requires restart) -#update_process_title = on - - -#------------------------------------------------------------------------------ -# STATISTICS -#------------------------------------------------------------------------------ - -# - Query and Index Statistics Collector - - -#track_activities = on -#track_activity_query_size = 1024 # (change requires restart) -#track_counts = on -#track_io_timing = off -#track_wal_io_timing = off -#track_functions = none # none, pl, all -stats_temp_directory = '/var/run/postgresql/14-main.pg_stat_tmp' - - -# - Monitoring - - -#compute_query_id = auto -#log_statement_stats = off -#log_parser_stats = off -#log_planner_stats = off -#log_executor_stats = off - - -#------------------------------------------------------------------------------ -# AUTOVACUUM -#------------------------------------------------------------------------------ - -#autovacuum = on # Enable autovacuum subprocess? 'on' - # requires track_counts to also be on. -#autovacuum_max_workers = 3 # max number of autovacuum subprocesses - # (change requires restart) -#autovacuum_naptime = 1min # time between autovacuum runs -#autovacuum_vacuum_threshold = 50 # min number of row updates before - # vacuum -#autovacuum_vacuum_insert_threshold = 1000 # min number of row inserts - # before vacuum; -1 disables insert - # vacuums -#autovacuum_analyze_threshold = 50 # min number of row updates before - # analyze -#autovacuum_vacuum_scale_factor = 0.2 # fraction of table size before vacuum -#autovacuum_vacuum_insert_scale_factor = 0.2 # fraction of inserts over table - # size before insert vacuum -#autovacuum_analyze_scale_factor = 0.1 # fraction of table size before analyze -#autovacuum_freeze_max_age = 200000000 # maximum XID age before forced vacuum - # (change requires restart) -#autovacuum_multixact_freeze_max_age = 400000000 # maximum multixact age - # before forced vacuum - # (change requires restart) -#autovacuum_vacuum_cost_delay = 2ms # default vacuum cost delay for - # autovacuum, in milliseconds; - # -1 means use vacuum_cost_delay -#autovacuum_vacuum_cost_limit = -1 # default vacuum cost limit for - # autovacuum, -1 means use - # vacuum_cost_limit - - -#------------------------------------------------------------------------------ -# CLIENT CONNECTION DEFAULTS -#------------------------------------------------------------------------------ - -# - Statement Behavior - - -#client_min_messages = notice # values in order of decreasing detail: - # debug5 - # debug4 - # debug3 - # debug2 - # debug1 - # log - # notice - # warning - # error -#search_path = '"$user", public' # schema names -#row_security = on -#default_table_access_method = 'heap' -#default_tablespace = '' # a tablespace name, '' uses the default -#default_toast_compression = 'pglz' # 'pglz' or 'lz4' -#temp_tablespaces = '' # a list of tablespace names, '' uses - # only default tablespace -#check_function_bodies = on -#default_transaction_isolation = 'read committed' -#default_transaction_read_only = off -#default_transaction_deferrable = off -#session_replication_role = 'origin' -#statement_timeout = 0 # in milliseconds, 0 is disabled -#lock_timeout = 0 # in milliseconds, 0 is disabled -#idle_in_transaction_session_timeout = 0 # in milliseconds, 0 is disabled -#idle_session_timeout = 0 # in milliseconds, 0 is disabled -#vacuum_freeze_table_age = 150000000 -#vacuum_freeze_min_age = 50000000 -#vacuum_failsafe_age = 1600000000 -#vacuum_multixact_freeze_table_age = 150000000 -#vacuum_multixact_freeze_min_age = 5000000 -#vacuum_multixact_failsafe_age = 1600000000 -#bytea_output = 'hex' # hex, escape -#xmlbinary = 'base64' -#xmloption = 'content' -#gin_pending_list_limit = 4MB - -# - Locale and Formatting - - -datestyle = 'iso, mdy' -#intervalstyle = 'postgres' -timezone = 'Etc/UTC' -#timezone_abbreviations = 'Default' # Select the set of available time zone - # abbreviations. Currently, there are - # Default - # Australia (historical usage) - # India - # You can create your own file in - # share/timezonesets/. -#extra_float_digits = 1 # min -15, max 3; any value >0 actually - # selects precise output mode -#client_encoding = sql_ascii # actually, defaults to database - # encoding - -# These settings are initialized by initdb, but they can be changed. -lc_messages = 'C.UTF-8' # locale for system error message - # strings -lc_monetary = 'C.UTF-8' # locale for monetary formatting -lc_numeric = 'C.UTF-8' # locale for number formatting -lc_time = 'C.UTF-8' # locale for time formatting - -# default configuration for text search -default_text_search_config = 'pg_catalog.english' - -# - Shared Library Preloading - - -#local_preload_libraries = '' -#session_preload_libraries = '' -#shared_preload_libraries = '' # (change requires restart) -#jit_provider = 'llvmjit' # JIT library to use - -# - Other Defaults - - -#dynamic_library_path = '$libdir' -#extension_destdir = '' # prepend path when loading extensions - # and shared objects (added by Debian) -#gin_fuzzy_search_limit = 0 - - -#------------------------------------------------------------------------------ -# LOCK MANAGEMENT -#------------------------------------------------------------------------------ - -#deadlock_timeout = 1s -#max_locks_per_transaction = 64 # min 10 - # (change requires restart) -#max_pred_locks_per_transaction = 64 # min 10 - # (change requires restart) -#max_pred_locks_per_relation = -2 # negative values mean - # (max_pred_locks_per_transaction - # / -max_pred_locks_per_relation) - 1 -#max_pred_locks_per_page = 2 # min 0 - - -#------------------------------------------------------------------------------ -# VERSION AND PLATFORM COMPATIBILITY -#------------------------------------------------------------------------------ - -# - Previous PostgreSQL Versions - - -#array_nulls = on -#backslash_quote = safe_encoding # on, off, or safe_encoding -#escape_string_warning = on -#lo_compat_privileges = off -#quote_all_identifiers = off -#standard_conforming_strings = on -#synchronize_seqscans = on - -# - Other Platforms and Clients - - -#transform_null_equals = off - - -#------------------------------------------------------------------------------ -# ERROR HANDLING -#------------------------------------------------------------------------------ - -#exit_on_error = off # terminate session on any error? -#restart_after_crash = on # reinitialize after backend crash? -#data_sync_retry = off # retry or panic on failure to fsync - # data? - # (change requires restart) -#recovery_init_sync_method = fsync # fsync, syncfs (Linux 5.8+) - - -#------------------------------------------------------------------------------ -# CONFIG FILE INCLUDES -#------------------------------------------------------------------------------ - -# These options allow settings to be loaded from files other than the -# default postgresql.conf. Note that these are directives, not variable -# assignments, so they can usefully be given more than once. - -include_dir = 'conf.d' # include files ending in '.conf' from - # a directory, e.g., 'conf.d' -#include_if_exists = '...' # include file only if it exists -#include = '...' # include file - - -#------------------------------------------------------------------------------ -# CUSTOMIZED OPTIONS -#------------------------------------------------------------------------------ - -# Add settings for extensions here \ No newline at end of file diff --git a/accountsdb-plugin-postgres/src/accounts_selector.rs b/accountsdb-plugin-postgres/src/accounts_selector.rs deleted file mode 100644 index 77398b76c13e38..00000000000000 --- a/accountsdb-plugin-postgres/src/accounts_selector.rs +++ /dev/null @@ -1,74 +0,0 @@ -use {log::*, std::collections::HashSet}; - -#[derive(Debug)] -pub(crate) struct AccountsSelector { - pub accounts: HashSet>, - pub owners: HashSet>, - pub select_all_accounts: bool, -} - -impl AccountsSelector { - pub fn default() -> Self { - AccountsSelector { - accounts: HashSet::default(), - owners: HashSet::default(), - select_all_accounts: true, - } - } - - pub fn new(accounts: &[String], owners: &[String]) -> Self { - info!( - "Creating AccountsSelector from accounts: {:?}, owners: {:?}", - accounts, owners - ); - - let select_all_accounts = accounts.iter().any(|key| key == "*"); - if select_all_accounts { - return AccountsSelector { - accounts: HashSet::default(), - owners: HashSet::default(), - select_all_accounts, - }; - } - let accounts = accounts - .iter() - .map(|key| bs58::decode(key).into_vec().unwrap()) - .collect(); - let owners = owners - .iter() - .map(|key| bs58::decode(key).into_vec().unwrap()) - .collect(); - AccountsSelector { - accounts, - owners, - select_all_accounts, - } - } - - pub fn is_account_selected(&self, account: &[u8], owner: &[u8]) -> bool { - self.select_all_accounts || self.accounts.contains(account) || self.owners.contains(owner) - } - - /// Check if any account is of interested at all - pub fn is_enabled(&self) -> bool { - self.select_all_accounts || !self.accounts.is_empty() || !self.owners.is_empty() - } -} - -#[cfg(test)] -pub(crate) mod tests { - use super::*; - - #[test] - fn test_create_accounts_selector() { - AccountsSelector::new( - &["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_string()], - &[], - ); - - AccountsSelector::new( - &[], - &["9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin".to_string()], - ); - } -} diff --git a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs b/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs deleted file mode 100644 index 45bb441bd9100d..00000000000000 --- a/accountsdb-plugin-postgres/src/accountsdb_plugin_postgres.rs +++ /dev/null @@ -1,466 +0,0 @@ -use solana_measure::measure::Measure; -/// Main entry for the PostgreSQL plugin -use { - crate::{ - accounts_selector::AccountsSelector, - postgres_client::{ParallelPostgresClient, PostgresClientBuilder}, - transaction_selector::TransactionSelector, - }, - bs58, - log::*, - serde_derive::{Deserialize, Serialize}, - serde_json, - solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ - AccountsDbPlugin, AccountsDbPluginError, ReplicaAccountInfoVersions, - ReplicaBlockInfoVersions, ReplicaTransactionInfoVersions, Result, SlotStatus, - }, - solana_metrics::*, - std::{fs::File, io::Read}, - thiserror::Error, -}; - -#[derive(Default)] -pub struct AccountsDbPluginPostgres { - client: Option, - accounts_selector: Option, - transaction_selector: Option, -} - -impl std::fmt::Debug for AccountsDbPluginPostgres { - fn fmt(&self, _: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - Ok(()) - } -} - -#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)] -pub struct AccountsDbPluginPostgresConfig { - pub host: Option, - pub user: Option, - pub port: Option, - pub connection_str: Option, - pub threads: Option, - pub batch_size: Option, - pub panic_on_db_errors: Option, - /// Indicates if to store historical data for accounts - pub store_account_historical_data: Option, -} - -#[derive(Error, Debug)] -pub enum AccountsDbPluginPostgresError { - #[error("Error connecting to the backend data store. Error message: ({msg})")] - DataStoreConnectionError { msg: String }, - - #[error("Error preparing data store schema. Error message: ({msg})")] - DataSchemaError { msg: String }, - - #[error("Error preparing data store schema. Error message: ({msg})")] - ConfigurationError { msg: String }, -} - -impl AccountsDbPlugin for AccountsDbPluginPostgres { - fn name(&self) -> &'static str { - "AccountsDbPluginPostgres" - } - - /// Do initialization for the PostgreSQL plugin. - /// - /// # Format of the config file: - /// * The `accounts_selector` section allows the user to controls accounts selections. - /// "accounts_selector" : { - /// "accounts" : \["pubkey-1", "pubkey-2", ..., "pubkey-n"\], - /// } - /// or: - /// "accounts_selector" = { - /// "owners" : \["pubkey-1", "pubkey-2", ..., "pubkey-m"\] - /// } - /// Accounts either satisyfing the accounts condition or owners condition will be selected. - /// When only owners is specified, - /// all accounts belonging to the owners will be streamed. - /// The accounts field supports wildcard to select all accounts: - /// "accounts_selector" : { - /// "accounts" : \["*"\], - /// } - /// * "host", optional, specifies the PostgreSQL server. - /// * "user", optional, specifies the PostgreSQL user. - /// * "port", optional, specifies the PostgreSQL server's port. - /// * "connection_str", optional, the custom PostgreSQL connection string. - /// Please refer to https://docs.rs/postgres/0.19.2/postgres/config/struct.Config.html for the connection configuration. - /// When `connection_str` is set, the values in "host", "user" and "port" are ignored. If `connection_str` is not given, - /// `host` and `user` must be given. - /// "store_account_historical_data", optional, set it to 'true', to store historical account data to account_audit - /// table. - /// * "threads" optional, specifies the number of worker threads for the plugin. A thread - /// maintains a PostgreSQL connection to the server. The default is '10'. - /// * "batch_size" optional, specifies the batch size of bulk insert when the AccountsDb is created - /// from restoring a snapshot. The default is '10'. - /// * "panic_on_db_errors", optional, contols if to panic when there are errors replicating data to the - /// PostgreSQL database. The default is 'false'. - /// * "transaction_selector", optional, controls if and what transaction to store. If this field is missing - /// None of the transction is stored. - /// "transaction_selector" : { - /// "mentions" : \["pubkey-1", "pubkey-2", ..., "pubkey-n"\], - /// } - /// The `mentions` field support wildcard to select all transaction or all 'vote' transactions: - /// For example, to select all transactions: - /// "transaction_selector" : { - /// "mentions" : \["*"\], - /// } - /// To select all vote transactions: - /// "transaction_selector" : { - /// "mentions" : \["all_votes"\], - /// } - /// # Examples - /// - /// { - /// "libpath": "/home/solana/target/release/libsolana_accountsdb_plugin_postgres.so", - /// "host": "host_foo", - /// "user": "solana", - /// "threads": 10, - /// "accounts_selector" : { - /// "owners" : ["9oT9R5ZyRovSVnt37QvVoBttGpNqR3J7unkb567NP8k3"] - /// } - /// } - - fn on_load(&mut self, config_file: &str) -> Result<()> { - solana_logger::setup_with_default("info"); - info!( - "Loading plugin {:?} from config_file {:?}", - self.name(), - config_file - ); - let mut file = File::open(config_file)?; - let mut contents = String::new(); - file.read_to_string(&mut contents)?; - - let result: serde_json::Value = serde_json::from_str(&contents).unwrap(); - self.accounts_selector = Some(Self::create_accounts_selector_from_config(&result)); - self.transaction_selector = Some(Self::create_transaction_selector_from_config(&result)); - - let result: serde_json::Result = - serde_json::from_str(&contents); - match result { - Err(err) => { - return Err(AccountsDbPluginError::ConfigFileReadError { - msg: format!( - "The config file is not in the JSON format expected: {:?}", - err - ), - }) - } - Ok(config) => { - let client = PostgresClientBuilder::build_pararallel_postgres_client(&config)?; - self.client = Some(client); - } - } - - Ok(()) - } - - fn on_unload(&mut self) { - info!("Unloading plugin: {:?}", self.name()); - - match &mut self.client { - None => {} - Some(client) => { - client.join().unwrap(); - } - } - } - - fn update_account( - &mut self, - account: ReplicaAccountInfoVersions, - slot: u64, - is_startup: bool, - ) -> Result<()> { - let mut measure_all = Measure::start("accountsdb-plugin-postgres-update-account-main"); - match account { - ReplicaAccountInfoVersions::V0_0_1(account) => { - let mut measure_select = - Measure::start("accountsdb-plugin-postgres-update-account-select"); - if let Some(accounts_selector) = &self.accounts_selector { - if !accounts_selector.is_account_selected(account.pubkey, account.owner) { - return Ok(()); - } - } else { - return Ok(()); - } - measure_select.stop(); - inc_new_counter_debug!( - "accountsdb-plugin-postgres-update-account-select-us", - measure_select.as_us() as usize, - 100000, - 100000 - ); - - debug!( - "Updating account {:?} with owner {:?} at slot {:?} using account selector {:?}", - bs58::encode(account.pubkey).into_string(), - bs58::encode(account.owner).into_string(), - slot, - self.accounts_selector.as_ref().unwrap() - ); - - match &mut self.client { - None => { - return Err(AccountsDbPluginError::Custom(Box::new( - AccountsDbPluginPostgresError::DataStoreConnectionError { - msg: "There is no connection to the PostgreSQL database." - .to_string(), - }, - ))); - } - Some(client) => { - let mut measure_update = - Measure::start("accountsdb-plugin-postgres-update-account-client"); - let result = { client.update_account(account, slot, is_startup) }; - measure_update.stop(); - - inc_new_counter_debug!( - "accountsdb-plugin-postgres-update-account-client-us", - measure_update.as_us() as usize, - 100000, - 100000 - ); - - if let Err(err) = result { - return Err(AccountsDbPluginError::AccountsUpdateError { - msg: format!("Failed to persist the update of account to the PostgreSQL database. Error: {:?}", err) - }); - } - } - } - } - } - - measure_all.stop(); - - inc_new_counter_debug!( - "accountsdb-plugin-postgres-update-account-main-us", - measure_all.as_us() as usize, - 100000, - 100000 - ); - - Ok(()) - } - - fn update_slot_status( - &mut self, - slot: u64, - parent: Option, - status: SlotStatus, - ) -> Result<()> { - info!("Updating slot {:?} at with status {:?}", slot, status); - - match &mut self.client { - None => { - return Err(AccountsDbPluginError::Custom(Box::new( - AccountsDbPluginPostgresError::DataStoreConnectionError { - msg: "There is no connection to the PostgreSQL database.".to_string(), - }, - ))); - } - Some(client) => { - let result = client.update_slot_status(slot, parent, status); - - if let Err(err) = result { - return Err(AccountsDbPluginError::SlotStatusUpdateError{ - msg: format!("Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", err) - }); - } - } - } - - Ok(()) - } - - fn notify_end_of_startup(&mut self) -> Result<()> { - info!("Notifying the end of startup for accounts notifications"); - match &mut self.client { - None => { - return Err(AccountsDbPluginError::Custom(Box::new( - AccountsDbPluginPostgresError::DataStoreConnectionError { - msg: "There is no connection to the PostgreSQL database.".to_string(), - }, - ))); - } - Some(client) => { - let result = client.notify_end_of_startup(); - - if let Err(err) = result { - return Err(AccountsDbPluginError::SlotStatusUpdateError{ - msg: format!("Failed to notify the end of startup for accounts notifications. Error: {:?}", err) - }); - } - } - } - Ok(()) - } - - fn notify_transaction( - &mut self, - transaction_info: ReplicaTransactionInfoVersions, - slot: u64, - ) -> Result<()> { - match &mut self.client { - None => { - return Err(AccountsDbPluginError::Custom(Box::new( - AccountsDbPluginPostgresError::DataStoreConnectionError { - msg: "There is no connection to the PostgreSQL database.".to_string(), - }, - ))); - } - Some(client) => match transaction_info { - ReplicaTransactionInfoVersions::V0_0_1(transaction_info) => { - if let Some(transaction_selector) = &self.transaction_selector { - if !transaction_selector.is_transaction_selected( - transaction_info.is_vote, - transaction_info.transaction.message().account_keys_iter(), - ) { - return Ok(()); - } - } else { - return Ok(()); - } - - let result = client.log_transaction_info(transaction_info, slot); - - if let Err(err) = result { - return Err(AccountsDbPluginError::SlotStatusUpdateError{ - msg: format!("Failed to persist the transaction info to the PostgreSQL database. Error: {:?}", err) - }); - } - } - }, - } - - Ok(()) - } - - fn notify_block_metadata(&mut self, block_info: ReplicaBlockInfoVersions) -> Result<()> { - match &mut self.client { - None => { - return Err(AccountsDbPluginError::Custom(Box::new( - AccountsDbPluginPostgresError::DataStoreConnectionError { - msg: "There is no connection to the PostgreSQL database.".to_string(), - }, - ))); - } - Some(client) => match block_info { - ReplicaBlockInfoVersions::V0_0_1(block_info) => { - let result = client.update_block_metadata(block_info); - - if let Err(err) = result { - return Err(AccountsDbPluginError::SlotStatusUpdateError{ - msg: format!("Failed to persist the update of block metadata to the PostgreSQL database. Error: {:?}", err) - }); - } - } - }, - } - - Ok(()) - } - - /// Check if the plugin is interested in account data - /// Default is true -- if the plugin is not interested in - /// account data, please return false. - fn account_data_notifications_enabled(&self) -> bool { - self.accounts_selector - .as_ref() - .map_or_else(|| false, |selector| selector.is_enabled()) - } - - /// Check if the plugin is interested in transaction data - fn transaction_notifications_enabled(&self) -> bool { - self.transaction_selector - .as_ref() - .map_or_else(|| false, |selector| selector.is_enabled()) - } -} - -impl AccountsDbPluginPostgres { - fn create_accounts_selector_from_config(config: &serde_json::Value) -> AccountsSelector { - let accounts_selector = &config["accounts_selector"]; - - if accounts_selector.is_null() { - AccountsSelector::default() - } else { - let accounts = &accounts_selector["accounts"]; - let accounts: Vec = if accounts.is_array() { - accounts - .as_array() - .unwrap() - .iter() - .map(|val| val.as_str().unwrap().to_string()) - .collect() - } else { - Vec::default() - }; - let owners = &accounts_selector["owners"]; - let owners: Vec = if owners.is_array() { - owners - .as_array() - .unwrap() - .iter() - .map(|val| val.as_str().unwrap().to_string()) - .collect() - } else { - Vec::default() - }; - AccountsSelector::new(&accounts, &owners) - } - } - - fn create_transaction_selector_from_config(config: &serde_json::Value) -> TransactionSelector { - let transaction_selector = &config["transaction_selector"]; - - if transaction_selector.is_null() { - TransactionSelector::default() - } else { - let accounts = &transaction_selector["mentions"]; - let accounts: Vec = if accounts.is_array() { - accounts - .as_array() - .unwrap() - .iter() - .map(|val| val.as_str().unwrap().to_string()) - .collect() - } else { - Vec::default() - }; - TransactionSelector::new(&accounts) - } - } - - pub fn new() -> Self { - Self::default() - } -} - -#[no_mangle] -#[allow(improper_ctypes_definitions)] -/// # Safety -/// -/// This function returns the AccountsDbPluginPostgres pointer as trait AccountsDbPlugin. -pub unsafe extern "C" fn _create_plugin() -> *mut dyn AccountsDbPlugin { - let plugin = AccountsDbPluginPostgres::new(); - let plugin: Box = Box::new(plugin); - Box::into_raw(plugin) -} - -#[cfg(test)] -pub(crate) mod tests { - use {super::*, serde_json}; - - #[test] - fn test_accounts_selector_from_config() { - let config = "{\"accounts_selector\" : { \ - \"owners\" : [\"9xQeWvG816bUx9EPjHmaT23yvVM2ZWbrrpZb9PusVFin\"] \ - }}"; - - let config: serde_json::Value = serde_json::from_str(config).unwrap(); - AccountsDbPluginPostgres::create_accounts_selector_from_config(&config); - } -} diff --git a/accountsdb-plugin-postgres/src/lib.rs b/accountsdb-plugin-postgres/src/lib.rs deleted file mode 100644 index a2d78c1eefcad2..00000000000000 --- a/accountsdb-plugin-postgres/src/lib.rs +++ /dev/null @@ -1,4 +0,0 @@ -pub mod accounts_selector; -pub mod accountsdb_plugin_postgres; -pub mod postgres_client; -pub mod transaction_selector; diff --git a/accountsdb-plugin-postgres/src/postgres_client.rs b/accountsdb-plugin-postgres/src/postgres_client.rs deleted file mode 100644 index bd6ae5098f0e17..00000000000000 --- a/accountsdb-plugin-postgres/src/postgres_client.rs +++ /dev/null @@ -1,1036 +0,0 @@ -#![allow(clippy::integer_arithmetic)] - -mod postgres_client_block_metadata; -mod postgres_client_transaction; - -/// A concurrent implementation for writing accounts into the PostgreSQL in parallel. -use { - crate::accountsdb_plugin_postgres::{ - AccountsDbPluginPostgresConfig, AccountsDbPluginPostgresError, - }, - chrono::Utc, - crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender}, - log::*, - postgres::{Client, NoTls, Statement}, - postgres_client_block_metadata::DbBlockInfo, - postgres_client_transaction::LogTransactionRequest, - solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ - AccountsDbPluginError, ReplicaAccountInfo, ReplicaBlockInfo, SlotStatus, - }, - solana_measure::measure::Measure, - solana_metrics::*, - solana_sdk::timing::AtomicInterval, - std::{ - sync::{ - atomic::{AtomicBool, AtomicUsize, Ordering}, - Arc, Mutex, - }, - thread::{self, sleep, Builder, JoinHandle}, - time::Duration, - }, - tokio_postgres::types, -}; - -/// The maximum asynchronous requests allowed in the channel to avoid excessive -/// memory usage. The downside -- calls after this threshold is reached can get blocked. -const MAX_ASYNC_REQUESTS: usize = 40960; -const DEFAULT_POSTGRES_PORT: u16 = 5432; -const DEFAULT_THREADS_COUNT: usize = 100; -const DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE: usize = 10; -const ACCOUNT_COLUMN_COUNT: usize = 9; -const DEFAULT_PANIC_ON_DB_ERROR: bool = false; -const DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA: bool = false; - -struct PostgresSqlClientWrapper { - client: Client, - update_account_stmt: Statement, - bulk_account_insert_stmt: Statement, - update_slot_with_parent_stmt: Statement, - update_slot_without_parent_stmt: Statement, - update_transaction_log_stmt: Statement, - update_block_metadata_stmt: Statement, - insert_account_audit_stmt: Option, -} - -pub struct SimplePostgresClient { - batch_size: usize, - pending_account_updates: Vec, - client: Mutex, -} - -struct PostgresClientWorker { - client: SimplePostgresClient, - /// Indicating if accounts notification during startup is done. - is_startup_done: bool, -} - -impl Eq for DbAccountInfo {} - -#[derive(Clone, PartialEq, Debug)] -pub struct DbAccountInfo { - pub pubkey: Vec, - pub lamports: i64, - pub owner: Vec, - pub executable: bool, - pub rent_epoch: i64, - pub data: Vec, - pub slot: i64, - pub write_version: i64, -} - -pub(crate) fn abort() -> ! { - #[cfg(not(test))] - { - // standard error is usually redirected to a log file, cry for help on standard output as - // well - eprintln!("Validator process aborted. The validator log may contain further details"); - std::process::exit(1); - } - - #[cfg(test)] - panic!("process::exit(1) is intercepted for friendly test failure..."); -} - -impl DbAccountInfo { - fn new(account: &T, slot: u64) -> DbAccountInfo { - let data = account.data().to_vec(); - Self { - pubkey: account.pubkey().to_vec(), - lamports: account.lamports() as i64, - owner: account.owner().to_vec(), - executable: account.executable(), - rent_epoch: account.rent_epoch() as i64, - data, - slot: slot as i64, - write_version: account.write_version(), - } - } -} - -pub trait ReadableAccountInfo: Sized { - fn pubkey(&self) -> &[u8]; - fn owner(&self) -> &[u8]; - fn lamports(&self) -> i64; - fn executable(&self) -> bool; - fn rent_epoch(&self) -> i64; - fn data(&self) -> &[u8]; - fn write_version(&self) -> i64; -} - -impl ReadableAccountInfo for DbAccountInfo { - fn pubkey(&self) -> &[u8] { - &self.pubkey - } - - fn owner(&self) -> &[u8] { - &self.owner - } - - fn lamports(&self) -> i64 { - self.lamports - } - - fn executable(&self) -> bool { - self.executable - } - - fn rent_epoch(&self) -> i64 { - self.rent_epoch - } - - fn data(&self) -> &[u8] { - &self.data - } - - fn write_version(&self) -> i64 { - self.write_version - } -} - -impl<'a> ReadableAccountInfo for ReplicaAccountInfo<'a> { - fn pubkey(&self) -> &[u8] { - self.pubkey - } - - fn owner(&self) -> &[u8] { - self.owner - } - - fn lamports(&self) -> i64 { - self.lamports as i64 - } - - fn executable(&self) -> bool { - self.executable - } - - fn rent_epoch(&self) -> i64 { - self.rent_epoch as i64 - } - - fn data(&self) -> &[u8] { - self.data - } - - fn write_version(&self) -> i64 { - self.write_version as i64 - } -} - -pub trait PostgresClient { - fn join(&mut self) -> thread::Result<()> { - Ok(()) - } - - fn update_account( - &mut self, - account: DbAccountInfo, - is_startup: bool, - ) -> Result<(), AccountsDbPluginError>; - - fn update_slot_status( - &mut self, - slot: u64, - parent: Option, - status: SlotStatus, - ) -> Result<(), AccountsDbPluginError>; - - fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError>; - - fn log_transaction( - &mut self, - transaction_log_info: LogTransactionRequest, - ) -> Result<(), AccountsDbPluginError>; - - fn update_block_metadata( - &mut self, - block_info: UpdateBlockMetadataRequest, - ) -> Result<(), AccountsDbPluginError>; -} - -impl SimplePostgresClient { - fn connect_to_db( - config: &AccountsDbPluginPostgresConfig, - ) -> Result { - let port = config.port.unwrap_or(DEFAULT_POSTGRES_PORT); - - let connection_str = if let Some(connection_str) = &config.connection_str { - connection_str.clone() - } else { - if config.host.is_none() || config.user.is_none() { - let msg = format!( - "\"connection_str\": {:?}, or \"host\": {:?} \"user\": {:?} must be specified", - config.connection_str, config.host, config.user - ); - return Err(AccountsDbPluginError::Custom(Box::new( - AccountsDbPluginPostgresError::ConfigurationError { msg }, - ))); - } - format!( - "host={} user={} port={}", - config.host.as_ref().unwrap(), - config.user.as_ref().unwrap(), - port - ) - }; - - match Client::connect(&connection_str, NoTls) { - Err(err) => { - let msg = format!( - "Error in connecting to the PostgreSQL database: {:?} connection_str: {:?}", - err, connection_str - ); - error!("{}", msg); - Err(AccountsDbPluginError::Custom(Box::new( - AccountsDbPluginPostgresError::DataStoreConnectionError { msg }, - ))) - } - Ok(client) => Ok(client), - } - } - - fn build_bulk_account_insert_statement( - client: &mut Client, - config: &AccountsDbPluginPostgresConfig, - ) -> Result { - let batch_size = config - .batch_size - .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE); - let mut stmt = String::from("INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) VALUES"); - for j in 0..batch_size { - let row = j * ACCOUNT_COLUMN_COUNT; - let val_str = format!( - "(${}, ${}, ${}, ${}, ${}, ${}, ${}, ${}, ${})", - row + 1, - row + 2, - row + 3, - row + 4, - row + 5, - row + 6, - row + 7, - row + 8, - row + 9, - ); - - if j == 0 { - stmt = format!("{} {}", &stmt, val_str); - } else { - stmt = format!("{}, {}", &stmt, val_str); - } - } - - let handle_conflict = "ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \ - data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\ - acct.slot = excluded.slot AND acct.write_version < excluded.write_version)"; - - stmt = format!("{} {}", stmt, handle_conflict); - - info!("{}", stmt); - let bulk_stmt = client.prepare(&stmt); - - match bulk_stmt { - Err(err) => { - return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { - msg: format!( - "Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, config - ), - }))); - } - Ok(update_account_stmt) => Ok(update_account_stmt), - } - } - - fn build_single_account_upsert_statement( - client: &mut Client, - config: &AccountsDbPluginPostgresConfig, - ) -> Result { - let stmt = "INSERT INTO account AS acct (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) \ - ON CONFLICT (pubkey) DO UPDATE SET slot=excluded.slot, owner=excluded.owner, lamports=excluded.lamports, executable=excluded.executable, rent_epoch=excluded.rent_epoch, \ - data=excluded.data, write_version=excluded.write_version, updated_on=excluded.updated_on WHERE acct.slot < excluded.slot OR (\ - acct.slot = excluded.slot AND acct.write_version < excluded.write_version)"; - - let stmt = client.prepare(stmt); - - match stmt { - Err(err) => { - return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { - msg: format!( - "Error in preparing for the accounts update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, config - ), - }))); - } - Ok(update_account_stmt) => Ok(update_account_stmt), - } - } - - fn build_account_audit_insert_statement( - client: &mut Client, - config: &AccountsDbPluginPostgresConfig, - ) -> Result { - let stmt = "INSERT INTO account_audit (pubkey, slot, owner, lamports, executable, rent_epoch, data, write_version, updated_on) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)"; - - let stmt = client.prepare(stmt); - - match stmt { - Err(err) => { - return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { - msg: format!( - "Error in preparing for the account_audit update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, config - ), - }))); - } - Ok(stmt) => Ok(stmt), - } - } - - fn build_slot_upsert_statement_with_parent( - client: &mut Client, - config: &AccountsDbPluginPostgresConfig, - ) -> Result { - let stmt = "INSERT INTO slot (slot, parent, status, updated_on) \ - VALUES ($1, $2, $3, $4) \ - ON CONFLICT (slot) DO UPDATE SET parent=excluded.parent, status=excluded.status, updated_on=excluded.updated_on"; - - let stmt = client.prepare(stmt); - - match stmt { - Err(err) => { - return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { - msg: format!( - "Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, config - ), - }))); - } - Ok(stmt) => Ok(stmt), - } - } - - fn build_slot_upsert_statement_without_parent( - client: &mut Client, - config: &AccountsDbPluginPostgresConfig, - ) -> Result { - let stmt = "INSERT INTO slot (slot, status, updated_on) \ - VALUES ($1, $2, $3) \ - ON CONFLICT (slot) DO UPDATE SET status=excluded.status, updated_on=excluded.updated_on"; - - let stmt = client.prepare(stmt); - - match stmt { - Err(err) => { - return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { - msg: format!( - "Error in preparing for the slot update PostgreSQL database: {} host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, config - ), - }))); - } - Ok(stmt) => Ok(stmt), - } - } - - /// Internal function for inserting an account into account_audit table. - fn insert_account_audit( - account: &DbAccountInfo, - statement: &Statement, - client: &mut Client, - ) -> Result<(), AccountsDbPluginError> { - let lamports = account.lamports() as i64; - let rent_epoch = account.rent_epoch() as i64; - let updated_on = Utc::now().naive_utc(); - let result = client.execute( - statement, - &[ - &account.pubkey(), - &account.slot, - &account.owner(), - &lamports, - &account.executable(), - &rent_epoch, - &account.data(), - &account.write_version(), - &updated_on, - ], - ); - - if let Err(err) = result { - let msg = format!( - "Failed to persist the insert of account_audit to the PostgreSQL database. Error: {:?}", - err - ); - error!("{}", msg); - return Err(AccountsDbPluginError::AccountsUpdateError { msg }); - } - Ok(()) - } - - /// Internal function for updating or inserting a single account - fn upsert_account_internal( - account: &DbAccountInfo, - statement: &Statement, - client: &mut Client, - insert_account_audit_stmt: &Option, - ) -> Result<(), AccountsDbPluginError> { - let lamports = account.lamports() as i64; - let rent_epoch = account.rent_epoch() as i64; - let updated_on = Utc::now().naive_utc(); - let result = client.execute( - statement, - &[ - &account.pubkey(), - &account.slot, - &account.owner(), - &lamports, - &account.executable(), - &rent_epoch, - &account.data(), - &account.write_version(), - &updated_on, - ], - ); - - if let Err(err) = result { - let msg = format!( - "Failed to persist the update of account to the PostgreSQL database. Error: {:?}", - err - ); - error!("{}", msg); - return Err(AccountsDbPluginError::AccountsUpdateError { msg }); - } else if result.unwrap() == 0 && insert_account_audit_stmt.is_some() { - // If no records modified (inserted or updated), it is because the account is updated - // at an older slot, insert the record directly into the account_audit table. - let statement = insert_account_audit_stmt.as_ref().unwrap(); - Self::insert_account_audit(account, statement, client)?; - } - - Ok(()) - } - - /// Update or insert a single account - fn upsert_account(&mut self, account: &DbAccountInfo) -> Result<(), AccountsDbPluginError> { - let client = self.client.get_mut().unwrap(); - let insert_account_audit_stmt = &client.insert_account_audit_stmt; - let statement = &client.update_account_stmt; - let client = &mut client.client; - Self::upsert_account_internal(account, statement, client, insert_account_audit_stmt) - } - - /// Insert accounts in batch to reduce network overhead - fn insert_accounts_in_batch( - &mut self, - account: DbAccountInfo, - ) -> Result<(), AccountsDbPluginError> { - self.pending_account_updates.push(account); - - if self.pending_account_updates.len() == self.batch_size { - let mut measure = Measure::start("accountsdb-plugin-postgres-prepare-values"); - - let mut values: Vec<&(dyn types::ToSql + Sync)> = - Vec::with_capacity(self.batch_size * ACCOUNT_COLUMN_COUNT); - let updated_on = Utc::now().naive_utc(); - for j in 0..self.batch_size { - let account = &self.pending_account_updates[j]; - - values.push(&account.pubkey); - values.push(&account.slot); - values.push(&account.owner); - values.push(&account.lamports); - values.push(&account.executable); - values.push(&account.rent_epoch); - values.push(&account.data); - values.push(&account.write_version); - values.push(&updated_on); - } - measure.stop(); - inc_new_counter_debug!( - "accountsdb-plugin-postgres-prepare-values-us", - measure.as_us() as usize, - 10000, - 10000 - ); - - let mut measure = Measure::start("accountsdb-plugin-postgres-update-account"); - let client = self.client.get_mut().unwrap(); - let result = client - .client - .query(&client.bulk_account_insert_stmt, &values); - - self.pending_account_updates.clear(); - if let Err(err) = result { - let msg = format!( - "Failed to persist the update of account to the PostgreSQL database. Error: {:?}", - err - ); - error!("{}", msg); - return Err(AccountsDbPluginError::AccountsUpdateError { msg }); - } - measure.stop(); - inc_new_counter_debug!( - "accountsdb-plugin-postgres-update-account-us", - measure.as_us() as usize, - 10000, - 10000 - ); - inc_new_counter_debug!( - "accountsdb-plugin-postgres-update-account-count", - self.batch_size, - 10000, - 10000 - ); - } - Ok(()) - } - - /// Flush any left over accounts in batch which are not processed in the last batch - fn flush_buffered_writes(&mut self) -> Result<(), AccountsDbPluginError> { - if self.pending_account_updates.is_empty() { - return Ok(()); - } - - let client = self.client.get_mut().unwrap(); - let insert_account_audit_stmt = &client.insert_account_audit_stmt; - let statement = &client.update_account_stmt; - let client = &mut client.client; - - for account in self.pending_account_updates.drain(..) { - Self::upsert_account_internal(&account, statement, client, insert_account_audit_stmt)?; - } - - Ok(()) - } - - pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result { - info!("Creating SimplePostgresClient..."); - let mut client = Self::connect_to_db(config)?; - let bulk_account_insert_stmt = - Self::build_bulk_account_insert_statement(&mut client, config)?; - let update_account_stmt = Self::build_single_account_upsert_statement(&mut client, config)?; - - let update_slot_with_parent_stmt = - Self::build_slot_upsert_statement_with_parent(&mut client, config)?; - let update_slot_without_parent_stmt = - Self::build_slot_upsert_statement_without_parent(&mut client, config)?; - let update_transaction_log_stmt = - Self::build_transaction_info_upsert_statement(&mut client, config)?; - let update_block_metadata_stmt = - Self::build_block_metadata_upsert_statement(&mut client, config)?; - - let batch_size = config - .batch_size - .unwrap_or(DEFAULT_ACCOUNTS_INSERT_BATCH_SIZE); - - let store_account_historical_data = config - .store_account_historical_data - .unwrap_or(DEFAULT_STORE_ACCOUNT_HISTORICAL_DATA); - - let insert_account_audit_stmt = if store_account_historical_data { - let stmt = Self::build_account_audit_insert_statement(&mut client, config)?; - Some(stmt) - } else { - None - }; - - info!("Created SimplePostgresClient."); - Ok(Self { - batch_size, - pending_account_updates: Vec::with_capacity(batch_size), - client: Mutex::new(PostgresSqlClientWrapper { - client, - update_account_stmt, - bulk_account_insert_stmt, - update_slot_with_parent_stmt, - update_slot_without_parent_stmt, - update_transaction_log_stmt, - update_block_metadata_stmt, - insert_account_audit_stmt, - }), - }) - } -} - -impl PostgresClient for SimplePostgresClient { - fn update_account( - &mut self, - account: DbAccountInfo, - is_startup: bool, - ) -> Result<(), AccountsDbPluginError> { - trace!( - "Updating account {} with owner {} at slot {}", - bs58::encode(account.pubkey()).into_string(), - bs58::encode(account.owner()).into_string(), - account.slot, - ); - if !is_startup { - return self.upsert_account(&account); - } - self.insert_accounts_in_batch(account) - } - - fn update_slot_status( - &mut self, - slot: u64, - parent: Option, - status: SlotStatus, - ) -> Result<(), AccountsDbPluginError> { - info!("Updating slot {:?} at with status {:?}", slot, status); - - let slot = slot as i64; // postgres only supports i64 - let parent = parent.map(|parent| parent as i64); - let updated_on = Utc::now().naive_utc(); - let status_str = status.as_str(); - let client = self.client.get_mut().unwrap(); - - let result = match parent { - Some(parent) => client.client.execute( - &client.update_slot_with_parent_stmt, - &[&slot, &parent, &status_str, &updated_on], - ), - None => client.client.execute( - &client.update_slot_without_parent_stmt, - &[&slot, &status_str, &updated_on], - ), - }; - - match result { - Err(err) => { - let msg = format!( - "Failed to persist the update of slot to the PostgreSQL database. Error: {:?}", - err - ); - error!("{:?}", msg); - return Err(AccountsDbPluginError::SlotStatusUpdateError { msg }); - } - Ok(rows) => { - assert_eq!(1, rows, "Expected one rows to be updated a time"); - } - } - - Ok(()) - } - - fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> { - self.flush_buffered_writes() - } - - fn log_transaction( - &mut self, - transaction_log_info: LogTransactionRequest, - ) -> Result<(), AccountsDbPluginError> { - self.log_transaction_impl(transaction_log_info) - } - - fn update_block_metadata( - &mut self, - block_info: UpdateBlockMetadataRequest, - ) -> Result<(), AccountsDbPluginError> { - self.update_block_metadata_impl(block_info) - } -} - -struct UpdateAccountRequest { - account: DbAccountInfo, - is_startup: bool, -} - -struct UpdateSlotRequest { - slot: u64, - parent: Option, - slot_status: SlotStatus, -} - -pub struct UpdateBlockMetadataRequest { - pub block_info: DbBlockInfo, -} - -#[warn(clippy::large_enum_variant)] -enum DbWorkItem { - UpdateAccount(Box), - UpdateSlot(Box), - LogTransaction(Box), - UpdateBlockMetadata(Box), -} - -impl PostgresClientWorker { - fn new(config: AccountsDbPluginPostgresConfig) -> Result { - let result = SimplePostgresClient::new(&config); - match result { - Ok(client) => Ok(PostgresClientWorker { - client, - is_startup_done: false, - }), - Err(err) => { - error!("Error in creating SimplePostgresClient: {}", err); - Err(err) - } - } - } - - fn do_work( - &mut self, - receiver: Receiver, - exit_worker: Arc, - is_startup_done: Arc, - startup_done_count: Arc, - panic_on_db_errors: bool, - ) -> Result<(), AccountsDbPluginError> { - while !exit_worker.load(Ordering::Relaxed) { - let mut measure = Measure::start("accountsdb-plugin-postgres-worker-recv"); - let work = receiver.recv_timeout(Duration::from_millis(500)); - measure.stop(); - inc_new_counter_debug!( - "accountsdb-plugin-postgres-worker-recv-us", - measure.as_us() as usize, - 100000, - 100000 - ); - match work { - Ok(work) => match work { - DbWorkItem::UpdateAccount(request) => { - if let Err(err) = self - .client - .update_account(request.account, request.is_startup) - { - error!("Failed to update account: ({})", err); - if panic_on_db_errors { - abort(); - } - } - } - DbWorkItem::UpdateSlot(request) => { - if let Err(err) = self.client.update_slot_status( - request.slot, - request.parent, - request.slot_status, - ) { - error!("Failed to update slot: ({})", err); - if panic_on_db_errors { - abort(); - } - } - } - DbWorkItem::LogTransaction(transaction_log_info) => { - self.client.log_transaction(*transaction_log_info)?; - } - DbWorkItem::UpdateBlockMetadata(block_info) => { - if let Err(err) = self.client.update_block_metadata(*block_info) { - error!("Failed to update block metadata: ({})", err); - if panic_on_db_errors { - abort(); - } - } - } - }, - Err(err) => match err { - RecvTimeoutError::Timeout => { - if !self.is_startup_done && is_startup_done.load(Ordering::Relaxed) { - if let Err(err) = self.client.notify_end_of_startup() { - error!("Error in notifying end of startup: ({})", err); - if panic_on_db_errors { - abort(); - } - } - self.is_startup_done = true; - startup_done_count.fetch_add(1, Ordering::Relaxed); - } - - continue; - } - _ => { - error!("Error in receiving the item {:?}", err); - if panic_on_db_errors { - abort(); - } - break; - } - }, - } - } - Ok(()) - } -} -pub struct ParallelPostgresClient { - workers: Vec>>, - exit_worker: Arc, - is_startup_done: Arc, - startup_done_count: Arc, - initialized_worker_count: Arc, - sender: Sender, - last_report: AtomicInterval, -} - -impl ParallelPostgresClient { - pub fn new(config: &AccountsDbPluginPostgresConfig) -> Result { - info!("Creating ParallelPostgresClient..."); - let (sender, receiver) = bounded(MAX_ASYNC_REQUESTS); - let exit_worker = Arc::new(AtomicBool::new(false)); - let mut workers = Vec::default(); - let is_startup_done = Arc::new(AtomicBool::new(false)); - let startup_done_count = Arc::new(AtomicUsize::new(0)); - let worker_count = config.threads.unwrap_or(DEFAULT_THREADS_COUNT); - let initialized_worker_count = Arc::new(AtomicUsize::new(0)); - for i in 0..worker_count { - let cloned_receiver = receiver.clone(); - let exit_clone = exit_worker.clone(); - let is_startup_done_clone = is_startup_done.clone(); - let startup_done_count_clone = startup_done_count.clone(); - let initialized_worker_count_clone = initialized_worker_count.clone(); - let config = config.clone(); - let worker = Builder::new() - .name(format!("worker-{}", i)) - .spawn(move || -> Result<(), AccountsDbPluginError> { - let panic_on_db_errors = *config - .panic_on_db_errors - .as_ref() - .unwrap_or(&DEFAULT_PANIC_ON_DB_ERROR); - let result = PostgresClientWorker::new(config); - - match result { - Ok(mut worker) => { - initialized_worker_count_clone.fetch_add(1, Ordering::Relaxed); - worker.do_work( - cloned_receiver, - exit_clone, - is_startup_done_clone, - startup_done_count_clone, - panic_on_db_errors, - )?; - Ok(()) - } - Err(err) => { - error!("Error when making connection to database: ({})", err); - if panic_on_db_errors { - abort(); - } - Err(err) - } - } - }) - .unwrap(); - - workers.push(worker); - } - - info!("Created ParallelPostgresClient."); - Ok(Self { - last_report: AtomicInterval::default(), - workers, - exit_worker, - is_startup_done, - startup_done_count, - initialized_worker_count, - sender, - }) - } - - pub fn join(&mut self) -> thread::Result<()> { - self.exit_worker.store(true, Ordering::Relaxed); - while !self.workers.is_empty() { - let worker = self.workers.pop(); - if worker.is_none() { - break; - } - let worker = worker.unwrap(); - let result = worker.join().unwrap(); - if result.is_err() { - error!("The worker thread has failed: {:?}", result); - } - } - - Ok(()) - } - - pub fn update_account( - &mut self, - account: &ReplicaAccountInfo, - slot: u64, - is_startup: bool, - ) -> Result<(), AccountsDbPluginError> { - if self.last_report.should_update(30000) { - datapoint_debug!( - "postgres-plugin-stats", - ("message-queue-length", self.sender.len() as i64, i64), - ); - } - let mut measure = Measure::start("accountsdb-plugin-posgres-create-work-item"); - let wrk_item = DbWorkItem::UpdateAccount(Box::new(UpdateAccountRequest { - account: DbAccountInfo::new(account, slot), - is_startup, - })); - - measure.stop(); - - inc_new_counter_debug!( - "accountsdb-plugin-posgres-create-work-item-us", - measure.as_us() as usize, - 100000, - 100000 - ); - - let mut measure = Measure::start("accountsdb-plugin-posgres-send-msg"); - - if let Err(err) = self.sender.send(wrk_item) { - return Err(AccountsDbPluginError::AccountsUpdateError { - msg: format!( - "Failed to update the account {:?}, error: {:?}", - bs58::encode(account.pubkey()).into_string(), - err - ), - }); - } - - measure.stop(); - inc_new_counter_debug!( - "accountsdb-plugin-posgres-send-msg-us", - measure.as_us() as usize, - 100000, - 100000 - ); - - Ok(()) - } - - pub fn update_slot_status( - &mut self, - slot: u64, - parent: Option, - status: SlotStatus, - ) -> Result<(), AccountsDbPluginError> { - if let Err(err) = self - .sender - .send(DbWorkItem::UpdateSlot(Box::new(UpdateSlotRequest { - slot, - parent, - slot_status: status, - }))) - { - return Err(AccountsDbPluginError::SlotStatusUpdateError { - msg: format!("Failed to update the slot {:?}, error: {:?}", slot, err), - }); - } - Ok(()) - } - - pub fn update_block_metadata( - &mut self, - block_info: &ReplicaBlockInfo, - ) -> Result<(), AccountsDbPluginError> { - if let Err(err) = self.sender.send(DbWorkItem::UpdateBlockMetadata(Box::new( - UpdateBlockMetadataRequest { - block_info: DbBlockInfo::from(block_info), - }, - ))) { - return Err(AccountsDbPluginError::SlotStatusUpdateError { - msg: format!( - "Failed to update the block metadata at slot {:?}, error: {:?}", - block_info.slot, err - ), - }); - } - Ok(()) - } - - pub fn notify_end_of_startup(&mut self) -> Result<(), AccountsDbPluginError> { - info!("Notifying the end of startup"); - // Ensure all items in the queue has been received by the workers - while !self.sender.is_empty() { - sleep(Duration::from_millis(100)); - } - self.is_startup_done.store(true, Ordering::Relaxed); - - // Wait for all worker threads to be done with flushing - while self.startup_done_count.load(Ordering::Relaxed) - != self.initialized_worker_count.load(Ordering::Relaxed) - { - info!( - "Startup done count: {}, good worker thread count: {}", - self.startup_done_count.load(Ordering::Relaxed), - self.initialized_worker_count.load(Ordering::Relaxed) - ); - sleep(Duration::from_millis(100)); - } - - info!("Done with notifying the end of startup"); - Ok(()) - } -} - -pub struct PostgresClientBuilder {} - -impl PostgresClientBuilder { - pub fn build_pararallel_postgres_client( - config: &AccountsDbPluginPostgresConfig, - ) -> Result { - ParallelPostgresClient::new(config) - } - - pub fn build_simple_postgres_client( - config: &AccountsDbPluginPostgresConfig, - ) -> Result { - SimplePostgresClient::new(config) - } -} diff --git a/accountsdb-plugin-postgres/src/postgres_client/postgres_client_block_metadata.rs b/accountsdb-plugin-postgres/src/postgres_client/postgres_client_block_metadata.rs deleted file mode 100644 index a882e6767c7d42..00000000000000 --- a/accountsdb-plugin-postgres/src/postgres_client/postgres_client_block_metadata.rs +++ /dev/null @@ -1,97 +0,0 @@ -use { - crate::{ - accountsdb_plugin_postgres::{ - AccountsDbPluginPostgresConfig, AccountsDbPluginPostgresError, - }, - postgres_client::{ - postgres_client_transaction::DbReward, SimplePostgresClient, UpdateBlockMetadataRequest, - }, - }, - chrono::Utc, - log::*, - postgres::{Client, Statement}, - solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ - AccountsDbPluginError, ReplicaBlockInfo, - }, -}; - -#[derive(Clone, Debug)] -pub struct DbBlockInfo { - pub slot: i64, - pub blockhash: String, - pub rewards: Vec, - pub block_time: Option, - pub block_height: Option, -} - -impl<'a> From<&ReplicaBlockInfo<'a>> for DbBlockInfo { - fn from(block_info: &ReplicaBlockInfo) -> Self { - Self { - slot: block_info.slot as i64, - blockhash: block_info.blockhash.to_string(), - rewards: block_info.rewards.iter().map(DbReward::from).collect(), - block_time: block_info.block_time, - block_height: block_info - .block_height - .map(|block_height| block_height as i64), - } - } -} - -impl SimplePostgresClient { - pub(crate) fn build_block_metadata_upsert_statement( - client: &mut Client, - config: &AccountsDbPluginPostgresConfig, - ) -> Result { - let stmt = - "INSERT INTO block (slot, blockhash, rewards, block_time, block_height, updated_on) \ - VALUES ($1, $2, $3, $4, $5, $6)"; - - let stmt = client.prepare(stmt); - - match stmt { - Err(err) => { - return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { - msg: format!( - "Error in preparing for the block metadata update PostgreSQL database: ({}) host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, config - ), - }))); - } - Ok(stmt) => Ok(stmt), - } - } - - pub(crate) fn update_block_metadata_impl( - &mut self, - block_info: UpdateBlockMetadataRequest, - ) -> Result<(), AccountsDbPluginError> { - let client = self.client.get_mut().unwrap(); - let statement = &client.update_block_metadata_stmt; - let client = &mut client.client; - let updated_on = Utc::now().naive_utc(); - - let block_info = block_info.block_info; - let result = client.query( - statement, - &[ - &block_info.slot, - &block_info.blockhash, - &block_info.rewards, - &block_info.block_time, - &block_info.block_height, - &updated_on, - ], - ); - - if let Err(err) = result { - let msg = format!( - "Failed to persist the update of block metadata to the PostgreSQL database. Error: {:?}", - err); - error!("{}", msg); - return Err(AccountsDbPluginError::AccountsUpdateError { msg }); - } - - Ok(()) - } -} diff --git a/accountsdb-plugin-postgres/src/postgres_client/postgres_client_transaction.rs b/accountsdb-plugin-postgres/src/postgres_client/postgres_client_transaction.rs deleted file mode 100644 index d20c189b53a913..00000000000000 --- a/accountsdb-plugin-postgres/src/postgres_client/postgres_client_transaction.rs +++ /dev/null @@ -1,1374 +0,0 @@ -/// Module responsible for handling persisting transaction data to the PostgreSQL -/// database. -use { - crate::{ - accountsdb_plugin_postgres::{ - AccountsDbPluginPostgresConfig, AccountsDbPluginPostgresError, - }, - postgres_client::{DbWorkItem, ParallelPostgresClient, SimplePostgresClient}, - }, - chrono::Utc, - log::*, - postgres::{Client, Statement}, - postgres_types::ToSql, - solana_accountsdb_plugin_interface::accountsdb_plugin_interface::{ - AccountsDbPluginError, ReplicaTransactionInfo, - }, - solana_runtime::bank::RewardType, - solana_sdk::{ - instruction::CompiledInstruction, - message::{ - v0::{self, LoadedAddresses, MessageAddressTableLookup}, - Message, MessageHeader, SanitizedMessage, - }, - transaction::TransactionError, - }, - solana_transaction_status::{ - InnerInstructions, Reward, TransactionStatusMeta, TransactionTokenBalance, - }, -}; - -const MAX_TRANSACTION_STATUS_LEN: usize = 256; - -#[derive(Clone, Debug, ToSql)] -#[postgres(name = "CompiledInstruction")] -pub struct DbCompiledInstruction { - pub program_id_index: i16, - pub accounts: Vec, - pub data: Vec, -} - -#[derive(Clone, Debug, ToSql)] -#[postgres(name = "InnerInstructions")] -pub struct DbInnerInstructions { - pub index: i16, - pub instructions: Vec, -} - -#[derive(Clone, Debug, ToSql)] -#[postgres(name = "TransactionTokenBalance")] -pub struct DbTransactionTokenBalance { - pub account_index: i16, - pub mint: String, - pub ui_token_amount: Option, - pub owner: String, -} - -#[derive(Clone, Debug, ToSql, PartialEq)] -#[postgres(name = "RewardType")] -pub enum DbRewardType { - Fee, - Rent, - Staking, - Voting, -} - -#[derive(Clone, Debug, ToSql)] -#[postgres(name = "Reward")] -pub struct DbReward { - pub pubkey: String, - pub lamports: i64, - pub post_balance: i64, - pub reward_type: Option, - pub commission: Option, -} - -#[derive(Clone, Debug, ToSql)] -#[postgres(name = "TransactionStatusMeta")] -pub struct DbTransactionStatusMeta { - pub error: Option, - pub fee: i64, - pub pre_balances: Vec, - pub post_balances: Vec, - pub inner_instructions: Option>, - pub log_messages: Option>, - pub pre_token_balances: Option>, - pub post_token_balances: Option>, - pub rewards: Option>, -} - -#[derive(Clone, Debug, ToSql)] -#[postgres(name = "TransactionMessageHeader")] -pub struct DbTransactionMessageHeader { - pub num_required_signatures: i16, - pub num_readonly_signed_accounts: i16, - pub num_readonly_unsigned_accounts: i16, -} - -#[derive(Clone, Debug, ToSql)] -#[postgres(name = "TransactionMessage")] -pub struct DbTransactionMessage { - pub header: DbTransactionMessageHeader, - pub account_keys: Vec>, - pub recent_blockhash: Vec, - pub instructions: Vec, -} - -#[derive(Clone, Debug, ToSql)] -#[postgres(name = "TransactionMessageAddressTableLookup")] -pub struct DbTransactionMessageAddressTableLookup { - pub account_key: Vec, - pub writable_indexes: Vec, - pub readonly_indexes: Vec, -} - -#[derive(Clone, Debug, ToSql)] -#[postgres(name = "TransactionMessageV0")] -pub struct DbTransactionMessageV0 { - pub header: DbTransactionMessageHeader, - pub account_keys: Vec>, - pub recent_blockhash: Vec, - pub instructions: Vec, - pub address_table_lookups: Vec, -} - -#[derive(Clone, Debug, ToSql)] -#[postgres(name = "LoadedAddresses")] -pub struct DbLoadedAddresses { - pub writable: Vec>, - pub readonly: Vec>, -} - -#[derive(Clone, Debug, ToSql)] -#[postgres(name = "LoadedMessageV0")] -pub struct DbLoadedMessageV0 { - pub message: DbTransactionMessageV0, - pub loaded_addresses: DbLoadedAddresses, -} - -pub struct DbTransaction { - pub signature: Vec, - pub is_vote: bool, - pub slot: i64, - pub message_type: i16, - pub legacy_message: Option, - pub v0_loaded_message: Option, - pub message_hash: Vec, - pub meta: DbTransactionStatusMeta, - pub signatures: Vec>, -} - -pub struct LogTransactionRequest { - pub transaction_info: DbTransaction, -} - -impl From<&MessageAddressTableLookup> for DbTransactionMessageAddressTableLookup { - fn from(address_table_lookup: &MessageAddressTableLookup) -> Self { - Self { - account_key: address_table_lookup.account_key.as_ref().to_vec(), - writable_indexes: address_table_lookup - .writable_indexes - .iter() - .map(|idx| *idx as i16) - .collect(), - readonly_indexes: address_table_lookup - .readonly_indexes - .iter() - .map(|idx| *idx as i16) - .collect(), - } - } -} - -impl From<&LoadedAddresses> for DbLoadedAddresses { - fn from(loaded_addresses: &LoadedAddresses) -> Self { - Self { - writable: loaded_addresses - .writable - .iter() - .map(|pubkey| pubkey.as_ref().to_vec()) - .collect(), - readonly: loaded_addresses - .readonly - .iter() - .map(|pubkey| pubkey.as_ref().to_vec()) - .collect(), - } - } -} - -impl From<&MessageHeader> for DbTransactionMessageHeader { - fn from(header: &MessageHeader) -> Self { - Self { - num_required_signatures: header.num_required_signatures as i16, - num_readonly_signed_accounts: header.num_readonly_signed_accounts as i16, - num_readonly_unsigned_accounts: header.num_readonly_unsigned_accounts as i16, - } - } -} - -impl From<&CompiledInstruction> for DbCompiledInstruction { - fn from(instruction: &CompiledInstruction) -> Self { - Self { - program_id_index: instruction.program_id_index as i16, - accounts: instruction - .accounts - .iter() - .map(|account_idx| *account_idx as i16) - .collect(), - data: instruction.data.clone(), - } - } -} - -impl From<&Message> for DbTransactionMessage { - fn from(message: &Message) -> Self { - Self { - header: DbTransactionMessageHeader::from(&message.header), - account_keys: message - .account_keys - .iter() - .map(|key| key.as_ref().to_vec()) - .collect(), - recent_blockhash: message.recent_blockhash.as_ref().to_vec(), - instructions: message - .instructions - .iter() - .map(DbCompiledInstruction::from) - .collect(), - } - } -} - -impl From<&v0::Message> for DbTransactionMessageV0 { - fn from(message: &v0::Message) -> Self { - Self { - header: DbTransactionMessageHeader::from(&message.header), - account_keys: message - .account_keys - .iter() - .map(|key| key.as_ref().to_vec()) - .collect(), - recent_blockhash: message.recent_blockhash.as_ref().to_vec(), - instructions: message - .instructions - .iter() - .map(DbCompiledInstruction::from) - .collect(), - address_table_lookups: message - .address_table_lookups - .iter() - .map(DbTransactionMessageAddressTableLookup::from) - .collect(), - } - } -} - -impl From<&v0::LoadedMessage> for DbLoadedMessageV0 { - fn from(message: &v0::LoadedMessage) -> Self { - Self { - message: DbTransactionMessageV0::from(&message.message), - loaded_addresses: DbLoadedAddresses::from(&message.loaded_addresses), - } - } -} - -impl From<&InnerInstructions> for DbInnerInstructions { - fn from(instructions: &InnerInstructions) -> Self { - Self { - index: instructions.index as i16, - instructions: instructions - .instructions - .iter() - .map(DbCompiledInstruction::from) - .collect(), - } - } -} - -impl From<&RewardType> for DbRewardType { - fn from(reward_type: &RewardType) -> Self { - match reward_type { - RewardType::Fee => Self::Fee, - RewardType::Rent => Self::Rent, - RewardType::Staking => Self::Staking, - RewardType::Voting => Self::Voting, - } - } -} - -fn get_reward_type(reward: &Option) -> Option { - reward.as_ref().map(DbRewardType::from) -} - -impl From<&Reward> for DbReward { - fn from(reward: &Reward) -> Self { - Self { - pubkey: reward.pubkey.clone(), - lamports: reward.lamports as i64, - post_balance: reward.post_balance as i64, - reward_type: get_reward_type(&reward.reward_type), - commission: reward - .commission - .as_ref() - .map(|commission| *commission as i16), - } - } -} - -#[derive(Clone, Debug, ToSql, PartialEq)] -#[postgres(name = "TransactionErrorCode")] -pub enum DbTransactionErrorCode { - AccountInUse, - AccountLoadedTwice, - AccountNotFound, - ProgramAccountNotFound, - InsufficientFundsForFee, - InvalidAccountForFee, - AlreadyProcessed, - BlockhashNotFound, - InstructionError, - CallChainTooDeep, - MissingSignatureForFee, - InvalidAccountIndex, - SignatureFailure, - InvalidProgramForExecution, - SanitizeFailure, - ClusterMaintenance, - AccountBorrowOutstanding, - WouldExceedMaxAccountCostLimit, - WouldExceedMaxBlockCostLimit, - UnsupportedVersion, - InvalidWritableAccount, - WouldExceedMaxAccountDataCostLimit, - TooManyAccountLocks, - AddressLookupTableNotFound, - InvalidAddressLookupTableOwner, - InvalidAddressLookupTableData, - InvalidAddressLookupTableIndex, - InvalidRentPayingAccount, - WouldExceedMaxVoteCostLimit, -} - -impl From<&TransactionError> for DbTransactionErrorCode { - fn from(err: &TransactionError) -> Self { - match err { - TransactionError::AccountInUse => Self::AccountInUse, - TransactionError::AccountLoadedTwice => Self::AccountLoadedTwice, - TransactionError::AccountNotFound => Self::AccountNotFound, - TransactionError::ProgramAccountNotFound => Self::ProgramAccountNotFound, - TransactionError::InsufficientFundsForFee => Self::InsufficientFundsForFee, - TransactionError::InvalidAccountForFee => Self::InvalidAccountForFee, - TransactionError::AlreadyProcessed => Self::AlreadyProcessed, - TransactionError::BlockhashNotFound => Self::BlockhashNotFound, - TransactionError::InstructionError(_idx, _error) => Self::InstructionError, - TransactionError::CallChainTooDeep => Self::CallChainTooDeep, - TransactionError::MissingSignatureForFee => Self::MissingSignatureForFee, - TransactionError::InvalidAccountIndex => Self::InvalidAccountIndex, - TransactionError::SignatureFailure => Self::SignatureFailure, - TransactionError::InvalidProgramForExecution => Self::InvalidProgramForExecution, - TransactionError::SanitizeFailure => Self::SanitizeFailure, - TransactionError::ClusterMaintenance => Self::ClusterMaintenance, - TransactionError::AccountBorrowOutstanding => Self::AccountBorrowOutstanding, - TransactionError::WouldExceedMaxAccountCostLimit => { - Self::WouldExceedMaxAccountCostLimit - } - TransactionError::WouldExceedMaxBlockCostLimit => Self::WouldExceedMaxBlockCostLimit, - TransactionError::WouldExceedMaxVoteCostLimit => Self::WouldExceedMaxVoteCostLimit, - TransactionError::UnsupportedVersion => Self::UnsupportedVersion, - TransactionError::InvalidWritableAccount => Self::InvalidWritableAccount, - TransactionError::WouldExceedMaxAccountDataCostLimit => { - Self::WouldExceedMaxAccountDataCostLimit - } - TransactionError::TooManyAccountLocks => Self::TooManyAccountLocks, - TransactionError::AddressLookupTableNotFound => Self::AddressLookupTableNotFound, - TransactionError::InvalidAddressLookupTableOwner => { - Self::InvalidAddressLookupTableOwner - } - TransactionError::InvalidAddressLookupTableData => Self::InvalidAddressLookupTableData, - TransactionError::InvalidAddressLookupTableIndex => { - Self::InvalidAddressLookupTableIndex - } - TransactionError::InvalidRentPayingAccount => Self::InvalidRentPayingAccount, - } - } -} - -#[derive(Clone, Debug, ToSql, PartialEq)] -#[postgres(name = "TransactionError")] -pub struct DbTransactionError { - error_code: DbTransactionErrorCode, - error_detail: Option, -} - -fn get_transaction_error(result: &Result<(), TransactionError>) -> Option { - if result.is_ok() { - return None; - } - - let error = result.as_ref().err().unwrap(); - Some(DbTransactionError { - error_code: DbTransactionErrorCode::from(error), - error_detail: { - if let TransactionError::InstructionError(idx, instruction_error) = error { - let mut error_detail = format!( - "InstructionError: idx ({}), error: ({})", - idx, instruction_error - ); - if error_detail.len() > MAX_TRANSACTION_STATUS_LEN { - error_detail = error_detail - .to_string() - .split_off(MAX_TRANSACTION_STATUS_LEN); - } - Some(error_detail) - } else { - None - } - }, - }) -} - -impl From<&TransactionTokenBalance> for DbTransactionTokenBalance { - fn from(token_balance: &TransactionTokenBalance) -> Self { - Self { - account_index: token_balance.account_index as i16, - mint: token_balance.mint.clone(), - ui_token_amount: token_balance.ui_token_amount.ui_amount, - owner: token_balance.owner.clone(), - } - } -} - -impl From<&TransactionStatusMeta> for DbTransactionStatusMeta { - fn from(meta: &TransactionStatusMeta) -> Self { - Self { - error: get_transaction_error(&meta.status), - fee: meta.fee as i64, - pre_balances: meta - .pre_balances - .iter() - .map(|balance| *balance as i64) - .collect(), - post_balances: meta - .post_balances - .iter() - .map(|balance| *balance as i64) - .collect(), - inner_instructions: meta - .inner_instructions - .as_ref() - .map(|instructions| instructions.iter().map(DbInnerInstructions::from).collect()), - log_messages: meta.log_messages.clone(), - pre_token_balances: meta.pre_token_balances.as_ref().map(|balances| { - balances - .iter() - .map(DbTransactionTokenBalance::from) - .collect() - }), - post_token_balances: meta.post_token_balances.as_ref().map(|balances| { - balances - .iter() - .map(DbTransactionTokenBalance::from) - .collect() - }), - rewards: meta - .rewards - .as_ref() - .map(|rewards| rewards.iter().map(DbReward::from).collect()), - } - } -} - -fn build_db_transaction(slot: u64, transaction_info: &ReplicaTransactionInfo) -> DbTransaction { - DbTransaction { - signature: transaction_info.signature.as_ref().to_vec(), - is_vote: transaction_info.is_vote, - slot: slot as i64, - message_type: match transaction_info.transaction.message() { - SanitizedMessage::Legacy(_) => 0, - SanitizedMessage::V0(_) => 1, - }, - legacy_message: match transaction_info.transaction.message() { - SanitizedMessage::Legacy(legacy_message) => { - Some(DbTransactionMessage::from(legacy_message)) - } - _ => None, - }, - v0_loaded_message: match transaction_info.transaction.message() { - SanitizedMessage::V0(loaded_message) => Some(DbLoadedMessageV0::from(loaded_message)), - _ => None, - }, - signatures: transaction_info - .transaction - .signatures() - .iter() - .map(|signature| signature.as_ref().to_vec()) - .collect(), - message_hash: transaction_info - .transaction - .message_hash() - .as_ref() - .to_vec(), - meta: DbTransactionStatusMeta::from(transaction_info.transaction_status_meta), - } -} - -impl SimplePostgresClient { - pub(crate) fn build_transaction_info_upsert_statement( - client: &mut Client, - config: &AccountsDbPluginPostgresConfig, - ) -> Result { - let stmt = "INSERT INTO transaction AS txn (signature, is_vote, slot, message_type, legacy_message, \ - v0_loaded_message, signatures, message_hash, meta, updated_on) \ - VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10) \ - ON CONFLICT (slot, signature) DO UPDATE SET is_vote=excluded.is_vote, \ - message_type=excluded.message_type, \ - legacy_message=excluded.legacy_message, \ - v0_loaded_message=excluded.v0_loaded_message, \ - signatures=excluded.signatures, \ - message_hash=excluded.message_hash, \ - meta=excluded.meta, \ - updated_on=excluded.updated_on"; - - let stmt = client.prepare(stmt); - - match stmt { - Err(err) => { - return Err(AccountsDbPluginError::Custom(Box::new(AccountsDbPluginPostgresError::DataSchemaError { - msg: format!( - "Error in preparing for the transaction update PostgreSQL database: ({}) host: {:?} user: {:?} config: {:?}", - err, config.host, config.user, config - ), - }))); - } - Ok(stmt) => Ok(stmt), - } - } - - pub(crate) fn log_transaction_impl( - &mut self, - transaction_log_info: LogTransactionRequest, - ) -> Result<(), AccountsDbPluginError> { - let client = self.client.get_mut().unwrap(); - let statement = &client.update_transaction_log_stmt; - let client = &mut client.client; - let updated_on = Utc::now().naive_utc(); - - let transaction_info = transaction_log_info.transaction_info; - let result = client.query( - statement, - &[ - &transaction_info.signature, - &transaction_info.is_vote, - &transaction_info.slot, - &transaction_info.message_type, - &transaction_info.legacy_message, - &transaction_info.v0_loaded_message, - &transaction_info.signatures, - &transaction_info.message_hash, - &transaction_info.meta, - &updated_on, - ], - ); - - if let Err(err) = result { - let msg = format!( - "Failed to persist the update of transaction info to the PostgreSQL database. Error: {:?}", - err - ); - error!("{}", msg); - return Err(AccountsDbPluginError::AccountsUpdateError { msg }); - } - - Ok(()) - } -} - -impl ParallelPostgresClient { - fn build_transaction_request( - slot: u64, - transaction_info: &ReplicaTransactionInfo, - ) -> LogTransactionRequest { - LogTransactionRequest { - transaction_info: build_db_transaction(slot, transaction_info), - } - } - - pub fn log_transaction_info( - &mut self, - transaction_info: &ReplicaTransactionInfo, - slot: u64, - ) -> Result<(), AccountsDbPluginError> { - let wrk_item = DbWorkItem::LogTransaction(Box::new(Self::build_transaction_request( - slot, - transaction_info, - ))); - - if let Err(err) = self.sender.send(wrk_item) { - return Err(AccountsDbPluginError::SlotStatusUpdateError { - msg: format!("Failed to update the transaction, error: {:?}", err), - }); - } - Ok(()) - } -} - -#[cfg(test)] -pub(crate) mod tests { - use { - super::*, - solana_account_decoder::parse_token::UiTokenAmount, - solana_sdk::{ - hash::Hash, - message::VersionedMessage, - pubkey::Pubkey, - sanitize::Sanitize, - signature::{Keypair, Signature, Signer}, - system_transaction, - transaction::{SanitizedTransaction, Transaction, VersionedTransaction}, - }, - }; - - fn check_compiled_instruction_equality( - compiled_instruction: &CompiledInstruction, - db_compiled_instruction: &DbCompiledInstruction, - ) { - assert_eq!( - compiled_instruction.program_id_index, - db_compiled_instruction.program_id_index as u8 - ); - assert_eq!( - compiled_instruction.accounts.len(), - db_compiled_instruction.accounts.len() - ); - assert_eq!( - compiled_instruction.data.len(), - db_compiled_instruction.data.len() - ); - - for i in 0..compiled_instruction.accounts.len() { - assert_eq!( - compiled_instruction.accounts[i], - db_compiled_instruction.accounts[i] as u8 - ) - } - for i in 0..compiled_instruction.data.len() { - assert_eq!( - compiled_instruction.data[i], - db_compiled_instruction.data[i] as u8 - ) - } - } - - #[test] - fn test_transform_compiled_instruction() { - let compiled_instruction = CompiledInstruction { - program_id_index: 0, - accounts: vec![1, 2, 3], - data: vec![4, 5, 6], - }; - - let db_compiled_instruction = DbCompiledInstruction::from(&compiled_instruction); - check_compiled_instruction_equality(&compiled_instruction, &db_compiled_instruction); - } - - fn check_inner_instructions_equality( - inner_instructions: &InnerInstructions, - db_inner_instructions: &DbInnerInstructions, - ) { - assert_eq!(inner_instructions.index, db_inner_instructions.index as u8); - assert_eq!( - inner_instructions.instructions.len(), - db_inner_instructions.instructions.len() - ); - - for i in 0..inner_instructions.instructions.len() { - check_compiled_instruction_equality( - &inner_instructions.instructions[i], - &db_inner_instructions.instructions[i], - ) - } - } - - #[test] - fn test_transform_inner_instructions() { - let inner_instructions = InnerInstructions { - index: 0, - instructions: vec![ - CompiledInstruction { - program_id_index: 0, - accounts: vec![1, 2, 3], - data: vec![4, 5, 6], - }, - CompiledInstruction { - program_id_index: 1, - accounts: vec![12, 13, 14], - data: vec![24, 25, 26], - }, - ], - }; - - let db_inner_instructions = DbInnerInstructions::from(&inner_instructions); - check_inner_instructions_equality(&inner_instructions, &db_inner_instructions); - } - - fn check_address_table_lookups_equality( - address_table_lookups: &MessageAddressTableLookup, - db_address_table_lookups: &DbTransactionMessageAddressTableLookup, - ) { - assert_eq!( - address_table_lookups.writable_indexes.len(), - db_address_table_lookups.writable_indexes.len() - ); - assert_eq!( - address_table_lookups.readonly_indexes.len(), - db_address_table_lookups.readonly_indexes.len() - ); - - for i in 0..address_table_lookups.writable_indexes.len() { - assert_eq!( - address_table_lookups.writable_indexes[i], - db_address_table_lookups.writable_indexes[i] as u8 - ) - } - for i in 0..address_table_lookups.readonly_indexes.len() { - assert_eq!( - address_table_lookups.readonly_indexes[i], - db_address_table_lookups.readonly_indexes[i] as u8 - ) - } - } - - #[test] - fn test_transform_address_table_lookups() { - let address_table_lookups = MessageAddressTableLookup { - account_key: Pubkey::new_unique(), - writable_indexes: vec![1, 2, 3], - readonly_indexes: vec![4, 5, 6], - }; - - let db_address_table_lookups = - DbTransactionMessageAddressTableLookup::from(&address_table_lookups); - check_address_table_lookups_equality(&address_table_lookups, &db_address_table_lookups); - } - - fn check_reward_equality(reward: &Reward, db_reward: &DbReward) { - assert_eq!(reward.pubkey, db_reward.pubkey); - assert_eq!(reward.lamports, db_reward.lamports); - assert_eq!(reward.post_balance, db_reward.post_balance as u64); - assert_eq!(get_reward_type(&reward.reward_type), db_reward.reward_type); - assert_eq!( - reward.commission, - db_reward - .commission - .as_ref() - .map(|commission| *commission as u8) - ); - } - - #[test] - fn test_transform_reward() { - let reward = Reward { - pubkey: Pubkey::new_unique().to_string(), - lamports: 1234, - post_balance: 45678, - reward_type: Some(RewardType::Fee), - commission: Some(12), - }; - - let db_reward = DbReward::from(&reward); - check_reward_equality(&reward, &db_reward); - } - - fn check_transaction_token_balance_equality( - transaction_token_balance: &TransactionTokenBalance, - db_transaction_token_balance: &DbTransactionTokenBalance, - ) { - assert_eq!( - transaction_token_balance.account_index, - db_transaction_token_balance.account_index as u8 - ); - assert_eq!( - transaction_token_balance.mint, - db_transaction_token_balance.mint - ); - assert_eq!( - transaction_token_balance.ui_token_amount.ui_amount, - db_transaction_token_balance.ui_token_amount - ); - assert_eq!( - transaction_token_balance.owner, - db_transaction_token_balance.owner - ); - } - - #[test] - fn test_transform_transaction_token_balance() { - let transaction_token_balance = TransactionTokenBalance { - account_index: 3, - mint: Pubkey::new_unique().to_string(), - ui_token_amount: UiTokenAmount { - ui_amount: Some(0.42), - decimals: 2, - amount: "42".to_string(), - ui_amount_string: "0.42".to_string(), - }, - owner: Pubkey::new_unique().to_string(), - }; - - let db_transaction_token_balance = - DbTransactionTokenBalance::from(&transaction_token_balance); - - check_transaction_token_balance_equality( - &transaction_token_balance, - &db_transaction_token_balance, - ); - } - - fn check_token_balances( - token_balances: &Option>, - db_token_balances: &Option>, - ) { - assert_eq!( - token_balances - .as_ref() - .map(|token_balances| token_balances.len()), - db_token_balances - .as_ref() - .map(|token_balances| token_balances.len()), - ); - - if token_balances.is_some() { - for i in 0..token_balances.as_ref().unwrap().len() { - check_transaction_token_balance_equality( - &token_balances.as_ref().unwrap()[i], - &db_token_balances.as_ref().unwrap()[i], - ); - } - } - } - - fn check_transaction_status_meta( - transaction_status_meta: &TransactionStatusMeta, - db_transaction_status_meta: &DbTransactionStatusMeta, - ) { - assert_eq!( - get_transaction_error(&transaction_status_meta.status), - db_transaction_status_meta.error - ); - assert_eq!( - transaction_status_meta.fee, - db_transaction_status_meta.fee as u64 - ); - assert_eq!( - transaction_status_meta.pre_balances.len(), - db_transaction_status_meta.pre_balances.len() - ); - - for i in 0..transaction_status_meta.pre_balances.len() { - assert_eq!( - transaction_status_meta.pre_balances[i], - db_transaction_status_meta.pre_balances[i] as u64 - ); - } - assert_eq!( - transaction_status_meta.post_balances.len(), - db_transaction_status_meta.post_balances.len() - ); - for i in 0..transaction_status_meta.post_balances.len() { - assert_eq!( - transaction_status_meta.post_balances[i], - db_transaction_status_meta.post_balances[i] as u64 - ); - } - assert_eq!( - transaction_status_meta - .inner_instructions - .as_ref() - .map(|inner_instructions| inner_instructions.len()), - db_transaction_status_meta - .inner_instructions - .as_ref() - .map(|inner_instructions| inner_instructions.len()), - ); - - if transaction_status_meta.inner_instructions.is_some() { - for i in 0..transaction_status_meta - .inner_instructions - .as_ref() - .unwrap() - .len() - { - check_inner_instructions_equality( - &transaction_status_meta.inner_instructions.as_ref().unwrap()[i], - &db_transaction_status_meta - .inner_instructions - .as_ref() - .unwrap()[i], - ); - } - } - - assert_eq!( - transaction_status_meta - .log_messages - .as_ref() - .map(|log_messages| log_messages.len()), - db_transaction_status_meta - .log_messages - .as_ref() - .map(|log_messages| log_messages.len()), - ); - - if transaction_status_meta.log_messages.is_some() { - for i in 0..transaction_status_meta.log_messages.as_ref().unwrap().len() { - assert_eq!( - &transaction_status_meta.log_messages.as_ref().unwrap()[i], - &db_transaction_status_meta.log_messages.as_ref().unwrap()[i] - ); - } - } - - check_token_balances( - &transaction_status_meta.pre_token_balances, - &db_transaction_status_meta.pre_token_balances, - ); - - check_token_balances( - &transaction_status_meta.post_token_balances, - &db_transaction_status_meta.post_token_balances, - ); - - assert_eq!( - transaction_status_meta - .rewards - .as_ref() - .map(|rewards| rewards.len()), - db_transaction_status_meta - .rewards - .as_ref() - .map(|rewards| rewards.len()), - ); - - if transaction_status_meta.rewards.is_some() { - for i in 0..transaction_status_meta.rewards.as_ref().unwrap().len() { - check_reward_equality( - &transaction_status_meta.rewards.as_ref().unwrap()[i], - &db_transaction_status_meta.rewards.as_ref().unwrap()[i], - ); - } - } - } - - fn build_transaction_status_meta() -> TransactionStatusMeta { - TransactionStatusMeta { - status: Ok(()), - fee: 23456, - pre_balances: vec![11, 22, 33], - post_balances: vec![44, 55, 66], - inner_instructions: Some(vec![InnerInstructions { - index: 0, - instructions: vec![ - CompiledInstruction { - program_id_index: 0, - accounts: vec![1, 2, 3], - data: vec![4, 5, 6], - }, - CompiledInstruction { - program_id_index: 1, - accounts: vec![12, 13, 14], - data: vec![24, 25, 26], - }, - ], - }]), - log_messages: Some(vec!["message1".to_string(), "message2".to_string()]), - pre_token_balances: Some(vec![ - TransactionTokenBalance { - account_index: 3, - mint: Pubkey::new_unique().to_string(), - ui_token_amount: UiTokenAmount { - ui_amount: Some(0.42), - decimals: 2, - amount: "42".to_string(), - ui_amount_string: "0.42".to_string(), - }, - owner: Pubkey::new_unique().to_string(), - }, - TransactionTokenBalance { - account_index: 2, - mint: Pubkey::new_unique().to_string(), - ui_token_amount: UiTokenAmount { - ui_amount: Some(0.38), - decimals: 2, - amount: "38".to_string(), - ui_amount_string: "0.38".to_string(), - }, - owner: Pubkey::new_unique().to_string(), - }, - ]), - post_token_balances: Some(vec![ - TransactionTokenBalance { - account_index: 3, - mint: Pubkey::new_unique().to_string(), - ui_token_amount: UiTokenAmount { - ui_amount: Some(0.82), - decimals: 2, - amount: "82".to_string(), - ui_amount_string: "0.82".to_string(), - }, - owner: Pubkey::new_unique().to_string(), - }, - TransactionTokenBalance { - account_index: 2, - mint: Pubkey::new_unique().to_string(), - ui_token_amount: UiTokenAmount { - ui_amount: Some(0.48), - decimals: 2, - amount: "48".to_string(), - ui_amount_string: "0.48".to_string(), - }, - owner: Pubkey::new_unique().to_string(), - }, - ]), - rewards: Some(vec![ - Reward { - pubkey: Pubkey::new_unique().to_string(), - lamports: 1234, - post_balance: 45678, - reward_type: Some(RewardType::Fee), - commission: Some(12), - }, - Reward { - pubkey: Pubkey::new_unique().to_string(), - lamports: 234, - post_balance: 324, - reward_type: Some(RewardType::Staking), - commission: Some(11), - }, - ]), - } - } - - #[test] - fn test_transform_transaction_status_meta() { - let transaction_status_meta = build_transaction_status_meta(); - let db_transaction_status_meta = DbTransactionStatusMeta::from(&transaction_status_meta); - check_transaction_status_meta(&transaction_status_meta, &db_transaction_status_meta); - } - - fn check_message_header_equality( - message_header: &MessageHeader, - db_message_header: &DbTransactionMessageHeader, - ) { - assert_eq!( - message_header.num_readonly_signed_accounts, - db_message_header.num_readonly_signed_accounts as u8 - ); - assert_eq!( - message_header.num_readonly_unsigned_accounts, - db_message_header.num_readonly_unsigned_accounts as u8 - ); - assert_eq!( - message_header.num_required_signatures, - db_message_header.num_required_signatures as u8 - ); - } - - #[test] - fn test_transform_transaction_message_header() { - let message_header = MessageHeader { - num_readonly_signed_accounts: 1, - num_readonly_unsigned_accounts: 2, - num_required_signatures: 3, - }; - - let db_message_header = DbTransactionMessageHeader::from(&message_header); - check_message_header_equality(&message_header, &db_message_header) - } - - fn check_transaction_message_equality(message: &Message, db_message: &DbTransactionMessage) { - check_message_header_equality(&message.header, &db_message.header); - assert_eq!(message.account_keys.len(), db_message.account_keys.len()); - for i in 0..message.account_keys.len() { - assert_eq!(message.account_keys[i].as_ref(), db_message.account_keys[i]); - } - assert_eq!(message.instructions.len(), db_message.instructions.len()); - for i in 0..message.instructions.len() { - check_compiled_instruction_equality( - &message.instructions[i], - &db_message.instructions[i], - ); - } - } - - fn build_message() -> Message { - Message { - header: MessageHeader { - num_readonly_signed_accounts: 11, - num_readonly_unsigned_accounts: 12, - num_required_signatures: 13, - }, - account_keys: vec![Pubkey::new_unique(), Pubkey::new_unique()], - recent_blockhash: Hash::new_unique(), - instructions: vec![ - CompiledInstruction { - program_id_index: 0, - accounts: vec![1, 2, 3], - data: vec![4, 5, 6], - }, - CompiledInstruction { - program_id_index: 3, - accounts: vec![11, 12, 13], - data: vec![14, 15, 16], - }, - ], - } - } - - #[test] - fn test_transform_transaction_message() { - let message = build_message(); - - let db_message = DbTransactionMessage::from(&message); - check_transaction_message_equality(&message, &db_message); - } - - fn check_transaction_message_v0_equality( - message: &v0::Message, - db_message: &DbTransactionMessageV0, - ) { - check_message_header_equality(&message.header, &db_message.header); - assert_eq!(message.account_keys.len(), db_message.account_keys.len()); - for i in 0..message.account_keys.len() { - assert_eq!(message.account_keys[i].as_ref(), db_message.account_keys[i]); - } - assert_eq!(message.instructions.len(), db_message.instructions.len()); - for i in 0..message.instructions.len() { - check_compiled_instruction_equality( - &message.instructions[i], - &db_message.instructions[i], - ); - } - assert_eq!( - message.address_table_lookups.len(), - db_message.address_table_lookups.len() - ); - for i in 0..message.address_table_lookups.len() { - check_address_table_lookups_equality( - &message.address_table_lookups[i], - &db_message.address_table_lookups[i], - ); - } - } - - fn build_transaction_message_v0() -> v0::Message { - v0::Message { - header: MessageHeader { - num_readonly_signed_accounts: 2, - num_readonly_unsigned_accounts: 2, - num_required_signatures: 3, - }, - account_keys: vec![ - Pubkey::new_unique(), - Pubkey::new_unique(), - Pubkey::new_unique(), - Pubkey::new_unique(), - Pubkey::new_unique(), - ], - recent_blockhash: Hash::new_unique(), - instructions: vec![ - CompiledInstruction { - program_id_index: 1, - accounts: vec![1, 2, 3], - data: vec![4, 5, 6], - }, - CompiledInstruction { - program_id_index: 2, - accounts: vec![0, 1, 2], - data: vec![14, 15, 16], - }, - ], - address_table_lookups: vec![ - MessageAddressTableLookup { - account_key: Pubkey::new_unique(), - writable_indexes: vec![0], - readonly_indexes: vec![1, 2], - }, - MessageAddressTableLookup { - account_key: Pubkey::new_unique(), - writable_indexes: vec![1], - readonly_indexes: vec![0, 2], - }, - ], - } - } - - #[test] - fn test_transform_transaction_message_v0() { - let message = build_transaction_message_v0(); - - let db_message = DbTransactionMessageV0::from(&message); - check_transaction_message_v0_equality(&message, &db_message); - } - - fn check_loaded_addresses( - loaded_addresses: &LoadedAddresses, - db_loaded_addresses: &DbLoadedAddresses, - ) { - assert_eq!( - loaded_addresses.writable.len(), - db_loaded_addresses.writable.len() - ); - for i in 0..loaded_addresses.writable.len() { - assert_eq!( - loaded_addresses.writable[i].as_ref(), - db_loaded_addresses.writable[i] - ); - } - - assert_eq!( - loaded_addresses.readonly.len(), - db_loaded_addresses.readonly.len() - ); - for i in 0..loaded_addresses.readonly.len() { - assert_eq!( - loaded_addresses.readonly[i].as_ref(), - db_loaded_addresses.readonly[i] - ); - } - } - - fn check_loaded_message_v0_equality( - message: &v0::LoadedMessage, - db_message: &DbLoadedMessageV0, - ) { - check_transaction_message_v0_equality(&message.message, &db_message.message); - check_loaded_addresses(&message.loaded_addresses, &db_message.loaded_addresses); - } - - #[test] - fn test_transform_loaded_message_v0() { - let message = v0::LoadedMessage { - message: build_transaction_message_v0(), - loaded_addresses: LoadedAddresses { - writable: vec![Pubkey::new_unique(), Pubkey::new_unique()], - readonly: vec![Pubkey::new_unique(), Pubkey::new_unique()], - }, - }; - - let db_message = DbLoadedMessageV0::from(&message); - check_loaded_message_v0_equality(&message, &db_message); - } - - fn check_transaction( - slot: u64, - transaction: &ReplicaTransactionInfo, - db_transaction: &DbTransaction, - ) { - assert_eq!(transaction.signature.as_ref(), db_transaction.signature); - assert_eq!(transaction.is_vote, db_transaction.is_vote); - assert_eq!(slot, db_transaction.slot as u64); - match transaction.transaction.message() { - SanitizedMessage::Legacy(message) => { - assert_eq!(db_transaction.message_type, 0); - check_transaction_message_equality( - message, - db_transaction.legacy_message.as_ref().unwrap(), - ); - } - SanitizedMessage::V0(message) => { - assert_eq!(db_transaction.message_type, 1); - check_loaded_message_v0_equality( - message, - db_transaction.v0_loaded_message.as_ref().unwrap(), - ); - } - } - - assert_eq!( - transaction.transaction.signatures().len(), - db_transaction.signatures.len() - ); - - for i in 0..transaction.transaction.signatures().len() { - assert_eq!( - transaction.transaction.signatures()[i].as_ref(), - db_transaction.signatures[i] - ); - } - - assert_eq!( - transaction.transaction.message_hash().as_ref(), - db_transaction.message_hash - ); - - check_transaction_status_meta(transaction.transaction_status_meta, &db_transaction.meta); - } - - fn build_test_transaction_legacy() -> Transaction { - let keypair1 = Keypair::new(); - let pubkey1 = keypair1.pubkey(); - let zero = Hash::default(); - system_transaction::transfer(&keypair1, &pubkey1, 42, zero) - } - - #[test] - fn test_build_db_transaction_legacy() { - let signature = Signature::new(&[1u8; 64]); - - let message_hash = Hash::new_unique(); - let transaction = build_test_transaction_legacy(); - - let transaction = VersionedTransaction::from(transaction); - - let transaction = - SanitizedTransaction::try_create(transaction, message_hash, Some(true), |_| { - Err(TransactionError::UnsupportedVersion) - }) - .unwrap(); - - let transaction_status_meta = build_transaction_status_meta(); - let transaction_info = ReplicaTransactionInfo { - signature: &signature, - is_vote: false, - transaction: &transaction, - transaction_status_meta: &transaction_status_meta, - }; - - let slot = 54; - let db_transaction = build_db_transaction(slot, &transaction_info); - check_transaction(slot, &transaction_info, &db_transaction); - } - - fn build_test_transaction_v0() -> VersionedTransaction { - VersionedTransaction { - signatures: vec![ - Signature::new(&[1u8; 64]), - Signature::new(&[2u8; 64]), - Signature::new(&[3u8; 64]), - ], - message: VersionedMessage::V0(build_transaction_message_v0()), - } - } - - #[test] - fn test_build_db_transaction_v0() { - let signature = Signature::new(&[1u8; 64]); - - let message_hash = Hash::new_unique(); - let transaction = build_test_transaction_v0(); - - transaction.sanitize().unwrap(); - - let transaction = - SanitizedTransaction::try_create(transaction, message_hash, Some(true), |_message| { - Ok(LoadedAddresses { - writable: vec![Pubkey::new_unique(), Pubkey::new_unique()], - readonly: vec![Pubkey::new_unique(), Pubkey::new_unique()], - }) - }) - .unwrap(); - - let transaction_status_meta = build_transaction_status_meta(); - let transaction_info = ReplicaTransactionInfo { - signature: &signature, - is_vote: true, - transaction: &transaction, - transaction_status_meta: &transaction_status_meta, - }; - - let slot = 54; - let db_transaction = build_db_transaction(slot, &transaction_info); - check_transaction(slot, &transaction_info, &db_transaction); - } -} diff --git a/accountsdb-plugin-postgres/src/transaction_selector.rs b/accountsdb-plugin-postgres/src/transaction_selector.rs deleted file mode 100644 index 7c256f3c07b6a9..00000000000000 --- a/accountsdb-plugin-postgres/src/transaction_selector.rs +++ /dev/null @@ -1,194 +0,0 @@ -/// The transaction selector is responsible for filtering transactions -/// in the plugin framework. -use {log::*, solana_sdk::pubkey::Pubkey, std::collections::HashSet}; - -pub(crate) struct TransactionSelector { - pub mentioned_addresses: HashSet>, - pub select_all_transactions: bool, - pub select_all_vote_transactions: bool, -} - -#[allow(dead_code)] -impl TransactionSelector { - pub fn default() -> Self { - Self { - mentioned_addresses: HashSet::default(), - select_all_transactions: false, - select_all_vote_transactions: false, - } - } - - /// Create a selector based on the mentioned addresses - /// To select all transactions use ["*"] or ["all"] - /// To select all vote transactions, use ["all_votes"] - /// To select transactions mentioning specific addresses use ["", "", ...] - pub fn new(mentioned_addresses: &[String]) -> Self { - info!( - "Creating TransactionSelector from addresses: {:?}", - mentioned_addresses - ); - - let select_all_transactions = mentioned_addresses - .iter() - .any(|key| key == "*" || key == "all"); - if select_all_transactions { - return Self { - mentioned_addresses: HashSet::default(), - select_all_transactions, - select_all_vote_transactions: true, - }; - } - let select_all_vote_transactions = mentioned_addresses.iter().any(|key| key == "all_votes"); - if select_all_vote_transactions { - return Self { - mentioned_addresses: HashSet::default(), - select_all_transactions, - select_all_vote_transactions: true, - }; - } - - let mentioned_addresses = mentioned_addresses - .iter() - .map(|key| bs58::decode(key).into_vec().unwrap()) - .collect(); - - Self { - mentioned_addresses, - select_all_transactions: false, - select_all_vote_transactions: false, - } - } - - /// Check if a transaction is of interest. - pub fn is_transaction_selected( - &self, - is_vote: bool, - mentioned_addresses: Box + '_>, - ) -> bool { - if !self.is_enabled() { - return false; - } - - if self.select_all_transactions || (self.select_all_vote_transactions && is_vote) { - return true; - } - for address in mentioned_addresses { - if self.mentioned_addresses.contains(address.as_ref()) { - return true; - } - } - false - } - - /// Check if any transaction is of interest at all - pub fn is_enabled(&self) -> bool { - self.select_all_transactions - || self.select_all_vote_transactions - || !self.mentioned_addresses.is_empty() - } -} - -#[cfg(test)] -pub(crate) mod tests { - use super::*; - - #[test] - fn test_select_transaction() { - let pubkey1 = Pubkey::new_unique(); - let pubkey2 = Pubkey::new_unique(); - - let selector = TransactionSelector::new(&[pubkey1.to_string()]); - - assert!(selector.is_enabled()); - - let addresses = [pubkey1]; - - assert!(selector.is_transaction_selected(false, Box::new(addresses.iter()))); - - let addresses = [pubkey2]; - assert!(!selector.is_transaction_selected(false, Box::new(addresses.iter()))); - - let addresses = [pubkey1, pubkey2]; - assert!(selector.is_transaction_selected(false, Box::new(addresses.iter()))); - } - - #[test] - fn test_select_all_transaction_using_wildcard() { - let pubkey1 = Pubkey::new_unique(); - let pubkey2 = Pubkey::new_unique(); - - let selector = TransactionSelector::new(&["*".to_string()]); - - assert!(selector.is_enabled()); - - let addresses = [pubkey1]; - - assert!(selector.is_transaction_selected(false, Box::new(addresses.iter()))); - - let addresses = [pubkey2]; - assert!(selector.is_transaction_selected(false, Box::new(addresses.iter()))); - - let addresses = [pubkey1, pubkey2]; - assert!(selector.is_transaction_selected(false, Box::new(addresses.iter()))); - } - - #[test] - fn test_select_all_transaction_all() { - let pubkey1 = Pubkey::new_unique(); - let pubkey2 = Pubkey::new_unique(); - - let selector = TransactionSelector::new(&["all".to_string()]); - - assert!(selector.is_enabled()); - - let addresses = [pubkey1]; - - assert!(selector.is_transaction_selected(false, Box::new(addresses.iter()))); - - let addresses = [pubkey2]; - assert!(selector.is_transaction_selected(false, Box::new(addresses.iter()))); - - let addresses = [pubkey1, pubkey2]; - assert!(selector.is_transaction_selected(false, Box::new(addresses.iter()))); - } - - #[test] - fn test_select_all_vote_transaction() { - let pubkey1 = Pubkey::new_unique(); - let pubkey2 = Pubkey::new_unique(); - - let selector = TransactionSelector::new(&["all_votes".to_string()]); - - assert!(selector.is_enabled()); - - let addresses = [pubkey1]; - - assert!(!selector.is_transaction_selected(false, Box::new(addresses.iter()))); - - let addresses = [pubkey2]; - assert!(selector.is_transaction_selected(true, Box::new(addresses.iter()))); - - let addresses = [pubkey1, pubkey2]; - assert!(selector.is_transaction_selected(true, Box::new(addresses.iter()))); - } - - #[test] - fn test_select_no_transaction() { - let pubkey1 = Pubkey::new_unique(); - let pubkey2 = Pubkey::new_unique(); - - let selector = TransactionSelector::new(&[]); - - assert!(!selector.is_enabled()); - - let addresses = [pubkey1]; - - assert!(!selector.is_transaction_selected(false, Box::new(addresses.iter()))); - - let addresses = [pubkey2]; - assert!(!selector.is_transaction_selected(true, Box::new(addresses.iter()))); - - let addresses = [pubkey1, pubkey2]; - assert!(!selector.is_transaction_selected(true, Box::new(addresses.iter()))); - } -}