#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Create draft replies for unread YogaSoul e-mails.

The script logs into an IMAP mailbox, reads the most recent unread messages,
asks an OpenAI chat model to write a reply based on a JSON knowledge base, and
stores each reply as a draft (it never sends mail).

NOTE(review): this file was reconstructed from a copy in which every
``<...>`` span had been stripped and all line breaks collapsed. HTML tags in
string literals and regex group names were restored from context; places
where the original text could not be recovered are marked individually.
"""

import os
import re
import time
import json
import imaplib
import email
import traceback
import html as html_lib
from email.utils import parseaddr
from email.mime.text import MIMEText
from email.header import decode_header, make_header
from datetime import datetime

# ==========================
# CONFIG (fill in here)
# ==========================
IMAP_SERVER = "mail.yogasoul.it"
EMAIL_ADDRESS = "info@yogasoul.it"
# SECURITY: secrets are read from the environment instead of being committed
# to the source file. The credentials that were previously hard-coded here
# must be considered leaked and rotated.
EMAIL_PASSWORD = os.environ.get("YOGASOUL_EMAIL_PASSWORD", "")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "")
KB_PATH = "yogasoul_knowledge_base.json"
OPENAI_MODEL = "gpt-3.5-turbo"
PREFERRED_DRAFT_FOLDER = "BozzaRisposte"  # created under INBOX if no drafts folder exists
MARK_AS_SEEN = True  # False leaves processed e-mails unread
THROTTLE_SECONDS = 0  # delay between e-mails (0 = none)
MAX_TO_PROCESS = 5  # process at most the N most recent UNSEEN e-mails

# ==========================
# UTILS
# ==========================
# Tags whose presence means the model already answered in HTML.
_HTML_TAG_HINT_RE = re.compile(
    r"(?i)<(?:html|body|p|br|div|ul|ol|li|a|table|h[1-6])\b"
)
_BODY_CLOSE_RE = re.compile(r"(?i)</body\s*>")


def _decode_mime_words(s):
    """Decode an RFC 2047 encoded header value into a plain string.

    Returns "" for empty input. If the standard decoder fails (for example
    because of an unknown charset label), each chunk is decoded leniently,
    falling back to UTF-8.
    """
    if not s:
        return ""
    try:
        return str(make_header(decode_header(s)))
    except Exception:
        out = []
        for text, enc in decode_header(s):
            if isinstance(text, bytes):
                try:
                    out.append(text.decode(enc or "utf-8", errors="ignore"))
                except LookupError:
                    # Unknown charset name: best effort with UTF-8.
                    out.append(text.decode("utf-8", errors="ignore"))
            else:
                out.append(text or "")
        return "".join(out)


def _html_to_text(html):
    """Convert an HTML fragment to rough plain text.

    Script/style elements are dropped, ``<br>`` becomes a newline, ``</p>``
    becomes a blank line, remaining tags are removed and entities are
    unescaped (the text is escaped again later when it is quoted).
    """
    try:
        text = re.sub(r"(?is)<(script|style)\b.*?>.*?</\1\s*>", "", html or "")
        text = re.sub(r"(?is)<br\s*/?>", "\n", text)
        text = re.sub(r"(?is)</p\s*>", "\n\n", text)
        text = re.sub(r"(?s)<.*?>", "", text)
        return html_lib.unescape(text).strip()
    except Exception:
        return html or ""


def _ensure_html_blocks(s):
    """Return *s* as HTML, wrapping flat text in simple paragraphs.

    If the model already produced HTML the string is returned unchanged.
    Otherwise blank-line separated paragraphs become ``<p>`` elements and
    single newlines become ``<br>``; the text is HTML-escaped.
    """
    s = (s or "").strip()
    if _HTML_TAG_HINT_RE.search(s):
        return s
    parts = [p.strip() for p in re.split(r"\n\s*\n", s) if p.strip()]
    if len(parts) > 1:
        body = "".join(
            "<p>{}</p>".format(html_lib.escape(p).replace("\n", "<br>"))
            for p in parts
        )
    else:
        body = "<p>{}</p>".format(html_lib.escape(s).replace("\n", "<br>"))
    return "<html><body>{}</body></html>".format(body)


def _make_quoted_original(body_text):
    """Build the safely-escaped quoted block of the original message.

    Returns "" when there is no body text.
    """
    if not body_text:
        return ""
    normalized = body_text.replace("\r\n", "\n")
    escaped = html_lib.escape(normalized).replace("\n", "<br>")
    # NOTE(review): the exact tags were lost in the source; this is a
    # minimal equivalent (separator line + blockquote) - confirm styling.
    return (
        "<br>"
        "<p>— Messaggio originale —</p>"
        f"<blockquote>{escaped}</blockquote>"
    )


def load_knowledge_base(path=KB_PATH):
    """Load the JSON knowledge base from *path*.

    Returns an empty dict (after printing a warning) when the file is
    missing, unreadable or not valid JSON, so the pipeline can continue.
    """
    if not os.path.isfile(path):
        print(f"[ATTENZIONE] KB non trovata: {path}. Proseguo senza.")
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            return json.load(f)
    except (OSError, ValueError) as e:
        print(f"[ERRORE] Lettura KB: {e}. Proseguo senza.")
        return {}


def _is_unset(value, placeholder=None):
    """Return True if a config value is not a non-blank string or is a placeholder."""
    return (
        not isinstance(value, str)
        or not value.strip()
        or value == placeholder
    )


def assert_config():
    """Validate the module-level configuration.

    Prints the list of missing parameters and returns False if any is unset;
    returns True otherwise.
    """
    problems = []
    if _is_unset(IMAP_SERVER):
        problems.append("IMAP_SERVER mancante")
    if _is_unset(EMAIL_ADDRESS):
        problems.append("EMAIL_ADDRESS mancante")
    if _is_unset(EMAIL_PASSWORD, "INSERISCI_LA_PASSWORD"):
        problems.append("EMAIL_PASSWORD non impostata")
    if _is_unset(OPENAI_API_KEY, "INSERISCI_OPENAI_API_KEY"):
        problems.append("OPENAI_API_KEY non impostata")
    if problems:
        print("[CONFIG] Correggi questi parametri prima di proseguire:")
        for p in problems:
            print(" -", p)
        return False
    return True


# ==========================
# Robust IMAP helpers
# ==========================
# One line of an IMAP LIST response: (flags) "delimiter" name
LIST_RE = re.compile(
    r'^\s*\((?P<flags>[^)]*)\)\s+"(?P<delim>[^"]+)"\s+(?P<name>.+?)\s*$'
)


def _parse_list_line(raw: bytes):
    """Parse one LIST response line into ``(flags, delimiter, name)``.

    Lines that do not match the expected shape fall back to taking the last
    whitespace-separated token as the name, with no flags and "." as the
    delimiter.
    """
    s = raw.decode(errors="ignore")
    m = LIST_RE.match(s)
    if not m:
        parts = s.strip().split()
        name = parts[-1] if parts else ""
        name = name.strip('"')
        return ([], ".", name)
    flags_str = m.group("flags") or ""
    delim = m.group("delim") or "."
    name = m.group("name").strip()
    if name.startswith('"') and name.endswith('"'):
        name = name[1:-1]
    flags = [f.strip() for f in flags_str.split() if f.strip()]
    return (flags, delim, name)


def _list_mailboxes(mail):
    """Return the parsed mailbox list, or [] if LIST fails."""
    try:
        typ, boxes = mail.list()
    except (imaplib.IMAP4.error, OSError) as e:
        print(f"[WARN] LIST fallita: {e}")
        return []
    if typ != "OK":
        return []
    # imaplib may return None or tuple entries; only plain lines are parsed.
    return [_parse_list_line(raw) for raw in (boxes or []) if isinstance(raw, bytes)]


def _find_drafts_mailbox(mail):
    """Locate an existing drafts mailbox.

    Returns ``(name, delimiter)``; ``name`` is None when nothing suitable is
    found. The special-use ``\\Drafts`` flag wins over name matching.
    """
    boxes = _list_mailboxes(mail)
    if not boxes:
        return (None, ".")
    delim_guess = boxes[0][1] if boxes[0][1] else "."

    # special-use \Drafts
    for flags, delim, name in boxes:
        if any("\\drafts" in f.lower() for f in flags):
            return (name, delim or delim_guess)

    # common names, matched against the last path component only
    wanted = ("drafts", "bozze", "bozza", "draft")
    for flags, delim, name in boxes:
        leaf = name.lower().split(delim or ".")[-1]
        if any(w in leaf for w in wanted):
            return (name, delim or delim_guess)
    return (None, delim_guess)


def _select_or_create(mail, name, delim):
    """Select mailbox *name*, trying ``INBOX<delim><name>`` and creating it if needed.

    Returns the mailbox name that was selected.
    Raises RuntimeError if no variant can be selected or created.
    """
    typ, _ = mail.select(name, readonly=False)
    if typ == "OK":
        return name
    candidate = f"INBOX{delim}{name}"
    typ, _ = mail.select(candidate, readonly=False)
    if typ == "OK":
        return candidate
    mail.create(candidate)
    typ, _ = mail.select(candidate, readonly=False)
    if typ == "OK":
        return candidate
    raise RuntimeError(f"Impossibile selezionare o creare la casella '{name}' (delim='{delim}')")


# ==========================
# INTENT DETECTION (simple)
# ==========================
SCHEDULE_WORDS = [
    "orario", "orari", "quando", "che ore", "a che ora", "giorni",
    "mercoledì", "martedì", "lezione di", "inizia", "finisce", "durata"
]
BOOKING_WORDS = [
    "prenota", "prenotazione", "prenotare", "iscriversi", "iscrizione",
    "link", "come fare", "dove prenoto"
]
INFO_WORDS = [
    "informazioni", "info", "cos'è", "che cos", "benefici", "a chi è adatto",
    "livello", "programma", "insegnante", "maestro", "costi", "prezzo",
    "quanto costa", "materiale", "cosa portare"
]


def classify_intent(subject, body):
    """Classify a message as ``"extended"`` or ``"brief"``.

    Any INFO_WORDS substring in the lower-cased subject+body yields
    "extended"; everything else (schedule/booking questions or no keyword at
    all) yields "brief". SCHEDULE_WORDS and BOOKING_WORDS therefore do not
    change the outcome; they are kept as module constants for callers.
    """
    s = f"{subject or ''} {body or ''}".lower()
    if any(w in s for w in INFO_WORDS):
        return "extended"
    return "brief"


# ==========================
# OPENAI: DRAFT GENERATION
# ==========================
def generate_response(email_info, kb):
    """Ask the OpenAI model for an HTML reply to *email_info*.

    *email_info* is a dict as produced by ``fetch_all_unseen``; *kb* is the
    knowledge base dict. Never raises: on any failure an apologetic fallback
    HTML message is returned and the error is printed.
    """
    try:
        # Imported lazily so the rest of the module works without the package.
        from openai import OpenAI

        client = OpenAI(api_key=OPENAI_API_KEY)

        # First name if available, else the first dotted part of the address.
        nickname = "amico"
        name_tokens = (email_info.get("sender_name") or "").split()
        if name_tokens:
            nickname = name_tokens[0]
        elif email_info.get("sender_email"):
            nickname = email_info["sender_email"].split("@")[0].split(".")[0] or "amico"

        kb_json = json.dumps(kb, ensure_ascii=False, indent=2)
        intent = classify_intent(email_info.get("subject", ""), email_info.get("body_text", ""))
        if intent == "brief":
            policy = ("Se il messaggio chiede solo orari e/o come prenotare, rispondi BREVE: "
                      "indica orari precisi e inserisci SOLO il link prenotazione. Non aggiungere benefici o descrizioni.")
        else:
            policy = ("Se il messaggio chiede informazioni sul corso, rispondi ESTESO: "
                      "includi orari, benefici principali, a chi è adatto, eventuale insegnante, e il link prenotazione.")

        email_text = (
            f"Soggetto: {email_info.get('subject','')}\n"
            f"Mittente: {email_info.get('sender_email','')}\n"
            f"Corpo: {email_info.get('body_text','')}"
        )

        # NOTE(review): the two <a href> targets in the "Includi sempre" line
        # were stripped from the source; the placeholder and the site root
        # below are stand-ins - restore the real URLs.
        prompt = (
            "\n"
            "Sei Aurora, fondatrice di YogaSoul (www.yogasoul.it), stile zen e informale, diretto.\n"
            "Rispondi in italiano, amichevole e rilassato, con emoticon yoga (🌿, 🧘‍♀️, 😊) senza esagerare.\n"
            f"Saluta con \"Ciao {nickname}, bello sentirti!\" e firma con \"Namaste, Aurora - YogaSoul\".\n"
            "Usa la knowledge base per corsi, orari, prezzi, benefici, insegnanti.\n"
            "Includi sempre il link_prenotazione specifico come "
            "<a href=\"[link_prenotazione]\">Iscriviti qui</a> per prenotazioni, "
            "e il calendario <a href=\"https://www.yogasoul.it\">qui</a>.\n"
            "Se non sai, scrivi: \"Contattami per dettagli! 🧘‍♀️\".\n"
            f"{policy}\n"
            "\n"
            f"Knowledge Base:\n{kb_json}\n"
            "\n"
            f"Email ricevuta:\n{email_text}\n"
            "\n"
            "Scrivi la risposta in HTML pulito (usa <p>, <ul>/<li> se utile; niente CSS superfluo).\n"
        )

        resp = client.chat.completions.create(
            model=OPENAI_MODEL,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=700,
            temperature=0.7,
        )
        draft_raw = (resp.choices[0].message.content or "").strip()
        return _ensure_html_blocks(draft_raw)
    except Exception as e:
        # Top-level boundary: a failed generation must not stop the batch.
        print(f"[ERRORE] OpenAI: {e}")
        traceback.print_exc()
        return _ensure_html_blocks(
            "<p>Errore con l'AI, contattami per dettagli! 🧘‍♀️</p>"
            "<p>Namaste, Aurora - YogaSoul</p>"
        )


# ==========================
# PIPELINE: process ALL UNSEEN (max N)
# ==========================
def _decode_part(part):
    """Decode a leaf MIME part to text using its declared charset (UTF-8 fallback)."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return ""
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # Charset label unknown to Python.
        return payload.decode("utf-8", errors="ignore")


