Sökresultat för

Asynkrona operationer med coroutines

38 minuter i lästid
Jens Riboe
Jens Riboe
Senior/Expert Software Developer
Asynkrona operationer med coroutines

C++-coroutines ger ett elegant sätt att skriva asynkron kod med ett synkront kontrollflöde – men standardbiblioteket lämnar dig i praktiken utan ett färdigt runtime-system. I den här artikeln bygger vi därför ett minimalt, men robust, runtime som kan köra många coroutines samtidigt, återuppta dem på main-thread och samtidigt låta fil-I/O ske på worker-threads i en thread-pool. Resultatet blir en radvis asynkron filläsning via co_await, utan att huvudprogrammet behöver blockera.

I min förra artikel implementerade jag en coroutine-baserad lösning runt en funktion som läste in hela innehållet i en textfil och returnerade detta som en std::string. Själva läsningen utfördes av en separat worker-thread i konventionellt blockerande läge (synkront). Resultatet överfördes sedan till den task-coroutine som var suspenderad. Exekveringen av denna coroutine fortsatte sedan på worker-thread, i stället för att återupptas på main-thread (huvudtråden).

Tankegången var att visa "minsta möjliga kod" för att skriva ett enkelt coroutine-baserat program som utför en asynkron fil-läsning och hanterar resultatet utan att blockera huvudprogrammet. Bara för en sådan enkel uppgift, blev det en hel del programkod. Detta eftersom C++-standardbiblioteket helt saknar support/runtime-system för asynkrona operationer med hjälp av coroutines.

I denna artikel kommer jag att presentera en mer komplett och robust lösning för att hantera asynkrona operationer med hjälp av coroutines i C++. Lösningen bygger på föregående artikel, men introducerar fler avancerade tekniker och mönster för att hantera flera asynkrona operationer samtidigt och hantera dem på ett effektivt sätt.

Här listar jag skillnader och tillägg, jämfört med programkoden i förra artikeln:

  • Läser radvis via co_await, i stället för hela filen på en gång och returnera via co_return.
  • Programmet har flera coroutines igång samtidigt, där de läser var sin textfil.
  • Task-coroutines hanteras av en scheduler så att exekveringen kan återupptas på main-thread, i stället för worker-thread.
  • Läsoperationer postas som små uppgifter (tasks) till en thread-pool.

För att undvika missförstånd, så vill jag klart och tydligt kommunicera att både den synkrona och den asynkrona lösningen i denna artikel inte på något sätt avser att vara prestandamässiga, utan det primära syftet är att illustrera hur man kan bygga upp ett eget runtime-system för asynkrona operationer.


Synkront, fler-trådat program

Låt oss — precis som med förra artikeln — börja med en synkron och fler-trådad lösning, vilken tjänar som referensram för vad vi avser att åstadkomma med coroutines. Programmet utför följande:

  • Skannar en fil-katalog och selekterar textfiler
  • Skapar en task-thread per textfil
  • Varje task räknar rader, ord och tecken, samt överför resultatet via en promise
  • Huvudprogrammet håller reda på en future för respektive task
  • Resultatet hämtas från varje future och skrivs ut

Huvudprogram

Så här ser main() ut:

struct Task {
    fs::path filename{};
    std::promise<Count> result{};
    explicit Task(fs::path filename_) : filename{std::move(filename_)} {}

    void operator()() {
        auto cnt = Count{filename};
        cnt.count();
        result.set_value(cnt);
    }
};

int main(int argc, char* argv[]) {
    auto dir = argc > 1 ? fs::path{argv[1]} : fs::path{"../../data"};
    if (not fs::is_directory(dir)) throw std::invalid_argument{"not a directory: " + dir.string()};

    auto workers = std::vector<std::jthread>{};
    auto results = std::vector<std::future<Count>>{};

    for (auto e : fs::directory_iterator{dir}) {
        if (not e.is_regular_file()) continue;
        if (e.path().extension().string() != ".txt") continue;

        auto task = Task{e.path()};
        results.push_back(task.result.get_future());
        workers.emplace_back(std::jthread{std::move(task)});
    }

    std::println("#) {}", Count::header());
    auto fileno = 1U;
    for (auto& f : results) {
        auto cnt = f.get();
        std::println("{}) {}", fileno++, cnt.to_string());
    }
}

