From 2ced9f9b8890daedefc4c10681b92b41c2bc41e3 Mon Sep 17 00:00:00 2001
From: AaronH88
Date: Tue, 18 Mar 2025 10:50:56 +0000
Subject: [PATCH 1/3] Refactor how kubernetes.go reads the final lines of stdout

---
 pkg/workceptor/kubernetes.go | 84 ++++++++++++++++++++++--------------
 1 file changed, 51 insertions(+), 33 deletions(-)

diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go
index a1f9d2d82..d148bb9a6 100644
--- a/pkg/workceptor/kubernetes.go
+++ b/pkg/workceptor/kubernetes.go
@@ -327,41 +327,42 @@ func (kw *KubeUnit) KubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout
 	successfulWrite := false
 	remainingRetries := retries // resets on each successful read from pod stdout
 
-	for {
-		if *stdinErr != nil {
-			// fail to send stdin to pod, no need to continue
-			return
+	// get pod, with retry
+	for retries := 5; retries > 0; retries-- {
+		KubeAPIWrapperLock.Lock()
+		kw.pod, err = KubeAPIWrapperInstance.Get(kw.GetContext(), kw.clientset, podNamespace, podName, metav1.GetOptions{})
+		KubeAPIWrapperLock.Unlock()
+		if err == nil {
+			break
 		}
+		kw.GetWorkceptor().nc.GetLogger().Warning(
+			"Error getting pod %s/%s. Will retry %d more times. Error: %s",
+			podNamespace,
+			podName,
+			retries,
+			err,
+		)
+		time.Sleep(time.Second)
+	}
+	if err != nil {
+		errMsg := fmt.Sprintf("Error getting pod %s/%s. Error: %s", podNamespace, podName, err)
+		kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet
+		kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
 
-		// get pod, with retry
-		for retries := 5; retries > 0; retries-- {
-			KubeAPIWrapperLock.Lock()
-			kw.pod, err = KubeAPIWrapperInstance.Get(kw.GetContext(), kw.clientset, podNamespace, podName, metav1.GetOptions{})
-			KubeAPIWrapperLock.Unlock()
-			if err == nil {
-				break
-			}
-			kw.GetWorkceptor().nc.GetLogger().Warning(
-				"Error getting pod %s/%s. Will retry %d more times. Error: %s",
-				podNamespace,
-				podName,
-				retries,
-				err,
-			)
-			time.Sleep(time.Second)
-		}
-		if err != nil {
-			errMsg := fmt.Sprintf("Error getting pod %s/%s. Error: %s", podNamespace, podName, err)
-			kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet
-			kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
+		// fail to get pod, no need to continue
+		return
+	}
 
-			// fail to get pod, no need to continue
-			return
-		}
+	logStream, err := kw.kubeLoggingConnectionHandler(true, sinceTime)
+	if err != nil {
+		// fail to get log stream, no need to continue
+		return
+	}
+	defer logStream.Close()
 
-		logStream, err := kw.kubeLoggingConnectionHandler(true, sinceTime)
-		if err != nil {
-			// fail to get log stream, no need to continue
+	for {
+		if *stdinErr != nil {
+			// fail to send stdin to pod, no need to continue
 			return
 		}
 
@@ -379,6 +380,25 @@ func (kw *KubeUnit) KubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout
 				return
 			}
 
+
+			if err == io.EOF {
+				if line != "" {
+					_, err = stdout.Write([]byte(line + "\n"))
+					if err != nil {
+						*stdoutErr = fmt.Errorf("writing final line to stdout: %s", err)
+						kw.GetWorkceptor().nc.GetLogger().Error("Error writing final line to stdout: %s", err)
+
+						return
+					}
+				}
+				kw.GetWorkceptor().nc.GetLogger().Info("Detected EOF for pod %s/%s.",
+					podNamespace,
+					podName,
+				)
+
+				return
+			}
+
 			kw.GetWorkceptor().nc.GetLogger().Info(
 				"Detected Error: %s for pod %s/%s. Will retry %d more times.",
 				err,
@@ -439,8 +459,6 @@ func (kw *KubeUnit) KubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout
 			remainingRetries = retries // each time we read successfully, reset this counter
 			successfulWrite = true
 		}
-
-		logStream.Close()
 	}
 }
 

From 6c18e75afe29da0be8c544a0a82416b62949576a Mon Sep 17 00:00:00 2001
From: AaronH88
Date: Fri, 21 Mar 2025 11:30:32 +0000
Subject: [PATCH 2/3] Scope this refactor to just fixing existing error

---
 pkg/workceptor/kubernetes.go | 64 ++++++++++++++++++------------------
 1 file changed, 32 insertions(+), 32 deletions(-)

diff --git a/pkg/workceptor/kubernetes.go b/pkg/workceptor/kubernetes.go
index d148bb9a6..d07f64aba 100644
--- a/pkg/workceptor/kubernetes.go
+++ b/pkg/workceptor/kubernetes.go
@@ -327,42 +327,41 @@ func (kw *KubeUnit) KubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout
 	successfulWrite := false
 	remainingRetries := retries // resets on each successful read from pod stdout
 
-	// get pod, with retry
-	for retries := 5; retries > 0; retries-- {
-		KubeAPIWrapperLock.Lock()
-		kw.pod, err = KubeAPIWrapperInstance.Get(kw.GetContext(), kw.clientset, podNamespace, podName, metav1.GetOptions{})
-		KubeAPIWrapperLock.Unlock()
-		if err == nil {
-			break
+	for {
+		if *stdinErr != nil {
+			// fail to send stdin to pod, no need to continue
+			return
 		}
-		kw.GetWorkceptor().nc.GetLogger().Warning(
-			"Error getting pod %s/%s. Will retry %d more times. Error: %s",
-			podNamespace,
-			podName,
-			retries,
-			err,
-		)
-		time.Sleep(time.Second)
-	}
-	if err != nil {
-		errMsg := fmt.Sprintf("Error getting pod %s/%s. Error: %s", podNamespace, podName, err)
-		kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet
-		kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0)
 
-		// fail to get pod, no need to continue
-		return
-	}
+		// get pod, with retry
+		for retries := 5; retries > 0; retries-- {
+			KubeAPIWrapperLock.Lock()
+			kw.pod, err = KubeAPIWrapperInstance.Get(kw.GetContext(), kw.clientset, podNamespace, podName, metav1.GetOptions{})
+			KubeAPIWrapperLock.Unlock()
+			if err == nil {
+				break
+			}
+			kw.GetWorkceptor().nc.GetLogger().Warning(
+				"Error getting pod %s/%s. Will retry %d more times. Error: %s",
+				podNamespace,
+				podName,
+				retries,
+				err,
+			)
+			time.Sleep(time.Second)
+		}
+		if err != nil {
+			errMsg := fmt.Sprintf("Error getting pod %s/%s. Error: %s", podNamespace, podName, err)
Error: %s", podNamespace, podName, err) + kw.GetWorkceptor().nc.GetLogger().Error(errMsg) //nolint:govet + kw.UpdateBasicStatus(WorkStateFailed, errMsg, 0) - logStream, err := kw.kubeLoggingConnectionHandler(true, sinceTime) - if err != nil { - // fail to get log stream, no need to continue - return - } - defer logStream.Close() + // fail to get pod, no need to continue + return + } - for { - if *stdinErr != nil { - // fail to send stdin to pod, no need to continue + logStream, err := kw.kubeLoggingConnectionHandler(true, sinceTime) + if err != nil { + // fail to get log stream, no need to continue return } @@ -459,6 +458,7 @@ func (kw *KubeUnit) KubeLoggingWithReconnect(streamWait *sync.WaitGroup, stdout remainingRetries = retries // each time we read successfully, reset this counter successfulWrite = true } + logStream.Close() } } From 9f7921b84d73219a93aed2fe7f30f9823af6ed74 Mon Sep 17 00:00:00 2001 From: AaronH88 Date: Fri, 21 Mar 2025 17:37:22 +0000 Subject: [PATCH 3/3] Add test for 503 error --- pkg/workceptor/kubernetes_test.go | 45 +++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/pkg/workceptor/kubernetes_test.go b/pkg/workceptor/kubernetes_test.go index 1c66371a3..f524e1bad 100644 --- a/pkg/workceptor/kubernetes_test.go +++ b/pkg/workceptor/kubernetes_test.go @@ -541,6 +541,51 @@ func TestKubeLoggingWithReconnect(t *testing.T) { mockKubeAPI.EXPECT().NewSPDYExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(&exec, nil).AnyTimes() }, }, + { + name: "Kube error 503", + expectedCalls: func() { + mockBaseWorkUnit.EXPECT().UpdateBasicStatus(gomock.Any(), gomock.Any(), gomock.Any()).AnyTimes() + config := rest.Config{} + mockKubeAPI.EXPECT().InClusterConfig().Return(&config, nil) + mockBaseWorkUnit.EXPECT().GetWorkceptor().Return(w).AnyTimes() + clientset := kubernetes.Clientset{} + mockKubeAPI.EXPECT().NewForConfig(gomock.Any()).Return(&clientset, nil) + lock := &sync.RWMutex{} + mockBaseWorkUnit.EXPECT().GetStatusLock().Return(lock).AnyTimes() + mockBaseWorkUnit.EXPECT().MonitorLocalStatus().AnyTimes() + mockBaseWorkUnit.EXPECT().UnitDir().Return("TestDir2").AnyTimes() + kubeExtraData := workceptor.KubeExtraData{} + status := workceptor.StatusFileData{ExtraData: &kubeExtraData} + mockBaseWorkUnit.EXPECT().GetStatusWithoutExtraData().Return(&status).AnyTimes() + mockBaseWorkUnit.EXPECT().GetStatusCopy().Return(status).AnyTimes() + mockBaseWorkUnit.EXPECT().GetContext().Return(ctx).AnyTimes() + pod := corev1.Pod{TypeMeta: metav1.TypeMeta{}, ObjectMeta: metav1.ObjectMeta{Name: "Test_Name"}, Spec: corev1.PodSpec{}, Status: corev1.PodStatus{}} + mockKubeAPI.EXPECT().Create(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes() + mockBaseWorkUnit.EXPECT().UpdateFullStatus(gomock.Any()).AnyTimes() + field := hasTerm{} + mockKubeAPI.EXPECT().OneTermEqualSelector(gomock.Any(), gomock.Any()).Return(&field).AnyTimes() + ev := watch.Event{Object: &pod} + mockKubeAPI.EXPECT().UntilWithSync(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&ev, nil).AnyTimes() + mockKubeAPI.EXPECT().Get(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(&pod, nil).AnyTimes() + req := fakerest.RESTClient{ + Client: fakerest.CreateHTTPClient(func(request *http.Request) (*http.Response, error) { + resp := &http.Response{ + StatusCode: http.StatusServiceUnavailable, // 503 + Body: io.NopCloser(strings.NewReader("kube error")), + } + + return resp, nil + }), + NegotiatedSerializer: 
+				}
+				mockKubeAPI.EXPECT().GetLogs(gomock.Any(), gomock.Any(), gomock.Any(), gomock.Any()).Return(req.Request()).AnyTimes()
+				logger := logger.NewReceptorLogger("")
+				mockNetceptor.EXPECT().GetLogger().Return(logger).AnyTimes()
+				mockKubeAPI.EXPECT().SubResource(gomock.Any(), gomock.Any(), gomock.Any()).Return(req.Request()).AnyTimes()
+				exec := ex{}
+				mockKubeAPI.EXPECT().NewSPDYExecutor(gomock.Any(), gomock.Any(), gomock.Any()).Return(&exec, nil).AnyTimes()
+			},
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {