8000 fix: uniswap unable to handle more than 10 tickers by aljo242 · Pull Request #797 · skip-mev/connect · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content
This repository was archived by the owner on Mar 24, 2025. It is now read-only.

fix: uniswap unable to handle more than 10 tickers #797

Merged
merged 16 commits into from
Oct 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions pkg/slices/slices.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package slices

// Chunk chunks a slice into batches of chunkSize.
// example: {1,2,3,4,5}, chunkSize = 2 -> {1,2}, {3,4}, {5}
func Chunk[T any](input []T, chunkSize int) [][]T {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

taken from sauron.

There is a Chunk function in the slices pkg for go1.23, but this would require is to bump the go version in our go.mod, which would affect anyone using our code

if len(input) <= chunkSize {
return [][]T{input}
}
var chunks [][]T
for i := 0; i < len(input); i += chunkSize {
end := i + chunkSize

if end > len(input) {
end = len(input)
}

chunks = append(chunks, input[i:end])
}
return chunks
}
62 changes: 62 additions & 0 deletions pkg/slices/slices_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package slices_test

import (
"testing"

"github.com/stretchr/testify/require"

"github.com/skip-mev/connect/v2/pkg/slices"
)

func TestChunkSlice(t *testing.T) {
testCases := []struct {
name string
input []string
chunkSize int
expected [][]string
}{
{
name: "Empty slice",
input: []string{},
chunkSize: 3,
expected: [][]string{{}},
},
{
name: "Slice smaller than chunk size",
input: []string{"a", "b"},
chunkSize: 3,
expected: [][]string{{"a", "b"}},
},
{
name: "Slice equal to chunk size",
input: []string{"a", "b", "c"},
chunkSize: 3,
expected: [][]string{{"a", "b", "c"}},
},
{
name: "Slice larger than chunk size",
input: []string{"a", "b", "c", "d", "e"},
chunkSize: 2,
expected: [][]string{{"a", "b"}, {"c", "d"}, {"e"}},
},
{
name: "Chunk size of 1",
input: []string{"a", "b", "c"},
chunkSize: 1,
expected: [][]string{{"a"}, {"b"}, {"c"}},
},
{
name: "Large slice with uneven chunks",
input: []string{"1", "2", "3", "4", "5", "6", "7", "8", "9", "10"},
chunkSize: 3,
expected: [][]string{{"1", "2", "3"}, {"4", "5", "6"}, {"7", "8", "9"}, {"10"}},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := slices.Chunk(tc.input, tc.chunkSize)
require.Equal(t, tc.expected, result)
})
}
}
12 changes: 6 additions & 6 deletions providers/apis/defi/ethmulticlient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,10 @@ import (
"net/http"
"time"

"github.com/ethereum/go-ethereum/rpc"

"github.com/skip-mev/connect/v2/oracle/config"
"github.com/skip-mev/connect/v2/providers/base/api/metrics"

"github.com/ethereum/go-ethereum/rpc"
)

