8000 [NU-2217] JSON parameter template instead of dynamic form for Kafka Sink by mateuszkp96 · Pull Request #8213 · TouK/nussknacker · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[NU-2217] JSON parameter template instead of dynamic form for Kafka Sink #8213

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 13 commits into
base: staging
Choose a base branch
from
2 changes: 1 addition & 1 deletion .run/NussknackerApp.run.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<option name="INCLUDE_PROVIDED_SCOPE" value="true" />
<option name="MAIN_CLASS_NAME" value="pl.touk.nussknacker.ui.NussknackerApp" />
<module name="nussknacker.nussknacker-designer" />
<option name="VM_PARAMETERS" value="-Dconfig.override_with_env_vars=true -Dnussknacker.config.locations=../../../nussknacker-dist/src/universal/conf/dev-application.conf -Dlogback.configurationFile=../../../nussknacker-dist/src/universal/conf/logback.xml" />
<option name="VM_PARAMETERS" value="-Dconfig.override_with_env_vars=true -Dnussknacker.config.locations=../../../nussknacker-dist/src/universal/conf/dev-application.conf -Dlogback.configurationFile=../../../nussknacker-dist/src/universal/conf/logback.xml -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005" />
<option name="WORKING_DIRECTORY" value="$PROJECT_DIR$/designer/server/work" />
<extension name="coverage">
<pattern>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@ import com.typesafe.config.Config
import net.ceedubs.ficus.Ficus.toFicusConfig
import net.ceedubs.ficus.readers.AnyValReaders._
import net.ceedubs.ficus.readers.OptionReader._
import pl.touk.nussknacker.engine.ModelConfig.LiveDataPreviewMode
import pl.touk.nussknacker.engine.ModelConfig.{JsonLikeValuesEnteringMode, LiveDataPreviewMode}
import pl.touk.nussknacker.engine.api.namespaces.NamingStrategy

