AAP-43201 netceptor logging by arrestle · Pull Request #1300 · ansible/receptor · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

AAP-43201 netceptor logging #1300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
May 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkg/backends/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ func (b *TCPDialer) Start(ctx context.Context, wg *sync.WaitGroup) (chan netcept
return nil, err
}

if b.logger != nil {
b.logger.Debug("TCPDialer connected to TCP %s Address %s\n", b.address, conn.RemoteAddr().String())
}

return newTCPSession(conn, closeChan), nil
})
}
Expand Down
63 changes: 62 additions & 1 deletion pkg/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ type ReceptorLogger struct {
log.Logger
Prefix string
m sync.Mutex
Suffix map[string]string
}

// NewReceptorLogger to instantiate a new logger object.
Expand All @@ -98,6 +99,36 @@ func NewReceptorLogger(prefix string) *ReceptorLogger {
}
}

// NewReceptorLoggerWithSuffix builds a logger through NewReceptorLogger using
// the given prefix and then installs suffix on it via SetSuffix before
// returning it.
func NewReceptorLoggerWithSuffix(prefix string, suffix map[string]string) *ReceptorLogger {
	rl := NewReceptorLogger(prefix)

	// Installing the suffix goes through the setter so it is stored the same
	// way as any later SetSuffix call.
	rl.SetSuffix(suffix)

	return rl
}

// SetSuffix sets the suffix for the logger, overwriting any existing suffix.
//
// The key/value pairs are copied into a map owned by the logger, so later
// changes the caller makes to suffix do not affect the logger, and
// UpdateSuffix never writes into the caller's map. Passing a nil suffix
// leaves the logger with a nil Suffix.
func (rl *ReceptorLogger) SetSuffix(suffix map[string]string) {
	// Build the private copy before taking the lock to keep the critical
	// section short. nil is preserved rather than turned into an empty map.
	var owned map[string]string
	if suffix != nil {
		owned = make(map[string]string, len(suffix))
		for k, v := range suffix {
			owned[k] = v
		}
	}

	rl.m.Lock()
	defer rl.m.Unlock()
	rl.Suffix = owned
}

// UpdateSuffix merges the given key/value pairs into the logger's current
// suffix while holding the logger's mutex.
// Keys that are already present have their values replaced; all other
// existing pairs are kept.
// When the logger has no suffix yet, an empty one is created first (this
// happens even if suffix itself is nil or empty).
func (rl *ReceptorLogger) UpdateSuffix(suffix map[string]string) {
	rl.m.Lock()
	defer rl.m.Unlock()

	target := rl.Suffix
	if target == nil {
		target = make(map[string]string)
		rl.Suffix = target
	}

	for key := range suffix {
		target[key] = suffix[key]
	}
}

