8000 feat(anta.tests): Added support for NTP pool in VerifyNTPAssociations test by vitthalmagadum · Pull Request #1068 · aristanetworks/anta · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat(anta.tests): Added support for NTP pool in VerifyNTPAssociations test #1068

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,4 @@ def snmp_v3_prefix(auth_type: Literal["auth", "priv", "noauth"]) -> str:
AfterValidator(update_bgp_redistributed_proto_user),
]
RedistributedAfiSafi = Annotated[Literal["v4u", "v4m", "v6u", "v6m"], BeforeValidator(bgp_redistributed_route_proto_abbreviations)]
NTPStratumLevel = Annotated[int, Field(ge=0, le=16)]
16 changes: 13 additions & 3 deletions anta/input_models/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@

from ipaddress import IPv4Address

from pydantic import BaseModel, ConfigDict, Field
from pydantic import BaseModel, ConfigDict

from anta.custom_types import Hostname
from anta.custom_types import Hostname, NTPStratumLevel


class NTPServer(BaseModel):
Expand All @@ -22,10 +22,20 @@ class NTPServer(BaseModel):
For example, 'ntp.example.com' in the configuration might resolve to 'ntp3.example.com' in the device output."""
preferred: bool = False
"""Optional preferred for NTP server. If not provided, it defaults to `False`."""
stratum: int = Field(ge=0, le=16)
stratum: NTPStratumLevel
"""NTP stratum level (0 to 15) where 0 is the reference clock and 16 indicates unsynchronized.
Values should be between 0 and 15 for valid synchronization and 16 represents an out-of-sync state."""

def __str__(self) -> str:
"""Representation of the NTPServer model."""
return f"NTP Server: {self.server_address} Preferred: {self.preferred} Stratum: {self.stratum}"


class NTPPool(BaseModel):
"""Model for a NTP server pool."""

model_config = ConfigDict(extra="forbid")
server_addresses: list[Hostname | IPv4Address]
"""The list of NTP server addresses as an IPv4 addresses or hostnames."""
preferred_stratum_range: list[NTPStratumLevel]
"""Preferred NTP stratum range for the NTP server pool. If the expected stratum range is 1 to 3 then preferred_stratum_range should be `[1,3]`."""
136 changes: 110 additions & 26 deletions anta/tests/system.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,26 @@
from __future__ import annotations

import re
from typing import TYPE_CHECKING, ClassVar
from typing import TYPE_CHECKING, Any, ClassVar

from anta.custom_types import PositiveInteger
from anta.input_models.system import NTPServer
from pydantic import model_validator

from anta.custom_types import Hostname, PositiveInteger
from anta.input_models.system import NTPPool, NTPServer
from anta.models import AntaCommand, AntaTest
from anta.tools import get_value

if TYPE_CHECKING:
import sys
from ipaddress import IPv4Address

from anta.models import AntaTemplate

if sys.version_info >= (3, 11):
from typing import Self
else:
from typing_extensions import Self

CPU_IDLE_THRESHOLD = 25
MEMORY_THRESHOLD = 0.25
DISK_SPACE_THRESHOLD = 75
Expand Down Expand Up @@ -284,12 +294,21 @@ def test(self) -> None:
class VerifyNTPAssociations(AntaTest):
"""Verifies the Network Time Protocol (NTP) associations.

This test performs the following checks:

1. For the NTP servers:
- The primary NTP server (marked as preferred) has the condition 'sys.peer'.
- All other NTP servers have the condition 'candidate'.
- All the NTP servers have the expected stratum level.
2. For the NTP servers pool:
- All the NTP servers belong to the specified NTP pool.
- All the NTP servers have valid condition (sys.peer | candidate).
- All the NTP servers have the stratum level within the specified startum level.

Expected Results
----------------
* Success: The test will pass if the Primary NTP server (marked as preferred) has the condition 'sys.peer' and
all other NTP servers have the condition 'candidate'.
* Failure: The test will fail if the Primary NTP server (marked as preferred) does not have the condition 'sys.peer' or
if any other NTP server does not have the condition 'candidate'.
* Success: The test will pass if all the NTP servers meet the expected state.
10000 * Failure: The test will fail if any of the NTP server does not meet the expected state.

Examples
--------
Expand All @@ -313,10 +332,79 @@ class VerifyNTPAssociations(AntaTest):
class Input(AntaTest.Input):
"""Input model for the VerifyNTPAssociations test."""

ntp_servers: list[NTPServer]
ntp_servers: list[NTPServer] | None = None
"""List of NTP servers."""
ntp_pool: NTPPool | None = None
"""NTP servers pool."""
NTPServer: ClassVar[type[NTPServer]] = NTPServer

