yogibook_aury_new/public/yogasoulscript/email_autoresponder_yogasoul.py
2025-09-07 19:31:40 +02:00

436 lines
16 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import re
import time
import json
import imaplib
import email
import traceback
import html as html_lib
from email.utils import parseaddr
from email.mime.text import MIMEText
from email.header import decode_header, make_header
from datetime import datetime
# ==========================
# CONFIG
# ==========================
# SECURITY: credentials are read from the environment so that secrets never
# live in the source tree. The fallback placeholders are exactly the values
# that assert_config() rejects, so a missing variable stops the run with a
# clear message instead of attempting a login. Any password or API key that
# was ever committed in this file must be considered leaked and rotated.
IMAP_SERVER = "mail.yogasoul.it"
EMAIL_ADDRESS = "info@yogasoul.it"
EMAIL_PASSWORD = os.environ.get("YOGASOUL_EMAIL_PASSWORD", "INSERISCI_LA_PASSWORD")
OPENAI_API_KEY = os.environ.get("OPENAI_API_KEY", "INSERISCI_OPENAI_API_KEY")
KB_PATH = "yogasoul_knowledge_base.json"
OPENAI_MODEL = "gpt-3.5-turbo"
PREFERRED_DRAFT_FOLDER = "BozzaRisposte"  # used as INBOX<delim><this> when no drafts mailbox is found
MARK_AS_SEEN = True  # set to False to leave processed emails unread
THROTTLE_SECONDS = 0  # delay between emails, in seconds (0 = none)
MAX_TO_PROCESS = 5  # process at most the N most recent UNSEEN emails
# ==========================
# UTILS
# ==========================
def _decode_mime_words(s):
    """Decode an RFC 2047 encoded header value into a plain string.

    The strict path uses ``make_header``. When that fails (for example on an
    unknown or mislabelled charset) each chunk is decoded leniently instead,
    and as a last resort the raw value is returned as text.

    Args:
        s: Raw header value; falsy values yield ``""``.

    Returns:
        The decoded header text.
    """
    if not s:
        return ""
    try:
        return str(make_header(decode_header(s)))
    except Exception:
        try:
            chunks = decode_header(s)
        except Exception:
            # Not even tokenizable (e.g. broken base64): keep the raw text.
            return str(s)
        pieces = []
        for text, enc in chunks:
            if isinstance(text, bytes):
                try:
                    pieces.append(text.decode(enc or "utf-8", errors="ignore"))
                except LookupError:
                    # Unknown charset label: the same condition that breaks
                    # make_header(), so it must not escape from the fallback.
                    pieces.append(text.decode("utf-8", errors="ignore"))
            else:
                pieces.append(text or "")
        return "".join(pieces)
def _html_to_text(html):
    """Convert an HTML fragment to rough plain text.

    ``<script>``/``<style>`` elements are dropped, ``<br>`` becomes a newline,
    ``</p>`` becomes a blank line, all remaining tags are removed and character
    references (``&egrave;``, ``&amp;`` ...) are decoded.

    Args:
        html: HTML source; ``None`` is treated as empty.

    Returns:
        The stripped plain text. If processing fails (e.g. a non-string
        argument) the input is returned unchanged, or ``""`` when falsy.
    """
    try:
        text = re.sub(r"(?is)<(script|style).*?>.*?</\1>", "", html or "")
        # Tag names are case-insensitive in HTML, so match <BR> / </P> too.
        text = re.sub(r"(?is)<br\s*/?>", "\n", text)
        text = re.sub(r"(?is)</p\s*>", "\n\n", text)
        text = re.sub(r"(?s)<.*?>", "", text)
        # Unescape only after the tags are gone, so "&lt;b&gt;" stays text.
        return html_lib.unescape(text).strip()
    except Exception:
        return html or ""
