feat(cdp): Improved ingestion consumer event runner by benjackwhite · Pull Request #27671 · PostHog/posthog · GitHub

feat(cdp): Improved ingestion consumer event runner #27671

Open
wants to merge 200 commits into
base: master
from

@benjackwhite (Contributor) commented Jan 18, 2025

Problem

The runner code is a nightmare. I've wanted to refactor it for a while now, and since we have a new ingestion consumer this felt like the right time, as we can run both codebases in parallel.

Changes

  • Copies all the "runner" logic into one simple class that runs each "step" as a plain function.
  • Mostly meant to be 1:1, but there was also a lot of duplicate processing happening (which might account for some of the slowdowns), so in those cases I simplified it.
  • Modifies the consumer to keep using runner.ts but compare its output with that of the new implementation.
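As a rough illustration of the "run each step as a plain function" idea, here is a minimal sketch. The `PipelineEvent` shape, the `Step` type, the `SimpleEventRunner` class and the two example steps are all hypothetical stand-ins, not the actual EventPipelineRunnerV2 API; the only assumption is that a step either returns the (possibly modified) event or `null` to drop it.

```typescript
// Hypothetical event shape; the real plugin-server types carry many more fields.
interface PipelineEvent {
    uuid: string
    event: string
    properties: Record<string, unknown>
}

// A step is a plain (optionally async) function: it returns the next version
// of the event, or null to signal that the event should be dropped.
type Step = (event: PipelineEvent) => PipelineEvent | null | Promise<PipelineEvent | null>

// Hypothetical runner: applies each step in order and stops early on a drop.
class SimpleEventRunner {
    constructor(private readonly steps: Step[]) {}

    async run(event: PipelineEvent): Promise<PipelineEvent | null> {
        let current = event
        for (const step of this.steps) {
            const next = await step(current)
            if (next === null) {
                return null // a step decided to drop the event
            }
            current = next
        }
        return current
    }
}

// Two illustrative steps (hypothetical, for demonstration only).
const normalizeEventName: Step = (e) => ({ ...e, event: e.event.trim() })
const dropBlockedEvents: Step = (e) => (e.event === 'blocked' ? null : e)

const runner = new SimpleEventRunner([normalizeEventName, dropBlockedEvents])
```

Because each step is an ordinary function, it can be unit-tested on its own without constructing the whole pipeline.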

👉 Stay up-to-date with PostHog coding conventions for a smoother review.

Does this work well for both Cloud and self-hosted?

How did you test this code?

Adds tests to cover:

  • Handled errors, such as an invalid UUID
  • Unhandled errors, such as Kafka produce failures
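The split between handled and unhandled errors can be sketched as follows. The PR summary further down mentions an `EventDroppedError` class for expected failures, but its real constructor and fields are not shown, so the class below, the simplified `validateUuid` regex and the `processSafely` wrapper are hypothetical. The idea: an expected failure (such as an invalid UUID) drops the event and processing continues, while anything else (such as a Kafka produce failure) is rethrown so the consumer can crash out.

```typescript
// Hypothetical error type for expected, non-fatal failures.
class EventDroppedError extends Error {
    constructor(message: string, public readonly reason: string) {
        super(message)
        this.name = 'EventDroppedError'
        // Keeps `instanceof` working when compiling to ES5 targets.
        Object.setPrototypeOf(this, EventDroppedError.prototype)
    }
}

// Simplified UUID check (8-4-4-4-12 hex digits); an assumption, not PostHog's validator.
const UUID_RE = /^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$/i

function validateUuid(uuid: string): void {
    if (!UUID_RE.test(uuid)) {
        throw new EventDroppedError(`Invalid UUID: ${uuid}`, 'invalid_uuid')
    }
}

// Runs a processing function: expected drops are reported and swallowed,
// any other error propagates to the caller.
async function processSafely<T>(
    fn: () => Promise<T>,
    onDropped: (error: EventDroppedError) => void
): Promise<T | undefined> {
    try {
        return await fn()
    } catch (error) {
        if (error instanceof EventDroppedError) {
            onDropped(error)
            return undefined
        }
        throw error // unexpected, e.g. a Kafka produce failure
    }
}
```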

@benjackwhite benjackwhite marked this pull request as ready for review April 12, 2025 18:37
@benjackwhite benjackwhite requested review from a team and meikelmosby and removed request for meikelmosby April 12, 2025 18:37
@greptile-apps bot (Contributor) left a comment

PR Summary

This PR implements a major refactor of the event processing pipeline, introducing a new EventPipelineRunnerV2 class that consolidates previously scattered logic into a more maintainable structure.

  • Added new EventPipelineRunnerV2 class in /plugin-server/src/ingestion/event-pipeline-runner/event-pipeline-runner.ts with clear step-by-step processing
  • Created utility modules for timestamp handling (timestamp-utils.ts), heatmap processing (heatmaps.ts), and element chain extraction (event-utils.ts)
  • Implemented comparison mode via INGESTION_CONSUMER_V2_COMPARISON_PERCENTAGE config to safely validate the new implementation against the old one
  • Added comprehensive error handling with custom EventDroppedError class for expected failures
  • Improved AI event processing with better error handling and logging via a structured logger
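A comparison mode like the one driven by INGESTION_CONSUMER_V2_COMPARISON_PERCENTAGE could look roughly like the sketch below. Only the config name comes from the PR; `runWithComparison`, its parameters, and reading the value as a 0 to 100 percentage are assumptions. The old runner's result stays authoritative and the new runner is only sampled. It is also assumed that the V2 run has no externally visible side effects (for example, no Kafka produces) while comparing.

```typescript
interface ComparisonReport<T> {
    v1: T
    v2?: T
    error?: unknown
}

// Hypothetical helper: always returns the V1 result; for a sampled share of
// calls it also runs V2 and reports any divergence or V2 failure.
async function runWithComparison<T>(
    runV1: () => Promise<T>,
    runV2: () => Promise<T>,
    comparisonPercentage: number, // assumed to be in the range 0-100
    onMismatch: (report: ComparisonReport<T>) => void,
    random: () => number = Math.random
): Promise<T> {
    const v1 = await runV1()
    if (random() * 100 < comparisonPercentage) {
        try {
            const v2 = await runV2()
            // JSON comparison is a simplification: it is sensitive to key order.
            if (JSON.stringify(v1) !== JSON.stringify(v2)) {
                onMismatch({ v1, v2 })
            }
        } catch (error) {
            // A V2 failure must never affect the V1 result; just report it.
            onMismatch({ v1, error })
        }
    }
    return v1
}
```

Injecting `random` keeps the sampling deterministic in tests.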

14 file(s) reviewed, 9 comment(s)

Base automatically changed from feat/tidy-ingester to master April 15, 2025 08:33
# Conflicts:
#	plugin-server/src/cdp/hog-transformations/hog-transformer.service.ts
#	plugin-server/src/ingestion/ingestion-consumer.test.ts
#	plugin-server/src/ingestion/ingestion-consumer.ts
#	plugin-server/tests/main/ingestion-queues/each-batch.test.ts
@benjackwhite benjackwhite changed the title from "feat(cdp): Major refactor of event runner" to "feat(cdp): Improved ingestion consumer event runner" Apr 16, 2025
# Conflicts:
#	plugin-server/src/ingestion/ingestion-consumer.ts
Comment on lines +579 to +596
private async runEventRunnerV2(incomingEvent: IncomingEvent): Promise<RawKafkaEvent | undefined> {
    const runner = this.getEventPipelineRunnerV2(incomingEvent.event)

    try {
        return await runner.run()
    } catch (error) {
        // NOTE: If we error at this point we want to handle it gracefully and continue to process the scheduled promises
        await this.handleProcessingErrorV2(error, incomingEvent.message, incomingEvent.event)
    }

    runner?.getPromises().forEach((promise) => {
        // Schedule each promise with their own error handling
        // That way if all fail with ignoreable errors we continue but if any one fails with an unexpected error we can crash out
        this.scheduleWork(promise).catch((error) => {
            return this.handleProcessingErrorV2(error, incomingEvent.message, incomingEvent.event)
        })
    })
}

It appears the runEventRunnerV2 method doesn't return a value in all code paths. The method tries to return the result of runner.run(), but if an error is caught, it handles the error without returning anything. After error handling, the code processes promises but doesn't return a value.

Consider adding an explicit return statement or restructuring the method to ensure it consistently returns a value of type RawKafkaEvent | undefined in all code paths:

try {
    return await runner.run();
} catch (error) {
    await this.handleProcessingErrorV2(error, incomingEvent.message, incomingEvent.event);
    return undefined; // Explicit return after error handling
}

Spotted by Diamond
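Besides the missing explicit return, it looks like the early `return` inside the `try` block means the `getPromises()` loop in the quoted code only runs when `runner.run()` throws, which seems at odds with the NOTE comment's intent that the scheduled promises keep being processed. A minimal sketch of one way to cover both paths is to capture the result, schedule the promises, then return. It is written as a standalone function with hypothetical minimal types (`RawKafkaEvent`, `PipelineRunner`) and injected `handleError`/`scheduleWork` callbacks in place of the consumer's real methods, so it is an illustration, not the PR's actual fix.

```typescript
// Hypothetical minimal types standing in for the plugin-server ones.
interface RawKafkaEvent {
    uuid: string
}

interface PipelineRunner {
    run(): Promise<RawKafkaEvent | undefined>
    getPromises(): Promise<unknown>[]
}

async function runEventRunner(
    runner: PipelineRunner,
    handleError: (error: unknown) => Promise<void>,
    scheduleWork: (promise: Promise<unknown>) => Promise<unknown>
): Promise<RawKafkaEvent | undefined> {
    let result: RawKafkaEvent | undefined = undefined

    try {
        result = await runner.run()
    } catch (error) {
        // Handle gracefully and fall through so the scheduled promises still run.
        await handleError(error)
    }

    // Reached on both the success and the error path.
    for (const promise of runner.getPromises()) {
        // Each promise gets its own error handling, as in the original.
        scheduleWork(promise).catch((error) => handleError(error))
    }

    return result // undefined when run() failed
}
```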

Labels
None yet
Projects
None yet
4 participants