Step Functions: Support for Aliasing by MEPalma · Pull Request #12326 · localstack/localstack · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Step Functions: Support for Aliasing #12326

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
121 changes: 121 additions & 0 deletions localstack-core/localstack/services/stepfunctions/backend/alias.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
from __future__ import annotations

import copy
import datetime
import random
import threading
from typing import Final, Optional

from localstack.aws.api.stepfunctions import (
AliasDescription,
Arn,
CharacterRestrictedName,
DescribeStateMachineAliasOutput,
RoutingConfigurationList,
StateMachineAliasListItem,
)


class Alias:
    """In-memory model of a Step Functions state machine alias.

    An alias has a name, an optional description and a routing configuration:
    a list of entries that each carry a ``stateMachineVersionArn`` and a
    ``weight``. The alias ARN is derived as ``"<state_machine_arn>:<name>"``.
    Mutable state is guarded by an internal, non-reentrant lock.
    """

    _mutex: Final[threading.Lock]
    update_date: Optional[datetime.datetime]
    name: Final[CharacterRestrictedName]
    _description: Optional[AliasDescription]
    _routing_configuration_list: RoutingConfigurationList
    # Version ARNs and their weights, kept index-aligned for random.choices().
    _state_machine_version_arns: list[Arn]
    _execution_probability_distribution: list[int]
    state_machine_alias_arn: Final[Arn]
    create_date: datetime.datetime

    def __init__(
        self,
        state_machine_arn: Arn,
        name: CharacterRestrictedName,
        description: Optional[AliasDescription],
        routing_configuration_list: RoutingConfigurationList,
    ):
        """Create the alias and apply its initial description and routing.

        :param state_machine_arn: ARN of the state machine the alias belongs to.
        :param name: alias name; appended to ``state_machine_arn`` to form the alias ARN.
        :param description: optional description; ``None`` leaves it unset.
        :param routing_configuration_list: entries with ``stateMachineVersionArn``
            and ``weight`` keys.
        """
        self._mutex = threading.Lock()
        self.update_date = None
        self.name = name
        self._description = None
        # Start from an empty routing state: update() only assigns these when it
        # receives a non-empty list, and every other method reads them.
        self._routing_configuration_list = []
        self._state_machine_version_arns = []
        self._execution_probability_distribution = []
        self.state_machine_alias_arn = f"{state_machine_arn}:{name}"
        self.update(description=description, routing_configuration_list=routing_configuration_list)
        # update() has just stamped update_date; reuse that instant so a newly
        # created alias reports identical creation and update dates instead of
        # an update date that precedes its creation.
        self.create_date = self.update_date

    def __hash__(self):
        # The alias ARN is fixed at construction, so the hash stays stable
        # across update() calls.
        return hash(self.state_machine_alias_arn)

    def __eq__(self, other):
        if isinstance(other, Alias):
            return self.is_idempotent(other=other)
        return False

    def is_idempotent(self, other: Alias) -> bool:
        """Return True if ``other`` has the same ARN, name, description and routing."""
        return (
            self.state_machine_alias_arn == other.state_machine_alias_arn
            and self.name == other.name
            and self._description == other._description
            and self._routing_configuration_list == other._routing_configuration_list
        )

    @staticmethod
    def _get_mutex_date() -> datetime.datetime:
        """Return the current timezone-aware UTC time."""
        return datetime.datetime.now(tz=datetime.timezone.utc)

    def get_routing_configuration_list(self) -> RoutingConfigurationList:
        """Return a deep copy of the current routing configuration."""
        return copy.deepcopy(self._routing_configuration_list)

    def is_router_for(self, state_machine_version_arn: Arn) -> bool:
        """Return True if the alias currently routes to the given version ARN."""
        with self._mutex:
            return state_machine_version_arn in self._state_machine_version_arns

    def update(
        self,
        description: Optional[AliasDescription],
        routing_configuration_list: RoutingConfigurationList,
    ) -> None:
        """Apply new values and refresh ``update_date``.

        ``update_date`` is refreshed on every call. A ``None`` description and a
        falsy (``None`` or empty) routing list each leave the corresponding
        current value untouched.
        """
        with self._mutex:
            self.update_date = self._get_mutex_date()

            if description is not None:
                self._description = description

            if routing_configuration_list:
                self._routing_configuration_list = routing_configuration_list
                # Rebuild both derived lists from the same iteration order so
                # they remain index-aligned.
                self._state_machine_version_arns = [
                    routing_configuration["stateMachineVersionArn"]
                    for routing_configuration in routing_configuration_list
                ]
                self._execution_probability_distribution = [
                    routing_configuration["weight"]
                    for routing_configuration in routing_configuration_list
                ]

    def sample(self):
        """Pick one routed version ARN at random, weighted by the routing weights.

        NOTE(review): random.choices() cannot draw from an empty population, so
        this is expected to raise if no routing configuration was ever applied.
        """
        with self._mutex:
            samples = random.choices(
                self._state_machine_version_arns,
                weights=self._execution_probability_distribution,
                k=1,
            )
            state_machine_version_arn = samples[0]
            return state_machine_version_arn

    def to_description(self) -> DescribeStateMachineAliasOutput:
        """Build the describe output for this alias.

        ``updateDate`` is only included when ``update_date`` is set.
        """
        with self._mutex:
            description = DescribeStateMachineAliasOutput(
                creationDate=self.create_date,
                name=self.name,
                description=self._description,
                # Hand out a copy, as get_routing_configuration_list() does, so
                # callers cannot mutate the alias state outside the lock.
                routingConfiguration=copy.deepcopy(self._routing_configuration_list),
                stateMachineAliasArn=self.state_machine_alias_arn,
            )
            if self.update_date is not None:
                description["updateDate"] = self.update_date
            return description

    def to_item(self) -> StateMachineAliasListItem:
        """Build the list item (alias ARN and creation date) for this alias."""
        return StateMachineAliasListItem(
            stateMachineAliasArn=self.state_machine_alias_arn, creationDate=self.create_date
        )
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ class Execution:
region_name: str

