---
jupytext:
  text_representation:
    extension: .md
    format_name: myst
    format_version: 0.13
    jupytext_version: 1.16.0
kernelspec:
  name: python3
  display_name: Python 3
---

# Chapitre 19 — Patterns avancés

Les chapitres précédents ont couvert les bases de REST, l'authentification, le design, et l'observabilité. Ce chapitre traite des situations que ces bases ne couvrent pas : les opérations longues, les mises à jour partielles complexes, les uploads de fichiers volumineux, le streaming, et les APIs événementielles. Ces patterns apparaissent inévitablement dans des APIs de production.

## Long-running operations

Certaines opérations métier prennent plusieurs secondes, minutes, ou heures : traitement d'une vidéo, génération d'un rapport, migration de données, entraînement d'un modèle. Répondre de manière synchrone est impossible — les timeouts HTTP (30–60 s typiquement) et les proxys intermédiaires coupent la connexion.

### Le problème du timeout

Un client qui attend 5 minutes une réponse HTTP va :
1. Expirer son timeout applicatif (30 s dans `requests` par défaut)
2. Voir sa connexion coupée par un load balancer ou reverse proxy (Nginx par défaut : 60 s)
3. Ne jamais recevoir le résultat

### Pattern polling — 202 Accepted + Location

Le pattern polling est la solution REST idiomatique. Le serveur accepte la tâche immédiatement (`202 Accepted`), retourne l'URL de suivi dans le header `Location`, et le client poll cette URL jusqu'à complétion.

```
POST /api/v2/reports/generate
Content-Type: application/json
{"date_range": "2024-Q4", "format": "pdf"}

HTTP/1.1 202 Accepted
Location: /api/v2/jobs/job-7f3a91c2
Retry-After: 5
Content-Type: application/json
{"job_id": "job-7f3a91c2", "status": "pending", "created_at": "2024-11-15T14:30:00Z"}

---

GET /api/v2/jobs/job-7f3a91c2

HTTP/1.1 200 OK
{"job_id": "job-7f3a91c2", "status": "running", "progress": 45}

---

GET /api/v2/jobs/job-7f3a91c2

HTTP/1.1 200 OK
{"job_id": "job-7f3a91c2", "status": "completed",
 "result_url": "/api/v2/reports/rpt-8a2b3c4d.pdf", "completed_at": "2024-11-15T14:32:15Z"}
```

Le header `Retry-After` suggère au client combien de secondes attendre avant le prochain poll, évitant le polling agressif.

### Implémentation FastAPI complète

```python
import asyncio
import uuid
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from fastapi import FastAPI, BackgroundTasks, HTTPException
from fastapi.responses import JSONResponse
from pydantic import BaseModel

app = FastAPI()

# Store en mémoire (Redis en production)
job_store: dict[str, dict] = {}

class JobStatus(str, Enum):
    PENDING   = "pending"
    RUNNING   = "running"
    COMPLETED = "completed"
    FAILED    = "failed"

class GenerateReportRequest(BaseModel):
    date_range: str
    format: str = "pdf"

async def process_report(job_id: str, request: GenerateReportRequest):
    """Simule le traitement long d'un rapport."""
    job_store[job_id]["status"] = JobStatus.RUNNING
    job_store[job_id]["started_at"] = datetime.now(timezone.utc).isoformat()

    try:
        for i in range(10):
            await asyncio.sleep(2)  # simulation du traitement
            job_store[job_id]["progress"] = (i + 1) * 10

        job_store[job_id]["status"] = JobStatus.COMPLETED
        job_store[job_id]["result_url"] = f"/api/v2/reports/{job_id}.pdf"
        job_store[job_id]["completed_at"] = datetime.now(timezone.utc).isoformat()

    except Exception as e:
        job_store[job_id]["status"] = JobStatus.FAILED
        job_store[job_id]["error"] = str(e)


@app.post("/api/v2/reports/generate", status_code=202)
async def generate_report(
    request: GenerateReportRequest,
    background_tasks: BackgroundTasks
):
    job_id = f"job-{uuid.uuid4().hex[:12]}"
    job_store[job_id] = {
        "job_id": job_id,
        "status": JobStatus.PENDING,
        "progress": 0,
        "created_at": datetime.now(timezone.utc).isoformat(),
    }
    background_tasks.add_task(process_report, job_id, request)

    return JSONResponse(
        status_code=202,
        headers={"Location": f"/api/v2/jobs/{job_id}", "Retry-After": "5"},
        content=job_store[job_id]
    )


@app.get("/api/v2/jobs/{job_id}")
async def get_job(job_id: str):
    job = job_store.get(job_id)
    if not job:
        raise HTTPException(status_code=404, detail="Job not found")
    return job
```

