文档章节

基于Java、Kafka、ElasticSearch的搜索框架的设计与实现

timeyang
 timeyang
发布于 2017/09/03 17:23
字数 2575
阅读 12
收藏 0
点赞 0
评论 0

基于Java、Kafka、ElasticSearch的搜索框架的设计与实现

Jkes是一个基于Java、Kafka、ElasticSearch的搜索框架。Jkes提供了注解驱动的Hibernate风格的对象/文档映射,使用rest api用于文档查询。

项目主页:https://github.com/chaokunyang/jkes

Jkes工作原理

索引工作原理:

  • 应用启动时,Jkes扫描所有标注@Document注解的实体,为它们构建元数据。
  • 基于构建的元数据,创建indexmappingJson格式的配置,然后通过ElasticSearch Java Rest Client将创建/更新index配置。
  • 为每个文档创建/更新Kafka ElasticSearch Connector,用于创建/更新文档
  • 为整个项目启动/更新Jkes Deleter Connector,用于删除文档
  • 拦截数据操作方法。将* save(*)方法返回的数据包装为SaveEvent保存到EventContainer;使用(* delete*(..)方法的参数,生成一个DeleteEvent/DeleteAllEvent保存到EventContainer
  • 拦截事务。在事务提交后使用JkesKafkaProducer发送SaveEvent中的实体到Kafka,Kafka会使用我们提供的JkesJsonSerializer序列化指定的数据,然后发送到Kafka。
  • SaveEvent不同,DeleteEvent会直接被序列化,然后发送到Kafka,而不是只发送一份数据
  • SaveEventDeleteEvent不同,DeleteAllEvent不会发送数据到Kafka,而是直接通过ElasticSearch Java Rest Client删除相应的index,然后重建该索引,重启Kafka ElasticSearch Connector

查询工作原理:

  • 查询服务通过rest api提供
  • 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序API的接入难度
  • 查询服务是一个Spring Boot Application,使用docker打包为镜像
  • 查询服务提供多版本API,用于API进化和兼容
  • 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。
  • 为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。

流程图

Jkes流程图

安装

可以参考jkes-integration-test项目快速掌握jkes框架的使用方法。jkes-integration-test是我们用来测试功能完整性的一个Spring Boot Application。

  • 安装jkes-index-connectorjkes-delete-connector到Kafka Connect类路径
  • 安装 Smart Chinese Analysis Plugin
sudo bin/elasticsearch-plugin install analysis-smartcn

配置

  • 引入项目依赖
  • 添加配置
@EnableAspectJAutoProxy
@EnableJkes
@Configuration
public class JkesConfig {

  @Bean
  public PlatformTransactionManager transactionManager(EntityManagerFactory factory, EventSupport eventSupport) {

    return new SearchPlatformTransactionManager(new JpaTransactionManager(factory), eventSupport);
  }
}
  • 提供JkesProperties Bean
@Component
@Configuration
@PropertySource("classpath:jkes.properties")
public class JkesConf extends DefaultJkesPropertiesImpl {

    @Bean
    public static PropertySourcesPlaceholderConfigurer
    propertySourcesPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }

    @PostConstruct
    public void setUp() {
        Config.setJkesProperties(this);
    }


    @Value("${kafka.bootstrap.servers}")
    private String kafkaBootstrapServers;

    @Value("${kafka.connect.servers}")
    private String kafkaConnectServers;

    @Value("${es.bootstrap.servers}")
    private String esBootstrapServers;

    @Value("${document.base_package}")
    private String documentBasePackage;

    @Value("${jkes.client.id}")
    private String clientId;

    @Override
    public String getKafkaBootstrapServers() {
        return replaceDomainNameWithIp(this.kafkaBootstrapServers);
    }

    @Override
    public String getKafkaConnectServers() {
        return kafkaConnectServers;
    }

    @Override
    public String getEsBootstrapServers() {
        return replaceDomainNameWithIp(this.esBootstrapServers);
    }

    @Override
    public String getDocumentBasePackage() {
        return documentBasePackage;
    }

    @Override
    public String getClientId() {
        return clientId;
    }

    public void setKafkaBootstrapServers(String kafkaBootstrapServers) {
        this.kafkaBootstrapServers = kafkaBootstrapServers;
    }

    public void setKafkaConnectServers(String kafkaConnectServers) {
        this.kafkaConnectServers = kafkaConnectServers;
    }

    public void setEsBootstrapServers(String esBootstrapServers) {
        this.esBootstrapServers = esBootstrapServers;
    }

    public void setDocumentBasePackage(String documentBasePackage) {
        this.documentBasePackage = documentBasePackage;
    }

    public void setClientId(String clientId) {
        this.clientId = clientId;
    }

    // mainly used for test. in inner env, use ip directly or domainName if dns parse is available
    private String replaceDomainNameWithIp(String u) {
        StringBuilder stringBuilder = new StringBuilder();
        String[] urls = u.split(",");
        Arrays.stream(urls).forEach(urlStr -> {
            if(urlStr.startsWith("http")) {
                try {
                    URL url = new URL(urlStr);
                    InetAddress address = InetAddress.getByName(url.getHost());
                    String ip = address.getHostAddress();
                    stringBuilder
                            .append(url.getProtocol()).append("://")
                            .append(ip).append(":")
                            .append(url.getPort())
                            .append(",");
                } catch (UnknownHostException | MalformedURLException e) {
                    throw new RuntimeException(e);
                }
            }else {
                String[] split = urlStr.split(":");
                try {
                    stringBuilder.append(HttpUtils.getIpsFromDomainName(split[0])).append(":").append(split[1]).append(",");
                } catch (UnknownHostException e) {
                    throw new RuntimeException(e);
                }
            }
        });

        return stringBuilder.deleteCharAt(stringBuilder.length() - 1).toString();
    }
}
kafka.bootstrap.servers=k1-test.com:9292,k2-test.com:9292,k3-test.com:9292
kafka.connect.servers=http://k1-test.com:8084,http://k2-test.com:8084,http://k3-test.com:8084

