From 3b99660a6122a5adb5f3cd89b568a131071f36a7 Mon Sep 17 00:00:00 2001 From: rokatyy Date: Thu, 6 Mar 2025 17:56:43 +0000 Subject: [PATCH 1/4] [Processor] Add enrichment and validation for async processing --- pkg/functionconfig/types.go | 43 +++- pkg/platform/abstract/platform.go | 113 ++++++++++- pkg/platform/abstract/platform_test.go | 261 +++++++++++++++++++++++++ 3 files changed, 412 insertions(+), 5 deletions(-) diff --git a/pkg/functionconfig/types.go b/pkg/functionconfig/types.go index 90a340b3254..8fcf30372b8 100644 --- a/pkg/functionconfig/types.go +++ b/pkg/functionconfig/types.go @@ -95,7 +95,8 @@ type Trigger struct { // General attributes Attributes map[string]interface{} `json:"attributes,omitempty"` - Mode TriggerWorkMode `json:"mode,omitempty"` + Mode TriggerWorkMode `json:"mode,omitempty"` + AsyncConfig *AsyncConfig // Deprecated: MaxWorkers is replaced by NumWorkers, and will be removed in 1.15.x // TODO: remove in 1.15.x @@ -115,6 +116,21 @@ type BatchConfiguration struct { Timeout string `json:"timeout,omitempty"` } +type AsyncConfig struct { + MinConnectionsNumber int + MaxConnectionsNumber int + ConnectionCreationMode ConnectionCreationMode +} + +type ConnectionCreationMode string + +const ( + ConnectionCreationModeStatic ConnectionCreationMode = "static" + ConnectionCreationModeDynamic ConnectionCreationMode = "dynamic" + + DefaultMaxConnectionsNumber = 1000 +) + type BatchMode string const ( @@ -140,6 +156,13 @@ var runtimesSupportBatching = []string{ "python", } +var triggerKindsSupportAsync = []string{ + "http", +} +var runtimesSupportAsync = []string{ + "python", +} + func TriggerKindSupportsBatching(triggerKind string) bool { for _, supportedKind := range triggerKindsSupportBatching { if triggerKind == supportedKind { @@ -158,6 +181,24 @@ func RuntimeSupportsBatching(runtime string) bool { return false } +func TriggerKindSupportsAsync(triggerKind string) bool { + for _, supportedKind := range triggerKindsSupportAsync { + if triggerKind == supportedKind { + return true + } + } + return false +} + +func RuntimeSupportsAsync(runtime string) bool { + for _, supportedRuntime := range runtimesSupportAsync { + if strings.Contains(runtime, supportedRuntime) { + return true + } + } + return false +} + type ExplicitAckMode string const ( diff --git a/pkg/platform/abstract/platform.go b/pkg/platform/abstract/platform.go index 9be46032fc5..992b46bca17 100644 --- a/pkg/platform/abstract/platform.go +++ b/pkg/platform/abstract/platform.go @@ -1630,6 +1630,10 @@ func (ap *Platform) validateTriggers(functionConfig *functionconfig.Config) erro return nuclio.WrapErrBadRequest(err) } + if err := ap.validateProcessingMode(functionConfig); err != nil { + return nuclio.WrapErrBadRequest(err) + } + for triggerKey, triggerInstance := range functionConfig.Spec.Triggers { // do not allow trigger with empty name @@ -1730,6 +1734,40 @@ func (ap *Platform) validateBatchConfiguration(functionConfig *functionconfig.Co return nil } +func (ap *Platform) validateProcessingMode(functionConfig *functionconfig.Config) error { + for _, triggerInstance := range functionConfig.Spec.Triggers { + + if triggerInstance.Mode == functionconfig.SyncTriggerWorkMode { + + if triggerInstance.AsyncConfig != nil { + return nuclio.NewErrBadRequest("AsyncConfig should be empty when working in `sync` trigger mode") + } + continue + } + if !functionconfig.TriggerKindSupportsAsync(triggerInstance.Kind) { + return nuclio.NewErrBadRequest(fmt.Sprintf( + "Async processing mode is not supported for %s trigger kind", + triggerInstance.Kind)) + } + + if !functionconfig.RuntimeSupportsAsync(functionConfig.Spec.Runtime) { + return nuclio.NewErrBadRequest(fmt.Sprintf( + "Async processing mode is not supported for %s runtime", + functionConfig.Spec.Runtime)) + } + + if triggerInstance.AsyncConfig.MaxConnectionsNumber < triggerInstance.AsyncConfig.MinConnectionsNumber { + return nuclio.NewErrBadRequest(fmt.Sprintf( + "Maximum connection number configuration can't be smaller than minimal. "+ + "MaxConnectionsNumber: %d, MinConnectionsNumber: %d", triggerInstance.AsyncConfig.MaxConnectionsNumber, + triggerInstance.AsyncConfig.MaxConnectionsNumber, + )) + } + + } + return nil +} + func (ap *Platform) validateIngresses(triggers map[string]functionconfig.Trigger) error { for triggerName, triggerInstance := range functionconfig.GetTriggersByKind(triggers, "http") { @@ -1783,6 +1821,10 @@ func (ap *Platform) enrichTriggers(ctx context.Context, functionConfig *function return errors.Wrap(err, "Failed to enrich batch params") } + if err := ap.enrichProcessingMode(ctx, functionConfig); err != nil { + return errors.Wrap(err, "Failed to enrich processing mode") + } + for triggerName, triggerInstance := range functionConfig.Spec.Triggers { // if name was not given, inherit its key @@ -1790,10 +1832,6 @@ func (ap *Platform) enrichTriggers(ctx context.Context, functionConfig *function triggerInstance.Name = triggerName } - if triggerInstance.Mode == "" { - triggerInstance.Mode = functionconfig.SyncTriggerWorkMode - } - // replace deprecated MaxWorkers with NumWorkers // TODO: remove in 1.15.x // nolint: staticcheck @@ -1880,6 +1918,73 @@ func (ap *Platform) enrichBatchParams(ctx context.Context, functionConfig *funct return nil } +// enrichProcessingMode sets default processing modes and configurations +// for each trigger in the function configuration. +// If a trigger mode is empty or "sync", it is forced to "sync". +// Otherwise, it ensures that AsyncConfig is properly populated. +func (ap *Platform) enrichProcessingMode(ctx context.Context, functionConfig *functionconfig.Config) error { + + for triggerName, triggerInstance := range functionConfig.Spec.Triggers { + + // if trigger mode is empty or already "sync", set it to sync and continue + if triggerInstance.Mode == "" || triggerInstance.Mode == functionconfig.SyncTriggerWorkMode { + triggerInstance.Mode = functionconfig.SyncTriggerWorkMode + functionConfig.Spec.Triggers[triggerName] = triggerInstance + continue + } + + // otherwise, this trigger is async + ap.Logger.DebugWithCtx(ctx, + "Enriching async config for function trigger", + "functionName", functionConfig.Meta.Name, + "trigger", triggerName, + ) + + // if no async config is defined, create a new one + if triggerInstance.AsyncConfig == nil { + triggerInstance.AsyncConfig = &functionconfig.AsyncConfig{} + } + + // if ConnectionCreationMode is not set, default to Static + if triggerInstance.AsyncConfig.ConnectionCreationMode == "" { + ap.Logger.DebugWithCtx(ctx, + "Enriching ConnectionCreationMode for function trigger", + "functionName", functionConfig.Meta.Name, + "trigger", triggerName, + "connectionCreationMode", functionconfig.ConnectionCreationModeStatic, + ) + triggerInstance.AsyncConfig.ConnectionCreationMode = functionconfig.ConnectionCreationModeStatic + } + + // if MaxConnectionsNumber is 0, set it to the default max + if triggerInstance.AsyncConfig.MaxConnectionsNumber == 0 { + ap.Logger.DebugWithCtx(ctx, + "Enriching MaxConnectionsNumber for function trigger", + "functionName", functionConfig.Meta.Name, + "trigger", triggerName, + "maxConnectionsNumber", functionconfig.DefaultMaxConnectionsNumber, + ) + triggerInstance.AsyncConfig.MaxConnectionsNumber = functionconfig.DefaultMaxConnectionsNumber + } + + // if the connection creation mode is Static, ensure MinConnectionsNumber matches MaxConnectionsNumber + if triggerInstance.AsyncConfig.ConnectionCreationMode == functionconfig.ConnectionCreationModeStatic && + triggerInstance.AsyncConfig.MinConnectionsNumber != triggerInstance.AsyncConfig.MaxConnectionsNumber { + + ap.Logger.DebugWithCtx(ctx, + "Enriching MinConnectionsNumber for function trigger", + "functionName", functionConfig.Meta.Name, + "trigger", triggerName, + "minConnectionsNumber", triggerInstance.AsyncConfig.MaxConnectionsNumber, + ) + triggerInstance.AsyncConfig.MinConnectionsNumber = triggerInstance.AsyncConfig.MaxConnectionsNumber + } + functionConfig.Spec.Triggers[triggerName] = triggerInstance + } + + return nil +} + // returns overrides for base images per runtime func (ap *Platform) getBaseImagesOverrides() map[string]string { return ap.Config.ImageRegistryOverrides.BaseImageRegistries diff --git a/pkg/platform/abstract/platform_test.go b/pkg/platform/abstract/platform_test.go index 67f95b3d9e2..275c53db2b5 100644 --- a/pkg/platform/abstract/platform_test.go +++ b/pkg/platform/abstract/platform_test.go @@ -2063,6 +2063,267 @@ func (suite *AbstractPlatformTestSuite) TestValidateFunctionConfigAutoScaleMetri } } +func (suite *AbstractPlatformTestSuite) TestEnrichProcessingMode() { + testCases := []struct { + name string + functionConfig *functionconfig.Config + expectedConfig *functionconfig.Config + }{ + { + name: "sync-trigger-mode-by-default", + functionConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + }, + }, + }, + }, + expectedConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + Mode: functionconfig.SyncTriggerWorkMode, + }, + }, + }, + }, + }, + { + name: "async-trigger-with-defaults", + functionConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + Mode: functionconfig.AsyncTriggerWorkMode, + }, + }, + }, + }, + expectedConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + Mode: functionconfig.AsyncTriggerWorkMode, + AsyncConfig: &functionconfig.AsyncConfig{ + ConnectionCreationMode: functionconfig.ConnectionCreationModeStatic, + MaxConnectionsNumber: functionconfig.DefaultMaxConnectionsNumber, + MinConnectionsNumber: functionconfig.DefaultMaxConnectionsNumber, + }, + }, + }, + }, + }, + }, + { + name: "async-trigger-enrich-min-connection", + functionConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + Mode: functionconfig.AsyncTriggerWorkMode, + AsyncConfig: &functionconfig.AsyncConfig{ + ConnectionCreationMode: functionconfig.ConnectionCreationModeStatic, + MaxConnectionsNumber: 10, + MinConnectionsNumber: 5, + }, + }, + }, + }, + }, + expectedConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + Mode: functionconfig.AsyncTriggerWorkMode, + AsyncConfig: &functionconfig.AsyncConfig{ + ConnectionCreationMode: functionconfig.ConnectionCreationModeStatic, + MaxConnectionsNumber: 10, + MinConnectionsNumber: 10, + }, + }, + }, + }, + }, + }, + { + name: "async-trigger-with-custom-config", + functionConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + Mode: functionconfig.AsyncTriggerWorkMode, + AsyncConfig: &functionconfig.AsyncConfig{ + ConnectionCreationMode: functionconfig.ConnectionCreationModeDynamic, + MaxConnectionsNumber: 10, + MinConnectionsNumber: 5, + }, + }, + }, + }, + }, + expectedConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + Mode: functionconfig.AsyncTriggerWorkMode, + AsyncConfig: &functionconfig.AsyncConfig{ + ConnectionCreationMode: functionconfig.ConnectionCreationModeDynamic, + MaxConnectionsNumber: 10, + MinConnectionsNumber: 5, + }, + }, + }, + }, + }, + }, + } + + for _, testCase := range testCases { + suite.Run(testCase.name, func() { + err := suite.Platform.enrichProcessingMode(context.Background(), testCase.functionConfig) + + suite.Require().NoError(err) + + suite.Equal(testCase.expectedConfig, testCase.functionConfig) + }) + } +} + +func (suite *AbstractPlatformTestSuite) TestValidateProcessingMode() { + testCases := []struct { + name string + functionConfig *functionconfig.Config + expectedError string + }{ + { + name: "sync trigger, no async config -> no error", + functionConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Runtime: "python", + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + Mode: functionconfig.SyncTriggerWorkMode, + }, + }, + }, + }, + }, + { + name: "sync trigger, has async config -> error", + functionConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Runtime: "python", + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + Mode: functionconfig.SyncTriggerWorkMode, + AsyncConfig: &functionconfig.AsyncConfig{MaxConnectionsNumber: 10}, + }, + }, + }, + }, + expectedError: "AsyncConfig should be empty when working in `sync` trigger mode", + }, + { + name: "async kind not supported -> error", + functionConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Runtime: "python", + Triggers: map[string]functionconfig.Trigger{ + "cron-trigger": { + Kind: "cron", + Mode: functionconfig.AsyncTriggerWorkMode, + AsyncConfig: &functionconfig.AsyncConfig{ + MaxConnectionsNumber: 10, + }, + }, + }, + }, + }, + expectedError: "Async processing mode is not supported for cron trigger kind", + }, + { + name: "async runtime not supported -> error", + functionConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Runtime: "java", + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + Mode: functionconfig.AsyncTriggerWorkMode, + AsyncConfig: &functionconfig.AsyncConfig{ + MaxConnectionsNumber: 10, + }, + }, + }, + }, + }, + expectedError: "Async processing mode is not supported for java runtime", + }, + { + name: "max < min -> error", + functionConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Runtime: "python", + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + Mode: functionconfig.AsyncTriggerWorkMode, + AsyncConfig: &functionconfig.AsyncConfig{ + MaxConnectionsNumber: 1, + MinConnectionsNumber: 2, + }, + }, + }, + }, + }, + expectedError: "Maximum connection number configuration can't be smaller than minimal", + }, + { + name: "valid async config -> no error", + functionConfig: &functionconfig.Config{ + Spec: functionconfig.Spec{ + Runtime: "python", + Triggers: map[string]functionconfig.Trigger{ + "http-trigger": { + Kind: "http", + Mode: functionconfig.AsyncTriggerWorkMode, + AsyncConfig: &functionconfig.AsyncConfig{ + MaxConnectionsNumber: 10, + MinConnectionsNumber: 5, + }, + }, + }, + }, + }, + expectedError: "", + }, + } + + for _, testCase := range testCases { + suite.Run(testCase.name, func() { + err := suite.Platform.validateProcessingMode(testCase.functionConfig) + + if testCase.expectedError == "" { + suite.Require().NoError(err) + } else { + suite.Require().Error(err) + suite.Contains(err.Error(), testCase.expectedError) + } + }) + } +} + // Test that GetProcessorLogs() generates the expected formattedPodLogs and briefErrorsMessage // Expects 3 files inside functionLogsFilePath: (kept in these constants) // - FunctionLogsFile From 9d4715e7606b707161d97b49c41b5c92ff1dbb54 Mon Sep 17 00:00:00 2001 From: rokatyy Date: Mon, 10 Mar 2025 17:21:08 +0000 Subject: [PATCH 2/4] comments --- pkg/functionconfig/types.go | 8 +- pkg/platform/abstract/platform.go | 272 +++++++++--------- pkg/platform/abstract/platform_test.go | 161 +++-------- .../runtime/python/test/python_test.go | 4 +- 4 files changed, 188 insertions(+), 257 deletions(-) diff --git a/pkg/functionconfig/types.go b/pkg/functionconfig/types.go index 8fcf30372b8..c34aaa99925 100644 --- a/pkg/functionconfig/types.go +++ b/pkg/functionconfig/types.go @@ -96,7 +96,7 @@ type Trigger struct { Attributes map[string]interface{} `json:"attributes,omitempty"` Mode TriggerWorkMode `json:"mode,omitempty"` - AsyncConfig *AsyncConfig + AsyncConfig *AsyncConfig `json:"asyncConfig,omitempty"` // Deprecated: MaxWorkers is replaced by NumWorkers, and will be removed in 1.15.x // TODO: remove in 1.15.x @@ -117,9 +117,9 @@ type BatchConfiguration struct { } type AsyncConfig struct { - MinConnectionsNumber int - MaxConnectionsNumber int - ConnectionCreationMode ConnectionCreationMode + MinConnectionsNumber int `json:"minConnectionsNumber,omitempty"` + MaxConnectionsNumber int `json:"maxConnectionsNumber,omitempty"` + ConnectionCreationMode ConnectionCreationMode `json:"connectionCreationMode,omitempty"` } type ConnectionCreationMode string diff --git a/pkg/platform/abstract/platform.go b/pkg/platform/abstract/platform.go index 992b46bca17..2d67da4894f 100644 --- a/pkg/platform/abstract/platform.go +++ b/pkg/platform/abstract/platform.go @@ -1626,14 +1626,6 @@ func (ap *Platform) validateTriggers(functionConfig *functionconfig.Config) erro return errors.Wrap(err, "Ingresses validation failed") } - if err := ap.validateBatchConfiguration(functionConfig); err != nil { - return nuclio.WrapErrBadRequest(err) - } - - if err := ap.validateProcessingMode(functionConfig); err != nil { - return nuclio.WrapErrBadRequest(err) - } - for triggerKey, triggerInstance := range functionConfig.Spec.Triggers { // do not allow trigger with empty name @@ -1702,69 +1694,75 @@ func (ap *Platform) validateTriggers(functionConfig *functionconfig.Config) erro } } } + + if err := ap.validateBatchConfiguration(triggerInstance, functionConfig); err != nil { + return nuclio.WrapErrBadRequest(err) + } + + if err := ap.validateProcessingMode(triggerInstance, functionConfig); err != nil { + return nuclio.WrapErrBadRequest(err) + } } return nil } -func (ap *Platform) validateBatchConfiguration(functionConfig *functionconfig.Config) error { +func (ap *Platform) validateBatchConfiguration(triggerInstance functionconfig.Trigger, functionConfig *functionconfig.Config) error { - for _, triggerInstance := range functionConfig.Spec.Triggers { - if triggerInstance.Batch == nil { - continue - } - if functionconfig.BatchModeEnabled(triggerInstance.Batch) && - !functionconfig.TriggerKindSupportsBatching(triggerInstance.Kind) { - ap.Logger.WarnWith("Batching is not supported for given trigger kind - batching configuration is ignored", - "triggerKind", triggerInstance.Kind) - } - if functionconfig.BatchModeEnabled(triggerInstance.Batch) && - !functionconfig.RuntimeSupportsBatching(functionConfig.Spec.Runtime) { - ap.Logger.WarnWith("Batching is not supported for given runtime - batching configuration is ignored", - "runtime", functionConfig.Spec.Runtime) - } + if triggerInstance.Batch == nil { + return nil + } + if functionconfig.BatchModeEnabled(triggerInstance.Batch) && + !functionconfig.TriggerKindSupportsBatching(triggerInstance.Kind) { + ap.Logger.WarnWith("Batching is not supported for given trigger kind - batching configuration is ignored", + "triggerKind", triggerInstance.Kind) + } + if functionconfig.BatchModeEnabled(triggerInstance.Batch) && + !functionconfig.RuntimeSupportsBatching(functionConfig.Spec.Runtime) { + ap.Logger.WarnWith("Batching is not supported for given runtime - batching configuration is ignored", + "runtime", functionConfig.Spec.Runtime) + } - if triggerInstance.Batch.BatchSize <= 0 { - return nuclio.NewErrBadRequest("Batch size should be 1 or higher") - } - if _, err := time.ParseDuration(triggerInstance.Batch.Timeout); err != nil { - return nuclio.NewErrBadRequest(fmt.Sprintf("Batching timeout validation failed. Error: %s", err.Error())) - } + if triggerInstance.Batch.BatchSize <= 0 { + return nuclio.NewErrBadRequest("Batch size should be 1 or higher") + } + if _, err := time.ParseDuration(triggerInstance.Batch.Timeout); err != nil { + return nuclio.NewErrBadRequest(fmt.Sprintf("Batching timeout validation failed. Error: %s", err.Error())) } return nil } -func (ap *Platform) validateProcessingMode(functionConfig *functionconfig.Config) error { - for _, triggerInstance := range functionConfig.Spec.Triggers { +func (ap *Platform) validateProcessingMode(triggerInstance functionconfig.Trigger, functionConfig *functionconfig.Config) error { - if triggerInstance.Mode == functionconfig.SyncTriggerWorkMode { + if triggerInstance.Mode == functionconfig.SyncTriggerWorkMode { - if triggerInstance.AsyncConfig != nil { - return nuclio.NewErrBadRequest("AsyncConfig should be empty when working in `sync` trigger mode") - } - continue - } - if !functionconfig.TriggerKindSupportsAsync(triggerInstance.Kind) { - return nuclio.NewErrBadRequest(fmt.Sprintf( - "Async processing mode is not supported for %s trigger kind", - triggerInstance.Kind)) + if triggerInstance.AsyncConfig != nil { + return nuclio.NewErrBadRequest("AsyncConfig should be empty when working in `sync` trigger mode") } + return nil + } - if !functionconfig.RuntimeSupportsAsync(functionConfig.Spec.Runtime) { - return nuclio.NewErrBadRequest(fmt.Sprintf( - "Async processing mode is not supported for %s runtime", - functionConfig.Spec.Runtime)) - } + // Async validations + if !functionconfig.TriggerKindSupportsAsync(triggerInstance.Kind) { + return nuclio.NewErrBadRequest(fmt.Sprintf( + "Async processing mode is not supported for trigger kind - %s", + triggerInstance.Kind)) + } - if triggerInstance.AsyncConfig.MaxConnectionsNumber < triggerInstance.AsyncConfig.MinConnectionsNumber { - return nuclio.NewErrBadRequest(fmt.Sprintf( - "Maximum connection number configuration can't be smaller than minimal. "+ - "MaxConnectionsNumber: %d, MinConnectionsNumber: %d", triggerInstance.AsyncConfig.MaxConnectionsNumber, - triggerInstance.AsyncConfig.MaxConnectionsNumber, - )) - } + if !functionconfig.RuntimeSupportsAsync(functionConfig.Spec.Runtime) { + return nuclio.NewErrBadRequest(fmt.Sprintf( + "Async processing mode is not supported for runtime - %s", + functionConfig.Spec.Runtime)) + } + if triggerInstance.AsyncConfig.MaxConnectionsNumber < triggerInstance.AsyncConfig.MinConnectionsNumber { + return nuclio.NewErrBadRequest(fmt.Sprintf( + "Maximum connection number configuration can't be smaller than minimal. "+ + "MaxConnectionsNumber: %d, MinConnectionsNumber: %d", triggerInstance.AsyncConfig.MaxConnectionsNumber, + triggerInstance.AsyncConfig.MaxConnectionsNumber, + )) } + return nil } @@ -1817,14 +1815,6 @@ func (ap *Platform) enrichTriggers(ctx context.Context, functionConfig *function return errors.Wrap(err, "Failed to enrich explicit ack params") } - if err := ap.enrichBatchParams(ctx, functionConfig); err != nil { - return errors.Wrap(err, "Failed to enrich batch params") - } - - if err := ap.enrichProcessingMode(ctx, functionConfig); err != nil { - return errors.Wrap(err, "Failed to enrich processing mode") - } - for triggerName, triggerInstance := range functionConfig.Spec.Triggers { // if name was not given, inherit its key @@ -1846,6 +1836,17 @@ func (ap *Platform) enrichTriggers(ctx context.Context, functionConfig *function triggerInstance.NumWorkers = 1 } } + if err := ap.enrichExplicitAckParams(ctx, functionConfig); err != nil { + return errors.Wrap(err, "Failed to enrich explicit ack params") + } + + if err := ap.enrichBatchParams(ctx, triggerName, &triggerInstance, functionConfig); err != nil { + return errors.Wrap(err, "Failed to enrich batch params") + } + + if err := ap.enrichProcessingMode(ctx, triggerName, &triggerInstance, functionConfig); err != nil { + return errors.Wrap(err, "Failed to enrich processing mode") + } functionConfig.Spec.Triggers[triggerName] = triggerInstance } @@ -1885,34 +1886,36 @@ func (ap *Platform) enrichExplicitAckParams(ctx context.Context, functionConfig return nil } -func (ap *Platform) enrichBatchParams(ctx context.Context, functionConfig *functionconfig.Config) error { - for triggerName, triggerInstance := range functionConfig.Spec.Triggers { - // skip batch configuration enrichment if it wasn't set - if triggerInstance.Batch == nil { - continue - } - // if batch mode is enabled, check batching parameters - if functionconfig.BatchModeEnabled(triggerInstance.Batch) { - ap.Logger.DebugWithCtx(ctx, "Enriching batch params for function trigger", +func (ap *Platform) enrichBatchParams( + ctx context.Context, + triggerName string, + triggerInstance *functionconfig.Trigger, + functionConfig *functionconfig.Config) error { + // skip batch configuration enrichment if it wasn't set + if triggerInstance.Batch == nil { + return nil + } + // if batch mode is enabled, check batching parameters + if functionconfig.BatchModeEnabled(triggerInstance.Batch) { + ap.Logger.DebugWithCtx(ctx, "Enriching batch params for function trigger", + "functionName", functionConfig.Meta.Name, + "trigger", triggerName) + // if batch size isn't set, set it to default + if triggerInstance.Batch.BatchSize == 0 { + ap.Logger.DebugWithCtx(ctx, "Enriching batch size for function trigger", "functionName", functionConfig.Meta.Name, - "trigger", triggerName) - // if batch size isn't set, set it to default - if triggerInstance.Batch.BatchSize == 0 { - ap.Logger.DebugWithCtx(ctx, "Enriching batch size for function trigger", - "functionName", functionConfig.Meta.Name, - "trigger", triggerName, - "batchSize", functionconfig.DefaultBatchSize) - triggerInstance.Batch.BatchSize = functionconfig.DefaultBatchSize - } + "trigger", triggerName, + "batchSize", functionconfig.DefaultBatchSize) + triggerInstance.Batch.BatchSize = functionconfig.DefaultBatchSize + } - // if timeout isn't set, set it to default - if triggerInstance.Batch.Timeout == "" { - ap.Logger.DebugWithCtx(ctx, "Enriching batching timeout for function trigger", - "functionName", functionConfig.Meta.Name, - "trigger", triggerName, - "batchTimeout", functionconfig.DefaultBatchTimeout) - triggerInstance.Batch.Timeout = functionconfig.DefaultBatchTimeout - } + // if timeout isn't set, set it to default + if triggerInstance.Batch.Timeout == "" { + ap.Logger.DebugWithCtx(ctx, "Enriching batching timeout for function trigger", + "functionName", functionConfig.Meta.Name, + "trigger", triggerName, + "batchTimeout", functionconfig.DefaultBatchTimeout) + triggerInstance.Batch.Timeout = functionconfig.DefaultBatchTimeout } } return nil @@ -1922,64 +1925,63 @@ func (ap *Platform) enrichBatchParams(ctx context.Context, functionConfig *funct // for each trigger in the function configuration. // If a trigger mode is empty or "sync", it is forced to "sync". // Otherwise, it ensures that AsyncConfig is properly populated. -func (ap *Platform) enrichProcessingMode(ctx context.Context, functionConfig *functionconfig.Config) error { +func (ap *Platform) enrichProcessingMode( + ctx context.Context, + triggerName string, + triggerInstance *functionconfig.Trigger, + functionConfig *functionconfig.Config) error { + + // if trigger mode is empty or already "sync", set it to sync and continue + if triggerInstance.Mode == "" || triggerInstance.Mode == functionconfig.SyncTriggerWorkMode { + triggerInstance.Mode = functionconfig.SyncTriggerWorkMode + return nil + } - for triggerName, triggerInstance := range functionConfig.Spec.Triggers { + // otherwise, this trigger is async + ap.Logger.DebugWithCtx(ctx, + "Enriching async config for function trigger", + "functionName", functionConfig.Meta.Name, + "trigger", triggerName, + ) - // if trigger mode is empty or already "sync", set it to sync and continue - if triggerInstance.Mode == "" || triggerInstance.Mode == functionconfig.SyncTriggerWorkMode { - triggerInstance.Mode = functionconfig.SyncTriggerWorkMode - functionConfig.Spec.Triggers[triggerName] = triggerInstance - continue - } + // if no async config is defined, create a new one + if triggerInstance.AsyncConfig == nil { + triggerInstance.AsyncConfig = &functionconfig.AsyncConfig{} + } - // otherwise, this trigger is async + // if ConnectionCreationMode is not set, default to Static + if triggerInstance.AsyncConfig.ConnectionCreationMode == "" { ap.Logger.DebugWithCtx(ctx, - "Enriching async config for function trigger", + "Enriching ConnectionCreationMode for function trigger", "functionName", functionConfig.Meta.Name, "trigger", triggerName, + "connectionCreationMode", functionconfig.ConnectionCreationModeStatic, ) + triggerInstance.AsyncConfig.ConnectionCreationMode = functionconfig.ConnectionCreationModeStatic + } - // if no async config is defined, create a new one - if triggerInstance.AsyncConfig == nil { - triggerInstance.AsyncConfig = &functionconfig.AsyncConfig{} - } - - // if ConnectionCreationMode is not set, default to Static - if triggerInstance.AsyncConfig.ConnectionCreationMode == "" { - ap.Logger.DebugWithCtx(ctx, - "Enriching ConnectionCreationMode for function trigger", - "functionName", functionConfig.Meta.Name, - "trigger", triggerName, - "connectionCreationMode", functionconfig.ConnectionCreationModeStatic, - ) - triggerInstance.AsyncConfig.ConnectionCreationMode = functionconfig.ConnectionCreationModeStatic - } - - // if MaxConnectionsNumber is 0, set it to the default max - if triggerInstance.AsyncConfig.MaxConnectionsNumber == 0 { - ap.Logger.DebugWithCtx(ctx, - "Enriching MaxConnectionsNumber for function trigger", - "functionName", functionConfig.Meta.Name, - "trigger", triggerName, - "maxConnectionsNumber", functionconfig.DefaultMaxConnectionsNumber, - ) - triggerInstance.AsyncConfig.MaxConnectionsNumber = functionconfig.DefaultMaxConnectionsNumber - } + // if MaxConnectionsNumber is 0, set it to the default max + if triggerInstance.AsyncConfig.MaxConnectionsNumber == 0 { + ap.Logger.DebugWithCtx(ctx, + "Enriching MaxConnectionsNumber for function trigger", + "functionName", functionConfig.Meta.Name, + "trigger", triggerName, + "maxConnectionsNumber", functionconfig.DefaultMaxConnectionsNumber, + ) + triggerInstance.AsyncConfig.MaxConnectionsNumber = functionconfig.DefaultMaxConnectionsNumber + } - // if the connection creation mode is Static, ensure MinConnectionsNumber matches MaxConnectionsNumber - if triggerInstance.AsyncConfig.ConnectionCreationMode == functionconfig.ConnectionCreationModeStatic && - triggerInstance.AsyncConfig.MinConnectionsNumber != triggerInstance.AsyncConfig.MaxConnectionsNumber { + // if the connection creation mode is Static, ensure MinConnectionsNumber matches MaxConnectionsNumber + if triggerInstance.AsyncConfig.ConnectionCreationMode == functionconfig.ConnectionCreationModeStatic && + triggerInstance.AsyncConfig.MinConnectionsNumber != triggerInstance.AsyncConfig.MaxConnectionsNumber { - ap.Logger.DebugWithCtx(ctx, - "Enriching MinConnectionsNumber for function trigger", - "functionName", functionConfig.Meta.Name, - "trigger", triggerName, - "minConnectionsNumber", triggerInstance.AsyncConfig.MaxConnectionsNumber, - ) - triggerInstance.AsyncConfig.MinConnectionsNumber = triggerInstance.AsyncConfig.MaxConnectionsNumber - } - functionConfig.Spec.Triggers[triggerName] = triggerInstance + ap.Logger.DebugWithCtx(ctx, + "Enriching MinConnectionsNumber for function trigger", + "functionName", functionConfig.Meta.Name, + "trigger", triggerName, + "minConnectionsNumber", triggerInstance.AsyncConfig.MaxConnectionsNumber, + ) + triggerInstance.AsyncConfig.MinConnectionsNumber = triggerInstance.AsyncConfig.MaxConnectionsNumber } return nil diff --git a/pkg/platform/abstract/platform_test.go b/pkg/platform/abstract/platform_test.go index 275c53db2b5..c5f7c4da129 100644 --- a/pkg/platform/abstract/platform_test.go +++ b/pkg/platform/abstract/platform_test.go @@ -400,7 +400,7 @@ func (suite *AbstractPlatformTestSuite) TestEnrichBatchConfiguration() { }, }, } - err := suite.Platform.enrichBatchParams(suite.ctx, functionConfig) + err := suite.Platform.EnrichFunctionConfig(suite.ctx, functionConfig) suite.Require().NoError(err) suite.Require().Equal(testCase.expectedBatchConfiguration.BatchSize, testCase.batchConfiguration.BatchSize) @@ -474,9 +474,10 @@ func (suite *AbstractPlatformTestSuite) TestValidateBatchConfiguration() { }, } { suite.Run(testCase.name, func() { + triggerInstance := functionconfig.Trigger{Kind: testCase.triggerKind, Batch: testCase.batchConfiguration} - err := suite.Platform.validateBatchConfiguration(&functionconfig.Config{Spec: functionconfig.Spec{Runtime: testCase.runtime, Triggers: map[string]functionconfig.Trigger{ - "my-trigger": {Kind: testCase.triggerKind, Batch: testCase.batchConfiguration}, + err := suite.Platform.validateBatchConfiguration(triggerInstance, &functionconfig.Config{Spec: functionconfig.Spec{Runtime: testCase.runtime, Triggers: map[string]functionconfig.Trigger{ + "my-trigger": triggerInstance, }}}) if testCase.expectError { suite.Require().NotNil(err) @@ -2062,138 +2063,65 @@ func (suite *AbstractPlatformTestSuite) TestValidateFunctionConfigAutoScaleMetri }) } } - func (suite *AbstractPlatformTestSuite) TestEnrichProcessingMode() { testCases := []struct { name string - functionConfig *functionconfig.Config - expectedConfig *functionconfig.Config + trigger functionconfig.Trigger + expectedMode functionconfig.TriggerWorkMode + expectedConfig *functionconfig.AsyncConfig }{ { - name: "sync-trigger-mode-by-default", - functionConfig: &functionconfig.Config{ - Spec: functionconfig.Spec{ - Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { - Kind: "http", - }, - }, - }, - }, - expectedConfig: &functionconfig.Config{ - Spec: functionconfig.Spec{ - Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { - Kind: "http", - Mode: functionconfig.SyncTriggerWorkMode, - }, - }, - }, + name: "SyncMode", + trigger: functionconfig.Trigger{ + Mode: "", }, + expectedMode: functionconfig.SyncTriggerWorkMode, + expectedConfig: nil, }, { - name: "async-trigger-with-defaults", - functionConfig: &functionconfig.Config{ - Spec: functionconfig.Spec{ - Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { - Kind: "http", - Mode: functionconfig.AsyncTriggerWorkMode, - }, - }, - }, - }, - expectedConfig: &functionconfig.Config{ - Spec: functionconfig.Spec{ - Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { - Kind: "http", - Mode: functionconfig.AsyncTriggerWorkMode, - AsyncConfig: &functionconfig.AsyncConfig{ - ConnectionCreationMode: functionconfig.ConnectionCreationModeStatic, - MaxConnectionsNumber: functionconfig.DefaultMaxConnectionsNumber, - MinConnectionsNumber: functionconfig.DefaultMaxConnectionsNumber, - }, - }, - }, - }, - }, - }, - { - name: "async-trigger-enrich-min-connection", - functionConfig: &functionconfig.Config{ - Spec: functionconfig.Spec{ - Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { - Kind: "http", - Mode: functionconfig.AsyncTriggerWorkMode, - AsyncConfig: &functionconfig.AsyncConfig{ - ConnectionCreationMode: functionconfig.ConnectionCreationModeStatic, - MaxConnectionsNumber: 10, - MinConnectionsNumber: 5, - }, - }, - }, - }, + name: "AsyncModeWithDefaults", + trigger: functionconfig.Trigger{ + Mode: functionconfig.AsyncTriggerWorkMode, }, - expectedConfig: &functionconfig.Config{ - Spec: functionconfig.Spec{ - Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { - Kind: "http", - Mode: functionconfig.AsyncTriggerWorkMode, - AsyncConfig: &functionconfig.AsyncConfig{ - ConnectionCreationMode: functionconfig.ConnectionCreationModeStatic, - MaxConnectionsNumber: 10, - MinConnectionsNumber: 10, - }, - }, - }, - }, + expectedMode: functionconfig.AsyncTriggerWorkMode, + expectedConfig: &functionconfig.AsyncConfig{ + ConnectionCreationMode: functionconfig.ConnectionCreationModeStatic, + MaxConnectionsNumber: functionconfig.DefaultMaxConnectionsNumber, + MinConnectionsNumber: functionconfig.DefaultMaxConnectionsNumber, }, }, { - name: "async-trigger-with-custom-config", - functionConfig: &functionconfig.Config{ - Spec: functionconfig.Spec{ - Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { - Kind: "http", - Mode: functionconfig.AsyncTriggerWorkMode, - AsyncConfig: &functionconfig.AsyncConfig{ - ConnectionCreationMode: functionconfig.ConnectionCreationModeDynamic, - MaxConnectionsNumber: 10, - MinConnectionsNumber: 5, - }, - }, - }, + name: "AsyncModeWithCustomConfig", + trigger: functionconfig.Trigger{ + Mode: functionconfig.AsyncTriggerWorkMode, + AsyncConfig: &functionconfig.AsyncConfig{ + ConnectionCreationMode: "dynamic", + MaxConnectionsNumber: 10, + MinConnectionsNumber: 5, }, }, - expectedConfig: &functionconfig.Config{ - Spec: functionconfig.Spec{ - Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { - Kind: "http", - Mode: functionconfig.AsyncTriggerWorkMode, - AsyncConfig: &functionconfig.AsyncConfig{ - ConnectionCreationMode: functionconfig.ConnectionCreationModeDynamic, - MaxConnectionsNumber: 10, - MinConnectionsNumber: 5, - }, - }, - }, - }, + expectedMode: functionconfig.AsyncTriggerWorkMode, + expectedConfig: &functionconfig.AsyncConfig{ + ConnectionCreationMode: "dynamic", + MaxConnectionsNumber: 10, + MinConnectionsNumber: 5, }, }, } for _, testCase := range testCases { suite.Run(testCase.name, func() { - err := suite.Platform.enrichProcessingMode(context.Background(), testCase.functionConfig) + functionConfig := functionconfig.NewConfig() + functionConfig.Spec.Triggers = map[string]functionconfig.Trigger{ + "test-trigger": testCase.trigger, + } - suite.Require().NoError(err) + err := suite.Platform.enrichTriggers(suite.ctx, functionConfig) + suite.Require().NoError(err, "Failed to enrich processing mode") - suite.Equal(testCase.expectedConfig, testCase.functionConfig) + enrichedTrigger := functionConfig.Spec.Triggers["test-trigger"] + suite.Require().Equal(testCase.expectedMode, enrichedTrigger.Mode, "Unexpected mode") + suite.Require().Equal(testCase.expectedConfig, enrichedTrigger.AsyncConfig, "Unexpected async config") }) } } @@ -2250,7 +2178,7 @@ func (suite *AbstractPlatformTestSuite) TestValidateProcessingMode() { }, }, }, - expectedError: "Async processing mode is not supported for cron trigger kind", + expectedError: "Async processing mode is not supported for trigger kind - cron", }, { name: "async runtime not supported -> error", @@ -2268,7 +2196,7 @@ func (suite *AbstractPlatformTestSuite) TestValidateProcessingMode() { }, }, }, - expectedError: "Async processing mode is not supported for java runtime", + expectedError: "Async processing mode is not supported for runtime - java", }, { name: "max < min -> error", @@ -2312,7 +2240,8 @@ func (suite *AbstractPlatformTestSuite) TestValidateProcessingMode() { for _, testCase := range testCases { suite.Run(testCase.name, func() { - err := suite.Platform.validateProcessingMode(testCase.functionConfig) + triggerInstance := testCase.functionConfig.Spec.Triggers["http-trigger"] + err := suite.Platform.validateProcessingMode(triggerInstance, testCase.functionConfig) if testCase.expectedError == "" { suite.Require().NoError(err) diff --git a/pkg/processor/runtime/python/test/python_test.go b/pkg/processor/runtime/python/test/python_test.go index 807c4453e78..34812948406 100644 --- a/pkg/processor/runtime/python/test/python_test.go +++ b/pkg/processor/runtime/python/test/python_test.go @@ -131,7 +131,7 @@ func (suite *TestSuite) TestOutputs() { } testPath := "/path/to/nowhere" - createFunctionOptions := suite.GetDeployOptions("outputter", + createFunctionOptions := suite.GetDeployOptionsAsync("outputter", suite.GetFunctionPath("outputter")) createFunctionOptions.FunctionConfig.Spec.Handler = "outputter:handler" @@ -490,7 +490,7 @@ func TestIntegrationSuite(t *testing.T) { } runtimes := []string{"python:3.9", "python:3.10", "python:3.11"} - modes := []functionconfig.TriggerWorkMode{functionconfig.AsyncTriggerWorkMode, functionconfig.SyncTriggerWorkMode} + modes := []functionconfig.TriggerWorkMode{functionconfig.SyncTriggerWorkMode} for _, runtime := range runtimes { for _, mode := range modes { From 4c4423841e18e1a6f3df442093e22af3bb364e33 Mon Sep 17 00:00:00 2001 From: rokatyy Date: Mon, 10 Mar 2025 18:53:23 +0000 Subject: [PATCH 3/4] fix --- pkg/platform/abstract/platform_test.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/pkg/platform/abstract/platform_test.go b/pkg/platform/abstract/platform_test.go index c5f7c4da129..8c7f66c4136 100644 --- a/pkg/platform/abstract/platform_test.go +++ b/pkg/platform/abstract/platform_test.go @@ -2138,7 +2138,7 @@ func (suite *AbstractPlatformTestSuite) TestValidateProcessingMode() { Spec: functionconfig.Spec{ Runtime: "python", Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { + "test-trigger": { Kind: "http", Mode: functionconfig.SyncTriggerWorkMode, }, @@ -2152,7 +2152,7 @@ func (suite *AbstractPlatformTestSuite) TestValidateProcessingMode() { Spec: functionconfig.Spec{ Runtime: "python", Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { + "test-trigger": { Kind: "http", Mode: functionconfig.SyncTriggerWorkMode, AsyncConfig: &functionconfig.AsyncConfig{MaxConnectionsNumber: 10}, @@ -2168,7 +2168,7 @@ func (suite *AbstractPlatformTestSuite) TestValidateProcessingMode() { Spec: functionconfig.Spec{ Runtime: "python", Triggers: map[string]functionconfig.Trigger{ - "cron-trigger": { + "test-trigger": { Kind: "cron", Mode: functionconfig.AsyncTriggerWorkMode, AsyncConfig: &functionconfig.AsyncConfig{ @@ -2186,7 +2186,7 @@ func (suite *AbstractPlatformTestSuite) TestValidateProcessingMode() { Spec: functionconfig.Spec{ Runtime: "java", Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { + "test-trigger": { Kind: "http", Mode: functionconfig.AsyncTriggerWorkMode, AsyncConfig: &functionconfig.AsyncConfig{ @@ -2204,7 +2204,7 @@ func (suite *AbstractPlatformTestSuite) TestValidateProcessingMode() { Spec: functionconfig.Spec{ Runtime: "python", Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { + "test-trigger": { Kind: "http", Mode: functionconfig.AsyncTriggerWorkMode, AsyncConfig: &functionconfig.AsyncConfig{ @@ -2223,7 +2223,7 @@ func (suite *AbstractPlatformTestSuite) TestValidateProcessingMode() { Spec: functionconfig.Spec{ Runtime: "python", Triggers: map[string]functionconfig.Trigger{ - "http-trigger": { + "test-trigger": { Kind: "http", Mode: functionconfig.AsyncTriggerWorkMode, AsyncConfig: &functionconfig.AsyncConfig{ @@ -2240,7 +2240,7 @@ func (suite *AbstractPlatformTestSuite) TestValidateProcessingMode() { for _, testCase := range testCases { suite.Run(testCase.name, func() { - triggerInstance := testCase.functionConfig.Spec.Triggers["http-trigger"] + triggerInstance := testCase.functionConfig.Spec.Triggers["test-trigger"] err := suite.Platform.validateProcessingMode(triggerInstance, testCase.functionConfig) if testCase.expectedError == "" { From 135a9a5218ff40d66a1a7bfccdf6c6b365a1f11e Mon Sep 17 00:00:00 2001 From: rokatyy Date: Tue, 11 Mar 2025 12:44:41 +0000 Subject: [PATCH 4/4] comments --- pkg/functionconfig/types.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/functionconfig/types.go b/pkg/functionconfig/types.go index c34aaa99925..66d36bf1d03 100644 --- a/pkg/functionconfig/types.go +++ b/pkg/functionconfig/types.go @@ -96,7 +96,7 @@ type Trigger struct { Attributes map[string]interface{} `json:"attributes,omitempty"` Mode TriggerWorkMode `json:"mode,omitempty"` - AsyncConfig *AsyncConfig `json:"asyncConfig,omitempty"` + AsyncConfig *AsyncConfig `json:"async,omitempty"` // Deprecated: MaxWorkers is replaced by NumWorkers, and will be removed in 1.15.x // TODO: remove in 1.15.x