From e09d5ada6a7437595cff5b460386ac32bdb03780 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Arpad=20M=C3=BCller?= Date: Mon, 16 Oct 2023 17:37:09 +0200 Subject: [PATCH] Azure blob storage support (#5546) Adds prototype-level support for [Azure blob storage](https://azure.microsoft.com/en-us/products/storage/blobs). Some corners were cut, see the TODOs and the followup issue #5567 for details. Steps to try it out: * Create a storage account with block blobs (this is a per-storage account setting). * Create a container inside that storage account. * Set the appropriate env vars: `AZURE_STORAGE_ACCOUNT, AZURE_STORAGE_ACCESS_KEY, REMOTE_STORAGE_AZURE_CONTAINER, REMOTE_STORAGE_AZURE_REGION` * Set the env var `ENABLE_REAL_AZURE_REMOTE_STORAGE=y` and run `cargo test -p remote_storage azure` Fixes #5562 --- Cargo.lock | 952 +++++++++++++++++-- Cargo.toml | 4 + control_plane/src/background_process.rs | 6 +- docs/pageserver-services.md | 10 + libs/remote_storage/Cargo.toml | 7 + libs/remote_storage/src/azure_blob.rs | 381 ++++++++ libs/remote_storage/src/lib.rs | 198 +++- libs/remote_storage/src/s3_bucket.rs | 54 +- libs/remote_storage/src/s3_bucket/metrics.rs | 2 +- libs/remote_storage/tests/test_real_azure.rs | 619 ++++++++++++ workspace_hack/Cargo.toml | 12 +- 11 files changed, 2096 insertions(+), 149 deletions(-) create mode 100644 libs/remote_storage/src/azure_blob.rs create mode 100644 libs/remote_storage/tests/test_real_azure.rs diff --git a/Cargo.lock b/Cargo.lock index be3f179d5fbd..aacf4e53d7cd 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,12 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "RustyXML" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b5ace29ee3216de37c0546865ad08edef58b0f9e76838ed8959a84a990e58c5" + [[package]] name = "addr2line" version = "0.19.0" @@ -17,6 +23,60 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" +[[package]] +name = "aead" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7fc95d1bdb8e6666b2b217308eeeb09f2d6728d104be3e31916cc74d15420331" +dependencies = [ + "generic-array", +] + +[[package]] +name = "aes" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "884391ef1066acaa41e766ba8f596341b96e93ce34f9a43e7d24bf0a0eaf0561" +dependencies = [ + "aes-soft", + "aesni", + "cipher", +] + +[[package]] +name = "aes-gcm" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5278b5fabbb9bd46e24aa69b2fdea62c99088e0a950a9be40e3e0101298f88da" +dependencies = [ + "aead", + "aes", + "cipher", + "ctr", + "ghash", + "subtle", +] + +[[package]] +name = "aes-soft" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be14c7498ea50828a38d0e24a765ed2effe92a705885b57d029cd67d45744072" +dependencies = [ + "cipher", + "opaque-debug", +] + +[[package]] +name = "aesni" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea2e11f5e94c2f7d386164cc2aa1f97823fed6f259e486940a71c174dd01b0ce" +dependencies = [ + "cipher", + "opaque-debug", +] + [[package]] name = "ahash" version = "0.8.3" @@ -132,7 +192,7 @@ dependencies = [ "num-traits", "rusticata-macros", "thiserror", - "time", + "time 0.3.21", ] [[package]] @@ -158,6 +218,17 @@ dependencies = [ "syn 1.0.109", ] +[[package]] 
+name = "async-channel" +version = "1.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81953c529336010edd6d8e358f886d9581267795c61b19475b71314bffa46d35" +dependencies = [ + "concurrent-queue", + "event-listener", + "futures-core", +] + [[package]] name = "async-compression" version = "0.4.0" @@ -171,6 +242,90 @@ dependencies = [ "tokio", ] +[[package]] +name = "async-executor" +version = "1.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2c1da3ae8dabd9c00f453a329dfe1fb28da3c0a72e2478cdcd93171740c20499" +dependencies = [ + "async-lock", + "async-task", + "concurrent-queue", + "fastrand 2.0.0", + "futures-lite", + "slab", +] + +[[package]] +name = "async-global-executor" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1b6f5d7df27bd294849f8eec66ecfc63d11814df7a4f5d74168a2394467b776" +dependencies = [ + "async-channel", + "async-executor", + "async-io", + "async-lock", + "blocking", + "futures-lite", + "once_cell", +] + +[[package]] +name = "async-io" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0fc5b45d93ef0529756f812ca52e44c221b35341892d3dcc34132ac02f3dd2af" +dependencies = [ + "async-lock", + "autocfg", + "cfg-if", + "concurrent-queue", + "futures-lite", + "log", + "parking", + "polling", + "rustix 0.37.19", + "slab", + "socket2 0.4.9", + "waker-fn", +] + +[[package]] +name = "async-lock" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "287272293e9d8c41773cec55e365490fe034813a2f172f502d6ddcf75b2f582b" +dependencies = [ + "event-listener", +] + +[[package]] +name = "async-std" +version = "1.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "62565bb4402e926b29953c785397c6dc0391b7b446e45008b0049eb43cec6f5d" +dependencies = [ + "async-channel", + "async-global-executor", + "async-io", + "async-lock", + "crossbeam-utils", + "futures-channel", + "futures-core", + "futures-io", + "futures-lite", + "gloo-timers", + "kv-log-macro", + "log", + "memchr", + "once_cell", + "pin-project-lite", + "pin-utils", + "slab", + "wasm-bindgen-futures", +] + [[package]] name = "async-stream" version = "0.3.5" @@ -193,6 +348,12 @@ dependencies = [ "syn 2.0.28", ] +[[package]] +name = "async-task" +version = "4.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b9441c6b2fe128a7c2bf680a44c34d0df31ce09e5b7e401fcca3faa483dbc921" + [[package]] name = "async-trait" version = "0.1.68" @@ -213,6 +374,12 @@ dependencies = [ "critical-section", ] +[[package]] +name = "atomic-waker" +version = "1.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1505bd5d3d116872e7271a6d4e16d81d0c8570876c8de68093a09ac269d8aac0" + [[package]] name = "autocfg" version = "1.1.0" @@ -242,7 +409,7 @@ dependencies = [ "http", "hyper", "ring", - "time", + "time 0.3.21", "tokio", "tower", "tracing", @@ -395,13 +562,13 @@ dependencies = [ "bytes", "form_urlencoded", "hex", - "hmac", + "hmac 0.12.1", "http", "once_cell", "percent-encoding", "regex", - "sha2", - "time", + "sha2 0.10.6", + "time 0.3.21", "tracing", ] @@ -433,8 +600,8 @@ dependencies = [ "http-body", "md-5", "pin-project-lite", - "sha1", - "sha2", + "sha1 0.10.5", + "sha2 0.10.6", "tracing", ] @@ -579,7 +746,7 @@ dependencies = [ "num-integer", "ryu", "serde", - "time", + "time 0.3.21", ] [[package]] @@ -603,7 +770,7 @@ dependencies = [ "aws-smithy-http", 
"aws-smithy-types", "http", - "rustc_version", + "rustc_version 0.4.0", "tracing", ] @@ -633,7 +800,7 @@ dependencies = [ "serde_json", "serde_path_to_error", "serde_urlencoded", - "sha1", + "sha1 0.10.5", "sync_wrapper", "tokio", "tokio-tungstenite", @@ -659,6 +826,75 @@ dependencies = [ "tower-service", ] +[[package]] +name = "azure_core" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e29286b9edfdd6f2c7e9d970bb5b015df8621258acab9ecfcea09b2d7692467" +dependencies = [ + "async-trait", + "base64 0.21.1", + "bytes", + "dyn-clone", + "futures", + "getrandom 0.2.9", + "http-types", + "log", + "paste", + "pin-project", + "quick-xml", + "rand 0.8.5", + "reqwest", + "rustc_version 0.4.0", + "serde", + "serde_json", + "time 0.3.21", + "url", + "uuid", +] + +[[package]] +name = "azure_storage" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bed0ccefde57930b2886fd4aed1f70ac469c197b8c2e94828290d71bcbdb5d97" +dependencies = [ + "RustyXML", + "async-trait", + "azure_core", + "bytes", + "futures", + "hmac 0.12.1", + "log", + "serde", + "serde_derive", + "serde_json", + "sha2 0.10.6", + "time 0.3.21", + "url", + "uuid", +] + +[[package]] +name = "azure_storage_blobs" +version = "0.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f91a52da2d192cfe43759f61e8bb31a5969f1722d5b85ac89627f356ad674ab4" +dependencies = [ + "RustyXML", + "azure_core", + "azure_storage", + "bytes", + "futures", + "log", + "serde", + "serde_derive", + "serde_json", + "time 0.3.21", + "url", + "uuid", +] + [[package]] name = "backtrace" version = "0.3.67" @@ -674,6 +910,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base-x" +version = "0.2.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cbbc9d0964165b47557570cce6c952866c2678457aca742aafc9fb771d30270" + [[package]] name = "base64" version = "0.13.1" @@ -746,6 +988,15 @@ version = "1.3.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" +[[package]] +name = "block-buffer" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4152116fd6e9dadb291ae18fc1ec3575ed6d84c29642d97890f4b4a3417297e4" +dependencies = [ + "generic-array", +] + [[package]] name = "block-buffer" version = "0.10.4" @@ -755,6 +1006,22 @@ dependencies = [ "generic-array", ] +[[package]] +name = "blocking" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8c36a4d0d48574b3dd360b4b7d95cc651d2b6557b6402848a27d4b228a473e2a" +dependencies = [ + "async-channel", + "async-lock", + "async-task", + "fastrand 2.0.0", + "futures-io", + "futures-lite", + "piper", + "tracing", +] + [[package]] name = "bstr" version = "1.5.0" @@ -897,6 +1164,15 @@ dependencies = [ "half", ] +[[package]] +name = "cipher" +version = "0.2.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12f8e7987cbd042a63249497f41aed09f8e65add917ea6566effbc56578d6801" +dependencies = [ + "generic-array", +] + [[package]] name = "clang-sys" version = "1.6.1" @@ -1031,6 +1307,21 @@ dependencies = [ "zstd", ] +[[package]] +name = "concurrent-queue" +version = "2.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f057a694a54f12365049b0958a1685bb52d567f5593b355fbf685838e873d400" +dependencies = [ + "crossbeam-utils", +] + +[[package]] 
+name = "const_fn" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbdcdcb6d86f71c5e97409ad45898af11cbc995b4ee8112d59095a28d376c935" + [[package]] name = "const_format" version = "0.2.30" @@ -1057,7 +1348,7 @@ version = "0.1.0" dependencies = [ "anyhow", "chrono", - "rand", + "rand 0.8.5", "serde", "serde_with", "utils", @@ -1099,6 +1390,23 @@ dependencies = [ "workspace_hack", ] +[[package]] +name = "cookie" +version = "0.14.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03a5d7b21829bc7b4bf4754a978a241ae54ea55a40f92bb20216e54096f4b951" +dependencies = [ + "aes-gcm", + "base64 0.13.1", + "hkdf", + "hmac 0.10.1", + "percent-encoding", + "rand 0.8.5", + "sha2 0.9.9", + "time 0.2.27", + "version_check", +] + [[package]] name = "core-foundation" version = "0.9.3" @@ -1124,13 +1432,19 @@ dependencies = [ "libc", ] +[[package]] +name = "cpuid-bool" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dcb25d077389e53838a8158c8e99174c5a9d902dee4904320db714f3c653ffba" + [[package]] name = "crc32c" version = "0.6.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3dfea2db42e9927a3845fb268a10a72faed6d416065f77873f05e411457c363e" dependencies = [ - "rustc_version", + "rustc_version 0.4.0", ] [[package]] @@ -1262,6 +1576,35 @@ dependencies = [ "typenum", ] +[[package]] +name = "crypto-mac" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4857fd85a0c34b3c3297875b747c1e02e06b6a0ea32dd892d8192b9ce0813ea6" +dependencies = [ + "generic-array", + "subtle", +] + +[[package]] +name = "ctor" +version = "0.1.26" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6d2301688392eb071b0bf1a37be05c469d3cc4dbbd95df672fe28ab021e6a096" +dependencies = [ + "quote", + "syn 1.0.109", +] + +[[package]] +name = "ctr" +version = "0.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb4a30d54f7443bf3d6191dcd486aca19e67cb3c49fa7a06a319966346707e7f" +dependencies = [ + "cipher", +] + [[package]] name = "darling" version = "0.20.1" @@ -1340,17 +1683,32 @@ dependencies = [ "rusticata-macros", ] +[[package]] +name = "digest" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d3dd60d1080a57a05ab032377049e0591415d2b31afd7028356dbf3cc6dcb066" +dependencies = [ + "generic-array", +] + [[package]] name = "digest" version = "0.10.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" dependencies = [ - "block-buffer", + "block-buffer 0.10.4", "crypto-common", "subtle", ] +[[package]] +name = "discard" +version = "1.0.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "212d0f5754cb6769937f4501cc0e67f4f4483c8d2c3e1e922ee9edbe4ab4c7c0" + [[package]] name = "displaydoc" version = "0.2.4" @@ -1362,6 +1720,12 @@ dependencies = [ "syn 2.0.28", ] +[[package]] +name = "dyn-clone" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23d2f3407d9a573d666de4b5bdf10569d73ca9478087346697dcbae6244bfbcd" + [[package]] name = "either" version = "1.8.1" @@ -1452,6 +1816,12 @@ dependencies = [ "libc", ] +[[package]] +name = "event-listener" +version = "2.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"0206175f82b8d6bf6652ff7d71a1e27fd2e4efde587fd368662814d6ec1d9ce0" + [[package]] name = "fail" version = "0.5.1" @@ -1460,7 +1830,7 @@ checksum = "fe5e43d0f78a42ad591453aedb1d7ae631ce7ee445c7643691055a9ed8d3b01c" dependencies = [ "log", "once_cell", - "rand", + "rand 0.8.5", ] [[package]] @@ -1609,6 +1979,21 @@ version = "0.3.28" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +[[package]] +name = "futures-lite" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49a9d51ce47660b1e808d3c990b4709f2f415d928835a17dfd16991515c46bce" +dependencies = [ + "fastrand 1.9.0", + "futures-core", + "futures-io", + "memchr", + "parking", + "pin-project-lite", + "waker-fn", +] + [[package]] name = "futures-macro" version = "0.3.28" @@ -1666,6 +2051,17 @@ dependencies = [ "version_check", ] +[[package]] +name = "getrandom" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8fc3cb4d91f53b50155bdcfd23f6a4c39ae1969c2ae85982b135750cccaf5fce" +dependencies = [ + "cfg-if", + "libc", + "wasi 0.9.0+wasi-snapshot-preview1", +] + [[package]] name = "getrandom" version = "0.2.9" @@ -1675,10 +2071,20 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "wasm-bindgen", ] +[[package]] +name = "ghash" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "97304e4cd182c3846f7575ced3890c53012ce534ad9114046b0a9e00bb30a375" +dependencies = [ + "opaque-debug", + "polyval", +] + [[package]] name = "gimli" version = "0.27.2" @@ -1713,6 +2119,18 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" +[[package]] +name = "gloo-timers" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b995a66bb87bebce9a0f4a95aed01daca4872c050bfcb21653361c03bc35e5c" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "h2" version = "0.3.19" @@ -1784,7 +2202,7 @@ source = "git+https://github.com/japaric/heapless.git?rev=644653bf3b831c6bb4963b dependencies = [ "atomic-polyfill", "hash32", - "rustc_version", + "rustc_version 0.4.0", "spin 0.9.8", "stable_deref_trait", ] @@ -1826,13 +2244,33 @@ dependencies = [ "thiserror", ] +[[package]] +name = "hkdf" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51ab2f639c231793c5f6114bdb9bbe50a7dbbfcd7c7c6bd8475dec2d991e964f" +dependencies = [ + "digest 0.9.0", + "hmac 0.10.1", +] + +[[package]] +name = "hmac" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1441c6b1e930e2817404b5046f1f989899143a12bf92de603b69f4e0aee1e15" +dependencies = [ + "crypto-mac", + "digest 0.9.0", +] + [[package]] name = "hmac" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6c49c37c09c17a53d937dfbb742eb3a961d65a994e6bcdcf37e7399d0cc8ab5e" dependencies = [ - "digest", + "digest 0.10.7", ] [[package]] @@ -1861,11 +2299,33 @@ dependencies = [ name = "http-body" version = "0.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" +checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" 
+dependencies = [ + "bytes", + "http", + "pin-project-lite", +] + +[[package]] +name = "http-types" +version = "2.12.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6e9b187a72d63adbfba487f48095306ac823049cb504ee195541e91c7775f5ad" dependencies = [ - "bytes", - "http", + "anyhow", + "async-channel", + "async-std", + "base64 0.13.1", + "cookie", + "futures-lite", + "infer", "pin-project-lite", + "rand 0.7.3", + "serde", + "serde_json", + "serde_qs", + "serde_urlencoded", + "url", ] [[package]] @@ -1947,6 +2407,19 @@ dependencies = [ "tokio-io-timeout", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-tungstenite" version = "0.11.1" @@ -2010,6 +2483,12 @@ dependencies = [ "serde", ] +[[package]] +name = "infer" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "64e9829a50b42bb782c1df523f78d332fe371b10c661e78b7a3c34b0198e9fac" + [[package]] name = "inotify" version = "0.9.6" @@ -2151,6 +2630,15 @@ dependencies = [ "libc", ] +[[package]] +name = "kv-log-macro" +version = "1.0.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de8b303297635ad57c9f5059fd9cee7a47f8e8daa09df0fcd07dd39fb22977f" +dependencies = [ + "log", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -2208,6 +2696,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "abb12e687cfb44aa40f41fc3978ef76448f9b6038cad6aef4259d3c095a2382e" dependencies = [ "cfg-if", + "value-bag", ] [[package]] @@ -2237,7 +2726,7 @@ version = "0.10.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6365506850d44bff6e2fbcb5176cf63650e48bd45ef2fe2665ae1570e0f4b9ca" dependencies = [ - "digest", + "digest 0.10.7", ] [[package]] @@ -2329,7 +2818,7 @@ checksum = "5b9d9a46eff5b4ff64b45a9e316a6d1e0bc719ef429cbec4dc630684212bfdf9" dependencies = [ "libc", "log", - "wasi", + "wasi 0.11.0+wasi-snapshot-preview1", "windows-sys 0.45.0", ] @@ -2490,6 +2979,12 @@ version = "11.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ab1bc2a289d34bd04a330323ac98a1b4bc82c9d9fcb1e66b63caa84da26b575" +[[package]] +name = "opaque-debug" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "624a8340c38c1b80fd549087862da4ba43e08858af025b236e509b6649fc13d5" + [[package]] name = "openssl" version = "0.10.55" @@ -2629,7 +3124,7 @@ dependencies = [ "once_cell", "opentelemetry_api", "percent-encoding", - "rand", + "rand 0.8.5", "thiserror", "tokio", "tokio-stream", @@ -2716,7 +3211,7 @@ dependencies = [ "postgres_connection", "postgres_ffi", "pq_proto", - "rand", + "rand 0.8.5", "regex", "remote_storage", "reqwest", @@ -2766,6 +3261,12 @@ dependencies = [ "workspace_hack", ] +[[package]] +name = "parking" +version = "2.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e52c774a4c39359c1d1c52e43f73dd91a75a614652c825408eec30c95a9b2067" + [[package]] name = "parking_lot" version = "0.11.2" @@ -2821,20 +3322,26 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "346f04948ba92c43e8469c1ee6736c7563d71012b17d40745260fe106aac2166" dependencies = [ "base64ct", - "rand_core", + "rand_core 0.6.4", "subtle", ] +[[package]] 
+name = "paste" +version = "1.0.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de3145af08024dea9fa9914f381a17b8fc6034dfb00f3a84013f7ff43f29ed4c" + [[package]] name = "pbkdf2" version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f0ca0b5a68607598bf3bad68f32227a8164f6254833f84eafaac409cd6746c31" dependencies = [ - "digest", - "hmac", + "digest 0.10.7", + "hmac 0.12.1", "password-hash", - "sha2", + "sha2 0.10.6", ] [[package]] @@ -2928,6 +3435,17 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" +[[package]] +name = "piper" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "668d31b1c4eba19242f2088b2bf3316b82ca31082a8335764db4e083db7485d4" +dependencies = [ + "atomic-waker", + "fastrand 2.0.0", + "futures-io", +] + [[package]] name = "pkg-config" version = "0.3.27" @@ -2962,6 +3480,33 @@ dependencies = [ "plotters-backend", ] +[[package]] +name = "polling" +version = "2.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" +dependencies = [ + "autocfg", + "bitflags", + "cfg-if", + "concurrent-queue", + "libc", + "log", + "pin-project-lite", + "windows-sys 0.48.0", +] + +[[package]] +name = "polyval" +version = "0.4.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eebcc4aa140b9abd2bc40d9c3f7ccec842679cd79045ac3a7ac698c1a064b7cd" +dependencies = [ + "cpuid-bool", + "opaque-debug", + "universal-hash", +] + [[package]] name = "postgres" version = "0.19.4" @@ -2995,12 +3540,12 @@ dependencies = [ "byteorder", "bytes", "fallible-iterator", - "hmac", + "hmac 0.12.1", "lazy_static", "md-5", "memchr", - "rand", - "sha2", + "rand 0.8.5", + "sha2 0.10.6", "stringprep", ] @@ -3064,7 +3609,7 @@ dependencies = [ "memoffset 0.8.0", "once_cell", "postgres", - "rand", + "rand 0.8.5", "regex", "serde", "thiserror", @@ -3086,7 +3631,7 @@ dependencies = [ "bytes", "pin-project-lite", "postgres-protocol", - "rand", + "rand 0.8.5", "thiserror", "tokio", "tracing", @@ -3229,7 +3774,7 @@ dependencies = [ "hashbrown 0.13.2", "hashlink", "hex", - "hmac", + "hmac 0.12.1", "hostname", "humantime", "hyper", @@ -3247,7 +3792,7 @@ dependencies = [ "postgres_backend", "pq_proto", "prometheus", - "rand", + "rand 0.8.5", "rcgen", "regex", "reqwest", @@ -3262,7 +3807,7 @@ dependencies = [ "scopeguard", "serde", "serde_json", - "sha2", + "sha2 0.10.6", "socket2 0.5.3", "sync_wrapper", "thiserror", @@ -3284,6 +3829,16 @@ dependencies = [ "x509-parser", ] +[[package]] +name = "quick-xml" +version = "0.30.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eff6510e86862b57b210fd8cbe8ed3f0d7d600b9c2863cd4549a2e033c66e956" +dependencies = [ + "memchr", + "serde", +] + [[package]] name = "quote" version = "1.0.32" @@ -3293,6 +3848,19 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "rand" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a6b1679d49b24bbfe0c803429aa1874472f50d9b363131f0e89fc356b544d03" +dependencies = [ + "getrandom 0.1.16", + "libc", + "rand_chacha 0.2.2", + "rand_core 0.5.1", + "rand_hc", +] + [[package]] name = "rand" version = "0.8.5" @@ -3300,8 +3868,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" dependencies = [ "libc", - "rand_chacha", - "rand_core", + "rand_chacha 0.3.1", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_chacha" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f4c8ed856279c9737206bf725bf36935d8666ead7aa69b52be55af369d193402" +dependencies = [ + "ppv-lite86", + "rand_core 0.5.1", ] [[package]] @@ -3311,7 +3889,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" dependencies = [ "ppv-lite86", - "rand_core", + "rand_core 0.6.4", +] + +[[package]] +name = "rand_core" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bde5296fc891b0cef12a6d03ddccc162ce7b2aff54160af9338f8d40df6d19" +dependencies = [ + "getrandom 0.1.16", ] [[package]] @@ -3320,7 +3907,16 @@ version = "0.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" dependencies = [ - "getrandom", + "getrandom 0.2.9", +] + +[[package]] +name = "rand_hc" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ca3129af7b92a17112d59ad498c6f81eaf463253766b90396d39ea7a39d6613c" +dependencies = [ + "rand_core 0.5.1", ] [[package]] @@ -3353,7 +3949,7 @@ checksum = "4954fbc00dcd4d8282c987710e50ba513d351400dbdd00e803a05172a90d8976" dependencies = [ "pem 2.0.1", "ring", - "time", + "time 0.3.21", "yasna", ] @@ -3424,13 +4020,20 @@ dependencies = [ "aws-sdk-s3", "aws-smithy-http", "aws-types", + "azure_core", + "azure_storage", + "azure_storage_blobs", + "bytes", "camino", "camino-tempfile", + "futures-util", + "http-types", "hyper", + "itertools", "metrics", "once_cell", "pin-project-lite", - "rand", + "rand 0.8.5", "scopeguard", "serde", "serde_json", @@ -3459,11 +4062,13 @@ dependencies = [ "http-body", "hyper", "hyper-rustls", + "hyper-tls", "ipnet", "js-sys", "log", "mime", "mime_guess", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -3473,11 +4078,14 @@ dependencies = [ "serde_json", "serde_urlencoded", "tokio", + "tokio-native-tls", "tokio-rustls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots 0.25.2", "winreg", @@ -3508,7 +4116,7 @@ dependencies = [ "async-trait", "chrono", "futures", - "getrandom", + "getrandom 0.2.9", "http", "hyper", "parking_lot 0.11.2", @@ -3529,7 +4137,7 @@ checksum = "1b97ad83c2fc18113346b7158d79732242002427c30f620fa817c1f32901e0a8" dependencies = [ "anyhow", "async-trait", - "getrandom", + "getrandom 0.2.9", "matchit", "opentelemetry", "reqwest", @@ -3547,7 +4155,7 @@ checksum = "e09bbcb5003282bcb688f0bae741b278e9c7e8f378f561522c9806c58e075d9b" dependencies = [ "anyhow", "chrono", - "rand", + "rand 0.8.5", ] [[package]] @@ -3596,7 +4204,7 @@ dependencies = [ "futures", "futures-timer", "rstest_macros", - "rustc_version", + "rustc_version 0.4.0", ] [[package]] @@ -3611,7 +4219,7 @@ dependencies = [ "quote", "regex", "relative-path", - "rustc_version", + "rustc_version 0.4.0", "syn 2.0.28", "unicode-ident", ] @@ -3628,13 +4236,22 @@ version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "08d43f7aa6b08d49f382cde6a7982047c3426db949b1424bc4b7ec9ae12c6ce2" +[[package]] +name = "rustc_version" +version = "0.2.3" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "138e3e0acb6c9fb258b19b67cb8abd63c00679d2851805ea151465464fe9030a" +dependencies = [ + "semver 0.9.0", +] + [[package]] name = "rustc_version" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366" dependencies = [ - "semver", + "semver 1.0.17", ] [[package]] @@ -3760,7 +4377,7 @@ dependencies = [ "histogram", "itertools", "pageserver", - "rand", + "rand 0.8.5", "reqwest", "serde", "serde_json", @@ -3894,12 +4511,27 @@ dependencies = [ "libc", ] +[[package]] +name = "semver" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d7eb9ef2c18661902cc47e535f9bc51b78acd254da71d375c2f6720d9a40403" +dependencies = [ + "semver-parser", +] + [[package]] name = "semver" version = "1.0.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed" +[[package]] +name = "semver-parser" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "388a1df253eca08550bef6c72392cfe7c30914bf41df5269b68cbd6ff8f570a3" + [[package]] name = "sentry" version = "0.31.6" @@ -3940,7 +4572,7 @@ dependencies = [ "hostname", "libc", "os_info", - "rustc_version", + "rustc_version 0.4.0", "sentry-core", "uname", ] @@ -3952,7 +4584,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8339474f587f36cb110fa1ed1b64229eea6d47b0b886375579297b7e47aeb055" dependencies = [ "once_cell", - "rand", + "rand 0.8.5", "sentry-types", "serde", "serde_json", @@ -3987,12 +4619,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "99dc599bd6646884fc403d593cdcb9816dd67c50cff3271c01ff123617908dcd" dependencies = [ "debugid", - "getrandom", + "getrandom 0.2.9", "hex", "serde", "serde_json", "thiserror", - "time", + "time 0.3.21", "url", "uuid", ] @@ -4038,6 +4670,17 @@ dependencies = [ "serde", ] +[[package]] +name = "serde_qs" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c7715380eec75f029a4ef7de39a9200e0a63823176b759d055b613f5a87df6a6" +dependencies = [ + "percent-encoding", + "serde", + "thiserror", +] + [[package]] name = "serde_spanned" version = "0.6.2" @@ -4072,7 +4715,7 @@ dependencies = [ "serde", "serde_json", "serde_with_macros", - "time", + "time 0.3.21", ] [[package]] @@ -4087,6 +4730,15 @@ dependencies = [ "syn 2.0.28", ] +[[package]] +name = "sha1" +version = "0.6.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c1da05c97445caa12d05e848c4a4fcbbea29e748ac28f7e80e9b010392063770" +dependencies = [ + "sha1_smol", +] + [[package]] name = "sha1" version = "0.10.5" @@ -4095,7 +4747,26 @@ checksum = "f04293dc80c3993519f2d7f6f511707ee7094fe0c6d3406feb330cdb3540eba3" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.10.7", +] + +[[package]] +name = "sha1_smol" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae1a47186c03a32177042e55dbc5fd5aee900b8e0069a8d70fba96a9375cd012" + +[[package]] +name = "sha2" +version = "0.9.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4d58a1e1bf39749807d89cf2d98ac2dfa0ff1cb3faa38fbb64dd88ac8013d800" +dependencies = [ + "block-buffer 0.9.0", + "cfg-if", + "cpufeatures", + "digest 0.9.0", + "opaque-debug", ] [[package]] @@ -4106,7 +4777,7 
@@ checksum = "82e6b795fe2e3b1e845bafcb27aa35405c4d47cdfc92af5fc8d3002f76cebdc0" dependencies = [ "cfg-if", "cpufeatures", - "digest", + "digest 0.10.7", ] [[package]] @@ -4163,7 +4834,7 @@ dependencies = [ "num-bigint", "num-traits", "thiserror", - "time", + "time 0.3.21", ] [[package]] @@ -4228,12 +4899,70 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a8f112729512f8e442d81f95a8a7ddf2b7c6b8a1a6f509a95864142b30cab2d3" +[[package]] +name = "standback" +version = "0.2.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e113fb6f3de07a243d434a56ec6f186dfd51cb08448239fe7bcae73f87ff28ff" +dependencies = [ + "version_check", +] + [[package]] name = "static_assertions" version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a2eb9349b6444b326872e140eb1cf5e7c522154d69e7a0ffb0fb81c06b37543f" +[[package]] +name = "stdweb" +version = "0.4.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d022496b16281348b52d0e30ae99e01a73d737b2f45d38fed4edf79f9325a1d5" +dependencies = [ + "discard", + "rustc_version 0.2.3", + "stdweb-derive", + "stdweb-internal-macros", + "stdweb-internal-runtime", + "wasm-bindgen", +] + +[[package]] +name = "stdweb-derive" +version = "0.5.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c87a60a40fccc84bef0652345bbbbbe20a605bf5d0ce81719fc476f5c03b50ef" +dependencies = [ + "proc-macro2", + "quote", + "serde", + "serde_derive", + "syn 1.0.109", +] + +[[package]] +name = "stdweb-internal-macros" +version = "0.2.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "58fa5ff6ad0d98d1ffa8cb115892b6e69d67799f6763e162a1c9db421dc22e11" +dependencies = [ + "base-x", + "proc-macro2", + "quote", + "serde", + "serde_derive", + "serde_json", + "sha1 0.6.1", + "syn 1.0.109", +] + +[[package]] +name = "stdweb-internal-runtime" +version = "0.1.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "213701ba3370744dcd1a12960caa4843b3d68b4d1c0a5d575e0d65b2ee9d16c0" + [[package]] name = "storage_broker" version = "0.1.0" @@ -4467,6 +5196,21 @@ dependencies = [ "once_cell", ] +[[package]] +name = "time" +version = "0.2.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4752a97f8eebd6854ff91f1c1824cd6160626ac4bd44287f7f4ea2035a02a242" +dependencies = [ + "const_fn", + "libc", + "standback", + "stdweb", + "time-macros 0.1.1", + "version_check", + "winapi", +] + [[package]] name = "time" version = "0.3.21" @@ -4474,9 +5218,10 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f3403384eaacbca9923fa06940178ac13e4edb725486d70e8e15881d0c836cc" dependencies = [ "itoa", + "js-sys", "serde", "time-core", - "time-macros", + "time-macros 0.2.9", ] [[package]] @@ -4485,6 +5230,16 @@ version = "0.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7300fbefb4dadc1af235a9cef3737cea692a9d97e1b9cbcd4ebdae6f8868e6fb" +[[package]] +name = "time-macros" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "957e9c6e26f12cb6d0dd7fc776bb67a706312e7299aed74c8dd5b17ebb27e2f1" +dependencies = [ + "proc-macro-hack", + "time-macros-impl", +] + [[package]] name = "time-macros" version = "0.2.9" @@ -4494,6 +5249,19 @@ dependencies = [ "time-core", ] +[[package]] +name = "time-macros-impl" +version = "0.1.2" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd3c141a1b43194f3f56a1411225df8646c55781d5f26db825b3d98507eb482f" +dependencies = [ + "proc-macro-hack", + "proc-macro2", + "quote", + "standback", + "syn 1.0.109", +] + [[package]] name = "tinytemplate" version = "1.2.1" @@ -4803,7 +5571,7 @@ dependencies = [ "indexmap", "pin-project", "pin-project-lite", - "rand", + "rand 0.8.5", "slab", "tokio", "tokio-util", @@ -4855,7 +5623,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "09d48f71a791638519505cefafe162606f706c25592e4bde4d97600c0195312e" dependencies = [ "crossbeam-channel", - "time", + "time 0.3.21", "tracing-subscriber", ] @@ -4989,8 +5757,8 @@ dependencies = [ "http", "httparse", "log", - "rand", - "sha1", + "rand 0.8.5", + "sha1 0.10.5", "thiserror", "url", "utf-8", @@ -5053,6 +5821,16 @@ version = "0.2.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f962df74c8c05a667b5ee8bcf162993134c104e96440b663c8daa176dc772d8c" +[[package]] +name = "universal-hash" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8326b2c654932e3e4f9196e69d08fdf7cfd718e1dc6f66b347e6024a0c961402" +dependencies = [ + "generic-array", + "subtle", +] + [[package]] name = "untrusted" version = "0.7.1" @@ -5130,7 +5908,7 @@ dependencies = [ "pin-project-lite", "postgres_connection", "pq_proto", - "rand", + "rand 0.8.5", "regex", "routerify", "sentry", @@ -5158,7 +5936,7 @@ version = "1.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "345444e32442451b267fc254ae85a209c64be56d2890e601a0c37ff0c3c5ecd2" dependencies = [ - "getrandom", + "getrandom 0.2.9", "serde", ] @@ -5168,6 +5946,16 @@ version = "0.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "830b7e5d4d90034032940e4ace0d9a9a057e7a45cd94e6c007832e39edb82f6d" +[[package]] +name = "value-bag" +version = "1.0.0-alpha.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2209b78d1249f7e6f3293657c9779fe31ced465df091bbd433a1cf88e916ec55" +dependencies = [ + "ctor", + "version_check", +] + [[package]] name = "vcpkg" version = "0.2.15" @@ -5208,6 +5996,12 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5c3082ca00d5a5ef149bb8b555a72ae84c9c59f7250f013ac822ac2e49b19c64" +[[package]] +name = "waker-fn" +version = "1.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3c4517f54858c779bbcbf228f4fca63d121bf85fbecb2dc578cdf4a39395690" + [[package]] name = "wal_craft" version = "0.1.0" @@ -5245,6 +6039,12 @@ dependencies = [ "try-lock", ] +[[package]] +name = "wasi" +version = "0.9.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" + [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -5317,6 +6117,19 @@ version = "0.2.86" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ed9d5b4305409d1fc9482fee2d7f9bcbf24b3972bf59817ef757e23982242a93" +[[package]] +name = "wasm-streams" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b4609d447824375f43e1ffbc051b50ad8f4b3ae8219680c94452ea05eb240ac7" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "wasm-timer" version = "0.2.5" @@ -5597,6 +6410,7 @@ 
dependencies = [ "futures-channel", "futures-core", "futures-executor", + "futures-io", "futures-sink", "futures-util", "hex", @@ -5610,7 +6424,7 @@ dependencies = [ "num-integer", "num-traits", "prost", - "rand", + "rand 0.8.5", "regex", "regex-syntax 0.7.2", "reqwest", @@ -5621,9 +6435,11 @@ dependencies = [ "serde_json", "smallvec", "socket2 0.4.9", + "standback", "syn 1.0.109", "syn 2.0.28", - "time", + "time 0.3.21", + "time-macros 0.2.9", "tokio", "tokio-rustls", "tokio-util", @@ -5651,7 +6467,7 @@ dependencies = [ "oid-registry", "rusticata-macros", "thiserror", - "time", + "time 0.3.21", ] [[package]] @@ -5675,7 +6491,7 @@ version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e17bb3549cc1321ae1296b9cdc2698e2b6cb1992adfa19a8c72e5b7a738f44cd" dependencies = [ - "time", + "time 0.3.21", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 2b9da977e566..4827652229ae 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -36,6 +36,9 @@ license = "Apache-2.0" [workspace.dependencies] anyhow = { version = "1.0", features = ["backtrace"] } async-compression = { version = "0.4.0", features = ["tokio", "gzip"] } +azure_core = "0.16.0" +azure_storage = "0.16" +azure_storage_blobs = "0.16.0" flate2 = "1.0.26" async-stream = "0.3" async-trait = "0.1" @@ -76,6 +79,7 @@ hex = "0.4" hex-literal = "0.4" hmac = "0.12.1" hostname = "0.3.1" +http-types = "2" humantime = "2.1" humantime-serde = "1.1.1" hyper = "0.14" diff --git a/control_plane/src/background_process.rs b/control_plane/src/background_process.rs index 186d49fe8b52..c0016ece1767 100644 --- a/control_plane/src/background_process.rs +++ b/control_plane/src/background_process.rs @@ -86,7 +86,7 @@ where .stdout(process_log_file) .stderr(same_file_for_stderr) .args(args); - let filled_cmd = fill_aws_secrets_vars(fill_rust_env_vars(background_command)); + let filled_cmd = fill_remote_storage_secrets_vars(fill_rust_env_vars(background_command)); filled_cmd.envs(envs); let pid_file_to_check = match initial_pid_file { @@ -238,11 +238,13 @@ fn fill_rust_env_vars(cmd: &mut Command) -> &mut Command { filled_cmd } -fn fill_aws_secrets_vars(mut cmd: &mut Command) -> &mut Command { +fn fill_remote_storage_secrets_vars(mut cmd: &mut Command) -> &mut Command { for env_key in [ "AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_SESSION_TOKEN", + "AZURE_STORAGE_ACCOUNT", + "AZURE_STORAGE_ACCESS_KEY", ] { if let Ok(value) = std::env::var(env_key) { cmd = cmd.env(env_key, value); diff --git a/docs/pageserver-services.md b/docs/pageserver-services.md index fc259c8a5fa3..ba5d3c423e50 100644 --- a/docs/pageserver-services.md +++ b/docs/pageserver-services.md @@ -96,6 +96,16 @@ prefix_in_bucket = '/test_prefix/' `AWS_SECRET_ACCESS_KEY` and `AWS_ACCESS_KEY_ID` env variables can be used to specify the S3 credentials if needed. +or + +```toml +[remote_storage] +container_name = 'some-container-name' +container_region = 'us-east' +prefix_in_container = '/test-prefix/' +``` + +`AZURE_STORAGE_ACCOUNT` and `AZURE_STORAGE_ACCESS_KEY` env variables can be used to specify the azure credentials if needed. 
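+
+For example, to exercise the Azure backend against a real container (the `remote_storage` Azure tests gated behind `ENABLE_REAL_AZURE_REMOTE_STORAGE`), the credentials and container coordinates can be supplied via environment variables; the values below are placeholders:
+
+```bash
+export AZURE_STORAGE_ACCOUNT=mystorageaccount
+export AZURE_STORAGE_ACCESS_KEY='<access key>'
+export REMOTE_STORAGE_AZURE_CONTAINER=some-container-name
+export REMOTE_STORAGE_AZURE_REGION=us-east
+ENABLE_REAL_AZURE_REMOTE_STORAGE=y cargo test -p remote_storage azure
+```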
## Repository background tasks diff --git a/libs/remote_storage/Cargo.toml b/libs/remote_storage/Cargo.toml index d9386487508e..5d3bda70af9f 100644 --- a/libs/remote_storage/Cargo.toml +++ b/libs/remote_storage/Cargo.toml @@ -13,6 +13,7 @@ aws-types.workspace = true aws-config.workspace = true aws-sdk-s3.workspace = true aws-credential-types.workspace = true +bytes.workspace = true camino.workspace = true hyper = { workspace = true, features = ["stream"] } serde.workspace = true @@ -26,6 +27,12 @@ metrics.workspace = true utils.workspace = true pin-project-lite.workspace = true workspace_hack.workspace = true +azure_core.workspace = true +azure_storage.workspace = true +azure_storage_blobs.workspace = true +futures-util.workspace = true +http-types.workspace = true +itertools.workspace = true [dev-dependencies] camino-tempfile.workspace = true diff --git a/libs/remote_storage/src/azure_blob.rs b/libs/remote_storage/src/azure_blob.rs new file mode 100644 index 000000000000..ac2664b81a7c --- /dev/null +++ b/libs/remote_storage/src/azure_blob.rs @@ -0,0 +1,381 @@ +//! Azure Blob Storage wrapper + +use std::num::NonZeroU32; +use std::{borrow::Cow, collections::HashMap, io::Cursor}; + +use super::REMOTE_STORAGE_PREFIX_SEPARATOR; +use anyhow::Result; +use azure_core::request_options::{MaxResults, Metadata, Range}; +use azure_core::Header; +use azure_storage::StorageCredentials; +use azure_storage_blobs::prelude::ClientBuilder; +use azure_storage_blobs::{ + blob::operations::GetBlobBuilder, + prelude::{BlobClient, ContainerClient}, +}; +use futures_util::StreamExt; +use http_types::StatusCode; +use tokio::io::AsyncRead; +use tracing::debug; + +use crate::s3_bucket::RequestKind; +use crate::{ + AzureConfig, ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage, + StorageMetadata, +}; + +pub struct AzureBlobStorage { + client: ContainerClient, + prefix_in_container: Option, + max_keys_per_list_response: Option, + concurrency_limiter: ConcurrencyLimiter, +} + +impl AzureBlobStorage { + pub fn new(azure_config: &AzureConfig) -> Result { + debug!( + "Creating azure remote storage for azure container {}", + azure_config.container_name + ); + + let account = + std::env::var("AZURE_STORAGE_ACCOUNT").expect("missing AZURE_STORAGE_ACCOUNT"); + let access_key = + std::env::var("AZURE_STORAGE_ACCESS_KEY").expect("missing AZURE_STORAGE_ACCESS_KEY"); + + let credentials = StorageCredentials::access_key(account.clone(), access_key); + + let builder = ClientBuilder::new(account, credentials); + + let client = builder.container_client(azure_config.container_name.to_owned()); + + let max_keys_per_list_response = + if let Some(limit) = azure_config.max_keys_per_list_response { + Some( + NonZeroU32::new(limit as u32) + .ok_or_else(|| anyhow::anyhow!("max_keys_per_list_response can't be 0"))?, + ) + } else { + None + }; + + Ok(AzureBlobStorage { + client, + prefix_in_container: azure_config.prefix_in_container.to_owned(), + max_keys_per_list_response, + concurrency_limiter: ConcurrencyLimiter::new(azure_config.concurrency_limit.get()), + }) + } + + pub fn relative_path_to_name(&self, path: &RemotePath) -> String { + assert_eq!(std::path::MAIN_SEPARATOR, REMOTE_STORAGE_PREFIX_SEPARATOR); + let path_string = path + .get_path() + .as_str() + .trim_end_matches(REMOTE_STORAGE_PREFIX_SEPARATOR); + match &self.prefix_in_container { + Some(prefix) => { + if prefix.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { + prefix.clone() + path_string + } else { + 
format!("{prefix}{REMOTE_STORAGE_PREFIX_SEPARATOR}{path_string}") + } + } + None => path_string.to_string(), + } + } + + fn name_to_relative_path(&self, key: &str) -> RemotePath { + let relative_path = + match key.strip_prefix(self.prefix_in_container.as_deref().unwrap_or_default()) { + Some(stripped) => stripped, + // we rely on Azure to return properly prefixed paths + // for requests with a certain prefix + None => panic!( + "Key {key} does not start with container prefix {:?}", + self.prefix_in_container + ), + }; + RemotePath( + relative_path + .split(REMOTE_STORAGE_PREFIX_SEPARATOR) + .collect(), + ) + } + + async fn download_for_builder( + &self, + metadata: StorageMetadata, + builder: GetBlobBuilder, + ) -> Result { + let mut response = builder.into_stream(); + + // TODO give proper streaming response instead of buffering into RAM + // https://github.com/neondatabase/neon/issues/5563 + let mut buf = Vec::new(); + while let Some(part) = response.next().await { + let part = match part { + Ok(l) => l, + Err(e) => { + return Err(if let Some(http_err) = e.as_http_error() { + match http_err.status() { + StatusCode::NotFound => DownloadError::NotFound, + StatusCode::BadRequest => { + DownloadError::BadInput(anyhow::Error::new(e)) + } + _ => DownloadError::Other(anyhow::Error::new(e)), + } + } else { + DownloadError::Other(e.into()) + }); + } + }; + let data = part + .data + .collect() + .await + .map_err(|e| DownloadError::Other(e.into()))?; + buf.extend_from_slice(&data.slice(..)); + } + Ok(Download { + download_stream: Box::pin(Cursor::new(buf)), + metadata: Some(metadata), + }) + } + // TODO get rid of this function once we have metadata included in the response + // https://github.com/Azure/azure-sdk-for-rust/issues/1439 + async fn get_metadata( + &self, + blob_client: &BlobClient, + ) -> Result { + let builder = blob_client.get_metadata(); + + match builder.into_future().await { + Ok(r) => { + let mut map = HashMap::new(); + + for md in r.metadata.iter() { + map.insert( + md.name().as_str().to_string(), + md.value().as_str().to_string(), + ); + } + Ok(StorageMetadata(map)) + } + Err(e) => { + return Err(if let Some(http_err) = e.as_http_error() { + match http_err.status() { + StatusCode::NotFound => DownloadError::NotFound, + StatusCode::BadRequest => DownloadError::BadInput(anyhow::Error::new(e)), + _ => DownloadError::Other(anyhow::Error::new(e)), + } + } else { + DownloadError::Other(e.into()) + }); + } + } + } + + async fn permit(&self, kind: RequestKind) -> tokio::sync::SemaphorePermit<'_> { + self.concurrency_limiter + .acquire(kind) + .await + .expect("semaphore is never closed") + } +} + +fn to_azure_metadata(metadata: StorageMetadata) -> Metadata { + let mut res = Metadata::new(); + for (k, v) in metadata.0.into_iter() { + res.insert(k, v); + } + res +} + +#[async_trait::async_trait] +impl RemoteStorage for AzureBlobStorage { + async fn list_prefixes( + &self, + prefix: Option<&RemotePath>, + ) -> Result, DownloadError> { + // get the passed prefix or if it is not set use prefix_in_bucket value + let list_prefix = prefix + .map(|p| self.relative_path_to_name(p)) + .or_else(|| self.prefix_in_container.clone()) + .map(|mut p| { + // required to end with a separator + // otherwise request will return only the entry of a prefix + if !p.ends_with(REMOTE_STORAGE_PREFIX_SEPARATOR) { + p.push(REMOTE_STORAGE_PREFIX_SEPARATOR); + } + p + }); + + let mut builder = self + .client + .list_blobs() + .delimiter(REMOTE_STORAGE_PREFIX_SEPARATOR.to_string()); + + if let Some(prefix) = 
list_prefix { + builder = builder.prefix(Cow::from(prefix.to_owned())); + } + + if let Some(limit) = self.max_keys_per_list_response { + builder = builder.max_results(MaxResults::new(limit)); + } + + let mut response = builder.into_stream(); + let mut res = Vec::new(); + while let Some(l) = response.next().await { + let entry = match l { + Ok(l) => l, + Err(e) => { + return Err(if let Some(http_err) = e.as_http_error() { + match http_err.status() { + StatusCode::NotFound => DownloadError::NotFound, + StatusCode::BadRequest => { + DownloadError::BadInput(anyhow::Error::new(e)) + } + _ => DownloadError::Other(anyhow::Error::new(e)), + } + } else { + DownloadError::Other(e.into()) + }); + } + }; + let name_iter = entry + .blobs + .prefixes() + .map(|prefix| self.name_to_relative_path(&prefix.name)); + res.extend(name_iter); + } + Ok(res) + } + + async fn list_files(&self, folder: Option<&RemotePath>) -> anyhow::Result> { + let folder_name = folder + .map(|p| self.relative_path_to_name(p)) + .or_else(|| self.prefix_in_container.clone()); + + let mut builder = self.client.list_blobs(); + + if let Some(folder_name) = folder_name { + builder = builder.prefix(Cow::from(folder_name.to_owned())); + } + + if let Some(limit) = self.max_keys_per_list_response { + builder = builder.max_results(MaxResults::new(limit)); + } + + let mut response = builder.into_stream(); + let mut res = Vec::new(); + while let Some(l) = response.next().await { + let entry = l.map_err(anyhow::Error::new)?; + let name_iter = entry + .blobs + .blobs() + .map(|bl| self.name_to_relative_path(&bl.name)); + res.extend(name_iter); + } + Ok(res) + } + + async fn upload( + &self, + mut from: impl AsyncRead + Unpin + Send + Sync + 'static, + data_size_bytes: usize, + to: &RemotePath, + metadata: Option, + ) -> anyhow::Result<()> { + let _permit = self.permit(RequestKind::Put).await; + let blob_client = self.client.blob_client(self.relative_path_to_name(to)); + + // TODO FIX THIS UGLY HACK and don't buffer the entire object + // into RAM here, but use the streaming interface. For that, + // we'd have to change the interface though... 
+ // https://github.com/neondatabase/neon/issues/5563 + let mut buf = Vec::with_capacity(data_size_bytes); + tokio::io::copy(&mut from, &mut buf).await?; + let body = azure_core::Body::Bytes(buf.into()); + + let mut builder = blob_client.put_block_blob(body); + + if let Some(metadata) = metadata { + builder = builder.metadata(to_azure_metadata(metadata)); + } + + let _response = builder.into_future().await?; + + Ok(()) + } + + async fn download(&self, from: &RemotePath) -> Result { + let _permit = self.permit(RequestKind::Get).await; + let blob_client = self.client.blob_client(self.relative_path_to_name(from)); + + let metadata = self.get_metadata(&blob_client).await?; + + let builder = blob_client.get(); + + self.download_for_builder(metadata, builder).await + } + + async fn download_byte_range( + &self, + from: &RemotePath, + start_inclusive: u64, + end_exclusive: Option, + ) -> Result { + let _permit = self.permit(RequestKind::Get).await; + let blob_client = self.client.blob_client(self.relative_path_to_name(from)); + + let metadata = self.get_metadata(&blob_client).await?; + + let mut builder = blob_client.get(); + + if let Some(end_exclusive) = end_exclusive { + builder = builder.range(Range::new(start_inclusive, end_exclusive)); + } else { + // Open ranges are not supported by the SDK so we work around + // by setting the upper limit extremely high (but high enough + // to still be representable by signed 64 bit integers). + // TODO remove workaround once the SDK adds open range support + // https://github.com/Azure/azure-sdk-for-rust/issues/1438 + let end_exclusive = u64::MAX << 4; + builder = builder.range(Range::new(start_inclusive, end_exclusive)); + } + + self.download_for_builder(metadata, builder).await + } + + async fn delete(&self, path: &RemotePath) -> anyhow::Result<()> { + let _permit = self.permit(RequestKind::Delete).await; + let blob_client = self.client.blob_client(self.relative_path_to_name(path)); + + let builder = blob_client.delete(); + + match builder.into_future().await { + Ok(_response) => Ok(()), + Err(e) => { + if let Some(http_err) = e.as_http_error() { + if http_err.status() == StatusCode::NotFound { + return Ok(()); + } + } + Err(anyhow::Error::new(e)) + } + } + } + + async fn delete_objects<'a>(&self, paths: &'a [RemotePath]) -> anyhow::Result<()> { + // Permit is already obtained by inner delete function + + // TODO batch requests are also not supported by the SDK + // https://github.com/Azure/azure-sdk-for-rust/issues/1068 + // https://github.com/Azure/azure-sdk-for-rust/issues/1249 + for path in paths { + self.delete(path).await?; + } + Ok(()) + } +} diff --git a/libs/remote_storage/src/lib.rs b/libs/remote_storage/src/lib.rs index 3560c94c71b1..435364d83a1e 100644 --- a/libs/remote_storage/src/lib.rs +++ b/libs/remote_storage/src/lib.rs @@ -4,7 +4,10 @@ //! [`RemoteStorage`] trait a CRUD-like generic abstraction to use for adapting external storages with a few implementations: //! * [`local_fs`] allows to use local file system as an external storage //! * [`s3_bucket`] uses AWS S3 bucket as an external storage +//! * [`azure_blob`] allows to use Azure Blob storage as an external storage //! 
+ +mod azure_blob; mod local_fs; mod s3_bucket; mod simulate_failures; @@ -21,11 +24,15 @@ use anyhow::{bail, Context}; use camino::{Utf8Path, Utf8PathBuf}; use serde::{Deserialize, Serialize}; -use tokio::io; +use tokio::{io, sync::Semaphore}; use toml_edit::Item; use tracing::info; -pub use self::{local_fs::LocalFs, s3_bucket::S3Bucket, simulate_failures::UnreliableWrapper}; +pub use self::{ + azure_blob::AzureBlobStorage, local_fs::LocalFs, s3_bucket::S3Bucket, + simulate_failures::UnreliableWrapper, +}; +use s3_bucket::RequestKind; /// How many different timelines can be processed simultaneously when synchronizing layers with the remote storage. /// During regular work, pageserver produces one layer file per timeline checkpoint, with bursts of concurrency @@ -39,6 +46,11 @@ pub const DEFAULT_REMOTE_STORAGE_MAX_SYNC_ERRORS: u32 = 10; /// ~3500 PUT/COPY/POST/DELETE or 5500 GET/HEAD S3 requests /// pub const DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT: usize = 100; +/// We set this a little bit low as we currently buffer the entire file into RAM +/// +/// Here, a limit of max 20k concurrent connections was noted. +/// +pub const DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT: usize = 30; /// No limits on the client side, which currenltly means 1000 for AWS S3. /// pub const DEFAULT_MAX_KEYS_PER_LIST_RESPONSE: Option = None; @@ -217,6 +229,7 @@ impl std::error::Error for DownloadError {} pub enum GenericRemoteStorage { LocalFs(LocalFs), AwsS3(Arc), + AzureBlob(Arc), Unreliable(Arc), } @@ -228,6 +241,7 @@ impl GenericRemoteStorage { match self { Self::LocalFs(s) => s.list_files(folder).await, Self::AwsS3(s) => s.list_files(folder).await, + Self::AzureBlob(s) => s.list_files(folder).await, Self::Unreliable(s) => s.list_files(folder).await, } } @@ -242,6 +256,7 @@ impl GenericRemoteStorage { match self { Self::LocalFs(s) => s.list_prefixes(prefix).await, Self::AwsS3(s) => s.list_prefixes(prefix).await, + Self::AzureBlob(s) => s.list_prefixes(prefix).await, Self::Unreliable(s) => s.list_prefixes(prefix).await, } } @@ -256,6 +271,7 @@ impl GenericRemoteStorage { match self { Self::LocalFs(s) => s.upload(from, data_size_bytes, to, metadata).await, Self::AwsS3(s) => s.upload(from, data_size_bytes, to, metadata).await, + Self::AzureBlob(s) => s.upload(from, data_size_bytes, to, metadata).await, Self::Unreliable(s) => s.upload(from, data_size_bytes, to, metadata).await, } } @@ -264,6 +280,7 @@ impl GenericRemoteStorage { match self { Self::LocalFs(s) => s.download(from).await, Self::AwsS3(s) => s.download(from).await, + Self::AzureBlob(s) => s.download(from).await, Self::Unreliable(s) => s.download(from).await, } } @@ -283,6 +300,10 @@ impl GenericRemoteStorage { s.download_byte_range(from, start_inclusive, end_exclusive) .await } + Self::AzureBlob(s) => { + s.download_byte_range(from, start_inclusive, end_exclusive) + .await + } Self::Unreliable(s) => { s.download_byte_range(from, start_inclusive, end_exclusive) .await @@ -294,6 +315,7 @@ impl GenericRemoteStorage { match self { Self::LocalFs(s) => s.delete(path).await, Self::AwsS3(s) => s.delete(path).await, + Self::AzureBlob(s) => s.delete(path).await, Self::Unreliable(s) => s.delete(path).await, } } @@ -302,6 +324,7 @@ impl GenericRemoteStorage { match self { Self::LocalFs(s) => s.delete_objects(paths).await, Self::AwsS3(s) => s.delete_objects(paths).await, + Self::AzureBlob(s) => s.delete_objects(paths).await, Self::Unreliable(s) => s.delete_objects(paths).await, } } @@ -319,6 +342,11 @@ impl GenericRemoteStorage { s3_config.bucket_name, 
s3_config.bucket_region, s3_config.prefix_in_bucket, s3_config.endpoint); Self::AwsS3(Arc::new(S3Bucket::new(s3_config)?)) } + RemoteStorageKind::AzureContainer(azure_config) => { + info!("Using azure container '{}' in region '{}' as a remote storage, prefix in container: '{:?}'", + azure_config.container_name, azure_config.container_region, azure_config.prefix_in_container); + Self::AzureBlob(Arc::new(AzureBlobStorage::new(azure_config)?)) + } }) } @@ -383,6 +411,9 @@ pub enum RemoteStorageKind { /// AWS S3 based storage, storing all files in the S3 bucket /// specified by the config AwsS3(S3Config), + /// Azure Blob based storage, storing all files in the container + /// specified by the config + AzureContainer(AzureConfig), } /// AWS S3 bucket coordinates and access credentials to manage the bucket contents (read and write). @@ -422,11 +453,45 @@ impl Debug for S3Config { } } +/// Azure bucket coordinates and access credentials to manage the bucket contents (read and write). +#[derive(Clone, PartialEq, Eq)] +pub struct AzureConfig { + /// Name of the container to connect to. + pub container_name: String, + /// The region where the bucket is located at. + pub container_region: String, + /// A "subfolder" in the container, to use the same container separately by multiple remote storage users at once. + pub prefix_in_container: Option, + /// Azure has various limits on its API calls, we need not to exceed those. + /// See [`DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT`] for more details. + pub concurrency_limit: NonZeroUsize, + pub max_keys_per_list_response: Option, +} + +impl Debug for AzureConfig { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("AzureConfig") + .field("bucket_name", &self.container_name) + .field("bucket_region", &self.container_region) + .field("prefix_in_bucket", &self.prefix_in_container) + .field("concurrency_limit", &self.concurrency_limit) + .field( + "max_keys_per_list_response", + &self.max_keys_per_list_response, + ) + .finish() + } +} + impl RemoteStorageConfig { pub fn from_toml(toml: &toml_edit::Item) -> anyhow::Result> { let local_path = toml.get("local_path"); let bucket_name = toml.get("bucket_name"); let bucket_region = toml.get("bucket_region"); + let container_name = toml.get("container_name"); + let container_region = toml.get("container_region"); + + let use_azure = container_name.is_some() && container_region.is_some(); let max_concurrent_syncs = NonZeroUsize::new( parse_optional_integer("max_concurrent_syncs", toml)? @@ -440,9 +505,13 @@ impl RemoteStorageConfig { ) .context("Failed to parse 'max_sync_errors' as a positive integer")?; + let default_concurrency_limit = if use_azure { + DEFAULT_REMOTE_STORAGE_AZURE_CONCURRENCY_LIMIT + } else { + DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT + }; let concurrency_limit = NonZeroUsize::new( - parse_optional_integer("concurrency_limit", toml)? - .unwrap_or(DEFAULT_REMOTE_STORAGE_S3_CONCURRENCY_LIMIT), + parse_optional_integer("concurrency_limit", toml)?.unwrap_or(default_concurrency_limit), ) .context("Failed to parse 'concurrency_limit' as a positive integer")?; @@ -451,33 +520,70 @@ impl RemoteStorageConfig { .context("Failed to parse 'max_keys_per_list_response' as a positive integer")? 
             .or(DEFAULT_MAX_KEYS_PER_LIST_RESPONSE);
 
-        let storage = match (local_path, bucket_name, bucket_region) {
+        let endpoint = toml
+            .get("endpoint")
+            .map(|endpoint| parse_toml_string("endpoint", endpoint))
+            .transpose()?;
+
+        let storage = match (
+            local_path,
+            bucket_name,
+            bucket_region,
+            container_name,
+            container_region,
+        ) {
             // no 'local_path' nor 'bucket_name' options are provided, consider this remote storage disabled
-            (None, None, None) => return Ok(None),
-            (_, Some(_), None) => {
+            (None, None, None, None, None) => return Ok(None),
+            (_, Some(_), None, ..) => {
                 bail!("'bucket_region' option is mandatory if 'bucket_name' is given ")
             }
-            (_, None, Some(_)) => {
+            (_, None, Some(_), ..) => {
                 bail!("'bucket_name' option is mandatory if 'bucket_region' is given ")
             }
-            (None, Some(bucket_name), Some(bucket_region)) => RemoteStorageKind::AwsS3(S3Config {
-                bucket_name: parse_toml_string("bucket_name", bucket_name)?,
-                bucket_region: parse_toml_string("bucket_region", bucket_region)?,
-                prefix_in_bucket: toml
-                    .get("prefix_in_bucket")
-                    .map(|prefix_in_bucket| parse_toml_string("prefix_in_bucket", prefix_in_bucket))
-                    .transpose()?,
-                endpoint: toml
-                    .get("endpoint")
-                    .map(|endpoint| parse_toml_string("endpoint", endpoint))
-                    .transpose()?,
-                concurrency_limit,
-                max_keys_per_list_response,
-            }),
-            (Some(local_path), None, None) => RemoteStorageKind::LocalFs(Utf8PathBuf::from(
-                parse_toml_string("local_path", local_path)?,
-            )),
-            (Some(_), Some(_), _) => bail!("local_path and bucket_name are mutually exclusive"),
+            (None, Some(bucket_name), Some(bucket_region), ..) => {
+                RemoteStorageKind::AwsS3(S3Config {
+                    bucket_name: parse_toml_string("bucket_name", bucket_name)?,
+                    bucket_region: parse_toml_string("bucket_region", bucket_region)?,
+                    prefix_in_bucket: toml
+                        .get("prefix_in_bucket")
+                        .map(|prefix_in_bucket| {
+                            parse_toml_string("prefix_in_bucket", prefix_in_bucket)
+                        })
+                        .transpose()?,
+                    endpoint,
+                    concurrency_limit,
+                    max_keys_per_list_response,
+                })
+            }
+            (_, _, _, Some(_), None) => {
+                bail!("'container_region' option is mandatory if 'container_name' is given ")
+            }
+            (_, _, _, None, Some(_)) => {
+                bail!("'container_name' option is mandatory if 'container_region' is given ")
+            }
+            (None, None, None, Some(container_name), Some(container_region)) => {
+                RemoteStorageKind::AzureContainer(AzureConfig {
+                    container_name: parse_toml_string("container_name", container_name)?,
+                    container_region: parse_toml_string("container_region", container_region)?,
+                    prefix_in_container: toml
+                        .get("prefix_in_container")
+                        .map(|prefix_in_container| {
+                            parse_toml_string("prefix_in_container", prefix_in_container)
+                        })
+                        .transpose()?,
+                    concurrency_limit,
+                    max_keys_per_list_response,
+                })
+            }
+            (Some(local_path), None, None, None, None) => RemoteStorageKind::LocalFs(
+                Utf8PathBuf::from(parse_toml_string("local_path", local_path)?),
+            ),
+            (Some(_), Some(_), ..) => {
+                bail!("'local_path' and 'bucket_name' are mutually exclusive")
+            }
+            (Some(_), _, _, Some(_), Some(_)) => {
+                bail!("'local_path' and 'container_name' are mutually exclusive")
+            }
         };
 
         Ok(Some(RemoteStorageConfig {
@@ -513,6 +619,46 @@ fn parse_toml_string(name: &str, item: &Item) -> anyhow::Result<String> {
     Ok(s.to_string())
 }
 
+struct ConcurrencyLimiter {
+    // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
+    // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
+    // This helps to ensure we don't exceed the thresholds.
+    write: Arc<Semaphore>,
+    read: Arc<Semaphore>,
+}
+
+impl ConcurrencyLimiter {
+    fn for_kind(&self, kind: RequestKind) -> &Arc<Semaphore> {
+        match kind {
+            RequestKind::Get => &self.read,
+            RequestKind::Put => &self.write,
+            RequestKind::List => &self.read,
+            RequestKind::Delete => &self.write,
+        }
+    }
+
+    async fn acquire(
+        &self,
+        kind: RequestKind,
+    ) -> Result<tokio::sync::SemaphorePermit<'_>, tokio::sync::AcquireError> {
+        self.for_kind(kind).acquire().await
+    }
+
+    async fn acquire_owned(
+        &self,
+        kind: RequestKind,
+    ) -> Result<tokio::sync::OwnedSemaphorePermit, tokio::sync::AcquireError> {
+        Arc::clone(self.for_kind(kind)).acquire_owned().await
+    }
+
+    fn new(limit: usize) -> ConcurrencyLimiter {
+        Self {
+            read: Arc::new(Semaphore::new(limit)),
+            write: Arc::new(Semaphore::new(limit)),
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
diff --git a/libs/remote_storage/src/s3_bucket.rs b/libs/remote_storage/src/s3_bucket.rs
index c63f24cf857c..fc94281666eb 100644
--- a/libs/remote_storage/src/s3_bucket.rs
+++ b/libs/remote_storage/src/s3_bucket.rs
@@ -4,7 +4,7 @@
 //! allowing multiple api users to independently work with the same S3 bucket, if
 //! their bucket prefixes are both specified and different.
 
-use std::{borrow::Cow, sync::Arc};
+use std::borrow::Cow;
 
 use anyhow::Context;
 use aws_config::{
@@ -24,22 +24,20 @@ use aws_sdk_s3::{
 use aws_smithy_http::body::SdkBody;
 use hyper::Body;
 use scopeguard::ScopeGuard;
-use tokio::{
-    io::{self, AsyncRead},
-    sync::Semaphore,
-};
+use tokio::io::{self, AsyncRead};
 use tokio_util::io::ReaderStream;
 use tracing::debug;
 
 use super::StorageMetadata;
 use crate::{
-    Download, DownloadError, RemotePath, RemoteStorage, S3Config, MAX_KEYS_PER_DELETE,
-    REMOTE_STORAGE_PREFIX_SEPARATOR,
+    ConcurrencyLimiter, Download, DownloadError, RemotePath, RemoteStorage, S3Config,
+    MAX_KEYS_PER_DELETE, REMOTE_STORAGE_PREFIX_SEPARATOR,
 };
 
 pub(super) mod metrics;
 
-use self::metrics::{AttemptOutcome, RequestKind};
+use self::metrics::AttemptOutcome;
+pub(super) use self::metrics::RequestKind;
 
 /// AWS S3 storage.
 pub struct S3Bucket {
@@ -50,46 +48,6 @@ pub struct S3Bucket {
     concurrency_limiter: ConcurrencyLimiter,
 }
 
-struct ConcurrencyLimiter {
-    // Every request to S3 can be throttled or cancelled, if a certain number of requests per second is exceeded.
-    // Same goes to IAM, which is queried before every S3 request, if enabled. IAM has even lower RPS threshold.
-    // The helps to ensure we don't exceed the thresholds.
- write: Arc, - read: Arc, -} - -impl ConcurrencyLimiter { - fn for_kind(&self, kind: RequestKind) -> &Arc { - match kind { - RequestKind::Get => &self.read, - RequestKind::Put => &self.write, - RequestKind::List => &self.read, - RequestKind::Delete => &self.write, - } - } - - async fn acquire( - &self, - kind: RequestKind, - ) -> Result, tokio::sync::AcquireError> { - self.for_kind(kind).acquire().await - } - - async fn acquire_owned( - &self, - kind: RequestKind, - ) -> Result { - Arc::clone(self.for_kind(kind)).acquire_owned().await - } - - fn new(limit: usize) -> ConcurrencyLimiter { - Self { - read: Arc::new(Semaphore::new(limit)), - write: Arc::new(Semaphore::new(limit)), - } - } -} - #[derive(Default)] struct GetObjectRequest { bucket: String, diff --git a/libs/remote_storage/src/s3_bucket/metrics.rs b/libs/remote_storage/src/s3_bucket/metrics.rs index 2068ca0e38c2..ea11edafa57d 100644 --- a/libs/remote_storage/src/s3_bucket/metrics.rs +++ b/libs/remote_storage/src/s3_bucket/metrics.rs @@ -6,7 +6,7 @@ use once_cell::sync::Lazy; pub(super) static BUCKET_METRICS: Lazy = Lazy::new(Default::default); #[derive(Clone, Copy, Debug)] -pub(super) enum RequestKind { +pub(crate) enum RequestKind { Get = 0, Put = 1, Delete = 2, diff --git a/libs/remote_storage/tests/test_real_azure.rs b/libs/remote_storage/tests/test_real_azure.rs new file mode 100644 index 000000000000..5ebbd9e95b84 --- /dev/null +++ b/libs/remote_storage/tests/test_real_azure.rs @@ -0,0 +1,619 @@ +use std::collections::HashSet; +use std::env; +use std::num::{NonZeroU32, NonZeroUsize}; +use std::ops::ControlFlow; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::UNIX_EPOCH; + +use anyhow::Context; +use camino::Utf8Path; +use once_cell::sync::OnceCell; +use remote_storage::{ + AzureConfig, Download, GenericRemoteStorage, RemotePath, RemoteStorageConfig, RemoteStorageKind, +}; +use test_context::{test_context, AsyncTestContext}; +use tokio::task::JoinSet; +use tracing::{debug, error, info}; + +static LOGGING_DONE: OnceCell<()> = OnceCell::new(); + +const ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME: &str = "ENABLE_REAL_AZURE_REMOTE_STORAGE"; + +const BASE_PREFIX: &str = "test"; + +/// Tests that the Azure client can list all prefixes, even if the response comes paginated and requires multiple HTTP queries. +/// Uses real Azure and requires [`ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`] and related Azure cred env vars specified. +/// See the client creation in [`create_azure_client`] for details on the required env vars. +/// If real Azure tests are disabled, the test passes, skipping any real test run: currently, there's no way to mark the test ignored in runtime with the +/// deafult test framework, see https://github.com/rust-lang/rust/issues/68007 for details. 
+/// +/// First, the test creates a set of Azure blobs with keys `/${random_prefix_part}/${base_prefix_str}/sub_prefix_${i}/blob_${i}` in [`upload_azure_data`] +/// where +/// * `random_prefix_part` is set for the entire Azure client during the Azure client creation in [`create_azure_client`], to avoid multiple test runs interference +/// * `base_prefix_str` is a common prefix to use in the client requests: we would want to ensure that the client is able to list nested prefixes inside the bucket +/// +/// Then, verifies that the client does return correct prefixes when queried: +/// * with no prefix, it lists everything after its `${random_prefix_part}/` — that should be `${base_prefix_str}` value only +/// * with `${base_prefix_str}/` prefix, it lists every `sub_prefix_${i}` +/// +/// With the real Azure enabled and `#[cfg(test)]` Rust configuration used, the Azure client test adds a `max-keys` param to limit the response keys. +/// This way, we are able to test the pagination implicitly, by ensuring all results are returned from the remote storage and avoid uploading too many blobs to Azure. +/// +/// Lastly, the test attempts to clean up and remove all uploaded Azure files. +/// If any errors appear during the clean up, they get logged, but the test is not failed or stopped until clean up is finished. +#[test_context(MaybeEnabledAzureWithTestBlobs)] +#[tokio::test] +async fn azure_pagination_should_work( + ctx: &mut MaybeEnabledAzureWithTestBlobs, +) -> anyhow::Result<()> { + let ctx = match ctx { + MaybeEnabledAzureWithTestBlobs::Enabled(ctx) => ctx, + MaybeEnabledAzureWithTestBlobs::Disabled => return Ok(()), + MaybeEnabledAzureWithTestBlobs::UploadsFailed(e, _) => { + anyhow::bail!("Azure init failed: {e:?}") + } + }; + + let test_client = Arc::clone(&ctx.enabled.client); + let expected_remote_prefixes = ctx.remote_prefixes.clone(); + + let base_prefix = RemotePath::new(Utf8Path::new(ctx.enabled.base_prefix)) + .context("common_prefix construction")?; + let root_remote_prefixes = test_client + .list_prefixes(None) + .await + .context("client list root prefixes failure")? + .into_iter() + .collect::>(); + assert_eq!( + root_remote_prefixes, HashSet::from([base_prefix.clone()]), + "remote storage root prefixes list mismatches with the uploads. Returned prefixes: {root_remote_prefixes:?}" + ); + + let nested_remote_prefixes = test_client + .list_prefixes(Some(&base_prefix)) + .await + .context("client list nested prefixes failure")? + .into_iter() + .collect::>(); + let remote_only_prefixes = nested_remote_prefixes + .difference(&expected_remote_prefixes) + .collect::>(); + let missing_uploaded_prefixes = expected_remote_prefixes + .difference(&nested_remote_prefixes) + .collect::>(); + assert_eq!( + remote_only_prefixes.len() + missing_uploaded_prefixes.len(), 0, + "remote storage nested prefixes list mismatches with the uploads. Remote only prefixes: {remote_only_prefixes:?}, missing uploaded prefixes: {missing_uploaded_prefixes:?}", + ); + + Ok(()) +} + +/// Tests that Azure client can list all files in a folder, even if the response comes paginated and requirees multiple Azure queries. +/// Uses real Azure and requires [`ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME`] and related Azure cred env vars specified. Test will skip real code and pass if env vars not set. +/// See `Azure_pagination_should_work` for more information. 
+/// +/// First, create a set of Azure objects with keys `random_prefix/folder{j}/blob_{i}.txt` in [`upload_azure_data`] +/// Then performs the following queries: +/// 1. `list_files(None)`. This should return all files `random_prefix/folder{j}/blob_{i}.txt` +/// 2. `list_files("folder1")`. This should return all files `random_prefix/folder1/blob_{i}.txt` +#[test_context(MaybeEnabledAzureWithSimpleTestBlobs)] +#[tokio::test] +async fn azure_list_files_works( + ctx: &mut MaybeEnabledAzureWithSimpleTestBlobs, +) -> anyhow::Result<()> { + let ctx = match ctx { + MaybeEnabledAzureWithSimpleTestBlobs::Enabled(ctx) => ctx, + MaybeEnabledAzureWithSimpleTestBlobs::Disabled => return Ok(()), + MaybeEnabledAzureWithSimpleTestBlobs::UploadsFailed(e, _) => { + anyhow::bail!("Azure init failed: {e:?}") + } + }; + let test_client = Arc::clone(&ctx.enabled.client); + let base_prefix = + RemotePath::new(Utf8Path::new("folder1")).context("common_prefix construction")?; + let root_files = test_client + .list_files(None) + .await + .context("client list root files failure")? + .into_iter() + .collect::>(); + assert_eq!( + root_files, + ctx.remote_blobs.clone(), + "remote storage list_files on root mismatches with the uploads." + ); + let nested_remote_files = test_client + .list_files(Some(&base_prefix)) + .await + .context("client list nested files failure")? + .into_iter() + .collect::>(); + let trim_remote_blobs: HashSet<_> = ctx + .remote_blobs + .iter() + .map(|x| x.get_path()) + .filter(|x| x.starts_with("folder1")) + .map(|x| RemotePath::new(x).expect("must be valid path")) + .collect(); + assert_eq!( + nested_remote_files, trim_remote_blobs, + "remote storage list_files on subdirrectory mismatches with the uploads." + ); + Ok(()) +} + +#[test_context(MaybeEnabledAzure)] +#[tokio::test] +async fn azure_delete_non_exising_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> { + let ctx = match ctx { + MaybeEnabledAzure::Enabled(ctx) => ctx, + MaybeEnabledAzure::Disabled => return Ok(()), + }; + + let path = RemotePath::new(Utf8Path::new( + format!("{}/for_sure_there_is_nothing_there_really", ctx.base_prefix).as_str(), + )) + .with_context(|| "RemotePath conversion")?; + + ctx.client.delete(&path).await.expect("should succeed"); + + Ok(()) +} + +#[test_context(MaybeEnabledAzure)] +#[tokio::test] +async fn azure_delete_objects_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> { + let ctx = match ctx { + MaybeEnabledAzure::Enabled(ctx) => ctx, + MaybeEnabledAzure::Disabled => return Ok(()), + }; + + let path1 = RemotePath::new(Utf8Path::new(format!("{}/path1", ctx.base_prefix).as_str())) + .with_context(|| "RemotePath conversion")?; + + let path2 = RemotePath::new(Utf8Path::new(format!("{}/path2", ctx.base_prefix).as_str())) + .with_context(|| "RemotePath conversion")?; + + let path3 = RemotePath::new(Utf8Path::new(format!("{}/path3", ctx.base_prefix).as_str())) + .with_context(|| "RemotePath conversion")?; + + let data1 = "remote blob data1".as_bytes(); + let data1_len = data1.len(); + let data2 = "remote blob data2".as_bytes(); + let data2_len = data2.len(); + let data3 = "remote blob data3".as_bytes(); + let data3_len = data3.len(); + ctx.client + .upload(std::io::Cursor::new(data1), data1_len, &path1, None) + .await?; + + ctx.client + .upload(std::io::Cursor::new(data2), data2_len, &path2, None) + .await?; + + ctx.client + .upload(std::io::Cursor::new(data3), data3_len, &path3, None) + .await?; + + ctx.client.delete_objects(&[path1, path2]).await?; + + let prefixes = 
ctx.client.list_prefixes(None).await?; + + assert_eq!(prefixes.len(), 1); + + ctx.client.delete_objects(&[path3]).await?; + + Ok(()) +} + +#[test_context(MaybeEnabledAzure)] +#[tokio::test] +async fn azure_upload_download_works(ctx: &mut MaybeEnabledAzure) -> anyhow::Result<()> { + let MaybeEnabledAzure::Enabled(ctx) = ctx else { + return Ok(()); + }; + + let path = RemotePath::new(Utf8Path::new(format!("{}/file", ctx.base_prefix).as_str())) + .with_context(|| "RemotePath conversion")?; + + let data = "remote blob data here".as_bytes(); + let data_len = data.len() as u64; + + ctx.client + .upload(std::io::Cursor::new(data), data.len(), &path, None) + .await?; + + async fn download_and_compare(mut dl: Download) -> anyhow::Result> { + let mut buf = Vec::new(); + tokio::io::copy(&mut dl.download_stream, &mut buf).await?; + Ok(buf) + } + // Normal download request + let dl = ctx.client.download(&path).await?; + let buf = download_and_compare(dl).await?; + assert_eq!(buf, data); + + // Full range (end specified) + let dl = ctx + .client + .download_byte_range(&path, 0, Some(data_len)) + .await?; + let buf = download_and_compare(dl).await?; + assert_eq!(buf, data); + + // partial range (end specified) + let dl = ctx.client.download_byte_range(&path, 4, Some(10)).await?; + let buf = download_and_compare(dl).await?; + assert_eq!(buf, data[4..10]); + + // partial range (end beyond real end) + let dl = ctx + .client + .download_byte_range(&path, 8, Some(data_len * 100)) + .await?; + let buf = download_and_compare(dl).await?; + assert_eq!(buf, data[8..]); + + // Partial range (end unspecified) + let dl = ctx.client.download_byte_range(&path, 4, None).await?; + let buf = download_and_compare(dl).await?; + assert_eq!(buf, data[4..]); + + // Full range (end unspecified) + let dl = ctx.client.download_byte_range(&path, 0, None).await?; + let buf = download_and_compare(dl).await?; + assert_eq!(buf, data); + + Ok(()) +} + +fn ensure_logging_ready() { + LOGGING_DONE.get_or_init(|| { + utils::logging::init( + utils::logging::LogFormat::Test, + utils::logging::TracingErrorLayerEnablement::Disabled, + ) + .expect("logging init failed"); + }); +} + +struct EnabledAzure { + client: Arc, + base_prefix: &'static str, +} + +impl EnabledAzure { + async fn setup(max_keys_in_list_response: Option) -> Self { + let client = create_azure_client(max_keys_in_list_response) + .context("Azure client creation") + .expect("Azure client creation failed"); + + EnabledAzure { + client, + base_prefix: BASE_PREFIX, + } + } +} + +enum MaybeEnabledAzure { + Enabled(EnabledAzure), + Disabled, +} + +#[async_trait::async_trait] +impl AsyncTestContext for MaybeEnabledAzure { + async fn setup() -> Self { + ensure_logging_ready(); + + if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() { + info!( + "`{}` env variable is not set, skipping the test", + ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME + ); + return Self::Disabled; + } + + Self::Enabled(EnabledAzure::setup(None).await) + } +} + +enum MaybeEnabledAzureWithTestBlobs { + Enabled(AzureWithTestBlobs), + Disabled, + UploadsFailed(anyhow::Error, AzureWithTestBlobs), +} + +struct AzureWithTestBlobs { + enabled: EnabledAzure, + remote_prefixes: HashSet, + remote_blobs: HashSet, +} + +#[async_trait::async_trait] +impl AsyncTestContext for MaybeEnabledAzureWithTestBlobs { + async fn setup() -> Self { + ensure_logging_ready(); + if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() { + info!( + "`{}` env variable is not set, skipping the test", + 
ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME + ); + return Self::Disabled; + } + + let max_keys_in_list_response = 10; + let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap()); + + let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await; + + match upload_azure_data(&enabled.client, enabled.base_prefix, upload_tasks_count).await { + ControlFlow::Continue(uploads) => { + info!("Remote objects created successfully"); + + Self::Enabled(AzureWithTestBlobs { + enabled, + remote_prefixes: uploads.prefixes, + remote_blobs: uploads.blobs, + }) + } + ControlFlow::Break(uploads) => Self::UploadsFailed( + anyhow::anyhow!("One or multiple blobs failed to upload to Azure"), + AzureWithTestBlobs { + enabled, + remote_prefixes: uploads.prefixes, + remote_blobs: uploads.blobs, + }, + ), + } + } + + async fn teardown(self) { + match self { + Self::Disabled => {} + Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => { + cleanup(&ctx.enabled.client, ctx.remote_blobs).await; + } + } + } +} + +// NOTE: the setups for the list_prefixes test and the list_files test are very similar +// However, they are not idential. The list_prefixes function is concerned with listing prefixes, +// whereas the list_files function is concerned with listing files. +// See `RemoteStorage::list_files` documentation for more details +enum MaybeEnabledAzureWithSimpleTestBlobs { + Enabled(AzureWithSimpleTestBlobs), + Disabled, + UploadsFailed(anyhow::Error, AzureWithSimpleTestBlobs), +} +struct AzureWithSimpleTestBlobs { + enabled: EnabledAzure, + remote_blobs: HashSet, +} + +#[async_trait::async_trait] +impl AsyncTestContext for MaybeEnabledAzureWithSimpleTestBlobs { + async fn setup() -> Self { + ensure_logging_ready(); + if env::var(ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME).is_err() { + info!( + "`{}` env variable is not set, skipping the test", + ENABLE_REAL_AZURE_REMOTE_STORAGE_ENV_VAR_NAME + ); + return Self::Disabled; + } + + let max_keys_in_list_response = 10; + let upload_tasks_count = 1 + (2 * usize::try_from(max_keys_in_list_response).unwrap()); + + let enabled = EnabledAzure::setup(Some(max_keys_in_list_response)).await; + + match upload_simple_azure_data(&enabled.client, upload_tasks_count).await { + ControlFlow::Continue(uploads) => { + info!("Remote objects created successfully"); + + Self::Enabled(AzureWithSimpleTestBlobs { + enabled, + remote_blobs: uploads, + }) + } + ControlFlow::Break(uploads) => Self::UploadsFailed( + anyhow::anyhow!("One or multiple blobs failed to upload to Azure"), + AzureWithSimpleTestBlobs { + enabled, + remote_blobs: uploads, + }, + ), + } + } + + async fn teardown(self) { + match self { + Self::Disabled => {} + Self::Enabled(ctx) | Self::UploadsFailed(_, ctx) => { + cleanup(&ctx.enabled.client, ctx.remote_blobs).await; + } + } + } +} + +fn create_azure_client( + max_keys_per_list_response: Option, +) -> anyhow::Result> { + use rand::Rng; + + let remote_storage_azure_container = env::var("REMOTE_STORAGE_AZURE_CONTAINER").context( + "`REMOTE_STORAGE_AZURE_CONTAINER` env var is not set, but real Azure tests are enabled", + )?; + let remote_storage_azure_region = env::var("REMOTE_STORAGE_AZURE_REGION").context( + "`REMOTE_STORAGE_AZURE_REGION` env var is not set, but real Azure tests are enabled", + )?; + + // due to how time works, we've had test runners use the same nanos as bucket prefixes. + // millis is just a debugging aid for easier finding the prefix later. 
+ let millis = std::time::SystemTime::now() + .duration_since(UNIX_EPOCH) + .context("random Azure test prefix part calculation")? + .as_millis(); + + // because nanos can be the same for two threads so can millis, add randomness + let random = rand::thread_rng().gen::(); + + let remote_storage_config = RemoteStorageConfig { + max_concurrent_syncs: NonZeroUsize::new(100).unwrap(), + max_sync_errors: NonZeroU32::new(5).unwrap(), + storage: RemoteStorageKind::AzureContainer(AzureConfig { + container_name: remote_storage_azure_container, + container_region: remote_storage_azure_region, + prefix_in_container: Some(format!("test_{millis}_{random:08x}/")), + concurrency_limit: NonZeroUsize::new(100).unwrap(), + max_keys_per_list_response, + }), + }; + Ok(Arc::new( + GenericRemoteStorage::from_config(&remote_storage_config).context("remote storage init")?, + )) +} + +struct Uploads { + prefixes: HashSet, + blobs: HashSet, +} + +async fn upload_azure_data( + client: &Arc, + base_prefix_str: &'static str, + upload_tasks_count: usize, +) -> ControlFlow { + info!("Creating {upload_tasks_count} Azure files"); + let mut upload_tasks = JoinSet::new(); + for i in 1..upload_tasks_count + 1 { + let task_client = Arc::clone(client); + upload_tasks.spawn(async move { + let prefix = format!("{base_prefix_str}/sub_prefix_{i}/"); + let blob_prefix = RemotePath::new(Utf8Path::new(&prefix)) + .with_context(|| format!("{prefix:?} to RemotePath conversion"))?; + let blob_path = blob_prefix.join(Utf8Path::new(&format!("blob_{i}"))); + debug!("Creating remote item {i} at path {blob_path:?}"); + + let data = format!("remote blob data {i}").into_bytes(); + let data_len = data.len(); + task_client + .upload(std::io::Cursor::new(data), data_len, &blob_path, None) + .await?; + + Ok::<_, anyhow::Error>((blob_prefix, blob_path)) + }); + } + + let mut upload_tasks_failed = false; + let mut uploaded_prefixes = HashSet::with_capacity(upload_tasks_count); + let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count); + while let Some(task_run_result) = upload_tasks.join_next().await { + match task_run_result + .context("task join failed") + .and_then(|task_result| task_result.context("upload task failed")) + { + Ok((upload_prefix, upload_path)) => { + uploaded_prefixes.insert(upload_prefix); + uploaded_blobs.insert(upload_path); + } + Err(e) => { + error!("Upload task failed: {e:?}"); + upload_tasks_failed = true; + } + } + } + + let uploads = Uploads { + prefixes: uploaded_prefixes, + blobs: uploaded_blobs, + }; + if upload_tasks_failed { + ControlFlow::Break(uploads) + } else { + ControlFlow::Continue(uploads) + } +} + +async fn cleanup(client: &Arc, objects_to_delete: HashSet) { + info!( + "Removing {} objects from the remote storage during cleanup", + objects_to_delete.len() + ); + let mut delete_tasks = JoinSet::new(); + for object_to_delete in objects_to_delete { + let task_client = Arc::clone(client); + delete_tasks.spawn(async move { + debug!("Deleting remote item at path {object_to_delete:?}"); + task_client + .delete(&object_to_delete) + .await + .with_context(|| format!("{object_to_delete:?} removal")) + }); + } + + while let Some(task_run_result) = delete_tasks.join_next().await { + match task_run_result { + Ok(task_result) => match task_result { + Ok(()) => {} + Err(e) => error!("Delete task failed: {e:?}"), + }, + Err(join_err) => error!("Delete task did not finish correctly: {join_err}"), + } + } +} + +// Uploads files `folder{j}/blob{i}.txt`. See test description for more details. 
+async fn upload_simple_azure_data( + client: &Arc, + upload_tasks_count: usize, +) -> ControlFlow, HashSet> { + info!("Creating {upload_tasks_count} Azure files"); + let mut upload_tasks = JoinSet::new(); + for i in 1..upload_tasks_count + 1 { + let task_client = Arc::clone(client); + upload_tasks.spawn(async move { + let blob_path = PathBuf::from(format!("folder{}/blob_{}.txt", i / 7, i)); + let blob_path = RemotePath::new( + Utf8Path::from_path(blob_path.as_path()).expect("must be valid blob path"), + ) + .with_context(|| format!("{blob_path:?} to RemotePath conversion"))?; + debug!("Creating remote item {i} at path {blob_path:?}"); + + let data = format!("remote blob data {i}").into_bytes(); + let data_len = data.len(); + task_client + .upload(std::io::Cursor::new(data), data_len, &blob_path, None) + .await?; + + Ok::<_, anyhow::Error>(blob_path) + }); + } + + let mut upload_tasks_failed = false; + let mut uploaded_blobs = HashSet::with_capacity(upload_tasks_count); + while let Some(task_run_result) = upload_tasks.join_next().await { + match task_run_result + .context("task join failed") + .and_then(|task_result| task_result.context("upload task failed")) + { + Ok(upload_path) => { + uploaded_blobs.insert(upload_path); + } + Err(e) => { + error!("Upload task failed: {e:?}"); + upload_tasks_failed = true; + } + } + } + + if upload_tasks_failed { + ControlFlow::Break(uploaded_blobs) + } else { + ControlFlow::Continue(uploaded_blobs) + } +} diff --git a/workspace_hack/Cargo.toml b/workspace_hack/Cargo.toml index b2303869f2a4..4b250822e5ef 100644 --- a/workspace_hack/Cargo.toml +++ b/workspace_hack/Cargo.toml @@ -31,13 +31,14 @@ futures = { version = "0.3" } futures-channel = { version = "0.3", features = ["sink"] } futures-core = { version = "0.3" } futures-executor = { version = "0.3" } +futures-io = { version = "0.3" } futures-sink = { version = "0.3" } futures-util = { version = "0.3", features = ["channel", "io", "sink"] } hex = { version = "0.4", features = ["serde"] } hyper = { version = "0.14", features = ["full"] } itertools = { version = "0.10" } libc = { version = "0.2", features = ["extra_traits"] } -log = { version = "0.4", default-features = false, features = ["std"] } +log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] } memchr = { version = "2" } nom = { version = "7" } num-bigint = { version = "0.4" } @@ -47,7 +48,7 @@ prost = { version = "0.11" } rand = { version = "0.8", features = ["small_rng"] } regex = { version = "1" } regex-syntax = { version = "0.7" } -reqwest = { version = "0.11", default-features = false, features = ["blocking", "json", "multipart", "rustls-tls"] } +reqwest = { version = "0.11", default-features = false, features = ["blocking", "default-tls", "json", "multipart", "rustls-tls", "stream"] } ring = { version = "0.16", features = ["std"] } rustls = { version = "0.21", features = ["dangerous_configuration"] } scopeguard = { version = "1" } @@ -55,7 +56,8 @@ serde = { version = "1", features = ["alloc", "derive"] } serde_json = { version = "1", features = ["raw_value"] } smallvec = { version = "1", default-features = false, features = ["write"] } socket2 = { version = "0.4", default-features = false, features = ["all"] } -time = { version = "0.3", features = ["formatting", "macros", "parsing"] } +standback = { version = "0.2", default-features = false, features = ["std"] } +time = { version = "0.3", features = ["macros", "serde-well-known"] } tokio = { version = "1", features = ["fs", "io-std", "io-util", "macros", 
"net", "process", "rt-multi-thread", "signal", "test-util"] } tokio-rustls = { version = "0.24" } tokio-util = { version = "0.7", features = ["codec", "io"] } @@ -75,14 +77,16 @@ cc = { version = "1", default-features = false, features = ["parallel"] } either = { version = "1" } itertools = { version = "0.10" } libc = { version = "0.2", features = ["extra_traits"] } -log = { version = "0.4", default-features = false, features = ["std"] } +log = { version = "0.4", default-features = false, features = ["kv_unstable", "std"] } memchr = { version = "2" } nom = { version = "7" } prost = { version = "0.11" } regex = { version = "1" } regex-syntax = { version = "0.7" } serde = { version = "1", features = ["alloc", "derive"] } +standback = { version = "0.2", default-features = false, features = ["std"] } syn-dff4ba8e3ae991db = { package = "syn", version = "1", features = ["extra-traits", "full", "visit"] } syn-f595c2ba2a3f28df = { package = "syn", version = "2", features = ["extra-traits", "full", "visit", "visit-mut"] } +time-macros = { version = "0.2", default-features = false, features = ["formatting", "parsing", "serde"] } ### END HAKARI SECTION