# Python Guide
## Installation
bash
cmake -B build -DDDSTREAM_BUILD_PYTHON=ON -DCMAKE_BUILD_TYPE=Release cmake --build build -j$(nproc) export LD_LIBRARY_PATH="$(pwd)/build/source/core:$LD_LIBRARY_PATH" export PYTHONPATH="$(pwd)/build/source/bindings/python:$PYTHONPATH"
## Config & Creation
python
import _ddstream as dd
import time
cfg = dd.DDStreamConfig()
cfg.api_key = "YOUR_DD_API_KEY"
cfg.app_key = "YOUR_DD_APP_KEY"
cfg.query = "avg:system.cpu.user{*}"
cfg.range_start = int(time.time()) - 86400 # last 24 hours
cfg.range_end = int(time.time())
cfg.granularity = 60 # 1-min resolution
# Optional tuning:
cfg.chunk_seconds = 14400 # 4-hour chunks
cfg.enable_prefetch = True
cfg.log_level = 0 # 0=NONE … 4=DEBUG
stream = dd.DDStream(cfg)
# or validate credentials first:
err = dd.DDStream.validate(cfg)
if err:
raise RuntimeError(f"Invalid credentials: {err}")Iteration Patterns
Two ways to iterate — choose based on your error-handling needs:
python
# Range-for: raises RuntimeError on stream error
for batch in stream:
print(f"{len(batch['timestamps'])} points")python
stream.reset()
while stream.has_next():
batch = stream.next_batch()
if batch is None:
print("Error:", stream.last_error())
break
print(f"{len(batch['timestamps'])} points")
print(f" progress: {stream.progress():.0%}")Batch Data
Each batch is a Python dict with NumPy arrays. All arrays are parallel — index
`i` corresponds to the same time point across all arrays.
python
batch = stream.next_batch()
# Time and value arrays (numpy.ndarray, dtype int64/float64)
timestamps = batch["timestamps"] # Unix epoch seconds
avg_vals = batch["avg"]
min_vals = batch["min"]
max_vals = batch["max"]
sum_vals = batch["sum"]
# Chunk metadata
start_ms = batch["chunk_start_ms"]
end_ms = batch["chunk_end_ms"]
is_last = batch["is_last"]
# Statistics
print(f"Mean CPU: {avg_vals.mean():.2f}%")
print(f"Peak CPU: {max_vals.max():.2f}%")GIL & Threading
The Python binding releases the GIL during HTTP fetches (`py::gil_scoped_release`)
and re-acquires it before returning data. This means `DDStream` is safe to use with
`ThreadPoolExecutor` — multiple streams can fetch concurrently without blocking each other.
python
from concurrent.futures import ThreadPoolExecutor
import _ddstream as dd
import time
def fetch_metric(query):
cfg = dd.DDStreamConfig()
cfg.api_key = "..."
cfg.app_key = "..."
cfg.query = query
cfg.range_start = int(time.time()) - 3600
cfg.range_end = int(time.time())
results = []
for batch in dd.DDStream(cfg):
results.extend(batch["avg"].tolist())
return results
queries = ["avg:system.cpu.user{*}", "avg:system.mem.used{*}"]
with ThreadPoolExecutor(max_workers=2) as pool:
futures = [pool.submit(fetch_metric, q) for q in queries]
data = [f.result() for f in futures]Arrow Support
Build with `-DDDSTREAM_ENABLE_ARROW=ON` to enable zero-copy Arrow output.
The C API returns an Arrow IPC RecordBatch that you can deserialize directly into
a PyArrow table with no copies.
python
# Build with Arrow:
# cmake -B build -DDDSTREAM_BUILD_PYTHON=ON -DDDSTREAM_ENABLE_ARROW=ON
import pyarrow as pa
import _ddstream as dd
cfg = dd.DDStreamConfig()
cfg.enable_arrow = True
# ... set other fields ...
stream = dd.DDStream(cfg)
for ipc_bytes in stream:
# ipc_bytes is a bytes object containing an Arrow IPC RecordBatch
reader = pa.ipc.open_stream(ipc_bytes)
table = reader.read_all()
# Columns: timestamp, avg, min, max, sum
print(table.schema)