Model swap
Replace model identities in multiple PDPs at once
You will need an API token to send HTTP requests. See Authentication for instructions.
Quick start
Create a project to organize your images. The project_id will be used in subsequent requests. See Creating a project for details.
Upload one or more PDP images to the project. This is a two-step process:
- Request a pre-signed upload URL
- PUT the image binary to that URL
Collect all file_id values for the next step. See Uploading images for details.
Start a model swap job by providing the identity to swap onto the images and the list of uploaded file IDs. See Starting a job for details.
Track job progress with SSE or webhooks. Filter events with your job ID and stop when you receive a terminal status. See Tracking progress for details.
Uploading identities
Before starting a swap job, you need an identity_code. You can either use an existing identity from your gallery or upload a new one.
import requests
api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
identity_image_path = "path/to/identity.jpg"
with open(identity_image_path, "rb") as f:
    response = requests.post(
        api_url + "/identity/upload",
        headers={"Authorization": "Bearer " + access_token},
        files={"image": f},
        data={"name": "Model A"},  # Optional custom name
    ).json()
identity_code = response["identity_code"]
print(f"Identity uploaded: {identity_code}")

Response:
{
  "identity_code": "id_xyz...",  // IDENTITY_CODE
  "name": "Model A",
  "face_detected": true,
  "success": true,
  // ...
}
You can also list the existing identities in your gallery.
Creating a project
import requests
api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
response = requests.post(
    api_url + "/project",
    headers={"Authorization": "Bearer " + access_token},
    json={"project_name": "my-campaign"},
).json()
project_id = response["project_id"]
project_name = response["project_name"]

Response:
{
  "project_id": "abc123...",  // PROJECT_ID
  "project_name": "my-campaign"
}
Uploading images
Repeat the two upload steps for each image you want to process. Collect every returned file_id; you need the full list to start a job.
import requests
api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
project_name = "my-campaign"
image_path = "path/to/image.jpg"
# Step 1: Get pre-signed upload URL
response = requests.post(
    api_url + "/upload",
    headers={"Authorization": "Bearer " + access_token},
    json={
        "project_name": project_name,
        "filename": "model-photo-1.jpg",
    },
).json()
upload_url = response["upload_url"]
content_type = response["content_type"]
file_id = response["file_id"]
# Step 2: Upload the image binary
with open(image_path, "rb") as f:
    put_response = requests.put(
        upload_url,
        headers={"Content-Type": content_type},
        data=f.read(),
    )
put_response.raise_for_status()  # Fail loudly if the upload was rejected
print(f"Uploaded file ID: {file_id}")

Response (step 1):
{
  "upload_url": "https://s3...",  // Pre-signed PUT URL
  "download_url": "https://...",
  "project_id": "abc123...",
  "project_name": "my-campaign",
  "file_id": "img_001...",  // FILE_ID
  "filename": "model-photo-1.jpg",
  "content_type": "image/jpeg"
}
The upload_url is only valid for a limited time. Upload the image immediately after receiving the response.
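Because each upload_url expires quickly, a batch upload is safest when the URL request and the PUT stay adjacent for every file. The following is a minimal sketch of that loop, not an official client: `upload_all`, `request_slot`, and `put_bytes` are hypothetical names, and the two network steps are passed in as callables so the logic can be exercised without calling the API.

```python
from typing import Callable, Dict, List

# Hypothetical signatures for the two network steps. In real use they would
# wrap the POST /upload and PUT calls shown in this section.
RequestSlot = Callable[[str], dict]          # filename -> POST /upload response body
PutBytes = Callable[[str, str, bytes], int]  # (upload_url, content_type, body) -> HTTP status


def upload_all(
    images: Dict[str, bytes],
    request_slot: RequestSlot,
    put_bytes: PutBytes,
) -> List[str]:
    """Upload each image right after its URL is issued and return the file IDs."""
    file_ids: List[str] = []
    for filename, body in images.items():
        slot = request_slot(filename)  # Step 1: fresh pre-signed URL for this file
        status = put_bytes(slot["upload_url"], slot["content_type"], body)  # Step 2
        if not 200 <= status < 300:
            raise RuntimeError(f"Upload of {filename} failed with HTTP {status}")
        file_ids.append(slot["file_id"])
    return file_ids
```

In a real integration, `request_slot` would return the JSON body of the POST /upload call and `put_bytes` would return the status code of the PUT. The returned list can be passed as the images field of the job request.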
Starting a job
import requests
api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
project_id = "abc123..."
identity_code = "id_xyz..." # From identity upload or gallery
file_ids = ["img_001...", "img_002...", "img_003..."]
response = requests.post(
    api_url + "/model-swap",
    headers={"Authorization": "Bearer " + access_token},
    json={
        "identity_code": identity_code,
        "project_id": project_id,
        "images": file_ids,
        "post_process": False,  # Optional: set to True to enable post-processing
        "swap_options": {
            "model": "auto",  # Optional: see "Swap options" below
            "num_variations": 1,  # Optional: 1–4 variations per input image
        },
    },
).json()
job_id = response["job_id"]
print(f"Job started: {job_id}")

Response:
{
  "job_id": "job_abc123...",  // JOB_ID
  "status": "pending",
  "message": "Job created successfully"
}
Swap options
Fields inside the swap_options object that control how the swap runs.
| Parameter | Type | Default | Description |
|---|---|---|---|
| model | "auto", "onda", or "nano_banana_pro" | "auto" | Which swap engine runs. auto and onda both use PiktID's proprietary Onda engine; nano_banana_pro swaps via Google's Nano Banana Pro. |
| num_variations | integer (1–4) | 1 | Number of variation outputs produced per input image. |
{
  "identity_code": "id_xyz...",
  "project_id": "abc123...",
  "images": ["img_001...", "img_002..."],
  "swap_options": {
    "model": "nano_banana_pro",
    "num_variations": 2
  }
}
The legacy use_alternative_method flag is still accepted for backward compatibility and is mapped internally to model: "nano_banana_pro". New integrations should use model directly.
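Checking swap_options on the client catches out-of-range values before a job is submitted. The helper below is a sketch under stated assumptions: `build_swap_options` is a hypothetical name, the allowed values are copied from the table above, and the legacy-flag branch only mirrors the documented server-side mapping so older call sites can migrate.

```python
from typing import Optional

ALLOWED_MODELS = {"auto", "onda", "nano_banana_pro"}  # values from the table above


def build_swap_options(
    model: str = "auto",
    num_variations: int = 1,
    use_alternative_method: Optional[bool] = None,
) -> dict:
    """Return a swap_options dict, rejecting values outside the documented ranges."""
    # Client-side mirror of the documented legacy mapping (a convenience, not API behavior).
    if use_alternative_method:
        model = "nano_banana_pro"
    if model not in ALLOWED_MODELS:
        raise ValueError(f"model must be one of {sorted(ALLOWED_MODELS)}, got {model!r}")
    if not isinstance(num_variations, int) or not 1 <= num_variations <= 4:
        raise ValueError("num_variations must be an integer from 1 to 4")
    return {"model": model, "num_variations": num_variations}


print(build_swap_options(model="nano_banana_pro", num_variations=2))
```

The returned dict can be placed under the swap_options key of the job request body.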
Post-processing
Set post_process to true (default) in the job request to enable automatic post-processing of results. The job status will include post_processing_status to track this additional step.
{
  "identity_code": "id_xyz...",
  "project_id": "abc123...",
  "images": ["img_001...", "img_002..."],
  "post_process": true
}
AI-generated disclosure watermark
If your jurisdiction or distribution platform requires AI-generated images to be visibly marked as such (for example, the EU AI Act), set add_ai_watermark to true inside swap_options. When enabled, the batch image processor bakes a small "AI-generated" disclosure mark into the bottom-right corner of every output image (and its thumbnails). The mark is irreversible as it is part of the clean output file, not a removable overlay.
This flag is independent of the on-model branding watermark and defaults to false, so existing integrations are unaffected.
{
  "identity_code": "id_xyz...",
  "project_id": "abc123...",
  "images": ["img_001...", "img_002..."],
  "swap_options": {
    "add_ai_watermark": true
  }
}
Tracking progress
Use either SSE or webhooks to receive notifications for job updates.
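The SSE and webhook examples in this section apply the same rule to each notification: match it to your job, then check whether its name is terminal. That rule can be kept in one small function. This is a sketch assuming the notification fields used in those examples (name, data.job_id or data.id_task); `terminal_status_for` is a hypothetical helper name.

```python
from typing import Optional

TERMINAL_NAMES = {"completed", "error"}  # the two names handled in this section's examples


def terminal_status_for(notification: dict, job_id: str) -> Optional[str]:
    """Return "completed" or "error" if this notification ends job_id, else None."""
    data = notification.get("data") or {}
    task_id = data.get("id_task") or data.get("job_id")
    if task_id != job_id:
        return None  # Notification belongs to another job
    name = notification.get("name")
    return name if name in TERMINAL_NAMES else None


event = {"name": "completed", "data": {"job_id": "job_abc123...", "status": "completed"}}
print(terminal_status_for(event, "job_abc123..."))  # completed
print(terminal_status_for(event, "job_other"))      # None
```

Keeping the check free of network code makes it easy to unit test and to reuse from either delivery method.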
SSE:
import json
import requests
api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
job_id = "job_abc123..."
headers = {"Authorization": "Bearer " + access_token}
with requests.get(
    api_url + "/notifications/events",
    headers=headers,
    stream=True,
    timeout=600,
) as response:
    response.raise_for_status()
    for raw_line in response.iter_lines(decode_unicode=True):
        # Skip keep-alive blank lines and SSE comment lines
        if not raw_line or raw_line.startswith(":"):
            continue
        # The SSE format makes the space after "data:" optional
        if not raw_line.startswith("data:"):
            continue
        notification = json.loads(raw_line[5:])
        data = notification.get("data", {})
        task_id = data.get("id_task") or data.get("job_id")
        if task_id != job_id:
            continue
        print(f"Notification: {notification['name']}")
        print(f"Data: {data}")
        if notification["name"] == "completed":
            print("Job completed!")
            break
        if notification["name"] == "error":
            raise RuntimeError(str(data))

Webhooks:
import hashlib
import hmac
import requests
from flask import Flask, abort, request
api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
job_id = "job_abc123..."
public_webhook_url = "https://example.com/webhooks/piktid"
setup = requests.put(
    api_url + "/webhooks",
    headers={"Authorization": "Bearer " + access_token},
    json={"url": public_webhook_url},
).json()
webhook_secret = setup["secret"]
app = Flask(__name__)


def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
    expected = "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()
    return hmac.compare_digest(expected, signature_header)


@app.post("/webhooks/piktid")
def handle_piktid_webhook():
    body = request.get_data()
    signature = request.headers.get("X-Webhook-Signature", "")
    if not verify_signature(webhook_secret, body, signature):
        abort(401)
    notification = request.get_json()
    data = notification.get("data", {})
    task_id = data.get("id_task") or data.get("job_id")
    if task_id != job_id:
        return "", 204
    if notification["name"] == "completed":
        print("Job completed!")
    elif notification["name"] == "error":
        print(f"Error: {data}")
    return "", 204


app.run(port=8000)

Example notifications:
[
  {
    "id": 12345,
    "name": "completed",  // Notification type
    "timestamp": 1702819200.0,
    "data": {  // Job-specific data
      "job_id": "job_abc123...",
      "status": "completed"
    }
  }
]
Use DELETE /notifications/{id} after processing events so they are not replayed on reconnect.
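To exercise the webhook handler locally without waiting for a real delivery, you can sign a test body yourself. The sketch below assumes the scheme used by verify_signature in the webhook example ("sha256=" followed by the hex HMAC-SHA256 of the raw body); `sign_body`, the secret, and the payload are hypothetical test values.

```python
import hashlib
import hmac
import json


def sign_body(secret: str, body: bytes) -> str:
    """Build the header value that verify_signature checks."""
    return "sha256=" + hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest()


def verify_signature(secret: str, body: bytes, signature_header: str) -> bool:
    """Same comparison as in the webhook example, in constant time."""
    return hmac.compare_digest(sign_body(secret, body), signature_header)


# Hypothetical test values; a real secret comes from the PUT /webhooks response.
secret = "test-secret"
payload = {"name": "completed", "data": {"job_id": "job_abc123...", "status": "completed"}}
raw_body = json.dumps(payload).encode("utf-8")
header = sign_body(secret, raw_body)

# The signature covers the exact bytes, so re-serializing the JSON breaks it.
compact_body = json.dumps(payload, separators=(",", ":")).encode("utf-8")
print(verify_signature(secret, raw_body, header))      # True
print(verify_signature(secret, compact_body, header))  # False
```

The second check shows why the handler verifies request.get_data() rather than a re-encoded copy of the parsed JSON. Sending raw_body with header as X-Webhook-Signature to the local endpoint should pass verification if the handler uses the same secret.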
Retrieving results
Once the job is complete, retrieve the processed images.
import requests
api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
job_id = "job_abc123..."
response = requests.get(
    api_url + f"/jobs/{job_id}/results",
    headers={"Authorization": "Bearer " + access_token},
).json()
for result in response["results"]:
    print(f"Image {result['image_index']}: {result['url']}")

Response:
{
  "job_id": "job_abc123...",
  "job_type": "model_swap",
  "status": "completed",
  "results": [
    {
      "image_index": 0,
      "url": "https://...",  // Result image URL
      "model_used": "onda",  // Engine that produced this output
      "status": "completed"
    },
    {
      "image_index": 1,
      "url": "https://...",
      "model_used": "onda",
      "status": "completed"
    }
  ],
  "summary": {
    // Job statistics
  }
}
Which engine produced each output
Each result carries a model_used string indicating which engine actually generated the image. Inspect this field when you need to know per output which model was used.
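For larger batches, a per-engine tally can be easier to read than one line per image. The snippet below is a small sketch that assumes the results list has the shape shown under Retrieving results; `engines_used` is a hypothetical helper, and results without a model_used value are counted as "unknown".

```python
from collections import Counter
from typing import List


def engines_used(results: List[dict]) -> Counter:
    """Count how many outputs each engine produced."""
    return Counter(result.get("model_used") or "unknown" for result in results)


sample_results = [
    {"image_index": 0, "model_used": "onda", "status": "completed"},
    {"image_index": 1, "model_used": "onda", "status": "completed"},
]
print(dict(engines_used(sample_results)))  # {'onda': 2}
```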
for result in response["results"]:
    print(f"Image {result['image_index']}: model_used = {result.get('model_used')}")
Bulk download
For bulk downloads, generate a temporary download URL that packages all results into a ZIP file.
import requests
api_url = "https://v2.api.piktid.com"
access_token = "your_access_token"
job_id = "job_abc123..."
# Generate download URL
response = requests.post(
    api_url + "/download",
    headers={"Authorization": "Bearer " + access_token},
    json={"job_id": job_id},
).json()
download_url = response["download_url"]
expires = response["expires"]
print(f"Download URL: {download_url}")
print(f"Expires: {expires}")
# Download the ZIP file (no auth required for the token URL)
zip_response = requests.get(download_url)
zip_response.raise_for_status()  # Do not write an error page to disk
with open("results.zip", "wb") as f:
    f.write(zip_response.content)

Response:
{
  "download_url": "https://v2.api.piktid.com/download/token123...",
  "expires": "2024-12-17T11:00:00Z"  // URL expiration time
}
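Since the download URL is temporary, it can help to check how much time is left before reusing a stored URL. This is a minimal sketch that assumes expires is always an ISO 8601 UTC timestamp in the format of the sample response; `seconds_until_expiry` is a hypothetical helper.

```python
from datetime import datetime, timezone
from typing import Optional


def seconds_until_expiry(expires: str, now: Optional[datetime] = None) -> float:
    """Seconds left before a timestamp such as "2024-12-17T11:00:00Z" (negative if past)."""
    # datetime.fromisoformat only accepts a trailing "Z" from Python 3.11, so normalize it.
    expiry = datetime.fromisoformat(expires.replace("Z", "+00:00"))
    now = now or datetime.now(timezone.utc)
    return (expiry - now).total_seconds()


one_minute_before = datetime(2024, 12, 17, 10, 59, 0, tzinfo=timezone.utc)
print(seconds_until_expiry("2024-12-17T11:00:00Z", now=one_minute_before))  # 60.0
```

If the remaining time is short or negative, request a new URL with POST /download instead of retrying the old one.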