It has been a long time since I wrote anything here. Since the blog hasn't been updated in ages, let me write up a small Kafka problem I solved today. After upgrading the specs of our Kafka machines and restarting them, one of the Kafka nodes failed to start. The error output is below.
[2021-11-17 15:39:25,368] INFO [Log partition=__consumer_offsets-4, dir=/data/data/kafka] Completed load of log with 63 segments, log start offset 7708002293 and log end offset 7769713881 in 539 ms (kafka.log.Log)
[2021-11-17 15:39:25,371] ERROR There was an error in one of the threads during logs loading: java.lang.NumberFormatException: For input string: "hs_err_pid16783" (kafka.log.LogManager)
[2021-11-17 15:39:25,379] WARN [Log partition=__consumer_offsets-36, dir=/data/data/kafka] Found a corrupted index file corresponding to log file /data/data/kafka/__consumer_offsets-36/00000000000000000012.log due to Corrupt index found, index file (/data/data/kafka/__consumer_offsets-36/00000000000000000012.index) has non-zero size but the last offset is 12 which is no greater than the base offset 12.}, recovering segment and rebuilding index files... (kafka.log.Log)
[2021-11-17 15:39:25,380] INFO [ProducerStateManager partition=__consumer_offsets-36] Writing producer snapshot at offset 12 (kafka.log.ProducerStateManager)
[2021-11-17 15:39:25,381] ERROR [KafkaServer id=1] Fatal error during KafkaServer startup. Prepare to shutdown (kafka.server.KafkaServer)
java.lang.NumberFormatException: For input string: "hs_err_pid16783"
at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.lang.Long.parseLong(Long.java:589)
at java.lang.Long.parseLong(Long.java:631)
at scala.collection.immutable.StringLike$class.toLong(StringLike.scala:277)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:29)
at kafka.log.Log$.offsetFromFile(Log.scala:1846)
at kafka.log.Log$$anonfun$loadSegmentFiles$3.apply(Log.scala:331)
at kafka.log.Log$$anonfun$loadSegmentFiles$3.apply(Log.scala:320)
at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:733)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186)
at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:732)
at kafka.log.Log.loadSegmentFiles(Log.scala:320)
at kafka.log.Log.loadSegments(Log.scala:403)
at kafka.log.Log.<init>(Log.scala:216)
at kafka.log.Log$.apply(Log.scala:1747)
at kafka.log.LogManager.kafka$log$LogManager$$loadLog(LogManager.scala:255)
at kafka.log.LogManager$$anonfun$loadLogs$2$$anonfun$11$$anonfun$apply$15$$anonfun$apply$2.apply$mcV$sp(LogManager.scala:335)
at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:62)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[2021-11-17 15:39:25,381] INFO [Log partition=__consumer_offsets-36, dir=/data/data/kafka] Loading producer state from offset 12 with message format version 2 (kafka.log.Log)
[2021-11-17 15:39:25,384] INFO [KafkaServer id=1] shutting down (kafka.server.KafkaServer)
I retried several times and got the same error every time. Some people said it was an OOM; others said some bad pid file was the cause. After twenty minutes of guessing, I decided the code itself was the more reliable guide and went to read the source: https://github.com/apache/kafka/blob/1.1.0/core/src/main/scala/kafka/log/Log.scala#L320
// This method does not need to convert IOException to KafkaStorageException because it is only called before all logs are loaded
private def loadSegmentFiles(): Unit = {
  // load segments in ascending order because transactional data from one segment may depend on the
  // segments that come before it
  for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
    if (isIndexFile(file)) {
      // if it is an index file, make sure it has a corresponding .log file
      val offset = offsetFromFile(file)
      val logFile = Log.logFile(dir, offset)
      if (!logFile.exists) {
        warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
        Files.deleteIfExists(file.toPath)
      }
    } else if (isLogFile(file)) {
      // if it's a log file, load the corresponding log segment
      val baseOffset = offsetFromFile(file)
      val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
      val segment = LogSegment.open(dir = dir,
        baseOffset = baseOffset,
        config,
        time = time,
        fileAlreadyExists = true)
      try segment.sanityCheck(timeIndexFileNewlyCreated)
      catch {
        case _: NoSuchFileException =>
          error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
            "recovering segment and rebuilding index files...")
          recoverSegment(segment)
        case e: CorruptIndexException =>
          warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
            s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
          recoverSegment(segment)
      }
      addSegment(segment)
    }
  }
}
The key line is easy to spot:
for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
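together with the val baseOffset = offsetFromFile(file) call a few lines below it. Per the stack trace (Log$.offsetFromFile at Log.scala:1846), offsetFromFile essentially takes the file name up to the first dot and calls .toLong on it. A minimal standalone sketch that mimics this (my own simplification, not the actual Kafka code) reproduces the exact exception:

import java.io.File

object OffsetFromFileDemo {
  // Mimics what kafka.log.Log.offsetFromFile boils down to in 1.1.0:
  // take the file name up to the first '.' and parse it as a Long.
  def offsetFromFile(file: File): Long = {
    val name = file.getName
    name.substring(0, name.indexOf('.')).toLong
  }

  def main(args: Array[String]): Unit = {
    // A real segment file name parses fine:
    println(offsetFromFile(new File("00000000000000000012.log")))  // prints 12

    // A JVM crash log sitting in the same directory does not:
    println(offsetFromFile(new File("hs_err_pid16783.log")))
  }
}

The second call throws java.lang.NumberFormatException: For input string: "hs_err_pid16783", exactly the message in the broker log.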
In other words, Kafka sorts and walks every file in the partition directory, and any *.log file whose name is not a pure number makes offsetFromFile throw. That meant there had to be a file named hs_err_pid16783.log somewhere inside the Kafka data directory. (Files named hs_err_pid*.log are the fatal error logs the HotSpot JVM writes when it crashes, which is probably also where the OOM guesses came from.)
Running find /data/kafka -name hs_err_pid16783.log found it. Deleting the hs_err_pid16783.log file solved the problem.
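Since any non-numeric *.log file in a partition directory will break startup the same way, it is cheap to scan for such strays before (re)starting a broker. A small sketch, using the data directory from the logs above (adjust the path for your setup):

import java.io.File

object FindStrayLogFiles {
  def main(args: Array[String]): Unit = {
    // Data directory as seen in the broker logs above; adjust as needed.
    val dataDir = new File("/data/data/kafka")

    // Flag every *.log file inside a partition directory whose base name
    // is not purely numeric, i.e. exactly what offsetFromFile cannot parse.
    for {
      partitionDir <- Option(dataDir.listFiles).getOrElse(Array.empty[File])
      if partitionDir.isDirectory
      file <- Option(partitionDir.listFiles).getOrElse(Array.empty[File])
      if file.isFile && file.getName.endsWith(".log")
      base = file.getName.stripSuffix(".log")
      if base.isEmpty || !base.forall(_.isDigit)
    } println(s"Stray file: ${file.getAbsolutePath}")
  }
}

Anything it prints should be moved out of the data directory before the broker starts, since loadSegmentFiles will otherwise try to parse it as a segment base offset and fail.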