Added code for validating subclustered placement by RiyaTyagi · Pull Request #4362 · m3db/m3 · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Added code for validating subclustered placement #4362

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: sub-cluster-6
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/cluster/placement/algo/subclustered_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ func TestSubclusteredAlgorithm_InitialPlacement(t *testing.T) {
assert.Equal(t, tt.replicaFactor, result.ReplicaFactor())
assert.True(t, result.IsSharded())
assert.True(t, result.HasSubClusters())
assert.NoError(t, placement.Validate(result))
assert.Equal(t, tt.instancesPerSubcluster, result.InstancesPerSubCluster())
}
})
Expand Down
85 changes: 85 additions & 0 deletions src/cluster/placement/placement.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import (
const (
// uninitializedShardSetID represents uninitialized shard set id.
uninitializedShardSetID = 0
// uninitializedSubClusterID represents uninitialized subcluster id.
uninitializedSubClusterID = uint32(0)
)

var (
Expand All @@ -42,6 +44,7 @@ var (
errDuplicatedShards = errors.New("invalid placement, there are duplicated shards in one replica")
errUnexpectedShards = errors.New("invalid placement, there are unexpected shard ids on instance")
errMirrorNotSharded = errors.New("invalid placement, mirrored placement must be sharded")
errSubclusteredNotSharded = errors.New("invalid placement, subclustered placement must be sharded")
)

type placement struct {
Expand Down Expand Up @@ -286,6 +289,10 @@ func validate(p Placement) error {
return errMirrorNotSharded
}

if p.HasSubClusters() && !p.IsSharded() {
return errSubclusteredNotSharded
}

shardCountMap := convertShardSliceToMap(p.Shards())
if len(shardCountMap) != len(p.Shards()) {
return errDuplicatedShards
Expand All @@ -309,6 +316,13 @@ func validate(p Placement) error {
if instance.Shards().NumShards() != 0 && !p.IsSharded() {
return fmt.Errorf("instance %s contains shards in a non-sharded placement", instance.String())
}
if instance.SubClusterID() == uninitializedSubClusterID && p.HasSubClusters() {
return fmt.Errorf("instance %s has uninitialized subcluster id in a subclustered placement", instance.String())
}
if instance.SubClusterID() != uninitializedSubClusterID && !p.HasSubClusters() {
return fmt.Errorf("instance %s has subcluster id %d in a non-subclustered placement",
instance.String(), instance.SubClusterID())
}
shardSetID := instance.ShardSetID()
if shardSetID > maxShardSetID {
return fmt.Errorf("instance %s shard set id %d is larger than max shard set id %d in the placement", instance.String(), shardSetID, maxShardSetID)
Expand Down Expand Up @@ -422,6 +436,77 @@ func validate(p Placement) error {
return fmt.Errorf("invalid shard count for shard %d: expected %d, actual %d", shard, p.ReplicaFactor(), c)
}
}

if p.HasSubClusters() {
return validateSubclusteredPlacement(p)
}
return nil
}

// validateSubclusteredPlacement checks the subcluster-specific invariants of a placement.
//
// Instances that report IsLeaving() and shards in the Leaving state are ignored. For every
// remaining shard it verifies that:
//   - the shard is held by instances from at most two distinct subclusters;
//   - when it is held by two subclusters, they do not both have exactly
//     InstancesPerSubCluster() non-leaving instances; and
//   - the number of distinct isolation groups holding the shard equals the replica factor.
//
// It returns nil when every shard passes, otherwise an error describing a violation. Shards
// are visited in map iteration order, so which violating shard is reported is unspecified.
func validateSubclusteredPlacement(p Placement) error {
	shardToSubclusterMap := make(map[uint32]map[uint32]struct{})
	subClusterToInstanceMap := make(map[uint32]map[Instance]struct{})
	shardToIsolationGroupMap := make(map[uint32]map[string]struct{})
	instancesPerSubCluster := p.InstancesPerSubCluster()

	for _, instance := range p.Instances() {
		if instance.IsLeaving() {
			continue
		}
		subClusterID := instance.SubClusterID()
		if _, exist := subClusterToInstanceMap[subClusterID]; !exist {
			subClusterToInstanceMap[subClusterID] = make(map[Instance]struct{})
		}
		subClusterToInstanceMap[subClusterID][instance] = struct{}{}

		for _, s := range instance.Shards().All() {
			if s.State() == shard.Leaving {
				continue
			}
			if _, exist := shardToIsolationGroupMap[s.ID()]; !exist {
				shardToIsolationGroupMap[s.ID()] = make(map[string]struct{})
			}
			shardToIsolationGroupMap[s.ID()][instance.IsolationGroup()] = struct{}{}
			if _, exist := shardToSubclusterMap[s.ID()]; !exist {
				shardToSubclusterMap[s.ID()] = make(map[uint32]struct{})
			}
			shardToSubclusterMap[s.ID()][subClusterID] = struct{}{}
		}
	}

	// The loop variable is named shardID so it does not shadow the shard package.
	for shardID, subclusters := range shardToSubclusterMap {
		// If a movement is happening then the shard can be shared by at most two subclusters:
		// the one giving the shard away and the one receiving it.
		if len(subclusters) > 2 {
			return fmt.Errorf("invalid shard %d, expected at most 2 subclusters "+
				"(only during shard movement), actual %d", shardID, len(subclusters))
		}
		if len(subclusters) == 2 {
			// Order the two ids so the error message does not depend on map iteration order.
			ids := make([]uint32, 0, 2)
			for id := range subclusters {
				ids = append(ids, id)
			}
			if ids[0] > ids[1] {
				ids[0], ids[1] = ids[1], ids[0]
			}
			// Sharing is rejected when both subclusters hold exactly the configured
			// number of non-leaving instances.
			if len(subClusterToInstanceMap[ids[0]]) == instancesPerSubCluster &&
				len(subClusterToInstanceMap[ids[1]]) == instancesPerSubCluster {
				return fmt.Errorf("invalid shard %d, expected subcluster id %d, actual %d",
					shardID, ids[0], ids[1])
			}
		}

		if len(shardToIsolationGroupMap[shardID]) != p.ReplicaFactor() {
			return fmt.Errorf("invalid shard %d, expected %d isolation groups, actual %d",
				shardID, p.ReplicaFactor(), len(shardToIsolationGroupMap[shardID]))
		}
	}
	return nil
}

Expand Down
Loading
0