Parallel Fetch

Running multiple operations concurrently with when_all.

What You Will Learn

  • Using when_all to run tasks in parallel

  • Structured bindings for results

  • Error propagation in concurrent tasks

Prerequisites

Source Code

#include <boost/capy.hpp>
#include <iostream>
#include <latch>
#include <string>
#include <vector>

namespace capy = boost::capy;

// Simulated async operations
// Simulate looking up a user's numeric ID from a remote service.
// The ID is derived from the username length so the demo is deterministic.
capy::task<int> fetch_user_id(std::string username)
{
    std::cout << "Fetching user ID for: " << username << "\n";
    // In real code: co_await http_get("/users/" + username);
    auto const fake_id = static_cast<int>(username.length()) * 100;
    co_return fake_id;
}

// Simulate resolving a display name for a user ID.
capy::task<std::string> fetch_user_name(int id)
{
    std::cout << "Fetching name for user ID: " << id << "\n";
    auto display_name = "User" + std::to_string(id);
    co_return display_name;
}

// Simulate counting a user's orders (deterministic fake value).
capy::task<int> fetch_order_count(int user_id)
{
    std::cout << "Fetching order count for user: " << user_id << "\n";
    int const fake_count = user_id / 10;
    co_return fake_count;
}

// Simulate retrieving an account balance (deterministic fake value).
capy::task<double> fetch_account_balance(int user_id)
{
    std::cout << "Fetching balance for user: " << user_id << "\n";
    double const fake_balance = 1.5 * user_id;
    co_return fake_balance;
}

// Fetch all user data in parallel using variadic when_all.
// Heterogeneous return types are flattened into the result.
capy::task<> fetch_user_dashboard(std::string username)
{
    std::cout << "\n=== Fetching dashboard for: " << username << " ===\n";

    // First, get the user ID (needed for other queries)
    int user_id = co_await fetch_user_id(username);
    std::cout << "Got user ID: " << user_id << "\n\n";

    // when_all requires io_task children. Wrap plain tasks:
    std::cout << "Starting parallel fetches...\n";

    // Adapt a plain task<T> into an io_task<T> by pairing its value with a
    // default (success) error code. The inner task is taken by value so it
    // stays alive inside the wrapping coroutine's frame.
    auto wrap = [](auto inner) -> capy::io_task<decltype(inner.await_resume())> {
        co_return capy::io_result<decltype(inner.await_resume())>{
            {}, co_await std::move(inner)};
    };

    // All three run concurrently; payloads come back in input order.
    auto [ec, name, orders, balance] = co_await capy::when_all(
        wrap(fetch_user_name(user_id)),
        wrap(fetch_order_count(user_id)),
        wrap(fetch_account_balance(user_id)));

    // Check the combined error code before using the payloads. The wrapped
    // tasks above always succeed, but real I/O children can set ec.
    if (ec)
    {
        std::cout << "\nDashboard fetch failed: " << ec.message() << "\n";
        co_return;
    }

    std::cout << "\nDashboard results:\n";
    std::cout << "  Name: " << name << "\n";
    std::cout << "  Orders: " << orders << "\n";
    std::cout << "  Balance: $" << balance << "\n";
}

// Example with void tasks
// Void io_task: performs a side effect and yields io_result<> (ec only).
capy::io_task<> log_access(std::string resource)
{
    std::cout << "Logging access to: " << resource << "\n";
    capy::io_result<> ok{};
    co_return ok;
}

// Void io_task: bumps a metric and yields io_result<> (ec only).
capy::io_task<> update_metrics(std::string metric)
{
    std::cout << "Updating metric: " << metric << "\n";
    capy::io_result<> ok{};
    co_return ok;
}

// Runs two fire-and-forget io_tasks in parallel, then fetches data
// sequentially. Demonstrates when_all over void io_task children.
capy::task<std::string> fetch_with_side_effects()
{
    std::cout << "\n=== Fetch with side effects ===\n";

    // Void children produce a combined io_result<> carrying only an
    // error code — no payloads to unpack.
    auto r = co_await capy::when_all(
        log_access("api/data"),
        update_metrics("api_calls"));
    if (r.ec)
        co_return "error";

    // Side effects succeeded; now fetch the actual data.
    auto data = co_await fetch_user_name(42);

    std::cout << "Data: " << data << "\n";
    co_return data;
}

