chore: fix unstable unit test by tabVersion · Pull Request #8674 · risingwavelabs/risingwave · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

chore: fix unstable unit test #8674

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Mar 23, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions src/batch/src/executor/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,15 @@ pub fn gen_sorted_data(
batch_num: usize,
start: String,
step: u64,
offset: u64,
) -> Vec<DataChunk> {
let mut data_gen = FieldGeneratorImpl::with_number_sequence(
DataType::Int64,
Some(start),
Some(i64::MAX.to_string()),
0,
step,
offset,
)
.unwrap();
let mut ret = Vec::<DataChunk>::with_capacity(batch_num);
Expand Down
15 changes: 14 additions & 1 deletion src/common/src/field_generator/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,13 @@ pub trait NumericFieldRandomGenerator {

/// fields that can be continuously generated impl this trait
pub trait NumericFieldSequenceGenerator {
fn new(start: Option<String>, end: Option<String>, offset: u64, step: u64) -> Result<Self>
fn new(
start: Option<String>,
end: Option<String>,
offset: u64,
step: u64,
event_offset: u64,
) -> Result<Self>
where
Self: Sized;

Expand Down Expand Up @@ -93,37 +99,43 @@ impl FieldGeneratorImpl {
end: Option<String>,
split_index: u64,
split_num: u64,
offset: u64,
) -> Result<Self> {
match data_type {
DataType::Int16 => Ok(FieldGeneratorImpl::I16Sequence(I16SequenceField::new(
start,
end,
split_index,
split_num,
offset,
)?)),
DataType::Int32 => Ok(FieldGeneratorImpl::I32Sequence(I32SequenceField::new(
start,
end,
split_index,
split_num,
offset,
)?)),
DataType::Int64 => Ok(FieldGeneratorImpl::I64Sequence(I64SequenceField::new(
start,
end,
split_index,
split_num,
offset,
)?)),
DataType::Float32 => Ok(FieldGeneratorImpl::F32Sequence(F32SequenceField::new(
start,
end,
split_index,
split_num,
offset,
)?)),
DataType::Float64 => Ok(FieldGeneratorImpl::F64Sequence(F64SequenceField::new(
start,
end,
split_index,
split_num,
offset,
)?)),
_ => unimplemented!(),
}
Expand Down Expand Up @@ -265,6 +277,7 @@ mod tests {
Some("20".to_string()),
split_index,
split_num,
0,
)
.unwrap(),
);
Expand Down
14 changes: 9 additions & 5 deletions src/common/src/field_generator/numeric.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ where
end_option: Option<String>,
offset: u64,
step: u64,
event_offset: u64,
) -> Result<Self>
where
Self: Sized,
Expand All @@ -127,7 +128,9 @@ where
end,
offset,
step,
..Default::default()
cur: T::from(event_offset).ok_or_else(|| {
anyhow::anyhow!("event offset is too big, offset: {}", event_offset,)
})?,
})
}