När vi kompilerar och kör programmet kan det se ut så här:

C++> g++ -std=c++23 -Wall -I sync-thread/lib sync-thread/lib/file-reader.cxx \
     sync-thread/app/read-several-files.cxx -o build/read-files
C++> ./build/read-files ./data
#) filename                  lines   words    chars
1) crime-and-punishment.txt  22444  212066  1179323
2) jekyll-and-hyde.txt        2959   29123   161037
3) edgar-allan-poe.txt        9952   94865   596569
4) the-trial.txt              7073   89530   469798
C++> 

Class Count

Klassen Count hanterar att räkna rader, ord och tecken för en enstaka textfil. Så här ser denna klass ut:

namespace fs = std::filesystem;
namespace io = ribomation::io;

class Count {
    fs::path filename{};
    unsigned lines{};
    unsigned words{};
    unsigned chars{};

public:
    explicit Count(fs::path filename_) : filename{std::move(filename_)} {}

    void count() {
        auto reader = io::FileReader{filename};
        while (auto line = reader.next_line()) {
            ++lines;
            words += wordsOf(*line);
            chars += charsOf(*line);
        }
    }

    auto to_string() const -> std::string {
        return std::format("{:25} {:5} {:7} {:8}",
                           filename.filename().string(), lines, words, chars);
    }

    static auto header() -> std::string {
        return std::format("{:25} {:5} {:>7} {:>8}", "filename", "lines", "words", "chars");
    }

private:
    auto charsOf(std::string const& line) -> unsigned {
        return static_cast<unsigned>(line.size());
    }

    auto wordsOf(std::string const& line) -> unsigned {
        auto is_letter = [&line](auto k) {
            return isalpha(static_cast<unsigned char>(line[k]));
        };

        auto result = 0U;
        auto k = std::size_t{0};
        do {
            for (; k < line.size() && not is_letter(k); ++k) {}
            if (k == line.size()) return result;
            ++result;
            for (; k < line.size() && is_letter(k); ++k) {}
        } while (k < line.size());

        return result;
    }
};

Class FileReader

Klassen FileReader liknar i funktionalitet det vi kommer att implementera med coroutines. Så här ser denna ut:

//file-reader.hxx
namespace ribomation::io {
    namespace fs = std::filesystem;

    class FileReader {
        std::ifstream file;
    public:
        explicit FileReader(fs::path const& filename) : file{filename} {
            if (not file) throw std::invalid_argument{"cannot open " + filename.string()};
        }
        auto next_line()  -> std::optional<std::string>;
    };
}

//file-reader.cxx
namespace ribomation::io {
    auto FileReader::next_line()  -> std::optional<std::string> {
        auto line = std::string{};
        line.reserve(256);
        
        if (std::getline(file, line)) return line;
        return std::nullopt;
    }
}

Asynkron radvis läsning av textfil

Den asynkrona versionen gör samma sak som den vi nyss gick igenom, med den stora skillnaden att applikationsprogrammet använder coroutines. Vidare, den stora skillnaden mot min förra artikel är att här visar jag hur man bygger ett enklare runtime-system för att köra ett godtyckligt antal coroutines, samt att I/O-anrop görs på en separat worker-thread tillhörande en thread-pool. Ett annat sätt att uttrycka samma sak är att applikationskoden i coroutines körs på main-thread, medan I/O-operationer körs på en worker-thread.

Coroutine runtime-system

Runtime-systemet består av två primära komponenter: en scheduler som exekverar coroutines och en task/thread-pool som exekverar I/O-operationer. Utöver dessa finns det en del stödkomponenter, samt applikations-orienterade datatyper.

  • CoroScheduler - hanterar/äger alla coroutines
  • TaskPool – pool med worker-threads som utför små I/O-operationer
  • CoroRuntime – behållare för de två ovanstående och API-fasad för applikationen
  • TaskCoroutine - interface-klass för våra coroutines (samma som i förra artikeln)
  • AsyncFileReader - applikations-orienterad datatyp
  • NextLineAwaitable - awaitable som hanteras av ovanstående
  • LivenessToken - stödkomponent som säkerställer att scheduler och task-pool kör så länge det behövs och avslutar när alla coroutines har kört klart
