From 2cb8d2a29286903c55f04bc7e4273d2df549a54d Mon Sep 17 00:00:00 2001 From: Andrea Restle-Lay Date: Mon, 5 May 2025 15:57:35 -0400 Subject: [PATCH 1/6] Add structured suffix logging to Receptor --- pkg/backends/tcp.go | 4 +++ pkg/logger/logger.go | 57 ++++++++++++++++++++++++++++++- pkg/logger/logger_test.go | 38 +++++++++++++++++++++ pkg/netceptor/external_backend.go | 4 +++ pkg/netceptor/netceptor.go | 18 ++++++++-- 5 files changed, 118 insertions(+), 3 deletions(-) diff --git a/pkg/backends/tcp.go b/pkg/backends/tcp.go index 397b743c4..d2f7135ef 100644 --- a/pkg/backends/tcp.go +++ b/pkg/backends/tcp.go @@ -62,6 +62,10 @@ func (b *TCPDialer) Start(ctx context.Context, wg *sync.WaitGroup) (chan netcept return nil, err } + if b.logger != nil { + b.logger.Debug("TCPDialer connected to TCP %s Address %s\n", b.address, conn.RemoteAddr().String()) + } + return newTCPSession(conn, closeChan), nil }) } diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index ffaab5bff..47041a2f2 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -88,6 +88,7 @@ type ReceptorLogger struct { log.Logger Prefix string m sync.Mutex + Suffix map[string]string } // NewReceptorLogger to instantiate a new logger object. @@ -98,6 +99,30 @@ func NewReceptorLogger(prefix string) *ReceptorLogger { } } +func NewReceptorLoggerWithSuffix(prefix string, suffix map[string]string) *ReceptorLogger { + logger := NewReceptorLogger(prefix) + logger.SetSuffix(suffix) + + return logger +} + +func (rl *ReceptorLogger) SetSuffix(suffix map[string]string) { + rl.m.Lock() + defer rl.m.Unlock() + rl.Suffix = suffix +} + +func (rl *ReceptorLogger) UpdateSuffix(suffix map[string]string) { + rl.m.Lock() + defer rl.m.Unlock() + if rl.Suffix == nil { + rl.Suffix = make(map[string]string) + } + for k, v := range suffix { + rl.Suffix[k] = v + } +} + // SetOutput sets the output destination for the logger. func (rl *ReceptorLogger) SetOutput(w io.Writer) { rl.Logger.SetOutput(w) @@ -218,10 +243,39 @@ func (rl *ReceptorLogger) Log(level int, format string, v ...interface{}) { if logLevel >= level { rl.Logger.SetPrefix(prefix) + format, v = rl.appendSuffix(format, v) rl.Logger.Printf(format, v...) } } +func (rl *ReceptorLogger) appendSuffix(format string, v []interface{}) (string, []interface{}) { + if rl.Suffix != nil { + rl.m.Lock() + defer rl.m.Unlock() + + var b strings.Builder + b.WriteString("{") + first := true + for k, val := range rl.Suffix { + if !first { + b.WriteString(",") + } + first = false + b.WriteString(`"`) + b.WriteString(k) + b.WriteString(`":"`) + b.WriteString(val) + b.WriteString(`"`) + } + b.WriteString("}") + + format = strings.ReplaceAll(format, "\n", "") + " %s" + v = append(v, b.String()) + } + + return format, v +} + // SanitizedLog adds a prefix and prints a given log message. func (rl *ReceptorLogger) SanitizedLog(level int, format string, v ...interface{}) { if logger != nil { @@ -245,7 +299,8 @@ func (rl *ReceptorLogger) SanitizedLog(level int, format string, v ...interface{ } if logLevel >= level { - message := fmt.Sprintf(format, v...) + message, v := rl.appendSuffix(format, v) + message = fmt.Sprintf(message, v...) sanMessage := strings.ReplaceAll(message, "\n", "") rl.Logger.SetPrefix(prefix) rl.Logger.Print(sanMessage) diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go index a19452095..4cabd0325 100644 --- a/pkg/logger/logger_test.go +++ b/pkg/logger/logger_test.go @@ -4,6 +4,7 @@ import ( "bytes" "fmt" "os" + "strings" "testing" "github.com/ansible/receptor/pkg/logger" @@ -120,3 +121,40 @@ func TestDebugPayload(t *testing.T) { }) } } + +func TestGetLoggerWithSuffix(t *testing.T) { + var logBuffer bytes.Buffer + + logger.SetGlobalLogLevel(4) + + suffix := map[string]string{ + "node_id": "controller", + "remote_id": "hop", + } + receptorLogger := logger.NewReceptorLoggerWithSuffix("", suffix) + receptorLogger.SetOutput(&logBuffer) + expectedLog := `example message 1 {"node_id":"controller","remote_id":"hop"}` + receptorLogger.Error("example message 1") + testOutput := logBuffer.Bytes() + + if !strings.Contains(logBuffer.String(), expectedLog) { + t.Errorf("failed to log correctly, expected: %v got %v", expectedLog, string(testOutput)) + } + + logBuffer.Reset() + + suffix = map[string]string{ + "node_id": "controller", + "remote_id": "hop", + "cost": "12", + } + receptorLogger.UpdateSuffix(suffix) + expectedLog = `example message 2 {"node_id":"controller","remote_id":"hop","cost":"12"}` // note fields comes out alphabetically + receptorLogger.SanitizedError("example message 2") + testOutput = logBuffer.Bytes() + + if !strings.Contains(logBuffer.String(), expectedLog) { + t.Errorf("failed to log correctly, expected: %v got %v", expectedLog, string(testOutput)) + } + logBuffer.Reset() +} diff --git a/pkg/netceptor/external_backend.go b/pkg/netceptor/external_backend.go index 1c01b547c..898c6d943 100644 --- a/pkg/netceptor/external_backend.go +++ b/pkg/netceptor/external_backend.go @@ -204,3 +204,7 @@ func (es *ExternalSession) Close() error { return err } + +func (mc *netMessageConn) RemoteAddr() net.Addr { + return mc.conn.RemoteAddr() +} diff --git a/pkg/netceptor/netceptor.go b/pkg/netceptor/netceptor.go index f7452245f..792b0860f 100644 --- a/pkg/netceptor/netceptor.go +++ b/pkg/netceptor/netceptor.go @@ -355,6 +355,7 @@ func NewWithConsts(ctx context.Context, nodeID string, MinVersion: tls.VersionTLS12, } s.AddNameHash(nodeID) + s.GetLogger().SetSuffix(map[string]string{"node_id": nodeID}) s.context, s.cancelFunc = context.WithCancel(ctx) s.unreachableBroker = utils.NewBroker(s.context, reflect.TypeOf(UnreachableNotification{})) s.routingUpdateBroker = utils.NewBroker(s.context, reflect.TypeOf(map[string]string{})) @@ -1987,6 +1988,8 @@ func (s *Netceptor) runProtocol(ctx context.Context, sess BackendSession, bi *Ba if remoteNodeID == s.nodeID { return s.sendAndLogConnectionRejection(remoteNodeID, ci, "it tried to connect using our own node ID") } + suffix := map[string]string{"remote_id": remoteNodeID} + s.GetLogger().UpdateSuffix(suffix) remoteNodeAccepted := true if bi.allowedPeers != nil { remoteNodeAccepted = false @@ -2002,6 +2005,7 @@ func (s *Netceptor) runProtocol(ctx context.Context, sess BackendSession, bi *Ba return s.sendAndLogConnectionRejection(remoteNodeID, ci, "it is not in the allowed peers list") } + // Check if there is connection cost for this remoteNodeID // Check if there is connection cost for this remoteNodeID remoteNodeCost, ok := bi.nodeCost[remoteNodeID] if ok { @@ -2010,11 +2014,21 @@ func (s *Netceptor) runProtocol(ctx context.Context, sess BackendSession, bi *Ba } s.connLock.Lock() - // Check if there connInfo for this remoteNodeID - _, ok = s.connections[remoteNodeID] + // Check if there is already connInfo for this remoteNodeID + existingConn, ok := s.connections[remoteNodeID] if ok { remoteNodeAccepted = false } + var connError error + connError = nil + + // Verify that the existing connection is valid + if ok && existingConn != nil { + connError = existingConn.Context.Err() + } + if ok && connError != nil { + s.Logger.Error("Initial handshake failed: %s", connError) + } if !remoteNodeAccepted { s.connLock.Unlock() From 03ca76b2566792917513799acb5410d5a5b8d08e Mon Sep 17 00:00:00 2001 From: Andrea Restle-Lay Date: Mon, 5 May 2025 17:03:24 -0400 Subject: [PATCH 2/6] feedback from pr review --- pkg/logger/logger.go | 6 +++ pkg/logger/logger_test.go | 102 ++++++++++++++++++++------------------ 2 files changed, 60 insertions(+), 48 deletions(-) diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 47041a2f2..e2677a1dd 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -99,6 +99,7 @@ func NewReceptorLogger(prefix string) *ReceptorLogger { } } +// NewReceptorLoggerWithSuffix to instantiate a new logger object with a new Suffix func NewReceptorLoggerWithSuffix(prefix string, suffix map[string]string) *ReceptorLogger { logger := NewReceptorLogger(prefix) logger.SetSuffix(suffix) @@ -106,12 +107,17 @@ func NewReceptorLoggerWithSuffix(prefix string, suffix map[string]string) *Recep return logger } +// SetSuffix sets the suffix for the logger, overwriting any existing suffix. func (rl *ReceptorLogger) SetSuffix(suffix map[string]string) { rl.m.Lock() defer rl.m.Unlock() rl.Suffix = suffix } +// UpdateSuffix allows adding suffix key/value pairs to an existing suffix. +// If the key already exists, the value will be overwritten. +// If the suffix is nil, it will be initialized. +// This is useful for adding additional context to log messages when you want to keep the existing suffix key/value pairs. func (rl *ReceptorLogger) UpdateSuffix(suffix map[string]string) { rl.m.Lock() defer rl.m.Unlock() diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go index 4cabd0325..e54cad357 100644 --- a/pkg/logger/logger_test.go +++ b/pkg/logger/logger_test.go @@ -3,7 +3,6 @@ package logger_test import ( "bytes" "fmt" - "os" "strings" "testing" @@ -73,14 +72,10 @@ func TestLogLevelToNameWithError(t *testing.T) { } func TestDebugPayload(t *testing.T) { - logFilePath := "/tmp/test-output" + var logBuffer bytes.Buffer logger.SetGlobalLogLevel(4) receptorLogger := logger.NewReceptorLogger("testDebugPayload") - logFile, err := os.OpenFile(logFilePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o600) - if err != nil { - t.Error("error creating test-output file") - } - + receptorLogger.SetOutput(&logBuffer) payload := "Testing debugPayload" workUnitID := "1234" connectionType := "unix socket" @@ -105,56 +100,67 @@ func TestDebugPayload(t *testing.T) { for _, testCase := range debugPayloadTestCases { t.Run(testCase.name, func(t *testing.T) { - receptorLogger.SetOutput(logFile) receptorLogger.DebugPayload(testCase.debugPayload, testCase.payload, testCase.workUnitID, testCase.connectionType) + testOutput := logBuffer.Bytes() - testOutput, err := os.ReadFile(logFilePath) - if err != nil { - t.Error("error reading test-output file") - } - if !bytes.Contains(testOutput, []byte(testCase.expectedLog)) { + if !strings.Contains(string(testOutput), testCase.expectedLog) { t.Errorf("failed to log correctly, expected: %v got %v", testCase.expectedLog, string(testOutput)) } - if err := os.Truncate(logFilePath, 0); err != nil { - t.Errorf("failed to truncate: %v", err) - } + logBuffer.Reset() }) } } -func TestGetLoggerWithSuffix(t *testing.T) { - var logBuffer bytes.Buffer - - logger.SetGlobalLogLevel(4) - - suffix := map[string]string{ - "node_id": "controller", - "remote_id": "hop", +func assertSuffixFieldsPresent(t *testing.T, logLine string, expected map[string]string) { + t.Helper() + for k, v := range expected { + needle := `"` + k + `":"` + v + `"` + if !strings.Contains(logLine, needle) { + t.Errorf("expected key-value pair %s not found in log: %s", needle, logLine) + } } - receptorLogger := logger.NewReceptorLoggerWithSuffix("", suffix) - receptorLogger.SetOutput(&logBuffer) - expectedLog := `example message 1 {"node_id":"controller","remote_id":"hop"}` - receptorLogger.Error("example message 1") - testOutput := logBuffer.Bytes() - - if !strings.Contains(logBuffer.String(), expectedLog) { - t.Errorf("failed to log correctly, expected: %v got %v", expectedLog, string(testOutput)) - } - - logBuffer.Reset() +} - suffix = map[string]string{ - "node_id": "controller", - "remote_id": "hop", - "cost": "12", - } - receptorLogger.UpdateSuffix(suffix) - expectedLog = `example message 2 {"node_id":"controller","remote_id":"hop","cost":"12"}` // note fields comes out alphabetically - receptorLogger.SanitizedError("example message 2") - testOutput = logBuffer.Bytes() +func TestGetLoggerWithSuffix(t *testing.T) { + logger.SetGlobalLogLevel(4) - if !strings.Contains(logBuffer.String(), expectedLog) { - t.Errorf("failed to log correctly, expected: %v got %v", expectedLog, string(testOutput)) - } - logBuffer.Reset() + t.Run("initial suffix", func(t *testing.T) { + var logBuffer bytes.Buffer + testname := "Initial Suffix Example" + suffix := map[string]string{ + "node_id": "controller", + "remote_id": "hop", + } + receptorLogger := logger.NewReceptorLoggerWithSuffix("", suffix) + receptorLogger.SetOutput(&logBuffer) + + receptorLogger.Error(testname) + if !strings.Contains(logBuffer.String(), testname) { + t.Errorf("expected log message %s not found in log: %s", testname, logBuffer.String()) + } + assertSuffixFieldsPresent(t, logBuffer.String(), suffix) + }) + t.Run("updated suffix", func(t *testing.T) { + var logBuffer bytes.Buffer + testname := "Updated Suffix Example" + suffix := map[string]string{ + "node_id": "controller", + "remote_id": "hop", + } + receptorLogger := logger.NewReceptorLoggerWithSuffix("", suffix) + receptorLogger.SetOutput(&logBuffer) + + updated := map[string]string{ + "cost": "12", + } + receptorLogger.UpdateSuffix(updated) + receptorLogger.SanitizedError(testname) + + if !strings.Contains(logBuffer.String(), testname) { + t.Errorf("expected log message %s not found in log: %s", testname, logBuffer.String()) + } + + assertSuffixFieldsPresent(t, logBuffer.String(), suffix) + assertSuffixFieldsPresent(t, logBuffer.String(), updated) + }) } From d176f651729ec2add821c12169dd9f15aec0458a Mon Sep 17 00:00:00 2001 From: Andrea Restle-Lay Date: Tue, 6 May 2025 09:04:39 -0400 Subject: [PATCH 3/6] lint --- pkg/logger/logger.go | 2 +- pkg/logger/logger_test.go | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index e2677a1dd..eeda20a36 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -99,7 +99,7 @@ func NewReceptorLogger(prefix string) *ReceptorLogger { } } -// NewReceptorLoggerWithSuffix to instantiate a new logger object with a new Suffix +// NewReceptorLoggerWithSuffix to instantiate a new logger object with a new Suffix. func NewReceptorLoggerWithSuffix(prefix string, suffix map[string]string) *ReceptorLogger { logger := NewReceptorLogger(prefix) logger.SetSuffix(suffix) diff --git a/pkg/logger/logger_test.go b/pkg/logger/logger_test.go index e54cad357..68caeb1bf 100644 --- a/pkg/logger/logger_test.go +++ b/pkg/logger/logger_test.go @@ -134,7 +134,7 @@ func TestGetLoggerWithSuffix(t *testing.T) { receptorLogger := logger.NewReceptorLoggerWithSuffix("", suffix) receptorLogger.SetOutput(&logBuffer) - receptorLogger.Error(testname) + receptorLogger.Error("%s", testname) if !strings.Contains(logBuffer.String(), testname) { t.Errorf("expected log message %s not found in log: %s", testname, logBuffer.String()) } @@ -154,7 +154,7 @@ func TestGetLoggerWithSuffix(t *testing.T) { "cost": "12", } receptorLogger.UpdateSuffix(updated) - receptorLogger.SanitizedError(testname) + receptorLogger.SanitizedError("%s", testname) if !strings.Contains(logBuffer.String(), testname) { t.Errorf("expected log message %s not found in log: %s", testname, logBuffer.String()) From 80185279a53e43f5a58d080930f1b9672626ef41 Mon Sep 17 00:00:00 2001 From: Andrea Restle-Lay Date: Tue, 6 May 2025 09:11:15 -0400 Subject: [PATCH 4/6] Update pkg/logger/logger.go Co-authored-by: Dan Leehr --- pkg/logger/logger.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index eeda20a36..8f9ff677d 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -254,7 +254,7 @@ func (rl *ReceptorLogger) Log(level int, format string, v ...interface{}) { } } -func (rl *ReceptorLogger) appendSuffix(format string, v []interface{}) (string, []interface{}) { +func (rl *ReceptorLogger) appendSuffix(format string, v []string) (string, []string) { if rl.Suffix != nil { rl.m.Lock() defer rl.m.Unlock() From ad96375d53c832d347c74f50ed1245636efde363 Mon Sep 17 00:00:00 2001 From: Andrea Restle-Lay Date: Tue, 6 May 2025 09:19:54 -0400 Subject: [PATCH 5/6] Revert "Update pkg/logger/logger.go" This reverts commit 80185279a53e43f5a58d080930f1b9672626ef41. --- pkg/logger/logger.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/logger/logger.go b/pkg/logger/logger.go index 8f9ff677d..eeda20a36 100644 --- a/pkg/logger/logger.go +++ b/pkg/logger/logger.go @@ -254,7 +254,7 @@ func (rl *ReceptorLogger) Log(level int, format string, v ...interface{}) { } } -func (rl *ReceptorLogger) appendSuffix(format string, v []string) (string, []string) { +func (rl *ReceptorLogger) appendSuffix(format string, v []interface{}) (string, []interface{}) { if rl.Suffix != nil { rl.m.Lock() defer rl.m.Unlock() From d498326e5ab3d08c2ea8f858dd2c48e1057b07ac Mon Sep 17 00:00:00 2001 From: Andrea Restle-Lay Date: Wed, 7 May 2025 15:56:37 -0400 Subject: [PATCH 6/6] update handshake error with specifics of issue --- pkg/netceptor/netceptor.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/netceptor/netceptor.go b/pkg/netceptor/netceptor.go index 792b0860f..304c1da0e 100644 --- a/pkg/netceptor/netceptor.go +++ b/pkg/netceptor/netceptor.go @@ -2027,7 +2027,7 @@ func (s *Netceptor) runProtocol(ctx context.Context, sess BackendSession, bi *Ba connError = existingConn.Context.Err() } if ok && connError != nil { - s.Logger.Error("Initial handshake failed: %s", connError) + s.Logger.Error("Context for existing connection error: %s", connError) } if !remoteNodeAccepted {