Model swap

Replace the model identity in multiple product detail page (PDP) images at once

You will need an API token to send HTTP requests. See Authentication for instructions.

Quick start

  1. Create a project to organize your images. The project_id will be used in subsequent requests. See Creating a project for details.

  2. Upload one or more PDP images to the project. This is a two-step process:

       a. Request a pre-signed upload URL
       b. PUT the image binary to that URL

     Collect all file_id values for the next step. See Uploading images for details.

  3. Start a model swap job by providing the identity to swap onto the images and the list of uploaded file IDs. See Starting a job for details.

  4. Track job progress with SSE or webhooks. Filter events by your job ID and stop when you receive a terminal status. See Tracking progress for details.

Uploading identities

Before starting a swap job, you need an identity_code. You can either use an existing identity from your gallery or upload a new one.

import requests

api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
identity_image_path = "path/to/identity.jpg"

with open(identity_image_path, "rb") as f:
    response = requests.post(
        api_url + "/identity/upload",
        headers={"Authorization": "Bearer " + access_token},
        files={"image": f},
        data={"name": "Model A"},  # Optional custom name
    ).json()

identity_code = response["identity_code"]
print(f"Identity uploaded: {identity_code}")

Response
{
  "identity_code": "id_xyz...",  // IDENTITY_CODE
  "name": "Model A",
  "face_detected": true,
  "success": true,
  // ...
}
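
The response carries `success` and `face_detected` flags alongside the `identity_code`, so it can be worth checking them before starting a job. The following is a minimal sketch, assuming that a false `face_detected` means the image is not usable as a swap identity; the page does not state this explicitly, and `extract_identity_code` is an illustrative helper name, not part of the API.

```python
def extract_identity_code(upload_response):
    """Return the identity_code, or raise if the upload looks unusable.

    Assumption: success=False or face_detected=False means the identity
    should not be used for a swap job.
    """
    if not upload_response.get("success"):
        raise RuntimeError(f"identity upload failed: {upload_response}")
    if not upload_response.get("face_detected"):
        raise ValueError("no face detected in the identity image")
    return upload_response["identity_code"]
```

In the upload example above, this would replace the direct `response["identity_code"]` lookup.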

You can list existing identities using:

Creating a project

import requests

api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"

response = requests.post(
    api_url + "/project",
    headers={"Authorization": "Bearer " + access_token},
    json={"project_name": "my-campaign"},
).json()
project_id = response["project_id"]
project_name = response["project_name"]

Response
{
  "project_id": "abc123...",   // PROJECT_ID
  "project_name": "my-campaign"
}

Uploading images

Repeat this process for each image you want to process, and keep every file_id that is returned. The job request needs the full list.

import requests

api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
project_name = "my-campaign"
image_path = "path/to/image.jpg"

# Step 1: Get pre-signed upload URL
response = requests.post(
    api_url + "/upload",
    headers={"Authorization": "Bearer " + access_token},
    json={
        "project_name": project_name,
        "filename": "model-photo-1.jpg",
    },
).json()

upload_url = response["upload_url"]
content_type = response["content_type"]
file_id = response["file_id"]

# Step 2: Upload the image binary
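with open(image_path, "rb") as f:
    put_response = requests.put(
        upload_url,
        headers={"Content-Type": content_type},
        data=f.read(),
    )
put_response.raise_for_status()  # Stop here if the upload was rejected or the URL expired

print(f"Uploaded file ID: {file_id}")

Response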
{
  "upload_url": "https://s3...",     // Pre-signed PUT URL
  "download_url": "https://...",
  "project_id": "abc123...",
  "project_name": "my-campaign",
  "file_id": "img_001...",           // FILE_ID
  "filename": "model-photo-1.jpg",
  "content_type": "image/jpeg"
}

The upload_url is only valid for a limited time. Upload the image immediately after receiving the response.
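
Because each pre-signed URL expires quickly, a batch upload is easiest to get right when the URL request and the PUT happen back to back for each file. The following is a minimal sketch of that loop, not an official client. `upload_images` is an illustrative name, and `session` is assumed to be a `requests.Session()` or any object with compatible `post` and `put` methods.

```python
from pathlib import Path


def upload_images(session, api_url, access_token, project_name, image_paths):
    """Upload each image and return the collected file_id values, in order.

    `session` is assumed to behave like requests.Session (post/put methods).
    """
    file_ids = []
    for image_path in map(Path, image_paths):
        # Step 1: request a pre-signed upload URL for this file
        response = session.post(
            api_url + "/upload",
            headers={"Authorization": "Bearer " + access_token},
            json={"project_name": project_name, "filename": image_path.name},
        )
        response.raise_for_status()
        upload = response.json()

        # Step 2: PUT the binary right away, before the URL expires
        put_response = session.put(
            upload["upload_url"],
            headers={"Content-Type": upload["content_type"]},
            data=image_path.read_bytes(),
        )
        put_response.raise_for_status()

        file_ids.append(upload["file_id"])
    return file_ids
```

A call such as `upload_images(requests.Session(), api_url, access_token, "my-campaign", ["a.jpg", "b.jpg"])` returns the list to pass as `images` when starting a job.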

Starting a job

import requests

api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"

project_id = "abc123..."
identity_code = "id_xyz..."  # From identity upload or gallery
file_ids = ["img_001...", "img_002...", "img_003..."]

response = requests.post(
    api_url + "/model-swap",
    headers={"Authorization": "Bearer " + access_token},
    json={
        "identity_code": identity_code,
        "project_id": project_id,
        "images": file_ids,
        "post_process": False,  # Optional: True enables post-processing, False skips it
        "swap_options": {
            "model": "auto",       # Optional: see "Swap options" below
            "num_variations": 1,   # Optional: 1–4 variations per input image
        },
    },
).json()

job_id = response["job_id"]
print(f"Job started: {job_id}")

Response
{
  "job_id": "job_abc123...",  // JOB_ID
  "status": "pending",
  "message": "Job created successfully"
}

Swap options

Fields inside the swap_options object that control how the swap runs.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| model | one of "auto", "onda", "nano_banana_pro" | "auto" | Which swap engine runs. auto and onda both use PiktID's proprietary Onda engine; nano_banana_pro swaps via Google's Nano Banana Pro. |
| num_variations | integer (1–4) | 1 | Number of variation outputs produced per input image. |

Request with swap options
{
  "identity_code": "id_xyz...",
  "project_id": "abc123...",
  "images": ["img_001...", "img_002..."],
  "swap_options": {
    "model": "nano_banana_pro",  
    "num_variations": 2
  }
}

The legacy use_alternative_method flag is still accepted for backward compatibility and is mapped internally to model: "nano_banana_pro". New integrations should use model directly.
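
Validating these fields on the client catches typos before a request is sent. The sketch below checks the documented values and shows one way to migrate callers that still pass the legacy flag. `build_swap_options` is an illustrative helper, and how the server itself validates or rejects bad values is not described here, so treat the error handling as an assumption.

```python
ALLOWED_MODELS = {"auto", "onda", "nano_banana_pro"}


def build_swap_options(model=None, num_variations=1, use_alternative_method=False):
    """Return a validated swap_options dict.

    `use_alternative_method` mirrors the legacy flag: when it is true and no
    model is given, this sketch selects "nano_banana_pro" on the client side.
    An explicit `model` always wins.
    """
    if model is None:
        model = "nano_banana_pro" if use_alternative_method else "auto"
    if model not in ALLOWED_MODELS:
        raise ValueError(f"unknown model: {model!r}")
    if isinstance(num_variations, bool) or not isinstance(num_variations, int):
        raise TypeError("num_variations must be an integer")
    if not 1 <= num_variations <= 4:
        raise ValueError("num_variations must be between 1 and 4")
    return {"model": model, "num_variations": num_variations}
```

The returned dict can be passed directly as the `swap_options` value of the job request.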

Post-processing

Set post_process to true (the default) in the job request to enable automatic post-processing of results. When post-processing is enabled, the job status includes a post_processing_status field that tracks this additional step.

Request with post-processing
{
  "identity_code": "id_xyz...",
  "project_id": "abc123...",
  "images": ["img_001...", "img_002..."],
  "post_process": true
}

AI-generated disclosure watermark

If your jurisdiction or distribution platform requires AI-generated images to be visibly marked as such (the EU AI Act is one example), set add_ai_watermark to true inside swap_options. When enabled, the batch image processor bakes a small "AI-generated" disclosure mark into the bottom-right corner of every output image and its thumbnails. The mark cannot be removed afterwards: it is part of the clean output file itself, not a removable overlay.

This flag is independent of the on-model branding watermark and defaults to false, so existing integrations are unaffected.

Request with AI disclosure
{
  "identity_code": "id_xyz...",
  "project_id": "abc123...",
  "images": ["img_001...", "img_002..."],
  "swap_options": {
    "add_ai_watermark": true
  }
}
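
The request examples on this page each show one option in isolation. The sketch below assembles a single request body from all of them, assuming the options can be combined in one request, which the individual examples suggest but do not show. `build_job_request` is an illustrative helper name; it omits `post_process` and `add_ai_watermark` unless the caller sets them, so the server defaults apply.

```python
def build_job_request(identity_code, project_id, file_ids, *,
                      model="auto", num_variations=1,
                      post_process=None, add_ai_watermark=False):
    """Build the JSON body for POST /model-swap from the documented fields."""
    if not file_ids:
        raise ValueError("at least one file_id is required")

    swap_options = {"model": model, "num_variations": num_variations}
    if add_ai_watermark:
        # Only sent when requested; the documented default is false
        swap_options["add_ai_watermark"] = True

    payload = {
        "identity_code": identity_code,
        "project_id": project_id,
        "images": list(file_ids),
        "swap_options": swap_options,
    }
    if post_process is not None:
        # Leave the key out entirely to keep the server-side default
        payload["post_process"] = bool(post_process)
    return payload
```

The result can be passed as the `json=` argument of the `requests.post` call shown under Starting a job.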

Tracking progress

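Use either SSE or webhooks to receive notifications for job updates. The first example below consumes the SSE stream from /notifications/events; the second registers a webhook URL and verifies the signature of each incoming request.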

import json
import requests

api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
job_id = "job_abc123..."

headers = {"Authorization": "Bearer " + access_token}

with requests.get(
    api_url + "/notifications/events",
    headers=headers,
    stream=True,
    timeout=600,
) as response:
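    response.raise_for_status()
    response.encoding = "utf-8"  # SSE streams are UTF-8; avoids the ISO-8859-1 fallback for text/* responses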

    for raw_line in response.iter_lines(decode_unicode=True):
        if not raw_line or raw_line.startswith(":"):
            continue
        if not raw_line.startswith("data: "):
            continue

        notification = json.loads(raw_line[6:])
        data = notification.get("data", {})
        task_id = data.get("id_task") or data.get("job_id")
        if task_id != job_id:
            continue

        print(f"Notification: {notification['name']}")
        print(f"Data: {data}")

        if notification["name"] == "completed":
            print("Job completed!")
            break
        if notification["name"] == "error":
            raise RuntimeError(str(data))

Webhooks

import hashlib
import hmac

import requests
from flask import Flask, abort, request

api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
job_id = "job_abc123..."
public_webhook_url = "https://example.com/webhooks/piktid"

setup = requests.put(
    api_url + "/webhooks",
    headers={"Authorization": "Bearer " + access_token},
    json={"url": public_webhook_url},
).json()
webhook_secret = setup["secret"]

app = Flask(__name__)


def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
    expected = "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)


@app.post("/webhooks/piktid")
def handle_piktid_webhook():
    body = request.get_data()
    signature = request.headers.get("X-Webhook-Signature", "")
    if not verify_signature(webhook_secret, body, signature):
        abort(401)

    notification = request.get_json()
    data = notification.get("data", {})
    task_id = data.get("id_task") or data.get("job_id")
    if task_id != job_id:
        return "", 204

    if notification["name"] == "completed":
        print("Job completed!")
    elif notification["name"] == "error":
        print(f"Error: {data}")

    return "", 204


app.run(port=8000)

Response
[
  {
    "id": 12345,
    "name": "completed",              // Notification type
    "timestamp": 1702819200.0,
    "data": {                         // Job-specific data
      "job_id": "job_abc123...",
      "status": "completed"
    }
  }
]
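
To exercise a webhook handler locally without waiting for a real job, you can sign a test payload yourself. This sketch reuses the scheme from the `verify_signature` function in the webhook example (hex HMAC-SHA256 of the raw body, prefixed with `sha256=`). `sign_body`, the secret, and the payload values are hypothetical test fixtures.

```python
import hashlib
import hmac
import json


def sign_body(secret: str, body: bytes) -> str:
    """Compute the signature header value for a raw request body."""
    digest = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return "sha256=" + digest


def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
    """Constant-time comparison of the expected and received signatures."""
    return hmac.compare_digest(sign_body(secret, body), signature_header)


# Hypothetical test fixture: a payload shaped like the notification above
test_body = json.dumps(
    {"id": 1, "name": "completed", "data": {"job_id": "job_test", "status": "completed"}}
).encode("utf-8")
test_header = sign_body("test-secret", test_body)
```

Sending `test_body` with `X-Webhook-Signature: test_header` to a handler configured with the same secret should pass verification; any change to the body or the secret should fail it.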

Use DELETE /notifications/{id} after processing events so they are not replayed on reconnect.
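
A sketch of how that cleanup could fit into an SSE loop: parse each line, ignore events for other jobs, delete the notification once handled, and report whether a terminal event arrived. It assumes the `id` field of the notification payload is the value that `DELETE /notifications/{id}` expects and that the endpoint takes the same Bearer token. The helper names are illustrative, and `session` is assumed to be a `requests.Session()` or a compatible object.

```python
import json

TERMINAL_NAMES = {"completed", "error"}


def parse_sse_line(raw_line):
    """Return the decoded notification dict, or None for non-data lines."""
    if not raw_line or raw_line.startswith(":") or not raw_line.startswith("data: "):
        return None
    return json.loads(raw_line[len("data: "):])


def acknowledge(session, api_url, access_token, notification_id):
    """Delete a processed notification so it is not replayed on reconnect."""
    response = session.delete(
        f"{api_url}/notifications/{notification_id}",
        headers={"Authorization": "Bearer " + access_token},
    )
    response.raise_for_status()


def handle_line(session, api_url, access_token, job_id, raw_line):
    """Process one SSE line; return the terminal name, or None to keep reading."""
    notification = parse_sse_line(raw_line)
    if notification is None:
        return None
    data = notification.get("data", {})
    if (data.get("id_task") or data.get("job_id")) != job_id:
        return None  # Event belongs to another job; leave it untouched
    acknowledge(session, api_url, access_token, notification["id"])
    name = notification["name"]
    return name if name in TERMINAL_NAMES else None
```

Inside the `iter_lines` loop of the SSE example, calling `handle_line` for each line and breaking when it returns a non-None value reproduces the stop condition while also clearing processed events.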

Retrieving results

Once the job is complete, retrieve the processed images.

import requests

api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
job_id = "job_abc123..."

response = requests.get(
    api_url + f"/jobs/{job_id}/results",
    headers={"Authorization": "Bearer " + access_token},
).json()

for result in response["results"]:
    print(f"Image {result['image_index']}: {result['url']}")

Response
{
  "job_id": "job_abc123...",
  "job_type": "model_swap",
  "status": "completed",
  "results": [
    {
      "image_index": 0,
      "url": "https://...",            // Result image URL
      "model_used": "onda",            // Engine that produced this output
      "status": "completed"
    },
    {
      "image_index": 1,
      "url": "https://...",
      "model_used": "onda",
      "status": "completed"
    }
  ],
  "summary": {
    // Job statistics
  }
}

Which engine produced each output

Each result carries a model_used string indicating which engine actually generated the image. Inspect this field when you need to know per output which model was used.

for result in response["results"]:
    print(f"Image {result['image_index']}: model_used = {result.get('model_used')}")
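
For larger jobs it can help to summarize the results before downloading anything. The following sketch counts outputs per engine and separates finished URLs from entries that are not yet completed, using only the fields shown in the response above. `summarize_results` is an illustrative helper, and any status value other than "completed" is treated as not ready, which is an assumption.

```python
from collections import Counter


def summarize_results(job_response):
    """Return (outputs per engine, ready URLs, indexes that are not completed)."""
    results = job_response.get("results", [])
    engines = Counter(r.get("model_used") or "unknown" for r in results)
    ready_urls = [
        r["url"] for r in results
        if r.get("status") == "completed" and r.get("url")
    ]
    not_completed = [
        r["image_index"] for r in results if r.get("status") != "completed"
    ]
    return dict(engines), ready_urls, not_completed
```

Calling `summarize_results(response)` on the parsed results response gives a quick view of which engine handled how many outputs.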

Bulk download

For bulk downloads, generate a temporary download URL that packages all results into a ZIP file.

import requests

api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
job_id = "job_abc123..."

# Generate download URL
response = requests.post(
    api_url + "/download",
    headers={"Authorization": "Bearer " + access_token},
    json={"job_id": job_id},
).json()

download_url = response["download_url"]
expires = response["expires"]

print(f"Download URL: {download_url}")
print(f"Expires: {expires}")

# Download the ZIP file (no auth required for the token URL)
zip_response = requests.get(download_url)
zip_response.raise_for_status()  # Do not write an error page to disk
with open("results.zip", "wb") as f:
    f.write(zip_response.content)

Response
{
  "download_url": "https://v2.api.piktid.com/download/token123...",
  "expires": "2024-12-17T11:00:00Z"  // URL expiration time
}
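
Since the download URL is temporary, a client that stores it for later may want to check how much time is left before using it. This is a minimal sketch, assuming `expires` is always an ISO 8601 UTC timestamp with a trailing `Z`, as in the sample response. `seconds_until_expiry` is an illustrative helper name.

```python
from datetime import datetime, timezone


def parse_expires(expires):
    """Parse the expires timestamp into a timezone-aware datetime."""
    # fromisoformat() accepts a trailing "Z" only on Python 3.11+, so normalize it
    return datetime.fromisoformat(expires.replace("Z", "+00:00"))


def seconds_until_expiry(expires, now=None):
    """Return the remaining lifetime in seconds (negative once expired)."""
    now = now or datetime.now(timezone.utc)
    return (parse_expires(expires) - now).total_seconds()
```

If the value is zero or negative, request a fresh URL with another POST to /download instead of retrying the old one.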
