Description
A note for the community
- Please vote on this issue by adding a 👍 reaction to the original issue to help the community and maintainers prioritize this request
- If you are interested in working on this issue or have submitted a pull request, please leave a comment
Problem
Since upgrading to 0.31 (from 0.29, in the case of this instance), there has been a marked uptick in events dropped by my `splunk_hec_logs` sink, which is backed by a disk buffer. The error indicates the events are dropped due to an `InvalidProtobufPayload` error while reading from disk:
```
2023-08-01T13:18:46.116488Z ERROR sink{component_kind="sink" component_id=staging_splunk_hec component_type=splunk_hec_logs component_name=staging_splunk_hec}: vector_buffers::internal_events: Error encountered during buffer read. error=failed to decoded record: InvalidProtobufPayload error_code="decode_failed" error_type="reader_failed" stage="processing" internal_log_rate_limit=true
```
I can't find it at the moment, but I seem to remember another issue or discussion noting that the underlying protobuf library now enforces a 4 MB size limit and may truncate messages larger than that. Maybe that is related?
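For illustration only (this is not Vector's actual buffer code, and the framing format here is a made-up stand-in): a length-delimited record whose payload is cut short on disk fails to decode, which is the general class of failure a "decode_failed" / `InvalidProtobufPayload` read error reports.

```python
# Hypothetical sketch: length-prefixed records, where a truncated
# payload is detected at decode time and rejected.
import struct

def encode_record(payload: bytes) -> bytes:
    # 4-byte big-endian length prefix, then the payload itself.
    return struct.pack(">I", len(payload)) + payload

def decode_record(buf: bytes) -> bytes:
    # Read the declared length, then verify the payload is complete.
    (length,) = struct.unpack(">I", buf[:4])
    payload = buf[4:4 + length]
    if len(payload) != length:
        raise ValueError("invalid payload: truncated record")
    return payload

record = encode_record(b"x" * 10)
assert decode_record(record) == b"x" * 10  # intact record round-trips

try:
    decode_record(record[:-3])  # simulate a record cut off mid-write
except ValueError as e:
    print(e)
```

If a writer-side size limit truncated large events before they hit the buffer, the reader would see exactly this shape of failure: a record whose declared length no longer matches the bytes on disk.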
Configuration
```yaml
data_dir: /vector-data-dir
acknowledgements:
  enabled: true
api:
  enabled: true
  address: 127.0.0.1:8686
  playground: false
sources:
  kafka_in:
    type: kafka
    bootstrap_servers: kafka-kafka-bootstrap.kafka:9093
    group_id: '${KAFKA_CONSUMER_GROUP_ID}'
    topics:
      - ^[^_].+
    librdkafka_options:
      "topic.blacklist": "^strimzi.+"
    decoding:
      codec: json
    sasl:
      enabled: true
      mechanism: SCRAM-SHA-512
      username: '${KAFKA_CONSUMER_USERNAME}'
      password: '${KAFKA_CONSUMER_PASSWORD}'
transforms:
  msg_router:
    type: route
    inputs:
      - kafka_in
    route:
      staging: includes(array!(.destinations), "staging")
      # a few other routes
  staging_filter:
    type: filter
    inputs:
      - msg_router.staging
    condition: .vector_metadata.exclude != true
  staging_throttler:
    type: sample
    inputs:
      - staging_filter
    rate: 20 # 5%
  staging_metadata:
    type: remap
    inputs:
      - staging_throttler
    source: |-
      .host = .vector_metadata.node
      if exists(.vector_metadata.host) {
        .host = .vector_metadata.host
      }
      .splunk.metadata.index = .vector_metadata.index
      .splunk.metadata.source = .vector_metadata.source
      .splunk.metadata.sourcetype = .vector_metadata.sourcetype
sinks:
  staging_splunk_hec:
    type: splunk_hec_logs
    inputs:
      - staging_metadata
    endpoint: https://hec.splunk.staging:8088
    default_token: '${STAGING_HEC_TOKEN}'
    encoding:
      codec: text
    index: '{{ splunk.metadata.index }}'
    source: '{{ splunk.metadata.source }}'
    sourcetype: '{{ splunk.metadata.sourcetype }}'
    acknowledgements:
      query_interval: 30
      retry_limit: 60
    request:
      timeout_secs: 1200
      retry_max_duration_secs: 300
      concurrency: adaptive
    buffer:
      type: disk
      max_size: 5368709120 # 5Gi
```
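A quick arithmetic check of the two inline comments in the config above (nothing Vector-specific here, just the numbers): `rate: 20` on the sample transform keeps 1 in 20 events, i.e. 5%, and `max_size: 5368709120` is exactly 5 GiB.

```python
# Verify the inline comments in the configuration above.
rate = 20
assert 1 / rate == 0.05           # sampler passes 5% of events

max_size = 5368709120
assert max_size == 5 * 1024**3    # disk buffer is exactly 5 GiB
```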
Version
vector 0.31.0 (x86_64-unknown-linux-gnu 0f13b22 2023-07-06 13:52:34.591204470)
Debug Output
No response
Example Data
No response
Additional Context
No response
References
No response