[Follow-Up] Updated TC_ACL_2_7 and TC_ACL_2_8 python3 test module to include fabric sensitive checks by j-ororke · Pull Request #38763 · project-chip/connectedhomeip · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[Follow-Up] Updated TC_ACL_2_7 and TC_ACL_2_8 python3 test module to include fabric sensitive checks #38763

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
222 changes: 127 additions & 95 deletions src/python_testing/TC_ACL_2_7.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,60 @@ async def read_currentfabricindex(self, th: ChipDeviceCtrl) -> int:
current_fabric_index = await self.read_single_attribute_check_success(dev_ctrl=th, endpoint=0, cluster=cluster, attribute=attribute)
return current_fabric_index

def _validate_events_th1(self, events, f1, f2, is_filtered):
    """Validate AccessControlExtensionChanged events read by TH1.

    The check passes when at least one event describes an extension being
    added by TH1 on fabric ``f1`` with payload ``D_OK_EMPTY``, and no event
    refers to TH2's node id or to fabric ``f2``.

    Args:
        events: Event reports returned by the event read; entries without a
            ``Data.changeType`` attribute are ignored.
        f1: Fabric index of TH1.
        f2: Fabric index of TH2.
        is_filtered: True when the read was issued with fabricFiltered=True.
            Only used to label the log line and the failure message, so a
            failure on the unfiltered read is not reported as a filtered one.
    """
    read_mode = "fabric filtered" if is_filtered else "not fabric filtered"
    logging.info("Validating TH1 events (fabricFiltered=%s)", is_filtered)
    logging.info("Found %d events", len(events))
    found_valid_event = False
    found_th2_event = False

    for event_data in events:
        logging.info("Examining event: %s", str(event_data))
        data = getattr(event_data, 'Data', None)
        if data is None or not hasattr(data, 'changeType'):
            continue

        # latestValue may be a null placeholder rather than a struct; read its
        # fields defensively so such an event cannot abort the whole scan.
        latest = getattr(data, 'latestValue', None)
        latest_payload = getattr(latest, 'data', None)
        latest_fabric = getattr(latest, 'fabricIndex', None)

        # Check for valid TH1 event
        if (data.changeType == Clusters.AccessControl.Enums.ChangeTypeEnum.kAdded and
                data.adminNodeID == self.th1.nodeId and
                isinstance(data.adminPasscodeID, Nullable) and
                latest_payload == D_OK_EMPTY and
                latest_fabric == f1 and
                data.fabricIndex == f1):
            found_valid_event = True

        # Check for TH2 events
        if (data.adminNodeID == self.th2.nodeId or
                latest_fabric == f2 or
                data.fabricIndex == f2):
            found_th2_event = True

    asserts.assert_true(found_valid_event, "Did not find the expected event with specified fields for TH1")
    # NOTE(review): the same isolation expectation is applied to both read
    # modes, as the call sites do; confirm against the fabric-sensitive event
    # rules that an unfiltered read must also hide the other fabric's events.
    asserts.assert_false(found_th2_event, "TH1 should not see any events from TH2's fabric when " + read_mode)

