JVM metrics + DefaultJvmMetricsExporter ZIOApp by vigoo · Pull Request #5582 · zio/zio · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

JVM metrics + DefaultJvmMetricsExporter ZIOApp #5582

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 8 commits into from
Sep 30, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 56 additions & 0 deletions core/jvm/src/main/scala/zio/metrics/jvm/BufferPools.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package zio.metrics.jvm

import com.github.ghik.silencer.silent

import zio._

import java.lang.management.{BufferPoolMXBean, ManagementFactory}

import scala.collection.JavaConverters._

trait BufferPools extends JvmMetrics {
override type Feature = BufferPools
override val featureTag: Tag[BufferPools] = Tag[BufferPools]

/** Gauge holding the used bytes of the JVM buffer pool identified by `pool`. */
private def bufferPoolUsedBytes(pool: String): ZIOMetric.Gauge[Long] =
  ZIOMetric.setGaugeWith("jvm_buffer_pool_used_bytes", MetricLabel("pool", pool)) { (usedBytes: Long) =>
    usedBytes.toDouble
  }

/** Gauge holding the capacity, in bytes, of the JVM buffer pool identified by `pool`. */
private def bufferPoolCapacityBytes(pool: String): ZIOMetric.Gauge[Long] =
  ZIOMetric.setGaugeWith("jvm_buffer_pool_capacity_bytes", MetricLabel("pool", pool)) { (capacityBytes: Long) =>
    capacityBytes.toDouble
  }

/** Gauge holding the number of used buffers of the JVM buffer pool identified by `pool`. */
private def bufferPoolUsedBuffers(pool: String): ZIOMetric.Gauge[Long] =
  ZIOMetric.setGaugeWith("jvm_buffer_pool_used_buffers", MetricLabel("pool", pool)) { (bufferCount: Long) =>
    bufferCount.toDouble
  }

/**
 * Reads the name, used memory, total capacity and buffer count of every given
 * buffer pool bean and records each value in the gauge labelled with that
 * pool's name. The beans are traversed with `ZIO.foreachParDiscard`.
 *
 * @param bufferPoolMXBeans the buffer pool beans to sample
 * @return an effect that fails with the `Throwable` raised by any of the bean reads
 */
private def reportBufferPoolMetrics(
bufferPoolMXBeans: List[BufferPoolMXBean]
): ZIO[Any, Throwable, Unit] =
ZIO.foreachParDiscard(bufferPoolMXBeans) { bufferPoolMXBean =>
for {
name <- Task(bufferPoolMXBean.getName)
_ <- Task(bufferPoolMXBean.getMemoryUsed) @@ bufferPoolUsedBytes(name)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not related to this PR but you could imagine there being an unsafe method to update a metric so this whole thing could take place within one ZIO.succeed block.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would probably also help in some migration situations allowing you to (temporarily) not convert something to an effect that deep inside called some non-ZIO metric API to report a metric.

_ <- Task(bufferPoolMXBean.getTotalCapacity) @@ bufferPoolCapacityBytes(name)
_ <- Task(bufferPoolMXBean.getCount) @@ bufferPoolUsedBuffers(name)
} yield ()
}

/**
 * Looks up the platform buffer pool beans once, then forks a managed fiber
 * that keeps re-reporting their metrics according to `collectionSchedule`.
 * Yields this collector.
 */
@silent("JavaConverters")
val collectMetrics: ZManaged[Has[Clock], Throwable, BufferPools] =
  Task(ManagementFactory.getPlatformMXBeans(classOf[BufferPoolMXBean]).asScala.toList).toManaged
    .flatMap { pools =>
      reportBufferPoolMetrics(pools)
        .repeat(collectionSchedule)
        .interruptible
        .forkManaged
        .map(_ => this)
    }
}

/** Buffer pool collector using the default schedule; `withSchedule` creates one with a custom schedule. */
object BufferPools extends BufferPools with JvmMetrics.DefaultSchedule {

  /** Buffer pool collector whose schedule is fixed at construction time. */
  private final class Scheduled(
    override protected val collectionSchedule: Schedule[Any, Any, Unit]
  ) extends BufferPools

