yogibook_aury_new/public/yogasoulscript/email_autoresponder_yogasoul.py

512 lines
19 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import time
import json
import imaplib
import email
import traceback
import html as html_lib
from email.utils import parseaddr
from email.mime.text import MIMEText
from email.header import decode_header, make_header
from datetime import datetime
from pathlib import Path
import sys
# ==========================
# CONFIG da config.json (accanto allo script/EXE)
# ==========================
def app_dir() -> Path:
# se "frozen" (PyInstaller), usa la cartella dell'eseguibile
return Path(sys.executable).parent if getattr(sys, "frozen", False) else Path(__file__).resolve().parent
CONFIG_PATH = app_dir() / "config.json"
def load_config():
try:
with open(CONFIG_PATH, "r", encoding="utf-8") as f:
cfg = json.load(f)
print(f"[CONFIG] Caricato: {CONFIG_PATH}")
return cfg
except Exception as e:
print(f"[ERRORE] Impossibile leggere {CONFIG_PATH}: {e}")
return {}
CFG = load_config()
# valori letti dal config (con default sensati)
IMAP_SERVER = CFG.get("imap_server", "mail.yogasoul.it")
EMAIL_ADDRESS = CFG.get("email_address", "")
EMAIL_PASSWORD = CFG.get("email_password", "")
OPENAI_API_KEY = CFG.get("openai_api_key", "")
OPENAI_MODEL = CFG.get("openai_model", "gpt-3.5-turbo")
PREFERRED_DRAFT_FOLDER = CFG.get("preferred_draft_folder", "BozzaRisposte")
MARK_AS_SEEN = bool(CFG.get("mark_as_seen", True))
THROTTLE_SECONDS = float(CFG.get("throttle_seconds", 0) or 0)
MAX_TO_PROCESS = int(CFG.get("max_to_process", 5) or 5)
# Percorsi esterni (accanto all'eseguibile)
KB_PATH = str(app_dir() / CFG.get("kb_path", "yogasoul_knowledge_base.json"))
PROMPT_PATH = str(app_dir() / CFG.get("prompt_path", "prompt_template.txt"))
# ==========================
# UTILS
# ==========================
def _decode_mime_words(s):
if not s:
return ""
try:
return str(make_header(decode_header(s)))
except Exception:
parts = decode_header(s)
out = []
for text, enc in parts:
if isinstance(text, bytes):
out.append(text.decode(enc or "utf-8", errors="ignore"))
else:
out.append(text or "")
return "".join(out)
def _html_to_text(html):
try:
text = re.sub(r"(?is)<(script|style).*?>.*?</\1>", "", html or "")
text = re.sub(r"(?s)<br\s*/?>", "\n", text)
text = re.sub(r"(?s)</p\s*>", "\n\n", text)
text = re.sub(r"(?s)<.*?>", "", text)
return text.strip()
except Exception:
return html or ""
def _ensure_html_blocks(s):
"""Se il modello restituisce testo piatto, convertilo in HTML semplice e leggibile."""
s = (s or "").strip()
if "<html" in s.lower() or "<p" in s.lower() or "<br" in s.lower():
body = s
else:
parts = [p.strip() for p in s.split("\n\n") if p.strip()]
if len(parts) > 1:
body = "".join(
"<p>{}</p>".format(html_lib.escape(p).replace("\n", "<br>"))
for p in parts
)
else:
body = "<p>{}</p>".format(html_lib.escape(s).replace("\n", "<br>"))
return "<!doctype html><html><body>{}</body></html>".format(body)
def _make_quoted_original(body_text):
"""Crea il blocco citato del messaggio originale, safe-escaped."""
if not body_text:
return ""
escaped = html_lib.escape(body_text).replace("\n", "<br>")
return (
"<hr>"
"<p style='color:#666; font-size:90%'>— Messaggio originale —</p>"
f"<blockquote style='margin:0 0 0 1em; padding-left:1em; border-left:3px solid #ddd'>{escaped}</blockquote>"
)
def _inject_before_body_end(html_src: str, addition: str) -> str:
"""Inserisce 'addition' prima di </body> in modo case-insensitive; se manca </body>, appende."""
m = re.search(r'</\s*body\s*>', html_src, flags=re.I)
if not m:
return html_src + addition
start = m.start()
return html_src[:start] + addition + html_src[start:]
def load_knowledge_base(path=KB_PATH):
if not os.path.isfile(path):
print(f"[ATTENZIONE] KB non trovata: {path}. Proseguo senza.")
return {}
try:
with open(path, "r", encoding="utf-8") as f:
return json.load(f)
except Exception as e:
print(f"[ERRORE] Lettura KB: {e}. Proseguo senza.")
return {}
def assert_config():
problems = []
if not isinstance(IMAP_SERVER, str) or not IMAP_SERVER.strip():
problems.append("imap_server mancante in config.json")
if not isinstance(EMAIL_ADDRESS, str) or not EMAIL_ADDRESS.strip():
problems.append("email_address mancante in config.json")
if not isinstance(EMAIL_PASSWORD, str) or not EMAIL_PASSWORD.strip():
problems.append("email_password mancante in config.json")
if not isinstance(OPENAI_API_KEY, str) or not OPENAI_API_KEY.strip():
problems.append("openai_api_key mancante in config.json")
if problems:
print("[CONFIG] Correggi config.json:")
for p in problems:
print(" -", p)
return False
return True
# ==========================
# Prompt esterno
# ==========================
def load_prompt(path: str) -> str:
try:
with open(path, "r", encoding="utf-8") as f:
print(f"[PROMPT] Caricato: {path}")
return f.read()
except Exception as e:
print(f"[ATTENZIONE] Prompt non trovato o illeggibile: {path} ({e})")
return ""
def render_prompt(template: str, **vars_) -> str:
# Sostituzione semplice stile {{nome}}
out = template
for k, v in vars_.items():
out = out.replace(f"{{{{{k}}}}}", str(v))
return out
# ==========================
# IMAP helpers robusti
# ==========================
LIST_RE = re.compile(r'^\s*\((?P<flags>[^)]*)\)\s+"(?P<delim>[^"]+)"\s+(?P<name>.+?)\s*$')
def _parse_list_line(raw: bytes):
s = raw.decode(errors="ignore")
m = LIST_RE.match(s)
if not m:
parts = s.strip().split()
name = parts[-1] if parts else ""
name = name.strip('"')
return ([], ".", name)
flags_str = m.group("flags") or ""
delim = m.group("delim") or "."
name = m.group("name").strip()
if name.startswith('"') and name.endswith('"'):
name = name[1:-1]
flags = [f.strip() for f in flags_str.split() if f.strip()]
return (flags, delim, name)
def _list_mailboxes(mail):
try:
typ, boxes = mail.list()
if typ != "OK":
return []
return [_parse_list_line(raw) for raw in (boxes or [])]
except Exception:
return []
def _find_drafts_mailbox(mail):
boxes = _list_mailboxes(mail)
if not boxes:
return (None, ".")
delim_guess = boxes[0][1] if boxes[0][1] else "."
# special-use \Drafts
for flags, delim, name in boxes:
if any("\\Drafts" in f or "\\drafts" in f for f in flags):
return (name, delim or delim_guess)
# nomi comuni
wanted = ("drafts", "bozze", "bozza", "draft")
for flags, delim, name in boxes:
low = name.lower()
if any(w in low.split(delim or ".")[-1] for w in wanted):
return (name, delim or delim_guess)
return (None, delim_guess)
def _select_or_create(mail, name, delim):
typ, _ = mail.select(name, readonly=False)
if typ == "OK":
return name
candidate = f"INBOX{delim}{name}"
typ, _ = mail.select(candidate, readonly=False)
if typ == "OK":
return candidate
mail.create(candidate)
typ, _ = mail.select(candidate, readonly=False)
if typ == "OK":
return candidate
raise RuntimeError(f"Impossibile selezionare o creare la casella '{name}' (delim='{delim}')")
# ==========================
# INTENT DETECTION (semplice)
# ==========================
SCHEDULE_WORDS = [
"orario", "orari", "quando", "che ore", "a che ora", "giorni", "mercoledì", "martedì",
"lezione di", "inizia", "finisce", "durata"
]
BOOKING_WORDS = [
"prenota", "prenotazione", "prenotare", "iscriversi", "iscrizione", "link", "come fare", "dove prenoto"
]
INFO_WORDS = [
"informazioni", "info", "cos'è", "che cos", "benefici", "a chi è adatto", "livello", "programma",
"insegnante", "maestro", "costi", "prezzo", "quanto costa", "materiale", "cosa portare"
]
def classify_intent(subject, body):
s = f"{subject or ''} {body or ''}".lower()
has_info = any(w in s for w in INFO_WORDS)
has_sched = any(w in s for w in SCHEDULE_WORDS)
has_book = any(w in s for w in BOOKING_WORDS)
if has_info:
return "extended"
if (has_sched or has_book) and not has_info:
return "brief"
return "brief"
# ==========================
# OPENAI: GENERAZIONE BOZZA
# ==========================
def generate_response(email_info, kb):
try:
from openai import OpenAI
client = OpenAI(api_key=OPENAI_API_KEY)
nickname = "amico"
if email_info.get("sender_name"):
nickname = email_info["sender_name"].split()[0]
elif email_info.get("sender_email"):
nickname = email_info["sender_email"].split("@")[0].split(".")[0] or "amico"
kb_json = json.dumps(kb, ensure_ascii=False, indent=2)
intent = classify_intent(email_info.get("subject",""), email_info.get("body_text",""))
if intent == "brief":
policy = ("Se il messaggio chiede solo orari e/o come prenotare, rispondi BREVE: "
"indica orari precisi e inserisci SOLO il link prenotazione. Non aggiungere benefici o descrizioni.")
else:
policy = ("Se il messaggio chiede informazioni sul corso, rispondi ESTESO: "
"includi orari, benefici principali, a chi è adatto, eventuale insegnante, e il link prenotazione.")
email_text = (
f"Soggetto: {email_info.get('subject','')}\n"
f"Mittente: {email_info.get('sender_email','')}\n"
f"Corpo: {email_info.get('body_text','')}"
)
# Prompt da file esterno (con fallback interno)
template = load_prompt(PROMPT_PATH)
if not template:
template = (
"Sei Aurora, fondatrice di YogaSoul (www.yogasoul.it), stile zen e informale, diretto.\n"
"Rispondi in italiano, amichevole e rilassato, con emoticon yoga (🌿, 🧘‍♀️, 😊) senza esagerare.\n"
"Saluta con \"Ciao {{nickname}}, bello sentirti!\" e firma con \"Namaste, Aurora - YogaSoul\".\n"
"Usa la knowledge base per corsi, orari, prezzi, benefici, insegnanti.\n"
"Includi sempre il link_prenotazione specifico come <a href='link'>Iscriviti qui</a> per prenotazioni,\n"
"e il calendario <a href='https://yogasoul.it/wp-content/uploads/2025/08/Calendario-settembre-2025-2.jpg'>qui</a>.\n"
"Se non sai, scrivi: \"Contattami per dettagli! 🧘‍♀️\".\n\n"
"{{policy}}\n\n"
"Knowledge Base:\n"
"{{kb_json}}\n\n"
"Email ricevuta:\n"
"{{email_text}}\n\n"
"Scrivi la risposta in HTML pulito (usa <p>, <ul>/<li> se utile; niente CSS superfluo).\n"
)
prompt = render_prompt(
template,
nickname=nickname,
policy=policy,
kb_json=kb_json,
email_text=email_text,
)
resp = client.chat.completions.create(
model=OPENAI_MODEL,
messages=[{"role": "user", "content": prompt}],
max_tokens=700,
temperature=0.7,
)
draft_raw = (resp.choices[0].message.content or "").strip()
return _ensure_html_blocks(draft_raw)
except Exception as e:
print(f"[ERRORE] OpenAI: {e}")
traceback.print_exc()
return _ensure_html_blocks(
"<p>Errore con l'AI, contattami per dettagli! 🧘‍♀️<br>"
"Namaste, Aurora - YogaSoul</p>"
)
# ==========================
# PIPELINE: processa TUTTE le UNSEEN (max N)
# ==========================
def fetch_all_unseen(mail, limit=None):
"""Ritorna lista di dict (uno per messaggio) + seq id, senza chiudere la connessione IMAP."""
results = []
typ, _ = mail.select("INBOX")
if typ != "OK":
raise RuntimeError("Impossibile selezionare INBOX")
typ, data = mail.search(None, "UNSEEN")
if typ != "OK":
raise RuntimeError("Search UNSEEN fallita")
ids = data[0].split() # solitamente in ordine crescente (più vecchie -> più nuove)
if not ids:
print("Nessuna email non letta.")
return results
if limit and limit > 0:
ids = ids[-limit:] # prendi SOLO le ultime N non lette
for seq_id in ids:
typ, msg_data = mail.fetch(seq_id, "(RFC822)")
if typ != "OK" or not msg_data or not msg_data[0]:
print(f"[WARN] Fetch fallito per id {seq_id}")
continue
msg = email.message_from_bytes(msg_data[0][1])
name, sender_addr = parseaddr(msg.get("From", "") or "")
subject = _decode_mime_words(msg.get("Subject", "") or "")
message_id = msg.get("Message-ID", "") or ""
references = msg.get("References", "") or ""
body_text = ""
if msg.is_multipart():
for part in msg.walk():
ctype = part.get_content_type()
disp = (part.get("Content-Disposition") or "").lower()
if ctype == "text/plain" and "attachment" not in disp:
payload = part.get_payload(decode=True)
if payload is not None:
body_text = payload.decode(errors="ignore")
break
if not body_text:
for part in msg.walk():
if part.get_content_type() == "text/html":
payload = part.get_payload(decode=True)
if payload is not None:
body_text = _html_to_text(payload.decode(errors="ignore"))
break
else:
payload = msg.get_payload(decode=True)
if payload is not None:
try:
body_text = payload.decode(errors="ignore")
except Exception:
body_text = _html_to_text(payload.decode(errors="ignore"))
results.append({
"seq_id": seq_id,
"sender_email": sender_addr,
"sender_name": name,
"subject": subject,
"message_id": message_id,
"references": references,
"body_text": body_text or "",
})
return results
def prepare_reply_mime(email_info, response_html):
"""Costruisce il MIME HTML della risposta con quote e thread headers."""
reply_body = _inject_before_body_end(
response_html,
_make_quoted_original(email_info.get("body_text",""))
)
msg = MIMEText(reply_body, "html", "utf-8")
subj = email_info.get("subject") or "[Risposta automatica] - YogaSoul"
if not subj.lower().startswith("re:"):
subj = f"Re: {subj}"
msg["Subject"] = subj
msg["From"] = EMAIL_ADDRESS
if email_info.get("sender_email"):
msg["To"] = email_info["sender_email"]
if email_info.get("message_id"):
msg["In-Reply-To"] = email_info["message_id"]
prev_refs = email_info.get("references", "")
if prev_refs and email_info.get("message_id"):
msg["References"] = (prev_refs + " " + email_info["message_id"]).strip()
elif email_info.get("message_id"):
msg["References"] = email_info["message_id"]
return msg
def ensure_drafts_folder(mail):
"""Trova o crea la cartella bozze; ritorna (folder_name, delim)."""
drafts_name, delim = _find_drafts_mailbox(mail)
if drafts_name:
return drafts_name, delim
folder = f"INBOX{delim}{PREFERRED_DRAFT_FOLDER}"
typ, _ = mail.select(folder, readonly=False)
if typ == "OK":
return folder, delim
mail.create(folder)
typ, _ = mail.select(folder, readonly=False)
if typ == "OK":
return folder, delim
candidate = f"INBOX{delim}Drafts"
mail.create(candidate)
typ, _ = mail.select(candidate, readonly=False)
if typ == "OK":
return candidate, delim
raise RuntimeError("Impossibile trovare/creare una cartella bozze.")
def append_draft(mail, folder, msg):
"""Esegue APPEND con flag \Draft nella folder indicata."""
flags = r"(\Draft)"
when = imaplib.Time2Internaldate(time.time())
typ, resp = mail.append(folder, flags, when, msg.as_bytes())
print("[APPEND]", folder, "=>", typ, resp)
return typ == "OK"
# ==========================
# MAIN
# ==========================
def main():
# Controllo config di base
problems = []
if not isinstance(IMAP_SERVER, str) or not IMAP_SERVER.strip():
problems.append("imap_server mancante in config.json")
if not isinstance(EMAIL_ADDRESS, str) or not EMAIL_ADDRESS.strip():
problems.append("email_address mancante in config.json")
if not isinstance(EMAIL_PASSWORD, str) or not EMAIL_PASSWORD.strip():
problems.append("email_password mancante in config.json")
if not isinstance(OPENAI_API_KEY, str) or not OPENAI_API_KEY.strip():
problems.append("openai_api_key mancante in config.json")
if problems:
print("[CONFIG] Correggi config.json:")
for p in problems:
print(" -", p)
return
# Login IMAP
try:
mail = imaplib.IMAP4_SSL(IMAP_SERVER)
mail.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
except Exception as e:
print(f"[ERRORE] Login IMAP fallito: {e}")
traceback.print_exc()
return
try:
drafts_folder, delim = ensure_drafts_folder(mail)
print(f"[DEBUG] drafts_folder='{drafts_folder}' delim='{delim}'")
emails = fetch_all_unseen(mail, limit=MAX_TO_PROCESS)
if not emails:
print("Nessuna nuova email da processare.")
mail.logout()
return
kb = load_knowledge_base(KB_PATH)
processed = 0
for info in emails:
draft_html = generate_response(info, kb)
mime_msg = prepare_reply_mime(info, draft_html)
ok = append_draft(mail, drafts_folder, mime_msg)
if ok and MARK_AS_SEEN:
try:
mail.store(info["seq_id"], "+FLAGS", "\\Seen")
except Exception as e:
print(f"[WARN] Non sono riuscito a marcare come letta id {info['seq_id']}: {e}")
processed += 1
if THROTTLE_SECONDS:
time.sleep(THROTTLE_SECONDS)
print(f"✅ Elaborate {processed} email (max {MAX_TO_PROCESS}). Bozze salvate in '{drafts_folder}'.")
mail.logout()
except Exception as e:
print(f"[ERRORE] Pipeline: {e}")
traceback.print_exc()
try:
mail.logout()
except Exception:
pass
if __name__ == "__main__":
main()