8000 feat: dynamic metrics by technicallyty · Pull Request #785 · skip-mev/connect · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
This repository was archived by the owner on Mar 24, 2025. It is now read-only.

feat: dynamic metrics #785

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions cmd/connect/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
"errors"
"fmt"
"net/http"

//nolint: gosec
_ "net/http/pprof"
"os"
Expand Down Expand Up @@ -349,7 +348,7 @@

isValidateMode := runMode(mode) == modeValidate

metrics := oraclemetrics.NewMetricsFromConfig(cfg.Metrics, nodeClient)
metrics := oraclemetrics.NewDynamicMetrics(ctx, cfg.Metrics, nodeClient)

Check warning on line 351 in cmd/connect/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/connect/main.go#L351

Added line #L351 was not covered by tests

aggregator, err := oraclemath.NewIndexPriceAggregator(
logger,
Expand Down
121 changes: 121 additions & 0 deletions oracle/metrics/dynamic_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package metrics

import (
"context"
"time"

"github.com/skip-mev/connect/v2/oracle/config"
)

// ImplType identifies which concrete Metrics implementation is currently in use.
type ImplType int

const (
// OracleMetricsType is the type reported for *OracleMetricsImpl.
OracleMetricsType ImplType = iota
// NoOpMetricsType is the type reported for *noOpOracleMetrics.
NoOpMetricsType
// UnknownMetricsType is the type reported for any other Metrics implementation.
UnknownMetricsType
)

// determineMetricsType reports which concrete implementation backs m.
// It returns OracleMetricsType for *OracleMetricsImpl, NoOpMetricsType for *noOpOracleMetrics,
// and UnknownMetricsType for every other value.
func determineMetricsType(m Metrics) ImplType {
switch m.(type) {
case *OracleMetricsImpl:
return OracleMetricsType
case *noOpOracleMetrics:
return NoOpMetricsType
default:
return UnknownMetricsType

Check warning on line 26 in oracle/metrics/dynamic_metrics.go

View check run for this annotation

Codecov / codecov/patch

oracle/metrics/dynamic_metrics.go#L25-L26

Added lines #L25 - L26 were not covered by tests
}
}

// compile-time check that *dynamicMetrics satisfies the Metrics interface.
var _ Metrics = &dynamicMetrics{}

// dynamicMetrics is a Metrics implementation that can change its internal metrics impl on the fly.
// this is useful when a connect instance is started before a node: we can't be sure which one starts first,
// so we need to be able to switch when the node comes online.
//
// NOTE(review): impl and metricsType are written by the retry goroutine and read by the delegating
// methods without any synchronization; the surrounding comments state this is accepted on purpose.
type dynamicMetrics struct {
// cfg is the metrics config used to build impl, and to rebuild it once the node responds.
cfg config.MetricsConfig
// nc is the node client probed by retrySwitchImpl; the retry routine is not started when it is nil.
nc NodeClient
// impl is the implementation every Metrics method on this type delegates to.
impl Metrics
// metricsType caches the result of determineMetricsType(impl).
metricsType ImplType
}

// NewDynamicMetrics builds a Metrics from cfg and nc via NewMetricsFromConfig and wraps it in a
// dynamicMetrics so the underlying implementation can be replaced later.
//
// When the initial implementation is the noop one, telemetry is not disabled, and a node client was
// supplied, a background routine is started (bounded by ctx) that tries to switch to a real
// implementation once the node becomes reachable. In every other case the wrapper simply delegates to
// the initial implementation.
func NewDynamicMetrics(ctx context.Context, cfg config.MetricsConfig, nc NodeClient) Metrics {
	initial := NewMetricsFromConfig(cfg, nc)
	wrapper := &dynamicMetrics{
		cfg:         cfg,
		nc:          nc,
		impl:        initial,
		metricsType: determineMetricsType(initial),
	}

	// nothing to upgrade unless we started out as a noop.
	if wrapper.metricsType != NoOpMetricsType {
		return wrapper
	}
	// without telemetry or a node client there is nothing to retry against.
	if cfg.Telemetry.Disabled || nc == nil {
		return wrapper
	}

	wrapper.retrySwitchImpl(ctx)
	return wrapper
}

// retrySwitchImpl kicks off a go routine that calls the node client's DeriveNodeIdentifier every
// 3 seconds for up to 10 minutes. on the first call that returns no error, it rebuilds the metrics impl
// from the stored config and node client, installs it, calls SetConnectBuildInfo, and exits.
// the routine also exits when ctx is done or when the 10 minute timer fires.
//
// NOTE(review): neither the timer nor the ticker is stopped on any exit path; consider deferring
// retryDuration.Stop() and ticker.Stop() inside the go routine.
func (d *dynamicMetrics) retrySwitchImpl(ctx context.Context) {
go func() {
// retryDuration bounds the total time spent retrying; ticker paces the individual attempts.
retryDuration := time.NewTimer(10 * time.Minute)
ticker := time.NewTicker(3 * time.Second)

for {
select {
case <-ctx.Done():
return
case <-retryDuration.C:
return

Check warning on line 70 in oracle/metrics/dynamic_metrics.go

View check run for this annotation

Codecov / codecov/patch

oracle/metrics/dynamic_metrics.go#L67-L70

Added lines #L67 - L70 were not covered by tests
case <-ticker.C:
// d.impl and d.metricsType are written below without synchronization while other go routines
// may be reading them. this is technically a race condition, but it is accepted here: if
// something is still accessing the old impl, it's not the end of the world since it's a noop impl.
_, err := d.nc.DeriveNodeIdentifier()
if err == nil {
impl := NewMetricsFromConfig(d.cfg, d.nc)
d.impl = impl
d.metricsType = determineMetricsType(impl)
d.SetConnectBuildInfo()
return
}
}
}
}()
}

// AddTick delegates to AddTick on the currently installed metrics impl.
func (d *dynamicMetrics) AddTick() {
d.impl.AddTick()

Check warning on line 88 in oracle/metrics/dynamic_metrics.go

View check run for this annotation

Codecov / codecov/patch

oracle/metrics/dynamic_metrics.go#L87-L88

Added lines #L87 - L88 were not covered by tests
}

// AddTickerTick delegates to AddTickerTick on the currently installed metrics impl, passing pairID through.
func (d *dynamicMetrics) AddTickerTick(pairID string) {
d.impl.AddTickerTick(pairID)

Check warning on line 92 in oracle/metrics/dynamic_metrics.go

View check run for this annotation

Codecov / codecov/patch

oracle/metrics/dynamic_metrics.go#L91-L92

Added lines #L91 - L92 were not covered by tests
}

// UpdatePrice delegates to UpdatePrice on the currently installed metrics impl with the same arguments.
func (d *dynamicMetrics) UpdatePrice(name, pairID string, decimals uint64, price float64) {
d.impl.UpdatePrice(name, pairID, decimals, price)

Check warning on line 96 in oracle/metrics/dynamic_metrics.go

View check run for this annotation

Codecov / codecov/patch

oracle/metrics/dynamic_metrics.go#L95-L96

Added lines #L95 - L96 were not covered by tests
}

// UpdateAggregatePrice delegates to UpdateAggregatePrice on the currently installed metrics impl
// with the same arguments.
func (d *dynamicMetrics) UpdateAggregatePrice(pairID string, decimals uint64, price float64) {
d.impl.UpdateAggregatePrice(pairID, decimals, price)

Check warning on line 100 in oracle/metrics/dynamic_metrics.go

View check run for this annotation

Codecov / codecov/patch

oracle/metrics/dynamic_metrics.go#L99-L100

Added lines #L99 - L100 were not covered by tests
}

// AddProviderTick delegates to AddProviderTick on the currently installed metrics impl
// with the same arguments.
func (d *dynamicMetrics) AddProviderTick(providerName, pairID string, success bool) {
d.impl.AddProviderTick(providerName, pairID, success)

Check warning on line 104 in oracle/metrics/dynamic_metrics.go

View check run for this annotation

Codecov / codecov/patch

oracle/metrics/dynamic_metrics.go#L103-L104

Added lines #L103 - L104 were not covered by tests
}

// AddProviderCountForMarket delegates to AddProviderCountForMarket on the currently installed
// metrics impl with the same arguments.
func (d *dynamicMetrics) AddProviderCountForMarket(pairID string, count int) {
d.impl.AddProviderCountForMarket(pairID, count)

Check warning on line 108 in oracle/metrics/dynamic_metrics.go

View check run for this annotation

Codecov / codecov/patch

oracle/metrics/dynamic_metrics.go#L107-L108

Added lines #L107 - L108 were not covered by tests
}

// SetConnectBuildInfo delegates to SetConnectBuildInfo on the currently installed metrics impl.
// it is also called by the retry routine right after a new impl has been installed.
func (d *dynamicMetrics) SetConnectBuildInfo() {
d.impl.SetConnectBuildInfo()
}

// MissingPrices delegates to MissingPrices on the currently installed metrics impl, passing pairIDs through.
func (d *dynamicMetrics) MissingPrices(pairIDs []string) {
d.impl.MissingPrices(pairIDs)

Check warning on line 116 in oracle/metrics/dynamic_metrics.go

View check run for this annotation

Codecov / codecov/patch

oracle/metrics/dynamic_metrics.go#L115-L116

Added lines #L115 - L116 were not covered by tests
}

// GetMissingPrices returns whatever GetMissingPrices on the currently installed metrics impl returns.
func (d *dynamicMetrics) GetMissingPrices() []string {
return d.impl.GetMissingPrices()

Check warning on line 120 in oracle/metrics/dynamic_metrics.go

View check run for this annotation

Codecov / codecov/patch

oracle/metrics/dynamic_metrics.go#L119-L120

Added lines #L119 - L120 were not covered by tests
}
89 changes: 89 additions & 0 deletions oracle/metrics/dynamic_metrics_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
//go:build !race
// +build !race

// we don't need the race detector here because race conditions with the dynamic impl do not matter.
// if a codepath calls `metrics.AddTick()` while the dynamic impl is updating the impl, nothing bad really happens.

package metrics

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/skip-mev/connect/v2/oracle/config"
"github.com/skip-mev/connect/v2/oracle/metrics/mocks"
)