def _ensure_html_blocks(s):
    """Return ``s`` as a complete HTML document.

    * Text that already contains an ``<html`` tag is returned as is (stripped),
      so a full document is never nested inside a second ``<html><body>``.
    * Text that contains ``<p`` or ``<br`` is treated as an HTML fragment and
      only wrapped in the document skeleton.
    * Anything else is treated as plain text: it is HTML-escaped, blank lines
      separate ``<p>`` paragraphs and single newlines become ``<br>``.

    Args:
        s: Model output (HTML or plain text); ``None`` is treated as empty.

    Returns:
        An HTML document string.
    """
    s = (s or "").strip()
    lowered = s.lower()
    if "<html" in lowered:
        return s
    if "<p" in lowered or "<br" in lowered:
        body = s
    else:
        paragraphs = [p.strip() for p in s.split("\n\n") if p.strip()]
        # Concatenate instead of using an f-string here: a backslash inside an
        # f-string expression is a SyntaxError before Python 3.12.
        body = "".join(
            "<p>" + html_lib.escape(p).replace("\n", "<br>") + "</p>"
            for p in (paragraphs or [s])
        )
    return f"<!doctype html><html><body>{body}</body></html>"
def _make_quoted_original(body_text):
    """Build the HTML block that quotes the original message.

    The text is HTML-escaped and its newlines are turned into ``<br>`` before
    being placed in a styled ``<blockquote>`` under a separator line.

    Args:
        body_text: Plain text of the message being answered.

    Returns:
        The HTML snippet, or ``""`` when there is nothing to quote.
    """
    if not body_text:
        return ""
    quoted = "<br>".join(html_lib.escape(body_text).split("\n"))
    heading = "<hr><p style='color:#666; font-size:90%'>— Messaggio originale —</p>"
    opening = "<blockquote style='margin:0 0 0 1em; padding-left:1em; border-left:3px solid #ddd'>"
    return heading + opening + quoted + "</blockquote>"
def load_knowledge_base(path=KB_PATH):
    """Load the JSON knowledge base used to ground the generated replies.

    Loading is best-effort: a missing file or a read/parse error is reported
    on stdout and an empty dict is returned so the pipeline can continue.

    Args:
        path: Location of the JSON file.

    Returns:
        The parsed JSON content, or ``{}`` when it cannot be loaded.
    """
    if os.path.isfile(path):
        try:
            with open(path, "r", encoding="utf-8") as kb_file:
                return json.loads(kb_file.read())
        except Exception as exc:
            print(f"[ERRORE] Lettura KB: {exc}. Proseguo senza.")
            return {}
    print(f"[ATTENZIONE] KB non trovata: {path}. Proseguo senza.")
    return {}
def assert_config():
    """Check that the mandatory configuration constants are filled in.

    Each of IMAP_SERVER, EMAIL_ADDRESS, EMAIL_PASSWORD and OPENAI_API_KEY must
    be a non-blank string; the two secrets must also differ from their
    placeholder values. Every problem found is printed.

    Returns:
        ``True`` when the configuration is usable, ``False`` otherwise.
    """
    def _blank(value):
        return not isinstance(value, str) or not value.strip()

    checks = (
        (_blank(IMAP_SERVER), "IMAP_SERVER mancante"),
        (_blank(EMAIL_ADDRESS), "EMAIL_ADDRESS mancante"),
        (
            _blank(EMAIL_PASSWORD) or EMAIL_PASSWORD == "INSERISCI_LA_PASSWORD",
            "EMAIL_PASSWORD non impostata",
        ),
        (
            _blank(OPENAI_API_KEY) or OPENAI_API_KEY == "INSERISCI_OPENAI_API_KEY",
            "OPENAI_API_KEY non impostata",
        ),
    )
    problems = [message for failed, message in checks if failed]
    if not problems:
        return True
    print("[CONFIG] Correggi questi parametri prima di proseguire:")
    for problem in problems:
        print(" -", problem)
    return False
# ==========================
# IMAP helpers robusti
# ==========================
# One IMAP LIST response line: (flags) "delimiter"|NIL mailbox-name
# The hierarchy delimiter may be the atom NIL (flat namespace); in that case
# the "delim" group is None.
LIST_RE = re.compile(
    r'^\s*\((?P<flags>[^)]*)\)\s+(?:"(?P<delim>[^"]+)"|[Nn][Ii][Ll])\s+(?P<name>.+?)\s*$'
)


