"""
Lorenz SZ-40/42 cipher simulator and Colossus statistical attack.
Implements:
1. LorenzMachine — full SZ-40/42 encryption/decryption in Baudot (ITA-2)
2. delta_attack — Colossus's chi-wheel-setting attack using delta statistics
3. depths_attack — demonstration of the "depths" XOR key-cancellation trick
4. colossus_count — simulates the tape-scanning count loop Colossus ran
Reference:
Good, Michie & Timms, General Report on Tunny (1945/2000), GCHQ.
Copeland (ed.), Colossus: The Secrets of Bletchley Park's Codebreaking Computers (2006).
No third-party dependencies. Run with: python3 lorenz_colossus.py
"""
from __future__ import annotations
import random
# ─────────────────────────────────────────────────────────────────────────────
# Baudot / ITA-2 encoding (5-bit)
# Only a representative subset needed for demonstration.
# ─────────────────────────────────────────────────────────────────────────────
# Letter/space -> 5-bit code.  Only letters A-Z and the space are present;
# anything else is dropped by encode() and shown as "?" by decode().
BAUDOT: dict[str, int] = {
    "A": 0b00011,
    "B": 0b11001,
    "C": 0b01110,
    "D": 0b01001,
    "E": 0b00001,
    "F": 0b01101,
    "G": 0b11010,
    "H": 0b10100,
    "I": 0b00110,
    "J": 0b01011,
    "K": 0b01111,
    "L": 0b10010,
    "M": 0b11100,
    "N": 0b01100,
    "O": 0b11000,
    "P": 0b10110,
    "Q": 0b10111,
    "R": 0b01010,
    "S": 0b00101,
    "T": 0b10000,
    "U": 0b00111,
    "V": 0b11110,
    "W": 0b10011,
    "X": 0b11101,
    "Y": 0b10101,
    "Z": 0b10001,
    " ": 0b00100,
}

# Reverse lookup: 5-bit code -> character.
BAUDOT_INV: dict[int, str] = {code: symbol for symbol, code in BAUDOT.items()}
def encode(text: str) -> list[int]:
    """
    Translate text into a list of 5-bit Baudot codes.

    Each character is upper-cased before lookup; characters whose upper-case
    form is not a key of BAUDOT are silently skipped.
    """
    codes: list[int] = []
    for ch in text:
        symbol = ch.upper()
        if symbol in BAUDOT:
            codes.append(BAUDOT[symbol])
    return codes
def decode(bits: list[int]) -> str:
    """
    Translate a list of 5-bit codes back into text.

    Codes with no entry in BAUDOT_INV are rendered as "?".
    """
    symbols = []
    for code in bits:
        symbols.append(BAUDOT_INV.get(code, "?"))
    return "".join(symbols)
# ─────────────────────────────────────────────────────────────────────────────
# Wheel: a cyclic sequence of binary cams (0=dot, 1=cross)
# ─────────────────────────────────────────────────────────────────────────────
class Wheel:
    """
    One cam-wheel: a fixed cyclic bit pattern plus a current position.

    The pattern list is stored as given (not copied); ``size`` is its length
    and ``pos`` always starts at 0.
    """

    def __init__(self, pattern: list[int], name: str = ""):
        self.name = name
        self.pattern = pattern
        self.size = len(pattern)
        self.pos = 0

    def current(self) -> int:
        """Return the cam value under the current position."""
        cams = self.pattern
        return cams[self.pos]

    def step(self) -> None:
        """Advance one cam, wrapping round to 0 after the last cam."""
        advanced = self.pos + 1
        self.pos = advanced % self.size

    def reset(self, pos: int = 0) -> None:
        """Move to ``pos``, reduced modulo the wheel size."""
        self.pos = pos % self.size

    def __repr__(self) -> str:
        return f"Wheel({self.name}, size={self.size}, pos={self.pos})"