  /** Creates a buffer pool collector that reports according to `schedule`. */
  def withSchedule(schedule: Schedule[Any, Any, Unit]): BufferPools =
    new Scheduled(schedule)
}
49 changes: 49 additions & 0 deletions core/jvm/src/main/scala/zio/metrics/jvm/ClassLoading.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package zio.metrics.jvm

import zio.ZIOMetric.Gauge
import zio._

import java.lang.management.{ClassLoadingMXBean, ManagementFactory}

/** Collector that periodically publishes JVM class loading statistics as gauges. */
trait ClassLoading extends JvmMetrics {
  override type Feature = ClassLoading
  override val featureTag: Tag[ClassLoading] = Tag[ClassLoading]

  /** The number of classes that are currently loaded in the JVM */
  private val loadedClassCount: Gauge[Int] =
    ZIOMetric.setGaugeWith("jvm_classes_loaded")((count: Int) => count.toDouble)

  /** The total number of classes that have been loaded since the JVM has started execution */
  private val totalLoadedClassCount: Gauge[Long] =
    ZIOMetric.setGaugeWith("jvm_classes_loaded_total")((count: Long) => count.toDouble)

  /** The total number of classes that have been unloaded since the JVM has started execution */
  private val unloadedClassCount: Gauge[Long] =
    ZIOMetric.setGaugeWith("jvm_classes_unloaded_total")((count: Long) => count.toDouble)

  /** Samples the three class loading counters from `bean`, in order, and records each in its gauge. */
  private def reportClassLoadingMetrics(bean: ClassLoadingMXBean): ZIO[Any, Throwable, Unit] =
    (Task(bean.getLoadedClassCount) @@ loadedClassCount) *>
      (Task(bean.getTotalLoadedClassCount) @@ totalLoadedClassCount) *>
      (Task(bean.getUnloadedClassCount) @@ unloadedClassCount).unit

  /**
   * Looks up the platform class loading bean once, then forks a managed fiber
   * that keeps re-reporting its metrics according to `collectionSchedule`.
   * Yields this collector.
   */
  val collectMetrics: ZManaged[Has[Clock], Throwable, ClassLoading] =
    Task(ManagementFactory.getPlatformMXBean(classOf[ClassLoadingMXBean])).toManaged
      .flatMap { bean =>
        reportClassLoadingMetrics(bean)
          .repeat(collectionSchedule)
          .interruptible
          .forkManaged
          .map(_ => this)
      }
}

/** Exports metrics related to JVM class loading */
object ClassLoading extends ClassLoading with JvmMetrics.DefaultSchedule {

  /** Class loading collector whose schedule is fixed at construction time. */
  private final class Scheduled(
    override protected val collectionSchedule: Schedule[Any, Any, Unit]
  ) extends ClassLoading

  /** Creates a class loading collector that reports according to `schedule`. */
  def withSchedule(schedule: Schedule[Any, Any, Unit]): ClassLoading =
    new Scheduled(schedule)
}
39 changes: 39 additions & 0 deletions core/jvm/src/main/scala/zio/metrics/jvm/DefaultJvmMetrics.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package zio.metrics.jvm

import zio._

/**
 * JVM metrics, compatible with the prometheus-hotspot library, with configurable schedule.
 *
 * Implementations supply `collectionSchedule`; both `collectors` and `live`
 * create their individual collectors with that schedule.
 */
trait DefaultJvmMetrics extends MultipleJvmMetrics {

  /** Schedule handed to every individual collector. */
  protected val collectionSchedule: Schedule[Any, Any, Unit]

  /** One instance of each default collector, each created with `collectionSchedule`. */
  override protected lazy val collectors: NonEmptyChunk[JvmMetrics] =
    NonEmptyChunk(
      BufferPools.withSchedule(collectionSchedule),
      ClassLoading.withSchedule(collectionSchedule),
      GarbageCollector.withSchedule(collectionSchedule),
      MemoryAllocation.withSchedule(collectionSchedule),
      MemoryPools.withSchedule(collectionSchedule),
      Standard.withSchedule(collectionSchedule),
      Thread.withSchedule(collectionSchedule),
      VersionInfo.withSchedule(collectionSchedule)
    )

