首页 > 代码库 > 手动删除Kafka Topic

手动删除Kafka Topic

一、删除Kafka topic

   运行./bin/kafka-topics  --delete --zookeeper 【zookeeper server】  --topic 【topic name】;如果kafaka启动时加载的配置文件中server.properties没有配置delete.topic.enable=true,那么此时的删除并不是真正的删除,而是把topic标记为:marked for deletion

  可以通过命令:./bin/kafka-topics --zookeeper 【zookeeper server】 --list 来查看所有topic

  此时若想真正删除它,可以登录zookeeper客户端,命令:./bin/zookeeper-client

  找到topic所在的目录:ls /brokers/topics

  找到要删除的topic,执行命令:rmr /brokers/topics/【topic name】即可,此时topic被彻底删除。

  另外被标记为marked for deletion的topic可以在zookeeper客户端中通过命令获得:ls /admin/delete_topics/【topic name】,如果你删除了此处的topic,那么marked for deletion 标记消失;

二、Zookeeper数据清理

  下面分四种场景,具体如下:

1、停止kafka,保持zookeeper仍旧运行

  A.用zookeeper客户端zkClient连接zookeeper,如:

./zkCli.sh -server 10.67.2.21:2181,10.67.2.20:2181

  连接成功后,可以用命令查看zk的目录结构:

ls /brokers/topics

  然后可以看到该目录下有pjtest目录,删除方式:

delete /brokers/topics/pjtest/partitions/0/state ...delete /brokers/topics/pjtest/partitions/9/statedelete /brokers/topics/pjtest/partitions/0 ...delete /brokers/topics/pjtest/partitions/9delete /brokers/topics/pjtest/partitionsdelete /brokers/topics/pjtest

2、停止kafka和zookeeper

  •   查看server.properties中配置的log.dirs路径;
  •  进入此路径中(如果kafka是一个集群,需要在每台机器上执行) 假定要删除的topic名称为:pjtest,则rm -rf pjtest-*;
  • vi replication-offset-checkpoint
    假设原文为:
    0
    15
    pjtest 4 0
    pjtest 8 0
    managejob 5 0
    pjtest 6 0
    pjtest 0 0
    pjtest 5 0
    pjtest 3 0
    pjtest 7 0
    pjtest 9 0
    pjtest 2 0
    managejob 9 0
    managejob 1 0
    managejob 3 0
    pjtest 1 0
    managejob 7 0

    修改为:
    0
    5
    managejob 5 0
    managejob 9 0
    managejob 1 0
    managejob 3 0
    managejob 7 0
  • 同理需要修改vi recovery-point-offset-checkpoint

 3、启动kafka和zookeeper

     一、kafka日志清理

    Kafka将会保留所有发布的消息,不论他们是否被消费过.如果需要清理,则需要进行配置.server.properties配置:
    log.cleanup.policy=delete
    日志清理策略

     log.retention.hours=240
    数据存储的最大时间超过这个时间会根据log.cleanup.policy设置的策略处理数据,也就是消费端能够多久去消费数据

    log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除

    log.retention.bytes=-1

    topic每个分区的最大文件大小,一个topic的大小限制=分区数*log.retention.bytes.-1表示没有大小限制
    log.retention.bytes和log.retention.minutes任意一个达到要求,都会执行删除
    log.retention.check.interval.ms=5minutes 文件大小检查的周期时间

手动删除Kafka Topic