如何优雅地组织Rust项目中的异步代码?

原创
2023/04/21 17:24
阅读数 139

概要

很多使用过Async Rust的人都可能有过被其要求的约束所困扰的经历,例如,spawned task'static的要求,MutexGuard不能跨越.await,等等。克服这些约束需要仔细地设计代码结构,很可能会导致晦涩和嵌套的代码,这对开发人员和审查人员都是一种挑战。在这篇文章中,我将首先列出我在编写async Rust代码时的一些痛点。然后,我将指出我们真正需要异步代码的场景,并讨论为什么我们应该把异步和非异步代码分开。最后,我将展示我是如何在最近的一次Curp重构中实践这一原则的。

痛点:

Spawned Task必须是'static

在spawn一个新的async task的时候,编译器并不知道该task会被执行多久,可能很短暂,也可能会一直执行至程序运行结束。所以,编译器会要求该task所含的所有类型都拥有'static的生命周期。

这样的限制使得我们常常能在spawn前看到不少的clone代码。当然,这些代码从某种角度来讲可以帮助程序员更好地理清哪些变量的所有权是要被移交给新的task的,但同时,也会使得代码看上去很啰嗦,不够简洁。

let a_arc = Arc::clone(&a);
let b_arc = Arc::clone(&b);
tokio::spawn(async move {
    // ...
});

Send的变量的持有不可以跨越.await

这点限制背后的原因tokio的task并不是固定在一个线程上执行的,空闲线程会主动“偷取”忙碌线程的task,这就需要task可以被Send

请看下面一段代码:

let mut log_l = log.lock();
log_l.append(new_entry.clone());
broadcast(new_entry).await;

尝试编译后,会发现报错:log_l不能跨越.await点持有。

自然,为了使得拿着锁的critical section尽量地短,我们不需要拿着锁过.await点,所以我们在其中加一行放锁的代码:

let mut log_l = log.lock();
log_l.append(new_entry.clone());
drop(log_l);
broadcast(new_entry).await;

很可惜,还是不能通过编译,这是因为编译器目前只能通过计算代码Scope的方式来判断一个task是否可以被Send。如果说上一个痛点还有一定的好处,那么这个问题就纯粹来源于编译器的限制了。所以我们必须把代码改成这个样子:

{
    let mut log_w = log.write();
    log_w.append(new_entry.clone());
}
broadcast(new_entry).await;

如果一个函数中有需要拿多把锁,又有很多的异步调用,代码就会嵌套起来,变得复杂晦涩。 Side Note: 我们知道tokio自己有个异步的锁tokio::sync::Mutex,它是可以被hold过.await的。但要注意的是,大多数情况下,我们并不会需要异步锁,因为异步锁通常意味着拿着锁的critical section是会非常长的。所以,如果我们需要在异步代码中拿锁,不要不加思索地使用异步锁,事实上,在tokio官方文档中,也是更加建议使用同步锁的。

使用异步Rust的场景和组织方式

如果我们经常在项目开发中遇到上述问题,自然就会开始思考其产生的原因以及该怎样避免。我认为一个很重要的因素就是没有把async和非async的代码给分开,或者说,更本质的原因是我们没有在设计项目架构的时候将需要async的部分和不需要async的部分分开。所以接下来,我将梳理我们什么时候才能真正地用到Async Rust?

I/O

当我们进行比较耗时的I/O操作,我们不想让这些操作block住我们当前的线程。所以我们用异步I/O,当运行到await 的时候,I/O就可以到后台去做,让其它的task执行。

// .await will enable other scheduled tasks to progress
let mut file = File::create(“foo.txt”).await?;

