Home > Sample Page

Sample Page Title


def _now_iso() -> str:
    """Return the current UTC time as an ISO-8601 string ending in ``Z``.

    Microseconds are zeroed, so the result has one-second resolution,
    e.g. ``2024-01-31T12:34:56Z``.
    """
    # ``datetime.replace`` is the method that returns a copy with a field
    # overridden. The naive UTC value has no offset of its own, so the
    # literal "Z" is appended to mark it as UTC.
    return datetime.utcnow().replace(microsecond=0).isoformat() + "Z"


def _stable_id(prefix: str, seed: str) -> str:
    """Build a deterministic identifier of the form ``<prefix>_<digest>``.

    The digest part is the first 10 hexadecimal characters of the SHA-256
    hash of ``seed`` encoded as UTF-8, so equal seeds always give equal ids.
    """
    full_digest = hashlib.sha256(seed.encode("utf-8")).hexdigest()
    short_digest = full_digest[:10]
    return "{}_{}".format(prefix, short_digest)


class MockEHR:
    """In-memory stand-in for an EHR holding surgery orders and patient documents.

    ``orders_queue`` is a list of pending orders consumed from the front by
    :meth:`poll_new_surgery_orders`; ``patient_docs`` maps a patient id to the
    list of clinical documents recorded for that patient.

    NOTE(review): the keyword names passed to ``Patient``, ``SurgeryOrder`` and
    ``ClinicalDocument`` are taken from how those objects are used in this
    file; confirm them against the model definitions.
    """

    def __init__(self):
        self.orders_queue: List[SurgeryOrder] = []
        self.patient_docs: Dict[str, List[ClinicalDocument]] = {}

    def seed_data(self, n_orders: int = 5):
        """Populate the queue with ``n_orders`` synthetic orders and their documents.

        Reseeds the module-level ``random`` generator with 7 before generating
        anything, so the random choices are repeatable (timestamps are not).
        """
        random.seed(7)

        def make_patient(i: int) -> Patient:
            """Create patient number ``i`` with a randomly chosen insurance plan."""
            pid = f"PT{i:04d}"
            plan = random.choice(list(InsurancePlan))
            # NOTE(review): `title=` and its text are kept exactly as found;
            # verify the field name against the Patient model.
            return Patient(
                patient_id=pid,
                title=f"Affected person {i}",
                dob="1980-01-01",
                member_id=f"M{i:08d}",
                plan=plan,
            )

        def docs_for_order(patient: Patient, surgery: SurgeryType) -> List[ClinicalDocument]:
            """Return the documents on file for ``patient`` given the ordered surgery.

            Three baseline documents are always present. Labs and imaging are
            candidates for specific surgery types and are each kept only when
            a random draw exceeds 0.35; prior-treatment and consent documents
            are added when their own draws exceed 0.6 and 0.5 respectively.
            """

            def make_doc(tag: str, doc_type: DocType, content: str, source: str) -> ClinicalDocument:
                # The id is derived from patient id + tag, so it is stable
                # across runs for the same patient and document kind.
                return ClinicalDocument(
                    doc_id=_stable_id("DOC", patient.patient_id + tag),
                    doc_type=doc_type,
                    created_at=_now_iso(),
                    content=content,
                    source=source,
                )

            base = [
                make_doc(
                    "H&P",
                    DocType.H_AND_P,
                    "H&P: Relevant history, exam findings, and surgical indication.",
                    "EHR",
                ),
                make_doc(
                    "NOTE",
                    DocType.CLINICAL_NOTE,
                    "Clinical note: Symptoms, conservative management attempted, clinician assessment.",
                    "EHR",
                ),
                make_doc(
                    "MEDS",
                    DocType.MED_LIST,
                    "Medication list: Current meds, allergies, contraindications.",
                    "EHR",
                ),
            ]

            maybe = []
            if surgery in [SurgeryType.KNEE_ARTHROPLASTY, SurgeryType.SPINE_FUSION, SurgeryType.BARIATRIC]:
                maybe.append(
                    make_doc("LABS", DocType.LABS, "Labs: CBC/CMP inside final 30 days.", "LabSystem")
                )

            if surgery in [SurgeryType.SPINE_FUSION, SurgeryType.KNEE_ARTHROPLASTY]:
                maybe.append(
                    make_doc(
                        "IMG",
                        DocType.IMAGING,
                        "Imaging: MRI/X-ray report supporting analysis and severity.",
                        "Radiology",
                    )
                )

            # One random draw per candidate, in candidate order.
            final = base + [d for d in maybe if random.random() > 0.35]

            if random.random() > 0.6:
                final.append(
                    make_doc(
                        "PRIOR_TX",
                        DocType.PRIOR_TX,
                        "Prior remedies: PT, meds, injections tried over 6+ weeks.",
                        "EHR",
                    )
                )

            if random.random() > 0.5:
                final.append(
                    make_doc(
                        "CONSENT",
                        DocType.CONSENT,
                        "Consent: Signed process consent and danger disclosure.",
                        "EHR",
                    )
                )

            return final

        for i in range(1, n_orders + 1):
            patient = make_patient(i)
            surgery = random.choice(list(SurgeryType))
            order = SurgeryOrder(
                order_id=_stable_id("ORD", patient.patient_id + surgery.value),
                patient=patient,
                surgery_type=surgery,
                # Scheduled 3 to 21 days after today's UTC date.
                scheduled_date=(datetime.utcnow().date() + timedelta(days=random.randint(3, 21))).isoformat(),
                ordering_provider_npi=str(random.randint(1000000000, 1999999999)),
                diagnosis_codes=["M17.11", "M54.5"] if surgery != SurgeryType.CATARACT else ["H25.9"],
                created_at=_now_iso(),
            )
            self.orders_queue.append(order)
            self.patient_docs[patient.patient_id] = docs_for_order(patient, surgery)

    def poll_new_surgery_orders(self, max_n: int = 1) -> List[SurgeryOrder]:
        """Remove and return up to ``max_n`` orders from the front of the queue."""
        pulled = self.orders_queue[:max_n]
        self.orders_queue = self.orders_queue[max_n:]
        return pulled

    def get_patient_documents(self, patient_id: str) -> List[ClinicalDocument]:
        """Return a new list of the documents stored for ``patient_id`` (empty if none)."""
        return list(self.patient_docs.get(patient_id, []))

    def fetch_additional_docs(self, patient_id: str, wanted: List[DocType]) -> List[ClinicalDocument]:
        """Create one document per requested type, store them, and return them.

        The current ``time.time()`` value is mixed into each id seed, so
        repeated calls produce different document ids.
        """
        generated = [
            ClinicalDocument(
                doc_id=_stable_id("DOC", patient_id + dt.value + str(time.time())),
                doc_type=dt,
                created_at=_now_iso(),
                content=f"Auto-collected doc for {dt.value}: extracted and formatted per payer coverage.",
                source="AutoCollector",
            )
            for dt in wanted
        ]
        self.patient_docs.setdefault(patient_id, []).extend(generated)
        return generated