def _extract_body_text(msg):
    """Return the best plain-text body of *msg*.

    Prefers the first non-empty, non-attachment ``text/plain`` part; falls
    back to the first ``text/html`` part converted to text; "" otherwise.
    """
    plain = ""
    html_body = ""
    for part in msg.walk():
        if part.is_multipart():
            continue
        disp = (part.get("Content-Disposition") or "").lower()
        if "attachment" in disp:
            continue
        ctype = part.get_content_type()
        if ctype == "text/plain" and not plain:
            plain = _decode_part(part)
        elif ctype == "text/html" and not html_body:
            html_body = _decode_part(part)
    if plain:
        return plain
    if html_body:
        return _html_to_text(html_body)
    return ""


def fetch_all_unseen(mail, limit=None):
    """Fetch unread INBOX messages without closing the IMAP connection.

    Returns a list of dicts with keys ``seq_id``, ``sender_email``,
    ``sender_name``, ``subject``, ``message_id``, ``references`` and
    ``body_text``. When *limit* is a positive int only the last *limit*
    unread messages are returned.

    Messages are fetched with ``BODY.PEEK[]`` so the server does not set
    ``\\Seen`` implicitly; marking as read stays under the caller's control.

    Raises RuntimeError if INBOX cannot be selected or the search fails.
    """
    results = []
    typ, _ = mail.select("INBOX")
    if typ != "OK":
        raise RuntimeError("Impossibile selezionare INBOX")

    typ, data = mail.search(None, "UNSEEN")
    if typ != "OK":
        raise RuntimeError("Search UNSEEN fallita")

    # Usually ascending order (oldest -> newest).
    ids = (data[0] or b"").split() if data else []
    if not ids:
        print("Nessuna email non letta.")
        return results

    if limit and limit > 0:
        ids = ids[-limit:]  # keep ONLY the last N unread

    for seq_id in ids:
        typ, msg_data = mail.fetch(seq_id, "(BODY.PEEK[])")
        if typ != "OK" or not msg_data or not isinstance(msg_data[0], tuple):
            print(f"[WARN] Fetch fallito per id {seq_id}")
            continue

        msg = email.message_from_bytes(msg_data[0][1])
        # str() guards against Header objects returned for raw 8-bit headers.
        raw_name, sender_addr = parseaddr(str(msg.get("From", "") or ""))
        results.append({
            "seq_id": seq_id,
            "sender_email": sender_addr,
            # Display names are often RFC 2047 encoded; decode for the greeting.
            "sender_name": _decode_mime_words(raw_name),
            "subject": _decode_mime_words(msg.get("Subject", "") or ""),
            "message_id": str(msg.get("Message-ID", "") or "").strip(),
            "references": str(msg.get("References", "") or "").strip(),
            "body_text": _extract_body_text(msg) or "",
        })
    return results


