Task阅读手记

error.rs

主要结构

JoinError:表示任务执行失败的错误类型,包含

  • repr:错误的具体表示
  • id:任务的唯一标识符
  • io::Error实现了From<JoinError>,方便类型转换,提升兼容性

Repr枚举:

  • Canceled:表示任务被取消
  • Panic:表示任务因panic而终止,包含panic信息

join.rs

实现了Tokio异步运行时中的JoinHandle类型,这是tokio任务系统的核心组件之一。

核心功能

JoinHandle<T>是一个表示异步任务执行结果的句柄,主要功能包括:

  1. 任务结果获取:通过await获取任务的执行结果
  2. 任务取消:可以取消正在运行的任务
  3. 任务状态查询:检查任务是否已完成
  4. 任务ID获取:获取任务的唯一标识符

Future实现

  • 允许直接await JoinHandle来等待任务完成
  • 返回Result<T, JoinError>,其中T是任务的返回类型

与其他组件的交互

  • RwaTask交互来管理任务的状态
  • 使用Header来访问任务的元数据
  • 与运行时调度器协作来执行任务

core.rs

core.rs是Tokio任务系统的核心实现文件,定义了任务的核心数据结构和底层操作。

Cell<T, S>

pub(crate) struct Cell<T: Future, S: 'static> {
    /// Header field
    pub(super) header: Header,

    /// Scheduler-specific data
    pub(super) scheduler: UnsafeCell<Option<S>>,

    /// Either the future or output, depending on the execution stage.
    pub(super) stage: CoreStage<T>,
}
  • 任务的基本内存布局
  • 包含任务头、调度器和任务阶段信息
  • 使用UnsafeCell实现内部可变性

Header

pub(crate) struct Header {
    pub(super) state: State,
    pub(super) vtable: &'static Vtable,
    pub(super) owner_id: UnsafeCell<Option<NonZeroU64>>,
    pub(super) next_atomic: UnsafeCell<Option<NonNull<Header>>>,
    pub(super) vtable_padding: *const (),
    #[cfg(tokio_unstable)]
    pub(crate) id: Id,
    #[cfg(not(tokio_unstable))]
    pub(super) id: Id,
    // ...
}
  • 任务头信息
  • 包含任务状态、虚函数表、所有者ID等
  • 使用UnsafeCell实现线程安全

Core<T, S>

pub(super) struct Core<T: Future, S: 'static> {
    pub(super) stage: CoreStage<T>,
    pub(super) scheduler: UnsafeCell<Option<S>>,
    pub(super) task_id: Id,
    // ...
}
  • 任务核心逻辑
  • 管理任务执行阶段和调度器
任务生命周期管理

任务阶段

enum Stage<T> {
    Running(T),      // 任务正在执行
    Finished(super::Result<T::Output>),  // 任务已完成
    Consumed,        // 任务结果已被消费
}
任务轮询
pub(super) fn poll(&self, mut cx: Context<'_>) -> Poll<T::Output> {
  let res = {
    self.stage.stage.with_mut(|ptr| {
      // 获取任务阶段的可变引用
      let future = match unsafe { &mut *ptr } {
        Stage::Running(future) => future,
        _ => unreachable!("unexpected stage"),
      };

      // 轮询future
      let future = unsafe { Pin::new_unchecked(future) };

      let _guard = TaskIdGuard::enter(self.task_id);
      future.poll(&mut cx)
    })
  };

  //清理工作
  if res.is_ready() {
    self.drop_future_or_output();
  }

  res
}

任务输出管理

// 存储任务输出
pub(super) fn store_output(&self, output: super::Result<T::Output>) {
  unsafe {
    self.set_stage(Stage::Finished(output));
  }
}

// 获取任务输出
pub(super) fn take_output(&self) -> super::Result<T::Output> {
  use std::mem;

  self.stage.stage.with_mut(|ptr| {
    // Safety:: the caller ensures mutual exclusion to the field.
    match mem::replace(unsafe { &mut *ptr }, Stage::Consumed) {
      Stage::Finished(output) => output,
      _ => panic!("JoinHandle polled after completion"),
    }
  })
}

内存布局优化

文件开头的注释详细说明了不同架构下的缓存行大小优化:

#[cfg_attr(
    any(
        target_arch = "x86_64",
        target_arch = "aarch64",
        target_arch = "powerpc64",
    ),
    repr(align(128))  // 128字节对齐
)]

任务ID管理

/// 在执行或删除 future 时设置和清除上下文中的任务 ID
pub(crate) struct TaskIdGuard {
    parent_task_id: Option<Id>,
}

impl TaskIdGuard {
    fn enter(id: Id) -> Self {
        TaskIdGuard {
            parent_task_id: context::set_current_task_id(Some(id)),
        }
    }
}