class MockPayerPortal:
    """In-memory simulation of a payer's prior-authorization portal.

    Cases are kept in ``db``, keyed by payer reference. Each record stores the
    case status, the documents still missing, and a poll counter that drives
    the status transitions in :meth:`check_status`.

    NOTE(review): the ``status=`` keyword on ``PayerResponse`` and the
    ``order.patient`` attribute are taken from how status and patient are
    named elsewhere in this file; confirm them against the model definitions.
    """

    def __init__(self):
        self.db: Dict[str, Dict[str, Any]] = {}
        # Reseeds the module-level generator, so this affects every user of
        # ``random`` in the process, not only this portal.
        random.seed(11)

    def required_docs_policy(self, plan: InsurancePlan, surgery: SurgeryType) -> List[DocType]:
        """Return the de-duplicated document types required, sorted by enum value."""
        base = [DocType.H_AND_P, DocType.CLINICAL_NOTE, DocType.MED_LIST]
        if surgery in [SurgeryType.SPINE_FUSION, SurgeryType.KNEE_ARTHROPLASTY]:
            base += [DocType.IMAGING, DocType.LABS, DocType.PRIOR_TX]
        if surgery == SurgeryType.BARIATRIC:
            base += [DocType.LABS, DocType.PRIOR_TX]
        if plan in [InsurancePlan.PAYER_BETA, InsurancePlan.PAYER_GAMMA]:
            base += [DocType.CONSENT]
        return sorted(set(base), key=lambda x: x.value)

    def submit(self, pa: PriorAuthRequest) -> PayerResponse:
        """Register a new case for ``pa`` and return a SUBMITTED response.

        The documents missing relative to :meth:`required_docs_policy` are
        computed once here and stored on the case record.
        """
        payer_ref = _stable_id("PAYREF", pa.request_id + _now_iso())
        docs_present = {d.doc_type for d in pa.docs_attached}
        required = self.required_docs_policy(pa.order.patient.plan, pa.order.surgery_type)
        missing = [d for d in required if d not in docs_present]

        # The "status" and "missing" keys must match what check_status and
        # file_appeal read back from the record.
        self.db[payer_ref] = {
            "status": AuthStatus.SUBMITTED,
            "order_id": pa.order.order_id,
            "plan": pa.order.patient.plan,
            # NOTE(review): key text kept verbatim; nothing in this class reads it.
            "surgical procedure": pa.order.surgery_type,
            "missing": missing,
            "polls": 0,
            "submitted_at": _now_iso(),
            "denial_reason": None,
        }

        msg = "Submission acquired. Case queued for assessment."
        if missing:
            msg += " Preliminary validation signifies incomplete documentation."
        return PayerResponse(status=AuthStatus.SUBMITTED, payer_ref=payer_ref, message=msg)

    def check_status(self, payer_ref: str) -> PayerResponse:
        """Advance the case by one poll and report its current status.

        Unknown references yield a DENIED response. A SUBMITTED case moves to
        IN_REVIEW on the first poll. From the third poll on, an IN_REVIEW case
        is denied if documents are missing; otherwise a random draw decides
        between two denial reasons (below 0.10 and below 0.18) and approval.
        No branch here transitions a case whose status is APPEALED, so such a
        case is reported through the generic response at the end.
        """
        if payer_ref not in self.db:
            return PayerResponse(
                status=AuthStatus.DENIED,
                payer_ref=payer_ref,
                message="Case not discovered (doable payer system error).",
                denial_reason=DenialReason.OTHER,
                confidence=0.4,
            )

        case = self.db[payer_ref]
        case["polls"] += 1

        if case["status"] == AuthStatus.SUBMITTED and case["polls"] >= 1:
            case["status"] = AuthStatus.IN_REVIEW

        if case["status"] == AuthStatus.IN_REVIEW and case["polls"] >= 3:
            if case["missing"]:
                case["status"] = AuthStatus.DENIED
                case["denial_reason"] = DenialReason.MISSING_DOCS
            else:
                roll = random.random()
                if roll < 0.10:
                    case["status"] = AuthStatus.DENIED
                    case["denial_reason"] = DenialReason.CODING_ISSUE
                elif roll < 0.18:
                    case["status"] = AuthStatus.DENIED
                    case["denial_reason"] = DenialReason.MEDICAL_NECESSITY
                else:
                    case["status"] = AuthStatus.APPROVED

        if case["status"] == AuthStatus.DENIED:
            dr = case["denial_reason"] or DenialReason.OTHER
            # The missing-document list is only reported for that denial reason.
            missing = case["missing"] if dr == DenialReason.MISSING_DOCS else []
            conf = 0.9 if dr != DenialReason.OTHER else 0.55
            return PayerResponse(
                status=AuthStatus.DENIED,
                payer_ref=payer_ref,
                message=f"Denied. Motive={dr.value}.",
                denial_reason=dr,
                missing_docs=missing,
                confidence=conf,
            )

        if case["status"] == AuthStatus.APPROVED:
            return PayerResponse(
                status=AuthStatus.APPROVED,
                payer_ref=payer_ref,
                message="Authorised. Authorization issued.",
                confidence=0.95,
            )

        return PayerResponse(
            status=case["status"],
            payer_ref=payer_ref,
            message=f"Standing={case['status'].value}. Polls={case['polls']}.",
            confidence=0.9,
        )

    def file_appeal(self, payer_ref: str, appeal_text: str, attached_docs: List[ClinicalDocument]) -> PayerResponse:
        """Mark the case as APPEALED, crediting any newly attached documents.

        Document types present in ``attached_docs`` are removed from the
        case's missing list and the poll counter is reset to 0.
        ``appeal_text`` is accepted but not used by this mock.
        """
        if payer_ref not in self.db:
            return PayerResponse(
                status=AuthStatus.DENIED,
                payer_ref=payer_ref,
                message="Attraction failed: case not discovered.",
                denial_reason=DenialReason.OTHER,
                confidence=0.4,
            )

        case = self.db[payer_ref]
        docs_present = {d.doc_type for d in attached_docs}
        still_missing = [d for d in case["missing"] if d not in docs_present]
        case["missing"] = still_missing
        case["status"] = AuthStatus.APPEALED
        case["polls"] = 0

        msg = "Attraction submitted and queued for assessment."
        if still_missing:
            msg += f" Warning: nonetheless lacking {', '.join([d.value for d in still_missing])}."
        return PayerResponse(status=AuthStatus.APPEALED, payer_ref=payer_ref, message=msg, confidence=0.9)

Related Articles

LEAVE A REPLY

Please enter your comment!
Please enter your name here

Latest Articles