def _validate_events_th2(self, events, f1, f2, is_filtered):
    """Validate AccessControlExtensionChanged events read by TH2.

    The check passes when at least one event describes an extension being
    added by TH2 on fabric ``f2`` with payload ``D_OK_SINGLE``, and no event
    refers to TH1's node id or to fabric ``f1``.

    Args:
        events: Event reports returned by the event read; entries without a
            ``Data.changeType`` attribute are ignored.
        f1: Fabric index of TH1.
        f2: Fabric index of TH2.
        is_filtered: True when the read was issued with fabricFiltered=True.
            Only used to label the log line and the failure message, so a
            failure on the unfiltered read is not reported as a filtered one.
    """
    read_mode = "fabric filtered" if is_filtered else "not fabric filtered"
    logging.info("Validating TH2 events (fabricFiltered=%s)", is_filtered)
    logging.info("Found %d events", len(events))
    found_valid_event = False
    found_th1_event = False

    for event_data in events:
        logging.info("Examining event: %s", str(event_data))
        data = getattr(event_data, 'Data', None)
        if data is None or not hasattr(data, 'changeType'):
            continue

        # latestValue may be a null placeholder rather than a struct; read its
        # fields defensively so such an event cannot abort the whole scan.
        latest = getattr(data, 'latestValue', None)
        latest_payload = getattr(latest, 'data', None)
        latest_fabric = getattr(latest, 'fabricIndex', None)

        # Check for valid TH2 event
        if (data.changeType == Clusters.AccessControl.Enums.ChangeTypeEnum.kAdded and
                data.adminNodeID == self.th2.nodeId and
                isinstance(data.adminPasscodeID, Nullable) and
                latest_payload == D_OK_SINGLE and
                latest_fabric == f2 and
                data.fabricIndex == f2):
            found_valid_event = True

        # Check for TH1 events
        if (data.adminNodeID == self.th1.nodeId or
                latest_fabric == f1 or
                data.fabricIndex == f1):
            found_th1_event = True

    asserts.assert_true(found_valid_event, "Did not find the expected event with specified fields for TH2")
    # NOTE(review): the same isolation expectation is applied to both read
    # modes, as the call sites do; confirm against the fabric-sensitive event
    # rules that an unfiltered read must also hide the other fabric's events.
    asserts.assert_false(found_th1_event, "TH2 should not see any events from TH1's fabric when " + read_mode)

def desc_TC_ACL_2_7(self) -> str:
    """Return the human-readable title of test case TC-ACL-2.7."""
    return "[TC-ACL-2.7] Multiple fabrics test"

Expand All @@ -70,14 +124,14 @@ def steps_TC_ACL_2_7(self) -> list[TestStep]:
"Result is SUCCESS, value is stored as F2"),
TestStep(5, "TH1 writes DUT Endpoint 0 AccessControl cluster Extension attribute, value is list of AccessControlExtensionStruct containing 1 element", "Result is SUCCESS"),
TestStep(6, "TH2 writes DUT Endpoint 0 AccessControl cluster Extension attribute, value is list of AccessControlExtensionStruct containing 1 element", "Result is SUCCESS"),
TestStep(7, "TH1 reads DUT Endpoint 0 AccessControl cluster Extension attribute",
"Result is SUCCESS, value is list of AccessControlExtensionStruct containing 1 element"),
TestStep(8, "TH2 reads DUT Endpoint 0 AccessControl cluster Extension attribute",
"Result is SUCCESS, value is list of AccessControlExtensionStruct containing 1 element"),
TestStep(9, "TH1 reads DUT Endpoint 0 AccessControl cluster AccessControlExtensionChanged event",
"Result is SUCCESS, value is list of AccessControlExtensionChanged containing 1 element"),
TestStep(10, "TH1 reads DUT Endpoint 0 AccessControl cluster AccessControlExtensionChanged event",
"Result is SUCCESS, value is list of AccessControlExtensionChanged containing 1 element"),
TestStep(7, "TH1 reads DUT Endpoint 0 AccessControl cluster Extension attribute with both fabricFiltered True and False",
"Result is SUCCESS, value is list of AccessControlExtensionStruct containing 1 element when fabric filtered"),
TestStep(8, "TH2 reads DUT Endpoint 0 AccessControl cluster Extension attribute with both fabricFiltered True and False",
"Result is SUCCESS, value is list of AccessControlExtensionStruct containing 1 element when fabric filtered"),
TestStep(9, "TH1 reads DUT Endpoint 0 AccessControl cluster AccessControlExtensionChanged event with both fabricFiltered True and False",
"Result is SUCCESS, value is list of AccessControlExtensionChanged containing 1 element when fabric filtered"),
TestStep(10, "TH2 reads DUT Endpoint 0 AccessControl cluster AccessControlExtensionChanged event with both fabricFiltered True and False",
"Result is SUCCESS, value is list of AccessControlExtensionChanged containing 1 element when fabric filtered"),
TestStep(11, "TH_CR1 sends RemoveFabric for TH2 fabric command to DUT_CE",
"Verify DUT_CE responses with NOCResponse with a StatusCode OK"),
]
Expand Down Expand Up @@ -148,128 +202,106 @@ async def test_TC_ACL_2_7(self):
result[0].Status, Status.Success, "Write should have succeeded")

