Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

first cut at full RST processing for established connections #148

Merged
merged 2 commits into from
Jun 9, 2015
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 13 additions & 9 deletions tcp/pcb.ml
Original file line number Diff line number Diff line change
Expand Up @@ -137,18 +137,18 @@ struct

(* Process an incoming TCP packet that has an active PCB *)
let input _t pkt (pcb,_) =
(* URG_TODO: Deal correctly with incomming RST segment *)
let sequence = Sequence.of_int32 (Tcp_wire.get_tcp_sequence pkt) in
let ack_number =
Sequence.of_int32 (Tcp_wire.get_tcp_ack_number pkt)
in
let fin = Tcp_wire.get_fin pkt in
let syn = Tcp_wire.get_syn pkt in
let ack = Tcp_wire.get_ack pkt in
let rst = Tcp_wire.get_rst pkt in
let window = Tcp_wire.get_tcp_window pkt in
let data = Wire.get_payload pkt in
let seg =
RXS.segment ~sequence ~fin ~syn ~ack ~ack_number ~window ~data
RXS.segment ~sequence ~fin ~syn ~rst ~ack ~ack_number ~window ~data
in
let { rxq; _ } = pcb in
(* Coalesce any outstanding segments and retrieve ready segments *)
Expand Down Expand Up @@ -193,13 +193,16 @@ struct

module Wnd = struct

let thread ~urx:_ ~utx ~wnd:_ ~tx_wnd_update =
let thread ~urx:_ ~utx ~wnd:_ ~state ~tx_wnd_update =
(* Monitor our transmit window when updates are received
remotely, and tell the application that new space is
available when it is blocked *)
let rec tx_window_t () =
Lwt_mvar.take tx_wnd_update >>= fun tx_wnd ->
UTX.free utx tx_wnd >>= fun () ->
begin match State.state state with
| State.Reset -> UTX.reset utx
| _ -> UTX.free utx tx_wnd
end >>= fun () ->
tx_window_t ()
in
tx_window_t ()
Expand Down Expand Up @@ -303,7 +306,7 @@ struct
let th =
(Tx.thread t pcb ~send_ack ~rx_ack) <?>
(Rx.thread pcb ~rx_data) <?>
(Wnd.thread ~utx ~urx ~wnd ~tx_wnd_update)
(Wnd.thread ~utx ~urx ~wnd ~state ~tx_wnd_update)
in
pcb_allocs := !pcb_allocs + 1;
th_allocs := !th_allocs + 1;
Expand Down Expand Up @@ -470,7 +473,6 @@ struct
min 4000 (min (Window.tx_mss pcb.wnd)
(Int32.to_int (UTX.available pcb.utx)))

(* URG_TODO: raise exception if not in Established or Close_wait state *)
(* Wait for more write space *)
let write_wait_for pcb sz =
UTX.wait_for pcb.utx (Int32.of_int sz)
Expand All @@ -486,10 +488,12 @@ struct
let remaing_bit = Cstruct.sub data av_len (len - av_len) in
writefn pcb wfn first_bit >>= fun () ->
writefn pcb wfn remaing_bit
| _ -> wfn [data]
| _ ->
match State.state pcb.state with
| State.Established | State.Close_wait -> wfn [data]
(* URG_TODO: return error instead of dropping silently *)
| _ -> return_unit

(* URG_TODO: raise exception when trying to write to closed connection
instead of quietly returning *)
(* Blocking write on a PCB *)
let write pcb data = writefn pcb (UTX.write pcb.utx) data
let writev pcb data = Lwt_list.iter_s (fun d -> write pcb d) data
Expand Down
43 changes: 33 additions & 10 deletions tcp/segment.ml
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ module Rx(Time:V1_LWT.TIME) = struct
fin: bool;
syn: bool;
ack: bool;
rst: bool;
ack_number: Sequence.t;
window: int;
}
Expand All @@ -52,8 +53,8 @@ module Rx(Time:V1_LWT.TIME) = struct
(Sequence.to_string seg.sequence) seg.fin seg.syn seg.ack
(Sequence.to_string seg.ack_number) seg.window

