Examples
Streaming with prefetch
The double-buffered prefetch pipeline runs the next fetch concurrently while
you process the current batch. Wall time converges toward
max(fetch_latency, process_latency) × N rather than (fetch_latency + process_latency) × N.
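For example, with fetch_latency ≈ 200 ms, process_latency ≈ 150 ms and 100 batches, the pipelined loop finishes in roughly 200 ms × 100 ≈ 20 s, where a strictly sequential fetch-then-process loop would need about (200 + 150) ms × 100 ≈ 35 s.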
// Example: record-level streaming with ForEach.
//
// Demonstrates the ForEach interface for iterating batches as they arrive.
// Works with prefetch enabled so I/O overlaps with processing.
//
// Usage: go run main.go
package main
import (
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strconv"
"time"
"github.com/voseghale/apiexec/bindings/go/apiexec"
)
func main() {
...
For C callers the pattern is the same - use the C API directly:
#include "c_api/c_api.h"
#include <stdio.h>
int run_query(const char* adapter, const char* config) {
StreamHandle* s = stream_create(adapter, config, NULL);
if (!s) return -1;
char buf[131072]; /* 128 KiB - tune to your expected batch size */
int32_t count = 0;
int32_t rc = STREAM_OK;
size_t total = 0;
while ((rc = stream_has_next(s)) == 1) {
rc = stream_next_batch_v1(s, buf, sizeof buf, &count);
if (rc != STREAM_OK) break;
total += (size_t)count;
/* hand buf to your processing pipeline */
}
stream_destroy(s);
printf("total records: %zu, final rc: %d\n", total, rc);
return rc == STREAM_OK || rc == STREAM_EXHAUSTED ? 0 : rc;
}
Retry and rate limiting
429 and 5xx responses are retried with exponential backoff + jitter. The
Retry-After header is honored when present. Callers see only successful
batches - errors are invisible unless retries are exhausted.
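As a rough sketch of what that schedule implies (the engine's actual doubling factor and jitter distribution are internal details; initial_ms and max_ms mirror the initial_backoff_ms / max_backoff_ms policy fields documented further down):
#include <stdlib.h>
/* Sketch only (not the engine's actual algorithm): one plausible shape of
 * "exponential backoff + jitter". A Retry-After value from the server
 * overrides the computed delay. */
static long backoff_delay_ms(int attempt, long initial_ms, long max_ms, long retry_after_s) {
    if (retry_after_s > 0)                                   /* server-specified wait wins */
        return retry_after_s * 1000;
    long d = initial_ms << (attempt < 16 ? attempt : 16);    /* doubles each attempt */
    if (d > max_ms) d = max_ms;                              /* cap the exponential growth */
    return d / 2 + rand() % (d / 2 + 1);                     /* spread the wait within [d/2, d] */
}
The mock-server demo below exercises the same behaviour end to end: the server answers with three 429s and the caller never sees them.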
"""Example: retry and rate-limit adaptation.
Demonstrates automatic 429/5xx retry with exponential backoff. The mock
server returns 3 x 429 before succeeding; the engine retries transparently
and the caller never sees the rate-limit errors.
Run: LD_LIBRARY_PATH=../../build python3 examples/retry.py
"""
import http.server
import json
import sys
import threading
import urllib.parse
sys.path.insert(0, str(__import__("pathlib").Path(__file__).parent.parent))
from apiexec import Stream, ApiExecError
class MockHandler(http.server.BaseHTTPRequestHandler):
...
Cancellation
cancel() / stream_cancel() is the only call that is safe from a
different thread. A watchdog, signal handler, or deadline goroutine can
use it to halt an in-flight stream cleanly.
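For callers that would rather not rely on signals, the same cancel-from-another-thread idea works with a plain watchdog thread. A minimal sketch, assuming only that stream_cancel is the thread-safe call described above; the deadline mechanics (the sleep, the done flag) are illustrative:
#include "c_api/c_api.h"
#include <pthread.h>
#include <unistd.h>
/* Sketch: cancel an in-flight stream once a wall-clock deadline passes.
 * stream_cancel() is the one call documented as thread-safe; everything
 * else about this watchdog is illustrative. */
struct watchdog_args { StreamHandle* stream; unsigned deadline_s; volatile int done; };
static void* watchdog(void* p) {
    struct watchdog_args* w = p;
    sleep(w->deadline_s);
    if (!w->done)                     /* skip the cancel if the stream already finished */
        stream_cancel(w->stream);
    return NULL;
}
Before stream_destroy, the main loop should set done and join (or otherwise outlive) the watchdog so a late cancel can never touch a freed handle. The Python demo below drives cancellation from a timeout thread, and the C signal-handler pattern after it reuses the same call from SIGINT.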
"""Example: cancelling an in-progress stream from another thread.
cancel() is thread-safe. A watchdog thread can cancel the stream based
on a timeout; the main thread sees the cancellation on its next iteration.
Run: LD_LIBRARY_PATH=../../build python3 examples/cancellation.py
"""
import http.server
import json
import sys
import threading
import time
import urllib.parse
sys.path.insert(0, str(__import__("pathlib").Path(__file__).parent.parent))
from apiexec import Stream, ApiExecError
class MockHandler(http.server.BaseHTTPRequestHandler):
...
C signal handler pattern:
#include "c_api/c_api.h"
#include <signal.h>
#include <stdio.h>
static StreamHandle* volatile g_active = NULL;
static void on_sigint(int sig) {
(void)sig;
StreamHandle* s = (StreamHandle*)g_active;
if (s) stream_cancel(s);
}
int main(void) {
signal(SIGINT, on_sigint);
StreamHandle* s = stream_create("datadog-v1", config, NULL);
g_active = s;
char buf[65536];
int32_t count = 0;
while (stream_has_next(s) == 1) {
int32_t rc = stream_next_batch_v1(s, buf, sizeof buf, &count);
if (rc == STREAM_ERROR_CANCELLED) {
printf("cancelled cleanly\n");
break;
}
if (rc != STREAM_OK) break;
}
g_active = NULL;
stream_destroy(s);
return 0;
}
Multi-adapter dispatch
The same stream_create(adapter_name, config, policy) call selects any
registered adapter at runtime. Switching from generic_rest to openai
or datadog_metrics requires only a name and config change - no
recompilation.
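From C the dispatch is equally dynamic: the adapter name is just a string, so it can come straight from the command line. A sketch, assuming the registry names used by the Python example below ("openai", "datadog_metrics", "generic_rest") are also what your build registers for the C API; reading the config from a file is illustrative, not part of the library:
#include "c_api/c_api.h"
#include <stdio.h>
/* Sketch: the adapter is chosen from argv at runtime, so switching adapters
 * is a command-line change rather than a recompile. */
int main(int argc, char** argv) {
    if (argc < 3) {
        fprintf(stderr, "usage: %s <adapter> <config.json>\n", argv[0]);
        return 1;
    }
    static char config[65536];
    FILE* f = fopen(argv[2], "r");
    if (!f) return 1;
    size_t n = fread(config, 1, sizeof config - 1, f);
    fclose(f);
    config[n] = '\0';
    StreamHandle* s = stream_create(argv[1], config, NULL);
    if (!s) return 1;
    /* ... same batch loop as the streaming example at the top of this page ... */
    stream_destroy(s);
    return 0;
}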
"""Example: runtime adapter selection via the registry.
The same Stream(adapter_name, config) API dispatches to any of the 4
registered adapters. This shows openai and datadog_metrics.
Run: LD_LIBRARY_PATH=../../build python3 examples/multi_adapter.py
"""
import http.server
import json
import sys
import threading
sys.path.insert(0, str(__import__("pathlib").Path(__file__).parent.parent))
from apiexec import Stream
openai_port = None
dd_port = None
...
Paginate any REST API
Point the generic_rest adapter at any cursor-based REST endpoint. Pass
page_size in the config; the engine handles the cursor loop, retry, and
prefetch automatically.
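A minimal C config sketch first; only base_url and page_size come from the description above, so treat anything else your endpoint needs (auth headers, cursor field names) as adapter-specific and not shown here:
#include "c_api/c_api.h"
/* Sketch: point generic_rest at a cursor-paginated endpoint.
 * Passing NULL as the policy keeps the default retry/prefetch behaviour. */
StreamHandle* open_paginated_stream(void) {
    const char* config =
        "{"
        "  \"base_url\": \"http://localhost:8080/api/data\","
        "  \"page_size\": 100"
        "}";
    return stream_create("generic_rest", config, NULL);
}
The Go version of the same flow: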
// Example: paginate a REST API using apiexec as backend.
//
// Usage: go run main.go <base_url>
// Example: go run main.go http://localhost:8080/api/data
package main
import (
"fmt"
"os"
"github.com/voseghale/apiexec/bindings/go/apiexec"
)
func main() {
if len(os.Args) < 2 {
fmt.Fprintf(os.Stderr, "Usage: %s <base_url>\n", os.Args[0])
os.Exit(1)
}
baseURL := os.Args[1]
...
Cost budget (AI adapters)
For AI adapters (openai, anthropic), the engine tracks token spend via
the response_cost() hook on each batch. A CostAwarePolicy with a
budget_tokens limit halts the stream when the budget is reached, returning
ErrBudgetExhausted to the caller.
// Example: cost budget enforcement.
//
// Demonstrates CostAwarePolicy via JSON config. The policy halts execution
// when cumulative cost reaches the configured budget. Useful for bounding
// spend on AI API calls.
//
// Note: CostAwarePolicy is set by passing a policy that sets cost hooks.
// Since DefaultPolicy::from_json does not expose budget fields yet,
// this example uses the generic_rest adapter (no token cost) to show
// the ErrBudgetExhausted surface - a real AI adapter (openai/anthropic)
// would report token cost via response_cost() and halt at the budget.
//
// Usage: go run main.go
package main
import (
"encoding/json"
"errors"
"fmt"
"net/http"
...
Sizing the buffer
stream_next_batch_v1 returns STREAM_ERROR_CLIENT if the buffer is too
small for the batch. The safe approach is to start large and double on failure:
/* Try with a 256 KiB buffer; double on failure up to 4 MiB. */
size_t buf_size = 262144;
char* buf = malloc(buf_size);
while (stream_has_next(s) == 1) {
int32_t count = 0;
int32_t rc = stream_next_batch_v1(s, buf, (int32_t)buf_size, &count);
if (rc == STREAM_ERROR_CLIENT && buf_size < 4194304) {
buf_size *= 2;
buf = realloc(buf, buf_size);
continue; /* retry the same position */
}
if (rc != STREAM_OK) break;
/* process buf */
}
free(buf);
stream_destroy(s);
Retry is safe after STREAM_ERROR_CLIENT
Unlike network errors (where the engine retries internally),
STREAM_ERROR_CLIENT due to a small buffer leaves the stream at the same
cursor position. It is safe to call stream_next_batch_v1 again with a
larger buffer.
Handling every error code
const char* error_string(int32_t rc) {
switch (rc) {
case STREAM_OK: return "ok";
case STREAM_EXHAUSTED: return "exhausted";
case STREAM_ERROR_RATE_LIMIT: return "rate limited";
case STREAM_ERROR_SERVER: return "server error";
case STREAM_ERROR_CLIENT: return "client error";
case STREAM_ERROR_PARSE: return "parse failure";
case STREAM_ERROR_NETWORK: return "network error";
case STREAM_ERROR_CANCELLED: return "cancelled";
case STREAM_ERROR_INVALID_ARG: return "invalid argument";
default: return "unknown";
}
}
Custom execution policy
Pass a JSON policy object as the third argument to stream_create to
override defaults:
const char* policy =
"{"
" \"max_retries\": 5,"
" \"initial_backoff_ms\": 500,"
" \"max_backoff_ms\": 30000,"
" \"prefetch_depth\": 2"
"}";
StreamHandle* s = stream_create("datadog-v2-ts", config, policy);| Field | Default | Effect |
|---|---|---|
| max_retries | 3 | Maximum retry attempts per request |
| initial_backoff_ms | 1000 | First backoff interval |
| max_backoff_ms | 60000 | Backoff cap |
| prefetch_depth | 1 | Batches fetched ahead of caller |
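Assuming the backoff doubles on each attempt, the custom policy above yields delays of roughly 500 ms, 1 s, 2 s, 4 s and 8 s (before jitter) across its five retries, always under the 30 s cap; the defaults would instead give 1 s, 2 s and 4 s over three retries.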
Ollama: local AI streaming
ollama_demo.cpp is a standalone C++ program that sends 10 factual prompts to a
local Ollama instance and streams the responses through the
apiexec engine. No cloud API key is needed — Ollama runs fully on-device.
What it demonstrates:
- OpenAIAdapter repointed at Ollama's OpenAI-compatible /v1/chat/completions endpoint
- CostAwarePolicy with a token budget — tracks spend and halts cleanly at the cap
- Metrics callback printing live counters after every batch
- Structured log callback routing WARN/ERROR events to stderr
- prefetch_depth = 0 (sequential) for deterministic output ordering
- Handling qwen3:8b thinking-mode responses (see note below)
Build and run
All commands run from the codebase/ directory:
cmake --build build --target ollama_demo
./build/ollama_demo 192.168.1.104:11434 qwen3:8b 5000 1024
#                   ^^^^^^^^^^^^^^^^^^^ ^^^^^^^^ ^^^^ ^^^^
#                   host:port           model    budget max_tokens_per_response
| Argument | Default | Notes |
|---|---|---|
| host:port | 192.168.1.104:11434 | Ollama API address |
| model | qwen3:8b | Any model loaded in Ollama |
| budget_tokens | 5000 | Total token budget across all prompts |
| max_tokens_per_response | 1024 | Per-response token cap; 100 truncates mid-sentence on most models |
For longer responses (code generation, detailed explanations):
./build/ollama_demo 192.168.1.104:11434 qwen3:8b 10000 2048
Adapter and policy configuration
The adapter is built from a JSON config that mirrors the standard OpenAI adapter fields:
nlohmann::json config;
config["base_url"] = "http://" + host + "/v1/chat/completions";
config["api_key"] = "ollama";       // Ollama ignores this — field is required by the adapter
config["model"] = model;            // e.g. "qwen3:8b", "llama3", "mistral"
config["prompts"] = prompts;        // vector<string> — one HTTP request per prompt
config["max_tokens"] = max_tokens;  // 1024 default — enough for complete answers
config["temperature"] = 0.7;
The policy enforces a hard token budget:
DefaultPolicy::Config policy_cfg;
policy_cfg.max_retries = 3;
policy_cfg.base_backoff_ms = 500;
policy_cfg.prefetch_depth_val = 0;  // sequential — each prompt completes before the next starts

CostAwarePolicy::CostConfig cost_cfg;
cost_cfg.budget_tokens = budget;    // halt when cumulative tokens >= budget
cost_cfg.price_per_token = 0.0;     // Ollama is free; set > 0 for cloud APIs
Metrics and logging callbacks
// After every batch: print live counters to stdout
engine->set_metrics_callback([](const MetricsSnapshot& s) {
std::cout << " [metrics] requests=" << s.request_count
<< " retries=" << s.retry_count
<< " success=" << s.success_count
<< " records=" << s.records_total
<< " window=" << s.window_size_ms << "ms\n";
});
// WARN / ERROR events to stderr with HTTP status and retry count
set_log_level(LogLevel::WARN);
engine->set_log_callback([](const LogEntry& entry) {
std::cerr << " [WARN/ERROR] " << entry.message
<< " (HTTP " << entry.http_status << ")\n";
});
Source
// Example: talk to a local Ollama instance using apiexec.
//
// Demonstrates:
// - OpenAI-compatible adapter against Ollama
// - CostAwarePolicy with token budget enforcement
// - Real-time metrics callback with Prometheus format
// - Structured logging on every error path
// - Multi-prompt streaming with prefetch disabled
//
// Usage: ./ollama_demo [ollama_host] [model] [budget_tokens] [max_tokens_per_response]
// Defaults: 192.168.1.104:11434 qwen3:8b 5000 1024
#include "core/engine.hpp"
#include "core/logging.hpp"
#include "adapters/openai.hpp"
#include "policy/cost_aware_policy.hpp"
#include "transport/curl_transport.hpp"
#include <chrono>
#include <iomanip>
#include <iostream>
#include <nlohmann/json.hpp>
#include <string>
#include <vector>
using namespace apiexec;
constexpr double DEFAULT_BUDGET_TOKENS = 5000.0;
constexpr double PRICE_PER_TOKEN = 0.0; // Ollama is free
constexpr int DEFAULT_MAX_TOKENS_PER_RESPONSE = 1024;
auto main(int argc, char* argv[]) -> int {
std::string host = "192.168.1.104:11434";
std::string model = "qwen3:8b";
double budget = DEFAULT_BUDGET_TOKENS;
int max_tokens = DEFAULT_MAX_TOKENS_PER_RESPONSE;
if (argc > 1) host = argv[1];
if (argc > 2) model = argv[2];
if (argc > 3) budget = std::stod(argv[3]);
if (argc > 4) max_tokens = std::stoi(argv[4]);
std::string base_url = "http://" + host + "/v1/chat/completions";
std::vector<std::string> prompts = {
"What is the capital of France? Answer in one sentence.",
"Explain what an API is in two sentences.",
"What is the speed of light? One sentence.",
"Name three programming languages created after 2010.",
"What is the difference between TCP and UDP? Brief answer.",
"Who wrote Romeo and Juliet?",
"What is a hash table? One sentence.",
"Name the four largest planets in our solar system.",
"What year did the first iPhone launch?",
"What does CPU stand for?",
};
std::cout << "=== apiexec Ollama Demo ===\n"
<< "Host: " << host << "\n"
<< "Model: " << model << "\n"
<< "Prompts: " << prompts.size() << "\n"
<< "Budget: " << budget << " tokens\n"
<< "Max per response: " << max_tokens << " tokens\n\n";
// Build adapter config
nlohmann::json config;
config["base_url"] = base_url;
config["api_key"] = "ollama";
config["model"] = model;
config["prompts"] = prompts;
config["max_tokens"] = max_tokens;
config["temperature"] = 0.7;
// Build cost-aware policy
DefaultPolicy::Config policy_cfg;
policy_cfg.max_retries = 3;
policy_cfg.base_backoff_ms = 500;
policy_cfg.prefetch_depth_val = 0; // sequential for clear output ordering
CostAwarePolicy::CostConfig cost_cfg;
cost_cfg.budget_tokens = budget;
cost_cfg.price_per_token = PRICE_PER_TOKEN;
auto* policy = new CostAwarePolicy(policy_cfg, cost_cfg);
// Create engine
auto engine = std::make_unique<ExecutionEngine<JsonBatch>>(
std::make_unique<OpenAIAdapter>(OpenAIAdapter::from_json(config.dump())),
std::unique_ptr<ExecutionPolicy>(policy),
std::make_unique<CurlTransport>(),
Cursor{}
);
// Metrics callback — fires after every successful batch
engine->set_metrics_callback([](const MetricsSnapshot& s) {
std::cout << " [metrics] requests=" << s.request_count
<< " retries=" << s.retry_count
<< " success=" << s.success_count
<< " records=" << s.records_total
<< " window=" << s.window_size_ms << "ms\n";
});
// Log callback — WARN/ERROR only, to stderr
set_log_level(LogLevel::WARN);
engine->set_log_callback([](const LogEntry& entry) {
const char* level_str = "INFO";
switch (entry.level) {
case LogLevel::WARN: level_str = "WARN"; break;
case LogLevel::ERROR: level_str = "ERROR"; break;
default: break;
}
std::cerr << " [" << level_str << "] " << entry.message;
if (entry.http_status > 0) std::cerr << " (HTTP " << entry.http_status << ")";
if (entry.retry_count > 0) std::cerr << " (retry " << entry.retry_count << ")";
std::cerr << "\n";
});
auto start = std::chrono::steady_clock::now();
int completed = 0;
std::cout << "--- Responses ---\n\n";
while (engine->has_next()) {
auto result = engine->next_batch();
if (result.error == StreamErrorCode::BUDGET_EXHAUSTED) {
std::cout << "\n[Budget exhausted after " << completed << " prompts]\n";
break;
}
if (result.error == StreamErrorCode::EXHAUSTED) break;
if (result.error != StreamErrorCode::OK) {
std::cout << "\n[Error: " << static_cast<int>(result.error) << "]\n";
break;
}
// Extract the assistant's response from the choices.
// Some models (e.g., qwen3 in thinking mode) put output in "reasoning"
// instead of "content". Check both.
for (const auto& batch : result.records) {
for (const auto& choice : batch.records) {
std::string content;
if (choice.contains("message")) {
auto& msg = choice["message"];
if (msg.contains("content") && !msg["content"].get<std::string>().empty())
content = msg["content"].get<std::string>();
else if (msg.contains("reasoning"))
content = msg["reasoning"].get<std::string>();
}
std::cout << "Q" << (completed + 1) << ": " << prompts[completed] << "\n"
<< "A" << (completed + 1) << ": " << content << "\n\n";
}
}
++completed;
auto remaining = policy->remaining_budget();
std::cout << " [cost] tokens_used=" << policy->cumulative_tokens()
<< " remaining=";
if (remaining.has_value()) std::cout << remaining.value();
else std::cout << "unlimited";
std::cout << "\n\n";
}
auto elapsed = std::chrono::duration_cast<std::chrono::milliseconds>(
std::chrono::steady_clock::now() - start);
std::cout << "=== Summary ===\n"
<< "Prompts completed: " << completed << " / " << prompts.size() << "\n"
<< "Wall time: " << elapsed.count() << "ms\n"
<< "Tokens consumed: " << policy->cumulative_tokens() << "\n";
if (budget > 0) {
auto remaining = policy->remaining_budget();
std::cout << "Budget remaining: "
<< (remaining.has_value() ? std::to_string(remaining.value()) : "n/a")
<< " tokens\n"
<< "Budget exhausted: " << (policy->budget_exceeded() ? "yes" : "no") << "\n";
}
std::cout << "\n=== Prometheus Metrics ===\n"
<< engine->metrics().to_prometheus("ollama_demo");
return 0;
}
Real output
Captured from ./build/ollama_demo 192.168.1.104:11434 qwen3:8b 5000 1024:
=== apiexec Ollama Demo ===
Host: 192.168.1.104:11434
Model: qwen3:8b
Prompts: 10
Budget: 5000 tokens
Max per response: 1024 tokens
--- Responses ---
[metrics] requests=1 retries=0 success=1 records=1 window=0ms
Q1: What is the capital of France? Answer in one sentence.
A1: The capital of France is Paris.
[cost] tokens_used=145 remaining=4855
[metrics] requests=2 retries=0 success=2 records=2 window=0ms
Q2: Explain what an API is in two sentences.
A2: An API (Application Programming Interface) is a set of rules and protocols that
allows different software applications to communicate and interact with each other.
It acts as an intermediary, enabling developers to access specific functionalities
of another system without needing to understand its internal workings.
[cost] tokens_used=453 remaining=4547
[metrics] requests=3 retries=0 success=3 records=3 window=0ms
Q3: What is the speed of light? One sentence.
A3: The speed of light in a vacuum is exactly 299,792,458 meters per second.
[cost] tokens_used=687 remaining=4313
[metrics] requests=4 retries=0 success=4 records=4 window=0ms
Q4: Name three programming languages created after 2010.
A4: [qwen3:8b chain-of-thought reasoning about Kotlin, Rust, Swift, TypeScript...]
[cost] tokens_used=1733 remaining=3267
[metrics] requests=5 retries=0 success=5 records=5 window=0ms
Q5: What is the difference between TCP and UDP? Brief answer.
A5: TCP (Transmission Control Protocol) and UDP (User Datagram Protocol) are both
transport layer protocols, but differ in key aspects:
- Connection-Oriented vs. Connectionless
- Reliability: TCP ensures ordered delivery; UDP does not guarantee delivery
- Overhead: TCP has higher overhead; UDP has smaller header for faster transmission
- Use Cases: TCP for critical data; UDP for real-time applications
[cost] tokens_used=2524 remaining=2476
[metrics] requests=6 retries=0 success=6 records=6 window=0ms
Q6: Who wrote Romeo and Juliet?
A6: William Shakespeare wrote Romeo and Juliet, likely in 1595-1596.
[cost] tokens_used=3176 remaining=1824
[metrics] requests=7 retries=0 success=7 records=7 window=0ms
Q7: What is a hash table? One sentence.
A7: A hash table is a data structure that maps keys to values using a hash function
to compute an index in an array, enabling efficient insertion, deletion, and lookup.
[cost] tokens_used=3609 remaining=1391
[metrics] requests=8 retries=0 success=8 records=8 window=0ms
Q8: Name the four largest planets in our solar system.
A8: Jupiter, Saturn, Uranus, Neptune.
[cost] tokens_used=4145 remaining=855
[metrics] requests=9 retries=0 success=9 records=9 window=0ms
Q9: What year did the first iPhone launch?
A9: The first iPhone was launched in 2007.
[cost] tokens_used=4386 remaining=614
[metrics] requests=10 retries=0 success=10 records=10 window=0ms
Q10: What does CPU stand for?
A10: CPU stands for Central Processing Unit — often referred to as the "brain" of a
computer, it executes program instructions and manages data flow.
[cost] tokens_used=4663 remaining=337
=== Summary ===
Prompts completed: 10 / 10
Wall time: 398233ms
Tokens consumed: 4663
Budget remaining: 337.000000 tokens
Budget exhausted: no
=== Prometheus Metrics ===
ollama_demo_requests_total 10
ollama_demo_retries_total 0
ollama_demo_successes_total 10
ollama_demo_errors_rate_limit_total 0
ollama_demo_errors_server_total 0
ollama_demo_errors_client_total 0
ollama_demo_errors_network_total 0
ollama_demo_errors_parse_total 0
ollama_demo_records_total 10
ollama_demo_window_size_ms 0.000000
ollama_demo_cumulative_cost_units 0.000000
Reading the output:
| Line | What it means |
|---|---|
| [metrics] requests=N | One HTTP request per prompt — no batching |
| [metrics] window=0ms | OpenAIAdapter uses prompt-index cursors, not time windows |
| [cost] tokens_used=N remaining=M | Running total after each prompt; checked before the next fetch |
| Prompts completed: 10 / 10 | All prompts finished within the 5000-token budget |
| Budget remaining: 337 | 5000 − 4663 tokens left after all 10 prompts |
| cumulative_cost_units 0.000000 | PRICE_PER_TOKEN = 0.0 — Ollama is free; token count tracked separately via policy->cumulative_tokens() |
| Wall time: 398233ms | ~40s per prompt — local inference time depends on hardware and model size |
qwen3:8b thinking mode
qwen3:8b runs in thinking mode by default, meaning it reasons through its answer
before responding. With max_tokens = 100 (the old default), most responses were
cut off mid-thought. With max_tokens = 1024, the model completes its reasoning
and produces a full answer.
Some model configurations return an empty content field and place all output in
a message.reasoning field instead. The demo handles both:
if (msg.contains("content") && !msg["content"].get<std::string>().empty())
content = msg["content"];
else if (msg.contains("reasoning"))
content = msg["reasoning"];For models that do not use thinking mode (e.g. llama3, mistral), only the
content branch is ever used.
Document summarisation with cost budget
summarise.cpp demonstrates AIAdapter prompt chunking and CostAwarePolicy
budget enforcement without requiring any external service — it starts an inline
mock HTTP server on a random port, so it runs entirely offline.
What it demonstrates:
- OpenAIAdapter receiving responses from an in-process mock server
- 50-prompt workload (simulating a chunked document)
- CostAwarePolicy halting cleanly at budget_tokens
- Final cost report: chunks processed, tokens consumed, estimated dollar cost, remaining budget
Build and run
cmake --build build --target summarise
./build/summarise 500
#                 ^^^
#                 budget_tokens (default: 500)
The binary starts a mock server on a random port, processes chunks until the budget is exhausted, then shuts down the server and exits.
How the mock server works
The inline server handles each HTTP request by parsing the messages[0].content
field from the request body and returning a fixed JSON response:
resp["choices"] = [{ "message": { "content": "Summary of: <first 50 chars>..." } }];
resp["usage"] = { "total_tokens": 30 }; // 30 tokens per response, fixedEvery response costs exactly 30 tokens. With a 500-token budget, the engine processes
floor(500 / 30) = 16 full chunks, then one more that takes the total to 510 and
triggers BUDGET_EXHAUSTED.
Source
// Example: chunked document summarisation with token budget enforcement.
//
// Demonstrates AIAdapter prompt chunking and CostAwarePolicy budget cap.
// Uses an inline mock server — no external API key needed.
//
// Usage: ./summarise [budget_tokens] Default: 500
#include "core/engine.hpp"
#include "adapters/openai.hpp"
#include "policy/cost_aware_policy.hpp"
#include "transport/curl_transport.hpp"
#include "adapters/ai_adapter.hpp"
#include <arpa/inet.h>
#include <atomic>
#include <iostream>
#include <nlohmann/json.hpp>
#include <sstream>
#include <sys/socket.h>
#include <thread>
#include <unistd.h>
using namespace apiexec;
constexpr int TOKENS_PER_RESPONSE = 30;
static std::atomic<bool> g_running{true};
// Inline mock HTTP server — one thread, handles requests sequentially.
static auto serve_mock(int fd) -> void {
while (g_running.load()) {
sockaddr_in addr{};
socklen_t len = sizeof(addr);
int client = accept(fd, reinterpret_cast<sockaddr*>(&addr), &len);
if (client < 0) continue;
char buf[8192];
ssize_t n = recv(client, buf, sizeof(buf) - 1, 0);
if (n <= 0) { close(client); continue; }
// Parse prompt from request body (recv does not null-terminate, so pass the length)
std::string raw(buf, static_cast<size_t>(n));
auto body_start = raw.find("\r\n\r\n");
std::string prompt;
if (body_start != std::string::npos) {
auto j = nlohmann::json::parse(raw.substr(body_start + 4), nullptr, false);
if (!j.is_discarded() && j.contains("messages"))
prompt = j["messages"][0].value("content", "");
}
// Return a fixed-cost mock response
nlohmann::json resp;
resp["choices"] = nlohmann::json::array({
{{"index", 0}, {"message", {{"role", "assistant"},
{"content", "Summary of: " + prompt.substr(0, 50) + "..."}}}}
});
resp["usage"] = {
{"prompt_tokens", 10}, {"completion_tokens", 20},
{"total_tokens", TOKENS_PER_RESPONSE}
};
auto body = resp.dump();
std::ostringstream http;
http << "HTTP/1.1 200 OK\r\nContent-Type: application/json\r\n"
<< "Content-Length: " << body.size() << "\r\nConnection: close\r\n\r\n"
<< body;
auto s = http.str();
send(client, s.data(), s.size(), 0);
close(client);
}
}
auto main(int argc, char* argv[]) -> int {
double budget_tokens = 500;
if (argc > 1) budget_tokens = std::stod(argv[1]);
// Start mock server on an ephemeral port
int server_fd = socket(AF_INET, SOCK_STREAM, 0);
int opt = 1;
setsockopt(server_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
sockaddr_in addr{};
addr.sin_family = AF_INET;
addr.sin_addr.s_addr = htonl(INADDR_LOOPBACK);
addr.sin_port = 0; // OS assigns a free port
bind(server_fd, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
listen(server_fd, 16);
socklen_t alen = sizeof(addr);
getsockname(server_fd, reinterpret_cast<sockaddr*>(&addr), &alen);
int port = ntohs(addr.sin_port);
std::thread server_thread(serve_mock, server_fd);
// 50 Lorem Ipsum chunks simulate a large document
std::vector<std::string> chunks;
for (int i = 0; i < 50; ++i) {
chunks.push_back("Chunk " + std::to_string(i) + ": Lorem ipsum dolor sit amet, "
"consectetur adipiscing elit. Sed do eiusmod tempor incididunt "
"ut labore et dolore magna aliqua.");
}
nlohmann::json config;
config["base_url"] = "http://127.0.0.1:" + std::to_string(port) + "/v1/chat/completions";
config["api_key"] = "example-key";
config["model"] = "gpt-4";
config["prompts"] = chunks;
config["max_tokens"] = 100;
DefaultPolicy::Config base_cfg;
base_cfg.prefetch_depth_val = 0;
CostAwarePolicy::CostConfig cost_cfg;
cost_cfg.budget_tokens = budget_tokens;
cost_cfg.price_per_token = 0.00003; // $0.03 / 1k tokens (GPT-4 estimate)
auto* policy = new CostAwarePolicy(base_cfg, cost_cfg);
ExecutionEngine<JsonBatch> engine(
std::make_unique<OpenAIAdapter>(OpenAIAdapter::from_json(config.dump())),
std::unique_ptr<ExecutionPolicy>(policy),
std::make_unique<CurlTransport>(),
Cursor{}
);
std::cout << "Summarising " << chunks.size() << " chunks with budget="
<< budget_tokens << " tokens\n\n";
int completed = 0;
while (engine.has_next()) {
auto result = engine.next_batch();
if (result.error == StreamErrorCode::BUDGET_EXHAUSTED) {
std::cout << "\n[Budget exhausted after " << completed << " chunks]\n";
break;
}
if (result.error != StreamErrorCode::OK) {
std::cout << "\n[Error: " << static_cast<int>(result.error) << "]\n";
break;
}
for (const auto& batch : result.records) {
for (const auto& choice : batch.records) {
std::string content = choice.value("message", nlohmann::json{})
.value("content", "");
std::cout << "Chunk " << completed << ": " << content << "\n";
}
}
++completed;
}
std::cout << "\n--- Cost Report ---\n"
<< "Chunks processed: " << completed << " / " << chunks.size() << "\n"
<< "Tokens consumed: " << policy->cumulative_tokens() << "\n"
<< "Estimated cost: quot; << policy->cumulative_dollars() << "\n";
auto remaining = policy->remaining_budget();
if (remaining.has_value())
std::cout << "Remaining budget: " << remaining.value() << " tokens\n";
// Shutdown mock server
g_running.store(false);
int wake = socket(AF_INET, SOCK_STREAM, 0);
connect(wake, reinterpret_cast<sockaddr*>(&addr), sizeof(addr));
close(wake);
server_thread.join();
close(server_fd);
return 0;
}
Real output
Captured from ./build/summarise 500:
Summarising 50 chunks with budget=500 tokens

Chunk 0: Summary of: Chunk 0: Lorem ipsum dolor sit amet, consectetur a...
Chunk 1: Summary of: Chunk 1: Lorem ipsum dolor sit amet, consectetur a...
Chunk 2: Summary of: Chunk 2: Lorem ipsum dolor sit amet, consectetur a...
...
Chunk 16: Summary of: Chunk 16: Lorem ipsum dolor sit amet, consectetur ...

[Budget exhausted after 17 chunks]

--- Cost Report ---
Chunks processed: 17 / 50
Tokens consumed: 510
Estimated cost: $0.0153
Remaining budget: -10 tokens
Reading the output:
| Value | Explanation |
|---|---|
| 17 chunks processed | floor(500 / 30) = 16 full chunks, plus one final chunk that pushes total to 510 |
| 510 tokens consumed | 17 × 30 tokens/response — the last batch exceeds the cap by 10 |
| $0.0153 | 510 tokens × $0.00003/token — matches GPT-4 pricing estimate in the config |
| Remaining budget: -10 | Budget check runs before each fetch; the last fetch was allowed through with 10 tokens remaining and consumed 30 |