TL;DR

Use subprocess.run() with a list of arguments (never shell=True with user input), check return codes, and capture output as text. For kubectl, prefer the Python client from the APIs page in production automation; subprocess is fine for one-off scripts and wrappers.

subprocess Basics

The safest form is subprocess.run() with a list — it avoids shell injection, handles paths with spaces, and gives you stdout, stderr, and exit code cleanly.

python — subprocess_basics.py
import subprocess


# Run and capture output — raises CalledProcessError on non-zero exit
# capture_output=True collects stdout and stderr; text=True decodes them to str.
result = subprocess.run(
    ["kubectl", "get", "pods", "-n", "default", "-o", "json"],
    capture_output=True,
    text=True,
    check=True,    # raise CalledProcessError if exit code != 0
)
print(result.stdout)


# Run without capturing (output goes to terminal)
# check=True still raises CalledProcessError if the apply fails.
subprocess.run(["kubectl", "apply", "-f", "manifest.yaml"], check=True)


# Check exit code without raising (no check=True, so inspect returncode by hand)
# NOTE(review): every non-zero exit takes this branch, not only a missing
# namespace; result.stderr carries kubectl's actual reason.
result = subprocess.run(["kubectl", "get", "ns", "my-ns"], capture_output=True, text=True)
if result.returncode != 0:
    print(f"Namespace does not exist: {result.stderr.strip()}")


# With a timeout (prevent hanging)
# kubectl enforces its own --timeout=120s and exits non-zero when it expires;
# the Python-side timeout=130 is only a backstop in case kubectl itself hangs.
try:
    result = subprocess.run(
        ["kubectl", "wait", "--for=condition=ready", "pod", "-l", "app=web", "--timeout=120s"],
        capture_output=True, text=True, timeout=130
    )
except subprocess.TimeoutExpired:
    print("Timed out waiting for pods to become ready")
else:
    # check=True is not set, so a failed wait would otherwise pass silently.
    if result.returncode != 0:
        print(f"Pods did not become ready: {result.stderr.strip()}")


# AVOID shell=True with user-supplied values — this is a shell injection risk:
# subprocess.run(f"kubectl get pods -n {user_input}", shell=True)  # DANGEROUS

kubectl Wrapper

A thin Python wrapper around kubectl lets you centralise kubeconfig handling, dry-run logic, and output parsing while keeping scripts readable.

python — kubectl_wrapper.py
import json
import subprocess
from typing import Any


class KubectlError(RuntimeError):
    """Raised when a kubectl invocation exits with a non-zero status."""


class Kubectl:
    def __init__(self, kubeconfig: str | None = None, context: str | None = None, dry_run: bool = False):
        self.base_cmd = ["kubectl"]
        if kubeconfig:
            self.base_cmd += ["--kubeconfig", kubeconfig]
        if context:
            self.base_cmd += ["--context", context]
        self.dry_run = dry_run

    def run(self, *args: str, check: bool = True) -> subprocess.CompletedProcess:
        cmd = self.base_cmd + list(args)
        result = subprocess.run(cmd, capture_output=True, text=True)
        if check and result.returncode != 0:
            raise KubectlError(f"kubectl {' '.join(args)} failed:\n{result.stderr.strip()}")
        return result

    def get_json(self, *args: str) -> Any:
        result = self.run(*args, "-o", "json")
        return json.loads(result.stdout)

    def get_pods(self, namespace: str) -> list[dict]:
        data = self.get_json("get", "pods", "-n", namespace)
        return data.get("items", [])

    def apply(self, manifest_path: str) -> None:
        if self.dry_run:
            print(f"DRY RUN: kubectl apply -f {manifest_path}")
            return
        self.run("apply", "-f", manifest_path)

    def rollout_restart(self, namespace: str, deployment: str) -> None:
        if self.dry_run:
            print(f"DRY RUN: rollout restart deploy/{deployment} -n {namespace}")
            return
        self.run("rollout", "restart", f"deploy/{deployment}", "-n", namespace)


# Usage
# dry_run=True only short-circuits apply() and rollout_restart(); get_pods()
# below still runs kubectl against the selected context.
k = Kubectl(context="prod-cluster", dry_run=True)
pods = k.get_pods("my-app")
print(f"Found {len(pods)} pods")
k.rollout_restart("my-app", "web")

Streaming Output

Use subprocess.Popen when you need to stream output line-by-line in real time — for example, following a rollout or tailing logs in an automation script.

python — streaming.py
import subprocess
import sys


def stream_command(*args: str) -> int:
    """Run a command and stream stdout line-by-line. Returns exit code."""
    with subprocess.Popen(
        args,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT,   # merge stderr into stdout
        text=True,
        bufsize=1,                  # line-buffered
    ) as proc:
        for line in proc.stdout:
            print(line, end="", flush=True)
    return proc.returncode


# Watch a rollout until completion (kubectl's own --timeout=5m bounds the wait)
rc = stream_command(
    "kubectl", "rollout", "status", "deploy/web", "-n", "prod", "--timeout=5m"
)
if rc != 0:
    print("Rollout did not complete successfully", file=sys.stderr)
    sys.exit(1)

# Follow pod logs.
# NOTE(review): `kubectl logs -f` keeps following until the stream ends or the
# script is interrupted; nothing in this call limits how long it runs.
stream_command(
    "kubectl", "logs", "-f", "-n", "prod", "-l", "app=web", "--max-log-requests=5"
)

Gotchas

  • Never use shell=True with variables you don't fully control — it opens shell injection. Use a list of arguments instead.
  • Always set a timeout on subprocess calls in automation; a hung kubectl wait can block a CI job for hours.
  • check=True raises CalledProcessError — catch it explicitly if you want to handle non-zero exit codes gracefully.
  • Kubeconfig context: scripts inherit the shell's KUBECONFIG and current context — always pass --context explicitly in production scripts.