diff --git a/.github/workflows/pr-compile-check.yaml b/.github/workflows/pr-compile-check.yaml index fce4d507b98..999a046df04 100644 --- a/.github/workflows/pr-compile-check.yaml +++ b/.github/workflows/pr-compile-check.yaml @@ -53,9 +53,14 @@ jobs: - name: Setup environment run: | sudo apt-get update - sudo apt-get install -y curl gcc-9 g++-9 clang-12 libsystemd-dev gcovr libyaml-dev libluajit-5.1-dev \ - libnghttp2-dev libjemalloc-dev + sudo apt-get install -y curl gcc-9 g++-9 clang-12 libsystemd-dev gcovr libyaml-dev sudo ln -s /usr/bin/llvm-symbolizer-12 /usr/bin/llvm-symbolizer || true + + - name: Install system libraries for this test + run: | + sudo apt-get update + sudo apt-get install -y libc-ares-dev libjemalloc-dev libluajit-5.1-dev \ + libnghttp2-dev libsqlite3-dev libzstd-dev mkdir -p /tmp/libbacktrace/build && \ curl -L https://github.com/ianlancetaylor/libbacktrace/archive/8602fda.tar.gz | \ tar --strip-components=1 -xzC /tmp/libbacktrace/ && \ @@ -87,5 +92,13 @@ jobs: - name: Display dependencies w/ ldd run: | - ldd ./bin/fluent-bit + export ldd_result=$(ldd ./bin/fluent-bit) + echo "ldd result:" + echo "$ldd_result" + echo "$ldd_result" | grep libcares + echo "$ldd_result" | grep libjemalloc + echo "$ldd_result" | grep libluajit + echo "$ldd_result" | grep libnghttp2 + echo "$ldd_result" | grep libsqlite3 + echo "$ldd_result" | grep libzstd working-directory: build diff --git a/CMakeLists.txt b/CMakeLists.txt index e092433b55e..72b950e7b0a 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -8,7 +8,7 @@ set(CMAKE_POLICY_DEFAULT_CMP0069 NEW) # Fluent Bit Version set(FLB_VERSION_MAJOR 4) set(FLB_VERSION_MINOR 0) -set(FLB_VERSION_PATCH 1) +set(FLB_VERSION_PATCH 2) set(FLB_VERSION_STR "${FLB_VERSION_MAJOR}.${FLB_VERSION_MINOR}.${FLB_VERSION_PATCH}") set(CMAKE_POSITION_INDEPENDENT_CODE ON) @@ -120,6 +120,19 @@ include(cmake/platform_feature_checks.cmake) set(CMAKE_MODULE_PATH "${PROJECT_SOURCE_DIR}/cmake/sanitizers-cmake/cmake" ${CMAKE_MODULE_PATH}) find_package(Sanitizers) +# Check for CXX support +include(CheckLanguage) +check_language(CXX) + +# Enable CXX features if CXX is available +if(CMAKE_CXX_COMPILER) + message(STATUS "CXX compiler found, enable simdutf.") + set(FLB_USE_SIMDUTF Yes) +else() + message(STATUS "CXX compiler not found, disable simdutf.") + set(FLB_USE_SIMDUTF No) +endif() + # Output paths set(FLB_ROOT "${CMAKE_CURRENT_SOURCE_DIR}") set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "${CMAKE_CURRENT_BINARY_DIR}/bin") @@ -139,7 +152,7 @@ option(FLB_COVERAGE "Build with code-coverage" No) option(FLB_JEMALLOC "Build with Jemalloc support" No) option(FLB_REGEX "Build with Regex support" Yes) option(FLB_UTF8_ENCODER "Build with UTF8 encoding support" Yes) -option(FLB_UNICODE_ENCODER "Build with Unicode (UTF-16LE, UTF-16BE) encoding support" Yes) +option(FLB_UNICODE_ENCODER "Build with Unicode (UTF-16LE, UTF-16BE) encoding support" ${FLB_USE_SIMDUTF}) option(FLB_PARSER "Build with Parser support" Yes) option(FLB_TLS "Build with SSL/TLS support" Yes) option(FLB_BINARY "Build executable binary" Yes) @@ -236,7 +249,10 @@ option(FLB_PREFER_SYSTEM_LIB_CARES "Prefer the libcares system library" option(FLB_PREFER_SYSTEM_LIB_JEMALLOC "Prefer the libjemalloc system library" ${FLB_PREFER_SYSTEM_LIBS}) option(FLB_PREFER_SYSTEM_LIB_KAFKA "Prefer the libkafka system library" ${FLB_PREFER_SYSTEM_LIBS}) option(FLB_PREFER_SYSTEM_LIB_LUAJIT "Prefer the libluajit system library" ${FLB_PREFER_SYSTEM_LIBS}) +option(FLB_PREFER_SYSTEM_LIB_MSGPACK "Prefer the libmsgpack system library" 
${FLB_PREFER_SYSTEM_LIBS}) option(FLB_PREFER_SYSTEM_LIB_NGHTTP2 "Prefer the libnghttp2 system library" ${FLB_PREFER_SYSTEM_LIBS}) +option(FLB_PREFER_SYSTEM_LIB_SQLITE "Prefer the libsqlite3 system library" ${FLB_PREFER_SYSTEM_LIBS}) +option(FLB_PREFER_SYSTEM_LIB_ZSTD "Prefer the libzstd system library" ${FLB_PREFER_SYSTEM_LIBS}) # Enable all features if(FLB_ALL) @@ -518,11 +534,16 @@ FLB_OPTION(FLUENT_PROTO_PROFILES ON) add_subdirectory(${FLB_PATH_LIB_FLUENT_OTEL} EXCLUDE_FROM_ALL) # MsgPack options -option(MSGPACK_ENABLE_CXX OFF) -option(MSGPACK_ENABLE_SHARED OFF) -option(MSGPACK_BUILD_TESTS OFF) -option(MSGPACK_BUILD_EXAMPLES OFF) -add_subdirectory(${FLB_PATH_LIB_MSGPACK} EXCLUDE_FROM_ALL) +if(FLB_PREFER_SYSTEM_LIB_MSGPACK) + find_package(PkgConfig) + pkg_check_modules(MSGPACK msgpack>=4.0.0) +endif() +if(MSGPACK_FOUND) + include_directories(${MSGPACK_INCLUDE_DIRS}) + link_directories(${MSGPACK_LIBRARY_DIRS}) +else() + include(cmake/msgpack.cmake) +endif() # MPack add_definitions(-DMPACK_EXTENSIONS=1) @@ -532,13 +553,16 @@ add_subdirectory(${FLB_PATH_LIB_MPACK} EXCLUDE_FROM_ALL) add_subdirectory(${FLB_PATH_LIB_MINIZ} EXCLUDE_FROM_ALL) # Zstd (zstd) -set(ZSTD_BUILD_STATIC ON) -set(ZSTD_BUILD_SHARED OFF) -set(ZSTD_BUILD_COMPRESSION ON) -set(ZSTD_BUILD_DECOMPRESSION ON) -set(ZSTD_BUILD_DICTBUILDER OFF) -set(ZSTD_BUILD_DEPRECATED OFF) -add_subdirectory(${FLB_PATH_LIB_ZSTD}/build/cmake EXCLUDE_FROM_ALL) +if(FLB_PREFER_SYSTEM_LIB_ZSTD) + find_package(PkgConfig) + pkg_check_modules(LIBZSTD libzstd>=1.4.8) +endif() +if(LIBZSTD_FOUND) + include_directories(${LIBZSTD_INCLUDE_DIRS}) + link_directories(${LIBZSTD_LIBRARY_DIRS}) +else() + include(cmake/zstd.cmake) +endif() # ring buffer library add_subdirectory(${FLB_PATH_LIB_RING_BUFFER} EXCLUDE_FROM_ALL) @@ -746,8 +770,17 @@ if (FLB_SIGNV4) endif() if(FLB_SQLDB) + if(FLB_PREFER_SYSTEM_LIB_SQLITE) + find_package(PkgConfig) + pkg_check_modules(SQLITE sqlite3>=3.0.0) + endif() + if(SQLITE_FOUND) + include_directories(${SQLITE_INCLUDE_DIRS}) + link_directories(${SQLITE_LIBRARY_DIRS}) + else() + include(cmake/sqlite.cmake) + endif() FLB_DEFINITION(FLB_HAVE_SQLDB) - add_subdirectory(${FLB_PATH_LIB_SQLITE}) endif() if(FLB_TRACE) diff --git a/SECURITY.md b/SECURITY.md index 3c92a1ffb5b..33bb7bd90d2 100644 --- a/SECURITY.md +++ b/SECURITY.md @@ -5,9 +5,9 @@ | Version | Supported | |---------| ------------------ | +| 4.0.x | :white_check_mark: | | 3.2.x | :white_check_mark: | -| 3.1.x | :white_check_mark: | -| < 3.1 | :x: | +| < 3.2 | :x: | ## Reporting a Vulnerability diff --git a/cmake/cares.cmake b/cmake/cares.cmake index f66c70b5e85..e0c34692c2a 100644 --- a/cmake/cares.cmake +++ b/cmake/cares.cmake @@ -10,6 +10,11 @@ if (FLB_SYSTEM_MACOS) FLB_DEFINITION(CARES_HAVE_ARPA_NAMESER_H) endif() +include_directories( + ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_CARES}/include + ${FLB_PATH_ROOT_BINARY_DIR}/${FLB_PATH_LIB_CARES} +) + add_subdirectory(${FLB_PATH_LIB_CARES})# EXCLUDE_FROM_ALL) set(LIBCARES_LIBRARIES "c-ares") diff --git a/cmake/headers.cmake b/cmake/headers.cmake index fa6bccb13e2..e4cc98e94d4 100755 --- a/cmake/headers.cmake +++ b/cmake/headers.cmake @@ -20,7 +20,6 @@ include_directories( ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_CO} ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_RBTREE} - ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MSGPACK}/include # Chunk I/O generate headers also in the binary path ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_CHUNKIO}/include @@ -29,7 +28,6 @@ include_directories( ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MONKEY}/include 
${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MONKEY}/include/monkey ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MBEDTLS}/include - ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_SQLITE} ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MPACK}/src ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MINIZ}/ ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_ONIGMO} @@ -38,16 +36,6 @@ include_directories( ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_CTRACES}/include ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_CPROFILES}/include ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_RING_BUFFER}/lwrb/src/include - ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_ZSTD}/lib - - # c-ares - ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_CARES}/include - ${FLB_PATH_ROOT_BINARY_DIR}/${FLB_PATH_LIB_CARES} - - # nghttp2 - ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_NGHTTP2}/lib/includes/ - ${FLB_PATH_ROOT_BINARY_DIR}/lib/nghttp2 - ${FLB_PATH_ROOT_BINARY_DIR}/lib/nghttp2/lib/includes/ ${FLB_PATH_ROOT_BINARY_DIR}/${FLB_PATH_LIB_JANSSON}/include ${FLB_PATH_ROOT_BINARY_DIR}/lib/cmetrics diff --git a/cmake/msgpack.cmake b/cmake/msgpack.cmake new file mode 100644 index 00000000000..6211e31f302 --- /dev/null +++ b/cmake/msgpack.cmake @@ -0,0 +1,10 @@ +# msgpack cmake +option(MSGPACK_ENABLE_CXX OFF) +option(MSGPACK_ENABLE_SHARED OFF) +option(MSGPACK_BUILD_TESTS OFF) +option(MSGPACK_BUILD_EXAMPLES OFF) +include_directories( + ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_MSGPACK}/include +) +add_subdirectory(${FLB_PATH_LIB_MSGPACK} EXCLUDE_FROM_ALL) +set(MSGPACK_LIBRARIES "msgpack-c-static") diff --git a/cmake/nghttp2.cmake b/cmake/nghttp2.cmake index 775d557330a..2497e72a681 100644 --- a/cmake/nghttp2.cmake +++ b/cmake/nghttp2.cmake @@ -6,5 +6,11 @@ FLB_OPTION(BUILD_SHARED_LIBS OFF) FLB_OPTION(BUILD_STATIC_LIBS ON) FLB_DEFINITION(NGHTTP2_STATICLIB) +include_directories( + ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_NGHTTP2}/lib/includes/ + ${FLB_PATH_ROOT_BINARY_DIR}/lib/nghttp2 + ${FLB_PATH_ROOT_BINARY_DIR}/lib/nghttp2/lib/includes/ +) + add_subdirectory(${FLB_PATH_LIB_NGHTTP2} EXCLUDE_FROM_ALL) set(NGHTTP2_LIBRARIES "nghttp2_static") diff --git a/cmake/sqlite.cmake b/cmake/sqlite.cmake new file mode 100644 index 00000000000..76585168d98 --- /dev/null +++ b/cmake/sqlite.cmake @@ -0,0 +1,6 @@ +# sqlite cmake +include_directories( + ${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_SQLITE} +) +add_subdirectory(${FLB_PATH_LIB_SQLITE}) +set(SQLITE_LIBRARIES "sqlite3") diff --git a/cmake/zstd.cmake b/cmake/zstd.cmake new file mode 100644 index 00000000000..7351ac1d845 --- /dev/null +++ b/cmake/zstd.cmake @@ -0,0 +1,10 @@ +# zstd cmake +set(ZSTD_BUILD_STATIC ON) +set(ZSTD_BUILD_SHARED OFF) +set(ZSTD_BUILD_COMPRESSION ON) +set(ZSTD_BUILD_DECOMPRESSION ON) +set(ZSTD_BUILD_DICTBUILDER OFF) +set(ZSTD_BUILD_DEPRECATED OFF) +include_directories(${FLB_PATH_ROOT_SOURCE}/${FLB_PATH_LIB_ZSTD}/lib) +add_subdirectory(${FLB_PATH_LIB_ZSTD}/build/cmake EXCLUDE_FROM_ALL) +set(LIBZSTD_LIBRARIES "libzstd_static") diff --git a/dockerfiles/Dockerfile b/dockerfiles/Dockerfile index 9a2e599f1eb..a29798bfabe 100644 --- a/dockerfiles/Dockerfile +++ b/dockerfiles/Dockerfile @@ -13,7 +13,7 @@ # docker buildx build --platform "linux/amd64,linux/arm64,linux/arm/v7,linux/s390x" -f ./dockerfiles/Dockerfile.multiarch --build-arg FLB_TARBALL=https://github.com/fluent/fluent-bit/archive/v1.8.11.tar.gz ./dockerfiles/ # Set this to the current release version: it gets done so as part of the release. 
-ARG RELEASE_VERSION=4.0.1 +ARG RELEASE_VERSION=4.0.2 # For multi-arch builds - assumption is running on an AMD64 host FROM multiarch/qemu-user-static:x86_64-arm AS qemu-arm32 diff --git a/fluent-bit-4.0.1.bb b/fluent-bit-4.0.2.bb similarity index 99% rename from fluent-bit-4.0.1.bb rename to fluent-bit-4.0.2.bb index fb53368d4a5..89390a8f001 100644 --- a/fluent-bit-4.0.1.bb +++ b/fluent-bit-4.0.2.bb @@ -16,7 +16,7 @@ LIC_FILES_CHKSUM = "file://LICENSE;md5=2ee41112a44fe7014dce33e26468ba93" SECTION = "net" PR = "r0" -PV = "4.0.1" +PV = "4.0.2" SRCREV = "v${PV}" SRC_URI = "git://github.com/fluent/fluent-bit.git;nobranch=1" diff --git a/include/fluent-bit/flb_config.h b/include/fluent-bit/flb_config.h index a41c7d83d2d..c2b7efdbc0a 100644 --- a/include/fluent-bit/flb_config.h +++ b/include/fluent-bit/flb_config.h @@ -287,6 +287,10 @@ struct flb_config { size_t route_mask_size; size_t route_mask_slots; uint64_t *route_empty_mask; +#ifdef FLB_SYSTEM_WINDOWS + /* maxstdio (Windows) */ + int win_maxstdio; +#endif /* Co-routines */ unsigned int coro_stack_size; @@ -373,6 +377,9 @@ enum conf_type { #define FLB_CONF_STR_HOT_RELOAD "Hot_Reload" #define FLB_CONF_STR_HOT_RELOAD_ENSURE_THREAD_SAFETY "Hot_Reload.Ensure_Thread_Safety" +/* Set up maxstdio (Windows) */ +#define FLB_CONF_STR_WINDOWS_MAX_STDIO "windows.maxstdio" + /* DNS */ #define FLB_CONF_DNS_MODE "dns.mode" #define FLB_CONF_DNS_RESOLVER "dns.resolver" diff --git a/include/fluent-bit/flb_simd.h b/include/fluent-bit/flb_simd.h index 41e48da8411..2ff0ee79792 100644 --- a/include/fluent-bit/flb_simd.h +++ b/include/fluent-bit/flb_simd.h @@ -77,8 +77,27 @@ typedef uint32x4_t flb_vector32; typedef vuint8m1_t flb_vector8; typedef vuint32m1_t flb_vector32; -#define RVV_VEC8_INST_LEN (128 / 8) /* 16 */ -#define RVV_VEC32_INST_LEN (128 / 8 / 4) /* 4 */ +static size_t vec8_vl_cached = 0; +static size_t vec32_vl_cached = 0; + +static inline size_t flb_rvv_get_vec8_vl() +{ + if (vec8_vl_cached == 0) { + vec8_vl_cached = __riscv_vsetvl_e8m1(16); + } + return vec8_vl_cached; +} + +static inline size_t flb_rvv_get_vec32_vl() +{ + if (vec32_vl_cached == 0) { + vec32_vl_cached = __riscv_vsetvl_e32m1(4); + } + return vec32_vl_cached; +} + +#define RVV_VEC8_INST_LEN flb_rvv_get_vec8_vl() /* 16 */ +#define RVV_VEC32_INST_LEN flb_rvv_get_vec32_vl() /* 4 */ #else /* diff --git a/init/az2-sethostname.in b/init/az2-sethostname.in index dcce08bf2d9..14635a4dcb1 100644 --- a/init/az2-sethostname.in +++ b/init/az2-sethostname.in @@ -1,12 +1,21 @@ [Unit] -Description=Set Hostname Workaround coreos/bugs#1272 +Description=Set Hostname Workaround coreos/bugs#1272 with EC2 IMDSv2 support Wants=network-online.target After=network-online.target [Service] Type=oneshot RemainAfterExit=yes -ExecStart=/bin/sh -c "/usr/bin/hostnamectl set-hostname $(curl -s http://169.254.169.254/latest/meta-data/hostname)" + +ExecStartPre=/bin/sh -c 'curl -sX PUT "http://169.254.169.254/latest/api/token" \ + -H "X-aws-ec2-metadata-token-ttl-seconds: 21600" > /run/imds_token' + +ExecStartPre=/bin/sh -c 'curl -s "http://169.254.169.254/latest/meta-data/hostname" \ + -H "X-aws-ec2-metadata-token: $(cat /run/imds_token)" > /run/ec2_hostname' + +ExecStart=/bin/sh -c '/usr/bin/hostnamectl set-hostname "$(cat /run/ec2_hostname)"' + +ExecStartPost=/bin/sh -c 'rm -f /run/imds_token /run/ec2_hostname' [Install] -WantedBy=multi-user.target \ No newline at end of file +WantedBy=multi-user.target diff --git a/lib/cmetrics/.github/workflows/build.yaml b/lib/cmetrics/.github/workflows/build.yaml index 
7da5381104a..9c004466e0b 100644 --- a/lib/cmetrics/.github/workflows/build.yaml +++ b/lib/cmetrics/.github/workflows/build.yaml @@ -141,7 +141,7 @@ jobs: submodules: true - name: Build on ${{ matrix.os }} with ${{ matrix.compiler }} - uses: uraimo/run-on-arch-action@v3.0.0 + uses: uraimo/run-on-arch-action@v3.0.1 with: arch: aarch64 distro: ubuntu_latest diff --git a/lib/cmetrics/.github/workflows/packages.yaml b/lib/cmetrics/.github/workflows/packages.yaml index 0e4c0e0f10a..ac5f78aaec5 100644 --- a/lib/cmetrics/.github/workflows/packages.yaml +++ b/lib/cmetrics/.github/workflows/packages.yaml @@ -22,7 +22,7 @@ jobs: with: submodules: true - - uses: uraimo/run-on-arch-action@v3.0.0 + - uses: uraimo/run-on-arch-action@v3.0.1 name: Build the ${{matrix.format}} packages with: arch: aarch64 diff --git a/lib/cmetrics/CMakeLists.txt b/lib/cmetrics/CMakeLists.txt index 9ea50a23439..44839e6a5d8 100644 --- a/lib/cmetrics/CMakeLists.txt +++ b/lib/cmetrics/CMakeLists.txt @@ -6,7 +6,7 @@ set(CMAKE_EXPORT_COMPILE_COMMANDS ON) # CMetrics Version set(CMT_VERSION_MAJOR 1) set(CMT_VERSION_MINOR 0) -set(CMT_VERSION_PATCH 0) +set(CMT_VERSION_PATCH 2) set(CMT_VERSION_STR "${CMT_VERSION_MAJOR}.${CMT_VERSION_MINOR}.${CMT_VERSION_PATCH}") # Include helpers diff --git a/lib/cmetrics/src/cmt_decode_msgpack.c b/lib/cmetrics/src/cmt_decode_msgpack.c index 3730af8a294..d1c460e7593 100644 --- a/lib/cmetrics/src/cmt_decode_msgpack.c +++ b/lib/cmetrics/src/cmt_decode_msgpack.c @@ -212,6 +212,26 @@ static int unpack_opts(mpack_reader_t *reader, struct cmt_opts *opts) result = cmt_mpack_unpack_map(reader, callbacks, (void *) opts); if (CMT_DECODE_MSGPACK_SUCCESS == result) { + /* Ensure required string fields are not NULL */ + if (NULL == opts->ns) { + opts->ns = cfl_sds_create(""); + if (NULL == opts->ns) { + return CMT_DECODE_MSGPACK_ALLOCATION_ERROR; + } + } + if (NULL == opts->subsystem) { + opts->subsystem = cfl_sds_create(""); + if (NULL == opts->subsystem) { + return CMT_DECODE_MSGPACK_ALLOCATION_ERROR; + } + } + if (NULL == opts->name) { + opts->name = cfl_sds_create(""); + if (NULL == opts->name) { + return CMT_DECODE_MSGPACK_ALLOCATION_ERROR; + } + } + /* Allocate enough space for the three components, the separators * and the terminator so we don't have to worry about possible realloc issues * later on. 
diff --git a/lib/cmetrics/src/cmt_decode_opentelemetry.c b/lib/cmetrics/src/cmt_decode_opentelemetry.c index 9f88bfc81af..5ead2b92a86 100644 --- a/lib/cmetrics/src/cmt_decode_opentelemetry.c +++ b/lib/cmetrics/src/cmt_decode_opentelemetry.c @@ -361,6 +361,10 @@ static int decode_data_point_labels(struct cmt *cmt, attribute = (Opentelemetry__Proto__Common__V1__KeyValue *) value_index_list[map_label_index]; + if (attribute->value == NULL) { + continue; + } + if (attribute->value->value_case == OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE) { result = append_new_metric_label_value(metric, attribute->value->string_value, 0); } diff --git a/lib/cmetrics/src/cmt_decode_prometheus.l b/lib/cmetrics/src/cmt_decode_prometheus.l index 54a7c79d8e5..4a2951c6186 100644 --- a/lib/cmetrics/src/cmt_decode_prometheus.l +++ b/lib/cmetrics/src/cmt_decode_prometheus.l @@ -146,6 +146,9 @@ ["] { BEGIN(INQUOTE); + if (context->strbuf != NULL) { + cfl_sds_destroy(context->strbuf); + } context->strbuf = sds_alloc(256); } diff --git a/lib/ctraces/.github/workflows/build.yaml b/lib/ctraces/.github/workflows/build.yaml index 6d2faee05c8..ee53dd0dadb 100644 --- a/lib/ctraces/.github/workflows/build.yaml +++ b/lib/ctraces/.github/workflows/build.yaml @@ -137,7 +137,7 @@ jobs: submodules: true - name: Build on ${{ matrix.os }} with ${{ matrix.compiler }} - uses: uraimo/run-on-arch-action@v3.0.0 + uses: uraimo/run-on-arch-action@v3.0.1 with: arch: aarch64 distro: ubuntu_latest diff --git a/lib/ctraces/.github/workflows/packages.yaml b/lib/ctraces/.github/workflows/packages.yaml index fa2db17722a..8f8e88c1c0a 100644 --- a/lib/ctraces/.github/workflows/packages.yaml +++ b/lib/ctraces/.github/workflows/packages.yaml @@ -22,7 +22,7 @@ jobs: with: submodules: true - - uses: uraimo/run-on-arch-action@v3.0.0 + - uses: uraimo/run-on-arch-action@v3.0.1 name: Build the ${{matrix.format}} packages with: arch: aarch64 diff --git a/lib/ctraces/CMakeLists.txt b/lib/ctraces/CMakeLists.txt index 3b093f7bc44..73723433d53 100644 --- a/lib/ctraces/CMakeLists.txt +++ b/lib/ctraces/CMakeLists.txt @@ -27,7 +27,7 @@ endif() # CTraces Version set(CTR_VERSION_MAJOR 0) set(CTR_VERSION_MINOR 6) -set(CTR_VERSION_PATCH 4) +set(CTR_VERSION_PATCH 6) set(CTR_VERSION_STR "${CTR_VERSION_MAJOR}.${CTR_VERSION_MINOR}.${CTR_VERSION_PATCH}") # Define __FILENAME__ consistently across Operating Systems diff --git a/lib/ctraces/src/ctr_decode_opentelemetry.c b/lib/ctraces/src/ctr_decode_opentelemetry.c index f9be84fea85..8c5078078dc 100644 --- a/lib/ctraces/src/ctr_decode_opentelemetry.c +++ b/lib/ctraces/src/ctr_decode_opentelemetry.c @@ -299,6 +299,9 @@ static int convert_any_value(struct opentelemetry_decode_value *ctr_val, { int result; + if (val == NULL) { + return -1; + } switch (val->value_case) { case OPENTELEMETRY__PROTO__COMMON__V1__ANY_VALUE__VALUE_STRING_VALUE: @@ -540,7 +543,7 @@ int ctr_decode_opentelemetry_create(struct ctrace **out_ctr, for (resource_span_index = 0; resource_span_index < service_request->n_resource_spans; resource_span_index++) { otel_resource_span = service_request->resource_spans[resource_span_index]; - if (otel_resource_span == NULL) { + if (otel_resource_span == NULL || otel_resource_span->resource == NULL) { opentelemetry__proto__collector__trace__v1__export_trace_service_request__free_unpacked(service_request, NULL); ctr_destroy(ctr); return CTR_DECODE_OPENTELEMETRY_INVALID_PAYLOAD; diff --git a/packaging/test-release-packages.sh b/packaging/test-release-packages.sh index 085a3f205e8..992091beae4 100755 --- 
a/packaging/test-release-packages.sh +++ b/packaging/test-release-packages.sh @@ -37,9 +37,7 @@ function check_version() { fi } -APT_TARGETS=("ubuntu:18.04" - "ubuntu:20.04" - "ubuntu:22.04" +APT_TARGETS=("ubuntu:22.04" "debian:10" "debian:11") diff --git a/plugins/CMakeLists.txt b/plugins/CMakeLists.txt index 1cb6777959a..baaaabd0326 100644 --- a/plugins/CMakeLists.txt +++ b/plugins/CMakeLists.txt @@ -204,7 +204,7 @@ endmacro() macro(FLB_PLUGIN name src deps) add_library(flb-plugin-${name} STATIC ${src}) add_sanitizers(flb-plugin-${name}) - target_link_libraries(flb-plugin-${name} fluent-bit-static msgpack-c-static ${deps}) + target_link_libraries(flb-plugin-${name} fluent-bit-static ${MSGPACK_LIBRARIES} ${deps}) endmacro() # FLB_ZIG_PLUGIN: used by zig plugins to standardize the process diff --git a/plugins/in_opentelemetry/opentelemetry_logs.c b/plugins/in_opentelemetry/opentelemetry_logs.c index f6b78f72aad..8a3a2189f00 100644 --- a/plugins/in_opentelemetry/opentelemetry_logs.c +++ b/plugins/in_opentelemetry/opentelemetry_logs.c @@ -682,12 +682,14 @@ static int process_json_payload_resource_logs_entry(struct flb_opentelemetry *ct int ret; int result; size_t index; - msgpack_object obj; + msgpack_object *obj; msgpack_object_map *resource = NULL; msgpack_object *resource_attr = NULL; msgpack_object_map *resource_logs_entry = NULL; + msgpack_object *resource_schema_url = NULL; msgpack_object *scope = NULL; msgpack_object_array *scope_logs; + msgpack_object *scope_schema_url = NULL; if (resource_logs_object->type != MSGPACK_OBJECT_MAP) { flb_plg_error(ctx->ins, "unexpected resourceLogs entry type"); @@ -697,22 +699,37 @@ static int process_json_payload_resource_logs_entry(struct flb_opentelemetry *ct /* get 'resource' and resource['attributes'] */ result = find_map_entry_by_key(&resource_logs_object->via.map, "resource", 0, FLB_TRUE); if (result >= 0) { - obj = resource_logs_object->via.map.ptr[result].val; - if (obj.type == MSGPACK_OBJECT_MAP) { - resource = &obj.via.map; + obj = &resource_logs_object->via.map.ptr[result].val; + if (obj->type == MSGPACK_OBJECT_MAP) { + resource = &obj->via.map; + + /* attributes */ result = find_map_entry_by_key(resource, "attributes", 0, FLB_TRUE); if (result >= 0) { - obj = resource->ptr[result].val; - if (obj.type == MSGPACK_OBJECT_ARRAY) { - resource_attr = &obj; + obj = &resource->ptr[result].val; + if (obj->type == MSGPACK_OBJECT_ARRAY) { + resource_attr = &resource->ptr[result].val; } } } } resource_logs_entry = &resource_logs_object->via.map; - result = find_map_entry_by_key(resource_logs_entry, "scopeLogs", 0, FLB_TRUE); + /* schemaUrl */ + result = find_map_entry_by_key(resource_logs_entry, "schemaUrl", 0, FLB_TRUE); + if (result == -1) { + result = find_map_entry_by_key(resource_logs_entry, "schema_url", 0, FLB_TRUE); + } + if (result >= 0) { + obj = &resource_logs_entry->ptr[result].val; + if (obj->type == MSGPACK_OBJECT_STR) { + resource_schema_url = &resource_logs_entry->ptr[result].val; + } + } + + /* scopeLogs */ + result = find_map_entry_by_key(resource_logs_entry, "scopeLogs", 0, FLB_TRUE); if (result == -1) { result = find_map_entry_by_key(resource_logs_entry, "scope_logs", 0, FLB_TRUE); if (result == -1) { @@ -767,24 +784,42 @@ static int process_json_payload_resource_logs_entry(struct flb_opentelemetry *ct if (resource) { result = find_map_entry_by_key(resource, "droppedAttributesCount", 0, FLB_TRUE); if (result >= 0) { - obj = resource->ptr[result].val; + obj = &resource->ptr[result].val; 
flb_log_event_encoder_append_body_values(encoder, FLB_LOG_EVENT_CSTRING_VALUE("dropped_attributes_count"), - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj)); + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj)); } } + if (resource_schema_url) { + flb_log_event_encoder_append_body_values(encoder, + FLB_LOG_EVENT_CSTRING_VALUE("schema_url"), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(resource_schema_url)); + } + /* close resource map */ flb_log_event_encoder_body_commit_map(encoder); + /* scope schemaUrl */ + result = find_map_entry_by_key(&scope_logs->ptr[index].via.map, "schemaUrl", 0, FLB_TRUE); + if (result == -1) { + result = find_map_entry_by_key(&scope_logs->ptr[index].via.map, "schema_url", 0, FLB_TRUE); + } + if (result >= 0) { + obj = &scope_logs->ptr[index].via.map.ptr[result].val; + if (obj->type == MSGPACK_OBJECT_STR) { + scope_schema_url = &scope_logs->ptr[index].via.map.ptr[result].val; + } + } + /* scope metadata */ scope = NULL; - obj = scope_logs->ptr[index]; - if (obj.type == MSGPACK_OBJECT_MAP) { - result = find_map_entry_by_key(&obj.via.map, "scope", 0, FLB_TRUE); + obj = &scope_logs->ptr[index]; + if (obj->type == MSGPACK_OBJECT_MAP) { + result = find_map_entry_by_key(&obj->via.map, "scope", 0, FLB_TRUE); if (result >= 0) { - if (obj.via.map.ptr[result].val.type == MSGPACK_OBJECT_MAP) { - scope = &obj.via.map.ptr[result].val; + if (obj->via.map.ptr[result].val.type == MSGPACK_OBJECT_MAP) { + scope = &scope_logs->ptr[index].via.map.ptr[result].val; } } } @@ -804,40 +839,47 @@ static int process_json_payload_resource_logs_entry(struct flb_opentelemetry *ct /* scope name */ result = find_map_entry_by_key(&scope->via.map, "name", 0, FLB_TRUE); if (result >= 0) { - obj = scope->via.map.ptr[result].val; - if (obj.type == MSGPACK_OBJECT_STR) { + obj = &scope->via.map.ptr[result].val; + if (obj->type == MSGPACK_OBJECT_STR) { flb_log_event_encoder_append_body_values(encoder, FLB_LOG_EVENT_CSTRING_VALUE("name"), - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj)); + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj)); } } /* scope version */ result = find_map_entry_by_key(&scope->via.map, "version", 0, FLB_TRUE); if (result >= 0) { - obj = scope->via.map.ptr[result].val; - if (obj.type == MSGPACK_OBJECT_STR) { + obj = &scope->via.map.ptr[result].val; + if (obj->type == MSGPACK_OBJECT_STR) { flb_log_event_encoder_append_body_values(encoder, FLB_LOG_EVENT_CSTRING_VALUE("version"), - FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(&obj)); + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(obj)); } } /* scope attributes */ result = find_map_entry_by_key(&scope->via.map, "attributes", 0, FLB_TRUE); if (result >= 0) { - obj = scope->via.map.ptr[result].val; - if (obj.type == MSGPACK_OBJECT_ARRAY) { + obj = &scope->via.map.ptr[result].val; + if (obj->type == MSGPACK_OBJECT_ARRAY) { flb_log_event_encoder_append_body_string(encoder, "attributes", 10); result = json_payload_append_converted_kvlist(encoder, FLB_LOG_EVENT_BODY, - &obj); + obj); if (result != 0) { return -2; } } } + /* scope schemaUrl */ + if (scope_schema_url) { + flb_log_event_encoder_append_body_values(encoder, + FLB_LOG_EVENT_CSTRING_VALUE("schema_url"), + FLB_LOG_EVENT_MSGPACK_OBJECT_VALUE(scope_schema_url)); + } + flb_log_event_encoder_commit_map(encoder, FLB_LOG_EVENT_BODY); } @@ -891,8 +933,8 @@ static int process_json_payload_root(struct flb_opentelemetry *ctx, resource_logs = &root->ptr[result].val.via.array; - result = 0; + result = 0; for (index = 0 ; index < resource_logs->size ; index++) { result = process_json_payload_resource_logs_entry( ctx, @@ -1262,6 +1304,7 @@ static int 
binary_payload_to_msgpack(struct flb_opentelemetry *ctx, for (resource_logs_index = 0; resource_logs_index < input_logs->n_resource_logs; resource_logs_index++) { resource_log = resource_logs[resource_logs_index]; + resource = resource_log->resource; scope_logs = resource_log->scope_logs; @@ -1322,18 +1365,18 @@ static int binary_payload_to_msgpack(struct flb_opentelemetry *ctx, msgpack_pack_str_body(&mp_pck, "dropped_attributes_count", 24); msgpack_pack_uint64(&mp_pck, resource->dropped_attributes_count); } - } - flb_mp_map_header_end(&mh_tmp); - if (resource_log->schema_url) { - flb_mp_map_header_append(&mh); - msgpack_pack_str(&mp_pck, 10); - msgpack_pack_str_body(&mp_pck, "schema_url", 10); + if (resource_log->schema_url) { + flb_mp_map_header_append(&mh_tmp); + msgpack_pack_str(&mp_pck, 10); + msgpack_pack_str_body(&mp_pck, "schema_url", 10); - len = strlen(resource_log->schema_url); - msgpack_pack_str(&mp_pck, len); - msgpack_pack_str_body(&mp_pck, resource_log->schema_url, len); + len = strlen(resource_log->schema_url); + msgpack_pack_str(&mp_pck, len); + msgpack_pack_str_body(&mp_pck, resource_log->schema_url, len); + } } + flb_mp_map_header_end(&mh_tmp); /* scope */ flb_mp_map_header_append(&mh); diff --git a/plugins/out_azure_kusto/azure_kusto_conf.c b/plugins/out_azure_kusto/azure_kusto_conf.c index 053d729623e..de09726052e 100644 --- a/plugins/out_azure_kusto/azure_kusto_conf.c +++ b/plugins/out_azure_kusto/azure_kusto_conf.c @@ -439,7 +439,6 @@ static flb_sds_t parse_ingestion_identity_token(struct flb_azure_kusto *ctx, } flb_free(tokens); - flb_free(t); return identity_token; } diff --git a/plugins/out_opentelemetry/opentelemetry_conf.c b/plugins/out_opentelemetry/opentelemetry_conf.c index 33d381c264b..77d42a2c764 100644 --- a/plugins/out_opentelemetry/opentelemetry_conf.c +++ b/plugins/out_opentelemetry/opentelemetry_conf.c @@ -277,13 +277,19 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output return NULL; } + ret = check_proxy(ins, ctx, host, port, protocol, logs_uri); + if (ret == -1) { + flb_opentelemetry_context_destroy(ctx); + return NULL; + } + ret = check_proxy(ins, ctx, host, port, protocol, metrics_uri); if (ret == -1) { flb_opentelemetry_context_destroy(ctx); return NULL; } - ret = check_proxy(ins, ctx, host, port, protocol, logs_uri); + ret = check_proxy(ins, ctx, host, port, protocol, traces_uri); if (ret == -1) { flb_opentelemetry_context_destroy(ctx); return NULL; @@ -496,7 +502,7 @@ struct opentelemetry_context *flb_opentelemetry_context_create(struct flb_output flb_plg_error(ins, "failed to create record accessor for resource attributes"); } - ctx->ra_resource_schema_url = flb_ra_create("$schema_url", FLB_FALSE); + ctx->ra_resource_schema_url = flb_ra_create("$resource['schema_url']", FLB_FALSE); if (ctx->ra_resource_schema_url == NULL) { flb_plg_error(ins, "failed to create record accessor for resource schema url"); } diff --git a/plugins/out_splunk/splunk.c b/plugins/out_splunk/splunk.c index 722d4c7922c..970eaa5d8d1 100644 --- a/plugins/out_splunk/splunk.c +++ b/plugins/out_splunk/splunk.c @@ -962,15 +962,15 @@ static void cb_splunk_flush(struct flb_event_chunk *event_chunk, if (ctx->http_user && ctx->http_passwd) { flb_http_basic_auth(c, ctx->http_user, ctx->http_passwd); } + else if (ctx->auth_header) { + flb_http_add_header(c, "Authorization", 13, + ctx->auth_header, flb_sds_len(ctx->auth_header)); + } else if (metadata_auth_header) { flb_http_add_header(c, "Authorization", 13, metadata_auth_header, 
flb_sds_len(metadata_auth_header)); } - else if (ctx->auth_header) { - flb_http_add_header(c, "Authorization", 13, - ctx->auth_header, flb_sds_len(ctx->auth_header)); - } /* Append Channel identifier header */ if (ctx->channel) { diff --git a/snap/snapcraft.yaml b/snap/snapcraft.yaml index 9f7f056949f..d02eb658c09 100644 --- a/snap/snapcraft.yaml +++ b/snap/snapcraft.yaml @@ -1,6 +1,6 @@ name: fluent-bit base: core18 -version: '4.0.1' +version: '4.0.2' summary: High performance logs and stream processor description: | Fluent Bit is a high performance log processor and stream processor for Linux. diff --git a/src/CMakeLists.txt b/src/CMakeLists.txt index 849961a149e..df8ee723da3 100644 --- a/src/CMakeLists.txt +++ b/src/CMakeLists.txt @@ -245,7 +245,7 @@ if(FLB_SQLDB) ) set(extra_libs ${extra_libs} - "sqlite3") + ${SQLITE_LIBRARIES}) endif() if(FLB_STATIC_CONF) @@ -392,11 +392,11 @@ set(FLB_DEPS ctraces-static mk_core jsmn - msgpack-c-static + ${MSGPACK_LIBRARIES} mpack-static chunkio-static miniz - libzstd_static + ${LIBZSTD_LIBRARIES} ${FLB_PLUGINS} ${FLB_PROXY_PLUGINS} diff --git a/src/flb_config.c b/src/flb_config.c index fb89f42276c..168324cbd84 100644 --- a/src/flb_config.c +++ b/src/flb_config.c @@ -183,6 +183,11 @@ struct flb_service_config service_configs[] = { offsetof(struct flb_config, enable_chunk_trace)}, #endif +#ifdef FLB_SYSTEM_WINDOWS + {FLB_CONF_STR_WINDOWS_MAX_STDIO, + FLB_CONF_TYPE_INT, + offsetof(struct flb_config, win_maxstdio)}, +#endif {FLB_CONF_STR_HOT_RELOAD, FLB_CONF_TYPE_BOOL, offsetof(struct flb_config, enable_hot_reload)}, @@ -291,6 +296,10 @@ struct flb_config *flb_config_init() config->shutdown_by_hot_reloading = FLB_FALSE; config->hot_reloading = FLB_FALSE; +#ifdef FLB_SYSTEM_WINDOWS + config->win_maxstdio = 512; +#endif + #ifdef FLB_HAVE_SQLDB mk_list_init(&config->sqldb_list); #endif @@ -864,10 +873,16 @@ static int configure_plugins_type(struct flb_config *config, struct flb_cf *cf, processors = flb_cf_group_get(cf, s, "processors"); if (processors) { if (type == FLB_CF_INPUT) { - flb_processors_load_from_config_format_group(((struct flb_input_instance *) ins)->processor, processors); + ret = flb_processors_load_from_config_format_group(((struct flb_input_instance *) ins)->processor, processors); + if (ret == -1) { + return -1; + } } else if (type == FLB_CF_OUTPUT) { - flb_processors_load_from_config_format_group(((struct flb_output_instance *) ins)->processor, processors); + ret = flb_processors_load_from_config_format_group(((struct flb_output_instance *) ins)->processor, processors); + if (ret == -1) { + return -1; + } } else { flb_error("[config] section '%s' does not support processors", s_type); diff --git a/src/flb_engine.c b/src/flb_engine.c index 5e1a8a8a563..e1069a4c4cd 100644 --- a/src/flb_engine.c +++ b/src/flb_engine.c @@ -131,6 +131,15 @@ void flb_engine_reschedule_retries(struct flb_config *config) ins = mk_list_entry(head, struct flb_input_instance, _head); mk_list_foreach_safe(t_head, tmp_task, &ins->tasks) { task = mk_list_entry(t_head, struct flb_task, _head); + + if (task->users > 0) { + flb_debug("[engine] retry=%p for task %i already scheduled to run, " + "not re-scheduling it.", + retry, task->id); + + continue; + } + mk_list_foreach_safe(rt_head, tmp_retry_task, &task->retries) { retry = mk_list_entry(rt_head, struct flb_task_retry, _head); flb_sched_request_invalidate(config, retry); @@ -754,6 +763,9 @@ int flb_engine_start(struct flb_config *config) flb_info("[fluent bit] version=%s, commit=%.10s, pid=%i", FLB_VERSION_STR, 
FLB_GIT_HASH, getpid()); +#ifdef FLB_SYSTEM_WINDOWS + flb_debug("[engine] maxstdio set: %d", _getmaxstdio()); +#endif /* Debug coroutine stack size */ flb_utils_bytes_to_human_readable_size(config->coro_stack_size, tmp, sizeof(tmp)); diff --git a/src/flb_input.c b/src/flb_input.c index ea0c932272c..57f360f2bf8 100644 --- a/src/flb_input.c +++ b/src/flb_input.c @@ -108,6 +108,21 @@ struct flb_config_map input_global_properties[] = { "pause until the buffer is drained. The value is in bytes. If set to 0, the buffer limit is disabled."\ "Note that if the plugin has enabled filesystem buffering, this limit will not apply." }, + { + FLB_CONFIG_MAP_STR, "storage.type", "memory", + 0, FLB_FALSE, 0, + "Sets the storage type for this input, one of: filesystem, memory or memrb." + }, + { + FLB_CONFIG_MAP_BOOL, "storage.pause_on_chunks_overlimit", "false", + 0, FLB_FALSE, 0, + "Enable pausing on an input when they reach their chunks limit" + }, + { + FLB_CONFIG_MAP_BOOL, "threaded", "false", + 0, FLB_FALSE, 0, + "Enable threading on an input" + }, {0} }; @@ -1338,6 +1353,7 @@ int flb_input_instance_init(struct flb_input_instance *ins, /* initialize processors */ ret = flb_processor_init(ins->processor); if (ret == -1) { + flb_error("[input %s] error initializing processor, aborting startup", ins->name); return -1; } diff --git a/src/flb_output.c b/src/flb_output.c index 66779c93f94..cb3cd1b6b0c 100644 --- a/src/flb_output.c +++ b/src/flb_output.c @@ -1428,6 +1428,7 @@ int flb_output_init_all(struct flb_config *config) /* initialize processors */ ret = flb_processor_init(ins->processor); if (ret == -1) { + flb_error("[output %s] error initializing processor, aborting startup", ins->name); return -1; } } diff --git a/src/flb_pack.c b/src/flb_pack.c index 9a14622eba2..fb616862b99 100644 --- a/src/flb_pack.c +++ b/src/flb_pack.c @@ -30,6 +30,9 @@ #include #include +#include +#include + /* cmetrics */ #include #include @@ -917,23 +920,21 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, flb_sds_t date_key) { int i; - int ok = MSGPACK_UNPACK_SUCCESS; - int records = 0; - int map_size; - size_t off = 0; + int ret; char time_formatted[38]; flb_sds_t out_tmp; flb_sds_t out_js; flb_sds_t out_buf = NULL; - msgpack_unpacked result; - msgpack_object root; - msgpack_object map; msgpack_sbuffer tmp_sbuf; msgpack_packer tmp_pck; - msgpack_object *obj; msgpack_object *k; msgpack_object *v; struct flb_time tms; + struct flb_log_event_decoder log_decoder; + struct flb_log_event log_event; + struct flb_mp_map_header mh_array; + struct flb_mp_map_header mh_map; + struct flb_mp_map_header mh_internal; /* For json lines and streams mode we need a pre-allocated buffer */ if (json_format == FLB_PACK_JSON_FORMAT_LINES || @@ -949,6 +950,15 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, msgpack_sbuffer_init(&tmp_sbuf); msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write); + ret = flb_log_event_decoder_init(&log_decoder, (char *) data, bytes); + if (ret != FLB_EVENT_DECODER_SUCCESS) { + flb_error("Log event decoder initialization error : %d", ret); + if (out_buf) { + flb_sds_destroy(out_buf); + } + return NULL; + } + /* * If the format is the original msgpack style of one big array, * registrate the array, otherwise is not necessary. 
FYI, original format: @@ -960,43 +970,25 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, * ] */ if (json_format == FLB_PACK_JSON_FORMAT_JSON) { - records = flb_mp_count(data, bytes); - if (records <= 0) { - msgpack_sbuffer_destroy(&tmp_sbuf); - return NULL; - } - msgpack_pack_array(&tmp_pck, records); + /* register the array. Note must be finalized with flb_mp_map_header_end() */ + flb_mp_array_header_init(&mh_array, &tmp_pck); } - msgpack_unpacked_init(&result); - while (msgpack_unpack_next(&result, data, bytes, &off) == ok) { - /* Each array must have two entries: time and record */ - root = result.data; - if (root.type != MSGPACK_OBJECT_ARRAY) { - continue; - } - if (root.via.array.size != 2) { - continue; + /* Iterate log records */ + while ((ret = flb_log_event_decoder_next(&log_decoder, &log_event)) == FLB_EVENT_DECODER_SUCCESS) { + if (json_format == FLB_PACK_JSON_FORMAT_JSON) { + /* register a new entry for the array entry */ + flb_mp_array_header_append(&mh_array); } + tms = log_event.timestamp; - /* Unpack time */ - flb_time_pop_from_msgpack(&tms, &result, &obj); - - /* Get the record/map */ - map = root.via.array.ptr[1]; - if (map.type != MSGPACK_OBJECT_MAP) { - continue; - } - map_size = map.via.map.size; + /* initialize the map for the record key/values */ + flb_mp_map_header_init(&mh_map, &tmp_pck); + /* date key */ if (date_key != NULL) { - msgpack_pack_map(&tmp_pck, map_size + 1); - } - else { - msgpack_pack_map(&tmp_pck, map_size); - } + flb_mp_array_header_append(&mh_map); - if (date_key != NULL) { /* Append date key */ msgpack_pack_str(&tmp_pck, flb_sds_len(date_key)); msgpack_pack_str_body(&tmp_pck, date_key, flb_sds_len(date_key)); @@ -1011,7 +1003,7 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, FLB_PACK_JSON_DATE_JAVA_SQL_TIMESTAMP_FMT, ".%06" PRIu64)) { flb_sds_destroy(out_buf); msgpack_sbuffer_destroy(&tmp_sbuf); - msgpack_unpacked_destroy(&result); + flb_log_event_decoder_destroy(&log_decoder); return NULL; } break; @@ -1020,7 +1012,7 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, FLB_PACK_JSON_DATE_ISO8601_FMT, ".%06" PRIu64 "Z")) { flb_sds_destroy(out_buf); msgpack_sbuffer_destroy(&tmp_sbuf); - msgpack_unpacked_destroy(&result); + flb_log_event_decoder_destroy(&log_decoder); return NULL; } break; @@ -1033,12 +1025,77 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, } } - /* Append remaining keys/values */ - for (i = 0; i < map_size; i++) { - k = &map.via.map.ptr[i].key; - v = &map.via.map.ptr[i].val; - msgpack_pack_object(&tmp_pck, *k); - msgpack_pack_object(&tmp_pck, *v); + /* register __internal__ data that comes from the group information */ + if ((log_event.group_attributes && log_event.group_attributes->type == MSGPACK_OBJECT_MAP && log_event.group_attributes->via.map.size > 0) || + (log_event.metadata && log_event.metadata->type == MSGPACK_OBJECT_MAP && log_event.metadata->via.map.size > 0)) { + + flb_mp_map_header_append(&mh_map); + msgpack_pack_str(&tmp_pck, 12); + msgpack_pack_str_body(&tmp_pck, "__internal__", 12); + + flb_mp_map_header_init(&mh_internal, &tmp_pck); + + /* + * group metadata: the JSON export of this record do not aim to be re-assembled into a Fluent pipeline, + * actually it is a generic JSON representation of the log record. For this reason, we need to add the group + * metadata to the JSON output. + * + * Just leaving this code commented as a reference... 
+ */ + + /* + * if (log_event.group_metadata != NULL) { + * flb_mp_map_header_append(&mh_internal); + * msgpack_pack_str(&tmp_pck, 14); + * msgpack_pack_str_body(&tmp_pck, "group_metadata", 14); + * msgpack_pack_object(&tmp_pck, *log_event.group_metadata); + * } + */ + + /* Append group attributes */ + if (log_event.group_attributes != NULL) { + flb_mp_map_header_append(&mh_internal); + msgpack_pack_str(&tmp_pck, 16); + msgpack_pack_str_body(&tmp_pck, "group_attributes", 16); + msgpack_pack_object(&tmp_pck, *log_event.group_attributes); + } + + /* log/record metadata if exists */ + if (log_event.metadata != NULL) { + flb_mp_map_header_append(&mh_internal); + msgpack_pack_str(&tmp_pck, 12); + msgpack_pack_str_body(&tmp_pck, "log_metadata", 12); + msgpack_pack_object(&tmp_pck, *log_event.metadata); + } + + /* finalize the internal map */ + flb_mp_map_header_end(&mh_internal); + } + + /* Append keys/values from the log body */ + if (log_event.body != NULL) { + if (log_event.body->type == MSGPACK_OBJECT_MAP) { + for (i = 0; i < log_event.body->via.map.size; i++) { + flb_mp_map_header_append(&mh_map); + k = &log_event.body->via.map.ptr[i].key; + v = &log_event.body->via.map.ptr[i].val; + + /* Append key/value */ + msgpack_pack_object(&tmp_pck, *k); + msgpack_pack_object(&tmp_pck, *v); + } + + flb_mp_map_header_end(&mh_map); + } + else { + /* for any other data type, nest the content inside log */ + flb_mp_map_header_append(&mh_map); + msgpack_pack_str(&tmp_pck, 4); + msgpack_pack_str_body(&tmp_pck, "log", 3); + msgpack_pack_object(&tmp_pck, *log_event.body); + + flb_mp_map_header_end(&mh_map); + } } /* @@ -1071,7 +1128,7 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, if (!out_js) { flb_sds_destroy(out_buf); msgpack_sbuffer_destroy(&tmp_sbuf); - msgpack_unpacked_destroy(&result); + flb_log_event_decoder_destroy(&log_decoder); return NULL; } @@ -1084,7 +1141,7 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, flb_sds_destroy(out_js); flb_sds_destroy(out_buf); msgpack_sbuffer_destroy(&tmp_sbuf); - msgpack_unpacked_destroy(&result); + flb_log_event_decoder_destroy(&log_decoder); return NULL; } @@ -1102,7 +1159,7 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, if (!out_tmp) { flb_sds_destroy(out_buf); msgpack_sbuffer_destroy(&tmp_sbuf); - msgpack_unpacked_destroy(&result); + flb_log_event_decoder_destroy(&log_decoder); return NULL; } if (out_tmp != out_buf) { @@ -1113,14 +1170,18 @@ flb_sds_t flb_pack_msgpack_to_json_format(const char *data, uint64_t bytes, } } - /* Release the unpacker */ - msgpack_unpacked_destroy(&result); + /* destroy the decoder */ + flb_log_event_decoder_destroy(&log_decoder); + + /* finalize the main array */ + if (json_format == FLB_PACK_JSON_FORMAT_JSON) { + flb_mp_array_header_end(&mh_array); + } /* Format to JSON */ if (json_format == FLB_PACK_JSON_FORMAT_JSON) { out_buf = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size); msgpack_sbuffer_destroy(&tmp_sbuf); - if (!out_buf) { return NULL; } diff --git a/src/flb_processor.c b/src/flb_processor.c index 1bd2a10541b..af9b10dca8e 100644 --- a/src/flb_processor.c +++ b/src/flb_processor.c @@ -274,7 +274,7 @@ struct flb_processor_unit *flb_processor_unit_create(struct flb_processor *proc, unit_name, NULL); if (processor_instance == NULL) { - flb_error("[processor] error creating native processor instance %s", pu->name); + flb_error("[processor] error creating processor '%s': plugin doesn't exist or failed to initialize", 
unit_name); pthread_mutex_destroy(&pu->lock); flb_sds_destroy(pu->name); @@ -693,6 +693,7 @@ int flb_processor_init(struct flb_processor *proc) ret = flb_processor_unit_init(pu); if (ret == -1) { + flb_error("[processor] initialization of processor unit '%s' failed", pu->name); return -1; } count++; @@ -703,6 +704,7 @@ int flb_processor_init(struct flb_processor *proc) ret = flb_processor_unit_init(pu); if (ret == -1) { + flb_error("[processor] initialization of processor unit '%s' failed", pu->name); return -1; } count++; @@ -713,6 +715,7 @@ int flb_processor_init(struct flb_processor *proc) ret = flb_processor_unit_init(pu); if (ret == -1) { + flb_error("[processor] initialization of processor unit '%s' failed", pu->name); return -1; } count++; @@ -723,6 +726,7 @@ int flb_processor_init(struct flb_processor *proc) ret = flb_processor_unit_init(pu); if (ret == -1) { + flb_error("[processor] initialization of processor unit '%s' failed", pu->name); return -1; } count++; @@ -1199,7 +1203,7 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s tmp = cfl_kvlist_fetch(kvlist, "name"); if (!tmp) { - flb_error("processor configuration don't have a 'name' defined"); + flb_error("[processor] configuration missing required 'name' field"); return -1; } @@ -1208,7 +1212,6 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s pu = flb_processor_unit_create(proc, type, name); if (!pu) { - flb_error("cannot create '%s' processor unit", name); return -1; } @@ -1217,7 +1220,7 @@ static int load_from_config_format_group(struct flb_processor *proc, int type, s if (tmp) { ret = flb_processor_unit_set_property(pu, "condition", tmp); if (ret == -1) { - flb_error("failed to set condition for processor '%s'", name); + flb_error("[processor] failed to set condition for processor '%s'", name); return -1; } } diff --git a/src/fluent-bit.c b/src/fluent-bit.c index a97ebb13857..0e1b8f8840b 100644 --- a/src/fluent-bit.c +++ b/src/fluent-bit.c @@ -1067,6 +1067,9 @@ int flb_main(int argc, char **argv) { "http_port", required_argument, NULL, 'P' }, #endif { "enable-hot-reload", no_argument, NULL, 'Y' }, +#ifdef FLB_SYSTEM_WINDOWS + { "windows_maxstdio", required_argument, NULL, 'M' }, +#endif #ifdef FLB_HAVE_CHUNK_TRACE { "enable-chunk-trace", no_argument, NULL, 'Z' }, { "trace", required_argument, NULL, FLB_LONG_TRACE }, @@ -1104,7 +1107,7 @@ int flb_main(int argc, char **argv) /* Parse the command line options */ while ((opt = getopt_long(argc, argv, - "b:c:dDf:C:i:m:o:R:r:F:p:e:" + "b:c:dDf:C:i:m:M:o:R:r:F:p:e:" "t:T:l:vw:qVhJL:HP:s:SWYZ", long_opts, NULL)) != -1) { @@ -1159,6 +1162,12 @@ int flb_main(int argc, char **argv) flb_cf_section_property_add(cf_opts, s->properties, "match", 0, optarg, 0); } break; +#ifdef FLB_SYSTEM_WINDOWS + case 'M': + flb_cf_section_property_add(cf_opts, service->properties, + "windows.maxstdio", 0, optarg, 0); + break; +#endif case 'o': s = flb_cf_section_create(cf_opts, "output", 0); if (!s) { @@ -1391,6 +1400,18 @@ int flb_main(int argc, char **argv) #endif #ifdef FLB_SYSTEM_WINDOWS + /* Validate specified maxstdio */ + if (config->win_maxstdio >= 512 && config->win_maxstdio <= 2048) { + _setmaxstdio(config->win_maxstdio); + } + else { + fprintf(stderr, + "windows.maxstdio is invalid. 
From 512 to 2048 is valid but got %d\n", + config->win_maxstdio); + flb_free(cfg_file); + flb_cf_destroy(cf_opts); + exit(EXIT_FAILURE); + } win32_started(); #endif diff --git a/tests/internal/data/reload/yaml/processor.yaml b/tests/internal/data/reload/yaml/processor.yaml index 3021ee7e36b..e3e484bbf82 100644 --- a/tests/internal/data/reload/yaml/processor.yaml +++ b/tests/internal/data/reload/yaml/processor.yaml @@ -16,26 +16,11 @@ pipeline: - name: modify add: hostname monox - - name: lua - call: append_tag - code: | - function append_tag(tag, timestamp, record) - new_record = record - new_record["tag"] = tag - return 1, timestamp, new_record - end - outputs: - name: stdout match: '*' processors: logs: - - name: lua - call: add_field - code: | - function add_field(tag, timestamp, record) - new_record = record - new_record["output"] = "new data" - return 1, timestamp, new_record - end + - name: modify + add: hostname monox diff --git a/tests/internal/fuzzers/CMakeLists.txt b/tests/internal/fuzzers/CMakeLists.txt index aae2b19ba85..dfa6e4ff7fe 100644 --- a/tests/internal/fuzzers/CMakeLists.txt +++ b/tests/internal/fuzzers/CMakeLists.txt @@ -4,6 +4,7 @@ set(UNIT_TESTS_FILES base64_fuzzer.c engine_fuzzer.c cmetrics_decode_fuzz.c + cmetrics_decode_prometheus.c config_fuzzer.c config_random_fuzzer.c ctrace_fuzzer.c diff --git a/tests/internal/fuzzers/cmetrics_decode_fuzz.c b/tests/internal/fuzzers/cmetrics_decode_fuzz.c index 707f0e6ee4b..913fe9c5614 100644 --- a/tests/internal/fuzzers/cmetrics_decode_fuzz.c +++ b/tests/internal/fuzzers/cmetrics_decode_fuzz.c @@ -19,7 +19,6 @@ #include #include -#include int @@ -36,7 +35,7 @@ LLVMFuzzerTestOneInput(const uint8_t * data, size_t size) return 0; } - decider = data[0] % 3; + decider = data[0] % 2; /* Adjust data pointer since the first byte is used */ data += 1; @@ -50,21 +49,12 @@ LLVMFuzzerTestOneInput(const uint8_t * data, size_t size) cmt_decode_opentelemetry_destroy (&decoded_contexts); } } - else if (decider == 1) { + else { result = cmt_decode_msgpack_create(&cmt, (char *) data, size, &off); if (result == 0) { cmt_destroy(cmt); } } - else if (decider == 2) { - if (size == 0) { - return 0; - } - struct cmt_decode_prometheus_parse_opts opts; - result = cmt_decode_prometheus_create(&cmt, data, size, &opts); - if (result == CMT_DECODE_PROMETHEUS_SUCCESS) { - cmt_decode_prometheus_destroy(cmt); - } - } + return 0; } diff --git a/tests/internal/fuzzers/cmetrics_decode_prometheus.c b/tests/internal/fuzzers/cmetrics_decode_prometheus.c new file mode 100644 index 00000000000..975478435ec --- /dev/null +++ b/tests/internal/fuzzers/cmetrics_decode_prometheus.c @@ -0,0 +1,41 @@ +/* -*- Mode: C; tab-width: 4; indent-tabs-mode: nil; c-basic-offset: 4 -*- */ + +/* Fluent Bit + * ========== + * Copyright (C) 2015-2023 The Fluent Bit Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include + + +int +LLVMFuzzerTestOneInput(const uint8_t * data, size_t size) +{ + struct cmt *cmt = NULL; + int result; + + /* At least one byte is needed for deciding which decoder to use */ + if (size < 1) { + return 0; + } + + struct cmt_decode_prometheus_parse_opts opts; + result = cmt_decode_prometheus_create(&cmt, data, size, &opts); + if (result == CMT_DECODE_PROMETHEUS_SUCCESS) { + cmt_decode_prometheus_destroy(cmt); + } + + return 0; +} diff --git a/tests/runtime_shell/CMakeLists.txt b/tests/runtime_shell/CMakeLists.txt index 50678928238..30ae87a6b96 100644 --- a/tests/runtime_shell/CMakeLists.txt +++ b/tests/runtime_shell/CMakeLists.txt @@ -14,6 +14,7 @@ set(UNIT_TESTS_SH in_syslog_uds_dgram_plaintext_expect.sh in_syslog_uds_stream_plaintext_expect.sh processor_conditional.sh + processor_invalid.sh ) # Prepare list of unit tests diff --git a/tests/runtime_shell/processor_invalid.sh b/tests/runtime_shell/processor_invalid.sh new file mode 100755 index 00000000000..59a72c7ef32 --- /dev/null +++ b/tests/runtime_shell/processor_invalid.sh @@ -0,0 +1,67 @@ +#!/bin/sh + +# Setup environment if not already set +if [ -z "$FLB_BIN" ]; then + FLB_ROOT=${FLB_ROOT:-$(cd $(dirname $0)/../.. && pwd)} + FLB_BIN=${FLB_BIN:-$FLB_ROOT/build/bin/fluent-bit} +fi + +echo "Using Fluent Bit at: $FLB_BIN" + +# Create a temporary YAML config file +cat > /tmp/processor_invalid.yaml << EOL +service: + log_level: debug + flush: 1 +pipeline: + inputs: + - name: dummy + dummy: '{"message": "test message"}' + tag: test + processors: + logs: + - name: non_existent_processor + action: invalid + + outputs: + - name: stdout + match: '*' +EOL + +echo "Running Fluent Bit with invalid processor YAML config..." +echo "YAML Config:" +cat /tmp/processor_invalid.yaml + +# Redirect stdout and stderr to a file for analysis +OUTPUT_FILE="/tmp/processor_invalid_output.txt" +$FLB_BIN -c /tmp/processor_invalid.yaml -o stdout > $OUTPUT_FILE 2>&1 + +# Check exit code - we expect it to fail +EXIT_CODE=$? +echo "Fluent Bit exited with code: $EXIT_CODE" + +# Show the output +echo "Output file content:" +cat $OUTPUT_FILE + +# Check if the output contains an error related to invalid processor +INVALID_PROCESSOR=$(grep -c "error creating processor 'non_existent_processor': plugin doesn't exist or failed to initialize" $OUTPUT_FILE || true) +FAILED_INIT=$(grep -c "error initializing processor" $OUTPUT_FILE || true) + +# Clean up +echo "Cleaning up..." +rm -f /tmp/processor_invalid.yaml +rm -f $OUTPUT_FILE + +# Check results - we expect Fluent Bit to fail (non-zero exit code) +# and have an error message about the invalid processor +if [ "$EXIT_CODE" -ne 0 ] && ([ "$INVALID_PROCESSOR" -gt 0 ] || [ "$FAILED_INIT" -gt 0 ]); then + echo "Test passed: Fluent Bit failed with error about invalid processor" + exit 0 +else + echo "Test failed: Fluent Bit should fail when an invalid processor is configured" + echo "Exit code: $EXIT_CODE (expected non-zero)" + echo "Invalid processor message count: $INVALID_PROCESSOR (expected > 0)" + echo "Failed init message count: $FAILED_INIT (expected > 0)" + exit 1 +fi \ No newline at end of file