-
Notifications
You must be signed in to change notification settings - Fork 33
[h2] scheduler: fix stream starvation #204
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
3 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
</div>
View file
Open in desktop
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -159,30 +159,38 @@ module Make (Streamd : StreamDescriptor) = struct | |
; inflow = initial_recv_window_size | ||
} | ||
|
||
let remove_from_parent (Parent parent) id = | ||
let stream_id : type a. a node -> int32 = function | ||
| Connection _ -> Stream_identifier.connection | ||
| Stream { descriptor; _ } -> Streamd.id descriptor | ||
|
||
let children : type a. a node -> PriorityQueue.t = function | ||
| Stream { children; _ } -> children | ||
| Connection { children; _ } -> children | ||
|
||
let remove_child : type a. a node -> int32 -> unit = | ||
fun parent id -> | ||
match parent with | ||
| Connection root -> | ||
| Connection ({ children; _ } as node) -> | ||
(* From RFC7540§5.3.1: | ||
* A stream that is not dependent on any other stream is given a stream | ||
* dependency of 0x0. In other words, the non-existent stream 0 forms | ||
* the root of the tree. *) | ||
root.children <- PriorityQueue.remove id root.children | ||
| Stream stream -> | ||
stream.children <- PriorityQueue.remove id stream.children | ||
|
||
let children : type a. a node -> PriorityQueue.t = function | ||
| Stream { children; _ } -> children | ||
| Connection { children; _ } -> children | ||
node.children <- PriorityQueue.remove id children | ||
| Stream ({ children; _ } as node) -> | ||
node.children <- PriorityQueue.remove id children | ||
|
||
let stream_id : type a. a node -> int32 = function | ||
| Connection _ -> Stream_identifier.connection | ||
| Stream { descriptor; _ } -> Streamd.id descriptor | ||
(* [update_children parent updated_children] overwrites the [children] field
   of [parent] with [updated_children]. Both node kinds (the connection root
   and a stream) are handled the same way; the match is only needed to reach
   the record inside each constructor. *)
let update_children : type a. a node -> PriorityQueue.t -> unit = | ||
fun parent updated_children -> | ||
match parent with | ||
| Connection s -> s.children <- updated_children | ||
| Stream s -> s.children <- updated_children | ||
|
||
let set_parent stream_node ~exclusive new_parent = | ||
let (Stream ({ descriptor; _ } as stream)) = stream_node in | ||
let (Parent new_parent_node) = new_parent in | ||
let set_parent stream_node ~exclusive (Parent new_parent_node as new_parent) = | ||
let (Stream ({ descriptor; parent = Parent old_parent_node; _ } as stream)) = | ||
stream_node | ||
in | ||
let stream_id = Streamd.id descriptor in | ||
remove_from_parent stream.parent stream_id; | ||
remove_child old_parent_node stream_id; | ||
stream.parent <- new_parent; | ||
let new_children = | ||
let new_children = children new_parent_node in | ||
|
@@ -208,9 +216,7 @@ module Make (Streamd : StreamDescriptor) = struct | |
PriorityQueue.sg stream_id stream_node) | ||
else PriorityQueue.add stream_id stream_node new_children | ||
in | ||
match new_parent_node with | ||
| Stream stream -> stream.children <- new_children | ||
| Connection root -> root.children <- new_children | ||
update_children new_parent_node new_children | ||
|
||
let would_create_cycle ~new_parent (Stream { descriptor; _ }) = | ||
let rec inner : type a. a node -> bool = function | ||
|
@@ -284,6 +290,25 @@ module Make (Streamd : StreamDescriptor) = struct | |
set_parent stream_node ~exclusive new_parent); | ||
stream.priority <- new_priority) | ||
|
||
(* [update_t node n] recomputes the [t] field of the stream [node] and
   re-inserts the node into its parent's children queue.
   [n] is the amount the caller reports as written for this node (the
   scheduler passes the value returned by scheduling the node; [add]
   passes 0). *)
