flume定时切割文件问题分析

原创
2018/01/16 17:26
阅读数 2.5K

序言:

flume中经常遇到这种情景:业务需要每到整点切数据,在flume的配置文件中一般需要配置tier1.sinks.sink1.hdfs.rollInterval=3600就可以了,但是,假设现在是14:00钟,而上一小时由于source端无数据导致在13:30分才产生数据,所有导致14:00中数据不能切,只能等到14:30才可以切,这是因为flume中按时切割的时间是顺延的。这样对后面spark按时取数据就会产生异常,。 通过网上查找了许多资源,也有相关的资源,但是没有相同的解决方案,故此写出以防有其他人也有这个需求。

源码分析及想法:

既然是需要定时切割文件,那就是到整点就将在写的文件关掉就ok了。

根据HDFSEventSink.java中的process()方法中的:
try {
          bucketWriter.append(event);
        } catch (BucketClosedException ex) {
          LOG.info("Bucket was closed while trying to append, " +
            "reinitializing bucket and writing event.");
          hdfsWriter = writerFactory.getWriter(fileType);
          bucketWriter = initializeBucketWriter(realPath, realName,
            lookupPath, hdfsWriter, closeCallback);
          synchronized (sfWritersLock) {
            sfWriters.put(lookupPath, bucketWriter);
          }
          bucketWriter.append(event);
        }

可以看出是在此处将event写入到hdfs中的,然后跟着代码找到BucketWriter.java类,这个类首先判断hdfs文件是否打开,如果没有打开则打开文件:

  if (!isOpen) {
            if (closed) {
                throw new BucketClosedException("This bucket writer was closed and " +
                        "this handle is thus no longer valid");
            }
            open();
        }

然后在打开之后将event写入其中,最后判断是否达到整点时间,如果达到则关闭,滚动tmp文件。 在BucketWriter.java类中增加的主要就两行代码:

    long nextHour = ((long) new Date().getTime() / 1000 / 3600 + 1) * 1000 * 3600;
    timedRollFuture = timedRollerPool.schedule(action, nextHour - System.currentTimeMillis(),
                    TimeUnit.MILLISECONDS);
  • 此处也许会有人问在写入hdfs时是一个for循环,如果event==null那就直接退出了。不会走到下面的open方法。
 public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction transaction = channel.getTransaction();
    List<BucketWriter> writers = Lists.newArrayList();
    transaction.begin();
    try {
      int txnEventCount = 0;
      for (txnEventCount = 0; txnEventCount < batchSize; txnEventCount++) {
        Event event = channel.take();
        if (event == null) {
          break;
        }
。。。
}
}
}

这里不用担心,因为利用了定时执行任务线程,只要小时内有数据,就能走到该处,也就是已经有了定时任务

 timedRollFuture = timedRollerPool.schedule(action, nextHour - System.currentTimeMillis(),
                    TimeUnit.MILLISECONDS);

此处是利用HdfsEventSink.java中的

public void start() {
    String timeoutName = "hdfs-" + getName() + "-call-runner-%d";
    callTimeoutPool = Executors.newFixedThreadPool(threadsPoolSize,
    new ThreadFactoryBuilder().setNameFormat(timeoutName).build());

    String rollerName = "hdfs-" + getName() + "-roll-timer-%d";
    timedRollerPool = Executors.newScheduledThreadPool(rollTimerPoolSize,
            new ThreadFactoryBuilder().setNameFormat(rollerName).build());

     this.sfWriters = new WriterLinkedHashMap(maxOpenFiles);
     sinkCounter.start();
     super.start();
  }

用来翻转的线程timedRollerPool,而这个线程是定时周期执行指定的任务:action

定时周期执行线程为:ScheduledExecutorService,此接口中有以下四种方法: 输入图片说明 定时执行线程见博客:

1.ScheduledExecutorService定时周期执行指定的任务

2.Java多线程(四)、线程池

  • 然后打包运行就ok啦。

个人测试

这里就根据传入的rollInterval的值进行切割了

 long now = System.currentTimeMillis();
      long nextIntegrationPoint = ((long) (now / 1000 / rollInterval + 1)) * rollInterval*1000;
      timedRollFuture = timedRollerPool.schedule(action, nextIntegrationPoint - now,   TimeUnit.MILLISECONDS;);
  • rollInterval为切割时间单位,间隔一个时间单位 接下来还需要修改如何传入参数,即如何将切割日志的时间参数传入其中
展开阅读全文
打赏
0
1 收藏
分享
加载中
-九天-博主
1和2都是可以实现的,但是3每天定时某几点钟实现滚动一次文件,这个好像不行吧
2019/03/12 16:31
回复
举报

引用来自“在_路_上”的评论

您好,最近在学习Flume,在做项目的时候也遇到和您同样的需求,按您的提示,修改了源码,测试间隔1小时,整点切割文件,是可以的.但是我想多增加点功能; 1: 实现每隔多少分钟切割滚动一次文件; 2: 实现每隔几个小时滚动一次文件; 3: 实现每天定时某几点钟滚动一次文件. 我也修改了相应源码,但最终结果和预期不一样.自己对Flume知之甚少,希望可以和您交流交流.如果您方便的话可以加一下我QQ吗?有一些问题向您请教.
我qq 17212056
2019/03/11 19:28
回复
举报
您好,最近在学习Flume,在做项目的时候也遇到和您同样的需求,按您的提示,修改了源码,测试间隔1小时,整点切割文件,是可以的.但是我想多增加点功能; 1: 实现每隔多少分钟切割滚动一次文件; 2: 实现每隔几个小时滚动一次文件; 3: 实现每天定时某几点钟滚动一次文件. 我也修改了相应源码,但最终结果和预期不一样.自己对Flume知之甚少,希望可以和您交流交流.如果您方便的话可以加一下我QQ吗?有一些问题向您请教.
2019/03/11 19:28
回复
举报
更多评论
打赏
3 评论
1 收藏
0
分享
返回顶部
顶部