[Rust] async/await: Future를 바닥부터 구현해보기

Rust의 동시성 매커니즘을 가리켜 "fearless concurrency(두려움 없는 동시성)"이라고 부르곤 한다.
안전하고 효율적으로 비동기 구조를 구현했다는 점을 자축하는 의미에서 사용하는 표현이다.

하지만 그 이면에는 생각보다 복잡한 구조가 숨어있고, low-level 개발자라면 시스템이 어떻게 동작하는지 알아야 한다.

이번 포스트에서는 Future를 직접 구현해보면서 Rust의 async/await을 이해해보는 시간을 갖도록 하겠다.




Future trait

Future 자체는 매우 간단한 구조를 갖고 있다.
완료 상태를 확인하는 poll이라는 메서드 하나뿐이다.

pub trait Future {
    type Output;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

저 Pin이니 context니 하는 것까지 파고들면 머리가 아파지지만.. 일단 최대한 단순하게 접근해보겠다.
Pin은 우선 Rust의 빡빡한 소유권 검사를 회피하는 용도로 감싼 박싱이고,
Context는 executor와 future가 서로 상호작용할 수 있게 해주는 오작교다.

Rust의 Executor를 통한 async/await은 기본적으로 다음과 같은 라이프사이클을 가진다.
tokio.rs 같은 Executor들이 그렇다.

하나씩 짚어보겠다.

async fn foo() 
{ 
   ...
}

foo().await; // 이 부분에서 발생하는 일
  1. await을 걸면 async/await 시스템은 내부적으로 poll 메서드를 호출한다. (보통 poll을 최초 호출하는 이 시점에서 Future 내부의 스레드가 시작된다. 다만 구현하기 나름인 부분이라, 아닐 수도 있다.)
  2. 최초 poll의 결과가 Ready라면 Ready를 통해 전달된 값을 await을 한 시점으로 return한다.
  3. 최초 poll의 결과가 Pending이라면, 아직 끝나지 않은 것이다. Executor는 리소스를 낭비하지 않기 위해, await 컨텍스트를 Sleep 상태로 전환한다.
  4. 내부의 Future 스레드에서 작업을 완료했다면, Context를 통해서 Executor에게 wake 명령을 보낸다.
  5. Executor가 wake 명령을 받았다면, 다시 일어나서 poll 메서드를 호출한다. 이 때 Ready가 오면 끝난 것으로 간주하고 Ready의 리턴값을 await을 한 시점으로 return한다.



SetTimeout 구현해보기

보통 자바스크립트에서 많이들 사용하는 SetTimeout을 Rust로 구현해보겠다. 시간 제한을 걸고, 시간이 지나면 특정 콜백을 수행하는 기능이다.

일단 최대한 단순하게 가겠다.
일반적인 async Executor들의 구조도 고려하지 않는다. 그러면 너무 복잡해진다...

우선 Future를 구현할 구조체를 먼저 정의했다.

당장 필요한 것은 많지 않다.
Future가 완료됐는지 여부를 관리할 상태값 필드를 하나 추가했고, 멀티스레드 간에도 문제없이 동작하도록 Atomic 타입으로 넣어놨다.

그리고 Future 생성 기능을 구현한다.

Future 객체가 생성되자마자 바로 스레드를 띄워서 타임아웃 동작을 수행하도록 했다.

그런데 tokio 같은 대중적인 Executor 구현체들은 보통 poll 시점에서 스레드를 띄우는 것을 선호하더라.
아마 기술적인 문제나 디자인적인 문제가 좀 있는 것 같은데, 바로 띄운다고 해서 대단한 문제가 생기지는 않는다.

그리고 Future 트레잇을 본격적으로 구현했다.

완료가 됐다면 Ready로 끝냈다고 알려주고, 아니라면 Pending을 반환한다.

전체 코드다.

use std::{
    future::Future,
    sync::{atomic::AtomicBool, Arc},
    thread,
};

#[derive(Clone)]
pub struct SetTimeout {
    pub done: Arc<AtomicBool>,
}

impl SetTimeout {
    pub fn new<F>(delay: usize, callback: F) -> Self
    where
        F: FnOnce() + Send + 'static,
    {
        let _done = Arc::new(AtomicBool::new(false));

        let done = _done.clone();
        thread::spawn(move || {
            thread::sleep(std::time::Duration::from_millis(delay as u64));
            callback();
            done.store(true, std::sync::atomic::Ordering::Relaxed);
        });

        SetTimeout {
            done: _done,
        }
    }
}

impl Future for SetTimeout {
    type Output = ();

