1. Commit Log
2. Index
2.1. IndexHeader
// IndexHeader.java public static final int INDEX_HEADER_SIZE = 40; private static int beginTimestampIndex = 0; private static int endTimestampIndex = 8; private static int beginPhyoffsetIndex = 16; private static int endPhyoffsetIndex = 24; private static int hashSlotcountIndex = 32; private static int indexCountIndex = 36; private final ByteBuffer byteBuffer; private AtomicLong beginTimestamp = new AtomicLong(0); private AtomicLong endTimestamp = new AtomicLong(0); private AtomicLong beginPhyOffset = new AtomicLong(0); private AtomicLong endPhyOffset = new AtomicLong(0); private AtomicInteger hashSlotCount = new AtomicInteger(0); private AtomicInteger indexCount = new AtomicInteger(1); public IndexHeader(final ByteBuffer byteBuffer) { this.byteBuffer = byteBuffer; } public void load() { this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex)); // CCC beginTimestamp 8位long类型,索引文件构建第一个索引的消息落在broker的时间 this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex)); // CCC endTimestamp 8位long类型,索引文件构建最后一个索引消息落broker时间 this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex)); // CCC beginPhyOffset 8位long类型,索引文件构建第一个索引的消息commitLog偏移量 this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex)); // CCC endPhyOffset 8位long类型,索引文件构建最后一个索引消息commitLog偏移量 this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex)); // CCC hashSlotCount 4位int类型,构建索引占用的槽位数(这个值貌似没有具体作用) this.indexCount.set(byteBuffer.getInt(indexCountIndex)); // CCC indexCount 4位int类型,索引文件中构建的索引个数 if (this.indexCount.get() <= 0) { this.indexCount.set(1); } } public void updateByteBuffer() { this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get()); this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get()); this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get()); this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get()); this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get()); this.byteBuffer.putInt(indexCountIndex, this.indexCount.get()); }// ---- getter/setter ----// ...
2.2. IndexFile
// IndexFile.java /* CCC 写index文件 */ public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) { if (this.indexHeader.getIndexCount() < this.indexNum) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { int slotValue = this.mappedByteBuffer.getInt(absSlotPos); // CCC 获取当前slot段中的值 if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) { slotValue = invalidIndex; } long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp(); timeDiff = timeDiff / 1000; if (this.indexHeader.getBeginTimestamp() <= 0) { timeDiff = 0; } else if (timeDiff > Integer.MAX_VALUE) { timeDiff = Integer.MAX_VALUE; } else if (timeDiff < 0) { timeDiff = 0; } int absIndexPos = // CCC 索引段位置 IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + this.indexHeader.getIndexCount() * indexSize; this.mappedByteBuffer.putInt(absIndexPos, keyHash); // CCC keyHash 4位int值,存储的是key的hash值 this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset); // CCC commitlog offset 8位long值,存储的是消息在commitlog的物理偏移量phyOffset this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff); // CCC timediff 4位int值,存储了当前消息跟索引文件中第一个消息在broker落地的时间差 this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue); // CCC slotValue 4位int值,无hash冲突,slotValue=0;若hash冲突存储上一个消息的slotValue槽位,计算索引地址 this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());// CCC absSlotPos 4位int值,slot槽位存储消息个数索引 if (this.indexHeader.getIndexCount() <= 1) { this.indexHeader.setBeginPhyOffset(phyOffset); this.indexHeader.setBeginTimestamp(storeTimestamp); } this.indexHeader.incHashSlotCount(); this.indexHeader.incIndexCount(); this.indexHeader.setEndPhyOffset(phyOffset); this.indexHeader.setEndTimestamp(storeTimestamp); return true; } catch (Exception e) { log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } } } else { log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount() + "; index max num = " + this.indexNum); } return false; } public int indexKeyHashMethod(final String key) { int keyHash = key.hashCode(); int keyHashPositive = Math.abs(keyHash); if (keyHashPositive < 0) keyHashPositive = 0; return keyHashPositive; } /* CCC 读Index文件 */ public void selectPhyOffset(final ListphyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { if (this.mappedFile.hold()) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { if (lock) { // fileLock = this.fileChannel.lock(absSlotPos, // hashSlotSize, true); } int slotValue = this.mappedByteBuffer.getInt(absSlotPos); // if (fileLock != null) { // fileLock.release(); // fileLock = null; // } if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { } else { for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; } int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize; int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); if (timeDiff < 0) { break; } timeDiff *= 1000L; long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end); if (keyHash == keyHashRead && timeMatched) { phyOffsets.add(phyOffsetRead); } if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) { break; } nextIndexToRead = prevIndexRead; } } } catch (Exception e) { log.error("selectPhyOffset exception ", e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } this.mappedFile.release(); } } }
3. 参考资料
略。