文档章节

【技术教程】SequoiaDB对接Kafka

巨杉数据库
 巨杉数据库
发布于 2017/11/02 13:39
字数 2027
阅读 78
收藏 2

 

1、 背景

当前互联网、金融、政府等行业,活动流数据几乎无处不在。对这种数据通常的处理方式是先把各种活动以日志的形式写入某种文件,然后周期性地对这些文件进行统计分析。活动流数据的这种处理方式对实时性要求越来越高的场景已经不在适用并且这种处理方式也增加了整个系统的复杂性,为了解决这种问题,分布式开源消息系统Kakfa已被多家不同类型的公司 作为多种类型的数据管道和消息系统使用。

Kafka是一种分布式的,基于发布/订阅的消息系统。提供消息持久化能力,支持消息分区,分布式消费,同时保证每个分区内的消息顺序传输,支持在线水平扩展、高吞吐率,同时支持离线数据处理和实时数据处理。

巨杉数据库SequoiaDB支持海量分布式数据存储,并且支持垂直分区和水平分区,利用这些特性可以将Kafka中的消息存储到SequoiaDB中方便业务系统后续数据分析、数据应用。本文主要讲解巨杉数据库SequoiaDB如何消费Kafka中的消息以及将消息存储到SequoiaDB中。

2、 产品介绍

巨杉数据库SequoiaDB是一款分布式非关系型文档数据库,可以被用来存取海量非关系型的数据,其底层主要基于分布式,高可用,高性能与动态数据类型设计,它兼顾了关系型数据库中众多的优秀设计:如索引、动态查询和更新等,同时以文档记录为基础更好地处理了动态灵活的数据类型。PostgreSQL支持标准SQL,巨杉SequoiaDB SSQL套件通过扩展 PostgreSQL功能可以使用标准SQL 语句访问 SequoiaDB 数据库,完成对SequoiaDB 数据库的各种操作。将Kafka中的消息存储到SequoiaDB后,可利用巨杉SequoiaDB SSQL对这些消息数据进行在线实时的数据分析和数据应用。

3、 环境搭建

3.1、软件配置

操作系统:windows 7

JDK:1.7.0_80 64位,下载地址为:http://www.oracle.com/technetwork/java/javase/downloads/java-archive-downloads-javase7-521261.html#jdk-7u80-oth-JPR

eclipse:4.5.2

SequoiaDB:1.12.5或以上版本

Kakfa:0.10.0.0,下载地址为:http://211.162.127.20/files/5115000001D9C0FE/www-us.apache.org/dist/kafka/0.10.0.0/kafka_2.10-0.10.0.0.tgz

本项目主要实现从Kafka中消费数据并写入到SequoiaDB中来展示Kafka对接SequoiaDB的整个过程。

创建项目工程如下图:

 

图3-1-1

3.2、kafka启动及topic创建

在kafka启动前启动zookeeper,Kafka启动,执行脚本如下:

./kafka-server-start.sh ../config/server.properties &

Kafka创建topic,执行脚本如下:

./kafka-topics.sh --zookeeper localhost:2181 --create --topic kafkaSdb --partitions 1 --replication-factor 1

执行结果如下图:

 

图3-2-1

验证Kafka主题,执行脚本如下:

./kafka-topics.sh --zookeeper localhost:2181 –list

执行结果如下图:

 

图3-2-2

4、 代码演示

4.1、框架搭建代码展示

Kafka分布式系统分为生产者和消费者,生产者主要产生消息数据供消费者消费,消费者主要消费存储在Kafka中的消息数据。本项目主要演示向SequoiaDB中写入Kafka中的消息,故消息的生产只提供演示代码。生产者和消费者各种参数分别放在各自的配置文件中。

Ø 生产端配置文件如下:

kafka-producer.properties

bootstrap.servers=192.168.1.35:9092

retries=0

linger.ms=1

key.serializer=org.apache.kafka.common.serialization.StringSerializer

value.serializer=org.apache.kafka.common.serialization.StringSerializer

partitioner.class=com.sequoiadb.kafka.DefaultPartitioner

Ø 消费端配置文件如下:

kafka-consumer.properties

bootstrap.servers=192.168.1.35:9092 

enable.auto.commit=true  

auto.commit.interval.ms=60000

enable.auto.commit=false

auto.offset.reset=earliest

session.timeout.ms=30000

key.deserializer=org.apache.kafka.common.serialization.StringDeserializer

value.deserializer=org.apache.kafka.common.serialization.StringDeserializer

Ø Kafka主题、SequoiaDB集合、消息分区配置文件如下:

