8000 Configure server-to-client ping/pong intervals by FZambia · Pull Request #551 · centrifugal/centrifugo · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Configure server-to-client ping/pong intervals #551

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Aug 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions internal/unihttpstream/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ type Config struct {
ProtocolVersion centrifuge.ProtocolVersion
// MaxRequestBodySize limits request body size.
MaxRequestBodySize int

centrifuge.PingPongConfig
}
2 changes: 1 addition & 1 deletion internal/unihttpstream/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

transport := newStreamTransport(r, protoVersion)
transport := newStreamTransport(r, protoVersion, h.config.PingPongConfig)
c, closeFn, err := centrifuge.NewClient(r.Context(), h.node, transport)
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error create client", map[string]interface{}{"error": err.Error(), "transport": "uni_http_stream"}))
Expand Down
31 changes: 16 additions & 15 deletions internal/unihttpstream/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,29 @@ package unihttpstream
import (
"net/http"
"sync"
"time"

"github.com/centrifugal/centrifuge"
)

type streamTransport struct {
mu sync.Mutex
req *http.Request
messages chan []byte
disconnectCh chan *centrifuge.Disconnect
closedCh chan struct{}
closed bool
protoVersion centrifuge.ProtocolVersion
mu sync.Mutex
req *http.Request
messages chan []byte
disconnectCh chan *centrifuge.Disconnect
closedCh chan struct{}
closed bool
protoVersion centrifuge.ProtocolVersion
pingPongConfig centrifuge.PingPongConfig
}

