[h2] scheduler: fix stream starvation by anmonteiro · Pull Request #204 · anmonteiro/ocaml-h2 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[h2] scheduler: fix stream starvation #204

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Feb 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ Unreleased
([#201](https://github.com/anmonteiro/ocaml-h2/pull/201))
- h2: don't schedule streams as dependencies of others marked for removal
([#205](https://github.com/anmonteiro/ocaml-h2/pull/205))
- h2: revise scheduling algorithm to avoid starvation
([#199](https://github.com/anmonteiro/ocaml-h2/pull/199),
[#204](https://github.com/anmonteiro/ocaml-h2/pull/204), reported in
[#162](https://github.com/anmonteiro/ocaml-h2/issues/162), thanks
[@quernd](https://github.com/quernd))

0.9.0 2022-08-14
---------------
Expand Down
201 changes: 121 additions & 80 deletions lib/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -159,30 +159,38 @@ module Make (Streamd : StreamDescriptor) = struct
; inflow = initial_recv_window_size
}

let remove_from_parent (Parent parent) id =
(* Identifier of a scheduler node: a stream node yields the id reported by its
 * descriptor, while the connection node yields
 * [Stream_identifier.connection]. *)
let stream_id : type a. a node -> int32 =
 fun node ->
  match node with
  | Stream { descriptor; _ } -> Streamd.id descriptor
  | Connection _ -> Stream_identifier.connection

(* Returns the priority queue stored in the [children] field of [node],
 * regardless of whether [node] is the connection or a stream. *)
let children : type a. a node -> PriorityQueue.t =
 fun node ->
  match node with
  | Connection { children = queue; _ } -> queue
  | Stream { children = queue; _ } -> queue

let remove_child : type a. a node -> int32 -> unit =
fun parent id ->
match parent with
| Connection root ->
| Connection ({ children; _ } as node) ->
(* From RFC7540§5.3.1:
* A stream that is not dependent on any other stream is given a stream
* dependency of 0x0. In other words, the non-existent stream 0 forms
* the root of the tree. *)
root.children <- PriorityQueue.remove id root.children
| Stream stream ->
stream.children <- PriorityQueue.remove id stream.children

let children : type a. a node -> PriorityQueue.t = function
| Stream { children; _ } -> children
| Connection { children; _ } -> children
node.children <- PriorityQueue.remove id children
| Stream ({ children; _ } as node) ->
node.children <- PriorityQueue.remove id children

let stream_id : type a. a node -> int32 = function
| Connection _ -> Stream_identifier.connection
| Stream { descriptor; _ } -> Streamd.id descriptor
(* Overwrites the [children] field of [parent] with [updated_children]. The
 * previous queue is discarded, so callers are expected to pass a queue that
 * was derived from the current one. *)
let update_children : type a. a node -> PriorityQueue.t -> unit =
 fun parent updated_children ->
  match parent with
  | Stream stream -> stream.children <- updated_children
  | Connection root -> root.children <- updated_children

let set_parent stream_node ~exclusive new_parent =
let (Stream ({ descriptor; _ } as stream)) = stream_node in
let (Parent new_parent_node) = new_parent in
let set_parent stream_node ~exclusive (Parent new_parent_node as new_parent) =
let (Stream ({ descriptor; parent = Parent old_parent_node; _ } as stream)) =
stream_node
in
let stream_id = Streamd.id descriptor in
remove_from_parent stream.parent stream_id;
remove_child old_parent_node stream_id;
stream.parent <- new_parent;
let new_children =
let new_children = children new_parent_node in
Expand All @@ -208,9 +216,7 @@ module Make (Streamd : StreamDescriptor) = struct
PriorityQueue.sg stream_id stream_node)
else PriorityQueue.add stream_id stream_node new_children
in
match new_parent_node with
| Stream stream -> stream.children <- new_children
| Connection root -> root.children <- new_children
update_children new_parent_node new_children

let would_create_cycle ~new_parent (Stream { descriptor; _ }) =
let rec inner : type a. a node -> bool = function
Expand Down Expand Up @@ -284,6 +290,25 @@ module Make (Streamd : StreamDescriptor) = struct
set_parent stream_node ~exclusive new_parent);
stream.priority <- new_priority)

(* [update_t node n] recomputes the [t] value of stream [node] as the parent's
 * [t_last] plus [n * 256 / weight], then removes [node] from its parent's
 * child queue and adds it back.
 * NOTE(review): the remove-then-add presumably exists so that the priority
 * queue re-orders the node according to its new [t] -- confirm against
 * [PriorityQueue]. *)
let update_t node n =
  match node with
  | Stream ({ parent = Parent parent_node; descriptor; _ } as stream) ->
    let parent_t_last =
      match parent_node with
      | Stream { t_last; _ } -> t_last
      | Connection { t_last; _ } -> t_last
    in
    let weighted_cost = n * 256 / stream.priority.weight in
    stream.t <- parent_t_last + weighted_cost;
    let id = Streamd.id descriptor in
    (* Drop the current binding first, then re-insert the same node. *)
    remove_child parent_node id;
    children parent_node
    |> PriorityQueue.add id node
    |> update_children parent_node

(* Stores [t_last] in the [t_last] field of [p_node], whether it is the
 * connection or a stream. *)
let update_t_last : type a. a node -> int -> unit =
 fun p_node t_last ->
  match p_node with
  | Stream stream -> stream.t_last <- t_last
  | Connection root -> root.t_last <- t_last

let add
(Connection root as t)
~priority
Expand All @@ -303,6 +328,7 @@ module Make (Streamd : StreamDescriptor) = struct
root.children <- PriorityQueue.add stream_id stream root.children;
if priority != Priority.default_priority
then reprioritize_stream t ~priority stream;
update_t stream 0;
stream

let get_node (Connection root) stream_id =
Expand All @@ -314,7 +340,7 @@ module Make (Streamd : StreamDescriptor) = struct
| None -> None

let iter (Connection { all_streams; _ }) ~f =
StreamsTbl.iter (fun _id -> f) all_streams
StreamsTbl.iter (fun _id stream -> f stream) all_streams

(* A stream is allowed to transmit only while both the connection's [flow] and
 * the stream's own [flow] are strictly positive. *)
let allowed_to_transmit
    (Connection { flow = connection_flow; _ })
    (Stream { flow = stream_flow; _ })
  =
  let has_credit window = Int32.compare window 0l > 0 in
  has_credit connection_flow && has_credit stream_flow
Expand All @@ -339,26 +365,17 @@ module Make (Streamd : StreamDescriptor) = struct
0l
in
let written =
Int32.of_int
@@ Streamd.flush_write_body
~max_bytes:(Int32.to_int allowed_bytes)
descriptor
Streamd.flush_write_body
~max_bytes:(Int32.to_int allowed_bytes)
descriptor
in
let written32 = Int32.of_int written in
(* From RFC7540§6.9.1:
* After sending a flow-controlled frame, the sender reduces the space
* available in both windows by the length of the transmitted frame. *)
root.flow <- Int32.sub root.flow written;
stream.flow <- Int32.sub stream.flow written;
Int32.to_int written

let update_t stream n =
let (Stream ({ parent = Parent parent; _ } as stream)) = stream in
let tlast_p =
match parent with
| Connection { t_last; _ } -> t_last
| Stream { t_last; _ } -> t_last
in
stream.t <- tlast_p + (n * 256 / stream.priority.weight)
root.flow <- Int32.sub root.flow written32;
stream.flow <- Int32.sub stream.flow written32;
written

(* Records [closed] under stream [id] in the connection's [marked_for_removal]
 * table, using [StreamsTbl.replace]. *)
let mark_for_removal (Connection { marked_for_removal; _ }) id closed =
  StreamsTbl.replace marked_for_removal id closed
Expand Down Expand Up @@ -400,57 +417,79 @@ module Make (Streamd : StreamDescriptor) = struct
*)
let flush t max_seen_ids =
let rec schedule : type a. a node -> int * bool = function
| Connection p ->
| Connection _ as p_node ->
(* The root can never send data. *)
(match PriorityQueue.pop p.children with
| Some ((id, (Stream i as i_node)), children') ->
p.t_last <- i.t;
let written, subtree_is_active = schedule i_node in
if subtree_is_active
then (
update_t i_node written;
p.children <- PriorityQueue.add id i_node children')
else (
implicitly_close_idle_stream i.descriptor max_seen_ids;
(* XXX(anmonteiro): we may not want to remove from the tree right
* away. *)
p.children <- children');
written, subtree_is_active
| None ->
(* Queue is empty, see line 6 above. *)
0, false)
| Stream ({ descriptor; _ } as p) as p_node ->
if Streamd.requires_output descriptor
traverse p_node
| Stream ({ descriptor; _ } as stream) as p_node ->
let written =
if Streamd.requires_output descriptor
then
(* In this branch, flow-control has no bearing on activity, otherwise
* a flow-controlled stream would be considered inactive (because it
* can't make progress at the moment) and removed from the priority
* tree altogether. *)
write t p_node
else 0
in
if written > 0
then
(* In this branch, flow-control has no bearing on activity, otherwise
* a flow-controlled stream would be considered inactive (because it
* can't make progress at the moment) and removed from the priority
* tree altogether. *)
let written = write t p_node in
(* We check for activity again, because the stream may have gone
* inactive after the call to `write` above. *)
let subtree_is_active =
Streamd.requires_output descriptor
|| not (PriorityQueue.is_empty p.children)
|| not (PriorityQueue.is_empty stream.children)
in
written, subtree_is_active
else (
match PriorityQueue.pop p.children with
| Some ((id, (Stream i as i_node)), children') ->
p.t_last <- i.t;
let written, subtree_is_active = schedule i_node in
if subtree_is_active
then (
update_t i_node written;
p.children <- PriorityQueue.add id i_node children')
else (
implicitly_close_idle_stream i.descriptor max_seen_ids;
p.children <- children');
written, subtree_is_active
| None ->
(* Queue is empty, see line 6 above. *)
0, false)
else
(* If we haven't written anything, check if any of our children
have. *)
let written, subtree_is_active' = traverse p_node in
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the depth-first traversal of the node's children.

let subtree_is_active =
Streamd.requires_output descriptor || subtree_is_active'
in
(match written with
| 0 -> written, subtree_is_active
| written ->
(* If there's still more to write, put the node back in the tree. *)
if subtree_is_active then update_t p_node written;
written, subtree_is_active)
and traverse : type a. a node -> int * bool =
fun p_node ->
let rec loop remaining_children =
Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

`loop` traverses one level of the tree to find data to send.

match PriorityQueue.pop remaining_children with
| Some ((id, (Stream i as i_node)), remaining_children') ->
update_t_last p_node i.t;
let written, subtree_is_active = schedule i_node in

if not subtree_is_active
then (
implicitly_close_idle_stream i.descriptor max_seen_ids;
(* XXX(anmonteiro): we may not want to remove from the tree right
* away. *)
remove_child p_node id);

(match written with
| 0 ->
(* If this subtree didn't write anything, check the other children
in the priority queue. *)
loop remaining_children'
| written ->
(* If there's still more to write, put the node back in the tree. *)
if subtree_is_active then update_t i_node written;
written, subtree_is_active)
| None ->
(* No data written, but queue was not originally empty.
* Therefore, we can't determine the subtree is inactive. *)
0, true
in
let children = children p_node in
match PriorityQueue.is_empty children with
| true ->
(* Queue is empty, see line 6 above. *)
0, false
| false -> loop children
in

let (Connection root) = t in
ignore (schedule t);
StreamsTbl.iter
Expand Down Expand Up @@ -514,17 +553,19 @@ module Make (Streamd : StreamDescriptor) = struct

let pp_hum fmt t =
let rec pp_hum_inner level fmt t =
let pp_binding fmt (i, Stream { children; t; _ }) =
let pp_binding fmt (id, Stream { children; t; _ }) =
Format.fprintf
fmt
"\n%s%ld, %d -> [%a]"
(String.make (level * 2) ' ')
i
id
t
(pp_hum_inner (level + 1))
children
in
PriorityQueue.pp pp_binding fmt t
in
pp_hum_inner 0 fmt t

(* Connection-level entry point: shadows the queue printer defined just above
 * and applies it to the connection's child queue. *)
let pp_hum fmt (Connection root) = pp_hum fmt root.children
end
4 changes: 2 additions & 2 deletions lib_test/test_h2_client.ml
Original file line number Diff line number Diff line change
Expand Up @@ -1389,8 +1389,8 @@ module Client_connection_tests = struct
report_write_result t (`Ok lenv);
let frames, _lenv = flush_pending_writes t in
Alcotest.(check (list int))
"Writes empty DATA frame"
(List.map Frame.FrameType.serialize Frame.FrameType.[ Data ])
"Writes empty DATA frames for the two requests"
(List.map Frame.FrameType.serialize Frame.FrameType.[ Data; Data ])
(List.map
(fun Frame.{ frame_header = { frame_type; _ }; _ } ->
Frame.FrameType.serialize frame_type)
Expand Down
0