es.bootstrap.servers=http://es1-test.com:9200,http://es2-test.com:9200,http://es3-test.com:9200
document.base_package=com.timeyang.jkes.integration_test.domain

jkes.client.id=integration_test

这里可以很灵活,如果使用Spring Boot,可以直接使用@ConfigurationProperties

  • 增加索引管理端点 因为我们不知道客户端使用的哪种web技术,所以索引端点需要在客户端添加。比如在Spring MVC中,可以按照如下方式添加索引端点
@RestController
@RequestMapping("/api/search")
public class SearchEndpoint {

    private Indexer indexer;

    @Autowired
    public SearchEndpoint(Indexer indexer) {
        this.indexer = indexer;
    }

    @RequestMapping(value = "/start_all", method = RequestMethod.POST)
    public void startAll() {
        indexer.startAll();
    }

    @RequestMapping(value = "/start/{entityClassName:.+}", method = RequestMethod.POST)
    public void start(@PathVariable("entityClassName") String entityClassName) {
        indexer.start(entityClassName);
    }

    @RequestMapping(value = "/stop_all", method = RequestMethod.PUT)
    public Map<String, Boolean> stopAll() {
        return indexer.stopAll();
    }

    @RequestMapping(value = "/stop/{entityClassName:.+}", method = RequestMethod.PUT)
    public Boolean stop(@PathVariable("entityClassName") String entityClassName) {
        return indexer.stop(entityClassName);
    }

    @RequestMapping(value = "/progress", method = RequestMethod.GET)
    public Map<String, IndexProgress> getProgress() {
        return indexer.getProgress();
    }

}

快速开始

索引API

  • 实体注解
@Entity
@Document
public class Person extends AuditedEntity {

    // @Id will be identified automatically
    // @Field(type = FieldType.Long)
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                    @InnerField(suffix = "raw", type = FieldType.Keyword),
                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )
    private String name;

    @Field(type = FieldType.Keyword)
    private String gender;

    @Field(type = FieldType.Integer)
    private Integer age;

    // don't add @Field to test whether ignored
    // @Field(type = FieldType.Text)
    private String description;

    @Field(type = FieldType.Object)
    @ManyToOne(fetch = FetchType.EAGER)
    @JoinColumn(name = "group_id")
    private PersonGroup personGroup;

    public Long getId() {
        return id;
    }

    public String getName() {
        return name;
    }

    public String getGender() {
        return gender;
    }

    public Integer getAge() {
        return age;
    }

    public String getDescription() {
        return description;
    }

    public PersonGroup getPersonGroup() {
        return personGroup;
    }
}
@Entity
@Document(type = "person_group", alias = "person_group_alias")
public class PersonGroup extends AuditedEntity {
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;
    private String name;
    private String interests;
    @OneToMany(fetch = FetchType.EAGER, cascade = CascadeType.ALL, mappedBy = "personGroup", orphanRemoval = true)
    private List<Person> persons;
    private String description;

    @DocumentId
    @Field(type = FieldType.Long)
    public Long getId() {
        return id;
    }

