From 6c67302c8244393e4c346d12ba984f0ccdde8623 Mon Sep 17 00:00:00 2001 From: Riya Tyagi Date: Thu, 19 Jun 2025 16:26:58 +0530 Subject: [PATCH 1/2] Added code for assigning subcluster Ids and adding validations in initial placement --- src/cluster/placement/algo/subclustered.go | 56 ++- .../placement/algo/subclustered_helper.go | 35 +- .../placement/algo/subclustered_test.go | 472 ++++++++++++++++++ 3 files changed, 553 insertions(+), 10 deletions(-) create mode 100644 src/cluster/placement/algo/subclustered_test.go diff --git a/src/cluster/placement/algo/subclustered.go b/src/cluster/placement/algo/subclustered.go index 93d9cdd10c..feafdd2d12 100644 --- a/src/cluster/placement/algo/subclustered.go +++ b/src/cluster/placement/algo/subclustered.go @@ -20,6 +20,10 @@ func newSubclusteredAlgorithm(opts placement.Options) placement.Algorithm { } func (a subclusteredPlacementAlgorithm) IsCompatibleWith(p placement.Placement) error { + if p == nil { + return fmt.Errorf("placement is nil") + } + if !p.IsSharded() { return errIncompatibleWithSubclusteredAlgo } @@ -35,8 +39,19 @@ func (a subclusteredPlacementAlgorithm) InitialPlacement( shards []uint32, rf int, ) (placement.Placement, error) { - // TODO: Implement subclustered initial placement logic - return nil, fmt.Errorf("subclustered initial placement not yet implemented") + if err := a.assignSubClusterIDs(instances, nil); err != nil { + return nil, err + } + if a.opts.InstancesPerSubCluster()%rf != 0 { + return nil, fmt.Errorf("instances per subcluster is not a multiple of replica factor") + } + + _, err := newSubclusteredInitHelper(instances, shards, a.opts) + if err != nil { + return nil, err + } + + return nil, nil } func (a subclusteredPlacementAlgorithm) AddReplica(p placement.Placement) (placement.Placement, error) { @@ -119,3 +134,40 @@ func (a subclusteredPlacementAlgorithm) BalanceShards( // TODO: Implement subclustered balance shards logic return nil, fmt.Errorf("subclustered balance shards not yet implemented") } + +func (a subclusteredPlacementAlgorithm) assignSubClusterIDs( + instances []placement.Instance, + currPlacement placement.Placement, +) error { + instancesPerSubcluster := a.opts.InstancesPerSubCluster() + if instancesPerSubcluster <= 0 { + return fmt.Errorf("instances per subcluster is not set") + } + + // If current placement is nil, start assigning from subcluster 1 + maxSubclusterID := uint32(1) + maxSubclusterCount := 0 + if currPlacement != nil { + currInstances := currPlacement.Instances() + + for _, instance := range currInstances { + if instance.SubClusterID() > maxSubclusterID { + maxSubclusterID = instance.SubClusterID() + maxSubclusterCount = 1 + } else if instance.SubClusterID() == maxSubclusterID { + maxSubclusterCount++ + } + } + } + + // Assign subcluster IDs to new instances + for _, instance := range instances { + if maxSubclusterCount == instancesPerSubcluster { + maxSubclusterID++ + maxSubclusterCount = 0 + } + instance.SetSubClusterID(maxSubclusterID) + maxSubclusterCount++ + } + return nil +} diff --git a/src/cluster/placement/algo/subclustered_helper.go b/src/cluster/placement/algo/subclustered_helper.go index 60d95a72e6..bb1345da2f 100644 --- a/src/cluster/placement/algo/subclustered_helper.go +++ b/src/cluster/placement/algo/subclustered_helper.go @@ -63,8 +63,32 @@ type subcluster struct { instanceShardCounts map[string]int } -// nolint -func newSubclusteredHelper(p placement.Placement, targetRF int, opts placement.Options, subClusterToExclude uint32) (placementHelper, error) { +func newSubclusteredInitHelper( + instances []placement.Instance, + ids []uint32, + opts placement.Options, +) (placementHelper, error) { + emptyPlacement := placement.NewPlacement(). + SetInstances(instances). + SetShards(ids). + SetReplicaFactor(0). + SetIsSharded(true). + SetHasSubClusters(true). + SetInstancesPerSubCluster(opts.InstancesPerSubCluster()). + SetCutoverNanos(opts.PlacementCutoverNanosFn()()) + ph, err := newSubclusteredHelper(emptyPlacement, emptyPlacement.ReplicaFactor()+1, opts, 0) + if err != nil { + return nil, err + } + return ph, nil +} + +func newSubclusteredHelper( + p placement.Placement, + targetRF int, + opts placement.Options, + subClusterToExclude uint32, +) (placementHelper, error) { ph := &subclusteredHelper{ rf: targetRF, instances: make(map[string]placement.Instance, p.NumInstances()), @@ -99,7 +123,6 @@ func newSubclusteredHelper(p placement.Placement, targetRF int, opts placement.O } // validateInstanceWeight validates that all instances have the same weight. -// nolint: unused func (ph *subclusteredHelper) validateInstanceWeight() error { if len(ph.instances) == 0 { return nil @@ -126,7 +149,6 @@ func (ph *subclusteredHelper) validateInstanceWeight() error { return nil } -// nolint: unused func (ph *subclusteredHelper) scanCurrentLoad() { ph.shardToInstanceMap = make(map[uint32]map[placement.Instance]struct{}, len(ph.uniqueShards)) ph.groupToInstancesMap = make(map[string]map[placement.Instance]struct{}) @@ -169,7 +191,7 @@ func (ph *subclusteredHelper) scanCurrentLoad() { } // buildTargetLoad builds the target load for the placement. -// nolint +// nolint: dupl func (ph *subclusteredHelper) buildTargetLoad() { overWeightedGroups := 0 overWeight := uint32(0) @@ -198,7 +220,6 @@ func (ph *subclusteredHelper) buildTargetLoad() { } // buildTargetSubclusterLoad builds the target load for the subclusters. -// nolint: unused func (ph *subclusteredHelper) buildTargetSubclusterLoad(subClusterToExclude uint32) { totalShards := len(ph.uniqueShards) subClusters := ph.getSubclusterIds(subClusterToExclude) @@ -219,7 +240,6 @@ func (ph *subclusteredHelper) buildTargetSubclusterLoad(subClusterToExclude uint } // getSubclusterIds gets the subcluster ids slice. -// nolint: unused func (ph *subclusteredHelper) getSubclusterIds(subClusterToExclude uint32) []uint32 { subClusterIds := make([]uint32, 0, len(ph.subClusters)) for k := range ph.subClusters { @@ -232,7 +252,6 @@ func (ph *subclusteredHelper) getSubclusterIds(subClusterToExclude uint32) []uin } // getShardLen gets the shard length. -// nolint: unused func (ph *subclusteredHelper) getShardLen() int { return len(ph.uniqueShards) } diff --git a/src/cluster/placement/algo/subclustered_test.go b/src/cluster/placement/algo/subclustered_test.go new file mode 100644 index 0000000000..40dda2d795 --- /dev/null +++ b/src/cluster/placement/algo/subclustered_test.go @@ -0,0 +1,472 @@ +// Copyright (c) 2018 Uber Technologies, Inc. +// +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to deal +// in the Software without restriction, including without limitation the rights +// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +// copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: +// +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. +// +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +// THE SOFTWARE. + +package algo + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/m3db/m3/src/cluster/placement" +) + +func TestSubclusteredAlgorithm_IsCompatibleWith(t *testing.T) { + algo := newSubclusteredAlgorithm(placement.NewOptions()) + + tests := []struct { + name string + placement placement.Placement + expectError bool + }{ + { + name: "nil placement", + placement: nil, + expectError: true, + }, + { + name: "not sharded placement", + placement: placement.NewPlacement(). + SetIsSharded(false). + SetHasSubClusters(true), + expectError: true, + }, + { + name: "no subclusters placement", + placement: placement.NewPlacement(). + SetIsSharded(true). + SetHasSubClusters(false), + expectError: true, + }, + { + name: "compatible placement", + placement: placement.NewPlacement(). + SetIsSharded(true). + SetHasSubClusters(true), + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := algo.IsCompatibleWith(tt.placement) + if tt.expectError { + assert.Error(t, err) + } else { + assert.NoError(t, err) + } + }) + } +} + +func TestAssignSubClusterIDs(t *testing.T) { + tests := []struct { + name string + instancesPerSubcluster int + currentPlacement placement.Placement + newInstances []placement.Instance + expectedSubclusterIDs []uint32 + expectError bool + errorMessage string + }{ + { + name: "no current placement, 3 instances per subcluster", + instancesPerSubcluster: 3, + currentPlacement: nil, + newInstances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), + placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), + placement.NewEmptyInstance("i4", "r4", "z1", "endpoint4", 1), + placement.NewEmptyInstance("i5", "r5", "z1", "endpoint5", 1), + }, + expectedSubclusterIDs: []uint32{1, 1, 1, 2, 2}, + expectError: false, + }, + { + name: "current placement with partial subclusters", + instancesPerSubcluster: 4, + currentPlacement: placement.NewPlacement(). + SetInstances([]placement.Instance{ + placement.NewEmptyInstance("existing1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing4", "r4", "z1", "endpoint4", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing5", "r5", "z1", "endpoint5", 1).SetSubClusterID(2), + }). + SetIsSharded(true). + SetHasSubClusters(true), + newInstances: []placement.Instance{ + placement.NewEmptyInstance("new1", "r6", "z1", "endpoint6", 1), + placement.NewEmptyInstance("new2", "r7", "z1", "endpoint7", 1), + placement.NewEmptyInstance("new3", "r8", "z1", "endpoint8", 1), + placement.NewEmptyInstance("new4", "r9", "z1", "endpoint9", 1), + placement.NewEmptyInstance("new5", "r10", "z1", "endpoint10", 1), + }, + expectedSubclusterIDs: []uint32{2, 2, 2, 3, 3}, + expectError: false, + }, + { + name: "current placement with full subclusters", + instancesPerSubcluster: 2, + currentPlacement: placement.NewPlacement(). + SetInstances([]placement.Instance{ + placement.NewEmptyInstance("existing1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1), // subcluster 1 full + placement.NewEmptyInstance("existing3", "r3", "z1", "endpoint3", 1).SetSubClusterID(2), + placement.NewEmptyInstance("existing4", "r4", "z1", "endpoint4", 1).SetSubClusterID(2), // subcluster 2 full + }). + SetIsSharded(true). + SetHasSubClusters(true), + newInstances: []placement.Instance{ + placement.NewEmptyInstance("new1", "r5", "z1", "endpoint5", 1), + placement.NewEmptyInstance("new2", "r6", "z1", "endpoint6", 1), + placement.NewEmptyInstance("new3", "r7", "z1", "endpoint7", 1), + }, + expectedSubclusterIDs: []uint32{3, 3, 4}, + expectError: false, + }, + { + name: "empty new instances", + instancesPerSubcluster: 3, + currentPlacement: nil, + newInstances: []placement.Instance{}, + expectedSubclusterIDs: []uint32{}, + expectError: false, + }, + { + name: "exactly fill one subcluster", + instancesPerSubcluster: 3, + currentPlacement: nil, + newInstances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), + placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), + }, + expectedSubclusterIDs: []uint32{1, 1, 1}, + expectError: false, + }, + { + name: "fill multiple subclusters exactly", + instancesPerSubcluster: 2, + currentPlacement: nil, + newInstances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), + placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), + placement.NewEmptyInstance("i4", "r4", "z1", "endpoint4", 1), + placement.NewEmptyInstance("i5", "r5", "z1", "endpoint5", 1), + placement.NewEmptyInstance("i6", "r6", "z1", "endpoint6", 1), + }, + expectedSubclusterIDs: []uint32{1, 1, 2, 2, 3, 3}, + expectError: false, + }, + { + name: "invalid instances per subcluster - zero", + instancesPerSubcluster: 0, + currentPlacement: nil, + newInstances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + }, + expectedSubclusterIDs: []uint32{}, + expectError: true, + errorMessage: "instances per subcluster is not set", + }, + { + name: "invalid instances per subcluster - negative", + instancesPerSubcluster: -1, + currentPlacement: nil, + newInstances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + }, + expectedSubclusterIDs: []uint32{}, + expectError: true, + errorMessage: "instances per subcluster is not set", + }, + { + name: "complex scenario with gaps in subclusters", + instancesPerSubcluster: 4, + currentPlacement: placement.NewPlacement(). + SetInstances([]placement.Instance{ + placement.NewEmptyInstance("existing1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1), // subcluster 1 full + placement.NewEmptyInstance("existing4", "r4", "z1", "endpoint4", 1).SetSubClusterID(3), // skip subcluster 2 + placement.NewEmptyInstance("existing5", "r5", "z1", "endpoint5", 1).SetSubClusterID(3), + placement.NewEmptyInstance("existing6", "r6", "z1", "endpoint6", 1).SetSubClusterID(3), // subcluster 3 full + }). + SetIsSharded(true). + SetHasSubClusters(true), + newInstances: []placement.Instance{ + placement.NewEmptyInstance("new1", "r7", "z1", "endpoint7", 1), + placement.NewEmptyInstance("new2", "r8", "z1", "endpoint8", 1), + placement.NewEmptyInstance("new3", "r9", "z1", "endpoint9", 1), + placement.NewEmptyInstance("new4", "r10", "z1", "endpoint10", 1), + }, + expectedSubclusterIDs: []uint32{3, 4, 4, 4}, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := placement.NewOptions().SetInstancesPerSubCluster(tt.instancesPerSubcluster) + algo := subclusteredPlacementAlgorithm{opts: opts} + + err := algo.assignSubClusterIDs(tt.newInstances, tt.currentPlacement) + + if tt.expectError { + assert.Error(t, err) + assert.Equal(t, tt.errorMessage, err.Error()) + } else { + assert.NoError(t, err) + assert.Equal(t, len(tt.expectedSubclusterIDs), len(tt.newInstances)) + + for i, expectedID := range tt.expectedSubclusterIDs { + assert.Equal(t, expectedID, tt.newInstances[i].SubClusterID(), + "Instance %d (ID: %s) should be in subcluster %d", + i, tt.newInstances[i].ID(), expectedID) + } + } + }) + } +} + +func TestInitialPlacement(t *testing.T) { + tests := []struct { + name string + instancesPerSubcluster int + instances []placement.Instance + shards int + replicaFactor int + expectError bool + errorMessage string + }{ + { + name: "instances per subcluster not multiple of replica factor", + instancesPerSubcluster: 5, + shards: 5, + replicaFactor: 3, + expectError: true, + errorMessage: "instances per subcluster is not a multiple of replica factor", + }, + { + name: "instances per subcluster is not set", + instancesPerSubcluster: 0, + shards: 5, + replicaFactor: 3, + expectError: true, + errorMessage: "instances per subcluster is not set", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := placement.NewOptions().SetInstancesPerSubCluster(tt.instancesPerSubcluster) + algo := subclusteredPlacementAlgorithm{opts: opts} + + // Clone instances to avoid modifying the original test data + instances := make([]placement.Instance, len(tt.instances)) + for i, instance := range tt.instances { + instances[i] = instance.Clone() + } + + // Clone shards to avoid modifying the original test data + shards := make([]uint32, tt.shards) + for i := range shards { + shards[i] = uint32(i) + } + + result, err := algo.InitialPlacement(instances, shards, tt.replicaFactor) + + if tt.expectError { + assert.Error(t, err) + assert.Equal(t, tt.errorMessage, err.Error()) + assert.Nil(t, result) + } else { + // For now, the method returns nil, nil when successful + // This will need to be updated when the implementation is complete + assert.NoError(t, err) + assert.Nil(t, result) // Current implementation returns nil + } + + // Verify that instances were assigned subcluster IDs correctly + if !tt.expectError && len(instances) > 0 { + // Check that all instances have valid subcluster IDs + for i, instance := range instances { + assert.True(t, instance.SubClusterID() > 0, + "Instance %d (ID: %s) should have a valid subcluster ID", + i, instance.ID()) + } + + // Verify subcluster assignment logic + expectedSubclusters := make(map[uint32]int) + for _, instance := range instances { + expectedSubclusters[instance.SubClusterID()]++ + } + + // Each subcluster should not exceed the configured limit + for subclusterID, count := range expectedSubclusters { + assert.True(t, count <= tt.instancesPerSubcluster, + "Subcluster %d should not exceed %d instances, got %d", + subclusterID, tt.instancesPerSubcluster, count) + } + } + }) + } +} + +func TestSubclusteredAlgorithm_InitialPlacement_SubclusterAssignment(t *testing.T) { + // Test specific subcluster assignment patterns + tests := []struct { + name string + instancesPerSubcluster int + instances []placement.Instance + expectedSubclusterIDs []uint32 + }{ + { + name: "exactly one subcluster", + instancesPerSubcluster: 3, + instances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), + placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), + }, + expectedSubclusterIDs: []uint32{1, 1, 1}, + }, + { + name: "exactly two subclusters", + instancesPerSubcluster: 2, + instances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), + placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), + placement.NewEmptyInstance("i4", "r4", "z1", "endpoint4", 1), + }, + expectedSubclusterIDs: []uint32{1, 1, 2, 2}, + }, + { + name: "partial subcluster", + instancesPerSubcluster: 4, + instances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), + placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), + }, + expectedSubclusterIDs: []uint32{1, 1, 1}, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := placement.NewOptions().SetInstancesPerSubCluster(tt.instancesPerSubcluster) + algo := subclusteredPlacementAlgorithm{opts: opts} + + // Clone instances to avoid modifying the original test data + instances := make([]placement.Instance, len(tt.instances)) + for i, instance := range tt.instances { + instances[i] = instance.Clone() + } + + shards := []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} + replicaFactor := 1 + + _, err := algo.InitialPlacement(instances, shards, replicaFactor) + assert.NoError(t, err) + + // Verify subcluster assignments match expected pattern + for i, expectedID := range tt.expectedSubclusterIDs { + assert.Equal(t, expectedID, instances[i].SubClusterID(), + "Instance %d (ID: %s) should be in subcluster %d", + i, instances[i].ID(), expectedID) + } + }) + } +} + +func TestSubclusteredAlgorithm_InitialPlacement_ErrorCases(t *testing.T) { + tests := []struct { + name string + instancesPerSubcluster int + instances []placement.Instance + shards []uint32 + replicaFactor int + expectError bool + errorMessage string + }{ + { + name: "instances per subcluster not set", + instancesPerSubcluster: 0, + instances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + }, + shards: []uint32{0}, + replicaFactor: 1, + expectError: true, + errorMessage: "instances per subcluster is not set", + }, + { + name: "negative instances per subcluster", + instancesPerSubcluster: -1, + instances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + }, + shards: []uint32{0}, + replicaFactor: 1, + expectError: true, + errorMessage: "instances per subcluster is not set", + }, + { + name: "replica factor greater than instances per subcluster", + instancesPerSubcluster: 2, + instances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), + }, + shards: []uint32{0, 1}, + replicaFactor: 3, + expectError: true, + errorMessage: "instances per subcluster is not a multiple of replica factor", + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + opts := placement.NewOptions().SetInstancesPerSubCluster(tt.instancesPerSubcluster) + algo := subclusteredPlacementAlgorithm{opts: opts} + + // Clone instances to avoid modifying the original test data + instances := make([]placement.Instance, len(tt.instances)) + for i, instance := range tt.instances { + instances[i] = instance.Clone() + } + + // Clone shards to avoid modifying the original test data + shards := make([]uint32, len(tt.shards)) + copy(shards, tt.shards) + + result, err := algo.InitialPlacement(instances, shards, tt.replicaFactor) + + assert.Error(t, err) + assert.Equal(t, tt.errorMessage, err.Error()) + assert.Nil(t, result) + }) + } +} From c1a0516913c14923922e8c92bc90074c8e3d0e08 Mon Sep 17 00:00:00 2001 From: Riya Tyagi Date: Sun, 29 Jun 2025 13:27:16 +0530 Subject: [PATCH 2/2] Fixed unit tests and moved code to assign subcluster id to the helper file --- src/cluster/placement/algo/subclustered.go | 44 +--- .../placement/algo/subclustered_helper.go | 54 ++++- .../algo/subclustered_helper_test.go | 173 +++++++++++++++- .../placement/algo/subclustered_test.go | 196 +----------------- 4 files changed, 228 insertions(+), 239 deletions(-) diff --git a/src/cluster/placement/algo/subclustered.go b/src/cluster/placement/algo/subclustered.go index feafdd2d12..0d1bac46df 100644 --- a/src/cluster/placement/algo/subclustered.go +++ b/src/cluster/placement/algo/subclustered.go @@ -39,18 +39,17 @@ func (a subclusteredPlacementAlgorithm) InitialPlacement( shards []uint32, rf int, ) (placement.Placement, error) { - if err := a.assignSubClusterIDs(instances, nil); err != nil { - return nil, err - } if a.opts.InstancesPerSubCluster()%rf != 0 { return nil, fmt.Errorf("instances per subcluster is not a multiple of replica factor") } - _, err := newSubclusteredInitHelper(instances, shards, a.opts) + _, err := newSubclusteredInitHelper(instances, shards, a.opts, rf) if err != nil { return nil, err } + // TODO: Add logic to place all shard replicas. + return nil, nil } @@ -134,40 +133,3 @@ func (a subclusteredPlacementAlgorithm) BalanceShards( // TODO: Implement subclustered balance shards logic return nil, fmt.Errorf("subclustered balance shards not yet implemented") } - -func (a subclusteredPlacementAlgorithm) assignSubClusterIDs( - instances []placement.Instance, - currPlacement placement.Placement, -) error { - instancesPerSubcluster := a.opts.InstancesPerSubCluster() - if instancesPerSubcluster <= 0 { - return fmt.Errorf("instances per subcluster is not set") - } - - // If current placement is nil, start assigning from subcluster 1 - maxSubclusterID := uint32(1) - maxSubclusterCount := 0 - if currPlacement != nil { - currInstances := currPlacement.Instances() - - for _, instance := range currInstances { - if instance.SubClusterID() > maxSubclusterID { - maxSubclusterID = instance.SubClusterID() - maxSubclusterCount = 1 - } else if instance.SubClusterID() == maxSubclusterID { - maxSubclusterCount++ - } - } - } - - // Assign subcluster IDs to new instances - for _, instance := range instances { - if maxSubclusterCount == instancesPerSubcluster { - maxSubclusterID++ - maxSubclusterCount = 0 - } - instance.SetSubClusterID(maxSubclusterID) - maxSubclusterCount++ - } - return nil -} diff --git a/src/cluster/placement/algo/subclustered_helper.go b/src/cluster/placement/algo/subclustered_helper.go index bb1345da2f..5767642844 100644 --- a/src/cluster/placement/algo/subclustered_helper.go +++ b/src/cluster/placement/algo/subclustered_helper.go @@ -67,16 +67,21 @@ func newSubclusteredInitHelper( instances []placement.Instance, ids []uint32, opts placement.Options, + rf int, ) (placementHelper, error) { + err := assignSubClusterIDs(instances, nil, opts.InstancesPerSubCluster()) + if err != nil { + return nil, err + } emptyPlacement := placement.NewPlacement(). SetInstances(instances). SetShards(ids). - SetReplicaFactor(0). + SetReplicaFactor(rf). SetIsSharded(true). SetHasSubClusters(true). SetInstancesPerSubCluster(opts.InstancesPerSubCluster()). SetCutoverNanos(opts.PlacementCutoverNanosFn()()) - ph, err := newSubclusteredHelper(emptyPlacement, emptyPlacement.ReplicaFactor()+1, opts, 0) + ph, err := newSubclusteredHelper(emptyPlacement, opts, 0) if err != nil { return nil, err } @@ -85,12 +90,11 @@ func newSubclusteredInitHelper( func newSubclusteredHelper( p placement.Placement, - targetRF int, opts placement.Options, subClusterToExclude uint32, ) (placementHelper, error) { ph := &subclusteredHelper{ - rf: targetRF, + rf: p.ReplicaFactor(), instances: make(map[string]placement.Instance, p.NumInstances()), uniqueShards: p.Shards(), log: opts.InstrumentOptions().Logger(), @@ -109,15 +113,14 @@ func newSubclusteredHelper( } ph.scanCurrentLoad() - ph.buildTargetLoad() - ph.buildTargetSubclusterLoad(subClusterToExclude) err = ph.validateSubclusterDistribution() if err != nil { return nil, err } - // TODO: Implement subclustered helper logic build target load. + ph.buildTargetLoad() + ph.buildTargetSubclusterLoad(subClusterToExclude) return ph, nil } @@ -369,3 +372,40 @@ func (ph *subclusteredHelper) validateSubclusterDistribution() error { return nil } + +func assignSubClusterIDs( + instances []placement.Instance, + currPlacement placement.Placement, + instancesPerSubcluster int, +) error { + if instancesPerSubcluster <= 0 { + return fmt.Errorf("instances per subcluster is not set") + } + + // If current placement is nil, start assigning from subcluster 1 + maxSubclusterID := uint32(1) + maxSubclusterCount := 0 + if currPlacement != nil { + currInstances := currPlacement.Instances() + + for _, instance := range currInstances { + if instance.SubClusterID() > maxSubclusterID { + maxSubclusterID = instance.SubClusterID() + maxSubclusterCount = 1 + } else if instance.SubClusterID() == maxSubclusterID { + maxSubclusterCount++ + } + } + } + + // Assign subcluster IDs to new instances + for _, instance := range instances { + if maxSubclusterCount == instancesPerSubcluster { + maxSubclusterID++ + maxSubclusterCount = 0 + } + instance.SetSubClusterID(maxSubclusterID) + maxSubclusterCount++ + } + return nil +} diff --git a/src/cluster/placement/algo/subclustered_helper_test.go b/src/cluster/placement/algo/subclustered_helper_test.go index f2de62f5e9..44a4ba5191 100644 --- a/src/cluster/placement/algo/subclustered_helper_test.go +++ b/src/cluster/placement/algo/subclustered_helper_test.go @@ -142,7 +142,7 @@ func TestNewSubclusteredHelper(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { - helper, err := newSubclusteredHelper(tt.placement, tt.targetRF, tt.opts, tt.subClusterToExclude) + helper, err := newSubclusteredHelper(tt.placement, tt.opts, tt.subClusterToExclude) if tt.expectError { assert.Error(t, err) assert.Contains(t, err.Error(), tt.errorMsg) @@ -430,7 +430,7 @@ func TestNewSubclusteredHelperIntegration(t *testing.T) { placement := createTestPlacement(instances, []uint32{1, 2, 3}, 3, 3) opts := createTestOptions(3) - helper, err := newSubclusteredHelper(placement, 3, opts, 0) + helper, err := newSubclusteredHelper(placement, opts, 0) require.NoError(t, err) require.NotNil(t, helper) @@ -461,3 +461,172 @@ func TestValidateInstanceWeightIntegration(t *testing.T) { err := ph.validateInstanceWeight() assert.NoError(t, err) } + +func TestAssignSubClusterIDs(t *testing.T) { + tests := []struct { + name string + instancesPerSubcluster int + currentPlacement placement.Placement + newInstances []placement.Instance + expectedSubclusterIDs []uint32 + expectError bool + errorMessage string + }{ + { + name: "no current placement, 3 instances per subcluster", + instancesPerSubcluster: 3, + currentPlacement: nil, + newInstances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), + placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), + placement.NewEmptyInstance("i4", "r4", "z1", "endpoint4", 1), + placement.NewEmptyInstance("i5", "r5", "z1", "endpoint5", 1), + }, + expectedSubclusterIDs: []uint32{1, 1, 1, 2, 2}, + expectError: false, + }, + { + name: "current placement with partial subclusters", + instancesPerSubcluster: 4, + currentPlacement: placement.NewPlacement(). + SetInstances([]placement.Instance{ + placement.NewEmptyInstance("existing1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing4", "r4", "z1", "endpoint4", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing5", "r5", "z1", "endpoint5", 1).SetSubClusterID(2), + }). + SetIsSharded(true). + SetHasSubClusters(true), + newInstances: []placement.Instance{ + placement.NewEmptyInstance("new1", "r6", "z1", "endpoint6", 1), + placement.NewEmptyInstance("new2", "r7", "z1", "endpoint7", 1), + placement.NewEmptyInstance("new3", "r8", "z1", "endpoint8", 1), + placement.NewEmptyInstance("new4", "r9", "z1", "endpoint9", 1), + placement.NewEmptyInstance("new5", "r10", "z1", "endpoint10", 1), + }, + expectedSubclusterIDs: []uint32{2, 2, 2, 3, 3}, + expectError: false, + }, + { + name: "current placement with full subclusters", + instancesPerSubcluster: 2, + currentPlacement: placement.NewPlacement(). + SetInstances([]placement.Instance{ + placement.NewEmptyInstance("existing1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1), // subcluster 1 full + placement.NewEmptyInstance("existing3", "r3", "z1", "endpoint3", 1).SetSubClusterID(2), + placement.NewEmptyInstance("existing4", "r4", "z1", "endpoint4", 1).SetSubClusterID(2), // subcluster 2 full + }). + SetIsSharded(true). + SetHasSubClusters(true), + newInstances: []placement.Instance{ + placement.NewEmptyInstance("new1", "r5", "z1", "endpoint5", 1), + placement.NewEmptyInstance("new2", "r6", "z1", "endpoint6", 1), + placement.NewEmptyInstance("new3", "r7", "z1", "endpoint7", 1), + }, + expectedSubclusterIDs: []uint32{3, 3, 4}, + expectError: false, + }, + { + name: "empty new instances", + instancesPerSubcluster: 3, + currentPlacement: nil, + newInstances: []placement.Instance{}, + expectedSubclusterIDs: []uint32{}, + expectError: false, + }, + { + name: "exactly fill one subcluster", + instancesPerSubcluster: 3, + currentPlacement: nil, + newInstances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), + placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), + }, + expectedSubclusterIDs: []uint32{1, 1, 1}, + expectError: false, + }, + { + name: "fill multiple subclusters exactly", + instancesPerSubcluster: 2, + currentPlacement: nil, + newInstances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), + placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), + placement.NewEmptyInstance("i4", "r4", "z1", "endpoint4", 1), + placement.NewEmptyInstance("i5", "r5", "z1", "endpoint5", 1), + placement.NewEmptyInstance("i6", "r6", "z1", "endpoint6", 1), + }, + expectedSubclusterIDs: []uint32{1, 1, 2, 2, 3, 3}, + expectError: false, + }, + { + name: "invalid instances per subcluster - zero", + instancesPerSubcluster: 0, + currentPlacement: nil, + newInstances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + }, + expectedSubclusterIDs: []uint32{}, + expectError: true, + errorMessage: "instances per subcluster is not set", + }, + { + name: "invalid instances per subcluster - negative", + instancesPerSubcluster: -1, + currentPlacement: nil, + newInstances: []placement.Instance{ + placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), + }, + expectedSubclusterIDs: []uint32{}, + expectError: true, + errorMessage: "instances per subcluster is not set", + }, + { + name: "complex scenario with gaps in subclusters", + instancesPerSubcluster: 4, + currentPlacement: placement.NewPlacement(). + SetInstances([]placement.Instance{ + placement.NewEmptyInstance("existing1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1), + placement.NewEmptyInstance("existing3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1), // subcluster 1 full + placement.NewEmptyInstance("existing4", "r4", "z1", "endpoint4", 1).SetSubClusterID(3), // skip subcluster 2 + placement.NewEmptyInstance("existing5", "r5", "z1", "endpoint5", 1).SetSubClusterID(3), + placement.NewEmptyInstance("existing6", "r6", "z1", "endpoint6", 1).SetSubClusterID(3), // subcluster 3 full + }). + SetIsSharded(true). + SetHasSubClusters(true), + newInstances: []placement.Instance{ + placement.NewEmptyInstance("new1", "r7", "z1", "endpoint7", 1), + placement.NewEmptyInstance("new2", "r8", "z1", "endpoint8", 1), + placement.NewEmptyInstance("new3", "r9", "z1", "endpoint9", 1), + placement.NewEmptyInstance("new4", "r10", "z1", "endpoint10", 1), + }, + expectedSubclusterIDs: []uint32{3, 4, 4, 4}, + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + err := assignSubClusterIDs(tt.newInstances, tt.currentPlacement, tt.instancesPerSubcluster) + if tt.expectError { + assert.Error(t, err) + assert.Equal(t, tt.errorMessage, err.Error()) + } else { + assert.NoError(t, err) + assert.Equal(t, len(tt.expectedSubclusterIDs), len(tt.newInstances)) + + for i, expectedID := range tt.expectedSubclusterIDs { + assert.Equal(t, expectedID, tt.newInstances[i].SubClusterID(), + "Instance %d (ID: %s) should be in subcluster %d", + i, tt.newInstances[i].ID(), expectedID) + } + } + }) + } +} diff --git a/src/cluster/placement/algo/subclustered_test.go b/src/cluster/placement/algo/subclustered_test.go index 40dda2d795..d0b9bb34b7 100644 --- a/src/cluster/placement/algo/subclustered_test.go +++ b/src/cluster/placement/algo/subclustered_test.go @@ -75,180 +75,6 @@ func TestSubclusteredAlgorithm_IsCompatibleWith(t *testing.T) { }) } } - -func TestAssignSubClusterIDs(t *testing.T) { - tests := []struct { - name string - instancesPerSubcluster int - currentPlacement placement.Placement - newInstances []placement.Instance - expectedSubclusterIDs []uint32 - expectError bool - errorMessage string - }{ - { - name: "no current placement, 3 instances per subcluster", - instancesPerSubcluster: 3, - currentPlacement: nil, - newInstances: []placement.Instance{ - placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), - placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), - placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), - placement.NewEmptyInstance("i4", "r4", "z1", "endpoint4", 1), - placement.NewEmptyInstance("i5", "r5", "z1", "endpoint5", 1), - }, - expectedSubclusterIDs: []uint32{1, 1, 1, 2, 2}, - expectError: false, - }, - { - name: "current placement with partial subclusters", - instancesPerSubcluster: 4, - currentPlacement: placement.NewPlacement(). - SetInstances([]placement.Instance{ - placement.NewEmptyInstance("existing1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1), - placement.NewEmptyInstance("existing2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1), - placement.NewEmptyInstance("existing3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1), - placement.NewEmptyInstance("existing4", "r4", "z1", "endpoint4", 1).SetSubClusterID(1), - placement.NewEmptyInstance("existing5", "r5", "z1", "endpoint5", 1).SetSubClusterID(2), - }). - SetIsSharded(true). - SetHasSubClusters(true), - newInstances: []placement.Instance{ - placement.NewEmptyInstance("new1", "r6", "z1", "endpoint6", 1), - placement.NewEmptyInstance("new2", "r7", "z1", "endpoint7", 1), - placement.NewEmptyInstance("new3", "r8", "z1", "endpoint8", 1), - placement.NewEmptyInstance("new4", "r9", "z1", "endpoint9", 1), - placement.NewEmptyInstance("new5", "r10", "z1", "endpoint10", 1), - }, - expectedSubclusterIDs: []uint32{2, 2, 2, 3, 3}, - expectError: false, - }, - { - name: "current placement with full subclusters", - instancesPerSubcluster: 2, - currentPlacement: placement.NewPlacement(). - SetInstances([]placement.Instance{ - placement.NewEmptyInstance("existing1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1), - placement.NewEmptyInstance("existing2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1), // subcluster 1 full - placement.NewEmptyInstance("existing3", "r3", "z1", "endpoint3", 1).SetSubClusterID(2), - placement.NewEmptyInstance("existing4", "r4", "z1", "endpoint4", 1).SetSubClusterID(2), // subcluster 2 full - }). - SetIsSharded(true). - SetHasSubClusters(true), - newInstances: []placement.Instance{ - placement.NewEmptyInstance("new1", "r5", "z1", "endpoint5", 1), - placement.NewEmptyInstance("new2", "r6", "z1", "endpoint6", 1), - placement.NewEmptyInstance("new3", "r7", "z1", "endpoint7", 1), - }, - expectedSubclusterIDs: []uint32{3, 3, 4}, - expectError: false, - }, - { - name: "empty new instances", - instancesPerSubcluster: 3, - currentPlacement: nil, - newInstances: []placement.Instance{}, - expectedSubclusterIDs: []uint32{}, - expectError: false, - }, - { - name: "exactly fill one subcluster", - instancesPerSubcluster: 3, - currentPlacement: nil, - newInstances: []placement.Instance{ - placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), - placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), - placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), - }, - expectedSubclusterIDs: []uint32{1, 1, 1}, - expectError: false, - }, - { - name: "fill multiple subclusters exactly", - instancesPerSubcluster: 2, - currentPlacement: nil, - newInstances: []placement.Instance{ - placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), - placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), - placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), - placement.NewEmptyInstance("i4", "r4", "z1", "endpoint4", 1), - placement.NewEmptyInstance("i5", "r5", "z1", "endpoint5", 1), - placement.NewEmptyInstance("i6", "r6", "z1", "endpoint6", 1), - }, - expectedSubclusterIDs: []uint32{1, 1, 2, 2, 3, 3}, - expectError: false, - }, - { - name: "invalid instances per subcluster - zero", - instancesPerSubcluster: 0, - currentPlacement: nil, - newInstances: []placement.Instance{ - placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), - }, - expectedSubclusterIDs: []uint32{}, - expectError: true, - errorMessage: "instances per subcluster is not set", - }, - { - name: "invalid instances per subcluster - negative", - instancesPerSubcluster: -1, - currentPlacement: nil, - newInstances: []placement.Instance{ - placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), - }, - expectedSubclusterIDs: []uint32{}, - expectError: true, - errorMessage: "instances per subcluster is not set", - }, - { - name: "complex scenario with gaps in subclusters", - instancesPerSubcluster: 4, - currentPlacement: placement.NewPlacement(). - SetInstances([]placement.Instance{ - placement.NewEmptyInstance("existing1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1), - placement.NewEmptyInstance("existing2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1), - placement.NewEmptyInstance("existing3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1), // subcluster 1 full - placement.NewEmptyInstance("existing4", "r4", "z1", "endpoint4", 1).SetSubClusterID(3), // skip subcluster 2 - placement.NewEmptyInstance("existing5", "r5", "z1", "endpoint5", 1).SetSubClusterID(3), - placement.NewEmptyInstance("existing6", "r6", "z1", "endpoint6", 1).SetSubClusterID(3), // subcluster 3 full - }). - SetIsSharded(true). - SetHasSubClusters(true), - newInstances: []placement.Instance{ - placement.NewEmptyInstance("new1", "r7", "z1", "endpoint7", 1), - placement.NewEmptyInstance("new2", "r8", "z1", "endpoint8", 1), - placement.NewEmptyInstance("new3", "r9", "z1", "endpoint9", 1), - placement.NewEmptyInstance("new4", "r10", "z1", "endpoint10", 1), - }, - expectedSubclusterIDs: []uint32{3, 4, 4, 4}, - expectError: false, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - opts := placement.NewOptions().SetInstancesPerSubCluster(tt.instancesPerSubcluster) - algo := subclusteredPlacementAlgorithm{opts: opts} - - err := algo.assignSubClusterIDs(tt.newInstances, tt.currentPlacement) - - if tt.expectError { - assert.Error(t, err) - assert.Equal(t, tt.errorMessage, err.Error()) - } else { - assert.NoError(t, err) - assert.Equal(t, len(tt.expectedSubclusterIDs), len(tt.newInstances)) - - for i, expectedID := range tt.expectedSubclusterIDs { - assert.Equal(t, expectedID, tt.newInstances[i].SubClusterID(), - "Instance %d (ID: %s) should be in subcluster %d", - i, tt.newInstances[i].ID(), expectedID) - } - } - }) - } -} - func TestInitialPlacement(t *testing.T) { tests := []struct { name string @@ -340,10 +166,12 @@ func TestSubclusteredAlgorithm_InitialPlacement_SubclusterAssignment(t *testing. instancesPerSubcluster int instances []placement.Instance expectedSubclusterIDs []uint32 + replicaFactor int }{ { name: "exactly one subcluster", instancesPerSubcluster: 3, + replicaFactor: 3, instances: []placement.Instance{ placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), @@ -354,30 +182,21 @@ func TestSubclusteredAlgorithm_InitialPlacement_SubclusterAssignment(t *testing. { name: "exactly two subclusters", instancesPerSubcluster: 2, + replicaFactor: 2, instances: []placement.Instance{ placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), - placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), - placement.NewEmptyInstance("i4", "r4", "z1", "endpoint4", 1), + placement.NewEmptyInstance("i3", "r1", "z1", "endpoint3", 1), + placement.NewEmptyInstance("i4", "r2", "z1", "endpoint4", 1), }, expectedSubclusterIDs: []uint32{1, 1, 2, 2}, }, - { - name: "partial subcluster", - instancesPerSubcluster: 4, - instances: []placement.Instance{ - placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1), - placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1), - placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1), - }, - expectedSubclusterIDs: []uint32{1, 1, 1}, - }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { opts := placement.NewOptions().SetInstancesPerSubCluster(tt.instancesPerSubcluster) - algo := subclusteredPlacementAlgorithm{opts: opts} + algo := newSubclusteredAlgorithm(opts) // Clone instances to avoid modifying the original test data instances := make([]placement.Instance, len(tt.instances)) @@ -386,9 +205,8 @@ func TestSubclusteredAlgorithm_InitialPlacement_SubclusterAssignment(t *testing. } shards := []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} - replicaFactor := 1 - _, err := algo.InitialPlacement(instances, shards, replicaFactor) + _, err := algo.InitialPlacement(instances, shards, tt.replicaFactor) assert.NoError(t, err) // Verify subcluster assignments match expected pattern