ArticlesProjectsWeeklyCredentialsAbout

The Thinking Machine Chronicles #0055: MYCIN Medical Expert System

A Python implementation of MYCIN's core inference engine: production rules with certainty factors in [-1, 1], backward chaining over a goal tree, and the Shortliffe-Buchanan certainty combination formula. Includes a simplified bacteraemia knowledge base derived from the 1979 MYCIN evaluation domain, demonstrating ranked organism diagnosis and antibiotic recommendations across three clinical case scenarios.

mycinexpert-systemscertainty-factorsbackward-chainingmedical-aipython1972

MYCIN's inference model has two separable parts: the certainty factor arithmetic and the backward-chaining proof procedure. This project implements both, making every inference step explicit and inspectable.

Certainty factors are real numbers in [-1, 1]. A value of 1.0 means the hypothesis is definitely true; -1.0 means it is definitely false; 0.0 means no evidence either way. When two independent rules both support the same hypothesis with certainty factors CF1 and CF2, the Shortliffe-Buchanan formula combines them: both positive gives CF1 + CF2*(1-CF1), both negative gives CF1 + CF2*(1+CF1), and mixed sign gives (CF1+CF2)/(1-min(|CF1|,|CF2|)). This is not Bayesian probability; it is a heuristic that matched how infectious-disease specialists described their reasoning in the 1970s.

Backward chaining starts at the top-level goal (identify organism) and recursively asks: which rules can establish this? For each candidate rule, it evaluates the conjunctive premise by recursively chaining each precondition. A precondition is satisfied by querying either the context model (already-known facts) or by further backward chaining. If no rule can establish a parameter, the system invokes an ask function that would, in a real consultation, prompt the clinician. In the demo, all clinical parameters are pre-loaded.

The knowledge base encodes ten bacteraemia rules covering seven organisms: E. coli, Pseudomonas, Bacteroides, Klebsiella, Staphylococcus, Streptococcus, and Proteus. Three demonstration cases cover a burn patient with gram-negative rods, an immunosuppressed patient with gram-positive cocci, and a case with uncertain gram stain confidence. The output is a ranked list of organisms with certainty scores and the corresponding antibiotic recommendations.

To run: python mycin.py

Source code
"""
mycin.py -- MYCIN-style backward-chaining expert system with certainty factors.

Implements:
  - Production rules with certainty factors (CF in [-1.0, 1.0])
  - Backward chaining goal tree with depth-first resolution
  - Certainty propagation: Shortliffe-Buchanan combination formula
  - Context model with ask-the-user fallback
  - Simplified bacteraemia knowledge base from the 1979 MYCIN evaluation domain

Run:  python mycin.py

Reference:
  Shortliffe, E.H. (1976). Computer-Based Medical Consultations: MYCIN.
  American Elsevier.
  Buchanan, B.G. and Shortliffe, E.H. (eds) (1984). Rule-Based Expert Systems.
  Addison-Wesley.
"""

from __future__ import annotations

from dataclasses import dataclass
from typing import Callable

# ---------------------------------------------------------------------------
# Certainty factor arithmetic (Shortliffe & Buchanan 1975)
# ---------------------------------------------------------------------------


def combine_cf(cf1: float, cf2: float) -> float:
    """
    Combine two certainty factors bearing on the same hypothesis.

    Rules:
      Both positive:  cf1 + cf2 * (1 - cf1)
      Both negative:  cf1 + cf2 * (1 + cf1)
      Mixed sign:     (cf1 + cf2) / (1 - min(|cf1|, |cf2|))
    """
    if cf1 >= 0 and cf2 >= 0:
        return cf1 + cf2 * (1.0 - cf1)
    if cf1 <= 0 and cf2 <= 0:
        return cf1 + cf2 * (1.0 + cf1)
    # mixed sign
    denom = 1.0 - min(abs(cf1), abs(cf2))
    if denom == 0.0:
        return 0.0
    return (cf1 + cf2) / denom


def premise_cf(conjuncts: list[float]) -> float:
    """
    CF of a conjunctive premise = minimum CF of its conjuncts.
    (Weakest-link principle.)
    """
    return min(conjuncts) if conjuncts else 1.0