### Pattern callback (webhook)

Si le client peut recevoir des appels entrants, le pattern callback est plus efficace que le polling. À la création de la tâche, le client fournit une URL de callback :

```python
class GenerateReportRequest(BaseModel):
    date_range: str
    format: str = "pdf"
    callback_url: Optional[str] = None  # URL de notification

# À la fin du traitement :
async def notify_callback(callback_url: str, job: dict):
    async with httpx.AsyncClient() as client:
        await client.post(callback_url, json=job, timeout=10)
```

### Pattern SSE pour le suivi

Server-Sent Events permettent au client de recevoir des mises à jour en temps réel sans polling :

```python
from fastapi.responses import StreamingResponse
import asyncio

@app.get("/api/v2/jobs/{job_id}/stream")
async def stream_job_progress(job_id: str):
    async def event_generator():
        while True:
            job = job_store.get(job_id)
            if not job:
                yield "event: error\ndata: {\"message\": \"Job not found\"}\n\n"
                break

            yield f"event: progress\ndata: {json.dumps(job)}\n\n"

            if job["status"] in (JobStatus.COMPLETED, JobStatus.FAILED):
                break
            await asyncio.sleep(1)

    return StreamingResponse(event_generator(), media_type="text/event-stream")
```

## PATCH et mises à jour partielles

HTTP PATCH est prévu pour les mises à jour partielles. Mais le format du corps de la requête PATCH n'est pas standardisé par HTTP — il dépend du type MIME. Deux standards existent.

### JSON Patch — RFC 6902

JSON Patch représente les modifications comme une liste d'opérations atomiques.

```json
PATCH /api/v2/users/42
Content-Type: application/json-patch+json

[
  {"op": "replace", "path": "/name",       "value": "Alice Martin"},
  {"op": "add",     "path": "/tags/-",     "value": "premium"},
  {"op": "remove",  "path": "/legacy_id"},
  {"op": "move",    "path": "/nickname",   "from": "/alias"},
  {"op": "copy",    "path": "/display_name", "from": "/name"},
  {"op": "test",    "path": "/version",    "value": 5}
]
```

Les opérations :
- **add** : ajoute une valeur à un chemin (crée les nœuds intermédiaires, ou ajoute en fin de tableau si `-`)
- **remove** : supprime la valeur au chemin
- **replace** : équivalent à remove + add (le chemin doit exister)
- **move** : déplace la valeur de `from` vers `path`
- **copy** : copie la valeur de `from` vers `path`
- **test** : vérifie que la valeur au chemin correspond à `value` — si non, tout le patch échoue (atomicité)

L'opération `test` permet l'optimistic locking : vérifier la version avant de modifier.

### JSON Merge Patch — RFC 7396

JSON Merge Patch est plus simple : on envoie un objet JSON partiel. Les champs présents remplacent les champs existants. Les champs avec valeur `null` sont supprimés. Les champs absents sont inchangés.

```json
PATCH /api/v2/users/42
Content-Type: application/merge-patch+json

{
  "name": "Alice Martin",
  "bio": null,
  "preferences": {"theme": "dark"}
}
```

Résultat : `name` est mis à jour, `bio` est supprimé, `preferences.theme` est mis à jour sans toucher aux autres clés de `preferences`.

**Limite :** on ne peut pas mettre une valeur à `null` (null = suppression). On ne peut pas opérer sur des éléments individuels d'un tableau.

### Comparaison et recommandations