config.json

[{

topicName:'kafkaSdb',

sdbCLName:'kafkaSdb',

partitionNum:1,

topicGroupName:'kafkaSdb-consumer-group',

pollTimeout:5000

}]

4.2、业务实现代码展示

4.2.1、配置代码展示

本项目将Kafka的配置放在配置文件中如Kafka的主题,主题的分区数,SequoiaDB集合并用java对象进行封装,利用工具类进行获取。

配置信息java实体类如下:

package com.sequoiadb.kafka.bean;

public class KafkaConsumerConfig {

private String topicName;

private String sdbCLName;

private int partitionNum = 1;

private String topicGroupName;

private long pollTimeout = Long.MAX_VALUE;

public String getTopicName() {

return topicName;

}

public void setTopicName(String topicName) {

this.topicName = topicName;

}

public String getSdbCLName() {

return sdbCLName;

}

public void setSdbCLName(String sdbCLName) {

this.sdbCLName = sdbCLName;

}

public int getPartitionNum() {

return partitionNum;

}

public void setPartitionNum(int partitionNum) {

this.partitionNum = partitionNum;

}

public String getTopicGroupName() {

return topicGroupName;

}

public void setTopicGroupName(String topicGroupName) {

this.topicGroupName = topicGroupName;

}



public long getPollTimeout() {

return pollTimeout;

}

public void setPollTimeout(long pollTimeout) {

this.pollTimeout = pollTimeout;

}

public String toString(){

return "[topicName="+this.topicName+",sdbCLName="+this.sdbCLName+",partitionNum="+this.partitionNum",topicGroupName="+this.topicGroupName+",pollTimeout="+this.pollTimeout+"]";

}

}

配置信息获取工具类如下:

package com.sequoiadb.utils;

import java.io.IOException;

import java.io.InputStream;

import java.util.Properties;

public class PropertiesUtils {

private static Properties prop = null;

static{

InputStream in = PropertiesUtils.class.getClassLoader().getResourceAsStream("config.properties");

prop = new Properties();

try {

prop.load(in);

} catch (IOException e) {

e.printStackTrace();

}

}

public static String getProperties(String key){

return (String)prop.get(key);

}

public static void main(String[] argc){

System.out.println(PropertiesUtils.getProperties("scm.url"));

}

}

4.2.2、业务逻辑代码演示

生产者业务逻辑代码展示:

package com.sequoiadb.kafka;

import java.io.IOException;

import java.io.InputStream;

import java.util.Properties;

import org.apache.commons.io.IOUtils;

import org.apache.kafka.clients.producer.Callback;

import org.apache.kafka.clients.producer.KafkaProducer;

import org.apache.kafka.clients.producer.ProducerRecord;

import org.apache.kafka.clients.producer.RecordMetadata;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.sequoiadb.utils.Configuration;

public class PartitionTest {

private static Logger log = LoggerFactory.getLogger(PartitionTest.class);

private static String location = "kafka-producer.properties";// 配置文件位置

public static void main(String[] args) {

Properties props = new Properties();

String json = null;

try {

props.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));

InputStream in = Configuration.class.getClassLoader().getResourceAsStream("oracle.json");

json = IOUtils.toString(in);

} catch (IOException e) {

e.printStackTrace();

}

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props);

for (int i = 0; i < 1000; i++) {

ProducerRecord<String, String> record = new ProducerRecord<String, String>("oracle", json);

producer.send(record, new Callback() {

@Override

public void onCompletion(RecordMetadata metadata, Exception e) {

if (e != null) {

log.error("the producer has a error:" + e.getMessage());

}

}

});

}

try {

Thread.sleep(1000);

producer.close();

} catch (InterruptedException e1) {

e1.printStackTrace();

}

}

}

消费者业务逻辑采用一线程一主题的方式进行消息的消费,主程序入口代码如下:

package com.sequoiadb.kafka;

import java.util.ArrayList;

import java.util.List;

import java.util.Map;

import java.util.concurrent.ExecutorService;

import java.util.concurrent.Executors;

import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.sequoiadb.kafka.bean.KafkaConsumerConfig;

import com.sequoiadb.utils.Configuration;

import com.sequoiadb.utils.Constants;

