Skip to content

Commit

Permalink
Merge branch 'main' into DAT-18791
Browse files Browse the repository at this point in the history
  • Loading branch information
KushnirykOleh authored Nov 18, 2024
2 parents ac799c3 + 5da8172 commit b48a45f
Show file tree
Hide file tree
Showing 27 changed files with 603 additions and 58 deletions.
6 changes: 3 additions & 3 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<liquibase.version>4.29.2</liquibase.version>
<liquibase.version>4.30.0</liquibase.version>
<sonar.organization>liquibase</sonar.organization>
<sonar.projectKey>${sonar.organization}_${project.artifactId}</sonar.projectKey>
<sonar.projectName>${project.name}</sonar.projectName>
Expand Down Expand Up @@ -89,7 +89,7 @@
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.34</version>
<version>1.18.36</version>
<scope>provided</scope>
</dependency>
<dependency>
Expand Down Expand Up @@ -214,7 +214,7 @@
<plugin>
<groupId>org.sonarsource.scanner.maven</groupId>
<artifactId>sonar-maven-plugin</artifactId>
<version>4.0.0.4121</version>
<version>5.0.0.4389</version>
</plugin>

<plugin>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
import java.util.ArrayList;
import java.util.List;

import static liquibase.ext.databricks.parser.NamespaceDetailsDatabricks.DATABRICKS_NAMESPACE;

