sink: improve the performance of mysql sink #1374
Conversation
Hello @hongyunyan, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!
Summary of Changes
Hello! Gemini here, providing a summary of this pull request titled "[WIP] for test". Based on the title and description, this appears to be a work-in-progress or test PR, as indicated by the [WIP]
prefix and the placeholder description. However, the code changes clearly implement a specific feature: adding metrics to measure the time taken for a DML event to travel from the dispatcher to the sink worker. This involves adding a timestamp to the DML event when it's created or handled by the dispatcher and then recording the duration when the event is received by the sink.
Highlights
- Metric for Event Sync Duration: A new Prometheus histogram metric, EventSyncDuration, has been introduced to track the time elapsed from when a DML event is recorded in the dispatcher until it is received by a sink worker.
- Timestamping DML Events: A RecordTimestamp field has been added to the DMLEvent struct. This timestamp is populated in the dispatcher when the event is processed, allowing for duration calculation later in the pipeline.
- Logging in MySQL Sink: Informational logs have been added in the MySQL sink worker to indicate when events are received and when batches are flushed, including the time cost for event sync.
Changelog
- downstreamadapter/dispatcher/dispatcher.go
  - Added dml.RecordTimestamp = time.Now() to set the timestamp on the DML event when it's handled in the HandleEvents function.
- downstreamadapter/sink/mysql/sink.go
  - Declared and initialized a new metric variable workerEventSyncDuration for the EventSyncDuration histogram.
  - Added workerEventSyncDuration to the list of metrics to be deleted in the defer function.
  - Added a log message and observed workerEventSyncDuration using time.Since(txnEvent.RecordTimestamp) when receiving a transaction event in runDMLWriter.
  - Added a log message indicating when events are being flushed in runDMLWriter.
- pkg/common/event/dml_event.go
  - Imported the time package.
  - Added a RecordTimestamp time.Time field to the DMLEvent struct, intended for metrics.
- pkg/metrics/sink.go
  - Defined the new EventSyncDuration Prometheus histogram metric.
  - Registered the EventSyncDuration metric in the InitSinkMetrics function.
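Putting the changelog items together, a rough sketch of the intended flow (the types below are heavily simplified stand-ins for illustration, not the actual ticdc code; label values are placeholders):

package main

import (
	"fmt"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

// DMLEvent is a trimmed stand-in for pkg/common/event.DMLEvent, showing only
// the RecordTimestamp field added in this PR.
type DMLEvent struct {
	RecordTimestamp time.Time
}

// eventSyncDuration mirrors the EventSyncDuration histogram added in
// pkg/metrics/sink.go.
var eventSyncDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "ticdc",
		Subsystem: "sink",
		Name:      "txn_event_sync_duration",
		Help:      "Event sync duration (s) for txn worker.",
	}, []string{"namespace", "changefeed", "id"})

func main() {
	// Dispatcher side: stamp the event when it is handled.
	dml := &DMLEvent{RecordTimestamp: time.Now()}

	// ... the event travels through the pipeline to a sink worker ...
	time.Sleep(10 * time.Millisecond)

	// Sink worker side: observe how long the event took to arrive.
	elapsed := time.Since(dml.RecordTimestamp)
	eventSyncDuration.WithLabelValues("default", "test-changefeed", "0").Observe(elapsed.Seconds())
	fmt.Println("event sync duration:", elapsed)
}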
Code Review
This pull request introduces a new metric, EventSyncDuration, to measure the time an event takes to travel from the dispatcher to a MySQL sink worker. This is achieved by adding a RecordTimestamp to DMLEvent when it's handled by the dispatcher, and then calculating the duration in the sink worker. The changes look reasonable and address the goal of adding this new observability point. I have one suggestion regarding the Prometheus histogram buckets for the new metric.
Summary of Findings
- Prometheus Histogram Buckets: The new EventSyncDuration histogram metric is defined without explicit buckets. This might lead to suboptimal data distribution if the default buckets are not well suited for the expected range of these durations. A suggestion was made to consider defining custom buckets.
- Testing: No new tests were added to verify the new timestamping logic or the metric collection. For production-quality code, unit tests ensuring RecordTimestamp is set and the metric is observed would be beneficial. (Not commented on due to review settings, but important for consideration.)
- Pull Request Hygiene: The PR title [WIP] for test and the default PR description template should be updated to accurately reflect the changes and their purpose before merging. (Not commented on due to review settings, but important for PR management.)
- Minor Performance Consideration: In downstreamadapter/sink/mysql/sink.go, time.Since(txnEvent.RecordTimestamp) is called twice (once for logging, once for metrics). This could be optimized by calling it once and reusing the result, though the impact is likely negligible. (Not commented on due to review settings as it is low severity.)
Merge Readiness
The core logic for adding the EventSyncDuration metric appears correct. However, the configuration of the Prometheus histogram (specifically, the buckets) should be reviewed to ensure the metric provides useful insights. Addressing the comment on histogram buckets would improve the quality of this new metric.
Additionally, while not commented on directly due to severity settings, consider adding tests for the new logic and updating the PR title/description for clarity before merging.
I am unable to approve this pull request. Please ensure other reviewers approve this code before merging, especially after addressing the feedback.
pkg/metrics/sink.go
Outdated
EventSyncDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "ticdc",
		Subsystem: "sink",
		Name:      "txn_event_sync_duration",
		Help:      "Event sync duration (s) for txn worker.",
	}, []string{"namespace", "changefeed", "id"})
The new histogram EventSyncDuration is defined without explicit buckets. Other duration-based histograms in this file, such as WorkerFlushDuration and WorkerTotalDuration, use custom prometheus.ExponentialBuckets.

Using the default Prometheus buckets might not provide the optimal granularity for analyzing these event sync durations, potentially making the metric less useful if the typical values fall into a narrow range or are very small.

Could we consider defining explicit buckets for EventSyncDuration? For instance, if the expected durations are in a similar range to worker flush times, buckets like prometheus.ExponentialBuckets(0.001, 2, 20) (covering 1ms to ~524s) could be a good starting point. What is the anticipated range for these event sync durations, and would custom buckets help in observing them more effectively?
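For illustration, the suggested definition with explicit buckets would look roughly like this (the bucket parameters are only the example starting point mentioned above, not a validated choice for the actual duration range):

// 20 exponential buckets starting at 1ms and doubling each step,
// covering roughly 1ms to ~524s.
EventSyncDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "ticdc",
		Subsystem: "sink",
		Name:      "txn_event_sync_duration",
		Help:      "Event sync duration (s) for txn worker.",
		Buckets:   prometheus.ExponentialBuckets(0.001, 2, 20),
	}, []string{"namespace", "changefeed", "id"})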
@gemini-code-assist[bot]: adding LGTM is restricted to approvers and reviewers in OWNERS files. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
Signed-off-by: dongmen <414110582@qq.com>
Signed-off-by: dongmen <414110582@qq.com>
Thanks for the tag, @hongyunyan. Based on the current state of the pull request and the automated checks:
To move this PR forward, please consider addressing these points:
Let me know if you have any specific questions about the feedback provided!
/gemini review
Code Review
This pull request introduces significant performance improvements to the MySQL sink. Key changes include:

- Reduced RTT for DML Execution: By switching from db.Begin() + tx.ExecContext() + tx.Commit() to a single db.ExecContext("BEGIN; ... ;COMMIT;") for DML batches, the number of round trips to the database is reduced, which should improve latency and throughput (a sketch of this idea follows below).
- Unlimited Channels for Event Handling: The use of chann.UnlimitedChannel for event buffering and retrieval (e.g., GetMultipleNoGroup) in the DML writer and conflict detector aims to streamline event flow and reduce potential blocking points.
- Improved Robustness: The change in utils/chann/unlimited_chann.go to log a warning instead of panicking when pushing to a closed channel is a good enhancement for stability.
- Refined Shutdown Logic: The ConflictDetector's Close method has been improved to ensure all resources, including output channels of resolved transaction caches, are properly closed, preventing potential deadlocks.
The changes are well-targeted for performance. I have one main point of feedback regarding the implementation of bounded behavior on top of the unlimited channels, which might have concurrency implications for strict size enforcement.
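A minimal sketch of the single-round-trip batching idea described above. This is not the actual mysql_writer_dml.go implementation: statement rendering and argument escaping are simplified assumptions here, and it assumes the go-sql-driver/mysql DSN enables multiStatements=true so multiple statements can be sent in one query.

package main

import (
	"context"
	"database/sql"
	"strings"

	_ "github.com/go-sql-driver/mysql"
)

// flushBatch sends a whole DML batch in one round trip by wrapping the
// statements in an explicit BEGIN/COMMIT and executing them in a single
// ExecContext call, instead of db.Begin() + per-statement tx.ExecContext()
// + tx.Commit(). Statements are assumed to be fully rendered and safely
// escaped already; the real writer must handle argument binding.
func flushBatch(ctx context.Context, db *sql.DB, stmts []string) error {
	var sb strings.Builder
	sb.WriteString("BEGIN;")
	for _, s := range stmts {
		sb.WriteString(s)
		sb.WriteString(";")
	}
	sb.WriteString("COMMIT;")
	_, err := db.ExecContext(ctx, sb.String())
	return err
}

func main() {
	// multiStatements=true lets the driver send "a; b; c" in one query.
	db, err := sql.Open("mysql", "user:pass@tcp(127.0.0.1:3306)/test?multiStatements=true")
	if err != nil {
		panic(err)
	}
	defer db.Close()

	stmts := []string{
		"INSERT INTO t (id, v) VALUES (1, 'a')",
		"UPDATE t SET v = 'b' WHERE id = 1",
	}
	if err := flushBatch(context.Background(), db, stmts); err != nil {
		panic(err)
	}
}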
Summary of Findings
- Concurrency in Bounded Cache Logic: The add methods in boundedTxnCache and boundedTxnCacheWithBlock (in downstreamadapter/sink/mysql/causality/txn_cache.go) use a non-atomic check-then-act pattern (Len() then Push()) on the UnlimitedChannel. This could lead to upperSize being exceeded under concurrent access. Clarification on whether this is acceptable or if stricter bounding is needed is recommended.
- Performance: DML Execution RTT Reduction: The change to use db.ExecContext("BEGIN; ... ;COMMIT;") in pkg/sink/mysql/mysql_writer_dml.go is a good strategy for reducing database round trips and should improve DML flushing performance.
- Performance: Unlimited Channel Usage: Employing UnlimitedChannel and GetMultipleNoGroup in downstreamadapter/sink/mysql/sink.go and causality/txn_cache.go should enhance event processing throughput by allowing more flexible batching and reducing blocking.
- Robustness: Handling Closed Unlimited Channel: In utils/chann/unlimited_chann.go, changing the behavior of Push on a closed channel from panic to log.Warn and return significantly improves the robustness of the channel implementation (a sketch of this pattern follows this list).
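A minimal sketch of that warn-instead-of-panic pattern, assuming a mutex-guarded closed flag. The real UnlimitedChannel in utils/chann is generic and more involved; this only illustrates the behavioral change.

package main

import (
	"sync"

	"github.com/pingcap/log"
)

// unlimitedChannel is a toy stand-in for utils/chann.UnlimitedChannel,
// showing only the push-after-close handling.
type unlimitedChannel[T any] struct {
	mu     sync.Mutex
	buf    []T
	closed bool
}

func (c *unlimitedChannel[T]) Push(v T) {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.closed {
		// Previously this situation would panic; logging a warning and
		// dropping the value keeps late producers from crashing the process.
		log.Warn("push to closed unlimited channel, dropping value")
		return
	}
	c.buf = append(c.buf, v)
}

func (c *unlimitedChannel[T]) Close() {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.closed = true
}

func main() {
	ch := &unlimitedChannel[int]{}
	ch.Push(1)
	ch.Close()
	ch.Push(2) // warns instead of panicking
}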
Merge Readiness
This pull request introduces valuable performance optimizations. However, there is a potential concurrency concern in txn_cache.go regarding how upperSize is enforced, which could lead to the cache size exceeding the intended bound under high contention. I recommend addressing or clarifying this point before merging. I am not authorized to approve pull requests, so please ensure other reviewers approve these changes after the feedback is addressed.
if w.ch.Len() > w.upperSize {
	return false
}
w.ch.Push(txn)
The current implementation of add checks w.ch.Len() and then calls w.ch.Push(txn). While UnlimitedChannel.Len() and Push() are individually synchronized, this sequence of operations (check-then-act) is not atomic at the boundedTxnCache level.

If multiple goroutines call add concurrently, it's possible for w.ch.Len() to be less than or equal to w.upperSize for several goroutines, and then all of them proceed to Push, potentially causing the number of items in the channel to exceed w.upperSize.

Is this potential for exceeding upperSize under high concurrency acceptable, or is strict adherence to upperSize required? If strictness is paramount, an additional mutex within boundedTxnCache might be needed to protect this check-then-act sequence, for example:
// Assuming w.mu is a new sync.Mutex field in boundedTxnCache.
w.mu.Lock()
defer w.mu.Unlock()
if w.ch.Len() > w.upperSize {
	return false
}
w.ch.Push(txn)
return true
However, this would introduce more contention. Could you clarify the design intent here regarding the strictness of upperSize?
if w.ch.Len() > w.upperSize {
	w.isBlocked.CompareAndSwap(false, true)
	return false
}
w.ch.Push(txn)
Similar to the boundedTxnCache.add method, the check w.ch.Len() > w.upperSize (line 106) followed by w.ch.Push(txn) (line 110) within the if !w.isBlocked.Load() block is not an atomic operation.

If multiple goroutines execute this section concurrently, they might all pass the length check before any of them push to the channel or set isBlocked. This could lead to the channel size temporarily exceeding w.upperSize.

Could you please review if this behavior is intended or if a stricter bounding mechanism (e.g., using a mutex for this critical section) is necessary for boundedTxnCacheWithBlock as well?
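One possible way to enforce the bound strictly without a mutex is to reserve capacity with an atomic counter before pushing. A minimal sketch under that assumption (the type, field, and method names here are hypothetical, not from the PR):

package main

import (
	"fmt"
	"sync/atomic"
)

// boundedPusher reserves a slot atomically before pushing, so concurrent
// callers cannot collectively overshoot upperSize the way a separate
// Len()-then-Push() check can.
type boundedPusher[T any] struct {
	size      atomic.Int64
	upperSize int64
	push      func(T) // e.g. the underlying channel's Push
}

func (b *boundedPusher[T]) add(v T) bool {
	// Reserve first; release the reservation if it overshoots the bound.
	if b.size.Add(1) > b.upperSize {
		b.size.Add(-1)
		return false
	}
	b.push(v)
	return true
}

func main() {
	var stored []int
	b := &boundedPusher[int]{upperSize: 2, push: func(v int) { stored = append(stored, v) }}
	fmt.Println(b.add(1), b.add(2), b.add(3)) // true true false
	// Note: the consumer must call b.size.Add(-1) for every item it drains.
}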
@gemini-code-assist[bot]: adding LGTM is restricted to approvers and reviewers in OWNERS files. In response to this:
Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.
[APPROVALNOTIFIER] This PR is APPROVED. This pull-request has been approved by: asddongmen. The full list of commands accepted by this bot can be found here. The pull request process is described here.
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing
[LGTM Timeline notifier] Timeline:
@hongyunyan: The following test failed, say
Full PR test history. Your PR dashboard. Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.
[FORMAT CHECKER NOTIFICATION] Notice: To remove the 📖 For more info, you can check the "Contribute Code" section in the development guide.
What problem does this PR solve?
Issue Number: close #xxx
What is changed and how it works?
Replace db.Begin + tx.ExecContext + tx.Commit with a single db.ExecContext("BEGIN; ... ;COMMIT;") call to reduce the number of round trips (RTT) during flush.

Check List
Tests
Questions
Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?
Release note