[SPARK-23119][SS] Minor fixes to V2 streaming APIs
## What changes were proposed in this pull request?

- Added `InterfaceStability.Evolving` annotations.
- Improved docs.

## How was this patch tested?
Existing tests.

Author: Tathagata Das <[email protected]>

Closes #20286 from tdas/SPARK-23119.

(cherry picked from commit bac0d66)
Signed-off-by: Shixiong Zhu <[email protected]>
tdas authored and zsxwing committed Jan 18, 2018
Parent: b84c2a3 · Commit: 9783aea
Showing 7 changed files with 36 additions and 8 deletions.

ContinuousReadSupport.java
@@ -19,6 +19,7 @@

import java.util.Optional;

+import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
@@ -28,6 +29,7 @@
* A mix-in interface for {@link DataSourceV2}. Data sources can implement this interface to
* provide data reading ability for continuous stream processing.
*/
+@InterfaceStability.Evolving
public interface ContinuousReadSupport extends DataSourceV2 {
/**
* Creates a {@link ContinuousReader} to scan the data from this data source.
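
To make the mix-in pattern this hunk documents concrete, here is a minimal sketch of a hypothetical source opting into continuous reads. `CounterSource` is an invented name, and the `createContinuousReader(Optional<StructType>, String, DataSourceV2Options)` signature is assumed from this revision of the API; treat it as a sketch, not a definitive implementation.

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.DataSourceV2;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.streaming.ContinuousReadSupport;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
import org.apache.spark.sql.types.StructType;

// Hypothetical source that advertises continuous-processing support by
// mixing ContinuousReadSupport into its DataSourceV2 entry point.
public class CounterSource implements DataSourceV2, ContinuousReadSupport {
  @Override
  public ContinuousReader createContinuousReader(
      Optional<StructType> schema,
      String checkpointLocation,
      DataSourceV2Options options) {
    // A real source would build and return its ContinuousReader here, wired
    // to `schema`, the `checkpointLocation` used for offset recovery, and
    // any source-specific `options`.
    throw new UnsupportedOperationException("reader construction omitted in this sketch");
  }
}
```
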
ContinuousDataReader.java
@@ -17,11 +17,13 @@

package org.apache.spark.sql.sources.v2.streaming.reader;

+import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataReader;

/**
* A variation on {@link DataReader} for use with streaming in continuous processing mode.
*/
+@InterfaceStability.Evolving
public interface ContinuousDataReader<T> extends DataReader<T> {
/**
* Get the offset of the current record, or the start offset if no records have been read.
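
To ground the `getOffset` contract, here is a hedged sketch of a trivial continuous reader that emits an ever-growing counter. `CounterDataReader` and `CounterPartitionOffset` are invented names (the latter is sketched after the PartitionOffset hunk below), and `DataReader`'s `next()`/`get()`/`close()` contract is assumed from this revision of the API.

```java
import java.io.IOException;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousDataReader;
import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset;

// Trivial continuous reader: each call to next() produces the next long.
public class CounterDataReader implements ContinuousDataReader<Row> {
  private long current = -1L;

  @Override
  public boolean next() throws IOException {
    // A real continuous reader would block here until data is available.
    current += 1;
    return true;
  }

  @Override
  public Row get() {
    return RowFactory.create(current);
  }

  @Override
  public PartitionOffset getOffset() {
    // Offset of the current record, or -1 (the start) before any next()
    // call, matching the contract documented above.
    return new CounterPartitionOffset(current);
  }

  @Override
  public void close() throws IOException {
    // Nothing to release in this sketch.
  }
}
```
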
ContinuousReader.java
@@ -17,6 +17,7 @@

package org.apache.spark.sql.sources.v2.streaming.reader;

+import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;

@@ -27,11 +28,15 @@
* interface to allow reading in a continuous processing mode stream.
*
* Implementations must ensure each read task output is a {@link ContinuousDataReader}.
+ *
+ * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
*/
+@InterfaceStability.Evolving
public interface ContinuousReader extends BaseStreamingSource, DataSourceV2Reader {
/**
- * Merge offsets coming from {@link ContinuousDataReader} instances in each partition to
- * a single global offset.
+ * Merge partitioned offsets coming from {@link ContinuousDataReader} instances for each
+ * partition to a single global offset.
*/
Offset mergeOffsets(PartitionOffset[] offsets);

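
A hedged sketch of how `mergeOffsets` might combine per-partition positions. `CounterContinuousReader`, `CounterPartitionOffset`, and `CounterGlobalOffset` are invented names (the two offset types are sketched after the Offset and PartitionOffset hunks below), and the class is left abstract because the remaining reader methods are omitted.

```java
import org.apache.spark.sql.sources.v2.streaming.reader.ContinuousReader;
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;
import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset;

// Only the offset-merging logic is shown; all other ContinuousReader
// methods are omitted, so the class stays abstract.
public abstract class CounterContinuousReader implements ContinuousReader {
  @Override
  public Offset mergeOffsets(PartitionOffset[] offsets) {
    // Collapse one CounterPartitionOffset per partition into a single
    // global offset that records every partition's position.
    long[] positions = new long[offsets.length];
    for (int i = 0; i < offsets.length; i++) {
      positions[i] = ((CounterPartitionOffset) offsets[i]).position;
    }
    return new CounterGlobalOffset(positions);
  }
}
```
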
MicroBatchReader.java
@@ -17,6 +17,7 @@

package org.apache.spark.sql.sources.v2.streaming.reader;

+import org.apache.spark.annotation.InterfaceStability;
import org.apache.spark.sql.sources.v2.reader.DataSourceV2Reader;
import org.apache.spark.sql.execution.streaming.BaseStreamingSource;

@@ -25,7 +26,11 @@
/**
* A mix-in interface for {@link DataSourceV2Reader}. Data source readers can implement this
* interface to indicate they allow micro-batch streaming reads.
+ *
+ * Note: This class currently extends {@link BaseStreamingSource} to maintain compatibility with
+ * DataSource V1 APIs. This extension will be removed once we get rid of V1 completely.
*/
+@InterfaceStability.Evolving
public interface MicroBatchReader extends DataSourceV2Reader, BaseStreamingSource {
/**
* Set the desired offset range for read tasks created from this reader. Read tasks will
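
For the offset-range contract documented above, a hedged sketch: the `setOffsetRange(Optional<Offset>, Optional<Offset>)` signature is assumed from this revision of the API, `initialOffset`/`latestOffset` are hypothetical helpers, and the other reader methods are omitted (hence the class is abstract). `CounterGlobalOffset` is sketched after the Offset hunk below.

```java
import java.util.Optional;

import org.apache.spark.sql.sources.v2.streaming.reader.MicroBatchReader;
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;

// Only the range handling is shown; the other MicroBatchReader methods are
// omitted, so the class stays abstract.
public abstract class CounterMicroBatchReader implements MicroBatchReader {
  private CounterGlobalOffset start;
  private CounterGlobalOffset end;

  @Override
  public void setOffsetRange(Optional<Offset> start, Optional<Offset> end) {
    // Empty options mean the source should choose the bound itself, e.g. the
    // checkpointed start on restart or the latest available data as the end.
    this.start = (CounterGlobalOffset) start.orElseGet(this::initialOffset);
    this.end = (CounterGlobalOffset) end.orElseGet(this::latestOffset);
  }

  // Hypothetical helpers a real source would back with its actual state.
  abstract CounterGlobalOffset initialOffset();
  abstract CounterGlobalOffset latestOffset();
}
```
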
Offset.java
@@ -17,12 +17,20 @@

package org.apache.spark.sql.sources.v2.streaming.reader;

+import org.apache.spark.annotation.InterfaceStability;
+
/**
- * An abstract representation of progress through a [[MicroBatchReader]] or [[ContinuousReader]].
- * During execution, Offsets provided by the data source implementation will be logged and used as
- * restart checkpoints. Sources should provide an Offset implementation which they can use to
- * reconstruct the stream position where the offset was taken.
+ * An abstract representation of progress through a {@link MicroBatchReader} or
+ * {@link ContinuousReader}.
+ * During execution, offsets provided by the data source implementation will be logged and used as
+ * restart checkpoints. Each source should provide an offset implementation which the source can use
+ * to reconstruct a position in the stream up to which data has been seen/processed.
+ *
+ * Note: This class currently extends {@link org.apache.spark.sql.execution.streaming.Offset} to
+ * maintain compatibility with DataSource V1 APIs. This extension will be removed once we
+ * get rid of V1 completely.
*/
+@InterfaceStability.Evolving
public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
/**
* A JSON-serialized representation of an Offset that is
@@ -37,7 +45,7 @@ public abstract class Offset extends org.apache.spark.sql.execution.streaming.Offset {
/**
* Equality based on JSON string representation. We leverage the
* JSON representation for normalization between the Offset's
- * in memory and on disk representations.
+ * in deserialized and serialized representations.
*/
@Override
public boolean equals(Object obj) {
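
To ground the JSON contract, a hedged sketch of a concrete offset for the counter example. `CounterGlobalOffset` is an invented name, and the only requirement assumed is the abstract `json()` method shown in the hunk; equality and hashing come from the base class via the JSON string.

```java
import org.apache.spark.sql.sources.v2.streaming.reader.Offset;

// Global offset for the counter example: one position per partition,
// serialized as a JSON array so a restarted query can reconstruct it.
public class CounterGlobalOffset extends Offset {
  final long[] positions;

  public CounterGlobalOffset(long[] positions) {
    this.positions = positions;
  }

  @Override
  public String json() {
    // e.g. "[3,17,5]"; checkpointing and equality both go through this string.
    StringBuilder sb = new StringBuilder("[");
    for (int i = 0; i < positions.length; i++) {
      if (i > 0) sb.append(',');
      sb.append(positions[i]);
    }
    return sb.append(']').toString();
  }
}
```
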
PartitionOffset.java
@@ -19,11 +19,14 @@

import java.io.Serializable;

+import org.apache.spark.annotation.InterfaceStability;
+
/**
* Used for per-partition offsets in continuous processing. ContinuousReader implementations will
* provide a method to merge these into a global Offset.
*
* These offsets must be serializable.
*/
+@InterfaceStability.Evolving
public interface PartitionOffset extends Serializable {
}
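
And the matching per-partition offset, again with an invented name. Since PartitionOffset extends Serializable, instances can travel from executors to the driver for `ContinuousReader#mergeOffsets`.

```java
import org.apache.spark.sql.sources.v2.streaming.reader.PartitionOffset;

// Per-partition position for the counter example. PartitionOffset extends
// Serializable, so executors can ship these to the driver for merging.
public class CounterPartitionOffset implements PartitionOffset {
  public final long position;

  public CounterPartitionOffset(long position) {
    this.position = position;
  }
}
```
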
@@ -22,11 +22,14 @@
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.sources.v2.DataSourceV2Options;
import org.apache.spark.sql.sources.v2.WriteSupport;
+import org.apache.spark.sql.streaming.OutputMode;
import org.apache.spark.sql.types.StructType;

/**
* A data source writer that is returned by
- * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}.
+ * {@link WriteSupport#createWriter(String, StructType, SaveMode, DataSourceV2Options)}/
+ * {@link org.apache.spark.sql.sources.v2.streaming.MicroBatchWriteSupport#createMicroBatchWriter(String, long, StructType, OutputMode, DataSourceV2Options)}/
+ * {@link org.apache.spark.sql.sources.v2.streaming.ContinuousWriteSupport#createContinuousWriter(String, StructType, OutputMode, DataSourceV2Options)}.
* It can mix in various writing optimization interfaces to speed up the data saving. The actual
* writing logic is delegated to {@link DataWriter}.
*
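
The final hunk only touches the writer's class comment, but the delegation it mentions is easy to sketch. Below is a hypothetical per-task writer; `LoggingDataWriter` is an invented name, and `DataWriter`'s `write`/`commit`/`abort` contract (and `WriterCommitMessage` being an empty marker interface) is assumed from this revision of the API.

```java
import java.io.IOException;

import org.apache.spark.sql.Row;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
import org.apache.spark.sql.sources.v2.writer.WriterCommitMessage;

// Hypothetical per-task writer of the kind the data source writer hands out.
public class LoggingDataWriter implements DataWriter<Row> {
  private final StringBuilder buffer = new StringBuilder();

  @Override
  public void write(Row record) throws IOException {
    // Stage rows locally; nothing is visible until commit().
    buffer.append(record.toString()).append('\n');
  }

  @Override
  public WriterCommitMessage commit() throws IOException {
    // "Publish" the staged rows and report success to the driver.
    System.out.print(buffer);
    return new WriterCommitMessage() {};
  }

  @Override
  public void abort() throws IOException {
    // Discard staged rows if the task fails.
    buffer.setLength(0);
  }
}
```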