// SetOutput sets the output destination for the logger.
func (rl *ReceptorLogger) SetOutput(w io.Writer) {
rl.Logger.SetOutput(w)
Expand Down Expand Up @@ -218,10 +249,39 @@ func (rl *ReceptorLogger) Log(level int, format string, v ...interface{}) {

if logLevel >= level {
rl.Logger.SetPrefix(prefix)
format, v = rl.appendSuffix(format, v)
rl.Logger.Printf(format, v...)
}
}

// appendSuffix extends a log call with the logger's suffix.
//
// When rl.Suffix is nil, format and v are returned unchanged. Otherwise the
// suffix is rendered as a brace-delimited list of "key":"value" pairs, every
// newline is removed from format, " %s" is appended to it, and the rendered
// suffix is added as the final argument.
//
// Keys and values are written verbatim (no escaping), and the pair order
// follows Go's map iteration order, which is not fixed between calls.
//
// The returned argument slice never shares writable spare capacity with v,
// so the caller's backing array is left untouched.
func (rl *ReceptorLogger) appendSuffix(format string, v []interface{}) (string, []interface{}) {
	// Take the lock before looking at rl.Suffix: SetSuffix and UpdateSuffix
	// write the field while holding rl.m, so an unguarded nil check would be
	// a data race with them.
	rl.m.Lock()
	defer rl.m.Unlock()

	if rl.Suffix == nil {
		return format, v
	}

	var b strings.Builder
	b.WriteString("{")
	first := true
	for k, val := range rl.Suffix {
		if !first {
			b.WriteString(",")
		}
		first = false
		b.WriteString(`"`)
		b.WriteString(k)
		b.WriteString(`":"`)
		b.WriteString(val)
		b.WriteString(`"`)
	}
	b.WriteString("}")

	format = strings.ReplaceAll(format, "\n", "") + " %s"

	// The full slice expression caps capacity at len(v), forcing append to
	// allocate instead of writing into spare capacity owned by the caller.
	v = append(v[:len(v):len(v)], b.String())

	return format, v
}

// SanitizedLog adds a prefix and prints a given log message.
func (rl *ReceptorLogger) SanitizedLog(level int, format string, v ...interface{}) {
if logger != nil {
Expand All @@ -245,7 +305,8 @@ func (rl *ReceptorLogger) SanitizedLog(level int, format string, v ...interface{
}

if logLevel >= level {
message := fmt.Sprintf(format, v...)
message, v := rl.appendSuffix(format, v)
message = fmt.Sprintf(message, v...)
sanMessage := strings.ReplaceAll(message, "\n", "")
rl.Logger.SetPrefix(prefix)
rl.Logger.Print(sanMessage)
Expand Down
76 changes: 60 additions & 16 deletions pkg/logger/logger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package logger_test
import (
"bytes"
"fmt"
"os"
"strings"
"testing"

"github.com/ansible/receptor/pkg/logger"
Expand Down Expand Up @@ -72,14 +72,10 @@ func TestLogLevelToNameWithError(t *testing.T) {
}

func TestDebugPayload(t *testing.T) {
logFilePath := "/tmp/test-output"
var logBuffer bytes.Buffer
logger.SetGlobalLogLevel(4)
receptorLogger := logger.NewReceptorLogger("testDebugPayload")
logFile, err := os.OpenFile(logFilePath, os.O_WRONLY|os.O_CREATE|os.O_APPEND, 0o600)
if err != nil {
t.Error("error creating test-output file")
}

receptorLogger.SetOutput(&logBuffer)
payload := "Testing debugPayload"
workUnitID := "1234"
connectionType := "unix socket"
Expand All @@ -104,19 +100,67 @@ func TestDebugPayload(t *testing.T) {

for _, testCase := range debugPayloadTestCases {
t.Run(testCase.name, func(t *testing.T) {
receptorLogger.SetOutput(logFile)
receptorLogger.DebugPayload(testCase.debugPayload, testCase.payload, testCase.workUnitID, testCase.connectionType)
testOutput := logBuffer.Bytes()

testOutput, err := os.ReadFile(logFilePath)
if err != nil {
t.Error("error reading test-output file")
}
if !bytes.Contains(testOutput, []byte(testCase.expectedLog)) {
if !strings.Contains(string(testOutput), testCase.expectedLog) {
t.Errorf("failed to log correctly, expected: %v got %v", testCase.expectedLog, string(testOutput))
}
if err := os.Truncate(logFilePath, 0); err != nil {
t.Errorf("failed to truncate: %v", err)
}
logBuffer.Reset()
})
}
}

// assertSuffixFieldsPresent reports a test error for every entry of expected
// whose rendered form "key":"value" does not occur anywhere in logLine.
// It is marked as a test helper so failures point at the calling line.
func assertSuffixFieldsPresent(t *testing.T, logLine string, expected map[string]string) {
	t.Helper()

	for key, value := range expected {
		pair := `"` + key + `":"` + value + `"`
		if strings.Contains(logLine, pair) {
			continue
		}
		t.Errorf("expected key-value pair %s not found in log: %s", pair, logLine)
	}
}

// TestGetLoggerWithSuffix checks that a logger created with
// NewReceptorLoggerWithSuffix writes both the log message and every suffix
// key/value pair to its output, and that pairs added later through
// UpdateSuffix are written alongside the initial ones.
func TestGetLoggerWithSuffix(t *testing.T) {
	logger.SetGlobalLogLevel(4)

	// requireMessage flags the test when the captured output lacks msg.
	requireMessage := func(t *testing.T, output, msg string) {
		t.Helper()
		if !strings.Contains(output, msg) {
			t.Errorf("expected log message %s not found in log: %s", msg, output)
		}
	}

	t.Run("initial suffix", func(t *testing.T) {
		message := "Initial Suffix Example"
		fields := map[string]string{
			"node_id":   "controller",
			"remote_id": "hop",
		}

		var out bytes.Buffer
		rl := logger.NewReceptorLoggerWithSuffix("", fields)
		rl.SetOutput(&out)

		rl.Error("%s", message)

		requireMessage(t, out.String(), message)
		assertSuffixFieldsPresent(t, out.String(), fields)
	})

	t.Run("updated suffix", func(t *testing.T) {
		message := "Updated Suffix Example"
		fields := map[string]string{
			"node_id":   "controller",
			"remote_id": "hop",
		}
		extra := map[string]string{
			"cost": "12",
		}

		var out bytes.Buffer
		rl := logger.NewReceptorLoggerWithSuffix("", fields)
		rl.SetOutput(&out)

		// Add a pair on top of the initial suffix, then log through the
		// sanitized path.
		rl.UpdateSuffix(extra)
		rl.SanitizedError("%s", message)

		requireMessage(t, out.String(), message)

		assertSuffixFieldsPresent(t, out.String(), fields)
		assertSuffixFieldsPresent(t, out.String(), extra)
	})
}
4 changes: 4 additions & 0 deletions pkg/netceptor/external_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -204,3 +204,7 @@ func (es *ExternalSession) Close() error {

return err
}

// RemoteAddr returns the remote address reported by the connection that this
// netMessageConn wraps; it delegates directly to mc.conn.RemoteAddr.
func (mc *netMessageConn) RemoteAddr() net.Addr {
	remote := mc.conn.RemoteAddr()

	return remote
}
18 changes: 16 additions & 2 deletions pkg/netceptor/netceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -355,6 +355,7 @@ func NewWithConsts(ctx context.Context, nodeID string,
MinVersion: tls.VersionTLS12,
}
s.AddNameHash(nodeID)
s.GetLogger().SetSuffix(map[string]string{"node_id": nodeID})
s.context, s.cancelFunc = context.WithCancel(ctx)
s.unreachableBroker = utils.NewBroker(s.context, reflect.TypeOf(UnreachableNotification{}))
s.routingUpdateBroker = utils.NewBroker(s.context, reflect.TypeOf(map[string]string{}))
Expand Down Expand Up @@ -1987,6 +1988,8 @@ func (s *Netceptor) runProtocol(ctx context.Context, sess BackendSession, bi *Ba
if remoteNodeID == s.nodeID {
return s.sendAndLogConnectionRejection(remoteNodeID, ci, "it tried to connect using our own node ID")
}
suffix := map[string]string{"remote_id": remoteNodeID}
s.GetLogger().UpdateSuffix(suffix)
remoteNodeAccepted := true
if bi.allowedPeers != nil {
remoteNodeAccepted = false
Expand All @@ -2002,6 +2005,7 @@ func (s *Netceptor) runProtocol(ctx context.Context, sess BackendSession, bi *Ba
return s.sendAndLogConnectionRejection(remoteNodeID, ci, "it is not in the allowed peers list")
}

// Check if there is connection cost for this remoteNodeID
// Check if there is connection cost for this remoteNodeID
remoteNodeCost, ok := bi.nodeCost[remoteNodeID]
if ok {
Expand All @@ -2010,11 +2014,21 @@ func (s *Netceptor) runProtocol(ctx context.Context, sess BackendSession, bi *Ba
}
s.connLock.Lock()

// Check if there connInfo for this remoteNodeID
_, ok = s.connections[remoteNodeID]
// Check if there is already connInfo for this remoteNodeID
existingConn, ok := s.connections[remoteNodeID]
if ok {
remoteNodeAccepted = false
}
var connError error
connError = nil

// Verify that the existing connection is valid
if ok && existingConn != nil {
connError = existingConn.Context.Err()
}
if ok && connError != nil {
s.Logger.Error("Context for existing connection error: %s", connError)
}

if !remoteNodeAccepted {
s.connLock.Unlock()
Expand Down
0