def prepare_reply_mime(email_info, response_html):
    """Build the HTML MIME reply with quoted original and threading headers.

    The quoted original is inserted just before the last ``</body>`` tag
    (matched case-insensitively) or appended when there is none. Subject
    gets a ``Re:`` prefix unless already present; ``In-Reply-To`` and
    ``References`` are set when the original has a Message-ID.
    """
    response_html = response_html or ""
    quoted = _make_quoted_original(email_info.get("body_text", ""))
    closings = list(_BODY_CLOSE_RE.finditer(response_html))
    if closings:
        cut = closings[-1].start()
        reply_body = response_html[:cut] + quoted + response_html[cut:]
    else:
        reply_body = response_html + quoted

    msg = MIMEText(reply_body, "html", "utf-8")

    # Collapse folded whitespace/newlines so the header stays on one line.
    subj = " ".join((email_info.get("subject") or "").split())
    subj = subj or "[Risposta automatica] - YogaSoul"
    if not subj.lower().startswith("re:"):
        subj = f"Re: {subj}"
    msg["Subject"] = subj
    msg["From"] = EMAIL_ADDRESS
    if email_info.get("sender_email"):
        msg["To"] = email_info["sender_email"]

    message_id = " ".join((email_info.get("message_id") or "").split())
    if message_id:
        msg["In-Reply-To"] = message_id
        prev_refs = " ".join((email_info.get("references") or "").split())
        msg["References"] = f"{prev_refs} {message_id}".strip()
    return msg


