cilium-cli: Fix logger busy loop by jrajahalme · Pull Request #38199 · cilium/cilium · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

cilium-cli: Fix logger busy loop #38199

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Mar 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion cilium-cli/cli/connectivity.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func RunE(hooks api.Hooks) func(cmd *cobra.Command, args []string) error {
}
}

logger := check.NewConcurrentLogger(params.Writer, params.TestConcurrency)
logger := check.NewConcurrentLogger(params.Writer)
connTests, err := newConnectivityTests(params, hooks, logger, owners)
if err != nil {
return err
Expand Down
2 changes: 1 addition & 1 deletion cilium-cli/cli/connectivity_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestNewConnectivityTests(t *testing.T) {
}

// function to test
actual, err := newConnectivityTests(tt.params, &api.NopHooks{}, check.NewConcurrentLogger(&bytes.Buffer{}, 1), owners)
actual, err := newConnectivityTests(tt.params, &api.NopHooks{}, check.NewConcurrentLogger(&bytes.Buffer{}), owners)

require.NoError(t, err)
require.Len(t, actual, tt.expectedCount)
Expand Down
212 changes: 90 additions & 122 deletions cilium-cli/connectivity/check/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,92 +7,122 @@ import (
"bytes"
"fmt"
"io"
"sync/atomic"
"time"

"github.com/cilium/cilium/pkg/lock"
)

// NewConcurrentLogger is a factory function that returns a ConcurrentLogger.
func NewConcurrentLogger(writer io.Writer, concurrency int) *ConcurrentLogger {
func NewConcurrentLogger(writer io.Writer) *ConcurrentLogger {
return &ConcurrentLogger{
messageCh: make(chan message),
writer: writer,
// The concurrency parameter is used for nsTestsCh buffer size calculation.
// The buffer will be able to accept 10 times more unique connectivity tests
// than the concurrency value. Writes to the channel are performed in a separate
// goroutine to avoid a deadlock if the buffer is full.
nsTestsCh: make(chan string, concurrency*10),
nsTestMsgs: make(map[string][]message),
nsTestMsgsLock: lock.Mutex{},
collectorStarted: atomic.Bool{},
printerDoneCh: make(chan bool),
writer: writer,
messages: make(chan message),
done: make(chan struct{}),
}
}

type ConcurrentLogger struct {
messageCh chan message
writer io.Writer
nsTestsCh chan string
nsTestMsgs map[string][]message
nsTestMsgsLock lock.Mutex
collectorStarted atomic.Bool
printerDoneCh chan bool
nsTestFinishCount int
writer io.Writer
messages chan message
done chan struct{}
}

// Start starts ConcurrentLogger internals in separate goroutines:
// - collector: collects incoming test messages.
// - printer: sends messages to the writer in corresponding order.
// Start starts ConcurrentLogger
func (c *ConcurrentLogger) Start() {
c.collectorStarted.Store(true)
go c.collector()
go c.printer()
go func() {
// current is the test that is currently being streamed to the writer without
// buffering
var current *Test

// buffered is a map of tests (other than current one) that have not finished yet
buffered := make(map[*Test]*bytes.Buffer)

// finished is an ordered list of tests to be logged once the current test finishes
var finished []*bytes.Buffer

for m := range c.messages {
// make this the current test if none
if current == nil {
current = m.test
}

// stream the current test without buffering
if m.test == current {
mustWrite(c.writer, m.data)
if m.finish {
current = nil
}
} else {
// buffer other tests
buf, ok := buffered[m.test]
if !ok {
buf = &bytes.Buffer{}
buffered[m.test] = buf
}
mustWrite(buf, m.data)
if m.finish {
delete(buffered, m.test)
finished = append(finished, buf)
}
}

if current == nil {
// log any finished tests after done with the current test
for _, buf := range finished {
mustWrite(c.writer, buf.Bytes())
}
finished = finished[len(finished):]

// pick one of the running tests as the current one, if any
for test, buf := range buffered {
delete(buffered, test)
mustWrite(c.writer, buf.Bytes())
current = test
break
}
}
}
// No more messages, log all remaining messages
for _, buf := range finished {
mustWrite(c.writer, buf.Bytes())
}
for _, buf := range buffered {
mustWrite(c.writer, buf.Bytes())
}
close(c.done)
}()
}

// Stop closes the incoming message channel and waits until all messages are printed.
func (c *ConcurrentLogger) Stop() {
close(c.messageCh)
<-c.printerDoneCh
close(c.printerDoneCh)
close(c.messages)
<-c.done
}

type message struct {
namespace string
testName string
data string
finish bool
}

func (m message) nsTest() string {
return fmt.Sprintf("%s:%s", m.namespace, m.testName)
test *Test
data []byte
finish bool
}

// Print schedules message for the test to be printed.
func (c *ConcurrentLogger) Print(test *Test, msg string) {
buf := &bytes.Buffer{}
func (c *ConcurrentLogger) Print(test *Test, msg []byte) {
if test.ctx.timestamp() {
mustFprint(buf, timestamp())
msg = append(timestampBytes(), msg...)
}
mustFprint(buf, msg)
c.messageCh <- message{
namespace: test.ctx.params.TestNamespace,
testName: test.name,
data: buf.String(),
c.messages <- message{
test: test,
data: msg,
}
}

// Printf schedules message for the test to be printed.
func (c *ConcurrentLogger) Printf(test *Test, format string, args ...interface{}) {
buf := &bytes.Buffer{}
if test.ctx.timestamp() {
mustFprint(buf, timestamp())
mustWrite(buf, timestampBytes())
}
mustFprintf(buf, format, args...)
c.messageCh <- message{
namespace: test.ctx.params.TestNamespace,
testName: test.name,
data: buf.String(),
c.messages <- message{
test: test,
data: buf.Bytes(),
}
}

Expand All @@ -105,77 +135,15 @@ func (c *ConcurrentLogger) FinishTest(test *Test) {
panic(fmt.Errorf("failed to read from test log buffer: %w", err))
}
}
c.messageCh <- message{
namespace: test.Context().Params().TestNamespace,
testName: test.Name(),
data: buf.String(),
finish: true,
}
}

func (c *ConcurrentLogger) collector() {
defer c.collectorStarted.Store(false)
for m := range c.messageCh {
nsTest := m.nsTest()
c.nsTestMsgsLock.Lock()
nsTestMsgs, ok := c.nsTestMsgs[nsTest]
if !ok {
nsTestMsgs = make([]message, 0)
// use a separate goroutine to avoid deadlock if the channel
// buffer is full, printer goroutine will pull it eventually
go func() { c.nsTestsCh <- nsTest }()
}
c.nsTestMsgs[nsTest] = append(nsTestMsgs, m)
c.nsTestMsgsLock.Unlock()
}
}

func (c *ConcurrentLogger) printer() {
// read messages while the collector is working
for c.collectorStarted.Load() {
// double-check if there are new messages to avoid
// deadlock reading from the `nsTestsCh` channel
if c.nsTestFinishCount < c.collectedTestCount() {
c.printTestMessages(<-c.nsTestsCh)
}
}
// collector stopped but there still might be messages to print
for c.nsTestFinishCount < c.collectedTestCount() {
c.printTestMessages(<-c.nsTestsCh)
}
c.printerDoneCh <- true
close(c.nsTestsCh)
}

func (c *ConcurrentLogger) collectedTestCount() int {
c.nsTestMsgsLock.Lock()
testCount := len(c.nsTestMsgs)
c.nsTestMsgsLock.Unlock()
return testCount
}

func (c *ConcurrentLogger) printTestMessages(nsTest string) {
for printedMessageIndex := 0; ; {
c.nsTestMsgsLock.Lock()
messages := c.nsTestMsgs[nsTest]
c.nsTestMsgsLock.Unlock()
if len(messages) == printedMessageIndex {
// wait for new test messages
time.Sleep(time.Millisecond * 50)
continue
}
for ; printedMessageIndex < len(messages); printedMessageIndex++ {
mustFprint(c.writer, messages[printedMessageIndex].data)
if messages[printedMessageIndex].finish {
c.nsTestFinishCount++
return
}
}
c.messages <- message{
test: test,
data: buf.Bytes(),
finish: true,
}
}

func mustFprint(writer io.Writer, msg string) {
if _, err := fmt.Fprint(writer, msg); err != nil {
func mustWrite(writer io.Writer, msg []byte) {
if _, err := writer.Write(msg); err != nil {
panic(fmt.Errorf("failed to print log message: %w", err))
}
}
Expand Down
2 changes: 1 addition & 1 deletion cilium-cli/connectivity/check/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestConcurrentLogger(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
logBuf := &bytes.Buffer{}
logger := NewConcurrentLogger(logBuf, tt.concurrency)
logger := NewConcurrentLogger(logBuf)
logger.Start()

connTests := make([]*ConnectivityTest, 0, tt.concurrency)
Expand Down
10 changes: 9 additions & 1 deletion cilium-cli/connectivity/check/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -323,7 +323,7 @@ func (t *Test) flush() {
if _, err := io.Copy(buf, t.logBuf); err != nil {
panic(err)
}
t.ctx.logger.Print(t, buf.String())
t.ctx.logger.Print(t, buf.Bytes())

// Assign a nil buffer so future writes go to user-specified writer.
t.logBuf = nil
Expand Down Expand Up @@ -486,6 +486,14 @@ func timestamp() string {
return fmt.Sprintf("[%s] ", time.Now().Format(time.RFC3339))
}

// timestampBytes returns the current time formatted per time.RFC3339,
// enclosed in square brackets and followed by a single space, as a freshly
// allocated byte slice (for example "[2025-03-17T12:00:00Z] ").
func timestampBytes() []byte {
	// Reserve roughly enough capacity up front so the appends below do not
	// need to grow the slice.
	out := append(make([]byte, 0, 32), '[')
	out = time.Now().AppendFormat(out, time.RFC3339)
	return append(out, "] "...)
}

// debugWriter holds a reference to the ConnectivityTest it is associated with.
// NOTE(review): presumably an io.Writer adapter that forwards written data to
// the ConnectivityTest's debug logging — its methods are not visible in this
// view; confirm against the rest of the file.
type debugWriter struct {
ct *ConnectivityTest
}
Expand Down
Loading
0