func newStreamTransport(req *http.Request, protoVersion centrifuge.ProtocolVersion) *streamTransport {
func newStreamTransport(req *http.Request, protoVersion centrifuge.ProtocolVersion, pingPongConfig centrifuge.PingPongConfig) *streamTransport {
return &streamTransport{
messages: make(chan []byte),
disconnectCh: make(chan *centrifuge.Disconnect),
closedCh: make(chan struct{}),
req: req,
protoVersion: protoVersion,
messages: make(chan []byte),
disconnectCh: make(chan *centrifuge.Disconnect),
closedCh: make(chan struct{}),
req: req,
protoVersion: protoVersion,
pingPongConfig: pingPongConfig,
}
}

Expand Down Expand Up @@ -56,7 +57,7 @@ func (t *streamTransport) DisabledPushFlags() uint64 {
// AppLevelPing ...
func (t *streamTransport) AppLevelPing() centrifuge.AppLevelPing {
return centrifuge.AppLevelPing{
PingInterval: 25 * time.Second,
PingInterval: t.pingPongConfig.PingInterval,
}
}

Expand Down
2 changes: 2 additions & 0 deletions internal/unisse/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ type Config struct {
ProtocolVersion centrifuge.ProtocolVersion
// MaxRequestBodySize for POST requests when used.
MaxRequestBodySize int

centrifuge.PingPongConfig
}
2 changes: 1 addition & 1 deletion internal/unisse/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func (h Handler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
}

transport := newEventsourceTransport(r, protoVersion)
transport := newEventsourceTransport(r, protoVersion, h.config.PingPongConfig)
c, closeFn, err := centrifuge.NewClient(r.Context(), h.node, transport)
if err != nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error create client", map[string]interface{}{"error": err.Error(), "transport": "uni_sse"}))
Expand Down
31 changes: 16 additions & 15 deletions internal/unisse/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,28 +3,29 @@ package unisse
import (
"net/http"
"sync"
"time"

"github.com/centrifugal/centrifuge"
)

type eventsourceTransport struct {
mu sync.Mutex
req *http.Request
messages chan []byte
disconnectCh chan *centrifuge.Disconnect
closedCh chan struct{}
closed bool
protoVersion centrifuge.ProtocolVersion
mu sync.Mutex
req *http.Request
messages chan []byte
disconnectCh chan *centrifuge.Disconnect
closedCh chan struct{}
closed bool
protoVersion centrifuge.ProtocolVersion
pingPongConfig centrifuge.PingPongConfig
}

func newEventsourceTransport(req *http.Request, protoVersion centrifuge.ProtocolVersion) *eventsourceTransport {
func newEventsourceTransport(req *http.Request, protoVersion centrifuge.ProtocolVersion, pingPongConfig centrifuge.PingPongConfig) *eventsourceTransport {
return &eventsourceTransport{
messages: make(chan []byte),
disconnectCh: make(chan *centrifuge.Disconnect),
closedCh: make(chan struct{}),
req: req,
protoVersion: protoVersion,
messages: make(chan []byte),
disconnectCh: make(chan *centrifuge.Disconnect),
closedCh: make(chan struct{}),
req: req,
protoVersion: protoVersion,
pingPongConfig: pingPongConfig,
}
}

Expand Down Expand Up @@ -56,7 +57,7 @@ func (t *eventsourceTransport) DisabledPushFlags() uint64 {
// AppLevelPing ...
func (t *eventsourceTransport) AppLevelPing() centrifuge.AppLevelPing {
return centrifuge.AppLevelPing{
PingInterval: 25 * time.Second,
PingInterval: t.pingPongConfig.PingInterval,
}
}

Expand Down
2 changes: 2 additions & 0 deletions internal/uniws/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ type Config struct {

// UseWriteBufferPool enables using buffer pool for writes.
UseWriteBufferPool bool

centrifuge.PingPongConfig
}

func sameHostOriginCheck() func(r *http.Request) bool {
Expand Down
1 change: 1 addition & 0 deletions internal/uniws/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
writeTimeout: writeTimeout,
compressionMinSize: compressionMinSize,
protoVersion: protoVersion,
pingPongConfig: s.config.PingPongConfig,
}

graceCh := make(chan struct{})
Expand Down
3 changes: 2 additions & 1 deletion internal/uniws/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type websocketTransportOptions struct {
writeTimeout time.Duration
compressionMinSize int
protoVersion centrifuge.ProtocolVersion
pingPongConfig centrifuge.PingPongConfig
}

func newWebsocketTransport(conn *websocket.Conn, opts websocketTransportOptions, graceCh chan struct{}) *websocketTransport {
Expand Down Expand Up @@ -103,7 +104,7 @@ func (t *websocketTransport) DisabledPushFlags() uint64 {
// AppLevelPing ...
func (t *websocketTransport) AppLevelPing() centrifuge.AppLevelPing {
return centrifuge.AppLevelPing{
PingInterval: 25 * time.Second,
PingInterval: t.opts.pingPongConfig.PingInterval,
}
}

Expand Down
16 changes: 4 additions & 12 deletions internal/wt/config.go
Original file line number Diff line number Diff line change
@@ -1,18 +1,10 @@
package wt

import "time"
import (
"github.com/centrifugal/centrifuge"
)

// Config for Handler.
type Config struct {
// AppLevelPingInterval tells how often to issue application-level server-to-client pings.
// AppLevelPingInterval is only used for clients with ProtocolVersion2.
// AppLevelPingInterval is EXPERIMENTAL and is a subject to change.
// For zero value 25 secs will be used. To disable sending app-level pings use -1.
AppLevelPingInterval time.Duration
// AppLevelPongTimeout sets time for application-level pong check after issuing
// ping. AppLevelPongTimeout must be less than AppLevelPingInterval.
// AppLevelPongTimeout is only used for clients with ProtocolVersion2.
// AppLevelPongTimeout is EXPERIMENTAL and is a subject to change.
// For zero value AppLevelPingInterval / 3 will be used. To disable pong checks use -1.
AppLevelPongTimeout time.Duration
centrifuge.PingPongConfig
}
2 changes: 1 addition & 1 deletion internal/wt/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (s *Handler) ServeHTTP(rw http.ResponseWriter, r *http.Request) {
protoType = centrifuge.ProtocolTypeProtobuf
}

transport := newWebtransportTransport(protoType, conn, stream, s.config.AppLevelPingInterval, s.config.AppLevelPongTimeout)
transport := newWebtransportTransport(protoType, conn, stream, s.config.PingPongConfig)
c, closeFn, err := centrifuge.NewClient(r.Context(), s.node, transport)
if err != nil {
s.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error creating client", map[string]interface{}{"transport": transportName}))
Expand Down
42 changes: 15 additions & 27 deletions internal/wt/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,34 +12,22 @@ import (
const transportName = "webtransport"

type webtransportTransport struct {
mu sync.RWMutex
closeCh chan struct{}
protoType centrifuge.ProtocolType
session *webtransport.Session
stream webtransport.Stream
pingInterval time.Duration
pongTimeout time.Duration
closed bool
mu sync.RWMutex
closeCh chan struct{}
protoType centrifuge.ProtocolType
session *webtransport.Session
stream webtransport.Stream
pingPongConfig centrifuge.PingPongConfig
closed bool
}

func newWebtransportTransport(protoType centrifuge.ProtocolType, session *webtransport.Session, stream webtransport.Stream, pingInterval, pongTimeout time.Duration) *webtransportTransport {
if pingInterval == 0 {
pingInterval = 25 * time.Second
} else if pingInterval == -1 {
pingInterval = 0
}
if pongTimeout == 0 {
pongTimeout = pingInterval / 3
} else if pongTimeout == -1 {
pongTimeout = 0
}
func newWebtransportTransport(protoType centrifuge.ProtocolType, session *webtransport.Session, stream webtransport.Stream, pingPongConfig centrifuge.PingPongConfig) *webtransportTransport {
return &webtransportTransport{
protoType: protoType,
closeCh: make(chan struct{}),
session: session,
stream: stream,
pingInterval: pingInterval,
pongTimeout: pongTimeout,
protoType: protoType,
closeCh: make(chan struct{}),
session: session,
stream: stream,
pingPongConfig: pingPongConfig,
}
}

Expand Down Expand Up @@ -76,8 +64,8 @@ func (t *webtransportTransport) Emulation() bool {
// AppLevelPing ...
func (t *webtransportTransport) AppLevelPing() centrifuge.AppLevelPing {
return centrifuge.AppLevelPing{
PingInterval: t.pingInterval,
PongTimeout: t.pongTimeout,
PingInterval: t.pingPongConfig.PingInterval,
PongTimeout: t.pingPongConfig.PongTimeout,
}
}

Expand Down
26 changes: 25 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,9 @@ func bindCentrifugoConfig() {
// This option allows smooth migration to Centrifugo v4,
// should be removed at some point in the future.
"use_client_protocol_v1_by_default": false,

"ping_interval": 25 * time.Second,
"pong_timeout": 8 * time.Second,
}

for k, v := range defaults {
Expand Down Expand Up @@ -1850,6 +1853,18 @@ func rpcNamespacesFromConfig(v *viper.Viper) []rule.RpcNamespace {
return ns
}

func getPingPongConfig() centrifuge.PingPongConfig {
pingInterval := GetDuration("ping_interval")
pongTimeout := GetDuration("pong_timeout")
if pingInterval <= pongTimeout {
log.Fatal().Msgf("ping_interval (%s) must be greater than pong_timeout (%s)", pingInterval, pongTimeout)
}
return centrifuge.PingPongConfig{
PingInterval: pingInterval,
PongTimeout: pongTimeout,
}
}

func websocketHandlerConfig() centrifuge.WebsocketConfig {
v := viper.GetViper()
cfg := centrifuge.WebsocketConfig{}
Expand All @@ -1867,18 +1882,21 @@ func websocketHandlerConfig() centrifuge.WebsocketConfig {
cfg.WriteTimeout = GetDuration("websocket_write_timeout")
cfg.MessageSizeLimit = v.GetInt("websocket_message_size_limit")
cfg.CheckOrigin = getCheckOrigin()
cfg.PingPongConfig = getPingPongConfig()
return cfg
}

func httpStreamHandlerConfig() centrifuge.HTTPStreamConfig {
return centrifuge.HTTPStreamConfig{
MaxRequestBodySize: viper.GetInt("http_stream_max_request_body_size"),
PingPongConfig: getPingPongConfig(),
}
}

func sseHandlerConfig() centrifuge.SSEConfig {
return centrifuge.SSEConfig{
MaxRequestBodySize: viper.GetInt("sse_max_request_body_size"),
PingPongConfig: getPingPongConfig(),
}
}

Expand Down Expand Up @@ -1945,6 +1963,7 @@ func uniWebsocketHandlerConfig() uniws.Config {
WriteTimeout: GetDuration("uni_websocket_write_timeout"),
MessageSizeLimit: v.GetInt("uni_websocket_message_size_limit"),
CheckOrigin: getCheckOrigin(),
PingPongConfig: getPingPongConfig(),
}
}

Expand All @@ -1956,6 +1975,7 @@ func uniSSEHandlerConfig() unisse.Config {
return unisse.Config{
ProtocolVersion: protocolVersion,
MaxRequestBodySize: viper.GetInt("uni_sse_max_request_body_size"),
PingPongConfig: getPingPongConfig(),
}
}

Expand All @@ -1967,6 +1987,7 @@ func uniStreamHandlerConfig() unihttpstream.Config {
return unihttpstream.Config{
ProtocolVersion: protocolVersion,
MaxRequestBodySize: viper.GetInt("uni_http_stream_max_request_body_size"),
PingPongConfig: getPingPongConfig(),
}
}

Expand Down Expand Up @@ -1995,11 +2016,14 @@ func sockjsHandlerConfig() centrifuge.SockjsConfig {
cfg.WebsocketWriteTimeout = GetDuration("websocket_write_timeout")
cfg.CheckOrigin = getCheckOrigin()
cfg.WebsocketCheckOrigin = getCheckOrigin()
cfg.PingPongConfig = getPingPongConfig()
return cfg
}

func webTransportHandlerConfig() wt.Config {
return wt.Config{}
return wt.Config{
PingPongConfig: getPingPongConfig(),
}
}

func adminHandlerConfig() admin.Config {
Expand Down
0