8000 feat(ircv3): add simple internal batch processing by RaitoBezarius · Pull Request #266 · aatxe/irc · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat(ircv3): add simple internal batch processing #266

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: develop
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,10 @@ members = [ "./", "irc-proto/" ]


[features]
default = ["ctcp", "tls-native", "channel-lists", "toml_config"]
default = ["ctcp", "tls-native", "channel-lists", "batch", "toml_config"]
ctcp = []
channel-lists = []
batch = []

json_config = ["serde", "serde/derive", "serde_derive", "serde_json"]
toml_config = ["serde", "serde/derive", "serde_derive", "toml"]
Expand Down
63 changes: 63 additions & 0 deletions src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ use crate::{
},
error,
proto::{
BatchSubCommand,
mode::ModeType,
CapSubCommand::{END, LS, REQ},
Capability, ChannelMode, Command,
Expand Down Expand Up @@ -498,6 +499,8 @@ struct ClientState {
config: Config,
/// A thread-safe map of channels to the list of users in them.
chanlists: RwLock<HashMap<String, Vec<User>>>,
/// A thread-safe map of in-progress batches, keyed by batch identifier, to the messages buffered for each.
inflight_batches: RwLock<HashMap<String, Vec<Message>>>,
/// A thread-safe index to track the current alternative nickname being used.
alt_nick_index: RwLock<usize>,
/// Default ghost sequence to send if one is required but none is configured.
Expand All @@ -509,6 +512,7 @@ impl ClientState {
ClientState {
sender,
config,
inflight_batches: RwLock::new(HashMap::new()),
chanlists: RwLock::new(HashMap::new()),
alt_nick_index: RwLock::new(0),
default_ghost_sequence: vec![String::from("GHOST")],
Expand Down Expand Up @@ -553,6 +557,30 @@ impl ClientState {
/// Handles received messages internally for basic client functionality.
fn handle_message(&self, msg: &Message) -> error::Result<()> {
log::trace!("[RECV] {}", msg.to_string());

if let Some(tags) = msg.tags {
if let Some(batch_tag) = tags.into_iter().find(|tag| tag.0 == "batch") {
match batch_tag.1 {
Some(batch_id) => {
let inflight_batches = self.inflight_batches.read();
// TODO: check if we negotiated batch as well.
if !inflight_batches.contains_key(&batch_id) {
} else {
// Release the read lock, we are upgrading to a write lock.
drop(inflight_batches);
let mut inflight_batches = self.inflight_batches.write();
// This message processing is delayed until the batch is finished.
inflight_batches.entry(batch_id).and_modify(|messages| messages.push(msg.clone()));
return Ok(());
}
},
None => {
// TODO: Return an invalid message error.
}
}
}
}

match msg.command {
JOIN(ref chan, _, _) => self.handle_join(msg.source_nickname().unwrap_or(""), chan),
PART(ref chan, _) => self.handle_part(msg.source_nickname().unwrap_or(""), chan),
Expand All @@ -579,6 +607,7 @@ impl ClientState {
}
}
}
Command::BATCH(ref reference_tag, ref sub, ref params) => self.handle_batch(reference_tag, sub.as_ref(), params.as_ref())?,
Command::Response(Response::RPL_NAMREPLY, ref args) => self.handle_namreply(args),
Command::Response(Response::RPL_ENDOFMOTD, _)
| Command::Response(Response::ERR_NOMOTD, _) => {
Expand Down Expand Up @@ -672,6 +701,40 @@ impl ClientState {
}
}

#[cfg(not(feature = "batch"))]
fn handle_batch(&self, _: &str, _: Option<&BatchSubCommand>, _: Option<&Vec<String>>) -> error::Result<()> {}

/// Handles a `BATCH` command by opening or closing an in-flight batch.
///
/// `reference_tag` is the raw first parameter of the command. A leading `+`
/// opens a new batch; any other leading character is treated as closing the
/// batch named by the remainder of the tag. `sub` and `params` are currently
/// unused.
///
/// When a batch is closed, every message that was buffered for it is replayed
/// through `handle_message` in the order it was received.
///
/// # Errors
///
/// * `Error::BatchAlreadyExists` if a batch with the same identifier is
///   already in flight when a start is received.
/// * `Error::BatchDisappearedBeforeEndOfBatchProcessed` if an end is received
///   for an identifier that is not in flight (this includes an empty
///   reference tag).
/// * Any error returned by `handle_message` while replaying; replay stops at
///   the first failure.
#[cfg(feature = "batch")]
fn handle_batch(&self, reference_tag: &str, sub: Option<&BatchSubCommand>, params: Option<&Vec<String>>) -> error::Result<()> {
    // TODO: increase type safety here.
    // TODO: handle nested batches better.
    // TODO: handling sub commands such as netsplit and netjoin could be done by having extra
    // handlers for end users to warn them of an incoming netsplit or netjoin batch.
    // If this is chathistory, handle_chathistory could also be designed.

    // Split off the leading marker on a character boundary. This never panics,
    // even for an empty tag or one starting with a multi-byte character.
    let mut chars = reference_tag.chars();
    let is_start = chars.next() == Some('+');
    let identifier = chars.as_str();

    // Keep the write guard confined to this scope: replaying messages below
    // re-enters `handle_message`, which locks `inflight_batches` again, so the
    // guard must be released first to avoid a self-deadlock.
    let pending_messages = {
        let mut inflight_batches = self.inflight_batches.write();

        if is_start {
            if inflight_batches.contains_key(identifier) {
                return Err(error::Error::BatchAlreadyExists(identifier.to_string()));
            }

            // Create a new pending batch.
            inflight_batches.insert(identifier.to_string(), Vec::new());
            return Ok(());
        }

        // Remove the pending batch; its messages are replayed once the lock is released.
        inflight_batches
            .remove(identifier)
            .ok_or(error::Error::BatchDisappearedBeforeEndOfBatchProcessed)?
    };

    // Replay all delayed messages now. The batch is no longer in the map, so
    // the replayed messages are processed normally instead of being re-buffered.
    for message in &pending_messages {
        self.handle_message(message)?;
    }

    Ok(())
}

#[cfg(not(feature = "channel-lists"))]
fn handle_join(&self, _: &str, _: &str) {}

Expand Down
8 changes: 8 additions & 0 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,14 @@ pub enum Error {
/// Stream has already been configured.
#[error("stream has already been configured")]
StreamAlreadyConfigured,

/// An end-of-batch message was received after the pending batch had already been removed from the in-flight list.
#[error("batch disappeared before end of batch was processed")]
BatchDisappearedBeforeEndOfBatchProcessed,

/// A new batch was started with the same reference tag
#[error("batch {} already exists but a new batch request with the same tag was sent", .0)]
BatchAlreadyExists(String),
}

/// Errors that occur with configurations.
Expand Down
0