[SPARK-23119][SS] Minor fixes to V2 streaming APIs #20286

Closed · wants to merge 2 commits
org/apache/spark/sql/sources/v2/streaming/ContinuousReadSupport.java
@@ -19,6 +19,7 @@

import java.util.Optional;

+import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
@@ -28,6 +29,7 @@
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability for continuous stream processing.
*/
+@InterfaceStability.Evolving
public interface ContinuousReadSupport extends DataSourceV2 {
/**
* Creates a {@link ContinuousReader} to scan the data from this data source.
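To illustrate the mix-in (not part of this PR), a minimal sketch of a source wiring it in. The `createContinuousReader` signature is assumed from the V2 API at the time of this change, and `ExampleContinuousSource`/`ExampleContinuousReader` are hypothetical names:

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
import org.apache.spark.sql.types.StructType;

// Hypothetical source; ContinuousReadSupport already extends DataSourceV2,
// so implementing the mix-in alone advertises continuous-read ability.
public class ExampleContinuousSource implements ContinuousReadSupport {
  @Override
  public ContinuousReader createContinuousReader(
      Optional<StructType> schema,
      String checkpointLocation,
      DataSourceV2Options options) {
    // Return a reader whose read tasks each produce a ContinuousDataReader.
    return new ExampleContinuousReader(schema, checkpointLocation, options);  // hypothetical class
  }
}
```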
org/apache/spark/sql/sources/v2/streaming/reader/ContinuousDataReader.java
@@ -17,11 +17,13 @@

package org.apache.spark.sql.sources.v2.streaming.reader;

+import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReader;

/**
* A variation on {@link DataReader} for use with streaming in continuous processing mode.
*/
+@InterfaceStability.Evolving
public interface ContinuousDataReader<T> extends DataReader<T> {
/**
* Get the offset of the current record, or the start offset if no records have been read.
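A minimal sketch of this contract with hypothetical names (`CounterPartitionOffset` is defined in the merge sketch after the ContinuousReader hunk below): `next()`/`get()` come from `DataReader`, while `getOffset()` reports the position of the most recent record, or the start position if nothing has been read yet:

```java
import java.io.IOException;

import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousDataReader;
import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset;

// Hypothetical reader that emits an unbounded stream of increasing longs.
public class CounterDataReader implements ContinuousDataReader<Long> {
  private final int partition;
  private long current = -1;  // start position: nothing read yet

  public CounterDataReader(int partition) {
    this.partition = partition;
  }

  @Override
  public boolean next() throws IOException {
    current++;    // a real source would block here until data arrives
    return true;  // continuous readers keep producing until the task is stopped
  }

  @Override
  public Long get() {
    return current;
  }

  @Override
  public PartitionOffset getOffset() {
    // Position of the record last returned by get(), or -1 before any read.
    return new CounterPartitionOffset(partition, current);
  }

  @Override
  public void close() throws IOException {
    // Release any connections or buffers held by this task.
  }
}
```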
org/apache/spark/sql/sources/v2/streaming/reader/ContinuousReader.java
@@ -17,6 +17,7 @@

package org.apache.spark.sql.sources.v2.streaming.reader;

+import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;

@@ -27,11 +28,15 @@
* interface to allow reading in a continuous processing mode stream.
*
* Implementations must ensure each read task output is a {@link ContinuousDataReader}.
+ *
+ * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
*/
+@InterfaceStability.Evolving
public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reader {
/**
- * Merge offsets coming from {@link ContinuousDataReader} instances in each partition to
- * a single global offset.
+ * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each
+ * partition to a single global offset.
*/
Offset mergeOffsets(PartitionOffset[] offsets);

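A sketch of what that merge might look like for the hypothetical counter source above: the engine collects one `PartitionOffset` per partition, and the reader folds them into a single JSON-serializable global `Offset`. All class names here are assumptions, not part of this PR:

```java
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;
import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset;

// Hypothetical per-partition position.
class CounterPartitionOffset implements PartitionOffset {
  final int partition;
  final long position;

  CounterPartitionOffset(int partition, long position) {
    this.partition = partition;
    this.position = position;
  }
}

// Hypothetical global offset: one position per partition, serialized as a JSON array.
class CounterGlobalOffset extends Offset {
  private final long[] positions;

  CounterGlobalOffset(long[] positions) {
    this.positions = positions;
  }

  @Override
  public String json() {
    StringBuilder sb = new StringBuilder("[");
    for (int i = 0; i < positions.length; i++) {
      if (i > 0) sb.append(',');
      sb.append(positions[i]);
    }
    return sb.append(']').toString();
  }
}

// Merge logic, shown as a static helper for brevity; a real ContinuousReader
// implementation would do this in its mergeOffsets override.
final class OffsetMerging {
  static Offset mergeOffsets(PartitionOffset[] offsets) {
    long[] positions = new long[offsets.length];
    for (PartitionOffset po : offsets) {
      CounterPartitionOffset cpo = (CounterPartitionOffset) po;
      positions[cpo.partition] = cpo.position;
    }
    return new CounterGlobalOffset(positions);
  }
}
```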
org/apache/spark/sql/sources/v2/streaming/reader/MicroBatchReader.java
@@ -17,6 +17,7 @@

package org.apache.spark.sql.sources.v2.streaming.reader;

+import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;

@@ -25,7 +26,11 @@
/**
* A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
* interface to indicate they allow micro-batch streaming reads.
+ *
+ * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
*/
+@InterfaceStability.Evolving
public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
/**
* Set the desired offset range for read tasks created from this reader. Read tasks will
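The `setOffsetRange` javadoc is truncated above; a fragment of a hypothetical implementation under the assumed signature `setOffsetRange(Optional<Offset> start, Optional<Offset> end)`, reusing the hypothetical `CounterGlobalOffset` from the merge sketch. Empty bounds mean the source picks them itself:

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.streaming.reader.Offset;

// Fragment of a hypothetical MicroBatchReader implementation; the other
// required methods are omitted for brevity.
abstract class CounterMicroBatchReaderSketch {
  private CounterGlobalOffset startOffset;
  private CounterGlobalOffset endOffset;

  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
    // An empty start means "beginning of the stream"; an empty end means
    // "everything available right now".
    this.startOffset = (CounterGlobalOffset) start.orElseGet(this::earliestKnownOffset);
    this.endOffset = (CounterGlobalOffset) end.orElseGet(this::latestKnownOffset);
  }

  public Offset getStartOffset() { return startOffset; }
  public Offset getEndOffset() { return endOffset; }

  // Hypothetical helpers: oldest retained / newest available position.
  abstract CounterGlobalOffset earliestKnownOffset();
  abstract CounterGlobalOffset latestKnownOffset();
}
```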
org/apache/spark/sql/sources/v2/streaming/reader/Offset.java
@@ -17,12 +17,20 @@

package org.apache.spark.sql.sources.v2.streaming.reader;

+import org.apache.spark.annotation.InterfaceStability;
+
/**
- * An abstract representation of progress through a [[MicroBatchReader]] or [[ContinuousReader]].
- * During execution, Offsets provided by the data source implementation will be logged and used as
- * restart checkpoints. Sources should provide an Offset implementation which they can use to
- * reconstruct the stream position where the offset was taken.
+ * An abstract representation of progress through a {@link MicroBatchReader} or
+ * {@link ContinuousReader}.
+ * During execution, offsets provided by the data source implementation will be logged and used as
+ * restart checkpoints. Each source should provide an offset implementation which the source can use
+ * to reconstruct a position in the stream up to which data has been seen/processed.
+ *
+ * Note: This class currently extends {@link org.apache.spark.sql.execution.streaming.Offset} to
+ * maintain compatibility with DataSource V1 APIs. This extension will be removed once we
+ * get rid of V1 completely.
*/
+@InterfaceStability.Evolving
public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
/**
* A JSON-serialized representation of an Offset that is
@@ -37,7 +45,7 @@ public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
/**
* Equality based on JSON string representation. We leverage the
* JSON representation for normalization between the Offset's
- * in memory and on disk representations.
+ * in deserialized and serialized representations.
*/
@Override
public boolean equals(Object obj) {
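A minimal concrete `Offset` for illustration (hypothetical class): `json()` is the only method a source must supply, and the inherited `equals()`/`hashCode()` compare that JSON form, so an offset reloaded from the checkpoint log compares equal to the live instance it was serialized from:

```java
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;

// Hypothetical offset over a single monotonically increasing counter.
public class CounterOffset extends Offset {
  private final long value;

  public CounterOffset(long value) {
    this.value = value;
  }

  // This JSON string is what gets written to the checkpoint log, and also
  // what the inherited equals()/hashCode() compare.
  @Override
  public String json() {
    return Long.toString(value);
  }
}
```

For example, two `CounterOffset(5)` instances, one freshly constructed and one rebuilt from the log, both serialize to `"5"` and therefore compare equal.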
org/apache/spark/sql/sources/v2/streaming/reader/PartitionOffset.java
@@ -19,11 +19,14 @@

import java.io.Serializable;

+import org.apache.spark.annotation.InterfaceStability;
+
/**
* Used for per-partition offsets in continuous processing. ContinuousReader implementations will
* provide a method to merge these into a global Offset.
*
* These offsets must be serializable.
*/
+@InterfaceStability.Evolving
public interface PartitionOffset extends Serializable {
}
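Because instances are shipped from executors back to the driver for merging, each field must itself be serializable. A small sketch with hypothetical names:

```java
import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset;

// Hypothetical Kafka-style position: every field is itself serializable,
// which PartitionOffset (extends Serializable) requires so the object can
// travel from executors to the driver for mergeOffsets().
public class TopicPartitionOffset implements PartitionOffset {
  private static final long serialVersionUID = 1L;

  public final String topic;
  public final int partition;
  public final long offset;

  public TopicPartitionOffset(String topic, int partition, long offset) {
    this.topic = topic;
    this.partition = partition;
    this.offset = offset;
  }
}
```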
org/apache/spark/sql/sources/v2/writer/DataSourceV2Writer.java
@@ -22,11 +22,14 @@
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

/**
* A data source writer that is returned by
- * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}.
+ * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}/
+ * {@link org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport#createMicroBatchWriter(String, long, StructType, OutputMode, DataSourceV2Options)}/
+ * {@link org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport#createContinuousWriter(String, StructType, OutputMode, DataSourceV2Options)}.
* It can mix in various writing optimization interfaces to speed up the data saving. The actual
* writing logic is delegated to {@link DataWriter}.
*
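For orientation, a sketch of the writer lifecycle this javadoc describes, assuming the `DataSourceV2Writer` shape at the time of this PR (`createWriterFactory`/`commit`/`abort`); `ExampleWriter` and `ExampleWriterFactory` are hypothetical:

```java
import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer;
import org.apache.spark.sql.sources.v2.writer.DataWriterFactory;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Hypothetical writer: stage output per task, publish it atomically on commit.
public class ExampleWriter implements DataSourceV2Writer {
  @Override
  public DataWriterFactory<Row> createWriterFactory() {
    // Serialized and sent to executors; each task builds its own DataWriter,
    // which does the actual row-by-row writing.
    return new ExampleWriterFactory();  // hypothetical factory
  }

  @Override
  public void commit(WriterCommitMessage[] messages) {
    // All tasks succeeded: make their staged output visible in one step,
    // e.g. rename a staging directory into place.
  }

  @Override
  public void abort(WriterCommitMessage[] messages) {
    // Something failed: remove any partial or staged output.
  }
}
```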