---
jupytext:
  text_representation:
    extension: .md
    format_name: myst
    format_version: 0.13
    jupytext_version: 1.16.0
kernelspec:
  name: python3
  display_name: Python 3
---

# Analyse de logs

```{code-cell} python
:tags: [hide-input]

import matplotlib.pyplot as plt
import matplotlib.patches as mpatches
import matplotlib.dates as mdates
import numpy as np
import seaborn as sns
import pandas as pd
import re
import os
import time
from datetime import datetime, timedelta
import random

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)
```

## Architecture de logging Linux

### Le protocole syslog (RFC 5424)

Tous les systèmes Linux modernes s'appuient sur le protocole **syslog** pour structurer les événements système. Chaque message syslog porte deux attributs de classification :

**Facility** (source) : `kern`, `user`, `mail`, `daemon`, `auth`, `syslog`, `lpr`, `news`, `uucp`, `cron`, `local0` à `local7`

**Severity** (niveau de priorité) :

| Niveau | Valeur | Signification |
|--------|--------|---------------|
| emerg  | 0 | Système inutilisable |
| alert  | 1 | Action immédiate requise |
| crit   | 2 | Condition critique |
| err    | 3 | Erreur |
| warning | 4 | Avertissement |
| notice | 5 | Condition normale mais notable |
| info   | 6 | Message informatif |
| debug  | 7 | Messages de débogage |

La **priorité numérique** d'un message est `facility × 8 + severity`.

### rsyslog et journald — deux mondes complémentaires

**rsyslog** est le daemon syslog traditionnel. Il reçoit les messages via le socket Unix `/dev/log`, les filtre et les écrit dans des fichiers texte sous `/var/log/`. Il supporte des règles complexes, le forwarding réseau (UDP/TCP/TLS) et plusieurs formats de sortie.

**journald** (composant de systemd) collecte les messages de toutes les sources — noyau, services systemd, applications via l'API sd_journal — et les stocke dans un format binaire indexé sous `/var/log/journal/`. Ce format permet des requêtes structurées rapides que les fichiers texte ne permettent pas.

Les deux coexistent : journald peut retransmettre ses messages à rsyslog via `/run/systemd/journal/syslog`, permettant à rsyslog de continuer à écrire les fichiers classiques tout en bénéficiant de la collecte unifiée de journald.

### Principaux fichiers /var/log/

| Fichier | Contenu |
|---------|---------|
| `/var/log/syslog` | Messages syslog généraux (Debian/Ubuntu) |
| `/var/log/messages` | Équivalent sur RHEL/CentOS |
| `/var/log/auth.log` | Authentifications, sudo, PAM (Debian/Ubuntu) |
| `/var/log/secure` | Équivalent sur RHEL/CentOS |
| `/var/log/kern.log` | Messages du noyau |
| `/var/log/dpkg.log` | Opérations APT/dpkg |
| `/var/log/nginx/` | Accès et erreurs Nginx |
| `/var/log/journal/` | Journal binaire systemd |

---

## journalctl avancé

`journalctl` est l'interface de requête pour le journal systemd. Sa puissance réside dans sa capacité à filtrer sur des champs structurés plutôt que sur du texte brut.

### Filtres temporels et par unité

```bash
# Logs du service nginx depuis 2 heures
journalctl -u nginx.service --since "2 hours ago"

# Plage horaire précise
journalctl --since "2026-03-24 08:00:00" --until "2026-03-24 12:00:00"

# Niveau de priorité : erreurs et plus grave
journalctl -p err

# Combinaison : erreurs nginx d'aujourd'hui
journalctl -u nginx -p err --since today
```

### Champs structurés

```bash
# Filtrer sur le PID d'un processus
journalctl _PID=1234

# Filtrer sur l'unité systemd (champ interne)
journalctl _SYSTEMD_UNIT=sshd.service

# Tous les logs d'un UID donné
journalctl _UID=1000

# Logs du noyau (équivalent dmesg)
journalctl -k
```

### Formats de sortie

```bash
# Format JSON (une ligne par entrée) — pour le parsing
journalctl -u sshd --since "1 hour ago" --output=json | head -3

# Format court avec timestamp précis (microsecondes)
journalctl -u sshd --output=short-precise

# Afficher les curseurs (utile pour reprendre la lecture)
journalctl --show-cursor

# Suivre en temps réel
journalctl -f -u nginx
```