self.step(7)
# TH1 reads Extension attribute
# TH1 reads Extension attribute with both fabricFiltered True and False
ac_cluster = Clusters.AccessControl
extension_attr = Clusters.AccessControl.Attributes.Extension
result = await self.read_single_attribute_check_success(dev_ctrl=self.th1, endpoint=0, cluster=ac_cluster, attribute=extension_attr)
logging.info("TH1 read result: %s", str(result))
asserts.assert_equal(len(result), 1,
"Should have exactly one extension")

endpoint_data = result[0]
logging.info("TH1 endpoint data: %s", str(endpoint_data))
# Read with fabric_filtered=True
result_filtered = await self.read_single_attribute_check_success(
dev_ctrl=self.th1, endpoint=0, cluster=ac_cluster, attribute=extension_attr,
)
logging.info("TH1 read result (fabricFiltered=True): %s", str(result_filtered))
asserts.assert_equal(len(result_filtered), 1, "Should have exactly one extension when fabric filtered")

endpoint_data = result_filtered[0]
asserts.assert_equal(
endpoint_data.data, D_OK_EMPTY, "Extension data should match D_OK_EMPTY")

asserts.assert_equal(endpoint_data.fabricIndex,
f1, "FabricIndex should match F1")

self.step(8)
# TH2 reads Extension attribute
extension_attr = Clusters.AccessControl.Attributes.Extension
result2 = await self.read_single_attribute_check_success(dev_ctrl=self.th2, endpoint=0, cluster=ac_cluster, attribute=extension_attr)
logging.info("TH2 read result: %s", str(result2))
asserts.assert_equal(len(result2), 1,
"Should have exactly one extension")
# Read with fabric_filtered=False
result_unfiltered = await self.read_single_attribute_check_success(
dev_ctrl=self.th1, endpoint=0, cluster=ac_cluster, attribute=extension_attr,
fabric_filtered=False
)
logging.info("TH1 read result (fabricFiltered=False): %s", str(result_unfiltered))
asserts.assert_equal(len(result_unfiltered), 2, "Should have two extensions when not fabric filtered")
# Check that the TH2 extension data is empty
endpoint_data_th2 = result_unfiltered[1]
asserts.assert_equal(
endpoint_data_th2.data, b'', "Extension data should be empty")

endpoint_data2 = result2[0]
self.step(8)
# TH2 reads Extension attribute with both fabricFiltered True and False
result2_filtered = await self.read_single_attribute_check_success(
dev_ctrl=self.th2, endpoint=0, cluster=ac_cluster, attribute=extension_attr,
)
logging.info("TH2 read result (fabricFiltered=True): %s", str(result2_filtered))
asserts.assert_equal(len(result2_filtered), 1, "Should have exactly one extension when fabric filtered")

endpoint_data2 = result2_filtered[0]
asserts.assert_equal(
endpoint_data2.data, D_OK_SINGLE, "Extension data should match D_OK_SINGLE")
asserts.assert_equal(endpoint_data2.fabricIndex, f2, "FabricIndex should match F2")

# Read with fabric_filtered=False
result2_unfiltered = await self.read_single_attribute_check_success(
dev_ctrl=self.th2, endpoint=0, cluster=ac_cluster, attribute=extension_attr,
fabric_filtered=False
)
logging.info("TH2 read result (fabricFiltered=False): %s", str(result2_unfiltered))
asserts.assert_true(len(result2_unfiltered) >= 2, "Should have two extensions when not fabric filtered")
# Check that the TH1 extension data is empty
endpoint_data_th1 = result2_unfiltered[0]
asserts.assert_equal(
endpoint_data_th1.data, b'', "Extension data should be empty")

self.step(9)
# TH1 reads AccessControlExtensionChanged event
# TH1 reads AccessControlExtensionChanged event with both fabricFiltered True and False
acec_event = Clusters.AccessControl.Events.AccessControlExtensionChanged
logging.info(
"TH1 reading AccessControlExtensionChanged events...")

