Skip to content

Commit

Permalink
[SPARK-23317][SQL] rename ContinuousReader.setOffset to setStartOffset
Browse files Browse the repository at this point in the history
## What changes were proposed in this pull request?

In the document of `ContinuousReader.setOffset`, we say this method is used to specify the start offset. We also have a `ContinuousReader.getStartOffset` to get the value back. I think it makes more sense to rename `ContinuousReader.setOffset` to `setStartOffset`.

## How was this patch tested?

N/A

Author: Wenchen Fan <[email protected]>

Closes #20486 from cloud-fan/rename.
  • Loading branch information
cloud-fan authored and gatorsmile committed Feb 3, 2018
1 parent 3ff83ad commit fe73cb4
Show file tree
Hide file tree
Showing 6 changed files with 7 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ class KafkaContinuousReader(
override def readSchema: StructType = KafkaOffsetReader.kafkaSchema

private var offset: Offset = _
override def setOffset(start: ju.Optional[Offset]): Unit = {
override def setStartOffset(start: ju.Optional[Offset]): Unit = {
offset = start.orElse {
val offsets = initialOffsets match {
case EarliestOffsetRangeLimit => KafkaSourceOffset(offsetReader.fetchEarliestOffsets())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,12 @@ public interface ContinuousReader extends BaseStreamingSource, DataSourceReader
* start from the first record after the provided offset, or from an implementation-defined
* inferred starting point if no offset is provided.
*/
void setOffset(Optional<Offset> start);
void setStartOffset(Optional<Offset> start);

/**
* Return the specified or inferred start offset for this reader.
*
* @throws IllegalStateException if setOffset has not been called
* @throws IllegalStateException if setStartOffset has not been called
*/
Offset getStartOffset();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ class ContinuousExecution(

val loggedOffset = offsets.offsets(0)
val realOffset = loggedOffset.map(off => reader.deserializeOffset(off.json))
reader.setOffset(java.util.Optional.ofNullable(realOffset.orNull))
reader.setStartOffset(java.util.Optional.ofNullable(realOffset.orNull))
new StreamingDataSourceV2Relation(newOutput, reader)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class RateStreamContinuousReader(options: DataSourceOptions)

private var offset: Offset = _

override def setOffset(offset: java.util.Optional[Offset]): Unit = {
override def setStartOffset(offset: java.util.Optional[Offset]): Unit = {
this.offset = offset.orElse(RateStreamSourceV2.createInitialOffset(numPartitions, creationTime))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ class RateSourceV2Suite extends StreamTest {
test("continuous data") {
val reader = new RateStreamContinuousReader(
new DataSourceOptions(Map("numPartitions" -> "2", "rowsPerSecond" -> "20").asJava))
reader.setOffset(Optional.empty())
reader.setStartOffset(Optional.empty())
val tasks = reader.createDataReaderFactories()
assert(tasks.size == 2)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ case class FakeReader() extends MicroBatchReader with ContinuousReader {
def readSchema(): StructType = StructType(Seq())
def stop(): Unit = {}
def mergeOffsets(offsets: Array[PartitionOffset]): Offset = RateStreamOffset(Map())
def setOffset(start: Optional[Offset]): Unit = {}
def setStartOffset(start: Optional[Offset]): Unit = {}

def createDataReaderFactories(): java.util.ArrayList[DataReaderFactory[Row]] = {
throw new IllegalStateException("fake source - cannot actually read")
Expand Down

0 comments on commit fe73cb4

Please sign in to comment.