8000 [feature]: kyo-aeron in Scala Native by harshtech123 · Pull Request #1243 · getkyo/kyo · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[feature]: kyo-aeron in Scala Native #1243

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 11 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 14 additions & 5 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -443,20 +443,30 @@ lazy val `kyo-reactive-streams` =
.jvmSettings(mimaCheck(false))

// Build definition for the `kyo-aeron` module, rooted at the `kyo-aeron` directory and
// depending on `kyo-core`.
// NOTE(review): this text comes from a rendered diff ("14 additions & 5 deletions"), so removed
// and added lines appear side by side (for example the two `crossProject(...)` calls and the
// repeated aeron-client dependency below). Confirm against the real build.sbt before reuse.
lazy val `kyo-aeron` =
crossProject(JVMPlatform)
crossProject(JVMPlatform, NativePlatform)
.withoutSuffixFor(JVMPlatform)
.crossType(CrossType.Full)
.in(file("kyo-aeron"))
.dependsOn(`kyo-core`)
.settings(
`kyo-settings`,
// `%%%` selects the platform-specific upickle artifact for every cross-built platform.
libraryDependencies += "com.lihaoyi" %%% "upickle" % "4.1.0"
)
.jvmSettings(
mimaCheck(false),
// The Aeron driver and client artifacts are declared for the JVM build only.
libraryDependencies ++= Seq(
"io.aeron" % "aeron-driver" % "1.46.7",
"io.aeron" % "aeron-client" % "1.46.7",
"com.lihaoyi" %% "upickle" % "4.1.0"
"io.aeron" % "aeron-client" % "1.46.7"
)
)
.jvmSettings(mimaCheck(false))
// Native-only settings: shared `native-settings` plus the immix GC and debug build mode.
.nativeSettings(
`native-settings`,
nativeConfig ~= { config =>
config
.withGC(scala.scalanative.build.GC.immix)
.withMode(scala.scalanative.build.Mode.debug)
}
)

