Seamless integration into your data pipeline to secure sensitive data fields.
SecuredFlow enhances data security with efficient field-level encryption, supporting both AES (symmetric key) and RSA (asymmetric key) encryption. It protects your critical information as it moves through Apache Kafka® and Confluent Platform.
SecuredFlow is the perfect solution for organizations aiming to safeguard their data in the digital era.
- Configurable Field Selection: Encrypt/Decrypt specific fields based on configuration.
- AES (Symmetric Key):
  - Mode: GCM
  - Padding: NoPadding
  - Initialization Vector (IV): 96-bit (12-byte)
  - Authentication tag length: 128-bit
  - Provider: SunJCE
  - Accepts keys in:
    - Base64-encoded key.
    - Java KeyStore File (JKS) with both KeyStore and Key Password.
- RSA (Asymmetric Key):
  - Mode: ECB
  - Padding: OAEPWithSHA-256AndMGF1Padding
  - Provider: SunJCE
  - Accepts keys in:
    - Privacy Enhanced Mail File (PEM).
    - Java KeyStore File (JKS) with both KeyStore and Key Password.
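The AES parameters listed above map directly onto the standard JCA API. The following is a minimal sketch of a field round trip with those parameters, not SecuredFlow's actual implementation: the class and method names are hypothetical, and prepending the IV to the ciphertext before Base64 encoding is an assumption of this sketch.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper, for illustration only.
final class AesGcmFieldSketch {
    private static final String TRANSFORMATION = "AES/GCM/NoPadding";
    private static final String PROVIDER = "SunJCE";
    private static final int IV_BYTES = 12;   // 96-bit IV
    private static final int TAG_BITS = 128;  // authentication tag length

    // Encrypts one field value and returns Base64(IV || ciphertext || tag).
    // Assumption: this layout belongs to the sketch, not to SecuredFlow.
    static String encrypt(String base64Key, String plaintext) {
        try {
            SecretKeySpec key = new SecretKeySpec(Base64.getDecoder().decode(base64Key), "AES");
            byte[] iv = new byte[IV_BYTES];
            new SecureRandom().nextBytes(iv); // fresh IV per value; never reuse one with the same key
            Cipher cipher = Cipher.getInstance(TRANSFORMATION, PROVIDER);
            cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
            byte[] sealed = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
            byte[] out = new byte[IV_BYTES + sealed.length];
            System.arraycopy(iv, 0, out, 0, IV_BYTES);
            System.arraycopy(sealed, 0, out, IV_BYTES, sealed.length);
            return Base64.getEncoder().encodeToString(out);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("AES-GCM encryption failed", e);
        }
    }

    // Reverses encrypt(): reads the IV from the first 12 bytes, then verifies the tag and decrypts.
    static String decrypt(String base64Key, String encoded) {
        try {
            SecretKeySpec key = new SecretKeySpec(Base64.getDecoder().decode(base64Key), "AES");
            byte[] in = Base64.getDecoder().decode(encoded);
            Cipher cipher = Cipher.getInstance(TRANSFORMATION, PROVIDER);
            cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, in, 0, IV_BYTES));
            byte[] plain = cipher.doFinal(in, IV_BYTES, in.length - IV_BYTES);
            return new String(plain, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("AES-GCM decryption failed", e);
        }
    }

    public static void main(String[] args) {
        // Sample Base64 key taken from this README's configuration examples.
        String key = "eKk6hlEH2jo0MrTYQ7IzETPv7eOPLOcrRNsAHgvFb3o=";
        String sealed = encrypt(key, "MFEC");
        System.out.println(sealed);               // differs on every run because the IV is random
        System.out.println(decrypt(key, sealed));
    }
}
```

Because the IV is random, encrypting the same value twice yields different ciphertexts, which is the expected behaviour for GCM.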
- Apache Kafka® 3.3+ with Java version 17.
- Confluent Platform : 7.3+ with Java version 17.
- Additional details about the compatibility of Apache Kafka® and the Confluent Platform.
Name | Default | Acceptable values | Required | Description |
---|---|---|---|---|
fields | - | List of fields | ✅ | List of fields to be encrypted or decrypted. |
service | - | local-encryption | ✅ | Service to use for encryption. local-encryption: use a self-generated key for encryption. |
mode | - | encrypt, decrypt | ✅ | Whether SecuredFlow encrypts or decrypts the configured fields. |
message.encrypted.format | byte | byte, string | ✅ | Message format after encryption. |
message.encrypt.error | fail | fail, log, ignore | ✅ | How SecuredFlow behaves when an error occurs during encryption. fail: force the Connector or Application using SecuredFlow to go down. log: only log an ERROR message; the Connector or Application using SecuredFlow continues to run. ignore: ignore the error and continue running the Connector or Application using SecuredFlow as normal. |
local.encrypt.type | RSA | AES, RSA | ✅ | Algorithm SecuredFlow uses. AES: GCM mode, NoPadding, 96-bit (12-byte) IV, 128-bit authentication tag, SunJCE provider. RSA: ECB mode, OAEPWithSHA-256AndMGF1Padding, SunJCE provider. Note: for AES, local.key or local.key.location (with .JKS) must also be provided; for RSA, local.key.location (with .JKS or .PEM) must also be provided. |
local.key | - | Base64-encoded key | - | Base64-encoded key for AES encryption. |
local.key.location | - | .PEM, .JKS | - | Key file path for encryption. .PEM: read with java.security.KeyFactory using the RSA algorithm and the SunRsaSign provider; java.security.spec.X509EncodedKeySpec reads the PublicKey and java.security.spec.PKCS8EncodedKeySpec reads the PrivateKey. .JKS: read with java.security.KeyStore using the JKS type and the SUN provider. Note: for AES encryption, if local.key is already specified, local.key.location is ignored; for a .JKS file, local.keystore.password, local.key.password and local.key.alias must also be provided. |
local.keystore.password | - | Keystore password | - | Keystore password for JKS file. |
local.key.password | - | Key password | - | Key password for JKS file. |
local.key.alias | - | Key alias | - | Key alias for key in JKS file. |
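The `local.key.location` row names the JDK classes used to read PEM keys. As a rough illustration of that path, the sketch below generates a throwaway RSA key pair, PEM-encodes it, reads it back with `KeyFactory`, `X509EncodedKeySpec`, and `PKCS8EncodedKeySpec`, and runs an OAEP round trip. All class and method names are hypothetical, and the PEM labels (`PUBLIC KEY`, `PRIVATE KEY`) are an assumption about the file layout rather than something this README specifies.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.KeyFactory;
import java.security.KeyPair;
import java.security.KeyPairGenerator;
import java.security.PrivateKey;
import java.security.PublicKey;
import java.security.spec.PKCS8EncodedKeySpec;
import java.security.spec.X509EncodedKeySpec;
import java.util.Base64;
import javax.crypto.Cipher;

// Hypothetical helper, for illustration only.
final class RsaPemSketch {
    private static final String TRANSFORMATION = "RSA/ECB/OAEPWithSHA-256AndMGF1Padding";

    static KeyPair newKeyPair() {
        try {
            KeyPairGenerator generator = KeyPairGenerator.getInstance("RSA");
            generator.initialize(2048);
            return generator.generateKeyPair();
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Wraps DER bytes in PEM armor with 64-character lines.
    static String toPem(String label, byte[] der) {
        String body = Base64.getMimeEncoder(64, "\n".getBytes(StandardCharsets.US_ASCII))
                .encodeToString(der);
        return "-----BEGIN " + label + "-----\n" + body + "\n-----END " + label + "-----\n";
    }

    // Strips the PEM armor and whitespace, then decodes the Base64 body to DER bytes.
    static byte[] fromPem(String pem) {
        String body = pem.replaceAll("-----(BEGIN|END) [A-Z ]+-----", "").replaceAll("\\s", "");
        return Base64.getDecoder().decode(body);
    }

    static PublicKey readPublicKey(String pem) {
        try {
            return KeyFactory.getInstance("RSA", "SunRsaSign")
                    .generatePublic(new X509EncodedKeySpec(fromPem(pem)));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    static PrivateKey readPrivateKey(String pem) {
        try {
            return KeyFactory.getInstance("RSA", "SunRsaSign")
                    .generatePrivate(new PKCS8EncodedKeySpec(fromPem(pem)));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    static byte[] encrypt(PublicKey key, String plaintext) {
        try {
            Cipher cipher = Cipher.getInstance(TRANSFORMATION, "SunJCE");
            cipher.init(Cipher.ENCRYPT_MODE, key);
            return cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    static String decrypt(PrivateKey key, byte[] ciphertext) {
        try {
            Cipher cipher = Cipher.getInstance(TRANSFORMATION, "SunJCE");
            cipher.init(Cipher.DECRYPT_MODE, key);
            return new String(cipher.doFinal(ciphertext), StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        KeyPair pair = newKeyPair();
        String publicPem = toPem("PUBLIC KEY", pair.getPublic().getEncoded());
        String privatePem = toPem("PRIVATE KEY", pair.getPrivate().getEncoded());
        byte[] sealed = encrypt(readPublicKey(publicPem), "DataWise");
        System.out.println(decrypt(readPrivateKey(privatePem), sealed));
    }
}
```

Encryption needs only the public key and decryption needs the private key, so in this sketch the two PEM strings play the roles that a single key file plays in the configuration above.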
- Only ConnectRecords (records from a Connector) are supported:
  - With AvroConverter (io.confluent.connect.avro.AvroConverter)
  - With ProtobufConverter (io.confluent.connect.protobuf.ProtobufConverter)
  - With JsonSchemaConverter (io.confluent.connect.json.JsonSchemaConverter)
- Fields to be encrypted must be:
  - Part of the ConnectRecords as mentioned above.
  - Of schema STRING_SCHEMA or OPTIONAL_STRING_SCHEMA.
This is an example of using SecuredFlow to encrypt and decrypt data while it flows through Apache Kafka® and Confluent Platform.
- Encrypt with AES, using a Base64-encoded key

      "transforms": "Encrypt",
      "transforms.Encrypt.type": "org.mfec.kafka.connect.smt.FieldEncrypt$Value",
      "transforms.Encrypt.fields": "name, nickname",
      "transforms.Encrypt.service": "local-encryption",
      "transforms.Encrypt.mode": "encrypt",
      "transforms.Encrypt.message.encrypted.format": "string",
      "transforms.Encrypt.message.encrypt.error": "fail",
      "transforms.Encrypt.local.encrypt.type": "AES",
      "transforms.Encrypt.local.key": "eKk6hlEH2jo0MrTYQ7IzETPv7eOPLOcrRNsAHgvFb3o="

  Input

      { "name" : "MFEC", "lastname" : "SecuredFlow", "nickname" : "DataWise", "age" : 25 }

  Output

      { "name" : "cF8o3P40NcXXzSMPyO/tGw==", "lastname" : "SecuredFlow", "nickname" : "9z52wNc/Dahb6MptDizetg==", "age" : 25 }
- Encrypt with AES, using a key from a JKS file

      "transforms": "Encrypt",
      "transforms.Encrypt.type": "org.mfec.kafka.connect.smt.FieldEncrypt$Value",
      "transforms.Encrypt.fields": "name, nickname",
      "transforms.Encrypt.service": "local-encryption",
      "transforms.Encrypt.mode": "encrypt",
      "transforms.Encrypt.message.encrypted.format": "string",
      "transforms.Encrypt.message.encrypt.error": "fail",
      "transforms.Encrypt.local.encrypt.type": "AES",
      "transforms.Encrypt.local.key.location": "/MFEC/DataWise/SecuredFlow/secretKey.jks",
      "transforms.Encrypt.local.keystore.password": "P@ssw0rd",
      "transforms.Encrypt.local.key.password": "P@ssw0rd",
      "transforms.Encrypt.local.key.alias": "keyAlias"

  Input

      { "name" : "MFEC", "lastname" : "SecuredFlow", "nickname" : "DataWise", "age" : 25 }

  Output

      { "name" : "E3sWHNFYEZRns+WXZFQa4A==", "lastname" : "SecuredFlow", "nickname" : "GQanhezkXQ3kSMh0dunEKw==", "age" : 25 }
- Encrypt with RSA, using a key from a PEM file

      "transforms": "Encrypt",
      "transforms.Encrypt.type": "org.mfec.kafka.connect.smt.FieldEncrypt$Value",
      "transforms.Encrypt.fields": "name, nickname",
      "transforms.Encrypt.service": "local-encryption",
      "transforms.Encrypt.mode": "encrypt",
      "transforms.Encrypt.message.encrypted.format": "byte",
      "transforms.Encrypt.message.encrypt.error": "fail",
      "transforms.Encrypt.local.encrypt.type": "RSA",
      "transforms.Encrypt.local.key.location": "/MFEC/DataWise/SecuredFlow/key.pem"

  Input

      { "name" : "MFEC", "lastname" : "SecuredFlow", "nickname" : "DataWise", "age" : 25 }

  Output

      { "name" : "[B@dbf57b3", "lastname" : "SecuredFlow", "nickname" : "[B@6973b51b", "age" : 25 }
- Encrypt with RSA, using a key from a JKS file

      "transforms": "Encrypt",
      "transforms.Encrypt.type": "org.mfec.kafka.connect.smt.FieldEncrypt$Value",
      "transforms.Encrypt.fields": "name, nickname",
      "transforms.Encrypt.service": "local-encryption",
      "transforms.Encrypt.mode": "encrypt",
      "transforms.Encrypt.message.encrypted.format": "byte",
      "transforms.Encrypt.message.encrypt.error": "fail",
      "transforms.Encrypt.local.encrypt.type": "RSA",
      "transforms.Encrypt.local.key.location": "/MFEC/DataWise/SecuredFlow/key.jks",
      "transforms.Encrypt.local.keystore.password": "P@ssw0rd",
      "transforms.Encrypt.local.key.password": "P@ssw0rd",
      "transforms.Encrypt.local.key.alias": "keyAlias"

  Input

      { "name" : "MFEC", "lastname" : "SecuredFlow", "nickname" : "DataWise", "age" : 25 }

  Output

      { "name" : "[B@689604d9", "lastname" : "SecuredFlow", "nickname" : "[B@409bf450", "age" : 25 }
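The AES examples above pass `local.key` as a Base64 string, and the sample value decodes to 32 bytes, that is, a 256-bit key. A comparable key could be produced with the JDK's `KeyGenerator`, as in this minimal sketch. The class and method names are hypothetical, and whether SecuredFlow accepts other AES key sizes is not stated in this README.

```java
import java.security.NoSuchAlgorithmException;
import java.util.Base64;
import javax.crypto.KeyGenerator;

// Hypothetical helper, for illustration only.
final class AesKeyGenSketch {
    // Generates a random AES key of the given size and returns it Base64-encoded.
    static String newBase64Key(int bits) {
        try {
            KeyGenerator generator = KeyGenerator.getInstance("AES");
            generator.init(bits);
            return Base64.getEncoder().encodeToString(generator.generateKey().getEncoded());
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("AES is not available on this JVM", e);
        }
    }

    public static void main(String[] args) {
        // Prints a fresh 256-bit key; the value differs on every run.
        System.out.println(newBase64Key(256));
    }
}
```

A generated key should be stored in a secret store rather than committed alongside the connector configuration.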
- Decryption must use the same key as was used during encryption.
- The message.encrypted.format and local.encrypt.type configuration during decryption must match the configuration used during encryption.
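The same-key requirement follows from how AES-GCM works: the authentication tag is checked during decryption, so a different key is rejected rather than producing garbage output. The sketch below illustrates this with plain JCA calls. The class and method names are hypothetical, and it does not show how SecuredFlow itself reports such a failure.

```java
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.AEADBadTagException;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

// Hypothetical helper, for illustration only.
final class GcmKeyMismatchSketch {
    private static Cipher cipher(int mode, byte[] key, byte[] iv) throws GeneralSecurityException {
        Cipher c = Cipher.getInstance("AES/GCM/NoPadding");
        c.init(mode, new SecretKeySpec(key, "AES"), new GCMParameterSpec(128, iv));
        return c;
    }

    static byte[] encrypt(byte[] key, byte[] iv, byte[] plaintext) {
        try {
            return cipher(Cipher.ENCRYPT_MODE, key, iv).doFinal(plaintext);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    // Returns true if the ciphertext authenticates under the given key, false if the tag check fails.
    static boolean canDecrypt(byte[] key, byte[] iv, byte[] ciphertext) {
        try {
            cipher(Cipher.DECRYPT_MODE, key, iv).doFinal(ciphertext);
            return true;
        } catch (AEADBadTagException e) {
            return false; // wrong key (or tampered data): the GCM tag does not verify
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        SecureRandom random = new SecureRandom();
        byte[] keyA = new byte[32];
        byte[] keyB = new byte[32];
        byte[] iv = new byte[12];
        random.nextBytes(keyA);
        random.nextBytes(keyB);
        random.nextBytes(iv);
        byte[] sealed = encrypt(keyA, iv, "MFEC".getBytes(StandardCharsets.UTF_8));
        System.out.println("same key:      " + canDecrypt(keyA, iv, sealed));
        System.out.println("different key: " + canDecrypt(keyB, iv, sealed));
    }
}
```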
- Decrypt with AES, using a Base64-encoded key

      "transforms": "Decrypt",
      "transforms.Decrypt.type": "org.mfec.kafka.connect.smt.FieldEncrypt$Value",
      "transforms.Decrypt.fields": "name, nickname",
      "transforms.Decrypt.service": "local-encryption",
      "transforms.Decrypt.mode": "decrypt",
      "transforms.Decrypt.message.encrypted.format": "string",
      "transforms.Decrypt.message.encrypt.error": "fail",
      "transforms.Decrypt.local.encrypt.type": "AES",
      "transforms.Decrypt.local.key": "eKk6hlEH2jo0MrTYQ7IzETPv7eOPLOcrRNsAHgvFb3o="

  Input

      { "name" : "cF8o3P40NcXXzSMPyO/tGw==", "lastname" : "SecuredFlow", "nickname" : "9z52wNc/Dahb6MptDizetg==", "age" : 25 }

  Output

      { "name" : "MFEC", "lastname" : "SecuredFlow", "nickname" : "DataWise", "age" : 25 }
- Decrypt with AES, using a key from a JKS file

      "transforms": "Decrypt",
      "transforms.Decrypt.type": "org.mfec.kafka.connect.smt.FieldEncrypt$Value",
      "transforms.Decrypt.fields": "name, nickname",
      "transforms.Decrypt.service": "local-encryption",
      "transforms.Decrypt.mode": "decrypt",
      "transforms.Decrypt.message.encrypted.format": "string",
      "transforms.Decrypt.message.encrypt.error": "fail",
      "transforms.Decrypt.local.encrypt.type": "AES",
      "transforms.Decrypt.local.key.location": "/MFEC/DataWise/SecuredFlow/secretKey.jks",
      "transforms.Decrypt.local.keystore.password": "P@ssw0rd",
      "transforms.Decrypt.local.key.password": "P@ssw0rd",
      "transforms.Decrypt.local.key.alias": "keyAlias"

  Input

      { "name" : "E3sWHNFYEZRns+WXZFQa4A==", "lastname" : "SecuredFlow", "nickname" : "GQanhezkXQ3kSMh0dunEKw==", "age" : 25 }

  Output

      { "name" : "MFEC", "lastname" : "SecuredFlow", "nickname" : "DataWise", "age" : 25 }
- Decrypt with RSA, using a key from a PEM file

      "transforms": "Decrypt",
      "transforms.Decrypt.type": "org.mfec.kafka.connect.smt.FieldEncrypt$Value",
      "transforms.Decrypt.fields": "name, nickname",
      "transforms.Decrypt.service": "local-encryption",
      "transforms.Decrypt.mode": "decrypt",
      "transforms.Decrypt.message.encrypted.format": "byte",
      "transforms.Decrypt.message.encrypt.error": "fail",
      "transforms.Decrypt.local.encrypt.type": "RSA",
      "transforms.Decrypt.local.key.location": "/MFEC/DataWise/SecuredFlow/key.pem"

  Input

      { "name" : "[B@dbf57b3", "lastname" : "SecuredFlow", "nickname" : "[B@6973b51b", "age" : 25 }

  Output

      { "name" : "MFEC", "lastname" : "SecuredFlow", "nickname" : "DataWise", "age" : 25 }
- Decrypt with RSA, using a key from a JKS file

      "transforms": "Decrypt",
      "transforms.Decrypt.type": "org.mfec.kafka.connect.smt.FieldEncrypt$Value",
      "transforms.Decrypt.fields": "name, nickname",
      "transforms.Decrypt.service": "local-encryption",
      "transforms.Decrypt.mode": "decrypt",
      "transforms.Decrypt.message.encrypted.format": "byte",
      "transforms.Decrypt.message.encrypt.error": "fail",
      "transforms.Decrypt.local.encrypt.type": "RSA",
      "transforms.Decrypt.local.key.location": "/MFEC/DataWise/SecuredFlow/key.jks",
      "transforms.Decrypt.local.keystore.password": "P@ssw0rd",
      "transforms.Decrypt.local.key.password": "P@ssw0rd",
      "transforms.Decrypt.local.key.alias": "keyAlias"

  Input

      { "name" : "[B@689604d9", "lastname" : "SecuredFlow", "nickname" : "[B@409bf450", "age" : 25 }

  Output

      { "name" : "MFEC", "lastname" : "SecuredFlow", "nickname" : "DataWise", "age" : 25 }
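The `[B@...` values in the RSA examples have the shape of Java's default `toString()` for a byte array: the type tag `[B` followed by an identity hash code. That rendering cannot be turned back into the underlying bytes. The sketch below, with hypothetical names, contrasts it with a Base64 rendering of the same bytes. It makes no claim about how SecuredFlow serializes `byte` output internally.

```java
import java.util.Base64;

// Hypothetical helper, for illustration only.
final class ByteArrayDisplaySketch {
    // Default Object.toString() of a byte[]: "[B@" plus a hash code, unrelated to the content.
    static String defaultRendering(byte[] data) {
        return data.toString();
    }

    // Base64 rendering: derived from the content and reversible.
    static String base64Rendering(byte[] data) {
        return Base64.getEncoder().encodeToString(data);
    }

    public static void main(String[] args) {
        byte[] data = {1, 2, 3};
        System.out.println(defaultRendering(data)); // starts with "[B@"; the suffix varies per run
        System.out.println(base64Rendering(data));  // prints "AQID"
    }
}
```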
SecuredFlow can be applied to both the key and the value of ConnectRecords (records from a Connector):

- To apply encryption to the key of ConnectRecords, set the type configuration as follows:

      "transforms": "Encrypt",
      "transforms.Encrypt.type": "org.mfec.kafka.connect.smt.FieldEncrypt$Key"

- To apply encryption to the value of ConnectRecords, set the type configuration as follows:

      "transforms": "Encrypt",
      "transforms.Encrypt.type": "org.mfec.kafka.connect.smt.FieldEncrypt$Value"
There is no license restriction for usage.
If you require urgent support or need additional encryption features with high priority for your project, please feel free to contact us for further consultation. We are more than happy to assist you to the fullest extent.