#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Create draft replies for unread INBOX messages with an OpenAI chat model.

The script logs in to an IMAP server, reads the unread messages of INBOX,
asks the model for an HTML answer (using an external knowledge base and
prompt template) and appends each answer to a drafts mailbox.  Nothing is
sent: a human reviews the drafts.
"""
import os
import re
import time
import json
import imaplib
import email
import traceback
import html as html_lib
from email.utils import parseaddr
from email.mime.text import MIMEText
from email.header import decode_header, make_header
from datetime import datetime
from pathlib import Path
import sys


# ==========================
# CONFIG from config.json (next to the script/EXE)
# ==========================
def app_dir() -> Path:
    """Return the directory containing the script, or the executable when frozen."""
    # When "frozen" (PyInstaller), use the folder of the executable.
    if getattr(sys, "frozen", False):
        return Path(sys.executable).parent
    return Path(__file__).resolve().parent


CONFIG_PATH = app_dir() / "config.json"


def load_config():
    """Read ``config.json`` and return it as a dict.

    Returns an empty dict (after printing the reason) when the file is
    missing, unreadable, not valid JSON, or not a JSON object, so that the
    module-level defaults below still apply.
    """
    try:
        with open(CONFIG_PATH, "r", encoding="utf-8") as f:
            cfg = json.load(f)
    except (OSError, ValueError) as e:  # ValueError covers JSON and UTF-8 errors
        print(f"[ERRORE] Impossibile leggere {CONFIG_PATH}: {e}")
        return {}
    if not isinstance(cfg, dict):
        # Every consumer below calls cfg.get(); a list/str here would crash.
        print(f"[ERRORE] Impossibile leggere {CONFIG_PATH}: il contenuto non è un oggetto JSON")
        return {}
    print(f"[CONFIG] Caricato: {CONFIG_PATH}")
    return cfg


CFG = load_config()

# Values read from the config (with sensible defaults)
IMAP_SERVER = CFG.get("imap_server", "mail.yogasoul.it")
EMAIL_ADDRESS = CFG.get("email_address", "")
EMAIL_PASSWORD = CFG.get("email_password", "")
OPENAI_API_KEY = CFG.get("openai_api_key", "")
OPENAI_MODEL = CFG.get("openai_model", "gpt-3.5-turbo")
PREFERRED_DRAFT_FOLDER = CFG.get("preferred_draft_folder", "BozzaRisposte")
MARK_AS_SEEN = bool(CFG.get("mark_as_seen", True))
THROTTLE_SECONDS = float(CFG.get("throttle_seconds", 0) or 0)
MAX_TO_PROCESS = int(CFG.get("max_to_process", 5) or 5)

# External files (next to the executable)
KB_PATH = str(app_dir() / CFG.get("kb_path", "yogasoul_knowledge_base.json"))
PROMPT_PATH = str(app_dir() / CFG.get("prompt_path", "prompt_template.txt"))


# ==========================
# UTILS
# ==========================
def _decode_chunk(text, enc):
    """Decode one ``decode_header`` chunk, tolerating unknown charsets."""
    if not isinstance(text, bytes):
        return text or ""
    try:
        return text.decode(enc or "utf-8", errors="ignore")
    except LookupError:
        # The declared charset is not known to Python; UTF-8 is the best guess.
        return text.decode("utf-8", errors="ignore")


def _decode_mime_words(s):
    """Decode an RFC 2047 encoded header value into a plain string.

    Returns ``""`` for empty input.  If the strict decoder fails, the
    chunks are decoded one by one, ignoring undecodable bytes.
    """
    if not s:
        return ""
    try:
        return str(make_header(decode_header(s)))
    except Exception:
        # Best effort: make_header() rejects malformed words and unknown charsets.
        try:
            parts = decode_header(s)
        except Exception:
            return str(s)
        return "".join(_decode_chunk(text, enc) for text, enc in parts)


def _html_to_text(html):
    """Convert an HTML fragment to rough plain text.

    Script/style elements are dropped, ``<br>`` becomes a newline, ``</p>``
    becomes a blank line, remaining tags are removed and character entities
    are unescaped.  On a type error the input is returned unchanged.
    """
    # NOTE(review): the tag patterns below were rebuilt from a copy of the
    # file whose markup had been stripped; confirm against the original.
    try:
        text = re.sub(r"(?is)<(script|style).*?>.*?</\1>", "", html or "")
        text = re.sub(r"(?is)<br\s*/?>", "\n", text)
        text = re.sub(r"(?is)</p>", "\n\n", text)
        text = re.sub(r"(?s)<.*?>", "", text)
        # Unescape so that re-escaping for the quoted block does not yield "&amp;amp;".
        return html_lib.unescape(text).strip()
    except TypeError:
        return html or ""


# Anything that looks like an opening/closing tag, comment or doctype.
_HTML_TAG_RE = re.compile(r"<[a-zA-Z/!][^>]*>")


def _ensure_html_blocks(s):
    """Return *s* as HTML.

    Text that already contains markup is returned stripped but otherwise
    untouched.  Flat text is escaped and wrapped: blank-line separated
    paragraphs become ``<p>`` elements and single newlines become ``<br>``.
    """
    s = (s or "").strip()
    if _HTML_TAG_RE.search(s):
        return s
    paragraphs = [p.strip() for p in re.split(r"\n\s*\n", s) if p.strip()] or [s]
    body = "".join(
        "<p>{}</p>".format(html_lib.escape(p).replace("\n", "<br>"))
        for p in paragraphs
    )
    return "<html><body>{}</body></html>".format(body)


def _make_quoted_original(body_text):
    """Build the HTML-escaped quoted block of the original message.

    Returns ``""`` when there is no body to quote.
    """
    if not body_text:
        return ""
    escaped = html_lib.escape(body_text).replace("\n", "<br>")
    # NOTE(review): wrapper tags rebuilt from a markup-stripped copy; confirm.
    return (
        "<hr>"
        "<p>— Messaggio originale —</p>"
        f"<blockquote>{escaped}</blockquote>"
    )


def _inject_before_body_end(html_src: str, addition: str) -> str:
    """Insert *addition* before ``</body>`` (case-insensitive); append if it is missing."""
    m = re.search(r"</body\s*>", html_src, flags=re.I)
    if not m:
        return html_src + addition
    start = m.start()
    return html_src[:start] + addition + html_src[start:]


def load_knowledge_base(path=KB_PATH):
    """Load the JSON knowledge base at *path*.

    Returns ``{}`` (after printing a warning) when the file does not exist
    or cannot be read/parsed, so the pipeline continues without it.
    """
    if not os.path.isfile(path):
        print(f"[ATTENZIONE] KB non trovata: {path}. Proseguo senza.")
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        print(f"[ERRORE] Lettura KB: {e}. Proseguo senza.")
        return {}


def assert_config():
    """Check that the mandatory settings are non-empty strings.

    Prints one line per missing setting and returns ``False`` if any is
    missing, ``True`` otherwise.
    """
    required = (
        ("imap_server", IMAP_SERVER),
        ("email_address", EMAIL_ADDRESS),
        ("email_password", EMAIL_PASSWORD),
        ("openai_api_key", OPENAI_API_KEY),
    )
    problems = [
        f"{key} mancante in config.json"
        for key, value in required
        if not isinstance(value, str) or not value.strip()
    ]
    if problems:
        print("[CONFIG] Correggi config.json:")
        for p in problems:
            print(" -", p)
        return False
    return True


# ==========================
# External prompt
# ==========================
def load_prompt(path: str) -> str:
    """Return the prompt template stored at *path*, or ``""`` if unreadable."""
    try:
        with open(path, "r", encoding="utf-8") as f:
            print(f"[PROMPT] Caricato: {path}")
            return f.read()
    except (OSError, ValueError) as e:
        print(f"[ATTENZIONE] Prompt non trovato o illeggibile: {path} ({e})")
        return ""


def render_prompt(template: str, **vars_) -> str:
    """Replace every ``{{name}}`` placeholder in *template* with ``str(value)``.

    Placeholders without a matching keyword argument are left untouched.
    """
    out = template
    for k, v in vars_.items():
        out = out.replace(f"{{{{{k}}}}}", str(v))
    return out


# ==========================
# Robust IMAP helpers
# ==========================
# One LIST response line: (flags) "delimiter"|NIL name
LIST_RE = re.compile(
    r'^\s*\((?P<flags>[^)]*)\)\s+(?:"(?P<delim>[^"]+)"|NIL)\s+(?P<name>.+?)\s*$'
)


def _parse_list_line(raw: bytes):
    """Parse one IMAP LIST response line into ``(flags, delimiter, name)``.

    Lines that do not match the expected shape fall back to
    ``([], ".", <last whitespace-separated token>)``.
    """
    s = raw.decode(errors="ignore")
    m = LIST_RE.match(s)
    if not m:
        parts = s.strip().split()
        name = parts[-1] if parts else ""
        return ([], ".", name.strip('"'))
    flags_str = m.group("flags") or ""
    delim = m.group("delim") or "."  # NIL delimiter (flat namespace) -> default
    name = m.group("name").strip()
    if name.startswith('"') and name.endswith('"'):
        name = name[1:-1]
    return (flags_str.split(), delim, name)


def _list_mailboxes(mail):
    """Return the parsed LIST entries of the account, or ``[]`` on any failure."""
    try:
        typ, boxes = mail.list()
        if typ != "OK":
            return []
        # imaplib yields [None] when nothing matches; only bytes lines are parsed.
        return [_parse_list_line(raw) for raw in (boxes or []) if isinstance(raw, bytes)]
    except Exception:
        # Best effort: callers treat "no listing" as "no drafts folder known".
        return []


def _find_drafts_mailbox(mail):
    """Locate an existing drafts mailbox.

    Returns ``(name, delimiter)``; *name* is ``None`` when nothing suitable
    is found.  A mailbox carrying the special-use Drafts flag wins, then any
    mailbox whose last path component contains a common drafts word.
    """
    boxes = _list_mailboxes(mail)
    if not boxes:
        return (None, ".")
    delim_guess = boxes[0][1] if boxes[0][1] else "."

    # special-use \Drafts
    for flags, delim, name in boxes:
        if any("\\drafts" in f.lower() for f in flags):
            return (name, delim or delim_guess)

    # common names
    wanted = ("drafts", "bozze", "bozza", "draft")
    for flags, delim, name in boxes:
        leaf = name.lower().split(delim or ".")[-1]
        if any(w in leaf for w in wanted):
            return (name, delim or delim_guess)
    return (None, delim_guess)


def _select_or_create(mail, name, delim):
    """Select mailbox *name*, trying ``INBOX<delim><name>`` and creating it if needed.

    Returns the mailbox name that was selected.

    Raises:
        RuntimeError: if no candidate could be selected or created.
    """
    typ, _ = mail.select(name, readonly=False)
    if typ == "OK":
        return name
    candidate = f"INBOX{delim}{name}"
    typ, _ = mail.select(candidate, readonly=False)
    if typ == "OK":
        return candidate
    mail.create(candidate)
    typ, _ = mail.select(candidate, readonly=False)
    if typ == "OK":
        return candidate
    raise RuntimeError(f"Impossibile selezionare o creare la casella '{name}' (delim='{delim}')")


# ==========================
# INTENT DETECTION (simple)
# ==========================
SCHEDULE_WORDS = [
    "orario", "orari", "quando", "che ore", "a che ora", "giorni",
    "mercoledì", "martedì", "lezione di", "inizia", "finisce", "durata"
]
BOOKING_WORDS = [
    "prenota", "prenotazione", "prenotare", "iscriversi", "iscrizione",
    "link", "come fare", "dove prenoto"
]
INFO_WORDS = [
    "informazioni", "info", "cos'è", "che cos", "benefici", "a chi è adatto",
    "livello", "programma", "insegnante", "maestro", "costi", "prezzo",
    "quanto costa", "materiale", "cosa portare"
]


def classify_intent(subject, body):
    """Classify a message as ``"extended"`` or ``"brief"``.

    ``"extended"`` is returned when the lower-cased subject+body contains
    any entry of ``INFO_WORDS`` (substring match); everything else —
    schedule/booking questions included — is ``"brief"``.
    """
    s = f"{subject or ''} {body or ''}".lower()
    return "extended" if any(w in s for w in INFO_WORDS) else "brief"


# ==========================
# OPENAI: DRAFT GENERATION
# ==========================
# Built-in prompt used when the external template is missing or empty.
# NOTE(review): the two link sentences appear to have lost their anchor
# markup in the copy this was rebuilt from; restore the hrefs if known.
_FALLBACK_PROMPT = (
    "Sei Aurora, fondatrice di YogaSoul (www.yogasoul.it), stile zen e informale, diretto.\n"
    "Rispondi in italiano, amichevole e rilassato, con emoticon yoga (🌿, 🧘‍♀️, 😊) senza esagerare.\n"
    "Saluta con \"Ciao {{nickname}}, bello sentirti!\" e firma con \"Namaste, Aurora - YogaSoul\".\n"
    "Usa la knowledge base per corsi, orari, prezzi, benefici, insegnanti.\n"
    "Includi sempre il link_prenotazione specifico come Iscriviti qui per prenotazioni,\n"
    "e il calendario qui.\n"
    "Se non sai, scrivi: \"Contattami per dettagli! 🧘‍♀️\".\n\n"
    "{{policy}}\n\n"
    "Knowledge Base:\n"
    "{{kb_json}}\n\n"
    "Email ricevuta:\n"
    "{{email_text}}\n\n"
    "Scrivi la risposta in HTML pulito (usa <p>, <ul>/<li> se utile; niente CSS superfluo).\n"
)


def _pick_nickname(email_info):
    """Choose how to greet the sender: first name, else mailbox local part, else "amico"."""
    words = (email_info.get("sender_name") or "").split()
    if words:
        return words[0]
    local = (email_info.get("sender_email") or "").split("@")[0]
    return local.split(".")[0] or "amico"


def generate_response(email_info, kb):
    """Ask the OpenAI model for an HTML reply to *email_info*.

    Args:
        email_info: dict as produced by ``fetch_all_unseen``.
        kb: knowledge base (any JSON-serialisable object) embedded in the prompt.

    Returns:
        The reply as HTML.  On any failure (missing ``openai`` package,
        API error, ...) the error is printed and a fixed apology is returned,
        so that a draft is still created.
    """
    try:
        from openai import OpenAI

        client = OpenAI(api_key=OPENAI_API_KEY)
        nickname = _pick_nickname(email_info)
        kb_json = json.dumps(kb, ensure_ascii=False, indent=2)

        intent = classify_intent(email_info.get("subject", ""), email_info.get("body_text", ""))
        if intent == "brief":
            policy = ("Se il messaggio chiede solo orari e/o come prenotare, rispondi BREVE: "
                      "indica orari precisi e inserisci SOLO il link prenotazione. "
                      "Non aggiungere benefici o descrizioni.")
        else:
            policy = ("Se il messaggio chiede informazioni sul corso, rispondi ESTESO: "
                      "includi orari, benefici principali, a chi è adatto, eventuale insegnante, e il link prenotazione.")

        email_text = (
            f"Soggetto: {email_info.get('subject','')}\n"
            f"Mittente: {email_info.get('sender_email','')}\n"
            f"Corpo: {email_info.get('body_text','')}"
        )

        # Prompt from the external file (with built-in fallback)
        template = load_prompt(PROMPT_PATH) or _FALLBACK_PROMPT
        prompt = render_prompt(
            template,
            nickname=nickname,
            policy=policy,
            kb_json=kb_json,
            email_text=email_text,
        )

        resp = client.chat.completions.create(
            model=OPENAI_MODEL,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=700,
            temperature=0.7,
        )
        draft_raw = (resp.choices[0].message.content or "").strip()
        return _ensure_html_blocks(draft_raw)
    except Exception as e:
        # Top-level boundary for the AI step: report and fall back to a fixed text.
        print(f"[ERRORE] OpenAI: {e}")
        traceback.print_exc()
        return _ensure_html_blocks(
            "<p>Errore con l'AI, contattami per dettagli! 🧘‍♀️<br>"
            "Namaste, Aurora - YogaSoul</p>"
        )


# ==========================
# PIPELINE: process ALL the UNSEEN messages (max N)
# ==========================
def _decode_part(part):
    """Return the decoded text of a MIME part, or ``None`` if it has no payload."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return None
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # Declared charset unknown to Python.
        return payload.decode("utf-8", errors="ignore")


