From fac553e370e93793f420e6a2ca972d8df795d086 Mon Sep 17 00:00:00 2001
From: a
Date: Mon, 21 Apr 2025 14:46:09 -0500
Subject: [PATCH 01/12] add per-requester rate limiting for cross chain queries
---
node/go.mod | 3 +-
node/go.sum | 2 +
node/pkg/node/options.go | 51 +++-
node/pkg/query/narasu/memorystore.go | 120 ++++++++
node/pkg/query/narasu/memorystore_test.go | 95 +++++++
node/pkg/query/narasu/store.go | 12 +
node/pkg/query/query.go | 262 ++++++++++--------
.../query/queryratelimit/policyprovider.go | 130 +++++++++
node/pkg/query/queryratelimit/ratelimit.go | 93 +++++++
node/pkg/query/querystaking/evm.go | 37 +++
10 files changed, 685 insertions(+), 120 deletions(-)
create mode 100644 node/pkg/query/narasu/memorystore.go
create mode 100644 node/pkg/query/narasu/memorystore_test.go
create mode 100644 node/pkg/query/narasu/store.go
create mode 100644 node/pkg/query/queryratelimit/policyprovider.go
create mode 100644 node/pkg/query/queryratelimit/ratelimit.go
create mode 100644 node/pkg/query/querystaking/evm.go
diff --git a/node/go.mod b/node/go.mod
index 1d0eb5f260..37dcf62dca 100644
--- a/node/go.mod
+++ b/node/go.mod
@@ -55,9 +55,11 @@ require (
github.com/grafana/dskit v0.0.0-20230201083518-528d8a7d52f2
github.com/grafana/loki v1.6.2-0.20230721141808-0d81144cfee8
github.com/hashicorp/golang-lru v0.6.0
+ github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/holiman/uint256 v1.2.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.60.0
+ github.com/tidwall/btree v1.7.0
github.com/wormhole-foundation/wormchain v0.0.0-00010101000000-000000000000
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20220926172624-4b38dc650bb0
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
@@ -188,7 +190,6 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
- github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/memberlist v0.5.0 // indirect
github.com/hashicorp/serf v0.10.1 // indirect
diff --git a/node/go.sum b/node/go.sum
index 2e381e1c69..07618cb2b2 100644
--- a/node/go.sum
+++ b/node/go.sum
@@ -2984,6 +2984,8 @@ github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmN
github.com/tetafro/godot v0.3.7/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0=
github.com/tetafro/godot v0.4.2/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0=
github.com/tetafro/godot v1.4.11/go.mod h1:LR3CJpxDVGlYOWn3ZZg1PgNZdTUvzsZWu8xaEohUpn8=
+github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
+github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tidwall/gjson v1.6.7/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI=
github.com/tidwall/gjson v1.9.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.15.0 h1:5n/pM+v3r5ujuNl4YLZLsQ+UE5jlkLVm7jMzT5Mpolw=
diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go
index 037cb0fa86..4cdaf1f288 100644
--- a/node/pkg/node/options.go
+++ b/node/pkg/node/options.go
@@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
+ "strings"
"time"
"github.com/certusone/wormhole/node/pkg/accountant"
@@ -17,12 +18,14 @@ import (
"github.com/certusone/wormhole/node/pkg/processor"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/query"
+ "github.com/certusone/wormhole/node/pkg/query/queryratelimit"
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/ibc"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
"github.com/certusone/wormhole/node/pkg/wormconn"
+ ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/gorilla/mux"
libp2p_crypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/prometheus/client_golang/prometheus/promhttp"
@@ -139,10 +142,56 @@ func GuardianOptionQueryHandler(ccqEnabled bool, allowedRequesters string) *Guar
return nil
}
+ enforcer := queryratelimit.NewEnforcer()
+ if allowedRequesters == "" {
+ return fmt.Errorf("if cross chain query is enabled `--ccqAllowedRequesters` must be specified")
+ }
+ var nullAddr ethCommon.Address
+ result := make(map[ethCommon.Address]struct{})
+ for _, str := range strings.Split(allowedRequesters, ",") {
+ addr := ethCommon.BytesToAddress(ethCommon.Hex2Bytes(strings.TrimPrefix(str, "0x")))
+ if addr == nullAddr {
+ return fmt.Errorf("invalid value in `--ccqAllowedRequesters`: `%s`", str)
+ }
+ result[addr] = struct{}{}
+ }
+ if len(result) <= 0 {
+ return fmt.Errorf("no allowed requestors specified, ccqAllowedRequesters: `%s`", allowedRequesters)
+ }
+
+ policyProvider, err := queryratelimit.NewPolicyProvider(
+ queryratelimit.WithPolicyProviderLogger(logger),
+ queryratelimit.WithPolicyProviderCacheDuration(24*time.Hour),
+ queryratelimit.WithPolicyProviderFetcher(func(ctx context.Context, key ethCommon.Address) (*queryratelimit.Policy, error) {
+ _, ok := result[key]
+ if !ok {
+ return &queryratelimit.Policy{}, nil
+ }
+ return &queryratelimit.Policy{
+ Limits: queryratelimit.Limits{
+ Networks: map[string]queryratelimit.Rule{
+ "evm": {
+ MaxPerMinute: 15 * 60,
+ MaxPerSecond: 15,
+ },
+ "solana": {
+ MaxPerMinute: 15 * 60,
+ MaxPerSecond: 15,
+ },
+ },
+ },
+ }, nil
+ }),
+ )
+ if err != nil {
+ return fmt.Errorf("failed to create rate limit policy provider: %w", err)
+ }
+
g.queryHandler = query.NewQueryHandler(
logger,
g.env,
- allowedRequesters,
+ enforcer,
+ policyProvider,
g.signedQueryReqC.readC,
g.chainQueryReqC,
g.queryResponseC.readC,
diff --git a/node/pkg/query/narasu/memorystore.go b/node/pkg/query/narasu/memorystore.go
new file mode 100644
index 0000000000..a4bf2c2644
--- /dev/null
+++ b/node/pkg/query/narasu/memorystore.go
@@ -0,0 +1,120 @@
+package narasu
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/tidwall/btree"
+)
+
+type MemoryStore struct {
+ tree *btree.Map[int, *memoryStoreItem]
+ mapPool sync.Pool
+ interval time.Duration
+}
+
+type memoryStoreItem struct {
+ m map[string]int
+}
+
+func (m *memoryStoreItem) Clear() {
+ for k := range m.m {
+ delete(m.m, k)
+ }
+}
+
+func NewMemoryStore(interval time.Duration) *MemoryStore {
+ c := &MemoryStore{
+ tree: btree.NewMap[int, *memoryStoreItem](8),
+ interval: interval,
+ mapPool: sync.Pool{
+ New: func() any {
+ return &memoryStoreItem{
+ m: make(map[string]int),
+ }
+ },
+ },
+ }
+ return c
+}
+
+func (s *MemoryStore) Close() error {
+ return nil
+}
+
+func (s *MemoryStore) getMap() *memoryStoreItem {
+ v := s.mapPool.Get()
+ if v == nil {
+ return &memoryStoreItem{
+ m: make(map[string]int),
+ }
+ }
+ item := v.(*memoryStoreItem)
+ item.Clear()
+ return item
+}
+
+func (s *MemoryStore) putMap(m *memoryStoreItem) {
+ s.mapPool.Put(m)
+}
+
+func (s *MemoryStore) time(cur time.Time) int {
+ return int(cur.Truncate(s.interval).Unix())
+}
+
+func (s *MemoryStore) IncrKey(ctx context.Context, bucket string, amount int, cur time.Time) (int, error) {
+ now := s.time(cur)
+ val, ok := s.tree.Get(now)
+ if !ok {
+ n := s.getMap()
+ s.tree.Set(now, n)
+ val = n
+ }
+ if _, ok := val.m[bucket]; !ok {
+ val.m[bucket] = amount
+ } else {
+ val.m[bucket] = val.m[bucket] + amount
+ }
+ return val.m[bucket], nil
+}
+
+func (s *MemoryStore) GetKeys(ctx context.Context, bucket string, from time.Time, to time.Time) ([]int, error) {
+ out := make([]int, 0)
+ toseconds := s.time(to)
+ fromseconds := s.time(from)
+ s.tree.Ascend(fromseconds, func(key int, val *memoryStoreItem) bool {
+ if key > toseconds {
+ return false
+ }
+ if count, ok := val.m[bucket]; ok {
+ out = append(out, count)
+ }
+ return true
+ })
+ return out, nil
+}
+
+func (s *MemoryStore) Cleanup(ctx context.Context, now time.Time, age time.Duration) error {
+ var expired []int
+ nowseconds := int(now.Unix())
+ ageSeconds := int(age.Seconds())
+ func() {
+ s.tree.Ascend(0, func(key int, val *memoryStoreItem) bool {
+ // entries are keyed by the interval-truncated unix timestamp
+ ts := key
+ if nowseconds-ts >= ageSeconds {
+ expired = append(expired, key)
+ return true
+ }
+ return false
+ })
+ }()
+ for _, key := range expired {
+ item, ok := s.tree.Delete(key)
+ if ok {
+ s.putMap(item)
+ }
+ }
+ return nil
+}
diff --git a/node/pkg/query/narasu/memorystore_test.go b/node/pkg/query/narasu/memorystore_test.go
new file mode 100644
index 0000000000..4c03b77423
--- /dev/null
+++ b/node/pkg/query/narasu/memorystore_test.go
@@ -0,0 +1,95 @@
+package narasu_test
+
+import (
+ "context"
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/certusone/wormhole/node/pkg/query/narasu"
+ "github.com/stretchr/testify/require"
+)
+
+var bucketNames []string
+
+func init() {
+ for i := range 32 {
+ bucketNames = append(bucketNames, fmt.Sprintf("test-%d", i))
+ }
+}
+
+func BenchmarkMemoryStore(b *testing.B) {
+ ctx := context.Background()
+ start := time.Unix(1000, 0)
+ end := start.Add(time.Second * 59)
+ bucket := "test"
+
+ b.Run("incr+clean", func(b *testing.B) {
+ c := narasu.NewMemoryStore(time.Second)
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ c.IncrKey(ctx, bucketNames[0], 1, start.Add(time.Second*time.Duration(i)))
+ c.Cleanup(ctx, start.Add(time.Second*time.Duration(i)), 60*time.Second)
+ }
+ })
+ b.Run("incr+getrange", func(b *testing.B) {
+ c := narasu.NewMemoryStore(time.Second)
+ for i := range 60 {
+ c.IncrKey(ctx, bucket, 1, start.Add(time.Second*time.Duration(i)))
+ }
+ b.ResetTimer()
+ for n := 0; n < b.N; n++ {
+ c.GetKeys(ctx, bucket, start, end)
+ }
+ })
+
+}
+
+func TestMemoryStore(t *testing.T) {
+ ctx := context.Background()
+ interval := time.Minute
+ c := narasu.NewMemoryStore(interval)
+ start := time.Unix(0, 0)
+ end := start.Add(interval * 14)
+
+ bucket := "test"
+ var val int
+ var err error
+ val, err = c.IncrKey(ctx, bucket, 1, start)
+ require.NoError(t, err)
+ require.Equal(t, 1, val)
+
+ val, err = c.IncrKey(ctx, bucket, 1, start.Add(interval))
+ require.NoError(t, err)
+ require.Equal(t, 1, val)
+
+ val, err = c.IncrKey(ctx, bucket, 1, start.Add(time.Duration(float64(interval)*0.5)))
+ require.NoError(t, err)
+ require.Equal(t, 2, val)
+
+ val, err = c.IncrKey(ctx, bucket, 1, start.Add(interval*2))
+ require.NoError(t, err)
+ require.Equal(t, 1, val)
+
+ val, err = c.IncrKey(ctx, bucket, 1, start.Add(interval*10))
+ require.NoError(t, err)
+ require.Equal(t, 1, val)
+
+ val, err = c.IncrKey(ctx, bucket, 1, start.Add(interval*12))
+ require.NoError(t, err)
+ require.Equal(t, 1, val)
+
+ xs, err := c.GetKeys(ctx, bucket, start, end)
+ require.NoError(t, err)
+ require.ElementsMatch(t, []int{
+ 1, 2, 1, 1, 1}, xs)
+
+ err = c.Cleanup(ctx, end, 8*interval)
+ require.NoError(t, err)
+
+ xs, err = c.GetKeys(ctx, bucket, start, end)
+ require.NoError(t, err)
+ require.ElementsMatch(t, []int{
+ 1, 1,
+ }, xs)
+}
diff --git a/node/pkg/query/narasu/store.go b/node/pkg/query/narasu/store.go
new file mode 100644
index 0000000000..d0a1024eda
--- /dev/null
+++ b/node/pkg/query/narasu/store.go
@@ -0,0 +1,12 @@
+package narasu
+
+import (
+ "context"
+ "time"
+)
+
+type Store interface {
+ IncrKey(ctx context.Context, bucket string, amount int, cur time.Time) (int, error)
+ GetKeys(ctx context.Context, bucket string, from time.Time, to time.Time) ([]int, error)
+ Cleanup(ctx context.Context, now time.Time, age time.Duration) error
+}
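
A minimal usage sketch of the Store contract above (illustrative only, not part of the patch): the bucket name and the one-minute window are assumptions, and the caller sums the per-interval counters itself.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/certusone/wormhole/node/pkg/query/narasu"
)

func main() {
	ctx := context.Background()
	// One counter map per one-second interval.
	store := narasu.NewMemoryStore(time.Second)

	now := time.Now()
	// Count three hits against the same bucket within the current second.
	for i := 0; i < 3; i++ {
		n, err := store.IncrKey(ctx, "requester-0x01", 1, now)
		if err != nil {
			panic(err)
		}
		fmt.Println("hits this second:", n)
	}

	// Sum the per-second counters over the last minute for a sliding total.
	counts, err := store.GetKeys(ctx, "requester-0x01", now.Add(-time.Minute), now)
	if err != nil {
		panic(err)
	}
	total := 0
	for _, c := range counts {
		total += c
	}
	fmt.Println("hits in the last minute:", total)

	// Drop counter maps older than an hour so the tree does not grow unbounded.
	_ = store.Cleanup(ctx, now, time.Hour)
}

The memorystore_test.go file above exercises the same calls against a one-minute interval.
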
diff --git a/node/pkg/query/query.go b/node/pkg/query/query.go
index 50d344482b..7e22165b79 100644
--- a/node/pkg/query/query.go
+++ b/node/pkg/query/query.go
@@ -4,11 +4,11 @@ import (
"context"
"encoding/hex"
"fmt"
- "strings"
"time"
"github.com/certusone/wormhole/node/pkg/common"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
+ "github.com/certusone/wormhole/node/pkg/query/queryratelimit"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
@@ -44,20 +44,22 @@ const (
func NewQueryHandler(
logger *zap.Logger,
env common.Environment,
- allowedRequestorsStr string,
+ limitEnforcer *queryratelimit.Enforcer,
+ limitPolicyProvider *queryratelimit.PolicyProvider,
signedQueryReqC <-chan *gossipv1.SignedQueryRequest,
chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal,
queryResponseReadC <-chan *PerChainQueryResponseInternal,
queryResponseWriteC chan<- *QueryResponsePublication,
) *QueryHandler {
return &QueryHandler{
- logger: logger.With(zap.String("component", "ccq")),
- env: env,
- allowedRequestorsStr: allowedRequestorsStr,
- signedQueryReqC: signedQueryReqC,
- chainQueryReqC: chainQueryReqC,
- queryResponseReadC: queryResponseReadC,
- queryResponseWriteC: queryResponseWriteC,
+ logger: logger.With(zap.String("component", "ccq")),
+ env: env,
+ limitEnforcer: limitEnforcer,
+ limitPolicyProvider: limitPolicyProvider,
+ signedQueryReqC: signedQueryReqC,
+ chainQueryReqC: chainQueryReqC,
+ queryResponseReadC: queryResponseReadC,
+ queryResponseWriteC: queryResponseWriteC,
}
}
@@ -69,14 +71,14 @@ type (
// QueryHandler defines the cross chain query handler.
QueryHandler struct {
- logger *zap.Logger
- env common.Environment
- allowedRequestorsStr string
- signedQueryReqC <-chan *gossipv1.SignedQueryRequest
- chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal
- queryResponseReadC <-chan *PerChainQueryResponseInternal
- queryResponseWriteC chan<- *QueryResponsePublication
- allowedRequestors map[ethCommon.Address]struct{}
+ logger *zap.Logger
+ env common.Environment
+ limitEnforcer *queryratelimit.Enforcer
+ limitPolicyProvider *queryratelimit.PolicyProvider
+ signedQueryReqC <-chan *gossipv1.SignedQueryRequest
+ chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal
+ queryResponseReadC <-chan *PerChainQueryResponseInternal
+ queryResponseWriteC chan<- *QueryResponsePublication
}
// pendingQuery is the cache entry for a given query.
@@ -102,49 +104,50 @@ type (
PerChainConfig struct {
TimestampCacheSupported bool
NumWorkers int
+ VmType string
}
)
// perChainConfig provides static config info for each chain. If a chain is not listed here, then it does not support queries.
// Every chain listed here must have at least one worker specified.
var perChainConfig = map[vaa.ChainID]PerChainConfig{
- vaa.ChainIDSolana: {NumWorkers: 10, TimestampCacheSupported: false},
- vaa.ChainIDEthereum: {NumWorkers: 5, TimestampCacheSupported: true},
- vaa.ChainIDBSC: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDPolygon: {NumWorkers: 5, TimestampCacheSupported: true},
- vaa.ChainIDAvalanche: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDOasis: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDAurora: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDFantom: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDKarura: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDAcala: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDKlaytn: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDCelo: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDMoonbeam: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDArbitrum: {NumWorkers: 5, TimestampCacheSupported: true},
- vaa.ChainIDOptimism: {NumWorkers: 5, TimestampCacheSupported: true},
- vaa.ChainIDBase: {NumWorkers: 5, TimestampCacheSupported: true},
- vaa.ChainIDScroll: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDMantle: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDBlast: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDXLayer: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDLinea: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDBerachain: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDSnaxchain: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDUnichain: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDWorldchain: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDInk: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDHolesky: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDArbitrumSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDBaseSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDOptimismSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDPolygonSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDHyperEVM: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDMonad: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDSeiEVM: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDMezo: {NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDFogo: {NumWorkers: 10, TimestampCacheSupported: true},
+ vaa.ChainIDSolana: {VmType: "solana", NumWorkers: 10, TimestampCacheSupported: false},
+ vaa.ChainIDEthereum: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true},
+ vaa.ChainIDBSC: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDPolygon: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true},
+ vaa.ChainIDAvalanche: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDOasis: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDAurora: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDFantom: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDKarura: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDAcala: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDKlaytn: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDCelo: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDMoonbeam: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDArbitrum: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true},
+ vaa.ChainIDOptimism: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true},
+ vaa.ChainIDBase: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true},
+ vaa.ChainIDScroll: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDMantle: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDBlast: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDXLayer: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDLinea: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDBerachain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDSnaxchain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDUnichain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDWorldchain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDInk: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDHolesky: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDArbitrumSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDBaseSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDOptimismSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDPolygonSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDHyperEVM: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDMonad: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDSeiEVM: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDMezo: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDFogo: {VmType: "evm", NumWorkers: 10, TimestampCacheSupported: true},
}
// GetPerChainConfig returns the config for the specified chain. If the chain is not configured it returns an empty struct,
@@ -163,33 +166,38 @@ func (config PerChainConfig) QueriesSupported() bool {
// Start initializes the query handler and starts the runnable.
func (qh *QueryHandler) Start(ctx context.Context) error {
- qh.logger.Debug("entering Start", zap.String("enforceFlag", qh.allowedRequestorsStr))
-
- var err error
- qh.allowedRequestors, err = parseAllowedRequesters(qh.allowedRequestorsStr)
- if err != nil {
- return fmt.Errorf("failed to parse allowed requesters: %w", err)
- }
-
+ qh.logger.Debug("entering Start")
if err := supervisor.Run(ctx, "query_handler", common.WrapWithScissors(qh.handleQueryRequests, "query_handler")); err != nil {
return fmt.Errorf("failed to start query handler routine: %w", err)
}
-
return nil
}
// handleQueryRequests multiplexes observation requests to the appropriate chain
func (qh *QueryHandler) handleQueryRequests(ctx context.Context) error {
- return handleQueryRequestsImpl(ctx, qh.logger, qh.signedQueryReqC, qh.chainQueryReqC, qh.allowedRequestors, qh.queryResponseReadC, qh.queryResponseWriteC, qh.env, RequestTimeout, RetryInterval, AuditInterval)
+ return handleQueryRequestsImpl(
+ ctx,
+ qh.logger,
+ qh.limitEnforcer,
+ qh.limitPolicyProvider,
+ qh.signedQueryReqC,
+ qh.chainQueryReqC,
+ qh.queryResponseReadC,
+ qh.queryResponseWriteC,
+ qh.env,
+ RequestTimeout,
+ RetryInterval,
+ AuditInterval)
}
// handleQueryRequestsImpl allows instantiating the handler in the test environment with shorter timeout and retry parameters.
func handleQueryRequestsImpl(
ctx context.Context,
logger *zap.Logger,
+ limitEnforcer *queryratelimit.Enforcer,
+ limitPolicyProvider *queryratelimit.PolicyProvider,
signedQueryReqC <-chan *gossipv1.SignedQueryRequest,
chainQueryReqC map[vaa.ChainID]chan *PerChainQueryInternal,
- allowedRequestors map[ethCommon.Address]struct{},
queryResponseReadC <-chan *PerChainQueryResponseInternal,
queryResponseWriteC chan<- *QueryResponsePublication,
env common.Environment,
@@ -198,7 +206,7 @@ func handleQueryRequestsImpl(
auditIntervalImpl time.Duration,
) error {
qLogger := logger.With(zap.String("component", "ccqhandler"))
- qLogger.Info("cross chain queries are enabled", zap.Any("allowedRequestors", allowedRequestors), zap.String("env", string(env)))
+ qLogger.Info("cross chain queries are enabled", zap.String("env", string(env)))
pendingQueries := make(map[string]*pendingQuery) // Key is requestID.
@@ -252,8 +260,16 @@ func handleQueryRequestsImpl(
signerAddress := ethCommon.BytesToAddress(ethCrypto.Keccak256(signerBytes[1:])[12:])
- if _, exists := allowedRequestors[signerAddress]; !exists {
- qLogger.Debug("invalid requestor", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID))
+ // Get the rate limit policy for this requestor.
+ // If they don't have one, they receive an empty policy with no networks and will fail any reasonable enforcement check.
+ policy, err := limitPolicyProvider.GetPolicy(ctx, signerAddress)
+ if err != nil {
+ qLogger.Error("failed to get rate limit policy", zap.String("requestID", requestID), zap.Error(err))
+ invalidQueryRequestReceived.WithLabelValues("failed_to_get_rate_limit_policy").Inc()
+ }
+
+ if len(policy.Limits.Networks) == 0 {
+ qLogger.Debug("requestor has no limits / invalid requestor", zap.String("requestID", requestID))
invalidQueryRequestReceived.WithLabelValues("invalid_requestor").Inc()
continue
}
@@ -272,6 +288,41 @@ func handleQueryRequestsImpl(
invalidQueryRequestReceived.WithLabelValues("failed_to_unmarshal_request").Inc()
continue
}
+ // Enforce the rate limit as soon as the request has been unmarshaled, even if it later turns out to be invalid:
+ // the signer signed this payload, so bad requests still count against their budget.
+ // If they include a chain that is not supported, we just drop the request for now, though arguably the valid per-chain queries should still count against the limit.
+ action := &queryratelimit.Action{
+ Key: signerAddress,
+ Time: time.Now(),
+ Networks: make(map[string]int),
+ }
+ if ok := func() bool {
+ for _, pcq := range queryRequest.PerChainQueries {
+ chainID := vaa.ChainID(pcq.ChainId)
+ config := GetPerChainConfig(chainID)
+ if config.NumWorkers <= 0 {
+ qLogger.Debug("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID))
+ invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc()
+ return false
+ }
+ action.Networks[config.VmType] += 1
+ }
+ return true
+ }(); !ok {
+ continue
+ }
+ // perform the actual enforcement
+ limitResult, err := limitEnforcer.EnforcePolicy(ctx, policy, action)
+ if err != nil {
+ qLogger.Error("failed to enforce rate limit", zap.String("requestID", requestID), zap.Error(err))
+ invalidQueryRequestReceived.WithLabelValues("failed_to_enforce_rate_limit").Inc()
+ continue
+ }
+ if !limitResult.Allowed {
+ qLogger.Warn("rate limit exceeded", zap.String("requestID", requestID), zap.Any("networks", limitResult.ExceededNetworks))
+ invalidQueryRequestReceived.WithLabelValues("rate_limit_exceeded").Inc()
+ continue
+ }
if err := queryRequest.Validate(); err != nil {
qLogger.Error("received invalid message", zap.String("requestor", signerAddress.Hex()), zap.String("requestID", requestID), zap.Error(err))
@@ -280,39 +331,37 @@ func handleQueryRequestsImpl(
}
// Build the set of per chain queries and placeholders for the per chain responses.
- errorFound := false
queries := []*perChainQuery{}
responses := make([]*PerChainQueryResponseInternal, len(queryRequest.PerChainQueries))
receiveTime := time.Now()
- for requestIdx, pcq := range queryRequest.PerChainQueries {
- chainID := vaa.ChainID(pcq.ChainId)
- if _, exists := supportedChains[chainID]; !exists {
- qLogger.Debug("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID))
- invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc()
- errorFound = true
- break
- }
-
- channel, channelExists := chainQueryReqC[chainID]
- if !channelExists {
- qLogger.Debug("unknown chain ID for query request, dropping it", zap.String("requestID", requestID), zap.Stringer("chain_id", chainID))
- invalidQueryRequestReceived.WithLabelValues("failed_to_look_up_channel").Inc()
- errorFound = true
- break
- }
+ if errorFound := func() bool {
+ for requestIdx, pcq := range queryRequest.PerChainQueries {
+ chainID := vaa.ChainID(pcq.ChainId)
+ if _, exists := supportedChains[chainID]; !exists {
+ qLogger.Debug("chain does not support cross chain queries", zap.String("requestID", requestID), zap.Stringer("chainID", chainID))
+ invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc()
+ return true
+ }
- queries = append(queries, &perChainQuery{
- req: &PerChainQueryInternal{
- RequestID: requestID,
- RequestIdx: requestIdx,
- Request: pcq,
- },
- channel: channel,
- })
- }
+ channel, channelExists := chainQueryReqC[chainID]
+ if !channelExists {
+ qLogger.Debug("unknown chain ID for query request, dropping it", zap.String("requestID", requestID), zap.Stringer("chain_id", chainID))
+ invalidQueryRequestReceived.WithLabelValues("failed_to_look_up_channel").Inc()
+ return true
+ }
- if errorFound {
+ queries = append(queries, &perChainQuery{
+ req: &PerChainQueryInternal{
+ RequestID: requestID,
+ RequestIdx: requestIdx,
+ Request: pcq,
+ },
+ channel: channel,
+ })
+ }
+ return false
+ }(); errorFound {
continue
}
@@ -450,29 +499,6 @@ func handleQueryRequestsImpl(
}
}
-// parseAllowedRequesters parses a comma separated list of allowed requesters into a map to be used for look ups.
-func parseAllowedRequesters(ccqAllowedRequesters string) (map[ethCommon.Address]struct{}, error) {
- if ccqAllowedRequesters == "" {
- return nil, fmt.Errorf("if cross chain query is enabled `--ccqAllowedRequesters` must be specified")
- }
-
- var nullAddr ethCommon.Address
- result := make(map[ethCommon.Address]struct{})
- for _, str := range strings.Split(ccqAllowedRequesters, ",") {
- addr := ethCommon.BytesToAddress(ethCommon.Hex2Bytes(strings.TrimPrefix(str, "0x")))
- if addr == nullAddr {
- return nil, fmt.Errorf("invalid value in `--ccqAllowedRequesters`: `%s`", str)
- }
- result[addr] = struct{}{}
- }
-
- if len(result) <= 0 {
- return nil, fmt.Errorf("no allowed requestors specified, ccqAllowedRequesters: `%s`", ccqAllowedRequesters)
- }
-
- return result, nil
-}
-
// ccqForwardToWatcher submits a query request to the appropriate watcher. It updates the request object if the write succeeds.
// If the write fails, it does not update the last update time, which will cause a retry next interval (until it times out)
func (pcq *perChainQuery) ccqForwardToWatcher(qLogger *zap.Logger, receiveTime time.Time) {
diff --git a/node/pkg/query/queryratelimit/policyprovider.go b/node/pkg/query/queryratelimit/policyprovider.go
new file mode 100644
index 0000000000..369bd5f98c
--- /dev/null
+++ b/node/pkg/query/queryratelimit/policyprovider.go
@@ -0,0 +1,130 @@
+package queryratelimit
+
+import (
+ "context"
+ "fmt"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ lru "github.com/hashicorp/golang-lru/v2"
+ "go.uber.org/zap"
+ "golang.org/x/sync/singleflight"
+)
+
+// TODO(elee): this should really be an interface with the separate parts split out, e.g. one for fetching and one for the TTL cache.
+type PolicyProvider struct {
+ fetcher func(ctx context.Context, key common.Address) (*Policy, error)
+ cacheDuration time.Duration
+ optimistic bool
+ parentContext context.Context
+ logger *zap.Logger
+
+ cache *lru.Cache[common.Address, withExpiry[*Policy]]
+
+ sf singleflight.Group
+}
+
+type PolicyProviderOption func(*PolicyProvider)
+
+func WithPolicyProviderLogger(logger *zap.Logger) PolicyProviderOption {
+ return func(p *PolicyProvider) {
+ p.logger = logger
+ }
+}
+
+func WithPolicyProviderFetcher(fetcher func(ctx context.Context, key common.Address) (*Policy, error)) PolicyProviderOption {
+ return func(p *PolicyProvider) {
+ p.fetcher = fetcher
+ }
+}
+
+func WithPolicyProviderCache(cache *lru.Cache[common.Address, withExpiry[*Policy]]) PolicyProviderOption {
+ return func(p *PolicyProvider) {
+ p.cache = cache
+ }
+}
+
+func WithPolicyProviderOptimistic(optimistic bool) PolicyProviderOption {
+ return func(p *PolicyProvider) {
+ p.optimistic = optimistic
+ }
+}
+
+func WithPolicyProviderCacheDuration(cacheDuration time.Duration) PolicyProviderOption {
+ return func(p *PolicyProvider) {
+ p.cacheDuration = cacheDuration
+ }
+}
+
+func WithPolicyProviderParentContext(ctx context.Context) PolicyProviderOption {
+ return func(p *PolicyProvider) {
+ p.parentContext = ctx
+ }
+}
+
+var ErrNewPolicyProvider = fmt.Errorf("new rate limit policy provider")
+
+func NewPolicyProvider(ops ...PolicyProviderOption) (*PolicyProvider, error) {
+ o := &PolicyProvider{
+ cacheDuration: time.Minute * 5,
+ parentContext: context.Background(),
+ }
+ for _, op := range ops {
+ if op == nil {
+ continue
+ }
+ op(o)
+ }
+ if o.cache == nil {
+ lru, err := lru.New[common.Address, withExpiry[*Policy]](1024)
+ if err != nil {
+ return nil, err
+ }
+ o.cache = lru
+ }
+ if o.fetcher == nil {
+ return nil, fmt.Errorf("%w: fetcher required", ErrNewPolicyProvider)
+ }
+
+ return o, nil
+}
+
+func (r *PolicyProvider) GetPolicy(ctx context.Context, key common.Address) (*Policy, error) {
+ val, hit := r.cache.Get(key)
+ if hit {
+ if time.Now().After(val.expiresAt) {
+ r.cache.Remove(key)
+ }
+ if r.optimistic {
+ go func() {
+ ctx, cn := context.WithTimeout(r.parentContext, time.Second*5)
+ defer cn()
+ if _, err := r.fetchAndFill(ctx, key); err != nil {
+ if r.logger != nil {
+ r.logger.Error("failed to fetch rate limit policy in background", zap.Error(err))
+ }
+ }
+ }()
+ return val.v, nil
+ }
+ }
+ return r.fetchAndFill(ctx, key)
+}
+
+func (r *PolicyProvider) fetchAndFill(ctx context.Context, key common.Address) (*Policy, error) {
+ res, err, _ := r.sf.Do(key.String(), func() (any, error) {
+ policy, err := r.fetcher(ctx, key)
+ if err != nil {
+ return nil, err
+ }
+ r.cache.Add(key, withExpiry[*Policy]{
+ v: policy,
+ expiresAt: time.Now().Add(time.Duration(policy.Limits.Evm.MaxPerMinute) * time.Minute),
+ })
+ return policy, nil
+ })
+ if err != nil {
+ return nil, err
+ }
+ return res.(*Policy), err
+}
diff --git a/node/pkg/query/queryratelimit/ratelimit.go b/node/pkg/query/queryratelimit/ratelimit.go
new file mode 100644
index 0000000000..4a47e1e947
--- /dev/null
+++ b/node/pkg/query/queryratelimit/ratelimit.go
@@ -0,0 +1,93 @@
+package queryratelimit
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/certusone/wormhole/node/pkg/query/narasu"
+ "github.com/ethereum/go-ethereum/common"
+)
+
+type withExpiry[T any] struct {
+ v T
+ expiresAt time.Time
+}
+
+type Enforcer struct {
+ secondLimits narasu.Store
+ minuteLimits narasu.Store
+ mu sync.Mutex
+
+ enforcementCount int
+}
+
+func NewEnforcer() *Enforcer {
+ return &Enforcer{
+ secondLimits: narasu.NewMemoryStore(time.Second),
+ minuteLimits: narasu.NewMemoryStore(time.Minute),
+ }
+}
+
+type EnforcementResponse struct {
+ Allowed bool
+ ExceededNetworks []string
+}
+
+func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Action) (*EnforcementResponse, error) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ e.enforcementCount++
+ // TODO(elee): we can probably tune these variables better if memory consumption or CPU usage of the rate limiter ever becomes an issue
+ if e.enforcementCount > 1024 {
+ e.enforcementCount = 0
+ e.secondLimits.Cleanup(ctx, time.Now(), 1*time.Hour)
+ e.minuteLimits.Cleanup(ctx, time.Now(), 1*time.Hour)
+ }
+ out := &EnforcementResponse{
+ Allowed: true,
+ ExceededNetworks: []string{},
+ }
+ for network, amount := range action.Networks {
+ if amount == 0 {
+ continue
+ }
+ limitForNetwork, ok := policy.Limits.Networks[network]
+ if !ok {
+ out.Allowed = false
+ out.ExceededNetworks = append(out.ExceededNetworks, network)
+ continue
+ }
+ thisSecond, err := e.secondLimits.IncrKey(ctx, action.Key.String(), amount, action.Time)
+ if err != nil {
+ // on failure to contact the rate limiter, we just error
+ return nil, err
+ }
+ if thisSecond > limitForNetwork.MaxPerSecond {
+ out.Allowed = false
+ out.ExceededNetworks = append(out.ExceededNetworks, network)
+ continue
+ }
+ }
+ return out, nil
+}
+
+type Action struct {
+ Time time.Time `json:"time"`
+ Key common.Address `json:"key"`
+ Networks map[string]int `json:"networks"`
+}
+
+type Policy struct {
+ Bucket string `json:"bucket"`
+ Limits Limits `json:"limits"`
+}
+
+type Limits struct {
+ Networks map[string]Rule `json:"networks"`
+}
+
+type Rule struct {
+ MaxPerSecond int `json:"max_per_second"`
+ MaxPerMinute int `json:"max_per_minute"`
+}
diff --git a/node/pkg/query/querystaking/evm.go b/node/pkg/query/querystaking/evm.go
new file mode 100644
index 0000000000..117a566060
--- /dev/null
+++ b/node/pkg/query/querystaking/evm.go
@@ -0,0 +1,37 @@
+package querystaking
+
+import (
+ "errors"
+ "fmt"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/holiman/uint256"
+)
+
+type StakeAndSigner struct {
+ StakeInfo *StakeInfo `abi:"StakeInfo"`
+ Signer common.Address `abi:"Signer"`
+}
+
+type StakeInfo struct {
+ Amount *uint256.Int `abi:"uint256"`
+ ConversionTableIndex *uint256.Int `abi:"uint256"`
+ LockupEnd uint64 `abi:"uint64"`
+ AccessEnd uint64 `abi:"uint64"`
+}
+
+var ErrInvalidLength = errors.New("invalid length")
+
+func ParseStakeInfo(data []byte) (*StakeInfo, error) {
+ empty := &StakeInfo{}
+
+ if len(data) != 32*4 {
+ return nil, fmt.Errorf("invalid length: got %d want %d", len(data), 32*4)
+ }
+ empty.Amount = uint256.NewInt(0).SetBytes(data[0:32])
+ empty.ConversionTableIndex = uint256.NewInt(0).SetBytes(data[32:64])
+ tmp := uint256.NewInt(0)
+ empty.LockupEnd = tmp.SetBytes(data[64:96]).Uint64() // third 32-byte word; the uint64 is right-aligned
+ empty.AccessEnd = tmp.SetBytes(data[96:128]).Uint64() // fourth 32-byte word; the uint64 is right-aligned
+ return empty, nil
+}
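
A minimal sketch of how the pieces added in this patch compose, mirroring the wiring in options.go (illustrative only, not part of the series): the requester address, the static fetcher, and the 15-queries-per-second figures are assumptions.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/certusone/wormhole/node/pkg/query/queryratelimit"
	ethCommon "github.com/ethereum/go-ethereum/common"
)

func main() {
	ctx := context.Background()
	// Hypothetical requester address, used only for this sketch.
	requester := ethCommon.HexToAddress("0x0000000000000000000000000000000000000001")

	enforcer := queryratelimit.NewEnforcer()
	provider, err := queryratelimit.NewPolicyProvider(
		queryratelimit.WithPolicyProviderCacheDuration(24*time.Hour),
		queryratelimit.WithPolicyProviderFetcher(func(ctx context.Context, key ethCommon.Address) (*queryratelimit.Policy, error) {
			// Static policy: 15 EVM queries per second for every requester (assumed figures).
			return &queryratelimit.Policy{
				Limits: queryratelimit.Limits{
					Networks: map[string]queryratelimit.Rule{
						"evm": {MaxPerSecond: 15, MaxPerMinute: 15 * 60},
					},
				},
			}, nil
		}),
	)
	if err != nil {
		panic(err)
	}

	policy, err := provider.GetPolicy(ctx, requester)
	if err != nil {
		panic(err)
	}

	// One signed request carrying two EVM per-chain queries.
	res, err := enforcer.EnforcePolicy(ctx, policy, &queryratelimit.Action{
		Key:      requester,
		Time:     time.Now(),
		Networks: map[string]int{"evm": 2},
	})
	if err != nil {
		panic(err)
	}
	fmt.Println("allowed:", res.Allowed, "exceeded:", res.ExceededNetworks)
}

The handler in query.go builds the Action from the per-chain queries of a signed request, using each chain's VmType as the network key.
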
From 5517533ad0961ba1b29b29e03faeabd3a978e598 Mon Sep 17 00:00:00 2001
From: a
Date: Mon, 21 Apr 2025 14:49:42 -0500
Subject: [PATCH 02/12] drop unused Bucket field from Policy
---
node/pkg/query/queryratelimit/ratelimit.go | 1 -
1 file changed, 1 deletion(-)
diff --git a/node/pkg/query/queryratelimit/ratelimit.go b/node/pkg/query/queryratelimit/ratelimit.go
index 4a47e1e947..5df167b25b 100644
--- a/node/pkg/query/queryratelimit/ratelimit.go
+++ b/node/pkg/query/queryratelimit/ratelimit.go
@@ -79,7 +79,6 @@ type Action struct {
}
type Policy struct {
- Bucket string `json:"bucket"`
Limits Limits `json:"limits"`
}
From 15a3f81b9d6bc6a68501e30af90b5b681a9d8532 Mon Sep 17 00:00:00 2001
From: a
Date: Mon, 21 Apr 2025 14:51:50 -0500
Subject: [PATCH 03/12] cache duration
---
node/pkg/query/queryratelimit/policyprovider.go | 13 +++++++------
1 file changed, 7 insertions(+), 6 deletions(-)
diff --git a/node/pkg/query/queryratelimit/policyprovider.go b/node/pkg/query/queryratelimit/policyprovider.go
index 369bd5f98c..7d475804bb 100644
--- a/node/pkg/query/queryratelimit/policyprovider.go
+++ b/node/pkg/query/queryratelimit/policyprovider.go
@@ -6,7 +6,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/common"
- lru "github.com/hashicorp/golang-lru/v2"
+ lru "github.com/hashicorp/golang-lru"
"go.uber.org/zap"
"golang.org/x/sync/singleflight"
)
@@ -19,7 +19,7 @@ type PolicyProvider struct {
parentContext context.Context
logger *zap.Logger
- cache *lru.Cache[common.Address, withExpiry[*Policy]]
+ cache *lru.Cache
sf singleflight.Group
}
@@ -38,7 +38,7 @@ func WithPolicyProviderFetcher(fetcher func(ctx context.Context, key common.Addr
}
}
-func WithPolicyProviderCache(cache *lru.Cache[common.Address, withExpiry[*Policy]]) PolicyProviderOption {
+func WithPolicyProviderCache(cache *lru.Cache) PolicyProviderOption {
return func(p *PolicyProvider) {
p.cache = cache
}
@@ -76,7 +76,7 @@ func NewPolicyProvider(ops ...PolicyProviderOption) (*PolicyProvider, error) {
op(o)
}
if o.cache == nil {
- lru, err := lru.New[common.Address, withExpiry[*Policy]](1024)
+ lru, err := lru.New(1024)
if err != nil {
return nil, err
}
@@ -90,8 +90,9 @@ func NewPolicyProvider(ops ...PolicyProviderOption) (*PolicyProvider, error) {
}
func (r *PolicyProvider) GetPolicy(ctx context.Context, key common.Address) (*Policy, error) {
- val, hit := r.cache.Get(key)
+ ival, hit := r.cache.Get(key)
if hit {
+ val := ival.(withExpiry[*Policy])
if time.Now().After(val.expiresAt) {
r.cache.Remove(key)
}
@@ -119,7 +120,7 @@ func (r *PolicyProvider) fetchAndFill(ctx context.Context, key common.Address) (
}
r.cache.Add(key, withExpiry[*Policy]{
v: policy,
- expiresAt: time.Now().Add(time.Duration(policy.Limits.Evm.MaxPerMinute) * time.Minute),
+ expiresAt: time.Now().Add(r.cacheDuration),
})
return policy, nil
})
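
With the switch back to the non-generic hashicorp/golang-lru cache, expiry lives in the stored value and reads need a type assertion. A standalone sketch of that pattern follows; the cachedPolicy type is a hypothetical stand-in for the package's unexported withExpiry wrapper.

package main

import (
	"fmt"
	"time"

	lru "github.com/hashicorp/golang-lru"
)

// cachedPolicy is a hypothetical stand-in for the unexported withExpiry wrapper.
type cachedPolicy struct {
	maxPerSecond int
	expiresAt    time.Time
}

func main() {
	cache, err := lru.New(1024)
	if err != nil {
		panic(err)
	}

	// Store the value together with its expiry time.
	cache.Add("0xabc", cachedPolicy{
		maxPerSecond: 15,
		expiresAt:    time.Now().Add(5 * time.Minute),
	})

	// The non-generic cache returns interface{}, so reads need a type assertion.
	if ival, ok := cache.Get("0xabc"); ok {
		entry := ival.(cachedPolicy)
		if time.Now().After(entry.expiresAt) {
			// Expired: evict and fall back to a fresh fetch.
			cache.Remove("0xabc")
		} else {
			fmt.Println("cached max per second:", entry.maxPerSecond)
		}
	}
}

In the provider itself, fetchAndFill guards the underlying fetch with singleflight, so concurrent misses for the same key collapse into one call.
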
From 7621a1b85bd8088fbd024ceed77740a825566485 Mon Sep 17 00:00:00 2001
From: a
Date: Mon, 21 Apr 2025 14:51:54 -0500
Subject: [PATCH 04/12] tidy go.mod after rate limiter changes
---
node/go.mod | 4 ++--
1 file changed, 2 insertions(+), 2 deletions(-)
diff --git a/node/go.mod b/node/go.mod
index 37dcf62dca..252dd12aa2 100644
--- a/node/go.mod
+++ b/node/go.mod
@@ -55,7 +55,6 @@ require (
github.com/grafana/dskit v0.0.0-20230201083518-528d8a7d52f2
github.com/grafana/loki v1.6.2-0.20230721141808-0d81144cfee8
github.com/hashicorp/golang-lru v0.6.0
- github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/holiman/uint256 v1.2.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.60.0
@@ -63,6 +62,7 @@ require (
github.com/wormhole-foundation/wormchain v0.0.0-00010101000000-000000000000
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20220926172624-4b38dc650bb0
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
+ golang.org/x/sync v0.8.0
google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e
gopkg.in/godo.v2 v2.0.9
nhooyr.io/websocket v1.8.7
@@ -190,6 +190,7 @@ require (
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
+ github.com/hashicorp/golang-lru/v2 v2.0.7 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/hashicorp/memberlist v0.5.0 // indirect
github.com/hashicorp/serf v0.10.1 // indirect
@@ -357,7 +358,6 @@ require (
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
- golang.org/x/sync v0.8.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/tools v0.26.0 // indirect
From b4c7d5dae683d62281a87f174a2b2485da39644a Mon Sep 17 00:00:00 2001
From: a
Date: Mon, 21 Apr 2025 14:58:02 -0500
Subject: [PATCH 05/12] reorganize
---
node/pkg/query/queryratelimit/enforcer.go | 72 ++++++++++++++++++++++
node/pkg/query/queryratelimit/ratelimit.go | 66 --------------------
2 files changed, 72 insertions(+), 66 deletions(-)
create mode 100644 node/pkg/query/queryratelimit/enforcer.go
diff --git a/node/pkg/query/queryratelimit/enforcer.go b/node/pkg/query/queryratelimit/enforcer.go
new file mode 100644
index 0000000000..4f1c50eaa6
--- /dev/null
+++ b/node/pkg/query/queryratelimit/enforcer.go
@@ -0,0 +1,72 @@
+package queryratelimit
+
+import (
+ "context"
+ "sync"
+ "time"
+
+ "github.com/certusone/wormhole/node/pkg/query/narasu"
+)
+
+type withExpiry[T any] struct {
+ v T
+ expiresAt time.Time
+}
+
+type Enforcer struct {
+ secondLimits narasu.Store
+ minuteLimits narasu.Store
+ mu sync.Mutex
+
+ enforcementCount int
+}
+
+func NewEnforcer() *Enforcer {
+ return &Enforcer{
+ secondLimits: narasu.NewMemoryStore(time.Second),
+ minuteLimits: narasu.NewMemoryStore(time.Minute),
+ }
+}
+
+type EnforcementResponse struct {
+ Allowed bool
+ ExceededNetworks []string
+}
+
+func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Action) (*EnforcementResponse, error) {
+ e.mu.Lock()
+ defer e.mu.Unlock()
+ e.enforcementCount++
+ // TODO(elee): we can probably tune these variables better if memory consumption or CPU usage of the rate limiter ever becomes an issue
+ if e.enforcementCount > 1024 {
+ e.enforcementCount = 0
+ e.secondLimits.Cleanup(ctx, time.Now(), 1*time.Hour)
+ e.minuteLimits.Cleanup(ctx, time.Now(), 1*time.Hour)
+ }
+ out := &EnforcementResponse{
+ Allowed: true,
+ ExceededNetworks: []string{},
+ }
+ for network, amount := range action.Networks {
+ if amount == 0 {
+ continue
+ }
+ limitForNetwork, ok := policy.Limits.Networks[network]
+ if !ok {
+ out.Allowed = false
+ out.ExceededNetworks = append(out.ExceededNetworks, network)
+ continue
+ }
+ thisSecond, err := e.secondLimits.IncrKey(ctx, action.Key.String(), amount, action.Time)
+ if err != nil {
+ // on failure to contact the rate limiter, we just error
+ return nil, err
+ }
+ if thisSecond > limitForNetwork.MaxPerSecond {
+ out.Allowed = false
+ out.ExceededNetworks = append(out.ExceededNetworks, network)
+ continue
+ }
+ }
+ return out, nil
+}
diff --git a/node/pkg/query/queryratelimit/ratelimit.go b/node/pkg/query/queryratelimit/ratelimit.go
index 5df167b25b..cbf409f660 100644
--- a/node/pkg/query/queryratelimit/ratelimit.go
+++ b/node/pkg/query/queryratelimit/ratelimit.go
@@ -1,77 +1,11 @@
package queryratelimit
import (
- "context"
- "sync"
"time"
- "github.com/certusone/wormhole/node/pkg/query/narasu"
"github.com/ethereum/go-ethereum/common"
)
-type withExpiry[T any] struct {
- v T
- expiresAt time.Time
-}
-
-type Enforcer struct {
- secondLimits narasu.Store
- minuteLimits narasu.Store
- mu sync.Mutex
-
- enforcementCount int
-}
-
-func NewEnforcer() *Enforcer {
- return &Enforcer{
- secondLimits: narasu.NewMemoryStore(time.Second),
- minuteLimits: narasu.NewMemoryStore(time.Minute),
- }
-}
-
-type EnforcementResponse struct {
- Allowed bool
- ExceededNetworks []string
-}
-
-func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Action) (*EnforcementResponse, error) {
- e.mu.Lock()
- defer e.mu.Unlock()
- e.enforcementCount++
- // TODO(elee): we can probably tune these variables better if memory consumption or CPU usage of the rate limiter ever becomes an issue
- if e.enforcementCount > 1024 {
- e.enforcementCount = 0
- e.secondLimits.Cleanup(ctx, time.Now(), 1*time.Hour)
- e.minuteLimits.Cleanup(ctx, time.Now(), 1*time.Hour)
- }
- out := &EnforcementResponse{
- Allowed: true,
- ExceededNetworks: []string{},
- }
- for network, amount := range action.Networks {
- if amount == 0 {
- continue
- }
- limitForNetwork, ok := policy.Limits.Networks[network]
- if !ok {
- out.Allowed = false
- out.ExceededNetworks = append(out.ExceededNetworks, network)
- continue
- }
- thisSecond, err := e.secondLimits.IncrKey(ctx, action.Key.String(), amount, action.Time)
- if err != nil {
- // on failure to contact the rate limiter, we just error
- return nil, err
- }
- if thisSecond > limitForNetwork.MaxPerSecond {
- out.Allowed = false
- out.ExceededNetworks = append(out.ExceededNetworks, network)
- continue
- }
- }
- return out, nil
-}
-
type Action struct {
Time time.Time `json:"time"`
Key common.Address `json:"key"`
From cdad51c5bef7d70f14aaf8a76d81015fc662cb53 Mon Sep 17 00:00:00 2001
From: a
Date: Mon, 21 Apr 2025 14:58:48 -0500
Subject: [PATCH 06/12] it should be a modulo
---
node/pkg/query/queryratelimit/enforcer.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/node/pkg/query/queryratelimit/enforcer.go b/node/pkg/query/queryratelimit/enforcer.go
index 4f1c50eaa6..ceadd920a2 100644
--- a/node/pkg/query/queryratelimit/enforcer.go
+++ b/node/pkg/query/queryratelimit/enforcer.go
@@ -38,7 +38,7 @@ func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Ac
defer e.mu.Unlock()
e.enforcementCount++
// TODO(elee): we can probably tune these variables better if memory consumption or CPU usage of the rate limiter ever becomes an issue
- if e.enforcementCount > 1024 {
+ if e.enforcementCount%1024 == 0 {
e.enforcementCount = 0
e.secondLimits.Cleanup(ctx, time.Now(), 1*time.Hour)
e.minuteLimits.Cleanup(ctx, time.Now(), 1*time.Hour)
From d15d64e9335c41cf18e992f98b83e233c9b091a8 Mon Sep 17 00:00:00 2001
From: a
Date: Mon, 21 Apr 2025 15:30:07 -0500
Subject: [PATCH 07/12] use btree
---
node/go.mod | 3 +-
node/go.sum | 2 --
node/pkg/query/narasu/memorystore.go | 50 ++++++++++++++++------------
3 files changed, 29 insertions(+), 26 deletions(-)
diff --git a/node/go.mod b/node/go.mod
index 252dd12aa2..75e27c10f7 100644
--- a/node/go.mod
+++ b/node/go.mod
@@ -51,6 +51,7 @@ require (
github.com/cosmos/cosmos-sdk v0.45.11
github.com/go-kit/kit v0.12.0
github.com/golang/snappy v0.0.4
+ github.com/google/btree v1.1.2
github.com/google/uuid v1.6.0
github.com/grafana/dskit v0.0.0-20230201083518-528d8a7d52f2
github.com/grafana/loki v1.6.2-0.20230721141808-0d81144cfee8
@@ -58,7 +59,6 @@ require (
github.com/holiman/uint256 v1.2.1
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.60.0
- github.com/tidwall/btree v1.7.0
github.com/wormhole-foundation/wormchain v0.0.0-00010101000000-000000000000
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20220926172624-4b38dc650bb0
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
@@ -165,7 +165,6 @@ require (
github.com/golang/glog v1.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
- github.com/google/btree v1.1.2 // indirect
github.com/google/flatbuffers v1.12.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
diff --git a/node/go.sum b/node/go.sum
index 07618cb2b2..2e381e1c69 100644
--- a/node/go.sum
+++ b/node/go.sum
@@ -2984,8 +2984,6 @@ github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmN
github.com/tetafro/godot v0.3.7/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0=
github.com/tetafro/godot v0.4.2/go.mod h1:/7NLHhv08H1+8DNj0MElpAACw1ajsCuf3TKNQxA5S+0=
github.com/tetafro/godot v1.4.11/go.mod h1:LR3CJpxDVGlYOWn3ZZg1PgNZdTUvzsZWu8xaEohUpn8=
-github.com/tidwall/btree v1.7.0 h1:L1fkJH/AuEh5zBnnBbmTwQ5Lt+bRJ5A8EWecslvo9iI=
-github.com/tidwall/btree v1.7.0/go.mod h1:twD9XRA5jj9VUQGELzDO4HPQTNJsoWWfYEL+EUQ2cKY=
github.com/tidwall/gjson v1.6.7/go.mod h1:zeFuBCIqD4sN/gmqBzZ4j7Jd6UcA2Fc56x7QFsv+8fI=
github.com/tidwall/gjson v1.9.3/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk=
github.com/tidwall/gjson v1.15.0 h1:5n/pM+v3r5ujuNl4YLZLsQ+UE5jlkLVm7jMzT5Mpolw=
diff --git a/node/pkg/query/narasu/memorystore.go b/node/pkg/query/narasu/memorystore.go
index a4bf2c2644..bd5d096a62 100644
--- a/node/pkg/query/narasu/memorystore.go
+++ b/node/pkg/query/narasu/memorystore.go
@@ -5,17 +5,18 @@ import (
"sync"
"time"
- "github.com/tidwall/btree"
+ "github.com/google/btree"
)
type MemoryStore struct {
- tree *btree.Map[int, *memoryStoreItem]
+ tree *btree.BTreeG[*memoryStoreItem]
mapPool sync.Pool
interval time.Duration
}
type memoryStoreItem struct {
- m map[string]int
+ ts int
+ m map[string]int
}
func (m *memoryStoreItem) Clear() {
@@ -26,7 +27,9 @@ func (m *memoryStoreItem) Clear() {
func NewMemoryStore(interval time.Duration) *MemoryStore {
c := &MemoryStore{
- tree: btree.NewMap[int, *memoryStoreItem](8),
+ tree: btree.NewG(8, func(a, b *memoryStoreItem) bool {
+ return a.ts < b.ts
+ }),
interval: interval,
mapPool: sync.Pool{
New: func() any {
@@ -43,7 +46,7 @@ func (s *MemoryStore) Close() error {
return nil
}
-func (s *MemoryStore) getMap() *memoryStoreItem {
+func (s *MemoryStore) getMap(key int) *memoryStoreItem {
v := s.mapPool.Get()
if v == nil {
return &memoryStoreItem{
@@ -51,6 +54,7 @@ func (s *MemoryStore) getMap() *memoryStoreItem {
}
}
item := v.(*memoryStoreItem)
+ item.ts = key
item.Clear()
return item
}
@@ -65,10 +69,10 @@ func (s *MemoryStore) time(cur time.Time) int {
func (s *MemoryStore) IncrKey(ctx context.Context, bucket string, amount int, cur time.Time) (int, error) {
now := s.time(cur)
- val, ok := s.tree.Get(now)
+ val, ok := s.tree.Get(&memoryStoreItem{ts: now})
if !ok {
- n := s.getMap()
- s.tree.Set(now, n)
+ n := s.getMap(now)
+ s.tree.ReplaceOrInsert(n)
val = n
}
if _, ok := val.m[bucket]; !ok {
@@ -83,15 +87,18 @@ func (s *MemoryStore) GetKeys(ctx context.Context, bucket string, from time.Time
out := make([]int, 0)
toseconds := s.time(to)
fromseconds := s.time(from)
- s.tree.Ascend(fromseconds, func(key int, val *memoryStoreItem) bool {
- if key > toseconds {
- return false
- }
- if count, ok := val.m[bucket]; ok {
- out = append(out, count)
- }
- return true
- })
+ s.tree.AscendRange(
+ &memoryStoreItem{ts: fromseconds},
+ &memoryStoreItem{ts: toseconds},
+ func(val *memoryStoreItem) bool {
+ if val.ts > toseconds {
+ return false
+ }
+ if count, ok := val.m[bucket]; ok {
+ out = append(out, count)
+ }
+ return true
+ })
return out, nil
}
@@ -100,18 +107,17 @@ func (s *MemoryStore) Cleanup(ctx context.Context, now time.Time, age time.Durat
nowseconds := int(now.Unix())
ageSeconds := int(age.Seconds())
func() {
- s.tree.Ascend(0, func(key int, val *memoryStoreItem) bool {
+ s.tree.Ascend(func(val *memoryStoreItem) bool {
// entries are keyed by the interval-truncated unix timestamp
- ts := key
- if nowseconds-ts >= ageSeconds {
- expired = append(expired, key)
+ if nowseconds-val.ts >= ageSeconds {
+ expired = append(expired, val.ts)
return true
}
return false
})
}()
for _, key := range expired {
- item, ok := s.tree.Delete(key)
+ item, ok := s.tree.Delete(&memoryStoreItem{ts: key})
if ok {
s.putMap(item)
}
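
The google/btree API used above takes pivot items rather than keys for lookups, range scans and deletes, and AscendRange iterates the half-open range [greaterOrEqual, lessThan), so an upper pivot must sit one past the last value you want included. A minimal standalone sketch of that interface, assuming btree v1.1.2's generic API; the bucket type is illustrative.

package main

import (
	"fmt"

	"github.com/google/btree"
)

type bucket struct {
	ts    int
	count int
}

func main() {
	// Items are ordered by the caller-supplied less function; degree 8 matches the store.
	tree := btree.NewG(8, func(a, b *bucket) bool { return a.ts < b.ts })

	for ts := 0; ts < 5; ts++ {
		tree.ReplaceOrInsert(&bucket{ts: ts, count: ts * 10})
	}

	// Point lookup uses a pivot item carrying only the ordering field.
	if item, ok := tree.Get(&bucket{ts: 3}); ok {
		fmt.Println("ts=3 count:", item.count)
	}

	// AscendRange is half-open: [1, 4) visits ts 1, 2 and 3.
	tree.AscendRange(&bucket{ts: 1}, &bucket{ts: 4}, func(item *bucket) bool {
		fmt.Println("visited ts:", item.ts)
		return true
	})

	// Delete also takes a pivot and returns the removed item.
	if removed, ok := tree.Delete(&bucket{ts: 0}); ok {
		fmt.Println("removed ts:", removed.ts)
	}
}
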
From 889fe199f58b583a05dfcbb810bf489229247ddd Mon Sep 17 00:00:00 2001
From: a
Date: Mon, 21 Apr 2025 15:42:52 -0500
Subject: [PATCH 08/12] make policy fetch timeout configurable
---
node/pkg/query/queryratelimit/policyprovider.go | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git a/node/pkg/query/queryratelimit/policyprovider.go b/node/pkg/query/queryratelimit/policyprovider.go
index 7d475804bb..4e7a1c1b22 100644
--- a/node/pkg/query/queryratelimit/policyprovider.go
+++ b/node/pkg/query/queryratelimit/policyprovider.go
@@ -14,6 +14,7 @@ import (
// TODO(elee): this should really be an interface with the separate parts split out, e.g. one for fetching and one for the TTL cache.
type PolicyProvider struct {
fetcher func(ctx context.Context, key common.Address) (*Policy, error)
+ fetchTimeout time.Duration
cacheDuration time.Duration
optimistic bool
parentContext context.Context
@@ -61,12 +62,18 @@ func WithPolicyProviderParentContext(ctx context.Context) PolicyProviderOption {
p.parentContext = ctx
}
}
+func WithPolicyProviderFetchTimeout(timeout time.Duration) PolicyProviderOption {
+ return func(p *PolicyProvider) {
+ p.fetchTimeout = timeout
+ }
+}
var ErrNewPolicyProvider = fmt.Errorf("new rate limit policy provider")
func NewPolicyProvider(ops ...PolicyProviderOption) (*PolicyProvider, error) {
o := &PolicyProvider{
cacheDuration: time.Minute * 5,
+ fetchTimeout: time.Second * 5,
parentContext: context.Background(),
}
for _, op := range ops {
@@ -98,7 +105,7 @@ func (r *PolicyProvider) GetPolicy(ctx context.Context, key common.Address) (*Po
}
if r.optimistic {
go func() {
- ctx, cn := context.WithTimeout(r.parentContext, time.Second*5)
+ ctx, cn := context.WithTimeout(r.parentContext, r.fetchTimeout)
defer cn()
if _, err := r.fetchAndFill(ctx, key); err != nil {
if r.logger != nil {
From 7e6fe72f3950e2c4c228efe254dca2f604f75af0 Mon Sep 17 00:00:00 2001
From: a
Date: Mon, 21 Apr 2025 15:56:12 -0500
Subject: [PATCH 09/12] key rate limits by query request type
---
node/pkg/node/options.go | 10 ++-
node/pkg/query/query.go | 87 +++++++++++-----------
node/pkg/query/queryratelimit/enforcer.go | 16 ++--
node/pkg/query/queryratelimit/ratelimit.go | 8 +-
4 files changed, 62 insertions(+), 59 deletions(-)
diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go
index 4cdaf1f288..e9f2982fd3 100644
--- a/node/pkg/node/options.go
+++ b/node/pkg/node/options.go
@@ -169,12 +169,16 @@ func GuardianOptionQueryHandler(ccqEnabled bool, allowedRequesters string) *Guar
}
return &queryratelimit.Policy{
Limits: queryratelimit.Limits{
- Networks: map[string]queryratelimit.Rule{
- "evm": {
+ Types: map[uint8]queryratelimit.Rule{
+ uint8(query.EthCallQueryRequestType): {
MaxPerMinute: 15 * 60,
MaxPerSecond: 15,
},
- "solana": {
+ uint8(query.SolanaAccountQueryRequestType): {
+ MaxPerMinute: 15 * 60,
+ MaxPerSecond: 15,
+ },
+ uint8(query.SolanaPdaQueryRequestType): {
MaxPerMinute: 15 * 60,
MaxPerSecond: 15,
},
diff --git a/node/pkg/query/query.go b/node/pkg/query/query.go
index 7e22165b79..806d0cec03 100644
--- a/node/pkg/query/query.go
+++ b/node/pkg/query/query.go
@@ -104,50 +104,49 @@ type (
PerChainConfig struct {
TimestampCacheSupported bool
NumWorkers int
- VmType string
}
)
// perChainConfig provides static config info for each chain. If a chain is not listed here, then it does not support queries.
// Every chain listed here must have at least one worker specified.
var perChainConfig = map[vaa.ChainID]PerChainConfig{
- vaa.ChainIDSolana: {VmType: "solana", NumWorkers: 10, TimestampCacheSupported: false},
- vaa.ChainIDEthereum: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true},
- vaa.ChainIDBSC: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDPolygon: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true},
- vaa.ChainIDAvalanche: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDOasis: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDAurora: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDFantom: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDKarura: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDAcala: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDKlaytn: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDCelo: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDMoonbeam: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDArbitrum: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true},
- vaa.ChainIDOptimism: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true},
- vaa.ChainIDBase: {VmType: "evm", NumWorkers: 5, TimestampCacheSupported: true},
- vaa.ChainIDScroll: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDMantle: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDBlast: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDXLayer: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDLinea: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDBerachain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDSnaxchain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDUnichain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDWorldchain: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDInk: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDHolesky: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDArbitrumSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDBaseSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDOptimismSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDPolygonSepolia: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDHyperEVM: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDMonad: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDSeiEVM: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDMezo: {VmType: "evm", NumWorkers: 1, TimestampCacheSupported: true},
- vaa.ChainIDFogo: {VmType: "evm", NumWorkers: 10, TimestampCacheSupported: true},
+ vaa.ChainIDSolana: {NumWorkers: 10, TimestampCacheSupported: false},
+ vaa.ChainIDEthereum: {NumWorkers: 5, TimestampCacheSupported: true},
+ vaa.ChainIDBSC: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDPolygon: {NumWorkers: 5, TimestampCacheSupported: true},
+ vaa.ChainIDAvalanche: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDOasis: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDAurora: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDFantom: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDKarura: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDAcala: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDKlaytn: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDCelo: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDMoonbeam: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDArbitrum: {NumWorkers: 5, TimestampCacheSupported: true},
+ vaa.ChainIDOptimism: {NumWorkers: 5, TimestampCacheSupported: true},
+ vaa.ChainIDBase: {NumWorkers: 5, TimestampCacheSupported: true},
+ vaa.ChainIDScroll: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDMantle: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDBlast: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDXLayer: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDLinea: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDBerachain: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDSnaxchain: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDUnichain: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDWorldchain: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDInk: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDHolesky: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDArbitrumSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDBaseSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDOptimismSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDPolygonSepolia: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDHyperEVM: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDMonad: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDSeiEVM: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDMezo: {NumWorkers: 1, TimestampCacheSupported: true},
+ vaa.ChainIDFogo: {NumWorkers: 10, TimestampCacheSupported: true},
}
// GetPerChainConfig returns the config for the specified chain. If the chain is not configured it returns an empty struct,
@@ -268,7 +267,7 @@ func handleQueryRequestsImpl(
invalidQueryRequestReceived.WithLabelValues("failed_to_get_rate_limit_policy").Inc()
}
- if len(policy.Limits.Networks) == 0 {
+ if len(policy.Limits.Types) == 0 {
qLogger.Debug("requestor has no limits / invalid requestor", zap.String("requestID", requestID))
invalidQueryRequestReceived.WithLabelValues("invalid_requestor").Inc()
continue
@@ -292,9 +291,9 @@ func handleQueryRequestsImpl(
// The signer signed this request, so they are accountable for any bad requests they send.
// If they include a chain that is not supported, we just drop the request for now, though we could arguably still count the valid per-chain queries against their rate limit.
action := &queryratelimit.Action{
- Key: signerAddress,
- Time: time.Now(),
- Networks: make(map[string]int),
+ Key: signerAddress,
+ Time: time.Now(),
+ Types: make(map[uint8]int),
}
if ok := func() bool {
for _, pcq := range queryRequest.PerChainQueries {
@@ -305,7 +304,7 @@ func handleQueryRequestsImpl(
invalidQueryRequestReceived.WithLabelValues("chain_does_not_support_ccq").Inc()
return false
}
- action.Networks[config.VmType] += 1
+ action.Types[uint8(pcq.Query.Type())] += 1
}
return true
}(); !ok {
@@ -319,7 +318,7 @@ func handleQueryRequestsImpl(
continue
}
if !limitResult.Allowed {
- qLogger.Warn("rate limit exceeded", zap.String("requestID", requestID), zap.Any("networks", limitResult.ExceededNetworks))
+ qLogger.Warn("rate limit exceeded", zap.String("requestID", requestID), zap.Any("types", limitResult.ExceededTypes))
invalidQueryRequestReceived.WithLabelValues("rate_limit_exceeded").Inc()
continue
}
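For reference, a minimal sketch of the tallying step above: each per-chain query adds one unit of budget under its query type before the enforcer is consulted. The perChainQuery stand-in and its field are illustrative only, not the package's real types.

package main

import "fmt"

// perChainQuery is a stand-in for the real per-chain query type; only the
// idea of "one counted unit per per-chain query" comes from the diff.
type perChainQuery struct{ queryType uint8 }

func tallyTypes(queries []perChainQuery) map[uint8]int {
	types := make(map[uint8]int)
	for _, pcq := range queries {
		types[pcq.queryType]++ // one unit of rate-limit budget per per-chain query
	}
	return types
}

func main() {
	// e.g. two eth_call-style queries plus one Solana account query (values illustrative)
	qs := []perChainQuery{{queryType: 1}, {queryType: 1}, {queryType: 4}}
	fmt.Println(tallyTypes(qs)) // map[1:2 4:1]
}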
diff --git a/node/pkg/query/queryratelimit/enforcer.go b/node/pkg/query/queryratelimit/enforcer.go
index ceadd920a2..b49bddf139 100644
--- a/node/pkg/query/queryratelimit/enforcer.go
+++ b/node/pkg/query/queryratelimit/enforcer.go
@@ -29,8 +29,8 @@ func NewEnforcer() *Enforcer {
}
type EnforcementResponse struct {
- Allowed bool
- ExceededNetworks []string
+ Allowed bool `json:"allowed"`
+ ExceededTypes []uint8 `json:"exceeded_types"`
}
func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Action) (*EnforcementResponse, error) {
@@ -44,17 +44,17 @@ func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Ac
e.minuteLimits.Cleanup(ctx, time.Now(), 1*time.Hour)
}
out := &EnforcementResponse{
- Allowed: true,
- ExceededNetworks: []string{},
+ Allowed: true,
+ ExceededTypes: []uint8{},
}
- for network, amount := range action.Networks {
+ for network, amount := range action.Types {
if amount == 0 {
continue
}
- limitForNetwork, ok := policy.Limits.Networks[network]
+ limitForNetwork, ok := policy.Limits.Types[network]
if !ok {
out.Allowed = false
- out.ExceededNetworks = append(out.ExceededNetworks, network)
+ out.ExceededTypes = append(out.ExceededTypes, network)
continue
}
thisSecond, err := e.secondLimits.IncrKey(ctx, action.Key.String(), amount, action.Time)
@@ -64,7 +64,7 @@ func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Ac
}
if thisSecond > limitForNetwork.MaxPerSecond {
out.Allowed = false
- out.ExceededNetworks = append(out.ExceededNetworks, network)
+ out.ExceededTypes = append(out.ExceededTypes, network)
continue
}
}
diff --git a/node/pkg/query/queryratelimit/ratelimit.go b/node/pkg/query/queryratelimit/ratelimit.go
index cbf409f660..f0975aa8d7 100644
--- a/node/pkg/query/queryratelimit/ratelimit.go
+++ b/node/pkg/query/queryratelimit/ratelimit.go
@@ -7,9 +7,9 @@ import (
)
type Action struct {
- Time time.Time `json:"time"`
- Key common.Address `json:"key"`
- Networks map[string]int `json:"networks"`
+ Time time.Time `json:"time"`
+ Key common.Address `json:"key"`
+ Types map[uint8]int `json:"types"`
}
type Policy struct {
@@ -17,7 +17,7 @@ type Policy struct {
}
type Limits struct {
- Networks map[string]Rule `json:"networks"`
+ Types map[uint8]Rule `json:"types"`
}
type Rule struct {
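For reference, a self-contained sketch of how the reshaped policy types might be populated once limits are keyed by query type. The JSON tags on Rule and the numeric type constants below are assumptions; only the field names and the map[uint8]Rule shape come from this series.

package main

import "fmt"

// Rule mirrors the per-type limits used in this series; the JSON tags are assumed.
type Rule struct {
	MaxPerSecond int `json:"max_per_second"`
	MaxPerMinute int `json:"max_per_minute"`
}

type Limits struct {
	Types map[uint8]Rule `json:"types"`
}

type Policy struct {
	Limits Limits `json:"limits"`
}

func main() {
	// Illustrative type values only; the real constants live in the query package.
	const ethCallType, solanaAccountType = uint8(1), uint8(4)
	p := Policy{Limits: Limits{Types: map[uint8]Rule{
		ethCallType:       {MaxPerSecond: 15, MaxPerMinute: 15 * 60},
		solanaAccountType: {MaxPerSecond: 15, MaxPerMinute: 15 * 60},
	}}}
	fmt.Println(len(p.Limits.Types)) // 2
}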
From 0c496e61c9b72aa20678e8d1367e743613718180 Mon Sep 17 00:00:00 2001
From: a
Date: Mon, 21 Apr 2025 15:59:30 -0500
Subject: [PATCH 10/12] correct key
---
node/pkg/query/queryratelimit/enforcer.go | 14 ++++++++------
1 file changed, 8 insertions(+), 6 deletions(-)
diff --git a/node/pkg/query/queryratelimit/enforcer.go b/node/pkg/query/queryratelimit/enforcer.go
index b49bddf139..00458c800c 100644
--- a/node/pkg/query/queryratelimit/enforcer.go
+++ b/node/pkg/query/queryratelimit/enforcer.go
@@ -2,6 +2,7 @@ package queryratelimit
import (
"context"
+ "strconv"
"sync"
"time"
@@ -47,24 +48,25 @@ func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Ac
Allowed: true,
ExceededTypes: []uint8{},
}
- for network, amount := range action.Types {
+ for queryType, amount := range action.Types {
if amount == 0 {
continue
}
- limitForNetwork, ok := policy.Limits.Types[network]
+ limitForQueryType, ok := policy.Limits.Types[queryType]
if !ok {
out.Allowed = false
- out.ExceededTypes = append(out.ExceededTypes, network)
+ out.ExceededTypes = append(out.ExceededTypes, queryType)
continue
}
- thisSecond, err := e.secondLimits.IncrKey(ctx, action.Key.String(), amount, action.Time)
+ fullKey := strconv.Itoa(int(queryType)) + ":" + action.Key.String()
+ thisSecond, err := e.secondLimits.IncrKey(ctx, fullKey, amount, action.Time)
if err != nil {
// on failure to contact the rate limiter, we just error
return nil, err
}
- if thisSecond > limitForNetwork.MaxPerSecond {
+ if thisSecond > limitForQueryType.MaxPerSecond {
out.Allowed = false
- out.ExceededTypes = append(out.ExceededTypes, network)
+ out.ExceededTypes = append(out.ExceededTypes, queryType)
continue
}
}
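A small sketch of the composite counter key this patch introduces: prefixing the signer address with the query type gives each (type, signer) pair its own bucket, so one query type cannot consume another's budget. The helper name is illustrative.

package main

import (
	"fmt"
	"strconv"
)

// counterKey mirrors the fullKey built above: "<queryType>:<signer address>".
func counterKey(queryType uint8, signer string) string {
	return strconv.Itoa(int(queryType)) + ":" + signer
}

func main() {
	fmt.Println(counterKey(1, "0xABC")) // 1:0xABC
	fmt.Println(counterKey(4, "0xABC")) // 4:0xABC -- same signer, separate budget
}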
From 133ce400fb29e910745de30e54f1b7652b66169a Mon Sep 17 00:00:00 2001
From: a
Date: Mon, 21 Apr 2025 16:00:08 -0500
Subject: [PATCH 11/12] shorten second-limit cleanup retention to five minutes
---
node/pkg/query/queryratelimit/enforcer.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/node/pkg/query/queryratelimit/enforcer.go b/node/pkg/query/queryratelimit/enforcer.go
index 00458c800c..ed1bf05659 100644
--- a/node/pkg/query/queryratelimit/enforcer.go
+++ b/node/pkg/query/queryratelimit/enforcer.go
@@ -41,7 +41,7 @@ func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Ac
// TODO(elee): we can probably tune these variables better if memory consumption or CPU usage of the rate limiter ever becomes an issue
if e.enforcementCount%1024 == 0 {
e.enforcementCount = 0
- e.secondLimits.Cleanup(ctx, time.Now(), 1*time.Hour)
+ e.secondLimits.Cleanup(ctx, time.Now(), 5*time.Minute)
e.minuteLimits.Cleanup(ctx, time.Now(), 1*time.Hour)
}
out := &EnforcementResponse{
From a61da167114a280c910817d7df8fbab6892db893 Mon Sep 17 00:00:00 2001
From: a
Date: Mon, 21 Apr 2025 16:00:42 -0500
Subject: [PATCH 12/12] add minutely limit to check
---
node/pkg/query/queryratelimit/enforcer.go | 9 +++++++++
1 file changed, 9 insertions(+)
diff --git a/node/pkg/query/queryratelimit/enforcer.go b/node/pkg/query/queryratelimit/enforcer.go
index ed1bf05659..534b8eabbd 100644
--- a/node/pkg/query/queryratelimit/enforcer.go
+++ b/node/pkg/query/queryratelimit/enforcer.go
@@ -69,6 +69,15 @@ func (e *Enforcer) EnforcePolicy(ctx context.Context, policy *Policy, action *Ac
out.ExceededTypes = append(out.ExceededTypes, queryType)
continue
}
+ thisMinute, err := e.minuteLimits.IncrKey(ctx, fullKey, amount, action.Time)
+ if err != nil {
+ // on failure to contact the rate limiter, we just error
+ return nil, err
+ }
+ if thisMinute > limitForQueryType.MaxPerMinute {
+ out.Allowed = false
+ out.ExceededTypes = append(out.ExceededTypes, queryType)
+ }
}
return out, nil
}
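A compressed, self-contained sketch of the two-window check the series ends up with: the per-second counter is consulted first, then the per-minute counter. The map-backed window type below stands in for the store-backed secondLimits/minuteLimits, and the early returns only loosely mirror the diff, which records exceeded types instead of stopping.

package main

import (
	"fmt"
	"time"
)

type Rule struct{ MaxPerSecond, MaxPerMinute int }

// window is a toy fixed-window counter: increments are bucketed by truncating
// the timestamp to the window size and keying the map on (bucket, key).
type window struct {
	bucket time.Duration
	counts map[string]int
}

func (w *window) incr(key string, amount int, now time.Time) int {
	k := fmt.Sprintf("%d:%s", now.Truncate(w.bucket).Unix(), key)
	w.counts[k] += amount
	return w.counts[k]
}

// allowed applies the per-second check first, then the per-minute check.
func allowed(rule Rule, sec, min *window, key string, amount int, now time.Time) bool {
	if sec.incr(key, amount, now) > rule.MaxPerSecond {
		return false
	}
	if min.incr(key, amount, now) > rule.MaxPerMinute {
		return false
	}
	return true
}

func main() {
	rule := Rule{MaxPerSecond: 15, MaxPerMinute: 15 * 60}
	sec := &window{bucket: time.Second, counts: map[string]int{}}
	min := &window{bucket: time.Minute, counts: map[string]int{}}
	now := time.Now()
	fmt.Println(allowed(rule, sec, min, "1:0xabc", 10, now)) // true
	fmt.Println(allowed(rule, sec, min, "1:0xabc", 10, now)) // false: 20 > 15 in this second
}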