Added code to balance shards by RiyaTyagi · Pull Request #4368 · m3db/m3 · GitHub

Added code to balance shards #4368

Open · RiyaTyagi wants to merge 8 commits into sub-cluster-12
17 changes: 12 additions & 5 deletions src/cluster/placement/algo/subclustered.go
@@ -62,6 +62,7 @@ func (a subclusteredPlacementAlgorithm) AddReplica(p placement.Placement) (place
return nil, fmt.Errorf("AddReplica is not supported for subclustered placement")
}

// nolint:dupl
func (a subclusteredPlacementAlgorithm) RemoveInstances(
p placement.Placement,
instanceIDs []string,
@@ -80,9 +81,9 @@ func (a subclusteredPlacementAlgorithm) RemoveInstances(
return nil, err
}

// if err := ph.optimize(safe); err != nil {
// return nil, err
// }
if err := ph.optimize(safe); err != nil {
return nil, err
}

if p, _, err = addInstanceToPlacement(ph.generatePlacement(), leavingInstance, withShards); err != nil {
return nil, err
@@ -182,7 +183,13 @@ func (a subclusteredPlacementAlgorithm) BalanceShards(
if err := a.IsCompatibleWith(p); err != nil {
return nil, err
}
ph, err := newSubclusteredHelper(p, a.opts, 0)
if err != nil {
return nil, err
}
if err := ph.optimize(unsafe); err != nil {
return nil, fmt.Errorf("shard balance optimization failed: %w", err)
}

// TODO: Implement subclustered balance shards logic
return nil, fmt.Errorf("subclustered balance shards not yet implemented")
return tryCleanupShardState(ph.generatePlacement(), a.opts)
}
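
With the stub replaced, BalanceShards builds a subclustered helper, runs an unsafe optimization pass, and hands the regenerated placement to tryCleanupShardState. A minimal caller-side sketch in Go; the constructor name and option wiring below are assumptions for illustration, not part of this diff:

	// rebalance is a hypothetical wrapper showing the new call path.
	// newSubclusteredAlgorithm is an assumed constructor; only the
	// BalanceShards signature is taken from this diff.
	func rebalance(p placement.Placement, opts placement.Options) (placement.Placement, error) {
		algo := newSubclusteredAlgorithm(opts)
		// Incompatible (non-subclustered) placements, helper construction
		// failures, and optimization errors all surface here.
		return algo.BalanceShards(p)
	}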
78 changes: 58 additions & 20 deletions src/cluster/placement/algo/subclustered_helper.go
@@ -22,7 +22,6 @@ package algo

import (
"container/heap"
"errors"
"fmt"
"math"
"math/rand"
@@ -35,19 +34,13 @@ import (
"github.com/m3db/m3/src/cluster/shard"
)

var (
// nolint: unused
errSubclusteredHelperNotImplemented = errors.New("subclustered helper methods not yet implemented")
)

type validationOperation int

const (
validationOpRemoval validationOperation = iota
validationOpAddition
)

// nolint
type subclusteredHelper struct {
targetLoad map[string]int
shardToInstanceMap map[uint32]map[placement.Instance]struct{}
@@ -64,7 +57,6 @@ type subclusteredHelper struct {
}

// subcluster is a subcluster in the placement.
// nolint
type subcluster struct {
id uint32
targetShardCount int
@@ -388,7 +380,6 @@ func (ph *subclusteredHelper) getShardLen() int {
}

// assignShardToInstance assigns a shard to an instance.
// nolint: unused
func (ph *subclusteredHelper) assignShardToInstance(s shard.Shard, to placement.Instance) {
to.Shards().Add(s)

@@ -400,7 +391,6 @@ func (ph *subclusteredHelper) assignShardToInstance(s shard.Shard, to placement.
ph.subClusters[to.SubClusterID()].instanceShardCounts[to.ID()]++
}

// nolint
// Instances returns the list of instances managed by the PlacementHelper.
func (ph *subclusteredHelper) Instances() []placement.Instance {
res := make([]placement.Instance, 0, len(ph.instances))
@@ -559,7 +549,6 @@ func (ph *subclusteredHelper) findMapKeyIntersection(map1, map2 map[uint32]int)
}

// CanMoveShard checks if the shard can be moved from the instance to the target isolation group.
// nolint: unused
func (ph *subclusteredHelper) CanMoveShard(shard uint32, from placement.Instance, toIsolationGroup string) bool {
if from != nil {
if from.IsolationGroup() == toIsolationGroup {
@@ -617,12 +606,17 @@ func (ph *subclusteredHelper) placeShards(
}

// addInstance adds an instance to the placement.
// nolint: unused
func (ph *subclusteredHelper) addInstance(addingInstance placement.Instance) error {
ph.reclaimLeavingShards(addingInstance)
return ph.assignLoadToInstanceUnsafe(addingInstance)
}

func (ph *subclusteredHelper) assignLoadToInstanceSafe(addingInstance placement.Instance) error {
return ph.assignTargetLoad(addingInstance, func(from, to placement.Instance) bool {
return ph.moveOneShardInState(from, to, shard.Unknown)
})
}

func (ph *subclusteredHelper) assignLoadToInstanceUnsafe(addingInstance placement.Instance) error {
return ph.assignTargetLoad(addingInstance, func(from, to placement.Instance) bool {
return ph.moveOneShard(from, to)
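
The new safe variant differs from the unsafe one only in the predicate it hands to assignTargetLoad: moveOneShardInState restricted to shard.Unknown versus the general moveOneShard. Assuming moveOneShard mirrors the fallback order of m3's non-subclustered placement helper (an assumption; that code is not shown in this diff), the distinction is roughly:

	// Assumed semantics, mirroring the base placement helper: the safe
	// path only relocates shards still in the Unknown state, while the
	// unsafe path falls back to moving Initializing and then Available
	// shards, which is more disruptive to running instances.
	func (ph *subclusteredHelper) moveOneShard(from, to placement.Instance) bool {
		return ph.moveOneShardInState(from, to, shard.Unknown) ||
			ph.moveOneShardInState(from, to, shard.Initializing) ||
			ph.moveOneShardInState(from, to, shard.Available)
	}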
@@ -636,6 +630,7 @@ func (ph *subclusteredHelper) assignTargetLoad(

targetLoad := ph.targetLoadForInstance(targetInstance.ID())
// First try to move shards from other subclusters

instanceHeap, err := ph.buildInstanceHeap(ph.removeSubClusterInstances(targetInstance.SubClusterID()), false)
if err != nil {
return err
@@ -776,6 +771,7 @@ func (ph *subclusteredHelper) optimizeForSubclusterBalance(
}
}
}

for _, s := range shards {
shardID := s.ID()
tempCounts := make(map[string]int)
@@ -789,7 +785,7 @@
// hasn't been reached.
shardScores = append(shardScores, shardSkewScore{
shard: s,
skewAfterRemoval: math.MaxInt32,
skewAfterRemoval: math.MaxInt32 - count,
})
continue
}
@@ -831,15 +827,60 @@
return result
}

// nolint: dupl
func (ph *subclusteredHelper) mostUnderLoadedInstance() (placement.Instance, bool) {
var (
res placement.Instance
maxLoadGap int
totalLoadSurplus int
)
// nolint: dupl
for id, instance := range ph.instances {
loadGap := ph.targetLoad[id] - loadOnInstance(instance)
if loadGap > maxLoadGap {
maxLoadGap = loadGap
res = instance
}
if loadGap == maxLoadGap && res != nil && res.ID() > id {
res = instance
}
if loadGap < 0 {
totalLoadSurplus -= loadGap
}
}
if maxLoadGap > 0 && totalLoadSurplus != 0 {
return res, true
}
return nil, false
}

// optimize rebalances the load distribution in the cluster.
// nolint: unused
func (ph *subclusteredHelper) optimize(t optimizeType) error {
// TODO: Implement subclustered optimization logic
return fmt.Errorf("subclustered optimize not yet implemented: %w", errSubclusteredHelperNotImplemented)
var fn assignLoadFn
switch t {
case safe:
fn = ph.assignLoadToInstanceSafe
case unsafe:
fn = ph.assignLoadToInstanceUnsafe
}
uniq := make(map[string]struct{}, len(ph.instances))
for {
ins, ok := ph.mostUnderLoadedInstance()
if !ok {
return nil
}
if _, exist := uniq[ins.ID()]; exist {
return nil
}

uniq[ins.ID()] = struct{}{}
if err := fn(ins); err != nil {
return err
}
}
}

// generatePlacement generates a placement.
// nolint: unused
func (ph *subclusteredHelper) generatePlacement() placement.Placement {
var instances = make([]placement.Instance, 0, len(ph.instances))

@@ -872,7 +913,6 @@ func (ph *subclusteredHelper) generatePlacement() placement.Placement {

// reclaimLeavingShards reclaims all the leaving shards on the given instance
// by pulling them back from the rest of the cluster.
// nolint: unused
func (ph *subclusteredHelper) reclaimLeavingShards(instance placement.Instance) {
if instance.Shards().NumShardsForState(shard.Leaving) == 0 {
// Shortcut if there is nothing to be reclaimed.
@@ -890,7 +930,6 @@ func (ph *subclusteredHelper) reclaimLeavingShards(instance placement.Instance)

// returnInitializingShards returns all the initializing shards on the given instance
// by returning them back to the original owners.
// nolint: unused
func (ph *subclusteredHelper) returnInitializingShards(instance placement.Instance) {
shardSet := getShardMap(instance.Shards().All())
ph.returnInitializingShardsToSource(shardSet, instance, ph.Instances())
@@ -930,7 +969,6 @@ func (ph *subclusteredHelper) returnInitializingShardsToSource(
// validateSubclusterDistribution validates that:
// 1. Number of isolation groups equals replica factor (rf)
// 2. For complete subclusters, nodes per isolation group = instancesPerSubcluster / rf
// nolint: unused
func (ph *subclusteredHelper) validateSubclusterDistribution() error {
if len(ph.instances) == 0 {
return nil
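The two distribution rules validated above reduce to simple arithmetic. A standalone sketch (illustrative only, not the helper's code) using the same numbers as the test fixture below, replica factor 2 and 6 instances per subcluster:

	package main

	import "fmt"

	// nodesPerGroup applies the documented rules: the placement must have
	// exactly rf isolation groups, and a complete subcluster must spread
	// its instances evenly across them.
	func nodesPerGroup(instancesPerSubcluster, rf int) (int, error) {
		if instancesPerSubcluster%rf != 0 {
			return 0, fmt.Errorf("%d instances cannot split evenly across %d isolation groups",
				instancesPerSubcluster, rf)
		}
		return instancesPerSubcluster / rf, nil
	}

	func main() {
		n, err := nodesPerGroup(6, 2)
		fmt.Println(n, err) // 3 <nil>: each isolation group holds 3 instances
	}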
1 change: 1 addition & 0 deletions src/cluster/placement/algo/subclustered_test.go
@@ -102,6 +102,7 @@ func TestSubclusteredAlgorithm_IsCompatibleWith(t *testing.T) {
})
}
}

func TestSubclusteredAlgorithm_InitialPlacement(t *testing.T) {
tests := []struct {
name string
4 changes: 2 additions & 2 deletions src/cluster/placement/placement_test.go
@@ -860,10 +860,10 @@ func TestValidateSubclusteredPlacement(t *testing.T) {
instancesPerSubcluster: 6,
replicaFactor: 2,
instances: func() []Instance {
i1 := NewEmptyInstance("i1", "r1", "z1", "endpoint1", 1).SetSubClusterID(1)
i1 := NewEmptyInstance("i1", "IG1", "z1", "endpoint1", 1).SetSubClusterID(1)
i1.Shards().Add(shard.NewShard(1).SetState(shard.Available))
i1.Shards().Add(shard.NewShard(2).SetState(shard.Available))
i2 := NewEmptyInstance("i2", "r1", "z1", "endpoint2", 1).SetSubClusterID(1)
i2 := NewEmptyInstance("i2", "IG1", "z1", "endpoint2", 1).SetSubClusterID(1)
i2.Shards().Add(shard.NewShard(1).SetState(shard.Available))
i2.Shards().Add(shard.NewShard(2).SetState(shard.Available))
