8000 feat(dal): Allow arrays to subscribe to multiple values by jkeiser · Pull Request #6111 · systeminit/si · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat(dal): Allow arrays to subscribe to multiple values #6111

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and 8000 contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
May 15, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 32 additions & 14 deletions app/web/src/store/components.store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -295,10 +295,13 @@ type EventBusEvents = {
// "/domain/DomainConfig/blah.com/TTL": 3600
// }
//
// - UNSET a value using `{ "$source": "value" }`. The value will revert to using its default value.
// - UNSET a value using `{ "$source": null }`. The value will revert to using its default value.
// (NOTE: `{ "$source": {} }` unsets the value as well, allowing JS callers to construct the
// API call using `{ "$source": { value: myValueVariable } }``. If myValue is undefined, it
// will unset the value, but if it is null, it will set the value to null.
//
// {
// "/domain/Timeout": { "$source": "value" },
// "/domain/Timeout": { "$source": null },
// "/domain/DomainConfig/blah.com/TTL": { "$source": "value" }
// }
//
Expand All @@ -309,9 +312,9 @@ type EventBusEvents = {
// reverse order.*
//
// {
// "/domain/Tags/Environment": { "$source": "value" },
// "/domain/IpAddresses/2": { "$source": "value" },
// "/domain/IpAddresses/1": { "$source": "value" }
// "/domain/Tags/Environment": { "$source": null },
// "/domain/IpAddresses/2": { "$source": null },
// "/domain/IpAddresses/1": { "$source": null }
// }
//
// - SUBSCRIBE to another attribute's value: this will cause the value to always equal another
Expand All @@ -320,24 +323,36 @@ type EventBusEvents = {
//
// {
// "/domain/SubnetId": {
// "$source": "subscription",
// "component": "ComponentNameOrId",
// "path": "/resource/SubnetId"
// "$source": { "component": "ComponentNameOrId", "path": "/resource/SubnetId" }
// }
// }
//
// - ESCAPE HATCH for setting a value: setting an attribute to `{ "$source": "value", "value": <value> }`
// You may also APPEND a subscription by adding `keepExistingSubscriptions: true` to the
// subscription:
//
// {
// "/domain/SubnetId": {
// "$source": { "component": "ComponentNameOrId", "path": "/resource/SubnetId", keepExistingSubscriptions: true }
// }
// }
//
// If you do this, the subscription will be added to the list if it's not already there, and
// any other subscriptions will also be kept.
//
// - ESCAPE HATCH for setting a value: setting an attribute to `{ "$source": { "value": <value> } }`
// has the same behavior as all the above cases. The reason this exists is, if you happen to
// have an object whose keys are "$source" and "value", the existing interface would treat that
// have an object with a "$source" key, the existing interface would treat that as an error.
// This allows you to set that value anyway.
//
// This is a safer way to "escape" values if you are writing code that sets values generically
// without knowing their types and can avoid misinterpreted instructions or possibly even
// avoid injection attacks.
//
// {
// "/domain/Tags": {
// "$source": "value",
// "value": { "Environment": "Prod", "$source": "ThisTagIsActuallyNamed_$source" }
// "$source": {
// "value": { "$source": "ThisTagIsActuallyNamed_$source" }
// }
// }
// }
//
Expand All @@ -346,10 +361,13 @@ export type UpdateComponentAttributesArgs = Record<
AttributeSource
>;

// Things you can set an attribute to
// Set attribute to a subscription (another component's value feeds it)
type AttributeSourceSetSubscription = {
$source: { component: ComponentId | ComponentName; path: AttributePath };
$source: {
component: ComponentId | ComponentName;
path: AttributePath;
keepExistingSubscriptions?: boolean;
};
};
const isAttributeSourceSetSubscription = (
s: AttributeSource,
Expand Down
2 changes: 1 addition & 1 deletion lib/dal-materialized-views/src/component.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub async fn assemble(ctx: DalContext, component_id: ComponentId) -> crate::Resu
InputSocket::component_attribute_value_id(ctx, input_socket_id, component_id).await?;
let attribute_prototype_id = AttributeValue::prototype_id(ctx, attribute_value_id).await?;
let attribute_prototype_argument_ids =
AttributePrototype::list_arguments_for_id(ctx, attribute_prototype_id).await?;
AttributePrototype::list_arguments(ctx, attribute_prototype_id).await?;
for attribute_prototype_argument_id in attribute_prototype_argument_ids {
let attribute_prototype_argument =
AttributePrototypeArgument::get_by_id(ctx, attribute_prototype_argument_id).await?;
Expand Down
4 changes: 2 additions & 2 deletions lib/dal-materialized-views/src/incoming_connections.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async fn socket_to_socket(ctx: &DalContext, component_id: ComponentId) -> Result
let attribute_prototype_id =
AttributeValue::prototype_id(ctx, input_socket_attribute_value_id).await?;
let attribute_prototype_argument_ids =
AttributePrototype::list_arguments_for_id(ctx, attribute_prototype_id).await?;
AttributePrototype::list_arguments(ctx, attribute_prototype_id).await?;

// Don't bother gathering information to cache if there are no prototype arguments.
if attribute_prototype_argument_ids.is_empty() {
Expand Down Expand Up @@ -131,7 +131,7 @@ async fn prop_to_prop(ctx: &DalContext, component_id: ComponentId) -> Result<Vec
// connections are found.
let mut in_progress = Vec::new();
for attribute_prototype_argument_id in
AttributePrototype::list_arguments_for_id(ctx, attribute_prototype_id).await?
AttributePrototype::list_arguments(ctx, attribute_prototype_id).await?
{
if let Some(ValueSource::ValueSubscription(subscription)) =
AttributePrototypeArgument::value_source_opt(ctx, attribute_prototype_argument_id)
Expand Down
21 changes: 4 additions & 17 deletions lib/dal-test/src/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,10 +24,6 @@ use dal::{
SchemaVariantId,
User,
UserPk,
attribute::{
path::AttributePath,
value::subscription::ValueSubscription,
},
audit_logging,
component::socket::{
ComponentInputSocket,
Expand All @@ -45,9 +41,12 @@ use names::{
use si_data_nats::async_nats::jetstream::stream::Stream;
use tokio::time::Instant;

mod change_set;
mod property_editor_test_view;

/// Test helpers for attribute values and prototypes
pub mod attribute;
/// Test helpers for change sets
pub mod change_set;
/// Test helpers for components
pub mod component;
/// Test helpers for schemas
Expand Down Expand Up @@ -494,15 +493,3 @@ pub async fn list_audit_logs_until_expected_number_of_rows(
"hit timeout before audit logs query returns expected number of rows (expected: {expected_number_of_rows}, actual: {actual_number_of_rows})"
))
}

/// Make a [`ValueSubscription`] for a given [`Component`] and path.
pub async fn make_subscription(
ctx: &DalContext,
component_id: dal::ComponentId,
json_pointer: impl Into<String>,
) -> Result<ValueSubscription> {
Ok(ValueSubscription {
attribute_value_id: Component::root_attribute_value_id(ctx, component_id).await?,
path: AttributePath::JsonPointer(json_pointer.into()),
})
}
2 changes: 2 additions & 0 deletions lib/dal-test/src/helpers/attribute.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
/// Helpers for attribute values
pub mod value;
190 changes: 190 additions & 0 deletions lib/dal-test/src/helpers/attribute/value.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
use color_eyre::eyre::eyre;
use dal::{
AttributeValue,
Component,
DalContext,
attribute::{
path::AttributePath,
value::subscription::ValueSubscription,
},
};
use si_id::{
AttributeValueId,
ComponentId,
};

use crate::{
Result,
helpers::component::ComponentKey,
};

///
/// Things that you can pass as attribute values (id, or (component, path))
///
#[allow(async_fn_in_trait)]
pub trait AttributeValueKey {
///
/// Get the AttributeValueId for this key
///
async fn lookup_attribute_value(self, ctx: &DalContext) -> Result<AttributeValueId>;
///
/// Get the AttributeValueId for this key, or None if it doesn't exist
///
async fn resolve_attribute_value(self, ctx: &DalContext) -> Result<Option<AttributeValueId>>;
///
/// Get the AttributeValueId for this key, *or create it* if it doesn't exist
///
async fn vivify_attribute_value(self, ctx: &DalContext) -> Result<AttributeValueId>;
///
/// Turn this into a subscription (resolves component but not av)
///
async fn to_subscription(self, ctx: &DalContext) -> Result<ValueSubscription>;
}
impl AttributeValueKey for AttributeValueId {
async fn lookup_attribute_value(self, _: &DalContext) -> Result<AttributeValueId> {
Ok(self)
}
async fn resolve_attribute_value(self, _: &DalContext) -> Result<Option<AttributeValueId>> {
Ok(Some(self))
}
async fn vivify_attribute_value(self, _: &DalContext) -> Result<AttributeValueId> {
Ok(self)
}
async fn to_subscription(self, ctx: &DalContext) -> Result<ValueSubscription> {
let (root, path) = AttributeValue::path_from_root(ctx, self).await?;
let path: &str = &path;
(root, path).to_subscription(ctx).await
}
}
impl AttributeValueKey for ValueSubscription {
async fn lookup_attribute_value(self, ctx: &DalContext) -> Result<AttributeValueId> {
self.resolve(ctx)
.await?
.ok_or(eyre!("Attribute value not found"))
}
async fn resolve_attribute_value(self, ctx: &DalContext) -> Result<Option<AttributeValueId>> {
Ok(self.resolve(ctx).await?)
}
async fn vivify_attribute_value(self, ctx: &DalContext) -> Result<AttributeValueId> {
Ok(self.path.vivify(ctx, self.attribute_value_id).await?)
}
async fn to_subscription(self, _: &DalContext) -> Result<ValueSubscription> {
Ok(self)
}
}
impl AttributeValueKey for (AttributeValueId, &str) {
async fn lookup_attribute_value(self, ctx: &DalContext) -> Result<AttributeValueId> {
self.to_subscription(ctx)
.await?
.lookup_attribute_value(ctx)
.await
}
async fn resolve_attribute_value(self, ctx: &DalContext) -> Result<Option<AttributeValueId>> {
self.to_subscription(ctx)
.await?
.resolve_attribute_value(ctx)
.await
}
async fn vivify_attribute_value(self, ctx: &DalContext) -> Result<AttributeValueId> {
self.to_subscription(ctx)
.await?
.vivify_attribute_value(ctx)
.await
}
async fn to_subscription(self, _: &DalContext) -> Result<ValueSubscription> {
Ok(ValueSubscription {
attribute_value_id: self.0,
path: AttributePath::from_json_pointer(self.1),
})
}
}
impl AttributeValueKey for (ComponentId, &str) {
async fn lookup_attribute_value(self, ctx: &DalContext) -> Result<AttributeValueId> {
self.to_subscription(ctx)
.await?
.lookup_attribute_value(ctx)
.await
}
async fn resolve_attribute_value(self, ctx: &DalContext) -> Result<Option<AttributeValueId>> {
self.to_subscription(ctx)
.await?
.resolve_attribute_value(ctx)
.await
}
async fn vivify_attribute_value(self, ctx: &DalContext) -> Result<AttributeValueId> {
self.to_subscription(ctx)
.await?
.vivify_attribute_value(ctx)
.await
}
async fn to_subscription(self, ctx: &DalContext) -> Result<ValueSubscription> {
let root_id = Component::root_attribute_value_id(ctx, self.0).await?;
(root_id, self.1).to_subscription(ctx).await
}
}
impl AttributeValueKey for (&str, &str) {
async fn lookup_attribute_value(self, ctx: &DalContext) -> Result<AttributeValueId> {
self.to_subscription(ctx)
.await?
.lookup_attribute_value(ctx)
.await
}
async fn resolve_attribute_value(self, ctx: &DalContext) -> Result<Option<AttributeValueId>> {
self.to_subscription(ctx)
.await?
.resolve_attribute_value(ctx)
.await
}
async fn vivify_attribute_value(self, ctx: &DalContext) -> Result<AttributeValueId> {
self.to_subscription(ctx)
.await?
.vivify_attribute_value(ctx)
.await
}
async fn to_subscription(self, ctx: &DalContext) -> Result<ValueSubscription> {
let component_id = self.0.lookup_component(ctx).await?;
(component_id, self.1).to_subscription(ctx).await
}
}

/// Set the subscriptions on a value
pub async fn subscribe<S: AttributeValueKey>(
ctx: &DalContext,
subscriber: impl AttributeValueKey,
subscriptions: impl IntoIterator<Item = S>,
) -> Result<()> {
let subscriber = subscriber.vivify_attribute_value(ctx).await?;
let mut converted_subscriptions = vec![];
for subscription in subscriptions {
converted_subscriptions.push(subscription.to_subscription(ctx).await?);
}
AttributeValue::set_to_subscriptions(ctx, subscriber, converted_subscriptions).await?;
Ok(())
}

/// Get the value
pub async fn get(ctx: &DalContext, av: impl AttributeValueKey) -> Result<serde_json::Value> {
let av_id = av.lookup_attribute_value(ctx).await?;
AttributeValue::view_by_id(ctx, av_id)
.await?
.ok_or(eyre!("Attribute value not found"))
}

/// Check whether the value exists and is set
pub async fn has_value(ctx: &DalContext, av: impl AttributeValueKey) -> Result<bool> {
match av.resolve_attribute_value(ctx).await? {
Some(av_id) => Ok(AttributeValue::view_by_id(ctx, av_id).await?.is_some()),
None => Ok(false),
}
}

/// Set a value (creates it if it doesn't exist)
pub async fn set(
ctx: &DalContext,
av: impl AttributeValueKey,
value: impl Into<serde_json::Value>,
) -> Result<()> {
let av_id = av.vivify_attribute_value(ctx).await?;
AttributeValue::update(ctx, av_id, Some(value.into())).await?;
Ok(())
}
6 changes: 6 additions & 0 deletions lib/dal-test/src/helpers/change_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ use dal::{

use crate::helpers::generate_fake_name;

/// First, this function performs a blocking commit which will return an error if
/// there are conflicts. Then, it updates the snapshot to the current visibility.
pub async fn commit(ctx: &mut DalContext) -> Result<()> {
ChangeSetTestHelpers::commit_and_update_snapshot_to_visibility(ctx).await
}

/// This unit struct providers helper functions for working with [`ChangeSets`](ChangeSet). It is
/// designed to centralize logic for test authors wishing to commit changes, fork, apply, abandon,
/// etc.
Expand Down
Loading
0