def ensure_drafts_folder(mail):
    """Find or create the drafts folder; return ``(folder_name, delim)``.

    Order: an existing drafts mailbox, then ``INBOX<delim><PREFERRED_DRAFT_FOLDER>``
    (created if missing), then ``INBOX<delim>Drafts``.
    Raises RuntimeError if none can be selected.
    """
    drafts_name, delim = _find_drafts_mailbox(mail)
    if drafts_name:
        return drafts_name, delim

    folder = f"INBOX{delim}{PREFERRED_DRAFT_FOLDER}"
    typ, _ = mail.select(folder, readonly=False)
    if typ == "OK":
        return folder, delim

    mail.create(folder)
    typ, _ = mail.select(folder, readonly=False)
    if typ == "OK":
        return folder, delim

    candidate = f"INBOX{delim}Drafts"
    mail.create(candidate)
    typ, _ = mail.select(candidate, readonly=False)
    if typ == "OK":
        return candidate, delim

    raise RuntimeError("Impossibile trovare/creare una cartella bozze.")


def append_draft(mail, folder, msg):
    r"""APPEND *msg* to *folder* with the \Draft flag; return True on success."""
    flags = r"(\Draft)"
    when = imaplib.Time2Internaldate(time.time())
    typ, resp = mail.append(folder, flags, when, msg.as_bytes())
    print("[APPEND]", folder, "=>", typ, resp)
    return typ == "OK"


