lognitor (Python)
Python SDK for Lognitor with Django, Flask, and FastAPI integrations. Fully thread-safe.
Installation
Terminal
pip install lognitor

Quick Start
Python
import lognitor
lognitor.init(
api_key="your-api-key",
service="my-python-app",
environment="production",
version="1.2.0",
)
lognitor.info("Server started", metadata={"port": 8000})
lognitor.error("Database connection failed", error=ConnectionError("ECONNREFUSED"))

Configuration
Python
import lognitor
client = lognitor.init(
api_key="your-api-key",
service="payment-service",
environment="production",
version="2.1.0",
batch_size=25,
flush_interval=5.0,
max_queue_size=1000,
max_retries=3,
min_level="info",
enabled=True,
redact_patterns=["email", "creditCard", "ssn", "bearer"],
scrub_url_params=["token", "password", "secret", "authorization"],
auto_truncate=True,
max_breadcrumbs=100,
debug=False,
before_send=lambda log: log,
)

Configuration Options Reference
| Option | Type | Default | Description |
|---|---|---|---|
| api_key | str | — | Required. Your project API key. |
| api_url | str | https://api.lognitor.com/api/v1 | API endpoint. |
| service | str | None | Service/app name. |
| environment | str | None | Environment label. |
| version | str | None | App version. |
| batch_size | int | 25 | Logs per batch. |
| flush_interval | float | 5.0 | Auto-flush interval in seconds. |
| max_retries | int | 3 | Retry count for failed requests. |
| max_queue_size | int | 1000 | Maximum logs held in memory. |
| min_level | str \| None | None | Minimum log level. None sends all. |
| enabled | bool | True | Master switch. |
| auto_truncate | bool | False | Truncate instead of dropping oversized payloads. |
| max_breadcrumbs | int | 100 | Max breadcrumbs in ring buffer. |
| debug | bool | False | Print SDK internals to stderr. |
| redact_patterns | list[str \| Pattern] | [] | Built-in: email, creditCard, ssn, bearer. Or compiled regex. |
| scrub_url_params | list[str] | [...] | Query params to replace with [SCRUBBED]. |
| before_send | callable | None | def fn(log_dict) -> dict \| None. Return None to drop. |
Log Levels
Python
lognitor.debug("Cache miss", metadata={"key": "user:123"})
lognitor.info("Order created", metadata={"order_id": "ord_456"})
lognitor.warn("Rate limit approaching", metadata={"usage": 850, "limit": 1000})
lognitor.error("Payment failed", error=ValueError("Card declined"))
lognitor.fatal("Database corrupted, shutting down")
lognitor.log("info", "Custom level call")

Per-Log Options
Python
lognitor.info("User signed up",
metadata={"plan": "pro", "referrer": "google"},
tags=["signup", "marketing"],
user={"id": "user_123", "email": "alice@example.com"},
request={"method": "POST", "url": "/api/users", "status_code": 201, "duration_ms": 45},
perf={"duration_ms": 120, "memory_mb": 256},
trace={"trace_id": "abc123", "span_id": "def456"},
deploy={"commit": "a1b2c3d", "branch": "main"},
action="user.signup",
request_id="req_abc123",
session_id="sess_xyz",
notify=True,
)

User Context
Python
lognitor.set_user({"id": "user_123", "email": "alice@example.com", "name": "Alice"})
lognitor.info("Profile updated") # Includes user_id: "user_123"
lognitor.info("Admin action", user={"id": "admin_1"}) # Override per-log
lognitor.clear_user()

Error Capturing
Python
try:
process_payment(order)
except Exception as e:
lognitor.capture_exception(e,
metadata={"order_id": order.id, "amount": order.total},
tags=["payment", "critical"],
request_id="req_123",
)
lognitor.error("Payment failed", error=ValueError("Card declined"))
lognitor.error("External service error",
error={"type": "TimeoutError", "message": "Request timed out after 30s"},
)

Breadcrumbs
Python
lognitor.add_breadcrumb(
type="http", category="api",
message="GET /api/users 200", level="info",
data={"duration_ms": 45},
)
lognitor.error("Failed to process order", error=Exception("Insufficient stock"))

Timers
Python
timer = lognitor.start_timer()
result = heavy_computation()
timer.end("Computation finished",
metadata={"input_size": 1000},
perf={"db_queries": 5, "cache_hits": 12},
)

Child Loggers
Python
client = lognitor.init(api_key="your-key", service="main-app")
payment_logger = client.child(
service="payment-module",
metadata={"module": "payment"},
tags=["payments"],
)
payment_logger.info("Processing payment")
stripe_logger = payment_logger.child(service="stripe-adapter")
stripe_logger.info("Charge created")

Heartbeat Monitoring
Python
hb = lognitor.heartbeat("your-monitor-token")
hb.ping()
result = hb.wrap(lambda: sync_inventory())

User Feedback
Python
log_id = lognitor.error("Checkout failed", error=Exception("Payment timeout"))
lognitor.submit_feedback(
event_id=log_id,
comments="The page froze when I clicked pay",
name="Alice",
email="alice@acme.com",
)

Release Tracking
Python
release = lognitor.register_release(
version="2.1.0",
commit_hash="a1b2c3d4e5f6",
branch="main",
)

Framework Integrations
Django
Python
# settings.py
MIDDLEWARE = [
"lognitor.integrations.django.LognitorMiddleware",
# ... other middleware
]
LOGNITOR = {
"api_key": "your-api-key",
"service": "my-django-app",
"environment": "production",
"ignore_routes": ["/health", "/admin/jsi18n"],
"capture_user": True,
}

Flask
Python
from flask import Flask
from lognitor.integrations.flask import init_app
app = Flask(__name__)
client = init_app(
app,
api_key="your-api-key",
service="my-flask-app",
environment="production",
ignore_routes=["/health", "/static"],
)

FastAPI
Python
from fastapi import FastAPI
from lognitor.integrations.fastapi import LognitorMiddleware
app = FastAPI()
app.add_middleware(
LognitorMiddleware,
api_key="your-api-key",
service="my-fastapi-app",
environment="production",
ignore_routes=["/health", "/", "/openapi.json"],
)

What the middleware captures
Each integration automatically sends structured request data (method, path, status code, duration, IP, user agent) to the /ingest/requests endpoint for request analytics. Unhandled exceptions are captured automatically.
Python Logging Integration
Bridge Python's standard logging module to Lognitor:
Python
import logging
from lognitor.logging_handler import LogHandler
client = lognitor.init(api_key="your-key", service="my-app")
handler = LogHandler(client=client, level=logging.WARNING)
logger = logging.getLogger("my-app")
logger.addHandler(handler)
logger.warning("Disk usage high") # → sent as level "warn"
logger.error("Service unavailable") # → sent as level "error"

Flush and Shutdown
Python
lognitor.flush() # Manually flush all buffered logs
lognitor.shutdown() # Graceful shutdown
# Also registers atexit and SIGTERM handlers for automatic flush on exit.

Pause and Resume
Python
lognitor.pause() # Stop sending (still buffers)
lognitor.resume() # Resume sending

Reconfigure
Python
lognitor.reconfigure(
min_level="warn",
enabled=False,
batch_size=50,
debug=True,
)

Thread Safety
The Python SDK is fully thread-safe. Multiple threads can call info(), error(), etc. concurrently. Internal buffers and state use threading.Lock for synchronization.
Python
import threading, lognitor
lognitor.init(api_key="your-key", service="worker")
def worker(thread_id):
for i in range(100):
lognitor.info(f"Processing item {i}", metadata={"thread": thread_id})
threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()
lognitor.flush() # All 1000 logs delivered safely