"""
Telegram AI Chatbot — Multi-persona, owner-protected, semantic memory
Powered by Groq API + PostgreSQL 16 + pgvector + Ollama embeddings

Config files:
    .env             — persona, bot name, trigger, rules (different per deployment)
    .env_credentials — secret keys, DB and Ollama credentials
"""

import os
import re
import json
import logging
import asyncio
import aiohttp
from urllib.parse import quote_plus
import datetime
import zoneinfo
import traceback
from functools import wraps

from dotenv import load_dotenv

load_dotenv(".env_credentials")
load_dotenv(".env")

import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2 import pool as pg_pool

from telegram import Update
from telegram.ext import (
    Application, CommandHandler, MessageHandler,
    filters, ContextTypes
)
from groq import Groq
try:
    from openai import OpenAI
except ImportError:
    OpenAI = None


# ══════════════════════════════════════════════
# CONFIG
# ══════════════════════════════════════════════

# ── Credentials (.env_credentials) ────────────
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "groq")
GROQ_MODEL = os.getenv("GROQ_MODEL", "llama-3.3-70b-versatile")
OPENROUTER_MODEL = os.getenv("OPENROUTER_MODEL", "deepseek/deepseek-chat-v3-0324")

PG_CONFIG = {
    "host": os.getenv("PG_HOST", "localhost"),
    "port": int(os.getenv("PG_PORT", "5432")),
    "dbname": os.getenv("PG_DB", "manel_bot"),
    "user": os.getenv("PG_USER", "postgres"),
    "password": os.getenv("PG_PASSWORD", ""),
}

OWNER_ID = int(os.getenv("OWNER_TELEGRAM_ID", "0"))

# NewsData.io
NEWSDATA_API_KEY = os.getenv("NEWSDATA_API_KEY", "")

# Ollama endpoint and embedding model
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
EMBED_MODEL = os.getenv("EMBED_MODEL", "nomic-embed-text-v2-moe")
# nomic-embed-text-v2-moe output dimension — update if you switch models
EMBED_DIM = int(os.getenv("EMBED_DIM", "768"))
# How many semantically similar results to return per search_memory call
VECTOR_TOP_K = int(os.getenv("VECTOR_TOP_K", "8"))

# ── Persona (.env) ─────────────────────────────
BOT_NAME = os.getenv("BOT_NAME", "Manel")
BOT_TRIGGER = os.getenv("BOT_TRIGGER", "manel")
BOT_MAX_CONTEXT = int(os.getenv("BOT_MAX_CONTEXT", "40"))
BOT_HISTORY_SAVE = int(os.getenv("BOT_HISTORY_SAVE", "500"))

BOT_ROLE = os.getenv("BOT_ROLE", "")
BOT_RULES = os.getenv("BOT_RULES", "")
BOT_TOOL_RULES = os.getenv("BOT_TOOL_RULES", "")
BOT_MEMORY_RULES = os.getenv("BOT_MEMORY_RULES", "")
BOT_EXAMPLES = os.getenv("BOT_EXAMPLES", "")
BOT_TIMEZONE = os.getenv("BOT_TIMEZONE", "Europe/Lisbon")  # home timezone for date/time display
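
# Illustrative .env sketch (hypothetical values, not shipped defaults):
#
#   BOT_NAME=Manel
#   BOT_TRIGGER=manel
#   BOT_TIMEZONE=Europe/Lisbon
#   BOT_RULES="Responde sempre em português europeu.
#   Sê breve: duas ou três frases no máximo."
#
# BOT_RULES, BOT_TOOL_RULES and BOT_MEMORY_RULES may span multiple lines
# (python-dotenv keeps quoted multi-line values intact); build_system_prompt()
# below turns each non-empty line into a "- " bullet, while BOT_ROLE and
# BOT_EXAMPLES are inserted verbatim.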

# ─────────────────────────────────────────────
logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)

if LLM_PROVIDER == "openrouter":
    if OpenAI is None:
        raise ImportError("openai package not installed. Run: pip install openai")
    llm_client = OpenAI(
        api_key=OPENROUTER_API_KEY,
        base_url="https://openrouter.ai/api/v1",
        default_headers={"HTTP-Referer": "https://github.com/manel-bot"}
    )
    LLM_MODEL = OPENROUTER_MODEL
else:
    llm_client = Groq(api_key=GROQ_API_KEY)
    LLM_MODEL = GROQ_MODEL

pg_pool_conn: pg_pool.ThreadedConnectionPool | None = None


# ══════════════════════════════════════════════
# SYSTEM PROMPT BUILDER
# ══════════════════════════════════════════════
def build_system_prompt(user_first_name: str, memories: list[str]) -> str:
    def env_to_bullets(raw: str) -> str:
        return "\n".join("- " + l.strip() for l in raw.splitlines() if l.strip())

    memory_block = ""
    if memories:
        mem_lines = "\n".join("- " + m for m in memories)
        memory_block = "\n\n[LONG-TERM MEMORY about users in this group]\n" + mem_lines

    sections = []
    if BOT_ROLE:
        sections.append("# Role\n" + BOT_ROLE)
    if BOT_RULES:
        sections.append("# Rules\n" + env_to_bullets(BOT_RULES))
    if BOT_TOOL_RULES:
        sections.append("# Tool Selection Strategy\n" + env_to_bullets(BOT_TOOL_RULES))
    if BOT_MEMORY_RULES:
        sections.append("# Memory Rules\n" + env_to_bullets(BOT_MEMORY_RULES))
    if BOT_EXAMPLES:
        sections.append("# Examples\n" + BOT_EXAMPLES)

    try:
        home_tz = zoneinfo.ZoneInfo(BOT_TIMEZONE)
    except Exception:
        home_tz = datetime.timezone.utc
    now_home = datetime.datetime.now(datetime.timezone.utc).astimezone(home_tz)
    date_str = now_home.strftime("%A, %d %B %Y")
    time_str = now_home.strftime("%H:%M")

    time_block = (
        f"Today is: {date_str}\n"
        f"Current time ({BOT_TIMEZONE}): {time_str}\n"
        "For other timezones, use the web_search tool."
    )

    sections.append("# Current Date and Time\n" + time_block)

    sections.append(
        "# Current User\n"
        "The user who mentioned you is: " + user_first_name
        + memory_block
    )
    return "\n\n".join(sections)


# ══════════════════════════════════════════════
# OLLAMA EMBEDDINGS
# ══════════════════════════════════════════════
async def get_embedding(text: str) -> list[float] | None:
    """Call local Ollama to embed text. Returns a list of floats, or None on error."""
    url = OLLAMA_URL.rstrip("/") + "/api/embed"
    payload = {"model": EMBED_MODEL, "input": text}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=3)) as resp:
                if resp.status != 200:
                    log.warning(f"Ollama embed HTTP {resp.status}: {await resp.text()}")
                    return None
                data = await resp.json()
                # Ollama /api/embed returns {"embeddings": [[...]]}
                embeddings = data.get("embeddings")
                if embeddings and len(embeddings) > 0:
                    return embeddings[0]
                log.warning(f"Unexpected Ollama response: {data}")
                return None
    except Exception as e:
        log.error(f"Ollama embedding error: {e}")
        return None


# ══════════════════════════════════════════════
# DATABASE — PostgreSQL 16 + pgvector
# ══════════════════════════════════════════════
def init_db():
    global pg_pool_conn
    pg_pool_conn = pg_pool.ThreadedConnectionPool(1, 10, **PG_CONFIG)
    with get_db() as con:
        with con.cursor() as cur:
            # Enable pgvector extension
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")

            cur.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    user_id    TEXT PRIMARY KEY,
                    username   TEXT,
                    full_name  TEXT,
                    first_seen TIMESTAMPTZ DEFAULT NOW()
                );
            """)

            cur.execute("""
                CREATE TABLE IF NOT EXISTS memories (
                    id       SERIAL PRIMARY KEY,
                    bot_name TEXT NOT NULL DEFAULT 'default',
                    user_id  TEXT NOT NULL,
                    memory   TEXT NOT NULL,
                    created  TIMESTAMPTZ DEFAULT NOW(),
                    FOREIGN KEY (user_id) REFERENCES users(user_id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_memories_bot_user
                    ON memories(bot_name, user_id);
            """)

            # History table with vector column for semantic search
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS history (
                    id        SERIAL PRIMARY KEY,
                    bot_name  TEXT NOT NULL DEFAULT 'default',
                    chat_id   TEXT NOT NULL,
                    user_id   TEXT,
                    username  TEXT,
                    role      TEXT NOT NULL,
                    content   TEXT NOT NULL,
                    embedding vector({EMBED_DIM}),
                    created   TIMESTAMPTZ DEFAULT NOW()
                );
                CREATE INDEX IF NOT EXISTS idx_history_bot_chat
                    ON history(bot_name, chat_id, created DESC);
            """)

            # IVFFlat index for fast vector search (built lazily once rows exist)
            # We create it only if it doesn't exist yet
            cur.execute("""
                SELECT 1 FROM pg_indexes
                WHERE tablename = 'history' AND indexname = 'idx_history_embedding';
            """)
            if not cur.fetchone():
                # Only create after enough rows accumulate — skip silently for now.
                # The bot will still work via sequential scan until you run:
                #   CREATE INDEX idx_history_embedding ON history
                #   USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
                log.info("Vector index not yet created — will use seq scan until you create it manually.")

        con.commit()
    log.info(f"PostgreSQL + pgvector ready. Bot={BOT_NAME} dim={EMBED_DIM}")
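

# A sketch of the manual step referenced in init_db(): build the IVFFlat index
# once the history table has enough rows for meaningful list centroids.
# ensure_vector_index() and its 10_000-row threshold are illustrative, not part
# of the original deployment; call it by hand or from a maintenance job.
def ensure_vector_index(min_rows: int = 10_000):
    with get_db() as con:
        with con.cursor() as cur:
            cur.execute("SELECT COUNT(*) FROM history WHERE embedding IS NOT NULL;")
            if cur.fetchone()[0] < min_rows:
                return  # too few rows: a sequential scan is still fine
            cur.execute("""
                CREATE INDEX IF NOT EXISTS idx_history_embedding ON history
                USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
            """)
        con.commit()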


class get_db:
    """Check a pooled connection out on enter and back in on exit."""

    def __enter__(self):
        self.con = pg_pool_conn.getconn()
        return self.con

    def __exit__(self, exc_type, exc_val, exc_tb):
        pg_pool_conn.putconn(self.con)
        return False  # don't suppress exceptions


# ── Users ──────────────────────────────────────
def upsert_user(user_id: str, username: str, full_name: str):
    with get_db() as con:
        with con.cursor() as cur:
            cur.execute("""
                INSERT INTO users (user_id, username, full_name)
                VALUES (%s, %s, %s)
                ON CONFLICT (user_id) DO UPDATE
                SET username = EXCLUDED.username,
                    full_name = EXCLUDED.full_name
            """, (user_id, username, full_name))
        con.commit()


# ── History ────────────────────────────────────
def store_message_sync(chat_id: str, user_id: str | None, username: str,
                       role: str, content: str, embedding: list[float] | None):
    """Synchronous DB write — called from async context via asyncio.to_thread."""
    with get_db() as con:
        with con.cursor() as cur:
            cur.execute("""
                INSERT INTO history
                    (bot_name, chat_id, user_id, username, role, content, embedding)
                VALUES (%s, %s, %s, %s, %s, %s, %s)
            """, (BOT_NAME, chat_id, user_id, username, role, content, embedding))

            # Trim oldest beyond BOT_HISTORY_SAVE
            cur.execute("""
                DELETE FROM history
                WHERE bot_name = %s AND chat_id = %s AND id NOT IN (
                    SELECT id FROM history
                    WHERE bot_name = %s AND chat_id = %s
                    ORDER BY created DESC LIMIT %s
                )
            """, (BOT_NAME, chat_id, BOT_NAME, chat_id, BOT_HISTORY_SAVE))
        con.commit()
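

# Note on the embedding parameter above: psycopg2 has no built-in adapter for
# pgvector's `vector` type, so the Python list is sent as a Postgres array and
# relies on pgvector's array-to-vector cast. A more explicit setup (a sketch,
# assuming the `pgvector` Python package is installed) registers an adapter on
# each connection checked out of the pool:
#
#     from pgvector.psycopg2 import register_vector
#     con = pg_pool_conn.getconn()
#     register_vector(con)  # lists/ndarrays now map cleanly to vector columns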


async def store_message(chat_id: str, user_id: str | None, username: str,
                        role: str, content: str):
    """Embed the message, then write it to the DB. The embedding call is async;
    the blocking DB write runs in a worker thread."""
    embedding = await get_embedding(content)
    if embedding is None:
        log.warning("Storing message without embedding (Ollama unavailable).")
    await asyncio.to_thread(store_message_sync, chat_id, user_id, username, role, content, embedding)


def get_group_context(chat_id: str, limit: int = BOT_MAX_CONTEXT) -> list[dict]:
    """Fetch the most recent messages for the LLM context window."""
    with get_db() as con:
        with con.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT role, username, content, created
                FROM history
                WHERE bot_name = %s AND chat_id = %s
                ORDER BY created DESC LIMIT %s
            """, (BOT_NAME, chat_id, limit))
            rows = cur.fetchall()
    return list(reversed(rows))


def get_history_for_llm(chat_id: str) -> list[dict]:
    rows = get_group_context(chat_id, BOT_MAX_CONTEXT)
    result = []
    for r in rows:
        if r["role"] == "user":
            # Label each user message with their name so the LLM knows who said what
            prefix = f"[{r['username']}] " if r["username"] else ""
            result.append({"role": "user", "content": prefix + r["content"]})
        else:
            # Assistant messages: no prefix — prevents the model from echoing "[Manel]" in replies
            result.append({"role": "assistant", "content": r["content"]})
    return result


def vector_search_sync(chat_id: str, query_embedding: list[float], top_k: int) -> list[dict]:
    """Cosine similarity search against history embeddings."""
    with get_db() as con:
        with con.cursor(cursor_factory=RealDictCursor) as cur:
            cur.execute("""
                SELECT username, content, created,
                       1 - (embedding <=> %s::vector) AS similarity
                FROM history
                WHERE bot_name = %s
                  AND chat_id = %s
                  AND embedding IS NOT NULL
                ORDER BY embedding <=> %s::vector
                LIMIT %s
            """, (query_embedding, BOT_NAME, chat_id, query_embedding, top_k))
            return cur.fetchall()
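

# Once the IVFFlat index exists, recall is governed by the per-session
# `ivfflat.probes` setting (pgvector's default is 1, which favors speed).
# A hedged variant of the query above, with the probe count of 10 as an
# illustrative value to tune against your own data:
#
#     with con.cursor(cursor_factory=RealDictCursor) as cur:
#         cur.execute("SET LOCAL ivfflat.probes = 10")  # needs a transaction; psycopg2 opens one by default
#         cur.execute("SELECT ... ORDER BY embedding <=> %s::vector LIMIT %s", ...)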


def clear_history(chat_id: str):
    with get_db() as con:
        with con.cursor() as cur:
            cur.execute(
                "DELETE FROM history WHERE bot_name = %s AND chat_id = %s",
                (BOT_NAME, chat_id)
            )
        con.commit()


# ── Memories ───────────────────────────────────
def add_memory(user_id: str, memory: str):
    with get_db() as con:
        with con.cursor() as cur:
            cur.execute(
                "INSERT INTO memories (bot_name, user_id, memory) VALUES (%s, %s, %s)",
                (BOT_NAME, user_id, memory)
            )
        con.commit()


def get_memories(user_id: str) -> list[str]:
    with get_db() as con:
        with con.cursor() as cur:
            cur.execute("""
                SELECT memory FROM memories
                WHERE bot_name = %s AND user_id = %s
                ORDER BY created DESC LIMIT 50
            """, (BOT_NAME, user_id))
            return [r[0] for r in cur.fetchall()]


def get_all_memories_indexed(user_id: str) -> list[tuple]:
    with get_db() as con:
        with con.cursor() as cur:
            cur.execute("""
                SELECT id, memory FROM memories
                WHERE bot_name = %s AND user_id = %s
                ORDER BY created DESC LIMIT 50
            """, (BOT_NAME, user_id))
            return cur.fetchall()


def delete_memory_by_id(memory_id: int):
    with get_db() as con:
        with con.cursor() as cur:
            cur.execute(
                "DELETE FROM memories WHERE id = %s AND bot_name = %s",
                (memory_id, BOT_NAME)
            )
        con.commit()


def clear_memories(user_id: str):
    with get_db() as con:
        with con.cursor() as cur:
            cur.execute(
                "DELETE FROM memories WHERE bot_name = %s AND user_id = %s",
                (BOT_NAME, user_id)
            )
        con.commit()


def clear_all_memories():
    with get_db() as con:
        with con.cursor() as cur:
            cur.execute("DELETE FROM memories WHERE bot_name = %s", (BOT_NAME,))
        con.commit()


# ══════════════════════════════════════════════
# OWNER GUARD
# ══════════════════════════════════════════════
def owner_only(func):
    @wraps(func)
    async def wrapper(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
        if update.effective_user.id != OWNER_ID:
            log.warning(f"Blocked admin command from user_id={update.effective_user.id}")
            await update.message.reply_text("⛔ Não tens permissão para usar este comando.")
            return
        return await func(update, ctx)
    return wrapper


def is_owner(update: Update) -> bool:
    return update.effective_user.id == OWNER_ID


# ══════════════════════════════════════════════
# WEB SEARCH — DuckDuckGo + Open-Meteo weather
# ══════════════════════════════════════════════
WEATHER_KEYWORDS = [
    "tempo", "weather", "temperatura", "chuva", "chover", "sol", "vento", "neve",
    "frio", "calor", "forecast", "previsão", "graus", "humidade", "nublado",
    "trovoada", "granizo", "nevoeiro", "céu", "guarda-chuva",
    "mínima", "máxima", "minima", "maxima", "sensação", "precipitação"
]

# Keywords that require a web search for the current time in other timezones
TIMEZONE_KEYWORDS = [
    "que horas são", "que horas", "horas são", "hora é", "que hora",
    "what time", "horas na", "horas em", "horas no", "horas nos",
    "horas são na", "horas são em", "horas aí", "horas lá",
    "fuso horário", "timezone", "time zone"
]

# Location words that, combined with recent time context, suggest a timezone follow-up
TIMEZONE_PLACES = [
    "islândia", "iceland", "finlândia", "finland", "portugal", "espanha",
    "spain", "frança", "france", "alemanha", "germany", "reino unido",
    "uk", "brasil", "brazil", "estados unidos", "usa", "london", "paris",
    "lisboa", "madrid", "berlim", "berlin", "reykjavik"
]


def is_timezone_query(query: str) -> bool:
    q = query.lower()
    if any(kw in q for kw in TIMEZONE_KEYWORDS):
        return True
    # Short follow-up like "e na Islândia?" — place name + no other topic
    if any(place in q for place in TIMEZONE_PLACES):
        words = q.split()
        if len(words) <= 6 and not any(kw in q for kw in WEATHER_KEYWORDS):
            return True
    return False

NEWS_KEYWORDS = [
    "notícias", "noticias", "noticia", "notícia", "news", "últimas", "ultimas",
    "hoje no jornal", "manchetes", "atualidade", "atualidades", "pesquisa",
    "pesquisar", "procura na internet", "procurar na internet", "pesquisa na internet",
    "o que aconteceu", "o que se passa", "novidades"
]


def is_news_query(query: str) -> bool:
    q = query.lower()
    return any(kw in q for kw in NEWS_KEYWORDS)


# WMO weather code → Portuguese description
WMO_CODES = {
    0: "céu limpo", 1: "principalmente limpo", 2: "parcialmente nublado", 3: "nublado",
    45: "nevoeiro", 48: "nevoeiro com geada",
    51: "chuviscos fracos", 53: "chuviscos moderados", 55: "chuviscos densos",
    61: "chuva fraca", 63: "chuva moderada", 65: "chuva forte",
    71: "neve fraca", 73: "neve moderada", 75: "neve forte",
    77: "grãos de neve", 80: "aguaceiros fracos", 81: "aguaceiros moderados", 82: "aguaceiros violentos",
    85: "aguaceiros de neve fracos", 86: "aguaceiros de neve fortes",
    95: "trovoada", 96: "trovoada com granizo", 99: "trovoada com granizo forte"
}


def is_weather_query(query: str) -> bool:
    return any(kw in query.lower() for kw in WEATHER_KEYWORDS)


async def weather_search(query: str) -> str | None:
    """Geocode the location, then call Open-Meteo for current + forecast. No API key needed.
    Returns None when no location can be extracted from the query."""

    # Extract the location using word-boundary replacement of stop words
    stop_words = (
        WEATHER_KEYWORDS
        + [BOT_TRIGGER, "hoje", "amanhã", "agora", "atual", "atualmente",
           "esta", "semana", "preciso", "precisas", "precisa",
           "qual", "como", "está", "vai", "vais", "vou",
           "o", "a", "os", "as", "em", "de", "do", "da", "dos", "das",
           "para", "por", "com", "sem", "?", "!"]
    )
    location = query.lower()
    for sw in sorted(stop_words, key=len, reverse=True):
        location = re.sub(r"\b" + re.escape(sw) + r"\b", " ", location)
    location = " ".join(location.split()).strip()

    if not location:
        return None  # No location — let the LLM answer from context

    try:
        # Step 1: Geocode with Nominatim
        geo_url = "https://nominatim.openstreetmap.org/search"
        geo_params = {"q": location, "format": "json", "limit": 1}
        headers = {"User-Agent": "ManelBot/1.0"}

        async with aiohttp.ClientSession() as session:
            async with session.get(geo_url, params=geo_params, headers=headers,
                                   timeout=aiohttp.ClientTimeout(total=8)) as r:
                geo = await r.json()

        if not geo:
            return f"Não encontrei a localização '{location}'."

        lat = float(geo[0]["lat"])
        lon = float(geo[0]["lon"])
        name = geo[0]["display_name"].split(",")[0]

        # Step 2: Open-Meteo current + hourly forecast
        om_url = "https://api.open-meteo.com/v1/forecast"
        om_params = (
            f"latitude={lat}&longitude={lon}"
            f"&current=temperature_2m,apparent_temperature,relative_humidity_2m,"
            f"precipitation,weather_code,wind_speed_10m,wind_gusts_10m"
            f"&hourly=temperature_2m,precipitation_probability,weather_code,wind_gusts_10m"
            f"&forecast_days=1&timezone=auto&models=ecmwf_ifs025"
        )

        async with aiohttp.ClientSession() as session:
            async with session.get(f"{om_url}?{om_params}",
                                   timeout=aiohttp.ClientTimeout(total=8)) as r:
                if r.status != 200:
                    return f"Erro ao obter tempo (HTTP {r.status})."
                data = await r.json()

        cur = data["current"]
        temp = cur["temperature_2m"]
        feels = cur["apparent_temperature"]
        humidity = cur["relative_humidity_2m"]
        wind = cur["wind_speed_10m"]
        gusts = cur["wind_gusts_10m"]
        wcode = cur["weather_code"]
        desc = WMO_CODES.get(wcode, f"código {wcode}")

        result = (
            f"🌍 {name}\n"
            f"🌡️ {temp:.0f}°C (sensação {feels:.0f}°C)\n"
            f"🌤️ {desc.capitalize()}\n"
            f"💧 Humidade: {humidity}%\n"
            f"💨 Vento: {wind:.0f} km/h (rajadas até {gusts:.0f} km/h)"
        )

        # Next few hours
        hours = data["hourly"]["time"]
        temps = data["hourly"]["temperature_2m"]
        probs = data["hourly"]["precipitation_probability"]
        wcodes = data["hourly"]["weather_code"]

        # Use the UTC offset from the Open-Meteo response to get the correct local hour
        utc_offset = data.get("utc_offset_seconds", 0)
        now_local_hour = (datetime.datetime.now(datetime.timezone.utc) +
                          datetime.timedelta(seconds=utc_offset)).hour
        upcoming = [(h, t, p, w) for h, t, p, w in zip(hours, temps, probs, wcodes)
                    if int(h[11:13]) > now_local_hour][:4]

        if upcoming:
            result += "\n\n📅 Próximas horas:"
            for h, t, p, w in upcoming:
                d = WMO_CODES.get(w, "")
                result += f"\n  {h[11:16]} — {t:.0f}°C, {p}% chuva, {d}"

        return result

    except Exception as e:
        log.warning(f"Open-Meteo error: {e}")
        return "Serviço meteorológico indisponível agora."


async def web_search(query: str) -> str:
    """Route weather queries to Open-Meteo, news to NewsData.io, everything else to DuckDuckGo."""
    if is_weather_query(query):
        result = await weather_search(query)
        if result is not None:
            return result
        return "No location found. Please answer using weather data already in conversation context."

    # Use NewsData.io for news queries, DuckDuckGo for everything else
    if is_news_query(query) and NEWSDATA_API_KEY:
        try:
            # Detect if the query is about Portugal specifically
            pt_keywords = ["portugal", "nacional", "nacionais", "portuguesa", "português",
                           "porto", "lisboa", "governo", "assembleia", "república"]
            is_pt = any(kw in query.lower() for kw in pt_keywords)

            if is_pt:
                # Portuguese national news — Portuguese language, Portugal only
                params = {
                    "apikey": NEWSDATA_API_KEY,
                    "language": "pt",
                    "country": "pt",
                    "size": 6,
                }
            else:
                # International news — English, no country filter, use query keywords
                params = {
                    "apikey": NEWSDATA_API_KEY,
                    "language": "en",
                    "size": 6,
                    "q": query,
                }

            async with aiohttp.ClientSession() as session:
                async with session.get("https://newsdata.io/api/1/news",
                                       params=params,
                                       timeout=aiohttp.ClientTimeout(total=8)) as resp:
                    data = await resp.json()

            if data.get("status") == "success" and data.get("results"):
                lines = []
                for art in data["results"][:5]:
                    title = art.get("title", "").strip()
                    source = art.get("source_name") or art.get("source_id", "")
                    source = source.strip() if source else ""
                    pubdate = art.get("pubDate", "")[:10]
                    if title:
                        lines.append(f"• {title}" + (f" ({source}, {pubdate})" if source else ""))
                return "\n".join(lines) if lines else "Sem notícias encontradas."
            return "Sem notícias encontradas."
        except Exception as e:
            log.warning(f"NewsData.io error: {e}")

    # DuckDuckGo for non-news queries
    url = f"https://api.duckduckgo.com/?q={quote_plus(query)}&format=json&no_redirect=1&no_html=1&skip_disambig=1"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=8)) as resp:
                data = await resp.json(content_type=None)
        results = []
        if data.get("AbstractText"):
            results.append(data["AbstractText"])
        if data.get("AbstractURL"):
            results.append("Fonte: " + data["AbstractURL"])
        for topic in data.get("RelatedTopics", [])[:4]:
            if isinstance(topic, dict) and topic.get("Text"):
                results.append("• " + topic["Text"])
        return "\n".join(results) if results else "Sem resultados. Responderei com base no meu conhecimento."
    except Exception as e:
        log.warning(f"Web search error: {e}")
        return "Pesquisa web indisponível agora."


# ══════════════════════════════════════════════
# TRIGGER CHECK
# ══════════════════════════════════════════════
def is_mentioned(text: str) -> bool:
    """True when the trigger word appears anywhere in the text (case-insensitive substring)."""
    return bool(re.search(re.escape(BOT_TRIGGER), text, re.IGNORECASE))


# ══════════════════════════════════════════════
# GROQ — tool definitions
# ══════════════════════════════════════════════
TOOLS = [
    {
        "type": "function",
        "function": {
            "name": "web_search",
            "description": "Search the web for current info: weather, news, prices, general questions.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {"type": "string", "description": "The search query"}
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "search_memory",
            "description": (
                "Semantic search through the full group chat history and long-term memories. "
                "Use for past preferences, facts, or anything said more than a few messages ago. "
                "Returns the most relevant past messages by meaning, not just keywords."
            ),
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Natural language description of what to look for"
                    }
                },
                "required": ["query"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "save_memory",
            "description": "Save an important fact about a user for future conversations.",
            "parameters": {
                "type": "object",
                "properties": {
                    "user_id": {"type": "string", "description": "Telegram user_id of the person"},
                    "memory": {"type": "string", "description": "The fact to save"}
                },
                "required": ["user_id", "memory"]
            }
        }
    }
]


# ══════════════════════════════════════════════
# GROQ — main chat function
# ══════════════════════════════════════════════
async def chat_with_groq(chat_id: str, user_id: str, user_first_name: str, user_message: str) -> str:
    memories = get_memories(user_id)
    history = get_history_for_llm(chat_id)
    system = build_system_prompt(user_first_name, memories)

    messages = (
        [{"role": "system", "content": system}]
        + history
        + [{"role": "user", "content": f"[{user_first_name}] {user_message}"}]
    )

    # Force the web_search tool when weather, timezone or news keywords are detected
    if is_weather_query(user_message):
        tool_choice = {"type": "function", "function": {"name": "web_search"}}
        log.info("Forcing web_search for weather query")
    elif is_timezone_query(user_message):
        tool_choice = {"type": "function", "function": {"name": "web_search"}}
        log.info("Forcing web_search for timezone query")
    elif is_news_query(user_message):
        tool_choice = {"type": "function", "function": {"name": "web_search"}}
        log.info("Forcing web_search for news query")
        # Inject intent into the system prompt so the model generates the correct query
        intl_keywords = ["internacionais", "internacional", "mundo", "world", "international"]
        if any(kw in user_message.lower() for kw in intl_keywords):
            # Prepend a hint to the system prompt instead of a fake user message
            messages[0]["content"] += "\n\nNEXT TOOL CALL: Search for INTERNATIONAL world news in English. Do NOT include Portugal in query."
        else:
            messages[0]["content"] += "\n\nNEXT TOOL CALL: Search for Portuguese national news. Use query: noticias portugal hoje."
    else:
        tool_choice = "auto"

    response = llm_client.chat.completions.create(
        model=LLM_MODEL,
        messages=messages,
        tools=TOOLS,
        tool_choice=tool_choice,
        max_tokens=1024,
        temperature=0.7,
    )

    msg = response.choices[0].message
    tool_calls = msg.tool_calls or []

    if not tool_calls:
        reply = msg.content or ""
        # If tool_choice was forced but the model returned nothing, call weather directly
        if not reply.strip() or reply.strip() == "…":
            if is_weather_query(user_message):
                log.info("Model skipped forced tool — calling weather_search directly")
                weather_result = await weather_search(user_message)
                if weather_result:
                    messages.append({"role": "user", "content": f"[weather data]\n{weather_result}\n\nAnswer the user naturally in Portuguese based on this data."})
                    r2 = llm_client.chat.completions.create(
                        model=LLM_MODEL, messages=messages, max_tokens=1024, temperature=0.7
                    )
                    return r2.choices[0].message.content or "…"
        return reply or "…"

    messages.append({
        "role": "assistant",
        "content": msg.content or "",
        "tool_calls": [
            {"id": tc.id, "type": "function",
             "function": {"name": tc.function.name, "arguments": tc.function.arguments}}
            for tc in tool_calls
        ]
    })

    for tc in tool_calls:
        fn = tc.function.name
        args = json.loads(tc.function.arguments)
        log.info(f"Tool: {fn}({args})")

        if fn == "web_search":
            result = await web_search(args["query"])

        elif fn == "search_memory":
            query = args["query"]
            found = []

            # ── 1. Semantic search over group history ──────────────
            query_emb = await get_embedding(query)
            if query_emb:
                rows = await asyncio.to_thread(
                    vector_search_sync, chat_id, query_emb, VECTOR_TOP_K
                )
                for row in rows:
                    sim = float(row["similarity"])
                    if sim < 0.3:  # skip very low similarity
                        continue
                    ts = row["created"].strftime("%Y-%m-%d %H:%M") if row.get("created") else "?"
                    found.append(
                        f"[{ts}] {row.get('username', '?')} (sim={sim:.2f}): {row['content']}"
                    )
            else:
                log.warning("search_memory: no embedding — skipping vector search")

            # ── 2. Always include saved long-term memories ─────────
            for mem in get_memories(user_id):
                found.append(f"[memory] {mem}")

            if found:
                result = "\n".join(found)
            else:
                result = "Nothing relevant found in memory."

        elif fn == "save_memory":
            target_uid = args.get("user_id", user_id)
            add_memory(target_uid, args["memory"])
            result = f"Memory saved: {args['memory']}"

        else:
            result = "Unknown tool."

        messages.append({"role": "tool", "content": result, "tool_call_id": tc.id})

    response2 = llm_client.chat.completions.create(
        model=LLM_MODEL,
        messages=messages,
        max_tokens=1024,
        temperature=0.7,
    )
    return response2.choices[0].message.content or "…"
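

# The chat.completions.create() calls above are synchronous and block the event
# loop for the duration of each LLM round-trip. A minimal non-blocking variant
# (a sketch, not wired in) pushes them onto a worker thread:
#
#     response = await asyncio.to_thread(
#         llm_client.chat.completions.create,
#         model=LLM_MODEL, messages=messages,
#         tools=TOOLS, tool_choice=tool_choice,
#         max_tokens=1024, temperature=0.7,
#     )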


# ══════════════════════════════════════════════
# TELEGRAM HANDLERS
# ══════════════════════════════════════════════
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    upsert_user(str(update.effective_user.id), update.effective_user.username or "", update.effective_user.full_name)
    owner_hint = "\n\n🔑 És o owner deste bot." if is_owner(update) else ""
    await update.message.reply_text(
        f"👋 Olá {update.effective_user.first_name}! Sou o **{BOT_NAME}** 😊\n\n"
        f"Menciona o meu nome numa mensagem para eu responder!\n\n"
        f"Comandos:\n"
        f"/memorias – ver as tuas memórias\n"
        f"/esquecer `<n>` – apagar memória nº n\n"
        f"/ajuda – mostrar esta mensagem\n"
        f"\n*Comandos de owner:*\n"
        f"/apagarmem `<user_id>` – apagar memórias de um utilizador\n"
        f"/apagartudo – apagar TODAS as memórias do bot\n"
        f"/novachat – limpar histórico deste grupo"
        + owner_hint,
        parse_mode="Markdown"
    )


async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    await cmd_start(update, ctx)


async def cmd_memories(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    uid = str(update.effective_user.id)
    rows = get_all_memories_indexed(uid)
    if not rows:
        await update.message.reply_text("🧠 Ainda não tenho memórias tuas. Fala comigo!")
        return
    text = f"🧠 **As tuas memórias ({BOT_NAME}):**\n\n" + "\n".join(f"{i+1}. {m}" for i, (_, m) in enumerate(rows))
    text += "\n\nUsa /esquecer `<número>` para apagar uma."
    await update.message.reply_text(text, parse_mode="Markdown")


async def cmd_forget(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    uid = str(update.effective_user.id)
    args = ctx.args
    if not args or not args[0].isdigit():
        await update.message.reply_text("Uso: /esquecer `<número>` (obtém números com /memorias)", parse_mode="Markdown")
        return
    rows = get_all_memories_indexed(uid)
    idx = int(args[0]) - 1
    if 0 <= idx < len(rows):
        delete_memory_by_id(rows[idx][0])
        await update.message.reply_text("✅ Memória apagada.")
    else:
        await update.message.reply_text("❌ Número inválido.")


@owner_only
async def cmd_clear_user_memories(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    if not ctx.args:
        await update.message.reply_text("Uso: /apagarmem `<user_id>`", parse_mode="Markdown")
        return
    clear_memories(ctx.args[0])
    await update.message.reply_text(f"🗑️ Memórias do utilizador `{ctx.args[0]}` apagadas.", parse_mode="Markdown")


@owner_only
async def cmd_clearall(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    clear_all_memories()
    await update.message.reply_text(f"🗑️ Todas as memórias do {BOT_NAME} foram apagadas.")


@owner_only
async def cmd_newchat(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    clear_history(str(update.effective_chat.id))
    await update.message.reply_text("🔄 Histórico do grupo limpo! (Memórias de longo prazo mantidas.)")


async def handle_message(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    if not update.message or not update.message.text:
        return

    user = update.effective_user
    uid = str(user.id)
    chat_id = str(update.effective_chat.id)
    text = update.message.text
    fname = user.first_name or "utilizador"

    upsert_user(uid, user.username or "", user.full_name or fname)

    # Always store + embed every message, even when the bot stays silent
    await store_message(chat_id, uid, fname, "user", text)

    if not is_mentioned(text):
        log.debug(f"[{BOT_NAME}] Stored silently: {text[:60]}")
        return

    await update.message.chat.send_action("typing")

    error_reply = False
    try:
        reply = await chat_with_groq(chat_id, uid, fname, text)
    except Exception as e:
        log.error(f"Groq error: {e}\n{traceback.format_exc()}")
        reply = "⚠️ Ocorreu um erro. Tenta novamente."
        error_reply = True

    if not error_reply:
        await store_message(chat_id, None, BOT_NAME, "assistant", reply)

    # Telegram caps messages at 4096 characters, so send long replies in chunks
    for chunk in [reply[i:i+4096] for i in range(0, len(reply), 4096)]:
        if not chunk.strip():
            continue
        try:
            await update.message.chat.send_message(chunk, parse_mode="Markdown")
        except Exception:
            # Malformed Markdown makes Telegram reject the message; retry as plain text
            try:
                await update.message.chat.send_message(chunk)
            except Exception:
                pass


# ══════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════
def main():
    if not TELEGRAM_BOT_TOKEN:
        raise ValueError("TELEGRAM_BOT_TOKEN not set — check .env_credentials")
    if LLM_PROVIDER == "groq" and not GROQ_API_KEY:
        raise ValueError("GROQ_API_KEY not set — check .env_credentials")
    if LLM_PROVIDER == "openrouter" and not OPENROUTER_API_KEY:
        raise ValueError("OPENROUTER_API_KEY not set — check .env_credentials")
    if OWNER_ID == 0:
        log.warning("OWNER_TELEGRAM_ID not set — admin commands blocked for everyone.")

    init_db()
    log.info(f"Starting {BOT_NAME} | trigger='{BOT_TRIGGER}' | provider={LLM_PROVIDER} | model={LLM_MODEL} | embed={EMBED_MODEL}@{OLLAMA_URL}")

    app = Application.builder().token(TELEGRAM_BOT_TOKEN).build()

    app.add_handler(CommandHandler("start", cmd_start))
    app.add_handler(CommandHandler("ajuda", cmd_help))
    app.add_handler(CommandHandler("help", cmd_help))
    app.add_handler(CommandHandler("memorias", cmd_memories))
    app.add_handler(CommandHandler("esquecer", cmd_forget))
    app.add_handler(CommandHandler("apagarmem", cmd_clear_user_memories))
    app.add_handler(CommandHandler("apagartudo", cmd_clearall))
    app.add_handler(CommandHandler("novachat", cmd_newchat))
    app.add_handler(MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message))

    app.run_polling(allowed_updates=Update.ALL_TYPES)


if __name__ == "__main__":
    main()