
[SPARK-23052][SS] Migrate ConsoleSink to data source V2 api. #20243

Closed
jose-torres wants to merge 20 commits from jose-torres/console-sink

Conversation

jose-torres
Contributor

What changes were proposed in this pull request?

Migrate ConsoleSink to data source V2 api.

Note that this includes a missing piece in DataStreamWriter required to specify a data source V2 writer.

Note also that I've removed the "Rerun batch" part of the sink, because as far as I can tell this would never have actually happened. A MicroBatchExecution object will only commit each batch once for its lifetime, and a new MicroBatchExecution object would have a new ConsoleSink object which doesn't know it's retrying a batch. So I think this represents an anti-feature rather than a weakness in the V2 API.

How was this patch tested?

new unit test

@jose-torres jose-torres changed the title [SPARK-23052] Migrate ConsoleSink to data source V2 api. [SPARK-23052][SS] Migrate ConsoleSink to data source V2 api. Jan 12, 2018
@jose-torres
Contributor Author

I split off PackedRowWriterFactory with the intent to refactor MemorySinkV2 to use it later, but I just realized no refactoring is actually needed. So I've slotted it in.

@SparkQA

SparkQA commented Jan 12, 2018

Test build #86017 has finished for PR 20243 at commit 71cc6e4.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 12, 2018

Test build #86018 has finished for PR 20243 at commit b52c990.

  • This patch fails from timeout after a configured wait of `250m`.
  • This patch merges cleanly.
  • This patch adds no public classes.

@jose-torres
Contributor Author

My realization above was not in fact true. I took out the MemorySinkV2 changes and will do them in a later PR.

@SparkQA

SparkQA commented Jan 12, 2018

Test build #86051 has finished for PR 20243 at commit e3af17c.

  • This patch fails Spark unit tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

Contributor

@tdas tdas left a comment


Looks good, though it needs more tests with fake data sources to verify that these cases are handled correctly.

    schema: StructType,
    mode: OutputMode,
    options: DataSourceV2Options): Optional[DataSourceV2Writer] = {
  Optional.of(new ConsoleWriter(epochId, schema, options.asMap.asScala.toMap))
}

def createRelation(
Contributor

What is createRelation used for? For batch?

Contributor Author

I assume so. I'm not familiar with it, but it's not on the streaming source codepath.


/**
 * A simple [[DataWriterFactory]] whose tasks just pack rows into the commit message for delivery
 * to the [[org.apache.spark.sql.sources.v2.writer.DataSourceV2Writer]] on the driver.
Contributor

super nit: to a DataSourceV2Writer

case _ => throw new AnalysisException(
  s"Data source $source does not support continuous writing")
}
val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
Contributor

We are checking for the same conditions here as well as in the StreamingQueryManager.createQuery. I think we need to refactor this, probably sometime in the future once we get rid of v1 completely.

Either way, we should immediately add a general test suite (say StreamingDataSourceV2Suite) that tests these cases with various fake data sources.
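
For context, one of those fake data sources might look roughly like this. This is only a sketch based on the class and short names that appear later in this PR's test output (FakeStreamingNeitherMode, fake-neither-mode), not the actual test code:

  import org.apache.spark.sql.sources.DataSourceRegister
  import org.apache.spark.sql.sources.v2.DataSourceV2

  // Registers under the short name used in the error-message assertions below.
  // It implements neither MicroBatchWriteSupport nor ContinuousWriteSupport,
  // so DataStreamWriter should reject it for streaming writes.
  class FakeStreamingNeitherMode extends DataSourceRegister with DataSourceV2 {
    override def shortName(): String = "fake-neither-mode"
  }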

class ConsoleWriter(batchId: Long, schema: StructType, options: Map[String, String])
    extends DataSourceV2Writer with Logging {
  // Number of rows to display, by default 20 rows
  private val numRowsToShow = options.get("numRows").map(_.toInt).getOrElse(20)
Contributor

I am not sure get("numRows") on this options map is case-insensitive. Why not just pass the DataSourceV2Options to this writer and use that directly? It is already case-insensitive.

In fact, the same pattern should be used for all v2 readers/writers (verify this for Kafka continuous).

Contributor

ConsoleRelation creates this map from a DataSourceV2Options, so it contains lowercased keys.
Using DataSourceV2Options or asking for "numrows" would both work, but with DataSourceV2Options,
  options.get("numRows").map(_.toInt).getOrElse(20)
could also be simplified to
  options.getInt("numRows", 20)
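
For illustration, a minimal sketch of that simplification, assuming the writer is changed to take the DataSourceV2Options directly as discussed above:

  // DataSourceV2Options lookups are case-insensitive, and getInt applies the
  // default when the key is absent, so the map/getOrElse chain goes away.
  private val numRowsToShow = options.getInt("numRows", 20)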

Contributor Author

It's not that easy for Kafka continuous, because we're feeding the maps into utility methods (and Kafka-level interfaces) which insist on a java.util.Map[String, Object]. Fortunately the parameters already appear to be case sensitive there, and I think we have tests verifying that various parameters can be specified.


private val numRowsToShow = options.get("numRows").map(_.toInt).getOrElse(20)

// Truncate the displayed data if it is too long, by default it is true
private val isTruncated = options.get("truncate").map(_.toBoolean).getOrElse(true)
Contributor

Same simplification is possible here if DataSourceV2Options is used.
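
That is, something along these lines, assuming DataSourceV2Options provides a getBoolean analogous to the getInt used above:

  // Case-insensitive lookup with a built-in default, mirroring the numRows change.
  private val isTruncated = options.getBoolean("truncate", true)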

|
|""".stripMargin)
}
}
Contributor

We could have a test to check numRows, something like this:

  test("console with numRows") {
    val input = MemoryStream[Int]

    val captured = new ByteArrayOutputStream()
    Console.withOut(captured) {
      val query = input.toDF().writeStream.format("console").option("NUMROWS", 2).start()
      try {
        input.addData(1, 2, 3)
        query.processAllAvailable()
      } finally {
        query.stop()
      }
    }

    assert(captured.toString() ==
      """-------------------------------------------
        |Batch: 0
        |-------------------------------------------
        |+-----+
        ||value|
        |+-----+
        ||    1|
        ||    2|
        |+-----+
        |only showing top 2 rows
        |
        |""".stripMargin)
  }

  test("console with truncation") {
    val input = MemoryStream[String]

    val captured = new ByteArrayOutputStream()
    Console.withOut(captured) {
      val query = input.toDF().writeStream.format("console").option("TRUNCATE", true).start()
      try {
        input.addData("123456789012345678901234567890")
        query.processAllAvailable()
      } finally {
        query.stop()
      }
    }

    assert(captured.toString() ==
      """-------------------------------------------
        |Batch: 0
        |-------------------------------------------
        |+--------------------+
        ||               value|
        |+--------------------+
        ||12345678901234567...|
        |+--------------------+
        |
        |""".stripMargin)
  }

Contributor Author

Indeed we could. Thanks for writing out the tests!

@@ -17,58 +17,36 @@

package org.apache.spark.sql.execution.streaming

import org.apache.spark.internal.Logging
Contributor

can you move this file into the sources subdirectory to make it consistent with other v2 sources?

Contributor

In fact, this file can be merged into ConsoleWriter.scala. The combined file will be named console.scala.

Contributor Author

I can do this in a followup PR. It's not as simple as just moving it; we have to add an alias so that .format("org.apache.spark.sql.execution.streaming.ConsoleSinkProvider") continues to work.

Contributor

argh. okay. later then.


assert(ex.getMessage.contains(
"Data source fake-neither-mode does not support continuous writing"))
}
Contributor

You are testing only the different types of sinks, not the different types of sources.

Contributor Author

Added tests for all 4 * 4 * 3 combinations of source/sink/trigger. Note that:

  • I had to revert the earlier change to initialize ContinuousExecution.sources to null, because it turns out this interferes with error generation on newly constructed executions.
  • Two of the cases don't throw the error until after start(). Fixing this will take a fairly disruptive set of changes; the problem is that DataStreamWriter doesn't have direct visibility into which sources were used to generate it. We'd need to crawl the tree, similarly to how we do it in the execution.

@SparkQA

SparkQA commented Jan 17, 2018

Test build #86209 has finished for PR 20243 at commit be880b1.

  • This patch fails Spark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)

@SparkQA

SparkQA commented Jan 17, 2018

Test build #86213 has finished for PR 20243 at commit fac17a4.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 17, 2018

Test build #86212 has finished for PR 20243 at commit e4c6429.

  • This patch fails PySpark unit tests.
  • This patch does not merge cleanly.
  • This patch adds the following public classes (experimental):
  • class FakeStreamingNeitherMode extends DataSourceRegister with DataSourceV2
  • class StreamingDataSourceV2Suite extends StreamTest

@@ -69,7 +69,7 @@ class ContinuousExecution(
        ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
      })
    case StreamingRelationV2(_, sourceName, _, _, _) =>
-     throw new AnalysisException(
+     throw new UnsupportedOperationException(
Contributor

Why this change? An incorrect data source is not an operation.

Contributor Author

I think there's an argument that it is - you're asking the data source (which is correct in the sense that it's a real, existing source) to do a type of read/write it doesn't support.

The primary motivation is that the existing code has already made the choice to throw an UnsupportedOperationException when you try to stream from a source that only outputs in batch mode.

@@ -54,7 +54,7 @@ class ContinuousExecution(
    sparkSession, name, checkpointRoot, analyzedPlan, sink,
    trigger, triggerClock, outputMode, deleteCheckpointOnStop) {

-  @volatile protected var continuousSources: Seq[ContinuousReader] = _
+  @volatile protected var continuousSources: Seq[ContinuousReader] = Seq()
Contributor

Why this change? Is it related to this PR?

Contributor Author

Yes. As mentioned in an earlier comment, initializing to null means the StreamingQueryException can't be constructed if the failure happens before the sources are set.

import org.apache.spark.sql.sources.v2.writer.{DataSourceV2Writer, DataWriterFactory, WriterCommitMessage}
import org.apache.spark.sql.types.StructType

class ConsoleWriter(batchId: Long, schema: StructType, options: DataSourceV2Options)
Contributor

Add docs and link it to ConsoleSinkProvider, since it's in a different file.

override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized {
  val batch = messages.collect {
    case PackedRowCommitMessage(rows) => rows
  }.fold(Array())(_ ++ _)
Contributor

Why this complicated fold? Just array.collect { ... } returns an Array, doesn't it?

Contributor Author

It returns an array of arrays of rows, which isn't what we need.

Contributor

You can use flatten instead of fold. Much cleaner.
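
A sketch of the flattened version, with the display logic elided behind a hypothetical printRows helper:

  override def commit(messages: Array[WriterCommitMessage]): Unit = synchronized {
    // collect produces an Array[Array[Row]]; flatten concatenates it into one Array[Row].
    val batch = messages.collect {
      case PackedRowCommitMessage(rows) => rows
    }.flatten
    printRows(batch)  // hypothetical stand-in for the existing console-printing code
  }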


case class PackedRowCommitMessage(rows: Array[Row]) extends WriterCommitMessage

class PackedRowDataWriter() extends DataWriter[Row] with Logging {
Contributor

add docs.

}
}

case class PackedRowCommitMessage(rows: Array[Row]) extends WriterCommitMessage
Contributor

add docs.

override def write(row: Row): Unit = data.append(row)

override def commit(): PackedRowCommitMessage = {
  val msg = PackedRowCommitMessage(data.clone().toArray)
Contributor
@tdas tdas Jan 17, 2018

Why are you cloning and then calling toArray? Just data.toArray will create an immutable copy.
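
In other words, something like this (a sketch assuming data is a mutable ArrayBuffer[Row], as the surrounding code suggests):

  override def commit(): PackedRowCommitMessage = {
    // toArray already copies the buffer into a fresh Array, so the clone() is redundant.
    PackedRowCommitMessage(data.toArray)
  }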

val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
val sink = (ds.newInstance(), trigger) match {
  case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
  case (_, _: ContinuousTrigger) => throw new UnsupportedOperationException(
Contributor

AnalysisException. An incorrect trigger or incompatible data source is not an operation.

Contributor Author

as above

case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
case (_, _: ContinuousTrigger) => throw new UnsupportedOperationException(
  s"Data source $source does not support continuous writing")
case (w: MicroBatchWriteSupport, _) => w
Contributor

Isn't there a case where it does not have MicroBatchWriteSupport but the trigger is ProcessingTime/OneTime? That should have a different error message.

Contributor Author

In that case, we have to just fall back to the V1 path, because V1 sinks don't have MicroBatchWriteSupport.
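
Putting the two comments together, the dispatch being described looks roughly like this. It is a sketch assembled from the snippets above; createV1Sink() is a hypothetical stand-in for the existing V1 code path:

  val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
  val sink = (ds.newInstance(), trigger) match {
    // A continuous trigger requires a continuous-capable V2 writer.
    case (w: ContinuousWriteSupport, _: ContinuousTrigger) => w
    case (_, _: ContinuousTrigger) =>
      throw new UnsupportedOperationException(
        s"Data source $source does not support continuous writing")
    // Microbatch triggers use the V2 microbatch writer when available...
    case (w: MicroBatchWriteSupport, _) => w
    // ...and otherwise fall back to the V1 Sink path, since V1 sinks never
    // implement MicroBatchWriteSupport.
    case _ => createV1Sink()
  }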

query.stop()
}

private def testUnsupportedOperationCase(
Contributor

Just rename to testNegativeCase

.trigger(trigger)
.start()

eventually(timeout(streamingTimeout)) {
Contributor
@tdas tdas Jan 17, 2018

This is the case we should avoid: start() should fail if there is a source/sink mismatch, but here start() is not failing. We must validate the source and sink earlier to prevent such a query from being started.

Contributor Author

For the reasons I discussed earlier, this is hard to do and won't fit in this PR.

@SparkQA

SparkQA commented Jan 17, 2018

Test build #86232 has finished for PR 20243 at commit c0ec93f.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 17, 2018

Test build #86235 has finished for PR 20243 at commit 516fd4a.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 17, 2018

Test build #86284 has finished for PR 20243 at commit 2916010.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

@tdas
Contributor

tdas commented Jan 18, 2018

LGTM, assuming tests pass.

@SparkQA

SparkQA commented Jan 18, 2018

Test build #86296 has finished for PR 20243 at commit 278eeb4.

  • This patch passes all tests.
  • This patch does not merge cleanly.
  • This patch adds no public classes.

@SparkQA

SparkQA commented Jan 18, 2018

Test build #86298 has finished for PR 20243 at commit f3c170e.

  • This patch passes all tests.
  • This patch merges cleanly.
  • This patch adds no public classes.

asfgit pushed a commit that referenced this pull request Jan 18, 2018

Author: Jose Torres <[email protected]>

Closes #20243 from jose-torres/console-sink.

(cherry picked from commit 1c76a91)
Signed-off-by: Tathagata Das <[email protected]>
@asfgit asfgit closed this in 1c76a91 Jan 18, 2018