Coroutine runtime architecture

Huvudprogrammet

Det förtjänar att understrykas att vad som är asynkront i denna läsning är applikations programmet, genom att varje fil hanteras av en separat coroutine och att läsning av respektive textrad suspenderar anropade coroutine i väntan på att nästa textrad kan levereras.

Coroutine CountFile

Så här ser vår coroutine-typ ut. Den skapar ett io::AsyncFileReader objekt, vilket tillhandahåller metoden next_line(), som i sin tur returnerar ett awaitable-objekt i samband med att while-loopen gör co_await.

auto CountFile(fs::path filename, Results& res, io::CoroRuntime& rt) 
    -> io::TaskCoroutine<void> 
{
    auto reader = io::AsyncFileReader{filename, rt};
    auto cnt    = Count{filename};
    while (auto line = co_await reader.next_line()) {
        cnt.update(*line);
    }
    res.push(std::move(cnt));
    co_return;
}

Function main()

Main-funktionen tar en filkatalog som program-argument, skapar ett runtime objekt, samt skapar en coroutine per hittad textfil. Varje coroutine överförs till runtime via metoden spawn(). Metoden run() hos runtime kör alla coroutines tills de har terminerat. Efter detta sammanställs delresultaten, från results och skrivs ut. Så här ser main() ut.

int main(int argc, char* argv[]) {
    auto dir = argc > 1 ? fs::path{argv[1]} : fs::path{"../../data"};
    if (not fs::is_directory(dir)) 
        throw std::invalid_argument{"not a directory: " + dir.string()};

    auto runtime = io::CoroRuntime{};
    auto results = Results{};

    auto is_text_file = [](fs::path const& p) {
        return p.extension() == ".txt";
    };

    //find all textfiles and launch a coroutine for each
    for (auto e : fs::directory_iterator{dir}) {
        if (not e.is_regular_file()) continue;
        if (not is_text_file(e.path())) continue;

        runtime.spawn( CountFile(e.path(), results, runtime) );
    }

    runtime.run(); //execute all coroutines until they're done

    std::println("#) {}", Count::header());
    auto fileno = 1U;
    for (auto const& r : results.get()) {
        std::println("{}) {}", fileno++, r.to_string());
    }
}

Efter att ha byggt programmet, kan vi exekvera det och en utskrift kan se ut så här:

#) filename                  lines   words    chars
1) crime-and-punishment.txt  22444  212066  1179323
2) edgar-allan-poe.txt        9952   94865   596569
3) jekyll-and-hyde.txt        2959   29123   161037
4) the-trial.txt              7073   89530   469798

Class Count

Klassen Count är i stort sett samma som för det synkrona programmet och ser ut så här:

class Count {
    fs::path filename;
    unsigned lines = 0;
    unsigned words = 0;
    unsigned chars = 0;

public:
    explicit Count(fs::path const& filename_) : filename{filename_} {}

    void update(std::string const& line) {
        ++lines;
        words += words_of(line);
        chars += chars_of(line);
    }

    auto get_filename() const noexcept -> fs::path const& { return filename; }

    static auto header() -> std::string {
        return std::format("{:25} {:5} {:>7} {:>8}", 
                     "filename", "lines", "words", "chars");
    }

    auto to_string() const -> std::string {
        return std::format("{:25} {:5} {:7} {:8}",
                     filename.filename().string(), lines, words, chars);
    }

private:
    static auto words_of(std::string const& line) -> unsigned {
        auto is_letter = [&line](auto k) {
            return isalpha(static_cast<unsigned char>(line[k]));
        };

        auto result = 0U;
        auto k      = std::size_t{0};
        do {
            for (; k < line.size() && not is_letter(k); ++k) {}
            if (k == line.size()) return result;
            ++result;
            for (; k < line.size() && is_letter(k); ++k) {}
        } while (k < line.size());

        return result;
    }

    static auto chars_of(std::string const& line) -> unsigned {
        return static_cast<unsigned>(line.size());
    }
};

