SQL Create Stream

If you are not familiar with the basic characteristics of Spark Structured Streaming, please refer to the structured streaming programming guide.

WARNING: If you are attempting to enable this functionality outside the scope of a PSTL deployment, you will need to configure spark.sql.extensions with an extension class which provides a SQL parser capable of interpreting these semantics. PSTL's parser can be found in the org.apache.spark.sql.catalyst.parser.pstl package. The underlying extension class is simply responsible for wiring up the parser implementation. The spark.sql.extensions configuration must be set before the underlying SparkSession is constructed.
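
As a minimal sketch, registering the extension when constructing a SparkSession might look like the following. The class name used here is a placeholder; substitute the actual extension class shipped with your PSTL distribution:

import org.apache.spark.sql.SparkSession

// spark.sql.extensions must be set before the SparkSession is constructed.
// NOTE: the extension class name below is a placeholder for the class
// provided by your PSTL distribution.
val spark = SparkSession.builder()
  .appName("pstl-create-stream-example")
  .config("spark.sql.extensions",
    "org.apache.spark.sql.catalyst.parser.pstl.PstlSparkSessionExtensions")
  .getOrCreate()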

Currently, Spark only offers programmatic access to structured streaming sources via the DataStreamReader API. PSTL enhances Spark's SQL parser to bridge the gap between SQL functionality and structured streaming sources. This allows users to easily define structured streaming sources in SQL for any number of data manipulation tasks.

Usage

Let's start by walking through the getting started example for Spark Structured Streaming and its corresponding Scala syntax. Then, we will learn how to translate the Scala syntax to SQL.

// Read a stream of text lines from a socket source.
val lines = spark.readStream
  .format("socket")
  .option("host", "localhost")
  .option("port", 9999)
  .load()

In the above example, the user is defining a structured streaming source which reads text data from a server listening on localhost:9999. Let's see how we can convert this to a SQL query for use exclusively in our job definitions:

CREATE STREAM lines
FROM SOCKET
OPTIONS(
  'host'='localhost',
  'port'='9999'
);

In the above example, we can see the underlying format becomes FROM SOCKET. Similarly, each .option(key, value) call is specified in SQL as OPTIONS('key'='value', ...). The one subtle but important difference between the two approaches is that the SQL implementation creates a table named lines, so subsequent SQL operations can reference the underlying stream. As such, CREATE STREAM ... must be given a valid table name; otherwise the user will receive an error.
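
For instance, a later statement in the same job definition could reference the stream by name like any other table. The query below is illustrative only (how downstream statements consume the stream depends on the rest of your PSTL job definition); explode and split are standard Spark SQL functions:

SELECT explode(split(value, ' ')) AS word
FROM lines;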

One feature we haven't covered from the scala syntax is the notion of providing a schema:

import org.apache.spark.sql.types.StructType

// Explicitly declare the schema: a single string column named "value".
val lines = spark.readStream
  .format("socket")
  .schema(new StructType().add("value", "string"))
  .option("host", "localhost")
  .option("port", 9999)
  .load()

In the above example, we are specifying the schema of each row written to our socket. We expect to interpret each message written to the socket as a single string column named value. We can achieve the same behavior in SQL as follows:

CREATE STREAM lines(value string)
FROM SOCKET
OPTIONS(
  'host'='localhost',
  'port'='9999'
);

Note the addition of a pseudo table definition to the table name: lines(value string). Some sources require the user to provide a schema to make sense of the underlying data (for example, when the source does not support schema inference, or inference is disabled for performance reasons). In these cases, the user can simply provide a comma-separated list of column names and their respective data types.
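
A multi-column schema follows the same pattern. The sketch below is illustrative only: the json source maps to Spark's file-based json format, which requires a schema when inference is not enabled, and the column names and path are hypothetical:

CREATE STREAM events(id bigint, name string, ts timestamp)
FROM JSON
OPTIONS(
  'path'='/data/events'
);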