refactor(anta): Update MultiProtocolCaps custom type to make it more restrictive by geetanjalimanegslab · Pull Request #1021 · aristanetworks/anta · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

refactor(anta): Update MultiProtocolCaps custom type to make it more restrictive #1021

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

91 changes: 68 additions & 23 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,6 @@
REGEXP_TYPE_HOSTNAME = r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$"
"""Match hostname like `my-hostname`, `my-hostname-1`, `my-hostname-1-2`."""

# Regexp BGP AFI/SAFI
REGEXP_BGP_L2VPN_AFI = r"\b(l2[\s\-]?vpn[\s\-]?evpn)\b"
"""Match L2VPN EVPN AFI."""
REGEXP_BGP_IPV4_MPLS_LABELS = r"\b(ipv4[\s\-]?mpls[\s\-]?label(s)?)\b"
"""Match IPv4 MPLS Labels."""
REGEX_BGP_IPV4_MPLS_VPN = r"\b(ipv4[\s\-]?mpls[\s\-]?vpn)\b"
"""Match IPv4 MPLS VPN."""
REGEX_BGP_IPV4_UNICAST = r"\b(ipv4[\s\-]?uni[\s\-]?cast)\b"
"""Match IPv4 Unicast."""


def aaa_group_prefix(v: str) -> str:
"""Prefix the AAA method with 'group' if it is known."""
Expand Down Expand Up @@ -78,26 +68,57 @@ def interface_case_sensitivity(v: str) -> str:
# Ordered (compiled pattern, canonical name) pairs used by
# `bgp_multiprotocol_capabilities_abbreviations`. Built once at import time
# instead of on every call. Each pattern is applied with `match()` and ends
# with `$`, so the whole input has to be the capability name; the optional
# `[-_ ]` groups accept a hyphen, an underscore, a space or no separator.
_BGP_MULTIPROTOCOL_CAPS_PATTERNS = tuple(
    (re.compile(pattern, re.IGNORECASE), name)
    for pattern, name in (
        (r"dynamic[-_ ]?path[-_ ]?selection$", "dps"),
        (r"dps$", "dps"),
        (r"ipv4[-_ ]?unicast$", "ipv4Unicast"),
        (r"ipv6[-_ ]?unicast$", "ipv6Unicast"),
        (r"ipv4[-_ ]?multicast$", "ipv4Multicast"),
        (r"ipv6[-_ ]?multicast$", "ipv6Multicast"),
        (r"ipv4[-_ ]?labeled[-_ ]?Unicast$", "ipv4MplsLabels"),
        (r"ipv4[-_ ]?mpls[-_ ]?labels$", "ipv4MplsLabels"),
        (r"ipv6[-_ ]?labeled[-_ ]?Unicast$", "ipv6MplsLabels"),
        (r"ipv6[-_ ]?mpls[-_ ]?labels$", "ipv6MplsLabels"),
        (r"ipv4[-_ ]?sr[-_ ]?te$", "ipv4SrTe"),  # codespell:ignore
        (r"ipv6[-_ ]?sr[-_ ]?te$", "ipv6SrTe"),  # codespell:ignore
        (r"ipv4[-_ ]?mpls[-_ ]?vpn$", "ipv4MplsVpn"),
        (r"ipv6[-_ ]?mpls[-_ ]?vpn$", "ipv6MplsVpn"),
        (r"ipv4[-_ ]?Flow[-_ ]?spec$", "ipv4FlowSpec"),
        (r"ipv6[-_ ]?Flow[-_ ]?spec$", "ipv6FlowSpec"),
        (r"ipv4[-_ ]?Flow[-_ ]?spec[-_ ]?vpn$", "ipv4FlowSpecVpn"),
        (r"ipv6[-_ ]?Flow[-_ ]?spec[-_ ]?vpn$", "ipv6FlowSpecVpn"),
        (r"l2[-_ ]?vpn[-_ ]?vpls$", "l2VpnVpls"),
        (r"l2[-_ ]?vpn[-_ ]?evpn$", "l2VpnEvpn"),
        (r"link[-_ ]?state$", "linkState"),
        (r"rt[-_ ]?membership$", "rtMembership"),
        (r"ipv4[-_ ]?rt[-_ ]?membership$", "rtMembership"),
        (r"ipv4[-_ ]?mvpn$", "ipv4Mvpn"),
    )
)


