Conversation
|
|
||
| public class AesGcmOutputStream extends PositionOutputStream { | ||
| // AES-GCM parameters | ||
| public static final int GCM_NONCE_LENGTH = 12; // in bytes |
There was a problem hiding this comment.
looks like we should make these variables in a shared util class, instead of importing this from the output stream to the input stream
| gcmCipher = Cipher.getInstance("AES/GCM/NoPadding"); | ||
| } catch (GeneralSecurityException e) { | ||
| throw new IOException(e); | ||
| } |
There was a problem hiding this comment.
nit: newline after control statement
|
Did a comparison against my internal impl, mostly look correct to me. Will take another look after all the remaining methods are implemented. |
| import static org.apache.iceberg.encryption.AesGcmOutputStream.HEADER_SIZE_LENGTH; | ||
|
|
||
| public class AesGcmInputStream extends SeekableInputStream { | ||
| private SeekableInputStream sourceStream; |
There was a problem hiding this comment.
sure, will change in the next commit
| private int numberOfBlocks; | ||
| private int lastBlockSize; | ||
| private long plainStreamSize; | ||
| private byte[] fileAadPrefix; |
There was a problem hiding this comment.
It is not used anywhere. Do we need it?
There was a problem hiding this comment.
Oh, there is a TODO item referring it in the method read, never mind.
There was a problem hiding this comment.
Both plainStreamSize and fileAadPrefix could be final.
| private long plainStreamSize; | ||
| private byte[] fileAadPrefix; | ||
|
|
||
| AesGcmInputStream(SeekableInputStream sourceStream, long sourceLength, |
There was a problem hiding this comment.
Do we need it to be public since the class itself is public? Or we can have a builder.
There was a problem hiding this comment.
Always created by an AesGcmInputFile in the same package.
| private SeekableInputStream sourceStream; | ||
| private long netSourceFileSize; | ||
|
|
||
| private Cipher gcmCipher; |
| private PositionOutputStream targetStream; | ||
|
|
||
| private Cipher gcmCipher; | ||
| private SecureRandom random; | ||
| private SecretKey key; | ||
| private byte[] nonce; |
There was a problem hiding this comment.
These fields can be final.
flyrain
left a comment
There was a problem hiding this comment.
Thanks @ggershinsky for the patch. Can we add a unit test for these two classes?
|
@ggershinsky Is this PR still going on? |
|
same as in #2639 (comment) |
done |
|
|
||
| @Override | ||
| public long getLength() { | ||
| Preconditions.checkArgument(plaintextLength >= 0, "Length is known after new stream is created"); |
There was a problem hiding this comment.
Should we set the length if field plaintextLength is -1? Looks like this method is valid only after newStream() is called.
if(plaintextLength == -1) {
this.newStream();
}
There was a problem hiding this comment.
When do we close the stream in this case? Or do we just make and close it in that case?
There was a problem hiding this comment.
Both suggestions sound good to me.
| int toLoad = lastBlock ? lastBlockSize : cipherBlockSize; | ||
| int loaded = sourceStream.read(ciphertextBlockBuffer, 0, toLoad); | ||
| if (loaded != toLoad) { | ||
| throw new IOException("Read " + loaded + " instead of " + toLoad); |
There was a problem hiding this comment.
Minor suggestion: Would it be a bit easier to understand if message is "Should read toLoad data, but only get loaded data"?
| @Override | ||
| public void seek(long newPos) throws IOException { | ||
| if (newPos >= plainStreamSize) { | ||
| throw new IOException("At or beyond max stream size " + plainStreamSize + ", " + newPos); |
There was a problem hiding this comment.
The massage could be
"The seeking position " + newPos + " has reached or exceeded the max stream size " + plainStreamSize.
|
|
||
| @Override | ||
| public synchronized void reset() throws IOException { | ||
| throw new UnsupportedOperationException(); |
There was a problem hiding this comment.
Looks like we don't have to implement it, the abstract class InputStream has the same logic.
| try { | ||
| return new AesGcmOutputStream(targetFile.create(), dataKey, null); | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException(e); |
There was a problem hiding this comment.
Can we add message like, "Failed to create file: %s", targetFile.location()?
| try { | ||
| return new AesGcmOutputStream(targetFile.createOrOverwrite(), dataKey, null); | ||
| } catch (IOException e) { | ||
| throw new UncheckedIOException(e); |
There was a problem hiding this comment.
Can we add message like, "Failed to create or overwrite file: %s", targetFile.location()?
| private SecretKey key; | ||
| private byte[] nonce; | ||
|
|
||
| private int blockSize = 1024 * 1024; |
There was a problem hiding this comment.
Make it a final static field?
| private int positionInBuffer; | ||
| private long streamPosition; | ||
| private int currentBlockIndex; | ||
| private byte[] fileAadPrefix; |
| try { | ||
| int encrypted = gcmCipher.doFinal(plaintextBlockBuffer, 0, positionInBuffer, cipherText, GCM_NONCE_LENGTH); | ||
| Preconditions.checkArgument((encrypted == (positionInBuffer + GCM_TAG_LENGTH)), | ||
| "Wrong length of encrypted output: " + encrypted + " vs " + (positionInBuffer + GCM_TAG_LENGTH)); |
There was a problem hiding this comment.
How about a message like this?
"Wrong length of encrypted output: expected " + (positionInBuffer + GCM_TAG_LENGTH)) + " but " + encrypted;
| targetStream.write(cipherText); | ||
| } | ||
|
|
||
| static byte[] calculateAAD(byte[] fileAadPrefix, int currentBlockIndex) { |
There was a problem hiding this comment.
This can go to a common place as well since both classes need it. We may have a util class, e.g. GcmUtil
| try { | ||
| gcmCipher = Cipher.getInstance("AES/GCM/NoPadding"); | ||
| } catch (GeneralSecurityException e) { | ||
| throw new IOException(e); |
There was a problem hiding this comment.
Add message for easier debugging?
new IOException("Failed to get an instance of GCM cipher", e)
| Preconditions.checkArgument(fetched == AesGcmOutputStream.PREFIX_LENGTH, | ||
| "Insufficient read " + fetched); |
There was a problem hiding this comment.
They can be in the same line.
IIUC, this means the stream length is not long enough. Can we say something like "The stream length should be at least " + AesGcmOutputStream.PREFIX_LENGTH?
There was a problem hiding this comment.
Aren't we also in danger of the read being less than prefix bytes?
| private int currentBlockIndex; | ||
| private int currentOffsetInPlainBlock; | ||
| private int numberOfBlocks; | ||
| private int lastBlockSize; |
There was a problem hiding this comment.
Can we rename it to lastCipherBlockSize to make it more readable?
| private SecretKey key; | ||
| private byte[] nonce; |
| public TemporaryFolder temp = new TemporaryFolder(); | ||
|
|
||
| @Test | ||
| public void testRandomWriteRead() throws IOException { |
There was a problem hiding this comment.
I like this, could I ask for just 4 more test cases
Empty File (no bits)
File that aligns perfectly with encryption Chunk Size
File that is exactly one byte to larger than the aligned and one that is one byte smaller than the aligned file. (we probably hit this unaligned version with the testFileSize below but just to make sure)
| System.arraycopy(prefixBytes, 0, magic, 0, AesGcmOutputStream.MAGIC_ARRAY.length); | ||
|
|
||
| Preconditions.checkArgument(Arrays.equals(AesGcmOutputStream.MAGIC_ARRAY, magic), | ||
| "File with wrong magic string. Should start with " + AesGcmOutputStream.MAGIC_STRING); |
There was a problem hiding this comment.
Instead of "file with wrong magic string" -> "Cannot read unencrypted file, missing header containing: ..."?
| cipherBlockSize = plainBlockSize + AesGcmOutputStream.GCM_NONCE_LENGTH + AesGcmOutputStream.GCM_TAG_LENGTH; | ||
|
|
||
| try { | ||
| gcmCipher = Cipher.getInstance("AES/GCM/NoPadding"); |
There was a problem hiding this comment.
Yep. We actually already have a Ciphers class; let's re-use it (with some additions) for all GCM-related things
Refactor AesGcmInputStream.
Thanks! Merged the PR. |
| } | ||
|
|
||
| private int availableInCurrentBlock() { | ||
| if (currentPlainBlockIndex < 0) { |
There was a problem hiding this comment.
When I was refactoring, there was a bug that I fixed when the block for plainStreamPosition didn't match currentPlainBlockIndex. I ended up fixing the problem by setting currentPlainBlockIndex = -1 in a place that I had missed. Now that I'm thinking about it more, I think the right solution is actually to update the check here instead. That way the bytes available will only be non-zero if the current block matches the position.
Here's a diff that does what I'm talking about and still passes tests:
diff --git a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java
index a63134f31d..88e9b36c25 100644
--- a/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java
+++ b/core/src/main/java/org/apache/iceberg/encryption/AesGcmInputStream.java
@@ -97,7 +97,7 @@ public class AesGcmInputStream extends SeekableInputStream {
}
private int availableInCurrentBlock() {
- if (currentPlainBlockIndex < 0) {
+ if (blockIndex(plainStreamPosition) != currentPlainBlockIndex) {
return 0;
}
@@ -130,10 +130,6 @@ public class AesGcmInputStream extends SeekableInputStream {
remainingBytesToRead -= bytesToCopy;
resultBufferOffset += bytesToCopy;
this.plainStreamPosition += bytesToCopy;
- if (blockIndex(plainStreamPosition) != currentPlainBlockIndex) {
- // invalidate the current block
- this.currentPlainBlockIndex = -1;
- }
} else if (available() > 0) {
decryptBlock(blockIndex(plainStreamPosition));
@@ -157,10 +153,6 @@ public class AesGcmInputStream extends SeekableInputStream {
}
this.plainStreamPosition = newPos;
- if (blockIndex(plainStreamPosition) != currentPlainBlockIndex) {
- // invalidate the current block
- this.currentPlainBlockIndex = -1;
- }
}
@Override
@@ -177,10 +169,6 @@ public class AesGcmInputStream extends SeekableInputStream {
}
this.plainStreamPosition += n;
- if (blockIndex(plainStreamPosition) != currentPlainBlockIndex) {
- // invalidate the current block
- this.currentPlainBlockIndex = -1;
- }
return n;
}There was a problem hiding this comment.
SGTM. Applied this change.
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputFile.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputFile.java
Outdated
Show resolved
Hide resolved
| this.gcmEncryptor = new Ciphers.AesGcmEncryptor(aesKey); | ||
| this.plainBlockBuffer = new byte[Ciphers.PLAIN_BLOCK_SIZE]; | ||
| this.positionInBuffer = 0; | ||
| this.streamPosition = 0; |
There was a problem hiding this comment.
Shouldn't this always be targetStream.pos() + positionInBuffer?
There was a problem hiding this comment.
this value is returned by
@Override
public long getPos() throws IOException {
return streamPosition;
}so is not related directly to targetStream.pos() and positionInBuffer.
I'll rename positionInBuffer to positionInPlainBlock, and plainBlockBuffer to plainBlock, to make the naming more consistent.
There was a problem hiding this comment.
I don't see why what I said wouldn't always be true. If so, we could eliminate this extra state.
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
Outdated
Show resolved
Hide resolved
core/src/main/java/org/apache/iceberg/encryption/AesGcmOutputStream.java
Outdated
Show resolved
Hide resolved
| + enciphered | ||
| + ". Must be " | ||
| + plaintextLength | ||
| + GCM_TAG_LENGTH); |
There was a problem hiding this comment.
I think this can be more clear because it doesn't tell the reader that the encryption didn't work right.
Instead, how about Failed to encrypt block: expected %s encrypted bytes but produced %s bytes?
core/src/test/java/org/apache/iceberg/encryption/TestCiphers.java
Outdated
Show resolved
Hide resolved
|
@ggershinsky, I opened ggershinsky#8 with the remaining changes for this PR. The main change is to write an empty block for empty files. Otherwise an attacker could replace a valid file with an empty file without detection. |
Final changes for AES GCM encryption streams
|
Cool! LGTM. This could have been detected via external check of details (such as the file length, stored in parent metadata), but I agree doing this in the stream is cleaner and more reliable. Thanks for the PR, merged. |
Implements #2060
Besides encrypting and signing metadata (avro, json, puffin), this stream can also encrypt Avro data.