state_machine: Final[StateMachineInstance]
state_machine_arn: Final[Arn]
state_machine_version_arn: Final[Optional[Arn]]
state_machine_alias_arn: Final[Optional[Arn]]

start_date: Final[Timestamp]
input_data: Final[Optional[json]]
input_details: Final[Optional[CloudWatchEventsExecutionDataDetails]]
Expand Down Expand Up @@ -136,6 +140,7 @@ def __init__(
activity_store: dict[Arn, Activity],
input_data: Optional[json] = None,
trace_header: Optional[TraceHeader] = None,
state_machine_alias_arn: Optional[Arn] = None,
):
self.name = name
self.sm_type = sm_type
Expand All @@ -144,6 +149,13 @@ def __init__(
self.account_id = account_id
self.region_name = region_name
self.state_machine = state_machine
if isinstance(state_machine, StateMachineVersion):
self.state_machine_arn = state_machine.source_arn
self.state_machine_version_arn = state_machine.arn
else:
self.state_machine_arn = state_machine.arn
self.state_machine_version_arn = None
self.state_machine_alias_arn = state_machine_alias_arn
self.start_date = start_date
self._cloud_watch_logging_session = cloud_watch_logging_session
self.input_data = input_data
Expand All @@ -167,7 +179,7 @@ def to_start_output(self) -> StartExecutionOutput:
def to_describe_output(self) -> DescribeExecutionOutput:
describe_output = DescribeExecutionOutput(
executionArn=self.exec_arn,
stateMachineArn=self.state_machine.arn,
stateMachineArn=self.state_machine_arn,
name=self.name,
status=self.exec_status,
startDate=self.start_date,
Expand All @@ -183,6 +195,10 @@ def to_describe_output(self) -> DescribeExecutionOutput:
describe_output["error"] = self.error
if self.cause is not None:
describe_output["cause"] = self.cause
if self.state_machine_version_arn is not None:
describe_output["stateMachineVersionArn"] = self.state_machine_version_arn
if self.state_machine_alias_arn is not None:
describe_output["stateMachineAliasArn"] = self.state_machine_alias_arn
return describe_output

def to_describe_state_machine_for_execution_output(
Expand Down Expand Up @@ -231,6 +247,8 @@ def to_execution_list_item(self) -> ExecutionListItem:
)
if state_machine_version_arn is not None:
item["stateMachineVersionArn"] = state_machine_version_arn
if self.state_machine_alias_arn is not None:
item["stateMachineAliasArn"] = self.state_machine_alias_arn
return item

def to_history_output(self) -> GetExecutionHistoryOutput:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
from localstack.services.stepfunctions.asl.static_analyser.variable_references_static_analyser import (
VariableReferencesStaticAnalyser,
)
from localstack.services.stepfunctions.backend.alias import Alias
from localstack.utils.strings import long_uid


Expand Down Expand Up @@ -163,6 +164,7 @@ class StateMachineRevision(StateMachineInstance):
_next_version_number: int
versions: Final[dict[RevisionId, Arn]]
tag_manager: Final[TagManager]
aliases: Final[set[Alias]]

def __init__(
self,
Expand Down Expand Up @@ -194,6 +196,7 @@ def __init__(
self.tag_manager = TagManager()
if tags:
self.tag_manager.add_all(tags)
self.aliases = set()

def create_revision(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

from localstack.aws.api.stepfunctions import Arn
from localstack.services.stepfunctions.backend.activity import Activity
from localstack.services.stepfunctions.backend.alias import Alias
from localstack.services.stepfunctions.backend.execution import Execution
from localstack.services.stepfunctions.backend.state_machine import StateMachineInstance
from localstack.services.stores import AccountRegionBundle, BaseStore, LocalAttribute
Expand All @@ -11,6 +12,8 @@
class SFNStore(BaseStore):
# Maps ARNs to state machines.
state_machines: Final[dict[Arn, StateMachineInstance]] = LocalAttribute(default=dict)
# Map Alias ARNs to state machine aliases
aliases: Final[dict[Arn, Alias]] = LocalAttribute(default=dict)
# Maps Execution-ARNs to state machines.
executions: Final[dict[Arn, Execution]] = LocalAttribute(
default=OrderedDict
Expand Down
Loading
Loading
0