
RDD dynamic partition write function #25

Open
clojurians-org opened this issue Mar 22, 2017 · 2 comments

@clojurians-org

I wonder whether there is any plan for a dynamic partition write function, as it is a very common use case.

In Cascalog, I use the :templatefields and :sink-template keywords to control the dynamic partitioning:

      (?- (hfs-delimited "hdfs://192.168.1.3:9000/user/hive/warehouse/model.db/d_bolome_order"
                         :outfields ["?dw-dt" "?dw-ts" "?dw-src-id"
                                     "?product-dw-id" "?j-product-dw-src-id"
                                     "!!show-dw-id" "?j-show-dw-src-id"
                                     "!!preview-show-dw-id" "?j-preview-show-dw-src-id"
                                     "!!replay-show-dw-id" "?j-replay-show-dw-src-id"
                                     "?pay-dt" "?user-id" "?order-id"
                                     "?quantity" "?price" "?warehouse-id" "?coupon-id" "?event-id"
                                     "?copon-discount-amount" "?system-discount-amount" "?tax-amount" "?logistics-amount"]
                         :delimiter "\001"
                         :quote ""
                         :sinkmode :replace
                         :templatefields ["?dw-dt"]
                         :sink-template "p_dw_dt=%s"
                         :compression  :enable) $)

Currently, I convert the RDD to a DataFrame (and use partitionBy on the writer) to accomplish this:

  (as-> (keg/rdd (-> ss .read (.load "hdfs://192.168.1.3:9000/user/hive/warehouse/agg.db/d_bolome_user_order_trgx") (.repartition 8) .rdd)
               (map #(mapv (fn [idx] (.get % idx)) (-> % .length range)))
               (mapcat (fn [[user-id-trgx-str]]
                         (let [[user-id user-trgx] (clojure.edn/read-string user-id-trgx-str)
....
                             (mapv #(RowFactory/create (into-array [(:dm-ds-kind %)  (pr-str [user-id %])])) user-shift-tkvs)) )))
      $
    (.createDataFrame ss $
                      (DataTypes/createStructType (map #(DataTypes/createStructField % DataTypes/StringType false) ["p_ds" "user-id-tkv"])))
    (.write $)
    (.partitionBy $ (into-array ["p_ds"]))
    (.format $ "parquet")
    (.mode $ SaveMode/Overwrite)
    (.save $ "hdfs://192.168.1.3:9000/user/hive/warehouse/mlin.db/d_bolome_user_order"))

The RDD API has saveAsHadoopFile, and Hadoop provides the MultipleTextOutputFormat class,
but the latter has to be subclassed first, which is very inconvenient:
http://stackoverflow.com/questions/23995040/write-to-multiple-outputs-by-key-spark-one-spark-job
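
For reference, the approach from that Stack Overflow answer translated to Clojure would look roughly like the sketch below (untested; the namespace, the class name example.KeyBasedOutput, and pair-rdd are hypothetical). The output format must be a concrete named class, so it has to go through gen-class and AOT compilation, which is exactly the inconvenience mentioned above:

    (ns example.key-based-output
      ;; saveAsHadoopFile takes a Class, so the subclass must exist as real
      ;; bytecode: gen-class (and therefore AOT compilation) is required
      (:gen-class
        :name example.KeyBasedOutput
        :extends org.apache.hadoop.mapred.lib.MultipleTextOutputFormat)
      (:import [org.apache.hadoop.io NullWritable]))

    ;; route each record into a Hive-style partition directory based on its key
    (defn -generateFileNameForKeyValue [this key value name]
      (str "p_dw_dt=" key "/" name))

    ;; drop the key so only the value is written into the part files
    (defn -generateActualKey [this key value]
      (NullWritable/get))

With that class on the classpath, a pair RDD keyed by the partition value could then be written with:

    ;; pair-rdd is assumed to be a JavaPairRDD of [dw-dt line] pairs
    (.saveAsHadoopFile pair-rdd
                       "hdfs://192.168.1.3:9000/user/hive/warehouse/model.db/d_bolome_order"
                       Object
                       Object
                       example.KeyBasedOutput)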

@cgrand
Contributor

cgrand commented Mar 28, 2017

Isn't the :key option in keg/by-key what you are looking for? (keg/by-key some-data :key (juxt :a :b))

@clojurians-org
Author

clojurians-org commented Mar 28, 2017

It's about the HDFS directory layout when saving files to disk, so that the output can be mapped to a partitioned Hive table; it's not about the data itself.
For example:
/user/hive/warehouse/model/db/tableA/a=1/b=3/part-0000
/user/hive/warehouse/model/db/tableA/a=2/b=3/part-0000
....

create external table tableA (
....
) partitioned by (a string, b string)
stored as textfile;

@cgrand cgrand added the feature label Mar 31, 2017