From 731dc93c3f066bd4714fb8fceb37cc080b90bf19 Mon Sep 17 00:00:00 2001 From: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com> Date: Wed, 26 Oct 2022 14:26:20 +0300 Subject: [PATCH 1/3] feat: payments metadata Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com> --- internal/pkg/connectors/dummypay/config.go | 41 +++++++++++- internal/pkg/connectors/dummypay/loader.go | 4 +- .../connectors/dummypay/task_generate_file.go | 2 +- .../connectors/dummypay/task_read_files.go | 2 +- internal/pkg/ingestion/ingester.go | 52 +++++++++++++-- internal/pkg/payments/collections.go | 7 +- internal/pkg/payments/metadata.go | 66 +++++++++++++++++++ internal/pkg/payments/payment.go | 1 + 8 files changed, 161 insertions(+), 14 deletions(-) create mode 100644 internal/pkg/payments/metadata.go diff --git a/internal/pkg/connectors/dummypay/config.go b/internal/pkg/connectors/dummypay/config.go index 22c1d63b6..81cce6af6 100644 --- a/internal/pkg/connectors/dummypay/config.go +++ b/internal/pkg/connectors/dummypay/config.go @@ -1,7 +1,9 @@ package dummypay import ( + "encoding/json" "fmt" + "github.com/pkg/errors" "time" ) @@ -11,10 +13,10 @@ type Config struct { Directory string `json:"directory" yaml:"directory" bson:"directory"` // FilePollingPeriod is the period between file polling. - FilePollingPeriod time.Duration `json:"filePollingPeriod" yaml:"filePollingPeriod" bson:"filePollingPeriod"` + FilePollingPeriod Duration `json:"filePollingPeriod" yaml:"filePollingPeriod" bson:"filePollingPeriod"` // FileGenerationPeriod is the period between file generation - FileGenerationPeriod time.Duration `json:"fileGenerationPeriod" yaml:"fileGenerationPeriod" bson:"fileGenerationPeriod"` + FileGenerationPeriod Duration `json:"fileGenerationPeriod" yaml:"fileGenerationPeriod" bson:"fileGenerationPeriod"` } // String returns a string representation of the configuration. @@ -44,3 +46,38 @@ func (cfg Config) Validate() error { return nil } + +type Duration time.Duration + +func (d *Duration) Duration() time.Duration { + return time.Duration(*d) +} + +func (d *Duration) MarshalJSON() ([]byte, error) { + return json.Marshal(time.Duration(*d).String()) +} + +func (d *Duration) UnmarshalJSON(b []byte) error { + var v interface{} + + if err := json.Unmarshal(b, &v); err != nil { + return err + } + + switch value := v.(type) { + case float64: + *d = Duration(time.Duration(value)) + return nil + case string: + tmp, err := time.ParseDuration(value) + if err != nil { + return err + } + + *d = Duration(tmp) + + return nil + default: + return errors.New("invalid duration") + } +} diff --git a/internal/pkg/connectors/dummypay/loader.go b/internal/pkg/connectors/dummypay/loader.go index 41f9885bb..6ddb68e0a 100644 --- a/internal/pkg/connectors/dummypay/loader.go +++ b/internal/pkg/connectors/dummypay/loader.go @@ -31,11 +31,11 @@ const ( // ApplyDefaults applies default values to the configuration. func (l *Loader) ApplyDefaults(cfg Config) Config { if cfg.FileGenerationPeriod == 0 { - cfg.FileGenerationPeriod = defaultFileGenerationPeriod + cfg.FileGenerationPeriod = Duration(defaultFileGenerationPeriod) } if cfg.FilePollingPeriod == 0 { - cfg.FilePollingPeriod = defaultFilePollingPeriod + cfg.FilePollingPeriod = Duration(defaultFilePollingPeriod) } return cfg diff --git a/internal/pkg/connectors/dummypay/task_generate_file.go b/internal/pkg/connectors/dummypay/task_generate_file.go index edb463c5b..540587cc7 100644 --- a/internal/pkg/connectors/dummypay/task_generate_file.go +++ b/internal/pkg/connectors/dummypay/task_generate_file.go @@ -31,7 +31,7 @@ func taskGenerateFiles(config Config, fs fs) task.Task { select { case <-ctx.Done(): return nil - case <-time.After(config.FileGenerationPeriod): + case <-time.After(config.FileGenerationPeriod.Duration()): err := generateFile(config, fs) if err != nil { return err diff --git a/internal/pkg/connectors/dummypay/task_read_files.go b/internal/pkg/connectors/dummypay/task_read_files.go index 1d6c6ec5d..a5f21518d 100644 --- a/internal/pkg/connectors/dummypay/task_read_files.go +++ b/internal/pkg/connectors/dummypay/task_read_files.go @@ -31,7 +31,7 @@ func taskReadFiles(config Config, fs fs) task.Task { select { case <-ctx.Done(): return nil - case <-time.After(config.FilePollingPeriod): + case <-time.After(config.FilePollingPeriod.Duration()): files, err := parseFilesToIngest(config, fs) if err != nil { return fmt.Errorf("error parsing files to ingest: %w", err) diff --git a/internal/pkg/ingestion/ingester.go b/internal/pkg/ingestion/ingester.go index ccb05f77e..0dba2fbd4 100644 --- a/internal/pkg/ingestion/ingester.go +++ b/internal/pkg/ingestion/ingester.go @@ -20,6 +20,7 @@ type BatchElement struct { Referenced payments.Referenced Payment *payments.Data Adjustment *payments.Adjustment + Metadata payments.Metadata Forward bool } @@ -60,10 +61,39 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym var update bson.M - if elem.Adjustment != nil && elem.Payment != nil { + if elem.Adjustment == nil && elem.Payment == nil { return nil, errors.New("either adjustment or payment must be provided") } + var metadataChanges payments.MetadataChanges + + if elem.Payment != nil { + ret := i.db.Collection(payments.Collection).FindOne( + ctx, + payments.Identifier{ + Referenced: elem.Referenced, + Provider: i.provider, + }) + if ret.Err() != nil && ret.Err() != mongo.ErrNoDocuments { + logger.Errorf("Error retrieving payment: %s", ret.Err()) + + return nil, fmt.Errorf("error retrieving payment: %w", ret.Err()) + } + + if ret.Err() == nil { + payment := payments.Payment{} + + err := ret.Decode(&payment) + if err != nil { + return nil, err + } + + metadataChanges = payment.MergeMetadata(elem.Metadata) + + elem.Metadata = metadataChanges.After + } + } + var err error switch { @@ -78,7 +108,7 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym "$set": bson.M{ "status": elem.Adjustment.Status, "raw": elem.Adjustment.Raw, - "data": elem.Adjustment.Date, + "date": elem.Adjustment.Date, }, } case elem.Forward && elem.Payment != nil: @@ -97,6 +127,7 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym Raw: elem.Payment.Raw, }, }, + Metadata: elem.Metadata, }, } case !elem.Forward && elem.Adjustment != nil: @@ -153,10 +184,10 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym update, options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After), ) - if ret.Err() != nil { - logger.Errorf("Error updating payment: %s", ret.Err()) + if err != nil { + logger.Errorf("Error updating payment: %s", err) - return nil, fmt.Errorf("error updating payment: %w", ret.Err()) + return nil, fmt.Errorf("error updating payment: %w", err) } payment := payments.Payment{} @@ -166,6 +197,17 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym return nil, err } + if metadataChanges.HasChanged() { + logger.WithFields(map[string]interface{}{ + "metadata": payment.Metadata, + }).Debugf("Metadata changed") + + _, err = i.db.Collection(payments.MetadataChangelogCollection).InsertOne(ctx, metadataChanges) + if err != nil { + return nil, err + } + } + allPayments = append(allPayments, payment) } diff --git a/internal/pkg/payments/collections.go b/internal/pkg/payments/collections.go index 665adb0f5..041128e3f 100644 --- a/internal/pkg/payments/collections.go +++ b/internal/pkg/payments/collections.go @@ -1,7 +1,8 @@ package payments const ( - Collection = "Payments" - ConnectorsCollection = "Connectors" - TasksCollection = "Tasks" + Collection = "Payments" + ConnectorsCollection = "Connectors" + TasksCollection = "Tasks" + MetadataChangelogCollection = "MetadataChangelog" ) diff --git a/internal/pkg/payments/metadata.go b/internal/pkg/payments/metadata.go new file mode 100644 index 000000000..5708a1a5d --- /dev/null +++ b/internal/pkg/payments/metadata.go @@ -0,0 +1,66 @@ +package payments + +import ( + "fmt" + "time" +) + +type Metadata map[string]any + +type MetadataChanges struct { + PaymentID string `json:"paymentID" bson:"paymentID"` + OccurredAt time.Time `json:"occurredAt" bson:"occurredAt"` + Before Metadata `json:"before" bson:"before"` + After Metadata `json:"after" bson:"after"` +} + +func (mc MetadataChanges) HasChanged() bool { + if mc.Before == nil { + return false + } + + return !mc.Before.Equal(mc.After) +} + +func (p *Payment) MergeMetadata(metadata Metadata) MetadataChanges { + changes := MetadataChanges{ + PaymentID: p.Identifier.String(), + OccurredAt: time.Now(), + Before: copyMap(p.Metadata), + } + + if p.Metadata == nil { + p.Metadata = make(Metadata) + } + + for key, value := range metadata { + p.Metadata[key] = value + } + + changes.After = p.Metadata + + return changes +} + +func (m Metadata) Equal(comparable Metadata) bool { + if len(m) != len(comparable) { + return false + } + + for key, value := range m { + if v, ok := comparable[key]; !ok || fmt.Sprint(v) != fmt.Sprint(value) { + return false + } + } + + return true +} + +func copyMap[K string, V any](m map[K]V) map[K]V { + result := make(map[K]V) + for k, v := range m { + result[k] = v + } + + return result +} diff --git a/internal/pkg/payments/payment.go b/internal/pkg/payments/payment.go index a2be2c180..9ff067cc5 100644 --- a/internal/pkg/payments/payment.go +++ b/internal/pkg/payments/payment.go @@ -105,6 +105,7 @@ type Payment struct { Identifier `bson:",inline"` Data `bson:",inline"` Adjustments []Adjustment `json:"adjustments" bson:"adjustments"` + Metadata Metadata `json:"metadata" bson:"metadata"` } func (p Payment) HasInitialValue() bool { From 0a6ee3a4dbf19620ca51acbce2618d812909f433 Mon Sep 17 00:00:00 2001 From: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com> Date: Wed, 26 Oct 2022 14:33:30 +0300 Subject: [PATCH 2/3] fix: lint & tests Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com> --- .golangci.yml | 4 ++-- internal/pkg/connectors/dummypay/config.go | 16 ++++++++++------ internal/pkg/connectors/dummypay/config_test.go | 4 ++-- internal/pkg/connectors/dummypay/errors.go | 3 +++ internal/pkg/connectors/dummypay/loader_test.go | 4 ++-- internal/pkg/ingestion/ingester.go | 8 ++++---- internal/pkg/payments/metadata.go | 6 +++--- 7 files changed, 26 insertions(+), 19 deletions(-) diff --git a/.golangci.yml b/.golangci.yml index fb419181b..b64b8e2eb 100644 --- a/.golangci.yml +++ b/.golangci.yml @@ -15,9 +15,9 @@ linters-settings: - performance - style cyclop: - max-complexity: 20 + max-complexity: 30 gocyclo: - min-complexity: 20 + min-complexity: 30 goimports: local-prefixes: github.com/numary/payments govet: diff --git a/internal/pkg/connectors/dummypay/config.go b/internal/pkg/connectors/dummypay/config.go index 81cce6af6..257f921a2 100644 --- a/internal/pkg/connectors/dummypay/config.go +++ b/internal/pkg/connectors/dummypay/config.go @@ -3,7 +3,6 @@ package dummypay import ( "encoding/json" "fmt" - "github.com/pkg/errors" "time" ) @@ -22,7 +21,7 @@ type Config struct { // String returns a string representation of the configuration. func (cfg Config) String() string { return fmt.Sprintf("directory: %s, filePollingPeriod: %s, fileGenerationPeriod: %s", - cfg.Directory, cfg.FilePollingPeriod, cfg.FileGenerationPeriod) + cfg.Directory, cfg.FilePollingPeriod.String(), cfg.FileGenerationPeriod.String()) } // Validate validates the configuration. @@ -49,6 +48,10 @@ func (cfg Config) Validate() error { type Duration time.Duration +func (d *Duration) String() string { + return time.Duration(*d).String() +} + func (d *Duration) Duration() time.Duration { return time.Duration(*d) } @@ -58,15 +61,16 @@ func (d *Duration) MarshalJSON() ([]byte, error) { } func (d *Duration) UnmarshalJSON(b []byte) error { - var v interface{} + var durationValue interface{} - if err := json.Unmarshal(b, &v); err != nil { + if err := json.Unmarshal(b, &durationValue); err != nil { return err } - switch value := v.(type) { + switch value := durationValue.(type) { case float64: *d = Duration(time.Duration(value)) + return nil case string: tmp, err := time.ParseDuration(value) @@ -78,6 +82,6 @@ func (d *Duration) UnmarshalJSON(b []byte) error { return nil default: - return errors.New("invalid duration") + return ErrDurationInvalid } } diff --git a/internal/pkg/connectors/dummypay/config_test.go b/internal/pkg/connectors/dummypay/config_test.go index 8b459000b..3feed8b6c 100644 --- a/internal/pkg/connectors/dummypay/config_test.go +++ b/internal/pkg/connectors/dummypay/config_test.go @@ -14,8 +14,8 @@ func TestConfigString(t *testing.T) { config := Config{ Directory: "test", - FilePollingPeriod: time.Second, - FileGenerationPeriod: time.Minute, + FilePollingPeriod: Duration(time.Second), + FileGenerationPeriod: Duration(time.Minute), } assert.Equal(t, "directory: test, filePollingPeriod: 1s, fileGenerationPeriod: 1m0s", config.String()) diff --git a/internal/pkg/connectors/dummypay/errors.go b/internal/pkg/connectors/dummypay/errors.go index a656377ef..40c127aa5 100644 --- a/internal/pkg/connectors/dummypay/errors.go +++ b/internal/pkg/connectors/dummypay/errors.go @@ -14,4 +14,7 @@ var ( // ErrMissingTask is returned when the task is missing. ErrMissingTask = errors.New("task is not implemented") + + // ErrDurationInvalid is returned when the duration is invalid. + ErrDurationInvalid = errors.New("duration is invalid") ) diff --git a/internal/pkg/connectors/dummypay/loader_test.go b/internal/pkg/connectors/dummypay/loader_test.go index fdd7b8384..c7e9ad238 100644 --- a/internal/pkg/connectors/dummypay/loader_test.go +++ b/internal/pkg/connectors/dummypay/loader_test.go @@ -21,8 +21,8 @@ func TestLoader(t *testing.T) { assert.Equal(t, connectorName, loader.Name()) assert.Equal(t, 10, loader.AllowTasks()) assert.Equal(t, Config{ - FilePollingPeriod: 10 * time.Second, - FileGenerationPeriod: 5 * time.Second, + FilePollingPeriod: Duration(10 * time.Second), + FileGenerationPeriod: Duration(5 * time.Second), }, loader.ApplyDefaults(config)) assert.EqualValues(t, newConnector(logger, config, newFS()), loader.Load(logger, config)) diff --git a/internal/pkg/ingestion/ingester.go b/internal/pkg/ingestion/ingester.go index 0dba2fbd4..ea55fddc8 100644 --- a/internal/pkg/ingestion/ingester.go +++ b/internal/pkg/ingestion/ingester.go @@ -74,7 +74,7 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym Referenced: elem.Referenced, Provider: i.provider, }) - if ret.Err() != nil && ret.Err() != mongo.ErrNoDocuments { + if ret.Err() != nil && errors.Is(ret.Err(), mongo.ErrNoDocuments) { logger.Errorf("Error retrieving payment: %s", ret.Err()) return nil, fmt.Errorf("error retrieving payment: %w", ret.Err()) @@ -184,10 +184,10 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym update, options.FindOneAndUpdate().SetUpsert(true).SetReturnDocument(options.After), ) - if err != nil { - logger.Errorf("Error updating payment: %s", err) + if ret.Err() != nil { + logger.Errorf("Error updating payment: %s", ret.Err()) - return nil, fmt.Errorf("error updating payment: %w", err) + return nil, fmt.Errorf("error updating payment: %w", ret.Err()) } payment := payments.Payment{} diff --git a/internal/pkg/payments/metadata.go b/internal/pkg/payments/metadata.go index 5708a1a5d..ce0c216a5 100644 --- a/internal/pkg/payments/metadata.go +++ b/internal/pkg/payments/metadata.go @@ -42,13 +42,13 @@ func (p *Payment) MergeMetadata(metadata Metadata) MetadataChanges { return changes } -func (m Metadata) Equal(comparable Metadata) bool { - if len(m) != len(comparable) { +func (m Metadata) Equal(comparableMetadata Metadata) bool { + if len(m) != len(comparableMetadata) { return false } for key, value := range m { - if v, ok := comparable[key]; !ok || fmt.Sprint(v) != fmt.Sprint(value) { + if v, ok := comparableMetadata[key]; !ok || fmt.Sprint(v) != fmt.Sprint(value) { return false } } From 42e78a18d355bef6883be49390b6d7f7971f99a0 Mon Sep 17 00:00:00 2001 From: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com> Date: Wed, 26 Oct 2022 15:30:17 +0300 Subject: [PATCH 3/3] fix: tests Signed-off-by: Lawrence Zawila <113581282+darkmatterpool@users.noreply.github.com> --- Makefile | 3 +++ internal/pkg/ingestion/ingester.go | 6 ++---- internal/pkg/ingestion/ingester_test.go | 9 +++++++++ 3 files changed, 14 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index 53c6879a0..0f1162921 100644 --- a/Makefile +++ b/Makefile @@ -7,3 +7,6 @@ lint: lint-fix: golangci-lint run --fix +run-tests: + go test -race -count=1 ./... + diff --git a/internal/pkg/ingestion/ingester.go b/internal/pkg/ingestion/ingester.go index ea55fddc8..1c1c7b2aa 100644 --- a/internal/pkg/ingestion/ingester.go +++ b/internal/pkg/ingestion/ingester.go @@ -74,13 +74,13 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym Referenced: elem.Referenced, Provider: i.provider, }) - if ret.Err() != nil && errors.Is(ret.Err(), mongo.ErrNoDocuments) { + if ret.Err() != nil && !errors.Is(ret.Err(), mongo.ErrNoDocuments) { logger.Errorf("Error retrieving payment: %s", ret.Err()) return nil, fmt.Errorf("error retrieving payment: %w", ret.Err()) } - if ret.Err() == nil { + if ret != nil && ret.Err() == nil { payment := payments.Payment{} err := ret.Decode(&payment) @@ -94,8 +94,6 @@ func (i *DefaultIngester) processBatch(ctx context.Context, batch Batch) ([]paym } } - var err error - switch { case elem.Forward && elem.Adjustment != nil: update = bson.M{ diff --git a/internal/pkg/ingestion/ingester_test.go b/internal/pkg/ingestion/ingester_test.go index 54aa95c98..9c8b94527 100644 --- a/internal/pkg/ingestion/ingester_test.go +++ b/internal/pkg/ingestion/ingester_test.go @@ -26,6 +26,14 @@ func TestIngester(t *testing.T) { ingester := NewDefaultIngester(provider, uuid.New(), mt.DB, sharedlogging.NewNoOpLogger(), nil) mt.AddMockResponses( + mtest.CreateCursorResponse(1, "test.test", mtest.FirstBatch, bson.D{ + {Key: "ok", Value: 1}, + {Key: "value", Value: bson.D{}}, + }), + bson.D{ + {Key: "ok", Value: 1}, + {Key: "value", Value: bson.D{}}, + }, // Find payment update bson.D{ {Key: "ok", Value: 1}, {Key: "value", Value: bson.D{}}, @@ -51,6 +59,7 @@ func TestIngester(t *testing.T) { }, State{ Counter: 1, }) + require.NoError(t, err) }) }