Skip to content

Commit

Permalink
[#1736] feat(postgresql): Support PostgreSQL index.
Browse files Browse the repository at this point in the history
  • Loading branch information
Clearvive authored and Clearvive committed Jan 31, 2024
1 parent 45c0f57 commit 17131ec
Show file tree
Hide file tree
Showing 4 changed files with 131 additions and 20 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
/*
* Copyright 2024 Datastrato Pvt Ltd.
* This software is licensed under the Apache License version 2.
*/
package com.datastrato.gravitino.catalog.jdbc.bean;

import com.datastrato.gravitino.rel.indexes.Index;
import java.util.Objects;

public class JdbcIndexBean {

private final Index.IndexType indexType;

private final String colName;

private final String name;

private final int order;

public JdbcIndexBean(Index.IndexType indexType, String colName, String name, int order) {
this.indexType = indexType;
this.colName = colName;
this.name = name;
this.order = order;
}

public Index.IndexType getIndexType() {
return indexType;
}

public String getColName() {
return colName;
}

public String getName() {
return name;
}

public int getOrder() {
return order;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
JdbcIndexBean that = (JdbcIndexBean) o;
return order == that.order
&& indexType == that.indexType
&& Objects.equals(colName, that.colName)
&& Objects.equals(name, that.name);
}

@Override
public int hashCode() {
return Objects.hash(indexType, colName, name, order);
}

@Override
public String toString() {
return "JdbcIndexBean{"
+ "indexType="
+ indexType
+ ", colName='"
+ colName
+ '\''
+ ", name='"
+ name
+ '\''
+ ", order="
+ order
+ '}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.datastrato.gravitino.catalog.jdbc.JdbcColumn;
import com.datastrato.gravitino.catalog.jdbc.JdbcTable;
import com.datastrato.gravitino.catalog.jdbc.bean.JdbcIndexBean;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcExceptionConverter;
import com.datastrato.gravitino.catalog.jdbc.converter.JdbcTypeConverter;
import com.datastrato.gravitino.catalog.jdbc.utils.JdbcConnectorUtils;
Expand All @@ -17,18 +18,18 @@
import com.datastrato.gravitino.rel.expressions.transforms.Transform;
import com.datastrato.gravitino.rel.indexes.Index;
import com.datastrato.gravitino.rel.indexes.Indexes;
import com.google.common.collect.HashMultimap;
import com.google.common.collect.Lists;
import com.google.common.collect.SetMultimap;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import javax.sql.DataSource;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -246,28 +247,59 @@ protected List<Index> getIndexes(String databaseName, String tableName, Database
List<Index> indexes = new ArrayList<>();

// Get primary key information
SetMultimap<String, String> primaryKeyGroupByName = HashMultimap.create();
ResultSet primaryKeys = getPrimaryKeys(databaseName, tableName, metaData);
List<JdbcIndexBean> jdbcIndexBeans = new ArrayList<>();
while (primaryKeys.next()) {
String columnName = primaryKeys.getString("COLUMN_NAME");
primaryKeyGroupByName.put(primaryKeys.getString("PK_NAME"), columnName);
}
for (String key : primaryKeyGroupByName.keySet()) {
indexes.add(Indexes.primary(key, convertIndexFieldNames(primaryKeyGroupByName.get(key))));
jdbcIndexBeans.add(
new JdbcIndexBean(
Index.IndexType.PRIMARY_KEY,
primaryKeys.getString("COLUMN_NAME"),
primaryKeys.getString("PK_NAME"),
primaryKeys.getInt("KEY_SEQ")));
}

Set<String> primaryKeyNames =
jdbcIndexBeans.stream()
.filter(jdbcIndexBean -> jdbcIndexBean.getIndexType() == Index.IndexType.PRIMARY_KEY)
.map(JdbcIndexBean::getName)
.collect(Collectors.toSet());

// Get unique key information
SetMultimap<String, String> indexGroupByName = HashMultimap.create();
ResultSet indexInfo = getIndexInfo(databaseName, tableName, metaData);
while (indexInfo.next()) {
String indexName = indexInfo.getString("INDEX_NAME");
if (!indexInfo.getBoolean("NON_UNIQUE") && !primaryKeyGroupByName.containsKey(indexName)) {
String columnName = indexInfo.getString("COLUMN_NAME");
indexGroupByName.put(indexName, columnName);
if (!indexInfo.getBoolean("NON_UNIQUE") && !primaryKeyNames.contains(indexName)) {
jdbcIndexBeans.add(
new JdbcIndexBean(
Index.IndexType.UNIQUE_KEY,
indexInfo.getString("COLUMN_NAME"),
indexName,
indexInfo.getInt("ORDINAL_POSITION")));
}
}
for (String key : indexGroupByName.keySet()) {
indexes.add(Indexes.unique(key, convertIndexFieldNames(indexGroupByName.get(key))));

// Assemble into Index
Map<Index.IndexType, List<JdbcIndexBean>> indexBeanGroupByIndexType =
jdbcIndexBeans.stream().collect(Collectors.groupingBy(JdbcIndexBean::getIndexType));

for (Map.Entry<Index.IndexType, List<JdbcIndexBean>> entry :
indexBeanGroupByIndexType.entrySet()) {
// Group by index Name
Map<String, List<JdbcIndexBean>> indexBeanGroupByName =
entry.getValue().stream().collect(Collectors.groupingBy(JdbcIndexBean::getName));
for (Map.Entry<String, List<JdbcIndexBean>> indexEntry : indexBeanGroupByName.entrySet()) {
List<String> colNames =
indexEntry.getValue().stream()
.sorted(Comparator.comparingInt(JdbcIndexBean::getOrder))
.map(JdbcIndexBean::getColName)
.collect(Collectors.toList());
String[][] colStrArrays = convertIndexFieldNames(colNames);
if (entry.getKey() == Index.IndexType.PRIMARY_KEY) {
indexes.add(Indexes.primary(indexEntry.getKey(), colStrArrays));
} else {
indexes.add(Indexes.unique(indexEntry.getKey(), colStrArrays));
}
}
}
return indexes;
}
Expand All @@ -282,7 +314,7 @@ protected ResultSet getPrimaryKeys(
return metaData.getPrimaryKeys(databaseName, null, tableName);
}

protected String[][] convertIndexFieldNames(Set<String> fieldNames) {
protected String[][] convertIndexFieldNames(List<String> fieldNames) {
return fieldNames.stream().map(colName -> new String[] {colName}).toArray(String[][]::new);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -754,6 +754,11 @@ public void testAutoIncrement() {
col4,
col5
};
indexes =
new Index[] {
Indexes.createMysqlPrimaryKey(new String[][] {{"col_1_1"}, {"col_2"}}),
Indexes.unique("u1_key", new String[][] {{"col_2"}, {"col_3"}})
};
assertionsTableInfo(
tableName, table_comment, Arrays.asList(alterColumns), properties, indexes, table);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.ArrayUtils;
Expand Down Expand Up @@ -247,10 +246,11 @@ protected static void assertionsTableInfo(
Assertions.assertEquals(
indexByName.get(table.index()[i].name()).type(), table.index()[i].type());
for (int j = 0; j < table.index()[i].fieldNames().length; j++) {
Set<String> colNames =
Arrays.stream(indexByName.get(table.index()[i].name()).fieldNames()[j])
.collect(Collectors.toSet());
colNames.containsAll(Arrays.asList(table.index()[i].fieldNames()[j]));
for (int k = 0; k < table.index()[i].fieldNames()[j].length; k++) {
Assertions.assertEquals(
indexByName.get(table.index()[i].name()).fieldNames()[j][k],
table.index()[i].fieldNames()[j][k]);
}
}
}
}
Expand Down

0 comments on commit 17131ec

Please sign in to comment.