def _extract_body_text(msg):
    """Return the best plain-text rendering of *msg*.

    Prefers the first non-attachment ``text/plain`` part; if that is
    missing or empty, the first non-attachment ``text/html`` part is
    converted to text.  Returns ``""`` when neither exists.
    """
    if not msg.is_multipart():
        text = _decode_part(msg) or ""
        return _html_to_text(text) if msg.get_content_type() == "text/html" else text

    for wanted, convert in (("text/plain", None), ("text/html", _html_to_text)):
        for part in msg.walk():
            if part.get_content_type() != wanted:
                continue
            if "attachment" in (part.get("Content-Disposition") or "").lower():
                continue
            text = _decode_part(part)
            if text is None:
                continue
            if convert is not None:
                text = convert(text)
            if text:
                return text
            break  # first candidate was empty: try the next content type
    return ""


def fetch_all_unseen(mail, limit=None):
    """Fetch the unread messages of INBOX without closing the connection.

    Args:
        mail: a logged-in ``imaplib`` connection.
        limit: when positive, only the last *limit* unread messages are fetched.

    Returns:
        A list of dicts with keys ``seq_id``, ``sender_email``,
        ``sender_name``, ``subject``, ``message_id``, ``references`` and
        ``body_text``.

    Raises:
        RuntimeError: if INBOX cannot be selected or the search fails.
    """
    results = []
    typ, _ = mail.select("INBOX")
    if typ != "OK":
        raise RuntimeError("Impossibile selezionare INBOX")

    typ, data = mail.search(None, "UNSEEN")
    if typ != "OK":
        raise RuntimeError("Search UNSEEN fallita")

    # usually in ascending order (oldest -> newest)
    ids = ((data[0] if data else b"") or b"").split()
    if not ids:
        print("Nessuna email non letta.")
        return results

    if limit and limit > 0:
        ids = ids[-limit:]  # take ONLY the last N unread

    for seq_id in ids:
        # BODY.PEEK[] returns the full message like RFC822 but does not set
        # \Seen; the flag is set explicitly by main() once the draft is stored.
        typ, msg_data = mail.fetch(seq_id, "(BODY.PEEK[])")
        if typ != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
            print(f"[WARN] Fetch fallito per id {seq_id}")
            continue

        msg = email.message_from_bytes(msg_data[0][1])
        name, sender_addr = parseaddr(msg.get("From", "") or "")

        results.append({
            "seq_id": seq_id,
            "sender_email": sender_addr,
            "sender_name": _decode_mime_words(name),
            "subject": _decode_mime_words(msg.get("Subject", "") or ""),
            "message_id": msg.get("Message-ID", "") or "",
            "references": msg.get("References", "") or "",
            "body_text": _extract_body_text(msg),
        })
    return results


