-
只是闲着图一乐写的,本人水平有限
-
对Linux外的平台无任何了解,如果有错误可以在评论区纠正
-
有使用AI润色,因为语言组织水平太烂了
-
标题是玩《Hentai Prison》的梗
线程(英语:thread)在计算机科学中,是将进程划分为两个或多个线程(实例)或子进程,由单处理器(单线程)或多处理器(多线程)或多核处理系统并发执行。
省流:线程是独立的控制流状态
(如果和你想的一致那么就不需要再看了,如果有异议欢迎交流)
软件线程
引入
现代操作系统需要运行各种各样的程序,为了管理这些程序的运行,操作系统提出了进程的抽象。每个进程都对应于一个运行的程序,有了这种抽象后,程序运行时可以认为自己独占了整个CPU,不需要考虑其他程序的运行状况,而物理资源的分配交给操作系统负责。具体实现上,操作系统通过上下文切换 (保存和恢复进程在运行时的状态)机制实现了进程的暂停、切换和恢复,从而实现了CPU资源的共享,为程序提供了虚拟化的环境。
随着计算机的发展,CPU的数量不断增加,应用程序的需求也越来越复杂。有些程序希望同时利用多个CPU来加速计算,但传统的进程由于独立的地址空间和较高的通信开销,难以高效实现这种并行。为了解决这个问题,操作系统引入了线程的概念。线程是比进程更轻量级的执行单位,同一进程内的多个线程共享进程的地址空间和大部分资源,使得线程之间的通信和切换开销更低。在多核对称多处理(SMP)架构下,多个线程可以被调度到不同的CPU核心上真正并行运行,从而显著提高程序的性能和资源利用率。通过线程,操作系统不仅提供了更灵活的并发模型,也让应用程序能够更充分地利用现代多核计算机的计算能力。
内核态
内核线程是什么?
Linux内核把进程称为任务(task),进程的虚拟地址空间分为用户虚拟地址空间和内核虚拟地址空间,所有进程共享内核虚拟地址空间,每个进程有独立的用户虚拟地址空间,这意味着进程其实是用户态的概念,不存在内核进程,内核只有内核线程,且无用户虚拟地址空间,来看看代码
struct task_struct {
unsigned int __state;
void *stack;
pid_t pid;
pid_t tgid;
struct mm_struct *mm;
struct mm_struct *active_mm;
......
}
进程的mm和active_mm指向同一个mm_struct,对于内核线程,mm是nullptr,当内核线程运行时,active_mm指向从进程借来的mm_struct
在内核中只有task这个概念,只有task才能被调度运行,而用户态是没有权限创建全新的task的,只能通过系统调用去clone当前进程的task,线程只是从进程的task中clone出来的另一个task,所以线程会和进程共享资源,所以进程和线程都是task只是某些属性不同
想创建内核线程的话可以写内核模块,代码运行在内核态
在高级编程语言中全部创建线程的行为都是调用了一个线程库,最终会调用到clone()这种系统调用上,常说的多线程模型指的就是用户态线程库的策略
用户态
pthreads
如果每个人都用clone去编写多线程程序那就太麻烦了,于是有了线程库,线程库为应用提供了创建,退出,合并等操作线程的接口。
线程库在用户态实现了线程控制块(TCB),这个是作为内核中task的扩展而存在的,编程语言中的threadlocal变量/线程本地存储(TLS)功能就是通过这个实现的,以x86-64为例(Linux平台),当一个线程被调度时,pthread会找到该线程TLS的起始地址,存入段寄存器FS中,当线程访问TLS中的变量时,会用FS中的值加上偏移量的方式获取变量。
实现用户态线程
(示例来自cfsamson的200行Rust代码实现绿色线程)
(不是线程库,不会用clone的,需要一点基础,非常简单)
(ABI相关的东西就不展开说了)
#![feature(naked_functions)]
use std::arch::{global_asm, naked_asm};
use std::sync::atomic::{AtomicUsize, Ordering};
/// Default stack size for green thread
const DEFAULT_STACK_SIZE: usize = 1024 * 1024 * 2;
/// Max threads for user tasks running
const MAX_THREADS: usize = 4;
/// Pointer to our runtime, we're only setting this variable on initialization
static mut RUNTIME: usize = 0;
/// Runtime schedule and switch threads
pub struct Runtime {
threads: Vec<Thread>,
// the id of the currently running thread
current: usize,
}
/// Green thread
struct Thread {
id: usize,
stack: Vec<u8>,
ctx: ThreadContext,
state: State,
}
/// Thread state
#[derive(PartialEq, Eq, Debug)]
enum State {
// ready to be assigned a task if needed
Available,
// running
Running,
// ready to move forward and resume execution
Ready,
}
#[derive(Debug, Default)]
#[repr(C)]
pub struct ThreadContext {
// return address
ra: u64,
// stack pointer
sp: u64,
// s0 - s11 (callee saved registers)
s0: u64,
s1: u64,
s2: u64,
s3: u64,
s4: u64,
s5: u64,
s6: u64,
s7: u64,
s8: u64,
s9: u64,
s10: u64,
s11: u64,
// task entry
entry: u64,
}
impl Thread {
fn new(id: usize) -> Self {
Thread {
id,
stack: vec![0_u8; DEFAULT_STACK_SIZE],
ctx: ThreadContext::default(),
state: State::Available,
}
}
fn new_with_state(id: usize, state: State) -> Self {
Thread {
id,
stack: vec![0_u8; DEFAULT_STACK_SIZE],
ctx: ThreadContext::default(),
state,
}
}
}
impl Runtime {
pub fn new() -> Self {
// Base thread is for runtime running
let base_thread_id = 0;
let base_thread = Thread::new_with_state(base_thread_id, State::Running);
// These threads are for user tasks running
let mut threads = vec![base_thread];
let mut available_threads = (1..MAX_THREADS + 1).map(|i| Thread::new(i)).collect();
threads.append(&mut available_threads);
Runtime {
threads,
current: base_thread_id,
}
}
/// This is cheating a bit, but we need a pointer to our Runtime
/// stored so we can call yield on it even if we don't have a
/// reference to it.
pub fn init(&self) {
unsafe {
let r_ptr: *const Runtime = self;
RUNTIME = r_ptr as usize;
}
}
/// Start running our `Runtime`. It will continually call `t_yield()` until
/// it returns false which means that there is no more work to do.
pub fn run(&mut self) {
while self.t_yield() {}
println!("All tasks finished!");
}
/// User tasks call this function to return and schedule a new thread to be run
fn t_return(&mut self) {
// Mark current thread available, so it can be assigned a new task
self.threads[self.current].state = State::Available;
self.t_schedule();
}
/// Suspend current thread and schedule a new thread to be run
fn t_yield(&mut self) -> bool {
// Mark current thread ready, so it can be scheduled again
self.threads[self.current].state = State::Ready;
self.t_schedule()
}
/// Schedule a new thread to be run
fn t_schedule(&mut self) -> bool {
let thread_count = self.threads.len();
// Find next ready thread
let mut pos = (self.current + 1) % thread_count;
while self.threads[pos].state != State::Ready {
pos = (pos + 1) % thread_count;
// If no other ready thread, means all user tasks finished
// so current thread must be base thread
if pos == self.current {
return false;
}
}
println!("RUNTIME: schedule next thread {} to be run", pos);
// Switch to a new thread
self.threads[pos].state = State::Running;
let old_pos = self.current;
self.current = pos;
unsafe {
switch(&mut self.threads[old_pos].ctx, &self.threads[pos].ctx);
}
true
}
/// Spawn a new task to be executed by a green thread in runtime
pub fn spawn(&mut self, f: fn()) {
let available = self
.threads
.iter_mut()
.find(|t| t.state == State::Available)
.expect("no available green thread.");
println!("RUNTIME: spawning task on green thread {}", available.id);
let size = available.stack.len();
unsafe {
let s_ptr = available.stack.as_mut_ptr().offset(size as isize);
// make sure our stack itself is 8 byte aligned,
// risc-v ld/sd instructions need address 8 byte alignment
let s_ptr = (s_ptr as usize & !7) as *mut u8;
available.ctx.ra = task_return as u64; // task return address
available.ctx.sp = s_ptr as u64; // stack pointer
available.ctx.entry = f as u64; // task entry address
}
available.state = State::Ready;
}
}
/// When user task completed, then will jump to this function to return
fn task_return() {
unsafe {
let rt_ptr = RUNTIME as *mut Runtime;
(*rt_ptr).t_return();
}
}
/// Call yield from an arbitrary place in user task code
pub fn r#yield() {
unsafe {
let rt_ptr = RUNTIME as *mut Runtime;
(*rt_ptr).t_yield();
};
}
/// We could use naked function to implement switch function
#[naked]
#[no_mangle]
unsafe extern "C" fn switch(old: *mut ThreadContext, new: *const ThreadContext) {
// a0: old, a1: new
naked_asm!(
"
sd ra, 0*8(a0)
sd sp, 1*8(a0)
sd s0, 2*8(a0)
sd s1, 3*8(a0)
sd s2, 4*8(a0)
sd s3, 5*8(a0)
sd s4, 6*8(a0)
sd s5, 7*8(a0)
sd s6, 8*8(a0)
sd s7, 9*8(a0)
sd s8, 10*8(a0)
sd s9, 11*8(a0)
sd s10, 12*8(a0)
sd s11, 13*8(a0)
// When user task scheduled for the second time,
// overwrite task entry address with the return address
sd ra, 14*8(a0)
ld ra, 0*8(a1)
ld sp, 1*8(a1)
ld s0, 2*8(a1)
ld s1, 3*8(a1)
ld s2, 4*8(a1)
ld s3, 5*8(a1)
ld s4, 6*8(a1)
ld s5, 7*8(a1)
ld s6, 8*8(a1)
ld s7, 9*8(a1)
ld s8, 10*8(a1)
ld s9, 11*8(a1)
ld s10, 12*8(a1)
ld s11, 13*8(a1)
// When user task scheduled for the first time, t0 will be task entry address.
// After that, t0 will be return address
ld t0, 14*8(a1)
// pseudo instruction, actually is jalr x0, 0(t0)
jr t0
"
);
}
static FINISHED_TASK_COUNT: AtomicUsize = AtomicUsize::new(0);
fn main() {
let mut runtime = Runtime::new();
runtime.init();
runtime.spawn(|| {
test_task(1);
});
runtime.spawn(|| {
test_task(2);
});
runtime.spawn(|| {
test_task(3);
});
runtime.run();
assert_eq!(FINISHED_TASK_COUNT.load(Ordering::SeqCst), 3);
}
fn test_task(task_id: usize) {
println!("TASK {} STARTING", task_id);
for i in 0..4 * task_id {
println!("task: {} counter: {}", task_id, i);
r#yield();
}
FINISHED_TASK_COUNT.fetch_add(1, Ordering::SeqCst);
println!("TASK {} FINISHED", task_id);
}
值得注意的点在于switch的逻辑以及spawn时线程的上下文的ra被设置为函数task_return的地址,当线程被调度时,上下文存储的ra会被恢复到寄存器上,线程的结束必然回到调度逻辑上且因为状态改变这个线程不会再被调度,这个线程始终走不到switch后返回true的逻辑
(看不懂的话需要熟悉下基础的汇编,示例为RISC-V)
协程
在我看来,只要是描述task的就都是标题所指的线程,自然也要讨论讨论协程了
实际上刚才实现的用户态线程就是有栈协程,他有整个调用栈。而无栈协程只是个根据状态选择不同分支的函数,没有调用栈,常说的无栈协程高性能就来自不需要调用栈节省内存以及协程上下文极小切换没什么开销(就是个状态而已)
(由于本人掌握的编程语言较少,只能讨论cpp和rust的无栈协程了)
换句话说,协程只是可以挂起和恢复的函数
函数只有2个行为:调用和返回,函数返回后,栈上所拥有的状态会被全部销毁,协程则可以挂起,协程挂起时可以保留协程上下文,恢复则恢复协程的上下文,协程的上下文取决于协程内的局部变量等,当协程像函数一样返回,协程也要被销毁
C++20 Coroutine
c++的协程展开后和以下代码的逻辑类似
(这是伪代码,实际ABI各厂商可以自由实现)
enum State {
Start,
YieldValue,
FinalSuspend,
Done
};
struct CoroutineStateMachine {
State current_state = Start;
int a = 1, b = 1; //
// promise_type的引用,协程的promise接口
promise_type& promise;
void resume() {
try {
switch (current_state) {
case Start:
// 执行 initial_suspend
if (promise.initial_suspend()) {
current_state = YieldValue;
return; // 挂起
}
// 进入协程主体
[[fallthrough]];
case YieldValue:
while (a < 1000000) {
// co_yield a
promise.yield_value(a);
current_state = YieldValue;
std::tie(a, b) = std::make_tuple(b, a + b);
return; // 挂起
}
// co_return
promise.return_void();
current_state = FinalSuspend;
[[fallthrough]];
case FinalSuspend:
// 执行 final_suspend
if (promise.final_suspend()) {
current_state = Done;
return; // 挂起
}
// 结束
[[fallthrough]];
case Done:
return; // 协程结束
}
} catch (...) {
// 异常处理
if (!promise.initial_await_resume_called()) {
promise.unhandled_exception();
}
}
}
};






2 条回复