```bash
# Exemple de sortie JSON (extrait)
{"__REALTIME_TIMESTAMP":"1711270800123456","MESSAGE":"Accepted publickey for alice",
 "_HOSTNAME":"srv01","_SYSTEMD_UNIT":"sshd.service","PRIORITY":"6"}
```

### Curseurs — reprise de lecture

Les **curseurs** journald permettent de reprendre la lecture exactement où on s'était arrêté, sans risquer de manquer ou de dupliquer des entrées :

```bash
# Sauvegarder le curseur courant
journalctl --show-cursor 2>&1 | tail -1 > /tmp/curseur_journal.txt

# Reprendre depuis ce curseur
journalctl --after-cursor="$(cat /tmp/curseur_journal.txt)"
```

```{admonition} Persistance du journal
:class: tip
Par défaut, sur certaines distributions, le journal n'est pas persisté entre les redémarrages (`/run/log/journal/` est en RAM). Pour activer la persistance : `mkdir -p /var/log/journal && systemd-tmpfiles --create --prefix /var/log/journal && systemctl restart systemd-journald`.
```

---

## rsyslog — configuration et filtrage

### Structure de /etc/rsyslog.conf

```
# Modules d'entrée
module(load="imuxsock")   # socket Unix /dev/log
module(load="imklog")     # messages noyau via /proc/kmsg
module(load="imjournal")  # depuis journald

# Règles : facility.severity  action
auth,authpriv.*          /var/log/auth.log
*.*;auth,authpriv.none   /var/log/syslog
kern.*                   /var/log/kern.log
cron.*                   /var/log/cron.log
*.emerg                  :omusrmsg:*        # broadcast tous utilisateurs
```

### Templates — format de sortie personnalisé

```
# Template JSON pour forwarding vers un SIEM
template(name="JsonFormat" type="list") {
    constant(value="{")
    property(name="timereported" dateFormat="rfc3339" format="jsonf")
    constant(value=",")
    property(name="hostname" format="jsonf")
    constant(value=",")
    property(name="syslogseverity-text" format="jsonf")
    constant(value=",")
    property(name="msg" format="jsonf")
    constant(value="}\n")
}

*.* action(type="omfile" file="/var/log/all.json" template="JsonFormat")
```

### Forwarding réseau vers un syslog distant

```
# Forwarding TCP vers un collecteur central (port 514)
*.* action(type="omfwd"
           target="192.168.1.100"
           port="514"
           protocol="tcp"
           action.resumeRetryCount="100"
           queue.type="linkedList"
           queue.size="10000"
           queue.filename="fwd_queue")
```

L'utilisation d'une file d'attente locale (`queue`) garantit qu'aucun message n'est perdu si le collecteur distant est temporairement indisponible.

---

## logrotate — rotation des fichiers de logs

Sans rotation, les fichiers de logs grossissent indéfiniment jusqu'à saturer le disque. `logrotate` est le daemon chargé de les archiver et compresser périodiquement.

### Configuration globale /etc/logrotate.conf

```
# Rotation hebdomadaire par défaut
weekly

# Garder 4 semaines d'archives
rotate 4

# Compresser les archives
compress

# Ne pas lever d'erreur si le fichier est absent
missingok

# Ne pas tourner si le fichier est vide
notifempty

# Inclure les configurations des packages
include /etc/logrotate.d
```

### Exemple /etc/logrotate.d/nginx

```
/var/log/nginx/*.log {
    daily
    missingok
    rotate 14
    compress
    delaycompress
    notifempty
    create 0640 www-data adm
    sharedscripts
    postrotate
        if [ -f /run/nginx.pid ]; then
            kill -USR1 $(cat /run/nginx.pid)
        fi
    endscript
}
```

Directives importantes :

| Directive | Effet |
|-----------|-------|
| `daily` / `weekly` / `monthly` | Fréquence de rotation |
| `size 100M` | Rotation quand le fichier dépasse 100 MiB |
| `rotate N` | Garder N archives |
| `compress` | Compresser avec gzip |
| `delaycompress` | Compresser l'archive précédente (pas la dernière) |
| `postrotate / endscript` | Script exécuté après rotation (ex : rechargement daemon) |
| `copytruncate` | Copier puis vider le fichier original (pour les processus qui ne supportent pas SIGHUP) |