file.write(b"some bytes”).await?;

后台任务

后台任务的task通常会伴随着一个channel的接收端出现。

tokio::spawn(async move {
    while let Some(job) = rx.recv().await {
        // ...
    }
};

并发任务

并发地spawn多个task可以更高效地利用多核处理器。

let chunks = data.chunks(data.len() / N_TASKS);
for chunk in chunks {
  tokio::spawn(work_on(chunk));
}

依赖等待

使用.await等待依赖。这种使用相对较少一些。

// wait for some event
event.listen().await;

// barrier
barrier.wait().await;

可以看到,使用Async代码的地方,主要集中在I/O、并发与后台任务。在开发之前,我们也不妨有意识地去分离项目中的async与sync部分:缩小Async部分的函数,将处理逻辑移动至普通函数中。将这两部分分离,不仅可以缓解文章开头所说的痛点,更可以帮我理清代码结构。

{
    let mut log_w = log.write();
    log_w.append(new_entry.clone());
    // ...
}
broadcast(new_entry).await;

// move the logic to another function instead

fn update_log(log: &mut Log, new_entry: Entry) {
    log.append(new_entry);
    // ...
}

update_log(&mut log.write(), new_entry.clone());
broadcast(new_entry).await;

关于Curp的一次大型重构

在重构之前,由于一次次的迭代,代码的可读性和结构变得越来越差。具体来说,由于我们有若干个带锁结构需要在curp server的各个部分中共享,而curp server的大部分函数又是async的,async和拿锁的代码混杂在一起,就导致了我们常常在开发过程中遇到上述痛点。

所以,我们重新调整了curp server的结构,将其分为了async部分的CurpNode和非async部分的RawCurpCurpNode包括了异步IO(接收,发送网络请求,数据持久化),后台任务(定时检查leader活性,leader在每个节点上复制数据、校准各follower);RawCurp可被视为一个状态机,它接收来自CurpNode的调用,并更新状态。如果RawCurp想要做一些异步操作(比如广播心跳),它就可以通过返回值让CurpNode去替它发请求。

举一个tick task的例子,在未refactor之前,由于我们不能LockGuard不能过.await点,以及有多逻辑分支的限制,不得不将代码组织成这样的一个形式:


    loop {
        let _now = ticker.tick().await;
        let task = {
            let state_c = Arc::clone(&state);
            let state_r = state.upgradable_read();
            if state_r.is_leader() {
                if state_r.needs_hb
                {
                    let resps = bcast_heartbeats(connects.clone(), state_r, rpc_timeout);
                    Either::Left(handle_heartbeat_responses(
                        resps,
                        state_c,
                        Arc::clone(&timeout),
                    ))
                } else {
                    continue;
                }
            } else {
                let mut state_w = RwLockUgradableReadGuard::upgrade(state_r);
                // ...
                let resps = bcast_votes(connects.clone(), state_r, rpc_timeout);
                Either::Right(handle_vote_responses(resps, state_c))
            }
        };
        task.await;
    }

在refactor之后,处理逻辑都被放在了RawCurp中,CurpNode中的代码就清晰多了:


loop {
    let _now = ticker.tick().await;
    let action = curp.tick();
    match action {
        TickAction::Heartbeat(hbs) => {
            Self::bcast_heartbeats(Arc::clone(&curp), &connects, hbs).await;
        }
        TickAction::Votes(votes) => {
            Self::bcast_votes(Arc::clone(&curp), &connects, votes).await;
        }
        TickAction::Nothing => {}
    }
}

我们的项目:Xline

Xline是一个用于元数据管理的分布式KV存储。以上为对Xline中使用的Curp共识协议的重构总结。 如果你想了解更多关于Xline的信息,请参考我们的Github:https://github.com/datenlord/Xline

达坦科技(DatenLord)专注下一代云计算——“天空计算”的基础设施技术,致力于拓宽云计算的边界。达坦科技打造的新一代开源跨云存储平台DatenLord,通过软硬件深度融合的方式打通云云壁垒,实现无限制跨云存储、跨云联通,建立海量异地、异构数据的统一存储访问机制,为云上应用提供高性能安全存储支持。以满足不同行业客户对海量数据跨云、跨数据中心高性能访问的需求。 公众号:达坦科技DatenLord 知乎账号: https://www.zhihu.com/org/da-tan-ke-ji B站: https://space.bilibili.com/2017027518

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
0 收藏
0
分享
返回顶部
顶部