fix(groups): Group Flush should handle MessageSizeTooLargeError #33585
PR Summary
Added error handling for MessageSizeTooLarge errors in group flush logic to prevent infinite retries on oversized Kafka messages.
- Modified BatchWritingGroupStore to properly handle MessageSizeTooLarge errors during group updates, generating ingestion warnings instead of retrying
- Enhanced promiseRetry utility to support non-retriable errors through new nonRetriableErrorTypes parameter
- Added test coverage for MessageSizeTooLarge error scenarios in group property updates
- Fixed production issue where large group properties could cause system instability due to continuous retry attempts
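The summary says promiseRetry gained a nonRetriableErrorTypes parameter. Below is a minimal TypeScript sketch of how such a parameter could work. It is not PostHog's actual promiseRetry. The signature, the ErrorClass type, the console logging, and the BACKOFF_FACTOR and MAX_INTERVAL_MS values are all assumptions made for illustration.

```typescript
// Hypothetical type for "a class whose instances are Errors".
type ErrorClass = new (...args: any[]) => Error

// Assumed backoff settings; the real defaultRetryConfig values are not shown in the PR.
const BACKOFF_FACTOR = 2
const MAX_INTERVAL_MS = 1000

const sleep = (ms: number): Promise<void> => new Promise((resolve) => setTimeout(resolve, ms))

// Sketch: call fn up to `retries` times, backing off between attempts,
// but give up at once if the error is an instance of a non-retriable class.
async function promiseRetry<T>(
    fn: () => Promise<T>,
    name: string,
    retries: number,
    retryIntervalMillis: number,
    nonRetriableErrorTypes: ErrorClass[] = []
): Promise<T> {
    if (retries < 1) {
        throw new Error(`promiseRetry(${name}): retries must be at least 1`)
    }
    let interval = retryIntervalMillis
    let lastError: unknown
    for (let attempt = 1; attempt <= retries; attempt++) {
        try {
            return await fn()
        } catch (error) {
            // An error that can never succeed on retry is rejected immediately.
            if (nonRetriableErrorTypes.some((ErrorType) => error instanceof ErrorType)) {
                console.warn(`failed ${name}, non-retriable error encountered`)
                throw error
            }
            lastError = error
            // Wait before the next attempt, growing the interval up to the cap.
            if (attempt < retries) {
                await sleep(interval)
                interval = Math.min(interval * BACKOFF_FACTOR, MAX_INTERVAL_MS)
            }
        }
    }
    console.error(`final retry failure for ${name}`)
    throw lastError
}
```

The check uses instanceof, so subclasses of a listed error type are also treated as non-retriable. Errors that are not listed still go through the normal backoff path.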
3 files reviewed, 2 comments (Greptile review bot)
plugin-server/src/worker/ingestion/groups/batch-writing-group-store.test.ts (outdated, resolved)
        logger.debug('🚫', `failed ${name}, non-retriable error encountered`, { error })
        return Promise.reject(error)
    }

    logger.debug('🔁', `failed ${name}, retrying`, { error })
    const nextInterval = Math.min(
        retryIntervalMillis * defaultRetryConfig.BACKOFF_FACTOR,
        defaultRetryConfig.MAX_INTERVAL
    )
    await new Promise((resolve) => setTimeout(resolve, retryIntervalMillis))
style: For consistency, use the sleep utility defined in utils.ts instead of a raw Promise/setTimeout.
Suggested change
- await new Promise((resolve) => setTimeout(resolve, retryIntervalMillis))
+ await sleep(retryIntervalMillis)
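The sketch below shows what the suggested sleep helper and the capped exponential backoff in the diff amount to. The real sleep in utils.ts may have a different signature. backoffSchedule is a hypothetical helper added only to show how the wait grows by a backoff factor until it reaches a maximum interval.

```typescript
// Hypothetical stand-in for the sleep helper in utils.ts; the real one may differ.
function sleep(ms: number): Promise<void> {
    return new Promise((resolve) => setTimeout(resolve, ms))
}

// Hypothetical helper: lists the wait before each retry. Each wait is the previous
// one times backoffFactor, capped at maxInterval, like the nextInterval computation
// (Math.min of interval * BACKOFF_FACTOR and MAX_INTERVAL) in promiseRetry.
function backoffSchedule(initial: number, backoffFactor: number, maxInterval: number, retries: number): number[] {
    const waits: number[] = []
    let interval = initial
    for (let i = 0; i < retries; i++) {
        waits.push(interval)
        interval = Math.min(interval * backoffFactor, maxInterval)
    }
    return waits
}
```

For example, backoffSchedule(1000, 2, 5000, 5) yields waits of 1000, 2000, 4000, 5000 and 5000 ms. Once the cap is reached, every further retry waits the maximum interval.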
Small comment, but LGTM, feel free to merge.
I think you may not have posted the comment 😅
): Promise<T> {
    if (retries <= 0) {
        logger.error('🚨', `Final retry failure for ${name}`, { previousError })
        return Promise.reject(previousError)
    }
    return fn().catch(async (error) => {
        // Check if error is non-retriable
        if (nonRetriableErrorTypes && nonRetriableErrorTypes.some((ErrorType) => error instanceof ErrorType)) {
            logger.debug('🚫', `failed ${name}, non-retriable error encountered`, { error })
question: wdyt about bumping this to the warn level? It could be useful to see these errors while troubleshooting.
Oops, I don't know where it went. Reposted 😅
Problem
We had an issue in production because the group flush logic did not handle Kafka MessageSizeTooLarge errors. These errors are non-transient, meaning a retry can never succeed, so they should produce an ingestion warning instead of being retried. This PR fixes that.
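Below is a minimal sketch of the flush-side handling described above. When a message is rejected for being too large, the flush records an ingestion warning instead of retrying, and other errors still propagate. MessageSizeTooLargeError, flushGroups, captureWarning, the GroupUpdate and IngestionWarning shapes, and the 'group_update_too_large' warning type are all hypothetical stand-ins. None of them is the real BatchWritingGroupStore or Kafka client API.

```typescript
// Hypothetical stand-in for the Kafka client's "message too large" error class.
class MessageSizeTooLargeError extends Error {}

// Hypothetical shapes, for illustration only.
interface GroupUpdate {
    teamId: number
    groupKey: string
    properties: Record<string, unknown>
}

interface IngestionWarning {
    teamId: number
    type: string
    details: Record<string, unknown>
}

// Sketch: produce each group update; an oversized one becomes a warning
// instead of an error that would be retried forever.
async function flushGroups(
    updates: GroupUpdate[],
    produce: (update: GroupUpdate) => Promise<void>,
    captureWarning: (warning: IngestionWarning) => void
): Promise<void> {
    for (const update of updates) {
        try {
            await produce(update)
        } catch (error) {
            if (error instanceof MessageSizeTooLargeError) {
                // Retrying cannot shrink the payload, so record a warning and move on.
                captureWarning({
                    teamId: update.teamId,
                    type: 'group_update_too_large', // hypothetical warning type
                    details: { groupKey: update.groupKey },
                })
                continue
            }
            // Any other error is left to the caller's retry logic.
            throw error
        }
    }
}
```

Continuing past the oversized update keeps one bad group from blocking the rest of the batch. The PR does not state whether the real store makes this decision per update or per batch.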
Changes
Did you write or update any docs for this change?
How did you test this code?