[Processor] Propagate allocation metrics by rokatyy · Pull Request #3575 · nuclio/nuclio · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

[Processor] Propagate allocation metrics #3575

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Apr 6, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 4 additions & 3 deletions pkg/processor/eventprocessor/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"github.com/nuclio/nuclio/pkg/common"
"github.com/nuclio/nuclio/pkg/errgroup"
"github.com/nuclio/nuclio/pkg/processor/statistics"

"github.com/nuclio/errors"
"github.com/nuclio/logger"
Expand Down Expand Up @@ -162,16 +163,16 @@ func (sa *syncPoolAllocator) SetObjects(objects []EventProcessor) error {
// GetStatistics returns object allocator statistics
// It returns an unsafe copy of the statistics, to avoid any unnecessary blocking of the actual statistics object;
// the copy is used in gatherers, which are thread-safe
func (sa *syncPoolAllocator) GetStatistics() *AllocatorStatistics {
statistics := &AllocatorStatistics{
func (sa *syncPoolAllocator) GetStatistics() *statistics.AllocatorStatistics {
allocatorStatistics := &statistics.AllocatorStatistics{
AllocationCount: sa.statistics.AllocationCount.Load(),
AllocationSuccessImmediateTotal: sa.statistics.AllocationSuccessImmediateTotal.Load(),
AllocationSuccessAfterWaitTotal: sa.statistics.AllocationSuccessAfterWaitTotal.Load(),
AllocationTimeoutTotal: sa.statistics.AllocationTimeoutTotal.Load(),
AllocationWaitDurationMilliSecondsSum: sa.statistics.AllocationWaitDurationMilliSecondsSum.Load(),
AllocationObjectsAvailablePercentage: sa.statistics.AllocationObjectsAvailablePercentage.Load(),
}
return statistics
return allocatorStatistics
}

func (sa *syncPoolAllocator) SignalDraining() error {
Expand Down
6 changes: 4 additions & 2 deletions pkg/processor/eventprocessor/singleton.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import (
"errors"
"time"

"github.com/nuclio/nuclio/pkg/processor/statistics"

"github.com/nuclio/logger"
)

Expand Down Expand Up @@ -64,8 +66,8 @@ func (s *asyncSingletonAllocator) GetNumObjectsAvailable() int {
}

// GetStatistics returns allocator statistics
func (s *asyncSingletonAllocator) GetStatistics() *AllocatorStatistics {
return nil
func (s *asyncSingletonAllocator) GetStatistics() *statistics.AllocatorStatistics {
return s.object.GetAllocationStatistics()
}

func (s *asyncSingletonAllocator) SignalDraining() error {
Expand Down
28 changes: 0 additions & 28 deletions pkg/processor/eventprocessor/statistics.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,34 +18,6 @@ package eventprocessor

import "sync/atomic"

// Statistics holds plain uint64 counters of handled events.
// NOTE(review): presumably one counter for events handled successfully and one for events whose
// handling failed - confirm against the code that increments them.
type Statistics struct {
EventsHandledSuccess uint64
EventsHandledError uint64
}

// AllocatorStatistics is not a safe statistics object and should be used only to copy a safe object into it
// and return it to the outside, so it can later be taken by metric gatherers
type AllocatorStatistics struct {
AllocationCount uint64
AllocationSuccessImmediateTotal uint64
AllocationSuccessAfterWaitTotal uint64
AllocationTimeoutTotal uint64
AllocationWaitDurationMilliSecondsSum uint64
AllocationObjectsAvailablePercentage uint64
}

// DiffFrom returns a new AllocatorStatistics in which every field is the value in s minus the
// corresponding value in prev.
// prev must be non-nil: its fields are read without a nil check.
// All fields are uint64, so a field that is smaller in s than in prev wraps around instead of
// producing a negative value.
func (s *AllocatorStatistics) DiffFrom(prev *AllocatorStatistics) AllocatorStatistics {

return AllocatorStatistics{
AllocationCount: s.AllocationCount - prev.AllocationCount,
AllocationSuccessImmediateTotal: s.AllocationSuccessImmediateTotal - prev.AllocationSuccessImmediateTotal,
AllocationSuccessAfterWaitTotal: s.AllocationSuccessAfterWaitTotal - prev.AllocationSuccessAfterWaitTotal,
AllocationTimeoutTotal: s.AllocationTimeoutTotal - prev.AllocationTimeoutTotal,
AllocationWaitDurationMilliSecondsSum: s.AllocationWaitDurationMilliSecondsSum - prev.AllocationWaitDurationMilliSecondsSum,
AllocationObjectsAvailablePercentage: s.AllocationObjectsAvailablePercentage - prev.AllocationObjectsAvailablePercentage,
}
}

// safeAllocatorStatistics is a safe statistics object, for outside usages use AllocatorStatistics
type safeAllocatorStatistics struct {
AllocationCount atomic.Uint64
Expand Down
8 changes: 6 additions & 2 deletions pkg/processor/eventprocessor/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/nuclio/nuclio/pkg/processor/cloudevent"
"github.com/nuclio/nuclio/pkg/processor/controlcommunication"
"github.com/nuclio/nuclio/pkg/processor/runtime"
"github.com/nuclio/nuclio/pkg/processor/statistics"

"github.com/nuclio/errors"
"github.com/nuclio/logger"
Expand All @@ -49,7 +50,7 @@ type Allocator interface {
GetNumObjectsAvailable() int

// GetStatistics returns allocator statistics
GetStatistics() *AllocatorStatistics
GetStatistics() *statistics.AllocatorStatistics

// SignalDraining signals all event processors to drain events
SignalDraining() error
Expand Down Expand Up @@ -99,7 +100,10 @@ type EventProcessor interface {
Stop() error

// GetStatistics returns event processing statistics, such as counts and latencies
GetStatistics() *Statistics
GetStatistics() *statistics.EventProcessingStatistics

// GetAllocationStatistics returns allocation statistics, such as allocation time and number of allocations
GetAllocationStatistics() *statistics.AllocatorStatistics

// GetStructuredCloudEvent retrieves the last processed structured CloudEvent
GetStructuredCloudEvent() *cloudevent.Structured
Expand Down
7 changes: 7 additions & 0 deletions pkg/processor/runtime/golang/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (

"github.com/nuclio/nuclio/pkg/common/status"
"github.com/nuclio/nuclio/pkg/processor/runtime"
"github.com/nuclio/nuclio/pkg/processor/statistics"

"github.com/nuclio/errors"
"github.com/nuclio/logger"
Expand Down Expand Up @@ -119,6 +120,12 @@ func (g *golang) ProcessBatch(batch []nuclio.Event, functionLogger logger.Logger
return nil, nuclio.ErrNotImplemented
}

// GetAllocationStatistics returns the statistics of the runtime's allocator, if the runtime has one.
// The golang runtime doesn't have an allocator, so this always returns nil
func (g *golang) GetAllocationStatistics() *statistics.AllocatorStatistics {
return nil
}

func (g *golang) Restart() error {
if err := g.Stop(); err != nil {
return errors.Wrap(err, "Failed to stop golang runtime")
Expand Down
5 changes: 5 additions & 0 deletions pkg/processor/runtime/rpc/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/nuclio/nuclio/pkg/processor/eventprocessor"
"github.com/nuclio/nuclio/pkg/processor/runtime"
"github.com/nuclio/nuclio/pkg/processor/runtime/rpc/connection"
"github.com/nuclio/nuclio/pkg/processor/statistics"
"github.com/nuclio/nuclio/pkg/processwaiter"

"github.com/nuclio/errors"
Expand Down Expand Up @@ -90,6 +91,10 @@ func (r *AbstractRuntime) ProcessBatch(batch []nuclio.Event, functionLogger logg
return r.processBatchAndWaitForResult(batch, functionLogger)
}

// GetAllocationStatistics returns the allocation statistics reported by the runtime's connection manager.
// It delegates directly to connectionManager.GetAllocationStatistics and returns its result as is
func (r *AbstractRuntime) GetAllocationStatistics() *statistics.AllocatorStatistics {
return r.connectionManager.GetAllocationStatistics()
}

// Stop stops the runtime
func (r *AbstractRuntime) Stop() error {
r.Logger.WarnWith("Stopping",
Expand Down
13 changes: 12 additions & 1 deletion pkg/processor/runtime/rpc/connection/abstract.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/nuclio/nuclio/pkg/processor/runtime/rpc/controlmessagebroker"
"github.com/nuclio/nuclio/pkg/processor/runtime/rpc/encoder"
"github.com/nuclio/nuclio/pkg/processor/runtime/rpc/result"
"github.com/nuclio/nuclio/pkg/processor/statistics"

"github.com/mitchellh/mapstructure"
"github.com/nuclio/errors"
Expand Down Expand Up @@ -88,6 +89,10 @@ func (bc *AbstractConnectionManager) UpdateStatistics(durationSec float64) {
bc.Configuration.Statistics.DurationMilliSecondsSum += uint64(durationSec * 1000)
}

// GetAllocationStatistics returns the statistics of the connection manager's allocator.
// It delegates directly to allocator.GetStatistics and returns its result as is
func (bc *AbstractConnectionManager) GetAllocationStatistics() *statistics.AllocatorStatistics {
return bc.allocator.GetStatistics()
}

func (bc *AbstractConnectionManager) GetConfig() ManagerConfigration {
return *bc.Configuration
}
Expand Down Expand Up @@ -468,7 +473,13 @@ func (be *AbstractEventConnection) RunHandler() {
}

// GetStatistics returns a pointer to the statistics object. This must not be modified by the reader
func (be *AbstractEventConnection) GetStatistics() *eventprocessor.Statistics {
func (be *AbstractEventConnection) GetStatistics() *statistics.EventProcessingStatistics {
return nil
}

// GetAllocationStatistics returns the statistics of the allocator, if there is one.
// AbstractEventConnection doesn't have an allocator of its own, so this always returns nil
func (be *AbstractEventConnection) GetAllocationStatistics() *statistics.AllocatorStatistics {
return nil
}

Expand Down
4 changes: 4 additions & 0 deletions pkg/processor/runtime/rpc/connection/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"github.com/nuclio/nuclio/pkg/processor/runtime"
"github.com/nuclio/nuclio/pkg/processor/runtime/rpc/encoder"
"github.com/nuclio/nuclio/pkg/processor/runtime/rpc/result"
"github.com/nuclio/nuclio/pkg/processor/statistics"

"github.com/nuclio/logger"
)
Expand Down Expand Up @@ -54,6 +55,9 @@ type ConnectionManager interface {
// duration of an event or process, specified in seconds
UpdateStatistics(durationSec float64)

// GetAllocationStatistics retrieves the current statistics of the ConnectionManager allocator
GetAllocationStatistics() *statistics.AllocatorStatistics

// SetStatus updates the operational status of the ConnectionManager
SetStatus(status.Status)

Expand Down
5 changes: 5 additions & 0 deletions pkg/processor/runtime/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/nuclio/nuclio/pkg/common/status"
"github.com/nuclio/nuclio/pkg/processor/controlcommunication"
"github.com/nuclio/nuclio/pkg/processor/databinding"
"github.com/nuclio/nuclio/pkg/processor/statistics"

"github.com/nuclio/errors"
"github.com/nuclio/logger"
Expand All @@ -44,6 +45,10 @@ type Runtime interface {
// GetStatistics returns statistics gathered by the runtime
GetStatistics() *Statistics

// GetAllocationStatistics returns statistics gathered by the allocator
// if there is any in the runtime, otherwise returns nil
GetAllocationStatistics() *statistics.AllocatorStatistics

// GetConfiguration returns the runtime configuration
GetConfiguration() *Configuration

Expand Down
7 changes: 7 additions & 0 deletions pkg/processor/runtime/shell/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ import (
"github.com/nuclio/nuclio/pkg/common/status"
"github.com/nuclio/nuclio/pkg/functionconfig"
"github.com/nuclio/nuclio/pkg/processor/runtime"
"github.com/nuclio/nuclio/pkg/processor/statistics"

"github.com/nuclio/errors"
"github.com/nuclio/logger"
Expand Down Expand Up @@ -176,6 +177,12 @@ func (s *shell) ProcessBatch(batch []nuclio.Event, functionLogger logger.Logger)
return nil, nuclio.ErrNotImplemented
}

// GetAllocationStatistics returns the statistics of the runtime's allocator, if the runtime has one.
// The shell runtime doesn't have an allocator, so this always returns nil
func (s *shell) GetAllocationStatistics() *statistics.AllocatorStatistics {
return nil
}

func (s *shell) processEvent(context context.Context,
command []string,
event nuclio.Event,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package statistics
package gatherer

// a reflection of an object in the processor (e.g. trigger, runtime, worker) that holds prometheus
// metrics. when Gather() is called, the resource is queried for its primitive statistics. this way we decouple
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package statistics
package gatherer

import (
"os"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package statistics
package gatherer

import (
"sync/atomic"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ See the License for the specific language governing permissions and
limitations under the License.
*/

package statistics
package gatherer

import (
"strconv"
Expand Down
45 changes: 45 additions & 0 deletions pkg/processor/statistics/types.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
Copyright 2025 The Nuclio Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package statistics

// EventProcessingStatistics holds plain uint64 counters of handled events.
// NOTE(review): presumably one counter for events handled successfully and one for events whose
// handling failed - confirm against the code that increments them.
type EventProcessingStatistics struct {
EventsHandledSuccess uint64
EventsHandledError uint64
}

// AllocatorStatistics is not a safe statistics object: it is a plain snapshot that should be used only to
// copy the values of a safe statistics object into it and return them to the outside, so they can later be
// taken by metric gatherers
type AllocatorStatistics struct {
	AllocationCount                       uint64
	AllocationSuccessImmediateTotal       uint64
	AllocationSuccessAfterWaitTotal       uint64
	AllocationTimeoutTotal                uint64
	AllocationWaitDurationMilliSecondsSum uint64
	AllocationObjectsAvailablePercentage  uint64
}

// DiffFrom returns a new AllocatorStatistics in which every field is the value in s minus the
// corresponding value in prev.
//
// A nil prev is treated as an all-zero baseline (e.g. when no previous snapshot exists yet), in which
// case the result is a copy of s. s itself must be non-nil.
//
// All fields are uint64, so a field that is smaller in s than in prev wraps around instead of
// producing a negative value.
func (s *AllocatorStatistics) DiffFrom(prev *AllocatorStatistics) AllocatorStatistics {

	// without this guard a missing previous snapshot would cause a nil pointer dereference
	if prev == nil {
		prev = &AllocatorStatistics{}
	}

	return AllocatorStatistics{
		AllocationCount:                       s.AllocationCount - prev.AllocationCount,
		AllocationSuccessImmediateTotal:       s.AllocationSuccessImmediateTotal - prev.AllocationSuccessImmediateTotal,
		AllocationSuccessAfterWaitTotal:       s.AllocationSuccessAfterWaitTotal - prev.AllocationSuccessAfterWaitTotal,
		AllocationTimeoutTotal:                s.AllocationTimeoutTotal - prev.AllocationTimeoutTotal,
		AllocationWaitDurationMilliSecondsSum: s.AllocationWaitDurationMilliSecondsSum - prev.AllocationWaitDurationMilliSecondsSum,
		AllocationObjectsAvailablePercentage:  s.AllocationObjectsAvailablePercentage - prev.AllocationObjectsAvailablePercentage,
	}
}
4 changes: 2 additions & 2 deletions pkg/processor/trigger/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ import (
"time"

"github.com/nuclio/nuclio/pkg/functionconfig"
"github.com/nuclio/nuclio/pkg/processor/eventprocessor"
"github.com/nuclio/nuclio/pkg/processor/runtime"
"github.com/nuclio/nuclio/pkg/processor/statistics"
"github.com/nuclio/nuclio/pkg/processor/util/partitionworker"

"github.com/nuclio/errors"
Expand Down Expand Up @@ -183,7 +183,7 @@ func (c *Configuration) ResolveWorkerAllocationMode(modeFromAttributes, modeFrom
type Statistics struct {
EventsHandledSuccessTotal uint64
EventsHandledFailureTotal uint64
WorkerAllocatorStatistics eventprocessor.AllocatorStatistics
WorkerAllocatorStatistics statistics.AllocatorStatistics
}

func (s *Statistics) DiffFrom(prev *Statistics) Statistics {
Expand Down
9 changes: 7 additions & 2 deletions pkg/processor/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
"github.com/nuclio/nuclio/pkg/processor/controlcommunication"
"github.com/nuclio/nuclio/pkg/processor/eventprocessor"
"github.com/nuclio/nuclio/pkg/processor/runtime"
"github.com/nuclio/nuclio/pkg/processor/statistics"

"github.com/nuclio/logger"
"github.com/nuclio/nuclio-sdk-go"
Expand All @@ -36,7 +37,7 @@ import (
type Worker struct {

// accessed atomically, keep as first field for alignment
statistics eventprocessor.Statistics
statistics statistics.EventProcessingStatistics

logger logger.Logger
index int
Expand Down Expand Up @@ -93,10 +94,14 @@ func (w *Worker) ProcessEventBatch(batch []nuclio.Event, functionLogger logger.L
}

// GetStatistics returns a pointer to the statistics object. This must not be modified by the reader
func (w *Worker) GetStatistics() *eventprocessor.Statistics {
func (w *Worker) GetStatistics() *statistics.EventProcessingStatistics {
return &w.statistics
}

// GetAllocationStatistics returns the allocation statistics reported by the worker's runtime.
// It delegates directly to runtime.GetAllocationStatistics and returns its result as is
func (w *Worker) GetAllocationStatistics() *statistics.AllocatorStatistics {
return w.runtime.GetAllocationStatistics()
}

// GetIndex returns the index of the worker, as specified during creation
func (w *Worker) GetIndex() int {
return w.index
Expand Down
5 changes: 5 additions & 0 deletions pkg/processor/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"github.com/nuclio/nuclio/pkg/common/status"
"github.com/nuclio/nuclio/pkg/processor/controlcommunication"
"github.com/nuclio/nuclio/pkg/processor/runtime"
"github.com/nuclio/nuclio/pkg/processor/statistics"

"github.com/nuclio/logger"
"github.com/nuclio/nuclio-sdk-go"
Expand Down Expand Up @@ -53,6 +54,10 @@ func (mr *MockRuntime) GetStatistics() *runtime.Statistics {
return nil
}

// GetAllocationStatistics is a stub that always returns nil
func (mr *MockRuntime) GetAllocationStatistics() *statistics.AllocatorStatistics {
return nil
}

func (mr *MockRuntime) GetConfiguration() *runtime.Configuration {
return nil
}
Expand Down
Loading
0