```bash
# Tester sans appliquer (-d = dry-run)
logrotate -d /etc/logrotate.d/nginx

# Forcer la rotation immédiatement
logrotate -f /etc/logrotate.d/nginx
```

```{admonition} delaycompress et les démons
:class: note
`delaycompress` est nécessaire quand le daemon (Nginx, Apache) garde le fichier ouvert après rotation. Sans cette directive, le daemon écrirait dans le fichier compressé — résultant en un fichier corrompu et des logs perdus. La directive `postrotate` envoie `SIGUSR1` à Nginx pour qu'il rouvre ses fichiers de log sur le nouveau fichier vide.
```

---

## Analyse de logs Nginx — Combined Log Format

### Format Combined Log

Le format par défaut de Nginx est le **Combined Log Format** :

```
$remote_addr - $remote_user [$time_local] "$request" $status $body_bytes_sent "$http_referer" "$http_user_agent"
```

Exemple :

```
203.0.113.42 - alice [24/Mar/2026:14:32:01 +0100] "GET /api/users HTTP/1.1" 200 1452 "https://example.com/" "Mozilla/5.0"
```

### Génération de logs simulés et analyse

```{code-cell} python
import random
from datetime import datetime, timedelta

random.seed(42)

# --- Génération de données simulées ---
ips = [
    "203.0.113.42", "198.51.100.7", "192.0.2.15",
    "203.0.113.99", "198.51.100.33", "10.0.0.5",
    "172.16.0.12", "203.0.113.42", "198.51.100.7",
    "203.0.113.42",
]
urls = [
    "/api/users", "/", "/static/app.js", "/api/orders",
    "/login", "/api/products", "/admin", "/favicon.ico",
    "/api/users/42", "/static/style.css",
]
codes = [200]*60 + [304]*15 + [404]*12 + [500]*5 + [301]*5 + [403]*3
agents = [
    'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36',
    'curl/7.88.1',
    'python-requests/2.31.0',
    'Googlebot/2.1',
    'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
]

debut = datetime(2026, 3, 24, 0, 0, 0)
lignes_log = []
regex_pattern = (
    r'(?P<ip>\S+) - (?P<user>\S+) \[(?P<time>[^\]]+)\] '
    r'"(?P<method>\S+) (?P<url>\S+) \S+" '
    r'(?P<status>\d+) (?P<bytes>\d+)'
)

for i in range(500):
    ts = debut + timedelta(seconds=random.randint(0, 86399))
    ip     = random.choice(ips)
    url    = random.choice(urls)
    code   = random.choice(codes)
    bytes_ = random.randint(200, 50000)
    agent  = random.choice(agents)
    methode = "GET" if url != "/login" else random.choice(["GET", "POST"])
    ligne = (f'{ip} - - [{ts.strftime("%d/%b/%Y:%H:%M:%S")} +0100] '
             f'"{methode} {url} HTTP/1.1" {code} {bytes_} '
             f'"-" "{agent}"')
    lignes_log.append(ligne)

# --- Parsing ---
records = []
for ligne in lignes_log:
    m = re.match(regex_pattern, ligne)
    if m:
        records.append({
            "ip"     : m.group("ip"),
            "methode": m.group("method"),
            "url"    : m.group("url"),
            "status" : int(m.group("status")),
            "bytes"  : int(m.group("bytes")),
            "heure"  : int(re.search(r':(\d{2}):', ligne).group(1))
                       if re.search(r':(\d{2}):', ligne) else 0,
        })

df_nginx = pd.DataFrame(records)
print(f"Lignes parsées : {len(df_nginx)}")
print(df_nginx["status"].value_counts().rename("Nombre de requêtes").rename_axis("Code HTTP"))
```

