hadoop整合mongdb
hadoop整合mongdb
Java_Coder 发表于2年前
hadoop整合mongdb
  • 发表于 2年前
  • 阅读 49
  • 收藏 3
  • 点赞 0
  • 评论 0

腾讯云 十分钟定制你的第一个小程序>>>   

摘要: 花了一些時間,终于实现hadoop数据保存到mongdb里了。。(当然也可以从mongdb数据库里面取数据)

1、确保已经安装了mongdb

2、下载jar包

注意:列出了两个重要的jar包,其他的hadoop的jar包同样需要!

3、准备

数据格式是这种:

创建数据库和集合

4、代码实现

public class CityDis {

	private static class CityDisMapper extends
			Mapper<LongWritable, Text, Text, IntWritable> {

		@Override
		protected void map(LongWritable key, Text value,
				Mapper<LongWritable, Text, Text, IntWritable>.Context context)
				throws IOException, InterruptedException {

			if( value.toString().split(",")[5].contains(" ")){
				String outKey = value.toString().split(",")[5].split(" ")[1];
				context.write(new Text(outKey), new IntWritable(1));
			}

		}
	}

	private static class CityDisReduce extends
			Reducer<Text, IntWritable, NullWritable, BSONWritable> {

		@Override
		protected void reduce(Text key, Iterable<IntWritable> values,
				Reducer<Text, IntWritable, NullWritable, BSONWritable>.Context context)
				throws IOException, InterruptedException {

			int count = 0;
			for (IntWritable value : values) {
				count += value.get();
			}
			
			BasicBSONObject out = new BasicBSONObject();
			
			out.put("name",key.toString());
			out.put("value",count);
			
			context.write(NullWritable.get(), new BSONWritable(out));

		}

	}

	public static void main(String[] args) throws IOException,
			ClassNotFoundException, InterruptedException {

		Configuration configuration = HadoopConfig.getConfiguration();
		
		Job job = Job.getInstance(configuration, "地区分布");

		job.setJarByClass(CityDis.class);
		job.setMapperClass(CityDisMapper.class);
		job.setMapOutputKeyClass(Text.class);
		job.setMapOutputValueClass(IntWritable.class);
		job.setOutputKeyClass(NullWritable.class);
		job.setOutputValueClass(BSONWritable.class);
		job.setReducerClass(CityDisReduce.class);

		FileInputFormat.addInputPath(job, new Path("/data/user.txt"));
		job.setOutputFormatClass(MongoOutputFormat.class);		
		System.exit(job.waitForCompletion(true) ? 0 : 1);

	}
mongo-defaults.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
  <property>
    <!-- run the job verbosely ? -->
    <name>mongo.job.verbose</name>
    <value>false</value>
  </property>
  <property>
    <!-- Run the job in the foreground and wait for response, or background it? -->
    <name>mongo.job.background</name>
    <value>false</value>
  </property>
  <property>
      <!-- The field to pass as the mapper key. Defaults to _id if blank -->
      <name>mongo.input.key</name>
      <value></value>
  </property>
  
  <!--我们可以从这里来设置读取mongdb数据的位置-->
  <!--<property>
    <name>mongo.input.uri</name>
    <value>mongodb://127.0.0.1/bigdata.test</value>    
  </property>-->
  
  <property>
    <!-- If you are writing to mongo, the URI -->
    <name>mongo.output.uri</name>
    <value>mongodb://127.0.0.1/bigdata.city_weibo</value>
  </property>
  <property>
    <!-- The query, in JSON, to execute [OPTIONAL] -->
    <name>mongo.input.query</name>
    <value></value>
  </property>
  <property>
    <!-- The fields, in JSON, to read [OPTIONAL] -->
    <name>mongo.input.fields</name>
    <value></value>
  </property>
  <property>
    <!-- A JSON sort specification for read [OPTIONAL] -->
    <name>mongo.input.sort</name>
    <value></value>
  </property>
  <property>
    <!-- The number of documents to limit to for read [OPTIONAL] -->
    <name>mongo.input.limit</name>
    <value>0</value> <!-- 0 == no limit -->
  </property>
  <property>
    <!-- The number of documents to skip in read [OPTIONAL] -->
    <!-- TODO - Are we running limit() or skip() first? -->
    <name>mongo.input.skip</name>
    <value>0</value> <!-- 0 == no skip -->
  </property>
  <property>
    <!-- IF you want to control the split size for input, set it here.
      Should be a long indicating # of docs per split
      Affects # of mappers so be careful what you do -->
    <name>mongo.input.split_size</name>
  </property>
  <!-- These .job.* class defs are optional and only needed if you use the MongoTool baseclass -->
  <property>
    <!-- Class for the mapper -->
    <name>mongo.job.mapper</name>
    <value></value>
  </property>
  <property>
    <!-- Reducer class -->
    <name>mongo.job.reducer</name>
    <value></value>
  </property>
  <property>
    <!-- InputFormat Class -->
    <name>mongo.job.input.format</name>
    <!-- <value>com.mongodb.hadoop.MongoInputFormat</value> -->
    <value></value>
  </property>
  <property>
    <!-- OutputFormat Class -->
    <name>mongo.job.output.format</name>
    <!-- <value>com.mongodb.hadoop.MongoOutputFormat</value> -->
    <value></value>
  </property>
  <property>
    <!-- Output key class for the output format -->
    <name>mongo.job.output.key</name>
    <value></value>
  </property>
  <property>
    <!-- Output value class for the output format -->
    <name>mongo.job.output.value</name>
    <value></value>
  </property>
  <property>
    <!-- Output key class for the mapper [optional] -->
    <name>mongo.job.mapper.output.key</name>
    <value></value>
  </property>
  <property>
    <!-- Output value class for the mapper [optional] -->
    <name>mongo.job.mapper.output.value</name>
    <value></value>
  </property>
  <property>
    <!-- Class for the combiner [optional] -->
    <name>mongo.job.combiner</name>
    <value></value>
  </property>
  <property>
    <!-- Partitioner class [optional] -->
    <name>mongo.job.partitioner</name>
    <value></value>
  </property>
  <property>
    <!-- Sort Comparator class [optional] -->
    <name>mongo.job.sort_comparator</name>
    <value></value>
  </property>
</configuration>

5、运行效果

标签: mongdb hadoop
共有 人打赏支持
粉丝 56
博文 155
码字总数 102864
×
Java_Coder
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: