Skip to content

Commit

Permalink
minor improvements
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Nov 3, 2023
1 parent caa48e2 commit 3d4acd6
Show file tree
Hide file tree
Showing 10 changed files with 23 additions and 31 deletions.
2 changes: 1 addition & 1 deletion src/connectors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ pub(crate) trait Context: Display + Clone {
#[derive(Clone)]
pub(crate) struct ConnectorContext {
/// alias of the connector instance
pub(crate) alias: alias::Connector,
alias: alias::Connector,
/// type of the connector
connector_type: ConnectorType,
/// The Quiescence Beacon
Expand Down
1 change: 1 addition & 0 deletions src/connectors/impls/discord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ mod utils;

use crate::channel::{bounded, Receiver, Sender};
use crate::connectors::prelude::*;
use crate::connectors::Context as ContextTrait;
use handler::Handler;
use serenity::prelude::*;
use tokio::task::JoinHandle;
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/impls/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ impl Connector for MetricsConnector {
ctx: SourceContext,
builder: SourceManagerBuilder,
) -> Result<Option<SourceAddr>> {
let source = MetricsSource::new(ctx.app_ctx.metrics.rx());
let source = MetricsSource::new(ctx.app_ctx().metrics.rx());
info!("{ctx} Metrics connector id: {}", source.rx.id());
Ok(Some(builder.spawn(source, ctx)))
}
Expand Down
4 changes: 2 additions & 2 deletions src/connectors/impls/tcp/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ impl Source for TcpServerSource {
let tls_reader = TcpReader::tls_server(
tls_read_stream,
vec![0; buf_size],
ctx.alias.clone(),
ctx.alias().clone(),
origin_uri.clone(),
meta,
reader_runtime,
Expand Down Expand Up @@ -293,7 +293,7 @@ impl Source for TcpServerSource {
let tcp_reader = TcpReader::new(
read_stream,
vec![0; buf_size],
ctx.alias.clone(),
ctx.alias().clone(),
origin_uri.clone(),
meta,
reader_runtime,
Expand Down
23 changes: 10 additions & 13 deletions src/connectors/sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,23 +251,23 @@ pub(crate) trait SinkRuntime: Send + Sync {
#[derive(Clone)]
pub(crate) struct SinkContextInner {
/// the connector unique identifier
pub(crate) uid: SinkUId,
uid: SinkUId,
/// the connector alias
pub(crate) alias: alias::Connector,
alias: alias::Connector,
/// the connector type
pub(crate) connector_type: ConnectorType,
connector_type: ConnectorType,

/// check if we are paused or should stop reading/writing
pub(crate) quiescence_beacon: QuiescenceBeacon,
quiescence_beacon: QuiescenceBeacon,

/// notifier the connector runtime if we lost a connection
pub(crate) notifier: ConnectionLostNotifier,
notifier: ConnectionLostNotifier,

/// sender for raft requests
/// Application Context
pub(crate) app_ctx: AppContext,
app_ctx: AppContext,

pub(crate) killswitch: KillSwitch,
killswitch: KillSwitch,
}
#[derive(Clone)]
pub(crate) struct SinkContext(Arc<SinkContextInner>);
Expand All @@ -285,18 +285,15 @@ impl SinkContext {
KillSwitch::dummy(),
)
}

pub(crate) fn killswitch(&self) -> KillSwitch {
self.0.killswitch.clone()
}

pub(crate) fn uid(&self) -> SinkUId {
self.0.uid
}
pub(crate) fn notifier(&self) -> &ConnectionLostNotifier {
&self.0.notifier
}
pub(crate) fn app_ctx(&self) -> &AppContext {
&self.0.app_ctx
}

pub(crate) fn new(
uid: SinkUId,
alias: alias::Connector,
Expand Down
16 changes: 8 additions & 8 deletions src/connectors/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -274,26 +274,26 @@ pub(crate) trait StreamReader: Send {
#[derive(Clone)]
pub(crate) struct SourceContext {
/// connector uid
pub(crate) uid: SourceUId,
uid: SourceUId,
/// connector alias
pub(crate) alias: alias::Connector,
alias: alias::Connector,

/// connector type
pub(crate) connector_type: ConnectorType,
connector_type: ConnectorType,
/// The Quiescence Beacon
pub(crate) quiescence_beacon: QuiescenceBeacon,
quiescence_beacon: QuiescenceBeacon,

/// tool to notify the connector when the connection is lost
pub(crate) notifier: ConnectionLostNotifier,
notifier: ConnectionLostNotifier,

/// Application Context
pub(crate) app_ctx: AppContext,
app_ctx: AppContext,

/// kill switch
pub(crate) killswitch: KillSwitch,
killswitch: KillSwitch,

/// Precomputed prefix for logging
pub(crate) prefix: alias::SourceContext,
prefix: alias::SourceContext,
}

impl SourceContext {
Expand Down
1 change: 0 additions & 1 deletion src/connectors/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -343,7 +343,6 @@ impl ConnectorHarness {

// this is only used in integration tests,
// otherwise this throws an error when compiled for non-integration tests
#[allow(dead_code)]
pub(crate) async fn signal_tick_to_sink(&self) -> Result<()> {
self.addr
.send_sink(SinkMsg::Signal {
Expand Down
2 changes: 0 additions & 2 deletions src/connectors/utils/pb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![allow(dead_code)]

use crate::connectors::prelude::*;
use crate::errors::{Error, ErrorKind, Result};
use simd_json::StaticNode;
Expand Down
1 change: 0 additions & 1 deletion src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,6 @@ pub(crate) enum MgmtMsg {
}

#[cfg(test)]
#[allow(dead_code)]
mod report {
use tremor_common::ports::Port;

Expand Down
2 changes: 0 additions & 2 deletions src/raft/test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ impl TestNode {
})
}

#[allow(dead_code)]
async fn just_start(path: impl AsRef<Path>) -> ClusterResult<Self> {
let addr = free_node_addr().await?;
let running = Node::load_from_store(path, raft_config()?).await?;
Expand All @@ -82,7 +81,6 @@ impl TestNode {
})
}

#[allow(dead_code)]
async fn join_as_learner(path: impl AsRef<Path>, join_addr: &Addr) -> ClusterResult<Self> {
let addr = free_node_addr().await?;
let mut node = Node::new(path, raft_config()?);
Expand Down

0 comments on commit 3d4acd6

Please sign in to comment.