    fn poll(
        self: std::pin::Pin<&mut Self>,
        _context: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        if self.done.load(std::sync::atomic::Ordering::Relaxed) {
            std::task::Poll::Ready(())
        } else {
            std::task::Poll::Pending
        }
    }
}

이러면 일단 간단하게나마 사용을 해볼 수는 있다.

2를 찍는 함수가 1000밀리초 후에 실행되게 했으니, 딱 기대한대로 동작했다.

그러면... 저 Future가 다 끝나기 전까지 기다리게 하려면 어떻게 해야할까?
한번 수동으로 무식하게 await을 해보자

윽.. 뭐가 이상한게 잔뜩 늘었다.
poll 메서드에는 context라는 것을 전달해줘야 하는데, 이게 요구사항이 좀 복잡하기 때문이다. 저것도 야매로 채워둔 것이다.

일단 잡스러운 걸 빼고 보면

이정도다.
Future를 Pin으로 고정하고, poll을 계속 호출해서 끝날때까지 무한루프를 돌도록 했다.

실제 Executor들은 저렇게 무식하게 무한루프를 돌리는 구조는 아니지만..

아무튼 실행하면 기대한대로 동작하긴 할 것이다.

아래는 테스트코드다.

// ...

use std::task::{Context, RawWakerVTable, Waker};

fn main() {
    const VTABLE: RawWakerVTable = RawWakerVTable::new(
        |s| std::task::RawWaker::new(s, &VTABLE),
        |s| println!("wake"),
        |s| println!("wake by ref"),
        |s| println!("drop"),
    );
    let raw_waker = std::task::RawWaker::new(std::ptr::null(), &VTABLE);
    let waker = unsafe { Waker::from_raw(raw_waker) };
    let mut context = Context::from_waker(&waker);

    let future_object = SetTimeout::new(1000, || {
        println!("2");
    });

    let mut pinned = std::pin::pin!(future_object);
    loop {
        if let std::task::Poll::Ready(_) = pinned.as_mut().poll(&mut context) {
            break;
        } else {
            continue;
        }
    }

    println!("1");

    loop {}
}



Context와 Waker

앞서 말했듯이, 일반적인 async Executor 구현들은 Sleep-Wake의 프로세스를 거친다고 했다.
위에서 우리는 그건 무시하고 구현을 했었다.

이번에는 Waker를 직접 구현해서 전체 프로세스를 완성하고, "await"을 사용할 수 있도록 재구성해보겠다.




Future에 Waker 적용

우선 기존 future 구현에 Waker를 위한 설정을 좀 추가해야 한다.

waker를 주고받을 상태 타입을 추가하고


그걸 메인 Future 구현체에 임베딩한다.


스레드가 끝났을때 waker로 알려주게끔 하고


polling한 시점에서 waker를 주입해주게 한다.

사실 여기까지는 어려울게 없다.
머리아픈건 저기에 context를 전달해주고, wake 이벤트도 받아주고, 이런저런 잡다한 노가다를 대신해줄 Executor를 구현하는 것이다.

아래는 여기까지의 future 코드다.

use std::{
    future::Future,
    sync::{atomic::AtomicBool, Arc, Mutex},
    thread,
};

pub struct SetTimeout {
    pub shared_state: Arc<Mutex<SharedState>>,
    pub done: Arc<AtomicBool>,
}

pub struct SharedState {
    pub waker: Option<Waker>,
}

impl SetTimeout {
    pub fn new<F>(delay: usize, callback: F) -> Self
    where
        F: FnOnce() + Send + 'static + Clone,
    {
        let _shared_state = Arc::new(Mutex::new(SharedState { waker: None }));
        let _done = Arc::new(AtomicBool::new(false));

        let done = _done.clone();
        let shared_state = _shared_state.clone();

        thread::spawn(move || {
            thread::sleep(std::time::Duration::from_millis(delay as u64));
            callback();
            done.store(true, std::sync::atomic::Ordering::Relaxed);

            if let Some(waker) = shared_state.lock().unwrap().waker.take() {
                // 아빠! 일어나!
                waker.wake();
            }
        });

        SetTimeout {
            done: _done ,
            shared_state: _shared_state,
        }
    }
}

impl Future for SetTimeout {
    type Output = ();

