From fdde7a983f5c69a39322298cce1b5a823a65dd8c Mon Sep 17 00:00:00 2001 From: Greg Haskins Date: Sat, 16 Nov 2019 16:29:47 -0500 Subject: [PATCH 1/2] [#14] Remove length calculation Clojurescript doesn't have a method to compute the size of protobuf messages, and we can get away without having one by using auto-expanding ByteArrayOutputStream types. We therefore remove support for all the size-XX serdes and (pb/length) functions. This is part of the breaking changes to support both Clojure and Clojurescript Partially addresses #14 Signed-off-by: Greg Haskins --- project.clj | 2 +- src/protojure/grpc/codec/lpm.clj | 34 ++++++++--------- src/protojure/protobuf.clj | 14 +++---- src/protojure/protobuf/serdes.clj | 61 +++++-------------------------- test/example/hello.clj | 16 ++------ test/example/types.clj | 42 +++------------------ test/protojure/protobuf_test.clj | 56 ++++++++++++++-------------- 7 files changed, 68 insertions(+), 157 deletions(-) diff --git a/project.clj b/project.clj index fc70552..987bf31 100644 --- a/project.clj +++ b/project.clj @@ -1,4 +1,4 @@ -(defproject protojure "1.0.2-SNAPSHOT" +(defproject protojure "1.1.0-SNAPSHOT" :description "Support library for protoc-gen-clojure, providing native Clojure support for Google Protocol Buffers and GRPC applications" :url "http://github.com/protojure/library" :license {:name "Apache License 2.0" diff --git a/src/protojure/grpc/codec/lpm.clj b/src/protojure/grpc/codec/lpm.clj index 3c6c534..3846155 100644 --- a/src/protojure/grpc/codec/lpm.clj +++ b/src/protojure/grpc/codec/lpm.clj @@ -151,25 +151,20 @@ The value for the **content-coding** option must be one of ;;-------------------------------------------------------------------------------------- ;; Encoder ;;-------------------------------------------------------------------------------------- -(defn- encode-header [os compressed? len] +(defn- encode-buffer [buf len compressed? os] (.write os (int (if compressed? 
1 0))) - (.write os (num->bytes len))) - -(defn- encode-uncompressed - ([msg os] - (encode-uncompressed msg (pb/length msg) os)) - ([msg len os] - (encode-header os false len) - (pb/->pb msg os))) - -(defn- encode-compressed-buffer [buf len os] - (encode-header os true len) + (.write os (num->bytes len)) (.write os buf)) -(defn- compress-msg [compressor msg] +(defn- encode-uncompressed [msg os] + (let [buf (pb/->pb msg) + len (count buf)] + (encode-buffer buf len false os))) + +(defn- compress-buffer [compressor buf] (let [os (ByteArrayOutputStream.) cos (compressor os)] - (pb/->pb msg cos) + (.write cos buf) (.close cos) (.toByteArray os))) @@ -177,12 +172,13 @@ The value for the **content-coding** option must be one of "This function will encode the message either with or without compression, depending on whichever results in the smaller message" [msg compressor os] - (let [buf (compress-msg compressor msg) - clen (count buf) - len (pb/length msg)] + (let [buf (pb/->pb msg) + len (count buf) + cbuf (compress-buffer compressor buf) + clen (count buf)] (if (< clen len) - (encode-compressed-buffer buf clen os) - (encode-uncompressed msg len os)))) + (encode-buffer cbuf clen true os) + (encode-buffer buf len false os)))) ;;-------------------------------------------------------------------------------------------- (defn encode diff --git a/src/protojure/protobuf.clj b/src/protojure/protobuf.clj index d3b294a..70e4828 100644 --- a/src/protojure/protobuf.clj +++ b/src/protojure/protobuf.clj @@ -4,20 +4,18 @@ (ns protojure.protobuf "Main API entry point for protobuf applications" - (:import (com.google.protobuf - CodedOutputStream))) + (:import (com.google.protobuf CodedOutputStream) + (java.io ByteArrayOutputStream))) (defprotocol Writer - (serialize [this os]) - (length [this])) + (serialize [this os])) (defn ->pb "Serialize a record implementing the [[Writer]] protocol into protobuf bytes." 
([msg] - (let [len (length msg) - data (byte-array len)] - (->pb msg data) - data)) + (let [os (ByteArrayOutputStream.)] + (->pb msg os) + (.toByteArray os))) ([msg output] (let [os (CodedOutputStream/newInstance output)] (serialize msg os) diff --git a/src/protojure/protobuf/serdes.clj b/src/protojure/protobuf/serdes.clj index b2f4c32..e596541 100644 --- a/src/protojure/protobuf/serdes.clj +++ b/src/protojure/protobuf/serdes.clj @@ -80,23 +80,10 @@ (when-not (and (get options# :optimize true) (~default? value#)) (. os# ~sym tag# value#)))))) -(defmacro defsizefn [type default?] - (let [name (symbol (str "size-" type)) - sym (symbol (str "compute" type "Size")) - doc (format "Compute length of serialized '%s' type" type)] - `(defn ~name ~doc - ([tag# value#] - (~name tag# {} value#)) - ([tag# options# value#] - (if-not (and (get options# :optimize true) (~default? value#)) - (. CodedOutputStream ~sym tag# value#) - 0))))) - (defmacro defserdes [type default?] `(do (defparsefn ~type) - (defwritefn ~type ~default?) - (defsizefn ~type ~default?))) + (defwritefn ~type ~default?))) (def default-scalar? #(or (nil? %) (zero? %))) (def default-string? empty?) @@ -137,16 +124,6 @@ (let [bytestring (ByteString/copyFrom value)] (.writeBytes os tag bytestring))))) -(defn size-Bytes - "Compute length of serialized 'Bytes' type" - ([tag value] - (size-Bytes tag {} value)) - ([tag {:keys [optimize] :or {optimize true} :as options} value] - (if-not (and optimize (empty? value)) - (let [bytestring (ByteString/copyFrom value)] - (CodedOutputStream/computeBytesSize tag bytestring)) - 0))) - (defn cis->undefined "Deserialize an unknown type, retaining its tag/type" [tag is] @@ -208,11 +185,13 @@ (defn write-embedded "Serialize an embedded type along with tag/length metadata" [tag item os] - (let [len (if (some? item) (pb/length item) 0)] - (when-not (zero? 
len) - (.writeTag os tag 2);; embedded messages are always type=2 (string) - (.writeUInt32NoTag os len) - (pb/serialize item os)))) + (when (some? item) + (let [bytes (pb/->pb item) + len (count bytes)] + (when-not (zero? len) + (.writeTag os tag 2);; embedded messages are always type=2 (string) + (.writeUInt32NoTag os len) + (.writeRawBytes os bytes))))) ;; FIXME: Add support for optimizing packable types (defn write-repeated @@ -224,26 +203,4 @@ (defn write-map "Serialize user format [key val] using given map item constructor" [constructor tag items os] - (write-repeated write-embedded tag (map (fn [[key value]] (constructor {:key key :value value})) items) os)) - -(defn size-embedded - "Compute length of serialized embedded type, including the metadata header" - [tag item] - (let [len (if (some? item) (pb/length item) 0)] - (if-not (zero? len) - (+ - (size-UInt32 tag {:optimize false} len) ;; This accounts for the tag+length preamble - len) ;; And this is the embedded item itself - 0))) - -(defn size-repeated - "Compute length of serialized repeated type" - [f tag items] - (if-not (empty? items) - (reduce + (map (partial f tag) items)) - 0)) - -(defn size-map - "Compute length of user format [key val] using given map item constructor" - [constructor tag item] - (size-repeated size-embedded tag (map (fn [[key value]] (constructor {:key key :value value})) item))) + (write-repeated write-embedded tag (map (fn [[key value]] (constructor {:key key :value value})) items) os)) \ No newline at end of file diff --git a/test/example/hello.clj b/test/example/hello.clj index 9772f36..42dc70b 100644 --- a/test/example/hello.clj +++ b/test/example/hello.clj @@ -40,10 +40,7 @@ pb/Writer (serialize [this os] - (write-String 1 {:optimize true} (:name this) os)) - - (length [this] - (reduce + [(size-String 1 {:optimize true} (:name this))]))) + (write-String 1 {:optimize true} (:name this) os))) (s/def :com.sttgts.omnia.hello.messages.HelloRequest/name string?) 
(s/def ::HelloRequest-spec (s/keys :opt-un [:com.sttgts.omnia.hello.messages.HelloRequest/name])) @@ -90,11 +87,7 @@ (serialize [this os] (write-String 1 {:optimize true} (:name this) os) - (write-Int32 2 {:optimize true} (:count this) os)) - - (length [this] - (reduce + [(size-String 1 {:optimize true} (:name this)) - (size-Int32 2 {:optimize true} (:count this))]))) + (write-Int32 2 {:optimize true} (:count this) os))) (s/def :com.sttgts.omnia.hello.messages.RepeatHelloRequest/name string?) (s/def :com.sttgts.omnia.hello.messages.RepeatHelloRequest/count int?) @@ -142,10 +135,7 @@ pb/Writer (serialize [this os] - (write-String 1 {:optimize true} (:message this) os)) - - (length [this] - (reduce + [(size-String 1 {:optimize true} (:message this))]))) + (write-String 1 {:optimize true} (:message this) os))) (s/def :com.sttgts.omnia.hello.messages.HelloReply/message string?) (s/def ::HelloReply-spec (s/keys :opt-un [:com.sttgts.omnia.hello.messages.HelloReply/message])) diff --git a/test/example/types.clj b/test/example/types.clj index 8c77d76..9ea9730 100644 --- a/test/example/types.clj +++ b/test/example/types.clj @@ -18,13 +18,7 @@ (serialize [this os] (write-String 1 (:currency_code this) os) (write-Int64 2 (:units this) os) - (write-Int32 3 (:nanos this) os)) - - (length [this] - (+ - (size-String 1 (:currency_code this)) - (size-Int64 2 (:units this)) - (size-Int32 3 (:nanos this))))) + (write-Int32 3 (:nanos this) os))) (def Money-defaults {:currency_code "" :units 0 :nanos 0}) @@ -69,10 +63,7 @@ pb/Writer (serialize [this os] - (write-repeated write-Int32 1 (:data this) os)) - - (length [this] - (size-repeated size-Int32 1 (:data this)))) + (write-repeated write-Int32 1 (:data this) os))) (def SimpleRepeated-defaults {:data []}) @@ -116,10 +107,7 @@ pb/Writer (serialize [this os] - (write-String 1 {:optimize true} (:s this) os)) - - (length [this] - (size-String 1 {:optimize true} (:s this)))) + (write-String 1 {:optimize true} (:s this) os))) (def 
SimpleString-defaults {:s ""}) @@ -164,12 +152,7 @@ (serialize [this os] (write-String 1 {:optimize true} (:key this) os) - (write-Int32 2 {:optimize true} (:value this) os)) - - (length [this] - (+ - (size-String 1 {:optimize true} (:key this)) - (size-Int32 2 {:optimize true} (:value this))))) + (write-Int32 2 {:optimize true} (:value this) os))) (def AllThingsMap-MSimpleEntry-defaults {:key "" :value 0}) @@ -215,12 +198,7 @@ (serialize [this os] (write-String 1 {:optimize true} (:key this) os) - (write-embedded 2 (:value this) os)) - - (length [this] - (+ - (size-String 1 {:optimize true} (:key this)) - (size-embedded 2 (:value this))))) + (write-embedded 2 (:value this) os))) (def AllThingsMap-MComplexEntry-defaults {:key ""}) @@ -270,15 +248,7 @@ (write-Int32 2 {:optimize true} (:i this) os) (write-map new-AllThingsMap-MSimpleEntry 3 (:mSimple this) os) (write-map new-AllThingsMap-MComplexEntry 4 (:mComplex this) os) - (write-embedded 5 (:sSimple this) os)) - - (length [this] - (+ - (size-String 1 {:optimize true} (:s this)) - (size-Int32 2 {:optimize true} (:i this)) - (size-map new-AllThingsMap-MSimpleEntry 3 (:mSimple this)) - (size-map new-AllThingsMap-MComplexEntry 4 (:mComplex this)) - (size-embedded 5 (:sSimple this))))) + (write-embedded 5 (:sSimple this) os))) (def AllThingsMap-defaults {:s "" :i 0 :mSimple [] :mComplex []}) diff --git a/test/protojure/protobuf_test.clj b/test/protojure/protobuf_test.clj index 6887f28..5f95e7d 100644 --- a/test/protojure/protobuf_test.clj +++ b/test/protojure/protobuf_test.clj @@ -14,7 +14,7 @@ [example.types :as example]) (:import (com.google.protobuf CodedOutputStream CodedInputStream) - (java.io ByteArrayInputStream) + (java.io ByteArrayOutputStream) (org.apache.commons.io.input CloseShieldInputStream) (org.apache.commons.io.output CloseShieldOutputStream)) (:refer-clojure :exclude [resolve])) @@ -25,11 +25,11 @@ (defn- fns [type] (mapv #(clojure.core/resolve (symbol "protojure.protobuf.serdes" (str % type))) - 
["size-" "write-" "cis->"])) + ["write-" "cis->"])) (defn- resolve-fns [type] - (let [[sizefn writefn parsefn] (fns type)] - {:sizefn sizefn :writefn writefn :parsefn parsefn})) + (let [[writefn parsefn] (fns type)] + {:writefn writefn :parsefn parsefn})) (defn- pbverify "End to end serdes testing for a specific message" @@ -40,23 +40,24 @@ (defn- with-buffer "Invokes 'f' with a fully formed buffered output-stream and returns the bytes" - [len f] - (let [buf (byte-array len) - os (CodedOutputStream/newInstance buf)] - (f os) - (.flush os) - buf)) + [f] + (let [os (ByteArrayOutputStream.) + cos (CodedOutputStream/newInstance os)] + (f cos) + (.flush cos) + (.toByteArray os))) -(defn- write [sizefn writefn tag value] - (let [len (sizefn tag value)] - (with-buffer len (partial writefn tag value)))) +(defn- size [f] + (count (with-buffer f))) + +(defn- write [writefn tag value] + (with-buffer (partial writefn tag value))) (defn- write-embedded [tag item] - (write serdes/size-embedded serdes/write-embedded tag item)) + (write serdes/write-embedded tag item)) -(defn- write-repeated [sizefn writefn tag items] - (let [len (serdes/size-repeated sizefn tag items)] - (with-buffer len (partial serdes/write-repeated writefn tag items)))) +(defn- write-repeated [writefn tag items] + (with-buffer (partial serdes/write-repeated writefn tag items))) (defn- parse [^bytes buf readfn] (let [is (CodedInputStream/newInstance buf)] @@ -131,8 +132,8 @@ (defn- validate-e2e "validate that we can do a complete end-to-end serialize->deserialize cycle" [{:keys [type input]}] - (let [{:keys [sizefn writefn parsefn]} (resolve-fns type) - output (-> (write sizefn writefn tag input) + (let [{:keys [writefn parsefn]} (resolve-fns type) + output (-> (write writefn tag input) (parse parsefn))] (is (data-equal? input output)))) @@ -146,22 +147,21 @@ A correct functioning optimizer will elide the write, resulting in no errors even despite our bogus stream." 
[{:keys [type input default]}] - (let [{:keys [writefn sizefn]} (resolve-fns type)] - (is (pos? (sizefn tag input))) - (is (zero? (sizefn tag default))) - (is (pos? (sizefn tag {:optimize false} default))) + (let [{:keys [writefn]} (resolve-fns type)] + (is (pos? (size (partial writefn tag input)))) + (is (zero? (size (partial writefn tag default)))) + (is (pos? (size (partial writefn tag {:optimize false} default)))) (writefn tag default nil))) (defn- validate-repeated [{:keys [type input packable? repeatfn]}] - (let [{:keys [sizefn writefn parsefn]} (resolve-fns type) + (let [{:keys [writefn parsefn]} (resolve-fns type) items (vec (repeatfn 10 input)) - output (-> (write-repeated sizefn writefn tag items) + output (-> (write-repeated writefn tag items) (parse-repeated parsefn packable? tag) (get tag))] - (is (data-equal? items output)) - (is (zero? (serdes/size-repeated sizefn tag []))))) + (is (data-equal? items output)))) ;; We add a silly codec named "mycustom" that does nothing. We use the CloseShieldXXX family ;; of proxy stream classes so that we pass the IO through, but bury the (.close) operation. This @@ -256,7 +256,7 @@ (deftest embedded-nil-test (testing "Check that embedded but unset messages are handled properly" - (is (zero? (serdes/size-embedded tag nil))) + (is (zero? (size (partial serdes/write-embedded tag nil)))) (serdes/write-embedded tag nil nil))) (deftest grpc-lpm-test From fe51874bbcca19f45a858856b7fbf87ca8d2a7d9 Mon Sep 17 00:00:00 2001 From: Greg Haskins Date: Fri, 8 Nov 2019 08:16:18 -0700 Subject: [PATCH 2/2] [#14] Split up code in preparation for clojurescript support We need to split out common code from clojure/jvm specific code so that we may pave the way for clojurescript support in a future patch. 
Partially addresses #14 Signed-off-by: Greg Haskins --- src/protojure/grpc/codec/lpm.clj | 6 +- src/protojure/protobuf.clj | 4 +- src/protojure/protobuf/protocol.cljc | 9 + src/protojure/protobuf/serdes.clj | 206 --------------------- src/protojure/protobuf/serdes/complex.cljc | 58 ++++++ src/protojure/protobuf/serdes/core.clj | 105 +++++++++++ src/protojure/protobuf/serdes/stream.clj | 15 ++ src/protojure/protobuf/serdes/utils.cljc | 61 ++++++ test/example/hello.clj | 18 +- test/example/types.clj | 20 +- test/protojure/protobuf_test.clj | 12 +- 11 files changed, 279 insertions(+), 235 deletions(-) create mode 100644 src/protojure/protobuf/protocol.cljc delete mode 100644 src/protojure/protobuf/serdes.clj create mode 100644 src/protojure/protobuf/serdes/complex.cljc create mode 100644 src/protojure/protobuf/serdes/core.clj create mode 100644 src/protojure/protobuf/serdes/stream.clj create mode 100644 src/protojure/protobuf/serdes/utils.cljc diff --git a/src/protojure/grpc/codec/lpm.clj b/src/protojure/grpc/codec/lpm.clj index 3846155..c928059 100644 --- a/src/protojure/grpc/codec/lpm.clj +++ b/src/protojure/grpc/codec/lpm.clj @@ -6,7 +6,7 @@ "Utility functions for GRPC [length-prefixed-message](https://github.com/grpc/grpc/blob/master/doc/PROTOCOL-HTTP2.md#requests) encoding." (:require [clojure.core.async :refer [! 
go go-loop] :as async] [promesa.core :as p] - [protojure.protobuf :as pb] + [protojure.protobuf :refer [->pb]] [protojure.grpc.codec.compression :as compression] [clojure.tools.logging :as log]) (:import (protojure.internal.grpc.io InputStream @@ -157,7 +157,7 @@ The value for the **content-coding** option must be one of (.write os buf)) (defn- encode-uncompressed [msg os] - (let [buf (pb/->pb msg) + (let [buf (->pb msg) len (count buf)] (encode-buffer buf len false os))) @@ -172,7 +172,7 @@ The value for the **content-coding** option must be one of "This function will encode the message either with or without compression, depending on whichever results in the smaller message" [msg compressor os] - (let [buf (pb/->pb msg) + (let [buf (->pb msg) len (count buf) cbuf (compress-buffer compressor buf) - clen (count buf)] + clen (count cbuf)] (if (< clen len) diff --git a/src/protojure/protobuf.clj b/src/protojure/protobuf.clj index 70e4828..1ac7e91 100644 --- a/src/protojure/protobuf.clj +++ b/src/protojure/protobuf.clj @@ -4,12 +4,10 @@ (ns protojure.protobuf "Main API entry point for protobuf applications" + (:require [protojure.protobuf.protocol :refer [serialize]]) (:import (com.google.protobuf CodedOutputStream) (java.io ByteArrayOutputStream))) -(defprotocol Writer - (serialize [this os])) - (defn ->pb "Serialize a record implementing the [[Writer]] protocol into protobuf bytes." ([msg] diff --git a/src/protojure/protobuf/protocol.cljc b/src/protojure/protobuf/protocol.cljc new file mode 100644 index 0000000..a029fb6 --- /dev/null +++ b/src/protojure/protobuf/protocol.cljc @@ -0,0 +1,9 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; Copyright © 2019 Manetu, Inc. 
All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.protobuf.protocol) + +(defprotocol Writer + (serialize [this os])) \ No newline at end of file diff --git a/src/protojure/protobuf/serdes.clj b/src/protojure/protobuf/serdes.clj deleted file mode 100644 index e596541..0000000 --- a/src/protojure/protobuf/serdes.clj +++ /dev/null @@ -1,206 +0,0 @@ -;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved -;; -;; SPDX-License-Identifier: Apache-2.0 - -(ns protojure.protobuf.serdes - "Serializer/deserializer support for fundamental protobuf types." - (:require [protojure.protobuf :as pb]) - (:import (com.google.protobuf CodedInputStream - CodedOutputStream - WireFormat - UnknownFieldSet - ExtensionRegistry - ByteString))) - -(defn tag-map - " - Returns a lazy sequence consisting of the result of applying f to the set of - protobuf objects delimited by protobuf tags. - - #### Parameters - - | Value | Type | Description | - |----------|--------------------|------------------------------------------------------------------------------------------------| - | **init** | _map_ | A map of initial values | - | **f** | _(fn [tag index])_ | An arity-2 function that accepts a tag and index and returns a [k v] (see _Return type_ below) | - | **is** | [CodedInputStream](https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/CodedInputStream) | An input stream containing serialized protobuf data | - - #### Return Type - - _f_ should evaluate to a 2-entry vector in the form [key value], where: - - - _key_ is either - - a keyword representing the field name when the index is known - - simply the index value when it is not - - _value_ is either - - a value that will be returned verbatim to be associated to the _key_ - - a function that will take a collection of previously deserialized values with the same tag and update it to incorporate the new value (to support _repeated_ types, etc) - - - #### Example 
- - ``` - (tag-map - (fn [tag index] - (case index - 1 [:currency_code (cis->String is)] - 2 [:units (cis->Int64 is)] - 3 [:nanos (cis->Int32 is)] - [index (cis->undefined tag is)])) - is)) - ``` - " - ([f is] - (tag-map {} f is)) - ([init f is] - (loop [acc init tag (.readTag is)] - (if (pos? tag) - (let [[k v] (f tag (bit-shift-right tag 3))] - (recur (if (fn? v) - (update acc k v) - (assoc acc k v)) - (.readTag is))) - acc)))) - -(defmacro defparsefn [type] - (let [name (symbol (str "cis->" type)) - sym (symbol (str "read" type)) - doc (format "Deserialize a '%s' type" type)] - `(defn ~name ~doc [is#] - (. is# ~sym)))) - -(defmacro defwritefn [type default?] - (let [name (symbol (str "write-" type)) - sym (symbol (str "write" type)) - doc (format "Serialize a '%s' type" type)] - `(defn ~name ~doc - ([tag# value# os#] - (~name tag# {} value# os#)) - ([tag# options# value# os#] - (when-not (and (get options# :optimize true) (~default? value#)) - (. os# ~sym tag# value#)))))) - -(defmacro defserdes [type default?] - `(do - (defparsefn ~type) - (defwritefn ~type ~default?))) - -(def default-scalar? #(or (nil? %) (zero? %))) -(def default-string? empty?) -(def default-bool? #(not (true? %))) - -(defmacro defscalar [type] - `(defserdes ~type default-scalar?)) - -(defscalar "Double") -(defscalar "Enum") -(defscalar "Fixed32") -(defscalar "Fixed64") -(defscalar "Float") -(defscalar "Int32") -(defscalar "Int64") -(defscalar "SFixed32") -(defscalar "SFixed64") -(defscalar "SInt32") -(defscalar "SInt64") -(defscalar "UInt32") -(defscalar "UInt64") - -(defserdes "String" default-string?) -(defserdes "Bool" default-bool?) 
- -;; manually implement the "Bytes" scalar so we can properly handle native byte-array import/export -(defn cis->Bytes - "Deserialize 'Bytes' type" - [is] - (.toByteArray (.readBytes is))) - -(defn write-Bytes - "Serialize 'Bytes' type" - ([tag value os] - (write-Bytes tag {} value os)) - ([tag {:keys [optimize] :or {optimize true} :as options} value os] - (when-not (and optimize (empty? value)) - (let [bytestring (ByteString/copyFrom value)] - (.writeBytes os tag bytestring))))) - -(defn cis->undefined - "Deserialize an unknown type, retaining its tag/type" - [tag is] - (let [num (WireFormat/getTagFieldNumber tag) - type (WireFormat/getTagWireType tag)] - (case type - 0 (.readInt64 is) - 1 (.readFixed64 is) - 2 (.readBytes is) - 3 (.readGroup is num (UnknownFieldSet/newBuilder) (ExtensionRegistry/getEmptyRegistry)) - 4 nil - 5 (.readFixed32 is)))) - -(defn cis->embedded - "Deserialize an embedded type, where **f** is an (fn) that can deserialize the embedded message" - [f is] - (let [len (.readRawVarint32 ^CodedInputStream is) - lim (.pushLimit is len)] - (let [result (f is)] - (.popLimit is lim) - result))) - -(defn cis->map - "Deserialize a wire format map-type to user format [key val]" - [f is] - (let [{:keys [key value]} (f is)] - (partial into {key value}))) - -(defn cis->repeated - "Deserialize an 'unpacked' repeated type (see [[cis->packablerepeated]])" - [f is] - (fn [coll] - (conj (or coll []) (f is)))) - -(defn- repeated-seq - "Returns a lazy sequence of repeated items on an input-stream" - [f is] - (lazy-seq (when (not (.isAtEnd is)) - (cons (f is) (repeated-seq f is))))) - -(defn cis->packedrepeated - "Deserialize a 'packed' repeated type (see [[cis->packablerepeated]])" - [f is] - (fn [coll] - (cis->embedded #(reduce conj (or coll []) (repeated-seq f %)) is))) - -(defn cis->packablerepeated - " - Deserialize a repeated type which may optionally support [packed format](https://developers.google.com/protocol-buffers/docs/encoding#packed). 
- The field type will indicate unpacked (0) vs packed (2). - " - [tag f is] - (let [type (WireFormat/getTagWireType tag)] - (case type - 0 (cis->repeated f is) - 2 (cis->packedrepeated f is) - (cis->undefined tag is)))) - -(defn write-embedded - "Serialize an embedded type along with tag/length metadata" - [tag item os] - (when (some? item) - (let [bytes (pb/->pb item) - len (count bytes)] - (when-not (zero? len) - (.writeTag os tag 2);; embedded messages are always type=2 (string) - (.writeUInt32NoTag os len) - (.writeRawBytes os bytes))))) - -;; FIXME: Add support for optimizing packable types -(defn write-repeated - "Serialize a repeated type" - [f tag items os] - (doseq [item items] - (f tag item os))) - -(defn write-map - "Serialize user format [key val] using given map item constructor" - [constructor tag items os] - (write-repeated write-embedded tag (map (fn [[key value]] (constructor {:key key :value value})) items) os)) \ No newline at end of file diff --git a/src/protojure/protobuf/serdes/complex.cljc b/src/protojure/protobuf/serdes/complex.cljc new file mode 100644 index 0000000..b000a02 --- /dev/null +++ b/src/protojure/protobuf/serdes/complex.cljc @@ -0,0 +1,58 @@ +;; Copyright © 2019 State Street Bank and Trust Company. All rights reserved +;; Copyright © 2019 Manetu, Inc. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.protobuf.serdes.complex + "Serializer/deserializer support for complex protobuf types." 
+ (:require [protojure.protobuf.serdes.core :refer [cis->embedded cis->undefined write-embedded]] + [protojure.protobuf.serdes.stream :as stream])) + +(defn cis->map + "Deserialize a wire format map-type to user format [key val]" + [f is] + (let [{:keys [key value]} (f is)] + (partial into {key value}))) + +(defn cis->repeated + "Deserialize an 'unpacked' repeated type (see [[cis->packablerepeated]])" + [f is] + (fn [coll] + (conj (or coll []) (f is)))) + +(defn- repeated-seq + "Returns a lazy sequence of repeated items on an input-stream" + [f is] + (lazy-seq (when (not (stream/end? is)) + (cons (f is) (repeated-seq f is))))) + +(defn cis->packedrepeated + "Deserialize a 'packed' repeated type (see [[cis->packablerepeated]])" + [f is] + (fn [coll] + (cis->embedded #(reduce conj (or coll []) (repeated-seq f %)) is))) + +(defn cis->packablerepeated + " + Deserialize a repeated type which may optionally support [packed format](https://developers.google.com/protocol-buffers/docs/encoding#packed). + The field type will indicate unpacked (0) vs packed (2). + " + [tag f is] + (let [type (bit-and 0x2 tag)] + (case type + 0 (cis->repeated f is) + 2 (cis->packedrepeated f is) + (cis->undefined tag is)))) + +;; FIXME: Add support for optimizing packable types +(defn write-repeated + "Serialize a repeated type" + [f tag items os] + (doseq [item items] + (f tag item os))) + +(defn write-map + "Serialize user format [key val] using given map item constructor" + [constructor tag items os] + (write-repeated write-embedded tag (map (fn [[key value]] (constructor {:key key :value value})) items) os)) + diff --git a/src/protojure/protobuf/serdes/core.clj b/src/protojure/protobuf/serdes/core.clj new file mode 100644 index 0000000..2fe07e1 --- /dev/null +++ b/src/protojure/protobuf/serdes/core.clj @@ -0,0 +1,105 @@ +;; Copyright © 2019 State Street Bank and Trust Company. 
All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.protobuf.serdes.core + "Serializer/deserializer support for fundamental protobuf types." + (:require [protojure.protobuf :refer [->pb]] + [protojure.protobuf.serdes.utils :as utils]) + (:import (com.google.protobuf CodedInputStream + CodedOutputStream + WireFormat + UnknownFieldSet + ExtensionRegistry + ByteString))) + +(defmacro defparsefn [type] + (let [name (symbol (str "cis->" type)) + sym (symbol (str "read" type)) + doc (format "Deserialize a '%s' type" type)] + `(defn ~name ~doc [is#] + (. is# ~sym)))) + +(defmacro defwritefn [type default?] + (let [name (symbol (str "write-" type)) + sym (symbol (str "write" type)) + doc (format "Serialize a '%s' type" type)] + `(defn ~name ~doc + ([tag# value# os#] + (~name tag# {} value# os#)) + ([tag# options# value# os#] + (when-not (and (get options# :optimize true) (~default? value#)) + (. os# ~sym tag# value#)))))) + +(defmacro defserdes [type default?] + `(do + (defparsefn ~type) + (defwritefn ~type ~default?))) + +(defmacro defscalar [type] + `(defserdes ~type utils/default-scalar?)) + +(defscalar "Double") +(defscalar "Enum") +(defscalar "Fixed32") +(defscalar "Fixed64") +(defscalar "Float") +(defscalar "Int32") +(defscalar "Int64") +(defscalar "SFixed32") +(defscalar "SFixed64") +(defscalar "SInt32") +(defscalar "SInt64") +(defscalar "UInt32") +(defscalar "UInt64") + +(defserdes "String" utils/default-bytes?) +(defserdes "Bool" utils/default-bool?) + +;; manually implement the "Bytes" scalar so we can properly handle native byte-array import/export +(defn cis->Bytes + "Deserialize 'Bytes' type" + [is] + (.toByteArray (.readBytes is))) + +(defn write-Bytes + "Serialize 'Bytes' type" + ([tag value os] + (write-Bytes tag {} value os)) + ([tag {:keys [optimize] :or {optimize true} :as options} value os] + (when-not (and optimize (empty? 
value)) + (let [bytestring (ByteString/copyFrom value)] + (.writeBytes os tag bytestring))))) + +(defn cis->undefined + "Deserialize an unknown type, retaining its tag/type" + [tag is] + (let [num (WireFormat/getTagFieldNumber tag) + type (WireFormat/getTagWireType tag)] + (case type + 0 (.readInt64 is) + 1 (.readFixed64 is) + 2 (.readBytes is) + 3 (.readGroup is num (UnknownFieldSet/newBuilder) (ExtensionRegistry/getEmptyRegistry)) + 4 nil + 5 (.readFixed32 is)))) + +(defn cis->embedded + "Deserialize an embedded type, where **f** is an (fn) that can deserialize the embedded message" + [f is] + (let [len (.readRawVarint32 ^CodedInputStream is) + lim (.pushLimit is len)] + (let [result (f is)] + (.popLimit is lim) + result))) + +(defn write-embedded + "Serialize an embedded type along with tag/length metadata" + [tag item os] + (when (some? item) + (let [bytes (->pb item) + len (count bytes)] + (when-not (zero? len) + (.writeTag os tag 2);; embedded messages are always type=2 (string) + (.writeUInt32NoTag os len) + (.writeRawBytes os bytes))))) \ No newline at end of file diff --git a/src/protojure/protobuf/serdes/stream.clj b/src/protojure/protobuf/serdes/stream.clj new file mode 100644 index 0000000..0574432 --- /dev/null +++ b/src/protojure/protobuf/serdes/stream.clj @@ -0,0 +1,15 @@ +;; Copyright © 2019 Manetu, Inc. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.protobuf.serdes.stream + (:import (com.google.protobuf CodedInputStream))) + +(defn end? [is] + (.isAtEnd ^CodedInputStream is)) + +(defn read-tag [is] + (.readTag ^CodedInputStream is)) + +(defn new-cis [src] + (CodedInputStream/newInstance src)) diff --git a/src/protojure/protobuf/serdes/utils.cljc b/src/protojure/protobuf/serdes/utils.cljc new file mode 100644 index 0000000..420ac55 --- /dev/null +++ b/src/protojure/protobuf/serdes/utils.cljc @@ -0,0 +1,61 @@ +;; Copyright © 2019 State Street Bank and Trust Company. 
All rights reserved +;; Copyright © 2019 Manetu, Inc. All rights reserved +;; +;; SPDX-License-Identifier: Apache-2.0 + +(ns protojure.protobuf.serdes.utils + (:require [protojure.protobuf.serdes.stream :as stream])) + +(defn tag-map + " + Returns **init** (default: an empty map) updated with the [key value] result of applying f to + each tag read from the input stream; reading stops at the first non-positive tag. + + #### Parameters + + | Value | Type | Description | + |----------|--------------------|------------------------------------------------------------------------------------------------| + | **init** | _map_ | A map of initial values | + | **f** | _(fn [tag index])_ | An arity-2 function that accepts a tag and index and returns a [k v] (see _Return type_ below) | + | **is** | [CodedInputStream](https://developers.google.com/protocol-buffers/docs/reference/java/com/google/protobuf/CodedInputStream) | An input stream containing serialized protobuf data | + + #### Return Type + + _f_ should evaluate to a 2-entry vector in the form [key value], where: + + - _key_ is either + - a keyword representing the field name when the index is known + - simply the index value when it is not + - _value_ is either + - a value that will be returned verbatim to be associated to the _key_ + - a function that will take a collection of previously deserialized values with the same tag and update it to incorporate the new value (to support _repeated_ types, etc) + + + #### Example + + ``` + (tag-map + (fn [tag index] + (case index + 1 [:currency_code (cis->String is)] + 2 [:units (cis->Int64 is)] + 3 [:nanos (cis->Int32 is)] + [index (cis->undefined tag is)])) + is) + ``` + " + ([f is] + (tag-map {} f is)) + ([init f is] + (loop [acc init tag (stream/read-tag is)] + (if (pos? tag) + (let [[k v] (f tag (bit-shift-right tag 3))] + (recur (if (fn? v) + (update acc k v) + (assoc acc k v)) + (stream/read-tag is))) + acc)))) + +(def default-scalar? #(or (nil? %) (zero? %))) +(def default-bytes? empty?) +(def default-bool? #(not (true? 
%))) diff --git a/test/example/hello.clj b/test/example/hello.clj index 42dc70b..97a8dc8 100644 --- a/test/example/hello.clj +++ b/test/example/hello.clj @@ -4,12 +4,12 @@ ;;; Message Implementation of package com.sttgts.omnia.hello ;;;---------------------------------------------------------------------------------- (ns example.hello - (:require [protojure.protobuf :as pb] - [protojure.protobuf.serdes :refer :all] - [clojure.set :as set] - [clojure.spec.alpha :as s]) - (:import (com.google.protobuf - CodedInputStream))) + (:require [protojure.protobuf.protocol :as pb] + [protojure.protobuf.serdes.core :refer :all] + [protojure.protobuf.serdes.complex :refer :all] + [protojure.protobuf.serdes.utils :refer [tag-map]] + [protojure.protobuf.serdes.stream :as stream] + [clojure.spec.alpha :as s])) ;;---------------------------------------------------------------------------------- ;;---------------------------------------------------------------------------------- @@ -76,7 +76,7 @@ "Protobuf to HelloRequest" [input] (-> input - CodedInputStream/newInstance + stream/new-cis cis->HelloRequest)) ;----------------------------------------------------------------------------- @@ -125,7 +125,7 @@ "Protobuf to RepeatHelloRequest" [input] (-> input - CodedInputStream/newInstance + stream/new-cis cis->RepeatHelloRequest)) ;----------------------------------------------------------------------------- @@ -171,6 +171,6 @@ "Protobuf to HelloReply" [input] (-> input - CodedInputStream/newInstance + stream/new-cis cis->HelloReply)) diff --git a/test/example/types.clj b/test/example/types.clj index 9ea9730..8ebfa3f 100644 --- a/test/example/types.clj +++ b/test/example/types.clj @@ -3,9 +3,11 @@ ;; SPDX-License-Identifier: Apache-2.0 (ns example.types - (:require [protojure.protobuf :as pb] - [protojure.protobuf.serdes :refer :all]) - (:import (com.google.protobuf CodedInputStream))) + (:require [protojure.protobuf.protocol :as pb] + [protojure.protobuf.serdes.core :refer :all] + 
[protojure.protobuf.serdes.complex :refer :all] + [protojure.protobuf.serdes.utils :refer [tag-map]] + [protojure.protobuf.serdes.stream :as stream])) ;----------------------------------------------------------------------------- ; Money @@ -53,7 +55,7 @@ "Protobuf to Money" [input] (-> input - CodedInputStream/newInstance + stream/new-cis cis->Money)) ;----------------------------------------------------------------------------- @@ -97,7 +99,7 @@ "Protobuf to SimpleRepeated" [input] (-> input - CodedInputStream/newInstance + stream/new-cis cis->SimpleRepeated)) ;----------------------------------------------------------------------------- @@ -141,7 +143,7 @@ "Protobuf to SimpleString" [input] (-> input - CodedInputStream/newInstance + stream/new-cis cis->SimpleString)) ;----------------------------------------------------------------------------- @@ -187,7 +189,7 @@ "Protobuf to AllThingsMap-MSimpleEntry" [input] (-> input - CodedInputStream/newInstance + stream/new-cis cis->AllThingsMap-MSimpleEntry)) ;----------------------------------------------------------------------------- @@ -234,7 +236,7 @@ "Protobuf to AllThingsMap-MComplexEntry" [input] (-> input - CodedInputStream/newInstance + stream/new-cis cis->AllThingsMap-MComplexEntry)) ;----------------------------------------------------------------------------- @@ -287,6 +289,6 @@ "Protobuf to AllThingsMap" [input] (-> input - CodedInputStream/newInstance + stream/new-cis cis->AllThingsMap)) diff --git a/test/protojure/protobuf_test.clj b/test/protojure/protobuf_test.clj index 5f95e7d..5a59c9a 100644 --- a/test/protojure/protobuf_test.clj +++ b/test/protojure/protobuf_test.clj @@ -5,7 +5,9 @@ (ns protojure.protobuf-test (:require [clojure.test :refer :all] [clojure.core.async :refer [!! ! 
go] :as async] - [protojure.protobuf.serdes :as serdes] + [protojure.protobuf.serdes.core :as serdes] + [protojure.protobuf.serdes.complex :as serdes.complex] + [protojure.protobuf.serdes.utils :refer [tag-map]] [protojure.protobuf :refer [->pb]] [protojure.grpc.codec.lpm :as lpm] [protojure.grpc.codec.compression :as compression] @@ -24,7 +26,7 @@ ;;----------------------------------------------------------------------------- (defn- fns [type] - (mapv #(clojure.core/resolve (symbol "protojure.protobuf.serdes" (str % type))) + (mapv #(clojure.core/resolve (symbol "protojure.protobuf.serdes.core" (str % type))) ["write-" "cis->"])) (defn- resolve-fns [type] @@ -57,7 +59,7 @@ (write serdes/write-embedded tag item)) (defn- write-repeated [writefn tag items] - (with-buffer (partial serdes/write-repeated writefn tag items))) + (with-buffer (partial serdes.complex/write-repeated writefn tag items))) (defn- parse [^bytes buf readfn] (let [is (CodedInputStream/newInstance buf)] @@ -66,8 +68,8 @@ (defn- parse-repeated [^bytes buf readfn packable? tag] (let [is (CodedInputStream/newInstance buf) - f (if packable? (partial serdes/cis->packablerepeated tag) serdes/cis->repeated)] - (serdes/tag-map + f (if packable? (partial serdes.complex/cis->packablerepeated tag) serdes.complex/cis->repeated)] + (tag-map (fn [tag index] [index (f readfn is)]) is)))