    @MultiFields(
            mainField = @Field(type = FieldType.Text),
            otherFields = {
                    @InnerField(suffix = "raw", type = FieldType.Keyword),
                    @InnerField(suffix = "english", type = FieldType.Text, analyzer = "english")
            }
    )
    public String getName() {
        return name;
    }

    @Field(type = FieldType.Text)
    public String getInterests() {
        return interests;
    }

    @Field(type = FieldType.Nested)
    public List<Person> getPersons() {
        return persons;
    }

    /**
     * 不加Field注解,测试序列化时是否忽略
     */
    public String getDescription() {
        return description;
    }
}

setter方法在这里被省略。

查询API使用

  • URI query
curl -XPOST localhost:9000/api/v1/integration_test_person_group/person_group/_search?from=3&size=10
  • Nested query
integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
    "nested": {
      "path": "persons",
      "score_mode": "avg",
      "query": {
        "bool": {
          "must": [
            {
              "range": {
                "persons.age": {
                  "gt": 5
                }
              }
            }
          ]
        }
      }
    }
  }
}
  • match query
integration_test_person_group/person_group/_search?from=0&size=10
{
  "query": {
      "match": {
        "interests": "Hadoop"
      }
    }
}
  • bool query
{
  "query": {
    "bool" : {
      "must" : {
        "match" : { "interests" : "Hadoop" }
      },
      "filter": {
        "term" : { "name.raw" : "name0" }
      },
      "should" : [
        { "match" : { "interests" : "Flink" } },
        {
            "nested" : {
                "path" : "persons",
                "score_mode" : "avg",

                "query" : {
                    "bool" : {
                        "must" : [
                        { "match" : {"persons.name" : "name40"} },
                        { "match" : {"persons.interests" : "interests"} }
                        ],
                        "must_not" : {
                            "range" : {
                              "age" : { "gte" : 50, "lte" : 60 }
                            }
                          }
                    }
                }
            }
        }

      ],
      "minimum_should_match" : 1,
      "boost" : 1.0
    }

  }

}
  • Source filtering
integration_test_person_group/person_group/_search
{
    "_source": false,
    "query" : {
        "match" : { "name" : "name17" }
    }
}
integration_test_person_group/person_group/_search
{
    "_source": {
            "includes": [ "name", "persons.*" ],
            "excludes": [ "date*", "version", "persons.age" ]
        },
    "query" : {
        "match" : { "name" : "name17" }
    }
}
  • prefix
integration_test_person_group/person_group/_search
{ 
  "query": {
    "prefix" : { "name" : "name" }
  }
}
  • wildcard
integration_test_person_group/person_group/_search
{
    "query": {
        "wildcard" : { "name" : "name*" }
    }
}
  • regexp
integration_test_person_group/person_group/_search
{
    "query": {
        "regexp":{
            "name": "na.*17"
        }
    }
}

模块介绍

jkes-core

jkes-core是整个jkes的核心部分。主要包括以下功能:

  • annotation包提供了jkes的核心注解
  • elasticsearch包封装了elasticsearch相关的操作,如为所有的文档创建/更新索引,更新mapping
  • kafka包提供了Kafka 生产者,Kafka Json Serializer,Kafka Connect Client
  • metadata包提供了核心的注解元数据的构建与结构化模型
  • event包提供了事件模型与容器
  • exception包提供了常见的Jkes异常
  • http包基于Apache Http Client封装了常见的http json请求
  • support包暴露了Jkes核心配置支持
  • util包提供了一些工具类,便于开发。如:Asserts, ClassUtils, DocumentUtils, IOUtils, JsonUtils, ReflectionUtils, StringUtils

jkes-boot

jkes-boot用于与一些第三方开源框架进行集成。

当前,我们通过jkes-spring-data-jpa,提供了与spring data jpa的集成。通过使用Spring的AOP机制,对Repository方法进行拦截,生成SaveEvent/DeleteEvent/DeleteAllEvent保存到EventContainer。通过使用我们提供的SearchPlatformTransactionManager,对常用的事务管理器(如JpaTransactionManager)进行包装,提供事务拦截功能。

在后续版本,我们会提供与更多框架的集成。

