Implement ZSink.untilOutputM #5063
Thank you @sviezypan! I think the direction is good but needs a slight correction.
Try doing something like:
- Run self with self.channel.doneCollect
- FlatMap that and check the predicate
- If the predicate fails, recurse with (ZChannel.write(leftover) *> ZChannel.identity) prepended.
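The three steps above can be illustrated with a deliberately simplified, library-free model. This is only a sketch of the control flow, not the ZChannel implementation: the `Sink` type alias, `untilOutput`, and `takeTwo` are hypothetical names, the whole input is assumed to be available as a `List`, and the sink is assumed to consume at least one element from non-empty input (otherwise the loop would not terminate).

```scala
object UntilOutputModel {
  // Hypothetical model of a sink: consume a prefix of the input and return
  // the result together with the unconsumed leftovers.
  type Sink[A, Z] = List[A] => (Z, List[A])

  def untilOutput[A, Z](sink: Sink[A, Z])(p: Z => Boolean): List[A] => (Option[Z], List[A]) = {
    @annotation.tailrec
    def loop(input: List[A]): (Option[Z], List[A]) =
      if (input.isEmpty) (None, Nil) // input exhausted before the predicate held
      else {
        val (z, leftover) = sink(input)    // run the sink to completion
        if (p(z)) (Some(z), leftover)      // predicate holds: stop with this output
        else loop(leftover)                // otherwise rerun on the leftovers
      }
    input => loop(input)
  }

  // A sink that takes two elements at a time.
  val takeTwo: Sink[Int, List[Int]] = _.splitAt(2)

  // First run yields List(1, 2) (sum 3, rejected); second run yields List(3, 4).
  val result = untilOutput(takeTwo)((z: List[Int]) => z.sum > 5)(List(1, 2, 3, 4, 5))
}
```

In the real channel encoding the leftovers cannot simply be passed as an argument, which is why the suggestion writes them back in front of the remaining upstream input before recursing.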
)(implicit ev: L <:< In): ZSink[R1, InErr, In, OutErr1, L, Option[Z]] = {

  def processChunk(in: Chunk[In]): ZChannel[R1, InErr, Chunk[In], Any, OutErr1, Chunk[L], Option[Z]] =
    (ZChannel.write(in) >>> self.channel).doneCollect.map { case (chunks, z) =>
If I understand this correctly, this is subtly wrong because it assumes that the self sink only needs to read one chunk to complete.
It's worth testing this with a sink that needs two chunks to complete; e.g. ZSink.take(4) on a stream with two chunks of two elements.
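A minimal sketch of such a check, assuming zio-streams 2.x is on the classpath and using the sink signatures from the ZIO 2 release (the PR itself targets an earlier encoding with an extra `InErr` type parameter). The object and value names are illustrative only.

```scala
import zio._
import zio.stream._

object TakeAcrossChunks extends ZIOAppDefault {
  // Two chunks of two elements each, so the inner sink cannot finish after a single read.
  val twoChunks: ZStream[Any, Nothing, Int] =
    ZStream.fromChunks(Chunk(1, 2), Chunk(3, 4))

  // The predicate always holds, so the first completed output should be returned.
  val program: ZIO[Any, Nothing, Option[Chunk[Int]]] =
    twoChunks.run(ZSink.take[Int](4).untilOutputZIO(_ => ZIO.succeed(true)))

  // A correct implementation is expected to collect all four elements here.
  def run = program.flatMap(result => Console.printLine(result.toString))
}
```

An implementation that pipes only one chunk into the inner sink would complete `take(4)` early with two elements, which is the case this input is meant to expose.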
you're right, ZSink.take(4) on two chunks of two fails. I'll write the test and fix it. Thanks for this catch.
case Some(value) => ZChannel.end(Some(value))
case None => loop
},
e => ZChannel.fail(e) >>> self.channel.map(Some(_)),
You can just fail here, no need to pipe into self.
If the upstream errors, I am forwarding that error to the original sink for handling. Tbh I don't know of another way to make it compile, since "e" is of type InErr but this channel needs OutErr as its output error, and there is no relation between those two.
The reason why I am not running
is that I need to know when the upstream channel ended with "Any" so I can succeed with None. Otherwise I operate on the ZSink's end value "Z", and I have no information about Z and don't know when to end. I checked the old encoding and it seems that it was easier there, because Push was:
Hi @iravid, I am running in circles; it's always either of the following:
For both cases I added ignored tests.
Hi @sviezypan, I was trying to implement the same thing by mistake (I searched with the new name) and made some progress, so I am putting it here as a hint. The existing tests pass with this implementation, but there is no error handling.
def untilOutputZIO[R1 <: R, OutErr1 >: OutErr](
  f: Z => ZIO[R1, OutErr1, Boolean]
)(implicit ev: L <:< In): ZSink[R1, InErr, In, OutErr1, L, Option[Z]] =
  new ZSink(
    ZChannel
      .fromZIO(Ref.make(Chunk[In]()).zip(Ref.make(false)))
      .flatMap { case (leftoversRef, upstreamDoneRef) =>
        // Forwards every chunk unchanged and records when upstream has ended.
        lazy val upstreamMarker: ZChannel[Any, InErr, Chunk[In], Any, InErr, Chunk[In], Any] =
          ZChannel.readWith(
            (in: Chunk[In]) => ZChannel.write(in) *> upstreamMarker,
            ZChannel.fail(_: InErr),
            (x: Any) => ZChannel.fromZIO(upstreamDoneRef.set(true)).as(x)
          )
        // Runs the sink, checks the predicate, and reruns on the stored leftovers until it holds or upstream is done.
        lazy val loop: ZChannel[R1, InErr, Chunk[In], Any, OutErr1, Chunk[L], Option[Z]] =
          channel.doneCollect
            .foldChannel(
              ZChannel.fail(_),
              { case (leftovers, doneValue) =>
                for {
                  satisfied    <- ZChannel.fromZIO(f(doneValue))
                  _            <- ZChannel.fromZIO(leftoversRef.set(leftovers.flatten.asInstanceOf[Chunk[In]]))
                  upstreamDone <- ZChannel.fromZIO(upstreamDoneRef.get)
                  res <- if (satisfied) ZChannel.write(leftovers.flatten).as(Some(doneValue))
                         else if (upstreamDone)
                           if (leftovers.isEmpty) ZChannel.write(leftovers.flatten).as(None)
                           else ZChannel.write(leftovers.flatten).as(Some(doneValue))
                         else loop
                } yield res
              }
            )
        upstreamMarker >>> ZChannel.bufferChunk(leftoversRef) >>> loop
      }
  )
Hey @sviezypan - can you check if @jupposessho's version works for you and maybe integrate the parts you need into your PR? I think you were basically just missing the upstreamMarker and buffer.
Hi @jupposessho, thank you, the idea with upstreamMarker is great, all tests pass. I don't want to hold it off any longer so I pushed your implementation, I hope that's ok with you.
Hi @sviezypan, of course it's ok, but all the credit should go to @iravid. I think one thing is missing here: handling the failure of the predicate.
ZStream
  .fromIterable(List(1, 2, 3, 4))
  .run(ZSink.head[Nothing, Int].untilOutputZIO(_ => ZIO.fail("failed predicate")))
)(equalTo(None))
This is incorrect; a failure from the predicate function should result in a failure of the channel.
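A hedged sketch of what the check could look like instead, assuming zio-streams 2.x and the release signature `ZSink.head[Int]` (the snippet under review uses the earlier `ZSink.head[Nothing, Int]` form). The object and value names are illustrative; `either` is used only to make the failure observable as a value.

```scala
import zio._
import zio.stream._

object PredicateFailure extends ZIOAppDefault {
  // The predicate fails on the first output instead of returning a Boolean.
  val program: ZIO[Any, Nothing, Either[String, Option[Option[Int]]]] =
    ZStream
      .fromIterable(List(1, 2, 3, 4))
      .run(ZSink.head[Int].untilOutputZIO(_ => ZIO.fail("failed predicate")))
      .either // surface the failure as a Left rather than failing the program

  // Per the review comment, the run should fail with the predicate's error,
  // so a Left is expected here rather than a successful None.
  def run = program.flatMap(result => Console.printLine(result.toString))
}
```

In a zio-test suite the same expectation would normally be written against the exit value of the run rather than against `equalTo(None)`.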
@iravid I reverted my latest commit. @jupposessho, what exactly do you mean by "handling the failure of the predicate"?
I mean, the predicate could fail for some value (since it's an f => ZIO, it could be an HTTP call or another effectful computation). See the current implementation of this method: f(z).mapError(err => (Left(err), leftover))
...
And in this case the channel should fail.
I am sorry, but I still have no idea what you mean. The current behaviour is that if the predicate fails, the channel fails as well. As indicated in the types, the predicate fails with OutErr1, which is also the error type of the channel. If I map the error to some tuple of (Either[OutErr1], Chunk[Chunk[L]]) (or whatever other value), the code won't compile because the compiler infers the error type to be the closest common supertype of the tuple and OutErr1, which is Any. The only thing I can do in mapError is either identity, or I can throw some kind of exception, because then the common supertype of OutErr1 and Nothing is OutErr1, but I don't think that's useful here. Anyway, thank you for your message and let me know what I am missing.
Hi @iravid, I would like to move forward with this PR and possibly finish it.
Thank you @sviezypan, this looks great. Great work and sorry it took so long to review!
Oh sorry, this is failing compilation on 2.11. Could you look into that?
Looks good @sviezypan - just missing an
Thank you for pushing through on this, @sviezypan!
part of #4886