lazy val `kyo-sttp` =
crossProject(JSPlatform, JVMPlatform, NativePlatform)
Expand Down Expand Up @@ -613,7 +623,6 @@ lazy val `kyo-bench` =
.settings(
`kyo-settings`,
Test / testForkedParallel := true,
// Forks each test suite individually
Test / testGrouping := {
val javaOptionsValue = javaOptions.value.toVector
val envsVarsValue = envVars.value
Expand Down
27 changes: 27 additions & 0 deletions kyo-aeron/native/src/main/scala/io/aeron/Aeron.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package io.aeron

import scala.collection.mutable

/** Companion of the in-memory `Aeron` stub.
  *
  * Holds the registry through which publications find the subscriptions registered for the
  * same `(uri, streamId)` pair.
  */
object Aeron:
    /** Registered subscriptions, keyed by `(uri, streamId)`. */
    private[aeron] val subscriptions: mutable.Map[(String, Int), mutable.ListBuffer[Subscription]] =
        mutable.Map.empty

    /** Creates a new client. The supplied context is accepted but not consulted. */
    def connect(ctx: Aeron.Context): Aeron =
        new Aeron()

    /** Configuration placeholder whose setter ignores its argument. */
    class Context:
        /** Ignores `s` and returns this context so calls can be chained. */
        def aeronDirectoryName(s: String): Aeron.Context = this
    end Context
end Aeron

/** In-memory stand-in for an Aeron client.
  *
  * Publications and subscriptions are matched through the registry held by the companion
  * object, keyed by `(uri, streamId)`.
  */
class Aeron:
    // Subscriptions created through this instance, remembered so that `close()` can remove
    // exactly these entries from the shared registry and nothing else.
    private val owned = mutable.ListBuffer.empty[((String, Int), Subscription)]

    /** Creates a publication that delivers to the subscriptions registered for `(uri, streamId)`. */
    def addPublication(uri: String, streamId: Int): Publication =
        new Publication(uri, streamId)

    /** Creates a subscription and registers it under `(uri, streamId)`.
      *
      * @return
      *   the newly registered subscription
      */
    def addSubscription(uri: String, streamId: Int): Subscription =
        val key = (uri, streamId)
        val sub = new Subscription()
        Aeron.subscriptions.getOrElseUpdate(key, mutable.ListBuffer()) += sub
        owned += (key -> sub)
        sub
    end addSubscription

    /** Unregisters the subscriptions created by this client.
      *
      * Only this instance's subscriptions are removed. Clearing the whole shared registry
      * would also disconnect subscriptions that belong to other, still open clients.
      */
    def close(): Unit =
        owned.foreach { case (key, sub) =>
            Aeron.subscriptions.get(key).foreach { subs =>
                subs -= sub
                // Drop the key once its last subscription is gone so the registry does not
                // accumulate empty entries.
                if subs.isEmpty then Aeron.subscriptions -= key
            }
        }
        owned.clear()
    end close
end Aeron
13 changes: 13 additions & 0 deletions kyo-aeron/native/src/main/scala/io/aeron/FragmentAssembler.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package io.aeron

import io.aeron.logbuffer.Header
import org.agrona.DirectBuffer

/** Stub of Aeron's fragment assembler.
  *
  * Instead of wrapping a handler interface, this stub stores the callback directly as a
  * four-argument function; `Subscription.poll` invokes it once per delivered message.
  *
  * @param handler
  *   callback taking a buffer, two `Int` arguments and a header
  */
class FragmentAssembler(val handler: (DirectBuffer, Int, Int, Header) => Unit)
26 changes: 26 additions & 0 deletions kyo-aeron/native/src/main/scala/io/aeron/Publication.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.aeron

import io.aeron.logbuffer.BufferClaim

/** Negative result codes returned by `Publication.tryClaim`.
  *
  * The numeric values follow the Aeron Java client (`NOT_CONNECTED = -1`,
  * `BACK_PRESSURED = -2`, ...), so code that logs or compares raw results sees the same
  * numbers on every platform.
  */
object Publication:
    /** No subscriber is currently connected to the publication. */
    val NOT_CONNECTED: Long = -1L

    /** The offer was rejected because of back pressure from subscribers. */
    val BACK_PRESSURED: Long = -2L

    /** The offer failed because of an administrative action; it may be retried. */
    val ADMIN_ACTION: Long = -3L

    /** The publication has been closed. */
    val CLOSED: Long = -4L

    /** The publication has reached its maximum possible position. */
    val MAX_POSITION_EXCEEDED: Long = -5L
end Publication

/** In-memory stand-in for an Aeron publication.
  *
  * Messages are handed directly to every subscription registered in `Aeron.subscriptions`
  * under the same `(uri, streamId)` pair.
  *
  * @param uri
  *   channel identifier used as the first half of the registry key
  * @param streamId
  *   stream identifier used as the second half of the registry key
  */
class Publication(uri: String, streamId: Int):
    private val key = (uri, streamId)

    /** Delivers `bytes` to each subscription currently registered for this publication. */
    private[aeron] def publish(bytes: Array[Byte]): Unit =
        Aeron.subscriptions.get(key).foreach(_.foreach(_.receive(bytes)))

    /** True when at least one subscription is registered for this publication's key. */
    def isConnected(): Boolean =
        Aeron.subscriptions.get(key).exists(_.nonEmpty)

    /** Reserves `length` bytes in `bufferClaim` for a message to be committed later.
      *
      * @return
      *   `length` on success, or `Publication.NOT_CONNECTED` when no subscription is
      *   registered. Reporting success in that case would let the later `commit()` drop
      *   the message silently.
      */
    def tryClaim(length: Int, bufferClaim: BufferClaim): Long =
        if !isConnected() then Publication.NOT_CONNECTED
        else
            bufferClaim.setPublication(this, length)
            length.toLong

    /** No resources are held by the stub, so closing does nothing. */
    def close(): Unit = {}
end Publication
30 changes: 30 additions & 0 deletions kyo-aeron/native/src/main/scala/io/aeron/Subscription.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package io.aeron

import io.aeron.logbuffer.Header
import org.agrona.DirectBuffer
import scala.collection.mutable


/** In-memory stand-in for an Aeron subscription.
  *
  * Incoming messages are queued by `receive` and handed to the caller's handler, in arrival
  * order, when `poll` is invoked.
  */
class Subscription:
    private val pending = mutable.Queue.empty[Array[Byte]]

    /** Queues a message for a later `poll`. */
    private[aeron] def receive(bytes: Array[Byte]): Unit =
        pending.enqueue(bytes)

    /** The stub always reports itself as connected. */
    def isConnected(): Boolean = true

    /** Delivers queued messages to `fragmentHandler`, at most `fragmentLimit` of them.
      *
      * Each message is wrapped in a buffer and passed with offset 0, its full length and a
      * `null` header.
      *
      * @return
      *   the number of messages delivered
      */
    def poll(fragmentHandler: FragmentAssembler, fragmentLimit: Int): Int =
        @scala.annotation.tailrec
        def drain(delivered: Int): Int =
            if delivered >= fragmentLimit || pending.isEmpty then delivered
            else
                val payload = pending.dequeue()
                val wrapped = DirectBuffer(payload)
                fragmentHandler.handler(wrapped, 0, payload.length, null)
                drain(delivered + 1)
        drain(0)
    end poll

    /** No resources are held by the stub, so closing does nothing. */
    def close(): Unit = {}
end Subscription
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
package io.aeron.driver

/** Factory for the stub media driver. */
object MediaDriver:
    /** Returns a fresh stub driver; nothing is actually launched. */
    def launchEmbedded(): MediaDriver =
        new MediaDriver()
end MediaDriver

/** Placeholder media driver that holds no state or resources. */
class MediaDriver:
    /** The stub has no directory and always reports an empty name. */
    def aeronDirectoryName(): String = ""

    /** Nothing to release. */
    def close(): Unit = {}
end MediaDriver
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package io.aeron.logbuffer

import io.aeron.Publication
import org.agrona.DirectBuffer
import org.agrona.MutableDirectBuffer

/** Stub of Aeron's buffer claim: a scratch buffer that the caller fills after a successful
  * `Publication.tryClaim` and then hands over with `commit()`.
  */
class BufferClaim:
    // Starts at 1500 bytes and is replaced by a larger buffer when a bigger claim is made,
    // so that writing or committing a long message cannot run past the end of the array.
    private var _buffer: MutableDirectBuffer     = DirectBuffer(1500)
    private var publication: Option[Publication] = None
    private var length: Int                      = 0

    /** Binds this claim to `pub` for a message of `length` bytes.
      *
      * The scratch buffer is re-allocated when `length` exceeds its current capacity, so
      * callers must obtain `buffer()` after claiming rather than caching it beforehand.
      */
    private[aeron] def setPublication(pub: Publication, length: Int): Unit =
        if length > _buffer.capacity then _buffer = DirectBuffer(length)
        this.publication = Option(pub)
        this.length = length
    end setPublication

    /** The buffer into which the claimed message must be written. */
    def buffer(): MutableDirectBuffer = this._buffer

    /** The message always starts at the beginning of the buffer. */
    def offset(): Int = 0

    /** Copies the first `length` bytes of the buffer and publishes them.
      *
      * Does nothing when no publication has been bound to this claim.
      */
    def commit(): Unit =
        publication.foreach { pub =>
            val bytes = new Array[Byte](length)
            _buffer.getBytes(0, bytes)
            pub.publish(bytes)
        }
end BufferClaim
20 changes: 20 additions & 0 deletions kyo-aeron/native/src/main/scala/io/aeron/logbuffer/Header.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package io.aeron.logbuffer

import org.agrona.DirectBuffer

/** Placeholder for Aeron's fragment header.
  *
  * Every accessor returns a fixed zero value (or `null` for the buffer). The constructor
  * is restricted to the `logbuffer` package.
  */
class Header private[logbuffer] ():

    // Identification of the stream and session.
    def streamId(): Int  = 0
    def sessionId(): Int = 0

    // Term bookkeeping.
    def initialTermId(): Int = 0
    def termId(): Int        = 0
    def termOffset(): Int    = 0
    def position(): Long     = 0L

    // Frame layout.
    def offset(): Int         = 0
    def frameLength(): Int    = 0
    def version(): Int        = 0
    def flags(): Int          = 0
    def `type`(): Int         = 0
    def reservedValue(): Long = 0L

    /** No underlying buffer exists in the stub. */
    def buffer(): DirectBuffer = null
end Header
31 changes: 31 additions & 0 deletions kyo-aeron/native/src/main/scala/org/agrona/DirectBuffer.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package org.agrona

import java.nio.ByteBuffer

// org.agrona.DirectBuffer is an interface in Java.

/** Minimal byte-array-backed buffer abstraction.
  *
  * Reads are implemented here. The four-argument write is left unimplemented (`???`) and
  * must be overridden by implementations that support writing.
  */
trait DirectBuffer:
    /** The array holding this buffer's contents. */
    protected[agrona] def buffer: Array[Byte]

    /** Size of the backing array in bytes. */
    def capacity: Int = buffer.length

    /** Copies `length` bytes starting at `index` into `dst` at `dstOffset`. */
    def getBytes(index: Int, dst: Array[Byte], dstOffset: Int, length: Int): Unit =
        System.arraycopy(buffer, index, dst, dstOffset, length)

    /** Fills all of `dst` with bytes read from this buffer starting at `index`. */
    def getBytes(index: Int, dst: Array[Byte]): Unit =
        getBytes(index, dst, dstOffset = 0, length = dst.length)

    /** Writes `length` bytes of `src`, starting at `srcOffset`, into this buffer at `index`.
      * Not implemented at this level.
      */
    def putBytes(index: Int, src: Array[Byte], srcOffset: Int, length: Int): Unit = ???

    /** Writes all of `src` at `index`; equivalent to the four-argument form with
      * `srcOffset = 0` and `length = src.length`.
      */
    def putBytes(index: Int, src: Array[Byte]): Unit =
        putBytes(index, src, srcOffset = 0, length = src.length)
end DirectBuffer

/** Fixed-size writable buffer over a plain byte array.
  *
  * @param buffer
  *   the array used as storage; it is never replaced or resized
  */
private[agrona] class BackingBuffer(
    protected[agrona] val buffer: Array[Byte]
) extends MutableDirectBuffer:

    /** Allocates a zero-filled buffer of `capacity` bytes. */
    def this(capacity: Int) = this(new Array[Byte](capacity))

    /** Copies `length` bytes of `src`, from `srcOffset`, into the backing array at `index`. */
    override def putBytes(index: Int, src: Array[Byte], srcOffset: Int, length: Int): Unit =
        System.arraycopy(src, srcOffset, buffer, index, length)
end BackingBuffer

/** Factory methods for array-backed buffers. */
object DirectBuffer:
    /** Wraps `bytes` without copying and exposes it through the read-oriented interface. */
    def apply(bytes: Array[Byte]): DirectBuffer =
        new BackingBuffer(bytes)

    /** Allocates a writable buffer of `capacity` bytes. */
    def apply(capacity: Int): MutableDirectBuffer =
        new BackingBuffer(capacity)
end DirectBuffer
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package org.agrona

/** A `DirectBuffer` whose implementations must support writes. */
trait MutableDirectBuffer extends DirectBuffer:
    /** Re-declared without a body so that concrete subclasses have to provide the write. */
    override def putBytes(index: Int, src: Array[Byte], srcOffset: Int, length: Int): Unit

    /** Writes the whole of `src` at `index` by delegating to the four-argument form. */
    override def putBytes(index: Int, src: Array[Byte]): Unit =
        putBytes(index, src, srcOffset = 0, length = src.length)
end MutableDirectBuffer
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package org.agrona.concurrent

import org.agrona.DirectBuffer

/** Array-backed buffer that grows on demand when a write extends past its current end.
  *
  * @param buffer
  *   initial backing array; replaced by a larger copy when a write needs more room
  */
class UnsafeBuffer(protected[agrona] var buffer: Array[Byte]) extends DirectBuffer:

    /** Allocates a zero-filled buffer of `capacity` bytes. */
    def this(capacity: Int) = this(new Array[Byte](capacity))

    /** Fills all of `dst` with bytes read from this buffer starting at `index`. */
    override def getBytes(index: Int, dst: Array[Byte]): Unit =
        getBytes(index, dst, 0, dst.length)

    /** Writes the whole of `src` at `index`. */
    override def putBytes(index: Int, src: Array[Byte]): Unit =
        putBytes(index, src, 0, src.length)

    /** Writes `length` bytes of `src`, from `srcOffset`, at `index`, enlarging the backing
      * array first when it is shorter than `index + length`.
      */
    override def putBytes(index: Int, src: Array[Byte], srcOffset: Int, length: Int): Unit =
        growTo(index + length)
        System.arraycopy(src, srcOffset, buffer, index, length)
    end putBytes

    // Replaces the backing array with a copy of exactly `needed` bytes when it is too short;
    // existing contents are preserved at the same positions.
    private def growTo(needed: Int): Unit =
        if buffer.length < needed then
            val enlarged = new Array[Byte](needed)
            System.arraycopy(buffer, 0, enlarged, 0, buffer.length)
            buffer = enlarged
end UnsafeBuffer
0