```{code-cell} python
:tags: [hide-input]

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)

fig, axes = plt.subplots(2, 2, figsize=(13, 9))

# Top 5 IPs
top_ips = df_nginx["ip"].value_counts().head(5)
axes[0, 0].barh(top_ips.index[::-1], top_ips.values[::-1],
                color=sns.color_palette("muted")[0])
axes[0, 0].set_xlabel("Nombre de requêtes")
axes[0, 0].set_title("Top 5 adresses IP")

# Top 8 URLs
top_urls = df_nginx["url"].value_counts().head(8)
axes[0, 1].barh(top_urls.index[::-1], top_urls.values[::-1],
                color=sns.color_palette("muted")[1])
axes[0, 1].set_xlabel("Nombre de requêtes")
axes[0, 1].set_title("Top 8 URLs")

# Distribution codes HTTP
codes_count = df_nginx["status"].value_counts().sort_index()
couleurs_codes = {200: "#4CAF50", 301: "#2196F3", 304: "#03A9F4",
                  403: "#FF9800", 404: "#F44336", 500: "#9C27B0"}
barres_codes = axes[1, 0].bar(
    codes_count.index.astype(str),
    codes_count.values,
    color=[couleurs_codes.get(c, "#607D8B") for c in codes_count.index]
)
axes[1, 0].set_xlabel("Code HTTP")
axes[1, 0].set_ylabel("Nombre de requêtes")
axes[1, 0].set_title("Distribution des codes HTTP")
for b, v in zip(barres_codes, codes_count.values):
    axes[1, 0].text(b.get_x() + b.get_width()/2, v + 1, str(v),
                    ha="center", va="bottom", fontsize=9)

# Requêtes par heure
req_par_heure = df_nginx["heure"].value_counts().sort_index()
axes[1, 1].plot(req_par_heure.index, req_par_heure.values,
                marker="o", color=sns.color_palette("muted")[2], linewidth=2)
axes[1, 1].fill_between(req_par_heure.index, req_par_heure.values, alpha=0.2,
                         color=sns.color_palette("muted")[2])
axes[1, 1].set_xlabel("Heure")
axes[1, 1].set_ylabel("Nombre de requêtes")
axes[1, 1].set_title("Répartition horaire du trafic")

plt.suptitle("Analyse des logs Nginx — Combined Log Format", fontsize=13, fontweight="bold")
plt.show()
```

---

## Analyse de logs SSH/auth

### Structure de /var/log/auth.log

```
Mar 24 14:32:01 srv01 sshd[2341]: Accepted publickey for alice from 192.168.1.10 port 52341 ssh2
Mar 24 14:35:12 srv01 sshd[2342]: Failed password for root from 203.0.113.99 port 44231 ssh2
Mar 24 14:35:13 srv01 sshd[2342]: Failed password for root from 203.0.113.99 port 44231 ssh2
Mar 24 14:35:14 srv01 sshd[2342]: Failed password for invalid user admin from 203.0.113.99 port 44233 ssh2
```

```{code-cell} python
import re
from collections import Counter

# Tentative de lecture du vrai fichier, sinon simulation
auth_lines = []
for chemin in ["/var/log/auth.log", "/var/log/secure"]:
    if os.path.exists(chemin):
        try:
            with open(chemin, errors="replace") as f:
                auth_lines = f.readlines()
            if auth_lines:
                print(f"Fichier réel lu : {chemin} ({len(auth_lines)} lignes)")
                break
        except PermissionError:
            pass

if not auth_lines:
    print("Fichier auth.log absent ou inaccessible — utilisation de données simulées")
    random.seed(0)
    ips_attaquants = ["203.0.113.99", "198.51.100.200", "192.0.2.88", "10.10.10.5"]
    utilisateurs_cibles = ["root", "admin", "ubuntu", "oracle", "test", "pi"]
    utilisateurs_legit  = ["alice", "bob", "charlie"]
    debut_sim = datetime(2026, 3, 24, 0, 0, 0)

    for i in range(800):
        ts = debut_sim + timedelta(seconds=random.randint(0, 86399))
        ts_str = ts.strftime("%b %d %H:%M:%S")
        if random.random() < 0.85:
            ip   = random.choice(ips_attaquants)
            user = random.choice(utilisateurs_cibles)
            pid  = random.randint(2000, 9999)
            auth_lines.append(
                f"{ts_str} srv01 sshd[{pid}]: Failed password for {user} "
                f"from {ip} port {random.randint(30000, 65535)} ssh2\n"
            )
        else:
            ip   = f"192.168.1.{random.randint(10, 50)}"
            user = random.choice(utilisateurs_legit)
            pid  = random.randint(2000, 9999)
            auth_lines.append(
                f"{ts_str} srv01 sshd[{pid}]: Accepted publickey for {user} "
                f"from {ip} port {random.randint(40000, 65535)} ssh2\n"
            )

# Parsing
re_failed  = re.compile(r"Failed password for (?:invalid user )?(\S+) from (\S+)")
re_accepted = re.compile(r"Accepted \S+ for (\S+) from (\S+)")

echecs  = []
succes  = []

for ligne in auth_lines:
    m = re_failed.search(ligne)
    if m:
        echecs.append({"user": m.group(1), "ip": m.group(2)})
        continue
    m = re_accepted.search(ligne)
    if m:
        succes.append({"user": m.group(1), "ip": m.group(2)})

df_echecs = pd.DataFrame(echecs) if echecs else pd.DataFrame(columns=["user", "ip"])
df_succes  = pd.DataFrame(succes) if succes  else pd.DataFrame(columns=["user", "ip"])

print(f"\nTentatives échouées : {len(df_echecs)}")
print(f"Connexions réussies  : {len(df_succes)}")
if not df_echecs.empty:
    print("\nTop 5 IPs attaquantes :")
    print(df_echecs["ip"].value_counts().head(5).to_string())
    print("\nTop 5 comptes ciblés :")
    print(df_echecs["user"].value_counts().head(5).to_string())
```