def _parse_list_line(raw: bytes):
    """Parse one raw IMAP LIST response line.

    Args:
        raw: The response line as bytes.

    Returns:
        A ``(flags, delimiter, name)`` tuple where ``flags`` is a list of flag
        strings, ``delimiter`` defaults to ``"."`` when absent or NIL, and
        ``name`` is the mailbox name without surrounding double quotes.
        Lines that do not match the expected shape yield
        ``([], ".", <last whitespace-separated token>)``.
    """
    s = raw.decode(errors="ignore")
    m = LIST_RE.match(s)
    if not m:
        # Best effort for unexpected shapes: take the last token as the name.
        parts = s.strip().split()
        name = parts[-1] if parts else ""
        return ([], ".", name.strip('"'))
    flags = (m.group("flags") or "").split()
    delim = m.group("delim") or "."
    name = m.group("name").strip()
    if name.startswith('"') and name.endswith('"'):
        name = name[1:-1]
    return (flags, delim, name)
def _list_mailboxes(mail):
    """Return the account's mailboxes as parsed LIST entries.

    Args:
        mail: A logged-in ``imaplib`` connection.

    Returns:
        A list of ``(flags, delimiter, name)`` tuples as produced by
        ``_parse_list_line``; ``[]`` when the LIST command fails.
    """
    try:
        typ, boxes = mail.list()
    except Exception:
        return []
    if typ != "OK":
        return []

    parsed = []
    for raw in boxes or []:
        if isinstance(raw, tuple):
            # imaplib returns (header, literal) when the server sends the
            # mailbox name as an IMAP literal: rebuild a normal quoted line.
            head = raw[0] if raw and isinstance(raw[0], bytes) else b""
            literal = raw[1] if len(raw) > 1 and isinstance(raw[1], bytes) else b""
            raw = re.sub(rb"\{\d+\}\s*$", b"", head) + b'"' + literal + b'"'
        if not isinstance(raw, bytes) or not raw.strip():
            # imaplib reports "no data" as [None]; blank continuation lines
            # carry no mailbox either.
            continue
        try:
            parsed.append(_parse_list_line(raw))
        except Exception as exc:
            # One odd entry must not hide every other mailbox.
            print(f"[WARN] Riga LIST ignorata: {raw!r} ({exc})")
    return parsed
def _find_drafts_mailbox(mail):
    """Locate the account's drafts mailbox.

    Lookup order:
      1. a mailbox carrying the special-use ``\\Drafts`` flag (flags are
         compared case-insensitively);
      2. a mailbox whose last path component contains a common drafts name
         ("drafts", "bozze", "bozza", "draft").

    Args:
        mail: A logged-in ``imaplib`` connection.

    Returns:
        ``(name, delimiter)``; ``name`` is ``None`` when nothing matched. The
        delimiter falls back to that of the first listed mailbox, or ``"."``
        when the listing is empty.
    """
    boxes = _list_mailboxes(mail)
    if not boxes:
        return (None, ".")
    delim_guess = boxes[0][1] or "."

    # 1) special-use flag
    for flags, delim, name in boxes:
        if any("\\drafts" in flag.lower() for flag in flags):
            return (name, delim or delim_guess)

    # 2) common folder names, matched on the leaf component only
    wanted = ("drafts", "bozze", "bozza", "draft")
    for _flags, delim, name in boxes:
        leaf = name.lower().split(delim or ".")[-1]
        if any(word in leaf for word in wanted):
            return (name, delim or delim_guess)
    return (None, delim_guess)
def _select_or_create(mail, name, delim):
    """Select mailbox ``name``, creating it under INBOX when necessary.

    Tries ``name`` first, then ``INBOX<delim><name>``; if neither can be
    selected the INBOX-prefixed mailbox is created and selected.

    Args:
        mail: A logged-in ``imaplib`` connection.
        name: Mailbox name to select.
        delim: Hierarchy delimiter used to build the INBOX-prefixed name.

    Returns:
        The name of the mailbox that ended up selected.

    Raises:
        RuntimeError: If no candidate can be selected, even after creation.
    """
    def _selectable(box):
        status, _ = mail.select(box, readonly=False)
        return status == "OK"

    if _selectable(name):
        return name
    nested = f"INBOX{delim}{name}"
    if _selectable(nested):
        return nested
    mail.create(nested)
    if _selectable(nested):
        return nested
    raise RuntimeError(f"Impossibile selezionare o creare la casella '{name}' (delim='{delim}')")
