Commit

MapR [SPARK-139] Remove "update" related APIs from connector (apache#203)
Mikhail Gorbov authored and ekrivokonmapr committed Sep 19, 2019
1 parent 788e4eb commit 77fc11d
Showing 19 changed files with 37 additions and 9,402 deletions.
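
For orientation before the diff: what goes away are the RDD-level updateToMapRDB overloads (together with their DataFrame and MapRSpark counterparts and the update/check-and-mutate writers), while saveToMapRDB and insertToMapRDB remain. Below is a minimal Scala sketch of the affected call pattern. Only the method signatures come from the deleted code; the table path and the two lambdas are hypothetical placeholders, and sc.loadFromMapRDB plus the saveToMapRDB defaults are assumed from the connector's existing API rather than shown in this commit.

import com.mapr.db.spark._                 // connector implicits (assumed import)
import org.ojai.store.DocumentMutation

// sc is an existing SparkContext; "/apps/user_profiles" is a made-up table path.
val rdd = sc.loadFromMapRDB("/apps/user_profiles")

// Removed by this commit: per-document mutations pushed through the RDD API.
// Both lambdas are user-supplied: the first builds a DocumentMutation for a
// document, the second extracts its _id as an org.ojai.Value.
rdd.updateToMapRDB(
  "/apps/user_profiles",
  doc => ???,   // (T) => DocumentMutation
  doc => ???    // (T) => org.ojai.Value
)

// Still supported after this commit: full-document writes.
rdd.saveToMapRDB("/apps/user_profiles")
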
@@ -3,27 +3,25 @@ package com.mapr.db.spark.RDD

import com.mapr.db.exceptions.TableNotFoundException
import com.mapr.db.spark.RDD.partitioner.MapRDBPartitioner
import com.mapr.db.spark.condition.{DBQueryCondition, Predicate}
import com.mapr.db.spark.configuration.SerializableConfiguration
import com.mapr.db.spark.dbclient.DBClient
import com.mapr.db.spark.utils.{LoggingTrait, MapRDBUtils}
import com.mapr.db.spark.writers._
import org.apache.hadoop.conf.Configuration
import org.ojai.{Document, DocumentConstants, Value}
import org.ojai.store.DocumentMutation

import org.apache.spark.Partitioner
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.RDD
import org.apache.spark.Partitioner

private[spark] class DocumentRDDFunctions extends LoggingTrait {
protected def saveToMapRDBInternal[T](
rdd: RDD[T],
tablename: String,
createTable: Boolean = false,
bulkInsert: Boolean = false,
function1: (Broadcast[SerializableConfiguration],
Boolean) => ((Iterator[T]) => Unit)): Unit = {
rdd: RDD[T],
tableName: String,
createTable: Boolean = false,
bulkInsert: Boolean = false,
function1: (Broadcast[SerializableConfiguration], Boolean) =>
((Iterator[T]) => Unit)): Unit = {

var isNewAndBulkLoad = (false, false)

val partitioner: Option[Partitioner] = rdd.partitioner
@@ -40,11 +38,10 @@ private[spark] class DocumentRDDFunctions extends LoggingTrait {

try {
isNewAndBulkLoad =
MapRDBUtils.checkOrCreateTable(tablename, bulkInsert, createTable, keys)
MapRDBUtils.checkOrCreateTable(tableName, bulkInsert, createTable, keys)
} catch {
case e: TableNotFoundException =>
logError(
"Table: " + tablename + " not found and createTable set to: " + createTable)
logError("Table: " + tableName + " not found and createTable set to: " + createTable)
throw e
case any: Exception => throw any
}
@@ -55,7 +52,7 @@ private[spark] class DocumentRDDFunctions extends LoggingTrait {
rdd.context.broadcast(serializableConf)
rdd.foreachPartition(function1(cnf, isNewAndBulkLoad._2))
if (isNewAndBulkLoad._1 && isNewAndBulkLoad._2) {
MapRDBUtils.setBulkLoad(tablename, false)
MapRDBUtils.setBulkLoad(tableName, false)
}
}
}
@@ -133,59 +130,6 @@ private[spark] case class OJAIDocumentRDDFunctions[T](rdd: RDD[T])(
}
)
}

def updateToMapRDB(tableName: String,
mutation: (T) => DocumentMutation,
getID: (T) => Value): Unit = {
logDebug(
"updateToMapRDB in OJAIDocumentRDDFunctions is called for table: " + tableName)
this.saveToMapRDBInternal(
rdd,
tableName,
false,
false,
(cnf: Broadcast[SerializableConfiguration], isnewAndBulkLoad: Boolean) =>
(iter: Iterator[T]) =>
if (iter.nonEmpty) {
val writer = TableUpdateWriter(DBClient().getTable(tableName))
while (iter.hasNext) {
val element = iter.next
f.update(mutation(element), getID(element), writer)
}
writer.close()
}
)
}

def updateToMapRDB(tableName: String,
mutation: (T) => DocumentMutation,
getID: (T) => Value,
condition: Predicate): Unit = {
logDebug(
"updateToMapRDB in OJAIDocumentRDDFunctions is called for table: " + tableName)
val queryCondition = DBQueryCondition(condition.build.build())

this.saveToMapRDBInternal(
rdd,
tableName,
false,
false,
(cnf: Broadcast[SerializableConfiguration], isnewAndBulkLoad: Boolean) =>
(iter: Iterator[T]) =>
if (iter.nonEmpty) {
val writer =
TableCheckAndMutateWriter(DBClient().getTable(tableName))
while (iter.hasNext) {
val element = iter.next
f.checkAndUpdate(mutation(element),
queryCondition,
getID(element),
writer)
}
writer.close()
}
)
}
}

private[spark] case class PairedDocumentRDDFunctions[K, V](rdd: RDD[(K, V)])(
@@ -223,8 +167,8 @@ private[spark] case class PairedDocumentRDDFunctions[K, V](rdd: RDD[(K, V)])(
def insertToMapRDB(tablename: String,
createTable: Boolean = false,
bulkInsert: Boolean = false): Unit = {
logDebug(
"insertToMapRDB in PairedDocumentRDDFunctions is called for table: " +

logDebug("insertToMapRDB in PairedDocumentRDDFunctions is called for table: " +
tablename + " with bulkinsert flag set: " + bulkInsert + " and createTable:" + createTable)

this.saveToMapRDBInternal[(K, V)](
@@ -245,57 +189,4 @@ private[spark] case class PairedDocumentRDDFunctions[K, V](rdd: RDD[(K, V)])(
}
)
}

def updateToMapRDB(tablename: String,
mutation: (V) => DocumentMutation): Unit = {
logDebug(
"updateToMapRDB in PairedDocumentRDDFunctions is called for table: " + tablename)

this.saveToMapRDBInternal[(K, V)](
rdd,
tablename,
false,
false,
(cnf: Broadcast[SerializableConfiguration], isnewAndBulkLoad: Boolean) =>
(iter: Iterator[(K, V)]) =>
if (iter.nonEmpty) {
val writer = TableUpdateWriter(DBClient().getTable(tablename))
while (iter.hasNext) {
val element = iter.next
f.update(mutation(element._2), f.getValue(element._1), writer)
}
writer.close()
}
)
}

def updateToMapRDB(tablename: String,
mutation: (V) => DocumentMutation,
condition: Predicate): Unit = {
logDebug(
"updateToMapRDB in PairedDocumentRDDFunctions is called for table: " + tablename)

val queryCondition = DBQueryCondition(condition.build.build())

this.saveToMapRDBInternal[(K, V)](
rdd,
tablename,
false,
false,
(cnf: Broadcast[SerializableConfiguration], isnewAndBulkLoad: Boolean) =>
(iter: Iterator[(K, V)]) =>
if (iter.nonEmpty) {
val writer =
TableCheckAndMutateWriter(DBClient().getTable(tablename))
while (iter.hasNext) {
val element = iter.next
f.checkAndMutate(mutation(element._2),
queryCondition,
f.getValue(element._1),
writer)
}
writer.close()
}
)
}
}
@@ -118,12 +118,6 @@ class DefaultSource
idFieldPath,
createTable = true,
bulkInsert = bulkMode)
case "Update" =>
MapRSpark.update(data,
tableName,
idFieldPath,
createTable = false,
bulkInsert = bulkMode)
case _ =>
throw new UnsupportedOperationException("Not supported operation")
}
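
The DefaultSource hunk above removes the "Update" branch from the save-operation dispatch, so an update request through the DataFrame writer now falls into the default case and fails. A rough sketch of that path follows; the format name and the "tablePath"/"Operation" option keys are assumptions about this connector's DataSource options, not taken from the diff.

// Assumed DataSource format and option keys. After this commit the "Update"
// operation is no longer matched, and DefaultSource throws
// UnsupportedOperationException("Not supported operation").
df.write
  .format("com.mapr.db.spark.sql")
  .option("tablePath", "/apps/user_profiles")   // hypothetical table path
  .option("Operation", "Update")
  .save()
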
@@ -22,15 +22,4 @@ private[spark] case class MapRDBDataFrameFunctions(@transient df: DataFrame)
createTable: Boolean = false,
bulkInsert: Boolean = false): Unit =
MapRSpark.insert(df, tableName, idFieldPath, createTable, bulkInsert)

def updateToMapRDB(tableName: String,
mutation: (Row) => DocumentMutation,
getID: (Row) => org.ojai.Value): Unit =
MapRSpark.update(df, tableName, mutation, getID)

def updateToMapRDB(tableName: String,
mutation: (Row) => DocumentMutation,
getID: (Row) => org.ojai.Value,
condition: Predicate): Unit =
MapRSpark.update(df, tableName, mutation, getID, condition)
}
@@ -51,35 +51,6 @@ object MapRSpark {
idFieldPath = idFieldPath)
}

def update[D](dataset: Dataset[D],
tableName: String,
idFieldPath: String,
createTable: Boolean,
bulkInsert: Boolean): Unit = {
val documentRdd = dataset.toDF.rdd.map(MapRSqlUtils.rowToDocument)
documentRdd.saveToMapRDB(tableName,
createTable = createTable,
bulkInsert = bulkInsert,
idFieldPath = idFieldPath)
}

def update(df: DataFrame,
tableName: String,
mutation: (Row) => DocumentMutation,
getID: (Row) => org.ojai.Value): Unit = {
val documentRdd = df.rdd
documentRdd.updateToMapRDB(tableName, mutation, getID)
}

def update(df: DataFrame,
tableName: String,
mutation: (Row) => DocumentMutation,
getID: (Row) => org.ojai.Value,
condition: Predicate): Unit = {
val documentRdd = df.rdd
documentRdd.updateToMapRDB(tableName, mutation, getID, condition)
}

def save(
dfw: DataFrameWriter[_],
tableName: String,
@@ -14,52 +14,47 @@ private[spark] sealed trait OJAIValue[T] extends Serializable {
type Self
def getValue(elem: T): Document
def write(doc: Document, getID: (Document) => Value, writer: Writer)
def update(mutation: DocumentMutation, getID: Value, writer: TableUpdateWriter)
def checkAndUpdate(mutation: DocumentMutation, queryCondition: DBQueryCondition, getID: Value, writer: TableCheckAndMutateWriter)
}

private[spark] object OJAIValue extends BaseOJAIValue {

implicit def rowOJAIDocument[T]: OJAIValue[Row] = new OJAIValue[Row] {
override type Self = Row

override def getValue(elem: Row): Document = MapRSqlUtils.rowToDocument(elem).getDoc

override def write(doc: Document, getID: (Document) => Value, writer: Writer) = writer.write(doc, getID(doc))

override def update(mutation: DocumentMutation, getID: Value, writer: TableUpdateWriter) = writer.write(mutation, getID)

override def checkAndUpdate(mutation: DocumentMutation, queryCondition: DBQueryCondition, getID: Value, writer: TableCheckAndMutateWriter): Unit =
writer.write(mutation, queryCondition, getID)
override def getValue(elem: Row): Document =
MapRSqlUtils.rowToDocument(elem).getDoc

override def write(doc: Document,
getID: (Document) => Value,
writer: Writer) = writer.write(doc, getID(doc))
}

implicit def defaultOJAIDocument[T]: OJAIValue[OJAIDocument] = new OJAIValue[OJAIDocument] {
type Self = OJAIDocument
override def getValue(elem: OJAIDocument): Document = elem.getDoc
override def write(doc: Document, getID: (Document)=> Value, writer: Writer) = writer.write(doc, getID(doc))
override def update(mutation: DocumentMutation, getID: Value, writer: TableUpdateWriter) = writer.write(mutation, getID)
override def checkAndUpdate(mutation: DocumentMutation, queryCondition: DBQueryCondition, getID: Value, writer: TableCheckAndMutateWriter): Unit =
writer.write(mutation, queryCondition, getID)
}
implicit def defaultOJAIDocument[T]: OJAIValue[OJAIDocument] =
new OJAIValue[OJAIDocument] {
type Self = OJAIDocument
override def getValue(elem: OJAIDocument): Document = elem.getDoc
override def write(doc: Document,
getID: (Document) => Value,
writer: Writer) = writer.write(doc, getID(doc))
}
}

private[spark] trait BaseOJAIValue {
implicit def overrideDefault[T <: AnyRef]: OJAIValue[T] = new OJAIValue[T] {
type Self = AnyRef
override def getValue(elem: T): Document = BeanCodec.decode(DBClient().newDocumentBuilder(), elem)
override def write(doc: Document, getID: (Document) => Value, writer: Writer) = writer.write(doc, getID(doc))
override def update(mutation: DocumentMutation, getID: Value, writer: TableUpdateWriter) = writer.write(mutation, getID)
override def checkAndUpdate(mutation: DocumentMutation, queryCondition: DBQueryCondition, getID: Value, writer: TableCheckAndMutateWriter): Unit =
writer.write(mutation, queryCondition, getID)
override def getValue(elem: T): Document =
BeanCodec.decode(DBClient().newDocumentBuilder(), elem)
override def write(doc: Document,
getID: (Document) => Value,
writer: Writer) = writer.write(doc, getID(doc))
}

def overrideJavaDefault[T <: AnyRef]: OJAIValue[T] = new OJAIValue[T] {
type Self = AnyRef
override def getValue(elem: T): Document = org.ojai.beans.BeanCodec.decode(DBClient().newDocumentBuilder(), elem)
override def write(doc: Document, getID: (Document) => Value, writer: Writer) = writer.write(doc, getID(doc))
override def update(mutation: DocumentMutation, getID: Value, writer: TableUpdateWriter) = writer.write(mutation, getID)
override def checkAndUpdate(mutation: DocumentMutation, queryCondition: DBQueryCondition, getID: Value, writer: TableCheckAndMutateWriter): Unit =
writer.write(mutation, queryCondition, getID)
override def getValue(elem: T): Document =
org.ojai.beans.BeanCodec.decode(DBClient().newDocumentBuilder(), elem)
override def write(doc: Document,
getID: (Document) => Value,
writer: Writer) = writer.write(doc, getID(doc))
}
}
@@ -12,7 +12,6 @@ private[spark] sealed trait OJAIKey[T] extends Serializable {
type Self
def getValue(elem: T): Self
def write(doc: Document, key: Self, table: Writer)
def update(mutation: DocumentMutation, key: Self, table: TableUpdateWriter)
def checkAndMutate(mutation: DocumentMutation,
queryCondition: DBQueryCondition,
key: Self,
@@ -25,9 +24,6 @@ private[spark] object OJAIKey {
override def getValue(elem: String) = elem
override def write(doc: Document, key: String, table: Writer) =
table.write(doc, key)
override def update(mutation: DocumentMutation,
key: String,
table: TableUpdateWriter) = table.write(mutation, key)
override def checkAndMutate(mutation: DocumentMutation,
queryCondition: DBQueryCondition,
key: String,
@@ -40,9 +36,7 @@ private[spark] object OJAIKey {
override def getValue(elem: ByteBuffer) = elem
override def write(doc: Document, key: ByteBuffer, table: Writer) =
table.write(doc, key)
override def update(mutation: DocumentMutation,
key: ByteBuffer,
table: TableUpdateWriter) = table.write(mutation, key)

override def checkAndMutate(mutation: DocumentMutation,
queryCondition: DBQueryCondition,
key: ByteBuffer,
@@ -55,9 +49,6 @@ private[spark] object OJAIKey {
override def getValue(elem: DBBinaryValue) = elem.getByteBuffer()
override def write(doc: Document, key: ByteBuffer, table: Writer) =
table.write(doc, key)
override def update(mutation: DocumentMutation,
key: ByteBuffer,
table: TableUpdateWriter) = table.write(mutation, key)
override def checkAndMutate(mutation: DocumentMutation,
queryCondition: DBQueryCondition,
key: ByteBuffer,
@@ -51,26 +51,6 @@ private[spark] case class TableInsertWriter(@transient table: DocumentStore)
}
}

private[spark] case class TableUpdateWriter(@transient table: DocumentStore) {

def write(mutation: DocumentMutation, key: ByteBuffer): Unit = {
write(mutation, DBValueBuilderImpl.KeyValueBuilder.initFrom(key))
}

def write(mutation: DocumentMutation, key: String): Unit = {
write(mutation, DBValueBuilderImpl.KeyValueBuilder.initFrom(key))
}

def write(mutation: DocumentMutation, key: org.ojai.Value): Unit = {
table.update(key, mutation)
}

def close(): Unit = {
table.flush()
table.close()
}
}

private[spark] case class TableCheckAndMutateWriter(
@transient table: DocumentStore) {
