readme updates (#68)
* readme updates

* iceberg documentation. Bakes pyiceberg config into dev for ease of getting started.

refs #51
turbolytics authored Jan 15, 2025
1 parent 4e4720f commit 9c36467
Showing 7 changed files with 85 additions and 7 deletions.
4 changes: 2 additions & 2 deletions Makefile
@@ -7,15 +7,15 @@ test: test-unit test-integration

 .PHONY: test-unit
 test-unit:
-	PYICEBERG_HOME=$(shell pwd)/dev/config/ pytest --ignore=tests/benchmarks --ignore=tests/integration tests
+	PYICEBERG_HOME=$(shell pwd)/tests/config/ pytest --ignore=tests/benchmarks --ignore=tests/integration tests

 .PHONY: test-image
 test-image: docker-image
 	pytest tests/release

 .PHONY: test-integration
 test-integration:
-	PYICEBERG_HOME=$(shell pwd)/dev/config/ pytest tests/integration
+	PYICEBERG_HOME=$(shell pwd)/tests/config/ pytest tests/integration

 .PHONY: start-backing-services
 start-backing-services:
65 changes: 62 additions & 3 deletions README.md
@@ -32,13 +32,14 @@ SQLFlow executes SQL against streaming data, such as Kafka or webhooks. Think of
 - Sinks
   - [x] Kafka Producer
   - [x] Stdout
+  - [x] Local Disk
   - [ ] Postgres
-  - [ ] Local Disk
   - [ ] S3
 - Serialization
   - [x] JSON Input
   - [x] JSON Output
-  - [ ] Parquet Output
+  - [x] Parquet Output
+  - [x] Iceberg Output
 - Handlers
   - [x] Memory Persistence
   - [ ] Disk Persistence
@@ -118,12 +119,70 @@ SQLFlow supports DuckDB over websocket. Running SQL against the [Bluesky firehos

<img width="1280" alt="bluesky firehose config" src="https://github.com/user-attachments/assets/86a46875-3cfa-46d3-ab08-1457c29115d9" />

-Invoke sql-flow using the configuration listed above:
+The following command starts a bluesky consumer and prints every post to stdout:

```
docker run -v $(pwd)/dev/config/examples:/examples turbolytics/sql-flow:latest run /examples/bluesky/bluesky.raw.stdout.yml
```

![output](https://github.com/user-attachments/assets/185c6453-debc-439a-a2b9-ed20fdc82851)

[Check out the configuration files here](https://github.com/turbolytics/sql-flow/tree/main/dev/config/examples/bluesky)

## Streaming to Iceberg

SQLFlow supports writing to Iceberg tables using [pyiceberg](https://py.iceberg.apache.org/).

The following steps write to an Iceberg table using a local SQLite catalog:

- Initialize the SQLite Iceberg catalog and test tables, which will create the directories referenced in the configuration file above
```
python3 cmd/setup-iceberg-local.py setup
created default.city_events
created default.bluesky_post_events
Catalog setup complete.
```
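
Under the hood, the setup script presumably drives pyiceberg's `SqlCatalog`. A minimal sketch of the idea follows; the schema fields are illustrative assumptions, not the script's actual definitions:

```
# Sketch of what cmd/setup-iceberg-local.py roughly does; the schema here
# is an assumption for illustration, not the script's actual definition.
import os

from pyiceberg.catalog.sql import SqlCatalog
from pyiceberg.schema import Schema
from pyiceberg.types import NestedField, StringType, TimestampType

# Create the warehouse directory referenced by the catalog config.
os.makedirs("/tmp/sqlflow/warehouse", exist_ok=True)

catalog = SqlCatalog(
    "sqlflow_test",
    uri="sqlite:////tmp/sqlflow/warehouse/catalog.db",
    warehouse="file:///tmp/sqlflow/warehouse",
)
catalog.create_namespace("default")

schema = Schema(
    NestedField(field_id=1, name="city", field_type=StringType(), required=False),
    NestedField(field_id=2, name="event", field_type=StringType(), required=False),
    NestedField(field_id=3, name="timestamp", field_type=TimestampType(), required=False),
)
for name in ("default.city_events", "default.bluesky_post_events"):
    catalog.create_table(name, schema=schema)
    print(f"created {name}")
```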

- Start Kafka Locally
```
docker-compose -f dev/kafka-single.yml up -d
```

- Publish Test Messages to Kafka
```
python3 cmd/publish-test-data.py --num-messages=5000 --topic="input-kafka-mem-iceberg"
```
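
For reference, a hypothetical stand-in for the publish script using confluent-kafka; the message field names are assumptions:

```
# Hypothetical equivalent of cmd/publish-test-data.py: publish JSON test
# events to the topic the pipeline consumes. Field names are assumptions.
import json

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:29092"})
for i in range(5000):
    event = {"city": "New York", "event": "click", "id": i}
    producer.produce("input-kafka-mem-iceberg", json.dumps(event))
producer.flush()
```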

- Run SQLFlow, which will read from Kafka and write to the Iceberg table locally
```
docker run \
  -e SQLFLOW_KAFKA_BROKERS=host.docker.internal:29092 \
  -e PYICEBERG_HOME=/tmp/iceberg/ \
  -v $(pwd)/dev/config/iceberg/.pyiceberg.yaml:/tmp/iceberg/.pyiceberg.yaml \
  -v /tmp/sqlflow/warehouse:/tmp/sqlflow/warehouse \
  -v $(pwd)/dev/config/examples:/examples \
  turbolytics/sql-flow:latest run /examples/kafka.mem.iceberg.yml
```

- Verify the Iceberg data was written by querying it with DuckDB
```
% duckdb
v1.1.3 19864453f7
Enter ".help" for usage hints.
Connected to a transient in-memory database.
Use ".open FILENAME" to reopen on a persistent database.
D select count(*) from '/tmp/sqlflow/warehouse/default.db/city_events/data/*.parquet';
┌──────────────┐
│ count_star() │
│ int64 │
├──────────────┤
│ 5000 │
└──────────────┘
```
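
The same count can also be read through the Iceberg catalog itself — a minimal sketch using pyiceberg, assuming the catalog and table names from the steps above:

```
# Verify via pyiceberg instead of reading the parquet files directly.
import os

# Point pyiceberg at the directory holding .pyiceberg.yaml (run from the repo root).
os.environ["PYICEBERG_HOME"] = os.path.abspath("dev/config/iceberg")

from pyiceberg.catalog import load_catalog

catalog = load_catalog("sqlflow_test")
table = catalog.load_table("default.city_events")
print(table.scan().to_arrow().num_rows)  # expect 5000
```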

## Recipes

Coming soon. Until then, check out:
15 changes: 15 additions & 0 deletions dev/config/examples/bluesky/bluesky.raw.stdout.yml
@@ -0,0 +1,15 @@
# Consumes the bluesky firehose
pipeline:
  batch_size: 1
  source:
    type: websocket
    websocket:
      uri: 'wss://jetstream2.us-east.bsky.network/subscribe?wantedCollections=app.bsky.feed.post'

  handler:
    type: 'handlers.InferredMemBatch'
    sql: |
      SELECT * FROM batch
  sink:
    type: console
2 changes: 1 addition & 1 deletion dev/config/examples/kafka.mem.iceberg.yml
@@ -4,7 +4,7 @@ pipeline:
   source:
     type: kafka
     kafka:
-      brokers: [{{ kafka_brokers|default('localhost:9092') }}]
+      brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('localhost:9092') }}]
       group_id: test
       auto_offset_reset: earliest
       topics:
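
The brokers entry above is a Jinja-style template. Judging from the test override below, SQLFlow presumably renders pipeline configs against environment variables and explicit overrides — a rough illustration, not SQLFlow's actual loader:

```
# Hypothetical illustration of template rendering; the real behavior of
# new_from_path and its setting_overrides may differ.
import os

from jinja2 import Template

raw = "brokers: [{{ SQLFLOW_KAFKA_BROKERS|default('localhost:9092') }}]"
overrides = {"SQLFLOW_KAFKA_BROKERS": "host.docker.internal:29092"}
print(Template(raw).render(**{**os.environ, **overrides}))
# brokers: [host.docker.internal:29092]
```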
4 changes: 4 additions & 0 deletions dev/config/iceberg/.pyiceberg.yaml
@@ -0,0 +1,4 @@
catalog:
  sqlflow_test:
    uri: sqlite:////tmp/sqlflow/warehouse/catalog.db
    warehouse: file:////tmp/sqlflow/warehouse
File renamed without changes.
2 changes: 1 addition & 1 deletion tests/integration/test_integration.py
@@ -135,7 +135,7 @@ def test_kafka_mem_iceberg(bootstrap_server):
     conf = new_from_path(
         path=os.path.join(settings.CONF_DIR, 'examples', 'kafka.mem.iceberg.yml'),
         setting_overrides={
-            'kafka_brokers': bootstrap_server,
+            'SQLFLOW_KAFKA_BROKERS': bootstrap_server,
             'catalog_name': catalog_name,
             'table_name': table_name,
         },
