This repository was archived by the owner on Mar 24, 2025. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 77
feat: dynamic metrics #785
Merged
technicallyty
merged 8 commits into
main
from
tyler/con-1797-connect-will-not-emit-telemetry-if-node-is-not-started
Oct 9, 2024
Merged
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
4992755
dynamic metrics
technicallyty 6a66acd
comments
technicallyty cb4e87c
nolint
technicallyty 69d5ba2
data race
technicallyty ff3c180
lock
technicallyty f3e990a
k
technicallyty 5cb876d
no race pls
technicallyty 4707f9c
Merge branch 'main' into tyler/con-1797-connect-will-not-emit-telemet…
technicallyty File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
package metrics | ||
|
||
import (
	"context"
	"sync"
	"time"

	"github.com/skip-mev/connect/v2/oracle/config"
)
|
||
// ImplType identifies which concrete implementation backs a Metrics value.
type ImplType int

// Recognized ImplType values, assigned consecutively from zero.
const (
	// OracleMetricsType marks an *OracleMetricsImpl.
	OracleMetricsType ImplType = iota
	// NoOpMetricsType marks a *noOpOracleMetrics.
	NoOpMetricsType
	// UnknownMetricsType marks any implementation not listed above.
	UnknownMetricsType
)
|
||
// determines the underlying impl of the metrics. | ||
func determineMetricsType(m Metrics) ImplType { | ||
switch m.(type) { | ||
case *OracleMetricsImpl: | ||
return OracleMetricsType | ||
case *noOpOracleMetrics: | ||
return NoOpMetricsType | ||
default: | ||
return UnknownMetricsType | ||
} | ||
} | ||
|
||
var _ Metrics = &dynamicMetrics{} | ||
|
||
// dynamicMetrics is a type that can change its internal metrics impl on the fly. | ||
// this is useful for when a connect instance is started before a node. we can't be sure which one starts first, | ||
// so we need to be able to switch when the node comes online. | ||
type dynamicMetrics struct { | ||
cfg config.MetricsConfig | ||
nc NodeClient | ||
impl Metrics | ||
metricsType ImplType | ||
} | ||
|
||
func NewDynamicMetrics(ctx context.Context, cfg config.MetricsConfig, nc NodeClient) Metrics { | ||
impl := NewMetricsFromConfig(cfg, nc) | ||
dyn := &dynamicMetrics{ | ||
cfg: cfg, | ||
nc: nc, | ||
impl: impl, | ||
metricsType: determineMetricsType(impl), | ||
} | ||
// we only want to kick off the routine of attempting to switch if we're a noop metrics, telemetry is enabled, | ||
// and we have a node client. | ||
if dyn.metricsType == NoOpMetricsType && !cfg.Telemetry.Disabled && nc != nil { | ||
dyn.retrySwitchImpl(ctx) | ||
} | ||
return dyn | ||
} | ||
|
||
// retrySwitchImpl kicks off a go routine that attempts to contact a node every 3 seconds for 10 mins. | ||
// if it gets a response, it will switch its internal metrics impl. | ||
func (d *dynamicMetrics) retrySwitchImpl(ctx context.Context) { | ||
go func() { | ||
retryDuration := time.NewTimer(10 * time.Minute) | ||
ticker := time.NewTicker(3 * time.Second) | ||
|
||
for { | ||
select { | ||
case <-ctx.Done(): | ||
return | ||
case <-retryDuration.C: | ||
return | ||
case <-ticker.C: | ||
// this is technically a race condition, but it doesn't really matter. if something is accessing the | ||
// old impl, its not the end of the world since its a noop impl. | ||
_, err := d.nc.DeriveNodeIdentifier() | ||
if err == nil { | ||
impl := NewMetricsFromConfig(d.cfg, d.nc) | ||
d.impl = impl | ||
d.metricsType = determineMetricsType(impl) | ||
d.SetConnectBuildInfo() | ||
return | ||
} | ||
} | ||
} | ||
}() | ||
} | ||
|
||
func (d *dynamicMetrics) AddTick() { | ||
d.impl.AddTick() | ||
} | ||
|
||
func (d *dynamicMetrics) AddTickerTick(pairID string) { | ||
d.impl.AddTickerTick(pairID) | ||
} | ||
|
||
func (d *dynamicMetrics) UpdatePrice(name, pairID string, decimals uint64, price float64) { | ||
d.impl.UpdatePrice(name, pairID, decimals, price) | ||
} | ||
|
||
func (d *dynamicMetrics) UpdateAggregatePrice(pairID string, decimals uint64, price float64) { | ||
d.impl.UpdateAggregatePrice(pairID, decimals, price) | ||
} | ||
|
||
func (d *dynamicMetrics) AddProviderTick(providerName, pairID string, success bool) { | ||
d.impl.AddProviderTick(providerName, pairID, success) | ||
} | ||
|
||
func (d *dynamicMetrics) AddProviderCountForMarket(pairID string, count int) { | ||
d.impl.AddProviderCountForMarket(pairID, count) | ||
} | ||
|
||
func (d *dynamicMetrics) SetConnectBuildInfo() { | ||
d.impl.SetConnectBuildInfo() | ||
} | ||
|
||
func (d *dynamicMetrics) MissingPrices(pairIDs []string) { | ||
d.impl.MissingPrices(pairIDs) | ||
} | ||
|
||
func (d *dynamicMetrics) GetMissingPrices() []string { | ||
return d.impl.GetMissingPrices() | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,89 @@ | ||
//go:build !race | ||
// +build !race | ||
|
||
// The race detector is disabled for this file because the tests below read and write
// dynamicMetrics fields (metricsType, cfg) directly, without synchronization, while its
// background switch goroutine may be running.
|
||
package metrics | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/skip-mev/connect/v2/oracle/config" | ||
"github.com/skip-mev/connect/v2/oracle/metrics/mocks" | ||
) | ||
|
||
func TestDetermineMetricsType(t *testing.T) { | ||
tcs := []struct { | ||
name string | ||
metrics Metrics | ||
mType ImplType | ||
}{ | ||
{ | ||
name: "oracle metrics type", | ||
metrics: &OracleMetricsImpl{}, | ||
mType: OracleMetricsType, | ||
}, | ||
{ | ||
name: "noop metrics type", | ||
metrics: &noOpOracleMetrics{}, | ||
mType: NoOpMetricsType, | ||
}, | ||
} | ||
|
||
for _, tc := range tcs { | ||
t.Run(tc.name, func(t *testing.T) { | ||
typ := determineMetricsType(tc.metrics) | ||
require.Equal(t, tc.mType, typ) | ||
}) | ||
} | ||
} | ||
|
||
// TestDynamicMetrics_Switches tests that the metrics impl will switch if it can communicate with the node. | ||
func TestDynamicMetrics_Switches(t *testing.T) { | ||
ctx := context.Background() | ||
cfg := config.MetricsConfig{ | ||
PrometheusServerAddress: "", | ||
Telemetry: config.TelemetryConfig{ | ||
Disabled: false, | ||
PushAddress: "", | ||
}, | ||
Enabled: false, | ||
} | ||
node := mocks.NewNodeClient(t) | ||
|
||
blocker := make(chan bool) | ||
|
||
// it gets called once in the loop where it checks the node, | ||
// and again in NewMetricsFromConfig. | ||
node.EXPECT().DeriveNodeIdentifier().Run(func() { | ||
<-blocker | ||
}).Return("foobar", nil).Twice() | ||
|
||
dyn := NewDynamicMetrics(ctx, cfg, node) | ||
dynImpl, ok := dyn.(*dynamicMetrics) | ||
require.True(t, ok) | ||
require.Equal(t, dynImpl.metricsType, NoOpMetricsType) | ||
|
||
dynImpl.cfg.Enabled = true | ||
|
||
// once for the routine that switches it. | ||
blocker <- true | ||
// and again for the new metrics call. | ||
blocker <- true | ||
|
||
valid := false | ||
for range 10 { | ||
if dynImpl.metricsType == OracleMetricsType { | ||
valid = true | ||
break | ||
} | ||
time.Sleep(time.Millisecond * 500) | ||
} | ||
|
||
require.True(t, valid, "the metrics type did not change after 500ms", "metricsType", dynImpl.metricsType) | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.