puller: fix resolved ts calculation #1411
Hello @lidezhu, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
Summary of Changes
Hello! Gemini-code-assist has analyzed this pull request titled "puller: fix resolved ts calculation" authored by @lidezhu. Based on the provided title and the code changes, the primary intent of this PR appears to be to improve the correctness and robustness of how resolved timestamps are handled within the log puller and event store components. The changes introduce explicit checks and logging to identify potential issues with event timestamps being older than the current resolved timestamp, add filtering logic in the event store to prevent processing such events, and refactor the internal heap structure used for tracking resolved timestamps across different regions and ranges. This work seems crucial for ensuring data consistency and proper progress tracking in the changefeed process.
Highlights
- Resolved Timestamp Validation and Logging: Introduced new warning logs and checks in `regionEventHandler` and `subscriptionClient` to detect scenarios where event commit timestamps are less than or equal to the region's resolved timestamp, or where a newly received resolved timestamp is less than the previously recorded one for a region. This helps in identifying potential issues with the timestamp flow.
- Event Filtering in Event Store: Modified the `eventStore` to include the subscription's resolved timestamp when queuing events. Before writing events to the underlying Pebble DB, the code now explicitly filters out any events whose commit timestamp is strictly less than the resolved timestamp associated with the event batch. This prevents processing stale data.
- Range Lock Heap Refactor: Replaced the standard library's `container/heap` implementation with a custom heap implementation (`utils/heap`) within the `RangeLock` component. This involved adding a heap-specific field and methods (`heapIndex`, `SetHeapIndex`, `GetHeapIndex`, `LessThan`) to the `LockedRangeState` struct and updating the `RangeLock` methods (`NewRangeLock`, `UnlockRange`, `GetHeapMinTs`, `tryLockRange`, and a new `UpdateLockedRangeStateHeap`) to correctly interact with the new heap structure.
- Debug Information for Resolved TS: Added a `debugInfo` struct to the `subscriptionClient` to maintain a map tracking the latest resolved timestamp for each region within a subscription. This map is used by the new logging checks to compare incoming event timestamps and resolved timestamps against the last known values.
Changelog
- logservice/eventstore/event_store.go
  - Added a `filterTs` field to the `eventWithCallback` struct to carry the resolved timestamp.
  - Removed a warning log in `consumeKVEvents` that checked commitTs against resolvedTs.
  - Added the current resolvedTs as `filterTs` when pushing events to the event channel.
  - Implemented filtering logic in `writeEvents` to skip KVs with commitTs less than the event's `filterTs` before writing to Pebble DB.
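The filtering step described above can be sketched as follows. This is a minimal illustration, not the actual code from `event_store.go`: the struct fields and the helper `filterStaleEvents` are hypothetical stand-ins modeled on the changelog's description of `eventWithCallback` and `filterTs`.

```go
package main

import "fmt"

// Hypothetical simplified shapes; the real structs in
// logservice/eventstore/event_store.go carry more fields.
type kvEvent struct {
	Key      string
	CommitTs uint64
}

type eventWithCallback struct {
	events   []kvEvent
	filterTs uint64 // resolved ts captured when the batch was queued
}

// filterStaleEvents drops KV events whose commitTs is strictly less
// than the batch's filterTs, mirroring the check done before writing
// to Pebble.
func filterStaleEvents(e eventWithCallback) []kvEvent {
	kept := make([]kvEvent, 0, len(e.events))
	for _, ev := range e.events {
		if ev.CommitTs < e.filterTs {
			continue // stale: already covered by the resolved ts
		}
		kept = append(kept, ev)
	}
	return kept
}

func main() {
	batch := eventWithCallback{
		events: []kvEvent{
			{Key: "a", CommitTs: 99},
			{Key: "b", CommitTs: 100},
			{Key: "c", CommitTs: 101},
		},
		filterTs: 100,
	}
	// Note the PR's open TODO: events with CommitTs == filterTs are
	// kept by a strict less-than comparison, as sketched here.
	for _, ev := range filterStaleEvents(batch) {
		fmt.Println(ev.Key, ev.CommitTs)
	}
}
```

Capturing `filterTs` at queue time (rather than reading the current resolved ts at write time) is what ties each batch to the resolved ts that was in force when it was received.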
- logservice/logpuller/region_event_handler.go
  - Imported the `math` package.
  - Added a warning log in `Handle` if the minimum commitTs in the `kvEventsCache` is less than the new resolvedTs.
  - Added a warning log in `handleEventEntries` if a committed entry's CommitTs is less than or equal to the region's last resolvedTs.
  - Removed a commented-out line related to appending `kvEvents`.
  - Added a call to `span.rangeLock.UpdateLockedRangeStateHeap` in `handleResolvedTs` to update the heap when a region's resolvedTs changes.
- logservice/logpuller/regionlock/range_ts_map.go
  - Imported the `go.uber.org/zap` package.
  - Added an info log in `getMinTs` when an unlocked range is found during the minimum timestamp calculation.
- logservice/logpuller/regionlock/region_range_lock.go
  - Removed the import of `container/heap`.
  - Imported the custom `github.com/pingcap/ticdc/utils/heap`.
  - Added a `heapIndex` field and `SetHeapIndex`, `GetHeapIndex`, and `LessThan` methods to `LockedRangeState` to make it compatible with the custom heap.
  - Removed the old `rangeLockEntryHeap` type and its associated methods.
  - Replaced the `rangeLockEntryHeap` field in `RangeLock` with `lockedRangeStateHeap` using the new custom heap type.
  - Updated the heap initialization in `NewRangeLock` to use `heap.NewHeap`.
  - Updated the heap removal logic in `UnlockRange` to use `l.lockedRangeStateHeap.Remove`.
  - Added the `UpdateLockedRangeStateHeap` method to add or update a `LockedRangeState` in the heap.
  - Updated `GetHeapMinTs` to use `l.lockedRangeStateHeap.PeekTop` to get the minimum timestamp.
  - Updated the heap insertion logic in `tryLockRange` to use `l.lockedRangeStateHeap.AddOrUpdate`.
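The key property of the `utils/heap` contract named above is that items expose `LessThan`/`SetHeapIndex`/`GetHeapIndex`, so the heap can locate an item in O(1) and re-sift it in place when its resolved ts changes. The sketch below is an assumption-laden simplification: the real ticdc heap is generic and its internals may differ; the `lockedRangeState` struct here is only modeled on the changelog.

```go
package main

import "fmt"

// Simplified stand-in for LockedRangeState. heapIndex is maintained
// by the heap (1-based; 0 means "not in the heap").
type lockedRangeState struct {
	resolvedTs uint64
	heapIndex  int
}

func (s *lockedRangeState) LessThan(o *lockedRangeState) bool { return s.resolvedTs < o.resolvedTs }
func (s *lockedRangeState) SetHeapIndex(i int)                { s.heapIndex = i }
func (s *lockedRangeState) GetHeapIndex() int                 { return s.heapIndex }

// minHeap is a toy min-heap specialized to *lockedRangeState.
type minHeap struct{ items []*lockedRangeState }

func (h *minHeap) swap(i, j int) {
	h.items[i], h.items[j] = h.items[j], h.items[i]
	h.items[i].SetHeapIndex(i + 1)
	h.items[j].SetHeapIndex(j + 1)
}

func (h *minHeap) up(i int) {
	for i > 0 {
		p := (i - 1) / 2
		if !h.items[i].LessThan(h.items[p]) {
			break
		}
		h.swap(i, p)
		i = p
	}
}

func (h *minHeap) down(i int) {
	for {
		l, smallest := 2*i+1, i
		if l < len(h.items) && h.items[l].LessThan(h.items[smallest]) {
			smallest = l
		}
		if r := l + 1; r < len(h.items) && h.items[r].LessThan(h.items[smallest]) {
			smallest = r
		}
		if smallest == i {
			return
		}
		h.swap(i, smallest)
		i = smallest
	}
}

// AddOrUpdate inserts the item, or re-sifts it if already present.
// This is what a method like UpdateLockedRangeStateHeap relies on
// when a region's resolved ts advances.
func (h *minHeap) AddOrUpdate(s *lockedRangeState) {
	if i := s.GetHeapIndex(); i > 0 {
		h.up(i - 1)
		h.down(s.GetHeapIndex() - 1)
		return
	}
	h.items = append(h.items, s)
	s.SetHeapIndex(len(h.items))
	h.up(len(h.items) - 1)
}

// PeekTop returns the item with the minimum resolved ts, which is
// what GetHeapMinTs needs.
func (h *minHeap) PeekTop() (*lockedRangeState, bool) {
	if len(h.items) == 0 {
		return nil, false
	}
	return h.items[0], true
}

func main() {
	h := &minHeap{}
	a := &lockedRangeState{resolvedTs: 30}
	b := &lockedRangeState{resolvedTs: 10}
	h.AddOrUpdate(a)
	h.AddOrUpdate(b)
	top, _ := h.PeekTop()
	fmt.Println(top.resolvedTs) // 10: min resolved ts across ranges

	b.resolvedTs = 50 // the region advanced past a
	h.AddOrUpdate(b)  // re-sift in place instead of rebuilding
	top, _ = h.PeekTop()
	fmt.Println(top.resolvedTs) // 30
}
```

The intrusive index is the design point: with `container/heap` alone there is no cheap way to find and fix up one entry after its key changes, which is presumably why the PR pairs the heap swap with the new `UpdateLockedRangeStateHeap` call on every resolved-ts change.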
- logservice/logpuller/regionlock/region_range_lock_test.go
  - Added a helper function `updateLockedRangeResolvedTs` to simplify updating a locked range's resolvedTs and the heap.
  - Updated `TestRangeLockGetHeapMinTs` to use the new helper function for setting resolved timestamps.
- logservice/logpuller/subscription_client.go
  - Added a `debugInfo` struct with a mutex and `resolvedTsMap` to track region resolved timestamps per subscription.
  - Initialized the `debugInfo.resolvedTsMap` in `NewSubscriptionClient`.
  - Added debug logging within `pushRegionEventToDS` to warn about a decreasing region resolvedTs or a committed event commitTs less than or equal to the region's resolvedTs.
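The bookkeeping described above amounts to a mutex-guarded map from region to the last seen resolved ts, checked on every push. The sketch below is illustrative only: the method name `checkResolvedTs` and the plain `fmt` logging are assumptions; the real `subscription_client.go` uses the pingcap/log package and scopes the map per subscription.

```go
package main

import (
	"fmt"
	"sync"
)

// debugInfo mirrors the struct described in the changelog: a mutex
// plus a map tracking the latest resolved ts per region.
type debugInfo struct {
	mu            sync.Mutex
	resolvedTsMap map[uint64]uint64 // regionID -> last seen resolved ts
}

// checkResolvedTs records the newest resolved ts for a region and
// reports false (with a warning) when the new ts is lower than the
// last recorded one, i.e. the resolved ts moved backwards.
func (d *debugInfo) checkResolvedTs(regionID, ts uint64) bool {
	d.mu.Lock()
	defer d.mu.Unlock()
	if last, ok := d.resolvedTsMap[regionID]; ok && ts < last {
		fmt.Printf("WARN: region %d resolved ts moved backwards: %d -> %d\n",
			regionID, last, ts)
		return false
	}
	d.resolvedTsMap[regionID] = ts
	return true
}

func main() {
	d := &debugInfo{resolvedTsMap: make(map[uint64]uint64)}
	d.checkResolvedTs(1, 100)
	d.checkResolvedTs(1, 120) // ok: monotonic advance
	d.checkResolvedTs(1, 90)  // triggers the warning
}
```

As the review below notes, taking this lock on the event-push hot path is the kind of cost worth confirming before the logging becomes permanent.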
- pkg/eventservice/event_broker.go
  - Removed a blank line in the `doScan` function.
Using Gemini Code Assist
The full guide for Gemini Code Assist can be found on our documentation page; here are some quick tips.
Invoking Gemini
You can request assistance from Gemini at any point in your pull request by creating an issue comment (i.e. a comment on the pull request page) using either `/gemini <command>` or `@gemini-code-assist <command>`. Below is a summary of the supported commands.
| Feature | Command | Description |
|---|---|---|
| Code Review | `/gemini review` | Performs a code review for the current pull request in its current state. |
| Pull Request Summary | `/gemini summary` | Provides a summary of the current pull request in its current state. |
| Comment | `@gemini-code-assist` | Responds in comments when explicitly tagged, both in issue comments and review comments. |
| Help | `/gemini help` | Displays a list of available commands. |
Customization
To customize the Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by adding files to a `.gemini/` folder in the base of the repository. Detailed instructions can be found here.
Limitations & Feedback
Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.
You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.
Footnotes
1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.
Code Review
This pull request primarily focuses on fixing the resolved timestamp calculation for event filtering in the puller, and refactors the heap implementation within the region lock mechanism. The core change introduces a `filterTs` on `eventWithCallback`, ensuring events are filtered against the resolved timestamp that was active when they were received. The heap refactoring to a generic implementation is a good improvement for maintainability.
Overall, the changes look reasonable and address the intended fix. There are a couple of points to discuss, particularly a TODO comment and the new debug logging.
Summary of Findings
- TODO for Filtering Logic: A TODO comment in `eventstore/event_store.go` questions whether events with `commitTs == filterTs` should be filtered. This needs clarification, as it affects event processing semantics.
- Debug Logging Performance and Verbosity: New debug logging in `logpuller/subscription_client.go` may have performance implications due to locking and map operations on a hot path. Its intended use (temporary debugging vs. permanent diagnostics) and log levels should be confirmed.
- Heap Refactoring: The refactoring of `rangeLockEntryHeap` in `logpuller/regionlock/region_range_lock.go` to use a generic heap (`utils/heap.Heap`) is a good improvement for code maintainability and reusability.
- Resolved Timestamp Filtering Fix: The core logic of capturing `resolvedTs` as `filterTs` at event reception and using it for filtering in `writeEvents` appears to correctly address issues with resolved timestamp calculations for event filtering.
- Potentially Noisy Logging (Not Commented Due to Severity Settings): In `logservice/logpuller/regionlock/range_ts_map.go`, `getMinTs` now includes `log.Info("getMinTs with unlocked range", ...)`. This could be verbose if frequently triggered. This was not commented on directly due to the 'low' severity and review settings.
Merge Readiness
The pull request introduces a key fix for resolved timestamp calculation and a beneficial heap refactoring. Before merging, I recommend addressing the TODO comment regarding the filtering logic and clarifying the intent and potential impact of the new debug logging in `subscription_client.go`. Once these points are clarified or addressed, the PR should be in good shape for merging. As an AI assistant, I am not authorized to approve pull requests; please ensure further review and approval from team members.
@gemini-code-assist[bot]: adding LGTM is restricted to approvers and reviewers in OWNERS files. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
@lidezhu: The following test failed.
Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
[APPROVALNOTIFIER] This PR is APPROVED. This pull request has been approved by: asddongmen. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
What problem does this PR solve?
Issue Number: close #1395 #1382
What is changed and how it works?
Use a new heap library and update heap when resolved ts changes every time.
Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note