时序数据库Influx-IOx源码学习六-1(数据写入之分区)

原创
2021/04/20 17:08
阅读数 1.2W

InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。

InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。

接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。


上一章说到如何创建一个数据库,并且数据库的描述信息是如何保存的。详情见:https://my.oschina.net/u/3374539/blog/5025128

这一章记录一下,数据是如何写入并保存的,具体会分为两篇来写:

  • 一篇介绍分区是如何完成的
  • 一篇介绍具体的写入

说到数据写入,必然是需要能够连接到服务器。IOx项目为提供了多种方式可以于服务器进行交互,分别是GrpcHttp基于这两种通信方式,又扩展支持了influxdb2_client以及influxdb_iox_client

基于influxdb_iox_client我写了一个数据写入及查询的示例来观测接口是如何组织的,代码如下:

#[tokio::main]
async fn main() {
    {
        let connection = Builder::default()
            .build("http://127.0.0.1:8081")
            .await
            .unwrap();
        write::Client::new(connection)
            .write("a", r#"myMeasurement,tag1=value1,tag2=value2 fieldKey="123" 1556813561098000000"#)
            .await
            .expect("failed to write data");
    }

    let connection = Builder::default()
        .build("http://127.0.0.1:8081")
        .await
        .unwrap();

    let mut query = flight::Client::new(connection)
        .perform_query("a", "select * from myMeasurement")
        .await
        .expect("query request should work");

    let mut batches = vec![];

    while let Some(data) = query.next().await.expect("valid batches") {
        batches.push(data);
    }

    let format1 = format::QueryOutputFormat::Pretty;
    println!("{}", format1.format(&batches).unwrap());
}


+------------+--------+--------+-------------------------+
| fieldKey   | tag1   | tag2   | time                    |
+------------+--------+--------+-------------------------+
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123        | value1 | value2 | 2019-05-02 16:12:41.098 |
+------------+--------+--------+-------------------------+

因为我多运行了几次,所以能看到数据被重复插入了。

这里还需要说一下的是写入的语句格式可以参见:

[LineProtocol] https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format


write::Client中的write方法生成了一个WriteRequest结构,并使用RPC调用远程的write方法。打开src/influxdb_ioxd/rpc/write.rs : 22行可以看到方法的具体实现。

async fn write(
        &self,
        request: tonic::Request<WriteRequest>,
    ) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
        let request = request.into_inner();
        //得到上面在客户端中写入的数据库名字,在上面的例子中传入的"a"
        let db_name = request.db_name;
        //这里得到了写入的LineProtocol
        let lp_data = request.lp_data;
        let lp_chars = lp_data.len();
        //解析LineProtocol的内容
        //示例中的lp会被解析为:
        //measurement: "myMeasurement"
        //tag_set: [("tag1", "value1"), ("tag2", "value2")]
        //field_set: [("fieldKey", "123")]
        //timestamp: 1556813561098000000
        let lines = parse_lines(&lp_data)
            .collect::<Result<Vec<_>, influxdb_line_protocol::Error>>()
            .map_err(|e| FieldViolation {
                field: "lp_data".into(),
                description: format!("Invalid Line Protocol: {}", e),
            })?;

        let lp_line_count = lines.len();
        debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database");
        //对数据进行保存
        self.server
            .write_lines(&db_name, &lines)
            .await
            .map_err(default_server_error_handler)?;
        //返回成功
        let lines_written = lp_line_count as u64;
        Ok(Response::new(WriteResponse { lines_written }))
    }

继续看self.server.write_lines的执行:

 pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
        self.require_id()?;
        //验证一下名字,然后拿到之前创建数据库时候在内存中存储的相关信息
        let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
        let db = self
            .config
            .db(&db_name)
            .context(DatabaseNotFound { db_name: &*db_name })?;
        //这里就开始执行分片相关的策略
        let (sharded_entries, shards) = {
            //读取创建数据库时候配置的分片策略
            let rules = db.rules.read();
            let shard_config = &rules.shard_config;
            //根据数据和shard策略,把逐个数据对应的分区找到
            //写入到一个List<分区标识,List<数据>>这样的结构中
            //具体的结构信息后面看
            let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules)
                .context(LineConversion)?;
            //再把所有分区的配置返回给调用者
            let shards = shard_config
                .as_ref()
                .map(|cfg| Arc::clone(&cfg.shards))
                .unwrap_or_default();

            (sharded_entries, shards)
        };

        //根据上面返回的集合进行map方法遍历,写到每个分区中
        futures_util::future::try_join_all(
            sharded_entries
                .into_iter()
                .map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)),
        )
        .await?;

        Ok(())
    }

