fix(config): allow usage of metrics-only decoders in log sources by jorgehermo9 · Pull Request #21040 · vectordotdev/vector · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

fix(config): allow usage of metrics-only decoders in log sources #21040

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Log sources can now use metrics-only decoders such as the recently added `influxdb` decoder.

authors: jorgehermo9
32 changes: 22 additions & 10 deletions lib/vector-core/src/config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,20 +117,18 @@ pub struct SourceOutput {

impl SourceOutput {
/// Create a `SourceOutput` of the given data type that contains a single output `Definition`.
/// If the data type does not contain logs, the schema definition will be ignored.
/// Designed for use in log sources.
///
///
/// # Panics
///
/// Panics if `ty` does not contain [`DataType::Log`].
#[must_use]
pub fn new_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
assert!(ty.contains(DataType::Log));
pub fn new_maybe_logs(ty: DataType, schema_definition: schema::Definition) -> Self {
let schema_definition = ty
.contains(DataType::Log)
.then(|| Arc::new(schema_definition));

Self {
port: None,
ty,
schema_definition: Some(Arc::new(schema_definition)),
schema_definition,
}
}

Expand Down Expand Up @@ -573,7 +571,7 @@ mod test {
let definition = schema::Definition::empty_legacy_namespace()
.with_event_field(&owned_value_path!("zork"), Kind::bytes(), Some("zork"))
.with_event_field(&owned_value_path!("nork"), Kind::integer(), None);
let output = SourceOutput::new_logs(DataType::Log, definition);
let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

let valid_event = LogEvent::from(Value::from(btreemap! {
"zork" => "norknoog",
Expand Down Expand Up @@ -619,7 +617,7 @@ mod test {
)
.with_event_field(&owned_value_path!("nork"), Kind::integer(), None);

let output = SourceOutput::new_logs(DataType::Log, definition);
let output = SourceOutput::new_maybe_logs(DataType::Log, definition);

let mut valid_event = LogEvent::from(Value::from(btreemap! {
"nork" => 32
Expand Down Expand Up @@ -673,4 +671,18 @@ mod test {
new_definition.assert_valid_for_event(&valid_event);
new_definition.assert_valid_for_event(&invalid_event);
}

#[test]
fn test_new_log_source_ignores_definition_with_metric_data_type() {
Copy link
Contributor Author
@jorgehermo9 jorgehermo9 Aug 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe the naming of these two tests could be improved; I'm open to suggestions on how to name them and how to test this behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think these tests are good 👍

let definition = schema::Definition::any();
let output = SourceOutput::new_maybe_logs(DataType::Metric, definition);
assert_eq!(output.schema_definition(true), None);
}

/// A `SourceOutput` built via `new_maybe_logs` with `DataType::Log` must
/// report the schema definition it was constructed with.
#[test]
fn test_new_log_source_uses_definition_with_log_data_type() {
    let expected = schema::Definition::any();

    // Hand a clone to the constructor so the original stays available for the comparison.
    let source_output = SourceOutput::new_maybe_logs(DataType::Log, expected.clone());
    let reported = source_output.schema_definition(true);

    assert_eq!(reported, Some(expected));
}
}
8 changes: 4 additions & 4 deletions src/config/graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ mod test {
outputs: vec![match ty {
DataType::Metric => SourceOutput::new_metrics(),
DataType::Trace => SourceOutput::new_traces(),
_ => SourceOutput::new_logs(ty, Definition::any()),
_ => SourceOutput::new_maybe_logs(ty, Definition::any()),
}],
},
);
Expand Down Expand Up @@ -639,7 +639,7 @@ mod test {
graph.nodes.insert(
ComponentKey::from("foo.bar"),
Node::Source {
outputs: vec![SourceOutput::new_logs(
outputs: vec![SourceOutput::new_maybe_logs(
DataType::all_bits(),
Definition::any(),
)],
Expand All @@ -648,7 +648,7 @@ mod test {
graph.nodes.insert(
ComponentKey::from("foo.bar"),
Node::Source {
outputs: vec![SourceOutput::new_logs(
outputs: vec![SourceOutput::new_maybe_logs(
DataType::all_bits(),
Definition::any(),
)],
Expand Down Expand Up @@ -676,7 +676,7 @@ mod test {
graph.nodes.insert(
ComponentKey::from("baz.errors"),
Node::Source {
outputs: vec![SourceOutput::new_logs(
outputs: vec![SourceOutput::new_maybe_logs(
DataType::all_bits(),
Definition::any(),
)],
Expand Down
4 changes: 2 additions & 2 deletions src/config/unit_test/unit_test_components.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl SourceConfig for UnitTestSourceConfig {
}

fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
DataType::all_bits(),
schema::Definition::default_legacy_namespace(),
)]
Expand Down Expand Up @@ -103,7 +103,7 @@ impl SourceConfig for UnitTestStreamSourceConfig {
}

fn outputs(&self, _global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
DataType::all_bits(),
schema::Definition::default_legacy_namespace(),
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/amqp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ impl SourceConfig for AmqpSourceConfig {
None,
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_kinesis_firehose/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ impl SourceConfig for AwsKinesisFirehoseConfig {
None,
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_s3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ impl SourceConfig for AwsS3Config {
schema_definition = schema_definition.unknown_fields(Kind::bytes());
}

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/aws_sqs/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,7 @@ impl SourceConfig for AwsSqsConfig {
Some("timestamp"),
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
7 changes: 5 additions & 2 deletions src/sources/datadog_agent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -294,7 +294,7 @@ impl SourceConfig for DatadogAgentConfig {

if self.multiple_outputs {
if !self.disable_logs {
output.push(SourceOutput::new_logs(DataType::Log, definition).with_port(LOGS))
output.push(SourceOutput::new_maybe_logs(DataType::Log, definition).with_port(LOGS))
}
if !self.disable_metrics {
output.push(SourceOutput::new_metrics().with_port(METRICS))
Expand All @@ -303,7 +303,10 @@ impl SourceConfig for DatadogAgentConfig {
output.push(SourceOutput::new_traces().with_port(TRACES))
}
} else {
output.push(SourceOutput::new_logs(DataType::all_bits(), definition))
output.push(SourceOutput::new_maybe_logs(
DataType::all_bits(),
definition,
))
}
output
}
Expand Down
2 changes: 1 addition & 1 deletion src/sources/demo_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ impl SourceConfig for DemoLogsConfig {
Some("service"),
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
5 changes: 4 additions & 1 deletion src/sources/dnstap/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,10 @@ impl SourceConfig for DnstapConfig {
let schema_definition = self
.schema_definition(log_namespace)
.with_standard_vector_source_metadata();
vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
5 changes: 4 additions & 1 deletion src/sources/docker_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,10 @@ impl SourceConfig for DockerLogsConfig {
None,
);

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/exec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ impl SourceConfig for ExecConfig {
None,
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
5 changes: 4 additions & 1 deletion src/sources/file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,10 @@ impl SourceConfig for FileConfig {
None,
);

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/file_descriptors/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ fn outputs(
)
.with_standard_vector_source_metadata();

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
decoding.output_type(),
schema_definition,
)]
Expand Down
5 changes: 4 additions & 1 deletion src/sources/fluent/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,10 @@ impl SourceConfig for FluentConfig {
let log_namespace = global_log_namespace.merge(self.log_namespace);
let schema_definition = self.schema_definition(log_namespace);

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn resources(&self) -> Vec<Resource> {
Expand Down
5 changes: 4 additions & 1 deletion src/sources/gcp_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,7 +353,10 @@ impl SourceConfig for PubsubConfig {
None,
);

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/heroku_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,7 +196,7 @@ impl SourceConfig for LogplexConfig {
// There is a global and per-source `log_namespace` config.
// The source config overrides the global setting and is merged here.
let schema_def = self.schema_definition(global_log_namespace.merge(self.log_namespace));
vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_def,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/http_client/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ impl SourceConfig for HttpClientConfig {
.schema_definition(log_namespace)
.with_standard_vector_source_metadata();

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/http_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,7 +382,7 @@ impl SourceConfig for SimpleHttpConfig {

let schema_definition = self.schema_definition(log_namespace);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding
.as_ref()
.map(|d| d.output_type())
Expand Down
5 changes: 4 additions & 1 deletion src/sources/internal_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ impl SourceConfig for InternalLogsConfig {
let schema_definition =
self.schema_definition(global_log_namespace.merge(self.log_namespace));

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
5 changes: 4 additions & 1 deletion src/sources/journald.rs
Original file line number Diff line number Diff line change
Expand Up @@ -397,7 +397,10 @@ impl SourceConfig for JournaldConfig {
let schema_definition =
self.schema_definition(global_log_namespace.merge(self.log_namespace));

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,7 @@ impl SourceConfig for KafkaSourceConfig {
None,
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
5 changes: 4 additions & 1 deletion src/sources/kubernetes_logs/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,10 @@ impl SourceConfig for Config {
)
.with_standard_vector_source_metadata();

vec![SourceOutput::new_logs(DataType::Log, schema_definition)]
vec![SourceOutput::new_maybe_logs(
DataType::Log,
schema_definition,
)]
}

fn can_acknowledge(&self) -> bool {
Expand Down
2 changes: 1 addition & 1 deletion src/sources/logstash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ impl SourceConfig for LogstashConfig {
fn outputs(&self, global_log_namespace: LogNamespace) -> Vec<SourceOutput> {
// There is a global and per-source `log_namespace` config.
// The source config overrides the global setting and is merged here.
vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
DataType::Log,
self.schema_definition(global_log_namespace.merge(self.log_namespace)),
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ impl SourceConfig for NatsSourceConfig {
None,
);

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/opentelemetry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ impl SourceConfig for OpentelemetryConfig {
};

vec![
SourceOutput::new_logs(DataType::Log, schema_definition).with_port(LOGS),
SourceOutput::new_maybe_logs(DataType::Log, schema_definition).with_port(LOGS),
SourceOutput::new_traces().with_port(TRACES),
]
}
Expand Down
2 changes: 1 addition & 1 deletion src/sources/pulsar.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl SourceConfig for PulsarSourceConfig {
Kind::bytes(),
Some("producer_name"),
);
vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/redis/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl SourceConfig for RedisSourceConfig {
)
.with_standard_vector_source_metadata();

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding.output_type(),
schema_definition,
)]
Expand Down
2 changes: 1 addition & 1 deletion src/sources/socket/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,7 @@ impl SourceConfig for SocketConfig {
}
};

vec![SourceOutput::new_logs(
vec![SourceOutput::new_maybe_logs(
self.decoding().output_type(),
schema_definition,
)]
Expand Down
Loading
Loading
0