def propagate_cf(rule_cf: float, prem_cf: float) -> float:
    """
    CF of a conclusion = rule CF * max(0, premise CF).
    Negative premise certainty blocks the rule.
    """
    return rule_cf * max(0.0, prem_cf)


# ---------------------------------------------------------------------------
# Rule and knowledge base
# ---------------------------------------------------------------------------


@dataclass
class Rule:
    """
    A production rule:
        IF premise (list of (parameter, value) pairs)
        THEN (conclusion_parameter, conclusion_value) with certainty cf
    """

    rule_id: str
    premise: list[tuple[str, str]]  # [(param, value), ...]
    conclusion: tuple[str, str]  # (param, value)
    cf: float  # certainty factor of this rule


class KnowledgeBase:
    def __init__(self) -> None:
        self._rules: list[Rule] = []

    def add(self, rule: Rule) -> None:
        self._rules.append(rule)

    def rules_concluding(self, param: str, value: str) -> list[Rule]:
        """Return all rules whose conclusion is (param, value)."""
        return [r for r in self._rules if r.conclusion == (param, value)]

    def all_values(self, param: str) -> list[str]:
        """Return all values that any rule can conclude for param."""
        return list({r.conclusion[1] for r in self._rules if r.conclusion[0] == param})


# ---------------------------------------------------------------------------
# Context model
# ---------------------------------------------------------------------------


class Context:
    """
    Stores (parameter, value) -> CF facts.
    Supports 'known' facts (entered directly) and derived facts (computed).
    """

    def __init__(self, known: dict[tuple[str, str], float] | None = None) -> None:
        self._facts: dict[tuple[str, str], float] = dict(known or {})
        self._asked: set[str] = set()

    def get_cf(self, param: str, value: str) -> float | None:
        return self._facts.get((param, value))

    def set_cf(self, param: str, value: str, cf: float) -> None:
        self._facts[(param, value)] = cf

    def get_all(self, param: str) -> list[tuple[str, float]]:
        return [(v, cf) for (p, v), cf in self._facts.items() if p == param]

    def already_asked(self, param: str) -> bool:
        return param in self._asked

    def mark_asked(self, param: str) -> None:
        self._asked.add(param)


# ---------------------------------------------------------------------------
# Backward chaining inference engine
# ---------------------------------------------------------------------------

AskFn = Callable[[str, list[str]], dict[str, float]]


def backward_chain(
    param: str,
    value: str,
    kb: KnowledgeBase,
    ctx: Context,
    ask: AskFn,
    depth: int = 0,
) -> float:
    """
    Derive the certainty factor for (param, value) by backward chaining.

    1. If already in context, return it.
    2. Try all rules that conclude (param, value); combine their CFs.
    3. If no rule fires and param has not been asked, invoke ask() for
       all known possible values of the parameter at once.
    """
    cached = ctx.get_cf(param, value)
    if cached is not None:
        return cached

    # Try all applicable rules
    cf_combined = 0.0
    any_fired = False

    for rule in kb.rules_concluding(param, value):
        # Evaluate each conjunct in the premise
        conjunct_cfs: list[float] = []
        for prem_param, prem_value in rule.premise:
            prem_cf_val = backward_chain(
                prem_param, prem_value, kb, ctx, ask, depth + 1
            )
            conjunct_cfs.append(prem_cf_val)

        prem_total = premise_cf(conjunct_cfs)
        rule_contribution = propagate_cf(rule.cf, prem_total)

        if rule_contribution > 0.0 or rule_contribution < 0.0:
            cf_combined = combine_cf(cf_combined, rule_contribution)
            any_fired = True

    if any_fired:
        ctx.set_cf(param, value, cf_combined)
        return cf_combined

    # No rule could derive it: ask the user (once per parameter)
    if not ctx.already_asked(param):
        ctx.mark_asked(param)
        possible_values = kb.all_values(param)
        if possible_values:
            answers = ask(param, possible_values)
            for v, cf in answers.items():
                ctx.set_cf(param, v, cf)
        # After populating from user, try again
        cached = ctx.get_cf(param, value)
        if cached is not None:
            return cached

    # Unknown: return 0 (no evidence either way)
    ctx.set_cf(param, value, 0.0)
    return 0.0


