文档章节

druid.io 从本地批(batch)导入数据与从hdfs 批导入数据的index task配置

一只小江
 一只小江
发布于 2016/03/29 19:39
字数 1720
阅读 1579
收藏 3

先搭建几个节点:coordinator、historical、overlord、middleManager。并且启动服务。

前提:需要准备好mysql(http://my.oschina.net/u/2460844/blog/637334 该文中说明了mysql的配置)、hdfs集群、zookeeper(单机版就可以)

1. __common 配置:

 druid.extensions.coordinates=["io.druid.extensions:druid-examples","io.druid.extensions:druid-kafka-eight","io.druid.extensions:mysql-metadata-storage","io.druid.extensions:druid-hdfs-storage"]
druid.extensions.localRepository=extensions-repo
druid.zk.service.host=druid01:2181
druid.metadata.storage.type=mysql
druid.metadata.storage.connector.connectURI=jdbc:mysql://druid01:3306/druid
druid.metadata.storage.connector.user=druid
druid.metadata.storage.connector.password=diurd1234
druid.storage.type=hdfs
druid.storage.storageDirectory=hdfs://vm1.cci/tmp/druid/localStorage
druid.cache.type=local
druid.cache.sizeInBytes=10000000
druid.selectors.indexing.serviceName=overlord
druid.selectors.coordinator.serviceName=coordinator
druid.emitter=logging

2. coordinator 配置:

druid.host=druid01
druid.port=8081
druid.service=coordinator
druid.coordinator.startDelay=PT5M

3. historical 配置:

druid.host=druid02
druid.port=8082
druid.service=druid/historical
druid.historical.cache.useCache=true
druid.historical.cache.populateCache=true
druid.processing.buffer.sizeBytes=100000000
druid.processing.numThreads=3
druid.server.http.numThreads=5
druid.server.maxSize=300000000000
druid.segmentCache.locations=[{"path": " /tmp/druid/indexCache", "maxSize": 300000000000}]
druid.monitoring.monitors=["io.druid.server.metrics.HistoricalMetricsMonitor", "com.metamx.metrics.JvmMonitor"]

4. overlord 配置:

druid.host=druid03
druid.port=8090
druid.service=overlord
druid.indexer.autoscale.doAutoscale=true
druid.indexer.autoscale.strategy=ec2
druid.indexer.autoscale.workerIdleTimeout=PT90m
druid.indexer.autoscale.terminatePeriod=PT5M
druid.indexer.autoscale.workerVersion=0
druid.indexer.logs.type=local
druid.indexer.logs.directory=/tmp/druid/indexlog
druid.indexer.runner.type=remote
druid.indexer.runner.minWorkerVersion=0
# Store all task state in the metadata storage
druid.indexer.storage.type=metadata
#druid.indexer.fork.property.druid.processing.numThreads=1
#druid.indexer.fork.property.druid.computation.buffer.size=100000000
druid.indexer.runner.type=remote

5. middleManager 配置:

druid.host=druid04
druid.port=8091
druid.service=druid/middlemanager
druid.indexer.logs.type=local
druid.indexer.logs.directory=/tmp/druid/indexlog
druid.indexer.fork.property.druid.processing.numThreads=5
druid.indexer.fork.property.druid.computation.buffer.size=100000000
# Resources for peons
druid.indexer.runner.javaOpts=-server -Xmx3g 
druid.indexer.task.baseTaskDir=/tmp/persistent/task/

6. 分别启动各个节点,如果出现了启动问题,很能是因为内存问题,可适当调整java运行参数。

7. 需要导入的数据 wikipedia_data.csv , wikipedia_data.json

   ---wikipedia_data.json:

{"timestamp": "2013-08-31T01:02:33Z", "page": "Gypsy Danger", "language" : "en", "user" : "nuclear", "unpatrolled" : "true", "newPage" : "true", "robot": "false", "anonymous": "false", "namespace":"article", "continent":"North America", "country":"United States", "region":"Bay Area", "city":"San Francisco", "added": 57, "deleted": 200, "delta": -143}
{"timestamp": "2013-08-31T03:32:45Z", "page": "Striker Eureka", "language" : "en", "user" : "speed", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Australia", "country":"Australia", "region":"Cantebury", "city":"Syndey", "added": 459, "deleted": 129, "delta": 330}
{"timestamp": "2013-08-31T07:11:21Z", "page": "Cherno Alpha", "language" : "ru", "user" : "masterYi", "unpatrolled" : "false", "newPage" : "true", "robot": "true", "anonymous": "false", "namespace":"article", "continent":"Asia", "country":"Russia", "region":"Oblast", "city":"Moscow", "added": 123, "deleted": 12, "delta": 111}
{"timestamp": "2013-08-31T11:58:39Z", "page": "Crimson Typhoon", "language" : "zh", "user" : "triplets", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"China", "region":"Shanxi", "city":"Taiyuan", "added": 905, "deleted": 5, "delta": 900}
{"timestamp": "2013-08-31T12:41:27Z", "page": "Coyote Tango", "language" : "ja", "user" : "cancer", "unpatrolled" : "true", "newPage" : "false", "robot": "true", "anonymous": "false", "namespace":"wikipedia", "continent":"Asia", "country":"Japan", "region":"Kanto", "city":"Tokyo", "added": 1, "deleted": 10, "delta": -9}

   ---wikipedia_data.csv:

 2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Francisco, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Francisc, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Francis, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Franci, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Franc, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Fran, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Fra, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San Fr, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, San F, 57, 200, -143
2013-08-31T01:02:33Z, Gypsy Danger, en, nuclear, true, true, false, false, article, North America, United States, Bay Area, Sa , 57, 200, -143

8. 注意 这里导入的数据 如果保存在本机磁盘导入时,数据文件必须保存在middleManager节点上,不然提交task后无法找到文件。如果是从hdfs中导入,只需要先put到hdfs文件系统中。这里的overlord 节点是druid03(你可以换成ip)。

9. 在任意一个节点上(保证这个节点能够访问druid03)。创建一个json的index task任务:

--9.1 导入一个 本地local保存的、json格式的文件,这个task的json怎么来写

  先将数据wikipedia_data.jso保存在middleManager节点的druid的文件夹下(比如/root/druid-0.8.3)。

  命令为wikipedia_index_local_json_task.json 文件:

{
  "type" : "index_hadoop",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [
        {
          "type" : "count",
          "name" : "count"
        },
        {
          "type" : "doubleSum",
          "name" : "added",
          "fieldName" : "added"
        },
        {
          "type" : "doubleSum",
          "name" : "deleted",
          "fieldName" : "deleted"
        },
        {
          "type" : "doubleSum",
          "name" : "delta",
          "fieldName" : "delta"
        }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE",
        "intervals" : [ "2013-08-31/2013-09-01" ]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "local",
        "baseDir": "./",
        "filter": "wikipedia_data.json"
      }
    },
    "tuningConfig": {
      "type": "index",
      "targetPartitionSize": 0,
      "rowFlushBoundary": 0
    }
  }
}

