feat: allow passing the topic selector store to the transport by dunglas · Pull Request #971 · dunglas/mercure · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: allow passing the topic selector store to the transport #971

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Nov 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions bolt.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (t *BoltTransport) persist(updateID string, updateJSON []byte) error {
}

// AddSubscriber adds a new subscriber to the transport.
func (t *BoltTransport) AddSubscriber(s *Subscriber) error {
func (t *BoltTransport) AddSubscriber(s *LocalSubscriber) error {
select {
case <-t.closed:
return ErrClosedTransport
Expand All @@ -226,7 +226,7 @@ func (t *BoltTransport) AddSubscriber(s *Subscriber) error {
}

// RemoveSubscriber removes a subscriber from the transport.
func (t *BoltTransport) RemoveSubscriber(s *Subscriber) error {
func (t *BoltTransport) RemoveSubscriber(s *LocalSubscriber) error {
select {
case <-t.closed:
return ErrClosedTransport
Expand All @@ -249,7 +249,7 @@ func (t *BoltTransport) GetSubscribers() (string, []*Subscriber, error) {
}

//nolint:gocognit
func (t *BoltTransport) dispatchHistory(s *Subscriber, toSeq uint64) error {
func (t *BoltTransport) dispatchHistory(s *LocalSubscriber, toSeq uint64) error {
err := t.db.View(func(tx *bolt.Tx) error {
b := tx.Bucket([]byte(t.bucketName))
if b == nil {
Expand Down Expand Up @@ -311,7 +311,7 @@ func (t *BoltTransport) Close() (err error) {
t.Lock()
defer t.Unlock()

t.subscribers.Walk(0, func(s *Subscriber) bool {
t.subscribers.Walk(0, func(s *LocalSubscriber) bool {
s.Disconnect()

return true
Expand Down
28 changes: 14 additions & 14 deletions bolt_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func TestBoltTransportHistory(t *testing.T) {
})
}

s := NewSubscriber("8", transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber("8", transport.logger, &TopicSelectorStore{})
s.SetTopics(topics, nil)

require.NoError(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -68,7 +68,7 @@ func TestBoltTransportLogsBogusLastEventID(t *testing.T) {
Topics: topics,
})

s := NewSubscriber("711131", logger, &TopicSelectorStore{})
s := NewLocalSubscriber("711131", logger, &TopicSelectorStore{})
s.SetTopics(topics, nil)

require.NoError(t, transport.AddSubscriber(s))
Expand All @@ -87,7 +87,7 @@ func TestBoltTopicSelectorHistory(t *testing.T) {
transport.Dispatch(&Update{Topics: []string{"http://example.com/subscribed-public-only"}, Private: true, Event: Event{ID: "3"}})
transport.Dispatch(&Update{Topics: []string{"http://example.com/subscribed-public-only"}, Event: Event{ID: "4"}})

s := NewSubscriber(EarliestLastEventID, transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber(EarliestLastEventID, transport.logger, &TopicSelectorStore{})
s.SetTopics([]string{"http://example.com/subscribed", "http://example.com/subscribed-public-only"}, []string{"http://example.com/subscribed"})

require.NoError(t, transport.AddSubscriber(s))
Expand All @@ -109,7 +109,7 @@ func TestBoltTransportRetrieveAllHistory(t *testing.T) {
})
}

s := NewSubscriber(EarliestLastEventID, transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber(EarliestLastEventID, transport.logger, &TopicSelectorStore{})
s.SetTopics(topics, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand Down Expand Up @@ -139,7 +139,7 @@ func TestBoltTransportHistoryAndLive(t *testing.T) {
})
}

s := NewSubscriber("8", transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber("8", transport.logger, &TopicSelectorStore{})
s.SetTopics(topics, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand Down Expand Up @@ -221,7 +221,7 @@ func TestBoltTransportDoNotDispatchUntilListen(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
require.NoError(t, transport.AddSubscriber(s))

var wg sync.WaitGroup
Expand All @@ -245,7 +245,7 @@ func TestBoltTransportDispatch(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
s.SetTopics([]string{"https://example.com/foo", "https://example.com/private"}, []string{"https://example.com/private"})

require.NoError(t, transport.AddSubscriber(s))
Expand Down Expand Up @@ -274,7 +274,7 @@ func TestBoltTransportClosed(t *testing.T) {
defer os.Remove("test.db")
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
s.SetTopics([]string{"https://example.com/foo"}, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand All @@ -293,11 +293,11 @@ func TestBoltCleanDisconnectedSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

s1 := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s1 := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
s1.SetTopics([]string{"foo"}, []string{})
require.NoError(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s2 := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
s2.SetTopics([]string{"foo"}, []string{})
require.NoError(t, transport.AddSubscriber(s2))

Expand All @@ -318,19 +318,19 @@ func TestBoltGetSubscribers(t *testing.T) {
defer transport.Close()
defer os.Remove("test.db")

s1 := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s1 := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
require.NoError(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", transport.logger, &TopicSelectorStore{})
s2 := NewLocalSubscriber("", transport.logger, &TopicSelectorStore{})
require.NoError(t, transport.AddSubscriber(s2))

lastEventID, subscribers, err := transport.GetSubscribers()
require.NoError(t, err)

assert.Equal(t, EarliestLastEventID, lastEventID)
assert.Len(t, subscribers, 2)
assert.Contains(t, subscribers, s1)
assert.Contains(t, subscribers, s2)
assert.Contains(t, subscribers, &s1.Subscriber)
assert.Contains(t, subscribers, &s2.Subscriber)
}

func TestBoltLastEventID(t *testing.T) {
Expand Down
13 changes: 8 additions & 5 deletions hub.go
Original file line number Diff line number Diff line change
Expand Up @@ -324,11 +324,6 @@ func NewHub(options ...Option) (*Hub, error) {
opt.logger = l
}

if opt.transport == nil {
t, _ := DeprecatedNewLocalTransport(nil, nil)
opt.transport = t
}

if opt.topicSelectorStore == nil {
tss, err := NewTopicSelectorStoreLRU(DefaultTopicSelectorStoreLRUMaxEntriesPerShard, DefaultTopicSelectorStoreLRUShardCount)
if err != nil {
Expand All @@ -338,6 +333,14 @@ func NewHub(options ...Option) (*Hub, error) {
opt.topicSelectorStore = tss
}

if opt.transport == nil {
opt.transport = NewLocalTransport()
}

if ttss, ok := opt.transport.(TransportTopicSelectorStore); ok {
ttss.SetTopicSelectorStore(opt.topicSelectorStore)
}

if opt.metrics == nil {
opt.metrics = NopMetrics{}
}
Expand Down
6 changes: 3 additions & 3 deletions local.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (t *LocalTransport) Dispatch(update *Update) error {
}

// AddSubscriber adds a new subscriber to the transport.
func (t *LocalTransport) AddSubscriber(s *Subscriber) error {
func (t *LocalTransport) AddSubscriber(s *LocalSubscriber) error {
select {
case <-t.closed:
return ErrClosedTransport
Expand All @@ -74,7 +74,7 @@ func (t *LocalTransport) AddSubscriber(s *Subscriber) error {
}

// RemoveSubscriber removes a subscriber from the transport.
func (t *LocalTransport) RemoveSubscriber(s *Subscriber) error {
func (t *LocalTransport) RemoveSubscriber(s *LocalSubscriber) error {
select {
case <-t.closed:
return ErrClosedTransport
Expand Down Expand Up @@ -102,7 +102,7 @@ func (t *LocalTransport) Close() (err error) {
t.Lock()
defer t.Unlock()
close(t.closed)
t.subscribers.Walk(0, func(s *Subscriber) bool {
t.subscribers.Walk(0, func(s *LocalSubscriber) bool {
s.Disconnect()

return true
Expand Down
2 changes: 1 addition & 1 deletion local_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func subBenchLocalTransport(b *testing.B, topics, concurrency, matchPct int, tes
out := make(chan *Update, 50000)
tss := &TopicSelectorStore{}
for i := 0; i < concurrency; i++ {
s := NewSubscriber("", zap.NewNop(), tss)
s := NewLocalSubscriber("", zap.NewNop(), tss)
if i%100 < matchPct {
s.SetTopics(tsMatch, nil)
} else {
Expand Down
28 changes: 14 additions & 14 deletions local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestLocalTransportDoNotDispatchUntilListen(t *testing.T) {
err := transport.Dispatch(u)
require.NoError(t, err)

s := NewSubscriber("", logger, &TopicSelectorStore{})
s := NewLocalSubscriber("", logger, &TopicSelectorStore{})
s.SetTopics(u.Topics, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand All @@ -43,7 +43,7 @@ func TestLocalTransportDispatch(t *testing.T) {
defer transport.Close()
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", logger, &TopicSelectorStore{})
s := NewLocalSubscriber("", logger, &TopicSelectorStore{})
s.SetTopics([]string{"http://example.com/foo"}, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand All @@ -60,10 +60,10 @@ func TestLocalTransportClosed(t *testing.T) {

tss := &TopicSelectorStore{}

s := NewSubscriber("", logger, tss)
s := NewLocalSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s))
require.NoError(t, transport.Close())
assert.Equal(t, transport.AddSubscriber(NewSubscriber("", logger, tss)), ErrClosedTransport)
assert.Equal(t, transport.AddSubscriber(NewLocalSubscriber("", logger, tss)), ErrClosedTransport)
assert.Equal(t, transport.Dispatch(&Update{}), ErrClosedTransport)

_, ok := <-s.out
Expand All @@ -78,10 +78,10 @@ func TestLiveCleanDisconnectedSubscribers(t *testing.T) {

tss := &TopicSelectorStore{}

s1 := NewSubscriber("", logger, tss)
s1 := NewLocalSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", logger, tss)
s2 := NewLocalSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s2))

assert.Equal(t, 2, transport.subscribers.Len())
Expand All @@ -101,7 +101,7 @@ func TestLiveReading(t *testing.T) {
defer transport.Close()
assert.Implements(t, (*Transport)(nil), transport)

s := NewSubscriber("", logger, &TopicSelectorStore{})
s := NewLocalSubscriber("", logger, &TopicSelectorStore{})
s.SetTopics([]string{"https://example.com"}, nil)
require.NoError(t, transport.AddSubscriber(s))

Expand All @@ -113,23 +113,23 @@ func TestLiveReading(t *testing.T) {
}

func TestLocalTransportGetSubscribers(t *testing.T) {
logger := zap.NewNop()
transport, _ := DeprecatedNewLocalTransport(&url.URL{Scheme: "local"}, logger)
transport := NewLocalTransport()
defer transport.Close()
require.NotNil(t, transport)

logger := zap.NewNop()
tss := &TopicSelectorStore{}

s1 := NewSubscriber("", logger, tss)
s1 := NewLocalSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s1))

s2 := NewSubscriber("", logger, tss)
s2 := NewLocalSubscriber("", logger, tss)
require.NoError(t, transport.AddSubscriber(s2))

lastEventID, subscribers, err := transport.(TransportSubscribers).GetSubscribers()
lastEventID, subscribers, err := transport.GetSubscribers()
require.NoError(t, err)
assert.Equal(t, EarliestLastEventID, lastEventID)
assert.Len(t, subscribers, 2)
assert.Contains(t, subscribers, s1)
assert.Contains(t, subscribers, s2)
assert.Contains(t, subscribers, &s1.Subscriber)
assert.Contains(t, subscribers, &s2.Subscriber)
}
Loading
Loading
0