diff --git a/event/bus.go b/event/bus.go index 7dbf868cf..46ba7a716 100644 --- a/event/bus.go +++ b/event/bus.go @@ -26,9 +26,9 @@ func (eb *EventBus) AddListener(eType Event, lFunc listenerFunc) { } // Dispatch calls all listeners registered to the given Event -func (eb *EventBus) Dispatch(ev *Event) { +func (eb *EventBus) Dispatch(ev Event) { wg := sync.WaitGroup{} - for _, lFunc := range eb.lFuncMap[*ev] { + for _, lFunc := range eb.lFuncMap[ev] { wg.Add(1) go func() { lFunc() diff --git a/event/bus_test.go b/event/bus_test.go index d19d91575..0ab68a9dc 100644 --- a/event/bus_test.go +++ b/event/bus_test.go @@ -18,7 +18,7 @@ func TestEventBus(t *testing.T) { ev := Event("TypeOne") bus.AddListener(ev, tl.HandleEvent) - bus.Dispatch(&ev) + bus.Dispatch(ev) select { case <-tl.evchan: @@ -34,8 +34,8 @@ func TestEventBusNoDispatch(t *testing.T) { ev2 := Event("TypeTwo") bus.AddListener(ev1, tl.HandleEvent) - bus.Dispatch(&ev2) - bus.Dispatch(&ev1) + bus.Dispatch(ev2) + bus.Dispatch(ev1) close(tl.evchan) diff --git a/event/event.go b/event/event.go index 57e84db01..24a8add8e 100644 --- a/event/event.go +++ b/event/event.go @@ -1,8 +1,10 @@ package event var ( - GlobalEvent = Event("GlobalEvent") - JobEvent = Event("JobEvent") + // Occurs when any Job's target is touched + JobTargetChangeEvent = Event("JobTargetChangeEvent") + // Occurs when any Job's target state is touched + JobTargetStateChangeEvent = Event("JobTargetStateChangeEvent") ) type Event string diff --git a/registry/event.go b/registry/event.go index 9f0b8171e..23860a71d 100644 --- a/registry/event.go +++ b/registry/event.go @@ -26,30 +26,34 @@ func NewEventStream(client etcd.Client, registry Registry) (*EventStream, error) return &EventStream{client, reg}, nil } -func (es *EventStream) Stream(idx uint64, sendFunc func(*event.Event), stop chan bool) { +func (es *EventStream) Stream(idx uint64, sendFunc func(event.Event), stop chan bool) { etcdchan := make(chan *etcd.Result) go watch(es.etcd, idx, etcdchan, es.registry.keyPrefix, stop) go filter(etcdchan, es.registry.keyPrefix, sendFunc, stop) } -func filter(etcdchan chan *etcd.Result, prefix string, sendFunc func(*event.Event), stop chan bool) { - parse := func(res *etcd.Result) *event.Event { +func filter(etcdchan chan *etcd.Result, prefix string, sendFunc func(event.Event), stop chan bool) { + parse := func(res *etcd.Result) (ev event.Event, ok bool) { if res == nil || res.Node == nil { - return nil + return } - if !strings.HasPrefix(res.Node.Key, prefix) { - return nil + // ignore everything but the job namespace + if !strings.HasPrefix(res.Node.Key, path.Join(prefix, jobPrefix)) { + return } - var ev event.Event - if strings.HasPrefix(res.Node.Key, path.Join(prefix, jobPrefix)) { - ev = event.JobEvent - } else { - ev = event.GlobalEvent + _, baseName := path.Split(res.Node.Key) + switch baseName { + case "target-state": + ev = event.JobTargetStateChangeEvent + ok = true + case "target": + ev = event.JobTargetChangeEvent + ok = true + default: } - - return &ev + return } for { @@ -58,14 +62,10 @@ func filter(etcdchan chan *etcd.Result, prefix string, sendFunc func(*event.Even return case res := <-etcdchan: log.V(1).Infof("Received %v from etcd watch", res) - - ev := parse(res) - if ev == nil { - continue + if ev, ok := parse(res); ok { + log.V(1).Infof("Translated %v to Event(Type=%s)", res, ev) + sendFunc(ev) } - - log.V(1).Infof("Translated %v to Event(Type=%s)", res, ev) - sendFunc(ev) } } } diff --git a/registry/event_test.go b/registry/event_test.go index d8aa4289a..6cab7a783 100644 --- a/registry/event_test.go +++ b/registry/event_test.go @@ -10,41 +10,81 @@ import ( func TestFilterEtcdEvents(t *testing.T) { tests := []struct { - in *etcd.Result - ev *event.Event + in string + ev []event.Event }{ { - in: nil, - ev: nil, + in: "", + ev: []event.Event{}, }, { - in: &etcd.Result{Node: &etcd.Node{Key: "/"}}, - ev: nil, + in: "/", + ev: []event.Event{}, }, { - in: &etcd.Result{Node: &etcd.Node{Key: "/fleet"}}, - ev: &event.GlobalEvent, + in: "/fleet", + ev: []event.Event{}, }, { - in: &etcd.Result{Node: &etcd.Node{Key: "/fleet/job"}}, - ev: &event.JobEvent, + in: "/fleet/job", + ev: []event.Event{}, + }, + { + in: "/fleet/job/foo/object", + ev: []event.Event{}, + }, + { + in: "/fleet/machine/asdf", + ev: []event.Event{}, + }, + { + in: "/fleet/state/asdf", + ev: []event.Event{}, + }, + { + in: "/fleet/job/asdf/target-state", + ev: []event.Event{event.JobTargetStateChangeEvent}, + }, + { + in: "/fleet/job/foobarbaz/target-state", + ev: []event.Event{event.JobTargetStateChangeEvent}, + }, + { + in: "/fleet/job/asdf/target", + ev: []event.Event{event.JobTargetChangeEvent}, }, } for i, tt := range tests { - etcdchan := make(chan *etcd.Result) - stopchan := make(chan bool) - prefix := "/fleet" + for _, action := range []string{"set", "update", "create", "delete"} { + etcdchan := make(chan *etcd.Result) + stopchan := make(chan bool) + prefix := "/fleet" - send := func(ev *event.Event) { - if !reflect.DeepEqual(tt.ev, ev) { - t.Errorf("case %d: received incorrect event\nexpected %#v\ngot %#v", i, tt.ev, ev) + got := make([]event.Event, 0) + send := func(ev event.Event) { + got = append(got, ev) } - } - go filter(etcdchan, prefix, send, stopchan) + go filter(etcdchan, prefix, send, stopchan) - etcdchan <- tt.in - close(stopchan) + var res *etcd.Result + if tt.in != "" { + res = &etcd.Result{ + Node: &etcd.Node{ + Key: tt.in, + }, + Action: action, + } + } + etcdchan <- res + + if !reflect.DeepEqual(tt.ev, got) { + t.Errorf("case %d: received incorrect event\nexpected %#v\ngot %#v", i, tt.ev, got) + t.Logf("action: %v", action) + } + + close(stopchan) + } } } diff --git a/server/server.go b/server/server.go index 3357d03d7..50f3d3e7a 100644 --- a/server/server.go +++ b/server/server.go @@ -95,8 +95,9 @@ func New(cfg config.Config) (*Server, error) { } eBus := event.NewEventBus() - eBus.AddListener(event.JobEvent, ar.Trigger) - eBus.AddListener(event.GlobalEvent, e.Trigger) + eBus.AddListener(event.JobTargetChangeEvent, ar.Trigger) + eBus.AddListener(event.JobTargetStateChangeEvent, ar.Trigger) + eBus.AddListener(event.JobTargetStateChangeEvent, e.Trigger) listeners, err := activation.Listeners(false) if err != nil {