diff --git a/.merlin b/.merlin index 9e9ca9ac8..4d6a183b0 100644 --- a/.merlin +++ b/.merlin @@ -1,4 +1,6 @@ -PKG lwt ipaddr lwt mirage-types cstruct io-page uint mirage-flow oUnit alcotest mirage-vnetif pcap-format +PKG lwt ipaddr lwt mirage-types cstruct io-page uint mirage-flow oUnit alcotest +PKG mirage-vnetif pcap-format mirage-console.unix + B _build/** S lib/ S tcp/ diff --git a/_oasis b/_oasis index 09848aaf9..923c35390 100644 --- a/_oasis +++ b/_oasis @@ -71,7 +71,8 @@ Library tcp Findlibparent: tcpip Findlibname: tcp Modules: Options,Wire,State,Tcptimer,Sequence,Ack, - Window,Segment,User_buffer,Pcb,Flow + Window,Segment,User_buffer,Pcb,Flow, + Stats, Log BuildDepends: io-page, mirage-types, ipaddr, @@ -311,4 +312,4 @@ Executable test Test test Run$: flag(tests) - Command: $test + Command: $test -q diff --git a/_tags b/_tags index d7393fb44..79fadad8a 100644 --- a/_tags +++ b/_tags @@ -1,5 +1,5 @@ # OASIS_START -# DO NOT EDIT (digest: 60995254e5001cb612a968dd205139c1) +# DO NOT EDIT (digest: 20191569a0d28c34718dadfefb87302c) # Ignore VCS directories, you can use the same kind of rule outside # OASIS_START/STOP if you want to exclude directories that contains # useless stuff for the build process @@ -51,6 +51,8 @@ true: annot, bin_annot "tcp/user_buffer.cmx": for-pack(Tcp) "tcp/pcb.cmx": for-pack(Tcp) "tcp/flow.cmx": for-pack(Tcp) +"tcp/stats.cmx": for-pack(Tcp) +"tcp/log.cmx": for-pack(Tcp) : pkg_bytes : pkg_cstruct : pkg_io-page diff --git a/channel/channel.ml b/channel/channel.ml index 1982d7978..f1205e9d6 100644 --- a/channel/channel.ml +++ b/channel/channel.ml @@ -16,7 +16,7 @@ (** Buffered reading and writing over the Flow API *) -open Lwt +open Lwt.Infix module Make(Flow:V1_LWT.FLOW) = struct @@ -53,20 +53,20 @@ module Make(Flow:V1_LWT.FLOW) = struct buffers, this will be violated causing Channel users to see Cstruct exceptions *) t.ibuf <- Some buf; - return_unit + Lwt.return_unit | `Error e -> - fail (Read_error e) + Lwt.fail (Read_error e) | `Eof -> (* close the flow before throwing exception; otherwise it will never be GC'd *) Flow.close t.flow >>= fun () -> - fail End_of_file + Lwt.fail End_of_file let rec get_ibuf t = match t.ibuf with | None -> ibuf_refill t >>= fun () -> get_ibuf t | Some buf when Cstruct.len buf = 0 -> ibuf_refill t >>= fun () -> get_ibuf t - | Some buf -> return buf + | Some buf -> Lwt.return buf (* Read one character from the input channel *) let read_char t = @@ -75,7 +75,7 @@ module Make(Flow:V1_LWT.FLOW) = struct let c = Cstruct.get_char buf 0 in t.ibuf <- Some (Cstruct.shift buf 1); (* advance read buffer, possibly to EOF *) - return c + Lwt.return c (* Read up to len characters from the input channel and at most a full view. If not specified, read all *) @@ -88,10 +88,10 @@ module Make(Flow:V1_LWT.FLOW) = struct let hd,tl = Cstruct.split buf len in t.ibuf <- Some tl; (* leave some in the buffer; next time, we won't do a blocking read *) - return hd + Lwt.return hd end else begin t.ibuf <- None; - return buf + Lwt.return buf end (* Read up to len characters from the input channel as a @@ -99,10 +99,12 @@ module Make(Flow:V1_LWT.FLOW) = struct let read_stream ?len t = Lwt_stream.from (fun () -> Lwt.catch - (fun () -> read_some ?len t >>= fun v -> return (Some v)) - (function End_of_file -> return_none | e -> fail e) + (fun () -> read_some ?len t >>= fun v -> Lwt.return (Some v)) + (function End_of_file -> Lwt.return_none | e -> Lwt.fail e) ) + let zero = Cstruct.create 0 + (* Read until a character is found *) let read_until t ch = Lwt.catch @@ -116,12 +118,12 @@ module Make(Flow:V1_LWT.FLOW) = struct match scan 0 with | None -> (* not found, return what we have until EOF *) t.ibuf <- None; (* basically guaranteeing that next read is EOF *) - return (false, buf) + Lwt.return (false, buf) | Some off -> (* found, so split the buffer *) let hd = Cstruct.sub buf 0 off in t.ibuf <- Some (Cstruct.shift buf (off+1)); - return (true, hd)) - (function End_of_file -> return (false, Cstruct.create 0) | e -> fail e) + Lwt.return (true, hd)) + (function End_of_file -> Lwt.return (false, zero) | e -> Lwt.fail e) (* This reads a line of input, which is terminated either by a CRLF sequence, or the end of the channel (which counts as a line). @@ -130,7 +132,7 @@ module Make(Flow:V1_LWT.FLOW) = struct let rec get acc = read_until t '\n' >>= function |(false, v) -> - if Cstruct.len v = 0 then return (v :: acc) else get (v :: acc) + if Cstruct.len v = 0 then Lwt.return (v :: acc) else get (v :: acc) |(true, v) -> begin (* chop the CR if present *) let vlen = Cstruct.len v in @@ -138,7 +140,7 @@ module Make(Flow:V1_LWT.FLOW) = struct if vlen > 0 && (Cstruct.get_char v (vlen-1) = '\r') then Cstruct.sub v 0 (vlen-1) else v in - return (v :: acc) + Lwt.return (v :: acc) end in get [] >|= List.rev @@ -208,9 +210,9 @@ module Make(Flow:V1_LWT.FLOW) = struct let l = List.rev t.obufq in t.obufq <- []; Flow.writev t.flow l >>= function - | `Ok () -> Lwt.return_unit - | `Error e -> fail (Write_error e) - | `Eof -> fail End_of_file + | `Ok () -> Lwt.return_unit + | `Error e -> Lwt.fail (Write_error e) + | `Eof -> Lwt.fail End_of_file let close t = Lwt.finalize (fun () -> flush t) (fun () -> Flow.close t.flow) diff --git a/dhcp/dhcp_clientv4.ml b/dhcp/dhcp_clientv4.ml index c41834fec..701810f1d 100644 --- a/dhcp/dhcp_clientv4.ml +++ b/dhcp/dhcp_clientv4.ml @@ -15,7 +15,7 @@ * *) -open Lwt +open Lwt.Infix open Printf module Make (Console : V1_LWT.CONSOLE) @@ -182,9 +182,9 @@ module Make (Console : V1_LWT.CONSOLE) end |_ -> Console.log_s t.c "DHCP: ack not for us" end - | Shutting_down -> return_unit - | Lease_held _ -> Console.log_s t.c "DHCP input: lease already held" - | Disabled -> Console.log_s t.c "DHCP input: disabled" + | Shutting_down -> Lwt.return_unit + | Lease_held _ -> Console.log_s t.c "DHCP input: lease already held" + | Disabled -> Console.log_s t.c "DHCP input: disabled" (* Start a DHCP discovery off on an interface *) let start_discovery t = @@ -201,7 +201,7 @@ module Make (Console : V1_LWT.CONSOLE) >>= fun () -> t.state <- Request_sent xid; output_broadcast t ~xid ~yiaddr ~siaddr ~options >>= fun () -> - return_unit + Lwt.return_unit (* DHCP state thred *) let rec dhcp_thread t = @@ -234,7 +234,7 @@ module Make (Console : V1_LWT.CONSOLE) (String.concat ", " (List.map Ipaddr.V4.to_string info.gateways))) >>= fun () -> offer_push (Some info); - return_unit + Lwt.return_unit in let t = { c; mac; udp; state; new_offer } in (* TODO cancellation *) diff --git a/lib/arpv4.ml b/lib/arpv4.ml index c4def96c0..41179a83e 100644 --- a/lib/arpv4.ml +++ b/lib/arpv4.ml @@ -15,7 +15,7 @@ * *) -open Lwt +open Lwt.Infix open Printf module Make (Ethif : V1_LWT.ETHIF) (Clock : V1.CLOCK) (Time : V1_LWT.TIME) = struct @@ -120,7 +120,7 @@ module Make (Ethif : V1_LWT.ETHIF) (Clock : V1.CLOCK) (Time : V1_LWT.TIME) = str let spa = Ipaddr.V4.of_int32 (get_arp_tpa frame) in (* the requested address *) let tpa = Ipaddr.V4.of_int32 (get_arp_spa frame) in (* the requesting host IPv4 *) output t { op=`Reply; sha; tha; spa; tpa } - end else return_unit + end else Lwt.return_unit |2 -> (* Reply *) let spa = Ipaddr.V4.of_int32 (get_arp_spa frame) in let sha = Macaddr.of_bytes_exn (copy_arp_sha frame) in @@ -128,10 +128,10 @@ module Make (Ethif : V1_LWT.ETHIF) (Clock : V1.CLOCK) (Time : V1_LWT.TIME) = str (Ipaddr.V4.to_string spa) (Macaddr.to_string sha); (* If we have pending entry, notify the waiters that answer is ready *) notify t spa sha; - return_unit + Lwt.return_unit |n -> printf "ARP: Unknown message %d ignored\n%!" n; - return_unit + Lwt.return_unit and output t arp = (* Obtain a buffer to write into *) @@ -193,12 +193,12 @@ module Make (Ethif : V1_LWT.ETHIF) (Clock : V1.CLOCK) (Time : V1_LWT.TIME) = str let add_ip t ip = if not (List.mem ip t.bound_ips) then set_ips t (ip :: t.bound_ips) - else return_unit + else Lwt.return_unit let remove_ip t ip = if List.mem ip t.bound_ips then set_ips t (List.filter ((<>) ip) t.bound_ips) - else return_unit + else Lwt.return_unit (* Query the cache for an ARP entry, which may result in the sender sleeping waiting for a response *) diff --git a/lib/ethif.ml b/lib/ethif.ml index 0aba469cf..8bfd43bbd 100644 --- a/lib/ethif.ml +++ b/lib/ethif.ml @@ -15,7 +15,7 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. * *) -open Lwt +open Lwt.Infix module Make(Netif : V1_LWT.NETWORK) = struct @@ -42,25 +42,23 @@ module Make(Netif : V1_LWT.NETWORK) = struct MProf.Trace.label "ethif.input"; let frame_mac = Macaddr.of_bytes (Wire_structs.copy_ethernet_dst frame) in match frame_mac with - | None -> return_unit - | Some frame_mac -> begin - if (((Macaddr.compare frame_mac (mac t)) == 0) || (not (Macaddr.is_unicast frame_mac))) then - match Wire_structs.get_ethernet_ethertype frame with - | 0x0806 -> - arpv4 frame (* ARP *) - | 0x0800 -> (* IPv4 *) - let payload = Cstruct.shift frame Wire_structs.sizeof_ethernet in - ipv4 payload - | 0x86dd -> - let payload = Cstruct.shift frame Wire_structs.sizeof_ethernet in - ipv6 payload - | _etype -> - let _payload = Cstruct.shift frame Wire_structs.sizeof_ethernet in - (* TODO default etype payload *) - return_unit - else - return_unit - end + | None -> Lwt.return_unit + | Some frame_mac -> + if Macaddr.compare frame_mac (mac t) = 0 + || not (Macaddr.is_unicast frame_mac) + then match Wire_structs.get_ethernet_ethertype frame with + | 0x0806 -> arpv4 frame (* ARP *) + | 0x0800 -> (* IPv4 *) + let payload = Cstruct.shift frame Wire_structs.sizeof_ethernet in + ipv4 payload + | 0x86dd -> + let payload = Cstruct.shift frame Wire_structs.sizeof_ethernet in + ipv6 payload + | _etype -> + let _payload = Cstruct.shift frame Wire_structs.sizeof_ethernet in + (* TODO default etype payload *) + Lwt.return_unit + else Lwt.return_unit let write t frame = MProf.Trace.label "ethif.write"; @@ -72,7 +70,7 @@ module Make(Netif : V1_LWT.NETWORK) = struct let connect netif = MProf.Trace.label "ethif.connect"; - return (`Ok { netif }) + Lwt.return (`Ok { netif }) - let disconnect _ = return_unit + let disconnect _ = Lwt.return_unit end diff --git a/lib/ipv4.ml b/lib/ipv4.ml index 399481ef8..dab39326f 100644 --- a/lib/ipv4.ml +++ b/lib/ipv4.ml @@ -14,7 +14,7 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Lwt +open Lwt.Infix open Printf module Make(Ethif : V1_LWT.ETHIF) (Clock : V1.CLOCK) (Time : V1_LWT.TIME) = struct @@ -71,14 +71,14 @@ module Make(Ethif : V1_LWT.ETHIF) (Clock : V1.CLOCK) (Time : V1_LWT.TIME) = stru let destination_mac t = function |ip when ip = Ipaddr.V4.broadcast || ip = Ipaddr.V4.any -> (* Broadcast *) - return Macaddr.broadcast + Lwt.return Macaddr.broadcast |ip when is_local t ip -> (* Local *) Arpv4.query t.arp ip >>= begin function | `Ok mac -> Lwt.return mac | `Timeout -> Lwt.fail (No_route_to_destination_address ip) end |ip when Ipaddr.V4.is_multicast ip -> - return (mac_of_multicast ip) + Lwt.return (mac_of_multicast ip) |ip -> begin (* Gateway *) match t.gateways with |hd::_ -> @@ -86,11 +86,11 @@ module Make(Ethif : V1_LWT.ETHIF) (Clock : V1.CLOCK) (Time : V1_LWT.TIME) = stru | `Ok mac -> Lwt.return mac | `Timeout -> printf "IP.output: arp timeout to gw %s\n%!" (Ipaddr.V4.to_string ip); - fail (No_route_to_destination_address ip) + Lwt.fail (No_route_to_destination_address ip) end |[] -> printf "IP.output: no route to %s\n%!" (Ipaddr.V4.to_string ip); - fail (No_route_to_destination_address ip) + Lwt.fail (No_route_to_destination_address ip) end end @@ -155,13 +155,14 @@ module Make(Ethif : V1_LWT.ETHIF) (Clock : V1.CLOCK) (Time : V1_LWT.TIME) = stru | 15 -> "Precedence cutoff in effect" | code -> Printf.sprintf "Unknown code: %d" code in printf "ICMP Destination Unreachable: %s\n%!" descr; - return () + Lwt.return_unit let icmp_input t src _hdr buf = MProf.Trace.label "icmp_input"; match Wire_structs.Ipv4_wire.get_icmpv4_ty buf with |0 -> (* echo reply *) - return (printf "ICMP: discarding echo reply\n%!") + printf "ICMP: discarding echo reply\n%!"; + Lwt.return_unit |3 -> icmp_dst_unreachable buf |8 -> (* echo request *) (* convert the echo request into an echo reply *) @@ -177,7 +178,7 @@ module Make(Ethif : V1_LWT.ETHIF) (Clock : V1.CLOCK) (Time : V1_LWT.TIME) = stru write t frame buf |ty -> printf "ICMP unknown ty %d\n" ty; - return_unit + Lwt.return_unit let input t ~tcp ~udp ~default buf = (* buf pointers to start of IPv4 header here *) @@ -195,7 +196,7 @@ module Make(Ethif : V1_LWT.ETHIF) (Clock : V1.CLOCK) (Time : V1_LWT.TIME) = stru | Some `TCP -> tcp ~src ~dst data | Some `UDP -> udp ~src ~dst data | None -> default ~proto ~src ~dst data - end else return_unit + end else Lwt.return_unit let connect ethif = let ip = Ipaddr.V4.any in @@ -203,9 +204,9 @@ module Make(Ethif : V1_LWT.ETHIF) (Clock : V1.CLOCK) (Time : V1_LWT.TIME) = stru let gateways = [] in let arp = Arpv4.create ethif in let t = { ethif; arp; ip; netmask; gateways } in - return (`Ok t) + Lwt.return (`Ok t) - let disconnect _ = return_unit + let disconnect _ = Lwt.return_unit let set_ip t ip = t.ip <- ip; @@ -216,13 +217,13 @@ module Make(Ethif : V1_LWT.ETHIF) (Clock : V1.CLOCK) (Time : V1_LWT.TIME) = stru let set_ip_netmask t netmask = t.netmask <- netmask; - return_unit + Lwt.return_unit let get_ip_netmasks t = [t.netmask] let set_ip_gateways t gateways = t.gateways <- gateways; - return_unit + Lwt.return_unit let get_ip_gateways { gateways; _ } = gateways diff --git a/lib/ipv6.ml b/lib/ipv6.ml index ba4f1180d..7b34b89c4 100644 --- a/lib/ipv6.ml +++ b/lib/ipv6.ml @@ -887,8 +887,7 @@ let add_routers ~now state ips = let get_routers state = RouterList.to_list state.router_list -let (>>=) = Lwt.(>>=) -let (>|=) = Lwt.(>|=) +open Lwt.Infix module Make (E : V1_LWT.ETHIF) (T : V1_LWT.TIME) (C : V1.CLOCK) = struct type ethif = E.t diff --git a/lib/tcpip_stack_direct.ml b/lib/tcpip_stack_direct.ml index dd9d5e7d6..06ed9ee6f 100644 --- a/lib/tcpip_stack_direct.ml +++ b/lib/tcpip_stack_direct.ml @@ -14,7 +14,7 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Lwt +open Lwt.Infix type direct_ipv4_input = src:Ipaddr.V4.t -> dst:Ipaddr.V4.t -> Cstruct.t -> unit Lwt.t module type UDPV4_DIRECT = V1_LWT.UDPV4 @@ -83,8 +83,8 @@ struct Ipv4.set_ip t.ipv4 info.Dhcp.ip_addr >>= fun () -> (match info.Dhcp.netmask with - |Some nm -> Ipv4.set_ip_netmask t.ipv4 nm - |None -> return_unit) + | Some nm -> Ipv4.set_ip_netmask t.ipv4 nm + | None -> Lwt.return_unit) >>= fun () -> Ipv4.set_ip_gateways t.ipv4 info.Dhcp.gateways >>= fun () -> @@ -135,9 +135,9 @@ struct ~listeners:(tcpv4_listeners t)) ~udp:(Udpv4.input t.udpv4 ~listeners:(udpv4_listeners t)) - ~default:(fun ~proto:_ ~src:_ ~dst:_ _ -> return_unit) + ~default:(fun ~proto:_ ~src:_ ~dst:_ _ -> Lwt.return_unit) t.ipv4) - ~ipv6:(fun _ -> return_unit) + ~ipv6:(fun _ -> Lwt.return_unit) t.ethif) let connect id ethif ipv4 udpv4 tcpv4 = @@ -159,9 +159,8 @@ struct to spawn a background thread, but we need to consider how to inform the application stack that the IP address has changed (perhaps via a control Lwt_stream that the application can ignore if it doesn't care). *) - Console.log_s t.c "Manager: configuration done" - >>= fun () -> - return (`Ok t) + Console.log_s t.c "Manager: configuration done" >>= fun () -> + Lwt.return (`Ok t) let disconnect t = (* TODO: kill the listening thread *) diff --git a/lib/udp.ml b/lib/udp.ml index 290cef68f..0675bd5d8 100644 --- a/lib/udp.ml +++ b/lib/udp.ml @@ -14,7 +14,7 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Lwt +open Lwt.Infix module Make(Ip: V1_LWT.IP) = struct @@ -44,15 +44,15 @@ module Make(Ip: V1_LWT.IP) = struct (Wire_structs.get_udp_length buf - Wire_structs.sizeof_udp) in match listeners ~dst_port with - | None -> return_unit + | None -> Lwt.return_unit | Some fn -> let src_port = Wire_structs.get_udp_source_port buf in fn ~src ~dst ~src_port data let writev ?source_port ~dest_ip ~dest_port t bufs = begin match source_port with - | None -> fail (Failure "TODO; random source port") - | Some p -> return p + | None -> Lwt.fail (Failure "TODO; random source port") + | Some p -> Lwt.return p end >>= fun source_port -> let frame, header_len = Ip.allocate_frame t.ip ~dst:dest_ip ~proto:`UDP in let frame = Cstruct.set_len frame (header_len + Wire_structs.sizeof_udp) in @@ -68,8 +68,7 @@ module Make(Ip: V1_LWT.IP) = struct let write ?source_port ~dest_ip ~dest_port t buf = writev ?source_port ~dest_ip ~dest_port t [buf] - let connect ip = - return (`Ok { ip }) + let connect ip = Lwt.return (`Ok { ip }) - let disconnect _ = return_unit + let disconnect _ = Lwt.return_unit end diff --git a/lib_test/common.ml b/lib_test/common.ml index 5551064f6..d5c59037a 100644 --- a/lib_test/common.ml +++ b/lib_test/common.ml @@ -1,16 +1,21 @@ -open Lwt - -let cmp a b = String.compare a b = 0 +let (>>=) = Lwt.(>>=) let fail fmt = Printf.ksprintf OUnit.assert_failure fmt -let expect msg expected actual = - (if not (cmp expected actual) then - fail "Expected '%s', got '%s': %s" expected actual msg) ; - Lwt.return_unit - let or_error name fn t = fn t >>= function | `Error e -> fail "or_error starting %s" name - | `Ok t -> return t + | `Ok t -> Lwt.return t + +let assert_string msg a b = + let cmp a b = String.compare a b = 0 in + OUnit.assert_equal ~msg ~printer:(fun x -> x) ~cmp a b + +let assert_cstruct msg a b = + OUnit.assert_equal ~msg ~printer:Cstruct.to_string ~cmp:Cstruct.equal a b + +let assert_bool msg a b = + OUnit.assert_equal ~msg ~printer:string_of_bool a b +let assert_int msg a b = + OUnit.assert_equal ~msg ~printer:string_of_int a b diff --git a/lib_test/test.ml b/lib_test/test.ml index 243e7a2f7..d92abacbe 100644 --- a/lib_test/test.ml +++ b/lib_test/test.ml @@ -17,7 +17,7 @@ let suite = [ "channel", Test_channel.suite ; "connect", Test_connect.suite ; - "iperf", Test_iperf.suite ; + "iperf" , Test_iperf.suite ; ] let run test () = @@ -25,7 +25,7 @@ let run test () = let () = let suite = List.map (fun (n, s) -> - n, List.map (fun (d, f) -> d, `Quick, run f) s + n, List.map (fun (d, s, f) -> d, s, run f) s ) suite in - Alcotest.run "irmin" suite + Alcotest.run "tcpip" suite diff --git a/lib_test/test_channel.ml b/lib_test/test_channel.ml index 27eeb27d3..013ae872c 100644 --- a/lib_test/test_channel.ml +++ b/lib_test/test_channel.ml @@ -1,30 +1,28 @@ -open Lwt +open Common -(* this is a very small set of tests for the channel interface, intended to - ensure that EOF conditions on the underlying flow are handled properly *) +let (>>=) = Lwt.(>>=) + +(* this is a very small set of tests for the channel interface, + intended to ensure that EOF conditions on the underlying flow are + handled properly *) module Channel = Channel.Make(Fflow) -let cmp a b = - match (String.compare a b) with | 0 -> true | _ -> false +let err_read ch = + fail "character %c was returned from Channel.read_char on an empty flow" ch -let fail fmt = Printf.ksprintf OUnit.assert_failure fmt +let err_no_exception () = fail "no exception" +let err_wrong_exception e = fail "wrong exception: %s" (Printexc.to_string e) let test_read_char_eof () = let f = Fflow.make () in let c = Channel.create f in - let try_char_read () = - Channel.read_char c >>= fun ch -> - fail "character %c was returned from Channel.read_char on an empty flow" ch - in + let try_char_read () = Channel.read_char c >>= err_read in Lwt.try_bind (try_char_read) - (fun () -> fail "no exception") (* "success" case (no exceptions) *) + err_no_exception (* "success" case (no exceptions) *) (function | End_of_file -> Lwt.return_unit - | e -> fail "wrong exception: %s" (Printexc.to_string e)) - -let check a b = - OUnit.assert_equal ~printer:(fun a -> a) ~cmp a (Cstruct.to_string b) + | e -> err_wrong_exception e) let test_read_until_eof () = let input = @@ -34,15 +32,17 @@ let test_read_until_eof () = let c = Channel.create f in Channel.read_until c 'v' >>= function | true, buf -> - check "I am the " buf; + assert_cstruct "wrong flow prefix" + (Cstruct.of_string "I am the ") buf; Channel.read_until c '\xff' >>= fun (found, buf) -> - OUnit.assert_equal ~msg:"claimed we found a char that couldn't have been - there in read_until" false found; - check "ery model of a modern major general" buf; + assert_bool "found a char that couldn't have been there in read_until" + false found; + assert_cstruct "wrong flow suffix" + (Cstruct.of_string "ery model of a modern major general") buf; Channel.read_until c '\n' >>= fun (found, buf) -> - OUnit.assert_equal ~msg:"claimed we found a char after EOF in read_until" + assert_bool "found a char after EOF in read_until" false found; - OUnit.assert_equal ~printer:string_of_int 0 (Cstruct.len buf); + assert_int "wrong flow size" 0 (Cstruct.len buf); Lwt.return_unit | false, _ -> OUnit.assert_failure "thought we couldn't find a 'v' in input test" @@ -52,11 +52,11 @@ let test_read_line () = let f = Fflow.make ~input:(Fflow.input_string input) () in let c = Channel.create f in Channel.read_line c >>= fun buf -> - check input (Cstruct.of_string (Cstruct.copyv buf)); + assert_string "read line" input (Cstruct.copyv buf); Lwt.return_unit let suite = [ - "read_char + EOF" , test_read_char_eof; - "read_until + EOF", test_read_until_eof; - "read_line" , test_read_line; + "read_char + EOF" , `Quick, test_read_char_eof; + "read_until + EOF", `Quick, test_read_until_eof; + "read_line" , `Quick, test_read_line; ] diff --git a/lib_test/test_connect.ml b/lib_test/test_connect.ml index 381bd0ddf..3cd7a9cad 100644 --- a/lib_test/test_connect.ml +++ b/lib_test/test_connect.ml @@ -14,29 +14,47 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Lwt open Common open Vnetif_common +let (>>=) = Lwt.(>>=) + module Test_connect (B : Vnetif_backends.Backend) = struct module C = Console module V = VNETIF_STACK (B) - let netmask = Ipaddr.V4.of_string_exn "255.255.255.0" - let gw = Ipaddr.V4.of_string_exn "10.0.0.1" + let netmask = Ipaddr.V4.of_string_exn "255.255.255.0" + let gw = Ipaddr.V4.of_string_exn "10.0.0.1" let client_ip = Ipaddr.V4.of_string_exn "10.0.0.101" let server_ip = Ipaddr.V4.of_string_exn "10.0.0.100" let test_string = "Hello world from Mirage 123456789...." let backend = V.create_backend () + let log_s c fmt = Printf.ksprintf (C.log_s c) (fmt ^^ "%!") + + let err_read_eof () = fail "accept got EOF while reading" + let err_write_eof () = fail "client tried to write, got EOF" + + let err_read e = + let err = V.Stackv4.TCPV4.error_message e in + fail "Error while reading: %s" err + + let err_write e = + let err = V.Stackv4.TCPV4.error_message e in + fail "client tried to write, got %s" err + let accept c flow expected = let ip, port = V.Stackv4.TCPV4.get_dest flow in - C.log_s c (Printf.sprintf "Accepted connection from %s:%d%!" (Ipaddr.V4.to_string ip) port) >>= fun () -> - V.Stackv4.TCPV4.read flow >>= (function - | `Ok b -> OS.Time.sleep 0.1 >>= fun () -> expect "accept" expected (Cstruct.to_string b) (* sleep first to capture data in pcap *) - | `Eof | `Error _ -> fail "Error while reading%!") + log_s c "Accepted connection from %s:%d" (Ipaddr.V4.to_string ip) port >>= fun () -> - C.log_s c "Connection closed%!" + V.Stackv4.TCPV4.read flow >>= function + | `Eof -> err_read_eof () + | `Error e -> err_read e + | `Ok b -> + OS.Time.sleep 0.1 >>= fun () -> + (* sleep first to capture data in pcap *) + assert_string "accept" expected (Cstruct.to_string b); + log_s c "Connection closed" let test_tcp_connect_two_stacks () = or_error "console" Console.connect "console" >>= fun c -> @@ -51,32 +69,43 @@ module Test_connect (B : Vnetif_backends.Backend) = struct (Lwt_unix.sleep 0.1 >>= fun () -> V.create_stack c backend client_ip netmask [gw] >>= fun s2 -> - or_error "connect" (V.Stackv4.TCPV4.create_connection (V.Stackv4.tcpv4 s2)) (server_ip, 80) >>= fun flow -> - C.log_s c "Connected to other end...%!" >>= fun () -> - V.Stackv4.TCPV4.write flow (Cstruct.of_string test_string) >>= (function - | `Ok () -> C.log_s c "wrote hello world%!" - | `Error _ -> fail "client tried to write, got error%!" - | `Eof -> fail "client tried to write, got eof%!") >>= fun () -> - V.Stackv4.TCPV4.close flow >>= fun () -> - Lwt_unix.sleep 1.0 >>= fun () -> (* record some traffic after close *) - Lwt.return_unit) ] >>= fun () -> + let conn = V.Stackv4.TCPV4.create_connection (V.Stackv4.tcpv4 s2) in + or_error "connect" conn (server_ip, 80) >>= fun flow -> + log_s c "Connected to other end..." >>= fun () -> + V.Stackv4.TCPV4.write flow (Cstruct.of_string test_string) >>= function + | `Error e -> err_write e + | `Eof -> err_write_eof () + | `Ok () -> + log_s c "wrote hello world" >>= fun () -> + V.Stackv4.TCPV4.close flow >>= fun () -> + Lwt_unix.sleep 1.0 >>= fun () -> (* record some traffic after close *) + Lwt.return_unit) ] >>= fun () -> Lwt.return_unit let record_pcap = - V.record_pcap backend + V.record_pcap backend end let test_tcp_connect_two_stacks_basic () = let module Test = Test_connect(Vnetif_backends.Basic) in - Test.record_pcap "tests/pcap/tcp_connect_two_stacks_basic.pcap" Test.test_tcp_connect_two_stacks + Test.record_pcap + "tests/pcap/tcp_connect_two_stacks_basic.pcap" + Test.test_tcp_connect_two_stacks let test_tcp_connect_two_stacks_trailing_bytes () = let module Test = Test_connect(Vnetif_backends.Trailing_bytes) in - Test.record_pcap "tests/pcap/tcp_connect_two_stacks_trailing_bytes.pcap" Test.test_tcp_connect_two_stacks + Test.record_pcap + "tests/pcap/tcp_connect_two_stacks_trailing_bytes.pcap" + Test.test_tcp_connect_two_stacks let suite = [ - "test_tcp_connect_two_stacks_basic" , test_tcp_connect_two_stacks_basic; - "test_tcp_connect_two_stacks_trailing_bytes" , test_tcp_connect_two_stacks_trailing_bytes; + + "connect two stacks, basic test", `Quick, + test_tcp_connect_two_stacks_basic; + + "connect two stacks, with trailing bytes", `Quick, + test_tcp_connect_two_stacks_trailing_bytes; + ] diff --git a/lib_test/test_iperf.ml b/lib_test/test_iperf.ml index 961a10da9..29a2836c8 100644 --- a/lib_test/test_iperf.ml +++ b/lib_test/test_iperf.ml @@ -16,18 +16,19 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Lwt open Common open Vnetif_common +let (>>=) = Lwt.(>>=) + +module Test_iperf (B : Vnetif_backends.Backend) = struct -module Test_iperf ( B : Vnetif_backends.Backend ) = struct module C = Console module V = VNETIF_STACK (B) let backend = V.create_backend () - let netmask = Ipaddr.V4.of_string_exn "255.255.255.0" - let gw = Ipaddr.V4.of_string_exn "10.0.0.1" + let netmask = Ipaddr.V4.of_string_exn "255.255.255.0" + let gw = Ipaddr.V4.of_string_exn "10.0.0.1" let client_ip = Ipaddr.V4.of_string_exn "10.0.0.101" let server_ip = Ipaddr.V4.of_string_exn "10.0.0.100" @@ -40,27 +41,63 @@ module Test_iperf ( B : Vnetif_backends.Backend ) = struct mutable last_time: float; } - let msg = "01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890" + let msg = + "01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890\ + abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijk\ + lmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuv\ + wxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFG\ + HIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQR\ + STUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ012\ + 34567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abc\ + defghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmn\ + opqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxy\ + zABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJ\ + KLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTU\ + VWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ012345\ + 67890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdef\ + ghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopq\ + rstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzAB\ + CDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLM\ + NOPQRSTUVWXYZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWX\ + YZ01234567890abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ012345678\ + 90abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890abcdefghi\ + jklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ01234567890" let mlen = String.length msg + let err_eof () = fail "EOF while writing to TCP flow" + + let err_connect e ip port () = + let err = V.Stackv4.TCPV4.error_message e in + let ip = Ipaddr.V4.to_string ip in + fail "Unable to connect to %s:%d: %s" ip port err + + let err_write e () = + let err = V.Stackv4.TCPV4.error_message e in + fail "Error while writing to TCP flow: %s" err + + let err_read e () = + let err = V.Stackv4.TCPV4.error_message e in + fail "Error in server while reading: %s" err + + let log_s c fmt = Printf.ksprintf (C.log_s c) (fmt ^^ "%!") + let write_and_check flow buf = V.Stackv4.TCPV4.write flow buf >>= function - | `Ok () -> Lwt.return_unit - | `Eof -> V.Stackv4.TCPV4.close flow >>= fun () -> raise (Failure "EOF while writing to TCP flow") - | `Error _ -> V.Stackv4.TCPV4.close flow >>= fun () -> raise (Failure "Error while writing to TCP flow") + | `Ok () -> Lwt.return_unit + | `Eof -> V.Stackv4.TCPV4.close flow >>= err_eof + | `Error e -> V.Stackv4.TCPV4.close flow >>= err_write e let tcp_connect t (ip, port) = V.Stackv4.TCPV4.create_connection t (ip, port) >>= function - | `Error e -> raise (Failure (Printf.sprintf "Unable to connect to %s:%d" (Ipaddr.V4.to_string ip) port)) - | `Ok f -> Lwt.return f + | `Error e -> err_connect e ip port () + | `Ok f -> Lwt.return f - let iperfclient c s dest_ip dport = + let iperfclient c s amt dest_ip dport = let iperftx flow = - C.log_s c (Printf.sprintf "Iperf client: Made connection to server.%!") >>= fun () -> + log_s c "Iperf client: Made connection to server." >>= fun () -> let a = Cstruct.sub (Io_page.(to_cstruct (get 1))) 0 mlen in Cstruct.blit_from_string msg 0 a 0 mlen; - let amt = 25000000 in let rec loop = function | 0 -> Lwt.return_unit | n -> write_and_check flow a >>= fun () -> loop (n-1) @@ -70,45 +107,51 @@ module Test_iperf ( B : Vnetif_backends.Backend ) = struct write_and_check flow a >>= fun () -> V.Stackv4.TCPV4.close flow in - C.log_s c (Printf.sprintf "Iperf client: Attempting connection.%!") >>= fun () -> + log_s c "Iperf client: Attempting connection." >>= fun () -> tcp_connect (V.Stackv4.tcpv4 s) (dest_ip, dport) >>= fun flow -> iperftx flow >>= fun () -> - C.log_s c (Printf.sprintf "Iperf client: Done.%!") + log_s c "Iperf client: Done." - let print_data c st ts_now = - C.log_s c (Printf.sprintf "Iperf server: t = %f, rate = %Ld KBits/s, totbytes = %Ld, live_words = %d%!" - (ts_now -. st.start_time) - (Int64.of_float (((Int64.to_float st.bin_bytes) /. (ts_now -. st.last_time)) /. 125.)) - st.bytes Gc.((stat()).live_words)) >>= fun () -> + let print_data c st ts_now = + let server = ts_now -. st.start_time in + let rate = + (Int64.to_float st.bin_bytes /. (ts_now -. st.last_time)) /. 125. + in + let live_words = Gc.((stat()).live_words) in + log_s c "Iperf server: t = %.0f, rate = %.0fd KBits/s, totbytes = %Ld, \ + live_words = %d" server rate st.bytes live_words >>= fun () -> st.last_time <- ts_now; st.bin_bytes <- 0L; st.bin_packets <- 0L; Lwt.return_unit let iperf c s server_done_u flow = - C.log_s c (Printf.sprintf "Iperf server: Received connection.%!") >>= fun () -> + log_s c "Iperf server: Received connection." >>= fun () -> let t0 = Clock.time () in - let st = {bytes=0L; packets=0L; bin_bytes=0L; bin_packets=0L; start_time = t0; last_time = t0} in + let st = { + bytes=0L; packets=0L; bin_bytes=0L; bin_packets=0L; start_time = t0; + last_time = t0 + } in let rec iperf_h flow = V.Stackv4.TCPV4.read flow >>= fun f -> match f with - | `Error _ -> raise (Failure "Unknown error in server while reading") + | `Error e -> err_read e () | `Eof -> - let ts_now = (Clock.time ()) in + let ts_now = (Clock.time ()) in st.bin_bytes <- st.bytes; st.bin_packets <- st.packets; st.last_time <- st.start_time; print_data c st ts_now >>= fun () -> V.Stackv4.TCPV4.close flow >>= fun () -> C.log_s c "Iperf server: Done - closed connection." - | `Ok data -> + | `Ok data -> begin let l = Cstruct.len data in st.bytes <- (Int64.add st.bytes (Int64.of_int l)); st.packets <- (Int64.add st.packets 1L); st.bin_bytes <- (Int64.add st.bin_bytes (Int64.of_int l)); st.bin_packets <- (Int64.add st.bin_packets 1L); - let ts_now = (Clock.time ()) in + let ts_now = (Clock.time ()) in (if ((ts_now -. st.last_time) >= 1.0) then print_data c st ts_now else @@ -120,8 +163,8 @@ module Test_iperf ( B : Vnetif_backends.Backend ) = struct Lwt.wakeup server_done_u (); Lwt.return_unit - let tcp_iperf () = - or_error "console" Console.connect "console" >>= fun c -> + let tcp_iperf amt () = + or_error "console" C.connect "console" >>= fun c -> let port = 5001 in let server_ready, server_ready_u = Lwt.wait () in @@ -130,20 +173,25 @@ module Test_iperf ( B : Vnetif_backends.Backend ) = struct Lwt.pick [ (Lwt_unix.sleep timeout >>= fun () -> (* timeout *) - fail "iperf test timed out after %f seconds" timeout) ; + fail "iperf test timed out after %f seconds" timeout); (server_ready >>= fun () -> Lwt_unix.sleep 0.1 >>= fun() -> (* Give server 0.1 s to call listen *) - C.log_s c (Printf.sprintf "I am client with IP %s, trying to connect to server @ %s:%d" (Ipaddr.V4.to_string client_ip) (Ipaddr.V4.to_string server_ip) port) >>= fun () -> + log_s c "I am client with IP %s, trying to connect to server @ %s:%d" + (Ipaddr.V4.to_string client_ip) + (Ipaddr.V4.to_string server_ip) port + >>= fun () -> V.create_stack c backend client_ip netmask [gw] >>= fun client_s -> - iperfclient c client_s server_ip port) ; + iperfclient c client_s amt server_ip port); - (C.log_s c (Printf.sprintf "I am server with IP %s, expecting connections on port %d" (Ipaddr.V4.to_string server_ip) port) >>= fun () -> + (log_s c "I am server with IP %s, expecting connections on port %d" + (Ipaddr.V4.to_string server_ip) port >>= fun () -> V.create_stack c backend server_ip netmask [gw] >>= fun server_s -> V.Stackv4.listen_tcpv4 server_s ~port (iperf c server_s server_done_u); Lwt.wakeup server_ready_u (); V.Stackv4.listen server_s) ] >>= fun () -> - C.log_s c "Waiting for server_done..." >>= fun () -> + + log_s c "Waiting for server_done..." >>= fun () -> server_done >>= fun () -> Lwt.return_unit (* exit cleanly *) @@ -151,21 +199,42 @@ module Test_iperf ( B : Vnetif_backends.Backend ) = struct V.record_pcap backend end -let test_tcp_iperf_two_stacks_basic () = +let test_tcp_iperf_two_stacks_basic amt () = let module Test = Test_iperf (Vnetif_backends.Basic) in - Test.record_pcap "tests/pcap/tcp_iperf_two_stacks_basic.pcap" Test.tcp_iperf + Test.record_pcap + "tests/pcap/tcp_iperf_two_stacks_basic.pcap" + (Test.tcp_iperf amt) -let test_tcp_iperf_two_stacks_trailing_bytes () = +let test_tcp_iperf_two_stacks_trailing_bytes amt () = let module Test = Test_iperf (Vnetif_backends.Trailing_bytes) in - Test.record_pcap "tests/pcap/tcp_iperf_two_stacks_trailing_bytes.pcap" Test.tcp_iperf + Test.record_pcap + "tests/pcap/tcp_iperf_two_stacks_trailing_bytes.pcap" + (Test.tcp_iperf amt) -let test_tcp_iperf_two_stacks_uniform_packet_loss () = +let test_tcp_iperf_two_stacks_uniform_packet_loss amt () = let module Test = Test_iperf (Vnetif_backends.Uniform_packet_loss) in - Test.record_pcap "tests/pcap/tcp_iperf_two_stacks_uniform_packet_loss.pcap" Test.tcp_iperf + Test.record_pcap + "tests/pcap/tcp_iperf_two_stacks_uniform_packet_loss.pcap" + (Test.tcp_iperf amt) +let amt_quick = 10_000_000 +let amt_slow = amt_quick * 100 let suite = [ - "test_tcp_iperf_two_stacks_basic" , test_tcp_iperf_two_stacks_basic; - "test_tcp_iperf_two_stacks_trailing_bytes" , test_tcp_iperf_two_stacks_trailing_bytes; - "test_tcp_iperf_two_stacks_uniform_packet_loss" , test_tcp_iperf_two_stacks_uniform_packet_loss; + + "iperf with two stacks, basic tests", `Quick, + test_tcp_iperf_two_stacks_basic amt_quick; + + "iperf with two stacks, testing trailing_bytes", `Quick, + test_tcp_iperf_two_stacks_trailing_bytes amt_quick; + + "iperf with two stacks and uniform packet loss", `Quick, + test_tcp_iperf_two_stacks_uniform_packet_loss amt_quick; + + "iperf with two stacks, basic tests, longer", `Slow, + test_tcp_iperf_two_stacks_basic amt_slow; + + "iperf with two stacks and uniform packet loss, longer", `Slow, + test_tcp_iperf_two_stacks_uniform_packet_loss amt_slow; + ] diff --git a/lib_test/vnetif_common.ml b/lib_test/vnetif_common.ml index 4eb95b849..8b7141897 100644 --- a/lib_test/vnetif_common.ml +++ b/lib_test/vnetif_common.ml @@ -70,7 +70,7 @@ module VNETIF_STACK ( B : Vnetif_backends.Backend) : VNETIF_STACK = struct or_error "tcpv4" T.connect ipv4 >>= fun tcpv4 -> let config = { V1_LWT.name = "stack"; - console = c; + console = c; interface = netif; mode = `IPv4 (ip, netmask, gw); } in @@ -78,7 +78,7 @@ module VNETIF_STACK ( B : Vnetif_backends.Backend) : VNETIF_STACK = struct let create_backend_listener backend listenf = match (B.register backend) with - | `Error e -> fail "Error occured while registering to backend" + | `Error e -> fail "Error occured while registering to backend" | `Ok id -> (B.set_listen_fn backend id listenf); id let disable_backend_listener backend id = @@ -99,12 +99,12 @@ module VNETIF_STACK ( B : Vnetif_backends.Backend) : VNETIF_STACK = struct let time = Unix.gettimeofday () in Pcap.LE.set_pcap_packet_incl_len pcap_buf (Int32.of_int (Cstruct.len buffer)); Pcap.LE.set_pcap_packet_orig_len pcap_buf (Int32.of_int (Cstruct.len buffer)); - Pcap.LE.set_pcap_packet_ts_sec pcap_buf (Int32.of_float time); + Pcap.LE.set_pcap_packet_ts_sec pcap_buf (Int32.of_float time); let frac = (time -. (float_of_int (truncate time))) *. 1000000.0 in Pcap.LE.set_pcap_packet_ts_usec pcap_buf (Int32.of_float frac); - (try + (try Lwt_io.write channel ((Cstruct.to_string pcap_buf) ^ (Cstruct.to_string buffer)) - with + with Lwt_io.Channel_closed msg -> Printf.printf "Warning: Pcap output channel already closed: %s.\n" msg; Lwt.return_unit) >>= fun () -> Lwt.return_unit @@ -120,4 +120,3 @@ module VNETIF_STACK ( B : Vnetif_backends.Backend) : VNETIF_STACK = struct Lwt.return_unit ) end - diff --git a/opam b/opam index 4e4074932..7a4a29538 100644 --- a/opam +++ b/opam @@ -13,13 +13,14 @@ authors: [ tags: ["org:mirage"] build: [ - ["./configure" "--prefix" prefix - "--%{mirage-flow+alcotest+mirage-vnetif+pcap-format:enable}%-tests" - "--%{mirage-xen:enable}%-xen" - ] + ["./configure" "--prefix" prefix "--%{mirage-xen:enable}%-xen"] [make] ] -build-test: [make "test" "TESTFLAGS=-v"] +build-test: [ + ["./configure" "--enable-tests"] + [make "test" "TESTFLAGS=-v"] +] + install: [make "install"] remove: ["ocamlfind" "remove" "tcpip"] depends: [ @@ -35,7 +36,8 @@ depends: [ "mirage-flow" {test} "mirage-vnetif" {test} "alcotest" {test} - "pcap-format" {test} + "pcap-format" {test} + "lwt" {>= "2.4.7"} ] depopts: [ "mirage-xen" diff --git a/setup.ml b/setup.ml index fa35dd067..345d0e32d 100644 --- a/setup.ml +++ b/setup.ml @@ -1,7 +1,7 @@ (* setup.ml generated for the first time by OASIS v0.4.5 *) (* OASIS_START *) -(* DO NOT EDIT (digest: 998ba22d44a9da9fecedfa9dd25bf45c) *) +(* DO NOT EDIT (digest: 38cb04279f0430b7792c79e11b16913d) *) (* Regenerated by OASIS v0.4.5 Visit http://oasis.forge.ocamlcore.org for more information and @@ -6809,7 +6809,7 @@ let setup_t = CustomPlugin.Test.main { CustomPlugin.cmd_main = - [(OASISExpr.EBool true, ("$test", []))]; + [(OASISExpr.EBool true, ("$test", ["-q"]))]; cmd_clean = [(OASISExpr.EBool true, None)]; cmd_distclean = [(OASISExpr.EBool true, None)] }) @@ -6824,7 +6824,7 @@ let setup_t = CustomPlugin.Test.clean { CustomPlugin.cmd_main = - [(OASISExpr.EBool true, ("$test", []))]; + [(OASISExpr.EBool true, ("$test", ["-q"]))]; cmd_clean = [(OASISExpr.EBool true, None)]; cmd_distclean = [(OASISExpr.EBool true, None)] }) @@ -6837,7 +6837,7 @@ let setup_t = CustomPlugin.Test.distclean { CustomPlugin.cmd_main = - [(OASISExpr.EBool true, ("$test", []))]; + [(OASISExpr.EBool true, ("$test", ["-q"]))]; cmd_clean = [(OASISExpr.EBool true, None)]; cmd_distclean = [(OASISExpr.EBool true, None)] }) @@ -7196,7 +7196,9 @@ let setup_t = "Segment"; "User_buffer"; "Pcb"; - "Flow" + "Flow"; + "Stats"; + "Log" ]; lib_pack = true; lib_internal_modules = []; @@ -7876,7 +7878,8 @@ let setup_t = }, { test_type = (`Test, "custom", Some "0.4"); - test_command = [(OASISExpr.EBool true, ("$test", []))]; + test_command = + [(OASISExpr.EBool true, ("$test", ["-q"]))]; test_custom = { pre_command = [(OASISExpr.EBool true, None)]; @@ -7903,7 +7906,7 @@ let setup_t = }; oasis_fn = Some "_oasis"; oasis_version = "0.4.5"; - oasis_digest = Some "\015_y\240h\229\197\012\031\140&<\154\143x\129"; + oasis_digest = Some "\016§\017lÕ\015\019ËS\146Èòdãìß"; oasis_exec = None; oasis_setup_args = []; setup_update = false @@ -7911,6 +7914,6 @@ let setup_t = let setup () = BaseSetup.setup setup_t;; -# 7915 "setup.ml" +# 7918 "setup.ml" (* OASIS_STOP *) let () = setup ();; diff --git a/tcp/ack.ml b/tcp/ack.ml index 1599ecc84..918ba6cf3 100644 --- a/tcp/ack.ml +++ b/tcp/ack.ml @@ -14,7 +14,7 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Lwt +open Lwt.Infix (* General signature for all the ack modules *) module type M = sig @@ -51,12 +51,12 @@ module Immediate : M = struct let receive t ack_number = match t.pushpending with - | true -> return_unit + | true -> Lwt.return_unit | false -> pushack t ack_number let transmit t _ = t.pushpending <- false; - return_unit + Lwt.return_unit end @@ -82,25 +82,22 @@ module Delayed (Time:V1_LWT.TIME) : M = struct let transmitack r ack_number = match r.pushpending with - | true -> return_unit - | false -> r.pushpending <- true; + | true -> Lwt.return_unit + | false -> + r.pushpending <- true; transmitacknow r ack_number - let ontimer r s = match r.delayed with - | false -> - Tcptimer.Stoptimer - | true -> begin - match r.delayedack = s with - | false -> - Tcptimer.Continue r.delayedack - | true -> - r.delayed <- false; - let _ = transmitack r s in - Tcptimer.Stoptimer - end - + | false -> Lwt.return Tcptimer.Stoptimer + | true -> + match r.delayedack = s with + | false -> + Lwt.return (Tcptimer.Continue r.delayedack) + | true -> + r.delayed <- false; + transmitack r s >>= fun () -> + Lwt.return Tcptimer.Stoptimer let t ~send_ack ~last : t = let pushpending = false in @@ -134,6 +131,6 @@ module Delayed (Time:V1_LWT.TIME) : M = struct let transmit t _ = t.r.delayed <- false; t.r.pushpending <- false; - return_unit + Lwt.return_unit end diff --git a/tcp/flow.ml b/tcp/flow.ml index e24b6a7a6..10ddbe2ce 100644 --- a/tcp/flow.ml +++ b/tcp/flow.ml @@ -14,13 +14,14 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -let (>>=) = Lwt.(>>=) -let (>|=) = Lwt.(>|=) +open Lwt.Infix (* TODO: modify V1.TCP to have a proper return type *) exception Bad_state of State.tcpstate +let debug = Log.create "Flow" + module Make(IP:V1_LWT.IP)(TM:V1_LWT.TIME)(C:V1.CLOCK)(R:V1.RANDOM) = struct module Pcb = Pcb.Make(IP)(TM)(C)(R) @@ -40,14 +41,16 @@ module Make(IP:V1_LWT.IP)(TM:V1_LWT.TIME)(C:V1.CLOCK)(R:V1.RANDOM) = struct | `Refused ] - let err_timeout () = - (* Printf.printf "Failed to connect to %s:%d\n%!" *) - (* (Ipaddr.V4.to_string daddr) dport; *) + let err_timeout daddr dport = + Log.f debug (fun fmt -> + Log.pf fmt "Failed to connect to %a:%d\n%!" + Ipaddr.pp_hum (IP.to_uipaddr daddr) dport); Lwt.return (`Error `Timeout) - let err_refused () = - (* Printf.printf "Refused connection to %s:%d\n%!" *) - (* (Ipaddr.V4.to_string daddr) dport; *) + let err_refused daddr dport = + Log.f debug (fun fmt -> + Log.pf fmt "Refused connection to %a:%d\n%!" + Ipaddr.pp_hum (IP.to_uipaddr daddr) dport); Lwt.return (`Error `Refused) let ok x = Lwt.return (`Ok x) @@ -85,9 +88,8 @@ module Make(IP:V1_LWT.IP)(TM:V1_LWT.TIME)(C:V1.CLOCK)(R:V1.RANDOM) = struct let create_connection tcp (daddr, dport) = Pcb.connect tcp ~dest_ip:daddr ~dest_port:dport >>= function - | `Timeout -> err_timeout () - | `Rst -> err_refused () + | `Timeout -> err_timeout daddr dport + | `Rst -> err_refused daddr dport | `Ok (fl, _) -> ok fl - end diff --git a/tcp/flow.mli b/tcp/flow.mli index 67fab6eb8..6232d2a68 100644 --- a/tcp/flow.mli +++ b/tcp/flow.mli @@ -14,6 +14,8 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) +val debug: Log.t + exception Bad_state of State.tcpstate module Make (IP:V1_LWT.IP)(TM:V1_LWT.TIME)(C:V1.CLOCK)(R:V1.RANDOM) : sig diff --git a/tcp/log.ml b/tcp/log.ml new file mode 100644 index 000000000..e0fdb46b9 --- /dev/null +++ b/tcp/log.ml @@ -0,0 +1,60 @@ +(* + * Copyright (c) 2015 Thomas Gazagnaire + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +type section = int + +type t = { + name: string; + id : int; + mutable enabled: bool; + mutable stats: bool; +} + +let c = ref 0 + +let f t = + if t.enabled && t.stats then + let stats = Stats.create () in + fun pp -> Format.printf ("Tcp.%s%a: %t\n%!") t.name Stats.pp stats pp + else if t.enabled then + fun pp -> Format.printf ("Tcp.%s: %t\n%!") t.name pp + else + fun _pp -> () + +let s t str = f t (fun fmt -> Format.pp_print_string fmt str) + +let create ?(enabled=false) ?(stats=true) name = + incr c; + { name; id = !c; stats; enabled } + +let enable t = t.enabled <- true +let disable t = t.enabled <- false +let enabled t = t.enabled +let name t = t.name +let stats t = t.stats +let set_stats t b = t.stats <- b + +let rec pp_print_list ?(pp_sep = Format.pp_print_cut) pp_v ppf = function + | [] -> () + | [v] -> pp_v ppf v + | v :: vs -> + pp_v ppf v; + pp_sep ppf (); + pp_print_list ~pp_sep pp_v ppf vs + + +let ps = Format.pp_print_string +let pf = Format.fprintf diff --git a/tcp/log.mli b/tcp/log.mli new file mode 100644 index 000000000..d3743ca04 --- /dev/null +++ b/tcp/log.mli @@ -0,0 +1,65 @@ +(* + * Copyright (c) 2015 Thomas Gazagnaire + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +(** Logging module for TCP *) + +type t +(** The type for managing logging values. *) + +val create: ?enabled:bool -> ?stats:bool -> string -> t +(** Create a new section. By default, the section is disabled and the + stats are printed. *) + +val enable: t -> unit +(** Enable a section. *) + +val disable: t -> unit +(** Disable a section. *) + +val set_stats: t -> bool -> unit +(** Display the stats on every debug line. *) + +val stats: t -> bool +(** Check if the stats are displayed. *) + +val enabled: t -> bool +(** [enabled t] is [true] iff [t] is enabled. *) + +val name: t -> string +(** [name t] is the section name. *) + +val f: t -> (Format.formatter -> unit) -> unit +(** Print a formatted entry into a logger. *) + +val s: t -> string -> unit +(** Print a string into a logger. *) + +val ps: Format.formatter -> string -> unit +(** Same as {!format.pp_print_string}. *) + +val pf: Format.formatter -> ('a, Format.formatter, unit) format -> 'a +(** Same as {!Format.fprintf}, to be used with {!f}. *) + +val pp_print_list: + ?pp_sep:(Format.formatter -> unit -> unit) -> + (Format.formatter -> 'a -> unit) -> (Format.formatter -> 'a list -> unit) +(** [pp_print_list ?pp_sep pp_v ppf l] prints the list [l]. [pp_v] is + used on the elements of [l] and each element is separated by + a call to [pp_sep] (defaults to {!pp_print_cut}). Does nothing on + empty lists. + + @since 4.02.0 +*) diff --git a/tcp/options.ml b/tcp/options.ml index 1e69a58e1..9683f7c88 100644 --- a/tcp/options.ml +++ b/tcp/options.ml @@ -158,15 +158,21 @@ let marshal buf ts = tlen+3 | _ -> assert false -let to_string = function - | Noop -> "Noop" - | MSS m -> Printf.sprintf "MSS=%d" m - | Window_size_shift b -> Printf.sprintf "Window>>%d" b - | SACK_ok -> "SACK_ok" - | SACK x -> Printf.(sprintf "SACK=(%s)" (String.concat "," - (List.map (fun (l,r) -> sprintf "%lu,%lu" l r) x))) - | Timestamp (a,b) -> Printf.sprintf "Timestamp(%lu,%lu)" a b - | Unknown (t,_) -> Printf.sprintf "%d?" t +let pp_sack fmt x = + let pp_v fmt (l, r) = Log.pf fmt "[%lu,%lu]" l r in + Log.pp_print_list pp_v fmt x -let prettyprint s = - Printf.sprintf "[ %s ]" (String.concat "; " (List.map to_string s)) +let pp fmt = function + | Noop -> Log.ps fmt "Noop" + | MSS m -> Log.pf fmt "MSS=%d" m + | Window_size_shift b -> Log.pf fmt "Window>> %d" b + | SACK_ok -> Log.ps fmt "SACK_ok" + | SACK x -> Log.pf fmt "SACK[%a]" pp_sack x + | Timestamp (a,b) -> Log.pf fmt "Timestamp(%lu,%lu)" a b + | Unknown (t,_) -> Log.pf fmt "%d?" t + +let pps fmt = function + | [] -> Log.ps fmt "[]" + | x -> + let ppl fmt x = Log.pp_print_list pp fmt x in + Log.pf fmt "[ %a ]" ppl x diff --git a/tcp/options.mli b/tcp/options.mli index 9cb35199a..2523093e9 100644 --- a/tcp/options.mli +++ b/tcp/options.mli @@ -29,4 +29,5 @@ type t = val marshal: Cstruct.t -> t list -> int val unmarshal : Cstruct.t -> t list -val prettyprint : t list -> string +val pp : Format.formatter -> t -> unit +val pps : Format.formatter -> t list -> unit diff --git a/tcp/pcb.ml b/tcp/pcb.ml index 1ed891ba9..e2d366e44 100644 --- a/tcp/pcb.ml +++ b/tcp/pcb.ml @@ -15,8 +15,7 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Lwt -open Printf +open Lwt.Infix type error = [`Bad_state of State.tcpstate] @@ -36,6 +35,10 @@ let iter_s f l = in aux l + +let debug = Log.create "PCB" +let info = Log.create ~enabled:true ~stats:false "PCB" + module Tcp_wire = Wire_structs.Tcp_wire cstruct pseudo_header { @@ -85,6 +88,14 @@ struct connects: (WIRE.id, (connection_result Lwt.u * Sequence.t)) Hashtbl.t; } + let pp_stats fmt t = + Log.pf fmt "[channels=%d listens=%d connects=%d]" + (Hashtbl.length t.channels) + (Hashtbl.length t.listens) + (Hashtbl.length t.connects) + + let with_stats name t fmt = Log.pf fmt "%s: %a" name pp_stats t + let ip { ip; _ } = ip let verify_checksum _ _ _ = true @@ -118,6 +129,7 @@ struct (* Queue up an immediate close segment *) let close pcb = + Log.s debug "TX.close"; match State.state pcb.state with | State.Established | State.Close_wait -> UTX.wait_for_flushed pcb.utx >>= fun () -> @@ -125,7 +137,10 @@ struct STATE.tick pcb.state (State.Send_fin (Window.tx_nxt wnd)); TXS.output ~flags:Segment.Fin pcb.txq [] ) - | _ -> return_unit + | _ -> + Log.f debug (fun fmt -> + Log.pf fmt "TX.close: skipping, state=%a" State.pp pcb.state); + Lwt.return_unit (* Thread that transmits ACKs in response to received packets, thus telling the other side that more can be sent, and @@ -180,7 +195,7 @@ struct let rec rx_application_t () = Lwt_mvar.take rx_data >>= fun (data, winadv) -> begin match winadv with - | None -> return_unit + | None -> Lwt.return_unit | Some winadv -> if (winadv > 0) then ( Window.rx_advance wnd winadv; @@ -198,10 +213,11 @@ struct rx_application_t () | Some data -> let rec queue = function + | [] -> Lwt.return_unit | hd::tl -> User_buffer.Rx.add_r urx (Some hd) >>= fun () -> queue tl - | [] -> return_unit in + in queue data >>= fun _ -> rx_application_t () end @@ -237,19 +253,22 @@ struct let clearpcb t id tx_isn = (* TODO: add more info to log msgs *) + Log.f debug (with_stats "removing pcb from tables" t); match hashtbl_find t.channels id with | Some _ -> - (* printf "TCP: removing pcb from tables\n%!";*) - Hashtbl.remove t.channels id + Log.s debug "removed from channels!!"; + Hashtbl.remove t.channels id; + Stats.decr_channel (); | None -> match hashtbl_find t.listens id with | Some (isn, _) -> if isn = tx_isn then ( - printf "TCP: removing incomplete listen pcb\n%!"; - Hashtbl.remove t.listens id + Log.s debug "removing incomplete listen pcb"; + Hashtbl.remove t.listens id; + Stats.decr_listen (); ) | None -> - printf "TCP: error in removing pcb - no such connection\n%!" + Log.s debug "error in removing pcb - no such connection" let pcb_allocs = ref 0 let th_allocs = ref 0 @@ -332,20 +351,28 @@ struct let fnth = fun _ -> th_frees := !th_frees + 1 in Gc.finalise fnpcb pcb; Gc.finalise fnth th; - return (pcb, th, opts) + Lwt.return (pcb, th, opts) let new_server_connection t params id pushf = + Log.f debug (with_stats "new-server-connection" t); new_pcb t params id >>= fun (pcb, th, opts) -> STATE.tick pcb.state State.Passive_open; STATE.tick pcb.state (State.Send_synack params.tx_isn); (* Add the PCB to our listens table *) - Hashtbl.replace t.listens id (params.tx_isn, (pushf, (pcb, th))); + if Hashtbl.mem t.listens id then ( + Log.s info "WARNING: connection already being attempted"; + Hashtbl.remove t.listens id; + Stats.decr_listen (); + ); + Hashtbl.add t.listens id (params.tx_isn, (pushf, (pcb, th))); + Stats.incr_listen (); (* Queue a SYN ACK for transmission *) let options = Options.MSS 1460 :: opts in TXS.output ~flags:Segment.Syn ~options pcb.txq [] >>= fun () -> - return (pcb, th) + Lwt.return (pcb, th) let new_client_connection t params id ack_number = + Log.f debug (with_stats "new-client-connection" t); let tx_isn = params.tx_isn in let params = { params with tx_isn = Sequence.incr tx_isn } in new_pcb t params id >>= fun (pcb, th, _) -> @@ -353,34 +380,40 @@ struct STATE.tick pcb.state (State.Send_syn tx_isn); (* Add the PCB to our connection table *) Hashtbl.add t.channels id (pcb, th); + Stats.incr_channel (); STATE.tick pcb.state (State.Recv_synack (Sequence.of_int32 ack_number)); (* xmit ACK *) TXS.output pcb.txq [] >>= fun () -> - return (pcb, th) + Lwt.return (pcb, th) let process_reset t id = + Log.f debug (with_stats "process-reset" t); match hashtbl_find t.connects id with | Some (wakener, _) -> (* URG_TODO: check if RST ack num is valid before it is accepted *) Hashtbl.remove t.connects id; + Stats.decr_connect (); Lwt.wakeup wakener `Rst; - return_unit + Lwt.return_unit | None -> match hashtbl_find t.listens id with | Some (_, (_, (pcb, th))) -> Hashtbl.remove t.listens id; + Stats.decr_listen (); STATE.tick pcb.state State.Recv_rst; Lwt.cancel th; - return_unit + Lwt.return_unit | None -> (* Incoming RST possibly to listen port - ignore per RFC793 pg65 *) - return_unit + Lwt.return_unit let process_synack t id ~pkt ~ack_number ~sequence ~options ~syn ~fin = + Log.f debug (with_stats "process-synack" t); match hashtbl_find t.connects id with | Some (wakener, tx_isn) -> if Sequence.(to_int32 (incr tx_isn)) = ack_number then ( Hashtbl.remove t.connects id; + Stats.decr_connect (); let tx_wnd = Tcp_wire.get_tcp_window pkt in let rx_wnd = 65535 in (* TODO: fix hardcoded value - it assumes that this value was @@ -391,18 +424,19 @@ struct id ack_number >>= fun (pcb, th) -> Lwt.wakeup wakener (`Ok (pcb, th)); - return_unit + Lwt.return_unit ) else (* Normally sending a RST reply to a random pkt would be in order but here we stay quiet since we are actively trying to connect this id *) - return_unit + Lwt.return_unit | None -> (* Incomming SYN-ACK with no pending connect and no matching pcb - send RST *) Tx.send_rst t id ~sequence ~ack_number ~syn ~fin let process_syn t id ~listeners ~pkt ~ack_number ~sequence ~options ~syn ~fin = + Log.f debug (with_stats "process-syn" t); match listeners id.WIRE.local_port with | Some pushf -> let tx_isn = Sequence.of_int ((Random.int 65535) + 0x1AFE0000) in @@ -414,29 +448,32 @@ struct { tx_wnd; sequence; options; tx_isn; rx_wnd; rx_wnd_scaleoffer } id pushf >>= fun _ -> - return_unit + Lwt.return_unit | None -> Tx.send_rst t id ~sequence ~ack_number ~syn ~fin let process_ack t id ~pkt ~ack_number ~sequence ~syn ~fin = + Log.f debug (with_stats "process-ack" t); match hashtbl_find t.listens id with | Some (tx_isn, (pushf, newconn)) -> if Sequence.(to_int32 (incr tx_isn)) = ack_number then ( (* Established connection - promote to active channels *) Hashtbl.remove t.listens id; + Stats.decr_listen (); Hashtbl.add t.channels id newconn; + Stats.incr_channel (); (* Finish processing ACK, so pcb.state is correct *) Rx.input t pkt newconn >>= fun () -> (* send new connection up to listener *) pushf (fst newconn) ) else (* No RST because we are trying to connect on this id *) - return_unit + Lwt.return_unit | None -> match hashtbl_find t.connects id with | Some _ -> (* No RST because we are trying to connect on this id *) - return_unit + Lwt.return_unit | None -> (* ACK but no matching pcb and no listen - send RST *) Tx.send_rst t id ~sequence ~ack_number ~syn ~fin @@ -459,12 +496,15 @@ struct | false, true -> process_ack t id ~pkt ~ack_number ~sequence ~syn ~fin | false, false -> (* What the hell is this packet? No SYN,ACK,RST *) - return_unit + Log.s debug "input-no-pcb: unknown packet"; + Lwt.return_unit (* Main input function for TCP packets *) let input t ~listeners ~src ~dst data = match verify_checksum src dst data with - | false -> printf "RX.input: checksum error\n%!"; return_unit + | false -> + Log.s debug "RX.input: checksum error"; + Lwt.return_unit | true -> let source_port = Tcp_wire.get_tcp_src_port data in let dest_port = Tcp_wire.get_tcp_dst_port data in @@ -554,20 +594,19 @@ struct in Time.sleep rxtime >>= fun () -> match hashtbl_find t.connects id with + | None -> Lwt.return_unit | Some (wakener, isn) -> if isn = tx_isn then if count > 3 then ( Hashtbl.remove t.connects id; + Stats.decr_connect (); Lwt.wakeup wakener `Timeout; - return_unit + Lwt.return_unit ) else ( Tx.send_syn t id ~tx_isn ~options ~window >>= fun () -> connecttimer t id tx_isn options window (count + 1) ) - else - return_unit - | None -> - return_unit + else Lwt.return_unit let connect t ~dest_ip ~dest_port = let id = getid t dest_ip dest_port in @@ -579,16 +618,20 @@ struct in let window = 5840 in let th, wakener = MProf.Trace.named_task "TCP connect" in - if Hashtbl.mem t.connects id then - printf "WARNING: connection already being attempted\n%!"; - Hashtbl.replace t.connects id (wakener, tx_isn); + if Hashtbl.mem t.connects id then ( + Log.s info "WARNING: connection already being attempted"; + Hashtbl.remove t.connects id; + Stats.decr_connect (); + ); + Hashtbl.add t.connects id (wakener, tx_isn); + Stats.incr_connect (); Tx.send_syn t id ~tx_isn ~options ~window >>= fun () -> - let _ = connecttimer t id tx_isn options window 0 in + Lwt.async (fun () -> connecttimer t id tx_isn options window 0); th (* Construct the main TCP thread *) let create ip = - let _ = Random.self_init () in + Random.self_init (); let localport = 10000 + (Random.int 10000) in let listens = Hashtbl.create 1 in let connects = Hashtbl.create 1 in diff --git a/tcp/pcb.mli b/tcp/pcb.mli index 6058693b6..42dd39f64 100644 --- a/tcp/pcb.mli +++ b/tcp/pcb.mli @@ -18,6 +18,9 @@ type error = [`Bad_state of State.tcpstate] type 'a result = [`Ok of 'a | `Error of error] +val info : Log.t +val debug: Log.t + module Make(Ip:V1_LWT.IP)(Time:V1_LWT.TIME)(Clock:V1.CLOCK)(Random:V1.RANDOM) : sig (** Overall state of the TCP stack *) diff --git a/tcp/segment.ml b/tcp/segment.ml index bcbe9c433..b85149870 100644 --- a/tcp/segment.ml +++ b/tcp/segment.ml @@ -14,14 +14,24 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Printf -open Lwt +open Lwt.Infix + +let debug = Log.create "Segment" +let info = Log.create ~enabled:true ~stats:false "Segment" + +let lwt_sequence_add_l s seq = + let (_:'a Lwt_sequence.node) = Lwt_sequence.add_l s seq in + () + +let lwt_sequence_add_r s seq = + let (_:'a Lwt_sequence.node) = Lwt_sequence.add_r s seq in + () let peek_opt_l seq = match Lwt_sequence.take_opt_l seq with | None -> None | Some s -> - let _ = Lwt_sequence.add_l s seq in + lwt_sequence_add_l s seq; Some s let peek_l seq = @@ -60,10 +70,11 @@ module Rx(Time:V1_LWT.TIME) = struct window: int; } - let string_of_segment seg = - sprintf "TCP: RX seg seq=%s fin=%b syn=%b ack=%b acknum=%s win=%d" - (Sequence.to_string seg.sequence) seg.fin seg.syn seg.ack - (Sequence.to_string seg.ack_number) seg.window + let pp_segment fmt seg = + Log.pf fmt + "RX seg seq=%a fin=%b syn=%b ack=%b acknum=%a win=%d" + Sequence.pp seg.sequence seg.fin seg.syn seg.ack + Sequence.pp seg.ack_number seg.window let segment ~sequence ~fin ~syn ~rst ~ack ~ack_number ~window ~data = { sequence; fin; syn; ack; rst; ack_number; window; data } @@ -91,11 +102,11 @@ module Rx(Time:V1_LWT.TIME) = struct let segs = S.empty in { segs; rx_data; tx_ack; wnd; state } - let to_string t = - String.concat ", " - (List.map (fun seg -> - sprintf "%lu[%d]" (Sequence.to_int32 seg.sequence) (len seg) - ) (S.elements t.segs)) + let pp fmt t = + let pp_v fmt seg = + Log.pf fmt "%a[%d]" Sequence.pp seg.sequence (len seg) + in + Log.pp_print_list pp_v fmt (S.elements t.segs) (* If there is a FIN flag at the end of this segment set. TODO: should look for a FIN and chop off the rest of the set as they @@ -115,6 +126,7 @@ module Rx(Time:V1_LWT.TIME) = struct queue, update the window, extract any ready segments into the user receive queue, and signal any acks to the Tx queue *) let input (q:t) seg = + Log.s debug "input"; (* Check that the segment fits into the valid receive window *) let force_ack = ref false in if not (Window.valid q.wnd seg.sequence) then Lwt.return_unit @@ -170,9 +182,10 @@ module Rx(Time:V1_LWT.TIME) = struct Lwt_mvar.put q.tx_ack ((Window.ack_seq q.wnd), (Window.ack_win q.wnd)) end else begin Window.set_ack_seq_win q.wnd seg.ack_number seg.window; - return_unit + Lwt.return_unit end - end else return_unit in + end else Lwt.return_unit + in (* Inform the user application of new data *) let urx_inform = (* TODO: deal with overlapping fragments *) @@ -187,9 +200,9 @@ module Rx(Time:V1_LWT.TIME) = struct window as closed and tell the application *) (if fin ready then begin if S.cardinal waiting != 0 then - printf "TCP: warning, rx closed but waiting segs != 0\n%!"; + Log.s info "warning, rx closed but waiting segs != 0"; Lwt_mvar.put q.rx_data (None, Some 0) - end else return_unit) + end else Lwt.return_unit) in tx_ack <&> urx_inform @@ -240,8 +253,8 @@ module Tx (Time:V1_LWT.TIME) (Clock:V1.CLOCK) = struct mutable dup_acks: int; (* dup ack count for re-xmits *) } -(* let string_of_seg seg = - sprintf "[%s%d]" + let pp_seg fmt seg = + Log.pf fmt "[%s%d]" (match seg.flags with | No_flags ->"" | Syn ->"SYN " @@ -249,7 +262,6 @@ module Tx (Time:V1_LWT.TIME) (Clock:V1.CLOCK) = struct | Rst -> "RST " | Psh -> "PSH ") (len seg) -*) let ack_segment _ _ = () (* Take any action to the user transmit queue due to this being @@ -262,84 +274,92 @@ module Tx (Time:V1_LWT.TIME) (Clock:V1.CLOCK) = struct | State.Syn_rcvd _ | State.Established | State.Fin_wait_1 _ | State.Close_wait | State.Last_ack _ -> begin match peek_opt_l segs with - | None -> - Tcptimer.Stoptimer + | None -> Lwt.return Tcptimer.Stoptimer | Some rexmit_seg -> match rexmit_seg.seq = seq with | false -> - (* printf "PUSHING TIMER - new time = %f, new seq = %d\n%!" - (Window.rto wnd) (Sequence.to_int rexmit_seg.seq); *) - Tcptimer.ContinueSetPeriod (Window.rto wnd, rexmit_seg.seq) + Log.f debug (fun fmt -> + Log.pf fmt "PUSHING TIMER - new time=%f, new seq=%a" + (Window.rto wnd) Sequence.pp rexmit_seg.seq); + let ret = + Tcptimer.ContinueSetPeriod (Window.rto wnd, rexmit_seg.seq) + in + Lwt.return ret | true -> if (Window.max_rexmits_done wnd) then ( (* TODO - include more in log msg like ipaddrs *) - printf "Max retransmits reached for connection - terminating\n%!"; + Log.s info "Max retransmits reached for connection - terminating"; StateTick.tick st State.Timeout; - Tcptimer.Stoptimer + Lwt.return Tcptimer.Stoptimer ) else ( let flags = rexmit_seg.flags in let options = [] in (* TODO: put the right options *) - printf "TCP retransmission on timer seq = %d\n%!" - (Sequence.to_int rexmit_seg.seq); + Log.f info (fun fmt -> + Log.pf fmt "TCP retransmission on timer seq = %d" + (Sequence.to_int rexmit_seg.seq)); (* FIXME: suspicious ignore *) - let _ = xmit ~flags ~wnd ~options ~seq rexmit_seg.data in + xmit ~flags ~wnd ~options ~seq rexmit_seg.data >>= fun () -> Window.backoff_rto wnd; - (* printf "PUSHING TIMER - new time = %f, new seq = %d\n%!" - (Window.rto wnd) (Sequence.to_int rexmit_seg.seq); *) - Tcptimer.ContinueSetPeriod (Window.rto wnd, rexmit_seg.seq) + Log.f debug (fun fmt -> + Log.pf fmt "PUSHING TIMER - new time = %f, new seq = %a" + (Window.rto wnd) Sequence.pp rexmit_seg.seq); + let ret = + Tcptimer.ContinueSetPeriod (Window.rto wnd, rexmit_seg.seq) + in + Lwt.return ret ) end - | _ -> - Tcptimer.Stoptimer + | _ -> Lwt.return Tcptimer.Stoptimer + + let rec clearsegs q ack_remaining segs = + match ack_remaining > 0l with + | false -> 0l (* here we return 0l instead of ack_remaining in case + the ack was an old packet in the network *) + | true -> + match Lwt_sequence.take_opt_l segs with + | None -> + Log.s info "Dubious ACK received"; + ack_remaining + | Some s -> + let seg_len = (Int32.of_int (len s)) in + match ack_remaining < seg_len with + | true -> + Log.s info "Partial ACK received"; + (* return uncleared segment to the sequence *) + lwt_sequence_add_l s segs; + ack_remaining + | false -> + ack_segment q s; + clearsegs q (Int32.sub ack_remaining seg_len) segs let rto_t q tx_ack = (* Listen for incoming TX acks from the receive queue and ACK segments in our retransmission queue *) let rec tx_ack_t () = let serviceack dupack ack_len seq win = - let rec clearsegs ack_remaining segs = - match ack_remaining > 0l with - | false -> 0l (* here we return 0l instead of ack_remaining in case - the ack was an old packet in the network *) - | true -> - match Lwt_sequence.take_opt_l segs with - | None -> - printf "TCP: Dubious ACK received\n%!"; - ack_remaining - | Some s -> - let seg_len = (Int32.of_int (len s)) in - match ack_remaining < seg_len with - | true -> - printf "TCP: Partial ACK received\n%!"; - (* return uncleared segment to the sequence *) - let _ = Lwt_sequence.add_l s segs in - ack_remaining - | false -> - ack_segment q s; - clearsegs (Int32.sub ack_remaining seg_len) segs - in - let partleft = clearsegs (Sequence.to_int32 ack_len) q.segs in + let partleft = clearsegs q (Sequence.to_int32 ack_len) q.segs in TX.tx_ack q.wnd (Sequence.sub seq (Sequence.of_int32 partleft)) win; - match (dupack || (Window.fast_rec q.wnd)) with + match dupack || Window.fast_rec q.wnd with | true -> q.dup_acks <- q.dup_acks + 1; - if (q.dup_acks = 3) || - ((q.dup_acks > 3) && ((Sequence.to_int32 ack_len) > 0l)) then begin + if q.dup_acks = 3 || + (q.dup_acks > 3 && Sequence.to_int32 ack_len > 0l) then begin (* alert window module to fall into fast recovery *) Window.alert_fast_rexmit q.wnd seq; (* retransmit the bottom of the unacked list of packets *) let rexmit_seg = peek_l q.segs in - (* printf "TCP fast retransmission seq = %d, dupack = %d\n%!" - (Sequence.to_int rexmit_seg.seq) (Sequence.to_int seq); *) + Log.f debug (fun fmt -> + Log.pf fmt "TCP fast retransmission seq=%a, dupack=%a" + Sequence.pp rexmit_seg.seq Sequence.pp seq); let { wnd; _ } = q in let flags=rexmit_seg.flags in let options=[] in (* TODO: put the right options *) - (* XXX: suspicisous ignore *) - let _ = q.xmit ~flags ~wnd ~options ~seq rexmit_seg.data in - () - end + q.xmit ~flags ~wnd ~options ~seq rexmit_seg.data + end else + Lwt.return_unit | false -> - q.dup_acks <- 0 + q.dup_acks <- 0; + Lwt.return_unit in Lwt_mvar.take tx_ack >>= fun _ -> Window.set_ack_serviced q.wnd true; @@ -350,7 +370,8 @@ module Tx (Time:V1_LWT.TIME) (Clock:V1.CLOCK) = struct (* Note: This is not stricly necessary, as the PCB will be GCed later on. However, it helps removing pressure on the GC. *) - reset_seq q.segs + reset_seq q.segs; + Lwt.return_unit | _ -> let ack_len = Sequence.sub seq (Window.tx_una q.wnd) in let dupacktest () = @@ -359,7 +380,7 @@ module Tx (Time:V1_LWT.TIME) (Clock:V1.CLOCK) = struct not (Lwt_sequence.is_empty q.segs) in serviceack (dupacktest ()) ack_len seq win - end; + end >>= fun () -> (* Inform the window thread of updates to the transmit window *) Lwt_mvar.put q.tx_wnd_update win >>= fun () -> tx_ack_t () @@ -396,9 +417,9 @@ module Tx (Time:V1_LWT.TIME) (Clock:V1.CLOCK) = struct (* Queue up segment just sent for retransmission if needed *) let q_rexmit () = match seq_len > 0 with - | false -> return_unit + | false -> Lwt.return_unit | true -> - let _ = Lwt_sequence.add_r seg q.segs in + lwt_sequence_add_r seg q.segs; let p = Window.rto q.wnd in TT.start q.rexmit_timer ~p seg.seq in diff --git a/tcp/segment.mli b/tcp/segment.mli index c39841720..1475f6715 100644 --- a/tcp/segment.mli +++ b/tcp/segment.mli @@ -16,6 +16,9 @@ (** TCP segments *) +val info : Log.t +val debug : Log.t + (** The receive queue stores out-of-order segments, and can coalesece them on input and pass on an ordered list up the stack to the application. @@ -28,7 +31,7 @@ module Rx (T:V1_LWT.TIME) : sig type segment (** Individual received TCP segment *) - val string_of_segment: segment -> string + val pp_segment: Format.formatter -> segment -> unit val segment: sequence:Sequence.t -> fin:bool -> syn:bool -> rst:bool -> ack:bool -> @@ -38,7 +41,7 @@ module Rx (T:V1_LWT.TIME) : sig type t (** Queue of receive segments *) - val to_string: t -> string + val pp: Format.formatter -> t -> unit val create: rx_data:(Cstruct.t list option * int option) Lwt_mvar.t -> diff --git a/tcp/sequence.ml b/tcp/sequence.ml index c1dffea56..f99a44d6a 100644 --- a/tcp/sequence.ml +++ b/tcp/sequence.ml @@ -43,9 +43,10 @@ let sub a b = Int32.sub a b (* a++ *) let incr a = Int32.add a 1l -let compare a b = Int32.compare a b +let compare a b = Int32.compare a b let of_int32 t = t let of_int t = Int32.of_int t let to_int32 t = t let to_int t = Int32.to_int t -let to_string t = Printf.sprintf "%lu" t + +let pp fmt t = Format.fprintf fmt "%lu" t diff --git a/tcp/sequence.mli b/tcp/sequence.mli index 451dedcb5..0d6ea54aa 100644 --- a/tcp/sequence.mli +++ b/tcp/sequence.mli @@ -45,4 +45,5 @@ val of_int32: int32 -> t val of_int: int -> t val to_int32: t -> int32 val to_int: t -> int -val to_string: t -> string + +val pp: Format.formatter -> t -> unit diff --git a/tcp/state.ml b/tcp/state.ml index 629ec9dd5..a78613376 100644 --- a/tcp/state.ml +++ b/tcp/state.ml @@ -14,9 +14,11 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Lwt +open Lwt.Infix open Printf +let debug = Log.create "State" + type action = | Passive_open | Recv_rst @@ -56,34 +58,34 @@ let t ~on_close = let state t = t.state -let string_of_action = function - | Passive_open -> "Passive_open" - | Recv_rst -> "Recv_rst" - | Recv_synack x -> "Recv_synack " ^ (Sequence.to_string x) - | Recv_ack x -> "Recv_ack " ^ (Sequence.to_string x) - | Recv_fin -> "Recv_fin" - (* | Recv_finack x -> "Recv_finack " ^ (Sequence.to_string x) *) - | Send_syn x -> "Send_syn " ^ (Sequence.to_string x) - | Send_synack x -> "Send_synack " ^ (Sequence.to_string x) - | Send_rst -> "Send_rst" - | Send_fin x -> "Send_fin " ^ (Sequence.to_string x) - | Timeout -> "Timeout" - -let string_of_tcpstate = function - | Closed -> "Closed" - | Listen -> "Listen" - | Syn_rcvd x -> "Syn_rcvd " ^ (Sequence.to_string x) - | Syn_sent x -> "Syn_sent " ^ (Sequence.to_string x) - | Established -> "Established" - | Close_wait -> "Close_wait" - | Last_ack x -> "Last_ack " ^ (Sequence.to_string x) - | Fin_wait_1 x -> "Fin_wait_1 " ^ (Sequence.to_string x) - | Fin_wait_2 i -> "Fin_wait_2 " ^ (string_of_int i) - | Closing x -> "Closing " ^ (Sequence.to_string x) - | Time_wait -> "Time_wait" - -let to_string t = - sprintf "{ %s }" (string_of_tcpstate t.state) +let pp_action fmt = function + | Passive_open -> Log.ps fmt "Passive_open" + | Recv_rst -> Log.ps fmt "Recv_rst" + | Recv_synack x -> Log.pf fmt "Recv_synack(%a)" Sequence.pp x + | Recv_ack x -> Log.pf fmt "Recv_ack(%a)" Sequence.pp x + | Recv_fin -> Log.ps fmt "Recv_fin" + (* | Recv_finack x -> pf fmt "Recv_finack(%a)" Sequence.pp x *) + | Send_syn x -> Log.pf fmt "Send_syn(%a)" Sequence.pp x + | Send_synack x -> Log.pf fmt "Send_synack(%a)" Sequence.pp x + | Send_rst -> Log.ps fmt "Send_rst" + | Send_fin x -> Log.pf fmt "Send_fin(%a)" Sequence.pp x + | Timeout -> Log.ps fmt "Timeout" + +let pp_tcpstate fmt = function + | Closed -> Log.ps fmt "Closed" + | Listen -> Log.ps fmt "Listen" + | Syn_rcvd x -> Log.pf fmt "Syn_rcvd(%a)" Sequence.pp x + | Syn_sent x -> Log.pf fmt "Syn_sent(%a)" Sequence.pp x + | Established -> Log.ps fmt "Established" + | Close_wait -> Log.ps fmt "Close_wait" + | Last_ack x -> Log.pf fmt "Last_ack(%a)" Sequence.pp x + | Fin_wait_1 x -> Log.pf fmt "Fin_wait_1(%a)" Sequence.pp x + | Fin_wait_2 i -> Log.pf fmt "Fin_wait_2(%d)" i + | Closing x -> Log.pf fmt "Closing(%a)" Sequence.pp x + | Time_wait -> Log.ps fmt "Time_wait" + | Reset -> Log.ps fmt "Reset" + +let pp fmt t = Log.pf fmt "{ %a }" pp_tcpstate t.state module Make(Time:V1_LWT.TIME) = struct @@ -91,29 +93,31 @@ module Make(Time:V1_LWT.TIME) = struct let time_wait_time = (* 30. *) 2. let rec finwait2timer t count timeout = - Time.sleep timeout - >>= fun () -> + Log.f debug (fun fmt -> Log.pf fmt "finwait2timer %.02f" timeout); + Time.sleep timeout >>= fun () -> match t.state with | Fin_wait_2 i -> + Log.s debug "finwait2timer: Fin_wait_2"; if i = count then begin t.state <- Closed; t.on_close (); - return_unit + Lwt.return_unit end else begin finwait2timer t i timeout end - | _ -> - return_unit + | s -> + Log.f debug (fun fmt -> Log.pf fmt "finwait2timer: %a" pp_tcpstate s); + Lwt.return_unit let timewait t twomsl = - Time.sleep twomsl - >>= fun () -> + Log.f debug (fun fmt -> Log.pf fmt "timewait %.02f" twomsl); + Time.sleep twomsl >>= fun () -> t.state <- Closed; + Log.s debug "timewait on_close"; t.on_close (); - return_unit + Lwt.return_unit let tick t (i:action) = - (* printf "%s - %s -> " (to_string t) (action_to_string i); *) let diffone x y = Sequence.incr y = x in let tstr s (i:action) = match s, i with @@ -133,7 +137,7 @@ module Make(Time:V1_LWT.TIME) = struct | Fin_wait_1 a, Recv_ack b -> if diffone b a then let count = 0 in - let _ = finwait2timer t count fin_wait_2_time in + Lwt.async (fun () -> finwait2timer t count fin_wait_2_time); Fin_wait_2 count else Fin_wait_1 a @@ -141,8 +145,10 @@ module Make(Time:V1_LWT.TIME) = struct | Fin_wait_1 _, Timeout -> t.on_close (); Closed | Fin_wait_1 _, Recv_rst -> t.on_close (); Reset | Fin_wait_2 i, Recv_ack _ -> Fin_wait_2 (i + 1) - | Fin_wait_2 _, Recv_fin -> let _ = timewait t time_wait_time in Time_wait | Fin_wait_2 _, Recv_rst -> t.on_close (); Reset + | Fin_wait_2 _, Recv_fin -> + Lwt.async (fun () -> timewait t time_wait_time); + Time_wait | Closing a, Recv_ack b -> if diffone b a then Time_wait else Closing a | Closing _, Timeout -> t.on_close (); Closed | Closing _, Recv_rst -> t.on_close (); Reset @@ -155,6 +161,11 @@ module Make(Time:V1_LWT.TIME) = struct | Last_ack _, Recv_rst -> t.on_close (); Reset | x, _ -> x in - t.state <- tstr t.state i - (* ; printf "%s\n%!" (to_string t) *) + let old_state = t.state in + let new_state = tstr t.state i in + Log.f debug (fun fmt -> + Log.pf fmt "%a - %a -> %a" + pp_tcpstate old_state pp_action i pp_tcpstate new_state); + t.state <- new_state; + end diff --git a/tcp/state.mli b/tcp/state.mli index d914b69f2..fb5842f32 100644 --- a/tcp/state.mli +++ b/tcp/state.mli @@ -14,6 +14,8 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) +val debug: Log.t + type action = | Passive_open | Recv_rst @@ -27,7 +29,7 @@ type action = | Send_fin of Sequence.t | Timeout -val string_of_action: action -> string +val pp_action: Format.formatter -> action -> unit type tcpstate = | Closed @@ -43,7 +45,7 @@ type tcpstate = | Time_wait | Reset -val string_of_tcpstate : tcpstate -> string +val pp_tcpstate : Format.formatter -> tcpstate -> unit type close_cb = unit -> unit @@ -56,7 +58,7 @@ type t = { val state : t -> tcpstate val t : on_close:close_cb -> t -val to_string: t -> string +val pp: Format.formatter -> t -> unit module Make(Time : V1_LWT.TIME) : sig val fin_wait_2_time : float diff --git a/tcp/stats.ml b/tcp/stats.ml new file mode 100644 index 000000000..60ffbf173 --- /dev/null +++ b/tcp/stats.ml @@ -0,0 +1,98 @@ +(* + * Copyright (c) 2015 Thomas Gazagnaire + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +module Gc = struct + + let gc = ref false + let enable () = gc := true + let disable () = gc := false + + let full = ref false + let full_major b = full := b + + let words () = + let t = Gc.stat () in + t.Gc.live_words / 1_000 + + let run_full_major () = if !full then Gc.full_major () + + let pp fmt () = + match !gc with + | false -> () + | true -> + run_full_major (); + Format.fprintf fmt "|%dk" (words ()) + +end + +type counter = { + incrs: int; + decrs: int; +} + +let zero = { incrs = 0; decrs = 0 } +let value c = c.incrs - c.decrs +let incrs c = c.incrs +let decrs c = c.decrs + +let pp_counter fmt t = Format.fprintf fmt "%d" (value t) + +type t = { + tcp_flows : counter; + tcp_listens : counter; + tcp_channels: counter; + tcp_connects: counter; + tcp_timers : counter; +} + +let pp fmt t = Format.fprintf fmt "[%a|%a|%a|%a%a]" + pp_counter t.tcp_timers + pp_counter t.tcp_listens + pp_counter t.tcp_channels + pp_counter t.tcp_connects + Gc.pp () + +let tcp_flows = ref zero +let tcp_listens = ref zero +let tcp_channels = ref zero +let tcp_connects = ref zero +let tcp_timers = ref zero + +let incr r = let c = !r in r := { c with incrs = c.incrs + 1 } +let decr r = let c = !r in r := { c with decrs = c.decrs + 1 } + +let incr_flow t = incr tcp_flows +let decr_flow t = decr tcp_flows + +let incr_listen t = incr tcp_listens +let decr_listen t = decr tcp_listens + +let incr_channel t = incr tcp_channels +let decr_channel t = decr tcp_channels + +let incr_connect t = incr tcp_connects +let decr_connect t = decr tcp_connects + +let incr_timer t = incr tcp_timers +let decr_timer t = decr tcp_timers + +let create () = { + tcp_flows = !tcp_flows; + tcp_listens = !tcp_listens; + tcp_channels = !tcp_channels; + tcp_connects = !tcp_connects; + tcp_timers = !tcp_timers; +} diff --git a/tcp/stats.mli b/tcp/stats.mli new file mode 100644 index 000000000..b3e6564b6 --- /dev/null +++ b/tcp/stats.mli @@ -0,0 +1,75 @@ +(* + * Copyright (c) 2015 Thomas Gazagnaire + * + * Permission to use, copy, modify, and distribute this software for any + * purpose with or without fee is hereby granted, provided that the above + * copyright notice and this permission notice appear in all copies. + * + * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES + * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF + * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR + * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES + * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN + * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF + * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. + *) + +(** TCP Statistics *) + +type counter +(** The type for counters. *) + +val value: counter -> int +(** The counter value. [value t] is [{!incr} t] - [{!decrs} t].*) + +val incrs: counter -> int +(** How many time the counter has been increased. *) + +val decrs: counter -> int +(** How many time the counter has been decreased. *) + +type t = { + tcp_flows : counter; + tcp_listens : counter; + tcp_channels: counter; + tcp_connects: counter; + tcp_timers : counter; +} + +val pp: Format.formatter -> t -> unit + +val incr_flow: unit -> unit +val decr_flow: unit -> unit + +val incr_listen: unit -> unit +val decr_listen: unit -> unit + +val incr_channel: unit -> unit +val decr_channel: unit -> unit + +val incr_connect: unit -> unit +val decr_connect: unit -> unit + +val incr_timer: unit -> unit +val decr_timer: unit -> unit + +val create: unit -> t + +module Gc: sig + (** Show GC stats *) + + val enable: unit -> unit + (** Show live works (in k) on every debug line. *) + + val disable: unit -> unit + + val full_major: bool -> unit + (** [full_major true] runs a [Gc.full_major] before printing any + debug statement. Quite expensive but can sometimes be useful. By + default, it is set to [false]. + + {b Note:} This is very slow, use it if you really need it! + + *) + +end diff --git a/tcp/tcp.mlpack b/tcp/tcp.mlpack index 7a605cd00..7d25cd8f2 100644 --- a/tcp/tcp.mlpack +++ b/tcp/tcp.mlpack @@ -1,5 +1,5 @@ # OASIS_START -# DO NOT EDIT (digest: a1d3c8591e91c674b25051803a310a2d) +# DO NOT EDIT (digest: 195c2067e8367d1d5113b00734c29168) Options Wire State @@ -11,4 +11,6 @@ Segment User_buffer Pcb Flow +Stats +Log # OASIS_STOP diff --git a/tcp/tcptimer.ml b/tcp/tcptimer.ml index dcdbffe3c..aea1eca0d 100644 --- a/tcp/tcptimer.ml +++ b/tcp/tcptimer.ml @@ -14,7 +14,9 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Lwt +open Lwt.Infix + +let debug = Log.create "TCP.Tcptimer" type tr = | Stoptimer @@ -22,7 +24,7 @@ type tr = | ContinueSetPeriod of (float * Sequence.t) type t = { - expire: (Sequence.t -> tr); + expire: (Sequence.t -> tr Lwt.t); mutable period: float; mutable running: bool; } @@ -32,17 +34,26 @@ module Make(Time:V1_LWT.TIME) = struct let running = false in {period; expire; running} - let rec timerloop t s = - Time.sleep t.period >>= fun () -> - match t.expire s with - | Stoptimer -> - t.running <- false; - return_unit - | Continue d -> - timerloop t d - | ContinueSetPeriod (p, d) -> - t.period <- p; - timerloop t d + let timerloop t s = + Log.s debug "timerloop"; + Stats.incr_timer (); + let rec aux t s = + Time.sleep t.period >>= fun () -> + t.expire s >>= function + | Stoptimer -> + Stats.decr_timer (); + t.running <- false; + Log.s debug "timerloop: stoptimer"; + Lwt.return_unit + | Continue d -> + Log.s debug "timerloop: continuer"; + aux t d + | ContinueSetPeriod (p, d) -> + Log.s debug "timerloop: coontinuesetperiod"; + t.period <- p; + aux t d + in + aux t s let period t = t.period @@ -50,8 +61,8 @@ module Make(Time:V1_LWT.TIME) = struct if not t.running then begin t.period <- p; t.running <- true; - let _ = timerloop t s in - return_unit + Lwt.async (fun () -> timerloop t s); + Lwt.return_unit end else - return_unit + Lwt.return_unit end diff --git a/tcp/tcptimer.mli b/tcp/tcptimer.mli index 36fc9e4f2..0bdb4e2f5 100644 --- a/tcp/tcptimer.mli +++ b/tcp/tcptimer.mli @@ -14,7 +14,7 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -type t +type t type tr = | Stoptimer @@ -22,7 +22,9 @@ type tr = | ContinueSetPeriod of (float * Sequence.t) module Make(T:V1_LWT.TIME) : sig - val t : period: float -> expire: (Sequence.t -> tr) -> t + val t : period: float -> expire: (Sequence.t -> tr Lwt.t) -> t val start : t -> ?p:float -> Sequence.t -> unit Lwt.t end + +val debug: Log.t diff --git a/tcp/user_buffer.ml b/tcp/user_buffer.ml index 9ebad2872..5824d8d5e 100644 --- a/tcp/user_buffer.ml +++ b/tcp/user_buffer.ml @@ -15,7 +15,11 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Lwt +open Lwt.Infix + +let lwt_sequence_add_l s seq = + let (_:'a Lwt_sequence.node) = Lwt_sequence.add_l s seq in + () (* A bounded queue to receive data segments and let readers block on receiving them. Also supports a monitor that is informed when the @@ -47,7 +51,7 @@ module Rx = struct let rx_wnd = max 0l (Int32.sub t.max_size t.cur_size) in Window.set_rx_wnd t.wnd rx_wnd; match t.watcher with - |None -> return_unit + |None -> Lwt.return_unit |Some w -> Lwt_mvar.put w t.cur_size let seglen s = @@ -65,14 +69,14 @@ module Rx = struct notify_size_watcher t >>= fun () -> th >>= fun () -> ignore(Lwt_sequence.add_r s t.q); - return_unit + Lwt.return_unit else match Lwt_sequence.take_opt_l t.readers with | None -> t.cur_size <- Int32.(add t.cur_size (of_int (seglen s))); ignore(Lwt_sequence.add_r s t.q); notify_size_watcher t | Some u -> - return (Lwt.wakeup u s) + Lwt.return (Lwt.wakeup u s) let take_l t = if Lwt_sequence.is_empty t.q then begin @@ -89,7 +93,7 @@ module Rx = struct |None -> () |Some w -> Lwt.wakeup w () end; - return s + Lwt.return s end let cur_size t = t.cur_size @@ -147,7 +151,7 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct (* Wait until at least sz bytes are available in the window *) let rec wait_for t sz = if (available t) >= sz then begin - return_unit + Lwt.return_unit end else begin let th,u = MProf.Trace.named_task "User_buffer.wait_for" in @@ -175,7 +179,7 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct (* Wait until the user buffer is flushed *) let rec wait_for_flushed t = if Lwt_sequence.is_empty t.buffer then begin - return_unit + Lwt.return_unit end else begin let th,u = MProf.Trace.named_task "User_buffer.wait_for_flushed" in @@ -188,15 +192,12 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct let rec clear_buffer t = let rec addon_more curr_data l = match Lwt_sequence.take_opt_l t.buffer with - | None -> - (* printf "out at 1\n%!";*) - List.rev curr_data + | None -> List.rev curr_data | Some s -> let s_len = len s in match s_len > l with | true -> - (*printf "out at 2 %lu %lu\n%!" s_len l;*) - let _ = Lwt_sequence.add_l s t.buffer in + lwt_sequence_add_l s t.buffer; List.rev curr_data | false -> t.bufbytes <- Int32.sub t.bufbytes s_len; @@ -210,12 +211,12 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct | true -> begin match avail_len with |0l -> (* return pkt to buffer *) - let _ = Lwt_sequence.add_l s t.buffer in + lwt_sequence_add_l s t.buffer; None |_ -> (* split buffer into a partial write *) let to_send,remaining = Cstruct.split s (Int32.to_int avail_len) in (* queue remaining view *) - let _ = Lwt_sequence.add_l remaining t.buffer in + lwt_sequence_add_l remaining t.buffer; t.bufbytes <- Int32.sub t.bufbytes avail_len; Some [to_send] end @@ -229,10 +230,10 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct Some [s] in match Lwt_sequence.is_empty t.buffer with - | true -> return_unit + | true -> Lwt.return_unit | false -> match get_pkt_to_send () with - | None -> return_unit + | None -> Lwt.return_unit | Some pkt -> let b = compactbufs pkt in TXS.output ~flags:Segment.Psh t.txq b >>= fun () -> @@ -248,7 +249,7 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct match datav with |[] -> begin match acc with - |[] -> return_unit + |[] -> Lwt.return_unit |_ -> transmit acc end |hd::tl -> @@ -272,7 +273,7 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct t.bufbytes <- Int32.add t.bufbytes l; List.iter (fun data -> ignore(Lwt_sequence.add_r data t.buffer)) datav; if t.bufbytes < mss then - return_unit + Lwt.return_unit else clear_buffer t | true -> @@ -281,7 +282,7 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct | true -> t.bufbytes <- Int32.add t.bufbytes l; List.iter (fun data -> ignore(Lwt_sequence.add_r data t.buffer)) datav; - return_unit + Lwt.return_unit | false -> let max_size = Window.tx_mss t.wnd in transmit_segments ~mss:max_size ~txq:t.txq datav @@ -292,14 +293,14 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct | false -> t.bufbytes <- Int32.add t.bufbytes l; List.iter (fun data -> ignore(Lwt_sequence.add_r data t.buffer)) datav; - return_unit + Lwt.return_unit | true -> let avail_len = available_cwnd t in match avail_len < l with | true -> t.bufbytes <- Int32.add t.bufbytes l; List.iter (fun data -> ignore(Lwt_sequence.add_r data t.buffer)) datav; - return_unit + Lwt.return_unit | false -> let max_size = Window.tx_mss t.wnd in transmit_segments ~mss:max_size ~txq:t.txq datav @@ -307,11 +308,11 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct let inform_app t = match Lwt_sequence.take_opt_l t.writers with - | None -> return_unit + | None -> Lwt.return_unit | Some w -> Lwt.wakeup w (); (* TODO: check if this should wake all writers not just one *) - return_unit + Lwt.return_unit (* Indicate that more bytes are available for waiting writers. Note that sz does not take window scaling into account, and so diff --git a/tcp/window.ml b/tcp/window.ml index a55e69fcb..652f5ed3c 100644 --- a/tcp/window.ml +++ b/tcp/window.ml @@ -14,7 +14,7 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Printf +let debug = Log.create "Window" type t = { tx_mss: int; @@ -61,12 +61,14 @@ let alpha = 0.125 (* see RFC 2988 *) let beta = 0.25 (* see RFC 2988 *) (* To string for debugging *) -let to_string t = - sprintf "rx_nxt=%s rx_nxt_inseq=%s tx_nxt=%s rx_wnd=%lu tx_wnd=%lu snd_una=%s" - (Sequence.to_string t.rx_nxt) - (Sequence.to_string t.rx_nxt_inseq) - (Sequence.to_string t.tx_nxt) - t.rx_wnd t.tx_wnd (Sequence.to_string t.snd_una) +let pp fmt t = + Log.pf fmt + "rx_nxt=%a rx_nxt_inseq=%a tx_nxt=%a rx_wnd=%lu tx_wnd=%lu snd_una=%a" + Sequence.pp t.rx_nxt + Sequence.pp t.rx_nxt_inseq + Sequence.pp t.tx_nxt + t.rx_wnd t.tx_wnd + Sequence.pp t.snd_una (* Initialise the sequence space *) let t ~rx_wnd_scale ~tx_wnd_scale ~rx_wnd ~tx_wnd ~rx_isn ~tx_mss ~tx_isn = @@ -111,8 +113,9 @@ let valid t seq = let redge = Sequence.(add t.rx_nxt (of_int32 t.rx_wnd)) in let ledge = Sequence.(sub t.rx_nxt (of_int32 t.max_rx_wnd)) in let r = Sequence.between seq ledge redge in - (* printf "TCP_window: valid check for seq=%s for range %s[%lu] res=%b\n%!" - (Sequence.to_string seq) (Sequence.to_string t.rx_nxt) t.rx_wnd r; *) + (* PERF: ~5% perf degradation if commenting out that line + Log.f debug "valid: seq=%a range=%a[%lu] res=%b" + Sequence.pp seq Sequence.pp t.rx_nxt t.rx_wnd r; *) r (* Advance received packet sequence number *) @@ -170,7 +173,7 @@ module Make(Clock:V1.CLOCK) = struct if Sequence.gt r t.snd_una then t.snd_una <- r; if Sequence.geq r t.fast_rec_th then begin - (* printf "EXITING fast recovery\n%!"; *) + Log.s debug "EXITING fast recovery"; t.cwnd <- t.ssthresh; t.fast_recovery <- false; end else begin @@ -224,14 +227,10 @@ let alert_fast_rexmit t _ = let inflight = Sequence.to_int32 (Sequence.sub t.tx_nxt t.snd_una) in let newssthresh = max (Int32.div inflight 2l) (Int32.of_int (t.tx_mss * 2)) in let newcwnd = Int32.add newssthresh (Int32.of_int (t.tx_mss * 2)) in - (* - printf "ENTERING fast recovery inflight=%d, ssthresh=%d -> %d, cwnd=%d -> %d\n%!" - (Int32.to_int inflight) - (Int32.to_int t.ssthresh) - (Int32.to_int newssthresh) - (Int32.to_int t.cwnd) - (Int32.to_int newcwnd); - *) + Log.f debug (fun fmt -> + Log.pf fmt "ENTERING fast recovery inflight=%ld, ssthresh=%ld -> %ld, \ + cwnd=%ld -> %ld" + inflight t.ssthresh newssthresh t.cwnd newcwnd); t.fast_recovery <- true; t.fast_rec_th <- t.tx_nxt; t.ssthresh <- newssthresh; diff --git a/tcp/window.mli b/tcp/window.mli index 8eed32430..2edff9990 100644 --- a/tcp/window.mli +++ b/tcp/window.mli @@ -14,10 +14,11 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) +val debug: Log.t type t -val to_string: t -> string +val pp: Format.formatter -> t -> unit val t : rx_wnd_scale:int -> tx_wnd_scale:int -> rx_wnd:int -> tx_wnd:int -> rx_isn:Sequence.t -> tx_mss:int option -> tx_isn:Sequence.t -> t diff --git a/tcp/wire.ml b/tcp/wire.ml index 3844607af..1feaed3a1 100644 --- a/tcp/wire.ml +++ b/tcp/wire.ml @@ -14,7 +14,9 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) -open Lwt +open Lwt.Infix + +let debug = Log.create "Wire" module Tcp_wire = Wire_structs.Tcp_wire @@ -75,13 +77,17 @@ module Make (Ip:V1_LWT.IP) = struct Tcp_wire.set_tcp_urg_ptr tcp_frame 0; let checksum = Ip.checksum frame (tcp_frame :: datav) in Tcp_wire.set_tcp_checksum tcp_frame checksum; - (* printf "TCP.xmit checksum %04x %s.%d->%s.%d rst %b syn %b fin %b psh %b seq - %lu ack %lu %s datalen %d datafrag %d dataoff %d olen %d\n%!" checksum - (ipv4_addr_to_string id.local_ip) id.local_port - (ipv4_addr_to_string id.dest_ip) id.dest_port - rst syn fin psh sequence ack_number (Options.prettyprint options) - (Cstruct.lenv datav) (List.length datav) data_off options_len; - *) + (* PERF: uncommenting the next expression results in ~10% perf degradation + Log.f debug (fun fmt -> + Log.pf fmt + "xmit checksum=%04x %a.%d->%a.%d rst=%b syn=%b fin=%b psh=%b \ + seq=%lu ack=%lu options=%a datalen=%d datafrag=%d dataoff=%d olen=%d" + checksum + Ipaddr.pp_hum (Ip.to_uipaddr id.local_ip) id.local_port + Ipaddr.pp_hum (Ip.to_uipaddr id.dest_ip) id.dest_port + rst syn fin psh sequence ack_number Options.pps options + (Cstruct.lenv datav) (List.length datav) data_off options_len); *) MProf.Counter.increase count_tcp_to_ip (Cstruct.lenv datav); Ip.writev ip frame datav + end diff --git a/tcp/wire.mli b/tcp/wire.mli index ef9332e2c..9eea7693f 100644 --- a/tcp/wire.mli +++ b/tcp/wire.mli @@ -14,6 +14,7 @@ * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. *) +val debug: Log.t val get_options : Cstruct.t -> Options.t list val set_options : Cstruct.t -> Options.t list -> int val get_payload : Cstruct.t -> Cstruct.t