test: add integration test for compaction offload #1573

Merged
11 commits merged on Oct 30, 2024
65 changes: 65 additions & 0 deletions integration_tests/cases/common/compact/compact.result
@@ -0,0 +1,65 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
--
DROP TABLE IF EXISTS `compact_table1`;

affected_rows: 0

CREATE TABLE `compact_table1` (
`timestamp` timestamp NOT NULL,
`value` double,
`dic` string dictionary,
timestamp KEY (timestamp)) ENGINE=Analytic
WITH(
enable_ttl='false',
update_mode='OVERWRITE'
);

affected_rows: 0

INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
VALUES (1, 100, "d1"), (2, 200, "d2"), (3, 300, "d3");

affected_rows: 3

-- SQLNESS ARG pre_cmd=flush
INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
VALUES (1, 100, "update_d1"), (2, 200, "update_d2"), (3, 300, "update_d3");

affected_rows: 3

-- trigger manual compaction after flushing the memtable
-- SQLNESS ARG pre_cmd=flush
-- SQLNESS ARG pre_cmd=compact
SELECT
*
FROM
`compact_table1`
ORDER BY
`value` ASC;

tsid,timestamp,value,dic,
UInt64(0),Timestamp(1),Double(100.0),String("update_d1"),
UInt64(0),Timestamp(2),Double(200.0),String("update_d2"),
UInt64(0),Timestamp(3),Double(300.0),String("update_d3"),


DROP TABLE `compact_table1`;

affected_rows: 0
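The result above shows only the `update_*` rows: the table uses `update_mode='OVERWRITE'` and both insert batches share the same timestamp keys, so the second batch replaces the first, and the flush-plus-compaction path must preserve that. A toy, self-contained sketch of the overwrite semantics being checked here — a plain map keyed by timestamp, as an illustration of the expected behaviour, not HoraeDB's implementation:

```rust
// Toy model of OVERWRITE update semantics on the timestamp key, showing why
// only the "update_*" rows survive after flush + compaction.
use std::collections::BTreeMap;

fn main() {
    let mut table: BTreeMap<i64, (f64, &str)> = BTreeMap::new();

    // First insert batch.
    for (ts, v, dic) in [(1, 100.0, "d1"), (2, 200.0, "d2"), (3, 300.0, "d3")] {
        table.insert(ts, (v, dic));
    }
    // Second batch with the same timestamps overwrites the first.
    for (ts, v, dic) in [(1, 100.0, "update_d1"), (2, 200.0, "update_d2"), (3, 300.0, "update_d3")] {
        table.insert(ts, (v, dic));
    }

    assert_eq!(table[&1].1, "update_d1");
    println!("{table:?}");
}
```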

52 changes: 52 additions & 0 deletions integration_tests/cases/common/compact/compact.sql
@@ -0,0 +1,52 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
--

DROP TABLE IF EXISTS `compact_table1`;

CREATE TABLE `compact_table1` (
`timestamp` timestamp NOT NULL,
`value` double,
`dic` string dictionary,
timestamp KEY (timestamp)) ENGINE=Analytic
WITH(
enable_ttl='false',
update_mode='OVERWRITE'
);


INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
VALUES (1, 100, "d1"), (2, 200, "d2"), (3, 300, "d3");

-- SQLNESS ARG pre_cmd=flush
INSERT INTO `compact_table1` (`timestamp`, `value`, `dic`)
VALUES (1, 100, "update_d1"), (2, 200, "update_d2"), (3, 300, "update_d3");


-- trigger manual compaction after flushing the memtable
-- SQLNESS ARG pre_cmd=flush
-- SQLNESS ARG pre_cmd=compact
SELECT
*
FROM
`compact_table1`
ORDER BY
`value` ASC;


DROP TABLE `compact_table1`;
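The `-- SQLNESS ARG pre_cmd=...` comments are picked up by the sqlness harness and turned into a command that runs against the server before the next statement executes (see the `Command` handling added in `integration_tests/src/database.rs` below). A minimal, self-contained sketch of how such a line can be parsed into a command name; the prefix handling here is an illustration, not sqlness's actual implementation:

```rust
// Parse "-- SQLNESS ARG pre_cmd=<cmd>" into the command name, if present.
fn parse_pre_cmd(line: &str) -> Option<&str> {
    line.trim()
        .strip_prefix("-- SQLNESS ARG pre_cmd=")
        .map(str::trim)
}

fn main() {
    assert_eq!(parse_pre_cmd("-- SQLNESS ARG pre_cmd=compact"), Some("compact"));
    assert_eq!(parse_pre_cmd("SELECT 1;"), None);
}
```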
5 changes: 5 additions & 0 deletions integration_tests/config/horaedb-cluster-0.toml
@@ -40,6 +40,11 @@ data_dir = "/tmp/horaedb0"
type = "RocksDB"
data_dir = "/tmp/horaedb0"

[analytic.compaction_mode]
compaction_mode = "Offload"
node_picker = "Local"
endpoint = "127.0.0.1:8831"

[cluster_deployment]
mode = "WithMeta"
cmd_channel_buffer_size = 10
3 changes: 3 additions & 0 deletions integration_tests/config/horaedb-cluster-1.toml
@@ -41,6 +41,9 @@ data_dir = "/tmp/horaedb1"
type = "RocksDB"
data_dir = "/tmp/horaedb1"

[analytic.compaction_mode]
compaction_mode = "Local"

[cluster_deployment]
mode = "WithMeta"
cmd_channel_buffer_size = 10
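The two cluster configs exercise both modes: horaedb-cluster-0 offloads compaction (with a `Local` node picker and an explicit endpoint), while horaedb-cluster-1 keeps compaction local. A hedged sketch of how a `[analytic.compaction_mode]` section like these could be modeled as an internally tagged serde enum; the type name, field types, and `toml` usage are assumptions for illustration, not HoraeDB's actual definitions:

```rust
// Sketch only: requires the `serde` (with derive) and `toml` crates.
use serde::Deserialize;

#[derive(Debug, Deserialize)]
#[serde(tag = "compaction_mode")]
enum CompactionMode {
    // compaction_mode = "Local": compaction runs on the node itself.
    Local,
    // compaction_mode = "Offload": compaction is routed to the node chosen by
    // `node_picker`; with node_picker = "Local" the endpoint points back at
    // the same process, which is what the integration test relies on.
    Offload { node_picker: String, endpoint: String },
}

fn main() {
    let offload: CompactionMode = toml::from_str(
        r#"
        compaction_mode = "Offload"
        node_picker = "Local"
        endpoint = "127.0.0.1:8831"
        "#,
    )
    .unwrap();
    println!("{offload:?}");
}
```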
21 changes: 21 additions & 0 deletions integration_tests/src/database.rs
@@ -264,6 +264,7 @@ impl TryFrom<&str> for Protocol {
#[derive(Debug, Clone, Copy)]
enum Command {
Flush,
Compact,
}

impl TryFrom<&str> for Command {
@@ -272,6 +273,7 @@ impl TryFrom<&str> for Command {
fn try_from(s: &str) -> Result<Self, Self::Error> {
let cmd = match s {
"flush" => Self::Flush,
"compact" => Self::Compact,
_ => return Err(format!("Unknown command:{s}")),
};

@@ -305,6 +307,12 @@ impl<T: Send + Sync> Database for HoraeDB<T> {
panic!("Execute flush command failed, err:{e}");
}
}
Command::Compact => {
println!("Compact table...");
if let Err(e) = self.execute_compact().await {
panic!("Execute compact command failed, err:{e}");
}
}
}
}

@@ -363,6 +371,19 @@ impl<T> HoraeDB<T> {
Err(resp.text().await.unwrap_or_else(|e| format!("{e:?}")))
}

async fn execute_compact(&self) -> Result<(), String> {
// TODO(leslie): Improve code reusability. The following code is similar to
// `execute_flush()`.
let url = format!("http://{}/debug/compact_table", self.http_client.endpoint);
let resp = self.http_client.client.post(url).send().await.unwrap();

if resp.status() == StatusCode::OK {
return Ok(());
}

Err(resp.text().await.unwrap_or_else(|e| format!("{e:?}")))
}

async fn execute_influxql(
query: String,
http_client: HttpClient,
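The TODO in `execute_compact` notes that it duplicates `execute_flush`. A hedged sketch of the shared helper such a refactor could introduce, meant to sit in the existing `impl<T> HoraeDB<T>` block rather than stand alone; the helper name is an assumption, while the field accesses and the `compact_table` path mirror the diff above:

```rust
impl<T> HoraeDB<T> {
    // Both debug commands are plain POSTs that succeed with HTTP 200, so the
    // only difference is the path under /debug/.
    async fn execute_debug_cmd(&self, path: &str) -> Result<(), String> {
        let url = format!("http://{}/debug/{path}", self.http_client.endpoint);
        let resp = self.http_client.client.post(url).send().await.unwrap();
        if resp.status() == StatusCode::OK {
            return Ok(());
        }
        Err(resp.text().await.unwrap_or_else(|e| format!("{e:?}")))
    }

    async fn execute_compact(&self) -> Result<(), String> {
        self.execute_debug_cmd("compact_table").await
    }
}
```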
49 changes: 49 additions & 0 deletions src/server/src/http.rs
@@ -247,6 +247,7 @@ impl Service {
.or(self.admin_block())
// debug APIs
.or(self.flush_memtable())
.or(self.compact_table())
.or(self.update_log_level())
.or(self.profile_cpu())
.or(self.profile_heap())
@@ -524,6 +525,54 @@ impl Service {
})
}

// POST /debug/compact_table
fn compact_table(
&self,
) -> impl Filter<Extract = (impl warp::Reply,), Error = warp::Rejection> + Clone {
warp::path!("debug" / "compact_table")
.and(warp::post())
.and(self.with_instance())
.and_then(|instance: InstanceRef| async move {
let get_all_tables = || {
let mut tables = Vec::new();
for catalog in instance
.catalog_manager
.all_catalogs()
.box_err()
.context(Internal)?
{
for schema in catalog.all_schemas().box_err().context(Internal)? {
for table in schema.all_tables().box_err().context(Internal)? {
tables.push(table);
}
}
}
Result::Ok(tables)
};
match get_all_tables() {
Ok(tables) => {
let mut failed_tables = Vec::new();
let mut success_tables = Vec::new();

for table in tables {
let table_name = table.name().to_string();
if let Err(e) = table.compact().await {
error!("compact {} failed, err:{}", &table_name, e);
failed_tables.push(table_name);
} else {
success_tables.push(table_name);
}
}
let mut result = HashMap::new();
result.insert("success", success_tables);
result.insert("failed", failed_tables);
Ok(reply::json(&result))
}
Err(e) => Err(reject::custom(e)),
}
})
}

// GET /metrics
fn metrics(
&self,
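For manual verification, the new endpoint can be exercised directly. A hedged sketch of a standalone client that posts to `/debug/compact_table` and decodes the `success`/`failed` lists produced by the handler above; the host and port, and the use of `reqwest` and `tokio`, are assumptions for illustration:

```rust
// Standalone sketch: trigger compaction of all tables via the debug API and
// print which tables succeeded or failed. Requires `tokio` (rt + macros) and
// `reqwest` with its "json" feature.
use std::collections::HashMap;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let resp = reqwest::Client::new()
        .post("http://127.0.0.1:5440/debug/compact_table")
        .send()
        .await?;

    // The handler replies with {"success": [...], "failed": [...]}.
    let outcome: HashMap<String, Vec<String>> = resp.json().await?;
    println!("compacted ok: {:?}", outcome.get("success"));
    println!("failed:       {:?}", outcome.get("failed"));
    Ok(())
}
```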
6 changes: 3 additions & 3 deletions src/wal/Cargo.toml
@@ -27,15 +27,15 @@ workspace = true
[package.authors]
workspace = true

[package.edition]
workspace = true

[dependencies.rocksdb]
git = "https://github.com/tikv/rust-rocksdb.git"
rev = "f04f4dd8eacc30e67c24bc2529a6d9c6edb85f8f"
features = ["portable"]
optional = true

[package.edition]
workspace = true

[features]
wal-message-queue = ["dep:message_queue"]
wal-table-kv = ["dep:table_kv"]