踩坑 rust 的 partial copy 导致 metrics 丢失

原创
01/04 18:01
阅读数 21

在 RisingWave 的存储代码中,我们使用 RAII 的思想来对 LSM iterator 的 metrics 进行监控,从而避免在代码中忘记上报 metrics 而导致 metrics 丢失。在实现中,我们设计了一个 rust 的 struct MonitoredStateStoreIterStats 去收集 LSM iterator 的 metrics,去统计 iterator 中 key 的数量和长度,并为这个 struct 实现了 Drop,在这个 struct 被释放的时候把在本地收集的 metrics 上报 prometheus。通过这种方式,我们不需要在每次 iterator 使用完后都手动上报 metrics,从而避免了由于代码的疏忽导致忘记上报 metrics。

以下是一段简化过的代码。我们通过 try_stream 这个宏来封装一个 iterator 的 stream 来收集这个 stream 的 metrics。在返回的 stream 被释放时,stats 随着 stream 被释放,并调用其 drop 方法来上报收集到的 metrics。

pub struct MonitoredStateStoreIter<S> {
    inner: S,
    stats: MonitoredStateStoreIterStats,
}

struct MonitoredStateStoreIterStats {
    total_items: usize,
    total_size: usize,
    storage_metrics: Arc<MonitoredStorageMetrics>,
}

impl<S: StateStoreIterItemStream> MonitoredStateStoreIter<S> {
    #[try_stream(ok = StateStoreIterItem, error = StorageError)]
    async fn into_stream_inner(mut self) {
        let inner = self.inner;
        futures::pin_mut!(inner);
        while let Some((key, value)) = inner
            .try_next()
            .await
            .inspect_err(|e| error!("Failed in next: {:?}", e))?
        {
            self.stats.total_items += 1;
            self.stats.total_size += key.encoded_len() + value.len();
            yield (key, value);
        }
    }
}

impl Drop for MonitoredStateStoreIterStats {
    fn drop(&mut self) {
        self.storage_metrics
            .iter_item
            .observe(self.total_items as f64);
        self.storage_metrics
            .iter_size
            .observe(self.total_size as f64);
    }
}

然而,在使用过程中,我们遇到了上报的 metrics 全部为 0 的问题。

1. 最小复现

由于使用了 try_stream 宏来生成 stream,因此我们怀疑在 try_stream 生成的代码中有 bug 导致 metrics 丢失。于是我们用 cargo-expand 来将查看宏生成的代码。展开后的代码如下:

fn into_stream_inner(
    mut self,
) -> impl Stream<Item = StorageResult<StateStoreIterItem>
> {
    ::futures_async_stream::__private::try_stream::from_generator(static move |
        mut __task_context: ::futures_async_stream::__private::future::ResumeTy,
    | -> ::futures_async_stream::__private::Result<(), StorageError> {
        let (): () = {
            let inner = self.inner;
            let mut inner = inner;
            #[allow(unused_mut)]
                let mut inner = unsafe {
                ::pin_utils::core_reexport::pin::Pin::new_unchecked(
                    &mut inner,
                )
            };
            while let Some((key, value))
                = {
                let mut __pinned = inner.try_next();
                loop {
                    if let ::futures_async_stream::__private::Poll::Ready(
                        result,
                    ) = unsafe {
                        poll(Pin::as_mut(&mut __pinned), get_context(__task_context))
                    } {
                        break result;
                    }
                    __task_context = (yield ::futures_async_stream::__private::Poll::Pending);
                }
            }?
            {
                self.stats.total_items += 1;
                self.stats.total_size += key.encoded_len() + value.len();
                __task_context = (yield ::futures_async_stream::__private::Poll::Ready((
                    key,
                    value,
                )));
            }
        };
        #[allow(unreachable_code)]
        {
            return ::futures_async_stream::__private::Ok(());
            loop {
                __task_context = (yield ::futures_async_stream::__private::Poll::Pending);
            }
        }
    })
}

可以看到, try_stream 宏生成的代码中,包含了一个 rust generator 的闭包。闭包中收集和上报 metrics 的逻辑与原代码基本相同,按照我们对 rust 的理解,仍然不应该会出现 metrics 丢失的问题。因此我们怀疑是 rust 编译器中与 generator 相关的逻辑存在问题。在 rust playground 上,我们尝试了以下代码来对问题进行复现。

struct Stat {
    count: usize,
    vec: Vec<u8>,
}

impl Drop for Stat {
    fn drop(&mut self) {
        println!("count: {}", self.count);
    }
}

fn main() {

    let mut stat = Stat {
        count: 0,
        vec: Vec::new(),
    };

    let mut f = move || {
        stat.count += 1;
        1
    };

    println!("num: {}", f());
}

执行以后输出如下。

num: 1
count: 0

按照预期,输出的 num 和 count 应该都为1,因为在调用闭包 f 时 stat.count += 1被调用了,但是以上代码中遇到了和最开始同样的问题。因此以上代码可以作为我们问题的一个最小复现。

2. 问题分析

对以上代码进行分析的话,我们看到闭包 f 的代码中使用了 move,因此在闭包中使用过的对象的 ownership 应该都会转移到闭包中。而 struct Stats 实现了 Drop,因此 Stats是不可以 partial move 的,其必须作为一个整体被 move 进入闭包。而在闭包中执行了 stats.count += 1,因此 stats 中的 count 应该被置为 1。但是从程序的输出可以看到在 stats 被 drop 时,stats 的 count 是 0。

我们尝试将闭包 f 改为如下代码,来显式的将 stats 的 ownership 给 move 进闭包里。