# ==========================
# INTENT DETECTION (semplice)
# ==========================
# Keyword lists (Italian). Matching is plain substring search on the
# lower-cased subject + body.
SCHEDULE_WORDS = [
    "orario", "orari", "quando", "che ore", "a che ora", "giorni", "mercoledì", "martedì",
    "lezione di", "inizia", "finisce", "durata"
]
BOOKING_WORDS = [
    "prenota", "prenotazione", "prenotare", "iscriversi", "iscrizione", "link", "come fare", "dove prenoto"
]
INFO_WORDS = [
    "informazioni", "info", "cos'è", "che cos", "benefici", "a chi è adatto", "livello", "programma",
    "insegnante", "maestro", "costi", "prezzo", "quanto costa", "materiale", "cosa portare"
]


def classify_intent(subject, body):
    """Decide how detailed the generated reply should be.

    Args:
        subject: Email subject (may be ``None``).
        body: Email body text (may be ``None``).

    Returns:
        ``"extended"`` when any INFO_WORDS entry occurs in the message,
        ``"brief"`` otherwise (schedule/booking questions and anything
        unrecognised).
    """
    # Only the INFO match can change the outcome: schedule and booking
    # requests share the default "brief" answer, so SCHEDULE_WORDS and
    # BOOKING_WORDS are not consulted here.
    # NOTE(review): substring matching means "info" also fires on text such as
    # an "info@..." address; confirm whether that is intended.
    text = f"{subject or ''} {body or ''}".lower()
    if any(word in text for word in INFO_WORDS):
        return "extended"
    return "brief"
# ==========================
# OPENAI: GENERAZIONE BOZZA
# ==========================
def _nickname_from_sender(email_info):
    """Pick a first-name style greeting from the sender name or address."""
    name_tokens = (email_info.get("sender_name") or "").split()
    if name_tokens:
        return name_tokens[0]
    # No usable display name (missing or whitespace only): fall back to the
    # first dot-separated piece of the address local part.
    local_part = (email_info.get("sender_email") or "").split("@")[0]
    return local_part.split(".")[0] or "amico"