Class Results

Klassen Results används av respektive coroutine för att samla upp delresultaten och ser ut så här:

class Results {
    std::vector<Count> results{};
public:
    void push(Count cnt) {
        results.push_back(std::move(cnt));
    }
    
    auto get() -> std::vector<Count> {
        r::sort(results, [](auto& lhs, auto& rhs) {
            return lhs.get_filename().string() < rhs.get_filename().string();
        });
        return std::move(results);
    }
};

Runtime-systemet

Resten av artikeln diskuterar runtime- och stödsystem, vi behöver för att applikations programmet ska kunna fungera. I stora drag fungerar kontroll-flödet på följande vis:

  1. En coroutine (CountFile) skapas och registreras hos runtime via spawn(). I denna startas coroutinen som sen kör fram till första suspendering, vilket är co_await reader.next_line().
  2. Metoden next_line() returnerar ett NextLineAwaitable objekt.
  3. I metoden await_suspend() skapas en I/O task, som är ansvarig för att läsa nästa textrad från infilen. Detta task-lambda köas upp i task-pool, som en pool av worker-threads.
  4. Efter att task-lambdan läst en textrad, köar den upp anropande coroutinen hos schedulern, via metoden post(), så att denna kan vakna upp efter co_await och ta emot inläst textrad.
  5. Metoden run() hos runtime loopar så länge det finns coro:s att exekvera i schedulers interna readyq.
  6. Det finns två köer i systemet, dels readyq i scheduler, och dels tasks i task-pool. En vital frågeställning är "när har alla coro:s kört färdigt?". Det räcker inte med att titta på ena eller båda dessa köer, eftersom en coroutine eller task kanske exekverar, vilket de gör efter att tagits ut ur en kö, Av detta skäl hanterar vi en s.k. liveness-token, som inkrementerar eller dekrementerar ett atomiskt heltal. På så sätt kan systemet hålla reda på om det finns pågående arbete någonstans i systemet.

Följande bild sammanfattar flödet i systemet och vad jag just punktat upp här ovan.

Coroutine runtime architecture

Class TaskCoroutine<void>

Denna utgör coroutine interface-type och diskuterades i detalj i min förra artikel. Jag har bara ordagrant kopierat in koden från den artikelns programkod.

Class AsyncFileReader

Denna är ansvarig för att läsa radvis från en textfil. För att detta ska fungera med hjälp av co_await, så skapar metoden next_line() en awaitable, vilken tar hand om coroutinen-mekaniken. Så här ser klass-strukturen ut:

class AsyncFileReader {
    std::ifstream file{};
    CoroRuntime& runtime;
public:
    AsyncFileReader(fs::path filename, CoroRuntime& runtime_)
        : file{std::move(filename)}, runtime{runtime_} {
        if (not file) throw std::invalid_argument{"cannot open " + filename.string()};
    }

    class NextLineAwaitable {
        struct State {
            std::optional<std::string> line{};
            std::exception_ptr error{};
            bool eof = false;
        };
        std::shared_ptr<State> state = std::make_shared<State>();

        std::ifstream& file;
        CoroRuntime& runtime;
    public:
        NextLineAwaitable(std::ifstream& file_, CoroRuntime& runtime_) : file{file_}, runtime{runtime_} {}
        bool await_ready() noexcept { return false; }
        void await_suspend(std::coroutine_handle<> invoking_coro);
        auto await_resume() -> std::optional<std::string>;
    };

    auto next_line() -> NextLineAwaitable {
        return NextLineAwaitable{file, runtime};
    }
};

Metoden await_suspend() skapar en I/O task i form av en lambda och köar upp denna i task-pool. När denna task har läst en textrad, köar den upp anropande coroutinen i scheduler, så att denna kan ta emot textraden. Textraden eller ett eventuellt uppkommet fel hanteras av ett State-objekt via en std::shared_ptr eftersom tillståndsobjektet delas mellan task-objekt och awaitable-objekt.

