From b13888c8c3248fb7acf518401ec81b5d40aaa08c Mon Sep 17 00:00:00 2001
From: ZhouQiang
Date: Wed, 8 Mar 2017 19:52:57 +0800
Subject: [PATCH 1/6] add new functions: toRedisHASHIncrBy, toRedisHASHIncrByFloat

---
 .../provider/redis/redisFunctions.scala  | 28 ++++++++++++
 .../redis/rdd/RedisRDDClusterSuite.scala | 43 ++++++++++++++++++-
 2 files changed, 69 insertions(+), 2 deletions(-)

diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
index 0a3de12d..64560d7e 100644
--- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
+++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
@@ -236,6 +236,16 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
     kvs.foreachPartition(partition => setHash(hashName, partition, ttl, redisConfig))
   }
 
+  def toRedisHASHIncrBy(kvs: RDD[(String, Long)], hashName: String, ttl: Int = 0)
+    (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
+    kvs.foreachPartition(partition => incrHash(hashName, partition, ttl, redisConfig))
+  }
+
+  def toRedisHASHIncrByFloat(kvs: RDD[(String, Double)], hashName: String, ttl: Int = 0)
+    (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
+    kvs.foreachPartition(partition => incrFloatHash(hashName, partition, ttl, redisConfig))
+  }
+
   /**
    * @param kvs Pair RDD of K/V
    * @param zsetName target zset's name which hold all the kvs
@@ -322,6 +332,24 @@ object RedisContext extends Serializable {
     conn.close
   }
 
+  def incrHash(hashName: String, arr: Iterator[(String, Long)], ttl: Int, redisConfig: RedisConfig): Unit = {
+    val conn = redisConfig.connectionForKey(hashName)
+    val pipeline = conn.pipelined()
+    arr.foreach(x => pipeline.hincrBy(hashName, x._1, x._2))
+    if (ttl > 0) pipeline.expire(hashName, ttl)
+    pipeline.sync()
+    conn.close()
+  }
+
+  def incrFloatHash(hashName: String, arr: Iterator[(String, Double)], ttl: Int, redisConfig: RedisConfig): Unit = {
+    val conn = redisConfig.connectionForKey(hashName)
+    val pipeline = conn.pipelined()
+    arr.foreach(x => pipeline.hincrByFloat(hashName, x._1, x._2))
+    if (ttl > 0) pipeline.expire(hashName, ttl)
+    pipeline.sync()
+    conn.close()
+  }
+
   /**
    * @param zsetName
    * @param arr k/vs which should be saved in the target host
diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala
index d439a043..1d764e86 100644
--- a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala
+++ b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala
@@ -19,8 +19,11 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
     getLines.toArray.mkString("\n")
 
-  val wcnts = sc.parallelize(content.split("\\W+").filter(!_.isEmpty)).map((_, 1)).
-    reduceByKey(_ + _).map(x => (x._1, x._2.toString))
+  val wcnt = sc.parallelize(content.split("\\W+").filter(!_.isEmpty)).map((_, 1)).
+    reduceByKey(_ + _)
+  val wcntl = wcnt.map(x => (x._1, x._2.toLong * 2))
+  val wcntd = wcnt.map(x => (x._1, x._2.toDouble * 0.31))
+  val wcnts = wcnt.map(x => (x._1, x._2.toString))
 
   val wds = sc.parallelize(content.split("\\W+").filter(!_.isEmpty))
 
@@ -39,6 +42,8 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
     sc.toRedisHASH(wcnts, "all:words:cnt:hash")(redisConfig)
     sc.toRedisLIST(wds, "all:words:list" )(redisConfig)
     sc.toRedisSET(wds, "all:words:set")(redisConfig)
+    sc.toRedisHASHIncrBy(wcntl, "all:words:cnt:hash:incr:long")(redisConfig)
+    sc.toRedisHASHIncrByFloat(wcntd, "all:words:cnt:hash:incr:float")(redisConfig)
   }
 
   test("RedisKVRDD - default(cluster)") {
@@ -136,6 +141,40 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
     hashContents should be (wcnts)
   }
 
+  test("RedisHashIncrByLongRDD - default(cluster)") {
+    val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:long")
+    val hashContents = redisHashRDD.sortByKey().collect
+    val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+      map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2).toString)).toArray.sortBy(_._1)
+    hashContents should be (wcntl)
+  }
+
+  test("RedisHashIncrByLongRDD - cluster") {
+    implicit val c: RedisConfig = redisConfig
+    val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:long")
+    val hashContents = redisHashRDD.sortByKey().collect
+    val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+      map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2).toString)).toArray.sortBy(_._1)
+    hashContents should be (wcntl)
+  }
+
+  test("RedisHashIncrByFloatRDD - default(cluster)") {
+    val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:float")
+    val hashContents = redisHashRDD.sortByKey().collect
+    val wcntd = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+      map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 0.31).toString)).toArray.sortBy(_._1)
+    hashContents should be (wcntd)
+  }
+
+  test("RedisHashIncrByFloatRDD - cluster") {
+    implicit val c: RedisConfig = redisConfig
+    val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:float")
+    val hashContents = redisHashRDD.sortByKey().collect
+    val wcntd = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+      map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 0.31).toString)).toArray.sortBy(_._1)
+    hashContents should be (wcntd)
+  }
+
   test("RedisListRDD - default(cluster)") {
     val redisListRDD = sc.fromRedisList( "all:words:list")
     val listContents = redisListRDD.sortBy(x => x).collect
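
A quick usage sketch for the API added in PATCH 1/6. Everything below other than toRedisHASHIncrBy and toRedisHASHIncrByFloat is illustrative: the SparkConf keys follow the usual spark-redis pattern, and the sample keys, fields and deltas are invented.

    import com.redislabs.provider.redis._
    import org.apache.spark.{SparkConf, SparkContext}

    // assumption: one reachable cluster node at 127.0.0.1:7379
    val sc = new SparkContext(new SparkConf()
      .setMaster("local[*]")
      .setAppName("hincr-sketch")
      .set("redis.host", "127.0.0.1")
      .set("redis.port", "7379"))

    // (field, delta) pairs; every pair targets the single hash "pageviews"
    val longDeltas = sc.parallelize(Seq(("page:a", 3L), ("page:b", 1L)))
    sc.toRedisHASHIncrBy(longDeltas, "pageviews")         // one pipelined HINCRBY per field

    // (field, delta) pairs with fractional deltas
    val floatDeltas = sc.parallelize(Seq(("page:a", 0.5), ("page:b", 1.25)))
    sc.toRedisHASHIncrByFloat(floatDeltas, "pageweights") // one pipelined HINCRBYFLOAT per field

Because all pairs of one call go to a single hash key, incrHash and incrFloatHash open one pipelined connection per partition via connectionForKey(hashName); the optional ttl argument issues an EXPIRE on the whole hash after the increments.
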
From 6b60f0a3b4df9235cbcc7650d3e6da1b323074f1 Mon Sep 17 00:00:00 2001
From: ZhouQiang
Date: Mon, 13 Mar 2017 17:28:18 +0800
Subject: [PATCH 2/6] add hash support with multiple keys in an RDD

---
 .../provider/redis/redisFunctions.scala  | 47 ++++++++++++++++++-
 .../redis/rdd/RedisRDDClusterSuite.scala | 40 +++++++++++++---
 2 files changed, 79 insertions(+), 8 deletions(-)

diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
index 64560d7e..ffe71b71 100644
--- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
+++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
@@ -236,7 +236,7 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
     kvs.foreachPartition(partition => setHash(hashName, partition, ttl, redisConfig))
   }
 
-  def toRedisHASHIncrBy(kvs: RDD[(String, Long)], hashName: String, ttl: Int = 0)
+  /* def toRedisHASHIncrBy(kvs: RDD[(String, Long)], hashName: String, ttl: Int = 0)
     (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
     kvs.foreachPartition(partition => incrHash(hashName, partition, ttl, redisConfig))
   }
@@ -244,8 +244,17 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
   def toRedisHASHIncrByFloat(kvs: RDD[(String, Double)], hashName: String, ttl: Int = 0)
     (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
     kvs.foreachPartition(partition => incrFloatHash(hashName, partition, ttl, redisConfig))
+  }*/
+
+  def toRedisHASHIncrByLong(keyMap: RDD[(String, Map[String, Long])], ttl: Int = 0)
+    (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
+    keyMap.foreachPartition(partition => incrHash(partition, ttl, redisConfig))
   }
 
+  def toRedisHASHIncrByFloat(keyMap: RDD[(String, Map[String, Double])], ttl: Int = 0)
+    (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))): Unit = {
+    keyMap.foreachPartition(partition => incrFloatHash(partition, ttl, redisConfig))
+  }
   /**
    * @param kvs Pair RDD of K/V
    * @param zsetName target zset's name which hold all the kvs
@@ -341,15 +350,51 @@ object RedisContext extends Serializable {
     conn.close()
   }
 
+  def incrHash(arr: Iterator[(String, Map[String, Long])], ttl: Int, redisConfig: RedisConfig): Unit = {
+    arr.map(kv => (redisConfig.getHost(kv._1), kv)).toArray.groupBy(_._1).
+      mapValues(a => a.map(p => p._2)).foreach {
+      x => {
+        val conn = x._1.endpoint.connect()
+        val pipeline = conn.pipelined
+        x._2.foreach(x => {
+          x._2.foreach(map => {
+            pipeline.hincrBy(x._1, map._1, map._2)
+          })
+          if (ttl > 0) pipeline.expire(x._1, ttl)
+        })
+        pipeline.sync
+        conn.close
+      }
+    }
+  }
+
   def incrFloatHash(hashName: String, arr: Iterator[(String, Double)], ttl: Int, redisConfig: RedisConfig): Unit = {
     val conn = redisConfig.connectionForKey(hashName)
     val pipeline = conn.pipelined()
     arr.foreach(x => pipeline.hincrByFloat(hashName, x._1, x._2))
     if (ttl > 0) pipeline.expire(hashName, ttl)
     pipeline.sync()
     conn.close()
   }
 
+  def incrFloatHash(arr: Iterator[(String, Map[String, Double])], ttl: Int, redisConfig: RedisConfig): Unit = {
+    arr.map(kv => (redisConfig.getHost(kv._1), kv)).toArray.groupBy(_._1).
+      mapValues(a => a.map(p => p._2)).foreach {
+      x => {
+        val conn = x._1.endpoint.connect()
+        val pipeline = conn.pipelined
+        x._2.foreach(x => {
+          x._2.foreach(map => {
+            pipeline.hincrByFloat(x._1, map._1, map._2)
+          })
+          if (ttl > 0) pipeline.expire(x._1, ttl)
+        })
+        pipeline.sync
+        conn.close
+      }
+    }
+  }
+
   /**
    * @param zsetName
    * @param arr k/vs which should be saved in the target host
diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala
index 1d764e86..85e817d2 100644
--- a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala
+++ b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala
@@ -25,6 +25,13 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
   val wcntd = wcnt.map(x => (x._1, x._2.toDouble * 0.31))
   val wcnts = wcnt.map(x => (x._1, x._2.toString))
 
+  val wcntl2 = wcntl.map(kv => {
+    val m = Map{kv._1 -> kv._2}
+    ("all:words:cnt:hash:incr:long:2", m)
+  }).reduceByKey((v1, v2) => {
+    v1 ++ v2
+  })
+
   val wds = sc.parallelize(content.split("\\W+").filter(!_.isEmpty))
 
   // THERE IS NOT AUTH FOR CLUSTER
@@ -42,8 +49,10 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
     sc.toRedisHASH(wcnts, "all:words:cnt:hash")(redisConfig)
     sc.toRedisLIST(wds, "all:words:list" )(redisConfig)
     sc.toRedisSET(wds, "all:words:set")(redisConfig)
-    sc.toRedisHASHIncrBy(wcntl, "all:words:cnt:hash:incr:long")(redisConfig)
-    sc.toRedisHASHIncrByFloat(wcntd, "all:words:cnt:hash:incr:float")(redisConfig)
+    // sc.toRedisHASHIncrBy(wcntl, "all:words:cnt:hash:incr:long")(redisConfig)
+    // sc.toRedisHASHIncrByFloat(wcntd, "all:words:cnt:hash:incr:float")(redisConfig)
+
+    sc.toRedisHASHIncrByLong(wcntl2)(redisConfig)
   }
 
   test("RedisKVRDD - default(cluster)") {
@@ -150,24 +159,41 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
     hashContents should be (wcnts)
   }
 
-  test("RedisHashIncrByLongRDD - default(cluster)") {
+  /* test("RedisHashIncrByLongRDD - default(cluster)") {
     val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:long")
     val hashContents = redisHashRDD.sortByKey().collect
     val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
       map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2).toString)).toArray.sortBy(_._1)
     hashContents should be (wcntl)
-  }
+  }*/
+
+  test("RedisHashIncrByLongRDD - key map - default(cluster)") {
+    val redisHashRDD = sc.fromRedisHash("all:words:cnt:hash:incr:long:2")
+    val hashContents = redisHashRDD.sortByKey().collect
+    val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+      map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2).toString)).toArray.sortBy(_._1)
+    hashContents should be (wcntl)
+  }
 
-  test("RedisHashIncrByLongRDD - cluster") {
+  test("RedisHashIncrByLongRDD - key map - cluster") {
     implicit val c: RedisConfig = redisConfig
-    val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:long")
+    val redisHashRDD = sc.fromRedisHash("all:words:cnt:hash:incr:long:2")
     val hashContents = redisHashRDD.sortByKey().collect
     val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
       map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2).toString)).toArray.sortBy(_._1)
     hashContents should be (wcntl)
   }
 
+  /* test("RedisHashIncrByLongRDD - cluster") {
+    implicit val c: RedisConfig = redisConfig
+    val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:long")
+    val hashContents = redisHashRDD.sortByKey().collect
+    val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+      map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2).toString)).toArray.sortBy(_._1)
+    hashContents should be (wcntl)
+  }*/
+
-  test("RedisHashIncrByFloatRDD - default(cluster)") {
+  /* test("RedisHashIncrByFloatRDD - default(cluster)") {
     val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:float")
     val hashContents = redisHashRDD.sortByKey().collect
     val wcntd = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
@@ -173,7 +199,7 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
     val wcntd = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
       map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 0.31).toString)).toArray.sortBy(_._1)
     hashContents should be (wcntd)
-  }
+  }*/
 
   test("RedisListRDD - default(cluster)") {
     val redisListRDD = sc.fromRedisList( "all:words:list")
     val listContents = redisListRDD.sortBy(x => x).collect
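
With PATCH 2/6 the element type changes from (field, delta) against one fixed hash to (hash key, Map(field -> delta)), so a single RDD can increment many hashes. Internally each partition groups its entries by redisConfig.getHost(key) and opens one pipelined connection per node rather than one per key. A sketch with invented key and field names:

    // one element per target hash: (hash key, field -> increment)
    val perKeyDeltas = sc.parallelize(Seq(
      ("stats:2017-03-13", Map("hits" -> 10L, "misses" -> 2L)),
      ("stats:2017-03-14", Map("hits" -> 7L))))
    sc.toRedisHASHIncrByLong(perKeyDeltas)      // HINCRBY, routed per cluster node

    val perKeyLoads = sc.parallelize(Seq(
      ("load:2017-03-13", Map("avg" -> 0.31))))
    sc.toRedisHASHIncrByFloat(perKeyLoads)      // HINCRBYFLOAT, same routing

Note the rename: the Long variant is now toRedisHASHIncrByLong, while toRedisHASHIncrByFloat keeps its name but takes the new keyed shape; the old single-hash variants are left commented out rather than deleted.
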
From d79fc990fecb82a89879923e5d55a3af5d5cad35 Mon Sep 17 00:00:00 2001
From: ZhouQiang
Date: Mon, 13 Mar 2017 20:54:37 +0800
Subject: [PATCH 3/6] deploy

---
 .gitignore | 4 ++++
 pom.xml    | 2 +-
 2 files changed, 5 insertions(+), 1 deletion(-)

diff --git a/.gitignore b/.gitignore
index 52ebc5d2..d18a4343 100644
--- a/.gitignore
+++ b/.gitignore
@@ -32,3 +32,7 @@ src_managed/
 project/boot/
 project/plugins/project/
 build/*.jar
+
+.DS_Store
+
+scalastyle-output.xml
diff --git a/pom.xml b/pom.xml
index da2d0c05..0501195a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.redislabs</groupId>
   <artifactId>spark-redis</artifactId>
-  <version>0.3.2</version>
+  <version>0.3.3-SNAPSHOT</version>
   <name>Spark-Redis</name>
   <description>A Spark library for Redis</description>
   <url>http://github.com/RedisLabs/spark-redis</url>

From b493098ebccfd1442f6d67cd0f3e604e104fd5d8 Mon Sep 17 00:00:00 2001
From: ZhouQiang
Date: Tue, 14 Mar 2017 11:29:22 +0800
Subject: [PATCH 4/6] add hash support with multiple keys in an RDD

---
 .../provider/redis/rdd/RedisRDD.scala    | 30 ++++++++++
 .../provider/redis/redisFunctions.scala  |  7 +++
 .../redis/rdd/RedisRDDClusterSuite.scala | 55 ++++++++++++++++++-
 3 files changed, 89 insertions(+), 3 deletions(-)

diff --git a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala
index 6ba23f6e..de8bab94 100644
--- a/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala
+++ b/src/main/scala/com/redislabs/provider/redis/rdd/RedisRDD.scala
@@ -65,6 +65,32 @@ class RedisKVRDD(prev: RDD[String],
   }
 }
 
+class RedisKMapRDD(prev: RDD[String]) extends RDD[(String, Map[String, String])](prev) with Keys {
+
+  override def getPartitions: Array[Partition] = prev.partitions
+
+  override def compute(split: Partition,
+                       context: TaskContext): Iterator[(String, Map[String, String])] = {
+    val partition: RedisPartition = split.asInstanceOf[RedisPartition]
+    val sPos = partition.slots._1
+    val ePos = partition.slots._2
+    val nodes = partition.redisConfig.getNodesBySlots(sPos, ePos)
+    val keys = firstParent[String].iterator(split, context)
+    groupKeysByNode(nodes, keys).flatMap {
+      x => {
+        val conn = x._1.endpoint.connect()
+        val hashKeys = filterKeysByType(conn, x._2, "hash")
+        val res = hashKeys.map(x => {
+          val m: Map[String, String] = conn.hgetAll(x).toMap
+          (x, m)
+        }).iterator
+        conn.close
+        res
+      }
+    }.iterator
+  }
+}
+
 class RedisListRDD(prev: RDD[String], val rddType: String) extends RDD[String](prev) with Keys {
 
   override def getPartitions: Array[Partition] = prev.partitions
@@ -300,6 +326,10 @@ class RedisKeysRDD(sc: SparkContext,
   def getHash(): RDD[(String, String)] = {
     new RedisKVRDD(this, "hash")
   }
+
+  def getHashX(): RDD[(String, Map[String, String])] = {
+    new RedisKMapRDD(this)
+  }
   /**
    * filter the 'zset' type keys and get all the elements(without scores) of them
    * @return RedisZSetRDD[String]
diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
index ffe71b71..5739662c 100644
--- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
+++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
@@ -105,6 +105,13 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
     }
   }
 
+  def fromRedisHashX(keys: Array[String],
+                     partitionNum: Int = 3)
+    (implicit redisConfig: RedisConfig = new RedisConfig(new RedisEndpoint(sc.getConf))):
+    RDD[(String, Map[String, String])] = {
+    fromRedisKeys(keys.toSet.toArray, partitionNum)(redisConfig).getHashX
+  }
+
   /**
    * @param keysOrKeyPattern an array of keys or a key pattern
    * @param partitionNum number of partitions
diff --git a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala
index 85e817d2..ecaa1d0a 100644
--- a/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala
+++ b/src/test/scala/com/redislabs/provider/redis/rdd/RedisRDDClusterSuite.scala
@@ -22,7 +22,7 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
   val wcnt = sc.parallelize(content.split("\\W+").filter(!_.isEmpty)).map((_, 1)).
     reduceByKey(_ + _)
   val wcntl = wcnt.map(x => (x._1, x._2.toLong * 2))
-  val wcntd = wcnt.map(x => (x._1, x._2.toDouble * 0.31))
+  val wcntd = wcnt.map(x => (x._1, x._2.toDouble + 0.31))
   val wcnts = wcnt.map(x => (x._1, x._2.toString))
 
@@ -32,6 +32,13 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
     v1 ++ v2
   })
 
+  val wcntd2 = wcntd.map(kv => {
+    val m = Map{kv._1 -> kv._2}
+    ("all:words:cnt:hash:incr:float:2", m)
+  }).reduceByKey((v1, v2) => {
+    v1 ++ v2
+  })
+
   val wds = sc.parallelize(content.split("\\W+").filter(!_.isEmpty))
 
   // THERE IS NOT AUTH FOR CLUSTER
@@ -53,6 +60,9 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
     // sc.toRedisHASHIncrByFloat(wcntd, "all:words:cnt:hash:incr:float")(redisConfig)
 
     sc.toRedisHASHIncrByLong(wcntl2)(redisConfig)
+    sc.toRedisHASHIncrByLong(wcntl2)(redisConfig)
+    sc.toRedisHASHIncrByFloat(wcntd2)(redisConfig)
+    sc.toRedisHASHIncrByFloat(wcntd2)(redisConfig)
   }
 
   test("RedisKVRDD - default(cluster)") {
@@ -162,7 +172,7 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
   test("RedisHashIncrByLongRDD - key map - default(cluster)") {
     val redisHashRDD = sc.fromRedisHash("all:words:cnt:hash:incr:long:2")
     val hashContents = redisHashRDD.sortByKey().collect
     val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
-      map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2).toString)).toArray.sortBy(_._1)
+      map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
     hashContents should be (wcntl)
   }
 
@@ -171,10 +181,49 @@ class RedisRDDClusterSuite extends FunSuite with ENV with BeforeAndAfterAll with
   test("RedisHashIncrByLongRDD - key map - cluster") {
     implicit val c: RedisConfig = redisConfig
     val redisHashRDD = sc.fromRedisHash("all:words:cnt:hash:incr:long:2")
     val hashContents = redisHashRDD.sortByKey().collect
     val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
-      map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2).toString)).toArray.sortBy(_._1)
+      map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
     hashContents should be (wcntl)
   }
 
+  test("RedisHashIncrByLongRDD - key map X - default(cluster)") {
+    val redisHashRDD = sc.fromRedisHashX(Array("all:words:cnt:hash:incr:long:2",
+      "all:words:cnt:hash:incr:long:2"))
+    val hashContents = redisHashRDD.collect
+    val hashValue = hashContents(0)._2.toArray.sortBy(_._1)
+    val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+      map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
+    hashValue should be (wcntl)
+  }
+
+  test("RedisHashIncrByLongRDD - key map X - cluster") {
+    implicit val c: RedisConfig = redisConfig
+    val redisHashRDD = sc.fromRedisHashX(Array("all:words:cnt:hash:incr:long:2",
+      "all:words:cnt:hash:incr:long:2"))
+    val hashContents = redisHashRDD.collect
+    val hashValue = hashContents(0)._2.toArray.sortBy(_._1)
+    val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+      map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
+    hashValue should be (wcntl)
+  }
+
+  test("RedisHashIncrByRDD - X - default(cluster)") {
+    val redisHashRDD = sc.fromRedisHashX(Array("all:words:cnt:hash:incr:long:2",
+      "all:words:cnt:hash:incr:float:2"))
+    redisHashRDD.collect.foreach(x => {
+      if (x._1.equals("all:words:cnt:hash:incr:long:2")) {
+        val hashValue = x._2.toArray.sortBy(_._1)
+        val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+          map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 4).toString)).toArray.sortBy(_._1)
+        hashValue should be (wcntl)
+      } else if (x._1.equals("all:words:cnt:hash:incr:float:2")) {
+        val hashValue = x._2.toArray.sortBy(_._1)
+        val wcntd = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
+          map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2 + 0.62).toString)).toArray.sortBy(_._1)
+        hashValue should be (wcntd)
+      }
+    })
+  }
+
   /* test("RedisHashIncrByLongRDD - cluster") {
     implicit val c: RedisConfig = redisConfig
     val redisHashRDD = sc.fromRedisHash( "all:words:cnt:hash:incr:long")
     val hashContents = redisHashRDD.sortByKey().collect
     val wcntl = content.split("\\W+").filter(!_.isEmpty).map((_, 1)).groupBy(_._1).
       map(x => (x._1, (x._2.map(_._2).reduce(_ + _) * 2).toString)).toArray.sortBy(_._1)
     hashContents should be (wcntl)
   }*/
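
PATCH 4/6 adds the read side: getHashX and fromRedisHashX return one element per hash key, (key, Map(field -> value)), instead of flattening every field of every hash into a single RDD[(String, String)] the way fromRedisHash does. A sketch reusing the invented keys from the note above:

    val hashes = sc.fromRedisHashX(Array("stats:2017-03-13", "stats:2017-03-14"))
    hashes.collect.foreach { case (key, fields) =>
      // e.g. stats:2017-03-13 -> Map(hits -> 10, misses -> 2)
      println(s"$key -> $fields")
    }

Duplicate keys are collapsed by the keys.toSet.toArray call inside fromRedisHashX, which is why the tests above can pass the same key twice and still get a single element back. The expected values also double here (* 4, and * 2 + 0.62) because the suite now runs each increment write twice.
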
From afdfdd2b084209b54c3e4169987c3e5bb29395cb Mon Sep 17 00:00:00 2001
From: ZhouQiang
Date: Tue, 14 Mar 2017 12:32:34 +0800
Subject: [PATCH 5/6] update version
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 pom.xml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/pom.xml b/pom.xml
index 0501195a..6c8a8550 100644
--- a/pom.xml
+++ b/pom.xml
@@ -3,7 +3,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>com.redislabs</groupId>
   <artifactId>spark-redis</artifactId>
-  <version>0.3.3-SNAPSHOT</version>
+  <version>0.3.4-SNAPSHOT</version>
   <name>Spark-Redis</name>
   <description>A Spark library for Redis</description>
   <url>http://github.com/RedisLabs/spark-redis</url>

From 890ecacfa853eaa37cdd651d068bbed83554c28a Mon Sep 17 00:00:00 2001
From: ZhouQiang
Date: Tue, 14 Mar 2017 14:46:58 +0800
Subject: [PATCH 6/6] use Redis 3.0.0 for make test

---
 .travis.yml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/.travis.yml b/.travis.yml
index d748c887..ccfd143f 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -3,7 +3,7 @@ language: scala
 scala:
   - 2.10.4
 before_install:
-  - git clone https://github.com/antirez/redis.git redis_for_spark-redis_test || true
+  - git clone --branch 3.0.0 https://github.com/antirez/redis.git redis_for_spark-redis_test || true
 install:
   - make -C redis_for_spark-redis_test -j4
 script: make test