def generate_response(email_info, kb):
    """Ask the OpenAI chat API for an HTML reply draft to one email.

    Args:
        email_info: Dict describing the incoming email; the keys read here are
            ``sender_name``, ``sender_email``, ``subject`` and ``body_text``.
        kb: Knowledge base (JSON-serialisable) embedded in the prompt.

    Returns:
        The draft as an HTML document. On any failure (missing ``openai``
        package, API error, ...) the error is printed and a fixed apology
        draft is returned instead, so the caller always gets HTML.
    """
    try:
        from openai import OpenAI
        client = OpenAI(api_key=OPENAI_API_KEY)
        nickname = _nickname_from_sender(email_info)
        kb_json = json.dumps(kb, ensure_ascii=False, indent=2)
        intent = classify_intent(email_info.get("subject", ""), email_info.get("body_text", ""))
        if intent == "brief":
            policy = ("Se il messaggio chiede solo orari e/o come prenotare, rispondi BREVE: "
                      "indica orari precisi e inserisci SOLO il link prenotazione. Non aggiungere benefici o descrizioni.")
        else:
            policy = ("Se il messaggio chiede informazioni sul corso, rispondi ESTESO: "
                      "includi orari, benefici principali, a chi è adatto, eventuale insegnante, e il link prenotazione.")
        # NOTE(review): subject and body come from untrusted senders and are
        # pasted into the same prompt as the instructions, so a message can try
        # to steer the model. The output is only saved as a draft; keep a human
        # review step before sending.
        email_text = (
            f"Soggetto: {email_info.get('subject','')}\n"
            f"Mittente: {email_info.get('sender_email','')}\n"
            f"Corpo: {email_info.get('body_text','')}"
        )
        prompt = f"""
Sei Aurora, fondatrice di YogaSoul (www.yogasoul.it), stile zen e informale, diretto.
Rispondi in italiano, amichevole e rilassato, con emoticon yoga (🌿, 🧘‍♀️, 😊) senza esagerare.
Saluta con "Ciao {nickname}, bello sentirti!" e firma con "Namaste, Aurora - YogaSoul".
Usa la knowledge base per corsi, orari, prezzi, benefici, insegnanti.
Includi sempre il link_prenotazione specifico come <a href='link'>Iscriviti qui</a> per prenotazioni,
e il calendario <a href='https://yogasoul.it/wp-content/uploads/2025/08/Calendario-settembre-2025-2.jpg'>qui</a>.
Se non sai, scrivi: "Contattami per dettagli! 🧘‍♀️".
{policy}
Knowledge Base:
{kb_json}
Email ricevuta:
{email_text}
Scrivi la risposta in HTML pulito (usa <p>, <ul>/<li> se utile; niente CSS superfluo).
"""
        resp = client.chat.completions.create(
            model=OPENAI_MODEL,
            messages=[{"role": "user", "content": prompt}],
            max_tokens=700,
            temperature=0.7,
        )
        draft_raw = (resp.choices[0].message.content or "").strip()
        return _ensure_html_blocks(draft_raw)
    except Exception as e:
        print(f"[ERRORE] OpenAI: {e}")
        traceback.print_exc()
        return _ensure_html_blocks(
            "<p>Errore con l'AI, contattami per dettagli! 🧘‍♀️<br>"
            "Namaste, Aurora - YogaSoul</p>"
        )
# ==========================
# PIPELINE: processa TUTTE le UNSEEN (max N)
# ==========================
def _part_text(part):
    """Decode a MIME part's payload to text, or return None if it has none."""
    payload = part.get_payload(decode=True)
    if payload is None:
        return None
    # Honour the charset declared by the part; many messages are not UTF-8.
    charset = part.get_content_charset() or "utf-8"
    try:
        return payload.decode(charset, errors="ignore")
    except LookupError:
        # Unknown charset label: fall back to UTF-8.
        return payload.decode("utf-8", errors="ignore")


def _extract_body_text(msg):
    """Return the best plain-text rendering of a parsed email message."""
    if not msg.is_multipart():
        text = _part_text(msg)
        if text is None:
            return ""
        return _html_to_text(text) if msg.get_content_type() == "text/html" else text

    # Prefer the first non-attachment text/plain part that has a payload.
    body_text = ""
    for part in msg.walk():
        disp = str(part.get("Content-Disposition") or "").lower()
        if part.get_content_type() == "text/plain" and "attachment" not in disp:
            text = _part_text(part)
            if text is not None:
                body_text = text
                break
    if body_text:
        return body_text

    # Otherwise fall back to the first text/html part, converted to text.
    for part in msg.walk():
        if part.get_content_type() == "text/html":
            text = _part_text(part)
            if text is not None:
                return _html_to_text(text)
    return ""


