10000 Kamon 2.2.x, http4s 0.23, cats effect 3 by hygt · Pull Request #45 · kamon-io/kamon-http4s · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Kamon 2.2.x, http4s 0.23, cats effect 3 #45

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@
* =========================================================================================
*/

val kamonCore = "io.kamon" %% "kamon-core" % "2.1.0"
val kamonTestkit = "io.kamon" %% "kamon-testkit" % "2.1.0"
val kamonCommon = "io.kamon" %% "kamon-instrumentation-common" % "2.1.0"
val kamonCore = "io.kamon" %% "kamon-core" % "2.2.3"
val kamonTestkit = "io.kamon" %% "kamon-testkit" % "2.2.3"
val kamonCommon = "io.kamon" %% "kamon-instrumentation-common" % "2.2.3"

val server = "org.http4s" %% "http4s-blaze-server" % "0.21.3"
val client = "org.http4s" %% "http4s-blaze-client" % "0.21.3"
val dsl = "org.http4s" %% "http4s-dsl" % "0.21.3"
val server = "org.http4s" %% "http4s-blaze-server" % "0.23.0-RC1"
val client = "org.http4s" %% "http4s-blaze-client" % "0.23.0-RC1"
val dsl = "org.http4s" %% "http4s-dsl" % "0.23.0-RC1"