// Error handling example
// Succeeds with 42, or throws std::runtime_error when should_fail is set.
// Used below to demonstrate when_all's exception propagation.
capy::io_task<int> might_fail(bool should_fail, std::string name)
{
    std::cout << "Task " << name << " starting\n";

    if (!should_fail)
    {
        std::cout << "Task " << name << " completed\n";
        co_return capy::io_result<int>{{}, 42};
    }

    throw std::runtime_error(name + " failed!");
}

// Shows how when_all propagates an exception thrown by one child while
// still letting the other children run to completion.
capy::task<> demonstrate_error_handling()
{
    std::cout << "\n=== Error handling ===\n";

    try
    {
        // B throws; when_all captures the exception and rethrows it from
        // the co_await, so the structured binding is never populated.
        auto [ec2, a, b, c] = co_await capy::when_all(
            might_fail(false, "A"),
            might_fail(true, "B"),   // This one fails
            might_fail(false, "C"));
        std::cout << "All succeeded: " << a << ", "
                  << b << ", " << c << "\n";
    }
    catch (std::runtime_error const& e)
    {
        std::cout << "Caught error: " << e.what() << "\n";
        // Note: when_all waits for all tasks to complete (or respond to stop)
        // before propagating the first exception
    }
}

int main()
{
    capy::thread_pool pool;

    // One latch count per top-level task; main blocks until all release it.
    std::latch done(3);

    // Generic handlers: each completion or error releases one latch count.
    // The variadic lambda accepts any result arity (including none).
    auto const finished = [&done](auto&&...) { done.count_down(); };
    auto const failed = [&done](std::exception_ptr) { done.count_down(); };

    capy::run_async(pool.get_executor(), finished, failed)(fetch_user_dashboard("alice"));
    capy::run_async(pool.get_executor(), finished, failed)(fetch_with_side_effects());
    capy::run_async(pool.get_executor(), finished, failed)(demonstrate_error_handling());

    // Wait for all three demos before tearing down the pool.
    done.wait();
    return 0;
}

Build

add_executable(parallel_fetch parallel_fetch.cpp)
target_link_libraries(parallel_fetch PRIVATE capy)

Walkthrough

Basic when_all

auto [ec, name, orders, balance] = co_await capy::when_all(
    wrap(fetch_user_name(user_id)),
    wrap(fetch_order_count(user_id)),
    wrap(fetch_account_balance(user_id)));

when_all requires children returning io_result, so plain tasks are wrapped. All three run concurrently. The result is io_result<std::string, int, double>, a single ec plus the flattened payloads in input order.

Void io_tasks

auto r = co_await capy::when_all(
    log_access("api/data"),
    update_metrics("api_calls"));
if (r.ec)
    co_return "error";

io_task<> children return io_result<> (just an error code, no payload). Check r.ec to detect failure.

Error Propagation

try
{
    auto [ec2, a, b, c] = co_await capy::when_all(
        might_fail(false, "A"),
        might_fail(true, "B"),
        might_fail(false, "C"));
}
catch (...)
{
    // First exception is rethrown
    // All tasks complete before exception propagates
}

I/O errors are reported via ec in the io_result. Thrown exceptions are captured separately: on error, cancellation of the remaining tasks is requested, and the first exception is rethrown once all tasks have completed.

Output

=== Fetching dashboard for: alice ===
Fetching user ID for: alice
Got user ID: 500

Starting parallel fetches...
Fetching name for user ID: 500
Fetching order count for user: 500
Fetching balance for user: 500

Dashboard results:
  Name: User500
  Orders: 50
  Balance: $750

=== Fetch with side effects ===
Logging access to: api/data
Updating metric: api_calls
Fetching name for user ID: 42
Data: User42

=== Error handling ===
Task A starting
Task B starting
Task C starting
Task A completed
Task C completed
Caught error: B failed!

Exercises

  1. Add timing to see the parallel speedup vs sequential execution

  2. Implement a "fan-out/fan-in" pattern that processes a list of items in parallel

  3. Add cancellation support so remaining tasks can exit early on error

Next Steps