Move OrchestrationMetadata to proto by JoshVanL · Pull Request #6 · dapr/durabletask-go · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Move OrchestrationMetadata to proto #6

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
189 changes: 9 additions & 180 deletions api/orchestration.go
</tr>
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,8 @@ package api
import (
"encoding/json"
"errors"
"fmt"
"time"

"github.com/dapr/durabletask-go/api/helpers"
"github.com/dapr/durabletask-go/api/protos"
"google.golang.org/protobuf/types/known/timestamppb"
"google.golang.org/protobuf/types/known/wrapperspb"
Expand Down Expand Up @@ -49,18 +47,6 @@ type OrchestrationIdReusePolicy = protos.OrchestrationIdReusePolicy
// InstanceID is a unique identifier for an orchestration instance.
type InstanceID string

// OrchestrationMetadata groups the metadata fields kept for a single
// orchestration instance: identity, runtime status, timestamps, serialized
// payloads and optional failure details.
type OrchestrationMetadata struct {
	InstanceID    InstanceID
	Name          string
	RuntimeStatus protos.OrchestrationStatus

	CreatedAt, LastUpdatedAt time.Time

	// Serialized payloads are stored as plain strings.
	SerializedInput, SerializedOutput, SerializedCustomStatus string

	// FailureDetails may be nil.
	FailureDetails *protos.TaskFailureDetails
}

// NewOrchestrationOptions configures options for starting a new orchestration.
type NewOrchestrationOptions func(*protos.CreateInstanceRequest) error

Expand Down Expand Up @@ -110,9 +96,9 @@ func WithInput(input any) NewOrchestrationOptions {
}

// WithRawInput configures an input for the orchestration. The specified input must be a string.
func WithRawInput(rawInput string) NewOrchestrationOptions {
func WithRawInput(rawInput *wrapperspb.StringValue) NewOrchestrationOptions {
return func(req *protos.CreateInstanceRequest) error {
req.Input = wrapperspb.String(rawInput)
req.Input = rawInput
return nil
}
}
Expand Down Expand Up @@ -147,9 +133,9 @@ func WithEventPayload(data any) RaiseEventOptions {
}

// WithRawEventData configures an event payload that is a raw, unprocessed string (e.g. JSON data).
func WithRawEventData(data string) RaiseEventOptions {
func WithRawEventData(data *wrapperspb.StringValue) RaiseEventOptions {
return func(req *protos.RaiseEventRequest) error {
req.Input = wrapperspb.String(data)
req.Input = data
return nil
}
}
Expand All @@ -167,9 +153,9 @@ func WithOutput(data any) TerminateOptions {
}

// WithRawOutput configures a raw, unprocessed output (i.e. pre-serialized) for the terminated orchestration.
func WithRawOutput(data string) TerminateOptions {
func WithRawOutput(data *wrapperspb.StringValue) TerminateOptions {
return func(req *protos.TerminateRequest) error {
req.Output = wrapperspb.String(data)
req.Output = data
return nil
}
}
Expand All @@ -190,168 +176,11 @@ func WithRecursivePurge(recursive bool) PurgeOptions {
}
}

// NewOrchestrationMetadata allocates an OrchestrationMetadata and stores each
// argument in its corresponding field. The arguments are copied as-is; no
// validation is performed.
func NewOrchestrationMetadata(
	iid InstanceID,
	name string,
	status protos.OrchestrationStatus,
	createdAt, lastUpdatedAt time.Time,
	serializedInput, serializedOutput, serializedCustomStatus string,
	failureDetails *protos.TaskFailureDetails,
) *OrchestrationMetadata {
	md := new(OrchestrationMetadata)

	md.InstanceID = iid
	md.Name = name
	md.RuntimeStatus = status

	md.CreatedAt = createdAt
	md.LastUpdatedAt = lastUpdatedAt

	md.SerializedInput = serializedInput
	md.SerializedOutput = serializedOutput
	md.SerializedCustomStatus = serializedCustomStatus

	md.FailureDetails = failureDetails
	return md
}

// MarshalJSON encodes the metadata as a flat JSON object.
//
// The keys "id", "name", "status", "createdAt" and "lastUpdatedAt" are always
// written. The three serialized payload keys are written only when their
// string is non-empty. When FailureDetails is set, it is written under
// "failureDetails" as a nested object whose "innerFailure" key holds the next
// failure in the chain.
func (m *OrchestrationMetadata) MarshalJSON() ([]byte, error) {
	// Always-present keys.
	out := map[string]any{
		"id":            m.InstanceID,
		"name":          m.Name,
		"status":        helpers.ToRuntimeStatusString(m.RuntimeStatus),
		"createdAt":     m.CreatedAt,
		"lastUpdatedAt": m.LastUpdatedAt,
	}

	// Payload keys are skipped when the string is empty.
	if s := m.SerializedInput; s != "" {
		out["serializedInput"] = s
	}
	if s := m.SerializedOutput; s != "" {
		out["serializedOutput"] = s
	}
	if s := m.SerializedCustomStatus; s != "" {
		out["serializedCustomStatus"] = s
	}

	// Walk the failure chain iteratively, nesting one map per level.
	if m.FailureDetails != nil {
		const fieldCount = 4
		top := make(map[string]any, fieldCount)
		node := top
		for fd := m.FailureDetails; ; fd = fd.InnerFailure {
			node["type"] = fd.ErrorType
			node["message"] = fd.ErrorMessage
			if fd.StackTrace != nil {
				node["stackTrace"] = fd.StackTrace.GetValue()
			}
			if fd.InnerFailure == nil {
				// Innermost failure reached.
				break
			}
			child := make(map[string]any, fieldCount)
			node["innerFailure"] = child
			node = child
		}
		out["failureDetails"] = top
	}

	return json.Marshal(out)
}

// UnmarshalJSON populates m from the JSON object form written by MarshalJSON.
//
// The keys "id", "name", "status", "createdAt" and "lastUpdatedAt" are
// required, and the two timestamps must be RFC3339 strings. The keys
// "serializedInput", "serializedOutput", "serializedCustomStatus" and
// "failureDetails" are optional.
//
// A value of an unexpected JSON type causes a failed type assertion; the
// deferred recover turns that panic into a returned error. Fields decoded
// before a failure remain set on m.
func (m *OrchestrationMetadata) UnmarshalJSON(data []byte) (err error) {
	// Type assertions below are unchecked on purpose: a mismatch panics and is
	// reported here as an error through the named result.
	defer func() {
		if r := recover(); r != nil {
			if rerr, ok := r.(error); ok {
				err = fmt.Errorf("failed to unmarshal the JSON payload: %w", rerr)
			} else {
				err = errors.New("failed to unmarshal the JSON payload")
			}
		}
	}()

	var obj map[string]any
	if err := json.Unmarshal(data, &obj); err != nil {
		return fmt.Errorf("failed to unmarshal orchestration metadata json: %w", err)
	}

	// Required values.
	id, ok := obj["id"]
	if !ok {
		return errors.New("missing 'id' field")
	}
	m.InstanceID = InstanceID(id.(string))

	name, ok := obj["name"]
	if !ok {
		return errors.New("missing 'name' field")
	}
	m.Name = name.(string)

	status, ok := obj["status"]
	if !ok {
		// Previously reported as "missing 'name' field" (copy-paste error).
		return errors.New("missing 'status' field")
	}
	m.RuntimeStatus = helpers.FromRuntimeStatusString(status.(string))

	createdAt, ok := obj["createdAt"]
	if !ok {
		return errors.New("missing 'createdAt' field")
	}
	created, err := time.Parse(time.RFC3339, createdAt.(string))
	if err != nil {
		return errors.New("invalid 'createdAt' field: must be RFC3339 format")
	}
	m.CreatedAt = created

	lastUpdatedAt, ok := obj["lastUpdatedAt"]
	if !ok {
		return errors.New("missing 'lastUpdatedAt' field")
	}
	updated, err := time.Parse(time.RFC3339, lastUpdatedAt.(string))
	if err != nil {
		return errors.New("invalid 'lastUpdatedAt' field: must be RFC3339 format")
	}
	m.LastUpdatedAt = updated

	// Optional values.
	if input, ok := obj["serializedInput"]; ok {
		m.SerializedInput = input.(string)
	}
	if output, ok := obj["serializedOutput"]; ok {
		m.SerializedOutput = output.(string)
	}
	if customStatus, ok := obj["serializedCustomStatus"]; ok {
		m.SerializedCustomStatus = customStatus.(string)
	}

	// Optional failure details: rebuild the chain by following "innerFailure".
	if raw, ok := obj["failureDetails"]; ok {
		current := &protos.TaskFailureDetails{}
		m.FailureDetails = current
		node := raw.(map[string]any)
		for {
			current.ErrorType = node["type"].(string)
			current.ErrorMessage = node["message"].(string)
			if stackTrace, ok := node["stackTrace"]; ok {
				current.StackTrace = wrapperspb.String(stackTrace.(string))
			}
			inner, ok := node["innerFailure"]
			if !ok {
				// Innermost failure reached.
				break
			}
			next := &protos.TaskFailureDetails{}
			current.InnerFailure = next
			current = next
			node = inner.(map[string]any)
		}
	}
	return nil
}

func (o *OrchestrationMetadata) IsRunning() bool {
return !o.IsComplete()
func OrchestrationMetadataIsRunning(o *protos.OrchestrationMetadata) bool {
return !OrchestrationMetadataIsComplete(o)
}

func (o *OrchestrationMetadata) IsComplete() bool {
func OrchestrationMetadataIsComplete(o *protos.OrchestrationMetadata) bool {
return o.RuntimeStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_COMPLETED ||
o.RuntimeStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_FAILED ||
o.RuntimeStatus == protos.OrchestrationStatus_ORCHESTRATION_STATUS_TERMINATED ||
Expand Down
Loading
Loading
0