diff --git a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
index 5e263549..7610e913 100644
--- a/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
+++ b/src/main/scala/com/redislabs/provider/redis/redisFunctions.scala
@@ -244,6 +244,20 @@ class RedisContext(@transient val sc: SparkContext) extends Serializable {
     }
   }
 
+  /**
+   * Write an RDD of geospatial points into a single Redis geo set via GEOADD.
+   *
+   * @param kvs RDD of (longitude, latitude, member) tuples
+   * @param geoName target geo set's name which holds all the kvs
+   * @param ttl time to live; applied to the whole geo set when > 0
+   */
+  def toRedisGEO(kvs: RDD[(Double, Double, String)], geoName: String, ttl: Int = 0)
+                (implicit
+                 redisConfig: RedisConfig = RedisConfig.fromSparkConf(sc.getConf),
+                 readWriteConfig: ReadWriteConfig = ReadWriteConfig.fromSparkConf(sc.getConf)): Unit = {
+    kvs.foreachPartition(partition => setGeo(geoName, partition, ttl, redisConfig, readWriteConfig))
+  }
+
   /**
     * @param kvs Pair RDD of K/V
     * @param ttl time to live
@@ -362,6 +376,26 @@ object RedisContext extends Serializable {
     conn.close()
   }
 
+  /**
+   * Save all the (longitude, latitude, member) tuples to geoName (geo type) on the target host.
+   *
+   * @param geoName target geo set's name which holds all the tuples
+   * @param arr (longitude, latitude, member) tuples which should be saved in the target host
+   * @param ttl time to live; applied to geoName when > 0
+   */
+  def setGeo(geoName: String, arr: Iterator[(Double, Double, String)], ttl: Int,
+             redisConfig: RedisConfig, readWriteConfig: ReadWriteConfig): Unit = {
+    implicit val rwConf: ReadWriteConfig = readWriteConfig
+    val conn = redisConfig.connectionForKey(geoName)
+    val pipeline = foreachWithPipelineNoLastSync(conn, arr) { case (pipeline, (lon, lat, member)) =>
+      pipeline.geoadd(geoName, lon, lat, member)
+    }
+    // EXPIRE rides the same pipeline so a single sync() flushes all commands
+    if (ttl > 0) pipeline.expire(geoName, ttl)
+    pipeline.sync()
+    conn.close()
+  }
+
   /**
    * @param zsetName
    * @param arr k/vs which should be saved in the target host