fix: sql identifier default to case-sensitive #641

Merged · 11 commits · Feb 15, 2023

Changes from all commits
171 changes: 86 additions & 85 deletions Cargo.lock

Large diffs are not rendered by default.

14 changes: 7 additions & 7 deletions Cargo.toml
```diff
@@ -50,8 +50,8 @@ name = "ceresdb-server"
 path = "src/bin/ceresdb-server.rs"
 
 [workspace.dependencies]
-arrow = { version = "31.0.0", features = ["prettyprint"] }
-arrow_ipc = { version = "31.0.0" }
+arrow = { version = "32.0.0", features = ["prettyprint"] }
+arrow_ipc = { version = "32.0.0" }
 arrow_ext = { path = "components/arrow_ext" }
 analytic_engine = { path = "analytic_engine" }
 arena = { path = "components/arena" }
@@ -68,10 +68,10 @@ cluster = { path = "cluster" }
 criterion = "0.3"
 common_types = { path = "common_types" }
 common_util = { path = "common_util" }
-datafusion = "17.0.0"
-datafusion-expr = "17.0.0"
-datafusion-optimizer = "17.0.0"
-datafusion-proto = "17.0.0"
+datafusion = "18.0.0"
+datafusion-expr = "18.0.0"
+datafusion-optimizer = "18.0.0"
+datafusion-proto = "18.0.0"
 df_operator = { path = "df_operator" }
 env_logger = "0.6"
 futures = "0.3"
@@ -86,7 +86,7 @@ interpreters = { path = "interpreters" }
 meta_client = { path = "meta_client" }
 object_store = { path = "components/object_store" }
 parquet_ext = { path = "components/parquet_ext" }
-parquet = { version = "31.0.0" }
+parquet = { version = "32.0.0" }
 paste = "1.0"
 profile = { path = "components/profile" }
 prometheus = "0.12"
```
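These bumps travel together: each DataFusion release is built against a specific arrow-rs major (DataFusion 18 against arrow/parquet 32), so upgrading one crate without the others would pull two incompatible arrow versions into the build. Because the pins live in `[workspace.dependencies]`, a single edit here moves every member crate; for reference, a member crate inherits them like this (illustrative snippet, not taken from this diff — workspace dependency inheritance requires Cargo 1.64+):

```toml
# e.g. analytic_engine/Cargo.toml (illustrative member crate)
[dependencies]
# Inherit the version pinned at the workspace root, so one edit
# in the root Cargo.toml bumps every crate consistently.
arrow = { workspace = true }
datafusion = { workspace = true }
```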
4 changes: 3 additions & 1 deletion Makefile
```diff
@@ -54,7 +54,9 @@ check-license:
	cd $(DIR); sh scripts/check-license.sh
 
 clippy:
-	cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D warnings
+	cd $(DIR); cargo clippy --all-targets --all-features --workspace -- -D warnings \
+	-A clippy::result_large_err -A clippy::box_default -A clippy::extra-unused-lifetimes \
+	-A clippy::only-used-in-recursion
 
 # test with address sanitizer
 asan-test:
```
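The `-A` flags downgrade specific lints to allowed while `-D warnings` keeps everything else an error; the four named here were relatively new lints in the clippy shipping with the toolchain at the time, presumably too noisy to fix wholesale in one PR. An equivalent, more targeted alternative is a crate-level attribute — a sketch:

```rust
// At the top of a crate's lib.rs, instead of workspace-wide -A flags in the Makefile:
#![allow(clippy::result_large_err)] // large error enums are an accepted trade-off here
#![allow(clippy::box_default)]
```

The Makefile route has the advantage of living in one place; the attribute route keeps the allowance visible next to the code that needs it.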
9 changes: 2 additions & 7 deletions analytic_engine/src/compaction/mod.rs
```diff
@@ -57,8 +57,9 @@ pub enum Error {
     InvalidOption { error: String, backtrace: Backtrace },
 }
 
-#[derive(Debug, Clone, Copy, Deserialize, PartialEq)]
+#[derive(Debug, Clone, Copy, Deserialize, PartialEq, Default)]
 pub enum CompactionStrategy {
+    #[default]
     Default,
     TimeWindow(TimeWindowCompactionOptions),
     SizeTiered(SizeTieredCompactionOptions),
@@ -113,12 +114,6 @@ impl Default for TimeWindowCompactionOptions {
     }
 }
 
-impl Default for CompactionStrategy {
-    fn default() -> Self {
-        CompactionStrategy::Default
-    }
-}
-
 const BUCKET_LOW_KEY: &str = "compaction_bucket_low";
 const BUCKET_HIGH_KEY: &str = "compaction_bucket_high";
 const MIN_THRESHOLD_KEY: &str = "compaction_min_threshold";
```
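Since Rust 1.62, `#[derive(Default)]` works on enums when exactly one unit variant carries `#[default]`, which is what lets the hand-written `impl Default` above be deleted. A minimal standalone sketch of the pattern:

```rust
#[derive(Debug, Default, PartialEq)]
enum Strategy {
    #[default] // the derived Default::default() returns this variant
    Auto,
    Fixed(u32),
}

fn main() {
    assert_eq!(Strategy::default(), Strategy::Auto);
    // Non-default variants are unaffected.
    assert_ne!(Strategy::Fixed(4), Strategy::default());
}
```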
4 changes: 1 addition & 3 deletions analytic_engine/src/instance/flush_compaction.rs
```diff
@@ -1082,7 +1082,6 @@ fn split_record_batch_with_time_ranges(
     timestamp_idx: usize,
 ) -> Result<Vec<RecordBatchWithKey>> {
     let mut builders: Vec<RecordBatchWithKeyBuilder> = (0..time_ranges.len())
-        .into_iter()
         .map(|_| RecordBatchWithKeyBuilder::new(record_batch.schema_with_key().clone()))
         .collect();
 
@@ -1108,8 +1107,7 @@
             .context(SplitRecordBatch)?;
         } else {
             panic!(
-                "Record timestamp is not in time_ranges, timestamp:{:?}, time_ranges:{:?}",
-                timestamp, time_ranges
+                "Record timestamp is not in time_ranges, timestamp:{timestamp:?}, time_ranges:{time_ranges:?}"
             );
         }
     }
```
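The recurring `format!`/`panic!` rewrite throughout this PR is the inline format-args style (clippy's `uninlined_format_args`, which became warn-by-default around clippy 1.67): plain identifiers can be captured directly inside the braces instead of being passed as trailing arguments. Note that only bare identifiers qualify; expressions like `self.len()` still need the trailing-argument form. A quick sketch:

```rust
fn main() {
    let timestamp = 1_676_419_200_000_i64;
    // Old style: positional trailing argument.
    let a = format!("timestamp:{:?}", timestamp);
    // New style: the identifier is captured inside the format string.
    let b = format!("timestamp:{timestamp:?}");
    assert_eq!(a, b);
}
```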
2 changes: 1 addition & 1 deletion analytic_engine/src/lib.rs
```diff
@@ -108,7 +108,7 @@ impl Default for Config {
             db_write_buffer_size: 0,
             scan_batch_size: 500,
             sst_background_read_parallelism: 8,
-            wal_storage: WalStorageConfig::RocksDB(Box::new(RocksDBConfig::default())),
+            wal_storage: WalStorageConfig::RocksDB(Box::default()),
             remote_engine_client: remote_engine_client::config::Config::default(),
         }
     }
```
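`Box::default()` builds the same value as `Box::new(RocksDBConfig::default())` because the boxed type is inferred from the enclosing `WalStorageConfig::RocksDB(Box<RocksDBConfig>)` variant; clippy's `box_default` lint suggests the shorter form. A sketch with a hypothetical config type:

```rust
#[derive(Default, Debug, PartialEq)]
struct RocksConfig { /* fields with sensible defaults */ }

fn main() {
    // Equivalent constructions; the annotated target type drives inference.
    let explicit: Box<RocksConfig> = Box::new(RocksConfig::default());
    let terse: Box<RocksConfig> = Box::default();
    assert_eq!(explicit, terse);
}
```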
2 changes: 1 addition & 1 deletion analytic_engine/src/manifest/details.rs
```diff
@@ -786,7 +786,7 @@ mod tests {
     }
 
     fn table_name_from_id(table_id: TableId) -> String {
-        format!("table_{:?}", table_id)
+        format!("table_{table_id:?}")
     }
 
     async fn open_manifest(&self) -> ManifestImpl {
```
2 changes: 1 addition & 1 deletion analytic_engine/src/payload.rs
```diff
@@ -119,7 +119,7 @@ impl<'a> Payload for WritePayload<'a> {
             WritePayload::AlterOption(req) => req.encoded_len(),
         };
 
-        HEADER_SIZE + body_size as usize
+        HEADER_SIZE + body_size
     }
 
     fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<()> {
```
3 changes: 1 addition & 2 deletions analytic_engine/src/sampler.rs
```diff
@@ -257,8 +257,7 @@ mod tests {
             assert_eq!(
                 *expect,
                 pick_duration(*interval).as_millis() as u64,
-                "Case {}",
-                i
+                "Case {i}"
             );
         }
     }
```
3 changes: 1 addition & 2 deletions analytic_engine/src/setup.rs
```diff
@@ -325,8 +325,7 @@ impl EngineBuilder for KafkaWalEngineBuilder {
             _ => {
                 return InvalidWalConfig {
                     msg: format!(
-                        "invalid wal storage config while opening kafka wal, config:{:?}",
-                        config
+                        "invalid wal storage config while opening kafka wal, config:{config:?}"
                     ),
                 }
                 .fail();
```
1 change: 0 additions & 1 deletion analytic_engine/src/sst/file.rs
```diff
@@ -327,7 +327,6 @@ impl FileHandleSet {
         let seek_key = FileOrdKey::for_seek(time_range.inclusive_start());
         self.file_map
             .range(seek_key..)
-            .into_iter()
             .filter_map(|(_key, file)| {
                 if file.intersect_with_time_range(time_range) {
                     Some(file.clone())
```
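Several hunks in this PR drop a redundant `.into_iter()`: ranges like `0..n` and adapters like `BTreeMap::range` already return iterators, so `into_iter()` on them is an identity conversion that clippy reports as `useless_conversion`. A sketch of the same shape:

```rust
use std::collections::BTreeMap;

fn main() {
    let map: BTreeMap<u32, &str> = BTreeMap::from([(1, "a"), (5, "b")]);

    // `range` already yields an iterator; no `.into_iter()` needed before the adapter.
    let hits: Vec<&str> = map
        .range(2..)
        .filter_map(|(_key, v)| Some(*v))
        .collect();
    assert_eq!(hits, vec!["b"]);
}
```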
16 changes: 4 additions & 12 deletions analytic_engine/src/sst/parquet/async_reader.rs
```diff
@@ -400,8 +400,7 @@ impl AsyncFileReader for ObjectStoreReader {
             .get_range(&self.path, range)
             .map_err(|e| {
                 parquet::errors::ParquetError::General(format!(
-                    "Failed to fetch range from object store, err:{}",
-                    e
+                    "Failed to fetch range from object store, err:{e}"
                 ))
             })
             .boxed()
@@ -423,8 +422,7 @@
             .get_ranges(&self.path, &ranges)
             .map_err(|e| {
                 parquet::errors::ParquetError::General(format!(
-                    "Failed to fetch ranges from object store, err:{}",
-                    e
+                    "Failed to fetch ranges from object store, err:{e}"
                 ))
             })
             .await
@@ -572,8 +570,7 @@ impl Stream for RecordBatchReceiver {
         let rx_group_len = self.rx_group.len();
         let cur_rx = self.rx_group.get_mut(cur_rx_idx).unwrap_or_else(|| {
             panic!(
-                "cur_rx_idx is impossible to be out-of-range, cur_rx_idx:{}, rx_group len:{}",
-                cur_rx_idx, rx_group_len
+                "cur_rx_idx is impossible to be out-of-range, cur_rx_idx:{cur_rx_idx}, rx_group len:{rx_group_len}"
             )
         });
         let poll_result = cur_rx.poll_recv(cx);
@@ -673,7 +670,6 @@ impl<'a> SstReader for ThreadedReader<'a> {
 
         let channel_cap_per_sub_reader = self.channel_cap / self.read_parallelism + 1;
         let (tx_group, rx_group): (Vec<_>, Vec<_>) = (0..read_parallelism)
-            .into_iter()
             .map(|_| mpsc::channel::<Result<RecordBatchWithKey>>(channel_cap_per_sub_reader))
             .unzip();
 
@@ -755,10 +751,7 @@ mod tests {
     }
 
     fn gen_test_data(amount: usize) -> Vec<u32> {
-        (0..amount)
-            .into_iter()
-            .map(|_| rand::random::<u32>())
-            .collect()
+        (0..amount).map(|_| rand::random::<u32>()).collect()
     }
 
     // We mock a thread model same as the one in `ThreadedReader` to check its
@@ -772,7 +765,6 @@
         let channel_cap_per_sub_reader = 10;
        let reader_num = 5;
        let (tx_group, rx_group): (Vec<_>, Vec<_>) = (0..reader_num)
-            .into_iter()
             .map(|_| mpsc::channel::<u32>(channel_cap_per_sub_reader))
             .unzip();
 
```
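The `(tx_group, rx_group)` construction is a compact way to set up one bounded channel per sub-reader: mapping over `0..n` yields `(Sender, Receiver)` pairs, and `unzip()` splits them into parallel `Vec`s so each sender can move into a worker task while the receivers stay with the consumer. A self-contained sketch of the same fan-in shape, assuming the tokio runtime (which the `poll_recv` usage above suggests); names are illustrative, not the PR's API:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    let reader_num = 3;
    let cap = 10;

    // One bounded channel per simulated sub-reader.
    let (tx_group, mut rx_group): (Vec<_>, Vec<_>) =
        (0..reader_num).map(|_| mpsc::channel::<u32>(cap)).unzip();

    // Each sender moves into its own task, like ThreadedReader's sub-readers.
    for (i, tx) in tx_group.into_iter().enumerate() {
        tokio::spawn(async move {
            tx.send(i as u32).await.expect("receiver alive");
        });
    }

    // The consumer drains the receivers, here in simple round-robin order.
    for rx in rx_group.iter_mut() {
        println!("got {:?}", rx.recv().await);
    }
}
```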
4 changes: 2 additions & 2 deletions analytic_engine/src/sst/parquet/encoding.rs
```diff
@@ -169,7 +169,7 @@ pub const META_VALUE_HEADER: u8 = 0;
 pub fn encode_sst_meta_data(meta_data: ParquetMetaData) -> Result<KeyValue> {
     let meta_data_pb = sst_pb::ParquetMetaData::from(meta_data);
 
-    let mut buf = BytesMut::with_capacity(meta_data_pb.encoded_len() as usize + 1);
+    let mut buf = BytesMut::with_capacity(meta_data_pb.encoded_len() + 1);
     buf.try_put_u8(META_VALUE_HEADER)
         .expect("Should write header into the buffer successfully");
 
@@ -588,7 +588,7 @@ impl HybridRecordDecoder {
         if let Some(bitmap) = old_null_bitmap {
             if !bitmap.is_set(idx) {
                 for i in 0..value_num {
-                    bit_util::unset_bit(null_slice, length_so_far + i as usize);
+                    bit_util::unset_bit(null_slice, length_so_far + i);
                 }
             }
         }
```
6 changes: 2 additions & 4 deletions analytic_engine/src/sst/parquet/hybrid.rs
```diff
@@ -395,9 +395,7 @@ impl ListArrayBuilder {
             let start = array.value_offsets()[slice_arg.offset];
             let end = array.value_offsets()[slice_arg.offset + slice_arg.length];
 
-            for i in
-                (slice_arg.offset as usize)..(slice_arg.offset + slice_arg.length as usize)
-            {
+            for i in slice_arg.offset..(slice_arg.offset + slice_arg.length) {
                 inner_length_so_far += array.value_length(i);
                 inner_offsets.push(inner_length_so_far);
             }
@@ -740,7 +738,7 @@ mod tests {
             true,
         ))))
         .len(2)
-        .add_buffer(Buffer::from_slice_ref(&offsets))
+        .add_buffer(Buffer::from_slice_ref(offsets))
         .add_child_data(string_data.data().to_owned())
         .build()
         .unwrap();
```
2 changes: 1 addition & 1 deletion analytic_engine/src/table/sst_util.rs
```diff
@@ -14,7 +14,7 @@ const SST_FILE_SUFFIX: &str = "sst";
 #[inline]
 /// Generate the sst file name.
 pub fn sst_file_name(id: FileId) -> String {
-    format!("{}.{}", id, SST_FILE_SUFFIX)
+    format!("{id}.{SST_FILE_SUFFIX}")
 }
 
 pub fn new_sst_file_path(space_id: SpaceId, table_id: TableId, file_id: FileId) -> Path {
```
4 changes: 1 addition & 3 deletions analytic_engine/src/tests/table.rs
```diff
@@ -211,9 +211,7 @@ pub fn assert_batch_eq_to_row_group(record_batches: &[RecordBatch], row_group: &
             assert_eq!(
                 &cursor.datum(column_idx),
                 datum,
-                "record_batches:{:?}, row_group:{:?}",
-                record_batches,
-                row_group
+                "record_batches:{record_batches:?}, row_group:{row_group:?}"
             );
         }
         cursor.step();
```
4 changes: 2 additions & 2 deletions analytic_engine/src/tests/util.rs
```diff
@@ -36,7 +36,7 @@ use crate::{
     },
     storage_options::{LocalOptions, ObjectStoreOptions, StorageOptions},
     tests::table::{self, FixedSchemaTable, RowTuple},
-    Config, ObkvWalConfig, RocksDBConfig, WalStorageConfig,
+    Config, RocksDBConfig, WalStorageConfig,
 };
 
 const DAY_MS: i64 = 24 * 60 * 60 * 1000;
@@ -544,7 +544,7 @@ impl Default for MemoryEngineContext {
                     data_path: dir.path().to_str().unwrap().to_string(),
                 }),
             },
-            wal_storage: WalStorageConfig::Obkv(Box::new(ObkvWalConfig::default())),
+            wal_storage: WalStorageConfig::Obkv(Box::default()),
             ..Default::default()
         };
 
```
8 changes: 4 additions & 4 deletions benchmarks/benches/bench.rs
```diff
@@ -73,14 +73,14 @@ fn bench_merge_sst(c: &mut Criterion) {
     for i in 0..bench.num_benches() {
         bench.init_for_bench(i, true);
         group.bench_with_input(
-            BenchmarkId::new("merge_sst", format!("{}/{}/dedup", sst_file_ids, i)),
+            BenchmarkId::new("merge_sst", format!("{sst_file_ids}/{i}/dedup")),
             &bench,
             bench_merge_sst_iter,
         );
 
         bench.init_for_bench(i, false);
         group.bench_with_input(
-            BenchmarkId::new("merge_sst", format!("{}/{}/no-dedup", sst_file_ids, i)),
+            BenchmarkId::new("merge_sst", format!("{sst_file_ids}/{i}/no-dedup")),
             &bench,
             bench_merge_sst_iter,
         );
@@ -155,14 +155,14 @@ fn bench_merge_memtable(c: &mut Criterion) {
     for i in 0..bench.num_benches() {
         bench.init_for_bench(i, true);
         group.bench_with_input(
-            BenchmarkId::new("merge_memtable", format!("{}/{}/dedup", sst_file_ids, i)),
+            BenchmarkId::new("merge_memtable", format!("{sst_file_ids}/{i}/dedup")),
             &bench,
             bench_merge_memtable_iter,
         );
 
         bench.init_for_bench(i, false);
         group.bench_with_input(
-            BenchmarkId::new("merge_memtable", format!("{}/{}/no-dedup", sst_file_ids, i)),
+            BenchmarkId::new("merge_memtable", format!("{sst_file_ids}/{i}/no-dedup")),
             &bench,
             bench_merge_memtable_iter,
         );
```
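For context on the lines above: criterion's `BenchmarkId` pairs a function name with a parameter string so runs over different inputs are reported separately, and the inline-args rewrite applies inside its `format!` as anywhere else. A minimal sketch of the same shape (the benchmark itself is made up):

```rust
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};

fn bench_sum(c: &mut Criterion) {
    let mut group = c.benchmark_group("sum");
    for n in [10_u64, 100] {
        // Parameter captured inline, mirroring the PR's format! style.
        group.bench_with_input(BenchmarkId::new("sum", format!("{n}")), &n, |b, &n| {
            b.iter(|| (0..n).sum::<u64>())
        });
    }
    group.finish();
}

criterion_group!(benches, bench_sum);
criterion_main!(benches);
```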
2 changes: 1 addition & 1 deletion benchmarks/src/arrow2_bench.rs
```diff
@@ -38,7 +38,7 @@ impl Arrow2Bench {
 
     pub fn init_for_bench(&mut self, i: usize) {
         let projection = if i < self.max_projections {
-            (0..i + 1).into_iter().collect()
+            (0..i + 1).collect()
         } else {
             Vec::new()
         };
```
5 changes: 1 addition & 4 deletions benchmarks/src/config.rs
```diff
@@ -31,10 +31,7 @@ pub struct BenchConfig {
 pub fn bench_config_from_env() -> BenchConfig {
     let path = match env::var(BENCH_CONFIG_PATH_KEY) {
         Ok(v) => v,
-        Err(e) => panic!(
-            "Env {} is required to run benches, err:{}.",
-            BENCH_CONFIG_PATH_KEY, e
-        ),
+        Err(e) => panic!("Env {BENCH_CONFIG_PATH_KEY} is required to run benches, err:{e}."),
     };
 
     let mut toml_buf = String::new();
```
2 changes: 1 addition & 1 deletion benchmarks/src/parquet_bench.rs
```diff
@@ -59,7 +59,7 @@ impl ParquetBench {
 
     pub fn init_for_bench(&mut self, i: usize) {
         let projection = if i < self.max_projections {
-            (0..i + 1).into_iter().collect()
+            (0..i + 1).collect()
         } else {
             Vec::new()
         };
```
4 changes: 2 additions & 2 deletions benchmarks/src/util.rs
```diff
@@ -85,7 +85,7 @@ pub fn projected_schema_by_number(
     max_projections: usize,
 ) -> ProjectedSchema {
     if num_columns < max_projections {
-        let projection = (0..num_columns + 1).into_iter().collect();
+        let projection = (0..num_columns + 1).collect();
 
         ProjectedSchema::new(schema.clone(), Some(projection)).unwrap()
     } else {
@@ -202,7 +202,7 @@ impl<'a> Payload for WritePayload<'a> {
 
     fn encode_size(&self) -> usize {
         let body_size = self.0.len();
-        HEADER_SIZE + body_size as usize
+        HEADER_SIZE + body_size
     }
 
     fn encode_to<B: BufMut>(&self, buf: &mut B) -> Result<()> {
```
2 changes: 1 addition & 1 deletion build.rs
```diff
@@ -18,7 +18,7 @@ fn main() {
         .expect("Convert git branch env to string");
     if !branch.is_empty() {
         *config.git_mut().branch_mut() = false;
-        println!("cargo:rustc-env=VERGEN_GIT_BRANCH={}", branch);
+        println!("cargo:rustc-env=VERGEN_GIT_BRANCH={branch}");
     }
 }
```
6 changes: 3 additions & 3 deletions catalog_impls/src/table_based.rs
```diff
@@ -111,7 +111,7 @@ impl Manager for TableBasedManager {
     }
 
     fn all_catalogs(&self) -> manager::Result<Vec<CatalogRef>> {
-        Ok(self.catalogs.iter().map(|(_, v)| v.clone() as _).collect())
+        Ok(self.catalogs.values().map(|v| v.clone() as _).collect())
     }
 }
 
@@ -895,8 +895,8 @@ impl Schema for SchemaImpl {
             .read()
             .unwrap()
             .tables_by_name
-            .iter()
-            .map(|(_, v)| v.clone())
+            .values()
+            .cloned()
             .collect())
     }
 }
```
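When only a map's values are needed, `values()` (with `.cloned()` for owned copies) states the intent directly instead of destructuring away an unused key; if I recall correctly, clippy's `iter_kv_map` lint (added around clippy 1.66) is what flags the older form. A sketch:

```rust
use std::collections::HashMap;

fn main() {
    let catalogs: HashMap<String, u32> =
        HashMap::from([("a".to_string(), 1), ("b".to_string(), 2)]);

    // Before: iterate entries, then throw the key away.
    let mut before: Vec<u32> = catalogs.iter().map(|(_, v)| *v).collect();
    // After: ask for the values directly (use .cloned() for non-Copy values like Arc).
    let mut after: Vec<u32> = catalogs.values().copied().collect();

    before.sort_unstable();
    after.sort_unstable();
    assert_eq!(before, after);
}
```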
4 changes: 2 additions & 2 deletions catalog_impls/src/volatile.rs
```diff
@@ -68,8 +68,8 @@ impl Manager for ManagerImpl {
     fn all_catalogs(&self) -> manager::Result<Vec<CatalogRef>> {
         Ok(self
             .catalogs
-            .iter()
-            .map(|(_, v)| v.clone() as CatalogRef)
+            .values()
+            .map(|v| v.clone() as CatalogRef)
             .collect())
     }
 }
```