class NTPPool(BaseModel):
    """Model for an NTP server pool.

    The pool is described by the addresses that are allowed to show up as NTP
    associations on the device and by the inclusive stratum range those
    associations are expected to fall in.
    """

    model_config = ConfigDict(extra="forbid")
    server_addresses: list[Hostname | IPv4Address]
    """The list of NTP server addresses, each given as an IPv4 address or a hostname."""
    # A fixed two-item tuple (instead of an unconstrained list) makes pydantic reject
    # ranges with a missing or extra bound at input time; list input such as `[1, 3]`
    # is still accepted and coerced.
    preferred_stratum_range: tuple[NTPStratumLevel, NTPStratumLevel]
    """Preferred NTP stratum range for the NTP server pool, as an inclusive `[low, high]` pair.
    If the expected stratum range is 1 to 3 then preferred_stratum_range should be `[1,3]`."""
class Input(AntaTest.Input):
    """Input model for the VerifyNTPAssociations test.

    Exactly one of `ntp_servers` or `ntp_pool` must be provided.
    """

    ntp_servers: list[NTPServer] | None = None
    """List of NTP servers."""
    ntp_pool: NTPPool | None = None
    """NTP servers pool."""
    NTPServer: ClassVar[type[NTPServer]] = NTPServer

    @model_validator(mode="after")
    def validate_inputs(self) -> Self:
        """Validate the inputs provided to the VerifyNTPAssociations test.

        Exactly one of `ntp_servers` or `ntp_pool` must be provided. When `ntp_pool` is
        given, its `preferred_stratum_range` must hold exactly two values in ascending order.

        Returns
        -------
        Self
            The validated input model.

        Raises
        ------
        ValueError
            If neither or both of `ntp_servers` and `ntp_pool` are provided, or if
            `preferred_stratum_range` is not an ascending two-item range.
        """
        if not self.ntp_servers and not self.ntp_pool:
            msg = "'ntp_servers' or 'ntp_pool' must be provided"
            raise ValueError(msg)
        if self.ntp_servers and self.ntp_pool:
            msg = "Either 'ntp_servers' or 'ntp_pool' can be provided at the same time"
            raise ValueError(msg)

        if self.ntp_pool:
            stratum_range = self.ntp_pool.preferred_stratum_range
            # The range is read as stratum_range[0] .. stratum_range[1] when peers are checked,
            # so fewer than two items would raise IndexError there; require exactly two.
            expected_len = 2
            if len(stratum_range) != expected_len:
                msg = "'preferred_stratum_range' list should have exactly 2 items"
                raise ValueError(msg)
            # A reversed range matches no stratum at all and would fail every peer.
            if stratum_range[0] > stratum_range[1]:
                msg = "'preferred_stratum_range' lower bound must not be greater than the upper bound"
                raise ValueError(msg)
        return self
def _validate_ntp_server(self, ntp_server: NTPServer, peers: dict[str, Any]) -> list[str]:
    """Validate one expected NTP server against the NTP peers reported by the device.

    Parameters
    ----------
    ntp_server
        The expected server: its address, whether it is the preferred one, and its stratum.
    peers
        The peers from the device output, keyed by peer identifier. Each value is read
        for `peerIpAddr`, `condition` and `stratumLevel`.

    Returns
    -------
    list[str]
        One failure message per mismatch; empty when the server matches expectations.
    """
    server_address = str(ntp_server.server_address)

    # We check `peerIpAddr` in the peer details - covering IPv4Address input, or the peer key - covering Hostname input.
    matching_peer = next((peer for peer, peer_details in peers.items() if server_address in {peer_details["peerIpAddr"], peer}), None)
    if not matching_peer:
        return [f"{ntp_server} - Not configured"]

    # Collecting the expected/actual NTP peer details.
    details = peers[matching_peer]
    exp_condition = "sys.peer" if ntp_server.preferred else "candidate"
    exp_stratum = ntp_server.stratum
    act_condition = details.get("condition")
    act_stratum = details.get("stratumLevel")

    failure_msgs: list[str] = []
    if act_condition != exp_condition:
        failure_msgs.append(f"{ntp_server} - Incorrect condition - Expected: {exp_condition} Actual: {act_condition}")
    if act_stratum != exp_stratum:
        failure_msgs.append(f"{ntp_server} - Incorrect stratum level - Expected: {exp_stratum} Actual: {act_stratum}")
    return failure_msgs