def fetch_all_unseen(mail, limit=None):
    """Fetch the UNSEEN messages of INBOX without closing the connection.

    Messages are read with ``BODY.PEEK[]`` so that fetching does not set the
    ``\\Seen`` flag; marking them as read is left to the caller.

    Args:
        mail: A logged-in ``imaplib`` connection. INBOX is selected on it.
        limit: When a positive number, only the last ``limit`` ids returned by
            the UNSEEN search are fetched.

    Returns:
        A list of dicts, one per message, with the keys ``seq_id``,
        ``sender_email``, ``sender_name``, ``subject``, ``message_id``,
        ``references`` and ``body_text``. Messages whose fetch fails are
        skipped with a warning.

    Raises:
        RuntimeError: If INBOX cannot be selected or the search fails.
    """
    results = []
    typ, _ = mail.select("INBOX")
    if typ != "OK":
        raise RuntimeError("Impossibile selezionare INBOX")
    typ, data = mail.search(None, "UNSEEN")
    if typ != "OK":
        raise RuntimeError("Search UNSEEN fallita")
    # Ids usually come back in ascending order (oldest -> newest).
    ids = data[0].split() if data and data[0] else []
    if not ids:
        print("Nessuna email non letta.")
        return results
    if limit and limit > 0:
        ids = ids[-limit:]  # keep only the last N unread messages

    for seq_id in ids:
        typ, msg_data = mail.fetch(seq_id, "(BODY.PEEK[])")
        # The message bytes are the second item of the (header, literal) tuple;
        # other entries of the response are plain bytes.
        raw_message = next(
            (item[1] for item in (msg_data or []) if isinstance(item, tuple) and len(item) > 1),
            None,
        )
        if typ != "OK" or raw_message is None:
            print(f"[WARN] Fetch fallito per id {seq_id}")
            continue
        msg = email.message_from_bytes(raw_message)
        name, sender_addr = parseaddr(str(msg.get("From", "") or ""))
        results.append({
            "seq_id": seq_id,
            "sender_email": sender_addr,
            # Display names are often RFC 2047 encoded (accented letters).
            "sender_name": _decode_mime_words(name),
            "subject": _decode_mime_words(msg.get("Subject", "") or ""),
            "message_id": msg.get("Message-ID", "") or "",
            "references": msg.get("References", "") or "",
            "body_text": _extract_body_text(msg) or "",
        })
    return results
def prepare_reply_mime(email_info, response_html):
    """Build the HTML reply message with quoted original and thread headers.

    Args:
        email_info: Dict describing the email being answered; the keys read
            here are ``body_text``, ``subject``, ``sender_email``,
            ``message_id`` and ``references``.
        response_html: The generated reply as HTML.

    Returns:
        A ``MIMEText`` (text/html, UTF-8) message with Subject, From, To,
        In-Reply-To and References set where the information is available.
    """
    def _one_line(value):
        # Header values taken from the incoming message may be folded over
        # several lines; collapse all whitespace so they are written on one.
        return " ".join(str(value or "").split())

    response_html = response_html or ""
    quoted = _make_quoted_original(email_info.get("body_text", ""))
    # Insert the quote once, before the LAST closing body tag (any letter
    # case); without a closing tag it is simply appended.
    closers = list(re.finditer(r"</body\s*>", response_html, flags=re.IGNORECASE))
    if closers:
        cut = closers[-1].start()
        reply_body = response_html[:cut] + quoted + response_html[cut:]
    else:
        reply_body = response_html + quoted

    msg = MIMEText(reply_body, "html", "utf-8")
    subj = _one_line(email_info.get("subject")) or "[Risposta automatica] - YogaSoul"
    if not subj.lower().startswith("re:"):
        subj = f"Re: {subj}"
    msg["Subject"] = subj
    msg["From"] = EMAIL_ADDRESS
    recipient = _one_line(email_info.get("sender_email"))
    if recipient:
        msg["To"] = recipient

    message_id = _one_line(email_info.get("message_id"))
    if message_id:
        msg["In-Reply-To"] = message_id
        # References = the original's References followed by its Message-ID.
        prev_refs = _one_line(email_info.get("references", ""))
        msg["References"] = f"{prev_refs} {message_id}".strip()
    return msg
def ensure_drafts_folder(mail):
    """Find the drafts mailbox, creating one when the account has none.

    An existing drafts mailbox (as reported by ``_find_drafts_mailbox``) wins.
    Otherwise ``INBOX<delim><PREFERRED_DRAFT_FOLDER>`` is selected, or created
    and selected; as a last resort ``INBOX<delim>Drafts`` is created.

    Args:
        mail: A logged-in ``imaplib`` connection.

    Returns:
        ``(folder_name, delimiter)``.

    Raises:
        RuntimeError: If no drafts mailbox could be found or created.
    """
    existing, delim = _find_drafts_mailbox(mail)
    if existing:
        return existing, delim

    preferred = f"INBOX{delim}{PREFERRED_DRAFT_FOLDER}"
    status, _ = mail.select(preferred, readonly=False)
    if status != "OK":
        # Not selectable yet: create the preferred folder, then the generic
        # fallback, stopping at the first one that can be selected.
        for box in (preferred, f"INBOX{delim}Drafts"):
            mail.create(box)
            status, _ = mail.select(box, readonly=False)
            if status == "OK":
                return box, delim
        raise RuntimeError("Impossibile trovare/creare una cartella bozze.")
    return preferred, delim