def prepare_reply_mime(email_info, response_html):
    """Build the HTML MIME reply, with the quoted original and threading headers.

    Args:
        email_info: dict as produced by ``fetch_all_unseen``.
        response_html: the reply body as HTML.

    Returns:
        A ``MIMEText`` message with Subject ("Re: " prefixed once), From,
        and — when known — To, In-Reply-To and References.
    """
    reply_body = _inject_before_body_end(
        response_html,
        _make_quoted_original(email_info.get("body_text", "")),
    )
    msg = MIMEText(reply_body, "html", "utf-8")

    # Collapse folded header values onto one line before reusing them.
    subj = " ".join((email_info.get("subject") or "").split())
    subj = subj or "[Risposta automatica] - YogaSoul"
    if not subj.lower().startswith("re:"):
        subj = f"Re: {subj}"
    msg["Subject"] = subj
    msg["From"] = EMAIL_ADDRESS
    if email_info.get("sender_email"):
        msg["To"] = email_info["sender_email"]

    message_id = (email_info.get("message_id") or "").strip()
    if message_id:
        msg["In-Reply-To"] = message_id
        prev_refs = (email_info.get("references") or "").split()
        msg["References"] = " ".join(prev_refs + [message_id])
    return msg


def ensure_drafts_folder(mail):
    """Find or create the drafts mailbox; return ``(folder_name, delimiter)``.

    An existing drafts mailbox is preferred.  Otherwise
    ``INBOX<delim><PREFERRED_DRAFT_FOLDER>`` and then ``INBOX<delim>Drafts``
    are selected, creating them if necessary.

    Raises:
        RuntimeError: if no drafts mailbox can be found or created.
    """
    drafts_name, delim = _find_drafts_mailbox(mail)
    if drafts_name:
        return drafts_name, delim

    for folder in (f"INBOX{delim}{PREFERRED_DRAFT_FOLDER}", f"INBOX{delim}Drafts"):
        typ, _ = mail.select(folder, readonly=False)
        if typ == "OK":
            return folder, delim
        mail.create(folder)
        typ, _ = mail.select(folder, readonly=False)
        if typ == "OK":
            return folder, delim

    raise RuntimeError("Impossibile trovare/creare una cartella bozze.")