def _validate_ntp_pool(self, server_addresses: list[Hostname | IPv4Address], peer: str, stratum_range: list[int], peer_details: dict[str, Any]) -> list[str]:
    """Validate one NTP peer reported by the device against the expected NTP pool.

    Parameters
    ----------
    server_addresses
        Addresses (hostnames or IPv4 addresses) that make up the expected pool.
    peer
        The peer identifier, i.e. the key of this peer in the device output.
    stratum_range
        Inclusive stratum bounds, lower bound first. A single value is treated as
        both the lower and the upper bound.
    peer_details
        The device output for this peer; read for `peerIpAddr`, `condition` and `stratumLevel`.

    Returns
    -------
    list[str]
        One failure message per mismatch; empty when the peer is a valid pool member.
    """
    # Compare as strings: the device output holds strings, whereas the pool may hold IPv4Address objects.
    pool_members = {str(address) for address in server_addresses}
    peer_ip = peer_details["peerIpAddr"]
    prefix = f"NTP Server: {peer_ip} Hostname: {peer}"

    # We check `peerIpAddr` and `peer` in the peer details - covering server_addresses input
    if peer_ip not in pool_members and peer not in pool_members:
        return [f"{prefix} - Associated but not part of the provided NTP pool"]

    failure_msgs: list[str] = []
    act_condition = peer_details.get("condition")
    act_stratum = peer_details.get("stratumLevel")

    if act_condition not in ("sys.peer", "candidate"):
        failure_msgs.append(f"{prefix} - Incorrect condition - Expected: sys.peer, candidate Actual: {act_condition}")

    low, high = stratum_range[0], stratum_range[-1]
    try:
        stratum_in_range = low <= int(act_stratum) <= high
    except (TypeError, ValueError):
        # A missing or non-numeric stratum cannot be inside the range; report it instead of crashing.
        stratum_in_range = False
    if not stratum_in_range:
        msg = f"Expected Stratum Range: {low} to {high} Actual: {act_stratum}"
        failure_msgs.append(f"{prefix} - Incorrect stratum level - {msg}")

    return failure_msgs
