8000 fix(connectors): fix webhook idempotency key and tests by paul-nicolas · Pull Request #445 · formancehq/payments · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix(connectors): fix webhook idempotency key and tests #445

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 17 commits into from
May 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension 10000

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions internal/connectors/engine/activities/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@
Name: "PluginCreateWebhooks",
Func: a.PluginCreateWebhooks,
}).
Append(temporalworker.Definition{
Name: "PluginVerifyWebhook",
Func: a.PluginVerifyWebhook,
}).

Check warning on line 93 in internal/connectors/engine/activities/activity.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/engine/activities/activity.go#L90-L93

Added lines #L90 - L93 were not covered by tests
Append(temporalworker.Definition{
Name: "PluginTranslateWebhook",
Func: a.PluginTranslateWebhook,
Expand Down
2 changes: 2 additions & 0 deletions internal/connectors/engine/activities/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@
return temporal.NewNonRetryableApplicationError(err.Error(), ErrTypeInvalidArgument, cause)
case errors.Is(err, models.ErrMissingConnectorMetadata):
return temporal.NewNonRetryableApplicationError(err.Error(), ErrTypeInvalidArgument, cause)
case errors.Is(err, models.ErrWebhookVerification):
return temporal.NewNonRetryableApplicationError(err.Error(), ErrTypeInvalidArgument, cause)

Check warning on line 51 in internal/connectors/engine/activities/errors.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/engine/activities/errors.go#L50-L51

Added lines #L50 - L51 were not covered by tests
case errors.As(err, &models.NonRetryableError):
return temporal.NewNonRetryableApplicationError(err.Error(), ErrTypeInvalidArgument, cause)

Expand Down
39 changes: 39 additions & 0 deletions internal/connectors/engine/activities/plugin_verify_webhook.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package activities

import (
context "context"

"github.com/formancehq/payments/internal/models"
"go.temporal.io/sdk/workflow"
)

type VerifyWebhookRequest struct {
ConnectorID models.ConnectorID
Req models.VerifyWebhookRequest
}

func (a Activities) PluginVerifyWebhook(ctx context.Context, request VerifyWebhookRequest) (*models.VerifyWebhookResponse, error) {
plugin, err := a.plugins.Get(request.ConnectorID)
if err != nil {
return nil, a.temporalPluginError(ctx, err)
}

Check warning on line 19 in internal/connectors/engine/activities/plugin_verify_webhook.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/engine/activities/plugin_verify_webhook.go#L18-L19

Added lines #L18 - L19 were not covered by tests

resp, err := plugin.VerifyWebhook(ctx, request.Req)
if err != nil {
return nil, a.temporalPluginError(ctx, err)
}
return &resp, nil
}

var PluginVerifyWebhookActivity = Activities{}.PluginVerifyWebhook

func PluginVerifyWebhook(ctx workflow.Context, connectorID models.ConnectorID, request models.VerifyWebhookRequest) (*models.VerifyWebhookResponse, error) {
ret := models.VerifyWebhookResponse{}
if err := executeActivity(ctx, PluginVerifyWebhookActivity, &ret, VerifyWebhookRequest{
ConnectorID: connectorID,
Req: request,
}); err != nil {
return nil, err
}
return &ret, nil

Check warning on line 38 in internal/connectors/engine/activities/plugin_verify_webhook.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/engine/activities/plugin_verify_webhook.go#L30-L38

Added lines #L30 - L38 were not covered by tests
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package activities_test

import (
"fmt"
"time"

"github.com/formancehq/go-libs/v3/logging"
"github.com/formancehq/go-libs/v3/pointer"
"github.com/formancehq/payments/internal/connectors/engine/activities"
"github.com/formancehq/payments/internal/connectors/engine/plugins"
pluginsError "github.com/formancehq/payments/internal/connectors/plugins"
"github.com/formancehq/payments/internal/events"
"github.com/formancehq/payments/internal/models"
"github.com/formancehq/payments/internal/storage"
. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
"go.temporal.io/sdk/temporal"
gomock "go.uber.org/mock/gomock"
)

var _ = Describe("Plugin Verify Webhooks", func() {
var (
act activities.Activities
p *plugins.MockPlugins
s *storage.MockStorage
evts *events.Events
sampleResponse models.VerifyWebhookResponse
)

BeforeEach(func() {
evts = &events.Events{}
sampleResponse = models.VerifyWebhookResponse{WebhookIdempotencyKey: pointer.For("test")}
})

Context("plugin verify webhook", func() {
var (
plugin *models.MockPlugin
req activities.VerifyWebhookRequest
logger = logging.NewDefaultLogger(GinkgoWriter, true, false, false)
delay = 50 * time.Millisecond
)

BeforeEach(func() {
ctrl := gomock.NewController(GinkgoT())
p = plugins.NewMockPlugins(ctrl)
s = storage.NewMockStorage(ctrl)
plugin = models.NewMockPlugin(ctrl)
act = activities.New(logger, nil, s, evts, p, delay)
req = activities.VerifyWebhookRequest{
ConnectorID: models.ConnectorID{
Provider: "some_provider",
},
}
})

It("calls underlying plugin", func(ctx SpecContext) {
p.EXPECT().Get(req.ConnectorID).Return(plugin, nil)
plugin.EXPECT().VerifyWebhook(ctx, req.Req).Return(sampleResponse, nil)
res, err := act.PluginVerifyWebhook(ctx, req)
Expect(err).To(BeNil())
Expect(res.WebhookIdempotencyKey).To(Equal(sampleResponse.WebhookIdempotencyKey))
})

It("returns a retryable temporal error", func(ctx SpecContext) {
p.EXPECT().Get(req.ConnectorID).Return(plugin, nil)
plugin.EXPECT().VerifyWebhook(ctx, req.Req).Return(sampleResponse, fmt.Errorf("some string"))
_, err := act.PluginVerifyWebhook(ctx, req)
Expect(err).ToNot(BeNil())
temporalErr, ok := err.(*temporal.ApplicationError)
Expect(ok).To(BeTrue())
Expect(temporalErr.NonRetryable()).To(BeFalse())
Expect(temporalErr.Type()).To(Equal(activities.ErrTypeDefault))
})

It("returns a non-retryable temporal error", func(ctx SpecContext) {
p.EXPECT().Get(req.ConnectorID).Return(plugin, nil)
plugin.EXPECT().VerifyWebhook(ctx, req.Req).Return(sampleResponse, fmt.Errorf("invalid: %w", pluginsError.ErrNotImplemented))
_, err := act.PluginVerifyWebhook(ctx, req)
Expect(err).ToNot(BeNil())
temporalErr, ok := err.(*temporal.ApplicationError)
Expect(ok).To(BeTrue())
Expect(temporalErr.NonRetryable()).To(BeTrue())
Expect(temporalErr.Type()).To(Equal(activities.ErrTypeUnimplemented))
})
})
})
20 changes: 19 additions & 1 deletion internal/connectors/engine/workflow/handle_webhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,24 @@ func (w Workflow) runHandleWebhooks(
return temporal.NewNonRetryableApplicationError("webhook config not found", "NOT_FOUND", errors.New("webhook config not found"))
}

verifyResponse, err := activities.PluginVerifyWebhook(
infiniteRetryContext(ctx),
handleWebhooks.ConnectorID,
models.VerifyWebhookRequest{
Webhook: models.PSPWebhook{
BasicAuth: handleWebhooks.Webhook.BasicAuth,
QueryValues: handleWebhooks.Webhook.QueryValues,
Headers: handleWebhooks.Webhook.Headers,
Body: handleWebhooks.Webhook.Body,
},
Config: config,
},
)
if err != nil {
return fmt.Errorf("verifying webhook: %w", err)
}

handleWebhooks.Webhook.IdempotencyKey = verifyResponse.WebhookIdempotencyKey
err = activities.StorageWebhooksStore(infiniteRetryContext(ctx), handleWebhooks.Webhook)
if err != nil {
return fmt.Errorf("storing webhook: %w", err)
Expand Down Expand Up @@ -73,7 +91,7 @@ func (w Workflow) runHandleWebhooks(
workflow.WithChildOptions(
ctx,
workflow.ChildWorkflowOptions{
WorkflowID: fmt.Sprintf("store-webhook-%s-%s-%s", w.stack, handleWebhooks.ConnectorID.String(), response.IdempotencyKey),
WorkflowID: fmt.Sprintf("store-webhook-%s-%s-%s", w.stack, handleWebhooks.ConnectorID.String(), handleWebhooks.Webhook.ID),
TaskQueue: w.getDefaultTaskQueue(),
ParentClosePolicy: enums.PARENT_CLOSE_POLICY_ABANDON,
WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_ALLOW_DUPLICATE_FAILED_ONLY,
Expand Down
63 changes: 59 additions & 4 deletions internal/connectors/engine/workflow/handle_webhooks_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"

"github.com/formancehq/go-libs/v3/pointer"
"github.com/formancehq/payments/internal/connectors/engine/activities"
"github.com/formancehq/payments/internal/models"
"github.com/stretchr/testify/mock"
Expand All @@ -22,12 +23,19 @@ func (s *UnitTestSuite) Test_HandleWebhooks_Success() {
},
nil,
)
s.env.OnActivity(activities.StorageWebhooksStoreActivity, mock.Anything, mock.Anything).Once().Return(nil)
s.env.OnActivity(activities.PluginVerifyWebhookActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, req activities.VerifyWebhookRequest) (*models.VerifyWebhookResponse, error) {
return &models.VerifyWebhookResponse{
WebhookIdempotencyKey: pointer.For("test"),
}, nil
})
s.env.OnActivity(activities.StorageWebhooksStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, webhook models.Webhook) error {
s.Equal(pointer.For("test"), webhook.IdempotencyKey)
return nil
})
s.env.OnActivity(activities.PluginTranslateWebhookActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, req activities.TranslateWebhookRequest) (*models.TranslateWebhookResponse, error) {
return &models.TranslateWebhookResponse{
Responses: []models.WebhookResponse{
{
IdempotencyKey: "test",
Account: &s.pspAccount,
ExternalAccount: &s.pspAccount,
Payment: &s.pspPayment,
Expand Down Expand Up @@ -78,7 +86,15 @@ func (s *UnitTestSuite) Test_HandleWebhooks_NoResponses_Success() {
},
nil,
)
s.env.OnActivity(activities.StorageWebhooksStoreActivity, mock.Anything, mock.Anything).Once().Return(nil)
s.env.OnActivity(activities.PluginVerifyWebhookActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, req activities.VerifyWebhookRequest) (*models.VerifyWebhookResponse, error) {
return &models.VerifyWebhookResponse{
WebhookIdempotencyKey: pointer.For("test"),
}, nil
})
s.env.OnActivity(activities.StorageWebhooksStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, webhook models.Webhook) error {
s.Equal(pointer.For("test"), webhook.IdempotencyKey)
return nil
})
s.env.OnActivity(activities.PluginTranslateWebhookActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, req activities.TranslateWebhookRequest) (*models.TranslateWebhookResponse, error) {
return &models.TranslateWebhookResponse{
Responses: []models.WebhookResponse{},
Expand Down Expand Up @@ -161,6 +177,43 @@ func (s *UnitTestSuite) Test_HandleWebhooks_StorageWebhooksConfigsGet_Error() {
s.Error(err)
}

func (s *UnitTestSuite) Test_HandleWebhooks_PluginVerifyWebhook_Error() {
s.env.OnActivity(activities.StorageWebhooksConfigsGetActivity, mock.Anything, s.connectorID).Once().Return(
[]models.WebhookConfig{
{
Name: "test",
ConnectorID: s.connectorID,
URLPath: "/test",
},
},
nil,
)
s.env.OnActivity(activities.PluginVerifyWebhookActivity, mock.Anything, mock.Anything).Once().Return(
nil,
temporal.NewNonRetryableApplicationError("test", "test", errors.New("test")),
)

s.env.ExecuteWorkflow(RunHandleWebhooks, HandleWebhooks{
ConnectorID: s.connectorID,
URLPath: "/test",
Webhook: models.Webhook{
ID: "test",
ConnectorID: s.connectorID,
QueryValues: map[string][]string{
"test": {"test"},
},
Headers: map[string][]string{
"test": {"test"},
},
Body: []byte(`{}`),
},
})

s.True(s.env.IsWorkflowCompleted())
err := s.env.GetWorkflowError()
s.Error(err)
}

func (s *UnitTestSuite) Test_HandleWebhooks_StorageWebhooksStore_Error() {
s.env.OnActivity(activities.StorageWebhooksConfigsGetActivity, mock.Anything, s.connectorID).Once().Return(
[]models.WebhookConfig{
Expand All @@ -172,6 +225,7 @@ func (s *UnitTestSuite) Test_HandleWebhooks_StorageWebhooksStore_Error() {
},
nil,
)
s.env.OnActivity(activities.PluginVerifyWebhookActivity, mock.Anything, mock.Anything).Once().Return(&models.VerifyWebhookResponse{}, nil)
s.env.OnActivity(activities.StorageWebhooksStoreActivity, mock.Anything, mock.Anything).Once().Return(
temporal.NewNonRetryableApplicationError("test", "test", errors.New("test")),
)
Expand Down Expand Up @@ -208,6 +262,7 @@ func (s *UnitTestSuite) Test_HandleWebhooks_PluginTranslateWebhook_Error() {
},
nil,
)
s.env.OnActivity(activities.PluginVerifyWebhookActivity, mock.Anything, mock.Anything).Once().Return(&models.VerifyWebhookResponse{}, nil)
s.env.OnActivity(activities.StorageWebhooksStoreActivity, mock.Anything, mock.Anything).Once().Return(nil)
s.env.OnActivity(activities.PluginTranslateWebhookActivity, mock.Anything, mock.Anything).Once().Return(nil,
temporal.NewNonRetryableApplicationError("test", "test", errors.New("test")),
Expand Down Expand Up @@ -245,12 +300,12 @@ func (s *UnitTestSuite) Test_HandleWebhooks_RunStoreWebhookTranslation_Error() {
},
nil,
)
s.env.OnActivity(activities.PluginVerifyWebhookActivity, mock.Anything, mock.Anything).Once().Return(&models.VerifyWebhookResponse{}, nil)
s.env.OnActivity(activities.StorageWebhooksStoreActivity, mock.Anything, mock.Anything).Once().Return(nil)
s.env.OnActivity(activities.PluginTranslateWebhookActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, req activities.TranslateWebhookRequest) (*models.TranslateWebhookResponse, error) {
return &models.TranslateWebhookResponse{
Responses: []models.WebhookResponse{
{
IdempotencyKey: "test",
Account: &s.pspAccount,
ExternalAccount: &s.pspAccount,
Payment: &s.pspPayment,
Expand Down
4 changes: 4 additions & 0 deletions internal/connectors/plugins/base_plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,10 @@
return models.CreateWebhooksResponse{}, ErrNotImplemented
}

func (dp *basePlugin) VerifyWebhook(ctx context.Context, req models.VerifyWebhookRequest) (models.VerifyWebhookResponse, error) {
return models.VerifyWebhookResponse{}, ErrNotImplemented

Check warning on line 80 in internal/connectors/plugins/base_plugin.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/plugins/base_plugin.go#L79-L80

Added lines #L79 - L80 were not covered by tests
}

func (dp *basePlugin) TranslateWebhook(ctx context.Context, req models.TranslateWebhookRequest) (models.TranslateWebhookResponse, error) {
return models.TranslateWebhookResponse{}, ErrNotImplemented
}
Expand Down
17 changes: 11 additions & 6 deletions internal/connectors/plugins/public/adyen/accounts_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,23 +15,28 @@ import (

var _ = Describe("Adyen Plugin Accounts", func() {
var (
plg *Plugin
ctrl *gomock.Controller
m *client.MockClient
plg models.Plugin
)

BeforeEach(func() {
plg = &Plugin{}
ctrl = gomock.NewController(GinkgoT())
m = client.NewMockClient(ctrl)

plg = &Plugin{client: m}
})

AfterEach(func() {
ctrl.Finish()
})

Context("fetching next accounts", func() {
var (
m *client.MockClient
sampleAccounts []management.Merchant
)

BeforeEach(func() {
ctrl := gomock.NewController(GinkgoT())
m = client.NewMockClient(ctrl)
plg.client = m

for i := 10; i < 60; i++ {
sampleAccounts = append(sampleAccounts, management.Merchant{
Expand Down
8 changes: 8 additions & 0 deletions internal/connectors/plugins/public/adyen/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,6 +114,14 @@
}, nil
}

func (p *Plugin) VerifyWebhook(ctx context.Context, req models.VerifyWebhookRequest) (models.VerifyWebhookResponse, error) {
if p.client == nil {
return models.VerifyWebhookResponse{}, plugins.ErrNotYetInstalled
}

Check warning on line 120 in internal/connectors/plugins/public/adyen/plugin.go

View check run for this annotation

Codecov / codecov/patch

internal/connectors/plugins/public/adyen/plugin.go#L119-L120

Added lines #L119 - L120 were not covered by tests

return p.verifyWebhook(ctx, req)
}

func (p *Plugin) TranslateWebhook(ctx context.Context, req models.TranslateWebhookRequest) (models.TranslateWebhookResponse, error) {
if p.client == nil {
return models.TranslateWebhookResponse{}, plugins.ErrNotYetInstalled
Expand Down
7 changes: 6 additions & 1 deletion internal/connectors/plugins/public/adyen/plugin_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func TestPlugin(t *testing.T) {
var _ = Describe("Adyen Plugin", func() {
var (
plg *Plugin
ctrl *gomock.Controller
m *client.MockClient
logger = logging.NewDefaultLogger(GinkgoWriter, true, false, false)
)
Expand All @@ -29,10 +30,14 @@ var _ = Describe("Adyen Plugin", func() {
plg = &Plugin{
Plugin: plugins.NewBasePlugin(),
}
ctrl := gomock.NewController(GinkgoT())
ctrl = gomock.NewController(GinkgoT())
m = client.NewMockClient(ctrl)
})

AfterEach(func() {
ctrl.Finish()
})

Context("install", func() {
It("reports validation errors in the config - apiKey", func(ctx SpecContext) {
_, err := New("adyen", logger, json.RawMessage(`{"companyID":"test"}`))
Expand Down
Loading
Loading
0