events_response = await self.th1.ReadEvent(
# Read with fabricFiltered=True
events_response_filtered = await self.th1.ReadEvent(
self.dut_node_id,
events=[(0, acec_event)],
fabricFiltered=True
)
logging.info("Events response: %s", str(events_response))

events = events_response
logging.info("Found %d events", len(events))

found_valid_event = False
found_th2_event = False

for event_data in events:
logging.info("Examining event: %s", str(event_data))
self._validate_events_th1(events_response_filtered, f1, f2, True)

if hasattr(event_data, 'Data') and hasattr(event_data.Data, 'changeType'):
# Check for valid TH1 event
if (event_data.Data.changeType == Clusters.AccessControl.Enums.ChangeTypeEnum.kAdded and
event_data.Data.adminNodeID == self.th1.nodeId and
isinstance(event_data.Data.adminPasscodeID, Nullable) and
event_data.Data.latestValue.data == D_OK_EMPTY and
event_data.Data.latestValue.fabricIndex == f1 and
event_data.Data.fabricIndex == f1):
found_valid_event = True
logging.info("Found valid event for TH1")

# Check for TH2 events that shouldn't be visible to TH1
if (event_data.Data.adminNodeID == self.th2.nodeId or
event_data.Data.latestValue.fabricIndex == f2 or
event_data.Data.fabricIndex == f2):
found_th2_event = True
logging.info("Found TH2 event visible to TH1, which violates fabric isolation")

asserts.assert_true(
found_valid_event, "Did not find the expected event with specified fields for TH1")
asserts.assert_false(
found_th2_event, "TH1 should not see any events from TH2's fabric due to fabric isolation")
# Read with fabricFiltered=False
events_response_unfiltered = await self.th1.ReadEvent(
self.dut_node_id,
events=[(0, acec_event)],
fabricFiltered=False
)
self._validate_events_th1(events_response_unfiltered, f1, f2, False)

self.step(10)
# TH2 reads AccessControlExtensionChanged event
events_response = await self.th2.ReadEvent(
# TH2 reads AccessControlExtensionChanged event with both fabricFiltered True and False
# Read with fabricFiltered=True
events_response2_filtered = await self.th2.ReadEvent(
self.dut_node_id,
events=[(0, acec_event)],
fabricFiltered=True
)
logging.info("Events response: %s", str(events_response))
self._validate_events_th2(events_response2_filtered, f1, f2, True)

events = events_response
logging.info("Found %d events", len(events))

found_valid_event = False
found_th1_event = False

for event_data in events:
logging.info("Examining event: %s", str(event_data))

if hasattr(event_data, 'Data') and hasattr(event_data.Data, 'changeType'):
# Check for valid TH2 event
if (event_data.Data.changeType == Clusters.AccessControl.Enums.ChangeTypeEnum.kAdded and
event_data.Data.adminNodeID == self.th2.nodeId and
isinstance(event_data.Data.adminPasscodeID, Nullable) and
event_data.Data.latestValue.data == D_OK_SINGLE and
event_data.Data.latestValue.fabricIndex == f2 and
event_data.Data.fabricIndex == f2):
found_valid_event = True
logging.info("Found valid event for TH2")

# Check for TH1 events that shouldn't be visible to TH2
if (event_data.Data.adminNodeID == self.th1.nodeId or
event_data.Data.latestValue.fabricIndex == f1 or
event_data.Data.fabricIndex == f1):
found_th1_event = True
logging.info("Found TH1 event visible to TH2, which violates fabric isolation")

asserts.assert_true(
found_valid_event, "Did not find the expected event with specified fields for TH2")
asserts.assert_false(
found_th1_event, "TH2 should not see any events from TH1's fabric due to fabric isolation")
# Read with fabricFiltered=False
events_response2_unfiltered = await self.th2.ReadEvent(
self.dut_node_id,
events=[(0, acec_event)],
fabricFiltered=False
)
self._validate_events_th2(events_response2_unfiltered, f1, f2, False)

self.step(11)
# TH_CR1 sends RemoveFabric for TH2 fabric command to DUT_CE
fabric_idx_cr2_2 = await self.read_currentfabricindex(th=self.th2)
removeFabricCmd2 = Clusters.OperationalCredentials.Commands.RemoveFabric(fabric_idx_cr2_2)
await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=removeFabricCmd2)
result = await self.th1.SendCommand(nodeid=self.dut_node_id, endpoint=0, payload=removeFabricCmd2)
logging.info("RemoveFabric command result: %s", str(result))
asserts.assert_equal(
result.statusCode, 0, "RemoveFabric command should have succeeded")


if __name__ == "__main__":
Expand Down
Loading
Loading
0