feat(node-service): Refactor `EngineActor` by clabby · Pull Request #1867 · op-rs/kona · GitHub
feat(node-service): Refactor EngineActor #1867

Merged
merged 1 commit into from
May 27, 2025
10 changes: 5 additions & 5 deletions crates/node/service/src/actors/derivation.rs
@@ -13,6 +13,7 @@ use tokio::{
     select,
     sync::{
         mpsc::{UnboundedReceiver, UnboundedSender},
+        oneshot::Receiver as OneshotReceiver,
Member

nit: I would tend to prefer, for tokio channels, to actually specify the entire import path for every type of channel we're using. Here it would be tokio::sync::oneshot::Receiver. We can also just import tokio::sync into the file namespace. The advantages of doing that are:

  • less code
  • we can easily audit whether all our channels are tokio channels (and not std channels). Pretty easy to import the wrong channel type, which may create pretty subtle bugs.

That is more a matter of personal taste, not blocking for this PR. However, it would be amazing if we could:

  • find a convention for the codebase for how we do imports
  • add it to the contribution guidelines (for external contributors)

Collaborator Author

I agree. For now I added named aliases to the channels used in the engine actor. Let's follow up with a standard convention; I prefer shorter qualified paths such as mpsc::{Sender/Receiver}, but glad to discuss w/e is best.
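
To make the module-qualified import style from this thread concrete, here is a minimal sketch. It uses `std::sync::mpsc` only so that the snippet runs without external crates, which is an assumption made for illustration; in this codebase the `use` line would name `tokio::sync::mpsc` instead. `Worker` and its fields are hypothetical names, not part of the PR.

```rust
// One `use` line names the channel module; every use site is qualified.
// Assumption: std is used here purely to keep the sketch dependency-free.
use std::sync::mpsc;

/// Hypothetical worker that reads jobs from one channel and writes results to another.
pub struct Worker {
    jobs_rx: mpsc::Receiver<u64>,
    results_tx: mpsc::Sender<u64>,
}

impl Worker {
    /// The qualified paths make the channel flavour visible in every signature.
    pub fn new(jobs_rx: mpsc::Receiver<u64>, results_tx: mpsc::Sender<u64>) -> Self {
        Self { jobs_rx, results_tx }
    }

    /// Forwards every queued job (doubled) and returns how many were forwarded.
    pub fn drain(&self) -> usize {
        let mut forwarded = 0;
        while let Ok(job) = self.jobs_rx.try_recv() {
            if self.results_tx.send(job * 2).is_err() {
                break;
            }
            forwarded += 1;
        }
        forwarded
    }
}
```

With this style, the single `use` line is the only place a reviewer has to check to see which channel implementation a file relies on.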

         watch::Receiver as WatchReceiver,
     },
 };
@@ -33,8 +34,8 @@ where

     /// The l2 safe head from the engine.
     engine_l2_safe_head: WatchReceiver<L2BlockInfo>,
-    /// A receiver that tells derivation to begin.
-    sync_complete_rx: UnboundedReceiver<()>,
+    /// A receiver that tells derivation to begin. Completing EL sync consumes the instance.
+    sync_complete_rx: OneshotReceiver<()>,
     /// A receiver that sends a [`Signal`] to the derivation pipeline.
     ///
     /// The derivation actor steps over the derivation pipeline to generate
@@ -85,7 +86,7 @@ where
     pub const fn new(
         pipeline: P,
         engine_l2_safe_head: WatchReceiver<L2BlockInfo>,
-        sync_complete_rx: UnboundedReceiver<()>,
+        sync_complete_rx: OneshotReceiver<()>,
         derivation_signal_rx: UnboundedReceiver<Signal>,
         l1_head_updates: UnboundedReceiver<BlockInfo>,
         attributes_out: UnboundedSender<OpAttributesWithParent>,
@@ -253,10 +254,9 @@ where
                 _ = self.engine_l2_safe_head.changed() => {
                     self.process(InboundDerivationMessage::SafeHeadUpdated).await?;
                 }
-                _ = self.sync_complete_rx.recv(), if !self.engine_ready => {
+                _ = &mut self.sync_complete_rx, if !self.engine_ready => {
                     info!(target: "derivation", "Engine finished syncing, starting derivation.");
                     self.engine_ready = true;
-                    self.sync_complete_rx.close();
                     // Optimistically process the first message.
                     self.process(InboundDerivationMessage::NewDataAvailable).await?;
                 }
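
The switch to a oneshot receiver encodes "EL sync completes exactly once" in the type, which is why the explicit `close()` call can go. The sketch below models that idea with the standard library only: a hypothetical `OnceSender` whose `send` takes `self` by value, plus a poll helper that mirrors the `if !self.engine_ready` guard. It is not tokio's oneshot implementation and not the actor's code; `OnceSender`, `once_channel`, and `poll_sync_complete` are illustrative names.

```rust
use std::sync::mpsc::{self, Receiver, SyncSender};

/// Hypothetical single-use sender: `send` consumes `self`, so a second
/// signal is rejected at compile time rather than at runtime.
pub struct OnceSender(SyncSender<()>);

impl OnceSender {
    /// Fires the signal. Returns `false` if the receiver was already dropped.
    pub fn send(self) -> bool {
        self.0.send(()).is_ok()
    }
}

/// Creates a single-use signal pair backed by a one-slot std channel.
pub fn once_channel() -> (OnceSender, Receiver<()>) {
    let (tx, rx) = mpsc::sync_channel(1);
    (OnceSender(tx), rx)
}

/// Non-blocking check that mirrors the guarded select arm: the receiver is
/// only consulted while `engine_ready` is still false.
/// Returns `true` only on the poll that observes the signal.
pub fn poll_sync_complete(rx: &Receiver<()>, engine_ready: &mut bool) -> bool {
    if *engine_ready {
        return false;
    }
    if rx.try_recv().is_ok() {
        *engine_ready = true;
        return true;
    }
    false
}
```

Because the sender is moved into `send`, there is nothing left to close afterwards, and the `engine_ready` flag keeps the completed receiver from being polled again.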

Large diffs are not rendered by default.

21 changes: 21 additions & 0 deletions crates/node/service/src/actors/engine/error.rs
@@ -0,0 +1,21 @@
+//! Error type for the [`EngineActor`].
+//!
+//! [`EngineActor`]: super::EngineActor
+
+use kona_engine::{EngineResetError, EngineTaskError};
+
+/// An error from the [`EngineActor`].
+///
+/// [`EngineActor`]: super::EngineActor
+#[derive(thiserror::Error, Debug)]
+pub enum EngineError {
+    /// Closed channel error.
+    #[error("a channel has been closed unexpectedly")]
+    ChannelClosed,
+    /// Engine reset error.
+    #[error(transparent)]
+    EngineReset(#[from] EngineResetError),
+    /// Engine task error.
+    #[error(transparent)]
+    EngineTask(#[from] EngineTaskError),
+}
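
For readers unfamiliar with `thiserror`, the following is a rough, hand-written approximation of what the derive provides for an enum like this one: a `Display` message per variant, and a `From` impl that lets `?` convert the inner error. `ResetError`, `ActorError`, `reset`, and `run` are hypothetical stand-ins (the real `EngineResetError` lives in `kona_engine`), and the actual macro expansion differs in detail.

```rust
use std::fmt;

/// Hypothetical stand-in for an inner error such as `EngineResetError`.
#[derive(Debug, PartialEq)]
pub struct ResetError(pub String);

impl fmt::Display for ResetError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        write!(f, "{}", self.0)
    }
}

impl std::error::Error for ResetError {}

/// Hypothetical actor error with one unit variant and one wrapping variant.
#[derive(Debug, PartialEq)]
pub enum ActorError {
    ChannelClosed,
    Reset(ResetError),
}

impl fmt::Display for ActorError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            // Plays the role of `#[error("...")]`.
            Self::ChannelClosed => write!(f, "a channel has been closed unexpectedly"),
            // Plays the role of `#[error(transparent)]`: defer to the inner message.
            Self::Reset(inner) => fmt::Display::fmt(inner, f),
        }
    }
}

impl std::error::Error for ActorError {}

// Plays the role of `#[from]`: enables `?` on `Result<_, ResetError>`.
impl From<ResetError> for ActorError {
    fn from(err: ResetError) -> Self {
        Self::Reset(err)
    }
}

/// Hypothetical fallible operation.
pub fn reset(should_fail: bool) -> Result<(), ResetError> {
    if should_fail {
        Err(ResetError("engine reset failed".to_string()))
    } else {
        Ok(())
    }
}

/// `?` converts `ResetError` into `ActorError` through the `From` impl above.
pub fn run(should_fail: bool) -> Result<(), ActorError> {
    reset(should_fail)?;
    Ok(())
}
```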
76 changes: 76 additions & 0 deletions crates/node/service/src/actors/engine/finalizer.rs
@@ -0,0 +1,76 @@
+//! The [`L2Finalizer`].
+
+use kona_engine::{Engine, EngineClient, EngineTask, FinalizeTask};
+use kona_protocol::{BlockInfo, OpAttributesWithParent};
+use std::{collections::BTreeMap, sync::Arc};
+use tokio::sync::mpsc::UnboundedReceiver;
+
+/// An internal type alias for L1 block numbers.
+type L1BlockNumber = u64;
+
+/// An internal type alias for L2 block numbers.
+type L2BlockNumber = u64;
+
+/// The [`L2Finalizer`] is responsible for finalizing L2 blocks derived from finalized L1 blocks.
+/// It maintains a queue of derived L2 blocks that are awaiting finalization, and finalizes them
+/// as new finalized L1 blocks are received.
+#[derive(Debug)]
+pub struct L2Finalizer {
+    /// A channel that receives new finalized L1 blocks intermittently.
+    finalized_l1_block_rx: UnboundedReceiver<BlockInfo>,
+    /// An [`EngineClient`], used to create [`FinalizeTask`]s.
+    client: Arc<EngineClient>,
+    /// A map of `L1 block number -> highest derived L2 block number` within the L1 epoch, used to
+    /// track derived [`OpAttributesWithParent`] awaiting finalization. When a new finalized L1
+    /// block is received, the highest L2 block whose inputs are contained within the finalized
+    /// L1 chain is finalized.
+    awaiting_finalization: BTreeMap<L1BlockNumber, L2BlockNumber>,
+}
+
+impl L2Finalizer {
+    /// Creates a new [`L2Finalizer`] with the given channel receiver for finalized L1 blocks.
+    pub const fn new(
+        finalized_l1_block_rx: UnboundedReceiver<BlockInfo>,
+        client: Arc<EngineClient>,
+    ) -> Self {
+        Self { finalized_l1_block_rx, client, awaiting_finalization: BTreeMap::new() }
+    }
+
+    /// Enqueues a derived [`OpAttributesWithParent`] for finalization. When a new finalized L1
+    /// block is observed that is `>=` the height of [`OpAttributesWithParent::l1_origin`], the L2
+    /// block associated with the payload attributes will be finalized.
+    pub fn enqueue_for_finalization(&mut self, attributes: &OpAttributesWithParent) {
+        self.awaiting_finalization
+            .entry(attributes.l1_origin.number)
+            .and_modify(|n| *n = (*n).max(attributes.parent.block_info.number + 1))
+            .or_insert(attributes.parent.block_info.number + 1);
+    }
+
+    /// Clears the finalization queue.
+    pub fn clear(&mut self) {
+        self.awaiting_finalization.clear();
+    }
+
+    /// Receives a new finalized L1 block from the channel.
+    pub async fn recv(&mut self) -> Option<BlockInfo> {
+        self.finalized_l1_block_rx.recv().await
+    }
+
+    /// Attempts to finalize any L2 blocks that the finalizer knows about and are contained within
+    /// the new finalized L1 chain.
+    pub async fn try_finalize_next(&mut self, new_finalized_l1: BlockInfo, engine: &mut Engine) {
+        // Find the highest safe L2 block that is contained within the finalized chain,
+        // that the finalizer is aware of.
+        let highest_safe = self.awaiting_finalization.range(..=new_finalized_l1.number).next_back();
+
+        // If the highest safe block is found, enqueue a finalization task and drain the
+        // queue of all L1 blocks not contained in the finalized L1 chain.
+        if let Some((_, highest_safe_number)) = highest_safe {
+            let task =
+                EngineTask::Finalize(FinalizeTask::new(self.client.clone(), *highest_safe_number));
+            engine.enqueue(task);
+
+            self.awaiting_finalization.retain(|&number, _| number > new_finalized_l1.number);
+        }
+    }
+}
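
The queue logic in `L2Finalizer` can be exercised in isolation. The sketch below is a simplified, dependency-free model of the `awaiting_finalization` map: it keeps the same `entry`/`range`/`retain` steps but returns the L2 block number instead of enqueuing a `FinalizeTask`. `FinalizationQueue`, `enqueue`, `finalize_up_to`, and `pending` are hypothetical names introduced for illustration, and plain `u64` values stand in for the block types.

```rust
use std::collections::BTreeMap;

/// Hypothetical model of the finalizer's queue:
/// L1 origin number -> highest derived L2 block number in that epoch.
#[derive(Debug, Default)]
pub struct FinalizationQueue {
    awaiting: BTreeMap<u64, u64>,
}

impl FinalizationQueue {
    /// Records a derived L2 block, keeping only the highest L2 number per L1 origin.
    pub fn enqueue(&mut self, l1_origin: u64, l2_number: u64) {
        self.awaiting
            .entry(l1_origin)
            .and_modify(|n| *n = (*n).max(l2_number))
            .or_insert(l2_number);
    }

    /// Returns the L2 block to finalize for a newly finalized L1 block, if any,
    /// and drops every entry at or below that L1 height.
    pub fn finalize_up_to(&mut self, finalized_l1: u64) -> Option<u64> {
        // Highest L1 origin that is not above the finalized L1 block.
        let highest = self.awaiting.range(..=finalized_l1).next_back().map(|(_, l2)| *l2)?;
        // Keep only entries whose L1 origin is not yet finalized.
        self.awaiting.retain(|&l1, _| l1 > finalized_l1);
        Some(highest)
    }

    /// Number of L1 origins still awaiting finalization.
    pub fn pending(&self) -> usize {
        self.awaiting.len()
    }
}
```

As in the original, nothing is drained when no tracked L1 origin is at or below the finalized height, so entries are never dropped without a matching finalization.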
10 changes: 10 additions & 0 deletions crates/node/service/src/actors/engine/mod.rs
@@ -0,0 +1,10 @@
+//! The [`EngineActor`] and its components.
+
+mod actor;
+pub use actor::{EngineActor, EngineLauncher, InboundEngineMessage};
+
+mod error;
+pub use error::EngineError;
+
+mod finalizer;
+pub use finalizer::L2Finalizer;
2 changes: 1 addition & 1 deletion crates/node/service/src/actors/mod.rs
@@ -9,7 +9,7 @@ mod runtime;
 pub use runtime::{RuntimeActor, RuntimeLauncher};

 mod engine;
-pub use engine::{EngineActor, EngineError, EngineLauncher};
+pub use engine::{EngineActor, EngineError, EngineLauncher, InboundEngineMessage, L2Finalizer};

 mod rpc;
 pub use rpc::{RpcActor, RpcActorError};
5 changes: 3 additions & 2 deletions crates/node/service/src/lib.rs
@@ -18,8 +18,9 @@ pub use service::{
 mod actors;
 pub use actors::{
     DerivationActor, DerivationError, EngineActor, EngineError, EngineLauncher,
-    InboundDerivationMessage, L1WatcherRpc, L1WatcherRpcError, NetworkActor, NetworkActorError,
-    NodeActor, RpcActor, RpcActorError, RuntimeActor, RuntimeLauncher,
+    InboundDerivationMessage, InboundEngineMessage, L1WatcherRpc, L1WatcherRpcError, L2Finalizer,
+    NetworkActor, NetworkActorError, NodeActor, RpcActor, RpcActorError, RuntimeActor,
+    RuntimeLauncher,
 };

 mod metrics;
7 changes: 5 additions & 2 deletions crates/node/service/src/service/validator.rs
@@ -15,7 +15,10 @@ use kona_rpc::{
     RpcLauncherError, WsRPC, WsServer,
 };
 use std::fmt::Display;
-use tokio::sync::mpsc::{self, UnboundedSender};
+use tokio::sync::{
+    mpsc::{self, UnboundedSender},
+    oneshot,
+};
 use tokio_util::sync::CancellationToken;

 /// The [`ValidatorNodeService`] trait defines the common interface for running a validator node
@@ -91,7 +94,7 @@ pub trait ValidatorNodeService {
         let (new_finalized_tx, new_finalized_rx) = mpsc::unbounded_channel();
         let (derived_payload_tx, derived_payload_rx) = mpsc::unbounded_channel();
         let (unsafe_block_tx, unsafe_block_rx) = mpsc::unbounded_channel();
-        let (sync_complete_tx, sync_complete_rx) = mpsc::unbounded_channel();
+        let (sync_complete_tx, sync_complete_rx) = oneshot::channel();
         let (runtime_config_tx, runtime_config_rx) = mpsc::unbounded_channel();
         let (derivation_signal_tx, derivation_signal_rx) = mpsc::unbounded_channel();
         let (reset_request_tx, reset_request_rx) = mpsc::unbounded_channel();