feat: Add support for repodata patching in rattler-index, fix silent failures by pavelzw · Pull Request #1129 · conda/rattler

feat: Add support for repodata patching in rattler-index, fix silent failures #1129


Merged · 13 commits · Mar 10, 2025
2 changes: 1 addition & 1 deletion crates/rattler_conda_types/src/repo_data/mod.rs
@@ -470,7 +470,7 @@ fn determine_subdir(
Arch::X86_64 => "64",
_ => arch.as_str(),
};
Ok(format!("{}-{}", platform, arch_str))
Ok(format!("{platform}-{arch_str}"))
}
Err(_) => Err(ConvertSubdirError::NoKnownCombination { platform, arch }),
}
256 changes: 194 additions & 62 deletions crates/rattler_index/src/lib.rs
@@ -2,21 +2,24 @@
//! files
#![deny(missing_docs)]

use anyhow::Result;
use anyhow::{Context, Result};
use bytes::buf::Buf;
use fs_err::{self as fs};
use futures::future::try_join_all;
use futures::{stream::FuturesUnordered, StreamExt};
use fxhash::FxHashMap;
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
use rattler_conda_types::{
package::{ArchiveType, IndexJson, PackageFile},
ChannelInfo, PackageRecord, Platform, RepoData,
ChannelInfo, PackageRecord, PatchInstructions, Platform, RepoData,
};
use rattler_networking::{Authentication, AuthenticationStorage};
use rattler_package_streaming::{read, seek};
use rattler_package_streaming::{
read,
seek::{self, stream_conda_content},
};
use std::{
collections::{HashMap, HashSet},
io::{Cursor, Read},
io::{Cursor, Read, Seek},
path::{Path, PathBuf},
str::FromStr,
sync::Arc,
@@ -71,6 +74,46 @@ pub fn package_record_from_index_json<T: Read>(
Ok(package_record)
}

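/// Extract the patch instructions from a repodata-patch `.conda` package.
/// Every entry must be of the form `<subdir>/patch_instructions.json`;
/// the result maps each subdir to its parsed `PatchInstructions`.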
fn repodata_patch_from_conda_package_stream<'a>(
package: impl Read + Seek + 'a,
) -> anyhow::Result<rattler_conda_types::RepoDataPatch> {
let mut subdirs = FxHashMap::default();

let mut content_reader = stream_conda_content(package)?;
let entries = content_reader.entries()?;
for entry in entries {
let mut entry = entry?;
if !entry.header().entry_type().is_file() {
return Err(anyhow::anyhow!(
"Expected repodata patch package to be a file"
));
}
let mut buf = Vec::new();
entry.read_to_end(&mut buf)?;
let path = entry.path()?;
let components = path.components().collect::<Vec<_>>();
let subdir =
if components.len() == 2 && components[1].as_os_str() == "patch_instructions.json" {
let subdir_str = components[0]
.as_os_str()
.to_str()
.context("Could not convert OsStr to str")?;
let _ = Platform::from_str(subdir_str)?;
subdir_str.to_string()
} else {
return Err(anyhow::anyhow!(
"Expected files of form <subdir>/patch_instructions.json, but found {}",
path.display()
));
};

let instructions: PatchInstructions = serde_json::from_slice(&buf)?;
subdirs.insert(subdir, instructions);
}

Ok(rattler_conda_types::RepoDataPatch { subdirs })
}
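
For context, a minimal sketch of how this helper could be driven from inside the crate, assuming a repodata-patch package at a hypothetical local path:

```rust
use std::fs::File;

fn print_patched_subdirs(path: &str) -> anyhow::Result<()> {
    // `File` implements both `Read` and `Seek`, as the helper requires.
    let package = File::open(path)?;
    let patch = repodata_patch_from_conda_package_stream(package)?;
    // One entry per `<subdir>/patch_instructions.json` found in the package.
    for subdir in patch.subdirs.keys() {
        println!("found patch instructions for {subdir}");
    }
    Ok(())
}
```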

/// Extract the package record from a `.tar.bz2` package file.
/// This function will look for the `info/index.json` file in the conda package
/// and extract the package record from it.
@@ -132,12 +175,17 @@ async fn index_subdir(
subdir: Platform,
op: Operator,
force: bool,
repodata_patch: Option<PatchInstructions>,
progress: Option<MultiProgress>,
semaphore: Arc<Semaphore>,
) -> Result<()> {
let repodata_path = if repodata_patch.is_some() {
format!("{subdir}/repodata_from_packages.json")
} else {
format!("{subdir}/repodata.json")
};
let mut registered_packages: FxHashMap<String, PackageRecord> = HashMap::default();
if !force {
let repodata_path = format!("{subdir}/repodata.json");
let repodata_bytes = op.read(&repodata_path).await;
let repodata: RepoData = match repodata_bytes {
Ok(bytes) => serde_json::from_slice(&bytes.to_vec())?,
@@ -210,7 +258,7 @@ async fn index_subdir(
.cloned()
.collect::<Vec<_>>();

tracing::debug!(
tracing::info!(
"Adding {} packages to subdir {}.",
packages_to_add.len(),
subdir
@@ -229,53 +277,79 @@ async fn index_subdir(
.progress_chars("##-");
pb.set_style(sty);

let tasks = packages_to_add
.iter()
.map(|filename| {
tokio::spawn({
let op = op.clone();
let filename = filename.clone();
let pb = pb.clone();
let semaphore = semaphore.clone();
{
async move {
let _permit = semaphore
.acquire()
.await
.expect("Semaphore was unexpectedly closed");
pb.set_message(format!(
"Indexing {} {}",
subdir.as_str(),
console::style(filename.clone()).dim()
));
let file_path = format!("{subdir}/{filename}");
let buffer = op.read(&file_path).await?;
let reader = buffer.reader();
// We already know it's not None
let archive_type = ArchiveType::try_from(&filename).unwrap();
let record = match archive_type {
ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader),
ArchiveType::Conda => package_record_from_conda_reader(reader),
}?;
pb.inc(1);
Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record))
}
let mut tasks = FuturesUnordered::new();
for filename in packages_to_add.iter() {
let task = {
let op = op.clone();
let filename = filename.clone();
let pb = pb.clone();
let semaphore = semaphore.clone();
{
async move {
let _permit = semaphore
.acquire()
.await
.expect("Semaphore was unexpectedly closed");
pb.set_message(format!(
"Indexing {} {}",
subdir.as_str(),
console::style(filename.clone()).dim()
));
let file_path = format!("{subdir}/{filename}");
let buffer = op.read(&file_path).await?;
let reader = buffer.reader();
// We already know it's not None
let archive_type = ArchiveType::try_from(&filename).unwrap();
let record = match archive_type {
ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader),
ArchiveType::Conda => package_record_from_conda_reader(reader),
}?;
pb.inc(1);
Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record))
}
})
})
.collect::<Vec<_>>();
let results = try_join_all(tasks).await?;

pb.finish_with_message(format!("Finished {}", subdir.as_str()));
}
};
tasks.push(tokio::spawn(task));
}
let mut results = Vec::new();
while let Some(join_result) = tasks.next().await {
match join_result {
Ok(Ok(result)) => results.push(result),
Ok(Err(e)) => {
tasks.clear();
tracing::error!("Failed to process package: {}", e);
pb.abandon_with_message(format!(
"{} {}",
console::style("Failed to index").red(),
console::style(subdir.as_str()).dim()
));
Comment on lines +321 to +325

pavelzw (Member, Author):

This is not perfect, as the progress bar can get overwritten by other tasks that are still executed after this failure. I haven't seen a nice way to avoid this behavior.

I think we can keep it like this for now, since the error messages are printed anyway and I expect this to be executed in non-interactive environments most of the time.
return Err(e.into());
}
Err(join_err) => {
tasks.clear();
tracing::error!("Task panicked: {}", join_err);
pb.abandon_with_message(format!(
"{} {}",
console::style("Failed to index").red(),
console::style(subdir.as_str()).dim()
));
return Err(anyhow::anyhow!("Task panicked: {}", join_err));
}
}
}
pb.finish_with_message(format!(
"{} {}",
console::style("Finished").green(),
subdir.as_str()
));

tracing::debug!(
tracing::info!(
"Successfully added {} packages to subdir {}.",
results.len(),
subdir
);

for result in results {
let (filename, record) = result?;
for (filename, record) in results {
registered_packages.insert(filename, record);
}

@@ -304,26 +378,46 @@ async fn index_subdir(
version: Some(2),
};

let repodata_path = format!("{subdir}/repodata.json");
tracing::info!("Writing repodata to {}", repodata_path);
let repodata_bytes = serde_json::to_vec(&repodata)?;
op.write(&repodata_path, repodata_bytes).await?;

if let Some(instructions) = repodata_patch {
let patched_repodata_path = format!("{subdir}/repodata.json");
tracing::info!("Writing patched repodata to {}", patched_repodata_path);
let mut patched_repodata = repodata.clone();
patched_repodata.apply_patches(&instructions);
let patched_repodata_bytes = serde_json::to_vec(&patched_repodata)?;
op.write(&patched_repodata_path, patched_repodata_bytes)
.await?;
}
// todo: also write repodata.json.bz2, repodata.json.zst, repodata.json.jlap and sharded repodata once available in rattler
// https://github.com/conda/rattler/issues/1096

Ok(())
}

/// Create a new `repodata.json` for all packages in the channel at the given directory.
#[allow(clippy::too_many_arguments)]
pub async fn index_fs(
channel: impl Into<PathBuf>,
target_platform: Option<Platform>,
repodata_patch: Option<String>,
force: bool,
max_parallel: usize,
multi_progress: Option<MultiProgress>,
) -> anyhow::Result<()> {
let mut config = FsConfig::default();
config.root = Some(channel.into().canonicalize()?.to_string_lossy().to_string());
index(target_platform, config, force, max_parallel, multi_progress).await
index(
target_platform,
config,
repodata_patch,
force,
max_parallel,
multi_progress,
)
.await
}

/// Create a new `repodata.json` for all packages in the channel at the given S3 URL.
@@ -337,6 +431,7 @@ pub async fn index_s3(
secret_access_key: Option<String>,
session_token: Option<String>,
target_platform: Option<Platform>,
repodata_patch: Option<String>,
force: bool,
max_parallel: usize,
multi_progress: Option<MultiProgress>,
@@ -376,6 +471,7 @@ pub async fn index_s3(
index(
target_platform,
s3_config,
repodata_patch,
force,
max_parallel,
multi_progress,
@@ -398,6 +494,7 @@ pub async fn index<T: Configurator>(
pub async fn index<T: Configurator>(
target_platform: Option<Platform>,
config: T,
repodata_patch: Option<String>,
force: bool,
max_parallel: usize,
multi_progress: Option<MultiProgress>,
@@ -443,22 +540,57 @@ pub async fn index<T: Configurator>(
subdirs.insert(Platform::NoArch);
}

let repodata_patch = if let Some(path) = repodata_patch {
match ArchiveType::try_from(path.clone()) {
Some(ArchiveType::Conda) => {}
Some(ArchiveType::TarBz2) | None => {
return Err(anyhow::anyhow!(
"Only .conda packages are supported for repodata patches. Got: {}",
path
))
}
}
let repodata_patch_path = format!("noarch/{path}");
let repodata_patch_bytes = op.read(&repodata_patch_path).await?.to_bytes();
let reader = Cursor::new(repodata_patch_bytes);
let repodata_patch = repodata_patch_from_conda_package_stream(reader)?;
Some(repodata_patch)
} else {
None
};

let semaphore = Semaphore::new(max_parallel);
let semaphore = Arc::new(semaphore);

let tasks = subdirs
.iter()
.map(|subdir| {
tokio::spawn(index_subdir(
*subdir,
op.clone(),
force,
multi_progress.clone(),
semaphore.clone(),
))
})
.collect::<Vec<_>>();
try_join_all(tasks).await?;
pavelzw (Member, Author):

This was silently failing before, because it only checked whether the join handles were okay but not whether the actual return values of the tasks were.
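To illustrate the failure mode outside of this crate, here is a minimal self-contained sketch (assuming `tokio`, `futures`, and `anyhow` as dependencies; the task bodies are hypothetical):

```rust
use futures::future::try_join_all;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // One task succeeds, one fails "normally" (no panic): its error lives
    // in the task's return value, not in the JoinHandle's JoinError.
    let handles = vec![
        tokio::spawn(async { Ok::<(), anyhow::Error>(()) }),
        tokio::spawn(async { Err::<(), _>(anyhow::anyhow!("boom")) }),
    ];

    // `?` here only propagates JoinError (panic or cancellation). The inner
    // `Result`s are collected into `results`; dropping them unchecked is
    // exactly how "boom" never surfaced before.
    let results = try_join_all(handles).await?;

    // The fix: also check each task's own return value.
    for result in results {
        result?;
    }
    Ok(())
}
```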

let mut tasks = FuturesUnordered::new();
for subdir in subdirs.iter() {
let task = index_subdir(
*subdir,
op.clone(),
force,
repodata_patch
.as_ref()
.and_then(|p| p.subdirs.get(&subdir.to_string()).cloned()),
multi_progress.clone(),
semaphore.clone(),
);
tasks.push(tokio::spawn(task));
}

while let Some(join_result) = tasks.next().await {
match join_result {
Ok(Ok(_)) => {}
Ok(Err(e)) => {
tracing::error!("Failed to process subdir: {}", e);
tasks.clear();
return Err(e);
}
Err(join_err) => {
tracing::error!("Task panicked: {}", join_err);
tasks.clear();
return Err(anyhow::anyhow!("Task panicked: {}", join_err));
}
}
}
Ok(())
}