feat: (v3) split worker from api by laouji · Pull Request #223 · formancehq/payments · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: (v3) split worker from api #223

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 18 commits into from
Dec 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 88 additions & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package cmd

import (
"errors"
"fmt"

"github.com/bombsimon/logrusr/v3"
sharedapi "github.com/formancehq/go-libs/v2/api"
"github.com/formancehq/go-libs/v2/auth"
"github.com/formancehq/go-libs/v2/aws/iam"
"github.com/formancehq/go-libs/v2/bun/bunconnect"
"github.com/formancehq/go-libs/v2/health"
"github.com/formancehq/go-libs/v2/licence"
"github.com/formancehq/go-libs/v2/otlp"
"github.com/formancehq/go-libs/v2/otlp/otlpmetrics"
"github.com/formancehq/go-libs/v2/otlp/otlptraces"
"github.com/formancehq/go-libs/v2/profiling"
"github.com/formancehq/go-libs/v2/publish"
"github.com/formancehq/go-libs/v2/service"
"github.com/formancehq/go-libs/v2/temporal"
"github.com/formancehq/payments/internal/connectors/engine"
"github.com/formancehq/payments/internal/connectors/metrics"
"github.com/formancehq/payments/internal/storage"
"github.com/sirupsen/logrus"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
"go.uber.org/fx"
)

func setLogger() {
// Add a dedicated logger for opentelemetry in case of error
otel.SetLogger(logrusr.New(logrus.New().WithField("component", "otlp")))
}

// commonFlags registers on cmd the flags that both the server and the worker
// commands need: the stack name, the listen address, and the flag sets of the
// shared go-libs modules.
func commonFlags(cmd *cobra.Command) {
	flags := cmd.Flags()

	// Flags owned by this package.
	flags.String(StackFlag, "", "Stack name")
	flags.String(ListenFlag, ":8080", "Listen address")

	// Flags owned by the shared library modules.
	service.AddFlags(flags)
	otlpmetrics.AddFlags(flags)
	otlptraces.AddFlags(flags)
	auth.AddFlags(flags)
	publish.AddFlags(ServiceName, flags)
	bunconnect.AddFlags(flags)
	iam.AddFlags(flags)
	profiling.AddFlags(flags)
	temporal.AddFlags(flags)
	licence.AddFlags(flags)
}

// commonOptions assembles the fx options used by every long-running command:
// database connection options, telemetry, metrics registry, temporal client,
// service info, health, publishing, licence, storage and profiling.
//
// It returns an error when the config encryption key flag is empty or when
// the database connection options cannot be built from the flags.
func commonOptions(cmd *cobra.Command) (fx.Option, error) {
	// A failed lookup leaves the key empty, which the check below reports.
	encryptionKey, _ := cmd.Flags().GetString(ConfigEncryptionKeyFlag)
	if len(encryptionKey) == 0 {
		return nil, errors.New("missing config encryption key")
	}

	dbOptions, err := bunconnect.ConnectionOptionsFromFlags(cmd)
	if err != nil {
		return nil, fmt.Errorf("failed to get connection options: %w", err)
	}

	provideConnectionOptions := func() *bunconnect.ConnectionOptions {
		return dbOptions
	}
	provideServiceInfo := func() sharedapi.ServiceInfo {
		return sharedapi.ServiceInfo{Version: Version}
	}

	// The order below mirrors the order in which the modules are handed to fx.
	modules := []fx.Option{
		fx.Provide(provideConnectionOptions),
		otlp.FXModuleFromFlags(cmd),
		otlptraces.FXModuleFromFlags(cmd),
		otlpmetrics.FXModuleFromFlags(cmd),
		fx.Provide(metrics.RegisterMetricsRegistry),
		// Requesting the registry forces fx to construct it at startup.
		fx.Invoke(func(metrics.MetricsRegistry) {}),
		temporal.FXModuleFromFlags(
			cmd,
			engine.Tracer,
			temporal.SearchAttributes{
				SearchAttributes: engine.SearchAttributes,
			},
		),
		fx.Provide(provideServiceInfo),
		health.Module(),
		publish.FXModuleFromFlags(cmd, service.IsDebug(cmd)),
		licence.FXModuleFromFlags(cmd, ServiceName),
		storage.Module(cmd, *dbOptions, encryptionKey),
		profiling.FXModuleFromFlags(cmd),
	}

	return fx.Options(modules...), nil
}
101 changes: 3 additions & 98 deletions cmd/root.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,10 @@
package cmd

