SDK & Client Examples
InfraSage exposes a REST API. This page shows how to send telemetry and query the API from Python, Go, and Node.js. No official client library is required — the API is straightforward HTTP.
Authentication
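Every telemetry and query request shown on this page must carry an API key in the X-API-Key header (API-key management itself is authenticated with a JWT, as in the curl example below):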
X-API-Key: isage_<your-key>
Get or create API keys from the Admin UI or via the API:
# Create an ingestion-scoped API key. This call is authenticated with a JWT
# bearer token. The body is JSON, so the Content-Type header is set
# explicitly: curl's -d otherwise labels the body as
# application/x-www-form-urlencoded.
curl -X POST http://your-infrasage/api/v1/keys \
  -H "Authorization: Bearer $JWT" \
  -H "Content-Type: application/json" \
  -d '{"name": "my-service", "scope": "ingestion"}'
Python
Install dependencies
pip install requests
Send a single telemetry event
import requests
import time
GATEWAY_URL = "http://your-infrasage:8080"
API_KEY = "isage_your_key_here"
def send_metric(service_id: str, metric_name: str, value: float, tags: dict = None):
    """Post a single metric event to the telemetry endpoint.

    The event is stamped with the current wall-clock time in milliseconds.
    ``tags`` falls back to an empty dict when it is omitted or falsy.

    Returns the decoded JSON body of the response. ``raise_for_status`` is
    called first, so an HTTP error status raises ``requests.HTTPError``.
    """
    now_ms = int(time.time() * 1000)
    event = dict(
        type="metric",
        service_id=service_id,
        metric_name=metric_name,
        value=value,
        timestamp=now_ms,
        tags=tags if tags else {},
    )
    url = f"{GATEWAY_URL}/api/v1/telemetry"
    response = requests.post(url, json=event, headers={"X-API-Key": API_KEY}, timeout=5)
    response.raise_for_status()
    return response.json()
# Example: report one latency sample, tagged with the region it came from.
send_metric(
    "payment-service",
    "request_latency_ms",
    142.3,
    tags={"region": "eu-west-1"},
)
Send a batch
def send_batch(events: list[dict]):
    """Post several telemetry events in one request to the batch endpoint.

    ``events`` is wrapped as ``{"events": [...]}``. Returns the decoded JSON
    body of the response. ``raise_for_status`` is called first, so an HTTP
    error status raises ``requests.HTTPError``.
    """
    url = f"{GATEWAY_URL}/api/v1/telemetry/batch"
    auth_headers = {"X-API-Key": API_KEY}
    response = requests.post(url, json={"events": events}, headers=auth_headers, timeout=10)
    response.raise_for_status()
    return response.json()
# Example batch: two metrics and one log line for the same service. Each event
# carries its own millisecond timestamp.
events = [
    {
        "type": "metric",
        "service_id": "payment-service",
        "metric_name": "rps",
        "value": 1240,
        "timestamp": int(time.time() * 1000),
    },
    {
        "type": "metric",
        "service_id": "payment-service",
        "metric_name": "error_rate",
        "value": 0.02,
        "timestamp": int(time.time() * 1000),
    },
    {
        "type": "log",
        "service_id": "payment-service",
        "log_body": "Payment processed OK",
        "severity": "info",
        "timestamp": int(time.time() * 1000),
    },
]

# The response is read for its "accepted" and "rejected" fields.
result = send_batch(events)
print(f"Accepted: {result['accepted']}, Rejected: {result['rejected']}")
Query recent anomalies
def get_anomalies(service_id: str = None, limit: int = 20):
    """Fetch anomalies from the API, optionally filtered by service.

    ``limit`` is always sent as a query parameter; ``service_id`` is added
    only when it is truthy. Returns the ``"anomalies"`` entry of the JSON
    response. ``raise_for_status`` is called first, so an HTTP error status
    raises ``requests.HTTPError``.
    """
    query = {"limit": limit}
    if service_id:
        query["service_id"] = service_id

    url = f"{GATEWAY_URL}/api/v1/anomalies"
    response = requests.get(url, params=query, headers={"X-API-Key": API_KEY}, timeout=5)
    response.raise_for_status()
    body = response.json()
    return body["anomalies"]
# Example: print one line per anomaly returned for the payment service.
for anomaly in get_anomalies("payment-service"):
    print(
        f"[{anomaly['severity']}] {anomaly['service_id']} - {anomaly['metric_name']}: {anomaly['description']}"
    )
Periodic metric reporter (background thread; additionally requires `pip install psutil`)
import threading
class InfraSageReporter:
    """Report host CPU and memory usage to InfraSage from a background thread.

    Every ``interval_seconds`` a daemon thread sends a two-event batch
    (``cpu_percent`` and ``memory_percent``) through ``send_batch``. A failed
    report is printed and the loop carries on, so a gateway outage does not
    stop the reporter.
    """

    def __init__(self, service_id: str, interval_seconds: int = 15):
        self.service_id = service_id
        self.interval = interval_seconds
        self._stop = threading.Event()
        self._thread = None  # set by start(); None until the first start

    def start(self):
        """Start the reporter thread; a no-op while one is already running."""
        running = self._thread is not None and self._thread.is_alive()
        if running and not self._stop.is_set():
            return
        # Each run gets its own stop event. That lets start() work again after
        # stop(), and a thread that is still winding down keeps seeing its own
        # (already set) event instead of being revived.
        self._stop = threading.Event()
        self._thread = threading.Thread(
            target=self._run, args=(self._stop,), daemon=True
        )
        self._thread.start()

    def stop(self):
        """Ask the reporter thread to exit; returns without waiting for it."""
        self._stop.set()

    def _run(self, stop_event=None):
        """Thread body: report once per interval until the stop event is set."""
        if stop_event is None:
            stop_event = self._stop
        try:
            # psutil documents the first cpu_percent() call without an interval
            # as returning a meaningless 0.0, so prime the counter here (in the
            # reporting thread) and discard that value.
            import psutil
            psutil.cpu_percent()
        except Exception as e:
            print(f"InfraSage report failed: {e}")
        # wait() doubles as the sleep: it returns True as soon as stop() fires.
        while not stop_event.wait(self.interval):
            try:
                self._report()
            except Exception as e:
                print(f"InfraSage report failed: {e}")

    def _report(self):
        """Sample CPU and memory once and send both as a single batch."""
        import psutil
        # One timestamp for both events: they describe the same sample.
        now_ms = int(time.time() * 1000)
        send_batch([
            {"type": "metric", "service_id": self.service_id, "metric_name": "cpu_percent", "value": psutil.cpu_percent(), "timestamp": now_ms},
            {"type": "metric", "service_id": self.service_id, "metric_name": "memory_percent", "value": psutil.virtual_memory().percent, "timestamp": now_ms},
        ])
Go
Send a single event
package main
import (
"bytes"
"encoding/json"
"fmt"
"net/http"
"time"
)
const (
gatewayURL = "http://your-infrasage:8080"
apiKey = "isage_your_key_here"
)
// TelemetryEvent is the JSON payload sent to the telemetry endpoints.
// Fields tagged omitempty are left out of the encoded JSON when they hold
// their zero value; Type, ServiceID and Timestamp are always encoded.
type TelemetryEvent struct {
// Type is the event kind; the examples on this page use "metric".
Type string `json:"type"`
// ServiceID names the service the event belongs to.
ServiceID string `json:"service_id"`
MetricName string `json:"metric_name,omitempty"`
// NOTE(review): because of omitempty, a Value of exactly 0 is omitted from
// the JSON rather than sent as 0 - confirm the gateway tolerates that.
Value float64 `json:"value,omitempty"`
LogBody string `json:"log_body,omitempty"`
Severity string `json:"severity,omitempty"`
// Timestamp is set by sendEvent from time.Now().UnixMilli().
Timestamp int64 `json:"timestamp"`
Tags map[string]string `json:"tags,omitempty"`
}
// sendEvent posts a single telemetry event to the gateway.
//
// A zero Timestamp is replaced with the current time in Unix milliseconds; a
// timestamp supplied by the caller is sent unchanged. The request is bounded
// by a 5-second timeout. It returns any marshalling, request-construction or
// transport error, and an error for any response status of 400 or above.
func sendEvent(event TelemetryEvent) error {
	// Only stamp events that do not already carry a time, so callers can
	// report when something actually happened.
	if event.Timestamp == 0 {
		event.Timestamp = time.Now().UnixMilli()
	}

	body, err := json.Marshal(event)
	if err != nil {
		return err
	}

	req, err := http.NewRequest("POST", gatewayURL+"/api/v1/telemetry", bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-API-Key", apiKey)

	// http.DefaultClient has no timeout, so a stalled gateway would block the
	// caller indefinitely. A Client with a nil Transport still uses
	// http.DefaultTransport, so connections are pooled as before.
	client := &http.Client{Timeout: 5 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 400 {
		return fmt.Errorf("infrasage: unexpected status %d", resp.StatusCode)
	}
	return nil
}
func main() {
err := sendEvent(TelemetryEvent{
Type: "metric",
ServiceID: "payment-service",
MetricName: "request_latency_ms",
Value: 142.3,
Tags: map[string]string{"region": "eu-west-1"},
})
if err != nil {
fmt.Printf("error: %v\n", err)
}
}
Batch sender with buffering (builds on the types and imports of the previous example, and additionally imports "sync")
// BatchSender accumulates telemetry events and hands them to flushFn once
// maxSize events have been buffered.
type BatchSender struct {
// buffer holds the events added since the last flush.
buffer []TelemetryEvent
// mu is held by Add while it appends to and swaps buffer.
mu sync.Mutex
// maxSize is the buffer length at which Add triggers a flush.
maxSize int
// flushFn sends one full batch; NewBatchSender sets it to the flush method.
flushFn func([]TelemetryEvent) error
}
// NewBatchSender returns a BatchSender that flushes once maxSize events have
// been added. The buffer is pre-allocated with capacity maxSize, and flushFn
// is bound to the sender's own flush method.
func NewBatchSender(maxSize int) *BatchSender {
	sender := new(BatchSender)
	sender.maxSize = maxSize
	sender.buffer = make([]TelemetryEvent, 0, maxSize)
	sender.flushFn = sender.flush
	return sender
}
// Add appends e to the buffer under the mutex. Once the buffer holds maxSize
// events, it is handed to flushFn on a new goroutine and replaced with a
// fresh empty slice. The error returned by flushFn is discarded.
func (s *BatchSender) Add(e TelemetryEvent) {
	s.mu.Lock()
	defer s.mu.Unlock()

	s.buffer = append(s.buffer, e)
	if len(s.buffer) < s.maxSize {
		return
	}

	// Detach the full slice before starting the goroutine, so the flush owns
	// it and later Adds write into the replacement.
	full := s.buffer
	s.buffer = make([]TelemetryEvent, 0, s.maxSize)
	go s.flushFn(full)
}
// flush posts one batch of events to the batch endpoint as {"events": [...]}.
//
// Marshalling and request-construction errors are returned instead of being
// discarded. The request is bounded by a 10-second timeout, and a response
// status of 400 or above is returned as an error.
func (s *BatchSender) flush(events []TelemetryEvent) error {
	body, err := json.Marshal(map[string]any{"events": events})
	if err != nil {
		return err
	}

	req, err := http.NewRequest("POST", gatewayURL+"/api/v1/telemetry/batch", bytes.NewReader(body))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-API-Key", apiKey)

	// http.DefaultClient has no timeout; without one, every flush goroutine
	// started while the gateway is stalled would block forever.
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode >= 400 {
		return fmt.Errorf("infrasage: unexpected status %d", resp.StatusCode)
	}
	return nil
}
HTTP middleware for automatic request-latency metrics
// InfraSageMiddleware wraps next and, after each request has been served,
// sends an "http_request_latency_ms" metric tagged with the request method,
// URL path and response status code. The metric is sent on a separate
// goroutine and the result of sendEvent is ignored.
//
// NOTE(review): responseWriter is not defined in this snippet. It is used
// here as a wrapper that embeds http.ResponseWriter and has a statusCode
// field (initialised to 200) - confirm it is declared elsewhere.
func InfraSageMiddleware(serviceID string, next http.Handler) http.Handler {
	handler := func(w http.ResponseWriter, r *http.Request) {
		begin := time.Now()
		rw := &responseWriter{ResponseWriter: w, statusCode: 200}

		next.ServeHTTP(rw, r)

		elapsedMs := time.Since(begin).Milliseconds()
		tags := map[string]string{
			"method":      r.Method,
			"path":        r.URL.Path,
			"status_code": fmt.Sprintf("%d", rw.statusCode),
		}
		metric := TelemetryEvent{
			Type:       "metric",
			ServiceID:  serviceID,
			MetricName: "http_request_latency_ms",
			Value:      float64(elapsedMs),
			Tags:       tags,
		}
		go sendEvent(metric)
	}
	return http.HandlerFunc(handler)
}
Node.js / TypeScript
Install dependencies
npm install axios
Client class
import axios, { AxiosInstance } from 'axios';
/**
 * Shape of a single telemetry event as sent to the telemetry endpoints.
 * Only `type` and `service_id` are required.
 */
interface TelemetryEvent {
type: 'metric' | 'log' | 'trace' | 'event';
service_id: string;
metric_name?: string;
value?: number;
log_body?: string;
severity?: 'debug' | 'info' | 'warn' | 'error';
/** When omitted, InfraSageClient fills this in with Date.now(). */
timestamp?: number;
tags?: Record<string, string>;
}
/**
 * Thin wrapper around an axios instance that is pre-configured with the
 * gateway base URL, the X-API-Key header and a 5-second timeout.
 */
class InfraSageClient {
  private client: AxiosInstance;

  constructor(gatewayUrl: string, apiKey: string) {
    const headers = { 'X-API-Key': apiKey };
    this.client = axios.create({ baseURL: gatewayUrl, headers, timeout: 5000 });
  }

  /** Send one event; a missing timestamp defaults to the current time. */
  async send(event: TelemetryEvent): Promise<void> {
    const stamped = { ...event, timestamp: event.timestamp ?? Date.now() };
    await this.client.post('/api/v1/telemetry', stamped);
  }

  /**
   * Send several events in one request. Events without a timestamp are
   * stamped with the current time. Resolves with the response body.
   */
  async sendBatch(events: TelemetryEvent[]): Promise<{ accepted: number; rejected: number }> {
    const stamped = events.map((event) => {
      return { ...event, timestamp: event.timestamp ?? Date.now() };
    });
    const response = await this.client.post('/api/v1/telemetry/batch', { events: stamped });
    return response.data;
  }

  /** Fetch anomalies, passing `service_id` and `limit` as query parameters. */
  async getAnomalies(serviceId?: string, limit = 20) {
    const params = { service_id: serviceId, limit };
    const response = await this.client.get('/api/v1/anomalies', { params });
    return response.data.anomalies;
  }
}
// Usage
const sage = new InfraSageClient('http://your-infrasage:8080', 'isage_your_key_here');

// One checkout-duration sample; no timestamp is given, so the client stamps it.
const checkoutMetric: TelemetryEvent = {
  type: 'metric',
  service_id: 'payment-service',
  metric_name: 'checkout_duration_ms',
  value: 234,
  tags: { region: 'eu-west-1', version: 'v2.3.1' },
};
await sage.send(checkoutMetric);
Express.js middleware
import { Request, Response, NextFunction } from 'express';
/**
 * Build an Express middleware that reports one "http_request_latency_ms"
 * metric per request. The metric is sent when the response emits 'finish',
 * tagged with the method, the matched route path (falling back to the
 * request path) and the status code. Send failures are ignored on purpose.
 */
function infrasageMiddleware(client: InfraSageClient, serviceId: string) {
  return (req: Request, res: Response, next: NextFunction) => {
    const startedAt = Date.now();

    const reportLatency = () => {
      const tags = {
        method: req.method,
        path: req.route?.path ?? req.path,
        status_code: String(res.statusCode),
      };
      const pending = client.send({
        type: 'metric',
        service_id: serviceId,
        metric_name: 'http_request_latency_ms',
        value: Date.now() - startedAt,
        tags,
      });
      // Fire-and-forget: a telemetry failure must not affect the response.
      pending.catch(() => {});
    };

    res.on('finish', reportLatency);
    next();
  };
}
// In your Express app (assumes `app` is your Express application and `sage`
// is the InfraSageClient created in the usage example above):
app.use(infrasageMiddleware(sage, 'api-gateway'));
Batch buffer with auto-flush
/**
 * Buffers events in memory and sends them as a batch, either when `maxSize`
 * events have accumulated or every `flushIntervalMs` milliseconds. A failed
 * batch is logged with console.error and is not retried.
 */
class BufferedSender {
  private buffer: TelemetryEvent[] = [];
  private timer: NodeJS.Timeout | null = null;

  constructor(
    private client: InfraSageClient,
    private maxSize = 100,
    private flushIntervalMs = 5000,
  ) {
    this.scheduleFlush();
  }

  /** Queue an event, flushing at once when the buffer reaches maxSize. */
  add(event: TelemetryEvent): void {
    this.buffer.push(event);
    const isFull = this.buffer.length >= this.maxSize;
    if (isFull) {
      this.flush();
    }
  }

  /** Start the periodic flush timer. */
  private scheduleFlush(): void {
    const tick = () => this.flush();
    this.timer = setInterval(tick, this.flushIntervalMs);
  }

  /** Send everything buffered so far; does nothing when the buffer is empty. */
  flush(): void {
    if (this.buffer.length === 0) {
      return;
    }
    // Swap in a fresh array so events added during the send go to the next batch.
    const pending = this.buffer;
    this.buffer = [];
    this.client
      .sendBatch(pending)
      .catch(err => console.error('InfraSage flush failed:', err));
  }

  /** Stop the periodic timer and flush whatever is still buffered. */
  destroy(): void {
    if (this.timer) {
      clearInterval(this.timer);
    }
    this.flush();
  }
}
Error Handling
All endpoints return standard HTTP status codes. Retry on 429 (rate limit) and 503 (gateway unavailable) with exponential backoff. Do not retry 400 (invalid payload) or 401 (invalid API key).
# Python retry example (requires `pip install tenacity`)
from tenacity import retry, stop_after_attempt, wait_exponential, retry_if_exception
def _is_retryable(exc):
    """Return True for an HTTPError whose status is 429 or 503."""
    if not isinstance(exc, requests.HTTPError):
        return False
    return exc.response.status_code in (429, 503)


@retry(
    retry=retry_if_exception(_is_retryable),
    wait=wait_exponential(multiplier=1, min=1, max=10),
    stop=stop_after_attempt(3),
)
def send_with_retry(event):
    """Send one metric via ``send_metric``, retrying rate-limit/unavailable errors.

    ``event`` is a dict whose keys are passed to ``send_metric`` as keyword
    arguments. At most three attempts are made, with exponential backoff
    between them; other errors are not retried.
    """
    return send_metric(**event)