From f21afce562c0dd40343b5c29403bb8b7b48cfb40 Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Fri, 21 Mar 2025 14:31:25 +0100 Subject: [PATCH 1/2] fix(*): payment initiations fixes --- internal/api/backend/backend_generated.go | 1 - .../services/payment_initiations_approve.go | 12 +-- .../payment_initiations_approve_test.go | 13 ++- .../services/payment_initiations_create.go | 11 +-- .../payment_initiations_create_test.go | 4 +- .../api/services/payment_initiations_retry.go | 6 +- .../payment_initiations_retry_test.go | 5 +- .../v2/handler_transfer_initiations_get.go | 18 ++++- .../v2/handler_transfer_initiations_list.go | 13 ++- .../activities/schedule_client_generated.go | 2 - internal/connectors/engine/engine.go | 10 +-- .../connectors/engine/engine_generated.go | 18 ++--- .../engine/plugins/plugin_generated.go | 1 - internal/connectors/engine/workers.go | 2 +- .../engine/workflow/create_payout.go | 23 ++++++ .../engine/workflow/create_payout_test.go | 81 +++++++++++++++++++ .../engine/workflow/create_transfer.go | 23 ++++++ .../engine/workflow/create_transfer_test.go | 81 +++++++++++++++++++ .../payment_initiation_adjusments_status.go | 5 ++ openapi.yaml | 3 +- openapi/v1-2/v1-2.yaml | 2 +- openapi/v3/v3-schemas.yaml | 1 + 22 files changed, 282 insertions(+), 53 deletions(-) diff --git a/internal/api/backend/backend_generated.go b/internal/api/backend/backend_generated.go index 701243583..27adf1fe7 100644 --- a/internal/api/backend/backend_generated.go +++ b/internal/api/backend/backend_generated.go @@ -27,7 +27,6 @@ import ( type MockBackend struct { ctrl *gomock.Controller recorder *MockBackendMockRecorder - isgomock struct{} } // MockBackendMockRecorder is the mock recorder for MockBackend. 
diff --git a/internal/api/services/payment_initiations_approve.go b/internal/api/services/payment_initiations_approve.go index 22facd2ba..29956128d 100644 --- a/internal/api/services/payment_initiations_approve.go +++ b/internal/api/services/payment_initiations_approve.go @@ -39,21 +39,21 @@ func (s *Service) PaymentInitiationsApprove(ctx context.Context, id models.Payme return models.Task{}, newStorageError(err, "cannot get payment initiation") } - startDelay := 0 * time.Second - now := time.Now() - if !pi.ScheduledAt.IsZero() && pi.ScheduledAt.After(now) { - startDelay = pi.ScheduledAt.Sub(now) + if !pi.ScheduledAt.IsZero() && pi.ScheduledAt.After(time.Now()) { + // In any case, if the payment initiation is scheduled for the future, + // we do not want to wait for the results + waitResult = false } switch pi.Type { case models.PAYMENT_INITIATION_TYPE_TRANSFER: - task, err := s.engine.CreateTransfer(ctx, pi.ID, startDelay, 1, waitResult) + task, err := s.engine.CreateTransfer(ctx, pi.ID, 1, waitResult) if err != nil { return models.Task{}, handleEngineErrors(err) } return task, nil case models.PAYMENT_INITIATION_TYPE_PAYOUT: - task, err := s.engine.CreatePayout(ctx, pi.ID, startDelay, 1, waitResult) + task, err := s.engine.CreatePayout(ctx, pi.ID, 1, waitResult) if err != nil { return models.Task{}, handleEngineErrors(err) } diff --git a/internal/api/services/payment_initiations_approve_test.go b/internal/api/services/payment_initiations_approve_test.go index 62ac10ec5..bce8f9b17 100644 --- a/internal/api/services/payment_initiations_approve_test.go +++ b/internal/api/services/payment_initiations_approve_test.go @@ -41,7 +41,7 @@ func TestPaymentInitiationsApprove(t *testing.T) { } piWithScheduledAt := models.PaymentInitiation{ Type: models.PAYMENT_INITIATION_TYPE_PAYOUT, - ScheduledAt: time.Now(), + ScheduledAt: time.Now().Add(time.Hour), } tests := []struct { @@ -141,17 +141,22 @@ func TestPaymentInitiationsApprove(t *testing.T) { if test.expectedAdjError == nil 
{ store.EXPECT().PaymentInitiationsGet(gomock.Any(), pid).Return(&test.pi, test.piGetStorageErr) + waitResult := true + if !test.pi.ScheduledAt.IsZero() { + waitResult = false + } + if test.piGetStorageErr == nil { switch test.pi.Type { case models.PAYMENT_INITIATION_TYPE_TRANSFER: - eng.EXPECT().CreateTransfer(gomock.Any(), pid, 0*time.Second, 1, false).Return(models.Task{}, test.engineErr) + eng.EXPECT().CreateTransfer(gomock.Any(), pid, 1, waitResult).Return(models.Task{}, test.engineErr) case models.PAYMENT_INITIATION_TYPE_PAYOUT: - eng.EXPECT().CreatePayout(gomock.Any(), pid, gomock.Any(), 1, false).Return(models.Task{}, test.engineErr) + eng.EXPECT().CreatePayout(gomock.Any(), pid, 1, waitResult).Return(models.Task{}, test.engineErr) } } } - _, err := s.PaymentInitiationsApprove(context.Background(), pid, false) + _, err := s.PaymentInitiationsApprove(context.Background(), pid, true) switch { case test.expectedAdjError == nil && test.expectedPIError == nil && test.expectedEngineError == nil: require.NoError(t, err) diff --git a/internal/api/services/payment_initiations_create.go b/internal/api/services/payment_initiations_create.go index f723b8647..1218e8d93 100644 --- a/internal/api/services/payment_initiations_create.go +++ b/internal/api/services/payment_initiations_create.go @@ -2,7 +2,6 @@ package services import ( "context" - "time" "github.com/formancehq/payments/internal/models" ) @@ -28,21 +27,15 @@ func (s *Service) PaymentInitiationsCreate(ctx context.Context, paymentInitiatio return models.Task{}, newStorageError(err, "cannot create payment initiation") } - startDelay := 0 * time.Second - now := time.Now() - if !paymentInitiation.ScheduledAt.IsZero() && paymentInitiation.ScheduledAt.After(now) { - startDelay = paymentInitiation.ScheduledAt.Sub(now) - } - switch paymentInitiation.Type { case models.PAYMENT_INITIATION_TYPE_TRANSFER: - task, err := s.engine.CreateTransfer(ctx, paymentInitiation.ID, startDelay, 1, waitResult) + task, err := 
s.engine.CreateTransfer(ctx, paymentInitiation.ID, 1, waitResult) if err != nil { return models.Task{}, handleEngineErrors(err) } return task, nil case models.PAYMENT_INITIATION_TYPE_PAYOUT: - task, err := s.engine.CreatePayout(ctx, paymentInitiation.ID, startDelay, 1, waitResult) + task, err := s.engine.CreatePayout(ctx, paymentInitiation.ID, 1, waitResult) if err != nil { return models.Task{}, handleEngineErrors(err) } diff --git a/internal/api/services/payment_initiations_create_test.go b/internal/api/services/payment_initiations_create_test.go index eda7fda1a..da4b48a56 100644 --- a/internal/api/services/payment_initiations_create_test.go +++ b/internal/api/services/payment_initiations_create_test.go @@ -97,9 +97,9 @@ func TestPaymentInitiationsCreate(t *testing.T) { if test.piUpsertStorageErr == nil && test.sendToPSP { switch test.pi.Type { case models.PAYMENT_INITIATION_TYPE_TRANSFER: - eng.EXPECT().CreateTransfer(gomock.Any(), models.PaymentInitiationID{}, 0*time.Second, 1, false).Return(models.Task{}, test.engineErr) + eng.EXPECT().CreateTransfer(gomock.Any(), models.PaymentInitiationID{}, 1, false).Return(models.Task{}, test.engineErr) case models.PAYMENT_INITIATION_TYPE_PAYOUT: - eng.EXPECT().CreatePayout(gomock.Any(), models.PaymentInitiationID{}, gomock.Any(), 1, false).Return(models.Task{}, test.engineErr) + eng.EXPECT().CreatePayout(gomock.Any(), models.PaymentInitiationID{}, 1, false).Return(models.Task{}, test.engineErr) } } diff --git a/internal/api/services/payment_initiations_retry.go b/internal/api/services/payment_initiations_retry.go index d1f18a26c..fcccb0955 100644 --- a/internal/api/services/payment_initiations_retry.go +++ b/internal/api/services/payment_initiations_retry.go @@ -3,7 +3,6 @@ package services import ( "context" "fmt" - "time" "github.com/formancehq/go-libs/v2/bun/bunpaginate" "github.com/formancehq/payments/internal/models" @@ -36,16 +35,15 @@ func (s *Service) PaymentInitiationsRetry(ctx context.Context, id models.Payment 
attempts := getAttemps(adjustments) - startDelay := 0 * time.Second switch pi.Type { case models.PAYMENT_INITIATION_TYPE_TRANSFER: - task, err := s.engine.CreateTransfer(ctx, pi.ID, startDelay, attempts+1, waitResult) + task, err := s.engine.CreateTransfer(ctx, pi.ID, attempts+1, waitResult) if err != nil { return models.Task{}, handleEngineErrors(err) } return task, nil case models.PAYMENT_INITIATION_TYPE_PAYOUT: - task, err := s.engine.CreatePayout(ctx, pi.ID, startDelay, attempts+1, waitResult) + task, err := s.engine.CreatePayout(ctx, pi.ID, attempts+1, waitResult) if err != nil { return models.Task{}, handleEngineErrors(err) } diff --git a/internal/api/services/payment_initiations_retry_test.go b/internal/api/services/payment_initiations_retry_test.go index 7ef977d74..57a8dd106 100644 --- a/internal/api/services/payment_initiations_retry_test.go +++ b/internal/api/services/payment_initiations_retry_test.go @@ -4,7 +4,6 @@ import ( "context" "fmt" "testing" - "time" "github.com/formancehq/go-libs/v2/bun/bunpaginate" "github.com/formancehq/payments/internal/connectors/engine" @@ -143,9 +142,9 @@ func TestPaymentInitiationsRetry(t *testing.T) { if test.piGetStorageErr == nil { switch test.pi.Type { case models.PAYMENT_INITIATION_TYPE_TRANSFER: - eng.EXPECT().CreateTransfer(gomock.Any(), pid, 0*time.Second, 2, false).Return(models.Task{}, test.engineErr) + eng.EXPECT().CreateTransfer(gomock.Any(), pid, 2, false).Return(models.Task{}, test.engineErr) case models.PAYMENT_INITIATION_TYPE_PAYOUT: - eng.EXPECT().CreatePayout(gomock.Any(), pid, gomock.Any(), 2, false).Return(models.Task{}, test.engineErr) + eng.EXPECT().CreatePayout(gomock.Any(), pid, 2, false).Return(models.Task{}, test.engineErr) } } } diff --git a/internal/api/v2/handler_transfer_initiations_get.go b/internal/api/v2/handler_transfer_initiations_get.go index a93fa2c26..fadd6f7dc 100644 --- a/internal/api/v2/handler_transfer_initiations_get.go +++ b/internal/api/v2/handler_transfer_initiations_get.go 
@@ -118,10 +118,14 @@ func transferInitiationsGet(backend backend.Backend) http.HandlerFunc { func translateAdjustments(from []models.PaymentInitiationAdjustment) []transferInitiationAdjustmentsResponse { to := make([]transferInitiationAdjustmentsResponse, len(from)) for i, adjustment := range from { + status, toSend := translateStatus(adjustment.Status) + if !toSend { + continue + } to[i] = transferInitiationAdjustmentsResponse{ AdjustmentID: adjustment.ID.String(), CreatedAt: adjustment.CreatedAt, - Status: adjustment.Status.String(), + Status: status, Error: func() string { if adjustment.Error == nil { return "" @@ -134,6 +138,18 @@ func translateAdjustments(from []models.PaymentInitiationAdjustment) []transferI return to } +func translateStatus(from models.PaymentInitiationAdjustmentStatus) (string, bool) { + switch from { + case models.PAYMENT_INITIATION_ADJUSTMENT_STATUS_SCHEDULED_FOR_PROCESSING: + // PAYMENT_INITIATION_ADJUSTMENT_STATUS_SCHEDULED_FOR_PROCESSING is not supported + // in v2 as it is introduced in v3. 
Since we are going to list all adjustments, + // we can drop this one. + return "", false + default: + return from.String(), true + } +} + func translateRelatedPayments(from []models.Payment) []transferInitiationPaymentsResponse { to := make([]transferInitiationPaymentsResponse, len(from)) for i, payment := range from { diff --git a/internal/api/v2/handler_transfer_initiations_list.go b/internal/api/v2/handler_transfer_initiations_list.go index 7086ef3c6..d5b3251a2 100644 --- a/internal/api/v2/handler_transfer_initiations_list.go +++ b/internal/api/v2/handler_transfer_initiations_list.go @@ -9,6 +9,7 @@ import ( "github.com/formancehq/go-libs/v2/pointer" "github.com/formancehq/payments/internal/api/backend" "github.com/formancehq/payments/internal/api/common" + "github.com/formancehq/payments/internal/models" "github.com/formancehq/payments/internal/otel" "github.com/formancehq/payments/internal/storage" ) @@ -49,8 +50,18 @@ func transferInitiationsList(backend backend.Backend) http.HandlerFunc { return } + status := "" + switch lastAdjustment.Status { + case models.PAYMENT_INITIATION_ADJUSTMENT_STATUS_SCHEDULED_FOR_PROCESSING: + // PAYMENT_INITIATION_ADJUSTMENT_STATUS_SCHEDULED_FOR_PROCESSING is not supported + // in v2 as it is introduced in v3. We map it to PROCESSING for backward compatibility. 
+ status = models.PAYMENT_INITIATION_ADJUSTMENT_STATUS_PROCESSING.String() + default: + status = lastAdjustment.Status.String() + } + if lastAdjustment != nil { - data[i].Status = lastAdjustment.Status.String() + data[i].Status = status data[i].Error = func() string { if lastAdjustment.Error == nil { return "" diff --git a/internal/connectors/engine/activities/schedule_client_generated.go b/internal/connectors/engine/activities/schedule_client_generated.go index 78e805f3c..4e3c5cf46 100644 --- a/internal/connectors/engine/activities/schedule_client_generated.go +++ b/internal/connectors/engine/activities/schedule_client_generated.go @@ -21,7 +21,6 @@ import ( type MockScheduleClient struct { ctrl *gomock.Controller recorder *MockScheduleClientMockRecorder - isgomock struct{} } // MockScheduleClientMockRecorder is the mock recorder for MockScheduleClient. @@ -89,7 +88,6 @@ func (mr *MockScheduleClientMockRecorder) List(ctx, options any) *gomock.Call { type MockScheduleHandle struct { ctrl *gomock.Controller recorder *MockScheduleHandleMockRecorder - isgomock struct{} } // MockScheduleHandleMockRecorder is the mock recorder for MockScheduleHandle. diff --git a/internal/connectors/engine/engine.go b/internal/connectors/engine/engine.go index 953794846..d3b1e655f 100644 --- a/internal/connectors/engine/engine.go +++ b/internal/connectors/engine/engine.go @@ -44,11 +44,11 @@ type Engine interface { // in the external system (PSP). ForwardBankAccount(ctx context.Context, bankAccountID uuid.UUID, connectorID models.ConnectorID, waitResult bool) (models.Task, error) // Create a transfer between two accounts on the given connector (PSP). - CreateTransfer(ctx context.Context, piID models.PaymentInitiationID, startDelay time.Duration, attempt int, waitResult bool) (models.Task, error) + CreateTransfer(ctx context.Context, piID models.PaymentInitiationID, attempt int, waitResult bool) (models.Task, error) // Reverse a transfer on the given connector (PSP). 
ReverseTransfer(ctx context.Context, reversal models.PaymentInitiationReversal, waitResult bool) (models.Task, error) // Create a payout on the given connector (PSP). - CreatePayout(ctx context.Context, piID models.PaymentInitiationID, startDelay time.Duration, attempt int, waitResult bool) (models.Task, error) + CreatePayout(ctx context.Context, piID models.PaymentInitiationID, attempt int, waitResult bool) (models.Task, error) // Reverse a payout on the given connector (PSP). ReversePayout(ctx context.Context, reversal models.PaymentInitiationReversal, waitResult bool) (models.Task, error) @@ -513,7 +513,7 @@ func (e *engine) ForwardBankAccount(ctx context.Context, bankAccountID uuid.UUID return task, nil } -func (e *engine) CreateTransfer(ctx context.Context, piID models.PaymentInitiationID, startDelay time.Duration, attempt int, waitResult bool) (models.Task, error) { +func (e *engine) CreateTransfer(ctx context.Context, piID models.PaymentInitiationID, attempt int, waitResult bool) (models.Task, error) { ctx, span := otel.Tracer().Start(ctx, "engine.CreateTransfer") defer span.End() @@ -540,7 +540,6 @@ func (e *engine) CreateTransfer(ctx context.Context, piID models.PaymentInitiati ctx, client.StartWorkflowOptions{ ID: id, - StartDelay: startDelay, TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, @@ -632,7 +631,7 @@ func (e *engine) ReverseTransfer(ctx context.Context, reversal models.PaymentIni return task, nil } -func (e *engine) CreatePayout(ctx context.Context, piID models.PaymentInitiationID, startDelay time.Duration, attempt int, waitResult bool) (models.Task, error) { +func (e *engine) CreatePayout(ctx context.Context, piID models.PaymentInitiationID, attempt int, waitResult bool) (models.Task, error) { ctx, span := otel.Tracer().Start(ctx, "engine.CreatePayout") defer span.End() @@ -659,7 +658,6 @@ func (e *engine) CreatePayout(ctx 
context.Context, piID models.PaymentInitiation ctx, client.StartWorkflowOptions{ ID: id, - StartDelay: startDelay, TaskQueue: GetDefaultTaskQueue(e.stack), WorkflowIDReusePolicy: enums.WORKFLOW_ID_REUSE_POLICY_REJECT_DUPLICATE, WorkflowExecutionErrorWhenAlreadyStarted: false, diff --git a/internal/connectors/engine/engine_generated.go b/internal/connectors/engine/engine_generated.go index 86f1d89f8..b7ec05d1a 100644 --- a/internal/connectors/engine/engine_generated.go +++ b/internal/connectors/engine/engine_generated.go @@ -13,7 +13,6 @@ import ( context "context" json "encoding/json" reflect "reflect" - time "time" models "github.com/formancehq/payments/internal/models" uuid "github.com/google/uuid" @@ -24,7 +23,6 @@ import ( type MockEngine struct { ctrl *gomock.Controller recorder *MockEngineMockRecorder - isgomock struct{} } // MockEngineMockRecorder is the mock recorder for MockEngine. @@ -87,18 +85,18 @@ func (mr *MockEngineMockRecorder) CreateFormancePayment(ctx, payment any) *gomoc } // CreatePayout mocks base method. -func (m *MockEngine) CreatePayout(ctx context.Context, piID models.PaymentInitiationID, startDelay time.Duration, attempt int, waitResult bool) (models.Task, error) { +func (m *MockEngine) CreatePayout(ctx context.Context, piID models.PaymentInitiationID, attempt int, waitResult bool) (models.Task, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreatePayout", ctx, piID, startDelay, attempt, waitResult) + ret := m.ctrl.Call(m, "CreatePayout", ctx, piID, attempt, waitResult) ret0, _ := ret[0].(models.Task) ret1, _ := ret[1].(error) return ret0, ret1 } // CreatePayout indicates an expected call of CreatePayout. 
-func (mr *MockEngineMockRecorder) CreatePayout(ctx, piID, startDelay, attempt, waitResult any) *gomock.Call { +func (mr *MockEngineMockRecorder) CreatePayout(ctx, piID, attempt, waitResult any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreatePayout", reflect.TypeOf((*MockEngine)(nil).CreatePayout), ctx, piID, startDelay, attempt, waitResult) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreatePayout", reflect.TypeOf((*MockEngine)(nil).CreatePayout), ctx, piID, attempt, waitResult) } // CreatePool mocks base method. @@ -116,18 +114,18 @@ func (mr *MockEngineMockRecorder) CreatePool(ctx, pool any) *gomock.Call { } // CreateTransfer mocks base method. -func (m *MockEngine) CreateTransfer(ctx context.Context, piID models.PaymentInitiationID, startDelay time.Duration, attempt int, waitResult bool) (models.Task, error) { +func (m *MockEngine) CreateTransfer(ctx context.Context, piID models.PaymentInitiationID, attempt int, waitResult bool) (models.Task, error) { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "CreateTransfer", ctx, piID, startDelay, attempt, waitResult) + ret := m.ctrl.Call(m, "CreateTransfer", ctx, piID, attempt, waitResult) ret0, _ := ret[0].(models.Task) ret1, _ := ret[1].(error) return ret0, ret1 } // CreateTransfer indicates an expected call of CreateTransfer. -func (mr *MockEngineMockRecorder) CreateTransfer(ctx, piID, startDelay, attempt, waitResult any) *gomock.Call { +func (mr *MockEngineMockRecorder) CreateTransfer(ctx, piID, attempt, waitResult any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTransfer", reflect.TypeOf((*MockEngine)(nil).CreateTransfer), ctx, piID, startDelay, attempt, waitResult) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CreateTransfer", reflect.TypeOf((*MockEngine)(nil).CreateTransfer), ctx, piID, attempt, waitResult) } // DeletePool mocks base method. 
diff --git a/internal/connectors/engine/plugins/plugin_generated.go b/internal/connectors/engine/plugins/plugin_generated.go index aaf604702..e7af12f08 100644 --- a/internal/connectors/engine/plugins/plugin_generated.go +++ b/internal/connectors/engine/plugins/plugin_generated.go @@ -21,7 +21,6 @@ import ( type MockPlugins struct { ctrl *gomock.Controller recorder *MockPluginsMockRecorder - isgomock struct{} } // MockPluginsMockRecorder is the mock recorder for MockPlugins. diff --git a/internal/connectors/engine/workers.go b/internal/connectors/engine/workers.go index a9b690b5e..08a0dfb6c 100644 --- a/internal/connectors/engine/workers.go +++ b/internal/connectors/engine/workers.go @@ -112,7 +112,7 @@ func (w *WorkerPool) OnStart(ctx context.Context) error { return nil } -func (w *WorkerPool) onStartPlugin(ctx context.Context, connector models.Connector) error { +func (w *WorkerPool) onStartPlugin(_ context.Context, connector models.Connector) error { // Even if the connector is scheduled for deletion, we still need to register // the plugin to be able to handle the uninstallation. 
// It will be unregistered when the uninstallation is done in the workflow diff --git a/internal/connectors/engine/workflow/create_payout.go b/internal/connectors/engine/workflow/create_payout.go index 57faa3b69..81bfe7a7c 100644 --- a/internal/connectors/engine/workflow/create_payout.go +++ b/internal/connectors/engine/workflow/create_payout.go @@ -52,6 +52,29 @@ func (w Workflow) createPayout( return err } + now := workflow.Now(ctx) + if !pi.ScheduledAt.IsZero() && pi.ScheduledAt.After(now) { + err = w.addPIAdjustment( + ctx, + models.PaymentInitiationAdjustmentID{ + PaymentInitiationID: createPayout.PaymentInitiationID, + CreatedAt: workflow.Now(ctx), + Status: models.PAYMENT_INITIATION_ADJUSTMENT_STATUS_SCHEDULED_FOR_PROCESSING, + }, + pi.Amount, + &pi.Asset, + nil, + map[string]string{ + "scheduledAt": pi.ScheduledAt.String(), + }, + ) + if err != nil { + return err + } + + workflow.Sleep(ctx, pi.ScheduledAt.Sub(now)) + } + pspPI, err := w.getPSPPI(ctx, pi) if err != nil { return err diff --git a/internal/connectors/engine/workflow/create_payout_test.go b/internal/connectors/engine/workflow/create_payout_test.go index d15201af8..09a803492 100644 --- a/internal/connectors/engine/workflow/create_payout_test.go +++ b/internal/connectors/engine/workflow/create_payout_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "math/big" + "time" "github.com/formancehq/go-libs/v2/pointer" "github.com/formancehq/payments/internal/connectors/engine/activities" @@ -80,6 +81,86 @@ func (s *UnitTestSuite) Test_CreatePayout_WithPayment_Success() { s.NoError(s.env.GetWorkflowError()) } +func (s *UnitTestSuite) Test_CreatePayout_WithScheduledAt_WithPayment_Success() { + paymentInitiationPayout := s.paymentInitiationPayout + paymentInitiationPayout.ScheduledAt = s.env.Now().Add(1 * time.Hour) + s.env.OnActivity(activities.StoragePaymentInitiationsGetActivity, mock.Anything, s.paymentInitiationID).Once().Return(&paymentInitiationPayout, nil) + + 
s.env.OnActivity(activities.StoragePaymentInitiationsAdjustmentsStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, adj models.PaymentInitiationAdjustment) error { + s.Equal(s.paymentInitiationID, adj.ID.PaymentInitiationID) + s.Equal(models.PAYMENT_INITIATION_ADJUSTMENT_STATUS_SCHEDULED_FOR_PROCESSING, adj.Status) + s.Equal(big.NewInt(100), adj.Amount) + s.NotNil(adj.Asset) + s.Equal("USD/2", *adj.Asset) + s.Nil(adj.Error) + return nil + }) + + s.env.OnActivity(activities.StorageAccountsGetActivity, mock.Anything, *s.paymentInitiationPayout.SourceAccountID).Once().Return(&s.account, nil) + s.env.OnActivity(activities.StorageAccountsGetActivity, mock.Anything, *s.paymentInitiationPayout.DestinationAccountID).Once().Return(&s.account, nil) + s.env.OnActivity(activities.StoragePaymentInitiationsAdjustmentsStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, adj models.PaymentInitiationAdjustment) error { + s.Equal(s.paymentInitiationID, adj.ID.PaymentInitiationID) + s.Equal(models.PAYMENT_INITIATION_ADJUSTMENT_STATUS_PROCESSING, adj.Status) + s.Equal(big.NewInt(100), adj.Amount) + s.NotNil(adj.Asset) + s.Equal("USD/2", *adj.Asset) + s.Nil(adj.Error) + return nil + }) + s.env.OnActivity(activities.PluginCreatePayoutActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, req activities.CreatePayoutRequest) (*models.CreatePayoutResponse, error) { + s.Equal(s.connectorID, req.ConnectorID) + s.Equal(s.paymentInitiationID.Reference, req.Req.PaymentInitiation.Reference) + return &models.CreatePayoutResponse{ + Payment: &s.pspPayment, + }, nil + }) + s.env.OnActivity(activities.StoragePaymentsStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, payments []models.Payment) error { + s.Equal(1, len(payments)) + s.Equal(s.paymentPayoutID, payments[0].ID) + return nil + }) + s.env.OnWorkflow(RunSendEvents, mock.Anything, mock.Anything).Once().Return(func(ctx 
workflow.Context, req SendEvents) error { + s.NotNil(req.Payment) + s.Nil(req.Account) + s.Nil(req.Balance) + s.Nil(req.BankAccount) + s.Nil(req.ConnectorReset) + s.Nil(req.PoolsCreation) + s.Nil(req.PoolsDeletion) + return nil + }) + s.env.OnActivity(activities.StoragePaymentInitiationsRelatedPaymentsStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, relatedPayment activities.RelatedPayment) error { + s.Equal(s.paymentInitiationID, relatedPayment.PiID) + s.Equal(s.paymentPayoutID, relatedPayment.PID) + return nil + }) + s.env.OnActivity(activities.StoragePaymentInitiationsAdjustmentsStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, adj models.PaymentInitiationAdjustment) error { + s.Equal(s.paymentInitiationID, adj.ID.PaymentInitiationID) + s.Equal(models.PAYMENT_INITIATION_ADJUSTMENT_STATUS_PROCESSED, adj.Status) + s.Equal(big.NewInt(100), adj.Amount) + s.NotNil(adj.Asset) + s.Equal("USD/2", *adj.Asset) + s.Nil(adj.Error) + return nil + }) + s.env.OnActivity(activities.StorageTasksStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, task models.Task) error { + s.Equal(models.TASK_STATUS_SUCCEEDED, task.Status) + return nil + }) + + s.env.ExecuteWorkflow(RunCreatePayout, CreatePayout{ + TaskID: models.TaskID{ + Reference: "test", + ConnectorID: s.connectorID, + }, + ConnectorID: s.connectorID, + PaymentInitiationID: s.paymentInitiationID, + }) + + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowError()) +} + func (s *UnitTestSuite) Test_CreatePayout_WithPollingPayment_Success() { s.env.OnActivity(activities.StoragePaymentInitiationsGetActivity, mock.Anything, s.paymentInitiationID).Once().Return(&s.paymentInitiationPayout, nil) s.env.OnActivity(activities.StorageAccountsGetActivity, mock.Anything, *s.paymentInitiationPayout.SourceAccountID).Once().Return(&s.account, nil) diff --git a/internal/connectors/engine/workflow/create_transfer.go 
b/internal/connectors/engine/workflow/create_transfer.go index 6550c0080..5e7b5308b 100644 --- a/internal/connectors/engine/workflow/create_transfer.go +++ b/internal/connectors/engine/workflow/create_transfer.go @@ -52,6 +52,29 @@ func (w Workflow) createTransfer( return err } + now := workflow.Now(ctx) + if !pi.ScheduledAt.IsZero() && pi.ScheduledAt.After(now) { + err = w.addPIAdjustment( + ctx, + models.PaymentInitiationAdjustmentID{ + PaymentInitiationID: createTransfer.PaymentInitiationID, + CreatedAt: workflow.Now(ctx), + Status: models.PAYMENT_INITIATION_ADJUSTMENT_STATUS_SCHEDULED_FOR_PROCESSING, + }, + pi.Amount, + &pi.Asset, + nil, + map[string]string{ + "scheduledAt": pi.ScheduledAt.String(), + }, + ) + if err != nil { + return err + } + + workflow.Sleep(ctx, pi.ScheduledAt.Sub(now)) + } + pspPI, err := w.getPSPPI(ctx, pi) if err != nil { return err diff --git a/internal/connectors/engine/workflow/create_transfer_test.go b/internal/connectors/engine/workflow/create_transfer_test.go index ad42863c4..1a2407078 100644 --- a/internal/connectors/engine/workflow/create_transfer_test.go +++ b/internal/connectors/engine/workflow/create_transfer_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "math/big" + "time" "github.com/formancehq/go-libs/v2/pointer" "github.com/formancehq/payments/internal/connectors/engine/activities" @@ -80,6 +81,86 @@ func (s *UnitTestSuite) Test_CreateTransfer_WithPayment_Success() { s.NoError(s.env.GetWorkflowError()) } +func (s *UnitTestSuite) Test_CreateTransfer_WithScheduledAt_WithPayment_Success() { + paymentInitiationTransfer := s.paymentInitiationTransfer + paymentInitiationTransfer.ScheduledAt = s.env.Now().Add(1 * time.Hour) + s.env.OnActivity(activities.StoragePaymentInitiationsGetActivity, mock.Anything, s.paymentInitiationID).Once().Return(&paymentInitiationTransfer, nil) + + s.env.OnActivity(activities.StoragePaymentInitiationsAdjustmentsStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, 
adj models.PaymentInitiationAdjustment) error { + s.Equal(s.paymentInitiationID, adj.ID.PaymentInitiationID) + s.Equal(models.PAYMENT_INITIATION_ADJUSTMENT_STATUS_SCHEDULED_FOR_PROCESSING, adj.Status) + s.Equal(big.NewInt(100), adj.Amount) + s.NotNil(adj.Asset) + s.Equal("USD/2", *adj.Asset) + s.Nil(adj.Error) + return nil + }) + + s.env.OnActivity(activities.StorageAccountsGetActivity, mock.Anything, *s.paymentInitiationTransfer.SourceAccountID).Once().Return(&s.account, nil) + s.env.OnActivity(activities.StorageAccountsGetActivity, mock.Anything, *s.paymentInitiationTransfer.DestinationAccountID).Once().Return(&s.account, nil) + s.env.OnActivity(activities.StoragePaymentInitiationsAdjustmentsStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, adj models.PaymentInitiationAdjustment) error { + s.Equal(s.paymentInitiationID, adj.ID.PaymentInitiationID) + s.Equal(models.PAYMENT_INITIATION_ADJUSTMENT_STATUS_PROCESSING, adj.Status) + s.Equal(big.NewInt(100), adj.Amount) + s.NotNil(adj.Asset) + s.Equal("USD/2", *adj.Asset) + s.Nil(adj.Error) + return nil + }) + s.env.OnActivity(activities.PluginCreateTransferActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, req activities.CreateTransferRequest) (*models.CreateTransferResponse, error) { + s.Equal(s.connectorID, req.ConnectorID) + s.Equal(s.paymentInitiationID.Reference, req.Req.PaymentInitiation.Reference) + return &models.CreateTransferResponse{ + Payment: &s.pspPayment, + }, nil + }) + s.env.OnActivity(activities.StoragePaymentsStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, payments []models.Payment) error { + s.Equal(1, len(payments)) + s.Equal(s.paymentPayoutID, payments[0].ID) + return nil + }) + s.env.OnWorkflow(RunSendEvents, mock.Anything, mock.Anything).Once().Return(func(ctx workflow.Context, req SendEvents) error { + s.NotNil(req.Payment) + s.Nil(req.Account) + s.Nil(req.Balance) + s.Nil(req.BankAccount) + 
s.Nil(req.ConnectorReset) + s.Nil(req.PoolsCreation) + s.Nil(req.PoolsDeletion) + return nil + }) + s.env.OnActivity(activities.StoragePaymentInitiationsRelatedPaymentsStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, relatedPayment activities.RelatedPayment) error { + s.Equal(s.paymentInitiationID, relatedPayment.PiID) + s.Equal(s.paymentPayoutID, relatedPayment.PID) + return nil + }) + s.env.OnActivity(activities.StoragePaymentInitiationsAdjustmentsStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, adj models.PaymentInitiationAdjustment) error { + s.Equal(s.paymentInitiationID, adj.ID.PaymentInitiationID) + s.Equal(models.PAYMENT_INITIATION_ADJUSTMENT_STATUS_PROCESSED, adj.Status) + s.Equal(big.NewInt(100), adj.Amount) + s.NotNil(adj.Asset) + s.Equal("USD/2", *adj.Asset) + s.Nil(adj.Error) + return nil + }) + s.env.OnActivity(activities.StorageTasksStoreActivity, mock.Anything, mock.Anything).Once().Return(func(ctx context.Context, task models.Task) error { + s.Equal(models.TASK_STATUS_SUCCEEDED, task.Status) + return nil + }) + + s.env.ExecuteWorkflow(RunCreateTransfer, CreateTransfer{ + TaskID: models.TaskID{ + Reference: "test", + ConnectorID: s.connectorID, + }, + ConnectorID: s.connectorID, + PaymentInitiationID: s.paymentInitiationID, + }) + + s.True(s.env.IsWorkflowCompleted()) + s.NoError(s.env.GetWorkflowError()) +} + func (s *UnitTestSuite) Test_CreateTransfer_WithPollingPayment_Success() { s.env.OnActivity(activities.StoragePaymentInitiationsGetActivity, mock.Anything, s.paymentInitiationID).Once().Return(&s.paymentInitiationTransfer, nil) s.env.OnActivity(activities.StorageAccountsGetActivity, mock.Anything, *s.paymentInitiationTransfer.SourceAccountID).Once().Return(&s.account, nil) diff --git a/internal/models/payment_initiation_adjusments_status.go b/internal/models/payment_initiation_adjusments_status.go index a707714ae..eea2b8868 100644 --- 
a/internal/models/payment_initiation_adjusments_status.go +++ b/internal/models/payment_initiation_adjusments_status.go @@ -19,6 +19,7 @@ const ( PAYMENT_INITIATION_ADJUSTMENT_STATUS_REVERSE_PROCESSING PAYMENT_INITIATION_ADJUSTMENT_STATUS_REVERSE_FAILED PAYMENT_INITIATION_ADJUSTMENT_STATUS_REVERSED + PAYMENT_INITIATION_ADJUSTMENT_STATUS_SCHEDULED_FOR_PROCESSING ) func (s PaymentInitiationAdjustmentStatus) String() string { @@ -39,6 +40,8 @@ func (s PaymentInitiationAdjustmentStatus) String() string { return "REVERSE_FAILED" case PAYMENT_INITIATION_ADJUSTMENT_STATUS_REVERSED: return "REVERSED" + case PAYMENT_INITIATION_ADJUSTMENT_STATUS_SCHEDULED_FOR_PROCESSING: + return "SCHEDULED_FOR_PROCESSING" case PAYMENT_INITIATION_ADJUSTMENT_STATUS_UNKNOWN: return "UNKNOWN" } @@ -63,6 +66,8 @@ func PaymentInitiationAdjustmentStatusFromString(s string) (PaymentInitiationAdj return PAYMENT_INITIATION_ADJUSTMENT_STATUS_REVERSE_FAILED, nil case "REVERSED": return PAYMENT_INITIATION_ADJUSTMENT_STATUS_REVERSED, nil + case "SCHEDULED_FOR_PROCESSING": + return PAYMENT_INITIATION_ADJUSTMENT_STATUS_SCHEDULED_FOR_PROCESSING, nil case "UNKNOWN": return PAYMENT_INITIATION_ADJUSTMENT_STATUS_UNKNOWN, nil } diff --git a/openapi.yaml b/openapi.yaml index 51ead1600..1900535f9 100644 --- a/openapi.yaml +++ b/openapi.yaml @@ -2779,7 +2779,7 @@ components: type: string format: date-time status: - $ref: '#/components/schemas/TransferInitiationStatus' + $ref: '#/components/schemas/PaymentStatus' error: type: string BankAccount: @@ -4559,6 +4559,7 @@ components: enum: - UNKNOWN - WAITING_FOR_VALIDATION + - SCHEDULED_FOR_PROCESSING - PROCESSING - PROCESSED - FAILED diff --git a/openapi/v1-2/v1-2.yaml b/openapi/v1-2/v1-2.yaml index c2bfc586b..9f7ad5469 100644 --- a/openapi/v1-2/v1-2.yaml +++ b/openapi/v1-2/v1-2.yaml @@ -1898,7 +1898,7 @@ components: type: string format: date-time status: - $ref: '#/components/schemas/TransferInitiationStatus' + $ref: '#/components/schemas/PaymentStatus' error: type: 
string BankAccount: diff --git a/openapi/v3/v3-schemas.yaml b/openapi/v3/v3-schemas.yaml index 451ff5e8a..d11efcc69 100644 --- a/openapi/v3/v3-schemas.yaml +++ b/openapi/v3/v3-schemas.yaml @@ -1131,6 +1131,7 @@ components: enum: - UNKNOWN - WAITING_FOR_VALIDATION + - SCHEDULED_FOR_PROCESSING - PROCESSING - PROCESSED - FAILED From b3e53ff70d09d1dbc53e1157894a9a8f25d66fd0 Mon Sep 17 00:00:00 2001 From: Paul Nicolas Date: Fri, 21 Mar 2025 15:06:37 +0100 Subject: [PATCH 2/2] add comments --- internal/connectors/engine/workflow/create_payout.go | 3 +++ internal/connectors/engine/workflow/create_transfer.go | 3 +++ 2 files changed, 6 insertions(+) diff --git a/internal/connectors/engine/workflow/create_payout.go b/internal/connectors/engine/workflow/create_payout.go index 81bfe7a7c..c153e7fc2 100644 --- a/internal/connectors/engine/workflow/create_payout.go +++ b/internal/connectors/engine/workflow/create_payout.go @@ -52,6 +52,9 @@ func (w Workflow) createPayout( return err } + // If the payout is scheduled in the future, we first need to add a + // SCHEDULED_FOR_PROCESSING adjustment to the payment initiation, and then + // sleep until the scheduled time. now := workflow.Now(ctx) if !pi.ScheduledAt.IsZero() && pi.ScheduledAt.After(now) { err = w.addPIAdjustment( diff --git a/internal/connectors/engine/workflow/create_transfer.go b/internal/connectors/engine/workflow/create_transfer.go index 5e7b5308b..d59ea5cb2 100644 --- a/internal/connectors/engine/workflow/create_transfer.go +++ b/internal/connectors/engine/workflow/create_transfer.go @@ -52,6 +52,9 @@ func (w Workflow) createTransfer( return err } + // If the transfer is scheduled in the future, we first need to add a + // SCHEDULED_FOR_PROCESSING adjustment to the payment initiation, and then + // sleep until the scheduled time. now := workflow.Now(ctx) if !pi.ScheduledAt.IsZero() && pi.ScheduledAt.After(now) { err = w.addPIAdjustment(