TL;DR

Use requests for HTTP APIs with retry logic, the standard json module for JSON, pyyaml for YAML, and the official kubernetes client library for programmatic K8s access. Always set timeouts on HTTP calls.

HTTP Requests with Retries

Use requests.Session with an HTTPAdapter mounted to get automatic connection pooling and retry-on-failure; set explicit timeouts so scripts never hang forever.

python: http_client.py
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry


def make_session(
    retries: int = 3,
    backoff: float = 1.0,
    status_forcelist: tuple = (429, 500, 502, 503, 504),
    allowed_methods: tuple = ("GET", "POST", "PUT", "PATCH", "DELETE"),
) -> requests.Session:
    """Create a requests.Session with retry + backoff configured.

    Args:
        retries: Total number of retries passed to urllib3's ``Retry``.
        backoff: Backoff factor passed to ``Retry``; the delay grows
            exponentially with each consecutive retry.
        status_forcelist: HTTP status codes that trigger a retry.
        allowed_methods: HTTP methods that may be retried. The default keeps
            the original behaviour and includes non-idempotent methods
            (POST, PATCH), so a retried request may be applied more than once
            by the server; pass e.g. ``("GET", "PUT", "DELETE")`` to restrict
            retries to idempotent calls.

    Returns:
        A session whose ``http://`` and ``https://`` adapters share the same
        retry policy.
    """
    retry_policy = Retry(
        total=retries,
        # Exponential backoff. NOTE: the urllib3 docs state that no delay is
        # applied before the first retry (for factor 1.0: 0s, 2s, 4s, ...).
        backoff_factor=backoff,
        status_forcelist=status_forcelist,
        allowed_methods=list(allowed_methods),
        # Hand the last response back instead of raising once retries are
        # exhausted, so callers decide via resp.raise_for_status().
        raise_on_status=False,
    )
    adapter = HTTPAdapter(max_retries=retry_policy)
    session = requests.Session()
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session


# Example: call an internal service API
def get_resource(base_url: str, path: str, token: str) -> dict:
    """GET ``base_url/path`` with a bearer token and return the decoded JSON body.

    Args:
        base_url: Service root; a trailing slash is tolerated.
        path: Resource path; a leading slash is tolerated.
        token: Bearer token sent in the ``Authorization`` header.

    Returns:
        The response body parsed by ``resp.json()``.

    Raises:
        requests.HTTPError: Raised by ``raise_for_status()`` for an error status.
    """
    # Normalise the join so exactly one slash separates base and path.
    url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
    headers = {"Authorization": f"Bearer {token}", "Accept": "application/json"}
    # The session is created per call, so close it (and its connection pool)
    # when done instead of leaking it.
    with make_session() as session:
        resp = session.get(
            url,
            headers=headers,
            timeout=(5, 30),    # (connect_timeout, read_timeout) in seconds
        )
        resp.raise_for_status()
        return resp.json()


# Patch request with JSON body
def patch_resource(base_url: str, path: str, token: str, payload: dict) -> dict:
    """PATCH ``base_url/path`` with a JSON body and return the decoded JSON reply.

    Args:
        base_url: Service root; a trailing slash is tolerated.
        path: Resource path; a leading slash is tolerated.
        token: Bearer token sent in the ``Authorization`` header.
        payload: Object serialised as the JSON request body.

    Returns:
        The response body parsed by ``resp.json()``.

    Raises:
        requests.HTTPError: Raised by ``raise_for_status()`` for an error status.
    """
    # Join exactly like get_resource so a trailing/leading slash on either
    # part cannot produce "//" in the URL.
    url = f"{base_url.rstrip('/')}/{path.lstrip('/')}"
    # The session is created per call, so close it (and its connection pool)
    # when done instead of leaking it.
    with make_session() as session:
        resp = session.patch(
            url,
            json=payload,                    # automatically sets Content-Type: application/json
            headers={"Authorization": f"Bearer {token}"},
            timeout=(5, 30),                 # (connect_timeout, read_timeout) in seconds
        )
        resp.raise_for_status()
        return resp.json()

