-
Notifications
You must be signed in to change notification settings - Fork 175
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FEAT] connect: read/write -> csv, write -> json
- Loading branch information
1 parent
b8440ed
commit f950b00
Showing
3 changed files
with
66 additions
and
20 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
from __future__ import annotations | ||
|
||
import tempfile | ||
import shutil | ||
import os | ||
|
||
|
||
def test_write_csv(spark_session):
    """Round-trip a DataFrame through CSV: write it out, read it back, and
    verify the data survives unchanged.

    Uses a throwaway temp directory so the test leaves no artifacts behind,
    even when an assertion fails.
    """
    temp_dir = tempfile.mkdtemp()
    try:
        # DataFrame with a single "id" column holding 0..9.
        df = spark_session.range(10)

        # Spark writes CSV output as a *directory* of part files, not a
        # single file — "test.csv" here names that directory.
        csv_dir = os.path.join(temp_dir, "test.csv")
        df.write.csv(csv_dir)

        # Collect the part files actually produced inside the directory.
        csv_files = [f for f in os.listdir(csv_dir) if f.endswith('.csv')]
        print(f"CSV files in directory: {csv_files}")

        # At least one part file must have been written.
        assert len(csv_files) > 0, "Expected at least one CSV file to be written"

        # Read the whole directory back (the reader globs the part files
        # itself), rather than pointing at any single part file.
        df_read = spark_session.read.csv(csv_dir)

        # Compare values, not positions: with multiple part files the
        # read-back row order is not guaranteed to match the write order,
        # so a positional Series.equals() check is flaky by construction.
        # NOTE(review): assumes the read-back frame exposes an "id" column
        # with values comparable to the originals — confirm against the
        # connect backend's CSV header/schema handling.
        df_pandas = df.toPandas()
        df_read_pandas = df_read.toPandas()
        assert sorted(df_pandas["id"].tolist()) == sorted(df_read_pandas["id"].tolist()), \
            "Data should be unchanged after write/read"

    finally:
        # Always remove the scratch directory, even on assertion failure.
        shutil.rmtree(temp_dir)