  /**
   * Layer that starts collecting the same JVM metrics as the Prometheus Java client's default exporters.
   *
   * The layers are built from collectors created with `collectionSchedule`, so a
   * custom schedule is honoured here just as it is in `collectors`.
   */
  lazy val live: ZLayer[Has[Clock] with Has[System], Throwable, Has[BufferPools] with Has[ClassLoading] with Has[
    GarbageCollector
  ] with Has[MemoryAllocation] with Has[MemoryPools] with Has[Standard] with Has[Thread] with Has[VersionInfo]] =
    BufferPools.withSchedule(collectionSchedule).live ++
      ClassLoading.withSchedule(collectionSchedule).live ++
      GarbageCollector.withSchedule(collectionSchedule).live ++
      MemoryAllocation.withSchedule(collectionSchedule).live ++
      MemoryPools.withSchedule(collectionSchedule).live ++
      Standard.withSchedule(collectionSchedule).live ++
      Thread.withSchedule(collectionSchedule).live ++
      VersionInfo.withSchedule(collectionSchedule).live
}

/** JVM metrics, compatible with the prometheus-hotspot library, collected on `JvmMetrics.defaultSchedule`. */
object DefaultJvmMetrics extends DefaultJvmMetrics {
  override protected val collectionSchedule: Schedule[Any, Any, Unit] =
    JvmMetrics.defaultSchedule
}
50 changes: 50 additions & 0 deletions core/jvm/src/main/scala/zio/metrics/jvm/GarbageCollector.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package zio.metrics.jvm

import com.github.ghik.silencer.silent

import zio.ZIOMetric.Gauge
import zio._

import java.lang.management.{GarbageCollectorMXBean, ManagementFactory}

import scala.collection.JavaConverters._

/** Collector that periodically publishes per-garbage-collector count and time gauges. */
trait GarbageCollector extends JvmMetrics {
  override type Feature = GarbageCollector
  override val featureTag: zio.Tag[GarbageCollector] = Tag[GarbageCollector]

  /** Time spent in a given JVM garbage collector in seconds. */
  private def gcCollectionSecondsSum(gc: String): Gauge[Long] =
    ZIOMetric.setGaugeWith("jvm_gc_collection_seconds_sum", MetricLabel("gc", gc)) { (millis: Long) =>
      millis.toDouble / 1000.0
    }

  /** Gauge fed with the collection count reported by a given JVM garbage collector. */
  private def gcCollectionSecondsCount(gc: String): Gauge[Long] =
    ZIOMetric.setGaugeWith("jvm_gc_collection_seconds_count", MetricLabel("gc", gc)) { (count: Long) =>
      count.toDouble
    }

  /** Reads name, collection count and collection time of every bean and records them in the gauges for that name. */
  private def reportGarbageCollectionMetrics(
    garbageCollectors: List[GarbageCollectorMXBean]
  ): ZIO[Any, Throwable, Unit] =
    ZIO.foreachParDiscard(garbageCollectors) { collector =>
      Task(collector.getName).flatMap { name =>
        (Task(collector.getCollectionCount) @@ gcCollectionSecondsCount(name)) *>
          (Task(collector.getCollectionTime) @@ gcCollectionSecondsSum(name)).unit
      }
    }

  /**
   * Looks up the garbage collector beans once, then forks a managed fiber
   * that keeps re-reporting their metrics according to `collectionSchedule`.
   * Yields this collector.
   */
  @silent("JavaConverters")
  val collectMetrics: ZManaged[Has[Clock], Throwable, GarbageCollector] =
    Task(ManagementFactory.getGarbageCollectorMXBeans.asScala.toList).toManaged
      .flatMap { garbageCollectorMXBeans =>
        reportGarbageCollectionMetrics(garbageCollectorMXBeans)
          .repeat(collectionSchedule)
          .interruptible
          .forkManaged
          .map(_ => this)
      }
}

/** Exports metrics related to the garbage collector */
object GarbageCollector extends GarbageCollector with JvmMetrics.DefaultSchedule {

  /** Garbage collector metrics collector whose schedule is fixed at construction time. */
  private final class Scheduled(
    override protected val collectionSchedule: Schedule[Any, Any, Unit]
  ) extends GarbageCollector

  /** Creates a garbage collector metrics collector that reports according to `schedule`. */
  def withSchedule(schedule: Schedule[Any, Any, Unit]): GarbageCollector =
    new Scheduled(schedule)
}
37 changes: 37 additions & 0 deletions core/jvm/src/main/scala/zio/metrics/jvm/JvmMetrics.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package zio.metrics.jvm

