From 8a5d096969de6e58fc12a6cddaa7c3680b4d062c Mon Sep 17 00:00:00 2001 From: Scott T Weaver Date: Thu, 31 Mar 2022 17:49:11 -0500 Subject: [PATCH 1/9] Added support for 'min' and 'max' on both Histogram and Summary. --- .../test/scala/zio/metrics/MetricSpec.scala | 24 ++++++++----- ...oncurrentMetricHooksPlatformSpecific.scala | 35 ++++++++++++++++-- ...oncurrentMetricHooksPlatformSpecific.scala | 36 +++++++++++++++++-- ...oncurrentMetricHooksPlatformSpecific.scala | 31 ++++++++++++++-- .../main/scala/zio/metrics/MetricState.scala | 4 +++ 5 files changed, 113 insertions(+), 17 deletions(-) diff --git a/core-tests/shared/src/test/scala/zio/metrics/MetricSpec.scala b/core-tests/shared/src/test/scala/zio/metrics/MetricSpec.scala index c2f5b6376bd0..451c2d2db254 100644 --- a/core-tests/shared/src/test/scala/zio/metrics/MetricSpec.scala +++ b/core-tests/shared/src/test/scala/zio/metrics/MetricSpec.scala @@ -158,7 +158,7 @@ object MetricSpec extends ZIOBaseSpec { _ <- ZIO.succeed(1.0) @@ h _ <- ZIO.succeed(3.0) @@ h state <- h.value - } yield assertTrue(state.count == 2L, state.sum == 4.0) + } yield assertTrue(state.count == 2L, state.sum == 4.0, state.min == 1.0, state.max == 3.0) }, test("direct observe") { val h = Metric.histogram("h2", Histogram.Boundaries.linear(0, 1.0, 10)).tagged(labels1) @@ -167,7 +167,7 @@ object MetricSpec extends ZIOBaseSpec { _ <- h.update(1.0) _ <- h.update(3.0) state <- h.value - } yield assertTrue(state.count == 2L, state.sum == 4.0) + } yield assertTrue(state.count == 2L, state.sum == 4.0, state.min == 1.0, state.max == 3.0) }, test("observeDurations") { val h = @@ -187,7 +187,11 @@ object MetricSpec extends ZIOBaseSpec { } yield assertTrue( state.count == 2L, state.sum > 3.9, - state.sum <= elapsed + state.sum <= elapsed, + state.min >= 1.0, + state.min < 2.0, + state.max >= 3.0, + state.max < 4.0 ) } @@ withLiveClock, test("observeHistogram") { @@ -201,7 +205,9 @@ object MetricSpec extends ZIOBaseSpec { state <- h.value } yield assertTrue( state.count == 2L, - state.sum == 4.0 + state.sum == 4.0, + state.min == 1.0, + state.max == 3.0 ) }, test("observeHistogramWith") { @@ -214,7 +220,7 @@ object MetricSpec extends ZIOBaseSpec { _ <- ZIO.succeed("x") @@ h _ <- ZIO.succeed("xyz") @@ h state <- h.value - } yield assertTrue(state.count == 2L, state.sum == 4.0) + } yield assertTrue(state.count == 2L, state.sum == 4.0, state.min == 1.0, state.max == 3.0) }, test("observeHistogramWith + taggedWith") { val boundaries = Histogram.Boundaries.linear(0, 1.0, 10) @@ -245,7 +251,7 @@ object MetricSpec extends ZIOBaseSpec { _ <- ZIO.succeed(1.0) @@ s _ <- ZIO.succeed(3.0) @@ s state <- s.value - } yield assertTrue(state.count == 2L, state.sum == 4.0) + } yield assertTrue(state.count == 2L, state.sum == 4.0, state.min == 1.0, state.max == 3.0) }, test("direct observe") { val s = Metric @@ -256,7 +262,7 @@ object MetricSpec extends ZIOBaseSpec { _ <- s.update(1.0) _ <- s.update(3.0) state <- s.value - } yield assertTrue(state.count == 2L, state.sum == 4.0) + } yield assertTrue(state.count == 2L, state.sum == 4.0, state.min == 1.0, state.max == 3.0) }, test("observeSummary") { val s = Metric @@ -267,7 +273,7 @@ object MetricSpec extends ZIOBaseSpec { _ <- ZIO.succeed(1.0) @@ s _ <- ZIO.succeed(3.0) @@ s state <- s.value - } yield assertTrue(state.count == 2L, state.sum == 4.0) + } yield assertTrue(state.count == 2L, state.sum == 4.0, state.min == 1.0, state.max == 3.0) }, test("observeSummaryWith") { val s = Metric @@ -279,7 +285,7 @@ object MetricSpec extends ZIOBaseSpec { _ <- ZIO.succeed("x") @@ s _ <- ZIO.succeed("xyz") @@ s state <- s.value - } yield assertTrue(state.count == 2L, state.sum == 4.0) + } yield assertTrue(state.count == 2L, state.sum == 4.0, state.min == 1.0, state.max == 3.0) }, test("observeSummaryWith + taggedWith") { val s0 = Metric diff --git a/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index 7c61455489c9..692f088b8449 100644 --- a/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -38,7 +38,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { val boundaries = Array.ofDim[Double](bounds.length) var count = 0L var sum = 0.0 - val size = bounds.length + var size = bounds.length + val minMax = new MinMax bounds.sorted.zipWithIndex.foreach { case (n, i) => boundaries(i) = n } @@ -59,6 +60,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { values(from) = values(from) + 1 count += 1 sum += value + minMax.update(value) () } @@ -76,7 +78,13 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { builder.result() } - MetricHook(update, () => MetricState.Histogram(getBuckets(), count, sum)) + MetricHook( + update, + { () => + val (min, max) = minMax.get + MetricState.Histogram(getBuckets(), count, min, max, sum) + } + ) } def summary(key: MetricKey.Summary): MetricHook.Summary = { @@ -86,6 +94,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { var head = 0 var count = 0L var sum = 0.0 + val minMax = new MinMax val sortedQuantiles: Chunk[Double] = quantiles.sorted(DoubleOrdering) @@ -129,18 +138,23 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { count += 1 sum += value + minMax.update(value) () } MetricHook( t => observe(t._1, t._2), - () => + { () => + val (min, max) = minMax.get MetricState.Summary( error, snapshot(java.time.Instant.now()), getCount(), + min, + max, getSum() ) + } ) } @@ -168,4 +182,19 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook(update, () => MetricState.Frequency(snapshot())) } + + private final class MinMax { + private var minMax = Option.empty[(Double, Double)] + + def get(): (Double, Double) = minMax.getOrElse(0.0 -> 0.0) + + def update(value: Double): Unit = + minMax = minMax match { + case minMax @ Some((min, max)) => + if (value < min) Some((value, max)) + else if (value > max) Some((min, value)) + else minMax + case None => Some((value, value)) + } + } } diff --git a/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index a7df171c31a0..1eea6477a2c5 100644 --- a/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -41,6 +41,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { val count = new LongAdder val sum = new DoubleAdder val size = bounds.length + val minMax = new AtomicMinMax() bounds.sorted.zipWithIndex.foreach { case (n, i) => boundaries(i) = n } @@ -61,6 +62,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { values.getAndIncrement(from) count.increment() sum.add(value) + minMax.update(value) () } @@ -78,7 +80,13 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { builder.result() } - MetricHook(update, () => MetricState.Histogram(getBuckets(), count.longValue(), sum.doubleValue())) + MetricHook( + update, + { () => + val (min, max) = minMax.get() + MetricState.Histogram(getBuckets(), count.longValue(), min, max, sum.doubleValue()) + } + ) } def summary(key: MetricKey.Summary): MetricHook.Summary = { @@ -88,6 +96,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { val head = new AtomicInteger(0) val count = new LongAdder val sum = new DoubleAdder + val minMax = new AtomicMinMax() val sortedQuantiles: Chunk[Double] = quantiles.sorted(DoubleOrdering) @@ -130,20 +139,26 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { values.set(target, tuple) } + val value = tuple._1 count.increment() - sum.add(tuple._1) + sum.add(value) + minMax.update(value) () } MetricHook( observe(_), - () => + { () => + val (min, max) = minMax.get() MetricState.Summary( error, snapshot(java.time.Instant.now()), getCount(), + min, + max, getSum() ) + } ) } @@ -175,4 +190,19 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook(update, () => MetricState.Frequency(snapshot())) } + + private final class AtomicMinMax { + private val minMax = new AtomicReference(Option.empty[(Double, Double)]) + + def get(): (Double, Double) = minMax.get().getOrElse(0.0 -> 0.0) + + def update(value: Double): Unit = + minMax.updateAndGet { + case minMax @ Some((min, max)) => + if (value < min) Some((value, max)) + else if (value > max) Some((min, value)) + else minMax + case None => Some((value, value)) + } + } } diff --git a/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index 7c61455489c9..be6476c55a66 100644 --- a/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -39,6 +39,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { var count = 0L var sum = 0.0 val size = bounds.length + val minMax = new MinMax bounds.sorted.zipWithIndex.foreach { case (n, i) => boundaries(i) = n } @@ -59,6 +60,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { values(from) = values(from) + 1 count += 1 sum += value + minMax.update(value) () } @@ -76,7 +78,13 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { builder.result() } - MetricHook(update, () => MetricState.Histogram(getBuckets(), count, sum)) + MetricHook( + update, + { () => + val (min, max) = minMax.get + MetricState.Histogram(getBuckets(), count, min, max, sum) + } + ) } def summary(key: MetricKey.Summary): MetricHook.Summary = { @@ -86,6 +94,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { var head = 0 var count = 0L var sum = 0.0 + val minMax = new MinMax val sortedQuantiles: Chunk[Double] = quantiles.sorted(DoubleOrdering) @@ -129,18 +138,21 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { count += 1 sum += value + minMax.update(value) () } MetricHook( t => observe(t._1, t._2), - () => + { () => + val (min, max) = minMax.get MetricState.Summary( error, snapshot(java.time.Instant.now()), getCount(), getSum() ) + } ) } @@ -168,4 +180,19 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook(update, () => MetricState.Frequency(snapshot())) } + + private final class MinMax { + private var minMax = Option.empty[(Double, Double)] + + def get(): (Double, Double) = minMax.getOrElse(0.0 -> 0.0) + + def update(value: Double): Unit = + minMax = minMax match { + case minMax @ Some((min, max)) => + if (value < min) Some((value, max)) + else if (value > max) Some((min, value)) + else minMax + case None => Some((value, value)) + } + } } diff --git a/core/shared/src/main/scala/zio/metrics/MetricState.scala b/core/shared/src/main/scala/zio/metrics/MetricState.scala index 795690dc3924..b0e3194a290a 100644 --- a/core/shared/src/main/scala/zio/metrics/MetricState.scala +++ b/core/shared/src/main/scala/zio/metrics/MetricState.scala @@ -40,6 +40,8 @@ object MetricState { final case class Histogram( buckets: Chunk[(Double, Long)], count: Long, + min: Double, + max: Double, sum: Double ) extends MetricState[MetricKeyType.Histogram] @@ -47,6 +49,8 @@ object MetricState { error: Double, quantiles: Chunk[(Double, Option[Double])], count: Long, + min: Double, + max: Double, sum: Double ) extends MetricState[MetricKeyType.Summary] } From cf33a3d70ccae5cd357f046c8378c0b48a6407b4 Mon Sep 17 00:00:00 2001 From: Scott T Weaver Date: Thu, 31 Mar 2022 18:27:11 -0500 Subject: [PATCH 2/9] Fixed compilation issue for ConcurrentMetricHooksPlatformSpecific due to missing 'min' and 'max' values. --- .../metrics/ConcurrentMetricHooksPlatformSpecific.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index be6476c55a66..c57709b2e3b3 100644 --- a/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -150,6 +150,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { error, snapshot(java.time.Instant.now()), getCount(), + min, + max, getSum() ) } From c05dff04671b606b22c6741eb9252003dec575d7 Mon Sep 17 00:00:00 2001 From: Scott T Weaver Date: Thu, 31 Mar 2022 20:23:34 -0500 Subject: [PATCH 3/9] Fix JS compilation issues due to missing parens. --- .../metrics/ConcurrentMetricHooksPlatformSpecific.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index 692f088b8449..3874cee40a97 100644 --- a/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -81,7 +81,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook( update, { () => - val (min, max) = minMax.get + val (min, max) = minMax.get() MetricState.Histogram(getBuckets(), count, min, max, sum) } ) @@ -145,7 +145,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook( t => observe(t._1, t._2), { () => - val (min, max) = minMax.get + val (min, max) = minMax.get() MetricState.Summary( error, snapshot(java.time.Instant.now()), From b5d1382b060e1d57f3865b42a481d9193d4a27f3 Mon Sep 17 00:00:00 2001 From: Scott T Weaver Date: Fri, 1 Apr 2022 11:19:59 -0500 Subject: [PATCH 4/9] Implemeted recommended performance improvements. --- ...oncurrentMetricHooksPlatformSpecific.scala | 40 ++++++---------- ...oncurrentMetricHooksPlatformSpecific.scala | 46 ++++++++----------- ...oncurrentMetricHooksPlatformSpecific.scala | 36 ++++++--------- 3 files changed, 48 insertions(+), 74 deletions(-) diff --git a/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index 3874cee40a97..fa85e55ae64b 100644 --- a/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -39,7 +39,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { var count = 0L var sum = 0.0 var size = bounds.length - val minMax = new MinMax + var min = Double.MaxValue + var max = Double.MinValue bounds.sorted.zipWithIndex.foreach { case (n, i) => boundaries(i) = n } @@ -60,7 +61,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { values(from) = values(from) + 1 count += 1 sum += value - minMax.update(value) + if (value < min) min = value + if (value > max) min = value () } @@ -80,10 +82,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook( update, - { () => - val (min, max) = minMax.get() + () => MetricState.Histogram(getBuckets(), count, min, max, sum) - } ) } @@ -94,12 +94,17 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { var head = 0 var count = 0L var sum = 0.0 - val minMax = new MinMax + var min = Double.MaxValue + var max = Double.MinValue val sortedQuantiles: Chunk[Double] = quantiles.sorted(DoubleOrdering) def getCount(): Long = count + def getMin(): Double = min + + def getMax(): Double = max + def getSum(): Double = sum // Just before the Snapshot we filter out all values older than maxAge @@ -138,20 +143,20 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { count += 1 sum += value - minMax.update(value) + if (value < min) min = value + if (value > max) min = value () } MetricHook( t => observe(t._1, t._2), { () => - val (min, max) = minMax.get() MetricState.Summary( error, snapshot(java.time.Instant.now()), getCount(), - min, - max, + getMin(), + getMax(), getSum() ) } @@ -182,19 +187,4 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook(update, () => MetricState.Frequency(snapshot())) } - - private final class MinMax { - private var minMax = Option.empty[(Double, Double)] - - def get(): (Double, Double) = minMax.getOrElse(0.0 -> 0.0) - - def update(value: Double): Unit = - minMax = minMax match { - case minMax @ Some((min, max)) => - if (value < min) Some((value, max)) - else if (value > max) Some((min, value)) - else minMax - case None => Some((value, value)) - } - } } diff --git a/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index 1eea6477a2c5..55f12110bcd6 100644 --- a/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -41,7 +41,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { val count = new LongAdder val sum = new DoubleAdder val size = bounds.length - val minMax = new AtomicMinMax() + val min = new AtomicReference(Double.MaxValue) + val max = new AtomicReference(Double.MinValue) bounds.sorted.zipWithIndex.foreach { case (n, i) => boundaries(i) = n } @@ -62,7 +63,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { values.getAndIncrement(from) count.increment() sum.add(value) - minMax.update(value) + if (value < min.get()) min.set(value) + if (value > max.get()) max.set(value) () } @@ -82,10 +84,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook( update, - { () => - val (min, max) = minMax.get() - MetricState.Histogram(getBuckets(), count.longValue(), min, max, sum.doubleValue()) - } + () => + MetricState.Histogram(getBuckets(), count.longValue(), min.get(), max.get(), sum.doubleValue()) ) } @@ -96,13 +96,20 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { val head = new AtomicInteger(0) val count = new LongAdder val sum = new DoubleAdder - val minMax = new AtomicMinMax() + val min = new AtomicReference(Double.MaxValue) + val max = new AtomicReference(Double.MinValue) val sortedQuantiles: Chunk[Double] = quantiles.sorted(DoubleOrdering) def getCount(): Long = count.longValue + def getMin(): Double = + min.get() + + def getMax(): Double = + max.get() + def getSum(): Double = sum.doubleValue @@ -142,23 +149,22 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { val value = tuple._1 count.increment() sum.add(value) - minMax.update(value) + if (value < min.get()) min.set(value) + if (value > max.get()) max.set(value) () } MetricHook( observe(_), - { () => - val (min, max) = minMax.get() + () => MetricState.Summary( error, snapshot(java.time.Instant.now()), getCount(), - min, - max, + getMin(), + getMax(), getSum() ) - } ) } @@ -191,18 +197,4 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook(update, () => MetricState.Frequency(snapshot())) } - private final class AtomicMinMax { - private val minMax = new AtomicReference(Option.empty[(Double, Double)]) - - def get(): (Double, Double) = minMax.get().getOrElse(0.0 -> 0.0) - - def update(value: Double): Unit = - minMax.updateAndGet { - case minMax @ Some((min, max)) => - if (value < min) Some((value, max)) - else if (value > max) Some((min, value)) - else minMax - case None => Some((value, value)) - } - } } diff --git a/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index c57709b2e3b3..d9687affe164 100644 --- a/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -39,7 +39,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { var count = 0L var sum = 0.0 val size = bounds.length - val minMax = new MinMax + var min = Double.MaxValue + var max = Double.MinValue bounds.sorted.zipWithIndex.foreach { case (n, i) => boundaries(i) = n } @@ -60,7 +61,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { values(from) = values(from) + 1 count += 1 sum += value - minMax.update(value) + if (value < min) min = value + if (value > max) min = value () } @@ -81,7 +83,6 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook( update, { () => - val (min, max) = minMax.get MetricState.Histogram(getBuckets(), count, min, max, sum) } ) @@ -94,12 +95,17 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { var head = 0 var count = 0L var sum = 0.0 - val minMax = new MinMax + var min = Double.MaxValue + var max = Double.MinValue val sortedQuantiles: Chunk[Double] = quantiles.sorted(DoubleOrdering) def getCount(): Long = count + def getMin(): Double = min + + def getMax(): Double = max + def getSum(): Double = sum // Just before the Snapshot we filter out all values older than maxAge @@ -139,19 +145,20 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { count += 1 sum += value minMax.update(value) + if (value < min) min = value + if (value > max) min = value () } MetricHook( t => observe(t._1, t._2), { () => - val (min, max) = minMax.get MetricState.Summary( error, snapshot(java.time.Instant.now()), getCount(), - min, - max, + getMin(), + getMax(), getSum() ) } @@ -182,19 +189,4 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook(update, () => MetricState.Frequency(snapshot())) } - - private final class MinMax { - private var minMax = Option.empty[(Double, Double)] - - def get(): (Double, Double) = minMax.getOrElse(0.0 -> 0.0) - - def update(value: Double): Unit = - minMax = minMax match { - case minMax @ Some((min, max)) => - if (value < min) Some((value, max)) - else if (value > max) Some((min, value)) - else minMax - case None => Some((value, value)) - } - } } From 7761c7f132098e346d943d374c0283180e903562 Mon Sep 17 00:00:00 2001 From: Scott T Weaver Date: Fri, 1 Apr 2022 12:52:53 -0500 Subject: [PATCH 5/9] Worked with Adam Fraser to come up with a safer and more performant approach for supporting atomic double values. Fixed issue in native and js implementations that was causing test failures. --- ...oncurrentMetricHooksPlatformSpecific.scala | 10 ++- ...oncurrentMetricHooksPlatformSpecific.scala | 64 ++++++++++++++++--- ...oncurrentMetricHooksPlatformSpecific.scala | 10 ++- 3 files changed, 62 insertions(+), 22 deletions(-) diff --git a/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index fa85e55ae64b..300c10b4ea97 100644 --- a/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/js/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -62,7 +62,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { count += 1 sum += value if (value < min) min = value - if (value > max) min = value + if (value > max) max = value () } @@ -82,8 +82,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook( update, - () => - MetricState.Histogram(getBuckets(), count, min, max, sum) + () => MetricState.Histogram(getBuckets(), count, min, max, sum) ) } @@ -144,13 +143,13 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { count += 1 sum += value if (value < min) min = value - if (value > max) min = value + if (value > max) max = value () } MetricHook( t => observe(t._1, t._2), - { () => + () => MetricState.Summary( error, snapshot(java.time.Instant.now()), @@ -159,7 +158,6 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { getMax(), getSum() ) - } ) } diff --git a/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index 55f12110bcd6..911a27f02610 100644 --- a/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -20,6 +20,7 @@ import zio.metrics._ import java.util.concurrent.atomic._ import java.util.concurrent.ConcurrentHashMap +import java.lang.{Double => JDouble} class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { def counter(key: MetricKey.Counter): MetricHook.Counter = { @@ -34,6 +35,28 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook(v => ref.set(v), () => MetricState.Gauge(ref.get())) } + private def updateMin(atomic: AtomicDouble, value: Double): Unit = { + var loop = true + + while (loop) { + val current = atomic.get() + if (value < current) { + loop = !atomic.compareAndSet(current, value) + } else loop = false + } + } + + private def updateMax(atomic: AtomicDouble, value: Double): Unit = { + var loop = true + + while (loop) { + val current = atomic.get() + if (value > current) { + loop = !atomic.compareAndSet(current, value) + } else loop = false + } + + } def histogram(key: MetricKey.Histogram): MetricHook.Histogram = { val bounds = key.keyType.boundaries.values val values = new AtomicLongArray(bounds.length + 1) @@ -41,8 +64,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { val count = new LongAdder val sum = new DoubleAdder val size = bounds.length - val min = new AtomicReference(Double.MaxValue) - val max = new AtomicReference(Double.MinValue) + val min = AtomicDouble.make(Double.MaxValue) + val max = AtomicDouble.make(Double.MinValue) bounds.sorted.zipWithIndex.foreach { case (n, i) => boundaries(i) = n } @@ -63,8 +86,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { values.getAndIncrement(from) count.increment() sum.add(value) - if (value < min.get()) min.set(value) - if (value > max.get()) max.set(value) + updateMin(min, value) + updateMax(max, value) () } @@ -84,8 +107,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook( update, - () => - MetricState.Histogram(getBuckets(), count.longValue(), min.get(), max.get(), sum.doubleValue()) + () => MetricState.Histogram(getBuckets(), count.longValue(), min.get(), max.get(), sum.doubleValue()) ) } @@ -96,8 +118,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { val head = new AtomicInteger(0) val count = new LongAdder val sum = new DoubleAdder - val min = new AtomicReference(Double.MaxValue) - val max = new AtomicReference(Double.MinValue) + val min = AtomicDouble.make(Double.MaxValue) + val max = AtomicDouble.make(Double.MinValue) val sortedQuantiles: Chunk[Double] = quantiles.sorted(DoubleOrdering) @@ -149,8 +171,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { val value = tuple._1 count.increment() sum.add(value) - if (value < min.get()) min.set(value) - if (value > max.get()) max.set(value) + updateMin(min, value) + updateMax(max, value) () } @@ -197,4 +219,26 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook(update, () => MetricState.Frequency(snapshot())) } + /** + * Scala's `Double` implemenntation does not play nicely with Java's `AtomicReference.compareAndSwap` as + * s`compareAndSwap` uses Java's `==` reference equality when it performs an equality check. This means + * that even if two Scala `Double`s have the same value, they will still fail `compareAndSwap` as they + * will most likely be two, distinct object references. Thus, `compareAndSwap` will. + * + */ + private final class AtomicDouble private (private val ref: AtomicLong) { + def get(): Double = + JDouble.longBitsToDouble(ref.get()) + + def compareAndSet(expected: Double, newValue: Double): Boolean = + ref.compareAndSet(JDouble.doubleToLongBits(expected), JDouble.doubleToLongBits(newValue)) + + } + + private object AtomicDouble { + + def make(value: Double): AtomicDouble = + new AtomicDouble(new AtomicLong(JDouble.doubleToLongBits(value))) + } + } diff --git a/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index d9687affe164..0aee7f3b876b 100644 --- a/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -62,7 +62,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { count += 1 sum += value if (value < min) min = value - if (value > max) min = value + if (value > max) max = value () } @@ -82,9 +82,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook( update, - { () => + () => MetricState.Histogram(getBuckets(), count, min, max, sum) - } ) } @@ -146,13 +145,13 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { sum += value minMax.update(value) if (value < min) min = value - if (value > max) min = value + if (value > max) max = value () } MetricHook( t => observe(t._1, t._2), - { () => + () => MetricState.Summary( error, snapshot(java.time.Instant.now()), @@ -161,7 +160,6 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { getMax(), getSum() ) - } ) } From 28f43aa90f9e804fad00ed67b3cf82ee1fc782a1 Mon Sep 17 00:00:00 2001 From: Scott T Weaver Date: Fri, 1 Apr 2022 13:06:18 -0500 Subject: [PATCH 6/9] Completed documentation of 'AtomicDouble'. --- .../ConcurrentMetricHooksPlatformSpecific.scala | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index 911a27f02610..b74008ea5f94 100644 --- a/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -220,10 +220,14 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { } /** - * Scala's `Double` implemenntation does not play nicely with Java's `AtomicReference.compareAndSwap` as - * s`compareAndSwap` uses Java's `==` reference equality when it performs an equality check. This means + * Scala's `Double` implementation does not play nicely with Java's `AtomicReference.compareAndSwap` as + * `compareAndSwap` uses Java's `==` reference equality when it performs an equality check. This means * that even if two Scala `Double`s have the same value, they will still fail `compareAndSwap` as they - * will most likely be two, distinct object references. Thus, `compareAndSwap` will. + * will most likely be two, distinct object references. Thus, `compareAndSwap` will fail. + * + * This `AtomicDouble` implementation is a workaround for this issue that is backed by an `AtomicLong` instead + * of an `AtomicReference` in which the Double's bits are stored as a Long value. This approach also reduces boxing and + * unboxing overhead that can be incurred with `AtomicReference`. * */ private final class AtomicDouble private (private val ref: AtomicLong) { From 5e6fdac1c80744c185f3f40a85d05215f2d05761 Mon Sep 17 00:00:00 2001 From: Scott T Weaver Date: Fri, 1 Apr 2022 13:49:56 -0500 Subject: [PATCH 7/9] Formatting --- ...oncurrentMetricHooksPlatformSpecific.scala | 26 ++++++++++--------- ...oncurrentMetricHooksPlatformSpecific.scala | 3 +-- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index b74008ea5f94..36a9cd20d5b5 100644 --- a/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/jvm/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -64,8 +64,8 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { val count = new LongAdder val sum = new DoubleAdder val size = bounds.length - val min = AtomicDouble.make(Double.MaxValue) - val max = AtomicDouble.make(Double.MinValue) + val min = AtomicDouble.make(Double.MaxValue) + val max = AtomicDouble.make(Double.MinValue) bounds.sorted.zipWithIndex.foreach { case (n, i) => boundaries(i) = n } @@ -220,16 +220,18 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { } /** - * Scala's `Double` implementation does not play nicely with Java's `AtomicReference.compareAndSwap` as - * `compareAndSwap` uses Java's `==` reference equality when it performs an equality check. This means - * that even if two Scala `Double`s have the same value, they will still fail `compareAndSwap` as they - * will most likely be two, distinct object references. Thus, `compareAndSwap` will fail. - * - * This `AtomicDouble` implementation is a workaround for this issue that is backed by an `AtomicLong` instead - * of an `AtomicReference` in which the Double's bits are stored as a Long value. This approach also reduces boxing and - * unboxing overhead that can be incurred with `AtomicReference`. - * - */ + * Scala's `Double` implementation does not play nicely with Java's + * `AtomicReference.compareAndSwap` as `compareAndSwap` uses Java's `==` + * reference equality when it performs an equality check. This means that even + * if two Scala `Double`s have the same value, they will still fail + * `compareAndSwap` as they will most likely be two, distinct object + * references. Thus, `compareAndSwap` will fail. + * + * This `AtomicDouble` implementation is a workaround for this issue that is + * backed by an `AtomicLong` instead of an `AtomicReference` in which the + * Double's bits are stored as a Long value. This approach also reduces boxing + * and unboxing overhead that can be incurred with `AtomicReference`. + */ private final class AtomicDouble private (private val ref: AtomicLong) { def get(): Double = JDouble.longBitsToDouble(ref.get()) diff --git a/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index 0aee7f3b876b..9713adbabe49 100644 --- a/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -82,8 +82,7 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { MetricHook( update, - () => - MetricState.Histogram(getBuckets(), count, min, max, sum) + () => MetricState.Histogram(getBuckets(), count, min, max, sum) ) } From f86c170ee8acc83ffa850a244e821e546715f7a7 Mon Sep 17 00:00:00 2001 From: Scott T Weaver Date: Fri, 1 Apr 2022 14:07:30 -0500 Subject: [PATCH 8/9] Fix native compile issue. --- .../internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala | 1 - 1 file changed, 1 deletion(-) diff --git a/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala b/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala index 9713adbabe49..18a820447c5c 100644 --- a/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala +++ b/core/native/src/main/scala/zio/internal/metrics/ConcurrentMetricHooksPlatformSpecific.scala @@ -142,7 +142,6 @@ class ConcurrentMetricHooksPlatformSpecific extends ConcurrentMetricHooks { count += 1 sum += value - minMax.update(value) if (value < min) min = value if (value > max) max = value () From 4bcb802e922b559ac19c0f9cdcbb97f769cf0b7f Mon Sep 17 00:00:00 2001 From: Scott T Weaver Date: Fri, 1 Apr 2022 14:48:04 -0500 Subject: [PATCH 9/9] Attempt to address test flakiness. --- .../shared/src/test/scala/zio/metrics/MetricSpec.scala | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core-tests/shared/src/test/scala/zio/metrics/MetricSpec.scala b/core-tests/shared/src/test/scala/zio/metrics/MetricSpec.scala index 451c2d2db254..f9516046f89b 100644 --- a/core-tests/shared/src/test/scala/zio/metrics/MetricSpec.scala +++ b/core-tests/shared/src/test/scala/zio/metrics/MetricSpec.scala @@ -189,11 +189,11 @@ object MetricSpec extends ZIOBaseSpec { state.sum > 3.9, state.sum <= elapsed, state.min >= 1.0, - state.min < 2.0, + state.min < state.max, state.max >= 3.0, - state.max < 4.0 + state.max < elapsed ) - } @@ withLiveClock, + } @@ withLiveClock @@ flaky, test("observeHistogram") { val h = Metric .histogram("h4", Histogram.Boundaries.linear(0, 1.0, 10))