Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Try to bounds the stream between the receiver and the PACK decoder #608

Merged
merged 2 commits into from
Dec 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 7 additions & 5 deletions src/not-so-smart/fetch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -113,19 +113,21 @@ struct
>>= function
| `Close -> return []
| `Continue res ->
let pack ctx =
let recv_pack ctx =
let open Smart in
let side_band =
Smart.Context.is_cap_shared ctx `Side_band
|| Smart.Context.is_cap_shared ctx `Side_band_64k
in
recv ctx (recv_pack ~side_band ~push_stdout ~push_stderr pack)
recv ctx (recv_pack ~push_stdout ~push_stderr side_band)
in
if res < 0 then Log.warn (fun m -> m "No common commits");
let rec go () =
Log.debug (fun m -> m "Read PACK file.");
Smart_flow.run sched fail io flow (pack ctx) |> prj
>>= fun continue -> if continue then go () else return ()
Smart_flow.run sched fail io flow (recv_pack ctx) |> prj >>= function
| `End_of_transmission -> return ()
| `Payload (str, off, len) -> pack (str, off, len) >>= go
| `Stdout -> go ()
| `Stderr -> go ()
in
Log.debug (fun m -> m "Start to download PACK file.");
go () >>= fun () -> return (List.combine refs uids)
Expand Down
2 changes: 1 addition & 1 deletion src/not-so-smart/fetch.mli
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,6 @@ module Make
(Uid.t, Uid.t * int ref * int64, 'g) store ->
(Uid.t, _, Uid.t * int ref * int64, 'g, Scheduler.t) access ->
configuration ->
(string * int * int -> unit) ->
(string * int * int -> unit IO.t) ->
(Ref.t * Uid.t) list IO.t
end
21 changes: 8 additions & 13 deletions src/not-so-smart/protocol.ml
Original file line number Diff line number Diff line change
Expand Up @@ -574,37 +574,32 @@ module Decoder = struct
let junk_pack_without_sideband (decoder : decoder) =
decoder.pos <- decoder.max

let decode_pack ?(side_band = false) ~push_pack ~push_stdout ~push_stderr
decoder =
let decode_pack ?(side_band = false) ~push_stdout ~push_stderr decoder =
let with_side_band decoder =
let v = peek_pkt ~trim:false decoder in
match String.Sub.head v with
| Some '\001' ->
let off = String.Sub.start_pos v + 1 in
let len = String.Sub.stop_pos v - off in
let buf = String.Sub.base_string v in
push_pack (buf, off, len);
let str = String.Sub.to_string (String.Sub.tail v) in
junk_pkt decoder;
return true decoder
return (`Payload (str, 0, String.length str)) decoder
| Some '\002' ->
let tail = String.Sub.to_string (String.Sub.tail v) (* copy *) in
push_stdout tail;
junk_pkt decoder;
return true decoder
return `Stdout decoder
| Some '\003' ->
let tail = String.Sub.to_string (String.Sub.tail v) (* copy *) in
push_stderr tail;
junk_pkt decoder;
return true decoder
return `Stderr decoder
| Some _ -> fail decoder (`Invalid_side_band (String.Sub.to_string v))
| None -> return false decoder
| None -> return `End_of_transmission decoder
in
let end_of_pack decoder () = return false decoder in
let end_of_pack decoder () = return `End_of_transmission decoder in
let without_side_band decoder =
let buf, off, len = peek_pack_without_sideband decoder in
push_pack (buf, off, len);
junk_pack_without_sideband decoder;
return true decoder
return (`Payload (buf, off, len)) decoder
in
if side_band then prompt_pkt ~strict:true with_side_band decoder
else prompt_pack_without_sideband without_side_band end_of_pack decoder
Expand Down
8 changes: 6 additions & 2 deletions src/not-so-smart/protocol.mli
Original file line number Diff line number Diff line change
Expand Up @@ -164,11 +164,15 @@ module Decoder : sig

val decode_pack :
?side_band:bool ->
push_pack:(string * int * int -> unit) ->
push_stdout:(string -> unit) ->
push_stderr:(string -> unit) ->
decoder ->
(bool, [> error ]) state
( [ `Payload of string * int * int
| `End_of_transmission
| `Stdout
| `Stderr ],
[> error ] )
state

val decode_negotiation : decoder -> (string Negotiation.t, [> error ]) state
val decode_shallows : decoder -> (string Shallow.t list, [> error ]) state
Expand Down
16 changes: 9 additions & 7 deletions src/not-so-smart/smart.ml
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,14 @@ module Witness = struct
| Commands : (string, string) Commands.t option recv
| Recv_pack : {
side_band : bool;
push_pack : string * int * int -> unit;
push_stdout : string -> unit;
push_stderr : string -> unit;
}
-> bool recv
-> [ `Payload of string * int * int
| `End_of_transmission
| `Stdout
| `Stderr ]
recv
| Ack : string Negotiation.t recv
| Flush : unit recv
| Shallows : string Shallow.t list recv
Expand Down Expand Up @@ -93,8 +96,8 @@ module Value = struct
| Advertised_refs -> decode_advertised_refs decoder
| Result -> decode_result decoder
| Commands -> decode_commands decoder
| Recv_pack { side_band; push_pack; push_stdout; push_stderr } ->
decode_pack ~side_band ~push_pack ~push_stdout ~push_stderr decoder
| Recv_pack { side_band; push_stdout; push_stderr } ->
decode_pack ~side_band ~push_stdout ~push_stderr decoder
| Ack -> decode_negotiation decoder
| Status sideband -> decode_status ~sideband decoder
| Flush -> decode_flush decoder
Expand Down Expand Up @@ -138,9 +141,8 @@ let negotiation_done = Done
let negotiation_result = Result
let commands : _ send = Commands

let recv_pack ?(side_band = false) ?(push_stdout = ignore)
?(push_stderr = ignore) push_pack =
Recv_pack { side_band; push_pack; push_stdout; push_stderr }
let recv_pack ?(push_stdout = ignore) ?(push_stderr = ignore) side_band =
Recv_pack { side_band; push_stdout; push_stderr }

let recv_flush : _ recv = Flush
let status sideband = Status sideband
Expand Down
6 changes: 3 additions & 3 deletions src/not-so-smart/smart.mli
Original file line number Diff line number Diff line change
Expand Up @@ -216,11 +216,11 @@ val advertised_refs : (string, string) Advertised_refs.t recv
val negotiation_result : string Result.t recv

val recv_pack :
?side_band:bool ->
?push_stdout:(string -> unit) ->
?push_stderr:(string -> unit) ->
(string * int * int -> unit) ->
bool recv
bool ->
[ `Payload of string * int * int | `End_of_transmission | `Stdout | `Stderr ]
recv

val recv_flush : unit recv
val recv_commands : (string, string) Commands.t option recv
Expand Down
17 changes: 9 additions & 8 deletions src/not-so-smart/smart_git.ml
Original file line number Diff line number Diff line change
Expand Up @@ -371,10 +371,10 @@ struct
let v = String.sub payload off len in
pack (Some (v, 0, len)))
(fun refs ->
pack None;
pack None >>= fun () ->
Mimic.close flow >>= fun () -> Lwt.return_ok refs)
@@ fun exn ->
pack None;
pack None >>= fun () ->
Mimic.close flow >>= fun () -> Lwt.fail exn

let default_capabilities =
Expand Down Expand Up @@ -420,22 +420,23 @@ struct
in
Mimic.replace git_http_headers headers ctx

let fetch ?(push_stdout = ignore) ?(push_stderr = ignore) ?threads ~ctx
(access, light_load, heavy_load) store edn ?(version = `V1)
let fetch ?(push_stdout = ignore) ?(push_stderr = ignore) ?(bounds = 10)
?threads ~ctx (access, light_load, heavy_load) store edn ?(version = `V1)
?(capabilities = default_capabilities) ?deepen want t_pck t_idx ~src ~dst
~idx =
let open Rresult in
let open Lwt.Infix in
let hostname = edn.Endpoint.hostname in
let path = edn.Endpoint.path in
let stream, pusher = Lwt_stream.create () in
let stream, emitter = Lwt_stream.create_bounded bounds in
let pusher_with_logging = function
| Some (_, _, len) as v ->
| Some (str, off, len) ->
Log.debug (fun m -> m "Download %d byte(s) of the PACK file." len);
pusher v
emitter#push (str, off, len)
| None ->
Log.debug (fun m -> m "End of pack.");
pusher None
emitter#close;
Lwt.return_unit
in
let stream () = Lwt_stream.get stream in
let ctx = Mimic.add git_capabilities `Rd (Endpoint.to_ctx edn ctx) in
Expand Down
1 change: 1 addition & 0 deletions src/not-so-smart/smart_git_intf.ml
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ module type SMART_GIT = sig
val fetch :
?push_stdout:(string -> unit) ->
?push_stderr:(string -> unit) ->
?bounds:int ->
?threads:int ->
ctx:Mimic.ctx ->
(Uid.t, _, Uid.t * int ref * int64, 'g, Scheduler.t) Sigs.access
Expand Down