```{admonition} JSON Patch vs JSON Merge Patch
:class: note
**JSON Merge Patch** est plus lisible et suffit pour 80% des cas. Privilégiez-le pour les ressources simples.

**JSON Patch** est nécessaire pour : manipuler des tableaux (ajout/suppression d'éléments), l'optimistic locking via `test`, les modifications conditionnelles atomiques.
```

## Bulk APIs

Les opérations bulk permettent de traiter plusieurs ressources en un seul appel HTTP, réduisant la latence réseau pour les clients qui doivent créer ou modifier des dizaines ou centaines d'objets.

### POST /batch

```json
POST /api/v2/batch
Content-Type: application/json

{
  "requests": [
    {"method": "POST",   "path": "/api/v2/users",    "body": {"name": "Alice"}},
    {"method": "GET",    "path": "/api/v2/users/42"},
    {"method": "DELETE", "path": "/api/v2/users/99"}
  ]
}
```

### PATCH /resources — mise à jour de collection

```json
PATCH /api/v2/products
Content-Type: application/json

[
  {"id": "prod-1", "price": 29.99},
  {"id": "prod-2", "price": 49.99},
  {"id": "prod-3", "status": "archived"}
]
```

### Multi-Status

Les opérations bulk doivent gérer les succès partiels. Le code `207 Multi-Status` (WebDAV, mais accepté en REST) permet de retourner un status par item :

```json
HTTP/1.1 207 Multi-Status
Content-Type: application/json

{
  "results": [
    {"index": 0, "status": 201, "id": "usr-456"},
    {"index": 1, "status": 422, "error": "email already exists"},
    {"index": 2, "status": 201, "id": "usr-789"}
  ]
}
```

```{admonition} Idempotence des bulk
:class: important
Les opérations bulk doivent être idempotentes si elles modifient des données. Utilisez un `batch_id` unique en header (`Idempotency-Key`) pour que le serveur puisse dépliquer les retry réseau.
```

## File upload

### multipart/form-data

L'upload standard combine des métadonnées et un fichier binaire dans un seul POST :

```python
from fastapi import FastAPI, UploadFile, File, Form

@app.post("/api/v2/documents")
async def upload_document(
    file: UploadFile = File(...),
    title: str = Form(...),
    folder_id: str = Form(...)
):
    content = await file.read()
    # Traitement du fichier...
    return {"document_id": "doc-123", "size": len(content)}
```

**Limite :** le fichier passe par le serveur API, qui doit le lire en RAM ou le streamer. Impraticable pour des fichiers de plusieurs gigaoctets.

### Direct upload — presigned URL S3

La solution scalable contourne le serveur API : le client upload directement dans le stockage objet (S3, GCS).

```python
import boto3
from datetime import timedelta

s3 = boto3.client("s3")

@app.post("/api/v2/documents/upload-url")
async def get_upload_url(filename: str, content_type: str):
    """
    Génère une presigned URL pour upload direct vers S3.
    Le serveur API n'est plus dans le chemin du fichier.
    """
    key = f"uploads/{uuid.uuid4()}/{filename}"

    presigned = s3.generate_presigned_post(
        Bucket="my-bucket",
        Key=key,
        Fields={"Content-Type": content_type},
        Conditions=[
            {"Content-Type": content_type},
            ["content-length-range", 1, 100 * 1024 * 1024]  # max 100 MB
        ],
        ExpiresIn=3600
    )

    return {
        "upload_url": presigned["url"],
        "upload_fields": presigned["fields"],
        "storage_key": key,
        "expires_in": 3600
    }

@app.post("/api/v2/documents/confirm")
async def confirm_upload(storage_key: str, title: str):
    """Après l'upload S3, le client confirme pour créer la ressource."""
    # Vérifier que le fichier existe en S3
    # Créer la ressource en BDD
    return {"document_id": "doc-123", "storage_key": storage_key}
```

### TUS protocol — uploads résumables

Le protocole TUS (tus.io) standardise les uploads résumables. Si l'upload est interrompu, le client reprend depuis le dernier octet reçu.

Workflow :
1. `POST /uploads` → créer la session, retourne l'URL de l'upload + `Upload-Offset: 0`
2. `PATCH /uploads/{id}` → envoyer un chunk (header `Upload-Offset`, `Content-Type: application/offset+octet-stream`)
3. Répéter jusqu'à ce que `Upload-Offset == Upload-Length`
4. Le serveur notifie la complétion