# ─────────────────────────────────────────────────────────────────────────────
# Lorenz SZ-40/42 machine
#
# Wheel sizes (number of cams):
# chi (χ): 41, 31, 29, 26, 23 — 5 wheels, add to every character
# psi (ψ): 43, 47, 51, 53, 59 — 5 wheels, only step when motor allows
# motor : 37 (μ37), 61 (μ61) — 2 wheels controlling psi stepping
#
# Encryption per character (5-bit XOR in each Baudot channel i):
# C[i] = P[i] XOR chi[i].current() XOR psi[i].current()
# ─────────────────────────────────────────────────────────────────────────────
# Cam counts for the five chi wheels, in channel order: LorenzMachine builds
# wheel i from entry i and XORs its output into keystream bit i.
CHI_SIZES = [41, 31, 29, 26, 23]
# Cam counts for the five psi wheels, same channel ordering as CHI_SIZES.
PSI_SIZES = [43, 47, 51, 53, 59]
# Cam counts of the two motor wheels (μ37, μ61).  Used by demo_key_period();
# LorenzMachine.__init__ spells the same two sizes out literally.
MOTOR_SIZES = [37, 61]
def _random_pattern(size: int) -> list[int]:
    """
    Build a list of ``size`` random cam values (each 0 or 1).

    Draws from the module-level ``random`` generator, so the result depends
    on the global random state.
    """
    cams: list[int] = []
    for _ in range(size):
        cams.append(random.randint(0, 1))
    return cams
class LorenzMachine:
    """
    SZ-40/42 Lorenz cipher machine simulator.

    Cam patterns for all twelve wheels are drawn from ``random.Random(seed)``,
    so two machines built with the same seed carry identical patterns.
    Every wheel starts at position 0; call set_positions() to choose the
    message start positions before encrypting or decrypting.

    Encryption and decryption are the same operation (XOR with the
    keystream), so decrypting requires resetting the wheels to the positions
    used for encryption.
    """

    def __init__(self, seed: int | None = None):
        rng = random.Random(seed)

        def build(size: int, label: str) -> Wheel:
            # One randint call per cam, in wheel-creation order, so the
            # patterns are a pure function of the seed.
            cams = [rng.randint(0, 1) for _ in range(size)]
            return Wheel(cams, label)

        self.chi = [build(n, f"χ{k}") for k, n in enumerate(CHI_SIZES, start=1)]
        self.psi = [build(n, f"ψ{k}") for k, n in enumerate(PSI_SIZES, start=1)]
        self.mu37 = build(37, "μ37")
        self.mu61 = build(61, "μ61")

    def set_positions(
        self,
        chi_pos: list[int],
        psi_pos: list[int],
        mu37_pos: int = 0,
        mu61_pos: int = 0,
    ) -> None:
        """
        Set the start position of every wheel.

        Positions are paired with wheels in order; if a list is shorter than
        five entries the remaining wheels keep their current position.  The
        motor wheels default to position 0.
        """
        for wheels, starts in ((self.chi, chi_pos), (self.psi, psi_pos)):
            for wheel, start in zip(wheels, starts):
                wheel.reset(start)
        self.mu37.reset(mu37_pos)
        self.mu61.reset(mu61_pos)

    def _keystream_char(self) -> int:
        """
        Return the next 5-bit keystream character, then advance the wheels.

        Bit i of the result is chi[i] XOR psi[i] at the current positions.
        Stepping rules as implemented here:
          * every chi wheel and μ37 step on every character;
          * μ61 steps only if μ37's cam *after* stepping is 1;
          * the psi wheels step only if, in addition, μ61's cam after its
            own step is 1.

        NOTE(review): this is a simplified motor model.  Descriptions of the
        historical machine usually make μ61 the always-stepping wheel that
        drives μ37 — confirm against the General Report on Tunny before
        relying on this for fidelity.
        """
        key = 0
        for channel, (chi_wheel, psi_wheel) in enumerate(zip(self.chi, self.psi)):
            key |= (chi_wheel.current() ^ psi_wheel.current()) << channel

        for chi_wheel in self.chi:
            chi_wheel.step()
        self.mu37.step()

        # Motor chain: μ37 (new cam) gates μ61, whose new cam gates the psis.
        if self.mu37.current() == 1:
            self.mu61.step()
            if self.mu61.current() == 1:
                for psi_wheel in self.psi:
                    psi_wheel.step()
        return key

    def process(self, chars: list[int]) -> list[int]:
        """XOR each 5-bit character with successive keystream characters."""
        output: list[int] = []
        for char in chars:
            output.append(char ^ self._keystream_char())
        return output

    def encrypt(self, plaintext: str) -> list[int]:
        """Encode ``plaintext`` to Baudot and encipher it from the current positions."""
        codes = encode(plaintext)
        return self.process(codes)

    def decrypt(self, ciphertext: list[int]) -> str:
        """Decipher ``ciphertext`` from the current positions and decode to text."""
        codes = self.process(ciphertext)
        return decode(codes)
