Skip to content

Commit

Permalink
Merge branch 'main' into refactor_fb_codec
Browse files Browse the repository at this point in the history
  • Loading branch information
zealchen committed Aug 8, 2024
2 parents eea24f8 + 483eb98 commit 6edff22
Show file tree
Hide file tree
Showing 21 changed files with 940 additions and 501 deletions.
83 changes: 83 additions & 0 deletions integration_tests/cases/env/local/dml/insert_into_select.result
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
--
DROP TABLE IF EXISTS `insert_into_select_table1`;

affected_rows: 0

CREATE TABLE `insert_into_select_table1` (
`timestamp` timestamp NOT NULL,
`value` int,
`name` string,
timestamp KEY (timestamp)) ENGINE=Analytic
WITH(
enable_ttl='false'
);

affected_rows: 0

INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
VALUES
(1, 100, "s1"),
(2, 200, "s2"),
(3, 300, "s3"),
(4, 400, "s4"),
(5, 500, "s5");

affected_rows: 5

DROP TABLE IF EXISTS `insert_into_select_table2`;

affected_rows: 0

CREATE TABLE `insert_into_select_table2` (
`timestamp` timestamp NOT NULL,
`value` int,
`name` string NULL,
timestamp KEY (timestamp)) ENGINE=Analytic
WITH(
enable_ttl='false'
);

affected_rows: 0

INSERT INTO `insert_into_select_table2` (`timestamp`, `value`)
SELECT `timestamp`, `value`
FROM `insert_into_select_table1`;

affected_rows: 5

SELECT `timestamp`, `value`, `name`
FROM `insert_into_select_table2`;

timestamp,value,name,
Timestamp(1),Int32(100),String(""),
Timestamp(2),Int32(200),String(""),
Timestamp(3),Int32(300),String(""),
Timestamp(4),Int32(400),String(""),
Timestamp(5),Int32(500),String(""),


DROP TABLE `insert_into_select_table1`;

affected_rows: 0

DROP TABLE `insert_into_select_table2`;

affected_rows: 0

59 changes: 59 additions & 0 deletions integration_tests/cases/env/local/dml/insert_into_select.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
--

DROP TABLE IF EXISTS `insert_into_select_table1`;

CREATE TABLE `insert_into_select_table1` (
`timestamp` timestamp NOT NULL,
`value` int,
`name` string,
timestamp KEY (timestamp)) ENGINE=Analytic
WITH(
enable_ttl='false'
);

INSERT INTO `insert_into_select_table1` (`timestamp`, `value`, `name`)
VALUES
(1, 100, "s1"),
(2, 200, "s2"),
(3, 300, "s3"),
(4, 400, "s4"),
(5, 500, "s5");

DROP TABLE IF EXISTS `insert_into_select_table2`;

CREATE TABLE `insert_into_select_table2` (
`timestamp` timestamp NOT NULL,
`value` int,
`name` string NULL,
timestamp KEY (timestamp)) ENGINE=Analytic
WITH(
enable_ttl='false'
);

INSERT INTO `insert_into_select_table2` (`timestamp`, `value`)
SELECT `timestamp`, `value`
FROM `insert_into_select_table1`;

SELECT `timestamp`, `value`, `name`
FROM `insert_into_select_table2`;

DROP TABLE `insert_into_select_table1`;

