Truncate execution error message in list execution calls by katrogan · Pull Request #523 · flyteorg/flyteadmin · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
This repository was archived by the owner on Oct 9, 2023. It is now read-only.

Truncate execution error message in list execution calls #523

Merged
merged 2 commits into from
Feb 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 8 additions & 8 deletions pkg/manager/impl/execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (m *ExecutionManager) getInheritedExecMetadata(ctx context.Context, request
}
sourceExecutionID = sourceExecutionModel.ID
requestSpec.Metadata.Principal = sourceExecutionModel.User
sourceExecution, err := transformers.FromExecutionModel(*sourceExecutionModel)
sourceExecution, err := transformers.FromExecutionModel(*sourceExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Errorf(ctx, "Failed transform parent execution model for child execution [%+v] with err: %v", workflowExecutionID, err)
return parentNodeExecutionID, sourceExecutionID, err
Expand Down Expand Up @@ -951,7 +951,7 @@ func (m *ExecutionManager) RelaunchExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err)
return nil, err
}
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel)
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1008,7 +1008,7 @@ func (m *ExecutionManager) RecoverExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err %v", request, err)
return nil, err
}
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel)
existingExecution, err := transformers.FromExecutionModel(*existingExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -1059,7 +1059,7 @@ func (m *ExecutionManager) emitScheduledWorkflowMetrics(
return
}
// Find the reference launch plan to get the kickoff time argument
execution, err := transformers.FromExecutionModel(*executionModel)
execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Warningf(context.Background(),
"failed to transform execution model when emitting scheduled workflow execution stats with for "+
Expand Down Expand Up @@ -1302,7 +1302,7 @@ func (m *ExecutionManager) GetExecution(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err)
return nil, err
}
execution, transformerErr := transformers.FromExecutionModel(*executionModel)
execution, transformerErr := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
if transformerErr != nil {
logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id,
transformerErr)
Expand Down Expand Up @@ -1345,7 +1345,7 @@ func (m *ExecutionManager) GetExecutionData(
logger.Debugf(ctx, "Failed to get execution model for request [%+v] with err: %v", request, err)
return nil, err
}
execution, err := transformers.FromExecutionModel(*executionModel)
execution, err := transformers.FromExecutionModel(*executionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "Failed to transform execution model [%+v] to proto object with err: %v", request.Id, err)
return nil, err
Expand Down Expand Up @@ -1445,7 +1445,7 @@ func (m *ExecutionManager) ListExecutions(
logger.Debugf(ctx, "Failed to list executions using input [%+v] with err %v", listExecutionsInput, err)
return nil, err
}
executionList, err := transformers.FromExecutionModels(output.Executions)
executionList, err := transformers.FromExecutionModels(output.Executions, transformers.ListExecutionTransformerOptions)
if err != nil {
logger.Errorf(ctx,
"Failed to transform execution models [%+v] with err: %v", output.Executions, err)
Expand Down Expand Up @@ -1475,7 +1475,7 @@ func (m *ExecutionManager) ListExecutions(
func (m *ExecutionManager) publishNotifications(ctx context.Context, request admin.WorkflowExecutionEventRequest,
execution models.Execution) error {
// Notifications are stored in the Spec object of an admin.Execution object.
adminExecution, err := transformers.FromExecutionModel(execution)
adminExecution, err := transformers.FromExecutionModel(execution, transformers.DefaultExecutionTransformerOptions)
if err != nil {
// This shouldn't happen because execution manager marshaled the data into models.Execution.
m.systemMetrics.TransformerError.Inc()
Expand Down
10 changes: 5 additions & 5 deletions pkg/manager/impl/node_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (m *NodeExecutionManager) CreateNodeEvent(ctx context.Context, request admi
// Handles making additional database calls, if necessary, to populate IsParent & IsDynamic data using the historical pattern of
// preloading child node executions. Otherwise, simply calls transform on the input model.
func (m *NodeExecutionManager) transformNodeExecutionModel(ctx context.Context, nodeExecutionModel models.NodeExecution,
nodeExecutionID *core.NodeExecutionIdentifier) (*admin.NodeExecution, error) {
nodeExecutionID *core.NodeExecutionIdentifier, opts *transformers.ExecutionTransformerOptions) (*admin.NodeExecution, error) {
internalData, err := transformers.GetNodeExecutionInternalData(nodeExecutionModel.InternalData)
if err != nil {
return nil, err
Expand All @@ -323,7 +323,7 @@ func (m *NodeExecutionManager) transformNodeExecutionModel(ctx context.Context,
}
}

nodeExecution, err := transformers.FromNodeExecutionModel(nodeExecutionModel)
nodeExecution, err := transformers.FromNodeExecutionModel(nodeExecutionModel, opts)
if err != nil {
logger.Debugf(ctx, "failed to transform node execution model [%+v] to proto with err: %v", nodeExecutionID, err)
return nil, err
Expand All @@ -341,7 +341,7 @@ func (m *NodeExecutionManager) transformNodeExecutionModelList(ctx context.Conte
Name: nodeExecutionModel.Name,
},
NodeId: nodeExecutionModel.NodeID,
})
}, transformers.ListExecutionTransformerOptions)
if err != nil {
return nil, err
}
Expand All @@ -362,7 +362,7 @@ func (m *NodeExecutionManager) GetNodeExecution(
request.Id, err)
return nil, err
}
nodeExecution, err := m.transformNodeExecutionModel(ctx, *nodeExecutionModel, request.Id)
nodeExecution, err := m.transformNodeExecutionModel(ctx, *nodeExecutionModel, request.Id, nil)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -498,7 +498,7 @@ func (m *NodeExecutionManager) GetNodeExecutionData(
return nil, err
}

nodeExecution, err := transformers.FromNodeExecutionModel(*nodeExecutionModel)
nodeExecution, err := transformers.FromNodeExecutionModel(*nodeExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "failed to transform node execution model [%+v] when fetching data: %v", request.Id, err)
return nil, err
Expand Down
10 changes: 6 additions & 4 deletions pkg/manager/impl/node_execution_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"github.com/flyteorg/flyteadmin/pkg/repositories/transformers"

"github.com/flyteorg/flyteadmin/pkg/manager/impl/util"

genModel "github.com/flyteorg/flyteadmin/pkg/repositories/gen/models"
Expand Down Expand Up @@ -444,7 +446,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
manager := NodeExecutionManager{
db: repository,
}
nodeExecution, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{}, nodeExecID)
nodeExecution, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{}, nodeExecID, transformers.DefaultExecutionTransformerOptions)
assert.NoError(t, err)
assert.True(t, proto.Equal(nodeExecID, nodeExecution.Id))
assert.True(t, nodeExecution.Metadata.IsParentNode)
Expand Down Expand Up @@ -474,7 +476,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
Closure: closureBytes,
NodeExecutionMetadata: nodeExecutionMetadataBytes,
InternalData: internalDataBytes,
}, nodeExecID)
}, nodeExecID, transformers.DefaultExecutionTransformerOptions)
assert.NoError(t, err)
assert.True(t, nodeExecution.Metadata.IsParentNode)
assert.True(t, nodeExecution.Metadata.IsDynamic)
Expand All @@ -485,7 +487,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
}
_, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{
InternalData: []byte("i'm invalid"),
}, nodeExecID)
}, nodeExecID, transformers.DefaultExecutionTransformerOptions)
assert.NotNil(t, err)
assert.Equal(t, err.(flyteAdminErrors.FlyteAdminError).Code(), codes.Internal)
})
Expand All @@ -500,7 +502,7 @@ func TestTransformNodeExecutionModel(t *testing.T) {
manager := NodeExecutionManager{
db: repository,
}
_, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{}, nodeExecID)
_, err := manager.transformNodeExecutionModel(ctx, models.NodeExecution{}, nodeExecID, transformers.DefaultExecutionTransformerOptions)
assert.Equal(t, err, expectedErr)
})
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/manager/impl/task_execution_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ func (m *TaskExecutionManager) GetTaskExecution(
if err != nil {
return nil, err
}
taskExecution, err := transformers.FromTaskExecutionModel(*taskExecutionModel)
taskExecution, err := transformers.FromTaskExecutionModel(*taskExecutionModel, transformers.DefaultExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "Failed to transform task execution model [%+v] to proto: %v", request.Id, err)
return nil, err
Expand Down Expand Up @@ -284,7 +284,7 @@ func (m *TaskExecutionManager) ListTaskExecutions(
return nil, err
}

taskExecutionList, err := transformers.FromTaskExecutionModels(output.TaskExecutions)
taskExecutionList, err := transformers.FromTaskExecutionModels(output.TaskExecutions, transformers.ListExecutionTransformerOptions)
if err != nil {
logger.Debugf(ctx, "failed to transform task execution models for request [%+v] with err: %v", request, err)
return nil, err
Expand Down
24 changes: 21 additions & 3 deletions pkg/repositories/transformers/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ import (
"k8s.io/apimachinery/pkg/util/sets"
)

// trimmedErrMessageLen is the number of leading bytes of an execution error
// message that are kept when the message is trimmed.
const trimmedErrMessageLen = 100

var clusterReassignablePhases = sets.NewString(core.WorkflowExecution_UNDEFINED.String(), core.WorkflowExecution_QUEUED.String())

// CreateExecutionModelInput encapsulates request parameters for calls to CreateExecutionModel.
Expand All @@ -44,6 +46,15 @@ type CreateExecutionModelInput struct {
LaunchEntity core.ResourceType
}

// ExecutionTransformerOptions carries optional settings for the transformers
// that convert stored execution models into their proto representation.
type ExecutionTransformerOptions struct {
	// TrimErrorMessage asks the transformer to shorten the closure's error
	// message instead of returning it in full.
	TrimErrorMessage bool
}

var (
	// DefaultExecutionTransformerOptions leaves every option at its zero
	// value, so error messages are returned untouched.
	DefaultExecutionTransformerOptions = &ExecutionTransformerOptions{}

	// ListExecutionTransformerOptions turns on error message trimming; it is
	// the option set passed by the list-style transformer calls.
	ListExecutionTransformerOptions = &ExecutionTransformerOptions{TrimErrorMessage: true}
)

// CreateExecutionModel transforms a ExecutionCreateRequest to a Execution model
func CreateExecutionModel(input CreateExecutionModelInput) (*models.Execution, error) {
requestSpec := input.RequestSpec
Expand Down Expand Up @@ -305,7 +316,7 @@ func GetExecutionIdentifier(executionModel *models.Execution) core.WorkflowExecu
}
}

func FromExecutionModel(executionModel models.Execution) (*admin.Execution, error) {
func FromExecutionModel(executionModel models.Execution, opts *ExecutionTransformerOptions) (*admin.Execution, error) {
var spec admin.ExecutionSpec
var err error
if err = proto.Unmarshal(executionModel.Spec, &spec); err != nil {
Expand All @@ -315,6 +326,13 @@ func FromExecutionModel(executionModel models.Execution) (*admin.Execution, erro
if err = proto.Unmarshal(executionModel.Closure, &closure); err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure")
}
// Optionally shorten the error message (used by list responses). Only messages
// longer than trimmedErrMessageLen are cut: slicing a shorter, non-empty
// message to that length would panic with "slice bounds out of range".
if execErr := closure.GetError(); execErr != nil && opts != nil && opts.TrimErrorMessage &&
	len(execErr.Message) > trimmedErrMessageLen {
	cut := trimmedErrMessageLen
	// Step back over UTF-8 continuation bytes (10xxxxxx) so the cut never
	// lands in the middle of a multi-byte character; a proto string field
	// must hold valid UTF-8.
	for cut > 0 && execErr.Message[cut]&0xC0 == 0x80 {
		cut--
	}
	// GetError returns the pointer stored in the closure, so updating the
	// message in place is enough; the oneof wrapper need not be rebuilt.
	execErr.Message = execErr.Message[:cut]
}

if closure.StateChangeDetails == nil {
// Update execution state details from model for older executions
Expand Down Expand Up @@ -362,10 +380,10 @@ func PopulateDefaultStateChangeDetails(executionModel models.Execution) (*admin.
}, nil
}

func FromExecutionModels(executionModels []models.Execution) ([]*admin.Execution, error) {
func FromExecutionModels(executionModels []models.Execution, opts *ExecutionTransformerOptions) ([]*admin.Execution, error) {
executions := make([]*admin.Execution, len(executionModels))
for idx, executionModel := range executionModels {
execution, err := FromExecutionModel(executionModel)
execution, err := FromExecutionModel(executionModel, opts)
if err != nil {
return nil, err
}
Expand Down
38 changes: 34 additions & 4 deletions pkg/repositories/transformers/execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -528,7 +528,7 @@ func TestFromExecutionModel(t *testing.T) {
StartedAt: &startedAt,
State: &stateInt,
}
execution, err := FromExecutionModel(executionModel)
execution, err := FromExecutionModel(executionModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.Execution{
Id: &core.WorkflowExecutionIdentifier{
Expand Down Expand Up @@ -556,19 +556,49 @@ func TestFromExecutionModel_Aborted(t *testing.T) {
AbortCause: abortCause,
Closure: executionClosureBytes,
}
execution, err := FromExecutionModel(executionModel)
execution, err := FromExecutionModel(executionModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.Equal(t, core.WorkflowExecution_ABORTED, execution.Closure.Phase)
assert.True(t, proto.Equal(&admin.AbortMetadata{
Cause: abortCause,
}, execution.Closure.GetAbortMetadata()))

executionModel.Phase = core.WorkflowExecution_RUNNING.String()
execution, err = FromExecutionModel(executionModel)
execution, err = FromExecutionModel(executionModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.Empty(t, execution.Closure.GetAbortCause())
}

// TestFromExecutionModel_Error verifies that a failed execution whose error
// message is twice trimmedErrMessageLen comes back with the message cut down to
// trimmedErrMessageLen when TrimErrorMessage is enabled.
func TestFromExecutionModel_Error(t *testing.T) {
	longMessage := string(make([]byte, 2*trimmedErrMessageLen))
	failedClosureBytes, _ := proto.Marshal(&admin.ExecutionClosure{
		Phase: core.WorkflowExecution_FAILED,
		OutputResult: &admin.ExecutionClosure_Error{
			Error: &core.ExecutionError{
				Code:    "CODE",
				Message: longMessage,
				Kind:    core.ExecutionError_USER,
			},
		},
	})
	model := models.Execution{
		ExecutionKey: models.ExecutionKey{
			Project: "project",
			Domain:  "domain",
			Name:    "name",
		},
		Phase:   core.WorkflowExecution_FAILED.String(),
		Closure: failedClosureBytes,
	}
	trimOpts := &ExecutionTransformerOptions{TrimErrorMessage: true}

	execution, err := FromExecutionModel(model, trimOpts)

	// Same error as the input, except for the shortened message.
	wantErr := &core.ExecutionError{
		Code:    "CODE",
		Message: string(make([]byte, trimmedErrMessageLen)),
		Kind:    core.ExecutionError_USER,
	}
	assert.Nil(t, err)
	assert.Equal(t, core.WorkflowExecution_FAILED, execution.Closure.Phase)
	assert.True(t, proto.Equal(wantErr, execution.Closure.GetError()))
}

func TestFromExecutionModels(t *testing.T) {
spec := testutils.GetExecutionRequest().Spec
specBytes, _ := proto.Marshal(spec)
Expand Down Expand Up @@ -611,7 +641,7 @@ func TestFromExecutionModels(t *testing.T) {
State: &stateInt,
},
}
executions, err := FromExecutionModels(executionModels)
executions, err := FromExecutionModels(executionModels, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.Len(t, executions, 1)
assert.True(t, proto.Equal(&admin.Execution{
Expand Down
9 changes: 8 additions & 1 deletion pkg/repositories/transformers/node_execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -278,12 +278,19 @@ func UpdateNodeExecutionModel(
return nil
}

func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution) (*admin.NodeExecution, error) {
func FromNodeExecutionModel(nodeExecutionModel models.NodeExecution, opts *ExecutionTransformerOptions) (*admin.NodeExecution, error) {
var closure admin.NodeExecutionClosure
err := proto.Unmarshal(nodeExecutionModel.Closure, &closure)
if err != nil {
return nil, errors.NewFlyteAdminErrorf(codes.Internal, "failed to unmarshal closure")
}
// Optionally shorten the error message (used by list responses). Only messages
// longer than trimmedErrMessageLen are cut: slicing a shorter, non-empty
// message to that length would panic with "slice bounds out of range".
if execErr := closure.GetError(); execErr != nil && opts != nil && opts.TrimErrorMessage &&
	len(execErr.Message) > trimmedErrMessageLen {
	cut := trimmedErrMessageLen
	// Step back over UTF-8 continuation bytes (10xxxxxx) so the cut never
	// lands in the middle of a multi-byte character; a proto string field
	// must hold valid UTF-8.
	for cut > 0 && execErr.Message[cut]&0xC0 == 0x80 {
		cut--
	}
	// GetError returns the pointer stored in the closure, so updating the
	// message in place is enough; the oneof wrapper need not be rebuilt.
	execErr.Message = execErr.Message[:cut]
}

var nodeExecutionMetadata admin.NodeExecutionMetaData
err = proto.Unmarshal(nodeExecutionModel.NodeExecutionMetadata, &nodeExecutionMetadata)
Expand Down
39 changes: 36 additions & 3 deletions pkg/repositories/transformers/node_execution_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ func TestFromNodeExecutionModel(t *testing.T) {
NodeExecutionMetadata: nodeExecutionMetadataBytes,
InputURI: "input uri",
Duration: duration,
})
}, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.NodeExecution{
Id: &nodeExecutionIdentifier,
Expand All @@ -500,6 +500,39 @@ func TestFromNodeExecutionModel(t *testing.T) {
}, nodeExecution))
}

// TestFromNodeExecutionModel_Error verifies that a failed node execution whose
// error message is twice trimmedErrMessageLen comes back with the message cut
// down to trimmedErrMessageLen when TrimErrorMessage is enabled.
func TestFromNodeExecutionModel_Error(t *testing.T) {
	extraLongErrMsg := string(make([]byte, 2*trimmedErrMessageLen))
	execErr := &core.ExecutionError{
		Code:    "CODE",
		Message: extraLongErrMsg,
		Kind:    core.ExecutionError_USER,
	}
	// Serialize a node execution closure: that is the message type
	// FromNodeExecutionModel unmarshals. A workflow ExecutionClosure only
	// decodes here because its field numbers happen to line up.
	nodeExecClosureBytes, _ := proto.Marshal(&admin.NodeExecutionClosure{
		Phase:        core.NodeExecution_FAILED,
		OutputResult: &admin.NodeExecutionClosure_Error{Error: execErr},
	})
	nodeExecution, err := FromNodeExecutionModel(models.NodeExecution{
		NodeExecutionKey: models.NodeExecutionKey{
			NodeID: "nodey",
			ExecutionKey: models.ExecutionKey{
				Project: "project",
				Domain:  "domain",
				Name:    "name",
			},
		},
		Closure:               nodeExecClosureBytes,
		NodeExecutionMetadata: nodeExecutionMetadataBytes,
		InputURI:              "input uri",
		Duration:              duration,
	}, &ExecutionTransformerOptions{TrimErrorMessage: true})
	assert.Nil(t, err)

	// Build the expectation as a separate value rather than mutating the
	// input error through an aliasing pointer.
	expectedExecErr := &core.ExecutionError{
		Code:    "CODE",
		Message: string(make([]byte, trimmedErrMessageLen)),
		Kind:    core.ExecutionError_USER,
	}
	assert.True(t, proto.Equal(expectedExecErr, nodeExecution.Closure.GetError()))
}

func TestFromNodeExecutionModelWithChildren(t *testing.T) {
nodeExecutionIdentifier := core.NodeExecutionIdentifier{
NodeId: "nodey",
Expand Down Expand Up @@ -536,7 +569,7 @@ func TestFromNodeExecutionModelWithChildren(t *testing.T) {
}
t.Run("dynamic workflow", func(t *testing.T) {
nodeExecModel.DynamicWorkflowRemoteClosureReference = "dummy_dynamic_worklfow_ref"
nodeExecution, err := FromNodeExecutionModel(nodeExecModel)
nodeExecution, err := FromNodeExecutionModel(nodeExecModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.NodeExecution{
Id: &nodeExecutionIdentifier,
Expand All @@ -552,7 +585,7 @@ func TestFromNodeExecutionModelWithChildren(t *testing.T) {
})
t.Run("non dynamic workflow", func(t *testing.T) {
nodeExecModel.DynamicWorkflowRemoteClosureReference = ""
nodeExecution, err := FromNodeExecutionModel(nodeExecModel)
nodeExecution, err := FromNodeExecutionModel(nodeExecModel, DefaultExecutionTransformerOptions)
assert.Nil(t, err)
assert.True(t, proto.Equal(&admin.NodeExecution{
Id: &nodeExecutionIdentifier,
Expand Down
Loading
0