-
Notifications
You must be signed in to change notification settings - Fork 77
fix: uniswap unable to handle more than 10 tickers #797
Changes from all commits
bf950c1
4e414d3
5caaaf6
b538de0
5dfb0d3
16668a2
03f4f3c
9d316d9
3186596
a714486
15cfa56
98c6a2b
84b52b2
0bbe995
2e0cc5a
949108b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,20 @@ | ||
package slices | ||
|
||
// Chunk splits input into consecutive batches of at most chunkSize elements,
// preserving order. Only the final batch may be shorter than chunkSize.
//
// example: {1,2,3,4,5}, chunkSize = 2 -> {1,2}, {3,4}, {5}
//
// If input fits in a single batch (len(input) <= chunkSize), including when
// input is empty, the result is one batch holding input itself. A
// non-positive chunkSize cannot make progress through the slice, so it is
// treated as "do not split" and also yields input as a single batch instead
// of looping forever (chunkSize == 0) or slicing out of range (chunkSize < 0).
//
// The returned batches alias input's backing array rather than copying it.
// Each batch produced by splitting has its capacity clipped to its length,
// so appending to one batch reallocates instead of overwriting the elements
// of the batch that follows it.
func Chunk[T any](input []T, chunkSize int) [][]T {
	if chunkSize <= 0 || len(input) <= chunkSize {
		return [][]T{input}
	}

	// The number of batches is known up front (ceiling division), so allocate
	// the outer slice once instead of growing it on every append.
	chunks := make([][]T, 0, (len(input)+chunkSize-1)/chunkSize)
	for i := 0; i < len(input); i += chunkSize {
		end := i + chunkSize
		if end > len(input) {
			end = len(input)
		}

		// Three-index slice: cap == len, which detaches the batch from its
		// neighbours on append.
		chunks = append(chunks, input[i:end:end])
	}
	return chunks
}
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,62 @@ | ||
package slices_test | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/skip-mev/connect/v2/pkg/slices" | ||
) | ||
|
||
func TestChunkSlice(t *testing.T) { | ||
testCases := []struct { | ||
name string | ||
input []string | ||
chunkSize int | ||
expected [][]string | ||
}{ | ||
{ | ||
name: "Empty slice", | ||
input: []string{}, | ||
chunkSize: 3, | ||
expected: [][]string{{}}, | ||
}, | ||
{ | ||
name: "Slice smaller than chunk size", | ||
input: []string{"a", "b"}, | ||
chunkSize: 3, | ||
expected: [][]string{{"a", "b"}}, | ||
}, | ||
{ | ||
name: "Slice equal to chunk size", | ||
input: []string{"a", "b", "c"}, | ||
chunkSize: 3, | ||
expected: [][]string{{"a", "b", "c"}}, | ||
}, | ||
{ | ||
name: "Slice larger than chunk size", | ||
input: []string{"a", "b", "c", "d", "e"}, | ||
chunkSize: 2, | ||
expected: [][]string{{"a", "b"}, {"c", "d"}, {"e"}}, | ||
}, | ||
{ | ||
name: "Chunk size of 1", | ||
input: []string{"a", "b", "c"}, | ||
chunkSize: 1, | ||
expected: [][]string{{"a"}, {"b"}, {"c"}}, | ||
}, | ||
{ | ||
name: "Large slice with uneven chunks", | ||
input: []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"}, | ||
chunkSize: 3, | ||
expected: [][]string{{"1", "2", "3"}, {"4", "5", "6"}, {"7", "8", "9"}, {"10"}}, | ||
}, | ||
} | ||
|
||
for _, tc := range testCases { | ||
t.Run(tc.name, func(t *testing.T) { | ||
result := slices.Chunk(tc.input, tc.chunkSize) | ||
require.Equal(t, tc.expected, result) | ||
}) | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,10 +6,10 @@ import ( | |
"net/http" | ||
"time" | ||
|
||
"github.com/ethereum/go-ethereum/rpc" | ||
|
||
"github.com/skip-mev/connect/v2/oracle/config" | ||
"github.com/skip-mev/connect/v2/providers/base/api/metrics" | ||
|
||
"github.com/ethereum/go-ethereum/rpc" | ||
) | ||
|
||
// EVMClient is an interface that abstracts the evm client. | ||
|
@@ -92,17 +92,17 @@ func NewGoEthereumClientImpl( | |
// the corresponding BatchElem. | ||
// | ||
// Note that batch calls may not be executed atomically on the server side. | ||
func (c *GoEthereumClientImpl) BatchCallContext(ctx context.Context, calls []rpc.BatchElem) (err error) { | ||
func (c *GoEthereumClientImpl) BatchCallContext(ctx context.Context, calls []rpc.BatchElem) error { | ||
start := time.Now() | ||
defer func() { | ||
c.apiMetrics.ObserveProviderResponseLatency(c.api.Name, c.redactedURL, time.Since(start)) | ||
}() | ||
|
||
if err = c.client.BatchCallContext(ctx, calls); err != nil { | ||
if err := c.client.BatchCallContext(ctx, calls); err != nil { | ||
c.apiMetrics.AddRPCStatusCode(c.api.Name, c.redactedURL, metrics.RPCCodeError) | ||
return | ||
return err | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I assume this was done for clarity since it can be confusing when we have named return values. Consider removing the name entirely. (i.e., instead of There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
} | ||
|
||
c.apiMetrics.AddRPCStatusCode(c.api.Name, c.redactedURL, metrics.RPCCodeOK) | ||
return | ||
return nil | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -7,15 +7,15 @@ import ( | |
"math/big" | ||
"time" | ||
|
||
"go.uber.org/zap" | ||
|
||
"github.com/ethereum/go-ethereum/accounts/abi" | ||
"github.com/ethereum/go-ethereum/common" | ||
"github.com/ethereum/go-ethereum/common/hexutil" | ||
"github.com/ethereum/go-ethereum/rpc" | ||
"go.uber.org/zap" | ||
|
||
"github.com/skip-mev/connect/v2/oracle/config" | ||
"github.com/skip-mev/connect/v2/oracle/types" | ||
"github.com/skip-mev/connect/v2/pkg/slices" | ||
"github.com/skip-mev/connect/v2/providers/apis/defi/ethmulticlient" | ||
uniswappool "github.com/skip-mev/connect/v2/providers/apis/defi/uniswapv3/pool" | ||
"github.com/skip-mev/connect/v2/providers/base/api/metrics" | ||
|
@@ -158,6 +158,7 @@ func (u *PriceFetcher) Fetch( | |
// Create a batch element for each ticker and pool. | ||
batchElems := make([]rpc.BatchElem, len(tickers)) | ||
pools := make([]PoolConfig, len(tickers)) | ||
|
||
for i, ticker := range tickers { | ||
pool, err := u.GetPool(ticker) | ||
if err != nil { | ||
|
@@ -192,17 +193,23 @@ func (u *PriceFetcher) Fetch( | |
pools[i] = pool | ||
} | ||
|
||
// Batch call to the EVM. | ||
if err := u.client.BatchCallContext(ctx, batchElems); err != nil { | ||
u.logger.Debug( | ||
"failed to batch call to ethereum network for all tickers", | ||
zap.Error(err), | ||
) | ||
// process 10 tickers at a time | ||
const batchSize = 10 | ||
batchChunks := slices.Chunk(batchElems, batchSize) | ||
|
||
return types.NewPriceResponseWithErr( | ||
tickers, | ||
providertypes.NewErrorWithCode(err, providertypes.ErrorAPIGeneral), | ||
) | ||
for _, chunk := range batchChunks { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We should probably just parallelize this. It isn't too much more complicated, and this is one of the most common ways for latency to get out of control. eg, egCtx := errgroup.WithContext(ctx)
for _, chunk := range batchChunks {
chunk := chunk
eg.Go(func() error {
// Batch call to the EVM.
if err := u.client.BatchCallContext(egCtx, chunk); err != nil {
u.logger.Debug(
"failed to batch call to ethereum network for all tickers",
zap.Error(err),
)
return err
}
return nil
})
}
if err := eg.Wait(); err != nil {
return types.NewPriceResponseWithErr(
tickers,
providertypes.NewErrorWithCode(err, providertypes.ErrorAPIGeneral),
)
} There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If you do this, you should probably set a limit of something like 5 for the error group. |
||
// Batch call to the EVM. | ||
if err := u.client.BatchCallContext(ctx, chunk); err != nil { | ||
u.logger.Debug( | ||
"failed to batch call to ethereum network for all tickers", | ||
zap.Error(err), | ||
) | ||
|
||
return types.NewPriceResponseWithErr( | ||
tickers, | ||
providertypes.NewErrorWithCode(err, providertypes.ErrorAPIGeneral), | ||
) | ||
} | ||
} | ||
|
||
// Parse the result from the batch call for each ticker. | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -163,7 +163,7 @@ func (o *TestingOracle) RunMarketMap(ctx context.Context, mm mmtypes.MarketMap, | |
if len(prices) != expectedNumPrices { | ||
return nil, fmt.Errorf("expected %d prices, got %d", expectedNumPrices, len(prices)) | ||
} | ||
o.Logger.Info("provider prices", zap.Any("prices", prices)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this being removed because it was making the logs too messy? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. yep |
||
|
||
priceResults = append(priceResults, PriceResult{ | ||
Prices: prices, | ||
Time: time.Now(), | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -15,6 +15,9 @@ func FilterMarketMapToProviders(mm mmtypes.MarketMap) map[string]mmtypes.MarketM | |
for _, market := range mm.Markets { | ||
// check each provider config | ||
for _, pc := range market.ProviderConfigs { | ||
// remove normalizations to isolate markets | ||
pc.NormalizeByPair = nil | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this just for testing, or why is it needed? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is so that we can actually test the markets in isolation. For example, if I want to isolate all of the Uniswap markets, they're all going to be Normalized by ETH/USD, which requires a bunch of other feeds to exist. For this kind of testing, we only want to see "Does uniswap return a price for this expected pool?", so we just remove normalize pairs |
||
|
||
// create a market from the given provider config | ||
isolatedMarket := mmtypes.Market{ | ||
Ticker: market.Ticker, | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
taken from sauron.
There is a Chunk function in the
slices
pkg for go1.23, but this would require us to bump the go version in our go.mod, which would affect anyone using our code