Add new API for hash type #50

Open · wants to merge 6 commits into base: master
4 changes: 4 additions & 0 deletions .gitignore
@@ -32,3 +32,7 @@ src_managed/
project/boot/
project/plugins/project/
build/*.jar

.DS_Store

scalastyle-output.xml
2 changes: 1 addition & 1 deletion .travis.yml
@@ -3,7 +3,7 @@ language: scala
scala:
- 2.10.4
before_install:
-- git clone https://github.com/antirez/redis.git redis_for_spark-redis_test || true
+- git clone --branch 3.0.0 https://github.com/antirez/redis.git redis_for_spark-redis_test || true
install:
- make -C redis_for_spark-redis_test -j4
script: make test
2 changes: 1 addition & 1 deletion pom.xml
@@ -3,7 +3,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.redislabs</groupId>
<artifactId>spark-redis</artifactId>
-<version>0.3.2</version>
+<version>0.3.4-SNAPSHOT</version>
<name>Spark-Redis</name>
<description>A Spark library for Redis</description>
<url>http://github.com/RedisLabs/spark-redis</url>
30 changes: 30 additions & 0 deletions src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala
@@ -65,6 +65,32 @@ class RedisKVRDD(prev: RDD[String],
}
}

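/**
* fetch whole hashes: one (key, field-value map) record per Redis 'hash' key
*/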
class RedisKMapRDD(prev: RDD[String]) extends RDD[(String, Map[String, String])](prev) with Keys {

override def getPartitions: Array[Partition] = prev.partitions

override def compute(split: Partition,
context: TaskContext): Iterator[(String, Map[String, String])] = {
val partition: RedisPartition = split.asInstanceOf[RedisPartition]
val sPos = partition.slots._1
val ePos = partition.slots._2
val nodes = partition.redisConfig.getNodesBySlots(sPos, ePos)
val keys = firstParent[String].iterator(split, context)
groupKeysByNode(nodes, keys).flatMap {
x => {
val conn = x._1.endpoint.connect()
val hashKeys = filterKeysByType(conn, x._2, "hash")
val res = hashKeys.map(key => {
val m: Map[String, String] = conn.hgetAll(key).toMap
(key, m)
}).iterator
conn.close()
res
}
}.iterator
}
}

class RedisListRDD(prev: RDD[String], val rddType: String) extends RDD[String](prev) with Keys {

override def getPartitions: Array[Partition] = prev.partitions
@@ -300,6 +326,10 @@ class RedisKeysRDD(sc: SparkContext,
def getHash(): RDD[(String, String)] = {
new RedisKVRDD(this, "hash")
}

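/**
* filter the 'hash' type keys and get the whole field-value map of each of them
* @return RedisKMapRDD
*/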
def getHashX(): RDD[(String, Map[String, String])] = {
new RedisKMapRDD(this)
}
/**
* filter the 'zset' type keys and get all the elements(without scores) of them
* @return RedisZSetRDD[String]
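For context, a minimal usage sketch of the read side of the new API (the key names, host settings, and partition count are hypothetical; fromRedisKeys and getHashX are the methods touched by this diff, and the implicit conversion from SparkContext comes from the package import):

import com.redislabs.provider.redis._
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical setup: host and port are placeholders.
val conf = new SparkConf()
  .setAppName("hashx-demo")
  .set("redis.host", "127.0.0.1")
  .set("redis.port", "6379")
val sc = new SparkContext(conf)

// getHashX keeps each hash whole: one (key, Map(field -> value)) record
// per Redis hash, instead of flattening fields into (field, value) pairs.
val hashes = sc.fromRedisKeys(Array("user:1", "user:2"), 3).getHashX()
hashes.collect().foreach { case (key, fields) =>
  println(s"$key -> $fields")
}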
80 changes: 80 additions & 0 deletions src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
@@ -105,6 +105,13 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
}
}

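/**
* @param keys an array of hash keys
* @param partitionNum number of partitions
* @return RDD of (hash key, field-value map) pairs, one per 'hash' key
*/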
def fromRedisHashX(keys: Array[String],
partitionNum: Int = 3)
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))):
RDD[(String, Map[String, String])] = {
fromRedisKeys(keys.toSet.toArray, partitionNum)(redisConfig).getHashX
}

/**
* @param keysOrKeyPattern an array of keys or a key pattern
* @param partitionNum number of partitions
@@ -236,6 +243,25 @@
kvs.foreachPartition(partition => setHash(hashName, partition, ttl, redisConfig))
}

/* def toRedisHASHIncrBy(kvs: RDD[(String, Long)], hashName: String, ttl: Int = 0)
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
kvs.foreachPartition(partition => incrHash(hashName, partition, ttl, redisConfig))
}

def toRedisHASHIncrByFloat(kvs: RDD[(String, Double)], hashName: String, ttl: Int = 0)
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
kvs.foreachPartition(partition => incrFloatHash(hashName, partition, ttl, redisConfig))
}*/

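/**
* @param keyMap RDD of (hash name, field -> increment); each field is incremented with HINCRBY
* @param ttl time to live of the target hashes
*/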
def toRedisHASHIncrByLong(keyMap: RDD[(String, Map[String, Long])], ttl: Int = 0)
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
keyMap.foreachPartition(partition => incrHash(partition, ttl, redisConfig))
}

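/**
* @param keyMap RDD of (hash name, field -> increment); each field is incremented with HINCRBYFLOAT
* @param ttl time to live of the target hashes
*/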
def toRedisHASHIncrByFloat(keyMap: RDD[(String, Map[String, Double])], ttl: Int = 0)
(implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
keyMap.foreachPartition(partition => incrFloatHash(partition, ttl, redisConfig))
}
/**
* @param kvs Pair RDD of K/V
* @param zsetName target zset's name which hold all the kvs
@@ -322,6 +348,60 @@ object RedisContext extends Serializable {
conn.close
}

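/**
* apply HINCRBY for each (field, increment) pair of a single hash, pipelined on its node
*/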
def incrHash(hashName: String, arr: Iterator[(String, Long)], ttl: Int, redisConfig: RedisConfig): Unit = {
val conn = redisConfig.connectionForKey(hashName)
val pipeline = conn.pipelined()
arr.foreach( x => pipeline.hincrBy(hashName, x._1, x._2))
if (ttl > 0) pipeline.expire(hashName, ttl)
pipeline.sync()
conn.close()
}

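/**
* group the (hash name, increments) records by the node serving each key,
* then pipeline HINCRBY commands per node
*/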
def incrHash(arr: Iterator[(String, Map[String, Long])], ttl: Int, redisConfig: RedisConfig): Unit = {
arr.map(kv => (redisConfig.getHost(kv._1), kv)).toArray.groupBy(_._1).
mapValues(a => a.map(p => p._2)).foreach {
x => {
val conn = x._1.endpoint.connect()
val pipeline = conn.pipelined()
x._2.foreach(keyMap => {
keyMap._2.foreach(fv => {
pipeline.hincrBy(keyMap._1, fv._1, fv._2)
})
if (ttl > 0) pipeline.expire(keyMap._1, ttl)
})
pipeline.sync()
conn.close()
}
}
}

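/**
* apply HINCRBYFLOAT for each (field, increment) pair of a single hash, pipelined on its node
*/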
def incrFloatHash(hashName: String, arr: Iterator[(String, Double)], ttl: Int, redisConfig: RedisConfig): Unit = {
val conn = redisConfig.connectionForKey(hashName)
val pipeline = conn.pipelined()
arr.foreach( x => pipeline.hincrByFloat(hashName, x._1, x._2))
if (ttl > 0) pipeline.expire(hashName, ttl)
pipeline.sync()
conn.close()
}

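/**
* same node-grouping as incrHash, but with HINCRBYFLOAT for Double increments
*/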
def incrFloatHash(arr: Iterator[(String, Map[String, Double])], ttl: Int, redisConfig: RedisConfig): Unit = {
arr.map(kv => (redisConfig.getHost(kv._1), kv)).toArray.groupBy(_._1).
mapValues(a => a.map(p => p._2)).foreach {
x => {
val conn = x._1.endpoint.connect()
val pipeline = conn.pipelined()
x._2.foreach(keyMap => {
keyMap._2.foreach(fv => {
pipeline.hincrByFloat(keyMap._1, fv._1, fv._2)
})
if (ttl > 0) pipeline.expire(keyMap._1, ttl)
})
pipeline.sync()
conn.close()
}
}
}

/**
* @param zsetName
* @param arr k/vs which should be saved in the target host
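And a minimal sketch of the write side (the hash name and sample words are hypothetical, and the Spark setup from the previous sketch is assumed; toRedisHASHIncrByLong is the method added above and picks up the implicit RedisConfig from the SparkContext's configuration):

// Build one (hash name, field -> increment) record and apply it with HINCRBY.
val counts = sc.parallelize(Seq("spark", "redis", "spark"))
  .map((_, 1L))
  .reduceByKey(_ + _)    // per-word totals first,
  .map { case (word, n) => ("wordcount:hash", Map(word -> n)) }
  .reduceByKey(_ ++ _)   // so ++ never collides on a field

sc.toRedisHASHIncrByLong(counts)

Re-running the same call re-applies the increments, which is what the test suite below exercises by invoking each method twice.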
@@ -19,8 +19,25 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
getLines.toArray.mkString("\n")


-val wcnts = sc.parallelize(content.split("\\W+").filter(!_.isEmpty)).map((_, 1)).
-reduceByKey(_ + _).map(x => (x._1, x._2.toString))
+val wcnt = sc.parallelize(content.split("\\W+").filter(!_.isEmpty)).map((_, 1)).
+reduceByKey(_ + _)
+val wcntl = wcnt.map(x => (x._1, x._2.toLong * 2))
+val wcntd = wcnt.map(x => (x._1, x._2.toDouble + 0.31))
+val wcnts = wcnt.map(x => (x._1, x._2.toString))

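// wcntl2 / wcntd2 fold the per-word counts into a single
// (hash key -> Map(word -> value)) record for the IncrBy tests below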
val wcntl2 = wcntl.map(kv => {
val m = Map(kv._1 -> kv._2)
("all:words:cnt:hash:incr:long:2", m)
}).reduceByKey((v1, v2) => {
v1 ++ v2
})

val wcntd2 = wcntd.map(kv => {
val m = Map(kv._1 -> kv._2)
("all:words:cnt:hash:incr:float:2", m)
}).reduceByKey((v1, v2) => {
v1 ++ v2
})

val wds = sc.parallelize(content.split("\\W+").filter(!_.isEmpty))

@@ -39,6 +56,13 @@
sc.toRedisHASH(wcnts, "all:words:cnt:hash")(redisConfig)
sc.toRedisLIST(wds, "all:words:list" )(redisConfig)
sc.toRedisSET(wds, "all:words:set")(redisConfig)
// sc.toRedisHASHIncrBy(wcntl, "all:words:cnt:hash:incr:long")(redisConfig)
// sc.toRedisHASHIncrByFloat(wcntd, "all:words:cnt:hash:incr:float")(redisConfig)

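// each IncrBy method is invoked twice on purpose: the assertions below
// expect 4 * cnt for the long hash and 2 * cnt + 0.62 for the float hash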
sc.toRedisHASHIncrByLong(wcntl2)(redisConfig)
sc.toRedisHASHIncrByLong(wcntl2)(redisConfig)
sc.toRedisHASHIncrByFloat(wcntd2)(redisConfig)
sc.toRedisHASHIncrByFloat(wcntd2)(redisConfig)
}

test("RedisKVRDD - default(cluster)") {
@@ -136,6 +160,96 @@
hashContents should be (wcnts)
}

/* test("RedisHashIncrByLongRDD - default(cluster)") {
val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:long")
val hashContents = redisHashRDD.sortByKey().collect
val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2).toString)).toArray.sortBy(_._1)
hashContents should be (wcntl)
}*/

test("RedisHashIncrByLongRDD - key map - default(cluster)") {
val redisHashRDD = sc.fromRedisHash("all:words:cnt:hash:incr:long:2")
val hashContents = redisHashRDD.sortByKey().collect
val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
hashContents should be (wcntl)
}

test("RedisHashIncrByLongRDD - key map - cluster") {
implicit val c: RedisConfig = redisConfig
val redisHashRDD = sc.fromRedisHash("all:words:cnt:hash:incr:long:2")
val hashContents = redisHashRDD.sortByKey().collect
val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
hashContents should be (wcntl)
}

test("RedisHashIncrByLongRDD - key map X - default(cluster)") {
val redisHashRDD = sc.fromRedisHashX(Array("all:words:cnt:hash:incr:long:2",
"all:words:cnt:hash:incr:long:2"))
val hashContents = redisHashRDD.collect
val hashValue = hashContents(0)._2.toArray.sortBy(_._1)
val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
hashValue should be (wcntl)
}

test("RedisHashIncrByLongRDD - key map X - cluster") {
implicit val c: RedisConfig = redisConfig
val redisHashRDD = sc.fromRedisHashX(Array("all:words:cnt:hash:incr:long:2",
"all:words:cnt:hash:incr:long:2"))
val hashContents = redisHashRDD.collect
val hashValue = hashContents(0)._2.toArray.sortBy(_._1)
val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
hashValue should be (wcntl)
}

test("RedisHashIncrByRDD - X - default(cluster)") {
val redisHashRDD = sc.fromRedisHashX(Array("all:words:cnt:hash:incr:long:2",
"all:words:cnt:hash:incr:float:2"))
redisHashRDD.collect.foreach(x => {
if (x._1.equals("all:words:cnt:hash:incr:long:2")) {
val hashValue = x._2.toArray.sortBy(_._1)
val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
hashValue should be (wcntl)
} else if (x._1.equals("all:words:cnt:hash:incr:float:2")) {
val hashValue = x._2.toArray.sortBy(_._1)
val wcntd = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2 + 0.62).toString)).toArray.sortBy(_._1)
hashValue should be (wcntd)
}
})
}

/* test("RedisHashIncrByLongRDD - cluster") {
implicit val c: RedisConfig = redisConfig
val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:long")
val hashContents = redisHashRDD.sortByKey().collect
val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2).toString)).toArray.sortBy(_._1)
hashContents should be (wcntl)
}*/

/* test("RedisHashIncrByFloatRDD - default(cluster)") {
val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:float")
val hashContents = redisHashRDD.sortByKey().collect
val wcntd = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 0.31).toString)).toArray.sortBy(_._1)
hashContents should be (wcntd)
}

test("RedisHashIncrByFloatRDD - cluster") {
implicit val c: RedisConfig = redisConfig
val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:float")
val hashContents = redisHashRDD.sortByKey().collect
val wcntd = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 0.31).toString)).toArray.sortBy(_._1)
hashContents should be (wcntd)
}*/

test("RedisListRDD - default(cluster)") {
val redisListRDD = sc.fromRedisList( "all:words:list")
val listContents = redisListRDD.sortBy(x => x).collect