From fac553e370e93793f420e6a2ca972d8df795d086 Mon Sep 17 00:00:00 2001 From: a Date: Mon, 21 Apr 2025 14:46:09 -0500 Subject: [PATCH 01/12] noot --- node/go.mod | 3 +- node/go.sum | 2 + node/pkg/node/options.go | 51 +++- node/pkg/query/narasu/memorystore.go | 120 ++++++++ node/pkg/query/narasu/memorystore_test.go | 95 +++++++ node/pkg/query/narasu/store.go | 12 + node/pkg/query/query.go | 262 ++++++++++-------- .../query/queryratelimit/policyprovider.go | 130 +++++++++ node/pkg/query/queryratelimit/ratelimit.go | 93 +++++++ node/pkg/query/querystaking/evm.go | 37 +++ 10 files changed, 685 insertions(+), 120 deletions(-) create mode 100644 node/pkg/query/narasu/memorystore.go create mode 100644 node/pkg/query/narasu/memorystore_test.go create mode 100644 node/pkg/query/narasu/store.go create mode 100644 node/pkg/query/queryratelimit/policyprovider.go create mode 100644 node/pkg/query/queryratelimit/ratelimit.go create mode 100644 node/pkg/query/querystaking/evm.go diff --git a/node/go.mod b/node/go.mod index 1d0eb5f260..37dcf62dca 100644 --- a/node/go.mod +++ b/node/go.mod @@ -55,9 +55,11 @@ require ( github.com/grafana/dskit v0.0.0-20230201083518-528d8a7d52f2 github.com/grafana/loki v1.6.2-0.20230721141808-0d81144cfee8 github.com/hashicorp/golang-lru v0.6.0 + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/holiman/uint256 v1.2.1 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.60.0 + github.com/tidwall/btree v1.7.0 github.com/wormhole-foundation/wormchain v0.0.0-00010101000000-000000000000 github.com/wormhole-foundation/wormhole/sdk v0.0.0-20220926172624-4b38dc650bb0 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c @@ -188,7 +190,6 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/go-sockaddr v1.0.2 // indirect - github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/memberlist v0.5.0 // indirect github.com/hashicorp/serf v0.10.1 // indirect diff --git a/node/go.sum b/node/go.sum index 2e381e1c69..07618cb2b2 100644 --- a/node/go.sum +++ b/node/go.sum @@ -2984,6 +2984,8 @@ github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmN github.com/tetafro/godot v0.3.7/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0= github.com/tetafro/godot v0.4.2/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0= github.com/tetafro/godot v1.4.11/go.mod h1:LR3CJpxDVGlYOWn3ZZg1PgNZdTUvzsZWu8xaEohUpn8= +github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= +github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= github.com/tidwall/gjson v1.6.7/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI= github.com/tidwall/gjson v1.9.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.15.0 h1:5n/pM+v3r5ujuNl4YLZLsQ+UE5jlkLVm7jMzT5Mpolw= diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 037cb0fa86..4cdaf1f288 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "net/http" + "strings" "time" "github.com/certusone/wormhole/node/pkg/accountant" @@ -17,12 +18,14 @@ import ( "github.com/certusone/wormhole/node/pkg/processor" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" "github.com/certusone/wormhole/node/pkg/query" + "github.com/certusone/wormhole/node/pkg/query/queryratelimit" "github.com/certusone/wormhole/node/pkg/readiness" "github.com/certusone/wormhole/node/pkg/supervisor" "github.com/certusone/wormhole/node/pkg/watchers" "github.com/certusone/wormhole/node/pkg/watchers/ibc" "github.com/certusone/wormhole/node/pkg/watchers/interfaces" "github.com/certusone/wormhole/node/pkg/wormconn" + ethCommon "github.com/ethereum/go-ethereum/common" "github.com/gorilla/mux" libp2p_crypto "github.com/libp2p/go-libp2p/core/crypto" "github.com/prometheus/client_golang/prometheus/promhttp" @@ -139,10 +142,56 @@ func GuardianOptionQueryHandler(ccqEnabled bool, allowedRequesters string) *Guar return nil } + enforcer := queryratelimit.NewEnforcer() + if allowedRequesters == "" { + return fmt.Errorf("if cross chain query is enabled `--ccqAllowedRequesters` must be specified") + } + var nullAddr ethCommon.Address + result := make(map[ethCommon.Address]struct{}) + for _, str := range strings.Split(allowedRequesters, ",") { + addr := ethCommon.BytesToAddress(ethCommon.Hex2Bytes(strings.TrimPrefix(str, "0x"))) + if addr == nullAddr { + return fmt.Errorf("invalid value in `--ccqAllowedRequesters`: `%s`", str) + } + result[addr] = struct{}{} + } + if len(result) <= 0 { + return fmt.Errorf("no allowed requestors specified, ccqAllowedRequesters: `%s`", allowedRequesters) + } + + policyProvider, err := queryratelimit.NewPolicyProvider( + queryratelimit.WithPolicyProviderLogger(logger), + queryratelimit.WithPolicyProviderCacheDuration(24*time.Hour), + queryratelimit.WithPolicyProviderFetcher(func(ctx context.Context, key ethCommon.Address) (*queryratelimit.Policy, error) { + _, ok := result[key] + if !ok { + return &queryratelimit.Policy{}, nil + } + return &queryratelimit.Policy{ + Limits: queryratelimit.Limits{ + Networks: map[string]queryratelimit.Rule{ + "evm": { + MaxPerMinute: 15 * 60, + MaxPerSecond: 15, + }, + "solana": { + MaxPerMinute: 15 * 60, + MaxPerSecond: 15, + }, + }, + }, + }, nil + }), + ) + if err != nil { + return fmt.Errorf("failed to create rate limit policy provider: %w", err) + } + g.queryHandler = query.NewQueryHandler( logger, g.env, - allowedRequesters, + enforcer, + policyProvider, g.signedQueryReqC.readC, g.chainQueryReqC, g.queryResponseC.readC, diff --git a/node/pkg/query/narasu/memorystore.go b/node/pkg/query/narasu/memorystore.go new file mode 100644 index 0000000000..a4bf2c2644 --- /dev/null +++ b/node/pkg/query/narasu/memorystore.go @@ -0,0 +1,120 @@ +package narasu + +import ( + "context" + "sync" + "time" + + "github.com/tidwall/btree" +) + +type MemoryStore struct { + tree *btree.Map[int, *memoryStoreItem] + mapPool sync.Pool + interval time.Duration +} + +type memoryStoreItem struct { + m map[string]int +} + +func (m *memoryStoreItem) Clear() { + for k := range m.m { + delete(m.m, k) + } +} + +func NewMemoryStore(interval time.Duration) *MemoryStore { + c := &MemoryStore{ + tree: btree.NewMap[int, *memoryStoreItem](8), + interval: interval, + mapPool: sync.Pool{ + New: func() any { + return &memoryStoreItem{ + m: make(map[string]int), + } + }, + }, + } + return c +} + +func (s *MemoryStore) Close() error { + return nil +} + +func (s *MemoryStore) getMap() *memoryStoreItem { + v := s.mapPool.Get() + if v == nil { + return &memoryStoreItem{ + m: make(map[string]int), + } + } + item := v.(*memoryStoreItem) + item.Clear() + return item +} + +func (s *MemoryStore) putMap(m *memoryStoreItem) { + s.mapPool.Put(m) +} + +func (s *MemoryStore) time(cur time.Time) int { + return int(cur.Truncate(s.interval).Unix()) +} + +func (s *MemoryStore) IncrKey(ctx context.Context, bucket string, amount int, cur time.Time) (int, error) { + now := s.time(cur) + val, ok := s.tree.Get(now) + if !ok { + n := s.getMap() + s.tree.Set(now, n) + val = n + } + if _, ok := val.m[bucket]; !ok { + val.m[bucket] = amount + } else { + val.m[bucket] = val.m[bucket] + amount + } + return val.m[bucket], nil +} + +func (s *MemoryStore) GetKeys(ctx context.Context, bucket string, from time.Time, to time.Time) ([]int, error) { + out := make([]int, 0) + toseconds := s.time(to) + fromseconds := s.time(from) + s.tree.Ascend(fromseconds, func(key int, val *memoryStoreItem) bool { + if key > toseconds { + return false + } + if count, ok := val.m[bucket]; ok { + out = append(out, count) + } + return true + }) + return out, nil +} + +func (s *MemoryStore) Cleanup(ctx context.Context, now time.Time, age time.Duration) error { + var expired []int + nowseconds := int(now.Unix()) + ageSeconds := int(age.Seconds()) + func() { + s.tree.Ascend(0, func(key int, val *memoryStoreItem) bool { + // extract the timestamp from the key timestamp:bucket + ts := key + if nowseconds-ts >= ageSeconds { + expired = append(expired, key) + return true + } + return false + }) + }() + for _, key := range expired { + item, ok := s.tree.Delete(key) + if ok { + s.putMap(item) + } + } + return nil +} diff --git a/node/pkg/query/narasu/memorystore_test.go b/node/pkg/query/narasu/memorystore_test.go new file mode 100644 index 0000000000..4c03b77423 --- /dev/null +++ b/node/pkg/query/narasu/memorystore_test.go @@ -0,0 +1,95 @@ +package narasu_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/certusone/wormhole/node/pkg/query/narasu" + "github.com/stretchr/testify/require" +) + +var bucketNames []string + +func init() { + for i := range 32 { + bucketNames = append(bucketNames, fmt.Sprintf("test-%d", i)) + } +} + +func BenchmarkMemoryStore(b *testing.B) { + ctx := context.Background() + start := time.Unix(1000, 0) + end := start.Add(time.Second * 59) + bucket := "test" + + b.Run("incr+clean", func(b *testing.B) { + c := narasu.NewMemoryStore(time.Second) + b.ResetTimer() + for i := 0; i < b.N; i++ { + c.IncrKey(ctx, bucketNames[0], 1, start.Add(time.Second*time.Duration(i))) + c.Cleanup(ctx, start.Add(time.Second*time.Duration(i)), 60*time.Second) + } + }) + b.Run("incr+getrange", func(b *testing.B) { + c := narasu.NewMemoryStore(time.Second) + for i := range 60 { + c.IncrKey(ctx, bucket, 1, start.Add(time.Second*time.Duration(i))) + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + c.GetKeys(ctx, bucket, start, end) + } + }) + +} + +func TestMemoryStore(t *testing.T) { + ctx := context.Background() + interval := time.Minute + c := narasu.NewMemoryStore(interval) + start := time.Unix(0, 0) + end := start.Add(interval * 14) + + bucket := "test" + var val int + var err error + val, err = c.IncrKey(ctx, bucket, 1, start) + require.NoError(t, err) + require.Equal(t, 1, val) + + val, err = c.IncrKey(ctx, bucket, 1, start.Add(interval)) + require.NoError(t, err) + require.Equal(t, 1, val) + + val, err = c.IncrKey(ctx, bucket, 1, start.Add(time.Duration(float64(interval)*0.5))) + require.NoError(t, err) + require.Equal(t, 2, val) + + val, err = c.IncrKey(ctx, bucket, 1, start.Add(interval*2)) + require.NoError(t, err) + require.Equal(t, 1, val) + + val, err = c.IncrKey(ctx, bucket, 1, start.Add(interval*10)) + require.NoError(t, err) + require.Equal(t, 1, val) + + val, err = c.IncrKey(ctx, bucket, 1, start.Add(interval*12)) + require.NoError(t, err) + require.Equal(t, 1, val) + + xs, err := c.GetKeys(ctx, bucket, start, end) + require.NoError(t, err) + require.ElementsMatch(t, []int{ + 1, 2, 1, 1, 1}, xs) + + err = c.Cleanup(ctx, end, 8*interval) + require.NoError(t, err) + + xs, err = c.GetKeys(ctx, bucket, start, end) + require.NoError(t, err) + require.ElementsMatch(t, []int{ + 1, 1, + }, xs) +} diff --git a/node/pkg/query/narasu/store.go b/node/pkg/query/narasu/store.go new file mode 100644 index 0000000000..d0a1024eda --- /dev/null +++ b/node/pkg/query/narasu/store.go @@ -0,0 +1,12 @@ +package narasu + +import ( + "context" + "time" +) + +type Store interface { + IncrKey(ctx context.Context, bucket string, amount int, cur time.Time) (int, error) + GetKeys(ctx context.Context, bucket string, from time.Time, to time.Time) ([]int, error) + Cleanup(ctx context.Context, now time.Time, age time.Duration) error +} diff --git a/node/pkg/query/query.go b/node/pkg/query/query.go index 50d344482b..7e22165b79 100644 --- a/node/pkg/query/query.go +++ b/node/pkg/query/query.go @@ -4,11 +4,11 @@ import ( "context" "encoding/hex" "fmt" - "strings" "time" "github.com/certusone/wormhole/node/pkg/common" gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" + "github.com/certusone/wormhole/node/pkg/query/queryratelimit" "github.com/certusone/wormhole/node/pkg/supervisor" "github.com/wormhole-foundation/wormhole/sdk/vaa" @@ -44,20 +44,22 @@ const ( func NewQueryHandler( logger *zap.Logger, env common.Environment, - allowedRequestorsStr string, + limitEnforcer *queryratelimit.Enforcer, + limitPolicyProvider *queryratelimit.PolicyProvider, signedQueryReqC <-chan *gossipv1.SignedQueryRequest, chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal, queryResponseReadC <-chan *PerChainQueryResponseInternal, queryResponseWriteC chan<- *QueryResponsePublication, ) *QueryHandler { return &QueryHandler{ - logger: logger.With(zap.String("component", "ccq")), - env: env, - allowedRequestorsStr: allowedRequestorsStr, - signedQueryReqC: signedQueryReqC, - chainQueryReqC: chainQueryReqC, - queryResponseReadC: queryResponseReadC, - queryResponseWriteC: queryResponseWriteC, + logger: logger.With(zap.String("component", "ccq")), + env: env, + limitEnforcer: limitEnforcer, + limitPolicyProvider: limitPolicyProvider, + signedQueryReqC: signedQueryReqC, + chainQueryReqC: chainQueryReqC, + queryResponseReadC: queryResponseReadC, + queryResponseWriteC: queryResponseWriteC, } } @@ -69,14 +71,14 @@ type ( // QueryHandler defines the cross chain query handler. QueryHandler struct { - logger *zap.Logger - env common.Environment - allowedRequestorsStr string - signedQueryReqC <-chan *gossipv1.SignedQueryRequest - chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal - queryResponseReadC <-chan *PerChainQueryResponseInternal - queryResponseWriteC chan<- *QueryResponsePublication - allowedRequestors map[ethCommon.Address]struct{} + logger *zap.Logger + env common.Environment + limitEnforcer *queryratelimit.Enforcer + limitPolicyProvider *queryratelimit.PolicyProvider + signedQueryReqC <-chan *gossipv1.SignedQueryRequest + chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal + queryResponseReadC <-chan *PerChainQueryResponseInternal + queryResponseWriteC chan<- *QueryResponsePublication } // pendingQuery is the cache entry for a given query. @@ -102,49 +104,50 @@ type ( PerChainConfig struct { TimestampCacheSupported bool NumWorkers int + VmType string } ) // perChainConfig provides static config info for each chain. If a chain is not listed here, then it does not support queries. // Every chain listed here must have at least one worker specified. var perChainConfig = map[vaa.ChainID]PerChainConfig{ - vaa.ChainIDSolana: {NumWorkers: 10, TimestampCacheSupported: false}, - vaa.ChainIDEthereum: {NumWorkers: 5, TimestampCacheSupported: true}, - vaa.ChainIDBSC: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDPolygon: {NumWorkers: 5, TimestampCacheSupported: true}, - vaa.ChainIDAvalanche: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDOasis: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDAurora: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDFantom: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDKarura: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDAcala: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDKlaytn: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDCelo: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDMoonbeam: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDArbitrum: {NumWorkers: 5, TimestampCacheSupported: true}, - vaa.ChainIDOptimism: {NumWorkers: 5, TimestampCacheSupported: true}, - vaa.ChainIDBase: {NumWorkers: 5, TimestampCacheSupported: true}, - vaa.ChainIDScroll: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDMantle: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDBlast: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDXLayer: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDLinea: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDBerachain: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDSnaxchain: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDUnichain: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDWorldchain: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDInk: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDHolesky: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDArbitrumSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDBaseSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDOptimismSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDPolygonSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDHyperEVM: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDMonad: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDSeiEVM: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDMezo: {NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDFogo: {NumWorkers: 10, TimestampCacheSupported: true}, + vaa.ChainIDSolana: {VmType: "solana", NumWorkers: 10, TimestampCacheSupported: false}, + vaa.ChainIDEthereum: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDBSC: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDPolygon: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDAvalanche: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDOasis: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDAurora: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDFantom: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDKarura: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDAcala: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDKlaytn: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDCelo: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDMoonbeam: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDArbitrum: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDOptimism: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDBase: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDScroll: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDMantle: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDBlast: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDXLayer: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDLinea: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDBerachain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDSnaxchain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDUnichain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDWorldchain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDInk: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDHolesky: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDArbitrumSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDBaseSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDOptimismSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDPolygonSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDHyperEVM: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDMonad: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDSeiEVM: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDMezo: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDFogo: {VmType: "evm", NumWorkers: 10, TimestampCacheSupported: true}, } // GetPerChainConfig returns the config for the specified chain. If the chain is not configured it returns an empty struct, @@ -163,33 +166,38 @@ func (config PerChainConfig) QueriesSupported() bool { // Start initializes the query handler and starts the runnable. func (qh *QueryHandler) Start(ctx context.Context) error { - qh.logger.Debug("entering Start", zap.String("enforceFlag", qh.allowedRequestorsStr)) - - var err error - qh.allowedRequestors, err = parseAllowedRequesters(qh.allowedRequestorsStr) - if err != nil { - return fmt.Errorf("failed to parse allowed requesters: %w", err) - } - + qh.logger.Debug("entering Start") if err := supervisor.Run(ctx, "query_handler", common.WrapWithScissors(qh.handleQueryRequests, "query_handler")); err != nil { return fmt.Errorf("failed to start query handler routine: %w", err) } - return nil } // handleQueryRequests multiplexes observation requests to the appropriate chain func (qh *QueryHandler) handleQueryRequests(ctx context.Context) error { - return handleQueryRequestsImpl(ctx, qh.logger, qh.signedQueryReqC, qh.chainQueryReqC, qh.allowedRequestors, qh.queryResponseReadC, qh.queryResponseWriteC, qh.env, RequestTimeout, RetryInterval, AuditInterval) + return handleQueryRequestsImpl( + ctx, + qh.logger, + qh.limitEnforcer, + qh.limitPolicyProvider, + qh.signedQueryReqC, + qh.chainQueryReqC, + qh.queryResponseReadC, + qh.queryResponseWriteC, + qh.env, + RequestTimeout, + RetryInterval, + AuditInterval) } // handleQueryRequestsImpl allows instantiating the handler in the test environment with shorter timeout and retry parameters. func handleQueryRequestsImpl( ctx context.Context, logger *zap.Logger, + limitEnforcer *queryratelimit.Enforcer, + limitPolicyProvider *queryratelimit.PolicyProvider, signedQueryReqC <-chan *gossipv1.SignedQueryRequest, chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal, - allowedRequestors map[ethCommon.Address]struct{}, queryResponseReadC <-chan *PerChainQueryResponseInternal, queryResponseWriteC chan<- *QueryResponsePublication, env common.Environment, @@ -198,7 +206,7 @@ func handleQueryRequestsImpl( auditIntervalImpl time.Duration, ) error { qLogger := logger.With(zap.String("component", "ccqhandler")) - qLogger.Info("cross chain queries are enabled", zap.Any("allowedRequestors", allowedRequestors), zap.String("env", string(env))) + qLogger.Info("cross chain queries are enabled", zap.String("env", string(env))) pendingQueries := make(map[string]*pendingQuery) // Key is requestID. @@ -252,8 +260,16 @@ func handleQueryRequestsImpl( signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:]) - if _, exists := allowedRequestors[signerAddress]; !exists { - qLogger.Debug("invalid requestor", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID)) + // get a rate limit policy for this requestor + // if they dont have one, they will receive one with no networks, and so will fail any reasonable enforcement action + policy, err := limitPolicyProvider.GetPolicy(ctx, signerAddress) + if err != nil { + qLogger.Error("failed to get rate limit policy", zap.String("requestID", requestID), zap.Error(err)) + invalidQueryRequestReceived.WithLabelValues("failed_to_get_rate_limit_policy").Inc() + } + + if len(policy.Limits.Networks) == 0 { + qLogger.Debug("requestor has no limits / invalid requestor", zap.String("requestID", requestID)) invalidQueryRequestReceived.WithLabelValues("invalid_requestor").Inc() continue } @@ -272,6 +288,41 @@ func handleQueryRequestsImpl( invalidQueryRequestReceived.WithLabelValues("failed_to_unmarshal_request").Inc() continue } + // enforce the rate limit as soon as we have validated unmarshaled, even if it is invalid. + // the signer signed this! so they should be punished for sending bad requests. + // if they send a chain that is not supported, we will just drop it for now, but mayvbe we should rate limit them for the valid ones they sent. are we that evil ? + action := &queryratelimit.Action{ + Key: signerAddress, + Time: time.Now(), + Networks: make(map[string]int), + } + if ok := func() bool { + for _, pcq := range queryRequest.PerChainQueries { + chainID := vaa.ChainID(pcq.ChainId) + config := GetPerChainConfig(chainID) + if config.NumWorkers <= 0 { + qLogger.Debug("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID)) + invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc() + return false + } + action.Networks[config.VmType] += 1 + } + return true + }(); !ok { + continue + } + // perform the actual enforcement + limitResult, err := limitEnforcer.EnforcePolicy(ctx, policy, action) + if err != nil { + qLogger.Error("failed to enforce rate limit", zap.String("requestID", requestID), zap.Error(err)) + invalidQueryRequestReceived.WithLabelValues("failed_to_enforce_rate_limit").Inc() + continue + } + if !limitResult.Allowed { + qLogger.Warn("rate limit exceeded", zap.String("requestID", requestID), zap.Any("networks", limitResult.ExceededNetworks)) + invalidQueryRequestReceived.WithLabelValues("rate_limit_exceeded").Inc() + continue + } if err := queryRequest.Validate(); err != nil { qLogger.Error("received invalid message", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err)) @@ -280,39 +331,37 @@ func handleQueryRequestsImpl( } // Build the set of per chain queries and placeholders for the per chain responses. - errorFound := false queries := []*perChainQuery{} responses := make([]*PerChainQueryResponseInternal, len(queryRequest.PerChainQueries)) receiveTime := time.Now() - for requestIdx, pcq := range queryRequest.PerChainQueries { - chainID := vaa.ChainID(pcq.ChainId) - if _, exists := supportedChains[chainID]; !exists { - qLogger.Debug("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID)) - invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc() - errorFound = true - break - } - - channel, channelExists := chainQueryReqC[chainID] - if !channelExists { - qLogger.Debug("unknown chain ID for query request, dropping it", zap.String("requestID", requestID), zap.Stringer("chain_id", chainID)) - invalidQueryRequestReceived.WithLabelValues("failed_to_look_up_channel").Inc() - errorFound = true - break - } + if errorFound := func() bool { + for requestIdx, pcq := range queryRequest.PerChainQueries { + chainID := vaa.ChainID(pcq.ChainId) + if _, exists := supportedChains[chainID]; !exists { + qLogger.Debug("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID)) + invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc() + return true + } - queries = append(queries, &perChainQuery{ - req: &PerChainQueryInternal{ - RequestID: requestID, - RequestIdx: requestIdx, - Request: pcq, - }, - channel: channel, - }) - } + channel, channelExists := chainQueryReqC[chainID] + if !channelExists { + qLogger.Debug("unknown chain ID for query request, dropping it", zap.String("requestID", requestID), zap.Stringer("chain_id", chainID)) + invalidQueryRequestReceived.WithLabelValues("failed_to_look_up_channel").Inc() + return true + } - if errorFound { + queries = append(queries, &perChainQuery{ + req: &PerChainQueryInternal{ + RequestID: requestID, + RequestIdx: requestIdx, + Request: pcq, + }, + channel: channel, + }) + } + return false + }(); errorFound { continue } @@ -450,29 +499,6 @@ func handleQueryRequestsImpl( } } -// parseAllowedRequesters parses a comma separated list of allowed requesters into a map to be used for look ups. -func parseAllowedRequesters(ccqAllowedRequesters string) (map[ethCommon.Address]struct{}, error) { - if ccqAllowedRequesters == "" { - return nil, fmt.Errorf("if cross chain query is enabled `--ccqAllowedRequesters` must be specified") - } - - var nullAddr ethCommon.Address - result := make(map[ethCommon.Address]struct{}) - for _, str := range strings.Split(ccqAllowedRequesters, ",") { - addr := ethCommon.BytesToAddress(ethCommon.Hex2Bytes(strings.TrimPrefix(str, "0x"))) - if addr == nullAddr { - return nil, fmt.Errorf("invalid value in `--ccqAllowedRequesters`: `%s`", str) - } - result[addr] = struct{}{} - } - - if len(result) <= 0 { - return nil, fmt.Errorf("no allowed requestors specified, ccqAllowedRequesters: `%s`", ccqAllowedRequesters) - } - - return result, nil -} - // ccqForwardToWatcher submits a query request to the appropriate watcher. It updates the request object if the write succeeds. // If the write fails, it does not update the last update time, which will cause a retry next interval (until it times out) func (pcq *perChainQuery) ccqForwardToWatcher(qLogger *zap.Logger, receiveTime time.Time) { diff --git a/node/pkg/query/queryratelimit/policyprovider.go b/node/pkg/query/queryratelimit/policyprovider.go new file mode 100644 index 0000000000..369bd5f98c --- /dev/null +++ b/node/pkg/query/queryratelimit/policyprovider.go @@ -0,0 +1,130 @@ +package queryratelimit + +import ( + "context" + "fmt" + "time" + + "github.com/ethereum/go-ethereum/common" + lru "github.com/hashicorp/golang-lru/v2" + "go.uber.org/zap" + "golang.org/x/sync/singleflight" +) + +// TODO(elee): this should really be an interface where the seprate parts are split out, ala, one for fetching, one for ttl cache. +type PolicyProvider struct { + fetcher func(ctx context.Context, key common.Address) (*Policy, error) + cacheDuration time.Duration + optimistic bool + parentContext context.Context + logger *zap.Logger + + cache *lru.Cache[common.Address, withExpiry[*Policy]] + + sf singleflight.Group +} + +type PolicyProviderOption func(*PolicyProvider) + +func WithPolicyProviderLogger(logger *zap.Logger) PolicyProviderOption { + return func(p *PolicyProvider) { + p.logger = logger + } +} + +func WithPolicyProviderFetcher(fetcher func(ctx context.Context, key common.Address) (*Policy, error)) PolicyProviderOption { + return func(p *PolicyProvider) { + p.fetcher = fetcher + } +} + +func WithPolicyProviderCache(cache *lru.Cache[common.Address, withExpiry[*Policy]]) PolicyProviderOption { + return func(p *PolicyProvider) { + p.cache = cache + } +} + +func WithPolicyProviderOptimistic(optimistic bool) PolicyProviderOption { + return func(p *PolicyProvider) { + p.optimistic = optimistic + } +} + +func WithPolicyProviderCacheDuration(cacheDuration time.Duration) PolicyProviderOption { + return func(p *PolicyProvider) { + p.cacheDuration = cacheDuration + } +} + +func WithPolicyProviderParentContext(ctx context.Context) PolicyProviderOption { + return func(p *PolicyProvider) { + p.parentContext = ctx + } +} + +var ErrNewPolicyProvider = fmt.Errorf("new rate limit policy provider") + +func NewPolicyProvider(ops ...PolicyProviderOption) (*PolicyProvider, error) { + o := &PolicyProvider{ + cacheDuration: time.Minute * 5, + parentContext: context.Background(), + } + for _, op := range ops { + if op == nil { + continue + } + op(o) + } + if o.cache == nil { + lru, err := lru.New[common.Address, withExpiry[*Policy]](1024) + if err != nil { + return nil, err + } + o.cache = lru + } + if o.fetcher == nil { + return nil, fmt.Errorf("%w: fetcher required", ErrNewPolicyProvider) + } + + return o, nil +} + +func (r *PolicyProvider) GetPolicy(ctx context.Context, key common.Address) (*Policy, error) { + val, hit := r.cache.Get(key) + if hit { + if time.Now().After(val.expiresAt) { + r.cache.Remove(key) + } + if r.optimistic { + go func() { + ctx, cn := context.WithTimeout(r.parentContext, time.Second*5) + defer cn() + if _, err := r.fetchAndFill(ctx, key); err != nil { + if r.logger != nil { + r.logger.Error("failed to fetch rate limit policy in background", zap.Error(err)) + } + } + }() + return val.v, nil + } + } + return r.fetchAndFill(ctx, key) +} + +func (r *PolicyProvider) fetchAndFill(ctx context.Context, key common.Address) (*Policy, error) { + res, err, _ := r.sf.Do(key.String(), func() (any, error) { + policy, err := r.fetcher(ctx, key) + if err != nil { + return nil, err + } + r.cache.Add(key, withExpiry[*Policy]{ + v: policy, + expiresAt: time.Now().Add(time.Duration(policy.Limits.Evm.MaxPerMinute) * time.Minute), + }) + return policy, nil + }) + if err != nil { + return nil, err + } + return res.(*Policy), err +} diff --git a/node/pkg/query/queryratelimit/ratelimit.go b/node/pkg/query/queryratelimit/ratelimit.go new file mode 100644 index 0000000000..4a47e1e947 --- /dev/null +++ b/node/pkg/query/queryratelimit/ratelimit.go @@ -0,0 +1,93 @@ +package queryratelimit + +import ( + "context" + "sync" + "time" + + "github.com/certusone/wormhole/node/pkg/query/narasu" + "github.com/ethereum/go-ethereum/common" +) + +type withExpiry[T any] struct { + v T + expiresAt time.Time +} + +type Enforcer struct { + secondLimits narasu.Store + minuteLimits narasu.Store + mu sync.Mutex + + enforcementCount int +} + +func NewEnforcer() *Enforcer { + return &Enforcer{ + secondLimits: narasu.NewMemoryStore(time.Second), + minuteLimits: narasu.NewMemoryStore(time.Minute), + } +} + +type EnforcementResponse struct { + Allowed bool + ExceededNetworks []string +} + +func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Action) (*EnforcementResponse, error) { + e.mu.Lock() + defer e.mu.Unlock() + e.enforcementCount++ + // TODO(elee): we can probably tune these variable better, if memory consumption or cpu usage of the ratelimiter ever becomes an issue + if e.enforcementCount > 1024 { + e.enforcementCount = 0 + e.secondLimits.Cleanup(ctx, time.Now(), 1*time.Hour) + e.minuteLimits.Cleanup(ctx, time.Now(), 1*time.Hour) + } + out := &EnforcementResponse{ + Allowed: true, + ExceededNetworks: []string{}, + } + for network, amount := range action.Networks { + if amount == 0 { + continue + } + limitForNetwork, ok := policy.Limits.Networks[network] + if !ok { + out.Allowed = false + out.ExceededNetworks = append(out.ExceededNetworks, network) + continue + } + thisSecond, err := e.secondLimits.IncrKey(ctx, action.Key.String(), amount, action.Time) + if err != nil { + // on failure to contact the rate limiter, we just error + return nil, err + } + if thisSecond > limitForNetwork.MaxPerSecond { + out.Allowed = false + out.ExceededNetworks = append(out.ExceededNetworks, network) + continue + } + } + return out, nil +} + +type Action struct { + Time time.Time `json:"time"` + Key common.Address `json:"key"` + Networks map[string]int `json:"networks"` +} + +type Policy struct { + Bucket string `json:"bucket"` + Limits Limits `json:"limits"` +} + +type Limits struct { + Networks map[string]Rule `json:"networks"` +} + +type Rule struct { + MaxPerSecond int `json:"max_per_second"` + MaxPerMinute int `json:"max_per_minute"` +} diff --git a/node/pkg/query/querystaking/evm.go b/node/pkg/query/querystaking/evm.go new file mode 100644 index 0000000000..117a566060 --- /dev/null +++ b/node/pkg/query/querystaking/evm.go @@ -0,0 +1,37 @@ +package querystaking + +import ( + "errors" + "fmt" + + "github.com/ethereum/go-ethereum/common" + "github.com/holiman/uint256" +) + +type StakeAndSigner struct { + StakeInfo *StakeInfo `abi:"StakeInfo"` + Signer common.Address `abi:"Signer"` +} + +type StakeInfo struct { + Amount *uint256.Int `abi:"uint256"` + ConversionTableIndex *uint256.Int `abi:"uint256"` + LockupEnd uint64 `abi:"uint64"` + AccessEnd uint64 `abi:"uint64"` +} + +var ErrInvalidLength = errors.New("invalid length") + +func ParseStakeInfo(data []byte) (*StakeInfo, error) { + empty := &StakeInfo{} + + if len(data) != 32*4 { + return nil, fmt.Errorf("invalid length: got %d want %d", len(data), 32*4) + } + empty.Amount = uint256.NewInt(0).SetBytes(data[0:32]) + empty.ConversionTableIndex = uint256.NewInt(0).SetBytes(data[32:64]) + tmp := uint256.NewInt(0) + empty.LockupEnd = tmp.SetBytes(data[64:72]).Uint64() + empty.AccessEnd = tmp.SetBytes(data[72:80]).Uint64() + return empty, nil +} From 5517533ad0961ba1b29b29e03faeabd3a978e598 Mon Sep 17 00:00:00 2001 From: a Date: Mon, 21 Apr 2025 14:49:42 -0500 Subject: [PATCH 02/12] noot --- node/pkg/query/queryratelimit/ratelimit.go | 1 - 1 file changed, 1 deletion(-) diff --git a/node/pkg/query/queryratelimit/ratelimit.go b/node/pkg/query/queryratelimit/ratelimit.go index 4a47e1e947..5df167b25b 100644 --- a/node/pkg/query/queryratelimit/ratelimit.go +++ b/node/pkg/query/queryratelimit/ratelimit.go @@ -79,7 +79,6 @@ type Action struct { } type Policy struct { - Bucket string `json:"bucket"` Limits Limits `json:"limits"` } From 15a3f81b9d6bc6a68501e30af90b5b681a9d8532 Mon Sep 17 00:00:00 2001 From: a Date: Mon, 21 Apr 2025 14:51:50 -0500 Subject: [PATCH 03/12] cache duration --- node/pkg/query/queryratelimit/policyprovider.go | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/node/pkg/query/queryratelimit/policyprovider.go b/node/pkg/query/queryratelimit/policyprovider.go index 369bd5f98c..7d475804bb 100644 --- a/node/pkg/query/queryratelimit/policyprovider.go +++ b/node/pkg/query/queryratelimit/policyprovider.go @@ -6,7 +6,7 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - lru "github.com/hashicorp/golang-lru/v2" + lru "github.com/hashicorp/golang-lru" "go.uber.org/zap" "golang.org/x/sync/singleflight" ) @@ -19,7 +19,7 @@ type PolicyProvider struct { parentContext context.Context logger *zap.Logger - cache *lru.Cache[common.Address, withExpiry[*Policy]] + cache *lru.Cache sf singleflight.Group } @@ -38,7 +38,7 @@ func WithPolicyProviderFetcher(fetcher func(ctx context.Context, key common.Addr } } -func WithPolicyProviderCache(cache *lru.Cache[common.Address, withExpiry[*Policy]]) PolicyProviderOption { +func WithPolicyProviderCache(cache *lru.Cache) PolicyProviderOption { return func(p *PolicyProvider) { p.cache = cache } @@ -76,7 +76,7 @@ func NewPolicyProvider(ops ...PolicyProviderOption) (*PolicyProvider, error) { op(o) } if o.cache == nil { - lru, err := lru.New[common.Address, withExpiry[*Policy]](1024) + lru, err := lru.New(1024) if err != nil { return nil, err } @@ -90,8 +90,9 @@ func NewPolicyProvider(ops ...PolicyProviderOption) (*PolicyProvider, error) { } func (r *PolicyProvider) GetPolicy(ctx context.Context, key common.Address) (*Policy, error) { - val, hit := r.cache.Get(key) + ival, hit := r.cache.Get(key) if hit { + val := ival.(withExpiry[*Policy]) if time.Now().After(val.expiresAt) { r.cache.Remove(key) } @@ -119,7 +120,7 @@ func (r *PolicyProvider) fetchAndFill(ctx context.Context, key common.Address) ( } r.cache.Add(key, withExpiry[*Policy]{ v: policy, - expiresAt: time.Now().Add(time.Duration(policy.Limits.Evm.MaxPerMinute) * time.Minute), + expiresAt: time.Now().Add(r.cacheDuration), }) return policy, nil }) From 7621a1b85bd8088fbd024ceed77740a825566485 Mon Sep 17 00:00:00 2001 From: a Date: Mon, 21 Apr 2025 14:51:54 -0500 Subject: [PATCH 04/12] noot --- node/go.mod | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/node/go.mod b/node/go.mod index 37dcf62dca..252dd12aa2 100644 --- a/node/go.mod +++ b/node/go.mod @@ -55,7 +55,6 @@ require ( github.com/grafana/dskit v0.0.0-20230201083518-528d8a7d52f2 github.com/grafana/loki v1.6.2-0.20230721141808-0d81144cfee8 github.com/hashicorp/golang-lru v0.6.0 - github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/holiman/uint256 v1.2.1 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.60.0 @@ -63,6 +62,7 @@ require ( github.com/wormhole-foundation/wormchain v0.0.0-00010101000000-000000000000 github.com/wormhole-foundation/wormhole/sdk v0.0.0-20220926172624-4b38dc650bb0 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c + golang.org/x/sync v0.8.0 google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e gopkg.in/godo.v2 v2.0.9 nhooyr.io/websocket v1.8.7 @@ -190,6 +190,7 @@ require ( github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-rootcerts v1.0.2 // indirect github.com/hashicorp/go-sockaddr v1.0.2 // indirect + github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect github.com/hashicorp/hcl v1.0.0 // indirect github.com/hashicorp/memberlist v0.5.0 // indirect github.com/hashicorp/serf v0.10.1 // indirect @@ -357,7 +358,6 @@ require ( golang.org/x/mod v0.21.0 // indirect golang.org/x/net v0.30.0 // indirect golang.org/x/oauth2 v0.23.0 // indirect - golang.org/x/sync v0.8.0 // indirect golang.org/x/term v0.25.0 // indirect golang.org/x/text v0.19.0 // indirect golang.org/x/tools v0.26.0 // indirect From b4c7d5dae683d62281a87f174a2b2485da39644a Mon Sep 17 00:00:00 2001 From: a Date: Mon, 21 Apr 2025 14:58:02 -0500 Subject: [PATCH 05/12] reorganize --- node/pkg/query/queryratelimit/enforcer.go | 72 ++++++++++++++++++++++ node/pkg/query/queryratelimit/ratelimit.go | 66 -------------------- 2 files changed, 72 insertions(+), 66 deletions(-) create mode 100644 node/pkg/query/queryratelimit/enforcer.go diff --git a/node/pkg/query/queryratelimit/enforcer.go b/node/pkg/query/queryratelimit/enforcer.go new file mode 100644 index 0000000000..4f1c50eaa6 --- /dev/null +++ b/node/pkg/query/queryratelimit/enforcer.go @@ -0,0 +1,72 @@ +package queryratelimit + +import ( + "context" + "sync" + "time" + + "github.com/certusone/wormhole/node/pkg/query/narasu" +) + +type withExpiry[T any] struct { + v T + expiresAt time.Time +} + +type Enforcer struct { + secondLimits narasu.Store + minuteLimits narasu.Store + mu sync.Mutex + + enforcementCount int +} + +func NewEnforcer() *Enforcer { + return &Enforcer{ + secondLimits: narasu.NewMemoryStore(time.Second), + minuteLimits: narasu.NewMemoryStore(time.Minute), + } +} + +type EnforcementResponse struct { + Allowed bool + ExceededNetworks []string +} + +func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Action) (*EnforcementResponse, error) { + e.mu.Lock() + defer e.mu.Unlock() + e.enforcementCount++ + // TODO(elee): we can probably tune these variable better, if memory consumption or cpu usage of the ratelimiter ever becomes an issue + if e.enforcementCount > 1024 { + e.enforcementCount = 0 + e.secondLimits.Cleanup(ctx, time.Now(), 1*time.Hour) + e.minuteLimits.Cleanup(ctx, time.Now(), 1*time.Hour) + } + out := &EnforcementResponse{ + Allowed: true, + ExceededNetworks: []string{}, + } + for network, amount := range action.Networks { + if amount == 0 { + continue + } + limitForNetwork, ok := policy.Limits.Networks[network] + if !ok { + out.Allowed = false + out.ExceededNetworks = append(out.ExceededNetworks, network) + continue + } + thisSecond, err := e.secondLimits.IncrKey(ctx, action.Key.String(), amount, action.Time) + if err != nil { + // on failure to contact the rate limiter, we just error + return nil, err + } + if thisSecond > limitForNetwork.MaxPerSecond { + out.Allowed = false + out.ExceededNetworks = append(out.ExceededNetworks, network) + continue + } + } + return out, nil +} diff --git a/node/pkg/query/queryratelimit/ratelimit.go b/node/pkg/query/queryratelimit/ratelimit.go index 5df167b25b..cbf409f660 100644 --- a/node/pkg/query/queryratelimit/ratelimit.go +++ b/node/pkg/query/queryratelimit/ratelimit.go @@ -1,77 +1,11 @@ package queryratelimit import ( - "context" - "sync" "time" - "github.com/certusone/wormhole/node/pkg/query/narasu" "github.com/ethereum/go-ethereum/common" ) -type withExpiry[T any] struct { - v T - expiresAt time.Time -} - -type Enforcer struct { - secondLimits narasu.Store - minuteLimits narasu.Store - mu sync.Mutex - - enforcementCount int -} - -func NewEnforcer() *Enforcer { - return &Enforcer{ - secondLimits: narasu.NewMemoryStore(time.Second), - minuteLimits: narasu.NewMemoryStore(time.Minute), - } -} - -type EnforcementResponse struct { - Allowed bool - ExceededNetworks []string -} - -func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Action) (*EnforcementResponse, error) { - e.mu.Lock() - defer e.mu.Unlock() - e.enforcementCount++ - // TODO(elee): we can probably tune these variable better, if memory consumption or cpu usage of the ratelimiter ever becomes an issue - if e.enforcementCount > 1024 { - e.enforcementCount = 0 - e.secondLimits.Cleanup(ctx, time.Now(), 1*time.Hour) - e.minuteLimits.Cleanup(ctx, time.Now(), 1*time.Hour) - } - out := &EnforcementResponse{ - Allowed: true, - ExceededNetworks: []string{}, - } - for network, amount := range action.Networks { - if amount == 0 { - continue - } - limitForNetwork, ok := policy.Limits.Networks[network] - if !ok { - out.Allowed = false - out.ExceededNetworks = append(out.ExceededNetworks, network) - continue - } - thisSecond, err := e.secondLimits.IncrKey(ctx, action.Key.String(), amount, action.Time) - if err != nil { - // on failure to contact the rate limiter, we just error - return nil, err - } - if thisSecond > limitForNetwork.MaxPerSecond { - out.Allowed = false - out.ExceededNetworks = append(out.ExceededNetworks, network) - continue - } - } - return out, nil -} - type Action struct { Time time.Time `json:"time"` Key common.Address `json:"key"` From cdad51c5bef7d70f14aaf8a76d81015fc662cb53 Mon Sep 17 00:00:00 2001 From: a Date: Mon, 21 Apr 2025 14:58:48 -0500 Subject: [PATCH 06/12] it should be a modulo --- node/pkg/query/queryratelimit/enforcer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/pkg/query/queryratelimit/enforcer.go b/node/pkg/query/queryratelimit/enforcer.go index 4f1c50eaa6..ceadd920a2 100644 --- a/node/pkg/query/queryratelimit/enforcer.go +++ b/node/pkg/query/queryratelimit/enforcer.go @@ -38,7 +38,7 @@ func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Ac defer e.mu.Unlock() e.enforcementCount++ // TODO(elee): we can probably tune these variable better, if memory consumption or cpu usage of the ratelimiter ever becomes an issue - if e.enforcementCount > 1024 { + if e.enforcementCount%1024 == 0 { e.enforcementCount = 0 e.secondLimits.Cleanup(ctx, time.Now(), 1*time.Hour) e.minuteLimits.Cleanup(ctx, time.Now(), 1*time.Hour) From d15d64e9335c41cf18e992f98b83e233c9b091a8 Mon Sep 17 00:00:00 2001 From: a Date: Mon, 21 Apr 2025 15:30:07 -0500 Subject: [PATCH 07/12] use btree --- node/go.mod | 3 +- node/go.sum | 2 -- node/pkg/query/narasu/memorystore.go | 50 ++++++++++++++++------------ 3 files changed, 29 insertions(+), 26 deletions(-) diff --git a/node/go.mod b/node/go.mod index 252dd12aa2..75e27c10f7 100644 --- a/node/go.mod +++ b/node/go.mod @@ -51,6 +51,7 @@ require ( github.com/cosmos/cosmos-sdk v0.45.11 github.com/go-kit/kit v0.12.0 github.com/golang/snappy v0.0.4 + github.com/google/btree v1.1.2 github.com/google/uuid v1.6.0 github.com/grafana/dskit v0.0.0-20230201083518-528d8a7d52f2 github.com/grafana/loki v1.6.2-0.20230721141808-0d81144cfee8 @@ -58,7 +59,6 @@ require ( github.com/holiman/uint256 v1.2.1 github.com/prometheus/client_model v0.6.1 github.com/prometheus/common v0.60.0 - github.com/tidwall/btree v1.7.0 github.com/wormhole-foundation/wormchain v0.0.0-00010101000000-000000000000 github.com/wormhole-foundation/wormhole/sdk v0.0.0-20220926172624-4b38dc650bb0 golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c @@ -165,7 +165,6 @@ require ( github.com/golang/glog v1.1.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect - github.com/google/btree v1.1.2 // indirect github.com/google/flatbuffers v1.12.0 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/gofuzz v1.2.0 // indirect diff --git a/node/go.sum b/node/go.sum index 07618cb2b2..2e381e1c69 100644 --- a/node/go.sum +++ b/node/go.sum @@ -2984,8 +2984,6 @@ github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmN github.com/tetafro/godot v0.3.7/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0= github.com/tetafro/godot v0.4.2/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0= github.com/tetafro/godot v1.4.11/go.mod h1:LR3CJpxDVGlYOWn3ZZg1PgNZdTUvzsZWu8xaEohUpn8= -github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI= -github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY= github.com/tidwall/gjson v1.6.7/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI= github.com/tidwall/gjson v1.9.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= github.com/tidwall/gjson v1.15.0 h1:5n/pM+v3r5ujuNl4YLZLsQ+UE5jlkLVm7jMzT5Mpolw= diff --git a/node/pkg/query/narasu/memorystore.go b/node/pkg/query/narasu/memorystore.go index a4bf2c2644..bd5d096a62 100644 --- a/node/pkg/query/narasu/memorystore.go +++ b/node/pkg/query/narasu/memorystore.go @@ -5,17 +5,18 @@ import ( "sync" "time" - "github.com/tidwall/btree" + "github.com/google/btree" ) type MemoryStore struct { - tree *btree.Map[int, *memoryStoreItem] + tree *btree.BTreeG[*memoryStoreItem] mapPool sync.Pool interval time.Duration } type memoryStoreItem struct { - m map[string]int + ts int + m map[string]int } func (m *memoryStoreItem) Clear() { @@ -26,7 +27,9 @@ func (m *memoryStoreItem) Clear() { func NewMemoryStore(interval time.Duration) *MemoryStore { c := &MemoryStore{ - tree: btree.NewMap[int, *memoryStoreItem](8), + tree: btree.NewG(8, func(a, b *memoryStoreItem) bool { + return a.ts < b.ts + }), interval: interval, mapPool: sync.Pool{ New: func() any { @@ -43,7 +46,7 @@ func (s *MemoryStore) Close() error { return nil } -func (s *MemoryStore) getMap() *memoryStoreItem { +func (s *MemoryStore) getMap(key int) *memoryStoreItem { v := s.mapPool.Get() if v == nil { return &memoryStoreItem{ @@ -51,6 +54,7 @@ func (s *MemoryStore) getMap() *memoryStoreItem { } } item := v.(*memoryStoreItem) + item.ts = key item.Clear() return item } @@ -65,10 +69,10 @@ func (s *MemoryStore) time(cur time.Time) int { func (s *MemoryStore) IncrKey(ctx context.Context, bucket string, amount int, cur time.Time) (int, error) { now := s.time(cur) - val, ok := s.tree.Get(now) + val, ok := s.tree.Get(&memoryStoreItem{ts: now}) if !ok { - n := s.getMap() - s.tree.Set(now, n) + n := s.getMap(now) + s.tree.ReplaceOrInsert(n) val = n } if _, ok := val.m[bucket]; !ok { @@ -83,15 +87,18 @@ func (s *MemoryStore) GetKeys(ctx context.Context, bucket string, from time.Time out := make([]int, 0) toseconds := s.time(to) fromseconds := s.time(from) - s.tree.Ascend(fromseconds, func(key int, val *memoryStoreItem) bool { - if key > toseconds { - return false - } - if count, ok := val.m[bucket]; ok { - out = append(out, count) - } - return true - }) + s.tree.AscendRange( + &memoryStoreItem{ts: fromseconds}, + &memoryStoreItem{ts: toseconds}, + func(val *memoryStoreItem) bool { + if val.ts > toseconds { + return false + } + if count, ok := val.m[bucket]; ok { + out = append(out, count) + } + return true + }) return out, nil } @@ -100,18 +107,17 @@ func (s *MemoryStore) Cleanup(ctx context.Context, now time.Time, age time.Durat nowseconds := int(now.Unix()) ageSeconds := int(age.Seconds()) func() { - s.tree.Ascend(0, func(key int, val *memoryStoreItem) bool { + s.tree.Ascend(func(val *memoryStoreItem) bool { // extract the timestamp from the key timestamp:bucket - ts := key - if nowseconds-ts >= ageSeconds { - expired = append(expired, key) + if nowseconds-val.ts >= ageSeconds { + expired = append(expired, val.ts) return true } return false }) }() for _, key := range expired { - item, ok := s.tree.Delete(key) + item, ok := s.tree.Delete(&memoryStoreItem{ts: key}) if ok { s.putMap(item) } From 889fe199f58b583a05dfcbb810bf489229247ddd Mon Sep 17 00:00:00 2001 From: a Date: Mon, 21 Apr 2025 15:42:52 -0500 Subject: [PATCH 08/12] noot --- node/pkg/query/queryratelimit/policyprovider.go | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/node/pkg/query/queryratelimit/policyprovider.go b/node/pkg/query/queryratelimit/policyprovider.go index 7d475804bb..4e7a1c1b22 100644 --- a/node/pkg/query/queryratelimit/policyprovider.go +++ b/node/pkg/query/queryratelimit/policyprovider.go @@ -14,6 +14,7 @@ import ( // TODO(elee): this should really be an interface where the seprate parts are split out, ala, one for fetching, one for ttl cache. type PolicyProvider struct { fetcher func(ctx context.Context, key common.Address) (*Policy, error) + fetchTimeout time.Duration cacheDuration time.Duration optimistic bool parentContext context.Context @@ -61,12 +62,18 @@ func WithPolicyProviderParentContext(ctx context.Context) PolicyProviderOption { p.parentContext = ctx } } +func WithPolicyProviderFetchTimeout(timeout time.Duration) PolicyProviderOption { + return func(p *PolicyProvider) { + p.fetchTimeout = timeout + } +} var ErrNewPolicyProvider = fmt.Errorf("new rate limit policy provider") func NewPolicyProvider(ops ...PolicyProviderOption) (*PolicyProvider, error) { o := &PolicyProvider{ cacheDuration: time.Minute * 5, + fetchTimeout: time.Second * 5, parentContext: context.Background(), } for _, op := range ops { @@ -98,7 +105,7 @@ func (r *PolicyProvider) GetPolicy(ctx context.Context, key common.Address) (*Po } if r.optimistic { go func() { - ctx, cn := context.WithTimeout(r.parentContext, time.Second*5) + ctx, cn := context.WithTimeout(r.parentContext, r.fetchTimeout) defer cn() if _, err := r.fetchAndFill(ctx, key); err != nil { if r.logger != nil { From 7e6fe72f3950e2c4c228efe254dca2f604f75af0 Mon Sep 17 00:00:00 2001 From: a Date: Mon, 21 Apr 2025 15:56:12 -0500 Subject: [PATCH 09/12] use type --- node/pkg/node/options.go | 10 ++- node/pkg/query/query.go | 87 +++++++++++----------- node/pkg/query/queryratelimit/enforcer.go | 16 ++-- node/pkg/query/queryratelimit/ratelimit.go | 8 +- 4 files changed, 62 insertions(+), 59 deletions(-) diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 4cdaf1f288..e9f2982fd3 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -169,12 +169,16 @@ func GuardianOptionQueryHandler(ccqEnabled bool, allowedRequesters string) *Guar } return &queryratelimit.Policy{ Limits: queryratelimit.Limits{ - Networks: map[string]queryratelimit.Rule{ - "evm": { + Types: map[uint8]queryratelimit.Rule{ + uint8(query.EthCallQueryRequestType): { MaxPerMinute: 15 * 60, MaxPerSecond: 15, }, - "solana": { + uint8(query.SolanaAccountQueryRequestType): { + MaxPerMinute: 15 * 60, + MaxPerSecond: 15, + }, + uint8(query.SolanaPdaQueryRequestType): { MaxPerMinute: 15 * 60, MaxPerSecond: 15, }, diff --git a/node/pkg/query/query.go b/node/pkg/query/query.go index 7e22165b79..806d0cec03 100644 --- a/node/pkg/query/query.go +++ b/node/pkg/query/query.go @@ -104,50 +104,49 @@ type ( PerChainConfig struct { TimestampCacheSupported bool NumWorkers int - VmType string } ) // perChainConfig provides static config info for each chain. If a chain is not listed here, then it does not support queries. // Every chain listed here must have at least one worker specified. var perChainConfig = map[vaa.ChainID]PerChainConfig{ - vaa.ChainIDSolana: {VmType: "solana", NumWorkers: 10, TimestampCacheSupported: false}, - vaa.ChainIDEthereum: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true}, - vaa.ChainIDBSC: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDPolygon: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true}, - vaa.ChainIDAvalanche: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDOasis: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDAurora: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDFantom: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDKarura: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDAcala: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDKlaytn: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDCelo: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDMoonbeam: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDArbitrum: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true}, - vaa.ChainIDOptimism: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true}, - vaa.ChainIDBase: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true}, - vaa.ChainIDScroll: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDMantle: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDBlast: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDXLayer: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDLinea: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDBerachain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDSnaxchain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDUnichain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDWorldchain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDInk: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDHolesky: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDArbitrumSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDBaseSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDOptimismSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDPolygonSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDHyperEVM: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDMonad: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDSeiEVM: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDMezo: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true}, - vaa.ChainIDFogo: {VmType: "evm", NumWorkers: 10, TimestampCacheSupported: true}, + vaa.ChainIDSolana: {NumWorkers: 10, TimestampCacheSupported: false}, + vaa.ChainIDEthereum: {NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDBSC: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDPolygon: {NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDAvalanche: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDOasis: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDAurora: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDFantom: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDKarura: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDAcala: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDKlaytn: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDCelo: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDMoonbeam: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDArbitrum: {NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDOptimism: {NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDBase: {NumWorkers: 5, TimestampCacheSupported: true}, + vaa.ChainIDScroll: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDMantle: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDBlast: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDXLayer: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDLinea: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDBerachain: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDSnaxchain: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDUnichain: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDWorldchain: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDInk: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDHolesky: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDArbitrumSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDBaseSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDOptimismSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDPolygonSepolia: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDHyperEVM: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDMonad: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDSeiEVM: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDMezo: {NumWorkers: 1, TimestampCacheSupported: true}, + vaa.ChainIDFogo: {NumWorkers: 10, TimestampCacheSupported: true}, } // GetPerChainConfig returns the config for the specified chain. If the chain is not configured it returns an empty struct, @@ -268,7 +267,7 @@ func handleQueryRequestsImpl( invalidQueryRequestReceived.WithLabelValues("failed_to_get_rate_limit_policy").Inc() } - if len(policy.Limits.Networks) == 0 { + if len(policy.Limits.Types) == 0 { qLogger.Debug("requestor has no limits / invalid requestor", zap.String("requestID", requestID)) invalidQueryRequestReceived.WithLabelValues("invalid_requestor").Inc() continue @@ -292,9 +291,9 @@ func handleQueryRequestsImpl( // the signer signed this! so they should be punished for sending bad requests. // if they send a chain that is not supported, we will just drop it for now, but mayvbe we should rate limit them for the valid ones they sent. are we that evil ? action := &queryratelimit.Action{ - Key: signerAddress, - Time: time.Now(), - Networks: make(map[string]int), + Key: signerAddress, + Time: time.Now(), + Types: make(map[uint8]int), } if ok := func() bool { for _, pcq := range queryRequest.PerChainQueries { @@ -305,7 +304,7 @@ func handleQueryRequestsImpl( invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc() return false } - action.Networks[config.VmType] += 1 + action.Types[uint8(pcq.Query.Type())] += 1 } return true }(); !ok { @@ -319,7 +318,7 @@ func handleQueryRequestsImpl( continue } if !limitResult.Allowed { - qLogger.Warn("rate limit exceeded", zap.String("requestID", requestID), zap.Any("networks", limitResult.ExceededNetworks)) + qLogger.Warn("rate limit exceeded", zap.String("requestID", requestID), zap.Any("types", limitResult.ExceededTypes)) invalidQueryRequestReceived.WithLabelValues("rate_limit_exceeded").Inc() continue } diff --git a/node/pkg/query/queryratelimit/enforcer.go b/node/pkg/query/queryratelimit/enforcer.go index ceadd920a2..b49bddf139 100644 --- a/node/pkg/query/queryratelimit/enforcer.go +++ b/node/pkg/query/queryratelimit/enforcer.go @@ -29,8 +29,8 @@ func NewEnforcer() *Enforcer { } type EnforcementResponse struct { - Allowed bool - ExceededNetworks []string + Allowed bool `json:"allowed"` + ExceededTypes []uint8 `json:"exceeded_types"` } func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Action) (*EnforcementResponse, error) { @@ -44,17 +44,17 @@ func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Ac e.minuteLimits.Cleanup(ctx, time.Now(), 1*time.Hour) } out := &EnforcementResponse{ - Allowed: true, - ExceededNetworks: []string{}, + Allowed: true, + ExceededTypes: []uint8{}, } - for network, amount := range action.Networks { + for network, amount := range action.Types { if amount == 0 { continue } - limitForNetwork, ok := policy.Limits.Networks[network] + limitForNetwork, ok := policy.Limits.Types[network] if !ok { out.Allowed = false - out.ExceededNetworks = append(out.ExceededNetworks, network) + out.ExceededTypes = append(out.ExceededTypes, network) continue } thisSecond, err := e.secondLimits.IncrKey(ctx, action.Key.String(), amount, action.Time) @@ -64,7 +64,7 @@ func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Ac } if thisSecond > limitForNetwork.MaxPerSecond { out.Allowed = false - out.ExceededNetworks = append(out.ExceededNetworks, network) + out.ExceededTypes = append(out.ExceededTypes, network) continue } } diff --git a/node/pkg/query/queryratelimit/ratelimit.go b/node/pkg/query/queryratelimit/ratelimit.go index cbf409f660..f0975aa8d7 100644 --- a/node/pkg/query/queryratelimit/ratelimit.go +++ b/node/pkg/query/queryratelimit/ratelimit.go @@ -7,9 +7,9 @@ import ( ) type Action struct { - Time time.Time `json:"time"` - Key common.Address `json:"key"` - Networks map[string]int `json:"networks"` + Time time.Time `json:"time"` + Key common.Address `json:"key"` + Types map[uint8]int `json:"networks"` } type Policy struct { @@ -17,7 +17,7 @@ type Policy struct { } type Limits struct { - Networks map[string]Rule `json:"networks"` + Types map[uint8]Rule `json:"types"` } type Rule struct { From 0c496e61c9b72aa20678e8d1367e743613718180 Mon Sep 17 00:00:00 2001 From: a Date: Mon, 21 Apr 2025 15:59:30 -0500 Subject: [PATCH 10/12] correct key --- node/pkg/query/queryratelimit/enforcer.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/node/pkg/query/queryratelimit/enforcer.go b/node/pkg/query/queryratelimit/enforcer.go index b49bddf139..00458c800c 100644 --- a/node/pkg/query/queryratelimit/enforcer.go +++ b/node/pkg/query/queryratelimit/enforcer.go @@ -2,6 +2,7 @@ package queryratelimit import ( "context" + "strconv" "sync" "time" @@ -47,24 +48,25 @@ func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Ac Allowed: true, ExceededTypes: []uint8{}, } - for network, amount := range action.Types { + for queryType, amount := range action.Types { if amount == 0 { continue } - limitForNetwork, ok := policy.Limits.Types[network] + limitForQueryType, ok := policy.Limits.Types[queryType] if !ok { out.Allowed = false - out.ExceededTypes = append(out.ExceededTypes, network) + out.ExceededTypes = append(out.ExceededTypes, queryType) continue } - thisSecond, err := e.secondLimits.IncrKey(ctx, action.Key.String(), amount, action.Time) + fullKey := strconv.Itoa(int(queryType)) + ":" + action.Key.String() + thisSecond, err := e.secondLimits.IncrKey(ctx, fullKey, amount, action.Time) if err != nil { // on failure to contact the rate limiter, we just error return nil, err } - if thisSecond > limitForNetwork.MaxPerSecond { + if thisSecond > limitForQueryType.MaxPerSecond { out.Allowed = false - out.ExceededTypes = append(out.ExceededTypes, network) + out.ExceededTypes = append(out.ExceededTypes, queryType) continue } } From 133ce400fb29e910745de30e54f1b7652b66169a Mon Sep 17 00:00:00 2001 From: a Date: Mon, 21 Apr 2025 16:00:08 -0500 Subject: [PATCH 11/12] noot --- node/pkg/query/queryratelimit/enforcer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/node/pkg/query/queryratelimit/enforcer.go b/node/pkg/query/queryratelimit/enforcer.go index 00458c800c..ed1bf05659 100644 --- a/node/pkg/query/queryratelimit/enforcer.go +++ b/node/pkg/query/queryratelimit/enforcer.go @@ -41,7 +41,7 @@ func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Ac // TODO(elee): we can probably tune these variable better, if memory consumption or cpu usage of the ratelimiter ever becomes an issue if e.enforcementCount%1024 == 0 { e.enforcementCount = 0 - e.secondLimits.Cleanup(ctx, time.Now(), 1*time.Hour) + e.secondLimits.Cleanup(ctx, time.Now(), 5*time.Minute) e.minuteLimits.Cleanup(ctx, time.Now(), 1*time.Hour) } out := &EnforcementResponse{ From a61da167114a280c910817d7df8fbab6892db893 Mon Sep 17 00:00:00 2001 From: a Date: Mon, 21 Apr 2025 16:00:42 -0500 Subject: [PATCH 12/12] add minutely limit to check --- node/pkg/query/queryratelimit/enforcer.go | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/node/pkg/query/queryratelimit/enforcer.go b/node/pkg/query/queryratelimit/enforcer.go index ed1bf05659..534b8eabbd 100644 --- a/node/pkg/query/queryratelimit/enforcer.go +++ b/node/pkg/query/queryratelimit/enforcer.go @@ -69,6 +69,15 @@ func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Ac out.ExceededTypes = append(out.ExceededTypes, queryType) continue } + thisMinute, err := e.minuteLimits.IncrKey(ctx, fullKey, amount, action.Time) + if err != nil { + // on failure to contact the rate limiter, we just error + return nil, err + } + if thisMinute > limitForQueryType.MaxPerMinute { + out.Allowed = false + out.ExceededTypes = append(out.ExceededTypes, queryType) + } } return out, nil }