这里描述了写入一条数据的主逻辑:数据写入的时候,先把数据划分到具体的分区里(使用List结构存储下所有的分区对应的数据),然后并行的进行数据写入

接下来看,数据是如何进行分区的:

pub fn lines_to_sharded_entries(
    lines: &[ParsedLine<'_>],
    sharder: Option<&impl Sharder>,
    partitioner: &impl Partitioner,
) -> Result<Vec<ShardedEntry>> {
    let default_time = Utc::now();
    let mut sharded_lines = BTreeMap::new();

    //对所有要插入的数据进行遍历
    for line in lines {
        //先找到符合哪个shard
        let shard_id = match &sharder {
            Some(s) => Some(s.shard(line).context(GeneratingShardId)?),
            None => None,
        };
        //再判断属于哪个分区
        let partition_key = partitioner
            .partition_key(line, &default_time)
            .context(GeneratingPartitionKey)?;
        let table = line.series.measurement.as_str();
        //最后存储到一个map中
        //shard-> partition -> table -> List<data> 的映射关系
        sharded_lines
            .entry(shard_id)
            .or_insert_with(BTreeMap::new)
            .entry(partition_key)
            .or_insert_with(BTreeMap::new)
            .entry(table)
            .or_insert_with(Vec::new)
            .push(line);
    }

    let default_time = Utc::now();
    //最后遍历这个map 转换到之前提到的List结构中
    let sharded_entries = sharded_lines
        .into_iter()
        .map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time))
        .collect::<Result<Vec<_>>>()?;
    Ok(sharded_entries)
}

这里理解shard的概念就是一个或者一组机器,称为一个shard,他们负责真正的存储数据。

partition理解为一个个文件夹,在shard上具体的存储路径。

这里看一下是怎样完成shard的划分的:

impl Sharder for ShardConfig {
    fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId, Error> {
        if let Some(specific_targets) = &self.specific_targets {
            //如果对数据进行匹配,如果符合规则就返回,可以采用当前的shard
            //官方的代码中只实现了根据表名进行shard的策略
            //这个配置似乎只能通过grpc来进行设置,这样好处可能是将来有个什么管理界面能动态修改
            if specific_targets.matcher.match_line(line) {
                return Ok(specific_targets.shard);
            }
        }
        //如果没有配置就使用hash的方式
        //对整条数据进行hash,然后比较机器的hash,找到合适的节点
        //如果没找到,就放在hashring的第一个节点
        //hash算法见后面
        if let Some(hash_ring) = &self.hash_ring {
            return hash_ring
                .shards
                .find(LineHasher { line, hash_ring })
                .context(NoShardsDefined);
        }

        NoShardingRuleMatches {
            line: line.to_string(),
        }
        .fail()
    }
}
//具体的Hash算法,如果全配置的话分的就会特别散,几乎不同测点都放到了不同的地方
impl<'a, 'b, 'c> Hash for LineHasher<'a, 'b, 'c> {
    fn hash<H: Hasher>(&self, state: &mut H) {
        //如果配置了使用table名字就在hash中加入tablename
        if self.hash_ring.table_name {
            self.line.series.measurement.hash(state);
        }
        //然后按照配置的列的值进行hash
        for column in &self.hash_ring.columns {
            if let Some(tag_value) = self.line.tag_value(column) {
                tag_value.hash(state);
            } else if let Some(field_value) = self.line.field_value(column) {
                field_value.to_string().hash(state);t
            }
            state.write_u8(0); // column separator
        }
    }
}

接下来看默认的partition分区方式:

impl Partitioner for PartitionTemplate {
    fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> {
        let parts: Vec<_> = self
            .parts
            .iter()
             //匹配分区策略,或者是单一的,或者是复合的
             //目前支持基于表、值、时间
             //其余还会支持正则表达式和strftime模式
            .map(|p| match p {
                TemplatePart::Table => line.series.measurement.to_string(),
                TemplatePart::Column(column) => match line.tag_value(&column) {
                    Some(v) => format!("{}_{}", column, v),
                    None => match line.field_value(&column) {
                        Some(v) => format!("{}_{}", column, v),
                        None => "".to_string(),
                    },
                },
                TemplatePart::TimeFormat(format) => match line.timestamp {
                    Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(),
                    None => default_time.format(&format).to_string(),
                },
                _ => unimplemented!(),
            })
            .collect();
        //最后返回一个组合文件名,或者是 a-b-c 或者是一个单一的值
        Ok(parts.join("-"))
    }
}

到这里分区的工作就完成了,下一篇继续分析是怎样写入的。

祝玩儿的开心


欢迎关注微信公众号:

或添加微信好友: liutaohua001

展开阅读全文
加载中
点击加入讨论🔥(2) 发布并加入讨论🔥
2 评论
0 收藏
1
分享
返回顶部
顶部