@@ -27,9 +27,9 @@ import (
27
27
type eventsBrokerSubscribe struct {
28
28
UUID string
29
29
User * sdk.User
30
- Queue chan sdk. Event
31
- Mutex * sync. Mutex
32
- IsAlive * abool. AtomicBool
30
+ isAlive * abool. AtomicBool
31
+ w http. ResponseWriter
32
+ mutex sync. Mutex
33
33
}
34
34
35
35
// lastUpdateBroker keeps connected client of the current route,
@@ -93,7 +93,7 @@ func (b *eventsBroker) Start(ctx context.Context) {
93
93
b .chanAddClient = make (chan (* eventsBrokerSubscribe ))
94
94
b .chanRemoveClient = make (chan (string ))
95
95
96
- tickerMetrics := time .NewTicker (30 * time .Second )
96
+ tickerMetrics := time .NewTicker (10 * time .Second )
97
97
defer tickerMetrics .Stop ()
98
98
99
99
for {
@@ -103,40 +103,50 @@ func (b *eventsBroker) Start(ctx context.Context) {
103
103
104
104
case <- ctx .Done ():
105
105
if b .clients != nil {
106
- for c , v := range b .clients {
107
- close (v .Queue )
108
- delete (b .clients , c )
106
+ for uuid := range b .clients {
107
+ delete (b .clients , uuid )
109
108
}
109
+ observability .Record (b .router .Background , b .router .Stats .SSEClients , 0 )
110
110
111
111
}
112
112
if ctx .Err () != nil {
113
113
log .Error ("eventsBroker.Start> Exiting: %v" , ctx .Err ())
114
114
return
115
115
}
116
+
116
117
case receivedEvent := <- b .messages :
117
118
for i := range b .clients {
118
- go func (c * eventsBrokerSubscribe ) {
119
- c .Mutex .Lock ()
120
- defer c .Mutex .Unlock ()
121
- if c .IsAlive .IsSet () {
122
- log .Debug ("send data to %s" , c .UUID )
123
- c .Queue <- receivedEvent
124
- }
125
- }(b .clients [i ])
119
+ c := b .clients [i ]
120
+ if c == nil {
121
+ delete (b .clients , i )
122
+ continue
123
+ }
124
+
125
+ // Send the event to the client sse within a goroutine
126
+ s := "sse-" + b .clients [i ].UUID
127
+ sdk .GoRoutine (ctx , s ,
128
+ func (ctx context.Context ) {
129
+ if c .isAlive .IsSet () {
130
+ log .Debug ("send data to %s" , c .UUID )
131
+ if err := c .Send (receivedEvent ); err != nil {
132
+ log .Error ("eventsBroker> unable to send event to %s: %v" , c .UUID , err )
133
+ b .chanRemoveClient <- c .UUID
134
+ }
135
+ }
136
+ },
137
+ )
126
138
}
139
+
127
140
case client := <- b .chanAddClient :
128
141
b .clients [client .UUID ] = client
142
+
129
143
case uuid := <- b .chanRemoveClient :
130
144
client , has := b .clients [uuid ]
131
145
if ! has {
132
- return
146
+ continue
133
147
}
134
- go func (c * eventsBrokerSubscribe ) {
135
- c .Mutex .Lock ()
136
- close (c .Queue )
137
- c .IsAlive .UnSet ()
138
- c .Mutex .Unlock ()
139
- }(client )
148
+
149
+ client .isAlive .UnSet ()
140
150
delete (b .clients , uuid )
141
151
}
142
152
}
@@ -155,9 +165,8 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
155
165
client := & eventsBrokerSubscribe {
156
166
UUID : uuid ,
157
167
User : getUser (ctx ),
158
- Queue : make (chan sdk.Event , 10 ), // chan buffered, to avoid goroutine Start() wait on push in queue
159
- Mutex : new (sync.Mutex ),
160
- IsAlive : abool .NewBool (true ),
168
+ isAlive : abool .NewBool (true ),
169
+ w : w ,
161
170
}
162
171
163
172
// Add this client to the map of those that should receive updates
@@ -188,29 +197,6 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
188
197
log .Info ("events.Http: client disconnected" )
189
198
b .chanRemoveClient <- client .UUID
190
199
break leave
191
- case event := <- client .Queue :
192
- if ok := client .manageEvent (event ); ! ok {
193
- continue
194
- }
195
-
196
- msg , errJ := json .Marshal (event )
197
- if errJ != nil {
198
- log .Warning ("sendevent> Unavble to marshall event: %v" , errJ )
199
- continue
200
- }
201
-
202
- var buffer bytes.Buffer
203
- buffer .WriteString ("data: " )
204
- buffer .Write (msg )
205
- buffer .WriteString ("\n \n " )
206
-
207
- if ! client .IsAlive .IsSet () {
208
- break leave
209
- }
210
- if _ , err := w .Write (buffer .Bytes ()); err != nil {
211
- return sdk .WrapError (err , "events.write> Unable to write to client" )
212
- }
213
- f .Flush ()
214
200
case <- tick .C :
215
201
if _ , err := w .Write ([]byte ("" )); err != nil {
216
202
return sdk .WrapError (err , "events.write> Unable to ping client" )
@@ -223,50 +209,97 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
223
209
}
224
210
}
225
211
226
- func (s * eventsBrokerSubscribe ) manageEvent (event sdk.Event ) bool {
212
+ func (client * eventsBrokerSubscribe ) manageEvent (event sdk.Event ) bool {
227
213
var isSharedInfra bool
228
- for _ , g := range s .User .Groups {
214
+ for _ , g := range client .User .Groups {
229
215
if g .ID == group .SharedInfraGroup .ID {
230
216
isSharedInfra = true
231
217
break
232
218
}
233
219
}
234
220
235
221
if strings .HasPrefix (event .EventType , "sdk.EventProject" ) {
236
- if s .User .Admin || isSharedInfra || permission .ProjectPermission (event .ProjectKey , s .User ) >= permission .PermissionRead {
222
+ if client .User .Admin || isSharedInfra || permission .ProjectPermission (event .ProjectKey , client .User ) >= permission .PermissionRead {
237
223
return true
238
224
}
239
225
return false
240
226
}
241
227
if strings .HasPrefix (event .EventType , "sdk.EventWorkflow" ) || strings .HasPrefix (event .EventType , "sdk.EventRunWorkflow" ) {
242
- if s .User .Admin || isSharedInfra || permission .WorkflowPermission (event .ProjectKey , event .WorkflowName , s .User ) >= permission .PermissionRead {
228
+ if client .User .Admin || isSharedInfra || permission .WorkflowPermission (event .ProjectKey , event .WorkflowName , client .User ) >= permission .PermissionRead {
243
229
return true
244
230
}
245
231
return false
246
232
}
247
233
if strings .HasPrefix (event .EventType , "sdk.EventApplication" ) {
248
- if s .User .Admin || isSharedInfra || permission .ApplicationPermission (event .ProjectKey , event .ApplicationName , s .User ) >= permission .PermissionRead {
234
+ if client .User .Admin || isSharedInfra || permission .ApplicationPermission (event .ProjectKey , event .ApplicationName , client .User ) >= permission .PermissionRead {
249
235
return true
250
236
}
251
237
return false
252
238
}
253
239
if strings .HasPrefix (event .EventType , "sdk.EventPipeline" ) {
254
- if s .User .Admin || isSharedInfra || permission .PipelinePermission (event .ProjectKey , event .PipelineName , s .User ) >= permission .PermissionRead {
240
+ if client .User .Admin || isSharedInfra || permission .PipelinePermission (event .ProjectKey , event .PipelineName , client .User ) >= permission .PermissionRead {
255
241
return true
256
242
}
257
243
return false
258
244
}
259
245
if strings .HasPrefix (event .EventType , "sdk.EventEnvironment" ) {
260
- if s .User .Admin || isSharedInfra || permission .EnvironmentPermission (event .ProjectKey , event .EnvironmentName , s .User ) >= permission .PermissionRead {
246
+ if client .User .Admin || isSharedInfra || permission .EnvironmentPermission (event .ProjectKey , event .EnvironmentName , client .User ) >= permission .PermissionRead {
261
247
return true
262
248
}
263
249
return false
264
250
}
265
251
if strings .HasPrefix (event .EventType , "sdk.EventBroadcast" ) {
266
- if s .User .Admin || isSharedInfra || event .ProjectKey == "" || permission .AccessToProject (event .ProjectKey , s .User , permission .PermissionRead ) {
252
+ if client .User .Admin || isSharedInfra || event .ProjectKey == "" || permission .AccessToProject (event .ProjectKey , client .User , permission .PermissionRead ) {
267
253
return true
268
254
}
269
255
return false
270
256
}
271
257
return false
272
258
}
259
+
260
+ // Send an event to a client
261
+ func (client * eventsBrokerSubscribe ) Send (event sdk.Event ) (err error ) {
262
+ client .mutex .Lock ()
263
+ defer client .mutex .Unlock ()
264
+
265
+ if client == nil || client .w == nil {
266
+ return nil
267
+ }
268
+
269
+ // Make sure that the writer supports flushing.
270
+ f , ok := client .w .(http.Flusher )
271
+ if ! ok {
272
+ return sdk .WrapError (fmt .Errorf ("streaming unsupported" ), "" )
273
+ }
274
+
275
+ if ok := client .manageEvent (event ); ! ok {
276
+ return nil
277
+ }
278
+
279
+ msg , err := json .Marshal (event )
280
+ if err != nil {
281
+ return sdk .WrapError (err , "Unable to marshall event" )
282
+ }
283
+
284
+ var buffer bytes.Buffer
285
+ buffer .WriteString ("data: " )
286
+ buffer .Write (msg )
287
+ buffer .WriteString ("\n \n " )
288
+
289
+ if ! client .isAlive .IsSet () {
290
+ return nil
291
+ }
292
+
293
+ defer func () {
294
+ if r := recover (); r != nil {
295
+ err = fmt .Errorf ("%v" , r )
296
+ }
297
+ }()
298
+
299
+ if _ , err := client .w .Write (buffer .Bytes ()); err != nil {
300
+ return sdk .WrapError (err , "unable to write to client" )
301
+ }
302
+ f .Flush ()
303
+
304
+ return nil
305
+ }
0 commit comments