博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
RocketMQ CommitLog And Index
阅读量:6939 次
发布时间:2019-06-27

本文共 8792 字,大约阅读时间需要 29 分钟。

hot3.png

1. Commit Log

2. Index

2.1. IndexHeader

// IndexHeader.java    public static final int INDEX_HEADER_SIZE = 40;    private static int beginTimestampIndex = 0;    private static int endTimestampIndex = 8;    private static int beginPhyoffsetIndex = 16;    private static int endPhyoffsetIndex = 24;    private static int hashSlotcountIndex = 32;    private static int indexCountIndex = 36;    private final ByteBuffer byteBuffer;    private AtomicLong beginTimestamp = new AtomicLong(0);    private AtomicLong endTimestamp = new AtomicLong(0);    private AtomicLong beginPhyOffset = new AtomicLong(0);    private AtomicLong endPhyOffset = new AtomicLong(0);    private AtomicInteger hashSlotCount = new AtomicInteger(0);    private AtomicInteger indexCount = new AtomicInteger(1);    public IndexHeader(final ByteBuffer byteBuffer) {        this.byteBuffer = byteBuffer;    }    public void load() {        this.beginTimestamp.set(byteBuffer.getLong(beginTimestampIndex)); // CCC beginTimestamp  8位long类型,索引文件构建第一个索引的消息落在broker的时间        this.endTimestamp.set(byteBuffer.getLong(endTimestampIndex));     // CCC endTimestamp    8位long类型,索引文件构建最后一个索引消息落broker时间        this.beginPhyOffset.set(byteBuffer.getLong(beginPhyoffsetIndex)); // CCC beginPhyOffset  8位long类型,索引文件构建第一个索引的消息commitLog偏移量        this.endPhyOffset.set(byteBuffer.getLong(endPhyoffsetIndex));     // CCC endPhyOffset    8位long类型,索引文件构建最后一个索引消息commitLog偏移量        this.hashSlotCount.set(byteBuffer.getInt(hashSlotcountIndex));    // CCC hashSlotCount   4位int类型,构建索引占用的槽位数(这个值貌似没有具体作用)        this.indexCount.set(byteBuffer.getInt(indexCountIndex));          // CCC indexCount      4位int类型,索引文件中构建的索引个数        if (this.indexCount.get() <= 0) {            this.indexCount.set(1);        }    }    public void updateByteBuffer() {        this.byteBuffer.putLong(beginTimestampIndex, this.beginTimestamp.get());        this.byteBuffer.putLong(endTimestampIndex, this.endTimestamp.get());        this.byteBuffer.putLong(beginPhyoffsetIndex, this.beginPhyOffset.get());        this.byteBuffer.putLong(endPhyoffsetIndex, this.endPhyOffset.get());        this.byteBuffer.putInt(hashSlotcountIndex, this.hashSlotCount.get());        this.byteBuffer.putInt(indexCountIndex, this.indexCount.get());    }// ---- getter/setter ----// ...

2.2. IndexFile

