diff --git a/pkg/controlsvc/controlsvc.go b/pkg/controlsvc/controlsvc.go index f73a2a9d1..31ac2355b 100644 --- a/pkg/controlsvc/controlsvc.go +++ b/pkg/controlsvc/controlsvc.go @@ -123,21 +123,27 @@ func (s *Server) AddControlFunc(name string, cType ControlCommandType) error { return nil } +func errorNormal(err error) bool { + return strings.HasSuffix(err.Error(), "normal close") +} + // RunControlSession runs the server protocol on the given connection. func (s *Server) RunControlSession(conn net.Conn) { - logger.Info("Client connected to control service %s\n", conn.RemoteAddr().String()) + logger.Debug("Client connected to control service %s\n", conn.RemoteAddr().String()) defer func() { - logger.Info("Client disconnected from control service %s\n", conn.RemoteAddr().String()) + logger.Debug("Client disconnected from control service %s\n", conn.RemoteAddr().String()) if conn != nil { err := conn.Close() if err != nil { - logger.Error("Error closing connection: %s\n", err) + logger.Warning("Could not close connection: %s\n", err) } } }() _, err := conn.Write([]byte(fmt.Sprintf("Receptor Control, node %s\n", s.nc.NodeID()))) if err != nil { - logger.Error("Write error in control service: %s\n", err) + if !errorNormal(err) { + logger.Error("Could not write in control service: %s\n", err) + } return } @@ -150,12 +156,14 @@ func (s *Server) RunControlSession(conn net.Conn) { for { n, err := conn.Read(buf) if err == io.EOF { - logger.Info("Control service closed\n") + logger.Debug("Control service closed\n") done = true break } else if err != nil { - logger.Error("Read error in control service: %s\n", err) + if !errorNormal(err) { + logger.Warning("Could not read in control service: %s\n", err) + } return } @@ -190,7 +198,9 @@ func (s *Server) RunControlSession(conn net.Conn) { if err != nil { _, err = conn.Write([]byte(fmt.Sprintf("ERROR: %s\n", err))) if err != nil { - logger.Error("Write error in control service: %s\n", err) + if !errorNormal(err) { + 
logger.Error("Write error in control service: %s\n", err) + } return } @@ -231,10 +241,14 @@ func (s *Server) RunControlSession(conn net.Conn) { cfr, err = cc.ControlFunc(ctx, s.nc, cfo) } if err != nil { - logger.Error(err.Error()) + if !errorNormal(err) { + logger.Error(err.Error()) + } _, err = conn.Write([]byte(fmt.Sprintf("ERROR: %s\n", err))) if err != nil { - logger.Error("Write error in control service: %s\n", err) + if !errorNormal(err) { + logger.Error("Write error in control service: %s\n", err) + } return } @@ -243,7 +257,9 @@ func (s *Server) RunControlSession(conn net.Conn) { if err != nil { _, err = conn.Write([]byte(fmt.Sprintf("ERROR: could not convert response to JSON: %s\n", err))) if err != nil { - logger.Error("Write error in control service: %s\n", err) + if !errorNormal(err) { + logger.Error("Write error in control service: %s\n", err) + } return } @@ -251,7 +267,9 @@ func (s *Server) RunControlSession(conn net.Conn) { rbytes = append(rbytes, '\n') _, err = conn.Write(rbytes) if err != nil { - logger.Error("Write error in control service: %s\n", err) + if !errorNormal(err) { + logger.Error("Write error in control service: %s\n", err) + } return } @@ -259,7 +277,9 @@ func (s *Server) RunControlSession(conn net.Conn) { } else { _, err = conn.Write([]byte("ERROR: Unknown command\n")) if err != nil { - logger.Error("Write error in control service: %s\n", err) + if !errorNormal(err) { + logger.Error("Write error in control service: %s\n", err) + } return } diff --git a/pkg/netceptor/conn.go b/pkg/netceptor/conn.go index 05ec07565..93392752a 100644 --- a/pkg/netceptor/conn.go +++ b/pkg/netceptor/conn.go @@ -393,7 +393,7 @@ func monitorUnreachable(pc *PacketConn, doneChan chan struct{}, remoteAddr Addr, // read from channel until closed for msg := range msgCh { if msg.Problem == ProblemServiceUnknown && msg.ToNode == remoteAddr.node && msg.ToService == remoteAddr.service { - logger.Error("remote service unreachable") + logger.Warning("remote service 
%s to node %s is unreachable", msg.ToService, msg.ToNode) cancel() } } diff --git a/pkg/pkg.go b/pkg/pkg.go index eddf2b573..48e916627 100644 --- a/pkg/pkg.go +++ b/pkg/pkg.go @@ -36,7 +36,7 @@ type Receptor struct { Controllers *controlsvc.Controllers `mapstructure:"controllers"` } -// Serve launches an receptor instance and blocks until canceled or failed. +// Serve launches a receptor instance and blocks until cancelled or failed. func (r Receptor) Serve(ctx context.Context) error { logger.SetShowTrace(r.EnableTracing) diff --git a/pkg/workceptor/controlsvc.go b/pkg/workceptor/controlsvc.go index f0b3a6d0e..5fc566342 100644 --- a/pkg/workceptor/controlsvc.go +++ b/pkg/workceptor/controlsvc.go @@ -421,6 +421,7 @@ func (c *workceptorCommand) ControlFunc(ctx context.Context, nc *netceptor.Netce if err != nil { return nil, err } + err = cfo.Close() if err != nil { return nil, err diff --git a/pkg/workceptor/remote_work.go b/pkg/workceptor/remote_work.go index 26a8109cd..c95a8de52 100644 --- a/pkg/workceptor/remote_work.go +++ b/pkg/workceptor/remote_work.go @@ -83,7 +83,7 @@ func (rw *remoteUnit) getConnection(ctx context.Context) (net.Conn, *bufio.Reade if err == nil { return conn, reader } - logger.Warning("Connection to %s failed with error: %s", + logger.Debug("Connection to %s failed with error: %s", rw.Status().ExtraData.(*remoteExtraData).RemoteNode, err) errStr := err.Error() if strings.Contains(errStr, "CRYPTO_ERROR") { @@ -470,7 +470,13 @@ func (rw *remoteUnit) monitorRemoteStdout(mw *utils.JobContext) { _, err = io.Copy(stdout, conn) close(doneChan) if err != nil { - logger.Warning("Error copying to stdout file %s: %s\n", rw.stdoutFileName, err) + var errmsg string + if strings.HasSuffix(err.Error(), "error code 499") { + errmsg = "read operation cancelled" + } else { + errmsg = err.Error() + } + logger.Warning("Could not copy to stdout file %s: %s\n", rw.stdoutFileName, errmsg) continue } diff --git a/pkg/workceptor/workceptor.go 
b/pkg/workceptor/workceptor.go index 705566c57..f36934656 100644 --- a/pkg/workceptor/workceptor.go +++ b/pkg/workceptor/workceptor.go @@ -561,7 +561,7 @@ func (w *Workceptor) GetResults(ctx context.Context, unitID string, startPos int if err == io.EOF { stdoutSize := stdoutSize(unitdir) if IsComplete(unit.Status().State) && stdoutSize >= unit.Status().StdoutSize { - logger.Info("Stdout complete - closing channel for: %s \n", unitID) + logger.Debug("Stdout complete - closing channel for: %s \n", unitID) return }