Concepts

Webhooks

Receive notification events as signed HTTP callbacks

Webhooks deliver the same events as SSE, but push them to your server.

Configuration flow

  1. Create or update the webhook URL with PUT /webhooks
  2. Receive and securely store the returned secret
  3. Verify signatures for every incoming request
  4. Optionally trigger a test ping with POST /webhooks/test

The webhook secret is only returned on create/update. Store it securely.

Payload and headers

Example payload:

{
  "id": 123,
  "name": "batch_edit",
  "data": {},
  "timestamp": 1702819200.0
}

Important headers:

  • X-Webhook-Signature: sha256=<hex-digest>
  • X-Webhook-Event: <event-name>

Signature verification (Python)

import hashlib
import hmac

def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
    """Return True if *signature_header* is a valid signature for *body*.

    Args:
        secret: The webhook secret returned when the webhook was created/updated.
        body: The raw, unparsed request body exactly as received.
        signature_header: Value of the ``X-Webhook-Signature`` header
            (``sha256=<hex-digest>``). A missing/empty value is rejected.

    Returns:
        True only when the header equals the HMAC-SHA256 of *body* under *secret*.
    """
    if not signature_header:
        return False
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    expected = ("sha256=" + digest).encode("ascii")
    # Compare as bytes: compare_digest() raises TypeError when given a str that
    # contains non-ASCII characters, and header values are attacker-controlled.
    provided = signature_header.encode("utf-8", errors="replace")
    return hmac.compare_digest(expected, provided)

If your server does not return a 2xx response, delivery is retried automatically with exponential backoff.

Testing webhooks

You can trigger a test ping with POST /webhooks/test. This sends a test event to your webhook URL, allowing you to verify your setup and signature verification logic.

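The following is a fairly complete script that lets you test your webhook endpoint locally. It requires Python 3.10 or newer and the requests library; unless you pass --hook-url, it also needs an ssh client, because it opens a localhost.run tunnel to expose the local listener: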

#!/usr/bin/env python3
"""
Standalone webhook tester for local development.

Starts a local HTTP server, registers it as the webhook endpoint via the API,
then prints every incoming delivery with HMAC signature verification.

Usage:
    python etc/test_webhook.py --token <API_TOKEN> [options]

Example:
    python etc/test_webhook.py --token mytoken123
    python etc/test_webhook.py --token mytoken123 --port 9000
"""

import argparse
import hashlib
import hmac
import json
import re
import signal
import subprocess
import sys
import threading
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, HTTPServer

import requests

# ── ANSI colour helpers ────────────────────────────────────────────────────────

# ANSI SGR escape sequences used to style terminal output.
RESET = "\033[0m"  # clears every attribute set by the codes below
BOLD = "\033[1m"
GREEN = "\033[92m"  # bright green
RED = "\033[91m"  # bright red
YELLOW = "\033[93m"  # bright yellow
CYAN = "\033[96m"  # bright cyan
DIM = "\033[2m"  # faint / reduced intensity


def _c(color: str, text: str) -> str:
    """Wrap *text* in the escape sequence *color* and reset styling afterwards."""
    styled = f"{color}{text}"
    return styled + RESET


def _ok(text: str) -> str:
    """Format *text* as a green success line prefixed with a check mark."""
    message = f"✓ {text}"
    return _c(GREEN, message)


def _err(text: str) -> str:
    """Format *text* as a red error line prefixed with a cross."""
    message = f"✗ {text}"
    return _c(RED, message)


def _warn(text: str) -> str:
    """Format *text* as a yellow warning line prefixed with a warning sign."""
    message = f"⚠ {text}"
    return _c(YELLOW, message)


def _info(text: str) -> str:
    """Format *text* as a cyan informational line prefixed with an arrow."""
    message = f"→ {text}"
    return _c(CYAN, message)


def _dim(text: str) -> str:
    """Render *text* with the dim (faint) terminal attribute."""
    faint = _c(DIM, text)
    return faint


# ── HMAC verification ──────────────────────────────────────────────────────────


def _verify_signature(secret: str, body: bytes, header_value: str | None) -> bool:
    """Return True if the X-Webhook-Signature header matches the expected HMAC."""
    if not header_value:
        return False
    if not header_value.startswith("sha256="):
        return False
    expected_hex = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    provided_hex = header_value.removeprefix("sha256=")
    return hmac.compare_digest(expected_hex, provided_hex)


# ── Local webhook receiver ─────────────────────────────────────────────────────


class _WebhookHandler(BaseHTTPRequestHandler):
    """HTTP handler that prints and verifies each incoming webhook delivery.

    The HMAC secret used for verification is read from the owning server
    object (``self.server.secret``).
    """

    # Not read by do_POST (which uses self.server.secret); kept so existing
    # references to the class attribute keep working.
    secret: str = ""

    def log_message(self, fmt, *args):  # suppress default access log
        pass

    def do_POST(self):
        """Print one delivery (event, signature status, body) and reply 200.

        A request whose Content-Length is not a non-negative integer is
        rejected with 400 instead of being read.
        """
        try:
            length = int(self.headers.get("Content-Length", 0))
        except ValueError:
            length = -1
        if length < 0:
            # rfile.read() with a negative size would block until the client
            # closes the connection, stalling this single-threaded server.
            self.send_error(400, "Invalid Content-Length")
            return
        body = self.rfile.read(length)

        event = self.headers.get("X-Webhook-Event", "<unknown>")
        signature_header = self.headers.get("X-Webhook-Signature")
        sig_ok = _verify_signature(self.server.secret, body, signature_header)  # type: ignore[attr-defined]

        timestamp = datetime.now(timezone.utc).strftime("%H:%M:%S.%f")[:-3]
        print(f"\n{_dim('─' * 60)}")
        print(f"  {BOLD}Delivery received{RESET}  {_dim(timestamp)}")
        print(f"  Event   : {_c(BOLD, event)}")

        sig_display = signature_header or "<missing>"
        if sig_ok:
            print(f"  Sig     : {_ok(sig_display)}")
        else:
            print(f"  Sig     : {_err(sig_display)}")

        try:
            pretty = json.dumps(json.loads(body), indent=4)
        except (ValueError, RecursionError):
            # Not valid JSON (or undecodable / absurdly nested): show it raw.
            pretty = body.decode(errors="replace")

        for line in pretty.splitlines():
            print(f"  {_dim(line)}")

        # Always respond 200 so the server records success
        self.send_response(200)
        self.end_headers()
        self.wfile.write(b"OK")


class _WebhookServer(HTTPServer):
    """HTTPServer that keeps the webhook HMAC secret available to its handlers."""

    def __init__(self, server_address, secret: str):
        """Store *secret*, then set up the server with _WebhookHandler on *server_address*."""
        self.secret = secret
        super().__init__(server_address, RequestHandlerClass=_WebhookHandler)


# ── API helpers ────────────────────────────────────────────────────────────────


class APIClient:
    """Small client for the webhook-management endpoints of the API.

    Every request carries the bearer token and is bounded by *timeout*, so an
    unresponsive API cannot hang the script indefinitely.
    """

    def __init__(self, base_url: str, token: str, timeout: float = 30.0):
        """Prepare an authenticated session.

        Args:
            base_url: API base URL; trailing slashes are stripped.
            token: API token sent as ``Authorization: Bearer <token>``.
            timeout: Seconds passed as ``timeout`` to every request.
        """
        self.base = base_url.rstrip("/")
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({"Authorization": f"Bearer {token}"})

    def _url(self, path: str) -> str:
        """Return the absolute URL for *path* (which must start with ``/``)."""
        return f"{self.base}{path}"

    def register_webhook(self, url: str) -> dict:
        """PUT /webhooks with *url*; return the decoded JSON response.

        Raises:
            requests.HTTPError: if the API answers with a 4xx/5xx status.
        """
        resp = self.session.put(
            self._url("/webhooks"), json={"url": url}, timeout=self.timeout
        )
        resp.raise_for_status()
        return resp.json()

    def trigger_test(self) -> dict:
        """POST /webhooks/test; return the decoded JSON response.

        Raises:
            requests.HTTPError: if the API answers with a 4xx/5xx status.
        """
        resp = self.session.post(self._url("/webhooks/test"), timeout=self.timeout)
        resp.raise_for_status()
        return resp.json()

    def delete_webhook(self) -> None:
        """DELETE /webhooks, treating 404 (nothing to delete) as success.

        Raises:
            requests.HTTPError: for any other 4xx/5xx status.
        """
        resp = self.session.delete(self._url("/webhooks"), timeout=self.timeout)
        if resp.status_code not in (200, 404):
            resp.raise_for_status()


# ── Entry point ────────────────────────────────────────────────────────────────


def _parse_args() -> argparse.Namespace:
    """Parse the command line of the webhook tester and return the namespace."""
    # (flag, add_argument keyword arguments), in the order shown by --help.
    option_table = (
        ("--token", {"required": True, "help": "Your API token"}),
        (
            "--hook-url",
            {
                "help": "URL to register as webhook endpoint (overrides --host and --port). If not provided, a localhost.run tunnel will be created automatically.",
            },
        ),
        (
            "--api-url",
            {"default": "https://v2.api.piktid.com", "help": "API base URL"},
        ),
        ("--port", {"type": int, "default": 9876, "help": "Local listener port"}),
        (
            "--host",
            {"default": "localhost", "help": "Hostname for the local server URL"},
        ),
        (
            "--no-clean",
            {
                "action": "store_true",
                "help": "Do not delete the webhook configuration on exit",
            },
        ),
    )

    cli = argparse.ArgumentParser(
        description="Local webhook receiver for testing the PiktID webhook system.",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog=__doc__,
    )
    for flag, spec in option_table:
        cli.add_argument(flag, **spec)
    return cli.parse_args()


def _start_tunnel(port: int) -> subprocess.Popen:
    """Launch an ssh reverse tunnel through localhost.run for local *port*.

    Returns the running ssh process. Its stdout and stderr are merged into a
    single line-buffered text pipe so the caller can scan the output for the
    generated public domain name.
    """
    print(_info(f"Starting localhost.run tunnel for port {port}…"))
    ssh_command = [
        "ssh",
        "-o",
        "StrictHostKeyChecking=no",
        "-R",
        f"80:localhost:{port}",
        "nokey@localhost.run",
        "--",
        "--output",
        "text",
    ]
    pipe_options = {
        "stdout": subprocess.PIPE,
        "stderr": subprocess.STDOUT,  # fold stderr into the same pipe
        "text": True,
        "bufsize": 1,  # line-buffered
    }
    return subprocess.Popen(ssh_command, **pipe_options)


def _get_tunnel_url(proc: subprocess.Popen) -> str | None:
    """Read the output of the tunnel process until the URL is found."""
    if not proc.stdout:
        return None

    # Pattern to match: https://<subdomain>.lhr.life
    pattern = re.compile(r"(https://[a-zA-Z0-9-]+\.lhr\.life)")

    for line in proc.stdout:
        # print(_dim(f"tunnel: {line.strip()}")) # optional debug
        match = pattern.search(line)
        if match:
            return match.group(1)

    return None


def _stop_tunnel(proc: subprocess.Popen | None) -> None:
    """Terminate the tunnel process, killing it if it does not exit in time."""
    if proc is None:
        return
    proc.terminate()
    try:
        proc.wait(timeout=2)
    except subprocess.TimeoutExpired:
        # ssh ignored SIGTERM; do not leave it running after we exit.
        proc.kill()
        proc.wait()


def main() -> None:
    """Run the webhook tester until interrupted.

    Steps: obtain a public URL (``--hook-url`` or a localhost.run tunnel),
    register it as the webhook endpoint, start the local receiver, then block
    until Ctrl+C or SIGTERM. Every exit path after the tunnel was started stops
    the tunnel; the webhook registration is deleted on exit unless
    ``--no-clean`` was given.
    """
    args = _parse_args()

    client = APIClient(base_url=args.api_url, token=args.token)

    tunnel_proc: subprocess.Popen | None = None

    if args.hook_url:
        local_url = args.hook_url
    else:
        # Start tunnel
        # NOTE(review): the tunnel's output pipe is not read after the URL is
        # found; if ssh keeps writing, the pipe could eventually fill — confirm.
        tunnel_proc = _start_tunnel(args.port)
        tunnel_url = _get_tunnel_url(tunnel_proc)
        if not tunnel_url:
            print(_err("Failed to get tunnel URL from localhost.run"))
            _stop_tunnel(tunnel_proc)
            sys.exit(1)

        local_url = tunnel_url
        print(_ok(f"Tunnel established: {local_url}"))

    # ── 1. Register the webhook ────────────────────────────────────────────────
    print(_info(f"Registering webhook → {local_url}"))
    try:
        data = client.register_webhook(local_url)
    except requests.HTTPError as exc:
        print(
            _err(
                f"Failed to register webhook: {exc.response.status_code} {exc.response.text}"
            )
        )
        _stop_tunnel(tunnel_proc)  # nothing was registered; only the tunnel needs cleanup
        sys.exit(1)
    except requests.ConnectionError:
        print(_err(f"Could not connect to API at {args.api_url}"))
        _stop_tunnel(tunnel_proc)
        sys.exit(1)
    except requests.RequestException as exc:
        # Timeouts, invalid URLs, undecodable responses, …
        print(_err(f"Failed to register webhook: {exc}"))
        _stop_tunnel(tunnel_proc)
        sys.exit(1)

    secret: str = data["secret"]
    action = "Created" if data.get("is_active") else "Updated"
    print(_ok(f"{action} webhook (id={data['id']})"))
    print(_dim(f"  Secret  : {secret}"))
    print(_dim(f"  URL     : {data['url']}"))

    def _cleanup() -> None:
        """Undo the registration (unless --no-clean) and close the tunnel."""
        if not args.no_clean:
            print(_info("Deleting webhook from API…"))
            try:
                client.delete_webhook()
                print(_ok("Webhook deleted"))
            except Exception as exc:  # best effort: we are exiting anyway
                print(_warn(f"Could not delete webhook: {exc}"))

        if tunnel_proc:
            print(_info("Closing tunnel…"))
            _stop_tunnel(tunnel_proc)

    # ── 2. Start local server ──────────────────────────────────────────────────
    try:
        server = _WebhookServer(("0.0.0.0", args.port), secret=secret)
    except OSError as exc:
        # e.g. port already in use: do not leave the webhook registered or the
        # tunnel running when nothing is listening.
        print(_err(f"Could not listen on port {args.port}: {exc}"))
        _cleanup()
        sys.exit(1)
    thread = threading.Thread(target=server.serve_forever, daemon=True)
    thread.start()
    print(_ok(f"Listening on port {args.port}"))

    # ── 3. Graceful shutdown on SIGTERM (Ctrl+C is handled in step 4) ──────────
    def _shutdown(sig, frame):
        print(f"\n{_warn('Shutting down…')}")
        server.shutdown()
        server.server_close()  # release the listening socket
        _cleanup()
        sys.exit(0)

    signal.signal(signal.SIGTERM, _shutdown)

    # ── 4. Inform user + block ─────────────────────────────────────────────────
    print()
    print(_c(BOLD, "Waiting for webhook deliveries. Press Ctrl+C to stop."))
    print(
        _dim(
            "Tip: trigger a test ping with: "
            f"curl -X POST {args.api_url}/webhooks/test "
            f"-H 'Authorization: Bearer {args.token}'"
        )
    )

    # Block the main thread; KeyboardInterrupt (Ctrl+C) is portable on all platforms.
    try:
        threading.Event().wait()
    except KeyboardInterrupt:
        _shutdown(None, None)


if __name__ == "__main__":
    main()

On this page