impl Drop for TaskIdGuard {
    fn drop(&mut self) {
        context::set_current_task_id(self.parent_task_id);
    }
}

abort.rs

实现了AbortHandle类型,这是Tokio任务系统中用于取消任务的核心组件。

AbortHandle提供了以下主要功能:

  • 任务取消:允许外部取消正在运行的异步任务
  • 任务状态查询:检查任务是否已完成
  • 任务标识:获取任务的唯一标识符

AbortHandle是一个典型的所有权代理模式:

  1. 轻量级句柄:只包含必要的信息来引用任务
  2. 引用计数:使用Arc风格的引用计数管理生命周期
  3. 线程安全:内部使用原子操作保证线程安全

引用计数的实现

引用计数增加(Clone实现)

impl Clone for AbortHandle {
    fn clone(&self) -> Self {
        // 1. 增加引用计数
        self.raw.ref_inc();
        // 2. 创建新实例
        Self::new(self.raw)
    }
}

引用计数减少(Drop表现)

impl Drop for AbortHandle {
    fn drop(&mut self) {
        // 减少引用计数
        self.raw.drop_abort_handle();
    }
}

RawTask内部实现了引用计数逻辑

// 伪代码,展示 RawTask 内部实现
struct RawTask {
    ptr: NonNull<Header>,
}

impl RawTask {
    // 增加引用计数
    pub(crate) fn ref_inc(&self) {
        // 1. 获取 Header 的可变引用
        // 2. 原子地增加引用计数
        let ref_count = self.header().ref_count.fetch_add(1, Ordering::Relaxed);
        
        // 防止引用计数溢出
        if ref_count > MAX_REF_COUNT {
            std::process::abort();
        }
    }
    
    // 减少引用计数
    pub(crate) fn drop_abort_handle(&self) {
        // 1. 原子地减少引用计数
        let ref_count = self.header().ref_count.fetch_sub(1, Ordering::Release);
        
        // 2. 如果这是最后一个引用,执行清理
        if ref_count == 1 {
            // 确保所有先前的写操作对其他线程可见
            std::sync::atomic::fence(Ordering::Acquire);
            
            // 释放资源
            self.free();
        }
    }
    
    // 获取 Header 的引用
    fn header(&self) -> &Header {
        unsafe { &*self.ptr.as_ptr() }
    }
}

raw.rs

raw.rs是Tokio任务系统的核心实现文件,它定义了底层RawTask类型,负责管理任务内存和生命周期。

  1. 核心结构

RawTask

pub(super) struct RawTask {
    ptr: NonNull<Header>,
}
  • 轻量级包装,包含指向任务头的指针
  • 实现了自定义的引用计数和生命周期管理

内存布局