let update_t node n = | ||
let (Stream ({ parent = Parent parent; descriptor; _ } as stream)) = node in | ||
(* Read the parent's [t_last], whichever kind of node the parent is. *)
let tlast_p = | ||
match parent with | ||
| Connection { t_last; _ } -> t_last | ||
| Stream { t_last; _ } -> t_last | ||
in | ||
(* New [t] = parent's [t_last] + n * 256 / weight. This is integer
   arithmetic: the multiplication is performed before the division. *)
stream.t <- tlast_p + (n * 256 / stream.priority.weight); | ||
let id = Streamd.id descriptor in | ||
(* Remove the node from the parent's queue and add it back under the same
   id. [children parent] is read after [remove_child], so it no longer
   contains the old entry. Presumably this makes the queue order the node
   by its updated [t] -- confirm against PriorityQueue's ordering. *)
remove_child parent id; | ||
let updated_children = PriorityQueue.add id node (children parent) in | ||
update_children parent updated_children | ||
|
||
(* [update_t_last p_node t_last] stores [t_last] in the [t_last] field of
   [p_node], for either the connection root or a stream node. The scheduler's
   traversal calls it with the [t] of the child it has just popped. *)
let update_t_last : type a. a node -> int -> unit = | ||
fun p_node t_last -> | ||
match p_node with | ||
| Connection p -> p.t_last <- t_last | ||
| Stream p -> p.t_last <- t_last | ||
|
||
let add | ||
(Connection root as t) | ||
~priority | ||
|
@@ -303,6 +328,7 @@ module Make (Streamd : StreamDescriptor) = struct | |
root.children <- PriorityQueue.add stream_id stream root.children; | ||
if priority != Priority.default_priority | ||
then reprioritize_stream t ~priority stream; | ||
update_t stream 0; | ||
stream | ||
|
||
let get_node (Connection root) stream_id = | ||
|
@@ -314,7 +340,7 @@ module Make (Streamd : StreamDescriptor) = struct | |
| None -> None | ||
|
||
let iter (Connection { all_streams; _ }) ~f = | ||
StreamsTbl.iter (fun _id -> f) all_streams | ||
StreamsTbl.iter (fun _id stream -> f stream) all_streams | ||
|
||
let allowed_to_transmit (Connection root) (Stream stream) = | ||
Int32.compare root.flow 0l > 0 && Int32.compare stream.flow 0l > 0 | ||
|
@@ -339,26 +365,17 @@ module Make (Streamd : StreamDescriptor) = struct | |
0l | ||
in | ||
let written = | ||
Int32.of_int | ||
@@ Streamd.flush_write_body | ||
~max_bytes:(Int32.to_int allowed_bytes) | ||
descriptor | ||
Streamd.flush_write_body | ||
~max_bytes:(Int32.to_int allowed_bytes) | ||
descriptor | ||
in | ||
let written32 = Int32.of_int written in | ||
(* From RFC7540§6.9.1: | ||
* After sending a flow-controlled frame, the sender reduces the space | ||
* available in both windows by the length of the transmitted frame. *) | ||
root.flow <- Int32.sub root.flow written; | ||
stream.flow <- Int32.sub stream.flow written; | ||
Int32.to_int written | ||
|
||
let update_t stream n = | ||
let (Stream ({ parent = Parent parent; _ } as stream)) = stream in | ||
let tlast_p = | ||
match parent with | ||
| Connection { t_last; _ } -> t_last | ||
| Stream { t_last; _ } -> t_last | ||
in | ||
stream.t <- tlast_p + (n * 256 / stream.priority.weight) | ||
root.flow <- Int32.sub root.flow written32; | ||
stream.flow <- Int32.sub stream.flow written32; | ||
written | ||
|
||
let mark_for_removal (Connection root) id closed = | ||
StreamsTbl.replace root.marked_for_removal id closed | ||
|
@@ -400,57 +417,79 @@ module Make (Streamd : StreamDescriptor) = struct | |
*) | ||
let flush t max_seen_ids = | ||
let rec schedule : type a. a node -> int * bool = function | ||
| Connection p -> | ||
| Connection _ as p_node -> | ||
(* The root can never send data. *) | ||
(match PriorityQueue.pop p.children with | ||
| Some ((id, (Stream i as i_node)), children') -> | ||
p.t_last <- i.t; | ||
let written, subtree_is_active = schedule i_node in | ||
if subtree_is_active | ||
then ( | ||
update_t i_node written; | ||
p.children <- PriorityQueue.add id i_node children') | ||
else ( | ||
implicitly_close_idle_stream i.descriptor max_seen_ids; | ||
(* XXX(anmonteiro): we may not want to remove from the tree right | ||
* away. *) | ||
p.children <- children'); | ||
written, subtree_is_active | ||
| None -> | ||
(* Queue is empty, see line 6 above. *) | ||
0, false) | ||
| Stream ({ descriptor; _ } as p) as p_node -> | ||
if Streamd.requires_output descriptor | ||
traverse p_node | ||
| Stream ({ descriptor; _ } as stream) as p_node -> | ||
let written = | ||
if Streamd.requires_output descriptor | ||
then | ||
(* In this branch, flow-control has no bearing on activity, otherwise | ||
* a flow-controlled stream would be considered inactive (because it | ||
* can't make progress at the moment) and removed from the priority | ||
* tree altogether. *) | ||
write t p_node | ||
else 0 | ||
in | ||
if written > 0 | ||
then | ||
(* In this branch, flow-control has no bearing on activity, otherwise | ||
* a flow-controlled stream would be considered inactive (because it | ||
* can't make progress at the moment) and removed from the priority | ||
* tree altogether. *) | ||
let written = write t p_node in | ||
(* We check for activity again, because the stream may have gone | ||
* inactive after the call to `write` above. *) | ||
let subtree_is_active = | ||
Streamd.requires_output descriptor | ||
|| not (PriorityQueue.is_empty p.children) | ||
|| not (PriorityQueue.is_empty stream.children) | ||
in | ||
written, subtree_is_active | ||
else ( | ||
match PriorityQueue.pop p.children with | ||
| Some ((id, (Stream i as i_node)), children') -> | ||
p.t_last <- i.t; | ||
let written, subtree_is_active = schedule i_node in | ||
if subtree_is_active | ||
then ( | ||
update_t i_node written; | ||
p.children <- PriorityQueue.add id i_node children') | ||
else ( | ||
implicitly_close_idle_stream i.descriptor max_seen_ids; | ||
p.children <- children'); | ||
written, subtree_is_active | ||
| None -> | ||
(* Queue is empty, see line 6 above. *) | ||
0, false) | ||
else | ||
(* If we haven't written anything, check if any of our children | ||
have. *) | ||
let written, subtree_is_active' = traverse p_node in | ||
let subtree_is_active = | ||
Streamd.requires_output descriptor || subtree_is_active' | ||
in | ||
(match written with | ||
| 0 -> written, subtree_is_active | ||
| written -> | ||
(* If there's still more to write, put the node back in the tree. *) | ||
if subtree_is_active then update_t p_node written; | ||
written, subtree_is_active) | ||
and traverse : type a. a node -> int * bool = | ||
fun p_node -> | ||
let rec loop remaining_children = | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more.
|
||
match PriorityQueue.pop remaining_children with | ||
| Some ((id, (Stream i as i_node)), remaining_children') -> | ||
update_t_last p_node i.t; | ||
let written, subtree_is_active = schedule i_node in | ||
|
||
if not subtree_is_active | ||
then ( | ||
implicitly_close_idle_stream i.descriptor max_seen_ids; | ||
(* XXX(anmonteiro): we may not want to remove from the tree right | ||
* away. *) | ||
remove_child p_node id); | ||
|
||
(match written with | ||
| 0 -> | ||
(* If this subtree didn't write anything, check the other children | ||
in the priority queue. *) | ||
loop remaining_children' | ||
| written -> | ||
(* If there's still more to write, put the node back in the tree. *) | ||
if subtree_is_active then update_t i_node written; | ||
written, subtree_is_active) | ||
| None -> | ||
(* No data written, but queue was not originally empty. | ||
* Therefore, we can't determine the subtree is inactive. *) | ||
0, true | ||
in | ||
let children = children p_node in | ||
match PriorityQueue.is_empty children with | ||
| true -> | ||
(* Queue is empty, see line 6 above. *) | ||
0, false | ||
| false -> loop children | ||
in | ||
|
||
let (Connection root) = t in | ||
ignore (schedule t); | ||
StreamsTbl.iter | ||
|
@@ -514,17 +553,19 @@ module Make (Streamd : StreamDescriptor) = struct | |
|
||
let pp_hum fmt t = | ||
let rec pp_hum_inner level fmt t = | ||
let pp_binding fmt (i, Stream { children; t; _ }) = | ||
let pp_binding fmt (id, Stream { children; t; _ }) = | ||
Format.fprintf | ||
fmt | ||
"\n%s%ld, %d -> [%a]" | ||
(String.make (level * 2) ' ') | ||
i | ||
id | ||
t | ||
(pp_hum_inner (level + 1)) | ||
children | ||
in | ||
PriorityQueue.pp pp_binding fmt t | ||
in | ||
pp_hum_inner 0 fmt t | ||
|
||
let pp_hum fmt (Connection { children; _ }) = pp_hum fmt children | ||
end |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is the depth-first traversal of the node's children.