Commit

Merge branch 'main' into support-read-csv
jitingxu1 authored Sep 23, 2024
2 parents 9acda5c + abb5593 commit e62925b
Showing 38 changed files with 1,110 additions and 873 deletions.
1 change: 1 addition & 0 deletions .env
@@ -5,3 +5,4 @@ PGPASSWORD="postgres"
MYSQL_PWD="ibis"
MSSQL_SA_PASSWORD="1bis_Testing!"
DRUID_URL="druid://localhost:8082/druid/v2/sql"
SPARK_CONFIG=./docker/spark-connect/conf.properties
1 change: 0 additions & 1 deletion .github/labeler.yml
@@ -79,7 +79,6 @@ tests:
nix:
  - changed-files:
      - any-glob-to-any-file: "**/*.nix"
      - any-glob-to-any-file: "poetry.lock"

datatypes:
  - changed-files:
33 changes: 25 additions & 8 deletions .github/workflows/ibis-backends.yml
@@ -442,13 +442,9 @@ jobs:
      - name: download backend data
        run: just download-data

      - name: show docker compose version
        if: matrix.backend.services != null
        run: docker compose version

      - name: start services
        if: matrix.backend.services != null
        run: docker compose up --wait ${{ join(matrix.backend.services, ' ') }}
        run: just up ${{ join(matrix.backend.services, ' ') }}

      - name: install python
        uses: actions/setup-python@v5
@@ -600,7 +596,7 @@ jobs:

      - name: start services
        if: matrix.backend.services != null
        run: docker compose up --wait ${{ join(matrix.backend.services, ' ') }}
        run: just up ${{ join(matrix.backend.services, ' ') }}

      - name: install python
        uses: actions/setup-python@v5
@@ -653,7 +649,7 @@ jobs:
        run: docker compose logs

  test_pyspark:
    name: PySpark ${{ matrix.pyspark-minor-version }} ubuntu-latest python-${{ matrix.python-version }}
    name: PySpark ${{ matrix.tag }} ${{ matrix.pyspark-minor-version }} ubuntu-latest python-${{ matrix.python-version }}
    runs-on: ubuntu-latest
    strategy:
      fail-fast: false
@@ -665,19 +661,29 @@
            deps:
              - "'pandas@<2'"
              - "'numpy@<1.24'"
            tag: local
          - python-version: "3.11"
            pyspark-version: "3.5.2"
            pyspark-minor-version: "3.5"
            deps:
              - "'pandas@>2'"
              - "'numpy@>1.24'"
            tag: local
          - python-version: "3.12"
            pyspark-version: "3.5.2"
            pyspark-minor-version: "3.5"
            deps:
              - "'pandas@>2'"
              - "'numpy@>1.24'"
              - setuptools
            tag: local
          - python-version: "3.12"
            pyspark-version: "3.5.2"
            pyspark-minor-version: "3.5"
            deps:
              - setuptools
            tag: remote
            SPARK_REMOTE: "sc://localhost:15002"
    steps:
      - name: checkout
        uses: actions/checkout@v4
@@ -691,6 +697,10 @@
        env:
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}

      - name: start services
        if: matrix.tag == 'remote'
        run: just up spark-connect

      - name: download backend data
        run: just download-data

@@ -730,7 +740,14 @@
        shell: bash
        run: just download-iceberg-jar ${{ matrix.pyspark-minor-version }}

      - name: run tests
      - name: run spark connect tests
        if: matrix.tag == 'remote'
        run: just ci-check -m pyspark
        env:
          SPARK_REMOTE: ${{ matrix.SPARK_REMOTE }}

      - name: run spark tests
        if: matrix.tag == 'local'
        run: just ci-check -m pyspark

      - name: check that no untracked files were produced
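Note on the new "remote" matrix entries above: the workflow only threads SPARK_REMOTE through the job environment. Below is a minimal sketch of how a test session could be built from that variable, assuming PySpark 3.4+ with the Spark Connect client installed; the function name and local fallback are illustrative, not taken from the ibis test suite.

import os

from pyspark.sql import SparkSession

import ibis


def make_pyspark_backend():
    # Illustrative: use a Spark Connect session when SPARK_REMOTE is set
    # (e.g. "sc://localhost:15002"), otherwise a plain local session.
    remote = os.environ.get("SPARK_REMOTE")
    if remote:
        session = SparkSession.builder.remote(remote).getOrCreate()
    else:
        session = SparkSession.builder.master("local[*]").getOrCreate()
    return ibis.pyspark.connect(session)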
20 changes: 20 additions & 0 deletions compose.yaml
@@ -589,6 +589,24 @@ services:
    networks:
      - risingwave

  spark-connect:
    image: bitnami/spark:3.5.2
    ports:
      - 15002:15002
    command: /opt/bitnami/spark/sbin/start-connect-server.sh --name ibis_testing --packages org.apache.spark:spark-connect_2.12:3.5.2,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2
    healthcheck:
      test:
        - CMD-SHELL
        - bash -c 'printf \"GET / HTTP/1.1\n\n\" > /dev/tcp/127.0.0.1/15002; exit $$?;'
      interval: 5s
      retries: 6
    volumes:
      - spark-connect:/data
      - $PWD/docker/spark-connect/conf.properties:/opt/bitnami/spark/conf/spark-defaults.conf:ro
      # - $PWD/docker/spark-connect/log4j2.properties:/opt/bitnami/spark/conf/log4j2.properties:ro
    networks:
      - spark-connect

networks:
  impala:
    # docker defaults to naming networks "$PROJECT_$NETWORK" but the Java Hive
@@ -606,6 +624,7 @@ networks:
  exasol:
  flink:
  risingwave:
  spark-connect:

volumes:
  clickhouse:
@@ -617,3 +636,4 @@ volumes:
  exasol:
  impala:
  risingwave:
  spark-connect:
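The health check in the spark-connect service above only verifies that the Spark Connect gRPC port accepts TCP connections. A rough Python equivalent of that probe, with host and port mirroring the service definition (illustrative, not part of the repository):

import socket


def spark_connect_ready(host="127.0.0.1", port=15002, timeout=5.0):
    # Same idea as the /dev/tcp healthcheck: success just means something is
    # listening on the Spark Connect port, not that the server is fully ready.
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False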
12 changes: 12 additions & 0 deletions docker/spark-connect/conf.properties
@@ -0,0 +1,12 @@
spark.driver.extraJavaOptions=-Duser.timezone=GMT
spark.executor.extraJavaOptions=-Duser.timezone=GMT
spark.jars.packages=org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.5.2
spark.sql.catalog.local.type=hadoop
spark.sql.catalog.local.warehouse=warehouse
spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
spark.sql.legacy.timeParserPolicy=LEGACY
spark.sql.session.timeZone=UTC
spark.sql.streaming.schemaInference=true
spark.ui.enabled=false
spark.ui.showConsoleProgress=false
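These properties are mounted as spark-defaults.conf in the spark-connect container (see compose.yaml above), so a connected client should observe them at runtime. A hedged sanity check, assuming the service is reachable on localhost:15002 and that these conf keys are readable over Spark Connect:

from pyspark.sql import SparkSession

spark = SparkSession.builder.remote("sc://localhost:15002").getOrCreate()

# Values correspond to the entries in conf.properties above.
assert spark.conf.get("spark.sql.session.timeZone") == "UTC"
assert spark.conf.get("spark.sql.catalog.local") == "org.apache.iceberg.spark.SparkCatalog"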
68 changes: 68 additions & 0 deletions docker/spark-connect/log4j2.properties
@@ -0,0 +1,68 @@
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# Set everything to be logged to the console
rootLogger.level = error
rootLogger.appenderRef.stdout.ref = console

# In the pattern layout configuration below, we specify an explicit `%ex` conversion
# pattern for logging Throwables. If this was omitted, then (by default) Log4J would
# implicitly add an `%xEx` conversion pattern which logs stacktraces with additional
# class packaging information. That extra information can sometimes add a substantial
# performance overhead, so we disable it in our default logging config.
# For more information, see SPARK-39361.
appender.console.type = Console
appender.console.name = console
appender.console.target = SYSTEM_ERR
appender.console.layout.type = PatternLayout
appender.console.layout.pattern = %d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n%ex

# Set the default spark-shell/spark-sql log level to WARN. When running the
# spark-shell/spark-sql, the log level for these classes is used to overwrite
# the root logger's log level, so that the user can have different defaults
# for the shell and regular Spark apps.
logger.repl.name = org.apache.spark.repl.Main
logger.repl.level = error

logger.thriftserver.name = org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver
logger.thriftserver.level = error

# Settings to quiet third party logs that are too verbose
logger.jetty1.name = org.sparkproject.jetty
logger.jetty1.level = error
logger.jetty2.name = org.sparkproject.jetty.util.component.AbstractLifeCycle
logger.jetty2.level = error
logger.replexprTyper.name = org.apache.spark.repl.SparkIMain$exprTyper
logger.replexprTyper.level = error
logger.replSparkILoopInterpreter.name = org.apache.spark.repl.SparkILoop$SparkILoopInterpreter
logger.replSparkILoopInterpreter.level = error
logger.parquet1.name = org.apache.parquet
logger.parquet1.level = error
logger.parquet2.name = parquet
logger.parquet2.level = error

# SPARK-9183: Settings to avoid annoying messages when looking up nonexistent UDFs in SparkSQL with Hive support
logger.RetryingHMSHandler.name = org.apache.hadoop.hive.metastore.RetryingHMSHandler
logger.RetryingHMSHandler.level = fatal
logger.FunctionRegistry.name = org.apache.hadoop.hive.ql.exec.FunctionRegistry
logger.FunctionRegistry.level = error

# For deploying Spark ThriftServer
# SPARK-34128: Suppress undesirable TTransportException warnings involved in THRIFT-4805
appender.console.filter.1.type = RegexFilter
appender.console.filter.1.regex = .*Thrift error occurred during processing of message.*
appender.console.filter.1.onMatch = deny
appender.console.filter.1.onMismatch = neutral
5 changes: 5 additions & 0 deletions ibis/backends/bigquery/tests/unit/test_datatypes.py
@@ -24,6 +24,11 @@
        param(dt.string, "STRING", id="string"),
        param(dt.Array(dt.int64), "ARRAY<INT64>", id="array<int64>"),
        param(dt.Array(dt.string), "ARRAY<STRING>", id="array<string>"),
        param(
            dt.Struct({"a": dt.String(nullable=False)}),
            "STRUCT<`a` STRING NOT NULL>",
            id="struct<a: !string>",
        ),
        param(
            dt.Struct.from_tuples(
                [("a", dt.int64), ("b", dt.string), ("c", dt.Array(dt.string))]
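The new parametrization above exercises a struct field that is non-nullable on the ibis side and expects it to render with NOT NULL in the BigQuery type string. A standalone snippet (not taken from the test module) showing where that nullability flag lives on the dtype:

import ibis.expr.datatypes as dt

ty = dt.Struct({"a": dt.String(nullable=False)})
assert ty.fields["a"].nullable is False  # this flag drives the `STRING NOT NULL` rendering
assert ty.nullable  # the struct itself remains nullable by default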
29 changes: 24 additions & 5 deletions ibis/backends/polars/__init__.py
@@ -3,7 +3,7 @@
from collections.abc import Iterable, Mapping
from functools import lru_cache
from pathlib import Path
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Literal

import polars as pl

@@ -492,6 +492,7 @@ def _to_dataframe(
        params: Mapping[ir.Expr, object] | None = None,
        limit: int | None = None,
        streaming: bool = False,
        engine: Literal["cpu", "gpu"] | pl.GPUEngine = "cpu",
        **kwargs: Any,
    ) -> pl.DataFrame:
        self._run_pre_execute_hooks(expr)
@@ -501,7 +502,7 @@
            limit = ibis.options.sql.default_limit
        if limit is not None:
            lf = lf.limit(limit)
        df = lf.collect(streaming=streaming)
        df = lf.collect(streaming=streaming, engine=engine)
        # XXX: Polars sometimes returns data with the incorrect column names.
        # For now we catch this case and rename them here if needed.
        expected_cols = tuple(table_expr.columns)
@@ -515,10 +516,16 @@ def execute(
        params: Mapping[ir.Expr, object] | None = None,
        limit: int | None = None,
        streaming: bool = False,
        engine: Literal["cpu", "gpu"] | pl.GPUEngine = "cpu",
        **kwargs: Any,
    ):
        df = self._to_dataframe(
            expr, params=params, limit=limit, streaming=streaming, **kwargs
            expr,
            params=params,
            limit=limit,
            streaming=streaming,
            engine=engine,
            **kwargs,
        )
        if isinstance(expr, (ir.Table, ir.Scalar)):
            return expr.__pandas_result__(df.to_pandas())
@@ -540,10 +547,16 @@ def to_polars(
        params: Mapping[ir.Expr, object] | None = None,
        limit: int | None = None,
        streaming: bool = False,
        engine: Literal["cpu", "gpu"] | pl.GPUEngine = "cpu",
        **kwargs: Any,
    ):
        df = self._to_dataframe(
            expr, params=params, limit=limit, streaming=streaming, **kwargs
            expr,
            params=params,
            limit=limit,
            streaming=streaming,
            engine=engine,
            **kwargs,
        )
        return expr.__polars_result__(df)

@@ -553,12 +566,18 @@ def _to_pyarrow_table(
        params: Mapping[ir.Expr, object] | None = None,
        limit: int | None = None,
        streaming: bool = False,
        engine: Literal["cpu", "gpu"] | pl.GPUEngine = "cpu",
        **kwargs: Any,
    ):
        from ibis.formats.pyarrow import PyArrowData

        df = self._to_dataframe(
            expr, params=params, limit=limit, streaming=streaming, **kwargs
            expr,
            params=params,
            limit=limit,
            streaming=streaming,
            engine=engine,
            **kwargs,
        )
        return PyArrowData.convert_table(df.to_arrow(), expr.as_table().schema())

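Usage sketch for the new engine argument on execute/to_polars/to_pyarrow: the value is forwarded as-is to polars' LazyFrame.collect, so the GPU variants only work where polars' GPU engine (cudf-polars) is installed. The calls below are illustrative, not taken from the ibis docs.

import ibis
import polars as pl

con = ibis.polars.connect()
t = ibis.memtable({"x": [1, 2, 3]})

con.execute(t.x.sum())                         # default engine="cpu"
con.execute(t.x.sum(), engine="gpu")           # requires a GPU-enabled polars install
con.execute(t.x.sum(), engine=pl.GPUEngine())  # or pass a configured pl.GPUEngine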
6 changes: 5 additions & 1 deletion ibis/backends/polars/compiler.py
@@ -1021,7 +1021,11 @@ def array_flatten(op, **kw):
        .then(None)
        .when(result.list.len() == 0)
        .then([])
        .otherwise(result.flatten())
        # polars doesn't have an efficient API (yet?) for removing one level of
        # nesting from an array so we use elementwise evaluation
        #
        # https://github.com/ibis-project/ibis/issues/10135
        .otherwise(result.list.eval(pl.element().flatten()))
    )


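What the replacement expression does, in plain polars terms: list.eval(pl.element().flatten()) removes exactly one level of list nesting per row. A standalone sketch (not ibis code):

import polars as pl

df = pl.DataFrame({"a": [[[1, 2], [3]], [[4]]]})
out = df.select(pl.col("a").list.eval(pl.element().flatten()).alias("flat"))
# expected: flat == [[1, 2, 3], [4]]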
16 changes: 16 additions & 0 deletions ibis/backends/polars/tests/test_client.py
@@ -60,3 +60,19 @@ def test_memtable_polars_types(con):
    res = con.to_polars((t.x + t.y + t.z).name("test"))
    sol = (df["x"] + df["y"] + df["z"]).rename("test")
    pl.testing.assert_series_equal(res, sol)


@pytest.mark.parametrize("to_method", ["to_pyarrow", "to_polars"])
def test_streaming(con, mocker, to_method):
    t = con.table("functional_alltypes")
    mocked_collect = mocker.patch("polars.LazyFrame.collect")
    getattr(con, to_method)(t, streaming=True)
    mocked_collect.assert_called_once_with(streaming=True, engine="cpu")


@pytest.mark.parametrize("to_method", ["to_pyarrow", "to_polars"])
def test_engine(con, mocker, to_method):
    t = con.table("functional_alltypes")
    mocked_collect = mocker.patch("polars.LazyFrame.collect")
    getattr(con, to_method)(t, engine="gpu")
    mocked_collect.assert_called_once_with(streaming=False, engine="gpu")