8000 [Processor] Add enrichment and validation for async processing by rokatyy · Pull Request #3532 · nuclio/nuclio · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[Processor] Add enrichment and validation for async processing #3532

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 12, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 42 additions & 1 deletion pkg/functionconfig/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,8 @@ type Trigger struct {
// General attributes
Attributes map[string]interface{} `json:"attributes,omitempty"`

Mode TriggerWorkMode `json:"mode,omitempty"`
Mode TriggerWorkMode `json:"mode,omitempty"`
AsyncConfig *AsyncConfig `json:"async,omitempty"`

// Deprecated: MaxWorkers is replaced by NumWorkers, and will be removed in 1.15.x
// TODO: remove in 1.15.x
Expand All @@ -115,6 +116,21 @@ type BatchConfiguration struct {
Timeout string `json:"timeout,omitempty"`
}

type AsyncConfig struct {
MinConnectionsNumber int `json:"minConnectionsNumber,omitempty"`
MaxConnectionsNumber int `json:"maxConnectionsNumber,omitempty"`
ConnectionCreationMode ConnectionCreationMode `json:"connectionCreationMode,omitempty"`
}

type ConnectionCreationMode string

const (
ConnectionCreationModeStatic ConnectionCreationMode = "static"
ConnectionCreationModeDynamic ConnectionCreationMode = "dynamic"
Comment on lines +125 to +129
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this a prep for lazy connection creation?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.


DefaultMaxConnectionsNumber = 1000
)

type BatchMode string

const (
Expand All @@ -140,6 +156,13 @@ var runtimesSupportBatching = []string{
"python",
}

var triggerKindsSupportAsync = []string{
"http",
}
var runtimesSupportAsync = []string{
"python",
}

func TriggerKindSupportsBatching(triggerKind string) bool {
for _, supportedKind := range triggerKindsSupportBatching {
if triggerKind == supportedKind {
Expand All @@ -158,6 +181,24 @@ func RuntimeSupportsBatching(runtime string) bool {
return false
}

func TriggerKindSupportsAsync(triggerKind string) bool {
for _, supportedKind := range triggerKindsSupportAsync {
if triggerKind == supportedKind {
return true
}
}
Comment on lines +185 to +189
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not:

return strings.Contains(triggerKind, triggerKindsSupportAsync)

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

trigger is always the same (we don't have any versions there), so it should be fine.

return false
}

func RuntimeSupportsAsync(runtime string) bool {
for _, supportedRuntime := range runtimesSupportAsync {
if strings.Contains(runtime, supportedRuntime) {
return true
}
}
return false
}

type ExplicitAckMode string

const (
Expand Down
223 changes: 165 additions & 58 deletions pkg/platform/abstract/platform.go
Original file line number Diff line number Diff line change
Expand Up @@ -1626,10 +1626,6 @@ func (ap *Platform) validateTriggers(functionConfig *functionconfig.Config) erro
return errors.Wrap(err, "Ingresses validation failed")
}

if err := ap.validateBatchConfiguration(functionConfig); err != nil {
return nuclio.WrapErrBadRequest(err)
}

for triggerKey, triggerInstance := range functionConfig.Spec.Triggers {

// do not allow trigger with empty name
Expand Down Expand Up @@ -1698,35 +1694,75 @@ func (ap *Platform) validateTriggers(functionConfig *functionconfig.Config) erro
}
}
}

if err := ap.validateBatchConfiguration(triggerInstance, functionConfig); err != nil {
return nuclio.WrapErrBadRequest(err)
}

if err := ap.validateProcessingMode(triggerInstance, functionConfig); err != nil {
return nuclio.WrapErrBadRequest(err)
}
}

return nil
}

func (ap *Platform) validateBatchConfiguration(functionConfig *functionconfig.Config) error {
func (ap *Platform) validateBatchConfiguration(triggerInstance functionconfig.Trigger, functionConfig *functionconfig.Config) error {

for _, triggerInstance := range functionConfig.Spec.Triggers {
if triggerInstance.Batch == nil {
continue
}
if functionconfig.BatchModeEnabled(triggerInstance.Batch) &&
!functionconfig.TriggerKindSupportsBatching(triggerInstance.Kind) {
ap.Logger.WarnWith("Batching is not supported for given trigger kind - batching configuration is ignored",
"triggerKind", triggerInstance.Kind)
}
if functionconfig.BatchModeEnabled(triggerInstance.Batch) &&
!functionconfig.RuntimeSupportsBatching(functionConfig.Spec.Runtime) {
ap.Logger.WarnWith("Batching is not supported for given runtime - batching configuration is ignored",
"runtime", functionConfig.Spec.Runtime)
}
if triggerInstance.Batch == nil {
return nil
}
if functionconfig.BatchModeEnabled(triggerInstance.Batch) &&
!functionconfig.TriggerKindSupportsBatching(triggerInstance.Kind) {
ap.Logger.WarnWith("Batching is not supported for given trigger kind - batching configuration is ignored",
"triggerKind", triggerInstance.Kind)
}
if functionconfig.BatchModeEnabled(triggerInstance.Batch) &&
!functionconfig.RuntimeSupportsBatching(functionConfig.Spec.Runtime) {
ap.Logger.WarnWith("Batching is not supported for given runtime - batching configuration is ignored",
"runtime", functionConfig.Spec.Runtime)
}

if triggerInstance.Batch.BatchSize <= 0 {
return nuclio.NewErrBadRequest("Batch size should be 1 or higher")
}
if _, err := time.ParseDuration(triggerInstance.Batch.Timeout); err != nil {
return nuclio.NewErrBadRequest(fmt.Sprintf("Batching timeout validation failed. Error: %s", err.Error()))
if triggerInstance.Batch.BatchSize <= 0 {
return nuclio.NewErrBadRequest("Batch size should be 1 or higher")
}
if _, err := time.ParseDuration(triggerInstance.Batch.Timeout); err != nil {
return nuclio.NewErrBadRequest(fmt.Sprintf("Batching timeout validation failed. Error: %s", err.Error()))
}
return nil
}

func (ap *Platform) validateProcessingMode(triggerInstance functionconfig.Trigger, functionConfig *functionconfig.Config) error {

if triggerInstance.Mode == functionconfig.SyncTriggerWorkMode {

if triggerInstance.AsyncConfig != nil {
return nuclio.NewErrBadRequest("AsyncConfig should be empty when working in `sync` trigger mode")
}
return nil
}

// Async validations
if !functionconfig.TriggerKindSupportsAsync(triggerInstance.Kind) {
return nuclio.NewErrBadRequest(fmt.Sprintf(
"Async processing mode is not supported for trigger kind - %s",
triggerInstance.Kind))
}

if !functionconfig.RuntimeSupportsAsync(functionConfig.Spec.Runtime) {
return nuclio.NewErrBadRequest(fmt.Sprintf(
"Async processing mode is not supported for runtime - %s",
functionConfig.Spec.Runtime))
}

if triggerInstance.AsyncConfig.MaxConnectionsNumber < triggerInstance.AsyncConfig.MinConnectionsNumber {
return nuclio.NewErrBadRequest(fmt.Sprintf(
"Maximum connection number configuration can't be smaller than minimal. "+
"MaxConnectionsNumber: %d, MinConnectionsNumber: %d", triggerInstance.AsyncConfig.MaxConnectionsNumber,
triggerInstance.AsyncConfig.MaxConnectionsNumber,
))
}

return nil
}

Expand Down Expand Up @@ -1779,21 +1815,13 @@ func (ap *Platform) enrichTriggers(ctx context.Context, functionConfig *function
return errors.Wrap(err, "Failed to enrich explicit ack params")
}

if err := ap.enrichBatchParams(ctx, functionConfig); err != nil {
return errors.Wrap(err, "Failed to enrich batch params")
}

for triggerName, triggerInstance := range functionConfig.Spec.Triggers {

// if name was not given, inherit its key
if triggerInstance.Name == "" {
triggerInstance.Name = triggerName
}

if triggerInstance.Mode == "" {
triggerInstance.Mode = functionconfig.SyncTriggerWorkMode
}

// replace deprecated MaxWorkers with NumWorkers
// TODO: remove in 1.15.x
// nolint: staticcheck
Expand All @@ -1808,6 +1836,17 @@ func (ap *Platform) enrichTriggers(ctx context.Context, functionConfig *function
triggerInstance.NumWorkers = 1
}
}
if err := ap.enrichExplicitAckParams(ctx, functionConfig); err != nil {
return errors.Wrap(err, "Failed to enrich explicit ack params")
}

if err := ap.enrichBatchParams(ctx, triggerName, &triggerInstance, functionConfig); err != nil {
return errors.Wrap(err, "Failed to enrich batch params")
}

if err := ap.enrichProcessingMode(ctx, triggerName, &triggerInstance, functionConfig); err != nil {
return errors.Wrap(err, "Failed to enrich processing mode")
}

functionConfig.Spec.Triggers[triggerName] = triggerInstance
}
Expand Down Expand Up @@ -1847,39 +1886,107 @@ func (ap *Platform) enrichExplicitAckParams(ctx context.Context, functionConfig
return nil
}

func (ap *Platform) enrichBatchParams(ctx context.Context, functionConfig *functionconfig.Config) error {
for triggerName, triggerInstance := range functionConfig.Spec.Triggers {
// skip batch configuration enrichment if it wasn't set
if triggerInstance.Batch == nil {
continue
}
// if batch mode is enabled, check batching parameters
if functionconfig.BatchModeEnabled(triggerInstance.Batch) {
ap.Logger.DebugWithCtx(ctx, "Enriching batch params for function trigger",
func (ap *Platform) enrichBatchParams(
ctx context.Context,
triggerName string,
triggerInstance *functionconfig.Trigger,
functionConfig *functionconfig.Config) error {
// skip batch configuration enrichment if it wasn't set
if triggerInstance.Batch == nil {
return nil
}
// if batch mode is enabled, check batching parameters
if functionconfig.BatchModeEnabled(triggerInstance.Batch) {
ap.Logger.DebugWithCtx(ctx, "Enriching batch params for function trigger",
"functionName", functionConfig.Meta.Name,
"trigger", triggerName)
// if batch size isn't set, set it to default
if triggerInstance.Batch.BatchSize == 0 {
ap.Logger.DebugWithCtx(ctx, "Enriching batch size for function trigger",
"functionName", functionConfig.Meta.Name,
"trigger", triggerName)
// if batch size isn't set, set it to default
if triggerInstance.Batch.BatchSize == 0 {
ap.Logger.DebugWithCtx(ctx, "Enriching batch size for function trigger",
"functionName", functionConfig.Meta.Name,
"trigger", triggerName,
"batchSize", functionconfig.DefaultBatchSize)
triggerInstance.Batch.BatchSize = functionconfig.DefaultBatchSize
}
"trigger", triggerName,
"batchSize", functionconfig.DefaultBatchSize)
triggerInstance.Batch.BatchSize = functionconfig.DefaultBatchSize
}

// if timeout isn't set, set it to default
if triggerInstance.Batch.Timeout == "" {
ap.Logger.DebugWithCtx(ctx, "Enriching batching timeout for function trigger",
"functionName", functionConfig.Meta.Name,
"trigger", triggerName,
"batchTimeout", functionconfig.DefaultBatchTimeout)
triggerInstance.Batch.Timeout = functionconfig.DefaultBatchTimeout
}
// if timeout isn't set, set it to default
if triggerInstance.Batch.Timeout == "" {
ap.Logger.DebugWithCtx(ctx, "Enriching batching timeout for function trigger",
"functionName", functionConfig.Meta.Name,
"trigger", triggerName,
"batchTimeout", functionconfig.DefaultBatchTimeout)
triggerInstance.Batch.Timeout = functionconfig.DefaultBatchTimeout
}
}
return nil
}

// enrichProcessingMode sets default processing modes and configurations
// for each trigger in the function configuration.
// If a trigger mode is empty or "sync", it is forced to "sync".
// Otherwise, it ensures that AsyncConfig is properly populated.
func (ap *Platform) enrichProcessingMode(
ctx context.Context,
triggerName string,
triggerInstance *functionconfig.Trigger,
functionConfig *functionconfig.Config) error {

// if trigger mode is empty or already "sync", set it to sync and continue
if triggerInstance.Mode == "" || triggerInstance.Mode == functionconfig.SyncTriggerWorkMode {
triggerInstance.Mode = functionconfig.SyncTriggerWorkMode
return nil
}

// otherwise, this trigger is async
ap.Logger.DebugWithCtx(ctx,
"Enriching async config for function trigger",
"functionName", functionConfig.Meta.Name,
"trigger", triggerName,
)

// if no async config is defined, create a new one
if triggerInstance.AsyncConfig == nil {
triggerInstance.AsyncConfig = &functionconfig.AsyncConfig{}
}

// if ConnectionCreationMode is not set, default to Static
if triggerInstance.AsyncConfig.ConnectionCreationMode == "" {
ap.Logger.DebugWithCtx(ctx,
"Enriching ConnectionCreationMode for function trigger",
"functionName", functionConfig.Meta.Name,
"trigger", triggerName,
"connectionCreationMode", functionconfig.ConnectionCreationModeStatic,
)
triggerInstance.AsyncConfig.ConnectionCreationMode = functionconfig.ConnectionCreationModeStatic
}

// if MaxConnectionsNumber is 0, set it to the default max
if triggerInstance.AsyncConfig.MaxConnectionsNumber == 0 {
ap.Logger.DebugWithCtx(ctx,
"Enriching MaxConnectionsNumber for function trigger",
"functionName", functionConfig.Meta.Name,
"trigger", triggerName,
"maxConnectionsNumber", functionconfig.DefaultMaxConnectionsNumber,
)
triggerInstance.AsyncConfig.MaxConnectionsNumber = functionconfig.DefaultMaxConnectionsNumber
}

// if the connection creation mode is Static, ensure MinConnectionsNumber matches MaxConnectionsNumber
if triggerInstance.AsyncConfig.ConnectionCreationMode == functionconfig.ConnectionCreationModeStatic &&
triggerInstance.AsyncConfig.MinConnectionsNumber != triggerInstance.AsyncConfig.MaxConnectionsNumber {

ap.Logger.DebugWithCtx(ctx,
"Enriching MinConnectionsNumber for function trigger",
"functionName", functionConfig.Meta.Name,
"trigger", triggerName,
"minConnectionsNumber", triggerInstance.AsyncConfig.MaxConnectionsNumber,
)
triggerInstance.AsyncConfig.MinConnectionsNumber = triggerInstance.AsyncConfig.MaxConnectionsNumber
}

return nil
}

// returns overrides for base images per runtime
func (ap *Platform) getBaseImagesOverrides() map[string]string {
return ap.Config.ImageRegistryOverrides.BaseImageRegistries
Expand Down
Loading
Loading
0