From dfa6bf70977913d19a51d069e1738e9c337a3054 Mon Sep 17 00:00:00 2001 From: Larry Liu Date: Wed, 1 Mar 2023 16:16:23 -0800 Subject: [PATCH] refactor the service. --- Cargo.lock | 124 ++++---- Cargo.toml | 6 +- .../src/pb/aptos.datastream.v1.rs | 258 +++++++++-------- .../src/pb/aptos.transaction.v1.serde.rs | 28 +- .../indexer-grpc-cache-worker/src/worker.rs | 241 ++++++++-------- .../indexer-grpc-data-service/src/service.rs | 269 +++++++++++++----- .../Cargo.toml | 31 -- .../README.md | 18 -- .../src/lib.rs | 4 - .../src/main.rs | 50 ---- .../src/worker.rs | 119 -------- .../indexer-grpc-file-store/src/processor.rs | 176 ++++++++---- .../indexer-grpc-utils/src/cache_operator.rs | 18 +- .../src/file_store_operator.rs | 125 ++++++-- .../indexer-grpc-utils/src/lib.rs | 15 + 15 files changed, 762 insertions(+), 720 deletions(-) delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/Cargo.toml delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/README.md delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/src/lib.rs delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/src/main.rs delete mode 100644 ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/src/worker.rs diff --git a/Cargo.lock b/Cargo.lock index 31e0b578640dc..82f35181adc85 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -161,7 +161,7 @@ dependencies = [ "base64 0.13.0", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", "chrono", - "clap 3.2.17", + "clap 3.2.23", "clap_complete", "codespan-reporting", "dirs", @@ -366,7 +366,7 @@ dependencies = [ "async-trait", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", "bytes 1.2.1", - "clap 3.2.17", + "clap 3.2.23", "futures", "itertools", "move-binary-format", @@ -784,7 +784,7 @@ dependencies = [ "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", "byteorder", "claims", - "clap 3.2.17", + "clap 3.2.23", "dashmap", "itertools", "lru", @@ -863,7 +863,7 @@ dependencies = [ "aptos-push-metrics", "aptos-temppath", "aptos-types", - "clap 3.2.17", + "clap 3.2.23", "owo-colors", "tokio", ] @@ -882,7 +882,7 @@ dependencies = [ "aptos-types", "aptos-validator-interface", "aptos-vm", - "clap 3.2.17", + "clap 3.2.23", "move-binary-format", "move-cli", "move-compiler", @@ -1062,7 +1062,7 @@ dependencies = [ "aptos-warp-webserver", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", "bytes 1.2.1", - "clap 3.2.17", + "clap 3.2.23", "futures", "hex", "rand 0.7.3", @@ -1085,7 +1085,7 @@ dependencies = [ "aptos-faucet", "aptos-logger", "aptos-sdk", - "clap 3.2.17", + "clap 3.2.23", "serde 1.0.149", "serde_yaml 0.8.26", "tokio", @@ -1100,7 +1100,7 @@ dependencies = [ "aptos-logger", "aptos-node-checker", "aptos-sdk", - "clap 3.2.17", + "clap 3.2.23", "env_logger 0.9.0", "futures", "gcp-bigquery-client", @@ -1202,7 +1202,7 @@ dependencies = [ "better_any", "blake2-rfc", "claims", - "clap 3.2.17", + "clap 3.2.23", "codespan-reporting", "curve25519-dalek", "flate2", @@ -1304,7 +1304,7 @@ dependencies = [ "aptos-package-builder", "aptos-types", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", - "clap 3.2.17", + "clap 3.2.23", "move-binary-format", "move-core-types", "move-model", @@ -1387,7 +1387,7 @@ dependencies = [ "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", "bigdecimal", "chrono", - "clap 3.2.17", + "clap 3.2.23", "diesel", "diesel_migrations", "field_count", @@ -1418,7 +1418,7 @@ dependencies = [ "aptos-runtimes", "async-trait", "backoff", - "clap 3.2.17", + "clap 3.2.23", "futures", "futures-core", "once_cell", @@ -1443,7 +1443,7 @@ dependencies = [ "aptos-moving-average", "aptos-protos", "aptos-runtimes", - "clap 3.2.17", + "clap 3.2.23", "cloud-storage", "futures", "prost", @@ -1466,7 +1466,7 @@ dependencies = [ "aptos-logger", "aptos-moving-average", "aptos-runtimes", - "clap 3.2.17", + "clap 3.2.23", "cloud-storage", "futures-util", "redis", @@ -1476,28 +1476,6 @@ dependencies = [ "warp", ] -[[package]] -name = "aptos-indexer-grpc-file-store-data-verifier" -version = "0.1.0" -dependencies = [ - "anyhow", - "aptos-crash-handler", - "aptos-indexer-grpc-utils", - "aptos-logger", - "aptos-moving-average", - "aptos-protos", - "aptos-runtimes", - "clap 3.2.17", - "cloud-storage", - "futures", - "futures-util", - "redis", - "serde 1.0.149", - "serde_json", - "tokio", - "warp", -] - [[package]] name = "aptos-indexer-grpc-fullnode" version = "0.1.0" @@ -1788,7 +1766,7 @@ dependencies = [ "aptos-gas", "aptos-types", "aptos-vm", - "clap 3.2.17", + "clap 3.2.23", "move-cli", "move-package", "move-prover", @@ -1917,7 +1895,7 @@ dependencies = [ "aptos-logger", "aptos-network", "aptos-types", - "clap 3.2.17", + "clap 3.2.23", "futures", "serde 1.0.149", "tokio", @@ -2000,7 +1978,7 @@ dependencies = [ "aptos-types", "aptos-vm", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", - "clap 3.2.17", + "clap 3.2.23", "fail 0.5.0", "futures", "hex", @@ -2028,7 +2006,7 @@ dependencies = [ "aptos-sdk", "aptos-transaction-emitter-lib", "async-trait", - "clap 3.2.17", + "clap 3.2.23", "const_format", "env_logger 0.9.0", "futures", @@ -2102,7 +2080,7 @@ dependencies = [ "aptos-mempool", "aptos-storage-interface", "aptos-types", - "clap 3.2.17", + "clap 3.2.23", ] [[package]] @@ -2231,7 +2209,7 @@ dependencies = [ "aptos-types", "aptos-vm-genesis", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", - "clap 3.2.17", + "clap 3.2.23", "futures", "git2 0.16.1", "hex", @@ -2271,7 +2249,7 @@ dependencies = [ "aptos-types", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", "bytes 1.2.1", - "clap 3.2.17", + "clap 3.2.23", "futures", "hex", "move-binary-format", @@ -2318,7 +2296,7 @@ dependencies = [ "aptos-types", "aptos-warp-webserver", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", - "clap 3.2.17", + "clap 3.2.23", "futures", "hex", "itertools", @@ -2343,7 +2321,7 @@ dependencies = [ "aptos-logger", "aptos-rosetta", "aptos-types", - "clap 3.2.17", + "clap 3.2.23", "serde 1.0.149", "serde_json", "tokio", @@ -2729,7 +2707,7 @@ dependencies = [ "base64 0.13.0", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", "chrono", - "clap 3.2.17", + "clap 3.2.23", "debug-ignore", "flate2", "futures", @@ -2829,7 +2807,7 @@ dependencies = [ "aptos-logger", "aptos-sdk", "aptos-transaction-emitter-lib", - "clap 3.2.17", + "clap 3.2.23", "futures", "itertools", "rand 0.7.3", @@ -2853,7 +2831,7 @@ dependencies = [ "aptos-rest-client", "aptos-sdk", "async-trait", - "clap 3.2.17", + "clap 3.2.23", "futures", "itertools", "move-binary-format", @@ -2883,7 +2861,7 @@ dependencies = [ "aptos-vm", "aptos-vm-genesis", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", - "clap 3.2.17", + "clap 3.2.23", "datatest-stable", "hex", "move-binary-format", @@ -3040,7 +3018,7 @@ dependencies = [ "aptos-vm", "aptos-vm-genesis", "bcs 0.1.4 (git+https://github.com/aptos-labs/bcs.git?rev=d31fab9d81748e2594be5cd5cdf845786a30562d)", - "clap 3.2.17", + "clap 3.2.23", "glob", "move-binary-format", "move-core-types", @@ -3988,9 +3966,9 @@ dependencies = [ [[package]] name = "clap" -version = "3.2.17" +version = "3.2.23" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29e724a68d9319343bb3328c9cc2dfde263f4b3142ee1059a9980580171c954b" +checksum = "71655c45cb9845d3270c9d6df84ebe72b4dad3c2ba3f7023ad47c144e4e473a5" dependencies = [ "atty", "bitflags", @@ -4000,7 +3978,7 @@ dependencies = [ "once_cell", "strsim 0.10.0", "termcolor", - "textwrap 0.15.0", + "textwrap 0.16.0", ] [[package]] @@ -4009,14 +3987,14 @@ version = "3.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e4179da71abd56c26b54dd0c248cc081c1f43b0a1a7e8448e28e57a29baa993d" dependencies = [ - "clap 3.2.17", + "clap 3.2.23", ] [[package]] name = "clap_derive" -version = "3.2.17" +version = "3.2.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "13547f7012c01ab4a0e8f8967730ada8f9fdf419e8b6c792788f39cf4e46eefa" +checksum = "ea0c8bce528c4be4da13ea6fead8965e95b6073585a2f05204bd8f4119f82a65" dependencies = [ "heck 0.4.0", "proc-macro-error", @@ -6483,7 +6461,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "79529479c298f5af41375b0c1a77ef670d450b4c9cd7949d2b43af08121b20ec" dependencies = [ - "clap 3.2.17", + "clap 3.2.23", "termcolor", "threadpool", ] @@ -6517,7 +6495,7 @@ name = "listener" version = "0.1.0" dependencies = [ "bytes 1.2.1", - "clap 3.2.17", + "clap 3.2.23", "tokio", ] @@ -6853,7 +6831,7 @@ version = "0.1.0" source = "git+https://github.com/move-language/move?rev=6bf0970221ecc0b27454d574a6cc89e8fa175cc7#6bf0970221ecc0b27454d574a6cc89e8fa175cc7" dependencies = [ "anyhow", - "clap 3.2.17", + "clap 3.2.23", "crossterm 0.21.0", "move-binary-format", "move-bytecode-source-map", @@ -6871,7 +6849,7 @@ source = "git+https://github.com/move-language/move?rev=6bf0970221ecc0b27454d574 dependencies = [ "anyhow", "bcs 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "clap 3.2.17", + "clap 3.2.23", "codespan-reporting", "colored", "difference", @@ -6934,7 +6912,7 @@ source = "git+https://github.com/move-language/move?rev=6bf0970221ecc0b27454d574 dependencies = [ "anyhow", "bcs 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "clap 3.2.17", + "clap 3.2.23", "codespan-reporting", "difference", "hex", @@ -6985,7 +6963,7 @@ source = "git+https://github.com/move-language/move?rev=6bf0970221ecc0b27454d574 dependencies = [ "anyhow", "bcs 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "clap 3.2.17", + "clap 3.2.23", "codespan", "colored", "move-binary-format", @@ -7004,7 +6982,7 @@ version = "0.1.0" source = "git+https://github.com/move-language/move?rev=6bf0970221ecc0b27454d574a6cc89e8fa175cc7#6bf0970221ecc0b27454d574a6cc89e8fa175cc7" dependencies = [ "anyhow", - "clap 3.2.17", + "clap 3.2.23", "colored", "move-binary-format", "move-bytecode-source-map", @@ -7055,7 +7033,7 @@ source = "git+https://github.com/move-language/move?rev=6bf0970221ecc0b27454d574 dependencies = [ "anyhow", "bcs 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "clap 3.2.17", + "clap 3.2.23", "move-binary-format", "move-bytecode-source-map", "move-bytecode-verifier", @@ -7146,7 +7124,7 @@ source = "git+https://github.com/move-language/move?rev=6bf0970221ecc0b27454d574 dependencies = [ "anyhow", "bcs 0.1.4 (registry+https://github.com/rust-lang/crates.io-index)", - "clap 3.2.17", + "clap 3.2.23", "colored", "dirs-next", "itertools", @@ -7183,7 +7161,7 @@ dependencies = [ "anyhow", "async-trait", "atty", - "clap 3.2.17", + "clap 3.2.23", "codespan", "codespan-reporting", "futures", @@ -7301,7 +7279,7 @@ source = "git+https://github.com/move-language/move?rev=6bf0970221ecc0b27454d574 dependencies = [ "anyhow", "bytecode-interpreter-crypto", - "clap 3.2.17", + "clap 3.2.23", "codespan-reporting", "itertools", "move-binary-format", @@ -7367,7 +7345,7 @@ version = "0.1.0" source = "git+https://github.com/move-language/move?rev=6bf0970221ecc0b27454d574a6cc89e8fa175cc7#6bf0970221ecc0b27454d574a6cc89e8fa175cc7" dependencies = [ "anyhow", - "clap 3.2.17", + "clap 3.2.23", "colored", "move-binary-format", "move-bytecode-source-map", @@ -7400,7 +7378,7 @@ source = "git+https://github.com/move-language/move?rev=6bf0970221ecc0b27454d574 dependencies = [ "anyhow", "better_any", - "clap 3.2.17", + "clap 3.2.23", "codespan-reporting", "colored", "itertools", @@ -9397,7 +9375,7 @@ name = "sender" version = "0.1.0" dependencies = [ "bytes 1.2.1", - "clap 3.2.17", + "clap 3.2.23", "event-listener", "quanta", "tokio", @@ -10168,6 +10146,12 @@ dependencies = [ "unicode-width", ] +[[package]] +name = "textwrap" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "222a222a5bfe1bba4a77b45ec488a741b3cb8872e5e499451fd7d0129c9c7c3d" + [[package]] name = "thiserror" version = "1.0.37" diff --git a/Cargo.toml b/Cargo.toml index cd197fac8ab85..be3072bb4c848 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -82,7 +82,6 @@ members = [ "ecosystem/indexer-grpc/indexer-grpc-cache-worker", "ecosystem/indexer-grpc/indexer-grpc-data-service", "ecosystem/indexer-grpc/indexer-grpc-file-store", - "ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier", "ecosystem/indexer-grpc/indexer-grpc-fullnode", "ecosystem/indexer-grpc/indexer-grpc-utils", "ecosystem/node-checker", @@ -227,7 +226,6 @@ aptos-indexer-grpc-data-service = { path = "ecosystem/indexer-grpc/indexer-grpc- aptos-indexer-grpc-file-store = { path = "ecosystem/indexer-grpc/indexer-grpc-file-store" } aptos-indexer-grpc-fullnode = { path = "ecosystem/indexer-grpc/indexer-grpc-fullnode" } aptos-indexer-grpc-utils = { path = "ecosystem/indexer-grpc/indexer-grpc-utils" } -aptos-indexer-grpc-file-store-data-verifier = { path = "ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier" } aptos-infallible = { path = "crates/aptos-infallible" } aptos-inspection-service = { path = "crates/inspection-service" } aptos-jellyfish-merkle = { path = "storage/jellyfish-merkle" } @@ -326,11 +324,11 @@ bytes = "1.1.0" chrono = { version = "0.4.19", features = ["clock", "serde"] } cfg-if = "1.0.0" claims = "0.7" -clap = { version = "3.2.17", features = ["derive", "env", "suggestions"] } +clap = { version = "3.2.23", features = ["derive", "env", "suggestions"] } clap_complete = "3.2.3" cloud-storage = { version = "0.11.1", features = ["global-client"] } codespan-reporting = "0.11.1" -console-subscriber = "0.1.6" +console-subscriber = "0.1.8" const_format = "0.2.26" criterion = "0.3.5" criterion-cpu-time = "0.1.0" diff --git a/crates/aptos-protos/src/pb/aptos.datastream.v1.rs b/crates/aptos-protos/src/pb/aptos.datastream.v1.rs index 15f5c7502754f..8e7b4d0d5bc20 100644 --- a/crates/aptos-protos/src/pb/aptos.datastream.v1.rs +++ b/crates/aptos-protos/src/pb/aptos.datastream.v1.rs @@ -105,7 +105,7 @@ pub mod raw_datastream_response { } /// Encoded file descriptor set for the `aptos.datastream.v1` package pub const FILE_DESCRIPTOR_SET: &[u8] = &[ - 0x0a, 0xf3, 0x15, 0x0a, 0x24, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x73, + 0x0a, 0xf0, 0x17, 0x0a, 0x24, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x76, 0x31, 0x2f, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x13, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x31, 0x1a, 0x24, @@ -157,122 +157,146 @@ pub const FILE_DESCRIPTOR_SET: &[u8] = &[ 0x61, 0x74, 0x75, 0x73, 0x48, 0x00, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x3d, 0x0a, 0x04, 0x64, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x27, 0x2e, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, - 0x76, 0x31, 0x2e, 0x52, 0x61, 0x77, 0x44, 0x61, 0x74, 0x61, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, - 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x4a, 0x91, 0x0e, 0x0a, 0x06, 0x12, - 0x04, 0x03, 0x00, 0x3d, 0x01, 0x0a, 0x4e, 0x0a, 0x01, 0x0c, 0x12, 0x03, 0x03, 0x00, 0x12, 0x32, - 0x44, 0x20, 0x43, 0x6f, 0x70, 0x79, 0x72, 0x69, 0x67, 0x68, 0x74, 0x20, 0xc2, 0xa9, 0x20, 0x41, - 0x70, 0x74, 0x6f, 0x73, 0x20, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x0a, - 0x20, 0x53, 0x50, 0x44, 0x58, 0x2d, 0x4c, 0x69, 0x63, 0x65, 0x6e, 0x73, 0x65, 0x2d, 0x49, 0x64, - 0x65, 0x6e, 0x74, 0x69, 0x66, 0x69, 0x65, 0x72, 0x3a, 0x20, 0x41, 0x70, 0x61, 0x63, 0x68, 0x65, - 0x2d, 0x32, 0x2e, 0x30, 0x0a, 0x0a, 0x08, 0x0a, 0x01, 0x02, 0x12, 0x03, 0x05, 0x00, 0x1c, 0x0a, - 0x09, 0x0a, 0x02, 0x03, 0x00, 0x12, 0x03, 0x07, 0x00, 0x2e, 0x0a, 0xfe, 0x01, 0x0a, 0x02, 0x04, - 0x00, 0x12, 0x04, 0x10, 0x00, 0x12, 0x01, 0x32, 0xf1, 0x01, 0x20, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x20, 0x64, 0x61, 0x74, 0x61, 0x20, 0x69, 0x73, 0x20, 0x74, - 0x72, 0x61, 0x6e, 0x73, 0x66, 0x65, 0x72, 0x72, 0x65, 0x64, 0x20, 0x76, 0x69, 0x61, 0x20, 0x31, - 0x20, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x20, 0x77, 0x69, 0x74, 0x68, 0x20, 0x62, 0x61, 0x74, - 0x63, 0x68, 0x65, 0x73, 0x20, 0x75, 0x6e, 0x74, 0x69, 0x6c, 0x20, 0x74, 0x65, 0x72, 0x6d, 0x69, - 0x6e, 0x61, 0x74, 0x65, 0x64, 0x2e, 0x0a, 0x20, 0x4f, 0x6e, 0x65, 0x20, 0x73, 0x74, 0x72, 0x65, - 0x61, 0x6d, 0x20, 0x63, 0x6f, 0x6e, 0x73, 0x69, 0x73, 0x74, 0x73, 0x3a, 0x0a, 0x20, 0x20, 0x53, - 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x3a, 0x20, 0x49, 0x4e, 0x49, - 0x54, 0x20, 0x77, 0x69, 0x74, 0x68, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x78, - 0x0a, 0x20, 0x20, 0x6c, 0x6f, 0x6f, 0x70, 0x20, 0x6b, 0x3a, 0x0a, 0x20, 0x20, 0x20, 0x20, 0x54, - 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, - 0x20, 0x64, 0x61, 0x74, 0x61, 0x28, 0x73, 0x69, 0x7a, 0x65, 0x20, 0x6e, 0x29, 0x0a, 0x20, 0x20, - 0x20, 0x20, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x3a, 0x20, - 0x42, 0x41, 0x54, 0x43, 0x48, 0x5f, 0x45, 0x4e, 0x44, 0x20, 0x77, 0x69, 0x74, 0x68, 0x20, 0x76, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x78, 0x20, 0x2b, 0x20, 0x28, 0x6b, 0x20, 0x2b, 0x20, - 0x31, 0x29, 0x20, 0x2a, 0x20, 0x6e, 0x20, 0x2d, 0x20, 0x31, 0x0a, 0x0a, 0x0a, 0x0a, 0x03, 0x04, - 0x00, 0x01, 0x12, 0x03, 0x10, 0x08, 0x1a, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x00, 0x02, 0x00, 0x12, - 0x03, 0x11, 0x02, 0x2f, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x04, 0x12, 0x03, 0x11, - 0x02, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x06, 0x12, 0x03, 0x11, 0x0b, 0x1c, - 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x01, 0x12, 0x03, 0x11, 0x1d, 0x29, 0x0a, 0x0c, - 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x03, 0x12, 0x03, 0x11, 0x2d, 0x2e, 0x0a, 0x0a, 0x0a, 0x02, - 0x04, 0x01, 0x12, 0x04, 0x14, 0x00, 0x19, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x01, 0x01, 0x12, - 0x03, 0x14, 0x08, 0x19, 0x0a, 0x3d, 0x0a, 0x04, 0x04, 0x01, 0x02, 0x00, 0x12, 0x03, 0x16, 0x02, - 0x20, 0x1a, 0x30, 0x20, 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x64, 0x20, 0x61, 0x70, 0x74, 0x6f, - 0x73, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, - 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x20, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x20, 0x64, 0x61, 0x74, - 0x61, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x00, 0x05, 0x12, 0x03, 0x16, 0x02, - 0x08, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x00, 0x01, 0x12, 0x03, 0x16, 0x09, 0x1b, 0x0a, - 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x00, 0x03, 0x12, 0x03, 0x16, 0x1e, 0x1f, 0x0a, 0x0b, 0x0a, - 0x04, 0x04, 0x01, 0x02, 0x01, 0x12, 0x03, 0x17, 0x02, 0x15, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, - 0x02, 0x01, 0x05, 0x12, 0x03, 0x17, 0x02, 0x08, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x01, - 0x01, 0x12, 0x03, 0x17, 0x09, 0x10, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x01, 0x03, 0x12, - 0x03, 0x17, 0x13, 0x14, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x01, 0x02, 0x02, 0x12, 0x03, 0x18, 0x02, - 0x2f, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x02, 0x06, 0x12, 0x03, 0x18, 0x02, 0x20, 0x0a, - 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x02, 0x01, 0x12, 0x03, 0x18, 0x21, 0x2a, 0x0a, 0x0c, 0x0a, - 0x05, 0x04, 0x01, 0x02, 0x02, 0x03, 0x12, 0x03, 0x18, 0x2d, 0x2e, 0x0a, 0x0a, 0x0a, 0x02, 0x04, - 0x02, 0x12, 0x04, 0x1b, 0x00, 0x27, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x02, 0x01, 0x12, 0x03, - 0x1b, 0x08, 0x14, 0x0a, 0x0c, 0x0a, 0x04, 0x04, 0x02, 0x04, 0x00, 0x12, 0x04, 0x1c, 0x02, 0x21, - 0x03, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x04, 0x00, 0x01, 0x12, 0x03, 0x1c, 0x07, 0x11, 0x0a, - 0x34, 0x0a, 0x06, 0x04, 0x02, 0x04, 0x00, 0x02, 0x00, 0x12, 0x03, 0x1e, 0x04, 0x0d, 0x1a, 0x25, - 0x20, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x20, 0x66, 0x6f, 0x72, 0x20, 0x74, 0x68, 0x65, 0x20, - 0x73, 0x74, 0x61, 0x72, 0x74, 0x20, 0x6f, 0x66, 0x20, 0x74, 0x68, 0x65, 0x20, 0x73, 0x74, 0x72, - 0x65, 0x61, 0x6d, 0x2e, 0x0a, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x02, 0x04, 0x00, 0x02, 0x00, 0x01, - 0x12, 0x03, 0x1e, 0x04, 0x08, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x02, 0x04, 0x00, 0x02, 0x00, 0x02, - 0x12, 0x03, 0x1e, 0x0b, 0x0c, 0x0a, 0x31, 0x0a, 0x06, 0x04, 0x02, 0x04, 0x00, 0x02, 0x01, 0x12, - 0x03, 0x20, 0x04, 0x12, 0x1a, 0x22, 0x20, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x20, 0x66, 0x6f, - 0x72, 0x20, 0x74, 0x68, 0x65, 0x20, 0x65, 0x6e, 0x64, 0x20, 0x6f, 0x66, 0x20, 0x74, 0x68, 0x65, - 0x20, 0x62, 0x61, 0x74, 0x63, 0x68, 0x2e, 0x0a, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x02, 0x04, 0x00, - 0x02, 0x01, 0x01, 0x12, 0x03, 0x20, 0x04, 0x0d, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x02, 0x04, 0x00, - 0x02, 0x01, 0x02, 0x12, 0x03, 0x20, 0x10, 0x11, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x02, 0x02, 0x00, - 0x12, 0x03, 0x22, 0x02, 0x16, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, 0x00, 0x06, 0x12, 0x03, - 0x22, 0x02, 0x0c, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, 0x00, 0x01, 0x12, 0x03, 0x22, 0x0d, - 0x11, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, 0x00, 0x03, 0x12, 0x03, 0x22, 0x14, 0x15, 0x0a, - 0x4a, 0x0a, 0x04, 0x04, 0x02, 0x02, 0x01, 0x12, 0x03, 0x24, 0x02, 0x1b, 0x1a, 0x3d, 0x20, 0x52, - 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x2e, 0x20, 0x53, 0x74, 0x61, 0x72, 0x74, 0x20, 0x76, - 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x6f, 0x66, 0x20, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, - 0x74, 0x20, 0x62, 0x61, 0x74, 0x63, 0x68, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2c, 0x20, - 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x76, 0x65, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, - 0x02, 0x02, 0x01, 0x05, 0x12, 0x03, 0x24, 0x02, 0x08, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, - 0x01, 0x01, 0x12, 0x03, 0x24, 0x09, 0x16, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, 0x01, 0x03, - 0x12, 0x03, 0x24, 0x19, 0x1a, 0x0a, 0x39, 0x0a, 0x04, 0x04, 0x02, 0x02, 0x02, 0x12, 0x03, 0x26, - 0x02, 0x22, 0x1a, 0x2c, 0x20, 0x45, 0x6e, 0x64, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, - 0x20, 0x6f, 0x66, 0x20, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x2a, 0x62, 0x61, 0x74, - 0x63, 0x68, 0x2a, 0x2c, 0x20, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x76, 0x65, 0x2e, 0x0a, - 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, 0x02, 0x04, 0x12, 0x03, 0x26, 0x02, 0x0a, 0x0a, 0x0c, - 0x0a, 0x05, 0x04, 0x02, 0x02, 0x02, 0x05, 0x12, 0x03, 0x26, 0x0b, 0x11, 0x0a, 0x0c, 0x0a, 0x05, - 0x04, 0x02, 0x02, 0x02, 0x01, 0x12, 0x03, 0x26, 0x12, 0x1d, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, - 0x02, 0x02, 0x03, 0x12, 0x03, 0x26, 0x20, 0x21, 0x0a, 0x0a, 0x0a, 0x02, 0x04, 0x03, 0x12, 0x04, - 0x29, 0x00, 0x2c, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x03, 0x01, 0x12, 0x03, 0x29, 0x08, 0x1c, - 0x0a, 0x39, 0x0a, 0x04, 0x04, 0x03, 0x02, 0x00, 0x12, 0x03, 0x2b, 0x02, 0x1e, 0x1a, 0x2c, 0x20, - 0x52, 0x65, 0x71, 0x75, 0x69, 0x72, 0x65, 0x64, 0x3b, 0x20, 0x73, 0x74, 0x61, 0x72, 0x74, 0x20, - 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x6f, 0x66, 0x20, 0x63, 0x75, 0x72, 0x72, 0x65, - 0x6e, 0x74, 0x20, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, - 0x03, 0x02, 0x00, 0x05, 0x12, 0x03, 0x2b, 0x02, 0x08, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x03, 0x02, - 0x00, 0x01, 0x12, 0x03, 0x2b, 0x09, 0x19, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x03, 0x02, 0x00, 0x03, - 0x12, 0x03, 0x2b, 0x1c, 0x1d, 0x0a, 0x0a, 0x0a, 0x02, 0x04, 0x04, 0x12, 0x04, 0x2e, 0x00, 0x39, - 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x04, 0x01, 0x12, 0x03, 0x2e, 0x08, 0x1d, 0x0a, 0x0c, 0x0a, - 0x04, 0x04, 0x04, 0x04, 0x00, 0x12, 0x04, 0x2f, 0x02, 0x32, 0x03, 0x0a, 0x0c, 0x0a, 0x05, 0x04, - 0x04, 0x04, 0x00, 0x01, 0x12, 0x03, 0x2f, 0x07, 0x14, 0x0a, 0x0d, 0x0a, 0x06, 0x04, 0x04, 0x04, - 0x00, 0x02, 0x00, 0x12, 0x03, 0x30, 0x04, 0x0f, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x04, 0x04, 0x00, - 0x02, 0x00, 0x01, 0x12, 0x03, 0x30, 0x04, 0x0a, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x04, 0x04, 0x00, - 0x02, 0x00, 0x02, 0x12, 0x03, 0x30, 0x0d, 0x0e, 0x0a, 0x0d, 0x0a, 0x06, 0x04, 0x04, 0x04, 0x00, - 0x02, 0x01, 0x12, 0x03, 0x31, 0x04, 0x0d, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x04, 0x04, 0x00, 0x02, - 0x01, 0x01, 0x12, 0x03, 0x31, 0x04, 0x08, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x04, 0x04, 0x00, 0x02, - 0x01, 0x02, 0x12, 0x03, 0x31, 0x0b, 0x0c, 0x0a, 0x0c, 0x0a, 0x04, 0x04, 0x04, 0x08, 0x00, 0x12, - 0x04, 0x33, 0x02, 0x36, 0x03, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x08, 0x00, 0x01, 0x12, 0x03, - 0x33, 0x08, 0x10, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x04, 0x02, 0x00, 0x12, 0x03, 0x34, 0x04, 0x1c, - 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x00, 0x06, 0x12, 0x03, 0x34, 0x04, 0x10, 0x0a, 0x0c, - 0x0a, 0x05, 0x04, 0x04, 0x02, 0x00, 0x01, 0x12, 0x03, 0x34, 0x11, 0x17, 0x0a, 0x0c, 0x0a, 0x05, - 0x04, 0x04, 0x02, 0x00, 0x03, 0x12, 0x03, 0x34, 0x1a, 0x1b, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x04, - 0x02, 0x01, 0x12, 0x03, 0x35, 0x04, 0x20, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x01, 0x06, - 0x12, 0x03, 0x35, 0x04, 0x16, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x01, 0x01, 0x12, 0x03, - 0x35, 0x17, 0x1b, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x01, 0x03, 0x12, 0x03, 0x35, 0x1e, - 0x1f, 0x0a, 0x44, 0x0a, 0x04, 0x04, 0x04, 0x02, 0x02, 0x12, 0x03, 0x38, 0x02, 0x16, 0x1a, 0x37, - 0x20, 0x4d, 0x61, 0x6b, 0x69, 0x6e, 0x67, 0x20, 0x73, 0x75, 0x72, 0x65, 0x20, 0x74, 0x68, 0x61, - 0x74, 0x20, 0x61, 0x6c, 0x6c, 0x20, 0x74, 0x68, 0x65, 0x20, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, - 0x73, 0x65, 0x73, 0x20, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x20, 0x61, 0x20, 0x63, 0x68, - 0x61, 0x69, 0x6e, 0x20, 0x69, 0x64, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x02, 0x05, - 0x12, 0x03, 0x38, 0x02, 0x08, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x02, 0x01, 0x12, 0x03, - 0x38, 0x09, 0x11, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x02, 0x03, 0x12, 0x03, 0x38, 0x14, - 0x15, 0x0a, 0x0a, 0x0a, 0x02, 0x06, 0x00, 0x12, 0x04, 0x3b, 0x00, 0x3d, 0x01, 0x0a, 0x0a, 0x0a, - 0x03, 0x06, 0x00, 0x01, 0x12, 0x03, 0x3b, 0x08, 0x15, 0x0a, 0x0b, 0x0a, 0x04, 0x06, 0x00, 0x02, - 0x00, 0x12, 0x03, 0x3c, 0x04, 0x53, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x01, 0x12, - 0x03, 0x3c, 0x08, 0x15, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x02, 0x12, 0x03, 0x3c, - 0x16, 0x2a, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x06, 0x12, 0x03, 0x3c, 0x35, 0x3b, - 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x03, 0x12, 0x03, 0x3c, 0x3c, 0x51, 0x62, 0x06, - 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x76, 0x31, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x4f, + 0x75, 0x74, 0x70, 0x75, 0x74, 0x48, 0x00, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x19, 0x0a, + 0x08, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x5f, 0x69, 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x0d, 0x52, + 0x07, 0x63, 0x68, 0x61, 0x69, 0x6e, 0x49, 0x64, 0x22, 0x25, 0x0a, 0x0d, 0x72, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x5f, 0x74, 0x79, 0x70, 0x65, 0x12, 0x0a, 0x0a, 0x06, 0x53, 0x54, 0x41, + 0x54, 0x55, 0x53, 0x10, 0x00, 0x12, 0x08, 0x0a, 0x04, 0x44, 0x41, 0x54, 0x41, 0x10, 0x01, 0x42, + 0x0a, 0x0a, 0x08, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x32, 0x79, 0x0a, 0x0d, 0x49, + 0x6e, 0x64, 0x65, 0x78, 0x65, 0x72, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x68, 0x0a, 0x0d, + 0x52, 0x61, 0x77, 0x44, 0x61, 0x74, 0x61, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x29, 0x2e, + 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x2e, 0x76, 0x31, 0x2e, 0x52, 0x61, 0x77, 0x44, 0x61, 0x74, 0x61, 0x73, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2a, 0x2e, 0x61, 0x70, 0x74, 0x6f, 0x73, + 0x2e, 0x64, 0x61, 0x74, 0x61, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x76, 0x31, 0x2e, 0x52, + 0x61, 0x77, 0x44, 0x61, 0x74, 0x61, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x52, 0x65, 0x73, 0x70, + 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x4a, 0xc2, 0x0f, 0x0a, 0x06, 0x12, 0x04, 0x03, 0x00, 0x41, + 0x01, 0x0a, 0x4e, 0x0a, 0x01, 0x0c, 0x12, 0x03, 0x03, 0x00, 0x12, 0x32, 0x44, 0x20, 0x43, 0x6f, + 0x70, 0x79, 0x72, 0x69, 0x67, 0x68, 0x74, 0x20, 0xc2, 0xa9, 0x20, 0x41, 0x70, 0x74, 0x6f, 0x73, + 0x20, 0x46, 0x6f, 0x75, 0x6e, 0x64, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x0a, 0x20, 0x53, 0x50, 0x44, + 0x58, 0x2d, 0x4c, 0x69, 0x63, 0x65, 0x6e, 0x73, 0x65, 0x2d, 0x49, 0x64, 0x65, 0x6e, 0x74, 0x69, + 0x66, 0x69, 0x65, 0x72, 0x3a, 0x20, 0x41, 0x70, 0x61, 0x63, 0x68, 0x65, 0x2d, 0x32, 0x2e, 0x30, + 0x0a, 0x0a, 0x08, 0x0a, 0x01, 0x02, 0x12, 0x03, 0x05, 0x00, 0x1c, 0x0a, 0x09, 0x0a, 0x02, 0x03, + 0x00, 0x12, 0x03, 0x07, 0x00, 0x2e, 0x0a, 0xfe, 0x01, 0x0a, 0x02, 0x04, 0x00, 0x12, 0x04, 0x10, + 0x00, 0x12, 0x01, 0x32, 0xf1, 0x01, 0x20, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x20, 0x64, 0x61, 0x74, 0x61, 0x20, 0x69, 0x73, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, + 0x66, 0x65, 0x72, 0x72, 0x65, 0x64, 0x20, 0x76, 0x69, 0x61, 0x20, 0x31, 0x20, 0x73, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x20, 0x77, 0x69, 0x74, 0x68, 0x20, 0x62, 0x61, 0x74, 0x63, 0x68, 0x65, 0x73, + 0x20, 0x75, 0x6e, 0x74, 0x69, 0x6c, 0x20, 0x74, 0x65, 0x72, 0x6d, 0x69, 0x6e, 0x61, 0x74, 0x65, + 0x64, 0x2e, 0x0a, 0x20, 0x4f, 0x6e, 0x65, 0x20, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x20, 0x63, + 0x6f, 0x6e, 0x73, 0x69, 0x73, 0x74, 0x73, 0x3a, 0x0a, 0x20, 0x20, 0x53, 0x74, 0x72, 0x65, 0x61, + 0x6d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x3a, 0x20, 0x49, 0x4e, 0x49, 0x54, 0x20, 0x77, 0x69, + 0x74, 0x68, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x78, 0x0a, 0x20, 0x20, 0x6c, + 0x6f, 0x6f, 0x70, 0x20, 0x6b, 0x3a, 0x0a, 0x20, 0x20, 0x20, 0x20, 0x54, 0x72, 0x61, 0x6e, 0x73, + 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x4f, 0x75, 0x74, 0x70, 0x75, 0x74, 0x20, 0x64, 0x61, 0x74, + 0x61, 0x28, 0x73, 0x69, 0x7a, 0x65, 0x20, 0x6e, 0x29, 0x0a, 0x20, 0x20, 0x20, 0x20, 0x53, 0x74, + 0x72, 0x65, 0x61, 0x6d, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x3a, 0x20, 0x42, 0x41, 0x54, 0x43, + 0x48, 0x5f, 0x45, 0x4e, 0x44, 0x20, 0x77, 0x69, 0x74, 0x68, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x20, 0x78, 0x20, 0x2b, 0x20, 0x28, 0x6b, 0x20, 0x2b, 0x20, 0x31, 0x29, 0x20, 0x2a, + 0x20, 0x6e, 0x20, 0x2d, 0x20, 0x31, 0x0a, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x00, 0x01, 0x12, 0x03, + 0x10, 0x08, 0x1a, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x00, 0x02, 0x00, 0x12, 0x03, 0x11, 0x02, 0x2f, + 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x04, 0x12, 0x03, 0x11, 0x02, 0x0a, 0x0a, 0x0c, + 0x0a, 0x05, 0x04, 0x00, 0x02, 0x00, 0x06, 0x12, 0x03, 0x11, 0x0b, 0x1c, 0x0a, 0x0c, 0x0a, 0x05, + 0x04, 0x00, 0x02, 0x00, 0x01, 0x12, 0x03, 0x11, 0x1d, 0x29, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x00, + 0x02, 0x00, 0x03, 0x12, 0x03, 0x11, 0x2d, 0x2e, 0x0a, 0x0a, 0x0a, 0x02, 0x04, 0x01, 0x12, 0x04, + 0x14, 0x00, 0x19, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x01, 0x01, 0x12, 0x03, 0x14, 0x08, 0x19, + 0x0a, 0x3d, 0x0a, 0x04, 0x04, 0x01, 0x02, 0x00, 0x12, 0x03, 0x16, 0x02, 0x20, 0x1a, 0x30, 0x20, + 0x45, 0x6e, 0x63, 0x6f, 0x64, 0x65, 0x64, 0x20, 0x61, 0x70, 0x74, 0x6f, 0x73, 0x2e, 0x70, 0x72, + 0x6f, 0x74, 0x6f, 0x2e, 0x76, 0x31, 0x2e, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, + 0x6f, 0x6e, 0x20, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x20, 0x64, 0x61, 0x74, 0x61, 0x2e, 0x0a, 0x0a, + 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x00, 0x05, 0x12, 0x03, 0x16, 0x02, 0x08, 0x0a, 0x0c, 0x0a, + 0x05, 0x04, 0x01, 0x02, 0x00, 0x01, 0x12, 0x03, 0x16, 0x09, 0x1b, 0x0a, 0x0c, 0x0a, 0x05, 0x04, + 0x01, 0x02, 0x00, 0x03, 0x12, 0x03, 0x16, 0x1e, 0x1f, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x01, 0x02, + 0x01, 0x12, 0x03, 0x17, 0x02, 0x15, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x01, 0x05, 0x12, + 0x03, 0x17, 0x02, 0x08, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x01, 0x01, 0x12, 0x03, 0x17, + 0x09, 0x10, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, 0x01, 0x03, 0x12, 0x03, 0x17, 0x13, 0x14, + 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x01, 0x02, 0x02, 0x12, 0x03, 0x18, 0x02, 0x2f, 0x0a, 0x0c, 0x0a, + 0x05, 0x04, 0x01, 0x02, 0x02, 0x06, 0x12, 0x03, 0x18, 0x02, 0x20, 0x0a, 0x0c, 0x0a, 0x05, 0x04, + 0x01, 0x02, 0x02, 0x01, 0x12, 0x03, 0x18, 0x21, 0x2a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x01, 0x02, + 0x02, 0x03, 0x12, 0x03, 0x18, 0x2d, 0x2e, 0x0a, 0x0a, 0x0a, 0x02, 0x04, 0x02, 0x12, 0x04, 0x1b, + 0x00, 0x27, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x02, 0x01, 0x12, 0x03, 0x1b, 0x08, 0x14, 0x0a, + 0x0c, 0x0a, 0x04, 0x04, 0x02, 0x04, 0x00, 0x12, 0x04, 0x1c, 0x02, 0x21, 0x03, 0x0a, 0x0c, 0x0a, + 0x05, 0x04, 0x02, 0x04, 0x00, 0x01, 0x12, 0x03, 0x1c, 0x07, 0x11, 0x0a, 0x34, 0x0a, 0x06, 0x04, + 0x02, 0x04, 0x00, 0x02, 0x00, 0x12, 0x03, 0x1e, 0x04, 0x0d, 0x1a, 0x25, 0x20, 0x53, 0x69, 0x67, + 0x6e, 0x61, 0x6c, 0x20, 0x66, 0x6f, 0x72, 0x20, 0x74, 0x68, 0x65, 0x20, 0x73, 0x74, 0x61, 0x72, + 0x74, 0x20, 0x6f, 0x66, 0x20, 0x74, 0x68, 0x65, 0x20, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, + 0x0a, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x02, 0x04, 0x00, 0x02, 0x00, 0x01, 0x12, 0x03, 0x1e, 0x04, + 0x08, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x02, 0x04, 0x00, 0x02, 0x00, 0x02, 0x12, 0x03, 0x1e, 0x0b, + 0x0c, 0x0a, 0x31, 0x0a, 0x06, 0x04, 0x02, 0x04, 0x00, 0x02, 0x01, 0x12, 0x03, 0x20, 0x04, 0x12, + 0x1a, 0x22, 0x20, 0x53, 0x69, 0x67, 0x6e, 0x61, 0x6c, 0x20, 0x66, 0x6f, 0x72, 0x20, 0x74, 0x68, + 0x65, 0x20, 0x65, 0x6e, 0x64, 0x20, 0x6f, 0x66, 0x20, 0x74, 0x68, 0x65, 0x20, 0x62, 0x61, 0x74, + 0x63, 0x68, 0x2e, 0x0a, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x02, 0x04, 0x00, 0x02, 0x01, 0x01, 0x12, + 0x03, 0x20, 0x04, 0x0d, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x02, 0x04, 0x00, 0x02, 0x01, 0x02, 0x12, + 0x03, 0x20, 0x10, 0x11, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x02, 0x02, 0x00, 0x12, 0x03, 0x22, 0x02, + 0x16, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, 0x00, 0x06, 0x12, 0x03, 0x22, 0x02, 0x0c, 0x0a, + 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, 0x00, 0x01, 0x12, 0x03, 0x22, 0x0d, 0x11, 0x0a, 0x0c, 0x0a, + 0x05, 0x04, 0x02, 0x02, 0x00, 0x03, 0x12, 0x03, 0x22, 0x14, 0x15, 0x0a, 0x4a, 0x0a, 0x04, 0x04, + 0x02, 0x02, 0x01, 0x12, 0x03, 0x24, 0x02, 0x1b, 0x1a, 0x3d, 0x20, 0x52, 0x65, 0x71, 0x75, 0x69, + 0x72, 0x65, 0x64, 0x2e, 0x20, 0x53, 0x74, 0x61, 0x72, 0x74, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, + 0x6f, 0x6e, 0x20, 0x6f, 0x66, 0x20, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x62, 0x61, + 0x74, 0x63, 0x68, 0x2f, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2c, 0x20, 0x69, 0x6e, 0x63, 0x6c, + 0x75, 0x73, 0x69, 0x76, 0x65, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, 0x01, 0x05, + 0x12, 0x03, 0x24, 0x02, 0x08, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, 0x01, 0x01, 0x12, 0x03, + 0x24, 0x09, 0x16, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, 0x01, 0x03, 0x12, 0x03, 0x24, 0x19, + 0x1a, 0x0a, 0x39, 0x0a, 0x04, 0x04, 0x02, 0x02, 0x02, 0x12, 0x03, 0x26, 0x02, 0x22, 0x1a, 0x2c, + 0x20, 0x45, 0x6e, 0x64, 0x20, 0x76, 0x65, 0x72, 0x73, 0x69, 0x6f, 0x6e, 0x20, 0x6f, 0x66, 0x20, + 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x2a, 0x62, 0x61, 0x74, 0x63, 0x68, 0x2a, 0x2c, + 0x20, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x73, 0x69, 0x76, 0x65, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, + 0x04, 0x02, 0x02, 0x02, 0x04, 0x12, 0x03, 0x26, 0x02, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, + 0x02, 0x02, 0x05, 0x12, 0x03, 0x26, 0x0b, 0x11, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, 0x02, + 0x01, 0x12, 0x03, 0x26, 0x12, 0x1d, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x02, 0x02, 0x02, 0x03, 0x12, + 0x03, 0x26, 0x20, 0x21, 0x0a, 0x0a, 0x0a, 0x02, 0x04, 0x03, 0x12, 0x04, 0x29, 0x00, 0x30, 0x01, + 0x0a, 0x0a, 0x0a, 0x03, 0x04, 0x03, 0x01, 0x12, 0x03, 0x29, 0x08, 0x1c, 0x0a, 0x39, 0x0a, 0x04, + 0x04, 0x03, 0x02, 0x00, 0x12, 0x03, 0x2b, 0x02, 0x1e, 0x1a, 0x2c, 0x20, 0x52, 0x65, 0x71, 0x75, + 0x69, 0x72, 0x65, 0x64, 0x3b, 0x20, 0x73, 0x74, 0x61, 0x72, 0x74, 0x20, 0x76, 0x65, 0x72, 0x73, + 0x69, 0x6f, 0x6e, 0x20, 0x6f, 0x66, 0x20, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x03, 0x02, 0x00, 0x05, + 0x12, 0x03, 0x2b, 0x02, 0x08, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x03, 0x02, 0x00, 0x01, 0x12, 0x03, + 0x2b, 0x09, 0x19, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x03, 0x02, 0x00, 0x03, 0x12, 0x03, 0x2b, 0x1c, + 0x1d, 0x0a, 0x77, 0x0a, 0x04, 0x04, 0x03, 0x02, 0x01, 0x12, 0x03, 0x2f, 0x02, 0x29, 0x1a, 0x6a, + 0x20, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x3b, 0x20, 0x6e, 0x75, 0x6d, 0x62, 0x65, + 0x72, 0x20, 0x6f, 0x66, 0x20, 0x74, 0x72, 0x61, 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, + 0x73, 0x20, 0x74, 0x6f, 0x20, 0x72, 0x65, 0x74, 0x75, 0x72, 0x6e, 0x20, 0x69, 0x6e, 0x20, 0x63, + 0x75, 0x72, 0x72, 0x65, 0x6e, 0x74, 0x20, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x20, 0x0a, + 0x20, 0x49, 0x66, 0x20, 0x6e, 0x6f, 0x74, 0x20, 0x73, 0x65, 0x74, 0x2c, 0x20, 0x72, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x20, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x73, 0x20, 0x69, 0x6e, + 0x66, 0x69, 0x6e, 0x69, 0x74, 0x65, 0x6c, 0x79, 0x2e, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x03, + 0x02, 0x01, 0x04, 0x12, 0x03, 0x2f, 0x02, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x03, 0x02, 0x01, + 0x05, 0x12, 0x03, 0x2f, 0x0b, 0x11, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x03, 0x02, 0x01, 0x01, 0x12, + 0x03, 0x2f, 0x12, 0x24, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x03, 0x02, 0x01, 0x03, 0x12, 0x03, 0x2f, + 0x27, 0x28, 0x0a, 0x0a, 0x0a, 0x02, 0x04, 0x04, 0x12, 0x04, 0x32, 0x00, 0x3d, 0x01, 0x0a, 0x0a, + 0x0a, 0x03, 0x04, 0x04, 0x01, 0x12, 0x03, 0x32, 0x08, 0x1d, 0x0a, 0x0c, 0x0a, 0x04, 0x04, 0x04, + 0x04, 0x00, 0x12, 0x04, 0x33, 0x02, 0x36, 0x03, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x04, 0x00, + 0x01, 0x12, 0x03, 0x33, 0x07, 0x14, 0x0a, 0x0d, 0x0a, 0x06, 0x04, 0x04, 0x04, 0x00, 0x02, 0x00, + 0x12, 0x03, 0x34, 0x04, 0x0f, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x04, 0x04, 0x00, 0x02, 0x00, 0x01, + 0x12, 0x03, 0x34, 0x04, 0x0a, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x04, 0x04, 0x00, 0x02, 0x00, 0x02, + 0x12, 0x03, 0x34, 0x0d, 0x0e, 0x0a, 0x0d, 0x0a, 0x06, 0x04, 0x04, 0x04, 0x00, 0x02, 0x01, 0x12, + 0x03, 0x35, 0x04, 0x0d, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x04, 0x04, 0x00, 0x02, 0x01, 0x01, 0x12, + 0x03, 0x35, 0x04, 0x08, 0x0a, 0x0e, 0x0a, 0x07, 0x04, 0x04, 0x04, 0x00, 0x02, 0x01, 0x02, 0x12, + 0x03, 0x35, 0x0b, 0x0c, 0x0a, 0x0c, 0x0a, 0x04, 0x04, 0x04, 0x08, 0x00, 0x12, 0x04, 0x37, 0x02, + 0x3a, 0x03, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x08, 0x00, 0x01, 0x12, 0x03, 0x37, 0x08, 0x10, + 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x04, 0x02, 0x00, 0x12, 0x03, 0x38, 0x04, 0x1c, 0x0a, 0x0c, 0x0a, + 0x05, 0x04, 0x04, 0x02, 0x00, 0x06, 0x12, 0x03, 0x38, 0x04, 0x10, 0x0a, 0x0c, 0x0a, 0x05, 0x04, + 0x04, 0x02, 0x00, 0x01, 0x12, 0x03, 0x38, 0x11, 0x17, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, + 0x00, 0x03, 0x12, 0x03, 0x38, 0x1a, 0x1b, 0x0a, 0x0b, 0x0a, 0x04, 0x04, 0x04, 0x02, 0x01, 0x12, + 0x03, 0x39, 0x04, 0x20, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x01, 0x06, 0x12, 0x03, 0x39, + 0x04, 0x16, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x01, 0x01, 0x12, 0x03, 0x39, 0x17, 0x1b, + 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x01, 0x03, 0x12, 0x03, 0x39, 0x1e, 0x1f, 0x0a, 0x44, + 0x0a, 0x04, 0x04, 0x04, 0x02, 0x02, 0x12, 0x03, 0x3c, 0x02, 0x16, 0x1a, 0x37, 0x20, 0x4d, 0x61, + 0x6b, 0x69, 0x6e, 0x67, 0x20, 0x73, 0x75, 0x72, 0x65, 0x20, 0x74, 0x68, 0x61, 0x74, 0x20, 0x61, + 0x6c, 0x6c, 0x20, 0x74, 0x68, 0x65, 0x20, 0x72, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x73, + 0x20, 0x69, 0x6e, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x20, 0x61, 0x20, 0x63, 0x68, 0x61, 0x69, 0x6e, + 0x20, 0x69, 0x64, 0x0a, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x02, 0x05, 0x12, 0x03, 0x3c, + 0x02, 0x08, 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x02, 0x01, 0x12, 0x03, 0x3c, 0x09, 0x11, + 0x0a, 0x0c, 0x0a, 0x05, 0x04, 0x04, 0x02, 0x02, 0x03, 0x12, 0x03, 0x3c, 0x14, 0x15, 0x0a, 0x0a, + 0x0a, 0x02, 0x06, 0x00, 0x12, 0x04, 0x3f, 0x00, 0x41, 0x01, 0x0a, 0x0a, 0x0a, 0x03, 0x06, 0x00, + 0x01, 0x12, 0x03, 0x3f, 0x08, 0x15, 0x0a, 0x0b, 0x0a, 0x04, 0x06, 0x00, 0x02, 0x00, 0x12, 0x03, + 0x40, 0x04, 0x53, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x01, 0x12, 0x03, 0x40, 0x08, + 0x15, 0x0a, 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x02, 0x12, 0x03, 0x40, 0x16, 0x2a, 0x0a, + 0x0c, 0x0a, 0x05, 0x06, 0x00, 0x02, 0x00, 0x06, 0x12, 0x03, 0x40, 0x35, 0x3b, 0x0a, 0x0c, 0x0a, + 0x05, 0x06, 0x00, 0x02, 0x00, 0x03, 0x12, 0x03, 0x40, 0x3c, 0x51, 0x62, 0x06, 0x70, 0x72, 0x6f, + 0x74, 0x6f, 0x33, ]; include!("aptos.datastream.v1.serde.rs"); include!("aptos.datastream.v1.tonic.rs"); diff --git a/crates/aptos-protos/src/pb/aptos.transaction.v1.serde.rs b/crates/aptos-protos/src/pb/aptos.transaction.v1.serde.rs index 3c87e909b8c20..49cc30d80c581 100644 --- a/crates/aptos-protos/src/pb/aptos.transaction.v1.serde.rs +++ b/crates/aptos-protos/src/pb/aptos.transaction.v1.serde.rs @@ -4136,7 +4136,9 @@ impl<'de> serde::Deserialize<'de> for MultisigPayload { D: serde::Deserializer<'de>, { const FIELDS: &[&str] = &[ + "multisig_address", "multisigAddress", + "transaction_payload", "transactionPayload", ]; @@ -4165,8 +4167,8 @@ impl<'de> serde::Deserialize<'de> for MultisigPayload { E: serde::de::Error, { match value { - "multisigAddress" => Ok(GeneratedField::MultisigAddress), - "transactionPayload" => Ok(GeneratedField::TransactionPayload), + "multisigAddress" | "multisig_address" => Ok(GeneratedField::MultisigAddress), + "transactionPayload" | "transaction_payload" => Ok(GeneratedField::TransactionPayload), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -4200,7 +4202,7 @@ impl<'de> serde::Deserialize<'de> for MultisigPayload { if transaction_payload__.is_some() { return Err(serde::de::Error::duplicate_field("transactionPayload")); } - transaction_payload__ = Some(map.next_value()?); + transaction_payload__ = map.next_value()?; } } } @@ -4251,6 +4253,7 @@ impl<'de> serde::Deserialize<'de> for MultisigTransactionPayload { { const FIELDS: &[&str] = &[ "type", + "entry_function_payload", "entryFunctionPayload", ]; @@ -4280,7 +4283,7 @@ impl<'de> serde::Deserialize<'de> for MultisigTransactionPayload { { match value { "type" => Ok(GeneratedField::Type), - "entryFunctionPayload" => Ok(GeneratedField::EntryFunctionPayload), + "entryFunctionPayload" | "entry_function_payload" => Ok(GeneratedField::EntryFunctionPayload), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -4314,7 +4317,8 @@ impl<'de> serde::Deserialize<'de> for MultisigTransactionPayload { if payload__.is_some() { return Err(serde::de::Error::duplicate_field("entryFunctionPayload")); } - payload__ = Some(multisig_transaction_payload::Payload::EntryFunctionPayload(map.next_value()?)); + payload__ = map.next_value::<::std::option::Option<_>>()?.map(multisig_transaction_payload::Payload::EntryFunctionPayload) +; } } } @@ -5552,6 +5556,7 @@ impl<'de> serde::Deserialize<'de> for TransactionPayload { "moduleBundlePayload", "write_set_payload", "writeSetPayload", + "multisig_payload", "multisigPayload", ]; @@ -5585,11 +5590,11 @@ impl<'de> serde::Deserialize<'de> for TransactionPayload { { match value { "type" => Ok(GeneratedField::Type), - "entryFunctionPayload" => Ok(GeneratedField::EntryFunctionPayload), - "scriptPayload" => Ok(GeneratedField::ScriptPayload), - "moduleBundlePayload" => Ok(GeneratedField::ModuleBundlePayload), - "writeSetPayload" => Ok(GeneratedField::WriteSetPayload), - "multisigPayload" => Ok(GeneratedField::MultisigPayload), + "entryFunctionPayload" | "entry_function_payload" => Ok(GeneratedField::EntryFunctionPayload), + "scriptPayload" | "script_payload" => Ok(GeneratedField::ScriptPayload), + "moduleBundlePayload" | "module_bundle_payload" => Ok(GeneratedField::ModuleBundlePayload), + "writeSetPayload" | "write_set_payload" => Ok(GeneratedField::WriteSetPayload), + "multisigPayload" | "multisig_payload" => Ok(GeneratedField::MultisigPayload), _ => Err(serde::de::Error::unknown_field(value, FIELDS)), } } @@ -5651,7 +5656,8 @@ impl<'de> serde::Deserialize<'de> for TransactionPayload { if payload__.is_some() { return Err(serde::de::Error::duplicate_field("multisigPayload")); } - payload__ = Some(transaction_payload::Payload::MultisigPayload(map.next_value()?)); + payload__ = map.next_value::<::std::option::Option<_>>()?.map(transaction_payload::Payload::MultisigPayload) +; } } } diff --git a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs index fe2bdf8e738ca..7da4227c9e522 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-cache-worker/src/worker.rs @@ -10,15 +10,21 @@ use aptos_indexer_grpc_utils::{ use aptos_logger::{error, info}; use aptos_moving_average::MovingAverage; use aptos_protos::datastream::v1::{ - self as datastream, raw_datastream_response::Response, RawDatastreamRequest, - RawDatastreamResponse, + self as datastream, raw_datastream_response::Response, stream_status::StatusType, + RawDatastreamRequest, RawDatastreamResponse, }; use futures::{self, StreamExt}; + +type ChainID = u32; +type StartingVersion = u64; + +const WORKER_RESTART_DELAY_IF_METADATA_NOT_FOUND_IN_SECS: u64 = 60; + pub struct Worker { /// Redis client. redis_client: redis::Client, /// Fullnode grpc address. - grpc_address: String, + fullnode_grpc_address: String, /// File store bucket name. pub file_store_bucket_name: String, } @@ -51,7 +57,7 @@ impl Worker { Self { redis_client, // The fullnode grpc address is required. - grpc_address: format!("http://{}", config.fullnode_grpc_address.unwrap()), + fullnode_grpc_address: format!("http://{}", config.fullnode_grpc_address.unwrap()), file_store_bucket_name: config.file_store_bucket_name, } } @@ -73,31 +79,31 @@ impl Worker { .await .expect("Get redis connection failed."); - let mut rpc_client = create_grpc_client(self.grpc_address.clone()).await; + let mut rpc_client = create_grpc_client(self.fullnode_grpc_address.clone()).await; // 1. Fetch metadata. let file_store_operator = FileStoreOperator::new(self.file_store_bucket_name.clone()); - file_store_operator.bootstrap().await; - + file_store_operator.verify_storage_bucket_existence().await; + let mut starting_version = 0; let file_store_metadata = file_store_operator.get_file_store_metadata().await; - if file_store_metadata.is_none() { + + if let Some(metadata) = file_store_metadata { + info!("[Indexer Cache] File store metadata: {:?}", metadata); + starting_version = metadata.version; + } else { error!("[Indexer Cache] File store is empty. Exit after 1 minute."); tokio::spawn(async move { - // Wait for 1 minute and then exit. - tokio::time::sleep(std::time::Duration::from_secs(60)).await; + tokio::time::sleep(std::time::Duration::from_secs( + WORKER_RESTART_DELAY_IF_METADATA_NOT_FOUND_IN_SECS, + )) + .await; std::process::exit(1); }); - }; + } // 2. Start streaming RPC. - let current_version = match file_store_metadata { - Some(metadata) => metadata.version, - // File store is empty. - None => 0, - }; let request = tonic::Request::new(RawDatastreamRequest { - starting_version: current_version, - // Worker fetches all transactions without end. + starting_version, ..Default::default() }); @@ -109,49 +115,93 @@ impl Worker { } } -/// Handles the INIT frame from RawDatastreamResponse: check cache and file store to start from correct state. -/// Returns a cache operator if it's ok; otherwise, error out. -async fn init_signal_handling( - start_version: &mut u64, - chain_id: &mut u64, - file_store_metadata: Option, +async fn process_raw_datastream_response( + response: RawDatastreamResponse, + cache_operator: &mut CacheOperator, +) -> anyhow::Result { + match response.response.unwrap() { + datastream::raw_datastream_response::Response::Status(status) => { + match StatusType::from_i32(status.r#type).expect("[Indexer Cache] Invalid status type.") + { + StatusType::Init => Ok(GrpcDataStatus::StreamInit(status.start_version)), + StatusType::BatchEnd => { + let start_version = status.start_version; + let num_of_transactions = status + .end_version + .expect("RawDatastreamResponse status end_version is None") + - start_version + + 1; + Ok(GrpcDataStatus::BatchEnd { + start_version, + num_of_transactions, + }) + }, + } + }, + datastream::raw_datastream_response::Response::Data(data) => { + let transaction_len = data.transactions.len(); + let start_version = data.transactions.first().unwrap().version; + + for e in data.transactions { + let version = e.version; + let timestamp_in_seconds = match e.timestamp { + Some(t) => t.seconds, + // For Genesis block, there is no timestamp. + None => 0, + }; + // Push to cache. + match cache_operator + .update_cache_transaction( + version, + e.encoded_proto_data, + timestamp_in_seconds as u64, + ) + .await + { + Ok(_) => {}, + Err(e) => { + anyhow::bail!("Update cache with version failed: {}", e); + }, + } + } + Ok(GrpcDataStatus::ChunkDataOk { + start_version, + num_of_transactions: transaction_len as u64, + }) + }, + } +} + +/// Setup the cache operator with init signal, includeing chain id and starting version from fullnode. +async fn setup_cache_with_init_signal( conn: redis::aio::Connection, init_signal: RawDatastreamResponse, -) -> CacheOperator { - let fullnode_chain_id = match init_signal.response.expect("Response type not exists.") { - Response::Status(status_frame) => match status_frame.r#type { - 0 => init_signal.chain_id as u64, +) -> ( + CacheOperator, + ChainID, + StartingVersion, +) { + let (fullnode_chain_id, starting_version) = + match init_signal.response.expect("Response type not exists.") { + Response::Status(status_frame) => match status_frame.r#type { + 0 => (init_signal.chain_id, status_frame.start_version), + _ => { + panic!("[Indexer Cache] Streaming error: first frame is not INIT signal."); + }, + }, _ => { - panic!("[Indexer Cache] Streaming error: first frame is not INIT signal."); + panic!("[Indexer Cache] Streaming error: first frame is not siganl frame."); }, - }, - _ => { - panic!("[Indexer Cache] Streaming error: first frame is not siganl frame."); - }, - }; + }; let mut cache_operator = CacheOperator::new(conn); cache_operator.cache_setup_if_needed().await; cache_operator - .update_or_verify_chain_id(fullnode_chain_id) + .update_or_verify_chain_id(fullnode_chain_id as u64) .await .expect("[Indexer Cache] Chain id mismatch between cache and fullnode."); - match file_store_metadata { - Some(metadata) => { - // 1. If metadata is present, start from file store version. - *start_version = metadata.version; - if metadata.chain_id != fullnode_chain_id { - panic!("[Indexer Cache] Chain id mismatch between file store and fullnode."); - } - }, - None => { - // 1. If metadata is not present, start from 0. - *start_version = 0; - }, - } - *chain_id = fullnode_chain_id; - cache_operator + (cache_operator, fullnode_chain_id, starting_version) } // Infinite streaming processing. Retry if error happens; crash if fatal. @@ -161,25 +211,27 @@ async fn process_streaming_response( mut resp_stream: impl futures_core::Stream> + std::marker::Unpin, ) { - let mut current_version = 0; - let mut current_chain_id = 0; - let mut ma = MovingAverage::new(10_000); + let mut tps_calculator = MovingAverage::new(10_000); let mut transaction_count = 0; - // 3. Handle the INIT frame from RawDatastreamResponse: + // 3. Set up the cache operator with init signal. let init_signal = match resp_stream.next().await { Some(Ok(r)) => r, _ => { panic!("[Indexer Cache] Streaming error: no response."); }, }; - let mut cache_operator = init_signal_handling( - &mut current_version, - &mut current_chain_id, - file_store_metadata, - conn, - init_signal, - ) - .await; + let (mut cache_operator, fullnode_chain_id, starting_version) = + setup_cache_with_init_signal(conn, init_signal).await; + // It's required to start the worker with the same version as file store. + if let Some(file_store_metadata) = file_store_metadata { + if file_store_metadata.version != starting_version { + panic!("[Indexer Cache] File store version mismatch with fullnode."); + } + if file_store_metadata.chain_id != fullnode_chain_id as u64 { + panic!("[Indexer Cache] Chain id mismatch between file store and fullnode."); + } + } + let mut current_version = starting_version; // 4. Process the streaming response. while let Some(received) = resp_stream.next().await { @@ -191,7 +243,7 @@ async fn process_streaming_response( }, }; - if received.chain_id as u64 != current_chain_id { + if received.chain_id as u64 != fullnode_chain_id as u64 { panic!("[Indexer Cache] Chain id mismatch happens during data streaming."); } @@ -203,7 +255,7 @@ async fn process_streaming_response( } => { current_version += num_of_transactions; transaction_count += num_of_transactions; - ma.tick_now(num_of_transactions); + tps_calculator.tick_now(num_of_transactions); aptos_logger::info!( start_version = start_version, num_of_transactions = num_of_transactions, @@ -241,8 +293,8 @@ async fn process_streaming_response( transaction_count = 0; info!( current_version = current_version, - chain_id = current_chain_id, - tps = (ma.avg() * 1000.0) as u64, + chain_id = fullnode_chain_id, + tps = (tps_calculator.avg() * 1000.0) as u64, "[Indexer Cache] Successfully process current batch." ); }, @@ -257,60 +309,3 @@ async fn process_streaming_response( } } } - -async fn process_raw_datastream_response( - response: RawDatastreamResponse, - cache_operator: &mut CacheOperator, -) -> anyhow::Result { - match response.response.unwrap() { - datastream::raw_datastream_response::Response::Status(status) => match status.r#type { - 0 => Ok(GrpcDataStatus::StreamInit(status.start_version)), - 1 => { - let start_version = status.start_version; - let num_of_transactions = status - .end_version - .expect("RawDatastreamResponse status end_version is None") - - start_version - + 1; - Ok(GrpcDataStatus::BatchEnd { - start_version, - num_of_transactions, - }) - }, - _ => { - anyhow::bail!("Unknown status type: {}", status.r#type); - }, - }, - datastream::raw_datastream_response::Response::Data(data) => { - let transaction_len = data.transactions.len(); - let start_version = data.transactions.first().unwrap().version; - - for e in data.transactions { - let version = e.version; - let timestamp_in_seconds = match e.timestamp { - Some(t) => t.seconds, - // For Genesis block, there is no timestamp. - None => 0, - }; - // Push to cache. - match cache_operator - .update_cache_transaction( - version, - e.encoded_proto_data, - timestamp_in_seconds as u64, - ) - .await - { - Ok(_) => {}, - Err(e) => { - anyhow::bail!("Update cache with version failed: {}", e); - }, - } - } - Ok(GrpcDataStatus::ChunkDataOk { - start_version, - num_of_transactions: transaction_len as u64, - }) - }, - } -} diff --git a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs index 16279bebc68b5..a190c37df5e11 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-data-service/src/service.rs @@ -2,12 +2,13 @@ // SPDX-License-Identifier: Apache-2.0 use aptos_indexer_grpc_utils::{ + build_protobuf_encoded_transaction_wrappers, cache_operator::{CacheBatchGetStatus, CacheOperator}, config::IndexerGrpcConfig, - constants::BLOB_STORAGE_SIZE, file_store_operator::FileStoreOperator, + EncodedTransactionWithVersion, }; -use aptos_logger::{info, warn}; +use aptos_logger::{error, info, warn}; use aptos_moving_average::MovingAverage; use aptos_protos::datastream::v1::{ indexer_stream_server::IndexerStream, @@ -15,7 +16,7 @@ use aptos_protos::datastream::v1::{ RawDatastreamResponse, TransactionOutput, TransactionsOutput, }; use futures::Stream; -use std::{pin::Pin, sync::Arc, thread::sleep, time::Duration}; +use std::{pin::Pin, sync::Arc, time::Duration}; use tokio::sync::mpsc::{channel, error::TrySendError}; use tokio_stream::wrappers::ReceiverStream; use tonic::{Request, Response, Status}; @@ -24,11 +25,22 @@ use uuid::Uuid; type ResponseStream = Pin> + Send>>; const MOVING_AVERAGE_WINDOW_SIZE: u64 = 10_000; -const DATA_NOT_READY_SLEEP_DURATION_MS: u64 = 1000; +// When trying to fetch beyond the current head of cache, the server will retry after this duration. +const AHEAD_OF_CACHE_RETRY_SLEEP_DURATION_MS: u64 = 200; +// When error happens when fetching data from cache and file store, the server will retry after this duration. +// TODO(larry): fix all errors treated as transient errors. +const TRANSIENT_DATA_ERROR_RETRY_SLEEP_DURATION_MS: u64 = 1000; + +// TODO(larry): replace this with a exponential backoff. +// The server will not fetch more data from the cache and file store until the channel is not full. +const RESPONSE_CHANNEL_FULL_BACKOFF_DURATION_MS: u64 = 1000; +// Up to MAX_RESPONSE_CHANNEL_SIZE response can be buffered in the channel. If the channel is full, +// the server will not fetch more data from the cache and file store until the channel is not full. +const MAX_RESPONSE_CHANNEL_SIZE: usize = 40; pub struct DatastreamServer { pub redis_client: Arc, - pub config: IndexerGrpcConfig, + pub server_config: IndexerGrpcConfig, } impl DatastreamServer { @@ -38,98 +50,108 @@ impl DatastreamServer { redis::Client::open(format!("redis://{}", config.redis_address)) .expect("Create redis client failed."), ), - config, + server_config: config, } } } -// The hard limit of TPS to avoid overloading the server. -const MAX_TPS: u64 = 20_000; -// The backoff time when the channel is full, in other words, stop fetching data from the storage. -const CHANNEL_FULL_BACKOFF_IN_SECS: u64 = 1; -const STREAMING_CHANNEL_SIZE: u64 = - MAX_TPS * CHANNEL_FULL_BACKOFF_IN_SECS / BLOB_STORAGE_SIZE as u64; +/// Enum to represent the status of the data fetching overall. +enum TransactionsDataStatus { + // Data fetching is successful. + Success(Vec), + // Ahead of current head of cache. + AheadOfCache, + // Fatal error when gap detected between cache and file store. + DataGap, +} -// DatastreamServer handles the raw datastream requests from cache and file store. +/// DatastreamServer handles the raw datastream requests from cache and file store. #[tonic::async_trait] impl IndexerStream for DatastreamServer { type RawDatastreamStream = ResponseStream; + /// RawDatastream is a streaming GRPC endpoint: + /// 1. Fetches data from cache and file store. + /// 1.1. If the data is beyond the current head of cache, retry after a short sleep. + /// 1.2. If the data is not in cache, fetch the data from file store. + /// 1.3. If the data is not in file store, stream connection will break. + /// 1.4 If error happens, retry after a short sleep. + /// 2. Push data into channel to stream to the client. + /// 2.1. If the channel is full, do not fetch and retry after a short sleep. async fn raw_datastream( &self, req: Request, ) -> Result, Status> { - // Limit the TPS at 20K. This is to prevent the server from being overloaded. - let (tx, rx) = channel(STREAMING_CHANNEL_SIZE as usize); + // Response channel to stream the data to the client. + let (tx, rx) = channel(MAX_RESPONSE_CHANNEL_SIZE); let req = req.into_inner(); - // Round the version to the nearest BLOB_STORAGE_SIZE. - let mut current_version = - (req.starting_version / BLOB_STORAGE_SIZE as u64) * BLOB_STORAGE_SIZE as u64; + let mut current_version = req.starting_version; - let file_store_bucket_name = self.config.file_store_bucket_name.clone(); + let file_store_bucket_name = self.server_config.file_store_bucket_name.clone(); let redis_client = self.redis_client.clone(); + tokio::spawn(async move { - let mut ma = MovingAverage::new(MOVING_AVERAGE_WINDOW_SIZE); - let request_id = Uuid::new_v4().to_string(); let conn = redis_client.get_async_connection().await.unwrap(); let mut cache_operator = CacheOperator::new(conn); - let chain_id = cache_operator.get_chain_id().await.unwrap(); - let file_store_operator = FileStoreOperator::new(file_store_bucket_name); - file_store_operator.bootstrap().await; + file_store_operator.verify_storage_bucket_existence().await; + + let chain_id = cache_operator.get_chain_id().await.unwrap(); + // Data service metrics. + let mut tps_calculator = MovingAverage::new(MOVING_AVERAGE_WINDOW_SIZE); + // Request metadata. + let request_id = Uuid::new_v4().to_string(); + info!( + chain_id = chain_id, + request_id = request_id.as_str(), + current_version = current_version, + "[Indexer Data] New request received." + ); loop { - // Check if the receiver is closed. - if tx.is_closed() { - break; - } + // 1. Fetch data from cache and file store. + let transaction_data = + match data_fetch(current_version, &mut cache_operator, &file_store_operator) + .await + { + Ok(TransactionsDataStatus::Success(transactions)) => transactions, + Ok(TransactionsDataStatus::AheadOfCache) => { + ahead_of_cache_data_handling().await; + // Retry after a short sleep. + continue; + }, + Ok(TransactionsDataStatus::DataGap) => { + data_gap_handling(current_version); + // End the data stream. + break; + }, + Err(e) => { + data_fetch_error_handling( + e, + current_version, + chain_id, + request_id.as_str(), + ) + .await; + // Retry after a short sleep. + continue; + }, + }; - let batch_get_result = cache_operator - .batch_get_encoded_proto_data(current_version) - .await; - let encoded_proto_data_vec = match batch_get_result { - Ok(CacheBatchGetStatus::NotReady) => { - // Data is not ready yet in the cache. - sleep(Duration::from_millis(DATA_NOT_READY_SLEEP_DURATION_MS)); - continue; - }, - Ok(CacheBatchGetStatus::Ok(v)) => v, - Ok(CacheBatchGetStatus::HitTheHead(v)) => v, - Ok(CacheBatchGetStatus::EvictedFromCache) => { - // TODO: fetch from the file store. - continue; - }, - Err(e) => { - warn!( - "[Indexer Data] Failed to get cache transactions. Error: {:?}", - e - ); - sleep(Duration::from_millis(100)); - continue; - }, - }; - let current_batch_size = encoded_proto_data_vec.len() as u64; - let item = RawDatastreamResponse { - response: Some(DatastreamProtoResponse::Data(TransactionsOutput { - transactions: encoded_proto_data_vec - .iter() - .enumerate() - .map(|(i, e)| TransactionOutput { - encoded_proto_data: e.clone(), - version: current_version + i as u64, - ..TransactionOutput::default() - }) - .collect(), - })), - chain_id: chain_id as u32, - }; - match tx.try_send(Result::<_, Status>::Ok(item.clone())) { + // 2. Push the data to the response channel, i.e. stream the data to the client. + let current_batch_size = transaction_data.len(); + let end_of_batch_version = transaction_data.last().unwrap().1; + let resp_item = raw_datastream_response_builder(transaction_data, chain_id as u32); + match tx.try_send(Result::::Ok(resp_item)) { Ok(_) => {}, Err(TrySendError::Full(_)) => { warn!( request_id = request_id.as_str(), "[Indexer Data] Receiver is full; retrying." ); - std::thread::sleep(Duration::from_secs(1)); + tokio::time::sleep(Duration::from_millis( + RESPONSE_CHANNEL_FULL_BACKOFF_DURATION_MS, + )) + .await; continue; }, Err(TrySendError::Closed(_)) => { @@ -140,17 +162,21 @@ impl IndexerStream for DatastreamServer { break; }, } - current_version += current_batch_size; - ma.tick_now(current_batch_size); + // 3. Update the current version and record current tps. + tps_calculator.tick_now(current_batch_size as u64); + current_version = end_of_batch_version + 1; info!( request_id = request_id.as_str(), current_version = current_version, batch_size = current_batch_size, - tps = (ma.avg() * 1000.0) as u64, + tps = (tps_calculator.avg() * 1000.0) as u64, "[Indexer Data] Sending batch." ); } - info!("[Indexer Data] Client disconnected."); + info!( + request_id = request_id.as_str(), + "[Indexer Data] Client disconnected." + ); }); let output_stream = ReceiverStream::new(rx); @@ -159,3 +185,100 @@ impl IndexerStream for DatastreamServer { )) } } + +/// Builds the response for the raw datastream request. Partial batch is ok, i.e., a batch with transactions < 1000. +fn raw_datastream_response_builder( + data: Vec, + chain_id: u32, +) -> RawDatastreamResponse { + RawDatastreamResponse { + response: Some(DatastreamProtoResponse::Data(TransactionsOutput { + transactions: data + .into_iter() + .map(|(encoded, version)| TransactionOutput { + encoded_proto_data: encoded, + version, + ..TransactionOutput::default() + }) + .collect(), + })), + chain_id, + } +} + +/// Fetches data from cache or the file store. It returns the data if it is ready in the cache or file store. +/// Otherwise, it returns the status of the data fetching. +async fn data_fetch( + starting_version: u64, + cache_operator: &mut CacheOperator, + file_store_operator: &FileStoreOperator, +) -> anyhow::Result { + let batch_get_result = cache_operator + .batch_get_encoded_proto_data(starting_version) + .await; + + match batch_get_result { + // Data is not ready yet in the cache. + Ok(CacheBatchGetStatus::NotReady) => Ok(TransactionsDataStatus::AheadOfCache), + Ok(CacheBatchGetStatus::Ok(transactions)) => Ok(TransactionsDataStatus::Success( + build_protobuf_encoded_transaction_wrappers(transactions, starting_version), + )), + Ok(CacheBatchGetStatus::EvictedFromCache) => { + // Data is evicted from the cache. Fetch from file store. + let file_store_batch_get_result = + file_store_operator.get_transactions(starting_version).await; + match file_store_batch_get_result { + Ok(transactions) => Ok(TransactionsDataStatus::Success( + build_protobuf_encoded_transaction_wrappers(transactions, starting_version), + )), + Err(e) => { + if e.to_string().contains("Transactions file not found") { + Ok(TransactionsDataStatus::DataGap) + } else { + Err(e) + } + }, + } + }, + Err(e) => Err(e), + } +} + +/// Handles the case when the data is not ready in the cache, i.e., beyond the current head. +async fn ahead_of_cache_data_handling() { + // TODO: add exponential backoff. + tokio::time::sleep(Duration::from_millis( + AHEAD_OF_CACHE_RETRY_SLEEP_DURATION_MS, + )) + .await; +} + +/// Handles data gap errors, i.e., the data is not present in the cache or file store. +fn data_gap_handling(version: u64) { + // TODO(larry): add metrics/alerts to track the gap. + // Do not crash the server when gap detected since other clients may still be able to get data. + error!( + current_version = version, + "[Indexer Data] Data gap detected. Please check the logs for more details." + ); +} + +/// Handles data fetch errors, including cache and file store related errors. +async fn data_fetch_error_handling( + err: anyhow::Error, + current_version: u64, + chain_id: u64, + request_id: &str, +) { + error!( + request_id = request_id, + chain_id = chain_id, + current_version = current_version, + "[Indexer Data] Failed to fetch data from cache and file store. {:?}", + err + ); + tokio::time::sleep(Duration::from_millis( + TRANSIENT_DATA_ERROR_RETRY_SLEEP_DURATION_MS, + )) + .await; +} diff --git a/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/Cargo.toml b/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/Cargo.toml deleted file mode 100644 index e7dc2a6772d84..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/Cargo.toml +++ /dev/null @@ -1,31 +0,0 @@ -[package] -name = "aptos-indexer-grpc-file-store-data-verifier" -description = "Indexer gRPC file store verifier to validate the data against the fullnode." -version = "0.1.0" - -# Workspace inherited keys -authors = { workspace = true } -edition = { workspace = true } -homepage = { workspace = true } -license = { workspace = true } -publish = { workspace = true } -repository = { workspace = true } -rust-version = { workspace = true } - -[dependencies] -anyhow = { workspace = true } -aptos-crash-handler = { workspace = true } -aptos-indexer-grpc-utils = { workspace = true } -aptos-logger = { workspace = true } -aptos-moving-average = { workspace = true } -aptos-protos = { workspace = true } -aptos-runtimes = { workspace = true } -clap = { workspace = true } -cloud-storage = { workspace = true } -futures = { workspace = true } -futures-util = { workspace = true } -redis = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -tokio = { workspace = true } -warp = { workspace = true } diff --git a/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/README.md b/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/README.md deleted file mode 100644 index 85114c1c60166..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/README.md +++ /dev/null @@ -1,18 +0,0 @@ -# Data Verifier -* This is to verify that data stored in GCS and fetched from fullnode are identical. - -## How to Run it - -* service account json with `write` access to bucket `${file_store_bucket_name}`, e.g., `xxx.json`. - -* `SERVICE_ACCOUNT` env var pointing to service account json file. - -* Run it: `cargo run --release -- -c config.yaml` - -* Yaml Example -```yaml -fullnode_grpc_address: 127.0.0.1:50051 -redis_address: 127.0.0.1:6379 -file_store_bucket_name: indexer-grpc-file-store-testnet -health_check_port: 8083 -``` \ No newline at end of file diff --git a/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/src/lib.rs deleted file mode 100644 index 5fd02eaeeb9b3..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/src/lib.rs +++ /dev/null @@ -1,4 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -pub mod worker; diff --git a/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/src/main.rs b/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/src/main.rs deleted file mode 100644 index 78c1f71028f71..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/src/main.rs +++ /dev/null @@ -1,50 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use aptos_indexer_grpc_file_store_data_verifier::worker::Worker; -use clap::Parser; -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; -use warp::Filter; - -#[derive(Parser)] -pub struct Args { - #[clap(short, long)] - pub config_path: String, -} - -fn main() { - aptos_logger::Logger::new().init(); - aptos_crash_handler::setup_panic_handler(); - - // Load config. - let args = Args::parse(); - let config = aptos_indexer_grpc_utils::config::IndexerGrpcConfig::load( - std::path::PathBuf::from(args.config_path), - ) - .unwrap(); - - let health_port = config.health_check_port; - let runtime = aptos_runtimes::spawn_named_runtime("indexercache".to_string(), None); - // Start serving. - runtime.spawn(async move { - let mut worker = Worker::new(config); - worker.run().await; - }); - - // Start liveness and readiness probes. - runtime.spawn(async move { - let readiness = warp::path("readiness") - .map(move || warp::reply::with_status("ready", warp::http::StatusCode::OK)); - warp::serve(readiness) - .run(([0, 0, 0, 0], health_port)) - .await; - }); - - let term = Arc::new(AtomicBool::new(false)); - while !term.load(Ordering::Acquire) { - std::thread::park(); - } -} diff --git a/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/src/worker.rs b/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/src/worker.rs deleted file mode 100644 index 77d6d3a83f2d3..0000000000000 --- a/ecosystem/indexer-grpc/indexer-grpc-file-store-data-verifier/src/worker.rs +++ /dev/null @@ -1,119 +0,0 @@ -// Copyright © Aptos Foundation -// SPDX-License-Identifier: Apache-2.0 - -use aptos_indexer_grpc_utils::{ - cache_operator::CacheOperator, config::IndexerGrpcConfig, create_grpc_client, - file_store_operator::FileStoreOperator, -}; -use aptos_protos::datastream::v1::{raw_datastream_response::Response, RawDatastreamRequest}; -use futures::{self, StreamExt}; -use tokio::sync::mpsc::channel; - -pub struct Worker { - config: IndexerGrpcConfig, -} - -impl Worker { - pub fn new(config: IndexerGrpcConfig) -> Self { - Self { config } - } - - pub async fn run(&mut self) { - let redis_client = redis::Client::open(format!("redis://{}", self.config.redis_address)) - .expect("Create redis client failed."); - let conn = redis_client - .get_async_connection() - .await - .expect("Create redis connection failed."); - let mut cache_operator = CacheOperator::new(conn); - let chain_id = cache_operator - .get_chain_id() - .await - .expect("Get chain id failed."); - - let (file_rx, mut file_tx) = channel::(100_000); - let (grpc_rx, mut grpc_tx) = channel::(100_000); - - let indexer_address = self.config.fullnode_grpc_address.as_ref().unwrap().clone(); - let file_store_bucket_name = self.config.file_store_bucket_name.clone(); - // Spawn one task to fetch data from grpc server. - tokio::spawn(async move { - let gprc_sender = grpc_rx.clone(); - let mut grpc_client = create_grpc_client(format!("http://{}", indexer_address)).await; - let req = RawDatastreamRequest { - starting_version: 0, - ..RawDatastreamRequest::default() - }; - let mut stream = grpc_client.raw_datastream(req).await.unwrap().into_inner(); - - let mut tmap = std::collections::BTreeMap::::new(); - while let Some(resp) = stream.next().await { - let resp = resp.unwrap(); - let response_type = resp.response; - if let Some(response) = response_type { - match response { - Response::Data(d) => { - for t in d.transactions { - tmap.insert(t.version, t.encoded_proto_data); - } - }, - Response::Status(d) => match d.r#type { - 0 => {}, - 1 => { - for i in tmap.values() { - gprc_sender.send(i.clone()).await.unwrap(); - } - tmap.clear(); - }, - _ => { - panic!("Unknown status type.") - }, - }, - } - } - } - }); - tokio::spawn(async move { - let file_rx = file_rx.clone(); - let file_store_operator = FileStoreOperator::new(file_store_bucket_name); - file_store_operator.bootstrap().await; - - let mut starting_version = 0; - loop { - // Metadata exists. - let metadata = file_store_operator.get_file_store_metadata().await.unwrap(); - if metadata.chain_id != chain_id { - panic!("Chain id not match."); - } - - let file_version = metadata.version; - while starting_version < file_version { - let data = file_store_operator - .get_transactions_file(starting_version) - .await - .unwrap(); - - let len = data.transactions.len() as u64; - for t in data.transactions { - file_rx.send(t).await.unwrap(); - } - starting_version += len; - } - tokio::time::sleep(std::time::Duration::from_secs(1)).await; - } - }); - aptos_logger::info!("Worker started."); - let mut verified_count = 0; - loop { - let file_data = file_tx.recv().await.unwrap(); - let grpc_data = grpc_tx.recv().await.unwrap(); - if file_data != grpc_data { - panic!("Data not match. {}", verified_count); - } - verified_count += 1; - if verified_count % 1000 == 0 { - aptos_logger::info!(verified_count = verified_count, "Succssefully verified."); - } - } - } -} diff --git a/ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs b/ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs index 4654656e88650..460d9a4cf9cac 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-file-store/src/processor.rs @@ -2,13 +2,18 @@ // SPDX-License-Identifier: Apache-2.0 use aptos_indexer_grpc_utils::{ + build_protobuf_encoded_transaction_wrappers, cache_operator::{CacheBatchGetStatus, CacheOperator}, config::IndexerGrpcConfig, constants::BLOB_STORAGE_SIZE, file_store_operator::FileStoreOperator, + EncodedTransactionWithVersion, }; use aptos_moving_average::MovingAverage; -use std::{thread::sleep, time::Duration}; +use std::time::Duration; + +// If the version is ahead of the cache head, retry after a short sleep. +const AHEAD_OF_CACHE_SLEEP_DURATION_IN_MILLIS: u64 = 1000; /// Processor tails the data in cache and stores the data in file store. pub struct Processor { @@ -28,8 +33,8 @@ impl Processor { } } - /// Bootstrap the processor, including creating the redis connection and file store operator. - async fn bootstrap(&mut self) { + /// Init the processor, including creating the redis connection and file store operator. + async fn init(&mut self) { // Connection to redis is a hard dependency for file store processor. let conn = redis::Client::open(format!("redis://{}", self.config.redis_address)) .expect("Create redis client failed.") @@ -45,7 +50,7 @@ impl Processor { let file_store_operator = FileStoreOperator::new(self.config.file_store_bucket_name.clone()); - file_store_operator.bootstrap().await; + file_store_operator.verify_storage_bucket_existence().await; self.cache_operator = Some(cache_operator); self.file_store_processor = Some(file_store_operator); @@ -54,7 +59,7 @@ impl Processor { // Starts the processing. pub async fn run(&mut self) { - self.bootstrap().await; + self.init().await; let cache_chain_id = self.cache_chain_id.unwrap(); // If file store and cache chain id don't match, panic. @@ -66,16 +71,24 @@ impl Processor { .await .unwrap(); - // The version to fetch from cache. + // This implements a two-cursor approach: + // * One curosr is to track the current cache version. + // * The other cursor is to track the current file store version. + // * Constrains: + // * The current cache version >= the current file store version. + // * The current file store version is always a multiple of BLOB_STORAGE_SIZE. let mut current_cache_version = metadata.version; let mut current_file_store_version = current_cache_version; - // The transactions buffer. - let mut transactions: Vec = vec![]; - let mut ma = MovingAverage::new(10_000); - // Once we hit the head, the processing is slowed to single thread. - let mut hit_head = false; - + // The transactions buffer to store the transactions fetched from cache. + let mut transactions_buffer: Vec = vec![]; + let mut tps_calculator = MovingAverage::new(10_000); loop { + // 0. Data verfiication. + // File store version has to be a multiple of BLOB_STORAGE_SIZE. + if current_file_store_version % BLOB_STORAGE_SIZE as u64 != 0 { + panic!("File store version is not a multiple of BLOB_STORAGE_SIZE."); + } + let batch_get_result = self .cache_operator .as_mut() @@ -83,66 +96,121 @@ impl Processor { .batch_get_encoded_proto_data(current_cache_version) .await; - match batch_get_result { - Ok(CacheBatchGetStatus::Ok(t)) => { - current_cache_version += t.len() as u64; - transactions.extend(t); - }, - Ok(CacheBatchGetStatus::NotReady) => { - sleep(Duration::from_secs(1)); - aptos_logger::info!( - current_file_store_version = current_file_store_version, - current_cache_version = current_cache_version, - "Cache is not ready. Sleep for 1 second." - ); + let batch_get_result = + fullnode_grpc_status_handling(batch_get_result, current_cache_version); + + let current_transactions = match batch_get_result { + Some(transactions) => transactions, + None => { + // Cache is not ready yet, i.e., ahead of current head. Wait. + tokio::time::sleep(Duration::from_millis( + AHEAD_OF_CACHE_SLEEP_DURATION_IN_MILLIS, + )) + .await; continue; }, - Ok(CacheBatchGetStatus::HitTheHead(t)) => { - current_cache_version += t.len() as u64; - transactions.extend(t); - hit_head = true; - aptos_logger::info!( - current_file_store_version = current_file_store_version, - current_cache_version = current_cache_version, - "File store processor hits the head." - ); - }, - Ok(CacheBatchGetStatus::EvictedFromCache) => { - panic!( - "Cache evicted from cache. For file store worker, this is not expected." - ); - }, - Err(err) => { - panic!("Batch get encoded proto data failed: {}", err); - }, - } + }; + + let hit_head = current_transactions.len() != BLOB_STORAGE_SIZE; + // Update the current cache version. + current_cache_version += current_transactions.len() as u64; + transactions_buffer.extend(current_transactions); + // If not hit the head, we want to collect more transactions. - if !hit_head && transactions.len() < 10 * BLOB_STORAGE_SIZE { + if !hit_head && transactions_buffer.len() < 10 * BLOB_STORAGE_SIZE { // If we haven't hit the head, we want to collect more transactions. continue; } // If hit the head, we want to collect at least one batch of transactions. - if hit_head && transactions.len() < BLOB_STORAGE_SIZE { + if hit_head && transactions_buffer.len() < BLOB_STORAGE_SIZE { continue; } - let batch_size = match !hit_head && transactions.len() >= 10 * BLOB_STORAGE_SIZE { - true => 10 * BLOB_STORAGE_SIZE, - false => BLOB_STORAGE_SIZE, - }; - let current_batch: Vec = transactions.drain(..batch_size).collect(); + // Drain the transactions buffer and upload to file store in size of multiple of BLOB_STORAGE_SIZE. + let process_size = transactions_buffer.len() / BLOB_STORAGE_SIZE * BLOB_STORAGE_SIZE; + let current_batch = transactions_buffer.drain(..process_size).collect(); + self.file_store_processor .as_mut() .unwrap() - .upload_transactions(cache_chain_id, current_file_store_version, current_batch) + .upload_transactions(cache_chain_id, current_batch) .await .unwrap(); - ma.tick_now(batch_size as u64); + tps_calculator.tick_now(process_size as u64); aptos_logger::info!( - tps = (ma.avg() * 1000.0) as u64, + tps = (tps_calculator.avg() * 1000.0) as u64, current_file_store_version = current_file_store_version, "Upload transactions to file store." ); - current_file_store_version += batch_size as u64; + current_file_store_version += process_size as u64; } } } + +fn fullnode_grpc_status_handling( + fullnode_rpc_status: anyhow::Result, + batch_start_version: u64, +) -> Option> { + match fullnode_rpc_status { + Ok(CacheBatchGetStatus::Ok(encoded_transactions)) => Some( + build_protobuf_encoded_transaction_wrappers(encoded_transactions, batch_start_version), + ), + Ok(CacheBatchGetStatus::NotReady) => None, + Ok(CacheBatchGetStatus::EvictedFromCache) => { + panic!( + "[indexer file]Cache evicted from cache. For file store worker, this is not expected." + ); + }, + Err(err) => { + panic!("Batch get encoded proto data failed: {}", err); + }, + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn verify_the_grpc_status_handling_ahead_of_cache() { + let fullnode_rpc_status: anyhow::Result = + Ok(CacheBatchGetStatus::NotReady); + let batch_start_version = 0; + assert!(fullnode_grpc_status_handling(fullnode_rpc_status, batch_start_version).is_none()); + } + + #[test] + #[should_panic] + fn verify_the_grpc_status_handling_evicted_from_cache() { + let fullnode_rpc_status: anyhow::Result = + Ok(CacheBatchGetStatus::EvictedFromCache); + let batch_start_version = 0; + fullnode_grpc_status_handling(fullnode_rpc_status, batch_start_version); + } + + #[test] + #[should_panic] + fn verify_the_grpc_status_handling_error() { + let fullnode_rpc_status: anyhow::Result = + Err(anyhow::anyhow!("Error")); + let batch_start_version = 0; + fullnode_grpc_status_handling(fullnode_rpc_status, batch_start_version); + } + + #[test] + fn verify_the_grpc_status_handling_ok() { + let batch_start_version = 2000; + let transactions: Vec = std::iter::repeat("txn".to_string()).take(1000).collect(); + let transactions_with_version: Vec = transactions + .iter() + .enumerate() + .map(|(index, txn)| (txn.clone(), batch_start_version + index as u64)) + .collect(); + let fullnode_rpc_status: anyhow::Result = + Ok(CacheBatchGetStatus::Ok(transactions)); + let actual_transactions = + fullnode_grpc_status_handling(fullnode_rpc_status, batch_start_version); + assert!(actual_transactions.is_some()); + let actual_transactions = actual_transactions.unwrap(); + assert_eq!(actual_transactions, transactions_with_version); + } +} diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs index 9d8e3988d03ff..69a256d0ea82a 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/cache_operator.rs @@ -62,8 +62,6 @@ const CACHE_SCRIPT_UPDATE_LATEST_VERSION: &str = r#" pub enum CacheBatchGetStatus { /// OK with batch of encoded transactions. Ok(Vec), - /// Ok but cache head is hit; wait and retry for next batch. - HitTheHead(Vec), /// Requested version is already evicted from cache. Visit file store instead. EvictedFromCache, /// Not ready yet. Wait and retry. @@ -240,18 +238,10 @@ impl CacheOperator { let versions = (start_version..start_version + v) .map(|e| e.to_string()) .collect::>(); - let len = versions.len(); - let encoded_transactions: Result, RedisError> = self.conn.mget(versions).await; match encoded_transactions { - Ok(v) => { - if len == BLOB_STORAGE_SIZE { - Ok(CacheBatchGetStatus::Ok(v)) - } else { - Ok(CacheBatchGetStatus::HitTheHead(v)) - } - }, + Ok(v) => Ok(CacheBatchGetStatus::Ok(v)), Err(err) => Err(err.into()), } }, @@ -396,11 +386,7 @@ mod tests { .batch_get_encoded_proto_data(0) .await .unwrap(), - CacheBatchGetStatus::HitTheHead(vec![ - "t0".to_string(), - "t1".to_string(), - "t2".to_string() - ]) + CacheBatchGetStatus::Ok(vec!["t0".to_string(), "t1".to_string(), "t2".to_string()]) ); } diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator.rs index 1eb429735b9e1..bd6b0212c22c3 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/file_store_operator.rs @@ -1,7 +1,7 @@ // Copyright © Aptos Foundation // SPDX-License-Identifier: Apache-2.0 -use crate::constants::BLOB_STORAGE_SIZE; +use crate::{constants::BLOB_STORAGE_SIZE, EncodedTransactionWithVersion}; use cloud_storage::{Bucket, Object}; use itertools::{any, Itertools}; use serde::{Deserialize, Serialize}; @@ -18,12 +18,12 @@ pub fn generate_blob_name(starting_version: u64) -> String { /// TransactionsFile is the file format for storing transactions. /// It's a JSON file with name: ${starting_version}.json. #[derive(Serialize, Deserialize)] -pub struct TransactionsFile { +pub(crate) struct TransactionsFile { // The version of the first transaction in the file. // It must be the same as the starting_version in the file name. pub starting_version: u64, - /// Each transaction is a encoded string for Transaction protobuf. - /// Expected size of each vector is BLOB_STORAGE_SIZE, i.e., 1_000. + // Each transaction is a encoded string for Transaction protobuf. + // Expected size of each vector is BLOB_STORAGE_SIZE, i.e., 1_000. pub transactions: Vec, } @@ -32,9 +32,9 @@ pub struct TransactionsFile { #[derive(Serialize, Deserialize, Copy, Clone, Debug)] pub struct FileStoreMetadata { pub chain_id: u64, - /// The size of each file folder, BLOB_STORAGE_SIZE, i.e., 1_000. + // The size of each file folder, BLOB_STORAGE_SIZE, i.e., 1_000. pub file_folder_size: usize, - /// The current version of the file store. + // The current version of the file store. pub version: u64, } @@ -66,7 +66,7 @@ impl FileStoreOperator { } /// Bootstraps the file store operator. This is required before any other operations. - pub async fn bootstrap(&self) { + pub async fn verify_storage_bucket_existence(&self) { // Verifies the bucket exists. Bucket::read(&self.bucket_name) .await @@ -74,20 +74,35 @@ impl FileStoreOperator { } /// Gets the transactions files from the file store. version has to be a multiple of BLOB_STORAGE_SIZE. - pub async fn get_transactions_file(&self, version: u64) -> anyhow::Result { - anyhow::ensure!( - version % BLOB_STORAGE_SIZE as u64 == 0, - "Version has to be a multiple of BLOB_STORAGE_SIZE." - ); - - let current_file_name = generate_blob_name(version); + pub async fn get_transactions(&self, version: u64) -> anyhow::Result> { + let batch_start_version = version / BLOB_STORAGE_SIZE as u64 * BLOB_STORAGE_SIZE as u64; + let current_file_name = generate_blob_name(batch_start_version); match Object::download(&self.bucket_name, current_file_name.as_str()).await { Ok(file) => { let file: TransactionsFile = serde_json::from_slice(&file).expect("Expected file to be valid JSON."); - Ok(file) + Ok(file + .transactions + .into_iter() + .skip((version % BLOB_STORAGE_SIZE as u64) as usize) + .collect()) + }, + Err(cloud_storage::Error::Other(err)) => { + if err.contains("No such object: ") { + anyhow::bail!("[Indexer File] Transactions file not found. Gap might happen between cache and file store. {}", err) + } else { + anyhow::bail!( + "[Indexer File] Error happens when transaction file. {}", + err + ); + } + }, + Err(err) => { + anyhow::bail!( + "[Indexer File] Error happens when transaction file. {}", + err + ); }, - Err(err) => Err(anyhow::Error::from(err)), } } @@ -184,31 +199,30 @@ impl FileStoreOperator { pub async fn upload_transactions( &mut self, chain_id: u64, - starting_version: u64, - transactions: Vec, + transactions: Vec, ) -> anyhow::Result<()> { + let start_version = transactions.first().unwrap().1; + let batch_size = transactions.len(); anyhow::ensure!( - starting_version % BLOB_STORAGE_SIZE as u64 == 0, + start_version % BLOB_STORAGE_SIZE as u64 == 0, "Starting version has to be a multiple of BLOB_STORAGE_SIZE." ); - + anyhow::ensure!( + batch_size % BLOB_STORAGE_SIZE == 0, + "The number of transactions to upload has to be multiplier of BLOB_STORAGE_SIZE." + ); let mut tasks = vec![]; - let mut current_version = starting_version; // Split the transactions into batches of BLOB_STORAGE_SIZE. for i in transactions.chunks(BLOB_STORAGE_SIZE) { let bucket_name = self.bucket_name.clone(); let current_batch = i.iter().cloned().collect_vec(); + let transactions_file = build_transactions_file(current_batch).unwrap(); let task = tokio::spawn(async move { - let batch_version = current_version; match Object::create( bucket_name.clone().as_str(), - serde_json::to_vec(&TransactionsFile { - starting_version: batch_version, - transactions: current_batch, - }) - .unwrap(), - generate_blob_name(batch_version).as_str(), + serde_json::to_vec(&transactions_file).unwrap(), + generate_blob_name(transactions_file.starting_version).as_str(), JSON_FILE_TYPE, ) .await @@ -217,7 +231,6 @@ impl FileStoreOperator { Err(err) => Err(anyhow::Error::from(err)), } }); - current_version += BLOB_STORAGE_SIZE as u64; tasks.push(task); } let results = match futures::future::try_join_all(tasks).await { @@ -229,13 +242,41 @@ impl FileStoreOperator { anyhow::bail!("Uploading transactions failed."); } - self.update_file_store_metadata(chain_id, current_version) + self.update_file_store_metadata(chain_id, start_version + batch_size as u64) .await } } +fn build_transactions_file( + transactions: Vec, +) -> anyhow::Result { + let starting_version = transactions.first().unwrap().1; + anyhow::ensure!( + starting_version % BLOB_STORAGE_SIZE as u64 == 0, + "Starting version has to be a multiple of BLOB_STORAGE_SIZE." + ); + anyhow::ensure!( + transactions.len() == BLOB_STORAGE_SIZE, + "The number of transactions to upload has to be BLOB_STORAGE_SIZE." + ); + anyhow::ensure!( + transactions + .iter() + .enumerate() + .any(|(ind, (_, version))| ind + starting_version as usize == *version as usize), + "Transactions are in order." + ); + + Ok(TransactionsFile { + starting_version, + transactions: transactions.into_iter().map(|(tx, _)| tx).collect(), + }) +} + #[cfg(test)] mod tests { + use super::*; + #[test] fn verify_blob_naming() { assert_eq!(super::generate_blob_name(0), "files/0.json"); @@ -256,4 +297,28 @@ mod tests { "files/18446744073709551615.json" ); } + + #[test] + fn verify_build_transactions_file() { + // 1000 txns with starting version 0 succeeds. + let mut transactions = vec![]; + for i in 0..BLOB_STORAGE_SIZE { + transactions.push(("".to_string(), i as u64)); + } + assert!(build_transactions_file(transactions).is_ok()); + + // 1001 txns fails. + let mut transactions = vec![]; + for i in 0..BLOB_STORAGE_SIZE + 1 { + transactions.push(("".to_string(), i as u64)); + } + assert!(build_transactions_file(transactions).is_err()); + // 1000 txns with starting version 1 fails. + let mut transactions = vec![]; + for i in 1..BLOB_STORAGE_SIZE + 1 { + transactions.push(("".to_string(), i as u64)); + } + + assert!(build_transactions_file(transactions).is_err()); + } } diff --git a/ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs b/ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs index 4993776a60514..cfb549ecb2427 100644 --- a/ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs +++ b/ecosystem/indexer-grpc/indexer-grpc-utils/src/lib.rs @@ -34,3 +34,18 @@ pub async fn create_grpc_client(address: String) -> GrpcClientType { .await .unwrap() } + +// (Protobuf encoded transaction, version) +pub type EncodedTransactionWithVersion = (String, u64); +/// Build the EncodedTransactionWithVersion from the encoded transactions and starting version. +#[inline] +pub fn build_protobuf_encoded_transaction_wrappers( + encoded_transactions: Vec, + starting_version: u64, +) -> Vec { + encoded_transactions + .into_iter() + .enumerate() + .map(|(ind, encoded_transaction)| (encoded_transaction, starting_version + ind as u64)) + .collect() +}