JSON Processing

Parse, query, and transform JSON data; use jq-style field access with safe .get() calls to avoid KeyError on unexpected API responses.

python: json_utils.py
import json
from pathlib import Path


def jpath(obj: dict, *keys, default=None):
    """Safely traverse nested dict/list. jpath(d, 'spec', 'replicas', default=1).

    Each key is applied in turn: string (or any hashable) keys index into
    dicts, integer keys index into lists. As soon as a step cannot be taken
    (missing key, index out of range, negative index, or a value that is
    neither dict nor list), ``default`` is returned as-is.

    Args:
        obj: Root container to walk.
        *keys: Path components, outermost first.
        default: Value returned when the path cannot be fully resolved.

    Returns:
        The value found at the end of the path, or ``default``. With no keys,
        ``obj`` itself is returned.
    """
    for key in keys:
        if isinstance(obj, dict):
            # Stop immediately on a miss; never continue the walk inside
            # `default`, which would make the result depend on its contents.
            if key not in obj:
                return default
            obj = obj[key]
        elif isinstance(obj, list) and isinstance(key, int):
            if not 0 <= key < len(obj):
                return default
            obj = obj[key]
        else:
            return default
    return obj


# Pretty print JSON (useful in interactive sessions)
def pp(obj) -> None:
    """Print *obj* as JSON indented by two spaces.

    Values the json module cannot serialise are rendered through ``str``.
    """
    rendered = json.dumps(obj, default=str, indent=2)
    print(rendered)


# Parse kubectl JSON output
import subprocess

def kubectl_json(*args) -> dict | list:
    """Invoke ``kubectl <args> -o json`` and return its decoded stdout.

    The command runs with ``check=True``, so a non-zero exit status raises
    ``subprocess.CalledProcessError``.
    """
    completed = subprocess.run(
        ["kubectl", *args, "-o", "json"],
        capture_output=True,
        text=True,
        check=True,
    )
    return json.loads(completed.stdout)


# Example: extract image tags from all pods in a namespace
def list_images(namespace: str) -> list[dict]:
    """Collect one ``{"pod", "container", "image"}`` record per container.

    Pods are read with ``kubectl get pods -n <namespace>``; records are
    produced for every entry under each pod's ``spec.containers``.
    """
    pod_list = kubectl_json("get", "pods", "-n", namespace)
    records = []
    for pod in pod_list.get("items", []):
        pod_name = jpath(pod, "metadata", "name")
        # A pod without spec.containers contributes no records.
        records.extend(
            {"pod": pod_name, "container": entry["name"], "image": entry["image"]}
            for entry in jpath(pod, "spec", "containers", default=[])
        )
    return records

YAML Processing

Load, patch, and emit Kubernetes YAML; always use yaml.safe_load (never bare yaml.load) and yaml.dump with default_flow_style=False for readable output.

python: yaml_utils.py
import yaml
from pathlib import Path


def load_manifest(path: str) -> dict:
    """Read the file at *path* and parse it as a single YAML document.

    The file is decoded as UTF-8 explicitly, so the result does not depend
    on the platform's locale encoding.

    Returns:
        Whatever ``yaml.safe_load`` produces for the file contents.
        NOTE(review): for an empty file that is presumably ``None`` rather
        than a dict; confirm callers can cope.
    """
    return yaml.safe_load(Path(path).read_text(encoding="utf-8"))


def load_all_manifests(path: str) -> list[dict]:
    """Load multi-document YAML (documents separated by ---).

    The file is decoded as UTF-8 explicitly, so the result does not depend
    on the platform's locale encoding. Falsy documents (such as an empty
    section between two separators) are dropped.
    """
    text = Path(path).read_text(encoding="utf-8")
    return [doc for doc in yaml.safe_load_all(text) if doc]