Expand Down Expand Up @@ -194,7 +197,7 @@ mod tests {
#[test]
fn test_sequence_field_generator() {
let mut i16_field =
I16SequenceField::new(Some("5".to_string()), Some("10".to_string()), 0, 1).unwrap();
I16SequenceField::new(Some("5".to_string()), Some("10".to_string()), 0, 1, 0).unwrap();
for i in 5..=10 {
assert_eq!(i16_field.generate(), json!(i));
}
Expand Down Expand Up @@ -222,7 +225,8 @@ mod tests {
#[test]
fn test_sequence_datum_generator() {
let mut f32_field =
F32SequenceField::new(Some("5.0".to_string()), Some("10.0".to_string()), 0, 1).unwrap();
F32SequenceField::new(Some("5.0".to_string()), Some("10.0".to_string()), 0, 1, 0)
.unwrap();

for i in 5..=10 {
assert_eq!(
Expand All @@ -247,13 +251,13 @@ mod tests {
#[test]
fn test_sequence_field_generator_float() {
let mut f64_field =
F64SequenceField::new(Some("0".to_string()), Some("10".to_string()), 0, 1).unwrap();
F64SequenceField::new(Some("0".to_string()), Some("10".to_string()), 0, 1, 0).unwrap();
for i in 0..=10 {
assert_eq!(f64_field.generate(), json!(i as f64));
}

let mut f32_field =
F32SequenceField::new(Some("-5".to_string()), Some("5".to_string()), 0, 1).unwrap();
F32SequenceField::new(Some("-5".to_string()), Some("5".to_string()), 0, 1, 0).unwrap();
for i in -5..=5 {
assert_eq!(f32_field.generate(), json!(i as f32));
}
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/source/datagen/source/generator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -241,6 +241,7 @@ mod tests {
Some(end.to_string()),
split_index,
split_num,
0,
)
.unwrap(),
),
Expand All @@ -251,6 +252,7 @@ mod tests {
Some(end.to_string()),
split_index,
split_num,
0,
)
.unwrap(),
),
Expand Down
9 changes: 7 additions & 2 deletions src/connector/src/source/datagen/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ impl SplitReader for DatagenSplitReader {
let mut events_so_far = u64::default();
tracing::debug!("Splits for datagen found! {:?}", splits);

assert!(splits.len() == 1);
debug_assert!(splits.len() == 1);
let split = splits.into_iter().next().unwrap();
// TODO: currently, assume there's only one split in one reader
let split_id = split.id();
Expand Down Expand Up @@ -114,6 +114,7 @@ impl SplitReader for DatagenSplitReader {
&column.name,
split_index,
split_num,
events_so_far,
)?)
} else {
FieldDesc::Invisible
Expand Down Expand Up @@ -172,6 +173,7 @@ fn generator_from_data_type(
name: &String,
split_index: u64,
split_num: u64,
offset: u64,
) -> Result<FieldGeneratorImpl> {
let random_seed_key = format!("fields.{}.seed", name);
let random_seed: u64 = match fields_option_map
Expand Down Expand Up @@ -236,6 +238,7 @@ fn generator_from_data_type(
&format!("{}.{}", name, field_name),
split_index,
split_num,
offset,
)?;
Ok((field_name, gen))
})
Expand All @@ -251,6 +254,7 @@ fn generator_from_data_type(
&format!("{}._", name),
split_index,
split_num,
offset,
)?;
FieldGeneratorImpl::with_list(generator, length_value)
}
Expand All @@ -267,7 +271,8 @@ fn generator_from_data_type(
start_value,
end_value,
split_index,
split_num
split_num,
offset,
)
} else {
let min_key = format!("fields.{}.min", name);
Expand Down
1 change: 1 addition & 0 deletions src/stream/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,4 @@ workspace-hack = { path = "../workspace-hack" }
[dev-dependencies]
assert_matches = "1"
risingwave_hummock_test = { path = "../storage/hummock_test", features = ["test"] }
tracing-test = "0.2"
63 changes: 30 additions & 33 deletions src/stream/src/executor/source/source_executor.rs
</tr>
Original file line number Diff line number Diff line change
Expand Up @@ -488,21 +488,21 @@ impl<S: StateStore> Debug for SourceExecutor<S> {

#[cfg(test)]
mod tests {
use std::sync::atomic::AtomicU64;

use std::time::Duration;

use maplit::{convert_args, hashmap};
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::{ColumnId, ConflictBehavior, Field, Schema, TableId};
use risingwave_common::catalog::{ColumnId, Field, Schema, TableId};
use risingwave_common::test_prelude::StreamChunkTestExt;
use risingwave_common::types::DataType;
use risingwave_common::util::sort_util::{ColumnOrder, OrderType};
use risingwave_connector::source::datagen::DatagenSplit;
use risingwave_pb::catalog::StreamSourceInfo;
use risingwave_pb::plan_common::PbRowFormatType;
use risingwave_source::connector_test_utils::create_source_desc_builder;
use risingwave_storage::memory::MemoryStateStore;
use tokio::sync::mpsc::unbounded_channel;
use tracing_test::traced_test;

use super::*;
use crate::executor::ActorContext;
Expand Down Expand Up @@ -600,6 +600,7 @@ mod tests {
);
}

#[traced_test]
#[tokio::test]
async fn test_split_change_mutation() {
let table_id = TableId::default();
Expand All @@ -615,9 +616,9 @@ mod tests {
};
let properties = convert_args!(hashmap!(
"connector" => "datagen",
"fields.v1.min" => "1",
"fields.v1.max" => "1000",
"fields.v1.seed" => "12345",
"fields.v1.kind" => "sequence",
"fields.v1.start" => "11",
"fields.v1.end" => "11111",
));

let source_desc_builder = create_source_desc_builder(
Expand Down Expand Up @@ -658,20 +659,7 @@ mod tests {
u64::MAX,
1,
);

let mut materialize = MaterializeExecutor::for_test(
Box::new(executor),
mem_state_store.clone(),
TableId::from(0x2333),
vec![ColumnOrder::new(0, OrderType::ascending())],
column_ids,
2,
Arc::new(AtomicU64::new(0)),
ConflictBehavior::NoCheck,
)
.await
.boxed()
.execute();
let mut handler = Box::new(executor).execute();

let init_barrier = Barrier::new_test_barrier(1).with_mutation(Mutation::Add {
adds: HashMap::new(),
Expand All @@ -687,11 +675,11 @@ mod tests {
});
barrier_tx.send(init_barrier).unwrap();

(materialize.next().await.unwrap().unwrap())
(handler.next().await.unwrap().unwrap())
.into_barrier()
.unwrap();

let mut ready_chunks = materialize.ready_chunks(10);
let mut ready_chunks = handler.ready_chunks(10);
let chunks = (ready_chunks.next().await.unwrap())
.into_iter()
.map(|msg| msg.unwrap().into_chunk().unwrap())
Expand All @@ -701,10 +689,10 @@ mod tests {
chunk_1,
StreamChunk::from_pretty(
" i
+ 533
+ 833
+ 738
+ 344",
+ 11
+ 14
+ 17
+ 20",
)
);

Expand All @@ -719,6 +707,11 @@ mod tests {
split_num: 3,
start_offset: None,
}),
SplitImpl::Datagen(DatagenSplit {
split_index: 2,
split_num: 3,
start_offset: None,
}),
];

let change_split_mutation =
Expand Down Expand Up @@ -751,18 +744,22 @@ mod tests {
let chunk_2 = StreamChunk::concat(chunks).sort_rows();
assert_eq!(
chunk_2,
// mixed from datagen split 0 and 1
// mixed from datagen split 0, 1 and 2
StreamChunk::from_pretty(
" i
+ 12
+ 13
+ 15
+ 16
+ 18
+ 19
+ 23
+ 26
+ 29
+ 201
+ 344
+ 425
+ 525
+ 533
+ 833",
+ 32",
)
);
tracing::debug!("chunk_2: {:?}", chunk_2.to_pretty_string());

let barrier = Barrier::new_test_barrier(3).with_mutation(Mutation::Pause);
barrier_tx.send(barrier).unwrap();
Expand Down
0