8000 Postgresql consumer: option to use try lock by FZambia · Pull Request #988 · centrifugal/centrifugo · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

Postgresql consumer: option to use try lock #988

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 8, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions internal/cli/configdoc/schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -6914,6 +6914,16 @@
"is_complex_type": false
}
]
},
{
"field": "consumers[].postgresql.use_try_lock",
"name": "use_try_lock",
"go_name": "UseTryLock",
"level": 3,
"type": "bool",
"default": "false",
"comment": "UseTryLock when enabled tells Centrifugo to use pg_try_advisory_xact_lock instead of pg_advisory_xact_lock.",
"is_complex_type": false
}
]
},
Expand Down
2 changes: 2 additions & 0 deletions internal/configtypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -723,6 +723,8 @@ type PostgresConsumerConfig struct {
PartitionPollInterval Duration `mapstructure:"partition_poll_interval" json:"partition_poll_interval" envconfig:"partition_poll_interval" default:"300ms" yaml:"partition_poll_interval" toml:"partition_poll_interval"`
PartitionNotificationChannel string `mapstructure:"partition_notification_channel" json:"partition_notification_channel" envconfig:"partition_notification_channel" yaml:"partition_notification_channel" toml:"partition_notification_channel"`
TLS TLSConfig `mapstructure:"tls" json:"tls" envconfig:"tls" yaml:"tls" toml:"tls"`
// UseTryLock when enabled tells Centrifugo to use pg_try_advisory_xact_lock instead of pg_advisory_xact_lock.
UseTryLock bool `mapstructure:"use_try_lock" json:"use_try_lock" envconfig:"use_try_lock" default:"false" yaml:"use_try_lock" toml:"use_try_lock"`
}

func (c PostgresConsumerConfig) Validate() error {
Expand Down
31 changes: 28 additions & 3 deletions internal/consuming/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func (c *PostgresConsumer) listenForNotifications(ctx context.Context, triggerCh
}
}

var ErrLockNotAcquired = errors.New("advisory lock not acquired")