def append_draft(mail, folder, msg):
    r"""APPEND *msg* to *folder* with the \Draft flag; return ``True`` on "OK"."""
    flags = r"(\Draft)"
    when = imaplib.Time2Internaldate(time.time())
    typ, resp = mail.append(folder, flags, when, msg.as_bytes())
    print("[APPEND]", folder, "=>", typ, resp)
    return typ == "OK"


# ==========================
# MAIN
# ==========================
def _safe_logout(mail):
    """Log out, ignoring errors: the connection may already be broken."""
    try:
        mail.logout()
    except Exception:
        pass


def main():
    """Run the pipeline once: validate config, log in, draft replies, log out."""
    # Basic config check
    if not assert_config():
        return

    # IMAP login
    mail = None
    try:
        mail = imaplib.IMAP4_SSL(IMAP_SERVER)
        mail.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
    except Exception as e:
        print(f"[ERRORE] Login IMAP fallito: {e}")
        traceback.print_exc()
        if mail is not None:
            # Connected but not authenticated: do not leak the socket.
            _safe_logout(mail)
        return

    try:
        drafts_folder, delim = ensure_drafts_folder(mail)
        print(f"[DEBUG] drafts_folder='{drafts_folder}' delim='{delim}'")

        # Re-selects INBOX, which stays selected for the STORE calls below.
        emails = fetch_all_unseen(mail, limit=MAX_TO_PROCESS)
        if not emails:
            print("Nessuna nuova email da processare.")
            return

        kb = load_knowledge_base(KB_PATH)

        processed = 0
        for info in emails:
            draft_html = generate_response(info, kb)
            mime_msg = prepare_reply_mime(info, draft_html)
            ok = append_draft(mail, drafts_folder, mime_msg)
            if ok and MARK_AS_SEEN:
                try:
                    mail.store(info["seq_id"], "+FLAGS", "\\Seen")
                except Exception as e:
                    print(f"[WARN] Non sono riuscito a marcare come letta id {info['seq_id']}: {e}")
            processed += 1
            if THROTTLE_SECONDS:
                time.sleep(THROTTLE_SECONDS)

        print(f"✅ Elaborate {processed} email (max {MAX_TO_PROCESS}). Bozze salvate in '{drafts_folder}'.")
    except Exception as e:
        print(f"[ERRORE] Pipeline: {e}")
        traceback.print_exc()
    finally:
        _safe_logout(mail)


if __name__ == "__main__":
    main()