[ESM] Add backoff between Stream Poller retries by gregfurman · Pull Request #12281 · localstack/localstack · GitHub

[ESM] Add backoff between Stream Poller retries #12281

Merged

merged 3 commits on Feb 19, 2025
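At a high level, the change adds a wait between failed processing attempts in the stream poller's per-shard retry loop, and introduces a threading.Event (_is_shutdown) so that close() can both terminate the loop and cut short an in-progress wait. Below is a minimal, hedged sketch of that pattern; SimpleBackoff and process_with_retries are illustrative stand-ins, not localstack's ExponentialBackoff or poller code (only the next_backoff()/reset() calls visible in the diff are mirrored here):

import threading


class SimpleBackoff:
    """Illustrative exponential backoff helper (stand-in for the real one)."""

    def __init__(self, initial: float = 0.5, factor: float = 2.0, cap: float = 30.0):
        self.initial = initial
        self.factor = factor
        self.cap = cap
        self._attempt = 0

    def next_backoff(self) -> float:
        # Delay grows exponentially with each attempt, bounded by `cap`.
        delay = min(self.cap, self.initial * (self.factor ** self._attempt))
        self._attempt += 1
        return delay

    def reset(self) -> None:
        self._attempt = 0


def process_with_retries(process_batch, events, shutdown: threading.Event, max_attempts: int = 5):
    """Retry a batch with growing waits; setting `shutdown` stops both waiting and retrying."""
    backoff = SimpleBackoff()
    attempts = 0
    while attempts < max_attempts and not shutdown.is_set():
        try:
            if attempts > 0:
                # Event.wait(timeout) sleeps, but returns immediately if close() sets the event.
                shutdown.wait(backoff.next_backoff())
            process_batch(events)
            backoff.reset()
            return
        except Exception:
            attempts += 1

Calling shutdown.set() from another thread (as close() does in the diff) wakes the wait immediately, so a poller shuts down promptly instead of sleeping out the full backoff.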
Changes from all commits
@@ -1,5 +1,6 @@
 import json
 import logging
+import threading
 from abc import abstractmethod
 from datetime import datetime
 from typing import Iterator
@@ -28,6 +29,7 @@
 )
 from localstack.services.lambda_.event_source_mapping.pollers.sqs_poller import get_queue_url
 from localstack.utils.aws.arns import parse_arn, s3_bucket_name
+from localstack.utils.backoff import ExponentialBackoff
 from localstack.utils.strings import long_uid

 LOG = logging.getLogger(__name__)
@@ -47,6 +49,9 @@ class StreamPoller(Poller):
     # The ARN of the processor (e.g., Pipe ARN)
     partner_resource_arn: str | None

+    # Used for backing-off between retries and breaking the retry loop
+    _is_shutdown: threading.Event
+
     def __init__(
         self,
         source_arn: str,
@@ -62,6 +67,8 @@ def __init__(
         self.shards = {}
         self.iterator_over_shards = None

+        self._is_shutdown = threading.Event()
+
     @abstractmethod
     def transform_into_events(self, records: list[dict], shard_id) -> list[dict]:
         pass
@@ -104,6 +111,9 @@ def format_datetime(self, time: datetime) -> str:
     def get_sequence_number(self, record: dict) -> str:
         pass

+    def close(self):
+        self._is_shutdown.set()
+
     def pre_filter(self, events: list[dict]) -> list[dict]:
         return events

@@ -187,9 +197,23 @@ def poll_events_from_shard(self, shard_id: str, shard_iterator: str):
         # TODO: think about how to avoid starvation of other shards if one shard runs into infinite retries
         attempts = 0
         error_payload = {}
-        while not abort_condition and not self.max_retries_exceeded(attempts):
+
+        boff = ExponentialBackoff(max_retries=attempts)
+        while (
+            not abort_condition
+            and not self.max_retries_exceeded(attempts)
+            and not self._is_shutdown.is_set()
+        ):
             try:
+                if attempts > 0:
+                    # TODO: Should we always backoff (with jitter) before processing since we may not want multiple pollers
+                    # all starting up and polling simultaneously
+                    # For example: 500 persisted ESMs starting up and requesting concurrently could flood gateway
+                    self._is_shutdown.wait(boff.next_backoff())
+
                 self.processor.process_events_batch(events)
+                boff.reset()
+
                 # Update shard iterator if execution is successful
                 self.shards[shard_id] = get_records_response["NextShardIterator"]
                 return
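On the jitter question raised in the TODO above: one common way to keep many pollers (e.g., hundreds of persisted ESMs starting at once) from hitting the gateway in lockstep is "full jitter", where each wait is drawn uniformly between zero and the current exponential ceiling. A hedged sketch of that idea follows; the helper below is illustrative only, not localstack's ExponentialBackoff:

import random

def full_jitter_delay(attempt: int, base: float = 0.5, cap: float = 30.0) -> float:
    # Exponential ceiling for this attempt, bounded by `cap`.
    ceiling = min(cap, base * (2 ** attempt))
    # Full jitter: pick uniformly in [0, ceiling] so concurrent pollers spread out.
    return random.uniform(0, ceiling)

# Example: delays for the first few attempts (values vary per call).
# for attempt in range(4):
#     print(full_jitter_delay(attempt))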