refactor(streaming): only scan necessary columns in backfill by BugenZhao · Pull Request #8533 · risingwavelabs/risingwave · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

refactor(streaming): only scan necessary columns in backfill #8533

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 14, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 19 additions & 1 deletion dashboard/proto/gen/stream_plan.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 5 additions & 1 deletion proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,12 @@ message ChainNode {
uint32 table_id = 1;
// The schema of input stream, which will be used to build a MergeNode
repeated plan_common.Field upstream_fields = 2;
// Which columns from upstream are used in this Chain node.
// The columns from the upstream table to output.
// TODO: rename this field.
repeated uint32 upstream_column_indices = 3;
// The columns from the upstream table that'll be internally required by this chain node.
// TODO: This is currently only used by backfill table scan. We should also apply it to the upstream dispatcher (#4529).
repeated int32 upstream_column_ids = 8;
// Generally, the barrier needs to be rearranged during the MV creation process, so that data can
// be flushed to shared buffer periodically, instead of making the first epoch from batch query extra
// large. However, in some cases, e.g., shared state, the barrier cannot be rearranged in ChainNode.
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/catalog/column.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,11 @@ impl From<i32> for ColumnId {
Self::new(column_id)
}
}
/// Lets a `ColumnId` be built straight from a borrowed `i32`, e.g. when mapping over
/// `.iter()` of a protobuf `repeated int32` field with `ColumnId::from`.
impl From<&i32> for ColumnId {
    /// Copies the borrowed raw id and delegates to the by-value `From<i32>` conversion.
    fn from(column_id: &i32) -> Self {
        let raw_id: i32 = *column_id;
        Self::from(raw_id)
    }
}