# ---------------------------------------------------------------------------
# Bacteraemia knowledge base
# (Simplified subset of MYCIN's ~600 rules; covers the most common organisms)
# ---------------------------------------------------------------------------


def build_bacteraemia_kb() -> KnowledgeBase:
    """
    Subset of MYCIN bacteraemia rules.
    Parameters used:
      gram-stain        : positive | negative
      morphology        : rod | coccus
      portal-of-entry   : gi-tract | urinary-tract | respiratory | skin | unknown
      immunosuppressed  : yes | no
      burn-patient      : yes | no
      organism          : e-coli | pseudomonas | bacteroides | klebsiella |
                          staphylococcus | streptococcus | proteus
    """
    kb = KnowledgeBase()

    # E. coli: gram-negative rod, GI or urinary portal
    kb.add(
        Rule(
            "R01",
            premise=[
                ("gram-stain", "negative"),
                ("morphology", "rod"),
                ("portal-of-entry", "gi-tract"),
            ],
            conclusion=("organism", "e-coli"),
            cf=0.8,
        )
    )

    kb.add(
        Rule(
            "R02",
            premise=[
                ("gram-stain", "negative"),
                ("morphology", "rod"),
                ("portal-of-entry", "urinary-tract"),
            ],
            conclusion=("organism", "e-coli"),
            cf=0.7,
        )
    )

    # Pseudomonas: gram-negative rod, burn patient or immunosuppressed
    kb.add(
        Rule(
            "R03",
            premise=[
                ("gram-stain", "negative"),
                ("morphology", "rod"),
                ("burn-patient", "yes"),
            ],
            conclusion=("organism", "pseudomonas"),
            cf=0.8,
        )
    )

    kb.add(
        Rule(
            "R04",
            premise=[
                ("gram-stain", "negative"),
                ("morphology", "rod"),
                ("immunosuppressed", "yes"),
            ],
            conclusion=("organism", "pseudomonas"),
            cf=0.6,
        )
    )

    # Bacteroides: gram-negative rod, GI portal
    kb.add(
        Rule(
            "R05",
            premise=[
                ("gram-stain", "negative"),
                ("morphology", "rod"),
                ("portal-of-entry", "gi-tract"),
            ],
            conclusion=("organism", "bacteroides"),
            cf=0.5,
        )
    )

    # Klebsiella: gram-negative rod, respiratory portal
    kb.add(
        Rule(
            "R06",
            premise=[
                ("gram-stain", "negative"),
                ("morphology", "rod"),
                ("portal-of-entry", "respiratory"),
            ],
            conclusion=("organism", "klebsiella"),
            cf=0.7,
        )
    )

    # Staphylococcus: gram-positive coccus, skin portal
    kb.add(
        Rule(
            "R07",
            premise=[
                ("gram-stain", "positive"),
                ("morphology", "coccus"),
                ("portal-of-entry", "skin"),
            ],
            conclusion=("organism", "staphylococcus"),
            cf=0.8,
        )
    )

    kb.add(
        Rule(
            "R08",
            premise=[
                ("gram-stain", "positive"),
                ("morphology", "coccus"),
                ("immunosuppressed", "yes"),
            ],
            conclusion=("organism", "staphylococcus"),
            cf=0.5,
        )
    )

    # Streptococcus: gram-positive coccus, respiratory portal
    kb.add(
        Rule(
            "R09",
            premise=[
                ("gram-stain", "positive"),
                ("morphology", "coccus"),
                ("portal-of-entry", "respiratory"),
            ],
            conclusion=("organism", "streptococcus"),
            cf=0.7,
        )
    )

    # Proteus: gram-negative rod, urinary portal
    kb.add(
        Rule(
            "R10",
            premise=[
                ("gram-stain", "negative"),
                ("morphology", "rod"),
                ("portal-of-entry", "urinary-tract"),
            ],
            conclusion=("organism", "proteus"),
            cf=0.5,
        )
    )

    return kb


# ---------------------------------------------------------------------------
# Demo cases
# ---------------------------------------------------------------------------

