8000 fleet engine constantly running by jonboulle · Pull Request #742 · coreos/fleet · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
This repository was archived by the owner on Jan 30, 2020. It is now read-only.

fleet engine constantly running #742

Merged
merged 2 commits into from
Aug 5, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions event/bus.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,9 @@ func (eb *EventBus) AddListener(eType Event, lFunc listenerFunc) {
}

// Dispatch calls all listeners registered to the given Event
func (eb *EventBus) Dispatch(ev *Event) {
func (eb *EventBus) Dispatch(ev Event) {
wg := sync.WaitGroup{}
for _, lFunc := range eb.lFuncMap[*ev] {
for _, lFunc := range eb.lFuncMap[ev] {
wg.Add(1)
go func() {
lFunc()
Expand Down
6 changes: 3 additions & 3 deletions event/bus_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func TestEventBus(t *testing.T) {
ev := Event("TypeOne")
bus.AddListener(ev, tl.HandleEvent)

bus.Dispatch(&ev)
bus.Dispatch(ev)

select {
case <-tl.evchan:
Expand All @@ -34,8 +34,8 @@ func TestEventBusNoDispatch(t *testing.T) {
ev2 := Event("TypeTwo")
bus.AddListener(ev1, tl.HandleEvent)

bus.Dispatch(&ev2)
bus.Dispatch(&ev1)
bus.Dispatch(ev2)
bus.Dispatch(ev1)

close(tl.evchan)

Expand Down
6 changes: 4 additions & 2 deletions event/event.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package event

var (
GlobalEvent = Event("GlobalEvent")
JobEvent = Event("JobEvent")
// Occurs when any Job's target is touched
JobTargetChangeEvent = Event("JobTargetChangeEvent")
// Occurs when any Job's target state is touched
JobTargetStateChangeEvent = Event("JobTargetStateChangeEvent")
)

type Event string
40 changes: 20 additions & 20 deletions registry/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,34 @@ func NewEventStream(client etcd.Client, registry Registry) (*EventStream, error)
return &EventStream{client, reg}, nil
}

func (es *EventStream) Stream(idx uint64, sendFunc func(*event.Event), stop chan bool) {
func (es *EventStream) Stream(idx uint64, sendFunc func(event.Event), stop chan bool) {
etcdchan := make(chan *etcd.Result)
go watch(es.etcd, idx, etcdchan, es.registry.keyPrefix, stop)
go filter(etcdchan, es.registry.keyPrefix, sendFunc, stop)
}

func filter(etcdchan chan *etcd.Result, prefix string, sendFunc func(*event.Event), stop chan bool) {
parse := func(res *etcd.Result) *event.Event {
func filter(etcdchan chan *etcd.Result, prefix string, sendFunc func(event.Event), stop chan bool) {
parse := func(res *etcd.Result) (ev event.Event, ok bool) {
if res == nil || res.Node == nil {
return nil
return
}

if !strings.HasPrefix(res.Node.Key, prefix) {
return nil
// ignore everything but the job namespace
if !strings.HasPrefix(res.Node.Key, path.Join(prefix, jobPrefix)) {
return
}

var ev event.Event
if strings.HasPrefix(res.Node.Key, path.Join(prefix, jobPrefix)) {
ev = event.JobEvent
} else {
ev = event.GlobalEvent
_, baseName := path.Split(res.Node.Key)
switch baseName {
case "target-state":
ev = event.JobTargetStateChangeEvent
ok = true
case "target":
ev = event.JobTargetChangeEvent
ok = true
default:
}

return &ev
return
}

for {
Expand All @@ -58,14 +62,10 @@ func filter(etcdchan chan *etcd.Result, prefix string, sendFunc func(*event.Even
return
case res := <-etcdchan:
log.V(1).Infof("Received %v from etcd watch", res)

ev := parse(res)
if ev == nil {
continue
if ev, ok := parse(res); ok {
log.V(1).Infof("Translated %v to Event(Type=%s)", res, ev)
sendFunc(ev)
}

log.V(1).Infof("Translated %v to Event(Type=%s)", res, ev)
sendFunc(ev)
}
}
}
Expand Down
80 changes: 60 additions & 20 deletions registry/event_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,41 +10,81 @@ import (

func TestFilterEtcdEvents(t *testing.T) {
tests := []struct {
in *etcd.Result
ev *event.Event
in string
ev []event.Event
}{
{
in: nil,
ev: nil,
in: "",
ev: []event.Event{},
},
{
in: &etcd.Result{Node: &etcd.Node{Key: "/"}},
ev: nil,
in: "/",
ev: []event.Event{},
},
{
in: &etcd.Result{Node: &etcd.Node{Key: "/fleet"}},
ev: &event.GlobalEvent,
in: "/fleet",
ev: []event.Event{},
},
{
in: &etcd.Result{Node: &etcd.Node{Key: "/fleet/job"}},
ev: &event.JobEvent,
in: "/fleet/job",
ev: []event.Event{},
},
{
in: "/fleet/job/foo/object",
ev: []event.Event{},
},
{
in: "/fleet/machine/asdf",
ev: []event.Event{},
},
{
in: "/fleet/state/asdf",
ev: []event.Event{},
},
{
in: "/fleet/job/asdf/target-state",
ev: []event.Event{event.JobTargetStateChangeEvent},
},
{
in: "/fleet/job/foobarbaz/target-state",
ev: []event.Event{event.JobTargetStateChangeEvent},
},
{
in: "/fleet/job/asdf/target",
ev: []event.Event{event.JobTargetChangeEvent},
},
}

for i, tt := range tests {
etcdchan := make(chan *etcd.Result)
stopchan := make(chan bool)
prefix := "/fleet"
for _, action := range []string{"set", "update", "create", "delete"} {
etcdchan := make(chan *etcd.Result)
stopchan := make(chan bool)
prefix := "/fleet"

send := func(ev *event.Event) {
if !reflect.DeepEqual(tt.ev, ev) {
t.Errorf("case %d: received incorrect event\nexpected %#v\ngot %#v", i, tt.ev, ev)
got := make([]event.Event, 0)
send := func(ev event.Event) {
got = append(got, ev)
}
}

go filter(etcdchan, prefix, send, stopchan)
go filter(etcdchan, prefix, send, stopchan)

etcdchan <- tt.in
close(stopchan)
var res *etcd.Result
if tt.in != "" {
res = &etcd.Result{
Node: &etcd.Node{
Key: tt.in,
},
Action: action,
}
}
etcdchan <- res

if !reflect.DeepEqual(tt.ev, got) {
t.Errorf("case %d: received incorrect event\nexpected %#v\ngot %#v", i, tt.ev, got) 6DB6
t.Logf("action: %v", action)
}

close(stopchan)
}
}
}
5 changes: 3 additions & 2 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,9 @@ func New(cfg config.Config) (*Server, error) {
}

eBus := event.NewEventBus()
eBus.AddListener(event.JobEvent, ar.Trigger)
eBus.AddListener(event.GlobalEvent, e.Trigger)
eBus.AddListener(event.JobTargetChangeEvent, ar.Trigger)
eBus.AddListener(event.JobTargetStateChangeEvent, ar.Trigger)
eBus.AddListener(event.JobTargetStateChangeEvent, e.Trigger)

listeners, err := activation.Listeners(false)
if err != nil {
Expand Down
0