HADOOP-19604. ABFS: BlockId generation based on blockCount along with full blob md5 computation change #7777
Conversation
💔 -1 overall
This message was automatically generated.
🎊 +1 overall
This message was automatically generated.
Production code review. Will review test code in separate iteration.
super(outputStream, offset);
this.blockId = generateBlockId(offset);
this.blockIndex = blockIndex;
String streamId = getOutputStream().getStreamID();
Nit: outputStream is passed as a parameter to this constructor. I think it's better to use that parameter directly; calling a getter here gives the impression that some other output stream is being used.
Besides, that getter is not used anywhere else.
taken
this.blockId = generateBlockId(offset);
this.blockIndex = blockIndex;
String streamId = getOutputStream().getStreamID();
UUID streamIdGuid = UUID.nameUUIDFromBytes(streamId.getBytes(StandardCharsets.UTF_8));
Nit: streamId and streamIdGuid are local variables used only for generating block IDs. They should be moved back inside that method, as they were previously.
Since this is common across all the blocks, we don't want the computation to be done repeatedly; hence it has been put in the constructor.
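The reasoning above rests on UUID.nameUUIDFromBytes being deterministic: the same stream ID always yields the same GUID, so computing it once per stream is safe. A minimal sketch (the class and method names here are illustrative, not the actual ABFS code):

```java
import java.nio.charset.StandardCharsets;
import java.util.UUID;

public class BlockIdSketch {
    // Derives a deterministic (name-based, version-3) UUID from a stream ID.
    // Because the result depends only on the input bytes, it can be computed
    // once in the constructor and reused for every block of the stream.
    public static UUID streamGuid(String streamId) {
        return UUID.nameUUIDFromBytes(streamId.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        UUID a = streamGuid("stream-1");
        UUID b = streamGuid("stream-1");
        // Same input, same UUID: caching in the constructor is safe.
        System.out.println(a.equals(b));
    }
}
```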
if (rawLength != 0) {
// Adjust to match expected decoded length
if (rawBlockId.length() < rawLength) {
rawBlockId = String.format("%-" + rawLength + "s", rawBlockId)
Nit: use constants everywhere there is a hardcoded string.
taken
* Helper method that generates blockId.
* @param position The offset needed to generate blockId.
* @return String representing the block ID generated.
* Generates a Base64-encoded block ID string based on the given position, stream ID, and desired raw length.
How did we arrive at this logic?
Is there some server side recommendation to follow this?
This logic is used across clients. They follow the pattern of UUID followed by index
In the comment, you have mentioned that the block ID is generated based on position, stream ID, and raw length. Where exactly are we using position in this method? Am I missing something here?
corrected it
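The "UUID followed by index" pattern discussed above, combined with the pad-or-truncate step from the snippet, can be sketched as follows. This is an illustrative reconstruction under the assumption that the raw ID is the stream GUID plus the block index, padded to a fixed raw length so every Base64-encoded block ID in the blob has the same length; it is not the exact ABFS implementation.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.UUID;

public class BlockIdFormat {
    // Builds a block ID from a stream-scoped UUID and the block index, then
    // pads (with '_') or truncates to a fixed raw length before Base64
    // encoding, so all block IDs of a blob share one encoded length.
    static String generateBlockId(UUID streamIdGuid, long blockIndex, int rawLength) {
        String raw = streamIdGuid.toString() + "-" + blockIndex;
        if (raw.length() < rawLength) {
            raw = String.format("%-" + rawLength + "s", raw).replace(' ', '_');
        } else if (raw.length() > rawLength) {
            raw = raw.substring(0, rawLength);
        }
        return Base64.getEncoder().encodeToString(raw.getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        UUID guid = UUID.nameUUIDFromBytes("stream-1".getBytes(StandardCharsets.UTF_8));
        String id0 = generateBlockId(guid, 0, 48);
        String id1 = generateBlockId(guid, 1, 48);
        // Equal lengths, distinct values: indices keep the IDs unique.
        System.out.println(id0.length() == id1.length() && !id0.equals(id1));
    }
}
```

Equal-length IDs matter because the Blob service requires all block IDs within a blob to have the same encoded size.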
@@ -982,6 +985,11 @@ public AbfsRestOperation appendBlock(final String path,
if (requestParameters.getLeaseId() != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, requestParameters.getLeaseId()));
}
if (isChecksumValidationEnabled()) {
if (requestParameters.getMd5() != null) {
This check on requestParameters and the adding of headers is repeated. Maybe we can retain the original method and update its implementation as needed.
That method wasn't used anywhere else as of now anyway.
taken
@@ -1097,8 +1105,10 @@ public AbfsRestOperation flush(byte[] buffer,
AbfsRestOperation op1 = getPathStatus(path, true, tracingContext,
contextEncryptionAdapter);
String metadataMd5 = op1.getResult().getResponseHeader(CONTENT_MD5);
if (!md5Hash.equals(metadataMd5)) {
throw ex;
if (blobMd5 != null) {
Nit: combine if statements using &&
taken
@@ -1914,7 +1924,9 @@ private List<AbfsHttpHeader> getMetadataHeadersList(final Hashtable<String, Stri
// AzureBlobFileSystem supports only ASCII Characters in property values.
if (isPureASCII(value)) {
try {
value = encodeMetadataAttribute(value);
if (!XML_TAG_HDI_PERMISSION.equalsIgnoreCase(entry.getKey())) {
Seems like we want to encode all other headers except XML_TAG_HDI_PERMISSION.
Can we add a comment around this explaining the reason?
taken
if (rawBlockId.length() < rawLength) {
rawBlockId = String.format("%-" + rawLength + "s", rawBlockId)
.replace(' ', '_');
} else if (rawBlockId.length() > rawLength) {
rawBlockId = rawBlockId.substring(0, rawLength);
Should we use a ternary operator here?
That would make readability a bit difficult.
byte[] digest = null;
String fullBlobMd5 = null;
try {
// Clone the MessageDigest to avoid resetting the original state
MessageDigest clonedMd5 = (MessageDigest) getAbfsOutputStream().getFullBlobContentMd5().clone();
digest = clonedMd5.digest();
} catch (CloneNotSupportedException e) {
LOG.warn("Failed to clone MessageDigest instance", e);
}
if (digest != null && digest.length != 0) {
fullBlobMd5 = Base64.getEncoder().encodeToString(digest);
}
Since this code is common in both DFS and Blob ingress handler class, maybe we can add it as a protected helper method in the abstract class AzureIngressHandler?
taken
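The suggested shared helper could look roughly like this. The key point in the snippet above is that `MessageDigest.digest()` resets the digest's state, so cloning first lets the running digest keep accumulating data; the class and method names below are illustrative stand-ins for the proposed helper in `AzureIngressHandler`.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Base64;

public class Md5Helper {
    // Takes a snapshot MD5 of everything hashed so far without disturbing
    // the running digest: clone, then call digest() on the clone only.
    static String snapshotMd5(MessageDigest runningMd5) {
        try {
            MessageDigest cloned = (MessageDigest) runningMd5.clone();
            return Base64.getEncoder().encodeToString(cloned.digest());
        } catch (CloneNotSupportedException e) {
            return null; // caller treats null as "no MD5 available"
        }
    }

    public static void main(String[] args) throws NoSuchAlgorithmException {
        MessageDigest md5 = MessageDigest.getInstance("MD5");
        md5.update("hello".getBytes());
        String first = snapshotMd5(md5);
        md5.update(" world".getBytes());
        // The original kept its state, so this snapshot covers "hello world".
        String second = snapshotMd5(md5);
        System.out.println(first != null && second != null && !first.equals(second));
    }
}
```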
@@ -134,7 +134,7 @@ protected long getBlockCount() {
*
* @param blockCount the count of blocks to set
*/
public void setBlockCount(final long blockCount) {
protected void setBlockCount(final long blockCount) {
why this change?
The access modifier was incorrect earlier; corrected it.
@@ -1132,7 +1130,7 @@ public void testFlushSuccessWithConnectionResetOnResponseValidMd5() throws Excep
Mockito.nullable(String.class),
Mockito.anyString(),
Mockito.nullable(ContextEncryptionAdapter.class),
Mockito.any(TracingContext.class)
Mockito.any(TracingContext.class), Mockito.anyString()
should this be Mockito.nullable(String.class) as the md5 here can be null?
taken
md = MessageDigest.getInstance(MD5);
} catch (NoSuchAlgorithmException e) {
// MD5 algorithm not available; md will remain null
// Log this in production code if needed
we can remove this line
taken
pos += appendWithOffsetHelper(os, client, path, data, fs, pos, ONE_MB);
pos += appendWithOffsetHelper(os, client, path, data, fs, pos, MB_2);
appendWithOffsetHelper(os, client, path, data, fs, pos, MB_4 - 1);
pos += appendWithOffsetHelper(os, client, path, data, fs, pos, 0, getMd5(data, 0, data.length));
nit: double spaces
taken
@@ -42,23 +48,40 @@ public class AbfsBlobBlock extends AbfsBlock {
* @param offset Used to generate blockId based on offset.
Add the newly added parameter to the method comment.
taken
@@ -77,6 +81,7 @@ public AppendRequestParameters(final long position,
* @param leaseId leaseId of the blob to be appended
* @param isExpectHeaderEnabled true if the expect header is enabled
* @param blobParams parameters specific to append operation on Blob Endpoint.
* @param md5 The Base64-encoded MD5 hash of the block for data integrity validation.
NIT: Format can be corrected - extra space before @param and after md5
taken
this.blockId = generateBlockId(offset);
this.blockIndex = blockIndex;
String streamId = outputStream.getStreamID();
UUID streamIdGuid = UUID.nameUUIDFromBytes(streamId.getBytes(StandardCharsets.UTF_8));
Can streamId be null? streamId.getBytes() can raise a NullPointerException. Better to handle it.
streamId can never be null, as it is set in the constructor of AbfsOutputStream itself: this.outputStreamId = createOutputStreamId();
@@ -982,6 +983,9 @@ public AbfsRestOperation appendBlock(final String path,
if (requestParameters.getLeaseId() != null) {
requestHeaders.add(new AbfsHttpHeader(X_MS_LEASE_ID, requestParameters.getLeaseId()));
}
if (isChecksumValidationEnabled()) {
addCheckSumHeaderForWrite(requestHeaders, requestParameters);
NIT: formatting required - there should be one tab of indentation at the start.
taken
@@ -1032,7 +1036,7 @@ public AbfsRestOperation flush(final String path,
final String cachedSasToken,
final String leaseId,
final ContextEncryptionAdapter contextEncryptionAdapter,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
final TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException {
Add the new argument to the @param comments. Please make this change wherever required.
taken
@@ -1060,7 +1064,7 @@ public AbfsRestOperation flush(byte[] buffer,
final String leaseId,
final String eTag,
ContextEncryptionAdapter contextEncryptionAdapter,
final TracingContext tracingContext) throws AzureBlobFileSystemException {
final TracingContext tracingContext, String blobMd5) throws AzureBlobFileSystemException {
Same as above.
taken
* @param tracingContext for tracing the server calls.
* @return executed rest operation containing response from server.
* @throws AzureBlobFileSystemException if rest operation fails.
* Flushes previously uploaded data to the specified path.
NIT - Format can be consistent across places.
Taken
Jira: https://issues.apache.org/jira/browse/HADOOP-19604
BlockId computation needs to be consistent across clients for PutBlock and PutBlockList, so blockCount is used instead of offset.
Block IDs were previously derived from the data offset, which could lead to inconsistency across different clients. The change now uses blockCount (i.e., the index of the block) to compute the block ID, ensuring deterministic and consistent ID generation for both PutBlock and PutBlockList operations across clients.
Restrict URL encoding of certain JSON metadata during setXAttr calls.
When setting extended attributes (xAttrs), the JSON metadata (hdi_permission) was previously URL-encoded, which could cause unnecessary escaping or compatibility issues. This change ensures that only the required metadata is encoded.
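The selective-encoding rule described above can be sketched as follows. This is an illustrative stand-in, not the actual `getMetadataHeadersList` code: the key name mirrors the PR's `XML_TAG_HDI_PERMISSION` constant, and the helper name is hypothetical.

```java
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

public class MetadataEncodeSketch {
    // URL-encodes every metadata value except the hdi_permission JSON,
    // which must be passed through unmodified so the service (and other
    // clients) can parse it as-is.
    static Map<String, String> encodeMetadata(Map<String, String> metadata) throws Exception {
        Map<String, String> out = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : metadata.entrySet()) {
            String value = e.getValue();
            if (!"hdi_permission".equalsIgnoreCase(e.getKey())) {
                value = URLEncoder.encode(value, StandardCharsets.UTF_8.name());
            }
            out.put(e.getKey(), value);
        }
        return out;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> in = new LinkedHashMap<>();
        in.put("hdi_permission", "{\"owner\":\"u\"}");
        in.put("other", "a b");
        Map<String, String> out = MetadataEncodeSketch.encodeMetadata(in);
        // hdi_permission stays raw JSON; the other value is URL-encoded.
        System.out.println(out.get("hdi_permission") + "|" + out.get("other"));
    }
}
```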
Maintain the MD5 hash of the whole blob to validate data integrity during flush.
During flush operations, the MD5 hash of the entire blob's data is computed and stored. This hash is later used to validate that the blob was correctly persisted, ensuring data integrity and helping detect corruption or transmission errors.
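The flow just described can be illustrated with a short sketch: one MessageDigest accumulates the bytes of every block as it is uploaded, and at flush its Base64 digest is compared against the MD5 of the committed blob (in the real code, the Content-MD5 response header). The class and method names are illustrative, not the actual ABFS code.

```java
import java.security.MessageDigest;
import java.util.Base64;

public class FlushMd5Sketch {
    // Incrementally hashes a sequence of blocks and returns the Base64 MD5
    // of the whole concatenated content, as kept by the output stream.
    static String base64Md5(byte[][] blocks) throws Exception {
        MessageDigest fullBlobMd5 = MessageDigest.getInstance("MD5");
        for (byte[] block : blocks) {
            fullBlobMd5.update(block); // updated block by block during writes
        }
        return Base64.getEncoder().encodeToString(fullBlobMd5.digest());
    }

    public static void main(String[] args) throws Exception {
        byte[][] blocks = { "block-0".getBytes(), "block-1".getBytes() };
        String computed = base64Md5(blocks);
        // Stand-in for the Content-MD5 of the committed blob: hashing the
        // concatenated bytes in one shot must give the same digest.
        String whole = base64Md5(new byte[][] { "block-0block-1".getBytes() });
        System.out.println(computed.equals(whole));
    }
}
```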