# ==========================
# MAIN
# ==========================
def main():
    """Run one pass: log in, draft replies for unread e-mails, log out.

    Returns early (after printing the reason) when the configuration is
    incomplete or the IMAP login fails. Errors inside the pipeline are
    printed with a traceback; the IMAP session is always closed.
    """
    if not assert_config():
        return

    try:
        mail = imaplib.IMAP4_SSL(IMAP_SERVER)
    except Exception as e:
        print(f"[ERRORE] Login IMAP fallito: {e}")
        traceback.print_exc()
        return

    try:
        try:
            mail.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
        except Exception as e:
            print(f"[ERRORE] Login IMAP fallito: {e}")
            traceback.print_exc()
            return

        drafts_folder, delim = ensure_drafts_folder(mail)
        print(f"[DEBUG] drafts_folder='{drafts_folder}' delim='{delim}'")

        emails = fetch_all_unseen(mail, limit=MAX_TO_PROCESS)
        if not emails:
            print("Nessuna nuova email da processare.")
            return

        kb = load_knowledge_base(KB_PATH)

        processed = 0
        failed = 0
        for info in emails:
            draft_html = generate_response(info, kb)
            mime_msg = prepare_reply_mime(info, draft_html)
            ok = append_draft(mail, drafts_folder, mime_msg)
            if ok:
                processed += 1
                if MARK_AS_SEEN:
                    try:
                        mail.store(info["seq_id"], "+FLAGS", "\\Seen")
                    except Exception as e:
                        print(f"[WARN] Non sono riuscito a marcare come letta id {info['seq_id']}: {e}")
            else:
                # Do not report a draft as saved when APPEND was rejected.
                failed += 1
            if THROTTLE_SECONDS:
                time.sleep(THROTTLE_SECONDS)

        print(f"✅ Elaborate {processed} email (max {MAX_TO_PROCESS}). Bozze salvate in '{drafts_folder}'.")
        if failed:
            print(f"[WARN] {failed} bozze non salvate (APPEND fallito).")
    except Exception as e:
        # Top-level boundary of the script: report and fall through to cleanup.
        print(f"[ERRORE] Pipeline: {e}")
        traceback.print_exc()
    finally:
        # Best-effort logout; the connection may already be broken.
        try:
            mail.logout()
        except Exception:
            pass


if __name__ == "__main__":
    main()