Building a Custom Adapter
The VendorAdapter Interface
To add support for a new API, implement VendorAdapter<T> where T is your record
type. The engine calls these six methods and knows nothing about your API's
authentication, URL format, response schema, or pagination model.
// Interface between the engine and one vendor API.
// T is the record type that parse_response() appends to its output vector.
// Authentication, URL format, response schema, and pagination all live behind
// these six methods; every one of them is pure virtual.
template <typename T>
class VendorAdapter {
public:
// Virtual so an adapter can be destroyed through a VendorAdapter<T> pointer.
virtual ~VendorAdapter() = default;
// Build the next HTTP request from the current cursor position.
virtual Request build_request(const Cursor& cursor) const = 0;
// Parse the raw HTTP response into output records, written to `out`.
// Return false on failure; call last_error() for detail.
// This is the only non-const method of the interface.
virtual bool parse_response(const Response& response, std::vector<T>& out) = 0;
// True if the engine should retry this response (typically 429 and 5xx).
virtual bool is_retryable(const Response& response) const = 0;
// Server-mandated wait time (e.g. Retry-After header). nullopt = use policy backoff.
virtual std::optional<std::chrono::milliseconds>
retry_after(const Response& response) const = 0;
// Advance the cursor. Return nullopt when the stream is exhausted.
// `records` is presumably the batch just parsed for `current` - confirm against the engine.
virtual std::optional<Cursor>
next_cursor(const Cursor& current, const std::vector<T>& records) const = 0;
// Human-readable description of the last parse error.
virtual std::string last_error() const = 0;
};

That's it. No registration boilerplate, no factory classes, no XML config. Implement these six methods and plug into the engine.
Step-by-Step: Prometheus Adapter
Let's build a complete adapter for the Prometheus /api/v1/query_range endpoint
as a worked example.
1. Define the record type
// One sample of one Prometheus series.
// Field order is part of the contract: the adapter's parse_response() fills
// this struct positionally (timestamp, value, metric_name).
struct PrometheusPoint {
double timestamp; // Unix seconds; a double, so it may carry a fractional part
double value; // sample value (the adapter converts it from the string Prometheus sends)
std::string metric_name; // the series' "__name__" label; empty when the label is absent
};

2. Define the config
// Settings for one Prometheus range-query stream.
// NOTE: only chunk_seconds has a default. range_start, range_end and step have
// no initializer, so a default-constructed config must assign all three
// before it is handed to the adapter.
struct PrometheusConfig {
std::string base_url; // e.g. "http://prometheus:9090"
std::string query; // PromQL, e.g. "rate(http_requests_total[5m])"
double range_start; // Unix seconds; start of the overall range
double range_end; // Unix seconds; the adapter stops advancing the cursor once this is reached
double step; // query resolution (seconds)
double chunk_seconds = 3600; // window per request (seconds)
};

3. Implement the adapter
// VendorAdapter for the Prometheus /api/v1/query_range endpoint.
//
// Pagination is time-window based: each request covers the window stored in
// the cursor (time_start_ms .. time_end_ms) and next_cursor() slides that
// window forward until cfg.range_end is reached.
class PrometheusAdapter final : public VendorAdapter<PrometheusPoint> {
public:
    explicit PrometheusAdapter(PrometheusConfig cfg) : cfg_(std::move(cfg)) {}

    // Build the GET request for the window described by `cursor`.
    // Cursor bounds are milliseconds; the URL carries them as seconds.
    // A bound that is not set on the cursor is sent as 0.
    Request build_request(const Cursor& cursor) const override {
        const double from = cursor.time_start_ms.value_or(0) / 1000.0;
        const double to = cursor.time_end_ms.value_or(0) / 1000.0;

        std::ostringstream url;
        // std::fixed keeps epoch-sized values out of scientific notation. It is
        // sticky, so `step` is written in fixed notation as well.
        url << std::fixed
            << cfg_.base_url << "/api/v1/query_range"
            // PromQL routinely contains characters that are not legal in a query
            // string ([ ] { } " space +), so it must be percent-encoded.
            << "?query=" << percent_encode(cfg_.query)
            << "&start=" << from
            << "&end=" << to
            << "&step=" << cfg_.step;
        return { "GET", url.str(), {{"Accept", "application/json"}}, "" };
    }

