8000 eventBroker: support split update event that UK changed by asddongmen · Pull Request #1375 · pingcap/ticdc · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

eventBroker: support split update event that UK changed #1375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6475c40
eventService: add unit test case
asddongmen May 23, 2025
d8d1902
eventService: remove all panics
asddongmen May 26, 2025
2c34ea1
Merge remote-tracking branch 'upstream/master' into support-split-upd…
asddongmen May 26, 2025
6d515d9
eventService: adjust eventScanner
asddongmen May 26, 2025
ad58cd7
eventService: refactor eventScanner
asddongmen May 26, 2025
bad9a03
eventStore: split update event that uk changed
asddongmen May 26, 2025
14a98b5
Merge remote-tracking branch 'upstream/master' into support-split-upd…
asddongmen May 28, 2025
159e3f1
eventStore: rever unnecessary changes
asddongmen May 28, 2025
d9267be
eventBroker: fix some verbose codes
asddongmen May 28, 2025
3195ee3
eventBroker: add log to help debug
asddongmen May 28, 2025
6aef01c
eventBroker: split change uk update events
asddongmen May 29, 2025
0468a62
event: fix ut
asddongmen May 29, 2025
bc57e69
event: rename a file
asddongmen May 29, 2025
3cbab52
event: fix failed ut
asddongmen May 29, 2025
6dd1afa
event: fix failed ut 2
asddongmen May 29, 2025
fd44871
event: add more ut
asddongmen May 29, 2025
250e9a3
eventservice: add more ut
asddongmen May 30, 2025
7391601
eventservice: add more uts
asddongmen May 30, 2025
85d88db
make fmt
asddongmen May 30, 2025
3888e31
add more ut
asddongmen May 30, 2025
df86eb0
make fmt
asddongmen May 30, 2025
ada6a3e
make fmt
asddongmen May 30, 2025
00589a7
make fmt 2
asddongmen May 30, 2025
5f06b99
Merge remote-tracking branch 'upstream/master' into support-split-upd…
asddongmen Jun 4, 2025
4ae8c90
fix failed ut
asddongmen Jun 4, 2025
216b7a2
make fmt
asddongmen Jun 4, 2025
5f7ffce
fix error
asddongmen Jun 5, 2025
73d3793
add debug log
asddongmen Jun 6, 2025
7cc1d75
fix a bug
asddongmen Jun 9, 2025
5269a7f
add ut
asddongmen Jun 9, 2025
a31b730
remove debug log
asddongmen Jun 9, 2025
1f2b774
Merge remote-tracking branch 'upstream/master' into support-split-upd…
asddongmen Jun 9, 2025
0bd0ab9
fix a bug
asddongmen Jun 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ UT_PACKAGES_DISPATCHER := ./pkg/sink/cloudstorage/... ./pkg/sink/mysql/... ./pkg
UT_PACKAGES_MAINTAINER := ./maintainer/...
UT_PACKAGES_COORDINATOR := ./coordinator/...
UT_PACKAGES_LOGSERVICE := ./logservice/...
UT_PACKAGES_OTHERS := ./pkg/eventservice/... ./pkg/version/... ./utils/dynstream/...
UT_PACKAGES_OTHERS := ./pkg/eventservice/... ./pkg/version/... ./utils/dynstream/... ./pkg/common/event/...

include tools/Makefile