TUS est supporté nativement par `tus-js-client` (navigateur), `tusd` (serveur Go), et des plugins pour AWS S3/GCS.

## API de recherche

### Paramètres de recherche vs endpoint dédié

Pour une recherche simple (filtrage), des query params sur la collection suffisent :

```
GET /api/v2/users?name=alice&status=active&created_after=2024-01-01
```

Pour une recherche complexe (full-text, facettes, scoring, suggestions), un endpoint dédié est plus clair :

```
POST /api/v2/search
Content-Type: application/json

{
  "query": "machine learning",
  "filters": {"category": "books", "price_max": 50},
  "facets": ["category", "author"],
  "sort": {"field": "relevance"},
  "page": {"limit": 20, "cursor": "eyJpZCI6MTIz"}
}
```

L'utilisation de POST pour la recherche est acceptable quand la requête est trop complexe pour tenir dans une URL.

### Facettes

Les facettes agrègent les résultats par dimension pour permettre le filtrage progressif :

```json
{
  "results": [...],
  "facets": {
    "category": [
      {"value": "books", "count": 142},
      {"value": "videos", "count": 87}
    ],
    "price_range": [
      {"value": "0-25", "count": 89},
      {"value": "25-50", "count": 63}
    ]
  },
  "total": 229
}
```

## Streaming de réponses

Certaines ressources sont trop volumineuses pour tenir en mémoire ou pour être envoyées en une seule réponse. Le streaming découple la génération de la réponse de sa transmission.

### Transfer-Encoding: chunked

HTTP/1.1 supporte le chunked transfer encoding : le serveur envoie la réponse par morceaux sans connaître sa taille totale à l'avance.

### StreamingResponse FastAPI

```python
import csv
import io
from fastapi.responses import StreamingResponse

@app.get("/api/v2/reports/export.csv")
async def export_csv():
    """Export CSV streamed — pas de limite de taille."""
    async def generate_rows():
        # En-tête CSV
        yield "id,name,email,created_at\n"

        # Les lignes sont lues et envoyées par batch
        async for batch in fetch_users_by_batch(batch_size=1000):
            for user in batch:
                yield f'{user["id"]},{user["name"]},{user["email"]},{user["created_at"]}\n'

    return StreamingResponse(
        generate_rows(),
        media_type="text/csv",
        headers={"Content-Disposition": "attachment; filename=users.csv"}
    )
```

### NDJSON streaming

NDJSON (Newline-Delimited JSON) est idéal pour streamer des collections d'objets JSON :

```python
import json

@app.get("/api/v2/events/stream")
async def stream_events():
    async def generate():
        async for event in fetch_events_stream():
            yield json.dumps(event) + "\n"

    return StreamingResponse(generate(), media_type="application/x-ndjson")
```

Chaque ligne est un document JSON valide, facilement parseable côté client avec `readline()`.

## API événementielles

### Poll vs push

**Poll :** le client interroge régulièrement l'API pour détecter les changements. Simple à implémenter, mais inefficace (nombreuses requêtes vides).

**Push :** le serveur notifie le client quand un changement survient. Plus efficace, mais nécessite un canal persistant (webhook, SSE, WebSocket) ou une infrastructure de messagerie.

### ETag pour les listes changeantes

Le polling peut être rendu efficace avec les ETags. Le serveur retourne un ETag représentant l'état de la collection. Le client envoie ce tag dans sa prochaine requête :

```
GET /api/v2/notifications
ETag: "v2-1234567890"

→ HTTP 200 OK si les notifications ont changé
→ HTTP 304 Not Modified si rien de nouveau (pas de corps, très rapide)
```

### Changes endpoints

Certaines APIs exposent un endpoint de changements : retourne uniquement les ressources créées/modifiées/supprimées depuis un cursor donné.

```
GET /api/v2/users/changes?since=2024-11-15T14:00:00Z&limit=100

{
  "changes": [
    {"type": "created",  "resource": {...}, "timestamp": "2024-11-15T14:05:00Z"},
    {"type": "updated",  "resource": {...}, "timestamp": "2024-11-15T14:07:00Z"},
    {"type": "deleted",  "id": "usr-99",    "timestamp": "2024-11-15T14:10:00Z"}
  ],
  "next_cursor": "2024-11-15T14:10:00Z",
  "has_more": false
}
```

