From e1b79e1ae40a9b5266110e694c9eadeae3cc19e2 Mon Sep 17 00:00:00 2001 From: Patrick Elsen Date: Mon, 20 Nov 2023 14:35:43 +0100 Subject: [PATCH] Adds prototype registry storage (#168) - Adds storage implementation - Adds CI workflow for registry - Addresses code review --- .github/workflows/{ci.yml => cli.yml} | 2 +- .github/workflows/registry.yml | 72 +++++++ Cargo.lock | 12 +- registry/Cargo.toml | 34 +++- registry/Proto.toml | 2 +- registry/deny.toml | 13 ++ registry/docker-compose.yml | 9 + .../migrations/20230828111733_users.up.sql | 49 +++-- .../migrations/20230828111844_packages.up.sql | 12 ++ registry/proto/registry.proto | 13 ++ registry/proto/resolver.proto | 22 --- registry/proto/version.proto | 15 ++ registry/src/{storage/mod.rs => api.rs} | 13 ++ registry/src/api/grpc.rs | 47 +++++ registry/src/context.rs | 27 +++ registry/src/lib.rs | 32 +++- registry/src/main.rs | 9 +- registry/src/options.rs | 88 ++++++++- registry/src/proto.rs | 6 + registry/src/schema.rs | 13 -- registry/src/storage.rs | 107 +++++++++++ registry/src/storage/filesystem.rs | 140 ++++++++++++++ registry/src/storage/s3.rs | 100 ++++++++++ registry/src/{db.rs => types.rs} | 23 ++- registry/tests/storage/filesystem.rs | 88 +++++++++ registry/tests/storage/main.rs | 159 ++++++++++++++++ registry/tests/storage/s3.rs | 178 ++++++++++++++++++ tests/cmd/package/out/lib-0.0.1.tgz | Bin 251 -> 128 bytes 28 files changed, 1203 insertions(+), 82 deletions(-) rename .github/workflows/{ci.yml => cli.yml} (99%) create mode 100644 .github/workflows/registry.yml create mode 100644 registry/deny.toml delete mode 100644 registry/proto/resolver.proto create mode 100644 registry/proto/version.proto rename registry/src/{storage/mod.rs => api.rs} (78%) create mode 100644 registry/src/api/grpc.rs delete mode 100644 registry/src/schema.rs create mode 100644 registry/src/storage.rs create mode 100644 registry/src/storage/filesystem.rs create mode 100644 registry/src/storage/s3.rs rename registry/src/{db.rs 
=> types.rs} (53%) create mode 100644 registry/tests/storage/filesystem.rs create mode 100644 registry/tests/storage/main.rs create mode 100644 registry/tests/storage/s3.rs diff --git a/.github/workflows/ci.yml b/.github/workflows/cli.yml similarity index 99% rename from .github/workflows/ci.yml rename to .github/workflows/cli.yml index 73601e9e..49de2349 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/cli.yml @@ -1,4 +1,4 @@ -name: CI +name: Buffrs CLI on: push: diff --git a/.github/workflows/registry.yml b/.github/workflows/registry.yml new file mode 100644 index 00000000..18232e01 --- /dev/null +++ b/.github/workflows/registry.yml @@ -0,0 +1,72 @@ +name: Buffrs Registry + +on: + push: + branches: ["main"] + pull_request: + +env: + MINIMUM_LINE_COVERAGE_PERCENT: 5 + +jobs: + fmt: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - run: rustup update && rustup component add rustfmt + - run: cd registry && cargo fmt --check --all + + clippy: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - run: rustup update && rustup component add clippy + - uses: Swatinem/rust-cache@v2 + - run: cd registry && cargo clippy --all-targets --workspace -- -D warnings -D clippy::all + + test: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + lfs: "true" + - uses: isbang/compose-action@v1.5.1 + with: + compose-file: "./registry/docker-compose.yml" + - run: rustup update + - uses: Swatinem/rust-cache@v2 + - run: cd registry && cargo test --workspace + env: + RUST_BACKTRACE: 1 + + deny: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - run: rustup update + - uses: Swatinem/rust-cache@v2 + - run: cargo install cargo-deny || true + - run: cd registry && cargo deny --workspace check + + coverage: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + with: + lfs: 'true' + - uses: isbang/compose-action@v1.5.1 + with: + compose-file: "./registry/docker-compose.yml" + - run: rustup update + - 
run: rustup component add llvm-tools-preview + - uses: Swatinem/rust-cache@v2 + - run: cargo install cargo-llvm-cov || true + - run: cd registry && cargo llvm-cov --workspace --fail-under-lines "$MINIMUM_LINE_COVERAGE_PERCENT" + + typos: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: Swatinem/rust-cache@v2 + - run: cargo install typos-cli || true + - run: cd registry && typos diff --git a/Cargo.lock b/Cargo.lock index 942b7c38..c987f172 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -235,7 +235,7 @@ dependencies = [ "tar", "thiserror", "tokio", - "toml 0.8.5", + "toml 0.8.6", "tonic-build", "tracing", "tracing-subscriber", @@ -2050,14 +2050,14 @@ dependencies = [ [[package]] name = "toml" -version = "0.8.5" +version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3efaf127c78d5339cc547cce4e4d973bd5e4f56e949a06d091c082ebeef2f800" +checksum = "8ff9e3abce27ee2c9a37f9ad37238c1bdd4e789c84ba37df76aa4d528f5072cc" dependencies = [ "serde", "serde_spanned", "toml_datetime", - "toml_edit 0.20.5", + "toml_edit 0.20.7", ] [[package]] @@ -2084,9 +2084,9 @@ dependencies = [ [[package]] name = "toml_edit" -version = "0.20.5" +version = "0.20.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "782bf6c2ddf761c1e7855405e8975472acf76f7f36d0d4328bd3b7a2fae12a85" +checksum = "70f427fce4d84c72b5b732388bf4a9f4531b53f74e2887e3ecb2481f68f66d81" dependencies = [ "indexmap 2.0.2", "serde", diff --git a/registry/Cargo.toml b/registry/Cargo.toml index 46cea14a..539adaf5 100644 --- a/registry/Cargo.toml +++ b/registry/Cargo.toml @@ -4,18 +4,40 @@ version = "0.1.0" edition = "2021" description = "Registry for buffrs, a modern protocol buffer package manager" license = "Apache-2.0" +default-run = "buffrs-registry" [dependencies] -buffrs = { path = "../", version = "0.6.4" } -prost = "0.12.1" -tonic = "0.10.2" +async-trait = "0.1.74" +aws-config = { version = "0.56.1", optional = true } +aws-sdk-s3 = { 
version = "0.34.0", optional = true } +axum_tonic = "0.1.0" +buffrs = { path = "../", version = "0.7.1" } +bytes = "1.5.0" +chrono = "0.4.31" clap = { version = "4.3", features = ["cargo", "derive", "env"] } +eyre = "0.6.8" +miette = "5.10.0" +prost = "0.12.1" +semver = "1.0.20" +sqlx = { version = "0.7.2", features = ["runtime-tokio", "postgres", "macros", "migrate"] } +ssh-key = { version = "0.6.2", features = ["ed25519"] } +thiserror = "1.0.49" tokio = { version = "1.33.0", features = ["full"] } +tonic = "0.10.2" tracing = "0.1.37" tracing-subscriber = "0.3.17" -sqlx = { version = "0.7.2", features = ["runtime-tokio", "postgres", "macros", "migrate"] } -eyre = "0.6.8" url = "2.4.1" [build-dependencies] -buffrs = { path = "../", version = "0.7.0" } +buffrs = { path = "../", version = "0.7.1" } + +[features] +default = ["storage-s3"] +storage-s3 = ["dep:aws-config", "dep:aws-sdk-s3"] + +[dev-dependencies] +aws-credential-types = { version = "0.56.1", features = ["hardcoded-credentials"] } +proptest = "1.3.1" +rand = "0.8.5" +tempfile = "3.8.1" +test-strategy = "0.3.1" diff --git a/registry/Proto.toml b/registry/Proto.toml index d25a005e..63267b49 100644 --- a/registry/Proto.toml +++ b/registry/Proto.toml @@ -1,4 +1,4 @@ [package] type = "api" -name = "buffrs-registry" +name = "buffrs" version = "0.0.1" diff --git a/registry/deny.toml b/registry/deny.toml new file mode 100644 index 00000000..66b8e4f2 --- /dev/null +++ b/registry/deny.toml @@ -0,0 +1,13 @@ +[licenses] +unlicensed = "deny" +allow = ["Apache-2.0", "BSD-3-Clause", "MIT", "Unicode-DFS-2016", "ISC"] +default = "deny" + +[[licenses.clarify]] +name = "ring" +expression = "ISC" +license-files = [{ path = "LICENSE", hash = 0xbd0eed23 }] + +[bans] +multiple-versions = "warn" +wildcards = "deny" diff --git a/registry/docker-compose.yml b/registry/docker-compose.yml index 9317a924..dd035b8e 100644 --- a/registry/docker-compose.yml +++ b/registry/docker-compose.yml @@ -7,3 +7,12 @@ services: POSTGRES_USER: 
buffrs ports: - 127.0.0.1:5432:5432 + storage: + image: minio/minio + command: server /data + environment: + MINIO_ROOT_USER: buffrs + MINIO_ROOT_PASSWORD: password + MINIO_DOMAIN: localhost + ports: + - 127.0.0.1:9000:9000 diff --git a/registry/migrations/20230828111733_users.up.sql b/registry/migrations/20230828111733_users.up.sql index db1e4746..08340c61 100644 --- a/registry/migrations/20230828111733_users.up.sql +++ b/registry/migrations/20230828111733_users.up.sql @@ -1,18 +1,37 @@ -CREATE TABLE users ( - id SERIAL PRIMARY KEY, - -- metadata - name TEXT, - email TEXT, - avatar TEXT, - -- static user token here - token TEXT NOT NULL UNIQUE, - -- timestamps - created_at TIMESTAMPTZ NOT NULL, - updated_at TIMESTAMPTZ NOT NULL, - deleted_at TIMESTAMPTZ +-- table of users. +CREATE TABLE "users" ( + "id" SERIAL PRIMARY KEY, + "handle" TEXT NOT NULL UNIQUE, + "created_at" TIMESTAMPTZ NOT NULL DEFAULT (now()), + "updated_at" TIMESTAMPTZ NOT NULL DEFAULT (now()), + -- when users are deleted, we keep the row, but set the deleted_at field. 
+ "deleted_at" TIMESTAMPTZ ); -CREATE TABLE user_tokens ( - user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, - token TEXT NOT NULL UNIQUE +CREATE TABLE "user_tokens" ( + "user" INTEGER NOT NULL REFERENCES users(id) ON DELETE CASCADE, + "prefix" TEXT NOT NULL UNIQUE, + "hash" TEXT NOT NULL UNIQUE, + "allow_publish" BOOLEAN NOT NULL DEFAULT (false), + "allow_update" BOOLEAN NOT NULL DEFAULT (false), + "allow_yank" BOOLEAN NOT NULL DEFAULT (false), + "created_at" TIMESTAMPTZ NOT NULL DEFAULT (now()), + "expires_at" TIMESTAMPTZ NOT NULL, + "deleted_at" TIMESTAMPTZ ); + +-- view showing only active users +CREATE VIEW "users_active" AS + SELECT * + FROM users + WHERE deleted_at IS NULL; + +-- view showing only active tokens +CREATE VIEW "user_tokens_active" AS + SELECT + tokens.*, + users.handle + FROM users_active users + JOIN user_tokens tokens on tokens.user = users.id + WHERE tokens.expires_at > now() + AND tokens.deleted_at IS NULL; diff --git a/registry/migrations/20230828111844_packages.up.sql b/registry/migrations/20230828111844_packages.up.sql index 05bdb6da..7c146deb 100644 --- a/registry/migrations/20230828111844_packages.up.sql +++ b/registry/migrations/20230828111844_packages.up.sql @@ -20,3 +20,15 @@ CREATE TABLE package_owners ( created_at TIMESTAMPTZ NOT NULL, deleted_at TIMESTAMPTZ ); + +CREATE TABLE package_invites ( + id SERIAL PRIMARY KEY, + token TEXT NOT NULL UNIQUE, + + user_id INTEGER NOT NULL REFERENCES users(id) ON DELETE RESTRICT, + package_id INTEGER NOT NULL REFERENCES packages(id) ON DELETE RESTRICT, + created_by INTEGER NOT NULL REFERENCES users(id) ON DELETE RESTRICT, + + created_at TIMESTAMPTZ NOT NULL, + expires_at TIMESTAMPTZ NOT NULL +); diff --git a/registry/proto/registry.proto b/registry/proto/registry.proto index f60400b2..f8448493 100644 --- a/registry/proto/registry.proto +++ b/registry/proto/registry.proto @@ -14,6 +14,10 @@ service Registry { // Download a package rpc Download(DownloadRequest) returns 
(DownloadResponse); + + // Get versions for a package + rpc Versions(VersionsRequest) + returns (VersionsResponse); } message PublishRequest { @@ -29,3 +33,12 @@ message DownloadRequest { message DownloadResponse { buffrs.package.Compressed package = 1; } + +message VersionsRequest { + string name = 1; + string requirement = 2; +} + +message VersionsResponse { + repeated string version = 1; +} diff --git a/registry/proto/resolver.proto b/registry/proto/resolver.proto deleted file mode 100644 index f1939044..00000000 --- a/registry/proto/resolver.proto +++ /dev/null @@ -1,22 +0,0 @@ -// (c) Copyright 2023 Helsing GmbH. All rights reserved. - -syntax = "proto3"; - -import "package.proto"; -import "dependency.proto"; - -package buffrs.resolver; - -service Resolver { - // Resolve a set of dependencies to packages - rpc Resolve(ResolveRequest) - returns (ResolveResponse); -} - -message ResolveRequest { - repeated buffrs.dependency.Dependency dependencies = 1; -} - -message ResolveResponse { - repeated buffrs.package.Package packages = 1; -} diff --git a/registry/proto/version.proto b/registry/proto/version.proto new file mode 100644 index 00000000..a21ee66f --- /dev/null +++ b/registry/proto/version.proto @@ -0,0 +1,15 @@ + +syntax = "proto3"; + +package buffrs.version; + +message VersionCore { + uint32 major = 1; + uint32 minor = 2; + uint32 patch = 3; +} + +message SemanticVersion { + VersionCore version = 1; + optional string prerelease = 2; +} diff --git a/registry/src/storage/mod.rs b/registry/src/api.rs similarity index 78% rename from registry/src/storage/mod.rs rename to registry/src/api.rs index 146bee7f..cb9ba743 100644 --- a/registry/src/storage/mod.rs +++ b/registry/src/api.rs @@ -11,3 +11,16 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +//! 
# Buffrs Registry API + +use crate::context::Context; + +mod grpc; + +impl Context { + /// Launch API + pub async fn launch_api(&self) { + todo!() + } +} diff --git a/registry/src/api/grpc.rs b/registry/src/api/grpc.rs new file mode 100644 index 00000000..262d13f5 --- /dev/null +++ b/registry/src/api/grpc.rs @@ -0,0 +1,47 @@ +// Copyright 2023 Helsing GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::{ + context::Context, + proto::buffrs::registry::{ + registry_server::Registry, DownloadRequest, DownloadResponse, PublishRequest, + PublishResponse, VersionsRequest, VersionsResponse, + }, +}; +use async_trait::async_trait; +use tonic::{Request, Response, Status}; + +#[async_trait] +impl Registry for Context { + async fn publish( + &self, + _request: Request, + ) -> Result, Status> { + todo!() + } + + async fn download( + &self, + _request: Request, + ) -> Result, Status> { + todo!() + } + + async fn versions( + &self, + _request: Request, + ) -> Result, Status> { + todo!() + } +} diff --git a/registry/src/context.rs b/registry/src/context.rs index 146bee7f..16fcca38 100644 --- a/registry/src/context.rs +++ b/registry/src/context.rs @@ -11,3 +11,30 @@ // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. + +//! Buffrs Registry Context +//! +//! This type holds the necessary context for a buffrs registry. 
It is used to implement the +//! various APIs that this registry provides. + +use crate::storage::AnyStorage; + +/// Context +/// +/// This contains all context needed for a buffrs registry. +#[derive(Clone, Debug)] +pub struct Context { + storage: AnyStorage, +} + +impl Context { + /// Create a new context from a metadata instance and a storage instance. + pub fn new(storage: AnyStorage) -> Self { + Self { storage } + } + + /// Get reference to the storage instance. + pub fn storage(&self) -> &AnyStorage { + &self.storage + } +} diff --git a/registry/src/lib.rs b/registry/src/lib.rs index 40159508..d3746ea1 100644 --- a/registry/src/lib.rs +++ b/registry/src/lib.rs @@ -12,9 +12,35 @@ // See the License for the specific language governing permissions and // limitations under the License. +//! # Buffrs Registry +//! +//! This crate implements a registry for buffrs. The registry is responsible for publishing and +//! making available packages that are published. As such, it also performs authentication and +//! validation on uploaded packages. +//! +//! ## Dependencies +//! +//! It requires two stateful services: one to store metadata, which is expressed by the +//! [`Database`](db::Database) trait. Typically, this would be implemented using a Postgres +//! database, but the code is written in a way that other services can be plugged in instead. +//! +//! The other dependency it has is on a way to store package sources, which is expressed using the +//! [`Storage`](storage::Storage) traits. Typically, this is achieved using an S3 bucket, but other +//! storage mechanisms can be implemented. +//! +//! ## APIs +//! +//! Generally, talking to this registry is possible using a gRPC API, which is defined by the +//! protocol buffer definitions available in this repository and exported as the [`proto`] module. +//! +//! Additionally however, this registry also has some simple REST API endpoints which can be used +//! by simpler clients to access packages. 
It is not possible to publish packages using these +//! endpoints however. + +#![warn(missing_docs)] + +pub mod api; pub mod context; -pub mod db; -pub mod options; pub mod proto; -pub mod schema; pub mod storage; +pub mod types; diff --git a/registry/src/main.rs b/registry/src/main.rs index 1519c5c3..8c76ccb4 100644 --- a/registry/src/main.rs +++ b/registry/src/main.rs @@ -12,13 +12,16 @@ // See the License for the specific language governing permissions and // limitations under the License. -use buffrs_registry::{db::connect, options::Options}; use clap::Parser; +mod options; +use options::Options; + #[tokio::main] async fn main() -> eyre::Result<()> { - let config = Options::parse(); + let options = Options::parse(); tracing_subscriber::fmt::init(); - let _db = connect(config.database.as_str()).await.unwrap(); + let context = options.build().await?; + context.launch_api().await; Ok(()) } diff --git a/registry/src/options.rs b/registry/src/options.rs index 700abc47..cf447027 100644 --- a/registry/src/options.rs +++ b/registry/src/options.rs @@ -12,9 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. -use clap::Parser; -use std::net::SocketAddr; -use url::Url; +use buffrs_registry::{context::Context, storage::*}; +use clap::{Parser, ValueEnum}; +use eyre::Result; +use std::{net::SocketAddr, path::PathBuf, sync::Arc}; #[derive(Parser, Clone, Debug)] pub struct Options { @@ -22,8 +23,81 @@ pub struct Options { #[clap(long, short, env, default_value = "0.0.0.0:4367")] pub listen: SocketAddr, - /// URL of Postgres database to connect to. - #[clap(long, short, env)] - #[cfg_attr(dev, clap(default_value = "postgres://buffrs:buffrs@localhost"))] - pub database: Url, + /// Storage related options. 
+ #[clap(flatten)] + pub storage: StorageOptions, +} + +impl Options { + pub async fn build(&self) -> Result { + let storage = self.storage.build().await?; + Ok(Context::new(storage)) + } +} + +#[cfg(feature = "storage-s3")] +const DEFAULT_STORAGE: &str = "s3"; +#[cfg(not(feature = "storage-s3"))] +const DEFAULT_STORAGE: &'static str = "filesystem"; + +/// Options for storage. +#[derive(Parser, Clone, Debug)] +pub struct StorageOptions { + /// Which storage backend to use. + #[clap(long, default_value = DEFAULT_STORAGE)] + pub storage: StorageKind, + + #[clap(flatten)] + pub filesystem: FilesystemStorageOptions, + + #[clap(flatten)] + #[cfg(feature = "storage-s3")] + pub s3: S3StorageOptions, +} + +#[derive(Parser, Clone, Debug)] +pub struct FilesystemStorageOptions { + /// Path to store packages at. + #[clap(long, required_if_eq("storage", "filesystem"))] + pub filesystem_storage: Option, +} + +#[derive(Parser, Clone, Debug)] +pub struct S3StorageOptions {} + +/// Kind of storage to use. +#[derive(ValueEnum, Clone, Debug)] +pub enum StorageKind { + Filesystem, + #[cfg(feature = "storage-s3")] + S3, +} + +impl FilesystemStorageOptions { + async fn build(&self) -> Result { + Ok(Filesystem::new(self.filesystem_storage.clone().unwrap())) + } +} + +#[cfg(feature = "storage-s3")] +impl S3StorageOptions { + async fn build(&self) -> Result { + todo!() + } +} + +impl StorageOptions { + fn wrap(&self, storage: S) -> Arc { + Arc::new(storage) + } + + pub async fn build(&self) -> Result> { + let storage = match self.storage { + StorageKind::Filesystem => self.wrap(self.filesystem.build().await?), + #[cfg(feature = "storage-s3")] + StorageKind::S3 => self.wrap(self.s3.build().await?), + }; + + Ok(storage) + } } diff --git a/registry/src/proto.rs b/registry/src/proto.rs index 0e762b75..b878f65d 100644 --- a/registry/src/proto.rs +++ b/registry/src/proto.rs @@ -12,4 +12,10 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+//! Autogenerated protocol buffer types and services. +//! +//! The types in this module are autogenerated by buffrs. + +#![allow(missing_docs)] + ::buffrs::include!(); diff --git a/registry/src/schema.rs b/registry/src/schema.rs deleted file mode 100644 index 146bee7f..00000000 --- a/registry/src/schema.rs +++ /dev/null @@ -1,13 +0,0 @@ -// Copyright 2023 Helsing GmbH -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. diff --git a/registry/src/storage.rs b/registry/src/storage.rs new file mode 100644 index 00000000..7a80b4fb --- /dev/null +++ b/registry/src/storage.rs @@ -0,0 +1,107 @@ +// Copyright 2023 Helsing GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! # Package Storage trait and implementations. +//! +//! This module contains the [`Storage`] trait, which is used by the buffrs registry to store +//! packages in arbitrary places. Depending on the enabled features, this contains implementations +//! 
and layers that can be used by the registry. +//! +//! Currently, all package fetches will go through the registry. In the future, it might be +//! implemented to allow the package registry to redirect directly to a presigned URL for a storage +//! endpoint, such as a S3 bucket. + +use crate::types::PackageVersion; +use bytes::Bytes; +use std::{fmt, sync::Arc}; + +mod filesystem; +#[cfg(feature = "storage-s3")] +mod s3; + +pub use filesystem::Filesystem; +#[cfg(feature = "storage-s3")] +pub use s3::S3; + +/// Generic, shared error type. +/// +/// As the underlying error type used by the implementation is not known, this error type is used +/// to allow errors to be cached when appropriate. Using an [`Arc`] here allows the error to be +/// cloned and stored, while retaining as much information as possible. +pub type SharedError = Arc; + +/// Error putting a package into storage. +/// +/// This classifies the errors produced downstream according to their semantics. The only error we +/// really care about at the moment is the `PackageMissing` case, because that one has different +/// caching semantics than other errors. +#[derive(thiserror::Error, Debug, Clone)] +pub enum StorageError { + /// Package missing + #[error("package missing")] + PackageMissing(#[source] SharedError), + + /// Unknown error + #[error(transparent)] + Other(#[from] SharedError), +} + +/// Arbitrary storage instance. +pub type AnyStorage = Arc; + +/// # Storage for package sources +/// +/// This trait specifies a generic storage implementation for package sources. These will store the +/// compressed tarball containing the package sources. +/// +/// ## Error handling +/// +/// In general, errors are always passed through and never hidden. That is why there is a +/// [`PackageMissing`][StorageError::PackageMissing] error that is returned rather than the call +/// simply returning an [`Result>`]. 
This allows downstream users to inspect the +/// errors themselves if needed, and allows for more descriptive error logs. +/// +/// The underlying errors are stored as a [`SharedError`], which uses an [`Arc`] to allow errors to +/// be cloned. This allows for caching errors, where it makes sense. +/// +/// ## Put semantics +/// +/// The semantics of the [`package_put`](Storage::package_put) call are overwrite (rather than +/// error on existing package). That might be surprising, since packages are considered immutable +/// once published. But the justification for this is that we have a distributed system here, in +/// which the database is the leader. +/// +/// In case of an error during package publishing, the transaction that adds the package to the +/// database is not committed, which could result in a package being in storage but not in the +/// database. In that case, the user will retry publishing which should succeed. +/// +/// Having dirty data in a caching layer cannot happen because it could only be in the cache if it +/// ends up in the database. +/// +/// The database is responsible for avoiding concurrent package publishes that would result in race +/// conditions. Additionally, checksums and signatures are used to verify package sources. +#[async_trait::async_trait] +pub trait Storage: Send + Sync + fmt::Debug { + /// Write package to storage. + /// + /// In general, packages are immutable once stored. However, the semantics of this call are + /// those of overwrite. Refer to the documentation of the trait for more context. + async fn package_put(&self, version: &PackageVersion, data: &[u8]) -> Result<(), StorageError>; + + /// Get package from storage. + /// + /// If the package does not exist, this will return a [`StorageError::PackageMissing`]. This + /// call should only succeed once the package has been successfully written. 
+    async fn package_get(&self, version: &PackageVersion) -> Result<Bytes, StorageError>;
+/// +/// # Examples +/// +/// Create a new filesystem storage instance: +/// +/// ```rust +/// # use buffrs_registry::{types::PackageVersion, storage::Filesystem}; +/// # use std::path::Path; +/// let filesystem = Filesystem::new("/path/to/storage"); +/// +/// let package = PackageVersion { +/// package: "mypackage".parse().unwrap(), +/// version: "0.1.5".parse().unwrap(), +/// }; +/// +/// assert_eq!(filesystem.package_path(&package), Path::new("/path/to/storage/mypackage_0.1.5.tar.gz")); +/// ``` +#[derive(Clone, Debug)] +pub struct Filesystem = PathBuf> { + path: P, +} + +/// Error interacting with the filesystem. +/// +/// This error adds some context to the underlying [`std::io::Error`], such as the path that was +/// being written to. +#[derive(thiserror::Error, Debug)] +#[error("error writing to {path:?}")] +pub struct FilesystemError { + /// Path that was being written to or read from. + path: PathBuf, + /// Error that occurred. + #[source] + error: std::io::Error, +} + +impl> Filesystem

{ + /// Create new Filesystem storage instance. + pub fn new(path: P) -> Self { + Self { path } + } + + /// Get the base path of this filesystem storage instance. + pub fn path(&self) -> &Path { + self.path.as_ref() + } + + /// Get the full path of a package version. + /// + /// Uses [`PackageVersion::file_name()`] to determine the file name. + pub fn package_path(&self, version: &PackageVersion) -> PathBuf { + self.path().join(version.file_name()) + } + + async fn do_package_put( + &self, + version: &PackageVersion, + data: &[u8], + ) -> Result<(), FilesystemError> { + let path = self.package_path(version); + let mut file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&path) + .await + .map_err(|error| FilesystemError { + path: path.clone(), + error, + })?; + file.write_all(data) + .await + .map_err(|error| FilesystemError { + path: path.clone(), + error, + })?; + file.flush() + .await + .map_err(|error| FilesystemError { path, error })?; + Ok(()) + } + + async fn do_package_get(&self, version: &PackageVersion) -> Result { + let path = self.package_path(version); + tokio::fs::read(&path) + .await + .map(Into::into) + .map_err(|error| FilesystemError { path, error }) + } +} + +#[async_trait::async_trait] +impl + Send + Sync + Debug> Storage for Filesystem

{ + async fn package_put(&self, version: &PackageVersion, data: &[u8]) -> Result<(), StorageError> { + self.do_package_put(version, data) + .await + .map_err(|error| StorageError::Other(Arc::new(error))) + } + + async fn package_get(&self, version: &PackageVersion) -> Result { + let result = self.do_package_get(version).await; + match result { + Ok(bytes) => Ok(bytes), + Err(error) if error.error.kind() == ErrorKind::NotFound => { + Err(StorageError::PackageMissing(Arc::new(error))) + } + Err(error) => Err(StorageError::Other(Arc::new(error))), + } + } +} diff --git a/registry/src/storage/s3.rs b/registry/src/storage/s3.rs new file mode 100644 index 00000000..f807068c --- /dev/null +++ b/registry/src/storage/s3.rs @@ -0,0 +1,100 @@ +// Copyright 2023 Helsing GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use crate::storage::{Storage, StorageError}; +use crate::types::PackageVersion; +use aws_sdk_s3::{ + error::SdkError, + operation::get_object::GetObjectError, + primitives::{ByteStream, SdkBody}, + Client, +}; +use bytes::Bytes; +use std::sync::Arc; + +/// # S3-backed package storage. +/// +/// This storage implementation keeps packages in an S3 bucket using the `aws_sdk` crate. The +/// packages are named similar to how they are named in the filesystem. +/// +/// For example, a package named `mypackage` with version `0.1.5` would be stored as +/// `mypackage_0.1.5.tar.gz` in the bucket. 
+#[derive(Clone, Debug)] +pub struct S3 { + client: Client, + bucket: String, +} + +impl S3 { + /// Create new instance given an S3 [`Client`] and a bucket name. + pub fn new(client: Client, bucket: String) -> Self { + Self { client, bucket } + } + + /// Get reference to the S3 client being used. + pub fn client(&self) -> &Client { + &self.client + } + + /// Get reference to the name of the bucket that this instance writes to. + pub fn bucket(&self) -> &str { + &self.bucket + } +} + +#[async_trait::async_trait] +impl Storage for S3 { + async fn package_put(&self, version: &PackageVersion, data: &[u8]) -> Result<(), StorageError> { + self.client + .put_object() + .bucket(&self.bucket) + .key(version.file_name()) + .body(ByteStream::new(SdkBody::from(data))) + .send() + .await + .map(|_| ()) + .map_err(|error| StorageError::Other(Arc::new(error))) + } + + async fn package_get(&self, version: &PackageVersion) -> Result { + let response = self + .client + .get_object() + .bucket(&self.bucket) + .key(version.file_name()) + .send() + .await; + + // determine if this is a no such key error and translate into package missing + if let Err(SdkError::ServiceError(error)) = &response { + if let GetObjectError::NoSuchKey(error) = error.err() { + return Err(StorageError::PackageMissing(Arc::new(error.clone()))); + } + } + + // return other errors as-is + let response = match response { + Ok(response) => response, + Err(error) => return Err(StorageError::Other(Arc::new(error))), + }; + + // collect response + response + .body + .collect() + .await + .map_err(|error| StorageError::Other(Arc::new(error))) + .map(|data| data.into_bytes()) + } +} diff --git a/registry/src/db.rs b/registry/src/types.rs similarity index 53% rename from registry/src/db.rs rename to registry/src/types.rs index 7f3cd6f6..1a3ac58c 100644 --- a/registry/src/db.rs +++ b/registry/src/types.rs @@ -12,12 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the 
License. -use sqlx::PgPool; +//! # Shared type definitions -pub async fn connect(string: &str) -> eyre::Result { - let pool = PgPool::connect(string).await?; +use buffrs::package::PackageName; +use semver::Version; - sqlx::migrate!().run(&pool).await?; +/// Represents a Buffrs package version +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct PackageVersion { + /// Package name + pub package: PackageName, - Ok(pool) + /// Package version + pub version: Version, +} + +impl PackageVersion { + /// Determine the file name of a package. + pub fn file_name(&self) -> String { + let Self { package, version } = &self; + format!("{package}_{version}.tar.gz") + } } diff --git a/registry/tests/storage/filesystem.rs b/registry/tests/storage/filesystem.rs new file mode 100644 index 00000000..d8d532e6 --- /dev/null +++ b/registry/tests/storage/filesystem.rs @@ -0,0 +1,88 @@ +//! Unit tests for [`Filesystem`]. +//! +//! These test verify that the filesystem storage layer is implemented correctly. Every single +//! test uses a new temporary filesystem location created by [`temp_filesystem`] to ensure that +//! tests do not interfere with each other. Every single test performs some setup using manual +//! filesystem interactions, run at most one method under test, and verify the outputs and the +//! filesystem side effects. + +use super::*; + +use std::error::Error; +use tempfile::TempDir; + +/// Create a temporary filesystem storage. 
+pub async fn temp_filesystem() -> (Filesystem, Cleanup) { + let dir = TempDir::new().unwrap(); + let storage = Filesystem::new(dir.path().to_path_buf()); + let cleanup = async move { + dir.close().unwrap(); + }; + (storage, Box::pin(cleanup)) +} + +#[proptest(async = "tokio")] +async fn can_write_package( + #[strategy(package_version())] version: PackageVersion, + contents: Vec, +) { + with(temp_filesystem, |storage| async move { + storage.package_put(&version, &contents).await.unwrap(); + + let path = storage.path().join(version.file_name()); + let found = tokio::fs::read(&path).await.unwrap(); + assert_eq!(found, contents); + }) + .await; +} + +#[proptest(async = "tokio")] +async fn can_write_package_existing( + #[strategy(package_version())] version: PackageVersion, + previous: Vec, + contents: Vec, +) { + with(temp_filesystem, |storage| async move { + let path = storage.path().join(version.file_name()); + tokio::fs::write(&path, &previous).await.unwrap(); + + storage.package_put(&version, &contents).await.unwrap(); + + let found = tokio::fs::read(&path).await.unwrap(); + assert_eq!(found, contents); + }) + .await; +} + +#[proptest(async = "tokio")] +async fn cannot_read_package_missing(#[strategy(package_version())] version: PackageVersion) { + with(temp_filesystem, |storage| async move { + let path = storage.path().join(version.file_name()); + + let error = storage.package_get(&version).await.err().unwrap(); + + assert!(matches!(error, StorageError::PackageMissing(_))); + assert_eq!(error.to_string(), format!("package missing")); + assert_eq!( + error.source().unwrap().to_string(), + format!("error writing to {path:?}") + ); + }) + .await; +} + +#[proptest(async = "tokio")] +async fn can_read_package( + #[strategy(package_version())] version: PackageVersion, + contents: Vec, +) { + with(temp_filesystem, |storage| async move { + let path = storage.path().join(version.file_name()); + tokio::fs::write(&path, &contents).await.unwrap(); + + let found = 
storage.package_get(&version).await.unwrap(); + + assert_eq!(&found[..], &contents); + }) + .await; +} diff --git a/registry/tests/storage/main.rs b/registry/tests/storage/main.rs new file mode 100644 index 00000000..90c4725b --- /dev/null +++ b/registry/tests/storage/main.rs @@ -0,0 +1,159 @@ +// Copyright 2023 Helsing GmbH +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use buffrs_registry::{storage::*, types::PackageVersion}; +use proptest::prop_compose; +use std::{future::Future, pin::Pin, sync::Arc}; +use test_strategy::{proptest, Arbitrary}; + +mod filesystem; +#[cfg(feature = "storage-s3")] +mod s3; + +/// Generic future used for cleanup tasks. +type Cleanup = Pin>>; + +/// Run a closure with a temporary instance and run cleanup afterwards. +async fn with< + S: Storage, + O1: Future, + F1: Fn() -> O1, + O2: Future, + F2: FnOnce(S) -> O2, +>( + function: F1, + closure: F2, +) { + let (storage, cleanup) = function().await; + closure(storage).await; + cleanup.await; +} + +/// Create temporary instances of a storage backend. If the cache `feature` is enabled, this will +/// create an additional cached instance. +async fn create_temp_instances< + S: Storage + 'static, + O: Future, + F: Fn() -> O, +>( + storages: &mut Vec, + cleanups: &mut Vec, + function: F, +) { + // create instance + let (storage, cleanup) = function().await; + storages.push(Arc::new(storage)); + cleanups.push(cleanup); +} + +/// Create temporary instances of all storage providers. 
+async fn temp_instances() -> (Vec, Cleanup) { + let mut storage: Vec = vec![]; + let mut cleanup: Vec = vec![]; + + // create filesystem instances + create_temp_instances(&mut storage, &mut cleanup, filesystem::temp_filesystem).await; + + // create s3 instances, if enabled. + #[cfg(feature = "storage-s3")] + create_temp_instances(&mut storage, &mut cleanup, s3::temp_s3).await; + + let cleanup = Box::pin(async move { + for c in cleanup.into_iter() { + c.await; + } + }); + + (storage, cleanup) +} + +use buffrs::package::PackageName; +use semver::{BuildMetadata, Prerelease, Version}; + +prop_compose! { + fn package_name()(name in "[a-z][a-z0-9-]{0,127}") -> PackageName { + name.try_into().unwrap() + } +} + +prop_compose! { + fn semver_version()(major: u64, minor: u64, patch: u64) -> Version { + Version { + minor, + major, + patch, + pre: Prerelease::EMPTY, + build: BuildMetadata::EMPTY, + } + } +} + +prop_compose! { + fn package_version()( + package in package_name(), + version in semver_version() + ) -> PackageVersion { + PackageVersion { + package, + version + } + } +} + +#[proptest(async = "tokio", cases = 10)] +async fn can_package_put(#[strategy(package_version())] version: PackageVersion, bytes: Vec) { + let (instances, cleanup) = temp_instances().await; + + for storage in instances { + println!("Testing {storage:?}"); + + let result = storage.package_get(&version).await; + assert!(matches!(result, Err(StorageError::PackageMissing(_)))); + + storage.package_put(&version, &bytes).await.unwrap(); + + let result = storage.package_get(&version).await.unwrap(); + assert_eq!(result, bytes); + } + + cleanup.await; +} + +#[derive(Arbitrary, Debug)] +struct PackageContents { + #[strategy(package_version())] + version: PackageVersion, + bytes: Vec, +} + +#[proptest(async = "tokio", cases = 10)] +async fn can_package_put_many(packages: Vec) { + let (instances, cleanup) = temp_instances().await; + + for storage in instances { + println!("Testing {storage:?}"); + + for 
package in &packages { + storage + .package_put(&package.version, &package.bytes) + .await + .unwrap(); + + let result = storage.package_get(&package.version).await.unwrap(); + assert_eq!(result, package.bytes); + } + } + + cleanup.await; +} diff --git a/registry/tests/storage/s3.rs b/registry/tests/storage/s3.rs new file mode 100644 index 00000000..eef8e0d2 --- /dev/null +++ b/registry/tests/storage/s3.rs @@ -0,0 +1,178 @@ +//! Unit tests for [`S3`]. +//! +//! These test verify that the S3 storage layer is implemented correctly. Every single test +//! uses a new temporary bucket created by [`temp_s3`] to ensure that tests do not interfere +//! with each other. Every single test performs some setup using manual bucket interactions, +//! run at most one method under test, and verify the outputs and the bucket side effects. + +use super::*; +use aws_credential_types::Credentials; +use aws_sdk_s3::{ + primitives::{ByteStream, SdkBody}, + types::*, + Client, +}; + +use rand::{thread_rng, Rng}; +use std::error::Error; + +/// Generate random name for a bucket. +fn random_bucket() -> String { + let mut rng = thread_rng(); + (0..16).map(|_| rng.gen_range('a'..='z')).collect() +} + +/// Generate a client with test credentials. +async fn minio_client() -> Client { + let credentials = Credentials::from_keys("buffrs", "password", None); + let config = aws_config::from_env() + .endpoint_url("http://localhost:9000") + .region("us-east-1") + .credentials_provider(credentials) + .load() + .await; + + Client::new(&config) +} + +/// Delete bucket. 
+async fn delete_bucket(client: Client, bucket: String) { + let objects = client + .list_objects_v2() + .bucket(&bucket) + .send() + .await + .unwrap(); + + let mut delete_objects: Vec = vec![]; + for obj in objects.contents().iter().flat_map(|i| i.iter()) { + let obj_id = ObjectIdentifier::builder() + .set_key(Some(obj.key().unwrap().to_string())) + .build(); + delete_objects.push(obj_id); + } + + if !delete_objects.is_empty() { + client + .delete_objects() + .bucket(&bucket) + .delete(Delete::builder().set_objects(Some(delete_objects)).build()) + .send() + .await + .unwrap(); + } + + client.delete_bucket().bucket(bucket).send().await.unwrap(); +} + +/// Create test client for S3. +pub async fn temp_s3() -> (S3, Cleanup) { + let client = minio_client().await; + let bucket = random_bucket(); + client.create_bucket().bucket(&bucket).send().await.unwrap(); + let s3 = S3::new(client.clone(), bucket.clone()); + (s3, Box::pin(delete_bucket(client, bucket))) +} + +#[proptest(async = "tokio")] +async fn can_write_package( + #[strategy(package_version())] version: PackageVersion, + contents: Vec, +) { + with(temp_s3, |storage| async move { + // write package using trait + storage.package_put(&version, &contents).await.unwrap(); + + // verify manually that it is there. 
+ let response = storage + .client() + .get_object() + .bucket(storage.bucket()) + .key(version.file_name()) + .send() + .await + .unwrap(); + let data = response.body.collect().await.unwrap().into_bytes(); + assert_eq!(contents, data); + }) + .await; +} + +#[proptest(async = "tokio")] +async fn can_write_package_existing( + #[strategy(package_version())] version: PackageVersion, + previous: Vec, + contents: Vec, +) { + with(temp_s3, |storage| async move { + // put an object into storage manually + storage + .client() + .put_object() + .bucket(storage.bucket()) + .key(version.file_name()) + .body(ByteStream::new(SdkBody::from(previous))) + .send() + .await + .unwrap(); + + // overwrite it using trait + storage.package_put(&version, &contents).await.unwrap(); + + // check that it was overwritten + let response = storage + .client() + .get_object() + .bucket(storage.bucket()) + .key(version.file_name()) + .send() + .await + .unwrap(); + let data = response.body.collect().await.unwrap().into_bytes(); + assert_eq!(contents, data); + }) + .await; +} + +#[proptest(async = "tokio")] +async fn cannot_read_package_missing(#[strategy(package_version())] version: PackageVersion) { + with(temp_s3, |storage| async move { + // read a non-existing package + let error = storage.package_get(&version).await.err().unwrap(); + + // ensure we get the right error and a cause + assert!(matches!(error, StorageError::PackageMissing(_))); + let error = error.source().unwrap(); + assert_eq!( + error.to_string(), + "NoSuchKey: The specified key does not exist." 
+ ); + }) + .await; +} + +#[proptest(async = "tokio")] +async fn can_read_package( + #[strategy(package_version())] version: PackageVersion, + contents: Vec, +) { + with(temp_s3, |storage| async move { + // put an object into storage manually + storage + .client() + .put_object() + .bucket(storage.bucket()) + .key(version.file_name()) + .body(ByteStream::new(SdkBody::from(&contents[..]))) + .send() + .await + .unwrap(); + + // read a package using trait + let found = storage.package_get(&version).await.unwrap(); + + // verify it was what we had written + assert_eq!(&found[..], &contents); + }) + .await; +} diff --git a/tests/cmd/package/out/lib-0.0.1.tgz b/tests/cmd/package/out/lib-0.0.1.tgz index d59a51ac73cf29dc314a6f6c3719f4087fb37cd4..21dc55ed1262652dc226b80d52086f7e7c20848d 100644 GIT binary patch literal 128 zcmWN?K@!3s3;@78uiyig5)(-K8weoGsC0z(;OliSd*#n){jKYq$JmW|w0V1$vHY)R zT%^CuII@__s@{?wH2}GNiw?J9i(4ZKC4`dKL_yp}jO3UtMe8B3kCd|&0P(RD&rHa7 M3~ID*)<9PM0FW0ZwEzGB literal 251 zcmV8wxC5;S?tV+@vW+h4T^|=$V?6tPsI*T(#I&sj3D@w_qn%8`_ z=UZHWIoC+XG$@N@0D@QV;fP8XQn;CgEQ|tx`5J2!YZR*j^#X|3QyVD$5QY5z27_uuIMS^U)PjnC#E_W~ci=XqY=-2ld975e}X006#Y Bd{zJe