[CNM] Flush closed connections dynamically by akarpz · Pull Request #36321 · DataDog/datadog-agent · GitHub

[CNM] Flush closed connections dynamically #36321

Open: akarpz wants to merge 49 commits into main

Contributor
@akarpz akarpz commented Apr 20, 2025

What does this PR do?

  • Keeps an atomic, per-client flag in the tracer state that tracks if the closed connection buffer is at 50% capacity.
  • Adds a new endpoint to fetch this status, and process-agent will now periodically check this endpoint to eagerly run the connections check if we're on track to hit capacity.
  • The periodic capacity check runs only if a new boolean config is set to true (default: false).
  • The connections check still always runs at its regular interval (30s by default, and already configurable), regardless of the capacity check.
  • More info here

Motivation

https://datadoghq.atlassian.net/browse/NPM-4369

Describe how you validated your changes

More info here

Possible Drawbacks / Trade-offs

Pros: Hosts that previously dropped connections will do this less or not at all. Only run checks more frequently when necessary.

Cons: Increase in CPU on nodes that need to run checks more often

Additional Notes

@github-actions github-actions bot added component/system-probe long review PR is complex, plan time to review it team/networks labels Apr 20, 2025
@github-actions github-actions bot added medium review PR review might take time and removed long review PR is complex, plan time to review it labels Apr 20, 2025
Contributor
agent-platform-auto-pr bot commented Apr 20, 2025

Uncompressed package size comparison

Comparison with ancestor 06e08cf387fb430c0acb1508a8e0b55778d2f6c7

Diff per package
| package | diff | status | size | ancestor | threshold |
|---|---|---|---|---|---|
| datadog-agent-x86_64-rpm | 0.02MB | ⚠️ | 783.24MB | 783.22MB | 0.50MB |
| datadog-agent-x86_64-suse | 0.02MB | ⚠️ | 783.24MB | 783.22MB | 0.50MB |
| datadog-agent-amd64-deb | 0.02MB | ⚠️ | 774.29MB | 774.27MB | 0.50MB |
| datadog-agent-arm64-deb | 0.02MB | ⚠️ | 760.39MB | 760.37MB | 0.50MB |
| datadog-agent-aarch64-rpm | 0.02MB | ⚠️ | 769.32MB | 769.30MB | 0.50MB |
| datadog-heroku-agent-amd64-deb | 0.01MB | ⚠️ | 380.66MB | 380.65MB | 0.50MB |
| datadog-iot-agent-arm64-deb | 0.01MB | ⚠️ | 59.38MB | 59.37MB | 0.50MB |
| datadog-iot-agent-aarch64-rpm | 0.01MB | ⚠️ | 59.46MB | 59.45MB | 0.50MB |
| datadog-iot-agent-amd64-deb | 0.00MB | | 62.80MB | 62.80MB | 0.50MB |
| datadog-iot-agent-x86_64-rpm | 0.00MB | | 62.89MB | 62.88MB | 0.50MB |
| datadog-iot-agent-x86_64-suse | 0.00MB | | 62.89MB | 62.88MB | 0.50MB |
| datadog-dogstatsd-amd64-deb | 0.00MB | | 31.99MB | 31.99MB | 0.50MB |
| datadog-dogstatsd-x86_64-rpm | 0.00MB | | 32.07MB | 32.07MB | 0.50MB |
| datadog-dogstatsd-x86_64-suse | 0.00MB | | 32.07MB | 32.07MB | 0.50MB |
| datadog-dogstatsd-arm64-deb | 0.00MB | | 30.47MB | 30.47MB | 0.50MB |

Decision

⚠️ Warning

Contributor
agent-platform-auto-pr bot commented Apr 20, 2025

Test changes on VM

Use this command from test-infra-definitions to manually test this PR's changes on a VM:

dda inv aws.create-vm --pipeline-id=63365132 --os-family=ubuntu

Note: This applies to commit 4ad7696

cit-pr-commenter bot commented Apr 20, 2025

Regression Detector

Regression Detector Results

Metrics dashboard
Target profiles
Run ID: 244e83bd-ffbb-4b6e-b178-504306857a7d

Baseline: 5224e9d
Comparison: 2a67e17
Diff

Optimization Goals: ✅ No significant changes detected

Fine details of change detection per experiment

| perf | experiment | goal | Δ mean % | Δ mean % CI | trials | links |
|---|---|---|---|---|---|---|
| | quality_gate_logs | % cpu utilization | +0.16 | [-2.59, +2.91] | 1 | Logs, bounds checks dashboard |
| | file_to_blackhole_1000ms_latency | egress throughput | +0.10 | [-0.46, +0.67] | 1 | Logs |
| | file_to_blackhole_0ms_latency_http1 | egress throughput | +0.10 | [-0.51, +0.70] | 1 | Logs |
| | file_to_blackhole_300ms_latency | egress throughput | +0.03 | [-0.58, +0.64] | 1 | Logs |
| | quality_gate_idle | memory utilization | +0.02 | [-0.05, +0.08] | 1 | Logs, bounds checks dashboard |
| | tcp_dd_logs_filter_exclude | ingress throughput | +0.01 | [-0.01, +0.02] | 1 | Logs |
| | uds_dogstatsd_to_api | ingress throughput | -0.00 | [-0.29, +0.29] | 1 | Logs |
| | file_to_blackhole_0ms_latency_http2 | egress throughput | -0.04 | [-0.62, +0.53] | 1 | Logs |
| | file_to_blackhole_0ms_latency | egress throughput | -0.05 | [-0.61, +0.51] | 1 | Logs |
| | file_to_blackhole_500ms_latency | egress throughput | -0.06 | [-0.65, +0.53] | 1 | Logs |
| | file_to_blackhole_1000ms_latency_linear_load | egress throughput | -0.06 | [-0.29, +0.17] | 1 | Logs |
| | file_to_blackhole_100ms_latency | egress throughput | -0.08 | [-0.70, +0.54] | 1 | Logs |
| | ddot_logs | memory utilization | -0.27 | [-0.36, -0.18] | 1 | Logs |
| | docker_containers_memory | memory utilization | -0.27 | [-0.36, -0.18] | 1 | Logs |
| | ddot_metrics | memory utilization | -0.39 | [-0.51, -0.27] | 1 | Logs |
| | otlp_ingest_logs | memory utilization | -0.46 | [-0.58, -0.33] | 1 | Logs |
| | uds_dogstatsd_20mb_12k_contexts_20_senders | memory utilization | -0.67 | [-0.74, -0.60] | 1 | Logs |
| | docker_containers_cpu | % cpu utilization | -0.74 | [-3.76, +2.29] | 1 | Logs |
| | tcp_syslog_to_blackhole | ingress throughput | -0.80 | [-0.86, -0.74] | 1 | Logs |
| | quality_gate_idle_all_features | memory utilization | -0.89 | [-1.03, -0.75] | 1 | Logs, bounds checks dashboard |
| | otlp_ingest_metrics | memory utilization | -1.37 | [-1.54, -1.20] | 1 | Logs |
| | file_tree | memory utilization | -1.61 | [-1.81, -1.41] | 1 | Logs |
| | uds_dogstatsd_to_api_cpu | % cpu utilization | -2.20 | [-3.07, -1.34] | 1 | Logs |

Bounds Checks: ❌ Failed

| perf | experiment | bounds_check_name | replicates_passed | links |
|---|---|---|---|---|
| | quality_gate_logs | memory_usage | 9/10 | bounds checks dashboard |
| | docker_containers_cpu | simple_check_run | 10/10 | |
| | docker_containers_memory | memory_usage | 10/10 | |
| | docker_containers_memory | simple_check_run | 10/10 | |
| | file_to_blackhole_0ms_latency | lost_bytes | 10/10 | |
| | file_to_blackhole_0ms_latency | memory_usage | 10/10 | |
| | file_to_blackhole_0ms_latency_http1 | lost_bytes | 10/10 | |
| | file_to_blackhole_0ms_latency_http1 | memory_usage | 10/10 | |
| | file_to_blackhole_0ms_latency_http2 | lost_bytes | 10/10 | |
| | file_to_blackhole_0ms_latency_http2 | memory_usage | 10/10 | |
| | file_to_blackhole_1000ms_latency | memory_usage | 10/10 | |
| | file_to_blackhole_1000ms_latency_linear_load | memory_usage | 10/10 | |
| | file_to_blackhole_100ms_latency | lost_bytes | 10/10 | |
| | file_to_blackhole_100ms_latency | memory_usage | 10/10 | |
| | file_to_blackhole_300ms_latency | lost_bytes | 10/10 | |
| | file_to_blackhole_300ms_latency | memory_usage | 10/10 | |
| | file_to_blackhole_500ms_latency | lost_bytes | 10/10 | |
| | file_to_blackhole_500ms_latency | memory_usage | 10/10 | |
| | quality_gate_idle | intake_connections | 10/10 | bounds checks dashboard |
| | quality_gate_idle | memory_usage | 10/10 | bounds checks dashboard |
| | quality_gate_idle_all_features | intake_connections | 10/10 | bounds checks dashboard |
| | quality_gate_idle_all_features | memory_usage | 10/10 | bounds checks dashboard |
| | quality_gate_logs | intake_connections | 10/10 | bounds checks dashboard |
| | quality_gate_logs | lost_bytes | 10/10 | bounds checks dashboard |

Explanation

Confidence level: 90.00%
Effect size tolerance: |Δ mean %| ≥ 5.00%

Performance changes are noted in the perf column of each table:

  • ✅ = significantly better comparison variant performance
  • ❌ = significantly worse comparison variant performance
  • ➖ = no significant change in performance

A regression test is an A/B test of target performance in a repeatable rig, where "performance" is measured as "comparison variant minus baseline variant" for an optimization goal (e.g., ingress throughput). Due to intrinsic variability in measuring that goal, we can only estimate its mean value for each experiment; we report uncertainty in that value as a 90.00% confidence interval denoted "Δ mean % CI".

For each experiment, we decide whether a change in performance is a "regression" -- a change worth investigating further -- if all of the following criteria are true:

  1. Its estimated |Δ mean %| ≥ 5.00%, indicating the change is big enough to merit a closer look.

  2. Its 90.00% confidence interval "Δ mean % CI" does not contain zero, indicating that if our statistical model is accurate, there is at least a 90.00% chance there is a difference in performance between baseline and comparison variants.

  3. Its configuration does not mark it "erratic".
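The three criteria above can be read as a single predicate. The sketch below is an illustration only: the experiment struct and isRegression function are hypothetical names, and the erratic flag is assumed false for the two rows taken from the results table, since the report does not list it.

```go
package main

import (
	"fmt"
	"math"
)

// experiment holds the values reported for one row of the results table.
type experiment struct {
	name          string
	deltaMeanPct  float64
	ciLow, ciHigh float64
	erratic       bool // assumed false below; not listed in the report
}

// effectSizeTolerance is the 5.00% threshold stated in the explanation.
const effectSizeTolerance = 5.0

// isRegression applies the three criteria: the effect is large enough,
// the confidence interval excludes zero, and the experiment is not erratic.
func isRegression(e experiment) bool {
	bigEnough := math.Abs(e.deltaMeanPct) >= effectSizeTolerance
	ciExcludesZero := e.ciLow > 0 || e.ciHigh < 0
	return bigEnough && ciExcludesZero && !e.erratic
}

func main() {
	rows := []experiment{
		// Two rows taken from the table above.
		{name: "uds_dogstatsd_to_api_cpu", deltaMeanPct: -2.20, ciLow: -3.07, ciHigh: -1.34},
		{name: "quality_gate_logs", deltaMeanPct: 0.16, ciLow: -2.59, ciHigh: 2.91},
		// A made-up row that would meet all three criteria.
		{name: "hypothetical_example", deltaMeanPct: -6.0, ciLow: -7.0, ciHigh: -5.0},
	}
	for _, r := range rows {
		fmt.Printf("%s: %v\n", r.name, isRegression(r))
	}
}
```

The first row shows why a confidence interval that excludes zero is not enough on its own: its |Δ mean %| of 2.20 is below the 5.00% tolerance.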

CI Pass/Fail Decision

Failed. Some Quality Gates were violated.

  • quality_gate_logs, bounds check intake_connections: 10/10 replicas passed. Gate passed.
  • quality_gate_logs, bounds check memory_usage: 9/10 replicas passed. Failed 1 which is > 0. Gate FAILED.
  • quality_gate_logs, bounds check lost_bytes: 10/10 replicas passed. Gate passed.
  • quality_gate_idle_all_features, bounds check intake_connections: 10/10 replicas passed. Gate passed.
  • quality_gate_idle_all_features, bounds check memory_usage: 10/10 replicas passed. Gate passed.
  • quality_gate_idle, bounds check memory_usage: 10/10 replicas passed. Gate passed.
  • quality_gate_idle, bounds check intake_connections: 10/10 replicas passed. Gate passed.

@akarpz akarpz added changelog/no-changelog qa/done QA done before merge and regressions are covered by tests labels Apr 21, 2025
@akarpz akarpz requested review from a team as code owners May 29, 2025 14:45
@akarpz akarpz requested review from AyyLam and dustmop May 29, 2025 14:45
// Add connections below threshold
ns.Lock()
for i := 0; uint32(i) < capacityThreshold-1; i++ { // Add 8 connections
	ns.storeClosedConnection(createTestConnectionStats(StatCookie(i)))
Contributor

You can use networkState.StoreClosedConnection in the tests which acquires the lock.

@akarpz akarpz removed the request for review from AyyLam May 29, 2025 23:04
Comment on lines 109 to 122
isNearCapacity, err := nt.tracer.IsClosedConnectionsNearCapacity(clientID)
if err != nil {
	log.Errorf("Error checking connection capacity for client %s: %v", clientID, err)
	w.WriteHeader(http.StatusInternalServerError)
	return
}

if isNearCapacity {
	log.Debugf("Responding to capacity check request for client %s: Near capacity (200 OK)", clientID)
	w.WriteHeader(http.StatusOK)
} else {
	log.Tracef("Responding to capacity check request for client %s: Not near capacity (204 No Content)", clientID)
	w.WriteHeader(http.StatusNoContent)
}
Contributor

Don't you want to measure how close we are to the limit?
It might help with finding bugs, or with fine-tuning what "near capacity" means.
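One way to picture the suggestion is an endpoint that keeps the 200/204 status codes from the snippet under review and also reports how full the buffer is. The sketch below is an assumption-heavy illustration: the header name, the capacityHandler function, and the /capacity path are all hypothetical and do not come from the PR.

```go
package main

import (
	"fmt"
	"net/http"
	"net/http/httptest"
	"strconv"
)

// fillRatioHeader is a hypothetical header name, not something the PR defines.
const fillRatioHeader = "X-Closed-Conns-Fill-Ratio"

// capacityHandler mirrors the status codes in the reviewed snippet
// (200 = near capacity, 204 = not near capacity) and additionally reports
// the buffer fill ratio in a response header.
func capacityHandler(buffered, capacity int, thresholdRatio float64) http.HandlerFunc {
	return func(w http.ResponseWriter, _ *http.Request) {
		if capacity <= 0 {
			// Nothing meaningful to report without a positive capacity.
			w.WriteHeader(http.StatusNoContent)
			return
		}
		ratio := float64(buffered) / float64(capacity)
		w.Header().Set(fillRatioHeader, strconv.FormatFloat(ratio, 'f', 2, 64))
		if ratio >= thresholdRatio {
			w.WriteHeader(http.StatusOK)
			return
		}
		w.WriteHeader(http.StatusNoContent)
	}
}

// probe calls the handler in-process and returns the status code and the
// reported fill ratio.
func probe(h http.Handler) (int, string) {
	rec := httptest.NewRecorder()
	h.ServeHTTP(rec, httptest.NewRequest(http.MethodGet, "/capacity", nil))
	return rec.Code, rec.Header().Get(fillRatioHeader)
}

func main() {
	code, ratio := probe(capacityHandler(8, 10, 0.75))
	fmt.Println(code, ratio) // near capacity
	code, ratio = probe(capacityHandler(2, 10, 0.75))
	fmt.Println(code, ratio) // not near capacity
}
```

A header is used rather than a body because a 204 response carries no content.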

@@ -205,6 +205,9 @@ func setupProcesses(config pkgconfigmodel.Setup) {

procBindEnvAndSetDefault(config, "process_config.language_detection.grpc_port", DefaultProcessEntityStreamPort)

// Connection Capacity Check configuration
procBindEnvAndSetDefault(config, "process_config.connections_capacity_check_interval", 30*time.Second)
Contributor

we check the capacity every 30s? not 10s?

Contributor Author

the default is 30 so we can merge this PR without it changing the current behavior, and override this config with the chart to apply the change selectively

TCPFailedConnectionsEnabled: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_tcp_failed_connections")),
MaxTrackedConnections: uint32(cfg.GetInt64(sysconfig.FullKeyPath(spNS, "max_tracked_connections"))),
MaxClosedConnectionsBuffered: uint32(cfg.GetInt64(sysconfig.FullKeyPath(spNS, "max_closed_connections_buffered"))),
ClosedConnectionsBufferThresholdRatio: cfg.GetFloat64(sysconfig.FullKeyPath(spNS, "closed_connections_buffer_threshold_ratio")),
Contributor
@guyarb guyarb May 30, 2025

Where is closed_connections_buffer_threshold_ratio defined?
Also, why do you use spNS? I think it pollutes the global system-probe namespace, and it could wrongly be assumed to affect all of system-probe rather than only CNM.

Contributor Author

good point, I will change this to use the netNS. I'm not sure what you mean in the first question

Contributor

For the first question: you should declare the configuration option in setup/system-probe.go, just like max_tracked_connections: cfg.BindEnvAndSetDefault(join(spNS, "max_tracked_connections"), 65536) (with or without a default value).

Comment on lines 589 to 592
if !client.closedConnectionsNearCapacity.Load() {
	log.Warnf("Closed connections buffer for client %s is nearing capacity (%d/%d, threshold: %.2f%%). Setting flag.", clientID, len(client.closed.conns), ns.maxClosedConns, ns.closedConnectionsBufferThresholdRatio*100)
	client.closedConnectionsNearCapacity.Store(true)
}
Contributor

Isn't this racy code? Two instances can reach line 589 at the same time.

Contributor
@guyarb guyarb May 30, 2025

If this code is not racy (I believe one of the callers holds a mutex), then why do you need an atomic?
Also, where do you set it to false?
And IIRC, you should use CompareAndSwap, which performs the swap (if the condition is met) atomically and reports whether it swapped.

Contributor

I think this code is only called when the lock for the state is held, so there is likely no need for atomics.

// Check if the buffer is near capacity using the configured threshold
if float64(len(client.closed.conns)) >= float64(ns.maxClosedConns)*ns.closedConnectionsBufferThresholdRatio {
	if !client.closedConnectionsNearCapacity.Load() {
		log.Warnf("Closed connections buffer for client %s is nearing capacity (%d/%d, threshold: %.2f%%). Setting flag.", clientID, len(client.closed.conns), ns.maxClosedConns, ns.closedConnectionsBufferThresholdRatio*100)
Contributor

Have you checked this log isn't spamming during a high/moderate load?

Comment on lines 86 to 96
ratioPath := spNS("closed_connections_buffer_threshold_ratio")
if !cfg.IsConfigured(ratioPath) {
	cfg.Set(ratioPath, defaultClosedConnectionsBufferThresholdRatioSystemProbe, model.SourceDefault)
} else {
	ratio := cfg.GetFloat64(ratioPath)
	if ratio <= 0 || ratio > 1.0 {
		log.Warnf("Invalid value %f for %s, using default %f. Value must be > 0 and <= 1.0.", ratio, ratioPath, defaultClosedConnectionsBufferThresholdRatioSystemProbe)
		cfg.Set(ratioPath, defaultClosedConnectionsBufferThresholdRatioSystemProbe, model.SourceAgentRuntime)
	}
}

Contributor

I recommend adding unit tests (UTs) to check this logic.

// Otherwise (e.g. not configured, or derived value is invalid), defaultInterval is returned.
func (l *CheckRunner) getConnectionsCheckInterval(checkName string, defaultInterval time.Duration) time.Duration {
	const minAllowedInterval = 10 * time.Second
	const maxAllowedInterval = 30 * time.Second
Contributor

The max must match the configured interval of the connections check.
We can set the connections check to report every 1m if we want, but the current code ignores that and forces us to use 30s or lower.

Also, the minimum interval can cause a violation as well: if we set the connections check interval to 10s (one customer has that due to high load), then we have 2 issues:

  1. A race between the new endpoint and the connections check endpoint (cc: see my comment in the RFC about reusing the same endpoint)
  2. The hard-coded minimum prevents the connections check interval from being less than 10s

// If both are true, the capacity interval is returned.
// If explicitly configured by the user but fails validation, a specific warning is logged and defaultInterval is returned.
// Otherwise (e.g. not configured, or derived value is invalid), defaultInterval is returned.
func (l *CheckRunner) getConnectionsCheckInterval(checkName string, defaultInterval time.Duration) time.Duration {
Contributor

do we have UTs for this?

	return false, fmt.Errorf("error creating capacity check request %s: %w", url, err)
}

log.Tracef("Checking connections capacity endpoint: %s", url)
Contributor

Perhaps you should use shouldLog: if the log level is not trace (99% of the time), this is redundant CPU consumption.

Contributor
@guyarb guyarb left a comment

Blocking the PR on:

  1. comment regarding breaking today's status and possible race conditions
  2. The code is enabled even when USM is enabled.

Comment on lines 90 to 94
ratio := cfg.GetFloat64(ratioPath)
if ratio <= 0 || ratio > 1.0 {
	log.Warnf("Invalid value %f for %s, using default %f. Value must be > 0 and <= 1.0.", ratio, ratioPath, defaultClosedConnectionsBufferThresholdRatioSystemProbe)
	cfg.Set(ratioPath, defaultClosedConnectionsBufferThresholdRatioSystemProbe, model.SourceAgentRuntime)
}
Member

suggestion: create validateFloat64 which mirrors the other validate* functions in adjust.go and use that.

Contributor
@hmahmood hmahmood left a comment

How the two new configs interact with each other and with the connections check interval is not clear to me. Also, what happens if one is set and the other is not? It would be great if we could settle on one config and define some sane defaults.

// IsClosedConnectionsNearCapacity checks if the closed connections buffer is near capacity for a specific client.
// It returns true if near capacity, false otherwise, and an error if the state is not initialized.
func (t *Tracer) IsClosedConnectionsNearCapacity(clientID string) (bool, error) {
if t.state == nil {
Contributor

How is this possible?

TCPFailedConnectionsEnabled: cfg.GetBool(sysconfig.FullKeyPath(netNS, "enable_tcp_failed_connections")),
MaxTrackedConnections: uint32(cfg.GetInt64(sysconfig.FullKeyPath(spNS, "max_tracked_connections"))),
MaxClosedConnectionsBuffered: uint32(cfg.GetInt64(sysconfig.FullKeyPath(spNS, "max_closed_connections_buffered"))),
ClosedConnectionsBufferThresholdRatio: cfg.GetFloat64(sysconfig.FullKeyPath(spNS, "closed_connections_buffer_threshold_ratio")),
Contributor

Is the default here 1.0?

Contributor

How does it change if we decrease the check interval from the default of 30s? Do we have to set both configs?

maxOffsetThreshold = 3000
defaultMaxProcessesTracked = 1024
defaultMaxTrackedConnections = 65536
defaultClosedConnectionsBufferThresholdRatioSystemProbe = 0.75
Contributor

Why is this 0.75? Don't we want this to be 1.0 to keep the status quo?

Contributor
@dustmop dustmop left a comment

LGTM for agent-configuration owned files

@akarpz akarpz requested a review from a team as a code owner June 9, 2025 15:06
@robertjli robertjli removed the request for review from a team June 12, 2025 21:25
@@ -205,6 +205,9 @@ func setupProcesses(config pkgconfigmodel.Setup) {

procBindEnvAndSetDefault(config, "process_config.language_detection.grpc_port", DefaultProcessEntityStreamPort)

// Connection Capacity Check configuration
procBindEnvAndSetDefault(config, "process_config.connections_capacity_check_interval", 30*time.Second)
Member

Could we move this to a child of process_config, like process_config.connections or similar?

@@ -368,7 +368,12 @@ func (l *CheckRunner) basicRunner(c checks.Check) func() {
 	l.runCheck(c)
 }

-ticker := time.NewTicker(checks.GetInterval(l.config, c.Name()))
+tickerInterval := checks.GetInterval(l.config, c.Name())
+if c.Name() == checks.ConnectionsCheckName {
Member

Could you move this logic into GetInterval()? I see there's other check-specific logic in there already

Labels
changelog/no-changelog component/system-probe long review PR is complex, plan time to review it qa/done QA done before merge and regressions are covered by tests team/networks
6 participants