public class KafkaSdb {

private static Logger log = LoggerFactory.getLogger(KafkaSdb.class);

private static ExecutorService executor;

public static void main(String[] args) {

// 获取kafka主题配置

List<KafkaConsumerConfig> topicSdbList = Configuration.getConfiguration();

if (topicSdbList != null && topicSdbList.size() > 0) {

executor = Executors.newFixedThreadPool(topicSdbList.size());

final List<ConsumerThread> consumerList = new ArrayList<ConsumerThread>();

for (int i = 0; i < topicSdbList.size(); i++) {

KafkaConsumerConfig consumerConfig = topicSdbList.get(i);

ConsumerThread consumer = new ConsumerThread(consumerConfig);

consumerList.add(consumer);

executor.submit(consumer);

}

Runtime.getRuntime().addShutdownHook(new Thread() {

@Override

public void run() {

for (ConsumerThread consumer : consumerList) {

consumer.shutdown();

}

executor.shutdown();

try {

executor.awaitTermination(5000, TimeUnit.MILLISECONDS);

} catch (InterruptedException e) {

e.printStackTrace();

}

}

});

} else {

log.error("主题为空,请确认主题配置是否正确!");

}

}

}

线程类负责具体的消息的消费,并且将消息数据写入到SequoiaDB中,具体代码如下:

package com.sequoiadb.kafka;

import java.io.IOException;

import java.util.ArrayList;

import java.util.Arrays;

import java.util.Iterator;

import java.util.List;

import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;

import org.apache.kafka.clients.consumer.ConsumerRecords;

import org.apache.kafka.clients.consumer.KafkaConsumer;

import org.apache.kafka.common.errors.WakeupException;

import org.bson.BSONObject;

import org.bson.BasicBSONObject;

import org.slf4j.Logger;

import org.slf4j.LoggerFactory;

import com.sequoiadb.base.CollectionSpace;

import com.sequoiadb.base.DBCollection;

import com.sequoiadb.base.Sequoiadb;

import com.sequoiadb.exception.BaseException;

import com.sequoiadb.kafka.bean.KafkaConsumerConfig;

import com.sequoiadb.utils.ConnectionPool;

import com.sequoiadb.utils.Constants;

import net.sf.json.JSONArray;

import net.sf.json.JSONObject;

public class ConsumerThread implements Runnable {

private static Logger log = LoggerFactory.getLogger(ConsumerThread.class);

private String location = "kafka-consumer.properties";// 配置文件位置

private Sequoiadb sdb = null;

private CollectionSpace cs = null;

private DBCollection cl = null;

private KafkaConsumer<String, String> consumer = null;

// private String topicName = null;

// private String clName = null;

// private String topicGroupName = null;

// private long pollTimeout = 1000;

private KafkaConsumerConfig consumerConfig;



public ConsumerThread(KafkaConsumerConfig consumerConfig) {

if (null == sdb) {

sdb = ConnectionPool.getInstance().getConnection();

}

if (sdb.isCollectionSpaceExist(Constants.CS_NAME)) {

cs = sdb.getCollectionSpace(Constants.CS_NAME);

} else {

throw new BaseException("集合空间" + Constants.CS_NAME + "不存在!");

}

if (null == cs) {

throw new BaseException("集合空间不能为null!");

} else {

this.consumerConfig = consumerConfig;

this.cl = cs.getCollection(this.consumerConfig.getSdbCLName());



}

Properties props = new Properties();

try {

props.load(Thread.currentThread().getContextClassLoader().getResourceAsStream(location));

} catch (IOException e) {

e.printStackTrace();

}

props.put("group.id", this.consumerConfig.getTopicGroupName());

consumer = new KafkaConsumer<>(props);

}

@Override

public void run() {

log.info("主题为" + this.consumerConfig.getTopicName() + "的消费者线程启动!");

try {

// 订阅topic

consumer.subscribe(Arrays.asList(this.consumerConfig.getTopicName()));

while (true) {

ConsumerRecords<String, String> records = consumer.poll(this.consumerConfig.getPollTimeout());

// consumer.seekToBeginning(Arrays.asList(new

// TopicPartition(this.topicName, 0)));

// consumer.seek(new TopicPartition(this.topicName, 0), 0);

List<BSONObject> list = new ArrayList<BSONObject>();

for (ConsumerRecord<String, String> record : records) {

String value = record.value();

JSONObject valueJson = JSONObject.fromObject(value);

if (valueJson.containsKey("data")) {

JSONArray dataJsonArray = valueJson.getJSONArray("data");

for (int i = 0; i < dataJsonArray.size(); i++) {

BSONObject httpBson = new BasicBSONObject();

JSONObject dataJson = dataJsonArray.getJSONObject(i);

Iterator iter = dataJson.keys();

while (iter.hasNext()) {

String key = (String) iter.next();

String bsonValue = dataJson.getString(key);

httpBson.put(key, bsonValue);

}

list.add(httpBson);

// clHttp.insert(httpBson);

}

} else {

log.error("消息中不存在data节点!");

}

}

if (list != null && list.size() > 0) {

try {

this.cl.bulkInsert(list, DBCollection.FLG_INSERT_CONTONDUP);

log.info("主题为"+this.consumerConfig.getTopicName()+"的消息插入SDB成功,插入记录数为:"+list.size());

} catch (BaseException e) {

e.printStackTrace();

}

}

consumer.commitSync();

}

} catch (WakeupException e) {

} finally {

consumer.close();

}

}

public void shutdown(){

consumer.wakeup();

}

}

