make go routines shutdown gracefully by geyslan · Pull Request #2784 · aquasecurity/tracee · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

make go routines shutdown gracefully #2784

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/tracee-rules/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,13 +159,13 @@ func main() {
return err
}

if httpServer != nil {
go httpServer.Start()
}

ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

if httpServer != nil {
go httpServer.Start(ctx)
}

e.Start(ctx)

return nil
Expand Down
2 changes: 1 addition & 1 deletion pkg/cmd/tracee.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ func (r Runner) Run(ctx context.Context) error {
}
}

go r.Server.Start()
go r.Server.Start(ctx)
}

// Print statistics at the end
Expand Down
9 changes: 8 additions & 1 deletion pkg/ebpf/bpf_log.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ebpf

import (
"context"
"encoding/binary"
"fmt"
"strings"
Expand Down Expand Up @@ -158,7 +159,10 @@ func (b *BPFLog) Decode(rawBuffer []byte) error {
return nil
}

func (t *Tracee) processBPFLogs() {
func (t *Tracee) processBPFLogs(ctx context.Context) {
logger.Debugw("Starting processBPFLogs go routine")
defer logger.Debugw("Stopped processBPFLogs go routine")

for {
select {
case rawData := <-t.bpfLogsChannel:
Expand Down Expand Up @@ -193,6 +197,9 @@ func (t *Tracee) processBPFLogs() {
t.stats.LostBPFLogsCount.Increment(lost)
logger.Warnw(fmt.Sprintf("Lost %d ebpf logs events", lost))
}

case <-ctx.Done():
return
}
}
}
24 changes: 16 additions & 8 deletions pkg/ebpf/events_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package ebpf

import (
"bytes"
"context"
"fmt"
"path/filepath"
"strconv"
Expand All @@ -26,15 +27,22 @@ func init() {
initKernelReadFileTypes()
}

func (t *Tracee) processLostEvents() {
func (t *Tracee) processLostEvents(ctx context.Context) {
logger.Debugw("Starting processLostEvents go routine")
defer logger.Debugw("Stopped processLostEvents go routine")

for {
lost := <-t.lostEvChannel
// When terminating tracee-ebpf the lost channel receives multiple "0 lost events" events.
// This check prevents those 0 lost events messages to be written to stderr until the bug is fixed:
// https://github.com/aquasecurity/libbpfgo/issues/122
if lost > 0 {
t.stats.LostEvCount.Increment(lost)
logger.Warnw(fmt.Sprintf("Lost %d events", lost))
select {
case lost := <-t.lostEvChannel:
// When terminating tracee-ebpf the lost channel receives multiple "0 lost events" events.
// This check prevents those 0 lost events messages to be written to stderr until the bug is fixed:
// https://github.com/aquasecurity/libbpfgo/issues/122
if lost > 0 {
_ = t.stats.LostEvCount.Increment(lost)
logger.Warnw(fmt.Sprintf("Lost %d events", lost))
}
case <-ctx.Done():
return
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions pkg/ebpf/net_capture.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package ebpf

import (
gocontext "context"
"context"
"encoding/binary"
"fmt"

Expand Down Expand Up @@ -29,7 +29,10 @@ const (
familyIpv6
)

func (t *Tracee) processNetCaptureEvents(ctx gocontext.Context) {
func (t *Tracee) processNetCaptureEvents(ctx context.Context) {
logger.Debugw("Starting processNetCaptureEvents go routine")
defer logger.Debugw("Stopped processNetCaptureEvents go routine")

var errChanList []<-chan error

// source pipeline stage (re-used from regular pipeline)
Expand All @@ -44,7 +47,7 @@ func (t *Tracee) processNetCaptureEvents(ctx gocontext.Context) {
t.WaitForPipeline(errChanList...)
}

func (t *Tracee) processNetCapEvents(ctx gocontext.Context, in <-chan *trace.Event) <-chan error {
func (t *Tracee) processNetCapEvents(ctx context.Context, in <-chan *trace.Event) <-chan error {
errc := make(chan error, 1)

go func() {
Expand Down
6 changes: 3 additions & 3 deletions pkg/ebpf/tracee.go
Original file line number Diff line number Diff line change
Expand Up @@ -1359,18 +1359,18 @@ func (t *Tracee) Run(ctx gocontext.Context) error {
logger.Warnw("Memory dump", "error", err)
}
t.eventsPerfMap.Start()
go t.processLostEvents()
go t.processLostEvents(ctx)
go t.handleEvents(ctx)
if t.config.BlobPerfBufferSize > 0 {
t.fileWrPerfMap.Start()
go t.processFileWrites()
go t.processFileWrites(ctx)
}
if pcaps.PcapsEnabled(t.config.Capture.Net) {
t.netCapPerfMap.Start()
go t.processNetCaptureEvents(ctx)
}
t.bpfLogsPerfMap.Start()
go t.processBPFLogs()
go t.processBPFLogs(ctx)

// write pid file
err = t.writePid()
Expand Down
8 changes: 7 additions & 1 deletion pkg/ebpf/write_capture.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package ebpf

import (
"context"
"fmt"
"io"
"os"
Expand All @@ -12,7 +13,9 @@ import (
"github.com/aquasecurity/tracee/pkg/utils"
)

func (t *Tracee) processFileWrites() {
func (t *Tracee) processFileWrites(ctx context.Context) {
logger.Debugw("Starting processFileWrites go routine")
defer logger.Debugw("Stopped processFileWrites go routine")

const (
//S_IFMT uint32 = 0170000 // bit mask for the file type bit field
Expand Down Expand Up @@ -159,6 +162,9 @@ func (t *Tracee) processFileWrites() {
t.stats.LostWrCount.Increment(lost)
logger.Warnw(fmt.Sprintf("Lost %d write events", lost))
}

case <-ctx.Done():
return
}
}
}
45 changes: 36 additions & 9 deletions pkg/server/server.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package server

import (
"context"
"fmt"
"net/http"
"net/http/pprof"
Expand All @@ -12,16 +13,21 @@ import (

// Server represents a http server
type Server struct {
mux *http.ServeMux
listenAddr string
hs *http.Server
mux *http.ServeMux // just an exposed copy of hs.Handler
metricsEnabled bool
}

// New creates a new server
// The returned Server wraps an http.Server bound to listenAddr whose Handler
// is a fresh ServeMux; s.mux points at that same ServeMux, so routes added
// via s.mux (healthz, metrics, pprof) are the ones hs serves.
func New(listenAddr string) *Server {
mux := http.NewServeMux()

return &Server{
mux: http.NewServeMux(),
listenAddr: listenAddr,
// NOTE(review): only Addr and Handler are set, so the server has no
// read/header timeouts (gosec G112). Consider at least ReadHeaderTimeout;
// a WriteTimeout would cut off long pprof profile/trace responses.
hs: &http.Server{
Addr: listenAddr,
Handler: mux,
},
mux: mux,
}
}

Expand All @@ -38,15 +44,35 @@ func (s *Server) EnableHealthzEndpoint() {
})
}

// Start starts the http server on the listen addr
func (s *Server) Start() {
logger.Debugw("Serving metrics endpoint", "address", s.listenAddr)
// Start starts the http server on the listen address
// It runs s.hs.ListenAndServe in a goroutine and blocks until either ctx is
// cancelled (then it calls s.hs.Shutdown) or the serving goroutine returns
// (it calls srvCancel, which closes srvCtx).
func (s *Server) Start(ctx context.Context) {
srvCtx, srvCancel := context.WithCancel(ctx)
defer srvCancel()

go func() {
logger.Debugw("Starting serving metrics endpoint go routine", "address", s.hs.Addr)
defer logger.Debugw("Stopped serving metrics endpoint")

// ErrServerClosed is the expected result after Shutdown; anything else
// (e.g. address already in use) is logged as an error.
if err := s.hs.ListenAndServe(); err != http.ErrServerClosed {
logger.Errorw("Serving metrics endpoint", "error", err)
}

// Wake the select below so Start returns when serving stops on its own.
srvCancel()
}()

// NOTE(review): srvCtx is a child of ctx, so cancelling ctx also closes
// srvCtx.Done(). If both channels are already closed when this select is
// evaluated (e.g. ctx was cancelled before Start got here), Go picks a case
// at random and may take the srvCtx branch, skipping Shutdown and leaving
// the server running. Deriving srvCtx from context.Background() (or using a
// dedicated done channel) would make the ctx branch the only shutdown path.
select {
case <-ctx.Done():
logger.Debugw("Context cancelled, shutting down metrics endpoint server")
// NOTE(review): ctx is already done here. Per the net/http docs,
// Shutdown returns the context's error if it expires first, so with
// requests in flight this returns ctx.Err() at once instead of draining
// them. A fresh context with a timeout would allow a graceful drain.
if err := s.hs.Shutdown(ctx); err != nil {
logger.Errorw("Stopping serving metrics endpoint", "error", err)
}

if err := http.ListenAndServe(s.listenAddr, s.mux); err != http.ErrServerClosed {
logger.Errorw("Serving metrics endpoint", "error", err)
// if server error occurred while base ctx is not done, we should exit via this case
case <-srvCtx.Done():
}
}

// EnablePProfEndpoint enables pprof endpoint for debugging
func (s *Server) EnablePProfEndpoint() {
s.mux.HandleFunc("/debug/pprof/", pprof.Index)
s.mux.Handle("/debug/pprof/allocs", pprof.Handler("allocs"))
Expand All @@ -59,6 +85,7 @@ func (s *Server) EnablePProfEndpoint() {
s.mux.HandleFunc("/debug/pprof/trace", pprof.Trace)
}

// MetricsEndpointEnabled reports whether the metrics endpoint is enabled,
// i.e. it returns the server's metricsEnabled flag.
func (s *Server) MetricsEndpointEnabled() bool {
return s.metricsEnabled
}
15 changes: 12 additions & 3 deletions tests/integration/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -397,12 +397,21 @@ func Test_EventFilters(t *testing.T) {
eventOutput := &eventOutput{}

go func() {
for evt := range eventChan {
eventOutput.addEvent(evt)
for {
select {
case evt, ok := <-eventChan:
if !ok {
return
}
eventOutput.addEvent(evt)

case <-ctx.Done():
return
}
}
}()

trc := startTracee(t, config, nil, nil, ctx)
trc := startTracee(t, ctx, config, nil, nil)

waitforTraceeStart(t, trc, time.Now())

Expand Down
14 changes: 11 additions & 3 deletions tests/integration/tracee.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

// load tracee into memory with args
func startTracee(t *testing.T, config tracee.Config, output *tracee.OutputConfig, capture *tracee.CaptureConfig, ctx context.Context) *tracee.Tracee {
func startTracee(t *testing.T, ctx context.Context, config tracee.Config, output *tracee.OutputConfig, capture *tracee.CaptureConfig) *tracee.Tracee {
initialize.SetLibbpfgoCallbacks()

kernelConfig, err := initialize.KernelConfig()
Expand All @@ -43,8 +43,16 @@ func startTracee(t *testing.T, config tracee.Config, output *tracee.OutputConfig
errChan := make(chan error)

go func() {
for err := range errChan {
t.Logf("received error while testing: %s\n", err)
for {
select {
case err, ok := <-errChan:
if !ok {
return
}
t.Logf("received error while testing: %s\n", err)
case <-ctx.Done():
return
}
}
}()

Expand Down
0