-
Notifications
You must be signed in to change notification settings - Fork 92
feat(node-service): Refactor EngineActor
#1867
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
321 changes: 161 additions & 160 deletions
321
crates/node/service/src/actors/engine.rs → ...s/node/service/src/actors/engine/actor.rs
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
//! Error type for the [`EngineActor`]. | ||
//! | ||
//! [`EngineActor`]: super::EngineActor | ||
|
||
use kona_engine::{EngineResetError, EngineTaskError}; | ||
|
||
/// An error from the [`EngineActor`]. | ||
/// | ||
/// [`EngineActor`]: super::EngineActor | ||
#[derive(thiserror::Error, Debug)] | ||
pub enum EngineError { | ||
/// Closed channel error. | ||
#[error("a channel has been closed unexpectedly")] | ||
ChannelClosed, | ||
/// Engine reset error. | ||
#[error(transparent)] | ||
EngineReset(#[from] EngineResetError), | ||
/// Engine task error. | ||
#[error(transparent)] | ||
EngineTask(#[from] EngineTaskError), | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,76 @@ | ||
//! The [`L2Finalizer`]. | ||
clabby marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
||
use kona_engine::{Engine, EngineClient, EngineTask, FinalizeTask}; | ||
use kona_protocol::{BlockInfo, OpAttributesWithParent}; | ||
use std::{collections::BTreeMap, sync::Arc}; | ||
use tokio::sync::mpsc::UnboundedReceiver; | ||
|
||
/// An internal type alias for L1 block numbers. | ||
type L1BlockNumber = u64; | ||
|
||
/// An internal type alias for L2 block numbers. | ||
type L2BlockNumber = u64; | ||
|
||
/// The [`L2Finalizer`] is responsible for finalizing L2 blocks derived from finalized L1 blocks. | ||
/// It maintains a queue of derived L2 blocks that are awaiting finalization, and finalizes them | ||
/// as new finalized L1 blocks are received. | ||
#[derive(Debug)] | ||
pub struct L2Finalizer { | ||
/// A channel that receives new finalized L1 blocks intermittently. | ||
finalized_l1_block_rx: UnboundedReceiver<BlockInfo>, | ||
/// An [`EngineClient`], used to create [`FinalizeTask`]s. | ||
client: Arc<EngineClient>, | ||
/// A map of `L1 block number -> highest derived L2 block number` within the L1 epoch, used to | ||
/// track derived [`OpAttributesWithParent`] awaiting finalization. When a new finalized L1 | ||
/// block is received, the highest L2 block whose inputs are contained within the finalized | ||
/// L1 chain is finalized. | ||
awaiting_finalization: BTreeMap<L1BlockNumber, L2BlockNumber>, | ||
} | ||
|
||
impl L2Finalizer { | ||
/// Creates a new [`L2Finalizer`] with the given channel receiver for finalized L1 blocks. | ||
pub const fn new( | ||
finalized_l1_block_rx: UnboundedReceiver<BlockInfo>, | ||
client: Arc<EngineClient>, | ||
) -> Self { | ||
Self { finalized_l1_block_rx, cl 8000 ient, awaiting_finalization: BTreeMap::new() } | ||
} | ||
|
||
/// Enqueues a derived [`OpAttributesWithParent`] for finalization. When a new finalized L1 | ||
/// block is observed that is `>=` the height of [`OpAttributesWithParent::l1_origin`], the L2 | ||
/// block associated with the payload attributes will be finalized. | ||
pub fn enqueue_for_finalization(&mut self, attributes: &OpAttributesWithParent) { | ||
self.awaiting_finalization | ||
.entry(attributes.l1_origin.number) | ||
.and_modify(|n| *n = (*n).max(attributes.parent.block_info.number + 1)) | ||
.or_insert(attributes.parent.block_info.number + 1); | ||
} | ||
|
||
/// Clears the finalization queue. | ||
pub fn clear(&mut self) { | ||
self.awaiting_finalization.clear(); | ||
} | ||
|
||
/// Receives a new finalized L1 block from the channel. | ||
pub async fn recv(&mut self) -> Option<BlockInfo> { | ||
self.finalized_l1_block_rx.recv().await | ||
} | ||
|
||
/// Attempts to finalize any L2 blocks that the finalizer knows about and are contained within | ||
/// the new finalized L1 chain. | ||
pub async fn try_finalize_next(&mut self, new_finalized_l1: BlockInfo, engine: &mut Engine) { | ||
// Find the highest safe L2 block that is contained within the finalized chain, | ||
// that the finalizer is aware of. | ||
let highest_safe = self.awaiting_finalization.range(..=new_finalized_l1.number).next_back(); | ||
|
||
// If the highest safe block is found, enqueue a finalization task and drain the | ||
// queue of all L1 blocks not contained in the finalized L1 chain. | ||
if let Some((_, highest_safe_number)) = highest_safe { | ||
let task = | ||
EngineTask::Finalize(FinalizeTask::new(self.client.clone(), *highest_safe_number)); | ||
engine.enqueue(task); | ||
|
||
self.awaiting_finalization.retain(|&number, _| number > new_finalized_l1.number); | ||
} | ||
clabby marked this conversation as resolved.
Show resolved
Hide resolved
|
||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
//! The [`EngineActor`] and its components. | ||
|
||
mod actor; | ||
pub use actor::{EngineActor, EngineLauncher, InboundEngineMessage}; | ||
|
||
mod error; | ||
pub use error::EngineError; | ||
|
||
mod finalizer; | ||
pub use finalizer::L2Finalizer; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: I would tend to prefer, for tokio channels, to actually specify the entire import path inside for every type of channel we're using. Here it would be
tokio::sync::oneshot::Receiver
. We can also just importtokio::sync
to the file namespace. The advantage of doing that are:That is more a matter of personal taste, not blocking for this PR. However, it would be amazing if we could:
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I agree. For now I added named aliases to the channels used in the engine actor. Let's follow up with a standard convention; I prefer shorter qualified paths such as
mpsc::{Sender/Receiver}
, but glad to discuss w/e is best.