storage: don't crash when applying side-effects of old ChangeReplicas trigger by nvanbenschoten · Pull Request #41171 · cockroachdb/cockroach · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

storage: don't crash when applying side-effects of old ChangeReplicas trigger #41171

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions pkg/roachpb/data.go
Original file line number Diff line number Diff line change
Expand Up @@ -1343,6 +1343,14 @@ func (crt ChangeReplicasTrigger) Replicas() []ReplicaDescriptor {
return crt.DeprecatedUpdatedReplicas
}

// NextReplicaID reports the replica ID that should be handed out next once
// this trigger has been applied. Triggers that carry an updated range
// descriptor (Desc) take the value from that descriptor; triggers without
// one fall back to the DeprecatedNextReplicaID field.
func (crt ChangeReplicasTrigger) NextReplicaID() ReplicaID {
	// No descriptor attached: the next replica ID is carried directly on the
	// trigger in the deprecated field.
	if crt.Desc == nil {
		return crt.DeprecatedNextReplicaID
	}
	return crt.Desc.NextReplicaID
}

// ConfChange returns the configuration change described by the trigger.
func (crt ChangeReplicasTrigger) ConfChange(encodedCtx []byte) (raftpb.ConfChangeI, error) {
return confChangeImpl(crt, encodedCtx)
Expand Down
2 changes: 1 addition & 1 deletion pkg/storage/replica_application_result.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,7 +320,7 @@ func (r *Replica) handleChangeReplicasResult(
log.Fatalf(ctx, "failed to run Replica postDestroy: %v", err)
}

if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.Desc.NextReplicaID, RemoveOptions{
if err := r.store.removeInitializedReplicaRaftMuLocked(ctx, r, chng.NextReplicaID(), RemoveOptions{
// We destroyed the data when the batch committed so don't destroy it again.
DestroyData: false,
// In order to detect the GC queue racing with other causes of replica removal
Expand Down
12 changes: 1 addition & 11 deletions pkg/storage/replica_application_state_machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -668,16 +668,6 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat
b.r.mu.Unlock()
b.changeRemovesReplica = true

// In 19.1 and before we used DeprecatedNextReplicaID to carry the next
// replica id to use once this change is applied. In 19.2 we started
// providing a new range descriptor directly, which includes this info.
var nextReplID roachpb.ReplicaID
if change.Desc != nil {
nextReplID = change.Desc.NextReplicaID
} else {
nextReplID = change.DeprecatedNextReplicaID
}

// Delete all of the local data. We're going to delete the hard state too.
// In order for this to be safe we need code above this to promise that we're
// never going to write hard state in response to a message for a later
Expand All @@ -686,7 +676,7 @@ func (b *replicaAppBatch) runPreApplyTriggers(ctx context.Context, cmd *replicat
ctx,
b.batch,
b.batch,
nextReplID,
change.NextReplicaID(),
false, /* clearRangeIDLocalOnly */
false, /* mustUseClearRange */
); err != nil {
Expand Down
217 changes: 117 additions & 100 deletions pkg/storage/replica_application_state_machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"testing"

"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/storage/apply"
"github.com/cockroachdb/cockroach/pkg/storage/storagepb"
"github.com/cockroachdb/cockroach/pkg/testutils"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
Expand All @@ -23,68 +24,81 @@ import (
"go.etcd.io/etcd/raft/raftpb"
)

// TestReplicaStateMachineStageChangeReplicas tests the behavior of staging a
// replicated command with a ChangeReplicas trigger in a replicaAppBatch. The
// test exercises the logic of applying both old-style and new-style
// TestReplicaStateMachineChangeReplicas tests the behavior of applying a
// replicated command with a ChangeReplicas trigger in a replicaAppBatch.
// The test exercises the logic of applying both old-style and new-style
// ChangeReplicas triggers.
func TestReplicaStateMachineStageChangeReplicas(t *testing.T) {
func TestReplicaStateMachineChangeReplicas(t *testing.T) {
defer leaktest.AfterTest(t)()
tc := testContext{}
stopper := stop.NewStopper()
defer stopper.Stop(context.TODO())
tc.Start(t, stopper)

// Lock the replica for the entire test.
r := tc.repl
r.raftMu.Lock()
defer r.raftMu.Unlock()
sm := r.getStateMachine()

desc := r.Desc()
replDesc, ok := desc.GetReplicaDescriptor(r.store.StoreID())
require.True(t, ok)

makeCmd := func(desc *roachpb.RangeDescriptor, t roachpb.ChangeReplicasTrigger) *replicatedCmd {
return &replicatedCmd{
ctx: context.Background(),
ent: &raftpb.Entry{Index: r.mu.state.RaftAppliedIndex + 1},
decodedRaftEntry: decodedRaftEntry{
idKey: makeIDKey(),
raftCmd: storagepb.RaftCommand{
ProposerLeaseSequence: r.mu.state.Lease.Sequence,
MaxLeaseIndex: r.mu.state.LeaseAppliedIndex + 1,
ReplicatedEvalResult: storagepb.ReplicatedEvalResult{
State: &storagepb.ReplicaState{Desc: desc},
ChangeReplicas: &storagepb.ChangeReplicas{ChangeReplicasTrigger: t},
Timestamp: r.mu.state.GCThreshold.Add(1, 0),
},
},
},
}
}

t.Run("add replica", func(t *testing.T) {
// Add a new replica to the Range.
newDesc := *desc
newDesc.InternalReplicas = append([]roachpb.ReplicaDescriptor(nil), desc.InternalReplicas...)
addedReplDesc := newDesc.AddReplica(replDesc.NodeID+1, replDesc.StoreID+1, roachpb.VOTER_FULL)

testutils.RunTrueAndFalse(t, "add replica", func(t *testing.T, add bool) {
testutils.RunTrueAndFalse(t, "deprecated", func(t *testing.T, deprecated bool) {
tc := testContext{}
ctx := context.Background()
stopper := stop.NewStopper()
defer stopper.Stop(ctx)
tc.Start(t, stopper)

// Lock the replica for the entire test.
r := tc.repl
r.raftMu.Lock()
defer r.raftMu.Unlock()
sm := r.getStateMachine()

desc := r.Desc()
replDesc, ok := desc.GetReplicaDescriptor(r.store.StoreID())
require.True(t, ok)

newDesc := *desc
newDesc.InternalReplicas = append([]roachpb.ReplicaDescriptor(nil), desc.InternalReplicas...)
var trigger roachpb.ChangeReplicasTrigger
if deprecated {
trigger = roachpb.ChangeReplicasTrigger{
DeprecatedChangeType: roachpb.ADD_REPLICA,
DeprecatedReplica: addedReplDesc,
DeprecatedUpdatedReplicas: []roachpb.ReplicaDescriptor{
replDesc,
addedReplDesc,
},
DeprecatedNextReplicaID: addedReplDesc.ReplicaID + 1,
var confChange raftpb.ConfChange
if add {
// Add a new replica to the Range.
addedReplDesc := newDesc.AddReplica(replDesc.NodeID+1, replDesc.StoreID+1, roachpb.VOTER_FULL)

if deprecated {
trigger = roachpb.ChangeReplicasTrigger{
DeprecatedChangeType: roachpb.ADD_REPLICA,
DeprecatedReplica: addedReplDesc,
DeprecatedUpdatedReplicas: []roachpb.ReplicaDescriptor{
replDesc,
addedReplDesc,
},
DeprecatedNextReplicaID: addedReplDesc.ReplicaID + 1,
}
} else {
trigger = roachpb.ChangeReplicasTrigger{
Desc: &newDesc,
InternalAddedReplicas: []roachpb.ReplicaDescriptor{addedReplDesc},
}
}

confChange = raftpb.ConfChange{
Type: raftpb.ConfChangeAddNode,
NodeID: uint64(addedReplDesc.NodeID),
}
} else {
trigger = roachpb.ChangeReplicasTrigger{
Desc: &newDesc,
InternalAddedReplicas: []roachpb.ReplicaDescriptor{addedReplDesc},
// Remove ourselves from the Range.
removedReplDesc, ok := newDesc.RemoveReplica(replDesc.NodeID, replDesc.StoreID)
require.True(t, ok)

if deprecated {
trigger = roachpb.ChangeReplicasTrigger{
DeprecatedChangeType: roachpb.REMOVE_REPLICA,
DeprecatedReplica: removedReplDesc,
DeprecatedUpdatedReplicas: []roachpb.ReplicaDescriptor{},
DeprecatedNextReplicaID: replDesc.ReplicaID + 1,
}
} else {
trigger = roachpb.ChangeReplicasTrigger{
Desc: &newDesc,
InternalRemovedReplicas: []roachpb.ReplicaDescriptor{removedReplDesc},
}
}

confChange = raftpb.ConfChange{
Type: raftpb.ConfChangeRemoveNode,
NodeID: uint64(removedReplDesc.NodeID),
}
}

Expand All @@ -93,60 +107,63 @@ func TestReplicaStateMachineStageChangeReplicas(t *testing.T) {
defer b.Close()

// Stage a command with the ChangeReplicas trigger.
cmd := makeCmd(&newDesc, trigger)
_, err := b.Stage(cmd)
cmd := &replicatedCmd{
ctx: ctx,
ent: &raftpb.Entry{
Index: r.mu.state.RaftAppliedIndex + 1,
Type: raftpb.EntryConfChange,
},
decodedRaftEntry: decodedRaftEntry{
idKey: makeIDKey(),
raftCmd: storagepb.RaftCommand{
ProposerLeaseSequence: r.mu.state.Lease.Sequence,
MaxLeaseIndex: r.mu.state.LeaseAppliedIndex + 1,
ReplicatedEvalResult: storagepb.ReplicatedEvalResult{
State: &storagepb.ReplicaState{Desc: &newDesc},
ChangeReplicas: &storagepb.ChangeReplicas{ChangeReplicasTrigger: trigger},
Timestamp: r.mu.state.GCThreshold.Add(1, 0),
},
},
confChange: &decodedConfChange{
ConfChangeI: confChange,
},
},
}

checkedCmd, err := b.Stage(cmd)
require.NoError(t, err)
require.False(t, b.changeRemovesReplica)
require.Equal(t, !add, b.changeRemovesReplica)
require.Equal(t, b.state.RaftAppliedIndex, cmd.ent.Index)
require.Equal(t, b.state.LeaseAppliedIndex, cmd.raftCmd.MaxLeaseIndex)

// Check the replica's destroy status.
r.mu.Lock()
require.False(t, r.mu.destroyStatus.Removed())
r.mu.Unlock()
})
})

t.Run("remove replica", func(t *testing.T) {
// Remove ourselves from the Range.
newDesc := *desc
newDesc.InternalReplicas = append([]roachpb.ReplicaDescriptor(nil), desc.InternalReplicas...)
removedReplDesc, ok := newDesc.RemoveReplica(replDesc.NodeID, replDesc.StoreID)
require.True(t, ok)

testutils.RunTrueAndFalse(t, "deprecated", func(t *testing.T, deprecated bool) {
var trigger roachpb.ChangeReplicasTrigger
if deprecated {
trigger = roachpb.ChangeReplicasTrigger{
DeprecatedChangeType: roachpb.REMOVE_REPLICA,
DeprecatedReplica: removedReplDesc,
DeprecatedUpdatedReplicas: []roachpb.ReplicaDescriptor{},
DeprecatedNextReplicaID: replDesc.ReplicaID + 1,
}
reason, _ := r.IsDestroyed()
if add {
require.Equal(t, destroyReasonAlive, reason)
} else {
trigger = roachpb.ChangeReplicasTrigger{
Desc: &newDesc,
InternalRemovedReplicas: []roachpb.ReplicaDescriptor{removedReplDesc},
}
require.Equal(t, destroyReasonRemoved, reason)
}

// Create a new application batch.
b := sm.NewBatch(false /* ephemeral */).(*replicaAppBatch)
defer b.Close()

// Stage a command with the ChangeReplicas trigger.
cmd := makeCmd(&newDesc, trigger)
_, err := b.Stage(cmd)
// Apply the batch to the StateMachine.
err = b.ApplyToStateMachine(ctx)
require.NoError(t, err)
require.True(t, b.changeRemovesReplica)
require.Equal(t, b.state.RaftAppliedIndex, cmd.ent.Index)
require.Equal(t, b.state.LeaseAppliedIndex, cmd.raftCmd.MaxLeaseIndex)

// Check the replica's destroy status.
r.mu.Lock()
require.True(t, r.mu.destroyStatus.Removed())
r.mu.destroyStatus.Set(nil, destroyReasonAlive) // reset
r.mu.Unlock()
// Apply the side effects of the command to the StateMachine.
_, err = sm.ApplySideEffects(checkedCmd)
if add {
require.NoError(t, err)
} else {
require.Equal(t, apply.ErrRemoved, err)
}

// Check whether the Replica still exists in the Store.
_, err = tc.store.GetReplica(r.RangeID)
if add {
require.NoError(t, err)
} else {
require.Error(t, err)
require.IsType(t, &roachpb.RangeNotFoundError{}, err)
}
})
})
}
0