diff --git a/.golangci.yml b/.golangci.yml index fb419181b..b64b8e2eb 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -15,9 +15,9 @@ linters-settings: - performance - style cyclop: - max-complexity: 20 + max-complexity: 30 gocyclo: - min-complexity: 20 + min-complexity: 30 goimports: local-prefixes: github.com/numary/payments govet: diff --git a/Makefile b/Makefile index 53c6879a0..0f1162921 100644 --- a/Makefile +++ b/Makefile @@ -7,3 +7,6 @@ lint: lint-fix: golangci-lint run --fix +run-tests: + go test -race -count=1 ./... + diff --git a/internal/pkg/connectors/dummypay/config.go b/internal/pkg/connectors/dummypay/config.go index 22c1d63b6..257f921a2 100644 --- a/internal/pkg/connectors/dummypay/config.go +++ b/internal/pkg/connectors/dummypay/config.go @@ -1,6 +1,7 @@ package dummypay import ( + "encoding/json" "fmt" "time" ) @@ -11,16 +12,16 @@ type Config struct { Directory string `json:"directory" yaml:"directory" bson:"directory"` // FilePollingPeriod is the period between file polling. - FilePollingPeriod time.Duration `json:"filePollingPeriod" yaml:"filePollingPeriod" bson:"filePollingPeriod"` + FilePollingPeriod Duration `json:"filePollingPeriod" yaml:"filePollingPeriod" bson:"filePollingPeriod"` // FileGenerationPeriod is the period between file generation - FileGenerationPeriod time.Duration `json:"fileGenerationPeriod" yaml:"fileGenerationPeriod" bson:"fileGenerationPeriod"` + FileGenerationPeriod Duration `json:"fileGenerationPeriod" yaml:"fileGenerationPeriod" bson:"fileGenerationPeriod"` } // String returns a string representation of the configuration. func (cfg Config) String() string { return fmt.Sprintf("directory: %s, filePollingPeriod: %s, fileGenerationPeriod: %s", - cfg.Directory, cfg.FilePollingPeriod, cfg.FileGenerationPeriod) + cfg.Directory, cfg.FilePollingPeriod.String(), cfg.FileGenerationPeriod.String()) } // Validate validates the configuration. 
@@ -44,3 +45,51 @@ func (cfg Config) Validate() error { return nil } + +// Duration is a time.Duration that is encoded in JSON as a string such as "10s". +type Duration time.Duration + +// String returns the duration formatted by time.Duration.String. +func (d Duration) String() string { + return time.Duration(d).String() +} + +// Duration returns the value as a standard time.Duration. +func (d Duration) Duration() time.Duration { + return time.Duration(d) +} + +// MarshalJSON encodes the duration as a JSON string such as "1m0s". +// It has a value receiver so that Duration fields of a struct marshaled by value +// are encoded through this method as well. +func (d Duration) MarshalJSON() ([]byte, error) { + return json.Marshal(time.Duration(d).String()) +} + +// UnmarshalJSON decodes a JSON number, taken as nanoseconds, or a JSON string +// parsed by time.ParseDuration. Any other JSON type yields ErrDurationInvalid. +func (d *Duration) UnmarshalJSON(b []byte) error { + var durationValue interface{} + + if err := json.Unmarshal(b, &durationValue); err != nil { + return err + } + + switch value := durationValue.(type) { + case float64: + *d = Duration(time.Duration(value)) + + return nil + case string: + tmp, err := time.ParseDuration(value) + if err != nil { + return err + } + + *d = Duration(tmp) + + return nil + default: + return ErrDurationInvalid + } +} diff --git a/internal/pkg/connectors/dummypay/config_test.go b/internal/pkg/connectors/dummypay/config_test.go index 8b459000b..3feed8b6c 100644 --- a/internal/pkg/connectors/dummypay/config_test.go +++ b/internal/pkg/connectors/dummypay/config_test.go @@ -14,8 +14,8 @@ func TestConfigString(t *testing.T) { config := Config{ Directory: "test", - FilePollingPeriod: time.Second, - FileGenerationPeriod: time.Minute, + FilePollingPeriod: Duration(time.Second), + FileGenerationPeriod: Duration(time.Minute), } assert.Equal(t, "directory: test, filePollingPeriod: 1s, fileGenerationPeriod: 1m0s", config.String()) diff --git a/internal/pkg/connectors/dummypay/errors.go b/internal/pkg/connectors/dummypay/errors.go index a656377ef..40c127aa5 100644 --- a/internal/pkg/connectors/dummypay/errors.go +++ b/internal/pkg/connectors/dummypay/errors.go @@ -14,4 +14,7 @@ var ( // ErrMissingTask is returned when the task is missing. ErrMissingTask = errors.New("task is not implemented") + + // ErrDurationInvalid is returned when the duration is invalid.
+ ErrDurationInvalid = errors.New("duration is invalid") ) diff --git a/internal/pkg/connectors/dummypay/loader.go b/internal/pkg/connectors/dummypay/loader.go index 41f9885bb..6ddb68e0a 100644 --- a/internal/pkg/connectors/dummypay/loader.go +++ b/internal/pkg/connectors/dummypay/loader.go @@ -31,11 +31,11 @@ const ( // ApplyDefaults applies default values to the configuration. func (l *Loader) ApplyDefaults(cfg Config) Config { if cfg.FileGenerationPeriod == 0 { - cfg.FileGenerationPeriod = defaultFileGenerationPeriod + cfg.FileGenerationPeriod = Duration(defaultFileGenerationPeriod) } if cfg.FilePollingPeriod == 0 { - cfg.FilePollingPeriod = defaultFilePollingPeriod + cfg.FilePollingPeriod = Duration(defaultFilePollingPeriod) } return cfg diff --git a/internal/pkg/connectors/dummypay/loader_test.go b/internal/pkg/connectors/dummypay/loader_test.go index fdd7b8384..c7e9ad238 100644 --- a/internal/pkg/connectors/dummypay/loader_test.go +++ b/internal/pkg/connectors/dummypay/loader_test.go @@ -21,8 +21,8 @@ func TestLoader(t *testing.T) { assert.Equal(t, connectorName, loader.Name()) assert.Equal(t, 10, loader.AllowTasks()) assert.Equal(t, Config{ - FilePollingPeriod: 10 * time.Second, - FileGenerationPeriod: 5 * time.Second, + FilePollingPeriod: Duration(10 * time.Second), + FileGenerationPeriod: Duration(5 * time.Second), }, loader.ApplyDefaults(config)) assert.EqualValues(t, newConnector(logger, config, newFS()), loader.Load(logger, config)) diff --git a/internal/pkg/connectors/dummypay/task_generate_file.go b/internal/pkg/connectors/dummypay/task_generate_file.go index edb463c5b..540587cc7 100644 --- a/internal/pkg/connectors/dummypay/task_generate_file.go +++ b/internal/pkg/connectors/dummypay/task_generate_file.go @@ -31,7 +31,7 @@ func taskGenerateFiles(config Config, fs fs) task.Task { select { case <-ctx.Done(): return nil - case <-time.After(config.FileGenerationPeriod): + case <-time.After(config.FileGenerationPeriod.Duration()): err := 
generateFile(config, fs) if err != nil { return err diff --git a/internal/pkg/connectors/dummypay/task_read_files.go b/internal/pkg/connectors/dummypay/task_read_files.go index 1d6c6ec5d..a5f21518d 100644 --- a/internal/pkg/connectors/dummypay/task_read_files.go +++ b/internal/pkg/connectors/dummypay/task_read_files.go @@ -31,7 +31,7 @@ func taskReadFiles(config Config, fs fs) task.Task { select { case <-ctx.Done(): return nil - case <-time.After(config.FilePollingPeriod): + case <-time.After(config.FilePollingPeriod.Duration()): files, err := parseFilesToIngest(config, fs) if err != nil { return fmt.Errorf("error parsing files to ingest: %w", err) diff --git a/internal/pkg/ingestion/ingester.go b/internal/pkg/ingestion/ingester.go index ccb05f77e..1c1c7b2aa 100644 --- a/internal/pkg/ingestion/ingester.go +++ b/internal/pkg/ingestion/ingester.go @@ -20,6 +20,7 @@ type BatchElement struct { Referenced payments.Referenced Payment *payments.Data Adjustment *payments.Adjustment + Metadata payments.Metadata Forward bool } @@ -60,11 +61,38 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym var update bson.M - if elem.Adjustment != nil && elem.Payment != nil { + if elem.Adjustment == nil && elem.Payment == nil { return nil, errors.New("either adjustment or payment must be provided") } - var err error + var metadataChanges payments.MetadataChanges + + if elem.Payment != nil { + ret := i.db.Collection(payments.Collection).FindOne( + ctx, + payments.Identifier{ + Referenced: elem.Referenced, + Provider: i.provider, + }) + if ret.Err() != nil && !errors.Is(ret.Err(), mongo.ErrNoDocuments) { + logger.Errorf("Error retrieving payment: %s", ret.Err()) + + return nil, fmt.Errorf("error retrieving payment: %w", ret.Err()) + } + + if ret != nil && ret.Err() == nil { + payment := payments.Payment{} + + err := ret.Decode(&payment) + if err != nil { + return nil, err + } + + metadataChanges = payment.MergeMetadata(elem.Metadata) + + elem.Metadata = 
metadataChanges.After + } + } switch { case elem.Forward && elem.Adjustment != nil: @@ -78,7 +106,7 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym "$set": bson.M{ "status": elem.Adjustment.Status, "raw": elem.Adjustment.Raw, - "data": elem.Adjustment.Date, + "date": elem.Adjustment.Date, }, } case elem.Forward && elem.Payment != nil: @@ -97,6 +125,7 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym Raw: elem.Payment.Raw, }, }, + Metadata: elem.Metadata, }, } case !elem.Forward && elem.Adjustment != nil: @@ -166,6 +195,17 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym return nil, err } + if metadataChanges.HasChanged() { + logger.WithFields(map[string]interface{}{ + "metadata": payment.Metadata, + }).Debugf("Metadata changed") + + _, err = i.db.Collection(payments.MetadataChangelogCollection).InsertOne(ctx, metadataChanges) + if err != nil { + return nil, err + } + } + allPayments = append(allPayments, payment) } diff --git a/internal/pkg/ingestion/ingester_test.go b/internal/pkg/ingestion/ingester_test.go index 54aa95c98..9c8b94527 100644 --- a/internal/pkg/ingestion/ingester_test.go +++ b/internal/pkg/ingestion/ingester_test.go @@ -26,6 +26,14 @@ func TestIngester(t *testing.T) { ingester := NewDefaultIngester(provider, uuid.New(), mt.DB, sharedlogging.NewNoOpLogger(), nil) mt.AddMockResponses( + mtest.CreateCursorResponse(1, "test.test", mtest.FirstBatch, bson.D{ + {Key: "ok", Value: 1}, + {Key: "value", Value: bson.D{}}, + }), + bson.D{ + {Key: "ok", Value: 1}, + {Key: "value", Value: bson.D{}}, + }, // Find payment update bson.D{ {Key: "ok", Value: 1}, {Key: "value", Value: bson.D{}}, @@ -51,6 +59,7 @@ func TestIngester(t *testing.T) { }, State{ Counter: 1, }) + require.NoError(t, err) }) } diff --git a/internal/pkg/payments/collections.go b/internal/pkg/payments/collections.go index 665adb0f5..041128e3f 100644 --- a/internal/pkg/payments/collections.go 
+++ b/internal/pkg/payments/collections.go @@ -1,7 +1,8 @@ package payments const ( - Collection = "Payments" - ConnectorsCollection = "Connectors" - TasksCollection = "Tasks" + Collection = "Payments" + ConnectorsCollection = "Connectors" + TasksCollection = "Tasks" + MetadataChangelogCollection = "MetadataChangelog" ) diff --git a/internal/pkg/payments/metadata.go b/internal/pkg/payments/metadata.go new file mode 100644 index 000000000..ce0c216a5 --- /dev/null +++ b/internal/pkg/payments/metadata.go @@ -0,0 +1,74 @@ +package payments + +import ( + "fmt" + "time" +) + +// Metadata holds arbitrary key/value pairs attached to a payment. +type Metadata map[string]any + +// MetadataChanges records the metadata of a payment before and after a merge. +type MetadataChanges struct { + PaymentID string `json:"paymentID" bson:"paymentID"` + OccurredAt time.Time `json:"occurredAt" bson:"occurredAt"` + Before Metadata `json:"before" bson:"before"` + After Metadata `json:"after" bson:"after"` +} + +// HasChanged reports whether Before and After differ. It returns false when Before is nil. +func (mc MetadataChanges) HasChanged() bool { + if mc.Before == nil { + return false + } + + return !mc.Before.Equal(mc.After) +} + +// MergeMetadata copies every entry of metadata into the payment's metadata, overwriting existing keys, +// and returns snapshots of the metadata taken before and after the merge. +func (p *Payment) MergeMetadata(metadata Metadata) MetadataChanges { + changes := MetadataChanges{ + PaymentID: p.Identifier.String(), + OccurredAt: time.Now(), + Before: copyMap(p.Metadata), + } + + if p.Metadata == nil { + p.Metadata = make(Metadata) + } + + for key, value := range metadata { + p.Metadata[key] = value + } + + // Snapshot the merged map so later changes to p.Metadata cannot alter the returned record. + changes.After = copyMap(p.Metadata) + + return changes +} + +// Equal reports whether both maps have the same keys and every pair of values has the same fmt.Sprint output. +func (m Metadata) Equal(comparableMetadata Metadata) bool { + if len(m) != len(comparableMetadata) { + return false + } + + for key, value := range m { + if v, ok := comparableMetadata[key]; !ok || fmt.Sprint(v) != fmt.Sprint(value) { + return false + } + } + + return true +} + +// copyMap returns a new map holding the entries of m; the result is never nil. +func copyMap[K string, V any](m map[K]V) map[K]V { + result := make(map[K]V, len(m)) + for k, v := range m { + result[k] = v + } + + return result +} diff --git a/internal/pkg/payments/payment.go b/internal/pkg/payments/payment.go index a2be2c180..9ff067cc5 100644 --- a/internal/pkg/payments/payment.go +++ b/internal/pkg/payments/payment.go @@ -105,6
+105,7 @@ type Payment struct { Identifier `bson:",inline"` Data `bson:",inline"` Adjustments []Adjustment `json:"adjustments" bson:"adjustments"` + Metadata Metadata `json:"metadata" bson:"metadata"` } func (p Payment) HasInitialValue() bool {