lognitor (Python)

Python SDK for Lognitor with Django, Flask, and FastAPI integrations. Fully thread-safe.

Installation

Terminal
pip install lognitor

Quick Start

Python
import lognitor

lognitor.init(
api_key="your-api-key",
service="my-python-app",
environment="production",
version="1.2.0",
)

lognitor.info("Server started", metadata={"port": 8000})
lognitor.error("Database connection failed", error=ConnectionError("ECONNREFUSED"))

Configuration

Python
import lognitor

client = lognitor.init(
api_key="your-api-key",
service="payment-service",
environment="production",
version="2.1.0",

batch_size=25,
flush_interval=5.0,
max_queue_size=1000,
max_retries=3,

min_level="info",
enabled=True,

redact_patterns=["email", "creditCard", "ssn", "bearer"],
scrub_url_params=["token", "password", "secret", "authorization"],

auto_truncate=True,
max_breadcrumbs=100,
debug=False,
before_send=lambda log: log,
)

Configuration Options Reference

OptionTypeDefaultDescription
api_keystrRequired. Your project API key.
api_urlstrhttps://api.lognitor.com/api/v1API endpoint.
servicestrNoneService/app name.
environmentstrNoneEnvironment label.
versionstrNoneApp version.
batch_sizeint25Logs per batch.
flush_intervalfloat5.0Auto-flush interval in seconds.
max_retriesint3Retry count for failed requests.
max_queue_sizeint1000Maximum logs held in memory.
min_levelstr | NoneNoneMinimum log level. None sends all.
enabledboolTrueMaster switch.
auto_truncateboolFalseTruncate instead of dropping oversized payloads.
max_breadcrumbsint100Max breadcrumbs in ring buffer.
debugboolFalsePrint SDK internals to stderr.
redact_patternslist[str | Pattern][]Built-in: email, creditCard, ssn, bearer. Or compiled regex.
scrub_url_paramslist[str][...]Query params to replace with [SCRUBBED].
before_sendcallableNonedef fn(log_dict) -> dict | None. Return None to drop.

Log Levels

Python
lognitor.debug("Cache miss", metadata={"key": "user:123"})
lognitor.info("Order created", metadata={"order_id": "ord_456"})
lognitor.warn("Rate limit approaching", metadata={"usage": 850, "limit": 1000})
lognitor.error("Payment failed", error=ValueError("Card declined"))
lognitor.fatal("Database corrupted, shutting down")

lognitor.log("info", "Custom level call")

Per-Log Options

Python
lognitor.info("User signed up",
metadata={"plan": "pro", "referrer": "google"},
tags=["signup", "marketing"],
user={"id": "user_123", "email": "alice@example.com"},
request={"method": "POST", "url": "/api/users", "status_code": 201, "duration_ms": 45},
perf={"duration_ms": 120, "memory_mb": 256},
trace={"trace_id": "abc123", "span_id": "def456"},
deploy={"commit": "a1b2c3d", "branch": "main"},
action="user.signup",
request_id="req_abc123",
session_id="sess_xyz",
notify=True,
)

User Context

Python
lognitor.set_user({"id": "user_123", "email": "alice@example.com", "name": "Alice"})
lognitor.info("Profile updated")  # Includes user_id: "user_123"

lognitor.info("Admin action", user={"id": "admin_1"})  # Override per-log

lognitor.clear_user()

Error Capturing

Python
try:
process_payment(order)
except Exception as e:
lognitor.capture_exception(e,
  metadata={"order_id": order.id, "amount": order.total},
  tags=["payment", "critical"],
  request_id="req_123",
)

lognitor.error("Payment failed", error=ValueError("Card declined"))

lognitor.error("External service error",
error={"type": "TimeoutError", "message": "Request timed out after 30s"},
)
Python
lognitor.add_breadcrumb(
type="http", category="api",
message="GET /api/users 200", level="info",
data={"duration_ms": 45},
)

lognitor.error("Failed to process order", error=Exception("Insufficient stock"))

Timers

Python
timer = lognitor.start_timer()
result = heavy_computation()
timer.end("Computation finished",
metadata={"input_size": 1000},
perf={"db_queries": 5, "cache_hits": 12},
)

Child Loggers

Python
client = lognitor.init(api_key="your-key", service="main-app")

payment_logger = client.child(
service="payment-module",
metadata={"module": "payment"},
tags=["payments"],
)
payment_logger.info("Processing payment")

stripe_logger = payment_logger.child(service="stripe-adapter")
stripe_logger.info("Charge created")

Heartbeat Monitoring

Python
hb = lognitor.heartbeat("your-monitor-token")
hb.ping()

result = hb.wrap(lambda: sync_inventory())

User Feedback

Python
log_id = lognitor.error("Checkout failed", error=Exception("Payment timeout"))

lognitor.submit_feedback(
event_id=log_id,
comments="The page froze when I clicked pay",
name="Alice",
email="alice@acme.com",
)

Release Tracking

Python
release = lognitor.register_release(
version="2.1.0",
commit_hash="a1b2c3d4e5f6",
branch="main",
)

Framework Integrations

Django

Python
# settings.py
MIDDLEWARE = [
"lognitor.integrations.django.LognitorMiddleware",
# ... other middleware
]

LOGNITOR = {
"api_key": "your-api-key",
"service": "my-django-app",
"environment": "production",
"ignore_routes": ["/health", "/admin/jsi18n"],
"capture_user": True,
}

Flask

Python
from flask import Flask
from lognitor.integrations.flask import init_app

app = Flask(__name__)

client = init_app(
app,
api_key="your-api-key",
service="my-flask-app",
environment="production",
ignore_routes=["/health", "/static"],
)

FastAPI

Python
from fastapi import FastAPI
from lognitor.integrations.fastapi import LognitorMiddleware

app = FastAPI()

app.add_middleware(
LognitorMiddleware,
api_key="your-api-key",
service="my-fastapi-app",
environment="production",
ignore_routes=["/health", "/", "/openapi.json"],
)
What the middleware captures

Each integration automatically sends structured request data (method, path, status code, duration, IP, user agent) to the /ingest/requests endpoint for request analytics. Unhandled exceptions are captured automatically.

Python Logging Integration

Bridge Python's standard logging module to Lognitor:

Python
import logging
from lognitor.logging_handler import LogHandler

client = lognitor.init(api_key="your-key", service="my-app")

handler = LogHandler(client=client, level=logging.WARNING)
logger = logging.getLogger("my-app")
logger.addHandler(handler)

logger.warning("Disk usage high")    # → sent as level "warn"
logger.error("Service unavailable")  # → sent as level "error"

Flush and Shutdown

Python
lognitor.flush()    # Manually flush all buffered logs
lognitor.shutdown()  # Graceful shutdown
# Also registers atexit and SIGTERM handlers for automatic flush on exit.

Pause and Resume

Python
lognitor.pause()   # Stop sending (still buffers)
lognitor.resume()  # Resume sending

Reconfigure

Python
lognitor.reconfigure(
min_level="warn",
enabled=False,
batch_size=50,
debug=True,
)

Thread Safety

The Python SDK is fully thread-safe. Multiple threads can call info(), error(), etc. concurrently. Internal buffers and state use threading.Lock for synchronization.

Python
import threading, lognitor

lognitor.init(api_key="your-key", service="worker")

def worker(thread_id):
for i in range(100):
  lognitor.info(f"Processing item {i}", metadata={"thread": thread_id})

threads = [threading.Thread(target=worker, args=(i,)) for i in range(10)]
for t in threads: t.start()
for t in threads: t.join()

lognitor.flush()  # All 1000 logs delivered safely