From 8d8b4a5e73a5ff4ca20986b8efee38e00b0f23cf Mon Sep 17 00:00:00 2001
From: Rostyslav Sotnychenko
Date: Wed, 13 Dec 2017 16:27:39 +0200
Subject: [PATCH] MapR [SPARK-118] Spark OJAI Python: move MapR DB Connector
 class importing in order to fix MapR [ZEP-101] interpreter issue (#199)

---
 python/pyspark/java_gateway.py  |  2 -
 python/pyspark/sql/maprpatch.py | 67 ++++++++++++++++++++++++++++++++
 python/pyspark/sql/session.py   | 68 +++------------------------------
 3 files changed, 72 insertions(+), 65 deletions(-)
 create mode 100644 python/pyspark/sql/maprpatch.py

diff --git a/python/pyspark/java_gateway.py b/python/pyspark/java_gateway.py
index 49d4772ade679..feb6b7bd6aa3d 100644
--- a/python/pyspark/java_gateway.py
+++ b/python/pyspark/java_gateway.py
@@ -146,8 +146,6 @@ def killChild():
     java_import(gateway.jvm, "org.apache.spark.sql.api.python.*")
     java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
     java_import(gateway.jvm, "scala.Tuple2")
-    # MapR DB Connector classes
-    java_import(gateway.jvm, "com.mapr.db.spark.sql.api.java.MapRDBJavaSession")
 
     return gateway
 
diff --git a/python/pyspark/sql/maprpatch.py b/python/pyspark/sql/maprpatch.py
new file mode 100644
index 0000000000000..2388e0a94c413
--- /dev/null
+++ b/python/pyspark/sql/maprpatch.py
@@ -0,0 +1,67 @@
+from py4j.java_gateway import java_import, JavaObject
+
+def mapr_session_patch(original_session, wrapped, gw, default_sample_size=1000.0, default_id_field ="_id"):
+
+    # Import MapR DB Connector Java classes
+    java_import(gw.jvm, "com.mapr.db.spark.sql.api.java.MapRDBJavaSession")
+
+    mapr_j_session = gw.jvm.MapRDBJavaSession(original_session._jsparkSession)
+
+    def loadFromMapRDB(table_name, schema = None, sample_size=default_sample_size):
+        """
+        Loads data from MapR-DB Table.
+
+        :param table_name: MapR-DB table path.
+        :param schema: schema representation.
+        :param sample_size: sample size.
+        :return: a DataFrame
+
+        >>> spark.loadFromMapRDB("/test-table").collect()
+        """
+        df_reader = original_session.read \
+            .format("com.mapr.db.spark.sql") \
+            .option("tableName", table_name) \
+            .option("sampleSize", sample_size)
+
+        if schema:
+            df_reader.schema(schema)
+
+        return df_reader.load()
+
+    original_session.loadFromMapRDB = loadFromMapRDB
+
+
+    def saveToMapRDB(dataframe, table_name, id_field_path = default_id_field, create_table = False, bulk_insert = False):
+        """
+        Saves data to MapR-DB Table.
+
+        :param dataframe: a DataFrame which will be saved.
+        :param table_name: MapR-DB table path.
+        :param id_field_path: field name of document ID.
+        :param create_table: indicates if table creation required.
+        :param bulk_insert: indicates bulk insert.
+        :return: a RDD
+
+        >>> spark.saveToMapRDB(df, "/test-table")
+        """
+        DataFrame(mapr_j_session.saveToMapRDB(dataframe._jdf, table_name, id_field_path, create_table, bulk_insert), wrapped)
+
+    original_session.saveToMapRDB = saveToMapRDB
+
+
+    def insertToMapRDB(dataframe, table_name, id_field_path = default_id_field, create_table = False, bulk_insert = False):
+        """
+        Inserts data into MapR-DB Table.
+
+        :param dataframe: a DataFrame which will be saved.
+        :param table_name: MapR-DB table path.
+        :param id_field_path: field name of document ID.
+        :param create_table: indicates if table creation required.
+        :param bulk_insert: indicates bulk insert.
+        :return: a RDD
+
+        >>> spark.insertToMapRDB(df, "/test-table")
+        """
+        DataFrame(mapr_j_session.insertToMapRDB(dataframe._jdf, table_name, id_field_path, create_table, bulk_insert), wrapped)
+
+    original_session.insertToMapRDB = insertToMapRDB
diff --git a/python/pyspark/sql/session.py b/python/pyspark/sql/session.py
index 15dd961e90b62..0f2c37b731571 100644
--- a/python/pyspark/sql/session.py
+++ b/python/pyspark/sql/session.py
@@ -38,6 +38,8 @@
     _parse_datatype_string
 from pyspark.sql.utils import install_exception_handler
 
+import pyspark.sql.maprpatch
+
 __all__ = ["SparkSession"]
 
 
@@ -59,67 +61,6 @@ def toDF(self, schema=None, sampleRatio=None):
     RDD.toDF = toDF
 
 
-def _mapr_session_patch(original_session, mapr_j_session, wrapped, default_sample_size=1000.0, default_id_field ="_id"):
-
-    def loadFromMapRDB(self, table_name, schema = None, sample_size=default_sample_size):
-        """
-        Loads data from MapR-DB Table.
-
-        :param table_name: MapR-DB table path.
-        :param schema: schema representation.
-        :param sample_size: sample size.
-        :return: a DataFrame
-
-        >>> spark.loadFromMapRDB("/test-table").collect()
-        """
-        df_reader = original_session.read \
-            .format("com.mapr.db.spark.sql") \
-            .option("tableName", table_name) \
-            .option("sampleSize", sample_size)
-
-        if schema:
-            df_reader.schema(schema)
-
-        return df_reader.load()
-
-    SparkSession.loadFromMapRDB = loadFromMapRDB
-
-    def saveToMapRDB(self, dataframe, table_name, id_field_path = default_id_field, create_table = False, bulk_insert = False):
-        """
-        Saves data to MapR-DB Table.
-
-        :param dataframe: a DataFrame which will be saved.
-        :param table_name: MapR-DB table path.
-        :param id_field_path: field name of document ID.
-        :param create_table: indicates if table creation required.
-        :param bulk_insert: indicates bulk insert.
-        :return: a RDD
-
-        >>> spark.saveToMapRDB(df, "/test-table")
-        """
-        DataFrame(mapr_j_session.saveToMapRDB(dataframe._jdf, table_name, id_field_path, create_table, bulk_insert), wrapped)
-
-    SparkSession.saveToMapRDB = saveToMapRDB
-
-    # TODO implement
-    # def updateToMapRDB(self, dataframe, table_name, mutation, id_value, condition):
-
-    def insertToMapRDB(self, dataframe, table_name, id_field_path = default_id_field, create_table = False, bulk_insert = False):
-        """
-        Inserts data into MapR-DB Table.
-
-        :param dataframe: a DataFrame which will be saved.
-        :param table_name: MapR-DB table path.
-        :param id_field_path: field name of document ID.
-        :param create_table: indicates if table creation required.
-        :param bulk_insert: indicates bulk insert.
-        :return: a RDD
-
-        >>> spark.insertToMapRDB(df, "/test-table")
-        """
-        DataFrame(mapr_j_session.insertToMapRDB(dataframe._jdf, table_name, id_field_path, create_table, bulk_insert), wrapped)
-
-    SparkSession.insertToMapRDB = insertToMapRDB
 
 class SparkSession(object):
     """The entry point to programming Spark with the Dataset and DataFrame API.
@@ -285,8 +226,9 @@ def __init__(self, sparkContext, jsparkSession=None):
         self._wrapped = SQLContext(self._sc, self, self._jwrapped)
         _monkey_patch_RDD(self)
-        mapr_j_session = self._jvm.MapRDBJavaSession(self._jsparkSession)
-        _mapr_session_patch(self, mapr_j_session, self._wrapped)
+        ### Applying MapR patch
+        pyspark.sql.maprpatch.mapr_session_patch(self, self._wrapped, gw = self._sc._gateway)
+
         install_exception_handler()
         # If we had an instantiated SparkSession attached with a SparkContext
         # which is stopped now, we need to renew the instantiated SparkSession.
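
A minimal usage sketch of the patched API, assuming a SparkSession named spark built in the usual way and placeholder table paths "/test-table" and "/test-table-copy": after this commit the three methods are attached to the live session instance by mapr_session_patch (rather than monkey-patched onto the SparkSession class), so calling code is unchanged, and MapRDBJavaSession is imported into the JVM gateway only when pyspark.sql.maprpatch is applied. Per the patch, saveToMapRDB and insertToMapRDB do not return a value, so only the loadFromMapRDB result is used below.

>>> from pyspark.sql import SparkSession
>>> spark = SparkSession.builder.appName("maprdb-sketch").getOrCreate()
>>> df = spark.loadFromMapRDB("/test-table", sample_size=1000.0)   # returns a DataFrame
>>> df.show(5)
>>> spark.saveToMapRDB(df, "/test-table-copy", create_table=True)  # writes through MapRDBJavaSession
>>> spark.insertToMapRDB(df, "/test-table-copy")                   # inserts into the existing table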