From e3e204e6f6726f6322b7dc2f6f462b58429d75e4 Mon Sep 17 00:00:00 2001
From: zhuyi1159 <1159751291@qq.com>
Date: Wed, 9 Apr 2025 20:46:14 +0800
Subject: [PATCH] stash

---
 apis/dataprotection/v1alpha1/backup_types.go  |  5 +-
 .../dataprotection.kubeblocks.io_backups.yaml |  1 +
 .../dataprotection/backup_controller.go       | 56 ++++++++++++++++
 .../dataprotection.kubeblocks.io_backups.yaml |  1 +
 pkg/operations/stop.go                        | 58 ++++++++++++++++-
 5 files changed, 119 insertions(+), 2 deletions(-)

diff --git a/apis/dataprotection/v1alpha1/backup_types.go b/apis/dataprotection/v1alpha1/backup_types.go
index ce9739abaca..ca66a16fd60 100644
--- a/apis/dataprotection/v1alpha1/backup_types.go
+++ b/apis/dataprotection/v1alpha1/backup_types.go
@@ -244,7 +244,7 @@ const (
 
 // BackupPhase describes the lifecycle phase of a Backup.
 // +enum
-// +kubebuilder:validation:Enum={New,InProgress,Running,Completed,Failed,Deleting}
+// +kubebuilder:validation:Enum={New,InProgress,Running,Completed,Failed,Deleting,Paused}
 type BackupPhase string
 
 const (
@@ -264,6 +264,9 @@ const (
 
 	// BackupPhaseDeleting means the backup and all its associated data are being deleted.
 	BackupPhaseDeleting BackupPhase = "Deleting"
+
+	// BackupPhasePaused means the backup is paused.
+	BackupPhasePaused BackupPhase = "Paused"
 )
 
 type ActionStatus struct {
diff --git a/config/crd/bases/dataprotection.kubeblocks.io_backups.yaml b/config/crd/bases/dataprotection.kubeblocks.io_backups.yaml
index 363975c4c09..08de01bc05d 100644
--- a/config/crd/bases/dataprotection.kubeblocks.io_backups.yaml
+++ b/config/crd/bases/dataprotection.kubeblocks.io_backups.yaml
@@ -1125,6 +1125,7 @@ spec:
                 - Completed
                 - Failed
                 - Deleting
+                - Paused
                 type: string
               startTimestamp:
                 description: |-
diff --git a/controllers/dataprotection/backup_controller.go b/controllers/dataprotection/backup_controller.go
index 721201d4dbe..e6dfe3aa86f 100644
--- a/controllers/dataprotection/backup_controller.go
+++ b/controllers/dataprotection/backup_controller.go
@@ -131,6 +131,8 @@ func (r *BackupReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctr
 		return r.handleCompletedPhase(reqCtx, backup)
 	case dpv1alpha1.BackupPhaseDeleting:
 		return r.handleDeletingPhase(reqCtx, backup)
+	case dpv1alpha1.BackupPhasePaused:
+		return r.handlePausedPhase(reqCtx, backup)
 	case dpv1alpha1.BackupPhaseFailed:
 		if backup.Labels[dptypes.BackupTypeLabelKey] == string(dpv1alpha1.BackupTypeContinuous) {
 			if backup.Status.StartTimestamp.IsZero() {
@@ -799,6 +801,60 @@ func (r *BackupReconciler) deleteRelatedBackups(
 	return nil
 }
 
+// handlePausedPhase reconciles a Backup in the Paused phase by making sure
+// every Job that belongs to it is suspended.
+//
+// Reconcile calls it from its BackupPhasePaused case, so the phase is already
+// Paused on entry and the backup status is not modified here.
+func (r *BackupReconciler) handlePausedPhase(
+	reqCtx intctrlutil.RequestCtx,
+	backup *dpv1alpha1.Backup) (ctrl.Result, error) {
+	suspended, err := r.suspendBackupJobs(reqCtx, backup)
+	if err != nil {
+		return intctrlutil.RequeueWithError(err, reqCtx.Log, "failed to suspend backup jobs")
+	}
+	if suspended {
+		r.Recorder.Event(backup, corev1.EventTypeNormal, "BackupPaused", "Backup jobs have been suspended")
+		// Come back once more after the suspension request has been submitted.
+		return intctrlutil.RequeueAfter(5*time.Second, reqCtx.Log, "waiting for job suspension")
+	}
+	return intctrlutil.Reconciled()
+}
+
+// suspendBackupJobs sets spec.suspend to true on every Job selected by the
+// backup's workload labels; no Job is deleted.
+//
+// It returns true when at least one Job had to be patched by this call.
+func (r *BackupReconciler) suspendBackupJobs(
+	reqCtx intctrlutil.RequestCtx,
+	backup *dpv1alpha1.Backup) (bool, error) {
+	jobList := &batchv1.JobList{}
+	if err := r.Client.List(reqCtx.Ctx, jobList,
+		client.InNamespace(backup.Namespace),
+		client.MatchingLabels(dpbackup.BuildBackupWorkloadLabels(backup))); err != nil {
+		return false, fmt.Errorf("failed to list backup jobs: %w", err)
+	}
+
+	patched := false
+	for i := range jobList.Items {
+		// Patch the list element in place rather than a per-iteration copy.
+		job := &jobList.Items[i]
+		if job.Spec.Suspend != nil && *job.Spec.Suspend {
+			continue // already suspended
+		}
+
+		patch := client.MergeFrom(job.DeepCopy())
+		suspend := true
+		job.Spec.Suspend = &suspend
+		if err := r.Client.Patch(reqCtx.Ctx, job, patch); err != nil {
+			return false, fmt.Errorf("failed to suspend job %s: %w", job.Name, err)
+		}
+		reqCtx.Log.Info("suspended backup job", "job", job.Name)
+		patched = true
+	}
+	return patched, nil
+}
+
 // PatchBackupObjectMeta patches backup object metaObject include cluster snapshot.
 func PatchBackupObjectMeta(
 	original *dpv1alpha1.Backup,
diff --git a/deploy/helm/crds/dataprotection.kubeblocks.io_backups.yaml b/deploy/helm/crds/dataprotection.kubeblocks.io_backups.yaml
index 363975c4c09..08de01bc05d 100644
--- a/deploy/helm/crds/dataprotection.kubeblocks.io_backups.yaml
+++ b/deploy/helm/crds/dataprotection.kubeblocks.io_backups.yaml
@@ -1125,6 +1125,7 @@ spec:
                 - Completed
                 - Failed
                 - Deleting
+                - Paused
                 type: string
               startTimestamp:
                 description: |-
diff --git a/pkg/operations/stop.go b/pkg/operations/stop.go
index bb5b0d888e0..b8197acc725 100644
--- a/pkg/operations/stop.go
+++ b/pkg/operations/stop.go
@@ -28,7 +28,9 @@ import (
 	"sigs.k8s.io/controller-runtime/pkg/client"
 
 	appsv1 "github.com/apecloud/kubeblocks/apis/apps/v1"
+	dpv1alpha1 "github.com/apecloud/kubeblocks/apis/dataprotection/v1alpha1"
 	opsv1alpha1 "github.com/apecloud/kubeblocks/apis/operations/v1alpha1"
+	"github.com/apecloud/kubeblocks/pkg/constant"
 	intctrlcomp "github.com/apecloud/kubeblocks/pkg/controller/component"
 	intctrlutil "github.com/apecloud/kubeblocks/pkg/controllerutil"
 )
@@ -129,10 +131,64 @@ func (stop StopOpsHandler) ReconcileAction(reqCtx intctrlutil.RequestCtx, cli cl
 		return expectProgressCount, completedCount, nil
 	}
 	compOpsHelper := newComponentOpsHelper(opsRes.OpsRequest.Spec.StopList)
-	return compOpsHelper.reconcileActionWithComponentOps(reqCtx, cli, opsRes, "stop", handleComponentProgress)
+	phase, duration, err := compOpsHelper.reconcileActionWithComponentOps(reqCtx, cli, opsRes, "stop", handleComponentProgress)
+	if err != nil {
+		return phase, duration, err
+	}
+
+	// Once the cluster is stopping or stopped, pause the backups still running for it.
+	switch opsRes.Cluster.Status.Phase {
+	case appsv1.StoppingClusterPhase, appsv1.StoppedClusterPhase:
+		if pauseErr := pauseRelatedBackups(reqCtx, cli, opsRes.Cluster); pauseErr != nil {
+			// Report the error but keep the phase computed above: a failure to pause a
+			// backup does not by itself force the stop operation to OpsFailedPhase.
+			return phase, duration, pauseErr
+		}
+	}
+	return phase, duration, nil
 }
 
 // SaveLastConfiguration records last configuration to the OpsRequest.status.lastConfiguration
 func (stop StopOpsHandler) SaveLastConfiguration(reqCtx intctrlutil.RequestCtx, cli client.Client, opsRes *OpsResource) error {
 	return nil
 }
+
+// pauseRelatedBackups moves every Backup of the cluster that is in the Running
+// phase to Paused, recording the pause time as its completion timestamp.
+//
+// Backups are looked up in the cluster's namespace by constant.AppInstanceLabelKey.
+// NOTE(review): assumes Backups carry that label with the cluster name - confirm.
+// NOTE(review): CompletionTimestamp/Duration are set although the backup is only
+// paused - confirm that consumers of these fields expect that.
+//
+// The first list or patch error aborts the call and is returned as is.
+func pauseRelatedBackups(reqCtx intctrlutil.RequestCtx, cli client.Client, cluster *appsv1.Cluster) error {
+	backupList := &dpv1alpha1.BackupList{}
+	if err := cli.List(reqCtx.Ctx, backupList,
+		client.InNamespace(cluster.Namespace),
+		client.MatchingLabels{constant.AppInstanceLabelKey: cluster.Name}); err != nil {
+		return err
+	}
+
+	for i := range backupList.Items {
+		backup := &backupList.Items[i]
+		if backup.Status.Phase != dpv1alpha1.BackupPhaseRunning {
+			continue
+		}
+
+		patch := client.MergeFrom(backup.DeepCopy())
+		backup.Status.Phase = dpv1alpha1.BackupPhasePaused
+		backup.Status.CompletionTimestamp = &metav1.Time{Time: time.Now()}
+		if backup.Status.StartTimestamp != nil {
+			elapsed := backup.Status.CompletionTimestamp.Sub(backup.Status.StartTimestamp.Time).Round(time.Second)
+			backup.Status.Duration = &metav1.Duration{Duration: elapsed}
+		}
+		if err := cli.Status().Patch(reqCtx.Ctx, backup, patch); err != nil {
+			return err
+		}
+		reqCtx.Log.Info("paused backup due to cluster stopping",
+			"backup", client.ObjectKeyFromObject(backup),
+			"cluster", cluster.Name)
+	}
+	return nil
+}