InfluxDB是一个由InfluxData开发的开源时序数据库,专注于海量时序数据的高性能读、写、高效存储与实时分析等,在DB-Engines Ranking时序型数据库排行榜上常年排名第一。
InfluxDB可以说是当之无愧的佼佼者,但 InfluxDB CTO Paul 在 2020/12/10 号在博客中发表一篇名为:Announcing InfluxDB IOx – The Future Core of InfluxDB Built with Rust and Arrow的文章,介绍了一个新项目 InfluxDB IOx,InfluxDB 的下一代时序引擎。
接下来,我将连载对于InfluxDB IOx的源码解析过程,欢迎各位批评指正,联系方式见文章末尾。
上一章说到如何创建一个数据库,并且数据库的描述信息是如何保存的。详情见:https://my.oschina.net/u/3374539/blog/5025128
这一章记录一下,数据是如何写入并保存的,具体会分为两篇来写:
- 一篇介绍分区是如何完成的
- 一篇介绍具体的写入
说到数据写入,必然是需要能够连接到服务器。IOx
项目为提供了多种方式可以于服务器进行交互,分别是Grpc
和Http
基于这两种通信方式,又扩展支持了influxdb2_client
以及influxdb_iox_client
。
基于influxdb_iox_client
我写了一个数据写入及查询的示例来观测接口是如何组织的,代码如下:
#[tokio::main]
async fn main() {
{
let connection = Builder::default()
.build("http://127.0.0.1:8081")
.await
.unwrap();
write::Client::new(connection)
.write("a", r#"myMeasurement,tag1=value1,tag2=value2 fieldKey="123" 1556813561098000000"#)
.await
.expect("failed to write data");
}
let connection = Builder::default()
.build("http://127.0.0.1:8081")
.await
.unwrap();
let mut query = flight::Client::new(connection)
.perform_query("a", "select * from myMeasurement")
.await
.expect("query request should work");
let mut batches = vec![];
while let Some(data) = query.next().await.expect("valid batches") {
batches.push(data);
}
let format1 = format::QueryOutputFormat::Pretty;
println!("{}", format1.format(&batches).unwrap());
}
+------------+--------+--------+-------------------------+
| fieldKey | tag1 | tag2 | time |
+------------+--------+--------+-------------------------+
| 123 | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123 | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| fieldValue | value1 | value2 | 2019-05-02 16:12:41.098 |
| 123 | value1 | value2 | 2019-05-02 16:12:41.098 |
+------------+--------+--------+-------------------------+
因为我多运行了几次,所以能看到数据被重复插入了。
这里还需要说一下的是写入的语句格式可以参见:
[LineProtocol] https://docs.influxdata.com/influxdb/v2.0/reference/syntax/line-protocol/#data-types-and-format
write::Client
中的write
方法生成了一个WriteRequest
结构,并使用RPC
调用远程的write
方法。打开src/influxdb_ioxd/rpc/write.rs : 22行
可以看到方法的具体实现。
async fn write(
&self,
request: tonic::Request<WriteRequest>,
) -> Result<tonic::Response<WriteResponse>, tonic::Status> {
let request = request.into_inner();
//得到上面在客户端中写入的数据库名字,在上面的例子中传入的"a"
let db_name = request.db_name;
//这里得到了写入的LineProtocol
let lp_data = request.lp_data;
let lp_chars = lp_data.len();
//解析LineProtocol的内容
//示例中的lp会被解析为:
//measurement: "myMeasurement"
//tag_set: [("tag1", "value1"), ("tag2", "value2")]
//field_set: [("fieldKey", "123")]
//timestamp: 1556813561098000000
let lines = parse_lines(&lp_data)
.collect::<Result<Vec<_>, influxdb_line_protocol::Error>>()
.map_err(|e| FieldViolation {
field: "lp_data".into(),
description: format!("Invalid Line Protocol: {}", e),
})?;
let lp_line_count = lines.len();
debug!(%db_name, %lp_chars, lp_line_count, "Writing lines into database");
//对数据进行保存
self.server
.write_lines(&db_name, &lines)
.await
.map_err(default_server_error_handler)?;
//返回成功
let lines_written = lp_line_count as u64;
Ok(Response::new(WriteResponse { lines_written }))
}
继续看self.server.write_lines
的执行:
pub async fn write_lines(&self, db_name: &str, lines: &[ParsedLine<'_>]) -> Result<()> {
self.require_id()?;
//验证一下名字,然后拿到之前创建数据库时候在内存中存储的相关信息
let db_name = DatabaseName::new(db_name).context(InvalidDatabaseName)?;
let db = self
.config
.db(&db_name)
.context(DatabaseNotFound { db_name: &*db_name })?;
//这里就开始执行分片相关的策略
let (sharded_entries, shards) = {
//读取创建数据库时候配置的分片策略
let rules = db.rules.read();
let shard_config = &rules.shard_config;
//根据数据和shard策略,把逐个数据对应的分区找到
//写入到一个List<分区标识,List<数据>>这样的结构中
//具体的结构信息后面看
let sharded_entries = lines_to_sharded_entries(lines, shard_config.as_ref(), &*rules)
.context(LineConversion)?;
//再把所有分区的配置返回给调用者
let shards = shard_config
.as_ref()
.map(|cfg| Arc::clone(&cfg.shards))
.unwrap_or_default();
(sharded_entries, shards)
};
//根据上面返回的集合进行map方法遍历,写到每个分区中
futures_util::future::try_join_all(
sharded_entries
.into_iter()
.map(|e| self.write_sharded_entry(&db_name, &db, Arc::clone(&shards), e)),
)
.await?;
Ok(())
}
这里描述了写入一条数据的主逻辑:数据写入的时候,先把数据划分到具体的分区里(使用List结构存储下所有的分区对应的数据),然后并行的进行数据写入
接下来看,数据是如何进行分区的:
pub fn lines_to_sharded_entries(
lines: &[ParsedLine<'_>],
sharder: Option<&impl Sharder>,
partitioner: &impl Partitioner,
) -> Result<Vec<ShardedEntry>> {
let default_time = Utc::now();
let mut sharded_lines = BTreeMap::new();
//对所有要插入的数据进行遍历
for line in lines {
//先找到符合哪个shard
let shard_id = match &sharder {
Some(s) => Some(s.shard(line).context(GeneratingShardId)?),
None => None,
};
//再判断属于哪个分区
let partition_key = partitioner
.partition_key(line, &default_time)
.context(GeneratingPartitionKey)?;
let table = line.series.measurement.as_str();
//最后存储到一个map中
//shard-> partition -> table -> List<data> 的映射关系
sharded_lines
.entry(shard_id)
.or_insert_with(BTreeMap::new)
.entry(partition_key)
.or_insert_with(BTreeMap::new)
.entry(table)
.or_insert_with(Vec::new)
.push(line);
}
let default_time = Utc::now();
//最后遍历这个map 转换到之前提到的List结构中
let sharded_entries = sharded_lines
.into_iter()
.map(|(shard_id, partitions)| build_sharded_entry(shard_id, partitions, &default_time))
.collect::<Result<Vec<_>>>()?;
Ok(sharded_entries)
}
这里理解shard
的概念就是一个或者一组机器,称为一个shard
,他们负责真正的存储数据。
partition
理解为一个个文件夹,在shard
上具体的存储路径。
这里看一下是怎样完成shard
的划分的:
impl Sharder for ShardConfig {
fn shard(&self, line: &ParsedLine<'_>) -> Result<ShardId, Error> {
if let Some(specific_targets) = &self.specific_targets {
//如果对数据进行匹配,如果符合规则就返回,可以采用当前的shard
//官方的代码中只实现了根据表名进行shard的策略
//这个配置似乎只能通过grpc来进行设置,这样好处可能是将来有个什么管理界面能动态修改
if specific_targets.matcher.match_line(line) {
return Ok(specific_targets.shard);
}
}
//如果没有配置就使用hash的方式
//对整条数据进行hash,然后比较机器的hash,找到合适的节点
//如果没找到,就放在hashring的第一个节点
//hash算法见后面
if let Some(hash_ring) = &self.hash_ring {
return hash_ring
.shards
.find(LineHasher { line, hash_ring })
.context(NoShardsDefined);
}
NoShardingRuleMatches {
line: line.to_string(),
}
.fail()
}
}
//具体的Hash算法,如果全配置的话分的就会特别散,几乎不同测点都放到了不同的地方
impl<'a, 'b, 'c> Hash for LineHasher<'a, 'b, 'c> {
fn hash<H: Hasher>(&self, state: &mut H) {
//如果配置了使用table名字就在hash中加入tablename
if self.hash_ring.table_name {
self.line.series.measurement.hash(state);
}
//然后按照配置的列的值进行hash
for column in &self.hash_ring.columns {
if let Some(tag_value) = self.line.tag_value(column) {
tag_value.hash(state);
} else if let Some(field_value) = self.line.field_value(column) {
field_value.to_string().hash(state);t
}
state.write_u8(0); // column separator
}
}
}
接下来看默认的partition
分区方式:
impl Partitioner for PartitionTemplate {
fn partition_key(&self, line: &ParsedLine<'_>, default_time: &DateTime<Utc>) -> Result<String> {
let parts: Vec<_> = self
.parts
.iter()
//匹配分区策略,或者是单一的,或者是复合的
//目前支持基于表、值、时间
//其余还会支持正则表达式和strftime模式
.map(|p| match p {
TemplatePart::Table => line.series.measurement.to_string(),
TemplatePart::Column(column) => match line.tag_value(&column) {
Some(v) => format!("{}_{}", column, v),
None => match line.field_value(&column) {
Some(v) => format!("{}_{}", column, v),
None => "".to_string(),
},
},
TemplatePart::TimeFormat(format) => match line.timestamp {
Some(t) => Utc.timestamp_nanos(t).format(&format).to_string(),
None => default_time.format(&format).to_string(),
},
_ => unimplemented!(),
})
.collect();
//最后返回一个组合文件名,或者是 a-b-c 或者是一个单一的值
Ok(parts.join("-"))
}
}
到这里分区的工作就完成了,下一篇继续分析是怎样写入的。
祝玩儿的开心
欢迎关注微信公众号:
或添加微信好友: liutaohua001