def patch_image(manifest: dict, container: str, new_image: str) -> dict:
    """Set the image of every container named *container* in a pod template.

    The manifest is modified in place (under ``spec.template.spec.containers``)
    and the same object is returned. If no container matches, nothing changes.
    """
    pod_spec = manifest["spec"]["template"]["spec"]
    for entry in pod_spec["containers"]:
        if entry["name"] != container:
            continue
        entry["image"] = new_image
    return manifest


def set_replicas(manifest: dict, count: int) -> dict:
    """Write *count* to ``spec.replicas`` in place and return the same manifest."""
    spec = manifest["spec"]
    spec["replicas"] = count
    return manifest


# End-to-end: patch and print updated manifest
if __name__ == "__main__":
    # Load the manifest, bump the "web" image, set three replicas, then
    # print the result as block-style YAML.
    manifest = load_manifest("deployment.yaml")
    manifest = set_replicas(patch_image(manifest, "web", "myapp:v2.1.0"), 3)
    rendered = yaml.dump(manifest, default_flow_style=False)
    print(rendered)

Kubernetes Python Client

The official kubernetes library mirrors the K8s API; use it when you need to list resources, watch events, or apply patches from Python — better than shelling out to kubectl in long-running services.

python: k8s_client.py
"""pip install kubernetes"""

from kubernetes import client, config, watch


def get_k8s_client():
    """Configure the Kubernetes client and return ``(CoreV1Api, AppsV1Api)``.

    In-cluster configuration is tried first; if that raises
    ``config.ConfigException`` the local kubeconfig is loaded instead.
    """
    try:
        config.load_incluster_config()
    except config.ConfigException:
        # In-cluster config could not be loaded: fall back to the kubeconfig.
        config.load_kube_config()
    core_api = client.CoreV1Api()
    apps_api = client.AppsV1Api()
    return core_api, apps_api


# List pods and their status
def list_pods(namespace: str) -> list[dict]:
    """Summarise each pod in *namespace* as ``{"name", "phase", "restarts"}``.

    ``restarts`` is the sum of ``restart_count`` over the pod's container
    statuses; a pod with no container statuses counts as 0.
    """
    core_api, _ = get_k8s_client()
    summaries = []
    for pod in core_api.list_namespaced_pod(namespace).items:
        summaries.append(
            {
                "name": pod.metadata.name,
                "phase": pod.status.phase,
                "restarts": sum(
                    status.restart_count
                    for status in (pod.status.container_statuses or [])
                ),
            }
        )
    return summaries


# Scale a deployment
def scale_deployment(namespace: str, name: str, replicas: int, dry_run: bool = True) -> None:
    """Set the replica count of a Deployment through its scale subresource.

    Args:
        namespace: Namespace of the deployment.
        name: Deployment name.
        replicas: Desired replica count.
        dry_run: When true (the default), only print what would be done and
            return without contacting the cluster.
    """
    if dry_run:
        # Checked before building any client so a dry run needs neither
        # cluster credentials nor a kubeconfig.
        print(f"DRY RUN: scale {namespace}/{name} to {replicas}")
        return
    _, apps = get_k8s_client()
    apps.patch_namespaced_deployment_scale(
        name=name,
        namespace=namespace,
        body={"spec": {"replicas": replicas}},
    )
    print(f"Scaled {namespace}/{name} to {replicas}")


# Watch for pod events in real time
def watch_pods(namespace: str):
    """Print one line per pod event streamed from *namespace*.

    The watch is opened with ``timeout_seconds=60``; each line shows the
    event type, the pod name and the pod phase.
    """
    core_api, _ = get_k8s_client()
    watcher = watch.Watch()
    events = watcher.stream(
        core_api.list_namespaced_pod,
        namespace=namespace,
        timeout_seconds=60,
    )
    for change in events:
        subject = change["object"]
        kind = change["type"]     # ADDED / MODIFIED / DELETED
        print(f"{kind} {subject.metadata.name} phase={subject.status.phase}")