let mut f = move || {
    let mut stat = stat;
    stat.count += 1;
    1
};

输出恢复正常。

num: 1
count: 1

我们再次尝试在闭包 f 中使用 stat 中的另一个字段 vec:

let mut f = move || {
    let _ = stat.vec.len();
    stat.count += 1;
    1
};

输出同样恢复正常。

num: 1
count: 1

可以看到,我们显式地将 stat 整个 move 进闭包,或者在闭包中使用类型为 vec 的字段,都会使得 stat 的 ownership 被 move 进闭包。

于是我们推测,尽管 stat 实现了自己的 drop 导致不能被 partial move,但是如果我们在 move 的闭包中只使用了 stat 中实现了 Copy 类型的字段,则这个字段的值会被 Copy 到闭包中,而闭包中对这个字段的修改只会作用于被 Copy 后的值,而原字段并不会改变。

3. 验证猜想

我们可以通过将以上代码编译成二进制代码后,对其汇编代码进行分析,从而验证我们的猜想。然而,编译后的汇编代码会过于复杂且晦涩难懂,同时编译器对其进行的一些优化也会改变原有的逻辑导致汇编代码难以理解。因此我们打算通过分析在编译过程中产生的 MIR 中间代码来对问题进行分析。在 rust playground 上可以十分方便地生成 MIR 代码。

首先我们对存在问题的最小复现代码生成 MIR,生成后与闭包相关的 MIR 如下。可以看到这个闭包确实只包含了一个类型为 usize 的字段,这个字段的值取的是 stat 中的 count 值。

bb1: {
    _1 = Stat { count: const 0_usize, vec: move _2 };
    _3 = {closure@src/main.rs:19:17: 19:24} { stat: (_1.0: usize) };
}

而我们对后续测试中有正常输出的代码生成 MIR,生成后与闭包相关的 MIR 如下。可以看到这个闭包将整个 stat 的 ownership 给 move 了进去。

bb1: {
  _1 = Stat { count: const 0_usize, vec: move _2 };
  _3 = {closure@src/main.rs:19:17: 19:24} { stat: move _1 };
}

于是,我们的猜想得到了验证,在我们出现问题的代码中,闭包确实没有捕获 stat 的 ownership。

4. 后续与总结

我们向 rust 社区反映了这个问题,得到的反馈是,这个是 rust 2021 后实现的一个 feature。在 rust 2021 中,一个使用了 move 的闭包在捕获一个 struct 的时候,会尽可能少地去捕获 struct 中的字段。

  • 如果一个 struct 没有实现 Drop,这意味着他里面的字段可以被分开 move,而闭包只会捕获闭包中用到的字段。
  • 如果某个被闭包使用的字段实现了 Copy,那他闭包并不会捕获这个字段的 ownership,而是将这个字段 copy 一份放在闭包中。
  • 如果一个 struct 实现了 Drop,那他里面的字段只能作为一个整体被捕获。但如果闭包只使用了这个闭包中实现了 Copy 的字段,那这个闭包不会捕获这个 struct,而是将使用到的字段 copy 一份。

我们的代码中,正是因为这个行为,导致我们的代码产生了歧义,而出现了 metrics 的丢失。

针对这个问题,我们认为有两个地方有提升的空间。

  • 首先, try_stream 这个宏的封装存在一定的问题。在使用宏来声明代码中,其暴露出来的使用方法是通过调用一个方法来生成 stream,而在调用方法时,如果参数是通过 move ownership 的形式传入的,同时在生成 stream 的代码中我们使用了这个参数,那我们应该认为这个 stream 包含了这个参数的 ownership。然而,由于这个宏在实现的时候使用了闭包,导致这个 stream 并没有包含这个参数的 ownership,从而导致问题。这个是宏封装逻辑时的问题。
  • 其次,rust 在语言设计上,由于引入了这个闭包捕获 ownership 的特殊逻辑,导致会写出有歧义的代码。例如,在上述代码中,很难想象 stat.count += 1 并没有去修改 stat 中的 count 值。我们也向 rust 社区反映了这个问题 

最后,回到我们最开始的问题中。要想解决 metrics 丢失的问题,在我们的代码中,我们只需要做以下修改就能让代码正常运行

#[try_stream(ok = StateStoreIterItem, error = StorageError)]
async fn into_stream_inner(mut self) {
    let inner = self.inner;
    ...
    self.stats.total_items += 1;
    self.stats.total_size += key.encoded_len() + value.len();
}

修改为

#[try_stream(ok = StateStoreIterItem, error = StorageError)]
async fn into_stream_inner(self) {
    let inner = self.inner;
    let mut stats = self.stats;
    ...
    stats.total_items += 1;
    stats.total_size += key.encoded_len() + value.len();
}

关于 RisingWave

RisingWave 是一款基于 Apache 2.0 协议开源的分布式流数据库,致力于降低流计算使用门槛。RisingWave 采用存算分离架构,实现了高效的复杂查询、瞬时动态扩缩容以及快速故障恢复,帮助用户轻松快速搭建稳定高效的流计算系统。使用 RisingWave 处理流数据的方式类似使用 PostgreSQL,通过创建实时物化视图,让用户能够轻松编写流计算逻辑,并通过访问物化视图来进行即时、一致的查询流计算结果。了解更多:

官网 risingwave.com

文档risingwave.dev

GitHubrisingwave.com/github

Slack risingwave.com/slack

B 站:RisingWave 中文开源社区

微信公众号:RisingWave中文开源社区

社区用户交流群:risingwave_assistant

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部