import (
"errors"
"log"
"os"

_ "github.com/bombsimon/logrusr/v3"
sharedapi "github.com/formancehq/go-libs/v2/api"
"github.com/formancehq/go-libs/v2/auth"
"github.com/formancehq/go-libs/v2/bun/bunconnect"
"github.com/formancehq/go-libs/v2/bun/bunmigrate"
"github.com/formancehq/go-libs/v2/health"
"github.com/formancehq/go-libs/v2/licence"
"github.com/formancehq/go-libs/v2/logging"
"github.com/formancehq/go-libs/v2/otlp"
"github.com/formancehq/go-libs/v2/otlp/otlpmetrics"
"github.com/formancehq/go-libs/v2/otlp/otlptraces"
"github.com/formancehq/go-libs/v2/profiling"
"github.com/formancehq/go-libs/v2/publish"
"github.com/formancehq/go-libs/v2/service"
"github.com/formancehq/go-libs/v2/temporal"
"github.com/formancehq/payments/internal/api"
v2 "github.com/formancehq/payments/internal/api/v2"
v3 "github.com/formancehq/payments/internal/api/v3"
"github.com/formancehq/payments/internal/connectors/engine"
"github.com/formancehq/payments/internal/connectors/metrics"
"github.com/formancehq/payments/internal/storage"
"github.com/spf13/cobra"
"go.uber.org/fx"
)

