[SPARK-23268][SQL] Reorganize packages in data source V2 #20435

Closed
wants to merge 8 commits
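This view shows the package reorganization as import and package-declaration changes: the streaming reader/writer interfaces move under `org.apache.spark.sql.sources.v2.reader.streaming` and `...writer.streaming`, the read/write support mix-ins move into `...reader` and `...writer`, and the partitioning interfaces move into `...reader.partitioning`. A compact before/after sketch of the imports, assembled only from the hunks below:

```scala
// Before this PR (packages as they appear on the removed lines below):
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Distribution, Partitioning}

// After this PR (packages as they appear on the added lines below):
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, MicroBatchReader, Offset, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.sources.v2.reader.{ContinuousReadSupport, MicroBatchReadSupport}
import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Distribution, Partitioning}
```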
Changes from 6 commits
@@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.expressions.codegen.{BufferHolder, UnsafeRo
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.kafka010.KafkaSource.{INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_FALSE, INSTRUCTION_FOR_FAIL_ON_DATA_LOSS_TRUE}
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType
import org.apache.spark.unsafe.types.UTF8String

@@ -20,14 +20,16 @@ package org.apache.spark.sql.kafka010
import org.apache.kafka.common.TopicPartition

import org.apache.spark.sql.execution.streaming.{Offset, SerializedOffset}
import org.apache.spark.sql.sources.v2.streaming.reader.{Offset => OffsetV2, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.streaming
import org.apache.spark.sql.sources.v2.reader.streaming.PartitionOffset
Contributor: can we keep it same as before?

import org.apache.spark.sql.sources.v2.reader.streaming.{Offset => OffsetV2, PartitionOffset}


/**
* An [[Offset]] for the [[KafkaSource]]. This one tracks all partitions of subscribed topics and
Member: Should this Offset be streaming.Offset?

* their offsets.
*/
private[kafka010]
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long]) extends OffsetV2 {
case class KafkaSourceOffset(partitionToOffsets: Map[TopicPartition, Long])
extends streaming.Offset {

override val json = JsonUtils.partitionOffsets(partitionToOffsets)
}
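For context, a hedged sketch of how such an offset is built and serialized under the new layout. The topic and offset values below are made up for illustration, and `KafkaSourceOffset` is `private[kafka010]`, so code like this would have to live in that package:

```scala
import org.apache.kafka.common.TopicPartition

// Hypothetical topic/partition/offset values, purely for illustration.
val offsets = Map(
  new TopicPartition("topic-a", 0) -> 100L,
  new TopicPartition("topic-a", 1) -> 250L)

// KafkaSourceOffset now extends org.apache.spark.sql.sources.v2.reader.streaming.Offset,
// whose contract is a JSON representation of the offset (the `json` field above).
val offset = KafkaSourceOffset(offsets)
println(offset.json)  // roughly {"topic-a":{"0":100,"1":250}}
```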
@@ -31,8 +31,9 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SaveMode, SparkSessio
import org.apache.spark.sql.execution.streaming.{Sink, Source}
import org.apache.spark.sql.sources._
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.reader.ContinuousReadSupport
import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

@@ -22,8 +22,8 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.kafka010.KafkaWriter.validateQuery
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.types.StructType

/**
@@ -15,14 +15,14 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.streaming;
package org.apache.spark.sql.sources.v2.reader;

import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader;
import org.apache.spark.sql.types.StructType;

/**
@@ -15,14 +15,14 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.streaming;
package org.apache.spark.sql.sources.v2.reader;

import java.util.Optional;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader;
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader;
import org.apache.spark.sql.types.StructType;

/**
@@ -18,6 +18,7 @@
package org.apache.spark.sql.sources.v2.reader;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.partitioning.Partitioning;

/**
* A mix in interface for {@link DataSourceReader}. Data source readers can implement this
@@ -15,9 +15,10 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;
package org.apache.spark.sql.sources.v2.reader.partitioning;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReader;

/**
* A concrete implementation of {@link Distribution}. Represents a distribution where records that
@@ -15,9 +15,10 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;
package org.apache.spark.sql.sources.v2.reader.partitioning;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReader;

/**
* An interface to represent data distribution requirement, which specifies how the records should
@@ -15,9 +15,11 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.reader;
package org.apache.spark.sql.sources.v2.reader.partitioning;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReaderFactory;
import org.apache.spark.sql.sources.v2.reader.SupportsReportPartitioning;

/**
* An interface to represent the output data partitioning for a data source, which is returned by
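The `Partitioning` declaration itself is collapsed in this view. For orientation, a minimal sketch of an implementation under the relocated package, assuming the 2.3-era interface shape (`numPartitions`/`satisfy` and `ClusteredDistribution.clusteredColumns`); the class and column names here are hypothetical, not part of this PR:

```scala
import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Distribution, Partitioning}

// Hypothetical partitioning a reader would report via SupportsReportPartitioning#outputPartitioning.
class KeyGroupedPartitioning(keyColumns: Array[String], parts: Int) extends Partitioning {
  override def numPartitions(): Int = parts

  // Records sharing the same key columns land in the same partition, so a
  // clustering requirement on a subset of those columns is satisfied.
  override def satisfy(distribution: Distribution): Boolean = distribution match {
    case c: ClusteredDistribution => c.clusteredColumns.forall(keyColumns.contains)
    case _ => false
  }
}
```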
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.streaming.reader;
package org.apache.spark.sql.sources.v2.reader.streaming;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReader;
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.streaming.reader;
package org.apache.spark.sql.sources.v2.reader.streaming;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.streaming.reader;
package org.apache.spark.sql.sources.v2.reader.streaming;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceReader;
@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.streaming.reader;
package org.apache.spark.sql.sources.v2.reader.streaming;

import org.apache.spark.annotation.InterfaceStability;

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.streaming.reader;
package org.apache.spark.sql.sources.v2.reader.streaming;

import java.io.Serializable;

@@ -28,7 +28,7 @@
/**
* A data source writer that is returned by
* {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceOptions)}/
* {@link org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport#createStreamWriter(
* {@link StreamWriteSupport#createStreamWriter(
* String, StructType, OutputMode, DataSourceOptions)}.
* It can mix in various writing optimization interfaces to speed up the data saving. The actual
* writing logic is delegated to {@link DataWriter}.
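Since the javadoc above only hints at the contract, here is a hedged Scala sketch of a trivial writer built on these interfaces. It assumes the 2.3-era signatures (`createWriterFactory`/`commit`/`abort`, `DataWriterFactory#createDataWriter(partitionId, attemptNumber)`, and the `DataWriter` write/commit/abort methods); all class names below are illustrative and not part of this PR:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriter, DataWriterFactory, WriterCommitMessage}

// Illustrative commit message carrying how many rows a task wrote.
case class RowCountMessage(count: Long) extends WriterCommitMessage

// Per-task writer: this is where the actual writing logic lives.
class CountingDataWriter extends DataWriter[Row] {
  private var count = 0L
  override def write(record: Row): Unit = count += 1
  override def commit(): WriterCommitMessage = RowCountMessage(count)
  override def abort(): Unit = ()  // discard partial output
}

// Factory shipped to executors to create the per-partition writers.
class CountingWriterFactory extends DataWriterFactory[Row] {
  override def createDataWriter(partitionId: Int, attemptNumber: Int): DataWriter[Row] =
    new CountingDataWriter
}

// The DataSourceWriter itself only hands out factories and reacts to the
// job-level commit/abort using the messages collected from each task.
class CountingWriter extends DataSourceWriter {
  override def createWriterFactory(): DataWriterFactory[Row] = new CountingWriterFactory
  override def commit(messages: Array[WriterCommitMessage]): Unit = {
    val total = messages.collect { case RowCountMessage(c) => c }.sum
    println(s"committed $total rows")
  }
  override def abort(messages: Array[WriterCommitMessage]): Unit = ()
}
```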
@@ -15,14 +15,13 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.streaming;
package org.apache.spark.sql.sources.v2.writer;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.BaseStreamingSink;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter;
import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

@@ -15,7 +15,7 @@
* limitations under the License.
*/

package org.apache.spark.sql.sources.v2.streaming.writer;
package org.apache.spark.sql.sources.v2.writer.streaming;

import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.writer.DataSourceWriter;
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.datasources.v2

import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Expression}
import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.sources.v2.reader.{ClusteredDistribution, Partitioning}
import org.apache.spark.sql.sources.v2.reader.partitioning.{ClusteredDistribution, Partitioning}

/**
* An adapter from public data source partitioning to catalyst internal `Partitioning`.
@@ -28,7 +28,7 @@ import org.apache.spark.sql.catalyst.plans.physical
import org.apache.spark.sql.execution.{ColumnarBatchScan, LeafExecNode, WholeStageCodegenExec}
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
import org.apache.spark.sql.types.StructType

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.streaming.continuous.{CommitPartitionEpoch, ContinuousExecution, EpochCoordinatorRef, SetWriterPartitions}
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer._
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.Utils

@@ -30,9 +30,9 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.sources.{InternalRowMicroBatchWriter, MicroBatchWriter}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.streaming.{MicroBatchReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.streaming.reader.{MicroBatchReader, Offset => OffsetV2}
import org.apache.spark.sql.sources.v2.writer.SupportsWriteInternalRow
import org.apache.spark.sql.sources.v2.reader.{streaming, MicroBatchReadSupport}
import org.apache.spark.sql.sources.v2.reader.streaming.MicroBatchReader
import org.apache.spark.sql.sources.v2.writer.{StreamWriteSupport, SupportsWriteInternalRow}
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.util.{Clock, Utils}

@@ -403,7 +403,7 @@ class MicroBatchExecution(
val current = committedOffsets.get(reader).map(off => reader.deserializeOffset(off.json))
reader.setOffsetRange(
toJava(current),
Optional.of(available.asInstanceOf[OffsetV2]))
Optional.of(available.asInstanceOf[streaming.Offset]))
Contributor: shall we still use OffsetV2?

logDebug(s"Retrieving data from $reader: $current -> $available")
Some(reader ->
new StreamingDataSourceV2Relation(reader.readSchema().toAttributes, reader))
@@ -492,7 +492,7 @@ class MicroBatchExecution(
}
}

private def toJava(scalaOption: Option[OffsetV2]): Optional[OffsetV2] = {
private def toJava(scalaOption: Option[streaming.Offset]): Optional[streaming.Offset] = {
Optional.ofNullable(scalaOption.orNull)
}
}
@@ -30,11 +30,10 @@ import org.apache.spark.sql.{AnalysisException, DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.execution.streaming.continuous.RateStreamContinuousReader
import org.apache.spark.sql.execution.streaming.sources.RateStreamMicroBatchReader
import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
import org.apache.spark.sql.sources.v2._
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, MicroBatchReadSupport}
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, MicroBatchReader}
import org.apache.spark.sql.sources.v2.reader.ContinuousReadSupport
import org.apache.spark.sql.sources.v2.reader.streaming.ContinuousReader
import org.apache.spark.sql.types._
import org.apache.spark.util.{ManualClock, SystemClock}

@@ -23,7 +23,7 @@ import org.json4s.jackson.Serialization
import org.apache.spark.sql.sources.v2

case class RateStreamOffset(partitionToValueAndRunTimeMs: Map[Int, ValueRunTimeMsPair])
extends v2.streaming.reader.Offset {
extends v2.reader.streaming.Offset {
implicit val defaultFormats: DefaultFormats = DefaultFormats
override val json = Serialization.write(partitionToValueAndRunTimeMs)
}
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.plans.logical.Statistics
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.sources.v2.DataSourceV2
import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport
import org.apache.spark.sql.sources.v2.reader.ContinuousReadSupport

object StreamingRelation {
def apply(dataSource: DataSource): StreamingRelation = {
@@ -21,8 +21,8 @@ import org.apache.spark.sql._
import org.apache.spark.sql.execution.streaming.sources.ConsoleWriter
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, DataSourceRegister}
import org.apache.spark.sql.sources.v2.{DataSourceOptions, DataSourceV2}
import org.apache.spark.sql.sources.v2.streaming.StreamWriteSupport
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.streaming.OutputMode
import org.apache.spark.sql.types.StructType

@@ -18,23 +18,19 @@
package org.apache.spark.sql.execution.streaming.continuous

import java.util.concurrent.{ArrayBlockingQueue, BlockingQueue, TimeUnit}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}
import java.util.concurrent.atomic.AtomicBoolean

import scala.collection.JavaConverters._

import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.rdd.RDD
import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.UnsafeRow
import org.apache.spark.sql.execution.datasources.v2.{DataSourceRDDPartition, RowToUnsafeDataReader}
import org.apache.spark.sql.execution.streaming._
import org.apache.spark.sql.execution.streaming.continuous._
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, PartitionOffset}
import org.apache.spark.sql.streaming.ProcessingTime
import org.apache.spark.util.{SystemClock, ThreadUtils}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, PartitionOffset}
import org.apache.spark.util.ThreadUtils

class ContinuousDataSourceRDD(
sc: SparkContext,
@@ -32,8 +32,9 @@ import org.apache.spark.sql.execution.SQLExecution
import org.apache.spark.sql.execution.datasources.v2.{DataSourceV2Relation, StreamingDataSourceV2Relation, WriteToDataSourceV2}
import org.apache.spark.sql.execution.streaming.{ContinuousExecutionRelation, StreamingRelationV2, _}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.streaming.{ContinuousReadSupport, StreamWriteSupport}
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.ContinuousReadSupport
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.StreamWriteSupport
import org.apache.spark.sql.streaming.{OutputMode, ProcessingTime, Trigger}
import org.apache.spark.sql.types.StructType
import org.apache.spark.util.{Clock, Utils}
@@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.streaming.{RateSourceProvider, RateStreamO
import org.apache.spark.sql.execution.streaming.sources.RateStreamSourceV2
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.reader._
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousDataReader, ContinuousReader, Offset, PartitionOffset}
import org.apache.spark.sql.types.StructType

case class RateStreamPartitionOffset(
@@ -23,9 +23,9 @@ import org.apache.spark.SparkEnv
import org.apache.spark.internal.Logging
import org.apache.spark.rpc.{RpcCallContext, RpcEndpointRef, RpcEnv, ThreadSafeRpcEndpoint}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.sources.v2.streaming.reader.{ContinuousReader, PartitionOffset}
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.reader.streaming.{ContinuousReader, PartitionOffset}
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.util.RpcUtils

private[continuous] sealed trait EpochCoordinatorMessage extends Serializable
@@ -20,8 +20,8 @@ package org.apache.spark.sql.execution.streaming.sources
import org.apache.spark.internal.Logging
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.sources.v2.DataSourceOptions
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer.{DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter
import org.apache.spark.sql.types.StructType

/** Common methods used to create writes for the console sink */
@@ -19,8 +19,8 @@ package org.apache.spark.sql.execution.streaming.sources

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.sources.v2.streaming.writer.StreamWriter
import org.apache.spark.sql.sources.v2.writer.{DataSourceWriter, DataWriterFactory, SupportsWriteInternalRow, WriterCommitMessage}
import org.apache.spark.sql.sources.v2.writer.streaming.StreamWriter

/**
* A [[DataSourceWriter]] used to hook V2 stream writers into a microbatch plan. It implements