Parquet Source Internals | AI Generated and Translated

2026.06



Parquet: Source-Level Internals

1. Format Specification (Thrift IDL)

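Everything starts with the Thrift definitions in parquet-format/src/main/thrift/parquet.thrift. They are the specification: every Parquet reader and writer must follow these structs. The excerpts below show the most relevant fields of each struct, not every field.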

FileMetaData: the footer, written last:

struct FileMetaData {
  1: required i32 version              // format version; parquet-mr writes 1
  2: required list<SchemaElement> schema  // schema tree flattened depth-first
  3: required i64 num_rows
  4: required list<RowGroup> row_groups
  5: optional list<KeyValue> key_value_metadata
  6: optional string created_by
  7: optional list<ColumnOrder> column_orders  // sort order used for min/max statistics
}
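The `schema` list holds the schema tree flattened depth-first. Here is a minimal sketch of that flattening. The `Node` and `Element` types are hypothetical stand-ins, not parquet-mr classes. A pre-order walk emits each node before its children. Group nodes carry a child count analogous to `SchemaElement.num_children`, which is enough for a reader to rebuild the tree from the flat list.

```java
import java.util.ArrayList;
import java.util.List;

final class SchemaFlattener {
    // Hypothetical tree node: a leaf has no children, a group has one or more.
    record Node(String name, List<Node> children) {
        static Node leaf(String name) { return new Node(name, List.of()); }
        static Node group(String name, Node... kids) { return new Node(name, List.of(kids)); }
    }

    // Hypothetical flat entry, loosely modelled on SchemaElement(name, num_children).
    // numChildren is null for leaves, mirroring the optional Thrift field.
    record Element(String name, Integer numChildren) {}

    // Depth-first pre-order: the parent first, then each child subtree in order.
    static List<Element> flatten(Node root) {
        List<Element> out = new ArrayList<>();
        visit(root, out);
        return out;
    }

    private static void visit(Node node, List<Element> out) {
        Integer n = node.children().isEmpty() ? null : node.children().size();
        out.add(new Element(node.name(), n));
        for (Node child : node.children()) {
            visit(child, out);
        }
    }
}
```

Consider a root `schema` with a leaf `id` and a group `address { city, zip }`. The walk yields `schema(2), id, address(2), city, zip`.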

RowGroup: a horizontal partition of the rows:

struct RowGroup {
  1: required list<ColumnChunk> columns   // one per schema leaf
  2: required i64 total_byte_size         // total uncompressed size of the column data
  3: required i64 num_rows
  4: optional list<SortingColumn> sorting_columns
  5: optional i64 file_offset             // byte offset of the first page (data or dictionary)
  6: optional i64 total_compressed_size
  7: optional i16 ordinal
}

ColumnChunk: points to one column's data plus its metadata:

struct ColumnChunk {
  1: optional string file_path           // external file (rarely used)
  2: required i64 file_offset = 0        // deprecated
  3: optional ColumnMetaData meta_data   // the key field
  4: optional i64 offset_index_offset    // offset index location (page locations)
  5: optional i32 offset_index_length
  6: optional i64 column_index_offset    // column index location (per-page min/max)
  7: optional i32 column_index_length
}

ColumnMetaData: statistics, encodings, and compression for one column chunk:

struct ColumnMetaData {
  1: required Type type
  2: required list<Encoding> encodings          // e.g. [RLE_DICTIONARY, PLAIN]
  3: required list<string> path_in_schema
  4: required CompressionCodec codec             // SNAPPY, ZSTD, GZIP, etc.
  5: required i64 num_values
  6: required i64 total_uncompressed_size
  7: required i64 total_compressed_size
  8: optional list<KeyValue> key_value_metadata
  9: required i64 data_page_offset              // byte offset of the first data page
  10: optional i64 index_page_offset
  11: optional i64 dictionary_page_offset       // byte offset of the dictionary page
  12: optional Statistics statistics             // min/max/null count
  13: optional list<PageEncodingStats> encoding_stats
  // fields 14+ (bloom_filter_offset, etc.) omitted
}

PageHeader: every page is preceded by a header:

struct PageHeader {
  1: required PageType type              // DATA_PAGE, INDEX_PAGE, DICTIONARY_PAGE, DATA_PAGE_V2
  2: required i32 uncompressed_page_size
  3: required i32 compressed_page_size
  4: optional i32 crc                    // CRC32 of the page data after the header
  5: optional DataPageHeader data_page_header
  6: optional IndexPageHeader index_page_header
  7: optional DictionaryPageHeader dictionary_page_header
  8: optional DataPageHeaderV2 data_page_header_v2
}

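Statistics: stored per column chunk in the footer (ColumnMetaData.statistics) and, optionally, per page in the data page headers: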

struct Statistics {
  1: optional binary max              // deprecated; signed comparison only
  2: optional binary min              // deprecated
  3: optional i64 null_count
  4: optional i64 distinct_count
  5: optional binary max_value        // current; ordered according to ColumnOrder
  6: optional binary min_value        // current
  9: optional i64 nan_count           // for FLOAT/DOUBLE/FLOAT16
}
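The reason the old `min`/`max` fields were deprecated is easiest to see at the byte level. The sketch below compares UTF-8 strings both ways. The class name is hypothetical. With signed bytes, any character whose lead byte is 0x80 or above sorts before plain ASCII. A min/max computed that way therefore disagrees with the unsigned byte order that the current ColumnOrder (TypeDefinedOrder) prescribes for string columns.

```java
import java.nio.charset.StandardCharsets;

final class BinaryOrder {
    // Lexicographic comparison treating each byte as signed (-128..127),
    // the ordering the deprecated min/max fields were tied to.
    static int compareSigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Byte.compare(a[i], b[i]);
            if (c != 0) return c;
        }
        return Integer.compare(a.length, b.length);
    }

    // Lexicographic comparison treating each byte as unsigned (0..255),
    // which matches code point order for UTF-8 strings.
    static int compareUnsigned(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int c = Integer.compare(Byte.toUnsignedInt(a[i]), Byte.toUnsignedInt(b[i]));
            if (c != 0) return c;
        }
        return Integer.compare(a.length, b.length);
    }

    static byte[] utf8(String s) { return s.getBytes(StandardCharsets.UTF_8); }
}
```

Take `"é"` (0xC3 0xA9) against `"z"` (0x7A). The signed comparison says é < z, while the unsigned comparison says é > z. Since Java 9, `Arrays.compare` and `Arrays.compareUnsigned` provide the same two orders for `byte[]`.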

2. How the Writer Actually Works (Java Source)

From parquet-hadoop/.../ParquetFileWriter.java

Step 1: Write the magic header

public void start() throws IOException {
    state = state.start();
    byte[] magic = MAGIC;  // "PAR1" = [0x50, 0x41, 0x52, 0x31]
    if (null != fileEncryptor && fileEncryptor.isFooterEncrypted()) {
        magic = EFMAGIC;   // "PARE" when the footer is encrypted
    }
    out.write(magic);
}
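Both ends of a Parquet file carry the same 4-byte magic. A cheap sanity check therefore needs only the first and last 4 bytes. Here is a minimal sketch that classifies an in-memory file image that way. The class and enum names are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class MagicCheck {
    static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);   // plaintext footer
    static final byte[] EFMAGIC = "PARE".getBytes(StandardCharsets.US_ASCII); // encrypted footer

    // Hypothetical result type for this sketch.
    enum Kind { PLAINTEXT_FOOTER, ENCRYPTED_FOOTER, NOT_PARQUET }

    // Classify a complete file image by its leading and trailing 4 bytes,
    // which must carry the same magic.
    static Kind classify(byte[] file) {
        // Lower bound: magic + 4-byte footer length + magic.
        if (file.length < 12) return Kind.NOT_PARQUET;
        byte[] head = Arrays.copyOfRange(file, 0, 4);
        byte[] tail = Arrays.copyOfRange(file, file.length - 4, file.length);
        if (!Arrays.equals(head, tail)) return Kind.NOT_PARQUET;
        if (Arrays.equals(head, MAGIC)) return Kind.PLAINTEXT_FOOTER;
        if (Arrays.equals(head, EFMAGIC)) return Kind.ENCRYPTED_FOOTER;
        return Kind.NOT_PARQUET;
    }
}
```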

Step 2: Start a row group (alignment + metadata initialization)

public void startBlock(long recordCount) throws IOException {
    state = state.startBlock();
    alignment.alignForRowGroup(out);  // pad to the next HDFS block boundary, if padding is configured
    currentBlock = new BlockMetaData();
    currentRecordCount = recordCount;
    currentColumnIndexes = new ArrayList<>();
    currentOffsetIndexes = new ArrayList<>();
    currentBloomFilters = new HashMap<>();
}
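`alignForRowGroup` is where the HDFS-block padding happens. The sketch below assumes a simple rule: pad to the next block boundary only when the remaining gap is no larger than a configured maximum. That keeps a row group from straddling two blocks without wasting an arbitrary amount of space. `paddingFor` and its parameters are hypothetical names, not parquet-mr's API.

```java
final class RowGroupPadding {
    // Hypothetical helper: the number of zero bytes to write before a row group
    // that would start at position pos.
    static long paddingFor(long pos, long blockSize, long maxPadding) {
        long remaining = blockSize - (pos % blockSize);
        if (remaining == blockSize) {
            return 0; // already on a block boundary
        }
        // Pad only if the gap is small enough to be worth wasting.
        return remaining <= maxPadding ? remaining : 0;
    }
}
```

With 128 MiB blocks and an 8 MiB cap, a row group starting 1,000 bytes before a boundary is pushed onto the boundary. One starting early in a block is written in place.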

Step 3: Start a column (reset the accumulators)

public void startColumn(ColumnDescriptor descriptor, long valueCount,
                         CompressionCodecName compressionCodecName) {
    state = state.startColumn();
    currentChunkPath = ColumnPath.get(descriptor.getPath());
    currentChunkType = descriptor.getPrimitiveType();
    currentChunkCodec = compressionCodecName;
    currentChunkValueCount = valueCount;
    currentChunkFirstDataPage = -1;
    compressedLength = 0;
    uncompressedLength = 0;
    currentStatistics = null;
    columnIndexBuilder = ColumnIndexBuilder.getBuilder(...);
    offsetIndexBuilder = OffsetIndexBuilder.getBuilder();
}

Step 4: Write the dictionary page (if dictionary encoding is used)

public void writeDictionaryPage(DictionaryPage dictionaryPage, ...) throws IOException {
    state = state.write();
    currentChunkDictionaryPageOffset = out.getPos();  // record the offset
    int uncompressedSize = dictionaryPage.getUncompressedSize();
    int compressedPageSize = (int) dictionaryPage.getBytes().size();  // size() returns a long
    // Write the PageHeader (Thrift-serialized) with its DictionaryPageHeader
    metadataConverter.writeDictionaryPageHeader(
        uncompressedSize, compressedPageSize,
        dictionaryPage.getDictionarySize(),
        dictionaryPage.getEncoding(), ...);
    // The header size is whatever was written since the recorded offset
    long headerSize = out.getPos() - currentChunkDictionaryPageOffset;
    this.uncompressedLength += uncompressedSize + headerSize;
    this.compressedLength += compressedPageSize + headerSize;
    // Write the actual dictionary bytes
    dictionaryPage.getBytes().writeAllTo(out);
    currentEncodings.add(dictionaryPage.getEncoding());
}

Step 5: Write a data page

public void writeDataPage(int valueCount, int uncompressedPageSize,
                           BytesInput bytes, Statistics<?> statistics, ...) throws IOException {
    state = state.write();
    long beforeHeader = out.getPos();
    if (currentChunkFirstDataPage < 0) {
        currentChunkFirstDataPage = beforeHeader;  // offset of the first data page
    }
    int compressedPageSize = (int) bytes.size();
    // Write the PageHeader with its DataPageHeader (value count, encodings,
    // and optionally page statistics)
    metadataConverter.writeDataPageV1Header(
        uncompressedPageSize, compressedPageSize, valueCount,
        rlEncoding, dlEncoding, valuesEncoding, ...);
    long headerSize = out.getPos() - beforeHeader;
    this.uncompressedLength += uncompressedPageSize + headerSize;
    this.compressedLength += compressedPageSize + headerSize;
    // Write the page body (repetition levels + definition levels + values)
    bytes.writeAllTo(out);
    // Fold this page's statistics into the column chunk's statistics
    if (currentStatistics == null) {
        currentStatistics = statistics.copy();
    } else {
        currentStatistics.mergeStatistics(statistics);
    }
}
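Steps 4 and 5 do three kinds of bookkeeping. Header bytes count toward both sizes, the first data page offset is captured once, and page statistics are folded into chunk statistics. All of this can be sketched as a small accumulator. This is a simplified stand-in that assumes an INT32 column and uses hypothetical field names; parquet-mr's `Statistics` classes cover every physical type.

```java
final class ChunkAccumulator {
    // Running totals for one column chunk. Header bytes count toward both sizes.
    long compressedLength;
    long uncompressedLength;
    long firstDataPageOffset = -1;

    // Hypothetical chunk-level statistics for an INT32 column
    // (null min/max means no non-null value has been seen yet).
    Integer min;
    Integer max;
    long nullCount;

    void addDataPage(long pageStart, long headerSize, int compressedSize, int uncompressedSize,
                     Integer pageMin, Integer pageMax, long pageNulls) {
        if (firstDataPageOffset < 0) {
            firstDataPageOffset = pageStart; // where the first data page's header begins
        }
        compressedLength += compressedSize + headerSize;
        uncompressedLength += uncompressedSize + headerSize;
        // Merge: min of mins, max of maxes, sum of null counts.
        // An all-null page has no min/max and only contributes to nullCount.
        if (pageMin != null) min = (min == null) ? pageMin : Math.min(min, pageMin);
        if (pageMax != null) max = (max == null) ? pageMax : Math.max(max, pageMax);
        nullCount += pageNulls;
    }
}
```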

Step 6: End the column (build ColumnChunkMetaData)

public void endColumn() throws IOException {
    state = state.endColumn();
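    // Build the page-level column index (min/max per page) and offset index (page locations)
    currentColumnIndexes.add(columnIndexBuilder.build());
    currentOffsetIndexes.add(offsetIndexBuilder.build(currentChunkFirstDataPage));
    // Add the column chunk metadata to the current row group
    currentBlock.addColumn(ColumnChunkMetaData.get(
        currentChunkPath, currentChunkType, currentChunkCodec,
        encodingStatsBuilder.build(), currentEncodings,
        currentStatistics, currentChunkFirstDataPage,
        currentChunkDictionaryPageOffset,
        currentChunkValueCount, compressedLength, uncompressedLength, ...));
    // RowGroup.total_byte_size accumulates the uncompressed chunk sizes
    currentBlock.setTotalByteSize(currentBlock.getTotalByteSize() + uncompressedLength);
}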
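`offsetIndexBuilder.build(currentChunkFirstDataPage)` turns per-page sizes into absolute page locations. Here is a minimal sketch that assumes data pages are written back to back after the first one. Following the excerpt, the builder receives the first data page offset only at build time. Each entry mirrors the spec's `PageLocation`: the page offset, the compressed size including the header, and the index of the page's first row. A binary search over the first-row indexes lets a reader jump straight to the page holding a given row. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class OffsetIndexSketch {
    // Mirrors the spec's PageLocation fields.
    record PageLocation(long offset, int compressedPageSize, long firstRowIndex) {}

    private final List<Integer> sizes = new ArrayList<>();
    private final List<Long> rowCounts = new ArrayList<>();

    // Record one data page: its size on disk (header included) and its row count.
    void add(int compressedSizeWithHeader, long rowCount) {
        sizes.add(compressedSizeWithHeader);
        rowCounts.add(rowCount);
    }

    // Lay the pages out contiguously starting at the first data page.
    List<PageLocation> build(long firstDataPageOffset) {
        List<PageLocation> out = new ArrayList<>();
        long offset = firstDataPageOffset;
        long firstRow = 0;
        for (int i = 0; i < sizes.size(); i++) {
            out.add(new PageLocation(offset, sizes.get(i), firstRow));
            offset += sizes.get(i);
            firstRow += rowCounts.get(i);
        }
        return out;
    }

    // Index of the last page whose firstRowIndex <= row (rows are 0-based within the row group).
    static int pageForRow(List<PageLocation> pages, long row) {
        int lo = 0, hi = pages.size() - 1;
        while (lo < hi) {
            int mid = (lo + hi + 1) >>> 1;
            if (pages.get(mid).firstRowIndex() <= row) lo = mid; else hi = mid - 1;
        }
        return lo;
    }
}
```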

Step 7: End the row group

public void endBlock() throws IOException {
    state = state.endBlock();
    currentBlock.setRowCount(currentRecordCount);
    currentBlock.setOrdinal(blocks.size());
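    blocks.add(currentBlock);  // append to the list of row groups
    // Keep this row group's page indexes and bloom filters for end()
    columnIndexes.add(currentColumnIndexes);
    offsetIndexes.add(currentOffsetIndexes);
    bloomFilters.add(currentBloomFilters);
}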

Step 8: Serialize the footer (the key step)

public void end(Map<String, String> extraMetaData) throws IOException {
    state = state.end();
    // After all row-group data, write column indexes, offset indexes, and bloom filters
    serializeColumnIndexes(columnIndexes, blocks, out, fileEncryptor);
    serializeOffsetIndexes(offsetIndexes, blocks, out, fileEncryptor);
    serializeBloomFilters(bloomFilters, blocks, out, fileEncryptor);
    // Build the footer and serialize it
    this.footer = new ParquetMetadata(
        new FileMetaData(schema, extraMetaData, Version.FULL_VERSION), blocks);
    serializeFooter(footer, out, fileEncryptor, metadataConverter);
}

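private static void serializeFooter(...) throws IOException {
    // Unencrypted path shown; encrypted files take a separate branch
    long footerIndex = out.getPos();
    // Convert the in-memory metadata to the Thrift FileMetaData
    org.apache.parquet.format.FileMetaData parquetMetadata = metadataConverter.toParquetMetadata(...);
    // Write the Thrift-serialized FileMetaData
    writeFileMetaData(parquetMetadata, out);
    // Write the footer length as a 4-byte little-endian int
    BytesUtils.writeIntLittleEndian(out, (int) (out.getPos() - footerIndex));
    // Write the PAR1 magic again
    out.write(MAGIC);  // "PAR1"
}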
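The footer tail can be reproduced with plain JDK classes. Here is a minimal sketch of the unencrypted layout. `footerBytes` stands in for the Thrift-serialized `FileMetaData`, and the class name is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;

final class FooterTail {
    static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    // Appends [footer bytes][4-byte little-endian footer length][PAR1].
    // footerBytes is a placeholder for the Thrift-encoded FileMetaData.
    static void writeFooter(ByteArrayOutputStream out, byte[] footerBytes) {
        long footerIndex = out.size();
        out.write(footerBytes, 0, footerBytes.length);
        // The length covers only the serialized metadata, not the length field or magic.
        int footerLength = Math.toIntExact(out.size() - footerIndex);
        byte[] len = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(footerLength).array();
        out.write(len, 0, len.length);
        out.write(MAGIC, 0, MAGIC.length);
    }
}
```

For a 300-byte footer (0x12C), the length field is stored as the bytes `2C 01 00 00`.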

3. Dictionary Encoding: The Actual Encoder

From parquet-column/.../DictionaryValuesWriter.java

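public abstract class DictionaryValuesWriter extends ValuesWriter implements RequiresFallback {
    private static final int MAX_DICTIONARY_ENTRIES = Integer.MAX_VALUE - 1;
    protected final int maxDictionaryByteSize;  // limit set by parquet.dictionary.page.size
    protected boolean dictionaryTooBig;
    protected long dictionaryByteSize;
    protected int lastUsedDictionarySize;       // dictionary size when the last page was written
    protected IntList encodedValues = new IntList();  // integer IDs into the dictionary

    // When to fall back to PLAIN encoding
    public boolean shouldFallBack() {
        return dictionaryByteSize > maxDictionaryByteSize
            || getDictionarySize() > MAX_DICTIONARY_ENTRIES;
    }

    // Check whether the dictionary actually saves space
    public boolean isCompressionSatisfying(long rawSize, long encodedSize) {
        return (encodedSize + dictionaryByteSize) < rawSize;
    }

    // Serialize: encode the integer IDs with the RLE/bit-packing hybrid
    public BytesInput getBytes() {
        int maxDicId = getDictionarySize() - 1;
        int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
        RunLengthBitPackingHybridEncoder encoder =
            new RunLengthBitPackingHybridEncoder(bitWidth, ...);
        IntIterator iterator = encodedValues.iterator();
        try {
            while (iterator.hasNext()) {
                encoder.writeInt(iterator.next());
            }
            // Layout: [1-byte bitWidth][RLE/bit-packed IDs]
            byte[] bytesHeader = new byte[] {(byte) bitWidth};
            BytesInput rleEncodedBytes = encoder.toBytes();
            // Remember how many entries the pages written so far can reference
            lastUsedDictionarySize = getDictionarySize();
            return concat(BytesInput.from(bytesHeader), rleEncodedBytes);
        } catch (IOException e) {
            throw new ParquetEncodingException("could not encode the values", e);
        }
    }
}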
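`getBytes()` emits a 1-byte bit width followed by the RLE/bit-packing hybrid. Here is a simplified encoder for that same format. It deliberately emits only RLE runs. That is a valid hybrid stream, but it is less compact than parquet-mr's encoder, which also emits bit-packed runs for non-repetitive IDs. `bitWidth` uses the same formula as `BytesUtils.getWidthFromMaxInt`. The class name is hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.util.List;

final class DictIdEncoder {
    // Bits needed to represent maxId (same formula as BytesUtils.getWidthFromMaxInt).
    static int bitWidth(int maxId) {
        return 32 - Integer.numberOfLeadingZeros(maxId);
    }

    // Output: [1-byte bit width][rle-run]*, where each rle-run is
    // ULEB128(runLength << 1) followed by the value in ceil(bitWidth / 8) little-endian bytes.
    static byte[] encode(List<Integer> ids, int dictionarySize) {
        int width = bitWidth(dictionarySize - 1);
        int valueBytes = (width + 7) / 8;
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(width);
        int i = 0;
        while (i < ids.size()) {
            int value = ids.get(i);
            int run = 1;
            while (i + run < ids.size() && ids.get(i + run) == value) run++;
            writeUleb128(out, run << 1);   // low bit 0 marks an RLE run
            for (int b = 0; b < valueBytes; b++) {
                out.write((value >>> (8 * b)) & 0xFF);
            }
            i += run;
        }
        return out.toByteArray();
    }

    // Unsigned LEB128 varint: 7 bits per byte, high bit set on all but the last byte.
    private static void writeUleb128(ByteArrayOutputStream out, int v) {
        while ((v & ~0x7F) != 0) {
            out.write((v & 0x7F) | 0x80);
            v >>>= 7;
        }
        out.write(v);
    }
}
```

Take IDs `[0, 0, 0, 2, 2, 1]` with a 3-entry dictionary. The bit width is 2, and the output is `[2, 6, 0, 4, 2, 2, 1]`: the width byte, then three runs, each a header `runLength << 1` followed by a 1-byte value. A single-entry dictionary has bit width 0, so each run is just its header.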

Concrete implementation: PlainBinaryDictionaryValuesWriter (for BYTE_ARRAY values such as strings):

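public static class PlainBinaryDictionaryValuesWriter extends DictionaryValuesWriter {
    // Insertion-ordered map: iterating the keys yields entries in ID order
    protected Object2IntMap<Binary> binaryDictionaryContent = new Object2IntLinkedOpenHashMap<>();

    public PlainBinaryDictionaryValuesWriter(...) {
        super(...);
        binaryDictionaryContent.defaultReturnValue(-1);  // getInt() returns -1 for unseen values
    }

    public void writeBytes(Binary v) {
        int id = binaryDictionaryContent.getInt(v);
        if (id == -1) {
            id = binaryDictionaryContent.size();
            binaryDictionaryContent.put(v.copy(), id);
            dictionaryByteSize += 4L + v.length();  // 4-byte length prefix + bytes
        }
        encodedValues.add(id);
    }

    public DictionaryPage toDictPageAndClose() {
        if (lastUsedDictionarySize > 0) {
            PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(...);
            Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator();
            // Write only the entries that pages already written can reference
            for (int i = 0; i < lastUsedDictionarySize; i++) {
                dictionaryEncoder.writeBytes(binaryIterator.next());
            }
            return dictPage(dictionaryEncoder);
        }
        return null;
    }
}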
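The same ID assignment, and the PLAIN layout of a BYTE_ARRAY dictionary page, can be sketched with JDK collections. In that layout, each entry in ID order is a 4-byte little-endian length followed by the raw bytes. `LinkedHashMap` keeps insertion order, so iterating its keys yields entries in ID order, which lets the dictionary page be written in a single pass. The class is hypothetical and skips fallback handling.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class StringDictionary {
    // Insertion-ordered: the n-th key inserted has ID n.
    private final Map<String, Integer> ids = new LinkedHashMap<>();
    private final List<Integer> encoded = new ArrayList<>();
    private long dictionaryByteSize;

    void write(String v) {
        Integer id = ids.get(v);
        if (id == null) {
            id = ids.size();
            ids.put(v, id);
            // 4-byte length prefix + UTF-8 bytes, as in the writer above
            dictionaryByteSize += 4L + v.getBytes(StandardCharsets.UTF_8).length;
        }
        encoded.add(id);
    }

    List<Integer> encodedIds() { return encoded; }

    long dictionaryByteSize() { return dictionaryByteSize; }

    // PLAIN-encoded dictionary page body, entries in ID order.
    byte[] dictionaryPageBytes() {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String key : ids.keySet()) {
            byte[] b = key.getBytes(StandardCharsets.UTF_8);
            byte[] len = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN).putInt(b.length).array();
            out.write(len, 0, len.length);
            out.write(b, 0, b.length);
        }
        return out.toByteArray();
    }
}
```

Writing `"a", "bb", "a", "c"` gives IDs `[0, 1, 0, 2]` and a 16-byte dictionary page, which matches the `4L + v.length()` accounting. In terms of `isCompressionSatisfying`, the dictionary pays off only when the encoded IDs plus those 16 bytes are smaller than the raw PLAIN values.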

4. Physical File Layout

Everything above is written as bytes in exactly this order:

[PAR1]                                              ← 4 bytes, written by start()
[Row Group 0]                                       ← startBlock() ... endBlock()
  [Column Chunk 0]                                  ← startColumn() ... endColumn()
    [PageHeader (Thrift)] [Dictionary page bytes]   ← writeDictionaryPage() (optional)
    [PageHeader (Thrift)] [Data page bytes]         ← writeDataPage() × N
    [PageHeader (Thrift)] [Data page bytes]
    ...
  [Column Chunk 1]
    ...
[Row Group 1]
  [Column Chunk 0]
    [PageHeader (Thrift)] [Dictionary page bytes]
    [PageHeader (Thrift)] [Data page bytes]
    ...
[Column Index 0] [Column Index 1] ...               ← serializeColumnIndexes()
[Offset Index 0] [Offset Index 1] ...               ← serializeOffsetIndexes()
[Bloom Filter 0] [Bloom Filter 1] ...               ← serializeBloomFilters()
[FileMetaData (Thrift)]                             ← writeFileMetaData(): the footer
[footer_length (4 bytes, LE)]                       ← BytesUtils.writeIntLittleEndian()
[PAR1]                                              ← 4 bytes, end of file

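A reader first seeks to file_size - 8. It reads the 4-byte little-endian footer length and checks the trailing magic. It then seeks back by that length to read the Thrift-serialized FileMetaData. From the footer alone, the reader gets every row group's offsets, every column chunk's offsets, chunk statistics, encodings, and compression codecs, all without reading any page data. The page indexes and bloom filters are located through offsets recorded in the footer and are read only when needed.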
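The read path just described fits in a few lines. Here is a minimal sketch for an unencrypted file held in memory. A real reader would seek within the file instead, and would then Thrift-decode the returned bytes; that decoding step is omitted here. The class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

final class FooterLocator {
    static final byte[] MAGIC = "PAR1".getBytes(StandardCharsets.US_ASCII);

    // Returns the raw (still Thrift-encoded) FileMetaData bytes of an unencrypted file.
    static byte[] footerBytes(byte[] file) {
        int n = file.length;
        if (n < 12) throw new IllegalArgumentException("too small to be a Parquet file");
        // The last 8 bytes are [footer_length (LE int32)][PAR1].
        if (!Arrays.equals(Arrays.copyOfRange(file, n - 4, n), MAGIC)) {
            throw new IllegalArgumentException("missing trailing PAR1 magic");
        }
        int footerLength = ByteBuffer.wrap(file, n - 8, 4).order(ByteOrder.LITTLE_ENDIAN).getInt();
        long footerStart = (long) n - 8 - footerLength;
        // The footer must lie after the leading magic.
        if (footerLength < 0 || footerStart < 4) {
            throw new IllegalArgumentException("corrupt footer length: " + footerLength);
        }
        return Arrays.copyOfRange(file, (int) footerStart, n - 8);
    }
}
```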

5. Why This Design Works