    fn poll(
        self: std::pin::Pin<&mut Self>,
        context: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Self::Output> {
        if self.done.load(std::sync::atomic::Ordering::Relaxed) {
            std::task::Poll::Ready(())
        } else {
            {
                let mut shared_state = self.shared_state.lock().unwrap();
                shared_state.waker = Some(context.waker().clone()); // 여기서 주입
            }

            std::task::Poll::Pending
        }
    }
}



Executor 구현해보기

직접 만들면서 tokio 같은 executor들이 어떤 식으로 구성되어있는지 한번 헤집어보자.
우선 futures 크레이트를 추가한다.

이게 없으면 보일러플레이트들이 어마무시하게 늘어나고 머리가 아파질 것이다.


적당히 import하고


우선 Executor는 Task를 받을 수 있는 queue의 형태로 구현한다.
동시에 여러 컨텍스트에서 await을 걸고 Task가 날라올 수 있으니, mpsc 채널을 사용했다.


Spawner는 Task Queue에 태스크를 집어넣어주는 역할을 한다.
tokio::spawn 같은 기능을 여기에서 구현하는 것이다.


그리고 task queue 초기화 함수다.

Arc에서 Waker가 생성되고 wake()가 호출되면 Arc의 복사본이 task channel로 전송된다.
그러면 executor는 task을 선택하고 polling해야 한다.
아래는 그에 대한 구현이다.

spawn은 그에 비해 구현이 간단하다.
future가 넘어오면 벅당히 박싱해서 새로운 Arc로 채널에 넘긴다.

Task는 future에 대한 래퍼 타입이다.

여기서 조금 복잡한 제어가 추가됐다.

Waker는 wake가 호출되면 다시 polling될 태스크를 예약하는 일을 담당한다. ArcWake trait을 구현하면 Task에 futures의 waker_ref를 써서 Arc를 Waker로 바꾸는 것이 가능해진다.

다시 Executor로 돌아가서 보면

이 호출을 위해 구현했다고 보면 된다.

아래는 executor에 대한 전체 코드다.

use futures::{
    future::{BoxFuture, FutureExt},
    task::{waker_ref, ArcWake},
};
use std::sync::mpsc::{sync_channel, Receiver, SyncSender};
use std::task::{Context, Waker};

/// executor는 채널에서 Task를 받아 실행하는 프로그램입니다.
pub struct Executor {
    ready_queue: Receiver<Arc<Task>>,
}

impl Executor {
    pub fn run(&self) {
        while let Ok(task) = self.ready_queue.recv() {
            // future를 꺼내와서 아직 완료되지 않은 경우, 완료를 재촉하기 위해 poll을 날립니다.
            let mut future_slot = task.future.lock().unwrap();
            if let Some(mut future) = future_slot.take() {
                // task 자체를 통해 LocalWaker를 만듭니다.
                let waker = waker_ref(&task);
                let context = &mut Context::from_waker(&*waker);
                // `BoxFuture<T>`는 `Pin<Box<dyn Future<Output = T> + Send + 'static>>`에 대한 alias
                if let std::task::Poll::Pending = future.as_mut().poll(context) {
                    // future 완료되지 않았다면 다시 task에 돌려둡니다.
                    *future_slot = Some(future);
                }
            }
        }
    }
}

/// Spawner는 새로운 future들을 task channel에 생성합니다.
#[derive(Clone)]
pub struct Spawner {
    task_sender: SyncSender<Arc<Task>>,
}

impl Spawner {
    fn spawn(&self, future: impl Future<Output = ()> + 'static + Send) {
        let future = future.boxed();
        let task = Arc::new(Task {
            future: Mutex::new(Some(future)),
            task_sender: self.task_sender.clone(),
        });
        self.task_sender.send(task).expect("too many tasks queued");
    }
}

fn new_executor_and_spawner() -> (Executor, Spawner) {
    // 한 번에 큐에 허용되는 최대 작업 수.
    // sync_channel을 단순하기 위해 만든거고, 상용 코드에서는 하지 않습니다.
    const MAX_QUEUED_TASKS: usize = 10_000;
    let (task_sender, ready_queue) = sync_channel(MAX_QUEUED_TASKS);
    (Executor { ready_queue }, Spawner { task_sender })
}

/// "Executor"에게 polling되기 위해 스스로를 reschedule할 수 있는 "Future"입니다.
pub struct Task {
    // 진행 중인 futures들.
    // Mutex는 싱글스레드라서 꼭 필요하지는 않습니다. 문법을 회피하기 위해 넣었고, 대신 UnsafeCell을 써도 됩니다.
    future: Mutex<Option<BoxFuture<'static, ()>>>,

