feat(dal): Respect subscriptions when calculating action dependencies by jkeiser · Pull Request #6102 · systeminit/si · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat(dal): Respect subscriptions when calculating action dependencies #6102

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions lib/dal-test/src/helpers/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,15 @@

use color_eyre::eyre::eyre;
use dal::{
AttributeValue,
Component,
ComponentId,
DalContext,
FuncId,
attribute::{
path::AttributePath,
value::subscription::ValueSubscription,
},
diagram::{
geometry::Geometry,
view::View,
Expand Down Expand Up @@ -38,6 +43,31 @@ pub async fn create(
)
}

/// Subscribe from one component's attribute to another component's attribute
pub async fn subscribe(
ctx: &mut DalContext,
subscriber: ComponentId,
path: impl Into<String>,
subscribed_to: ComponentId,
subscribed_to_path: &str,
) -> Result<()> {
let subscriber_root_id = Component::root_attribute_value_id(ctx, subscriber).await?;
let subscriber_av_id = AttributePath::from_json_pointer(path)
.resolve(ctx, subscriber_root_id)
.await?
.expect("attribute to exist");
AttributeValue::subscribe(
ctx,
subscriber_av_id,
ValueSubscription {
attribute_value_id: Component::root_attribute_value_id(ctx, subscribed_to).await?,
path: AttributePath::from_json_pointer(subscribed_to_path),
},
)
.await?;
Ok(())
}

/// Execute the management function and apply the result to the component
pub async fn execute_management_func(
ctx: &DalContext,
Expand Down
6 changes: 6 additions & 0 deletions lib/dal/src/action.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,12 @@ pub mod prototype;
pub enum ActionError {
#[error("action prototype error: {0}")]
ActionPrototype(#[from] ActionPrototypeError),
#[error("attribute prototype error: {0}")]
AttributePrototype(#[from] crate::attribute::prototype::AttributePrototypeError),
#[error("attribute prototype argument error: {0}")]
AttributePrototypeArgument(
#[from] crate::attribute::prototype::argument::AttributePrototypeArgumentError,
),
#[error("AttributeValue error: {0}")]
AttributeValue(#[from] AttributeValueError),
#[error("Change Set error: {0}")]
Expand Down
56 changes: 36 additions & 20 deletions lib/dal/src/action/dependency_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,16 @@ use super::{
},
};
use crate::{
AttributePrototype,
AttributeValue,
Component,
ComponentId,
DalContext,
action::{
Action,
ActionId,
},
attribute::prototype::argument::AttributePrototypeArgument,
dependency_graph::DependencyGraph,
};

Expand Down Expand Up @@ -115,19 +118,16 @@ impl ActionDependencyGraph {
.or_insert_with(|| component_dependencies.add_node(component_id))
.to_owned();
for incoming_connection in component.incoming_connections(ctx).await? {
component_dependencies_index_by_id
// The edges of this graph go `output_socket_component (source) ->
// input_socket_component (target)`, matching the flow of the data between
// components.
let source_component_index = component_dependencies_index_by_id
.entry(incoming_connection.from_component_id)
.or_insert_with(|| {
component_dependencies.add_node(incoming_connection.from_component_id)
});
if let Some(&source_component_index) =
component_dependencies_index_by_id.get(&incoming_connection.from_component_id)
{
// The edges of this graph go `output_socket_component (source) ->
// input_socket_component (target)`, matching the flow of the data between
// components.
component_dependencies.update_edge(source_component_index, component_index, ());
}
})
.to_owned();
component_dependencies.update_edge(source_component_index, component_index, ());
if seen_list.insert(incoming_connection.from_component_id) {
components_to_process.push_back(incoming_connection.from_component_id);
}
Expand All @@ -136,23 +136,39 @@ impl ActionDependencyGraph {
.inferred_incoming_connections_for_component(ctx, component_id)
.await?
{
component_dependencies_index_by_id
// The edges of this graph go `output_socket_component (source) ->
// input_socket_component (target)`, matching the flow of the data between
// components.
let source_component_index = component_dependencies_index_by_id
.entry(inferred_connection.source_component_id)
.or_insert_with(|| {
component_dependencies.add_node(inferred_connection.source_component_id)
});
if let Some(&source_component_index) =
component_dependencies_index_by_id.get(&inferred_connection.source_component_id)
{
// The edges of this graph go `output_socket_component (source) ->
// input_socket_component (target)`, matching the flow of the data between
// components.
component_dependencies.update_edge(source_component_index, component_index, ());
}
})
.to_owned();
component_dependencies.update_edge(source_component_index, component_index, ());
if seen_list.insert(inferred_connection.source_component_id) {
components_to_process.push_back(inferred_connection.source_component_id);
}
}
for (_, apa_id) in Component::subscribers(ctx, component_id).await? {
let prototype_id = AttributePrototypeArgument::prototype_id(ctx, apa_id).await?;
if let Some(av_id) =
AttributePrototype::attribute_value_id(ctx, prototype_id).await?
{
let subscriber_id = AttributeValue::component_id(ctx, av_id).await?;
// The edges of this graph go `subscribed_to_component (source) ->
// subscriber_component (target)`, matching the flow of the data between
// components.
let subscriber_index = component_dependencies_index_by_id
.entry(subscriber_id)
.or_insert_with(|| component_dependencies.add_node(subscriber_id))
.to_owned();
component_dependencies.update_edge(component_index, subscriber_index, ());
if seen_list.insert(subscriber_id) {
components_to_process.push_back(subscriber_id);
}
}
}
}

// For each Component's Actions, mark them as depending on the Actions for the first dependency
Expand Down
35 changes: 27 additions & 8 deletions lib/dal/src/attribute/value.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,17 @@ use tokio::sync::{
TryLockError,
};

use super::prototype::argument::{
AttributePrototypeArgument,
AttributePrototypeArgumentError,
AttributePrototypeArgumentId,
static_value::StaticArgumentValue,
value_source::{
ValueSource,
ValueSourceError,
use super::{
path::AttributePath,
prototype::argument::{
AttributePrototypeArgument,
AttributePrototypeArgumentError,
AttributePrototypeArgumentId,
static_value::StaticArgumentValue,
value_source::{
ValueSource,
ValueSourceError,
},
},
};
use crate::{
Expand Down Expand Up @@ -2277,6 +2280,22 @@ impl AttributeValue {
Ok(())
}

pub async fn subscribers(
ctx: &DalContext,
attribute_value_id: AttributeValueId,
) -> AttributeValueResult<impl Iterator<Item = (AttributePath, AttributePrototypeArgumentId)>>
{
Ok(ctx
.workspace_snapshot()?
.edges_directed(attribute_value_id, Direction::Incoming)
.await?
.into_iter()
.filter_map(|(edge, source, _)| match edge.kind {
EdgeWeightKind::ValueSubscription(path) => Some((path, source.into())),
_ => None,
}))
}

pub async fn get_by_id(
ctx: &DalContext,
attribute_value_id: AttributeValueId,
Expand Down
11 changes: 11 additions & 0 deletions lib/dal/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ use crate::{
ActorViewError,
},
attribute::{
path::AttributePath,
prototype::{
AttributePrototypeError,
AttributePrototypeSource,
Expand Down Expand Up @@ -1321,6 +1322,16 @@ impl Component {
Ok(input_socket_ids)
}

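/// Gets the list of subscriptions pointing at this component's root AV, returning, for each
/// one, the path being subscribed to and the ID of the subscriber's attribute prototype
/// argument (from which the subscriber AV can be looked up).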
pub async fn subscribers(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A doc comment here would be great, explaining what a Subscriber is

ctx: &DalContext,
component_id: ComponentId,
) -> ComponentResult<impl Iterator<Item = (AttributePath, AttributePrototypeArgumentId)>> {
let root_av_id = Self::root_attribute_value_id(ctx, component_id).await?;
Ok(AttributeValue::subscribers(ctx, root_av_id).await?)
}

pub async fn get_children_for_id(
ctx: &DalContext,
component_id: ComponentId,
Expand Down
Loading
0