let segment ~sequence ~fin ~syn ~ack ~ack_number ~window ~data =
{ sequence; fin; syn; ack; ack_number; window; data }
let segment ~sequence ~fin ~syn ~rst ~ack ~ack_number ~window ~data =
{ sequence; fin; syn; ack; rst; ack_number; window; data }

let len seg =
(Cstruct.len seg.data) +
Expand Down Expand Up @@ -104,7 +105,20 @@ module Rx(Time:V1_LWT.TIME) = struct
let input (q:t) seg =
(* Check that the segment fits into the valid receive window *)
let force_ack = ref false in
if not (Window.valid q.wnd seg.sequence) then return_unit
(* URG_TODO also check that this is a valid RST i.e. the seq/ack numbers are in the window *)
if (seg.rst) then begin
StateTick.tick q.state State.Recv_rst;
(* Dump all the received but out of order frames *)
q.segs <- S.empty;
(* Signal TX side *)
let txalert ack_svcd = match ack_svcd with
| true -> Lwt_mvar.put q.tx_ack ((Window.ack_seq q.wnd), (Window.ack_win q.wnd))
| false -> return_unit
in
txalert (Window.ack_serviced q.wnd) >>= fun () ->
(* Use the fin path to inform the application of end of stream *)
Lwt_mvar.put q.rx_data (None, Some 0)
end else if not (Window.valid q.wnd seg.sequence) then return_unit
else
(* Insert the latest segment *)
let segs = S.add seg q.segs in
Expand Down Expand Up @@ -327,13 +341,22 @@ module Tx (Time:V1_LWT.TIME) (Clock:V1.CLOCK) = struct
Window.set_ack_serviced q.wnd true;
let seq = Window.ack_seq q.wnd in
let win = Window.ack_win q.wnd in
let ack_len = Sequence.sub seq (Window.tx_una q.wnd) in
let dupacktest () =
0l = Sequence.to_int32 ack_len &&
Window.tx_wnd_unscaled q.wnd = Int32.of_int win &&
not (Lwt_sequence.is_empty q.segs)
in
serviceack (dupacktest ()) ack_len seq win;
begin match State.state q.state with
| State.Reset -> let rec empty_segs segs =
match Lwt_sequence.take_opt_l segs with
| None -> ()
| Some s -> empty_segs segs
in
empty_segs q.segs
| _ ->
let ack_len = Sequence.sub seq (Window.tx_una q.wnd) in
let dupacktest () =
0l = Sequence.to_int32 ack_len &&
Window.tx_wnd_unscaled q.wnd = Int32.of_int win &&
not (Lwt_sequence.is_empty q.segs)
in
serviceack (dupacktest ()) ack_len seq win
end;
(* Inform the window thread of updates to the transmit window *)
Lwt_mvar.put q.tx_wnd_update win >>= fun () ->
tx_ack_t ()
Expand Down
2 changes: 1 addition & 1 deletion tcp/segment.mli
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module Rx (T:V1_LWT.TIME) : sig
val string_of_segment: segment -> string

val segment:
sequence:Sequence.t -> fin:bool -> syn:bool -> ack:bool ->
sequence:Sequence.t -> fin:bool -> syn:bool -> rst:bool -> ack:bool ->
ack_number:Sequence.t -> window:int -> data:Cstruct.t ->
segment

Expand Down
15 changes: 11 additions & 4 deletions tcp/state.ml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ type action =
| Recv_synack of Sequence.t
| Recv_ack of Sequence.t
| Recv_fin
| Recv_finack of Sequence.t
(* | Recv_finack of Sequence.t *)
| Send_syn of Sequence.t
| Send_synack of Sequence.t
| Send_rst
Expand All @@ -42,6 +42,7 @@ type tcpstate =
| Fin_wait_2 of int
| Closing of Sequence.t
| Time_wait
| Reset

type close_cb = unit -> unit