+ for ntp_server in self.inputs.ntp_servers: + failure_msgs = self._validate_ntp_server(ntp_server, peers) + for msg in failure_msgs: + self.result.is_failure(msg) + return - if act_condition != exp_condition or act_stratum != exp_stratum: - self.result.is_failure(f"{ntp_server} - Bad association - Condition: {act_condition}, Stratum: {act_stratum}") + # Verifies the NTP pool details + server_addresses = self.inputs.ntp_pool.server_addresses + exp_stratum_range = self.inputs.ntp_pool.preferred_stratum_range + for peer, peer_details in peers.items(): + failure_msgs = self._validate_ntp_pool(server_addresses, peer, exp_stratum_range, peer_details) + for msg in failure_msgs: + self.result.is_failure(msg) class VerifyMaintenance(AntaTest): diff --git a/tests/units/anta_tests/test_system.py b/tests/units/anta_tests/test_system.py index eb7c584fd..052202923 100644 --- a/tests/units/anta_tests/test_system.py +++ b/tests/units/anta_tests/test_system.py @@ -347,6 +347,65 @@ }, "expected": {"result": "success"}, }, + { + "name": "success-ntp-pool-as-input", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "1.1.1.1": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "2.2.2.2": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 2, + }, + "3.3.3.3": { + "condition": "candidate", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 2, + }, + } + } + ], + "inputs": {"ntp_pool": {"server_addresses": ["1.1.1.1", "2.2.2.2", "3.3.3.3"], "preferred_stratum_range": [1, 2]}}, + "expected": {"result": "success"}, + }, + { + "name": "success-ntp-pool-hostname", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "itsys-ntp010p.aristanetworks.com": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "itsys-ntp011p.aristanetworks.com": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 2, + }, + "itsys-ntp012p.aristanetworks.com": { + "condition": 
"candidate", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 2, + }, + } + } + ], + "inputs": { + "ntp_pool": { + "server_addresses": ["itsys-ntp010p.aristanetworks.com", "itsys-ntp011p.aristanetworks.com", "itsys-ntp012p.aristanetworks.com"], + "preferred_stratum_range": [1, 2], + } + }, + "expected": {"result": "success"}, + }, { "name": "success-ip-dns", "test": VerifyNTPAssociations, @@ -381,7 +440,7 @@ "expected": {"result": "success"}, }, { - "name": "failure", + "name": "failure-ntp-server", "test": VerifyNTPAssociations, "eos_data": [ { @@ -414,9 +473,11 @@ "expected": { "result": "failure", "messages": [ - "NTP Server: 1.1.1.1 Preferred: True Stratum: 1 - Bad association - Condition: candidate, Stratum: 2", - "NTP Server: 2.2.2.2 Preferred: False Stratum: 2 - Bad association - Condition: sys.peer, Stratum: 2", - "NTP Server: 3.3.3.3 Preferred: False Stratum: 2 - Bad association - Condition: sys.peer, Stratum: 3", + "NTP Server: 1.1.1.1 Preferred: True Stratum: 1 - Incorrect condition - Expected: sys.peer Actual: candidate", + "NTP Server: 1.1.1.1 Preferred: True Stratum: 1 - Incorrect stratum level - Expected: 1 Actual: 2", + "NTP Server: 2.2.2.2 Preferred: False Stratum: 2 - Incorrect condition - Expected: candidate Actual: sys.peer", + "NTP Server: 3.3.3.3 Preferred: False Stratum: 2 - Incorrect condition - Expected: candidate Actual: sys.peer", + "NTP Server: 3.3.3.3 Preferred: False Stratum: 2 - Incorrect stratum level - Expected: 2 Actual: 3", ], }, }, @@ -491,12 +552,110 @@ "expected": { "result": "failure", "messages": [ - "NTP Server: 1.1.1.1 Preferred: True Stratum: 1 - Bad association - Condition: candidate, Stratum: 1", + "NTP Server: 1.1.1.1 Preferred: True Stratum: 1 - Incorrect condition - Expected: sys.peer Actual: candidate", "NTP Server: 2.2.2.2 Preferred: False Stratum: 1 - Not configured", "NTP Server: 3.3.3.3 Preferred: False Stratum: 1 - Not configured", ], }, }, + { + "name": "failure-ntp-pool-as-input", + "test": VerifyNTPAssociations, 
+ "eos_data": [ + { + "peers": { + "ntp1.pool": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "ntp2.pool": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 2, + }, + "ntp3.pool": { + "condition": "candidate", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 2, + }, + } + } + ], + "inputs": {"ntp_pool": {"server_addresses": ["1.1.1.1", "2.2.2.2"], "preferred_stratum_range": [1, 2]}}, + "expected": { + "result": "failure", + "messages": ["NTP Server: 3.3.3.3 Hostname: ntp3.pool - Associated but not part of the provided NTP pool"], + }, + }, + { + "name": "failure-ntp-pool-as-input-bad-association", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "ntp1.pool": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 1, + }, + "ntp2.pool": { + "condition": "candidate", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 2, + }, + "ntp3.pool": { + "condition": "reject", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 3, + }, + } + } + ], + "inputs": {"ntp_pool": {"server_addresses": ["1.1.1.1", "2.2.2.2", "3.3.3.3"], "preferred_stratum_range": [1, 2]}}, + "expected": { + "result": "failure", + "messages": [ + "NTP Server: 3.3.3.3 Hostname: ntp3.pool - Incorrect condition - Expected: sys.peer, candidate Actual: reject", + "NTP Server: 3.3.3.3 Hostname: ntp3.pool - Incorrect stratum level - Expected Stratum Range: 1 to 2 Actual: 3", + ], + }, + }, + { + "name": "failure-ntp-pool-hostname", + "test": VerifyNTPAssociations, + "eos_data": [ + { + "peers": { + "itsys-ntp010p.aristanetworks.com": { + "condition": "sys.peer", + "peerIpAddr": "1.1.1.1", + "stratumLevel": 5, + }, + "itsys-ntp011p.aristanetworks.com": { + "condition": "reject", + "peerIpAddr": "2.2.2.2", + "stratumLevel": 4, + }, + "itsys-ntp012p.aristanetworks.com": { + "condition": "candidate", + "peerIpAddr": "3.3.3.3", + "stratumLevel": 2, + }, + } + } + ], + "inputs": {"ntp_pool": {"server_addresses": 
["itsys-ntp010p.aristanetworks.com", "itsys-ntp011p.aristanetworks.com"], "preferred_stratum_range": [1, 2]}}, + "expected": { + "result": "failure", + "messages": [ + "NTP Server: 1.1.1.1 Hostname: itsys-ntp010p.aristanetworks.com - Incorrect stratum level - Expected Stratum Range: 1 to 2 Actual: 5", + "NTP Server: 2.2.2.2 Hostname: itsys-ntp011p.aristanetworks.com - Incorrect condition - Expected: sys.peer, candidate Actual: reject", + "NTP Server: 2.2.2.2 Hostname: itsys-ntp011p.aristanetworks.com - Incorrect stratum level - Expected Stratum Range: 1 to 2 Actual: 4", + "NTP Server: 3.3.3.3 Hostname: itsys-ntp012p.aristanetworks.com - Associated but not part of the provided NTP pool", + ], + }, + }, { "name": "success-no-maintenance-configured", "test": VerifyMaintenance, diff --git a/tests/units/input_models/test_system.py b/tests/units/input_models/test_system.py new file mode 100644 index 000000000..1d2d86f6d --- /dev/null +++ b/tests/units/input_models/test_system.py @@ -0,0 +1,48 @@ +# Copyright (c) 2023-2025 Arista Networks, Inc. +# Use of this source code is governed by the Apache License 2.0 +# that can be found in the LICENSE file. 
class TestVerifyNTPAssociationsInput:
    """Input validation tests for anta.tests.system.VerifyNTPAssociations.Input."""

    @pytest.mark.parametrize(
        argnames=("ntp_servers", "ntp_pool"),
        argvalues=[
            pytest.param(
                [{"server_address": "1.1.1.1", "preferred": True, "stratum": 1}],
                None,
                id="valid-ntp-server",
            ),
            pytest.param(
                None,
                {"server_addresses": ["1.1.1.1"], "preferred_stratum_range": [1, 3]},
                id="valid-ntp-pool",
            ),
        ],
    )
    def test_valid(self, ntp_servers: list[NTPServer], ntp_pool: NTPPool) -> None:
        """Building the input model succeeds when only servers or only a pool is given."""
        VerifyNTPAssociations.Input(ntp_servers=ntp_servers, ntp_pool=ntp_pool)

    @pytest.mark.parametrize(
        argnames=("ntp_servers", "ntp_pool"),
        argvalues=[
            pytest.param(
                [{"server_address": "1.1.1.1", "preferred": True, "stratum": 1}],
                {"server_addresses": ["1.1.1.1"], "preferred_stratum_range": [1, 3]},
                id="invalid-both-server-pool",
            ),
            pytest.param(
                None,
                {"server_addresses": ["1.1.1.1"], "preferred_stratum_range": [1, 3, 6]},
                id="invalid-ntp-pool-stratum",
            ),
            pytest.param(None, None, id="invalid-both-none"),
        ],
    )
    def test_invalid(self, ntp_servers: list[NTPServer], ntp_pool: NTPPool) -> None:
        """Building the input model raises a ValidationError for each rejected combination."""
        with pytest.raises(ValidationError):
            VerifyNTPAssociations.Input(ntp_servers=ntp_servers, ntp_pool=ntp_pool)