```{admonition} Cursor vs timestamp
:class: warning
Utilisez un cursor opaque plutôt qu'un timestamp pour les changes endpoints. Les timestamps ont des problèmes d'horloge (clock skew) et de granularité. Un cursor peut encoder l'ID du dernier événement traité, ce qui est exact et atomique.
```

---

## Cellules exécutables

### Implémentation JSON Patch RFC 6902

```{code-cell} python3
import json
import copy
from typing import Any

def json_pointer_get(doc: Any, pointer: str) -> tuple[Any, str]:
    """Navigue dans doc en suivant le JSON Pointer (RFC 6901). Retourne (parent, key)."""
    if pointer == "":
        return None, ""

    parts = pointer.lstrip("/").split("/")
    # Décoder les caractères spéciaux RFC 6901
    parts = [p.replace("~1", "/").replace("~0", "~") for p in parts]

    parent = doc
    for part in parts[:-1]:
        if isinstance(parent, list):
            parent = parent[int(part)]
        else:
            parent = parent[part]

    return parent, parts[-1]


def apply_json_patch(doc: dict, patch: list[dict]) -> dict:
    """
    Applique une liste d'opérations JSON Patch (RFC 6902) à un document.
    Retourne le document modifié. L'original n'est pas muté.
    Lève une ValueError si une opération test échoue.
    """
    result = copy.deepcopy(doc)

    for op_index, op in enumerate(patch):
        operation = op["op"]
        path      = op.get("path", "")
        value     = op.get("value")
        from_path = op.get("from", "")

        if operation == "test":
            parent, key = json_pointer_get(result, path)
            target = parent[int(key)] if isinstance(parent, list) else parent[key]
            if target != value:
                raise ValueError(
                    f"Test failed at op {op_index}: path='{path}', "
                    f"expected={value!r}, got={target!r}"
                )

        elif operation == "add":
            parent, key = json_pointer_get(result, path)
            if isinstance(parent, list):
                if key == "-":
                    parent.append(value)
                else:
                    parent.insert(int(key), value)
            else:
                parent[key] = value

        elif operation == "remove":
            parent, key = json_pointer_get(result, path)
            if isinstance(parent, list):
                parent.pop(int(key))
            else:
                del parent[key]

        elif operation == "replace":
            parent, key = json_pointer_get(result, path)
            if isinstance(parent, list):
                parent[int(key)] = value
            else:
                if key not in parent:
                    raise ValueError(f"Replace: path '{path}' does not exist")
                parent[key] = value

        elif operation == "move":
            src_parent, src_key = json_pointer_get(result, from_path)
            val = src_parent.pop(src_key) if not isinstance(src_parent, list) else src_parent.pop(int(src_key))
            dst_parent, dst_key = json_pointer_get(result, path)
            if isinstance(dst_parent, list):
                dst_parent.insert(int(dst_key) if dst_key != "-" else len(dst_parent), val)
            else:
                dst_parent[dst_key] = val

        elif operation == "copy":
            src_parent, src_key = json_pointer_get(result, from_path)
            val = copy.deepcopy(src_parent[int(src_key)] if isinstance(src_parent, list) else src_parent[src_key])
            dst_parent, dst_key = json_pointer_get(result, path)
            if isinstance(dst_parent, list):
                dst_parent.insert(int(dst_key) if dst_key != "-" else len(dst_parent), val)
            else:
                dst_parent[dst_key] = val

    return result


# --- Démo ---
original = {
    "id": 42,
    "name": "Alice Dupont",
    "email": "alice@example.com",
    "tags": ["user", "trial"],
    "address": {"city": "Paris", "zip": "75001"},
    "version": 3
}

patch = [
    {"op": "test",    "path": "/version",        "value": 3},         # vérification
    {"op": "replace", "path": "/name",            "value": "Alice Martin"},
    {"op": "add",     "path": "/tags/-",          "value": "premium"}, # ajout en fin
    {"op": "remove",  "path": "/tags/0"},                              # supprime "user"
    {"op": "replace", "path": "/address/city",    "value": "Lyon"},
    {"op": "copy",    "path": "/display_name",    "from": "/name"},
    {"op": "replace", "path": "/version",         "value": 4},
]

result = apply_json_patch(original, patch)

print("=== Document original ===")
print(json.dumps(original, indent=2, ensure_ascii=False))
print("\n=== Après JSON Patch ===")
print(json.dumps(result, indent=2, ensure_ascii=False))

# Test de l'opération test en échec
print("\n=== Test d'une opération test en échec ===")
try:
    apply_json_patch(original, [{"op": "test", "path": "/version", "value": 99}])
except ValueError as e:
    print(f"ValueError levée : {e}")
```