```{code-cell} python
:tags: [hide-input]

if not df_echecs.empty:
    sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)

    fig, axes = plt.subplots(1, 2, figsize=(12, 5))

    top_ips_ssh = df_echecs["ip"].value_counts().head(8)
    axes[0].barh(top_ips_ssh.index[::-1], top_ips_ssh.values[::-1],
                 color=sns.color_palette("muted")[3])
    axes[0].set_xlabel("Nombre de tentatives")
    axes[0].set_title("Top 8 IPs — tentatives SSH échouées")

    top_users_ssh = df_echecs["user"].value_counts().head(8)
    axes[1].barh(top_users_ssh.index[::-1], top_users_ssh.values[::-1],
                 color=sns.color_palette("muted")[4])
    axes[1].set_xlabel("Nombre de tentatives")
    axes[1].set_title("Top 8 comptes ciblés")

    plt.suptitle(f"Analyse auth.log — {len(df_echecs)} tentatives échouées",
                 fontsize=13, fontweight="bold")
    plt.show()
```

```{admonition} fail2ban — réponse automatique aux attaques
:class: important
`fail2ban` surveille les logs d'authentification et bannit automatiquement les IPs dépassant un seuil de tentatives échouées via des règles iptables/nftables. Configuration de base : `maxretry = 5` sur une fenêtre de `findtime = 600s`, bannissement de `bantime = 3600s`. Les tentatives SSH sont la première source de bruit dans auth.log sur tout serveur exposé à Internet.
```

---

## Heatmap des niveaux de logs par heure

```{code-cell} python
:tags: [hide-input]

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)

# Simulation d'un flux de logs sur 24h avec distribution réaliste
random.seed(7)
np.random.seed(7)

niveaux = ["debug", "info", "notice", "warning", "error", "critical"]
heures  = list(range(24))

# Probabilités par niveau (info dominant, critical rare)
proba_base = {
    "debug"   : 0.20,
    "info"    : 0.50,
    "notice"  : 0.15,
    "warning" : 0.10,
    "error"   : 0.04,
    "critical": 0.01,
}

# Charge horaire : faible la nuit, pic à 9-11h et 14-16h
charge_horaire = np.array([
    0.2, 0.1, 0.1, 0.1, 0.2, 0.3,   # 0h-5h
    0.5, 0.8, 1.0, 1.2, 1.2, 1.1,   # 6h-11h
    0.9, 1.0, 1.3, 1.3, 1.1, 0.9,   # 12h-17h
    0.7, 0.6, 0.5, 0.4, 0.3, 0.2,   # 18h-23h
])

matrice = np.zeros((len(niveaux), 24))
for h_idx, h in enumerate(heures):
    nb_messages = int(charge_horaire[h_idx] * 200)
    for _ in range(nb_messages):
        niv = random.choices(niveaux, weights=list(proba_base.values()))[0]
        matrice[niveaux.index(niv), h_idx] += 1

df_heatmap = pd.DataFrame(matrice, index=niveaux, columns=heures)

fig, ax = plt.subplots(figsize=(14, 5))
sns.heatmap(
    df_heatmap,
    ax=ax,
    cmap="YlOrRd",
    annot=True,
    fmt=".0f",
    linewidths=0.5,
    cbar_kws={"label": "Nombre de messages"},
)
ax.set_xlabel("Heure de la journée")
ax.set_ylabel("Niveau syslog")
ax.set_title("Distribution des messages syslog par heure et niveau (simulation 24h)")
plt.show()
```