DROP TABLE `insert_into_select_table2`;
4 changes: 1 addition & 3 deletions src/analytic_engine/src/instance/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,9 +218,7 @@ pub enum Error {
CreateOpenFailedTable { table: String, backtrace: Backtrace },

#[snafu(display("Failed to open manifest, err:{}", source))]
OpenManifest {
source: crate::manifest::details::Error,
},
OpenManifest { source: crate::manifest::Error },

#[snafu(display("Failed to find table, msg:{}.\nBacktrace:\n{}", msg, backtrace))]
TableNotExist { msg: String, backtrace: Backtrace },
Expand Down
130 changes: 19 additions & 111 deletions src/analytic_engine/src/manifest/details.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ use std::{
};

use async_trait::async_trait;
use generic_error::{BoxError, GenericError, GenericResult};
use generic_error::{BoxError, GenericResult};
use horaedbproto::manifest as manifest_pb;
use lazy_static::lazy_static;
use logger::{debug, info, warn};
use macros::define_result;
use object_store::{ObjectStoreRef, Path};
use parquet::data_type::AsBytes;
use prometheus::{exponential_buckets, register_histogram, Histogram};
use prost::Message;
use serde::{Deserialize, Serialize};
use snafu::{Backtrace, ResultExt, Snafu};
use table_engine::table::TableId;
use time_ext::ReadableDuration;
use tokio::sync::Mutex;
Expand All @@ -57,104 +55,12 @@ use crate::{
MetaEdit, MetaEditRequest, MetaUpdate, MetaUpdateDecoder, MetaUpdatePayload, Snapshot,
},
meta_snapshot::{MetaSnapshot, MetaSnapshotBuilder},
LoadRequest, Manifest, SnapshotRequest,
Error, LoadRequest, Manifest, Result, SnapshotRequest,
},
space::SpaceId,
table::data::{TableDataRef, TableShardInfo},
};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub(crate)))]
pub enum Error {
#[snafu(display(
"Failed to encode payloads, wal_location:{:?}, err:{}",
wal_location,
source
))]
EncodePayloads {
wal_location: WalLocation,
source: wal::manager::Error,
},

#[snafu(display("Failed to write update to wal, err:{}", source))]
WriteWal { source: wal::manager::Error },

#[snafu(display("Failed to read wal, err:{}", source))]
ReadWal { source: wal::manager::Error },

#[snafu(display("Failed to read log entry, err:{}", source))]
ReadEntry { source: wal::manager::Error },

#[snafu(display("Failed to apply table meta update, err:{}", source))]
ApplyUpdate {
source: crate::manifest::meta_snapshot::Error,
},

#[snafu(display("Failed to clean wal, err:{}", source))]
CleanWal { source: wal::manager::Error },

#[snafu(display(
"Failed to store snapshot, err:{}.\nBacktrace:\n{:?}",
source,
backtrace
))]
StoreSnapshot {
source: object_store::ObjectStoreError,
backtrace: Backtrace,
},

#[snafu(display(
"Failed to fetch snapshot, err:{}.\nBacktrace:\n{:?}",
source,
backtrace
))]
FetchSnapshot {
source: object_store::ObjectStoreError,
backtrace: Backtrace,
},

#[snafu(display(
"Failed to decode snapshot, err:{}.\nBacktrace:\n{:?}",
source,
backtrace
))]
DecodeSnapshot {
source: prost::DecodeError,
backtrace: Backtrace,
},

#[snafu(display("Failed to build snapshot, msg:{}.\nBacktrace:\n{:?}", msg, backtrace))]
BuildSnapshotNoCause { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to build snapshot, msg:{}, err:{}", msg, source))]
BuildSnapshotWithCause { msg: String, source: GenericError },

#[snafu(display(
"Failed to apply edit to table, msg:{}.\nBacktrace:\n{:?}",
msg,
backtrace
))]
ApplyUpdateToTableNoCause { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to apply edit to table, msg:{}, err:{}", msg, source))]
ApplyUpdateToTableWithCause { msg: String, source: GenericError },

#[snafu(display(
"Failed to apply snapshot to table, msg:{}.\nBacktrace:\n{:?}",
msg,
backtrace
))]
ApplySnapshotToTableNoCause { msg: String, backtrace: Backtrace },

#[snafu(display("Failed to apply snapshot to table, msg:{}, err:{}", msg, source))]
ApplySnapshotToTableWithCause { msg: String, source: GenericError },

#[snafu(display("Failed to load snapshot, err:{}", source))]
LoadSnapshot { source: GenericError },
}

