error.rs
主要结构
JoinError:表示任务执行失败的错误类型,包含
repr
:错误的具体表示id
:任务的唯一标识符- 为
io::Error
实现了From<JoinError>
,方便类型转换,提升兼容性
Repr枚举:
Canceled
:表示任务被取消Panic
:表示任务因panic而终止,包含panic信息
join.rs
实现了Tokio异步运行时中的JoinHandle
类型,这是tokio任务系统的核心组件之一。
核心功能
JoinHandle<T>
是一个表示异步任务执行结果的句柄,主要功能包括:
- 任务结果获取:通过
await
获取任务的执行结果 - 任务取消:可以取消正在运行的任务
- 任务状态查询:检查任务是否已完成
- 任务ID获取:获取任务的唯一标识符
Future
实现
- 允许直接
await
JoinHandle
来等待任务完成 - 返回
Result<T, JoinError>
,其中T
是任务的返回类型
与其他组件的交互
- 与
RwaTask
交互来管理任务的状态 - 使用
Header
来访问任务的元数据 - 与运行时调度器协作来执行任务
core.rs
core.rs
是Tokio任务系统的核心实现文件,定义了任务的核心数据结构和底层操作。
Cell<T, S>
pub(crate) struct Cell<T: Future, S: 'static> {
/// Header field
pub(super) header: Header,
/// Scheduler-specific data
pub(super) scheduler: UnsafeCell<Option<S>>,
/// Either the future or output, depending on the execution stage.
pub(super) stage: CoreStage<T>,
}
- 任务的基本内存布局
- 包含任务头、调度器和任务阶段信息
- 使用
UnsafeCell
实现内部可变性
Header
pub(crate) struct Header {
pub(super) state: State,
pub(super) vtable: &'static Vtable,
pub(super) owner_id: UnsafeCell<Option<NonZeroU64>>,
pub(super) next_atomic: UnsafeCell<Option<NonNull<Header>>>,
pub(super) vtable_padding: *const (),
#[cfg(tokio_unstable)]
pub(crate) id: Id,
#[cfg(not(tokio_unstable))]
pub(super) id: Id,
// ...
}
- 任务头信息
- 包含任务状态、虚函数表、所有者ID等
- 使用
UnsafeCell
实现线程安全
Core<T, S>
pub(super) struct Core<T: Future, S: 'static> {
pub(super) stage: CoreStage<T>,
pub(super) scheduler: UnsafeCell<Option<S>>,
pub(super) task_id: Id,
// ...
}
- 任务核心逻辑
- 管理任务执行阶段和调度器
任务生命周期管理
任务阶段
enum Stage<T> {
Running(T), // 任务正在执行
Finished(super::Result<T::Output>), // 任务已完成
Consumed, // 任务结果已被消费
}
任务轮询
pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
let res = {
self.stage.stage.with_mut(|ptr| {
// 获取任务阶段的可变引用
let future = match unsafe { &mut *ptr } {
Stage::Running(future) => future,
_ => unreachable!("unexpected stage"),
};
// 轮询future
let future = unsafe { Pin::new_unchecked(future) };
let _guard = TaskIdGuard::enter(self.task_id);
future.poll(&mut cx)
})
};
//清理工作
if res.is_ready() {
self.drop_future_or_output();
}
res
}
任务输出管理
// 存储任务输出
pub(super) fn store_output(&self, output: super::Result<T::Output>) {
unsafe {
self.set_stage(Stage::Finished(output));
}
}
// 获取任务输出
pub(super) fn take_output(&self) -> super::Result<T::Output> {
use std::mem;
self.stage.stage.with_mut(|ptr| {
// Safety:: the caller ensures mutual exclusion to the field.
match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
Stage::Finished(output) => output,
_ => panic!("JoinHandle polled after completion"),
}
})
}
内存布局优化
文件开头的注释详细说明了不同架构下的缓存行大小优化:
#[cfg_attr(
any(
target_arch = "x86_64",
target_arch = "aarch64",
target_arch = "powerpc64",
),
repr(align(128)) // 128字节对齐
)]
任务ID管理
/// 在执行或删除 future 时设置和清除上下文中的任务 ID
pub(crate) struct TaskIdGuard {
parent_task_id: Option<Id>,
}
impl TaskIdGuard {
fn enter(id: Id) -> Self {
TaskIdGuard {
parent_task_id: context::set_current_task_id(Some(id)),
}
}
}
impl Drop for TaskIdGuard {
fn drop(&mut self) {
context::set_current_task_id(self.parent_task_id);
}
}
abort.rs
实现了AbortHandle
类型,这是Tokio任务系统中用于取消任务的核心组件。
AbortHandle
提供了以下主要功能:
- 任务取消:允许外部取消正在运行的异步任务
- 任务状态查询:检查任务是否已完成
- 任务标识:获取任务的唯一标识符
AbortHandle
是一个典型的所有权代理模式:
- 轻量级句柄:只包含必要的信息来引用任务
- 引用计数:使用
Arc
风格的引用计数管理生命周期 - 线程安全:内部使用原子操作保证线程安全
引用计数的实现
引用计数增加(Clone实现)
impl Clone for AbortHandle {
fn clone(&self) -> Self {
// 1. 增加引用计数
self.raw.ref_inc();
// 2. 创建新实例
Self::new(self.raw)
}
}
引用计数减少(Drop表现)
impl Drop for AbortHandle {
fn drop(&mut self) {
// 减少引用计数
self.raw.drop_abort_handle();
}
}
RawTask
内部实现了引用计数逻辑
// 伪代码,展示 RawTask 内部实现
struct RawTask {
ptr: NonNull<Header>,
}
impl RawTask {
// 增加引用计数
pub(crate) fn ref_inc(&self) {
// 1. 获取 Header 的可变引用
// 2. 原子地增加引用计数
let ref_count = self.header().ref_count.fetch_add(1, Ordering::Relaxed);
// 防止引用计数溢出
if ref_count > MAX_REF_COUNT {
std::process::abort();
}
}
// 减少引用计数
pub(crate) fn drop_abort_handle(&self) {
// 1. 原子地减少引用计数
let ref_count = self.header().ref_count.fetch_sub(1, Ordering::Release);
// 2. 如果这是最后一个引用,执行清理
if ref_count == 1 {
// 确保所有先前的写操作对其他线程可见
std::sync::atomic::fence(Ordering::Acquire);
// 释放资源
self.free();
}
}
// 获取 Header 的引用
fn header(&self) -> &Header {
unsafe { &*self.ptr.as_ptr() }
}
}
raw.rs
raw.rs
是Tokio任务系统的核心实现文件,它定义了底层RawTask
类型,负责管理任务内存和生命周期。
- 核心结构
RawTask
pub(super) struct RawTask {
ptr: NonNull<Header>,
}
- 轻量级包装,包含指向任务头的指针
- 实现了自定义的引用计数和生命周期管理
内存布局
+------------------+
| Header | <-- 包含状态、虚表等元数据
+------------------+
| Scheduler | <-- 调度器特定数据
+------------------+
| Future | <-- 用户提供的 Future
+------------------+
| Output | <-- Future 的输出
+------------------+
| Trailer | <-- 其他元数据(如 waker)
+------------------+
主要功能
pub(super) fn new<T, S>(
task: T,
scheduler: S,
id: Id,
_spawned_at: SpawnLocation,
) -> RawTask
- 分配内存并初始化任务
- 设置初始状态和虚表
- 返回
RawTask
实例
引用计数
pub(super) fn ref_inc(&self) {
let n = self.header().state.ref_inc();
if n > MAX_REF_COUNT {
// 处理溢出
}
}
pub(super) fn ref_dec(&self) {
if self.header().state.ref_dec() {
// 最后一个引用被丢弃
self.drop_reference();
}
}
内存分配
let ptr = Box::into_raw(Cell::<_, S>::new(
task,
scheduler,
State::new(),
id,
#[cfg(tokio_unstable)]
_spawned_at.0,
));
内存释放
unsafe fn drop_abort_handle_slow(self) {
// 释放资源
let ptr = self.ptr.as_ptr();
let _ = Box::from_raw(ptr.cast::<Cell<(), S>>());
}
- 管理任务的内存布局和生命周期
- 实现自定义的引用计数
- 提供与调度器的集成点
- 处理任务状态转换
- 提供线程安全的操作
// 1. 分配内存并创建 Box
let boxed = Box::new(MyData::new());
// 2. 转换为原始指针,放弃所有权
// 放弃所有权 = 讲值的所有权从Rust的所有权系统移除
// 明确放弃 Rust 对这块内存的所有权管理
// 调用者必须手动管理内存
let raw = Box::into_raw(boxed); // 不会自动释放
// 3. 使用原始指针
unsafe { (*raw).do_something() };
// 4. 手动释放内存
let _ = unsafe { Box::from_raw(raw) }; // 析构并释放内存
// 或者更安全地:
let _boxed = unsafe { Box::from_raw(raw) }; // 重新获得所有权
// _boxed 在这里被 drop,内存被释放
放弃所有权意味着:
- 无自动析构:Rust不会自动调用
drop
或释放内存 - 内存泄漏风险:如果忘记手动释放,会导致内存泄漏
- 安全性责任:调用者必须确保
- 指针有效(非空、未释放)
- 正确调用析构函数
- 正确释放内存
为什么需要放弃所有权?
- FFI交互:与C代码交互时需要使用原始指针
- 自引用结构:构建自引用或复杂数据结构
- 性能优化:避免引用计数的开销
- 生命周期扩展:需要比Rust生命周期系统更灵活的生命周期管理
state.rs
Tokio 异步运行时中任务状态管理的核心组件之一。它负责管理异步任务的生命周期状态,使用原子操作来确保线程安全。
主要功能
- 状态管理
- 使用
State
结构体封装了一个原子整数,表示任务的当前状态 - 通过
Snapshot
类型提供对状态的不可变快照访问
- 使用
- 状态转换
- 提供线程安全的状态转换方法,如
transition_to_running()
- 使用原子操作确保状态转换的线程安全性
- 提供线程安全的状态转换方法,如
- 引用计数:
- 管理任务的引用计数,跟踪任务的所有者数量
- 处理任务的创建和销毁
- 生命周期管理
- 跟踪任务是否完成、是否被取消
- 管理任务的通知状态
关键结构体
State.rs
- 包含一个
AtomicUsize
,以线程安全的方式存储状态 - 提供各种状态转换方法
- 包含一个
Snapshot
- 状态的不可变快照
- 提供查询方法检查状态
TransitionToRunning
等枚举- 表示状态转换的结果
waker.rs
是Tokio运行时中实现Waker功能的核心模块,负责异步任务的唤醒机制。以下是其主要功能和作用:
核心功能
- Waker实现:
- 提供了
WakerRef
结构体,是std::task::Waker
的轻量级包装 - 实现了
Deref
trait,使其可以透明地作为Waker
使用
- 提供了
- 性能优化
- 使用
ManuallyDrop
避免不必要的引用计数操作 - 通过
PhantomData
确保类型安全
- 使用
- 调试支持:
- 在启用
trace
特性时,提供详细的任务唤醒日志 - 记录任务ID和操作类型,便于调试和性能分析
- 在启用
关键组件
-
WakerRef
结构体- 封装了标准库的
Waker
- 使用
PhantomData
关联调度器类型S
- 封装了标准库的
-
Waker VTable
- 定义了Waker的行为
- clone_waker:克隆Waker
- drop_waker:释放Waker资源
- wake_by_val:消费Waker并唤醒任务
- wake_by_ref:不消费Waker的情况下唤醒任务
- 定义了Waker的行为
-
raw_waker
函数- 从
Header
创建原始Waker - 使用静态的
WAKER_VTABLE
定义Waker
的行为
- 从
在Tokio的作用
- 任务唤醒
- 当异步操作完成时,通过Waker通知执行器任务可以继续执行
- 支持
wake_by_val
和wake_by_ref
两种唤醒方式
- 性能优化
- 避免不必要的Waker克隆
- 减少引用计数操作
- 调试支持
- 提供详细的唤醒日志,帮助诊断死锁和性能问题
harness.rs
是Tokio任务执行的核心模块之一,它封装了任务执行的生命周期管理。这个文件主要处理任务的轮询和状态转换。
核心功能
- 任务轮询管理
- 提供了
poll_inner
方法,负责执行异步任务的轮询 - 处理任务从运行状态到空闲状态的转换
- 提供了
- 状态机转换
- 管理任务状态的转换,包括
- 运行中(Running)
- 空闲(Idle)
- 完成(Complete)
- 取消(Cancelled)
- 管理任务状态的转换,包括
- 生命周期控制
- 处理任务的创建、执行和销毁
- 管理任务的引用计数
主要组件
-
Harness
结构体- 封装了任务执行所需的所有状态和上下文
- 提供了操作任务生命周期的方法
-
状态转换
transition_to_running
将任务状态设置为运行中transition_to_idle
将任务状态设置为空闲- 处理各种转换结果(成功、通知、取消等)
-
Waker集成
- 创建
waker_ref
用于唤醒任务 - 构建
Context
传递给future的poll
方法
- 创建
-
任务调度:
- 作为任务调度的核心组件
- 与调度器协同工作,管理任务的运行