void NextLineAwaitable::await_suspend(std::coroutine_handle<> invoking_coro) {
    auto token = runtime.make_token();

    auto task = [
            s = state,
            &input = file,
            &sched = runtime.scheduler(),
            invoking_coro,
            token = std::move(token)
        ] mutable {
        try {
            //this is where the magic I/O happens
            if (auto line = std::string{}; std::getline(input, line)) {
                s->line = std::move(line);
            } else {
                s->eof = true;
            }
        } catch (...) {
            s->error = std::current_exception();
        }

        sched.post(invoking_coro); //coro! wakie, wakie
    };

    runtime.task_pool().submit(std::move(task)); //read that pesky text line
}

Metoden await_resume() svarar för att överföra resultatet i samband med att anropande coro vaknar upp.

auto NextLineAwaitable::await_resume() -> std::optional<std::string> {
    if (state->error) std::rethrow_exception(state->error);
    if (state->eof) return std::nullopt;
    return std::move(state->line);
}

Class CoroScheduler

Schedulern håller reda vilka coroutine objekt som ska väckas upp. Det är en ganska liten klass, eftersom den hanteras av runtime-objektet.

class CoroScheduler {
    friend class CoroRuntime;
    mutable std::mutex  entry{};
    std::condition_variable not_empty{};
    std::queue<std::coroutine_handle<>> readyq{};
public:
    void post(std::coroutine_handle<> coro) {
        {
            auto _ = std::lock_guard{entry};
            readyq.push(coro);
        }
        not_empty.notify_one();
    }
private:
    bool is_empty() const { return readyq.empty(); }
};

Class TaskPool

Task-pool är en tämligen enkel thread-pool och håller reda på vilka I/O task objekt som ska exekveras. Det finns en task-kö och en bunt worker-threads. Dessa ligger i en loop och plockar ut en task för exekvering eller väntar på att det dyker upp ett nytt jobb.

class TaskPool {
    std::mutex entry{};
    std::condition_variable not_empty{};
    std::queue<std::move_only_function<void()>> tasks{};
    
    std::vector<std::thread> workers{};
    bool shutting_down = false;
public:
    explicit TaskPool(unsigned num_workers = std::thread::hardware_concurrency()) {
        if (num_workers == 0U) num_workers = 1U;
        workers.reserve(num_workers);
        for (auto k = 0U; k < num_workers; ++k)
            workers.emplace_back([this] { worker_loop(); });
    }
    ~TaskPool() {
        {
            auto _ = std::lock_guard{entry};
            shutting_down = true;
        }
        not_empty.notify_all();
        for (auto& w : workers) w.join();
    }

    template<typename TaskType>
    void submit(TaskType&& task) {
        {
            auto _ = std::lock_guard{entry};
            tasks.emplace(std::forward<TaskType>(task));
        }
        not_empty.notify_one();
    }
private:
    void worker_loop();
};

Varje worker-thread snurrar runt i följande loop:

void TaskPool::worker_loop() {
    while (true) {
        auto task = std::move_only_function<void()>{};

        { //wait for task queue to become non-empty
            auto guard = std::unique_lock{entry};
            not_empty.wait(guard, [this] {
                return shutting_down || not tasks.empty();
            });
            if (shutting_down && tasks.empty()) return;

            //grab one task
            task = std::move(tasks.front());
            tasks.pop();
        }

        task(); //run the task
    }
}

Class CoroRuntime

Klassen Runtime håller reda på de olika delarna och driver systemet framåt. Ett coroutine objekt registreras hos runtime via metoden spawn() och läggs in i en tabell (spawned). Metoden run() driver exekveringen framåt, genom att plocka ut coroutine objekt från scheduler och anropa dess resume() metod. Här följer klass-strukturen för runtime.

struct SpawnedEntry {
    LivenessToken token;
    TaskCoroutine<void> task;
};

class CoroRuntime {
    friend class LivenessToken;
    using SpawnedKeyType = void*;

    CoroScheduler sched{};
    TaskPool pool{};
    std::atomic<std::size_t> liveness_count{0};

    std::unordered_map<SpawnedKeyType, SpawnedEntry> spawned{};
    std::mutex entry{};