// TestDetermineMetricsType checks that determineMetricsType maps each known Metrics implementation
// to its ImplType, and that anything it does not recognize is reported as UnknownMetricsType.
func TestDetermineMetricsType(t *testing.T) {
	tcs := []struct {
		name    string
		metrics Metrics
		mType   ImplType
	}{
		{
			name:    "oracle metrics type",
			metrics: &OracleMetricsImpl{},
			mType:   OracleMetricsType,
		},
		{
			name:    "noop metrics type",
			metrics: &noOpOracleMetrics{},
			mType:   NoOpMetricsType,
		},
		{
			// a nil interface matches none of the concrete cases in the type switch,
			// so it exercises the default branch.
			name:    "unknown metrics type",
			metrics: nil,
			mType:   UnknownMetricsType,
		},
	}

	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			typ := determineMetricsType(tc.metrics)
			require.Equal(t, tc.mType, typ)
		})
	}
}

// TestDynamicMetrics_Switches tests that the metrics impl will switch if it can communicate with the node.
//
// The dynamic metrics start out as a noop (metrics disabled in the config). The test then flips the
// stored config to enabled, unblocks the mocked node client, and polls until the wrapper reports the
// oracle implementation.
func TestDynamicMetrics_Switches(t *testing.T) {
	const (
		pollAttempts = 10
		pollInterval = 500 * time.Millisecond
	)

	// cancel on exit so the retry routine is signalled to stop even if the test fails early.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	cfg := config.MetricsConfig{
		PrometheusServerAddress: "",
		Telemetry: config.TelemetryConfig{
			Disabled:    false,
			PushAddress: "",
		},
		Enabled: false,
	}
	node := mocks.NewNodeClient(t)

	blocker := make(chan bool)

	// it gets called once in the loop where it checks the node,
	// and again in NewMetricsFromConfig. each call parks on blocker until the test releases it.
	node.EXPECT().DeriveNodeIdentifier().Run(func() {
		<-blocker
	}).Return("foobar", nil).Twice()

	dyn := NewDynamicMetrics(ctx, cfg, node)
	dynImpl, ok := dyn.(*dynamicMetrics)
	require.True(t, ok)
	require.Equal(t, NoOpMetricsType, dynImpl.metricsType)

	// enable metrics in the stored config so the rebuild inside the retry routine is expected to
	// produce the oracle impl instead of another noop.
	dynImpl.cfg.Enabled = true

	// once for the routine that switches it.
	blocker <- true
	// and again for the new metrics call.
	blocker <- true

	valid := false
	for range pollAttempts {
		if dynImpl.metricsType == OracleMetricsType {
			valid = true
			break
		}
		time.Sleep(pollInterval)
	}

	require.True(t, valid, "the metrics type did not change after %s: metricsType=%d",
		pollAttempts*pollInterval, dynImpl.metricsType)
}
Loading
0