8000 sink: improve the performance of mysql sink by hongyunyan · Pull Request #1374 · pingcap/ticdc · GitHub

sink: improve the performance of mysql sink #1374


Merged
merged 35 commits into from
May 29, 2025

Conversation

hongyunyan
Collaborator
@hongyunyan hongyunyan commented May 26, 2025

What problem does this PR solve?

Issue Number: close #xxx

What is changed and how it works?

  1. Use db.ExecContext instead of db.Begin + tx.ExecContext + tx.Commit to reduce the number of round trips (RTTs) during flush.
  2. Use an unlimited channel instead of a normal channel so that events can be fetched more easily.

Check List

Tests

  • Unit test
  • Integration test
  • Manual test (add detailed scripts or steps below)
  • No code

Questions

Will it cause performance regression or break compatibility?
Do you need to update user documentation, design documentation or monitoring documentation?

Release note

Please refer to [Release Notes Language Style Guide](https://pingcap.github.io/tidb-dev-guide/contribute-to-tidb/release-notes-style-guide.html) to write a quality release note.

If you don't think this PR needs a release note then fill it with `None`.

@ti-chi-bot ti-chi-bot bot added do-not-merge/needs-linked-issue do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. release-note Denotes a PR that will be considered when it comes time to generate release notes. size/S Denotes a PR that changes 10-29 lines, ignoring generated files. labels May 26, 2025
@gemini-code-assist gemini-code-assist bot left a comment


Hello @hongyunyan, I'm Gemini Code Assist[1]! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

Summary of Changes

Hello! Gemini here, providing a summary of this pull request titled "[WIP] for test". Based on the title and description, this appears to be a work-in-progress or test PR, as indicated by the [WIP] prefix and the placeholder description. However, the code changes clearly implement a specific feature: adding metrics to measure the time taken for a DML event to travel from the dispatcher to the sink worker. This involves adding a timestamp to the DML event when it's created or handled by the dispatcher and then recording the duration when the event is received by the sink.

Highlights

  • Metric for Event Sync Duration: A new Prometheus histogram metric, EventSyncDuration, has been introduced to track the time elapsed from when a DML event is recorded in the dispatcher until it is received by a sink worker.
  • Timestamping DML Events: A RecordTimestamp field has been added to the DMLEvent struct. This timestamp is populated in the dispatcher when the event is processed, allowing for duration calculation later in the pipeline.
  • Logging in MySQL Sink: Informational logs have been added in the MySQL sink worker to indicate when events are received and when batches are flushed, including the time cost for event sync.

Changelog

Click here to see the changelog
  • downstreamadapter/dispatcher/dispatcher.go
    • Added dml.RecordTimestamp = time.Now() to set the timestamp on the DML event when it's handled in the HandleEvents function.
  • downstreamadapter/sink/mysql/sink.go
    • Declared and initialized a new metric variable workerEventSyncDuration for the EventSyncDuration histogram.
    • Added workerEventSyncDuration to the list of metrics to be deleted in the defer function.
    • Added a log message and observed the workerEventSyncDuration using time.Since(txnEvent.RecordTimestamp) when receiving a transaction event in runDMLWriter.
    • Added a log message indicating when events are being flushed in runDMLWriter.
  • pkg/common/event/dml_event.go
    • Imported the time package.
    • Added a RecordTimestamp time.Time field to the DMLEvent struct, intended for metrics.
  • pkg/metrics/sink.go
    • Defined the new EventSyncDuration Prometheus histogram metric.
    • Registered the EventSyncDuration metric in the InitSinkMetrics function.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point in your pull request via creating an issue comment (i.e. comment on the pull request page) using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands.

Feature               Command               Description
Code Review           /gemini review        Performs a code review for the current pull request in its current state.
Pull Request Summary  /gemini summary       Provides a summary of the current pull request in its current state.
Comment               @gemini-code-assist   Responds in comments when explicitly tagged, both in issue comments and review comments.
Help                  /gemini help          Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist is currently in preview and may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counterproductive. You can react with 👍 and 👎 on @gemini-code-assist comments to provide feedback.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.


Events flow downstream,
Timestamps mark their journey,
Metrics watch the time.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces a new metric, EventSyncDuration, to measure the time an event takes to travel from the dispatcher to a MySQL sink worker. This is achieved by adding a RecordTimestamp to DMLEvent when it's handled by the dispatcher, and then calculating the duration in the sink worker. The changes look reasonable and address the goal of adding this new observability point. I have one suggestion regarding the Prometheus histogram buckets for the new metric.

Summary of Findings

  • Prometheus Histogram Buckets: The new EventSyncDuration histogram metric is defined without explicit buckets. This might lead to suboptimal data distribution if default buckets are not well-suited for the expected range of these durations. Suggestion made to consider defining custom buckets.
  • Testing: No new tests were added to verify the new timestamping logic or the metric collection. For production-quality code, unit tests ensuring RecordTimestamp is set and the metric is observed would be beneficial. (Not commented due to review settings, but important for consideration).
  • Pull Request Hygiene: The PR title [WIP] for test and the default PR description template should be updated to accurately reflect the changes and their purpose before merging. (Not commented due to review settings, but important for PR management).
  • Minor Performance Consideration: In downstreamadapter/sink/mysql/sink.go, time.Since(txnEvent.RecordTimestamp) is called twice (once for logging, once for metrics). This could be optimized by calling it once and reusing the result, though the impact is likely negligible. (Not commented due to review settings as it's low severity).

Merge Readiness

The core logic for adding the EventSyncDuration metric appears correct. However, the configuration of the Prometheus histogram (specifically, the buckets) should be reviewed to ensure the metric provides useful insights. Addressing the comment on histogram buckets would improve the quality of this new metric.

Additionally, while not commented on directly due to severity settings, consider adding tests for the new logic and updating the PR title/description for clarity before merging.

I am unable to approve this pull request. Please ensure other reviewers approve this code before merging, especially after addressing the feedback.

Comment on lines 119 to 125
EventSyncDuration = prometheus.NewHistogramVec(
	prometheus.HistogramOpts{
		Namespace: "ticdc",
		Subsystem: "sink",
		Name:      "txn_event_sync_duration",
		Help:      "Event sync duration (s) for txn worker.",
	}, []string{"namespace", "changefeed", "id"})


medium

The new histogram EventSyncDuration is defined without explicit buckets. Other duration-based histograms in this file, such as WorkerFlushDuration and WorkerTotalDuration, use custom prometheus.ExponentialBuckets.

Using the default Prometheus buckets might not provide the optimal granularity for analyzing these event sync durations, potentially making the metric less useful if the typical values fall into a narrow range or are very small.

Could we consider defining explicit buckets for EventSyncDuration? For instance, if the expected durations are in a similar range to worker flush times, buckets like prometheus.ExponentialBuckets(0.001, 2, 20) (covering 1ms to ~524s) could be a good starting point. What is the anticipated range for these event sync durations, and would custom buckets help in observing them more effectively?

ti-chi-bot bot commented May 26, 2025

@gemini-code-assist[bot]: adding LGTM is restricted to approvers and reviewers in OWNERS files.


Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository.

@ti-chi-bot ti-chi-bot bot added size/M Denotes a PR that changes 30-99 lines, ignoring generated files. and removed size/S Denotes a PR that changes 10-29 lines, ignoring generated files. labels May 26, 2025
@ti-chi-bot ti-chi-bot bot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/M Denotes a PR that changes 30-99 lines, ignoring generated files. labels May 26, 2025
hongyunyan and others added 9 commits May 26, 2025 21:36
@ti-chi-bot ti-chi-bot bot added the size/M Denotes a PR that changes 30-99 lines, ignoring generated files. label May 28, 2025
@ti-chi-bot ti-chi-bot bot added size/L Denotes a PR that changes 100-499 lines, ignoring generated files. and removed size/M Denotes a PR that changes 30-99 lines, ignoring generated files. labels May 28, 2025
@hongyunyan hongyunyan changed the title [WIP] for test sink: improve the performance of mysql sink May 29, 2025
@ti-chi-bot ti-chi-bot bot removed the do-not-merge/work-in-progress Indicates that a PR should not merge because it is a work in progress. label May 29, 2025
@hongyunyan
Collaborator Author

@gemini-code-assist


Thanks for the tag, @hongyunyan.

Based on the current state of the pull request and the automated checks:

  1. My initial review comment highlighted a suggestion regarding the Prometheus histogram buckets for the new EventSyncDuration metric. It also mentioned considering adding tests and updating the PR title/description.
  2. The ti-chi-bot has reported failed tests.
  3. The ti-chi-bot has also notified about a missing linked issue number in the PR body, which is required.

To move this PR forward, please consider addressing these points:

  • Review the suggestion about the histogram buckets.
  • Address the root cause of the failed tests.
  • Update the PR body to include the linked issue number (e.g., Issue Number: close #<issue_number>).
  • Update the PR title and description to accurately reflect the changes.

Let me know if you have any specific questions about the feedback provided!

@hongyunyan
Collaborator Author

/gemini review

6D47 Copy link
@gemini-code-assist gemini-code-assist bot left a comment


Code Review

This pull request introduces significant performance improvements to the MySQL sink. Key changes include:

  1. Reduced RTT for DML Execution: By switching from db.Begin() + tx.ExecContext() + tx.Commit() to a single db.ExecContext("BEGIN; ... ;COMMIT;") for DML batches, the number of round trips to the database is reduced, which should improve latency and throughput.
  2. Unlimited Channels for Event Handling: The use of chann.UnlimitedChannel for event buffering and retrieval (e.g., GetMultipleNoGroup) in the DML writer and conflict detector aims to streamline event flow and reduce potential blocking points.
  3. Improved Robustness: The change in utils/chann/unlimited_chann.go to log a warning instead of panicking when pushing to a closed channel is a good enhancement for stability.
  4. Refined Shutdown Logic: The ConflictDetector's Close method has been improved to ensure all resources, including output channels of resolved transaction caches, are properly closed, preventing potential deadlocks.

The changes are well-targeted for performance. I have one main point of feedback regarding the implementation of bounded behavior on top of the unlimited channels, which might have concurrency implications for strict size enforcement.

Summary of Findings

  • Concurrency in Bounded Cache Logic: The add methods in boundedTxnCache and boundedTxnCacheWithBlock (in downstreamadapter/sink/mysql/causality/txn_cache.go) use a non-atomic check-then-act pattern (Len() then Push()) on the UnlimitedChannel. This could lead to the upperSize being exceeded under concurrent access. Clarification on whether this is acceptable or if stricter bounding is needed is recommended.
  • Performance: DML Execution RTT Reduction: The change to use db.ExecContext("BEGIN; ... ;COMMIT;") in pkg/sink/mysql/mysql_writer_dml.go is a good strategy for reducing database round trips and should improve DML flushing performance.
  • Performance: Unlimited Channel Usage: Employing UnlimitedChannel and GetMultipleNoGroup in downstreamadapter/sink/mysql/sink.go and causality/txn_cache.go should enhance event processing throughput by allowing more flexible batching and reducing blocking.
  • Robustness: Handling Closed Unlimited Channel: In utils/chann/unlimited_chann.go, changing the behavior of Push on a closed channel from panic to log.Warn and return significantly improves the robustness of the channel implementation.

Merge Readiness

This pull request introduces valuable performance optimizations. However, there's a potential concurrency concern in txn_cache.go regarding how upperSize is enforced, which could lead to the cache size exceeding the intended bound under high contention. I recommend addressing or clarifying this point before merging. I am not authorized to approve pull requests, so please ensure other reviewers approve these changes after the feedback is addressed.

Comment on lines +78 to +81
if w.ch.Len() > w.upperSize {
return false
}
w.ch.Push(txn)


medium

The current implementation of add checks w.ch.Len() and then calls w.ch.Push(txn). While UnlimitedChannel.Len() and Push() are individually synchronized, this sequence of operations (check-then-act) is not atomic at the boundedTxnCache level.

If multiple goroutines call add concurrently, it's possible for w.ch.Len() to be less than or equal to w.upperSize for several goroutines, and then all of them proceed to Push, potentially causing the number of items in the channel to exceed w.upperSize.

Is this potential for exceeding upperSize under high concurrency acceptable, or is strict adherence to upperSize required? If strictness is paramount, an additional mutex within boundedTxnCache might be needed to protect this check-then-act sequence, for example:

// Assuming w.mu is a new sync.Mutex field in boundedTxnCache
w.mu.Lock()
defer w.mu.Unlock()
if w.ch.Len() > w.upperSize {
	return false
}
w.ch.Push(txn)
return true

However, this would introduce more contention. Could you clarify the design intent here regarding the strictness of upperSize?

Comment on lines +106 to +110
if w.ch.Len() > w.upperSize {
	w.isBlocked.CompareAndSwap(false, true)
	return false
}
w.ch.Push(txn)


medium

Similar to the boundedTxnCache.add method, the check w.ch.Len() > w.upperSize (line 106) followed by w.ch.Push(txn) (line 110) within the if !w.isBlocked.Load() block is not an atomic operation.

If multiple goroutines execute this section concurrently, they might all pass the length check before any of them push to the channel or set isBlocked. This could lead to the channel size temporarily exceeding w.upperSize.

Could you please review if this behavior is intended or if a stricter bounding mechanism (e.g., using a mutex for this critical section) is necessary for boundedTxnCacheWithBlock as well?

ti-chi-bot bot commented May 29, 2025

@gemini-code-assist[bot]: adding LGTM is restricted to approvers and reviewers in OWNERS files.


ti-chi-bot bot commented May 29, 2025

[APPROVALNOTIFIER] This PR is APPROVED

This pull-request has been approved by: asddongmen

The full list of commands accepted by this bot can be found here.

The pull request process is described here

Needs approval from an approver in each of these files:

Approvers can indicate their approval by writing /approve in a comment
Approvers can cancel approval by writing /approve cancel in a comment

ti-chi-bot bot commented May 29, 2025

[LGTM Timeline notifier]

Timeline:

  • 2025-05-29 07:55:22.804805803 +0000 UTC m=+91943.866823408: ☑️ agreed by asddongmen.

ti-chi-bot bot commented May 29, 2025

@hongyunyan: The following test failed, say /retest to rerun all failed tests or /retest-required to rerun all mandatory failed tests:

Test name: pull-cdc-pulsar-integration-light
Commit: e7c7912
Required: false
Rerun command: /test pull-cdc-pulsar-integration-light

Full PR test history. Your PR dashboard.

Instructions for interacting with me using PR comments are available here. If you have questions or suggestions related to my behavior, please file an issue against the kubernetes-sigs/prow repository. I understand the commands that are listed here.

ti-chi-bot bot commented May 29, 2025

[FORMAT CHECKER NOTIFICATION]

Notice: To remove the do-not-merge/needs-linked-issue label, please provide the linked issue number on one line in the PR body, for example: Issue Number: close #123 or Issue Number: ref #456.

📖 For more info, you can check the "Contribute Code" section in the development guide.

@ti-chi-bot ti-chi-bot bot merged commit 29a765f into pingcap:master May 29, 2025
14 of 15 checks passed
Labels
approved lgtm release-note Denotes a PR that will be considered when it comes time to generate release notes. size/L Denotes a PR that changes 100-499 lines, ignoring generated files.
2 participants