Getting Started

Let's start by creating a directory to hold our job definition and configuration, then change into it:

mkdir my-job
cd my-job

Now, let's define a simple job which will listen for messages on port 9090 of localhost. Messages which do not begin with the letter a will be filtered out; messages which do begin with a will be transformed to uppercase before being printed to the console.

Place the following contents in a file named job.sql:

CREATE STREAM foo
FROM SOCKET
OPTIONS(
  'host'='localhost',
  'port'='9090'
);

CREATE TEMPORARY VIEW bar AS
SELECT
  upper(value)
FROM foo
WHERE value LIKE 'a%';

SAVE STREAM bar
TO CONSOLE;
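
As an aside, the projected column keeps Spark SQL's auto-generated name upper(value), which is why the console output later in this guide shows upper(value) as the column header. If you would prefer an explicit column name, a standard SQL alias should work. A minimal sketch, assuming PSTL passes the view definition through to Spark SQL unchanged (value_upper is just an illustrative name):

CREATE TEMPORARY VIEW bar AS
SELECT
  upper(value) AS value_upper
FROM foo
WHERE value LIKE 'a%';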

Now, let's configure our job execution environment. For now, we will simply state that we would like Spark to run locally.

Place the following contents in a file named spark.properties:

spark.master=local[*]
spark.submit.deployMode=client
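
Both of these are standard Spark configuration properties: spark.master tells Spark where to schedule work (local[*] runs everything in a single local JVM using all available cores), and spark.submit.deployMode=client keeps the driver in your local process. As a sketch of what targeting a standalone Spark cluster might look like instead (spark-master.example.com is a placeholder, not a real host):

spark.master=spark://spark-master.example.com:7077
spark.submit.deployMode=client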

Before we deploy our job, open a separate shell and start a netcat server, which will relay the data we type to our job for processing.

# -l listens for inbound connections; -k keeps listening after each client disconnects
nc -lk 9090

Now, let's deploy our job from the directory we created earlier. The job will begin executing once the necessary Spark and PSTL infrastructure is up and running. Expect a fair volume of log messages while your job is deploying and starting up.

Run the following command:

pstl --deploy .

Your job is up and running once you see output similar to the following (numInputRows is 0 here because we have not sent any data yet):

18/01/09 14:02:47 INFO StreamExecution: Starting new streaming query.
18/01/09 14:02:47 INFO StreamExecution: Streaming query made progress: {
  "id" : "7d2f2d56-da04-476a-8c74-6245099d0231",
  "runId" : "54c40dc1-c6ea-4194-83a3-154caf09f7f7",
  "name" : "bar",
  "timestamp" : "2018-01-09T22:02:47.920Z",
  "numInputRows" : 0,
  "processedRowsPerSecond" : 0.0,
  "durationMs" : {
    "getOffset" : 0,
    "triggerExecution" : 14
  },
  "stateOperators" : [ ],
  "sources" : [ {
    "description" : "TextSocketSource[host: localhost, port: 9090]",
    "startOffset" : null,
    "endOffset" : null,
    "numInputRows" : 0,
    "processedRowsPerSecond" : 0.0
  } ],
  "sink" : {
    "description" : "org.apache.spark.sql.execution.streaming.ConsoleSink@6e52d816"
  }
}

Hop back to your netcat shell and begin entering some data:

foo
bar
abc

You should see output similar to the following from your PSTL job logs:

-------------------------------------------
Batch: 0
-------------------------------------------
+------------+
|upper(value)|
+------------+
+------------+
-------------------------------------------
Batch: 1
-------------------------------------------
+------------+
|upper(value)|
+------------+
|         ABC|
+------------+

Notice how only messages beginning with the letter a are processed and emitted to the console: foo and bar were filtered out, while abc was transformed to ABC. You have just successfully created and deployed your first PSTL job!