diff --git a/.github/workflows/pr-open.yml b/.github/workflows/pr-open.yml index 13a419cca..42caf7440 100644 --- a/.github/workflows/pr-open.yml +++ b/.github/workflows/pr-open.yml @@ -7,7 +7,20 @@ jobs: uses: formancehq/gh-workflows/.github/workflows/pr-style.yml@main lint: - uses: formancehq/gh-workflows/.github/workflows/golang-lint.yml@main + # Temporary disabled due to outdated shared version + # uses: formancehq/gh-workflows/.github/workflows/golang-lint.yml@main + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - uses: actions/setup-go@v3 + with: + go-version-file: 'go.mod' + cache: true + - name: golangci-lint + uses: golangci/golangci-lint-action@v3 + with: + version: v1.50.0 + args: --verbose test: uses: formancehq/gh-workflows/.github/workflows/golang-test.yml@main diff --git a/.golangci.yml b/.golangci.yml index e6e416e7a..fb419181b 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -1,8 +1,106 @@ +linters-settings: + dupl: + threshold: 30 + funlen: + lines: 150 + statements: 30 + goconst: + min-len: 2 + min-occurrences: 2 + gocritic: + enabled-tags: + - diagnostic + - experimental + - opinionated + - performance + - style + cyclop: + max-complexity: 20 + gocyclo: + min-complexity: 20 + goimports: + local-prefixes: github.com/numary/payments + govet: + check-shadowing: true + lll: + line-length: 120 + nakedret: + max-func-lines: 0 + tagliatelle: + case: + rules: + json: goCamel + yaml: goCamel + bson: goCamel + gomnd: + ignored-numbers: + - '2' + - '5' + - '10' + - '100' + - '64' + - '256' + - '512' + varnamelen: + ignore-names: + - id + - i + - db + - m + - r + - c + - fn + - w + - h + - to + - fs + linters: - enable: - - gofmt - - gci - - goimports + enable-all: true + disable: + - deadcode # deprecated + - maligned #deprecated + - golint # deprecated + - ifshort # deprecated + - structcheck # deprecated + - varcheck # deprecated + - interfacer # deprecated + - scopelint #deprecated + - nosnakecase # deprecated + - rowserrcheck # disabled due to generics + - sqlclosecheck # disabled due to generics + - structcheck # disabled due to generics + - wastedassign # disabled due to generics + - gci # conflicts with gofumpt + - goimports # conflicts with gofumpt + - testpackage # Disabled by design + - wrapcheck # Disabled by design + - exhaustivestruct # Disabled by design + - exhaustruct # Disabled by design + - exhaustive # Disabled by design + - dupl # Disabled by design + - godox # Disabled by design + - lll # Disabled by design + - funlen # Disabled by design + - misspell # Disabled by design + - ireturn # Disabled by design + - gocritic # TODO: FIX. Seems to have issues with generics + - gocognit # TODO: FIX + - goerr113 # TODO: FIX + - noctx # TODO: FIX + - contextcheck # TODO: FIX + - containedctx # TODO: FIX + +issues: + exclude-rules: + - path: _test\.go + linters: + - goerr113 + - varnamelen + + - linters: + - nolintlint + text: "should be written without leading space" run: timeout: 5m diff --git a/Dockerfile b/Dockerfile index 3d6f5e539..884bd52f5 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,33 +1,41 @@ -FROM --platform=$BUILDPLATFORM golang:1.18 AS builder +FROM golang:1.19.2-bullseye AS builder + RUN apt-get update && \ apt-get install -y gcc-aarch64-linux-gnu gcc-x86-64-linux-gnu && \ ln -s /usr/bin/aarch64-linux-gnu-gcc /usr/bin/arm64-linux-gnu-gcc && \ ln -s /usr/bin/x86_64-linux-gnu-gcc /usr/bin/amd64-linux-gnu-gcc + # 1. Precompile the entire go standard library into the first Docker cache layer: useful for other projects too! RUN CGO_ENABLED=0 GOOS=linux go install -v -installsuffix cgo -a std + ARG TARGETARCH ARG APP_SHA ARG VERSION + WORKDIR /go/src/github.com/numary/payments + # get deps first so it's cached COPY go.mod . COPY go.sum . -RUN --mount=type=cache,id=gomod,target=/go/bridge/mod \ - --mount=type=cache,id=gobuild,target=/root/.cache/go-build \ - go mod download + +RUN go mod download + COPY . . -RUN --mount=type=cache,id=gomod,target=/go/bridge/mod \ - --mount=type=cache,id=gobuild,target=/root/.cache/go-build \ - CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH \ + +RUN CGO_ENABLED=0 GOOS=linux GOARCH=$TARGETARCH \ CC=$TARGETARCH-linux-gnu-gcc \ - go build -o payments \ + go build -o bin/payments \ -ldflags="-X github.com/numary/payments/cmd.Version=${VERSION} \ -X github.com/numary/payments/cmd.BuildDate=$(date +%s) \ -X github.com/numary/payments/cmd.Commit=${APP_SHA}" ./ -FROM ubuntu:jammy -RUN apt update && apt install -y ca-certificates && rm -rf /var/lib/apt/lists/* -COPY --from=builder /go/src/github.com/numary/payments/payments /usr/local/bin/payments +FROM scratch + +COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +COPY --from=builder /go/src/github.com/numary/payments/bin/payments /usr/local/bin/payments + EXPOSE 8080 + ENTRYPOINT ["payments"] + CMD ["server"] diff --git a/Makefile b/Makefile new file mode 100644 index 000000000..53c6879a0 --- /dev/null +++ b/Makefile @@ -0,0 +1,9 @@ +docker-build: + DOCKER_BUILDKIT=1 docker build -t payments:local --build-arg BUILDPLATFORM=amd64 . + +lint: + golangci-lint run + +lint-fix: + golangci-lint run --fix + diff --git a/cmd/root.go b/cmd/root.go index 761a2413d..6bc3dc35a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -1,3 +1,4 @@ +//nolint:gochecknoglobals,golint,revive // allow for cobra & logrus init package cmd import ( @@ -20,7 +21,7 @@ var ( Commit = "-" ) -func NewRootCommand() *cobra.Command { +func rootCommand() *cobra.Command { viper.SetDefault("version", Version) root := &cobra.Command{ @@ -29,29 +30,37 @@ func NewRootCommand() *cobra.Command { DisableAutoGenTag: true, } - version := NewVersion() + version := newVersion() root.AddCommand(version) - server := NewServer() + + server := newServer() root.AddCommand(server) root.PersistentFlags().Bool(debugFlag, false, "Debug mode") root.Flags().BoolP("toggle", "t", false, "Help message for toggle") root.Flags().Bool(debugFlag, false, "Debug mode") - root.Flags().String(mongodbUriFlag, "mongodb://localhost:27017", "MongoDB address") + root.Flags().String(mongodbURIFlag, "mongodb://localhost:27017", "MongoDB address") root.Flags().String(mongodbDatabaseFlag, "payments", "MongoDB database name") root.Flags().Bool(otelTracesFlag, false, "Enable OpenTelemetry traces support") root.Flags().String(otelTracesExporterFlag, "stdout", "OpenTelemetry traces exporter") - root.Flags().String(otelTracesExporterJaegerEndpointFlag, "", "OpenTelemetry traces Jaeger exporter endpoint") - root.Flags().String(otelTracesExporterJaegerUserFlag, "", "OpenTelemetry traces Jaeger exporter user") - root.Flags().String(otelTracesExporterJaegerPasswordFlag, "", "OpenTelemetry traces Jaeger exporter password") - root.Flags().String(otelTracesExporterOTLPModeFlag, "grpc", "OpenTelemetry traces OTLP exporter mode (grpc|http)") - root.Flags().String(otelTracesExporterOTLPEndpointFlag, "", "OpenTelemetry traces grpc endpoint") - root.Flags().Bool(otelTracesExporterOTLPInsecureFlag, false, "OpenTelemetry traces grpc insecure") + root.Flags().String(otelTracesExporterJaegerEndpointFlag, + "", "OpenTelemetry traces Jaeger exporter endpoint") + root.Flags().String(otelTracesExporterJaegerUserFlag, + "", "OpenTelemetry traces Jaeger exporter user") + root.Flags().String(otelTracesExporterJaegerPasswordFlag, + "", "OpenTelemetry traces Jaeger exporter password") + root.Flags().String(otelTracesExporterOTLPModeFlag, + "grpc", "OpenTelemetry traces OTLP exporter mode (grpc|http)") + root.Flags().String(otelTracesExporterOTLPEndpointFlag, + "", "OpenTelemetry traces grpc endpoint") + root.Flags().Bool(otelTracesExporterOTLPInsecureFlag, + false, "OpenTelemetry traces grpc insecure") root.Flags().String(envFlag, "local", "Environment") root.Flags().Bool(publisherKafkaEnabledFlag, false, "Publish write events to kafka") root.Flags().StringSlice(publisherKafkaBrokerFlag, []string{}, "Kafka address is kafka enabled") - root.Flags().StringSlice(publisherTopicMappingFlag, []string{}, "Define mapping between internal event types and topics") - root.Flags().Bool(publisherHttpEnabledFlag, false, "Sent write event to http endpoint") + root.Flags().StringSlice(publisherTopicMappingFlag, + []string{}, "Define mapping between internal event types and topics") + root.Flags().Bool(publisherHTTPEnabledFlag, false, "Sent write event to http endpoint") root.Flags().Bool(publisherKafkaSASLEnabled, false, "Enable SASL authentication on kafka publisher") root.Flags().String(publisherKafkaSASLUsername, "", "SASL username") root.Flags().String(publisherKafkaSASLPassword, "", "SASL password") @@ -59,15 +68,18 @@ func NewRootCommand() *cobra.Command { root.Flags().Int(publisherKafkaSASLScramSHASize, 512, "SASL SCRAM SHA size") root.Flags().Bool(publisherKafkaTLSEnabled, false, "Enable TLS to connect on kafka") root.Flags().Bool(authBasicEnabledFlag, false, "Enable basic auth") - root.Flags().StringSlice(authBasicCredentialsFlag, []string{}, "HTTP basic auth credentials (:)") + root.Flags().StringSlice(authBasicCredentialsFlag, []string{}, + "HTTP basic auth credentials (:)") root.Flags().Bool(authBearerEnabledFlag, false, "Enable bearer auth") - root.Flags().String(authBearerIntrospectUrlFlag, "", "OAuth2 introspect URL") + root.Flags().String(authBearerIntrospectURLFlag, "", "OAuth2 introspect URL") root.Flags().StringSlice(authBearerAudienceFlag, []string{}, "Allowed audiences") root.Flags().Bool(authBearerAudiencesWildcardFlag, false, "Don't check audience") - root.Flags().Bool(authBearerUseScopesFlag, false, "Use scopes as defined by rfc https://datatracker.ietf.org/doc/html/rfc8693") + root.Flags().Bool(authBearerUseScopesFlag, + false, "Use scopes as defined by rfc https://datatracker.ietf.org/doc/html/rfc8693") viper.SetEnvKeyReplacer(strings.NewReplacer(".", "_", "-", "_")) viper.AutomaticEnv() + err := viper.BindPFlags(root.Flags()) if err != nil { panic(err) @@ -77,10 +89,11 @@ func NewRootCommand() *cobra.Command { } func Execute() { - if err := NewRootCommand().Execute(); err != nil { - if _, err := fmt.Fprintln(os.Stderr, err); err != nil { + if err := rootCommand().Execute(); err != nil { + if _, err = fmt.Fprintln(os.Stderr, err); err != nil { panic(err) } + os.Exit(1) } } diff --git a/cmd/server.go b/cmd/server.go index 5c1d7613f..5a0e6b733 100644 --- a/cmd/server.go +++ b/cmd/server.go @@ -1,49 +1,33 @@ package cmd import ( - "context" - "net" - "net/http" - "runtime/debug" "strings" + "github.com/bombsimon/logrusr/v3" + "github.com/numary/payments/internal/app/api" + "github.com/numary/payments/internal/app/database" + "github.com/pkg/errors" + "go.opentelemetry.io/otel" + "github.com/Shopify/sarama" "github.com/ThreeDotsLabs/watermill/message" - "github.com/bombsimon/logrusr/v3" - "github.com/gorilla/mux" - "github.com/numary/go-libs/oauth2/oauth2introspect" - "github.com/numary/go-libs/sharedauth" "github.com/numary/go-libs/sharedlogging" "github.com/numary/go-libs/sharedlogging/sharedlogginglogrus" - sharedotlp "github.com/numary/go-libs/sharedotlp/pkg" "github.com/numary/go-libs/sharedotlp/pkg/sharedotlptraces" "github.com/numary/go-libs/sharedpublish" "github.com/numary/go-libs/sharedpublish/sharedpublishhttp" "github.com/numary/go-libs/sharedpublish/sharedpublishkafka" - "github.com/numary/payments/pkg/api" - "github.com/numary/payments/pkg/bridge/cdi" - "github.com/numary/payments/pkg/bridge/connectors/dummypay" - "github.com/numary/payments/pkg/bridge/connectors/modulr" - "github.com/numary/payments/pkg/bridge/connectors/stripe" - "github.com/numary/payments/pkg/bridge/connectors/wise" - bridgeHttp "github.com/numary/payments/pkg/bridge/http" - "github.com/numary/payments/pkg/database" - paymentapi "github.com/numary/payments/pkg/http" - "github.com/pkg/errors" - "github.com/rs/cors" "github.com/sirupsen/logrus" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/uptrace/opentelemetry-go-extra/otellogrus" "github.com/xdg-go/scram" - "go.mongodb.org/mongo-driver/mongo" - "go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux" - "go.opentelemetry.io/otel" "go.uber.org/fx" ) +//nolint:gosec // false positive const ( - mongodbUriFlag = "mongodb-uri" + mongodbURIFlag = "mongodb-uri" mongodbDatabaseFlag = "mongodb-database" otelTracesFlag = "otel-traces" otelTracesExporterFlag = "otel-traces-exporter" @@ -63,11 +47,11 @@ const ( publisherKafkaSASLScramSHASize = "publisher-kafka-sasl-scram-sha-size" publisherKafkaTLSEnabled = "publisher-kafka-tls-enabled" publisherTopicMappingFlag = "publisher-topic-mapping" - publisherHttpEnabledFlag = "publisher-http-enabled" + publisherHTTPEnabledFlag = "publisher-http-enabled" authBasicEnabledFlag = "auth-basic-enabled" authBasicCredentialsFlag = "auth-basic-credentials" authBearerEnabledFlag = "auth-bearer-enabled" - authBearerIntrospectUrlFlag = "auth-bearer-introspect-url" + authBearerIntrospectURLFlag = "auth-bearer-introspect-url" authBearerAudienceFlag = "auth-bearer-audience" authBearerAudiencesWildcardFlag = "auth-bearer-audiences-wildcard" authBearerUseScopesFlag = "auth-bearer-use-scopes" @@ -75,223 +59,153 @@ const ( serviceName = "Payments" ) -func NewServer() *cobra.Command { +func newServer() *cobra.Command { return &cobra.Command{ Use: "server", Short: "Launch server", SilenceUsage: true, - RunE: func(cmd *cobra.Command, args []string) error { - l := logrus.New() - if viper.GetBool(debugFlag) { - l.SetLevel(logrus.DebugLevel) - } - if viper.GetBool(otelTracesFlag) { - l.AddHook(otellogrus.NewHook(otellogrus.WithLevels( - logrus.PanicLevel, - logrus.FatalLevel, - logrus.ErrorLevel, - logrus.WarnLevel, - ))) - l.SetFormatter(&logrus.JSONFormatter{}) - } - sharedlogging.SetFactory(sharedlogging.StaticLoggerFactory(sharedlogginglogrus.New(l))) - - // Add a dedicated logger for opentelemetry in case of error - otel.SetLogger(logrusr.New(logrus.New().WithField("component", "otlp"))) - - mongodbUri := viper.GetString(mongodbUriFlag) - if mongodbUri == "" { - return errors.New("missing mongodb uri") - } - - mongodbDatabase := viper.GetString(mongodbDatabaseFlag) - if mongodbDatabase == "" { - return errors.New("missing mongodb database name") - } - - topics := viper.GetStringSlice(publisherTopicMappingFlag) - mapping := make(map[string]string) - for _, topic := range topics { - parts := strings.SplitN(topic, ":", 2) - if len(parts) != 2 { - panic("invalid topic flag") - } - mapping[parts[0]] = parts[1] - } - - options := make([]fx.Option, 0) - if !viper.GetBool(debugFlag) { - options = append(options, fx.NopLogger) - } - //if viper.GetBool(otelTracesFlag) { - // options = append(options, database.MongoMonitor()) - //} - options = append(options, - database.MongoModule(mongodbUri, mongodbDatabase), - sharedotlptraces.TracesModule(sharedotlptraces.ModuleConfig{ - Exporter: viper.GetString(otelTracesExporterFlag), - OTLPConfig: &sharedotlptraces.OTLPConfig{ - Mode: viper.GetString(otelTracesExporterOTLPModeFlag), - Endpoint: viper.GetString(otelTracesExporterOTLPEndpointFlag), - Insecure: viper.GetBool(otelTracesExporterOTLPInsecureFlag), - }, - }), - fx.Provide(fx.Annotate(func(p message.Publisher) *sharedpublish.TopicMapperPublisher { - return sharedpublish.NewTopicMapperPublisher(p, mapping) - }, fx.As(new(sharedpublish.Publisher)))), - HTTPModule(), - ) - - options = append(options, sharedpublish.Module()) - switch { - case viper.GetBool(publisherHttpEnabledFlag): - options = append(options, sharedpublishhttp.Module()) - case viper.GetBool(publisherKafkaEnabledFlag): - options = append(options, - sharedpublishkafka.Module(serviceName, viper.GetStringSlice(publisherKafkaBrokerFlag)...), - sharedpublishkafka.ProvideSaramaOption( - sharedpublishkafka.WithConsumerReturnErrors(), - sharedpublishkafka.WithProducerReturnSuccess(), - ), - ) - if viper.GetBool(publisherKafkaTLSEnabled) { - options = append(options, sharedpublishkafka.ProvideSaramaOption(sharedpublishkafka.WithTLS())) - } - if viper.GetBool(publisherKafkaSASLEnabled) { - options = append(options, sharedpublishkafka.ProvideSaramaOption( - sharedpublishkafka.WithSASLEnabled(), - sharedpublishkafka.WithSASLCredentials( - viper.GetString(publisherKafkaSASLUsername), - viper.GetString(publisherKafkaSASLPassword), - ), - sharedpublishkafka.WithSASLMechanism(sarama.SASLMechanism(viper.GetString(publisherKafkaSASLMechanism))), - sharedpublishkafka.WithSASLScramClient(func() sarama.SCRAMClient { - var fn scram.HashGeneratorFcn - switch viper.GetInt(publisherKafkaSASLScramSHASize) { - case 512: - fn = sharedpublishkafka.SHA512 - case 256: - fn = sharedpublishkafka.SHA256 - default: - panic("sha size not handled") - } - return &sharedpublishkafka.XDGSCRAMClient{ - HashGeneratorFcn: fn, - } - }), - )) - } - } - - err := fx.New(options...).Start(cmd.Context()) - if err != nil { - return err - } - <-cmd.Context().Done() - return nil - }} + RunE: runServer, + } } -func HTTPModule() fx.Option { - return fx.Options( - fx.Invoke(func(m *mux.Router, lc fx.Lifecycle) { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - conn, err := net.Listen("tcp", ":8080") - if err != nil { - return err - } - go func() { - err := http.Serve(conn, m) - if err != nil { - panic(err) - } - }() - return nil - }, - }) - }), - fx.Provide(fx.Annotate(func(db *mongo.Database, client *mongo.Client, handlers []cdi.ConnectorHandler) (*mux.Router, error) { - - rootMux := mux.NewRouter() - if viper.GetBool(otelTracesFlag) { - rootMux.Use(otelmux.Middleware(serviceName)) - } - rootMux.Use( - paymentapi.Recovery(func(ctx context.Context, e interface{}) { - if viper.GetBool(otelTracesFlag) { - sharedotlp.RecordAsError(ctx, e) - } else { - logrus.Errorln(e) - debug.PrintStack() - } - }), - cors.New(cors.Options{ - AllowedOrigins: []string{"*"}, - AllowedMethods: []string{http.MethodGet, http.MethodPost, http.MethodPut}, - AllowCredentials: true, - }).Handler, - ) - rootMux.Use(func(handler http.Handler) http.Handler { - return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - w.Header().Set("Content-Type", "application/json") - handler.ServeHTTP(w, r) - }) - }) - rootMux.Path("/_health").Handler(paymentapi.HealthHandler(client)) - rootMux.Path("/_live").Handler(paymentapi.LiveHandler()) - authenticatedRouter := rootMux.Name("authenticated").Subrouter() - methods := make([]sharedauth.Method, 0) - if viper.GetBool(authBasicEnabledFlag) { - credentials := sharedauth.Credentials{} - for _, kv := range viper.GetStringSlice(authBasicCredentialsFlag) { - parts := strings.SplitN(kv, ":", 2) - credentials[parts[0]] = sharedauth.Credential{ - Password: parts[1], - Scopes: append(api.AllScopes, bridgeHttp.AllScopes...), - } - } - methods = append(methods, sharedauth.NewHTTPBasicMethod(credentials)) - } - if viper.GetBool(authBearerEnabledFlag) { - methods = append(methods, sharedauth.NewHttpBearerMethod( - sharedauth.NewIntrospectionValidator( - oauth2introspect.NewIntrospecter(viper.GetString(authBearerIntrospectUrlFlag)), - viper.GetBool(authBearerAudiencesWildcardFlag), - sharedauth.AudienceIn(viper.GetStringSlice(authBearerAudienceFlag)...), - ), - )) - } - if len(methods) > 0 { - authenticatedRouter.Use(sharedauth.Middleware(methods...)) - } - connectorsSubRouter := authenticatedRouter.PathPrefix("/connectors").Subrouter() - for _, h := range handlers { - connectorsSubRouter.PathPrefix("/" + h.Name).Handler( - http.StripPrefix("/connectors", h.Handler), - ) - } - authenticatedRouter.PathPrefix("/").Handler( - api.PaymentsRouter(db, viper.GetBool(authBearerUseScopesFlag)), - ) - - return rootMux, nil - }, fx.ParamTags(``, ``, `group:"connectorHandlers"`))), - cdi.ConnectorModule[stripe.Config, stripe.TaskDescriptor]( - viper.GetBool(authBearerUseScopesFlag), - stripe.NewLoader(), - ), - cdi.ConnectorModule[dummypay.Config, dummypay.TaskDescriptor]( - viper.GetBool(authBearerUseScopesFlag), - dummypay.NewLoader(), - ), - cdi.ConnectorModule( - viper.GetBool(authBearerUseScopesFlag), - wise.NewLoader(), - ), - cdi.ConnectorModule( - viper.GetBool(authBearerUseScopesFlag), - modulr.NewLoader(), - ), - ) +func runServer(cmd *cobra.Command, args []string) error { + setLogger() + + databaseOptions, err := prepareDatabaseOptions() + if err != nil { + return err + } + + options := make([]fx.Option, 0) + + if !viper.GetBool(debugFlag) { + options = append(options, fx.NopLogger) + } + + options = append(options, databaseOptions) + + options = append(options, sharedotlptraces.TracesModule(sharedotlptraces.ModuleConfig{ + Exporter: viper.GetString(otelTracesExporterFlag), + OTLPConfig: &sharedotlptraces.OTLPConfig{ + Mode: viper.GetString(otelTracesExporterOTLPModeFlag), + Endpoint: viper.GetString(otelTracesExporterOTLPEndpointFlag), + Insecure: viper.GetBool(otelTracesExporterOTLPInsecureFlag), + }, + })) + + options = append(options, + fx.Provide(fx.Annotate(func(p message.Publisher) *sharedpublish.TopicMapperPublisher { + return sharedpublish.NewTopicMapperPublisher(p, topicsMapping()) + }, fx.As(new(sharedpublish.Publisher))))) + + options = append(options, api.HTTPModule()) + options = append(options, sharedpublish.Module()) + + switch { + case viper.GetBool(publisherHTTPEnabledFlag): + options = append(options, sharedpublishhttp.Module()) + case viper.GetBool(publisherKafkaEnabledFlag): + options = append(options, + sharedpublishkafka.Module(serviceName, viper.GetStringSlice(publisherKafkaBrokerFlag)...), + sharedpublishkafka.ProvideSaramaOption( + sharedpublishkafka.WithConsumerReturnErrors(), + sharedpublishkafka.WithProducerReturnSuccess(), + ), + ) + + if viper.GetBool(publisherKafkaTLSEnabled) { + options = append(options, sharedpublishkafka.ProvideSaramaOption(sharedpublishkafka.WithTLS())) + } + + if viper.GetBool(publisherKafkaSASLEnabled) { + options = append(options, sharedpublishkafka.ProvideSaramaOption( + sharedpublishkafka.WithSASLEnabled(), + sharedpublishkafka.WithSASLCredentials( + viper.GetString(publisherKafkaSASLUsername), + viper.GetString(publisherKafkaSASLPassword), + ), + sharedpublishkafka.WithSASLMechanism(sarama.SASLMechanism(viper.GetString(publisherKafkaSASLMechanism))), + sharedpublishkafka.WithSASLScramClient(setSCRAMClient), + )) + } + } + + err = fx.New(options...).Start(cmd.Context()) + if err != nil { + return err + } + + <-cmd.Context().Done() + + return nil +} + +func setLogger() { + log := logrus.New() + + if viper.GetBool(debugFlag) { + log.SetLevel(logrus.DebugLevel) + } + + if viper.GetBool(otelTracesFlag) { + log.AddHook(otellogrus.NewHook(otellogrus.WithLevels( + logrus.PanicLevel, + logrus.FatalLevel, + logrus.ErrorLevel, + logrus.WarnLevel, + ))) + log.SetFormatter(&logrus.JSONFormatter{}) + } + + sharedlogging.SetFactory(sharedlogging.StaticLoggerFactory(sharedlogginglogrus.New(log))) + + // Add a dedicated logger for opentelemetry in case of error + otel.SetLogger(logrusr.New(logrus.New().WithField("component", "otlp"))) +} + +func prepareDatabaseOptions() (fx.Option, error) { + mongodbURI := viper.GetString(mongodbURIFlag) + if mongodbURI == "" { + return nil, errors.New("missing mongodb uri") + } + + mongodbDatabase := viper.GetString(mongodbDatabaseFlag) + if mongodbDatabase == "" { + return nil, errors.New("missing mongodb database name") + } + + return database.MongoModule(mongodbURI, mongodbDatabase), nil +} + +func topicsMapping() map[string]string { + topics := viper.GetStringSlice(publisherTopicMappingFlag) + mapping := make(map[string]string) + + for _, topic := range topics { + parts := strings.SplitN(topic, ":", 2) + if len(parts) != 2 { + panic("invalid topic flag") + } + + mapping[parts[0]] = parts[1] + } + + return mapping +} + +func setSCRAMClient() sarama.SCRAMClient { + var fn scram.HashGeneratorFcn + + switch viper.GetInt(publisherKafkaSASLScramSHASize) { + case 512: + fn = sharedpublishkafka.SHA512 + case 256: + fn = sharedpublishkafka.SHA256 + default: + panic("sha size not handled") + } + + return &sharedpublishkafka.XDGSCRAMClient{ + HashGeneratorFcn: fn, + } } diff --git a/cmd/version.go b/cmd/version.go index 5b9e9ff24..9cad90866 100644 --- a/cmd/version.go +++ b/cmd/version.go @@ -1,21 +1,21 @@ package cmd import ( - "fmt" + "log" "github.com/spf13/cobra" ) -func PrintVersion(cmd *cobra.Command, args []string) { - fmt.Printf("Version: %s \n", Version) - fmt.Printf("Date: %s \n", BuildDate) - fmt.Printf("Commit: %s \n", Commit) -} - -func NewVersion() *cobra.Command { +func newVersion() *cobra.Command { return &cobra.Command{ Use: "version", Short: "Get version", - Run: PrintVersion, + Run: printVersion, } } + +func printVersion(cmd *cobra.Command, args []string) { + log.Printf("Version: %s \n", Version) + log.Printf("Date: %s \n", BuildDate) + log.Printf("Commit: %s \n", Commit) +} diff --git a/docker-compose.yml b/docker-compose.yml index 5410e30c5..b04d35207 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -12,7 +12,7 @@ services: - "27017:27017/tcp" payments: - image: golang:1.18-alpine + image: golang:1.18-alpine3.16 depends_on: - mongodb command: diff --git a/go.mod b/go.mod index b44ec7b2c..0ce9f6108 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,6 @@ module github.com/numary/payments -go 1.18 +go 1.19 require ( github.com/Shopify/sarama v1.36.0 @@ -40,6 +40,7 @@ require ( github.com/ThreeDotsLabs/watermill-http v1.1.4 // indirect github.com/ThreeDotsLabs/watermill-kafka/v2 v2.2.2 // indirect github.com/ajg/form v1.5.1 // indirect + github.com/andybalholm/brotli v1.0.4 // indirect github.com/cenkalti/backoff v2.2.1+incompatible // indirect github.com/cenkalti/backoff/v4 v4.1.3 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect @@ -57,6 +58,7 @@ require ( github.com/go-chi/render v1.0.2 // indirect github.com/go-logr/logr v1.2.3 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/gofiber/fiber/v2 v2.38.1 // indirect github.com/golang-jwt/jwt v3.2.2+incompatible // indirect github.com/golang/glog v1.0.0 // indirect github.com/golang/protobuf v1.5.2 // indirect @@ -95,6 +97,9 @@ require ( github.com/spf13/pflag v1.0.5 // indirect github.com/subosito/gotenv v1.4.1 // indirect github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.15 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.40.0 // indirect + github.com/valyala/tcplisten v1.0.0 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect github.com/xdg-go/stringprep v1.0.3 // indirect github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect diff --git a/go.sum b/go.sum index 15c036eb3..fcf10074d 100644 --- a/go.sum +++ b/go.sum @@ -64,6 +64,8 @@ github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuy github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/benbjohnson/clock v1.3.0 h1:ip6w0uFQkncKQ979AypyG0ER7mqUSBdKLOgAle/AT8A= @@ -174,6 +176,8 @@ github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.0.6/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofiber/fiber/v2 v2.38.1 h1:GEQ/Yt3Wsf2a30iTqtLXlBYJZso0JXPovt/tmj5H9jU= +github.com/gofiber/fiber/v2 v2.38.1/go.mod h1:t0NlbaXzuGH7I+7M4paE848fNWInZ7mfxI/Er1fTth8= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.1/go.mod h1:hp+jE20tsWTFYpLwKvXlhS1hjn+gTNwPg2I6zVXpSg4= github.com/golang-jwt/jwt v3.2.2+incompatible h1:IfV12K8xAKAnZqdXVzCZ+TOjboZ2keLg81eXfW3O+oY= @@ -310,6 +314,7 @@ github.com/julienschmidt/httprouter v1.3.0/go.mod h1:JR6WtHb+2LUe8TCKY3cZOxFyyO8 github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.0/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= github.com/klauspost/compress v1.15.9 h1:wKRjX6JRtDdrE9qwa4b/Cip7ACOshUI4smpCQanqjSY= github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -473,6 +478,12 @@ github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.15 h1:5eYO+onNB1mbdc3+uw github.com/uptrace/opentelemetry-go-extra/otelutil v0.1.15/go.mod h1:6fGFPZDTcvHLxgWTFvf8hHWQrRO1tMXAFlxlqE+c650= github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.40.0 h1:CRq/00MfruPGFLTQKY8b+8SfdK60TxNztjRMnH0t1Yc= +github.com/valyala/fasthttp v1.40.0/go.mod h1:t/G+3rLek+CyY9bnIE+YlMRddxVAAGjhxndDB4i4C0I= +github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/vishvananda/netlink v1.1.0/go.mod h1:cTgwzPIzzgDAYoQrMm0EdrjRUBkTqKYppBueQtXaqoE= github.com/vishvananda/netns v0.0.0-20191106174202-0a2b9b5464df/go.mod h1:JP3t17pCcGlemwknint6hfoeCVQrEMVwxRLRjXpq+BU= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= @@ -550,6 +561,7 @@ golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220824171710-5757bc0c5503 h1:vJ2V3lFLg+bBhgroYuRfyN583UzVveQmIXjc8T/y3to= @@ -625,6 +637,7 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220725212005-46097bf591d3/go.mod h1:AaygXjzTFtRAg2ttMY5RMuhpJ3cNnI0XpyFJD1iQRSM= golang.org/x/net v0.0.0-20220809184613-07c6da5e1ced/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.0.0-20220822230855-b0a4917ee28c h1:JVAXQ10yGGVbSyoer5VILysz6YKjdNT2bsvlayjqhes= @@ -708,7 +721,9 @@ golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20210906170528-6f6e22806c34/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211116061358-0a5406a5449c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/internal/app/api/connector.go b/internal/app/api/connector.go new file mode 100644 index 000000000..357db34e6 --- /dev/null +++ b/internal/app/api/connector.go @@ -0,0 +1,167 @@ +package api + +import ( + "context" + "encoding/json" + "errors" + "net/http" + + "github.com/numary/payments/internal/pkg/integration" + "github.com/numary/payments/internal/pkg/payments" + + "github.com/gorilla/mux" + "github.com/numary/go-libs/sharedapi" + "github.com/numary/go-libs/sharedlogging" +) + +func handleError(w http.ResponseWriter, r *http.Request, err error) { + w.WriteHeader(http.StatusInternalServerError) + sharedlogging.GetLogger(r.Context()).Error(err) + // TODO: Opentracing + err = json.NewEncoder(w).Encode(sharedapi.ErrorResponse{ + ErrorCode: "INTERNAL", + ErrorMessage: err.Error(), + }) + if err != nil { + panic(err) + } +} + +func readConfig[Config payments.ConnectorConfigObject, + Descriptor payments.TaskDescriptor](connectorManager *integration.ConnectorManager[Config, Descriptor], +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + config, err := connectorManager.ReadConfig(r.Context()) + if err != nil { + handleError(w, r, err) + + return + } + + err = json.NewEncoder(w).Encode(config) + if err != nil { + panic(err) + } + } +} + +func listTasks[Config payments.ConnectorConfigObject, + Descriptor payments.TaskDescriptor](connectorManager *integration.ConnectorManager[Config, Descriptor], +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + tasks, err := connectorManager.ListTasksStates(r.Context()) + if err != nil { + handleError(w, r, err) + + return + } + + err = json.NewEncoder(w).Encode(tasks) + if err != nil { + panic(err) + } + } +} + +func readTask[Config payments.ConnectorConfigObject, + Descriptor payments.TaskDescriptor](connectorManager *integration.ConnectorManager[Config, Descriptor], +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + var descriptor Descriptor + + payments.DescriptorFromID(mux.Vars(r)["taskID"], &descriptor) + + tasks, err := connectorManager.ReadTaskState(r.Context(), descriptor) + if err != nil { + handleError(w, r, err) + + return + } + + err = json.NewEncoder(w).Encode(tasks) + if err != nil { + panic(err) + } + } +} + +func uninstall[Config payments.ConnectorConfigObject, + Descriptor payments.TaskDescriptor](connectorManager *integration.ConnectorManager[Config, Descriptor], +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + err := connectorManager.Uninstall(r.Context()) + if err != nil { + handleError(w, r, err) + + return + } + + w.WriteHeader(http.StatusNoContent) + } +} + +func install[Config payments.ConnectorConfigObject, + Descriptor payments.TaskDescriptor](connectorManager *integration.ConnectorManager[Config, Descriptor], +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + installed, err := connectorManager.IsInstalled(context.Background()) + if err != nil { + handleError(w, r, err) + + return + } + + if installed { + handleError(w, r, integration.ErrAlreadyInstalled) + + return + } + + var config Config + if r.ContentLength > 0 { + err = json.NewDecoder(r.Body).Decode(&config) + if err != nil { + handleError(w, r, err) + + return + } + } + + err = connectorManager.Install(r.Context(), config) + if err != nil { + handleError(w, r, err) + + return + } + + w.WriteHeader(http.StatusNoContent) + } +} + +func reset[Config payments.ConnectorConfigObject, + Descriptor payments.TaskDescriptor](connectorManager *integration.ConnectorManager[Config, Descriptor], +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + installed, err := connectorManager.IsInstalled(context.Background()) + if err != nil { + handleError(w, r, err) + + return + } + + if !installed { + handleError(w, r, errors.New("connector not installed")) + + return + } + + err = connectorManager.Reset(r.Context()) + if err != nil { + handleError(w, r, err) + + return + } + + w.WriteHeader(http.StatusNoContent) + } +} diff --git a/internal/app/api/connectormodule.go b/internal/app/api/connectormodule.go new file mode 100644 index 000000000..98ee32818 --- /dev/null +++ b/internal/app/api/connectormodule.go @@ -0,0 +1,110 @@ +package api + +import ( + "context" + "net/http" + + "github.com/gorilla/mux" + "github.com/numary/payments/internal/pkg/ingestion" + "github.com/numary/payments/internal/pkg/integration" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/payments/internal/pkg/task" + "github.com/numary/payments/internal/pkg/writeonly" + + "github.com/numary/go-libs/sharedlogging" + "github.com/numary/go-libs/sharedpublish" + "go.mongodb.org/mongo-driver/mongo" + "go.uber.org/dig" + "go.uber.org/fx" +) + +type connectorHandler struct { + Handler http.Handler + Name string +} + +func addConnector[ + ConnectorConfig payments.ConnectorConfigObject, + TaskDescriptor payments.TaskDescriptor, +](loader integration.Loader[ConnectorConfig, TaskDescriptor], +) fx.Option { + return fx.Options( + fx.Provide(func(db *mongo.Database, + publisher sharedpublish.Publisher, + ) *integration.ConnectorManager[ConnectorConfig, TaskDescriptor] { + connectorStore := integration.NewMongoDBConnectorStore(db) + taskStore := task.NewMongoDBStore[TaskDescriptor](db) + logger := sharedlogging.GetLogger(context.Background()) + + schedulerFactory := integration.TaskSchedulerFactoryFn[TaskDescriptor](func( + resolver task.Resolver[TaskDescriptor], maxTasks int, + ) *task.DefaultTaskScheduler[TaskDescriptor] { + return task.NewDefaultScheduler[TaskDescriptor](loader.Name(), logger, + taskStore, task.ContainerFactoryFn(func(ctx context.Context, + descriptor payments.TaskDescriptor, + ) (*dig.Container, error) { + container := dig.New() + + if err := container.Provide(func() ingestion.Ingester { + return ingestion.NewDefaultIngester(loader.Name(), descriptor, db, + logger.WithFields(map[string]interface{}{ + "task-id": payments.IDFromDescriptor(descriptor), + }), publisher) + }); err != nil { + return nil, err + } + + err := container.Provide(func() writeonly.Storage { + return writeonly.NewMongoDBStorage(db, loader.Name(), descriptor) + }) + if err != nil { + panic(err) + } + + return container, nil + }), resolver, maxTasks) + }) + + return integration.NewConnectorManager[ConnectorConfig, TaskDescriptor](logger, + connectorStore, loader, schedulerFactory) + }), + fx.Provide(fx.Annotate(func(cm *integration.ConnectorManager[ConnectorConfig, + TaskDescriptor], + ) connectorHandler { + return connectorHandler{ + Handler: connectorRouter(loader.Name(), cm), + Name: loader.Name(), + } + }, fx.ResultTags(`group:"connectorHandlers"`))), + fx.Invoke(func(lc fx.Lifecycle, cm *integration.ConnectorManager[ConnectorConfig, TaskDescriptor]) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + _ = cm.Restore(ctx) + + return nil + }, + }) + }), + ) +} + +func connectorRouter[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor]( + name string, + manager *integration.ConnectorManager[Config, Descriptor], +) *mux.Router { + r := mux.NewRouter() + + r.Path("/" + name).Methods(http.MethodPost).Handler(install(manager)) + + r.Path("/" + name + "/reset").Methods(http.MethodPost).Handler(reset(manager)) + + r.Path("/" + name).Methods(http.MethodDelete).Handler(uninstall(manager)) + + r.Path("/" + name + "/config").Methods(http.MethodGet).Handler(readConfig(manager)) + + r.Path("/" + name + "/tasks").Methods(http.MethodGet).Handler(listTasks(manager)) + + r.Path("/" + name + "/tasks/{taskID}").Methods(http.MethodGet).Handler(readTask(manager)) + + return r +} diff --git a/pkg/http/health.go b/internal/app/api/health.go similarity index 67% rename from pkg/http/health.go rename to internal/app/api/health.go index 131c5493b..36c5c70aa 100644 --- a/pkg/http/health.go +++ b/internal/app/api/health.go @@ -1,4 +1,4 @@ -package http +package api import ( "context" @@ -9,21 +9,24 @@ import ( "go.mongodb.org/mongo-driver/mongo/readpref" ) -func HealthHandler(client *mongo.Client) http.HandlerFunc { +func healthHandler(client *mongo.Client) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - ctx, cancel := context.WithDeadline(r.Context(), time.Now().Add(3*time.Second)) + ctx, cancel := context.WithDeadline(r.Context(), time.Now(). + Add(3*time.Second)) //nolint:gomnd // allow for now defer cancel() err := client.Ping(ctx, readpref.Primary()) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) + return } + w.WriteHeader(http.StatusOK) } } -func LiveHandler() http.HandlerFunc { +func liveHandler() http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) } diff --git a/internal/app/api/module.go b/internal/app/api/module.go new file mode 100644 index 000000000..bd93545b9 --- /dev/null +++ b/internal/app/api/module.go @@ -0,0 +1,155 @@ +package api + +import ( + "context" + "net/http" + "runtime/debug" + "strings" + "time" + + "github.com/gorilla/mux" + "github.com/numary/go-libs/oauth2/oauth2introspect" + "github.com/numary/go-libs/sharedauth" + sharedotlp "github.com/numary/go-libs/sharedotlp/pkg" + "github.com/numary/payments/internal/pkg/connectors/dummypay" + "github.com/numary/payments/internal/pkg/connectors/modulr" + "github.com/numary/payments/internal/pkg/connectors/stripe" + "github.com/numary/payments/internal/pkg/connectors/wise" + "github.com/rs/cors" + "github.com/sirupsen/logrus" + "github.com/spf13/viper" + "go.mongodb.org/mongo-driver/mongo" + "go.opentelemetry.io/contrib/instrumentation/github.com/gorilla/mux/otelmux" + "go.uber.org/fx" +) + +//nolint:gosec // false positive +const ( + otelTracesFlag = "otel-traces" + authBasicEnabledFlag = "auth-basic-enabled" + authBasicCredentialsFlag = "auth-basic-credentials" + authBearerEnabledFlag = "auth-bearer-enabled" + authBearerIntrospectURLFlag = "auth-bearer-introspect-url" + authBearerAudienceFlag = "auth-bearer-audience" + authBearerAudiencesWildcardFlag = "auth-bearer-audiences-wildcard" + + serviceName = "Payments" +) + +func HTTPModule() fx.Option { + return fx.Options( + fx.Invoke(func(m *mux.Router, lc fx.Lifecycle) { + lc.Append(fx.Hook{ + OnStart: func(ctx context.Context) error { + go func() { + //nolint:gomnd // allow timeout values + srv := &http.Server{ + Handler: m, + Addr: "0.0.0.0:8080", + WriteTimeout: 15 * time.Second, + ReadTimeout: 15 * time.Second, + } + + err := srv.ListenAndServe() + if err != nil { + panic(err) + } + }() + + return nil + }, + }) + }), + fx.Provide(fx.Annotate(httpRouter, fx.ParamTags(``, ``, `group:"connectorHandlers"`))), + addConnector[dummypay.Config, dummypay.TaskDescriptor](dummypay.NewLoader()), + addConnector[modulr.Config, modulr.TaskDescriptor](modulr.NewLoader()), + addConnector[stripe.Config, stripe.TaskDescriptor](stripe.NewLoader()), + addConnector[wise.Config, wise.TaskDescriptor](wise.NewLoader()), + ) +} + +func httpRouter(db *mongo.Database, client *mongo.Client, handlers []connectorHandler) (*mux.Router, error) { + rootMux := mux.NewRouter() + + if viper.GetBool(otelTracesFlag) { + rootMux.Use(otelmux.Middleware(serviceName)) + } + + rootMux.Use(recoveryHandler(httpRecoveryFunc)) + rootMux.Use(httpCorsHandler()) + rootMux.Use(httpServeFunc) + + rootMux.Path("/_health").Handler(healthHandler(client)) + rootMux.Path("/_live").Handler(liveHandler()) + + authGroup := rootMux.Name("authenticated").Subrouter() + + if methods := sharedAuthMethods(); len(methods) > 0 { + authGroup.Use(sharedauth.Middleware(methods...)) + } + + connectorGroup := authGroup.PathPrefix("/connectors").Subrouter() + + for _, h := range handlers { + connectorGroup.PathPrefix("/" + h.Name).Handler( + http.StripPrefix("/connectors", h.Handler), + ) + } + + authGroup.PathPrefix("/").Handler(paymentsRouter(db)) + + return rootMux, nil +} + +func httpRecoveryFunc(ctx context.Context, e interface{}) { + if viper.GetBool(otelTracesFlag) { + sharedotlp.RecordAsError(ctx, e) + } else { + logrus.Errorln(e) + debug.PrintStack() + } +} + +func httpCorsHandler() func(http.Handler) http.Handler { + return cors.New(cors.Options{ + AllowedOrigins: []string{"*"}, + AllowedMethods: []string{http.MethodGet, http.MethodPost, http.MethodPut}, + AllowCredentials: true, + }).Handler +} + +func httpServeFunc(handler http.Handler) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + handler.ServeHTTP(w, r) + }) +} + +func sharedAuthMethods() []sharedauth.Method { + methods := make([]sharedauth.Method, 0) + + if viper.GetBool(authBasicEnabledFlag) { + credentials := sharedauth.Credentials{} + + for _, kv := range viper.GetStringSlice(authBasicCredentialsFlag) { + parts := strings.SplitN(kv, ":", 2) + credentials[parts[0]] = sharedauth.Credential{ + Password: parts[1], + } + } + + methods = append(methods, sharedauth.NewHTTPBasicMethod(credentials)) + } + + if viper.GetBool(authBearerEnabledFlag) { + methods = append(methods, sharedauth.NewHttpBearerMethod( + sharedauth.NewIntrospectionValidator( + oauth2introspect.NewIntrospecter(viper.GetString(authBearerIntrospectURLFlag)), + viper.GetBool(authBearerAudiencesWildcardFlag), + sharedauth.AudienceIn(viper.GetStringSlice(authBearerAudienceFlag)...), + ), + )) + } + + return methods +} diff --git a/pkg/api/http.go b/internal/app/api/payments.go similarity index 78% rename from pkg/api/http.go rename to internal/app/api/payments.go index 55b13e1ca..4bfaa0bfe 100644 --- a/pkg/api/http.go +++ b/internal/app/api/payments.go @@ -5,11 +5,11 @@ import ( "net/http" "strings" + "github.com/numary/payments/internal/pkg/payments" + "github.com/gorilla/mux" "github.com/numary/go-libs/sharedapi" "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" - . "github.com/numary/payments/pkg/http" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -22,7 +22,7 @@ const ( func handleServerError(w http.ResponseWriter, r *http.Request, err error) { w.WriteHeader(http.StatusInternalServerError) sharedlogging.GetLogger(r.Context()).Error(err) - //TODO: Opentracing + // TODO: Opentracing err = json.NewEncoder(w).Encode(sharedapi.ErrorResponse{ ErrorCode: "INTERNAL", ErrorMessage: err.Error(), @@ -35,7 +35,7 @@ func handleServerError(w http.ResponseWriter, r *http.Request, err error) { func handleValidationError(w http.ResponseWriter, r *http.Request, err error) { w.WriteHeader(http.StatusBadRequest) sharedlogging.GetLogger(r.Context()).Error(err) - //TODO: Opentracing + // TODO: Opentracing err = json.NewEncoder(w).Encode(sharedapi.ErrorResponse{ ErrorCode: "VALIDATION", ErrorMessage: err.Error(), @@ -45,15 +45,17 @@ func handleValidationError(w http.ResponseWriter, r *http.Request, err error) { } } -func ListPaymentsHandler(db *mongo.Database) http.HandlerFunc { +func listPaymentsHandler(db *mongo.Database) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { - pipeline := make([]map[string]any, 0) + if sortParams := r.URL.Query()["sort"]; sortParams != nil { sort := bson.M{} + for _, s := range sortParams { parts := strings.SplitN(s, ":", 2) desc := false + if len(parts) > 1 { switch parts[1] { case "asc", "ASC": @@ -61,9 +63,11 @@ func ListPaymentsHandler(db *mongo.Database) http.HandlerFunc { desc = true default: handleValidationError(w, r, errors.New("sort order not well specified, got "+parts[1])) + return } } + key := parts[0] if key == "id" { key = "_id" @@ -73,29 +77,38 @@ func ListPaymentsHandler(db *mongo.Database) http.HandlerFunc { if desc { return -1 } + return 1 }() } + pipeline = append(pipeline, map[string]any{"$sort": sort}) } - skip, err := IntegerWithDefault(r, "skip", 0) + + skip, err := integerWithDefault(r, "skip", 0) if err != nil { handleValidationError(w, r, err) + return } + if skip != 0 { pipeline = append(pipeline, map[string]any{ "$skip": skip, }) } - limit, err := IntegerWithDefault(r, "limit", maxPerPage) + + limit, err := integerWithDefault(r, "limit", maxPerPage) if err != nil { handleValidationError(w, r, err) + return } + if limit > maxPerPage { limit = maxPerPage } + if limit != 0 { pipeline = append(pipeline, map[string]any{ "$limit": limit, @@ -105,14 +118,18 @@ func ListPaymentsHandler(db *mongo.Database) http.HandlerFunc { cursor, err := db.Collection(payments.Collection).Aggregate(r.Context(), pipeline) if err != nil { handleServerError(w, r, err) + return } + defer cursor.Close(r.Context()) ret := make([]payments.Payment, 0) + err = cursor.All(r.Context(), &ret) if err != nil { handleServerError(w, r, err) + return } @@ -121,51 +138,57 @@ func ListPaymentsHandler(db *mongo.Database) http.HandlerFunc { }) if err != nil { handleServerError(w, r, err) + return } } } -func ReadPaymentHandler(db *mongo.Database) http.HandlerFunc { +func readPaymentHandler(db *mongo.Database) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + paymentID := mux.Vars(r)["paymentID"] - paymentId := mux.Vars(r)["paymentId"] - - identifier, err := payments.IdentifierFromString(paymentId) + identifier, err := payments.IdentifierFromString(paymentID) if err != nil { w.WriteHeader(http.StatusNotFound) + return } ret := db.Collection(payments.Collection).FindOne(r.Context(), identifier) if ret.Err() != nil { - if ret.Err() == mongo.ErrNoDocuments { + if errors.Is(ret.Err(), mongo.ErrNoDocuments) { w.WriteHeader(http.StatusNotFound) + return } + handleServerError(w, r, ret.Err()) + return } - ob := &payments.Payment{} - err = ret.Decode(ob) + + payment := &payments.Payment{} + + err = ret.Decode(payment) if err != nil { handleServerError(w, r, err) + return } err = json.NewEncoder(w).Encode(sharedapi.BaseResponse[payments.Payment]{ - Data: ob, + Data: payment, }) if err != nil { handleServerError(w, r, err) + return } } } -func PaymentsRouter( - db *mongo.Database, - useScopes bool) *mux.Router { +func paymentsRouter(db *mongo.Database) *mux.Router { router := mux.NewRouter() router.Use(func(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { @@ -173,8 +196,8 @@ func PaymentsRouter( h.ServeHTTP(w, r) }) }) - router.Path("/payments").Methods(http.MethodGet).Handler(WrapHandler(useScopes, ListPaymentsHandler(db), ScopeReadPayments, ScopeWritePayments)) - router.Path("/payments/{paymentId}").Methods(http.MethodGet).Handler(WrapHandler(useScopes, ReadPaymentHandler(db), ScopeReadPayments, ScopeWritePayments)) + router.Path("/payments").Methods(http.MethodGet).Handler(listPaymentsHandler(db)) + router.Path("/payments/{paymentID}").Methods(http.MethodGet).Handler(readPaymentHandler(db)) return router } diff --git a/internal/app/api/queryparam.go b/internal/app/api/queryparam.go new file mode 100644 index 000000000..da1b8ac4f --- /dev/null +++ b/internal/app/api/queryparam.go @@ -0,0 +1,32 @@ +package api + +import ( + "net/http" + "strconv" +) + +func integer(r *http.Request, key string) (int64, bool, error) { + if value := r.URL.Query().Get(key); value != "" { + ret, err := strconv.ParseInt(value, 10, 64) + if err != nil { + return 0, false, err + } + + return ret, true, nil + } + + return 0, false, nil +} + +func integerWithDefault(r *http.Request, key string, def int64) (int64, error) { + value, ok, err := integer(r, key) + if err != nil { + return 0, err + } + + if ok { + return value, nil + } + + return def, nil +} diff --git a/pkg/http/recovery.go b/internal/app/api/recovery.go similarity index 73% rename from pkg/http/recovery.go rename to internal/app/api/recovery.go index b1cfd02c3..fce3087e1 100644 --- a/pkg/http/recovery.go +++ b/internal/app/api/recovery.go @@ -1,11 +1,11 @@ -package http +package api import ( "context" "net/http" ) -func Recovery(reporter func(ctx context.Context, e interface{})) func(h http.Handler) http.Handler { +func recoveryHandler(reporter func(ctx context.Context, e interface{})) func(h http.Handler) http.Handler { return func(h http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { defer func() { diff --git a/pkg/database/indexes.go b/internal/app/database/indexes.go similarity index 55% rename from pkg/database/indexes.go rename to internal/app/database/indexes.go index 82836d019..4c2764e00 100644 --- a/pkg/database/indexes.go +++ b/internal/app/database/indexes.go @@ -2,83 +2,92 @@ package database import ( "context" - "fmt" + "log" "reflect" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/internal/pkg/payments" + "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/x/bsonx" ) -var indexes = map[string][]mongo.IndexModel{ - payments.Collection: { - { - Keys: bsonx.Doc{ - bsonx.Elem{ - Key: "provider", - Value: bsonx.Int32(1), - }, - bsonx.Elem{ - Key: "reference", - Value: bsonx.Int32(1), - }, - bsonx.Elem{ - Key: "type", - Value: bsonx.Int32(1), +func indexes() map[string][]mongo.IndexModel { + return map[string][]mongo.IndexModel{ + payments.Collection: { + { + Keys: bsonx.Doc{ + bsonx.Elem{ + Key: "provider", + Value: bsonx.Int32(1), + }, + bsonx.Elem{ + Key: "reference", + Value: bsonx.Int32(1), + }, + bsonx.Elem{ + Key: "type", + Value: bsonx.Int32(1), + }, }, + Options: options.Index().SetUnique(true).SetName("identifier"), }, - Options: options.Index().SetUnique(true).SetName("identifier"), - }, - { - Keys: bsonx.Doc{ - bsonx.Elem{ - Key: "provider", - Value: bsonx.Int32(1), + { + Keys: bsonx.Doc{ + bsonx.Elem{ + Key: "provider", + Value: bsonx.Int32(1), + }, }, + Options: options.Index().SetName("provider"), }, - Options: options.Index().SetName("provider"), - }, - { - Keys: bsonx.Doc{ - bsonx.Elem{ - Key: "type", - Value: bsonx.Int32(1), + { + Keys: bsonx.Doc{ + bsonx.Elem{ + Key: "type", + Value: bsonx.Int32(1), + }, }, + Options: options.Index().SetName("payment-type"), }, - Options: options.Index().SetName("payment-type"), - }, - { - Keys: bsonx.Doc{ - bsonx.Elem{ - Key: "reference", - Value: bsonx.Int32(1), + { + Keys: bsonx.Doc{ + bsonx.Elem{ + Key: "reference", + Value: bsonx.Int32(1), + }, }, + Options: options.Index().SetName("payment-reference"), }, - Options: options.Index().SetName("payment-reference"), }, - }, + } } -func CreateIndexes(ctx context.Context, db *mongo.Database) error { - - type StoredIndex struct { +// TODO: Refactor +// +//nolint:gocyclo,cyclop // allow for now +func createIndexes(ctx context.Context, db *mongo.Database) error { + type storedIndex struct { Unique bool `bson:"unique"` - Key bsonx.Doc `bson:"key"` - Name string `bson:"name"` - PartialFilterExpression interface{} `bson:"partialFilterExpression"` ExpireAfterSeconds int32 `bson:"expireAfterSeconds"` + Name string `bson:"name"` + Key bsonx.Doc `bson:"key"` Collation *options.Collation `bson:"collation"` + PartialFilterExpression interface{} `bson:"partialFilterExpression"` } - for entity, indexes := range indexes { + const id = "_id_" + + for entity, indexes := range indexes() { c := db.Collection(entity) + listCursor, err := c.Indexes().List(ctx) if err != nil { return err } - storedIndexes := make([]StoredIndex, 0) + storedIndexes := make([]storedIndex, 0) + err = listCursor.All(ctx, &storedIndexes) if err != nil { return err @@ -86,53 +95,66 @@ func CreateIndexes(ctx context.Context, db *mongo.Database) error { l: for _, storedIndex := range storedIndexes { - if storedIndex.Name == "_id_" { + if storedIndex.Name == id { continue l } + for _, index := range indexes { if *index.Options.Name != storedIndex.Name { continue } + var modified bool if !reflect.DeepEqual(index.Keys, storedIndex.Key) { - fmt.Printf("Keys of index %s of collection %s modified\r\n", *index.Options.Name, entity) + log.Printf("Keys of index %s of collection %s modified\r\n", *index.Options.Name, entity) modified = true } + if (index.Options.PartialFilterExpression == nil && storedIndex.PartialFilterExpression != nil) || (index.Options.PartialFilterExpression != nil && storedIndex.PartialFilterExpression == nil) || !reflect.DeepEqual(index.Options.PartialFilterExpression, storedIndex.PartialFilterExpression) { - fmt.Printf("PartialFilterExpression of index %s of collection %s modified\r\n", *index.Options.Name, entity) + log.Printf("PartialFilterExpression of index %s of collection %s modified\r\n", *index.Options.Name, entity) modified = true } - if (index.Options.Unique == nil && storedIndex.Unique) || (index.Options.Unique != nil && *index.Options.Unique != storedIndex.Unique) { - fmt.Printf("Uniqueness of index %s of collection %s modified\r\n", *index.Options.Name, entity) + + if (index.Options.Unique == nil && storedIndex.Unique) || + (index.Options.Unique != nil && *index.Options.Unique != storedIndex.Unique) { + log.Printf("Uniqueness of index %s of collection %s modified\r\n", *index.Options.Name, entity) modified = true } - if (index.Options.ExpireAfterSeconds == nil && storedIndex.ExpireAfterSeconds > 0) || (index.Options.ExpireAfterSeconds != nil && *index.Options.ExpireAfterSeconds != storedIndex.ExpireAfterSeconds) { - fmt.Printf("ExpireAfterSeconds of index %s of collection %s modified\r\n", *index.Options.Name, entity) + + if (index.Options.ExpireAfterSeconds == nil && storedIndex.ExpireAfterSeconds > 0) || + (index.Options.ExpireAfterSeconds != nil && *index.Options.ExpireAfterSeconds != storedIndex.ExpireAfterSeconds) { + log.Printf("ExpireAfterSeconds of index %s of collection %s modified\r\n", *index.Options.Name, entity) modified = true } + if (index.Options.Collation == nil && storedIndex.Collation != nil) || (index.Options.Collation != nil && storedIndex.Collation == nil) || !reflect.DeepEqual(index.Options.Collation, storedIndex.Collation) { - fmt.Printf("Collation of index %s of collection %s modified\r\n", *index.Options.Name, entity) + log.Printf("Collation of index %s of collection %s modified\r\n", *index.Options.Name, entity) modified = true } + if !modified { - fmt.Printf("Index %s of collection %s not modified\r\n", *index.Options.Name, entity) + log.Printf("Index %s of collection %s not modified\r\n", *index.Options.Name, entity) + continue l } - fmt.Printf("Recreate index %s on collection %s\r\n", *index.Options.Name, entity) + log.Printf("Recreate index %s on collection %s\r\n", *index.Options.Name, entity) + _, err = c.Indexes().DropOne(ctx, storedIndex.Name) if err != nil { - fmt.Printf("Unable to drop index %s of collection %s: %s\r\n", *index.Options.Name, entity, err) + log.Printf("Unable to drop index %s of collection %s: %s\r\n", *index.Options.Name, entity, err) + continue l } _, err = c.Indexes().CreateOne(ctx, index) if err != nil { - fmt.Printf("Unable to create index %s of collection %s: %s\r\n", *index.Options.Name, entity, err) + log.Printf("Unable to create index %s of collection %s: %s\r\n", *index.Options.Name, entity, err) + continue l } } @@ -141,18 +163,21 @@ func CreateIndexes(ctx context.Context, db *mongo.Database) error { // Check for deleted index l3: for _, storedIndex := range storedIndexes { - if storedIndex.Name == "_id_" { + if storedIndex.Name == id { continue l3 } + for _, index := range indexes { if *index.Options.Name == storedIndex.Name { continue l3 } } - fmt.Printf("Detected deleted index %s on collection %s\r\n", storedIndex.Name, entity) + + log.Printf("Detected deleted index %s on collection %s\r\n", storedIndex.Name, entity) + _, err = c.Indexes().DropOne(ctx, storedIndex.Name) if err != nil { - fmt.Printf("Unable to drop index %s of collection %s: %s\r\n", storedIndex.Name, entity, err) + log.Printf("Unable to drop index %s of collection %s: %s\r\n", storedIndex.Name, entity, err) } } @@ -164,13 +189,15 @@ func CreateIndexes(ctx context.Context, db *mongo.Database) error { continue l2 } } - fmt.Printf("Create new index %s on collection %s\r\n", *index.Options.Name, entity) + + log.Printf("Create new index %s on collection %s\r\n", *index.Options.Name, entity) + _, err = c.Indexes().CreateOne(ctx, index) if err != nil { - fmt.Printf("Unable to create index %s of collection %s: %s\r\n", *index.Options.Name, entity, err) + log.Printf("Unable to create index %s of collection %s: %s\r\n", *index.Options.Name, entity, err) } } - } + return nil } diff --git a/pkg/database/module.go b/internal/app/database/module.go similarity index 83% rename from pkg/database/module.go rename to internal/app/database/module.go index 40a207883..e82f211c3 100644 --- a/pkg/database/module.go +++ b/internal/app/database/module.go @@ -12,23 +12,24 @@ import ( "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" "go.mongodb.org/mongo-driver/mongo/readpref" - "go.opentelemetry.io/contrib/instrumentation/go.mongodb.org/mongo-driver/mongo/otelmongo" "go.uber.org/fx" ) func MongoModule(uri string, dbName string) fx.Option { - return fx.Options( fx.Provide(func() *options.ClientOptions { tM := reflect.TypeOf(bson.M{}) reg := bson.NewRegistryBuilder().RegisterTypeMapEntry(bsontype.EmbeddedDocument, tM).Build() + return options.Client(). SetRegistry(reg). ApplyURI(uri) }), + fx.Provide(func(opts *options.ClientOptions) (*mongo.Client, error) { return mongo.NewClient(opts) }), + fx.Provide(func(client *mongo.Client) *mongo.Database { return client.Database(dbName) }), @@ -39,7 +40,9 @@ func MongoModule(uri string, dbName string) fx.Option { if err != nil { return err } + sharedlogging.Debug("Ping database...") + ctx, cancel := context.WithDeadline(ctx, time.Now().Add(time.Second*5)) defer cancel() @@ -48,20 +51,14 @@ func MongoModule(uri string, dbName string) fx.Option { return err } - err = CreateIndexes(ctx, db) + err = createIndexes(ctx, db) if err != nil { return errors.Wrap(err, "creating indices") } + return nil }, }) }), ) } - -func MongoMonitor() fx.Option { - return fx.Decorate(func(opts *options.ClientOptions) *options.ClientOptions { - opts.SetMonitor(otelmongo.NewMonitor()) - return opts - }) -} diff --git a/pkg/bridge/connectors/dummypay/config.go b/internal/pkg/connectors/dummypay/config.go similarity index 95% rename from pkg/bridge/connectors/dummypay/config.go rename to internal/pkg/connectors/dummypay/config.go index c7a09fe87..22c1d63b6 100644 --- a/pkg/bridge/connectors/dummypay/config.go +++ b/internal/pkg/connectors/dummypay/config.go @@ -13,7 +13,7 @@ type Config struct { // FilePollingPeriod is the period between file polling. FilePollingPeriod time.Duration `json:"filePollingPeriod" yaml:"filePollingPeriod" bson:"filePollingPeriod"` - // FileGenerationPeriod is the period between file generation. + // FileGenerationPeriod is the period between file generation FileGenerationPeriod time.Duration `json:"fileGenerationPeriod" yaml:"fileGenerationPeriod" bson:"fileGenerationPeriod"` } diff --git a/pkg/bridge/connectors/dummypay/config_test.go b/internal/pkg/connectors/dummypay/config_test.go similarity index 100% rename from pkg/bridge/connectors/dummypay/config_test.go rename to internal/pkg/connectors/dummypay/config_test.go diff --git a/pkg/bridge/connectors/dummypay/connector.go b/internal/pkg/connectors/dummypay/connector.go similarity index 90% rename from pkg/bridge/connectors/dummypay/connector.go rename to internal/pkg/connectors/dummypay/connector.go index af0ea6786..46006d686 100644 --- a/pkg/bridge/connectors/dummypay/connector.go +++ b/internal/pkg/connectors/dummypay/connector.go @@ -4,8 +4,9 @@ import ( "context" "fmt" + "github.com/numary/payments/internal/pkg/task" + "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/task" ) // connectorName is the name of the connector. @@ -52,8 +53,7 @@ func (c *Connector) Resolve(descriptor TaskDescriptor) task.Task { return handleResolve(c.cfg, descriptor, c.fs) } -// NewConnector creates a new dummy payment connector. -func NewConnector(logger sharedlogging.Logger, cfg Config, fs fs) *Connector { +func newConnector(logger sharedlogging.Logger, cfg Config, fs fs) *Connector { return &Connector{ logger: logger.WithFields(map[string]any{ "component": "connector", diff --git a/pkg/bridge/connectors/dummypay/connector_test.go b/internal/pkg/connectors/dummypay/connector_test.go similarity index 68% rename from pkg/bridge/connectors/dummypay/connector_test.go rename to internal/pkg/connectors/dummypay/connector_test.go index 6372ebf16..96c7861b1 100644 --- a/pkg/bridge/connectors/dummypay/connector_test.go +++ b/internal/pkg/connectors/dummypay/connector_test.go @@ -5,13 +5,14 @@ import ( "reflect" "testing" + "github.com/numary/payments/internal/pkg/payments" + task3 "github.com/numary/payments/internal/pkg/task" + "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" - "github.com/numary/payments/pkg/bridge/task" "github.com/stretchr/testify/assert" ) -// Create a minimal mock for connector installation +// Create a minimal mock for connector installation. type ( mockConnectorContext[TaskDescriptor payments.TaskDescriptor] struct { ctx context.Context @@ -23,11 +24,11 @@ func (mcc *mockConnectorContext[TaskDescriptor]) Context() context.Context { return mcc.ctx } -func (s mockScheduler[TaskDescriptor]) Schedule(p TaskDescriptor, restart bool) error { +func (mcc mockScheduler[TaskDescriptor]) Schedule(p TaskDescriptor, restart bool) error { return nil } -func (mcc *mockConnectorContext[TaskDescriptor]) Scheduler() task.Scheduler[TaskDescriptor] { +func (mcc *mockConnectorContext[TaskDescriptor]) Scheduler() task3.Scheduler[TaskDescriptor] { return mockScheduler[TaskDescriptor]{} } @@ -37,20 +38,20 @@ func TestConnector(t *testing.T) { config := Config{} logger := sharedlogging.GetLogger(context.Background()) - fs := newTestFS() + fileSystem := newTestFS() - connector := NewConnector(logger, config, fs) + connector := newConnector(logger, config, fileSystem) err := connector.Install(new(mockConnectorContext[TaskDescriptor])) assert.NoErrorf(t, err, "Install() failed") testCases := []struct { key taskKey - task task.Task + task task3.Task }{ - {taskKeyReadFiles, taskReadFiles(config, fs)}, - {taskKeyGenerateFiles, taskGenerateFiles(config, fs)}, - {taskKeyIngest, taskIngest(config, TaskDescriptor{}, fs)}, + {taskKeyReadFiles, taskReadFiles(config, fileSystem)}, + {taskKeyGenerateFiles, taskGenerateFiles(config, fileSystem)}, + {taskKeyIngest, taskIngest(config, TaskDescriptor{}, fileSystem)}, } for _, testCase := range testCases { diff --git a/pkg/bridge/connectors/dummypay/errors.go b/internal/pkg/connectors/dummypay/errors.go similarity index 100% rename from pkg/bridge/connectors/dummypay/errors.go rename to internal/pkg/connectors/dummypay/errors.go diff --git a/pkg/bridge/connectors/dummypay/fs.go b/internal/pkg/connectors/dummypay/fs.go similarity index 100% rename from pkg/bridge/connectors/dummypay/fs.go rename to internal/pkg/connectors/dummypay/fs.go diff --git a/pkg/bridge/connectors/dummypay/fs_test.go b/internal/pkg/connectors/dummypay/fs_test.go similarity index 100% rename from pkg/bridge/connectors/dummypay/fs_test.go rename to internal/pkg/connectors/dummypay/fs_test.go diff --git a/pkg/bridge/connectors/dummypay/loader.go b/internal/pkg/connectors/dummypay/loader.go similarity index 69% rename from pkg/bridge/connectors/dummypay/loader.go rename to internal/pkg/connectors/dummypay/loader.go index ef593ef28..41f9885bb 100644 --- a/pkg/bridge/connectors/dummypay/loader.go +++ b/internal/pkg/connectors/dummypay/loader.go @@ -3,19 +3,20 @@ package dummypay import ( "time" + "github.com/numary/payments/internal/pkg/integration" + "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/integration" ) -type loader struct{} +type Loader struct{} // Name returns the name of the connector. -func (l *loader) Name() string { +func (l *Loader) Name() string { return connectorName } // AllowTasks returns the amount of tasks that are allowed to be scheduled. -func (l *loader) AllowTasks() int { +func (l *Loader) AllowTasks() int { return 10 } @@ -28,7 +29,7 @@ const ( ) // ApplyDefaults applies default values to the configuration. -func (l *loader) ApplyDefaults(cfg Config) Config { +func (l *Loader) ApplyDefaults(cfg Config) Config { if cfg.FileGenerationPeriod == 0 { cfg.FileGenerationPeriod = defaultFileGenerationPeriod } @@ -41,11 +42,11 @@ func (l *loader) ApplyDefaults(cfg Config) Config { } // Load returns the connector. -func (l *loader) Load(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] { - return NewConnector(logger, config, newFS()) +func (l *Loader) Load(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] { + return newConnector(logger, config, newFS()) } // NewLoader creates a new loader. -func NewLoader() integration.Loader[Config, TaskDescriptor] { - return &loader{} +func NewLoader() *Loader { + return &Loader{} } diff --git a/pkg/bridge/connectors/dummypay/loader_test.go b/internal/pkg/connectors/dummypay/loader_test.go similarity index 90% rename from pkg/bridge/connectors/dummypay/loader_test.go rename to internal/pkg/connectors/dummypay/loader_test.go index 54a9293f5..fdd7b8384 100644 --- a/pkg/bridge/connectors/dummypay/loader_test.go +++ b/internal/pkg/connectors/dummypay/loader_test.go @@ -25,5 +25,5 @@ func TestLoader(t *testing.T) { FileGenerationPeriod: 5 * time.Second, }, loader.ApplyDefaults(config)) - assert.EqualValues(t, NewConnector(logger, config, newFS()), loader.Load(logger, config)) + assert.EqualValues(t, newConnector(logger, config, newFS()), loader.Load(logger, config)) } diff --git a/pkg/bridge/connectors/dummypay/payment.go b/internal/pkg/connectors/dummypay/payment.go similarity index 78% rename from pkg/bridge/connectors/dummypay/payment.go rename to internal/pkg/connectors/dummypay/payment.go index c1b06349b..c0e5381ef 100644 --- a/pkg/bridge/connectors/dummypay/payment.go +++ b/internal/pkg/connectors/dummypay/payment.go @@ -1,6 +1,8 @@ package dummypay -import payments "github.com/numary/payments/pkg" +import ( + "github.com/numary/payments/internal/pkg/payments" +) // payment represents a payment structure used in the generated files. type payment struct { diff --git a/pkg/bridge/connectors/dummypay/remove_files.go b/internal/pkg/connectors/dummypay/remove_files.go similarity index 100% rename from pkg/bridge/connectors/dummypay/remove_files.go rename to internal/pkg/connectors/dummypay/remove_files.go diff --git a/pkg/bridge/connectors/dummypay/task_descriptor.go b/internal/pkg/connectors/dummypay/task_descriptor.go similarity index 93% rename from pkg/bridge/connectors/dummypay/task_descriptor.go rename to internal/pkg/connectors/dummypay/task_descriptor.go index fd8340fc5..b38c8075f 100644 --- a/pkg/bridge/connectors/dummypay/task_descriptor.go +++ b/internal/pkg/connectors/dummypay/task_descriptor.go @@ -3,7 +3,7 @@ package dummypay import ( "fmt" - "github.com/numary/payments/pkg/bridge/task" + "github.com/numary/payments/internal/pkg/task" ) // taskKey defines a unique key of the task. diff --git a/pkg/bridge/connectors/dummypay/task_generate_file.go b/internal/pkg/connectors/dummypay/task_generate_file.go similarity index 85% rename from pkg/bridge/connectors/dummypay/task_generate_file.go rename to internal/pkg/connectors/dummypay/task_generate_file.go index f709cd091..edb463c5b 100644 --- a/pkg/bridge/connectors/dummypay/task_generate_file.go +++ b/internal/pkg/connectors/dummypay/task_generate_file.go @@ -7,8 +7,8 @@ import ( "math/rand" "time" - payments "github.com/numary/payments/pkg" - "github.com/numary/payments/pkg/bridge/task" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/payments/internal/pkg/task" ) const ( @@ -87,6 +87,7 @@ const nMax = 10000 func generateRandomNumber() int { rand.Seed(time.Now().UnixNano()) + //nolint:gosec // allow weak random number generator as it is not used for security value := rand.Intn(nMax) return value @@ -110,14 +111,14 @@ func generateRandomStatus() payments.Status { // ~50% chance. paymentStatus := payments.StatusSucceeded - n := generateRandomNumber() + num := generateRandomNumber() switch { - case n < nMax/4: // 25% chance + case num < nMax/4: // 25% chance paymentStatus = payments.StatusPending - case n < nMax/3: // ~9% chance + case num < nMax/3: // ~9% chance paymentStatus = payments.StatusFailed - case n < nMax/2: // ~16% chance + case num < nMax/2: // ~16% chance paymentStatus = payments.StatusCancelled } @@ -126,7 +127,7 @@ func generateRandomStatus() payments.Status { // generateRandomScheme generates a random payment scheme. func generateRandomScheme() payments.Scheme { - n := generateRandomNumber() / 1000 + num := generateRandomNumber() / 1000 //nolint:gomnd // allow for random number paymentScheme := payments.SchemeCardMasterCard @@ -143,13 +144,13 @@ func generateRandomScheme() payments.Scheme { payments.SchemeApplePay, payments.SchemeGooglePay, payments.SchemeA2A, - payments.SchemeAchDebit, - payments.SchemeAch, - payments.SchemeRtp, + payments.SchemeACHDebit, + payments.SchemeACH, + payments.SchemeRTP, } - if n < len(availableSchemes) { - paymentScheme = availableSchemes[n] + if num < len(availableSchemes) { + paymentScheme = availableSchemes[num] } return paymentScheme diff --git a/pkg/bridge/connectors/dummypay/task_ingest.go b/internal/pkg/connectors/dummypay/task_ingest.go similarity index 91% rename from pkg/bridge/connectors/dummypay/task_ingest.go rename to internal/pkg/connectors/dummypay/task_ingest.go index 95ba06977..2755e95f7 100644 --- a/pkg/bridge/connectors/dummypay/task_ingest.go +++ b/internal/pkg/connectors/dummypay/task_ingest.go @@ -6,9 +6,9 @@ import ( "fmt" "path/filepath" - payments "github.com/numary/payments/pkg" - "github.com/numary/payments/pkg/bridge/ingestion" - "github.com/numary/payments/pkg/bridge/task" + "github.com/numary/payments/internal/pkg/ingestion" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/payments/internal/pkg/task" ) const taskKeyIngest = "ingest" diff --git a/pkg/bridge/connectors/dummypay/task_read_files.go b/internal/pkg/connectors/dummypay/task_read_files.go similarity index 90% rename from pkg/bridge/connectors/dummypay/task_read_files.go rename to internal/pkg/connectors/dummypay/task_read_files.go index b96b7348f..1d6c6ec5d 100644 --- a/pkg/bridge/connectors/dummypay/task_read_files.go +++ b/internal/pkg/connectors/dummypay/task_read_files.go @@ -6,8 +6,9 @@ import ( "strings" "time" + "github.com/numary/payments/internal/pkg/task" + "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/task" "github.com/spf13/afero" ) @@ -24,7 +25,8 @@ func newTaskReadFiles() TaskDescriptor { // Only reads files with the generatedFilePrefix in their name. func taskReadFiles(config Config, fs fs) task.Task { return func(ctx context.Context, logger sharedlogging.Logger, - scheduler task.Scheduler[TaskDescriptor]) error { + scheduler task.Scheduler[TaskDescriptor], + ) error { for { select { case <-ctx.Done(): @@ -53,7 +55,7 @@ func parseFilesToIngest(config Config, fs fs) ([]string, error) { return nil, fmt.Errorf("error reading directory '%s': %w", config.Directory, err) } - var files []string + var files []string //nolint:prealloc // length is unknown // iterate over all files in the directory. for _, file := range dir { @@ -63,7 +65,6 @@ func parseFilesToIngest(config Config, fs fs) ([]string, error) { } files = append(files, file.Name()) - } return files, nil diff --git a/pkg/bridge/connectors/dummypay/task_test.go b/internal/pkg/connectors/dummypay/task_test.go similarity index 100% rename from pkg/bridge/connectors/dummypay/task_test.go rename to internal/pkg/connectors/dummypay/task_test.go diff --git a/pkg/bridge/connectors/modulr/client/accounts.go b/internal/pkg/connectors/modulr/client/accounts.go similarity index 92% rename from pkg/bridge/connectors/modulr/client/accounts.go rename to internal/pkg/connectors/modulr/client/accounts.go index e581f36bc..194e250de 100644 --- a/pkg/bridge/connectors/modulr/client/accounts.go +++ b/internal/pkg/connectors/modulr/client/accounts.go @@ -6,13 +6,14 @@ import ( "net/http" ) +//nolint:tagliatelle // allow for clients type Account struct { ID string `json:"id"` Name string `json:"name"` Status string `json:"status"` Balance string `json:"balance"` Currency string `json:"currency"` - CustomerId string `json:"customerId"` + CustomerID string `json:"customerId"` Identifiers []struct { AccountNumber string `json:"accountNumber"` SortCode string `json:"sortCode"` diff --git a/pkg/bridge/connectors/modulr/client/client.go b/internal/pkg/connectors/modulr/client/client.go similarity index 87% rename from pkg/bridge/connectors/modulr/client/client.go rename to internal/pkg/connectors/modulr/client/client.go index d5f363d47..2ac3a6841 100644 --- a/pkg/bridge/connectors/modulr/client/client.go +++ b/internal/pkg/connectors/modulr/client/client.go @@ -4,7 +4,7 @@ import ( "fmt" "net/http" - "github.com/numary/payments/pkg/bridge/connectors/modulr/hmac" + "github.com/numary/payments/internal/pkg/connectors/modulr/hmac" ) type apiTransport struct { @@ -35,11 +35,11 @@ func (m *Client) buildEndpoint(path string, args ...interface{}) string { return fmt.Sprintf("%s/%s", m.endpoint, fmt.Sprintf(path, args...)) } -const sandboxApiEndpoint = "https://api-sandbox.modulrfinance.com/api-sandbox-token" +const sandboxAPIEndpoint = "https://api-sandbox.modulrfinance.com/api-sandbox-token" func NewClient(apiKey, apiSecret, endpoint string) (*Client, error) { if endpoint == "" { - endpoint = sandboxApiEndpoint + endpoint = sandboxAPIEndpoint } headers, err := hmac.GenerateHeaders(apiKey, apiSecret, "", false) diff --git a/pkg/bridge/connectors/modulr/client/transactions.go b/internal/pkg/connectors/modulr/client/transactions.go similarity index 86% rename from pkg/bridge/connectors/modulr/client/transactions.go rename to internal/pkg/connectors/modulr/client/transactions.go index 64fe49ffe..a9b90b3d6 100644 --- a/pkg/bridge/connectors/modulr/client/transactions.go +++ b/internal/pkg/connectors/modulr/client/transactions.go @@ -6,6 +6,7 @@ import ( "net/http" ) +//nolint:tagliatelle // allow different styled tags in client type Transaction struct { ID string `json:"id"` Type string `json:"type"` @@ -19,8 +20,8 @@ type Transaction struct { AdditionalInfo interface{} `json:"additionalInfo"` } -func (m *Client) GetTransactions(accountId string) ([]Transaction, error) { - resp, err := m.httpClient.Get(m.buildEndpoint("accounts/%s/transactions", accountId)) +func (m *Client) GetTransactions(accountID string) ([]Transaction, error) { + resp, err := m.httpClient.Get(m.buildEndpoint("accounts/%s/transactions", accountID)) if err != nil { return nil, err } @@ -37,5 +38,4 @@ func (m *Client) GetTransactions(accountId string) ([]Transaction, error) { } return res.Content, nil - } diff --git a/pkg/bridge/connectors/modulr/config.go b/internal/pkg/connectors/modulr/config.go similarity index 100% rename from pkg/bridge/connectors/modulr/config.go rename to internal/pkg/connectors/modulr/config.go diff --git a/internal/pkg/connectors/modulr/connector.go b/internal/pkg/connectors/modulr/connector.go new file mode 100644 index 000000000..da3da63be --- /dev/null +++ b/internal/pkg/connectors/modulr/connector.go @@ -0,0 +1,41 @@ +package modulr + +import ( + "context" + + "github.com/numary/go-libs/sharedlogging" + "github.com/numary/payments/internal/pkg/integration" + "github.com/numary/payments/internal/pkg/task" +) + +const connectorName = "modulr" + +type Connector struct { + logger sharedlogging.Logger + cfg Config +} + +func (c *Connector) Install(ctx task.ConnectorContext[TaskDescriptor]) error { + return ctx.Scheduler().Schedule(TaskDescriptor{ + Name: taskNameFetchAccounts, + }, false) +} + +func (c *Connector) Uninstall(ctx context.Context) error { + return nil +} + +func (c *Connector) Resolve(descriptor TaskDescriptor) task.Task { + return resolveTasks(c.logger, c.cfg)(descriptor) +} + +var _ integration.Connector[TaskDescriptor] = &Connector{} + +func newConnector(logger sharedlogging.Logger, cfg Config) *Connector { + return &Connector{ + logger: logger.WithFields(map[string]any{ + "component": "connector", + }), + cfg: cfg, + } +} diff --git a/pkg/bridge/connectors/modulr/errors.go b/internal/pkg/connectors/modulr/errors.go similarity index 100% rename from pkg/bridge/connectors/modulr/errors.go rename to internal/pkg/connectors/modulr/errors.go diff --git a/pkg/bridge/connectors/modulr/hmac/hmac.go b/internal/pkg/connectors/modulr/hmac/hmac.go similarity index 96% rename from pkg/bridge/connectors/modulr/hmac/hmac.go rename to internal/pkg/connectors/modulr/hmac/hmac.go index 8651a5fad..6bdfec3b5 100644 --- a/pkg/bridge/connectors/modulr/hmac/hmac.go +++ b/internal/pkg/connectors/modulr/hmac/hmac.go @@ -28,7 +28,8 @@ func GenerateHeaders(apiKey string, apiSecret string, nonce string, hasRetry boo } func constructHeadersMap(apiKey string, apiSecret string, nonce string, hasRetry bool, - timestamp time.Time) map[string]string { + timestamp time.Time, +) map[string]string { headers := make(map[string]string) date := timestamp.Format(time.RFC1123) nonce = generateNonceIfEmpty(nonce) diff --git a/pkg/bridge/connectors/modulr/hmac/hmac_test.go b/internal/pkg/connectors/modulr/hmac/hmac_test.go similarity index 94% rename from pkg/bridge/connectors/modulr/hmac/hmac_test.go rename to internal/pkg/connectors/modulr/hmac/hmac_test.go index f2d92fecf..456ae503a 100644 --- a/pkg/bridge/connectors/modulr/hmac/hmac_test.go +++ b/internal/pkg/connectors/modulr/hmac/hmac_test.go @@ -8,12 +8,16 @@ import ( ) func TestGenerateReturnsAnHMACString(t *testing.T) { + t.Parallel() + headers, _ := GenerateHeaders("api_key", "api_secret", "", false) expectedSignature := "Signature keyId=\"api_key\",algorithm=\"hmac-sha1\",headers=\"date x-mod-nonce\",signature=\"" assert.Equal(t, expectedSignature, headers["Authorization"][0:86], "generate should return the hmac headers") } func TestGenerateReturnsADateHeader(t *testing.T) { + t.Parallel() + timestamp := time.Date(2020, 1, 2, 15, 4, 5, 0, time.UTC) headers := constructHeadersMap("api_key", "api_secret", "", false, timestamp) @@ -24,32 +28,44 @@ func TestGenerateReturnsADateHeader(t *testing.T) { } func TestGenerateReturnsANonceHeaderWithExpectedValue(t *testing.T) { + t.Parallel() + nonce := "thisIsTheNonce" headers, _ := GenerateHeaders("api_key", "api_secret", nonce, false) assert.Equal(t, nonce, headers["x-mod-nonce"]) } func TestGenerateReturnsARetryHeaderWithTrueIfRetryIsExpected(t *testing.T) { + t.Parallel() + headers, _ := GenerateHeaders("api_key", "api_secret", "", true) assert.Equal(t, "true", headers["x-mod-retry"]) } func TestGenerateReturnsARetryHeaderWithFalseIfRetryIsNotExpected(t *testing.T) { + t.Parallel() + headers, _ := GenerateHeaders("api_key", "api_secret", "", false) assert.Equal(t, "false", headers["x-mod-retry"]) } func TestGenerateReturnsAGeneratedNonceHeaderIfNonceIsEmpty(t *testing.T) { + t.Parallel() + headers, _ := GenerateHeaders("api_key", "api_secret", "", false) assert.True(t, headers["x-mod-nonce"] != "", "x-mod-nonce header should have been populated") } func TestGenerateThrowsErrorIfApiKeyIsNull(t *testing.T) { + t.Parallel() + _, err := GenerateHeaders("", "api_secret", "", false) assert.ErrorIs(t, err, ErrInvalidCredentials) } func TestGenerateThrowsErrorIfApiSecretIsNull(t *testing.T) { + t.Parallel() + _, err := GenerateHeaders("api_key", "", "", false) assert.ErrorIs(t, err, ErrInvalidCredentials) } diff --git a/pkg/bridge/connectors/modulr/hmac/signature_generator.go b/internal/pkg/connectors/modulr/hmac/signature_generator.go similarity index 84% rename from pkg/bridge/connectors/modulr/hmac/signature_generator.go rename to internal/pkg/connectors/modulr/hmac/signature_generator.go index e5f5e735d..36e517a6d 100644 --- a/pkg/bridge/connectors/modulr/hmac/signature_generator.go +++ b/internal/pkg/connectors/modulr/hmac/signature_generator.go @@ -2,7 +2,7 @@ package hmac import ( "crypto/hmac" - "crypto/sha1" + "crypto/sha1" //nolint:gosec // we need sha1 for the hmac "encoding/base64" "net/url" ) @@ -15,11 +15,11 @@ const ( suffix = "\"" newline = "\n" nonceKey = "x-mod-nonce: " - keyIdPrefix = "Signature keyId=\"" + keyIDPrefix = "Signature keyId=\"" ) func buildSignature(apiKey string, apiSecret string, nonce string, date string) string { - keyID := keyIdPrefix + apiKey + "\"," + keyID := keyIDPrefix + apiKey + "\"," mac := hmac.New(sha1.New, []byte(apiSecret)) mac.Write([]byte(datePrefix + date + newline + nonceKey + nonce)) diff --git a/pkg/bridge/connectors/modulr/hmac/signature_test.go b/internal/pkg/connectors/modulr/hmac/signature_test.go similarity index 89% rename from pkg/bridge/connectors/modulr/hmac/signature_test.go rename to internal/pkg/connectors/modulr/hmac/signature_test.go index ceb3ea189..1670098fa 100644 --- a/pkg/bridge/connectors/modulr/hmac/signature_test.go +++ b/internal/pkg/connectors/modulr/hmac/signature_test.go @@ -9,13 +9,17 @@ import ( ) func TestGenerateReturnsSignatureWithKeyId(t *testing.T) { + t.Parallel() + signature := buildSignature("api_key", "api_secret", "", date()) expectedPrefix := "Signature keyId=\"api_key\"," - hasKeyId := strings.HasPrefix(signature, expectedPrefix) - assert.True(t, hasKeyId, "HMAC signature must contain the keyId") + hasKeyID := strings.HasPrefix(signature, expectedPrefix) + assert.True(t, hasKeyID, "HMAC signature must contain the keyId") } func TestGenerateReturnsSignatureWithAlgorithm(t *testing.T) { + t.Parallel() + signature := buildSignature("api_key", "api_secret", "", date()) expectedAlgorithm := "algorithm=\"hmac-sha1\"," actualValue := signature[26:48] @@ -23,6 +27,8 @@ func TestGenerateReturnsSignatureWithAlgorithm(t *testing.T) { } func TestGenerateReturnsSignatureWithHeaders(t *testing.T) { + t.Parallel() + signature := buildSignature("api_key", "api_secret", "", date()) expectedHeaders := "headers=\"date x-mod-nonce\"," actualValue := signature[48:75] @@ -30,6 +36,8 @@ func TestGenerateReturnsSignatureWithHeaders(t *testing.T) { } func TestGenerateReturnsSignatureWithSignatureValue(t *testing.T) { + t.Parallel() + signature := buildSignature("api_key", "api_secret", "", date()) expectedSignature := "signature=\"" actualValue := signature[75:86] @@ -37,12 +45,16 @@ func TestGenerateReturnsSignatureWithSignatureValue(t *testing.T) { } func TestGenerateReturnsHashedSignature(t *testing.T) { + t.Parallel() + signature := buildSignature("api_key", "api_secret", "", date()) actualValue := signature[86:117] assert.True(t, actualValue != "", "Encoded HMAC signature should be present") } func TestGenerateAcceptsANonce(t *testing.T) { + t.Parallel() + signature := buildSignature("api_key", "api_secret", "nonce", date()) actualValue := signature[86:116] expected := "9V8gi5Mp9MsL%2FO7mV6qZlBM9%2FR" @@ -51,5 +63,6 @@ func TestGenerateAcceptsANonce(t *testing.T) { func date() string { now, _ := time.Parse(time.RFC1123, "Mon, 02 Jan 2020 15:04:05 GMT") + return now.String() } diff --git a/internal/pkg/connectors/modulr/loader.go b/internal/pkg/connectors/modulr/loader.go new file mode 100644 index 000000000..abf4284cb --- /dev/null +++ b/internal/pkg/connectors/modulr/loader.go @@ -0,0 +1,31 @@ +package modulr + +import ( + "github.com/numary/go-libs/sharedlogging" + "github.com/numary/payments/internal/pkg/integration" +) + +type Loader struct{} + +const allowedTasks = 50 + +func (l *Loader) AllowTasks() int { + return allowedTasks +} + +func (l *Loader) Name() string { + return connectorName +} + +func (l *Loader) Load(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] { + return newConnector(logger, config) +} + +func (l *Loader) ApplyDefaults(cfg Config) Config { + return cfg +} + +// NewLoader creates a new loader. +func NewLoader() *Loader { + return &Loader{} +} diff --git a/pkg/bridge/connectors/modulr/task_fetch_accounts.go b/internal/pkg/connectors/modulr/task_fetch_accounts.go similarity index 85% rename from pkg/bridge/connectors/modulr/task_fetch_accounts.go rename to internal/pkg/connectors/modulr/task_fetch_accounts.go index 34af57cb4..fd982f1cc 100644 --- a/pkg/bridge/connectors/modulr/task_fetch_accounts.go +++ b/internal/pkg/connectors/modulr/task_fetch_accounts.go @@ -3,9 +3,10 @@ package modulr import ( "context" + "github.com/numary/payments/internal/pkg/connectors/modulr/client" + "github.com/numary/payments/internal/pkg/task" + "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/connectors/modulr/client" - "github.com/numary/payments/pkg/bridge/task" ) func taskFetchAccounts(logger sharedlogging.Logger, client *client.Client) task.Task { diff --git a/pkg/bridge/connectors/modulr/task_fetch_transactions.go b/internal/pkg/connectors/modulr/task_fetch_transactions.go similarity index 87% rename from pkg/bridge/connectors/modulr/task_fetch_transactions.go rename to internal/pkg/connectors/modulr/task_fetch_transactions.go index e87327b8b..c87d81ab2 100644 --- a/pkg/bridge/connectors/modulr/task_fetch_transactions.go +++ b/internal/pkg/connectors/modulr/task_fetch_transactions.go @@ -5,11 +5,12 @@ import ( "fmt" "strings" + "github.com/numary/payments/internal/pkg/connectors/modulr/client" + "github.com/numary/payments/internal/pkg/ingestion" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/payments/internal/pkg/task" + "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" - "github.com/numary/payments/pkg/bridge/connectors/modulr/client" - "github.com/numary/payments/pkg/bridge/ingestion" - "github.com/numary/payments/pkg/bridge/task" ) func taskFetchTransactions(logger sharedlogging.Logger, client *client.Client, accountID string) task.Task { diff --git a/pkg/bridge/connectors/modulr/task_resolve.go b/internal/pkg/connectors/modulr/task_resolve.go similarity index 91% rename from pkg/bridge/connectors/modulr/task_resolve.go rename to internal/pkg/connectors/modulr/task_resolve.go index 5d00c2243..5ac2edb2f 100644 --- a/pkg/bridge/connectors/modulr/task_resolve.go +++ b/internal/pkg/connectors/modulr/task_resolve.go @@ -3,9 +3,10 @@ package modulr import ( "fmt" + "github.com/numary/payments/internal/pkg/connectors/modulr/client" + "github.com/numary/payments/internal/pkg/task" + "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/connectors/modulr/client" - "github.com/numary/payments/pkg/bridge/task" ) const ( diff --git a/pkg/bridge/connectors/stripe/client.go b/internal/pkg/connectors/stripe/client.go similarity index 86% rename from pkg/bridge/connectors/stripe/client.go rename to internal/pkg/connectors/stripe/client.go index 43a5aa171..04a84745e 100644 --- a/pkg/bridge/connectors/stripe/client.go +++ b/internal/pkg/connectors/stripe/client.go @@ -6,7 +6,8 @@ import ( "fmt" "net/http" - "github.com/numary/payments/pkg/bridge/writeonly" + "github.com/numary/payments/internal/pkg/writeonly" + "github.com/pkg/errors" "github.com/stripe/stripe-go/v72" ) @@ -33,20 +34,23 @@ type Client interface { ForAccount(account string) Client } -type defaultClient struct { +type DefaultClient struct { httpClient *http.Client apiKey string stripeAccount string storage writeonly.Storage } -func (d *defaultClient) ForAccount(account string) Client { +func (d *DefaultClient) ForAccount(account string) Client { cp := *d cp.stripeAccount = account + return &cp } -func (d *defaultClient) BalanceTransactions(ctx context.Context, options ...ClientOption) ([]*stripe.BalanceTransaction, bool, error) { +func (d *DefaultClient) BalanceTransactions(ctx context.Context, + options ...ClientOption, +) ([]*stripe.BalanceTransaction, bool, error) { req, err := http.NewRequest(http.MethodGet, balanceTransactionsEndpoint, nil) if err != nil { return nil, false, errors.Wrap(err, "creating http request") @@ -57,13 +61,16 @@ func (d *defaultClient) BalanceTransactions(ctx context.Context, options ...Clie for _, opt := range options { opt.apply(req) } + if d.stripeAccount != "" { req.Header.Set("Stripe-Account", d.stripeAccount) } + req.Header.Set("Content-Type", "application/x-www-form-urlencoded") req.SetBasicAuth(d.apiKey, "") // gfyrag: really weird authentication right? var httpResponse *http.Response + httpResponse, err = d.httpClient.Do(req) if err != nil { return nil, false, errors.Wrap(err, "doing request") @@ -80,28 +87,34 @@ func (d *defaultClient) BalanceTransactions(ctx context.Context, options ...Clie } rsp := &listResponse{} + err = json.NewDecoder(httpResponse.Body).Decode(rsp) if err != nil { return nil, false, errors.Wrap(err, "decoding response") } asBalanceTransactions := make([]*stripe.BalanceTransaction, 0) + if len(rsp.Data) > 0 { asMaps := make([]any, 0) for _, data := range rsp.Data { asMap := make(map[string]interface{}) - err := json.Unmarshal(data, &asMap) + + err = json.Unmarshal(data, &asMap) if err != nil { return nil, false, err } + asMaps = append(asMaps, asMap) asBalanceTransaction := &stripe.BalanceTransaction{} + err = json.Unmarshal(data, &asBalanceTransaction) if err != nil { return nil, false, err } + asBalanceTransactions = append(asBalanceTransactions, asBalanceTransaction) } @@ -114,12 +127,12 @@ func (d *defaultClient) BalanceTransactions(ctx context.Context, options ...Clie return asBalanceTransactions, rsp.HasMore, nil } -func NewDefaultClient(httpClient *http.Client, apiKey string, storage writeonly.Storage) *defaultClient { - return &defaultClient{ +func NewDefaultClient(httpClient *http.Client, apiKey string, storage writeonly.Storage) *DefaultClient { + return &DefaultClient{ httpClient: httpClient, apiKey: apiKey, storage: storage, } } -var _ Client = &defaultClient{} +var _ Client = &DefaultClient{} diff --git a/pkg/bridge/connectors/stripe/client_test.go b/internal/pkg/connectors/stripe/client_test.go similarity index 64% rename from pkg/bridge/connectors/stripe/client_test.go rename to internal/pkg/connectors/stripe/client_test.go index 25d26fab6..5c19645f6 100644 --- a/pkg/bridge/connectors/stripe/client_test.go +++ b/internal/pkg/connectors/stripe/client_test.go @@ -24,37 +24,27 @@ type httpMock struct { mu sync.Mutex } -func NewHTTPMock(t *testing.T) (*httpMock, *http.Client) { - m := &httpMock{ - t: t, - expectations: []httpMockExpectation{}, - } - return m, &http.Client{ - Transport: m, - } -} - -func (m *httpMock) RoundTrip(request *http.Request) (*http.Response, error) { - m.mu.Lock() - defer m.mu.Unlock() +func (mock *httpMock) RoundTrip(request *http.Request) (*http.Response, error) { + mock.mu.Lock() + defer mock.mu.Unlock() - if len(m.expectations) == 0 { + if len(mock.expectations) == 0 { return nil, fmt.Errorf("no more expectations") } - e := m.expectations[0] - if len(m.expectations) == 1 { - m.expectations = make([]httpMockExpectation, 0) + expectations := mock.expectations[0] + if len(mock.expectations) == 1 { + mock.expectations = make([]httpMockExpectation, 0) } else { - m.expectations = m.expectations[1:] + mock.expectations = mock.expectations[1:] } - return e.handle(m.t, request) + return expectations.handle(mock.t, request) } var _ http.RoundTripper = &httpMock{} -type httpExpect[REQUEST any, RESPONSE any] struct { +type HTTPExpect[REQUEST any, RESPONSE any] struct { statusCode int path string method string @@ -63,20 +53,25 @@ type httpExpect[REQUEST any, RESPONSE any] struct { queryParams map[string]any } -func (e *httpExpect[REQUEST, RESPONSE]) handle(t *testing.T, request *http.Request) (*http.Response, error) { +func (e *HTTPExpect[REQUEST, RESPONSE]) handle(t *testing.T, request *http.Request) (*http.Response, error) { + t.Helper() if e.path != request.URL.Path { return nil, fmt.Errorf("expected url was '%s', got, '%s'", e.path, request.URL.Path) } + if e.method != request.Method { return nil, fmt.Errorf("expected method was '%s', got, '%s'", e.method, request.Method) } + if e.requestBody != nil { body := new(REQUEST) + err := json.NewDecoder(request.Body).Decode(body) if err != nil { panic(err) } + if !reflect.DeepEqual(*e.responseBody, *body) { return nil, fmt.Errorf("mismatch body") } @@ -84,20 +79,24 @@ func (e *httpExpect[REQUEST, RESPONSE]) handle(t *testing.T, request *http.Reque for key, value := range e.queryParams { qpvalue := "" + switch value.(type) { case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: qpvalue = fmt.Sprintf("%d", value) default: qpvalue = fmt.Sprintf("%s", value) } + if rvalue := request.URL.Query().Get(key); rvalue != qpvalue { return nil, fmt.Errorf("expected query param '%s' with value '%s', got '%s'", key, qpvalue, rvalue) } } data := make([]byte, 0) + if e.responseBody != nil { var err error + data, err = json.Marshal(e.responseBody) if err != nil { panic(err) @@ -112,90 +111,111 @@ func (e *httpExpect[REQUEST, RESPONSE]) handle(t *testing.T, request *http.Reque }, nil } -func (e *httpExpect[REQUEST, RESPONSE]) Path(p string) *httpExpect[REQUEST, RESPONSE] { +func (e *HTTPExpect[REQUEST, RESPONSE]) Path(p string) *HTTPExpect[REQUEST, RESPONSE] { e.path = p + return e } -func (e *httpExpect[REQUEST, RESPONSE]) Method(p string) *httpExpect[REQUEST, RESPONSE] { +func (e *HTTPExpect[REQUEST, RESPONSE]) Method(p string) *HTTPExpect[REQUEST, RESPONSE] { e.method = p + return e } -func (e *httpExpect[REQUEST, RESPONSE]) Body(body *REQUEST) *httpExpect[REQUEST, RESPONSE] { +func (e *HTTPExpect[REQUEST, RESPONSE]) Body(body *REQUEST) *HTTPExpect[REQUEST, RESPONSE] { e.requestBody = body + return e } -func (e *httpExpect[REQUEST, RESPONSE]) QueryParam(key string, value any) *httpExpect[REQUEST, RESPONSE] { +func (e *HTTPExpect[REQUEST, RESPONSE]) QueryParam(key string, value any) *HTTPExpect[REQUEST, RESPONSE] { e.queryParams[key] = value + return e } -func (e *httpExpect[REQUEST, RESPONSE]) RespondsWith(statusCode int, body *RESPONSE) *httpExpect[REQUEST, RESPONSE] { +func (e *HTTPExpect[REQUEST, RESPONSE]) RespondsWith(statusCode int, + body *RESPONSE, +) *HTTPExpect[REQUEST, RESPONSE] { e.statusCode = statusCode e.responseBody = body + return e } -func Expect[REQUEST any, RESPONSE any](mock *httpMock) *httpExpect[REQUEST, RESPONSE] { - e := &httpExpect[REQUEST, RESPONSE]{ +func Expect[REQUEST any, RESPONSE any](mock *httpMock) *HTTPExpect[REQUEST, RESPONSE] { + expectations := &HTTPExpect[REQUEST, RESPONSE]{ queryParams: map[string]any{}, } + mock.mu.Lock() defer mock.mu.Unlock() - mock.expectations = append(mock.expectations, e) - return e + mock.expectations = append(mock.expectations, expectations) + + return expectations } -type stripeBalanceTransactionListExpect struct { - *httpExpect[struct{}, MockedListResponse] +type StripeBalanceTransactionListExpect struct { + *HTTPExpect[struct{}, MockedListResponse] } -func (e *stripeBalanceTransactionListExpect) Path(p string) *stripeBalanceTransactionListExpect { - e.httpExpect.Path(p) +func (e *StripeBalanceTransactionListExpect) Path(p string) *StripeBalanceTransactionListExpect { + e.HTTPExpect.Path(p) + return e } -func (e *stripeBalanceTransactionListExpect) Method(p string) *stripeBalanceTransactionListExpect { - e.httpExpect.Method(p) +func (e *StripeBalanceTransactionListExpect) Method(p string) *StripeBalanceTransactionListExpect { + e.HTTPExpect.Method(p) + return e } -func (e *stripeBalanceTransactionListExpect) QueryParam(key string, value any) *stripeBalanceTransactionListExpect { - e.httpExpect.QueryParam(key, value) +func (e *StripeBalanceTransactionListExpect) QueryParam(key string, + value any, +) *StripeBalanceTransactionListExpect { + e.HTTPExpect.QueryParam(key, value) + return e } -func (e *stripeBalanceTransactionListExpect) RespondsWith(statusCode int, hasMore bool, body ...*stripe.BalanceTransaction) *stripeBalanceTransactionListExpect { - e.httpExpect.RespondsWith(statusCode, &MockedListResponse{ +func (e *StripeBalanceTransactionListExpect) RespondsWith(statusCode int, hasMore bool, + body ...*stripe.BalanceTransaction, +) *StripeBalanceTransactionListExpect { + e.HTTPExpect.RespondsWith(statusCode, &MockedListResponse{ HasMore: hasMore, Data: body, }) + return e } -func (e *stripeBalanceTransactionListExpect) StartingAfter(v string) *stripeBalanceTransactionListExpect { +func (e *StripeBalanceTransactionListExpect) StartingAfter(v string) *StripeBalanceTransactionListExpect { e.QueryParam("starting_after", v) + return e } -func (e *stripeBalanceTransactionListExpect) CreatedLte(v time.Time) *stripeBalanceTransactionListExpect { +func (e *StripeBalanceTransactionListExpect) CreatedLte(v time.Time) *StripeBalanceTransactionListExpect { e.QueryParam("created[lte]", v.Unix()) + return e } -func (e *stripeBalanceTransactionListExpect) Limit(v int) *stripeBalanceTransactionListExpect { +func (e *StripeBalanceTransactionListExpect) Limit(v int) *StripeBalanceTransactionListExpect { e.QueryParam("limit", v) + return e } -func ExpectBalanceTransactionList(mock *httpMock) *stripeBalanceTransactionListExpect { +func ExpectBalanceTransactionList(mock *httpMock) *StripeBalanceTransactionListExpect { e := Expect[struct{}, MockedListResponse](mock) e.Path("/v1/balance_transactions").Method(http.MethodGet) - return &stripeBalanceTransactionListExpect{ - httpExpect: e, + + return &StripeBalanceTransactionListExpect{ + HTTPExpect: e, } } @@ -207,6 +227,7 @@ type BalanceTransactionSource stripe.BalanceTransactionSource func (t *BalanceTransactionSource) MarshalJSON() ([]byte, error) { type Aux BalanceTransactionSource + return json.Marshal(struct { Aux Charge *stripe.Charge `json:"charge"` @@ -226,6 +247,7 @@ type BalanceTransaction stripe.BalanceTransaction func (t *BalanceTransaction) MarshalJSON() ([]byte, error) { type Aux BalanceTransaction + return json.Marshal(struct { Aux Source *BalanceTransactionSource `json:"source"` @@ -235,6 +257,7 @@ func (t *BalanceTransaction) MarshalJSON() ([]byte, error) { }) } +//nolint:tagliatelle // allow snake_case in client type MockedListResponse struct { HasMore bool `json:"has_more"` Data []*stripe.BalanceTransaction `json:"data"` diff --git a/pkg/bridge/connectors/stripe/config.go b/internal/pkg/connectors/stripe/config.go similarity index 74% rename from pkg/bridge/connectors/stripe/config.go rename to internal/pkg/connectors/stripe/config.go index e7d57f473..672d5fcf5 100644 --- a/pkg/bridge/connectors/stripe/config.go +++ b/internal/pkg/connectors/stripe/config.go @@ -8,18 +8,19 @@ import ( type Config struct { PollingPeriod time.Duration `json:"pollingPeriod" yaml:"pollingPeriod" bson:"pollingPeriod"` - ApiKey string `json:"apiKey" yaml:"apiKey" bson:"apiKey"` + APIKey string `json:"apiKey" yaml:"apiKey" bson:"apiKey"` TimelineConfig `bson:",inline"` } -func (c *Config) String() string { - return fmt.Sprintf("pollingPeriod=%d, pageSize=%d, apiKey=%s", c.PollingPeriod, c.PageSize, c.ApiKey) +func (c Config) String() string { + return fmt.Sprintf("pollingPeriod=%d, pageSize=%d, apiKey=%s", c.PollingPeriod, c.PageSize, c.APIKey) } func (c Config) Validate() error { - if c.ApiKey == "" { + if c.APIKey == "" { return errors.New("missing api key") } + return nil } diff --git a/pkg/bridge/connectors/stripe/connector.go b/internal/pkg/connectors/stripe/connector.go similarity index 82% rename from pkg/bridge/connectors/stripe/connector.go rename to internal/pkg/connectors/stripe/connector.go index 761e0ee9d..bf57fec8c 100644 --- a/pkg/bridge/connectors/stripe/connector.go +++ b/internal/pkg/connectors/stripe/connector.go @@ -3,9 +3,10 @@ package stripe import ( "context" + "github.com/numary/payments/internal/pkg/integration" + "github.com/numary/payments/internal/pkg/task" + "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/integration" - "github.com/numary/payments/pkg/bridge/task" ) const connectorName = "stripe" @@ -29,12 +30,13 @@ func (c *Connector) Resolve(descriptor TaskDescriptor) task.Task { if descriptor.Main { return MainTask(c.cfg) } + return ConnectedAccountTask(c.cfg, descriptor.Account) } var _ integration.Connector[TaskDescriptor] = &Connector{} -func NewConnector(logger sharedlogging.Logger, cfg Config) *Connector { +func newConnector(logger sharedlogging.Logger, cfg Config) *Connector { return &Connector{ logger: logger.WithFields(map[string]any{ "component": "connector", diff --git a/pkg/bridge/connectors/stripe/descriptor.go b/internal/pkg/connectors/stripe/descriptor.go similarity index 100% rename from pkg/bridge/connectors/stripe/descriptor.go rename to internal/pkg/connectors/stripe/descriptor.go diff --git a/pkg/bridge/connectors/stripe/ingester.go b/internal/pkg/connectors/stripe/ingester.go similarity index 59% rename from pkg/bridge/connectors/stripe/ingester.go rename to internal/pkg/connectors/stripe/ingester.go index 581bb4007..3833ad1a2 100644 --- a/pkg/bridge/connectors/stripe/ingester.go +++ b/internal/pkg/connectors/stripe/ingester.go @@ -9,12 +9,12 @@ import ( type Ingester interface { Ingest(ctx context.Context, batch []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error } -type IngesterFn func(ctx context.Context, batch []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error -func (fn IngesterFn) Ingest(ctx context.Context, batch []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error { +type IngesterFn func(ctx context.Context, batch []*stripe.BalanceTransaction, + commitState TimelineState, tail bool) error + +func (fn IngesterFn) Ingest(ctx context.Context, batch []*stripe.BalanceTransaction, + commitState TimelineState, tail bool, +) error { return fn(ctx, batch, commitState, tail) } - -var NoOpIngester = IngesterFn(func(ctx context.Context, batch []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error { - return nil -}) diff --git a/internal/pkg/connectors/stripe/loader.go b/internal/pkg/connectors/stripe/loader.go new file mode 100644 index 000000000..adf511cca --- /dev/null +++ b/internal/pkg/connectors/stripe/loader.go @@ -0,0 +1,41 @@ +package stripe + +import ( + "time" + + "github.com/numary/payments/internal/pkg/integration" + + "github.com/numary/go-libs/sharedlogging" +) + +type Loader struct{} + +const allowedTasks = 50 + +func (l *Loader) AllowTasks() int { + return allowedTasks +} + +func (l *Loader) Name() string { + return connectorName +} + +func (l *Loader) Load(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] { + return newConnector(logger, config) +} + +func (l *Loader) ApplyDefaults(cfg Config) Config { + if cfg.PageSize == 0 { + cfg.PageSize = 10 + } + + if cfg.PollingPeriod == 0 { + cfg.PollingPeriod = 2 * time.Minute + } + + return cfg +} + +func NewLoader() *Loader { + return &Loader{} +} diff --git a/pkg/bridge/connectors/stripe/runner.go b/internal/pkg/connectors/stripe/runner.go similarity index 95% rename from pkg/bridge/connectors/stripe/runner.go rename to internal/pkg/connectors/stripe/runner.go index 1441e7f47..60c3b57e0 100644 --- a/pkg/bridge/connectors/stripe/runner.go +++ b/internal/pkg/connectors/stripe/runner.go @@ -9,7 +9,7 @@ import ( func NewRunner( logger sharedlogging.Logger, - trigger *timelineTrigger, + trigger *TimelineTrigger, pollingPeriod time.Duration, ) *Runner { return &Runner{ @@ -24,7 +24,7 @@ func NewRunner( type Runner struct { stopChan chan chan struct{} - trigger *timelineTrigger + trigger *TimelineTrigger logger sharedlogging.Logger pollingPeriod time.Duration } @@ -45,7 +45,6 @@ func (r *Runner) Stop(ctx context.Context) error { } func (r *Runner) Run(ctx context.Context) error { - r.logger.WithFields(map[string]interface{}{ "polling-period": r.pollingPeriod, }).Info("Starting runner") @@ -56,13 +55,16 @@ func (r *Runner) Run(ctx context.Context) error { defer func() { done <- struct{}{} }() + if err := r.trigger.Fetch(ctx); err != nil { r.logger.Errorf("Error fetching page: %s", err) } } + go fetch() var timeChan <-chan time.Time + for { select { case <-ctx.Done(): @@ -70,11 +72,13 @@ func (r *Runner) Run(ctx context.Context) error { case closeChannel := <-r.stopChan: r.trigger.Cancel(ctx) close(closeChannel) + return nil case <-done: timeChan = time.After(r.pollingPeriod) case <-timeChan: timeChan = nil + go fetch() } } diff --git a/pkg/bridge/connectors/stripe/runner_test.go b/internal/pkg/connectors/stripe/runner_test.go similarity index 62% rename from pkg/bridge/connectors/stripe/runner_test.go rename to internal/pkg/connectors/stripe/runner_test.go index c44e89896..94aad7f0d 100644 --- a/pkg/bridge/connectors/stripe/runner_test.go +++ b/internal/pkg/connectors/stripe/runner_test.go @@ -5,14 +5,23 @@ import ( "testing" "time" + "github.com/stripe/stripe-go/v72" + "github.com/numary/go-libs/sharedlogging" "github.com/stretchr/testify/require" ) func TestStopTailing(t *testing.T) { + t.Parallel() + + NoOpIngester := IngesterFn(func(ctx context.Context, batch []*stripe.BalanceTransaction, + commitState TimelineState, tail bool, + ) error { + return nil + }) mock := NewClientMock(t, true) - tl := NewTimeline(mock, TimelineConfig{ + timeline := NewTimeline(mock, TimelineConfig{ PageSize: 2, }, TimelineState{ OldestID: "tx1", @@ -20,22 +29,23 @@ func TestStopTailing(t *testing.T) { }) logger := sharedlogging.GetLogger(context.Background()) - trigger := NewTimelineTrigger(logger, NoOpIngester, tl) + trigger := NewTimelineTrigger(logger, NoOpIngester, timeline) r := NewRunner(logger, trigger, time.Second) + go func() { _ = r.Run(context.Background()) }() + defer func() { _ = r.Stop(context.Background()) }() - require.False(t, tl.state.NoMoreHistory) + require.False(t, timeline.state.NoMoreHistory) mock.Expect().RespondsWith(false) // Fetch head mock.Expect().RespondsWith(false) // Fetch tail require.Eventually(t, func() bool { - return tl.state.NoMoreHistory + return timeline.state.NoMoreHistory }, time.Second, 10*time.Millisecond) - } diff --git a/pkg/bridge/connectors/stripe/state.go b/internal/pkg/connectors/stripe/state.go similarity index 100% rename from pkg/bridge/connectors/stripe/state.go rename to internal/pkg/connectors/stripe/state.go diff --git a/pkg/bridge/connectors/stripe/task_connected_account.go b/internal/pkg/connectors/stripe/task_connected_account.go similarity index 77% rename from pkg/bridge/connectors/stripe/task_connected_account.go rename to internal/pkg/connectors/stripe/task_connected_account.go index 703c49b60..59b761d6f 100644 --- a/pkg/bridge/connectors/stripe/task_connected_account.go +++ b/internal/pkg/connectors/stripe/task_connected_account.go @@ -4,28 +4,35 @@ import ( "context" "net/http" + "github.com/numary/payments/internal/pkg/ingestion" + "github.com/numary/payments/internal/pkg/task" + "github.com/numary/payments/internal/pkg/writeonly" + "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/ingestion" - "github.com/numary/payments/pkg/bridge/task" - "github.com/numary/payments/pkg/bridge/writeonly" "github.com/stripe/stripe-go/v72" ) func ingestBatch(ctx context.Context, logger sharedlogging.Logger, ingester ingestion.Ingester, - bts []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error { - + bts []*stripe.BalanceTransaction, commitState TimelineState, tail bool, +) error { batch := ingestion.Batch{} + for _, bt := range bts { batchElement, handled := CreateBatchElement(bt, !tail) + if !handled { logger.Debugf("Balance transaction type not handled: %s", bt.Type) + continue } + if batchElement.Adjustment == nil && batchElement.Payment == nil { continue } + batch = append(batch, batchElement) } + logger.WithFields(map[string]interface{}{ "state": commitState, }).Debugf("updating state") @@ -41,7 +48,8 @@ func ingestBatch(ctx context.Context, logger sharedlogging.Logger, ingester inge func ConnectedAccountTask(config Config, account string) func(ctx context.Context, logger sharedlogging.Logger, ingester ingestion.Ingester, resolver task.StateResolver, storage writeonly.Storage) error { return func(ctx context.Context, logger sharedlogging.Logger, ingester ingestion.Ingester, - resolver task.StateResolver, storage writeonly.Storage) error { + resolver task.StateResolver, storage writeonly.Storage, + ) error { logger.Infof("Create new trigger") trigger := NewTimelineTrigger( @@ -49,8 +57,10 @@ func ConnectedAccountTask(config Config, account string) func(ctx context.Contex IngesterFn(func(ctx context.Context, bts []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error { return ingestBatch(ctx, logger, ingester, bts, commitState, tail) }), - NewTimeline(NewDefaultClient(http.DefaultClient, config.ApiKey, storage).ForAccount(account), config.TimelineConfig, task.MustResolveTo(ctx, resolver, TimelineState{})), + NewTimeline(NewDefaultClient(http.DefaultClient, config.APIKey, storage). + ForAccount(account), config.TimelineConfig, task.MustResolveTo(ctx, resolver, TimelineState{})), ) + return trigger.Fetch(ctx) } } diff --git a/pkg/bridge/connectors/stripe/task_main.go b/internal/pkg/connectors/stripe/task_main.go similarity index 72% rename from pkg/bridge/connectors/stripe/task_main.go rename to internal/pkg/connectors/stripe/task_main.go index b304c357d..a468732c0 100644 --- a/pkg/bridge/connectors/stripe/task_main.go +++ b/internal/pkg/connectors/stripe/task_main.go @@ -4,10 +4,11 @@ import ( "context" "net/http" + "github.com/numary/payments/internal/pkg/ingestion" + "github.com/numary/payments/internal/pkg/task" + "github.com/numary/payments/internal/pkg/writeonly" + "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/ingestion" - "github.com/numary/payments/pkg/bridge/task" - "github.com/numary/payments/pkg/bridge/writeonly" "github.com/pkg/errors" "github.com/stripe/stripe-go/v72" ) @@ -19,14 +20,15 @@ func ingest( ingester ingestion.Ingester, bts []*stripe.BalanceTransaction, commitState TimelineState, - tail bool) error { - + tail bool, +) error { err := ingestBatch(ctx, logger, ingester, bts, commitState, tail) if err != nil { return err } connectedAccounts := make([]string, 0) + for _, bt := range bts { if bt.Type == stripe.BalanceTransactionTypeTransfer { connectedAccounts = append(connectedAccounts, bt.Source.Transfer.Destination.ID) @@ -34,10 +36,10 @@ func ingest( } for _, connectedAccount := range connectedAccounts { - err := scheduler.Schedule(TaskDescriptor{ + err = scheduler.Schedule(TaskDescriptor{ Account: connectedAccount, }, true) - if err != nil && err != task.ErrAlreadyScheduled { + if err != nil && !errors.Is(err, task.ErrAlreadyScheduled) { return errors.Wrap(err, "scheduling connected account") } } @@ -48,18 +50,23 @@ func ingest( func MainTask(config Config) func(ctx context.Context, logger sharedlogging.Logger, resolver task.StateResolver, scheduler task.Scheduler[TaskDescriptor], ingester ingestion.Ingester, storage writeonly.Storage) error { return func(ctx context.Context, logger sharedlogging.Logger, resolver task.StateResolver, - scheduler task.Scheduler[TaskDescriptor], ingester ingestion.Ingester, storage writeonly.Storage) error { + scheduler task.Scheduler[TaskDescriptor], ingester ingestion.Ingester, storage writeonly.Storage, + ) error { runner := NewRunner( logger, NewTimelineTrigger( logger, - IngesterFn(func(ctx context.Context, batch []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error { + IngesterFn(func(ctx context.Context, batch []*stripe.BalanceTransaction, + commitState TimelineState, tail bool, + ) error { return ingest(ctx, logger, scheduler, ingester, batch, commitState, tail) }), - NewTimeline(NewDefaultClient(http.DefaultClient, config.ApiKey, storage), config.TimelineConfig, task.MustResolveTo(ctx, resolver, TimelineState{})), + NewTimeline(NewDefaultClient(http.DefaultClient, config.APIKey, storage), + config.TimelineConfig, task.MustResolveTo(ctx, resolver, TimelineState{})), ), config.PollingPeriod, ) + return runner.Run(ctx) } } diff --git a/pkg/bridge/connectors/stripe/timeline.go b/internal/pkg/connectors/stripe/timeline.go similarity index 84% rename from pkg/bridge/connectors/stripe/timeline.go rename to internal/pkg/connectors/stripe/timeline.go index 1febb77e2..a65eef814 100644 --- a/pkg/bridge/connectors/stripe/timeline.go +++ b/internal/pkg/connectors/stripe/timeline.go @@ -13,44 +13,48 @@ const ( balanceTransactionsEndpoint = "https://api.stripe.com/v1/balance_transactions" ) +//nolint:tagliatelle // allow different styled tags in client type ListResponse struct { HasMore bool `json:"has_more"` Data []*stripe.BalanceTransaction `json:"data"` } type TimelineOption interface { - apply(c *timeline) + apply(c *Timeline) } -type TimelineOptionFn func(c *timeline) +type TimelineOptionFn func(c *Timeline) -func (fn TimelineOptionFn) apply(c *timeline) { +func (fn TimelineOptionFn) apply(c *Timeline) { fn(c) } func WithStartingAt(v time.Time) TimelineOptionFn { - return func(c *timeline) { + return func(c *Timeline) { c.startingAt = v } } -var defaultOptions = make([]TimelineOption, 0) +func NewTimeline(client Client, cfg TimelineConfig, state TimelineState, options ...TimelineOption) *Timeline { + defaultOptions := make([]TimelineOption, 0) -func NewTimeline(client Client, cfg TimelineConfig, state TimelineState, options ...TimelineOption) *timeline { - c := &timeline{ + c := &Timeline{ config: cfg, state: state, client: client, } + options = append(defaultOptions, append([]TimelineOption{ WithStartingAt(time.Now()), }, options...)...) + for _, opt := range options { opt.apply(c) } + return c } -type timeline struct { +type Timeline struct { state TimelineState firstIDAfterStartingAt string startingAt time.Time @@ -58,11 +62,13 @@ type timeline struct { client Client } -func (tl *timeline) doRequest(ctx context.Context, queryParams url.Values, to *[]*stripe.BalanceTransaction) (bool, error) { - +func (tl *Timeline) doRequest(ctx context.Context, queryParams url.Values, + to *[]*stripe.BalanceTransaction, +) (bool, error) { options := make([]ClientOption, 0) options = append(options, QueryParam("limit", fmt.Sprintf("%d", tl.config.PageSize))) options = append(options, QueryParam("expand[]", "data.source")) + for k, v := range queryParams { options = append(options, QueryParam(k, v[0])) } @@ -71,27 +77,33 @@ func (tl *timeline) doRequest(ctx context.Context, queryParams url.Values, to *[ if err != nil { return false, err } + *to = txs + return hasMore, nil } -func (tl *timeline) init(ctx context.Context) error { +func (tl *Timeline) init(ctx context.Context) error { ret := make([]*stripe.BalanceTransaction, 0) params := url.Values{} params.Set("limit", "1") params.Set("created[lt]", fmt.Sprintf("%d", tl.startingAt.Unix())) + _, err := tl.doRequest(ctx, params, &ret) if err != nil { return err } + if len(ret) > 0 { tl.firstIDAfterStartingAt = ret[0].ID } + return nil } -func (tl *timeline) Tail(ctx context.Context, to *[]*stripe.BalanceTransaction) (bool, TimelineState, func(), error) { +func (tl *Timeline) Tail(ctx context.Context, to *[]*stripe.BalanceTransaction) (bool, TimelineState, func(), error) { queryParams := url.Values{} + switch { case tl.state.OldestID != "": queryParams.Set("starting_after", tl.state.OldestID) @@ -105,12 +117,13 @@ func (tl *timeline) Tail(ctx context.Context, to *[]*stripe.BalanceTransaction) } futureState := tl.state - if len(*to) > 0 { + if len(*to) > 0 { lastItem := (*to)[len(*to)-1] futureState.OldestID = lastItem.ID oldestDate := time.Unix(lastItem.Created, 0) futureState.OldestDate = &oldestDate + if futureState.MoreRecentID == "" { firstItem := (*to)[0] futureState.MoreRecentID = firstItem.ID @@ -118,6 +131,7 @@ func (tl *timeline) Tail(ctx context.Context, to *[]*stripe.BalanceTransaction) futureState.MoreRecentDate = &moreRecentDate } } + futureState.NoMoreHistory = !hasMore return hasMore, futureState, func() { @@ -125,12 +139,13 @@ func (tl *timeline) Tail(ctx context.Context, to *[]*stripe.BalanceTransaction) }, nil } -func (tl *timeline) Head(ctx context.Context, to *[]*stripe.BalanceTransaction) (bool, TimelineState, func(), error) { +func (tl *Timeline) Head(ctx context.Context, to *[]*stripe.BalanceTransaction) (bool, TimelineState, func(), error) { if tl.firstIDAfterStartingAt == "" && tl.state.MoreRecentID == "" { err := tl.init(ctx) if err != nil { return false, TimelineState{}, nil, err } + if tl.firstIDAfterStartingAt == "" { return false, TimelineState{ NoMoreHistory: true, @@ -139,6 +154,7 @@ func (tl *timeline) Head(ctx context.Context, to *[]*stripe.BalanceTransaction) } queryParams := url.Values{} + switch { case tl.state.MoreRecentID != "": queryParams.Set("ending_before", tl.state.MoreRecentID) @@ -152,11 +168,13 @@ func (tl *timeline) Head(ctx context.Context, to *[]*stripe.BalanceTransaction) } futureState := tl.state + if len(*to) > 0 { firstItem := (*to)[0] futureState.MoreRecentID = firstItem.ID moreRecentDate := time.Unix(firstItem.Created, 0) futureState.MoreRecentDate = &moreRecentDate + if futureState.OldestID == "" { lastItem := (*to)[len(*to)-1] futureState.OldestID = lastItem.ID @@ -174,6 +192,6 @@ func (tl *timeline) Head(ctx context.Context, to *[]*stripe.BalanceTransaction) }, nil } -func (tl *timeline) State() TimelineState { +func (tl *Timeline) State() TimelineState { return tl.state } diff --git a/pkg/bridge/connectors/stripe/timeline_test.go b/internal/pkg/connectors/stripe/timeline_test.go similarity index 87% rename from pkg/bridge/connectors/stripe/timeline_test.go rename to internal/pkg/connectors/stripe/timeline_test.go index a6e6a1f78..f576717db 100644 --- a/pkg/bridge/connectors/stripe/timeline_test.go +++ b/internal/pkg/connectors/stripe/timeline_test.go @@ -10,9 +10,11 @@ import ( ) func TestTimeline(t *testing.T) { + t.Parallel() + mock := NewClientMock(t, true) ref := time.Now() - tl := NewTimeline(mock, TimelineConfig{ + timeline := NewTimeline(mock, TimelineConfig{ PageSize: 2, }, TimelineState{}, WithStartingAt(ref)) @@ -32,7 +34,7 @@ func TestTimeline(t *testing.T) { RespondsWith(true, tx1, tx2) ret := make([]*stripe.BalanceTransaction, 0) - hasMore, state, commit, err := tl.Tail(context.Background(), &ret) + hasMore, state, commit, err := timeline.Tail(context.Background(), &ret) require.NoError(t, err) require.True(t, hasMore) require.Equal(t, TimelineState{ @@ -52,7 +54,7 @@ func TestTimeline(t *testing.T) { mock.Expect().Limit(2).StartingAfter(tx2.ID).RespondsWith(false, tx3) - hasMore, state, _, err = tl.Tail(context.Background(), &ret) + hasMore, state, _, err = timeline.Tail(context.Background(), &ret) require.NoError(t, err) require.False(t, hasMore) require.Equal(t, TimelineState{ diff --git a/pkg/bridge/connectors/stripe/timeline_trigger.go b/internal/pkg/connectors/stripe/timeline_trigger.go similarity index 82% rename from pkg/bridge/connectors/stripe/timeline_trigger.go rename to internal/pkg/connectors/stripe/timeline_trigger.go index 207e9fd86..0401d2fcf 100644 --- a/pkg/bridge/connectors/stripe/timeline_trigger.go +++ b/internal/pkg/connectors/stripe/timeline_trigger.go @@ -12,35 +12,37 @@ import ( func NewTimelineTrigger( logger sharedlogging.Logger, ingester Ingester, - tl *timeline, -) *timelineTrigger { - return &timelineTrigger{ + timeline *Timeline, +) *TimelineTrigger { + return &TimelineTrigger{ logger: logger.WithFields(map[string]interface{}{ "component": "timeline-trigger", }), ingester: ingester, - timeline: tl, + timeline: timeline, sem: semaphore.NewWeighted(1), } } -type timelineTrigger struct { +type TimelineTrigger struct { logger sharedlogging.Logger ingester Ingester - timeline *timeline + timeline *Timeline sem *semaphore.Weighted cancel func() } -func (t *timelineTrigger) Fetch(ctx context.Context) error { +func (t *TimelineTrigger) Fetch(ctx context.Context) error { if t.sem.TryAcquire(1) { defer t.sem.Release(1) + ctx, t.cancel = context.WithCancel(ctx) if !t.timeline.State().NoMoreHistory { if err := t.fetch(ctx, true); err != nil { return err } } + select { case <-ctx.Done(): return ctx.Err() @@ -50,21 +52,24 @@ func (t *timelineTrigger) Fetch(ctx context.Context) error { } } } + return nil } -func (t *timelineTrigger) Cancel(ctx context.Context) { +func (t *TimelineTrigger) Cancel(ctx context.Context) { if t.cancel != nil { t.cancel() + err := t.sem.Acquire(ctx, 1) if err != nil { panic(err) } + t.sem.Release(1) } } -func (t *timelineTrigger) fetch(ctx context.Context, tail bool) error { +func (t *TimelineTrigger) fetch(ctx context.Context, tail bool) error { for { select { case <-ctx.Done(): @@ -74,6 +79,7 @@ func (t *timelineTrigger) fetch(ctx context.Context, tail bool) error { if err != nil { return errors.Wrap(err, "error triggering tail page") } + if !hasMore { return nil } @@ -81,15 +87,16 @@ func (t *timelineTrigger) fetch(ctx context.Context, tail bool) error { } } -func (t *timelineTrigger) triggerPage(ctx context.Context, tail bool) (bool, error) { - +func (t *TimelineTrigger) triggerPage(ctx context.Context, tail bool) (bool, error) { logger := t.logger.WithFields(map[string]interface{}{ "tail": tail, }) + logger.Debugf("Trigger page") ret := make([]*stripe.BalanceTransaction, 0) method := t.timeline.Head + if tail { method = t.timeline.Tail } @@ -100,6 +107,7 @@ func (t *timelineTrigger) triggerPage(ctx context.Context, tail bool) (bool, err } logger.Debug("Ingest batch") + if len(ret) > 0 { err = t.ingester.Ingest(ctx, ret, futureState, tail) if err != nil { @@ -108,5 +116,6 @@ func (t *timelineTrigger) triggerPage(ctx context.Context, tail bool) (bool, err } commitFn() + return hasMore, nil } diff --git a/pkg/bridge/connectors/stripe/timeline_trigger_test.go b/internal/pkg/connectors/stripe/timeline_trigger_test.go similarity index 93% rename from pkg/bridge/connectors/stripe/timeline_trigger_test.go rename to internal/pkg/connectors/stripe/timeline_trigger_test.go index 7c0372e60..500cd5f7e 100644 --- a/pkg/bridge/connectors/stripe/timeline_trigger_test.go +++ b/internal/pkg/connectors/stripe/timeline_trigger_test.go @@ -12,11 +12,13 @@ import ( ) func TestTimelineTrigger(t *testing.T) { + t.Parallel() + const txCount = 12 mock := NewClientMock(t, true) ref := time.Now().Add(-time.Minute * time.Duration(txCount) / 2) - tl := NewTimeline(mock, TimelineConfig{ + timeline := NewTimeline(mock, TimelineConfig{ PageSize: 2, }, TimelineState{}, WithStartingAt(ref)) @@ -25,9 +27,10 @@ func TestTimelineTrigger(t *testing.T) { sharedlogging.GetLogger(context.Background()), IngesterFn(func(ctx context.Context, batch []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error { ingestedTx = append(ingestedTx, batch...) + return nil }), - tl, + timeline, ) allTxs := make([]*stripe.BalanceTransaction, txCount) @@ -45,6 +48,7 @@ func TestTimelineTrigger(t *testing.T) { for i := 0; i < txCount/2; i += 2 { mock.Expect().Limit(2).RespondsWith(i < txCount/2-2, allTxs[txCount/2+i], allTxs[txCount/2+i+1]) } + for i := 0; i < txCount/2; i += 2 { mock.Expect().Limit(2).RespondsWith(i < txCount/2-2, allTxs[txCount/2-i-2], allTxs[txCount/2-i-1]) } @@ -57,11 +61,13 @@ func TestTimelineTrigger(t *testing.T) { } func TestCancelTimelineTrigger(t *testing.T) { + t.Parallel() + const txCount = 12 mock := NewClientMock(t, false) ref := time.Now().Add(-time.Minute * time.Duration(txCount) / 2) - tl := NewTimeline(mock, TimelineConfig{ + timeline := NewTimeline(mock, TimelineConfig{ PageSize: 1, }, TimelineState{}, WithStartingAt(ref)) @@ -71,9 +77,10 @@ func TestCancelTimelineTrigger(t *testing.T) { IngesterFn(func(ctx context.Context, batch []*stripe.BalanceTransaction, commitState TimelineState, tail bool) error { close(waiting) // Instruct the test the trigger is in fetching state <-ctx.Done() + return nil }), - tl, + timeline, ) allTxs := make([]*stripe.BalanceTransaction, txCount) @@ -85,7 +92,7 @@ func TestCancelTimelineTrigger(t *testing.T) { } go func() { - //TODO: Handle error + // TODO: Handle error _ = trigger.Fetch(context.Background()) }() select { diff --git a/internal/pkg/connectors/stripe/translate.go b/internal/pkg/connectors/stripe/translate.go new file mode 100644 index 000000000..7064b416a --- /dev/null +++ b/internal/pkg/connectors/stripe/translate.go @@ -0,0 +1,315 @@ +package stripe + +import ( + "fmt" + "log" + "runtime/debug" + "strings" + "time" + + "github.com/numary/payments/internal/pkg/ingestion" + "github.com/numary/payments/internal/pkg/payments" + + "github.com/davecgh/go-spew/spew" + "github.com/stripe/stripe-go/v72" +) + +type currency struct { + decimals int +} + +func currencies() map[string]currency { + return map[string]currency{ + "ARS": {2}, // Argentine Peso + "AMD": {2}, // Armenian Dram + "AWG": {2}, // Aruban Guilder + "AUD": {2}, // Australian Dollar + "BSD": {2}, // Bahamian Dollar + "BHD": {3}, // Bahraini Dinar + "BDT": {2}, // Bangladesh, Taka + "BZD": {2}, // Belize Dollar + "BMD": {2}, // Bermudian Dollar + "BOB": {2}, // Bolivia, Boliviano + "BAM": {2}, // Bosnia and Herzegovina, Convertible Marks + "BWP": {2}, // Botswana, Pula + "BRL": {2}, // Brazilian Real + "BND": {2}, // Brunei Dollar + "CAD": {2}, // Canadian Dollar + "KYD": {2}, // Cayman Islands Dollar + "CLP": {0}, // Chilean Peso + "CNY": {2}, // China Yuan Renminbi + "COP": {2}, // Colombian Peso + "CRC": {2}, // Costa Rican Colon + "HRK": {2}, // Croatian Kuna + "CUC": {2}, // Cuban Convertible Peso + "CUP": {2}, // Cuban Peso + "CYP": {2}, // Cyprus Pound + "CZK": {2}, // Czech Koruna + "DKK": {2}, // Danish Krone + "DOP": {2}, // Dominican Peso + "XCD": {2}, // East Caribbean Dollar + "EGP": {2}, // Egyptian Pound + "SVC": {2}, // El Salvador Colon + "ATS": {2}, // Euro + "BEF": {2}, // Euro + "DEM": {2}, // Euro + "EEK": {2}, // Euro + "ESP": {2}, // Euro + "EUR": {2}, // Euro + "FIM": {2}, // Euro + "FRF": {2}, // Euro + "GRD": {2}, // Euro + "IEP": {2}, // Euro + "ITL": {2}, // Euro + "LUF": {2}, // Euro + "NLG": {2}, // Euro + "PTE": {2}, // Euro + "GHC": {2}, // Ghana, Cedi + "GIP": {2}, // Gibraltar Pound + "GTQ": {2}, // Guatemala, Quetzal + "HNL": {2}, // Honduras, Lempira + "HKD": {2}, // Hong Kong Dollar + "HUF": {0}, // Hungary, Forint + "ISK": {0}, // Iceland Krona + "INR": {2}, // Indian Rupee + "IDR": {2}, // Indonesia, Rupiah + "IRR": {2}, // Iranian Rial + "JMD": {2}, // Jamaican Dollar + "JPY": {0}, // Japan, Yen + "JOD": {3}, // Jordanian Dinar + "KES": {2}, // Kenyan Shilling + "KWD": {3}, // Kuwaiti Dinar + "LVL": {2}, // Latvian Lats + "LBP": {0}, // Lebanese Pound + "LTL": {2}, // Lithuanian Litas + "MKD": {2}, // Macedonia, Denar + "MYR": {2}, // Malaysian Ringgit + "MTL": {2}, // Maltese Lira + "MUR": {0}, // Mauritius Rupee + "MXN": {2}, // Mexican Peso + "MZM": {2}, // Mozambique Metical + "NPR": {2}, // Nepalese Rupee + "ANG": {2}, // Netherlands Antillian Guilder + "ILS": {2}, // New Israeli Shekel + "TRY": {2}, // New Turkish Lira + "NZD": {2}, // New Zealand Dollar + "NOK": {2}, // Norwegian Krone + "PKR": {2}, // Pakistan Rupee + "PEN": {2}, // Peru, Nuevo Sol + "UYU": {2}, // Peso Uruguayo + "PHP": {2}, // Philippine Peso + "PLN": {2}, // Poland, Zloty + "GBP": {2}, // Pound Sterling + "OMR": {3}, // Rial Omani + "RON": {2}, // Romania, New Leu + "ROL": {2}, // Romania, Old Leu + "RUB": {2}, // Russian Ruble + "SAR": {2}, // Saudi Riyal + "SGD": {2}, // Singapore Dollar + "SKK": {2}, // Slovak Koruna + "SIT": {2}, // Slovenia, Tolar + "ZAR": {2}, // South Africa, Rand + "KRW": {0}, // South Korea, Won + "SZL": {2}, // Swaziland, Lilangeni + "SEK": {2}, // Swedish Krona + "CHF": {2}, // Swiss Franc + "TZS": {2}, // Tanzanian Shilling + "THB": {2}, // Thailand, Baht + "TOP": {2}, // Tonga, Paanga + "AED": {2}, // UAE Dirham + "UAH": {2}, // Ukraine, Hryvnia + "USD": {2}, // US Dollar + "VUV": {0}, // Vanuatu, Vatu + "VEF": {2}, // Venezuela Bolivares Fuertes + "VEB": {2}, // Venezuela, Bolivar + "VND": {0}, // Viet Nam, Dong + "ZWD": {2}, // Zimbabwe Dollar + } +} + +func CreateBatchElement(balanceTransaction *stripe.BalanceTransaction, forward bool) (ingestion.BatchElement, bool) { + var ( + reference payments.Referenced + paymentData *payments.Data + adjustment *payments.Adjustment + ) + + defer func() { + // DEBUG + if e := recover(); e != nil { + log.Println("Error translating transaction") + debug.PrintStack() + spew.Dump(balanceTransaction) + panic(e) + } + }() + + if balanceTransaction.Source == nil { + return ingestion.BatchElement{}, false + } + + formatAsset := func(cur stripe.Currency) string { + asset := strings.ToUpper(string(cur)) + + def, ok := currencies()[asset] + if !ok { + return asset + } + + if def.decimals == 0 { + return asset + } + + return fmt.Sprintf("%s/%d", asset, def.decimals) + } + + payoutStatus := convertPayoutStatus(balanceTransaction.Source.Payout.Status) + + switch balanceTransaction.Type { + case "charge": + reference = payments.Referenced{ + Reference: balanceTransaction.Source.Charge.ID, + Type: payments.TypePayIn, + } + paymentData = &payments.Data{ + Status: payments.StatusSucceeded, + InitialAmount: balanceTransaction.Source.Charge.Amount, + Asset: formatAsset(balanceTransaction.Source.Charge.Currency), + Raw: balanceTransaction, + Scheme: payments.Scheme(balanceTransaction.Source.Charge.PaymentMethodDetails.Card.Brand), + CreatedAt: time.Unix(balanceTransaction.Created, 0), + } + case "payout": + reference = payments.Referenced{ + Reference: balanceTransaction.Source.Payout.ID, + Type: payments.TypePayout, + } + paymentData = &payments.Data{ + Status: payoutStatus, + InitialAmount: balanceTransaction.Source.Payout.Amount, + Raw: balanceTransaction, + Asset: formatAsset(balanceTransaction.Source.Payout.Currency), + Scheme: func() payments.Scheme { + switch balanceTransaction.Source.Payout.Type { + case "bank_account": + return payments.SchemeSepaCredit + case "card": + return payments.Scheme(balanceTransaction.Source.Payout.Card.Brand) + } + + return payments.SchemeUnknown + }(), + CreatedAt: time.Unix(balanceTransaction.Created, 0), + } + + case "transfer": + reference = payments.Referenced{ + Reference: balanceTransaction.Source.Transfer.ID, + Type: payments.TypePayout, + } + paymentData = &payments.Data{ + Status: payments.StatusSucceeded, + InitialAmount: balanceTransaction.Source.Transfer.Amount, + Raw: balanceTransaction, + Asset: formatAsset(balanceTransaction.Source.Transfer.Currency), + Scheme: payments.SchemeOther, + CreatedAt: time.Unix(balanceTransaction.Created, 0), + } + case "refund": + reference = payments.Referenced{ + Reference: balanceTransaction.Source.Refund.Charge.ID, + Type: payments.TypePayIn, + } + adjustment = &payments.Adjustment{ + Status: payments.StatusSucceeded, + Amount: balanceTransaction.Amount, + Date: time.Unix(balanceTransaction.Created, 0), + Raw: balanceTransaction, + } + case "payment": + reference = payments.Referenced{ + Reference: balanceTransaction.Source.Charge.ID, + Type: payments.TypePayIn, + } + paymentData = &payments.Data{ + Status: payments.StatusSucceeded, + InitialAmount: balanceTransaction.Source.Charge.Amount, + Raw: balanceTransaction, + Asset: formatAsset(balanceTransaction.Source.Charge.Currency), + Scheme: payments.SchemeOther, + CreatedAt: time.Unix(balanceTransaction.Created, 0), + } + case "payout_cancel": + reference = payments.Referenced{ + Reference: balanceTransaction.Source.Payout.ID, + Type: payments.TypePayout, + } + adjustment = &payments.Adjustment{ + Status: payoutStatus, + Amount: 0, + Date: time.Unix(balanceTransaction.Created, 0), + Raw: balanceTransaction, + Absolute: true, + } + case "payout_failure": + reference = payments.Referenced{ + Reference: balanceTransaction.Source.Payout.ID, + Type: payments.TypePayIn, + } + adjustment = &payments.Adjustment{ + Status: payoutStatus, + Amount: 0, + Date: time.Unix(balanceTransaction.Created, 0), + Raw: balanceTransaction, + Absolute: true, + } + case "payment_refund": + reference = payments.Referenced{ + Reference: balanceTransaction.Source.Refund.Charge.ID, + Type: payments.TypePayIn, + } + adjustment = &payments.Adjustment{ + Status: payments.StatusSucceeded, + Amount: balanceTransaction.Amount, + Date: time.Unix(balanceTransaction.Created, 0), + Raw: balanceTransaction, + } + case "adjustment": + reference = payments.Referenced{ + Reference: balanceTransaction.Source.Dispute.Charge.ID, + Type: payments.TypePayIn, + } + adjustment = &payments.Adjustment{ + Status: payments.StatusCancelled, + Amount: balanceTransaction.Amount, + Date: time.Unix(balanceTransaction.Created, 0), + Raw: balanceTransaction, + } + case "stripe_fee", "network_cost": + return ingestion.BatchElement{}, true + default: + return ingestion.BatchElement{}, false + } + + return ingestion.BatchElement{ + Referenced: reference, + Payment: paymentData, + Adjustment: adjustment, + Forward: forward, + }, true +} + +func convertPayoutStatus(status stripe.PayoutStatus) payments.Status { + switch status { + case stripe.PayoutStatusCanceled: + return payments.StatusCancelled + case stripe.PayoutStatusFailed: + return payments.StatusFailed + case stripe.PayoutStatusInTransit, stripe.PayoutStatusPending: + return payments.StatusPending + case stripe.PayoutStatusPaid: + return payments.StatusSucceeded + } + + return payments.StatusOther +} diff --git a/pkg/bridge/connectors/stripe/utils_test.go b/internal/pkg/connectors/stripe/utils_test.go similarity index 54% rename from pkg/bridge/connectors/stripe/utils_test.go rename to internal/pkg/connectors/stripe/utils_test.go index f5ec588a4..6d3058c5b 100644 --- a/pkg/bridge/connectors/stripe/utils_test.go +++ b/internal/pkg/connectors/stripe/utils_test.go @@ -11,15 +11,17 @@ import ( "testing" "time" + "github.com/numary/payments/internal/pkg/fifo" + "github.com/numary/go-libs/sharedlogging" "github.com/numary/go-libs/sharedlogging/sharedlogginglogrus" - "github.com/numary/payments/pkg/bridge/utils" "github.com/sirupsen/logrus" "github.com/stripe/stripe-go/v72" ) func TestMain(m *testing.M) { flag.Parse() + if testing.Verbose() { l := logrus.New() l.Level = logrus.DebugLevel @@ -29,13 +31,13 @@ func TestMain(m *testing.M) { os.Exit(m.Run()) } -type clientMockExpectation struct { +type ClientMockExpectation struct { query url.Values hasMore bool items []*stripe.BalanceTransaction } -func (e *clientMockExpectation) QueryParam(key string, value any) *clientMockExpectation { +func (e *ClientMockExpectation) QueryParam(key string, value any) *ClientMockExpectation { var qpvalue string switch value.(type) { case int, int8, int16, int32, int64, uint, uint8, uint16, uint32, uint64: @@ -44,72 +46,89 @@ func (e *clientMockExpectation) QueryParam(key string, value any) *clientMockExp qpvalue = fmt.Sprintf("%s", value) } e.query.Set(key, qpvalue) + return e } -func (e *clientMockExpectation) StartingAfter(v string) *clientMockExpectation { +func (e *ClientMockExpectation) StartingAfter(v string) *ClientMockExpectation { e.QueryParam("starting_after", v) + return e } -func (e *clientMockExpectation) CreatedLte(v time.Time) *clientMockExpectation { +func (e *ClientMockExpectation) CreatedLte(v time.Time) *ClientMockExpectation { e.QueryParam("created[lte]", v.Unix()) + return e } -func (e *clientMockExpectation) Limit(v int) *clientMockExpectation { +func (e *ClientMockExpectation) Limit(v int) *ClientMockExpectation { e.QueryParam("limit", v) + return e } -func (e *clientMockExpectation) RespondsWith(hasMore bool, txs ...*stripe.BalanceTransaction) *clientMockExpectation { +func (e *ClientMockExpectation) RespondsWith(hasMore bool, + txs ...*stripe.BalanceTransaction, +) *ClientMockExpectation { e.hasMore = hasMore e.items = txs + return e } -func (e *clientMockExpectation) handle(ctx context.Context, options ...ClientOption) ([]*stripe.BalanceTransaction, bool, error) { +func (e *ClientMockExpectation) handle(options ...ClientOption) ([]*stripe.BalanceTransaction, bool, error) { req := httptest.NewRequest(http.MethodGet, "/", nil) - for _, o := range options { - o.apply(req) + + for _, option := range options { + option.apply(req) } - for k := range e.query { - if req.URL.Query().Get(k) != e.query.Get(k) { - return nil, false, fmt.Errorf("mismatch query params, expected query param '%s' with value '%s', got '%s'", k, e.query.Get(k), req.URL.Query().Get(k)) + + for key := range e.query { + if req.URL.Query().Get(key) != e.query.Get(key) { + return nil, false, fmt.Errorf("mismatch query params, expected query param '%s' "+ + "with value '%s', got '%s'", key, e.query.Get(key), req.URL.Query().Get(key)) } } + return e.items, e.hasMore, nil } -type clientMock struct { - expectations *utils.FIFO[*clientMockExpectation] +type ClientMock struct { + expectations *fifo.FIFO[*ClientMockExpectation] } -func (m *clientMock) ForAccount(account string) Client { +func (m *ClientMock) ForAccount(account string) Client { return m } -func (m *clientMock) BalanceTransactions(ctx context.Context, options ...ClientOption) ([]*stripe.BalanceTransaction, bool, error) { +func (m *ClientMock) BalanceTransactions(ctx context.Context, + options ...ClientOption, +) ([]*stripe.BalanceTransaction, bool, error) { e, ok := m.expectations.Pop() if !ok { return nil, false, fmt.Errorf("no more expectation") } - return e.handle(ctx, options...) + return e.handle(options...) } -func (m *clientMock) Expect() *clientMockExpectation { - e := &clientMockExpectation{ +func (m *ClientMock) Expect() *ClientMockExpectation { + e := &ClientMockExpectation{ query: url.Values{}, } m.expectations.Push(e) + return e } -func NewClientMock(t *testing.T, expectationsShouldBeConsumed bool) *clientMock { - m := &clientMock{ - expectations: &utils.FIFO[*clientMockExpectation]{}, +func NewClientMock(t *testing.T, expectationsShouldBeConsumed bool) *ClientMock { + t.Helper() + + m := &ClientMock{ + expectations: &fifo.FIFO[*ClientMockExpectation]{}, } + if expectationsShouldBeConsumed { t.Cleanup(func() { if !m.expectations.Empty() && !t.Failed() { @@ -121,4 +140,4 @@ func NewClientMock(t *testing.T, expectationsShouldBeConsumed bool) *clientMock return m } -var _ Client = &clientMock{} +var _ Client = &ClientMock{} diff --git a/pkg/bridge/connectors/wise/client.go b/internal/pkg/connectors/wise/client.go similarity index 60% rename from pkg/bridge/connectors/wise/client.go rename to internal/pkg/connectors/wise/client.go index 818d9c236..c8e89bdf6 100644 --- a/pkg/bridge/connectors/wise/client.go +++ b/internal/pkg/connectors/wise/client.go @@ -1,6 +1,7 @@ package wise import ( + "context" "encoding/json" "fmt" "io" @@ -10,11 +11,12 @@ import ( const apiEndpoint = "https://api.wise.com" type apiTransport struct { - ApiKey string + APIKey string } func (t *apiTransport) RoundTrip(req *http.Request) (*http.Response, error) { - req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", t.ApiKey)) + req.Header.Add("Authorization", fmt.Sprintf("Bearer %s", t.APIKey)) + return http.DefaultTransport.RoundTrip(req) } @@ -28,18 +30,19 @@ type profile struct { } type transfer struct { - ID uint64 `json:"id"` - Reference string `json:"reference"` - Status string `json:"status"` - SourceAccount uint64 `json:"sourceAccount"` - SourceCurrency string `json:"sourceCurrency"` - SourceValue float64 `json:"sourceValue"` - TargetAccount uint64 `json:"targetAccount"` - TargetCurrency string `json:"targetCurrency"` - TargetValue float64 `json:"targetValue"` - Business uint64 `json:"business"` - Created string `json:"created"` - CustomerTransactionId string `json:"customerTransactionId"` + ID uint64 `json:"id"` + Reference string `json:"reference"` + Status string `json:"status"` + SourceAccount uint64 `json:"sourceAccount"` + SourceCurrency string `json:"sourceCurrency"` + SourceValue float64 `json:"sourceValue"` + TargetAccount uint64 `json:"targetAccount"` + TargetCurrency string `json:"targetCurrency"` + TargetValue float64 `json:"targetValue"` + Business uint64 `json:"business"` + Created string `json:"created"` + //nolint:tagliatelle // allow for clients + CustomerTransactionID string `json:"customerTransactionId"` Details struct { Reference string `json:"reference"` } `json:"details"` @@ -59,12 +62,14 @@ func (w *client) getProfiles() ([]profile, error) { return profiles, err } - b, err := io.ReadAll(res.Body) + defer res.Body.Close() + + body, err := io.ReadAll(res.Body) if err != nil { return nil, fmt.Errorf("failed to read response body: %w", err) } - err = json.Unmarshal(b, &profiles) + err = json.Unmarshal(body, &profiles) if err != nil { return nil, fmt.Errorf("failed to unmarshal profiles: %w", err) } @@ -79,9 +84,8 @@ func (w *client) getTransfers(profile *profile) ([]transfer, error) { offset := 0 for { - var ts []transfer - - req, err := http.NewRequest(http.MethodGet, w.endpoint("v1/transfers"), nil) + req, err := http.NewRequestWithContext(context.TODO(), + http.MethodGet, w.endpoint("v1/transfers"), http.NoBody) if err != nil { return transfers, err } @@ -97,19 +101,27 @@ func (w *client) getTransfers(profile *profile) ([]transfer, error) { return transfers, err } - b, err := io.ReadAll(res.Body) + body, err := io.ReadAll(res.Body) if err != nil { + res.Body.Close() + return nil, fmt.Errorf("failed to read response body: %w", err) } - err = json.Unmarshal(b, &ts) + if err = res.Body.Close(); err != nil { + return nil, fmt.Errorf("failed to close response body: %w", err) + } + + var transferList []transfer + + err = json.Unmarshal(body, &transferList) if err != nil { return nil, fmt.Errorf("failed to unmarshal transfers: %w", err) } - transfers = append(transfers, ts...) + transfers = append(transfers, transferList...) - if len(ts) < limit { + if len(transferList) < limit { break } @@ -122,7 +134,7 @@ func (w *client) getTransfers(profile *profile) ([]transfer, error) { func newClient(apiKey string) *client { httpClient := &http.Client{ Transport: &apiTransport{ - ApiKey: apiKey, + APIKey: apiKey, }, } diff --git a/pkg/bridge/connectors/wise/config.go b/internal/pkg/connectors/wise/config.go similarity index 100% rename from pkg/bridge/connectors/wise/config.go rename to internal/pkg/connectors/wise/config.go diff --git a/internal/pkg/connectors/wise/connector.go b/internal/pkg/connectors/wise/connector.go new file mode 100644 index 000000000..db90784ce --- /dev/null +++ b/internal/pkg/connectors/wise/connector.go @@ -0,0 +1,41 @@ +package wise + +import ( + "context" + + "github.com/numary/go-libs/sharedlogging" + "github.com/numary/payments/internal/pkg/integration" + "github.com/numary/payments/internal/pkg/task" +) + +const connectorName = "wise" + +type Connector struct { + logger sharedlogging.Logger + cfg Config +} + +func (c *Connector) Install(ctx task.ConnectorContext[TaskDescriptor]) error { + return ctx.Scheduler().Schedule(TaskDescriptor{ + Name: taskNameFetchProfiles, + }, false) +} + +func (c *Connector) Uninstall(ctx context.Context) error { + return nil +} + +func (c *Connector) Resolve(descriptor TaskDescriptor) task.Task { + return resolveTasks(c.logger, c.cfg)(descriptor) +} + +var _ integration.Connector[TaskDescriptor] = &Connector{} + +func newConnector(logger sharedlogging.Logger, cfg Config) *Connector { + return &Connector{ + logger: logger.WithFields(map[string]any{ + "component": "connector", + }), + cfg: cfg, + } +} diff --git a/pkg/bridge/connectors/wise/errors.go b/internal/pkg/connectors/wise/errors.go similarity index 100% rename from pkg/bridge/connectors/wise/errors.go rename to internal/pkg/connectors/wise/errors.go diff --git a/internal/pkg/connectors/wise/loader.go b/internal/pkg/connectors/wise/loader.go new file mode 100644 index 000000000..29069b36f --- /dev/null +++ b/internal/pkg/connectors/wise/loader.go @@ -0,0 +1,31 @@ +package wise + +import ( + "github.com/numary/go-libs/sharedlogging" + "github.com/numary/payments/internal/pkg/integration" +) + +type Loader struct{} + +const allowedTasks = 50 + +func (l *Loader) AllowTasks() int { + return allowedTasks +} + +func (l *Loader) Name() string { + return connectorName +} + +func (l *Loader) Load(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] { + return newConnector(logger, config) +} + +func (l *Loader) ApplyDefaults(cfg Config) Config { + return cfg +} + +// NewLoader creates a new loader. +func NewLoader() *Loader { + return &Loader{} +} diff --git a/pkg/bridge/connectors/wise/task_fetch_profiles.go b/internal/pkg/connectors/wise/task_fetch_profiles.go similarity index 82% rename from pkg/bridge/connectors/wise/task_fetch_profiles.go rename to internal/pkg/connectors/wise/task_fetch_profiles.go index 891e53fd3..f7ce7efb9 100644 --- a/pkg/bridge/connectors/wise/task_fetch_profiles.go +++ b/internal/pkg/connectors/wise/task_fetch_profiles.go @@ -4,14 +4,15 @@ import ( "context" "fmt" + "github.com/numary/payments/internal/pkg/task" + "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/task" ) func taskFetchProfiles(logger sharedlogging.Logger, client *client) task.Task { return func( ctx context.Context, - scheduler task.Scheduler[TaskDefinition], + scheduler task.Scheduler[TaskDescriptor], ) error { profiles, err := client.getProfiles() if err != nil { @@ -21,7 +22,7 @@ func taskFetchProfiles(logger sharedlogging.Logger, client *client) task.Task { for _, profile := range profiles { logger.Infof(fmt.Sprintf("scheduling fetch-transfers: %d", profile.ID)) - def := TaskDefinition{ + def := TaskDescriptor{ Name: taskNameFetchTransfers, ProfileID: profile.ID, } diff --git a/pkg/bridge/connectors/wise/task_fetch_transfers.go b/internal/pkg/connectors/wise/task_fetch_transfers.go similarity index 87% rename from pkg/bridge/connectors/wise/task_fetch_transfers.go rename to internal/pkg/connectors/wise/task_fetch_transfers.go index 1f5ebd0e5..cab773051 100644 --- a/pkg/bridge/connectors/wise/task_fetch_transfers.go +++ b/internal/pkg/connectors/wise/task_fetch_transfers.go @@ -5,15 +5,15 @@ import ( "fmt" "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" - "github.com/numary/payments/pkg/bridge/ingestion" - "github.com/numary/payments/pkg/bridge/task" + "github.com/numary/payments/internal/pkg/ingestion" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/payments/internal/pkg/task" ) func taskFetchTransfers(logger sharedlogging.Logger, client *client, profileID uint64) task.Task { return func( ctx context.Context, - scheduler task.Scheduler[TaskDefinition], + scheduler task.Scheduler[TaskDescriptor], ingester ingestion.Ingester, ) error { transfers, err := client.getTransfers(&profile{ diff --git a/pkg/bridge/connectors/wise/task_resolve.go b/internal/pkg/connectors/wise/task_resolve.go similarity index 76% rename from pkg/bridge/connectors/wise/task_resolve.go rename to internal/pkg/connectors/wise/task_resolve.go index 29b17c6a1..50ab9521b 100644 --- a/pkg/bridge/connectors/wise/task_resolve.go +++ b/internal/pkg/connectors/wise/task_resolve.go @@ -3,8 +3,9 @@ package wise import ( "fmt" + "github.com/numary/payments/internal/pkg/task" + "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/task" ) const ( @@ -12,16 +13,16 @@ const ( taskNameFetchProfiles = "fetch-profiles" ) -// TaskDefinition is the definition of a task. -type TaskDefinition struct { +// TaskDescriptor is the definition of a task. +type TaskDescriptor struct { Name string `json:"name" yaml:"name" bson:"name"` ProfileID uint64 `json:"profileID" yaml:"profileID" bson:"profileID"` } -func resolveTasks(logger sharedlogging.Logger, config Config) func(taskDefinition TaskDefinition) task.Task { +func resolveTasks(logger sharedlogging.Logger, config Config) func(taskDefinition TaskDescriptor) task.Task { client := newClient(config.APIKey) - return func(taskDefinition TaskDefinition) task.Task { + return func(taskDefinition TaskDescriptor) task.Task { switch taskDefinition.Name { case taskNameFetchProfiles: return taskFetchProfiles(logger, client) diff --git a/pkg/bridge/utils/fifo.go b/internal/pkg/fifo/fifo.go similarity index 70% rename from pkg/bridge/utils/fifo.go rename to internal/pkg/fifo/fifo.go index 4b805e95a..38d79344a 100644 --- a/pkg/bridge/utils/fifo.go +++ b/internal/pkg/fifo/fifo.go @@ -1,4 +1,4 @@ -package utils +package fifo import "sync" @@ -7,30 +7,39 @@ type FIFO[ITEM any] struct { items []ITEM } -func (s *FIFO[ITEM]) Pop() (ret ITEM, ok bool) { +func (s *FIFO[ITEM]) Pop() (ITEM, bool) { s.mu.Lock() defer s.mu.Unlock() if len(s.items) == 0 { - return + var i ITEM + + return i, false } - ret = s.items[0] - ok = true + + ret := s.items[0] + if len(s.items) == 1 { s.items = make([]ITEM, 0) - return + + return ret, true } + s.items = s.items[1:] - return + + return ret, true } -func (s *FIFO[ITEM]) Peek() (ret ITEM, ok bool) { +func (s *FIFO[ITEM]) Peek() (ITEM, bool) { s.mu.Lock() defer s.mu.Unlock() if len(s.items) == 0 { - return + var i ITEM + + return i, false } + return s.items[0], true } @@ -39,11 +48,13 @@ func (s *FIFO[ITEM]) Push(i ITEM) *FIFO[ITEM] { defer s.mu.Unlock() s.items = append(s.items, i) + return s } func (s *FIFO[ITEM]) Empty() bool { s.mu.Lock() defer s.mu.Unlock() + return len(s.items) == 0 } diff --git a/pkg/bridge/ingestion/ingester.go b/internal/pkg/ingestion/ingester.go similarity index 90% rename from pkg/bridge/ingestion/ingester.go rename to internal/pkg/ingestion/ingester.go index 8a0a882c7..ccb05f77e 100644 --- a/pkg/bridge/ingestion/ingester.go +++ b/internal/pkg/ingestion/ingester.go @@ -7,9 +7,10 @@ import ( "fmt" "time" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/go-libs/sharedlogging" "github.com/numary/go-libs/sharedpublish" - payments "github.com/numary/payments/pkg" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" @@ -39,7 +40,7 @@ func NoOpIngester() IngesterFn { }) } -type defaultIngester struct { +type DefaultIngester struct { db *mongo.Database logger sharedlogging.Logger provider string @@ -49,22 +50,22 @@ type defaultIngester struct { type referenced payments.Referenced -func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]payments.Payment, error) { +func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]payments.Payment, error) { allPayments := make([]payments.Payment, 0) + for _, elem := range batch { logger := i.logger.WithFields(map[string]any{ "id": referenced(elem.Referenced), }) - var ( - update bson.M - ) + var update bson.M if elem.Adjustment != nil && elem.Payment != nil { return nil, errors.New("either adjustment or payment must be provided") } var err error + switch { case elem.Forward && elem.Adjustment != nil: update = bson.M{ @@ -138,9 +139,11 @@ func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym if err != nil { panic(err) } + logger.WithFields(map[string]interface{}{ "update": string(data), }).Debugf("Update payment") + ret := i.db.Collection(payments.Collection).FindOneAndUpdate( ctx, payments.Identifier{ @@ -152,40 +155,48 @@ func (i *defaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym ) if ret.Err() != nil { logger.Errorf("Error updating payment: %s", ret.Err()) - return nil, fmt.Errorf("error updating payment: %s", ret.Err()) + + return nil, fmt.Errorf("error updating payment: %w", ret.Err()) } - p := payments.Payment{} - err = ret.Decode(&p) + + payment := payments.Payment{} + + err = ret.Decode(&payment) if err != nil { return nil, err } - allPayments = append(allPayments, p) + allPayments = append(allPayments, payment) } + return allPayments, nil } func Filter[T any](objects []T, compareFn func(t T) bool) []T { ret := make([]T, 0) + for _, o := range objects { if compareFn(o) { ret = append(ret, o) } } + return ret } -func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState any) error { - +func (i *DefaultIngester) Ingest(ctx context.Context, batch Batch, commitState any) error { startingAt := time.Now() + i.logger.WithFields(map[string]interface{}{ "size": len(batch), "startingAt": startingAt, }).Debugf("Ingest batch") - err := i.db.Client().UseSession(ctx, func(ctx mongo.SessionContext) (err error) { + err := i.db.Client().UseSession(ctx, func(ctx mongo.SessionContext) error { var allPayments []payments.Payment - _, err = ctx.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) { + _, err := ctx.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) { + var err error + allPayments, err = i.processBatch(ctx, batch) if err != nil { return nil, err @@ -201,10 +212,8 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a "state": commitState, }, }, options.Update().SetUpsert(true)) - if err != nil { - return nil, err - } - return nil, nil + + return nil, err }) allPayments = Filter(allPayments, payments.Payment.HasInitialValue) @@ -220,6 +229,7 @@ func (i *defaultIngester) Ingest(ctx context.Context, batch Batch, commitState a }) if err != nil { sharedlogging.GetLogger(ctx).Errorf("Error ingesting batch: %s", err) + return err } @@ -240,8 +250,8 @@ func NewDefaultIngester( db *mongo.Database, logger sharedlogging.Logger, publisher sharedpublish.Publisher, -) *defaultIngester { - return &defaultIngester{ +) *DefaultIngester { + return &DefaultIngester{ provider: provider, descriptor: descriptor, db: db, diff --git a/pkg/bridge/ingestion/ingester_test.go b/internal/pkg/ingestion/ingester_test.go similarity index 94% rename from pkg/bridge/ingestion/ingester_test.go rename to internal/pkg/ingestion/ingester_test.go index c558da43f..54aa95c98 100644 --- a/pkg/bridge/ingestion/ingester_test.go +++ b/internal/pkg/ingestion/ingester_test.go @@ -5,8 +5,9 @@ import ( "testing" "time" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" "github.com/pborman/uuid" "github.com/stretchr/testify/require" "go.mongodb.org/mongo-driver/bson" @@ -18,8 +19,9 @@ type State struct { } func TestIngester(t *testing.T) { - mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)).Run("Schedule task", func(mt *mtest.T) { + t.Parallel() + mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)).Run("Schedule task", func(mt *mtest.T) { provider := "testing" ingester := NewDefaultIngester(provider, uuid.New(), mt.DB, sharedlogging.NewNoOpLogger(), nil) diff --git a/pkg/bridge/ingestion/message.go b/internal/pkg/ingestion/message.go similarity index 87% rename from pkg/bridge/ingestion/message.go rename to internal/pkg/ingestion/message.go index 2eb1e3b77..7dc06359d 100644 --- a/pkg/bridge/ingestion/message.go +++ b/internal/pkg/ingestion/message.go @@ -4,8 +4,9 @@ import ( "context" "time" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" ) const ( @@ -35,9 +36,10 @@ func NewEventSavedPayment(payment payments.SavedPayment) EventMessage { } } -func (i *defaultIngester) publish(ctx context.Context, topic string, ev EventMessage) { +func (i *DefaultIngester) publish(ctx context.Context, topic string, ev EventMessage) { if err := i.publisher.Publish(ctx, topic, ev); err != nil { sharedlogging.GetLogger(ctx).Errorf("Publishing message: %s", err) + return } } diff --git a/pkg/bridge/integration/connector.go b/internal/pkg/integration/connector.go similarity index 77% rename from pkg/bridge/integration/connector.go rename to internal/pkg/integration/connector.go index bc0931006..ab2672ef1 100644 --- a/pkg/bridge/integration/connector.go +++ b/internal/pkg/integration/connector.go @@ -3,11 +3,11 @@ package integration import ( "context" - payments "github.com/numary/payments/pkg" - "github.com/numary/payments/pkg/bridge/task" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/payments/internal/pkg/task" ) -// Connector provide entry point to a payment provider +// Connector provide entry point to a payment provider. type Connector[TaskDescriptor payments.TaskDescriptor] interface { // Install is used to start the connector. The implementation if in charge of scheduling all required resources. Install(ctx task.ConnectorContext[TaskDescriptor]) error @@ -24,18 +24,27 @@ type ConnectorBuilder[TaskDescriptor payments.TaskDescriptor] struct { install func(ctx task.ConnectorContext[TaskDescriptor]) error } -func (b *ConnectorBuilder[TaskDescriptor]) WithUninstall(uninstallFunction func(ctx context.Context) error) *ConnectorBuilder[TaskDescriptor] { +func (b *ConnectorBuilder[TaskDescriptor]) WithUninstall( + uninstallFunction func(ctx context.Context) error, +) *ConnectorBuilder[TaskDescriptor] { b.uninstall = uninstallFunction + return b } -func (b *ConnectorBuilder[TaskDescriptor]) WithResolve(resolveFunction func(name TaskDescriptor) task.Task) *ConnectorBuilder[TaskDescriptor] { +func (b *ConnectorBuilder[TaskDescriptor]) WithResolve( + resolveFunction func(name TaskDescriptor) task.Task, +) *ConnectorBuilder[TaskDescriptor] { b.resolve = resolveFunction + return b } -func (b *ConnectorBuilder[TaskDescriptor]) WithInstall(installFunction func(ctx task.ConnectorContext[TaskDescriptor]) error) *ConnectorBuilder[TaskDescriptor] { +func (b *ConnectorBuilder[TaskDescriptor]) WithInstall( + installFunction func(ctx task.ConnectorContext[TaskDescriptor]) error, +) *ConnectorBuilder[TaskDescriptor] { b.install = installFunction + return b } @@ -67,6 +76,7 @@ func (b *BuiltConnector[TaskDescriptor]) Install(ctx task.ConnectorContext[TaskD if b.install != nil { return b.install(ctx) } + return nil } @@ -74,6 +84,7 @@ func (b *BuiltConnector[TaskDescriptor]) Uninstall(ctx context.Context) error { if b.uninstall != nil { return b.uninstall(ctx) } + return nil } @@ -81,6 +92,7 @@ func (b *BuiltConnector[TaskDescriptor]) Resolve(name TaskDescriptor) task.Task if b.resolve != nil { return b.resolve(name) } + return nil } diff --git a/pkg/bridge/integration/loader.go b/internal/pkg/integration/loader.go similarity index 81% rename from pkg/bridge/integration/loader.go rename to internal/pkg/integration/loader.go index fd98f089f..54c31a308 100644 --- a/pkg/bridge/integration/loader.go +++ b/internal/pkg/integration/loader.go @@ -2,14 +2,16 @@ package integration import ( "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/internal/pkg/payments" ) type Loader[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor] interface { Name() string Load(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor] + // ApplyDefaults is used to fill default values of the provided configuration object ApplyDefaults(t ConnectorConfig) ConnectorConfig + // AllowTasks define how many task the connector can run // If too many tasks are scheduled by the connector, // those will be set to pending state and restarted later when some other tasks will be terminated @@ -23,18 +25,27 @@ type LoaderBuilder[ConnectorConfig payments.ConnectorConfigObject, TaskDescripto allowedTasks int } -func (b *LoaderBuilder[ConnectorConfig, TaskDescriptor]) WithLoad(loadFunction func(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor]) *LoaderBuilder[ConnectorConfig, TaskDescriptor] { +func (b *LoaderBuilder[ConnectorConfig, TaskDescriptor]) WithLoad(loadFunction func(logger sharedlogging.Logger, + config ConnectorConfig) Connector[TaskDescriptor], +) *LoaderBuilder[ConnectorConfig, TaskDescriptor] { b.loadFunction = loadFunction + return b } -func (b *LoaderBuilder[ConnectorConfig, TaskDescriptor]) WithApplyDefaults(applyDefaults func(t ConnectorConfig) ConnectorConfig) *LoaderBuilder[ConnectorConfig, TaskDescriptor] { +func (b *LoaderBuilder[ConnectorConfig, TaskDescriptor]) WithApplyDefaults( + applyDefaults func(t ConnectorConfig) ConnectorConfig, +) *LoaderBuilder[ConnectorConfig, TaskDescriptor] { b.applyDefaults = applyDefaults + return b } -func (b *LoaderBuilder[ConnectorConfig, TaskDescriptor]) WithAllowedTasks(v int) *LoaderBuilder[ConnectorConfig, TaskDescriptor] { +func (b *LoaderBuilder[ConnectorConfig, + TaskDescriptor]) WithAllowedTasks(v int, +) *LoaderBuilder[ConnectorConfig, TaskDescriptor] { b.allowedTasks = v + return b } @@ -47,7 +58,9 @@ func (b *LoaderBuilder[ConnectorConfig, TaskDescriptor]) Build() *BuiltLoader[Co } } -func NewLoaderBuilder[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor](name string) *LoaderBuilder[ConnectorConfig, TaskDescriptor] { +func NewLoaderBuilder[ConnectorConfig payments.ConnectorConfigObject, + TaskDescriptor payments.TaskDescriptor](name string, +) *LoaderBuilder[ConnectorConfig, TaskDescriptor] { return &LoaderBuilder[ConnectorConfig, TaskDescriptor]{ name: name, } @@ -68,10 +81,13 @@ func (b *BuiltLoader[ConnectorConfig, TaskDescriptor]) Name() string { return b.name } -func (b *BuiltLoader[ConnectorConfig, TaskDescriptor]) Load(logger sharedlogging.Logger, config ConnectorConfig) Connector[TaskDescriptor] { +func (b *BuiltLoader[ConnectorConfig, TaskDescriptor]) Load(logger sharedlogging.Logger, + config ConnectorConfig, +) Connector[TaskDescriptor] { if b.loadFunction != nil { return b.loadFunction(logger, config) } + return nil } @@ -79,6 +95,7 @@ func (b *BuiltLoader[ConnectorConfig, TaskDescriptor]) ApplyDefaults(t Connector if b.applyDefaults != nil { return b.applyDefaults(t) } + return t } diff --git a/pkg/bridge/integration/manager.go b/internal/pkg/integration/manager.go similarity index 81% rename from pkg/bridge/integration/manager.go rename to internal/pkg/integration/manager.go index 3a809b406..4b9b3f842 100644 --- a/pkg/bridge/integration/manager.go +++ b/internal/pkg/integration/manager.go @@ -4,8 +4,8 @@ import ( "context" "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" - "github.com/numary/payments/pkg/bridge/task" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/payments/internal/pkg/task" "github.com/pkg/errors" ) @@ -17,15 +17,6 @@ var ( ErrAlreadyRunning = errors.New("already running") ) -type TaskSchedulerFactory[TaskDescriptor payments.TaskDescriptor] interface { - Make(resolver task.Resolver[TaskDescriptor], maxTasks int) *task.DefaultTaskScheduler[TaskDescriptor] -} -type TaskSchedulerFactoryFn[TaskDescriptor payments.TaskDescriptor] func(resolver task.Resolver[TaskDescriptor], maxProcesses int) *task.DefaultTaskScheduler[TaskDescriptor] - -func (fn TaskSchedulerFactoryFn[TaskDescriptor]) Make(resolver task.Resolver[TaskDescriptor], maxTasks int) *task.DefaultTaskScheduler[TaskDescriptor] { - return fn(resolver, maxTasks) -} - type ConnectorManager[ Config payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor, @@ -39,8 +30,8 @@ type ConnectorManager[ } func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Enable(ctx context.Context) error { - l.logger.Info("Enabling connector") + err := l.store.Enable(ctx, l.loader.Name()) if err != nil { return err @@ -49,9 +40,11 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Enable(ctx context.C return nil } -func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) ReadConfig(ctx context.Context) (*ConnectorConfig, error) { - +func (l *ConnectorManager[ConnectorConfig, + TaskDescriptor]) ReadConfig(ctx context.Context, +) (*ConnectorConfig, error) { var config ConnectorConfig + err := l.store.ReadConfig(ctx, l.loader.Name(), &config) if err != nil { return &config, err @@ -67,8 +60,7 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) load(config Connecto l.scheduler = l.schedulerFactory.Make(l.connector, l.loader.AllowTasks()) } -func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Install(ctx context.Context, config ConnectorConfig) (err error) { - +func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Install(ctx context.Context, config ConnectorConfig) error { l.logger.WithFields(map[string]interface{}{ "config": config, }).Infof("Install connector %s", l.loader.Name()) @@ -76,15 +68,19 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Install(ctx context. isInstalled, err := l.store.IsInstalled(ctx, l.loader.Name()) if err != nil { l.logger.Errorf("Error checking if connector is installed: %s", err) + return err } + if isInstalled { l.logger.Errorf("Connector already installed") + return ErrAlreadyInstalled } config = l.loader.ApplyDefaults(config) - if err := config.Validate(); err != nil { + + if err = config.Validate(); err != nil { return err } @@ -93,6 +89,7 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Install(ctx context. err = l.connector.Install(task.NewConnectorContext[TaskDescriptor](context.Background(), l.scheduler)) if err != nil { l.logger.Errorf("Error starting connector: %s", err) + return err } @@ -107,16 +104,18 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Install(ctx context. } func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Uninstall(ctx context.Context) error { - l.logger.Infof("Uninstalling connector") isInstalled, err := l.IsInstalled(ctx) if err != nil { l.logger.Errorf("Error checking if connector is installed: %s", err) + return err } + if !isInstalled { l.logger.Errorf("Connector not installed") + return ErrNotInstalled } @@ -134,6 +133,7 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Uninstall(ctx contex if err != nil { return err } + l.logger.Info("Connector uninstalled") return nil @@ -146,8 +146,10 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Restore(ctx context. if err != nil { return err } + if !installed { l.logger.Info("Not installed, skip") + return ErrNotInstalled } @@ -155,8 +157,10 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Restore(ctx context. if err != nil { return err } + if !enabled { l.logger.Info("Not enabled, skip") + return ErrNotEnabled } @@ -174,10 +178,12 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Restore(ctx context. err = l.scheduler.Restore(ctx) if err != nil { l.logger.Errorf("Unable to restore scheduler: %s", err) + return err } l.logger.Info("State restored") + return nil } @@ -195,11 +201,15 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) IsInstalled(ctx cont return l.store.IsInstalled(ctx, l.loader.Name()) } -func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) ListTasksStates(ctx context.Context) ([]payments.TaskState[TaskDescriptor], error) { +func (l *ConnectorManager[ConnectorConfig, + TaskDescriptor]) ListTasksStates(ctx context.Context, +) ([]payments.TaskState[TaskDescriptor], error) { return l.scheduler.ListTasks(ctx) } -func (l ConnectorManager[Config, TaskDescriptor]) ReadTaskState(ctx context.Context, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) { +func (l *ConnectorManager[Config, TaskDescriptor]) ReadTaskState(ctx context.Context, + descriptor TaskDescriptor, +) (*payments.TaskState[TaskDescriptor], error) { return l.scheduler.ReadTask(ctx, descriptor) } @@ -208,10 +218,12 @@ func (l *ConnectorManager[ConnectorConfig, TaskDescriptor]) Reset(ctx context.Co if err != nil { return err } + err = l.Uninstall(ctx) if err != nil { return err } + return l.Install(ctx, *config) } diff --git a/pkg/bridge/integration/manager_test.go b/internal/pkg/integration/manager_test.go similarity index 88% rename from pkg/bridge/integration/manager_test.go rename to internal/pkg/integration/manager_test.go index 6809e2469..895c84521 100644 --- a/pkg/bridge/integration/manager_test.go +++ b/internal/pkg/integration/manager_test.go @@ -4,10 +4,11 @@ import ( "context" "testing" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/payments/internal/pkg/task" + "github.com/numary/go-libs/sharedlogging" "github.com/numary/go-libs/sharedlogging/sharedlogginglogrus" - payments "github.com/numary/payments/pkg" - "github.com/numary/payments/pkg/bridge/task" "github.com/pborman/uuid" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" @@ -22,7 +23,8 @@ func ChanClosed[T any](ch chan T) bool { } } -type testContext[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor] struct { +type testContext[ConnectorConfig payments.ConnectorConfigObject, + TaskDescriptor payments.TaskDescriptor] struct { manager *ConnectorManager[ConnectorConfig, TaskDescriptor] taskStore task.Store[TaskDescriptor] connectorStore ConnectorStore @@ -30,17 +32,24 @@ type testContext[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor provider string } -func withManager[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor payments.TaskDescriptor](builder *ConnectorBuilder[TaskDescriptor], callback func(ctx *testContext[ConnectorConfig, TaskDescriptor])) { +func withManager[ConnectorConfig payments.ConnectorConfigObject, + TaskDescriptor payments.TaskDescriptor](builder *ConnectorBuilder[TaskDescriptor], + callback func(ctx *testContext[ConnectorConfig, TaskDescriptor]), +) { l := logrus.New() if testing.Verbose() { l.SetLevel(logrus.DebugLevel) } + logger := sharedlogginglogrus.New(l) taskStore := task.NewInMemoryStore[TaskDescriptor]() managerStore := NewInMemoryStore() provider := uuid.New() - schedulerFactory := TaskSchedulerFactoryFn[TaskDescriptor](func(resolver task.Resolver[TaskDescriptor], maxTasks int) *task.DefaultTaskScheduler[TaskDescriptor] { - return task.NewDefaultScheduler[TaskDescriptor](provider, logger, taskStore, task.DefaultContainerFactory, resolver, maxTasks) + schedulerFactory := TaskSchedulerFactoryFn[TaskDescriptor](func(resolver task.Resolver[TaskDescriptor], + maxTasks int, + ) *task.DefaultTaskScheduler[TaskDescriptor] { + return task.NewDefaultScheduler[TaskDescriptor](provider, logger, taskStore, + task.DefaultContainerFactory, resolver, maxTasks) }) loader := NewLoaderBuilder[ConnectorConfig, TaskDescriptor](provider). @@ -49,7 +58,9 @@ func withManager[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor }). WithAllowedTasks(1). Build() - manager := NewConnectorManager[ConnectorConfig, TaskDescriptor](logger, managerStore, loader, schedulerFactory) + manager := NewConnectorManager[ConnectorConfig, TaskDescriptor](logger, managerStore, loader, + schedulerFactory) + defer func() { _ = manager.Uninstall(context.Background()) }() @@ -64,10 +75,13 @@ func withManager[ConnectorConfig payments.ConnectorConfigObject, TaskDescriptor } func TestInstallConnector(t *testing.T) { + t.Parallel() + installed := make(chan struct{}) builder := NewConnectorBuilder[any](). WithInstall(func(ctx task.ConnectorContext[any]) error { close(installed) + return nil }) withManager(builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { @@ -81,6 +95,8 @@ func TestInstallConnector(t *testing.T) { } func TestUninstallConnector(t *testing.T) { + t.Parallel() + uninstalled := make(chan struct{}) taskTerminated := make(chan struct{}) taskStarted := make(chan struct{}) @@ -101,6 +117,7 @@ func TestUninstallConnector(t *testing.T) { }). WithUninstall(func(ctx context.Context) error { close(uninstalled) + return nil }) withManager(builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { @@ -109,7 +126,7 @@ func TestUninstallConnector(t *testing.T) { <-taskStarted require.NoError(t, tc.manager.Uninstall(context.Background())) require.True(t, ChanClosed(uninstalled)) - //TODO: We need to give a chance to the connector to properly stop execution + // TODO: We need to give a chance to the connector to properly stop execution require.True(t, ChanClosed(taskTerminated)) isInstalled, err := tc.manager.IsInstalled(context.Background()) @@ -119,10 +136,13 @@ func TestUninstallConnector(t *testing.T) { } func TestDisableConnector(t *testing.T) { + t.Parallel() + uninstalled := make(chan struct{}) builder := NewConnectorBuilder[any](). WithUninstall(func(ctx context.Context) error { close(uninstalled) + return nil }) withManager[payments.EmptyConnectorConfig, any](builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { @@ -141,6 +161,8 @@ func TestDisableConnector(t *testing.T) { } func TestEnableConnector(t *testing.T) { + t.Parallel() + builder := NewConnectorBuilder[any]() withManager[payments.EmptyConnectorConfig, any](builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { err := tc.connectorStore.Enable(context.Background(), tc.loader.Name()) @@ -152,6 +174,8 @@ func TestEnableConnector(t *testing.T) { } func TestRestoreEnabledConnector(t *testing.T) { + t.Parallel() + builder := NewConnectorBuilder[any]() withManager(builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { err := tc.connectorStore.Install(context.Background(), tc.loader.Name(), payments.EmptyConnectorConfig{}) @@ -164,6 +188,8 @@ func TestRestoreEnabledConnector(t *testing.T) { } func TestRestoreNotInstalledConnector(t *testing.T) { + t.Parallel() + builder := NewConnectorBuilder[any]() withManager(builder, func(tc *testContext[payments.EmptyConnectorConfig, any]) { err := tc.manager.Restore(context.Background()) diff --git a/pkg/bridge/integration/store.go b/internal/pkg/integration/store.go similarity index 63% rename from pkg/bridge/integration/store.go rename to internal/pkg/integration/store.go index 5642d1cdf..c5f3b24a5 100644 --- a/pkg/bridge/integration/store.go +++ b/internal/pkg/integration/store.go @@ -4,7 +4,8 @@ import ( "context" "reflect" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/internal/pkg/payments" + "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" @@ -22,78 +23,85 @@ type ConnectorStore interface { ReadConfig(ctx context.Context, name string, to interface{}) error } -type inMemoryConnectorStore struct { +type InMemoryConnectorStore struct { installed map[string]bool disabled map[string]bool configs map[string]any } -func (i *inMemoryConnectorStore) Uninstall(ctx context.Context, name string) error { +func (i *InMemoryConnectorStore) Uninstall(ctx context.Context, name string) error { delete(i.installed, name) delete(i.configs, name) delete(i.disabled, name) + return nil } -func (i *inMemoryConnectorStore) IsInstalled(ctx context.Context, name string) (bool, error) { +func (i *InMemoryConnectorStore) IsInstalled(ctx context.Context, name string) (bool, error) { return i.installed[name], nil } -func (i *inMemoryConnectorStore) Install(ctx context.Context, name string, config any) error { +func (i *InMemoryConnectorStore) Install(ctx context.Context, name string, config any) error { i.installed[name] = true i.configs[name] = config i.disabled[name] = false + return nil } -func (i inMemoryConnectorStore) UpdateConfig(ctx context.Context, name string, config any) error { +func (i *InMemoryConnectorStore) UpdateConfig(ctx context.Context, name string, config any) error { i.configs[name] = config + return nil } -func (i inMemoryConnectorStore) Enable(ctx context.Context, name string) error { +func (i *InMemoryConnectorStore) Enable(ctx context.Context, name string) error { i.disabled[name] = false + return nil } -func (i inMemoryConnectorStore) Disable(ctx context.Context, name string) error { +func (i *InMemoryConnectorStore) Disable(ctx context.Context, name string) error { i.disabled[name] = true + return nil } -func (i inMemoryConnectorStore) IsEnabled(ctx context.Context, name string) (bool, error) { +func (i *InMemoryConnectorStore) IsEnabled(ctx context.Context, name string) (bool, error) { disabled, ok := i.disabled[name] if !ok { return false, nil } + return !disabled, nil } -func (i inMemoryConnectorStore) ReadConfig(ctx context.Context, name string, to interface{}) error { +func (i *InMemoryConnectorStore) ReadConfig(ctx context.Context, name string, to interface{}) error { cfg, ok := i.configs[name] if !ok { return ErrNotFound } reflect.ValueOf(to).Elem().Set(reflect.ValueOf(cfg)) + return nil } -var _ ConnectorStore = &inMemoryConnectorStore{} +var _ ConnectorStore = &InMemoryConnectorStore{} -func NewInMemoryStore() *inMemoryConnectorStore { - return &inMemoryConnectorStore{ +func NewInMemoryStore() *InMemoryConnectorStore { + return &InMemoryConnectorStore{ installed: make(map[string]bool), disabled: make(map[string]bool), configs: make(map[string]any), } } -type mongodbConnectorStore struct { +type MongodbConnectorStore struct { db *mongo.Database } -func (m *mongodbConnectorStore) Uninstall(ctx context.Context, name string) error { +func (m *MongodbConnectorStore) Uninstall(ctx context.Context, name string) error { return m.db.Client().UseSession(ctx, func(ctx mongo.SessionContext) error { _, err := ctx.WithTransaction(ctx, func(ctx mongo.SessionContext) (interface{}, error) { _, err := m.db.Collection(payments.TasksCollection).DeleteMany(ctx, map[string]any{ @@ -102,38 +110,44 @@ func (m *mongodbConnectorStore) Uninstall(ctx context.Context, name string) erro if err != nil { return nil, errors.Wrap(err, "deleting tasks") } + _, err = m.db.Collection(payments.Collection).DeleteMany(ctx, map[string]any{ "provider": name, }) if err != nil { return nil, errors.Wrap(err, "deleting payments") } + _, err = m.db.Collection(payments.ConnectorsCollection).DeleteOne(ctx, map[string]any{ "provider": name, }) if err != nil { return nil, errors.Wrap(err, "deleting configuration") } + return nil, nil }) + return err }) } -func (m *mongodbConnectorStore) IsInstalled(ctx context.Context, name string) (bool, error) { +func (m *MongodbConnectorStore) IsInstalled(ctx context.Context, name string) (bool, error) { ret := m.db.Collection(payments.ConnectorsCollection).FindOne(ctx, map[string]any{ "provider": name, }) - if ret.Err() != nil && ret.Err() != mongo.ErrNoDocuments { + if ret.Err() != nil { + if errors.Is(ret.Err(), mongo.ErrNoDocuments) { + return false, nil + } + return false, ret.Err() } - if ret.Err() == mongo.ErrNoDocuments { - return false, nil - } + return true, nil } -func (m *mongodbConnectorStore) Install(ctx context.Context, name string, config any) error { +func (m *MongodbConnectorStore) Install(ctx context.Context, name string, config any) error { _, err := m.db.Collection(payments.ConnectorsCollection).UpdateOne(ctx, map[string]any{ "provider": name, }, map[string]any{ @@ -141,10 +155,11 @@ func (m *mongodbConnectorStore) Install(ctx context.Context, name string, config "config": config, }, }, options.Update().SetUpsert(true)) + return err } -func (m *mongodbConnectorStore) UpdateConfig(ctx context.Context, name string, config any) error { +func (m *MongodbConnectorStore) UpdateConfig(ctx context.Context, name string, config any) error { _, err := m.db.Collection(payments.ConnectorsCollection).UpdateOne(ctx, map[string]any{ "provider": name, }, map[string]any{ @@ -152,10 +167,11 @@ func (m *mongodbConnectorStore) UpdateConfig(ctx context.Context, name string, c "config": config, }, }) + return err } -func (m *mongodbConnectorStore) Enable(ctx context.Context, name string) error { +func (m *MongodbConnectorStore) Enable(ctx context.Context, name string) error { _, err := m.db.Collection(payments.ConnectorsCollection).UpdateOne(ctx, map[string]any{ "provider": name, }, map[string]any{ @@ -163,10 +179,11 @@ func (m *mongodbConnectorStore) Enable(ctx context.Context, name string) error { "disabled": false, }, }) + return err } -func (m *mongodbConnectorStore) Disable(ctx context.Context, name string) error { +func (m *MongodbConnectorStore) Disable(ctx context.Context, name string) error { _, err := m.db.Collection(payments.ConnectorsCollection).UpdateOne(ctx, map[string]any{ "provider": name, }, map[string]any{ @@ -174,51 +191,57 @@ func (m *mongodbConnectorStore) Disable(ctx context.Context, name string) error "disabled": true, }, }) + return err } -func (m *mongodbConnectorStore) IsEnabled(ctx context.Context, name string) (bool, error) { +func (m *MongodbConnectorStore) IsEnabled(ctx context.Context, name string) (bool, error) { ret := m.db.Collection(payments.ConnectorsCollection).FindOne(ctx, map[string]any{ "provider": name, }) - if ret.Err() != nil && ret.Err() != mongo.ErrNoDocuments { + + if ret.Err() != nil { + if errors.Is(ret.Err(), mongo.ErrNoDocuments) { + return false, ErrNotInstalled + } + return false, ret.Err() } - if ret.Err() == mongo.ErrNoDocuments { - return false, ErrNotInstalled - } - p := payments.Connector[payments.EmptyConnectorConfig]{} - err := ret.Decode(&p) - if err != nil { + + paymentsConnector := payments.Connector[payments.EmptyConnectorConfig]{} + + if err := ret.Decode(&paymentsConnector); err != nil { return false, err } - return !p.Disabled, nil + return !paymentsConnector.Disabled, nil } -func (m *mongodbConnectorStore) ReadConfig(ctx context.Context, name string, to interface{}) error { +func (m *MongodbConnectorStore) ReadConfig(ctx context.Context, name string, to interface{}) error { ret := m.db.Collection(payments.ConnectorsCollection).FindOne(ctx, map[string]any{ "provider": name, }) - if ret.Err() != nil && ret.Err() != mongo.ErrNoDocuments { + if ret.Err() != nil { + if errors.Is(ret.Err(), mongo.ErrNoDocuments) { + return errors.New("not installed") + } + return ret.Err() } - if ret.Err() == mongo.ErrNoDocuments { - return errors.New("not installed") - } - p := payments.Connector[bson.Raw]{} - err := ret.Decode(&p) - if err != nil { + + paymentsConnector := payments.Connector[bson.Raw]{} + + if err := ret.Decode(&paymentsConnector); err != nil { return err } - return bson.Unmarshal(p.Config, to) + return bson.Unmarshal(paymentsConnector.Config, to) } -var _ ConnectorStore = &mongodbConnectorStore{} +var _ ConnectorStore = &MongodbConnectorStore{} -func NewMongoDBConnectorStore(db *mongo.Database) *mongodbConnectorStore { - return &mongodbConnectorStore{ +func NewMongoDBConnectorStore(db *mongo.Database) *MongodbConnectorStore { + return &MongodbConnectorStore{ db: db, } } diff --git a/internal/pkg/integration/taskscheduler.go b/internal/pkg/integration/taskscheduler.go new file mode 100644 index 000000000..df87a0ca7 --- /dev/null +++ b/internal/pkg/integration/taskscheduler.go @@ -0,0 +1,19 @@ +package integration + +import ( + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/payments/internal/pkg/task" +) + +type TaskSchedulerFactory[TaskDescriptor payments.TaskDescriptor] interface { + Make(resolver task.Resolver[TaskDescriptor], maxTasks int) *task.DefaultTaskScheduler[TaskDescriptor] +} + +type TaskSchedulerFactoryFn[TaskDescriptor payments.TaskDescriptor] func(resolver task.Resolver[TaskDescriptor], + maxProcesses int) *task.DefaultTaskScheduler[TaskDescriptor] + +func (fn TaskSchedulerFactoryFn[TaskDescriptor]) Make(resolver task.Resolver[TaskDescriptor], + maxTasks int, +) *task.DefaultTaskScheduler[TaskDescriptor] { + return fn(resolver, maxTasks) +} diff --git a/internal/pkg/payments/collections.go b/internal/pkg/payments/collections.go new file mode 100644 index 000000000..665adb0f5 --- /dev/null +++ b/internal/pkg/payments/collections.go @@ -0,0 +1,7 @@ +package payments + +const ( + Collection = "Payments" + ConnectorsCollection = "Connectors" + TasksCollection = "Tasks" +) diff --git a/pkg/config.go b/internal/pkg/payments/config.go similarity index 100% rename from pkg/config.go rename to internal/pkg/payments/config.go diff --git a/pkg/connector.go b/internal/pkg/payments/connector.go similarity index 100% rename from pkg/connector.go rename to internal/pkg/payments/connector.go diff --git a/pkg/payment.go b/internal/pkg/payments/payment.go similarity index 92% rename from pkg/payment.go rename to internal/pkg/payments/payment.go index 279a02c98..a2be2c180 100644 --- a/pkg/payment.go +++ b/internal/pkg/payments/payment.go @@ -8,8 +8,10 @@ import ( "github.com/gibson042/canonicaljson-go" ) -type Scheme string -type Status string +type ( + Scheme string + Status string +) const ( SchemeUnknown Scheme = "unknown" @@ -31,9 +33,9 @@ const ( SchemeGooglePay Scheme = "google pay" SchemeA2A Scheme = "a2a" - SchemeAchDebit Scheme = "ach debit" - SchemeAch Scheme = "ach" - SchemeRtp Scheme = "rtp" + SchemeACHDebit Scheme = "ach debit" + SchemeACH Scheme = "ach" + SchemeRTP Scheme = "rtp" TypePayIn = "pay-in" TypePayout = "payout" @@ -62,6 +64,7 @@ func (i Identifier) String() string { if err != nil { panic(err) } + return base64.URLEncoding.EncodeToString(data) } @@ -70,11 +73,14 @@ func IdentifierFromString(v string) (*Identifier, error) { if err != nil { return nil, err } + ret := Identifier{} + err = canonicaljson.Unmarshal(data, &ret) if err != nil { return nil, err } + return &ret, nil } @@ -107,6 +113,7 @@ func (p Payment) HasInitialValue() bool { func (p Payment) MarshalJSON() ([]byte, error) { type Aux Payment + return json.Marshal(struct { ID string `json:"id"` Aux @@ -117,18 +124,20 @@ func (p Payment) MarshalJSON() ([]byte, error) { } func (p Payment) Computed() SavedPayment { - aggregatedAdjustmentValue := int64(0) amount := int64(0) + for i := 0; i < len(p.Adjustments)-1; i++ { - a := p.Adjustments[i] - if a.Absolute { - amount = a.Amount + adjustment := p.Adjustments[i] + if adjustment.Absolute { + amount = adjustment.Amount + break } - aggregatedAdjustmentValue += a.Amount + aggregatedAdjustmentValue += adjustment.Amount } + if amount == 0 { amount = p.InitialAmount + aggregatedAdjustmentValue } @@ -150,6 +159,7 @@ type SavedPayment struct { func (p SavedPayment) MarshalJSON() ([]byte, error) { type Aux SavedPayment + return json.Marshal(struct { ID string `json:"id"` Aux diff --git a/pkg/payment_test.go b/internal/pkg/payments/payment_test.go similarity index 91% rename from pkg/payment_test.go rename to internal/pkg/payments/payment_test.go index 75fd19d8d..94a8ce8d9 100644 --- a/pkg/payment_test.go +++ b/internal/pkg/payments/payment_test.go @@ -9,8 +9,11 @@ import ( ) func TestPayment(t *testing.T) { + t.Parallel() + now := time.Now() - p := Payment{ + + payment := Payment{ Identifier: Identifier{ Provider: "testing", Referenced: Referenced{ @@ -38,6 +41,8 @@ func TestPayment(t *testing.T) { }, }, } - cp := p.Computed() + + cp := payment.Computed() + require.EqualValues(t, 110, cp.Amount) } diff --git a/pkg/task.go b/internal/pkg/payments/task.go similarity index 99% rename from pkg/task.go rename to internal/pkg/payments/task.go index 3ba080a51..7e7c3cb56 100644 --- a/pkg/task.go +++ b/internal/pkg/payments/task.go @@ -11,7 +11,7 @@ import ( type TaskStatus string -var ( +const ( TaskStatusStopped TaskStatus = "stopped" TaskStatusPending TaskStatus = "pending" TaskStatusActive TaskStatus = "active" @@ -47,6 +47,7 @@ func DescriptorFromID(id string, to interface{}) { if err != nil { panic(err) } + err = canonicaljson.Unmarshal(data, to) if err != nil { panic(err) diff --git a/pkg/paymentstesting/mock.go b/internal/pkg/paymentstesting/mock.go similarity index 95% rename from pkg/paymentstesting/mock.go rename to internal/pkg/paymentstesting/mock.go index 9b4ac89f6..30d9afa15 100644 --- a/pkg/paymentstesting/mock.go +++ b/internal/pkg/paymentstesting/mock.go @@ -15,6 +15,7 @@ import ( ) func RunWithMock(t *testing.T, fn func(t *mtest.T)) { + t.Helper() pool, err := dockertest.NewPool("") if err != nil { @@ -43,6 +44,7 @@ func RunWithMock(t *testing.T, fn func(t *mtest.T)) { } uri := "mongodb://localhost:" + resource.GetPort("27017/tcp") + client, err := mongo.NewClient(options.Client().ApplyURI(uri)) if err != nil { panic(err) @@ -50,15 +52,16 @@ func RunWithMock(t *testing.T, fn func(t *mtest.T)) { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) defer cancel() - err = client.Connect(ctx) - if err != nil { + + if err = client.Connect(ctx); err != nil { panic(err) } // exponential backoff-retry, because the application in the container might not be ready to accept connections yet - if err := pool.Retry(func() error { + if err = pool.Retry(func() error { ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second)) defer cancel() + return client.Ping(ctx, readpref.Primary()) }); err != nil { panic("could not connect to database, last error: " + err.Error()) diff --git a/internal/pkg/task/container.go b/internal/pkg/task/container.go new file mode 100644 index 000000000..54ad45fa3 --- /dev/null +++ b/internal/pkg/task/container.go @@ -0,0 +1,25 @@ +package task + +import ( + "context" + + "github.com/numary/payments/internal/pkg/payments" + + "go.uber.org/dig" +) + +type ContainerFactory interface { + Create(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) +} +type ContainerFactoryFn func(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) + +func (fn ContainerFactoryFn) Create(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) { + return fn(ctx, descriptor) +} + +// nolint: gochecknoglobals,golint,stylecheck // allow global +var DefaultContainerFactory = ContainerFactoryFn(func(ctx context.Context, + descriptor payments.TaskDescriptor, +) (*dig.Container, error) { + return dig.New(), nil +}) diff --git a/internal/pkg/task/context.go b/internal/pkg/task/context.go new file mode 100644 index 000000000..d2083ca80 --- /dev/null +++ b/internal/pkg/task/context.go @@ -0,0 +1,34 @@ +package task + +import ( + "context" + + "github.com/numary/payments/internal/pkg/payments" +) + +type ConnectorContext[TaskDescriptor payments.TaskDescriptor] interface { + Context() context.Context + Scheduler() Scheduler[TaskDescriptor] +} + +type ConnectorCtx[TaskDescriptor payments.TaskDescriptor] struct { + ctx context.Context + scheduler Scheduler[TaskDescriptor] +} + +func (ctx *ConnectorCtx[TaskDescriptor]) Context() context.Context { + return ctx.ctx +} + +func (ctx *ConnectorCtx[TaskDescriptor]) Scheduler() Scheduler[TaskDescriptor] { + return ctx.scheduler +} + +func NewConnectorContext[TaskDescriptor payments.TaskDescriptor](ctx context.Context, + scheduler Scheduler[TaskDescriptor], +) *ConnectorCtx[TaskDescriptor] { + return &ConnectorCtx[TaskDescriptor]{ + ctx: ctx, + scheduler: scheduler, + } +} diff --git a/internal/pkg/task/resolver.go b/internal/pkg/task/resolver.go new file mode 100644 index 000000000..91e023885 --- /dev/null +++ b/internal/pkg/task/resolver.go @@ -0,0 +1,14 @@ +package task + +import ( + "github.com/numary/payments/internal/pkg/payments" +) + +type Resolver[TaskDescriptor payments.TaskDescriptor] interface { + Resolve(descriptor TaskDescriptor) Task +} +type ResolverFn[TaskDescriptor payments.TaskDescriptor] func(descriptor TaskDescriptor) Task + +func (fn ResolverFn[TaskDescriptor]) Resolve(descriptor TaskDescriptor) Task { + return fn(descriptor) +} diff --git a/pkg/bridge/task/scheduler.go b/internal/pkg/task/scheduler.go similarity index 76% rename from pkg/bridge/task/scheduler.go rename to internal/pkg/task/scheduler.go index fa6f36f89..7ae39cc79 100644 --- a/pkg/bridge/task/scheduler.go +++ b/internal/pkg/task/scheduler.go @@ -7,11 +7,11 @@ import ( "sync" "time" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" - "go.uber.org/dig" ) var ( @@ -19,28 +19,6 @@ var ( ErrUnableToResolve = errors.New("unable to resolve task") ) -type Resolver[TaskDescriptor payments.TaskDescriptor] interface { - Resolve(descriptor TaskDescriptor) Task -} -type ResolverFn[TaskDescriptor payments.TaskDescriptor] func(descriptor TaskDescriptor) Task - -func (fn ResolverFn[TaskDescriptor]) Resolve(descriptor TaskDescriptor) Task { - return fn(descriptor) -} - -type ContainerFactory interface { - Create(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) -} -type ContainerFactoryFn func(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) - -func (fn ContainerFactoryFn) Create(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) { - return fn(ctx, descriptor) -} - -var DefaultContainerFactory = ContainerFactoryFn(func(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) { - return dig.New(), nil -}) - type Scheduler[TaskDescriptor payments.TaskDescriptor] interface { Schedule(p TaskDescriptor, restart bool) error } @@ -64,21 +42,23 @@ type DefaultTaskScheduler[TaskDescriptor payments.TaskDescriptor] struct { stopped bool } -func (s *DefaultTaskScheduler[TaskDescriptor]) ListTasks(ctx context.Context) ([]payments.TaskState[TaskDescriptor], error) { +func (s *DefaultTaskScheduler[TaskDescriptor]) ListTasks(ctx context.Context, +) ([]payments.TaskState[TaskDescriptor], error) { return s.store.ListTaskStates(ctx, s.provider) } -func (s *DefaultTaskScheduler[TaskDescriptor]) ReadTask(ctx context.Context, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) { +func (s *DefaultTaskScheduler[TaskDescriptor]) ReadTask(ctx context.Context, + descriptor TaskDescriptor, +) (*payments.TaskState[TaskDescriptor], error) { return s.store.ReadTaskState(ctx, s.provider, descriptor) } func (s *DefaultTaskScheduler[TaskDescriptor]) Schedule(descriptor TaskDescriptor, restart bool) error { - s.mu.Lock() defer s.mu.Unlock() - taskId := payments.IDFromDescriptor(descriptor) - if _, ok := s.tasks[taskId]; ok { + taskID := payments.IDFromDescriptor(descriptor) + if _, ok := s.tasks[taskID]; ok { return ErrAlreadyScheduled } @@ -94,11 +74,11 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) Schedule(descriptor TaskDescripto if err != nil { return errors.Wrap(err, "stacking task") } + return nil } - err := s.startTask(descriptor) - if err != nil { + if err := s.startTask(descriptor); err != nil { return errors.Wrap(err, "starting task") } @@ -106,33 +86,35 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) Schedule(descriptor TaskDescripto } func (s *DefaultTaskScheduler[TaskDescriptor]) Shutdown(ctx context.Context) error { - s.mu.Lock() s.stopped = true s.mu.Unlock() s.logger.Infof("Stopping scheduler...") - for name, p := range s.tasks { - p.logger.Debugf("Stopping task") - if p.stopChan != nil { + + for name, task := range s.tasks { + task.logger.Debugf("Stopping task") + + if task.stopChan != nil { errCh := make(chan struct{}) - p.stopChan <- errCh + task.stopChan <- errCh select { case <-errCh: case <-time.After(time.Second): // TODO: Make configurable - p.logger.Debugf("Stopping using stop chan timeout, cancelling context") - p.cancel() + task.logger.Debugf("Stopping using stop chan timeout, canceling context") + task.cancel() } } else { - p.cancel() + task.cancel() } + delete(s.tasks, name) } + return nil } func (s *DefaultTaskScheduler[TaskDescriptor]) Restore(ctx context.Context) error { - states, err := s.store.ListTaskStatesByStatus(ctx, s.provider, payments.TaskStatusActive) if err != nil { return err @@ -148,20 +130,23 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) Restore(ctx context.Context) erro return nil } -func (s *DefaultTaskScheduler[TaskDescriptor]) registerTaskError(ctx context.Context, holder *taskHolder[TaskDescriptor], taskErr any) { +func (s *DefaultTaskScheduler[TaskDescriptor]) registerTaskError(ctx context.Context, + holder *taskHolder[TaskDescriptor], taskErr any, +) { + var taskError string - var pe string switch v := taskErr.(type) { case error: - pe = v.Error() + taskError = v.Error() default: - pe = fmt.Sprintf("%s", v) + taskError = fmt.Sprintf("%s", v) } holder.logger.Errorf("Task terminated with error: %s", taskErr) - err := s.store.UpdateTaskStatus(ctx, s.provider, holder.descriptor, payments.TaskStatusFailed, pe) + + err := s.store.UpdateTaskStatus(ctx, s.provider, holder.descriptor, payments.TaskStatusFailed, taskError) if err != nil { - holder.logger.Error("Error updating task status: %s", pe) + holder.logger.Error("Error updating task status: %s", taskError) } } @@ -176,18 +161,22 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) deleteTask(holder *taskHolder[Tas oldestPendingTask, err := s.store.ReadOldestPendingTask(context.Background(), s.provider) if err != nil { - if err == ErrNotFound { + if errors.Is(err, ErrNotFound) { return } + sharedlogging.Error(err) + return } p := s.resolver.Resolve(oldestPendingTask.Descriptor) if p == nil { sharedlogging.Errorf("unable to resolve task") + return } + err = s.startTask(oldestPendingTask.Descriptor) if err != nil { sharedlogging.Error(err) @@ -197,14 +186,15 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) deleteTask(holder *taskHolder[Tas type StopChan chan chan struct{} func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescriptor) error { - ps, err := s.store.FindTaskAndUpdateStatus(context.Background(), s.provider, descriptor, payments.TaskStatusActive, "") + paymentState, err := s.store.FindTaskAndUpdateStatus(context.Background(), s.provider, descriptor, + payments.TaskStatusActive, "") if err != nil { return errors.Wrap(err, "finding task and update") } - taskId := payments.IDFromDescriptor(descriptor) + taskID := payments.IDFromDescriptor(descriptor) logger := s.logger.WithFields(map[string]interface{}{ - "task-id": taskId, + "task-id": taskID, }) task := s.resolver.Resolve(descriptor) @@ -212,7 +202,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript return ErrUnableToResolve } - //TODO: Check task using reflection + // TODO: Check task using reflection ctx, cancel := context.WithCancel(context.Background()) holder := &taskHolder[TaskDescriptor]{ @@ -226,40 +216,47 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript // TODO: Handle error panic(err) } + err = container.Provide(func() context.Context { return ctx }) if err != nil { panic(err) } + err = container.Provide(func() Scheduler[TaskDescriptor] { return s }) if err != nil { panic(err) } + err = container.Provide(func() StopChan { s.mu.Lock() defer s.mu.Unlock() holder.stopChan = make(StopChan, 1) + return holder.stopChan }) if err != nil { panic(err) } + err = container.Provide(func() sharedlogging.Logger { return s.logger }) if err != nil { panic(err) } + err = container.Provide(func() StateResolver { return StateResolverFn(func(ctx context.Context, v any) error { - if ps.State == nil || len(ps.State) == 0 { + if paymentState.State == nil || len(paymentState.State) == 0 { return nil } - return bson.Unmarshal(ps.State, v) + + return bson.Unmarshal(paymentState.State, v) }) }) if err != nil { @@ -267,14 +264,17 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript } s.tasks[payments.IDFromDescriptor(descriptor)] = holder + go func() { logger.Infof("Starting task...") defer func() { defer s.deleteTask(holder) + if e := recover(); e != nil { s.registerTaskError(ctx, holder, e) debug.PrintStack() + return } }() @@ -282,8 +282,10 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript err := container.Invoke(task) if err != nil { s.registerTaskError(ctx, holder, err) + return } + logger.Infof("Task terminated with success") err = s.store.UpdateTaskStatus(ctx, s.provider, descriptor, payments.TaskStatusTerminated, "") @@ -291,6 +293,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) startTask(descriptor TaskDescript logger.Error("Error updating task status: %s", err) } }() + return nil } @@ -298,6 +301,7 @@ func (s *DefaultTaskScheduler[TaskDescriptor]) stackTask(descriptor TaskDescript s.logger.WithFields(map[string]interface{}{ "descriptor": descriptor, }).Infof("Stacking task") + return s.store.UpdateTaskStatus( context.Background(), s.provider, descriptor, payments.TaskStatusPending, "") } diff --git a/pkg/bridge/task/scheduler_test.go b/internal/pkg/task/scheduler_test.go similarity index 68% rename from pkg/bridge/task/scheduler_test.go rename to internal/pkg/task/scheduler_test.go index 25dfc6b1d..5f0ff3732 100644 --- a/pkg/bridge/task/scheduler_test.go +++ b/internal/pkg/task/scheduler_test.go @@ -6,50 +6,68 @@ import ( "testing" "time" + "github.com/numary/payments/internal/pkg/payments" + "github.com/numary/go-libs/sharedlogging/sharedlogginglogrus" - payments "github.com/numary/payments/pkg" "github.com/pborman/uuid" "github.com/sirupsen/logrus" "github.com/stretchr/testify/require" ) -func TaskTerminatedWithStatus[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor, expectedStatus payments.TaskStatus, errString string) func() bool { +func TaskTerminatedWithStatus[TaskDescriptor payments.TaskDescriptor](store *InMemoryStore[TaskDescriptor], + provider string, descriptor TaskDescriptor, expectedStatus payments.TaskStatus, errString string, +) func() bool { return func() bool { status, err, ok := store.Result(provider, descriptor) if !ok { return false } + if err != errString { return false } + return status == expectedStatus } } -func TaskTerminated[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { +func TaskTerminated[TaskDescriptor payments.TaskDescriptor](store *InMemoryStore[TaskDescriptor], + provider string, descriptor TaskDescriptor, +) func() bool { return TaskTerminatedWithStatus(store, provider, descriptor, payments.TaskStatusTerminated, "") } -func TaskFailed[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor, errStr string) func() bool { +func TaskFailed[TaskDescriptor payments.TaskDescriptor](store *InMemoryStore[TaskDescriptor], + provider string, descriptor TaskDescriptor, errStr string, +) func() bool { return TaskTerminatedWithStatus(store, provider, descriptor, payments.TaskStatusFailed, errStr) } -func TaskPending[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { +func TaskPending[TaskDescriptor payments.TaskDescriptor](store *InMemoryStore[TaskDescriptor], + provider string, descriptor TaskDescriptor, +) func() bool { return TaskTerminatedWithStatus(store, provider, descriptor, payments.TaskStatusPending, "") } -func TaskActive[TaskDescriptor payments.TaskDescriptor](store *inMemoryStore[TaskDescriptor], provider string, descriptor TaskDescriptor) func() bool { +func TaskActive[TaskDescriptor payments.TaskDescriptor](store *InMemoryStore[TaskDescriptor], + provider string, descriptor TaskDescriptor, +) func() bool { return TaskTerminatedWithStatus(store, provider, descriptor, payments.TaskStatusActive, "") } func TestTaskScheduler(t *testing.T) { + t.Parallel() + l := logrus.New() if testing.Verbose() { l.SetLevel(logrus.DebugLevel) } + logger := sharedlogginglogrus.New(l) t.Run("Nominal", func(t *testing.T) { + t.Parallel() + store := NewInMemoryStore[string]() provider := uuid.New() done := make(chan struct{}) @@ -76,14 +94,18 @@ func TestTaskScheduler(t *testing.T) { }) t.Run("Duplicate task", func(t *testing.T) { + t.Parallel() + store := NewInMemoryStore[string]() provider := uuid.New() - scheduler := NewDefaultScheduler[string](provider, logger, store, DefaultContainerFactory, ResolverFn[string](func(descriptor string) Task { - return func(ctx context.Context) error { - <-ctx.Done() - return ctx.Err() - } - }), 1) + scheduler := NewDefaultScheduler[string](provider, logger, store, DefaultContainerFactory, + ResolverFn[string](func(descriptor string) Task { + return func(ctx context.Context) error { + <-ctx.Done() + + return ctx.Err() + } + }), 1) descriptor := uuid.New() err := scheduler.Schedule(descriptor, false) @@ -95,21 +117,27 @@ func TestTaskScheduler(t *testing.T) { }) t.Run("Error", func(t *testing.T) { + t.Parallel() + provider := uuid.New() store := NewInMemoryStore[string]() - scheduler := NewDefaultScheduler[string](provider, logger, store, DefaultContainerFactory, ResolverFn[string](func(descriptor string) Task { - return func() error { - return errors.New("test") - } - }), 1) + scheduler := NewDefaultScheduler[string](provider, logger, store, DefaultContainerFactory, + ResolverFn[string](func(descriptor string) Task { + return func() error { + return errors.New("test") + } + }), 1) descriptor := uuid.New() err := scheduler.Schedule(descriptor, false) require.NoError(t, err) - require.Eventually(t, TaskFailed(store, provider, descriptor, "test"), time.Second, 100*time.Millisecond) + require.Eventually(t, TaskFailed(store, provider, descriptor, "test"), time.Second, + 100*time.Millisecond) }) t.Run("Pending", func(t *testing.T) { + t.Parallel() + provider := uuid.New() store := NewInMemoryStore[string]() descriptor1 := uuid.New() @@ -118,29 +146,30 @@ func TestTaskScheduler(t *testing.T) { task1Terminated := make(chan struct{}) task2Terminated := make(chan struct{}) - scheduler := NewDefaultScheduler[string](provider, logger, store, DefaultContainerFactory, ResolverFn[string](func(descriptor string) Task { - switch descriptor { - case descriptor1: - return func(ctx context.Context) error { - select { - case <-task1Terminated: - return nil - case <-ctx.Done(): - return ctx.Err() + scheduler := NewDefaultScheduler[string](provider, logger, store, DefaultContainerFactory, + ResolverFn[string](func(descriptor string) Task { + switch descriptor { + case descriptor1: + return func(ctx context.Context) error { + select { + case <-task1Terminated: + return nil + case <-ctx.Done(): + return ctx.Err() + } } - } - case descriptor2: - return func(ctx context.Context) error { - select { - case <-task2Terminated: - return nil - case <-ctx.Done(): - return ctx.Err() + case descriptor2: + return func(ctx context.Context) error { + select { + case <-task2Terminated: + return nil + case <-ctx.Done(): + return ctx.Err() + } } } - } - panic("unknown descriptor") - }), 1) + panic("unknown descriptor") + }), 1) require.NoError(t, scheduler.Schedule(descriptor1, false)) require.NoError(t, scheduler.Schedule(descriptor2, false)) @@ -154,19 +183,22 @@ func TestTaskScheduler(t *testing.T) { }) t.Run("Stop scheduler", func(t *testing.T) { + t.Parallel() + provider := uuid.New() store := NewInMemoryStore[string]() - scheduler := NewDefaultScheduler[string](provider, logger, store, DefaultContainerFactory, ResolverFn[string](func(descriptor string) Task { - switch descriptor { - case "main": - return func(ctx context.Context, scheduler Scheduler[string]) { - <-ctx.Done() - require.NoError(t, scheduler.Schedule("worker", false)) + scheduler := NewDefaultScheduler[string](provider, logger, store, DefaultContainerFactory, + ResolverFn[string](func(descriptor string) Task { + switch descriptor { + case "main": + return func(ctx context.Context, scheduler Scheduler[string]) { + <-ctx.Done() + require.NoError(t, scheduler.Schedule("worker", false)) + } + default: + panic("should not be called") } - default: - panic("should not be called") - } - }), 1) + }), 1) require.NoError(t, scheduler.Schedule("main", false)) require.Eventually(t, TaskActive(store, provider, "main"), time.Second, 100*time.Millisecond) diff --git a/pkg/bridge/task/state.go b/internal/pkg/task/state.go similarity index 91% rename from pkg/bridge/task/state.go rename to internal/pkg/task/state.go index 373a35b4f..f63aa7750 100644 --- a/pkg/bridge/task/state.go +++ b/internal/pkg/task/state.go @@ -2,6 +2,8 @@ package task import ( "context" + + "github.com/pkg/errors" ) type StateResolver interface { @@ -18,16 +20,19 @@ func ResolveTo[State any](ctx context.Context, resolver StateResolver, to *State if err != nil { return nil, err } + return to, nil } func MustResolveTo[State any](ctx context.Context, resolver StateResolver, to State) State { state, err := ResolveTo[State](ctx, resolver, &to) - if err == ErrNotFound { + if errors.Is(err, ErrNotFound) { return to } + if err != nil { panic(err) } + return *state } diff --git a/pkg/bridge/task/store.go b/internal/pkg/task/store.go similarity index 51% rename from pkg/bridge/task/store.go rename to internal/pkg/task/store.go index 6cbe5907d..05b2b4741 100644 --- a/pkg/bridge/task/store.go +++ b/internal/pkg/task/store.go @@ -6,39 +6,45 @@ import ( "strings" "time" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/internal/pkg/payments" + "github.com/pkg/errors" "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) -var ( - ErrNotFound = errors.New("not found") -) +var ErrNotFound = errors.New("not found") type Store[TaskDescriptor payments.TaskDescriptor] interface { - UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, err string) error + UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, + status payments.TaskStatus, err string) error FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, err string) (*payments.TaskState[TaskDescriptor], error) - ListTaskStatesByStatus(ctx context.Context, provider string, status payments.TaskStatus) ([]payments.TaskState[TaskDescriptor], error) + ListTaskStatesByStatus(ctx context.Context, provider string, + status payments.TaskStatus) ([]payments.TaskState[TaskDescriptor], error) ListTaskStates(ctx context.Context, provider string) ([]payments.TaskState[TaskDescriptor], error) ReadOldestPendingTask(ctx context.Context, provider string) (*payments.TaskState[TaskDescriptor], error) - ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) + ReadTaskState(ctx context.Context, provider string, + descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) } -type inMemoryStore[TaskDescriptor payments.TaskDescriptor] struct { +type InMemoryStore[TaskDescriptor payments.TaskDescriptor] struct { statuses map[string]payments.TaskStatus created map[string]time.Time errors map[string]string } -func (s *inMemoryStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) { +func (s *InMemoryStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provider string, + descriptor TaskDescriptor, +) (*payments.TaskState[TaskDescriptor], error) { id := payments.IDFromDescriptor(descriptor) + status, ok := s.statuses[id] if !ok { return nil, ErrNotFound } + return &payments.TaskState[TaskDescriptor]{ Provider: provider, Descriptor: descriptor, @@ -49,14 +55,18 @@ func (s *inMemoryStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provi }, nil } -func (s *inMemoryStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provider string) ([]payments.TaskState[TaskDescriptor], error) { +func (s *InMemoryStore[TaskDescriptor]) ListTaskStates(ctx context.Context, + provider string, +) ([]payments.TaskState[TaskDescriptor], error) { ret := make([]payments.TaskState[TaskDescriptor], 0) + for id, status := range s.statuses { if !strings.HasPrefix(id, fmt.Sprintf("%s/", provider)) { continue } var descriptor TaskDescriptor + payments.DescriptorFromID(id, &descriptor) ret = append(ret, payments.TaskState[TaskDescriptor]{ @@ -68,30 +78,37 @@ func (s *inMemoryStore[TaskDescriptor]) ListTaskStates(ctx context.Context, prov CreatedAt: s.created[id], }) } + return ret, nil } -func (s *inMemoryStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context, provider string) (*payments.TaskState[TaskDescriptor], error) { +func (s *InMemoryStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context, + provider string, +) (*payments.TaskState[TaskDescriptor], error) { var ( oldestDate time.Time - oldestId string + oldestID string ) + for id, status := range s.statuses { if status != payments.TaskStatusPending { continue } + if oldestDate.IsZero() || s.created[id].Before(oldestDate) { oldestDate = s.created[id] - oldestId = id + oldestID = id } } + if oldestDate.IsZero() { return nil, ErrNotFound } - descriptorStr := strings.Split(oldestId, "/")[1] + descriptorStr := strings.Split(oldestID, "/")[1] var descriptor TaskDescriptor + payments.DescriptorFromID(descriptorStr, &descriptor) return &payments.TaskState[TaskDescriptor]{ @@ -99,29 +116,34 @@ func (s *inMemoryStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Contex Descriptor: descriptor, Status: payments.TaskStatusPending, State: nil, - CreatedAt: s.created[oldestId], + CreatedAt: s.created[oldestID], }, nil } -func (s *inMemoryStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Context, provider string, taskStatus payments.TaskStatus) ([]payments.TaskState[TaskDescriptor], error) { - +func (s *InMemoryStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Context, + provider string, taskStatus payments.TaskStatus, +) ([]payments.TaskState[TaskDescriptor], error) { all, err := s.ListTaskStates(ctx, provider) if err != nil { return nil, err } ret := make([]payments.TaskState[TaskDescriptor], 0) + for _, v := range all { if v.Status != taskStatus { continue } + ret = append(ret, v) } return ret, nil } -func (s *inMemoryStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, taskErr string) (*payments.TaskState[TaskDescriptor], error) { +func (s *InMemoryStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Context, + provider string, descriptor TaskDescriptor, status payments.TaskStatus, taskErr string, +) (*payments.TaskState[TaskDescriptor], error) { err := s.UpdateTaskStatus(ctx, provider, descriptor, status, taskErr) if err != nil { return nil, err @@ -131,89 +153,107 @@ func (s *inMemoryStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Cont Provider: provider, Descriptor: descriptor, Status: status, - //CreatedAt: s.created[fmt.Sprintf("%s/%s", provider, name)], + // CreatedAt: s.created[fmt.Sprintf("%s/%s", provider, name)], Error: taskErr, State: nil, }, nil } -func (s *inMemoryStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, err string) error { - taskId := payments.IDFromDescriptor(descriptor) - key := fmt.Sprintf("%s/%s", provider, taskId) +func (s *InMemoryStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, provider string, + descriptor TaskDescriptor, status payments.TaskStatus, err string, +) error { + taskID := payments.IDFromDescriptor(descriptor) + key := fmt.Sprintf("%s/%s", provider, taskID) s.statuses[key] = status + s.errors[key] = err if _, ok := s.created[key]; !ok { s.created[key] = time.Now() } + return nil } -func (s *inMemoryStore[TaskDescriptor]) Result(provider string, descriptor payments.TaskDescriptor) (payments.TaskStatus, string, bool) { - taskId := payments.IDFromDescriptor(descriptor) - key := fmt.Sprintf("%s/%s", provider, taskId) +func (s *InMemoryStore[TaskDescriptor]) Result(provider string, + descriptor payments.TaskDescriptor, +) (payments.TaskStatus, string, bool) { + taskID := payments.IDFromDescriptor(descriptor) + key := fmt.Sprintf("%s/%s", provider, taskID) + status, ok := s.statuses[key] if !ok { return "", "", false } + return status, s.errors[key], true } -func NewInMemoryStore[TaskDescriptor payments.TaskDescriptor]() *inMemoryStore[TaskDescriptor] { - return &inMemoryStore[TaskDescriptor]{ +func NewInMemoryStore[TaskDescriptor payments.TaskDescriptor]() *InMemoryStore[TaskDescriptor] { + return &InMemoryStore[TaskDescriptor]{ statuses: make(map[string]payments.TaskStatus), errors: make(map[string]string), created: make(map[string]time.Time), } } -var _ Store[struct{}] = &inMemoryStore[struct{}]{} +var _ Store[struct{}] = &InMemoryStore[struct{}]{} -type mongoDBStore[TaskDescriptor payments.TaskDescriptor] struct { +type MongoDBStore[TaskDescriptor payments.TaskDescriptor] struct { db *mongo.Database } -func (m *mongoDBStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provider string, descriptor TaskDescriptor) (*payments.TaskState[TaskDescriptor], error) { - ret := m.db.Collection(payments.TasksCollection).FindOne(ctx, map[string]any{ +func (s *MongoDBStore[TaskDescriptor]) ReadTaskState(ctx context.Context, provider string, + descriptor TaskDescriptor, +) (*payments.TaskState[TaskDescriptor], error) { + ret := s.db.Collection(payments.TasksCollection).FindOne(ctx, map[string]any{ "provider": provider, "descriptor": descriptor, }) if ret.Err() != nil { - if ret.Err() == mongo.ErrNoDocuments { + if errors.Is(ret.Err(), mongo.ErrNoDocuments) { return nil, ErrNotFound } + return nil, ret.Err() } - ts := payments.TaskState[TaskDescriptor]{} - err := ret.Decode(&ts) - if err != nil { + + paymentState := payments.TaskState[TaskDescriptor]{} + + if err := ret.Decode(&paymentState); err != nil { return nil, err } - return &ts, nil + return &paymentState, nil } -func (m *mongoDBStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context, provider string) (*payments.TaskState[TaskDescriptor], error) { - ret := m.db.Collection(payments.TasksCollection).FindOne(ctx, map[string]any{ +func (s *MongoDBStore[TaskDescriptor]) ReadOldestPendingTask(ctx context.Context, + provider string, +) (*payments.TaskState[TaskDescriptor], error) { + ret := s.db.Collection(payments.TasksCollection).FindOne(ctx, map[string]any{ "provider": provider, "status": payments.TaskStatusPending, }, options.FindOne().SetSort(bson.M{"createdAt": 1})) if ret.Err() != nil { - if ret.Err() == mongo.ErrNoDocuments { + if errors.Is(ret.Err(), mongo.ErrNoDocuments) { return nil, ErrNotFound } + return nil, ret.Err() } - ps := &payments.TaskState[TaskDescriptor]{} - err := ret.Decode(ps) - if err != nil { + + paymentState := &payments.TaskState[TaskDescriptor]{} + + if err := ret.Decode(paymentState); err != nil { return nil, err } - return ps, nil + return paymentState, nil } -func (m *mongoDBStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, taskErr string) error { - _, err := m.db.Collection(payments.TasksCollection).UpdateOne(ctx, map[string]any{ +func (s *MongoDBStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, provider string, + descriptor TaskDescriptor, status payments.TaskStatus, taskErr string, +) error { + _, err := s.db.Collection(payments.TasksCollection).UpdateOne(ctx, map[string]any{ "provider": provider, "descriptor": descriptor, }, map[string]any{ @@ -225,11 +265,14 @@ func (m *mongoDBStore[TaskDescriptor]) UpdateTaskStatus(ctx context.Context, pro "createdAt": time.Now(), }, }, options.Update().SetUpsert(true)) + return err } -func (m *mongoDBStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Context, provider string, descriptor TaskDescriptor, status payments.TaskStatus, taskErr string) (*payments.TaskState[TaskDescriptor], error) { - ret := m.db.Collection(payments.TasksCollection).FindOneAndUpdate(ctx, map[string]any{ +func (s *MongoDBStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Context, provider string, + descriptor TaskDescriptor, status payments.TaskStatus, taskErr string, +) (*payments.TaskState[TaskDescriptor], error) { + ret := s.db.Collection(payments.TasksCollection).FindOneAndUpdate(ctx, map[string]any{ "provider": provider, "descriptor": descriptor, }, map[string]any{ @@ -241,58 +284,61 @@ func (m *mongoDBStore[TaskDescriptor]) FindTaskAndUpdateStatus(ctx context.Conte "createdAt": time.Now(), }, }, options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After)) + if ret.Err() != nil { return nil, errors.Wrap(ret.Err(), "retrieving task") } - ps := &payments.TaskState[TaskDescriptor]{} - err := ret.Decode(ps) - if err != nil { + + paymentState := &payments.TaskState[TaskDescriptor]{} + + if err := ret.Decode(paymentState); err != nil { return nil, errors.Wrap(err, "decoding task state") } - return ps, nil + + return paymentState, nil } -func (m *mongoDBStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Context, provider string, status payments.TaskStatus) ([]payments.TaskState[TaskDescriptor], error) { - cursor, err := m.db.Collection(payments.TasksCollection).Find(ctx, map[string]any{ +func (s *MongoDBStore[TaskDescriptor]) ListTaskStatesByStatus(ctx context.Context, provider string, + status payments.TaskStatus, +) ([]payments.TaskState[TaskDescriptor], error) { + cursor, err := s.db.Collection(payments.TasksCollection).Find(ctx, map[string]any{ "provider": provider, "status": status, }) if err != nil { return nil, err } - if err != nil { - return nil, err - } + ret := make([]payments.TaskState[TaskDescriptor], 0) - err = cursor.All(ctx, &ret) - if err != nil { + + if err = cursor.All(ctx, &ret); err != nil { return nil, err } return ret, nil } -func (m *mongoDBStore[TaskDescriptor]) ListTaskStates(ctx context.Context, provider string) ([]payments.TaskState[TaskDescriptor], error) { - cursor, err := m.db.Collection(payments.TasksCollection).Find(ctx, map[string]any{ +func (s *MongoDBStore[TaskDescriptor]) ListTaskStates(ctx context.Context, + provider string, +) ([]payments.TaskState[TaskDescriptor], error) { + cursor, err := s.db.Collection(payments.TasksCollection).Find(ctx, map[string]any{ "provider": provider, }) if err != nil { return nil, err } - if err != nil { - return nil, err - } + ret := make([]payments.TaskState[TaskDescriptor], 0) - err = cursor.All(ctx, &ret) - if err != nil { + + if err = cursor.All(ctx, &ret); err != nil { return nil, err } return ret, nil } -var _ Store[struct{}] = &mongoDBStore[struct{}]{} +var _ Store[struct{}] = &MongoDBStore[struct{}]{} -func NewMongoDBStore[TaskDescriptor payments.TaskDescriptor](db *mongo.Database) *mongoDBStore[TaskDescriptor] { - return &mongoDBStore[TaskDescriptor]{db: db} +func NewMongoDBStore[TaskDescriptor payments.TaskDescriptor](db *mongo.Database) *MongoDBStore[TaskDescriptor] { + return &MongoDBStore[TaskDescriptor]{db: db} } diff --git a/pkg/bridge/task/task.go b/internal/pkg/task/task.go similarity index 100% rename from pkg/bridge/task/task.go rename to internal/pkg/task/task.go diff --git a/pkg/bridge/writeonly/item.go b/internal/pkg/writeonly/item.go similarity index 76% rename from pkg/bridge/writeonly/item.go rename to internal/pkg/writeonly/item.go index 8180d5618..a74c69652 100644 --- a/pkg/bridge/writeonly/item.go +++ b/internal/pkg/writeonly/item.go @@ -2,6 +2,6 @@ package writeonly type Item struct { Provider string `bson:"provider"` - TaskId string `bson:"taskId"` + TaskID string `bson:"taskID"` Data any `bson:"data"` } diff --git a/pkg/bridge/writeonly/storage.go b/internal/pkg/writeonly/storage.go similarity index 87% rename from pkg/bridge/writeonly/storage.go rename to internal/pkg/writeonly/storage.go index 05e4d413d..0511164bc 100644 --- a/pkg/bridge/writeonly/storage.go +++ b/internal/pkg/writeonly/storage.go @@ -4,7 +4,7 @@ import ( "context" "github.com/iancoleman/strcase" - payments "github.com/numary/payments/pkg" + "github.com/numary/payments/internal/pkg/payments" "go.mongodb.org/mongo-driver/mongo" ) @@ -17,15 +17,12 @@ func (fn StorageFn) Write(ctx context.Context, items ...any) error { return fn(ctx, items...) } -var NoOpStorage = StorageFn(func(ctx context.Context, items ...any) error { - return nil -}) - func Write[T any](ctx context.Context, storage Storage, items ...T) error { m := make([]any, 0) for _, item := range items { m = append(m, item) } + return storage.Write(ctx, m...) } @@ -40,11 +37,13 @@ func (m *MongoDBStorage) Write(ctx context.Context, items ...any) error { for _, i := range items { toSave = append(toSave, Item{ Provider: m.provider, - TaskId: payments.IDFromDescriptor(m.taskDescriptor), + TaskID: payments.IDFromDescriptor(m.taskDescriptor), Data: i, }) } + collectionName := strcase.ToCamel(m.provider) + "Storage" + _, err := m.db.Collection(collectionName).InsertMany(ctx, toSave) if err != nil { return err diff --git a/pkg/api/http_test.go b/pkg/api/http_test.go deleted file mode 100644 index 38c9353c3..000000000 --- a/pkg/api/http_test.go +++ /dev/null @@ -1,102 +0,0 @@ -package api_test - -// -//import ( -// "bytes" -// "context" -// "encoding/json" -// "github.com/gorilla/mux" -// "github.com/numary/go-libs/sharedapi" -// payment "github.com/numary/payments/pkg" -// http2 "github.com/numary/payments/pkg/http" -// "github.com/numary/payments/pkg/ingester" -// testing2 "github.com/numary/payments/pkg/testing" -// "github.com/stretchr/testify/assert" -// "go.mongodb.org/mongo-driver/bson" -// "go.mongodb.org/mongo-driver/mongo/integration/mtest" -// "go.mongodb.org/mongo-driver/mongo/options" -// "net/http" -// "net/http/httptest" -// "net/url" -// "testing" -// "time" -//) -// -//func runApiWithMock(t *testing.T, fn func(t *mtest.T, mux *mux.Router)) { -// testing2.RunWithMock(t, func(t *mtest.T) { -// fn(t, http2.NewMux(ingester.NewDefaultService(t.DB), false)) -// }) -//} -// -//func TestHttpServerCreatePayment(t *testing.T) { -// runApiWithMock(t, func(t *mtest.T, m *mux.Router) { -// rec := httptest.NewRecorder() -// req := httptest.NewRequest(http.MethodPut, "/", bytes.NewBufferString(`{}`)) -// -// m.ServeHTTP(rec, req) -// -// assert.Equal(t, http.StatusNoContent, rec.Result().StatusCode) -// }) -//} -// -//func TestHttpServerUpdatePayment(t *testing.T) { -// runApiWithMock(t, func(t *mtest.T, m *mux.Router) { -// _, err := t.DB.Collection(ingester.Collection).InsertOne(context.Background(), map[string]interface{}{ -// "id": "1", -// "date": time.Now(), -// }) -// assert.NoError(t, err) -// -// rec := httptest.NewRecorder() -// req := httptest.NewRequest(http.MethodPut, "/", bytes.NewBufferString(`{"id": "1", "scheme": "visa", "date": "`+time.Now().Add(time.Minute).Format(time.RFC3339)+`"}`)) -// -// m.ServeHTTP(rec, req) -// -// assert.Equal(t, http.StatusNoContent, rec.Result().StatusCode) -// -// ret := t.DB.Collection(ingester.Collection).FindOne(context.Background(), map[string]interface{}{ -// "id": "1", -// }, options.FindOne().SetSort(bson.M{"date": -1})) -// assert.NoError(t, ret.Err()) -// -// p := payment.Payment{} -// assert.NoError(t, ret.Decode(&p)) -// assert.Equal(t, "visa", p.Scheme) -// }) -//} -// -//func TestHttpServerListPayments(t *testing.T) { -// runApiWithMock(t, func(t *mtest.T, m *mux.Router) { -// _, err := t.DB.Collection("Payment").InsertMany(context.Background(), []interface{}{ -// map[string]interface{}{ -// "id": "1", -// }, -// map[string]interface{}{ -// "id": "2", -// }, -// map[string]interface{}{ -// "id": "3", -// }, -// }) -// assert.NoError(t, err) -// -// rec := httptest.NewRecorder() -// req := httptest.NewRequest(http.MethodGet, "/", nil) -// values := url.Values{} -// values.Set("limit", "2") -// values.Set("sort", "id:desc") -// req.URL.RawQuery = values.Encode() -// -// m.ServeHTTP(rec, req) -// -// assert.Equal(t, http.StatusOK, rec.Result().StatusCode) -// -// type Response struct { -// sharedapi.BaseResponse -// Data []payment.Payment `json:"data"` -// } -// ret := &Response{} -// assert.NoError(t, json.NewDecoder(rec.Body).Decode(&ret)) -// assert.Len(t, ret.Data, 2) -// }) -//} diff --git a/pkg/api/scopes.go b/pkg/api/scopes.go deleted file mode 100644 index ed4936769..000000000 --- a/pkg/api/scopes.go +++ /dev/null @@ -1,11 +0,0 @@ -package api - -const ( - ScopeReadPayments = "payments:read" - ScopeWritePayments = "payments:write" -) - -var AllScopes = []string{ - ScopeReadPayments, - ScopeWritePayments, -} diff --git a/pkg/bridge/cdi/module.go b/pkg/bridge/cdi/module.go deleted file mode 100644 index 369821910..000000000 --- a/pkg/bridge/cdi/module.go +++ /dev/null @@ -1,70 +0,0 @@ -package cdi - -import ( - "context" - "net/http" - - "github.com/numary/go-libs/sharedlogging" - "github.com/numary/go-libs/sharedpublish" - payments "github.com/numary/payments/pkg" - bridgeHttp "github.com/numary/payments/pkg/bridge/http" - "github.com/numary/payments/pkg/bridge/ingestion" - "github.com/numary/payments/pkg/bridge/integration" - "github.com/numary/payments/pkg/bridge/task" - "github.com/numary/payments/pkg/bridge/writeonly" - "go.mongodb.org/mongo-driver/mongo" - "go.uber.org/dig" - "go.uber.org/fx" -) - -type ConnectorHandler struct { - Handler http.Handler - Name string -} - -func ConnectorModule[ - ConnectorConfig payments.ConnectorConfigObject, - TaskDescriptor payments.TaskDescriptor, -](useScopes bool, loader integration.Loader[ConnectorConfig, TaskDescriptor]) fx.Option { - return fx.Options( - fx.Provide(func(db *mongo.Database, publisher sharedpublish.Publisher) *integration.ConnectorManager[ConnectorConfig, TaskDescriptor] { - connectorStore := integration.NewMongoDBConnectorStore(db) - taskStore := task.NewMongoDBStore[TaskDescriptor](db) - logger := sharedlogging.GetLogger(context.Background()) - schedulerFactory := integration.TaskSchedulerFactoryFn[TaskDescriptor](func(resolver task.Resolver[TaskDescriptor], maxTasks int) *task.DefaultTaskScheduler[TaskDescriptor] { - return task.NewDefaultScheduler[TaskDescriptor](loader.Name(), logger, taskStore, task.ContainerFactoryFn(func(ctx context.Context, descriptor payments.TaskDescriptor) (*dig.Container, error) { - container := dig.New() - if err := container.Provide(func() ingestion.Ingester { - return ingestion.NewDefaultIngester(loader.Name(), descriptor, db, logger.WithFields(map[string]interface{}{ - "task-id": payments.IDFromDescriptor(descriptor), - }), publisher) - }); err != nil { - return nil, err - } - err := container.Provide(func() writeonly.Storage { - return writeonly.NewMongoDBStorage(db, loader.Name(), descriptor) - }) - if err != nil { - panic(err) - } - return container, nil - }), resolver, maxTasks) - }) - return integration.NewConnectorManager[ConnectorConfig, TaskDescriptor](logger, connectorStore, loader, schedulerFactory) - }), - fx.Provide(fx.Annotate(func(cm *integration.ConnectorManager[ConnectorConfig, TaskDescriptor]) ConnectorHandler { - return ConnectorHandler{ - Handler: bridgeHttp.ConnectorRouter(loader.Name(), useScopes, cm), - Name: loader.Name(), - } - }, fx.ResultTags(`group:"connectorHandlers"`))), - fx.Invoke(func(lc fx.Lifecycle, cm *integration.ConnectorManager[ConnectorConfig, TaskDescriptor]) { - lc.Append(fx.Hook{ - OnStart: func(ctx context.Context) error { - _ = cm.Restore(ctx) - return nil - }, - }) - }), - ) -} diff --git a/pkg/bridge/connectors/modulr/loader.go b/pkg/bridge/connectors/modulr/loader.go deleted file mode 100644 index 2950ed345..000000000 --- a/pkg/bridge/connectors/modulr/loader.go +++ /dev/null @@ -1,26 +0,0 @@ -package modulr - -import ( - "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/integration" - "github.com/numary/payments/pkg/bridge/task" -) - -const connectorName = "modulr" - -// NewLoader creates a new loader. -func NewLoader() integration.Loader[Config, TaskDescriptor] { - loader := integration.NewLoaderBuilder[Config, TaskDescriptor](connectorName). - WithLoad(func(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] { - return integration.NewConnectorBuilder[TaskDescriptor](). - WithInstall(func(ctx task.ConnectorContext[TaskDescriptor]) error { - return ctx.Scheduler().Schedule( - TaskDescriptor{Name: taskNameFetchAccounts}, false) - }). - WithResolve(resolveTasks(logger, config)). - Build() - }). - Build() - - return loader -} diff --git a/pkg/bridge/connectors/stripe/loader.go b/pkg/bridge/connectors/stripe/loader.go deleted file mode 100644 index e5845dcd7..000000000 --- a/pkg/bridge/connectors/stripe/loader.go +++ /dev/null @@ -1,38 +0,0 @@ -package stripe - -import ( - "time" - - "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/integration" -) - -type loader struct{} - -func (l *loader) AllowTasks() int { - return 50 -} - -func (l *loader) Name() string { - return connectorName -} - -func (l *loader) Load(logger sharedlogging.Logger, config Config) integration.Connector[TaskDescriptor] { - return NewConnector(logger, config) -} - -func (l *loader) ApplyDefaults(cfg Config) Config { - if cfg.PageSize == 0 { - cfg.PageSize = 10 - } - if cfg.PollingPeriod == 0 { - cfg.PollingPeriod = 2 * time.Minute - } - return cfg -} - -var _ integration.Loader[Config, TaskDescriptor] = &loader{} - -func NewLoader() *loader { - return &loader{} -} diff --git a/pkg/bridge/connectors/stripe/translate.go b/pkg/bridge/connectors/stripe/translate.go deleted file mode 100644 index 6a994abb6..000000000 --- a/pkg/bridge/connectors/stripe/translate.go +++ /dev/null @@ -1,303 +0,0 @@ -package stripe - -import ( - "fmt" - "runtime/debug" - "strings" - "time" - - "github.com/davecgh/go-spew/spew" - payments "github.com/numary/payments/pkg" - "github.com/numary/payments/pkg/bridge/ingestion" - "github.com/stripe/stripe-go/v72" -) - -type currency struct { - decimals int -} - -var currencies = map[string]currency{ - "ARS": {2}, // Argentine Peso - "AMD": {2}, // Armenian Dram - "AWG": {2}, // Aruban Guilder - "AUD": {2}, // Australian Dollar - "BSD": {2}, // Bahamian Dollar - "BHD": {3}, // Bahraini Dinar - "BDT": {2}, // Bangladesh, Taka - "BZD": {2}, // Belize Dollar - "BMD": {2}, // Bermudian Dollar - "BOB": {2}, // Bolivia, Boliviano - "BAM": {2}, // Bosnia and Herzegovina, Convertible Marks - "BWP": {2}, // Botswana, Pula - "BRL": {2}, // Brazilian Real - "BND": {2}, // Brunei Dollar - "CAD": {2}, // Canadian Dollar - "KYD": {2}, // Cayman Islands Dollar - "CLP": {0}, // Chilean Peso - "CNY": {2}, // China Yuan Renminbi - "COP": {2}, // Colombian Peso - "CRC": {2}, // Costa Rican Colon - "HRK": {2}, // Croatian Kuna - "CUC": {2}, // Cuban Convertible Peso - "CUP": {2}, // Cuban Peso - "CYP": {2}, // Cyprus Pound - "CZK": {2}, // Czech Koruna - "DKK": {2}, // Danish Krone - "DOP": {2}, // Dominican Peso - "XCD": {2}, // East Caribbean Dollar - "EGP": {2}, // Egyptian Pound - "SVC": {2}, // El Salvador Colon - "ATS": {2}, // Euro - "BEF": {2}, // Euro - "DEM": {2}, // Euro - "EEK": {2}, // Euro - "ESP": {2}, // Euro - "EUR": {2}, // Euro - "FIM": {2}, // Euro - "FRF": {2}, // Euro - "GRD": {2}, // Euro - "IEP": {2}, // Euro - "ITL": {2}, // Euro - "LUF": {2}, // Euro - "NLG": {2}, // Euro - "PTE": {2}, // Euro - "GHC": {2}, // Ghana, Cedi - "GIP": {2}, // Gibraltar Pound - "GTQ": {2}, // Guatemala, Quetzal - "HNL": {2}, // Honduras, Lempira - "HKD": {2}, // Hong Kong Dollar - "HUF": {0}, // Hungary, Forint - "ISK": {0}, // Iceland Krona - "INR": {2}, // Indian Rupee - "IDR": {2}, // Indonesia, Rupiah - "IRR": {2}, // Iranian Rial - "JMD": {2}, // Jamaican Dollar - "JPY": {0}, // Japan, Yen - "JOD": {3}, // Jordanian Dinar - "KES": {2}, // Kenyan Shilling - "KWD": {3}, // Kuwaiti Dinar - "LVL": {2}, // Latvian Lats - "LBP": {0}, // Lebanese Pound - "LTL": {2}, // Lithuanian Litas - "MKD": {2}, // Macedonia, Denar - "MYR": {2}, // Malaysian Ringgit - "MTL": {2}, // Maltese Lira - "MUR": {0}, // Mauritius Rupee - "MXN": {2}, // Mexican Peso - "MZM": {2}, // Mozambique Metical - "NPR": {2}, // Nepalese Rupee - "ANG": {2}, // Netherlands Antillian Guilder - "ILS": {2}, // New Israeli Shekel - "TRY": {2}, // New Turkish Lira - "NZD": {2}, // New Zealand Dollar - "NOK": {2}, // Norwegian Krone - "PKR": {2}, // Pakistan Rupee - "PEN": {2}, // Peru, Nuevo Sol - "UYU": {2}, // Peso Uruguayo - "PHP": {2}, // Philippine Peso - "PLN": {2}, // Poland, Zloty - "GBP": {2}, // Pound Sterling - "OMR": {3}, // Rial Omani - "RON": {2}, // Romania, New Leu - "ROL": {2}, // Romania, Old Leu - "RUB": {2}, // Russian Ruble - "SAR": {2}, // Saudi Riyal - "SGD": {2}, // Singapore Dollar - "SKK": {2}, // Slovak Koruna - "SIT": {2}, // Slovenia, Tolar - "ZAR": {2}, // South Africa, Rand - "KRW": {0}, // South Korea, Won - "SZL": {2}, // Swaziland, Lilangeni - "SEK": {2}, // Swedish Krona - "CHF": {2}, // Swiss Franc - "TZS": {2}, // Tanzanian Shilling - "THB": {2}, // Thailand, Baht - "TOP": {2}, // Tonga, Paanga - "AED": {2}, // UAE Dirham - "UAH": {2}, // Ukraine, Hryvnia - "USD": {2}, // US Dollar - "VUV": {0}, // Vanuatu, Vatu - "VEF": {2}, // Venezuela Bolivares Fuertes - "VEB": {2}, // Venezuela, Bolivar - "VND": {0}, // Viet Nam, Dong - "ZWD": {2}, // Zimbabwe Dollar -} - -func CreateBatchElement(bt *stripe.BalanceTransaction, forward bool) (ingestion.BatchElement, bool) { - var ( - reference payments.Referenced - paymentData *payments.Data - adjustment *payments.Adjustment - ) - defer func() { - // DEBUG - if e := recover(); e != nil { - fmt.Println("Error translating transaction") - debug.PrintStack() - spew.Dump(bt) - panic(e) - } - }() - - if bt.Source == nil { - return ingestion.BatchElement{}, false - } - - formatAsset := func(cur stripe.Currency) string { - asset := strings.ToUpper(string(cur)) - def, ok := currencies[asset] - if !ok { - return asset - } - if def.decimals == 0 { - return asset - } - return fmt.Sprintf("%s/%d", asset, def.decimals) - } - - convertPayoutStatus := func() (status payments.Status) { - switch bt.Source.Payout.Status { - case stripe.PayoutStatusCanceled: - status = payments.StatusCancelled - case stripe.PayoutStatusFailed: - status = payments.StatusFailed - case stripe.PayoutStatusInTransit, stripe.PayoutStatusPending: - status = payments.StatusPending - case stripe.PayoutStatusPaid: - status = payments.StatusSucceeded - } - return - } - - switch bt.Type { - case "charge": - reference = payments.Referenced{ - Reference: bt.Source.Charge.ID, - Type: payments.TypePayIn, - } - paymentData = &payments.Data{ - Status: payments.StatusSucceeded, - InitialAmount: bt.Source.Charge.Amount, - Asset: formatAsset(bt.Source.Charge.Currency), - Raw: bt, - Scheme: payments.Scheme(bt.Source.Charge.PaymentMethodDetails.Card.Brand), - CreatedAt: time.Unix(bt.Created, 0), - } - case "payout": - reference = payments.Referenced{ - Reference: bt.Source.Payout.ID, - Type: payments.TypePayout, - } - paymentData = &payments.Data{ - Status: convertPayoutStatus(), - InitialAmount: bt.Source.Payout.Amount, - Raw: bt, - Asset: formatAsset(bt.Source.Payout.Currency), - Scheme: func() payments.Scheme { - switch bt.Source.Payout.Type { - case "bank_account": - return payments.SchemeSepaCredit - case "card": - return payments.Scheme(bt.Source.Payout.Card.Brand) - } - return payments.SchemeUnknown - }(), - CreatedAt: time.Unix(bt.Created, 0), - } - - case "transfer": - reference = payments.Referenced{ - Reference: bt.Source.Transfer.ID, - Type: payments.TypePayout, - } - paymentData = &payments.Data{ - Status: payments.StatusSucceeded, - InitialAmount: bt.Source.Transfer.Amount, - Raw: bt, - Asset: formatAsset(bt.Source.Transfer.Currency), - Scheme: payments.SchemeOther, - CreatedAt: time.Unix(bt.Created, 0), - } - case "refund": - reference = payments.Referenced{ - Reference: bt.Source.Refund.Charge.ID, - Type: payments.TypePayIn, - } - adjustment = &payments.Adjustment{ - Status: payments.StatusSucceeded, - Amount: bt.Amount, - Date: time.Unix(bt.Created, 0), - Raw: bt, - } - case "payment": - reference = payments.Referenced{ - Reference: bt.Source.Charge.ID, - Type: payments.TypePayIn, - } - paymentData = &payments.Data{ - Status: payments.StatusSucceeded, - InitialAmount: bt.Source.Charge.Amount, - Raw: bt, - Asset: formatAsset(bt.Source.Charge.Currency), - Scheme: payments.SchemeOther, - CreatedAt: time.Unix(bt.Created, 0), - } - case "payout_cancel": - reference = payments.Referenced{ - Reference: bt.Source.Payout.ID, - Type: payments.TypePayout, - } - adjustment = &payments.Adjustment{ - Status: convertPayoutStatus(), - Amount: 0, - Date: time.Unix(bt.Created, 0), - Raw: bt, - Absolute: true, - } - case "payout_failure": - reference = payments.Referenced{ - Reference: bt.Source.Payout.ID, - Type: payments.TypePayIn, - } - adjustment = &payments.Adjustment{ - Status: convertPayoutStatus(), - Amount: 0, - Date: time.Unix(bt.Created, 0), - Raw: bt, - Absolute: true, - } - case "payment_refund": - reference = payments.Referenced{ - Reference: bt.Source.Refund.Charge.ID, - Type: payments.TypePayIn, - } - adjustment = &payments.Adjustment{ - Status: payments.StatusSucceeded, - Amount: bt.Amount, - Date: time.Unix(bt.Created, 0), - Raw: bt, - } - case "adjustment": - reference = payments.Referenced{ - Reference: bt.Source.Dispute.Charge.ID, - Type: payments.TypePayIn, - } - adjustment = &payments.Adjustment{ - Status: payments.StatusCancelled, - Amount: bt.Amount, - Date: time.Unix(bt.Created, 0), - Raw: bt, - } - case "stripe_fee", "network_cost": - return ingestion.BatchElement{}, true - default: - return ingestion.BatchElement{}, false - } - - return ingestion.BatchElement{ - Referenced: reference, - Payment: paymentData, - Adjustment: adjustment, - Forward: forward, - }, true -} diff --git a/pkg/bridge/connectors/wise/loader.go b/pkg/bridge/connectors/wise/loader.go deleted file mode 100644 index 1d44f011d..000000000 --- a/pkg/bridge/connectors/wise/loader.go +++ /dev/null @@ -1,25 +0,0 @@ -package wise - -import ( - "github.com/numary/go-libs/sharedlogging" - "github.com/numary/payments/pkg/bridge/integration" - "github.com/numary/payments/pkg/bridge/task" -) - -// NewLoader creates a new loader. -func NewLoader() integration.Loader[Config, TaskDefinition] { - loader := integration.NewLoaderBuilder[Config, TaskDefinition]("wise"). - WithLoad(func(logger sharedlogging.Logger, config Config) integration.Connector[TaskDefinition] { - return integration.NewConnectorBuilder[TaskDefinition](). - WithInstall(func(ctx task.ConnectorContext[TaskDefinition]) error { - return ctx.Scheduler(). - Schedule( - TaskDefinition{Name: taskNameFetchProfiles}, - false) - }). - WithResolve(resolveTasks(logger, config)). - Build() - }).Build() - - return loader -} diff --git a/pkg/bridge/http/api.go b/pkg/bridge/http/api.go deleted file mode 100644 index 28136dfc5..000000000 --- a/pkg/bridge/http/api.go +++ /dev/null @@ -1,173 +0,0 @@ -package http - -import ( - "context" - "encoding/json" - "errors" - "net/http" - - "github.com/gorilla/mux" - "github.com/numary/go-libs/sharedapi" - "github.com/numary/go-libs/sharedlogging" - payments "github.com/numary/payments/pkg" - "github.com/numary/payments/pkg/bridge/integration" - . "github.com/numary/payments/pkg/http" -) - -func handleError(w http.ResponseWriter, r *http.Request, err error) { - w.WriteHeader(http.StatusInternalServerError) - sharedlogging.GetLogger(r.Context()).Error(err) - // TODO: Opentracing - err = json.NewEncoder(w).Encode(sharedapi.ErrorResponse{ - ErrorCode: "INTERNAL", - ErrorMessage: err.Error(), - }) - if err != nil { - panic(err) - } -} - -func ReadConfig[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - - config, err := cm.ReadConfig(r.Context()) - if err != nil { - handleError(w, r, err) - return - } - - err = json.NewEncoder(w).Encode(config) - if err != nil { - panic(err) - } - } -} - -func ListTasks[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - - tasks, err := cm.ListTasksStates(r.Context()) - if err != nil { - handleError(w, r, err) - return - } - - err = json.NewEncoder(w).Encode(tasks) - if err != nil { - panic(err) - } - } -} - -func ReadTask[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - - var descriptor Descriptor - payments.DescriptorFromID(mux.Vars(r)["taskId"], &descriptor) - - tasks, err := cm.ReadTaskState(r.Context(), descriptor) - if err != nil { - handleError(w, r, err) - return - } - - err = json.NewEncoder(w).Encode(tasks) - if err != nil { - panic(err) - } - } -} - -func Uninstall[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - err := cm.Uninstall(r.Context()) - if err != nil { - handleError(w, r, err) - return - } - - w.WriteHeader(http.StatusNoContent) - } -} - -func Install[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - - installed, err := cm.IsInstalled(context.Background()) - if err != nil { - handleError(w, r, err) - return - } - if installed { - handleError(w, r, integration.ErrAlreadyInstalled) - return - } - - var config Config - if r.ContentLength > 0 { - err := json.NewDecoder(r.Body).Decode(&config) - if err != nil { - handleError(w, r, err) - return - } - } - - err = cm.Install(r.Context(), config) - if err != nil { - handleError(w, r, err) - return - } - - w.WriteHeader(http.StatusNoContent) - } -} - -func Reset[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor](cm *integration.ConnectorManager[Config, Descriptor]) http.HandlerFunc { - return func(w http.ResponseWriter, r *http.Request) { - - installed, err := cm.IsInstalled(context.Background()) - if err != nil { - handleError(w, r, err) - return - } - if !installed { - handleError(w, r, errors.New("connector not installed")) - return - } - - err = cm.Reset(r.Context()) - if err != nil { - handleError(w, r, err) - return - } - - w.WriteHeader(http.StatusNoContent) - } -} - -func ConnectorRouter[Config payments.ConnectorConfigObject, Descriptor payments.TaskDescriptor]( - name string, - useScopes bool, - manager *integration.ConnectorManager[Config, Descriptor], -) *mux.Router { - r := mux.NewRouter() - r.Path("/" + name).Methods(http.MethodPost).Handler( - WrapHandler(useScopes, Install(manager), ScopeWriteConnectors), - ) - r.Path("/" + name + "/reset").Methods(http.MethodPost).Handler( - WrapHandler(useScopes, Reset(manager), ScopeWriteConnectors), - ) - r.Path("/" + name).Methods(http.MethodDelete).Handler( - WrapHandler(useScopes, Uninstall(manager), ScopeWriteConnectors), - ) - r.Path("/" + name + "/config").Methods(http.MethodGet).Handler( - WrapHandler(useScopes, ReadConfig(manager), ScopeReadConnectors, ScopeWriteConnectors), - ) - r.Path("/" + name + "/tasks").Methods(http.MethodGet).Handler( - WrapHandler(useScopes, ListTasks(manager), ScopeReadConnectors, ScopeWriteConnectors), - ) - r.Path("/" + name + "/tasks/{taskId}").Methods(http.MethodGet).Handler( - WrapHandler(useScopes, ReadTask(manager), ScopeReadConnectors, ScopeWriteConnectors), - ) - return r -} diff --git a/pkg/bridge/http/scopes.go b/pkg/bridge/http/scopes.go deleted file mode 100644 index 1fa3ef0cc..000000000 --- a/pkg/bridge/http/scopes.go +++ /dev/null @@ -1,11 +0,0 @@ -package http - -const ( - ScopeReadConnectors = "connectors:read" - ScopeWriteConnectors = "connectors:write" -) - -var AllScopes = []string{ - ScopeReadConnectors, - ScopeWriteConnectors, -} diff --git a/pkg/bridge/task/context.go b/pkg/bridge/task/context.go deleted file mode 100644 index 93a3211a6..000000000 --- a/pkg/bridge/task/context.go +++ /dev/null @@ -1,31 +0,0 @@ -package task - -import ( - "context" - - payments "github.com/numary/payments/pkg" -) - -type ConnectorContext[TaskDescriptor payments.TaskDescriptor] interface { - Context() context.Context - Scheduler() Scheduler[TaskDescriptor] -} - -type connectorContext[TaskDescriptor payments.TaskDescriptor] struct { - ctx context.Context - scheduler Scheduler[TaskDescriptor] -} - -func (ctx *connectorContext[TaskDescriptor]) Context() context.Context { - return ctx.ctx -} -func (ctx *connectorContext[TaskDescriptor]) Scheduler() Scheduler[TaskDescriptor] { - return ctx.scheduler -} - -func NewConnectorContext[TaskDescriptor payments.TaskDescriptor](ctx context.Context, scheduler Scheduler[TaskDescriptor]) *connectorContext[TaskDescriptor] { - return &connectorContext[TaskDescriptor]{ - ctx: ctx, - scheduler: scheduler, - } -} diff --git a/pkg/collections.go b/pkg/collections.go deleted file mode 100644 index d634b3e23..000000000 --- a/pkg/collections.go +++ /dev/null @@ -1,5 +0,0 @@ -package payments - -const Collection = "Payments" -const ConnectorsCollection = "Connectors" -const TasksCollection = "Tasks" diff --git a/pkg/http/auth.go b/pkg/http/auth.go deleted file mode 100644 index 4e7c8e234..000000000 --- a/pkg/http/auth.go +++ /dev/null @@ -1,14 +0,0 @@ -package http - -import ( - "net/http" - - "github.com/numary/go-libs/sharedauth" -) - -func WrapHandler(useScopes bool, h http.Handler, scopes ...string) http.Handler { - if !useScopes { - return h - } - return sharedauth.NeedOneOfScopes(scopes...)(h) -} diff --git a/pkg/http/queryparam.go b/pkg/http/queryparam.go deleted file mode 100644 index d71b0ede8..000000000 --- a/pkg/http/queryparam.go +++ /dev/null @@ -1,38 +0,0 @@ -package http - -import ( - "net/http" - "strconv" - "strings" -) - -func Bool(r *http.Request, key string) (bool, bool) { - vv := r.URL.Query().Get(key) - if vv == "" { - return false, false - } - vv = strings.ToUpper(vv) - return vv == "YES" || vv == "TRUE" || vv == "1", true -} - -func Integer(r *http.Request, key string) (int64, bool, error) { - if value := r.URL.Query().Get(key); value != "" { - ret, err := strconv.ParseInt(value, 10, 64) - if err != nil { - return 0, false, err - } - return ret, true, nil - } - return 0, false, nil -} - -func IntegerWithDefault(r *http.Request, key string, def int64) (int64, error) { - value, ok, err := Integer(r, key) - if err != nil { - return 0, err - } - if ok { - return value, nil - } - return def, nil -}