9.2 提交任务,前面已经说过了overlord节点在druid03上,所以想druid03提交任务

   curl -X 'POST' -H 'Content-Type:application/json' -d @wikipedia_index_local_json_task.json druid03:8090/druid/indexer/v1/task

在overlord节点的日志上可以看出任务的情况,当出现如下信息表示任务成功

2016-03-29T17:35:11,385 INFO [forking-task-runner-1] io.druid.indexing.overlord.ForkingTaskRunner - Logging task index_hadoop_NN_2016-03-29T17:35:11.510+08:00 output to: /tmp/persistent/task/index_hadoop_NN_2016-03-29T17:35:11.510+08:00/log
2016-03-29T17:42:15,263 INFO [forking-task-runner-1] io.druid.indexing.overlord.ForkingTaskRunner - Process exited with status[0] for task: index_hadoop_NN_2016-03-29T17:35:11.510+08:00
2016-03-29T17:42:15,265 INFO [forking-task-runner-1] io.druid.indexing.common.tasklogs.FileTaskLogs - Wrote task log to: /tmp/druid/indexlog/index_hadoop_NN_2016-03-29T17:35:11.510+08:00.log
2016-03-29T17:42:15,267 INFO [forking-task-runner-1] io.druid.indexing.overlord.ForkingTaskRunner - Removing task directory: /tmp/persistent/task/index_hadoop_NN_2016-03-29T17:35:11.510+08:00
2016-03-29T17:42:15,284 INFO [WorkerTaskMonitor-1] io.druid.indexing.worker.WorkerTaskMonitor - Job's finished. Completed [index_hadoop_NN_2016-03-29T17:35:11.510+08:00] with status [SUCCESS

