diff --git a/.gitignore b/.gitignore index eb83234..c738485 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ # other stuff .idea +target/* */target/* *.iml diff --git a/delta-sharing-core/pom.xml b/delta-sharing-core/pom.xml new file mode 100644 index 0000000..eedbe86 --- /dev/null +++ b/delta-sharing-core/pom.xml @@ -0,0 +1,82 @@ + + + 4.0.0 + + delta-sharing-core + + + com.spark-engine + spark-engine-parent + ${revision} + + + Spark Engine Delta Sharing Core Module + https://www.spark-engine.com + + + + org.projectlombok + lombok + + + com.google.code.findbugs + jsr305 + + + org.apache.spark + spark-sql_${scala.binary.version} + compile + + + io.delta + delta-core_${scala.binary.version} + + + + + com.spark-engine + spark-test + + + + + + + net.alchim31.maven + scala-maven-plugin + + + scala-compile-first + process-resources + + add-source + compile + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + + compile + + compile + + + + + + + + \ No newline at end of file diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/model/Table.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/model/Table.java new file mode 100644 index 0000000..9cc9db8 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/model/Table.java @@ -0,0 +1,11 @@ +package sparkengine.delta.sharing.model; + +import lombok.Value; + +@Value +public class Table { + + TableName tableName; + TableMetadata tableMetadata; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/model/TableMetadata.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/model/TableMetadata.java new file mode 100644 index 0000000..eff6b39 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/model/TableMetadata.java @@ -0,0 +1,13 @@ +package sparkengine.delta.sharing.model; + +import lombok.Value; + +import javax.annotation.Nonnull; + +@Value +public class TableMetadata { + + @Nonnull + String location; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/model/TableName.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/model/TableName.java new file mode 100644 index 0000000..f607627 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/model/TableName.java @@ -0,0 +1,17 @@ +package sparkengine.delta.sharing.model; + +import lombok.Value; + +import javax.annotation.Nonnull; + +@Value +public class TableName { + + @Nonnull + String share; + @Nonnull + String schema; + @Nonnull + String table; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/File.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/File.java new file mode 100644 index 0000000..a035f33 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/File.java @@ -0,0 +1,27 @@ +package sparkengine.delta.sharing.protocol.v1; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Builder; +import lombok.Value; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Map; + +@Value +@Builder(setterPrefix = "with") +@JsonDeserialize(builder = File.FileBuilder.class) +public class File { + + @Nonnull + String url; + @Nonnull + String id; + @Nonnull + Map partitionValues; + @Nonnull + long size; + @Nullable + String stats; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Format.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Format.java new file mode 100644 index 0000000..dac8ee5 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Format.java @@ -0,0 +1,22 @@ +package sparkengine.delta.sharing.protocol.v1; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Builder; +import lombok.Value; + +import javax.annotation.Nonnull; + +@Value +@Builder(setterPrefix = "with") +@JsonDeserialize(builder = Format.FormatBuilder.class) +public class Format { + + enum Provider { + parquet + } + + @Nonnull + @Builder.Default + Provider provider = Provider.parquet; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Metadata.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Metadata.java new file mode 100644 index 0000000..e8db129 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Metadata.java @@ -0,0 +1,29 @@ +package sparkengine.delta.sharing.protocol.v1; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Builder; +import lombok.Value; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.List; + +@Value +@Builder(setterPrefix = "with") +@JsonDeserialize(builder = Metadata.MetadataBuilder.class) +public class Metadata { + + @Nonnull + String id; + @Nullable + String name; + @Nullable + String description; + @Nonnull + Format format; + @Nonnull + String schemaString; + @Nonnull + List partitionColumns; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Protocol.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Protocol.java new file mode 100644 index 0000000..1c85319 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Protocol.java @@ -0,0 +1,17 @@ +package sparkengine.delta.sharing.protocol.v1; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Builder; +import lombok.Value; + +import javax.annotation.Nonnull; + +@Value +@Builder(setterPrefix = "with") +@JsonDeserialize(builder = Protocol.ProtocolBuilder.class) +public class Protocol { + + @Nonnull + int minReaderVersion; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Schema.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Schema.java new file mode 100644 index 0000000..a0f7ad5 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Schema.java @@ -0,0 +1,19 @@ +package sparkengine.delta.sharing.protocol.v1; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Builder; +import lombok.Value; + +import javax.annotation.Nonnull; + +@Value +@Builder(setterPrefix = "with") +@JsonDeserialize(builder = Schema.SchemaBuilder.class) +public class Schema { + + @Nonnull + String share; + @Nonnull + String name; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Schemas.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Schemas.java new file mode 100644 index 0000000..bfc48ad --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Schemas.java @@ -0,0 +1,21 @@ +package sparkengine.delta.sharing.protocol.v1; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Builder; +import lombok.Value; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.List; + +@Value +@Builder(setterPrefix = "with") +@JsonDeserialize(builder = Schemas.SchemasBuilder.class) +public class Schemas { + + @Nonnull + List items; + @Nullable + String nextPageToken; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Share.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Share.java new file mode 100644 index 0000000..755ae6b --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Share.java @@ -0,0 +1,17 @@ +package sparkengine.delta.sharing.protocol.v1; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Builder; +import lombok.Value; + +import javax.annotation.Nonnull; + +@Value +@Builder(setterPrefix = "with") +@JsonDeserialize(builder = Share.ShareBuilder.class) +public class Share { + + @Nonnull + String name; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Shares.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Shares.java new file mode 100644 index 0000000..01a413c --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Shares.java @@ -0,0 +1,21 @@ +package sparkengine.delta.sharing.protocol.v1; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Builder; +import lombok.Value; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.List; + +@Value +@Builder(setterPrefix = "with") +@JsonDeserialize(builder = Shares.SharesBuilder.class) +public class Shares { + + @Nonnull + List items; + @Nullable + String nextPageToken; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Table.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Table.java new file mode 100644 index 0000000..6c32376 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Table.java @@ -0,0 +1,21 @@ +package sparkengine.delta.sharing.protocol.v1; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Builder; +import lombok.Value; + +import javax.annotation.Nonnull; + +@Value +@Builder(setterPrefix = "with") +@JsonDeserialize(builder = Table.TableBuilder.class) +public class Table { + + @Nonnull + String share; + @Nonnull + String schema; + @Nonnull + String name; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/TableQuery.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/TableQuery.java new file mode 100644 index 0000000..15adfbd --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/TableQuery.java @@ -0,0 +1,20 @@ +package sparkengine.delta.sharing.protocol.v1; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Builder; +import lombok.Value; + +import javax.annotation.Nullable; +import java.util.List; + +@Value +@Builder(setterPrefix = "with") +@JsonDeserialize(builder = TableQuery.TableQueryBuilder.class) +public class TableQuery { + + @Nullable + List predicateHints; + @Nullable + Long limitHint; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Tables.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Tables.java new file mode 100644 index 0000000..2f614e9 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Tables.java @@ -0,0 +1,21 @@ +package sparkengine.delta.sharing.protocol.v1; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Builder; +import lombok.Value; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.List; + +@Value +@Builder(setterPrefix = "with") +@JsonDeserialize(builder = Tables.TablesBuilder.class) +public class Tables { + + @Nonnull + List items; + @Nullable + String nextPageToken; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Wrapper.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Wrapper.java new file mode 100644 index 0000000..6ca4ceb --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/protocol/v1/Wrapper.java @@ -0,0 +1,21 @@ +package sparkengine.delta.sharing.protocol.v1; + +import com.fasterxml.jackson.databind.annotation.JsonDeserialize; +import lombok.Builder; +import lombok.Value; + +import javax.annotation.Nullable; + +@Value +@Builder(setterPrefix = "with") +@JsonDeserialize(builder = Wrapper.WrapperBuilder.class) +public class Wrapper { + + @Nullable + Protocol protocol; + @Nullable + Metadata metaData; + @Nullable + File file; + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/utils/TableLayout.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/utils/TableLayout.java new file mode 100644 index 0000000..0d0d390 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/utils/TableLayout.java @@ -0,0 +1,101 @@ +package sparkengine.delta.sharing.utils; + +import com.google.common.hash.Hashing; +import io.delta.standalone.internal.date20210612.PartitionFilterUtils; +import lombok.AllArgsConstructor; +import lombok.SneakyThrows; +import org.apache.hadoop.fs.Path; +import org.apache.spark.sql.delta.DeltaLog; +import scala.Tuple2; +import scala.collection.JavaConverters; +import sparkengine.delta.sharing.protocol.v1.*; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.net.URI; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +@AllArgsConstructor +public class TableLayout { + + @Nonnull + DeltaLog deltaLog; + + public TableLayout update() { + deltaLog.update(false); + return this; + } + + public Stream streamTableMetadataProtocol() { + return streamTableProtocol(false, null); + } + + public Stream streamTableFilesProtocol() { + return streamTableProtocol(true, null); + } + + public long getTableVersion() { + return deltaLog.snapshot().version(); + } + + public Stream streamTableProtocol(boolean includeFiles, + @Nullable List predicateHits) { + var snapshot = deltaLog.snapshot(); + + var protocol = Protocol.builder().withMinReaderVersion(snapshot.protocol().minReaderVersion()).build(); + var metadata = Metadata.builder() + .withId(snapshot.metadata().id()) + .withName(snapshot.metadata().name()) + .withDescription(snapshot.metadata().description()) + .withFormat(Format.builder().build()) + .withSchemaString(snapshot.metadata().schemaString()) + .withPartitionColumns(new ArrayList<>(JavaConverters.asJavaCollection(snapshot.metadata().partitionColumns()))) + .build(); + + var wrappers = Stream.of( + Wrapper.builder().withProtocol(protocol).build(), + Wrapper.builder().withMetaData(metadata).build()); + + if (includeFiles) { + var addFiles = snapshot.allFiles().collectAsList(); + if (snapshot.metadata().partitionColumns().nonEmpty()) { + var newFiles = PartitionFilterUtils.evaluatePredicate( + snapshot.metadata().schemaString(), + snapshot.metadata().partitionColumns(), + JavaConverters.asScalaBuffer(Optional.ofNullable(predicateHits).orElse(List.of())), + JavaConverters.asScalaBuffer(addFiles) + ); + addFiles = JavaConverters.seqAsJavaList(newFiles); + } + + var files = addFiles.stream() + .map(addFile -> File.builder() + .withId(Hashing.md5().hashString(addFile.path(), StandardCharsets.UTF_8).toString()) + .withUrl(absolutePath(deltaLog.dataPath(), addFile.path()).toString()) + .withSize(addFile.size()) + .withPartitionValues(JavaConverters.asJavaCollection(addFile.partitionValues()).stream().collect(Collectors.toMap(Tuple2::_1, Tuple2::_2))) + .withStats(addFile.stats()) + .build()) + .map(file -> Wrapper.builder().withFile(file).build()); + + wrappers = Stream.concat(wrappers, files); + } + + return wrappers; + } + + @SneakyThrows + private Path absolutePath(Path path, String child) { + var p = new Path(new URI(child)); + if (p.isAbsolute()) { + throw new IllegalStateException("table containing absolute paths cannot be shared"); + } + return new Path(path, p); + } + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/utils/TableLayoutLoaderException.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/utils/TableLayoutLoaderException.java new file mode 100644 index 0000000..0b80ae9 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/utils/TableLayoutLoaderException.java @@ -0,0 +1,13 @@ +package sparkengine.delta.sharing.utils; + +public class TableLayoutLoaderException extends Exception { + + public TableLayoutLoaderException(String msg, Throwable t) { + super(msg, t); + } + + public TableLayoutLoaderException(String msg) { + super(msg); + } + +} diff --git a/delta-sharing-core/src/main/java/sparkengine/delta/sharing/utils/TableLayoutStore.java b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/utils/TableLayoutStore.java new file mode 100644 index 0000000..9536e94 --- /dev/null +++ b/delta-sharing-core/src/main/java/sparkengine/delta/sharing/utils/TableLayoutStore.java @@ -0,0 +1,49 @@ +package sparkengine.delta.sharing.utils; + +import com.google.common.cache.Cache; +import com.google.common.cache.CacheBuilder; +import lombok.Builder; +import lombok.Value; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.delta.DeltaLog; +import sparkengine.delta.sharing.model.Table; +import sparkengine.delta.sharing.model.TableMetadata; +import sparkengine.delta.sharing.model.TableName; + +import javax.annotation.Nonnull; +import java.util.concurrent.ExecutionException; + +@Value +@Builder +public class TableLayoutStore { + + public interface TableMetadataFetcher { + TableMetadata call(TableName tableName) throws TableLayoutLoaderException; + } + + @Nonnull + @Builder.Default + Cache sharingTablesCache = CacheBuilder.newBuilder().build(); + @Nonnull + SparkSession sparkSession; + + public TableLayout getTableLayout(TableName tableName, TableMetadataFetcher metadataFetcher) throws TableLayoutLoaderException { + try { + return sharingTablesCache.get(tableName, () -> { + var tableMetadata = metadataFetcher.call(tableName); + return new TableLayout(DeltaLog.forTable(sparkSession, tableMetadata.getLocation())); + }); + } catch (ExecutionException e) { + throw new TableLayoutLoaderException("unable to load table protocol for " + tableName, e); + } + } + + public TableLayout getTableLayout(Table table) throws TableLayoutLoaderException { + try { + return sharingTablesCache.get(table.getTableName(), () -> new TableLayout(DeltaLog.forTable(sparkSession, table.getTableMetadata().getLocation()))); + } catch (ExecutionException e) { + throw new TableLayoutLoaderException("unable to load table protocol for " + table.getTableName(), e); + } + } + +} diff --git a/delta-sharing-core/src/main/scala/io/delta/standalone/internal/date20210612/PartitionFilterUtils.scala b/delta-sharing-core/src/main/scala/io/delta/standalone/internal/date20210612/PartitionFilterUtils.scala new file mode 100644 index 0000000..f4e0ec0 --- /dev/null +++ b/delta-sharing-core/src/main/scala/io/delta/standalone/internal/date20210612/PartitionFilterUtils.scala @@ -0,0 +1,124 @@ +package io.delta.standalone.internal.date20210612 + +import org.apache.spark.sql.Encoders +import org.apache.spark.sql.catalyst.analysis.{UnresolvedAttribute, caseInsensitiveResolution} +import org.apache.spark.sql.catalyst.expressions.{And, Attribute, AttributeReference, BindReferences, Cast, EqualNullSafe, EqualTo, Expression, ExtractValue, GreaterThan, GreaterThanOrEqual, InterpretedPredicate, IsNotNull, IsNull, LessThan, LessThanOrEqual, Literal, Not} +import org.apache.spark.sql.catalyst.{CatalystTypeConverters, InternalRow} +import org.apache.spark.sql.delta.actions.AddFile +import org.apache.spark.sql.execution.SparkSqlParser +import org.apache.spark.sql.types.{DataType, StructField, StructType} +import org.slf4j.LoggerFactory + +import scala.util.Try +import scala.util.control.NonFatal + +object PartitionFilterUtils { + private val logger = LoggerFactory.getLogger(this.getClass) + + private lazy val sqlParser = new SparkSqlParser() + + def evaluatePredicate( + schemaString: String, + partitionColumns: Seq[String], + partitionFilters: Seq[String], + addFiles: Seq[AddFile]): Seq[AddFile] = { + try { + val tableSchema = DataType.fromJson(schemaString).asInstanceOf[StructType] + val partitionSchema = new StructType(partitionColumns.map(c => tableSchema(c)).toArray) + val addSchema = Encoders.product[AddFile].schema + val attrs = + addSchema.map(f => AttributeReference(f.name, f.dataType, f.nullable, f.metadata)()) + val exprs = + rewritePartitionFilters( + partitionSchema, + attrs, + partitionFilters.flatMap { f => + Try(sqlParser.parseExpression(f)).toOption + }.filter(f => isSupportedExpression(f, partitionSchema)) + ) + if (exprs.isEmpty) { + addFiles + } else { + val predicate = new InterpretedPredicate(BindReferences.bindReference(exprs.reduce(And), attrs)) + predicate.initialize(0) + addFiles.filter { addFile => + val converter = CatalystTypeConverters.createToCatalystConverter(addSchema) + predicate.eval(converter(addFile).asInstanceOf[InternalRow]) + } + } + } catch { + case NonFatal(e) => + logger.error(e.getMessage, e) + // Fail to evaluate the filters. Return all files as a fallback. + addFiles + } + } + + private def isSupportedExpression(e: Expression, partitionSchema: StructType): Boolean = { + def isPartitionColumOrConstant(e: Expression): Boolean = { + e match { + case _: Literal => true + case u: UnresolvedAttribute if u.nameParts.size == 1 => + val unquoted = u.name.stripPrefix("`").stripSuffix("`") + partitionSchema.exists(part => caseInsensitiveResolution(unquoted, part.name)) + case c: Cast => isPartitionColumOrConstant(c.child) + case _ => false + } + } + + e match { + case EqualTo(left, right) + if isPartitionColumOrConstant(left) && isPartitionColumOrConstant(right) => + true + case GreaterThan(left, right) + if isPartitionColumOrConstant(left) && isPartitionColumOrConstant(right) => + true + case LessThan(left, right) + if isPartitionColumOrConstant(left) && isPartitionColumOrConstant(right) => + true + case GreaterThanOrEqual(left, right) + if isPartitionColumOrConstant(left) && isPartitionColumOrConstant(right) => + true + case LessThanOrEqual(left, right) + if isPartitionColumOrConstant(left) && isPartitionColumOrConstant(right) => + true + case EqualNullSafe(left, right) + if isPartitionColumOrConstant(left) && isPartitionColumOrConstant(right) => + true + case IsNull(e) if isPartitionColumOrConstant(e) => + true + case IsNotNull(e) if isPartitionColumOrConstant(e) => + true + case Not(e) if isSupportedExpression(e, partitionSchema) => + true + case _ => false + } + } + + private def rewritePartitionFilters( + partitionSchema: StructType, + attrs: Seq[Attribute], + partitionFilters: Seq[Expression]): Seq[Expression] = { + val partitionValuesAttr = attrs.find(_.name == "partitionValues").head + partitionFilters.map(_.transformUp { + case a: Attribute => + // If we have a special column name, e.g. `a.a`, then an UnresolvedAttribute returns + // the column name as '`a.a`' instead of 'a.a', therefore we need to strip the backticks. + val unquoted = a.name.stripPrefix("`").stripSuffix("`") + val partitionCol = partitionSchema.find { field => field.name == unquoted } + partitionCol match { + case Some(StructField(name, dataType, _, _)) => + Cast( + ExtractValue( + partitionValuesAttr, + Literal(name), + org.apache.spark.sql.catalyst.analysis.caseInsensitiveResolution), + dataType) + case None => + // This should not be able to happen, but the case was present in the original code so + // we kept it to be safe. + UnresolvedAttribute(Seq("partitionValues", a.name)) + } + }) + } +} diff --git a/delta-sharing-core/src/main/scala/io/delta/standalone/internal/date20210612/README.md b/delta-sharing-core/src/main/scala/io/delta/standalone/internal/date20210612/README.md new file mode 100644 index 0000000..4037e96 --- /dev/null +++ b/delta-sharing-core/src/main/scala/io/delta/standalone/internal/date20210612/README.md @@ -0,0 +1 @@ +This file is an adaptation from https://raw.githubusercontent.com/delta-io/delta-sharing/main/server/src/main/scala/io/delta/standalone/internal/DeltaSharedTableLoader.scala \ No newline at end of file diff --git a/delta-sharing-core/src/test/java/sparkengine/delta/sharing/utils/TableLayoutTest.java b/delta-sharing-core/src/test/java/sparkengine/delta/sharing/utils/TableLayoutTest.java new file mode 100644 index 0000000..ccc2ab2 --- /dev/null +++ b/delta-sharing-core/src/test/java/sparkengine/delta/sharing/utils/TableLayoutTest.java @@ -0,0 +1,45 @@ +package sparkengine.delta.sharing.utils; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import sparkengine.delta.sharing.model.Table; +import sparkengine.delta.sharing.model.TableMetadata; +import sparkengine.delta.sharing.model.TableName; +import sparkengine.spark.test.SparkSessionManager; + +import java.util.List; + + +class TableLayoutTest extends SparkSessionManager { + + @Test + void testLoadAll() throws TableLayoutLoaderException { + + // given + var table = new Table(new TableName("a", "b", "c"), new TableMetadata("./src/test/resources/delta-table")); + var store = TableLayoutStore.builder().sparkSession(sparkSession).build(); + var loader = store.getTableLayout(table); + + // when + var files = loader.streamTableFilesProtocol(); + + // then + Assertions.assertEquals(16, files.count()); + } + + @Test + void testLoadWithHints() throws TableLayoutLoaderException { + + // given + var table = new Table(new TableName("a", "b", "c"), new TableMetadata("./src/test/resources/delta-table")); + var store = TableLayoutStore.builder().sparkSession(sparkSession).build(); + var loader = store.getTableLayout(table); + + // when + var files = loader.streamTableProtocol(true, List.of("key1='a'")); + + // then + Assertions.assertEquals(8, files.count()); + } + +} \ No newline at end of file diff --git a/delta-sharing-core/src/test/resources/delta-table/_delta_log/00000000000000000000.json b/delta-sharing-core/src/test/resources/delta-table/_delta_log/00000000000000000000.json new file mode 100644 index 0000000..4b80d59 --- /dev/null +++ b/delta-sharing-core/src/test/resources/delta-table/_delta_log/00000000000000000000.json @@ -0,0 +1,14 @@ +{"commitInfo":{"timestamp":1624075384057,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[\"key1\",\"key2\"]"},"isBlindAppend":false,"operationMetrics":{"numFiles":"11","numOutputBytes":"6820","numOutputRows":"11"}}} +{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} +{"metaData":{"id":"0b8c1676-0363-4eea-a4ef-146e3f634b86","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"key1\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"key2\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value1\",\"type\":\"boolean\",\"nullable\":true,\"metadata\":{}},{\"name\":\"value2\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["key1","key2"],"configuration":{},"createdTime":1624075382196}} +{"add":{"path":"key1=a/key2=@/part-00000-b63ac41f-0861-4ac0-acea-3e1d003a5892.c000.snappy.parquet","partitionValues":{"key1":"a","key2":"@"},"size":620,"modificationTime":1624075383097,"dataChange":true}} +{"add":{"path":"key1=a/key2=@/part-00001-d6817b5a-d886-490d-9776-9a3568f8eac9.c000.snappy.parquet","partitionValues":{"key1":"a","key2":"@"},"size":620,"modificationTime":1624075383097,"dataChange":true}} +{"add":{"path":"key1=a/key2=%2525/part-00002-3c28165e-8f73-41f3-bb2b-fdd73d66d00c.c000.snappy.parquet","partitionValues":{"key1":"a","key2":"%"},"size":620,"modificationTime":1624075383097,"dataChange":true}} +{"add":{"path":"key1=a/key2=%2525/part-00003-3f490d2a-2f11-49e3-b8ad-25ecb1b50b8f.c000.snappy.parquet","partitionValues":{"key1":"a","key2":"%"},"size":620,"modificationTime":1624075383097,"dataChange":true}} +{"add":{"path":"key1=a/key2=%2525/part-00004-cbd94e6f-e608-42e8-b6f3-2d392d088b4d.c000.snappy.parquet","partitionValues":{"key1":"a","key2":"%"},"size":620,"modificationTime":1624075383097,"dataChange":true}} +{"add":{"path":"key1=b/key2=@/part-00005-2a330d6c-916c-47b8-86b4-4a8bd86ca26e.c000.snappy.parquet","partitionValues":{"key1":"b","key2":"@"},"size":620,"modificationTime":1624075383097,"dataChange":true}} +{"add":{"path":"key1=b/key2=%2525/part-00006-dc953cc0-705c-4a41-a9c2-cf1a1f1391df.c000.snappy.parquet","partitionValues":{"key1":"b","key2":"%"},"size":620,"modificationTime":1624075383097,"dataChange":true}} +{"add":{"path":"key1=b/key2=%2525/part-00007-b67a04d8-5ce6-4bc1-a977-d74463fd78ec.c000.snappy.parquet","partitionValues":{"key1":"b","key2":"%"},"size":620,"modificationTime":1624075383097,"dataChange":true}} +{"add":{"path":"key1=b/key2=%2525/part-00008-f5d8cc96-60f5-41b3-8497-22aa1c3bedde.c000.snappy.parquet","partitionValues":{"key1":"b","key2":"%"},"size":620,"modificationTime":1624075383097,"dataChange":true}} +{"add":{"path":"key1=c/key2=%2525/part-00009-d17cd0af-67cf-4b5c-85ae-2e50ff7b91c1.c000.snappy.parquet","partitionValues":{"key1":"c","key2":"%"},"size":620,"modificationTime":1624075383097,"dataChange":true}} +{"add":{"path":"key1=c/key2=%2525/part-00010-fe1edb94-d52e-4f78-ac55-d3ab3dffcafa.c000.snappy.parquet","partitionValues":{"key1":"c","key2":"%"},"size":620,"modificationTime":1624075383097,"dataChange":true}} diff --git a/delta-sharing-core/src/test/resources/delta-table/_delta_log/00000000000000000001.json b/delta-sharing-core/src/test/resources/delta-table/_delta_log/00000000000000000001.json new file mode 100644 index 0000000..276f029 --- /dev/null +++ b/delta-sharing-core/src/test/resources/delta-table/_delta_log/00000000000000000001.json @@ -0,0 +1,3 @@ +{"commitInfo":{"timestamp":1624076224879,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[\"key1\",\"key2\"]"},"readVersion":0,"isBlindAppend":true,"operationMetrics":{"numFiles":"2","numOutputBytes":"1240","numOutputRows":"2"}}} +{"add":{"path":"key1=a/key2=%2525/part-00000-f95879c8-8106-49f4-be7d-751348402e41.c000.snappy.parquet","partitionValues":{"key1":"a","key2":"%"},"size":620,"modificationTime":1624076224872,"dataChange":true}} +{"add":{"path":"key1=c/key2=%2525/part-00001-7c06f3af-cc55-497c-8055-6727af286d98.c000.snappy.parquet","partitionValues":{"key1":"c","key2":"%"},"size":620,"modificationTime":1624076224872,"dataChange":true}} diff --git a/delta-sharing-core/src/test/resources/delta-table/_delta_log/00000000000000000002.json b/delta-sharing-core/src/test/resources/delta-table/_delta_log/00000000000000000002.json new file mode 100644 index 0000000..014541d --- /dev/null +++ b/delta-sharing-core/src/test/resources/delta-table/_delta_log/00000000000000000002.json @@ -0,0 +1,4 @@ +{"commitInfo":{"timestamp":1624076578450,"operation":"WRITE","operationParameters":{"mode":"Overwrite","partitionBy":"[]","predicate":"key1 = 'b' AND key2 = '@'"},"readVersion":1,"isBlindAppend":false,"operationMetrics":{"numFiles":"2","numOutputBytes":"1240","numOutputRows":"2"}}} +{"add":{"path":"key1=b/key2=@/part-00000-cdad80c4-7ed3-4280-b6d2-3d967a357d82.c000.snappy.parquet","partitionValues":{"key1":"b","key2":"@"},"size":620,"modificationTime":1624076578180,"dataChange":true}} +{"add":{"path":"key1=b/key2=@/part-00001-ab90884a-5fbd-4eca-a35d-97117e8e9e7f.c000.snappy.parquet","partitionValues":{"key1":"b","key2":"@"},"size":620,"modificationTime":1624076578180,"dataChange":true}} +{"remove":{"path":"key1=b/key2=@/part-00005-2a330d6c-916c-47b8-86b4-4a8bd86ca26e.c000.snappy.parquet","deletionTimestamp":1624076578449,"dataChange":true,"extendedFileMetadata":true,"partitionValues":{"key1":"b","key2":"@"},"size":620}} diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/.part-00000-f95879c8-8106-49f4-be7d-751348402e41.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/.part-00000-f95879c8-8106-49f4-be7d-751348402e41.c000.snappy.parquet.crc new file mode 100644 index 0000000..dc73694 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/.part-00000-f95879c8-8106-49f4-be7d-751348402e41.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/.part-00002-3c28165e-8f73-41f3-bb2b-fdd73d66d00c.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/.part-00002-3c28165e-8f73-41f3-bb2b-fdd73d66d00c.c000.snappy.parquet.crc new file mode 100644 index 0000000..7a0d2d3 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/.part-00002-3c28165e-8f73-41f3-bb2b-fdd73d66d00c.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/.part-00003-3f490d2a-2f11-49e3-b8ad-25ecb1b50b8f.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/.part-00003-3f490d2a-2f11-49e3-b8ad-25ecb1b50b8f.c000.snappy.parquet.crc new file mode 100644 index 0000000..bca2cac Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/.part-00003-3f490d2a-2f11-49e3-b8ad-25ecb1b50b8f.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/.part-00004-cbd94e6f-e608-42e8-b6f3-2d392d088b4d.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/.part-00004-cbd94e6f-e608-42e8-b6f3-2d392d088b4d.c000.snappy.parquet.crc new file mode 100644 index 0000000..bca2cac Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/.part-00004-cbd94e6f-e608-42e8-b6f3-2d392d088b4d.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/part-00000-f95879c8-8106-49f4-be7d-751348402e41.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/part-00000-f95879c8-8106-49f4-be7d-751348402e41.c000.snappy.parquet new file mode 100644 index 0000000..1d33e17 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/part-00000-f95879c8-8106-49f4-be7d-751348402e41.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/part-00002-3c28165e-8f73-41f3-bb2b-fdd73d66d00c.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/part-00002-3c28165e-8f73-41f3-bb2b-fdd73d66d00c.c000.snappy.parquet new file mode 100644 index 0000000..7c0f265 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/part-00002-3c28165e-8f73-41f3-bb2b-fdd73d66d00c.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/part-00003-3f490d2a-2f11-49e3-b8ad-25ecb1b50b8f.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/part-00003-3f490d2a-2f11-49e3-b8ad-25ecb1b50b8f.c000.snappy.parquet new file mode 100644 index 0000000..1742428 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/part-00003-3f490d2a-2f11-49e3-b8ad-25ecb1b50b8f.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/part-00004-cbd94e6f-e608-42e8-b6f3-2d392d088b4d.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/part-00004-cbd94e6f-e608-42e8-b6f3-2d392d088b4d.c000.snappy.parquet new file mode 100644 index 0000000..1742428 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=%25/part-00004-cbd94e6f-e608-42e8-b6f3-2d392d088b4d.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=@/.part-00000-b63ac41f-0861-4ac0-acea-3e1d003a5892.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=@/.part-00000-b63ac41f-0861-4ac0-acea-3e1d003a5892.c000.snappy.parquet.crc new file mode 100644 index 0000000..bca2cac Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=@/.part-00000-b63ac41f-0861-4ac0-acea-3e1d003a5892.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=@/.part-00001-d6817b5a-d886-490d-9776-9a3568f8eac9.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=@/.part-00001-d6817b5a-d886-490d-9776-9a3568f8eac9.c000.snappy.parquet.crc new file mode 100644 index 0000000..f26e956 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=@/.part-00001-d6817b5a-d886-490d-9776-9a3568f8eac9.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=@/part-00000-b63ac41f-0861-4ac0-acea-3e1d003a5892.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=@/part-00000-b63ac41f-0861-4ac0-acea-3e1d003a5892.c000.snappy.parquet new file mode 100644 index 0000000..1742428 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=@/part-00000-b63ac41f-0861-4ac0-acea-3e1d003a5892.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=@/part-00001-d6817b5a-d886-490d-9776-9a3568f8eac9.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=@/part-00001-d6817b5a-d886-490d-9776-9a3568f8eac9.c000.snappy.parquet new file mode 100644 index 0000000..2126615 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=a/key2=@/part-00001-d6817b5a-d886-490d-9776-9a3568f8eac9.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/.part-00006-dc953cc0-705c-4a41-a9c2-cf1a1f1391df.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/.part-00006-dc953cc0-705c-4a41-a9c2-cf1a1f1391df.c000.snappy.parquet.crc new file mode 100644 index 0000000..0b65d66 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/.part-00006-dc953cc0-705c-4a41-a9c2-cf1a1f1391df.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/.part-00007-b67a04d8-5ce6-4bc1-a977-d74463fd78ec.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/.part-00007-b67a04d8-5ce6-4bc1-a977-d74463fd78ec.c000.snappy.parquet.crc new file mode 100644 index 0000000..05d73cc Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/.part-00007-b67a04d8-5ce6-4bc1-a977-d74463fd78ec.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/.part-00008-f5d8cc96-60f5-41b3-8497-22aa1c3bedde.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/.part-00008-f5d8cc96-60f5-41b3-8497-22aa1c3bedde.c000.snappy.parquet.crc new file mode 100644 index 0000000..ac9400f Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/.part-00008-f5d8cc96-60f5-41b3-8497-22aa1c3bedde.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/part-00006-dc953cc0-705c-4a41-a9c2-cf1a1f1391df.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/part-00006-dc953cc0-705c-4a41-a9c2-cf1a1f1391df.c000.snappy.parquet new file mode 100644 index 0000000..8dbdbd1 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/part-00006-dc953cc0-705c-4a41-a9c2-cf1a1f1391df.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/part-00007-b67a04d8-5ce6-4bc1-a977-d74463fd78ec.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/part-00007-b67a04d8-5ce6-4bc1-a977-d74463fd78ec.c000.snappy.parquet new file mode 100644 index 0000000..ed029d2 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/part-00007-b67a04d8-5ce6-4bc1-a977-d74463fd78ec.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/part-00008-f5d8cc96-60f5-41b3-8497-22aa1c3bedde.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/part-00008-f5d8cc96-60f5-41b3-8497-22aa1c3bedde.c000.snappy.parquet new file mode 100644 index 0000000..7733085 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=%25/part-00008-f5d8cc96-60f5-41b3-8497-22aa1c3bedde.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/.part-00000-cdad80c4-7ed3-4280-b6d2-3d967a357d82.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/.part-00000-cdad80c4-7ed3-4280-b6d2-3d967a357d82.c000.snappy.parquet.crc new file mode 100644 index 0000000..4b563cb Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/.part-00000-cdad80c4-7ed3-4280-b6d2-3d967a357d82.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/.part-00001-ab90884a-5fbd-4eca-a35d-97117e8e9e7f.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/.part-00001-ab90884a-5fbd-4eca-a35d-97117e8e9e7f.c000.snappy.parquet.crc new file mode 100644 index 0000000..a1e4923 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/.part-00001-ab90884a-5fbd-4eca-a35d-97117e8e9e7f.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/.part-00005-2a330d6c-916c-47b8-86b4-4a8bd86ca26e.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/.part-00005-2a330d6c-916c-47b8-86b4-4a8bd86ca26e.c000.snappy.parquet.crc new file mode 100644 index 0000000..d47d91d Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/.part-00005-2a330d6c-916c-47b8-86b4-4a8bd86ca26e.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/part-00000-cdad80c4-7ed3-4280-b6d2-3d967a357d82.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/part-00000-cdad80c4-7ed3-4280-b6d2-3d967a357d82.c000.snappy.parquet new file mode 100644 index 0000000..193a2e6 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/part-00000-cdad80c4-7ed3-4280-b6d2-3d967a357d82.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/part-00001-ab90884a-5fbd-4eca-a35d-97117e8e9e7f.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/part-00001-ab90884a-5fbd-4eca-a35d-97117e8e9e7f.c000.snappy.parquet new file mode 100644 index 0000000..2d27aa6 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/part-00001-ab90884a-5fbd-4eca-a35d-97117e8e9e7f.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/part-00005-2a330d6c-916c-47b8-86b4-4a8bd86ca26e.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/part-00005-2a330d6c-916c-47b8-86b4-4a8bd86ca26e.c000.snappy.parquet new file mode 100644 index 0000000..323aeae Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=b/key2=@/part-00005-2a330d6c-916c-47b8-86b4-4a8bd86ca26e.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/.part-00001-7c06f3af-cc55-497c-8055-6727af286d98.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/.part-00001-7c06f3af-cc55-497c-8055-6727af286d98.c000.snappy.parquet.crc new file mode 100644 index 0000000..9b12823 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/.part-00001-7c06f3af-cc55-497c-8055-6727af286d98.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/.part-00009-d17cd0af-67cf-4b5c-85ae-2e50ff7b91c1.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/.part-00009-d17cd0af-67cf-4b5c-85ae-2e50ff7b91c1.c000.snappy.parquet.crc new file mode 100644 index 0000000..2d2179b Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/.part-00009-d17cd0af-67cf-4b5c-85ae-2e50ff7b91c1.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/.part-00010-fe1edb94-d52e-4f78-ac55-d3ab3dffcafa.c000.snappy.parquet.crc b/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/.part-00010-fe1edb94-d52e-4f78-ac55-d3ab3dffcafa.c000.snappy.parquet.crc new file mode 100644 index 0000000..15e6fb8 Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/.part-00010-fe1edb94-d52e-4f78-ac55-d3ab3dffcafa.c000.snappy.parquet.crc differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/part-00001-7c06f3af-cc55-497c-8055-6727af286d98.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/part-00001-7c06f3af-cc55-497c-8055-6727af286d98.c000.snappy.parquet new file mode 100644 index 0000000..37d42ae Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/part-00001-7c06f3af-cc55-497c-8055-6727af286d98.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/part-00009-d17cd0af-67cf-4b5c-85ae-2e50ff7b91c1.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/part-00009-d17cd0af-67cf-4b5c-85ae-2e50ff7b91c1.c000.snappy.parquet new file mode 100644 index 0000000..727d7ac Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/part-00009-d17cd0af-67cf-4b5c-85ae-2e50ff7b91c1.c000.snappy.parquet differ diff --git a/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/part-00010-fe1edb94-d52e-4f78-ac55-d3ab3dffcafa.c000.snappy.parquet b/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/part-00010-fe1edb94-d52e-4f78-ac55-d3ab3dffcafa.c000.snappy.parquet new file mode 100644 index 0000000..cb9d52e Binary files /dev/null and b/delta-sharing-core/src/test/resources/delta-table/key1=c/key2=%25/part-00010-fe1edb94-d52e-4f78-ac55-d3ab3dffcafa.c000.snappy.parquet differ diff --git a/delta-sharing-web/pom.xml b/delta-sharing-web/pom.xml new file mode 100644 index 0000000..73d50a9 --- /dev/null +++ b/delta-sharing-web/pom.xml @@ -0,0 +1,196 @@ + + + 4.0.0 + + delta-sharing-web + + + com.spark-engine + spark-engine-parent + ${revision} + + + Spark Engine Delta Sharing Web Module + https://www.spark-engine.com + + + db/hsqldb/migration + jdbc:hsqldb:file:target/codegen-jooq/db/hsqldb;shutdown=true + 3.0.14 + 2.12.3 + + + + + + org.springdoc + springdoc-openapi-ui + ${springdoc.version} + + + org.springframework.boot + spring-boot-dependencies + ${spring-boot.version} + pom + import + + + + + + + + org.projectlombok + lombok + + + com.spark-engine + delta-sharing-core + + + org.slf4j + slf4j-log4j12 + + + com.fasterxml.jackson.core + jackson-databind + + + org.codehaus.janino + janino + + + + + com.fasterxml.jackson.core + jackson-databind + ${jackson.override.version} + + + org.codehaus.janino + janino + ${janino.override.version} + + + + + org.springframework.boot + spring-boot-starter-web + + + org.springdoc + springdoc-openapi-ui + + + + + org.springframework.boot + spring-boot-starter-jooq + + + org.springframework.boot + spring-boot-starter-data-jdbc + + + org.flywaydb + flyway-core + + + org.hsqldb + hsqldb + runtime + + + + + org.springframework.boot + spring-boot-starter-test + test + + + + + + + + org.springframework.boot + spring-boot-maven-plugin + + + + org.flywaydb + flyway-maven-plugin + ${flyway-maven-plugin.version} + + ${codegen.db.hsqldb.url} + sa + true + + filesystem:${project.basedir}/src/main/resources/${codegen.db.hsqldb.migration} + + + + + + + generate-sources + + migrate + + + + + + + org.hsqldb + hsqldb + ${hsqldb.version} + + + + + + org.jooq + jooq-codegen-maven + + + generate-sources + + generate + + + + + + ${codegen.db.hsqldb.url} + sa + + + + .* + + FLYWAY_SCHEMA_HISTORY + + PUBLIC + + + sparkengine.delta.sharing.db.hsqldb.jooq + target/generated-sources/db/hsqldb.jooq + + + + + + org.hsqldb + hsqldb + ${hsqldb.version} + + + + + + + + \ No newline at end of file diff --git a/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/Application.java b/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/Application.java new file mode 100644 index 0000000..aeb5fb8 --- /dev/null +++ b/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/Application.java @@ -0,0 +1,32 @@ +package sparkengine.delta.sharing.web; + +import org.apache.spark.sql.SparkSession; +import org.springframework.boot.SpringApplication; +import org.springframework.boot.autoconfigure.SpringBootApplication; +import org.springframework.context.annotation.Bean; +import sparkengine.delta.sharing.utils.TableLayoutStore; + +@SpringBootApplication +public class Application { + + public static void main(String[] args) { + SpringApplication.run(Application.class, args); + } + + + @Bean + public SparkSession sparkSession(SparkConfig sparkConfig) { + return SparkSession.builder() + .appName(sparkConfig.getName()) + .master(sparkConfig.getMaster()) + .config("spark.driver.memory", sparkConfig.getMemory()) + .getOrCreate(); + } + + @Bean + TableLayoutStore tableLayoutStore(SparkSession sparkSession) { + return TableLayoutStore.builder().sparkSession(sparkSession).build(); + } + +} + diff --git a/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/DeltaSharingController.java b/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/DeltaSharingController.java new file mode 100644 index 0000000..b02bbea --- /dev/null +++ b/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/DeltaSharingController.java @@ -0,0 +1,181 @@ +package sparkengine.delta.sharing.web; + +import lombok.AllArgsConstructor; +import lombok.Value; +import org.apache.commons.net.util.Base64; +import org.codehaus.jackson.map.DeserializationConfig; +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.map.annotate.JsonSerialize; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.http.MediaType; +import org.springframework.http.ResponseEntity; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter; +import sparkengine.delta.sharing.model.TableName; +import sparkengine.delta.sharing.protocol.v1.*; +import sparkengine.delta.sharing.utils.TableLayoutLoaderException; +import sparkengine.delta.sharing.utils.TableLayoutStore; +import sparkengine.delta.sharing.web.store.ConfigRepository; +import sparkengine.delta.sharing.web.store.Page; + +import javax.annotation.Nonnull; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.Optional; +import java.util.stream.Collectors; + +@RestController +@RequestMapping("/delta-sharing/v1") +@Value +@AllArgsConstructor(onConstructor_ = {@Autowired}) +public class DeltaSharingController { + + public static final String DELTA_TABLE_VERSION = "Delta-Table-Version"; + @Autowired + ConfigRepository configRepository; + + @Autowired + TableLayoutStore tableLayoutStore; + + private static final ObjectMapper OBJECT_MAPPER_DNJSON; + + static { + OBJECT_MAPPER_DNJSON = new ObjectMapper(); + OBJECT_MAPPER_DNJSON.setSerializationInclusion(JsonSerialize.Inclusion.NON_NULL); + OBJECT_MAPPER_DNJSON.configure(DeserializationConfig.Feature.FAIL_ON_UNKNOWN_PROPERTIES, false); + } + + @GetMapping("/shares") + Shares getShares(@RequestParam(value = "maxResults", defaultValue = "500") int maxResults, + @RequestParam(value = "nextPageToken") Optional nextPageToken) { + + var page = getPage(maxResults, nextPageToken); + var items = configRepository + .streamShareNames(Optional.of(page)) + .map(name -> Share.builder().withName(name).build()) + .collect(Collectors.toList()); + var nextPage = items.size() < page.getSize() ? null : pageToToken(page); + return Shares.builder().withItems(items).withNextPageToken(nextPage).build(); + } + + @GetMapping("/shares/{share}/schemas") + Schemas getSchemas( + @PathVariable("share") String shareName, + @RequestParam(value = "maxResults", defaultValue = "500") int maxResults, + @RequestParam(value = "nextPageToken") Optional nextPageToken) { + + var page = getPage(maxResults, nextPageToken); + + var items = configRepository + .streamSchemaNames(shareName, Optional.of(page)) + .map(name -> Schema.builder().withShare(shareName).withName(name).build()) + .collect(Collectors.toList()); + var nextPage = items.size() < page.getSize() ? null : pageToToken(page); + return Schemas.builder().withItems(items).withNextPageToken(nextPage).build(); + } + + @GetMapping("/shares/{share}/schemas/{schema}/tables") + Tables getTables( + @PathVariable("share") String shareName, + @PathVariable("schema") String schemaName, + @RequestParam(value = "maxResults", defaultValue = "500") int maxResults, + @RequestParam(value = "nextPageToken") Optional nextPageToken) { + + var page = getPage(maxResults, nextPageToken); + + var items = configRepository + .streamTableNames(shareName, schemaName, Optional.of(page)) + .map(name -> Table.builder().withShare(shareName).withSchema(schemaName).withName(name).build()) + .collect(Collectors.toList()); + var nextPage = items.size() < page.getSize() ? null : pageToToken(page); + return Tables.builder().withItems(items).withNextPageToken(nextPage).build(); + } + + @RequestMapping(method = RequestMethod.HEAD, path = "/shares/{share}/schemas/{schema}/tables/{table}") + public ResponseEntity getTableVersion(@PathVariable("share") String shareName, + @PathVariable("schema") String schemaName, + @PathVariable("table") String tableName) throws TableLayoutLoaderException { + + + var tableLayout = tableLayoutStore.getTableLayout(new TableName(shareName, schemaName, tableName), configRepository::getTableMetadata); + + return ResponseEntity.ok().header(DELTA_TABLE_VERSION, Long.toString(tableLayout.getTableVersion())).build(); + } + + @GetMapping(value = "/shares/{share}/schemas/{schema}/tables/{table}/metadata", produces = MediaType.APPLICATION_NDJSON_VALUE) + public ResponseEntity getTableMetadata(@PathVariable("share") String shareName, + @PathVariable("schema") String schemaName, + @PathVariable("table") String tableName) throws TableLayoutLoaderException, IOException { + + var tableLayout = tableLayoutStore.getTableLayout(new TableName(shareName, schemaName, tableName), configRepository::getTableMetadata); + + ResponseBodyEmitter responseBodyEmitter = new ResponseBodyEmitter(); + try { + for (var wrapper : tableLayout.streamTableMetadataProtocol().collect(Collectors.toList())) { + responseBodyEmitter.send(OBJECT_MAPPER_DNJSON.writeValueAsString(wrapper)); + responseBodyEmitter.send("\n"); + } + } finally { + responseBodyEmitter.complete(); + } + + return ResponseEntity.ok() + .header(DELTA_TABLE_VERSION, Long.toString(tableLayout.getTableVersion())) + .contentType(MediaType.APPLICATION_NDJSON) + .body(responseBodyEmitter); + } + + @PostMapping(value = "/shares/{share}/schemas/{schema}/tables/{table}/query", + consumes = MediaType.APPLICATION_JSON_VALUE, + produces = MediaType.APPLICATION_NDJSON_VALUE) + public ResponseEntity getTableQuery( + @RequestBody TableQuery tableQuery, + @PathVariable("share") String shareName, + @PathVariable("schema") String schemaName, + @PathVariable("table") String tableName) throws TableLayoutLoaderException, IOException { + + var tableLayout = tableLayoutStore.getTableLayout(new TableName(shareName, schemaName, tableName), configRepository::getTableMetadata); + + ResponseBodyEmitter responseBodyEmitter = new ResponseBodyEmitter(); + try { + for (var wrapper : tableLayout.streamTableProtocol(true, tableQuery.getPredicateHints()).collect(Collectors.toList())) { + responseBodyEmitter.send(OBJECT_MAPPER_DNJSON.writeValueAsString(wrapper)); + responseBodyEmitter.send("\n"); + } + } finally { + responseBodyEmitter.complete(); + } + + return ResponseEntity.ok() + .header(DELTA_TABLE_VERSION, Long.toString(tableLayout.getTableVersion())) + .contentType(MediaType.APPLICATION_NDJSON) + .body(responseBodyEmitter); + } + + @Nonnull + private static Page getPage(int maxResults, Optional nextPageToken) { + + var page = new Page(tokenToStart(nextPageToken), maxResults); + + if (page.getSize() <= 0) { + throw new IllegalArgumentException("request size is negative or zero"); + } + if (page.getStart() < 0) { + throw new IllegalArgumentException("start offset is negative"); + } + return page; + } + + private static int tokenToStart(Optional nextPageToken) { + return nextPageToken + .map(token -> new String(Base64.decodeBase64(token), StandardCharsets.UTF_8)) + .map(Integer::parseInt) + .orElse(0); + } + + @Nonnull + private static String pageToToken(Page page) { + return Base64.encodeBase64String(Integer.toString(page.getStart() + page.getSize()).getBytes(StandardCharsets.UTF_8)); + } + +} \ No newline at end of file diff --git a/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/SparkConfig.java b/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/SparkConfig.java new file mode 100644 index 0000000..5450b50 --- /dev/null +++ b/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/SparkConfig.java @@ -0,0 +1,16 @@ +package sparkengine.delta.sharing.web; + +import lombok.Data; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.stereotype.Component; + +@Data +@Component +@ConfigurationProperties("spark") +public class SparkConfig { + + String name; + String master; + String memory; + +} diff --git a/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/store/ConfigRepository.java b/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/store/ConfigRepository.java new file mode 100644 index 0000000..bd3556e --- /dev/null +++ b/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/store/ConfigRepository.java @@ -0,0 +1,107 @@ +package sparkengine.delta.sharing.web.store; + +import org.jooq.DSLContext; +import org.jooq.ResultQuery; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.stereotype.Repository; +import sparkengine.delta.sharing.db.hsqldb.jooq.tables.records.SchemaConfigRecord; +import sparkengine.delta.sharing.db.hsqldb.jooq.tables.records.ShareConfigRecord; +import sparkengine.delta.sharing.db.hsqldb.jooq.tables.records.TableConfigRecord; +import sparkengine.delta.sharing.model.TableMetadata; +import sparkengine.delta.sharing.model.TableName; + +import java.util.Optional; +import java.util.stream.Stream; + +import static sparkengine.delta.sharing.db.hsqldb.jooq.Tables.*; + +@Repository +public class ConfigRepository { + + private final DSLContext dsl; + + @Autowired + public ConfigRepository(DSLContext dsl) { + this.dsl = dsl; + } + + public boolean isShare(String shareName) { + var query = dsl + .select(SHARE_CONFIG.NAME) + .from(SHARE_CONFIG) + .where(SHARE_CONFIG.NAME.eq(shareName)); + return dsl.fetchExists(query); + } + + public boolean addShare(String shareName) { + return dsl.insertInto(SHARE_CONFIG).columns(SHARE_CONFIG.NAME).values(shareName).execute() == 1; + } + + public Stream streamShareNames(Optional page) { + + var baseQuery = dsl.selectFrom(SHARE_CONFIG).orderBy(SHARE_CONFIG.NAME); + var query = page + .map(p -> (ResultQuery)baseQuery.offset(p.getStart()).limit(p.getSize())) + .orElse(baseQuery); + + return dsl.fetchStream(query).map(ShareConfigRecord::getName); + } + + public boolean isSchema(String shareName, String schemaName) { + var query = dsl + .select(SCHEMA_CONFIG.NAME) + .from(SCHEMA_CONFIG) + .where(SCHEMA_CONFIG.SHARE_NAME.eq(shareName).and(SCHEMA_CONFIG.NAME.eq(schemaName))); + return dsl.fetchExists(query); + } + + public boolean addSchema(String shareName, String schemaName) { + return dsl.insertInto(SCHEMA_CONFIG).columns(SCHEMA_CONFIG.SHARE_NAME, SCHEMA_CONFIG.NAME).values(shareName, schemaName).execute() == 1; + } + + public Stream streamSchemaNames(String shareName, Optional page) { + var baseQuery = dsl + .selectFrom(SCHEMA_CONFIG) + .where(SCHEMA_CONFIG.SHARE_NAME.eq(shareName)) + .orderBy(SCHEMA_CONFIG.NAME); + var query = page + .map(p -> (ResultQuery)baseQuery.offset(p.getStart()).limit(p.getSize())) + .orElse(baseQuery); + return dsl.fetchStream(query).map(SchemaConfigRecord::getName); + } + + public boolean isTable(String shareName, String schemaName, String tableName) { + var query = dsl + .select(TABLE_CONFIG.NAME) + .from(TABLE_CONFIG) + .where(TABLE_CONFIG.SHARE_NAME.eq(shareName) + .and(TABLE_CONFIG.SCHEMA_NAME.eq(schemaName) + .and(TABLE_CONFIG.NAME.eq(tableName)))); + return dsl.fetchExists(query); + } + + public TableMetadata getTableMetadata(TableName tableName) { + var condition = TABLE_CONFIG.SHARE_NAME.eq(tableName.getShare()) + .and(TABLE_CONFIG.SCHEMA_NAME.eq(tableName.getSchema()) + .and(TABLE_CONFIG.NAME.eq(tableName.getTable()))); + var data = dsl.fetchOne(TABLE_CONFIG, condition); + + return new TableMetadata(data.getLocation()); + } + + public boolean addTable(String shareName, String schemaName, String tableName) { + return dsl.insertInto(TABLE_CONFIG).columns(TABLE_CONFIG.SHARE_NAME, TABLE_CONFIG.SCHEMA_NAME, TABLE_CONFIG.NAME).values(shareName, schemaName, tableName).execute() == 1; + } + + public Stream streamTableNames(String shareName, String schemaName, Optional page) { + var baseQuery = dsl + .selectFrom(TABLE_CONFIG) + .where(TABLE_CONFIG.SHARE_NAME.eq(shareName).and(TABLE_CONFIG.SCHEMA_NAME.eq(schemaName))) + .orderBy(TABLE_CONFIG.NAME); + var query = page + .map(p -> (ResultQuery)baseQuery.offset(p.getStart()).limit(p.getSize())) + .orElse(baseQuery); + return dsl.fetchStream(query).map(TableConfigRecord::getName); + } + +} diff --git a/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/store/Page.java b/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/store/Page.java new file mode 100644 index 0000000..142c198 --- /dev/null +++ b/delta-sharing-web/src/main/java/sparkengine/delta/sharing/web/store/Page.java @@ -0,0 +1,11 @@ +package sparkengine.delta.sharing.web.store; + +import lombok.Value; + +@Value +public class Page { + + int start; + int size; + +} diff --git a/delta-sharing-web/src/main/resources/application.yml b/delta-sharing-web/src/main/resources/application.yml new file mode 100644 index 0000000..70108c8 --- /dev/null +++ b/delta-sharing-web/src/main/resources/application.yml @@ -0,0 +1,13 @@ +spring.flyway.locations: classpath:db/hsqldb/migration +springdoc.api-docs.path: /api-docs +springdoc.swagger-ui.path: /api.html + +spark: + name: "app" + master: "local[4]" + memory: "1G" + +--- +spring.config.activate.on-profile: test +spring.datasource.url: jdbc:hsqldb:mem:devDb +spring.datasource.driver-class-name: org.hsqldb.jdbc.JDBCDriver \ No newline at end of file diff --git a/delta-sharing-web/src/main/resources/db/hsqldb/migration/V1__INIT.sql b/delta-sharing-web/src/main/resources/db/hsqldb/migration/V1__INIT.sql new file mode 100644 index 0000000..22a2ac7 --- /dev/null +++ b/delta-sharing-web/src/main/resources/db/hsqldb/migration/V1__INIT.sql @@ -0,0 +1,23 @@ +CREATE TABLE SHARE_CONFIG ( + NAME varchar(1024) NOT NULL PRIMARY KEY +); + +CREATE TABLE SCHEMA_CONFIG ( + SHARE_NAME varchar(1024) NOT NULL, + NAME varchar(1024) NOT NULL, + CONSTRAINT schemaConfigPrimaryKey PRIMARY KEY(SHARE_NAME, NAME), + FOREIGN KEY (SHARE_NAME) REFERENCES SHARE_CONFIG(NAME) +); + +CREATE TABLE TABLE_CONFIG ( + SHARE_NAME varchar(1024) NOT NULL, + SCHEMA_NAME varchar(1024) NOT NULL, + NAME varchar(1024) NOT NULL, + LOCATION varchar(4096) NOT NULL, + CONSTRAINT tableConfigPrimaryKey PRIMARY KEY(SHARE_NAME, SCHEMA_NAME, NAME), + FOREIGN KEY (SHARE_NAME, SCHEMA_NAME) REFERENCES SCHEMA_CONFIG(SHARE_NAME, NAME) +); + +INSERT INTO SHARE_CONFIG(NAME) VALUES('test'); +INSERT INTO SCHEMA_CONFIG(SHARE_NAME, NAME) VALUES('test', 'test'); +INSERT INTO TABLE_CONFIG(SHARE_NAME, SCHEMA_NAME, NAME, LOCATION) VALUES('test', 'test', 'test', '/mnt/data/datasets/wiki/lake'); diff --git a/delta-sharing-web/src/test/java/sparkengine/delta/sharing/web/store/ConfigRepositoryTest.java b/delta-sharing-web/src/test/java/sparkengine/delta/sharing/web/store/ConfigRepositoryTest.java new file mode 100644 index 0000000..2b4bdca --- /dev/null +++ b/delta-sharing-web/src/test/java/sparkengine/delta/sharing/web/store/ConfigRepositoryTest.java @@ -0,0 +1,32 @@ +package sparkengine.delta.sharing.web.store; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.test.context.SpringBootTest; +import org.springframework.dao.DuplicateKeyException; +import org.springframework.test.context.ActiveProfiles; + +import java.util.UUID; + + +@SpringBootTest +@ActiveProfiles("test") +public class ConfigRepositoryTest { + + @Autowired + private ConfigRepository configRepository; + + @Test + public void testBasicOperations() { + + var name = UUID.randomUUID().toString(); + + Assertions.assertFalse(configRepository.isShare(name)); + Assertions.assertTrue(configRepository.addShare(name)); + Assertions.assertThrows(DuplicateKeyException.class, () -> configRepository.addShare(name)); + Assertions.assertTrue(configRepository.isShare(name)); + + } + +} diff --git a/pom.xml b/pom.xml index a72731a..e8f605e 100644 --- a/pom.xml +++ b/pom.xml @@ -45,7 +45,7 @@ - 0.14.1 + 0.14.2 UTF-8 @@ -54,15 +54,41 @@ ${java.version} javadoc-sources - + 3.0.2 3.1.2 2.12 2.12.10 + 1.0.0 + 1.18.18 + 1.78 2.10.5 + + + 2.4.1 + 5.2.4 + + + 2.5.1 + 1.5.2 + + 5.6.2 - 1.18.18 - 1.78 + + + 7.10.0 + 3.8.1 + 4.4.0 + 3.2.1 + 1.2.2 + 3.2.0 + 2.1.2.RELEASE + 1.18.18.0 + 2.22.2 + 2.7 + 1.6.7 + 1.6 + 3.14.11 @@ -74,6 +100,8 @@ plan-runtime plan-runtime-builder plan-app + delta-sharing-core + delta-sharing-web @@ -122,6 +150,17 @@ plan-app ${project.version} + + com.spark-engine + delta-sharing-core + ${project.version} + + + com.spark-engine + delta-sharing-web + ${project.version} + + com.google.code.findbugs @@ -140,22 +179,45 @@ ${spark.version} provided + + io.delta + delta-core_${scala.binary.version} + ${delta.version} + com.beust jcommander - ${jcmd.version} + ${jcommander.version} + + + org.flywaydb + flyway-core + ${flyway.version} + + org.hsqldb + hsqldb + ${hsqldb.version} + + com.fasterxml.jackson.dataformat jackson-dataformat-yaml - ${fasterxml.jackson.version} + 2.10.5 + + + com.fasterxml.jackson.core + jackson-databind + + com.fasterxml.jackson.core jackson-databind - ${fasterxml.jackson.version} + 2.10.5.1 + org.scala-lang @@ -167,6 +229,7 @@ scala-reflect ${scala.version} + org.junit.jupiter @@ -189,7 +252,7 @@ net.alchim31.maven scala-maven-plugin - 4.4.0 + ${scala-maven-plugin.version} -P:genjavadoc:out=${project.build.directory}/${generated.javadocsources.folder} @@ -206,17 +269,22 @@ org.apache.maven.plugins maven-compiler-plugin - 3.8.1 + ${maven-compiler-plugin.version} + + + org.springframework.boot + spring-boot-maven-plugin + ${spring-boot-maven-plugin.version} org.apache.maven.plugins maven-source-plugin - 3.2.1 + ${maven-source-plugin.version} org.codehaus.mojo flatten-maven-plugin - 1.2.2 + ${flatten-maven-plugin.version} ossrh @@ -224,32 +292,32 @@ org.apache.maven.plugins maven-javadoc-plugin - 3.2.0 + ${maven-javadoc-plugin.version} org.projectlombok lombok-maven-plugin - 1.18.18.0 + ${lombok-maven-plugin.version} org.apache.maven.plugins maven-surefire-plugin - 2.22.2 + ${maven-surefire-plugin.version} org.apache.maven.plugins maven-deploy-plugin - 2.7 + ${maven-deploy-plugin.version} org.sonatype.plugins nexus-staging-maven-plugin - 1.6.7 + ${nexus-staging-maven-plugin.version} org.apache.maven.plugins maven-gpg-plugin - 1.6 + ${maven-gpg-plugin.version} @@ -258,6 +326,16 @@ + + org.flywaydb + flyway-maven-plugin + ${flyway-maven-plugin.version} + + + org.jooq + jooq-codegen-maven + ${jooq-codegen-maven.version} + @@ -323,7 +401,7 @@ false true false - ${project.build.directory}/${generated.javadocsources.folder} + ${project.build.directory}/${generated.javadocsources.folder};${project.build.directory}/generated-sources