[CNM] Flush closed connections dynamically #36321
Conversation
Uncompressed package size comparison
Comparison with ancestor · Diff per package
Test changes on VM
Use this command from test-infra-definitions to manually test this PR's changes on a VM: `dda inv aws.create-vm --pipeline-id=63365132 --os-family=ubuntu`
Note: This applies to commit 4ad7696
Regression Detector
Regression Detector Results · Metrics dashboard
Baseline: 5224e9d
Optimization Goals: ✅ No significant changes detected
perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
---|---|---|---|---|---|---|
➖ | quality_gate_logs | % cpu utilization | +0.16 | [-2.59, +2.91] | 1 | Logs bounds checks dashboard |
➖ | file_to_blackhole_1000ms_latency | egress throughput | +0.10 | [-0.46, +0.67] | 1 | Logs |
➖ | file_to_blackhole_0ms_latency_http1 | egress throughput | +0.10 | [-0.51, +0.70] | 1 | Logs |
➖ | file_to_blackhole_300ms_latency | egress throughput | +0.03 | [-0.58, +0.64] | 1 | Logs |
➖ | quality_gate_idle | memory utilization | +0.02 | [-0.05, +0.08] | 1 | Logs bounds checks dashboard |
➖ | tcp_dd_logs_filter_exclude | ingress throughput | +0.01 | [-0.01, +0.02] | 1 | Logs |
➖ | uds_dogstatsd_to_api | ingress throughput | -0.00 | [-0.29, +0.29] | 1 | Logs |
➖ | file_to_blackhole_0ms_latency_http2 | egress throughput | -0.04 | [-0.62, +0.53] | 1 | Logs |
➖ | file_to_blackhole_0ms_latency | egress throughput | -0.05 | [-0.61, +0.51] | 1 | Logs |
➖ | file_to_blackhole_500ms_latency | egress throughput | -0.06 | [-0.65, +0.53] | 1 | Logs |
➖ | file_to_blackhole_1000ms_latency_linear_load | egress throughput | -0.06 | [-0.29, +0.17] | 1 | Logs |
➖ | file_to_blackhole_100ms_latency | egress throughput | -0.08 | [-0.70, +0.54] | 1 | Logs |
➖ | ddot_logs | memory utilization | -0.27 | [-0.36, -0.18] | 1 | Logs |
➖ | docker_containers_memory | memory utilization | -0.27 | [-0.36, -0.18] | 1 | Logs |
➖ | ddot_metrics | memory utilization | -0.39 | [-0.51, -0.27] | 1 | Logs |
➖ | otlp_ingest_logs | memory utilization | -0.46 | [-0.58, -0.33] | 1 | Logs |
➖ | uds_dogstatsd_20mb_12k_contexts_20_senders | memory utilization | -0.67 | [-0.74, -0.60] | 1 | Logs |
➖ | docker_containers_cpu | % cpu utilization | -0.74 | [-3.76, +2.29] | 1 | Logs |
➖ | tcp_syslog_to_blackhole | ingress throughput | -0.80 | [-0.86, -0.74] | 1 | Logs |
➖ | quality_gate_idle_all_features | memory utilization | -0.89 | [-1.03, -0.75] | 1 | Logs bounds checks dashboard |
➖ | otlp_ingest_metrics | memory utilization | -1.37 | [-1.54, -1.20] | 1 | Logs |
➖ | file_tree | memory utilization | -1.61 | [-1.81, -1.41] | 1 | Logs |
➖ | uds_dogstatsd_to_api_cpu | % cpu utilization | -2.20 | [-3.07, -1.34] | 1 | Logs |
Bounds Checks: ❌ Failed
perf | experiment | bounds_check_name | replicates_passed | links |
---|---|---|---|---|
❌ | quality_gate_logs | memory_usage | 9/10 | bounds checks dashboard |
✅ | docker_containers_cpu | simple_check_run | 10/10 | |
✅ | docker_containers_memory | memory_usage | 10/10 | |
✅ | docker_containers_memory | simple_check_run | 10/10 | |
✅ | file_to_blackhole_0ms_latency | lost_bytes | 10/10 | |
✅ | file_to_blackhole_0ms_latency | memory_usage | 10/10 | |
✅ | file_to_blackhole_0ms_latency_http1 | lost_bytes | 10/10 | |
✅ | file_to_blackhole_0ms_latency_http1 | memory_usage | 10/10 | |
✅ | file_to_blackhole_0ms_latency_http2 | lost_bytes | 10/10 | |
✅ | file_to_blackhole_0ms_latency_http2 | memory_usage | 10/10 | |
✅ | file_to_blackhole_1000ms_latency | memory_usage | 10/10 | |
✅ | file_to_blackhole_1000ms_latency_linear_load | memory_usage | 10/10 | |
✅ | file_to_blackhole_100ms_latency | lost_bytes | 10/10 | |
✅ | file_to_blackhole_100ms_latency | memory_usage | 10/10 | |
✅ | file_to_blackhole_300ms_latency | lost_bytes | 10/10 | |
✅ | file_to_blackhole_300ms_latency | memory_usage | 10/10 | |
✅ | file_to_blackhole_500ms_latency | lost_bytes | 10/10 | |
✅ | file_to_blackhole_500ms_latency | memory_usage | 10/10 | |
✅ | quality_gate_idle | intake_connections | 10/10 | bounds checks dashboard |
✅ | quality_gate_idle | memory_usage | 10/10 | bounds checks dashboard |
✅ | quality_gate_idle_all_features | intake_connections | 10/10 | bounds checks dashboard |
✅ | quality_gate_idle_all_features | memory_usage | 10/10 | bounds checks dashboard |
✅ | quality_gate_logs | intake_connections | 10/10 | bounds checks dashboard |
✅ | quality_gate_logs | lost_bytes | 10/10 | bounds checks dashboard |
Explanation
Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%
Performance changes are noted in the perf column of each table:
- ✅ = significantly better comparison variant performance
- ❌ = significantly worse comparison variant performance
- ➖ = no significant change in performance
A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".
For each experiment, we flag a change in performance as a "regression" -- a change worth investigating further -- if all of the following criteria are true (a worked example follows the list):
- Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.
- Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.
- Its configuration does not mark it "erratic".
CI Pass/Fail Decision
❌ Failed. Some Quality Gates were violated.
- quality_gate_logs, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_logs, bounds check memory_usage: 9/10 replicas passed. 1 replica failed, which is > 0. Gate FAILED.
- quality_gate_logs, bounds check lost_bytes: 10/10 replicas passed. Gate passed.
- quality_gate_idle_all_features, bounds check intake_connections: 10/10 replicas passed. Gate passed.
- quality_gate_idle_all_features, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_idle, bounds check memory_usage: 10/10 replicas passed. Gate passed.
- quality_gate_idle, bounds check intake_connections: 10/10 replicas passed. Gate passed.
pkg/network/state_test.go
Outdated
```go
// Add connections below threshold
ns.Lock()
for i := 0; uint32(i) < capacityThreshold-1; i++ { // Add 8 connections
	ns.storeClosedConnection(createTestConnectionStats(StatCookie(i)))
```
You can use `networkState.StoreClosedConnection` in the tests, which acquires the lock.
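A minimal sketch of the suggested change, assuming `StoreClosedConnection` is the exported, lock-acquiring counterpart of `storeClosedConnection` (the exact signature is an assumption; the call shape is kept from the test above):

```go
// No manual ns.Lock()/ns.Unlock() needed: the exported method is assumed
// to take the state lock itself.
for i := 0; uint32(i) < capacityThreshold-1; i++ {
	ns.StoreClosedConnection(createTestConnectionStats(StatCookie(i)))
}
```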
```go
isNearCapacity, err := nt.tracer.IsClosedConnectionsNearCapacity(clientID)
if err != nil {
	log.Errorf("Error checking connection capacity for client %s: %v", clientID, err)
	w.WriteHeader(http.StatusInternalServerError)
	return
}

if isNearCapacity {
	log.Debugf("Responding to capacity check request for client %s: Near capacity (200 OK)", clientID)
	w.WriteHeader(http.StatusOK)
} else {
	log.Tracef("Responding to capacity check request for client %s: Not near capacity (204 No Content)", clientID)
	w.WriteHeader(http.StatusNoContent)
}
```
Don't you want to measure how close we are to the limit? It might help with finding bugs, or with fine-tuning what "near capacity" means.
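One way the handler could report distance from the limit rather than a boolean, as a hedged sketch (the JSON shape and the `writeCapacity` helper are hypothetical, not part of this PR):

```go
import (
	"encoding/json"
	"net/http"
)

// capacityResponse is a hypothetical payload exposing how close the
// closed-connections buffer is to its limit.
type capacityResponse struct {
	Closed     int     `json:"closed"`      // current closed-connection count
	Max        int     `json:"max"`         // buffer capacity
	UsageRatio float64 `json:"usage_ratio"` // Closed / Max
}

func writeCapacity(w http.ResponseWriter, closed, max int) {
	w.Header().Set("Content-Type", "application/json")
	_ = json.NewEncoder(w).Encode(capacityResponse{
		Closed:     closed,
		Max:        max,
		UsageRatio: float64(closed) / float64(max),
	})
}
```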
pkg/config/setup/process.go
Outdated
```diff
@@ -205,6 +205,9 @@ func setupProcesses(config pkgconfigmodel.Setup) {
 	procBindEnvAndSetDefault(config, "process_config.language_detection.grpc_port", DefaultProcessEntityStreamPort)
 
+	// Connection Capacity Check configuration
+	procBindEnvAndSetDefault(config, "process_config.connections_capacity_check_interval", 30*time.Second)
```
we check the capacity every 30s? not 10s?
The default is 30s so we can merge this PR without changing the current behavior; we can then override this config with the chart to apply the change selectively.
pkg/network/config/config.go
Outdated
```go
TCPFailedConnectionsEnabled:           cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_tcp_failed_connections")),
MaxTrackedConnections:                 uint32(cfg.GetInt64(sysconfig.FullKeyPath(spNS, "max_tracked_connections"))),
MaxClosedConnectionsBuffered:          uint32(cfg.GetInt64(sysconfig.FullKeyPath(spNS, "max_closed_connections_buffered"))),
ClosedConnectionsBufferThresholdRatio: cfg.GetFloat64(sysconfig.FullKeyPath(spNS, "closed_connections_buffer_threshold_ratio")),
```
Where is `closed_connections_buffer_threshold_ratio` defined? Also, why do you use `spNS`? I do think it pollutes the global namespace of system-probe and can incorrectly be assumed to affect all of system-probe, not only CNM.
Good point, I will change this to use netNS. I'm not sure what you mean by the first question.
For the first question: you should have declared the configuration option in setup/system-probe.go, just like `max_tracked_connections` is declared: `cfg.BindEnvAndSetDefault(join(spNS, "max_tracked_connections"), 65536)` (with or without a default value).
pkg/network/state.go
Outdated
```go
if !client.closedConnectionsNearCapacity.Load() {
	log.Warnf("Closed connections buffer for client %s is nearing capacity (%d/%d, threshold: %.2f%%). Setting flag.", clientID, len(client.closed.conns), ns.maxClosedConns, ns.closedConnectionsBufferThresholdRatio*100)
	client.closedConnectionsNearCapacity.Store(true)
}
```
Isn't this code racy? Two goroutines can reach line 589 at the same time.
If this code isn't racy, as I believe we take a mutex in one of the callers, then why do you need an atomic? Also, where do you set it back to `false`? And IIRC, you should be using `CompareAndSwap`, which performs the swap (if the old value matches) and reports whether it did, all in one atomic step.
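For reference, a minimal sketch of the `CompareAndSwap` pattern described above, using the standard library's `atomic.Bool` (field names taken from the diff):

```go
// CompareAndSwap(false, true) flips the flag only if it is still false and
// reports whether it did, atomically; so even if two goroutines race here,
// the warning is logged exactly once per transition.
if client.closedConnectionsNearCapacity.CompareAndSwap(false, true) {
	log.Warnf("Closed connections buffer for client %s is nearing capacity (%d/%d)",
		clientID, len(client.closed.conns), ns.maxClosedConns)
}
```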
I think this code is only called while the state lock is held, so there's likely no need for atomics.
pkg/network/state.go
Outdated
```go
// Check if the buffer is near capacity using the configured threshold
if float64(len(client.closed.conns)) >= float64(ns.maxClosedConns)*ns.closedConnectionsBufferThresholdRatio {
	if !client.closedConnectionsNearCapacity.Load() {
		log.Warnf("Closed connections buffer for client %s is nearing capacity (%d/%d, threshold: %.2f%%). Setting flag.", clientID, len(client.closed.conns), ns.maxClosedConns, ns.closedConnectionsBufferThresholdRatio*100)
```
Have you checked that this log isn't spamming under high/moderate load?
```go
ratioPath := spNS("closed_connections_buffer_threshold_ratio")
if !cfg.IsConfigured(ratioPath) {
	cfg.Set(ratioPath, defaultClosedConnectionsBufferThresholdRatioSystemProbe, model.SourceDefault)
} else {
	ratio := cfg.GetFloat64(ratioPath)
	if ratio <= 0 || ratio > 1.0 {
		log.Warnf("Invalid value %f for %s, using default %f. Value must be > 0 and <= 1.0.", ratio, ratioPath, defaultClosedConnectionsBufferThresholdRatioSystemProbe)
		cfg.Set(ratioPath, defaultClosedConnectionsBufferThresholdRatioSystemProbe, model.SourceAgentRuntime)
	}
}
```
I do recommend adding UTs to check this logic.
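A hedged sketch of such a test, assuming the snippet above is extracted into a helper (here called `validateClosedConnectionsRatio`; the `mock.NewSystemProbe` constructor and `testify` assertions are assumptions about the surrounding test infrastructure):

```go
func TestClosedConnectionsBufferThresholdRatio(t *testing.T) {
	for _, tc := range []struct {
		name     string
		ratio    float64
		expected float64
	}{
		{"valid ratio is kept", 0.5, 0.5},
		{"zero falls back to default", 0, defaultClosedConnectionsBufferThresholdRatioSystemProbe},
		{"negative falls back to default", -0.1, defaultClosedConnectionsBufferThresholdRatioSystemProbe},
		{"above 1.0 falls back to default", 1.5, defaultClosedConnectionsBufferThresholdRatioSystemProbe},
	} {
		t.Run(tc.name, func(t *testing.T) {
			cfg := mock.NewSystemProbe(t) // assumed config-mock helper
			cfg.SetWithoutSource(spNS("closed_connections_buffer_threshold_ratio"), tc.ratio)
			validateClosedConnectionsRatio(cfg) // hypothetical extraction of the logic above
			assert.Equal(t, tc.expected, cfg.GetFloat64(spNS("closed_connections_buffer_threshold_ratio")))
		})
	}
}
```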
pkg/process/runner/runner.go
Outdated
```go
// Otherwise (e.g. not configured, or derived value is invalid), defaultInterval is returned.
func (l *CheckRunner) getConnectionsCheckInterval(checkName string, defaultInterval time.Duration) time.Duration {
	const minAllowedInterval = 10 * time.Second
	const maxAllowedInterval = 30 * time.Second
```
The max must match the configured value of the connections check. We can set the connections check to report every 1m if we want, but the current code ignores that and forces us to use 30s or lower.
Also, the minimum interval can cause a violation as well: if we set the connections check interval to 10s (we have that at a single customer due to high load), then we have 2 issues (see the sketch after this list):
- Race between the new endpoint and the connections check endpoint (cc: see my comment in the RFC about reusing the same endpoint)
- The hard-coded minimum prevents the connections check interval from going below 10s
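A sketch of deriving the bound from the configured connections check interval instead of hard-coding it (an illustration under assumptions, not the PR's code):

```go
// Clamp the capacity-check interval to the connections check interval, so a
// 1m or 10s connections check configuration is respected instead of being
// forced into a hard-coded [10s, 30s] window.
connInterval := checks.GetInterval(l.config, checks.ConnectionsCheckName)
capacityInterval := l.config.GetDuration("process_config.connections_capacity_check_interval")
if capacityInterval <= 0 || capacityInterval > connInterval {
	capacityInterval = connInterval
}
```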
pkg/process/runner/runner.go
Outdated
```go
// If both are true, the capacity interval is returned.
// If explicitly configured by the user but fails validation, a specific warning is logged and defaultInterval is returned.
// Otherwise (e.g. not configured, or derived value is invalid), defaultInterval is returned.
func (l *CheckRunner) getConnectionsCheckInterval(checkName string, defaultInterval time.Duration) time.Duration {
```
do we have UTs for this?
pkg/process/checks/net.go
Outdated
```go
	return false, fmt.Errorf("error creating capacity check request %s: %w", url, err)
}

log.Tracef("Checking connections capacity endpoint: %s", url)
```
Perhaps you should use `shouldLog`: if the log level is not trace (99% of the time), this is redundant CPU consumption.
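A minimal sketch of that guard, assuming a `shouldLog`-style helper is available (the exact helper name and level constant are assumptions):

```go
// Only pay for the call and its argument formatting when trace logging is on.
if log.ShouldLog(log.TraceLvl) {
	log.Tracef("Checking connections capacity endpoint: %s", url)
}
```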
Blocking the PR on:
- the comment regarding breaking today's behavior and possible race conditions
- the code being enabled even when USM is enabled
```go
ratio := cfg.GetFloat64(ratioPath)
if ratio <= 0 || ratio > 1.0 {
	log.Warnf("Invalid value %f for %s, using default %f. Value must be > 0 and <= 1.0.", ratio, ratioPath, defaultClosedConnectionsBufferThresholdRatioSystemProbe)
	cfg.Set(ratioPath, defaultClosedConnectionsBufferThresholdRatioSystemProbe, model.SourceAgentRuntime)
}
```
Suggestion: create `validateFloat64`, mirroring the other `validate*` functions in adjust.go, and use that.
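A sketch of what that helper could look like, mirroring the shape of the snippet above (the `model.Config` parameter type and the exact signature of the `validate*` functions in adjust.go are assumptions):

```go
// validateFloat64 applies the default when the key is unset, and falls back
// to the default (with a warning) when the configured value fails validation.
func validateFloat64(cfg model.Config, key string, defaultVal float64, valid func(float64) bool) {
	if !cfg.IsConfigured(key) {
		cfg.Set(key, defaultVal, model.SourceDefault)
		return
	}
	if v := cfg.GetFloat64(key); !valid(v) {
		log.Warnf("Invalid value %f for %s, using default %f", v, key, defaultVal)
		cfg.Set(key, defaultVal, model.SourceAgentRuntime)
	}
}
```

The call site then reduces to: `validateFloat64(cfg, ratioPath, defaultClosedConnectionsBufferThresholdRatioSystemProbe, func(r float64) bool { return r > 0 && r <= 1.0 })`.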
How the two new configs interact with each other and with the connections check interval is not clear to me. Also, what happens if one is set and the other is not? It would be great if we could converge on one config and define some sane defaults.
pkg/network/tracer/tracer.go
Outdated
```go
// IsClosedConnectionsNearCapacity checks if the closed connections buffer is near capacity for a specific client.
// It returns true if near capacity, false otherwise, and an error if the state is not initialized.
func (t *Tracer) IsClosedConnectionsNearCapacity(clientID string) (bool, error) {
	if t.state == nil {
```
How is this possible?
pkg/network/config/config.go
Outdated
```go
TCPFailedConnectionsEnabled:           cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_tcp_failed_connections")),
MaxTrackedConnections:                 uint32(cfg.GetInt64(sysconfig.FullKeyPath(spNS, "max_tracked_connections"))),
MaxClosedConnectionsBuffered:          uint32(cfg.GetInt64(sysconfig.FullKeyPath(spNS, "max_closed_connections_buffered"))),
ClosedConnectionsBufferThresholdRatio: cfg.GetFloat64(sysconfig.FullKeyPath(spNS, "closed_connections_buffer_threshold_ratio")),
```
Is the default here `1.0`?
How does it change if we decrease the check interval from the default of 30s? Do we have to set both configs?
```go
maxOffsetThreshold                                      = 3000
defaultMaxProcessesTracked                              = 1024
defaultMaxTrackedConnections                            = 65536
defaultClosedConnectionsBufferThresholdRatioSystemProbe = 0.75
```
Why is this 0.75? Don't we want this to be 1.0 to keep the status quo?
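For illustration: with a 65536-entry closed-connections buffer, a ratio of 0.75 signals a flush at 49152 buffered connections, while 1.0 signals only once the buffer is completely full, i.e. at the point where closed connections would already start being dropped.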
LGTM for agent-configuration owned files
pkg/config/setup/process.go
Outdated
```diff
@@ -205,6 +205,9 @@ func setupProcesses(config pkgconfigmodel.Setup) {
 	procBindEnvAndSetDefault(config, "process_config.language_detection.grpc_port", DefaultProcessEntityStreamPort)
 
+	// Connection Capacity Check configuration
+	procBindEnvAndSetDefault(config, "process_config.connections_capacity_check_interval", 30*time.Second)
```
Could we move this to a child of `process_config`, like `process_config.connections` or similar?
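A hypothetical sketch of the nested key (the exact name is illustrative only):

```go
// Group connection-related settings under a child of process_config.
procBindEnvAndSetDefault(config, "process_config.connections.capacity_check_interval", 30*time.Second)
```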
pkg/process/runner/runner.go
Outdated
```diff
@@ -368,7 +368,12 @@ func (l *CheckRunner) basicRunner(c checks.Check) func() {
 		l.runCheck(c)
 	}
 
-	ticker := time.NewTicker(checks.GetInterval(l.config, c.Name()))
+	tickerInterval := checks.GetInterval(l.config, c.Name())
+	if c.Name() == checks.ConnectionsCheckName {
```
Could you move this logic into `GetInterval()`? I see there's other check-specific logic in there already.
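A hedged sketch of folding the special case into `GetInterval()` (the `getDefaultInterval` helper and the config accessor are assumptions, not the actual code):

```go
func GetInterval(cfg pkgconfigmodel.Reader, checkName string) time.Duration {
	interval := getDefaultInterval(cfg, checkName) // existing per-check logic, assumed
	if checkName == ConnectionsCheckName {
		// Prefer the capacity-check interval when it is set and tighter than
		// the regular connections check interval.
		if capacity := cfg.GetDuration("process_config.connections_capacity_check_interval"); capacity > 0 && capacity < interval {
			interval = capacity
		}
	}
	return interval
}
```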
What does this PR do?
Motivation
https://datadoghq.atlassian.net/browse/NPM-4369
Describe how you validated your changes
More info here
Possible Drawbacks / Trade-offs
Pros: Hosts that previously dropped connections will drop fewer or none at all; checks run more frequently only when necessary.
Cons: Increased CPU on nodes that need to run checks more often.
Additional Notes