diff --git a/src/common/meta/src/key.rs b/src/common/meta/src/key.rs index 22179afbad98..e78c4fa406e1 100644 --- a/src/common/meta/src/key.rs +++ b/src/common/meta/src/key.rs @@ -52,6 +52,9 @@ //! - The value is a [ViewInfoValue] struct; it contains the encoded logical plan. //! - This key is mainly used in constructing the view in Datanode and Frontend. //! +//! 11. Flownode address key: `__flow/addr/{flownode_id}` +//! - The value is a [FlownodeAddrValue] struct; it contains the address of the flownode. +//! //! All keys have related managers. The managers take care of the serialization and deserialization //! of keys and values, and the interaction with the underlying KV store backend. //! @@ -80,6 +83,10 @@ //! {flownode_id}/ //! {flow_id}/ //! {partition_id} +//! +//! addr/ +//! {flownode_id} +//! pub mod catalog_name; pub mod datanode_table; @@ -130,6 +137,7 @@ use self::table_route::{TableRouteManager, TableRouteValue}; use self::tombstone::TombstoneManager; use crate::ddl::utils::region_storage_path; use crate::error::{self, Result, SerdeJsonSnafu}; +use crate::key::flow::flownode_addr::FlownodeAddrValue; use crate::key::table_route::TableRouteKey; use crate::key::txn_helper::TxnOpGetResponseSet; use crate::kv_backend::txn::{Txn, TxnOp}; @@ -1189,7 +1197,8 @@ impl_table_meta_value! { ViewInfoValue, DatanodeTableValue, FlowInfoValue, - FlowNameValue + FlowNameValue, + FlownodeAddrValue } impl_optional_meta_value! { diff --git a/src/common/meta/src/key/flow.rs b/src/common/meta/src/key/flow.rs index f66d17da33f7..8dddfafa65de 100644 --- a/src/common/meta/src/key/flow.rs +++ b/src/common/meta/src/key/flow.rs @@ -14,6 +14,7 @@ pub mod flow_info; pub(crate) mod flow_name; +pub(crate) mod flownode_addr; pub(crate) mod flownode_flow; pub(crate) mod table_flow; diff --git a/src/common/meta/src/key/flow/flownode_addr.rs b/src/common/meta/src/key/flow/flownode_addr.rs new file mode 100644 index 000000000000..ecaa5a2722d7 --- /dev/null +++ b/src/common/meta/src/key/flow/flownode_addr.rs @@ -0,0 +1,164 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use futures::stream::BoxStream; +use futures::TryStreamExt; +use lazy_static::lazy_static; +use regex::Regex; +use serde::{Deserialize, Serialize}; +use snafu::OptionExt; + +use crate::error::{self, Result}; +use crate::key::flow::FlowScoped; +use crate::key::{BytesAdapter, FlowId, FlowPartitionId, MetaKey, TableMetaValue}; +use crate::kv_backend::txn::{Txn, TxnOp}; +use crate::kv_backend::KvBackendRef; +use crate::range_stream::{PaginationStream, DEFAULT_PAGE_SIZE}; +use crate::rpc::store::RangeRequest; +use crate::rpc::KeyValue; +use crate::FlownodeId; + +lazy_static! { + static ref FLOWNODE_FLOW_KEY_PATTERN: Regex = + Regex::new(&format!("^{FLOWNODE_ADDR_KEY_PREFIX}/([0-9]+)$")).unwrap(); +} + +const FLOWNODE_ADDR_KEY_PREFIX: &str = "addr"; + +/// The key of mapping [FlownodeId] to address. +/// +/// The layout `__flow/addr/{flownode_id}` +pub struct FlownodeAddrKey(FlowScoped); + +impl FlownodeAddrKey { + /// Returns a new [FlownodeAddrKey]. + pub fn new(flownode_id: FlownodeId) -> FlownodeAddrKey { + let inner = FlownodeAddrKeyInner { flownode_id }; + FlownodeAddrKey(FlowScoped::new(inner)) + } + + /// Returns the [FlownodeId]. + pub fn flownode_id(&self) -> FlownodeId { + self.0.flownode_id + } +} + +impl<'a> MetaKey<'a, FlownodeAddrKey> for FlownodeAddrKey { + fn to_bytes(&self) -> Vec { + self.0.to_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result { + Ok(FlownodeAddrKey( + FlowScoped::::from_bytes(bytes)?, + )) + } +} + +/// The key of mapping [FlownodeId] to [FlowId]. +pub struct FlownodeAddrKeyInner { + flownode_id: FlownodeId, +} + +impl<'a> MetaKey<'a, FlownodeAddrKeyInner> for FlownodeAddrKeyInner { + fn to_bytes(&self) -> Vec { + format!("{FLOWNODE_ADDR_KEY_PREFIX}/{}", self.flownode_id).into_bytes() + } + + fn from_bytes(bytes: &'a [u8]) -> Result { + let key = std::str::from_utf8(bytes).map_err(|e| { + error::InvalidTableMetadataSnafu { + err_msg: format!( + "FlownodeAddrKeyInner '{}' is not a valid UTF8 string: {e}", + String::from_utf8_lossy(bytes) + ), + } + .build() + })?; + let captures = + FLOWNODE_FLOW_KEY_PATTERN + .captures(key) + .context(error::InvalidTableMetadataSnafu { + err_msg: format!("Invalid FlownodeAddrKeyInner '{key}'"), + })?; + // Safety: pass the regex check above + let flownode_id = captures[1].parse::().unwrap(); + Ok(FlownodeAddrKeyInner { flownode_id }) + } +} + +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +pub struct FlownodeAddrValue { + pub addr: String, +} + +impl FlownodeAddrValue { + pub fn new(addr: String) -> Self { + Self { addr } + } + + pub fn addr(&self) -> &str { + &self.addr + } +} + +pub type FlownodeAddrManagerRef = Arc; + +pub struct FlownodeAddrManager { + kv_backend: KvBackendRef, +} + +impl FlownodeAddrManager { + pub fn new(kv_backend: KvBackendRef) -> Self { + Self { kv_backend } + } + + /// Return the address of the flownode. + pub async fn get(&self, flownode_id: FlownodeId) -> Result> { + let key = FlownodeAddrKey::new(flownode_id).to_bytes(); + self.kv_backend + .get(&key) + .await? + .map(|x| FlownodeAddrValue::try_from_raw_value(&x.value)) + .transpose() + } + + pub(crate) fn build_register_txn( + &self, + flownode_id: FlownodeId, + value: FlownodeAddrValue, + ) -> Result { + let key = FlownodeAddrKey::new(flownode_id).to_bytes(); + Ok(Txn::put_if_not_exists( + key.clone(), + value.try_as_raw_value()?, + )) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_key_serde() { + let flownode_addr_key = FlownodeAddrKey::new(1); + let expected = b"__flow/addr/1".to_vec(); + assert_eq!(expected, flownode_addr_key.to_bytes()); + let de = FlownodeAddrKey::from_bytes(&expected).unwrap(); + assert_eq!(flownode_addr_key.flownode_id(), de.flownode_id()); + } +} diff --git a/src/flow/src/adapter/server.rs b/src/flow/src/adapter/server.rs index c0d0854572c7..2a533e099f85 100644 --- a/src/flow/src/adapter/server.rs +++ b/src/flow/src/adapter/server.rs @@ -134,7 +134,6 @@ impl servers::server::Server for FlownodeServer { .context(StartGrpcSnafu); }); - // TODO(discord9): better place for dataflow to run per second let manager_ref = self.flow_service.manager.clone(); let _handle = manager_ref.clone().run_background();