diff --git a/src/irmin-graphql/irmin_graphql.ml b/src/irmin-graphql/irmin_graphql.ml index 6e3e589aaec..7ec4efd106e 100644 --- a/src/irmin-graphql/irmin_graphql.ml +++ b/src/irmin-graphql/irmin_graphql.ml @@ -1,17 +1,24 @@ open Lwt.Infix module Schema = Graphql_lwt.Schema -module Graphql_server = Graphql_cohttp.Make(Schema)(Cohttp_lwt.Body) module type S = sig + module IO : Cohttp_lwt.S.IO type store type server + type response_action = + [ `Expert of Cohttp.Response.t + * (IO.ic + -> IO.oc + -> unit Lwt.t) + | `Response of Cohttp.Response.t * Cohttp_lwt.Body.t ] + val schema : store -> unit Schema.schema val execute_request : unit Schema.schema -> Cohttp_lwt.Request.t -> - Cohttp_lwt.Body.t -> (Cohttp.Response.t * Cohttp_lwt.Body.t) Lwt.t + Cohttp_lwt.Body.t -> response_action Lwt.t val server : store -> server end @@ -30,7 +37,16 @@ module type CONFIG = sig end module Make(Server: Cohttp_lwt.S.Server)(Config: CONFIG)(Store : Irmin.S) = struct + module IO = Server.IO module Sync = Irmin.Sync (Store) + module Graphql_server = Graphql_cohttp.Make(Schema)(IO)(Cohttp_lwt.Body) + + type response_action = + [ `Expert of Cohttp.Response.t + * (IO.ic + -> IO.oc + -> unit Lwt.t) + | `Response of Cohttp.Response.t * Cohttp_lwt.Body.t ] type tree_item = { key: Store.key; @@ -542,9 +558,60 @@ module Make(Server: Cohttp_lwt.S.Server)(Config: CONFIG)(Store : Irmin.S) = stru ; ] + let diff = Schema.(obj "Diff" + ~fields:(fun _ -> [ + field "commit" + ~typ:(non_null Lazy.(force commit)) + ~args:[] + ~resolve:(fun _ctx -> function + | `Added c + | `Removed c + | `Updated (_, c) -> c + ) + ]) + ) + + let map_diff diff ~added ~removed ~updated = + match diff with + | `Added x -> `Added (added x) + | `Removed x -> `Removed (removed x) + | `Updated (x, y) -> `Updated (updated x y) + + let subscriptions s = Schema.[ + subscription_field "watch" + ~typ:(non_null diff) + ~args:Arg.[ + arg "key" ~typ:Input.key + ] + ~resolve:(fun _ctx key -> + let stream, push = Lwt_stream.create () in + let destroy_stream watch () = + push None; + Lwt.ignore_result (Store.unwatch watch) + in + match key with + | None -> + Store.watch s (fun diff -> + push (Some diff); + Lwt.return () + ) >|= fun watch -> + Ok (stream, destroy_stream watch) + | Some key -> + Store.watch_key s key (function diff -> + push (Some (map_diff diff + ~added:(fun (c, _) -> c) + ~removed:(fun (c, _) -> c) + ~updated:(fun (before, _) (after, _) -> before, after))); + Lwt.return () + ) >|= fun watch -> + Ok (stream, destroy_stream watch) + ) + ] + let schema s = let mutations = mutations s @ remote s in - Schema.(schema ~mutations [ + let subscriptions = subscriptions s in + Schema.(schema ~mutations ~subscriptions [ io_field "commit" ~typ:(Lazy.force commit) ~args:Arg.[ @@ -583,5 +650,5 @@ module Make(Server: Cohttp_lwt.S.Server)(Config: CONFIG)(Store : Irmin.S) = stru let server store = let schema = schema store in let callback = Graphql_server.make_callback (fun _ctx -> ()) schema in - Server.make ~callback () + Server.make_response_action ~callback () end diff --git a/src/irmin-graphql/irmin_graphql.mli b/src/irmin-graphql/irmin_graphql.mli index becad0ff5d6..49042aaf613 100644 --- a/src/irmin-graphql/irmin_graphql.mli +++ b/src/irmin-graphql/irmin_graphql.mli @@ -1,14 +1,22 @@ module Schema : Graphql_intf.Schema with type 'a Io.t = 'a Lwt.t module type S = sig + module IO : Cohttp_lwt.S.IO type store type server + type response_action = + [ `Expert of Cohttp.Response.t + * (IO.ic + -> IO.oc + -> unit Lwt.t) + | `Response of Cohttp.Response.t * Cohttp_lwt.Body.t ] + val schema : store -> unit Schema.schema val execute_request : unit Schema.schema -> Cohttp_lwt.Request.t -> - Cohttp_lwt.Body.t -> (Cohttp.Response.t * Cohttp_lwt.Body.t) Lwt.t + Cohttp_lwt.Body.t -> response_action Lwt.t val server : store -> server end