agent,engine: fix bugs in rescheduling for replaced units by dongsupark · Pull Request #1698 · coreos/fleet · GitHub

This repository was archived by the owner on Jan 30, 2020. It is now read-only.

agent,engine: fix bugs in rescheduling for replaced units #1698

Merged
Changes from all commits
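Summary (reconstructed from the diff below): the agent's AbleToRun() now returns a three-valued job.JobAction (schedule, unschedule, or reschedule) plus a reason string, replacing the old bool-plus-magic-string protocol in which the reason "jobreschedule" signalled a replaced unit. The engine reconciler gains an explicit reschedule path: when a unit carries Replaces=, the replaced unit is unscheduled from its current machine and re-placed via a new Scheduler method, DecideReschedule(), which picks a machine other than the current target. A functional-test fixture exercising Replaces= is also added.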
30 changes: 15 additions & 15 deletions agent/reconcile_test.go
@@ -275,48 +275,48 @@ func TestAbleToRun(t *testing.T) {
tests := []struct {
dState *AgentState
job *job.Job
- want bool
+ want job.JobAction
}{
// nothing to worry about
{
dState: NewAgentState(&machine.MachineState{ID: "123"}),
job: &job.Job{Name: "easy-street.service", Unit: unit.UnitFile{}},
- want: true,
+ want: job.JobActionSchedule,
},

// match MachineID
{
dState: NewAgentState(&machine.MachineState{ID: "XYZ"}),
job: newTestJobWithXFleetValues(t, "MachineID=XYZ"),
- want: true,
+ want: job.JobActionSchedule,
},

// mismatch MachineID
{
dState: NewAgentState(&machine.MachineState{ID: "123"}),
job: newTestJobWithXFleetValues(t, "MachineID=XYZ"),
- want: false,
+ want: job.JobActionUnschedule,
},

// match MachineMetadata
{
dState: NewAgentState(&machine.MachineState{ID: "123", Metadata: map[string]string{"region": "us-west"}}),
job: newTestJobWithXFleetValues(t, "MachineMetadata=region=us-west"),
- want: true,
+ want: job.JobActionSchedule,
},

// Machine metadata ignored when no MachineMetadata in Job
{
dState: NewAgentState(&machine.MachineState{ID: "123", Metadata: map[string]string{"region": "us-west"}}),
job: &job.Job{Name: "easy-street.service", Unit: unit.UnitFile{}},
- want: true,
+ want: job.JobActionSchedule,
},

// mismatch MachineMetadata
{
dState: NewAgentState(&machine.MachineState{ID: "123", Metadata: map[string]string{"region": "us-west"}}),
job: newTestJobWithXFleetValues(t, "MachineMetadata=region=us-east"),
- want: false,
+ want: job.JobActionUnschedule,
},

// peer scheduled locally
@@ -328,7 +328,7 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "MachineOf=pong.service"),
- want: true,
+ want: job.JobActionSchedule,
},

// multiple peers scheduled locally
@@ -341,14 +341,14 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "MachineOf=pong.service\nMachineOf=ping.service"),
- want: true,
+ want: job.JobActionSchedule,
},

// peer not scheduled locally
{
dState: NewAgentState(&machine.MachineState{ID: "123"}),
job: newTestJobWithXFleetValues(t, "MachineOf=ping.service"),
- want: false,
+ want: job.JobActionUnschedule,
},

// one of multiple peers not scheduled locally
@@ -360,7 +360,7 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "MachineOf=pong.service\nMachineOf=ping.service"),
- want: false,
+ want: job.JobActionUnschedule,
},

// no conflicts found
@@ -372,7 +372,7 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "Conflicts=pong.service"),
- want: true,
+ want: job.JobActionSchedule,
},

// conflicts found
@@ -384,7 +384,7 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "Conflicts=ping.service"),
- want: false,
+ want: job.JobActionUnschedule,
},

// no replaces found
@@ -396,7 +396,7 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "Replaces=pong.service"),
- want: true,
+ want: job.JobActionSchedule,
},

// replaces found
@@ -408,7 +408,7 @@ func TestAbleToRun(t *testing.T) {
},
},
job: newTestJobWithXFleetValues(t, "Replaces=ping.service"),
- want: false,
+ want: job.JobActionReschedule,
},
}

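The assertion loop that consumes this table sits below the truncated portion of the diff; under the new signature it presumably compares the returned job.JobAction against want, along these lines (a sketch; the loop variable names are assumed, not taken from the PR):

for i, tt := range tests {
	got, _ := tt.dState.AbleToRun(tt.job)
	if got != tt.want {
		t.Errorf("case %d: want %v, got %v", i, tt.want, got)
	}
}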
27 changes: 17 additions & 10 deletions agent/state.go
@@ -124,36 +124,43 @@ func globMatches(pattern, target string) bool {
// - Agent must have all required Peers of the Job scheduled locally (if any)
// - Job must not conflict with any other Units scheduled to the agent
+ // - Job must specially handle replaced units to be rescheduled
- func (as *AgentState) AbleToRun(j *job.Job) (bool, string) {
+ func (as *AgentState) AbleToRun(j *job.Job) (jobAction job.JobAction, errstr string) {
if tgt, ok := j.RequiredTarget(); ok && !as.MState.MatchID(tgt) {
return false, fmt.Sprintf("agent ID %q does not match required %q", as.MState.ID, tgt)
return job.JobActionUnschedule, fmt.Sprintf("agent ID %q does not match required %q", as.MState.ID, tgt)
}

metadata := j.RequiredTargetMetadata()
if len(metadata) != 0 {
if !machine.HasMetadata(as.MState, metadata) {
return false, "local Machine metadata insufficient"
return job.JobActionUnschedule, "local Machine metadata insufficient"
}
}

peers := j.Peers()
if len(peers) != 0 {
for _, peer := range peers {
if !as.unitScheduled(peer) {
return false, fmt.Sprintf("required peer Unit(%s) is not scheduled locally", peer)
return job.JobActionUnschedule, fmt.Sprintf("required peer Unit(%s) is not scheduled locally", peer)
}
}
}

if cExists, cJobName := as.HasConflict(j.Name, j.Conflicts()); cExists {
return false, fmt.Sprintf("found conflict with locally-scheduled Unit(%s)", cJobName)
return job.JobActionUnschedule, fmt.Sprintf("found conflict with locally-scheduled Unit(%s)", cJobName)
}

- // Handle Replace option specially, by returning a special string
- // "jobreschedule" as reason.
- if cExists, _ := as.hasReplace(j.Name, j.Replaces()); cExists {
- return false, job.JobReschedule
+ // Handle Replace option specially for rescheduling the unit
+ if cExists, cJobName := as.hasReplace(j.Name, j.Replaces()); cExists {
+ return job.JobActionReschedule, fmt.Sprintf("found replace with locally-scheduled Unit(%s)", cJobName)
}

return true, ""
return job.JobActionSchedule, ""
}

+ func (as *AgentState) GetReplacedUnit(j *job.Job) (string, error) {
+ cExists, replaced := as.hasReplace(j.Name, j.Replaces())
+ if !cExists {
+ return "", fmt.Errorf("cannot find units to be replaced for Unit(%s)", j.Name)
+ }
+ return replaced, nil
+ }
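The diff relies on a job.JobAction type with three values but does not include its definition. A minimal sketch of what the job package presumably provides (the underlying type and string values here are assumptions; only the three constant names are confirmed by the diff above):

// Hypothetical definition of job.JobAction, for orientation only.
type JobAction string

const (
	JobActionSchedule   JobAction = "schedule"
	JobActionUnschedule JobAction = "unschedule"
	JobActionReschedule JobAction = "reschedule"
)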
121 changes: 88 additions & 33 deletions engine/reconciler.go
@@ -84,56 +84,111 @@ func (r *Reconciler) calculateClusterTasks(clust *clusterState, stopchan chan st
return true
}

- go func() {
- defer close(taskchan)
+ decide := func(j *job.Job) (jobAction job.JobAction, reason string) {
+ if j.TargetState == job.JobStateInactive {
+ return job.JobActionUnschedule, "target state inactive"
+ }
+
+ agents := clust.agents()
+
+ as, ok := agents[j.TargetMachineID]
+ if !ok {
+ metrics.ReportEngineReconcileFailure(metrics.MachineAway)
+ return job.JobActionUnschedule, fmt.Sprintf("target Machine(%s) went away", j.TargetMachineID)
+ }
+
+ if act, ableReason := as.AbleToRun(j); act != job.JobActionSchedule {
+ metrics.ReportEngineReconcileFailure(metrics.RunFailure)
+ return act, fmt.Sprintf("target Machine(%s) unable to run unit: %v",
+ j.TargetMachineID, ableReason)
+ }
+
+ return job.JobActionSchedule, ""
+ }
+
+ handle_reschedule := func(j *job.Job, reason string) bool {
+ isRescheduled := false
+
+ agents := clust.agents()
+
+ as, ok := agents[j.TargetMachineID]
+ if !ok {
+ metrics.ReportEngineReconcileFailure(metrics.MachineAway)
+ return false
+ }
+
+ for _, cj := range clust.jobs {
+ if !cj.Scheduled() {
+ continue
+ }
+ if j.Name != cj.Name {
+ continue
+ }
+
+ replacedUnit, err := as.GetReplacedUnit(j)
+ if err != nil {
+ log.Debugf("No unit to reschedule: %v", err)
+ metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
+ continue
+ }
+
+ if !send(taskTypeUnscheduleUnit, reason, replacedUnit, j.TargetMachineID) {
+ log.Infof("Job(%s) unschedule send failed", replacedUnit)
+ metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
+ continue
+ }
+
+ dec, err := r.sched.DecideReschedule(clust, j)
+ if err != nil {
+ log.Debugf("Unable to schedule Job(%s): %v", j.Name, err)
+ metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
+ continue
+ }
+
+ if !send(taskTypeAttemptScheduleUnit, reason, replacedUnit, dec.machineID) {
+ log.Infof("Job(%s) attemptschedule send failed", replacedUnit)
+ metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
+ continue
+ }
+ clust.schedule(replacedUnit, dec.machineID)
+ log.Debugf("rescheduling unit %s to machine %s", replacedUnit, dec.machineID)
+
+ clust.schedule(j.Name, j.TargetMachineID)
+ log.Debugf("scheduling unit %s to machine %s", j.Name, j.TargetMachineID)
+
+ isRescheduled = true
+ }
+
+ return isRescheduled
+ }

+ go func() {
+ defer close(taskchan)

for _, j := range clust.jobs {
if !j.Scheduled() {
continue
}

- decide := func() (unschedule bool, reason string) {
- if j.TargetState == job.JobStateInactive {
- unschedule = true
- reason = "target state inactive"
- return
- }
-
- as, ok := agents[j.TargetMachineID]
- if !ok {
- unschedule = true
- reason = fmt.Sprintf("target Machine(%s) went away", j.TargetMachineID)
- metrics.ReportEngineReconcileFailure(metrics.MachineAway)
- return
- }
-
- var able bool
- var ableReason string
- if able, ableReason = as.AbleToRun(j); !able {
- unschedule = true
- if ableReason == job.JobReschedule {
- reason = ableReason
- } else {
- reason = fmt.Sprintf("target Machine(%s) unable to run unit", j.TargetMachineID)
- metrics.ReportEngineReconcileFailure(metrics.RunFailure)
- }
- return
- }
-
- return
+ act, reason := decide(j)
+ if act == job.JobActionReschedule && handle_reschedule(j, reason) {
+ log.Debugf("Job(%s) is rescheduled: %v", j.Name, reason)
+ continue
+ }

- unschedule, reason := decide()
- if !unschedule {
+ if act != job.JobActionUnschedule {
+ log.Debugf("Job(%s) is not to be unscheduled, reason: %v", j.Name, reason)
+ metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
continue
}

if !send(taskTypeUnscheduleUnit, reason, j.Name, j.TargetMachineID) {
log.Infof("Job(%s) send failed.", j.Name)
metrics.ReportEngineReconcileFailure(metrics.ScheduleFailure)
return
}

log.Debugf("Job(%s) unscheduling.", j.Name)
clust.unschedule(j.Name)
}

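Taken together, the reconciler's per-job decision now branches three ways instead of two. A condensed paraphrase of the loop body above (not literal fleet code; error paths and metrics elided):

act, reason := decide(j)
switch act {
case job.JobActionReschedule:
	// Move the replaced unit: unschedule it from its current machine,
	// then let DecideReschedule pick a different one.
	handle_reschedule(j, reason)
case job.JobActionUnschedule:
	// Tell the current target machine to drop the unit.
	send(taskTypeUnscheduleUnit, reason, j.Name, j.TargetMachineID)
	clust.unschedule(j.Name)
default:
	// job.JobActionSchedule: the unit stays where it is.
}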
39 changes: 38 additions & 1 deletion engine/scheduler.go
Expand Up @@ -28,6 +28,7 @@ type decision struct {

type Scheduler interface {
Decide(*clusterState, *job.Job) (*decision, error)
+ DecideReschedule(*clusterState, *job.Job) (*decision, error)
}

type leastLoadedScheduler struct{}
@@ -41,7 +42,7 @@ func (lls *leastLoadedScheduler) Decide(clust *clusterState, j *job.Job) (*decis

var target *agent.AgentState
for _, as := range agents {
- if able, _ := as.AbleToRun(j); !able {
+ if act, _ := as.AbleToRun(j); act == job.JobActionUnschedule {
continue
}

@@ -61,6 +62,42 @@ func (lls *leastLoadedScheduler) Decide(clust *clusterState, j *job.Job) (*decis
return &dec, nil
}

+ // DecideReschedule() decides scheduling in a much simpler way than
+ // Decide(): it just tries to find another free machine to schedule
+ // onto, excluding the current target machine. It does not need to run
+ // as.AbleToRun(), because the job action has already been decided
+ // before this function is called.
+ func (lls *leastLoadedScheduler) DecideReschedule(clust *clusterState, j *job.Job) (*decision, error) {
+ agents := lls.sortedAgents(clust)
+
+ if len(agents) == 0 {
+ return nil, fmt.Errorf("zero agents available")
+ }
+
+ found := false
+ var target *agent.AgentState
+ for _, as := range agents {
+ if as.MState.ID == j.TargetMachineID {
+ continue
+ }
+
+ as := as
+ target = as
+ found = true
+ break
+ }
+
+ if !found {
+ return nil, fmt.Errorf("no agents able to run job")
+ }
+
+ dec := decision{
+ machineID: target.MState.ID,
+ }
+
+ return &dec, nil
+ }

// sortedAgents returns a list of AgentState objects sorted ascending
// by the number of scheduled units
func (lls *leastLoadedScheduler) sortedAgents(clust *clusterState) []*agent.AgentState {
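Because sortedAgents() orders agents ascending by number of scheduled units, DecideReschedule() effectively picks the least-loaded machine other than the unit's current target. Note the trade-off visible in the code: unlike Decide(), it performs no AbleToRun() filtering, so constraints such as MachineMetadata or Conflicts are not re-checked for the rescheduled unit; the caller is expected to have settled the job action already.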
8 changes: 8 additions & 0 deletions functional/fixtures/units/replace-kick0.service
@@ -0,0 +1,8 @@
[Unit]
Description=Test Unit

[Service]
ExecStart=/bin/bash -c "while true; do echo Hello, World!; sleep 1; done"

[X-Fleet]
Replaces=replace.0.service
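A functional test would pair this fixture with a replace.0.service unit that it points at. A plausible manual reproduction of the scenario (the fleetctl subcommands are real; the exact sequence is assumed, not taken from the PR):

$ fleetctl start replace.0.service
$ fleetctl start replace-kick0.service
$ fleetctl list-units   # replace.0.service should reappear on a different machine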