#18948 add eagerComplete parameter to FlowOps.merge by rkuhn · Pull Request #19396 · akka/akka · GitHub

#18948 add eagerComplete parameter to FlowOps.merge #19396


Merged
merged 1 commit into from
Jan 11, 2016
4 changes: 2 additions & 2 deletions akka-docs-dev/rst/java/migration-guide-1.0-2.x-java.rst
@@ -43,8 +43,8 @@ In that case you can still manually fuse those graphs which shall run on less Ac
* all Stages (this includes all built-in linear operators)
* TCP connections

Introduced proper named constructor methods insted of ``wrap()``
================================================================
Introduced proper named constructor methods instead of ``wrap()``
=================================================================

There were several, unrelated uses of ``wrap()`` which made it hard to find and hard to understand the intention of
the call. Therefore these use-cases now have methods with different names, helping Java 8 type inference (by reducing
8 changes: 4 additions & 4 deletions akka-docs-dev/rst/scala/migration-guide-1.0-2.x-scala.rst
@@ -43,8 +43,8 @@ In that case you can still manually fuse those graphs which shall run on less Ac
* all Stages (this includes all built-in linear operators)
* TCP connections

Introduced proper named constructor methods insted of ``wrap()``
================================================================
Introduced proper named constructor methods instead of ``wrap()``
=================================================================

There were several, unrelated uses of ``wrap()`` which made it hard to find and hard to understand the intention of
the call. Therefore these use-cases now have methods with different names, helping Java 8 type inference (by reducing
@@ -351,8 +351,8 @@ should be replaced by

.. includecode:: code/docs/MigrationsScala.scala#flatMapConcat

`Sink.fanoutPublisher() and Sink.publisher() is now a single method`
====================================================================
`Sink.fanoutPublisher()` and `Sink.publisher()` is now a single method
======================================================================

It was a common user mistake to use ``Sink.publisher`` and get into trouble since it would only support
a single ``Subscriber``, and the discoverability of the apprpriate fix was non-obvious (Sink.fanoutPublisher).
2 changes: 1 addition & 1 deletion akka-docs-dev/rst/stages-overview.rst
@@ -130,7 +130,7 @@ concat the current stream has an element available; if the curre
prepend the given stream has an element available; if the given input completes, it tries the current one downstream backpressures all upstreams complete
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================

(*) This behavior is changeable to completing when any upstream completes by setting ``eagerClose=true``.
(*) This behavior is changeable to completing when any upstream completes by setting ``eagerComplete=true``.

Fan-out stages
^^^^^^^^^^^^^^
@@ -68,7 +68,7 @@ private object PoolConductor {
GraphDSL.create() { implicit b ⇒
import GraphDSL.Implicits._

val retryMerge = b.add(MergePreferred[RequestContext](1, eagerClose = true))
val retryMerge = b.add(MergePreferred[RequestContext](1, eagerComplete = true))
val slotSelector = b.add(new SlotSelector(slotCount, pipeliningLimit, log))
val route = b.add(new Route(slotCount))
val retrySplit = b.add(Broadcast[RawSlotEvent](2))
28 changes: 27 additions & 1 deletion
@@ -1223,7 +1223,22 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* '''Cancels when''' downstream cancels
*/
def merge[T >: Out](that: Graph[SourceShape[T], _]): javadsl.Flow[In, T, Mat] =
new Flow(delegate.merge(that))
merge(that, eagerComplete = false)

/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* '''Emits when''' one of the inputs has an element available
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*/
def merge[T >: Out](that: Graph[SourceShape[T], _], eagerComplete: Boolean): javadsl.Flow[In, T, Mat] =
new Flow(delegate.merge(that, eagerComplete))

/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
@@ -1233,6 +1248,17 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
*/
def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2]): javadsl.Flow[In, T, M2] =
mergeMat(that, matF, eagerComplete = false)

/**
* Merge the given [[Source]] to this [[Flow]], taking elements as they arrive from input streams,
* picking randomly when several elements ready.
*
* @see [[#merge]]
*/
def mergeMat[T >: Out, M, M2](that: Graph[SourceShape[T], M],
matF: function.Function2[Mat, M, M2],
eagerComplete: Boolean): javadsl.Flow[In, T, M2] =
new Flow(delegate.mergeMat(that, eagerComplete)(combinerToScala(matF)))

/**
28 changes: 14 additions & 14 deletions akka-stream/src/main/scala/akka/stream/javadsl/Graph.scala
@@ -16,7 +16,7 @@ import akka.stream.impl.ConstantFun
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true)
*
* '''Cancels when''' downstream cancels
*/
@@ -36,20 +36,20 @@ object Merge {
/**
* Create a new `Merge` stage with the specified output type.
*
* @param eagerClose set to true in order to make this stage eagerly
* @param eagerComplete set to true in order to make this stage eagerly
* finish as soon as one of its inputs completes
*/
def create[T](inputPorts: Int, eagerClose: Boolean): Graph[UniformFanInShape[T, T], Unit] =
scaladsl.Merge(inputPorts, eagerClose = eagerClose)
def create[T](inputPorts: Int, eagerComplete: Boolean): Graph[UniformFanInShape[T, T], Unit] =
scaladsl.Merge(inputPorts, eagerComplete = eagerComplete)

/**
* Create a new `Merge` stage with the specified output type.
*
* @param eagerClose set to true in order to make this stage eagerly
* @param eagerComplete set to true in order to make this stage eagerly
* finish as soon as one of its inputs completes
*/
def create[T](clazz: Class[T], inputPorts: Int, eagerClose: Boolean): Graph[UniformFanInShape[T, T], Unit] =
create(inputPorts, eagerClose)
def create[T](clazz: Class[T], inputPorts: Int, eagerComplete: Boolean): Graph[UniformFanInShape[T, T], Unit] =
create(inputPorts, eagerComplete)
}

/**
@@ -61,7 +61,7 @@ object Merge {
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true)
*
* '''Cancels when''' downstream cancels
*/
@@ -80,20 +80,20 @@ object MergePreferred {
/**
* Create a new `MergePreferred` stage with the specified output type.
*
* @param eagerClose set to true in order to make this stage eagerly
* @param eagerComplete set to true in order to make this stage eagerly
* finish as soon as one of its inputs completes
*/
def create[T](secondaryPorts: Int, eagerClose: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] =
scaladsl.MergePreferred(secondaryPorts, eagerClose = eagerClose)
def create[T](secondaryPorts: Int, eagerComplete: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] =
scaladsl.MergePreferred(secondaryPorts, eagerComplete = eagerComplete)

/**
* Create a new `MergePreferred` stage with the specified output type.
*
* @param eagerClose set to true in order to make this stage eagerly
* @param eagerComplete set to true in order to make this stage eagerly
* finish as soon as one of its inputs completes
*/
def create[T](clazz: Class[T], secondaryPorts: Int, eagerClose: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] =
create(secondaryPorts, eagerClose)
def create[T](clazz: Class[T], secondaryPorts: Int, eagerComplete: Boolean): Graph[scaladsl.MergePreferred.MergePreferredShape[T], Unit] =
create(secondaryPorts, eagerComplete)

}

14 changes: 7 additions & 7 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -1466,17 +1466,17 @@ trait FlowOps[+Out, +Mat] {
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*/
def merge[U >: Out, M](that: Graph[SourceShape[U], M]): Repr[U] =
via(mergeGraph(that))
def merge[U >: Out, M](that: Graph[SourceShape[U], M], eagerComplete: Boolean = false): Repr[U] =
via(mergeGraph(that, eagerComplete))

protected def mergeGraph[U >: Out, M](that: Graph[SourceShape[U], M]): Graph[FlowShape[Out @uncheckedVariance, U], M] =
protected def mergeGraph[U >: Out, M](that: Graph[SourceShape[U], M], eagerComplete: Boolean): Graph[FlowShape[Out @uncheckedVariance, U], M] =
GraphDSL.create(that) { implicit b ⇒
r ⇒
val merge = b.add(Merge[U](2))
val merge = b.add(Merge[U](2, eagerComplete))
r ~> merge.in(1)
FlowShape(merge.in(0), merge.out)
}
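A minimal usage sketch of the new `eagerComplete` parameter on the Scala DSL. It assumes Akka 2.6.x on the classpath (where an implicit `ActorSystem` supplies the materializer), not the 2.0 milestone this PR targets. The names `EagerMergeSketch`, `run`, `finite` and `silent` are made up for illustration. With `eagerComplete = true` the merged stream should complete once `finite` is exhausted even though `silent` never completes; with the default `false` the `Await` below would time out.

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Await
import scala.concurrent.duration._

object EagerMergeSketch {
  // Runs the merged stream and returns whatever was emitted before completion.
  def run(): Seq[Int] = {
    // Assumption: Akka 2.6.x, so the implicit system also provides a Materializer.
    implicit val system: ActorSystem = ActorSystem("eager-merge-sketch")
    try {
      val finite = Source(1 to 3)
      // Source.maybe neither emits nor completes while its promise is left untouched.
      val silent = Source.maybe[Int]
      val done = finite.merge(silent, eagerComplete = true).runWith(Sink.seq)
      Await.result(done, 5.seconds)
    } finally system.terminate()
  }

  def main(args: Array[String]): Unit = println(run())
}
```

Leaving the flag at its default keeps the previous behaviour, so existing callers of `merge(that)` are unaffected.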
@@ -1716,8 +1716,8 @@ trait FlowOpsMat[+Out, +Mat] extends FlowOps[Out, Mat] {
*
* @see [[#merge]].
*/
def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2])(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] =
viaMat(mergeGraph(that))(matF)
def mergeMat[U >: Out, Mat2, Mat3](that: Graph[SourceShape[U], Mat2], eagerComplete: Boolean = false)(matF: (Mat, Mat2) ⇒ Mat3): ReprMat[U, Mat3] =
viaMat(mergeGraph(that, eagerComplete))(matF)

/**
* Interleave is a deterministic merge of the given [[Source]] with elements of this [[Flow]].
21 changes: 11 additions & 10 deletions akka-stream/src/main/scala/akka/stream/scaladsl/Graph.scala
@@ -18,9 +18,9 @@ object Merge {
* Create a new `Merge` with the specified number of input ports.
*
* @param inputPorts number of input ports
* @param eagerClose if true, the merge will complete as soon as one of its inputs completes.
* @param eagerComplete if true, the merge will complete as soon as one of its inputs completes.
*/
def apply[T](inputPorts: Int, eagerClose: Boolean = false): Merge[T] = new Merge(inputPorts, eagerClose)
def apply[T](inputPorts: Int, eagerComplete: Boolean = false): Merge[T] = new Merge(inputPorts, eagerComplete)

}
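The renamed factory argument can also be used directly when wiring a graph by hand. The following is only a sketch, assuming Akka 2.6.x; `MergeStageSketch`, `eagerMerged`, `first` and `second` are hypothetical names that do not appear in this PR.

```scala
import akka.NotUsed
import akka.stream.SourceShape
import akka.stream.scaladsl.{ GraphDSL, Merge, Source }

object MergeStageSketch {
  // Builds a Source that merges two inputs and completes as soon as either one completes.
  def eagerMerged[T](first: Source[T, _], second: Source[T, _]): Source[T, NotUsed] =
    Source.fromGraph(GraphDSL.create() { implicit builder =>
      import GraphDSL.Implicits._

      // Two input ports; eagerComplete replaces the former eagerClose argument.
      val merge = builder.add(Merge[T](2, eagerComplete = true))
      first ~> merge.in(0)
      second ~> merge.in(1)
      SourceShape(merge.out)
    })
}
```

For the two-input case this is roughly what `merge(that, eagerComplete = true)` builds internally via `mergeGraph`, so the explicit form is mainly useful with more than two inputs.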

@@ -32,12 +32,13 @@ object Merge {
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*/
final class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
final class Merge[T] private (val inputPorts: Int, val eagerComplete: Boolean) extends GraphStage[UniformFanInShape[T, T]] {
require(inputPorts > 1, "A Merge must have more than 1 input port")

val in: immutable.IndexedSeq[Inlet[T]] = Vector.tabulate(inputPorts)(i ⇒ Inlet[T]("Merge.in" + i))
val out: Outlet[T] = Outlet[T]("Merge.out")
override def initialAttributes = Attributes.name("Merge")
@@ -82,7 +83,7 @@ final class Merge[T] private (val inputPorts: Int, val eagerClose: Boolean) exte
}

override def onUpstreamFinish() =
if (eagerClose) {
if (eagerComplete) {
in.foreach(cancel)
runningUpstreams = 0
if (!pending) completeStage()
@@ -118,9 +119,9 @@ object MergePreferred {
* Create a new `MergePreferred` with the specified number of secondary input ports.
*
* @param secondaryPorts number of secondary input ports
* @param eagerClose if true, the merge will complete as soon as one of its inputs completes.
* @param eagerComplete if true, the merge will complete as soon as one of its inputs completes.
*/
def apply[T](secondaryPorts: Int, eagerClose: Boolean = false): MergePreferred[T] = new MergePreferred(secondaryPorts, eagerClose)
def apply[T](secondaryPorts: Int, eagerComplete: Boolean = false): MergePreferred[T] = new MergePreferred(secondaryPorts, eagerComplete)
}

/**
@@ -134,13 +135,13 @@ object MergePreferred {
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' all upstreams complete (eagerClose=false) or one upstream completes (eagerClose=true)
* '''Completes when''' all upstreams complete (eagerComplete=false) or one upstream completes (eagerComplete=true), default value is `false`
*
* '''Cancels when''' downstream cancels
*
* A `Broadcast` has one `in` port and 2 or more `out` ports.
*/
final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
final class MergePreferred[T] private (val secondaryPorts: Int, val eagerComplete: Boolean) extends GraphStage[MergePreferred.MergePreferredShape[T]] {
require(secondaryPorts >= 1, "A MergePreferred must have more than 0 secondary input ports")

override def initialAttributes = Attributes.name("MergePreferred")
@@ -155,7 +156,7 @@ final class MergePreferred[T] private (val secondaryPorts: Int, val eagerClose:
var openInputs = secondaryPorts + 1
def onComplete(): Unit = {
openInputs -= 1
if (eagerClose || openInputs == 0) completeStage()
if (eagerComplete || openInputs == 0) completeStage()
}
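The completion check in `onComplete` above can be modelled without Akka. This is a plain-Scala sketch of the counting rule only, not the stage implementation; `CompletionRuleSketch` and `Tracker` are hypothetical names.

```scala
object CompletionRuleSketch {
  // Tracks how many inputs are still open and whether the stage would have completed.
  final class Tracker(inputs: Int, eagerComplete: Boolean) {
    private var openInputs = inputs
    private var completed = false

    // Mirrors the rule: complete on the first finished input when eager,
    // otherwise only once every input has finished.
    def onUpstreamFinish(): Unit = {
      openInputs -= 1
      if (eagerComplete || openInputs == 0) completed = true
    }

    def isCompleted: Boolean = completed
  }

  def main(args: Array[String]): Unit = {
    val eager = new Tracker(inputs = 3, eagerComplete = true)
    eager.onUpstreamFinish()
    println(s"eager tracker completed after one input: ${eager.isCompleted}")
  }
}
```

With `eagerComplete = false` the tracker needs one `onUpstreamFinish()` call per input before `isCompleted` turns true, matching the "all upstreams complete" wording in the scaladoc.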

setHandler(out, new OutHandler {