Skip to content

Spark Extensions

vnnv01 edited this page Mar 6, 2018 · 1 revision

PSTL is nothing more than a general purpose Spark SQL application. As a result, all of the general purpose Spark SQL functionality is available to you. For example, any standard Spark SQL Function can be used within a job definition. However, PSTL also modifies and extends Spark SQL to provide additional productivity enhancements. Below are a list of our modifications, and the reasoning for each customization.

SQL Analyzer

PSTL injects a few additional rules to Spark SQL's Analyzer at runtime, primarily to modify certain runtime features to ensure a safe runtime environment for each job definition.

Checkpoint Location

Spark Structured Streaming allows users to specify a checkpointLocation for each streaming query. The configured checkpointLocation is used to store a write-ahead-log (WAL), enabling consistent recovery semantics during failures, etc. The underlying WAL also enables exactly-once semantics for each micro-batch processed by a streaming query.

Typically, the user configures checkpointLocation as an option on every streaming query. Since PSTL is a generic execution runtime for a PSTL job, we automate configuration of the checkpointLocation for each streaming query we detect in a PSTL job definition. As long as the PSTL administrator has configured spark.sql.streaming.checkpointLocation for their installation, the configured value will be used as a base path. PSTL will create subdirectories underneath this base directory for each submitted job (based on the job id). Following, PSTL will create additional subdirectories underneath each job's base directory for each streaming query.

Convention based configuration of the checkpointLocation provides multiple benefits:

  • Administrators can easily access the WAL(s) for all jobs from a consistent base path, rather than having to cross reference the job definition to determine its WAL location.
  • Users can more easily implement jobs without having to be concerned about every possible configuration.

Like all good conventions, users can override this behavior if needed. If a specific job should not follow the convention ${spark.sql.streaming.checkpointLocation}/$jobId/$queryId, the user can simply provide a checkpointLocation option for their streaming query. If checkpointLocation is specified in a streaming query's options, it takes precedence over the convention. For example:

SAVE STREAM foo
TO JSON
OPTIONS(
  'checkpointLocation'='/path/to/checkpoint'
);

Alternatively, the user can override spark.sql.streaming.checkpointLocation in their job specific spark.properties if they wish to continue using the convention, but with a base path which deviates from the system default.

-- spark.properties: spark.sql.streaming.checkpointLocation=/path/to/checkpoint
SAVE STREAM foo
TO JSON
OPTIONS();

Disable Global Statements

PSTL explicitly disables the following Spark SQL features:

  • Global views
  • Global temporary views
  • Global functions (non-temporary function)

