图解Kafka的RecordBatch结构
🔥《Kafka运维管控平台LogiKM》🔥 ✏️更强大的管控能力✏️ 🎾更高效的问题定位能力🎾 🌅更便捷的集群运维能力🌅 🎼更专业的资源治理🎼 🌞更友好的运维生态🌞
文章目录
-
- RecordBatch
-
- RecordBatch初始化
- 写入消息
- Record结构图
- 关闭ProducerBatch
-
- 关闭输出流appendStream并压缩数据
- 填充RecordBatchHeader数据
- RecordBatchHeader结构图
- RecordBatch整体结构图
阅读完本文你大概会获得以下知识
- 什么时候执行消息的压缩操作
- RecordBatch结构图
RecordBatch
我们之前有讲过生产者的ProducerBatch, 这个RecordBatch跟ProducerBatch的区别是什么呢?
RecordBatch是在ProducerBatch里面的一个专门存放消息的对象, 除此之外ProducerBatch还有其他相关属性,例如还有重试、回调等等相关属性。
RecordBatch初始化
在创建一个需要创建一个新的ProducerBatch的时候,同时需要构建一个MemoryRecordsBuilder, 这个对象我们可以理解为消息构造器,所有的消息相关都存放到这个里面。
public MemoryRecordsBuilder(ByteBufferOutputStream bufferStream, byte magic, CompressionType compressionType, TimestampType timestampType, long baseOffset, long logAppendTime, long producerId, short producerEpoch, int baseSequence, boolean isTransactional, boolean isControlBatch, int partitionLeaderEpoch, int writeLimit) { // 省略部分.... this.magic = magic; this.timestampType = timestampType; this.compressionType = compressionType; this.baseOffset = baseOffset; this.logAppendTime = logAppendTime; this.numRecords = 0; this.uncompressedRecordsSizeInBytes = 0; this.actualCompressionRatio = 1; this.maxTimestamp = RecordBatch.NO_TIMESTAMP; this.producerId = producerId; this.producerEpoch = producerEpoch; this.baseSequence = baseSequence; this.isTransactional = isTransactional; this.isControlBatch = isControlBatch; this.partitionLeaderEpoch = partitionLeaderEpoch; this.writeLimit = writeLimit; this.initialPosition = bufferStream.position(); this.batchHeaderSizeInBytes = AbstractRecords.recordBatchHeaderSizeInBytes(magic, compressionType);// Buffer一开始就需要预留61B的位置用于 存放消息投 RecordHeader bufferStream.position(initialPosition + batchHeaderSizeInBytes); this.bufferStream = bufferStream; //选择合适的压缩器实现类 this.appendStream = new DataOutputStream(compressionType.wrapForOutput(this.bufferStream, magic)); }
上面的源码可知重点:
- bufferStream 一开始的时候就需要预留 61B 的位置给 消息头使用,也就是RecordHeader。batchHeaderSizeInBytes = 61
- 根据配置的压缩类型
compression.type
,选择对应的压缩输出流。例如假设使用lz4
压缩类型,返回的输出流实体对象为KafkaLZ4BlockOutputStream
, 这里面有写入消息的方法和压缩方法。
写入消息
创建了Batch之后,自然需要写入消息
源码位置:
private void appendDefaultRecord(long offset, long timestamp, ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException { ensureOpenForRecordAppend(); // 位移偏移量 ;offset 是当前lastOffset+1, 如果是最开始的时候,它是0; baseOffset 默认是0 int offsetDelta = (int) (offset - baseOffset); long timestampDelta = timestamp - firstTimestamp; //将数据 写到appendStream中。 int sizeInBytes = DefaultRecord.writeTo(appendStream, offsetDelta, timestampDelta, key, value, headers); // 记录一下 写入了多少数据 recordWritten(offset, timestamp, sizeInBytes); }
-
offsetDelta:表示该条消息的相对整个RecordBatch的位移偏移量, 计算逻辑是(
offset - baseOffset
); 使用偏移量可以节省字节数
offset 值等于当前RecordBatch的最后一个offset+1,计算逻辑是(offset = lastOffset == null ? baseOffset : lastOffset + 1;
)
baseOffset 值是RecordBatch的起始偏移量,一般值为0 ; -
timestampDelta : 表示该条消息的相对整个RecordBatch的时间戳的偏移量,计算逻辑(
timestamp - firstTimestamp
) ,使用偏移量可以节省字节数
timestamp 值逻辑timestamp = record.timestamp() == null ? nowMs : record.timestamp()
,意思是这个值也是可以通过设置record属性来设置的。
firstTimestamp 值就是timestamp第一次的值。 -
得到了上面的基础值之后, 就将消息写入到Buffer中, 这里的写入涉及到变长字段Varints,一定程度节省空间。这里写入
write()
的时候,底层执行的是根据你选择的压缩类型决定使用哪个实现类,例如KafkaLZ4BlockOutputStream。 具体的Record的格式请看下面的 Record格式
注意: 这里写入消息的时候,第一条消息,是从第62位写入的,因为前面的61B已经被BatchHeader先预定了(初始化的时候)。
Record结构图
要了解消息的格式,我们先看看消息是怎么写入的
DefaultRecord#writeTo
public static int writeTo(DataOutputStream out, int offsetDelta, long timestampDelta, ByteBuffer key, ByteBuffer value, Header[] headers) throws IOException { int sizeInBytes = sizeOfBodyInBytes(offsetDelta, timestampDelta, key, value, headers); ByteUtils.writeVarint(sizeInBytes, out); byte attributes = 0; // there are no used record attributes at the moment out.write(attributes); ByteUtils.writeVarlong(timestampDelta, out); ByteUtils.writeVarint(offsetDelta, out); if (key == null) { ByteUtils.writeVarint(-1, out); } else { int keySize = key.remaining(); ByteUtils.writeVarint(keySize, out); Utils.writeTo(out, key, keySize); } if (value == null) { ByteUtils.writeVarint(-1, out); } else { int valueSize = value.remaining(); ByteUtils.writeVarint(valueSize, out); Utils.writeTo(out, value, valueSize); } if (headers == null) throw new IllegalArgumentException("Headers cannot be null"); ByteUtils.writeVarint(headers.length, out); for (Header header : headers) { String headerKey = header.key(); if (headerKey == null) throw new IllegalArgumentException("Invalid null header key found in headers"); byte[] utf8Bytes = Utils.utf8(headerKey); ByteUtils.writeVarint(utf8Bytes.length, out); out.write(utf8Bytes); byte[] headerValue = header.value(); if (headerValue == null) { ByteUtils.writeVarint(-1, out); } else { ByteUtils.writeVarint(headerValue.length, out); out.write(headerValue); } } return ByteUtils.sizeOfVarint(sizeInBytes) + sizeInBytes; }
从源码可以得知消息格式为:
Record属性解释:
- length:整个Record的消息总大小, 使用可变字段。
- attributes:已经弃用,默认为0,固定占用了1B
- timestampDelta: 时间戳的增量,使用可变字段。使用增量可以有效节约内存
- offsetDelta: 位移的增量,使用可变字段, 使用增量可以有效节约内存
- keyLength: key的长度,使用可变字段, 如果没有key,该值为-1。
- key: key的信息,正常存储。如果key==null,则该值不存在。
- valueLength:value的长度,使用可变字段, 如果没有key,改值为-1.
- value: value的信息,正常存储,如果value==null,则该值也不存在。
- headers:消息头,这个字段用于支持应用级别的扩展,可以携带很多信息,例如你带一个TraceId也不过分。
- header counts : 消息头的数量,使用可变字段
Varints 是可变长自动,可以有效的节省空间
Header属性解释:
类似,就不再赘述了。
关闭ProducerBatch
当一个ProducerBatch即将发送出去的时候(ReadyBatch), 会先将Batch关闭掉batch.close()
。
关闭输出流appendStream并压缩数据
在这个过程中,也会将appendStream
关闭掉, 也就是用于存储消息体的输出流,那么在它调用 out.flush()
的时候就会调用对应的实现类流,比如我们的压缩类型是lz4, 那么这里实现类就是 KafkaLZ4BlockOutputStream
MemoryRecordsBuilder#closeForRecordAppends
KafkaLZ4BlockOutputStream#flush
public void flush() throws IOException { if (!finished) { writeBlock(); } if (out != null) { out.flush(); } }
什么时候执行压缩操作
其中的 writeBlock()就是在执行压缩操作, 所以你应该知道, 这个时候压缩了Records。并且只是Records。
填充RecordBatchHeader数据
上面我们已经给Records消息集压缩过了, 还记得我们在写入消息的时候是从 position 61 后面开始写的吗?
这个61B的空间是用来干嘛的呢?
MemoryRecordsBuilder#writeDefaultBatchHeader
private int writeDefaultBatchHeader() { ensureOpenForRecordBatchWrite(); ByteBuffer buffer = bufferStream.buffer(); //当前buffer的位置 int pos = buffer.position(); //将位置移动到初始位置0 buffer.position(initialPosition); // 大小 int size = pos - initialPosition; //已压缩的大小 int writtenCompressed = size - DefaultRecordBatch.RECORD_BATCH_OVERHEAD; // 偏移量增量 int offsetDelta = (int) (lastOffset - baseOffset); final long maxTimestamp; if (timestampType == TimestampType.LOG_APPEND_TIME) maxTimestamp = logAppendTime; else maxTimestamp = this.maxTimestamp; //讲RecordBatch 消息头写入buffer DefaultRecordBatch.writeHeader(buffer, baseOffset, offsetDelta, size, magic, compressionType, timestampType, firstTimestamp, maxTimestamp, producerId, producerEpoch, baseSequence, isTransactional, isControlBatch, partitionLeaderEpoch, numRecords);//重新定位 buffer.position(pos); return writtenCompressed; }
真正写入数据的地方的
DefaultRecordBatch#writeHeader
static void writeHeader(ByteBuffer buffer,long baseOffset,int lastOffsetDelta,int sizeInBytes,byte magic,CompressionType compressionType,TimestampType timestampType,long firstTimestamp,long maxTimestamp,long producerId,short epoch,int sequence,boolean isTransactional,boolean isControlBatch,int partitionLeaderEpoch,int numRecords) { if (magic < RecordBatch.CURRENT_MAGIC_VALUE) throw new IllegalArgumentException("Invalid magic value " + magic); if (firstTimestamp < 0 && firstTimestamp != NO_TIMESTAMP) throw new IllegalArgumentException("Invalid message timestamp " + firstTimestamp); short attributes = computeAttributes(compressionType, timestampType, isTransactional, isControlBatch); int position = buffer.position(); buffer.putLong(position + BASE_OFFSET_OFFSET, baseOffset); buffer.putInt(position + LENGTH_OFFSET, sizeInBytes - LOG_OVERHEAD); buffer.putInt(position + PARTITION_LEADER_EPOCH_OFFSET, partitionLeaderEpoch); buffer.put(position + MAGIC_OFFSET, magic); buffer.putShort(position + ATTRIBUTES_OFFSET, attributes); buffer.putLong(position + FIRST_TIMESTAMP_OFFSET, firstTimestamp); buffer.putLong(position + MAX_TIMESTAMP_OFFSET, maxTimestamp); buffer.putInt(position + LAST_OFFSET_DELTA_OFFSET, lastOffsetDelta); buffer.putLong(position + PRODUCER_ID_OFFSET, producerId); buffer.putShort(position + PRODUCER_EPOCH_OFFSET, epoch); buffer.putInt(position + BASE_SEQUENCE_OFFSET, sequence); buffer.putInt(position + RECORDS_COUNT_OFFSET, numRecords); long crc = Crc32C.compute(buffer, ATTRIBUTES_OFFSET, sizeInBytes - ATTRIBUTES_OFFSET); buffer.putInt(position + CRC_OFFSET, (int) crc); buffer.position(position + RECORD_BATCH_OVERHEAD); }
可以看到CRC的计算,是在最后面的时候计算,然后填充到buffer里面的,但是这个并不意味着crc32是放在最后一个, CRC_OFFSET的位置是17的位置。
RecordBatchHeader结构图
RecordBatchHeader属性解释:
-
baseOffset: 当然RecordBatch的起始位移,一般默认为0
-
length:计算从
partition leader epoch
字段开始到整体末尾的长度,计算的逻辑是(sizeInBytes - LOG_OVERHEAD), 这个sizeInBytes
就是整个RecordBatch的长度。LOG_OVERHEAD = 12 -
partition leader epoch: 分区的Leader纪元,也就是版本号
-
magic: 消息格式版本号, V2版本 该值为2
-
crc32: 该RecordBatch的校验值, 计算该值是从attributes的位置开始计算的。
-
attributes:消息的属性,这里用了2个字节, 低3位表示压缩格式,第4位表示时间戳,第5位表示事务标识,第6位表示是否控制消息。如下图
-
last offset delta : RecordBatch中最后一个Record的offset与first offset的差值。
-
first timestamp: 第一条Record的时间戳。对于Record的时间戳的值 ,如果在构造待发送的ProducerRecord的时候设置了timestamp,那么就是这个设置的值,如果没有设置那就是当前时间戳的值。
-
max timestamp: RecordBatch中最大时间戳。
-
producer id : 用于支持幂等和事务的属性。
-
producer epoch :用于支持幂等和事务的属性。
-
base sequence :用于支持幂等和事务的属性。
-
record count : 消息数量
RecordBatch整体结构图
- 在创建RecordBatch的时候,会先预留61B的位置给BatchHeader, 实现方式就是让buffer的位置移动到61位
buffer.possition(61)
- 消息写入的时候并不会压缩,只有等到即将发送这个Batch的时候,会关闭Batch,从而进行压缩(如果配置了压缩策略的话), 压缩的知识Records, 不包含 RecordBatchHeader
- 填充RecordBatchHeader