feat(cdp): Improved ingestion consumer event runner #27671
# Conflicts: # plugin-server/src/main/pluginsServer.ts
PR Summary
This PR implements a major refactor of the event processing pipeline, introducing a new EventPipelineRunnerV2 class that consolidates previously scattered logic into a more maintainable structure.
- Added new EventPipelineRunnerV2 class in /plugin-server/src/ingestion/event-pipeline-runner/event-pipeline-runner.ts with clear step-by-step processing
- Created utility modules for timestamp handling (timestamp-utils.ts), heatmap processing (heatmaps.ts), and element chain extraction (event-utils.ts)
- Implemented comparison mode via INGESTION_CONSUMER_V2_COMPARISON_PERCENTAGE config to safely validate the new implementation against the old one
- Added comprehensive error handling with custom EventDroppedError class for expected failures
- Improved AI event processing with better error handling and logging via structured logger
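The comparison mode and the expected-failure error class described in the summary could look roughly like the sketch below. This is an illustration only: the helper names shouldCompare and classifyError are hypothetical, the shape of EventDroppedError is a guess rather than the PR's actual class, and the percentage is assumed to be a value between 0 and 100 (the real config may use a different scale).

```typescript
// Hypothetical sketch: none of these signatures are taken from the PR.

// Custom error for expected failures, so callers can tell them apart from crashes.
class EventDroppedError extends Error {
    constructor(message: string) {
        super(message)
        this.name = 'EventDroppedError'
        // Keeps instanceof working even when compiling to older targets.
        Object.setPrototypeOf(this, new.target.prototype)
    }
}

// Decide whether an event should also go through the new runner for comparison.
// `percentage` stands in for INGESTION_CONSUMER_V2_COMPARISON_PERCENTAGE,
// assumed here to range from 0 to 100.
function shouldCompare(percentage: number, random: () => number = Math.random): boolean {
    return random() * 100 < percentage
}

// Treat EventDroppedError as an expected outcome; rethrow anything else.
function classifyError(error: unknown): 'dropped' {
    if (error instanceof EventDroppedError) {
        return 'dropped'
    }
    throw error
}
```

Passing the random source as a parameter keeps the sampling decision deterministic in tests.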
14 file(s) reviewed, 9 comment(s)
plugin-server/src/cdp/hog-transformations/hog-transformer.service.ts (outdated)
plugin-server/src/ingestion/event-pipeline-runner/event-pipeline-runner.test.ts (outdated)
plugin-server/src/ingestion/event-pipeline-runner/event-pipeline-runner.ts (outdated)
plugin-server/src/ingestion/event-pipeline-runner/event-pipeline-runner.ts (outdated)
plugin-server/src/ingestion/event-pipeline-runner/utils/timestamp-utils.ts
…ne-runner.ts Co-authored-by: greptile-apps[bot] <165735046+greptile-apps[bot]@users.noreply.github.com>
# Conflicts: # plugin-server/src/cdp/hog-transformations/hog-transformer.service.ts # plugin-server/src/ingestion/ingestion-consumer.test.ts # plugin-server/src/ingestion/ingestion-consumer.ts # plugin-server/tests/main/ingestion-queues/each-batch.test.ts
# Conflicts: # plugin-server/src/ingestion/ingestion-consumer.ts
    private async runEventRunnerV2(incomingEvent: IncomingEvent): Promise<RawKafkaEvent | undefined> {
        const runner = this.getEventPipelineRunnerV2(incomingEvent.event)

        try {
            return await runner.run()
        } catch (error) {
            // NOTE: If we error at this point we want to handle it gracefully and continue to process the scheduled promises
            await this.handleProcessingErrorV2(error, incomingEvent.message, incomingEvent.event)
        }

        runner?.getPromises().forEach((promise) => {
            // Schedule each promise with their own error handling
            // That way if all fail with ignoreable errors we continue but if any one fails with an unexpected error we can crash out
            this.scheduleWork(promise).catch((error) => {
                return this.handleProcessingErrorV2(error, incomingEvent.message, incomingEvent.event)
            })
        })
    }
It appears the runEventRunnerV2 method doesn't return a value in all code paths. The method tries to return the result of runner.run(), but if an error is caught, it handles the error without returning anything. After error handling, the code processes promises but doesn't return a value.
Consider adding an explicit return statement or restructuring the method to ensure it consistently returns a value of type RawKafkaEvent | undefined in all code paths:
    try {
        return await runner.run();
    } catch (error) {
        await this.handleProcessingErrorV2(error, incomingEvent.message, incomingEvent.event);
        return undefined; // Explicit return after error handling
    }
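A possible restructuring, sketched under assumptions rather than taken from the PR: in the quoted method the promise-scheduling loop only runs after a caught error, because the try block returns on success, and an explicit return inside the catch would skip it altogether. If the intent is to schedule the runner's promises on both paths, one option is to hold the result in a variable and return it at the end. The Runner interface, the RawEvent type, and the runWithSideEffects name are hypothetical stand-ins, and this sketch awaits the side effects where the original hands them to scheduleWork.

```typescript
// Hypothetical stand-ins for the PR's types; the names are illustrative only.
type RawEvent = { uuid: string }

interface Runner {
    run(): Promise<RawEvent>
    getPromises(): Promise<void>[]
}

async function runWithSideEffects(
    runner: Runner,
    onError: (error: unknown) => Promise<void>
): Promise<RawEvent | undefined> {
    let result: RawEvent | undefined

    try {
        result = await runner.run()
    } catch (error) {
        // Handle the failure, then fall through so side effects are still processed.
        await onError(error)
    }

    // Reached on both success and failure. Each promise gets its own error handling.
    await Promise.all(runner.getPromises().map((promise) => promise.catch((error) => onError(error))))

    // Explicitly undefined when run() failed, the event otherwise.
    return result
}
```

With this shape every code path ends in the same return statement, which addresses the review comment without making the scheduling loop unreachable.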
Spotted by Diamond
Problem
The runner code is a nightmare. I have wanted to refactor it for a while, and now that we have a new ingestion consumer this felt like the right time, as we can run both codebases in parallel.
Changes
Does this work well for both Cloud and self-hosted?
How did you test this code?
Adds tests to cover: