Skip to content

Commit

Permalink
chore: Release pipeline (#97)
Browse files Browse the repository at this point in the history
Signed-off-by: Sreekanth <[email protected]>
  • Loading branch information
BulkBeing authored Oct 10, 2024
1 parent 0870af9 commit aa3fa7d
Show file tree
Hide file tree
Showing 24 changed files with 3,738 additions and 87 deletions.
89 changes: 52 additions & 37 deletions .github/workflows/ci.yaml
Original file line number Diff line number Diff line change
@@ -1,54 +1,69 @@
name: Rust
name: Tests

on:
push:
branches: [ main ]
branches:
- main
- release-*
pull_request:
branches: [ main ]
branches:
- main

defaults:
run:
shell: bash

jobs:
build:
name: Unit Tests
name: Clippy and Unit Tests
runs-on: ubuntu-24.04
timeout-minutes: 10
steps:
- name: Check out code
uses: actions/checkout@v2
uses: actions/checkout@v4

- name: Set up Rust
uses: actions-rs/toolchain@v1
- name: Install Rust
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
toolchain: stable
override: true
cache-workspaces: |
. -> target
./examples/batchmap-cat -> target
./examples/batchmap-flatmap -> target
./examples/map-cat -> target
./examples/map-tickgen-serde -> target
./examples/mapt-event-time-filter -> target
./examples/reduce-counter -> target
./examples/sideinput -> target
./examples/sideinput/udf -> target
./examples/simple-source -> target
./examples/sink-log -> target
./examples/source-transformer-now -> target
- name: Cache Cargo dependencies
uses: actions/cache@v2
with:
path: |
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
target/
key: ${{ runner.os }}-cargo-${{ hashFiles('**/Cargo.lock') }}

- name: Install protoc (for Protocol Buffers)
- name: Configure sccache
run: |
set -eux -o pipefail
PROTOC_VERSION=3.19.4
PROTOC_ZIP=protoc-$PROTOC_VERSION-linux-x86_64.zip
curl -OL https://github.com/protocolbuffers/protobuf/releases/download/v$PROTOC_VERSION/$PROTOC_ZIP
sudo unzip -o $PROTOC_ZIP -d /usr/local bin/protoc
sudo unzip -o $PROTOC_ZIP -d /usr/local 'include/*'
sudo chmod +x /usr/local/bin/protoc
sudo find /usr/local/include -type f | xargs sudo chmod a+r
sudo find /usr/local/include -type d | xargs sudo chmod a+rx
rm -f $PROTOC_ZIP
- name: Build
run: cargo build --verbose
echo "RUSTC_WRAPPER=sccache" >> $GITHUB_ENV
echo "SCCACHE_GHA_ENABLED=true" >> $GITHUB_ENV
- name: Run sccache-cache
uses: mozilla-actions/[email protected]

- name: Install dependencies
run: sudo apt-get install -y protobuf-compiler

- name: Code Generation
run: make codegen

- name: Ensure generated code is checked in
run: git diff --exit-code

- name: Lint
run: make lint

- name: Run tests
run: cargo test --verbose
run: make test

- name: Documentation generation
run: RUSTFLAGS="-D warnings" cargo doc

- name: Check formatting
run: cargo fmt --all -- --check
- name: Dry run Cargo Publish
run: cargo publish --dry-run
46 changes: 46 additions & 0 deletions .github/workflows/release.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
name: Release

on:
release:
types: [ created ]

defaults:
run:
shell: bash

jobs:
build:
name: Publish to crates.io
runs-on: ubuntu-24.04
timeout-minutes: 12
# run workflow only on numaproj/numaflow-rs repository
if: ${{ github.repository }} == "numaproj/numaflow-rs"
steps:
- name: Check out code
uses: actions/checkout@v4

- name: Install Rust
uses: actions-rust-lang/setup-rust-toolchain@v1
with:
cache: false

- name: Install dependencies
run: sudo apt-get install -y protobuf-compiler

- name: Code Generation
run: make codegen

- name: Ensure generated code is checked in
run: git diff --exit-code

- name: Lint
run: make lint

- name: Run tests
run: make test

- name: Documentation generation
run: RUSTFLAGS="-D warnings" cargo doc

- name: Dry run Cargo Publish
run: CARGO_REGISTRY_TOKEN=${{ secrets.CARGO_PUBLISH }} cargo publish
15 changes: 11 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,23 @@ homepage = "https://numaproj.github.io/numaflow/"
repository = "https://github.com/numaproj/numaflow-rs"
keywords = ["numaflow", "streaming", "messaging", "event-driven"]
categories = ["network-programming", "api-bindings"]
exclude = [
".github/*",
".gitignore",
".dockerignore",
"hack/*",
"Makefile",
]


[lib]
name = "numaflow"
path = "src/lib.rs"

[dependencies]
tonic = "0.12.2"
prost = "0.13.2"
prost-types = "0.13.2"
tonic = "0.12.3"
prost = "0.13.3"
prost-types = "0.13.3"
tokio = { version = "1.0", features = ["macros", "rt-multi-thread", "signal"] }
tokio-util = "0.7.12"
tokio-stream = { version = "0.1.16", features = ["net"] }
Expand All @@ -34,7 +41,7 @@ thiserror = "1.0"
hyper-util = "0.1.7"

[build-dependencies]
tonic-build = "0.12.2"
tonic-build = "0.12.3"

[dev-dependencies]
tempfile = "3.9.0"
Expand Down
28 changes: 26 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,39 @@
# perform a cargo fmt on all directories containing a Cargo.toml file
.PHONY: lint
# find all directories containing Cargo.toml files
DIRS := $(shell find . -type f -name Cargo.toml -exec dirname {} \; | sort -u)
DIRS := $(shell find . -type f -name Cargo.toml -not -path "./target/*" -exec dirname {} \; | sort -u)
$(info Included directories: $(DIRS))
lint:
fmt:
@for dir in $(DIRS); do \
echo "Formatting code in $$dir"; \
cargo fmt --all --manifest-path "$$dir/Cargo.toml"; \
done

# Check if all files are formatted and run clippy on all directories containing a Cargo.toml file
.PHONY: lint
lint: test-fmt clippy

.PHONY: test-fmt
test-fmt:
@for dir in $(DIRS); do \
echo "Checking if code is formatted in directory: $$dir"; \
cargo fmt --all --check --manifest-path "$$dir/Cargo.toml" || { echo "Code is not formatted in $$dir"; exit 1; }; \
done

.PHONY: clippy
clippy:
@for dir in $(DIRS); do \
echo "Running clippy in directory: $$dir"; \
cargo clippy --workspace --manifest-path "$$dir/Cargo.toml" -- -D warnings || { echo "Clippy warnings/errors found in $$dir"; exit 1; }; \
done

# run cargo test on the repository root
.PHONY: test
test:
cargo test --workspace

.PHONY: codegen
codegen:
# Change timestamps so that tonic_build code generation will always be triggered.
touch proto/*
PROTO_CODE_GEN=1 cargo build
8 changes: 7 additions & 1 deletion build.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
use std::env;

fn main() {
if env::var("PROTO_CODE_GEN").unwrap_or("0".to_string()) != "1" {
return;
}
tonic_build::configure()
.build_server(true)
.compile(
.out_dir("src/servers")
.compile_protos(
&[
"proto/source.proto",
"proto/sourcetransform.proto",
Expand Down
2 changes: 1 addition & 1 deletion examples/reduce-counter/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ mod counter {
&self,
keys: Vec<String>,
mut input: Receiver<ReduceRequest>,
md: &Metadata,
_md: &Metadata,
) -> Vec<Message> {
let mut counter = 0;
// the loop exits when input is closed which will happen only on close of book.
Expand Down
5 changes: 4 additions & 1 deletion examples/source-transformer-now/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ impl sourcetransform::SourceTransformer for NowCat {
&self,
input: sourcetransform::SourceTransformRequest,
) -> Vec<sourcetransform::Message> {
vec![sourcetransform::Message::new(input.value, chrono::offset::Utc::now()).keys(input.keys.clone())]
vec![
sourcetransform::Message::new(input.value, chrono::offset::Utc::now())
.keys(input.keys.clone()),
]
}
}
7 changes: 2 additions & 5 deletions src/batchmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,13 @@ use crate::batchmap::proto::batch_map_server::BatchMap;
use crate::error::Error;
use crate::error::Error::BatchMapError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
use crate::servers::batchmap as proto;
use crate::shared::{self, shutdown_signal, ContainerType};

const DEFAULT_MAX_MESSAGE_SIZE: usize = 64 * 1024 * 1024;
const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/batchmap.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";
const DROP: &str = "U+005C__DROP__";
/// Numaflow Batch Map Proto definitions.
pub mod proto {
tonic::include_proto!("batchmap.v1");
}

struct BatchMapService<T: BatchMapper> {
handler: Arc<T>,
Expand Down Expand Up @@ -106,7 +103,7 @@ impl From<proto::BatchMapRequest> for Datum {
}
}
}
/// Message is the response struct from the [`Mapper::map`] .
/// Message is the response struct from the [`Mapper::map`][`crate::map::Mapper::map`] .
#[derive(Debug, PartialEq)]
pub struct Message {
/// Keys are a collection of strings which will be passed on to the next vertex as is. It can
Expand Down
2 changes: 2 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ pub mod sideinput;
/// batchmap is for writing the [batch map mode](https://numaflow.numaproj.io/user-guide/user-defined-functions/map/batchmap/) handlers.
pub mod batchmap;

mod servers;

// Error handling on Numaflow SDKs!
//
// Any non-recoverable error will cause the process to shutdown with a non-zero exit status. All errors are non-recoverable.
Expand Down
7 changes: 1 addition & 6 deletions src/map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use tonic::{async_trait, Request, Response, Status, Streaming};
use tracing::{error, info};

use crate::error::{Error, ErrorKind};
use crate::map::proto::MapResponse;
use crate::servers::map::{self as proto, MapResponse};
use crate::shared::{self, shutdown_signal, ContainerType};

const DEFAULT_CHANNEL_SIZE: usize = 1000;
Expand All @@ -21,11 +21,6 @@ const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/map.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/mapper-server-info";
const DROP: &str = "U+005C__DROP__";

/// Numaflow Map Proto definitions.
pub mod proto {
tonic::include_proto!("map.v1");
}

struct MapService<T> {
handler: Arc<T>,
shutdown_tx: mpsc::Sender<()>,
Expand Down
10 changes: 3 additions & 7 deletions src/reduce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use tonic::{async_trait, Request, Response, Status};
use crate::error::Error;
use crate::error::Error::ReduceError;
use crate::error::ErrorKind::{InternalError, UserDefinedError};
pub use crate::servers::reduce as proto;
use crate::shared::{self, prost_timestamp_from_utc, ContainerType};

const KEY_JOIN_DELIMITER: &str = ":";
Expand All @@ -22,11 +23,6 @@ const DEFAULT_SOCK_ADDR: &str = "/var/run/numaflow/reduce.sock";
const DEFAULT_SERVER_INFO_FILE: &str = "/var/run/numaflow/reducer-server-info";
const DROP: &str = "U+005C__DROP__";

/// Numaflow Reduce Proto definitions.
pub mod proto {
tonic::include_proto!("reduce.v1");
}

struct ReduceService<C> {
creator: Arc<C>,
shutdown_tx: Sender<()>,
Expand Down Expand Up @@ -80,7 +76,7 @@ pub trait ReducerCreator {
/// Reducer trait for implementing Reduce handler.
#[async_trait]
pub trait Reducer {
/// reduce_handle is provided with a set of keys, a channel of [`Datum`], and [`Metadata`]. It
/// reduce_handle is provided with a set of keys, a channel of [`ReduceRequest`], and [`Metadata`]. It
/// returns 0, 1, or more results as a [`Vec`] of [`Message`]. Reduce is a stateful operation and
/// the channel is for the collection of keys and for that time [Window].
/// You can read more about reduce [here](https://numaflow.numaproj.io/user-guide/user-defined-functions/reduce/reduce/).
Expand All @@ -103,7 +99,7 @@ pub trait Reducer {
/// use numaflow::reduce::{Reducer, Metadata};
/// use tokio::sync::mpsc::Receiver;
/// use tonic::async_trait;
/// use numaflow::reduce::proto::reduce_server::Reduce;
/// use numaflow::reduce::proto::reduce_server::Reduce;
/// pub(crate) struct Counter {}
///
/// pub(crate) struct CounterCreator {}
Expand Down
27 changes: 27 additions & 0 deletions src/servers.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#[path = "servers/batchmap.v1.rs"]
#[rustfmt::skip]
pub mod batchmap;

#[path = "servers/map.v1.rs"]
#[rustfmt::skip]
pub mod map;

#[path = "servers/reduce.v1.rs"]
#[rustfmt::skip]
pub mod reduce;

#[path = "servers/sideinput.v1.rs"]
#[rustfmt::skip]
pub mod sideinput;

#[path = "servers/sink.v1.rs"]
#[rustfmt::skip]
pub mod sink;

#[path = "servers/source.v1.rs"]
#[rustfmt::skip]
pub mod source;

#[path = "servers/sourcetransformer.v1.rs"]
#[rustfmt::skip]
pub mod sourcetransformer;
Loading

0 comments on commit aa3fa7d

Please sign in to comment.