Skip to content

Commit

Permalink
[feature](statistics) Support get row count for JDBC external table. (a…
Browse files Browse the repository at this point in the history
  • Loading branch information
Jibing-Li authored Sep 2, 2024
1 parent 70eeda9 commit 8ff068e
Show file tree
Hide file tree
Showing 2 changed files with 88 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,28 @@

package org.apache.doris.datasource.jdbc;

import org.apache.doris.analysis.StatementBase;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.JdbcResource;
import org.apache.doris.catalog.JdbcTable;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.SchemaCacheValue;
import org.apache.doris.qe.AutoCloseConnectContext;
import org.apache.doris.qe.StmtExecutor;
import org.apache.doris.statistics.AnalysisInfo;
import org.apache.doris.statistics.BaseAnalysisTask;
import org.apache.doris.statistics.JdbcAnalysisTask;
import org.apache.doris.statistics.ResultRow;
import org.apache.doris.statistics.util.StatisticsUtil;
import org.apache.doris.thrift.TTableDescriptor;

import org.apache.commons.text.StringSubstitutor;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
Expand All @@ -38,6 +47,9 @@
public class JdbcExternalTable extends ExternalTable {
private static final Logger LOG = LogManager.getLogger(JdbcExternalTable.class);

public static final String MYSQL_ROW_COUNT_SQL = "SELECT * FROM QUERY"
+ "(\"catalog\"=\"${ctlName}\", \"query\"=\"show table status from `${dbName}` like '${tblName}'\");";

private JdbcTable jdbcTable;

/**
Expand Down Expand Up @@ -98,4 +110,50 @@ public BaseAnalysisTask createAnalysisTask(AnalysisInfo info) {
makeSureInitialized();
return new JdbcAnalysisTask(info);
}

@Override
public long fetchRowCount() {
Map<String, String> params = new HashMap<>();
params.put("ctlName", catalog.getName());
params.put("dbName", dbName);
params.put("tblName", name);
switch (((JdbcExternalCatalog) catalog).getDatabaseTypeName()) {
case JdbcResource.MYSQL:
try (AutoCloseConnectContext r = StatisticsUtil.buildConnectContext()) {
StringSubstitutor stringSubstitutor = new StringSubstitutor(params);
String sql = stringSubstitutor.replace(MYSQL_ROW_COUNT_SQL);
StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, sql);
List<ResultRow> resultRows = stmtExecutor.executeInternalQuery();
if (resultRows == null || resultRows.size() != 1) {
LOG.info("No mysql status found for table {}.{}.{}", catalog.getName(), dbName, name);
return -1;
}
StatementBase parsedStmt = stmtExecutor.getParsedStmt();
if (parsedStmt == null || parsedStmt.getColLabels() == null) {
LOG.info("No column label found for table {}.{}.{}", catalog.getName(), dbName, name);
return -1;
}
ResultRow resultRow = resultRows.get(0);
List<String> colLabels = parsedStmt.getColLabels();
int index = colLabels.indexOf("TABLE_ROWS");
if (index == -1) {
LOG.info("No TABLE_ROWS in status for table {}.{}.{}", catalog.getName(), dbName, name);
return -1;
}
long rows = Long.parseLong(resultRow.get(index));
LOG.info("Get mysql table {}.{}.{} row count {}", catalog.getName(), dbName, name, rows);
return rows;
} catch (Exception e) {
LOG.warn("Failed to fetch mysql row count for table {}.{}.{}. Reason [{}]",
catalog.getName(), dbName, name, e.getMessage());
return -1;
}
case JdbcResource.ORACLE:
case JdbcResource.POSTGRESQL:
case JdbcResource.SQLSERVER:
default:
break;
}
return -1;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

suite("test_mysql_jdbc_statistics", "p0,external,mysql,external_docker,external_docker_mysql") {
String enabled = context.config.otherConfigs.get("enableJdbcTest")
logger.info("enabled " + enabled)
String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
String mysql_port = context.config.otherConfigs.get("mysql_57_port");
String s3_endpoint = getS3Endpoint()
Expand All @@ -35,28 +36,40 @@ suite("test_mysql_jdbc_statistics", "p0,external,mysql,external_docker,external_
);"""

sql """use ${catalog_name}.doris_test"""

def result = sql """show table stats ex_tb0"""
Thread.sleep(1000)
for (int i = 0; i < 20; i++) {
result = sql """show table stats ex_tb0""";
if (result[0][2] != "-1") {
assertEquals("5", result[0][2])
break;
}
logger.info("Table row count not ready yet. Wait 1 second.")
Thread.sleep(1000)
}
sql """analyze table ex_tb0 with sync"""
def result = sql """show column stats ex_tb0 (name)"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] == "name")
assertTrue(result[0][2] == "5.0")
assertTrue(result[0][3] == "5.0")
assertTrue(result[0][4] == "0.0")
assertTrue(result[0][5] == "15.0")
assertTrue(result[0][6] == "3.0")
result = sql """show column stats ex_tb0 (name)"""
assertEquals(result.size(), 1)
assertEquals(result[0][0], "name")
assertEquals(result[0][2], "5.0")
assertEquals(result[0][3], "5.0")
assertEquals(result[0][4], "0.0")
assertEquals(result[0][5], "15.0")
assertEquals(result[0][6], "3.0")
assertEquals(result[0][7], "'abc'")
assertEquals(result[0][8], "'abg'")

result = sql """show column stats ex_tb0 (id)"""
assertTrue(result.size() == 1)
assertTrue(result[0][0] == "id")
assertTrue(result[0][2] == "5.0")
assertTrue(result[0][3] == "5.0")
assertTrue(result[0][4] == "0.0")
assertTrue(result[0][5] == "20.0")
assertTrue(result[0][6] == "4.0")
assertTrue(result[0][7] == "111")
assertTrue(result[0][8] == "115")
assertEquals(result.size(), 1)
assertEquals(result[0][0], "id")
assertEquals(result[0][2], "5.0")
assertEquals(result[0][3], "5.0")
assertEquals(result[0][4], "0.0")
assertEquals(result[0][5], "20.0")
assertEquals(result[0][6], "4.0")
assertEquals(result[0][7], "111")
assertEquals(result[0][8], "115")

sql """drop catalog ${catalog_name}"""
}
Expand Down

0 comments on commit 8ff068e

Please sign in to comment.