Skip to content

Commit

Permalink
[FEAT] connect: createDataFrame (WIP) help needed
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewgazelka committed Nov 21, 2024
1 parent 3a8c076 commit f88c39c
Show file tree
Hide file tree
Showing 4 changed files with 54 additions and 1 deletion.
6 changes: 5 additions & 1 deletion src/daft-connect/src/translation/logical_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,13 @@ use eyre::{bail, Context};
use spark_connect::{relation::RelType, Relation};
use tracing::warn;

use crate::translation::logical_plan::{aggregate::aggregate, project::project, range::range};
use crate::translation::logical_plan::{aggregate::aggregate, project::project, range::range, to_df::to_df};

mod aggregate;
mod project;
mod range;
mod to_df;
mod local_relation;

pub fn to_logical_plan(relation: Relation) -> eyre::Result<LogicalPlanBuilder> {
if let Some(common) = relation.common {
Expand All @@ -24,6 +26,8 @@ pub fn to_logical_plan(relation: Relation) -> eyre::Result<LogicalPlanBuilder> {
RelType::Aggregate(a) => {
aggregate(*a).wrap_err("Failed to apply aggregate to logical plan")
}
RelType::ToDf(t) => to_df(*t).wrap_err("Failed to apply to_df to logical plan"),
RelType::LocalRelation(l) => local_relation(*l).wrap_err("Failed to apply local_relation to logical plan"),
plan => bail!("Unsupported relation type: {plan:?}"),
}
}
10 changes: 10 additions & 0 deletions src/daft-connect/src/translation/logical_plan/local_relation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
use daft_logical_plan::LogicalPlanBuilder;

/// Translates a Spark Connect `LocalRelation` message into a logical plan.
///
/// Translation of local relations is not implemented yet. Rather than leaving
/// the function without a return value (which does not compile), the request
/// is rejected with an explicit error so callers get a clear message.
///
/// # Errors
///
/// Always returns an error until local relation support is implemented.
pub fn local_relation(
    local_relation: spark_connect::LocalRelation,
) -> eyre::Result<LogicalPlanBuilder> {
    // Destructure without `..` so that a field added to the message later
    // shows up as a compile error here instead of being silently ignored.
    let spark_connect::LocalRelation {
        data: _data,
        schema: _schema,
    } = local_relation;

    eyre::bail!("LocalRelation is not yet supported")
}
26 changes: 26 additions & 0 deletions src/daft-connect/src/translation/logical_plan/to_df.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
use eyre::{bail, WrapErr};
use daft_logical_plan::LogicalPlanBuilder;
use crate::translation::to_logical_plan;

pub fn to_df(to_df: spark_connect::ToDf) -> eyre::Result<LogicalPlanBuilder> {
let spark_connect::ToDf {
input,
column_names,
} = to_df;

let Some(input) = input else {
bail!("Input is required")
};

let plan = to_logical_plan(*input)
.wrap_err_with(|| format!("Failed to translate relation to logical plan: {input:?}"))?;

let column_names: Vec<_> = column_names
.iter()
.map(|name| daft_dsl::col(name))
.collect();

let plan = plan.with_columns(column_names)?

Ok(plan)
}
13 changes: 13 additions & 0 deletions tests/connect/test_create_df.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
from __future__ import annotations


def test_create_df(spark_session):
# Create a DataFrame with duplicate values
data = [(1,), (2,), (2,), (3,), (3,), (3,)]
df = spark_session.createDataFrame(data, ["value"])

# Collect and verify results
result = df.collect()

# Verify the DataFrame has the expected number of rows and values
assert sorted([row.value for row in result]) == [1, 2, 2, 3, 3, 3]

0 comments on commit f88c39c

Please sign in to comment.