Skip to content

Commit

Permalink
feat: high level api for connecting with a specific profile/cluster (#…
Browse files Browse the repository at this point in the history
…4207)

Co-authored-by: Luis Moreno <[email protected]>
  • Loading branch information
morenol and morenol authored Oct 9, 2024
1 parent 2d0d5d8 commit 313e024
Show file tree
Hide file tree
Showing 8 changed files with 58 additions and 38 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion crates/fluvio-extension-common/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-extension-common"
version = "0.14.2"
version = "0.14.3"
edition = "2021"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Fluvio extension common"
Expand Down
12 changes: 3 additions & 9 deletions crates/fluvio-extension-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,15 +154,9 @@ pub mod target {
.into());
}

let config_file = ConfigFile::load(None)?;
let cluster = config_file
.config()
// NOTE: This will not fallback to current cluster like it did before
// Current cluster will be used when no profile is given.
.cluster_with_profile(&profile)
.ok_or_else(|| {
IoError::new(ErrorKind::Other, "Cluster not found for profile")
})?;
let cluster = FluvioConfig::load_with_profile(&profile)?.ok_or_else(|| {
IoError::new(ErrorKind::Other, "Cluster not found for profile")
})?;
Ok(cluster.clone())
}
(None, Some(cluster)) => {
Expand Down
2 changes: 1 addition & 1 deletion crates/fluvio-socket/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "fluvio-socket"
version = "0.14.10"
version = "0.14.11"
edition = "2021"
authors = ["Fluvio Contributors <[email protected]>"]
description = "Provide TCP socket wrapper for fluvio protocol"
Expand Down
8 changes: 4 additions & 4 deletions crates/fluvio-socket/src/multiplexing.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,21 +17,21 @@ use async_channel::bounded;
use async_channel::Receiver;
use async_channel::Sender;
use async_lock::Mutex;
use bytes::{Bytes};
use bytes::Bytes;
use event_listener::Event;
use fluvio_future::net::ConnectionFd;
use futures_util::ready;
use futures_util::stream::{Stream, StreamExt};
use pin_project::{pin_project, pinned_drop};
use tokio::select;
use tracing::{info, warn};
use tracing::{debug, error, trace, instrument};

use fluvio_future::net::ConnectionFd;
use fluvio_future::timer::sleep;
use futures_util::ready;
use fluvio_protocol::api::Request;
use fluvio_protocol::api::RequestHeader;
use fluvio_protocol::api::RequestMessage;
use fluvio_protocol::{Decoder};
use fluvio_protocol::Decoder;

use crate::SocketError;
use crate::ExclusiveFlvSink;
Expand Down
8 changes: 8 additions & 0 deletions crates/fluvio/src/config/cluster.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,14 @@ impl FluvioConfig {
Ok(cluster_config.to_owned())
}

/// get cluster config from profile
/// if profile is not found, return None
pub fn load_with_profile(profile_name: &str) -> Result<Option<Self>, FluvioError> {
let config_file = ConfigFile::load_default_or_new()?;
let cluster_config = config_file.config().cluster_with_profile(profile_name);
Ok(cluster_config.cloned())
}

/// Create a new cluster configuration with no TLS.
pub fn new(addr: impl Into<String>) -> Self {
Self {
Expand Down
6 changes: 6 additions & 0 deletions crates/fluvio/src/config/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -503,6 +503,12 @@ pub mod test {

let cluster = config.current_cluster().expect("cluster should exist");
assert_eq!(cluster.endpoint, "127.0.0.1:9003");
// access from profile
config
.cluster_with_profile("local")
.expect("cluster should exists");
// access from cluster
config.cluster("local").expect("cluster should exists");
}

#[test]
Expand Down
50 changes: 31 additions & 19 deletions crates/fluvio/src/fluvio.rs
Original file line number Diff line number Diff line change
@@ -1,39 +1,31 @@
use std::convert::TryFrom;
use std::sync::Arc;

use fluvio_sc_schema::partition::PartitionMirrorConfig;
use fluvio_sc_schema::topic::MirrorConfig;
use fluvio_sc_schema::topic::PartitionMap;
use fluvio_sc_schema::topic::ReplicaSpec;
use tracing::{debug, info};
use anyhow::{Context, Result};
use semver::Version;
use tokio::sync::OnceCell;
use anyhow::Result;
use tracing::{debug, info};

use fluvio_future::net::DomainConnector;
use fluvio_sc_schema::partition::PartitionMirrorConfig;
use fluvio_sc_schema::topic::{MirrorConfig, PartitionMap, ReplicaSpec};
use fluvio_sc_schema::objects::ObjectApiWatchRequest;
use fluvio_types::PartitionId;
use fluvio_socket::{
ClientConfig, Versions, VersionedSerialSocket, SharedMultiplexerSocket, MultiplexerSocket,
};
use fluvio_future::net::DomainConnector;
use semver::Version;

use crate::admin::FluvioAdmin;
use crate::error::anyhow_version_error;
use crate::producer::TopicProducerPool;
use crate::spu::SpuPool;
use crate::TopicProducer;
use crate::PartitionConsumer;

use crate::FluvioError;
use crate::FluvioConfig;
use crate::consumer::{MultiplePartitionConsumer, PartitionSelectionStrategy};
use crate::consumer::{
ConsumerStream, MultiplePartitionConsumerStream, Record, ConsumerConfigExt, ConsumerOffset,
MultiplePartitionConsumer, PartitionSelectionStrategy, ConsumerStream,
MultiplePartitionConsumerStream, Record, ConsumerConfigExt, ConsumerOffset,
};
use crate::metrics::ClientMetrics;
use crate::producer::TopicProducerConfig;
use crate::spu::SpuSocketPool;
use crate::producer::{TopicProducerPool, TopicProducerConfig};
use crate::sync::MetadataStores;
use crate::spu::{SpuPool, SpuSocketPool};
use crate::{TopicProducer, PartitionConsumer, FluvioError, FluvioConfig};

/// An interface for interacting with Fluvio streaming
pub struct Fluvio {
Expand Down Expand Up @@ -90,6 +82,26 @@ impl Fluvio {
Self::connect_with_connector(connector, config).await
}

/// Creates a new Fluvio client with the given profile
///
/// # Example
///
/// ```no_run
/// # use fluvio::{Fluvio, FluvioError, FluvioConfig};
/// use fluvio::config::ConfigFile;
/// # async fn do_connect_with_profile_name() -> anyhow::Result<()> {
/// let fluvio = Fluvio::connect_with_profile("local").await?;
/// # Ok(())
/// # }
/// ```
pub async fn connect_with_profile(profile: &str) -> Result<Self> {
let config = FluvioConfig::load_with_profile(profile)?.context(format!(
"Failed to load cluster config with profile `{profile}`"
))?;
Self::connect_with_config(&config).await
}

/// Creates a new Fluvio client with the given connector and configuration
pub async fn connect_with_connector(
connector: DomainConnector,
config: &FluvioConfig,
Expand Down

0 comments on commit 313e024

Please sign in to comment.