[core] remove putFiber/takeFiber from Channel and Hub by fwbrasil · Pull Request #1196 · getkyo/kyo · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[core] remove putFiber/takeFiber from Channel and Hub #1196

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 2 additions & 11 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2448,15 +2448,6 @@ val b: Unit < (Async & Abort[Closed]) =
val c: Int < (Async & Abort[Closed]) =
a.map(_.take)

// 'putFiber' returns a `Fiber` that
// will complete once the put completes
val d: Fiber[Closed, Unit] < IO =
a.map(_.putFiber(42))

// 'takeFiber' also returns a fiber
val e: Fiber[Closed, Int] < IO =
a.map(_.takeFiber)

// Closes the channel. If successful,
// returns a Some with the drained
// elements. All pending puts and takes
Expand All @@ -2483,7 +2474,7 @@ val a: Hub[Int] < (IO & Resource) =

// Hub provide APIs similar to
// channels: size, offer, isEmpty,
// isFull, putFiber, put
// isFull, put
val b: Boolean < (IO & Abort[Closed] & Resource) =
a.map(_.offer(1))

Expand All @@ -2503,7 +2494,7 @@ val d: Listener[Int] < (IO & Abort[Closed] & Resource) =
// Listeners provide methods for
// receiving messages similar to
// channels: size, isEmpty, isFull,
// poll, takeFiber, take
// poll, take
val e: Int < (Async & Abort[Closed] & Resource) =
d.map(_.take)

Expand Down
16 changes: 0 additions & 16 deletions kyo-core/shared/src/main/scala/kyo/Channel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -173,22 +173,6 @@ object Channel:
else Loop.continue(chunk2, size2)
end if

/** Creates a fiber that puts an element into the channel.
*
* @param value
* The element to put
* @return
* A fiber that completes when the element is put into the channel
*/
def putFiber(value: A)(using Frame): Fiber[Closed, Unit] < IO = IO.Unsafe(self.putFiber(value).safe)

/** Creates a fiber that takes an element from the channel.
*
* @return
* A fiber that completes with the taken element
*/
def takeFiber(using Frame): Fiber[Closed, A] < IO = IO.Unsafe(self.takeFiber().safe)

/** Drains all elements from the channel.
*
* @return
Expand Down
16 changes: 0 additions & 16 deletions kyo-core/shared/src/main/scala/kyo/Hub.scala
Original file line number Diff line number Diff line change
Expand Up @@ -57,15 +57,6 @@ final class Hub[A] private[kyo] (
*/
def full(using Frame): Boolean < (IO & Abort[Closed]) = ch.full

/** Creates a fiber that puts an element into the Hub.
*
* @param v
* the element to put
* @return
* a Fiber that, when run, will put the element into the Hub
*/
def putFiber(v: A)(using Frame): Fiber[Closed, Unit] < IO = ch.putFiber(v)

/** Puts an element into the Hub, blocking if necessary until space is available.
*
* This operation will block when both a listener's buffer is full and preventing the Hub from processing, and the Hub's buffer is
Expand Down Expand Up @@ -305,13 +296,6 @@ object Hub:
*/
def poll(using Frame): Maybe[A] < (IO & Abort[Closed]) = child.poll

/** Creates a fiber that takes an element from the Listener's buffer.
*
* @return
* a Fiber that, when run, will take an element from the Listener's buffer
*/
def takeFiber(using Frame): Fiber[Closed, A] < IO = child.takeFiber