import com.github.ghik.silencer.silent
import zio._

/**
 * Base trait of a single JVM metrics collector.
 *
 * An implementation fixes the `Feature` type it provides, supplies a
 * `collectionSchedule`, and defines `collectMetrics`; `live` and `app` are
 * derived from those members.
 */
trait JvmMetrics { self =>
/** The type this collector provides through its layer. */
type Feature
/** Tag for `Feature`, passed to `toLayer` when building `live`. */
val featureTag: Tag[Feature]

/** Schedule available to implementations for repeating their metric collection. */
protected val collectionSchedule: Schedule[Any, Any, Unit]

/** Managed resource that produces the `Feature` value; defined by each implementation. */
val collectMetrics: ZManaged[Has[Clock] with Has[System], Throwable, Feature]

/** A layer that when constructed forks a fiber that periodically updates the JVM metrics */
lazy val live: ZLayer[Has[Clock] with Has[System], Throwable, Has[Feature]] =
collectMetrics.toLayer(featureTag)

/** A ZIO application that periodically updates the JVM metrics */
lazy val app: ZIOApp = new ZIOApp {
// Brings `featureTag` into implicit scope; it is declared before `tag`, which is
// built from `Tag[Environment]` and `Environment` contains `Has[Feature]`.
@silent private implicit val ftag: zio.Tag[Feature] = featureTag
override val tag: Tag[Environment] = Tag[Environment]
override type Environment = Has[Clock] with Has[System] with Has[Feature]
// Feeds the live Clock and System services into `live` and keeps them in the output.
override val layer: ZLayer[Has[ZIOAppArgs], Any, Environment] = {
Clock.live ++ System.live >+> live
}
// The application body itself does nothing; the metric collection is set up by `layer`.
override def run: ZIO[Environment with Has[ZIOAppArgs], Any, Any] = ZIO.unit
}
}

/** Defaults shared by the JVM metric collectors. */
object JvmMetrics {

  /** Default collection schedule: a fixed 10 second interval with its output discarded. */
  val defaultSchedule: Schedule[Any, Any, Unit] =
    Schedule.fixed(10.seconds).unit

  /** Mixin that sets a collector's `collectionSchedule` to `defaultSchedule`. */
  trait DefaultSchedule { this: JvmMetrics =>
    override protected val collectionSchedule: Schedule[Any, Any, Unit] =
      JvmMetrics.defaultSchedule
  }
}
103 changes: 103 additions & 0 deletions core/jvm/src/main/scala/zio/metrics/jvm/MemoryAllocation.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package zio.metrics.jvm

import com.github.ghik.silencer.silent

import com.sun.management.GarbageCollectionNotificationInfo
import zio._
import zio.ZIOMetric.Counter

import java.lang.management.ManagementFactory
import javax.management.openmbean.CompositeData
import javax.management.{Notification, NotificationEmitter, NotificationListener}
import scala.collection.mutable
import scala.collection.JavaConverters._

/**
 * Collector that counts the bytes allocated in each JVM memory pool, based on
 * the notifications emitted by the garbage collector MXBeans.
 */
trait MemoryAllocation extends JvmMetrics {
  override type Feature = MemoryAllocation
  override val featureTag: Tag[MemoryAllocation] = Tag[MemoryAllocation]

  /** Total bytes allocated in a given JVM memory pool. Only updated after GC, not continuously. */
  private def countAllocations(pool: String): Counter[Long] =
    ZIOMetric.countValueWith("jvm_memory_pool_allocated_bytes_total", MetricLabel("pool", pool))(_.toDouble)

  /**
   * JMX listener that turns garbage collection notifications into counter updates.
   *
   * @param runtime runtime used to run the counter update from the notification callback
   */
  private class Listener(runtime: Runtime[Any]) extends NotificationListener {
    // Memory used per pool after the most recent GC seen by this listener.
    // NOTE(review): this map is not synchronized; confirm that notifications from
    // all registered emitters are delivered on a single thread.
    private val lastMemoryUsage: mutable.Map[String, Long] = mutable.HashMap.empty