9.3 本地导入csv格式数据的 task文件示例,wikipedia_data.csv 需要先保存在middleManager节点的druid目录下(比如/root/druid-0.8.3)。

{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "wikipedia",
      "parser": {
        "type": "string",
      
 "parseSpec":
 {
       "format" : "csv",
       "timestampSpec" : 
   {
         "column" : "timestamp"
       },
       "columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],
      "dimensionsSpec" : 
   {
        "dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
       } 
        }
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        },
        {
          "type": "doubleSum",
          "name": "added",
          "fieldName": "added"
        },
        {
          "type": "doubleSum",
          "name": "deleted",
          "fieldName": "deleted"
        },
        {
          "type": "doubleSum",
          "name": "delta",
          "fieldName": "delta"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "intervals": ["2013-08-31/2013-09-01"]
      }
    },
    "ioConfig": {
      "type": "index",
      "firehose": {
        "type": "local",
        "baseDir": "./",
        "filter": "wikipedia_data.csv"
      }
    },
    "tuningConfig": {
      "type": "index",
      "targetPartitionSize": 0,
      "rowFlushBoundary": 0
    }
  }
}

9.4 导入hdfs中的json文件。先需要把wikipedia_data.json put到hdfs中,记住目录然后在task文件中给定路径,hdfs路径中要带有hdfs 的namenode的 名字或者ip。这里使用vm1.cci代替namenode的ip。注意对比与本地导入task文件的区别,这些区别决定你能否导入成功。

 {
  "type" : "index_hadoop",
  "spec" : {
    "dataSchema" : {
      "dataSource" : "wikipedia",
      "parser" : {
        "type" : "string",
        "parseSpec" : {
          "format" : "json",
          "timestampSpec" : {
            "column" : "timestamp",
            "format" : "auto"
          },
          "dimensionsSpec" : {
            "dimensions": ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"],
            "dimensionExclusions" : [],
            "spatialDimensions" : []
          }
        }
      },
      "metricsSpec" : [
        {
          "type" : "count",
          "name" : "count"
        },
        {
          "type" : "doubleSum",
          "name" : "added",
          "fieldName" : "added"
        },
        {
          "type" : "doubleSum",
          "name" : "deleted",
          "fieldName" : "deleted"
        },
        {
          "type" : "doubleSum",
          "name" : "delta",
          "fieldName" : "delta"
        }
      ],
      "granularitySpec" : {
        "type" : "uniform",
        "segmentGranularity" : "DAY",
        "queryGranularity" : "NONE",
        "intervals" : [ "2013-08-31/2013-09-01" ]
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "hdfs://vm1.cci/tmp/druid/datasource/wikipedia_data.json"
      }
    },
    "tuningConfig" : {
      "type": "hadoop"
    }
  }
}

9.5 导入hdfs中的csv格式文件。task文件描述如下:

