Added code for assigning subcluster Ids and adding validations in initial placement by RiyaTyagi · Pull Request #4360 · m3db/m3 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Added code for assigning subcluster Ids and adding validations in initial placement #4360

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: sub-custer-4
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 16 additions & 2 deletions src/cluster/placement/algo/subclustered.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ func newSubclusteredAlgorithm(opts placement.Options) placement.Algorithm {
}

func (a subclusteredPlacementAlgorithm) IsCompatibleWith(p placement.Placement) error {
if p == nil {
return fmt.Errorf("placement is nil")
}

if !p.IsSharded() {
return errIncompatibleWithSubclusteredAlgo
}
Expand All @@ -35,8 +39,18 @@ func (a subclusteredPlacementAlgorithm) InitialPlacement(
shards []uint32,
rf int,
) (placement.Placement, error) {
// TODO: Implement subclustered initial placement logic
return nil, fmt.Errorf("subclustered initial placement not yet implemented")
if a.opts.InstancesPerSubCluster()%rf != 0 {
return nil, fmt.Errorf("instances per subcluster is not a multiple of replica factor")
}

_, err := newSubclusteredInitHelper(instances, shards, a.opts, rf)
if err != nil {
return nil, err
}

// TODO: Add logic to place all shard replicas.

return nil, nil
}

func (a subclusteredPlacementAlgorithm) AddReplica(p placement.Placement) (placement.Placement, error) {
Expand Down
83 changes: 71 additions & 12 deletions src/cluster/placement/algo/subclustered_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,38 @@ type subcluster struct {
instanceShardCounts map[string]int
}

// nolint
func newSubclusteredHelper(p placement.Placement, targetRF int, opts placement.Options, subClusterToExclude uint32) (placementHelper, error) {
// newSubclusteredInitHelper prepares the placement helper for an initial
// subclustered placement.
//
// It first assigns subcluster IDs to the given instances (there is no current
// placement, so assignSubClusterIDs is called with nil), then wraps the
// instances and shard ids in a fresh sharded, subclustered placement carrying
// the requested replica factor, and finally builds the helper on top of that
// placement with no subcluster excluded (exclude ID 0).
//
// Any error from subcluster ID assignment or helper construction is returned
// unchanged together with a nil helper.
func newSubclusteredInitHelper(
	instances []placement.Instance,
	ids []uint32,
	opts placement.Options,
	rf int,
) (placementHelper, error) {
	if assignErr := assignSubClusterIDs(instances, nil, opts.InstancesPerSubCluster()); assignErr != nil {
		return nil, assignErr
	}

	// Start from a placement that only describes the topology.
	initial := placement.NewPlacement().
		SetInstances(instances).
		SetShards(ids).
		SetReplicaFactor(rf).
		SetIsSharded(true).
		SetHasSubClusters(true).
		SetInstancesPerSubCluster(opts.InstancesPerSubCluster()).
		SetCutoverNanos(opts.PlacementCutoverNanosFn()())

	helper, buildErr := newSubclusteredHelper(initial, opts, 0)
	if buildErr != nil {
		return nil, buildErr
	}
	return helper, nil
}

func newSubclusteredHelper(
p placement.Placement,
opts placement.Options,
subClusterToExclude uint32,
) (placementHelper, error) {
ph := &subclusteredHelper{
rf: targetRF,
rf: p.ReplicaFactor(),
instances: make(map[string]placement.Instance, p.NumInstances()),
uniqueShards: p.Shards(),
log: opts.InstrumentOptions().Logger(),
Expand All @@ -85,21 +113,19 @@ func newSubclusteredHelper(p placement.Placement, targetRF int, opts placement.O
}

ph.scanCurrentLoad()
ph.buildTargetLoad()
ph.buildTargetSubclusterLoad(subClusterToExclude)

err = ph.validateSubclusterDistribution()
if err != nil {
return nil, err
}

// TODO: Implement subclustered helper logic build target load.
ph.buildTargetLoad()
ph.buildTargetSubclusterLoad(subClusterToExclude)

return ph, nil
}

// validateInstanceWeight validates that all instances have the same weight.
// nolint: unused
func (ph *subclusteredHelper) validateInstanceWeight() error {
if len(ph.instances) == 0 {
return nil
Expand All @@ -126,7 +152,6 @@ func (ph *subclusteredHelper) validateInstanceWeight() error {
return nil
}

// nolint: unused
func (ph *subclusteredHelper) scanCurrentLoad() {
ph.shardToInstanceMap = make(map[uint32]map[placement.Instance]struct{}, len(ph.uniqueShards))
ph.groupToInstancesMap = make(map[string]map[placement.Instance]struct{})
Expand Down Expand Up @@ -169,7 +194,7 @@ func (ph *subclusteredHelper) scanCurrentLoad() {
}

// buildTargetLoad builds the target load for the placement.
// nolint
// nolint: dupl
func (ph *subclusteredHelper) buildTargetLoad() {
overWeightedGroups := 0
overWeight := uint32(0)
Expand Down Expand Up @@ -198,7 +223,6 @@ func (ph *subclusteredHelper) buildTargetLoad() {
}

// buildTargetSubclusterLoad builds the target load for the subclusters.
// nolint: unused
func (ph *subclusteredHelper) buildTargetSubclusterLoad(subClusterToExclude uint32) {
totalShards := len(ph.uniqueShards)
subClusters := ph.getSubclusterIds(subClusterToExclude)
Expand All @@ -219,7 +243,6 @@ func (ph *subclusteredHelper) buildTargetSubclusterLoad(subClusterToExclude uint
}

// getSubclusterIds gets the subcluster ids slice.
// nolint: unused
func (ph *subclusteredHelper) getSubclusterIds(subClusterToExclude uint32) []uint32 {
subClusterIds := make([]uint32, 0, len(ph.subClusters))
for k := range ph.subClusters {
Expand All @@ -232,7 +255,6 @@ func (ph *subclusteredHelper) getSubclusterIds(subClusterToExclude uint32) []uin
}

// getShardLen reports how many entries the helper holds in uniqueShards.
// nolint: unused
func (ph *subclusteredHelper) getShardLen() int {
	shards := ph.uniqueShards
	return len(shards)
}
Expand Down Expand Up @@ -350,3 +372,40 @@ func (ph *subclusteredHelper) validateSubclusterDistribution() error {

return nil
}

// assignSubClusterIDs gives every instance in instances a subcluster ID, in
// slice order, by calling SetSubClusterID on each one.
//
// Numbering continues from the highest subcluster ID found among the instances
// of currPlacement: that subcluster is topped up until it holds
// instancesPerSubcluster instances, after which the next ID is used.
// Subclusters with a lower ID are never revisited, even if they are only
// partially filled or their ID is not present at all. When currPlacement is
// nil, numbering starts at subcluster 1.
//
// An error is returned when instancesPerSubcluster is not positive; in that
// case no instance is touched.
func assignSubClusterIDs(
	instances []placement.Instance,
	currPlacement placement.Placement,
	instancesPerSubcluster int,
) error {
	if instancesPerSubcluster <= 0 {
		return fmt.Errorf("instances per subcluster is not set")
	}

	// currentID is the subcluster being filled and filled is the number of
	// instances it already holds. Without a current placement we begin with an
	// empty subcluster 1.
	currentID := uint32(1)
	filled := 0
	if currPlacement != nil {
		for _, existing := range currPlacement.Instances() {
			switch id := existing.SubClusterID(); {
			case id > currentID:
				currentID, filled = id, 1
			case id == currentID:
				filled++
			}
		}
	}

	for _, instance := range instances {
		// Use >= rather than ==: if the highest existing subcluster already
		// holds more instances than allowed, an equality check would never
		// fire and every new instance would be piled onto that subcluster.
		if filled >= instancesPerSubcluster {
			currentID++
			filled = 0
		}
		instance.SetSubClusterID(currentID)
		filled++
	}
	return nil
}
173 changes: 171 additions & 2 deletions src/cluster/placement/algo/subclustered_helper_test.go
6D40 9E88
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ func TestNewSubclusteredHelper(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
helper, err := newSubclusteredHelper(tt.placement, tt.targetRF, tt.opts, tt.subClusterToExclude)
helper, err := newSubclusteredHelper(tt.placement, tt.opts, tt.subClusterToExclude)
if tt.expectError {
assert.Error(t, err)
assert.Contains(t, err.Error(), tt.errorMsg)
Expand Down Expand Up @@ -430,7 +430,7 @@ func TestNewSubclusteredHelperIntegration(t *testing.T) {
placement := createTestPlacement(instances, []uint32{1, 2, 3}, 3, 3)
opts := createTestOptions(3)

helper, err := newSubclusteredHelper(placement, 3, opts, 0)
helper, err := newSubclusteredHelper(placement, opts, 0)
require.NoError(t, err)
require.NotNil(t, helper)

Expand Down Expand Up @@ -461,3 +461,172 @@ func TestValidateInstanceWeightIntegration(t *testing.T) {
err := ph.validateInstanceWeight()
assert.NoError(t, err)
}

// TestAssignSubClusterIDs is a table-driven test for assignSubClusterIDs.
//
// Each case supplies an optional current placement, the instances that need a
// subcluster ID and the configured instances-per-subcluster value, and checks
// either the returned error message or the ID assigned to every new instance
// (in slice order).
func TestAssignSubClusterIDs(t *testing.T) {
	tests := []struct {
		name                   string
		instancesPerSubcluster int
		currentPlacement       placement.Placement
		newInstances           []placement.Instance
		expectedSubclusterIDs  []uint32
		expectError            bool
		errorMessage           string
	}{
		{
			name:                   "no current placement, 3 instances per subcluster",
			instancesPerSubcluster: 3,
			currentPlacement:       nil,
			newInstances: []placement.Instance{
				placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1),
				placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1),
				placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1),
				placement.NewEmptyInstance("i4", "r4", "z1", "endpoint4", 1),
				placement.NewEmptyInstance("i5", "r5", "z1", "endpoint5", 1),
			},
			expectedSubclusterIDs: []uint32{1, 1, 1, 2, 2},
			expectError:           false,
		},
		{
			name:                   "current placement with partial subclusters",
			instancesPerSubcluster: 4,
			currentPlacement: placement.NewPlacement().
				SetInstances([]placement.Instance{
					placement.NewEmptyInstance("existing1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1),
					placement.NewEmptyInstance("existing2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1),
					placement.NewEmptyInstance("existing3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1),
					placement.NewEmptyInstance("existing4", "r4", "z1", "endpoint4", 1).SetSubClusterID(1),
					placement.NewEmptyInstance("existing5", "r5", "z1", "endpoint5", 1).SetSubClusterID(2),
				}).
				SetIsSharded(true).
				SetHasSubClusters(true),
			newInstances: []placement.Instance{
				placement.NewEmptyInstance("new1", "r6", "z1", "endpoint6", 1),
				placement.NewEmptyInstance("new2", "r7", "z1", "endpoint7", 1),
				placement.NewEmptyInstance("new3", "r8", "z1", "endpoint8", 1),
				placement.NewEmptyInstance("new4", "r9", "z1", "endpoint9", 1),
				placement.NewEmptyInstance("new5", "r10", "z1", "endpoint10", 1),
			},
			expectedSubclusterIDs: []uint32{2, 2, 2, 3, 3},
			expectError:           false,
		},
		{
			name:                   "current placement with full subclusters",
			instancesPerSubcluster: 2,
			currentPlacement: placement.NewPlacement().
				SetInstances([]placement.Instance{
					placement.NewEmptyInstance("existing1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1),
					placement.NewEmptyInstance("existing2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1), // subcluster 1 full
					placement.NewEmptyInstance("existing3", "r3", "z1", "endpoint3", 1).SetSubClusterID(2),
					placement.NewEmptyInstance("existing4", "r4", "z1", "endpoint4", 1).SetSubClusterID(2), // subcluster 2 full
				}).
				SetIsSharded(true).
				SetHasSubClusters(true),
			newInstances: []placement.Instance{
				placement.NewEmptyInstance("new1", "r5", "z1", "endpoint5", 1),
				placement.NewEmptyInstance("new2", "r6", "z1", "endpoint6", 1),
				placement.NewEmptyInstance("new3", "r7", "z1", "endpoint7", 1),
			},
			expectedSubclusterIDs: []uint32{3, 3, 4},
			expectError:           false,
		},
		{
			name:                   "empty new instances",
			instancesPerSubcluster: 3,
			currentPlacement:       nil,
			newInstances:           []placement.Instance{},
			expectedSubclusterIDs:  []uint32{},
			expectError:            false,
		},
		{
			name:                   "exactly fill one subcluster",
			instancesPerSubcluster: 3,
			currentPlacement:       nil,
			newInstances: []placement.Instance{
				placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1),
				placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1),
				placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1),
			},
			expectedSubclusterIDs: []uint32{1, 1, 1},
			expectError:           false,
		},
		{
			name:                   "fill multiple subclusters exactly",
			instancesPerSubcluster: 2,
			currentPlacement:       nil,
			newInstances: []placement.Instance{
				placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1),
				placement.NewEmptyInstance("i2", "r2", "z1", "endpoint2", 1),
				placement.NewEmptyInstance("i3", "r3", "z1", "endpoint3", 1),
				placement.NewEmptyInstance("i4", "r4", "z1", "endpoint4", 1),
				placement.NewEmptyInstance("i5", "r5", "z1", "endpoint5", 1),
				placement.NewEmptyInstance("i6", "r6", "z1", "endpoint6", 1),
			},
			expectedSubclusterIDs: []uint32{1, 1, 2, 2, 3, 3},
			expectError:           false,
		},
		{
			name:                   "invalid instances per subcluster - zero",
			instancesPerSubcluster: 0,
			currentPlacement:       nil,
			newInstances: []placement.Instance{
				placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1),
			},
			expectedSubclusterIDs: []uint32{},
			expectError:           true,
			errorMessage:          "instances per subcluster is not set",
		},
		{
			name:                   "invalid instances per subcluster - negative",
			instancesPerSubcluster: -1,
			currentPlacement:       nil,
			newInstances: []placement.Instance{
				placement.NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1),
			},
			expectedSubclusterIDs: []uint32{},
			expectError:           true,
			errorMessage:          "instances per subcluster is not set",
		},
		{
			name:                   "complex scenario with gaps in subclusters",
			instancesPerSubcluster: 4,
			currentPlacement: placement.NewPlacement().
				SetInstances([]placement.Instance{
					placement.NewEmptyInstance("existing1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1),
					placement.NewEmptyInstance("existing2", "r2", "z1", "endpoint2", 1).SetSubClusterID(1),
					placement.NewEmptyInstance("existing3", "r3", "z1", "endpoint3", 1).SetSubClusterID(1), // subcluster 1 holds 3 of 4; it is not topped up
					placement.NewEmptyInstance("existing4", "r4", "z1", "endpoint4", 1).SetSubClusterID(3), // skip subcluster 2
					placement.NewEmptyInstance("existing5", "r5", "z1", "endpoint5", 1).SetSubClusterID(3),
					placement.NewEmptyInstance("existing6", "r6", "z1", "endpoint6", 1).SetSubClusterID(3), // subcluster 3 holds 3 of 4; one slot left
				}).
				SetIsSharded(true).
				SetHasSubClusters(true),
			newInstances: []placement.Instance{
				placement.NewEmptyInstance("new1", "r7", "z1", "endpoint7", 1),
				placement.NewEmptyInstance("new2", "r8", "z1", "endpoint8", 1),
				placement.NewEmptyInstance("new3", "r9", "z1", "endpoint9", 1),
				placement.NewEmptyInstance("new4", "r10", "z1", "endpoint10", 1),
			},
			// The first new instance takes the last slot of subcluster 3, the
			// remaining ones open subcluster 4.
			expectedSubclusterIDs: []uint32{3, 4, 4, 4},
			expectError:           false,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			err := assignSubClusterIDs(tt.newInstances, tt.currentPlacement, tt.instancesPerSubcluster)
			if tt.expectError {
				// require stops the subtest on a nil error, so err.Error()
				// below cannot dereference nil.
				require.Error(t, err)
				assert.Equal(t, tt.errorMessage, err.Error())
				return
			}

			require.NoError(t, err)
			// Abort on a length mismatch so the indexing below stays in range.
			require.Len(t, tt.newInstances, len(tt.expectedSubclusterIDs))

			for i, expectedID := range tt.expectedSubclusterIDs {
				assert.Equal(t, expectedID, tt.newInstances[i].SubClusterID(),
					"Instance %d (ID: %s) should be in subcluster %d",
					i, tt.newInstances[i].ID(), expectedID)
			}
		})
	}
}
Loading
0