# ─────────────────────────────────────────────────────────────────────────────
# "Depths" attack — two messages encrypted with the same wheel settings
#
# If C1 = P1 XOR K and C2 = P2 XOR K,
# then C1 XOR C2 = P1 XOR P2 (key cancels out).
# This is what the Athens/Vienna operator error produced on 30 Aug 1941.
# ─────────────────────────────────────────────────────────────────────────────
def depths_attack(c1: list[int], c2: list[int]) -> list[int]:
    """
    Combine two ciphertexts that were enciphered with the same keystream.

    Returns the element-wise XOR over the common prefix of ``c1`` and ``c2``;
    because the shared key cancels, this equals P1 XOR P2.
    """
    overlap = min(len(c1), len(c2))
    return [c1[i] ^ c2[i] for i in range(overlap)]
# ─────────────────────────────────────────────────────────────────────────────
# Delta function — consecutive XOR difference
#
# ΔC[i] = C[i] XOR C[i+1]
# This is the statistical quantity Colossus counted.
# ─────────────────────────────────────────────────────────────────────────────
def delta(seq: list[int]) -> list[int]:
    """
    Return the XOR of each element with its successor.

    The result is one element shorter than ``seq`` (empty for sequences of
    length 0 or 1).
    """
    return [seq[i] ^ seq[i + 1] for i in range(len(seq) - 1)]
# ─────────────────────────────────────────────────────────────────────────────
# colossus_count — simulates the statistical count Colossus performed
#
# For each candidate chi starting position, generate the chi stream,
# compute ΔC XOR Δχ on each of the 5 Baudot bits, and count zeros.
# The correct starting position maximises the zero count (dots dominate
# because Δψ = 0 about 60% of the time when psi wheels barely step).
# ─────────────────────────────────────────────────────────────────────────────
def _chi_stream(
    machine: LorenzMachine, start_positions: list[int], length: int
) -> list[int]:
    """
    Produce ``length`` characters of chi-only keystream.

    Reads only the patterns of ``machine.chi``; the machine's own wheel
    positions are neither used nor modified.  Each start position is reduced
    modulo its wheel's size, and bit i of every output character comes from
    chi wheel i.
    """
    wheels = machine.chi
    cursor = [start % wheel.size for start, wheel in zip(start_positions, wheels)]
    stream: list[int] = []
    for _ in range(length):
        char = 0
        for bit, wheel in enumerate(wheels):
            char |= wheel.pattern[cursor[bit]] << bit
            # Channels are independent, so each wheel can advance right
            # after it has been read.
            cursor[bit] = (cursor[bit] + 1) % wheel.size
        stream.append(char)
    return stream
