Python读取Redis数据导出到Elasticsearch
Python读取Redis数据导出到Elasticsearch
go2school 发表于2年前
Python读取Redis数据导出到Elasticsearch
  • 发表于 2年前
  • 阅读 548
  • 收藏 7
  • 点赞 0
  • 评论 0
摘要: 写一个简单的Python脚本读取Redis的数据,导出到Elasticsearch。用于做笔记的。里面涉及的概念有:python命令行解析参数、redis的keys和hgetall命令,elasticsearch的创建index、创建mapping、bulk写入数据
#! usr/bin/python
# -*- coding:utf-8 -*-
import redis
import datetime
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
import sys, getopt

def usage():
	print 'usage: python cmd.py -e <elasticsearch host> -r <redis host> -p <redis list key prefix>'

if __name__ == '__main__':
	
	opts, args = getopt.getopt(sys.argv[1:], "e:r:p:")

	elasticsearch_host = ''
	redis_host = ''
	redis_list_prefix= ''
	for op, value in opts:
		if op == "-e":
			elasticsearch_host = value
		elif op == "-r":
			redis_host = value
		elif op == "-p":
			redis_list_prefix = value
		
	if elasticsearch_host == '' or redis_host == '' or redis_list_prefix == '':
		usage()
		sys.exit(1)
		
	client = Elasticsearch([{'host' : elasticsearch_host}])

	r = redis.Redis(host = redis_host, port=6379)

	doctype = "watchvideocount"

	#get all keys ends with day description except today
	videocount_keys = r.keys(redis_list_prefix + '_*')

	#start query data
	for k_index in videocount_keys:
		#get all <k, v> data
		kvalues = r.hgetall(k_index)
		
		#get day
		p = k_index.index('_')
		day = k_index[p+1:]	
			
		#make index name	
		index_name = k_index.lower()		
		#create index
		try:
			client.indices.create(index=index_name)
		except Exception:
			print 'index ', index_name, ' exist'
		namapping = {
				doctype: {
					"properties": {
						"day":{"type": "date"},
						"EID":{"type": "string"},
						"CID":{"type": "string"},
						"VID":{"type": "string"},
						"VALUE":{"type": "integer"}
					}
				}
			}
		client.indices.put_mapping(index=index_name, doc_type=doctype, body=namapping)
		
		#data cache
		data_cache = []	
		for key in kvalues:
			fields = key.split('_')
			eid = fields[0]
			cid = fields[1]
			vid = fields[2]
			value = kvalues[key]
			#make primary key
			ID = day + '_' + eid + '_' + cid + '_' + vid		
			#make JSON data
			json_data = {'_index':index_name, '_type':doctype, "_id":ID, "DAY":day, "EID":eid, "CID":cid, "VID":vid, "VALUE":value}	
			#append data cache
			data_cache.append(json_data)
		
		#index into elasticsearch	
		bulk(client, actions=data_cache, stats_only=True)
		print k_index, 'done'


共有 人打赏支持
粉丝 11
博文 30
码字总数 13621
×
go2school
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: