kafka报错:java.lang.NumberFormatException: For input string: "hs_err_pid16783"

原创
2021/11/17 17:57
阅读数 1K
AI总结

很久没有写博客了。看着博客都没有更新,就把今天解决的一个kakfa小问题给写出来吧。 今天给kafka进行升配的重新机器后发现,启动Kafka的一个节点失败了。报错信息如下。

[2021-11-17 15:39:25,368] INFO [Log partition=__consumer_offsets-4, dir=/data/data/kafka] Completed load of log with 63 segments, log start offset 7708002293 and log end offset 7769713881 in 539 ms (kafka.log.Log)
[2021-11-17 15:39:25,371] ERROR There was an error in one of the threads during logs loading: java.lang.NumberFormatException: For input string: "hs_err_pid16783" (kafka.log.LogManager)
[2021-11-17 15:39:25,379] WARN [Log partition=__consumer_offsets-36, dir=/data/data/kafka] Found a corrupted index file corresponding to log file /data/data/kafka/__consumer_offsets-36/00000000000000000012.log due to Corrupt index found, index file (/data/data/kafka/__consumer_offsets-36/00000000000000000012.index) has non-zero size but the last offset is 12 which is no greater than the base offset 12.}, recovering segment and rebuilding index files... (kafka.log.Log)
[2021-11-17 15:39:25,380] INFO [ProducerStateManager partition=__consumer_offsets-36] Writing producer snapshot at offset 12 (kafka.log.ProducerStateManager)
[2021-11-17 15:39:25,381] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.NumberFormatException: For input string: "hs_err_pid16783"
        at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
        at java.lang.Long.parseLong(Long.java:589)
        at java.lang.Long.parseLong(Long.java:631)
        at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
        at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
        at kafka.log.Log$.offsetFromFile(Log.scala:1846)
        at kafka.log.Log$$anonfun$loadSegmentFiles$3.apply(Log.scala:331)
        at kafka.log.Log$$anonfun$loadSegmentFiles$3.apply(Log.scala:320)
        at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
        at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
        at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
        at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
        at kafka.log.Log.loadSegmentFiles(Log.scala:320)
        at kafka.log.Log.loadSegments(Log.scala:403)
        at kafka.log.Log.<init>(Log.scala:216)
        at kafka.log.Log$.apply(Log.scala:1747)
        at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:255)
        at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:335)
        at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)
[2021-11-17 15:39:25,381] INFO [Log partition=__consumer_offsets-36, dir=/data/data/kafka] Loading producer state from offset 12 with message format version 2 (kafka.log.Log)
[2021-11-17 15:39:25,384] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)

尝试了几次都报了相同的错误,有人说这是OOM有人说是有某个错误的pid文件导致的。 尝试了20分钟后觉得还是代码可靠,决定去看看源码。 https://github.com/apache/kafka/blob/1.1.0/core/src/main/scala/kafka/log/Log.scala#L320

  // This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded
  private def loadSegmentFiles(): Unit = {
    // load segments in ascending order because transactional data from one segment may depend on the
    // segments that come before it
    for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
      if (isIndexFile(file)) {
        // if it is an index file, make sure it has a corresponding .log file
        val offset = offsetFromFile(file)
        val logFile = Log.logFile(dir, offset)
        if (!logFile.exists) {
          warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
          Files.deleteIfExists(file.toPath)
        }
      } else if (isLogFile(file)) {
        // if it's a log file, load the corresponding log segment
        val baseOffset = offsetFromFile(file)
        val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
        val segment = LogSegment.open(dir = dir,
          baseOffset = baseOffset,
          config,
          time = time,
          fileAlreadyExists = true)

        try segment.sanityCheck(timeIndexFileNewlyCreated)
        catch {
          case _: NoSuchFileException =>
            error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
              "recovering segment and rebuilding index files...")
            recoverSegment(segment)
          case e: CorruptIndexException =>
            warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
              s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
            recoverSegment(segment)
        }
        addSegment(segment)
      }
    }
  }

可以发现代码:

 for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {

也就是说需要对目录内的文件进行排序,如果有*.log的文件名不是数字的,那就报错了。 那就代表着kafka数据目录里面有一个叫做hs_err_pid16783.log的文件。 使用find /data/kafka -name hs_err_pid16783.log

找到了,删除hs_err_pid16783.log文件就好了。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
AI总结
返回顶部
顶部