def colossus_count(
    ciphertext: list[int], machine: LorenzMachine, chi_pos: list[int]
) -> int:
    """
    Score a candidate set of chi start positions.

    Builds the chi stream for ``chi_pos``, takes the delta of both the
    ciphertext and that stream, XORs them character by character, and
    returns the total number of zero bits over the five channels.  A higher
    score marks a more plausible setting.
    """
    chi_keys = _chi_stream(machine, chi_pos, len(ciphertext))
    delta_cipher = delta(ciphertext)
    delta_chi = delta(chi_keys)
    # Each character contributes (5 - number of set bits) zero bits.
    return sum(
        5 - bin(d_c ^ d_k).count("1")
        for d_c, d_k in zip(delta_cipher, delta_chi)
    )
def delta_attack(
    ciphertext: list[int],
    machine: LorenzMachine,
    wheel_indices: list[int] | None = None,
    verbose: bool = True,
) -> list[int]:
    """
    Search for chi-wheel start positions using delta statistics.

    Wheels are attacked one at a time: for each selected wheel every
    position is scored with colossus_count() while the other wheels are held
    at the best positions found so far (0 for wheels not yet attacked), and
    the highest-scoring position is kept.  Ties go to the lowest position.

    ciphertext:    5-bit ciphertext characters.
    machine:       supplies the chi wheel patterns via ``machine.chi``; its
                   current wheel positions are not consulted.
    wheel_indices: which chi wheels to attack (default: all of them).
    verbose:       print one result line per attacked wheel.

    Returns one position per chi wheel; wheels that were not attacked are
    reported as 0.
    """
    wheel_count = len(machine.chi)
    if wheel_indices is None:
        wheel_indices = list(range(wheel_count))
    found = [0] * wheel_count
    for wi in wheel_indices:
        wheel = machine.chi[wi]
        best_pos, best_count = 0, -1
        for pos in range(wheel.size):
            trial = found[:]
            trial[wi] = pos
            count = colossus_count(ciphertext, machine, trial)
            # Strict '>' keeps the first (lowest) position on ties.
            if count > best_count:
                best_count = count
                best_pos = pos
        found[wi] = best_pos
        if verbose:
            print(
                f" χ{wi + 1} (size {wheel.size}): best pos={best_pos:2d} "
                f"score={best_count}"
            )
    return found
# ─────────────────────────────────────────────────────────────────────────────
# Demonstration
# ─────────────────────────────────────────────────────────────────────────────
def demo_encryption() -> None:
    """Encrypt a short message, decrypt it again, and check the round trip."""
    rule = "=" * 60
    print(rule)
    print("1. LORENZ SZ-40/42 ENCRYPTION / DECRYPTION")
    print(rule)

    chi_start = [5, 3, 12, 8, 2]
    psi_start = [7, 11, 4, 20, 1]
    machine = LorenzMachine(seed=1943)
    plaintext = "OPERATION OVERLORD BEGINS AT DAWN"
    print(f"Plaintext : {plaintext}")

    machine.set_positions(chi_start, psi_start)
    ciphertext = machine.encrypt(plaintext)
    print(f"Ciphertext: {ciphertext}")

    # Decryption is the same XOR, so rewind to the same start positions.
    machine.set_positions(chi_start, psi_start)
    recovered = machine.decrypt(ciphertext)
    print(f"Decrypted : {recovered}")

    assert recovered == plaintext, "Decryption mismatch!"
    print("Decryption verified ✓\n")
def demo_depths() -> None:
    """Encrypt two messages from identical settings and show the key cancel."""
    rule = "=" * 60
    print(rule)
    print("2. THE DEPTHS ATTACK — ATHENS/VIENNA 1941")
    print(rule)

    machine = LorenzMachine(seed=42)
    # Both messages start from the very same wheel positions — the
    # operator's fatal mistake.
    chi_start, psi_start = [10, 5, 8, 3, 15], [2, 9, 14, 1, 6]
    p1 = "SIXTH ARMY TO WITHDRAW TO DEFENSIVE LINE AT RIVER DON"
    p2 = "SIXTH ARMY ACKNOWLEDGES ORDER AND WILL COMPLY WITH ORDERS"

    ciphertexts = []
    for plaintext in (p1, p2):
        machine.set_positions(chi_start, psi_start)
        ciphertexts.append(machine.encrypt(plaintext))
    c1, c2 = ciphertexts

    print(f"P1 : {p1}")
    print(f"P2 : {p2}")
    print(f"C1 XOR C2 : {depths_attack(c1, c2)}")
    print()
    print("The key has cancelled out completely.")
    print("From C1 XOR C2, Bill Tutte reconstructed the entire")
    print("Lorenz wheel structure — without ever seeing the machine.\n")
