diff --git a/ocs_ci/ocs/bucket_utils.py b/ocs_ci/ocs/bucket_utils.py
index 4b55ac04352..020c66d8c86 100644
--- a/ocs_ci/ocs/bucket_utils.py
+++ b/ocs_ci/ocs/bucket_utils.py
@@ -2143,6 +2143,24 @@ def update_replication_policy(bucket_name, replication_policy_dict):
     ).patch(params=json.dumps(replication_policy_patch_dict), format_type="merge")


+def get_replication_policy(bucket_name):
+    """
+    Get the replication policy of a bucket
+
+    Args:
+        bucket_name (str): Name of the bucket
+
+    Returns:
+        dict: Replication policy
+
+    """
+    return OCP(
+        kind="obc",
+        namespace=config.ENV_DATA["cluster_namespace"],
+        resource_name=bucket_name,
+    ).get()["spec"]["additionalConfig"]["replicationPolicy"]
+
+
 def patch_replication_policy_to_bucketclass(
     bucketclass_name, rule_id, destination_bucket_name
 ):
@@ -2855,6 +2873,56 @@ def bulk_s3_put_bucket_lifecycle_config(mcg_obj, buckets, lifecycle_config):
     logger.info("Applied lifecyle rule on all the buckets")


+def upload_random_objects_to_source_and_wait_for_replication(
+    mcg_obj,
+    source_bucket,
+    target_bucket,
+    mockup_logger,
+    file_dir,
+    pattern="ObjKey-",
+    amount=1,
+    num_versions=1,
+    prefix=None,
+    timeout=600,
+):
+    """
+    Upload randomly generated objects to the source bucket and wait until the
+    replication completes
+
+    Args:
+        mcg_obj (MCG): MCG object
+        source_bucket (OBC): Source bucket OBC object
+        target_bucket (OBC): Target bucket OBC object
+        mockup_logger (MockupBucketLogger): MockupBucketLogger object
+        file_dir (str): Directory in which to generate the objects
+        pattern (str): Prefix for the object names
+        amount (int): Number of objects
+        num_versions (int): Number of versions of each object
+        prefix (str): Prefix under the bucket where the objects need to be uploaded
+        timeout (int): Time in seconds to wait for the replication to complete
+
+    """
+
+    logger.info(f"Randomly generating {amount} object/s")
+    for _ in range(num_versions):
+        obj_list = write_random_objects_in_pod(
+            io_pod=mockup_logger.awscli_pod,
+            file_dir=file_dir,
+            amount=amount,
+            pattern=pattern,
+        )
+
+        mockup_logger.upload_random_objects_and_log(
+            source_bucket.name, file_dir=file_dir, obj_list=obj_list, prefix=prefix
+        )
+    assert compare_bucket_object_list(
+        mcg_obj,
+        source_bucket.name,
+        target_bucket.name,
+        timeout=timeout,
+    ), f"Standard replication failed to complete in {timeout} seconds"
+
+
 def upload_test_objects_to_source_and_wait_for_replication(
     mcg_obj, source_bucket, target_bucket, mockup_logger, timeout
 ):
@@ -3147,7 +3215,6 @@ def get_obj_versions(mcg_obj, awscli_pod, bucket_name, obj_key):

     # Remove quotes from the ETag values for easier usage
     for d in versions_dicts:
         d["ETag"] = d["ETag"].strip('"')
-
     return versions_dicts