/** Takes an element from the Listener's buffer, potentially blocking if the buffer is empty.
*
* This operation will block until:
Expand Down
94 changes: 46 additions & 48 deletions kyo-core/shared/src/test/scala/kyo/ChannelTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -48,40 +48,40 @@ class ChannelTest extends Test:
for
c <- Channel.init[Int](2)
b <- c.offer(1)
put <- c.putFiber(2)
f <- c.full
take1 <- c.takeFiber
take2 <- c.takeFiber
put <- Async.run(c.put(2))
_ <- untilTrue(c.full)
take1 <- Async.run(c.take)
take2 <- Async.run(c.take)
v1 <- take1.get
_ <- put.get
v2 <- take1.get
v3 <- take2.get
yield assert(b && f && v1 == 1 && v2 == 1 && v3 == 2)
yield assert(b && v1 == 1 && v2 == 1 && v3 == 2)
}
"blocking put" in run {
for
c <- Channel.init[Int](2)
_ <- c.put(1)
_ <- c.put(2)
f <- c.putFiber(3)
f <- Async.run(c.put(3))
_ <- Async.sleep(10.millis)
d1 <- f.done
v1 <- c.poll
d2 <- f.done
_ <- untilTrue(f.done)
v2 <- c.poll
v3 <- c.poll
yield assert(!d1 && d2 && v1 == Maybe(1) && v2 == Maybe(2) && v3 == Maybe(3))
yield assert(!d1 && v1 == Maybe(1) && v2 == Maybe(2) && v3 == Maybe(3))
}
"blocking take" in run {
for
c <- Channel.init[Int](2)
f <- c.takeFiber
f <- Async.run(c.take)
_ <- Async.sleep(10.millis)
d1 <- f.done
_ <- c.put(1)
d2 <- f.done
_ <- untilTrue(f.done)
v <- f.get
yield assert(!d1 && d2 && v == 1)
yield assert(!d1 && v == 1)
}
"putBatch" - {
"non-nested" - {
Expand Down Expand Up @@ -121,8 +121,8 @@ class ChannelTest extends Test:
"should notify waiting takers immediately" in run {
for
c <- Channel.init[Int](2)
take1 <- c.takeFiber
take2 <- c.takeFiber
take1 <- Async.run(c.take)
take2 <- Async.run(c.take)
_ <- c.putBatch(Seq(1, 2))
v1 <- take1.get
v2 <- take2.get
Expand All @@ -133,11 +133,11 @@ class ChannelTest extends Test:
c <- Channel.init[Int](2)
_ <- c.put(1)
_ <- c.put(2)
take1 <- c.takeFiber
take1 <- Async.run(c.take)
fiber <- Async.run(c.putBatch(Seq(3, 4)))
v1 <- take1.get
done1 <- fiber.done
take2 <- c.takeFiber
take2 <- Async.run(c.take)
v2 <- take2.get
_ <- fiber.get
yield assert(v1 == 1 && v2 == 2 && !done1)
Expand Down Expand Up @@ -313,29 +313,27 @@ class ChannelTest extends Test:
}
"should consider pending puts" in run {
import AllowUnsafe.embrace.danger
IO.Unsafe.evalOrThrow {
for
c <- Channel.init[Int](2)
_ <- c.putFiber(1)
_ <- c.putFiber(2)
_ <- c.putFiber(3)
result <- c.drain
finalSize <- c.size
yield assert(result == Chunk(1, 2, 3) && finalSize == 0)
}
for
c <- Channel.init[Int](2)
_ <- Async.run(c.put(1))
_ <- Async.run(c.put(2))
_ <- Async.run(c.put(3))
result <- c.drain
finalSize <- c.size
yield assert(result == Chunk(1, 2, 3) && finalSize == 0)
end for
}
"should consider pending puts - zero capacity" in run {
import AllowUnsafe.embrace.danger
IO.Unsafe.evalOrThrow {
for
c <- Channel.init[Int](0)
_ <- c.putFiber(1)
_ <- c.putFiber(2)
_ <- c.putFiber(3)
result <- c.drain
finalSize <- c.size
yield assert(result == Chunk(1, 2, 3) && finalSize == 0)
}
for
c <- Channel.init[Int](0)
_ <- Async.run(c.put(1))
_ <- Async.run(c.put(2))
_ <- Async.run(c.put(3))
result <- c.drain
_ <- untilTrue(c.size.map(_ == 0))
yield assert(result == Chunk(1, 2, 3))
end for
}
"race with close" in run {
verifyRaceDrainWithClose(2, _.drain, _.close)
Expand Down Expand Up @@ -394,10 +392,10 @@ class ChannelTest extends Test:
IO.Unsafe.evalOrThrow {
for
c <- Channel.init[Int](2)
_ <- c.putFiber(1)
_ <- c.putFiber(2)
_ <- c.putFiber(3)
_ <- c.putFiber(4)
_ <- Async.run(c.put(1))
_ <- Async.run(c.put(2))
_ <- Async.run(c.put(3))
_ <- Async.run(c.put(4))
result <- c.drainUpTo(3)
finalSize <- c.size
yield assert(result == Chunk(1, 2, 3) && finalSize == 1)
Expand All @@ -408,10 +406,10 @@ class ChannelTest extends Test:
IO.Unsafe.evalOrThrow {
for
c <- Channel.init[Int](0)
_ <- c.putFiber(1)
_ <- c.putFiber(2)
_ <- c.putFiber(3)
_ <- c.putFiber(4)
_ <- Async.run(c.put(1))
_ <- Async.run(c.put(2))
_ <- Async.run(c.put(3))
_ <- Async.run(c.put(4))
result <- c.drainUpTo(3)
finalSize <- c.size
yield assert(result == Chunk(1, 2, 3) && finalSize == 0)
Expand Down Expand Up @@ -447,7 +445,7 @@ class ChannelTest extends Test:
"pending take" in run {
for
c <- Channel.init[Int](2)
f <- c.takeFiber
f <- Async.run(c.take)
r <- c.close
d <- f.getResult
t <- Abort.run(c.full)
Expand All @@ -458,7 +456,7 @@ class ChannelTest extends Test:
c <- Channel.init[Int](2)
_ <- c.put(1)
_ <- c.put(2)
f <- c.putFiber(3)
f <- Async.run(c.put(3))
r <- c.close
d <- f.getResult
e <- Abort.run(c.offer(1))
Expand All @@ -467,7 +465,7 @@ class ChannelTest extends Test:
"no buffer w/ pending put" in run {
for
c <- Channel.init[Int](0)
f <- c.putFiber(1)
f <- Async.run(c.put(1))
r <- c.close
d <- f.getResult
t <- Abort.run(c.poll)
Expand All @@ -476,7 +474,7 @@ class ChannelTest extends Test:
"no buffer w/ pending take" in run {
for
c <- Channel.init[Int](0)
f <- c.takeFiber
f <- Async.run(c.take)
r <- c.close
d <- f.getResult
t <- Abort.run[Throwable](c.put(1))
Expand All @@ -486,7 +484,7 @@ class ChannelTest extends Test:
"no buffer" in run {
for
c <- Channel.init[Int](0)
_ <- c.putFiber(1)
_ <- Async.run(c.put(1))
v <- c.take
f <- c.full
e <- c.empty
Expand Down
2 changes: 1 addition & 1 deletion kyo-core/shared/src/test/scala/kyo/HubTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -406,7 +406,7 @@ class HubTest extends Test:
for
h <- Hub.init[Int](0)
l <- h.listen
f <- h.putFiber(1)
f <- Async.run(h.put(1))
v <- l.take
d <- f.done
yield assert(v == 1 && d)
Expand Down
Loading
0