Architecture
Why This Matters
dd_stream shifts API consumption from a best-effort pattern to a controlled execution model.
Instead of each client reimplementing pagination, retries, and rate-limit handling, the system provides:
- Reliability - large queries complete without manual tuning
- Efficiency - maximum throughput without exceeding API limits
- Consistency - uniform behavior across vendors and SDKs
- Simplicity - developers interact with a continuous stream, not fragmented requests
This reduces operational risk and standardizes how APIs are consumed at scale.
Separation of Concerns
The execution engine operates as a runtime scheduler that coordinates request construction, transport, parsing, and adaptive control - without understanding API semantics.
| Layer | Responsibility |
|---|---|
| ExecutionEngine<T> | Double-buffered prefetch, retry loop, cursor advancement, cancellation |
| VendorAdapter<T> | API semantics: request building, response parsing, cursor logic |
| ExecutionPolicy | Runtime adaptation: chunk sizing, backoff strategy, prefetch depth |
| Transport | Network I/O (libcurl). No vendor knowledge. |
Adding a new vendor requires implementing VendorAdapter<T> only - no changes
to the engine, transport, policy, or C API.
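As a rough sketch of that seam (illustrative types and signatures, not the shipped header), the interface the engine programs against looks roughly like this:

```cpp
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Illustrative stand-ins; the real Request/Response/DataPoint types live in
// the library (DataPoint's 40-byte layout is covered further down).
struct Request   { std::string method, url, body; };
struct Response  { int status = 0; std::string body, retry_after; };
struct DataPoint { std::int64_t timestamp = 0; double value = 0; };

// The engine only ever calls these methods. T is the cursor type, so one
// engine serves time-window, page-number, and token pagination alike.
template <typename T>
class VendorAdapter {
 public:
  virtual ~VendorAdapter() = default;

  // One cursor position -> one HTTP request (one chunk).
  virtual Request build_request(const T& cursor) const = 0;

  // Decode one response body into the records for the current batch.
  virtual std::vector<DataPoint> parse_response(const Response& resp) = 0;

  // Advance the cursor, or return nullopt once the stream is exhausted.
  virtual std::optional<T> next_cursor(const T& cursor,
                                       const Response& resp) const = 0;

  // Retry classification; vendors with custom status codes or rate-limit
  // headers override this default.
  virtual bool retryable(const Response& resp) const {
    return resp.status == 429 || resp.status >= 500;
  }
};
```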
Chunking & Double-Buffering
The engine divides work into chunks (time windows, pages, or token-delimited ranges).
Each chunk maps to one HTTP request. Results are exposed as a pull-based iterator —
stream.next() blocks only until the current batch is ready.
Double-buffering transforms request execution from a sequential model into a
pipelined one, eliminating idle wait between network and processing phases. While
you process the current batch, a background std::async task is already fetching
the next one.
Without prefetch:
│← Fetch A →│← Process A →│← Fetch B →│← Process B →│
With prefetch (double-buffer):
│← Fetch A ──────→│
│← Process A →│
│← Fetch B ───────→│
│← Process B →│
│← Fetch C ───────→│
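A condensed sketch of this two-slot pipeline, assuming a batch-producing fetch callable (retries, cancellation, and end-of-stream handling live in the real engine and are omitted):

```cpp
#include <functional>
#include <future>
#include <utility>

// Condensed two-slot pipeline: while the caller processes the batch from the
// ready slot, the other slot is already being filled on a std::async task.
template <typename Batch>
class PrefetchBuffer {
 public:
  explicit PrefetchBuffer(std::function<Batch()> fetch) : fetch_(std::move(fetch)) {
    inflight_ = std::async(std::launch::async, fetch_);  // prefill the first slot
  }

  // Blocks only until the in-flight batch is ready, then immediately kicks
  // off the next fetch so network time overlaps with processing time.
  Batch next() {
    Batch ready = inflight_.get();
    inflight_ = std::async(std::launch::async, fetch_);
    return ready;
  }

 private:
  std::function<Batch()> fetch_;
  std::future<Batch> inflight_;
};
```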
Adaptive Execution
The system continuously probes API capacity and converges on an optimal request size without prior knowledge of limits.
The AdaptivePolicy self-tunes chunk size based on success/failure signals. On
failure (429, 5xx), it shrinks the chunk window and the cursor itself, so the
retry immediately uses a smaller execution window - not just the next batch. On
consecutive successes, it grows the chunk back, maximizing throughput without
breaking rate limits.
Request [T, T+4h] → 429 (too much data)
  │ policy shrinks: 4h → 2h, cursor.time_end shrinks to T+2h
  │ sleep(backoff + jitter)
  ▼
Retry [T, T+2h] → 429
  │ policy shrinks: 2h → 1h, cursor.time_end shrinks to T+1h
  ▼
Retry [T, T+1h] → 200 ✓
  ▼
Next [T+1h, T+2h]   → 200 ✓
Next [T+2h, T+3h]   → 200 ✓   (3 consecutive → grow)
Next [T+3h, T+4.5h] → 200 ✓   (chunk grew: 1h → 1.5h)

Chunk Size
  ▲
  │          ┌────────────┐
  │      ┌───┘            └───┐
  │   ┌──┘                    └───┐
  │   │   ← success: grow         │
  │   │     failure: shrink →     │   ┌───
  │   │                           └───┘
  └───┴────────────────────────────────────── Time ▶
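The shrink/grow behaviour traced above could be expressed roughly as follows (hypothetical TimeCursor and AdaptivePolicySketch types; thresholds and growth factors are illustrative, not the shipped defaults):

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical time-window cursor; field names follow the trace above.
struct TimeCursor {
  std::int64_t time_begin = 0;  // window start (epoch seconds)
  std::int64_t time_end   = 0;  // window end   (epoch seconds)
};

// Illustrative grow/shrink rules; thresholds and factors are assumptions.
struct AdaptivePolicySketch {
  std::int64_t chunk_s     = 4 * 3600;   // current window size
  std::int64_t min_chunk_s = 60;
  std::int64_t max_chunk_s = 24 * 3600;
  int consecutive_ok       = 0;

  void adjust(TimeCursor& cursor, bool success) {
    if (success) {
      if (++consecutive_ok >= 3) {                         // streak reached
        chunk_s = std::min(chunk_s * 3 / 2, max_chunk_s);  // grow 1.5x
        consecutive_ok = 0;
      }
    } else {                                               // 429 / 5xx
      consecutive_ok = 0;
      chunk_s = std::max(chunk_s / 2, min_chunk_s);        // halve the window
      cursor.time_end = cursor.time_begin + chunk_s;       // shrink the retry too
    }
  }
};
```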
Backoff uses exponential delay with jitter:
min(base × 2^n, cap) + rand(0, jitter_ms). Server-mandated Retry-After
headers override the policy backoff when present.
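A sketch of that formula, with the Retry-After override folded in (base, cap, and jitter values here are placeholders, not the shipped defaults):

```cpp
#include <algorithm>
#include <chrono>
#include <optional>
#include <random>

// min(base * 2^n, cap) + rand(0, jitter_ms), with a server-supplied
// Retry-After taking precedence when present.
std::chrono::milliseconds backoff_delay(int retry_count,
                                        std::optional<std::chrono::seconds> retry_after,
                                        std::mt19937& rng) {
  using namespace std::chrono;
  if (retry_after) return duration_cast<milliseconds>(*retry_after);

  const milliseconds base{250}, cap{30'000};
  const milliseconds exponential = std::min(base * (1LL << std::min(retry_count, 20)), cap);
  std::uniform_int_distribution<int> jitter_ms(0, 100);
  return exponential + milliseconds{jitter_ms(rng)};
}
```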
Data Flow
User calls stream.next()
│
▼
Engine reads from prefilled buffer
(prefetch hides network latency in steady state)
│
▼
adapter.build_request(cursor) → Request
│
▼
transport.send(request) → Response
│
├── success:
│ adapter.parse_response() → records
│ policy.adjust(cursor, true) → grow chunk on consecutive success
│ adapter.next_cursor() → advance or exhaust
│ std::async kicks prefetch for next chunk
│ return batch to caller
│
└── retryable error (429, 5xx, network failure):
policy.adjust(cursor, false) → shrink execution window
policy.backoff(retry_count) → exponential + jitter
retry immediately using reduced window (policy-adjusted cursor)
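The same flow, condensed into a sketch of one chunk's execution (the callable parameters stand in for the adapter, transport, and policy methods; cursor advancement and the std::async prefetch of the following chunk are left to the engine proper):

```cpp
#include <chrono>
#include <functional>
#include <stdexcept>
#include <string>
#include <thread>
#include <vector>

struct HttpResponse { int status = 0; std::string body; };

// One chunk's execution, condensed from the data-flow diagram above.
template <typename Record>
std::vector<Record> fetch_chunk(
    const std::function<HttpResponse()>& send_current_request,           // build + send
    const std::function<std::vector<Record>(const HttpResponse&)>& parse,
    const std::function<void(bool)>& adjust_policy,                      // grow / shrink
    const std::function<std::chrono::milliseconds(int)>& backoff,
    int max_retries) {
  for (int attempt = 0; ; ++attempt) {
    const HttpResponse resp = send_current_request();

    if (resp.status >= 200 && resp.status < 300) {
      adjust_policy(true);               // consecutive successes grow the chunk
      return parse(resp);                // caller advances the cursor next
    }

    const bool retryable = resp.status == 429 || resp.status >= 500;
    if (!retryable || attempt >= max_retries)
      throw std::runtime_error("non-retryable error or retries exhausted");

    adjust_policy(false);                // shrink the execution window first
    std::this_thread::sleep_for(backoff(attempt));
  }
}
```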
Key Design Decisions
Double-buffering over thread pools
Two fixed slots (A and B) are chosen over a thread pool because memory usage is bounded and predictable, the workload is single-consumer and strictly ordered, and synchronization is trivial - one future per slot, no work queue.
JSON-config C API
The C API is intentionally minimal and stable, acting as a universal
interoperability layer across all language bindings.
stream_create(adapter_name, config_json) - the adapter name selects the
factory; the JSON string carries all configuration. Adding a new vendor adapter
requires zero C API changes.
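For illustration, a caller might drive this layer roughly as below; only stream_create and its (adapter_name, config_json) shape come from the API described here, while the handle type, adapter string, config keys, and cleanup call are placeholders:

```cpp
// Placeholders throughout: the opaque handle, the registered adapter name,
// the config keys, and stream_destroy are assumptions, not the real header.
extern "C" {
typedef struct dd_stream dd_stream;                                   // assumed opaque handle
dd_stream* stream_create(const char* adapter_name, const char* config_json);
void stream_destroy(dd_stream* stream);                               // assumed cleanup entry point
}

int main() {
  const char* config_json = R"({
    "query": "avg:system.cpu.user{*}",
    "from":  1700000000,
    "to":    1700086400
  })";  // hypothetical configuration keys

  dd_stream* stream = stream_create("datadog_v2", config_json);       // name selects the factory
  if (!stream) return 1;

  // ... pull batches through a binding or the rest of the C API ...

  stream_destroy(stream);
  return 0;
}
```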
Adapter abstraction
The VendorAdapter<T> interface encapsulates all API-specific logic: request
building, response parsing, retry classification, rate-limit header parsing, and
cursor advancement. The engine calls these methods and knows nothing about the
underlying API. This is proven by five real adapters: DatadogV1, DatadogV2
(timeseries + scalar), PageAdapter, and TokenAdapter.
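To make the seam concrete, a page-number adapter could be skeletonised like this (a hypothetical ExamplePageAdapter reusing the illustrative types sketched under Separation of Concerns; it is not the shipped PageAdapter):

```cpp
// Hypothetical adapter; the shipped PageAdapter's request and response
// formats will differ. Only the shape of the overrides matters here.
struct PageCursor { int page = 1; int per_page = 1000; };

class ExamplePageAdapter : public VendorAdapter<PageCursor> {
 public:
  Request build_request(const PageCursor& c) const override {
    return {"GET",
            "https://api.example.com/items?page=" + std::to_string(c.page) +
                "&per_page=" + std::to_string(c.per_page),
            /*body=*/""};
  }

  std::vector<DataPoint> parse_response(const Response& resp) override {
    std::vector<DataPoint> out;
    // JSON decoding elided; remember how many records this page held.
    last_count_ = out.size();
    return out;
  }

  std::optional<PageCursor> next_cursor(const PageCursor& c,
                                        const Response&) const override {
    if (last_count_ == 0) return std::nullopt;     // empty page: exhausted
    return PageCursor{c.page + 1, c.per_page};     // otherwise advance one page
  }

 private:
  std::size_t last_count_ = 0;
};
```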
Cursor as opaque state
The cursor is treated as an opaque state container by the engine - only the adapter defines its meaning and progression. This is what makes the system truly generic: time-window, page-number, and token-based pagination all use the same engine without any conditional logic.
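Illustrative cursor shapes (names assumed) that all drive the same engine:

```cpp
#include <cstdint>
#include <string>

// The engine never reads these fields; only the owning adapter gives them
// meaning and decides how they advance.
struct TimeWindowCursor { std::int64_t time_begin = 0, time_end = 0; };
struct PageNumberCursor { int page = 1, per_page = 1000; };
struct TokenCursor      { std::string next_token; };

// The same engine template drives all three with no pagination-specific
// branches: ExecutionEngine<TimeWindowCursor>, ExecutionEngine<PageNumberCursor>,
// ExecutionEngine<TokenCursor>.
```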
DataPoint layout invariant
DataPoint is 40 bytes, trivially copyable, no padding. This layout is the
foundation of every zero-copy trick across the bindings. static_assert +
offsetof assertions catch any accidental field change at compile time.
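The guard could look like the following sketch; the field names are assumptions for illustration, while the checked invariant (40 bytes, trivially copyable, stable offsets, no padding) is the point:

```cpp
#include <cstddef>
#include <cstdint>
#include <type_traits>

// Assumed field layout, for illustration only; the real header defines the
// actual members.
struct DataPoint {
  std::int64_t timestamp;   // offset 0
  double       value;       // offset 8
  std::int64_t series_id;   // offset 16
  std::int64_t tag_ref;     // offset 24
  std::int64_t reserved;    // offset 32 -> 40 bytes total
};

static_assert(sizeof(DataPoint) == 40, "layout change breaks zero-copy bindings");
static_assert(std::is_trivially_copyable_v<DataPoint>, "must stay memcpy-safe");
static_assert(offsetof(DataPoint, value) == 8, "field offsets are part of the ABI");
static_assert(offsetof(DataPoint, reserved) == 32, "field offsets are part of the ABI");
```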
Memory Budget (per stream)
Memory usage is strictly bounded and independent of total dataset size.
| Allocation | Size |
|---|---|
| Double-buffer (2 batches) | ~2 MB typical |
| CurlTransport::write_buffer_ | ~200–500 KB |
| JSON parse overhead (transient) | ~2× body size |
| Total steady-state | ~2–5 MB |
Zero-Copy Across Languages
| Language | Technique | Copies |
|---|---|---|
| Python (no Arrow) | numpy from buffer | 1 |
| Python (Arrow) | Arrow C Data Interface via arrow::Buffer::Wrap() | 0 |
| Node.js | NewExternalBuffer → typed arrays | 1 |
| Go | unsafe.Pointer cast | 0 |
| Java | GetPrimitiveArrayCritical | 0 |
| Rust | Box<[DDPoint]> ownership transfer | 1 |
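As one example from the table, the Arrow path can wrap a batch's contiguous values without copying (sketch; assumes the batch exposes a plain double array and outlives the wrapper):

```cpp
#include <arrow/buffer.h>
#include <cstdint>
#include <memory>

// Wrap n contiguous doubles owned by the stream's current batch in an
// arrow::Buffer without copying. The batch must outlive the buffer and
// anything built on top of it (arrays, record batches, ...).
std::shared_ptr<arrow::Buffer> wrap_values(const double* values, std::int64_t n) {
  return arrow::Buffer::Wrap(values, n);
}
```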
Error Handling
All error categories map to deterministic engine behavior, ensuring consistent retry semantics across all adapters:
| Category | Behaviour |
|---|---|
| RateLimit (429) | Respect Retry-After header; exponential backoff; adaptive window shrink |
| ServerError (5xx) | Exponential backoff + jitter, up to max_retries; adaptive shrink |
| ClientError (4xx) | Propagate immediately - no retry |
| ParseError | Propagate immediately - adapter sets last_error() |
| TransportError | Network failure - retry with backoff |
| Cancelled | User cancelled via stream.cancel() |
| ExhaustedCursor | Adapter returned same cursor twice - stuck progress guard |
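A sketch of how these categories could map onto the retry decision (illustrative enum, not the shipped error type):

```cpp
// Illustrative mapping of the categories above onto the retry decision; the
// shipped error type and its ordering may differ.
enum class ErrorCategory {
  RateLimit,        // 429: honour Retry-After, back off, shrink window
  ServerError,      // 5xx: exponential backoff + jitter, shrink window
  ClientError,      // other 4xx: propagate immediately
  ParseError,       // body could not be decoded: propagate immediately
  TransportError,   // network failure: retry with backoff
  Cancelled,        // stream.cancel() was called
  ExhaustedCursor,  // adapter returned the same cursor twice
};

// Only these three categories re-enter the retry loop.
constexpr bool is_retryable(ErrorCategory c) {
  return c == ErrorCategory::RateLimit ||
         c == ErrorCategory::ServerError ||
         c == ErrorCategory::TransportError;
}
```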