---

## Centralisation — ELK Stack

### Architecture ELK

La centralisation des logs est indispensable dès qu'un parc dépasse quelques serveurs. La pile **ELK** (Elastic Stack) est la référence :

```
┌─────────┐   ┌──────────┐   ┌─────────────┐   ┌──────────┐   ┌────────┐
│ Serveur │──▶│ Filebeat │──▶│  Logstash   │──▶│Elasticsearch│──▶│Kibana  │
│  (logs) │   │(collecte)│   │(parse/enrich)│   │  (stockage) │   │(viz)   │
└─────────┘   └──────────┘   └─────────────┘   └──────────┘   └────────┘
```

- **Filebeat** : agent léger installé sur chaque serveur, lit les fichiers de log et les envoie à Logstash ou directement à Elasticsearch
- **Logstash** : pipeline de traitement (input → filter → output) avec des centaines de plugins. Permet le parsing (grok), l'enrichissement (GeoIP, DNS), la normalisation des champs
- **Elasticsearch** : moteur de recherche et d'indexation distribué, stocke les documents JSON
- **Kibana** : interface web de visualisation, dashboards, alertes (Watcher), Machine Learning

### Configuration Filebeat minimale

```yaml
# /etc/filebeat/filebeat.yml
filebeat.inputs:
  - type: log
    enabled: true
    paths:
      - /var/log/nginx/access.log
    fields:
      service: nginx
      env: production

output.logstash:
  hosts: ["logstash.interne:5044"]
```

### Pipeline Logstash pour logs Nginx

```
input {
  beats { port => 5044 }
}

filter {
  if [fields][service] == "nginx" {
    grok {
      match => { "message" => "%{COMBINEDAPACHELOG}" }
    }
    geoip { source => "clientip" }
    date  { match => ["timestamp", "dd/MMM/yyyy:HH:mm:ss Z"] }
    mutate { remove_field => ["message", "agent", "ecs"] }
  }
}

output {
  elasticsearch {
    hosts => ["http://elasticsearch:9200"]
    index => "nginx-%{+YYYY.MM.dd}"
  }
}
```

---

## Alertes sur logs

### tail -f et grep en pipeline

```bash
# Surveiller les erreurs 500 en temps réel
tail -f /var/log/nginx/access.log | grep --line-buffered '" 500 '

# Compter les erreurs par minute
tail -f /var/log/nginx/access.log | \
  awk '/HTTP\/[0-9.]+" [5][0-9]{2}/ {count++} NR%100==0 {print count " erreurs/100 lignes"; count=0}'
```

### Script de surveillance avec webhook

```bash
#!/bin/bash
# /usr/local/bin/watch_errors.sh
WEBHOOK="https://hooks.slack.com/services/XXX/YYY/ZZZ"
FICHIER="/var/log/nginx/error.log"
SEUIL=10
FENETRE=60  # secondes

while true; do
    ERREURS=$(tail -n 200 "$FICHIER" | \
              awk -v limite="$(date -d "-${FENETRE} seconds" +%s)" \
              'BEGIN{c=0} {cmd="date -d \""$1" "$2"\" +%s 2>/dev/null"; cmd | getline ts; if(ts > limite) c++} END{print c}')

    if [ "$ERREURS" -gt "$SEUIL" ]; then
        MSG="⚠ ${ERREURS} erreurs Nginx dans les ${FENETRE}s sur $(hostname)"
        curl -s -X POST -H 'Content-type: application/json' \
             --data "{\"text\":\"${MSG}\"}" "$WEBHOOK"
    fi
    sleep 30
done
```

### journalctl -f avec filtre par priorité

```bash
# Suivre uniquement les erreurs et plus grave, en JSON
journalctl -f -p err --output=json | \
  python3 -c "
import sys, json
for line in sys.stdin:
    try:
        d = json.loads(line)
        print(f\"[{d.get('_HOSTNAME','?')}] {d.get('_SYSTEMD_UNIT','kernel')}: {d.get('MESSAGE','')}\")
    except: pass
"
```