    /**
     * Handles one JMX notification. Anything that is not a garbage collection
     * notification carrying `CompositeData` is ignored.
     */
    @silent("JavaConverters")
    override def handleNotification(notification: Notification, handback: Any): Unit =
      notification.getUserData match {
        case data: CompositeData
            if notification.getType == GarbageCollectionNotificationInfo.GARBAGE_COLLECTION_NOTIFICATION =>
          val gcInfo              = GarbageCollectionNotificationInfo.from(data).getGcInfo
          val memoryUsageBeforeGc = gcInfo.getMemoryUsageBeforeGc
          val memoryUsageAfterGc  = gcInfo.getMemoryUsageAfterGc
          for (entry <- memoryUsageBeforeGc.entrySet.asScala) {
            val memoryPool = entry.getKey
            val before     = entry.getValue.getUsed
            // `get` yields null for a pool missing from the after-GC map; skip such
            // a pool instead of failing with a NullPointerException.
            Option(memoryUsageAfterGc.get(memoryPool)).foreach { usageAfterGc =>
              handleMemoryPool(memoryPool, before, usageAfterGc.getUsed)
            }
          }
        case _ => ()
      }

    /**
     * Updates the allocation counter of one memory pool.
     *
     * @param memoryPool name of the memory pool
     * @param before     bytes used in the pool before this GC
     * @param after      bytes used in the pool after this GC
     */
    private def handleMemoryPool(memoryPool: String, before: Long, after: Long): Unit = {
      /*
       * Calculate increase in the memory pool by comparing memory used
       * after last GC, before this GC, and after this GC.
       * See ascii illustration below.
       * Make sure to count only increases and ignore decreases.
       * (Typically a pool will only increase between GCs or during GCs, not both.
       * E.g. eden pools between GCs. Survivor and old generation pools during GCs.)
       *
       *                      |<-- diff1 -->|<-- diff2 -->|
       * Timeline: |-- last GC --|             |---- GC -----|
       *                      ___^__        ___^____      ___^___
       * Mem. usage vars:    / last \      / before \    / after \
       */
      // Get last memory usage after GC and remember memory used after for next time
      val last = lastMemoryUsage.getOrElse(memoryPool, 0L)
      lastMemoryUsage.put(memoryPool, after)
      // Difference since last GC (diff1) and during this GC (diff2), counting only increases
      val diff1    = math.max(before - last, 0L)
      val diff2    = math.max(after - before, 0L)
      val increase = diff1 + diff2
      if (increase > 0) {
        val effect: ZIO[Any, Nothing, Long] = UIO(increase) @@ countAllocations(memoryPool)
        runtime.unsafeRun(effect.unit)
      }
    }
  }

  /**
   * Registers a notification listener on every garbage collector MXBean that is
   * a `NotificationEmitter` and removes it again when the managed resource is
   * released. Yields this collector.
   */
  @silent("JavaConverters")
  override val collectMetrics: ZManaged[Has[Clock] with Has[System], Throwable, MemoryAllocation] =
    ZManaged
      .acquireReleaseWith(
        for {
          runtime                 <- ZIO.runtime[Any]
          listener                 = new Listener(runtime)
          garbageCollectorMXBeans <- Task(ManagementFactory.getGarbageCollectorMXBeans.asScala)
          _ <- ZIO.foreachDiscard(garbageCollectorMXBeans) {
                 case emitter: NotificationEmitter =>
                   Task(emitter.addNotificationListener(listener, null, null))
                 case _ => ZIO.unit
               }
        } yield (listener, garbageCollectorMXBeans)
      ) { case (listener, garbageCollectorMXBeans) =>
        ZIO
          .foreachDiscard(garbageCollectorMXBeans) {
            case emitter: NotificationEmitter =>
              Task(emitter.removeNotificationListener(listener))
            case _ => ZIO.unit
          }
          .orDie
      }
      .as(this)
}

/** Memory allocation collector using the default schedule; `withSchedule` creates one with a custom schedule. */
object MemoryAllocation extends MemoryAllocation with JvmMetrics.DefaultSchedule {

  /** Memory allocation collector whose schedule is fixed at construction time. */
  private final class Scheduled(
    override protected val collectionSchedule: Schedule[Any, Any, Unit]
  ) extends MemoryAllocation

  /** Creates a memory allocation collector carrying `schedule` as its collection schedule. */
  def withSchedule(schedule: Schedule[Any, Any, Unit]): MemoryAllocation =
    new Scheduled(schedule)
}
Loading
0