@@ -3356,3 +3423,34 @@ def delete_all_objects_in_batches(
         batch_deleter.delete_in_parallel()
     else:
         batch_deleter.delete_sequentially()
+
+
+def verify_soft_deletion(mcg_obj, awscli_pod, bucket_name, object_key):
+    """
+    Verify that a delete marker exists and is the latest version for the given object key
+
+    Args:
+        mcg_obj (MCG): MCG object
+        awscli_pod (Pod): Pod object where AWS CLI is installed
+        bucket_name (str): Name of the bucket
+        object_key (str): Object key
+
+    Returns:
+        bool: True if the latest version of the object is a delete marker, False otherwise
+
+    """
+    resp = awscli_pod.exec_cmd_on_pod(
+        command=craft_s3_command(
+            f"list-object-versions --bucket {bucket_name} --prefix {object_key}",
+            mcg_obj=mcg_obj,
+            api=True,
+        ),
+        out_yaml_format=False,
+    )
+
+    if resp and "DeleteMarkers" in resp:
+        delete_markers = json.loads(resp).get("DeleteMarkers")[0]
+        logger.info(f"{bucket_name}:\n{delete_markers}")
+        if delete_markers.get("IsLatest"):
+            return True
+    return False
diff --git a/ocs_ci/ocs/resources/mcg_replication_policy.py b/ocs_ci/ocs/resources/mcg_replication_policy.py
index a28ece2b902..b70c0ecaeb0 100644
--- a/ocs_ci/ocs/resources/mcg_replication_policy.py
+++ b/ocs_ci/ocs/resources/mcg_replication_policy.py
@@ -38,15 +38,18 @@ def __init__(
         self,
         destination_bucket,
         sync_deletions=False,
+        sync_versions=False,
         prefix="",
     ):
         super().__init__(destination_bucket, prefix)
         self.sync_deletions = sync_deletions
+        self.sync_versions = sync_versions

     @abstractmethod
     def to_dict(self):
         dict = super().to_dict()
         dict["rules"][0]["sync_deletions"] = self.sync_deletions
+        dict["rules"][0]["sync_versions"] = self.sync_versions
         dict["log_replication_info"] = {}
         return dict

@@ -65,8 +68,9 @@ def __init__(
         logs_bucket="",
         prefix="",
         logs_location_prefix="",
+        sync_versions=False,
     ):
-        super().__init__(destination_bucket, sync_deletions, prefix)
+        super().__init__(destination_bucket, sync_deletions, sync_versions, prefix)
         self.logs_bucket = logs_bucket
         self.logs_location_prefix = logs_location_prefix

diff --git a/ocs_ci/ocs/resources/mockup_bucket_logger.py b/ocs_ci/ocs/resources/mockup_bucket_logger.py
index 7187f305634..e52c6de8c79 100644
--- a/ocs_ci/ocs/resources/mockup_bucket_logger.py
+++ b/ocs_ci/ocs/resources/mockup_bucket_logger.py
@@ -101,6 +101,34 @@ def upload_arbitrary_object_and_log(self, bucket_name):

         self._upload_mockup_logs(bucket_name, [obj_name], "PUT")

+    def upload_random_objects_and_log(
+        self, bucket_name, file_dir, obj_list, prefix=None
+    ):
+        """
+        Upload randomly generated objects to the bucket and upload a matching
+        mockup log
+
+        Args:
+            bucket_name (str): Name of the bucket
+            file_dir (str): Directory where the objects are present
+            obj_list (list): List of the objects
+            prefix (str): Prefix under which the objects need to be uploaded
+
+        """
+
+        logger.info(
+            f"Uploading randomly generated objects from {file_dir} to {bucket_name}"
+        )
+        prefix = prefix if prefix else ""
+        sync_object_directory(
+            self.awscli_pod,
+            file_dir,
+            f"s3://{bucket_name}/{prefix}",
+            self.mcg_obj,
+        )
+
+        self._upload_mockup_logs(bucket_name=bucket_name, obj_list=obj_list, op="PUT")
+
     def delete_objs_and_log(self, bucket_name, objs, prefix=""):
         """
         Delete list of objects from the MCG bucket and write
diff --git a/tests/conftest.py b/tests/conftest.py
index 0e8d1eb90e7..486adccb477 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -46,6 +46,8 @@ from ocs_ci.ocs.bucket_utils import (
     craft_s3_command,
     put_bucket_policy,
+    update_replication_policy,
+    put_bucket_versioning_via_awscli,
 )
 from ocs_ci.ocs.cnv.virtual_machine import VirtualMachine, VMCloner
 from ocs_ci.ocs.dr.dr_workload import (
@@ -8488,6 +8490,29 @@ def factory(new_lifecycle_batch_size):
     return factory


+@pytest.fixture()
+def reduce_replication_delay(add_env_vars_to_noobaa_core_class):
+    """
+    Fixture to reduce the replication cycle delay
+
+    """
+
+    def factory(interval=1):
+        """
+        Factory function to reduce the replication
+        cycle delay
+
+        """
+        new_delay_in_milliseconds = interval * 60 * 1000
+        new_env_var_tuples = [
+            (constants.BUCKET_REPLICATOR_DELAY_PARAM, new_delay_in_milliseconds),
+            (constants.BUCKET_LOG_REPLICATOR_DELAY_PARAM, new_delay_in_milliseconds),
+        ]
+        add_env_vars_to_noobaa_core_class(new_env_var_tuples)
+
+    return factory
+
+
 @pytest.fixture()
 def reset_conn_score():
     """
@@ -8815,21 +8840,34 @@ def aws_log_based_replication_setup(
     """
     A fixture to set up standard log-based replication with deletion sync.

-    Args:
-        awscli_pod_session(Pod): A pod running the AWS CLI
-        mcg_obj_session(MCG): An MCG object
-        bucket_factory: A bucket factory fixture
-
-    Returns:
-        MockupBucketLogger: A MockupBucketLogger object
-        Bucket: The source bucket
-        Bucket: The target bucket
-
     """
     reduce_replication_delay_setup()

-    def factory(bucketclass_dict=None):
+    def factory(
+        bucketclass_dict=None,
+        prefix_source="",
+        prefix_target="",
+        bidirectional=False,
+        deletion_sync=True,
+        enable_versioning=False,
+    ):
+        """
+        Factory function to set up log-based replication between two buckets,
+        optionally bi-directional, with deletion sync and version sync.
+
+        Args:
+            bucketclass_dict (dict): Dictionary representing bucketclass parameters
+            prefix_source (str): Replication prefix to set on the source bucket's policy
+            prefix_target (str): Replication prefix to set on the target bucket's policy
+            bidirectional (bool): True to set up bi-directional replication,
+                False otherwise
+            deletion_sync (bool): True to enable deletion sync, False otherwise
+            enable_versioning (bool): True to enable versioning and version sync
+                on both buckets, False otherwise
+
+        Returns:
+            MockupBucketLogger: The mockup logger of the source bucket
+            MockupBucketLogger: The mockup logger of the target bucket
+                (None unless bidirectional is True)
+            Bucket: The source bucket
+            Bucket: The target bucket
+
+        """
+
         log.info("Starting log-based replication setup")
         if bucketclass_dict is None:
             bucketclass_dict = {
@@ -8842,27 +8880,60 @@ def factory(bucketclass_dict=None):
                 },
             }
         target_bucket = bucket_factory(bucketclass=bucketclass_dict)[0]
+        if enable_versioning:
+            put_bucket_versioning_via_awscli(
+                mcg_obj_session, awscli_pod_session, target_bucket.name
+            )

-        mockup_logger = MockupBucketLogger(
+        mockup_logger_source = MockupBucketLogger(
             awscli_pod=awscli_pod_session,
             mcg_obj=mcg_obj_session,
             bucket_factory=bucket_factory,
             platform=constants.AWS_PLATFORM,
             region=constants.DEFAULT_AWS_REGION,
         )
-        replication_policy = AwsLogBasedReplicationPolicy(
+        replication_policy_source = AwsLogBasedReplicationPolicy(
             destination_bucket=target_bucket.name,
-            sync_deletions=True,
-            logs_bucket=mockup_logger.logs_bucket_uls_name,
+            sync_deletions=deletion_sync,
+            logs_bucket=mockup_logger_source.logs_bucket_uls_name,
+            prefix=prefix_source,
+            sync_versions=enable_versioning,
         )

         source_bucket = bucket_factory(
-            1, bucketclass=bucketclass_dict, replication_policy=replication_policy
+            1,
+            bucketclass=bucketclass_dict,
+            replication_policy=replication_policy_source,
         )[0]
+        if enable_versioning:
+            put_bucket_versioning_via_awscli(
+                mcg_obj_session, awscli_pod_session, source_bucket.name
+            )
+
+        mockup_logger_target = None
+        if bidirectional:
+            mockup_logger_target = MockupBucketLogger(
+                awscli_pod=awscli_pod_session,
+                mcg_obj=mcg_obj_session,
+                bucket_factory=bucket_factory,
+                platform=constants.AWS_PLATFORM,
+                region=constants.DEFAULT_AWS_REGION,
+            )
+
+            replication_policy_target = AwsLogBasedReplicationPolicy(
+                destination_bucket=source_bucket.name,
+                sync_deletions=deletion_sync,
+                logs_bucket=mockup_logger_target.logs_bucket_uls_name,
+                prefix=prefix_target,
+                sync_versions=enable_versioning,
+            )
+            update_replication_policy(
+                target_bucket.name, replication_policy_target.to_dict()
+            )

         log.info("log-based replication setup complete")

-        return mockup_logger, source_bucket, target_bucket
+        return mockup_logger_source, mockup_logger_target, source_bucket, target_bucket

     return factory

diff --git a/tests/cross_functional/conftest.py b/tests/cross_functional/conftest.py
index aceea817827..ba36ea36285 100644
--- a/tests/cross_functional/conftest.py
+++ b/tests/cross_functional/conftest.py
@@ -6,6 +6,7 @@
 from concurrent.futures import ThreadPoolExecutor
 from threading import Event

+from ocs_ci.ocs.resources.mcg_lifecycle_policies import LifecyclePolicy, ExpirationRule
 from ocs_ci.utility.retry import retry
 from ocs_ci.framework import config
 from ocs_ci.helpers.e2e_helpers import (
@@ -129,7 +130,7 @@ def factory(
             awscli_pod_session=awscli_pod_session,
             mcg_obj_session=mcg_obj_session,
         ):
-            global secrets_obj
+            nonlocal secrets_obj

             # create bucket and write some objects to it
             test_bucket = bucket_factory()[0]
@@ -315,7 +316,240 @@ def check_for_buckets_content(bucket):

     def finalizer():

-        global secrets_obj
+        nonlocal secrets_obj
+
+        # remove the local copy of ./mcg.bck
+        if os.path.exists("./mcg.bck"):
+            os.remove("mcg.bck")
+            logger.info("Removed the local copy of mcg.bck")
+
+        # create the secrets if they're deleted
+        if secrets_obj:
+            for secret in secrets_obj:
+                if secret.is_deleted:
+                    secret.create()
+                else:
+                    logger.info(f"{secret.name} is not deleted!")
+
+        # restore MCG reconciliation if not restored already
+        if (
+            ocs_storagecluster_obj.get(resource_name=constants.DEFAULT_CLUSTERNAME)[
+                "spec"
+            ]["multiCloudGateway"]["reconcileStrategy"]
+            != "manage"
+        ):
+            restore_mcg_reconcilation(ocs_storagecluster_obj)
+            logger.info("MCG reconciliation restored!")
+
+        # start noobaa services if they are down
+        ocp_deployment_obj = OCP(
+            kind=constants.DEPLOYMENT, namespace=config.ENV_DATA["cluster_namespace"]
+        )
+        nb_operator_dc = Deployment(
+            **ocp_deployment_obj.get(resource_name=constants.NOOBAA_OPERATOR_DEPLOYMENT)
+        )
+        nb_endpoint_dc = Deployment(
+            **ocp_deployment_obj.get(resource_name=constants.NOOBAA_ENDPOINT_DEPLOYMENT)
+        )
+        start_noobaa_services(nb_endpoint_dc, nb_operator_dc)
+
+    request.addfinalizer(finalizer)
+    return factory
+
+
+@pytest.fixture()
+def noobaa_db_backup_locally(bucket_factory, awscli_pod_session, mcg_obj_session):
+    """
+    Back up the Noobaa DB locally
+
+    """
+
+    secrets_obj = []
+
+    def factory():
+
+        nonlocal secrets_obj
+
+        # Backup secrets
+        ocp_secret_obj = OCP(
+            kind="secret", namespace=config.ENV_DATA["cluster_namespace"]
+        )
+        secrets = [
+            "noobaa-root-master-key-volume",
+            "noobaa-root-master-key-backend",
+            "noobaa-admin",
+            "noobaa-operator",
+            "noobaa-server",
+            "noobaa-endpoints",
+        ]
+
+        secrets_yaml = [
+            ocp_secret_obj.get(resource_name=f"{secret}") for secret in secrets
+        ]
+        secrets_obj = [OCS(**secret_yaml) for secret_yaml in secrets_yaml]
+        logger.info("Backed up secrets as secret objects!")
+
+        # Backup the PostgreSQL database and save it to a local folder
+        noobaa_db_pod = get_primary_nb_db_pod()
+        noobaa_db_pod.exec_cmd_on_pod(
+            command="pg_dump nbcore -F custom -f /dev/shm/test.db",
+        )
+        OCP(namespace=config.ENV_DATA["cluster_namespace"]).exec_oc_cmd(
+            command=f"cp --retries=-1 {noobaa_db_pod.name}:/dev/shm/test.db ./mcg.bck",
+            out_yaml_format=False,
+        )
+        logger.info("Backed up PostgreSQL and stored it in a local folder!")
+
+        # Backup the noobaa-db-pg-cluster resource
+        cnpg_cluster_yaml = OCP(
+            kind=constants.CNPG_CLUSTER_KIND,
+            namespace=config.ENV_DATA["cluster_namespace"],
+        ).get(resource_name=constants.NB_DB_CNPG_CLUSTER_NAME)
+        original_db_replica_count = cnpg_cluster_yaml["spec"]["instances"]
+
+        return cnpg_cluster_yaml, original_db_replica_count, secrets_obj
+
+    return factory
+
+
+@pytest.fixture()
+def noobaa_db_recovery_from_local(request):
+    """
+    Recover the Noobaa DB from a local backup
+
+    """
+
+    # OCS storagecluster object
+    ocs_storagecluster_obj = OCP(
+        namespace=config.ENV_DATA["cluster_namespace"],
+        kind=constants.STORAGECLUSTER,
+    )
+
+    # OCP object for kind deployment
+    ocp_deployment_obj = OCP(
+        kind=constants.DEPLOYMENT, namespace=config.ENV_DATA["cluster_namespace"]
+    )
+
+    # Noobaa operator & noobaa endpoint deployments objects
+    nb_operator_dc = Deployment(
+        **ocp_deployment_obj.get(resource_name=constants.NOOBAA_OPERATOR_DEPLOYMENT)
+    )
+    nb_endpoint_dc = Deployment(
+        **ocp_deployment_obj.get(resource_name=constants.NOOBAA_ENDPOINT_DEPLOYMENT)
+    )
+
+    secrets_obj = []
+
+    def factory(cnpg_cluster_yaml, original_db_replica_count, secrets):
+
+        nonlocal secrets_obj
+        secrets_obj = secrets
+
+        # Stop MCG reconciliation
+        params = '{"spec": {"multiCloudGateway": {"reconcileStrategy": "ignore"}}}'
+        ocs_storagecluster_obj.patch(
+            resource_name=constants.DEFAULT_CLUSTERNAME,
+            params=params,
+            format_type="merge",
+        )
+        logger.info("Stopped MCG reconciliation!")
+
+        # Stop the NooBaa Service before restoring the NooBaa DB. There will be no object service after this point
+        nb_operator_dc.scale(replicas=0)
+        nb_endpoint_dc.scale(replicas=0)
+        modify_statefulset_replica_count(
+            statefulset_name=constants.NOOBAA_CORE_STATEFULSET, replica_count=0
+        )
+        logger.info(
+            "Stopped the noobaa services: Noobaa endpoint, Noobaa core, Noobaa operator pods!"
+        )
+
+        # Login to the NooBaa DB pod and clean up potential database clients to nbcore
+        query = "SELECT pg_terminate_backend (pid) FROM pg_stat_activity WHERE datname = 'nbcore';"
+        try:
+            exec_nb_db_query(query)
+        except CommandFailed as ex:
+            if "terminating connection due to administrator command" not in str(ex):
+                raise ex
+        logger.info("Cleaned up potential database clients to nbcore!")
+
+        # Delete the existing cnpg cluster
+        OCP(kind=constants.CNPG_CLUSTER_KIND).delete(
+            resource_name=constants.NB_DB_CNPG_CLUSTER_NAME
+        )
+
+        # Ensure the cnpg cluster yaml uses the correct bootstrap object
+        cnpg_cluster_yaml["bootstrap"] = {
+            "initdb": {
+                "database": "nbcore",
+                "encoding": "UTF8",
+                "localeCType": "C",
+                "localeCollate": "C",
+                "owner": "noobaa",
+            }
+        }
+        cnpg_cluster_obj = OCS(**cnpg_cluster_yaml)
+        cnpg_cluster_obj.create()
+
+        # Wait for the cluster status to be in a healthy state
+        selector = (
+            f"{constants.NOOBAA_DB_LABEL_419_AND_ABOVE},"
+            f"{constants.CNPG_POD_ROLE_INSTANCE_LABEL}"
+        )
+        OCP(kind=constants.POD).wait_for_resource(
+            condition=constants.STATUS_RUNNING,
+            selector=selector,
+            resource_count=original_db_replica_count,
+            timeout=600,
+            sleep=5,
+        )
+
+        # Restore DB from a local folder to the primary instance
+        # for pod_info in get_pods_having_label(label=constants.NOOBAA_CNPG_POD_LABEL):
+        #     noobaa_db_pod = Pod(**pod_info)
+        noobaa_db_pod = get_primary_nb_db_pod()
+        OCP(namespace=config.ENV_DATA["cluster_namespace"]).exec_oc_cmd(
+            command=f"cp --retries=-1 ./mcg.bck {noobaa_db_pod.name}:/dev/shm/test.db",
+            out_yaml_format=False,
+        )
+        cmd = (
+            'bash -c "pg_restore --no-owner -n public '
+            "--role=noobaa -d nbcore "
+            '--verbose < /dev/shm/test.db"'
+        )
+        noobaa_db_pod.exec_cmd_on_pod(command=cmd)
+        logger.info(f"Restored {noobaa_db_pod.name} from the local folder!")
+
+        # Delete the secrets and restore them from the backed-up objects.
+        # Note: verify that there are no errors before proceeding to the next steps.
+        for secret in secrets_obj:
+            secret.delete()
+        logger.info(
+            f"Deleted current Noobaa secrets: {[secret.name for secret in secrets_obj]}!"
+        )
+        for secret in secrets_obj:
+            secret.create()
+        logger.info(
+            f"Restored old Noobaa secrets: {[secret.name for secret in secrets_obj]}"
+        )
+
+        # Restore MCG reconciliation
+        restore_mcg_reconcilation(ocs_storagecluster_obj)
+        logger.info("Restored MCG reconciliation!")
+
+        # Start the NooBaa service
+        nb_operator_dc.scale(replicas=1)
+        nb_endpoint_dc.scale(replicas=1)
+        modify_statefulset_replica_count(
+            statefulset_name=constants.NOOBAA_CORE_STATEFULSET, replica_count=1
+        )
+        logger.info(
+            "Started noobaa services: Noobaa endpoint, Noobaa core, Noobaa operator pods!"
+        )
+
+        # Restart the NooBaa DB pod
+        noobaa_db_pod.delete()
+        logger.info("Restarted noobaa-db pod!")
+
+    def finalizer():
+
+        nonlocal secrets_obj

         # remove the local copy of ./mcg.bck
         if os.path.exists("./mcg.bck"):
@@ -400,6 +634,16 @@ def factory(noobaa_pvc_obj):
         )
         return restore_pvc_objs, snap_obj

+    def teardown():
+        """
+        Teardown code to delete the restored PVC objects
+
+        """
+        for pvc_obj in restore_pvc_objs:
+            if pvc_obj.ocp.get(resource_name=pvc_obj.name, dont_raise=True):
+                pvc_obj.delete()
+
+    request.addfinalizer(teardown)
     return factory

@@ -964,25 +1208,14 @@ def factory(number_of_buckets, bucket_types, cloud_providers):
         reduce_expiration_interval(interval=1)
         logger.info("Changed noobaa lifecycle interval to 1 minute")

-        expiration_rule = {
-            "Rules": [
-                {
-                    "Expiration": {
-                        "Days": 1,
-                        "ExpiredObjectDeleteMarker": False,
-                    },
-                    "Filter": {"Prefix": ""},
-                    "ID": "data-expire",
-                    "Status": "Enabled",
-                }
-            ]
-        }
-
+        expiration_rule = LifecyclePolicy(ExpirationRule(days=1))
         all_buckets = create_muliple_types_provider_obcs(
             number_of_buckets, type, cloud_providers, bucket_factory
         )
-        bulk_s3_put_bucket_lifecycle_config(mcg_obj, all_buckets, expiration_rule)
+        bulk_s3_put_bucket_lifecycle_config(
+            mcg_obj, all_buckets, expiration_rule.as_dict()
+        )

         logger.info(
             f"Buckets created under expiration setup: {[bucket.name for bucket in all_buckets]}"
diff --git a/tests/cross_functional/system_test/test_mcg_replication_with_disruptions.py b/tests/cross_functional/system_test/test_mcg_replication_with_disruptions.py
index 47a61f2fab4..77ee7e40618 100644
--- a/tests/cross_functional/system_test/test_mcg_replication_with_disruptions.py
+++ b/tests/cross_functional/system_test/test_mcg_replication_with_disruptions.py
@@ -1,3 +1,4 @@
+import json
 import logging

 import pytest
@@ -20,6 +21,8 @@
     magenta_squad,
     mcg,
     polarion_id,
+    skipif_aws_creds_are_missing,
+    skipif_disconnected_cluster,
 )
 from ocs_ci.ocs.node import get_worker_nodes, get_node_objs
 from ocs_ci.ocs.bucket_utils import (
@@ -28,6 +31,10 @@
     write_random_test_objects_to_bucket,
     upload_test_objects_to_source_and_wait_for_replication,
     update_replication_policy,
+    upload_random_objects_to_source_and_wait_for_replication,
+    get_replication_policy,
+    s3_put_bucket_versioning,
+    wait_for_object_versions_match,
 )
 from ocs_ci.ocs import ocp
 from ocs_ci.ocs.resources.pvc import get_pvc_objs
@@ -40,9 +47,17 @@
     get_noobaa_core_pod,
     get_noobaa_pods,
     wait_for_noobaa_pods_running,
+    get_pod_node,
+    get_noobaa_endpoint_pods,
 )
+
 from ocs_ci.utility.retry import retry
-from ocs_ci.ocs.exceptions import CommandFailed, ResourceWrongStatusException
+from ocs_ci.ocs.exceptions import (
+    CommandFailed,
+    ResourceWrongStatusException,
+    TimeoutExpiredError,
+)
+

 logger = logging.getLogger(__name__)

@@ -310,7 +325,9 @@ def test_log_based_replication_with_disruptions(
             skip_any_features=["nsfs", "rgw kafka", "caching"],
         )

-        mockup_logger, source_bucket, target_bucket = aws_log_based_replication_setup()
+        mockup_logger, _, source_bucket, target_bucket = (
+            aws_log_based_replication_setup()
+        )

         # upload test objects to the bucket and verify replication
         upload_test_objects_to_source_and_wait_for_replication(
@@ -402,3 +419,363 @@ def test_log_based_replication_with_disruptions(
             object_amount=5,
         )
         logger.info("No issues seen with the MCG bg feature validation")
+
+
+@mcg
+@magenta_squad
+@system_test
+@skipif_aws_creds_are_missing
+@skipif_disconnected_cluster
+class TestMCGReplicationWithVersioningSystemTest:
+
+    @retry(CommandFailed, tries=7, delay=30)
+    def upload_objects_with_retry(
+        self,
+        mcg_obj_session,
+        source_bucket,
+        target_bucket,
+        mockup_logger,
+        file_dir,
+        pattern,
+        prefix,
+        num_versions=1,
+    ):
+        """
+        Upload random objects to the bucket, retrying on CommandFailed
+        exceptions.
+
+        Args:
+            mcg_obj_session (MCG): MCG object
+            source_bucket (OBC): Source bucket object
+            target_bucket (OBC): Target bucket object
+            mockup_logger (MockupBucketLogger): Mockup logger object
+            file_dir (str): Source directory for generating objects
+            pattern (str): File object pattern
+            prefix (str): Prefix under which objects need to be uploaded
+            num_versions (int): Number of object versions
+
+        """
+        upload_random_objects_to_source_and_wait_for_replication(
+            mcg_obj_session,
+            source_bucket,
+            target_bucket,
+            mockup_logger,
+            file_dir,
+            pattern=pattern,
+            amount=1,
+            num_versions=num_versions,
+            prefix=prefix,
+            timeout=600,
+        )
+
+    @polarion_id("OCS-6407")
+    def test_bucket_replication_with_versioning_system_test(
+        self,
+        awscli_pod_session,
+        mcg_obj_session,
+        bucket_factory,
+        reduce_replication_delay,
+        nodes,
+        noobaa_db_backup_locally,
+        noobaa_db_recovery_from_local,
+        aws_log_based_replication_setup,
+        test_directory_setup,
+        setup_mcg_bg_features,
+        validate_mcg_bg_features,
+    ):
+        """
+        System test to verify bucket replication with versioning while
+        disruptive and backup operations are performed.
+
+        Steps:
+
+        1. Run MCG background feature setup and validation
+        2. Set up two buckets with bi-directional replication enabled
+        3. Upload an object and verify that replication works between the buckets
+        4. Enable versioning on the buckets and also enable sync_versions=True in the
+           replication policy
+        5. Upload objects to the second bucket and verify that replication and
+           version sync work
+        6. Upload objects to the first bucket and shut down the noobaa core pod node.
+           Verify that replication and version sync work
+        7. Upload objects to the second bucket and restart all noobaa pods. Verify
+           that replication and version sync work
+        8. Take a backup of the Noobaa DB
+        9. Upload objects to the first bucket and verify that replication works but
+           version sync does not
+        10. Recover the Noobaa DB from the backup
+        11. Upload objects to the second bucket. Verify that replication and version sync work
+
+        """
+
+        feature_setup_map = setup_mcg_bg_features(
+            num_of_buckets=5,
+            object_amount=5,
+            is_disruptive=True,
+            skip_any_features=["nsfs", "rgw kafka", "caching"],
+        )
+
+        prefix_1 = "site_1"
+        prefix_2 = "site_2"
+        object_key = "ObjectKey-"
+
+        # Reduce the replication delay to 1 minute
+        logger.info("Reduce the bucket replication delay cycle to 1 minute")
+        reduce_replication_delay()
+
+        # Set up two buckets with bi-directional replication enabled and
+        # deletion sync disabled
+        bucketclass_dict = {
+            "interface": "OC",
+            "backingstore_dict": {"aws": [(1, "eu-central-1")]},
+        }
+        mockup_logger_source, mockup_logger_target, bucket_1, bucket_2 = (
+            aws_log_based_replication_setup(
+                bucketclass_dict=bucketclass_dict,
+                bidirectional=True,
+                prefix_source=prefix_1,
+                prefix_target=prefix_2,
+                deletion_sync=False,
+            )
+        )
+
+        # Upload an object and verify that bucket replication works
+        logger.info(f"Uploading object {object_key} to the bucket {bucket_1.name}")
+        self.upload_objects_with_retry(
+            mcg_obj_session,
+            bucket_1,
+            bucket_2,
+            mockup_logger_source,
+            test_directory_setup.origin_dir,
+            pattern=object_key,
+            prefix=prefix_1,
+        )
+
+        # Enable object versioning on both the buckets
+        s3_put_bucket_versioning(mcg_obj_session, bucket_1.name)
+        s3_put_bucket_versioning(mcg_obj_session, bucket_2.name)
+        logger.info("Enabled object versioning for both the buckets")
+
+        # Enable sync versions in both buckets' replication policies
+        replication_1 = json.loads(get_replication_policy(bucket_name=bucket_2.name))
+        replication_2 = json.loads(get_replication_policy(bucket_name=bucket_1.name))
+        replication_1["rules"][0]["sync_versions"] = True
+        replication_2["rules"][0]["sync_versions"] = True
+
+        update_replication_policy(bucket_2.name, replication_1)
+        update_replication_policy(bucket_1.name, replication_2)
+        logger.info(
+            "Enabled sync versions in the replication policy for both the buckets"
+        )
+
+        # Update the previously uploaded object with new data and a new version
+        self.upload_objects_with_retry(
+            mcg_obj_session,
+            bucket_2,
+            bucket_1,
+            mockup_logger_target,
+            test_directory_setup.origin_dir,
+            pattern=object_key,
+            prefix=prefix_2,
+        )
+        logger.info(
+            f"Updated object {object_key} with new version data in bucket {bucket_2.name}"
+        )
+
+        wait_for_object_versions_match(
+            mcg_obj_session,
+            awscli_pod_session,
+            bucket_1.name,
+            bucket_2.name,
+            obj_key=f"{prefix_2}/{object_key}",
+        )
+        logger.info(
+            f"Replication works from {bucket_2.name} to {bucket_1.name} and has all the versions of object {object_key}"
+        )
+
+        # Perform disruptive operations in parallel with the object uploads and
+        # version verifications
+        with ThreadPoolExecutor(max_workers=1) as executor:
+
+            # Update the object uploaded previously from the second bucket and
+            # then shut down the noobaa core and db pod nodes
+            noobaa_pods = [
+                get_noobaa_core_pod(),
+                get_noobaa_db_pod(),
+            ] + get_noobaa_endpoint_pods()
+            noobaa_pod_nodes = [get_pod_node(pod_obj) for pod_obj in noobaa_pods]
+            aws_cli_pod_node = get_pod_node(awscli_pod_session).name
+            for node_obj in noobaa_pod_nodes:
+                if node_obj.name != aws_cli_pod_node:
+                    node_to_shutdown = [node_obj]
+                    break
+
+            logger.info(
+                f"Updating object {object_key} with new version data in bucket {bucket_1.name}"
+            )
+            future = executor.submit(
+                self.upload_objects_with_retry,
+                mcg_obj_session,
+                bucket_1,
+                bucket_2,
+                mockup_logger_source,
+                test_directory_setup.origin_dir,
+                pattern=object_key,
+                prefix=prefix_1,
+            )
+
+            nodes.stop_nodes(node_to_shutdown)
+            logger.info(f"Stopped these noobaa pod nodes {node_to_shutdown}")
+
+            # Wait for the upload to finish
+            future.result()
+
+            wait_for_object_versions_match(
+                mcg_obj_session,
+                awscli_pod_session,
+                bucket_1.name,
+                bucket_2.name,
+                obj_key=f"{prefix_1}/{object_key}",
+            )
+            logger.info(
+                f"Replication works from {bucket_1.name} to {bucket_2.name} and"
+                f" has all the versions of object {object_key}"
+            )
+
+            logger.info("Starting nodes now...")
+            nodes.start_nodes(nodes=node_to_shutdown)
+            wait_for_noobaa_pods_running()
+
+            # Update the object uploaded previously from the first bucket and then restart the noobaa pods
+            noobaa_pods = get_noobaa_pods()
+            logger.info(
+                f"Updating object {object_key} with new version data in bucket {bucket_2.name}"
+            )
+            future = executor.submit(
+                self.upload_objects_with_retry,
+                mcg_obj_session,
+                bucket_2,
+                bucket_1,
+                mockup_logger_target,
+                test_directory_setup.origin_dir,
+                pattern=object_key,
+                prefix=prefix_2,
+            )
+            for pod_obj in noobaa_pods:
+                pod_obj.delete(force=True)
+                logger.info(f"Deleted noobaa pod {pod_obj.name}")
+            logger.info("Restarted all Noobaa pods")
+
+            # Wait for the upload to finish
+            future.result()
+
+            wait_for_object_versions_match(
+                mcg_obj_session,
+                awscli_pod_session,
+                bucket_1.name,
+                bucket_2.name,
+                obj_key=f"{prefix_2}/{object_key}",
+            )
+            logger.info(
+                f"Replication works from {bucket_2.name} to {bucket_1.name} "
+                f"and has all the versions of object {object_key}"
+            )
+
+        # Take the noobaa db backup and then disable sync versions to
+        # make sure no version sync happens
+        logger.info("Taking backup of noobaa db")
+        cnpg_cluster_yaml, original_db_replica_count, secrets_obj = (
+            noobaa_db_backup_locally()
+        )
+
+        logger.info("Disabling version sync for both the buckets")
+        replication_1["rules"][0]["sync_versions"] = False
+        replication_2["rules"][0]["sync_versions"] = False
+
+        update_replication_policy(bucket_2.name, replication_1)
+        update_replication_policy(bucket_1.name, replication_2)
+
+        # Change the replication cycle delay to 5 minutes
+        logger.info("Set the bucket replication delay cycle to 5 minutes")
+        reduce_replication_delay(interval=5)
+
+        # Update the previously uploaded object with new data and new versions
+        self.upload_objects_with_retry(
+            mcg_obj_session,
+            bucket_1,
+            bucket_2,
+            mockup_logger_source,
+            test_directory_setup.origin_dir,
+            pattern=object_key,
+            prefix=prefix_1,
+            num_versions=4,
+        )
+        logger.info(
+            f"Updated object {object_key} with new version data in bucket {bucket_1.name}"
+        )
+
+        try:
+            wait_for_object_versions_match(
+                mcg_obj_session,
+                awscli_pod_session,
+                bucket_1.name,
+                bucket_2.name,
obj_key=f"{prefix_1}/{object_key}", + ) + except TimeoutExpiredError: + logger.info( + f"Sync versions didnt work as expected, both {bucket_1.name} " + f"and {bucket_2.name} have different versions" + ) + else: + assert False, "Sync version worked even when sync_versions was disabled!!" + + # Recover the noobaa db from the backup and perform + # object deletion and verify deletion sync works + logger.info("Recovering noobaa db from backup") + noobaa_db_recovery_from_local( + cnpg_cluster_yaml, original_db_replica_count, secrets_obj + ) + wait_for_noobaa_pods_running(timeout=420) + + logger.info("Enabling version sync for both the buckets") + replication_1["rules"][0]["sync_versions"] = True + replication_2["rules"][0]["sync_versions"] = True + + update_replication_policy(bucket_2.name, replication_1) + update_replication_policy(bucket_1.name, replication_2) + + # Update previously uploaded object with new data and new version + self.upload_objects_with_retry( + mcg_obj_session, + bucket_1, + bucket_2, + mockup_logger_source, + test_directory_setup.origin_dir, + pattern=object_key, + prefix=prefix_1, + num_versions=4, + ) + logger.info( + f"Updated object {object_key} with new version data in bucket {bucket_1.name}" + ) + + wait_for_object_versions_match( + mcg_obj_session, + awscli_pod_session, + bucket_1.name, + bucket_2.name, + obj_key=f"{prefix_1}/{object_key}", + ) + logger.info( + f"Replication works from {bucket_1.name} to {bucket_2.name} and" + f" has all the versions of object {object_key}" + ) + + validate_mcg_bg_features( + feature_setup_map, + run_in_bg=False, + skip_any_features=["nsfs", "rgw kafka", "caching"], + object_amount=5, + ) + logger.info("No issues seen with the MCG bg feature validation")