eventBroker: support split update event that UK changed #1375
Signed-off-by: dongmen <414110582@qq.com>
Skipping CI for Draft Pull Request.
Hello @asddongmen, I'm Gemini Code Assist¹! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
Summary of Changes
This PR introduces support for 'split updates' within the event store. The core idea is to treat certain update events, specifically those that modify a unique key, as a `DELETE` followed by an `INSERT`. This is crucial for ensuring correctness in downstream systems that might rely on unique key constraints or specific event ordering for updates.
The changes involve adding a new function type (`ShouldSpiltUpdateChecker`) to the `EventStore` interface and its implementation to allow the event scanning logic to determine if a given update event needs to be split. A new helper function `IsUKChanged` is added to check if a unique key has been modified in an update event. The `RawKVEntry` struct gains a `SplitUpdate` method to perform the actual conversion of an update into a delete and insert pair.
Furthermore, the event scanning logic in `event_scanner.go` has undergone a significant refactor. The main scanning function is broken down into smaller, more focused methods and helper types (`scanSession`, `limitChecker`, `eventMerger`, `dmlProcessor`, `errorHandler`) to improve readability, maintainability, and testability. Error handling in `event_broker.go` and `event_service.go` is also improved by changing panics to error logging and returns.
Overall, this PR adds the necessary infrastructure to handle unique key updates as split events and substantially refactors the event scanning component.
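To make the split concrete, here is a minimal sketch of turning one update into a delete plus an insert. It does not reproduce the PR's `SplitUpdate` method: `rawEntry`, `OpType`, and `splitUpdate` are hypothetical stand-ins for illustration, and the assumption that an update is a put carrying both an old and a new row image is mine, not taken from the diff.

```go
package main

import "fmt"

// OpType is a hypothetical operation tag; the real RawKVEntry has its own encoding.
type OpType int

const (
	OpPut OpType = iota
	OpDelete
)

// rawEntry is a simplified, hypothetical stand-in for RawKVEntry.
type rawEntry struct {
	Op       OpType
	Key      string
	Value    []byte // new row image (empty for deletes)
	OldValue []byte // previous row image (empty for inserts)
	CommitTs uint64
}

// isUpdate reports whether the entry carries both an old and a new row image.
func (e rawEntry) isUpdate() bool {
	return e.Op == OpPut && len(e.OldValue) > 0 && len(e.Value) > 0
}

// splitUpdate converts an update into a delete of the old row followed by an
// insert of the new row. ok is false when e is not an update.
func splitUpdate(e rawEntry) (del, ins rawEntry, ok bool) {
	if !e.isUpdate() {
		return rawEntry{}, rawEntry{}, false
	}
	// Both halves keep the original commit timestamp so they stay in one transaction.
	del = rawEntry{Op: OpDelete, Key: e.Key, OldValue: e.OldValue, CommitTs: e.CommitTs}
	ins = rawEntry{Op: OpPut, Key: e.Key, Value: e.Value, CommitTs: e.CommitTs}
	return del, ins, true
}

func main() {
	upd := rawEntry{
		Op:       OpPut,
		Key:      "t1/r1",
		Value:    []byte("id=1,email=b"),
		OldValue: []byte("id=1,email=a"),
		CommitTs: 100,
	}
	del, ins, ok := splitUpdate(upd)
	fmt.Println(ok, string(del.OldValue), string(ins.Value))
}
```

Emitting the delete before the insert matters for the unique-key case: a downstream that applies the insert first could hit a constraint violation on the key that the delete was meant to free.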
Highlights
- Split Update Support: Introduces the concept and implementation for splitting certain update `RawKVEntry` events into a `DELETE` followed by an `INSERT`.
- Unique Key Change Detection: Adds a function (`IsUKChanged`) to detect if an update event modifies a unique key, which is the current criterion for splitting the update.
- Event Scanning Refactor: Significantly refactors the `eventScanner` logic into smaller, more modular components and methods for improved code structure and testability.
- Improved Error Handling: Changes panic calls to error logging and returns in event broker and service registration logic.
Changelog
- logservice/eventstore/event_store.go
  - Added `ShouldSpiltUpdateChecker` type for checking if an update should be split (line 58).
  - Added `shouldSpiltUpdateChecker` parameter to `RegisterDispatcher` method (lines 70, 349).
  - Modified `consumeKVEvents` to use the `shouldSpiltUpdateChecker` and `kv.SplitUpdate()` to process events (lines 444-458).
- pkg/common/event/chunk.go
  - Removed a `TODO: handle old value` comment (line 94).
- pkg/common/event/codec.go
  - Added imports for `log` and `zap` (lines 21, 30).
  - Added `IsUKChanged` function to check if unique key columns have changed in an update `RawKVEntry` (lines 221-245).
- pkg/common/event/dml_event.go
  - Added `NewBatchDMLEvent` constructor (lines 49-53).
  - Added `Init` method to `BatchDMLEvent` (lines 57-63).
  - Added `AddDMLEvent` method to `BatchDMLEvent` (lines 84-99).
  - Added `NewDMLEvent` constructor (lines 320-327).
  - Added `SetRows` method to `DMLEvent` (lines 331-332).
- pkg/common/kv_entry.go
  - Added `SplitUpdate` method to `RawKVEntry` to convert an update into a delete and put pair (lines 71-94).
- pkg/eventservice/dispatcher_stat.go
  - Added `shouldSpiltUpdate` method to check if an update should be split based on unique key changes (lines 245-255).
- pkg/eventservice/event_broker.go
  - Changed panic to log.Error and return true in `tickTableTriggerDispatchers` on DDL fetch failure (line 314).
  - Renamed `scanLimit` fields to lowercase `maxBytes` and `timeout` (lines 513-514).
  - Changed panic to log.Error and return in `doScan` on scanner error (line 519).
  - Changed `addDispatcher` return type to `error` (line 862).
  - Passed `dispatcher.shouldSpiltUpdate` as a parameter to `c.eventStore.RegisterDispatcher` (lines 900-902).
  - Changed panics to log.Error and return err in `addDispatcher` (lines 906, 917, 930, 940).
  - Added `return nil` in `addDispatcher` on success or non-reusable case (lines 886, 925, 961).
  - Added FIXME comment and ignored error in `resetDispatcher` when calling `addDispatcher` (line 1035).
- pkg/eventservice/event_scanner.go
  - Introduced `eventGetter` and `schemaGetter` interfaces (lines 34-43).
  - Renamed `scanLimit` fields to lowercase `maxBytes` and `timeout` (lines 48-49).
  - Refactored `eventScanner` to use `eventGetter` and `schemaGetter` (lines 55-56).
  - Renamed `Scan` method to private `scan` (line 106).
  - Major refactor of `scan` logic into helper types (`scanSession`, `limitChecker`, `eventMerger`, `dmlProcessor`, `errorHandler`) and methods (`fetchDDLEvents`, `getEventIterator`, `handleEmptyIterator`, `closeIterator`, `scanAndMergeEvents`, `checkScanConditions`, `handleNewTransaction`, `finalizeScan`, `interruptScan`) (lines 112-574).
- pkg/eventservice/event_scanner_test.go
  - Updated test cases to call the renamed `scanner.scan` method (lines 74, 102, 128, 187, 221, 244, 340, 368, 424).
  - Updated `scanLimit` field names to lowercase `maxBytes` and `timeout` (lines 69-70, 99-100, 125-126, 184-185, 218-219, 241-242, 337-338, 365-366, 402-403).
  - Adjusted test assertions to check for `BatchDMLEvent` type and the number of `DMLEvent`s within the batch (lines 133-143, 251-257, 265-268).
An update arrives, a change so grand,
But unique keys shift across the land.
Split it in two, a delete, a write,
Ensuring downstream sees the light.
Footnotes
1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.
Code Review
This pull request introduces an important feature to support splitting update events when a unique key changes, which is crucial for certain downstream consumers. The changes are quite extensive, especially the significant and commendable refactoring of the `event_scanner.go` file. This refactoring greatly improves the modularity, readability, and testability of the event scanning logic by breaking it down into smaller, manageable components like `scanSession`, `eventMerger`, `dmlProcessor`, etc.
Additionally, the consistent replacement of `log.Panic` with `log.Error` and proper error propagation throughout the modified files enhances the robustness of the system.
While the overall direction and implementation are good, there are a few areas, primarily around error handling in new logic paths and addressing existing `FIXME` comments, that should be reviewed to ensure correctness and completeness before merging.
Summary of Findings
- Error Handling for `shouldSpiltUpdateChecker`: In `logservice/eventstore/event_store.go`, if `shouldSpiltUpdateChecker` returns an error, it's logged, but the event processing continues as if no split is needed. This could lead to data inconsistencies if an event that should be split isn't. (Severity: High)
- Robustness of `IsUKChanged`: In `pkg/common/event/codec.go`, `IsUKChanged` uses string comparison for datums, which might not be robust for all data types. It also uses `GetIndexColumns()`, which needs clarification if it's specific enough for UK changes. (Severity: Medium)
- Unhandled `FIXME` for Chunk Full Condition: In `pkg/common/event/dml_event.go`, a `FIXME` regarding checking if a chunk is full (`chk.isFull`) in `AddDMLEvent` is critical. If not handled, it could lead to panics or data corruption. (Severity: High)
- Unhandled `FIXME` for Error in `resetDispatcher`: In `pkg/eventservice/event_broker.go`, an error from `addDispatcher` during a `resetDispatcher` operation is ignored. This could leave the dispatcher in an inconsistent state. (Severity: High)
- Unhandled `FIXME` for Error in `registerDispatcher`: In `pkg/eventservice/event_service.go`, an error from `addDispatcher` during initial registration is logged but not fully propagated for handling by a manager component. (Severity: High)
- Logging Message Content: In `pkg/common/event/codec.go`, a log message contains "fizz", which seems like a placeholder. (Severity: Low, not commented directly due to settings)
- Potentially Unused Code: In `pkg/common/event/dml_event.go`, `AppendDMLEvent` is marked with a `FIXME` indicating it might be unused old code. (Severity: Low, not commented directly due to settings)
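The first two findings can be illustrated together. The following is a rough sketch, not the PR's code: `row`, `ukChanged`, and `planUpdate` are hypothetical names, and representing a row as a map from column name to encoded bytes is an assumption made to keep the example self-contained. It compares unique-key columns by their encoded bytes instead of formatted strings, and it returns an error to the caller when the check cannot be made, rather than logging it and continuing as if no split were needed.

```go
package main

import (
	"bytes"
	"fmt"
)

// row maps a column name to its encoded value. This is a hypothetical
// simplification; the real code works on decoded datums of a RawKVEntry.
type row map[string][]byte

// ukChanged reports whether any unique-key column differs between the old
// and new row images, comparing encoded bytes rather than formatted strings.
func ukChanged(oldRow, newRow row, ukCols []string) (bool, error) {
	for _, col := range ukCols {
		oldVal, okOld := oldRow[col]
		newVal, okNew := newRow[col]
		if !okOld || !okNew {
			return false, fmt.Errorf("unique key column %q missing from row image", col)
		}
		if !bytes.Equal(oldVal, newVal) {
			return true, nil
		}
	}
	return false, nil
}

// planUpdate decides how a single update should be emitted. A checker error
// is returned to the caller instead of being treated as "no split needed".
func planUpdate(oldRow, newRow row, ukCols []string) (string, error) {
	changed, err := ukChanged(oldRow, newRow, ukCols)
	if err != nil {
		return "", fmt.Errorf("cannot decide whether to split update: %w", err)
	}
	if changed {
		return "delete+insert", nil
	}
	return "update", nil
}

func main() {
	before := row{"id": []byte("1"), "email": []byte("a@example.com")}
	after := row{"id": []byte("1"), "email": []byte("b@example.com")}

	fmt.Println(planUpdate(before, after, []string{"email"}))
	fmt.Println(planUpdate(before, after, []string{"id"}))
	fmt.Println(planUpdate(before, after, []string{"phone"}))
}
```

Byte equality errs on the side of splitting: two values that are encoded differently but would compare equal under some collation are reported as changed, which costs an unnecessary split rather than a missed one.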
Merge Readiness
This pull request makes significant positive changes, especially with the refactoring of `event_scanner.go` and improved error handling. However, there are several `high` severity issues identified, primarily related to `FIXME` comments that point to potentially critical unhandled error conditions or logic gaps (e.g., chunk full condition, error handling during dispatcher registration/reset, and error handling in the new `shouldSpiltUpdateChecker` logic).
It is strongly recommended that these `high` severity issues, and ideally the `medium` severity ones concerning the robustness of `IsUKChanged`, be addressed before merging to ensure the stability and correctness of the new features. I am unable to approve pull requests, but based on this review, changes are requested.
// FIXME: Send message to the dispatcherManager to handle the error.
err := c.addDispatcher(info)
if err != nil {
	log.Error("add dispatcher to eventBroker failed", zap.Stringer("dispatcherID", info.GetID()), zap.Error(err))
}
The `FIXME` comment here highlights that an error from `c.addDispatcher(info)` is logged but not fully handled (e.g., by notifying `dispatcherManager`). If adding a dispatcher to the event broker fails, the dispatcher won't receive events, which could be a critical issue for the changefeed.
What is the intended mechanism for `dispatcherManager` to handle such failures? Should this error be propagated back to the client that initiated the registration, or is there an internal retry/recovery mechanism planned?
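One possible shape for the propagation this comment asks about is sketched below. It is only an illustration under stated assumptions: `broker`, `dispatcherError`, and the error channel are hypothetical and do not exist in the PR, and the real `addDispatcher` takes a dispatcher info value rather than a string ID. The sketch returns the failure to the caller and also reports it on a channel that a manager component could drain.

```go
package main

import (
	"errors"
	"fmt"
)

// dispatcherError is a hypothetical message a manager component could consume.
type dispatcherError struct {
	DispatcherID string
	Err          error
}

// broker is a hypothetical stand-in for the event broker; failing lists the
// dispatcher IDs whose registration should fail in this example.
type broker struct {
	failing map[string]bool
}

func (b *broker) addDispatcher(id string) error {
	if b.failing[id] {
		return errors.New("register with event store failed")
	}
	return nil
}

// registerDispatcher reports a failure on errCh and also returns it, so the
// error is neither swallowed nor only logged.
func registerDispatcher(b *broker, id string, errCh chan<- dispatcherError) error {
	err := b.addDispatcher(id)
	if err == nil {
		return nil
	}
	wrapped := fmt.Errorf("add dispatcher %s to eventBroker failed: %w", id, err)
	select {
	case errCh <- dispatcherError{DispatcherID: id, Err: wrapped}:
	default:
		// Channel full: the caller still receives the error via the return value.
	}
	return wrapped
}

func main() {
	errCh := make(chan dispatcherError, 1)
	b := &broker{failing: map[string]bool{"d2": true}}

	fmt.Println(registerDispatcher(b, "d1", errCh))
	fmt.Println(registerDispatcher(b, "d2", errCh))
	fmt.Println("pending error reports:", len(errCh))
}
```

The non-blocking send is a deliberate choice in this sketch: a slow or absent consumer cannot stall registration, at the cost of possibly dropping a report when the buffer is full.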
@gemini-code-assist[bot]: adding LGTM is restricted to approvers and reviewers in OWNERS files.
/test all
/test all
/retest-required
/retest
/test pull-cdc-kafka-integration-light
/test pull-cdc-kafka-integration-light
/test pull-cdc-kafka-integration-light
/test pull-cdc-mysql-integration-light
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: 3AceShowHand, hongyunyan.
@asddongmen: The following test failed, say
What problem does this PR solve?
Issue Number: close #1307
What is changed and how it works?
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note