Expand Down
1 change: 1 addition & 0 deletions downstreamadapter/dispatcher/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ func (d *Dispatcher) HandleEvents(dispatcherEvents []DispatcherEvent, wakeCallba
for _, dispatcherEvent := range dispatcherEvents {
log.Debug("dispatcher receive all event",
zap.Stringer("dispatcher", d.id),
zap.String("eventType", commonEvent.TypeToString(dispatcherEvent.Event.GetType())),
zap.Any("event", dispatcherEvent.Event))
failpoint.Inject("HandleEventsSlowly", func() {
lag := time.Duration(rand.Intn(5000)) * time.Millisecond
Expand Down
4 changes: 3 additions & 1 deletion downstreamadapter/dispatchermanager/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -267,9 +267,11 @@ func (h *SchedulerDispatcherRequestHandler) Handle(eventDispatcherManager *Event
if err != nil {
select {
case eventDispatcherManager.errCh <- err:
log.Error("new dispatcher meet error", zap.String("ChangefeedID", eventDispatcherManager.changefeedID.String()),
zap.Error(err))
default:
log.Error("error channel is full, discard error",
zap.Any("ChangefeedID", eventDispatcherManager.changefeedID.String()),
zap.String("ChangefeedID", eventDispatcherManager.changefeedID.String()),
zap.Error(err))
}
}
Expand Down
15 changes: 8 additions & 7 deletions downstreamadapter/eventcollector/dispatcher_stat.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,17 +84,18 @@ func (d *dispatcherStat) isEventFromCurrentEventService(event dispatcher.Dispatc

// isEventSeqValid check whether there are any events being dropped
func (d *dispatcherStat) isEventSeqValid(event dispatcher.DispatcherEvent) bool {
log.Debug("check event sequence",
zap.String("changefeedID", d.target.GetChangefeedID().ID().String()),
zap.Stringer("dispatcher", d.target.GetId()),
zap.Int("eventType", event.GetType()),
zap.Uint64("receivedSeq", event.GetSeq()),
zap.Uint64("lastEventSeq", d.lastEventSeq.Load()),
zap.Uint64("commitTs", event.GetCommitTs()))
switch event.GetType() {
case commonEvent.TypeDMLEvent,
commonEvent.TypeDDLEvent,
commonEvent.TypeHandshakeEvent:
log.Debug("check event sequence",
zap.String("changefeedID", d.target.GetChangefeedID().ID().String()),
zap.Stringer("dispatcher", d.target.GetId()),
zap.Int("eventType", event.GetType()),
zap.Uint64("receivedSeq", event.GetSeq()),
zap.Uint64("lastEventSeq", d.lastEventSeq.Load()),
zap.Uint64("commitTs", event.GetCommitTs()))

expectedSeq := d.lastEventSeq.Add(1)
if event.GetSeq() != expectedSeq {
log.Warn("Received an out-of-order event, reset the dispatcher",
Expand Down
4 changes: 3 additions & 1 deletion downstreamadapter/eventcollector/event_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,10 +572,12 @@ func (c *EventCollector) runProcessMessage(ctx context.Context, inCh <-chan *mes
case event.TypeBatchResolvedEvent:
events := e.(*event.BatchResolvedEvent).Events
from := &targetMessage.From
resolvedTsCount := int32(0)
for _, resolvedEvent := range events {
c.ds.Push(resolvedEvent.DispatcherID, dispatcher.NewDispatcherEvent(from, resolvedEvent))
resolvedTsCount += resolvedEvent.Len()
}
c.metricDispatcherReceivedResolvedTsEventCount.Add(float64(e.Len()))
c.metricDispatcherReceivedResolvedTsEventCount.Add(float64(resolvedTsCount))
case event.TypeBatchDMLEvent:
stat, ok := c.dispatcherMap.Load(e.GetDispatcherID())
if !ok {
Expand Down
3 changes: 3 additions & 0 deletions downstreamadapter/sink/kafka/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,6 +400,9 @@ func (s *sink) sendMessages(ctx context.Context) error {
start := time.Now()
if err = s.statistics.RecordBatchExecution(func() (int, int64, error) {
message.SetPartitionKey(future.Key.PartitionKey)

log.Debug("send message to kafka", zap.String("messageKey", string(message.Key)), zap.String("messageValue", string(message.Value)))

if err = s.dmlProducer.AsyncSend(
ctx,
future.Key.Topic,
Expand Down
8 changes: 4 additions & 4 deletions logservice/schemastore/persist_storage_ddl_handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -1806,7 +1806,7 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter
}
} else if !ignoreCurrentTable {
// ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl
ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.TableID, rawEvent.Query)
ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.TableID, rawEvent.Query).Error()
} else {
// if the table is both filtered out before and after rename table, the ddl should not be fetched
log.Panic("should not build a ignored rename table ddl",
Expand Down Expand Up @@ -1862,7 +1862,7 @@ func buildDDLEventForRenameTable(rawEvent *PersistedDDLEvent, tableFilter filter
}
} else if !ignoreCurrentTable {
// ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl
ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.TableID, rawEvent.Query)
ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.TableID, rawEvent.Query).Error()
} else {
// if the table is both filtered out before and after rename table, the ddl should not be fetched
log.Panic("should not build a ignored rename table ddl",
Expand Down Expand Up @@ -2111,7 +2111,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
}
} else if !ignoreCurrentTable {
// ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl
ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.TableID, rawEvent.Query)
ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.TableID, rawEvent.Query).Error()
} else {
// if the table is both filtered out before and after rename table, ignore
}
Expand Down Expand Up @@ -2151,7 +2151,7 @@ func buildDDLEventForRenameTables(rawEvent *PersistedDDLEvent, tableFilter filte
}
} else if !ignoreCurrentTable {
// ignorePrevTable & !ignoreCurrentTable is not allowed as in: https://docs.pingcap.com/tidb/dev/ticdc-ddl
ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.TableID, rawEvent.Query)
ddlEvent.Err = cerror.ErrSyncRenameTableFailed.GenWithStackByArgs(rawEvent.TableID, rawEvent.Query).Error()
} else {
// ignore
}
Expand Down
1 change: 0 additions & 1 deletion pkg/common/event/chunk.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ func (m *mounter) rawKVToChunkV1(value []byte, tableInfo *common.TableInfo, chk
colID2CutPos[col.ID] = len(colID2CutPos)
}
}
// TODO: handle old value
cutVals, err := tablecodec.CutRowNew(value, colID2CutPos)
if err != nil {
return err
Expand Down
42 changes: 42 additions & 0 deletions pkg/common/event/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,45 @@ func unflatten(datum types.Datum, ft *types.FieldType, loc *time.Location) (type
}
return datum, nil
}

func IsUKChanged(rawKV *common.RawKVEntry, tableInfo *common.TableInfo) (bool, error) {
recordID, err := tablecodec.DecodeRowKey(rawKV.Key)
if err != nil {
return false, errors.Trace(err)
}

oldDatum, err := decodeRow(rawKV.OldValue, recordID, tableInfo, time.UTC)
if err != nil {
return false, errors.Trace(err)
}

newDatum, err := decodeRow(rawKV.Value, recordID, tableInfo, time.UTC)
if err != nil {
return false, errors.Trace(err)
}

for _, colIDs := range tableInfo.GetIndexColumns() {
for _, colID := range colIDs {
d1 := oldDatum[colID]
d2 := newDatum[colID]
if !d1.Equals(d2) {
return true, nil
}
}
}

for _, colInfo := range tableInfo.GetColumns() {
colID := colInfo.ID
if colInfo.GetFlag()&mysql.UniqueKeyFlag == 0 {
continue
}
d1 := oldDatum[colID]
d2 := newDatum[colID]

if !d1.Equals(d2) {
return true, nil
}
}

return false, nil
}
181 changes: 181 additions & 0 deletions pkg/common/event/codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,181 @@
// Copyright 2025 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package event

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestIsUKChanged tests the IsUKChanged function to verify it can correctly
// determine whether an update type rowChange has modified unique keys.
func TestIsUKChanged(t *testing.T) {
helper := NewEventTestHelper(t)
defer helper.Close()

helper.tk.MustExec("use test")

// Test case 1: Table with single unique key - UK changed
createTableSQL1 := "create table t1 (id int primary key, name varchar(32), age int, unique key uk_age (age));"
helper.DDL2Job(createTableSQL1)

insertSQL1 := "insert into t1 values (1, 'alice', 20);"
updateSQL1 := "update t1 set age = 25 where id = 1;"
dmlEvent1, rawKV1 := helper.DML2UpdateEvent("test", "t1", insertSQL1, updateSQL1)

isChanged, err := IsUKChanged(rawKV1, dmlEvent1.TableInfo)
require.NoError(t, err)
require.True(t, isChanged, "UK should be detected as changed when unique key column is modified")

// Test case 2: Table with single unique key - UK not changed
insertSQL2 := "insert into t1 values (2, 'bob', 30);"
updateSQL2 := "update t1 set name = 'robert' where id = 2;"
dmlEvent2, rawKV2 := helper.DML2UpdateEvent("test", "t1", insertSQL2, updateSQL2)

isChanged, err = IsUKChanged(rawKV2, dmlEvent2.TableInfo)
require.NoError(t, err)
require.False(t, isChanged, "UK should not be detected as changed when non-UK column is modified")

// Test case 3: Table with composite unique key - UK changed
createTableSQL3 := "create table t3 (id int primary key, name varchar(32), age int, city varchar(32), unique key uk_name_age (name, age));"
helper.DDL2Job(createTableSQL3)

insertSQL3 := "insert into t3 values (1, 'charlie', 25, 'shanghai');"
updateSQL3 := "update t3 set name = 'charles' where id = 1;"
dmlEvent3, rawKV3 := helper.DML2UpdateEvent("test", "t3", insertSQL3, updateSQL3)

isChanged, err = IsUKChanged(rawKV3, dmlEvent3.TableInfo)
require.NoError(t, err)
require.True(t, isChanged, "UK should be detected as changed when one column of composite UK is modified")

// Test case 4: Table with composite unique key - UK not changed
insertSQL4 := "insert into t3 values (2, 'david', 28, 'beijing');"
updateSQL4 := "update t3 set city = 'guangzhou' where id = 2;"
dmlEvent4, rawKV4 := helper.DML2UpdateEvent("test", "t3", insertSQL4, updateSQL4)

isChanged, err = IsUKChanged(rawKV4, dmlEvent4.TableInfo)
require.NoError(t, err)
require.False(t, isChanged, "UK should not be detected as changed when non-UK column is modified")

// Test case 5: Table with multiple unique keys - first UK changed
createTableSQL5 := "create table t5 (id int primary key, email varchar(64), phone varchar(20), name varchar(32), unique key uk_email (email), unique key uk_phone (phone));"
helper.DDL2Job(createTableSQL5)

insertSQL5 := "insert into t5 values (1, 'alice@example.com', '1234567890', 'alice');"
updateSQL5 := "update t5 set email = 'alice.new@example.com' where id = 1;"
dmlEvent5, rawKV5 := helper.DML2UpdateEvent("test", "t5", insertSQL5, updateSQL5)

isChanged, err = IsUKChanged(rawKV5, dmlEvent5.TableInfo)
require.NoError(t, err)
require.True(t, isChanged, "UK should be detected as changed when first unique key is modified")

// Test case 6: Table with multiple unique keys - second UK changed
insertSQL6 := "insert into t5 values (2, 'bob@example.com', '0987654321', 'bob');"
updateSQL6 := "update t5 set phone = '1111111111' where id = 2;"
dmlEvent6, rawKV6 := helper.DML2UpdateEvent("test", "t5", insertSQL6, updateSQL6)

isChanged, err = IsUKChanged(rawKV6, dmlEvent6.TableInfo)
require.NoError(t, err)
require.True(t, isChanged, "UK should be detected as changed when second unique key is modified")

// Test case 7: Table with multiple unique keys - none changed
insertSQL7 := "insert into t5 values (3, 'charlie@example.com', '2222222222', 'charlie');"
updateSQL7 := "update t5 set name = 'charles' where id = 3;"
dmlEvent7, rawKV7 := helper.DML2UpdateEvent("test", "t5", insertSQL7, updateSQL7)

isChanged, err = IsUKChanged(rawKV7, dmlEvent7.TableInfo)
require.NoError(t, err)
require.False(t, isChanged, "UK should not be detected as changed when non-UK column is modified")

// Test case 8: Table with no unique keys (only primary key)
createTableSQL8 := "create table t8 (id int primary key, name varchar(32), age int);"
helper.DDL2Job(createTableSQL8)

insertSQL8 := "insert into t8 values (1, 'eve', 22);"
updateSQL8 := "update t8 set name = 'eva', age = 23 where id = 1;"
dmlEvent8, rawKV8 := helper.DML2UpdateEvent("test", "t8", insertSQL8, updateSQL8)

isChanged, err = IsUKChanged(rawKV8, dmlEvent8.TableInfo)
require.NoError(t, err)
require.False(t, isChanged, "UK should not be detected as changed when table has no unique keys")

// Test case 9: Composite UK with partial change (both columns in same UK)
createTableSQL9 := "create table t9 (id int primary key, first_name varchar(32), last_name varchar(32), age int, unique key uk_full_name (first_name, last_name));"
helper.DDL2Job(createTableSQL9)

insertSQL9 := "insert into t9 values (1, 'john', 'doe', 30);"
updateSQL9 := "update t9 set first_name = 'johnny', last_name = 'smith' where id = 1;"
dmlEvent9, rawKV9 := helper.DML2UpdateEvent("test", "t9", insertSQL9, updateSQL9)

isChanged, err = IsUKChanged(rawKV9, dmlEvent9.TableInfo)
require.NoError(t, err)
require.True(t, isChanged, "UK should be detected as changed when multiple columns of the same UK are modified")

// Test case 10: UK value changed to same value (should still be detected as changed)
insertSQL10 := "insert into t1 values (10, 'test', 40);"
// Update to same value
updateSQL10 := "update t1 set age = 40 where id = 10;"
dmlEvent10, rawKV10 := helper.DML2UpdateEvent("test", "t1", insertSQL10, updateSQL10)

isChanged, err = IsUKChanged(rawKV10, dmlEvent10.TableInfo)
require.NoError(t, err)
// Note: This depends on implementation - the function might return false for same values
// But if it uses string comparison, it might return false as expected
require.False(t, isChanged, "UK should not be detected as changed when UK value is updated to the same value")

// Test case 11: UK is declared in the column list
createTableSQL11 := "create table t11 (id int primary key, name varchar(32), age int unique key);"
helper.DDL2Job(createTableSQL11)

insertSQL11 := "insert into t11 values (1, 'alice', 20);"
updateSQL11 := "update t11 set age = 33 where id = 1;"
dmlEvent11, rawKV11 := helper.DML2UpdateEvent("test", "t11", insertSQL11, updateSQL11)

isChanged, err = IsUKChanged(rawKV11, dmlEvent11.TableInfo)
require.NoError(t, err)
require.True(t, isChanged, "UK should be detected as changed when UK is changed")

// Test case 12: table with PK, UK, Key and key changed
createTableSQL12 := "create table t12 (id int primary key, name varchar(32), age int, unique key uk_name (name), key idx_age (age));"
helper.DDL2Job(createTableSQL12)

insertSQL12 := "insert into t12 values (1, 'alice', 20);"
updateSQL12 := "update t12 set age = 30 where id = 1;"
dmlEvent12, rawKV12 := helper.DML2UpdateEvent("test", "t12", insertSQL12, updateSQL12)

isChanged, err = IsUKChanged(rawKV12, dmlEvent12.TableInfo)
require.NoError(t, err)
require.False(t, isChanged, "UK should not be detected as changed when only key is modified")

insertSQL13 := "insert into t12 values (2, 'bob', 30);"
updateSQL13 := "update t12 set name = 'dongmen' where id = 2;"
dmlEvent13, rawKV13 := helper.DML2UpdateEvent("test", "t12", insertSQL13, updateSQL13)

isChanged, err = IsUKChanged(rawKV13, dmlEvent13.TableInfo)
require.NoError(t, err)
require.True(t, isChanged, "UK should be detected as changed when UK is changed")

// UK is int type and is in the column list
createTableSQL14 := "create table t14 (id int primary key, name varchar(32), age int unique key);"
helper.DDL2Job(createTableSQL14)

insertSQL14 := "insert into t14 values (1, 'alice', 20);"
updateSQL14 := "update t14 set age = '30' where id = 1;"
dmlEvent14, rawKV14 := helper.DML2UpdateEvent("test", "t14", insertSQL14, updateSQL14)

isChanged, err = IsUKChanged(rawKV14, dmlEvent14.TableInfo)
require.NoError(t, err)
require.True(t, isChanged, "UK should be detected as changed when UK is changed")
}
Loading
0