
TMC #0006: Lorenz SZ-40/42 Cipher & Colossus Attack Simulator

Python implementation of the Lorenz SZ-40/42 cipher machine used by Hitler's high command, including the statistical delta attack that Colossus performed to find chi wheel settings. Demonstrates the "depths" XOR key-cancellation trick from the Athens/Vienna 1941 mistake.

Tags: lorenz, colossus, cipher, wwii, codebreaking, xor, statistical-attack, python

Python implementation of the Lorenz SZ-40/42 teleprinter cipher, the machine that encrypted Hitler's strategic communications, and of the statistical chi-wheel-finding attack that Tommy Flowers's Colossus ran at 25,000 characters per second.

What's in the code

lorenz_colossus.py: single self-contained file, no dependencies:

  • Wheel: a single Lorenz cam-wheel with a cyclic binary pattern and step/reset methods.
  • LorenzMachine: SZ-40/42 simulator with 5 chi wheels (sizes 41, 31, 29, 26, 23), 5 psi wheels (43, 47, 51, 53, 59), and 2 motor wheels (37, 61) driving a simplified conditional psi-stepping rule. 5-bit Baudot (ITA-2) encoding throughout. encrypt() and decrypt() apply the same XOR operation, so resetting the wheels and re-running recovers the plaintext.
  • depths_attack(c1, c2): XORs two ciphertexts encrypted with the same key settings, demonstrating that $C_1 \oplus C_2 = P_1 \oplus P_2$ because the key cancels out. Recreates the Athens/Vienna operator error of 30 August 1941; John Tiltman recovered the keystream from that depth, and Bill Tutte deduced the machine's logical structure from it.
  • delta(seq): computes $\Delta X[i] = X[i] \oplus X[i+1]$, the consecutive-difference statistic Colossus counted.
  • colossus_count(ciphertext, machine, chi_pos): scores a candidate chi starting position by counting zero bits in $\Delta C \oplus \Delta\chi$ across all 5 Baudot channels.
  • delta_attack(ciphertext, machine): brute-forces chi wheel starting positions one wheel at a time using the delta score. Returns best-matching positions.
  • Key period calculator: prints the chi/psi/motor periods and their product ($\approx 1.6 \times 10^{19}$), with a comparison against brute force at one attempt per second.
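
The two identities that depths_attack and delta rely on can be checked on toy data. The following is a minimal sketch, not an excerpt from lorenz_colossus.py: the helper name `xor_streams`, the seed, and the random 5-bit values standing in for plaintext and keystream are all assumptions made for illustration.

```python
import random


def xor_streams(a, b):
    """XOR two equal-length lists of 5-bit values element by element."""
    return [x ^ y for x, y in zip(a, b)]


def delta(seq):
    """Consecutive difference: delta(X)[i] = X[i] XOR X[i+1]."""
    return [x ^ y for x, y in zip(seq, seq[1:])]


rng = random.Random(0)  # fixed seed so the run is repeatable
n = 8
key = [rng.randrange(32) for _ in range(n)]  # hypothetical shared keystream
p1 = [rng.randrange(32) for _ in range(n)]   # stand-in for plaintext 1
p2 = [rng.randrange(32) for _ in range(n)]   # stand-in for plaintext 2

c1 = xor_streams(p1, key)
c2 = xor_streams(p2, key)

# Depth: the shared key drops out of C1 XOR C2.
assert xor_streams(c1, c2) == xor_streams(p1, p2)

# Delta distributes over XOR: delta(C) = delta(P) XOR delta(K).
assert delta(c1) == xor_streams(delta(p1), delta(key))

print("both identities hold")
```

The second identity is what lets a chi-only guess be tested: XORing the candidate delta-chi stream into delta-C leaves delta-P XOR delta-psi, which is the quantity whose zeros are counted.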

Running it

python3 lorenz_colossus.py

No dependencies beyond the Python standard library.

Runs four demos in the order below (the numbered banners the script prints follow the order of the demo functions in the source, so the key-period banner is labelled 4 even though it runs first):

  1. Key period: shows why exhaustive search would take roughly $5 \times 10^{11}$ years even at one attempt per second
  2. Encrypt/decrypt: round-trips a plaintext message through the SZ-40/42 model
  3. Depths attack: XORs two same-key ciphertexts; the key vanishes
  4. Colossus delta attack: brute-forces chi positions on a message of about 180 characters using delta statistics; prints the score per wheel and compares found vs. true positions
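
The key-period figure can be reproduced in a few lines. This is a sketch of the arithmetic only, using the wheel sizes listed above and the same seconds-per-year divisor that demo_key_period applies; the variable names are illustrative, and `math.prod` assumes Python 3.8 or later.

```python
import math

CHI = [41, 31, 29, 26, 23]
PSI = [43, 47, 51, 53, 59]
MOTOR = [37, 61]

# Number of distinct combined wheel positions (all sizes are pairwise coprime).
total = math.prod(CHI) * math.prod(PSI) * math.prod(MOTOR)

seconds_per_year = 60 * 60 * 24 * 365
years = total / seconds_per_year  # one trial setting per second

print(f"combined positions: {total:.2e}")
print(f"years at one attempt per second: {years:.2e}")
```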
Source code
"""
Lorenz SZ-40/42 cipher simulator and Colossus statistical attack.

Implements:
  1. LorenzMachine   - SZ-40/42 encryption/decryption in Baudot (ITA-2)
  2. delta_attack    - Colossus's chi-wheel-setting attack using delta statistics
  3. depths_attack   - demonstration of the "depths" XOR key-cancellation trick
  4. colossus_count  - simulates the tape-scanning count loop Colossus ran

Reference:
  Good, Michie & Timms, General Report on Tunny (1945/2000), GCHQ.
  Copeland (ed.), Colossus: The Secrets of Bletchley Park's Codebreaking Computers (2006).

No third-party dependencies. Run with: python3 lorenz_colossus.py
"""

from __future__ import annotations

import random

# ─────────────────────────────────────────────────────────────────────────────
# Baudot / ITA-2 encoding (5-bit)
# Only a representative subset needed for demonstration.
# ─────────────────────────────────────────────────────────────────────────────

BAUDOT: dict[str, int] = {
    "A": 0b00011,
    "B": 0b11001,
    "C": 0b01110,
    "D": 0b01001,
    "E": 0b00001,
    "F": 0b01101,
    "G": 0b11010,
    "H": 0b10100,
    "I": 0b00110,
    "J": 0b01011,
    "K": 0b01111,
    "L": 0b10010,
    "M": 0b11100,
    "N": 0b01100,
    "O": 0b11000,
    "P": 0b10110,
    "Q": 0b10111,
    "R": 0b01010,
    "S": 0b00101,
    "T": 0b10000,
    "U": 0b00111,
    "V": 0b11110,
    "W": 0b10011,
    "X": 0b11101,
    "Y": 0b10101,
    "Z": 0b10001,
    " ": 0b00100,
}
BAUDOT_INV: dict[int, str] = {v: k for k, v in BAUDOT.items()}


def encode(text: str) -> list[int]:
    return [BAUDOT[ch.upper()] for ch in text if ch.upper() in BAUDOT]


def decode(bits: list[int]) -> str:
    return "".join(BAUDOT_INV.get(b, "?") for b in bits)


# ─────────────────────────────────────────────────────────────────────────────
# Wheel: a cyclic sequence of binary cams (0=dot, 1=cross)
# ─────────────────────────────────────────────────────────────────────────────


class Wheel:
    """A single Lorenz cam-wheel with a fixed pattern and current position."""

    def __init__(self, pattern: list[int], name: str = ""):
        self.pattern = pattern
        self.size = len(pattern)
        self.pos = 0
        self.name = name

    def current(self) -> int:
        return self.pattern[self.pos]

    def step(self) -> None:
        self.pos = (self.pos + 1) % self.size

    def reset(self, pos: int = 0) -> None:
        self.pos = pos % self.size

    def __repr__(self) -> str:
        return f"Wheel({self.name}, size={self.size}, pos={self.pos})"


# ─────────────────────────────────────────────────────────────────────────────
# Lorenz SZ-40/42 machine
#
# Wheel sizes (number of cams):
#   chi  (χ): 41, 31, 29, 26, 23   — 5 wheels, add to every character
#   psi  (ψ): 43, 47, 51, 53, 59   — 5 wheels, only step when motor allows
#   motor   : 37 (μ37), 61 (μ61)   — 2 wheels controlling psi stepping
#
# Encryption per character (5-bit XOR in each Baudot channel i):
#   C[i] = P[i] XOR chi[i].current() XOR psi[i].current()
# ─────────────────────────────────────────────────────────────────────────────

CHI_SIZES = [41, 31, 29, 26, 23]
PSI_SIZES = [43, 47, 51, 53, 59]
MOTOR_SIZES = [37, 61]


def _random_pattern(size: int) -> list[int]:
    """Generate a random cam pattern for a wheel."""
    return [random.randint(0, 1) for _ in range(size)]


class LorenzMachine:
    """
    SZ-40/42 Lorenz cipher machine simulator.

    The machine is initialised with pseudo-random wheel patterns (reproducible
    via `seed`); every wheel starts at position 0. Call set_positions() to
    choose the starting positions, as an operator would from a codebook.
    """

    def __init__(self, seed: int | None = None):
        rng = random.Random(seed)

        def rp(size: int) -> list[int]:
            return [rng.randint(0, 1) for _ in range(size)]

        self.chi = [Wheel(rp(s), f"χ{i + 1}") for i, s in enumerate(CHI_SIZES)]
        self.psi = [Wheel(rp(s), f"ψ{i + 1}") for i, s in enumerate(PSI_SIZES)]
        self.mu37 = Wheel(rp(37), "μ37")
        self.mu61 = Wheel(rp(61), "μ61")

    def set_positions(
        self,
        chi_pos: list[int],
        psi_pos: list[int],
        mu37_pos: int = 0,
        mu61_pos: int = 0,
    ) -> None:
        for w, p in zip(self.chi, chi_pos):
            w.reset(p)
        for w, p in zip(self.psi, psi_pos):
            w.reset(p)
        self.mu37.reset(mu37_pos)
        self.mu61.reset(mu61_pos)

    def _keystream_char(self) -> int:
        """Return 5-bit keystream character and advance all wheels."""
        chi_bits = [w.current() for w in self.chi]
        psi_bits = [w.current() for w in self.psi]
        key = 0
        for i in range(5):
            key |= (chi_bits[i] ^ psi_bits[i]) << i

        # Advance chi wheels (always step)
        for w in self.chi:
            w.step()

        # Advance mu37 (always steps)
        self.mu37.step()

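        # Simplified motor model: mu61 steps only when mu37's NEW current cam
        # is 1, and the psi wheels step only when mu61's NEW cam is also 1.
        # (On the real machine the chain ran the other way: mu61 stepped on
        # every character, mu37 stepped when mu61 was active, and the psi
        # wheels stepped when mu37 was active.)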
        if self.mu37.current() == 1:
            self.mu61.step()
            if self.mu61.current() == 1:
                for w in self.psi:
                    w.step()

        return key

    def process(self, chars: list[int]) -> list[int]:
        """XOR each 5-bit character with the keystream."""
        return [c ^ self._keystream_char() for c in chars]

    def encrypt(self, plaintext: str) -> list[int]:
        return self.process(encode(plaintext))

    def decrypt(self, ciphertext: list[int]) -> str:
        return decode(self.process(ciphertext))


# ─────────────────────────────────────────────────────────────────────────────
# "Depths" attack — two messages encrypted with the same wheel settings
#
# If C1 = P1 XOR K  and  C2 = P2 XOR K,
# then C1 XOR C2 = P1 XOR P2  (key cancels out).
# This is what the Athens/Vienna operator error produced on 30 Aug 1941.
# ─────────────────────────────────────────────────────────────────────────────


def depths_attack(c1: list[int], c2: list[int]) -> list[int]:
    """
    XOR two ciphertexts encrypted with the same key.
    Returns P1 XOR P2 — the key cancels out completely.
    """
    length = min(len(c1), len(c2))
    return [a ^ b for a, b in zip(c1[:length], c2[:length])]


# ─────────────────────────────────────────────────────────────────────────────
# Delta function — consecutive XOR difference
#
# ΔC[i] = C[i] XOR C[i+1]
# This is the statistical quantity Colossus counted.
# ─────────────────────────────────────────────────────────────────────────────


def delta(seq: list[int]) -> list[int]:
    """Compute ΔX[i] = X[i] XOR X[i+1] for a sequence of 5-bit values."""
    return [a ^ b for a, b in zip(seq, seq[1:])]


# ─────────────────────────────────────────────────────────────────────────────
# colossus_count: simulates the statistical count Colossus performed
#
# For each candidate chi starting position, generate the chi stream,
# compute ΔC XOR Δχ on each of the 5 Baudot bits, and count zeros.
# The correct starting position maximises the zero count (dots dominate
# because Δψ = 0 about 60% of the time when psi wheels barely step).
# ─────────────────────────────────────────────────────────────────────────────


def _chi_stream(
    machine: LorenzMachine, start_positions: list[int], length: int
) -> list[int]:
    """
    Generate a chi-only keystream (ignore psi/motor) from given start positions.
    """
    result = []
    positions = [p % w.size for p, w in zip(start_positions, machine.chi)]
    for _ in range(length):
        val = 0
        for i, w in enumerate(machine.chi):
            val |= w.pattern[positions[i]] << i
        for i, w in enumerate(machine.chi):
            positions[i] = (positions[i] + 1) % w.size
        result.append(val)
    return result


def colossus_count(
    ciphertext: list[int], machine: LorenzMachine, chi_pos: list[int]
) -> int:
    """
    Count dots (zeros) in ΔC XOR Δχ across all 5 Baudot bits.
    Higher count → more likely this is the correct chi position.
    """
    chi_str = _chi_stream(machine, chi_pos, len(ciphertext))
    dc = delta(ciphertext)
    dchi = delta(chi_str)
    count = 0
    for dc_char, dchi_char in zip(dc, dchi):
        xored = dc_char ^ dchi_char
        # count zero bits across all 5 channels
        count += 5 - bin(xored).count("1")
    return count


def delta_attack(
    ciphertext: list[int],
    machine: LorenzMachine,
    wheel_indices: list[int] | None = None,
    verbose: bool = True,
) -> list[int]:
    """
    Brute-force the starting positions of selected chi wheels using delta
    statistics. Returns the list of best starting positions found.

    wheel_indices: which chi wheels to attack (default: all 5).
    For speed in demonstration, attacks one wheel at a time.
    """
    if wheel_indices is None:
        wheel_indices = list(range(5))

    found = [0] * 5

    for wi in wheel_indices:
        best_pos, best_count = 0, -1
        wheel = machine.chi[wi]

        for pos in range(wheel.size):
            trial = found[:]
            trial[wi] = pos
            count = colossus_count(ciphertext, machine, trial)
            if count > best_count:
                best_count = count
                best_pos = pos

        found[wi] = best_pos
        if verbose:
            print(
                f"  χ{wi + 1} (size {wheel.size}): best pos={best_pos:2d}  "
                f"score={best_count}"
            )

    return found


# ─────────────────────────────────────────────────────────────────────────────
# Demonstration
# ─────────────────────────────────────────────────────────────────────────────


def demo_encryption() -> None:
    print("=" * 60)
    print("1. LORENZ SZ-40/42 ENCRYPTION / DECRYPTION")
    print("=" * 60)

    machine = LorenzMachine(seed=1943)
    plaintext = "OPERATION OVERLORD BEGINS AT DAWN"
    print(f"Plaintext : {plaintext}")

    machine.set_positions([5, 3, 12, 8, 2], [7, 11, 4, 20, 1])
    ciphertext = machine.encrypt(plaintext)
    print(f"Ciphertext: {ciphertext}")

    # Reset to same starting positions to decrypt
    machine.set_positions([5, 3, 12, 8, 2], [7, 11, 4, 20, 1])
    recovered = machine.decrypt(ciphertext)
    print(f"Decrypted : {recovered}")
    assert recovered == plaintext, "Decryption mismatch!"
    print("Decryption verified ✓\n")


def demo_depths() -> None:
    print("=" * 60)
    print("2. THE DEPTHS ATTACK — ATHENS/VIENNA 1941")
    print("=" * 60)

    machine = LorenzMachine(seed=42)
    # Same wheel settings (the operator's fatal mistake)
    settings = ([10, 5, 8, 3, 15], [2, 9, 14, 1, 6])

    p1 = "SIXTH ARMY TO WITHDRAW TO DEFENSIVE LINE AT RIVER DON"
    p2 = "SIXTH ARMY ACKNOWLEDGES ORDER AND WILL COMPLY WITH ORDERS"
    machine.set_positions(*settings)
    c1 = machine.encrypt(p1)

    machine.set_positions(*settings)
    c2 = machine.encrypt(p2)

    print(f"P1        : {p1}")
    print(f"P2        : {p2}")
    print(f"C1 XOR C2 : {depths_attack(c1, c2)}")
    print()
    print("The key has cancelled out completely.")
    print("From the 1941 depth, John Tiltman recovered the keystream and")
    print("Bill Tutte deduced the Lorenz wheel structure without ever")
    print("seeing the machine.\n")


def demo_delta_attack() -> None:
    print("=" * 60)
    print("3. COLOSSUS DELTA ATTACK — CHI WHEEL FINDING")
    print("=" * 60)

    machine = LorenzMachine(seed=1944)
    true_chi = [7, 14, 3, 19, 11]
    true_psi = [5, 2, 22, 8, 4]
    machine.set_positions(true_chi, true_psi)

    message = (
        "THE ATTACK ON THE WESTERN FRONT WILL COMMENCE ON THE SIXTH"
        " DAY OF JUNE THE MAIN LANDING WILL BE AT NORMANDY NOT AT THE "
        "PAS DE CALAIS THIS IS THE DECEPTION PLAN BODYGUARD OVERLORD"
    )
    ciphertext = machine.encrypt(message)
    print(f"Message length: {len(message)} characters")
    print(f"True chi positions: {true_chi}")
    print("\nRunning Colossus-style delta count (one wheel at a time)...")

    # Fresh machine: the attack gets only the wheel patterns, not the starting
    # positions (as if we had Tutte's reconstruction but not the message settings)
    attack_machine = LorenzMachine(seed=1944)  # same patterns, all wheels at position 0

    found = delta_attack(ciphertext, attack_machine, verbose=True)
    print(f"\nFound chi positions : {found}")
    print(f"True  chi positions : {true_chi}")

    # Check how many match
    matches = sum(f == t for f, t in zip(found, true_chi))
    print(f"Correct: {matches}/5 chi wheel positions found\n")


def demo_key_period() -> None:
    print("=" * 60)
    print("4. KEY PERIOD CALCULATION")
    print("=" * 60)

    chi_period = 1
    for s in CHI_SIZES:
        chi_period *= s
    psi_period = 1
    for s in PSI_SIZES:
        psi_period *= s
    motor_period = 1
    for s in MOTOR_SIZES:
        motor_period *= s

    total = chi_period * psi_period * motor_period
    print(f"Chi  wheel sizes : {CHI_SIZES}  → period = {chi_period:,}")
    print(f"Psi  wheel sizes : {PSI_SIZES}  → period = {psi_period:,}")
    print(f"Motor wheel sizes: {MOTOR_SIZES}   → period = {motor_period:,}")
    print(f"Total key period : {total:,}")
    print(f"                   ≈ {total:.2e}")
    print("\nAt 1 brute-force attempt per second, exhaustive search would")
    years = total / (60 * 60 * 24 * 365)  # seconds in a 365-day year
    print(f"take approximately {years:.2e} years. Statistical attack needed.\n")


if __name__ == "__main__":
    demo_key_period()
    demo_encryption()
    demo_depths()
    demo_delta_attack()
    print("=" * 60)
    print("All demos complete.")
    print("See the article: /articles/tmc-0006-colossus-first-electronic-computer")
    print("=" * 60)
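
The comment above colossus_count attributes the excess of dots to delta-psi being zero whenever the psi wheels stall. A minimal sketch of that effect on a single channel follows. It is not part of lorenz_colossus.py: the cam pattern is made up, and the 25% stepping rate is an assumption chosen to resemble a motor chain that needs two random cams to be active at once.

```python
import random

rng = random.Random(1)
PSI_SIZE = 43  # size of the first psi wheel
pattern = [rng.randint(0, 1) for _ in range(PSI_SIZE)]  # made-up cam pattern


def psi_bits(step_flags):
    """Emit one psi bit per character; the wheel advances only where the flag is True."""
    pos, out = 0, []
    for flag in step_flags:
        out.append(pattern[pos])
        if flag:
            pos = (pos + 1) % PSI_SIZE
    return out


def delta(seq):
    """Consecutive XOR difference of a bit sequence."""
    return [a ^ b for a, b in zip(seq, seq[1:])]


n = 1000
# Assumed stepping behaviour: psi moves on roughly a quarter of the characters.
steps = [rng.random() < 0.25 for _ in range(n)]
d_psi = delta(psi_bits(steps))

stalled = sum(1 for s in steps[:-1] if not s)  # transitions where psi did not move
zeros = d_psi.count(0)

# Every stalled transition gives delta-psi = 0; some stepped ones do too.
assert zeros >= stalled
print(f"psi stalled on {stalled} of {n - 1} transitions; delta-psi was 0 on {zeros}")
```

Because delta-psi is zero on every stalled transition, the stream $\Delta C \oplus \Delta\chi$ equals $\Delta P$ at those positions when the chi guess is right, which is where the bias toward dots comes from.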