Datadog Adapter
Overview
The Datadog adapter family provides three implementations of VendorAdapter<T>
for the Datadog metrics API:
| Adapter | API | HTTP | Output type |
|---|---|---|---|
| DatadogV1Adapter | /api/v1/query | GET | DataPoint (40 bytes, flat) |
| DatadogV2TimeseriesAdapter | /api/v2/query/timeseries | POST | V2TimeseriesBatch (columnar) |
| DatadogV2ScalarAdapter | /api/v2/query/scalar | POST | V2ScalarBatch (aggregated) |
All three share the same execution engine, transport, and policy layers. They differ only in how requests are built, how responses are parsed, and how the cursor is advanced.
- v1 (DatadogV1Adapter): Cursor(time) → GET /api/v1/query → JSON {series:[{aggr,pointlist}]} → DataPoint[]
- v2 Timeseries (DatadogV2TimeseriesAdapter): Cursor(time) → POST /api/v2/query/timeseries → JSON {data:{attributes:{times,values}}} → V2TimeseriesBatch
- v2 Scalar (DatadogV2ScalarAdapter): Cursor(time) → POST /api/v2/query/scalar → JSON {data:{attributes:{columns}}} → V2ScalarBatch
Shared engine (ExecutionEngine<T>) - all three paths run through:
Cursor → build_request() → Transport.send() → parse_response() → double-buffer → next()

v1 vs v2

| Dimension | v1 | v2 Timeseries | v2 Scalar |
|---|---|---|---|
| Queries per request | 1 (4 aggregations) | N + formulas | N + formulas |
| Time axis | epoch seconds | epoch milliseconds | none (per-chunk aggregate) |
| Response shape | flat DataPoint[] | columnar times[] + series[] | columns[]{name, values[]} |
| Group-by tags | - | per-series group_tags[] | per-column name |
| Rate limit header | x-ratelimit-reset (seconds) | x-ratelimit-reset (seconds) | x-ratelimit-reset (seconds) |

All three adapters parse the x-ratelimit-reset header.
Adaptive Scaling
Datadog enforces strict rate limits and response size limits. A request that covers
too much data will return 429 Too Many Requests or 503 Service Unavailable. The
AdaptivePolicy handles this automatically by shrinking the time window on each
retry, then growing it back once requests succeed consistently.
How it works
When a request fails with a retryable error (429 or 5xx), the adaptive policy:
- Shrinks the internal chunk size by shrink_factor (default 0.5×)
- Shrinks time_end_ms on the cursor so the retry immediately uses a smaller window, not just the next batch
- Waits using exponential backoff with jitter, or the server's Retry-After header if present
- Retries with the smaller window
When requests succeed consecutively (grow_after times, default 3), the chunk
size grows back by grow_factor (default 1.5×), up to max_chunk_ms.
Example: querying 24 hours of high-cardinality metrics

```
Request [0h, 4h] → 429 (too much data)
  │ policy shrinks: 4h → 2h, cursor.time_end shrinks to 2h
  │ sleep(backoff + jitter)
  ▼
Retry [0h, 2h] → 429 again
  │ policy shrinks: 2h → 1h, cursor.time_end shrinks to 1h
  │ sleep(longer backoff)
  ▼
Retry [0h, 1h] → 200 ✓ (window small enough)
  │
  ▼
Next [1h, 2h]   → 200 ✓
Next [2h, 3h]   → 200 ✓ (3 consecutive successes → grow)
Next [3h, 4.5h] → 200 ✓ (chunk grew: 1h → 1.5h)
Next [4.5h, 6h] → 200 ✓
...continues self-tuning across the full 24h range
```