Expand All @@ -61,7 +62,7 @@ let string_of_action = function
| Recv_synack x -> "Recv_synack " ^ (Sequence.to_string x)
| Recv_ack x -> "Recv_ack " ^ (Sequence.to_string x)
| Recv_fin -> "Recv_fin"
| Recv_finack x -> "Recv_finack " ^ (Sequence.to_string x)
(* | Recv_finack x -> "Recv_finack " ^ (Sequence.to_string x) *)
| Send_syn x -> "Send_syn " ^ (Sequence.to_string x)
| Send_synack x -> "Send_synack " ^ (Sequence.to_string x)
| Send_rst -> "Send_rst"
Expand Down Expand Up @@ -127,7 +128,8 @@ module Make(Time:V1_LWT.TIME) = struct
| Established, Recv_ack _ -> Established
| Established, Send_fin a -> Fin_wait_1 a
| Established, Recv_fin -> Close_wait
| Established, Timeout -> t.on_close (); Closed
| Established, Timeout -> t.on_close (); Closed
| Established, Recv_rst -> t.on_close (); Reset
| Fin_wait_1 a, Recv_ack b ->
if diffone b a then
let count = 0 in
Expand All @@ -136,16 +138,21 @@ module Make(Time:V1_LWT.TIME) = struct
else
Fin_wait_1 a
| Fin_wait_1 a, Recv_fin -> Closing a
| Fin_wait_1 a, Recv_finack b -> if diffone b a then Time_wait else Fin_wait_1 a
| Fin_wait_1 _, Timeout -> t.on_close (); Closed
| Fin_wait_1 _, Recv_rst -> t.on_close (); Reset
| Fin_wait_2 i, Recv_ack _ -> Fin_wait_2 (i + 1)
| Fin_wait_2 _, Recv_fin -> let _ = timewait t time_wait_time in Time_wait
| Fin_wait_2 _, Recv_rst -> t.on_close (); Reset
| Closing a, Recv_ack b -> if diffone b a then Time_wait else Closing a
| Closing _, Timeout -> t.on_close (); Closed
| Closing _, Recv_rst -> t.on_close (); Reset
| Time_wait, Timeout -> t.on_close (); Closed
| Close_wait, Send_fin a -> Last_ack a
| Close_wait, Timeout -> t.on_close (); Closed
| Close_wait, Recv_rst -> t.on_close (); Reset
| Last_ack a, Recv_ack b -> if diffone b a then (t.on_close (); Closed) else Last_ack a
| Last_ack _, Timeout -> t.on_close (); Closed
| Last_ack _, Recv_rst -> t.on_close (); Reset
| x, _ -> x
in
t.state <- tstr t.state i
Expand Down
3 changes: 2 additions & 1 deletion tcp/state.mli
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ type action =
| Recv_synack of Sequence.t
| Recv_ack of Sequence.t
| Recv_fin
| Recv_finack of Sequence.t
(* | Recv_finack of Sequence.t *)
| Send_syn of Sequence.t
| Send_synack of Sequence.t
| Send_rst
Expand All @@ -41,6 +41,7 @@ type tcpstate =
| Fin_wait_2 of int
| Closing of Sequence.t
| Time_wait
| Reset

val string_of_tcpstate : tcpstate -> string

Expand Down
15 changes: 15 additions & 0 deletions tcp/user_buffer.ml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ open Lwt
queue size changes *)
module Rx = struct

(* TODO: check that flow control works on the rx side - ie if the application
stops taking data the window closes so the other side stops sending *)

type t = {
q: Cstruct.t option Lwt_sequence.t;
wnd: Window.t;
Expand Down Expand Up @@ -307,6 +310,7 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct
| None -> return_unit
| Some w ->
Lwt.wakeup w ();
(* TODO: check if this should wake all writers not just one *)
return_unit

(* Indicate that more bytes are available for waiting writers.
Expand All @@ -317,4 +321,15 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) = struct
clear_buffer t >>= fun () ->
inform_app t

let rec dump_buffer t =
match Lwt_sequence.is_empty t.buffer with
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this could be shortened to

let dump_buffer t =
  Lwt_sequence.iter_node_l Lwt_sequence.remove t

(untested though)

| true -> return_unit
| false ->
let _ = Lwt_sequence.take_l t.buffer in
dump_buffer t

let reset t =
dump_buffer t >>= fun () ->
inform_app t

end
1 change: 1 addition & 0 deletions tcp/user_buffer.mli
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ module Tx(Time:V1_LWT.TIME)(Clock:V1.CLOCK) : sig
val write: t -> Cstruct.t list -> unit Lwt.t
val write_nodelay: t -> Cstruct.t list -> unit Lwt.t
val free: t -> int -> unit Lwt.t
val reset: t -> unit Lwt.t
end