From caf40f2aa295d3fc03f631a8d7c97eb8a5ed4e90 Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Thu, 25 Aug 2022 16:36:10 +0200 Subject: [PATCH 01/12] chore: first --- .golangci.yml | 24 --- go.mod | 78 +++++----- go.sum | 170 +++++++++++----------- pkg/api/http.go | 16 +- pkg/bridge/cdi/module.go | 18 +-- pkg/bridge/connectors/example/example.go | 6 +- pkg/bridge/connectors/stripe/translate.go | 98 ++++++------- pkg/bridge/http/api.go | 18 +-- pkg/bridge/ingestion/ingester.go | 55 +++---- pkg/bridge/ingestion/ingester_test.go | 12 +- pkg/bridge/integration/connector.go | 10 +- pkg/bridge/integration/loader.go | 12 +- pkg/bridge/integration/manager.go | 18 +-- pkg/bridge/integration/manager_test.go | 39 ++--- pkg/bridge/integration/store.go | 26 ++-- pkg/bridge/task/context.go | 8 +- pkg/bridge/task/scheduler.go | 44 +++--- pkg/bridge/task/scheduler_test.go | 29 ++-- pkg/bridge/task/store.go | 104 ++++++------- pkg/bridge/writeonly/storage.go | 8 +- pkg/{ => core}/collections.go | 2 +- pkg/{ => core}/config.go | 2 +- pkg/{ => core}/connector.go | 2 +- pkg/{ => core}/payment.go | 2 +- pkg/{ => core}/payment_test.go | 2 +- pkg/{ => core}/task.go | 2 +- pkg/database/indexes.go | 4 +- 27 files changed, 397 insertions(+), 412 deletions(-) rename pkg/{ => core}/collections.go (86%) rename pkg/{ => core}/config.go (90%) rename pkg/{ => core}/connector.go (92%) rename pkg/{ => core}/payment.go (99%) rename pkg/{ => core}/payment_test.go (97%) rename pkg/{ => core}/task.go (98%) diff --git a/.golangci.yml b/.golangci.yml index b82a41075..e6e416e7a 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,32 +1,8 @@ -linters-settings: - goimports: - local-prefixes: github.com/golangci/golangci-lint - govet: - check-shadowing: false - - nolintlint: - allow-leading-space: true # don't require machine-readable nolint directives (i.e. with no leading space) - allow-unused: false # report any unused nolint directives - require-explanation: false # don't require an explanation for nolint directives - require-specific: false # don't require nolint directives to be specific about which linter is being skipped - linters: - disable-all: true enable: - - deadcode #Default linter - - errcheck #Default linter - - gosimple #Default linter - - govet #Default linter - - ineffassign #Default linter - - staticcheck #Default linter - - structcheck #Default linter - - typecheck #Default linter - - unused #Default linter - - varcheck #Default linter - gofmt - gci - goimports run: timeout: 5m - go: '1.18' diff --git a/go.mod b/go.mod index 0df153fdf..64ca92ccd 100644 --- a/go.mod +++ b/go.mod @@ -3,14 +3,15 @@ module github.com/numary/payments go 1.18 require ( - github.com/Shopify/sarama v1.35.0 + github.com/Shopify/sarama v1.36.0 github.com/ThreeDotsLabs/watermill v1.1.1 github.com/bombsimon/logrusr/v3 v3.0.0 github.com/davecgh/go-spew v1.1.1 github.com/gibson042/canonicaljson-go v1.0.3 github.com/gorilla/mux v1.8.0 github.com/iancoleman/strcase v0.2.0 - github.com/numary/go-libs v0.0.0-20220614122310-19e6f94d07c3 + github.com/numary/go-libs v0.0.0-20220801164020-fc3e3280ca13 + github.com/numary/go-libs/sharedotlp v0.0.0-20220801164020-fc3e3280ca13 github.com/ory/dockertest v3.3.5+incompatible github.com/pborman/uuid v1.2.1 github.com/pkg/errors v0.9.1 @@ -19,16 +20,16 @@ require ( github.com/spf13/cobra v1.5.0 github.com/spf13/viper v1.12.0 github.com/stretchr/testify v1.8.0 - github.com/stripe/stripe-go/v72 v72.120.0 - github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.14 + github.com/stripe/stripe-go/v72 v72.122.0 + github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.15 github.com/xdg-go/scram v1.1.1 - go.mongodb.org/mongo-driver v1.10.0 - go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.33.0 - go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.33.0 - go.opentelemetry.io/otel v1.8.0 - go.uber.org/dig v1.14.1 - go.uber.org/fx v1.17.1 - golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 + go.mongodb.org/mongo-driver v1.10.1 + go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0 + go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.34.0 + go.opentelemetry.io/otel v1.9.0 + go.uber.org/dig v1.15.0 + go.uber.org/fx v1.18.1 + golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde ) require ( @@ -37,6 +38,7 @@ require ( github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 // indirect github.com/ThreeDotsLabs/watermill-http v1.1.4 // indirect github.com/ThreeDotsLabs/watermill-kafka/v2 v2.2.2 // indirect + github.com/ajg/form v1.5.1 // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect @@ -51,7 +53,7 @@ require ( github.com/felixge/httpsnoop v1.0.3 // indirect github.com/fsnotify/fsnotify v1.5.4 // indirect github.com/go-chi/chi v4.1.2+incompatible // indirect - github.com/go-chi/render v1.0.1 // indirect + github.com/go-chi/render v1.0.2 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect @@ -61,16 +63,16 @@ require ( github.com/google/go-cmp v0.5.8 // indirect github.com/google/uuid v1.3.0 // indirect github.com/gotestyourself/gotestyourself v2.2.0+incompatible // indirect - github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.0 // indirect + github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-multierror v1.1.1 // indirect github.com/hashicorp/go-uuid v1.0.3 // indirect github.com/hashicorp/hcl v1.0.0 // indirect - github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/inconshreveable/mousetrap v1.0.1 // indirect github.com/jcmturner/aescts/v2 v2.0.0 // indirect github.com/jcmturner/dnsutils/v2 v2.0.0 // indirect - github.com/jcmturner/gofork v1.1.0 // indirect - github.com/jcmturner/gokrb5/v8 v8.4.2 // indirect + github.com/jcmturner/gofork v1.7.6 // indirect + github.com/jcmturner/gokrb5/v8 v8.4.3 // indirect github.com/jcmturner/rpc/v2 v2.0.3 // indirect github.com/klauspost/compress v1.15.9 // indirect github.com/lib/pq v1.10.5 // indirect @@ -83,7 +85,7 @@ require ( github.com/opencontainers/image-spec v1.0.2 // indirect github.com/opencontainers/runc v1.1.0 // indirect github.com/pelletier/go-toml v1.9.5 // indirect - github.com/pelletier/go-toml/v2 v2.0.2 // indirect + github.com/pelletier/go-toml/v2 v2.0.3 // indirect github.com/pierrec/lz4/v4 v4.1.15 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/rcrowley/go-metrics v0.0.0-20201227073835-cf1acfcdf475 // indirect @@ -91,32 +93,32 @@ require ( github.com/spf13/cast v1.5.0 // indirect github.com/spf13/jwalterweatherman v1.1.0 // indirect github.com/spf13/pflag v1.0.5 // indirect - github.com/subosito/gotenv v1.4.0 // indirect - github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.14 // indirect + github.com/subosito/gotenv v1.4.1 // indirect + github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.15 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.3 // indirect github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect - go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.33.0 // indirect - go.opentelemetry.io/otel/exporters/jaeger v1.8.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.8.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.8.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.8.0 // indirect - go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.8.0 // indirect - go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.8.0 // indirect - go.opentelemetry.io/otel/sdk v1.8.0 // indirect - go.opentelemetry.io/otel/trace v1.8.0 // indirect - go.opentelemetry.io/proto/otlp v0.18.0 // indirect - go.uber.org/atomic v1.9.0 // indirect + go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.34.0 // indirect + go.opentelemetry.io/otel/exporters/jaeger v1.9.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.9.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.9.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.9.0 // indirect + go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.9.0 // indirect + go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0 // indirect + go.opentelemetry.io/otel/sdk v1.9.0 // indirect + go.opentelemetry.io/otel/trace v1.9.0 // indirect + go.opentelemetry.io/proto/otlp v0.19.0 // indirect + go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect - go.uber.org/zap v1.21.0 // indirect - golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa // indirect - golang.org/x/net v0.0.0-20220722155237-a158d28d115b // indirect - golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect + go.uber.org/zap v1.23.0 // indirect + golang.org/x/crypto v0.0.0-20220824171710-5757bc0c5503 // indirect + golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c // indirect + golang.org/x/sys v0.0.0-20220823224334-20c2bfdbfe24 // indirect golang.org/x/text v0.3.7 // indirect - google.golang.org/genproto v0.0.0-20220722212130-b98a9ff5e252 // indirect - google.golang.org/grpc v1.48.0 // indirect - google.golang.org/protobuf v1.28.0 // indirect - gopkg.in/ini.v1 v1.66.6 // indirect + google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc // indirect + google.golang.org/grpc v1.49.0 // indirect + google.golang.org/protobuf v1.28.1 // indirect + gopkg.in/ini.v1 v1.67.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect gotest.tools v2.2.0+incompatible // indirect diff --git a/go.sum b/go.sum index 7793ef892..fdcb71dfe 100644 --- a/go.sum +++ b/go.sum @@ -46,8 +46,8 @@ github.com/Microsoft/go-winio v0.5.1/go.mod h1:JPGBdM1cNvN/6ISo+n8V5iA4v8pBzdOpz github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw= github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5/go.mod h1:lmUJ/7eu/Q8D7ML55dXQrVaamCz2vxCfdQBasLZfHKk= github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= -github.com/Shopify/sarama v1.35.0 h1:opEGHcK8s5OpQF99wW0D4ol7A3qUpfSFigrDXnWmOcs= -github.com/Shopify/sarama v1.35.0/go.mod h1:n8obse6Cz5NjjXjKwR1JeYr7CkQn4KG+HENJ8n/T9oQ= +github.com/Shopify/sarama v1.36.0 h1:0OJs3eCcnezkWniVjwBbCJVaa0B1k7ImCRS3WN6NsSk= +github.com/Shopify/sarama v1.36.0/go.mod h1:9glG3eX83tgVYJ5aVtrjVUnEsOPqQIBGx1BWfN+X51I= github.com/Shopify/toxiproxy/v2 v2.4.0 h1:O1e4Jfvr/hefNTNu+8VtdEG5lSeamJRo4aKhMOKNM64= github.com/Shopify/toxiproxy/v2 v2.4.0/go.mod h1:3ilnjng821bkozDRxNoo64oI/DKqM+rOyJzb564+bvg= github.com/ThreeDotsLabs/watermill v1.1.0/go.mod h1:Qd1xNFxolCAHCzcMrm6RnjW0manbvN+DJVWc1MWRFlI= @@ -57,6 +57,8 @@ github.com/ThreeDotsLabs/watermill-http v1.1.4 h1:wRM54z/BPnIWjGbXMrOnwOlrCAESzo github.com/ThreeDotsLabs/watermill-http v1.1.4/go.mod h1:mkQ9CC0pxTZerNwr281rBoOy355vYt/lePkmYSX/BRg= github.com/ThreeDotsLabs/watermill-kafka/v2 v2.2.2 h1:COB5neqVL8jGwoz1Y9dawQ7Xhxid1XXX8+1CI/PebVU= github.com/ThreeDotsLabs/watermill-kafka/v2 v2.2.2/go.mod h1:U001oyrHo+df3Q7hIXgKqxY2OW6woz64+GNuIxZokbM= +github.com/ajg/form v1.5.1 h1:t9c7v8JUKu/XxOGBU0yjNpaMloxGEJhUkqFRq0ibGeU= +github.com/ajg/form v1.5.1/go.mod h1:uL1WgH+h2mgNtvBq0339dVnzXdBETtL2LeUXaIv25UY= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -64,7 +66,6 @@ github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= -github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8= @@ -94,7 +95,6 @@ github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XP github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= -github.com/cncf/xds/go v0.0.0-20211001041855-01bcc9b48dfe/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= github.com/containerd/continuity v0.2.2 h1:QSqfxcn8c+12slxwu00AtzXrsami0MJb/MQs9lOLHLA= @@ -138,7 +138,6 @@ github.com/envoyproxy/go-control-plane v0.9.7/go.mod h1:cwu0lG7PUMfa9snN8LXBig5y github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= -github.com/envoyproxy/go-control-plane v0.10.2-0.20220325020618-49ff273808a1/go.mod h1:KJwIaB5Mv44NWtYuAOFCVOjcI94vtpEz2JU/D2v6IjE= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk= github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= @@ -155,8 +154,9 @@ github.com/gibson042/canonicaljson-go v1.0.3/go.mod h1:DsLpJTThXyGNO+KZlI85C1/KD github.com/go-chi/chi v4.0.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= github.com/go-chi/chi v4.1.2+incompatible h1:fGFk2Gmi/YKXk0OmGfBh0WgmN3XB8lVnEyNz34tQRec= github.com/go-chi/chi v4.1.2+incompatible/go.mod h1:eB3wogJHnLi3x/kFX2A+IbTBlXxmMeXJVKy9tTv1XzQ= -github.com/go-chi/render v1.0.1 h1:4/5tis2cKaNdnv9zFLfXzcquC9HbeZgCnxGnKrltBS8= github.com/go-chi/render v1.0.1/go.mod h1:pq4Rr7HbnsdaeHagklXub+p6Wd16Af5l9koip1OvJns= +github.com/go-chi/render v1.0.2 h1:4ER/udB0+fMWB2Jlf15RV3F4A2FDuYi/9f+lFttR/Lg= +github.com/go-chi/render v1.0.2/go.mod h1:/gr3hVkmYR0YlEy3LxCuVRFzEu9Ruok+gFqbIofjao0= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8= @@ -263,8 +263,8 @@ github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgf github.com/grpc-ecosystem/grpc-gateway v1.9.0/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY= github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= github.com/grpc-ecosystem/grpc-gateway/v2 v2.7.0/go.mod h1:hgWBS7lorOAVIJEQMi4ZsPv9hVvWI6+ch50m39Pf2Ks= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.0 h1:Ghn7copILfeIg0y8sTGRppI1bd8I4l2VN3cob0Xeqwg= -github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.0/go.mod h1:dnjr4snxnhRSn5GWqJUva2AoMbeaxyAcepvc0Tg8lXk= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3 h1:lLT7ZLSzGLI08vc9cpd+tYmNWjdKDqyr/2L+f6U12Fk= +github.com/grpc-ecosystem/grpc-gateway/v2 v2.11.3/go.mod h1:o//XUCC/F+yRGJoPO/VU0GSB0f8Nhgmxx0VIRUvaC0w= github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= @@ -282,19 +282,19 @@ github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHL github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= +github.com/inconshreveable/mousetrap v1.0.1 h1:U3uMjPSQEBMNp1lFxmllqCPM6P5u/Xq7Pgzkat/bFNc= +github.com/inconshreveable/mousetrap v1.0.1/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jcmturner/aescts/v2 v2.0.0 h1:9YKLH6ey7H4eDBXW8khjYslgyqG2xZikXP0EQFKrle8= github.com/jcmturner/aescts/v2 v2.0.0/go.mod h1:AiaICIRyfYg35RUkr8yESTqvSy7csK90qZ5xfvvsoNs= github.com/jcmturner/dnsutils/v2 v2.0.0 h1:lltnkeZGL0wILNvrNiVCR6Ro5PGU/SeBvVO/8c/iPbo= github.com/jcmturner/dnsutils/v2 v2.0.0/go.mod h1:b0TnjGOvI/n42bZa+hmXL+kFJZsFT7G4t3HTlQ184QM= -github.com/jcmturner/gofork v1.0.0/go.mod h1:MK8+TM0La+2rjBD4jE12Kj1pCCxK7d2LK/UM3ncEo0o= -github.com/jcmturner/gofork v1.1.0 h1:+epBuLBi86GK6YOVJ7hsYzqLuZ8nVNf+QwwTabNbh74= -github.com/jcmturner/gofork v1.1.0/go.mod h1:pF30UrBFvmdtUdDY9SGShm2T0ruIQ48yAqqk7uSgIeo= +github.com/jcmturner/gofork v1.7.6 h1:QH0l3hzAU1tfT3rZCnW5zXl+orbkNMMRGJfdJjHVETg= +github.com/jcmturner/gofork v1.7.6/go.mod h1:1622LH6i/EZqLloHfE7IeZ0uEJwMSUyQ/nDd82IeqRo= github.com/jcmturner/goidentity/v6 v6.0.1 h1:VKnZd2oEIMorCTsFBnJWbExfNN7yZr3EhJAxwOkZg6o= github.com/jcmturner/goidentity/v6 v6.0.1/go.mod h1:X1YW3bgtvwAXju7V3LCIMpY0Gbxyjn/mY9zx4tFonSg= -github.com/jcmturner/gokrb5/v8 v8.4.2 h1:6ZIM6b/JJN0X8UM43ZOM6Z4SJzla+a/u7scXFJzodkA= -github.com/jcmturner/gokrb5/v8 v8.4.2/go.mod h1:sb+Xq/fTY5yktf/VxLsE3wlfPqQjp0aWNYyvBVK62bc= +github.com/jcmturner/gokrb5/v8 v8.4.3 h1:iTonLeSJOn7MVUtyMT+arAn5AKAPrkilzhGw8wE/Tq8= +github.com/jcmturner/gokrb5/v8 v8.4.3/go.mod h1:dqRwJGXznQrzw6cWmyo6kH+E7jksEQG/CyVWsJEsJO0= github.com/jcmturner/rpc/v2 v2.0.3 h1:7FXXj8Ti1IaVFpSAziCZWNzbNuZmnvw/i6CqLNdWfZY= github.com/jcmturner/rpc/v2 v2.0.3/go.mod h1:VUJYCIDm3PVOEHw8sgt091/20OJjskO/YJki3ELg/Hc= github.com/jonboulle/clockwork v0.1.0/go.mod h1:Ii8DK3G1RaLaWxj9trq07+26W01tbo22gdxWY5EU2bo= @@ -310,7 +310,6 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.15.8/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -350,8 +349,10 @@ github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/numary/go-libs v0.0.0-20220614122310-19e6f94d07c3 h1:efaq9m7swyS/Hl9xS34B2oS926kO9oFHy3nTYiFuwDg= -github.com/numary/go-libs v0.0.0-20220614122310-19e6f94d07c3/go.mod h1:7StJNTZ3QbU2uBWpOeryPaHD6xMYUN8AWXWTjP67kv0= +github.com/numary/go-libs v0.0.0-20220801164020-fc3e3280ca13 h1:NNdscRfbbhvfPdAreWRf5cUmpw7XzOf8WOMERbwpLVc= +github.com/numary/go-libs v0.0.0-20220801164020-fc3e3280ca13/go.mod h1:Jsfy6yKwxVaBFFuFL9IVYbMKvepElGQYVoWMhFdK0l0= +github.com/numary/go-libs/sharedotlp v0.0.0-20220801164020-fc3e3280ca13 h1:axI+XCZ03xg1Pym3MLwvT2Xw4wSZeCuxRbm6lIWheT8= +github.com/numary/go-libs/sharedotlp v0.0.0-20220801164020-fc3e3280ca13/go.mod h1:AZTmSrMGpNL9GzacAKwYsbSs+1U2ItR9i074hNILq6o= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= @@ -369,8 +370,8 @@ github.com/pborman/uuid v1.2.1/go.mod h1:X/NO0urCmaxf9VXbdlT7C2Yzkj2IKimNn4k+gtP github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= github.com/pelletier/go-toml v1.9.5 h1:4yBQzkHv+7BHq2PQUZF3Mx0IYxG7LsP222s7Agd3ve8= github.com/pelletier/go-toml v1.9.5/go.mod h1:u1nR/EPcESfeI/szUZKdtJ0xRNbUoANCkoOuaOx1Y+c= -github.com/pelletier/go-toml/v2 v2.0.2 h1:+jQXlF3scKIcSEKkdHzXhCTDLPFi5r1wnK6yPS+49Gw= -github.com/pelletier/go-toml/v2 v2.0.2/go.mod h1:MovirKjgVRESsAvNZlAjtFwV867yGuwRkXbG66OzopI= +github.com/pelletier/go-toml/v2 v2.0.3 h1:h9JoA60e1dVEOpp0PFwJSmt1Htu057NUq9/bUwaO61s= +github.com/pelletier/go-toml/v2 v2.0.3/go.mod h1:OMHamSCAODeSsVrwwvcJOaoN0LIUIaFVNZzmWyNfXas= github.com/pierrec/lz4/v4 v4.1.15 h1:MO0/ucJhngq7299dKLwIMtgTfbkoSPF6AoMYDd8Q4q0= github.com/pierrec/lz4/v4 v4.1.15/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= @@ -454,23 +455,22 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.7.2/go.mod h1:R6va5+xMeoiuVRoj+gSkQ7d3FALtqAAGI1FQKckRals= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= -github.com/stripe/stripe-go/v72 v72.120.0 h1:DQy4dWbzlU6XxMbRtnpwiwHqGnvsIGePcgELMLgBr3w= -github.com/stripe/stripe-go/v72 v72.120.0/go.mod h1:QwqJQtduHubZht9mek5sds9CtQcKFdsykV9ZepRWwo0= -github.com/subosito/gotenv v1.4.0 h1:yAzM1+SmVcz5R4tXGsNMu1jUl2aOJXoiWUCEwwnGrvs= -github.com/subosito/gotenv v1.4.0/go.mod h1:mZd6rFysKEcUhUHXJk0C/08wAgyDBFuwEYL7vWWGaGo= +github.com/stripe/stripe-go/v72 v72.122.0 h1:eRXWqnEwGny6dneQ5BsxGzUCED5n180u8n665JHlut8= +github.com/stripe/stripe-go/v72 v72.122.0/go.mod h1:QwqJQtduHubZht9mek5sds9CtQcKFdsykV9ZepRWwo0= +github.com/subosito/gotenv v1.4.1 h1:jyEFiXpy21Wm81FBN71l9VoMMV8H8jG+qIK3GCpY6Qs= +github.com/subosito/gotenv v1.4.1/go.mod h1:ayKnFf/c6rvx/2iiLrJUk1e6plDbT3edrFNGqEflhK0= github.com/syndtr/gocapability v0.0.0-20200815063812-42c35b437635/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww= github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/tmc/grpc-websocket-proxy v0.0.0-20190109142713-0ad062ec5ee5/go.mod h1:ncp9v5uamzpCO7NfCPTXjqaC+bZgJeR0sMTm6dMHP7U= github.com/tv42/httpunix v0.0.0-20191220191345-2ba4b9c3382c/go.mod h1:hzIxponao9Kjc7aWznkXaL4U4TWaDSs8zcsY4Ka08nM= github.com/ugorji/go v1.1.4/go.mod h1:uQMGLiO92mf5W77hV/PUCpI3pbzQx3CRekS0kk+RGrc= -github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.14 h1:wr4qKY3KUDcltzKTQJ/tOuUhUnY4zbDnHoAq13e+ZnY= -github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.14/go.mod h1:AgwK/cKYxv+JQyaBf1lG5YBA9zdG1EDlqjgrIXJPMc8= -github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.14 h1:vuYn+33yx7KrzmnUH2TYmapR8B8DwO0nRIluc0gRdfE= -github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.14/go.mod h1:vNmY7gwoy9t4RQxhmYHV1Myt4qf2bSMpSgmzFKuoZDg= +github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.15 h1:KSd3yYV2fk04D1cuAGkMiCTbAs59u69Dsq1oUisUzJY= +github.com/uptrace/opentelemetry-go-extra/otellogrus v0.1.15/go.mod h1:h9Fha0tfy6Bo4Zvalrd1c1MiE2QvD8DFxZ5rD1hDJcY= +github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.15 h1:5eYO+onNB1mbdc3+uw6r+yNFFEzqR6sm8sj7zcrjrAs= +github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.15/go.mod h1:6fGFPZDTcvHLxgWTFvf8hHWQrRO1tMXAFlxlqE+c650= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= @@ -490,60 +490,57 @@ github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= -github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= -go.mongodb.org/mongo-driver v1.10.0 h1:UtV6N5k14upNp4LTduX0QCufG124fSu25Wz9tu94GLg= -go.mongodb.org/mongo-driver v1.10.0/go.mod h1:wsihk0Kdgv8Kqu1Anit4sfK+22vSFbUrAVEYRhCXrA8= +go.mongodb.org/mongo-driver v1.10.1 h1:NujsPveKwHaWuKUer/ceo9DzEe7HIj1SlJ6uvXZG0S4= +go.mongodb.org/mongo-driver v1.10.1/go.mod h1:z4XpeoU6w+9Vht+jAFyLgVrD+jGSQQe0+CBWFHNiHt8= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.5/go.mod h1:5pWMHQbX5EPX2/62yrJeAkowc+lfs/XD7Uxpq3pI6kk= -go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.33.0 h1:k+63Wu3pnOvNCFgcMOfjRy8OC5nTgkpJVj3pqYHy3eo= -go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.33.0/go.mod h1:PvPPvQYDT/oI8huqmsGl1IZiqX4jclu1B3FARUnH8/4= -go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.33.0 h1:iXGg04s3HrJ4dhcJXasZK1C81Gg6UbpKNT5gS2rlres= -go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.33.0/go.mod h1:sqJmmInWWqMCphq1GXR9yUMZs5u7aRw/ZM3CKTrpUOw= -go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.33.0 h1:XM8+Uojy+jxM7v56u+DVFFR5jVVSNlQIwV99PB4kg9s= -go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.33.0/go.mod h1:v4XAvjK7zf0u1qpnktGXsx7CxrrhjnGMFb/fRw+4emo= -go.opentelemetry.io/otel v1.8.0 h1:zcvBFizPbpa1q7FehvFiHbQwGzmPILebO0tyqIR5Djg= -go.opentelemetry.io/otel v1.8.0/go.mod h1:2pkj+iMj0o03Y+cW6/m8Y4WkRdYN3AvCXCnzRMp9yvM= -go.opentelemetry.io/otel/exporters/jaeger v1.8.0 h1:TLLqD6kDhLPziEC7pgPrMvP9lAqdk3n1gf8DiFSnfW8= -go.opentelemetry.io/otel/exporters/jaeger v1.8.0/go.mod h1:GbWg+ng88rDtx+id26C34QLqw2erqJeAjsCx9AFeHfE= -go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.8.0 h1:ao8CJIShCaIbaMsGxy+jp2YHSudketpDgDRcbirov78= -go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.8.0/go.mod h1:78XhIg8Ht9vR4tbLNUhXsiOnE2HOuSeKAiAcoVQEpOY= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.8.0 h1:LrHL1A3KqIgAgi6mK7Q0aczmzU414AONAGT5xtnp+uo= -go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.8.0/go.mod h1:w8aZL87GMOvOBa2lU/JlVXE1q4chk/0FX+8ai4513bw= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.8.0 h1:00hCSGLIxdYK/Z7r8GkaX0QIlfvgU3tmnLlQvcnix6U= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.8.0/go.mod h1:twhIvtDQW2sWP1O2cT1N8nkSBgKCRZv2z6COTTBrf8Q= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.8.0 h1:SMO1HopgdAqNRit+WA3w3dcJSGANuH/ihKXDekEHfuY= -go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.8.0/go.mod h1:tsw+QO2+pGo7xOrPXrS27HxW8uqGQkw5AzJwdsoyvgw= -go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.8.0 h1:FVy7BZCjoA2Nk+fHqIdoTmm554J9wTX+YcrDp+mc368= -go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.8.0/go.mod h1:ztncjvKpotSUQq7rlgPibGt8kZfSI3/jI8EO7JjuY2c= -go.opentelemetry.io/otel/sdk v1.8.0 h1:xwu69/fNuwbSHWe/0PGS888RmjWY181OmcXDQKu7ZQk= -go.opentelemetry.io/otel/sdk v1.8.0/go.mod h1:uPSfc+yfDH2StDM/Rm35WE8gXSNdvCg023J6HeGNO0c= -go.opentelemetry.io/otel/trace v1.8.0 h1:cSy0DF9eGI5WIfNwZ1q2iUyGj00tGzP24dE1lOlHrfY= -go.opentelemetry.io/otel/trace v1.8.0/go.mod h1:0Bt3PXY8w+3pheS3hQUt+wow8b1ojPaTBoTCh2zIFI4= +go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.34.0 h1:OA6wSaNHGFLez0ZcXkADui2UmeptQmoe8VUGeTrgAbU= +go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama v0.34.0/go.mod h1:8cfNbNK5aJIRQnqOFGEALF17iZPDym2mmYP+ECIxi8M= +go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0 h1:OkXMRbgldT4yZR7RwB4SFYTjYJGTXwPQVX69pYtTnc4= +go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux v0.34.0/go.mod h1:zMu+r6aEorSQi8Ad0Y1fNrznm+VM8F10D2WlZp3HeFw= +go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.34.0 h1:RBgtLcpCco6JtjXnoY3MstbUjDVWH7ECYn4WmzmlOBI= +go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo v0.34.0/go.mod h1:GIWhiaNpCoQJoD/R1q5GXySIwQIBQ+5pxMbuWhq3X94= +go.opentelemetry.io/otel v1.9.0 h1:8WZNQFIB2a71LnANS9JeyidJKKGOOremcUtb/OtHISw= +go.opentelemetry.io/otel v1.9.0/go.mod h1:np4EoPGzoPs3O67xUVNoPPcmSvsfOxNlNA4F4AC+0Eo= +go.opentelemetry.io/otel/exporters/jaeger v1.9.0 h1:gAEgEVGDWwFjcis9jJTOJqZNxDzoZfR12WNIxr7g9Ww= +go.opentelemetry.io/otel/exporters/jaeger v1.9.0/go.mod h1:hquezOLVAybNW6vanIxkdLXTXvzlj2Vn3wevSP15RYs= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.9.0 h1:ggqApEjDKczicksfvZUCxuvoyDmR6Sbm56LwiK8DVR0= +go.opentelemetry.io/otel/exporters/otlp/internal/retry v1.9.0/go.mod h1:78XhIg8Ht9vR4tbLNUhXsiOnE2HOuSeKAiAcoVQEpOY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.9.0 h1:NN90Cuna0CnBg8YNu1Q0V35i2E8LDByFOwHRCq/ZP9I= +go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.9.0/go.mod h1:0EsCXjZAiiZGnLdEUXM9YjCKuuLZMYyglh2QDXcYKVA= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.9.0 h1:M0/hqGuJBLeIEu20f89H74RGtqV2dn+SFWEz9ATAAwY= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.9.0/go.mod h1:K5G92gbtCrYJ0mn6zj9Pst7YFsDFuvSYEhYKRMcufnM= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.9.0 h1:FAF9l8Wjxi9Ad2k/vLTfHZyzXYX72C62wBGpV3G6AIo= +go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.9.0/go.mod h1:smUdtylgc0YQiUr2PuifS4hBXhAS5xtR6WQhxP1wiNA= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0 h1:0uV0qzHk48i1SF8qRI8odMYiwPOLh9gBhiJFpj8H6JY= +go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.9.0/go.mod h1:Fl1iS5ZhWgXXXTdJMuBSVsS5nkL5XluHbg97kjOuYU4= +go.opentelemetry.io/otel/sdk v1.9.0 h1:LNXp1vrr83fNXTHgU8eO89mhzxb/bbWAsHG6fNf3qWo= +go.opentelemetry.io/otel/sdk v1.9.0/go.mod h1:AEZc8nt5bd2F7BC24J5R0mrjYnpEgYHyTcM/vrSple4= +go.opentelemetry.io/otel/trace v1.9.0 h1:oZaCNJUjWcg60VXWee8lJKlqhPbXAPB51URuR47pQYc= +go.opentelemetry.io/otel/trace v1.9.0/go.mod h1:2737Q0MuG8q1uILYm2YYVkAyLtOofiTNGg6VODnOiPo= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= -go.opentelemetry.io/proto/otlp v0.18.0 h1:W5hyXNComRa23tGpKwG+FRAc4rfF6ZUg1JReK+QHS80= -go.opentelemetry.io/proto/otlp v0.18.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= +go.opentelemetry.io/proto/otlp v0.19.0 h1:IVN6GR+mhC4s5yfcTbmzHYODqvWAp3ZedA2SJPI1Nnw= +go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= -go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/dig v1.14.1 h1:fyakRgZDdi2F8FgwJJoRGangMSPTIxPSLGzR3Oh0/54= -go.uber.org/dig v1.14.1/go.mod h1:52EKx/Vjdpz9EzeNcweC4YMsTrDdFn9mS/+Uw5ZnVTI= -go.uber.org/fx v1.17.1 h1:S42dZ6Pok8hQ3jxKwo6ZMYcCgHQA/wAS/gnpRa1Pksg= -go.uber.org/fx v1.17.1/go.mod h1:yO7KN5rhlARljyo4LR047AjaV6J+KFzd/Z7rnTbEn0A= -go.uber.org/goleak v1.1.11/go.mod h1:cwTWslyiVhfpKIDGSZEM2HlOvcqm+tG4zioyIeLoqMQ= +go.uber.org/atomic v1.10.0 h1:9qC72Qh0+3MqyJbAn8YU5xVq1frD8bn3JtD2oXtafVQ= +go.uber.org/atomic v1.10.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= +go.uber.org/dig v1.15.0 h1:vq3YWr8zRj1eFGC7Gvf907hE0eRjPTZ1d3xHadD6liE= +go.uber.org/dig v1.15.0/go.mod h1:pKHs0wMynzL6brANhB2hLMro+zalv1osARTviTcqHLM= +go.uber.org/fx v1.18.1 h1:I7VWkdv4iKcbpH7KVSi9Fe1LGmpJv+pbBIb9NidPb+E= +go.uber.org/fx v1.18.1/go.mod h1:g0V1KMQ66zIRk8bLu3Ea5Jt2w/cHlOIp4wdRsgh0JaY= go.uber.org/goleak v1.1.12 h1:gZAh5/EyT/HQwlpkCy6wTpqfH9H8Lz8zbm3dZh+OyzA= go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= -go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= go.uber.org/multierr v1.8.0 h1:dg6GjLku4EH+249NNmoIciG9N/jURbDG+pFlTkhzIC8= go.uber.org/multierr v1.8.0/go.mod h1:7EAYxJLBy9rStEaz58O2t4Uvip6FSURkq8/ppBp95ak= go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= -go.uber.org/zap v1.21.0 h1:WefMeulhovoZ2sYXz7st6K0sLj7bBhpiFaud4r4zST8= -go.uber.org/zap v1.21.0/go.mod h1:wjWOCqI0f2ZZrJF/UufIOkiC8ii6tm1iqIsLo76RfJw= +go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= +go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -551,13 +548,12 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= -golang.org/x/crypto v0.0.0-20201112155050-0c6587e931a9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= -golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa h1:zuSxTR4o9y82ebqCUJYNGJbGPo6sKVl54f/TVDObg1c= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= +golang.org/x/crypto v0.0.0-20220824171710-5757bc0c5503 h1:vJ2V3lFLg+bBhgroYuRfyN583UzVveQmIXjc8T/y3to= +golang.org/x/crypto v0.0.0-20220824171710-5757bc0c5503/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -591,7 +587,6 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= -golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= @@ -630,9 +625,10 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.0.0-20220708220712-1185a9018129/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0= -golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= +golang.org/x/net v0.0.0-20220809184613-07c6da5e1ced/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= +golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c h1:JVAXQ10yGGVbSyoer5VILysz6YKjdNT2bsvlayjqhes= +golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -655,8 +651,9 @@ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde h1:ejfdSekXMDxDLbRrJMwUk6KnSLZ2McaUCVcIKM+N6jc= +golang.org/x/sync v0.0.0-20220819030929-7fc1605a5dde/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= @@ -713,10 +710,11 @@ golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220823224334-20c2bfdbfe24 h1:TyKJRhyo17yWxOMCTHKWrc5rddHORMlnZ/j57umaUd8= +golang.org/x/sys v0.0.0-20220823224334-20c2bfdbfe24/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -780,7 +778,6 @@ golang.org/x/tools v0.0.0-20201208233053-a543418bbed2/go.mod h1:emZCQorbCU4vsT4f golang.org/x/tools v0.0.0-20210105154028-b0ab187a4818/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.0.0-20210108195828-e2f9c7f1fc8e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0= -golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -849,8 +846,8 @@ google.golang.org/genproto v0.0.0-20201214200347-8c77b98c765d/go.mod h1:FWY/as6D google.golang.org/genproto v0.0.0-20210108203827-ffc7fda8c3d7/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20210226172003-ab064af71705/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no= google.golang.org/genproto v0.0.0-20211118181313-81c1377c94b1/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= -google.golang.org/genproto v0.0.0-20220722212130-b98a9ff5e252 h1:G5AjFxR+ibe9Taamo0TdW+iylfBYK10DSkHYdx7PZ9w= -google.golang.org/genproto v0.0.0-20220722212130-b98a9ff5e252/go.mod h1:GkXuJDJ6aQ7lnJcRF+SJVgFdQhypqgl3LB1C9vabdRE= +google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc h1:Nf+EdcTLHR8qDNN/KfkQL0u0ssxt9OhbaWCl5C0ucEI= +google.golang.org/genproto v0.0.0-20220822174746-9e6da59bd2fc/go.mod h1:dbqgFATTzChvnt+ujMdZwITVAJHFtfyN1qUhDqEiIlk= google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= google.golang.org/grpc v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= google.golang.org/grpc v1.21.0/go.mod h1:oYelfM1adQP15Ek0mdvEgi9Df8B9CZIaU1084ijfRaM= @@ -872,8 +869,8 @@ google.golang.org/grpc v1.35.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= google.golang.org/grpc v1.42.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= -google.golang.org/grpc v1.48.0 h1:rQOsyJ/8+ufEDJd/Gdsz7HG220Mh9HAhFHRGnIjda0w= -google.golang.org/grpc v1.48.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk= +google.golang.org/grpc v1.49.0 h1:WTLtQzmQori5FUH25Pq4WT22oCsv8USpQ+F6rqtsmxw= +google.golang.org/grpc v1.49.0/go.mod h1:ZgQEeidpAuNRZ8iRrlBKXZQP1ghovWIVhdJRyCDK+GI= google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= @@ -887,8 +884,8 @@ google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlba google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= -google.golang.org/protobuf v1.28.0 h1:w43yiav+6bVFTBQFZX0r7ipe9JQ1QsbMgHwbBziscLw= -google.golang.org/protobuf v1.28.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.28.1 h1:d0NfwRgPtno5B1Wa6L2DAG+KivqkdutMf1UhdNx175w= +google.golang.org/protobuf v1.28.1/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -896,8 +893,8 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= -gopkg.in/ini.v1 v1.66.6 h1:LATuAqN/shcYAOkv3wl2L4rkaKqkcgTBQjOyYDvcPKI= -gopkg.in/ini.v1 v1.66.6/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= +gopkg.in/ini.v1 v1.67.0 h1:Dgnx+6+nfE+IfzjUEISNeydPJh9AXNNsWbGP9KzCsOA= +gopkg.in/ini.v1 v1.67.0/go.mod h1:pNLf8WUiyNEtQjuu5G5vTm06TEv9tsIgeAvK8hOrP4k= gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.0.0-20170812160011-eb3733d160e7/go.mod h1:JAlM8MvJe8wmxCU4Bli9HhUf9+ttbYbLASfIpnQbh74= @@ -906,7 +903,6 @@ gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.5/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY= gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= diff --git a/pkg/api/http.go b/pkg/api/http.go index 814a9c12f..45d34fc0e 100644 --- a/pkg/api/http.go +++ b/pkg/api/http.go @@ -8,7 +8,7 @@ import ( "github.com/gorilla/mux" "github.com/numary/go-libs/sharedapi" "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/core" . "github.com/numary/payments/pkg/http" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" @@ -102,21 +102,21 @@ func ListPaymentsHandler(db *mongo.Database) http.HandlerFunc { }) } - cursor, err := db.Collection(payments.Collection).Aggregate(r.Context(), pipeline) + cursor, err := db.Collection(core.Collection).Aggregate(r.Context(), pipeline) if err != nil { handleServerError(w, r, err) return } defer cursor.Close(r.Context()) - ret := make([]payments.Payment, 0) + ret := make([]core.Payment, 0) err = cursor.All(r.Context(), &ret) if err != nil { handleServerError(w, r, err) return } - err = json.NewEncoder(w).Encode(sharedapi.BaseResponse[[]payments.Payment]{ + err = json.NewEncoder(w).Encode(sharedapi.BaseResponse[[]core.Payment]{ Data: &ret, }) if err != nil { @@ -131,13 +131,13 @@ func ReadPaymentHandler(db *mongo.Database) http.HandlerFunc { paymentId := mux.Vars(r)["paymentId"] - identifier, err := payments.IdentifierFromString(paymentId) + identifier, err := core.IdentifierFromString(paymentId) if err != nil { w.WriteHeader(http.StatusNotFound) return } - ret := db.Collection(payments.Collection).FindOne(r.Context(), identifier) + ret := db.Collection(core.Collection).FindOne(r.Context(), identifier) if ret.Err() != nil { if ret.Err() == mongo.ErrNoDocuments { w.WriteHeader(http.StatusNotFound) @@ -147,7 +147,7 @@ func ReadPaymentHandler(db *mongo.Database) http.HandlerFunc { return } type Object struct { - Items []payments.Payment `bson:"items"` + Items []core.Payment `bson:"items"` } ob := &Object{} err = ret.Decode(ob) @@ -156,7 +156,7 @@ func ReadPaymentHandler(db *mongo.Database) http.HandlerFunc { return } - err = json.NewEncoder(w).Encode(sharedapi.BaseResponse[payments.Payment]{ + err = json.NewEncoder(w).Encode(sharedapi.BaseResponse[core.Payment]{ Data: &ob.Items[0], }) if err != nil { diff --git a/pkg/bridge/cdi/module.go b/pkg/bridge/cdi/module.go index f97675c13..87c5b8636 100644 --- a/pkg/bridge/cdi/module.go +++ b/pkg/bridge/cdi/module.go @@ -2,29 +2,29 @@ package cdi import ( "context" - http2 "net/http" + "net/http" "github.com/numary/go-libs/sharedlogging" "github.com/numary/go-libs/sharedpublish" - payments "github.com/numary/payments/pkg" - "github.com/numary/payments/pkg/bridge/http" + bridgeHttp "github.com/numary/payments/pkg/bridge/http" "github.com/numary/payments/pkg/bridge/ingestion" "github.com/numary/payments/pkg/bridge/integration" "github.com/numary/payments/pkg/bridge/task" "github.com/numary/payments/pkg/bridge/writeonly" + "github.com/numary/payments/pkg/core" "go.mongodb.org/mongo-driver/mongo" "go.uber.org/dig" "go.uber.org/fx" ) type ConnectorHandler struct { - Handler http2.Handler + Handler http.Handler Name string } func ConnectorModule[ - ConnectorConfig payments.ConnectorConfigObject, - TaskDescriptor payments.TaskDescriptor, + ConnectorConfig core.ConnectorConfigObject, + TaskDescriptor core.TaskDescriptor, ](useScopes bool, loader integration.Loader[ConnectorConfig, TaskDescriptor]) fx.Option { return fx.Options( fx.Provide(func(db *mongo.Database, publisher sharedpublish.Publisher) *integration.ConnectorManager[ConnectorConfig, TaskDescriptor] { @@ -32,11 +32,11 @@ func ConnectorModule[ taskStore := task.NewMongoDBStore[TaskDescriptor](db) logger := sharedlogging.GetLogger(context.Background()) schedulerFactory := integration.TaskSchedulerFactoryFn[TaskDescriptor](func(resolver task.Resolver[TaskDescriptor], maxTasks int) *task.DefaultTaskScheduler[TaskDescriptor] { - return task.NewDefaultScheduler[TaskDescriptor](loader.Name(), logger, taskStore, task.ContainerFactoryFn(func(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) { + return task.NewDefaultScheduler[TaskDescriptor](loader.Name(), logger, taskStore, task.ContainerFactoryFn(func(ctx context.Context, descriptor core.TaskDescriptor) (*dig.Container, error) { container := dig.New() if err := container.Provide(func() ingestion.Ingester { return ingestion.NewDefaultIngester(loader.Name(), descriptor, db, logger.WithFields(map[string]interface{}{ - "task-id": payments.IDFromDescriptor(descriptor), + "task-id": core.IDFromDescriptor(descriptor), }), publisher) }); err != nil { return nil, err @@ -54,7 +54,7 @@ func ConnectorModule[ }), fx.Provide(fx.Annotate(func(cm *integration.ConnectorManager[ConnectorConfig, TaskDescriptor]) ConnectorHandler { return ConnectorHandler{ - Handler: http.ConnectorRouter(loader.Name(), useScopes, cm), + Handler: bridgeHttp.ConnectorRouter(loader.Name(), useScopes, cm), Name: loader.Name(), } }, fx.ResultTags(`group:"connectorHandlers"`))), diff --git a/pkg/bridge/connectors/example/example.go b/pkg/bridge/connectors/example/example.go index 1329279ad..d6e842a7c 100644 --- a/pkg/bridge/connectors/example/example.go +++ b/pkg/bridge/connectors/example/example.go @@ -9,10 +9,10 @@ import ( "time" "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" "github.com/numary/payments/pkg/bridge/ingestion" "github.com/numary/payments/pkg/bridge/integration" "github.com/numary/payments/pkg/bridge/task" + "github.com/numary/payments/pkg/core" ) type ( @@ -69,7 +69,7 @@ var Loader = integration.NewLoaderBuilder[Config, TaskDescriptor]("example"). } type JsonPayment struct { - payments.Data + core.Data Reference string `json:"reference"` Type string `json:"type"` } @@ -82,7 +82,7 @@ var Loader = integration.NewLoaderBuilder[Config, TaskDescriptor]("example"). return ingester.Ingest(ctx, ingestion.Batch{ { - Referenced: payments.Referenced{ + Referenced: core.Referenced{ Reference: jsonPayment.Reference, Type: jsonPayment.Type, }, diff --git a/pkg/bridge/connectors/stripe/translate.go b/pkg/bridge/connectors/stripe/translate.go index 6a994abb6..03aa9f177 100644 --- a/pkg/bridge/connectors/stripe/translate.go +++ b/pkg/bridge/connectors/stripe/translate.go @@ -7,8 +7,8 @@ import ( "time" "github.com/davecgh/go-spew/spew" - payments "github.com/numary/payments/pkg" "github.com/numary/payments/pkg/bridge/ingestion" + "github.com/numary/payments/pkg/core" "github.com/stripe/stripe-go/v72" ) @@ -125,9 +125,9 @@ var currencies = map[string]currency{ func CreateBatchElement(bt *stripe.BalanceTransaction, forward bool) (ingestion.BatchElement, bool) { var ( - reference payments.Referenced - paymentData *payments.Data - adjustment *payments.Adjustment + reference core.Referenced + paymentData *core.Data + adjustment *core.Adjustment ) defer func() { // DEBUG @@ -155,99 +155,99 @@ func CreateBatchElement(bt *stripe.BalanceTransaction, forward bool) (ingestion. return fmt.Sprintf("%s/%d", asset, def.decimals) } - convertPayoutStatus := func() (status payments.Status) { + convertPayoutStatus := func() (status core.Status) { switch bt.Source.Payout.Status { case stripe.PayoutStatusCanceled: - status = payments.StatusCancelled + status = core.StatusCancelled case stripe.PayoutStatusFailed: - status = payments.StatusFailed + status = core.StatusFailed case stripe.PayoutStatusInTransit, stripe.PayoutStatusPending: - status = payments.StatusPending + status = core.StatusPending case stripe.PayoutStatusPaid: - status = payments.StatusSucceeded + status = core.StatusSucceeded } return } switch bt.Type { case "charge": - reference = payments.Referenced{ + reference = core.Referenced{ Reference: bt.Source.Charge.ID, - Type: payments.TypePayIn, + Type: core.TypePayIn, } - paymentData = &payments.Data{ - Status: payments.StatusSucceeded, + paymentData = &core.Data{ + Status: core.StatusSucceeded, InitialAmount: bt.Source.Charge.Amount, Asset: formatAsset(bt.Source.Charge.Currency), Raw: bt, - Scheme: payments.Scheme(bt.Source.Charge.PaymentMethodDetails.Card.Brand), + Scheme: core.Scheme(bt.Source.Charge.PaymentMethodDetails.Card.Brand), CreatedAt: time.Unix(bt.Created, 0), } case "payout": - reference = payments.Referenced{ + reference = core.Referenced{ Reference: bt.Source.Payout.ID, - Type: payments.TypePayout, + Type: core.TypePayout, } - paymentData = &payments.Data{ + paymentData = &core.Data{ Status: convertPayoutStatus(), InitialAmount: bt.Source.Payout.Amount, Raw: bt, Asset: formatAsset(bt.Source.Payout.Currency), - Scheme: func() payments.Scheme { + Scheme: func() core.Scheme { switch bt.Source.Payout.Type { case "bank_account": - return payments.SchemeSepaCredit + return core.SchemeSepaCredit case "card": - return payments.Scheme(bt.Source.Payout.Card.Brand) + return core.Scheme(bt.Source.Payout.Card.Brand) } - return payments.SchemeUnknown + return core.SchemeUnknown }(), CreatedAt: time.Unix(bt.Created, 0), } case "transfer": - reference = payments.Referenced{ + reference = core.Referenced{ Reference: bt.Source.Transfer.ID, - Type: payments.TypePayout, + Type: core.TypePayout, } - paymentData = &payments.Data{ - Status: payments.StatusSucceeded, + paymentData = &core.Data{ + Status: core.StatusSucceeded, InitialAmount: bt.Source.Transfer.Amount, Raw: bt, Asset: formatAsset(bt.Source.Transfer.Currency), - Scheme: payments.SchemeOther, + Scheme: core.SchemeOther, CreatedAt: time.Unix(bt.Created, 0), } case "refund": - reference = payments.Referenced{ + reference = core.Referenced{ Reference: bt.Source.Refund.Charge.ID, - Type: payments.TypePayIn, + Type: core.TypePayIn, } - adjustment = &payments.Adjustment{ - Status: payments.StatusSucceeded, + adjustment = &core.Adjustment{ + Status: core.StatusSucceeded, Amount: bt.Amount, Date: time.Unix(bt.Created, 0), Raw: bt, } case "payment": - reference = payments.Referenced{ + reference = core.Referenced{ Reference: bt.Source.Charge.ID, - Type: payments.TypePayIn, + Type: core.TypePayIn, } - paymentData = &payments.Data{ - Status: payments.StatusSucceeded, + paymentData = &core.Data{ + Status: core.StatusSucceeded, InitialAmount: bt.Source.Charge.Amount, Raw: bt, Asset: formatAsset(bt.Source.Charge.Currency), - Scheme: payments.SchemeOther, + Scheme: core.SchemeOther, CreatedAt: time.Unix(bt.Created, 0), } case "payout_cancel": - reference = payments.Referenced{ + reference = core.Referenced{ Reference: bt.Source.Payout.ID, - Type: payments.TypePayout, + Type: core.TypePayout, } - adjustment = &payments.Adjustment{ + adjustment = &core.Adjustment{ Status: convertPayoutStatus(), Amount: 0, Date: time.Unix(bt.Created, 0), @@ -255,11 +255,11 @@ func CreateBatchElement(bt *stripe.BalanceTransaction, forward bool) (ingestion. Absolute: true, } case "payout_failure": - reference = payments.Referenced{ + reference = core.Referenced{ Reference: bt.Source.Payout.ID, - Type: payments.TypePayIn, + Type: core.TypePayIn, } - adjustment = &payments.Adjustment{ + adjustment = &core.Adjustment{ Status: convertPayoutStatus(), Amount: 0, Date: time.Unix(bt.Created, 0), @@ -267,23 +267,23 @@ func CreateBatchElement(bt *stripe.BalanceTransaction, forward bool) (ingestion. Absolute: true, } case "payment_refund": - reference = payments.Referenced{ + reference = core.Referenced{ Reference: bt.Source.Refund.Charge.ID, - Type: payments.TypePayIn, + Type: core.TypePayIn, } - adjustment = &payments.Adjustment{ - Status: payments.StatusSucceeded, + adjustment = &core.Adjustment{ + Status: core.StatusSucceeded, Amount: bt.Amount, Date: time.Unix(bt.Created, 0), Raw: bt, } case "adjustment": - reference = payments.Referenced{ + reference = core.Referenced{ Reference: bt.Source.Dispute.Charge.ID, - Type: payments.TypePayIn, + Type: core.TypePayIn, } - adjustment = &payments.Adjustment{ - Status: payments.StatusCancelled, + adjustment = &core.Adjustment{ + Status: core.StatusCancelled, Amount: bt.Amount, Date: time.Unix(bt.Created, 0), Raw: bt, diff --git a/pkg/bridge/http/api.go b/pkg/bridge/http/api.go index 28136dfc5..b2240d204 100644 --- a/pkg/bridge/http/api.go +++ b/pkg/bridge/http/api.go @@ -9,8 +9,8 @@ import ( "github.com/gorilla/mux" "github.com/numary/go-libs/sharedapi" "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" "github.com/numary/payments/pkg/bridge/integration" + "github.com/numary/payments/pkg/core" . "github.com/numary/payments/pkg/http" ) @@ -27,7 +27,7 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) { } } -func ReadConfig[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { +func ReadConfig[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { config, err := cm.ReadConfig(r.Context()) @@ -43,7 +43,7 @@ func ReadConfig[Config payments.ConnectorConfigObject, Descriptor payments.TaskD } } -func ListTasks[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { +func ListTasks[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { tasks, err := cm.ListTasksStates(r.Context()) @@ -59,11 +59,11 @@ func ListTasks[Config payments.ConnectorConfigObject, Descriptor payments.TaskDe } } -func ReadTask[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { +func ReadTask[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var descriptor Descriptor - payments.DescriptorFromID(mux.Vars(r)["taskId"], &descriptor) + core.DescriptorFromID(mux.Vars(r)["taskId"], &descriptor) tasks, err := cm.ReadTaskState(r.Context(), descriptor) if err != nil { @@ -78,7 +78,7 @@ func ReadTask[Config payments.ConnectorConfigObject, Descriptor payments.TaskDes } } -func Uninstall[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { +func Uninstall[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { err := cm.Uninstall(r.Context()) if err != nil { @@ -90,7 +90,7 @@ func Uninstall[Config payments.ConnectorConfigObject, Descriptor payments.TaskDe } } -func Install[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { +func Install[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { installed, err := cm.IsInstalled(context.Background()) @@ -122,7 +122,7 @@ func Install[Config payments.ConnectorConfigObject, Descriptor payments.TaskDesc } } -func Reset[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { +func Reset[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { installed, err := cm.IsInstalled(context.Background()) @@ -145,7 +145,7 @@ func Reset[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescri } } -func ConnectorRouter[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor]( +func ConnectorRouter[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor]( name string, useScopes bool, manager *integration.ConnectorManager[Config, Descriptor], diff --git a/pkg/bridge/ingestion/ingester.go b/pkg/bridge/ingestion/ingester.go index ae565275c..6418df591 100644 --- a/pkg/bridge/ingestion/ingester.go +++ b/pkg/bridge/ingestion/ingester.go @@ -9,26 +9,27 @@ import ( "github.com/numary/go-libs/sharedlogging" "github.com/numary/go-libs/sharedpublish" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/core" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) const ( - PaymentsTopics = "payments" + TopicPayments = "payments" + EventPaymentsSavedPayment = "SAVED_PAYMENT" ) -type Event struct { - Date time.Time `json:"date"` - Type string `json:"type"` - Payload payments.ComputedPayment `json:"payload"` +type EventPaymentsMessage struct { + Date time.Time `json:"date"` + Type string `json:"type"` + Payload core.ComputedPayment `json:"payload"` } type BatchElement struct { - Referenced payments.Referenced - Payment *payments.Data - Adjustment *payments.Adjustment + Referenced core.Referenced + Payment *core.Data + Adjustment *core.Adjustment Forward bool } @@ -53,14 +54,14 @@ type defaultIngester struct { db *mongo.Database logger sharedlogging.Logger provider string - descriptor payments.TaskDescriptor + descriptor core.TaskDescriptor publisher sharedpublish.Publisher } -type referenced payments.Referenced +type referenced core.Referenced -func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]payments.Payment, error) { - allPayments := make([]payments.Payment, 0) +func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]core.Payment, error) { + allPayments := make([]core.Payment, 0) for _, elem := range batch { logger := i.logger.WithFields(map[string]any{ "id": referenced(elem.Referenced), @@ -92,13 +93,13 @@ func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym } case elem.Forward && elem.Payment != nil: update = bson.M{ - "$set": payments.Payment{ - Identifier: payments.Identifier{ + "$set": core.Payment{ + Identifier: core.Identifier{ Referenced: elem.Referenced, Provider: i.provider, }, Data: *elem.Payment, - Adjustments: []payments.Adjustment{ + Adjustments: []core.Adjustment{ { Status: elem.Payment.Status, Amount: elem.Payment.InitialAmount, @@ -123,7 +124,7 @@ func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym update = bson.M{ "$push": bson.M{ "adjustments": bson.M{ - "$each": []any{payments.Adjustment{ + "$each": []any{core.Adjustment{ Status: elem.Payment.Status, Amount: elem.Payment.InitialAmount, Date: elem.Payment.CreatedAt, @@ -151,9 +152,9 @@ func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym logger.WithFields(map[string]interface{}{ "update": string(data), }).Debugf("Update payment") - ret := i.db.Collection(payments.Collection).FindOneAndUpdate( + ret := i.db.Collection(core.Collection).FindOneAndUpdate( ctx, - payments.Identifier{ + core.Identifier{ Referenced: elem.Referenced, Provider: i.provider, }, @@ -164,7 +165,7 @@ func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym logger.Errorf("Error updating payment: %s", ret.Err()) return nil, fmt.Errorf("error updating payment: %s", ret.Err()) } - p := payments.Payment{} + p := core.Payment{} err = ret.Decode(&p) if err != nil { return nil, err @@ -194,7 +195,7 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a }).Debugf("Ingest batch") err := i.db.Client().UseSession(ctx, func(ctx mongo.SessionContext) (err error) { - var allPayments []payments.Payment + var allPayments []core.Payment _, err = ctx.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) { allPayments, err = i.processBatch(ctx, batch) if err != nil { @@ -203,7 +204,7 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a i.logger.Debugf("Update state") - _, err = i.db.Collection(payments.TasksCollection).UpdateOne(ctx, map[string]any{ + _, err = i.db.Collection(core.TasksCollection).UpdateOne(ctx, map[string]any{ "provider": i.provider, "descriptor": i.descriptor, }, map[string]any{ @@ -216,13 +217,13 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a } return nil, nil }) - allPayments = Filter(allPayments, payments.Payment.HasInitialValue) + allPayments = Filter(allPayments, core.Payment.HasInitialValue) if i.publisher != nil { for _, e := range allPayments { - err = i.publisher.Publish(ctx, PaymentsTopics, Event{ - Date: time.Now(), - Type: "SAVED_PAYMENT", + err = i.publisher.Publish(ctx, TopicPayments, EventPaymentsMessage{ + Date: time.Now().UTC(), + Type: EventPaymentsSavedPayment, Payload: e.Computed(), }) if err != nil { @@ -251,7 +252,7 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a func NewDefaultIngester( provider string, - descriptor payments.TaskDescriptor, + descriptor core.TaskDescriptor, db *mongo.Database, logger sharedlogging.Logger, publisher sharedpublish.Publisher, diff --git a/pkg/bridge/ingestion/ingester_test.go b/pkg/bridge/ingestion/ingester_test.go index c558da43f..e0a57b0f5 100644 --- a/pkg/bridge/ingestion/ingester_test.go +++ b/pkg/bridge/ingestion/ingester_test.go @@ -6,7 +6,7 @@ import ( "time" "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/core" "github.com/pborman/uuid" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" @@ -34,14 +34,14 @@ func TestIngester(t *testing.T) { err := ingester.Ingest(context.Background(), Batch{ { - Referenced: payments.Referenced{ + Referenced: core.Referenced{ Reference: "p1", - Type: payments.TypePayIn, + Type: core.TypePayIn, }, - Payment: &payments.Data{ - Status: payments.StatusSucceeded, + Payment: &core.Data{ + Status: core.StatusSucceeded, InitialAmount: 100, - Scheme: payments.SchemeOther, + Scheme: core.SchemeOther, Asset: "USD/2", CreatedAt: time.Now(), }, diff --git a/pkg/bridge/integration/connector.go b/pkg/bridge/integration/connector.go index bc0931006..af8b6aa24 100644 --- a/pkg/bridge/integration/connector.go +++ b/pkg/bridge/integration/connector.go @@ -3,12 +3,12 @@ package integration import ( "context" - payments "github.com/numary/payments/pkg" "github.com/numary/payments/pkg/bridge/task" + "github.com/numary/payments/pkg/core" ) // Connector provide entry point to a payment provider -type Connector[TaskDescriptor payments.TaskDescriptor] interface { +type Connector[TaskDescriptor core.TaskDescriptor] interface { // Install is used to start the connector. The implementation if in charge of scheduling all required resources. Install(ctx task.ConnectorContext[TaskDescriptor]) error // Uninstall is used to uninstall the connector. It has to close all related resources opened by the connector. @@ -17,7 +17,7 @@ type Connector[TaskDescriptor payments.TaskDescriptor] interface { Resolve(descriptor TaskDescriptor) task.Task } -type ConnectorBuilder[TaskDescriptor payments.TaskDescriptor] struct { +type ConnectorBuilder[TaskDescriptor core.TaskDescriptor] struct { name string uninstall func(ctx context.Context) error resolve func(descriptor TaskDescriptor) task.Task @@ -48,11 +48,11 @@ func (b *ConnectorBuilder[TaskDescriptor]) Build() Connector[TaskDescriptor] { } } -func NewConnectorBuilder[TaskDescriptor payments.TaskDescriptor]() *ConnectorBuilder[TaskDescriptor] { +func NewConnectorBuilder[TaskDescriptor core.TaskDescriptor]() *ConnectorBuilder[TaskDescriptor] { return &ConnectorBuilder[TaskDescriptor]{} } -type BuiltConnector[TaskDescriptor payments.TaskDescriptor] struct { +type BuiltConnector[TaskDescriptor core.TaskDescriptor] struct { name string uninstall func(ctx context.Context) error resolve func(name TaskDescriptor) task.Task diff --git a/pkg/bridge/integration/loader.go b/pkg/bridge/integration/loader.go index 8a6d91f19..e95fccf34 100644 --- a/pkg/bridge/integration/loader.go +++ b/pkg/bridge/integration/loader.go @@ -2,10 +2,10 @@ package integration import ( "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/core" ) -type Loader[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor] interface { +type Loader[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.TaskDescriptor] interface { Name() string Load(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor] // ApplyDefaults is used to fill default values of the provided configuration object @@ -16,7 +16,7 @@ type Loader[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payme AllowTasks() int } -type LoaderBuilder[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor] struct { +type LoaderBuilder[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.TaskDescriptor] struct { loadFunction func(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor] applyDefaults func(t ConnectorConfig) ConnectorConfig name string @@ -47,13 +47,13 @@ func (b *LoaderBuilder[ConnectorConfig, TaskDescriptor]) Build() *BuiltLoader[Co } } -func NewLoaderBuilder[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor](name string) *LoaderBuilder[ConnectorConfig, TaskDescriptor] { +func NewLoaderBuilder[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.TaskDescriptor](name string) *LoaderBuilder[ConnectorConfig, TaskDescriptor] { return &LoaderBuilder[ConnectorConfig, TaskDescriptor]{ name: name, } } -type BuiltLoader[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor] struct { +type BuiltLoader[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.TaskDescriptor] struct { loadFunction func(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor] applyDefaults func(t ConnectorConfig) ConnectorConfig name string @@ -82,4 +82,4 @@ func (b *BuiltLoader[ConnectorConfig, TaskDescriptor]) ApplyDefaults(t Connector return t } -var _ Loader[payments.EmptyConnectorConfig, struct{}] = &BuiltLoader[payments.EmptyConnectorConfig, struct{}]{} +var _ Loader[core.EmptyConnectorConfig, struct{}] = &BuiltLoader[core.EmptyConnectorConfig, struct{}]{} diff --git a/pkg/bridge/integration/manager.go b/pkg/bridge/integration/manager.go index 3a809b406..8605d5dda 100644 --- a/pkg/bridge/integration/manager.go +++ b/pkg/bridge/integration/manager.go @@ -4,8 +4,8 @@ import ( "context" "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" "github.com/numary/payments/pkg/bridge/task" + "github.com/numary/payments/pkg/core" "github.com/pkg/errors" ) @@ -17,18 +17,18 @@ var ( ErrAlreadyRunning = errors.New("already running") ) -type TaskSchedulerFactory[TaskDescriptor payments.TaskDescriptor] interface { +type TaskSchedulerFactory[TaskDescriptor core.TaskDescriptor] interface { Make(resolver task.Resolver[TaskDescriptor], maxTasks int) *task.DefaultTaskScheduler[TaskDescriptor] } -type TaskSchedulerFactoryFn[TaskDescriptor payments.TaskDescriptor] func(resolver task.Resolver[TaskDescriptor], maxProcesses int) *task.DefaultTaskScheduler[TaskDescriptor] +type TaskSchedulerFactoryFn[TaskDescriptor core.TaskDescriptor] func(resolver task.Resolver[TaskDescriptor], maxProcesses int) *task.DefaultTaskScheduler[TaskDescriptor] func (fn TaskSchedulerFactoryFn[TaskDescriptor]) Make(resolver task.Resolver[TaskDescriptor], maxTasks int) *task.DefaultTaskScheduler[TaskDescriptor] { return fn(resolver, maxTasks) } type ConnectorManager[ - Config payments.ConnectorConfigObject, - TaskDescriptor payments.TaskDescriptor, + Config core.ConnectorConfigObject, + TaskDescriptor core.TaskDescriptor, ] struct { logger sharedlogging.Logger loader Loader[Config, TaskDescriptor] @@ -195,11 +195,11 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) IsInstalled(ctx cont return l.store.IsInstalled(ctx, l.loader.Name()) } -func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) ListTasksStates(ctx context.Context) ([]payments.TaskState[TaskDescriptor], error) { +func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) ListTasksStates(ctx context.Context) ([]core.TaskState[TaskDescriptor], error) { return l.scheduler.ListTasks(ctx) } -func (l ConnectorManager[Config, TaskDescriptor]) ReadTaskState(ctx context.Context, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) { +func (l ConnectorManager[Config, TaskDescriptor]) ReadTaskState(ctx context.Context, descriptor TaskDescriptor) (*core.TaskState[TaskDescriptor], error) { return l.scheduler.ReadTask(ctx, descriptor) } @@ -216,8 +216,8 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Reset(ctx context.Co } func NewConnectorManager[ - ConnectorConfig payments.ConnectorConfigObject, - TaskDescriptor payments.TaskDescriptor, + ConnectorConfig core.ConnectorConfigObject, + TaskDescriptor core.TaskDescriptor, ]( logger sharedlogging.Logger, store ConnectorStore, diff --git a/pkg/bridge/integration/manager_test.go b/pkg/bridge/integration/manager_test.go index 54cd73114..dc527df41 100644 --- a/pkg/bridge/integration/manager_test.go +++ b/pkg/bridge/integration/manager_test.go @@ -5,10 +5,11 @@ import ( "testing" "github.com/numary/go-libs/sharedlogging" - "github.com/numary/go-libs/sharedlogging/sharedloggingtesting" - payments "github.com/numary/payments/pkg" + "github.com/numary/go-libs/sharedlogging/sharedlogginglogrus" "github.com/numary/payments/pkg/bridge/task" + "github.com/numary/payments/pkg/core" "github.com/pborman/uuid" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" ) @@ -21,7 +22,7 @@ func ChanClosed[T any](ch chan T) bool { } } -type testContext[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor] struct { +type testContext[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.TaskDescriptor] struct { manager *ConnectorManager[ConnectorConfig, TaskDescriptor] taskStore task.Store[TaskDescriptor] connectorStore ConnectorStore @@ -29,8 +30,12 @@ type testContext[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor provider string } -func withManager[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor](builder *ConnectorBuilder[TaskDescriptor], callback func(ctx *testContext[ConnectorConfig, TaskDescriptor])) { - logger := sharedloggingtesting.Logger() +func withManager[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.TaskDescriptor](builder *ConnectorBuilder[TaskDescriptor], callback func(ctx *testContext[ConnectorConfig, TaskDescriptor])) { + l := logrus.New() + if testing.Verbose() { + l.SetLevel(logrus.DebugLevel) + } + logger := sharedlogginglogrus.New(l) taskStore := task.NewInMemoryStore[TaskDescriptor]() managerStore := NewInMemoryStore() provider := uuid.New() @@ -65,12 +70,12 @@ func TestInstallConnector(t *testing.T) { close(installed) return nil }) - withManager(builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { - err := tc.manager.Install(context.Background(), payments.EmptyConnectorConfig{}) + withManager(builder, func(tc *testContext[core.EmptyConnectorConfig, any]) { + err := tc.manager.Install(context.Background(), core.EmptyConnectorConfig{}) require.NoError(t, err) require.True(t, ChanClosed(installed)) - err = tc.manager.Install(context.Background(), payments.EmptyConnectorConfig{}) + err = tc.manager.Install(context.Background(), core.EmptyConnectorConfig{}) require.Equal(t, ErrAlreadyInstalled, err) }) } @@ -98,8 +103,8 @@ func TestUninstallConnector(t *testing.T) { close(uninstalled) return nil }) - withManager(builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { - err := tc.manager.Install(context.Background(), payments.EmptyConnectorConfig{}) + withManager(builder, func(tc *testContext[core.EmptyConnectorConfig, any]) { + err := tc.manager.Install(context.Background(), core.EmptyConnectorConfig{}) require.NoError(t, err) <-taskStarted require.NoError(t, tc.manager.Uninstall(context.Background())) @@ -120,8 +125,8 @@ func TestDisableConnector(t *testing.T) { close(uninstalled) return nil }) - withManager[payments.EmptyConnectorConfig, any](builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { - err := tc.manager.Install(context.Background(), payments.EmptyConnectorConfig{}) + withManager[core.EmptyConnectorConfig, any](builder, func(tc *testContext[core.EmptyConnectorConfig, any]) { + err := tc.manager.Install(context.Background(), core.EmptyConnectorConfig{}) require.NoError(t, err) enabled, err := tc.manager.IsEnabled(context.Background()) @@ -137,19 +142,19 @@ func TestDisableConnector(t *testing.T) { func TestEnableConnector(t *testing.T) { builder := NewConnectorBuilder[any]() - withManager[payments.EmptyConnectorConfig, any](builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { + withManager[core.EmptyConnectorConfig, any](builder, func(tc *testContext[core.EmptyConnectorConfig, any]) { err := tc.connectorStore.Enable(context.Background(), tc.loader.Name()) require.NoError(t, err) - err = tc.manager.Install(context.Background(), payments.EmptyConnectorConfig{}) + err = tc.manager.Install(context.Background(), core.EmptyConnectorConfig{}) require.NoError(t, err) }) } func TestRestoreEnabledConnector(t *testing.T) { builder := NewConnectorBuilder[any]() - withManager(builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { - err := tc.connectorStore.Install(context.Background(), tc.loader.Name(), payments.EmptyConnectorConfig{}) + withManager(builder, func(tc *testContext[core.EmptyConnectorConfig, any]) { + err := tc.connectorStore.Install(context.Background(), tc.loader.Name(), core.EmptyConnectorConfig{}) require.NoError(t, err) err = tc.manager.Restore(context.Background()) @@ -160,7 +165,7 @@ func TestRestoreEnabledConnector(t *testing.T) { func TestRestoreNotInstalledConnector(t *testing.T) { builder := NewConnectorBuilder[any]() - withManager(builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { + withManager(builder, func(tc *testContext[core.EmptyConnectorConfig, any]) { err := tc.manager.Restore(context.Background()) require.Equal(t, ErrNotInstalled, err) }) diff --git a/pkg/bridge/integration/store.go b/pkg/bridge/integration/store.go index 5642d1cdf..b132e6d19 100644 --- a/pkg/bridge/integration/store.go +++ b/pkg/bridge/integration/store.go @@ -4,7 +4,7 @@ import ( "context" "reflect" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/core" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -96,19 +96,19 @@ type mongodbConnectorStore struct { func (m *mongodbConnectorStore) Uninstall(ctx context.Context, name string) error { return m.db.Client().UseSession(ctx, func(ctx mongo.SessionContext) error { _, err := ctx.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) { - _, err := m.db.Collection(payments.TasksCollection).DeleteMany(ctx, map[string]any{ + _, err := m.db.Collection(core.TasksCollection).DeleteMany(ctx, map[string]any{ "provider": name, }) if err != nil { return nil, errors.Wrap(err, "deleting tasks") } - _, err = m.db.Collection(payments.Collection).DeleteMany(ctx, map[string]any{ + _, err = m.db.Collection(core.Collection).DeleteMany(ctx, map[string]any{ "provider": name, }) if err != nil { return nil, errors.Wrap(err, "deleting payments") } - _, err = m.db.Collection(payments.ConnectorsCollection).DeleteOne(ctx, map[string]any{ + _, err = m.db.Collection(core.ConnectorsCollection).DeleteOne(ctx, map[string]any{ "provider": name, }) if err != nil { @@ -121,7 +121,7 @@ func (m *mongodbConnectorStore) Uninstall(ctx context.Context, name string) erro } func (m *mongodbConnectorStore) IsInstalled(ctx context.Context, name string) (bool, error) { - ret := m.db.Collection(payments.ConnectorsCollection).FindOne(ctx, map[string]any{ + ret := m.db.Collection(core.ConnectorsCollection).FindOne(ctx, map[string]any{ "provider": name, }) if ret.Err() != nil && ret.Err() != mongo.ErrNoDocuments { @@ -134,7 +134,7 @@ func (m *mongodbConnectorStore) IsInstalled(ctx context.Context, name string) (b } func (m *mongodbConnectorStore) Install(ctx context.Context, name string, config any) error { - _, err := m.db.Collection(payments.ConnectorsCollection).UpdateOne(ctx, map[string]any{ + _, err := m.db.Collection(core.ConnectorsCollection).UpdateOne(ctx, map[string]any{ "provider": name, }, map[string]any{ "$set": map[string]any{ @@ -145,7 +145,7 @@ func (m *mongodbConnectorStore) Install(ctx context.Context, name string, config } func (m *mongodbConnectorStore) UpdateConfig(ctx context.Context, name string, config any) error { - _, err := m.db.Collection(payments.ConnectorsCollection).UpdateOne(ctx, map[string]any{ + _, err := m.db.Collection(core.ConnectorsCollection).UpdateOne(ctx, map[string]any{ "provider": name, }, map[string]any{ "$set": map[string]any{ @@ -156,7 +156,7 @@ func (m *mongodbConnectorStore) UpdateConfig(ctx context.Context, name string, c } func (m *mongodbConnectorStore) Enable(ctx context.Context, name string) error { - _, err := m.db.Collection(payments.ConnectorsCollection).UpdateOne(ctx, map[string]any{ + _, err := m.db.Collection(core.ConnectorsCollection).UpdateOne(ctx, map[string]any{ "provider": name, }, map[string]any{ "$set": map[string]any{ @@ -167,7 +167,7 @@ func (m *mongodbConnectorStore) Enable(ctx context.Context, name string) error { } func (m *mongodbConnectorStore) Disable(ctx context.Context, name string) error { - _, err := m.db.Collection(payments.ConnectorsCollection).UpdateOne(ctx, map[string]any{ + _, err := m.db.Collection(core.ConnectorsCollection).UpdateOne(ctx, map[string]any{ "provider": name, }, map[string]any{ "$set": map[string]any{ @@ -178,7 +178,7 @@ func (m *mongodbConnectorStore) Disable(ctx context.Context, name string) error } func (m *mongodbConnectorStore) IsEnabled(ctx context.Context, name string) (bool, error) { - ret := m.db.Collection(payments.ConnectorsCollection).FindOne(ctx, map[string]any{ + ret := m.db.Collection(core.ConnectorsCollection).FindOne(ctx, map[string]any{ "provider": name, }) if ret.Err() != nil && ret.Err() != mongo.ErrNoDocuments { @@ -187,7 +187,7 @@ func (m *mongodbConnectorStore) IsEnabled(ctx context.Context, name string) (boo if ret.Err() == mongo.ErrNoDocuments { return false, ErrNotInstalled } - p := payments.Connector[payments.EmptyConnectorConfig]{} + p := core.Connector[core.EmptyConnectorConfig]{} err := ret.Decode(&p) if err != nil { return false, err @@ -197,7 +197,7 @@ func (m *mongodbConnectorStore) IsEnabled(ctx context.Context, name string) (boo } func (m *mongodbConnectorStore) ReadConfig(ctx context.Context, name string, to interface{}) error { - ret := m.db.Collection(payments.ConnectorsCollection).FindOne(ctx, map[string]any{ + ret := m.db.Collection(core.ConnectorsCollection).FindOne(ctx, map[string]any{ "provider": name, }) if ret.Err() != nil && ret.Err() != mongo.ErrNoDocuments { @@ -206,7 +206,7 @@ func (m *mongodbConnectorStore) ReadConfig(ctx context.Context, name string, to if ret.Err() == mongo.ErrNoDocuments { return errors.New("not installed") } - p := payments.Connector[bson.Raw]{} + p := core.Connector[bson.Raw]{} err := ret.Decode(&p) if err != nil { return err diff --git a/pkg/bridge/task/context.go b/pkg/bridge/task/context.go index 93a3211a6..38f3390d1 100644 --- a/pkg/bridge/task/context.go +++ b/pkg/bridge/task/context.go @@ -3,15 +3,15 @@ package task import ( "context" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/core" ) -type ConnectorContext[TaskDescriptor payments.TaskDescriptor] interface { +type ConnectorContext[TaskDescriptor core.TaskDescriptor] interface { Context() context.Context Scheduler() Scheduler[TaskDescriptor] } -type connectorContext[TaskDescriptor payments.TaskDescriptor] struct { +type connectorContext[TaskDescriptor core.TaskDescriptor] struct { ctx context.Context scheduler Scheduler[TaskDescriptor] } @@ -23,7 +23,7 @@ func (ctx *connectorContext[TaskDescriptor]) Scheduler() Scheduler[TaskDescripto return ctx.scheduler } -func NewConnectorContext[TaskDescriptor payments.TaskDescriptor](ctx context.Context, scheduler Scheduler[TaskDescriptor]) *connectorContext[TaskDescriptor] { +func NewConnectorContext[TaskDescriptor core.TaskDescriptor](ctx context.Context, scheduler Scheduler[TaskDescriptor]) *connectorContext[TaskDescriptor] { return &connectorContext[TaskDescriptor]{ ctx: ctx, scheduler: scheduler, diff --git a/pkg/bridge/task/scheduler.go b/pkg/bridge/task/scheduler.go index fa6f36f89..89befbdb8 100644 --- a/pkg/bridge/task/scheduler.go +++ b/pkg/bridge/task/scheduler.go @@ -8,7 +8,7 @@ import ( "time" "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/core" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.uber.org/dig" @@ -19,40 +19,40 @@ var ( ErrUnableToResolve = errors.New("unable to resolve task") ) -type Resolver[TaskDescriptor payments.TaskDescriptor] interface { +type Resolver[TaskDescriptor core.TaskDescriptor] interface { Resolve(descriptor TaskDescriptor) Task } -type ResolverFn[TaskDescriptor payments.TaskDescriptor] func(descriptor TaskDescriptor) Task +type ResolverFn[TaskDescriptor core.TaskDescriptor] func(descriptor TaskDescriptor) Task func (fn ResolverFn[TaskDescriptor]) Resolve(descriptor TaskDescriptor) Task { return fn(descriptor) } type ContainerFactory interface { - Create(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) + Create(ctx context.Context, descriptor core.TaskDescriptor) (*dig.Container, error) } -type ContainerFactoryFn func(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) +type ContainerFactoryFn func(ctx context.Context, descriptor core.TaskDescriptor) (*dig.Container, error) -func (fn ContainerFactoryFn) Create(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) { +func (fn ContainerFactoryFn) Create(ctx context.Context, descriptor core.TaskDescriptor) (*dig.Container, error) { return fn(ctx, descriptor) } -var DefaultContainerFactory = ContainerFactoryFn(func(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) { +var DefaultContainerFactory = ContainerFactoryFn(func(ctx context.Context, descriptor core.TaskDescriptor) (*dig.Container, error) { return dig.New(), nil }) -type Scheduler[TaskDescriptor payments.TaskDescriptor] interface { +type Scheduler[TaskDescriptor core.TaskDescriptor] interface { Schedule(p TaskDescriptor, restart bool) error } -type taskHolder[TaskDescriptor payments.TaskDescriptor] struct { +type taskHolder[TaskDescriptor core.TaskDescriptor] struct { descriptor TaskDescriptor cancel func() logger sharedlogging.Logger stopChan StopChan } -type DefaultTaskScheduler[TaskDescriptor payments.TaskDescriptor] struct { +type DefaultTaskScheduler[TaskDescriptor core.TaskDescriptor] struct { provider string logger sharedlogging.Logger store Store[TaskDescriptor] @@ -64,11 +64,11 @@ type DefaultTaskScheduler[TaskDescriptor payments.TaskDescriptor] struct { stopped bool } -func (s *DefaultTaskScheduler[TaskDescriptor]) ListTasks(ctx context.Context) ([]payments.TaskState[TaskDescriptor], error) { +func (s *DefaultTaskScheduler[TaskDescriptor]) ListTasks(ctx context.Context) ([]core.TaskState[TaskDescriptor], error) { return s.store.ListTaskStates(ctx, s.provider) } -func (s *DefaultTaskScheduler[TaskDescriptor]) ReadTask(ctx context.Context, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) { +func (s *DefaultTaskScheduler[TaskDescriptor]) ReadTask(ctx context.Context, descriptor TaskDescriptor) (*core.TaskState[TaskDescriptor], error) { return s.store.ReadTaskState(ctx, s.provider, descriptor) } @@ -77,7 +77,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) Schedule(descriptor TaskDescripto s.mu.Lock() defer s.mu.Unlock() - taskId := payments.IDFromDescriptor(descriptor) + taskId := core.IDFromDescriptor(descriptor) if _, ok := s.tasks[taskId]; ok { return ErrAlreadyScheduled } @@ -133,7 +133,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) Shutdown(ctx context.Context) err func (s *DefaultTaskScheduler[TaskDescriptor]) Restore(ctx context.Context) error { - states, err := s.store.ListTaskStatesByStatus(ctx, s.provider, payments.TaskStatusActive) + states, err := s.store.ListTaskStatesByStatus(ctx, s.provider, core.TaskStatusActive) if err != nil { return err } @@ -159,7 +159,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) registerTaskError(ctx context.Con } holder.logger.Errorf("Task terminated with error: %s", taskErr) - err := s.store.UpdateTaskStatus(ctx, s.provider, holder.descriptor, payments.TaskStatusFailed, pe) + err := s.store.UpdateTaskStatus(ctx, s.provider, holder.descriptor, core.TaskStatusFailed, pe) if err != nil { holder.logger.Error("Error updating task status: %s", pe) } @@ -168,7 +168,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) registerTaskError(ctx context.Con func (s *DefaultTaskScheduler[TaskDescriptor]) deleteTask(holder *taskHolder[TaskDescriptor]) { s.mu.Lock() defer s.mu.Unlock() - delete(s.tasks, payments.IDFromDescriptor(holder.descriptor)) + delete(s.tasks, core.IDFromDescriptor(holder.descriptor)) if s.stopped { return @@ -197,12 +197,12 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) deleteTask(holder *taskHolder[Tas type StopChan chan chan struct{} func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescriptor) error { - ps, err := s.store.FindTaskAndUpdateStatus(context.Background(), s.provider, descriptor, payments.TaskStatusActive, "") + ps, err := s.store.FindTaskAndUpdateStatus(context.Background(), s.provider, descriptor, core.TaskStatusActive, "") if err != nil { return errors.Wrap(err, "finding task and update") } - taskId := payments.IDFromDescriptor(descriptor) + taskId := core.IDFromDescriptor(descriptor) logger := s.logger.WithFields(map[string]interface{}{ "task-id": taskId, }) @@ -266,7 +266,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript panic(err) } - s.tasks[payments.IDFromDescriptor(descriptor)] = holder + s.tasks[core.IDFromDescriptor(descriptor)] = holder go func() { logger.Infof("Starting task...") @@ -286,7 +286,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript } logger.Infof("Task terminated with success") - err = s.store.UpdateTaskStatus(ctx, s.provider, descriptor, payments.TaskStatusTerminated, "") + err = s.store.UpdateTaskStatus(ctx, s.provider, descriptor, core.TaskStatusTerminated, "") if err != nil { logger.Error("Error updating task status: %s", err) } @@ -299,12 +299,12 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) stackTask(descriptor TaskDescript "descriptor": descriptor, }).Infof("Stacking task") return s.store.UpdateTaskStatus( - context.Background(), s.provider, descriptor, payments.TaskStatusPending, "") + context.Background(), s.provider, descriptor, core.TaskStatusPending, "") } var _ Scheduler[struct{}] = &DefaultTaskScheduler[struct{}]{} -func NewDefaultScheduler[TaskDescriptor payments.TaskDescriptor]( +func NewDefaultScheduler[TaskDescriptor core.TaskDescriptor]( provider string, logger sharedlogging.Logger, store Store[TaskDescriptor], diff --git a/pkg/bridge/task/scheduler_test.go b/pkg/bridge/task/scheduler_test.go index 7a471de9c..478867f52 100644 --- a/pkg/bridge/task/scheduler_test.go +++ b/pkg/bridge/task/scheduler_test.go @@ -6,13 +6,14 @@ import ( "testing" "time" - "github.com/numary/go-libs/sharedlogging/sharedloggingtesting" - payments "github.com/numary/payments/pkg" + "github.com/numary/go-libs/sharedlogging/sharedlogginglogrus" + "github.com/numary/payments/pkg/core" "github.com/pborman/uuid" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" ) -func TaskTerminatedWithStatus[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor, expectedStatus payments.TaskStatus, errString string) func() bool { +func TaskTerminatedWithStatus[TaskDescriptor core.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor, expectedStatus core.TaskStatus, errString string) func() bool { return func() bool { status, err, ok := store.Result(provider, descriptor) if !ok { @@ -25,24 +26,28 @@ func TaskTerminatedWithStatus[TaskDescriptor payments.TaskDescriptor](store *inM } } -func TaskTerminated[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { - return TaskTerminatedWithStatus(store, provider, descriptor, payments.TaskStatusTerminated, "") +func TaskTerminated[TaskDescriptor core.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { + return TaskTerminatedWithStatus(store, provider, descriptor, core.TaskStatusTerminated, "") } -func TaskFailed[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor, errStr string) func() bool { - return TaskTerminatedWithStatus(store, provider, descriptor, payments.TaskStatusFailed, errStr) +func TaskFailed[TaskDescriptor core.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor, errStr string) func() bool { + return TaskTerminatedWithStatus(store, provider, descriptor, core.TaskStatusFailed, errStr) } -func TaskPending[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { - return TaskTerminatedWithStatus(store, provider, descriptor, payments.TaskStatusPending, "") +func TaskPending[TaskDescriptor core.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { + return TaskTerminatedWithStatus(store, provider, descriptor, core.TaskStatusPending, "") } -func TaskActive[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { - return TaskTerminatedWithStatus(store, provider, descriptor, payments.TaskStatusActive, "") +func TaskActive[TaskDescriptor core.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { + return TaskTerminatedWithStatus(store, provider, descriptor, core.TaskStatusActive, "") } func TestTaskScheduler(t *testing.T) { - logger := sharedloggingtesting.Logger() + l := logrus.New() + if testing.Verbose() { + l.SetLevel(logrus.DebugLevel) + } + logger := sharedlogginglogrus.New(l) t.Run("Nominal", func(t *testing.T) { store := NewInMemoryStore[string]() diff --git a/pkg/bridge/task/store.go b/pkg/bridge/task/store.go index 6cbe5907d..8377fa015 100644 --- a/pkg/bridge/task/store.go +++ b/pkg/bridge/task/store.go @@ -6,7 +6,7 @@ import ( "strings" "time" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/core" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -17,29 +17,29 @@ var ( ErrNotFound = errors.New("not found") ) -type Store[TaskDescriptor payments.TaskDescriptor] interface { - UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, err string) error +type Store[TaskDescriptor core.TaskDescriptor] interface { + UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status core.TaskStatus, err string) error FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, - status payments.TaskStatus, err string) (*payments.TaskState[TaskDescriptor], error) - ListTaskStatesByStatus(ctx context.Context, provider string, status payments.TaskStatus) ([]payments.TaskState[TaskDescriptor], error) - ListTaskStates(ctx context.Context, provider string) ([]payments.TaskState[TaskDescriptor], error) - ReadOldestPendingTask(ctx context.Context, provider string) (*payments.TaskState[TaskDescriptor], error) - ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) + status core.TaskStatus, err string) (*core.TaskState[TaskDescriptor], error) + ListTaskStatesByStatus(ctx context.Context, provider string, status core.TaskStatus) ([]core.TaskState[TaskDescriptor], error) + ListTaskStates(ctx context.Context, provider string) ([]core.TaskState[TaskDescriptor], error) + ReadOldestPendingTask(ctx context.Context, provider string) (*core.TaskState[TaskDescriptor], error) + ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*core.TaskState[TaskDescriptor], error) } -type inMemoryStore[TaskDescriptor payments.TaskDescriptor] struct { - statuses map[string]payments.TaskStatus +type inMemoryStore[TaskDescriptor core.TaskDescriptor] struct { + statuses map[string]core.TaskStatus created map[string]time.Time errors map[string]string } -func (s *inMemoryStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) { - id := payments.IDFromDescriptor(descriptor) +func (s *inMemoryStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*core.TaskState[TaskDescriptor], error) { + id := core.IDFromDescriptor(descriptor) status, ok := s.statuses[id] if !ok { return nil, ErrNotFound } - return &payments.TaskState[TaskDescriptor]{ + return &core.TaskState[TaskDescriptor]{ Provider: provider, Descriptor: descriptor, Status: status, @@ -49,17 +49,17 @@ func (s *inMemoryStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provi }, nil } -func (s *inMemoryStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provider string) ([]payments.TaskState[TaskDescriptor], error) { - ret := make([]payments.TaskState[TaskDescriptor], 0) +func (s *inMemoryStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provider string) ([]core.TaskState[TaskDescriptor], error) { + ret := make([]core.TaskState[TaskDescriptor], 0) for id, status := range s.statuses { if !strings.HasPrefix(id, fmt.Sprintf("%s/", provider)) { continue } var descriptor TaskDescriptor - payments.DescriptorFromID(id, &descriptor) + core.DescriptorFromID(id, &descriptor) - ret = append(ret, payments.TaskState[TaskDescriptor]{ + ret = append(ret, core.TaskState[TaskDescriptor]{ Provider: provider, Descriptor: descriptor, Status: status, @@ -71,13 +71,13 @@ func (s *inMemoryStore[TaskDescriptor]) ListTaskStates(ctx context.Context, prov return ret, nil } -func (s *inMemoryStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context, provider string) (*payments.TaskState[TaskDescriptor], error) { +func (s *inMemoryStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context, provider string) (*core.TaskState[TaskDescriptor], error) { var ( oldestDate time.Time oldestId string ) for id, status := range s.statuses { - if status != payments.TaskStatusPending { + if status != core.TaskStatusPending { continue } if oldestDate.IsZero() || s.created[id].Before(oldestDate) { @@ -92,25 +92,25 @@ func (s *inMemoryStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Contex descriptorStr := strings.Split(oldestId, "/")[1] var descriptor TaskDescriptor - payments.DescriptorFromID(descriptorStr, &descriptor) + core.DescriptorFromID(descriptorStr, &descriptor) - return &payments.TaskState[TaskDescriptor]{ + return &core.TaskState[TaskDescriptor]{ Provider: provider, Descriptor: descriptor, - Status: payments.TaskStatusPending, + Status: core.TaskStatusPending, State: nil, CreatedAt: s.created[oldestId], }, nil } -func (s *inMemoryStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Context, provider string, taskStatus payments.TaskStatus) ([]payments.TaskState[TaskDescriptor], error) { +func (s *inMemoryStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Context, provider string, taskStatus core.TaskStatus) ([]core.TaskState[TaskDescriptor], error) { all, err := s.ListTaskStates(ctx, provider) if err != nil { return nil, err } - ret := make([]payments.TaskState[TaskDescriptor], 0) + ret := make([]core.TaskState[TaskDescriptor], 0) for _, v := range all { if v.Status != taskStatus { continue @@ -121,13 +121,13 @@ func (s *inMemoryStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Conte return ret, nil } -func (s *inMemoryStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, taskErr string) (*payments.TaskState[TaskDescriptor], error) { +func (s *inMemoryStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status core.TaskStatus, taskErr string) (*core.TaskState[TaskDescriptor], error) { err := s.UpdateTaskStatus(ctx, provider, descriptor, status, taskErr) if err != nil { return nil, err } - return &payments.TaskState[TaskDescriptor]{ + return &core.TaskState[TaskDescriptor]{ Provider: provider, Descriptor: descriptor, Status: status, @@ -137,8 +137,8 @@ func (s *inMemoryStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Cont }, nil } -func (s *inMemoryStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, err string) error { - taskId := payments.IDFromDescriptor(descriptor) +func (s *inMemoryStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status core.TaskStatus, err string) error { + taskId := core.IDFromDescriptor(descriptor) key := fmt.Sprintf("%s/%s", provider, taskId) s.statuses[key] = status s.errors[key] = err @@ -148,8 +148,8 @@ func (s *inMemoryStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, pr return nil } -func (s *inMemoryStore[TaskDescriptor]) Result(provider string, descriptor payments.TaskDescriptor) (payments.TaskStatus, string, bool) { - taskId := payments.IDFromDescriptor(descriptor) +func (s *inMemoryStore[TaskDescriptor]) Result(provider string, descriptor core.TaskDescriptor) (core.TaskStatus, string, bool) { + taskId := core.IDFromDescriptor(descriptor) key := fmt.Sprintf("%s/%s", provider, taskId) status, ok := s.statuses[key] if !ok { @@ -158,9 +158,9 @@ func (s *inMemoryStore[TaskDescriptor]) Result(provider string, descriptor payme return status, s.errors[key], true } -func NewInMemoryStore[TaskDescriptor payments.TaskDescriptor]() *inMemoryStore[TaskDescriptor] { +func NewInMemoryStore[TaskDescriptor core.TaskDescriptor]() *inMemoryStore[TaskDescriptor] { return &inMemoryStore[TaskDescriptor]{ - statuses: make(map[string]payments.TaskStatus), + statuses: make(map[string]core.TaskStatus), errors: make(map[string]string), created: make(map[string]time.Time), } @@ -168,12 +168,12 @@ func NewInMemoryStore[TaskDescriptor payments.TaskDescriptor]() *inMemoryStore[T var _ Store[struct{}] = &inMemoryStore[struct{}]{} -type mongoDBStore[TaskDescriptor payments.TaskDescriptor] struct { +type mongoDBStore[TaskDescriptor core.TaskDescriptor] struct { db *mongo.Database } -func (m *mongoDBStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) { - ret := m.db.Collection(payments.TasksCollection).FindOne(ctx, map[string]any{ +func (m *mongoDBStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*core.TaskState[TaskDescriptor], error) { + ret := m.db.Collection(core.TasksCollection).FindOne(ctx, map[string]any{ "provider": provider, "descriptor": descriptor, }) @@ -183,7 +183,7 @@ func (m *mongoDBStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provid } return nil, ret.Err() } - ts := payments.TaskState[TaskDescriptor]{} + ts := core.TaskState[TaskDescriptor]{} err := ret.Decode(&ts) if err != nil { return nil, err @@ -192,10 +192,10 @@ func (m *mongoDBStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provid return &ts, nil } -func (m *mongoDBStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context, provider string) (*payments.TaskState[TaskDescriptor], error) { - ret := m.db.Collection(payments.TasksCollection).FindOne(ctx, map[string]any{ +func (m *mongoDBStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context, provider string) (*core.TaskState[TaskDescriptor], error) { + ret := m.db.Collection(core.TasksCollection).FindOne(ctx, map[string]any{ "provider": provider, - "status": payments.TaskStatusPending, + "status": core.TaskStatusPending, }, options.FindOne().SetSort(bson.M{"createdAt": 1})) if ret.Err() != nil { if ret.Err() == mongo.ErrNoDocuments { @@ -203,7 +203,7 @@ func (m *mongoDBStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context } return nil, ret.Err() } - ps := &payments.TaskState[TaskDescriptor]{} + ps := &core.TaskState[TaskDescriptor]{} err := ret.Decode(ps) if err != nil { return nil, err @@ -212,8 +212,8 @@ func (m *mongoDBStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context return ps, nil } -func (m *mongoDBStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, taskErr string) error { - _, err := m.db.Collection(payments.TasksCollection).UpdateOne(ctx, map[string]any{ +func (m *mongoDBStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status core.TaskStatus, taskErr string) error { + _, err := m.db.Collection(core.TasksCollection).UpdateOne(ctx, map[string]any{ "provider": provider, "descriptor": descriptor, }, map[string]any{ @@ -228,8 +228,8 @@ func (m *mongoDBStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, pro return err } -func (m *mongoDBStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, taskErr string) (*payments.TaskState[TaskDescriptor], error) { - ret := m.db.Collection(payments.TasksCollection).FindOneAndUpdate(ctx, map[string]any{ +func (m *mongoDBStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status core.TaskStatus, taskErr string) (*core.TaskState[TaskDescriptor], error) { + ret := m.db.Collection(core.TasksCollection).FindOneAndUpdate(ctx, map[string]any{ "provider": provider, "descriptor": descriptor, }, map[string]any{ @@ -244,7 +244,7 @@ func (m *mongoDBStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Conte if ret.Err() != nil { return nil, errors.Wrap(ret.Err(), "retrieving task") } - ps := &payments.TaskState[TaskDescriptor]{} + ps := &core.TaskState[TaskDescriptor]{} err := ret.Decode(ps) if err != nil { return nil, errors.Wrap(err, "decoding task state") @@ -252,8 +252,8 @@ func (m *mongoDBStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Conte return ps, nil } -func (m *mongoDBStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Context, provider string, status payments.TaskStatus) ([]payments.TaskState[TaskDescriptor], error) { - cursor, err := m.db.Collection(payments.TasksCollection).Find(ctx, map[string]any{ +func (m *mongoDBStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Context, provider string, status core.TaskStatus) ([]core.TaskState[TaskDescriptor], error) { + cursor, err := m.db.Collection(core.TasksCollection).Find(ctx, map[string]any{ "provider": provider, "status": status, }) @@ -263,7 +263,7 @@ func (m *mongoDBStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Contex if err != nil { return nil, err } - ret := make([]payments.TaskState[TaskDescriptor], 0) + ret := make([]core.TaskState[TaskDescriptor], 0) err = cursor.All(ctx, &ret) if err != nil { return nil, err @@ -272,8 +272,8 @@ func (m *mongoDBStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Contex return ret, nil } -func (m *mongoDBStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provider string) ([]payments.TaskState[TaskDescriptor], error) { - cursor, err := m.db.Collection(payments.TasksCollection).Find(ctx, map[string]any{ +func (m *mongoDBStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provider string) ([]core.TaskState[TaskDescriptor], error) { + cursor, err := m.db.Collection(core.TasksCollection).Find(ctx, map[string]any{ "provider": provider, }) if err != nil { @@ -282,7 +282,7 @@ func (m *mongoDBStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provi if err != nil { return nil, err } - ret := make([]payments.TaskState[TaskDescriptor], 0) + ret := make([]core.TaskState[TaskDescriptor], 0) err = cursor.All(ctx, &ret) if err != nil { return nil, err @@ -293,6 +293,6 @@ func (m *mongoDBStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provi var _ Store[struct{}] = &mongoDBStore[struct{}]{} -func NewMongoDBStore[TaskDescriptor payments.TaskDescriptor](db *mongo.Database) *mongoDBStore[TaskDescriptor] { +func NewMongoDBStore[TaskDescriptor core.TaskDescriptor](db *mongo.Database) *mongoDBStore[TaskDescriptor] { return &mongoDBStore[TaskDescriptor]{db: db} } diff --git a/pkg/bridge/writeonly/storage.go b/pkg/bridge/writeonly/storage.go index 05e4d413d..870e055a1 100644 --- a/pkg/bridge/writeonly/storage.go +++ b/pkg/bridge/writeonly/storage.go @@ -4,7 +4,7 @@ import ( "context" "github.com/iancoleman/strcase" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/core" "go.mongodb.org/mongo-driver/mongo" ) @@ -32,7 +32,7 @@ func Write[T any](ctx context.Context, storage Storage, items ...T) error { type MongoDBStorage struct { db *mongo.Database provider string - taskDescriptor payments.TaskDescriptor + taskDescriptor core.TaskDescriptor } func (m *MongoDBStorage) Write(ctx context.Context, items ...any) error { @@ -40,7 +40,7 @@ func (m *MongoDBStorage) Write(ctx context.Context, items ...any) error { for _, i := range items { toSave = append(toSave, Item{ Provider: m.provider, - TaskId: payments.IDFromDescriptor(m.taskDescriptor), + TaskId: core.IDFromDescriptor(m.taskDescriptor), Data: i, }) } @@ -53,7 +53,7 @@ func (m *MongoDBStorage) Write(ctx context.Context, items ...any) error { return nil } -func NewMongoDBStorage(db *mongo.Database, provider string, descriptor payments.TaskDescriptor) *MongoDBStorage { +func NewMongoDBStorage(db *mongo.Database, provider string, descriptor core.TaskDescriptor) *MongoDBStorage { return &MongoDBStorage{ db: db, provider: provider, diff --git a/pkg/collections.go b/pkg/core/collections.go similarity index 86% rename from pkg/collections.go rename to pkg/core/collections.go index d634b3e23..a92544670 100644 --- a/pkg/collections.go +++ b/pkg/core/collections.go @@ -1,4 +1,4 @@ -package payments +package core const Collection = "Payments" const ConnectorsCollection = "Connectors" diff --git a/pkg/config.go b/pkg/core/config.go similarity index 90% rename from pkg/config.go rename to pkg/core/config.go index 6e095ab52..5dc40ea42 100644 --- a/pkg/config.go +++ b/pkg/core/config.go @@ -1,4 +1,4 @@ -package payments +package core type ConnectorConfigObject interface { Validate() error diff --git a/pkg/connector.go b/pkg/core/connector.go similarity index 92% rename from pkg/connector.go rename to pkg/core/connector.go index 71b33fc75..df93f4557 100644 --- a/pkg/connector.go +++ b/pkg/core/connector.go @@ -1,4 +1,4 @@ -package payments +package core type Connector[T ConnectorConfigObject] struct { Provider string `json:"provider" bson:"provider"` diff --git a/pkg/payment.go b/pkg/core/payment.go similarity index 99% rename from pkg/payment.go rename to pkg/core/payment.go index e983b66f9..f43acf057 100644 --- a/pkg/payment.go +++ b/pkg/core/payment.go @@ -1,4 +1,4 @@ -package payments +package core import ( "encoding/base64" diff --git a/pkg/payment_test.go b/pkg/core/payment_test.go similarity index 97% rename from pkg/payment_test.go rename to pkg/core/payment_test.go index 75fd19d8d..de6348170 100644 --- a/pkg/payment_test.go +++ b/pkg/core/payment_test.go @@ -1,4 +1,4 @@ -package payments +package core import ( "testing" diff --git a/pkg/task.go b/pkg/core/task.go similarity index 98% rename from pkg/task.go rename to pkg/core/task.go index 3ba080a51..5a60e96e3 100644 --- a/pkg/task.go +++ b/pkg/core/task.go @@ -1,4 +1,4 @@ -package payments +package core import ( "encoding/base64" diff --git a/pkg/database/indexes.go b/pkg/database/indexes.go index 82836d019..865d0157a 100644 --- a/pkg/database/indexes.go +++ b/pkg/database/indexes.go @@ -5,14 +5,14 @@ import ( "fmt" "reflect" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/pkg/core" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/x/bsonx" ) var indexes = map[string][]mongo.IndexModel{ - payments.Collection: { + core.Collection: { { Keys: bsonx.Doc{ bsonx.Elem{ From ba0c92c3b651511c334e3824fce8a253d8541980 Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Fri, 26 Aug 2022 10:00:58 +0200 Subject: [PATCH 02/12] chore: bump go-libs --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index 64ca92ccd..b882cbafb 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/gibson042/canonicaljson-go v1.0.3 github.com/gorilla/mux v1.8.0 github.com/iancoleman/strcase v0.2.0 - github.com/numary/go-libs v0.0.0-20220801164020-fc3e3280ca13 + github.com/numary/go-libs v0.0.0-20220826075524-cb04b561a84c github.com/numary/go-libs/sharedotlp v0.0.0-20220801164020-fc3e3280ca13 github.com/ory/dockertest v3.3.5+incompatible github.com/pborman/uuid v1.2.1 diff --git a/go.sum b/go.sum index fdcb71dfe..e3cc66cc4 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/numary/go-libs v0.0.0-20220801164020-fc3e3280ca13 h1:NNdscRfbbhvfPdAreWRf5cUmpw7XzOf8WOMERbwpLVc= -github.com/numary/go-libs v0.0.0-20220801164020-fc3e3280ca13/go.mod h1:Jsfy6yKwxVaBFFuFL9IVYbMKvepElGQYVoWMhFdK0l0= +github.com/numary/go-libs v0.0.0-20220826075524-cb04b561a84c h1:lJKCT+eWBtLCOcH9wN/osukmmomgUSJYbHEoxLW6UNU= +github.com/numary/go-libs v0.0.0-20220826075524-cb04b561a84c/go.mod h1:/nDz6oCcZUlcH4LsD4mZIMiI2uH8pFBVJJ9Slzz2rTk= github.com/numary/go-libs/sharedotlp v0.0.0-20220801164020-fc3e3280ca13 h1:axI+XCZ03xg1Pym3MLwvT2Xw4wSZeCuxRbm6lIWheT8= github.com/numary/go-libs/sharedotlp v0.0.0-20220801164020-fc3e3280ca13/go.mod h1:AZTmSrMGpNL9GzacAKwYsbSs+1U2ItR9i074hNILq6o= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= From f522070a5f8a728ceb7427a556d6076c26fa2240 Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Fri, 26 Aug 2022 11:56:59 +0200 Subject: [PATCH 03/12] chore: bump go-libs --- go.mod | 2 +- go.sum | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/go.mod b/go.mod index b882cbafb..971904ff1 100644 --- a/go.mod +++ b/go.mod @@ -10,7 +10,7 @@ require ( github.com/gibson042/canonicaljson-go v1.0.3 github.com/gorilla/mux v1.8.0 github.com/iancoleman/strcase v0.2.0 - github.com/numary/go-libs v0.0.0-20220826075524-cb04b561a84c + github.com/numary/go-libs v1.0.0 github.com/numary/go-libs/sharedotlp v0.0.0-20220801164020-fc3e3280ca13 github.com/ory/dockertest v3.3.5+incompatible github.com/pborman/uuid v1.2.1 diff --git a/go.sum b/go.sum index e3cc66cc4..2a701d0b3 100644 --- a/go.sum +++ b/go.sum @@ -349,8 +349,8 @@ github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt github.com/mrunalp/fileutils v0.5.0/go.mod h1:M1WthSahJixYnrXQl/DFQuteStB1weuxD2QJNHXfbSQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= -github.com/numary/go-libs v0.0.0-20220826075524-cb04b561a84c h1:lJKCT+eWBtLCOcH9wN/osukmmomgUSJYbHEoxLW6UNU= -github.com/numary/go-libs v0.0.0-20220826075524-cb04b561a84c/go.mod h1:/nDz6oCcZUlcH4LsD4mZIMiI2uH8pFBVJJ9Slzz2rTk= +github.com/numary/go-libs v1.0.0 h1:IsYJ/xFsYt+UyPCQR+/ZPW9T2fcO+V4Xv8B9g2os8Ac= +github.com/numary/go-libs v1.0.0/go.mod h1:u9XNKBrHJSCwu13s85GkEg4TWfRm2CF3fknav4TTLn4= github.com/numary/go-libs/sharedotlp v0.0.0-20220801164020-fc3e3280ca13 h1:axI+XCZ03xg1Pym3MLwvT2Xw4wSZeCuxRbm6lIWheT8= github.com/numary/go-libs/sharedotlp v0.0.0-20220801164020-fc3e3280ca13/go.mod h1:AZTmSrMGpNL9GzacAKwYsbSs+1U2ItR9i074hNILq6o= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= From ecbb8d6fa33d435595aed6a309df4571648face1 Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Fri, 26 Aug 2022 12:04:54 +0200 Subject: [PATCH 04/12] chore: add dependabot --- .github/dependabot.yml | 11 +++++++++++ 1 file changed, 11 insertions(+) create mode 100644 .github/dependabot.yml diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 000000000..83a8ab439 --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,11 @@ +version: 2 +updates: + + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" + - package-ecosystem: "gomod" + directory: "/" + schedule: + interval: "weekly" From b23fb28f48d3bd21a7088ffb6e2fd2aeb4b628ad Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Fri, 26 Aug 2022 12:07:40 +0200 Subject: [PATCH 05/12] chore: move message models --- pkg/bridge/ingestion/ingester.go | 23 +++++++---------------- pkg/bridge/ingestion/message.go | 18 ++++++++++++++++++ 2 files changed, 25 insertions(+), 16 deletions(-) create mode 100644 pkg/bridge/ingestion/message.go diff --git a/pkg/bridge/ingestion/ingester.go b/pkg/bridge/ingestion/ingester.go index 6418df591..d6c95c887 100644 --- a/pkg/bridge/ingestion/ingester.go +++ b/pkg/bridge/ingestion/ingester.go @@ -15,17 +15,6 @@ import ( "go.mongodb.org/mongo-driver/mongo/options" ) -const ( - TopicPayments = "payments" - EventPaymentsSavedPayment = "SAVED_PAYMENT" -) - -type EventPaymentsMessage struct { - Date time.Time `json:"date"` - Type string `json:"type"` - Payload core.ComputedPayment `json:"payload"` -} - type BatchElement struct { Referenced core.Referenced Payment *core.Data @@ -221,11 +210,13 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a if i.publisher != nil { for _, e := range allPayments { - err = i.publisher.Publish(ctx, TopicPayments, EventPaymentsMessage{ - Date: time.Now().UTC(), - Type: EventPaymentsSavedPayment, - Payload: e.Computed(), - }) + err = i.publisher.Publish(ctx, + TopicPayments, + EventPaymentsMessage{ + Date: time.Now().UTC(), + Type: EventPaymentsSavedPayment, + Payload: e.Computed(), + }) if err != nil { i.logger.Errorf("Error publishing payment: %s", err) } diff --git a/pkg/bridge/ingestion/message.go b/pkg/bridge/ingestion/message.go new file mode 100644 index 000000000..e712d074a --- /dev/null +++ b/pkg/bridge/ingestion/message.go @@ -0,0 +1,18 @@ +package ingestion + +import ( + "time" + + "github.com/numary/payments/pkg/core" +) + +const ( + TopicPayments = "payments" + EventPaymentsSavedPayment = "SAVED_PAYMENT" +) + +type EventPaymentsMessage struct { + Date time.Time `json:"date"` + Type string `json:"type"` + Payload core.ComputedPayment `json:"payload"` +} From 9782f031d37b3afa4122a975224ce6117ea0187e Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Fri, 26 Aug 2022 14:27:33 +0200 Subject: [PATCH 06/12] chore: new method to create kafka message --- pkg/bridge/ingestion/ingester.go | 11 +++-------- pkg/bridge/ingestion/message.go | 21 ++++++++++++++++----- 2 files changed, 19 insertions(+), 13 deletions(-) diff --git a/pkg/bridge/ingestion/ingester.go b/pkg/bridge/ingestion/ingester.go index d6c95c887..40c2435e1 100644 --- a/pkg/bridge/ingestion/ingester.go +++ b/pkg/bridge/ingestion/ingester.go @@ -210,14 +210,9 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a if i.publisher != nil { for _, e := range allPayments { - err = i.publisher.Publish(ctx, - TopicPayments, - EventPaymentsMessage{ - Date: time.Now().UTC(), - Type: EventPaymentsSavedPayment, - Payload: e.Computed(), - }) - if err != nil { + if err = i.publisher.Publish(ctx, TopicPayments, + NewEventPaymentsSavedPayment(SavedPayment(e.Computed())), + ); err != nil { i.logger.Errorf("Error publishing payment: %s", err) } } diff --git a/pkg/bridge/ingestion/message.go b/pkg/bridge/ingestion/message.go index e712d074a..dcdc8501c 100644 --- a/pkg/bridge/ingestion/message.go +++ b/pkg/bridge/ingestion/message.go @@ -7,12 +7,23 @@ import ( ) const ( - TopicPayments = "payments" + TopicPayments = "payments" + EventPaymentsSavedPayment = "SAVED_PAYMENT" ) -type EventPaymentsMessage struct { - Date time.Time `json:"date"` - Type string `json:"type"` - Payload core.ComputedPayment `json:"payload"` +type EventPaymentsMessage[P any] struct { + Date time.Time `json:"date"` + Type string `json:"type"` + Payload P `json:"payload"` +} + +type SavedPayment core.ComputedPayment + +func NewEventPaymentsSavedPayment(payload SavedPayment) EventPaymentsMessage[SavedPayment] { + return EventPaymentsMessage[SavedPayment]{ + Date: time.Now().UTC(), + Type: EventPaymentsSavedPayment, + Payload: payload, + } } From 2945ea5f287c452b569fea7778318fc97ce62b5e Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Fri, 26 Aug 2022 14:54:52 +0200 Subject: [PATCH 07/12] chore: move kafka messages to core pkg --- pkg/bridge/ingestion/ingester.go | 4 ++-- pkg/{bridge/ingestion => core}/message.go | 6 ++---- 2 files changed, 4 insertions(+), 6 deletions(-) rename pkg/{bridge/ingestion => core}/message.go (83%) diff --git a/pkg/bridge/ingestion/ingester.go b/pkg/bridge/ingestion/ingester.go index 40c2435e1..9a52f24a0 100644 --- a/pkg/bridge/ingestion/ingester.go +++ b/pkg/bridge/ingestion/ingester.go @@ -210,8 +210,8 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a if i.publisher != nil { for _, e := range allPayments { - if err = i.publisher.Publish(ctx, TopicPayments, - NewEventPaymentsSavedPayment(SavedPayment(e.Computed())), + if err = i.publisher.Publish(ctx, core.TopicPayments, + core.NewEventPaymentsSavedPayment(core.SavedPayment(e.Computed())), ); err != nil { i.logger.Errorf("Error publishing payment: %s", err) } diff --git a/pkg/bridge/ingestion/message.go b/pkg/core/message.go similarity index 83% rename from pkg/bridge/ingestion/message.go rename to pkg/core/message.go index dcdc8501c..2da3ed698 100644 --- a/pkg/bridge/ingestion/message.go +++ b/pkg/core/message.go @@ -1,9 +1,7 @@ -package ingestion +package core import ( "time" - - "github.com/numary/payments/pkg/core" ) const ( @@ -18,7 +16,7 @@ type EventPaymentsMessage[P any] struct { Payload P `json:"payload"` } -type SavedPayment core.ComputedPayment +type SavedPayment ComputedPayment func NewEventPaymentsSavedPayment(payload SavedPayment) EventPaymentsMessage[SavedPayment] { return EventPaymentsMessage[SavedPayment]{ From 4aa232cae5b313ebf88b1cad6719b43cdff477c8 Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Fri, 26 Aug 2022 18:02:39 +0200 Subject: [PATCH 08/12] chore: update messages --- pkg/core/message.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/core/message.go b/pkg/core/message.go index 2da3ed698..8a2e312ce 100644 --- a/pkg/core/message.go +++ b/pkg/core/message.go @@ -7,13 +7,18 @@ import ( const ( TopicPayments = "payments" - EventPaymentsSavedPayment = "SAVED_PAYMENT" + EventVersion = "v1" + EventApp = "payments" + + EventPaymentsTypeSavedPayment = "SAVED_PAYMENT" ) -type EventPaymentsMessage[P any] struct { +type EventPaymentsMessage[T any] struct { Date time.Time `json:"date"` + App string `json:"app"` + Version string `json:"version"` Type string `json:"type"` - Payload P `json:"payload"` + Payload T `json:"payload"` } type SavedPayment ComputedPayment @@ -21,7 +26,9 @@ type SavedPayment ComputedPayment func NewEventPaymentsSavedPayment(payload SavedPayment) EventPaymentsMessage[SavedPayment] { return EventPaymentsMessage[SavedPayment]{ Date: time.Now().UTC(), - Type: EventPaymentsSavedPayment, + App: EventApp, + Version: EventVersion, + Type: EventPaymentsTypeSavedPayment, Payload: payload, } } From bfc6f836d17d9212703b4513938d1570c0a28a0f Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Mon, 29 Aug 2022 09:35:21 +0200 Subject: [PATCH 09/12] chore: revert moving files --- pkg/api/http.go | 16 ++-- pkg/bridge/cdi/module.go | 10 +-- pkg/bridge/connectors/example/example.go | 6 +- pkg/bridge/connectors/stripe/translate.go | 98 ++++++++++---------- pkg/bridge/http/api.go | 18 ++-- pkg/bridge/ingestion/ingester.go | 42 ++++----- pkg/bridge/ingestion/ingester_test.go | 12 +-- pkg/bridge/integration/connector.go | 10 +-- pkg/bridge/integration/loader.go | 12 +-- pkg/bridge/integration/manager.go | 18 ++-- pkg/bridge/integration/manager_test.go | 30 +++---- pkg/bridge/integration/store.go | 26 +++--- pkg/bridge/task/context.go | 8 +- pkg/bridge/task/scheduler.go | 44 ++++----- pkg/bridge/task/scheduler_test.go | 20 ++--- pkg/bridge/task/store.go | 104 +++++++++++----------- pkg/bridge/writeonly/storage.go | 8 +- pkg/{core => }/collections.go | 2 +- pkg/{core => }/config.go | 2 +- pkg/{core => }/connector.go | 2 +- pkg/database/indexes.go | 4 +- pkg/{core => }/message.go | 2 +- pkg/{core => }/payment.go | 2 +- pkg/{core => }/payment_test.go | 2 +- pkg/{core => }/task.go | 2 +- 25 files changed, 250 insertions(+), 250 deletions(-) rename pkg/{core => }/collections.go (86%) rename pkg/{core => }/config.go (90%) rename pkg/{core => }/connector.go (92%) rename pkg/{core => }/message.go (97%) rename pkg/{core => }/payment.go (99%) rename pkg/{core => }/payment_test.go (97%) rename pkg/{core => }/task.go (98%) diff --git a/pkg/api/http.go b/pkg/api/http.go index 45d34fc0e..814a9c12f 100644 --- a/pkg/api/http.go +++ b/pkg/api/http.go @@ -8,7 +8,7 @@ import ( "github.com/gorilla/mux" "github.com/numary/go-libs/sharedapi" "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/core" + payments "github.com/numary/payments/pkg" . "github.com/numary/payments/pkg/http" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" @@ -102,21 +102,21 @@ func ListPaymentsHandler(db *mongo.Database) http.HandlerFunc { }) } - cursor, err := db.Collection(core.Collection).Aggregate(r.Context(), pipeline) + cursor, err := db.Collection(payments.Collection).Aggregate(r.Context(), pipeline) if err != nil { handleServerError(w, r, err) return } defer cursor.Close(r.Context()) - ret := make([]core.Payment, 0) + ret := make([]payments.Payment, 0) err = cursor.All(r.Context(), &ret) if err != nil { handleServerError(w, r, err) return } - err = json.NewEncoder(w).Encode(sharedapi.BaseResponse[[]core.Payment]{ + err = json.NewEncoder(w).Encode(sharedapi.BaseResponse[[]payments.Payment]{ Data: &ret, }) if err != nil { @@ -131,13 +131,13 @@ func ReadPaymentHandler(db *mongo.Database) http.HandlerFunc { paymentId := mux.Vars(r)["paymentId"] - identifier, err := core.IdentifierFromString(paymentId) + identifier, err := payments.IdentifierFromString(paymentId) if err != nil { w.WriteHeader(http.StatusNotFound) return } - ret := db.Collection(core.Collection).FindOne(r.Context(), identifier) + ret := db.Collection(payments.Collection).FindOne(r.Context(), identifier) if ret.Err() != nil { if ret.Err() == mongo.ErrNoDocuments { w.WriteHeader(http.StatusNotFound) @@ -147,7 +147,7 @@ func ReadPaymentHandler(db *mongo.Database) http.HandlerFunc { return } type Object struct { - Items []core.Payment `bson:"items"` + Items []payments.Payment `bson:"items"` } ob := &Object{} err = ret.Decode(ob) @@ -156,7 +156,7 @@ func ReadPaymentHandler(db *mongo.Database) http.HandlerFunc { return } - err = json.NewEncoder(w).Encode(sharedapi.BaseResponse[core.Payment]{ + err = json.NewEncoder(w).Encode(sharedapi.BaseResponse[payments.Payment]{ Data: &ob.Items[0], }) if err != nil { diff --git a/pkg/bridge/cdi/module.go b/pkg/bridge/cdi/module.go index 87c5b8636..369821910 100644 --- a/pkg/bridge/cdi/module.go +++ b/pkg/bridge/cdi/module.go @@ -6,12 +6,12 @@ import ( "github.com/numary/go-libs/sharedlogging" "github.com/numary/go-libs/sharedpublish" + payments "github.com/numary/payments/pkg" bridgeHttp "github.com/numary/payments/pkg/bridge/http" "github.com/numary/payments/pkg/bridge/ingestion" "github.com/numary/payments/pkg/bridge/integration" "github.com/numary/payments/pkg/bridge/task" "github.com/numary/payments/pkg/bridge/writeonly" - "github.com/numary/payments/pkg/core" "go.mongodb.org/mongo-driver/mongo" "go.uber.org/dig" "go.uber.org/fx" @@ -23,8 +23,8 @@ type ConnectorHandler struct { } func ConnectorModule[ - ConnectorConfig core.ConnectorConfigObject, - TaskDescriptor core.TaskDescriptor, + ConnectorConfig payments.ConnectorConfigObject, + TaskDescriptor payments.TaskDescriptor, ](useScopes bool, loader integration.Loader[ConnectorConfig, TaskDescriptor]) fx.Option { return fx.Options( fx.Provide(func(db *mongo.Database, publisher sharedpublish.Publisher) *integration.ConnectorManager[ConnectorConfig, TaskDescriptor] { @@ -32,11 +32,11 @@ func ConnectorModule[ taskStore := task.NewMongoDBStore[TaskDescriptor](db) logger := sharedlogging.GetLogger(context.Background()) schedulerFactory := integration.TaskSchedulerFactoryFn[TaskDescriptor](func(resolver task.Resolver[TaskDescriptor], maxTasks int) *task.DefaultTaskScheduler[TaskDescriptor] { - return task.NewDefaultScheduler[TaskDescriptor](loader.Name(), logger, taskStore, task.ContainerFactoryFn(func(ctx context.Context, descriptor core.TaskDescriptor) (*dig.Container, error) { + return task.NewDefaultScheduler[TaskDescriptor](loader.Name(), logger, taskStore, task.ContainerFactoryFn(func(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) { container := dig.New() if err := container.Provide(func() ingestion.Ingester { return ingestion.NewDefaultIngester(loader.Name(), descriptor, db, logger.WithFields(map[string]interface{}{ - "task-id": core.IDFromDescriptor(descriptor), + "task-id": payments.IDFromDescriptor(descriptor), }), publisher) }); err != nil { return nil, err diff --git a/pkg/bridge/connectors/example/example.go b/pkg/bridge/connectors/example/example.go index d6e842a7c..1329279ad 100644 --- a/pkg/bridge/connectors/example/example.go +++ b/pkg/bridge/connectors/example/example.go @@ -9,10 +9,10 @@ import ( "time" "github.com/numary/go-libs/sharedlogging" + payments "github.com/numary/payments/pkg" "github.com/numary/payments/pkg/bridge/ingestion" "github.com/numary/payments/pkg/bridge/integration" "github.com/numary/payments/pkg/bridge/task" - "github.com/numary/payments/pkg/core" ) type ( @@ -69,7 +69,7 @@ var Loader = integration.NewLoaderBuilder[Config, TaskDescriptor]("example"). } type JsonPayment struct { - core.Data + payments.Data Reference string `json:"reference"` Type string `json:"type"` } @@ -82,7 +82,7 @@ var Loader = integration.NewLoaderBuilder[Config, TaskDescriptor]("example"). return ingester.Ingest(ctx, ingestion.Batch{ { - Referenced: core.Referenced{ + Referenced: payments.Referenced{ Reference: jsonPayment.Reference, Type: jsonPayment.Type, }, diff --git a/pkg/bridge/connectors/stripe/translate.go b/pkg/bridge/connectors/stripe/translate.go index 03aa9f177..6a994abb6 100644 --- a/pkg/bridge/connectors/stripe/translate.go +++ b/pkg/bridge/connectors/stripe/translate.go @@ -7,8 +7,8 @@ import ( "time" "github.com/davecgh/go-spew/spew" + payments "github.com/numary/payments/pkg" "github.com/numary/payments/pkg/bridge/ingestion" - "github.com/numary/payments/pkg/core" "github.com/stripe/stripe-go/v72" ) @@ -125,9 +125,9 @@ var currencies = map[string]currency{ func CreateBatchElement(bt *stripe.BalanceTransaction, forward bool) (ingestion.BatchElement, bool) { var ( - reference core.Referenced - paymentData *core.Data - adjustment *core.Adjustment + reference payments.Referenced + paymentData *payments.Data + adjustment *payments.Adjustment ) defer func() { // DEBUG @@ -155,99 +155,99 @@ func CreateBatchElement(bt *stripe.BalanceTransaction, forward bool) (ingestion. return fmt.Sprintf("%s/%d", asset, def.decimals) } - convertPayoutStatus := func() (status core.Status) { + convertPayoutStatus := func() (status payments.Status) { switch bt.Source.Payout.Status { case stripe.PayoutStatusCanceled: - status = core.StatusCancelled + status = payments.StatusCancelled case stripe.PayoutStatusFailed: - status = core.StatusFailed + status = payments.StatusFailed case stripe.PayoutStatusInTransit, stripe.PayoutStatusPending: - status = core.StatusPending + status = payments.StatusPending case stripe.PayoutStatusPaid: - status = core.StatusSucceeded + status = payments.StatusSucceeded } return } switch bt.Type { case "charge": - reference = core.Referenced{ + reference = payments.Referenced{ Reference: bt.Source.Charge.ID, - Type: core.TypePayIn, + Type: payments.TypePayIn, } - paymentData = &core.Data{ - Status: core.StatusSucceeded, + paymentData = &payments.Data{ + Status: payments.StatusSucceeded, InitialAmount: bt.Source.Charge.Amount, Asset: formatAsset(bt.Source.Charge.Currency), Raw: bt, - Scheme: core.Scheme(bt.Source.Charge.PaymentMethodDetails.Card.Brand), + Scheme: payments.Scheme(bt.Source.Charge.PaymentMethodDetails.Card.Brand), CreatedAt: time.Unix(bt.Created, 0), } case "payout": - reference = core.Referenced{ + reference = payments.Referenced{ Reference: bt.Source.Payout.ID, - Type: core.TypePayout, + Type: payments.TypePayout, } - paymentData = &core.Data{ + paymentData = &payments.Data{ Status: convertPayoutStatus(), InitialAmount: bt.Source.Payout.Amount, Raw: bt, Asset: formatAsset(bt.Source.Payout.Currency), - Scheme: func() core.Scheme { + Scheme: func() payments.Scheme { switch bt.Source.Payout.Type { case "bank_account": - return core.SchemeSepaCredit + return payments.SchemeSepaCredit case "card": - return core.Scheme(bt.Source.Payout.Card.Brand) + return payments.Scheme(bt.Source.Payout.Card.Brand) } - return core.SchemeUnknown + return payments.SchemeUnknown }(), CreatedAt: time.Unix(bt.Created, 0), } case "transfer": - reference = core.Referenced{ + reference = payments.Referenced{ Reference: bt.Source.Transfer.ID, - Type: core.TypePayout, + Type: payments.TypePayout, } - paymentData = &core.Data{ - Status: core.StatusSucceeded, + paymentData = &payments.Data{ + Status: payments.StatusSucceeded, InitialAmount: bt.Source.Transfer.Amount, Raw: bt, Asset: formatAsset(bt.Source.Transfer.Currency), - Scheme: core.SchemeOther, + Scheme: payments.SchemeOther, CreatedAt: time.Unix(bt.Created, 0), } case "refund": - reference = core.Referenced{ + reference = payments.Referenced{ Reference: bt.Source.Refund.Charge.ID, - Type: core.TypePayIn, + Type: payments.TypePayIn, } - adjustment = &core.Adjustment{ - Status: core.StatusSucceeded, + adjustment = &payments.Adjustment{ + Status: payments.StatusSucceeded, Amount: bt.Amount, Date: time.Unix(bt.Created, 0), Raw: bt, } case "payment": - reference = core.Referenced{ + reference = payments.Referenced{ Reference: bt.Source.Charge.ID, - Type: core.TypePayIn, + Type: payments.TypePayIn, } - paymentData = &core.Data{ - Status: core.StatusSucceeded, + paymentData = &payments.Data{ + Status: payments.StatusSucceeded, InitialAmount: bt.Source.Charge.Amount, Raw: bt, Asset: formatAsset(bt.Source.Charge.Currency), - Scheme: core.SchemeOther, + Scheme: payments.SchemeOther, CreatedAt: time.Unix(bt.Created, 0), } case "payout_cancel": - reference = core.Referenced{ + reference = payments.Referenced{ Reference: bt.Source.Payout.ID, - Type: core.TypePayout, + Type: payments.TypePayout, } - adjustment = &core.Adjustment{ + adjustment = &payments.Adjustment{ Status: convertPayoutStatus(), Amount: 0, Date: time.Unix(bt.Created, 0), @@ -255,11 +255,11 @@ func CreateBatchElement(bt *stripe.BalanceTransaction, forward bool) (ingestion. Absolute: true, } case "payout_failure": - reference = core.Referenced{ + reference = payments.Referenced{ Reference: bt.Source.Payout.ID, - Type: core.TypePayIn, + Type: payments.TypePayIn, } - adjustment = &core.Adjustment{ + adjustment = &payments.Adjustment{ Status: convertPayoutStatus(), Amount: 0, Date: time.Unix(bt.Created, 0), @@ -267,23 +267,23 @@ func CreateBatchElement(bt *stripe.BalanceTransaction, forward bool) (ingestion. Absolute: true, } case "payment_refund": - reference = core.Referenced{ + reference = payments.Referenced{ Reference: bt.Source.Refund.Charge.ID, - Type: core.TypePayIn, + Type: payments.TypePayIn, } - adjustment = &core.Adjustment{ - Status: core.StatusSucceeded, + adjustment = &payments.Adjustment{ + Status: payments.StatusSucceeded, Amount: bt.Amount, Date: time.Unix(bt.Created, 0), Raw: bt, } case "adjustment": - reference = core.Referenced{ + reference = payments.Referenced{ Reference: bt.Source.Dispute.Charge.ID, - Type: core.TypePayIn, + Type: payments.TypePayIn, } - adjustment = &core.Adjustment{ - Status: core.StatusCancelled, + adjustment = &payments.Adjustment{ + Status: payments.StatusCancelled, Amount: bt.Amount, Date: time.Unix(bt.Created, 0), Raw: bt, diff --git a/pkg/bridge/http/api.go b/pkg/bridge/http/api.go index b2240d204..28136dfc5 100644 --- a/pkg/bridge/http/api.go +++ b/pkg/bridge/http/api.go @@ -9,8 +9,8 @@ import ( "github.com/gorilla/mux" "github.com/numary/go-libs/sharedapi" "github.com/numary/go-libs/sharedlogging" + payments "github.com/numary/payments/pkg" "github.com/numary/payments/pkg/bridge/integration" - "github.com/numary/payments/pkg/core" . "github.com/numary/payments/pkg/http" ) @@ -27,7 +27,7 @@ func handleError(w http.ResponseWriter, r *http.Request, err error) { } } -func ReadConfig[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { +func ReadConfig[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { config, err := cm.ReadConfig(r.Context()) @@ -43,7 +43,7 @@ func ReadConfig[Config core.ConnectorConfigObject, Descriptor core.TaskDescripto } } -func ListTasks[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { +func ListTasks[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { tasks, err := cm.ListTasksStates(r.Context()) @@ -59,11 +59,11 @@ func ListTasks[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor } } -func ReadTask[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { +func ReadTask[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { var descriptor Descriptor - core.DescriptorFromID(mux.Vars(r)["taskId"], &descriptor) + payments.DescriptorFromID(mux.Vars(r)["taskId"], &descriptor) tasks, err := cm.ReadTaskState(r.Context(), descriptor) if err != nil { @@ -78,7 +78,7 @@ func ReadTask[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor] } } -func Uninstall[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { +func Uninstall[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { err := cm.Uninstall(r.Context()) if err != nil { @@ -90,7 +90,7 @@ func Uninstall[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor } } -func Install[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { +func Install[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { installed, err := cm.IsInstalled(context.Background()) @@ -122,7 +122,7 @@ func Install[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor]( } } -func Reset[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { +func Reset[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { installed, err := cm.IsInstalled(context.Background()) @@ -145,7 +145,7 @@ func Reset[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor](cm } } -func ConnectorRouter[Config core.ConnectorConfigObject, Descriptor core.TaskDescriptor]( +func ConnectorRouter[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor]( name string, useScopes bool, manager *integration.ConnectorManager[Config, Descriptor], diff --git a/pkg/bridge/ingestion/ingester.go b/pkg/bridge/ingestion/ingester.go index 9a52f24a0..627f754ba 100644 --- a/pkg/bridge/ingestion/ingester.go +++ b/pkg/bridge/ingestion/ingester.go @@ -9,16 +9,16 @@ import ( "github.com/numary/go-libs/sharedlogging" "github.com/numary/go-libs/sharedpublish" - "github.com/numary/payments/pkg/core" + payments "github.com/numary/payments/pkg" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) type BatchElement struct { - Referenced core.Referenced - Payment *core.Data - Adjustment *core.Adjustment + Referenced payments.Referenced + Payment *payments.Data + Adjustment *payments.Adjustment Forward bool } @@ -43,14 +43,14 @@ type defaultIngester struct { db *mongo.Database logger sharedlogging.Logger provider string - descriptor core.TaskDescriptor + descriptor payments.TaskDescriptor publisher sharedpublish.Publisher } -type referenced core.Referenced +type referenced payments.Referenced -func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]core.Payment, error) { - allPayments := make([]core.Payment, 0) +func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]payments.Payment, error) { + allPayments := make([]payments.Payment, 0) for _, elem := range batch { logger := i.logger.WithFields(map[string]any{ "id": referenced(elem.Referenced), @@ -82,13 +82,13 @@ func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]core } case elem.Forward && elem.Payment != nil: update = bson.M{ - "$set": core.Payment{ - Identifier: core.Identifier{ + "$set": payments.Payment{ + Identifier: payments.Identifier{ Referenced: elem.Referenced, Provider: i.provider, }, Data: *elem.Payment, - Adjustments: []core.Adjustment{ + Adjustments: []payments.Adjustment{ { Status: elem.Payment.Status, Amount: elem.Payment.InitialAmount, @@ -113,7 +113,7 @@ func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]core update = bson.M{ "$push": bson.M{ "adjustments": bson.M{ - "$each": []any{core.Adjustment{ + "$each": []any{payments.Adjustment{ Status: elem.Payment.Status, Amount: elem.Payment.InitialAmount, Date: elem.Payment.CreatedAt, @@ -141,9 +141,9 @@ func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]core logger.WithFields(map[string]interface{}{ "update": string(data), }).Debugf("Update payment") - ret := i.db.Collection(core.Collection).FindOneAndUpdate( + ret := i.db.Collection(payments.Collection).FindOneAndUpdate( ctx, - core.Identifier{ + payments.Identifier{ Referenced: elem.Referenced, Provider: i.provider, }, @@ -154,7 +154,7 @@ func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]core logger.Errorf("Error updating payment: %s", ret.Err()) return nil, fmt.Errorf("error updating payment: %s", ret.Err()) } - p := core.Payment{} + p := payments.Payment{} err = ret.Decode(&p) if err != nil { return nil, err @@ -184,7 +184,7 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a }).Debugf("Ingest batch") err := i.db.Client().UseSession(ctx, func(ctx mongo.SessionContext) (err error) { - var allPayments []core.Payment + var allPayments []payments.Payment _, err = ctx.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) { allPayments, err = i.processBatch(ctx, batch) if err != nil { @@ -193,7 +193,7 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a i.logger.Debugf("Update state") - _, err = i.db.Collection(core.TasksCollection).UpdateOne(ctx, map[string]any{ + _, err = i.db.Collection(payments.TasksCollection).UpdateOne(ctx, map[string]any{ "provider": i.provider, "descriptor": i.descriptor, }, map[string]any{ @@ -206,12 +206,12 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a } return nil, nil }) - allPayments = Filter(allPayments, core.Payment.HasInitialValue) + allPayments = Filter(allPayments, payments.Payment.HasInitialValue) if i.publisher != nil { for _, e := range allPayments { - if err = i.publisher.Publish(ctx, core.TopicPayments, - core.NewEventPaymentsSavedPayment(core.SavedPayment(e.Computed())), + if err = i.publisher.Publish(ctx, payments.TopicPayments, + payments.NewEventPaymentsSavedPayment(payments.SavedPayment(e.Computed())), ); err != nil { i.logger.Errorf("Error publishing payment: %s", err) } @@ -238,7 +238,7 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a func NewDefaultIngester( provider string, - descriptor core.TaskDescriptor, + descriptor payments.TaskDescriptor, db *mongo.Database, logger sharedlogging.Logger, publisher sharedpublish.Publisher, diff --git a/pkg/bridge/ingestion/ingester_test.go b/pkg/bridge/ingestion/ingester_test.go index e0a57b0f5..c558da43f 100644 --- a/pkg/bridge/ingestion/ingester_test.go +++ b/pkg/bridge/ingestion/ingester_test.go @@ -6,7 +6,7 @@ import ( "time" "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/core" + payments "github.com/numary/payments/pkg" "github.com/pborman/uuid" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" @@ -34,14 +34,14 @@ func TestIngester(t *testing.T) { err := ingester.Ingest(context.Background(), Batch{ { - Referenced: core.Referenced{ + Referenced: payments.Referenced{ Reference: "p1", - Type: core.TypePayIn, + Type: payments.TypePayIn, }, - Payment: &core.Data{ - Status: core.StatusSucceeded, + Payment: &payments.Data{ + Status: payments.StatusSucceeded, InitialAmount: 100, - Scheme: core.SchemeOther, + Scheme: payments.SchemeOther, Asset: "USD/2", CreatedAt: time.Now(), }, diff --git a/pkg/bridge/integration/connector.go b/pkg/bridge/integration/connector.go index af8b6aa24..bc0931006 100644 --- a/pkg/bridge/integration/connector.go +++ b/pkg/bridge/integration/connector.go @@ -3,12 +3,12 @@ package integration import ( "context" + payments "github.com/numary/payments/pkg" "github.com/numary/payments/pkg/bridge/task" - "github.com/numary/payments/pkg/core" ) // Connector provide entry point to a payment provider -type Connector[TaskDescriptor core.TaskDescriptor] interface { +type Connector[TaskDescriptor payments.TaskDescriptor] interface { // Install is used to start the connector. The implementation if in charge of scheduling all required resources. Install(ctx task.ConnectorContext[TaskDescriptor]) error // Uninstall is used to uninstall the connector. It has to close all related resources opened by the connector. @@ -17,7 +17,7 @@ type Connector[TaskDescriptor core.TaskDescriptor] interface { Resolve(descriptor TaskDescriptor) task.Task } -type ConnectorBuilder[TaskDescriptor core.TaskDescriptor] struct { +type ConnectorBuilder[TaskDescriptor payments.TaskDescriptor] struct { name string uninstall func(ctx context.Context) error resolve func(descriptor TaskDescriptor) task.Task @@ -48,11 +48,11 @@ func (b *ConnectorBuilder[TaskDescriptor]) Build() Connector[TaskDescriptor] { } } -func NewConnectorBuilder[TaskDescriptor core.TaskDescriptor]() *ConnectorBuilder[TaskDescriptor] { +func NewConnectorBuilder[TaskDescriptor payments.TaskDescriptor]() *ConnectorBuilder[TaskDescriptor] { return &ConnectorBuilder[TaskDescriptor]{} } -type BuiltConnector[TaskDescriptor core.TaskDescriptor] struct { +type BuiltConnector[TaskDescriptor payments.TaskDescriptor] struct { name string uninstall func(ctx context.Context) error resolve func(name TaskDescriptor) task.Task diff --git a/pkg/bridge/integration/loader.go b/pkg/bridge/integration/loader.go index e95fccf34..8a6d91f19 100644 --- a/pkg/bridge/integration/loader.go +++ b/pkg/bridge/integration/loader.go @@ -2,10 +2,10 @@ package integration import ( "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/core" + payments "github.com/numary/payments/pkg" ) -type Loader[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.TaskDescriptor] interface { +type Loader[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor] interface { Name() string Load(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor] // ApplyDefaults is used to fill default values of the provided configuration object @@ -16,7 +16,7 @@ type Loader[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.Task AllowTasks() int } -type LoaderBuilder[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.TaskDescriptor] struct { +type LoaderBuilder[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor] struct { loadFunction func(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor] applyDefaults func(t ConnectorConfig) ConnectorConfig name string @@ -47,13 +47,13 @@ func (b *LoaderBuilder[ConnectorConfig, TaskDescriptor]) Build() *BuiltLoader[Co } } -func NewLoaderBuilder[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.TaskDescriptor](name string) *LoaderBuilder[ConnectorConfig, TaskDescriptor] { +func NewLoaderBuilder[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor](name string) *LoaderBuilder[ConnectorConfig, TaskDescriptor] { return &LoaderBuilder[ConnectorConfig, TaskDescriptor]{ name: name, } } -type BuiltLoader[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.TaskDescriptor] struct { +type BuiltLoader[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor] struct { loadFunction func(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor] applyDefaults func(t ConnectorConfig) ConnectorConfig name string @@ -82,4 +82,4 @@ func (b *BuiltLoader[ConnectorConfig, TaskDescriptor]) ApplyDefaults(t Connector return t } -var _ Loader[core.EmptyConnectorConfig, struct{}] = &BuiltLoader[core.EmptyConnectorConfig, struct{}]{} +var _ Loader[payments.EmptyConnectorConfig, struct{}] = &BuiltLoader[payments.EmptyConnectorConfig, struct{}]{} diff --git a/pkg/bridge/integration/manager.go b/pkg/bridge/integration/manager.go index 8605d5dda..3a809b406 100644 --- a/pkg/bridge/integration/manager.go +++ b/pkg/bridge/integration/manager.go @@ -4,8 +4,8 @@ import ( "context" "github.com/numary/go-libs/sharedlogging" + payments "github.com/numary/payments/pkg" "github.com/numary/payments/pkg/bridge/task" - "github.com/numary/payments/pkg/core" "github.com/pkg/errors" ) @@ -17,18 +17,18 @@ var ( ErrAlreadyRunning = errors.New("already running") ) -type TaskSchedulerFactory[TaskDescriptor core.TaskDescriptor] interface { +type TaskSchedulerFactory[TaskDescriptor payments.TaskDescriptor] interface { Make(resolver task.Resolver[TaskDescriptor], maxTasks int) *task.DefaultTaskScheduler[TaskDescriptor] } -type TaskSchedulerFactoryFn[TaskDescriptor core.TaskDescriptor] func(resolver task.Resolver[TaskDescriptor], maxProcesses int) *task.DefaultTaskScheduler[TaskDescriptor] +type TaskSchedulerFactoryFn[TaskDescriptor payments.TaskDescriptor] func(resolver task.Resolver[TaskDescriptor], maxProcesses int) *task.DefaultTaskScheduler[TaskDescriptor] func (fn TaskSchedulerFactoryFn[TaskDescriptor]) Make(resolver task.Resolver[TaskDescriptor], maxTasks int) *task.DefaultTaskScheduler[TaskDescriptor] { return fn(resolver, maxTasks) } type ConnectorManager[ - Config core.ConnectorConfigObject, - TaskDescriptor core.TaskDescriptor, + Config payments.ConnectorConfigObject, + TaskDescriptor payments.TaskDescriptor, ] struct { logger sharedlogging.Logger loader Loader[Config, TaskDescriptor] @@ -195,11 +195,11 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) IsInstalled(ctx cont return l.store.IsInstalled(ctx, l.loader.Name()) } -func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) ListTasksStates(ctx context.Context) ([]core.TaskState[TaskDescriptor], error) { +func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) ListTasksStates(ctx context.Context) ([]payments.TaskState[TaskDescriptor], error) { return l.scheduler.ListTasks(ctx) } -func (l ConnectorManager[Config, TaskDescriptor]) ReadTaskState(ctx context.Context, descriptor TaskDescriptor) (*core.TaskState[TaskDescriptor], error) { +func (l ConnectorManager[Config, TaskDescriptor]) ReadTaskState(ctx context.Context, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) { return l.scheduler.ReadTask(ctx, descriptor) } @@ -216,8 +216,8 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Reset(ctx context.Co } func NewConnectorManager[ - ConnectorConfig core.ConnectorConfigObject, - TaskDescriptor core.TaskDescriptor, + ConnectorConfig payments.ConnectorConfigObject, + TaskDescriptor payments.TaskDescriptor, ]( logger sharedlogging.Logger, store ConnectorStore, diff --git a/pkg/bridge/integration/manager_test.go b/pkg/bridge/integration/manager_test.go index dc527df41..6809e2469 100644 --- a/pkg/bridge/integration/manager_test.go +++ b/pkg/bridge/integration/manager_test.go @@ -6,8 +6,8 @@ import ( "github.com/numary/go-libs/sharedlogging" "github.com/numary/go-libs/sharedlogging/sharedlogginglogrus" + payments "github.com/numary/payments/pkg" "github.com/numary/payments/pkg/bridge/task" - "github.com/numary/payments/pkg/core" "github.com/pborman/uuid" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" @@ -22,7 +22,7 @@ func ChanClosed[T any](ch chan T) bool { } } -type testContext[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.TaskDescriptor] struct { +type testContext[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor] struct { manager *ConnectorManager[ConnectorConfig, TaskDescriptor] taskStore task.Store[TaskDescriptor] connectorStore ConnectorStore @@ -30,7 +30,7 @@ type testContext[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core provider string } -func withManager[ConnectorConfig core.ConnectorConfigObject, TaskDescriptor core.TaskDescriptor](builder *ConnectorBuilder[TaskDescriptor], callback func(ctx *testContext[ConnectorConfig, TaskDescriptor])) { +func withManager[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor](builder *ConnectorBuilder[TaskDescriptor], callback func(ctx *testContext[ConnectorConfig, TaskDescriptor])) { l := logrus.New() if testing.Verbose() { l.SetLevel(logrus.DebugLevel) @@ -70,12 +70,12 @@ func TestInstallConnector(t *testing.T) { close(installed) return nil }) - withManager(builder, func(tc *testContext[core.EmptyConnectorConfig, any]) { - err := tc.manager.Install(context.Background(), core.EmptyConnectorConfig{}) + withManager(builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { + err := tc.manager.Install(context.Background(), payments.EmptyConnectorConfig{}) require.NoError(t, err) require.True(t, ChanClosed(installed)) - err = tc.manager.Install(context.Background(), core.EmptyConnectorConfig{}) + err = tc.manager.Install(context.Background(), payments.EmptyConnectorConfig{}) require.Equal(t, ErrAlreadyInstalled, err) }) } @@ -103,8 +103,8 @@ func TestUninstallConnector(t *testing.T) { close(uninstalled) return nil }) - withManager(builder, func(tc *testContext[core.EmptyConnectorConfig, any]) { - err := tc.manager.Install(context.Background(), core.EmptyConnectorConfig{}) + withManager(builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { + err := tc.manager.Install(context.Background(), payments.EmptyConnectorConfig{}) require.NoError(t, err) <-taskStarted require.NoError(t, tc.manager.Uninstall(context.Background())) @@ -125,8 +125,8 @@ func TestDisableConnector(t *testing.T) { close(uninstalled) return nil }) - withManager[core.EmptyConnectorConfig, any](builder, func(tc *testContext[core.EmptyConnectorConfig, any]) { - err := tc.manager.Install(context.Background(), core.EmptyConnectorConfig{}) + withManager[payments.EmptyConnectorConfig, any](builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { + err := tc.manager.Install(context.Background(), payments.EmptyConnectorConfig{}) require.NoError(t, err) enabled, err := tc.manager.IsEnabled(context.Background()) @@ -142,19 +142,19 @@ func TestDisableConnector(t *testing.T) { func TestEnableConnector(t *testing.T) { builder := NewConnectorBuilder[any]() - withManager[core.EmptyConnectorConfig, any](builder, func(tc *testContext[core.EmptyConnectorConfig, any]) { + withManager[payments.EmptyConnectorConfig, any](builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { err := tc.connectorStore.Enable(context.Background(), tc.loader.Name()) require.NoError(t, err) - err = tc.manager.Install(context.Background(), core.EmptyConnectorConfig{}) + err = tc.manager.Install(context.Background(), payments.EmptyConnectorConfig{}) require.NoError(t, err) }) } func TestRestoreEnabledConnector(t *testing.T) { builder := NewConnectorBuilder[any]() - withManager(builder, func(tc *testContext[core.EmptyConnectorConfig, any]) { - err := tc.connectorStore.Install(context.Background(), tc.loader.Name(), core.EmptyConnectorConfig{}) + withManager(builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { + err := tc.connectorStore.Install(context.Background(), tc.loader.Name(), payments.EmptyConnectorConfig{}) require.NoError(t, err) err = tc.manager.Restore(context.Background()) @@ -165,7 +165,7 @@ func TestRestoreEnabledConnector(t *testing.T) { func TestRestoreNotInstalledConnector(t *testing.T) { builder := NewConnectorBuilder[any]() - withManager(builder, func(tc *testContext[core.EmptyConnectorConfig, any]) { + withManager(builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { err := tc.manager.Restore(context.Background()) require.Equal(t, ErrNotInstalled, err) }) diff --git a/pkg/bridge/integration/store.go b/pkg/bridge/integration/store.go index b132e6d19..5642d1cdf 100644 --- a/pkg/bridge/integration/store.go +++ b/pkg/bridge/integration/store.go @@ -4,7 +4,7 @@ import ( "context" "reflect" - "github.com/numary/payments/pkg/core" + payments "github.com/numary/payments/pkg" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -96,19 +96,19 @@ type mongodbConnectorStore struct { func (m *mongodbConnectorStore) Uninstall(ctx context.Context, name string) error { return m.db.Client().UseSession(ctx, func(ctx mongo.SessionContext) error { _, err := ctx.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) { - _, err := m.db.Collection(core.TasksCollection).DeleteMany(ctx, map[string]any{ + _, err := m.db.Collection(payments.TasksCollection).DeleteMany(ctx, map[string]any{ "provider": name, }) if err != nil { return nil, errors.Wrap(err, "deleting tasks") } - _, err = m.db.Collection(core.Collection).DeleteMany(ctx, map[string]any{ + _, err = m.db.Collection(payments.Collection).DeleteMany(ctx, map[string]any{ "provider": name, }) if err != nil { return nil, errors.Wrap(err, "deleting payments") } - _, err = m.db.Collection(core.ConnectorsCollection).DeleteOne(ctx, map[string]any{ + _, err = m.db.Collection(payments.ConnectorsCollection).DeleteOne(ctx, map[string]any{ "provider": name, }) if err != nil { @@ -121,7 +121,7 @@ func (m *mongodbConnectorStore) Uninstall(ctx context.Context, name string) erro } func (m *mongodbConnectorStore) IsInstalled(ctx context.Context, name string) (bool, error) { - ret := m.db.Collection(core.ConnectorsCollection).FindOne(ctx, map[string]any{ + ret := m.db.Collection(payments.ConnectorsCollection).FindOne(ctx, map[string]any{ "provider": name, }) if ret.Err() != nil && ret.Err() != mongo.ErrNoDocuments { @@ -134,7 +134,7 @@ func (m *mongodbConnectorStore) IsInstalled(ctx context.Context, name string) (b } func (m *mongodbConnectorStore) Install(ctx context.Context, name string, config any) error { - _, err := m.db.Collection(core.ConnectorsCollection).UpdateOne(ctx, map[string]any{ + _, err := m.db.Collection(payments.ConnectorsCollection).UpdateOne(ctx, map[string]any{ "provider": name, }, map[string]any{ "$set": map[string]any{ @@ -145,7 +145,7 @@ func (m *mongodbConnectorStore) Install(ctx context.Context, name string, config } func (m *mongodbConnectorStore) UpdateConfig(ctx context.Context, name string, config any) error { - _, err := m.db.Collection(core.ConnectorsCollection).UpdateOne(ctx, map[string]any{ + _, err := m.db.Collection(payments.ConnectorsCollection).UpdateOne(ctx, map[string]any{ "provider": name, }, map[string]any{ "$set": map[string]any{ @@ -156,7 +156,7 @@ func (m *mongodbConnectorStore) UpdateConfig(ctx context.Context, name string, c } func (m *mongodbConnectorStore) Enable(ctx context.Context, name string) error { - _, err := m.db.Collection(core.ConnectorsCollection).UpdateOne(ctx, map[string]any{ + _, err := m.db.Collection(payments.ConnectorsCollection).UpdateOne(ctx, map[string]any{ "provider": name, }, map[string]any{ "$set": map[string]any{ @@ -167,7 +167,7 @@ func (m *mongodbConnectorStore) Enable(ctx context.Context, name string) error { } func (m *mongodbConnectorStore) Disable(ctx context.Context, name string) error { - _, err := m.db.Collection(core.ConnectorsCollection).UpdateOne(ctx, map[string]any{ + _, err := m.db.Collection(payments.ConnectorsCollection).UpdateOne(ctx, map[string]any{ "provider": name, }, map[string]any{ "$set": map[string]any{ @@ -178,7 +178,7 @@ func (m *mongodbConnectorStore) Disable(ctx context.Context, name string) error } func (m *mongodbConnectorStore) IsEnabled(ctx context.Context, name string) (bool, error) { - ret := m.db.Collection(core.ConnectorsCollection).FindOne(ctx, map[string]any{ + ret := m.db.Collection(payments.ConnectorsCollection).FindOne(ctx, map[string]any{ "provider": name, }) if ret.Err() != nil && ret.Err() != mongo.ErrNoDocuments { @@ -187,7 +187,7 @@ func (m *mongodbConnectorStore) IsEnabled(ctx context.Context, name string) (boo if ret.Err() == mongo.ErrNoDocuments { return false, ErrNotInstalled } - p := core.Connector[core.EmptyConnectorConfig]{} + p := payments.Connector[payments.EmptyConnectorConfig]{} err := ret.Decode(&p) if err != nil { return false, err @@ -197,7 +197,7 @@ func (m *mongodbConnectorStore) IsEnabled(ctx context.Context, name string) (boo } func (m *mongodbConnectorStore) ReadConfig(ctx context.Context, name string, to interface{}) error { - ret := m.db.Collection(core.ConnectorsCollection).FindOne(ctx, map[string]any{ + ret := m.db.Collection(payments.ConnectorsCollection).FindOne(ctx, map[string]any{ "provider": name, }) if ret.Err() != nil && ret.Err() != mongo.ErrNoDocuments { @@ -206,7 +206,7 @@ func (m *mongodbConnectorStore) ReadConfig(ctx context.Context, name string, to if ret.Err() == mongo.ErrNoDocuments { return errors.New("not installed") } - p := core.Connector[bson.Raw]{} + p := payments.Connector[bson.Raw]{} err := ret.Decode(&p) if err != nil { return err diff --git a/pkg/bridge/task/context.go b/pkg/bridge/task/context.go index 38f3390d1..93a3211a6 100644 --- a/pkg/bridge/task/context.go +++ b/pkg/bridge/task/context.go @@ -3,15 +3,15 @@ package task import ( "context" - "github.com/numary/payments/pkg/core" + payments "github.com/numary/payments/pkg" ) -type ConnectorContext[TaskDescriptor core.TaskDescriptor] interface { +type ConnectorContext[TaskDescriptor payments.TaskDescriptor] interface { Context() context.Context Scheduler() Scheduler[TaskDescriptor] } -type connectorContext[TaskDescriptor core.TaskDescriptor] struct { +type connectorContext[TaskDescriptor payments.TaskDescriptor] struct { ctx context.Context scheduler Scheduler[TaskDescriptor] } @@ -23,7 +23,7 @@ func (ctx *connectorContext[TaskDescriptor]) Scheduler() Scheduler[TaskDescripto return ctx.scheduler } -func NewConnectorContext[TaskDescriptor core.TaskDescriptor](ctx context.Context, scheduler Scheduler[TaskDescriptor]) *connectorContext[TaskDescriptor] { +func NewConnectorContext[TaskDescriptor payments.TaskDescriptor](ctx context.Context, scheduler Scheduler[TaskDescriptor]) *connectorContext[TaskDescriptor] { return &connectorContext[TaskDescriptor]{ ctx: ctx, scheduler: scheduler, diff --git a/pkg/bridge/task/scheduler.go b/pkg/bridge/task/scheduler.go index 89befbdb8..fa6f36f89 100644 --- a/pkg/bridge/task/scheduler.go +++ b/pkg/bridge/task/scheduler.go @@ -8,7 +8,7 @@ import ( "time" "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/core" + payments "github.com/numary/payments/pkg" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.uber.org/dig" @@ -19,40 +19,40 @@ var ( ErrUnableToResolve = errors.New("unable to resolve task") ) -type Resolver[TaskDescriptor core.TaskDescriptor] interface { +type Resolver[TaskDescriptor payments.TaskDescriptor] interface { Resolve(descriptor TaskDescriptor) Task } -type ResolverFn[TaskDescriptor core.TaskDescriptor] func(descriptor TaskDescriptor) Task +type ResolverFn[TaskDescriptor payments.TaskDescriptor] func(descriptor TaskDescriptor) Task func (fn ResolverFn[TaskDescriptor]) Resolve(descriptor TaskDescriptor) Task { return fn(descriptor) } type ContainerFactory interface { - Create(ctx context.Context, descriptor core.TaskDescriptor) (*dig.Container, error) + Create(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) } -type ContainerFactoryFn func(ctx context.Context, descriptor core.TaskDescriptor) (*dig.Container, error) +type ContainerFactoryFn func(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) -func (fn ContainerFactoryFn) Create(ctx context.Context, descriptor core.TaskDescriptor) (*dig.Container, error) { +func (fn ContainerFactoryFn) Create(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) { return fn(ctx, descriptor) } -var DefaultContainerFactory = ContainerFactoryFn(func(ctx context.Context, descriptor core.TaskDescriptor) (*dig.Container, error) { +var DefaultContainerFactory = ContainerFactoryFn(func(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) { return dig.New(), nil }) -type Scheduler[TaskDescriptor core.TaskDescriptor] interface { +type Scheduler[TaskDescriptor payments.TaskDescriptor] interface { Schedule(p TaskDescriptor, restart bool) error } -type taskHolder[TaskDescriptor core.TaskDescriptor] struct { +type taskHolder[TaskDescriptor payments.TaskDescriptor] struct { descriptor TaskDescriptor cancel func() logger sharedlogging.Logger stopChan StopChan } -type DefaultTaskScheduler[TaskDescriptor core.TaskDescriptor] struct { +type DefaultTaskScheduler[TaskDescriptor payments.TaskDescriptor] struct { provider string logger sharedlogging.Logger store Store[TaskDescriptor] @@ -64,11 +64,11 @@ type DefaultTaskScheduler[TaskDescriptor core.TaskDescriptor] struct { stopped bool } -func (s *DefaultTaskScheduler[TaskDescriptor]) ListTasks(ctx context.Context) ([]core.TaskState[TaskDescriptor], error) { +func (s *DefaultTaskScheduler[TaskDescriptor]) ListTasks(ctx context.Context) ([]payments.TaskState[TaskDescriptor], error) { return s.store.ListTaskStates(ctx, s.provider) } -func (s *DefaultTaskScheduler[TaskDescriptor]) ReadTask(ctx context.Context, descriptor TaskDescriptor) (*core.TaskState[TaskDescriptor], error) { +func (s *DefaultTaskScheduler[TaskDescriptor]) ReadTask(ctx context.Context, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) { return s.store.ReadTaskState(ctx, s.provider, descriptor) } @@ -77,7 +77,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) Schedule(descriptor TaskDescripto s.mu.Lock() defer s.mu.Unlock() - taskId := core.IDFromDescriptor(descriptor) + taskId := payments.IDFromDescriptor(descriptor) if _, ok := s.tasks[taskId]; ok { return ErrAlreadyScheduled } @@ -133,7 +133,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) Shutdown(ctx context.Context) err func (s *DefaultTaskScheduler[TaskDescriptor]) Restore(ctx context.Context) error { - states, err := s.store.ListTaskStatesByStatus(ctx, s.provider, core.TaskStatusActive) + states, err := s.store.ListTaskStatesByStatus(ctx, s.provider, payments.TaskStatusActive) if err != nil { return err } @@ -159,7 +159,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) registerTaskError(ctx context.Con } holder.logger.Errorf("Task terminated with error: %s", taskErr) - err := s.store.UpdateTaskStatus(ctx, s.provider, holder.descriptor, core.TaskStatusFailed, pe) + err := s.store.UpdateTaskStatus(ctx, s.provider, holder.descriptor, payments.TaskStatusFailed, pe) if err != nil { holder.logger.Error("Error updating task status: %s", pe) } @@ -168,7 +168,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) registerTaskError(ctx context.Con func (s *DefaultTaskScheduler[TaskDescriptor]) deleteTask(holder *taskHolder[TaskDescriptor]) { s.mu.Lock() defer s.mu.Unlock() - delete(s.tasks, core.IDFromDescriptor(holder.descriptor)) + delete(s.tasks, payments.IDFromDescriptor(holder.descriptor)) if s.stopped { return @@ -197,12 +197,12 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) deleteTask(holder *taskHolder[Tas type StopChan chan chan struct{} func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescriptor) error { - ps, err := s.store.FindTaskAndUpdateStatus(context.Background(), s.provider, descriptor, core.TaskStatusActive, "") + ps, err := s.store.FindTaskAndUpdateStatus(context.Background(), s.provider, descriptor, payments.TaskStatusActive, "") if err != nil { return errors.Wrap(err, "finding task and update") } - taskId := core.IDFromDescriptor(descriptor) + taskId := payments.IDFromDescriptor(descriptor) logger := s.logger.WithFields(map[string]interface{}{ "task-id": taskId, }) @@ -266,7 +266,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript panic(err) } - s.tasks[core.IDFromDescriptor(descriptor)] = holder + s.tasks[payments.IDFromDescriptor(descriptor)] = holder go func() { logger.Infof("Starting task...") @@ -286,7 +286,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript } logger.Infof("Task terminated with success") - err = s.store.UpdateTaskStatus(ctx, s.provider, descriptor, core.TaskStatusTerminated, "") + err = s.store.UpdateTaskStatus(ctx, s.provider, descriptor, payments.TaskStatusTerminated, "") if err != nil { logger.Error("Error updating task status: %s", err) } @@ -299,12 +299,12 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) stackTask(descriptor TaskDescript "descriptor": descriptor, }).Infof("Stacking task") return s.store.UpdateTaskStatus( - context.Background(), s.provider, descriptor, core.TaskStatusPending, "") + context.Background(), s.provider, descriptor, payments.TaskStatusPending, "") } var _ Scheduler[struct{}] = &DefaultTaskScheduler[struct{}]{} -func NewDefaultScheduler[TaskDescriptor core.TaskDescriptor]( +func NewDefaultScheduler[TaskDescriptor payments.TaskDescriptor]( provider string, logger sharedlogging.Logger, store Store[TaskDescriptor], diff --git a/pkg/bridge/task/scheduler_test.go b/pkg/bridge/task/scheduler_test.go index 478867f52..25dfc6b1d 100644 --- a/pkg/bridge/task/scheduler_test.go +++ b/pkg/bridge/task/scheduler_test.go @@ -7,13 +7,13 @@ import ( "time" "github.com/numary/go-libs/sharedlogging/sharedlogginglogrus" - "github.com/numary/payments/pkg/core" + payments "github.com/numary/payments/pkg" "github.com/pborman/uuid" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" ) -func TaskTerminatedWithStatus[TaskDescriptor core.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor, expectedStatus core.TaskStatus, errString string) func() bool { +func TaskTerminatedWithStatus[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor, expectedStatus payments.TaskStatus, errString string) func() bool { return func() bool { status, err, ok := store.Result(provider, descriptor) if !ok { @@ -26,20 +26,20 @@ func TaskTerminatedWithStatus[TaskDescriptor core.TaskDescriptor](store *inMemor } } -func TaskTerminated[TaskDescriptor core.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { - return TaskTerminatedWithStatus(store, provider, descriptor, core.TaskStatusTerminated, "") +func TaskTerminated[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { + return TaskTerminatedWithStatus(store, provider, descriptor, payments.TaskStatusTerminated, "") } -func TaskFailed[TaskDescriptor core.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor, errStr string) func() bool { - return TaskTerminatedWithStatus(store, provider, descriptor, core.TaskStatusFailed, errStr) +func TaskFailed[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor, errStr string) func() bool { + return TaskTerminatedWithStatus(store, provider, descriptor, payments.TaskStatusFailed, errStr) } -func TaskPending[TaskDescriptor core.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { - return TaskTerminatedWithStatus(store, provider, descriptor, core.TaskStatusPending, "") +func TaskPending[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { + return TaskTerminatedWithStatus(store, provider, descriptor, payments.TaskStatusPending, "") } -func TaskActive[TaskDescriptor core.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { - return TaskTerminatedWithStatus(store, provider, descriptor, core.TaskStatusActive, "") +func TaskActive[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { + return TaskTerminatedWithStatus(store, provider, descriptor, payments.TaskStatusActive, "") } func TestTaskScheduler(t *testing.T) { diff --git a/pkg/bridge/task/store.go b/pkg/bridge/task/store.go index 8377fa015..6cbe5907d 100644 --- a/pkg/bridge/task/store.go +++ b/pkg/bridge/task/store.go @@ -6,7 +6,7 @@ import ( "strings" "time" - "github.com/numary/payments/pkg/core" + payments "github.com/numary/payments/pkg" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -17,29 +17,29 @@ var ( ErrNotFound = errors.New("not found") ) -type Store[TaskDescriptor core.TaskDescriptor] interface { - UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status core.TaskStatus, err string) error +type Store[TaskDescriptor payments.TaskDescriptor] interface { + UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, err string) error FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, - status core.TaskStatus, err string) (*core.TaskState[TaskDescriptor], error) - ListTaskStatesByStatus(ctx context.Context, provider string, status core.TaskStatus) ([]core.TaskState[TaskDescriptor], error) - ListTaskStates(ctx context.Context, provider string) ([]core.TaskState[TaskDescriptor], error) - ReadOldestPendingTask(ctx context.Context, provider string) (*core.TaskState[TaskDescriptor], error) - ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*core.TaskState[TaskDescriptor], error) + status payments.TaskStatus, err string) (*payments.TaskState[TaskDescriptor], error) + ListTaskStatesByStatus(ctx context.Context, provider string, status payments.TaskStatus) ([]payments.TaskState[TaskDescriptor], error) + ListTaskStates(ctx context.Context, provider string) ([]payments.TaskState[TaskDescriptor], error) + ReadOldestPendingTask(ctx context.Context, provider string) (*payments.TaskState[TaskDescriptor], error) + ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) } -type inMemoryStore[TaskDescriptor core.TaskDescriptor] struct { - statuses map[string]core.TaskStatus +type inMemoryStore[TaskDescriptor payments.TaskDescriptor] struct { + statuses map[string]payments.TaskStatus created map[string]time.Time errors map[string]string } -func (s *inMemoryStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*core.TaskState[TaskDescriptor], error) { - id := core.IDFromDescriptor(descriptor) +func (s *inMemoryStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) { + id := payments.IDFromDescriptor(descriptor) status, ok := s.statuses[id] if !ok { return nil, ErrNotFound } - return &core.TaskState[TaskDescriptor]{ + return &payments.TaskState[TaskDescriptor]{ Provider: provider, Descriptor: descriptor, Status: status, @@ -49,17 +49,17 @@ func (s *inMemoryStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provi }, nil } -func (s *inMemoryStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provider string) ([]core.TaskState[TaskDescriptor], error) { - ret := make([]core.TaskState[TaskDescriptor], 0) +func (s *inMemoryStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provider string) ([]payments.TaskState[TaskDescriptor], error) { + ret := make([]payments.TaskState[TaskDescriptor], 0) for id, status := range s.statuses { if !strings.HasPrefix(id, fmt.Sprintf("%s/", provider)) { continue } var descriptor TaskDescriptor - core.DescriptorFromID(id, &descriptor) + payments.DescriptorFromID(id, &descriptor) - ret = append(ret, core.TaskState[TaskDescriptor]{ + ret = append(ret, payments.TaskState[TaskDescriptor]{ Provider: provider, Descriptor: descriptor, Status: status, @@ -71,13 +71,13 @@ func (s *inMemoryStore[TaskDescriptor]) ListTaskStates(ctx context.Context, prov return ret, nil } -func (s *inMemoryStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context, provider string) (*core.TaskState[TaskDescriptor], error) { +func (s *inMemoryStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context, provider string) (*payments.TaskState[TaskDescriptor], error) { var ( oldestDate time.Time oldestId string ) for id, status := range s.statuses { - if status != core.TaskStatusPending { + if status != payments.TaskStatusPending { continue } if oldestDate.IsZero() || s.created[id].Before(oldestDate) { @@ -92,25 +92,25 @@ func (s *inMemoryStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Contex descriptorStr := strings.Split(oldestId, "/")[1] var descriptor TaskDescriptor - core.DescriptorFromID(descriptorStr, &descriptor) + payments.DescriptorFromID(descriptorStr, &descriptor) - return &core.TaskState[TaskDescriptor]{ + return &payments.TaskState[TaskDescriptor]{ Provider: provider, Descriptor: descriptor, - Status: core.TaskStatusPending, + Status: payments.TaskStatusPending, State: nil, CreatedAt: s.created[oldestId], }, nil } -func (s *inMemoryStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Context, provider string, taskStatus core.TaskStatus) ([]core.TaskState[TaskDescriptor], error) { +func (s *inMemoryStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Context, provider string, taskStatus payments.TaskStatus) ([]payments.TaskState[TaskDescriptor], error) { all, err := s.ListTaskStates(ctx, provider) if err != nil { return nil, err } - ret := make([]core.TaskState[TaskDescriptor], 0) + ret := make([]payments.TaskState[TaskDescriptor], 0) for _, v := range all { if v.Status != taskStatus { continue @@ -121,13 +121,13 @@ func (s *inMemoryStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Conte return ret, nil } -func (s *inMemoryStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status core.TaskStatus, taskErr string) (*core.TaskState[TaskDescriptor], error) { +func (s *inMemoryStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, taskErr string) (*payments.TaskState[TaskDescriptor], error) { err := s.UpdateTaskStatus(ctx, provider, descriptor, status, taskErr) if err != nil { return nil, err } - return &core.TaskState[TaskDescriptor]{ + return &payments.TaskState[TaskDescriptor]{ Provider: provider, Descriptor: descriptor, Status: status, @@ -137,8 +137,8 @@ func (s *inMemoryStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Cont }, nil } -func (s *inMemoryStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status core.TaskStatus, err string) error { - taskId := core.IDFromDescriptor(descriptor) +func (s *inMemoryStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, err string) error { + taskId := payments.IDFromDescriptor(descriptor) key := fmt.Sprintf("%s/%s", provider, taskId) s.statuses[key] = status s.errors[key] = err @@ -148,8 +148,8 @@ func (s *inMemoryStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, pr return nil } -func (s *inMemoryStore[TaskDescriptor]) Result(provider string, descriptor core.TaskDescriptor) (core.TaskStatus, string, bool) { - taskId := core.IDFromDescriptor(descriptor) +func (s *inMemoryStore[TaskDescriptor]) Result(provider string, descriptor payments.TaskDescriptor) (payments.TaskStatus, string, bool) { + taskId := payments.IDFromDescriptor(descriptor) key := fmt.Sprintf("%s/%s", provider, taskId) status, ok := s.statuses[key] if !ok { @@ -158,9 +158,9 @@ func (s *inMemoryStore[TaskDescriptor]) Result(provider string, descriptor core. return status, s.errors[key], true } -func NewInMemoryStore[TaskDescriptor core.TaskDescriptor]() *inMemoryStore[TaskDescriptor] { +func NewInMemoryStore[TaskDescriptor payments.TaskDescriptor]() *inMemoryStore[TaskDescriptor] { return &inMemoryStore[TaskDescriptor]{ - statuses: make(map[string]core.TaskStatus), + statuses: make(map[string]payments.TaskStatus), errors: make(map[string]string), created: make(map[string]time.Time), } @@ -168,12 +168,12 @@ func NewInMemoryStore[TaskDescriptor core.TaskDescriptor]() *inMemoryStore[TaskD var _ Store[struct{}] = &inMemoryStore[struct{}]{} -type mongoDBStore[TaskDescriptor core.TaskDescriptor] struct { +type mongoDBStore[TaskDescriptor payments.TaskDescriptor] struct { db *mongo.Database } -func (m *mongoDBStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*core.TaskState[TaskDescriptor], error) { - ret := m.db.Collection(core.TasksCollection).FindOne(ctx, map[string]any{ +func (m *mongoDBStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) { + ret := m.db.Collection(payments.TasksCollection).FindOne(ctx, map[string]any{ "provider": provider, "descriptor": descriptor, }) @@ -183,7 +183,7 @@ func (m *mongoDBStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provid } return nil, ret.Err() } - ts := core.TaskState[TaskDescriptor]{} + ts := payments.TaskState[TaskDescriptor]{} err := ret.Decode(&ts) if err != nil { return nil, err @@ -192,10 +192,10 @@ func (m *mongoDBStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provid return &ts, nil } -func (m *mongoDBStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context, provider string) (*core.TaskState[TaskDescriptor], error) { - ret := m.db.Collection(core.TasksCollection).FindOne(ctx, map[string]any{ +func (m *mongoDBStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context, provider string) (*payments.TaskState[TaskDescriptor], error) { + ret := m.db.Collection(payments.TasksCollection).FindOne(ctx, map[string]any{ "provider": provider, - "status": core.TaskStatusPending, + "status": payments.TaskStatusPending, }, options.FindOne().SetSort(bson.M{"createdAt": 1})) if ret.Err() != nil { if ret.Err() == mongo.ErrNoDocuments { @@ -203,7 +203,7 @@ func (m *mongoDBStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context } return nil, ret.Err() } - ps := &core.TaskState[TaskDescriptor]{} + ps := &payments.TaskState[TaskDescriptor]{} err := ret.Decode(ps) if err != nil { return nil, err @@ -212,8 +212,8 @@ func (m *mongoDBStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context return ps, nil } -func (m *mongoDBStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status core.TaskStatus, taskErr string) error { - _, err := m.db.Collection(core.TasksCollection).UpdateOne(ctx, map[string]any{ +func (m *mongoDBStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, taskErr string) error { + _, err := m.db.Collection(payments.TasksCollection).UpdateOne(ctx, map[string]any{ "provider": provider, "descriptor": descriptor, }, map[string]any{ @@ -228,8 +228,8 @@ func (m *mongoDBStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, pro return err } -func (m *mongoDBStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status core.TaskStatus, taskErr string) (*core.TaskState[TaskDescriptor], error) { - ret := m.db.Collection(core.TasksCollection).FindOneAndUpdate(ctx, map[string]any{ +func (m *mongoDBStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, taskErr string) (*payments.TaskState[TaskDescriptor], error) { + ret := m.db.Collection(payments.TasksCollection).FindOneAndUpdate(ctx, map[string]any{ "provider": provider, "descriptor": descriptor, }, map[string]any{ @@ -244,7 +244,7 @@ func (m *mongoDBStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Conte if ret.Err() != nil { return nil, errors.Wrap(ret.Err(), "retrieving task") } - ps := &core.TaskState[TaskDescriptor]{} + ps := &payments.TaskState[TaskDescriptor]{} err := ret.Decode(ps) if err != nil { return nil, errors.Wrap(err, "decoding task state") @@ -252,8 +252,8 @@ func (m *mongoDBStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Conte return ps, nil } -func (m *mongoDBStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Context, provider string, status core.TaskStatus) ([]core.TaskState[TaskDescriptor], error) { - cursor, err := m.db.Collection(core.TasksCollection).Find(ctx, map[string]any{ +func (m *mongoDBStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Context, provider string, status payments.TaskStatus) ([]payments.TaskState[TaskDescriptor], error) { + cursor, err := m.db.Collection(payments.TasksCollection).Find(ctx, map[string]any{ "provider": provider, "status": status, }) @@ -263,7 +263,7 @@ func (m *mongoDBStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Contex if err != nil { return nil, err } - ret := make([]core.TaskState[TaskDescriptor], 0) + ret := make([]payments.TaskState[TaskDescriptor], 0) err = cursor.All(ctx, &ret) if err != nil { return nil, err @@ -272,8 +272,8 @@ func (m *mongoDBStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Contex return ret, nil } -func (m *mongoDBStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provider string) ([]core.TaskState[TaskDescriptor], error) { - cursor, err := m.db.Collection(core.TasksCollection).Find(ctx, map[string]any{ +func (m *mongoDBStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provider string) ([]payments.TaskState[TaskDescriptor], error) { + cursor, err := m.db.Collection(payments.TasksCollection).Find(ctx, map[string]any{ "provider": provider, }) if err != nil { @@ -282,7 +282,7 @@ func (m *mongoDBStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provi if err != nil { return nil, err } - ret := make([]core.TaskState[TaskDescriptor], 0) + ret := make([]payments.TaskState[TaskDescriptor], 0) err = cursor.All(ctx, &ret) if err != nil { return nil, err @@ -293,6 +293,6 @@ func (m *mongoDBStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provi var _ Store[struct{}] = &mongoDBStore[struct{}]{} -func NewMongoDBStore[TaskDescriptor core.TaskDescriptor](db *mongo.Database) *mongoDBStore[TaskDescriptor] { +func NewMongoDBStore[TaskDescriptor payments.TaskDescriptor](db *mongo.Database) *mongoDBStore[TaskDescriptor] { return &mongoDBStore[TaskDescriptor]{db: db} } diff --git a/pkg/bridge/writeonly/storage.go b/pkg/bridge/writeonly/storage.go index 870e055a1..05e4d413d 100644 --- a/pkg/bridge/writeonly/storage.go +++ b/pkg/bridge/writeonly/storage.go @@ -4,7 +4,7 @@ import ( "context" "github.com/iancoleman/strcase" - "github.com/numary/payments/pkg/core" + payments "github.com/numary/payments/pkg" "go.mongodb.org/mongo-driver/mongo" ) @@ -32,7 +32,7 @@ func Write[T any](ctx context.Context, storage Storage, items ...T) error { type MongoDBStorage struct { db *mongo.Database provider string - taskDescriptor core.TaskDescriptor + taskDescriptor payments.TaskDescriptor } func (m *MongoDBStorage) Write(ctx context.Context, items ...any) error { @@ -40,7 +40,7 @@ func (m *MongoDBStorage) Write(ctx context.Context, items ...any) error { for _, i := range items { toSave = append(toSave, Item{ Provider: m.provider, - TaskId: core.IDFromDescriptor(m.taskDescriptor), + TaskId: payments.IDFromDescriptor(m.taskDescriptor), Data: i, }) } @@ -53,7 +53,7 @@ func (m *MongoDBStorage) Write(ctx context.Context, items ...any) error { return nil } -func NewMongoDBStorage(db *mongo.Database, provider string, descriptor core.TaskDescriptor) *MongoDBStorage { +func NewMongoDBStorage(db *mongo.Database, provider string, descriptor payments.TaskDescriptor) *MongoDBStorage { return &MongoDBStorage{ db: db, provider: provider, diff --git a/pkg/core/collections.go b/pkg/collections.go similarity index 86% rename from pkg/core/collections.go rename to pkg/collections.go index a92544670..d634b3e23 100644 --- a/pkg/core/collections.go +++ b/pkg/collections.go @@ -1,4 +1,4 @@ -package core +package payments const Collection = "Payments" const ConnectorsCollection = "Connectors" diff --git a/pkg/core/config.go b/pkg/config.go similarity index 90% rename from pkg/core/config.go rename to pkg/config.go index 5dc40ea42..6e095ab52 100644 --- a/pkg/core/config.go +++ b/pkg/config.go @@ -1,4 +1,4 @@ -package core +package payments type ConnectorConfigObject interface { Validate() error diff --git a/pkg/core/connector.go b/pkg/connector.go similarity index 92% rename from pkg/core/connector.go rename to pkg/connector.go index df93f4557..71b33fc75 100644 --- a/pkg/core/connector.go +++ b/pkg/connector.go @@ -1,4 +1,4 @@ -package core +package payments type Connector[T ConnectorConfigObject] struct { Provider string `json:"provider" bson:"provider"` diff --git a/pkg/database/indexes.go b/pkg/database/indexes.go index 865d0157a..82836d019 100644 --- a/pkg/database/indexes.go +++ b/pkg/database/indexes.go @@ -5,14 +5,14 @@ import ( "fmt" "reflect" - "github.com/numary/payments/pkg/core" + payments "github.com/numary/payments/pkg" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/x/bsonx" ) var indexes = map[string][]mongo.IndexModel{ - core.Collection: { + payments.Collection: { { Keys: bsonx.Doc{ bsonx.Elem{ diff --git a/pkg/core/message.go b/pkg/message.go similarity index 97% rename from pkg/core/message.go rename to pkg/message.go index 8a2e312ce..b990ebe30 100644 --- a/pkg/core/message.go +++ b/pkg/message.go @@ -1,4 +1,4 @@ -package core +package payments import ( "time" diff --git a/pkg/core/payment.go b/pkg/payment.go similarity index 99% rename from pkg/core/payment.go rename to pkg/payment.go index f43acf057..e983b66f9 100644 --- a/pkg/core/payment.go +++ b/pkg/payment.go @@ -1,4 +1,4 @@ -package core +package payments import ( "encoding/base64" diff --git a/pkg/core/payment_test.go b/pkg/payment_test.go similarity index 97% rename from pkg/core/payment_test.go rename to pkg/payment_test.go index de6348170..75fd19d8d 100644 --- a/pkg/core/payment_test.go +++ b/pkg/payment_test.go @@ -1,4 +1,4 @@ -package core +package payments import ( "testing" diff --git a/pkg/core/task.go b/pkg/task.go similarity index 98% rename from pkg/core/task.go rename to pkg/task.go index 5a60e96e3..3ba080a51 100644 --- a/pkg/core/task.go +++ b/pkg/task.go @@ -1,4 +1,4 @@ -package core +package payments import ( "encoding/base64" From 0c131f05a1c51eca3a0caf5250f7cbc5dfcc6215 Mon Sep 17 00:00:00 2001 From: Antoine Gelloz Date: Mon, 29 Aug 2022 11:59:22 +0200 Subject: [PATCH 10/12] chore: update messages --- pkg/bridge/ingestion/ingester.go | 8 ++--- pkg/bridge/ingestion/message.go | 51 ++++++++++++++++++++++++++++++++ pkg/message.go | 34 --------------------- 3 files changed, 54 insertions(+), 39 deletions(-) create mode 100644 pkg/bridge/ingestion/message.go delete mode 100644 pkg/message.go diff --git a/pkg/bridge/ingestion/ingester.go b/pkg/bridge/ingestion/ingester.go index 627f754ba..558d5aeb5 100644 --- a/pkg/bridge/ingestion/ingester.go +++ b/pkg/bridge/ingestion/ingester.go @@ -210,11 +210,9 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a if i.publisher != nil { for _, e := range allPayments { - if err = i.publisher.Publish(ctx, payments.TopicPayments, - payments.NewEventPaymentsSavedPayment(payments.SavedPayment(e.Computed())), - ); err != nil { - i.logger.Errorf("Error publishing payment: %s", err) - } + i.publish(ctx, TopicPayments, + NewEventSavedPayment( + SavedPayment(e.Computed()))) } } diff --git a/pkg/bridge/ingestion/message.go b/pkg/bridge/ingestion/message.go new file mode 100644 index 000000000..bf8d52ceb --- /dev/null +++ b/pkg/bridge/ingestion/message.go @@ -0,0 +1,51 @@ +package ingestion + +import ( + "context" + "encoding/json" + "time" + + "github.com/numary/go-libs/sharedlogging" + payments "github.com/numary/payments/pkg" +) + +const ( + TopicPayments = "payments" + + EventVersion = "v1" + EventApp = "payments" + + EventTypeSavedPayment = "SAVED_PAYMENT" +) + +type EventMessage struct { + Date time.Time `json:"date"` + App string `json:"app"` + Version string `json:"version"` + Type string `json:"type"` + Payload json.RawMessage `json:"payload"` +} + +type SavedPayment payments.ComputedPayment + +func NewEventSavedPayment(payment SavedPayment) EventMessage { + payload, err := json.Marshal(payment) + if err != nil { + panic(err) + } + + return EventMessage{ + Date: time.Now().UTC(), + App: EventApp, + Version: EventVersion, + Type: EventTypeSavedPayment, + Payload: payload, + } +} + +func (i *defaultIngester) publish(ctx context.Context, topic string, ev EventMessage) { + if err := i.publisher.Publish(ctx, topic, ev); err != nil { + sharedlogging.GetLogger(ctx).Errorf("Publishing message: %s", err) + return + } +} diff --git a/pkg/message.go b/pkg/message.go deleted file mode 100644 index b990ebe30..000000000 --- a/pkg/message.go +++ /dev/null @@ -1,34 +0,0 @@ -package payments - -import ( - "time" -) - -const ( - TopicPayments = "payments" - - EventVersion = "v1" - EventApp = "payments" - - EventPaymentsTypeSavedPayment = "SAVED_PAYMENT" -) - -type EventPaymentsMessage[T any] struct { - Date time.Time `json:"date"` - App string `json:"app"` - Version string `json:"version"` - Type string `json:"type"` - Payload T `json:"payload"` -} - -type SavedPayment ComputedPayment - -func NewEventPaymentsSavedPayment(payload SavedPayment) EventPaymentsMessage[SavedPayment] { - return EventPaymentsMessage[SavedPayment]{ - Date: time.Now().UTC(), - App: EventApp, - Version: EventVersion, - Type: EventPaymentsTypeSavedPayment, - Payload: payload, - } -} From e1f09401bc39528db1b2a0ef36ad8b9771049477 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 29 Aug 2022 10:09:50 +0000 Subject: [PATCH 11/12] chore(deps): bump github/codeql-action from 1 to 2 Bumps [github/codeql-action](https://github.com/github/codeql-action) from 1 to 2. - [Release notes](https://github.com/github/codeql-action/releases) - [Changelog](https://github.com/github/codeql-action/blob/main/CHANGELOG.md) - [Commits](https://github.com/github/codeql-action/compare/v1...v2) --- updated-dependencies: - dependency-name: github/codeql-action dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] --- .github/workflows/codeql.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.github/workflows/codeql.yml b/.github/workflows/codeql.yml index 0b3d48b37..ba615193d 100644 --- a/.github/workflows/codeql.yml +++ b/.github/workflows/codeql.yml @@ -24,10 +24,10 @@ jobs: - name: Checkout repository uses: actions/checkout@v2 - name: Initialize CodeQL - uses: github/codeql-action/init@v1 + uses: github/codeql-action/init@v2 with: languages: ${{ matrix.language }} - name: Autobuild - uses: github/codeql-action/autobuild@v1 + uses: github/codeql-action/autobuild@v2 - name: Perform CodeQL Analysis - uses: github/codeql-action/analyze@v1 + uses: github/codeql-action/analyze@v2 From 4729bb22bb34c262ba075459eb88b01d4cebb72a Mon Sep 17 00:00:00 2001 From: Geoffrey Ragot Date: Mon, 29 Aug 2022 14:32:01 +0200 Subject: [PATCH 12/12] fix: bad dependencies --- cmd/server.go | 4 ++-- go.mod | 2 +- go.sum | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/cmd/server.go b/cmd/server.go index 91ad18457..1bfa8f3cf 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -15,8 +15,8 @@ import ( "github.com/numary/go-libs/sharedauth" "github.com/numary/go-libs/sharedlogging" "github.com/numary/go-libs/sharedlogging/sharedlogginglogrus" - "github.com/numary/go-libs/sharedotlp" - "github.com/numary/go-libs/sharedotlp/sharedotlptraces" + sharedotlp "github.com/numary/go-libs/sharedotlp/pkg" + "github.com/numary/go-libs/sharedotlp/pkg/sharedotlptraces" "github.com/numary/go-libs/sharedpublish" "github.com/numary/go-libs/sharedpublish/sharedpublishhttp" "github.com/numary/go-libs/sharedpublish/sharedpublishkafka" diff --git a/go.mod b/go.mod index 971904ff1..1d5b9e159 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/gorilla/mux v1.8.0 github.com/iancoleman/strcase v0.2.0 github.com/numary/go-libs v1.0.0 - github.com/numary/go-libs/sharedotlp v0.0.0-20220801164020-fc3e3280ca13 + github.com/numary/go-libs/sharedotlp v0.0.0-20220829123039-3eeb76619d81 github.com/ory/dockertest v3.3.5+incompatible github.com/pborman/uuid v1.2.1 github.com/pkg/errors v0.9.1 diff --git a/go.sum b/go.sum index 2a701d0b3..a22f41eae 100644 --- a/go.sum +++ b/go.sum @@ -351,8 +351,8 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/numary/go-libs v1.0.0 h1:IsYJ/xFsYt+UyPCQR+/ZPW9T2fcO+V4Xv8B9g2os8Ac= github.com/numary/go-libs v1.0.0/go.mod h1:u9XNKBrHJSCwu13s85GkEg4TWfRm2CF3fknav4TTLn4= -github.com/numary/go-libs/sharedotlp v0.0.0-20220801164020-fc3e3280ca13 h1:axI+XCZ03xg1Pym3MLwvT2Xw4wSZeCuxRbm6lIWheT8= -github.com/numary/go-libs/sharedotlp v0.0.0-20220801164020-fc3e3280ca13/go.mod h1:AZTmSrMGpNL9GzacAKwYsbSs+1U2ItR9i074hNILq6o= +github.com/numary/go-libs/sharedotlp v0.0.0-20220829123039-3eeb76619d81 h1:4ENnzCeXQGAAXGD7CSM8CZkKvQb78q1qNJ/Gonrb5Zs= +github.com/numary/go-libs/sharedotlp v0.0.0-20220829123039-3eeb76619d81/go.mod h1:4QEZTmjeQbNMjWd/pKADguTjxTvAuWgh+ZBWzJ7xDiI= github.com/oklog/ulid v1.3.1 h1:EGfNDEx6MqHz8B3uNV6QAib1UR2Lm97sHi3ocA6ESJ4= github.com/oklog/ulid v1.3.1/go.mod h1:CirwcVhetQ6Lv90oh/F+FBtV6XMibvdAFo93nm5qn4U= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U=