def append_draft(mail, folder, msg):
    r"""Append ``msg`` to ``folder`` with the \Draft flag set.

    Args:
        mail: A logged-in ``imaplib`` connection.
        folder: Destination mailbox name.
        msg: An ``email`` message object; it is serialised with ``as_bytes()``.

    Returns:
        ``True`` when the server answers OK, ``False`` otherwise.
    """
    flags = r"(\Draft)"
    when = imaplib.Time2Internaldate(time.time())
    mailbox = folder
    already_quoted = folder.startswith('"') and folder.endswith('"')
    if any(ch.isspace() for ch in folder) and not already_quoted:
        # imaplib sends the mailbox argument as is, so a name containing
        # whitespace has to be passed as an IMAP quoted string.
        escaped = folder.replace("\\", "\\\\").replace('"', '\\"')
        mailbox = f'"{escaped}"'
    typ, resp = mail.append(mailbox, flags, when, msg.as_bytes())
    print("[APPEND]", folder, "=>", typ, resp)
    return typ == "OK"
# ==========================
# MAIN
# ==========================
def main():
    """Run one pass: draft a reply for each recent unread INBOX message.

    Steps: validate the configuration, log in over IMAP/SSL, locate (or
    create) the drafts mailbox, fetch up to MAX_TO_PROCESS unread messages,
    generate a reply for each and append it to the drafts mailbox, optionally
    marking the original as read. Errors are printed, never raised.
    """
    if not assert_config():
        return

    mail = None
    try:
        mail = imaplib.IMAP4_SSL(IMAP_SERVER)
        mail.login(EMAIL_ADDRESS, EMAIL_PASSWORD)
    except Exception as e:
        print(f"[ERRORE] Login IMAP fallito: {e}")
        traceback.print_exc()
        if mail is not None:
            # Connected but not authenticated: still close the socket.
            try:
                mail.logout()
            except Exception:
                pass
        return

    try:
        drafts_folder, delim = ensure_drafts_folder(mail)
        print(f"[DEBUG] drafts_folder='{drafts_folder}' delim='{delim}'")
        emails = fetch_all_unseen(mail, limit=MAX_TO_PROCESS)
        if not emails:
            print("Nessuna nuova email da processare.")
            return
        kb = load_knowledge_base(KB_PATH)
        processed = 0  # number of drafts actually saved
        for info in emails:
            ok = False
            try:
                draft_html = generate_response(info, kb)
                mime_msg = prepare_reply_mime(info, draft_html)
                ok = append_draft(mail, drafts_folder, mime_msg)
            except imaplib.IMAP4.abort:
                # The connection is gone: no point in trying further emails.
                raise
            except Exception as e:
                # One problematic email must not abort the whole batch.
                print(f"[ERRORE] Email id {info.get('seq_id')}: {e}")
                traceback.print_exc()
            if ok:
                processed += 1
                if MARK_AS_SEEN:
                    try:
                        mail.store(info["seq_id"], "+FLAGS", "\\Seen")
                    except Exception as e:
                        print(f"[WARN] Non sono riuscito a marcare come letta id {info['seq_id']}: {e}")
            else:
                print(f"[WARN] Bozza non salvata per id {info.get('seq_id')}")
            if THROTTLE_SECONDS:
                time.sleep(THROTTLE_SECONDS)
        print(f"✅ Elaborate {processed} email (max {MAX_TO_PROCESS}). Bozze salvate in '{drafts_folder}'.")
    except Exception as e:
        print(f"[ERRORE] Pipeline: {e}")
        traceback.print_exc()
    finally:
        # Single exit point for the connection, whatever happened above.
        try:
            mail.logout()
        except Exception:
            pass


if __name__ == "__main__":
    main()