```
Chunk Size
  ▲
  │              ┌──────────────────┐
  │         ┌────┘                  └────┐
  │    ┌────┘                            └────┐
  │    │  ← success: grow                     │
  │    │             failure: shrink →        │
  │    │                                  ┌───┘
  │    │                             ┌────┘
  └────┴─────────────────────────────┴───────── Time ▶
```
Adaptive vs Fixed policy
| Policy | Chunk sizing | Backoff | Use when |
|---|---|---|---|
| FixedPolicy | Static (never changes) | Fixed delay | Predictable workloads, known-safe chunk sizes |
| AdaptivePolicy | Shrinks on failure, grows on success | Exponential + jitter | High-cardinality queries, unpredictable rate limits |
Configuration
```cpp
AdaptivePolicy::Config cfg;
cfg.initial_chunk_ms = 4 * 3600 * 1000;   // start at 4 hours
cfg.min_chunk_ms     = 60 * 1000;         // never smaller than 1 minute
cfg.max_chunk_ms     = 24 * 3600 * 1000;  // never larger than 24 hours
cfg.shrink_factor    = 0.5;               // halve on failure
cfg.grow_factor      = 1.5;               // grow 50% on success
cfg.grow_after       = 3;                 // 3 consecutive successes to grow
cfg.backoff_base     = 500ms;             // base backoff
cfg.backoff_cap      = 60000ms;           // max backoff 60s
cfg.jitter_max       = 500ms;             // ±500ms random jitter
```
v1 Usage
```cpp
DatadogV1Config cfg;
cfg.api_key = "your-api-key";
cfg.app_key = "your-app-key";
cfg.query = "avg:system.cpu.user{*}";
cfg.range_start = 1700000000;  // Unix seconds
cfg.range_end   = 1700086400;  // 24 hours later
cfg.chunk_seconds = 3600;      // 1-hour chunks

Cursor initial;
initial.time_start_ms = cfg.range_start * 1000LL;
initial.time_end_ms   = (cfg.range_start + cfg.chunk_seconds) * 1000LL;

auto engine = std::make_unique<ExecutionEngine<DataPoint>>(
    std::make_unique<DatadogV1Adapter>(cfg),
    std::make_unique<CurlTransport>(),
    std::make_unique<AdaptivePolicy>(),  // or FixedPolicy
    initial
);

while (auto batch = engine->next()) {
    if (!batch->ok()) { /* handle error */ break; }
    for (auto& pt : batch->records) {
        // pt.timestamp, pt.avg, pt.min, pt.max, pt.sum
    }
}
```

The v1 adapter requests all four aggregations (avg, min, max, sum) in a
single HTTP request and merges them by timestamp into a flat DataPoint array.
Missing values are represented as NaN.
v2 Timeseries
```cpp
DatadogV2Config cfg;
cfg.api_key = "your-api-key";
cfg.app_key = "your-app-key";
cfg.range_start = 1700000000;
cfg.range_end   = 1700086400;
cfg.interval_ms = 60000;  // 1-minute granularity
cfg.queries  = {{ "q0", "avg:system.cpu.user{*} by {host}", "metrics" }};
cfg.formulas = {{ "q0 * 100", "" }};  // optional

auto engine = std::make_unique<ExecutionEngine<V2TimeseriesBatch>>(
    std::make_unique<DatadogV2TimeseriesAdapter>(cfg),
    std::make_unique<CurlTransport>(),
    std::make_unique<AdaptivePolicy>(),
    initial  // built from range_start and the chunk size, as in the v1 example
);

while (auto batch = engine->next()) {
    for (auto& rec : batch->records) {
        // rec.times    - vector<int64_t> (milliseconds)
        // rec.series[] - each has query_index, group_tags[], values[]
    }
}
```

v2 Scalar
```cpp
DatadogV2Config cfg;
// ... same credentials and range ...
cfg.queries  = {{ "q0", "avg:system.cpu.user{*}", "metrics" }};
cfg.formulas = {{ "q0", "avg" }};  // aggregator required for scalar

auto engine = std::make_unique<ExecutionEngine<V2ScalarBatch>>(
    std::make_unique<DatadogV2ScalarAdapter>(cfg),
    std::make_unique<CurlTransport>(),
    std::make_unique<FixedPolicy>(),
    initial
);

while (auto batch = engine->next()) {
    for (auto& rec : batch->records) {
        // rec.columns[] - each has name and values[]
    }
}
```

Configuration
C API (JSON config)
All language bindings use the same JSON configuration via
stream_create(adapter_name, config_json):
v1 (DatadogV1Adapter):

```json
{
  "api_key": "your-api-key",
  "app_key": "your-app-key",
  "site": "datadoghq.com",
  "query": "avg:system.cpu.user{*}",
  "range_start": 1700000000,
  "range_end": 1700086400,
  "chunk_seconds": 3600
}
```

v2 (DatadogV2TimeseriesAdapter / DatadogV2ScalarAdapter):

```json
{
  "api_key": "your-api-key",
  "app_key": "your-app-key",
  "site": "datadoghq.com",
  "range_start": 1700000000,
  "range_end": 1700086400,
  "chunk_seconds": 3600,
  "interval_ms": 60000,
  "queries": [
    { "name": "q0", "query": "avg:system.cpu.user{*}", "data_source": "metrics" }
  ],
  "formulas": [
    { "formula": "q0", "aggregator": "avg" }
  ]
}
```

Error Handling
| HTTP status | Behaviour |
|---|---|
| 429 Too Many Requests | Parse x-ratelimit-reset header. If present, sleep that many seconds. Otherwise, exponential backoff. With AdaptivePolicy, the time window is shrunk on the retry. |
| 5xx Server Error | Exponential backoff + jitter, up to max_retries (default 5). Adaptive policy shrinks window. |
| 4xx Client Error (not 429) | Propagate immediately - no retry. Check stream_last_error(). |
| Parse error | Invalid JSON or unexpected response shape. No retry. |