### Simulation long-running operation avec polling

```{code-cell} python3
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns
import time

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.0)

class JobStore:
    """Store de jobs simulé."""
    def __init__(self):
        self._jobs: dict[str, dict] = {}
        self._counter = 0

    def create(self, job_type: str) -> str:
        self._counter += 1
        job_id = f"job-{self._counter:04d}"
        self._jobs[job_id] = {
            "job_id": job_id,
            "type": job_type,
            "status": "pending",
            "progress": 0,
            "created_t": 0
        }
        return job_id

    def advance(self, job_id: str, t: int):
        job = self._jobs[job_id]
        if job["status"] == "pending":
            job["status"] = "running"
            job["started_t"] = t
        job["progress"] = min(100, job["progress"] + 12)
        if job["progress"] >= 100:
            job["status"] = "completed"
            job["completed_t"] = t

    def get(self, job_id: str) -> dict:
        return self._jobs[job_id]


store = JobStore()

# Simulation de 3 clients qui soumettent des jobs et pollent
timeline = []  # (t, client, event, job_id, status)

# Client A : soumet à t=0, poll toutes les 2s
# Client B : soumet à t=3, poll toutes les 3s
# Client C : soumet à t=5, poll toutes les 5s
scenarios = [
    ("Client A", 0, 2, "rapport-annuel"),
    ("Client B", 3, 3, "export-csv"),
    ("Client C", 5, 5, "migration-BDD"),
]

job_ids = {}
for client, start_t, poll_interval, job_type in scenarios:
    jid = store.create(job_type)
    job_ids[client] = jid
    timeline.append((start_t, client, "submit", jid, "pending"))

MAX_T = 20
for t in range(MAX_T + 1):
    for client, start_t, poll_interval, _ in scenarios:
        jid = job_ids[client]
        job = store.get(jid)
        if t < start_t or job["status"] == "completed":
            continue
        store.advance(jid, t)
        if (t - start_t) % poll_interval == 0:
            status = store.get(jid)["status"]
            progress = store.get(jid)["progress"]
            timeline.append((t, client, "poll", jid, f"{status} {progress}%"))


# Visualisation
fig, ax = plt.subplots(figsize=(13, 5))

client_y = {"Client A": 2.5, "Client B": 1.5, "Client C": 0.5}
colors_map = {"submit": "#4c72b0", "poll": "#55a868"}

for t, client, event, jid, status in timeline:
    y = client_y[client]
    color = colors_map[event]
    marker = "^" if event == "submit" else "o"
    ax.scatter(t, y, color=color, marker=marker, s=80, zorder=3)
    label = f"{'▶' if event == 'submit' else '?'} {status}"
    ax.text(t, y + 0.12, label, ha="center", fontsize=6.5, color=color)

for client, y in client_y.items():
    ax.axhline(y=y, color="lightgray", linewidth=1, zorder=1)
    ax.text(-0.5, y, client, ha="right", va="center", fontsize=9, fontweight="bold")

legend_patches = [
    mpatches.Patch(color="#4c72b0", label="Soumission (POST → 202)"),
    mpatches.Patch(color="#55a868", label="Poll (GET job status)"),
]
ax.legend(handles=legend_patches, loc="upper right")
ax.set_xlabel("Temps simulé (secondes)")
ax.set_title("Pattern polling — 3 clients, long-running operations")
ax.set_xlim(-1, MAX_T + 1)
ax.set_ylim(0, 3.5)
ax.set_yticks([])

plt.show()
```