CASES = [
    {
        "name": "Case A: burn patient, gram-negative rod, GI portal",
        "known": {
            ("gram-stain", "negative"): 1.0,
            ("morphology", "rod"): 1.0,
            ("portal-of-entry", "gi-tract"): 1.0,
            ("burn-patient", "yes"): 1.0,
            ("immunosuppressed", "no"): 1.0,
        },
    },
    {
        "name": "Case B: immunosuppressed, gram-positive coccus, respiratory portal",
        "known": {
            ("gram-stain", "positive"): 1.0,
            ("morphology", "coccus"): 1.0,
            ("portal-of-entry", "respiratory"): 1.0,
            ("immunosuppressed", "yes"): 1.0,
            ("burn-patient", "no"): 1.0,
        },
    },
    {
        "name": "Case C: uncertain gram stain (CF 0.6 negative), urinary portal",
        "known": {
            ("gram-stain", "negative"): 0.6,
            ("morphology", "rod"): 0.9,
            ("portal-of-entry", "urinary-tract"): 1.0,
            ("burn-patient", "no"): 1.0,
            ("immunosuppressed", "no"): 1.0,
        },
    },
]

ORGANISMS = [
    "e-coli",
    "pseudomonas",
    "bacteroides",
    "klebsiella",
    "staphylococcus",
    "streptococcus",
    "proteus",
]

ANTIBIOTIC_MAP = {
    "e-coli": ["ampicillin", "gentamicin"],
    "pseudomonas": ["ticarcillin", "tobramycin"],
    "bacteroides": ["metronidazole", "clindamycin"],
    "klebsiella": ["cefazolin", "gentamicin"],
    "staphylococcus": ["nafcillin", "vancomycin"],
    "streptococcus": ["penicillin-G", "ampicillin"],
    "proteus": ["ampicillin", "cephalothin"],
}


def silent_ask(param: str, values: list[str]) -> dict[str, float]:
    """No-op ask function for demo (all answers pre-loaded in context)."""
    return {}


def run_case(case: dict, kb: KnowledgeBase) -> None:
    print(f"\n{'=' * 60}")
    print(case["name"])
    print(f"{'=' * 60}")

    ctx = Context(known=case["known"])
    results: list[tuple[str, float]] = []

    for organism in ORGANISMS:
        cf = backward_chain("organism", organism, kb, ctx, silent_ask)
        results.append((organism, cf))

    results.sort(key=lambda x: -x[1])

    print("\nOrganism rankings:")
    print(f"  {'Organism':<20} {'CF':>6}  Antibiotics")
    print(f"  {'-' * 20}  {'-' * 6}  {'-' * 30}")
    for organism, cf in results:
        if cf > 0.0:
            drugs = ", ".join(ANTIBIOTIC_MAP.get(organism, ["unknown"]))
            bar = "#" * int(cf * 20)
            print(f"  {organism:<20}  {cf:>5.2f}  [{bar:<20}]  {drugs}")

    if results and results[0][1] > 0.0:
        best = results[0][0]
        print(f"\nRecommended therapy for most likely organism ({best}):")
        for drug in ANTIBIOTIC_MAP.get(best, []):
            print(f"  -- {drug}")


def demo_cf_arithmetic() -> None:
    print("=" * 60)
    print("Certainty factor arithmetic (Shortliffe-Buchanan 1975)")
    print("=" * 60)

    examples = [
        (0.8, 0.6, "two strong positive"),
        (0.8, -0.4, "positive + weak negative"),
        (-0.7, -0.5, "two negatives"),
        (0.5, 0.5, "two equal positives"),
        (1.0, 0.0, "certain + no evidence"),
    ]
    print(f"\n  {'CF1':>5} {'CF2':>5}  {'Combined':>8}  Note")
    print(f"  {'---':>5} {'---':>5}  {'--------':>8}")
    for cf1, cf2, note in examples:
        combined = combine_cf(cf1, cf2)
        print(f"  {cf1:>5.2f} {cf2:>5.2f}  {combined:>8.4f}  {note}")


if __name__ == "__main__":
    kb = build_bacteraemia_kb()
    demo_cf_arithmetic()
    for case in CASES:
        run_case(case, kb)