feature: healthcheck ready wait for all data sources load by charlottebrady · Pull Request #803 · permitio/opal · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feature: healthcheck ready wait for all data sources load #803

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions packages/opal-client/opal_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ def _init_fast_api_app(self):
self._configure_lifecycle_callbacks(app)
return app

async def _is_ready(self):
async def _is_ready(self, wait_for_all_data_sources_loaded: bool = False):
# Data loaded from file or from server
return self._backup_loaded or await self.policy_store.is_ready()
return self._backup_loaded or await self.policy_store.is_ready(wait_for_all_data_sources_loaded=wait_for_all_data_sources_loaded)

def _configure_api_routes(self, app: FastAPI):
"""Mounts the api routes on the app object."""
Expand Down Expand Up @@ -307,9 +307,9 @@ async def healthy():
)

@app.get("/ready", include_in_schema=False)
async def ready():
async def ready(wait_for_all_data_sources_loaded: bool = False):
"""Returns 200 if the policy store is ready to serve requests."""
if await self._is_ready():
if await self._is_ready(wait_for_all_data_sources_loaded=wait_for_all_data_sources_loaded):
return JSONResponse(
status_code=status.HTTP_200_OK, content={"status": "ok"}
)
Expand Down
10 changes: 10 additions & 0 deletions packages/opal-client/opal_client/data/updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,16 @@ async def get_base_policy_data(
# Fetch the base config with all data entries
sources_config = await self.get_policy_data_config(url=config_url)

# Set the expected data transaction count for readiness checking
# This count represents all data sources that will be loaded initially
total_expected_data_transactions = len(sources_config.entries)
if hasattr(self._policy_store, 'set_expected_data_transaction_count'):
await self._policy_store.set_expected_data_transaction_count(total_expected_data_transactions)
logger.info(
"Set expected data transaction count to {count}",
count=total_expected_data_transactions
)

init_entries, periodic_entries = [], []
for entry in sources_config.entries:
if entry.periodic_update_interval is not None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,12 @@ async def log_transaction(self, transaction: StoreTransaction):
async def is_healthy(self) -> bool:
raise NotImplementedError()

async def is_ready(self) -> bool:
async def is_ready(self, wait_for_all_data_sources_loaded: bool = False) -> bool:
raise NotImplementedError()

async def set_expected_data_transaction_count(self, count: int) -> None:
    """Accept the expected number of data transactions and do nothing with it.

    This default implementation is a deliberate no-op: ``count`` is ignored
    and nothing is stored.

    Args:
        count: Number of data transactions expected; unused here.
    """
    return None

async def full_export(self, writer: AsyncTextIOWrapper) -> None:
raise NotImplementedError()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class MockPolicyStoreClient(BasePolicyStoreClient):

def __init__(self) -> None:
super().__init__()
self._has_data_event: asyncio.Event() = None
self._has_data_event: Optional[asyncio.Event] = None
self._data = {}

@property
Expand Down Expand Up @@ -102,5 +102,11 @@ async def init_healthcheck_policy(
async def log_transaction(self, transaction: StoreTransaction):
pass

async def is_ready(self, wait_for_all_data_sources_loaded: bool = False) -> bool:
return self.has_data_event.is_set()

async def is_healthy(self) -> bool:
return self.has_data_event.is_set()

async def set_expected_data_transaction_count(self, count: int) -> None:
pass
50 changes: 40 additions & 10 deletions packages/opal-client/opal_client/policy_store/opa_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,28 @@ def __init__(
self._num_failed_policy_transactions = 0
self._num_successful_data_transactions = 0
self._num_failed_data_transactions = 0
self._num_expected_data_transactions = None # Track total expected data transactions (None = not set)
self._last_policy_transaction: Optional[StoreTransaction] = None
self._last_failed_policy_transaction: Optional[StoreTransaction] = None
self._last_data_transaction: Optional[StoreTransaction] = None
self._last_failed_data_transaction: Optional[StoreTransaction] = None

@property
def ready(self) -> bool:
is_ready: bool = self._num_successful_policy_transactions > 0 and (
self._data_updater_disabled or self._num_successful_data_transactions > 0
)
def is_ready(self, wait_for_all_data_sources_loaded: bool = False) -> bool:
    """Report whether enough policy and data has been stored to be ready.

    Readiness always requires at least one successful policy transaction.
    If the data updater is disabled, that alone is sufficient.

    Args:
        wait_for_all_data_sources_loaded: When False (default), one
            successful data transaction is enough, as is an expected count
            that was explicitly set to zero. When True, the number of
            successful data transactions must have reached the expected
            count, and that count must have been set.

    Returns:
        True if the readiness condition for the selected mode holds.
    """
    # Without a successfully applied policy nothing else matters.
    if self._num_successful_policy_transactions <= 0:
        return False
    # No data is expected at all when the data updater is turned off.
    if self._data_updater_disabled:
        return True

    expected = self._num_expected_data_transactions
    succeeded = self._num_successful_data_transactions

    if wait_for_all_data_sources_loaded:
        # ">=" rather than "==": the success counter is cumulative and
        # presumably keeps growing as later updates arrive (confirm against
        # process_transaction); a strict equality would flip the store back
        # to "not ready" as soon as one more update succeeds.
        return expected is not None and succeeded >= expected

    # Default behaviour: any loaded data, or a config known to be empty.
    return succeeded > 0 or expected == 0

@property
Expand Down Expand Up @@ -188,6 +200,15 @@ def _is_policy_transaction(self, transaction: StoreTransaction):
def _is_data_transaction(self, transaction: StoreTransaction):
return transaction.transaction_type == TransactionType.data

def set_expected_data_transaction_count(self, count: int) -> None:
    """Store the number of data transactions readiness checks should expect.

    Args:
        count: Total number of data transactions expected to be processed
            before the policy store can be considered fully ready
            (determined by the data sources config).
    """
    expected_total = count
    self._num_expected_data_transactions = expected_total

def process_transaction(self, transaction: StoreTransaction):
"""Mutates the state into a new state that can be then persisted as
hardcoded policy."""
Expand Down Expand Up @@ -233,7 +254,7 @@ async def persist(self, state: OpaTransactionLogState):
OPA."""
logger.info(
"persisting health check policy: ready={ready}, healthy={healthy}",
ready=state.ready,
ready=state.is_ready(),
healthy=state.healthy,
)
logger.info(
Expand All @@ -245,7 +266,7 @@ async def persist(self, state: OpaTransactionLogState):
)
policy_code = self._format_with_json(
self._policy_template,
ready=state.ready,
ready=state.is_ready(),
healthy=state.healthy,
last_policy_transaction=state.last_policy_transaction,
last_failed_policy_transaction=state.last_failed_policy_transaction,
Expand Down Expand Up @@ -440,7 +461,7 @@ async def _get_oauth_token(self):
logger.warning("OAuth server connection error: {err}", err=repr(e))
raise

async def _get_auth_headers(self) -> {}:
async def _get_auth_headers(self) -> dict:
headers = {}
if self._auth_type == PolicyStoreAuth.TOKEN:
if self._token is not None:
Expand Down Expand Up @@ -945,12 +966,21 @@ async def log_transaction(self, transaction: StoreTransaction):
data=transaction_data,
)

async def is_ready(self) -> bool:
return self._transaction_state.ready
async def is_ready(self, wait_for_all_data_sources_loaded: bool = False) -> bool:
return self._transaction_state.is_ready(wait_for_all_data_sources_loaded=wait_for_all_data_sources_loaded)

async def is_healthy(self) -> bool:
return self._transaction_state.healthy

async def set_expected_data_transaction_count(self, count: int) -> None:
    """Forward the expected data transaction count to the transaction state.

    Args:
        count: Total number of data transactions expected to be processed
            before the policy store can be considered fully ready
            (determined by the data sources config).
    """
    transaction_state = self._transaction_state
    transaction_state.set_expected_data_transaction_count(count)

async def full_export(self, writer: AsyncTextIOWrapper) -> None:
policies = await self.get_policies()
data = self._policy_data_cache.get_data()
Expand Down
132 changes: 132 additions & 0 deletions packages/opal-client/opal_client/tests/client_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
import pytest
from unittest.mock import patch
from fastapi import status
from fastapi.testclient import TestClient

from opal_client.client import OpalClient
from opal_client.policy_store.base_policy_store_client import BasePolicyStoreClient
from opal_client.config import PolicyStoreTypes


class MockPolicyStoreClient(BasePolicyStoreClient):
    """Policy store stub whose readiness and health answers are fixed.

    Both answers are chosen at construction time and returned unchanged by
    ``is_ready`` and ``is_healthy``.
    """

    def __init__(self, is_ready_value: bool = True, is_healthy_value: bool = True):
        """Create the stub.

        Args:
            is_ready_value: Value every ``is_ready`` call returns.
            is_healthy_value: Value every ``is_healthy`` call returns.
        """
        # Run base-class initialisation so that any state it sets up is
        # present on the stub as well.
        super().__init__()
        self._is_ready_value = is_ready_value
        self._is_healthy_value = is_healthy_value

    async def is_ready(self, wait_for_all_data_sources_loaded: bool = False) -> bool:
        """Return the canned readiness value; the flag is accepted but ignored."""
        return self._is_ready_value

    async def is_healthy(self) -> bool:
        """Return the canned health value."""
        return self._is_healthy_value


@pytest.fixture
def mock_policy_store_ready():
    """Provide a stub policy store that reports itself ready and healthy."""
    ready_store = MockPolicyStoreClient(is_ready_value=True, is_healthy_value=True)
    return ready_store


@pytest.fixture
def mock_policy_store_not_ready():
    """Provide a stub policy store that reports itself neither ready nor healthy."""
    unready_store = MockPolicyStoreClient(
        is_ready_value=False,
        is_healthy_value=False,
    )
    return unready_store


@pytest.fixture
def opal_client_ready(mock_policy_store_ready):
    """Yield an OpalClient wired to the ready stub policy store.

    The policy store factory is patched to hand back the stub while the
    fixture is active, and the same stub is passed to the client directly.
    """
    with patch(
        "opal_client.client.PolicyStoreClientFactory.create",
        return_value=mock_policy_store_ready,
    ):
        yield OpalClient(
            policy_store_type=PolicyStoreTypes.OPA,
            policy_store=mock_policy_store_ready,
            data_updater=False,  # Disable data updater
            policy_updater=False,  # Disable policy updater
            inline_opa_enabled=False,
            inline_cedar_enabled=False,
        )


@pytest.fixture
def opal_client_not_ready(mock_policy_store_not_ready):
    """Yield an OpalClient wired to the not-ready stub policy store.

    The policy store factory is patched to hand back the stub while the
    fixture is active, and the same stub is passed to the client directly.
    """
    with patch(
        "opal_client.client.PolicyStoreClientFactory.create",
        return_value=mock_policy_store_not_ready,
    ):
        yield OpalClient(
            policy_store_type=PolicyStoreTypes.OPA,
            policy_store=mock_policy_store_not_ready,
            data_updater=False,  # Disable data updater
            policy_updater=False,  # Disable policy updater
            inline_opa_enabled=False,
            inline_cedar_enabled=False,
        )


@pytest.fixture
def test_client_ready(opal_client_ready):
    """Build a FastAPI TestClient around the app of the ready OPAL client."""
    fastapi_app = opal_client_ready.app
    return TestClient(fastapi_app)


@pytest.fixture
def test_client_not_ready(opal_client_not_ready):
    """Build a FastAPI TestClient around the app of the not-ready OPAL client."""
    fastapi_app = opal_client_not_ready.app
    return TestClient(fastapi_app)


def test_ready_endpoint_when_ready(test_client_ready, opal_client_ready):
    """GET /ready yields 200 with status "ok" and calls _is_ready with the flag off."""
    is_ready_patch = patch.object(opal_client_ready, "_is_ready", return_value=True)
    with is_ready_patch as is_ready_spy:
        reply = test_client_ready.get("/ready")

    assert reply.status_code == status.HTTP_200_OK
    assert reply.json() == {"status": "ok"}
    is_ready_spy.assert_called_once_with(wait_for_all_data_sources_loaded=False)


def test_ready_endpoint_when_not_ready(test_client_not_ready, opal_client_not_ready):
    """GET /ready yields 503 with status "unavailable" and calls _is_ready with the flag off."""
    is_ready_patch = patch.object(
        opal_client_not_ready, "_is_ready", return_value=False
    )
    with is_ready_patch as is_ready_spy:
        reply = test_client_not_ready.get("/ready")

    assert reply.status_code == status.HTTP_503_SERVICE_UNAVAILABLE
    assert reply.json() == {"status": "unavailable"}
    is_ready_spy.assert_called_once_with(wait_for_all_data_sources_loaded=False)


def test_ready_endpoint_with_query_parameter_true(test_client_ready, opal_client_ready):
    """GET /ready with the query flag set to true passes True through to _is_ready."""
    is_ready_patch = patch.object(opal_client_ready, "_is_ready", return_value=True)
    with is_ready_patch as is_ready_spy:
        reply = test_client_ready.get("/ready?wait_for_all_data_sources_loaded=true")

    assert reply.status_code == status.HTTP_200_OK
    assert reply.json() == {"status": "ok"}
    is_ready_spy.assert_called_once_with(wait_for_all_data_sources_loaded=True)


def test_ready_endpoint_with_query_parameter_false(
    test_client_ready, opal_client_ready
):
    """GET /ready with the query flag set to false passes False through to _is_ready."""
    is_ready_patch = patch.object(opal_client_ready, "_is_ready", return_value=True)
    with is_ready_patch as is_ready_spy:
        reply = test_client_ready.get(
            "/ready?wait_for_all_data_sources_loaded=false"
        )

    assert reply.status_code == status.HTTP_200_OK
    assert reply.json() == {"status": "ok"}
    is_ready_spy.assert_called_once_with(wait_for_all_data_sources_loaded=False)
Loading
0