### Comparaison JSON Patch vs JSON Merge Patch

```{code-cell} python3
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns

sns.set_theme(style="whitegrid", font_scale=0.95)

fig, ax = plt.subplots(figsize=(13, 7))
ax.axis("off")

headers = ["Critère", "JSON Patch (RFC 6902)", "JSON Merge Patch (RFC 7396)"]
rows = [
    ("Content-Type",             "application/json-patch+json",       "application/merge-patch+json"),
    ("Format",                   "Tableau d'opérations",              "Objet JSON partiel"),
    ("Lisibilité",                "⚠ Verbeux",                        "✓ Intuitif"),
    ("Manipulation de tableau",  "✓ add/remove sur index précis",     "✗ Remplace tout le tableau"),
    ("Supprimer un champ",       "remove op",                         "Mettre null"),
    ("Mettre une valeur null",   "✓ Possible (replace, value: null)", "✗ Null = suppression"),
    ("Optimistic locking",       "✓ Opération test",                  "✗ Non supporté"),
    ("Opérations atomiques",     "✓ Tout ou rien",                    "✓ Implicitement atomique"),
    ("Complexité serveur",       "⚠ Implémentation non triviale",     "✓ Simple (merge récursif)"),
    ("Cas d'usage principal",    "Patches complexes, arrays",         "Mises à jour simples"),
]

col_widths = [3.0, 4.5, 4.5]
x_offsets  = [0.2, 3.4, 8.1]
row_h      = 0.5

# En-têtes
header_colors = ["#d9d9d9", "#aec7e8", "#ffbb78"]
for col_idx, (header, x, w) in enumerate(zip(headers, x_offsets, col_widths)):
    rect = mpatches.FancyBboxPatch(
        (x, len(rows) * row_h + 0.1), w, 0.55,
        boxstyle="round,pad=0.04",
        facecolor=header_colors[col_idx], edgecolor="#555555", linewidth=1.2
    )
    ax.add_patch(rect)
    ax.text(x + w / 2, len(rows) * row_h + 0.38, header,
            ha="center", va="center", fontsize=9, fontweight="bold")

# Lignes
for row_idx, row in enumerate(rows):
    y = (len(rows) - row_idx - 1) * row_h + 0.1
    bg = "#f7f7f7" if row_idx % 2 == 0 else "white"
    for col_idx, (cell, x, w) in enumerate(zip(row, x_offsets, col_widths)):
        rect = mpatches.FancyBboxPatch(
            (x, y), w, row_h - 0.06,
            boxstyle="round,pad=0.03",
            facecolor=bg, edgecolor="#cccccc", linewidth=0.7
        )
        ax.add_patch(rect)
        ax.text(x + 0.15, y + (row_h - 0.06) / 2, cell,
                ha="left", va="center", fontsize=8.5)

ax.set_xlim(0, 13)
ax.set_ylim(0, len(rows) * row_h + 1.0)
ax.set_title("JSON Patch vs JSON Merge Patch — comparaison", fontsize=12,
             fontweight="bold", pad=8)

plt.show()
```

### Simulation chunked upload TUS — diagramme de séquence