The above features are disabled to explicitly ensure jobs are isolated from one another (e.g., they can't share a global view). Currently, PSTL launches each job as a dedicated Spark driver, so this is primarily to police user behavior. However, in the future, PSTL may colocate similar jobs on the same Spark driver, so disabling these features are more important if we transition to such a model. Below are some examples to demonstrate what is and is not allowed.

-- allowed
CREATE VIEW ...;
-- not allowed
CREATE GLOBAL VIEW ...;

-- allowed
CREATE TEMPORARY VIEW ...;
-- not allowed
CREATE GLOBAL TEMPORARY VIEW ...;

-- allowed
CREATE TEMPORARY FUNCTION ...;
-- not allowed
CREATE FUNCTION ...;

Inject Streaming Options

Most sources and sinks require some number of options to function correctly. For example, Kafka sources require a reference to the bootstrap servers so we can connect to the Kafka cluster. Certain options are generally environment specific. Specifying these options for every source or sink across all of your job definitions can become tedious. PSTL provides a resolution rule for the SQL analyzer which allows us to automatically push these options down to sources and sinks if they are specified in Spark's system configuration.

Let's use a Kafka source as an example, typically we would define a Kafka source as follows:

CREATE STREAM foo
FROM KAFKA
OPTIONS(
  'kafka.bootstrap.servers'='localhost:9092',
  'subscribe'='foo'
);

Note the kafka.bootstrap.servers option. It is quite likely these same bootstrap servers will be used for other Kafka sources and sinks in a common environment (e.g., dev, stage, prod, etc.). We can reduce this repetition by configuring kafka.bootstrap.servers elsewhere in a slightly different format.

spark-defaults.conf:

spark.sql.streaming.kafka.bootstrap.servers localhost:9092

In the above example, we specify the bootstrap servers in spark-defaults.conf, e.g. Spark's system configuration. Note the key is prefixed with spark.sql.streaming.

PSTL spark.properties:

spark.sql.streaming.kafka.bootstrap.servers=localhost:9092

In the above example, we specify the bootstrap servers in PSTL's spark.properties, e.g. PSTL's system configuration for Spark settings. Note the key is prefixed with spark.sql.streaming.

PSTL Job spark.properties:

spark.sql.streaming.kafka.bootstrap.servers=localhost:9092

In the above example, we specify the bootstrap servers in a PSTL Job's spark.properties, e.g. PSTL's job configuration for Spark settings. Note the key is prefixed with spark.sql.streaming.

Essentially, by prefixing any option used by a source or sink with spark.sql.streaming, we can move the configuration to a more reusable location. In some cases, sources and sinks may share common options. Kafka is a good example, both the source and the sink expect to receive a set of bootstrap servers.

Consider a use case where we have two Kafka clusters: one for data from upstream producers, one for processed data within our business unit. In this case we may want to configure the default kafka.bootstrap.servers for our sources to one value, and the default for our sinks to another value.

This is possible by specifying spark.sql.streaming.source.kafka.bootstrap.servers for sources, and spark.sql.streaming.sink.kafka.bootstrap.servers for sinks in one of the locations described above. If both spark.sql.streaming.kafka.bootstrap.servers and spark.sql.streaming.source.kafka.bootstrap.servers are defined, the source specific setting takes higher precedence (the same goes for a sink specific setting).

SQL Functions

from_avro

This function returns the sql row(s) for a given Avro schema or Avro bytes.

SELECT _FUNC_(bytesCol, '{...}');
       output

All the supported types from Avro to Spark Sql are in the Library. The Union type in Avro have three basic considerable types in spark SQL.

  • union(int, long) will be mapped to LongType.
  • union(float, double) will be mapped to DoubleType.
  • union(something, null), where something is any supported Avro type. This will be mapped to the same Spark SQL type as that of something, with nullable set to true.

from_protobuf

This function converts protobuf schema to sparkSQL. It is a function that returns Proto bytes deserialized based on class mapped to relevant Apache Spark DataTypes(s), class loading is expected to be handled upstream.

sfs_cat

to_avro

This function converts sparkSQL into avro schema. This function returns Avro bytes with a given struct value.

 SELECT _FUNC_(named_struct('a', 1, 'b', 2));
       output
 SELECT _FUNC_(array(named_struct('a', 1, 'b', 2)));
       output

Generally for all the types, the mapping from Spark types to Avro types is straightforward (e.g. IntegerType gets converted to int); however, there are a few special cases which are here.

Using the Spark's Dataframe API is the highly recommended way to read or write Avro Data from Spark SQL.

The SQL API for querying Avro Data in SQL by registering the data as temporary table is

CREATE TEMPORARY TABLE episodes
USING com.databricks.spark.avro
OPTIONS (path "src/test/resources/episodes.avro")

to_protobuf

val personsPB = new ProtoParquetRDD(sc, "persons.parquet", classOf[Person])

This is for reading the sparkSQL's parquet file as RDD[Protobuf]. For this we need SparkContext, parquet path and protobuf class. The raw data will be ingested as Dataframe and it persists as parquet file. This parquet file is read back as RDD[parquet]. With the typesafe protobuf getters, any data transformation and extraction can be done.

vertica_hash

SQL Parser

PSTL extends Spark's SQL Parser with additional non-standard SQL statements. These additional statements help bridge the gap with Spark SQL features which are not yet supported by Spark's SQL Parser. Similarly, some of these additional statements help users define things which are unlikely to be supported in the future by Spark's SQL Parser. Since all PSTL jobs are defined in SQL, it is important we provide necessary statement infrastructure to represent the needs of our users.

CREATE STREAM

Allows the user to define a Spark Structured Streaming Source in SQL. In addition to providing full feature parity with Spark's Scala DSL for Structured Streaming Sources, CREATE STREAM also automatically registers a temporary view based on the stream name. Automatically registering a temporary view with the stream name allows the user to reference the source as if it were any other table.

ANTLR grammar:

CREATE STREAM tableIdentifier ('(' colTypeList ')')?
sourceProvider
(OPTIONS tablePropertyList)?

Example:

CREATE STREAM foo
FROM KAFKA
OPTIONS(
  'key'='value'
);

Example with schema:

CREATE STREAM foo(a int, b string)
FROM JSON
OPTIONS(
  'path'='/path/to/json'
);

SAVE STREAM

Allows the user to define a Spark Structured Streaming Sink in SQL. As with Spark's Scala DSL, the tableIdentifier provided to SAVE STREAM must be a streaming relation (e.g., SAVE STREAM foo will fail, if the foo table is not based on a Spark Structured Streaming Source at the root of the DAG).

ANTLR Grammar:

SAVE STREAM tableIdentifier
sinkProvider
(IDENTIFIED BY qualifiedName)?
(TRIGGER trigger)?
(OUTPUT MODE outputMode)?
(PARTITIONED ON identifierList)?
(OPTIONS tablePropertyList)?

Example:

SAVE STREAM foo
TO KAFKA
OPTIONS(
  'key'='value'
);

Example with identifier:

SAVE STREAM foo
TO KAFKA
IDENTIFIED BY bar
OPTIONS(...);

In the above example, the streaming query's queryName will be set to bar. If the IDENTIFIED BY clause is not provided, the streaming query's queryName will be set to foo (e.g., the input table).

Example with trigger:

SAVE STREAM foo
TO KAFKA
TRIGGER 5 MINUTES
OPTIONS(...);

In the above example, the streaming query will only be executed every 5 minutes. If TRIGGER is not specified, the streaming query will be executed as quickly as each micro-batch processes data from the source.

Example with output mode:

SAVE STREAM foo
TO KAFKA
OUTPUT MODE UPDATE
OPTIONS(...);

In the above example, the streaming query's output mode will be update. If OUTPUT MODE is not specified, the streaming query's output mode will default to append. Note, the sink must support output mode update or complete, otherwise specifying an output mode other than append will fail. The documentation for each specific sink will specify which output mode(s) the sink supports.

Example with partitioning:

SAVE STREAM foo
TO JSON
PARTITIONED ON(a)
OPTIONS(...);

In the above example, the streaming query will partition output based on distinct values of column a. Note, column a must exist in the input table foo. Note, the sink must support partitioning, otherwise this is a no-op. The documentation for each specific sink will specify whether or not partitioning is supported and specifically how partitioned column(s) are handled if it is supported.

SHOW STREAMS

Allows the user to view active streaming queries as a table, printed to the console. Typically SHOW STREAMS is used in interactive environments when prototyping.

SHOW STREAMS;

STOP STREAM

Allows the user to stop a specific streaming query, based on the queryName. Typically, STOP STREAM is used in interactive environments when prototyping.

STOP STREAM foo;
Clone this wiki locally