    // Parse a query_range response body and append its samples to `out`.
    // Returns false on any failure (invalid JSON, status other than "success",
    // or a body whose shape does not match the expected layout); last_error()
    // then describes the problem and `out` is left untouched.
    bool parse_response(const Response& res, std::vector<PrometheusPoint>& out) override {
        // Non-throwing parse: a syntax error yields a "discarded" value instead.
        auto doc = nlohmann::json::parse(res.body, nullptr, false);
        if (doc.is_discarded()) { last_err_ = "invalid JSON"; return false; }

        try {
            if (doc["status"] != "success") {
                last_err_ = doc.value("error", "unknown error");
                return false;
            }

            // Stage into a local vector so a failure halfway through the body
            // cannot leave a partial batch in `out`.
            std::vector<PrometheusPoint> parsed;
            for (auto& series : doc["data"]["result"]) {
                const std::string name = series["metric"].value("__name__", "");
                for (auto& sample : series["values"]) {
                    // Each sample is [ <number timestamp>, "<string value>" ].
                    parsed.push_back({
                        sample[0].get<double>(),
                        std::stod(sample[1].get<std::string>()),
                        name
                    });
                }
            }

            out.reserve(out.size() + parsed.size());
            for (auto& point : parsed) out.push_back(std::move(point));
            return true;
        } catch (const std::exception& e) {
            // JSON type/access errors and std::stod failures both derive from
            // std::exception; report them through the documented false +
            // last_error() channel instead of letting them escape.
            last_err_ = std::string("malformed Prometheus response: ") + e.what();
            return false;
        }
    }

    // Retry on rate limiting and on server errors.
    bool is_retryable(const Response& r) const override {
        return r.rate_limited() || r.server_error();
    }

    // Wait time requested by the server through Retry-After, if usable.
    // Only the delay-in-seconds form is understood. A missing header, a
    // non-numeric value (such as the HTTP-date form) or a negative number
    // yields nullopt.
    // NOTE(review): the lookup key is lower-case; this assumes the transport
    // normalizes header names - confirm.
    std::optional<std::chrono::milliseconds>
    retry_after(const Response& r) const override {
        const auto it = r.headers.find("retry-after");
        if (it == r.headers.end()) return std::nullopt;
        try {
            const int seconds = std::stoi(it->second);
            if (seconds < 0) return std::nullopt;  // a negative wait is meaningless
            return std::chrono::milliseconds(std::chrono::seconds(seconds));
        } catch (const std::exception&) {
            return std::nullopt;  // not a number, or out of range for int
        }
    }

    // Slide the time window forward by one chunk, clamped to cfg.range_end.
    // Returns nullopt once the current window already reaches range_end.
    // The chunk length comes from cfg.chunk_seconds unless the cursor carries a
    // positive "adaptive_chunk_ms" entry in `extra`.
    // NOTE(review): Prometheus documents both `start` and `end` of query_range
    // as inclusive, so a sample that falls exactly on a shared window boundary
    // may be returned by both windows - confirm whether the engine or the
    // caller de-duplicates.
    std::optional<Cursor>
    next_cursor(const Cursor& c, const std::vector<PrometheusPoint>&) const override {
        const int64_t window_end_ms = c.time_end_ms.value_or(0);
        const int64_t range_end_ms = static_cast<int64_t>(cfg_.range_end * 1000);
        if (window_end_ms >= range_end_ms) return std::nullopt;

        int64_t chunk_ms = static_cast<int64_t>(cfg_.chunk_seconds * 1000);
        const auto it = c.extra.find("adaptive_chunk_ms");
        if (it != c.extra.end()) {
            try {
                const int64_t adaptive = std::stoll(it->second);
                if (adaptive > 0) chunk_ms = adaptive;  // ignore unusable overrides
            } catch (const std::exception&) {
                // Unparseable override: keep the configured chunk size.
            }
        }

        // A non-positive chunk would return the same window forever. In that
        // case, and when the chunk overshoots, take everything that is left.
        const int64_t remaining_ms = range_end_ms - window_end_ms;
        if (chunk_ms <= 0 || chunk_ms > remaining_ms) chunk_ms = remaining_ms;

        Cursor next = c;
        next.time_start_ms = window_end_ms;
        next.time_end_ms = window_end_ms + chunk_ms;
        return next;
    }