    bool should_wake() const {
        return not sched.is_empty() || liveness_count.load() == 0U;
    }
    bool no_more_work() const {
        return liveness_count.load() == 0U && sched.is_empty();
    }
public:
    explicit CoroRuntime(unsigned num_workers = std::thread::hardware_concurrency())
        : pool{num_workers} {}

    auto scheduler() -> CoroScheduler& { return sched; }
    auto task_pool() -> TaskPool& { return pool; }
    auto make_token() -> LivenessToken { return LivenessToken{this}; }

    void spawn(TaskCoroutine<void>&& task);
    void run();
private:
    void acquire_liveness() {
        liveness_count.fetch_add(1U, std::memory_order_relaxed);
    }
    void release_liveness() {
        liveness_count.fetch_sub(1U, std::memory_order_relaxed);
        sched.not_empty.notify_all();
    }
};

Metoden spawn() ser ut så här:

void CoroRuntime::spawn(TaskCoroutine<void>&& task) {
    auto h = task.get_handle();
    auto key = static_cast<SpawnedKeyType>(h.address());

    {
        auto _ = std::lock_guard{entry};
        auto token = LivenessToken{this};

        auto [iter, inserted] = spawned.emplace(key, 
                                SpawnedEntry{std::move(token), std::move(task)});
        if (not inserted) return;
    }

    if (h && not h.done()) {
        h.resume();
    }
    if (h.done()) {
        auto _ = std::lock_guard{entry};
        spawned.erase(key);
    }
}

Metoden run() ser ut så här:

void CoroRuntime::run() {
    while (true) {
        auto handle = std::coroutine_handle<>{};

        {
            auto guard = std::unique_lock{sched.entry};
            sched.not_empty.wait(guard, [this] { return should_wake(); });
            if (no_more_work()) return; //when liveness_count==0

            handle = sched.readyq.front();
            sched.readyq.pop();
        }

        handle.resume();
        if (handle.done()) {
            auto key = static_cast<SpawnedKeyType>(handle.address());
            auto _ = std::lock_guard{entry};
            spawned.erase(key);
        }
    }
}

Class LivenessToken

Ett liveness-token fungerar som ett RAII-objekt, genom att öka eller minska den räknare i runtime som håller reda på hur många aktiviteter som är på gång i systemet.

class LivenessToken {
    CoroRuntime* runtime = nullptr;
public:
    LivenessToken() = default;
    explicit LivenessToken(CoroRuntime* runtime);
    ~LivenessToken();

    LivenessToken(LivenessToken const&) = delete;
    auto operator=(LivenessToken const&) -> LivenessToken& = delete;

    LivenessToken(LivenessToken&& rhs) noexcept
        : runtime{std::exchange(rhs.runtime, nullptr)} {}

    auto operator=(LivenessToken&& rhs) noexcept -> LivenessToken& {
        if (this != &rhs) {
            this->~LivenessToken();
            runtime = std::exchange(rhs.runtime, nullptr);
        }
        return *this;
    }
};

inline LivenessToken::LivenessToken(CoroRuntime* runtime_) : runtime{runtime_} {
    runtime->acquire_liveness();
}

inline LivenessToken::~LivenessToken() {
    if (runtime != nullptr) runtime->release_liveness();
}

Sammanfattning

I den här artikeln har vi byggt ett litet, men komplett, runtime-system för asynkrona operationer med C++-coroutines:

  • Du har sett hur en scheduler kan återuppta coroutines på main-thread, så att applikationslogiken blir enkel och deterministisk.
  • Du har sett hur en thread-pool kan utföra I/O på worker-threads, och sedan posta tillbaka coroutine-handles för fortsatt exekvering.
  • Du har sett hur ett awaitable (här: NextLineAwaitable) kan kapsla in både I/O-task, felhantering och resultatöverföring.
  • Du har sett varför en “bara-titta-på-köerna”-strategi inte räcker, och hur en liveness-token gör att runtime kan avsluta korrekt.

Det viktigaste att ta med sig är att C++-coroutines är ett språkstöd för kontrollflöde – men att du ofta behöver bygga (eller välja) ett runtime-system som står för schemaläggning, väntelogik och integration mot I/O. När du väl har de byggstenarna på plats kan du skriva asynkron kod som är både tydlig och lätt att vidareutveckla.