var (
Expand Down Expand Up @@ -63,16 +40,11 @@ func NewRootCommand() *cobra.Command {

server := newServer()
addAutoMigrateCommand(server)
server.Flags().String(ListenFlag, ":8080", "Listen address")
server.Flags().String(StackFlag, "", "Stack name")
server.Flags().String(stackPublicURLFlag, "", "Stack public url")
// MaxConcurrentWorkflowTaskPollers should not be set to a number < 2, otherwise
// temporal will panic.
// After meeting with the temporal team, we decided to set it to 20 as per
// their recommendation.
server.Flags().Int(temporalMaxConcurrentWorkflowTaskPollersFlag, 20, "Max concurrent workflow task pollers")
root.AddCommand(server)

worker := newWorker()
root.AddCommand(worker)

purge := newPurge()
purge.Flags().String(StackFlag, "", "Stack name")
root.AddCommand(purge)
Expand All @@ -94,70 +66,3 @@ func addAutoMigrateCommand(cmd *cobra.Command) {
return nil
}
}

func commonOptions(cmd *cobra.Command) (fx.Option, error) {
configEncryptionKey, _ := cmd.Flags().GetString(ConfigEncryptionKeyFlag)
if configEncryptionKey == "" {
return nil, errors.New("missing config encryption key")
}

connectionOptions, err := bunconnect.ConnectionOptionsFromFlags(cmd)
if err != nil {
return nil, err
}

listen, _ := cmd.Flags().GetString(ListenFlag)
stack, _ := cmd.Flags().GetString(StackFlag)
stackPublicURL, _ := cmd.Flags().GetString(stackPublicURLFlag)
debug, _ := cmd.Flags().GetBool(service.DebugFlag)
jsonFormatter, _ := cmd.Flags().GetBool(logging.JsonFormattingLoggerFlag)
temporalNamespace, _ := cmd.Flags().GetString(temporal.TemporalNamespaceFlag)
temporalMaxConcurrentWorkflowTaskPollers, _ := cmd.Flags().GetInt(temporalMaxConcurrentWorkflowTaskPollersFlag)

if len(os.Args) < 2 {
// this shouldn't happen as long as this function is called by a subcommand
log.Fatalf("os arguments does not contain command name: %s", os.Args)
}
rawFlags := os.Args[2:]

return fx.Options(
fx.Provide(func() *bunconnect.ConnectionOptions {
return connectionOptions
}),
fx.Provide(func() sharedapi.ServiceInfo {
return sharedapi.ServiceInfo{
Version: Version,
}
}),
otlp.FXModuleFromFlags(cmd),
otlptraces.FXModuleFromFlags(cmd),
otlpmetrics.FXModuleFromFlags(cmd),
fx.Provide(metrics.RegisterMetricsRegistry),
fx.Invoke(func(metrics.MetricsRegistry) {}),
temporal.FXModuleFromFlags(
cmd,
engine.Tracer,
temporal.SearchAttributes{
SearchAttributes: engine.SearchAttributes,
},
),
auth.FXModuleFromFlags(cmd),
health.Module(),
publish.FXModuleFromFlags(cmd, service.IsDebug(cmd)),
licence.FXModuleFromFlags(cmd, ServiceName),
storage.Module(cmd, *connectionOptions, configEncryptionKey),
api.NewModule(listen, service.IsDebug(cmd)),
profiling.FXModuleFromFlags(cmd),
engine.Module(
stack,
stackPublicURL,
temporalNamespace,
temporalMaxConcurrentWorkflowTaskPollers,
rawFlags,
debug,
jsonFormatter,
),
v2.NewModule(),
v3.NewModule(),
), nil
}
58 changes: 29 additions & 29 deletions cmd/server.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,16 @@
package cmd

import (
"github.com/bombsimon/logrusr/v3"
"fmt"

"github.com/formancehq/go-libs/v2/auth"
"github.com/formancehq/go-libs/v2/aws/iam"
"github.com/formancehq/go-libs/v2/bun/bunconnect"
"github.com/formancehq/go-libs/v2/licence"
"github.com/formancehq/go-libs/v2/otlp/otlpmetrics"
"github.com/formancehq/go-libs/v2/otlp/otlptraces"
"github.com/formancehq/go-libs/v2/profiling"
"github.com/formancehq/go-libs/v2/publish"
"github.com/formancehq/go-libs/v2/service"
"github.com/formancehq/go-libs/v2/temporal"
"github.com/sirupsen/logrus"
"github.com/formancehq/payments/internal/api"
v2 "github.com/formancehq/payments/internal/api/v2"
v3 "github.com/formancehq/payments/internal/api/v3"
"github.com/formancehq/payments/internal/connectors/engine"
"github.com/spf13/cobra"
"go.opentelemetry.io/otel"
"go.uber.org/fx"
)

func newServer() *cobra.Command {
Expand All @@ -25,35 +21,39 @@ func newServer() *cobra.Command {
SilenceUsage: true,
RunE: runServer(),
}

service.AddFlags(cmd.Flags())
otlpmetrics.AddFlags(cmd.Flags())
otlptraces.AddFlags(cmd.Flags())
auth.AddFlags(cmd.Flags())
publish.AddFlags(ServiceName, cmd.Flags())
bunconnect.AddFlags(cmd.Flags())
iam.AddFlags(cmd.Flags())
profiling.AddFlags(cmd.Flags())
temporal.AddFlags(cmd.Flags())
licence.AddFlags(cmd.Flags())

commonFlags(cmd)
return cmd
}

func runServer() func(cmd *cobra.Command, args []string) error {
return func(cmd *cobra.Command, args []string) error {
setLogger()

options, err := commonOptions(cmd)
opts := []fx.Option{}
commonOpts, err := commonOptions(cmd)
if err != nil {
return fmt.Errorf("failed to configure common options for server: %w", err)
}
opts = append(opts, commonOpts)

serverOpts, err := serverOptions(cmd)
if err != nil {
return err
return fmt.Errorf("failed to configure options for server: %w", err)
}
opts = append(opts, serverOpts)

return service.New(cmd.OutOrStdout(), options).Run(cmd)
return service.New(cmd.OutOrStdout(), fx.Options(opts...)).Run(cmd)
}
}

func setLogger() {
// Add a dedicated logger for opentelemetry in case of error
otel.SetLogger(logrusr.New(logrus.New().WithField("component", "otlp")))
// serverOptions builds the fx options specific to the API server command:
// authentication, the HTTP API module with its v2 and v3 routes, and the
// connectors engine.
//
// It returns an error when the listen or stack flag cannot be read from cmd,
// instead of silently continuing with an empty value.
func serverOptions(cmd *cobra.Command) (fx.Option, error) {
	flags := cmd.Flags()

	listen, err := flags.GetString(ListenFlag)
	if err != nil {
		return nil, fmt.Errorf("reading flag %q: %w", ListenFlag, err)
	}
	stack, err := flags.GetString(StackFlag)
	if err != nil {
		return nil, fmt.Errorf("reading flag %q: %w", StackFlag, err)
	}

	return fx.Options(
		auth.FXModuleFromFlags(cmd),
		api.NewModule(listen, service.IsDebug(cmd)),
		v2.NewModule(),
		v3.NewModule(),
		engine.Module(stack, service.IsDebug(cmd)),
	), nil
}
68 changes: 68 additions & 0 deletions cmd/worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package cmd

import (
"fmt"

"github.com/formancehq/go-libs/v2/service"
"github.com/formancehq/go-libs/v2/temporal"
"github.com/formancehq/payments/internal/worker"
"github.com/spf13/cobra"
"go.uber.org/fx"
)

// newWorker builds the "run-worker" command (alias "worker"). On top of the
// flags shared with the server it registers the temporal poller count and the
// stack public URL.
func newWorker() *cobra.Command {
	workerCmd := &cobra.Command{
		Use:          "run-worker",
		Aliases:      []string{"worker"},
		Short:        "Launch api worker",
		SilenceUsage: true,
		RunE:         runWorker(),
	}
	commonFlags(workerCmd)

	flags := workerCmd.Flags()
	// MaxConcurrentWorkflowTaskPollers must never be set below 2, otherwise
	// temporal panics. The default of 20 follows the recommendation the
	// temporal team gave us.
	flags.Int(temporalMaxConcurrentWorkflowTaskPollersFlag, 20, "Max concurrent workflow task pollers")
	flags.String(stackPublicURLFlag, "", "Stack public url")

	return workerCmd
}

// runWorker returns the cobra RunE handler for the worker command. The
// handler sets up the otel logger, combines the common and worker-specific fx
// options, and runs the resulting service.
func runWorker() func(cmd *cobra.Command, args []string) error {
	run := func(cmd *cobra.Command, _ []string) error {
		setLogger()

		shared, err := commonOptions(cmd)
		if err != nil {
			return fmt.Errorf("failed to configure common options for worker: %w", err)
		}

		specific, err := workerOptions(cmd)
		if err != nil {
			return fmt.Errorf("failed to configure options for worker: %w", err)
		}

		app := service.New(cmd.OutOrStdout(), fx.Options(shared, specific))
		return app.Run(cmd)
	}
	return run
}

// workerOptions builds the fx options specific to the worker command: a
// health-check module bound to the listen address and the worker module
// configured from the stack and temporal flags.
//
// It returns an error when any of the required flags cannot be read from cmd.
// Previously such failures were discarded and the zero value (empty string or
// 0 pollers) was passed on to the worker module.
func workerOptions(cmd *cobra.Command) (fx.Option, error) {
	flags := cmd.Flags()

	listen, err := flags.GetString(ListenFlag)
	if err != nil {
		return nil, fmt.Errorf("reading flag %q: %w", ListenFlag, err)
	}
	stack, err := flags.GetString(StackFlag)
	if err != nil {
		return nil, fmt.Errorf("reading flag %q: %w", StackFlag, err)
	}
	stackPublicURL, err := flags.GetString(stackPublicURLFlag)
	if err != nil {
		return nil, fmt.Errorf("reading flag %q: %w", stackPublicURLFlag, err)
	}
	temporalNamespace, err := flags.GetString(temporal.TemporalNamespaceFlag)
	if err != nil {
		return nil, fmt.Errorf("reading flag %q: %w", temporal.TemporalNamespaceFlag, err)
	}
	maxWorkflowTaskPollers, err := flags.GetInt(temporalMaxConcurrentWorkflowTaskPollersFlag)
	if err != nil {
		return nil, fmt.Errorf("reading flag %q: %w", temporalMaxConcurrentWorkflowTaskPollersFlag, err)
	}

	return fx.Options(
		worker.NewHealthCheckModule(listen, service.IsDebug(cmd)),
		worker.NewModule(
			stack,
			stackPublicURL,
			temporalNamespace,
			maxWorkflowTaskPollers,
			service.IsDebug(cmd),
		),
	), nil
}
Loading
Loading
0