    // Text of the most recent parse failure; empty if none has occurred.
    std::string last_error() const override { return last_err_; }

private:
    // Percent-encode everything except the RFC 3986 unreserved characters
    // (ALPHA / DIGIT / '-' / '.' / '_' / '~').
    static std::string percent_encode(const std::string& raw) {
        static const char kHex[] = "0123456789ABCDEF";
        std::string encoded;
        encoded.reserve(raw.size() * 3);
        for (const unsigned char ch : raw) {
            const bool unreserved =
                (ch >= 'A' && ch <= 'Z') || (ch >= 'a' && ch <= 'z') ||
                (ch >= '0' && ch <= '9') ||
                ch == '-' || ch == '.' || ch == '_' || ch == '~';
            if (unreserved) {
                encoded.push_back(static_cast<char>(ch));
            } else {
                encoded.push_back('%');
                encoded.push_back(kHex[ch >> 4]);
                encoded.push_back(kHex[ch & 0x0F]);
            }
        }
        return encoded;
    }

    PrometheusConfig cfg_;
    std::string last_err_;
};

4. Use it
// Describe the stream: one day of data, fetched in one-hour windows.
PrometheusConfig cfg;
cfg.base_url = "http://prometheus:9090";
cfg.query = "rate(http_requests_total[5m])";
cfg.range_start = 1700000000;
cfg.range_end = 1700086400;
cfg.step = 60;
cfg.chunk_seconds = 3600;

// First window: starts at range_start and spans one chunk. It is clamped to
// range_end, like every later window, so a chunk longer than the whole range
// does not request data past the end. Cursor times are milliseconds.
Cursor initial;
initial.time_start_ms = static_cast<int64_t>(cfg.range_start * 1000);
initial.time_end_ms = static_cast<int64_t>(
    std::min(cfg.range_start + cfg.chunk_seconds, cfg.range_end) * 1000);

auto engine = std::make_unique<ExecutionEngine<PrometheusPoint>>(
    std::make_unique<PrometheusAdapter>(cfg),
    std::make_unique<CurlTransport>(),
    std::make_unique<AdaptivePolicy>(), // adapts to Prometheus rate limits too
    initial
);

Choosing a Cursor Strategy
The Cursor struct supports three pagination models. Use the one that matches your API:
| API type | Cursor field | Example APIs |
|---|---|---|
| Time-window | time_start_ms / time_end_ms | Datadog, Prometheus, InfluxDB, Grafana |
| Page-number | page | REST APIs with ?page=N |
| Opaque token | token | Slack, GitHub, AWS APIs with next_cursor |
For APIs that combine models (e.g. Datadog Logs uses time_start_ms + token),
use both fields. The extra map handles adapter-specific fields that don't fit
the standard ones.
Testing with MockTransport
Test your adapter entirely offline using MockTransport:
#include "tests/fixtures/MockTransport.hpp"
auto transport = std::make_unique<MockTransport>();

// Enqueue scripted responses
transport->enqueue(200, R"({"status":"success","data":{"result":[...]}})");
transport->enqueue(429, "rate limited"); // test retry
transport->enqueue(200, R"({"status":"success","data":{"result":[...]}})");

// Or use a dynamic handler that decides per request
transport->set_handler([](const Request& req, Response& res) {
    const bool wanted_window = req.url.find("start=0") != std::string::npos;
    if (!wanted_window) {
        // Any other request is answered as rate limited.
        res.status_code = 429;
        return true;
    }
    res.status_code = 200;
    res.body = "...";
    return true;
});

// Same engine wiring as before, with the mock in place of a real transport.
auto engine = std::make_unique<ExecutionEngine<PrometheusPoint>>(
    std::make_unique<PrometheusAdapter>(cfg),
    std::move(transport),
    std::make_unique<FixedPolicy>(),
    initial
);

Registering in the C API
To expose your adapter through the C API for language bindings, add a factory
branch in stream_c_api.cpp:
// In stream_create():
// New branch for the "prometheus" adapter name.
} else if (h->adapter_name == "prometheus") {
auto cfg = parse_prometheus_config(j); // your JSON parser
// Build the cursor before the adapter: it reads cfg, and cfg is moved from on the next line.
auto cursor = make_time_cursor(cfg.range_start, cfg.chunk_seconds);
auto adapter = std::make_unique<PrometheusAdapter>(std::move(cfg));
// `transport` and `policy` are not declared in this snippet; presumably they are
// created earlier in stream_create() - confirm.
h->prometheus_engine = std::make_unique<ExecutionEngine<PrometheusPoint>>(
std::move(adapter), std::move(transport), std::move(policy), cursor);
}

Then all five language bindings can create a Prometheus stream with
stream_create("prometheus", config_json) - no binding-layer changes needed.