@model_validator(mode="after")
def validate_inputs(self) -> Self:
"""Validate the inputs provided to the VerifyNTPAssociations test.

Either `ntp_servers` or `ntp_pool` can be provided at the same time.
"""
if not self.ntp_servers and not self.ntp_pool:
msg = "'ntp_servers' or 'ntp_pool' must be provided"
raise ValueError(msg)
if self.ntp_servers and self.ntp_pool:
msg = "Either 'ntp_servers' or 'ntp_pool' can be provided at the same time"
raise ValueError(msg)

# Verifies the len of preferred_stratum_range in NTP Pool should be 2 as this is the range.
stratum_range = 2
if self.ntp_pool and len(self.ntp_pool.preferred_stratum_range) > stratum_range:
msg = "'preferred_stratum_range' list should have at most 2 items"
raise ValueError(msg)
return self

def _validate_ntp_server(self, ntp_server: NTPServer, peers: dict[str, Any]) -> list[str]:
"""Validate the NTP server, condition and stratum level."""
failure_msgs: list[str] = []
server_address = str(ntp_server.server_address)

# We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input.
matching_peer = next((peer for peer, peer_details in peers.items() if (server_address in {peer_details["peerIpAddr"], peer})), None)

if not matching_peer:
failure_msgs.append(f"{ntp_server} - Not configured")
return failure_msgs

# Collecting the expected/actual NTP peer details.
exp_condition = "sys.peer" if ntp_server.preferred else "candidate"
exp_stratum = ntp_server.stratum
act_condition = get_value(peers[matching_peer], "condition")
act_stratum = get_value(peers[matching_peer], "stratumLevel")

if act_condition != exp_condition:
failure_msgs.append(f"{ntp_server} - Incorrect condition - Expected: {exp_condition} Actual: {act_condition}")

if act_stratum != exp_stratum:
failure_msgs.append(f"{ntp_server} - Incorrect stratum level - Expected: {exp_stratum} Actual: {act_stratum}")

return failure_msgs

def _validate_ntp_pool(self, server_addresses: list[Hostname | IPv4Address], peer: str, stratum_range: list[int], peer_details: dict[str, Any]) -> list[str]:
"""Validate the NTP server pool, condition and stratum level."""
failure_msgs: list[str] = []

# We check `peerIpAddr` and `peer` in the peer details - covering server_addresses input
if (peer_ip := peer_details["peerIpAddr"]) not in server_addresses and peer not in server_addresses:
failure_msgs.append(f"NTP Server: {peer_ip} Hostname: {peer} - Associated but not part of the provided NTP pool")
return failure_msgs

act_condition = get_value(peer_details, "condition")
act_stratum = get_value(peer_details, "stratumLevel")

if act_condition not in ["sys.peer", "candidate"]:
failure_msgs.append(f"NTP Server: {peer_ip} Hostname: {peer} - Incorrect condition - Expected: sys.peer, candidate Actual: {act_condition}")

if int(act_stratum) not in range(stratum_range[0], stratum_range[1] + 1):
msg = f"Expected Stratum Range: {stratum_range[0]} to {stratum_range[1]} Actual: {act_stratum}"
failure_msgs.append(f"NTP Server: {peer_ip} Hostname: {peer} - Incorrect stratum level - {msg}")

return failure_msgs

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyNTPAssociations."""
Expand All @@ -326,25 +414,21 @@ def test(self) -> None:
self.result.is_failure("No NTP peers configured")
return

# Iterate over each NTP server.
for ntp_server in self.inputs.ntp_servers:
server_address = str(ntp_server.server_address)

# We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input.
matching_peer = next((peer for peer, peer_details in peers.items() if (server_address in {peer_details["peerIpAddr"], peer})), None)

if not matching_peer:
self.result.is_failure(f"{ntp_server} - Not configured")
continue

# Collecting the expected/actual NTP peer details.
exp_condition = "sys.peer" if ntp_server.preferred else "candidate"
exp_stratum = ntp_server.stratum
act_condition = get_value(peers[matching_peer], "condition")
act_stratum = get_value(peers[matching_peer], "stratumLevel")
if self.inputs.ntp_servers:
# Iterate over each NTP server.
for ntp_server in self.inputs.ntp_servers:
failure_msgs = self._validate_ntp_server(ntp_server, peers)
for msg in failure_msgs:
self.result.is_failure(msg)
return

if act_condition != exp_condition or act_stratum != exp_stratum:
self.result.is_failure(f"{ntp_server} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}")
# Verifies the NTP pool details
server_addresses = self.inputs.ntp_pool.server_addresses
exp_stratum_range = self.inputs.ntp_pool.preferred_stratum_range
for peer, peer_details in peers.items():
failure_msgs = self._validate_ntp_pool(server_addresses, peer, exp_stratum_range, peer_details)
for msg in failure_msgs:
self.result.is_failure(msg)


class VerifyMaintenance(AntaTest):
Expand Down
Loading
0