def bgp_multiprotocol_capabilities_abbreviations(value: str) -> str:
    """Abbreviations for different BGP multiprotocol capabilities.

    Handles different separators (hyphen, underscore, space) and case sensitivity.

    Parameters
    ----------
    value
        Capability name as written by the user, e.g. `"IPv4 Unicast"`.

    Returns
    -------
    str
        The canonical camelCase capability name when `value` matches one of the
        known spellings, otherwise `value` unchanged. A non-string input is also
        returned unchanged so that the type this function is attached to can
        report it, instead of `re` raising `TypeError` from inside the validator.

    Examples
    --------
    ```python
    >>> bgp_multiprotocol_capabilities_abbreviations("IPv4 Unicast")
    'ipv4Unicast'
    >>> bgp_multiprotocol_capabilities_abbreviations("ipv4-Flow_Spec Vpn")
    'ipv4FlowSpecVpn'
    >>> bgp_multiprotocol_capabilities_abbreviations("ipv6_labeled-unicast")
    'ipv6MplsLabels'
    >>> bgp_multiprotocol_capabilities_abbreviations("ipv4_mpls_vpn")
    'ipv4MplsVpn'
    >>> bgp_multiprotocol_capabilities_abbreviations("ipv4 mpls labels")
    'ipv4MplsLabels'
    >>> bgp_multiprotocol_capabilities_abbreviations("rt-membership")
    'rtMembership'
    >>> bgp_multiprotocol_capabilities_abbreviations("dynamic-path-selection")
    'dps'
    ```
    """
    # Guard first: the regex engine only accepts str/bytes subjects.
    if not isinstance(value, str):
        return value

    for pattern, name in _BGP_MULTIPROTOCOL_CAPS_PATTERNS:
        if pattern.match(value):
            return name

    # Unknown spelling: hand the input back untouched for the caller to judge.
    return value


