diff --git a/go.mod b/go.mod index 0ce9f6108..239476dc3 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( github.com/bombsimon/logrusr/v3 v3.0.0 github.com/davecgh/go-spew v1.1.1 github.com/gibson042/canonicaljson-go v1.0.3 + github.com/google/uuid v1.3.0 github.com/gorilla/mux v1.8.0 github.com/iancoleman/strcase v0.2.0 github.com/numary/go-libs v1.0.1 @@ -26,7 +27,6 @@ require ( github.com/xdg-go/scram v1.1.1 go.mongodb.org/mongo-driver v1.10.1 go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0 - go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.34.0 go.opentelemetry.io/otel v1.9.0 go.uber.org/dig v1.15.0 go.uber.org/fx v1.18.1 @@ -40,7 +40,6 @@ require ( github.com/ThreeDotsLabs/watermill-http v1.1.4 // indirect github.com/ThreeDotsLabs/watermill-kafka/v2 v2.2.2 // indirect github.com/ajg/form v1.5.1 // indirect - github.com/andybalholm/brotli v1.0.4 // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect @@ -58,13 +57,11 @@ require ( github.com/go-chi/render v1.0.2 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect - github.com/gofiber/fiber/v2 v2.38.1 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang/glog v1.0.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/google/go-cmp v0.5.8 // indirect - github.com/google/uuid v1.3.0 // indirect github.com/gotestyourself/gotestyourself v2.2.0+incompatible // indirect github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect @@ -97,9 +94,6 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.4.1 // indirect github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.15 // indirect - github.com/valyala/bytebufferpool v1.0.0 // indirect - github.com/valyala/fasthttp v1.40.0 // indirect - github.com/valyala/tcplisten v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.3 // indirect github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect diff --git a/go.sum b/go.sum index fcf10074d..47658b676 100644 --- a/go.sum +++ b/go.sum @@ -64,8 +64,6 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= -github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= -github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= @@ -176,8 +174,6 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= -github.com/gofiber/fiber/v2 v2.38.1 h1:GEQ/Yt3Wsf2a30iTqtLXlBYJZso0JXPovt/tmj5H9jU= -github.com/gofiber/fiber/v2 v2.38.1/go.mod h1:t0NlbaXzuGH7I+7M4paE848fNWInZ7mfxI/Er1fTth8= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= @@ -314,7 +310,6 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -478,12 +473,6 @@ github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.15 h1:5eYO+onNB1mbdc3+uw github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.15/go.mod h1:6fGFPZDTcvHLxgWTFvf8hHWQrRO1tMXAFlxlqE+c650= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= -github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= -github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= -github.com/valyala/fasthttp v1.40.0 h1:CRq/00MfruPGFLTQKY8b+8SfdK60TxNztjRMnH0t1Yc= -github.com/valyala/fasthttp v1.40.0/go.mod h1:t/G+3rLek+CyY9bnIE+YlMRddxVAAGjhxndDB4i4C0I= -github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= -github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= @@ -514,8 +503,6 @@ go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.34.0/go.mod h1:8cfNbNK5aJIRQnqOFGEALF17iZPDym2mmYP+ECIxi8M= go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0 h1:OkXMRbgldT4yZR7RwB4SFYTjYJGTXwPQVX69pYtTnc4= go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0/go.mod h1:zMu+r6aEorSQi8Ad0Y1fNrznm+VM8F10D2WlZp3HeFw= -go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.34.0 h1:RBgtLcpCco6JtjXnoY3MstbUjDVWH7ECYn4WmzmlOBI= -go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.34.0/go.mod h1:GIWhiaNpCoQJoD/R1q5GXySIwQIBQ+5pxMbuWhq3X94= go.opentelemetry.io/otel v1.9.0 h1:8WZNQFIB2a71LnANS9JeyidJKKGOOremcUtb/OtHISw= go.opentelemetry.io/otel v1.9.0/go.mod h1:np4EoPGzoPs3O67xUVNoPPcmSvsfOxNlNA4F4AC+0Eo= go.opentelemetry.io/otel/exporters/jaeger v1.9.0 h1:gAEgEVGDWwFjcis9jJTOJqZNxDzoZfR12WNIxr7g9Ww= @@ -561,7 +548,6 @@ golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220824171710-5757bc0c5503 h1:vJ2V3lFLg+bBhgroYuRfyN583UzVveQmIXjc8T/y3to= @@ -637,7 +623,6 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.0.0-20220809184613-07c6da5e1ced/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c h1:JVAXQ10yGGVbSyoer5VILysz6YKjdNT2bsvlayjqhes= @@ -721,9 +706,7 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/pkg/connectors/dummypay/task_ingest.go b/internal/pkg/connectors/dummypay/task_ingest.go index 2755e95f7..93cda97ec 100644 --- a/internal/pkg/connectors/dummypay/task_ingest.go +++ b/internal/pkg/connectors/dummypay/task_ingest.go @@ -30,7 +30,7 @@ func taskIngest(config Config, descriptor TaskDescriptor, fs fs) task.Task { } // Ingest the payment into the system. - err = ingester.Ingest(ctx, ingestionPayload, struct{}{}) + err = ingester.IngestPayments(ctx, ingestionPayload, struct{}{}) if err != nil { return fmt.Errorf("failed to ingest file '%s': %w", descriptor.FileName, err) } @@ -39,7 +39,7 @@ func taskIngest(config Config, descriptor TaskDescriptor, fs fs) task.Task { } } -func parseIngestionPayload(config Config, descriptor TaskDescriptor, fs fs) (ingestion.Batch, error) { +func parseIngestionPayload(config Config, descriptor TaskDescriptor, fs fs) (ingestion.PaymentBatch, error) { // Open the file. file, err := fs.Open(filepath.Join(config.Directory, descriptor.FileName)) if err != nil { @@ -56,7 +56,7 @@ func parseIngestionPayload(config Config, descriptor TaskDescriptor, fs fs) (ing return nil, fmt.Errorf("failed to decode file '%s': %w", descriptor.FileName, err) } - ingestionPayload := ingestion.Batch{ingestion.BatchElement{ + ingestionPayload := ingestion.PaymentBatch{ingestion.PaymentBatchElement{ Referenced: payments.Referenced{ Reference: paymentElement.Reference, Type: paymentElement.Type, diff --git a/internal/pkg/connectors/modulr/task_fetch_transactions.go b/internal/pkg/connectors/modulr/task_fetch_transactions.go index c87d81ab2..2aca714cd 100644 --- a/internal/pkg/connectors/modulr/task_fetch_transactions.go +++ b/internal/pkg/connectors/modulr/task_fetch_transactions.go @@ -25,12 +25,12 @@ func taskFetchTransactions(logger sharedlogging.Logger, client *client.Client, a return err } - batch := ingestion.Batch{} + batch := ingestion.PaymentBatch{} for _, transaction := range transactions { logger.Info(transaction) - batchElement := ingestion.BatchElement{ + batchElement := ingestion.PaymentBatchElement{ Referenced: payments.Referenced{ Reference: transaction.ID, Type: matchTransactionType(transaction.Type), @@ -48,7 +48,7 @@ func taskFetchTransactions(logger sharedlogging.Logger, client *client.Client, a batch = append(batch, batchElement) } - return ingester.Ingest(ctx, batch, struct{}{}) + return ingester.IngestPayments(ctx, batch, struct{}{}) } } diff --git a/internal/pkg/connectors/stripe/task_connected_account.go b/internal/pkg/connectors/stripe/task_connected_account.go index 59b761d6f..b552ee824 100644 --- a/internal/pkg/connectors/stripe/task_connected_account.go +++ b/internal/pkg/connectors/stripe/task_connected_account.go @@ -15,7 +15,7 @@ import ( func ingestBatch(ctx context.Context, logger sharedlogging.Logger, ingester ingestion.Ingester, bts []*stripe.BalanceTransaction, commitState TimelineState, tail bool, ) error { - batch := ingestion.Batch{} + batch := ingestion.PaymentBatch{} for _, bt := range bts { batchElement, handled := CreateBatchElement(bt, !tail) @@ -37,7 +37,7 @@ func ingestBatch(ctx context.Context, logger sharedlogging.Logger, ingester inge "state": commitState, }).Debugf("updating state") - err := ingester.Ingest(ctx, batch, commitState) + err := ingester.IngestPayments(ctx, batch, commitState) if err != nil { return err } diff --git a/internal/pkg/connectors/stripe/translate.go b/internal/pkg/connectors/stripe/translate.go index d859aa8ca..b06b3607e 100644 --- a/internal/pkg/connectors/stripe/translate.go +++ b/internal/pkg/connectors/stripe/translate.go @@ -127,7 +127,7 @@ func currencies() map[string]currency { } } -func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward bool) (ingestion.BatchElement, bool) { +func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward bool) (ingestion.PaymentBatchElement, bool) { var ( reference payments.Referenced paymentData *payments.Data @@ -145,11 +145,11 @@ func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward b }() if balanceTransaction.Source == nil { - return ingestion.BatchElement{}, false + return ingestion.PaymentBatchElement{}, false } if balanceTransaction.Source.Payout == nil { - return ingestion.BatchElement{}, false + return ingestion.PaymentBatchElement{}, false } formatAsset := func(cur stripe.Currency) string { @@ -290,12 +290,12 @@ func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward b Raw: balanceTransaction, } case "stripe_fee", "network_cost": - return ingestion.BatchElement{}, true + return ingestion.PaymentBatchElement{}, true default: - return ingestion.BatchElement{}, false + return ingestion.PaymentBatchElement{}, false } - return ingestion.BatchElement{ + return ingestion.PaymentBatchElement{ Referenced: reference, Payment: paymentData, Adjustment: adjustment, diff --git a/internal/pkg/connectors/wise/task_fetch_transfers.go b/internal/pkg/connectors/wise/task_fetch_transfers.go index cab773051..644e461ae 100644 --- a/internal/pkg/connectors/wise/task_fetch_transfers.go +++ b/internal/pkg/connectors/wise/task_fetch_transfers.go @@ -23,12 +23,15 @@ func taskFetchTransfers(logger sharedlogging.Logger, client *client, profileID u return err } - batch := ingestion.Batch{} + var ( + accountBatch ingestion.AccountBatch + paymentBatch ingestion.PaymentBatch + ) for _, transfer := range transfers { logger.Info(transfer) - batchElement := ingestion.BatchElement{ + batchElement := ingestion.PaymentBatchElement{ Referenced: payments.Referenced{ Reference: fmt.Sprintf("%d", transfer.ID), Type: payments.TypeTransfer, @@ -42,10 +45,41 @@ func taskFetchTransfers(logger sharedlogging.Logger, client *client, profileID u }, } - batch = append(batch, batchElement) + if transfer.SourceAccount != 0 { + ref := fmt.Sprintf("%d", transfer.SourceAccount) + + accountBatch = append(accountBatch, + ingestion.AccountBatchElement{ + Reference: ref, + Type: payments.AccountTypeSource, + }, + ) + + batchElement.Referenced.Accounts = append(batchElement.Referenced.Accounts, ref) + } + + if transfer.TargetAccount != 0 { + ref := fmt.Sprintf("%d", transfer.TargetAccount) + + accountBatch = append(accountBatch, + ingestion.AccountBatchElement{ + Reference: ref, + Type: payments.AccountTypeTarget, + }, + ) + + batchElement.Referenced.Accounts = append(batchElement.Referenced.Accounts, ref) + } + + paymentBatch = append(paymentBatch, batchElement) + } + + err = ingester.IngestAccounts(ctx, accountBatch) + if err != nil { + return err } - return ingester.Ingest(ctx, batch, struct{}{}) + return ingester.IngestPayments(ctx, paymentBatch, struct{}{}) } } diff --git a/internal/pkg/ingestion/accounts.go b/internal/pkg/ingestion/accounts.go new file mode 100644 index 000000000..6eca8373f --- /dev/null +++ b/internal/pkg/ingestion/accounts.go @@ -0,0 +1,135 @@ +package ingestion + +import ( + "context" + "encoding/json" + "fmt" + "time" + + "github.com/numary/payments/internal/pkg/payments" + + "github.com/numary/go-libs/sharedlogging" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +type AccountBatchElement struct { + Reference string + Provider string + Type payments.AccountType +} + +type AccountBatch []AccountBatchElement + +type AccountIngesterFn func(ctx context.Context, batch AccountBatch, commitState any) error + +func (fn AccountIngesterFn) IngestAccounts(ctx context.Context, batch AccountBatch, commitState any) error { + return fn(ctx, batch, commitState) +} + +func (i *DefaultIngester) processAccountBatch(ctx context.Context, batch AccountBatch) ([]payments.Account, error) { + allAccounts := make([]payments.Account, 0) + + for _, elem := range batch { + logger := i.logger.WithFields(map[string]any{ + "id": elem.Reference, + }) + + update := bson.M{ + "$set": payments.Account{ + Reference: elem.Reference, + Provider: i.provider, + Type: elem.Type, + }, + } + + data, err := json.Marshal(update) + if err != nil { + panic(err) + } + + logger.WithFields(map[string]interface{}{ + "update": string(data), + }).Debugf("Update account") + + ret := i.db.Collection(payments.AccountsCollection).FindOneAndUpdate( + ctx, + payments.Account{ + Reference: elem.Reference, + Provider: i.provider, + Type: elem.Type, + }, + update, + options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After), + ) + if ret.Err() != nil { + logger.Errorf("Error updating account: %s", ret.Err()) + + return nil, fmt.Errorf("error updating payment: %w", ret.Err()) + } + + account := payments.Account{} + + err = ret.Decode(&account) + if err != nil { + return nil, err + } + + allAccounts = append(allAccounts, account) + } + + return allAccounts, nil +} + +func (i *DefaultIngester) IngestAccounts(ctx context.Context, batch AccountBatch) error { + startingAt := time.Now() + + i.logger.WithFields(map[string]interface{}{ + "size": len(batch), + "startingAt": startingAt, + }).Debugf("Ingest accounts batch") + + err := i.db.Client().UseSession(ctx, func(ctx mongo.SessionContext) error { + var allAccounts []payments.Account + + _, err := ctx.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) { + var err error + + allAccounts, err = i.processAccountBatch(ctx, batch) + if err != nil { + return nil, err + } + + i.logger.Debugf("Update state") + + return nil, err + }) + if err != nil { + return err + } + + if i.publisher != nil { + for _, e := range allAccounts { + i.publish(ctx, TopicPayments, NewEventSavedAccount(e)) + } + } + + return err + }) + if err != nil { + sharedlogging.GetLogger(ctx).Errorf("Error ingesting accounts batch: %s", err) + + return err + } + + endedAt := time.Now() + + i.logger.WithFields(map[string]interface{}{ + "size": len(batch), + "endedAt": endedAt, + "latency": endedAt.Sub(startingAt).String(), + }).Debugf("Accounts batch ingested") + + return nil +} diff --git a/internal/pkg/ingestion/ingester.go b/internal/pkg/ingestion/ingester.go index 1c1c7b2aa..66e28ed90 100644 --- a/internal/pkg/ingestion/ingester.go +++ b/internal/pkg/ingestion/ingester.go @@ -2,43 +2,17 @@ package ingestion import ( "context" - "encoding/json" - "errors" - "fmt" - "time" "github.com/numary/payments/internal/pkg/payments" "github.com/numary/go-libs/sharedlogging" "github.com/numary/go-libs/sharedpublish" - "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" ) -type BatchElement struct { - Referenced payments.Referenced - Payment *payments.Data - Adjustment *payments.Adjustment - Metadata payments.Metadata - Forward bool -} - -type Batch []BatchElement - type Ingester interface { - Ingest(ctx context.Context, batch Batch, commitState any) error -} -type IngesterFn func(ctx context.Context, batch Batch, commitState any) error - -func (fn IngesterFn) Ingest(ctx context.Context, batch Batch, commitState any) error { - return fn(ctx, batch, commitState) -} - -func NoOpIngester() IngesterFn { - return IngesterFn(func(ctx context.Context, batch Batch, commitState any) error { - return nil - }) + IngestPayments(ctx context.Context, batch PaymentBatch, commitState any) error + IngestAccounts(ctx context.Context, batch AccountBatch) error } type DefaultIngester struct { @@ -49,241 +23,6 @@ type DefaultIngester struct { publisher sharedpublish.Publisher } -type referenced payments.Referenced - -func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]payments.Payment, error) { - allPayments := make([]payments.Payment, 0) - - for _, elem := range batch { - logger := i.logger.WithFields(map[string]any{ - "id": referenced(elem.Referenced), - }) - - var update bson.M - - if elem.Adjustment == nil && elem.Payment == nil { - return nil, errors.New("either adjustment or payment must be provided") - } - - var metadataChanges payments.MetadataChanges - - if elem.Payment != nil { - ret := i.db.Collection(payments.Collection).FindOne( - ctx, - payments.Identifier{ - Referenced: elem.Referenced, - Provider: i.provider, - }) - if ret.Err() != nil && !errors.Is(ret.Err(), mongo.ErrNoDocuments) { - logger.Errorf("Error retrieving payment: %s", ret.Err()) - - return nil, fmt.Errorf("error retrieving payment: %w", ret.Err()) - } - - if ret != nil && ret.Err() == nil { - payment := payments.Payment{} - - err := ret.Decode(&payment) - if err != nil { - return nil, err - } - - metadataChanges = payment.MergeMetadata(elem.Metadata) - - elem.Metadata = metadataChanges.After - } - } - - switch { - case elem.Forward && elem.Adjustment != nil: - update = bson.M{ - "$push": bson.M{ - "adjustments": bson.M{ - "$each": []any{elem.Adjustment}, - "$position": 0, - }, - }, - "$set": bson.M{ - "status": elem.Adjustment.Status, - "raw": elem.Adjustment.Raw, - "date": elem.Adjustment.Date, - }, - } - case elem.Forward && elem.Payment != nil: - update = bson.M{ - "$set": payments.Payment{ - Identifier: payments.Identifier{ - Referenced: elem.Referenced, - Provider: i.provider, - }, - Data: *elem.Payment, - Adjustments: []payments.Adjustment{ - { - Status: elem.Payment.Status, - Amount: elem.Payment.InitialAmount, - Date: elem.Payment.CreatedAt, - Raw: elem.Payment.Raw, - }, - }, - Metadata: elem.Metadata, - }, - } - case !elem.Forward && elem.Adjustment != nil: - update = bson.M{ - "$push": bson.M{ - "adjustments": bson.M{ - "$each": []any{elem.Adjustment}, - }, - }, - "$setOnInsert": bson.M{ - "status": elem.Adjustment.Status, - }, - } - case !elem.Forward && elem.Payment != nil: - update = bson.M{ - "$push": bson.M{ - "adjustments": bson.M{ - "$each": []any{payments.Adjustment{ - Status: elem.Payment.Status, - Amount: elem.Payment.InitialAmount, - Date: elem.Payment.CreatedAt, - Raw: elem.Payment.Raw, - }}, - }, - }, - "$set": bson.M{ - "raw": elem.Payment.Raw, - "createdAt": elem.Payment.CreatedAt, - "scheme": elem.Payment.Scheme, - "initialAmount": elem.Payment.InitialAmount, - "asset": elem.Payment.Asset, - }, - "$setOnInsert": bson.M{ - "status": elem.Payment.Status, - }, - } - } - - data, err := json.Marshal(update) - if err != nil { - panic(err) - } - - logger.WithFields(map[string]interface{}{ - "update": string(data), - }).Debugf("Update payment") - - ret := i.db.Collection(payments.Collection).FindOneAndUpdate( - ctx, - payments.Identifier{ - Referenced: elem.Referenced, - Provider: i.provider, - }, - update, - options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After), - ) - if ret.Err() != nil { - logger.Errorf("Error updating payment: %s", ret.Err()) - - return nil, fmt.Errorf("error updating payment: %w", ret.Err()) - } - - payment := payments.Payment{} - - err = ret.Decode(&payment) - if err != nil { - return nil, err - } - - if metadataChanges.HasChanged() { - logger.WithFields(map[string]interface{}{ - "metadata": payment.Metadata, - }).Debugf("Metadata changed") - - _, err = i.db.Collection(payments.MetadataChangelogCollection).InsertOne(ctx, metadataChanges) - if err != nil { - return nil, err - } - } - - allPayments = append(allPayments, payment) - } - - return allPayments, nil -} - -func Filter[T any](objects []T, compareFn func(t T) bool) []T { - ret := make([]T, 0) - - for _, o := range objects { - if compareFn(o) { - ret = append(ret, o) - } - } - - return ret -} - -func (i *DefaultIngester) Ingest(ctx context.Context, batch Batch, commitState any) error { - startingAt := time.Now() - - i.logger.WithFields(map[string]interface{}{ - "size": len(batch), - "startingAt": startingAt, - }).Debugf("Ingest batch") - - err := i.db.Client().UseSession(ctx, func(ctx mongo.SessionContext) error { - var allPayments []payments.Payment - _, err := ctx.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) { - var err error - - allPayments, err = i.processBatch(ctx, batch) - if err != nil { - return nil, err - } - - i.logger.Debugf("Update state") - - _, err = i.db.Collection(payments.TasksCollection).UpdateOne(ctx, map[string]any{ - "provider": i.provider, - "descriptor": i.descriptor, - }, map[string]any{ - "$set": map[string]any{ - "state": commitState, - }, - }, options.Update().SetUpsert(true)) - - return nil, err - }) - allPayments = Filter(allPayments, payments.Payment.HasInitialValue) - - if i.publisher != nil { - for _, e := range allPayments { - i.publish(ctx, TopicPayments, - NewEventSavedPayment( - e.Computed())) - } - } - - return err - }) - if err != nil { - sharedlogging.GetLogger(ctx).Errorf("Error ingesting batch: %s", err) - - return err - } - - endedAt := time.Now() - - i.logger.WithFields(map[string]interface{}{ - "size": len(batch), - "endedAt": endedAt, - "latency": endedAt.Sub(startingAt).String(), - }).Debugf("Batch ingested") - - return nil -} - func NewDefaultIngester( provider string, descriptor payments.TaskDescriptor, diff --git a/internal/pkg/ingestion/ingester_test.go b/internal/pkg/ingestion/ingester_test.go index 9c8b94527..12e6fc730 100644 --- a/internal/pkg/ingestion/ingester_test.go +++ b/internal/pkg/ingestion/ingester_test.go @@ -42,7 +42,7 @@ func TestIngester(t *testing.T) { mtest.CreateSuccessResponse(), // Commit transaction ) - err := ingester.Ingest(context.Background(), Batch{ + err := ingester.IngestPayments(context.Background(), PaymentBatch{ { Referenced: payments.Referenced{ Reference: "p1", diff --git a/internal/pkg/ingestion/message.go b/internal/pkg/ingestion/message.go index 7dc06359d..8309096f3 100644 --- a/internal/pkg/ingestion/message.go +++ b/internal/pkg/ingestion/message.go @@ -16,6 +16,7 @@ const ( EventApp = "payments" EventTypeSavedPayment = "SAVED_PAYMENT" + EventTypeSavedAccount = "SAVED_ACCOUNT" ) type EventMessage struct { @@ -36,6 +37,16 @@ func NewEventSavedPayment(payment payments.SavedPayment) EventMessage { } } +func NewEventSavedAccount(account payments.Account) EventMessage { + return EventMessage{ + Date: time.Now().UTC(), + App: EventApp, + Version: EventVersion, + Type: EventTypeSavedAccount, + Payload: account, + } +} + func (i *DefaultIngester) publish(ctx context.Context, topic string, ev EventMessage) { if err := i.publisher.Publish(ctx, topic, ev); err != nil { sharedlogging.GetLogger(ctx).Errorf("Publishing message: %s", err) diff --git a/internal/pkg/ingestion/payments.go b/internal/pkg/ingestion/payments.go new file mode 100644 index 000000000..e44381d29 --- /dev/null +++ b/internal/pkg/ingestion/payments.go @@ -0,0 +1,265 @@ +package ingestion + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + + "github.com/numary/payments/internal/pkg/payments" + + "github.com/numary/go-libs/sharedlogging" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" +) + +type PaymentBatchElement struct { + Referenced payments.Referenced + Payment *payments.Data + Adjustment *payments.Adjustment + Metadata payments.Metadata + Forward bool +} + +type PaymentBatch []PaymentBatchElement + +type IngesterFn func(ctx context.Context, batch PaymentBatch, commitState any) error + +func (fn IngesterFn) IngestPayments(ctx context.Context, batch PaymentBatch, commitState any) error { + return fn(ctx, batch, commitState) +} + +type referenced payments.Referenced + +func (i *DefaultIngester) processPaymentBatch(ctx context.Context, batch PaymentBatch) ([]payments.Payment, error) { + allPayments := make([]payments.Payment, 0) + + for _, elem := range batch { + logger := i.logger.WithFields(map[string]any{ + "id": referenced(elem.Referenced), + }) + + var update bson.M + + if elem.Adjustment == nil && elem.Payment == nil { + return nil, errors.New("either adjustment or payment must be provided") + } + + var metadataChanges payments.MetadataChanges + + if elem.Payment != nil { + ret := i.db.Collection(payments.Collection).FindOne( + ctx, + payments.Identifier{ + Referenced: elem.Referenced, + Provider: i.provider, + }) + if ret.Err() != nil && !errors.Is(ret.Err(), mongo.ErrNoDocuments) { + logger.Errorf("Error retrieving payment: %s", ret.Err()) + + return nil, fmt.Errorf("error retrieving payment: %w", ret.Err()) + } + + if ret != nil && ret.Err() == nil { + payment := payments.Payment{} + + err := ret.Decode(&payment) + if err != nil { + return nil, err + } + + metadataChanges = payment.MergeMetadata(elem.Metadata) + + elem.Metadata = metadataChanges.After + } + } + + switch { + case elem.Forward && elem.Adjustment != nil: + update = bson.M{ + "$push": bson.M{ + "adjustments": bson.M{ + "$each": []any{elem.Adjustment}, + "$position": 0, + }, + }, + "$set": bson.M{ + "status": elem.Adjustment.Status, + "raw": elem.Adjustment.Raw, + "date": elem.Adjustment.Date, + }, + } + case elem.Forward && elem.Payment != nil: + update = bson.M{ + "$set": payments.Payment{ + Identifier: payments.Identifier{ + Referenced: elem.Referenced, + Provider: i.provider, + }, + Data: *elem.Payment, + Adjustments: []payments.Adjustment{ + { + Status: elem.Payment.Status, + Amount: elem.Payment.InitialAmount, + Date: elem.Payment.CreatedAt, + Raw: elem.Payment.Raw, + }, + }, + Metadata: elem.Metadata, + }, + } + case !elem.Forward && elem.Adjustment != nil: + update = bson.M{ + "$push": bson.M{ + "adjustments": bson.M{ + "$each": []any{elem.Adjustment}, + }, + }, + "$setOnInsert": bson.M{ + "status": elem.Adjustment.Status, + }, + } + case !elem.Forward && elem.Payment != nil: + update = bson.M{ + "$push": bson.M{ + "adjustments": bson.M{ + "$each": []any{payments.Adjustment{ + Status: elem.Payment.Status, + Amount: elem.Payment.InitialAmount, + Date: elem.Payment.CreatedAt, + Raw: elem.Payment.Raw, + }}, + }, + }, + "$set": bson.M{ + "raw": elem.Payment.Raw, + "createdAt": elem.Payment.CreatedAt, + "scheme": elem.Payment.Scheme, + "initialAmount": elem.Payment.InitialAmount, + "asset": elem.Payment.Asset, + }, + "$setOnInsert": bson.M{ + "status": elem.Payment.Status, + }, + } + } + + data, err := json.Marshal(update) + if err != nil { + panic(err) + } + + logger.WithFields(map[string]interface{}{ + "update": string(data), + }).Debugf("Update payment") + + ret := i.db.Collection(payments.Collection).FindOneAndUpdate( + ctx, + payments.Identifier{ + Referenced: elem.Referenced, + Provider: i.provider, + }, + update, + options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After), + ) + if ret.Err() != nil { + logger.Errorf("Error updating payment: %s", ret.Err()) + + return nil, fmt.Errorf("error updating payment: %w", ret.Err()) + } + + payment := payments.Payment{} + + err = ret.Decode(&payment) + if err != nil { + return nil, err + } + + if metadataChanges.HasChanged() { + logger.WithFields(map[string]interface{}{ + "metadata": payment.Metadata, + }).Debugf("Metadata changed") + + _, err = i.db.Collection(payments.MetadataChangelogCollection).InsertOne(ctx, metadataChanges) + if err != nil { + return nil, err + } + } + + allPayments = append(allPayments, payment) + } + + return allPayments, nil +} + +func filter[T any](objects []T, compareFn func(t T) bool) []T { + ret := make([]T, 0) + + for _, o := range objects { + if compareFn(o) { + ret = append(ret, o) + } + } + + return ret +} + +func (i *DefaultIngester) IngestPayments(ctx context.Context, batch PaymentBatch, commitState any) error { + startingAt := time.Now() + + i.logger.WithFields(map[string]interface{}{ + "size": len(batch), + "startingAt": startingAt, + }).Debugf("Ingest batch") + + err := i.db.Client().UseSession(ctx, func(ctx mongo.SessionContext) error { + var allPayments []payments.Payment + _, err := ctx.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) { + var err error + + allPayments, err = i.processPaymentBatch(ctx, batch) + if err != nil { + return nil, err + } + + i.logger.Debugf("Update state") + + _, err = i.db.Collection(payments.TasksCollection).UpdateOne(ctx, map[string]any{ + "provider": i.provider, + "descriptor": i.descriptor, + }, map[string]any{ + "$set": map[string]any{ + "state": commitState, + }, + }, options.Update().SetUpsert(true)) + + return nil, err + }) + allPayments = filter(allPayments, payments.Payment.HasInitialValue) + + if i.publisher != nil { + for _, e := range allPayments { + i.publish(ctx, TopicPayments, NewEventSavedPayment(e.Computed())) + } + } + + return err + }) + if err != nil { + sharedlogging.GetLogger(ctx).Errorf("Error ingesting batch: %s", err) + + return err + } + + endedAt := time.Now() + + i.logger.WithFields(map[string]interface{}{ + "size": len(batch), + "endedAt": endedAt, + "latency": endedAt.Sub(startingAt).String(), + }).Debugf("Batch ingested") + + return nil +} diff --git a/internal/pkg/payments/account.go b/internal/pkg/payments/account.go new file mode 100644 index 000000000..9fae73280 --- /dev/null +++ b/internal/pkg/payments/account.go @@ -0,0 +1,15 @@ +package payments + +type Account struct { + Reference string `json:"reference" bson:"reference"` + Provider string `json:"provider" bson:"provider"` + Type AccountType `json:"type" bson:"type"` +} + +type AccountType string + +const ( + AccountTypeSource AccountType = "source" + AccountTypeTarget AccountType = "target" + AccountTypeUnknown AccountType = "unknown" +) diff --git a/internal/pkg/payments/collections.go b/internal/pkg/payments/collections.go index 041128e3f..9bc79afae 100644 --- a/internal/pkg/payments/collections.go +++ b/internal/pkg/payments/collections.go @@ -5,4 +5,5 @@ const ( ConnectorsCollection = "Connectors" TasksCollection = "Tasks" MetadataChangelogCollection = "MetadataChangelog" + AccountsCollection = "Accounts" ) diff --git a/internal/pkg/payments/payment.go b/internal/pkg/payments/payment.go index 9ff067cc5..e60734b8e 100644 --- a/internal/pkg/payments/payment.go +++ b/internal/pkg/payments/payment.go @@ -50,8 +50,9 @@ const ( ) type Referenced struct { - Reference string `json:"reference" bson:"reference"` - Type string `json:"type" bson:"type"` + Reference string `json:"reference" bson:"reference"` + Accounts []string `json:"accounts" bson:"accounts"` + Type string `json:"type" bson:"type"` } type Identifier struct {