jkes-spring-data-jpa说明:

  • ContextSupport类用于从bean工厂获取Repository Bean
  • @EnableJkes让客户端能够轻松开启Jkes的功能,提供了与Spring一致的配置模型
  • EventSupport处理事件的细节,在保存和删除数据时生成相应事件存放到EventContainer,在事务提交和回滚时处理相应的事件
  • SearchPlatformTransactionManager包装了客户端的事务管理器,在事务提交和回滚时加入了回调hook
  • audit包提供了一个简单的AuditedEntity父类,方便添加审计功能,版本信息可用于结合ElasticSearch的版本机制保证不会索引过期文档数据
  • exception包封装了常见异常
  • intercept包提供了AOP切点和切面
  • index包提供了全量索引功能。当前,我们提供了基于线程池的索引机制和基于ForkJoin的索引机制。在后续版本,我们会重构代码,增加基于阻塞队列生产者-消费者模式,提供并发性能

jkes-services

jkes-services主要用来提供一些服务。 目前,jkes-services提供了以下服务:

  • jkes-delete-connector

    • jkes-delete-connector是一个Kafka Connector,用于从kafka集群获取索引删除事件(DeleteEvent),然后使用Jest Client删除ElasticSearch中相应的文档。

    • 借助于Kafka Connect的rest admin api,我们轻松地实现了多租户平台上的文档删除功能。只要为每个项目启动一个jkes-delete-connector,就可以自动处理该项目的文档删除工作。避免了每启动一个新的项目,我们都得手动启动一个Kafka Consumer来处理该项目的文档删除工作。尽管可以通过正则订阅来减少这样的工作,但是还是非常不灵活

  • jkes-search-service

    • jkes-search-service是一个restful的搜索服务,提供了多版本的rest query api。查询服务提供多版本API,用于API进化和兼容
    • jkes-search-service目前支持URI风格的搜索和JSON请求体风格的搜索。
    • 我们没有直接使用ElasticSearch进行查询,因为我们需要在后续版本使用机器学习进行搜索排序,而直接与ElasticSearch进行耦合,会增加搜索排序的接入难度
    • 查询服务是一个Spring Boot Application,使用docker打包为镜像
    • 查询服务解析json请求,进行一些预处理后,使用ElasticSearch Java Rest Client转发到ElasticSearch,将得到的响应进行解析,进一步处理后返回到客户端。
    • 为了便于客户端人员开发,查询服务提供了一个查询UI界面,开发人员可以在这个页面得到预期结果后再把json请求体复制到程序中。

后续,我们将会基于zookeeper构建索引集群,提供集群索引管理功能

jkes-integration-test

jkes-integration-test是一个基于Spring Boot集成测试项目,用于进行功能测试。同时测量一些常见操作的吞吐率

开发

To build a development version you'll need a recent version of Kafka. You can build jkes with Maven using the standard lifecycle phases.

Contribute

  • Source Code: https://github.com/chaokunyang/jkes
  • Issue Tracker: https://github.com/chaokunyang/jkes/issues

LICENSE

This project is licensed under Apache License 2.0.

© 著作权归作者所有

共有 人打赏支持
timeyang
粉丝 0
博文 1
码字总数 2575
作品 1
成都
Grafana、elasticsearch、kafka、logstash和pinpoint结合

一、Grafana 1)下载安装 wget https://s3-us-west-2.amazonaws.com/grafana-releases/release/grafana-4.2.0-1.x8664.rpm sudo yum localinstall grafana-4.2.0-1.x86_64.rpm 2)启动 serv......

半船水
2017/10/25
0
0
Java搜索引擎选择: Elasticsearch与Solr(转)

Elasticsearch简介 Elasticsearch是一个实时的分布式搜索和分析引擎。它可以帮助你用前所未有的速度去处理大规模数据。 它可以用于全文搜索,结构化搜索以及分析,当然你也可以将这三者进行组...

easonjim
2017/11/13
0
0
Centos6搭建elk系统,监控IIS日志

**所需程序: 服务器端:java、elasticsearch、kikbana 客 户 端:IIS、logstash** 一、服务器端(192.168.10.46)操作: 先建立一个ELK专门的目录: [root@Cent65 ~]mkdir /elk/ 上传到elk...

D杀手D
04/24
0
0
elasticsearch 集群部署

Elasticsearch是一个分布式搜索服务,提供Restful API,底层基于Lucene,采用多shard的方式保证数据安全,并且提供自动resharding的功能,github等大型的站点也都采用Elasticsearch作为其搜索...

wx5b30a7c097b85
06/27
0
0
linux上Elasticsearch 安装配置、网页访问

一、ElasticSearch简述 ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTful web接口。Elasticsearch是用Java开发的,并作为Apache许可条...

yanfengwang
06/26
0
0
Elasticsearch入门实践

一. 系统环境 操作系统:CentOS release 6.8 (Final) ES版本:6.1.1 二. 安装 先确认安装了Java运行时环境: 解压ES压缩包: 三. 启动 1. 启动ES单节点 当然,对于在后台以守护进程模式运行的...

