feat(api): add SEE metrics (#3432) · ovh/cds@dee449b · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Commit dee449b

Browse files
fsamin authored and richardlt committed
feat(api): add SEE metrics (#3432)
1 parent 4d80334 commit dee449b

File tree

3 files changed

+34
-9
lines changed

3 files changed

+34
-9
lines changed

engine/api/api_routes.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ func (api *API) InitRouter() {
1717
api.Router.PostMiddlewares = append(api.Router.PostMiddlewares, api.deletePermissionMiddleware, TracingPostMiddleware)
1818

1919
api.eventsBroker = &eventsBroker{
20+
router: api.Router,
2021
cache: api.Cache,
2122
clients: make(map[string]eventsBrokerSubscribe),
2223
dbFunc: api.DBConnectionFactory.GetDBMap,

engine/api/events.go

Lines changed: 13 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import (
1414

1515
"github.com/ovh/cds/engine/api/cache"
1616
"github.com/ovh/cds/engine/api/group"
17+
"github.com/ovh/cds/engine/api/observability"
1718
"github.com/ovh/cds/engine/api/permission"
1819
"github.com/ovh/cds/engine/service"
1920
"github.com/ovh/cds/sdk"
@@ -37,14 +38,16 @@ type eventsBroker struct {
3738
dbFunc func() *gorp.DbMap
3839
cache cache.Store
3940
clientsLen int64
41+
router *Router
4042
}
4143

4244
// addClient registers client in the broker's client map, keyed by its UUID,
// increments clientsLen, and records +1 on the SSEClients measure using ctx.
// The map update and the metric recording both happen while b.mutex is held.
43-
func (b *eventsBroker) addClient(client eventsBrokerSubscribe) {
45+
func (b *eventsBroker) addClient(ctx context.Context, client eventsBrokerSubscribe) {
4446
b.mutex.Lock()
4547
defer b.mutex.Unlock()
4648
b.clients[client.UUID] = client
4749
b.clientsLen++
50+
// Report one more connected SSE client.
observability.Record(ctx, b.router.Stats.SSEClients, 1)
4851
}
4952

5053
// cleanAll cleans all clients
@@ -55,22 +58,24 @@ func (b *eventsBroker) cleanAll() {
5558
for c, v := range b.clients {
5659
close(v.Queue)
5760
delete(b.clients, c)
61+
observability.Record(b.router.Background, b.router.Stats.SSEClients, -1)
5862
}
5963
}
6064
b.clientsLen = 0
6165
}
6266

63-
func (b *eventsBroker) disconnectClient(uuid string) {
67+
// disconnectClient flags the client identified by uuid as disconnected by
// setting b.disconnected[uuid] to true, and records -1 on the SSEClients
// measure using ctx. Both steps run while b.disconnectedMutex is held.
// It does not touch b.clients itself.
func (b *eventsBroker) disconnectClient(ctx context.Context, uuid string) {
6468
b.disconnectedMutex.Lock()
6569
defer b.disconnectedMutex.Unlock()
6670
b.disconnected[uuid] = true
71+
// Report one fewer connected SSE client.
observability.Record(ctx, b.router.Stats.SSEClients, -1)
6772
}
6873

6974
// Init initializes the eventsBroker
7075
func (b *eventsBroker) Init(c context.Context) {
7176
// Start cache Subscription
7277
subscribeFunc := func() {
73-
cacheSubscribe(c, b.messages, b.cache)
78+
b.cacheSubscribe(c, b.messages, b.cache)
7479
}
7580
sdk.GoRoutine("eventsBroker.Init.CacheSubscribe", subscribeFunc)
7681

@@ -80,7 +85,7 @@ func (b *eventsBroker) Init(c context.Context) {
8085
sdk.GoRoutine("eventsBroker.Init.Start", startFunc)
8186
}
8287

83-
func cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cache.Store) {
88+
func (b *eventsBroker) cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cache.Store) {
8489
pubSub := store.Subscribe("events_pubsub")
8590
tick := time.NewTicker(50 * time.Millisecond)
8691
defer tick.Stop()
@@ -107,6 +112,7 @@ func cacheSubscribe(c context.Context, cacheMsgChan chan<- sdk.Event, store cach
107112
case "sdk.EventPipelineBuild", "sdk.EventJob":
108113
continue
109114
}
115+
observability.Record(c, b.router.Stats.SSEEvents, 1)
110116
cacheMsgChan <- e
111117
}
112118
}
@@ -145,7 +151,7 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
145151
}
146152

147153
// Add this client to the map of those that should receive updates
148-
b.addClient(client)
154+
b.addClient(ctx, client)
149155

150156
// Set the headers related to event streaming.
151157
w.Header().Set("Content-Type", "text/event-stream")
@@ -166,11 +172,11 @@ func (b *eventsBroker) ServeHTTP() service.Handler {
166172
select {
167173
case <-ctx.Done():
168174
log.Info("events.Http: context done")
169-
b.disconnectClient(client.UUID)
175+
b.disconnectClient(ctx, client.UUID)
170176
break leave
171177
case <-r.Context().Done():
172178
log.Info("events.Http: client disconnected")
173-
b.disconnectClient(client.UUID)
179+
b.disconnectClient(ctx, client.UUID)
174180
break leave
175181
case event := <-client.Queue:
176182
if ok := client.manageEvent(event); !ok {

engine/api/router.go

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -47,8 +47,10 @@ type Router struct {
4747
nbPanic int
4848
lastPanic *time.Time
4949
Stats struct {
50-
Errors *stats.Int64Measure
51-
Hits *stats.Int64Measure
50+
Errors *stats.Int64Measure
51+
Hits *stats.Int64Measure
52+
SSEClients *stats.Int64Measure
53+
SSEEvents *stats.Int64Measure
5254
}
5355
}
5456

@@ -516,6 +518,10 @@ func (r *Router) InitStats(service, name string) error {
516518
r.Stats.Errors = stats.Int64(label, "number of errors", stats.UnitDimensionless)
517519
label = fmt.Sprintf("cds/%s/%s/router_hits", service, name)
518520
r.Stats.Hits = stats.Int64(label, "number of hits", stats.UnitDimensionless)
521+
label = fmt.Sprintf("cds/%s/%s/sse_clients", service, name)
522+
r.Stats.SSEClients = stats.Int64(label, "number of sse clients", stats.UnitDimensionless)
523+
label = fmt.Sprintf("cds/%s/%s/sse_events", service, name)
524+
r.Stats.SSEEvents = stats.Int64(label, "number of sse events", stats.UnitDimensionless)
519525

520526
log.Info("api> Stats initialized")
521527

@@ -532,5 +538,17 @@ func (r *Router) InitStats(service, name string) error {
532538
Measure: r.Stats.Hits,
533539
Aggregation: view.Count(),
534540
},
541+
&view.View{
542+
Name: "sse_clients",
543+
Description: r.Stats.SSEClients.Description(),
544+
Measure: r.Stats.SSEClients,
545+
Aggregation: view.Count(),
546+
},
547+
&view.View{
548+
Name: "sse_events",
549+
Description: r.Stats.SSEEvents.Description(),
550+
Measure: r.Stats.SSEEvents,
551+
Aggregation: view.Count(),
552+
},
535553
)
536554
}

0 commit comments

Comments
 (0)
0