fix(amqp sink): attempt one reconnect when channel has errored #22971
Thanks @aramperes
src/sinks/amqp/channel.rs
Outdated
}

info!(
    message = "Recovering broken connection to the AMQP broker.",
Nit: We can add a URL tag here
I'm not sure that is a good idea since it could print the password:
INFO sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=1}: vector::sinks::amqp::channel: Recovering broken connection to the AMQP broker. uri="amqp://user:password@127.0.0.1:5672/%2f?timeout=10" internal_log_rate_limit=true
INFO sink{component_kind="sink" component_id=sink_amqp component_type=amqp}:request{request_id=1}: vector::sinks::amqp::channel: Recovered connection to the AMQP broker. uri="amqp://user:password@127.0.0.1:5672/%2f?timeout=10" internal_log_rate_limit=true
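To illustrate the concern, here is a minimal sketch of how a URI could be stripped of its password before being attached to a log line. This is not part of the PR: `redact_password` is a hypothetical helper written with the standard library only, and it assumes the usual `scheme://user:password@host/...` layout with any special characters in the credentials percent-encoded.

```rust
/// Hypothetical helper (not from the PR): masks the password component of a
/// `scheme://user:password@host/...` URI so it can be logged safely.
/// URIs without credentials, or without a password, are returned unchanged.
fn redact_password(uri: &str) -> String {
    // Separate the scheme from the rest of the URI.
    let Some((scheme, rest)) = uri.split_once("://") else {
        return uri.to_string();
    };
    // The authority ends at the first '/', '?' or '#'.
    let authority_end = rest
        .find(|c: char| matches!(c, '/' | '?' | '#'))
        .unwrap_or(rest.len());
    let (authority, tail) = rest.split_at(authority_end);
    // Userinfo is everything before the last '@' of the authority.
    let Some((userinfo, host)) = authority.rsplit_once('@') else {
        return uri.to_string();
    };
    // Only mask when a password is actually present.
    let Some((user, _password)) = userinfo.split_once(':') else {
        return uri.to_string();
    };
    format!("{scheme}://{user}:****@{host}{tail}")
}
```

Applied to the URI from the log lines above, this would yield `amqp://user:****@127.0.0.1:5672/%2f?timeout=10`.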
Ok sounds good, I will try a proof of concept and see if it provides value. The official |
@pront I've integrated I tested the recovery behavior and it is the same as with my previous implementation. When picking a Channel from the pool, it will check its current state. If it is not Logs:
Summary
Improves the resilience of the amqp sink by attempting to re-create a broken Channel when sending new events.
This is a simple implementation that could be extended later or in this PR, depending on the Vector team's preference & vision. The current implementation:
tower retry mechanism in some way;
amqp source, which would be more complex to re-create the listeners.
Lapin does not have a built-in reconnection mechanism and it is up to the client to implement this. See amqp-rs/lapin#70, amqp-rs/lapin#389
Implementation
A new AmqpChannel struct wraps a tokio::sync::RwLock<lapin::Channel>. This read-write lock is used to lock during the re-connection to prevent concurrent attempts.
The API could be further improved by making crate::amqp::AmqpConfig::connect() return an AmqpChannel directly, but that would also affect the amqp source.
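The locking pattern described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's code: it uses `std::sync::RwLock` and a stand-in `MockChannel` type so that it compiles without dependencies, whereas the PR wraps a `lapin::Channel` in a `tokio::sync::RwLock` (whose `read()` and `write()` are async). `ReconnectingChannel`, `MockChannel`, `with_channel` and `mark_errored` are hypothetical names.

```rust
use std::sync::RwLock;

/// Stand-in for `lapin::Channel` (assumption: only an error flag is modelled).
#[derive(Debug)]
struct MockChannel {
    id: u32,
    errored: bool,
}

/// Hypothetical wrapper: holds the channel behind a read-write lock and
/// knows how to create a replacement via `connect`.
struct ReconnectingChannel<F> {
    inner: RwLock<MockChannel>,
    connect: F,
}

impl<F> ReconnectingChannel<F>
where
    F: Fn() -> Result<MockChannel, String>,
{
    fn new(connect: F) -> Result<Self, String> {
        let channel = connect()?;
        Ok(Self { inner: RwLock::new(channel), connect })
    }

    /// Simulates the broker going away.
    fn mark_errored(&self) {
        self.inner.write().unwrap().errored = true;
    }

    /// Runs `f` against a healthy channel, attempting one reconnect if the
    /// current channel has errored. A failed reconnect is returned as `Err`.
    fn with_channel<T>(&self, f: impl FnOnce(&MockChannel) -> T) -> Result<T, String> {
        {
            // Fast path: many senders can share the read lock.
            let guard = self.inner.read().unwrap();
            if !guard.errored {
                return Ok(f(&guard));
            }
        } // Read guard is dropped here, before the write lock is requested.

        // Slow path: the write lock serialises reconnection attempts.
        let mut guard = self.inner.write().unwrap();
        // Re-check: another caller may have reconnected while we waited.
        if guard.errored {
            *guard = (self.connect)()?;
        }
        Ok(f(&guard))
    }
}
```

In this sketch, a caller that receives `Err` from `with_channel` would treat the event as undeliverable, which corresponds to the "event is dropped" behaviour described in the testing notes; the re-check under the write lock is what keeps concurrent senders from each opening their own replacement channel.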
Change Type
Is this a breaking change?
How did you test this PR?
amqp sink. Example:
Error state. If the RabbitMQ process hasn't booted up and the re-connection failed, the event is dropped. Otherwise, the channel is re-established and the event is sent to the new channel.
Example of the re-connection succeeding:
Example of re-connection failing:
Does this PR include user facing changes?
Notes
@vectordotdev/vector to reach out to us regarding this PR.
pre-push hook, please see this template.
cargo fmt --all
cargo clippy --workspace --all-targets -- -D warnings
cargo nextest run --workspace (alternatively, you can run cargo test --all)
./scripts/check_changelog_fragments.sh
git merge origin master and git push.
Cargo.lock), please run cargo vdev build licenses to regenerate the license inventory and commit the changes (if any). More details here.
References
Closes #22313