Implement ZSink.untilOutputM by sviezypan · Pull Request #5063 · zio/zio · GitHub

Implement ZSink.untilOutputM #5063

Merged on Aug 31, 2021 (17 commits)

Conversation

sviezypan
Contributor

part of #4886

@sviezypan sviezypan requested a review from iravid as a code owner May 10, 2021 14:23
Member
@iravid iravid left a comment

Thank you @sviezypan! I think the direction is good but needs a slight correction.

Try doing something like:

  1. Run self with self.channel.doneCollect
  2. FlatMap that and check the predicate
  3. If the predicate fails, recurse with (ZChannel.write(leftover) *> ZChannel.identity) prepended.
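The three steps above can be sketched with plain Scala collections instead of ZChannel. This is only a simplified model: ToySink, take and untilOutput are hypothetical names, not ZIO APIs, and leftovers are fed back by passing them as the next input rather than by writing them to a channel.

```scala
import scala.annotation.tailrec

object UntilOutputSketch {
  type Chunk[A] = Vector[A]

  // Hypothetical toy sink: consumes some input, returns its result and the unconsumed leftovers.
  type ToySink[A, Z] = List[Chunk[A]] => (Z, List[Chunk[A]])

  // Takes up to n elements (n > 0), possibly spanning several chunks.
  def take[A](n: Int): ToySink[A, Chunk[A]] = input => {
    val (taken, rest) = input.flatten.toVector.splitAt(n)
    (taken, if (rest.isEmpty) Nil else List(rest))
  }

  // Step 1: run the sink. Step 2: check the predicate.
  // Step 3: if it does not hold, recurse with the leftovers as the new input.
  @tailrec
  def untilOutput[A, Z](sink: ToySink[A, Z], input: List[Chunk[A]])(p: Z => Boolean): Option[Z] =
    if (input.isEmpty) None
    else {
      val (z, leftovers) = sink(input)
      if (p(z)) Some(z) else untilOutput(sink, leftovers)(p)
    }
}
```

In this model the loop ends with None once no input remains, which is the case the real implementation has to detect by other means.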

)(implicit ev: L <:< In): ZSink[R1, InErr, In, OutErr1, L, Option[Z]] = {

def processChunk(in: Chunk[In]): ZChannel[R1, InErr, Chunk[In], Any, OutErr1, Chunk[L], Option[Z]] =
(ZChannel.write(in) >>> self.channel).doneCollect.map { case (chunks, z) =>
Member

If I understand this correctly, this is subtly wrong because it assumes that the self sink only needs to read one chunk to complete.

It's worth testing this with a sink that needs two chunks to complete; e.g. ZSink.take(4) on a stream with two chunks of two elements.
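As a rough illustration of the concern, here is a toy model in plain Scala (no ZIO; take and runPerChunk are hypothetical helpers). It assumes that piping a single chunk into the sink makes the sink see end-of-input after that chunk, which is how I read the comment above.

```scala
object TwoChunkCheck {
  type Chunk[A] = Vector[A]

  // Toy sink: collects up to n elements from everything it is given.
  def take(n: Int)(input: List[Chunk[Int]]): Chunk[Int] =
    input.flatten.take(n).toVector

  // Models the flawed approach: each chunk is piped into the sink on its own,
  // so the sink completes after a single chunk with a partial result.
  def runPerChunk(n: Int)(input: List[Chunk[Int]]): List[Chunk[Int]] =
    input.map(c => take(n)(List(c)))
}
```

With two chunks of two elements, take(4) over the whole input yields all four elements, while runPerChunk(4) yields two results of two elements each.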

Contributor Author

you're right, ZSink.take(4) on two chunks of two fails. I'll write the test and fix it. Thanks for this catch.

case Some(value) => ZChannel.end(Some(value))
case None => loop
},
e => ZChannel.fail(e) >>> self.channel.map(Some(_)),
Member

You can just fail here, no need to pipe into self.

Contributor Author

If the upstream errors, I forward that error to the original sink for handling. To be honest, I don't know of another way to make it compile: "e" has type InErr, but this channel needs OutErr as its output error, and there is no relation between the two.

@sviezypan
Contributor Author

The reason I am not following step 1 ("Run self with self.channel.doneCollect") is that I need to know when the upstream channel has ended with "Any", so that I can succeed with None.

Otherwise I only have the sink's end value "Z"; I have no information about Z, so I don't know when to end.
For example, when you run a sink on an empty stream, the output Z is some "default" value: ZSink.head and ZSink.last produce None, ZSink.mkString produces the empty string, ZSink.sum produces 0, and so on. The only thing I can do with Z is apply a function f: Z => ZIO[R1, E1, Boolean], but when it returns "false" I can't tell whether the stream ended or the current output just doesn't satisfy f.
So with
ZStream.fromIterable(List.empty[Int]).run(ZSink.sum[Nothing, Int].untilOutputM(s => ZIO.succeed(s > 0)))
or
ZStream.fromIterable(List(1, 2)).run(ZSink.head[Nothing, Int].untilOutputM(h => ZIO.succeed(h.fold(false)(_ >= 3))))
I would repeat the sink forever, even though the stream has ended.

I checked the old encoding, and it seems this was easier there, because Push was:
Option[Chunk[I]] => ZIO[R, (Either[E, Z], Chunk[L]), Unit]
so if I receive None, I know that the stream has ended.
Now, with ZChannel, I don't know when the stream has ended. The only thing I could do is put some channel in front of self.channel.
Do you have any other ideas?
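The ambiguity described above can be shown with a small plain-Scala model (no ZIO; sum, satisfied and sumWithEnd are hypothetical helpers). It is only a sketch of the argument: the result value alone cannot distinguish "stream ended" from "predicate not satisfied", whereas an explicit end-of-input flag can.

```scala
object EmptyStreamAmbiguity {
  // Toy `sum` sink result: an empty input yields the default value 0.
  def sum(input: List[Vector[Int]]): Int = input.flatten.sum

  // The predicate from the first example: only the result value Z is visible.
  def satisfied(z: Int): Boolean = z > 0

  // Pairing the result with an end-of-input flag removes the ambiguity.
  def sumWithEnd(input: List[Vector[Int]]): (Int, Boolean) =
    (sum(input), input.forall(_.isEmpty))
}
```

Both an empty stream and a stream containing a single 0 give an unsatisfied result of 0; only the flag tells them apart.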

@sviezypan
Contributor Author

Hi @iravid
after spending more time on this, it seems to me that to detect the condition (stream is empty && stream did not satisfy f) I need more information about Z. I don't see how to solve this at this level (other than changing the underlying ZChannel's InDone type from Any to Option[Any]). Could you please advise?

I am running in circles; it always comes down to one of the following:

  1. I run the stream with the sink's self.channel and check the predicate; if it never succeeds, I never terminate.
  2. Before applying the self sink, I apply a reader that either succeeds with None at the end of the stream or processes the current chunk. In this case I don't know how to handle a sink that needs more than one chunk to complete. (This is the solution currently implemented.)

For both cases I added ignored tests.

@sviezypan sviezypan requested a review from iravid June 29, 2021 12:06
@jupposessho
Contributor

hi @sviezypan,

I was trying to implement the same thing by mistake (I searched using the new name) and made some progress, so I am posting it here as a hint. The existing tests pass with this implementation, but there is no error handling.

def untilOutputZIO[R1 <: R, OutErr1 >: OutErr](
    f: Z => ZIO[R1, OutErr1, Boolean]
  )(implicit ev: L <:< In): ZSink[R1, InErr, In, OutErr1, L, Option[Z]] =
    new ZSink(
      ZChannel
        .fromZIO(Ref.make(Chunk[In]()).zip(Ref.make(false)))
        .flatMap { case (leftoversRef, upstreamDoneRef) =>
          lazy val upstreamMarker: ZChannel[Any, InErr, Chunk[In], Any, InErr, Chunk[In], Any] =
            ZChannel.readWith(
              (in: Chunk[In]) => ZChannel.write(in) *> upstreamMarker,
              ZChannel.fail(_: InErr),
              (x: Any) => ZChannel.fromZIO(upstreamDoneRef.set(true)).as(x)
            )

          lazy val loop: ZChannel[R1, InErr, Chunk[In], Any, OutErr1, Chunk[L], Option[Z]] =
            channel.doneCollect
              .foldChannel(
                ZChannel.fail(_),
                { case (leftovers, doneValue) =>
                  for {
                    satisfied    <- ZChannel.fromZIO(f(doneValue))
                    _            <- ZChannel.fromZIO(leftoversRef.set(leftovers.flatten.asInstanceOf[Chunk[In]]))
                    upstreamDone <- ZChannel.fromZIO(upstreamDoneRef.get)
                    res <- if (satisfied) ZChannel.write(leftovers.flatten).as(Some(doneValue))
                           else if (upstreamDone)
                             if (leftovers.isEmpty) ZChannel.write(leftovers.flatten).as(None)
                             else ZChannel.write(leftovers.flatten).as(Some(doneValue))
                           else loop
                  } yield res
                }
              )

          upstreamMarker >>> ZChannel.bufferChunk(leftoversRef) >>> loop
        }
    )

@iravid
Member
iravid commented Jul 1, 2021

Hey @sviezypan - can you check if @jupposessho's version works for you and maybe integrate the parts you need into your PR? I think you were basically just missing the upstreamMarker and buffer.
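The upstreamMarker idea, recording when the upstream has finished so the loop knows when to stop, can be modelled in plain Scala with an iterator wrapper. This is a hedged sketch only: Marked and run are hypothetical names, the "sink" is hard-coded to take two elements of each chunk, and leftover buffering is left out.

```scala
object UpstreamMarkerSketch {
  // Wraps an iterator of chunks and records when the upstream has been exhausted.
  final class Marked[A](underlying: Iterator[Vector[A]]) extends Iterator[Vector[A]] {
    private var done = false
    def upstreamDone: Boolean = done
    def hasNext: Boolean = {
      val more = underlying.hasNext
      if (!more) done = true
      more
    }
    def next(): Vector[A] = underlying.next()
  }

  // Repeats the toy sink until the predicate holds; stops with None once upstream is done.
  def run(chunks: List[Vector[Int]])(p: Vector[Int] => Boolean): Option[Vector[Int]] = {
    val upstream = new Marked(chunks.iterator)
    var result: Option[Vector[Int]] = None
    while (result.isEmpty && !upstream.upstreamDone) {
      if (upstream.hasNext) {
        val z = upstream.next().take(2)
        if (p(z)) result = Some(z)
      }
    }
    result
  }
}
```

The flag plays the role of upstreamDoneRef in the snippet above: without it, a predicate that never holds would keep the loop running.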

@sviezypan
Contributor Author

hi @jupposessho, thank you, the idea with upstreamMarker is great; all tests pass. I don't want to hold this up any longer, so I pushed your implementation. I hope that's OK with you.

@jupposessho
Contributor

Hi @sviezypan, of course it's OK, but all the credit should go to @iravid. I think one thing is missing here: handling the failure of the predicate.

ZStream
.fromIterable(List(1, 2, 3, 4))
.run(ZSink.head[Nothing, Int].untilOutputZIO(_ => ZIO.fail("failed predicate")))
)(equalTo(None))
Member

This is incorrect; a failure from the predicate function should result in a failure of the channel.

Contributor Author

@iravid I reverted my latest commit. @jupposessho, what exactly do you mean by "handling the failure of the predicate"?

Contributor
@jupposessho jupposessho Jul 3, 2021

I mean that the predicate could fail for some value (since f returns a ZIO, it could be an HTTP call or some other effectful computation). See the current implementation of this method: f(z).mapError(err => (Left(err), leftover)) ...
In that case the channel should fail.
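The expected behaviour, where a failing predicate fails the whole run instead of producing None, can be sketched in plain Scala with Either standing in for the effect type. Pred and untilOutput here are hypothetical and only model the error propagation, not the ZChannel implementation.

```scala
object PredicateFailureSketch {
  // Toy effectful predicate: Left is a failure, Right carries the Boolean verdict.
  type Pred[E, Z] = Z => Either[E, Boolean]

  // Checks successive sink outputs; the first predicate failure aborts the run with that error.
  def untilOutput[E, Z](outputs: List[Z])(p: Pred[E, Z]): Either[E, Option[Z]] =
    outputs match {
      case Nil => Right(None)
      case z :: rest =>
        p(z) match {
          case Left(e)   => Left(e)
          case Right(ok) => if (ok) Right(Some(z)) else untilOutput(rest)(p)
        }
    }
}
```

Under this model, a predicate that always fails with "failed predicate" yields Left("failed predicate") rather than Right(None).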

Contributor Author

I am sorry, but I still don't understand what you mean. The current behaviour is that if the predicate fails, the channel fails as well. As the types indicate, the predicate fails with OutErr1, which is also the error type of the channel. If I map the error to some tuple of (Either[OutErr1], Chunk[Chunk[L]]) (or any other value), the code won't compile, because the compiler infers the error type as the closest common supertype of the tuple and OutErr1, which is Any. The only things I can do in mapError are identity or throwing some kind of exception (the common supertype of OutErr1 and Nothing is OutErr1), but I don't think that's useful here. Anyway, thank you for your message, and let me know what I am missing.

@sviezypan
Contributor Author

hi @iravid I would like to move forward with this PR and possibly finish it.
Can you please recheck if anything else is required here?

@sviezypan sviezypan requested a review from iravid August 18, 2021 09:51
iravid
iravid previously approved these changes Aug 18, 2021
Member
@iravid iravid left a comment

Thank you @sviezypan, this looks great. Great work and sorry it took so long to review!

@iravid
Member
iravid commented Aug 18, 2021

Oh sorry, this is failing compilation on 2.11. Could you look into that?

@iravid
Member
iravid commented Aug 20, 2021

Looks good @sviezypan - just missing an fmt now

@sviezypan sviezypan requested a review from iravid August 31, 2021 17:31
@iravid iravid merged commit da5c79b into zio:series/2.x Aug 31, 2021
@iravid
Member
iravid commented Aug 31, 2021

Thank you for pushing through on this, @sviezypan!
