This repository has been archived by the owner on Sep 18, 2023. It is now read-only.

[NSE-461]Fix issue when doing rowToArrowColumnar conversion for ArrayType #492

Closed

Conversation

xuechendi
Collaborator

This PR fixes some small issues in the rowToArrowColumnar conversion when the data contains ArrayType.

Fixed: #461

Signed-off-by: Chendi Xue [email protected]
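For readers unfamiliar with what a row-to-columnar conversion has to do for an array-typed column, here is a minimal plain-Python sketch (no Arrow dependency; the data values are made up). The key point is that a list column is stored columnar-style as one flat value buffer plus an offsets buffer, which is exactly Arrow's ListArray layout:

```python
# Hypothetical rows with an array-typed second column, mirroring the
# shape of collect_list('present_media') output in the reproducer below.
rows = [
    (1, ["Photo", "Video"]),
    (2, []),            # empty array: offsets advance by 0
    (3, ["GIF"]),
]

# Pivot rows into per-column buffers; the list column becomes
# a flat value buffer plus offsets (Arrow ListArray layout).
ids = [r[0] for r in rows]
values, offsets = [], [0]
for _, arr in rows:
    values.extend(arr)
    offsets.append(len(values))

print(ids)      # [1, 2, 3]
print(values)   # ['Photo', 'Video', 'GIF']
print(offsets)  # [0, 2, 2, 3]
```

Row `i` of the list column is reconstructed as `values[offsets[i]:offsets[i+1]]`; the empty array shows up only as a repeated offset, which is the kind of edge case a converter must handle.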

@github-actions

#461

@xuechendi
Collaborator Author

root@sr602:/home/vmagent/app/recdp/gazelle# cat test_columnar_pandas_udf_with_arr.py
import findspark
findspark.init()

import os
import pandas as pd
import numpy as np
from pyspark.sql import *
from pyspark import *
import pyspark.sql.functions as f
from pyspark.sql.types import *
from typing import Iterator, Tuple
from timeit import default_timer as timer
import logging
from pyspark.sql.functions import pandas_udf, PandasUDFType


path_prefix = "file://"
current_path = "/home/vmagent/app/test/test_output"
original_folder = "/home/vmagent/app/test/test"

native_sql_path = "/home/vmagent/app/native-sql-engine/native-sql-engine/core/target/spark-columnar-core-1.2.0-snapshot-jar-with-dependencies.jar"
native_arrow_datasource_path = "/home/vmagent/app/native-sql-engine/arrow-data-source/standard/target/spark-arrow-datasource-standard-1.2.0-snapshot-jar-with-dependencies.jar"

##### 1. Start spark and initialize data processor #####
t0 = timer()
spark = SparkSession.builder.master('local[*]')\
    .appName("udf_column")\
    .config("spark.sql.broadcastTimeout", "7200")\
    .config("spark.cleaner.periodicGC.interval", "10min")\
    .config("spark.driver.extraClassPath",
            f"{native_sql_path}:{native_arrow_datasource_path}")\
    .config("spark.sql.extensions", "com.intel.oap.ColumnarPlugin, com.intel.oap.spark.sql.ArrowWriteExtension")\
    .config("spark.shuffle.manager", "org.apache.spark.shuffle.sort.ColumnarShuffleManager")\
    .config("spark.driver.memory", "200G")\
    .config("spark.driver.memoryOverhead", "300G")\
    .config("spark.memory.offHeap.size", "300G")\
    .config("spark.oap.sql.columnar.arrowudf", "true")\
    .getOrCreate()

# define UDF
def python_udf_get_joined(x):
    if x.size > 0:
        return "_".join(["_".join(i.split()) for i in x])
    else:
        return ""

@pandas_udf('string')
def pandas_udf_get_first(v):
    v1s = []
    for index, token in v.items():
        v1s.append(python_udf_get_joined(token))
    return pd.Series(v1s, dtype=str)


df = spark.read.format("arrow").load(path_prefix + original_folder)
df = df.filter("engaged_with_user_id is not null")
df = df.groupby('engaged_with_user_id').agg(f.collect_list('present_media').alias('posted_tweet_types'), f.first('language').alias('language'))
df = df.withColumn('posted_tweet_types',  pandas_udf_get_first(f.col('posted_tweet_types')))
#df = df.repartition(1)
df.show()
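The UDF logic in the script above can be exercised without Spark by applying it to a plain pandas Series, the way `pandas_udf` would hand batches to `pandas_udf_get_first`. This is a standalone check with made-up sample data (the function body is copied from the script; the input values are assumptions for illustration):

```python
import numpy as np
import pandas as pd

# Same helper as in the reproducer: join the tokens of each string
# in the array with underscores, then join the results.
def python_udf_get_joined(x):
    if x.size > 0:
        return "_".join(["_".join(i.split()) for i in x])
    else:
        return ""

# collect_list produces an array per row; an empty array must map to "".
s = pd.Series([np.array(["Photo Video", "GIF"]), np.array([], dtype=str)])
out = pd.Series([python_udf_get_joined(t) for t in s], dtype=str)
print(out.tolist())  # ['Photo_Video_GIF', '']
```

This mirrors what the columnar path has to reproduce: each row's ArrayType value arrives as one array in the batch, including the empty-array case.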

[screenshots: df.show() output]

@xuechendi xuechendi force-pushed the wip_ArrayType_r2c_fix branch from 6256f02 to 6be005d Compare August 26, 2021 07:57
@xuechendi xuechendi closed this Sep 2, 2021
Development

Successfully merging this pull request may close these issues.

Support ArrayType in Gazelle