8000 feat: support send multivalue metrics to sls by Abingcbc · Pull Request #2233 · alibaba/loongcollector · GitHub
[go: up one dir, main page]
More Web Proxy on the site http://driver.im/
Skip to content

feat: support send multivalue metrics to sls #2233

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Jun 9, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
438 changes: 283 additions & 155 deletions core/collection_pipeline/serializer/SLSSerializer.cpp

Large diffs are not rendered by default.

40 changes: 40 additions & 0 deletions core/collection_pipeline/serializer/SLSSerializer.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,55 @@
#include <vector>

#include "collection_pipeline/serializer/Serializer.h"
#include "protobuf/sls/LogGroupSerializer.h"

namespace logtail {

struct MetricEventContentCacheItem {
std::vector<std::string> mMetricEventContentCache;
size_t mLabelSize = 0;
};

class SLSEventGroupSerializer : public Serializer<BatchedEvents> {
public:
SLSEventGroupSerializer(Flusher* f) : Serializer<BatchedEvents>(f) {}

private:
bool Serialize(BatchedEvents&& p, std::string& res, std::string& errorMsg) override;

void CalculateLogEventSize(const BatchedEvents& group,
size_t& logGroupSZ,
std::vector<size_t>& logSZ,
bool enableNs) const;
void CalculateMetricEventSize(const BatchedEvents& group,
size_t& logGroupSZ,
std::vector<MetricEventContentCacheItem>& metricEventContentCache,
std::vector<size_t>& logSZ) const;
void CalculateSpanEventSize(const BatchedEvents& group,
size_t& logGroupSZ,
std::vector<std::array<std::string, 6>>& spanEventContentCache,
std::vector<size_t>& logSZ) const;
void CalculateRawEventSize(const BatchedEvents& group,
size_t& logGroupSZ,
std::vector<size_t>& logSZ,
bool enableNs) const;

void SerializeLogEvent(LogGroupSerializer& serializer,
const BatchedEvents& group,
std::vector<size_t>& logSZ,
bool enableNs) const;
void SerializeMetricEvent(LogGroupSerializer& serializer,
BatchedEvents& group,
std::vector<MetricEventContentCacheItem>& metricEventContentCache,
std::vector<size_t>& logSZ) const;
void SerializeSpanEvent(LogGroupSerializer& serializer,
const BatchedEvents& group,
std::vector<std::array<std::string, 6>>& spanEventContentCache,
std::vector<size_t>& logSZ) const;
void SerializeRawEvent(LogGroupSerializer& serializer,
const BatchedEvents& group,
std::vector<size_t>& logSZ,
bool enableNs) const;
};

struct CompressedLogGroup {
Expand Down
2 changes: 1 addition & 1 deletion core/models/MetricValue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ std::map<StringView, UntypedMultiDoubleValue>::const_iterator UntypedMultiDouble
return mValues.end();
}

size_t UntypedMultiDoubleValues::ValusSize() const {
size_t UntypedMultiDoubleValues::ValuesSize() const {
return mValues.size();
}

Expand Down
2 changes: 1 addition & 1 deletion core/models/MetricValue.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ struct UntypedMultiDoubleValues {

std::map<StringView, UntypedMultiDoubleValue>::const_iterator ValuesBegin() const;
std::map<StringView, UntypedMultiDoubleValue>::const_iterator ValuesEnd() const;
size_t ValusSize() const;
size_t ValuesSize() const;

size_t DataSize() const;
void ResetPipelineEvent(PipelineEvent* ptr) { mMetricEventPtr = ptr; }
Expand Down
1 change: 1 addition & 0 deletions core/plugin/flusher/sls/DiskBufferWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -914,6 +914,7 @@ SLSResponse DiskBufferWriter::SendBufferFileData(const sls_logs::LogtailBufferMe
= bufferMeta.has_telemetrytype() ? bufferMeta.telemetrytype() : sls_logs::SLS_TELEMETRY_TYPE_LOGS;
switch (telemetryType) {
case sls_logs::SLS_TELEMETRY_TYPE_LOGS:
case sls_logs::SLS_TELEMETRY_TYPE_METRICS_MULTIVALUE:
return PostLogStoreLogs(accessKeyId,
accessKeySecret,
type,
Expand Down
58 changes: 53 additions & 5 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ shared_ptr<ConcurrencyLimiter> FlusherSLS::GetLogstoreConcurrencyLimiter(const s
auto limiter = iter->second.lock();
if (!limiter) {
limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#logstore#" + key,
AppConfig::GetInstance()->GetSendRequestConcurrency());
AppConfig::GetInstance()->GetSendRequestConcurrency());
iter->second = limiter;
}
return limiter;
Expand All @@ -153,7 +153,7 @@ shared_ptr<ConcurrencyLimiter> FlusherSLS::GetProjectConcurrencyLimiter(const st
auto limiter = iter->second.lock();
if (!limiter) {
limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#project#" + project,
AppConfig::GetInstance()->GetSendRequestConcurrency());
AppConfig::GetInstance()->GetSendRequestConcurrency());
iter->second = limiter;
}
return limiter;
Expand Down Expand Up @@ -313,6 +313,10 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
// TelemetryType set to metrics
mTelemetryType = BOOL_FLAG(enable_metricstore_channel) ? sls_logs::SLS_TELEMETRY_TYPE_METRICS
: sls_logs::SLS_TELEMETRY_TYPE_LOGS;
} else if (telemetryType == "metrics_multivalue") {
mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_METRICS_MULTIVALUE;
} else if (telemetryType == "metrics_host") {
mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_METRICS_HOST;
} else if (telemetryType == "arms_agentinfo") {
mSubpath = APM_AGENTINFOS_URL;
mTelemetryType = sls_logs::SLS_TELEMETRY_TYPE_APM_AGENTINFOS;
Expand All @@ -336,7 +340,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
}

// Logstore
if (mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS) {
if (IsRawSLSTelemetryType()) {
// log and metric
if (!GetMandatoryStringParam(config, "Logstore", mLogstore, errorMsg)) {
PARAM_ERROR_RETURN(mContext->GetLogger(),
Expand Down Expand Up @@ -504,8 +508,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
if (!mBatcher.Init(itr ? *itr : Json::Value(),
this,
strategy,
!mContext->IsExactlyOnceEnabled() && mShardHashKeys.empty()
&& mTelemetryType != sls_logs::SLS_TELEMETRY_TYPE_METRICS)) {
!mContext->IsExactlyOnceEnabled() && mShardHashKeys.empty() && IsMetricsTelemetryType())) {
// when either exactly once is enabled or ShardHashKeys is not empty or telemetry type is metrics, we don't
// enable group batch
return false;
Expand Down Expand Up @@ -670,8 +673,12 @@ bool FlusherSLS::BuildRequest(SenderQueueItem* item, unique_ptr<HttpSinkRequest>

switch (mTelemetryType) {
case sls_logs::SLS_TELEMETRY_TYPE_LOGS:
case sls_logs::SLS_TELEMETRY_TYPE_METRICS_MULTIVALUE:
req = CreatePostLogStoreLogsRequest(accessKeyId, accessKeySecret, type, data);
break;
case sls_logs::SLS_TELEMETRY_TYPE_METRICS_HOST:
req = CreatePostHostMetricsRequest(accessKeyId, accessKeySecret, type, data);
break;
case sls_logs::SLS_TELEMETRY_TYPE_METRICS:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SLS_TELEMETRY_TYPE_APM_AGENTINFOS jq

req = CreatePostMetricStoreLogsRequest(accessKeyId, accessKeySecret, type, data);
break;
Expand Down Expand Up @@ -1214,6 +1221,36 @@ unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostLogStoreLogsRequest(const stri
CurlSocket(INT32_FLAG(sls_request_dscp)));
}

unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostHostMetricsRequest(const string& accessKeyId,
const string& accessKeySecret,
SLSClientManager::AuthType type,
SLSSenderQueueItem* item) const {
string path, query;
map<string, string> header;
PreparePostHostMetricsRequest(accessKeyId,
accessKeySecret,
type,
CompressTypeToString(mCompressor->GetCompressType()),
item->mType,
item->mData,
item->mRawSize,
path,
header);
bool httpsFlag = SLSClientManager::GetInstance()->UsingHttps(mRegion);
return make_unique<HttpSinkRequest>(HTTP_POST,
httpsFlag,
item->mCurrentHost,
httpsFlag ? 443 : 80,
path,
query,
header,
item->mData,
item,
INT32_FLAG(default_http_request_timeout_sec),
1,
CurlSocket(INT32_FLAG(sls_request_dscp)));
}

unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostMetricStoreLogsRequest(const string& accessKeyId,
const string& accessKeySecret,
SLSClientManager::AuthType type,
Expand Down Expand Up @@ -1283,6 +1320,17 @@ unique_ptr<HttpSinkRequest> FlusherSLS::CreatePostAPMBackendRequest(const string
CurlSocket(INT32_FLAG(sls_request_dscp)));
}

bool FlusherSLS::IsRawSLSTelemetryType() const {
return mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_LOGS || mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS
|| mTelemetryType == sls_logs::SLS_TELEMETRY_TYPE_METRICS_MULTIVALUE;
}

bool FlusherSLS::IsMetricsTelemetryType() const {
return mTelemetryType != sls_logs::SLS_TELEMETRY_TYPE_METRICS
&& mTelemetryType != sls_logs::SLS_TELEMETRY_TYPE_METRICS_MULTIVALUE
&& mTelemetryType != sls_logs::SLS_TELEMETRY_TYPE_METRICS_HOST;
}

sls_logs::SlsCompressType ConvertCompressType(CompressType type) {
sls_logs::SlsCompressType compressType = sls_logs::SLS_CMP_NONE;
switch (type) {
Expand Down
6 changes: 6 additions & 0 deletions core/plugin/flusher/sls/FlusherSLS.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ class FlusherSLS : public HttpFlusher {
const std::string& accessKeySecret,
SLSClientManager::AuthType type,
SLSSenderQueueItem* item) const;
std::unique_ptr<HttpSinkRequest> CreatePostHostMetricsRequest(const std::string& accessKeyId,
const std::string& accessKeySecret,
SLSClientManager::AuthType type,
SLSSenderQueueItem* item) const;
std::unique_ptr<HttpSinkRequest> CreatePostMetricStoreLogsRequest(const std::string& accessKeyId,
const std::string& accessKeySecret,
SLSClientManager::AuthType type,
Expand All @@ -137,6 +141,8 @@ class FlusherSLS : public HttpFlusher {
SLSClientManager::AuthType type,
SLSSenderQueueItem* item,
const std::string& subPath) const;
bool IsRawSLSTelemetryType() const;
bool IsMetricsTelemetryType() const;

std::string mSubpath;

Expand Down
36 changes: 36 additions & 0 deletions core/plugin/flusher/sls/SLSClientManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,42 @@ void PreparePostLogStoreLogsRequest(const string& accessKeyId,
header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature;
}

void PreparePostHostMetricsRequest(const string& accessKeyId,
const string& accessKeySecret,
SLSClientManager::AuthType type,
const string& compressType,
RawDataType dataType,
const string& body,
size_t rawSize,
string& path,
map<string, string>& header) {
path = HOSTMETRICS;

header[USER_AGENT] = SLSClientManager::GetInstance()->GetUserAgent();
header[DATE] = GetDateString();
header[CONTENT_TYPE] = TYPE_LOG_PROTOBUF;
header[CONTENT_LENGTH] = to_string(body.size());
header[CONTENT_MD5] = CalcMD5(body);
header[X_LOG_APIVERSION] = LOG_API_VERSION;
header[X_LOG_SIGNATUREMETHOD] = HMAC_SHA1;
if (!compressType.empty()) {
header[X_LOG_COMPRESSTYPE] = compressType;
}
if (dataType == RawDataType::EVENT_GROUP) {
header[X_LOG_BODYRAWSIZE] = to_string(rawSize);
} else {
header[X_LOG_BODYRAWSIZE] = to_string(body.size());
header[X_LOG_MODE] = LOG_MODE_BATCH_GROUP;
}
if (type == SLSClientManager::AuthType::ANONYMOUS) {
header[X_LOG_KEYPROVIDER] = MD5_SHA1_SALT_KEYPROVIDER;
}

map<string, string> parameterList;
string signature = GetUrlSignature(HTTP_POST, path, header, parameterList, body, accessKeySecret);
header[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature;
}

void PreparePostMetricStoreLogsRequest(const string& accessKeyId,
const string& accessKeySecret,
SLSClientManager::AuthType type,
Expand Down
9 changes: 9 additions & 0 deletions core/plugin/flusher/sls/SLSClientManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ void PreparePostLogStoreLogsRequest(const std::string& accessKeyId,
std::string& path,
std::string& query,
std::map<std::string, std::string>& header);
void PreparePostHostMetricsRequest(const std::string& accessKeyId,
const std::string& accessKeySecret,
SLSClientManager::AuthType type,
const std::string& compressType,
RawDataType dataType,
const std::string& body,
size_t rawSize,
std::string& path,
std::map<std::string, std::string>& header);
void PreparePostMetricStoreLogsRequest(const std::string& accessKeyId,
const std::string& accessKeySecret,
SLSClientManager::AuthType type,
Expand Down
1 change: 1 addition & 0 deletions core/plugin/flusher/sls/SLSConstant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ namespace logtail {
const string LOGSTORES = "/logstores";
const string METRICSTORES = "/prometheus";
const string HEALTH = "/health";
const string HOSTMETRICS = "/hostmetrics";

const string APM_METRICS_URL = "/apm/metric/arms/v1/metric_log/metricstore-apm-metrics";
const string APM_TRACES_URL = "/apm/trace/arms/v1/trace_log/logstore-tracing";
Expand Down
1 change: 1 addition & 0 deletions core/plugin/flusher/sls/SLSConstant.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ namespace logtail {
extern const std::string LOGSTORES;
extern const std::string METRICSTORES;
extern const std::string HEALTH;
extern const std::string HOSTMETRICS;

extern const std::string APM_METRICS_URL;
extern const std::string APM_TRACES_URL;
Expand Down
2 changes: 2 additions & 0 deletions core/protobuf/sls/sls_logs.proto
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ enum SlsTelemetryType
SLS_TELEMETRY_TYPE_APM_METRICS = 2;
SLS_TELEMETRY_TYPE_APM_TRACES = 3;
SLS_TELEMETRY_TYPE_APM_AGENTINFOS = 4;
SLS_TELEMETRY_TYPE_METRICS_MULTIVALUE = 5;
SLS_TELEMETRY_TYPE_METRICS_HOST = 6;
}

message Log
Expand Down
18 changes: 18 additions & 0 deletions core/unittest/flusher/FlusherSLSUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -460,6 +460,24 @@ void FlusherSLSUnittest::OnSuccessfulInit() {
APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value());
SenderQueueManager::GetInstance()->Clear();

configStr = R"(
{
"Type": "flusher_sls",
"Project": "test_project",
"Logstore": "test_logstore",
"Region": "test_region",
"Endpoint": "test_region.log.aliyuncs.com",
"TelemetryType": "metrics_multivalue"
}
)";
APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg));
flusher.reset(new FlusherSLS());
flusher->SetContext(ctx);
flusher->SetMetricsRecordRef(FlusherSLS::sName, "1");
APSARA_TEST_TRUE(flusher->Init(configJson, optionalGoPipeline));
APSARA_TEST_FALSE(flusher->mBatcher.GetGroupFlushStrategy().has_value());
SenderQueueManager::GetInstance()->Clear();

// apm
std::vector<std::string> apmConfigStr = {R"(
{
Expand Down
Loading
Loading
0