@DatabaseChange(name = "alterCluster", description = "Alter Cluster", priority = PrioritizedService.PRIORITY_DATABASE +500)
public class AlterClusterChangeDatabricks extends AbstractChange {

Expand Down Expand Up @@ -104,4 +106,9 @@ public List<NoneConfig> getClusterBy() {
public void setClusterBy(List<NoneConfig> clusterBy) {
this.clusterBy = clusterBy;
}

@Override
public String getSerializedObjectNamespace() {
return DATABRICKS_NAMESPACE;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import liquibase.database.Database;
import liquibase.exception.ValidationErrors;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.ext.databricks.parser.NamespaceDetailsDatabricks;
import liquibase.servicelocator.PrioritizedService;
import liquibase.statement.core.CreateTableStatement;
import lombok.Setter;
Expand All @@ -15,6 +14,10 @@
@DatabaseChange(name = "createTable", description = "Create Table", priority = PrioritizedService.PRIORITY_DATABASE)
@Setter
public class CreateTableChangeDatabricks extends CreateTableChange {
private static final String DOUBLE_INIT_ERROR = "Double initialization of extended table properties is not allowed. " +
"Please avoid using both EXT createTable attributes and Databricks specific extendedTableProperties element. " +
"Element databricks:extendedTableProperties is preferred way to set databricks specific configurations.";
private static final String PARTITION_CLUSTER_COLLISION_ERROR = "Databricks does not support CLUSTER columns AND PARTITION BY columns, please pick one.";
private String tableFormat;
private String tableLocation;
private String clusterColumns;
Expand All @@ -32,7 +35,16 @@ public ValidationErrors validate(Database database) {
validationErrors.addAll(super.validate(database));

if (partitionColumns != null && clusterColumns != null) {
validationErrors.addError("Databricks does not support CLUSTER columns AND PARTITION BY columns, please pick one. And do not supply the other");
validationErrors.addError(PARTITION_CLUSTER_COLLISION_ERROR);
}
if(extendedTableProperties != null) {
boolean anyPropertyDuplicated = tableFormat != null && extendedTableProperties.getTableFormat() != null
|| tableLocation != null && extendedTableProperties.getTableLocation() != null
|| clusterColumns != null && extendedTableProperties.getClusterColumns() != null
|| partitionColumns != null && extendedTableProperties.getPartitionColumns() !=null;
if(anyPropertyDuplicated) {
validationErrors.addError(DOUBLE_INIT_ERROR);
}
}
return validationErrors;
}
Expand All @@ -58,11 +70,18 @@ protected CreateTableStatement generateCreateTableStatement() {

CreateTableStatementDatabricks ctas = new CreateTableStatementDatabricks(getCatalogName(), getSchemaName(), getTableName());

ctas.setTableFormat(this.getTableFormat());
ctas.setTableLocation(this.getTableLocation());
ctas.setClusterColumns(this.getClusterColumns());
ctas.setPartitionColumns(this.getPartitionColumns());
ctas.setExtendedTableProperties(this.getExtendedTableProperties());
if(this.getExtendedTableProperties() != null) {
ctas.setTableLocation(getExtendedTableProperties().getTableLocation());
ctas.setTableFormat(getExtendedTableProperties().getTableFormat());
ctas.setClusterColumns(getExtendedTableProperties().getClusterColumns());
ctas.setPartitionColumns(getExtendedTableProperties().getPartitionColumns());
ctas.setExtendedTableProperties(this.getExtendedTableProperties());
} else {
ctas.setTableFormat(this.getTableFormat());
ctas.setTableLocation(this.getTableLocation());
ctas.setClusterColumns(this.getClusterColumns());
ctas.setPartitionColumns(this.getPartitionColumns());
}

return ctas;
}
Expand All @@ -71,13 +90,4 @@ protected CreateTableStatement generateCreateTableStatement() {
public ExtendedTableProperties getExtendedTableProperties() {
return extendedTableProperties;
}

@Override
public String getSerializableFieldNamespace(String field) {
if("clusterColumns".equalsIgnoreCase(field)) {
return NamespaceDetailsDatabricks.DATABRICKS_NAMESPACE;
}
return getSerializedObjectNamespace();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,11 @@
@Setter
@Getter
public class ExtendedTableProperties extends AbstractLiquibaseSerializable{
private String tableFormat;
private String tableLocation;
private String tblProperties;
private String clusterColumns;
private String partitionColumns;

@Override
public String getSerializedObjectName() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.Catalog;
import liquibase.structure.core.Schema;
import org.apache.commons.lang3.StringUtils;
import lombok.Setter;

import java.math.BigInteger;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.*;


Expand Down Expand Up @@ -363,4 +363,23 @@ public void setConnection(DatabaseConnection conn) {
super.setConnection(dbConn);
}

@Override
public void checkDatabaseConnection() throws DatabaseException {
DatabricksConnection connection = (DatabricksConnection) getConnection();
try {
String catalogName = getConnectionCatalogName();
String schemaName = getConnectionSchemaName();
ResultSet schemasAlikeUsed = connection.getMetaData().getSchemas(catalogName, schemaName);
while (schemasAlikeUsed.next()) {
if (schemasAlikeUsed.getString(1).equals(schemaName)) {
return;
}
}
throw new DatabaseException(String.format("Please specify existing schema and catalog in connection url. " +
"Current connection points to '%s.%s'", catalogName, schemaName));
} catch (SQLException e) {
Scope.getCurrentScope().getLog(getClass()).info("Error checking database connection", e);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,18 @@
import liquibase.diff.output.changelog.ChangeGeneratorChain;
import liquibase.diff.output.changelog.core.ChangedTableChangeGenerator;
import liquibase.ext.databricks.change.AbstractAlterPropertiesChangeDatabricks;
import liquibase.ext.databricks.change.alterCluster.AlterClusterChangeDatabricks;
import liquibase.ext.databricks.change.alterCluster.ColumnConfig;
import liquibase.ext.databricks.change.alterCluster.NoneConfig;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.Table;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;

import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getAlterTablePropertiesChangeDatabricks;

Expand Down Expand Up @@ -43,8 +50,50 @@ public Change[] fixChanged(DatabaseObject changedObject, ObjectDifferences diffe
System.arraycopy(change, 0, changes, changes.length - change.length, change.length);
}
}
if (difference.getField().equals("clusteringColumns")) {
AlterClusterChangeDatabricks[] change = getAlterClusterChangeDatabricks((Table) changedObject, control, difference);
if (changes == null || changes.length == 0) {
changes = change;
} else {
changes = Arrays.copyOf(changes, changes.length + change.length);
System.arraycopy(change, 0, changes, changes.length - change.length, change.length);
}
}
}
return changes;
}

private AlterClusterChangeDatabricks[] getAlterClusterChangeDatabricks(Table changedObject, DiffOutputControl control, Difference difference) {
AlterClusterChangeDatabricks[] changes = new AlterClusterChangeDatabricks[0];
List<String> referencedValues = difference.getReferenceValue() == null ? null :
Arrays.asList(((String)difference.getReferenceValue()).split(","));
List<String> comparedValues = difference.getComparedValue() == null ? null :
Arrays.asList(((String)difference.getComparedValue()).split(","));
if(!Objects.equals(referencedValues, comparedValues)) {
AlterClusterChangeDatabricks change = new AlterClusterChangeDatabricks();
change.setTableName(changedObject.getName());
if (control.getIncludeCatalog()) {
change.setCatalogName(changedObject.getSchema().getCatalogName());
}
if (control.getIncludeSchema()) {
change.setSchemaName(changedObject.getSchema().getName());
}
if (referencedValues == null) {
NoneConfig noneConfig = new NoneConfig();
noneConfig.setNone("true");
change.setClusterBy(Collections.singletonList(noneConfig));
} else {
List<ColumnConfig> columnConfigList = referencedValues.stream().map(colName -> {
ColumnConfig columnConfig = new ColumnConfig();
columnConfig.setName(colName);
return columnConfig;
}).collect(Collectors.toList());
change.setColumns(columnConfigList);
}
changes = new AlterClusterChangeDatabricks[]{change};
}

return changes;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private static Map<String, String> convertToMapExcludingDeltaParameters(Object r
/**
* Get the extended properties excluding delta parameters
*/
public static String getExtendedProperties(String tblProperties) {
public static String getFilteredTblProperties(String tblProperties) {
Map<String, String> properties = convertToMapExcludingDeltaParameters(tblProperties);
return properties.entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining(","));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
import liquibase.diff.output.changelog.ChangeGeneratorChain;
import liquibase.diff.output.changelog.core.ChangedViewChangeGenerator;
import liquibase.ext.databricks.change.AbstractAlterPropertiesChangeDatabricks;
import liquibase.ext.databricks.change.alterViewProperties.AlterViewPropertiesChangeDatabricks;
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.View;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@
import liquibase.ext.databricks.database.DatabricksDatabase;
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.Table;
import org.apache.commons.lang3.ObjectUtils;

import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getExtendedProperties;
import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getFilteredTblProperties;

public class MissingTableChangeGeneratorDatabricks extends MissingTableChangeGenerator {

Expand All @@ -32,18 +33,21 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr
if (changes == null || changes.length == 0) {
return changes;
}
//so far we intentionally omit tableLocation in generated changelog
ExtendedTableProperties extendedTableProperties = new ExtendedTableProperties(
null,
getExtendedProperties(missingObject.getAttribute("tblProperties", String.class)));
String clusterColumns = missingObject.getAttribute("clusteringColumns", "");
String tblProperties = getFilteredTblProperties(missingObject.getAttribute("tblProperties", String.class));
tblProperties = tblProperties.isEmpty() ? null : tblProperties;
String clusteringColumns = missingObject.getAttribute("clusteringColumns", String.class);
String partitionColumns = missingObject.getAttribute("partitionColumns", String.class);
ExtendedTableProperties extendedTableProperties = null;
//so far we intentionally omit tableLocation and tableFormat in generated changelog
if(ObjectUtils.anyNotNull(clusteringColumns, partitionColumns, tblProperties)) {
extendedTableProperties = new ExtendedTableProperties(null, null, tblProperties, clusteringColumns, partitionColumns);
}

changes[0] = getCreateTableChangeDatabricks(extendedTableProperties, changes, clusterColumns);
changes[0] = getCreateTableChangeDatabricks(extendedTableProperties, changes);
return changes;
}

private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTableProperties extendedTableProperties,
Change[] changes, String clusterColumns) {
private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTableProperties extendedTableProperties, Change[] changes) {
CreateTableChange temp = (CreateTableChange) changes[0];
CreateTableChangeDatabricks createTableChangeDatabricks = new CreateTableChangeDatabricks();
createTableChangeDatabricks.setColumns(temp.getColumns());
Expand All @@ -55,10 +59,7 @@ private CreateTableChangeDatabricks getCreateTableChangeDatabricks(ExtendedTable
createTableChangeDatabricks.setRemarks(temp.getRemarks());
createTableChangeDatabricks.setIfNotExists(temp.getIfNotExists());
createTableChangeDatabricks.setRowDependencies(temp.getRowDependencies());
if (!clusterColumns.isEmpty()) {
createTableChangeDatabricks.setClusterColumns(clusterColumns);
}

//All not null properties should be attached in the CreateTableChangeDatabricks::generateCreateTableStatement
createTableChangeDatabricks.setExtendedTableProperties(extendedTableProperties);
return createTableChangeDatabricks;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
import liquibase.structure.DatabaseObject;
import liquibase.structure.core.View;

import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getExtendedProperties;
import static liquibase.ext.databricks.diff.output.changelog.ChangedTblPropertiesUtil.getFilteredTblProperties;

/**
* Custom implementation of {@link MissingViewChangeGenerator} for Databricks.
Expand All @@ -33,7 +33,7 @@ public Change[] fixMissing(DatabaseObject missingObject, DiffOutputControl contr
if (changes == null || changes.length == 0) {
return changes;
}
changes[0] = getCreateViewChangeDatabricks(getExtendedProperties(missingObject.getAttribute("tblProperties", String.class)), changes);
changes[0] = getCreateViewChangeDatabricks(getFilteredTblProperties(missingObject.getAttribute("tblProperties", String.class)), changes);
return changes;
}

Expand Down
Loading

0 comments on commit b48a45f

Please sign in to comment.