lazy val root = (project in file("."))
.settings(Seq(
name := "kamon-http4s",
scalaVersion := "2.13.1",
crossScalaVersions := Seq("2.12.11", "2.13.1")))
scalaVersion := "2.13.6",
crossScalaVersions := Seq("2.12.14", "2.13.6")))
.settings(resolvers += Resolver.bintrayRepo("kamon-io", "snapshots"))
.settings(resolvers += Resolver.mavenLocal)
.settings(scalacOptions ++= (CrossVersion.partialVersion(scalaVersion.value) match {
Expand Down
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.3.9
sbt.version=1.5.5
15 changes: 6 additions & 9 deletions src/main/scala/kamon/http4s/middleware/client/KamonSupport.scala
Original file line number Diff line number Diff line change
Expand Up @@ -38,13 +38,10 @@ object KamonSupport {

Kamon.onReconfigure(newConfig => _instrumentation = instrumentation(newConfig))


def apply[F[_]](underlying: Client[F])(implicit F:Sync[F]): Client[F] = Client { request =>

for {
ctx <- Resource.liftF(F.delay(Kamon.currentContext()))
k <- kamonClient(underlying)(request)(ctx)(_instrumentation)
} yield k
def apply[F[_]](underlying: Client[F])(implicit F: Sync[F]): Client[F] = Client { request =>
// this needs to run on the same thread as the caller, so can't be suspended in F
val ctx = Kamon.currentContext()
kamonClient(underlying)(request)(ctx)(_instrumentation)
}


Expand All @@ -54,9 +51,9 @@ object KamonSupport {
(instrumentation: HttpClientInstrumentation)
(implicit F:Sync[F]): Resource[F, Response[F]] =
for {
requestHandler <- Resource.liftF(F.delay(instrumentation.createHandler(getRequestBuilder(request), ctx)))
requestHandler <- Resource.eval(F.delay(instrumentation.createHandler(getRequestBuilder(request), ctx)))
response <- underlying.run(requestHandler.request).attempt
trackedResponse <- Resource.liftF(handleResponse(response, requestHandler))
trackedResponse <- Resource.eval(handleResponse(response, requestHandler))
} yield trackedResponse

def handleResponse[F[_]](
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ object KamonSupport {

private def getHandler[F[_]](instrumentation: HttpServerInstrumentation)(request: Request[F])(implicit F: Sync[F]): Resource[F, RequestHandler] =
for {
handler <- Resource.liftF(F.delay(instrumentation.createHandler(buildRequestMessage(request))))
handler <- Resource.eval(F.delay(instrumentation.createHandler(buildRequestMessage(request))))
_ <- processRequest(handler)
_ <- withContext(handler)
} yield handler
Expand Down
26 changes: 13 additions & 13 deletions src/main/scala/kamon/http4s/package.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,57 +3,57 @@ package kamon
import org.http4s.{Header, Headers, Request, Response, Status}
import kamon.instrumentation.http.HttpMessage
import kamon.instrumentation.http.HttpMessage.ResponseBuilder
import org.http4s.util.CaseInsensitiveString
import org.typelevel.ci.CIString

package object http4s {


def buildRequestMessage[F[_]](inner: Request[F]): HttpMessage.Request = new HttpMessage.Request {
override def url: String = inner.uri.toString()

override def path: String = inner.uri.path
override def path: String = inner.uri.path.renderString

override def method: String = inner.method.name

override def host: String = inner.uri.authority.map(_.host.value).getOrElse("")

override def port: Int = inner.uri.authority.flatMap(_.port).getOrElse(0)

override def read(header: String): Option[String] = inner.headers.get(CaseInsensitiveString(header)).map(_.value)
override def read(header: String): Option[String] = inner.headers.get(CIString(header)).map(_.head.value)

override def readAll(): Map[String, String] = {
val builder = Map.newBuilder[String, String]
inner.headers.foreach(h => builder += (h.name.value -> h.value))
inner.headers.foreach(h => builder += (h.name.toString -> h.value))
builder.result()
}
}

def errorResponseBuilder[F[_]]: HttpMessage.ResponseBuilder[Response[F]] = new ResponseBuilder[Response[F]] {
override def write(header: String, value: String): Unit = ()
override def statusCode: Int = 500
override def build(): Response[F] = new Response[F](status = Status.InternalServerError)
override def build(): Response[F] = Response[F](status = Status.InternalServerError)
}

//TODO both of these
def notFoundResponseBuilder[F[_]]: HttpMessage.ResponseBuilder[Response[F]] = new ResponseBuilder[Response[F]] {
private var _headers = Headers.empty

override def write(header: String, value: String): Unit =
_headers = _headers.put(Header(header, value))
_headers = _headers.put(Header.Raw(CIString(header), value))

override def statusCode: Int = 404
override def build(): Response[F] = new Response[F](status = Status.NotFound, headers = _headers)
override def build(): Response[F] = Response[F](status = Status.NotFound, headers = _headers)
}

def getResponseBuilder[F[_]](response: Response[F]) = new HttpMessage.ResponseBuilder[Response[F]] {
def getResponseBuilder[F[_]](response: Response[F]): ResponseBuilder[Response[F]] = new HttpMessage.ResponseBuilder[Response[F]] {
private var _headers = response.headers

override def statusCode: Int = response.status.code

override def build(): Response[F] = response.withHeaders(_headers)

override def write(header: String, value: String): Unit =
_headers = _headers.put(Header(header, value))
_headers = _headers.put(Header.Raw(CIString(header), value))
}


Expand All @@ -63,23 +63,23 @@ package object http4s {
override def build(): Request[F] = request.withHeaders(_headers)

override def write(header: String, value: String): Unit =
_headers = _headers.put(Header(header, value))
_headers = _headers.put(Header.Raw(CIString(header), value))

override def url: String = request.uri.toString()

override def path: String = request.uri.path
override def path: String = request.uri.path.renderString

override def method: String = request.method.name

override def host: String = request.uri.authority.map(_.host.value).getOrElse("")

override def port: Int = request.uri.authority.flatMap(_.port).getOrElse(0)

override def read(header: String): Option[String] = _headers.get(CaseInsensitiveString(header)).map(_.value)
override def read(header: String): Option[String] = _headers.get(CIString(header)).map(_.head.value)

override def readAll(): Map[String, String] = {
val builder = Map.newBuilder[String, String]
request.headers.foreach(h => builder += (h.name.value -> h.value))
request.headers.foreach(h => builder += (h.name.toString -> h.value))
builder.result()
}
}
Expand Down
18 changes: 9 additions & 9 deletions src/test/scala/kamon/http4s/ClientInstrumentationSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,22 +16,22 @@

package kamon.http4s

import java.net.ConnectException

import cats.effect.unsafe.implicits.global
import cats.effect.{IO, Resource}
import kamon.Kamon
import kamon.http4s.middleware.client.KamonSupport
import kamon.tag.Lookups.{plain, plainLong}
import kamon.testkit.TestSpanReporter
import kamon.trace.Span
import org.http4s.{HttpRoutes, Response}
import org.http4s.client._
import org.http4s.dsl.io._
import org.http4s.implicits._
import org.http4s.{HttpRoutes, Response}
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar
import org.scalatest.{BeforeAndAfterAll, Matchers, OptionValues, WordSpec}
import kamon.tag.Lookups.{plainLong, plain}

import java.net.ConnectException

class ClientInstrumentationSpec extends WordSpec
with Matchers
Expand All @@ -57,7 +57,7 @@ class ClientInstrumentationSpec extends WordSpec
client.expect[String]("/tracing/ok").unsafeRunSync() shouldBe "ok"
}

eventually(timeout(2 seconds)) {
eventually(timeout(3 seconds)) {
val span = testSpanReporter().nextSpan().value

span.operationName shouldBe "/tracing/ok"
Expand All @@ -74,7 +74,7 @@ class ClientInstrumentationSpec extends WordSpec
"close and finish a span even if an exception is thrown by the client" in {
val okSpan = Kamon.spanBuilder("client-exception").start()
val client: Client[IO] = KamonSupport[IO](
Client(_ => Resource.liftF(IO.raiseError[Response[IO]](new ConnectException("Connection Refused."))))
Client(_ => Resource.eval(IO.raiseError[Response[IO]](new ConnectException("Connection Refused."))))
)

Kamon.runWithSpan(okSpan) {
Expand All @@ -83,7 +83,7 @@ class ClientInstrumentationSpec extends WordSpec
}
}

eventually(timeout(2 seconds)) {
eventually(timeout(3 seconds)) {
val span = testSpanReporter().nextSpan().value
span.operationName shouldBe "/tracing/ok"
span.kind shouldBe Span.Kind.Client
Expand All @@ -102,7 +102,7 @@ class ClientInstrumentationSpec extends WordSpec
client.expect[String]("/tracing/not-found").attempt.unsafeRunSync().isLeft shouldBe true
}

eventually(timeout(2 seconds)) {
eventually(timeout(3 seconds)) {
val span = testSpanReporter().nextSpan().value
span.operationName shouldBe "/tracing/not-found"
span.kind shouldBe Span.Kind.Client
Expand All @@ -124,7 +124,7 @@ class ClientInstrumentationSpec extends WordSpec
client.expect[String]("/tracing/error").attempt.unsafeRunSync().isLeft shouldBe true
}

eventually(timeout(2 seconds)) {
eventually(timeout(3 seconds)) {
val span = testSpanReporter().nextSpan().value

span.operationName shouldBe "/tracing/error"
Expand Down
36 changes: 16 additions & 20 deletions src/test/scala/kamon/http4s/HttpMetricsSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,21 @@
package kamon.http4s

import cats.effect._
import cats.effect.unsafe.implicits.global
import cats.implicits._
import kamon.http4s.middleware.server.KamonSupport
import kamon.instrumentation.http.HttpServerMetrics
import kamon.testkit.InstrumentInspection
import org.http4s.HttpRoutes
import org.http4s.blaze.client.BlazeClientBuilder
import org.http4s.blaze.server.BlazeServerBuilder
import org.http4s.client.Client
import org.http4s.dsl.io._
import org.http4s.implicits._
import org.http4s.server.Server
import org.http4s.server.blaze.BlazeServerBuilder
import org.scalatest.concurrent.Eventually
import org.scalatest.time.SpanSugar
import org.scalatest.{Matchers, OptionValues, WordSpec}
import cats.implicits._
import kamon.http4s.middleware.server.KamonSupport
import kamon.instrumentation.http.HttpServerMetrics
import org.http4s.client.blaze.BlazeClientBuilder
import org.http4s.client.Client

import scala.concurrent.ExecutionContext
import org.http4s.implicits._

class HttpMetricsSpec extends WordSpec
with Matchers
Expand All @@ -42,11 +41,8 @@ class HttpMetricsSpec extends WordSpec
with OptionValues
{

implicit val contextShift: ContextShift[IO] = IO.contextShift(ExecutionContext.global)
implicit val timer: Timer[IO] = IO.timer(ExecutionContext.global)

val srv =
BlazeServerBuilder[IO]
BlazeServerBuilder[IO](global.compute)
.bindLocal(43567)
.withHttpApp(KamonSupport(HttpRoutes.of[IO] {
case GET -> Root / "tracing" / "ok" => Ok("ok")
Expand All @@ -56,16 +52,16 @@ class HttpMetricsSpec extends WordSpec
.resource

val client =
BlazeClientBuilder[IO](ExecutionContext.global).withMaxTotalConnections(10).resource
BlazeClientBuilder[IO](global.compute).withMaxTotalConnections(10).resource

val metrics =
Resource.liftF(IO(HttpServerMetrics.of("http4s.server", "/127.0.0.1", 43567)))
Resource.eval(IO(HttpServerMetrics.of("http4s.server", "/127.0.0.1", 43567)))


def withServerAndClient[A](f: (Server[IO], Client[IO], HttpServerMetrics.HttpServerInstruments) => IO[A]): A =
def withServerAndClient[A](f: (Server, Client[IO], HttpServerMetrics.HttpServerInstruments) => IO[A]): A =
(srv, client, metrics).tupled.use(f.tupled).unsafeRunSync()

private def get[F[_]: Sync](path: String)(server: Server[F], client: Client[F]): F[String] = {
private def get[F[_]: Concurrent](path: String)(server: Server, client: Client[F]): F[String] = {
client.expect[String](s"http://127.0.0.1:${server.address.getPort}$path")
}

Expand All @@ -88,23 +84,23 @@ class HttpMetricsSpec extends WordSpec
"track the response time with status code 2xx" in withServerAndClient { (server, client, serverMetrics) =>
val requests: IO[Unit] = List.fill(100)(get("/tracing/ok")(server, client)).sequence_

val test = IO(serverMetrics.requestsSuccessful.value should be >= 0L)
val test = IO(serverMetrics.requestsSuccessful.value() should be >= 0L)

requests *> test
}

"track the response time with status code 4xx" in withServerAndClient { (server, client, serverMetrics) =>
val requests: IO[Unit] = List.fill(100)(get("/tracing/not-found")(server, client).attempt).sequence_

val test = IO(serverMetrics.requestsClientError.value should be >= 0L)
val test = IO(serverMetrics.requestsClientError.value() should be >= 0L)

requests *> test
}

"track the response time with status code 5xx" in withServerAndClient { (server, client, serverMetrics) =>
val requests: IO[Unit] = List.fill(100)(get("/tracing/error")(server, client).attempt).sequence_

val test = IO(serverMetrics.requestsServerError.value should be >= 0L)
val test = IO(serverMetrics.requestsServerError.value() should be >= 0L)

requests *> test
}
Expand Down
Loading
0