From 9a79c07119d8c3bc129463eaabd58b2a32ac572c Mon Sep 17 00:00:00 2001 From: Riya Tyagi Date: Fri, 20 Jun 2025 11:34:46 +0530 Subject: [PATCH 1/2] Added code for validating subclustered placement --- .../placement/algo/subclustered_test.go | 1 + src/cluster/placement/placement.go | 73 ++++ src/cluster/placement/placement_test.go | 389 ++++++++++++++++++ 3 files changed, 463 insertions(+) diff --git a/src/cluster/placement/algo/subclustered_test.go b/src/cluster/placement/algo/subclustered_test.go index 0208f87948..af16e324ee 100644 --- a/src/cluster/placement/algo/subclustered_test.go +++ b/src/cluster/placement/algo/subclustered_test.go @@ -264,6 +264,7 @@ func TestSubclusteredAlgorithm_InitialPlacement(t *testing.T) { assert.Equal(t, tt.replicaFactor, result.ReplicaFactor()) assert.True(t, result.IsSharded()) assert.True(t, result.HasSubClusters()) + assert.NoError(t, placement.Validate(result)) assert.Equal(t, tt.instancesPerSubcluster, result.InstancesPerSubCluster()) } }) diff --git a/src/cluster/placement/placement.go b/src/cluster/placement/placement.go index 0a151565eb..1d273a26dc 100644 --- a/src/cluster/placement/placement.go +++ b/src/cluster/placement/placement.go @@ -34,6 +34,8 @@ import ( const ( // uninitializedShardSetID represents uninitialized shard set id. uninitializedShardSetID = 0 + // uninitializedSubClusterID represents uninitialized subcluster id. 
+ uninitializedSubClusterID = uint32(0) ) var ( @@ -42,6 +44,7 @@ var ( errDuplicatedShards = errors.New("invalid placement, there are duplicated shards in one replica") errUnexpectedShards = errors.New("invalid placement, there are unexpected shard ids on instance") errMirrorNotSharded = errors.New("invalid placement, mirrored placement must be sharded") + errSubclusteredNotSharded = errors.New("invalid placement, subclustered placement must be sharded") ) type placement struct { @@ -286,6 +289,10 @@ func validate(p Placement) error { return errMirrorNotSharded } + if p.HasSubClusters() && !p.IsSharded() { + return errSubclusteredNotSharded + } + shardCountMap := convertShardSliceToMap(p.Shards()) if len(shardCountMap) != len(p.Shards()) { return errDuplicatedShards @@ -309,6 +316,13 @@ func validate(p Placement) error { if instance.Shards().NumShards() != 0 && !p.IsSharded() { return fmt.Errorf("instance %s contains shards in a non-sharded placement", instance.String()) } + if instance.SubClusterID() == uninitializedSubClusterID && p.HasSubClusters() { + return fmt.Errorf("instance %s has uninitialized subcluster id in a subclustered placement", instance.String()) + } + if instance.SubClusterID() != uninitializedSubClusterID && !p.HasSubClusters() { + return fmt.Errorf("instance %s has subcluster id %d in a non-subclustered placement", + instance.String(), instance.SubClusterID()) + } shardSetID := instance.ShardSetID() if shardSetID > maxShardSetID { return fmt.Errorf("instance %s shard set id %d is larger than max shard set id %d in the placement", instance.String(), shardSetID, maxShardSetID) @@ -422,6 +436,65 @@ func validate(p Placement) error { return fmt.Errorf("invalid shard count for shard %d: expected %d, actual %d", shard, p.ReplicaFactor(), c) } } + + if p.HasSubClusters() { + return validateSubclusteredPlacement(p) + } + return nil +} + +func validateSubclusteredPlacement(p Placement) error { + shardToInstanceMap := 
make(map[uint32]map[Instance]struct{}) + subClusterToInstanceMap := make(map[uint32]map[Instance]struct{}) + shardToIsolationGroupMap := make(map[uint32]map[string]struct{}) + instancesPerSubCluster := p.InstancesPerSubCluster() + + for _, instance := range p.Instances() { + if instance.IsLeaving() { + continue + } + if _, exist := subClusterToInstanceMap[instance.SubClusterID()]; !exist { + subClusterToInstanceMap[instance.SubClusterID()] = make(map[Instance]struct{}) + } + subClusterToInstanceMap[instance.SubClusterID()][instance] = struct{}{} + + for _, s := range instance.Shards().All() { + if s.State() == shard.Leaving { + continue + } + if _, exist := shardToIsolationGroupMap[s.ID()]; !exist { + shardToIsolationGroupMap[s.ID()] = make(map[string]struct{}) + } + shardToIsolationGroupMap[s.ID()][instance.IsolationGroup()] = struct{}{} + if _, exist := shardToInstanceMap[s.ID()]; !exist { + shardToInstanceMap[s.ID()] = make(map[Instance]struct{}) + } + shardToInstanceMap[s.ID()][instance] = struct{}{} + } + } + + for shard, instances := range shardToInstanceMap { + firstReplica := true + shardSubclusterID := uninitializedSubClusterID + for instance := range instances { + if firstReplica { + shardSubclusterID = instance.SubClusterID() + firstReplica = false + continue + } + currSubclusterID := instance.SubClusterID() + if currSubclusterID != shardSubclusterID && + len(subClusterToInstanceMap[shardSubclusterID]) == instancesPerSubCluster && + len(subClusterToInstanceMap[currSubclusterID]) == instancesPerSubCluster { + return fmt.Errorf("invalid shard %d, expected subcluster id %d, actual %d", + shard, shardSubclusterID, currSubclusterID) + } + } + if len(shardToIsolationGroupMap[shard]) != p.ReplicaFactor() { + return fmt.Errorf("invalid shard %d, expected %d isolation groups, actual %d", + shard, p.ReplicaFactor(), len(shardToIsolationGroupMap[shard])) + } + } return nil } diff --git a/src/cluster/placement/placement_test.go 
b/src/cluster/placement/placement_test.go index 3dfcdc7be6..00db1d885e 100644 --- a/src/cluster/placement/placement_test.go +++ b/src/cluster/placement/placement_test.go @@ -160,6 +160,20 @@ func TestValidateMirrorButNotSharded(t *testing.T) { assert.Equal(t, errMirrorNotSharded.Error(), err.Error()) } +func TestValidateSubclusteredButNotSharded(t *testing.T) { + p := NewPlacement().SetHasSubClusters(true) + err := Validate(p) + require.Error(t, err) + assert.Equal(t, errSubclusteredNotSharded.Error(), err.Error()) +} + +func TestValidateInstanceWithSubclusterIDInNonSubclusteredPlacement(t *testing.T) { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint", 1).SetSubClusterID(1) + p := NewPlacement().SetInstances([]Instance{i1}).SetShards([]uint32{1}).SetReplicaFactor(1) + err := Validate(p) + require.Error(t, err) +} + func TestValidateMissingShard(t *testing.T) { i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint", 1) i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) @@ -766,3 +780,378 @@ func getProtoShards(ids []uint32) []*placementpb.Shard { } return r } + +func TestValidateSubclusteredPlacement(t *testing.T) { + tests := []struct { + name string + instancesPerSubcluster int + replicaFactor int + instances []Instance + shards []uint32 + expectError bool + }{ + { + name: "valid subclustered placement - single subcluster", + instancesPerSubcluster: 6, + replicaFactor: 2, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i1.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + i1.Shards().Add(shard.NewShard(3).SetState(shard.Available)) + + i2 := NewEmptyInstance("i2", "r1", "z1", "endpoint2", 1).SetSubClusterID(1) + i2.Shards().Add(shard.NewShard(4).SetState(shard.Available)) + i2.Shards().Add(shard.NewShard(5).SetState(shard.Available)) + i2.Shards().Add(shard.NewShard(6).SetState(shard.Available)) + + i3 := 
NewEmptyInstance("i3", "r2", "z1", "endpoint3", 1).SetSubClusterID(1) + i3.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i3.Shards().Add(shard.NewShard(3).SetState(shard.Available)) + i3.Shards().Add(shard.NewShard(5).SetState(shard.Available)) + + i4 := NewEmptyInstance("i4", "r2", "z1", "endpoint4", 1).SetSubClusterID(1) + i4.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + i4.Shards().Add(shard.NewShard(4).SetState(shard.Available)) + i4.Shards().Add(shard.NewShard(6).SetState(shard.Available)) + + return []Instance{i1, i2, i3, i4} + }(), + shards: []uint32{1, 2, 3, 4, 5, 6}, + expectError: false, + }, + { + name: "valid subclustered placement - multiple subclusters", + instancesPerSubcluster: 3, + replicaFactor: 1, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i1.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + + i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1) + i2.Shards().Add(shard.NewShard(3).SetState(shard.Available)) + i2.Shards().Add(shard.NewShard(4).SetState(shard.Available)) + + i3 := NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1) + i3.Shards().Add(shard.NewShard(5).SetState(shard.Available)) + i3.Shards().Add(shard.NewShard(6).SetState(shard.Available)) + + i4 := NewEmptyInstance("i4", "r1", "z1", "endpoint4", 1).SetSubClusterID(2) + i4.Shards().Add(shard.NewShard(7).SetState(shard.Available)) + i4.Shards().Add(shard.NewShard(8).SetState(shard.Available)) + + i5 := NewEmptyInstance("i5", "r2", "z1", "endpoint5", 1).SetSubClusterID(2) + i5.Shards().Add(shard.NewShard(9).SetState(shard.Available)) + i5.Shards().Add(shard.NewShard(10).SetState(shard.Available)) + + i6 := NewEmptyInstance("i6", "r3", "z1", "endpoint6", 1).SetSubClusterID(2) + i6.Shards().Add(shard.NewShard(11).SetState(shard.Available)) + 
i6.Shards().Add(shard.NewShard(12).SetState(shard.Available)) + + return []Instance{i1, i2, i3, i4, i5, i6} + }(), + shards: []uint32{1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}, + expectError: false, + }, + // nolint: dupl + { + name: "shard with wrong isolation group count", + instancesPerSubcluster: 6, + replicaFactor: 2, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i1.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + + i2 := NewEmptyInstance("i2", "r1", "z1", "endpoint2", 1).SetSubClusterID(1) + i2.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i2.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + + return []Instance{i1, i2} + }(), + shards: []uint32{1, 2}, + expectError: true, + }, + { + name: "instance with uninitialized subcluster ID", + instancesPerSubcluster: 3, + replicaFactor: 1, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + + i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1) + i2.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + + // This instance has uninitialized subcluster ID + i3 := NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1) + i3.Shards().Add(shard.NewShard(3).SetState(shard.Available)) + + return []Instance{i1, i2, i3} + }(), + shards: []uint32{1, 2, 3}, + expectError: true, + }, + { + name: "valid subclustered placement with leaving instances", + instancesPerSubcluster: 4, + replicaFactor: 2, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i1.Shards().Add(shard.NewShard(3).SetState(shard.Available)) + + i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1) + 
i2.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i2.Shards().Add(shard.NewShard(4).SetState(shard.Available)) + + // Leaving instance - should be ignored in subcluster validation + i3 := NewEmptyInstance("i3", "r2", "z1", "endpoint3", 1).SetSubClusterID(1) + i3.Shards().Add(shard.NewShard(2).SetState(shard.Leaving)) + i3.Shards().Add(shard.NewShard(3).SetState(shard.Leaving)) + + i4 := NewEmptyInstance("i4", "r1", "z1", "endpoint4", 1).SetSubClusterID(1) + i4.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + i4.Shards().Add(shard.NewShard(4).SetState(shard.Available)) + + i5 := NewEmptyInstance("i5", "r2", "z1", "endpoint5", 1).SetSubClusterID(1) + i5.Shards().Add(shard.NewShard(2).SetState(shard.Initializing).SetSourceID("i3")) + i5.Shards().Add(shard.NewShard(3).SetState(shard.Initializing).SetSourceID("i3")) + + return []Instance{i1, i2, i3, i4, i5} + }(), + shards: []uint32{1, 2, 3, 4}, + expectError: false, + }, + { + name: "shard with leaving state ignored in validation", + instancesPerSubcluster: 3, + replicaFactor: 1, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i1.Shards().Add(shard.NewShard(4).SetState(shard.Available)) + + i2 := NewEmptyInstance("i2", "r1", "z1", "endpoint2", 1).SetSubClusterID(1) + i2.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + i1.Shards().Add(shard.NewShard(5).SetState(shard.Available)) + + // This shard is leaving, so it should be ignored in isolation group validation + i3 := NewEmptyInstance("i3", "r1", "z1", "endpoint3", 1).SetSubClusterID(1) + i3.Shards().Add(shard.NewShard(3).SetState(shard.Leaving)) + i3.Shards().Add(shard.NewShard(6).SetState(shard.Available)) + + i4 := NewEmptyInstance("i4", "r1", "z1", "endpoint4", 1).SetSubClusterID(1) + i4.Shards().Add(shard.NewShard(3).SetState(shard.Initializing).SetSourceID("i3")) + + return []Instance{i1, i2, i3, i4} + 
}(), + shards: []uint32{1, 2, 3, 4, 5, 6}, + expectError: false, + }, + { + name: "shards are shared among multiple completeßsubclusters", + instancesPerSubcluster: 3, + replicaFactor: 3, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i1.Shards().Add(shard.NewShard(4).SetState(shard.Available)) + i1.Shards().Add(shard.NewShard(3).SetState(shard.Available)) + + i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1) + i2.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i2.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + i2.Shards().Add(shard.NewShard(3).SetState(shard.Available)) + + i3 := NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1) + i3.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i3.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + i3.Shards().Add(shard.NewShard(3).SetState(shard.Available)) + + i4 := NewEmptyInstance("i4", "r1", "z1", "endpoint4", 1).SetSubClusterID(2) + i4.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + i4.Shards().Add(shard.NewShard(5).SetState(shard.Available)) + i4.Shards().Add(shard.NewShard(6).SetState(shard.Available)) + + i5 := NewEmptyInstance("i5", "r2", "z1", "endpoint5", 1).SetSubClusterID(2) + i5.Shards().Add(shard.NewShard(4).SetState(shard.Available)) + i5.Shards().Add(shard.NewShard(5).SetState(shard.Available)) + i5.Shards().Add(shard.NewShard(6).SetState(shard.Available)) + + i6 := NewEmptyInstance("i6", "r3", "z1", "endpoint6", 1).SetSubClusterID(2) + i6.Shards().Add(shard.NewShard(4).SetState(shard.Available)) + i6.Shards().Add(shard.NewShard(5).SetState(shard.Available)) + i6.Shards().Add(shard.NewShard(6).SetState(shard.Available)) + + return []Instance{i1, i2, i3, i4, i5, i6} + }(), + shards: []uint32{1, 2, 3, 4, 5, 6}, + expectError: true, + }, + { + name: "shards in transitionary state while 
moving to another subcluster", + instancesPerSubcluster: 3, + replicaFactor: 3, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i1.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + i1.Shards().Add(shard.NewShard(3).SetState(shard.Leaving)) + + i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1) + i2.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i2.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + i2.Shards().Add(shard.NewShard(3).SetState(shard.Available)) + + i3 := NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1) + i3.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i3.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + i3.Shards().Add(shard.NewShard(3).SetState(shard.Available)) + + i4 := NewEmptyInstance("i4", "r1", "z1", "endpoint4", 1).SetSubClusterID(2) + i4.Shards().Add(shard.NewShard(3).SetState(shard.Initializing).SetSourceID("i1")) + + return []Instance{i1, i2, i3, i4} + }(), + shards: []uint32{1, 2, 3}, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := NewPlacement(). + SetInstances(tt.instances). + SetShards(tt.shards). + SetReplicaFactor(tt.replicaFactor). + SetIsSharded(true). + SetHasSubClusters(true). 
+ SetInstancesPerSubCluster(tt.instancesPerSubcluster) + + err := Validate(p) + + if tt.expectError { + require.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestValidateSubclusteredPlacementEdgeCases(t *testing.T) { + tests := []struct { + name string + instancesPerSubcluster int + replicaFactor int + instances []Instance + shards []uint32 + expectError bool + errorMessage string + }{ + { + name: "empty placement", + instancesPerSubcluster: 3, + replicaFactor: 1, + instances: []Instance{}, + shards: []uint32{}, + expectError: false, + }, + { + name: "single instance placement", + instancesPerSubcluster: 3, + replicaFactor: 1, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + return []Instance{i1} + }(), + shards: []uint32{1}, + expectError: false, + }, + // nolint: dupl + { + name: "incomplete subcluster - should not fail validation", + instancesPerSubcluster: 4, + replicaFactor: 2, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i1.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + + i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1) + i2.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i2.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + + // Only 2 instances in subcluster, but instancesPerSubcluster is 4 + // This should still be valid as the subcluster is not full + return []Instance{i1, i2} + }(), + shards: []uint32{1, 2}, + expectError: false, + }, + { + name: "multiple isolation groups per shard", + instancesPerSubcluster: 6, + replicaFactor: 3, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + + i2 := 
NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1) + i2.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + + i3 := NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1) + i3.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + + return []Instance{i1, i2, i3} + }(), + shards: []uint32{1}, + expectError: false, + }, + { + name: "shard with insufficient isolation groups", + instancesPerSubcluster: 6, + replicaFactor: 3, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + + i2 := NewEmptyInstance("i2", "r1", "z1", "endpoint2", 1).SetSubClusterID(1) + i2.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + + i3 := NewEmptyInstance("i3", "r2", "z1", "endpoint3", 1).SetSubClusterID(1) + i3.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + + // The 3 replicas span only 2 isolation groups (r1, r2), but replica factor is 3 + return []Instance{i1, i2, i3} + }(), + shards: []uint32{1}, + expectError: true, + errorMessage: "invalid shard 1, expected 3 isolation groups, actual 2", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := NewPlacement(). + SetInstances(tt.instances). + SetShards(tt.shards). + SetReplicaFactor(tt.replicaFactor). + SetIsSharded(true). + SetHasSubClusters(true).
+ SetInstancesPerSubCluster(tt.instancesPerSubcluster) + + err := Validate(p) + + if tt.expectError { + require.Error(t, err) + assert.Equal(t, tt.errorMessage, err.Error()) + } else { + assert.NoError(t, err) + } + }) + } +} From ba4649c21cf27cc55db836a7195c3a5b7daf9f18 Mon Sep 17 00:00:00 2001 From: Riya Tyagi Date: Sun, 29 Jun 2025 19:36:52 +0530 Subject: [PATCH 2/2] Validate that each shard spans at most two subclusters --- src/cluster/placement/placement.go | 46 ++++++++++++++++--------- src/cluster/placement/placement_test.go | 38 +++++++++++++++++++- 2 files changed, 66 insertions(+), 18 deletions(-) diff --git a/src/cluster/placement/placement.go b/src/cluster/placement/placement.go index 1d273a26dc..56f4b909d9 100644 --- a/src/cluster/placement/placement.go +++ b/src/cluster/placement/placement.go @@ -444,7 +444,7 @@ func validate(p Placement) error { } func validateSubclusteredPlacement(p Placement) error { - shardToInstanceMap := make(map[uint32]map[Instance]struct{}) + shardToSubclusterMap := make(map[uint32]map[uint32]struct{}) subClusterToInstanceMap := make(map[uint32]map[Instance]struct{}) shardToIsolationGroupMap := make(map[uint32]map[string]struct{}) instancesPerSubCluster := p.InstancesPerSubCluster() @@ -466,30 +466,42 @@ func validateSubclusteredPlacement(p Placement) error { shardToIsolationGroupMap[s.ID()] = make(map[string]struct{}) } shardToIsolationGroupMap[s.ID()][instance.IsolationGroup()] = struct{}{} - if _, exist := shardToInstanceMap[s.ID()]; !exist { - shardToInstanceMap[s.ID()] = make(map[Instance]struct{}) + if _, exist := shardToSubclusterMap[s.ID()]; !exist { + shardToSubclusterMap[s.ID()] = make(map[uint32]struct{}) } - shardToInstanceMap[s.ID()][instance] = struct{}{} + shardToSubclusterMap[s.ID()][instance.SubClusterID()] = struct{}{} } } - for shard, instances := range shardToInstanceMap { + for shard, subclusters := range shardToSubclusterMap { firstReplica := true shardSubclusterID := uninitializedSubClusterID - for instance := range instances { - if
firstReplica { - shardSubclusterID = instance.SubClusterID() - firstReplica = false - continue - } - currSubclusterID := instance.SubClusterID() - if currSubclusterID != shardSubclusterID && - len(subClusterToInstanceMap[shardSubclusterID]) == instancesPerSubCluster && - len(subClusterToInstanceMap[currSubclusterID]) == instancesPerSubCluster { - return fmt.Errorf("invalid shard %d, expected subcluster id %d, actual %d", - shard, shardSubclusterID, currSubclusterID) + + // A shard may be shared by at most two subclusters, and only while it is being moved: + // the subcluster giving up the shard and the subcluster receiving it. + if len(subclusters) > 2 { + return fmt.Errorf("invalid shard %d, expected at most 2 subclusters (only during shard movement), "+ + "actual %d", shard, len(subclusters)) + } + if len(subclusters) == 2 { + // The shard is in transit between two subclusters. This is rejected only when both + // subclusters are complete, i.e. each has exactly InstancesPerSubCluster non-leaving + // instances; a move into or out of an incomplete subcluster is allowed.
+ for subcluster := range subclusters { + if firstReplica { + shardSubclusterID = subcluster + firstReplica = false + continue + } + currSubclusterID := subcluster + if len(subClusterToInstanceMap[shardSubclusterID]) == instancesPerSubCluster && + len(subClusterToInstanceMap[currSubclusterID]) == instancesPerSubCluster { + return fmt.Errorf("invalid shard %d, expected subcluster id %d, actual %d", + shard, shardSubclusterID, currSubclusterID) + } + } + } + if len(shardToIsolationGroupMap[shard]) != p.ReplicaFactor() { return fmt.Errorf("invalid shard %d, expected %d isolation groups, actual %d", shard, p.ReplicaFactor(), len(shardToIsolationGroupMap[shard])) diff --git a/src/cluster/placement/placement_test.go b/src/cluster/placement/placement_test.go index 00db1d885e..078e330deb 100644 --- a/src/cluster/placement/placement_test.go +++ b/src/cluster/placement/placement_test.go @@ -951,7 +951,7 @@ func TestValidateSubclusteredPlacement(t *testing.T) { expectError: false, }, { - name: "shards are shared among multiple completeßsubclusters", + name: "shards are shared among multiple complete subclusters", instancesPerSubcluster: 3, replicaFactor: 3, instances: func() []Instance { @@ -1018,6 +1018,42 @@ func TestValidateSubclusteredPlacement(t *testing.T) { shards: []uint32{1, 2, 3}, expectError: false, }, + { + name: "shards in transitionary state - belongs to > 2 subclusters", + instancesPerSubcluster: 3, + replicaFactor: 3, + instances: func() []Instance { + i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1) + i1.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + + i2 := NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1) + i2.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + + i3 := NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1) + i3.Shards().Add(shard.NewShard(1).SetState(shard.Available)) + i3.Shards().Add(shard.NewShard(3).SetState(shard.Available)) + + i4 := NewEmptyInstance("i4",
"r1", "z1", "endpoint4", 1).SetSubClusterID(2) + i4.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + + i5 := NewEmptyInstance("i5", "r2", "z1", "endpoint5", 1).SetSubClusterID(2) + i5.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + i5.Shards().Add(shard.NewShard(3).SetState(shard.Initializing).SetSourceID("i8")) + + i6 := NewEmptyInstance("i6", "r3", "z1", "endpoint6", 1).SetSubClusterID(2) + i6.Shards().Add(shard.NewShard(2).SetState(shard.Available)) + + i7 := NewEmptyInstance("i7", "r1", "z1", "endpoint7", 1).SetSubClusterID(3) + i7.Shards().Add(shard.NewShard(3).SetState(shard.Available)) + + i8 := NewEmptyInstance("i8", "r2", "z1", "endpoint8", 1).SetSubClusterID(3) + i8.Shards().Add(shard.NewShard(3).SetState(shard.Leaving)) + + return []Instance{i1, i2, i3, i4, i5, i6, i7, i8} + }(), + shards: []uint32{1, 2, 3}, + expectError: true, + }, } for _, tt := range tests {