首页 > 代码库 > Kafka深入理解-3:Kafka如何删除数据(日志)文件
Kafka深入理解-3:Kafka如何删除数据(日志)文件
Kafka作为消息中间件,数据需要按照一定的规则删除,否则数据量太大会把集群存储空间占满。
参考:apache Kafka是如何实现删除数据文件(日志)的
Kafka删除数据有两种方式
- 按照时间,超过一段时间后删除过期消息
- 按照消息大小,消息数量超过一定大小后删除最旧的数据
Kafka删除数据的最小单位:segment
Kafka删除数据主逻辑:kafka源码
def cleanupLogs() { debug("Beginning log cleanup...") var total = 0 val startMs = time.milliseconds for(log <- allLogs; if !log.config.compact) { debug("Garbage collecting ‘" + log.name + "‘") total += cleanupExpiredSegments(log) + cleanupSegmentsToMaintainSize(log) } debug("Log cleanup completed. " + total + " files deleted in " + (time.milliseconds - startMs) / 1000 + " seconds") }
Kafka一段时间(配置文件设置)调用一次 cleanupLogs,删除所有应该删除的日志数据。
cleanupExpiredSegments 负责清理超时的数据
private def cleanupExpiredSegments(log: Log): Int = { val startMs = time.milliseconds log.deleteOldSegments(startMs - _.lastModified > log.config.retentionMs) }
cleanupSegmentsToMaintainSize 负责清理超过大小的数据
private def cleanupSegmentsToMaintainSize(log: Log): Int = { if(log.config.retentionSize < 0 || log.size < log.config.retentionSize) return 0 var diff = log.size - log.config.retentionSize def shouldDelete(segment: LogSegment) = { if(diff - segment.size >= 0) { diff -= segment.size true } else { false } } log.deleteOldSegments(shouldDelete) }
Kafka深入理解-3:Kafka如何删除数据(日志)文件
声明:以上内容来自用户投稿及互联网公开渠道收集整理发布,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任,若内容有误或涉及侵权可进行投诉: 投诉/举报 工作人员会在5个工作日内联系你,一经查实,本站将立刻删除涉嫌侵权内容。