From 2f5d378a158447ca4354f40167a27febd7df4809 Mon Sep 17 00:00:00 2001 From: FZambia Date: Sat, 17 May 2025 09:40:35 +0300 Subject: [PATCH] postgresql consumer: option to use try lock --- internal/cli/configdoc/schema.json | 10 +++ internal/configtypes/types.go | 2 + internal/consuming/postgresql.go | 31 ++++++++- internal/consuming/postgresql_test.go | 95 +++++++++++++++------------ 4 files changed, 94 insertions(+), 44 deletions(-) diff --git a/internal/cli/configdoc/schema.json b/internal/cli/configdoc/schema.json index cd8bf3194..dfd727595 100644 --- a/internal/cli/configdoc/schema.json +++ b/internal/cli/configdoc/schema.json @@ -6914,6 +6914,16 @@ "is_complex_type": false } ] + }, + { + "field": "consumers[].postgresql.use_try_lock", + "name": "use_try_lock", + "go_name": "UseTryLock", + "level": 3, + "type": "bool", + "default": "false", + "comment": "UseTryLock when enabled tells Centrifugo to use pg_try_advisory_xact_lock instead of pg_advisory_xact_lock. If the partition lock is held by another session, the consumer does not block waiting for it but retries after the partition poll interval or on a partition notification.", + "is_complex_type": false } ] }, diff --git a/internal/configtypes/types.go b/internal/configtypes/types.go index f56fbfd3a..2e2983563 100644 --- a/internal/configtypes/types.go +++ b/internal/configtypes/types.go @@ -723,6 +723,8 @@ type PostgresConsumerConfig struct { PartitionPollInterval Duration `mapstructure:"partition_poll_interval" json:"partition_poll_interval" envconfig:"partition_poll_interval" default:"300ms" yaml:"partition_poll_interval" toml:"partition_poll_interval"` PartitionNotificationChannel string `mapstructure:"partition_notification_channel" json:"partition_notification_channel" envconfig:"partition_notification_channel" yaml:"partition_notification_channel" toml:"partition_notification_channel"` TLS TLSConfig `mapstructure:"tls" json:"tls" envconfig:"tls" yaml:"tls" toml:"tls"` + // UseTryLock when enabled tells Centrifugo to use pg_try_advisory_xact_lock instead of pg_advisory_xact_lock. If the partition lock is held by another session, the consumer does not block waiting for it but retries after the partition poll interval or on a partition notification. 
+ UseTryLock bool `mapstructure:"use_try_lock" json:"use_try_lock" envconfig:"use_try_lock" default:"false" yaml:"use_try_lock" toml:"use_try_lock"` } func (c PostgresConsumerConfig) Validate() error { diff --git a/internal/consuming/postgresql.go b/internal/consuming/postgresql.go index 083439ca8..310345f62 100644 --- a/internal/consuming/postgresql.go +++ b/internal/consuming/postgresql.go @@ -123,6 +123,8 @@ func (c *PostgresConsumer) listenForNotifications(ctx context.Context, triggerCh } } +var ErrLockNotAcquired = errors.New("advisory lock not acquired") + func (c *PostgresConsumer) processOnce(ctx context.Context, partition int) (int, error) { tx, err := c.pool.BeginTx(ctx, pgx.TxOptions{IsoLevel: pgx.ReadCommitted}) if err != nil { @@ -133,9 +135,21 @@ func (c *PostgresConsumer) processOnce(ctx context.Context, partition int) (int, // Acquire an advisory lock for partition. This allows us to process all the rows // from partition in order. lockName := c.lockPrefix + strconv.Itoa(partition) - _, err = tx.Exec(ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", lockName) - if err != nil { - return 0, fmt.Errorf("unable to acquire advisory lock: %w", err) + + if c.config.UseTryLock { + var acquired bool + err = tx.QueryRow(ctx, "SELECT pg_try_advisory_xact_lock(hashtext($1))", lockName).Scan(&acquired) + if err != nil { + return 0, fmt.Errorf("error acquiring advisory lock: %w", err) + } + if !acquired { + return 0, ErrLockNotAcquired + } + } else { + _, err = tx.Exec(ctx, "SELECT pg_advisory_xact_lock(hashtext($1))", lockName) + if err != nil { + return 0, fmt.Errorf("error acquiring advisory lock: %w", err) + } } sql := ` @@ -254,6 +268,17 @@ func (c *PostgresConsumer) Run(ctx context.Context) error { if errors.Is(err, context.Canceled) { return err } + if c.config.UseTryLock && errors.Is(err, ErrLockNotAcquired) { + // If we are using try advisory lock, and it was not acquired + // then wait for notification or poll interval. 
+ select { + case <-time.After(pollInterval.ToDuration()): + case <-partitionTriggerChannels[i]: + case <-ctx.Done(): + return ctx.Err() + } + continue + } retries++ backoffDuration = getNextBackoffDuration(backoffDuration, retries) c.common.metrics.errorsTotal.WithLabelValues(c.common.name).Inc() diff --git a/internal/consuming/postgresql_test.go b/internal/consuming/postgresql_test.go index cd375a29d..38daf7594 100644 --- a/internal/consuming/postgresql_test.go +++ b/internal/consuming/postgresql_test.go @@ -111,54 +111,67 @@ func ensureEventsRemoved(ctx context.Context, t *testing.T, tableName string, pa func TestPostgresConsumer_GreenScenario(t *testing.T) { t.Parallel() - testTableName := "centrifugo_consumer_test_" + strings.Replace(uuid.New().String(), "-", "_", -1) - testNotificationChannel := "centrifugo_test_channel_" + strings.Replace(uuid.New().String(), "-", "_", -1) - testMethod := "method" - testPayload := []byte(`{"key":"value"}`) - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - defer cancel() + testCases := []struct { + useTryLock bool + }{ + {useTryLock: false}, + {useTryLock: true}, + } - err := setupTestTable(ctx, testTableName, testNotificationChannel) - require.NoError(t, err) + for _, tc := range testCases { + t.Run(fmt.Sprintf("useTryLock=%v", tc.useTryLock), func(t *testing.T) { + testTableName := "centrifugo_consumer_test_" + strings.Replace(uuid.New().String(), "-", "_", -1) + testNotificationChannel := "centrifugo_test_channel_" + strings.Replace(uuid.New().String(), "-", "_", -1) + testMethod := "method" + testPayload := []byte(`{"key":"value"}`) - eventReceived := make(chan struct{}) - consumerClosed := make(chan struct{}) + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() - // Setup consumer - config := PostgresConfig{ - DSN: testPGDSN, - OutboxTableName: testTableName, - PartitionSelectLimit: 10, - NumPartitions: 1, - PartitionPollInterval: configtypes.Duration(300 
* time.Millisecond), - PartitionNotificationChannel: testNotificationChannel, - } - consumer, err := NewPostgresConsumer(config, &MockDispatcher{ - onDispatchCommand: func(ctx context.Context, method string, data []byte) error { - require.Equal(t, testMethod, method) - require.Equal(t, testPayload, data) - close(eventReceived) - return nil - }, - }, testCommon(prometheus.NewRegistry())) - require.NoError(t, err) + err := setupTestTable(ctx, testTableName, testNotificationChannel) + require.NoError(t, err) - // Start the consumer - go func() { - err := consumer.Run(ctx) - require.ErrorIs(t, err, context.Canceled) - close(consumerClosed) - }() + eventReceived := make(chan struct{}) + consumerClosed := make(chan struct{}) + + // Setup consumer + config := PostgresConfig{ + DSN: testPGDSN, + OutboxTableName: testTableName, + PartitionSelectLimit: 10, + NumPartitions: 1, + PartitionPollInterval: configtypes.Duration(300 * time.Millisecond), + PartitionNotificationChannel: testNotificationChannel, + UseTryLock: tc.useTryLock, + } + consumer, err := NewPostgresConsumer(config, &MockDispatcher{ + onDispatchCommand: func(ctx context.Context, method string, data []byte) error { + require.Equal(t, testMethod, method) + require.Equal(t, testPayload, data) + close(eventReceived) + return nil + }, + }, testCommon(prometheus.NewRegistry())) + require.NoError(t, err) - partition := 0 - err = insertEvent(ctx, consumer.pool, testTableName, testMethod, testPayload, partition) - require.NoError(t, err) + // Start the consumer + go func() { + err := consumer.Run(ctx) + require.ErrorIs(t, err, context.Canceled) + close(consumerClosed) + }() - waitCh(t, eventReceived, 30*time.Second, "timeout waiting for event") - ensureEventsRemoved(ctx, t, testTableName, 0) - cancel() - waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed") + partition := 0 + err = insertEvent(ctx, consumer.pool, testTableName, testMethod, testPayload, partition) + require.NoError(t, err) + + 
waitCh(t, eventReceived, 30*time.Second, "timeout waiting for event") + ensureEventsRemoved(ctx, t, testTableName, 0) + cancel() + waitCh(t, consumerClosed, 30*time.Second, "timeout waiting for consumer closed") + }) + } } func TestPostgresConsumer_SeveralConsumers(t *testing.T) {