// EVMClient is an interface that abstracts the evm client.
Expand Down Expand Up @@ -92,17 +92,17 @@ func NewGoEthereumClientImpl(
// the corresponding BatchElem.
//
// Note that batch calls may not be executed atomically on the server side.
func (c *GoEthereumClientImpl) BatchCallContext(ctx context.Context, calls []rpc.BatchElem) (err error) {
func (c *GoEthereumClientImpl) BatchCallContext(ctx context.Context, calls []rpc.BatchElem) error {
start := time.Now()
defer func() {
c.apiMetrics.ObserveProviderResponseLatency(c.api.Name, c.redactedURL, time.Since(start))
}()

if err = c.client.BatchCallContext(ctx, calls); err != nil {
if err := c.client.BatchCallContext(ctx, calls); err != nil {
c.apiMetrics.AddRPCStatusCode(c.api.Name, c.redactedURL, metrics.RPCCodeError)
return
return err
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I assume this was done for clarity since it can be confusing when we have named return values. Consider removing the name entirely.

(i.e., instead of (err error) just let it be error for the return type.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}

c.apiMetrics.AddRPCStatusCode(c.api.Name, c.redactedURL, metrics.RPCCodeOK)
return
return nil
}
8 changes: 4 additions & 4 deletions providers/apis/defi/raydium/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,20 +76,20 @@ func (c *JSONRPCClient) GetMultipleAccountsWithOpts(
ctx context.Context,
accounts []solana.PublicKey,
opts *rpc.GetMultipleAccountsOpts,
) (out *rpc.GetMultipleAccountsResult, err error) {
) (*rpc.GetMultipleAccountsResult, error) {
start := time.Now()
defer func() {
c.apiMetrics.ObserveProviderResponseLatency(c.api.Name, c.redactedURL, time.Since(start))
}()

out, err = c.client.GetMultipleAccountsWithOpts(ctx, accounts, opts)
out, err := c.client.GetMult 6D47 ipleAccountsWithOpts(ctx, accounts, opts)
if err != nil {
c.apiMetrics.AddRPCStatusCode(c.api.Name, c.redactedURL, metrics.RPCCodeError)
return
return nil, err
}

c.apiMetrics.AddRPCStatusCode(c.api.Name, c.redactedURL, metrics.RPCCodeOK)
return
return out, nil
}

// solanaClientFromEndpoint creates a new SolanaJSONRPCClient from an endpoint.
Expand Down
31 changes: 19 additions & 12 deletions providers/apis/defi/uniswapv3/fetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,15 @@ import (
"math/big"
"time"

"go.uber.org/zap"

"github.com/ethereum/go-ethereum/accounts/abi"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/rpc"
"go.uber.org/zap"

"github.com/skip-mev/connect/v2/oracle/config"
"github.com/skip-mev/connect/v2/oracle/types"
"github.com/skip-mev/connect/v2/pkg/slices"
"github.com/skip-mev/connect/v2/providers/apis/defi/ethmulticlient"
uniswappool "github.com/skip-mev/connect/v2/providers/apis/defi/uniswapv3/pool"
"github.com/skip-mev/connect/v2/providers/base/api/metrics"
Expand Down Expand Up @@ -158,6 +158,7 @@ func (u *PriceFetcher) Fetch(
// Create a batch element for each ticker and pool.
batchElems := make([]rpc.BatchElem, len(tickers))
pools := make([]PoolConfig, len(tickers))

for i, ticker := range tickers {
pool, err := u.GetPool(ticker)
if err != nil {
Expand Down Expand Up @@ -192,17 +193,23 @@ func (u *PriceFetcher) Fetch(
pools[i] = pool
}

// Batch call to the EVM.
if err := u.client.BatchCallContext(ctx, batchElems); err != nil {
u.logger.Debug(
"failed to batch call to ethereum network for all tickers",
zap.Error(err),
)
// process 10 tickers at a time
const batchSize = 10
batchChunks := slices.Chunk(batchElems, batchSize)

return types.NewPriceResponseWithErr(
tickers,
providertypes.NewErrorWithCode(err, providertypes.ErrorAPIGeneral),
)
for _, chunk := range batchChunks {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should probably just parallelize this. It isn't too much more complicated, and this is one of the most common ways for latency to get out of control.

	eg, egCtx := errgroup.WithContext(ctx)
	for _, chunk := range batchChunks {
		chunk := chunk
		eg.Go(func() error {
			// Batch call to the EVM.
			if err := u.client.BatchCallContext(egCtx, chunk); err != nil {
				u.logger.Debug(
					"failed to batch call to ethereum network for all tickers",
					zap.Error(err),
				)

				return err
			}

			return nil
		})
	}

	if err := eg.Wait(); err != nil {
		return types.NewPriceResponseWithErr(
			tickers,
			providertypes.NewErrorWithCode(err, providertypes.ErrorAPIGeneral),
		)
	}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you do this, you should probably set a limit of something like 5 for the error group.

// Batch call to the EVM.
if err := u.client.BatchCallContext(ctx, chunk); err != nil {
u.logger.Debug(
"failed to batch call to ethereum network for all tickers",
zap.Error(err),
)

return types.NewPriceResponseWithErr(
tickers,
providertypes.NewErrorWithCode(err, providertypes.ErrorAPIGeneral),
)
}
}

// Parse the result from the batch call for each ticker.
Expand Down
3 changes: 1 addition & 2 deletions providers/apis/defi/uniswapv3/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ import (
"math/big"
"testing"

"go.uber.org/zap"

"github.com/ethereum/go-ethereum/rpc"
"github.com/stretchr/testify/require"
"go.uber.org/zap"

"github.com/skip-mev/connect/v2/oracle/config"
"github.com/skip-mev/connect/v2/oracle/types"
Expand Down
2 changes: 1 addition & 1 deletion providers/apis/defi/uniswapv3/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (pc *PoolConfig) ValidateBasic() error {
}

// MustToJSON converts the pool configuration to JSON.
func (pc PoolConfig) MustToJSON() string {
func (pc *PoolConfig) MustToJSON() string {
b, err := json.Marshal(pc)
if err != nil {
panic(err)
Expand Down
2 changes: 1 addition & 1 deletion providers/providertest/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ func (o *TestingOracle) RunMarketMap(ctx context.Context, mm mmtypes.MarketMap,
if len(prices) != expectedNumPrices {
return nil, fmt.Errorf("expected %d prices, got %d", expectedNumPrices, len(prices))
}
o.Logger.Info("provider prices", zap.Any("prices", prices))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this being removed because it was making the logs too messy?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yep


priceResults = append(priceResults, PriceResult{
Prices: prices,
Time: time.Now(),
Expand Down
3 changes: 3 additions & 0 deletions providers/providertest/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@ func FilterMarketMapToProviders(mm mmtypes.MarketMap) map[string]mmtypes.MarketM
for _, market := range mm.Markets {
// check each provider config
for _, pc := range market.ProviderConfigs {
// remove normalizations to isolate markets
pc.NormalizeByPair = nil
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this just for testing, or why is it needed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is so that we can actually test the markets in isolation.

For example, if I want to isolate all of the Uniswap markets, they're all going to be Normalized by ETH/USD, which requires a bunch of other feeds to exist.

For this kind of testing, we only want to see "Does uniswap return a price for this expected pool?", so we just remove normalize pairs


// create a market from the given provider config
isolatedMarket := mmtypes.Market{
Ticker: market.Ticker,
Expand Down
Loading
0