MapR [SPARK-118] Spark OJAI Python: move MapR DB Connector class importing in order to fix MapR [ZEP-101] interpreter issue (apache#199)

(cherry picked from commit ef88f8a)
rsotn-mapr authored and Mikhail Gorbov committed Jan 2, 2018
1 parent 996f433 commit 229e8c1
Showing 3 changed files with 72 additions and 65 deletions.
python/pyspark/java_gateway.py (2 changes: 0 additions & 2 deletions)
@@ -123,7 +123,5 @@ def killChild():
java_import(gateway.jvm, "org.apache.spark.sql.*")
java_import(gateway.jvm, "org.apache.spark.sql.hive.*")
java_import(gateway.jvm, "scala.Tuple2")
# MapR DB Connector classes
java_import(gateway.jvm, "com.mapr.db.spark.sql.api.java.MapRDBJavaSession")

return gateway
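
For context, a java_import call only registers a class name on the py4j JVM view so that it can later be referenced through gateway.jvm; the class itself is generally resolved only when the name is first used. The short standalone py4j sketch below illustrates that mechanism; the java.util.ArrayList class is used purely as an illustration and is not part of this diff.

    from py4j.java_gateway import JavaGateway, java_import

    # Assumes a py4j GatewayServer is already listening on the default port.
    gateway = JavaGateway()

    # Register the name on the JVM view; the class is typically not touched yet.
    java_import(gateway.jvm, "java.util.ArrayList")

    # Resolution happens when the imported name is actually referenced.
    items = gateway.jvm.ArrayList()
    items.add("hello")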
python/pyspark/sql/maprpatch.py (67 changes: 67 additions & 0 deletions)
@@ -0,0 +1,67 @@
from py4j.java_gateway import java_import, JavaObject

def mapr_session_patch(original_session, wrapped, gw, default_sample_size=1000.0, default_id_field ="_id"):

    # Import MapR DB Connector Java classes
    java_import(gw.jvm, "com.mapr.db.spark.sql.api.java.MapRDBJavaSession")

    mapr_j_session = gw.jvm.MapRDBJavaSession(original_session._jsparkSession)

    def loadFromMapRDB(table_name, schema = None, sample_size=default_sample_size):
        """
        Loads data from MapR-DB Table.
        :param table_name: MapR-DB table path.
        :param schema: schema representation.
        :param sample_size: sample size.
        :return: a DataFrame
        >>> spark.loadFromMapRDB("/test-table").collect()
        """
        df_reader = original_session.read \
            .format("com.mapr.db.spark.sql") \
            .option("tableName", table_name) \
            .option("sampleSize", sample_size)

        if schema:
            df_reader.schema(schema)

        return df_reader.load()

    original_session.loadFromMapRDB = loadFromMapRDB


    def saveToMapRDB(dataframe, table_name, id_field_path = default_id_field, create_table = False, bulk_insert = False):
        """
        Saves data to MapR-DB Table.
        :param dataframe: a DataFrame which will be saved.
        :param table_name: MapR-DB table path.
        :param id_field_path: field name of document ID.
        :param create_table: indicates if table creation required.
        :param bulk_insert: indicates bulk insert.
        :return: a RDD
        >>> spark.saveToMapRDB(df, "/test-table")
        """
        DataFrame(mapr_j_session.saveToMapRDB(dataframe._jdf, table_name, id_field_path, create_table, bulk_insert), wrapped)

    original_session.saveToMapRDB = saveToMapRDB


    def insertToMapRDB(dataframe, table_name, id_field_path = default_id_field, create_table = False, bulk_insert = False):
        """
        Inserts data into MapR-DB Table.
        :param dataframe: a DataFrame which will be saved.
        :param table_name: MapR-DB table path.
        :param id_field_path: field name of document ID.
        :param create_table: indicates if table creation required.
        :param bulk_insert: indicates bulk insert.
        :return: a RDD
        >>> spark.insertToMapRDB(df, "/test-table")
        """
        DataFrame(mapr_j_session.insertToMapRDB(dataframe._jdf, table_name, id_field_path, create_table, bulk_insert), wrapped)

    original_session.insertToMapRDB = insertToMapRDB
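
The new module attaches loadFromMapRDB, saveToMapRDB and insertToMapRDB to the live session object. A hedged usage sketch follows, assuming a MapR build of PySpark with the OJAI connector jars on the classpath and a hypothetical table path. Two caveats from the code as shown: saveToMapRDB and insertToMapRDB build a DataFrame wrapper but do not return it, and they reference DataFrame without importing it in this module, so the read path is the safest illustration.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("maprdb-sketch").getOrCreate()

    # Read a hypothetical MapR-DB JSON table into a DataFrame; the schema is
    # inferred from a sample unless an explicit schema object is passed.
    df = spark.loadFromMapRDB("/test-table", sample_size=500)
    df.show()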
python/pyspark/sql/session.py (68 changes: 5 additions & 63 deletions)
@@ -38,6 +38,8 @@
     _infer_schema, _has_nulltype, _merge_type, _create_converter, _parse_datatype_string
 from pyspark.sql.utils import install_exception_handler

+import pyspark.sql.maprpatch
+
 __all__ = ["SparkSession"]


@@ -59,67 +61,6 @@ def toDF(self, schema=None, sampleRatio=None):

 RDD.toDF = toDF

-def _mapr_session_patch(original_session, mapr_j_session, wrapped, default_sample_size=1000.0, default_id_field ="_id"):
-
-    def loadFromMapRDB(self, table_name, schema = None, sample_size=default_sample_size):
-        """
-        Loads data from MapR-DB Table.
-        :param table_name: MapR-DB table path.
-        :param schema: schema representation.
-        :param sample_size: sample size.
-        :return: a DataFrame
-        >>> spark.loadFromMapRDB("/test-table").collect()
-        """
-        df_reader = original_session.read \
-            .format("com.mapr.db.spark.sql") \
-            .option("tableName", table_name) \
-            .option("sampleSize", sample_size)
-
-        if schema:
-            df_reader.schema(schema)
-
-        return df_reader.load()
-
-    SparkSession.loadFromMapRDB = loadFromMapRDB
-
-    def saveToMapRDB(self, dataframe, table_name, id_field_path = default_id_field, create_table = False, bulk_insert = False):
-        """
-        Saves data to MapR-DB Table.
-        :param dataframe: a DataFrame which will be saved.
-        :param table_name: MapR-DB table path.
-        :param id_field_path: field name of document ID.
-        :param create_table: indicates if table creation required.
-        :param bulk_insert: indicates bulk insert.
-        :return: a RDD
-        >>> spark.saveToMapRDB(df, "/test-table")
-        """
-        DataFrame(mapr_j_session.saveToMapRDB(dataframe._jdf, table_name, id_field_path, create_table, bulk_insert), wrapped)
-
-    SparkSession.saveToMapRDB = saveToMapRDB
-
-    # TODO implement
-    # def updateToMapRDB(self, dataframe, table_name, mutation, id_value, condition):
-
-    def insertToMapRDB(self, dataframe, table_name, id_field_path = default_id_field, create_table = False, bulk_insert = False):
-        """
-        Inserts data into MapR-DB Table.
-        :param dataframe: a DataFrame which will be saved.
-        :param table_name: MapR-DB table path.
-        :param id_field_path: field name of document ID.
-        :param create_table: indicates if table creation required.
-        :param bulk_insert: indicates bulk insert.
-        :return: a RDD
-        >>> spark.insertToMapRDB(df, "/test-table")
-        """
-        DataFrame(mapr_j_session.insertToMapRDB(dataframe._jdf, table_name, id_field_path, create_table, bulk_insert), wrapped)
-
-    SparkSession.insertToMapRDB = insertToMapRDB
-

 class SparkSession(object):
     """The entry point to programming Spark with the Dataset and DataFrame API.
@@ -280,8 +221,9 @@ def __init__(self, sparkContext, jsparkSession=None):
         self._wrapped = SQLContext(self._sc, self, self._jwrapped)
         _monkey_patch_RDD(self)

-        mapr_j_session = self._jvm.MapRDBJavaSession(self._jsparkSession)
-        _mapr_session_patch(self, mapr_j_session, self._wrapped)
+        ### Applying MapR patch
+        pyspark.sql.maprpatch.mapr_session_patch(self, self._wrapped, gw = self._sc._gateway)
+
         install_exception_handler()
         # If we had an instantiated SparkSession attached with a SparkContext
         # which is stopped now, we need to renew the instantiated SparkSession.
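
As a quick sanity check, assuming this MapR build of PySpark in an interactive shell, the helpers should now be bound on the session instance, since mapr_session_patch attaches them per session rather than on the SparkSession class as the removed code did.

    from pyspark.sql import SparkSession

    spark = SparkSession.builder.getOrCreate()
    # __init__ applied mapr_session_patch above, so these should all print True.
    print(callable(spark.loadFromMapRDB))
    print(callable(spark.saveToMapRDB))
    print(callable(spark.insertToMapRDB))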