"""
mycin.py -- MYCIN-style backward-chaining expert system with certainty factors.
Implements:
- Production rules with certainty factors (CF in [-1.0, 1.0])
- Backward chaining goal tree with depth-first resolution
- Certainty propagation: Shortliffe-Buchanan combination formula
- Context model with ask-the-user fallback
- Simplified bacteraemia knowledge base from the 1979 MYCIN evaluation domain
Run: python mycin.py
Reference:
Shortliffe, E.H. (1976). Computer-Based Medical Consultations: MYCIN.
American Elsevier.
Buchanan, B.G. and Shortliffe, E.H. (eds) (1984). Rule-Based Expert Systems.
Addison-Wesley.
"""
from __future__ import annotations
from dataclasses import dataclass
from typing import Callable
# ---------------------------------------------------------------------------
# Certainty factor arithmetic (Shortliffe & Buchanan 1975)
# ---------------------------------------------------------------------------
def combine_cf(cf1: float, cf2: float) -> float:
"""
Combine two certainty factors bearing on the same hypothesis.
Rules:
Both positive: cf1 + cf2 * (1 - cf1)
Both negative: cf1 + cf2 * (1 + cf1)
Mixed sign: (cf1 + cf2) / (1 - min(|cf1|, |cf2|))
"""
if cf1 >= 0 and cf2 >= 0:
return cf1 + cf2 * (1.0 - cf1)
if cf1 <= 0 and cf2 <= 0:
return cf1 + cf2 * (1.0 + cf1)
# mixed sign
denom = 1.0 - min(abs(cf1), abs(cf2))
if denom == 0.0:
return 0.0
return (cf1 + cf2) / denom
def premise_cf(conjuncts: list[float]) -> float:
"""
CF of a conjunctive premise = minimum CF of its conjuncts.
(Weakest-link principle.)
"""
return min(conjuncts) if conjuncts else 1.0
def propagate_cf(rule_cf: float, prem_cf: float) -> float:
"""
CF of a conclusion = rule CF * max(0, premise CF).
Negative premise certainty blocks the rule.
"""
return rule_cf * max(0.0, prem_cf)
# ---------------------------------------------------------------------------
# Rule and knowledge base
# ---------------------------------------------------------------------------
@dataclass
class Rule:
"""
A production rule:
IF premise (list of (parameter, value) pairs)
THEN (conclusion_parameter, conclusion_value) with certainty cf
"""
rule_id: str
premise: list[tuple[str, str]] # [(param, value), ...]
conclusion: tuple[str, str] # (param, value)
cf: float # certainty factor of this rule
class KnowledgeBase:
def __init__(self) -> None:
self._rules: list[Rule] = []
def add(self, rule: Rule) -> None:
self._rules.append(rule)
def rules_concluding(self, param: str, value: str) -> list[Rule]:
"""Return all rules whose conclusion is (param, value)."""
return [r for r in self._rules if r.conclusion == (param, value)]
def all_values(self, param: str) -> list[str]:
"""Return all values that any rule can conclude for param."""
return list({r.conclusion[1] for r in self._rules if r.conclusion[0] == param})
# ---------------------------------------------------------------------------
# Context model
# ---------------------------------------------------------------------------
class Context:
"""
Stores (parameter, value) -> CF facts.
Supports 'known' facts (entered directly) and derived facts (computed).
"""
def __init__(self, known: dict[tuple[str, str], float] | None = None) -> None:
self._facts: dict[tuple[str, str], float] = dict(known or {})
self._asked: set[str] = set()
def get_cf(self, param: str, value: str) -> float | None:
return self._facts.get((param, value))
def set_cf(self, param: str, value: str, cf: float) -> None:
self._facts[(param, value)] = cf
def get_all(self, param: str) -> list[tuple[str, float]]:
return [(v, cf) for (p, v), cf in self._facts.items() if p == param]
def already_asked(self, param: str) -> bool:
return param in self._asked
def mark_asked(self, param: str) -> None:
self._asked.add(param)
# ---------------------------------------------------------------------------
# Backward chaining inference engine
# ---------------------------------------------------------------------------
AskFn = Callable[[str, list[str]], dict[str, float]]
def backward_chain(
param: str,
value: str,
kb: KnowledgeBase,
ctx: Context,
ask: AskFn,
depth: int = 0,
) -> float:
"""
Derive the certainty factor for (param, value) by backward chaining.
1. If already in context, return it.
2. Try all rules that conclude (param, value); combine their CFs.
3. If no rule fires and param has not been asked, invoke ask() for
all known possible values of the parameter at once.
"""
cached = ctx.get_cf(param, value)
if cached is not None:
return cached
# Try all applicable rules
cf_combined = 0.0
any_fired = False
for rule in kb.rules_concluding(param, value):
# Evaluate each conjunct in the premise
conjunct_cfs: list[float] = []
for prem_param, prem_value in rule.premise:
prem_cf_val = backward_chain(
prem_param, prem_value, kb, ctx, ask, depth + 1
)
conjunct_cfs.append(prem_cf_val)
prem_total = premise_cf(conjunct_cfs)
rule_contribution = propagate_cf(rule.cf, prem_total)
if rule_contribution > 0.0 or rule_contribution < 0.0:
cf_combined = combine_cf(cf_combined, rule_contribution)
any_fired = True
if any_fired:
ctx.set_cf(param, value, cf_combined)
return cf_combined
# No rule could derive it: ask the user (once per parameter)
if not ctx.already_asked(param):
ctx.mark_asked(param)
possible_values = kb.all_values(param)
if possible_values:
answers = ask(param, possible_values)
for v, cf in answers.items():
ctx.set_cf(param, v, cf)
# After populating from user, try again
cached = ctx.get_cf(param, value)
if cached is not None:
return cached
# Unknown: return 0 (no evidence either way)
ctx.set_cf(param, value, 0.0)
return 0.0
# ---------------------------------------------------------------------------
# Bacteraemia knowledge base
# (Simplified subset of MYCIN's ~600 rules; covers the most common organisms)
# ---------------------------------------------------------------------------
def build_bacteraemia_kb() -> KnowledgeBase:
"""
Subset of MYCIN bacteraemia rules.
Parameters used:
gram-stain : positive | negative
morphology : rod | coccus
portal-of-entry : gi-tract | urinary-tract | respiratory | skin | unknown
immunosuppressed : yes | no
burn-patient : yes | no
organism : e-coli | pseudomonas | bacteroides | klebsiella |
staphylococcus | streptococcus | proteus
"""
kb = KnowledgeBase()
# E. coli: gram-negative rod, GI or urinary portal
kb.add(
Rule(
"R01",
premise=[
("gram-stain", "negative"),
("morphology", "rod"),
("portal-of-entry", "gi-tract"),
],
conclusion=("organism", "e-coli"),
cf=0.8,
)
)
kb.add(
Rule(
"R02",
premise=[
("gram-stain", "negative"),
("morphology", "rod"),
("portal-of-entry", "urinary-tract"),
],
conclusion=("organism", "e-coli"),
cf=0.7,
)
)
# Pseudomonas: gram-negative rod, burn patient or immunosuppressed
kb.add(
Rule(
"R03",
premise=[
("gram-stain", "negative"),
("morphology", "rod"),
("burn-patient", "yes"),
],
conclusion=("organism", "pseudomonas"),
cf=0.8,
)
)
kb.add(
Rule(
"R04",
premise=[
("gram-stain", "negative"),
("morphology", "rod"),
("immunosuppressed", "yes"),
],
conclusion=("organism", "pseudomonas"),
cf=0.6,
)
)
# Bacteroides: gram-negative rod, GI portal
kb.add(
Rule(
"R05",
premise=[
("gram-stain", "negative"),
("morphology", "rod"),
("portal-of-entry", "gi-tract"),
],
conclusion=("organism", "bacteroides"),
cf=0.5,
)
)
# Klebsiella: gram-negative rod, respiratory portal
kb.add(
Rule(
"R06",
premise=[
("gram-stain", "negative"),
("morphology", "rod"),
("portal-of-entry", "respiratory"),
],
conclusion=("organism", "klebsiella"),
cf=0.7,
)
)
# Staphylococcus: gram-positive coccus, skin portal
kb.add(
Rule(
"R07",
premise=[
("gram-stain", "positive"),
("morphology", "coccus"),
("portal-of-entry", "skin"),
],
conclusion=("organism", "staphylococcus"),
cf=0.8,
)
)
kb.add(
Rule(
"R08",
premise=[
("gram-stain", "positive"),
("morphology", "coccus"),
("immunosuppressed", "yes"),
],
conclusion=("organism", "staphylococcus"),
cf=0.5,
)
)
# Streptococcus: gram-positive coccus, respiratory portal
kb.add(
Rule(
"R09",
premise=[
("gram-stain", "positive"),
("morphology", "coccus"),
("portal-of-entry", "respiratory"),
],
conclusion=("organism", "streptococcus"),
cf=0.7,
)
)
# Proteus: gram-negative rod, urinary portal
kb.add(
Rule(
"R10",
premise=[
("gram-stain", "negative"),
("morphology", "rod"),
("portal-of-entry", "urinary-tract"),
],
conclusion=("organism", "proteus"),
cf=0.5,
)
)
return kb
# ---------------------------------------------------------------------------
# Demo cases
# ---------------------------------------------------------------------------
CASES = [
{
"name": "Case A: burn patient, gram-negative rod, GI portal",
"known": {
("gram-stain", "negative"): 1.0,
("morphology", "rod"): 1.0,
("portal-of-entry", "gi-tract"): 1.0,
("burn-patient", "yes"): 1.0,
("immunosuppressed", "no"): 1.0,
},
},
{
"name": "Case B: immunosuppressed, gram-positive coccus, respiratory portal",
"known": {
("gram-stain", "positive"): 1.0,
("morphology", "coccus"): 1.0,
("portal-of-entry", "respiratory"): 1.0,
("immunosuppressed", "yes"): 1.0,
("burn-patient", "no"): 1.0,
},
},
{
"name": "Case C: uncertain gram stain (CF 0.6 negative), urinary portal",
"known": {
("gram-stain", "negative"): 0.6,
("morphology", "rod"): 0.9,
("portal-of-entry", "urinary-tract"): 1.0,
("burn-patient", "no"): 1.0,
("immunosuppressed", "no"): 1.0,
},
},
]
ORGANISMS = [
"e-coli",
"pseudomonas",
"bacteroides",
"klebsiella",
"staphylococcus",
"streptococcus",
"proteus",
]
ANTIBIOTIC_MAP = {
"e-coli": ["ampicillin", "gentamicin"],
"pseudomonas": ["ticarcillin", "tobramycin"],
"bacteroides": ["metronidazole", "clindamycin"],
"klebsiella": ["cefazolin", "gentamicin"],
"staphylococcus": ["nafcillin", "vancomycin"],
"streptococcus": ["penicillin-G", "ampicillin"],
"proteus": ["ampicillin", "cephalothin"],
}
def silent_ask(param: str, values: list[str]) -> dict[str, float]:
"""No-op ask function for demo (all answers pre-loaded in context)."""
return {}
def run_case(case: dict, kb: KnowledgeBase) -> None:
print(f"\n{'=' * 60}")
print(case["name"])
print(f"{'=' * 60}")
ctx = Context(known=case["known"])
results: list[tuple[str, float]] = []
for organism in ORGANISMS:
cf = backward_chain("organism", organism, kb, ctx, silent_ask)
results.append((organism, cf))
results.sort(key=lambda x: -x[1])
print("\nOrganism rankings:")
print(f" {'Organism':<20} {'CF':>6} Antibiotics")
print(f" {'-' * 20} {'-' * 6} {'-' * 30}")
for organism, cf in results:
if cf > 0.0:
drugs = ", ".join(ANTIBIOTIC_MAP.get(organism, ["unknown"]))
bar = "#" * int(cf * 20)
print(f" {organism:<20} {cf:>5.2f} [{bar:<20}] {drugs}")
if results and results[0][1] > 0.0:
best = results[0][0]
print(f"\nRecommended therapy for most likely organism ({best}):")
for drug in ANTIBIOTIC_MAP.get(best, []):
print(f" -- {drug}")
def demo_cf_arithmetic() -> None:
print("=" * 60)
print("Certainty factor arithmetic (Shortliffe-Buchanan 1975)")
print("=" * 60)
examples = [
(0.8, 0.6, "two strong positive"),
(0.8, -0.4, "positive + weak negative"),
(-0.7, -0.5, "two negatives"),
(0.5, 0.5, "two equal positives"),
(1.0, 0.0, "certain + no evidence"),
]
print(f"\n {'CF1':>5} {'CF2':>5} {'Combined':>8} Note")
print(f" {'---':>5} {'---':>5} {'--------':>8}")
for cf1, cf2, note in examples:
combined = combine_cf(cf1, cf2)
print(f" {cf1:>5.2f} {cf2:>5.2f} {combined:>8.4f} {note}")
if __name__ == "__main__":
kb = build_bacteraemia_kb()
demo_cf_arithmetic()
for case in CASES:
run_case(case, kb)