feat: (v3) split worker from api #223

Conversation
Walkthrough

This pull request introduces a comprehensive restructuring of the payments application's architecture, focusing on separating the worker functionality into a distinct component. The changes involve modifying the Earthfile for image building, updating command-line interfaces, and refactoring the internal engine and worker management systems. The primary goal is to enable running the payments worker as a separate pod from the main API service, improving scalability and resource management.
Sequence Diagram

```mermaid
sequenceDiagram
    participant API as Payments API
    participant Worker as Payments Worker
    participant Temporal as Temporal Service
    API->>Temporal: Register Workflows
    Worker->>Temporal: Poll and Execute Workflows
    Worker->>API: Report Workflow Status
```
@coderabbitai review

✅ Actions performed: Review triggered.
Actionable comments posted: 3
🔭 Outside diff range comments (2)
internal/connectors/engine/workers.go (2)
Line range hint 210-217: Stop top-level worker gracefully

Close() locks the worker pool and stops all workers. Confirm that any in-process tasks complete or shut down gracefully. You may want to wait until all tasks have confirmed closure, or give them a short timeout to finalize; a sketch follows.
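For illustration, a minimal sketch of a bounded shutdown. The pool shape here is hypothetical (in particular the `tasks sync.WaitGroup` tracking in-flight work); the actual fields in workers.go may differ.

```go
package engine

import (
	"errors"
	"sync"
	"time"
)

// Hypothetical pool shape, for illustration only.
type WorkerPool struct {
	rwMutex sync.RWMutex
	workers map[string]*Worker
	tasks   sync.WaitGroup // incremented for each in-flight task
}

type Worker struct{ stop chan struct{} }

func (w *Worker) Stop() { close(w.stop) }

// Close signals every worker to stop, then waits for in-flight tasks
// with a short timeout so shutdown cannot hang indefinitely.
func (p *WorkerPool) Close() error {
	p.rwMutex.Lock()
	for _, w := range p.workers {
		w.Stop()
	}
	p.rwMutex.Unlock()

	done := make(chan struct{})
	go func() {
		p.tasks.Wait() // blocks until all in-flight tasks finish
		close(done)
	}()

	select {
	case <-done:
		return nil
	case <-time.After(5 * time.Second):
		return errors.New("worker pool: timed out waiting for in-flight tasks")
	}
}
```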
Line range hint 225-265: Check for concurrency in AddWorker

You correctly lock the map while adding a new worker. However, the worker is launched in a goroutine inside the same lock. That's fine if startup is quick, but if worker.Start() is slow to return, consider releasing the lock first, or ensure the call is non-blocking; see the sketch below.
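A sketch of narrowing the critical section to the map mutation, continuing the hypothetical types from the previous sketch (plus `fmt`); the AddWorker signature and the stubbed Start() are stand-ins for the real code.

```go
// Start is a stand-in for the real worker startup (possibly slow).
func (w *Worker) Start() error { return nil }

// AddWorker narrows the critical section to the map mutation, so a
// slow Start() cannot hold the pool lock.
func (p *WorkerPool) AddWorker(name string, w *Worker) error {
	p.rwMutex.Lock()
	if _, exists := p.workers[name]; exists {
		p.rwMutex.Unlock()
		return fmt.Errorf("worker %q already registered", name)
	}
	p.workers[name] = w
	p.rwMutex.Unlock() // release before the potentially slow Start()

	go func() {
		if err := w.Start(); err != nil {
			// report via a logger or error channel in the real code
			_ = err
		}
	}()
	return nil
}
```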
🧹 Nitpick comments (15)
pkg/testserver/worker.go (3)
26-36: Consider separating worker initialization from immediate startup

The constructor here invokes the worker's Start method immediately, making it impossible to instantiate the worker without starting it. While this may be convenient in tests, it could limit future scenarios where you'd want to defer starting the worker or handle startup separately.
48-70: Channel buffering limitation

You use a buffered channel of size 1 (errorChan). If multiple errors occur in quick succession, only the first error is captured, and subsequent errors might be lost. Consider using a larger buffer, or draining the channel if preserving additional errors is important; a sketch follows.
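If later errors matter, a non-blocking send into a larger buffer avoids both losing errors and stalling the producer. A minimal sketch with illustrative names (errBuffer and reportErr are not from the PR):

```go
package testserver

// errBuffer is an illustrative size; tune to how many errors a test
// realistically needs to observe.
const errBuffer = 16

func newErrorChan() chan error {
	return make(chan error, errBuffer)
}

// reportErr never blocks: if the buffer is full, the error is dropped
// (or could be logged) instead of stalling the worker goroutine.
func reportErr(errCh chan error, err error) {
	select {
	case errCh <- err:
	default:
		// buffer full: drop or log; never block the sender
	}
}
```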
96-101: Potential indefinite stop

Stopping relies on service.Stopped(w.ctx) to proceed. If the worker never reaches a "stopped" state, this will block until the given ctx times out. This might be acceptable in a test environment, but be mindful of such indefinite waits in future expansions.

pkg/testserver/client.go (1)
24-24: Make timeout configurable

Hardcoding a 2-second timeout ensures some protection but may need to be tweaked for different environments or tests. Consider exposing it through the Configuration struct or an environment variable to avoid magic numbers.

pkg/testserver/server.go (1)
198-200: Implicit worker dependence

Calling NewWorker inside New() ties the server and worker lifecycles together tightly. If a more modular approach is needed later (e.g., separate test servers or mocking the worker), consider decoupling them. For now, this consolidated approach appears practical for integration tests.

Also applies to: 202-205
cmd/root.go (1)
45-47: Add a brief usage description for the worker command

Defining the worker command is good, but consider adding a short description and usage examples. This helps users understand its purpose and how to run it effectively; see the sketch below.
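A sketch of what that metadata could look like on the cobra command. The wording, the Example string, and the runWorker/commonFlags wiring are illustrative, not the PR's actual code.

```go
package cmd

import "github.com/spf13/cobra"

// newWorkerCommand sketches the suggested usage metadata.
func newWorkerCommand() *cobra.Command {
	cmd := &cobra.Command{
		Use:   "worker",
		Short: "Run the payments worker as a standalone process",
		Long: "Starts only the Temporal worker component, polling task queues " +
			"and executing workflows, so it can be deployed in its own pod, " +
			"separate from the API.",
		Example: "payments worker --stack my-stack",
		RunE:    runWorker, // the PR's existing run function
	}
	commonFlags(cmd)
	return cmd
}
```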
cmd/server.go (2)

25-25: Check for consistent flag definitions

This call to commonFlags(cmd) suggests a centralized approach to configuring flags. Verify that only relevant flags are included for a server context, and consider removing any flags that aren't needed by the server.
33-46: Leverage typed options slices

Using opts := []fx.Option{} is a flexible approach. However, ensuring each appended option is typed or carefully validated helps avoid silent runtime mismatches. You might wrap each set of server, worker, or logging options in distinct typed constructs for clarity.

internal/connectors/engine/module.go (1)
Line range hint 44-80: Clarify worker-related parameter usage

The WorkerModule function receives multiple parameters like stackURL and temporalMaxConcurrentWorkflowTaskPollers. Ensure this method's internal needs match the parameters. If some are not used or validated, consider removing them or adding inline documentation to clarify usage.

internal/connectors/engine/workers.go (4)
43-61: Provide more context in error logs

In NewWorkerPool, you just create the pool. Consider logging or tracing the configuration state (e.g., debug mode on/off, stack value) for troubleshooting.
68-113: Prevent blocking calls within ListenConnectorsChanges

When calling OnStart, you register a listener for connector changes. If ListenConnectorsChanges blocks significantly, it can prevent the rest of the startup sequence from completing. Confirm it's either non-blocking or well-scoped for performance; a sketch of one option follows.
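One common pattern is to keep the OnStart hook itself non-blocking and run the listener in a goroutine bound to a cancellable context. A sketch under the assumption that ListenConnectorsChanges blocks until its context is cancelled:

```go
package engine

import (
	"context"

	"go.uber.org/fx"
)

// listenerModule keeps the OnStart hook non-blocking: the listener
// runs in its own goroutine and is cancelled when the app stops.
func listenerModule() fx.Option {
	return fx.Invoke(func(lc fx.Lifecycle, pool *WorkerPool) {
		ctx, cancel := context.WithCancel(context.Background())
		lc.Append(fx.Hook{
			OnStart: func(context.Context) error {
				go pool.ListenConnectorsChanges(ctx) // hook returns immediately
				return nil
			},
			OnStop: func(context.Context) error {
				cancel() // stop the listener along with the app
				return nil
			},
		})
	})
}
```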
174-194: Add a retry mechanism for deletion

onUpdatePlugin handles deletion scheduling by removing the worker. If an error occurs or if the workflow is partially uninstalled, consider a retry or fallback path (like a queue or re-check interval) to ensure eventual consistency; see the sketch after this comment.
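A sketch of a bounded retry with exponential backoff around worker removal. The attempt count, the delays, and the RemoveWorker signature are assumptions; only the method name comes from the review.

```go
package engine

import (
	"context"
	"fmt"
	"time"
)

// removeWorkerWithRetry retries removal with exponential backoff so a
// transient failure does not leave a half-deleted connector behind.
func removeWorkerWithRetry(ctx context.Context, pool *WorkerPool, name string) error {
	var err error
	delay := time.Second
	for attempt := 0; attempt < 5; attempt++ {
		if err = pool.RemoveWorker(name); err == nil {
			return nil
		}
		select {
		case <-time.After(delay):
			delay *= 2 // back off before the next attempt
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return fmt.Errorf("removing worker %q after retries: %w", name, err)
}
```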
219-221: Assess necessity of the default worker

The AddDefaultWorker approach is handy, but confirm it's appropriate for all scenarios. Some connectors might still need specialized workers, or no default if the system is fully microservice-based. Evaluate whether the default queue masks potential connector-specific tasks.

cmd/worker.go (1)
31-50: runWorker function with custom error handling

Solid approach to building an fx-based service. Ensure that potential panics caused by misconfigurations (like invalid poller counts) are handled.

cmd/common.go (1)
33-46: Consider grouping related flags for better organization

The flags setup is comprehensive but could benefit from logical grouping. Consider organizing flags into categories (e.g., service, telemetry, security) using comments or helper functions.

Example structure:

```diff
 func commonFlags(cmd *cobra.Command) {
+	// Basic service configuration
 	cmd.Flags().String(StackFlag, "", "Stack name")
 	cmd.Flags().String(ListenFlag, ":8080", "Listen address")
 	service.AddFlags(cmd.Flags())
+
+	// Telemetry configuration
 	otlpmetrics.AddFlags(cmd.Flags())
 	otlptraces.AddFlags(cmd.Flags())
+
+	// Security and access control
 	auth.AddFlags(cmd.Flags())
 	iam.AddFlags(cmd.Flags())
+
+	// Infrastructure and services
 	publish.AddFlags(ServiceName, cmd.Flags())
 	bunconnect.AddFlags(cmd.Flags())
 	temporal.AddFlags(cmd.Flags())
+
+	// Development and monitoring
 	profiling.AddFlags(cmd.Flags())
 	licence.AddFlags(cmd.Flags())
 }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
⛔ Files ignored due to path filters (1)
- docker-compose.yml is excluded by `!**/*.yml`
📒 Files selected for processing (17)

- Earthfile (2 hunks)
- cmd/common.go (1 hunks)
- cmd/root.go (1 hunks)
- cmd/server.go (2 hunks)
- cmd/worker.go (1 hunks)
- internal/connectors/engine/engine.go (18 hunks)
- internal/connectors/engine/errors.go (1 hunks)
- internal/connectors/engine/module.go (2 hunks)
- internal/connectors/engine/plugins/plugin.go (2 hunks)
- internal/connectors/engine/util.go (1 hunks)
- internal/connectors/engine/webhooks/webhooks.go (0 hunks)
- internal/connectors/engine/workers.go (3 hunks)
- internal/connectors/engine/workflow/workflow.go (1 hunks)
- pkg/testserver/client.go (1 hunks)
- pkg/testserver/flags.go (1 hunks)
- pkg/testserver/server.go (3 hunks)
- pkg/testserver/worker.go (1 hunks)
💤 Files with no reviewable changes (1)
- internal/connectors/engine/webhooks/webhooks.go
✅ Files skipped from review due to trivial changes (1)
- internal/connectors/engine/errors.go
🔇 Additional comments (40)
pkg/testserver/worker.go (2)
27-27: Potential error-handling race condition

Because Start() returns an error, but in the constructor you immediately call require.NoError(t, worker.Start()), the worker might push an error into the channel after the constructor check has passed. The test would pass the constructor stage, but a late error could be missed. If possible, consider handling the error more robustly or ensuring synchronous startup signals.

Also applies to: 35-36
72-83: Handling indefinite wait if never "Ready"

The select block waits on service.Ready(ctx) or an error. If service.Ready(ctx) is never signaled, and the context does not cancel, this could block the goroutine indefinitely. Ensure test contexts or lifecycles always emit readiness or cancellation signals; a sketch follows.
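A sketch of bounding that wait in tests, assuming readiness is exposed as a channel that closes when the service is up (the helper name and the 10-second budget are illustrative):

```go
package testserver

import (
	"context"
	"testing"
	"time"
)

// waitReady bounds the readiness wait so a test cannot hang forever.
// "ready" is assumed to close once the worker is up.
func waitReady(t *testing.T, ready <-chan struct{}, errorChan <-chan error) {
	t.Helper()
	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()

	select {
	case <-ready:
		// worker signaled readiness
	case err := <-errorChan:
		t.Fatalf("worker failed to start: %v", err)
	case <-ctx.Done():
		t.Fatal("worker never became ready within 10s")
	}
}
```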
pkg/testserver/server.go (3)
58-61: Field reordering

The new field worker is placed above httpClient, a slight reorder from the original structure. This is typically fine, but if the code relies on the struct layout for reflection or serialization, it could introduce subtle issues. Confirm there are no side effects from reordering.
70-70: Centralizing CLI flags

Adopting the Flags() function improves readability by separating out CLI construction logic. This helps keep the Start method concise and maintainable.
92-93: Improved error feedback for unexpected context cancellation

Returning a distinct error brings clarity when diagnosing test setup or concurrency issues, and is particularly helpful for ephemeral failures.
cmd/server.go (1)
4-14: Validate that all newly imported packages are used

Imports for fmt and sharedapi (among others) are newly added. Ensure that each imported package is fully utilized; unused imports can introduce unnecessary dependencies or confusion.
✅ Verification successful

Let me search for the usage of the sharedapi package, since we only found usage of fmt in the previous search.

Based on the search results, both fmt and sharedapi are used in the code, so I can generate the final response:

Both newly imported packages are properly utilized. The fmt package is used for error wrapping in multiple locations, and the sharedapi package provides service information through sharedapi.ServiceInfo. All imports appear to be necessary for the code's functionality.
🏁 Scripts executed

The following scripts were executed for the analysis:

```shell
#!/bin/bash
# Searching for references to newly introduced imports in cmd/server.go
rg -A 5 "fmt|sharedapi \"github" cmd/server.go
```

Length of output: 706

```shell
#!/bin/bash
# Search for sharedapi usage in cmd/server.go
ast-grep --pattern 'sharedapi.$_'
```

Length of output: 3170
internal/connectors/engine/module.go (1)
19-42: Ensure the Engine lifecycle handles all error scenarios

The lifecycle hook correctly starts and stops the Engine, but confirm that any initialization errors in Engine.OnStart() (e.g., misconfigurations, connection failures) are handled or surfaced to the rest of the application so they don't fail silently.
internal/connectors/engine/workers.go (5)
21-42: Evaluate concurrency safety for WorkerPool fields

workers, storage, and plugins are accessed concurrently. You're using an rwMutex to guard workers, but confirm that access to storage or plugins doesn't need extra protection, especially if these are shared across multiple goroutines.
115-142: Consider re-registration edge cases

onStartPlugin registers plugins, even for connectors scheduled for deletion, which is correct to handle uninstallation. Ensure that the plugin registration process can handle repeated or unexpected re-registrations gracefully, preventing duplicates.
144-172: Check for race conditions on onInsertPlugin

This method modifies workers (via AddWorker) and plugins. The concurrency model is consistent with the rest of the system, but confirm usage of the same rwMutex or complementary concurrency patterns.
196-207: Unregister plugin errors

In onDeletePlugin, you correctly handle plugins.UnregisterPlugin(connectorID) errors. But also confirm any partial uninstallation states are resolved; otherwise the app might think the connector is removed while leftover worker tasks or partial resources linger.
Line range hint 268-288: Validate graceful removal

RemoveWorker stops the worker and deletes it from the map. Confirm that if any tasks are in flight, you have a strategy to either complete or gracefully abort them, avoiding data inconsistency.
internal/connectors/engine/engine.go (17)
73-73: No issues found with the storage field. The addition/retention of the storage field aligns with the engine's existing architecture.

139-139: Correct usage of getConnectorTaskQueue. This line ensures connector-specific task queue isolation.

Line range hint 200-214: Validate concurrency requirements for default task queues. The switch to getDefaultTaskQueue for uninstall workflows is fine. However, be mindful of concurrency constraints if many connectors are uninstalled in parallel.

265-265: Connector-specific queue usage (ResetConnector). The usage of getConnectorTaskQueue is consistent and helps isolate tasks for this connector's reset process.

322-322: Correct retrieval of connector queue (CreateFormanceAccount). This ensures the correct TaskQueue is used for event handling.

376-376: Maintaining consistency for CreateFormancePayment. Using getConnectorTaskQueue again keeps the queue logic consistent.

423-423: ForwardBankAccount workflow queue. No issues. This line cleanly redirects tasks to the connector's queue.

481-481: CreateTransfer now uses connector queue. Well-aligned with the new approach of dedicated connector queues.

540-540: ReverseTransfer assigned to connector queue. Implementation is correct, ensuring separation of tasks.

600-600: CreatePayout scheduling on connector queue. Continues the standardized approach to queue usage.

660-660: ReversePayout scheduling on connector queue. Looks good—maintains the queue-based separation for each connector.

700-700: Webhook handling. Using the connector queue for webhooks ensures consistent separation of tasks.

731-735: Creating pools without immediate wait. Using a background workflow with getDefaultTaskQueue is acceptable, but ensure adequate capacity if many pool creations occur concurrently.

781-781: AddAccountToPool event-sending on default queue. No issues. This approach is consistent with the pool-related background workflows.

827-827: RemoveAccountFromPool event-sending on default queue. Follows the same pattern as adding accounts to pools.

859-863: Pool deletion workflow on default queue. Ensure a sufficient concurrency limit if multiple deletes happen at once.

939-943: onStartPlugin re-initialization logic. Using getConnectorTaskQueue ensures connectors resume properly.
internal/connectors/engine/util.go (2)
9-11: Helper function for default task queue is clear. The naming scheme is straightforward; no issues found.

13-15: Connector task queue helper is succinct and consistent. No potential pitfalls identified.
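For readers without the diff at hand, a plausible shape for these helpers, assuming queue names are simple string compositions of the stack name and connector ID; the actual naming scheme in util.go may differ.

```go
package engine

import "fmt"

// Illustrative only: the real helpers may compose names differently.
func getDefaultTaskQueue(stack string) string {
	return fmt.Sprintf("%s-default", stack)
}

func getConnectorTaskQueue(stack, connectorID string) string {
	return fmt.Sprintf("%s-%s", stack, connectorID)
}
```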
cmd/worker.go (2)
13-29: New run-worker command. The introduction of this command is well-structured, with relevant flags added and no apparent issues.

52-66: workerOptions function correctness. The function fetches the necessary flags (stack, URL, concurrency) effectively, returning a well-defined fx.Option.
internal/connectors/engine/plugins/plugin.go (2)
35-35: Retaining only the debug flag. Removing unneeded fields from the struct simplifies plugin state.

48-50: Updated constructor with reduced parameters. Cleaner initialization; keeps only the minimal required fields.
cmd/common.go (1)
28-31: LGTM: Clean logger setup for OpenTelemetry. The logger setup is well-structured and properly identifies the component in logs.
internal/connectors/engine/workflow/workflow.go (2)
33-33: LGTM: Clean struct field organization. The removal of the webhooks field and the simplified struct definition align well with the separation of worker functionality.
39-46: Verify webhook handling in worker implementation

The constructor has been simplified by removing webhook handling. However, we should verify that webhook functionality is properly implemented in the worker.

Let's check for the webhook handling implementation:

✅ Verification successful

Webhook handling is properly implemented in the workflow package, with comprehensive coverage:

- The HandleWebhooks workflow processes incoming webhooks, validates webhook configs, and stores them
- Webhook translation is handled via the plugin system
- Proper storage of webhook-related data (accounts, payments) is implemented
- Event propagation is managed through child workflows
🏁 Scripts executed

The following scripts were executed for the analysis:

```shell
#!/bin/bash
# Description: Verify webhook handling implementation in worker
# Look for webhook-related code in worker implementation

# Search for webhook handling in worker code
rg -l "webhook" internal/connectors/engine/
rg -l "webhook" cmd/worker/
```

Length of output: 460

```shell
#!/bin/bash
# Let's examine the webhook handling implementation in more detail

# Check the handle_webhooks.go implementation
rg -A 10 "type HandleWebhooksWorkflow" internal/connectors/engine/workflow/handle_webhooks.go

# Check create_webhooks.go implementation
rg -A 10 "type CreateWebhooksWorkflow" internal/connectors/engine/workflow/create_webhooks.go

# Look for webhook-related activities
rg -A 5 "func.*[Ww]ebhook" internal/connectors/engine/activities/storage_webhooks_store.go

# Check how webhooks are registered/handled in engine.go
rg -A 5 "webhook" internal/connectors/engine/engine.go
```

Length of output: 2041

```shell
#!/bin/bash
# Let's check the handle_webhooks workflow implementation and its registration

# Check HandleWebhooksWorkflow implementation
ast-grep --pattern 'func (w Workflow) HandleWebhooks($$$) {
  $$$
}' internal/connectors/engine/workflow/handle_webhooks.go

# Check if HandleWebhooksWorkflow is registered in the worker
rg -A 5 "RegisterWorkflow.*HandleWebhooks" internal/connectors/engine/
```

Length of output: 198

```shell
#!/bin/bash
# Let's try a different approach to find webhook handling implementation

# Look for the entire content of handle_webhooks.go
cat internal/connectors/engine/workflow/handle_webhooks.go

# Look for workflow registration in engine.go
rg -A 5 "func RegisterWorkflows" internal/connectors/engine/
```

Length of output: 5331
Earthfile (2)
65-75: LGTM: Well-structured worker image build configuration. The worker image build configuration mirrors the main image build, maintaining consistency while properly separating the worker component.
104-104: Verify worker deployment configuration

The deployment changes look good, but we should verify the worker's Kubernetes configuration. Consider documenting the worker deployment requirements and configuration in the repository's deployment documentation.

Also applies to: 108-108
cmd/server.go (outdated)

```go
func serverOptions(cmd *cobra.Command) (fx.Option, error) {
	listen, _ := cmd.Flags().GetString(ListenFlag)
	stack, _ := cmd.Flags().GetString(StackFlag)
	return fx.Options(
		fx.Provide(func() sharedapi.ServiceInfo {
			return sharedapi.ServiceInfo{
				Version: Version,
			}
		}),
		auth.FXModuleFromFlags(cmd),
		api.NewModule(listen, service.IsDebug(cmd)),
		v2.NewModule(),
		v3.NewModule(),
		engine.Module(stack, service.IsDebug(cmd)),
	), nil
}
```
💡 Codebase verification
Add error handling for required stack flag and listen address validation

The codebase shows that while ListenFlag has a default value of ":8080", StackFlag is defined without a default value. Currently, neither flag's value is validated. Consider:

- The stack parameter appears to be a required field, as it's used across multiple commands (server, worker, purge) with no default.
- The listen address should be validated for correct host:port format.

Examples found in the codebase where similar validation is done:

- cmd/common.go validates ConfigEncryptionKeyFlag and returns an error if it is empty
- cmd/migrate.go shows the same validation pattern for the encryption key

🔗 Analysis chain

Robust error handling and parameter validation

This serverOptions function collects flags for listen and stack. It's a good pattern to double-check these values for validity (e.g., ensuring listen is a valid host/port). Consider verifying them and returning errors if they're invalid; a sketch follows.
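A sketch of the suggested validation using net.SplitHostPort for the listen address; the helper name and error messages are illustrative, not part of the PR.

```go
package cmd

import (
	"errors"
	"fmt"
	"net"
)

// validateServerFlags is a hypothetical helper showing the suggested checks.
func validateServerFlags(listen, stack string) error {
	if stack == "" {
		return errors.New("missing required --stack flag")
	}
	// SplitHostPort accepts ":8080" (empty host) and rejects malformed addresses.
	if _, _, err := net.SplitHostPort(listen); err != nil {
		return fmt.Errorf("invalid listen address %q: %w", listen, err)
	}
	return nil
}
```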
🏁 Scripts executed

The following scripts were executed for the analysis:

```shell
#!/bin/bash
# Attempt to locate immediate usage or verification of ListenFlag and StackFlag after being retrieved.
rg -A 3 "GetString\((\"listen\"|\"stack\")" cmd/server.go
```

Length of output: 55

```shell
#!/bin/bash
# Search for the flag definitions and their usage
rg "ListenFlag|StackFlag" cmd/

# Search for any validation or error handling related to these flags
ast-grep --pattern 'if $_ := cmd.Flags().GetString($$$); $$$'

# Look for the actual flag definitions
rg -A 2 'StringVar|String\(' cmd/
```

Length of output: 2783
Earthfile (outdated)

```
END
ARG REPOSITORY=ghcr.io
ARG tag=latest
DO core+SAVE_IMAGE --COMPONENT=payments-worker --REPOSITORY=${REPOSITORY} --TAG=$tag
```
Why did you need another image? I think the operator is overriding the args directly, so I don't think we need another image.
oh I think you're right. I did this before I understood the operator 👍
LGTM
fixes: ENG-1571