```{code-cell} python3
import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import seaborn as sns

sns.set_theme(style="whitegrid", font_scale=0.95)

fig, ax = plt.subplots(figsize=(13, 8))
ax.axis("off")
ax.set_xlim(0, 13)
ax.set_ylim(0, 8.5)

# Acteurs
actors = [("Client", 2), ("Serveur TUS", 7), ("Stockage\nobjet", 11)]
actor_y = 8.0

for name, x in actors:
    rect = mpatches.FancyBboxPatch(
        (x - 0.9, actor_y - 0.25), 1.8, 0.55,
        boxstyle="round,pad=0.08",
        facecolor="#aec7e8", edgecolor="#555555", linewidth=1.5
    )
    ax.add_patch(rect)
    ax.text(x, actor_y + 0.02, name, ha="center", va="center",
            fontsize=9, fontweight="bold")
    # Ligne de vie
    ax.axvline(x=x, color="#aaaaaa", linewidth=1, linestyle="--",
               ymin=0.0, ymax=(actor_y - 0.3) / 8.5)

# Messages (y, x_from, x_to, label, color, direction)
messages = [
    (7.2, 2, 7, "POST /uploads  (Upload-Length: 10MB, filename: video.mp4)", "#4c72b0", "→"),
    (6.7, 7, 2, "201 Created  Location: /uploads/abc123  Upload-Offset: 0",  "#2ca02c", "←"),
    (6.1, 2, 7, "PATCH /uploads/abc123  (chunk 1/5, Upload-Offset: 0)",       "#4c72b0", "→"),
    (5.6, 7, 2, "204 No Content  Upload-Offset: 2097152  (2 MB reçus)",        "#2ca02c", "←"),
    (5.1, 2, 7, "PATCH /uploads/abc123  (chunk 2/5, Upload-Offset: 2097152)", "#4c72b0", "→"),
    (4.6, 7, 2, "204 No Content  Upload-Offset: 4194304",                      "#2ca02c", "←"),
    (4.2, 2, 2, "⚡ Interruption réseau", "#d62728", "•"),
    (3.7, 2, 7, "HEAD /uploads/abc123  (où en est l'upload ?)",                "#ff7f0e", "→"),
    (3.2, 7, 2, "200 OK  Upload-Offset: 4194304  (reprise depuis 4 MB)",       "#2ca02c", "←"),
    (2.7, 2, 7, "PATCH /uploads/abc123  (chunk 3/5, Upload-Offset: 4194304)", "#4c72b0", "→"),
    (2.1, 7, 11,"PUT /bucket/abc123  (stream final vers stockage)",           "#9467bd", "→"),
    (1.6, 11, 7,"200 OK  (fichier stocké)",                                    "#55a868", "←"),
    (1.1, 7, 2, "204 No Content  Upload-Offset: 10485760  (complet)",          "#2ca02c", "←"),
]

for y, x1, x2, label, color, direction in messages:
    if direction == "•":
        ax.scatter(x1, y, color=color, s=100, zorder=3)
        ax.text(x1 + 0.2, y, label, color=color, fontsize=8, va="center")
        continue

    arrow_x1 = x1 + (0.1 if x1 < x2 else -0.1)
    arrow_x2 = x2 - (0.1 if x1 < x2 else -0.1)
    ax.annotate("", xy=(arrow_x2, y), xytext=(arrow_x1, y),
                arrowprops=dict(arrowstyle="->", color=color, lw=1.3))

    mid_x = (x1 + x2) / 2
    ax.text(mid_x, y + 0.12, label, ha="center", va="bottom",
            fontsize=7.5, color=color)

ax.set_title("Upload résumable TUS — diagramme de séquence avec reprise", fontsize=11,
             fontweight="bold", pad=8)

plt.show()
```

## Résumé

Ce chapitre a couvert les patterns qui comblent les lacunes du CRUD standard.

- **Long-running operations :** le pattern polling (`202 Accepted` + `Location` + `Retry-After`) est la solution REST idiomatique. Le pattern webhook est plus efficace quand le client peut recevoir des appels entrants. SSE convient pour le suivi en temps réel dans un navigateur.
- **PATCH :** JSON Merge Patch (RFC 7396) est suffisant pour 80% des cas de mise à jour partielle. JSON Patch (RFC 6902) est nécessaire pour manipuler des tableaux, mettre une valeur à `null`, ou implémenter l'optimistic locking via l'opération `test`.
- **Bulk APIs :** le code `207 Multi-Status` gère les succès partiels. L'`Idempotency-Key` header protège contre les retry réseau.
- **File upload :** les presigned URLs S3 déchargent le serveur API pour les gros fichiers. Le protocole TUS standardise les uploads résumables pour les fichiers très volumineux ou les connexions instables.
- **Streaming :** `StreamingResponse` FastAPI avec NDJSON est la solution la plus simple pour streamer des collections. `Transfer-Encoding: chunked` est transparent pour le code applicatif.
- **Changes endpoints :** préférez un cursor opaque à un timestamp pour les endpoints de changements, afin d'éviter les problèmes de clock skew et de granularité.