Expand Down Expand Up @@ -145,7 +166,31 @@ def validate_regex(value: str) -> str:
EncryptionAlgorithm = Literal["RSA", "ECDSA"]
RsaKeySize = Literal[2048, 3072, 4096]
EcdsaKeySize = Literal[256, 384, 512]
MultiProtocolCaps = Annotated[str, BeforeValidator(bgp_multiprotocol_capabilities_abbreviations)]
MultiProtocolCaps = Annotated[
Literal[
"dps",
"ipv4Unicast",
"ipv6Unicast",
"ipv4Multicast",
"ipv6Multicast",
"ipv4MplsLabels",
"ipv6MplsLabels",
"ipv4SrTe",
"ipv6SrTe",
"ipv4MplsVpn",
"ipv6MplsVpn",
"ipv4FlowSpec",
"ipv6FlowSpec",
"ipv4FlowSpecVpn",
"ipv6FlowSpecVpn",
"l2VpnVpls",
"l2VpnEvpn",
"linkState",
"rtMembership",
"ipv4Mvpn",
],
BeforeValidator(bgp_multiprotocol_capabilities_abbreviations),
]
BfdInterval = Annotated[int, Field(ge=50, le=60000)]
BfdMultiplier = Annotated[int, Field(ge=3, le=50)]
ErrDisableReasons = Literal[
Expand Down
3 changes: 2 additions & 1 deletion anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,8 @@ class VerifyBGPPeerMPCaps(AntaTest):
vrf: default
strict: False
capabilities:
- ipv4Unicast
- ipv4 labeled-Unicast
- ipv4MplsVpn
```
"""

Expand Down
3 changes: 2 additions & 1 deletion examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,8 @@ anta.tests.routing.bgp:
vrf: default
strict: False
capabilities:
- ipv4Unicast
- ipv4 labeled-Unicast
- ipv4MplsVpn
- VerifyBGPPeerRouteLimit:
# Verifies maximum routes and warning limit for BGP IPv4 peer(s).
bgp_peers:
Expand Down
14 changes: 7 additions & 7 deletions tests/units/anta_tests/routing/test_bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1384,12 +1384,12 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo
{
"peer_address": "172.30.11.1",
"vrf": "default",
"capabilities": ["Ipv4 Unicast", "ipv4 Mpls labels"],
"capabilities": ["Ipv4Unicast", "ipv4 Mpls labels"],
},
{
"peer_address": "172.30.11.10",
"vrf": "MGMT",
"capabilities": ["ipv4 Unicast", "ipv4 MplsVpn"],
"capabilities": ["ipv4_Unicast", "ipv4 MplsVpn"],
},
]
},
Expand Down Expand Up @@ -1441,12 +1441,12 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo
{
"peer_address": "172.30.11.10",
"vrf": "default",
"capabilities": ["ipv4Unicast", "L2 Vpn EVPN"],
"capabilities": ["ipv4Unicast", "l2-vpn-EVPN"],
},
{
"peer_address": "172.30.11.1",
"vrf": "MGMT",
"capabilities": ["ipv4Unicast", "L2 Vpn EVPN"],
"capabilities": ["ipv4Unicast", "l2vpnevpn"],
},
]
},
Expand Down Expand Up @@ -1575,7 +1575,7 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo
{
"peer_address": "172.30.11.10",
"vrf": "MGMT",
"capabilities": ["ipv4unicast", "ipv4 mplsvpn", "L2vpnEVPN"],
"capabilities": ["ipv4_unicast", "ipv4 mplsvpn", "L2vpnEVPN"],
},
{
"peer_address": "172.30.11.11",
Expand Down Expand Up @@ -1656,13 +1656,13 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo
"peer_address": "172.30.11.1",
"vrf": "default",
"strict": True,
"capabilities": ["Ipv4 Unicast", "ipv4 Mpls labels"],
"capabilities": ["Ipv4 Unicast", "ipv4MplsLabels"],
},
{
"peer_address": "172.30.11.10",
"vrf": "MGMT",
"strict": True,
"capabilities": ["ipv4 Unicast", "ipv4 MplsVpn"],
"capabilities": ["ipv4-Unicast", "ipv4MplsVpn"],
},
]
},
Expand Down
66 changes: 20 additions & 46 deletions tests/units/test_custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@
import pytest

from anta.custom_types import (
REGEX_BGP_IPV4_MPLS_VPN,
REGEX_BGP_IPV4_UNICAST,
REGEX_TYPE_PORTCHANNEL,
REGEXP_BGP_IPV4_MPLS_LABELS,
REGEXP_BGP_L2VPN_AFI,
REGEXP_INTERFACE_ID,
REGEXP_PATH_MARKERS,
REGEXP_TYPE_EOS_INTERFACE,
Expand Down Expand Up @@ -50,40 +46,6 @@ def test_regexp_path_markers() -> None:
assert re.search(REGEXP_PATH_MARKERS, ".[]?<>") is None


def test_regexp_bgp_l2vpn_afi() -> None:
"""Test REGEXP_BGP_L2VPN_AFI."""
# Test strings that should match the pattern
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2vpn-evpn") is not None
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2 vpn evpn") is not None
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2-vpn evpn") is not None
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2vpn evpn") is not None
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2vpnevpn") is not None
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2 vpnevpn") is not None

# Test strings that should not match the pattern
assert re.search(REGEXP_BGP_L2VPN_AFI, "al2vpn evpn") is None
assert re.search(REGEXP_BGP_L2VPN_AFI, "l2vpn-evpna") is None


def test_regexp_bgp_ipv4_mpls_labels() -> None:
"""Test REGEXP_BGP_IPV4_MPLS_LABELS."""
assert re.search(REGEXP_BGP_IPV4_MPLS_LABELS, "ipv4-mpls-label") is not None
assert re.search(REGEXP_BGP_IPV4_MPLS_LABELS, "ipv4 mpls labels") is not None
assert re.search(REGEXP_BGP_IPV4_MPLS_LABELS, "ipv4Mplslabel") is None


def test_regex_bgp_ipv4_mpls_vpn() -> None:
"""Test REGEX_BGP_IPV4_MPLS_VPN."""
assert re.search(REGEX_BGP_IPV4_MPLS_VPN, "ipv4-mpls-vpn") is not None
assert re.search(REGEX_BGP_IPV4_MPLS_VPN, "ipv4_mplsvpn") is None


def test_regex_bgp_ipv4_unicast() -> None:
"""Test REGEX_BGP_IPV4_UNICAST."""
assert re.search(REGEX_BGP_IPV4_UNICAST, "ipv4-uni-cast") is not None
assert re.search(REGEX_BGP_IPV4_UNICAST, "ipv4+unicast") is None


def test_regexp_type_interface_id() -> None:
"""Test REGEXP_INTERFACE_ID."""
intf_id_re = re.compile(f"{REGEXP_INTERFACE_ID}")
Expand Down Expand Up @@ -209,13 +171,29 @@ def test_interface_autocomplete_failure() -> None:
("str_input", "expected_output"),
[
pytest.param("L2VPNEVPN", "l2VpnEvpn", id="l2VpnEvpn"),
pytest.param("ipv4-mplsLabels", "ipv4MplsLabels", id="ipv4MplsLabels"),
pytest.param("IPv4 Labeled Unicast", "ipv4MplsLabels", id="ipv4MplsLabels"),
pytest.param("ipv4-mpls-vpn", "ipv4MplsVpn", id="ipv4MplsVpn"),
pytest.param("ipv4-unicast", "ipv4Unicast", id="ipv4Unicast"),
pytest.param("BLAH", "BLAH", id="unmatched"),
pytest.param("ipv4_unicast", "ipv4Unicast", id="ipv4Unicast"),
pytest.param("ipv4 Mvpn", "ipv4Mvpn", id="ipv4Mvpn"),
pytest.param("ipv4_Flow-Spec Vpn", "ipv4FlowSpecVpn", id="ipv4FlowSpecVpn"),
pytest.param("Dynamic-Path-Selection", "dps", id="dps"),
pytest.param("ipv6unicast", "ipv6Unicast", id="ipv6Unicast"),
pytest.param("IPv4-Multicast", "ipv4Multicast", id="ipv4Multicast"),
pytest.param("IPv6_multicast", "ipv6Multicast", id="ipv6Multicast"),
pytest.param("ipv6_Mpls-Labels", "ipv6MplsLabels", id="ipv6MplsLabels"),
pytest.param("IPv4_SR_TE", "ipv4SrTe", id="ipv4SrTe"),
pytest.param("iPv6-sR-tE", "ipv6SrTe", id="ipv6SrTe"),
pytest.param("ipv6_mpls-vpn", "ipv6MplsVpn", id="ipv6MplsVpn"),
pytest.param("IPv4 Flow-spec", "ipv4FlowSpec", id="ipv4FlowSpec"),
pytest.param("IPv6Flow_spec", "ipv6FlowSpec", id="ipv6FlowSpec"),
pytest.param("ipv6 Flow-Spec Vpn", "ipv6FlowSpecVpn", id="ipv6FlowSpecVpn"),
pytest.param("L2VPN VPLS", "l2VpnVpls", id="l2VpnVpls"),
pytest.param("link-state", "linkState", id="linkState"),
pytest.param("RT_Membership", "rtMembership", id="rtMembership"),
pytest.param("ipv4-RT_Membership", "rtMembership", id="rtMembership"),
],
)
def test_bgp_multiprotocol_capabilities_abbreviationsh(str_input: str, expected_output: str) -> None:
def test_bgp_multiprotocol_capabilities_abbreviations(str_input: str, expected_output: str) -> None:
"""Test bgp_multiprotocol_capabilities_abbreviations."""
assert bgp_multiprotocol_capabilities_abbreviations(str_input) == expected_output

Expand Down Expand Up @@ -257,11 +235,7 @@ def test_interface_case_sensitivity_uppercase() -> None:
@pytest.mark.parametrize(
"str_input",
[
REGEX_BGP_IPV4_MPLS_VPN,
REGEX_BGP_IPV4_UNICAST,
REGEX_TYPE_PORTCHANNEL,
REGEXP_BGP_IPV4_MPLS_LABELS,
REGEXP_BGP_L2VPN_AFI,
REGEXP_INTERFACE_ID,
REGEXP_PATH_MARKERS,
REGEXP_TYPE_EOS_INTERFACE,
Expand Down
Loading
0