---

## Visualisation de la rotation des logs

```{code-cell} python
:tags: [hide-input]

sns.set_theme(style="whitegrid", palette="muted", font_scale=1.1)

# Simulation de la taille des logs sur 30 jours avec rotation quotidienne
random.seed(3)
np.random.seed(3)

jours = pd.date_range(end=datetime(2026, 3, 24), periods=30, freq="D")
tailles_actif = []

# Croissance journalière entre 50 et 200 MiB selon la charge
for j in jours:
    heure_semaine = j.dayofweek
    if heure_semaine < 5:  # semaine
        taille = random.uniform(80, 200)
    else:  # weekend
        taille = random.uniform(30, 80)
    tailles_actif.append(taille)

# Archives compressées : taille réduite à ~15% après gzip
archives = []
for i, t in enumerate(tailles_actif[:-1]):
    archives.append({"jour": jours[i], "taille": t * 0.15, "generation": i + 1})

df_archives = pd.DataFrame(archives)

fig, axes = plt.subplots(2, 1, figsize=(13, 8))

# Taille du log actif par jour
axes[0].bar(jours, tailles_actif,
            color=[sns.color_palette("muted")[0] if j.dayofweek < 5
                   else sns.color_palette("muted")[3]
                   for j in jours],
            alpha=0.8)
axes[0].axhline(y=150, color="red", linestyle="--", linewidth=1.5,
                label="Seuil rotation (150 MiB)")
axes[0].set_ylabel("Taille (MiB)")
axes[0].set_title("Taille journalière du log Nginx actif (bleu=semaine, orange=weekend)")
axes[0].legend()
axes[0].xaxis.set_major_formatter(mdates.DateFormatter("%d/%m"))

# Archives — empilement sur 14 derniers jours
df_14 = df_archives.tail(14)
largeur = timedelta(hours=18)
for _, row in df_14.iterrows():
    generation = int(row["generation"])
    couleur_idx = min(generation - 1, len(sns.color_palette("muted")) - 1)
    axes[1].bar(row["jour"], row["taille"],
                width=0.7,
                color=sns.color_palette("muted")[couleur_idx % 8],
                alpha=0.7,
                label=f"J-{29 - int(row['generation'])}" if generation <= 4 else "")

axes[1].set_ylabel("Taille compressée (MiB)")
axes[1].set_title("Archives logs compressées (14 derniers jours, ~15% taille originale)")
axes[1].xaxis.set_major_formatter(mdates.DateFormatter("%d/%m"))

plt.suptitle("Cycle de rotation des logs — logrotate daily, rotate 14",
             fontsize=13, fontweight="bold")
plt.show()
```

---

## Résumé

L'analyse de logs en Linux mobilise deux familles d'outils : les outils de collecte/stockage (journald, rsyslog, logrotate) et les outils d'analyse (journalctl, grep/awk, Python, ELK). La maîtrise des deux est nécessaire pour diagnostiquer efficacement les incidents en production.

Points à retenir :

- **journald** offre des requêtes structurées puissantes grâce aux champs indexés ; préférer `--output=json` pour le parsing programmatique.
- **rsyslog** reste indispensable pour le forwarding réseau, la centralisation et l'intégration avec des SIEMs.
- **logrotate** doit être configuré avant que les logs grossissent — le `postrotate` et `delaycompress` sont les directives les plus souvent oubliées.
- Le **Combined Log Format** de Nginx/Apache se parse facilement avec une regex standard ; l'analyse statistique des codes HTTP et des IPs détecte la majorité des incidents.
- Les logs SSH dans `auth.log` sont un indicateur direct de la surface d'attaque exposée — automatiser leur surveillance avec fail2ban.
- À l'échelle, la pile ELK (Filebeat + Logstash + Elasticsearch + Kibana) est la solution standard pour la centralisation et la corrélation multi-sources.

| Outil | Rôle | Usage typique |
|-------|------|---------------|
| journald | Collecte structurée systemd | `journalctl -u service -p err` |
| rsyslog | Filtrage et forwarding | Centralisation syslog réseau |
| logrotate | Archivage et compression | Automatisation quotidienne |
| Python/pandas | Analyse statistique | Rapports, détection d'anomalies |
| ELK Stack | Centralisation à l'échelle | Parcs de > 10 serveurs |
