8000 refactor(anta.tests): Move routing.generic.ospf tests to new structure by gmuloc · Pull Request #188 · aristanetworks/anta · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

refactor(anta.tests): Move routing.generic.ospf tests to new structure #188

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 5, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 86 additions & 52 deletions anta/tests/routing/ospf.py
Original file line number Diff line number Diff line change
@@ -1,73 +1,107 @@
"""
OSPF test functions
"""
from __future__ import annotations

import logging
from typing import Any, Dict, List, Optional, cast

from anta.inventory.models import InventoryDevice
from anta.result_manager.models import TestResult
from anta.tests import anta_test
from anta.models import AntaTest, AntaTestCommand

logger = logging.getLogger(__name__)


@anta_test
async def verify_ospf_state(device: InventoryDevice, result: TestResult) -> TestResult:
def _count_ospf_neighbor(ospf_neighbor_json: Dict[str, Any]) -> int:
"""
Verifies all OSPF neighbors are in FULL state.
Count the number of OSPF neighbors
"""
count = 0
for _, vrf_data in ospf_neighbor_json["vrfs"].items():
for _, instance_data in vrf_data["instList"].items():
count += len(instance_data.get("ospfNeighborEntries", []))
return count

Args:
device (InventoryDevice): InventoryDevice instance containing all devices information.

Returns:
TestResult instance with
* result = "unset" if the test has not been executed
* result = "success" if all OSPF neighbors are FULL.
* result = "failure" otherwise.
* result = "error" if any exception is caught

def _get_not_full_ospf_neighbors(ospf_neighbor_json: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
Return the OSPF neighbors whose adjacency state is not "full"
"""
response = await device.session.cli(command="show ip ospf neighbor | exclude FULL|Address", ofmt="text")
logger.debug(f"query result is: {response}")
if len(response) == 0:
result.is_skipped("no OSPF neighbor found")
return result
if response.count("\n") == 0:
result.is_success()
else:
result.is_failure("Some neighbors are not correctly configured.")
not_full_neighbors = []
for vrf, vrf_data in ospf_neighbor_json["vrfs"].items():
for instance, instance_data in vrf_data["instList"].items():
for neighbor_data in instance_data.get("ospfNeighborEntries", []):
if (state := neighbor_data["adjacencyState"]) != "full":
not_full_neighbors.append(
{
"vrf": vrf,
"instance": instance,
"neighbor": neighbor_data["routerId"],
"state": state,
}
)

return result
return not_full_neighbors


@anta_test
async def verify_ospf_count(device: InventoryDevice, result: TestResult, number: int) -> TestResult:
class VerifyOSPFNeighborState(AntaTest):
"""
Verifies all OSPF neighbors are in FULL state.
"""

name = "VerifyOSPFNeighborState"
description = "Verifies all OSPF neighbors are in FULL state."
categories = ["routing", "ospf"]
commands = [AntaTestCommand(command="show ip ospf neighbor")]

@AntaTest.anta_test
def test(self) -> None:
"""Run VerifyOSPFNeighborState validation"""

command_output = cast(Dict[str, Dict[Any, Any]], self.instance_commands[0].output)

if _count_ospf_neighbor(command_output) == 0:
self.result.is_skipped("no OSPF neighbor found")
return

F86D self.result.is_success()

not_full_neighbors = _get_not_full_ospf_neighbors(command_output)
if not_full_neighbors:
self.result.is_failure(f"Some neighbors are not correctly configured: {not_full_neighbors}.")


class VerifyOSPFNeighborCount(AntaTest):
"""
Verifies the number of OSPF neighbors in FULL state is the one we expect.

Args:
device (InventoryDevice): InventoryDevice instance containing all devices information.
number (int): The expected number of OSPF neighbors in FULL state.

Returns:
TestResult instance with
* result = "unset" if the test has not been executed
* result = "skipeed" if the `number` parameter is missing
* result = "success" if device has correct number of devices
* result = "failure" otherwise.
* result = "error" if any exception is caught
"""
if not number:
result.is_skipped("verify_igmp_snooping_vlans was not run as no number was given")
return result

response = await device.session.cli(command="show ip ospf neighbor | exclude Address", ofmt="text")
logger.debug(f"query result is: {response}")
if len(response) == 0:
result.is_skipped("no OSPF neighbor found")
return result
response_data = response.count("FULL")
if response_data.count("FULL") == number:
result.is_success()
else:
result.is_failure(f'device has {response_data.count("FULL")} neighbors (expected {number}')

return result

name = "VerifyOSPFNeighborCount"
description = "Verifies the number of OSPF neighbors in FULL state is the one we expect."
categories = ["routing", "ospf"]
commands = [AntaTestCommand(command="show ip ospf neighbor")]

@AntaTest.anta_test
def test(self, number: Optional[int] = None) -> None:
"""Run VerifyOSPFNeighborCount validation"""
if not (isinstance(number, int) and number >= 0):
self.result.is_skipped(f"VerifyOSPFNeighborCount was not run as the number given '{number}' is not a valid value.")
return

command_output = cast(Dict[str, Dict[Any, Any]], self.instance_commands[0].output)

if (neighbor_count := _count_ospf_neighbor(command_output)) == 0:
self.result.is_skipped("no OSPF neighbor found")
return

self.result.is_success()

if neighbor_count != number:
self.result.is_failure(f"device has {neighbor_count} neighbors (expected {number})")

not_full_neighbors = _get_not_full_ospf_neighbors(command_output)
print(not_full_neighbors)
if not_full_neighbors:
self.result.is_failure(f"Some neighbors are not correctly configured: {not_full_neighbors}.")
Empty file.
Loading
0