// IndexFile.java    /* CCC 写index文件 */    public boolean putKey(final String key, final long phyOffset, final long storeTimestamp) {        if (this.indexHeader.getIndexCount() < this.indexNum) {            int keyHash = indexKeyHashMethod(key);            int slotPos = keyHash % this.hashSlotNum;            int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize;            FileLock fileLock = null;            try {                int slotValue = this.mappedByteBuffer.getInt(absSlotPos); // CCC 获取当前slot段中的值                if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount()) {                    slotValue = invalidIndex;                }                long timeDiff = storeTimestamp - this.indexHeader.getBeginTimestamp();                timeDiff = timeDiff / 1000;                if (this.indexHeader.getBeginTimestamp() <= 0) {                    timeDiff = 0;                } else if (timeDiff > Integer.MAX_VALUE) {                    timeDiff = Integer.MAX_VALUE;                } else if (timeDiff < 0) {                    timeDiff = 0;                }                int absIndexPos = // CCC 索引段位置                    IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize                        + this.indexHeader.getIndexCount() * indexSize;                this.mappedByteBuffer.putInt(absIndexPos, keyHash);                       // CCC  keyHash           4位int值,存储的是key的hash值                this.mappedByteBuffer.putLong(absIndexPos + 4, phyOffset);          // CCC  commitlog offset  8位long值,存储的是消息在commitlog的物理偏移量phyOffset                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8, (int) timeDiff);  // CCC  timediff          4位int值,存储了当前消息跟索引文件中第一个消息在broker落地的时间差                this.mappedByteBuffer.putInt(absIndexPos + 4 + 8 + 4, slotValue);   // CCC  slotValue         4位int值,无hash冲突,slotValue=0;若hash冲突存储上一个消息的slotValue槽位,计算索引地址                this.mappedByteBuffer.putInt(absSlotPos, this.indexHeader.getIndexCount());// CCC absSlotPos        4位int值,slot槽位存储消息个数索引                if (this.indexHeader.getIndexCount() <= 1) {                    this.indexHeader.setBeginPhyOffset(phyOffset);                    this.indexHeader.setBeginTimestamp(storeTimestamp);                }                this.indexHeader.incHashSlotCount();                this.indexHeader.incIndexCount();                this.indexHeader.setEndPhyOffset(phyOffset);                this.indexHeader.setEndTimestamp(storeTimestamp);                return true;            } catch (Exception e) {                log.error("putKey exception, Key: " + key + " KeyHashCode: " + key.hashCode(), e);            } finally {                if (fileLock != null) {                    try {                        fileLock.release();                    } catch (IOException e) {                        log.error("Failed to release the lock", e);                    }                }            }        } else {            log.warn("Over index file capacity: index count = " + this.indexHeader.getIndexCount()                + "; index max num = " + this.indexNum);        }        return false;    }    public int indexKeyHashMethod(final String key) {        int keyHash = key.hashCode();        int keyHashPositive = Math.abs(keyHash);        if (keyHashPositive < 0)            keyHashPositive = 0;        return keyHashPositive;    }    /* CCC 读Index文件 */    public void selectPhyOffset(final List
phyOffsets, final String key, final int maxNum, final long begin, final long end, boolean lock) { if (this.mappedFile.hold()) { int keyHash = indexKeyHashMethod(key); int slotPos = keyHash % this.hashSlotNum; int absSlotPos = IndexHeader.INDEX_HEADER_SIZE + slotPos * hashSlotSize; FileLock fileLock = null; try { if (lock) { // fileLock = this.fileChannel.lock(absSlotPos, // hashSlotSize, true); } int slotValue = this.mappedByteBuffer.getInt(absSlotPos); // if (fileLock != null) { // fileLock.release(); // fileLock = null; // } if (slotValue <= invalidIndex || slotValue > this.indexHeader.getIndexCount() || this.indexHeader.getIndexCount() <= 1) { } else { for (int nextIndexToRead = slotValue; ; ) { if (phyOffsets.size() >= maxNum) { break; } int absIndexPos = IndexHeader.INDEX_HEADER_SIZE + this.hashSlotNum * hashSlotSize + nextIndexToRead * indexSize; int keyHashRead = this.mappedByteBuffer.getInt(absIndexPos); long phyOffsetRead = this.mappedByteBuffer.getLong(absIndexPos + 4); long timeDiff = (long) this.mappedByteBuffer.getInt(absIndexPos + 4 + 8); int prevIndexRead = this.mappedByteBuffer.getInt(absIndexPos + 4 + 8 + 4); if (timeDiff < 0) { break; } timeDiff *= 1000L; long timeRead = this.indexHeader.getBeginTimestamp() + timeDiff; boolean timeMatched = (timeRead >= begin) && (timeRead <= end); if (keyHash == keyHashRead && timeMatched) { phyOffsets.add(phyOffsetRead); } if (prevIndexRead <= invalidIndex || prevIndexRead > this.indexHeader.getIndexCount() || prevIndexRead == nextIndexToRead || timeRead < begin) { break; } nextIndexToRead = prevIndexRead; } } } catch (Exception e) { log.error("selectPhyOffset exception ", e); } finally { if (fileLock != null) { try { fileLock.release(); } catch (IOException e) { log.error("Failed to release the lock", e); } } this.mappedFile.release(); } } }

3. 参考资料

略。

转载于:https://my.oschina.net/dexterman/blog/1797271

你可能感兴趣的文章
SQLite操作
查看>>
安装Gogs及简单配置(使用默认数据库)
查看>>
奔向新纪元,Vista安装经历
查看>>
Centos7无法使用ssh登陆及解决方案
查看>>
应用强制访问控制管理网络服务
查看>>
Exchange 2013多租户托管PART 2:Exchange基本配置
查看>>
Mellanox发布升级版RoCE软件 简化以太网RDMA部署
查看>>
《认知设计:提升学习体验的艺术》——学习者不希望觉得自己愚蠢
查看>>
大数据产业“跑”出“长春速度”
查看>>
YII2.0框架分页
查看>>
c#数据库编程
查看>>
我的友情链接
查看>>
MongoDB 更新文档
查看>>
javaweb Servlet开发
查看>>
websocket-bench压力测试
查看>>
http://91.213.30.151/
查看>>
Android ViewStub详解
查看>>
JavaScript中的prototype、__proto__和constructor
查看>>
mysql把一个表某个字段的内容复制到另一张表的某个字段的SQL语句写法
查看>>
java类初始化顺序-阿里笔试题
查看>>