def demo_delta_attack() -> None:
    """Encrypt a message, run delta_attack() on it, and report the hit rate."""
    rule = "=" * 60
    print(rule)
    print("3. COLOSSUS DELTA ATTACK — CHI WHEEL FINDING")
    print(rule)

    true_chi = [7, 14, 3, 19, 11]
    true_psi = [5, 2, 22, 8, 4]
    sender = LorenzMachine(seed=1944)
    sender.set_positions(true_chi, true_psi)
    message = (
        "THE ATTACK ON THE WESTERN FRONT WILL COMMENCE ON THE SIXTH"
        " DAY OF JUNE THE MAIN LANDING WILL BE AT NORMANDY NOT AT THE "
        "PAS DE CALAIS THIS IS THE DECEPTION PLAN BODYGUARD OVERLORD"
    )
    ciphertext = sender.encrypt(message)
    print(f"Message length: {len(message)} characters")
    print(f"True chi positions: {true_chi}")
    print("\nRunning Colossus-style delta count (one wheel at a time)...")

    # The attacker knows the wheel patterns (same seed) but not the message
    # settings: this fresh machine sits at position 0 on every wheel.
    attack_machine = LorenzMachine(seed=1944)
    found = delta_attack(ciphertext, attack_machine, verbose=True)
    print(f"\nFound chi positions : {found}")
    print(f"True chi positions : {true_chi}")

    # Count the wheels whose recovered position equals the true one.
    matches = sum(1 for guess, truth in zip(found, true_chi) if guess == truth)
    print(f"Correct: {matches}/5 chi wheel positions found\n")
def demo_key_period() -> None:
    """
    Print the size of the wheel-setting space and a brute-force time estimate.

    The period of each wheel group is computed as the product of its cam
    counts, and the total as the product of the three group periods.  The
    time estimate assumes one trial per minute, as the printed text states.
    """

    def product(sizes: list[int]) -> int:
        result = 1
        for size in sizes:
            result *= size
        return result

    print("=" * 60)
    print("4. KEY PERIOD CALCULATION")
    print("=" * 60)
    chi_period = product(CHI_SIZES)
    psi_period = product(PSI_SIZES)
    motor_period = product(MOTOR_SIZES)
    total = chi_period * psi_period * motor_period
    print(f"Chi wheel sizes : {CHI_SIZES} → period = {chi_period:,}")
    print(f"Psi wheel sizes : {PSI_SIZES} → period = {psi_period:,}")
    print(f"Motor wheel sizes: {MOTOR_SIZES} → period = {motor_period:,}")
    print(f"Total key period : {total:,}")
    print(f" ≈ {total:.2e}")
    print("\nAt 1 brute-force attempt per minute, exhaustive search would")
    # One attempt per minute: divide by minutes per year.  (The previous
    # divisor, 60 * 60 * 24 * 365, was seconds per year and understated the
    # figure sixty-fold.)
    minutes_per_year = 60 * 24 * 365
    years = total / minutes_per_year
    print(f"take approximately {years:.2e} years. Statistical attack needed.\n")
if __name__ == "__main__":
    # Run the four demonstrations in order, then print the closing banner.
    for run_demo in (
        demo_key_period,
        demo_encryption,
        demo_depths,
        demo_delta_attack,
    ):
        run_demo()
    closing_rule = "=" * 60
    print(closing_rule)
    print("All demos complete.")
    print("See the article: /articles/tmc-0006-colossus-first-electronic-computer")
    print(closing_rule)