Asynkrona operationer med coroutines
C++-coroutines ger ett elegant sätt att skriva asynkron kod med ett synkront kontrollflöde – men standardbiblioteket lämnar dig
i praktiken utan ett färdigt runtime-system. I den här artikeln bygger vi därför ett minimalt, men robust, runtime som kan
köra många coroutines samtidigt, återuppta dem på main-thread och samtidigt låta fil-I/O ske på worker-threads i en thread-pool.
Resultatet blir en radvis asynkron filläsning via co_await, utan att huvudprogrammet behöver blockera.
I min förra artikel implementerade jag en coroutine-baserad lösning runt en funktion som
läste in hela innehållet i en textfil och returnerade detta som en std::string. Själva
läsningen utfördes av en separat worker-thread i konventionellt blockerande läge (synkront).
Resultatet överfördes sedan till den task-coroutine som var suspenderad. Exekveringen
av denna coroutine fortsatte sedan på worker-thread, i stället för att återupptas på main-thread (huvudtråden).
Tankegången var att visa "minsta möjliga kod" för att skriva ett enkelt coroutine-baserat program som utför en asynkron fil-läsning och hanterar resultatet utan att blockera huvudprogrammet. Bara för en sådan enkel uppgift, blev det en hel del programkod. Detta eftersom C++-standardbiblioteket helt saknar support/runtime-system för asynkrona operationer med hjälp av coroutines.
I denna artikel kommer jag att presentera en mer komplett och robust lösning för att hantera asynkrona operationer med hjälp av coroutines i C++. Lösningen bygger på föregående artikel, men introducerar fler avancerade tekniker och mönster för att hantera flera asynkrona operationer samtidigt och hantera dem på ett effektivt sätt.
Här listar jag skillnader och tillägg, jämfört med programkoden i förra artikeln:
- Läser radvis via
co_await, i stället för hela filen på en gång och returnera viaco_return. - Programmet har flera coroutines igång samtidigt, där de läser var sin textfil.
- Task-coroutines hanteras av en scheduler så att exekveringen kan återupptas på main-thread, i stället för worker-thread.
- Läsoperationer postas som små uppgifter (tasks) till en thread-pool.
För att undvika missförstånd, så vill jag klart och tydligt kommunicera att både den synkrona och den asynkrona lösningen i denna artikel inte på något sätt avser att vara prestandamässiga, utan det primära syftet är att illustrera hur man kan bygga upp ett eget runtime-system för asynkrona operationer.
Synkront, fler-trådat program
Låt oss — precis som med förra artikeln — börja med en synkron och fler-trådad lösning, vilken tjänar som referensram för vad vi avser att åstadkomma med coroutines. Programmet utför följande:
- Skannar en fil-katalog och selekterar textfiler
- Skapar en task-thread per textfil
- Varje task räknar rader, ord och tecken, samt överför resultatet via en promise
- Huvudprogrammet håller reda på en future för respektive task
- Resultatet hämtas från varje future och skrivs ut
Huvudprogram
Så här ser main() ut:
struct Task {
fs::path filename{};
std::promise<Count> result{};
explicit Task(fs::path filename_) : filename{std::move(filename_)} {}
void operator()() {
auto cnt = Count{filename};
cnt.count();
result.set_value(cnt);
}
};
int main(int argc, char* argv[]) {
auto dir = argc > 1 ? fs::path{argv[1]} : fs::path{"../../data"};
if (not fs::is_directory(dir)) throw std::invalid_argument{"not a directory: " + dir.string()};
auto workers = std::vector<std::jthread>{};
auto results = std::vector<std::future<Count>>{};
for (auto e : fs::directory_iterator{dir}) {
if (not e.is_regular_file()) continue;
if (e.path().extension().string() != ".txt") continue;
auto task = Task{e.path()};
results.push_back(task.result.get_future());
workers.emplace_back(std::jthread{std::move(task)});
}
std::println("#) {}", Count::header());
auto fileno = 1U;
for (auto& f : results) {
auto cnt = f.get();
std::println("{}) {}", fileno++, cnt.to_string());
}
}
När vi kompilerar och kör programmet kan det se ut så här:
C++> g++ -std=c++23 -Wall -I sync-thread/lib sync-thread/lib/file-reader.cxx \
sync-thread/app/read-several-files.cxx -o build/read-files
C++> ./build/read-files ./data
#) filename lines words chars
1) crime-and-punishment.txt 22444 212066 1179323
2) jekyll-and-hyde.txt 2959 29123 161037
3) edgar-allan-poe.txt 9952 94865 596569
4) the-trial.txt 7073 89530 469798
C++>
Class Count
Klassen Count hanterar att räkna rader, ord och tecken för en enstaka textfil.
Så här ser denna klass ut:
namespace fs = std::filesystem;
namespace io = ribomation::io;
class Count {
fs::path filename{};
unsigned lines{};
unsigned words{};
unsigned chars{};
public:
explicit Count(fs::path filename_) : filename{std::move(filename_)} {}
void count() {
auto reader = io::FileReader{filename};
while (auto line = reader.next_line()) {
++lines;
words += wordsOf(*line);
chars += charsOf(*line);
}
}
auto to_string() const -> std::string {
return std::format("{:25} {:5} {:7} {:8}",
filename.filename().string(), lines, words, chars);
}
static auto header() -> std::string {
return std::format("{:25} {:5} {:>7} {:>8}", "filename", "lines", "words", "chars");
}
private:
auto charsOf(std::string const& line) -> unsigned {
return static_cast<unsigned>(line.size());
}
auto wordsOf(std::string const& line) -> unsigned {
auto is_letter = [&line](auto k) {
return isalpha(static_cast<unsigned char>(line[k]));
};
auto result = 0U;
auto k = std::size_t{0};
do {
for (; k < line.size() && not is_letter(k); ++k) {}
if (k == line.size()) return result;
++result;
for (; k < line.size() && is_letter(k); ++k) {}
} while (k < line.size());
return result;
}
};
Class FileReader
Klassen FileReader liknar i funktionalitet det vi kommer att implementera med coroutines.
Så här ser denna ut:
//file-reader.hxx
namespace ribomation::io {
namespace fs = std::filesystem;
class FileReader {
std::ifstream file;
public:
explicit FileReader(fs::path const& filename) : file{filename} {
if (not file) throw std::invalid_argument{"cannot open " + filename.string()};
}
auto next_line() -> std::optional<std::string>;
};
}
//file-reader.cxx
namespace ribomation::io {
auto FileReader::next_line() -> std::optional<std::string> {
auto line = std::string{};
line.reserve(256);
if (std::getline(file, line)) return line;
return std::nullopt;
}
}
Asynkron radvis läsning av textfil
Den asynkrona versionen gör samma sak som den vi nyss gick igenom, med den stora skillnaden att applikationsprogrammet använder coroutines. Vidare, den stora skillnaden mot min förra artikel är att här visar jag hur man bygger ett enklare runtime-system för att köra ett godtyckligt antal coroutines, samt att I/O-anrop görs på en separat worker-thread tillhörande en thread-pool. Ett annat sätt att uttrycka samma sak är att applikationskoden i coroutines körs på main-thread, medan I/O-operationer körs på en worker-thread.
Coroutine runtime-system
Runtime-systemet består av två primära komponenter: en scheduler som exekverar coroutines och en task/thread-pool som exekverar I/O-operationer. Utöver dessa finns det en del stödkomponenter, samt applikations-orienterade datatyper.
CoroScheduler- hanterar/äger alla coroutinesTaskPool– pool med worker-threads som utför små I/O-operationerCoroRuntime– behållare för de två ovanstående och API-fasad för applikationenTaskCoroutine- interface-klass för våra coroutines (samma som i förra artikeln)AsyncFileReader- applikations-orienterad datatypNextLineAwaitable- awaitable som hanteras av ovanståendeLivenessToken- stödkomponent som säkerställer att scheduler och task-pool kör så länge det behövs och avslutar när alla coroutines har kört klart
Huvudprogrammet
Det förtjänar att understrykas att vad som är asynkront i denna läsning är applikations programmet, genom att varje fil hanteras av en separat coroutine och att läsning av respektive textrad suspenderar anropade coroutine i väntan på att nästa textrad kan levereras.
Coroutine CountFile
Så här ser vår coroutine-typ ut. Den skapar ett io::AsyncFileReader objekt, vilket
tillhandahåller metoden next_line(), som i sin tur returnerar ett awaitable-objekt
i samband med att while-loopen gör co_await.
auto CountFile(fs::path filename, Results& res, io::CoroRuntime& rt)
-> io::TaskCoroutine<void>
{
auto reader = io::AsyncFileReader{filename, rt};
auto cnt = Count{filename};
while (auto line = co_await reader.next_line()) {
cnt.update(*line);
}
res.push(std::move(cnt));
co_return;
}
Function main()
Main-funktionen tar en filkatalog som program-argument, skapar ett runtime objekt,
samt skapar en coroutine per hittad textfil. Varje coroutine överförs till runtime
via metoden spawn(). Metoden run() hos runtime kör alla coroutines tills de har
terminerat. Efter detta sammanställs delresultaten, från results och skrivs ut. Så
här ser main() ut.
int main(int argc, char* argv[]) {
auto dir = argc > 1 ? fs::path{argv[1]} : fs::path{"../../data"};
if (not fs::is_directory(dir))
throw std::invalid_argument{"not a directory: " + dir.string()};
auto runtime = io::CoroRuntime{};
auto results = Results{};
auto is_text_file = [](fs::path const& p) {
return p.extension() == ".txt";
};
//find all textfiles and launch a coroutine for each
for (auto e : fs::directory_iterator{dir}) {
if (not e.is_regular_file()) continue;
if (not is_text_file(e.path())) continue;
runtime.spawn( CountFile(e.path(), results, runtime) );
}
runtime.run(); //execute all coroutines until they're done
std::println("#) {}", Count::header());
auto fileno = 1U;
for (auto const& r : results.get()) {
std::println("{}) {}", fileno++, r.to_string());
}
}
Efter att ha byggt programmet, kan vi exekvera det och en utskrift kan se ut så här:
#) filename lines words chars
1) crime-and-punishment.txt 22444 212066 1179323
2) edgar-allan-poe.txt 9952 94865 596569
3) jekyll-and-hyde.txt 2959 29123 161037
4) the-trial.txt 7073 89530 469798
Class Count
Klassen Count är i stort sett samma som för det synkrona programmet och ser ut så här:
class Count {
fs::path filename;
unsigned lines = 0;
unsigned words = 0;
unsigned chars = 0;
public:
explicit Count(fs::path const& filename_) : filename{filename_} {}
void update(std::string const& line) {
++lines;
words += words_of(line);
chars += chars_of(line);
}
auto get_filename() const noexcept -> fs::path const& { return filename; }
static auto header() -> std::string {
return std::format("{:25} {:5} {:>7} {:>8}",
"filename", "lines", "words", "chars");
}
auto to_string() const -> std::string {
return std::format("{:25} {:5} {:7} {:8}",
filename.filename().string(), lines, words, chars);
}
private:
static auto words_of(std::string const& line) -> unsigned {
auto is_letter = [&line](auto k) {
return isalpha(static_cast<unsigned char>(line[k]));
};
auto result = 0U;
auto k = std::size_t{0};
do {
for (; k < line.size() && not is_letter(k); ++k) {}
if (k == line.size()) return result;
++result;
for (; k < line.size() && is_letter(k); ++k) {}
} while (k < line.size());
return result;
}
static auto chars_of(std::string const& line) -> unsigned {
return static_cast<unsigned>(line.size());
}
};
Class Results
Klassen Results används av respektive coroutine för att samla upp delresultaten och ser ut så här:
class Results {
std::vector<Count> results{};
public:
void push(Count cnt) {
results.push_back(std::move(cnt));
}
auto get() -> std::vector<Count> {
r::sort(results, [](auto& lhs, auto& rhs) {
return lhs.get_filename().string() < rhs.get_filename().string();
});
return std::move(results);
}
};
Runtime-systemet
Resten av artikeln diskuterar runtime- och stödsystem, vi behöver för att applikations programmet ska kunna fungera. I stora drag fungerar kontroll-flödet på följande vis:
- En coroutine (
CountFile) skapas och registreras hos runtime viaspawn(). I denna startas coroutinen som sen kör fram till första suspendering, vilket ärco_await reader.next_line(). - Metoden
next_line()returnerar ettNextLineAwaitableobjekt. - I metoden
await_suspend()skapas en I/O task, som är ansvarig för att läsa nästa textrad från infilen. Detta task-lambda köas upp i task-pool, som en pool av worker-threads. - Efter att task-lambdan läst en textrad, köar den upp anropande coroutinen hos schedulern, via metoden post(),
så att denna kan vakna upp efter
co_awaitoch ta emot inläst textrad. - Metoden
run()hos runtime loopar så länge det finns coro:s att exekvera i schedulers interna readyq. - Det finns två köer i systemet, dels
readyqi scheduler, och delstasksi task-pool. En vital frågeställning är "när har alla coro:s kört färdigt?". Det räcker inte med att titta på ena eller båda dessa köer, eftersom en coroutine eller task kanske exekverar, vilket de gör efter att tagits ut ur en kö, Av detta skäl hanterar vi en s.k. liveness-token, som inkrementerar eller dekrementerar ett atomiskt heltal. På så sätt kan systemet hålla reda på om det finns pågående arbete någonstans i systemet.
Följande bild sammanfattar flödet i systemet och vad jag just punktat upp här ovan.
Class TaskCoroutine<void>
Denna utgör coroutine interface-type och diskuterades i detalj i min förra artikel. Jag har bara ordagrant kopierat in koden från den artikelns programkod.
Class AsyncFileReader
Denna är ansvarig för att läsa radvis från en textfil. För att detta ska fungera med hjälp av co_await, så skapar metoden next_line() en awaitable, vilken tar hand om coroutinen-mekaniken. Så här ser klass-strukturen ut:
class AsyncFileReader {
std::ifstream file{};
CoroRuntime& runtime;
public:
AsyncFileReader(fs::path filename, CoroRuntime& runtime_)
: file{std::move(filename)}, runtime{runtime_} {
if (not file) throw std::invalid_argument{"cannot open " + filename.string()};
}
class NextLineAwaitable {
struct State {
std::optional<std::string> line{};
std::exception_ptr error{};
bool eof = false;
};
std::shared_ptr<State> state = std::make_shared<State>();
std::ifstream& file;
CoroRuntime& runtime;
public:
NextLineAwaitable(std::ifstream& file_, CoroRuntime& runtime_) : file{file_}, runtime{runtime_} {}
bool await_ready() noexcept { return false; }
void await_suspend(std::coroutine_handle<> invoking_coro);
auto await_resume() -> std::optional<std::string>;
};
auto next_line() -> NextLineAwaitable {
return NextLineAwaitable{file, runtime};
}
};
Metoden await_suspend() skapar en I/O task i form av en lambda och köar upp denna i task-pool.
När denna task har läst en textrad, köar den upp anropande coroutinen i scheduler, så att denna kan
ta emot textraden. Textraden eller ett eventuellt uppkommet fel hanteras av ett State-objekt via
en std::shared_ptr eftersom tillståndsobjektet delas mellan task-objekt och awaitable-objekt.
void NextLineAwaitable::await_suspend(std::coroutine_handle<> invoking_coro) {
auto token = runtime.make_token();
auto task = [
s = state,
&input = file,
&sched = runtime.scheduler(),
invoking_coro,
token = std::move(token)
] mutable {
try {
//this is where the magic I/O happens
if (auto line = std::string{}; std::getline(input, line)) {
s->line = std::move(line);
} else {
s->eof = true;
}
} catch (...) {
s->error = std::current_exception();
}
sched.post(invoking_coro); //coro! wakie, wakie
};
runtime.task_pool().submit(std::move(task)); //read that pesky text line
}
Metoden await_resume() svarar för att överföra resultatet i samband med att anropande coro
vaknar upp.
auto NextLineAwaitable::await_resume() -> std::optional<std::string> {
if (state->error) std::rethrow_exception(state->error);
if (state->eof) return std::nullopt;
return std::move(state->line);
}
Class CoroScheduler
Schedulern håller reda vilka coroutine objekt som ska väckas upp. Det är en ganska liten klass, eftersom den hanteras av runtime-objektet.
class CoroScheduler {
friend class CoroRuntime;
mutable std::mutex entry{};
std::condition_variable not_empty{};
std::queue<std::coroutine_handle<>> readyq{};
public:
void post(std::coroutine_handle<> coro) {
{
auto _ = std::lock_guard{entry};
readyq.push(coro);
}
not_empty.notify_one();
}
private:
bool is_empty() const { return readyq.empty(); }
};
Class TaskPool
Task-pool är en tämligen enkel thread-pool och håller reda på vilka I/O task objekt som ska exekveras. Det finns en task-kö och en bunt worker-threads. Dessa ligger i en loop och plockar ut en task för exekvering eller väntar på att det dyker upp ett nytt jobb.
class TaskPool {
std::mutex entry{};
std::condition_variable not_empty{};
std::queue<std::move_only_function<void()>> tasks{};
std::vector<std::thread> workers{};
bool shutting_down = false;
public:
explicit TaskPool(unsigned num_workers = std::thread::hardware_concurrency()) {
if (num_workers == 0U) num_workers = 1U;
workers.reserve(num_workers);
for (auto k = 0U; k < num_workers; ++k)
workers.emplace_back([this] { worker_loop(); });
}
~TaskPool() {
{
auto _ = std::lock_guard{entry};
shutting_down = true;
}
not_empty.notify_all();
for (auto& w : workers) w.join();
}
template<typename TaskType>
void submit(TaskType&& task) {
{
auto _ = std::lock_guard{entry};
tasks.emplace(std::forward<TaskType>(task));
}
not_empty.notify_one();
}
private:
void worker_loop();
};
Varje worker-thread snurrar runt i följande loop:
void TaskPool::worker_loop() {
while (true) {
auto task = std::move_only_function<void()>{};
{ //wait for task queue to become non-empty
auto guard = std::unique_lock{entry};
not_empty.wait(guard, [this] {
return shutting_down || not tasks.empty();
});
if (shutting_down && tasks.empty()) return;
//grab one task
task = std::move(tasks.front());
tasks.pop();
}
task(); //run the task
}
}
Class CoroRuntime
Klassen Runtime håller reda på de olika delarna och driver systemet framåt. Ett coroutine
objekt registreras hos runtime via metoden spawn() och läggs in i en tabell (spawned).
Metoden run() driver exekveringen framåt, genom att plocka ut coroutine objekt från
scheduler och anropa dess resume() metod. Här följer klass-strukturen för runtime.
struct SpawnedEntry {
LivenessToken token;
TaskCoroutine<void> task;
};
class CoroRuntime {
friend class LivenessToken;
using SpawnedKeyType = void*;
CoroScheduler sched{};
TaskPool pool{};
std::atomic<std::size_t> liveness_count{0};
std::unordered_map<SpawnedKeyType, SpawnedEntry> spawned{};
std::mutex entry{};
bool should_wake() const {
return not sched.is_empty() || liveness_count.load() == 0U;
}
bool no_more_work() const {
return liveness_count.load() == 0U && sched.is_empty();
}
public:
explicit CoroRuntime(unsigned num_workers = std::thread::hardware_concurrency())
: pool{num_workers} {}
auto scheduler() -> CoroScheduler& { return sched; }
auto task_pool() -> TaskPool& { return pool; }
auto make_token() -> LivenessToken { return LivenessToken{this}; }
void spawn(TaskCoroutine<void>&& task);
void run();
private:
void acquire_liveness() {
liveness_count.fetch_add(1U, std::memory_order_relaxed);
}
void release_liveness() {
liveness_count.fetch_sub(1U, std::memory_order_relaxed);
sched.not_empty.notify_all();
}
};
Metoden spawn() ser ut så här:
void CoroRuntime::spawn(TaskCoroutine<void>&& task) {
auto h = task.get_handle();
auto key = static_cast<SpawnedKeyType>(h.address());
{
auto _ = std::lock_guard{entry};
auto token = LivenessToken{this};
auto [iter, inserted] = spawned.emplace(key,
SpawnedEntry{std::move(token), std::move(task)});
if (not inserted) return;
}
if (h && not h.done()) {
h.resume();
}
if (h.done()) {
auto _ = std::lock_guard{entry};
spawned.erase(key);
}
}
Metoden run() ser ut så här:
void CoroRuntime::run() {
while (true) {
auto handle = std::coroutine_handle<>{};
{
auto guard = std::unique_lock{sched.entry};
sched.not_empty.wait(guard, [this] { return should_wake(); });
if (no_more_work()) return; //when liveness_count==0
handle = sched.readyq.front();
sched.readyq.pop();
}
handle.resume();
if (handle.done()) {
auto key = static_cast<SpawnedKeyType>(handle.address());
auto _ = std::lock_guard{entry};
spawned.erase(key);
}
}
}
Class LivenessToken
Ett liveness-token fungerar som ett RAII-objekt, genom att öka eller minska den räknare i runtime som håller reda på hur många aktiviteter som är på gång i systemet.
class LivenessToken {
CoroRuntime* runtime = nullptr;
public:
LivenessToken() = default;
explicit LivenessToken(CoroRuntime* runtime);
~LivenessToken();
LivenessToken(LivenessToken const&) = delete;
auto operator=(LivenessToken const&) -> LivenessToken& = delete;
LivenessToken(LivenessToken&& rhs) noexcept
: runtime{std::exchange(rhs.runtime, nullptr)} {}
auto operator=(LivenessToken&& rhs) noexcept -> LivenessToken& {
if (this != &rhs) {
this->~LivenessToken();
runtime = std::exchange(rhs.runtime, nullptr);
}
return *this;
}
};
inline LivenessToken::LivenessToken(CoroRuntime* runtime_) : runtime{runtime_} {
runtime->acquire_liveness();
}
inline LivenessToken::~LivenessToken() {
if (runtime != nullptr) runtime->release_liveness();
}
Sammanfattning
I den här artikeln har vi byggt ett litet, men komplett, runtime-system för asynkrona operationer med C++-coroutines:
- Du har sett hur en scheduler kan återuppta coroutines på main-thread, så att applikationslogiken blir enkel och deterministisk.
- Du har sett hur en thread-pool kan utföra I/O på worker-threads, och sedan posta tillbaka coroutine-handles för fortsatt exekvering.
- Du har sett hur ett awaitable (här: NextLineAwaitable) kan kapsla in både I/O-task, felhantering och resultatöverföring.
- Du har sett varför en “bara-titta-på-köerna”-strategi inte räcker, och hur en liveness-token gör att runtime kan avsluta korrekt.
Det viktigaste att ta med sig är att C++-coroutines är ett språkstöd för kontrollflöde – men att du ofta behöver bygga (eller välja) ett runtime-system som står för schemaläggning, väntelogik och integration mot I/O. När du väl har de byggstenarna på plats kan du skriva asynkron kod som är både tydlig och lätt att vidareutveckla.