feat(queries): ratelimits by elee1766 · Pull Request #4359 · wormhole-foundation/wormhole · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat(queries): ratelimits #4359

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 12 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions node/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ require (
github.com/cosmos/cosmos-sdk v0.45.11
github.com/go-kit/kit v0.12.0
github.com/golang/snappy v0.0.4
github.com/google/btree v1.1.2
github.com/google/uuid v1.6.0
github.com/grafana/dskit v0.0.0-20230201083518-528d8a7d52f2
github.com/grafana/loki v1.6.2-0.20230721141808-0d81144cfee8
Expand All @@ -61,6 +62,7 @@ require (
github.com/wormhole-foundation/wormchain v0.0.0-00010101000000-000000000000
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20220926172624-4b38dc650bb0
golang.org/x/exp v0.0.0-20241009180824-f66d83c29e7c
golang.org/x/sync v0.8.0
google.golang.org/genproto/googleapis/api v0.0.0-20230726155614-23370e0ffb3e
gopkg.in/godo.v2 v2.0.9
nhooyr.io/websocket v1.8.7
Expand Down Expand Up @@ -163,7 +165,6 @@ require (
github.com/golang/glog v1.1.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/flatbuffers v1.12.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/gofuzz v1.2.0 // indirect
Expand Down Expand Up @@ -356,7 +357,6 @@ require (
golang.org/x/mod v0.21.0 // indirect
golang.org/x/net v0.30.0 // indirect
golang.org/x/oauth2 v0.23.0 // indirect
golang.org/x/sync v0.8.0 // indirect
golang.org/x/term v0.25.0 // indirect
golang.org/x/text v0.19.0 // indirect
golang.org/x/tools v0.26.0 // indirect
Expand Down
55 changes: 54 additions & 1 deletion node/pkg/node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"net/http"
"strings"
"time"

"github.com/certusone/wormhole/node/pkg/accountant"
Expand All @@ -17,12 +18,14 @@ import (
"github.com/certusone/wormhole/node/pkg/processor"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/query"
"github.com/certusone/wormhole/node/pkg/query/queryratelimit"
"github.com/certusone/wormhole/node/pkg/readiness"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/watchers"
"github.com/certusone/wormhole/node/pkg/watchers/ibc"
"github.com/certusone/wormhole/node/pkg/watchers/interfaces"
"github.com/certusone/wormhole/node/pkg/wormconn"
ethCommon "github.com/ethereum/go-ethereum/common"
"github.com/gorilla/mux"
libp2p_crypto "github.com/libp2p/go-libp2p/core/crypto"
"github.com/prometheus/client_golang/prometheus/promhttp"
Expand Down Expand Up @@ -139,10 +142,60 @@ func GuardianOptionQueryHandler(ccqEnabled bool, allowedRequesters string) *Guar
return nil
}

enforcer := queryratelimit.NewEnforcer()
if allowedRequesters == "" {
return fmt.Errorf("if cross chain query is enabled `--ccqAllowedRequesters` must be specified")
}
var nullAddr ethCommon.Address
result := make(map[ethCommon.Address]struct{})
for _, str := range strings.Split(allowedRequesters, ",") {
addr := ethCommon.BytesToAddress(ethCommon.Hex2Bytes(strings.TrimPrefix(str, "0x")))
if addr == nullAddr {
return fmt.Errorf("invalid value in `--ccqAllowedRequesters`: `%s`", str)
}
result[addr] = struct{}{}
}
if len(result) <= 0 {
return fmt.Errorf("no allowed requestors specified, ccqAllowedRequesters: `%s`", allowedRequesters)
}

policyProvider, err := queryratelimit.NewPolicyProvider(
queryratelimit.WithPolicyProviderLogger(logger),
queryratelimit.WithPolicyProviderCacheDuration(24*time.Hour),
queryratelimit.WithPolicyProviderFetcher(func(ctx context.Context, key ethCommon.Address) (*queryratelimit.Policy, error) {
_, ok := result[key]
if !ok {
return &queryratelimit.Policy{}, nil
}
return &queryratelimit.Policy{
Limits: queryratelimit.Limits{
Types: map[uint8]queryratelimit.Rule{
uint8(query.EthCallQueryRequestType): {
MaxPerMinute: 15 * 60,
MaxPerSecond: 15,
},
uint8(query.SolanaAccountQueryRequestType): {
MaxPerMinute: 15 * 60,
MaxPerSecond: 15,
},
uint8(query.SolanaPdaQueryRequestType): {
MaxPerMinute: 15 * 60,
MaxPerSecond: 15,
},
},
},
}, nil
}),
)
if err != nil {
return fmt.Errorf("failed to create rate limit policy provider: %w", err)
}

g.queryHandler = query.NewQueryHandler(
logger,
g.env,
allowedRequesters,
enforcer,
policyProvider,
g.signedQueryReqC.readC,
g.chainQueryReqC,
g.queryResponseC.readC,
Expand Down
126 changes: 126 additions & 0 deletions node/pkg/query/narasu/memorystore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
package narasu

import (
"context"
"sync"
"time"

"github.com/google/btree"
)

// MemoryStore is an in-memory counter store. Counts are grouped by
// interval-aligned timestamp and, within each timestamp, by bucket name.
// The type performs no locking of its own.
type MemoryStore struct {
// tree holds one memoryStoreItem per timestamp, ordered by ascending ts.
tree *btree.BTreeG[*memoryStoreItem]
// mapPool recycles memoryStoreItem values removed by Cleanup so that their
// maps can be reused by later inserts.
mapPool sync.Pool
// interval is the duration timestamps are truncated to before being used
// as tree keys.
interval time.Duration
}

// memoryStoreItem is a single tree entry: every bucket counter recorded for
// one interval-aligned timestamp.
type memoryStoreItem struct {
	// ts is the timestamp in Unix seconds; it is the tree's sort key.
	ts int
	// m maps a bucket name to the count accumulated for ts.
	m map[string]int
}

// Clear removes every bucket counter while keeping the map itself, so the
// item can be reused without reallocating. The timestamp is left untouched.
func (m *memoryStoreItem) Clear() {
	clear(m.m)
}

// NewMemoryStore returns an empty MemoryStore whose timestamps are truncated
// to interval. The backing B-tree has degree 8 and orders items by timestamp;
// the item pool allocates a fresh item with an empty map when it has nothing
// to recycle.
func NewMemoryStore(interval time.Duration) *MemoryStore {
	// Items are ordered solely by their timestamp.
	byTimestamp := func(a, b *memoryStoreItem) bool {
		return a.ts < b.ts
	}
	// Factory used by the pool when no recycled item is available.
	newItem := func() any {
		return &memoryStoreItem{m: make(map[string]int)}
	}

	store := new(MemoryStore)
	store.tree = btree.NewG[*memoryStoreItem](8, byTimestamp)
	store.interval = interval
	store.mapPool.New = newItem
	return store
}

// Close is a no-op; it always returns nil.
func (*MemoryStore) Close() error {
	return nil
}

// getMap returns an empty memoryStoreItem whose timestamp is key, reusing a
// pooled item when one is available.
//
// The returned item always has ts == key and an empty, non-nil map. This also
// holds when the pool yields nothing (sync.Pool.Get returns nil if the pool is
// empty and has no New function), in which case a fresh item is allocated.
func (s *MemoryStore) getMap(key int) *memoryStoreItem {
	item, ok := s.mapPool.Get().(*memoryStoreItem)
	if !ok || item == nil {
		// Nothing usable came out of the pool: build the item from scratch,
		// making sure the timestamp is set just like on the recycled path.
		return &memoryStoreItem{
			ts: key,
			m:  make(map[string]int),
		}
	}
	item.ts = key
	// A recycled item still carries the counters of its previous timestamp.
	item.Clear()
	return item
}

// putMap hands item back to the pool for later reuse by getMap. The item is
// not cleared here; getMap clears recycled items when it takes them out.
func (s *MemoryStore) putMap(item *memoryStoreItem) {
	s.mapPool.Put(item)
}

// time converts cur into the tree key for its slot: cur truncated to the
// store's interval, expressed in Unix seconds.
//
// NOTE(review): keys have one-second resolution, so an interval shorter than
// a second would presumably map several slots to the same key — confirm that
// callers only use whole-second intervals.
func (s *MemoryStore) time(cur time.Time) int {
	aligned := cur.Truncate(s.interval)
	return int(aligned.Unix())
}

// IncrKey adds amount to the counter of bucket in the time slot containing
// cur and returns the counter's new value. The slot is created on first use.
//
// ctx is accepted to satisfy the Store interface and is not consulted. The
// returned error is always nil.
func (s *MemoryStore) IncrKey(ctx context.Context, bucket string, amount int, cur time.Time) (int, error) {
	ts := s.time(cur)
	item, found := s.tree.Get(&memoryStoreItem{ts: ts})
	if !found {
		item = s.getMap(ts)
		s.tree.ReplaceOrInsert(item)
	}
	// A missing bucket reads as zero, so first use and increment share one path.
	total := item.m[bucket] + amount
	item.m[bucket] = total
	return total, nil
}

// GetKeys returns the counts recorded for bucket in every time slot within
// the half-open range [from, to), in ascending slot order. Both bounds are
// first truncated to the store's interval. Slots in which bucket was never
// incremented contribute nothing.
//
// The slot whose timestamp equals the truncated `to` is NOT included, because
// the tree's AscendRange stops before its upper pivot.
//
// ctx is accepted to satisfy the Store interface and is not consulted. The
// result is always a non-nil slice and the returned error is always nil.
func (s *MemoryStore) GetKeys(ctx context.Context, bucket string, from time.Time, to time.Time) ([]int, error) {
	counts := make([]int, 0)
	lower := &memoryStoreItem{ts: s.time(from)}
	upper := &memoryStoreItem{ts: s.time(to)}
	// AscendRange never visits an item at or beyond upper, so no extra
	// upper-bound check is needed inside the callback.
	s.tree.AscendRange(lower, upper, func(item *memoryStoreItem) bool {
		if n, ok := item.m[bucket]; ok {
			counts = append(counts, n)
		}
		return true
	})
	return counts, nil
}

// Cleanup removes every time slot that is at least age old relative to now
// and returns the removed items to the pool for reuse.
//
// A slot is expired when now (in Unix seconds) minus the slot's timestamp is
// greater than or equal to age in whole seconds. ctx is accepted to satisfy
// the Store interface and is not consulted. The returned error is always nil.
func (s *MemoryStore) Cleanup(ctx context.Context, now time.Time, age time.Duration) error {
	nowSec := int(now.Unix())
	maxAgeSec := int(age.Seconds())

	// The tree is walked oldest-first, so the walk can stop at the first slot
	// that is still young enough. Timestamps are collected here and deleted
	// afterwards so the tree is not mutated while it is being iterated.
	var stale []int
	s.tree.Ascend(func(item *memoryStoreItem) bool {
		if nowSec-item.ts < maxAgeSec {
			return false
		}
		stale = append(stale, item.ts)
		return true
	})

	for _, ts := range stale {
		if removed, ok := s.tree.Delete(&memoryStoreItem{ts: ts}); ok {
			s.putMap(removed)
		}
	}
	return nil
}
95 changes: 95 additions & 0 deletions node/pkg/query/narasu/memorystore_test.go
F438
Original file line number Diff line number Diff line change
@@ -0,0 +1,95 @@
package narasu_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/certusone/wormhole/node/pkg/query/narasu"
"github.com/stretchr/testify/require"
)

// bucketNames holds the bucket identifiers "test-0" through "test-31" that
// the benchmarks in this file draw from.
var bucketNames []string

// init fills bucketNames once at package load.
func init() {
	const count = 32
	names := make([]string, count)
	for i := range names {
		names[i] = fmt.Sprintf("test-%d", i)
	}
	bucketNames = append(bucketNames, names...)
}

// BenchmarkMemoryStore measures two MemoryStore usage patterns with a
// one-second interval:
//
//   - "incr+clean": one increment followed by one cleanup with a 60-second
//     age, advancing the clock by one second per iteration.
//   - "incr+getrange": a range read over a store pre-filled with 60
//     consecutive one-second slots.
//
// Return values are discarded; the MemoryStore methods called here always
// return a nil error.
func BenchmarkMemoryStore(b *testing.B) {
	const bucket = "test"
	var (
		ctx   = context.Background()
		start = time.Unix(1000, 0)
		end   = start.Add(59 * time.Second)
	)

	b.Run("incr+clean", func(b *testing.B) {
		store := narasu.NewMemoryStore(time.Second)
		b.ResetTimer()
		for i := 0; i < b.N; i++ {
			now := start.Add(time.Duration(i) * time.Second)
			_, _ = store.IncrKey(ctx, bucketNames[0], 1, now)
			_ = store.Cleanup(ctx, now, 60*time.Second)
		}
	})

	b.Run("incr+getrange", func(b *testing.B) {
		store := narasu.NewMemoryStore(time.Second)
		// Pre-fill one slot per second; this setup is excluded from timing.
		for sec := 0; sec < 60; sec++ {
			_, _ = store.IncrKey(ctx, bucket, 1, start.Add(time.Duration(sec)*time.Second))
		}
		b.ResetTimer()
		for n := 0; n < b.N; n++ {
			_, _ = store.GetKeys(ctx, bucket, start, end)
		}
	})
}

// TestMemoryStore exercises IncrKey, GetKeys and Cleanup on a store with a
// one-minute interval: increments that land in the same minute accumulate,
// increments in different minutes are counted separately, and Cleanup drops
// the slots that are at least eight intervals old at the chosen end time.
func TestMemoryStore(t *testing.T) {
	const bucket = "test"

	ctx := context.Background()
	interval := time.Minute
	store := narasu.NewMemoryStore(interval)
	start := time.Unix(0, 0)
	end := start.Add(14 * interval)

	// Each step increments bucket by one at start+offset and states the
	// count expected back for that slot.
	increments := []struct {
		offset time.Duration
		want   int
	}{
		{offset: 0, want: 1},
		{offset: interval, want: 1},
		// Half an interval in: same slot as the first increment.
		{offset: time.Duration(float64(interval) * 0.5), want: 2},
		{offset: 2 * interval, want: 1},
		{offset: 10 * interval, want: 1},
		{offset: 12 * interval, want: 1},
	}
	for _, step := range increments {
		got, err := store.IncrKey(ctx, bucket, 1, start.Add(step.offset))
		require.NoError(t, err)
		require.Equal(t, step.want, got)
	}

	counts, err := store.GetKeys(ctx, bucket, start, end)
	require.NoError(t, err)
	require.ElementsMatch(t, []int{1, 2, 1, 1, 1}, counts)

	// Slots at 0, 1 and 2 intervals are at least 8 intervals old at end and
	// are removed; those at 10 and 12 intervals survive.
	require.NoError(t, store.Cleanup(ctx, end, 8*interval))

	counts, err = store.GetKeys(ctx, bucket, start, end)
	require.NoError(t, err)
	require.ElementsMatch(t, []int{1, 1}, counts)
}
12 changes: 12 additions & 0 deletions node/pkg/query/narasu/store.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package narasu

import (
"context"
"time"
)

// Store is the storage backend for time-bucketed counters. MemoryStore in
// this package is an in-memory implementation.
type Store interface {
// IncrKey adds amount to the counter of bucket for the time slot that
// contains cur and returns the counter's updated value.
IncrKey(ctx context.Context, bucket string, amount int, cur time.Time) (int, error)
// GetKeys returns the counts recorded for bucket in the time slots between
// from and to.
// NOTE(review): MemoryStore leaves out the slot at `to` itself — confirm
// whether every implementation is expected to behave the same way.
GetKeys(ctx context.Context, bucket string, from time.Time, to time.Time) ([]int, error)
// Cleanup discards time slots that are at least age old relative to now.
Cleanup(ctx context.Context, now time.Time, age time.Duration) error
}
Loading
0