fix(api): write on sse (#3457) · ovh/cds@e66d70c · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit e66d70c

Browse files
fsamin authored and yesnault committed
fix(api): write on sse (#3457)
1 parent c9398dd commit e66d70c

File tree

1 file changed

+89
-56
lines changed

1 file changed

+89
-56
lines changed

engine/api/events.go

Lines changed: 89 additions & 56 deletions
Original file line number | Diff line number | Diff line change
@@ -27,9 +27,9 @@ import (
2727
type eventsBrokerSubscribe struct {
2828
UUID string
2929
User *sdk.User
30-
Queue chan sdk.Event
31-
Mutex *sync.Mutex
32-
IsAlive *abool.AtomicBool
30+
isAlive *abool.AtomicBool
31+
w http.ResponseWriter
32+
mutex sync.Mutex
3333
}
3434

3535
// lastUpdateBroker keeps connected client of the current route,
@@ -93,7 +93,7 @@ func (b *eventsBroker) Start(ctx context.Context) {
9393
b.chanAddClient = make(chan (*eventsBrokerSubscribe))
9494
b.chanRemoveClient = make(chan (string))
9595

96-
tickerMetrics := time.NewTicker(30 * time.Second)
96+
tickerMetrics := time.NewTicker(10 * time.Second)
9797
defer tickerMetrics.Stop()
9898

9999
for {
@@ -103,40 +103,50 @@ func (b *eventsBroker) Start(ctx context.Context) {
103103

104104
case <-ctx.Done():
105105
if b.clients != nil {
106-
for c, v := range b.clients {
107-
close(v.Queue)
108-
delete(b.clients, c)
106+
for uuid := range b.clients {
107+
delete(b.clients, uuid)
109108
}
109+
observability.Record(b.router.Background, b.router.Stats.SSEClients, 0)
110110

111111
}
112112
if ctx.Err() != nil {
113113
log.Error("eventsBroker.Start> Exiting: %v", ctx.Err())
114114
return
115115
}
116+
116117
case receivedEvent := <-b.messages:
117118
for i := range b.clients {
118-
go func(c *eventsBrokerSubscribe) {
119-
c.Mutex.Lock()
120-
defer c.Mutex.Unlock()
121-
if c.IsAlive.IsSet() {
122-
log.Debug("send data to %s", c.UUID)
123-
c.Queue <- receivedEvent
124-
}
125-
}(b.clients[i])
119+
c := b.clients[i]
120+
if c == nil {
121+
delete(b.clients, i)
122+
continue
123+
}
124+
125+
// Send the event to the client sse within a goroutine
126+
s := "sse-" + b.clients[i].UUID
127+
sdk.GoRoutine(ctx, s,
128+
func(ctx context.Context) {
129+
if c.isAlive.IsSet() {
130+
log.Debug("send data to %s", c.UUID)
131+
if err := c.Send(receivedEvent); err != nil {
132+
log.Error("eventsBroker> unable to send event to %s: %v", c.UUID, err)
133+
b.chanRemoveClient <- c.UUID
134+
}
135+
}
136+
},
137+
)
126138
}
139+
127140
case client := <-b.chanAddClient:
128141
b.clients[client.UUID] = client
142+
129143
case uuid := <-b.chanRemoveClient:
130144
client, has := b.clients[uuid]
131145
if !has {
132-
return
146+
continue
133147
}
134-
go func(c *eventsBrokerSubscribe) {
135-
c.Mutex.Lock()
136-
close(c.Queue)
137-
c.IsAlive.UnSet()
138-
c.Mutex.Unlock()
139-
}(client)
148+
149+
client.isAlive.UnSet()
140150
delete(b.clients, uuid)
141151
}
142152
}
@@ -155,9 +165,8 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
155165
client := &eventsBrokerSubscribe{
156166
UUID: uuid,
157167
User: getUser(ctx),
158-
Queue: make(chan sdk.Event, 10), // chan buffered, to avoid goroutine Start() wait on push in queue
159-
Mutex: new(sync.Mutex),
160-
IsAlive: abool.NewBool(true),
168+
isAlive: abool.NewBool(true),
169+
w: w,
161170
}
162171

163172
// Add this client to the map of those that should receive updates
@@ -188,29 +197,6 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
188197
log.Info("events.Http: client disconnected")
189198
b.chanRemoveClient <- client.UUID
190199
break leave
191-
case event := <-client.Queue:
192-
if ok := client.manageEvent(event); !ok {
193-
continue
194-
}
195-
196-
msg, errJ := json.Marshal(event)
197-
if errJ != nil {
198-
log.Warning("sendevent> Unavble to marshall event: %v", errJ)
199-
continue
200-
}
201-
202-
var buffer bytes.Buffer
203-
buffer.WriteString("data: ")
204-
buffer.Write(msg)
205-
buffer.WriteString("\n\n")
206-
207-
if !client.IsAlive.IsSet() {
208-
break leave
209-
}
210-
if _, err := w.Write(buffer.Bytes()); err != nil {
211-
return sdk.WrapError(err, "events.write> Unable to write to client")
212-
}
213-
f.Flush()
214200
case <-tick.C:
215201
if _, err := w.Write([]byte("")); err != nil {
216202
return sdk.WrapError(err, "events.write> Unable to ping client")
@@ -223,50 +209,97 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
223209
}
224210
}
225211

226-
func (s *eventsBrokerSubscribe) manageEvent(event sdk.Event) bool {
212+
func (client *eventsBrokerSubscribe) manageEvent(event sdk.Event) bool {
227213
var isSharedInfra bool
228-
for _, g := range s.User.Groups {
214+
for _, g := range client.User.Groups {
229215
if g.ID == group.SharedInfraGroup.ID {
230216
isSharedInfra = true
231217
break
232218
}
233219
}
234220

235221
if strings.HasPrefix(event.EventType, "sdk.EventProject") {
236-
if s.User.Admin || isSharedInfra || permission.ProjectPermission(event.ProjectKey, s.User) >= permission.PermissionRead {
222+
if client.User.Admin || isSharedInfra || permission.ProjectPermission(event.ProjectKey, client.User) >= permission.PermissionRead {
237223
return true
238224
}
239225
return false
240226
}
241227
if strings.HasPrefix(event.EventType, "sdk.EventWorkflow") || strings.HasPrefix(event.EventType, "sdk.EventRunWorkflow") {
242-
if s.User.Admin || isSharedInfra || permission.WorkflowPermission(event.ProjectKey, event.WorkflowName, s.User) >= permission.PermissionRead {
228+
if client.User.Admin || isSharedInfra || permission.WorkflowPermission(event.ProjectKey, event.WorkflowName, client.User) >= permission.PermissionRead {
243229
return true
244230
}
245231
return false
246232
}
247233
if strings.HasPrefix(event.EventType, "sdk.EventApplication") {
248-
if s.User.Admin || isSharedInfra || permission.ApplicationPermission(event.ProjectKey, event.ApplicationName, s.User) >= permission.PermissionRead {
234+
if client.User.Admin || isSharedInfra || permission.ApplicationPermission(event.ProjectKey, event.ApplicationName, client.User) >= permission.PermissionRead {
249235
return true
250236
}
251237
return false
252238
}
253239
if strings.HasPrefix(event.EventType, "sdk.EventPipeline") {
254-
if s.User.Admin || isSharedInfra || permission.PipelinePermission(event.ProjectKey, event.PipelineName, s.User) >= permission.PermissionRead {
240+
if client.User.Admin || isSharedInfra || permission.PipelinePermission(event.ProjectKey, event.PipelineName, client.User) >= permission.PermissionRead {
255241
return true
256242
}
257243
return false
258244
}
259245
if strings.HasPrefix(event.EventType, "sdk.EventEnvironment") {
260-
if s.User.Admin || isSharedInfra || permission.EnvironmentPermission(event.ProjectKey, event.EnvironmentName, s.User) >= permission.PermissionRead {
246+
if client.User.Admin || isSharedInfra || permission.EnvironmentPermission(event.ProjectKey, event.EnvironmentName, client.User) >= permission.PermissionRead {
261247
return true
262248
}
263249
return false
264250
}
265251
if strings.HasPrefix(event.EventType, "sdk.EventBroadcast") {
266-
if s.User.Admin || isSharedInfra || event.ProjectKey == "" || permission.AccessToProject(event.ProjectKey, s.User, permission.PermissionRead) {
252+
if client.User.Admin || isSharedInfra || event.ProjectKey == "" || permission.AccessToProject(event.ProjectKey, client.User, permission.PermissionRead) {
267253
return true
268254
}
269255
return false
270256
}
271257
return false
272258
}
259+
260+
// Send an event to a client
261+
func (client *eventsBrokerSubscribe) Send(event sdk.Event) (err error) {
262+
client.mutex.Lock()
263+
defer client.mutex.Unlock()
264+
265+
if client == nil || client.w == nil {
266+
return nil
267+
}
268+
269+
// Make sure that the writer supports flushing.
270+
f, ok := client.w.(http.Flusher)
271+
if !ok {
272+
return sdk.WrapError(fmt.Errorf("streaming unsupported"), "")
273+
}
274+
275+
if ok := client.manageEvent(event); !ok {
276+
return nil
277+
}
278+
279+
msg, err := json.Marshal(event)
280+
if err != nil {
281+
return sdk.WrapError(err, "Unable to marshall event")
282+
}
283+
284+
var buffer bytes.Buffer
285+
buffer.WriteString("data: ")
286+
buffer.Write(msg)
287+
buffer.WriteString("\n\n")
288+
289+
if !client.isAlive.IsSet() {
290+
return nil
291+
}
292+
293+
defer func() {
294+
if r := recover(); r != nil {
295+
err = fmt.Errorf("%v", r)
296+
}
297+
}()
298+
299+
if _, err := client.w.Write(buffer.Bytes()); err != nil {
300+
return sdk.WrapError(err, "unable to write to client")
301+
}
302+
f.Flush()
303+
304+
return nil
305+
}

0 commit comments

Comments
 (0)
0