@@ -14,6 +14,7 @@ import (
14
14
15
15
"github.com/ovh/cds/engine/api/cache"
16
16
"github.com/ovh/cds/engine/api/group"
17
+ "github.com/ovh/cds/engine/api/observability"
17
18
"github.com/ovh/cds/engine/api/permission"
18
19
"github.com/ovh/cds/engine/service"
19
20
"github.com/ovh/cds/sdk"
@@ -37,14 +38,16 @@ type eventsBroker struct {
37
38
dbFunc func () * gorp.DbMap
38
39
cache cache.Store
39
40
clientsLen int64
41
+ router * Router
40
42
}
41
43
42
44
// AddClient add a client to the client map
43
- func (b * eventsBroker ) addClient (client eventsBrokerSubscribe ) {
45
+ func (b * eventsBroker ) addClient (ctx context. Context , client eventsBrokerSubscribe ) {
44
46
b .mutex .Lock ()
45
47
defer b .mutex .Unlock ()
46
48
b .clients [client .UUID ] = client
47
49
b .clientsLen ++
50
+ observability .Record (ctx , b .router .Stats .SSEClients , 1 )
48
51
}
49
52
50
53
// CleanAll cleans all clients
@@ -55,22 +58,24 @@ func (b *eventsBroker) cleanAll() {
55
58
for c , v := range b .clients {
56
59
close (v .Queue )
57
60
delete (b .clients , c )
61
+ observability .Record (b .router .Background , b .router .Stats .SSEClients , - 1 )
58
62
}
59
63
}
60
64
b .clientsLen = 0
61
65
}
62
66
63
- func (b * eventsBroker ) disconnectClient (uuid string ) {
67
+ func (b * eventsBroker ) disconnectClient (ctx context. Context , uuid string ) {
64
68
b .disconnectedMutex .Lock ()
65
69
defer b .disconnectedMutex .Unlock ()
66
70
b .disconnected [uuid ] = true
71
+ observability .Record (ctx , b .router .Stats .SSEClients , - 1 )
67
72
}
68
73
69
74
//Init the eventsBroker
70
75
func (b * eventsBroker ) Init (c context.Context ) {
71
76
// Start cache Subscription
72
77
subscribeFunc := func () {
73
- cacheSubscribe (c , b .messages , b .cache )
78
+ b . cacheSubscribe (c , b .messages , b .cache )
74
79
}
75
80
sdk .GoRoutine ("eventsBroker.Init.CacheSubscribe" , subscribeFunc )
76
81
@@ -80,7 +85,7 @@ func (b *eventsBroker) Init(c context.Context) {
80
85
sdk .GoRoutine ("eventsBroker.Init.Start" , startFunc )
81
86
}
82
87
83
- func cacheSubscribe (c context.Context , cacheMsgChan chan <- sdk.Event , store cache.Store ) {
88
+ func ( b * eventsBroker ) cacheSubscribe (c context.Context , cacheMsgChan chan <- sdk.Event , store cache.Store ) {
84
89
pubSub := store .Subscribe ("events_pubsub" )
85
90
tick := time .NewTicker (50 * time .Millisecond )
86
91
defer tick .Stop ()
@@ -107,6 +112,7 @@ func cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cach
107
112
case "sdk.EventPipelineBuild" , "sdk.EventJob" :
108
113
continue
109
114
}
115
+ observability .Record (c , b .router .Stats .SSEEvents , 1 )
110
116
cacheMsgChan <- e
111
117
}
112
118
}
@@ -145,7 +151,7 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
145
151
}
146
152
147
153
// Add this client to the map of those that should receive updates
148
- b .addClient (client )
154
+ b .addClient (ctx , client )
149
155
150
156
// Set the headers related to event streaming.
151
157
w .Header ().Set ("Content-Type" , "text/event-stream" )
@@ -166,11 +172,11 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
166
172
select {
167
173
case <- ctx .Done ():
168
174
log .Info ("events.Http: context done" )
169
- b .disconnectClient (client .UUID )
175
+ b .disconnectClient (ctx , client .UUID )
170
176
break leave
171
177
case <- r .Context ().Done ():
172
178
log .Info ("events.Http: client disconnected" )
173
- b .disconnectClient (client .UUID )
179
+ b .disconnectClient (ctx , client .UUID )
174
180
break leave
175
181
case event := <- client .Queue :
176
182
if ok := client .manageEvent (event ); ! ok {
0 commit comments