5、 总结

从上述对接过程中,Kafka中的消息写入SequoiaDB难点是Kafka中主题分区的配置以及多线程如何消费各主题分区中的消息,并且处理消息消费失败的情况。

 

 

     

© 著作权归作者所有

巨杉数据库

巨杉数据库

粉丝 56
博文 135
码字总数 294787
作品 1
朝阳
数据库管理员
私信 提问
加载中

评论(0)

【入门教程】SequoiaDB+Postgresql数据实时检索最佳实践

1. 背景 SequoiaDB数据实时检索的能力体现在索引和数据切分的使用上,创建合适的索引能够快速查询到具备某一特征数据的能力;合理的切分方式能够提高数据查询性能。比如按时间,按地区去统计...

巨杉数据库
2017/10/20
35
0
巨杉Tech | SparkSQL+SequoiaDB 性能调优策略

当今时代,企业数据越发膨胀。数据是企业的价值,但数据处理也是一种技术挑战。在海量数据处理的场景,即使单机计算能力再强,也无法满足日益增长的数据处理需求。所以,分布式才是解决该类问...

巨杉数据库
2019/10/31
41
0
SequoiaDB(巨杉数据库)成为国内首家Spark认证数据库

近日,Spark的官方博客中刊登了其全球战略合作伙伴SequoiaDB发布的技术博客,介绍SequoiaDB对于Spark的整合以及SequoiaDB+Spark的解决方案。目前,SequoiaDB也成为了Spark官方认证的全球合作...

ark43420
2015/08/04
3
0
数据库SQL和分布式不可兼得?这一架构也许是答案

  选自陈利人老师公众号(待字闺中)   原文|巨杉数据库 王涛   分布式数据库技术发展多年,但是在应用、业务的驱动下,分布式数据库的架构一直在不断发展和演进。   开源金融级分布...

乌镇智库
2018/09/21
0
0
什么是最适合云数据库的架构设计?

分布式数据库技术发展多年,但是在应用、业务的驱动下,分布式数据库的架构一直在不断发展和演进。 开源金融级分布式数据库SequoiaDB,经过6年的研发,坚持从零开始打造数据库核心引擎。在技...

巨杉数据库
2018/09/26
84
0

没有更多内容

加载失败,请刷新页面

加载更多

Java的类反射机制(Java高级)

概述 为什么要用反射:任意调用类中的私有内容,使类的使用更加灵活。 反射:针对性地映射 某一个完整事务的行为或特征(单独操作类中任意内容) 1、反射机制 - 获取对象 什么是类对象? 类对象...

庭前云落
40分钟前
17
0
你知道你自己最经常使用的是哪几个Linux命令呢?

不知道大家自接触 Linux 以来,都使用过哪些命令,其中最常用的命令是什么? 我最常用的命令之一是 sudo ,因为我每天都在使用它在 Linux 上安装、更新、删除软件包以及其它各种需要超级用户权...

老孟的Linux私房菜
41分钟前
36
0
读 《HTML5 揭秘》有感

最近在补一些 HTML 的书籍,偶尔读到这本书,虽然这本书已经是10年以前的书籍了,不过其中有些有趣的知识点与观点被我提取了出来。 标准创建与技术实现冲突 作者在开始就提出了 Mozilla 开发...

jump--jump
43分钟前
37
0
数据结构与算法之美_15_二分查找(上):如何用最省内存的方式实现快速查找功能?

今天学习一种针对有序数据集合的查找算法:二分查找(Binary Search)算法,也叫折半查找。 先看一道思考题,假设我们有 1000 万个整数数据,每个数据占 8 个字节,如何设计数据结构和算法,...

SP_K
今天
69
0
Docker 记录

Docker Docker 分为社区版(Community Edition) 和企业版( Enterprice Edition) 一般使用CE,EE收费 查看linux内核版本: uname -r 查看centos 版本:cat /etc/redhat-release Centos 如果是最...

天空飘来五个字儿
今天
105
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部