[Gluten-207] Support MergeTree DS V1 (facebookincubator#208)
zzcclp authored Jun 16, 2022
1 parent 7041aae commit 614543e
Showing 37 changed files with 1,582 additions and 142 deletions.
SnowflakeIdWorker.java (new file, package io.glutenproject.utils)
@@ -0,0 +1,120 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.glutenproject.utils;

import org.apache.spark.SparkEnv;
import org.apache.spark.sql.execution.datasources.v2.clickhouse.ClickHouseConfig;

/**
* Generates unique, roughly time-ordered 64-bit IDs from a timestamp, a worker id and a
* per-worker sequence. Kept as a separate class in case multiple worker threads per
* process ever need to be supported.
* Based on twitter-archive Snowflake.
*/
public class SnowflakeIdWorker {

// ==============================Fields===========================================
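// Custom epoch in seconds: 1640966400 corresponds to 2022-01-01 00:00:00 (UTC+8).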
private final long twepoch = 1640966400L;

private final long workerIdBits = 6L;

private final long maxWorkerId = -1L ^ (-1L << workerIdBits);

private final long sequenceBits = 16L;

private final long workerIdShift = sequenceBits;

private final long timestampLeftShift = sequenceBits + workerIdBits;

private final long sequenceMask = -1L ^ (-1L << sequenceBits);

private long workerId;

private long sequence = 0L;

private long lastTimestamp = -1L;

//==============================Singleton=====================================
private static volatile SnowflakeIdWorker INSTANCE;

public static SnowflakeIdWorker getInstance() {
if (INSTANCE == null) {
synchronized (SnowflakeIdWorker.class) {
if (INSTANCE == null) {
if (!SparkEnv.get().conf().contains(ClickHouseConfig.CLICKHOUSE_WORKER_ID())) {
throw new IllegalArgumentException("Please set an unique value to " +
ClickHouseConfig.CLICKHOUSE_WORKER_ID());
}
INSTANCE = new SnowflakeIdWorker(
SparkEnv.get().conf()
.getLong(ClickHouseConfig.CLICKHOUSE_WORKER_ID(), 0));
}
}
}
return INSTANCE;
}

//==============================Constructors=====================================

public SnowflakeIdWorker(long workerId) {
if (workerId > maxWorkerId || workerId < 0) {
throw new IllegalArgumentException(
String.format("worker Id can't be greater than %d or less than 0", maxWorkerId));
}
this.workerId = workerId;
}

// ==============================Methods==========================================
public synchronized long nextId() {
long timestamp = timeGen();

if (timestamp < lastTimestamp) {
throw new RuntimeException(
String.format("Clock moved backwards. Refusing to generate id for %d milliseconds",
lastTimestamp - timestamp));
}

if (lastTimestamp == timestamp) {
sequence = (sequence + 1) & sequenceMask;
if (sequence == 0) {
timestamp = tilNextMillis(lastTimestamp);
}
}
else {
sequence = 0L;
}

lastTimestamp = timestamp;

return ((timestamp - twepoch) << timestampLeftShift) //
| (workerId << workerIdShift) //
| sequence;
}

protected long tilNextMillis(long lastTimestamp) {
long timestamp = timeGen();
while (timestamp <= lastTimestamp) {
timestamp = timeGen();
}
return timestamp;
}

protected long timeGen() {
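// Note: returns the current Unix time in seconds (not milliseconds, unlike classic Snowflake).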
return System.currentTimeMillis() / 1000L;
}
}
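
As a quick reference (not part of the commit), a Scala sketch of how an id produced by nextId() decomposes, assuming the constants above: a 16-bit sequence, a 6-bit worker id, and a timestamp in seconds relative to twepoch.

// Illustration only: decode an id produced by SnowflakeIdWorker.nextId().
object SnowflakeIdDecode {
  private val sequenceBits = 16
  private val workerIdBits = 6
  private val twepoch = 1640966400L

  /** Returns (unix seconds, worker id, sequence). */
  def decode(id: Long): (Long, Long, Long) = {
    val sequence = id & ((1L << sequenceBits) - 1)
    val workerId = (id >> sequenceBits) & ((1L << workerIdBits) - 1)
    val unixSeconds = (id >> (sequenceBits + workerIdBits)) + twepoch
    (unixSeconds, workerId, sequence)
  }

  def main(args: Array[String]): Unit = {
    // e.g. an id built from worker 3, sequence 7, 100 seconds after the epoch
    val (ts, worker, seq) = decode((100L << 22) | (3L << 16) | 7L)
    println(s"unixSeconds=$ts workerId=$worker sequence=$seq")
  }
}
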
CHSparkPlanExecApi.scala (modified)
@@ -21,20 +21,24 @@ import io.glutenproject.GlutenConfig
import io.glutenproject.execution._
import io.glutenproject.vectorized.{BlockNativeWriter, CHColumnarBatchSerializer}
import org.apache.spark.{ShuffleDependency, SparkException}

import org.apache.spark.rdd.RDD
import org.apache.spark.serializer.Serializer
import org.apache.spark.shuffle.{GenShuffleWriterParameters, GlutenShuffleWriterWrapper}
import org.apache.spark.shuffle.utils.CHShuffleUtil
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.plans.physical.Partitioning
import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.exchange.BroadcastExchangeExec
import org.apache.spark.sql.execution.joins.{BuildSideRelation, ClickHouseBuildSideRelation}
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.execution.utils.CHExecUtil
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.vectorized.ColumnarBatch
import org.apache.spark.sql.{SparkSession, Strategy}
import org.apache.spark.sql.extension.{CHDataSourceV2Strategy, ClickHouseAnalysis}
import org.apache.spark.sql.internal.SQLConf

class CHSparkPlanExecApi extends ISparkPlanExecApi {

@@ -148,6 +152,26 @@ class CHSparkPlanExecApi extends ISparkPlanExecApi {
ClickHouseBuildSideRelation(child.output, batches)
}

/**
* Generate an extended DataSourceV2 Strategy.
* Currently only used by the ClickHouse backend.
*
* @return a Strategy that plans ClickHouse DataSourceV2 relations
*/
override def genExtendedDataSourceV2Strategy(spark: SparkSession): Strategy = {
CHDataSourceV2Strategy(spark)
}

/**
* Generate an extended Analyzer rule.
* Currently only used by the ClickHouse backend.
*
* @return an analysis Rule applied to the logical plan
*/
override def genExtendedAnalyzer(spark: SparkSession, conf: SQLConf): Rule[LogicalPlan] = {
new ClickHouseAnalysis(spark, conf)
}
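
For context, a minimal sketch (not from this commit; class name and wiring are hypothetical) of how a Strategy and analyzer Rule like the ones returned above are typically registered through Spark's SparkSessionExtensions:

import org.apache.spark.sql.SparkSessionExtensions
import org.apache.spark.sql.internal.SQLConf

// Hypothetical wiring, for illustration only; assumes CHSparkPlanExecApi is importable.
class ExampleGlutenExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(ext: SparkSessionExtensions): Unit = {
    val api = new CHSparkPlanExecApi
    // The Strategy from genExtendedDataSourceV2Strategy goes into the planner.
    ext.injectPlannerStrategy(spark => api.genExtendedDataSourceV2Strategy(spark))
    // The Rule[LogicalPlan] from genExtendedAnalyzer goes into analysis/resolution.
    ext.injectResolutionRule(spark => api.genExtendedAnalyzer(spark, SQLConf.get))
  }
}
// Enabled with the standard setting: spark.sql.extensions=<fully.qualified.ExampleGlutenExtensions>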

/**
* Get the backend api name.
*
CHTransformerApi.scala (modified)
@@ -21,11 +21,13 @@ import io.glutenproject.GlutenConfig
import io.glutenproject.expression.{ExpressionConverter, ExpressionTransformer}
import io.glutenproject.substrait.SubstraitContext
import io.glutenproject.substrait.expression.SelectionNode

import io.glutenproject.utils.InputPartitionsUtil
import org.apache.spark.internal.Logging
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.physical.{HashPartitioning, Partitioning, RangePartitioning}
import org.apache.spark.sql.execution.datasources.FileFormat
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.v1.ClickHouseFileIndex
import org.apache.spark.sql.execution.datasources.{FileFormat, HadoopFsRelation, PartitionDirectory}

class CHTransformerApi extends ITransformerApi with Logging {

@@ -67,6 +69,20 @@ class CHTransformerApi extends ITransformerApi with Logging {
*/
def supportsReadFileFormat(fileFormat: FileFormat): Boolean = true

/**
* Generate Seq[InputPartition] for FileSourceScanExecTransformer.
*/
def genInputPartitionSeq(relation: HadoopFsRelation,
selectedPartitions: Array[PartitionDirectory]): Seq[InputPartition] = {
if (relation.location.isInstanceOf[ClickHouseFileIndex]) {
// Generate NativeMergeTreePartition for MergeTree
relation.location.asInstanceOf[ClickHouseFileIndex].partsPartitions
} else {
// Generate FilePartition for Parquet
InputPartitionsUtil.genInputPartitionSeq(relation, selectedPartitions)
}
}

/**
* Get the backend api name.
*
MergeTreePartsPartitionsUtil.scala (new file)
@@ -0,0 +1,71 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.utils

import io.glutenproject.execution.NativeMergeTreePartition
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.v2.clickhouse.metadata.AddMergeTreeParts
import org.apache.spark.sql.execution.datasources.v2.clickhouse.table.ClickHouseTableV2

import scala.collection.mutable.ArrayBuffer

object MergeTreePartsPartitionsUtil {

def getPartsPartitions(sparkSession: SparkSession,
table: ClickHouseTableV2): Seq[InputPartition] = {
val maxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes
val partsFiles = table.listFiles()

val partitions = new ArrayBuffer[InputPartition]
val database = table.catalogTable.get.identifier.database.get
val tableName = table.catalogTable.get.identifier.table
val engine = table.snapshot.metadata.configuration.get("engine").get
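// Presumably strips a 6-character URI scheme prefix (e.g. "file:/") from the Delta data path
// so the native engine receives a plain filesystem path.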
val tablePath = table.deltaLog.dataPath.toString.substring(6)
var currentMinPartsNum = -1L
var currentMaxPartsNum = -1L
var currentSize = 0L

/** Close the current partition and move to the next. */
def closePartition(): Unit = {
if (currentMinPartsNum > 0L && currentMaxPartsNum >= currentMinPartsNum) {
val newPartition = NativeMergeTreePartition(partitions.size, engine, database, tableName,
tablePath, currentMinPartsNum, currentMaxPartsNum + 1)
partitions += newPartition
}
currentMinPartsNum = -1L
currentMaxPartsNum = -1L
currentSize = 0
}

val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes
// Assign parts to partitions with a "Next Fit" pass: pack consecutive parts (in listing
// order, not sorted by size) until the current partition would exceed maxSplitBytes.
partsFiles.foreach { parts =>
if (currentSize + parts.bytesOnDisk > maxSplitBytes) {
closePartition()
}
// Add the given part to the current partition.
currentSize += parts.bytesOnDisk + openCostInBytes
if (currentMinPartsNum == -1L) {
currentMinPartsNum = parts.minBlockNumber
}
currentMaxPartsNum = parts.maxBlockNumber
}
closePartition()
partitions
}
}
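
For illustration (not part of the commit; all sizes are made up), a stand-alone sketch of the packing rule above: consecutive parts are added to the current partition until one more would exceed maxSplitBytes, at which point the current block-number range is closed and a new one starts.

import scala.collection.mutable.ArrayBuffer

// Illustration only: the same "close the partition when it would overflow" logic,
// applied to made-up (minBlockNumber, maxBlockNumber, bytesOnDisk) triples.
object NextFitSketch {
  def main(args: Array[String]): Unit = {
    val maxSplitBytes = 128L << 20    // hypothetical 128 MiB, cf. filesMaxPartitionBytes
    val openCostInBytes = 4L << 20    // hypothetical 4 MiB, cf. filesOpenCostInBytes
    val parts = Seq((1L, 1L, 90L << 20), (2L, 2L, 60L << 20), (3L, 3L, 10L << 20))

    val ranges = ArrayBuffer[(Long, Long)]()
    var minB = -1L; var maxB = -1L; var size = 0L
    def closePartition(): Unit = {
      if (minB > 0L && maxB >= minB) ranges += ((minB, maxB + 1))
      minB = -1L; maxB = -1L; size = 0L
    }
    parts.foreach { case (min, max, bytes) =>
      if (size + bytes > maxSplitBytes) closePartition()
      size += bytes + openCostInBytes
      if (minB == -1L) minB = min
      maxB = max
    }
    closePartition()
    println(ranges) // ArrayBuffer((1,2), (2,4)): part 1 alone, parts 2 and 3 together
  }
}
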
ClickHouseFileIndex.scala (new file)
@@ -0,0 +1,101 @@
/*
* Copyright (2021) The Delta Lake Project Authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.execution.datasources.v1

import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.expressions.{Cast, Expression, GenericInternalRow, Literal}
import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.delta.actions.AddFile
import org.apache.spark.sql.delta.{DeltaLog, Snapshot}
import org.apache.spark.sql.delta.files.TahoeFileIndex
import org.apache.spark.sql.execution.datasources.PartitionDirectory
import org.apache.spark.sql.execution.datasources.utils.MergeTreePartsPartitionsUtil
import org.apache.spark.sql.execution.datasources.v2.clickhouse.table.ClickHouseTableV2
import org.apache.spark.sql.types.StructType

import java.util.Objects

case class ClickHouseFileIndex(override val spark: SparkSession,
override val deltaLog: DeltaLog,
override val path: Path,
table: ClickHouseTableV2,
snapshotAtAnalysis: Snapshot,
partitionFilters: Seq[Expression] = Nil,
isTimeTravelQuery: Boolean = false)
extends TahoeFileIndex(spark, deltaLog, path) {

override def tableVersion: Long = {
if (isTimeTravelQuery) snapshotAtAnalysis.version else deltaLog.snapshot.version
}

protected def getSnapshotToScan: Snapshot = {
if (isTimeTravelQuery) snapshotAtAnalysis else deltaLog.update(stalenessAcceptable = true)
}

/** Provides the version that's being used as part of the scan if this is a time travel query. */
def versionToUse: Option[Long] =
if (isTimeTravelQuery) Some(snapshotAtAnalysis.version) else None

def getSnapshot: Snapshot = {
getSnapshotToScan
}

override def matchingFiles(
partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[AddFile] = {
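// Presumably unused for MergeTree scans, which are planned through partsPartitions below.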
Seq.empty[AddFile]
}

override def inputFiles: Array[String] = {
table.listFiles().map(_.path).toArray
}

override def listFiles(partitionFilters: Seq[Expression],
dataFilters: Seq[Expression]): Seq[PartitionDirectory] = {
table.listFiles().map(parts => {
val fileStats = new FileStatus(
/* length */ parts.bytesOnDisk,
/* isDir */ false,
/* blockReplication */ 0,
/* blockSize */ 1,
/* modificationTime */ parts.modificationTime,
absolutePath(parts.path))
PartitionDirectory(new GenericInternalRow(Array.empty[Any]), Seq(fileStats))
})
}

def partsPartitions: Seq[InputPartition] =
MergeTreePartsPartitionsUtil.getPartsPartitions(spark, table)

override def refresh(): Unit = {}

override val sizeInBytes: Long = table.listFiles().map(_.bytesOnDisk).sum

override def equals(that: Any): Boolean = that match {
case t: ClickHouseFileIndex =>
t.path == path && t.deltaLog.isSameLogAs(deltaLog) &&
t.versionToUse == versionToUse && t.partitionFilters == partitionFilters
case _ => false
}

override def hashCode: scala.Int = {
Objects.hashCode(path, deltaLog.tableId -> deltaLog.dataPath, versionToUse, partitionFilters)
}

override def partitionSchema: StructType = snapshotAtAnalysis.metadata.partitionSchema
}
(The remaining 32 changed files in this commit are not shown here.)
