Skip to content

Commit

Permalink
Cleanup
Browse files Browse the repository at this point in the history
Signed-off-by: Heinz N. Gies <[email protected]>
  • Loading branch information
Licenser committed Nov 2, 2023
1 parent deb6e01 commit a1049aa
Show file tree
Hide file tree
Showing 13 changed files with 36 additions and 46 deletions.
18 changes: 9 additions & 9 deletions src/connectors/impls/otel/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,9 +86,9 @@ impl ConnectorBuilder for Builder {

#[derive(Clone)]
pub(crate) struct RemoteOpenTelemetryEndpoint {
logs_client: LogsServiceClient<TonicChannel>,
metrics_client: MetricsServiceClient<TonicChannel>,
trace_client: TraceServiceClient<TonicChannel>,
logs: LogsServiceClient<TonicChannel>,
metrics: MetricsServiceClient<TonicChannel>,
traces: TraceServiceClient<TonicChannel>,
}

#[async_trait::async_trait]
Expand Down Expand Up @@ -128,9 +128,9 @@ impl Sink for OtelSink {
.await?;

self.remote = Some(RemoteOpenTelemetryEndpoint {
logs_client: LogsServiceClient::new(channel.clone()),
metrics_client: MetricsServiceClient::new(channel.clone()),
trace_client: TraceServiceClient::new(channel),
logs: LogsServiceClient::new(channel.clone()),
metrics: MetricsServiceClient::new(channel.clone()),
traces: TraceServiceClient::new(channel),
});

Ok(true)
Expand All @@ -152,23 +152,23 @@ impl Sink for OtelSink {
"Error converting payload to otel metrics",
)?,
};
remote.metrics_client.export(request).await.err()
remote.metrics.export(request).await.err()
} else if self.config.logs && value.contains_key("logs") {
let request = ExportLogsServiceRequest {
resource_logs: ctx.bail_err(
logs::resource_logs_to_pb(value),
"Error converting payload to otel logs",
)?,
};
remote.logs_client.export(request).await.err()
remote.logs.export(request).await.err()
} else if self.config.trace && value.contains_key("trace") {
let request = ExportTraceServiceRequest {
resource_spans: ctx.bail_err(
trace::resource_spans_to_pb(Some(value)),
"Error converting payload to otel span",
)?,
};
remote.trace_client.export(request).await.err()
remote.traces.export(request).await.err()
} else {
warn!("{ctx} Invalid or disabled otel payload: {value}");
None
Expand Down
2 changes: 1 addition & 1 deletion src/connectors/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(crate) use tremor_common::{
};
pub(crate) use tremor_config::Impl;
pub use tremor_config::NameWithConfig;
pub use tremor_pipeline::{CbAction, EventIdGenerator, EventOriginUri, DEFAULT_STREAM_ID};
pub use tremor_pipeline::{CbAction, EventOriginUri, DEFAULT_STREAM_ID};
pub(crate) use tremor_script::prelude::*;
/// default buf size used for reading from files and streams (sockets etc)
///
Expand Down
2 changes: 1 addition & 1 deletion src/raft/store/statemachine/nodes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ pub(crate) struct NodesStateMachine {
}

impl NodesStateMachine {
const NEXT_NODE_ID: &str = "next_node_id";
const NEXT_NODE_ID: &'static str = "next_node_id";
}

#[async_trait::async_trait]
Expand Down
4 changes: 1 addition & 3 deletions tremor-cli/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
// limitations under the License.

//NOTE: error_chain
#![allow(deprecated)]
#![allow(missing_docs)]
#![allow(clippy::large_enum_variant)]
#![allow(deprecated, missing_docs, clippy::large_enum_variant)]

use crate::util::SourceKind;
use error_chain::error_chain;
Expand Down
8 changes: 4 additions & 4 deletions tremor-cli/src/test/before.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ pub(crate) struct Before {
#[serde(rename = "max-await-secs", default = "default_max_await_secs")]
until: u64,
#[serde(rename = "min-await-secs", default = "default_min_await_secs")]
before_start_delay: u64,
start_delay: u64,
}

fn default_dir() -> String {
Expand Down Expand Up @@ -179,9 +179,9 @@ impl Before {
tokio::time::sleep(Duration::from_millis(100)).await;
}
}
if self.before_start_delay > 0 {
let dur = Duration::from_secs(self.before_start_delay);
debug!("Sleeping for {}s ...", self.before_start_delay);
if self.start_delay > 0 {
let dur = Duration::from_secs(self.start_delay);
debug!("Sleeping for {}s ...", self.start_delay);
tokio::time::sleep(dur).await;
}
Ok(())
Expand Down
4 changes: 1 addition & 3 deletions tremor-pipeline/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
// limitations under the License.

//NOTE: error_chain
#![allow(deprecated)]
#![allow(clippy::large_enum_variant)]
#![allow(missing_docs)]
#![allow(deprecated, missing_docs, clippy::large_enum_variant)]

use error_chain::error_chain;
impl<P> From<std::sync::PoisonError<P>> for Error {
Expand Down
16 changes: 8 additions & 8 deletions tremor-pipeline/src/op/generic/batch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ struct Batch {
/// event id for the resulting batched event
/// the resulting id will be a new distinct id and will be tracking
/// all event ids (min and max) in the batched event
batch_event_id: EventId,
event_id: EventId,
is_transactional: bool,
event_id_gen: EventIdGenerator,
}
Expand All @@ -85,7 +85,7 @@ if let Some(map) = &node.config {
config,
max_delay_ns,
first_ns: 0,
batch_event_id: idgen.next_id(),
event_id: idgen.next_id(),
is_transactional: false,
event_id_gen: idgen,
}))
Expand Down Expand Up @@ -114,7 +114,7 @@ impl Operator for Batch {
transactional,
..
} = event;
self.batch_event_id.track(&id);
self.event_id.track(&id);
self.is_transactional = self.is_transactional || transactional;
self.data.consume(
data,
Expand Down Expand Up @@ -158,7 +158,7 @@ impl Operator for Batch {
..Event::default()
};
self.is_transactional = false;
swap(&mut self.batch_event_id, &mut event.id);
swap(&mut self.event_id, &mut event.id);
Ok(event.into())
} else {
Ok(EventAndInsights::default())
Expand Down Expand Up @@ -197,7 +197,7 @@ impl Operator for Batch {
..Event::default()
};
self.is_transactional = false;
swap(&mut self.batch_event_id, &mut event.id);
swap(&mut self.event_id, &mut event.id);
EventAndInsights::from(event)
} else {
EventAndInsights::default()
Expand Down Expand Up @@ -229,7 +229,7 @@ mod test {
max_delay_ns: None,
data: empty_payload(),
len: 0,
batch_event_id: idgen.next_id(),
event_id: idgen.next_id(),
is_transactional: false,
event_id_gen: idgen,
};
Expand Down Expand Up @@ -369,7 +369,7 @@ mod test {
max_delay_ns: Some(1_000_000),
data: empty_payload(),
len: 0,
batch_event_id: idgen.next_id(),
event_id: idgen.next_id(),
is_transactional: false,
event_id_gen: idgen,
};
Expand Down Expand Up @@ -445,7 +445,7 @@ mod test {
max_delay_ns: Some(100_000),
data: empty_payload(),
len: 0,
batch_event_id: idgen.next_id(),
event_id: idgen.next_id(),
is_transactional: false,
event_id_gen: idgen,
};
Expand Down
8 changes: 4 additions & 4 deletions tremor-pipeline/src/op/grouper/bucket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ op!(BucketGrouperFactory(_uid, node) {
#[derive(Debug)]
struct Rate {
/// the maximum number of events per time range
rate: u64,
max: u64,
/// time range in milliseconds, (default: 1000 - 1 second)
time_range: u64,
/// numbers of window in the time_range (default: 100)
Expand All @@ -112,12 +112,12 @@ struct Rate {

impl Rate {
pub fn from_meta(meta: &Value) -> Option<Self> {
let rate = meta.get("rate")?.as_u64()?;
let max = meta.get("rate")?.as_u64()?;

let time_range = meta.get_u64("time_range").unwrap_or(1000);
let windows = meta.get_usize("windows").unwrap_or(100);
Some(Self {
rate,
max,
time_range,
windows,
})
Expand Down Expand Up @@ -188,7 +188,7 @@ impl Operator for Grouper {
TimeWindow::new(
rate.windows,
rate.time_range / (rate.windows as u64),
rate.rate,
rate.max,
),
);
let Some(g) = groups.cache.get_mut(&dimensions) else {
Expand Down
8 changes: 4 additions & 4 deletions tremor-pipeline/src/op/trickle/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use tremor_value::{utils::sorted_serialize, Value};

#[derive(Debug)]
pub(crate) struct Select {
select: ast::SelectStmt<'static>,
stmt: ast::SelectStmt<'static>,
windows: Vec<Window>,
groups: HashMap<String, Group>,
recursion_limit: u32,
Expand Down Expand Up @@ -70,7 +70,7 @@ impl Select {
.unwrap_or(0);
Self {
windows,
select: select.clone(),
stmt: select.clone(),
groups: HashMap::new(),
recursion_limit: tremor_script::recursion_limit(),
dflt_group,
Expand Down Expand Up @@ -177,7 +177,7 @@ impl Operator for Select {
mut event: Event,
) -> Result<EventAndInsights> {
let Self {
select,
stmt: select,
windows,
groups,
recursion_limit,
Expand Down Expand Up @@ -318,7 +318,7 @@ impl Operator for Select {
) -> Result<EventAndInsights> {
// we only react on ticks and when we have windows
let Self {
select,
stmt: select,
windows,
groups,
recursion_limit,
Expand Down
4 changes: 1 addition & 3 deletions tremor-script/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,7 @@
// limitations under the License.

// NOTE: We need this because of error_chain
#![allow(clippy::large_enum_variant)]
#![allow(deprecated)]
#![allow(missing_docs)]
#![allow(deprecated, missing_docs, clippy::large_enum_variant)]

use crate::errors::ErrorKind::InvalidBinaryBoolean;
pub use crate::prelude::ValueType;
Expand Down
6 changes: 2 additions & 4 deletions tremor-value/src/known_key.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use beef::Cow;
use halfbrown::RawEntryMut;
use simd_json::ObjectHasher;
use std::fmt;
use std::hash::{BuildHasher, Hash, Hasher};
use std::hash::{BuildHasher};
use value_trait::prelude::*;

/// Well known key that can be looked up in a `Value` faster.
Expand Down Expand Up @@ -57,10 +57,8 @@ where
fn from(key: S) -> Self {
let key = Cow::from(key);
let hash_builder = ObjectHasher::default();
let mut hasher = hash_builder.build_hasher();
key.hash(&mut hasher);
Self {
hash: hasher.finish(),
hash: hash_builder.hash_one(&key),
key,
}
}
Expand Down
1 change: 0 additions & 1 deletion tremor-value/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ pub use crate::serde::structurize;
pub use error::*;
pub use known_key::{Error as KnownKeyError, KnownKey};
pub use simd_json::{json, json_typed, value::ObjectHasher, Buffers, StaticNode};
pub use value::from::*;
pub use value::{parse_to_value, parse_to_value_with_buffers, to_value, Object, Value};

use beef::Cow;
Expand Down
1 change: 0 additions & 1 deletion tremor-value/src/serde/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,3 @@ mod se;

pub use de::structurize;
pub use se::to_value;
pub use se::Serializer;

0 comments on commit a1049aa

Please sign in to comment.