-
Notifications
You must be signed in to change notification settings - Fork 87
feat: Add support for repodata patching in rattler-index, fix silent failures #1129
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
e78df5a
impl
pavelzw afa31c1
.
pavelzw 29430c4
fail earlier
pavelzw 01a2da5
wip
pavelzw a1d4ac0
finish
pavelzw b71a227
fix
pavelzw 7bbb1a3
fix
pavelzw 5964509
fix
pavelzw d896ab4
fix
pavelzw 069fdf3
Merge branch 'main' into repodata-patching
pavelzw 4b716f9
fix docstring
pavelzw ef7aba6
Merge branch 'main' into repodata-patching
pavelzw 5894ce7
Merge branch 'main' into repodata-patching
pavelzw File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
8000
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -2,21 +2,24 @@ | |
//! files | ||
#![deny(missing_docs)] | ||
|
||
use anyhow::Result; | ||
use anyhow::{Context, Result}; | ||
use bytes::buf::Buf; | ||
use fs_err::{self as fs}; | ||
use futures::future::try_join_all; | ||
use futures::{stream::FuturesUnordered, StreamExt}; | ||
use fxhash::FxHashMap; | ||
use indicatif::{MultiProgress, ProgressBar, ProgressStyle}; | ||
use rattler_conda_types::{ | ||
package::{ArchiveType, IndexJson, PackageFile}, | ||
ChannelInfo, PackageRecord, Platform, RepoData, | ||
ChannelInfo, PackageRecord, PatchInstructions, Platform, RepoData, | ||
}; | ||
use rattler_networking::{Authentication, AuthenticationStorage}; | ||
use rattler_package_streaming::{read, seek}; | ||
use rattler_package_streaming::{ | ||
read, | ||
seek::{self, stream_conda_content}, | ||
}; | ||
use std::{ | ||
collections::{HashMap, HashSet}, | ||
io::{Cursor, Read}, | ||
io::{Cursor, Read, Seek}, | ||
path::{Path, PathBuf}, | ||
str::FromStr, | ||
sync::Arc, | ||
|
@@ -71,6 +74,46 @@ pub fn package_record_from_index_json<T: Read>( | |
Ok(package_record) | ||
} | ||
|
||
/// Builds a `RepoDataPatch` from the content of a repodata patch package. | ||
/// | ||
/// `package` is handed to `stream_conda_content`, and every entry of the resulting | ||
/// archive is inspected. Each entry must be a regular file whose path is exactly | ||
/// `<subdir>/patch_instructions.json`, where `<subdir>` parses as a `Platform`. | ||
/// The file body is deserialized as `PatchInstructions` and stored in the | ||
/// returned patch under the `<subdir>` string. | ||
/// | ||
/// # Errors | ||
/// | ||
/// Returns an error if the content cannot be streamed or read, if an entry is not a | ||
/// file, if a path does not match `<subdir>/patch_instructions.json`, if `<subdir>` | ||
/// is not valid UTF-8 or not a valid `Platform`, or if the JSON fails to deserialize. | ||
fn repodata_patch_from_conda_package_stream<'a>( | ||
package: impl Read + Seek + 'a, | ||
) -> anyhow::Result<rattler_conda_types::RepoDataPatch> { | ||
let mut subdirs = FxHashMap::default(); | ||
|
||
let mut content_reader = stream_conda_content(package)?; | ||
let entries = content_reader.entries()?; | ||
for entry in entries { | ||
let mut entry = entry?; | ||
// Any entry that is not a regular file is rejected. | ||
if !entry.header().entry_type().is_file() { | ||
return Err(anyhow::anyhow!( | ||
"Expected repodata patch package to be a file" | ||
)); | ||
} | ||
// Read the whole entry into memory before inspecting its path. | ||
let mut buf = Vec::new(); | ||
entry.read_to_end(&mut buf)?; | ||
let path = entry.path()?; | ||
let components = path.components().collect::<Vec<_>>(); | ||
let subdir = | ||
if components.len() == 2 && components[1].as_os_str() == "patch_instructions.json" { | ||
let subdir_str = components[0] | ||
.as_os_str() | ||
.to_str() | ||
.context("Could not convert OsStr to str")?; | ||
// Parsed only for validation; the parsed `Platform` value is discarded. | ||
let _ = Platform::from_str(subdir_str)?; | ||
subdir_str.to_string() | ||
} else { | ||
return Err(anyhow::anyhow!( | ||
"Expected files of form <subdir>/patch_instructions.json, but found {}", | ||
path.display() | ||
)); | ||
}; | ||
|
||
let instructions: PatchInstructions = serde_json::from_slice(&buf)?; | ||
// A later entry for the same subdir replaces an earlier one. | ||
subdirs.insert(subdir, instructions); | ||
} | ||
|
||
Ok(rattler_conda_types::RepoDataPatch { subdirs }) | ||
} | ||
|
||
/// Extract the package record from a `.tar.bz2` package file. | ||
/// This function will look for the `info/index.json` file in the conda package | ||
/// and extract the package record from it. | ||
|
@@ -132,12 +175,17 @@ async fn index_subdir( | |
subdir: Platform, | ||
op: Operator, | ||
force: bool, | ||
repodata_patch: Option<PatchInstructions>, | ||
progress: Option<MultiProgress>, | ||
semaphore: Arc<Semaphore>, | ||
) -> Result<()> { | ||
let repodata_path = if repodata_patch.is_some() { | ||
format!("{subdir}/repodata_from_packages.json") | ||
} else { | ||
format!("{subdir}/repodata.json") | ||
}; | ||
let mut registered_packages: FxHashMap<String, PackageRecord> = HashMap::default(); | ||
if !force { | ||
let repodata_path = format!("{subdir}/repodata.json"); | ||
let repodata_bytes = op.read(&repodata_path).await; | ||
let repodata: RepoData = match repodata_bytes { | ||
Ok(bytes) => serde_json::from_slice(&bytes.to_vec())?, | ||
|
@@ -210,7 +258,7 @@ async fn index_subdir( | |
.cloned() | ||
.collect::<Vec<_>>(); | ||
|
||
tracing::debug!( | ||
tracing::info!( | ||
"Adding {} packages to subdir {}.", | ||
packages_to_add.len(), | ||
subdir | ||
|
@@ -229,53 +277,79 @@ async fn index_subdir( | |
.progress_chars("##-"); | ||
pb.set_style(sty); | ||
|
||
let tasks = packages_to_add | ||
.iter() | ||
.map(|filename| { | ||
tokio::spawn({ | ||
let op = op.clone(); | ||
let filename = filename.clone(); | ||
let pb = pb.clone(); | ||
let semaphore = semaphore.clone(); | ||
{ | ||
async move { | ||
let _permit = semaphore | ||
.acquire() | ||
.await | ||
.expect("Semaphore was unexpectedly closed"); | ||
pb.set_message(format!( | ||
"Indexing {} {}", | ||
subdir.as_str(), | ||
console::style(filename.clone()).dim() | ||
)); | ||
let file_path = format!("{subdir}/{filename}"); | ||
let buffer = op.read(&file_path).await?; | ||
let reader = buffer.reader(); | ||
// We already know it's not None | ||
let archive_type = ArchiveType::try_from(&filename).unwrap(); | ||
let record = match archive_type { | ||
ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader), | ||
ArchiveType::Conda => package_record_from_conda_reader(reader), | ||
}?; | ||
pb.inc(1); | ||
Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record)) | ||
} | ||
let mut tasks = FuturesUnordered::new(); | ||
for filename in packages_to_add.iter() { | ||
let task = { | ||
let op = op.clone(); | ||
let filename = filename.clone(); | ||
let pb = pb.clone(); | ||
let semaphore = semaphore.clone(); | ||
{ | ||
async move { | ||
let _permit = semaphore | ||
.acquire() | ||
.await | ||
.expect("Semaphore was unexpectedly closed"); | ||
pb.set_message(format!( | ||
"Indexing {} {}", | ||
subdir.as_str(), | ||
console::style(filename.clone()).dim() | ||
)); | ||
let file_path = format!("{subdir}/{filename}"); | ||
let buffer = op.read(&file_path).await?; | ||
let reader = buffer.reader(); | ||
// We already know it's not None | ||
let archive_type = ArchiveType::try_from(&filename).unwrap(); | ||
let record = match archive_type { | ||
ArchiveType::TarBz2 => package_record_from_tar_bz2_reader(reader), | ||
ArchiveType::Conda => package_record_from_conda_reader(reader), | ||
}?; | ||
pb.inc(1); | ||
Ok::<(String, PackageRecord), std::io::Error>((filename.clone(), record)) | ||
} | ||
}) | ||
}) | ||
.collect::<Vec<_>>(); | ||
let results = try_join_all(tasks).await?; | ||
|
||
pb.finish_with_message(format!("Finished {}", subdir.as_str())); | ||
} | ||
}; | ||
tasks.push(tokio::spawn(task)); | ||
} | ||
let mut results = Vec::new(); | ||
while let Some(join_result) = tasks.next().await { | ||
match join_result { | ||
Ok(Ok(result)) => results.push(result), | ||
Ok(Err(e)) => { | ||
tasks.clear(); | ||
tracing::error!("Failed to process package: {}", e); | ||
pb.abandon_with_message(format!( | ||
"{} {}", | ||
console::style("Failed to index").red(), | ||
console::style(subdir.as_str()).dim() | ||
)); | ||
return Err(e.into()); | ||
} | ||
Err(join_err) => { | ||
tasks.clear(); | ||
tracing::error!("Task panicked: {}", join_err); | ||
pb.abandon_with_message(format!( | ||
"{} {}", | ||
console::style("Failed to index").red(), | ||
console::style(subdir.as_str()).dim() | ||
)); | ||
return Err(anyhow::anyhow!("Task panicked: {}", join_err)); | ||
} | ||
} | ||
} | ||
pb.finish_with_message(format!( | ||
"{} {}", | ||
console::style("Finished").green(), | ||
subdir.as_str() | ||
)); | ||
|
||
tracing::debug!( | ||
tracing::info!( | ||
"Successfully added {} packages to subdir {}.", | ||
results.len(), | ||
subdir | ||
); | ||
|
||
for result in results { | ||
let (filename, record) = result?; | ||
for (filename, record) in results { | ||
registered_packages.insert(filename, record); | ||
} | ||
|
||
|
@@ -304,26 +378,46 @@ async fn index_subdir( | |
version: Some(2), | ||
}; | ||
|
||
let repodata_path = format!("{subdir}/repodata.json"); | ||
tracing::info!("Writing repodata to {}", repodata_path); | ||
let repodata_bytes = serde_json::to_vec(&repodata)?; | ||
op.write(&repodata_path, repodata_bytes).await?; | ||
|
||
if let Some(instructions) = repodata_patch { | ||
let patched_repodata_path = format!("{subdir}/repodata.json"); | ||
tracing::info!("Writing patched repodata to {}", patched_repodata_path); | ||
let mut patched_repodata = repodata.clone(); | ||
patched_repodata.apply_patches(&instructions); | ||
let patched_repodata_bytes = serde_json::to_vec(&patched_repodata)?; | ||
op.write(&patched_repodata_path, patched_repodata_bytes) | ||
.await?; | ||
} | ||
// todo: also write repodata.json.bz2, repodata.json.zst, repodata.json.jlap and sharded repodata once available in rattler | ||
// https://github.com/conda/rattler/issues/1096 | ||
|
||
Ok(()) | ||
} | ||
|
||
/// Create a new `repodata.json` for all packages in the channel at the given directory. | ||
/// | ||
/// The `channel` path is canonicalized and stored as the `root` of a default | ||
/// `FsConfig`; the remaining arguments are forwarded unchanged to `index`. | ||
/// | ||
/// # Errors | ||
/// | ||
/// Returns an error if `channel` cannot be canonicalized or if `index` fails. | ||
#[allow(clippy::too_many_arguments)] | ||
pub async fn index_fs( | ||
channel: impl Into<PathBuf>, | ||
target_platform: Option<Platform>, | ||
repodata_patch: Option<String>, | ||
force: bool, | ||
max_parallel: usize, | ||
multi_progress: Option<MultiProgress>, | ||
) -> anyhow::Result<()> { | ||
let mut config = FsConfig::default(); | ||
// The canonical path is converted lossily, so non-UTF-8 parts are replaced. | ||
config.root = Some(channel.into().canonicalize()?.to_string_lossy().to_string()); | ||
index(target_platform, config, force, max_parallel, multi_progress).await | ||
index( | ||
target_platform, | ||
config, | ||
repodata_patch, | ||
force, | ||
max_parallel, | ||
multi_progress, | ||
) | ||
.await | ||
} | ||
|
||
/// Create a new `repodata.json` for all packages in the channel at the given S3 URL. | ||
|
@@ -337,6 +431,7 @@ pub async fn index_s3( | |
secret_access_key: Option<String>, | ||
session_token: Option<String>, | ||
target_platform: Option<Platform>, | ||
repodata_patch: Option<String>, | ||
force: bool, | ||
max_parallel: usize, | ||
multi_progress: Option<MultiProgress>, | ||
|
@@ -376,6 +471,7 @@ pub async fn index_s3( | |
index( | ||
target_platform, | ||
s3_config, | ||
repodata_patch, | ||
force, | ||
max_parallel, | ||
multi_progress, | ||
|
@@ -398,6 +494,7 @@ pub async fn index_s3( | |
pub async fn index<T: Configurator>( | ||
target_platform: Option<Platform>, | ||
config: T, | ||
repodata_patch: Option<String>, | ||
force: bool, | ||
max_parallel: usize, | ||
multi_progress: Option<MultiProgress>, | ||
|
@@ -443,22 +540,57 @@ pub async fn index<T: Configurator>( | |
subdirs.insert(Platform::NoArch); | ||
} | ||
|
||
let repodata_patch = if let Some(path) = repodata_patch { | ||
match ArchiveType::try_from(path.clone()) { | ||
Some(ArchiveType::Conda) => {} | ||
Some(ArchiveType::TarBz2) | None => { | ||
return Err(anyhow::anyhow!( | ||
"Only .conda packages are supported for repodata patches. Got: {}", | ||
path | ||
)) | ||
} | ||
} | ||
let repodata_patch_path = format!("noarch/{path}"); | ||
let repodata_patch_bytes = op.read(&repodata_patch_path).await?.to_bytes(); | ||
let reader = Cursor::new(repodata_patch_bytes); | ||
let repodata_patch = repodata_patch_from_conda_package_stream(reader)?; | ||
Some(repodata_patch) | ||
} else { | ||
None | ||
}; | ||
|
||
let semaphore = Semaphore::new(max_parallel); | ||
let semaphore = Arc::new(semaphore); | ||
|
||
let tasks = subdirs | ||
.iter() | ||
.map(|subdir| { | ||
tokio::spawn(index_subdir( | ||
*subdir, | ||
op.clone(), | ||
force, | ||
multi_progress.clone(), | ||
semaphore.clone(), | ||
)) | ||
}) | ||
.collect::<Vec<_>>(); | ||
try_join_all(tasks).await?; | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. This was silently failing before because it only checked whether the join handles were okay, but not whether the actual return values of the tasks were. |
||
let mut tasks = FuturesUnordered::new(); | ||
for subdir in subdirs.iter() { | ||
let task = index_subdir( | ||
*subdir, | ||
op.clone(), | ||
force, | ||
repodata_patch | ||
.as_ref() | ||
.and_then(|p| p.subdirs.get(&subdir.to_string()).cloned()), | ||
multi_progress.clone(), | ||
semaphore.clone(), | ||
); | ||
tasks.push(tokio::spawn(task)); | ||
} | ||
|
||
while let Some(join_result) = tasks.next().await { | ||
match join_result { | ||
Ok(Ok(_)) => {} | ||
Ok(Err(e)) => { | ||
tracing::error!("Failed to process subdir: {}", e); | ||
tasks.clear(); | ||
return Err(e); | ||
} | ||
Err(join_err) => { | ||
tracing::error!("Task panicked: {}", join_err); | ||
tasks.clear(); | ||
return Err(anyhow::anyhow!("Task panicked: {}", join_err)); | ||
} | ||
} | ||
} | ||
Ok(()) | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is not perfect, as the progress bar can get overwritten by other tasks that are still being executed after this failure. I haven't seen a nice way to avoid this behavior.
I think we can keep it like this for now, since the error messages are printed anyway and I expect this script to be executed in non-interactive environments most of the time.