final case class ModelConfig(
allowEndingScenarioWithoutSink: Boolean,
jsonLikeValuesEnteringMode: JsonLikeValuesEnteringMode,
namingStrategy: NamingStrategy,
liveDataPreviewMode: LiveDataPreviewMode,
// TODO: we should parse this underlying config as ModelConfig class fields instead of passing raw config
Expand All @@ -24,6 +25,7 @@ object ModelConfig {
def parse(rawModelConfig: Config): ModelConfig = {
ModelConfig(
allowEndingScenarioWithoutSink = rawModelConfig.getOrElse[Boolean]("allowEndingScenarioWithoutSink", false),
jsonLikeValuesEnteringMode = parseJsonLikeValuesEnteringMode(rawModelConfig),
namingStrategy = NamingStrategy.fromConfig(rawModelConfig),
liveDataPreviewMode = parseLiveDataPreviewMode(rawModelConfig),
underlyingConfig = rawModelConfig,
Expand All @@ -43,6 +45,30 @@ object ModelConfig {

}

// Mode parsed from the `jsonLikeValuesEnteringMode` model config entry.
// It chooses between dynamic forms and a single JSON template parameter.
sealed trait JsonLikeValuesEnteringMode

object JsonLikeValuesEnteringMode {
// Dynamic forms; this is the value used when the config entry is absent.
case object DynamicForms extends JsonLikeValuesEnteringMode
// A single parameter with a JSON template replaces the dynamic form.
case object SingleJsonTemplateParameter extends JsonLikeValuesEnteringMode
}

/**
 * Reads the optional `jsonLikeValuesEnteringMode` entry from the raw model config.
 *
 * @param config raw model configuration
 * @return the configured mode, or `DynamicForms` when the entry is not present
 * @throws IllegalArgumentException when the entry holds an unsupported value
 */
private def parseJsonLikeValuesEnteringMode(config: Config): JsonLikeValuesEnteringMode = {
  val modePath = "jsonLikeValuesEnteringMode"
  if (!config.hasPath(modePath)) {
    // Entry not configured: keep the default behavior.
    JsonLikeValuesEnteringMode.DynamicForms
  } else {
    config.getString(modePath) match {
      case "DynamicForms"                => JsonLikeValuesEnteringMode.DynamicForms
      case "SingleJsonTemplateParameter" => JsonLikeValuesEnteringMode.SingleJsonTemplateParameter
      case unsupported =>
        throw new IllegalArgumentException(
          s"Invalid jsonLikeValuesEnteringMode ${unsupported}. Supported are [DynamicForms, SingleJsonTemplateParameter]"
        )
    }
  }
}

private def parseLiveDataPreviewMode(config: Config): LiveDataPreviewMode = {
if (config.getOrElse("liveDataPreview.enabled", false)) {
LiveDataPreviewMode.Enabled(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,22 +1,38 @@
package pl.touk.nussknacker.engine.api

import pl.touk.nussknacker.engine.api.Params.ParamExtractionResult
import pl.touk.nussknacker.engine.api.parameter.ParameterName
import pl.touk.nussknacker.engine.util.Implicits.RichScalaMap

final class Params private (val nameToRawValueMap: Map[ParameterName, Any]) extends Serializable {

def isPresent(name: ParameterName): Boolean = nameToRawValueMap.contains(name)
// Looks up the raw value for `name` and, when a value is present, casts it to T.
// The ParamValueIsNone and MissingParam outcomes are passed through unchanged, so this
// method itself does not throw for an absent parameter.
def extractParam[T](name: ParameterName): ParamExtractionResult[T] = {
extractValue(name) match {
case ParamExtractionResult.Value(value) => ParamExtractionResult.Value(cast(value))
case ParamExtractionResult.ParamValueIsNone => ParamExtractionResult.ParamValueIsNone
case ParamExtractionResult.MissingParam => ParamExtractionResult.MissingParam
}
}

def extract[T](name: ParameterName): Option[T] = {
extractValue(name).map(cast[T])
// Extracts a parameter that must be present among the params.
// Returns Some(value) when a value is present and None when the extraction result is
// ParamValueIsNone; throws IllegalArgumentException when the parameter is missing.
def extractRequiredParam[T](name: ParameterName): Option[T] = {
extractParam[T](name) match {
case ParamExtractionResult.MissingParam =>
throw new IllegalArgumentException(cannotFindParamNameMessage(name))
case ParamExtractionResult.ParamValueIsNone => None
case ParamExtractionResult.Value(value) => Some(value)
}
}

def extractUnsafe[T](name: ParameterName): T =
extract[T](name)
.getOrElse(throw new IllegalArgumentException(paramValueIsNoneMessage(name)))
// Like extractRequiredParam, but also rejects an absent value.
// Throws IllegalArgumentException both when the parameter is missing (raised by
// extractRequiredParam) and when its value is none (raised here).
def extractRequiredParamUnsafe[T](name: ParameterName): T =
  extractRequiredParam[T](name)
    .getOrElse(throw new IllegalArgumentException(paramValueIsNoneMessage(name)))

def extractOrEvaluateLazyParam[T](name: ParameterName, context: Context): Option[T] = {
extractValue(name)
extractRequiredParam[T](name)
.map {
case lazyParameter: LazyParameter[_] => lazyParameter.evaluate(context)
case other => other
Expand All @@ -29,11 +45,11 @@ final class Params private (val nameToRawValueMap: Map[ParameterName, Any]) exte
.getOrElse(throw new IllegalArgumentException(paramValueIsNoneMessage(name)))
}

private def extractValue(paramName: ParameterName) = {
private def extractValue(paramName: ParameterName): ParamExtractionResult[Any] = {
nameToRawValueMap.get(paramName) match {
case None => throw new IllegalStateException(cannotFindParamNameMessage(paramName))
case Some(null) => None
case Some(value) => Some(value)
case None => ParamExtractionResult.MissingParam
case Some(null) => ParamExtractionResult.ParamValueIsNone
case Some(value) => ParamExtractionResult.Value(value)
}
}

Expand Down Expand Up @@ -67,4 +83,12 @@ object Params {

lazy val empty: Params = new Params(Map.empty)

// Outcome of looking up a parameter by name in Params.
sealed trait ParamExtractionResult[+T]

object ParamExtractionResult {
// The parameter is present and carries a non-null value.
final case class Value[+T](value: T) extends ParamExtractionResult[T]
// The parameter is present, but its raw value is null.
case object ParamValueIsNone extends ParamExtractionResult[Nothing]
// No entry for the parameter name exists in the params map.
case object MissingParam extends ParamExtractionResult[Nothing]
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ object ParameterExtractor {
) extends ParameterExtractor[PARAMETER_VALUE_TYPE] {

override def extractValue(params: Params): Option[PARAMETER_VALUE_TYPE] =
params.extract[PARAMETER_VALUE_TYPE](parameterName)
params.extractRequiredParam[PARAMETER_VALUE_TYPE](parameterName)

override private[definition] def createBase: Parameter =
Parameter[PARAMETER_VALUE_TYPE](parameterName)
Expand All @@ -101,7 +101,7 @@ object ParameterExtractor {
) extends ParameterExtractor[Map[String, PARAMETER_VALUE_TYPE]] {

override def extractValue(params: Params): Option[Map[String, PARAMETER_VALUE_TYPE]] =
params.extract[Map[String, PARAMETER_VALUE_TYPE]](parameterName)
params.extractRequiredParam[Map[String, PARAMETER_VALUE_TYPE]](parameterName)

override private[definition] def createBase: Parameter =
Parameter[PARAMETER_VALUE_TYPE](parameterName)
Expand All @@ -114,7 +114,7 @@ object ParameterExtractor {
) extends ParameterExtractor[LazyParameter[PARAMETER_VALUE_TYPE]] {

override def extractValue(params: Params): Option[LazyParameter[PARAMETER_VALUE_TYPE]] =
params.extract[LazyParameter[PARAMETER_VALUE_TYPE]](parameterName)
params.extractRequiredParam[LazyParameter[PARAMETER_VALUE_TYPE]](parameterName)

override private[definition] def createBase: Parameter =
Parameter[PARAMETER_VALUE_TYPE](parameterName)
Expand All @@ -127,7 +127,7 @@ object ParameterExtractor {
) extends ParameterExtractor[Map[String, LazyParameter[PARAMETER_VALUE_TYPE]]] {

override def extractValue(params: Params): Option[Map[String, LazyParameter[PARAMETER_VALUE_TYPE]]] =
params.extract[Map[String, LazyParameter[PARAMETER_VALUE_TYPE]]](parameterName)
params.extractRequiredParam[Map[String, LazyParameter[PARAMETER_VALUE_TYPE]]](parameterName)

override private[definition] def createBase: Parameter =
Parameter[PARAMETER_VALUE_TYPE](parameterName)
Expand All @@ -140,7 +140,7 @@ object ParameterExtractor {
) extends ParameterExtractor[PARAMETER_VALUE_TYPE] {

override def extractValue(params: Params): Option[PARAMETER_VALUE_TYPE] = {
params.extract[PARAMETER_VALUE_TYPE](parameterName)
params.extractRequiredParam[PARAMETER_VALUE_TYPE](parameterName)
}

override private[definition] def createBase: Parameter =
Expand All @@ -152,7 +152,7 @@ object ParameterExtractor {
) extends ParameterExtractor[LazyParameter[PARAMETER_VALUE_TYPE]] {

override def extractValue(params: Params): Option[LazyParameter[PARAMETER_VALUE_TYPE]] =
params.extract[LazyParameter[PARAMETER_VALUE_TYPE]](parameterName)
params.extractRequiredParam[LazyParameter[PARAMETER_VALUE_TYPE]](parameterName)

override private[definition] def createBase: Parameter =
Parameter
Expand All @@ -166,7 +166,7 @@ object ParameterExtractor {
) extends ParameterExtractor[Map[String, PARAMETER_VALUE_TYPE]] {

override def extractValue(params: Params): Option[Map[String, PARAMETER_VALUE_TYPE]] =
params.extract[Map[String, PARAMETER_VALUE_TYPE]](parameterName)
params.extractRequiredParam[Map[String, PARAMETER_VALUE_TYPE]](parameterName)

override private[definition] def createBase: Parameter =
Parameter
Expand All @@ -180,7 +180,7 @@ object ParameterExtractor {
) extends ParameterExtractor[Map[String, LazyParameter[PARAMETER_VALUE_TYPE]]] {

override def extractValue(params: Params): Option[Map[String, LazyParameter[PARAMETER_VALUE_TYPE]]] =
params.extract[Map[String, LazyParameter[PARAMETER_VALUE_TYPE]]](parameterName)
params.extractRequiredParam[Map[String, LazyParameter[PARAMETER_VALUE_TYPE]]](parameterName)

override private[definition] def createBase: Parameter = {
Parameter
Expand Down
CEB7
Original file line number Diff line number Diff line change
@@ -1,25 +1,34 @@
package pl.touk.nussknacker.engine.api.json.decoders

import io.circe.Json
import io.circe.{Json, JsonNumber}
import pl.touk.nussknacker.engine.util.Implicits._

import scala.collection.immutable.ListMap
import scala.jdk.CollectionConverters._

object FromJsonSimpleDecoder {

def jsonToAny(json: Json): Any = json.fold(
jsonNull = null,
jsonBoolean = identity[Boolean],
jsonNumber = jsonNumber =>
// we pick the narrowest type as possible to reduce the amount of memory and computations overheads
jsonNumber.toInt orElse
jsonNumber.toLong orElse
// We prefer java big decimal over float/double
jsonNumber.toBigDecimal.map(_.bigDecimal)
getOrElse (throw new IllegalArgumentException(s"Not supported json number: $jsonNumber")),
jsonNumber = toNumber,
jsonString = identity[String],
jsonArray = _.map(jsonToAny).asJava,
jsonObject = _.toMap.mapValuesNow(jsonToAny).asJava
jsonObject = obj =>
ListMap(
obj.toIterable.toList.map { case (key, value) =>
key -> jsonToAny(value)
}: _*
).asJava
)

// Converts a JSON number to the narrowest supported representation, to reduce memory
// and computation overhead: Int first, then Long, then java.math.BigDecimal
// (java big decimal is preferred over float/double).
// Throws IllegalArgumentException when none of the conversions succeeds.
private def toNumber(jsonNumber: JsonNumber): Any = {
  val narrowest: Option[Any] =
    jsonNumber.toInt
      .orElse(jsonNumber.toLong)
      .orElse(jsonNumber.toBigDecimal.map(_.bigDecimal))
  narrowest match {
    case Some(number) => number
    case None         => throw new IllegalArgumentException(s"Not supported json number: $jsonNumber")
  }
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -390,7 +390,7 @@ object typing {
val fieldTypes = typeMapFields(map)
Typed.record(fieldTypes, mapBasedRecordUnderlyingType[Map[_, _]](fieldTypes))
case javaMap: java.util.Map[String @unchecked, _] =>
val fieldTypes = typeMapFields(javaMap.asScala.toMap)
val fieldTypes = typeMapFields(javaMap.asScala)
Typed.record(fieldTypes)
case list: List[_] =>
genericTypeClass(classOf[List[_]], List(supertypeOfElementTypes(list)))
Expand All @@ -417,7 +417,7 @@ object typing {
}
}

private def typeMapFields(map: Map[String, Any]) = map.map { case (k, v) =>
private def typeMapFields(iterable: Iterable[(String, Any)]) = iterable.map { case (k, v) =>
k -> fromInstance(v)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,8 +268,8 @@ object SchemalessKafkaJsonTypeTests {
.emptySink(
"end",
"kafka",
"Key" -> "".spel,
"Raw editor" -> "true".spel,
"Key" -> "".spel,
// "Raw editor" -> "true".spel,
"Value" -> "#input".spel,
"Topic" -> s"'$sinkTopicName'".spel,
"Content type" -> "'JSON'".spel,
Expand Down
4 changes: 4 additions & 0 deletions docs/Changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,10 @@ description: Stay informed with detailed changelogs covering new features, impro
disabled: { oneOfUserRoles: ["AllPermissions"], type: "allof" } // Disabled, when user has role `AllPermissions`
}
```
* [#8213](https://github.com/TouK/nussknacker/pull/8213) Added an option to replace dynamic forms with a single parameter containing a JSON template
* to enable the single-parameter editor, set `modelConfig.jsonLikeValuesEnteringMode` to `SingleJsonTemplateParameter` for the given scenario type in the `scenarioTypes` config section
* the setting is optional and defaults to `DynamicForms` (no change in behavior)
* currently supported for Kafka sinks only.

## 1.18

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ object SpelTemplatePartsCustomTransformer
finalState: Option[Unit]
): FlinkCustomStreamTransformation = {
val templateLazyParam: LazyParameter[TemplateEvaluationResult] =
params.extractUnsafe[LazyParameter[TemplateEvaluationResult]](spelTemplateParameterName)
params.extractRequiredParamUnsafe[LazyParameter[TemplateEvaluationResult]](spelTemplateParameterName)
FlinkCustomStreamTransformation {
(dataStream: DataStream[Context], flinkCustomNodeContext: FlinkCustomNodeContext) =>
dataStream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,10 @@ class SingleSideJoinTransformer(
val branchTypeByBranchId: Map[String, BranchType] = BranchTypeParamDeclaration.extractValueUnsafe(params)
val keyByBranchId: Map[String, LazyParameter[CharSequence]] = KeyParamDeclaration.extractValueUnsafe(params)
val aggregator: Aggregator = AggregatorParamDeclaration.extractValueUnsafe(params)
val window: Duration = WindowLengthParamDeclaration.extractValueUnsafe(params)
val aggregateBy: LazyParameter[AnyRef] = params.extractUnsafe[LazyParameter[AnyRef]](AggregateByParamName)
val outputType = aggregator.computeOutputTypeUnsafe(aggregateBy.returnType)
val window: Duration = WindowLengthParamDeclaration.extractValueUnsafe(params)
val aggregateBy: LazyParameter[AnyRef] =
params.extractRequiredParamUnsafe[LazyParameter[AnyRef]](AggregateByParamName)
val outputType = aggregator.computeOutputTypeUnsafe(aggregateBy.returnType)

new FlinkCustomJoinTransformation with Serializable {
override def transform(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ object TableJoinComponent
)
)

val outputLazyParam = params.extractUnsafe[LazyParameter[AnyRef]](OutputParamName)
val outputLazyParam = params.extractRequiredParamUnsafe[LazyParameter[AnyRef]](OutputParamName)
val outputTypeInfo =
flinkNodeContext.valueWithContextInfo.forBranch[AnyRef](mainBranchId, outputLazyParam.returnType)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ object LastVariableFilterTransformer
finalState: Option[State]
): FlinkCustomStreamTransformation = {
val value = valueParameterDeclaration.extractValueUnsafe(params)
val condition = params.extractUnsafe[LazyParameter[java.lang.Boolean]](conditionParameterName)
val condition = params.extractRequiredParamUnsafe[LazyParameter[java.lang.Boolean]](conditionParameterName)
val groupBy = groupByParameterDeclaration.extractValueUnsafe(params)

FlinkCustomStreamTransformation((str: DataStream[Context], ctx: FlinkCustomNodeContext) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import org.apache.kafka.clients.consumer.ConsumerRecord
import org.scalatest.Assertion
import org.scalatest.funsuite.AnyFunSuite
import org.scalatest.matchers.should.Matchers
import pl.touk.nussknacker.engine.ModelConfig.JsonLikeValuesEnteringMode
import pl.touk.nussknacker.engine.ModelData
import pl.touk.nussknacker.engine.api._
import pl.touk.nussknacker.engine.api.context.{ProcessCompilationError, ValidationContext}
Expand Down Expand Up @@ -106,12 +107,14 @@ trait KafkaAvroSpecMixin
)
}

protected lazy val universalSinkFactory: UniversalKafkaSinkFactory = {
protected def universalSinkFactory(
jsonLikeValuesEnteringMode: JsonLikeValuesEnteringMode
): UniversalKafkaSinkFactory = {
val universalPayload = UniversalSchemaBasedSerdeProvider.create(schemaRegistryClientFactory, kafkaConfig)
new UniversalKafkaSinkFactory(
schemaRegistryClientFactory,
universalPayload,
testModelConfig,
testModelConfig.copy(jsonLikeValuesEnteringMode = jsonLikeValuesEnteringMode),
kafkaConfig,
FlinkKafkaUniversalSinkImplFactory
)
Expand Down
Loading
Loading
0