
Vertica Sink

vnnv01 edited this page Mar 6, 2018 · 1 revision

## Features

### Write Ahead Log

## Options

### commitLog.compaction.enabled

When `true`, periodically compacts the Vertica commit log by purging entries older than [commitLog.compaction.interval](#commitlogcompactioninterval).

Defaults to `false`.

~~~sql
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'commitLog.compaction.enabled'='true'
);
~~~
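
Compaction is controlled by two options, `commitLog.compaction.enabled` and `commitLog.compaction.interval`, so they are naturally set together. The following is a minimal sketch, assuming several options can be listed in one `OPTIONS` clause separated by commas (as the `jdbc.*` example on this page does) and that standard SQL `--` comments are accepted. The interval of `10` micro-batches is an illustrative value, not a recommendation.

~~~sql
-- Assumption: options may be combined in a single OPTIONS clause.
-- Enable commit log compaction and compact every 10th micro-batch
-- (10 is an illustrative value only).
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'commitLog.compaction.enabled'='true',
  'commitLog.compaction.interval'='10'
);
~~~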

### commitLog.compaction.interval

When [commitLog.compaction.enabled](#commitlogcompactionenabled) is `true`, specifies the frequency at which the commit log is compacted, in micro-batches (e.g., every 10th micro-batch).

Defaults to `...`.

~~~sql
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'commitLog.compaction.interval'='10'
);
~~~

### commitLog.table

Specifies the name of the commit log table in Vertica. The commit log schema must be deployed in Vertica before any Vertica streaming queries are deployed on your cluster; otherwise those queries will fail.

TODO: intro needs description of commit log, should cross link here rather than re-explaining.

Defaults to `pstl.commit_log`.

~~~sql
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'commitLog.table'='foo.bar'
);
~~~

### copy.durationMs

Specifies how long each incremental copy within a micro-batch is given to load data from Kafka. Larger values improve throughput; smaller values can, in some cases, reduce latency. In field experience, aggressively small values can produce incremental copy commands that never successfully load data. In that case, the final copy command within the micro-batch must load all processed data, because the incremental copy commands never made progress.

Provide values in the format `50s`, `100ms`, `250us`, and so on, with no space between the number and the unit. Valid units include:

  • us: microseconds
  • ms: milliseconds
  • s: seconds
  • m: minutes
  • min: minutes
  • h: hours
  • d: days

Defaults to `1s`.

~~~sql
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'copy.durationMs'='500ms'
);
~~~

### copy.eofTimeoutMs

Specifies how long the final copy within a micro-batch is given to begin receiving messages from Kafka. The copy command is terminated if Vertica receives no messages from Kafka within this time. Tuning this option is a trade-off between failing fast and failing prematurely when other infrastructure is congested.

Provide values in the format `50s`, `100ms`, `250us`, and so on, with no space between the number and the unit. Valid units include:

  • us: microseconds
  • ms: milliseconds
  • s: seconds
  • m: minutes
  • min: minutes
  • h: hours
  • d: days

Defaults to `1s`.

~~~sql
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'copy.eofTimeoutMs'='500ms'
);
~~~
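
Because `copy.durationMs` applies to the incremental copies and `copy.eofTimeoutMs` applies to the final copy of the same micro-batch, the two can be tuned side by side. The following is a sketch only, assuming both options can be given in one `OPTIONS` clause and that `--` comments are accepted. The durations shown are illustrative and should be chosen from your own measurements.

~~~sql
-- Illustrative values only:
--   each incremental copy gets 2 seconds to load data from Kafka;
--   the final copy waits up to 5 seconds for its first message.
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'copy.durationMs'='2s',
  'copy.eofTimeoutMs'='5s'
);
~~~

A longer `copy.eofTimeoutMs` tolerates congestion elsewhere in the infrastructure, at the cost of taking longer to fail when no data will ever arrive.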

### copy.options

Specifies additional copy options to append to the copy commands triggered by your Vertica streaming query. `NO COMMIT` is always appended, whether or not you specify it, to manage transactional semantics within each micro-batch. This option is typically useful for specifying where to store rejected data, whether to load into WOS or ROS, and so on. Refer to Vertica's COPY documentation for details.

Defaults to none.

~~~sql
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'copy.options'='REJECTED DATA AS TABLE foo.bar DIRECT'
);
~~~

### jdbc.url

Specifies the JDBC URL your Vertica streaming query uses to connect to the Vertica cluster. It is recommended to always provide a JDBC URL that points to load-balanced DNS, or similar. When a Vertica streaming query fails and is restarted, it establishes a new connection to the Vertica cluster. If the JDBC URL always points to the same node and that node is down, the streaming query will remain in a failed state until the node recovers.

Provide values in the format `jdbc:vertica://$host:$port/$databaseName`. Optionally, connection settings can be provided directly in the JDBC URL: `jdbc:vertica://$host:$port/$databaseName?option1=value&option2=value`.

Defaults to `jdbc:vertica://localhost:5433/default`.

~~~sql
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'jdbc.url'='jdbc:vertica://vertica.acme.com:5433/acme'
);
~~~

### jdbc.*

Specifies additional connection options to provide when establishing a JDBC connection with the Vertica cluster. Refer to Vertica's JDBC connection properties documentation for details.

Defaults to none.

~~~sql
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'jdbc.user'='pstl',
  'jdbc.password'='changeit'
);
~~~
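
Connection settings can also be carried in the JDBC URL itself, as described under `jdbc.url`. The sketch below expresses the same credentials inline. It assumes that `jdbc.user` and `jdbc.password` correspond to the Vertica JDBC connection properties `user` and `password`, and that `--` comments are accepted; the host and database names are the illustrative ones used elsewhere on this page.

~~~sql
-- Assumption: jdbc.user / jdbc.password map to the Vertica JDBC
-- connection properties user / password, supplied here in the URL.
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'jdbc.url'='jdbc:vertica://vertica.acme.com:5433/acme?user=pstl&password=changeit'
);
~~~

Separate `jdbc.*` options keep the URL short and make individual settings easier to change; the inline form keeps the whole connection definition in one value.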

### table

Specifies the Vertica table this streaming query should target when loading data.

Provide values in the format `schema-name.table-name`.

Defaults to none.

~~~sql
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'table'='pstl.target_table'
);
~~~

### topic

Specifies the Kafka topic to which processed data is produced. The Vertica cluster loads data from this Kafka topic.

Defaults to none.

~~~sql
SAVE STREAM foo
TO VERTICA
OPTIONS(
  'topic'='vertica_pstl_target_table'
);
~~~
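
Taken together, the options documented on this page might be combined into a single sink definition like the one below. This is a sketch under stated assumptions: it assumes all of these options can appear in one `OPTIONS` clause and that `--` comments are accepted, and it reuses the illustrative host, credentials, table, and topic names from the individual examples.

~~~sql
-- Assumption: all options may be combined in a single OPTIONS clause.
-- Names and values are the illustrative ones from the examples on this page.
SAVE STREAM foo
TO VERTICA
OPTIONS(
  -- Where and how to connect (prefer a load-balanced DNS name)
  'jdbc.url'='jdbc:vertica://vertica.acme.com:5433/acme',
  'jdbc.user'='pstl',
  'jdbc.password'='changeit',
  -- Target table, and the Kafka topic Vertica loads from
  'table'='pstl.target_table',
  'topic'='vertica_pstl_target_table',
  -- Commit log table (shown with its documented default)
  'commitLog.table'='pstl.commit_log'
);
~~~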

### kafka.consumer.*

### kafka.producer.*

### spark.sql.streaming.vertica.commitLog.compaction.enabled

### spark.sql.streaming.vertica.commitLog.compaction.interval

### spark.sql.streaming.vertica.commitLog.table

### spark.sql.streaming.vertica.copy.durationMs

### spark.sql.streaming.vertica.copy.eofTimeoutMs

### spark.sql.streaming.vertica.copy.options

### spark.sql.streaming.vertica.jdbc.url

### spark.sql.streaming.vertica.kafka.bootstrap.servers

### spark.sql.streaming.vertica.kafka.consumer.*

### spark.sql.streaming.vertica.kafka.producerParallelism

### spark.sql.streaming.vertica.kafka.producer.*