impl From<ColumnId> for i32 {
fn from(id: ColumnId) -> i32 {
Expand Down
12 changes: 12 additions & 0 deletions src/frontend/src/optimizer/plan_node/logical_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,18 @@ impl LogicalScan {
.collect()
}

/// Returns the ids of the output columns, followed by the ids of any primary key columns
/// that are not already among them (appended in primary-key order, without duplicates).
pub fn output_and_pk_column_ids(&self) -> Vec<ColumnId> {
    let mut column_ids = self.output_column_ids();
    for order in self.primary_key() {
        let pk_column_id = self.table_desc().columns[order.column_index].column_id;
        // Skip pk columns that are already part of the output.
        if column_ids.contains(&pk_column_id) {
            continue;
        }
        column_ids.push(pk_column_id);
    }
    column_ids
}

/// Returns the output column indices stored in the scan core (`core.output_col_idx`).
///
/// NOTE(review): presumably these are positions in the table descriptor's columns — confirm.
pub fn output_column_indices(&self) -> &[usize] {
&self.core.output_col_idx
}
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_index_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,8 @@ impl StreamNode for StreamIndexScan {
}

impl StreamIndexScan {
// TODO: this method is almost the same as `StreamTableScan::adhoc_to_stream_prost`, we should
// avoid duplication.
pub fn adhoc_to_stream_prost(&self) -> ProstStreamPlan {
use risingwave_pb::plan_common::*;
use risingwave_pb::stream_plan::*;
Expand All @@ -141,6 +143,14 @@ impl StreamIndexScan {

let stream_key = self.base.logical_pk.iter().map(|x| *x as u32).collect_vec();

let upstream_column_ids = match self.chain_type {
ChainType::Backfill => self.logical.output_and_pk_column_ids(),
ChainType::Chain | ChainType::Rearrange | ChainType::UpstreamOnly => {
self.logical.output_column_ids()
}
ChainType::ChainUnspecified => unreachable!(),
};

ProstStreamPlan {
fields: self.schema().to_prost(),
input: vec![
Expand Down Expand Up @@ -195,6 +205,7 @@ impl StreamIndexScan {
.iter()
.map(|&i| i as _)
.collect(),
upstream_column_ids: upstream_column_ids.iter().map(|i| i.get_id()).collect(),
is_singleton: false,
table_desc: Some(self.logical.table_desc().to_protobuf()),
})),
Expand Down
11 changes: 11 additions & 0 deletions src/frontend/src/optimizer/plan_node/stream_table_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,16 @@ impl StreamTableScan {

let stream_key = self.logical_pk().iter().map(|x| *x as u32).collect_vec();

// The required columns from the table (both scan and upstream).
let upstream_column_ids = match self.chain_type {
ChainType::Chain | ChainType::Rearrange | ChainType::UpstreamOnly => {
self.logical.output_column_ids()
}
// For backfill, we additionally need the primary key columns.
ChainType::Backfill => self.logical.output_and_pk_column_ids(),
ChainType::ChainUnspecified => unreachable!(),
};

ProstStreamPlan {
fields: self.schema().to_prost(),
input: vec![
Expand Down Expand Up @@ -225,6 +235,7 @@ impl StreamTableScan {
.iter()
.map(|&i| i as _)
.collect(),
upstream_column_ids: upstream_column_ids.iter().map(|i| i.get_id()).collect(),
is_singleton: *self.distribution() == Distribution::Single,
// The table desc used by backfill executor
table_desc: Some(self.logical.table_desc().to_protobuf()),
Expand Down
14 changes: 14 additions & 0 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,20 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
/// Returns the indices of the table's storage primary key columns.
///
/// NOTE(review): these are compared against `output_indices` in `pk_in_output_indices`, so
/// they appear to share its index space (presumably the full table schema) — confirm.
pub fn pk_indices(&self) -> &[usize] {
&self.pk_indices
}

/// Returns the indices of the columns this table scan outputs.
///
/// NOTE(review): presumably positions in the full table schema, the same space as
/// `pk_indices` — confirm.
pub fn output_indices(&self) -> &[usize] {
&self.output_indices
}

/// Maps every storage primary key column to its position among the output columns.
///
/// Returns `None` if any of the primary key columns is not in the output columns.
pub fn pk_in_output_indices(&self) -> Option<Vec<usize>> {
    let mut positions = Vec::new();
    for pk_idx in self.pk_indices.iter() {
        // Give up with `None` as soon as one pk column is missing from the output.
        let pos = self
            .output_indices
            .iter()
            .position(|out_idx| out_idx == pk_idx)?;
        positions.push(pos);
    }
    Some(positions)
}
}

/// Point get
Expand Down
44 changes: 32 additions & 12 deletions src/stream/src/executor/backfill.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use either::Either;
use futures::stream::select_with_strategy;
use futures::{pin_mut, stream, StreamExt, TryStreamExt};
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::{Op, StreamChunk};
use risingwave_common::buffer::BitmapBuilder;
use risingwave_common::catalog::Schema;
Expand Down Expand Up @@ -104,10 +105,24 @@ where

#[try_stream(ok = Message, error = StreamExecutorError)]
async fn execute_inner(mut self) {
// Table storage primary key.
let table_pk_indices = self.table.pk_indices();
// The primary key columns, in the output columns of the table scan.
let pk_in_output_indices = self.table.pk_in_output_indices().unwrap();
let pk_order = self.table.pk_serializer().get_order_types();
let upstream_indices = self.upstream_indices;

// TODO: unify these two mappings if we make the upstream and table output the same.
// The columns to be forwarded to the downstream, in the upstream columns.
let downstream_in_upstream_indices = self.upstream_indices;
// The columns to be forwarded to the downstream, in the output columns of the table scan.
let downstream_in_output_indices = downstream_in_upstream_indices
.iter()
.map(|&i| {
self.table
.output_indices()
.iter()
.position(|&j| i == j)
.unwrap()
})
.collect_vec();

let mut upstream = self.upstream.execute();

Expand Down Expand Up @@ -139,7 +154,9 @@ where
// Forward messages directly to the downstream.
#[for_await]
for message in upstream {
if let Some(message) = Self::mapping_message(message?, &upstream_indices) {
if let Some(message) =
Self::mapping_message(message?, &downstream_in_upstream_indices)
{
yield message;
}
}
Expand Down Expand Up @@ -213,10 +230,10 @@ where
Self::mark_chunk(
chunk,
current_pos,
table_pk_indices,
&pk_in_output_indices,
pk_order,
),
&upstream_indices,
&downstream_in_upstream_indices,
));
}
}
Expand Down Expand Up @@ -255,7 +272,7 @@ where
processed_rows += chunk.cardinality() as u64;
yield Message::Chunk(Self::mapping_chunk(
chunk,
&upstream_indices,
&downstream_in_upstream_indices,
));
}

Expand All @@ -272,11 +289,14 @@ where
.last()
.unwrap()
.1
.project(table_pk_indices)
.project(&pk_in_output_indices)
.into_owned_row(),
);
processed_rows += chunk.cardinality() as u64;
yield Message::Chunk(Self::mapping_chunk(chunk, &upstream_indices));
yield Message::Chunk(Self::mapping_chunk(
chunk,
&downstream_in_output_indices,
));
}
}
}
Expand All @@ -293,7 +313,7 @@ where
// Forward messages directly to the downstream.
#[for_await]
for msg in upstream {
if let Some(msg) = Self::mapping_message(msg?, &upstream_indices) {
if let Some(msg) = Self::mapping_message(msg?, &downstream_in_upstream_indices) {
if let Some(barrier) = msg.as_barrier() {
self.progress.finish(barrier.epoch.curr);
}
Expand Down Expand Up @@ -360,7 +380,7 @@ where
fn mark_chunk(
chunk: StreamChunk,
current_pos: &OwnedRow,
table_pk_indices: PkIndicesRef<'_>,
pk_in_output_indices: PkIndicesRef<'_>,
pk_order: &[OrderType],
) -> StreamChunk {
let chunk = chunk.compact();
Expand All @@ -369,7 +389,7 @@ where
// Use project to avoid allocation.
for v in data.rows().map(|row| {
match row
.project(table_pk_indices)
.project(pk_in_output_indices)
.iter()
.zip_eq_fast(pk_order.iter())
.cmp_by(current_pos.iter(), |(x, order), y| {
Expand Down
8 changes: 6 additions & 2 deletions src/stream/src/from_proto/chain.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::catalog::{ColumnDesc, TableId, TableOption};
use risingwave_common::catalog::{ColumnDesc, ColumnId, TableId, TableOption};
use risingwave_common::util::sort_util::OrderType;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::stream_plan::{ChainNode, ChainType};
Expand Down Expand Up @@ -98,7 +98,11 @@ impl ExecutorBuilder for ChainExecutorBuilder {
.iter()
.map(ColumnDesc::from)
.collect_vec();
let column_ids = column_descs.iter().map(|x| x.column_id).collect_vec();
let column_ids = node
.upstream_column_ids
.iter()
.map(ColumnId::from)
.collect_vec();

// Use indices based on full table instead of streaming executor output.
let pk_indices = table_desc
Expand Down
0