diff --git a/src/stream/merge.rs b/src/stream/merge.rs index 76fb89f..702c7cd 100644 --- a/src/stream/merge.rs +++ b/src/stream/merge.rs @@ -94,7 +94,10 @@ where { match first.poll_recv(cx) { PollRecv::Ready(v) => MergePoll::First(PollRecv::Ready(v)), - PollRecv::Pending => MergePoll::Second(second.poll_recv(cx)), + PollRecv::Pending => MergePoll::Second(match second.poll_recv(cx) { + PollRecv::Closed => PollRecv::Pending, + recv => recv, + }), PollRecv::Closed => MergePoll::Second(second.poll_recv(cx)), } } @@ -167,6 +170,24 @@ mod tests { assert_eq!(PollRecv::Closed, Pin::new(&mut find).poll_recv(&mut cx)); } + #[test] + fn swap_closed_pending() { + let left = from_poll_iter(vec![]); + let right = from_poll_iter(vec![ + PollRecv::Ready(1), + PollRecv::Pending, + PollRecv::Ready(2), + ]); + let mut find = MergeStream::new(left, right); + + let mut cx = Context::empty(); + + assert_eq!(PollRecv::Ready(1), Pin::new(&mut find).poll_recv(&mut cx)); + assert_eq!(PollRecv::Pending, Pin::new(&mut find).poll_recv(&mut cx)); + assert_eq!(PollRecv::Ready(2), Pin::new(&mut find).poll_recv(&mut cx)); + assert_eq!(PollRecv::Closed, Pin::new(&mut find).poll_recv(&mut cx)); + } + #[test] fn pending_uses_right() { let left = from_poll_iter(vec![PollRecv::Pending]); @@ -188,6 +209,7 @@ mod tests { let mut cx = Context::empty(); assert_eq!(PollRecv::Ready(1), Pin::new(&mut find).poll_recv(&mut cx)); + assert_eq!(PollRecv::Pending, Pin::new(&mut find).poll_recv(&mut cx)); assert_eq!(PollRecv::Closed, Pin::new(&mut find).poll_recv(&mut cx)); } }