[ESM] Add backoff between Stream Poller retries #12281
Conversation
LocalStack Community integration with Pro: 2 files ±0, 2 suites ±0, 1h 29m 18s ⏱️ -21m 56s. Results for commit dbcdde6. ± Comparison against base commit 9bc5bc9. This pull request removes 993 tests.
♻️ This comment has been updated with latest results.
Nice application of the backoff utility 👍
It would be great to highlight the motivation for why we are doing this from a customer perspective (as discussed and raised by @dfangl) in future PR descriptions.
if not arrival_timestamp_of_last_event:
    return False

now = get_current_time().timestamp()
Are we handling timezones correctly here?
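For context on this question: datetime.timestamp() interprets a naive datetime as local time, so if get_current_time() returns a naive value the comparison can drift by the host's UTC offset. Below is a minimal, purely illustrative sketch of a timezone-safe variant; the function name and arguments are hypothetical and only mirror the snippet above.

    from datetime import datetime, timezone

    # Hypothetical helper mirroring the snippet above; not LocalStack's actual code.
    def is_poll_window_expired(arrival_timestamp_of_last_event: float, window_seconds: float) -> bool:
        if not arrival_timestamp_of_last_event:
            return False
        # An explicitly UTC-aware "now" avoids offset-dependent drift when comparing
        # against an epoch-based arrival timestamp.
        now = datetime.now(timezone.utc).timestamp()
        return (now - arrival_timestamp_of_last_event) > window_seconds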
def pre_filter(self, events: list[dict]) -> list[dict]:
    return events

def post_filter(self, events: list[dict]) -> list[dict]:
    return events

def has_record_expired(self, event: dict):
That's an untested feature for which we should ideally add a test (which is a bit annoying because of the 60s minimum https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-lambda-eventsourcemapping.html#cfn-lambda-eventsourcemapping-maximumrecordageinseconds). Can we at least add a backlog item for this?
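For illustration, a unit-level check could exercise the expiry logic directly and sidestep the 60-second end-to-end wait. The sketch below is an assumption of how such a test might look; the inlined expiry logic and record timestamps are placeholders, not the actual poller code.

    from datetime import datetime, timedelta, timezone

    def test_has_record_expired_sketch():
        # Assumption: a record counts as expired once its age exceeds the configured
        # MaximumRecordAgeInSeconds (minimum 60 per AWS).
        maximum_record_age = 60

        def has_record_expired(arrival_timestamp: float) -> bool:
            # Stand-in for the poller's has_record_expired check.
            age = datetime.now(timezone.utc).timestamp() - arrival_timestamp
            return age > maximum_record_age

        old_arrival = (datetime.now(timezone.utc) - timedelta(seconds=120)).timestamp()
        fresh_arrival = datetime.now(timezone.utc).timestamp()

        assert has_record_expired(old_arrival)
        assert not has_record_expired(fresh_arrival)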
I'm happy to revert these changes and make a follow-up PR with a test instead?
This is blocking one of my PRs (the logs are so massive they cannot reasonably be printed due to the retries). It would be nice to get this in as a separate PR, as there still seems to be some confusion.
Done!
Looks good, thanks for the fix! No more 500k log lines after the function was deleted :)
Motivation
This PR leverages the backoff utility to ensure that, when retrying, we wait between retries so as not to spam the processing loop.
Companion PR to #12264
Changes
- Check has_record_expired every iteration to allow for an expired record to break processing.
- Use the backoff utility, in conjunction with a threading.Event, to exponentially back off between processing retries (see the sketch below).
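As a rough sketch of the retry pattern described above (all names below are illustrative placeholders, not the poller's actual code): the wait grows exponentially between attempts, and waiting on the threading.Event lets a shutdown interrupt the back-off immediately instead of sleeping blindly.

    import threading

    class TransientProcessingError(Exception):
        """Placeholder for an error that should trigger a retry."""

    def process_with_backoff(
        process_batch,                    # callable that raises TransientProcessingError on failure
        shutdown_event: threading.Event,  # set when the event source mapping is disabled or deleted
        initial_delay: float = 1.0,
        max_delay: float = 60.0,
        max_attempts: int = 5,
    ) -> None:
        delay = initial_delay
        for attempt in range(1, max_attempts + 1):
            try:
                process_batch()
                return
            except TransientProcessingError:
                if attempt == max_attempts:
                    raise
                # Event.wait returns True when the event is set (shutdown), so we stop
                # retrying instead of spamming the processing loop.
                if shutdown_event.wait(timeout=delay):
                    return
                delay = min(delay * 2, max_delay)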