哲别0
06/06
0
0
Elasticsearch 入门(1)

Elasticsearch是什么 Elasticsearch是基于Apache Lucene的开源搜索引擎。Lucene是可以被认为迄今最好的搜索引擎库。 但是,Lucene只是一个库,你需要Java开发语言基础,更糟糕的是,Lucene非...

Favour
2017/10/27
0
0
ElasticSearch入门 —— 集群搭建

一、环境介绍与安装准备 1、环境说明 2台虚拟机,OS为ubuntu13.04,ip分别为xxx.xxx.xxx.140和xxx.xxx.xxx.145。 2、安装准备 ElasticSearch(简称ES)由java语言实现,运行环境依赖java。E...

萧十一郎君
2014/04/30
0
3
logstash+elasticsearch +kibana 日志管理系统

Logstash是一个完全开源的工具,他可以对你的日志进行收集、分析,并将其存储供以后使用(如,搜索),您可以使用它。说到搜索,logstash带有一个web界面,搜索和展示所有日志。 kibana 也是...

xiaoyuan234
07/04
0
0
ELK实时日志分析平台部署搭建详细实现过程

原文地址:http://www.linuxidc.com/Linux/2016-09/135137.htm 1、ELK平台介绍 在搜索ELK资料的时候,发现这篇文章比较好,于是摘抄一小段: 日志主要包括系统日志、应用程序日志和安全日志。...

tanga842428
2017/07/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Java架构师知识体认识

源码分析 常用设计模式 Proxy代理模式 Factory工厂模式 Singleton单例模式 Delegate委派模式 Strategy策略模式 Prototype原型模式 Template模板模式 Spring5 beans 接口实例化 代理Bean操作 ...

小致dad
17分钟前
0
0
SpringBoot | 第十章:Swagger2的集成和使用

前言 前一章节介绍了mybatisPlus的集成和简单使用,本章节开始接着上一章节的用户表,进行Swagger2的集成。现在都奉行前后端分离开发和微服务大行其道,分微服务及前后端分离后,前后端开发的...

oKong
今天
9
0
Python 最小二乘法 拟合 二次曲线

Python 二次拟合 随机生成数据,并且加上噪声干扰 构造需要拟合的函数形式,使用最小二乘法进行拟合 输出拟合后的参数 将拟合后的函数与原始数据绘图后进行对比 import numpy as npimport...

阿豪boy
今天
12
0
云拿 无人便利店

附近(上海市-航南路)开了家无人便利店.特意进去体验了一下.下面把自己看到的跟大家分享下. 经得现场工作人员同意后拍了几张照片.从外面看是这样.店门口的指导里强调:不要一次扫码多个人进入....

周翔
昨天
1
0
Java设计模式学习之工厂模式

在Java(或者叫做面向对象语言)的世界中,工厂模式被广泛应用于项目中,也许你并没有听说过,不过也许你已经在使用了。 简单来说,工厂模式的出现源于增加程序序的可扩展性,降低耦合度。之...

路小磊
昨天
203
1
npm profile 新功能介绍

转载地址 npm profile 新功能介绍 npm新版本新推来一个功能,npm profile,这个可以更改自己简介信息的命令,以后可以不用去登录网站来修改自己的简介了 具体的这个功能的支持大概是在6这个版...

durban
昨天
1
0
Serial2Ethernet Bi-redirection

Serial Tool Serial Tool is a utility for developing serial communications, custom protocols or device testing. You can set up bytes to send accordingly to your protocol and save......

zungyiu
昨天
1
0
python里求解物理学上的双弹簧质能系统

物理的模型如下: 在这个系统里有两个物体,它们的质量分别是m1和m2,被两个弹簧连接在一起,伸缩系统为k1和k2,左端固定。假定没有外力时,两个弹簧的长度为L1和L2。 由于两物体有重力,那么...

wangxuwei
昨天
0
0
apolloxlua 介绍

##项目介绍 apolloxlua 目前支持javascript到lua的翻译。可以在openresty和luajit里使用。这个工具分为两种模式, 一种是web模式,可以通过网页使用。另外一种是tool模式, 通常作为大规模翻...

钟元OSS
昨天
2
0
Mybatis入门

简介: 定义:Mybatis是一个支持普通SQL查询、存储过程和高级映射的持久层框架。 途径:MyBatis通过XML文件或者注解的形式配置映射,实现数据库查询。 特性:动态SQL语句。 文件结构:Mybat...

霍淇滨
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部