func (c *PostgresConsumer) processOnce(ctx context.Context, partition int) (int, error) {
tx, err := c.pool.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.ReadCommitted})
if err != nil {
Expand All @@ -133,9 +135,21 @@ func (c *PostgresConsumer) processOnce(ctx context.Context, partition int) (int,
// Acquire an advisory lock for partition. This allows us to process all the rows
// from partition in order.
lockName := c.lockPrefix + strconv.Itoa(partition)
_, err = tx.Exec(ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", lockName)
if err != nil {
return 0, fmt.Errorf("unable to acquire advisory lock: %w", err)

if c.config.UseTryLock {
var acquired bool
err = tx.QueryRow(ctx, "SELECT pg_try_advisory_xact_lock(hashtext($1))", lockName).Scan(&acquired)
if err != nil {
return 0, fmt.Errorf("error acquiring advisory lock: %w", err)
}
if !acquired {
return 0, ErrLockNotAcquired
}
} else {
_, err = tx.Exec(ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", lockName)
if err != nil {
return 0, fmt.Errorf("error acquiring advisory lock: %w", err)
}
}

sql := `
Expand Down Expand Up @@ -254,6 +268,17 @@ func (c *PostgresConsumer) Run(ctx context.Context) error {
if errors.Is(err, context.Canceled) {
return err
}
if c.config.UseTryLock && errors.Is(err, ErrLockNotAcquired) {
// If we are using try advisory lock, and it was not acquired
// then wait for notification or poll interval.
select {
case <-time.After(pollInterval.ToDuration()):
case <-partitionTriggerChannels[i]:
case <-ctx.Done():
return ctx.Err()
}
continue
}
retries++
backoffDuration = getNextBackoffDuration(backoffDuration, retries)
c.common.metrics.errorsTotal.WithLabelValues(c.common.name).Inc()
Expand Down
95 changes: 54 additions & 41 deletions internal/consuming/postgresql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,54 +111,67 @@ func ensureEventsRemoved(ctx context.Context, t *testing.T, tableName string, pa

func TestPostgresConsumer_GreenScenario(t *testing.T) {
t.Parallel()
testTableName := "centrifugo_consumer_test_" + strings.Replace(uuid.New().String(), "-", "_", -1)
testNotificationChannel := "centrifugo_test_channel_" + strings.Replace(uuid.New().String(), "-", "_", -1)
testMethod := "method"
testPayload := []byte(`{"key":"value"}`)

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
testCases := []struct {
useTryLock bool
}{
{useTryLock: false},
{useTryLock: true},
}

err := setupTestTable(ctx, testTableName, testNotificationChannel)
require.NoError(t, err)
for _, tc := range testCases {
t.Run(fmt.Sprintf("useTryLock=%v", tc.useTryLock), func(t *testing.T) {
testTableName := "centrifugo_consumer_test_" + strings.Replace(uuid.New().String(), "-", "_", -1)
testNotificationChannel := "centrifugo_test_channel_" + strings.Replace(uuid.New().String(), "-", "_", -1)
testMethod := "method"
testPayload := []byte(`{"key":"value"}`)

eventReceived := make(chan struct{})
consumerClosed := make(chan struct{})
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()

// Setup consumer
config := PostgresConfig{
DSN: testPGDSN,
OutboxTableName: testTableName,
PartitionSelectLimit: 10,
NumPartitions: 1,
PartitionPollInterval: configtypes.Duration(300 * time.Millisecond),
PartitionNotificationChannel: testNotificationChannel,
}
consumer, err := NewPostgresConsumer(config, &MockDispatcher{
onDispatchCommand: func(ctx context.Context, method string, data []byte) error {
require.Equal(t, testMethod, method)
require.Equal(t, testPayload, data)
close(eventReceived)
return nil
},
}, testCommon(prometheus.NewRegistry()))
require.NoError(t, err)
err := setupTestTable(ctx, testTableName, testNotificationChannel)
require.NoError(t, err)

// Start the consumer
go func() {
err := consumer.Run(ctx)
require.ErrorIs(t, err, context.Canceled)
close(consumerClosed)
}()
eventReceived := make(chan struct{})
consumerClosed := make(chan struct{})

8000 // Setup consumer
config := PostgresConfig{
DSN: testPGDSN,
OutboxTableName: testTableName,
PartitionSelectLimit: 10,
NumPartitions: 1,
PartitionPollInterval: configtypes.Duration(300 * time.Millisecond),
PartitionNotificationChannel: testNotificationChannel,
UseTryLock: tc.useTryLock,
}
consumer, err := NewPostgresConsumer(config, &MockDispatcher{
onDispatchCommand: func(ctx context.Context, method string, data []byte) error {
require.Equal(t, testMethod, method)
require.Equal(t, testPayload, data)
close(eventReceived)
return nil
},
}, testCommon(prometheus.NewRegistry()))
require.NoError(t, err)

partition := 0
err = insertEvent(ctx, consumer.pool, testTableName, testMethod, testPayload, partition)
require.NoError(t, err)
// Start the consumer
go func() {
err := consumer.Run(ctx)
require.ErrorIs(t, err, context.Canceled)
close(consumerClosed)
}()

waitCh(t, eventReceived, 30*time.Second, "timeout waiting for event")
ensureEventsRemoved(ctx, t, testTableName, 0)
cancel()
waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed")
partition := 0
err = insertEvent(ctx, consumer.pool, testTableName, testMethod, testPayload, partition)
require.NoError(t, err)

waitCh(t, eventReceived, 30*time.Second, "timeout waiting for event")
ensureEventsRemoved(ctx, t, testTableName, 0)
cancel()
waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed")
})
}
}

func TestPostgresConsumer_SeveralConsumers(t *testing.T) {
Expand Down
Loading
0