Concepts
Webhooks
Receive notification events as signed HTTP callbacks
Webhooks deliver the same events as SSE, but push them to your server.
Configuration flow
- Create/update webhook URL with
PUT /webhooks - Receive and securely store the returned
secret - Verify signatures for every incoming request
- Optionally trigger a test ping with
POST /webhooks/test
The webhook secret is only returned on create/update. Store it securely.
Payload and headers
Example payload:
{
"id": 123,
"name": "batch_edit",
"data": {},
"timestamp": 1702819200.0
}Important headers:
X-Webhook-Signature: sha256=<hex-digest>X-Webhook-Event: <event-name>
Signature verification (Python)
import hashlib
import hmac
def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
expected = "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
return hmac.compare_digest(expected, signature_header)If your server does not return a 2xx response, delivery is retried automatically with exponential backoff.
Testing webhooks
You can trigger a test ping with POST /webhooks/test. This sends a test event to your webhook URL, allowing you to verify your setup and signature verification logic.
The following is a fairly complete script which allows you to test your webhook endpoint locally (it requires the requests library):
#!/usr/bin/env python3
"""
Standalone webhook tester for local development.
Starts a local HTTP server, registers it as the webhook endpoint via the API,
then prints every incoming delivery with HMAC signature verification.
Usage:
python etc/test_webhook.py --token <API_TOKEN> [options]
Example:
python etc/test_webhook.py --token mytoken123
python etc/test_webhook.py --token mytoken123 --port 9000
"""
import argparse
import hashlib
import hmac
import json
import re
import signal
import subprocess
import sys
import threading
from datetime import datetime, timezone
from http.server import BaseHTTPRequestHandler, HTTPServer
import requests
# ── ANSI colour helpers ────────────────────────────────────────────────────────
RESET = "\033[0m"
BOLD = "\033[1m"
GREEN = "\033[92m"
RED = "\033[91m"
YELLOW = "\033[93m"
CYAN = "\033[96m"
DIM = "\033[2m"
def _c(color: str, text: str) -> str:
return f"{color}{text}{RESET}"
def _ok(text: str) -> str:
return _c(GREEN, f"✓ {text}")
def _err(text: str) -> str:
return _c(RED, f"✗ {text}")
def _warn(text: str) -> str:
return _c(YELLOW, f"⚠ {text}")
def _info(text: str) -> str:
return _c(CYAN, f"→ {text}")
def _dim(text: str) -> str:
return _c(DIM, text)
# ── HMAC verification ──────────────────────────────────────────────────────────
def _verify_signature(secret: str, body: bytes, header_value: str | None) -> bool:
"""Return True if the X-Webhook-Signature header matches the expected HMAC."""
if not header_value:
return False
if not header_value.startswith("sha256="):
return False
expected_hex = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
provided_hex = header_value.removeprefix("sha256=")
return hmac.compare_digest(expected_hex, provided_hex)
# ── Local webhook receiver ─────────────────────────────────────────────────────
class _WebhookHandler(BaseHTTPRequestHandler):
"""HTTP handler that prints and verifies each incoming webhook delivery."""
# Set by the main thread before the server starts
secret: str = ""
def log_message(self, fmt, *args): # suppress default access log
pass
def do_POST(self):
length = int(self.headers.get("Content-Length", 0))
body = self.rfile.read(length)
event = self.headers.get("X-Webhook-Event", "<unknown>")
signature_header = self.headers.get("X-Webhook-Signature")
sig_ok = _verify_signature(self.server.secret, body, signature_header) # type: ignore[attr-defined]
timestamp = datetime.now(timezone.utc).strftime("%H:%M:%S.%f")[:-3]
print(f"\n{_dim('─' * 60)}")
print(f" {BOLD}Delivery received{RESET} {_dim(timestamp)}")
print(f" Event : {_c(BOLD, event)}")
sig_display = signature_header or "<missing>"
if sig_ok:
print(f" Sig : {_ok(sig_display)}")
else:
print(f" Sig : {_err(sig_display)}")
try:
parsed = json.loads(body)
pretty = json.dumps(parsed, indent=4)
except Exception:
pretty = body.decode(errors="replace")
for line in pretty.splitlines():
print(f" {_dim(line)}")
# Always respond 200 so the server records success
self.send_response(200)
self.end_headers()
self.wfile.write(b"OK")
class _WebhookServer(HTTPServer):
"""HTTPServer subclass that carries the HMAC secret alongside the socket."""
def __init__(self, server_address, secret: str):
self.secret = secret
super().__init__(server_address, _WebhookHandler)
# ── API helpers ────────────────────────────────────────────────────────────────
class APIClient:
def __init__(self, base_url: str, token: str):
self.base = base_url.rstrip("/")
self.session = requests.Session()
self.session.headers.update({"Authorization": f"Bearer {token}"})
def _url(self, path: str) -> str:
return f"{self.base}{path}"
def register_webhook(self, url: str) -> dict:
resp = self.session.put(self._url("/webhooks"), json={"url": url})
resp.raise_for_status()
return resp.json()
def trigger_test(self) -> dict:
resp = self.session.post(self._url("/webhooks/test"))
resp.raise_for_status()
return resp.json()
def delete_webhook(self) -> None:
resp = self.session.delete(self._url("/webhooks"))
if resp.status_code not in (200, 404):
resp.raise_for_status()
# ── Entry point ────────────────────────────────────────────────────────────────
def _parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Local webhook receiver for testing the PiktID webhook system.",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog=__doc__,
)
parser.add_argument("--token", required=True, help="Your API token")
parser.add_argument(
"--hook-url",
help="URL to register as webhook endpoint (overrides --host and --port). If not provided, a localhost.run tunnel will be created automatically.",
)
parser.add_argument(
"--api-url", default="https://v2.api.piktid.com", help="API base URL"
)
parser.add_argument("--port", type=int, default=9876, help="Local listener port")
parser.add_argument(
"--host", default="localhost", help="Hostname for the local server URL"
)
parser.add_argument(
"--no-clean",
action="store_true",
help="Do not delete the webhook configuration on exit",
)
return parser.parse_args()
def _start_tunnel(port: int) -> subprocess.Popen:
"""Start a localhost.run ssh tunnel for the given port and yield its output."""
print(_info(f"Starting localhost.run tunnel for port {port}…"))
# We pipe stdout and stderr to capture the generated domain name
proc = subprocess.Popen(
[
"ssh",
"-o",
"StrictHostKeyChecking=no",
"-R",
f"80:localhost:{port}",
"nokey@localhost.run",
"--",
"--output",
"text",
],
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
text=True,
bufsize=1, # line-buffered
)
return proc
def _get_tunnel_url(proc: subprocess.Popen) -> str | None:
"""Read the output of the tunnel process until the URL is found."""
if not proc.stdout:
return None
# Pattern to match: https://<subdomain>.lhr.life
pattern = re.compile(r"(https://[a-zA-Z0-9-]+\.lhr\.life)")
for line in proc.stdout:
# print(_dim(f"tunnel: {line.strip()}")) # optional debug
match = pattern.search(line)
if match:
return match.group(1)
return None
def main() -> None:
args = _parse_args()
client = APIClient(base_url=args.api_url, token=args.token)
tunnel_proc = None
if args.hook_url:
local_url = args.hook_url
else:
# Start tunnel
tunnel_proc = _start_tunnel(args.port)
tunnel_url = _get_tunnel_url(tunnel_proc)
if not tunnel_url:
print(_err("Failed to get tunnel URL from localhost.run"))
if tunnel_proc:
tunnel_proc.terminate()
sys.exit(1)
local_url = tunnel_url
print(_ok(f"Tunnel established: {local_url}"))
# ── 1. Register the webhook ────────────────────────────────────────────────
print(_info(f"Registering webhook → {local_url}"))
try:
data = client.register_webhook(local_url)
except requests.HTTPError as exc:
print(
_err(
f"Failed to register webhook: {exc.response.status_code} {exc.response.text}"
)
)
sys.exit(1)
except requests.ConnectionError:
print(_err(f"Could not connect to API at {args.api_url}"))
sys.exit(1)
secret: str = data["secret"]
action = "Created" if data.get("is_active") else "Updated"
print(_ok(f"{action} webhook (id={data['id']})"))
print(_dim(f" Secret : {secret}"))
print(_dim(f" URL : {data['url']}"))
# ── 2. Start local server ──────────────────────────────────────────────────
server = _WebhookServer(("0.0.0.0", args.port), secret=secret)
thread = threading.Thread(target=server.serve_forever, daemon=True)
thread.start()
print(_ok(f"Listening on port {args.port}"))
# ── 3. Graceful shutdown on Ctrl+C ─────────────────────────────────────────
def _shutdown(sig, frame):
print(f"\n{_warn('Shutting down…')}")
server.shutdown()
if not args.no_clean:
print(_info("Deleting webhook from API…"))
try:
client.delete_webhook()
print(_ok("Webhook deleted"))
except Exception as exc:
print(_warn(f"Could not delete webhook: {exc}"))
if tunnel_proc:
print(_info("Closing tunnel…"))
tunnel_proc.terminate()
tunnel_proc.wait(timeout=2)
sys.exit(0)
signal.signal(signal.SIGTERM, _shutdown)
# ── 4. Inform user + block ─────────────────────────────────────────────────
print()
print(_c(BOLD, "Waiting for webhook deliveries. Press Ctrl+C to stop."))
print(
_dim(
"Tip: trigger a test ping with: "
f"curl -X POST {args.api_url}/webhooks/test "
f"-H 'Authorization: Bearer {args.token}'"
)
)
# Block the main thread; KeyboardInterrupt (Ctrl+C) is portable on all platforms.
try:
threading.Event().wait()
except KeyboardInterrupt:
_shutdown(None, None)
if __name__ == "__main__":
main()