feat: implement opentsdb query (apache#1453)
## Rationale
The OpenTSDB write protocol is already supported; this PR implements the query protocol.

## Detailed Changes
- Convert OpenTSDB query requests into DataFusion logical plans
- Convert query results from the RecordBatch format into the OpenTSDB query response format (the request/response shapes are sketched below)
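
For orientation, the request and response bodies exercised by the integration tests below have roughly the following shape. This is a minimal serde sketch of those JSON payloads, not the PR's internal types; the struct and field names are illustrative only, and it assumes `serde`/`serde_json` with the derive feature.

```rust
use std::collections::{BTreeMap, HashMap};

use serde::{Deserialize, Serialize};

/// Body posted to `/opentsdb/api/query` (see the integration tests below).
#[derive(Debug, Deserialize)]
struct QueryRequest {
    /// Start/end of the queried range, as millisecond timestamps.
    start: i64,
    end: i64,
    queries: Vec<SubQuery>,
}

#[derive(Debug, Deserialize)]
struct SubQuery {
    /// e.g. "none" or "sum".
    aggregator: String,
    /// Metric name, mapped to a table.
    metric: String,
    /// Tag filters; an empty map matches all series of the metric.
    #[serde(default)]
    tags: HashMap<String, String>,
}

/// One element of the JSON array returned by the query endpoint.
#[allow(dead_code)]
#[derive(Debug, Serialize)]
struct SeriesResponse {
    metric: String,
    tags: HashMap<String, String>,
    #[serde(rename = "aggregatedTags")]
    aggregated_tags: Vec<String>,
    /// Millisecond timestamp (as a JSON object key) -> value.
    dps: BTreeMap<String, f64>,
}

fn main() {
    // First request from the integration tests below.
    let body = r#"{"start":1439827200000,"end":1439827620000,
                   "queries":[{"aggregator":"none","metric":"opentsdb_table1","tags":{}}]}"#;
    let req: QueryRequest = serde_json::from_str(body).expect("valid request JSON");
    println!("{req:?}");
}
```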

## Test Plan
- Existing tests
- Add new unit tests and integration tests

---------

Co-authored-by: jiacai2050 <[email protected]>
baojinri and jiacai2050 authored Mar 29, 2024
1 parent 9d64fc1 commit a1db882
Showing 16 changed files with 1,307 additions and 15 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

106 changes: 106 additions & 0 deletions integration_tests/cases/env/local/opentsdb/basic.result
@@ -0,0 +1,106 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
--
DROP TABLE IF EXISTS `opentsdb_table1`;

affected_rows: 0

CREATE TABLE `opentsdb_table1` (
`time` timestamp NOT NULL,
`level_description` string TAG,
`location` string TAG,
`value` double,
timestamp KEY (time)) ENGINE = Analytic WITH (
enable_ttl = 'false'
);

affected_rows: 0

-- Insert Records:
-- ("2015-08-18T00:00:00Z", "between 6 and 9 feet", "coyote_creek", 8.12),
-- ("2015-08-18T00:00:00Z", "below 3 feet", "santa_monica", 2.064),
-- ("2015-08-18T00:06:00Z", "between 6 and 9 feet", "coyote_creek", 8.005),
-- ("2015-08-18T00:06:00Z", "below 3 feet", "santa_monica", 2.116),
-- ("2015-08-18T00:12:00Z", "between 6 and 9 feet", "coyote_creek", 7.887),
-- ("2015-08-18T00:12:00Z", "below 3 feet", "santa_monica", 2.028);
INSERT INTO opentsdb_table1(time, level_description, location, value)
VALUES
(1439827200000, "between 6 and 9 feet", "coyote_creek", 8.12),
(1439827200000, "below 3 feet", "santa_monica", 2.064),
(1439827560000, "between 6 and 9 feet", "coyote_creek", 8.005),
(1439827560000, "below 3 feet", "santa_monica", 2.116),
(1439827620000, "between 6 and 9 feet", "coyote_creek", 7.887),
(1439827620000, "below 3 feet", "santa_monica", 2.028);

affected_rows: 6

-- SQLNESS ARG protocol=opentsdb
{
"start": 1439827200000,
"end": 1439827620000,
"queries": [
{
"aggregator": "none",
"metric": "opentsdb_table1",
"tags": {}
}
]
}
;

[{"metric":"opentsdb_table1","tags":{"level_description":"below 3 feet","location":"santa_monica"},"aggregatedTags":[],"dps":{"1439827200000":2.064,"1439827560000":2.116,"1439827620000":2.028}},{"metric":"opentsdb_table1","tags":{"level_description":"between 6 and 9 feet","location":"coyote_creek"},"aggregatedTags":[],"dps":{"1439827200000":8.12,"1439827560000":8.005,"1439827620000":7.887}}]

-- SQLNESS ARG protocol=opentsdb
{
"start": 1439827200000,
"end": 1439827620000,
"queries": [
{
"aggregator": "none",
"metric": "opentsdb_table1",
"tags": {
"location": "coyote_creek"
}
}
]
}
;

[{"metric":"opentsdb_table1","tags":{"level_description":"between 6 and 9 feet","location":"coyote_creek"},"aggregatedTags":[],"dps":{"1439827200000":8.12,"1439827560000":8.005,"1439827620000":7.887}}]

-- SQLNESS ARG protocol=opentsdb
{
"start": 1439827200000,
"end": 1439827620000,
"queries": [
{
"aggregator": "sum",
"metric": "opentsdb_table1",
"tags": {
}
}
]
}
;

[{"metric":"opentsdb_table1","tags":{},"aggregatedTags":[],"dps":{"1439827200000":10.184,"1439827560000":10.121,"1439827620000":9.915}}]

DROP TABLE IF EXISTS `opentsdb_table1`;

affected_rows: 0
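
The "sum" response above merges the two series point-by-point (8.12 + 2.064 = 10.184, and so on). A minimal standalone sketch of that merge, for illustration only — in the PR the aggregation is expressed as part of the DataFusion logical plan rather than hand-rolled like this:

```rust
use std::collections::BTreeMap;

/// Sum per-series datapoints into a single series keyed by timestamp (ms).
fn sum_series(series: &[BTreeMap<i64, f64>]) -> BTreeMap<i64, f64> {
    let mut out = BTreeMap::new();
    for dps in series {
        for (&ts, &v) in dps {
            *out.entry(ts).or_insert(0.0) += v;
        }
    }
    out
}

fn main() {
    let coyote_creek: BTreeMap<i64, f64> =
        BTreeMap::from([(1439827200000, 8.12), (1439827560000, 8.005), (1439827620000, 7.887)]);
    let santa_monica: BTreeMap<i64, f64> =
        BTreeMap::from([(1439827200000, 2.064), (1439827560000, 2.116), (1439827620000, 2.028)]);
    // Matches the dps of the "sum" response above: 10.184, 10.121, 9.915
    // (up to floating-point rounding).
    println!("{:?}", sum_series(&[coyote_creek, santa_monica]));
}
```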

93 changes: 93 additions & 0 deletions integration_tests/cases/env/local/opentsdb/basic.sql
@@ -0,0 +1,93 @@
--
-- Licensed to the Apache Software Foundation (ASF) under one
-- or more contributor license agreements. See the NOTICE file
-- distributed with this work for additional information
-- regarding copyright ownership. The ASF licenses this file
-- to you under the Apache License, Version 2.0 (the
-- "License"); you may not use this file except in compliance
-- with the License. You may obtain a copy of the License at
--
-- http://www.apache.org/licenses/LICENSE-2.0
--
-- Unless required by applicable law or agreed to in writing,
-- software distributed under the License is distributed on an
-- "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-- KIND, either express or implied. See the License for the
-- specific language governing permissions and limitations
-- under the License.
--

DROP TABLE IF EXISTS `opentsdb_table1`;

CREATE TABLE `opentsdb_table1` (
`time` timestamp NOT NULL,
`level_description` string TAG,
`location` string TAG,
`value` double,
timestamp KEY (time)) ENGINE = Analytic WITH (
enable_ttl = 'false'
);

-- Insert Records:
-- ("2015-08-18T00:00:00Z", "between 6 and 9 feet", "coyote_creek", 8.12),
-- ("2015-08-18T00:00:00Z", "below 3 feet", "santa_monica", 2.064),
-- ("2015-08-18T00:06:00Z", "between 6 and 9 feet", "coyote_creek", 8.005),
-- ("2015-08-18T00:06:00Z", "below 3 feet", "santa_monica", 2.116),
-- ("2015-08-18T00:12:00Z", "between 6 and 9 feet", "coyote_creek", 7.887),
-- ("2015-08-18T00:12:00Z", "below 3 feet", "santa_monica", 2.028);
INSERT INTO opentsdb_table1(time, level_description, location, value)
VALUES
(1439827200000, "between 6 and 9 feet", "coyote_creek", 8.12),
(1439827200000, "below 3 feet", "santa_monica", 2.064),
(1439827560000, "between 6 and 9 feet", "coyote_creek", 8.005),
(1439827560000, "below 3 feet", "santa_monica", 2.116),
(1439827620000, "between 6 and 9 feet", "coyote_creek", 7.887),
(1439827620000, "below 3 feet", "santa_monica", 2.028);


-- SQLNESS ARG protocol=opentsdb
{
"start": 1439827200000,
"end": 1439827620000,
"queries": [
{
"aggregator": "none",
"metric": "opentsdb_table1",
"tags": {}
}
]
}
;

-- SQLNESS ARG protocol=opentsdb
{
"start": 1439827200000,
"end": 1439827620000,
"queries": [
{
"aggregator": "none",
"metric": "opentsdb_table1",
"tags": {
"location": "coyote_creek"
}
}
]
}
;

-- SQLNESS ARG protocol=opentsdb
{
"start": 1439827200000,
"end": 1439827620000,
"queries": [
{
"aggregator": "sum",
"metric": "opentsdb_table1",
"tags": {
}
}
]
}
;

DROP TABLE IF EXISTS `opentsdb_table1`;
28 changes: 28 additions & 0 deletions integration_tests/src/database.rs
@@ -243,6 +243,7 @@ pub struct HoraeDB<T> {
enum Protocol {
Sql,
InfluxQL,
OpenTSDB,
}

impl TryFrom<&str> for Protocol {
@@ -252,6 +253,7 @@ impl TryFrom<&str> for Protocol {
let protocol = match s {
"influxql" => Protocol::InfluxQL,
"sql" => Protocol::Sql,
"opentsdb" => Protocol::OpenTSDB,
_ => return Err(format!("unknown protocol:{s}")),
};

@@ -312,6 +314,10 @@ impl<T: Send + Sync> Database for HoraeDB<T> {
let http_client = self.http_client.clone();
Self::execute_influxql(query, http_client, context.context).await
}
Protocol::OpenTSDB => {
let http_client = self.http_client.clone();
Self::execute_opentsdb(query, http_client, context.context).await
}
}
}
}
@@ -383,6 +389,28 @@ impl<T> HoraeDB<T> {
Box::new(query_res)
}

async fn execute_opentsdb(
query: String,
http_client: HttpClient,
_params: HashMap<String, String>,
) -> Box<dyn Display> {
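// sqlness cases terminate each query with a trailing ';', which is not part of the JSON body.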
let query = query.trim().trim_end_matches(';');
let url = format!("http://{}/opentsdb/api/query", http_client.endpoint);
let resp = http_client
.client
.post(url)
.header("content-type", "application/json")
.body(query.to_string())
.send()
.await
.unwrap();
let query_res = match resp.text().await {
Ok(text) => text,
Err(e) => format!("Failed to do opentsdb query, err:{e:?}"),
};
Box::new(query_res)
}

async fn execute_sql(query: String, client: Arc<dyn DbClient>) -> Box<dyn Display> {
let query_ctx = RpcContext {
database: Some("public".to_string()),
41 changes: 41 additions & 0 deletions src/common_types/src/tests.rs
@@ -241,6 +241,47 @@ pub fn build_schema_for_cpu() -> Schema {
builder.primary_key_indexes(vec![0, 1]).build().unwrap()
}

/// Build a schema for testing:
/// (tsid(uint64), timestamp(timestamp), tag1(string), tag2(string), value(double))
pub fn build_schema_for_metric() -> Schema {
let builder = schema::Builder::new()
.auto_increment_column_id(true)
.add_key_column(
column_schema::Builder::new(TSID_COLUMN.to_string(), DatumKind::UInt64)
.build()
.unwrap(),
)
.unwrap()
.add_key_column(
column_schema::Builder::new("timestamp".to_string(), DatumKind::Timestamp)
.build()
.unwrap(),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("tag1".to_string(), DatumKind::String)
.is_tag(true)
.build()
.unwrap(),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("tag2".to_string(), DatumKind::String)
.is_tag(true)
.build()
.unwrap(),
)
.unwrap()
.add_normal_column(
column_schema::Builder::new("value".to_string(), DatumKind::Double)
.build()
.unwrap(),
)
.unwrap();

builder.primary_key_indexes(vec![0, 1]).build().unwrap()
}

#[allow(clippy::too_many_arguments)]
pub fn build_row_for_dictionary(
key1: &[u8],