+------------------+
|     Header       |  <-- 包含状态、虚表等元数据
+------------------+
|   Scheduler      |  <-- 调度器特定数据
+------------------+
|   Future         |  <-- 用户提供的 Future
+------------------+
|   Output         |  <-- Future 的输出
+------------------+
|    Trailer       |  <-- 其他元数据(如 waker
+------------------+

主要功能

pub(super) fn new<T, S>(
    task: T,
    scheduler: S,
    id: Id,
    _spawned_at: SpawnLocation,
) -> RawTask
  • 分配内存并初始化任务
  • 设置初始状态和虚表
  • 返回RawTask实例

引用计数

pub(super) fn ref_inc(&self) {
    let n = self.header().state.ref_inc();
    if n > MAX_REF_COUNT {
        // 处理溢出
    }
}

pub(super) fn ref_dec(&self) {
    if self.header().state.ref_dec() {
        // 最后一个引用被丢弃
        self.drop_reference();
    }
}

内存分配

let ptr = Box::into_raw(Cell::<_, S>::new(
    task,
    scheduler,
    State::new(),
    id,
    #[cfg(tokio_unstable)]
    _spawned_at.0,
));

内存释放

unsafe fn drop_abort_handle_slow(self) {
    // 释放资源
    let ptr = self.ptr.as_ptr();
    let _ = Box::from_raw(ptr.cast::<Cell<(), S>>());
}
  1. 管理任务的内存布局和生命周期
  2. 实现自定义的引用计数
  3. 提供与调度器的集成点
  4. 处理任务状态转换
  5. 提供线程安全的操作
// 1. 分配内存并创建 Box
let boxed = Box::new(MyData::new());

// 2. 转换为原始指针,放弃所有权
// 放弃所有权 = 讲值的所有权从Rust的所有权系统移除
// 明确放弃 Rust 对这块内存的所有权管理
// 调用者必须手动管理内存
let raw = Box::into_raw(boxed);  // 不会自动释放

// 3. 使用原始指针
unsafe { (*raw).do_something() };

// 4. 手动释放内存
let _ = unsafe { Box::from_raw(raw) };  // 析构并释放内存
// 或者更安全地:
let _boxed = unsafe { Box::from_raw(raw) };  // 重新获得所有权
// _boxed 在这里被 drop,内存被释放

放弃所有权意味着:

  1. 无自动析构:Rust不会自动调用drop或释放内存
  2. 内存泄漏风险:如果忘记手动释放,会导致内存泄漏
  3. 安全性责任:调用者必须确保
    1. 指针有效(非空、未释放)
    2. 正确调用析构函数
    3. 正确释放内存

为什么需要放弃所有权?

  1. FFI交互:与C代码交互时需要使用原始指针
  2. 自引用结构:构建自引用或复杂数据结构
  3. 性能优化:避免引用计数的开销
  4. 生命周期扩展:需要比Rust生命周期系统更灵活的生命周期管理

state.rs

Tokio 异步运行时中任务状态管理的核心组件之一。它负责管理异步任务的生命周期状态,使用原子操作来确保线程安全。

主要功能
  1. 状态管理
    • 使用State结构体封装了一个原子整数,表示任务的当前状态
    • 通过Snapshot类型提供对状态的不可变快照访问
  2. 状态转换
    • 提供线程安全的状态转换方法,如transition_to_running()
    • 使用原子操作确保状态转换的线程安全性
  3. 引用计数:
    • 管理任务的引用计数,跟踪任务的所有者数量
    • 处理任务的创建和销毁
  4. 生命周期管理
    • 跟踪任务是否完成、是否被取消
    • 管理任务的通知状态
关键结构体
  1. State.rs
    • 包含一个AtomicUsize,以线程安全的方式存储状态
    • 提供各种状态转换方法
  2. Snapshot
    • 状态的不可变快照
    • 提供查询方法检查状态
  3. TransitionToRunning等枚举
    • 表示状态转换的结果

waker.rs

是Tokio运行时中实现Waker功能的核心模块,负责异步任务的唤醒机制。以下是其主要功能和作用:

核心功能
  1. Waker实现:
    • 提供了WakerRef结构体,是std::task::Waker的轻量级包装
    • 实现了Dereftrait,使其可以透明地作为Waker使用
  2. 性能优化
    • 使用ManuallyDrop避免不必要的引用计数操作
    • 通过PhantomData确保类型安全
  3. 调试支持:
    • 在启用trace特性时,提供详细的任务唤醒日志
    • 记录任务ID和操作类型,便于调试和性能分析
关键组件
  1. WakerRef结构体

    • 封装了标准库的Waker
    • 使用PhantomData关联调度器类型S
  2. Waker VTable

    • 定义了Waker的行为
      • clone_waker:克隆Waker
      • drop_waker:释放Waker资源
      • wake_by_val:消费Waker并唤醒任务
      • wake_by_ref:不消费Waker的情况下唤醒任务
  3. raw_waker函数

    • Header创建原始Waker
    • 使用静态的WAKER_VTABLE定义Waker的行为
在Tokio的作用
  1. 任务唤醒
    • 当异步操作完成时,通过Waker通知执行器任务可以继续执行
    • 支持wake_by_valwake_by_ref两种唤醒方式
  2. 性能优化
    • 避免不必要的Waker克隆
    • 减少引用计数操作
  3. 调试支持
    • 提供详细的唤醒日志,帮助诊断死锁和性能问题

harness.rs

是Tokio任务执行的核心模块之一,它封装了任务执行的生命周期管理。这个文件主要处理任务的轮询和状态转换。

核心功能
  1. 任务轮询管理
    • 提供了poll_inner方法,负责执行异步任务的轮询
    • 处理任务从运行状态到空闲状态的转换
  2. 状态机转换
    • 管理任务状态的转换,包括
      • 运行中(Running)
      • 空闲(Idle)
      • 完成(Complete)
      • 取消(Cancelled)
  3. 生命周期控制
    • 处理任务的创建、执行和销毁
    • 管理任务的引用计数
主要组件
  1. Harness结构体

    • 封装了任务执行所需的所有状态和上下文
    • 提供了操作任务生命周期的方法
  2. 状态转换

    • transition_to_running将任务状态设置为运行中
    • transition_to_idle将任务状态设置为空闲
    • 处理各种转换结果(成功、通知、取消等)
  3. Waker集成

    • 创建waker_ref用于唤醒任务
    • 构建Context传递给future的poll方法
  4. 任务调度:

    • 作为任务调度的核心组件
    • 与调度器协同工作,管理任务的运行
使用 Hugo 构建
主题 StackJimmy 设计