define_result!(Error);

lazy_static! {
static ref RECOVER_TABLE_META_FROM_SNAPSHOT_DURATION: Histogram = register_histogram!(
"recover_table_meta_from_snapshot_duration",
Expand Down Expand Up @@ -197,7 +103,7 @@ impl MetaUpdateLogEntryIterator for MetaUpdateReaderImpl {
.iter
.next_log_entries(decoder, |_| true, buffer)
.await
.context(ReadEntry)?;
.map_err(anyhow::Error::new)?;
}

match self.buffer.pop_front() {
Expand Down Expand Up @@ -277,7 +183,7 @@ where
latest_seq = seq;
manifest_data_builder
.apply_update(update)
.context(ApplyUpdate)?;
.map_err(anyhow::Error::new)?;
}
Ok(Snapshot {
end_seq: latest_seq,
Expand All @@ -302,7 +208,7 @@ where
latest_seq = seq;
manifest_data_builder
.apply_update(update)
.context(ApplyUpdate)?;
.map_err(anyhow::Error::new)?;
has_logs = true;
}

Expand Down Expand Up @@ -633,7 +539,7 @@ impl MetaUpdateSnapshotStore for ObjectStoreBasedSnapshotStore {
self.store
.put(&self.snapshot_path, payload.into())
.await
.context(StoreSnapshot)?;
.map_err(anyhow::Error::new)?;

Ok(())
}
Expand Down Expand Up @@ -661,15 +567,13 @@ impl MetaUpdateSnapshotStore for ObjectStoreBasedSnapshotStore {
}

let payload = get_res
.context(FetchSnapshot)?
.map_err(anyhow::Error::new)?
.bytes()
.await
.context(FetchSnapshot)?;
.map_err(anyhow::Error::new)?;
let snapshot_pb =
manifest_pb::Snapshot::decode(payload.as_bytes()).context(DecodeSnapshot)?;
let snapshot = Snapshot::try_from(snapshot_pb)
.box_err()
.context(LoadSnapshot)?;
manifest_pb::Snapshot::decode(payload.as_bytes()).map_err(anyhow::Error::new)?;
let snapshot = Snapshot::try_from(snapshot_pb).map_err(anyhow::Error::new)?;

Ok(Some(snapshot))
}
Expand Down Expand Up @@ -702,7 +606,7 @@ impl MetaUpdateLogStore for WalBasedLogStore {
.wal_manager
.read_batch(&ctx, &read_req)
.await
.context(ReadWal)?;
.map_err(anyhow::Error::new)?;

Ok(MetaUpdateReaderImpl {
iter,
Expand All @@ -714,8 +618,12 @@ impl MetaUpdateLogStore for WalBasedLogStore {
async fn append(&self, meta_update: MetaUpdate) -> Result<SequenceNumber> {
let payload = MetaUpdatePayload::from(meta_update);
let log_batch_encoder = LogBatchEncoder::create(self.location);
let log_batch = log_batch_encoder.encode(&payload).context(EncodePayloads {
wal_location: self.location,
let log_batch = log_batch_encoder.encode(&payload).map_err(|e| {
anyhow::anyhow!(
"Failed to encode payloads, wal_location:{:?}, err:{}",
self.location,
e
)
})?;

let write_ctx = WriteContext {
Expand All @@ -725,14 +633,14 @@ impl MetaUpdateLogStore for WalBasedLogStore {
self.wal_manager
.write(&write_ctx, &log_batch)
.await
.context(WriteWal)
.map_err(|e| Error::from(anyhow::Error::new(e)))
}

async fn delete_up_to(&self, inclusive_end: SequenceNumber) -> Result<()> {
self.wal_manager
.mark_delete_entries_up_to(self.location, inclusive_end)
.await
.context(CleanWal)
.map_err(|e| Error::from(anyhow::Error::new(e)))
}
}

Expand Down
Loading

0 comments on commit 6edff22

Please sign in to comment.