{
  "type": "index_hadoop",
  "spec": {
    "dataSchema": {
      "dataSource": "wikipedia",
      "parser": {
        "type": "string",
      
 "parseSpec":
 {
       "format" : "csv",
       "timestampSpec" : 
   {
         "column" : "timestamp"
       },
       "columns" : ["timestamp","page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city","added","deleted","delta"],
      "dimensionsSpec" : 
   {
        "dimensions" : ["page","language","user","unpatrolled","newPage","robot","anonymous","namespace","continent","country","region","city"]
       } 
        }
      },
      "metricsSpec": [
        {
          "type": "count",
          "name": "count"
        },
        {
          "type": "doubleSum",
          "name": "added",
          "fieldName": "added"
        },
        {
          "type": "doubleSum",
          "name": "deleted",
          "fieldName": "deleted"
        },
        {
          "type": "doubleSum",
          "name": "delta",
          "fieldName": "delta"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "DAY",
        "queryGranularity": "NONE",
        "intervals": ["2013-08-31/2013-09-01"]
      }
    },
    "ioConfig" : {
      "type" : "hadoop",
      "inputSpec" : {
        "type" : "static",
        "paths" : "hdfs://vm1.cci/tmp/druid/datasource/wikipedia_data.csv"
      }
    },
    "tuningConfig" : {
      "type": "hadoop"
    }
  }
}

总结: druid.io 可以配置的项超级多,任何一个地方配置疏忽都可能会导致task失败。这里给出四种示例,还是有必要细分其中的差别。初学者磕绊在此很难免。

 

© 著作权归作者所有

共有 人打赏支持
一只小江
粉丝 100
博文 21
码字总数 51352
作品 0
杭州
程序员
私信 提问
sqoop简介以及架构介绍

本篇文章在具体介绍Sqoop之前,先给大家用一个流程图介绍Hadoop业务的开发流程以及Sqoop在业务当中的实际地位。 如上图所示:在实际的业务当中,我们首先对原始数据集通过MapReduce进行数据清...

qi49125
2017/11/15
0
0
Sqoop架构以及应用介绍

本篇文章在具体介绍Sqoop之前,先给大家用一个流程图介绍Hadoop业务的开发流程以及Sqoop在业务当中的实际地位。 如上图所示:在实际的业务当中,我们首先对原始数据集通过MapReduce进行数据清...

a2011480169
2016/05/25
0
0
EMR Druid 探索(二)

EMR Druid 探索(二) EMR Druid 上文介绍了 Druid 的特点、使用场景以及性能。EMR 在 3.11.0 引入了 Druid,并专门推出了一种新的集群类型:Druid 集群。在具体使用时,Druid 集群可以与 Ha...

xy_xind
2018/06/01
0
0
druid.io 使用indexing service 配置出现 - Received FAILED

多节点部署druid.io,使用indexing service进行批量数据导入,出现问题。 提交任务后,在overlord节点消息中出现如下信息: 2016-03-22T19:25:17,555 INFO [Curator-PathChildrenCache-0] i...

一只小江
2016/03/22
457
0
Spring XD 1.0.0.M5 发布

Spring XD 团队发布了 Spring XD 1.0.0 Milestone 5 版本,现已提供下载。 此版本简化了解决通用大数据问题,例如数据的摄入和导出、实时的分析和批工作量编排的功能。 新特性: Pre-define...

oschina
2014/01/14
2.7K
3

没有更多内容

加载失败,请刷新页面

加载更多

C++ vector和list的区别

1.vector数据结构 vector和数组类似,拥有一段连续的内存空间,并且起始地址不变。 因此能高效的进行随机存取,时间复杂度为o(1); 但因为内存空间是连续的,所以在进行插入和删除操作时,会造...

shzwork
今天
3
0
Spring之invokeBeanFactoryPostProcessors详解

Spring的refresh的invokeBeanFactoryPostProcessors,就是调用所有注册的、原始的BeanFactoryPostProcessor。 相关源码 public static void invokeBeanFactoryPostProcessors(Configu......

cregu
昨天
4
0
ibmcom/db2express-c_docker官方使用文档

(DEPRECIATED) Please check DB2 Developer-C Edition for the replacement. What is IBM DB2 Express-C ? ``IBM DB2 Express-C``` is the no-charge community edition of DB2 server, a si......

BG2KNT
昨天
3
0
Ubuntu 18.04.2 LTS nvidia-docker2 : 依赖: docker-ce (= 5:18.09.0~3-0~ubuntu-bionic)

平台:Ubuntu 18.04.2 LTS nvidia-docker2 版本:2.0.3 错误描述:在安装nvidia-docker2的时候报dpkg依赖错误 nvidia-docker2 : 依赖: docker-ce (= 5:18.09.0~3-0~ubuntu-bionic) 先看一下依......

Pulsar-V
昨天
4
0
学习笔记1-goland结构体(struct)

写在前面:若有侵权,请发邮件by.su@qq.com告知。 转载者告知:如果本文被转载,但凡涉及到侵权相关事宜,转载者需负责。请知悉! 本文永久更新地址:https://my.oschina.net/bysu/blog/3036...

不最醉不龟归
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部