    /// 스스로를 task queue에 넣는 Handle (다른 작업을 위해 일시중지할때 사용)
    task_sender: SyncSender<Arc<Task>>,
}

impl ArcWake for Task {
    fn wake_by_ref(arc_self: &Arc<Self>) {
        // 이 task를 다시 task channel로 전송하여 executor에 의해 다시 폴링되도록 'wake'를 구현한다.
        let cloned = arc_self.clone();
        arc_self
            .task_sender
            .send(cloned)
            .expect("too many tasks queued");
    }
}

그러면 이제 이런 식으로 사용할 수 있다.

fn main() {
    let (executor, spawner) = new_executor_and_spawner();

    spawner.spawn(async {
        let future = SetTimeout::new(1000, || {
            println!("2");
        });
        future.await;
        println!("1");
    });

    // "spawner"를 삭제하여 executor가 더 이상 실행할 작업이 없음을 알고 더 이상 들어오는 작업을 받지 않도록 합니다.
    drop(spawner);

    // 이 executor는 작업 큐가 비어 있을 때까지 실행됩니다. 이렇게하면 "howdy!"를 인쇄하고 일시 중지 한 다음 "done!"을 인쇄합니다.
    executor.run();
}

executor를 만들고 적당히 실행하도록 했다.
실제로 실행되는 시점은 .run()부터다.

이제 저렇게 돌리면

기대한 순서대로 await을 기다리고 동작할 것이다.

근데 매번 저렇게 보일러플레이트를 만드는 것도 꽤 불편한 짓이다.
그래서 보통 executor들은 block_on 같은 함수를 만들어서 쓰거나, 매크로로 흑마법을 써서 좀더 간단해 보이게끔 저걸 숨긴다.

이런식으로 말이다.


참조
https://bertptrs.nl/2023/04/27/how-does-async-rust-work.html
https://github.com/cfsamson/examples-futures/blob/master/src/main.rs
https://namsoocho.github.io/async-book/02_execution/03_wakeups.html
https://docs.rs/futures/latest/futures/task/trait.ArcWake.html
https://rust-lang.github.io/async-book/02_execution/04_executor.html