# Exported from repository: Telegram_AI_bot/bot.py (snapshot 2026-04-13, 1002 lines, 42 KiB).
# NOTE(review): the export tool flagged "ambiguous Unicode characters" — some
# decorative characters (bullets/arrows) in string literals may have been stripped.
"""
Telegram AI Chatbot — Multi-persona, owner-protected, semantic memory
Powered by Groq API + PostgreSQL 16 + pgvector + Ollama embeddings
Config files:
.env — persona, bot name, trigger, rules (different per deployment)
.env_credentials — secret keys, DB and Ollama credentials
"""
import os
import re
import json
import logging
import asyncio
import aiohttp
from urllib.parse import quote_plus
import datetime
import zoneinfo
import traceback
from functools import wraps
from dotenv import load_dotenv
load_dotenv(".env_credentials")
load_dotenv(".env")
import psycopg2
from psycopg2.extras import RealDictCursor
from psycopg2 import pool as pg_pool
from telegram import Update
from telegram.ext import (
Application, CommandHandler, MessageHandler,
filters, ContextTypes
)
from groq import Groq
try:
from openai import OpenAI
except ImportError:
OpenAI = None
# ══════════════════════════════════════════════
# CONFIG
# ══════════════════════════════════════════════

# ── Credentials (.env_credentials) ────────────
TELEGRAM_BOT_TOKEN = os.getenv("TELEGRAM_BOT_TOKEN")
GROQ_API_KEY = os.getenv("GROQ_API_KEY", "")
OPENROUTER_API_KEY = os.getenv("OPENROUTER_API_KEY", "")
LLM_PROVIDER = os.getenv("LLM_PROVIDER", "groq")
GROQ_MODEL = os.getenv("GROQ_MODEL", "llama-3.3-70b-versatile")
OPENROUTER_MODEL = os.getenv("OPENROUTER_MODEL", "deepseek/deepseek-chat-v3-0324")

# PostgreSQL connection settings, consumed by init_db()'s pool.
PG_CONFIG = {
    "host": os.getenv("PG_HOST", "localhost"),
    "port": int(os.getenv("PG_PORT", "5432")),
    "dbname": os.getenv("PG_DB", "manel_bot"),
    "user": os.getenv("PG_USER", "postgres"),
    "password": os.getenv("PG_PASSWORD", ""),
}

# Telegram numeric id of the owner; 0 blocks admin commands for everyone.
OWNER_ID = int(os.getenv("OWNER_TELEGRAM_ID", "0"))

# NewsData.io
NEWSDATA_API_KEY = os.getenv("NEWSDATA_API_KEY", "")

# Ollama endpoint and embedding model
OLLAMA_URL = os.getenv("OLLAMA_URL", "http://localhost:11434")
EMBED_MODEL = os.getenv("EMBED_MODEL", "nomic-embed-text-v2-moe")
# nomic-embed-text-v2-moe output dimension — update if you switch models
EMBED_DIM = int(os.getenv("EMBED_DIM", "768"))
# How many semantically similar results to return per search_memory call
VECTOR_TOP_K = int(os.getenv("VECTOR_TOP_K", "8"))

# ── Persona (.env) ─────────────────────────────
BOT_NAME = os.getenv("BOT_NAME", "Manel")
BOT_TRIGGER = os.getenv("BOT_TRIGGER", "manel")
BOT_MAX_CONTEXT = int(os.getenv("BOT_MAX_CONTEXT", "40"))
BOT_HISTORY_SAVE = int(os.getenv("BOT_HISTORY_SAVE", "500"))
BOT_ROLE = os.getenv("BOT_ROLE", "")
BOT_RULES = os.getenv("BOT_RULES", "")
BOT_TOOL_RULES = os.getenv("BOT_TOOL_RULES", "")
BOT_MEMORY_RULES = os.getenv("BOT_MEMORY_RULES", "")
BOT_EXAMPLES = os.getenv("BOT_EXAMPLES", "")
BOT_TIMEZONE = os.getenv("BOT_TIMEZONE", "Europe/Lisbon")  # home timezone for date/time display
# ─────────────────────────────────────────────

logging.basicConfig(level=logging.INFO, format="%(asctime)s %(levelname)s %(message)s")
log = logging.getLogger(__name__)

# Select the LLM backend once at import time.
if LLM_PROVIDER == "openrouter":
    if OpenAI is None:
        raise ImportError("openai package not installed. Run: pip install openai")
    llm_client = OpenAI(
        api_key=OPENROUTER_API_KEY,
        base_url="https://openrouter.ai/api/v1",
        default_headers={"HTTP-Referer": "https://github.com/manel-bot"},
    )
    LLM_MODEL = OPENROUTER_MODEL
else:
    llm_client = Groq(api_key=GROQ_API_KEY)
    LLM_MODEL = GROQ_MODEL

# Connection pool — populated by init_db() before the bot starts polling.
pg_pool_conn: pg_pool.ThreadedConnectionPool | None = None
# ══════════════════════════════════════════════
# SYSTEM PROMPT BUILDER
# ══════════════════════════════════════════════
def build_system_prompt(user_first_name: str, memories: list[str]) -> str:
    """Assemble the system prompt from persona env vars, the clock, and memories."""

    def env_to_bullets(raw: str) -> str:
        # One "- " bullet per non-empty line of the env variable.
        return "\n".join("- " + line.strip() for line in raw.splitlines() if line.strip())

    memory_block = ""
    if memories:
        memory_block = (
            "\n\n[LONG-TERM MEMORY about users in this group]\n"
            + "\n".join("- " + m for m in memories)
        )

    sections: list[str] = []
    if BOT_ROLE:
        sections.append("# Role\n" + BOT_ROLE)
    if BOT_RULES:
        sections.append("# Rules\n" + env_to_bullets(BOT_RULES))
    if BOT_TOOL_RULES:
        sections.append("# Tool Selection Strategy\n" + env_to_bullets(BOT_TOOL_RULES))
    if BOT_MEMORY_RULES:
        sections.append("# Memory Rules\n" + env_to_bullets(BOT_MEMORY_RULES))
    if BOT_EXAMPLES:
        sections.append("# Examples\n" + BOT_EXAMPLES)

    # Fall back to UTC when BOT_TIMEZONE is not a valid IANA zone name.
    try:
        home_tz = zoneinfo.ZoneInfo(BOT_TIMEZONE)
    except Exception:
        home_tz = datetime.timezone.utc
    now_home = datetime.datetime.now(datetime.timezone.utc).astimezone(home_tz)
    date_str = now_home.strftime("%A, %d %B %Y")
    time_str = now_home.strftime("%H:%M")
    sections.append(
        "# Current Date and Time\n"
        f"Today is: {date_str}\n"
        f"Current time ({BOT_TIMEZONE}): {time_str}\n"
        "For other timezones, use the web_search tool."
    )

    sections.append(
        "# Current User\n"
        "The user who mentioned you is: " + user_first_name
        + memory_block
    )
    return "\n\n".join(sections)
# ══════════════════════════════════════════════
# OLLAMA EMBEDDINGS
# ══════════════════════════════════════════════
async def get_embedding(text: str) -> list[float] | None:
    """Embed `text` via the local Ollama server; None on any failure."""
    url = OLLAMA_URL.rstrip("/") + "/api/embed"
    payload = {"model": EMBED_MODEL, "input": text}
    try:
        async with aiohttp.ClientSession() as session:
            async with session.post(url, json=payload, timeout=aiohttp.ClientTimeout(total=3)) as resp:
                if resp.status != 200:
                    log.warning(f"Ollama embed HTTP {resp.status}: {await resp.text()}")
                    return None
                data = await resp.json()
        # Ollama /api/embed wraps results as {"embeddings": [[...]]}
        vectors = data.get("embeddings")
        if vectors and len(vectors) > 0:
            return vectors[0]
        log.warning(f"Unexpected Ollama response: {data}")
        return None
    except Exception as e:
        log.error(f"Ollama embedding error: {e}")
        return None
# ══════════════════════════════════════════════
# DATABASE — PostgreSQL 16 + pgvector
# ══════════════════════════════════════════════
def init_db():
    """Create the connection pool, pgvector extension, tables, and indexes."""
    global pg_pool_conn
    pg_pool_conn = pg_pool.ThreadedConnectionPool(1, 10, **PG_CONFIG)
    with get_db() as con:
        with con.cursor() as cur:
            # Enable pgvector extension
            cur.execute("CREATE EXTENSION IF NOT EXISTS vector;")
            cur.execute("""
                CREATE TABLE IF NOT EXISTS users (
                    user_id TEXT PRIMARY KEY,
                    username TEXT,
                    full_name TEXT,
                    first_seen TIMESTAMPTZ DEFAULT NOW()
                );
            """)
            cur.execute("""
                CREATE TABLE IF NOT EXISTS memories (
                    id SERIAL PRIMARY KEY,
                    bot_name TEXT NOT NULL DEFAULT 'default',
                    user_id TEXT NOT NULL,
                    memory TEXT NOT NULL,
                    created TIMESTAMPTZ DEFAULT NOW(),
                    FOREIGN KEY (user_id) REFERENCES users(user_id) ON DELETE CASCADE
                );
                CREATE INDEX IF NOT EXISTS idx_memories_bot_user
                ON memories(bot_name, user_id);
            """)
            # History table with vector column for semantic search
            cur.execute(f"""
                CREATE TABLE IF NOT EXISTS history (
                    id SERIAL PRIMARY KEY,
                    bot_name TEXT NOT NULL DEFAULT 'default',
                    chat_id TEXT NOT NULL,
                    user_id TEXT,
                    username TEXT,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    embedding vector({EMBED_DIM}),
                    created TIMESTAMPTZ DEFAULT NOW()
                );
                CREATE INDEX IF NOT EXISTS idx_history_bot_chat
                ON history(bot_name, chat_id, created DESC);
            """)
            # IVFFlat index for fast vector search — created manually once
            # enough rows exist; until then pgvector falls back to a seq scan.
            cur.execute("""
                SELECT 1 FROM pg_indexes
                WHERE tablename = 'history' AND indexname = 'idx_history_embedding';
            """)
            if not cur.fetchone():
                # To create it later:
                #   CREATE INDEX idx_history_embedding ON history
                #   USING ivfflat (embedding vector_cosine_ops) WITH (lists = 100);
                log.info("Vector index not yet created — will use seq scan until you create it manually.")
        con.commit()
    log.info(f"PostgreSQL + pgvector ready. Bot={BOT_NAME} dim={EMBED_DIM}")
class get_db:
    """Context manager that borrows a pooled PostgreSQL connection.

    Usage: ``with get_db() as con: ...`` — the connection is always returned
    to the pool on exit. Exceptions are never suppressed.
    """
    def __enter__(self):
        self.con = pg_pool_conn.getconn()
        return self.con

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is not None:
            # Fix: roll back any half-finished transaction before returning the
            # connection, otherwise the next borrower inherits a connection stuck
            # in an aborted-transaction state and every query on it fails.
            try:
                self.con.rollback()
            except Exception:
                log.warning("Rollback failed while returning connection to pool")
        pg_pool_conn.putconn(self.con)
        return False  # don't suppress exceptions
# ── Users ──────────────────────────────────────
def upsert_user(user_id: str, username: str, full_name: str):
    """Insert a user row, or refresh username/full_name if it already exists."""
    sql = """
        INSERT INTO users (user_id, username, full_name)
        VALUES (%s, %s, %s)
        ON CONFLICT (user_id) DO UPDATE
        SET username = EXCLUDED.username,
            full_name = EXCLUDED.full_name
    """
    with get_db() as con, con.cursor() as cur:
        cur.execute(sql, (user_id, username, full_name))
        con.commit()
# ── History ────────────────────────────────────
def store_message_sync(chat_id: str, user_id: str | None, username: str,
                       role: str, content: str, embedding: list[float] | None):
    """Synchronous DB write — called from async context via asyncio.to_thread.

    Inserts one history row, then deletes everything older than the newest
    BOT_HISTORY_SAVE rows for this (bot, chat) pair.
    """
    insert_sql = """
        INSERT INTO history
        (bot_name, chat_id, user_id, username, role, content, embedding)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
    """
    trim_sql = """
        DELETE FROM history
        WHERE bot_name = %s AND chat_id = %s AND id NOT IN (
            SELECT id FROM history
            WHERE bot_name = %s AND chat_id = %s
            ORDER BY created DESC LIMIT %s
        )
    """
    # NOTE(review): the embedding is passed as a plain Python list; this assumes
    # an adapter (or implicit array→vector cast) handles the vector column — confirm.
    with get_db() as con, con.cursor() as cur:
        cur.execute(insert_sql, (BOT_NAME, chat_id, user_id, username, role, content, embedding))
        cur.execute(trim_sql, (BOT_NAME, chat_id, BOT_NAME, chat_id, BOT_HISTORY_SAVE))
        con.commit()
async def store_message(chat_id: str, user_id: str | None, username: str,
                        role: str, content: str):
    """Embed `content` then persist it; the blocking DB write runs in a thread."""
    embedding = await get_embedding(content)
    if embedding is None:
        # Still store the row — it just won't be reachable by vector search.
        log.warning("Storing message without embedding (Ollama unavailable).")
    await asyncio.to_thread(store_message_sync, chat_id, user_id, username,
                            role, content, embedding)
def get_group_context(chat_id: str, limit: int = BOT_MAX_CONTEXT) -> list[dict]:
    """Return the newest `limit` messages of a chat, oldest first."""
    sql = """
        SELECT role, username, content, created
        FROM history
        WHERE bot_name = %s AND chat_id = %s
        ORDER BY created DESC LIMIT %s
    """
    with get_db() as con, con.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(sql, (BOT_NAME, chat_id, limit))
        newest_first = cur.fetchall()
    # Query sorts newest-first for the LIMIT; chronological order goes to the LLM.
    return newest_first[::-1]
def get_history_for_llm(chat_id: str) -> list[dict]:
    """Convert recent history rows into chat-completion message dicts."""
    def to_message(row: dict) -> dict:
        if row["role"] != "user":
            # Assistant turns carry no name prefix — prevents the model from
            # echoing "[Manel]" in its own replies.
            return {"role": "assistant", "content": row["content"]}
        # Label each user message with their name so the LLM knows who said what.
        speaker = f"[{row['username']}] " if row["username"] else ""
        return {"role": "user", "content": speaker + row["content"]}

    return [to_message(r) for r in get_group_context(chat_id, BOT_MAX_CONTEXT)]
def vector_search_sync(chat_id: str, query_embedding: list[float], top_k: int) -> list[dict]:
    """Cosine-similarity search over this chat's embedded history rows."""
    sql = """
        SELECT username, content, created,
               1 - (embedding <=> %s::vector) AS similarity
        FROM history
        WHERE bot_name = %s
          AND chat_id = %s
          AND embedding IS NOT NULL
        ORDER BY embedding <=> %s::vector
        LIMIT %s
    """
    with get_db() as con, con.cursor(cursor_factory=RealDictCursor) as cur:
        cur.execute(sql, (query_embedding, BOT_NAME, chat_id, query_embedding, top_k))
        return cur.fetchall()
def clear_history(chat_id: str):
    """Delete every history row for this chat (long-term memories untouched)."""
    with get_db() as con, con.cursor() as cur:
        cur.execute(
            "DELETE FROM history WHERE bot_name = %s AND chat_id = %s",
            (BOT_NAME, chat_id),
        )
        con.commit()
# ── Memories ───────────────────────────────────
def add_memory(user_id: str, memory: str):
    """Append one long-term memory fact for a user."""
    with get_db() as con, con.cursor() as cur:
        cur.execute(
            "INSERT INTO memories (bot_name, user_id, memory) VALUES (%s, %s, %s)",
            (BOT_NAME, user_id, memory),
        )
        con.commit()
def get_memories(user_id: str) -> list[str]:
    """Return up to 50 of the newest memory strings for a user."""
    sql = """
        SELECT memory FROM memories
        WHERE bot_name = %s AND user_id = %s
        ORDER BY created DESC LIMIT 50
    """
    with get_db() as con, con.cursor() as cur:
        cur.execute(sql, (BOT_NAME, user_id))
        return [row[0] for row in cur.fetchall()]
def get_all_memories_indexed(user_id: str) -> list[tuple]:
    """Like get_memories, but returns (id, memory) pairs for deletion by index."""
    sql = """
        SELECT id, memory FROM memories
        WHERE bot_name = %s AND user_id = %s
        ORDER BY created DESC LIMIT 50
    """
    with get_db() as con, con.cursor() as cur:
        cur.execute(sql, (BOT_NAME, user_id))
        return cur.fetchall()
def delete_memory_by_id(memory_id: int):
    """Delete a single memory row, scoped to this bot's namespace."""
    with get_db() as con, con.cursor() as cur:
        cur.execute(
            "DELETE FROM memories WHERE id = %s AND bot_name = %s",
            (memory_id, BOT_NAME),
        )
        con.commit()
def clear_memories(user_id: str):
    """Delete all of one user's memories for this bot."""
    with get_db() as con, con.cursor() as cur:
        cur.execute(
            "DELETE FROM memories WHERE bot_name = %s AND user_id = %s",
            (BOT_NAME, user_id),
        )
        con.commit()
def clear_all_memories():
    """Delete every memory belonging to this bot (all users)."""
    with get_db() as con, con.cursor() as cur:
        cur.execute("DELETE FROM memories WHERE bot_name = %s", (BOT_NAME,))
        con.commit()
# ══════════════════════════════════════════════
# OWNER GUARD
# ══════════════════════════════════════════════
def owner_only(func):
    """Decorator: reject anyone but OWNER_ID before running the handler."""
    @wraps(func)
    async def guarded(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
        caller = update.effective_user.id
        if caller == OWNER_ID:
            return await func(update, ctx)
        log.warning(f"Blocked admin command from user_id={caller}")
        await update.message.reply_text("⛔ Não tens permissão para usar este comando.")
    return guarded
def is_owner(update: Update) -> bool:
    """True when the message sender is the configured bot owner."""
    return OWNER_ID == update.effective_user.id
# ══════════════════════════════════════════════
# WEB SEARCH — DuckDuckGo + Open-Meteo weather
# ══════════════════════════════════════════════
# Keywords (Portuguese + English) that mark a message as a weather question.
WEATHER_KEYWORDS = [
    "tempo", "weather", "temperatura", "chuva", "chover", "sol", "vento", "neve",
    "frio", "calor", "forecast", "previsão", "graus", "humidade", "nublado",
    "trovoada", "granizo", "nevoeiro", "céu", "guarda-chuva",
    "mínima", "máxima", "minima", "maxima", "sensação", "precipitação",
]

# Keywords that require a web search for current time in other timezones
TIMEZONE_KEYWORDS = [
    "que horas são", "que horas", "horas são", "hora é", "que hora",
    "what time", "horas na", "horas em", "horas no", "horas nos",
    "horas são na", "horas são em", "horas aí", "horas lá",
    "fuso horário", "timezone", "time zone",
]

# Location words that combined with recent time context suggest a timezone followup
TIMEZONE_PLACES = [
    "islândia", "iceland", "finlândia", "finland", "portugal", "espanha",
    "spain", "frança", "france", "alemanha", "germany", "reino unido",
    "uk", "brasil", "brazil", "estados unidos", "usa", "london", "paris",
    "lisboa", "madrid", "berlim", "berlin", "reykjavik",
]


def is_timezone_query(query: str) -> bool:
    """Heuristic: does this message ask for the time somewhere?"""
    q = query.lower()
    if any(kw in q for kw in TIMEZONE_KEYWORDS):
        return True
    # Short followup like "e na Islândia?" — a place name, few words,
    # and no competing weather topic.
    mentions_place = any(place in q for place in TIMEZONE_PLACES)
    is_short = len(q.split()) <= 6
    talks_weather = any(kw in q for kw in WEATHER_KEYWORDS)
    return mentions_place and is_short and not talks_weather


NEWS_KEYWORDS = [
    "notícias", "noticias", "noticia", "notícia", "news", "últimas", "ultimas",
    "hoje no jornal", "manchetes", "atualidade", "atualidades", "pesquisa",
    "pesquisar", "procura na internet", "procurar na internet", "pesquisa na internet",
    "o que aconteceu", "o que se passa", "novidades",
]


def is_news_query(query: str) -> bool:
    """Heuristic: does this message ask for news / a web lookup?"""
    lowered = query.lower()
    return any(kw in lowered for kw in NEWS_KEYWORDS)


# WMO weather code → Portuguese description
WMO_CODES = {
    0: "céu limpo", 1: "principalmente limpo", 2: "parcialmente nublado", 3: "nublado",
    45: "nevoeiro", 48: "nevoeiro com geada",
    51: "chuviscos fracos", 53: "chuviscos moderados", 55: "chuviscos densos",
    61: "chuva fraca", 63: "chuva moderada", 65: "chuva forte",
    71: "neve fraca", 73: "neve moderada", 75: "neve forte",
    77: "grãos de neve", 80: "aguaceiros fracos", 81: "aguaceiros moderados", 82: "aguaceiros violentos",
    85: "aguaceiros de neve fracos", 86: "aguaceiros de neve fortes",
    95: "trovoada", 96: "trovoada com granizo", 99: "trovoada com granizo forte",
}


def is_weather_query(query: str) -> bool:
    """Heuristic: does this message mention any weather keyword?"""
    lowered = query.lower()
    return any(kw in lowered for kw in WEATHER_KEYWORDS)
async def weather_search(query: str) -> str | None:
    """Geocode the location in `query`, then fetch current weather + forecast.

    Uses Nominatim for geocoding and Open-Meteo for the weather (no API keys).
    Returns a formatted Portuguese summary, an error-message string, or None
    when no location could be extracted (caller falls back to the LLM).

    Fix: the annotation was ``-> str`` but the function returns None on the
    no-location path — declared as ``str | None`` now.
    """
    # Strip weather/filler words so only the location remains.
    stop_words = (
        WEATHER_KEYWORDS
        + [BOT_TRIGGER, "hoje", "amanhã", "agora", "atual", "atualmente",
           "esta", "semana", "preciso", "precisas", "precisa",
           "qual", "como", "está", "vai", "vais", "vou",
           "o", "a", "os", "as", "em", "de", "do", "da", "dos", "das",
           "para", "por", "com", "sem", "?", "!"]
    )
    location = query.lower()
    # Longest stop words first so e.g. "atualmente" is removed before "atual".
    for sw in sorted(stop_words, key=len, reverse=True):
        location = re.sub(r"\b" + re.escape(sw) + r"\b", " ", location)
    location = " ".join(location.split()).strip()
    if not location:
        return None  # No location — let LLM answer from context
    try:
        # Step 1: Geocode with Nominatim
        geo_url = "https://nominatim.openstreetmap.org/search"
        geo_params = {"q": location, "format": "json", "limit": 1}
        headers = {"User-Agent": "ManelBot/1.0"}
        async with aiohttp.ClientSession() as session:
            async with session.get(geo_url, params=geo_params, headers=headers,
                                   timeout=aiohttp.ClientTimeout(total=8)) as r:
                geo = await r.json()
        if not geo:
            return f"Não encontrei a localização '{location}'."
        lat = float(geo[0]["lat"])
        lon = float(geo[0]["lon"])
        name = geo[0]["display_name"].split(",")[0]
        # Step 2: Open-Meteo current + hourly forecast
        om_url = "https://api.open-meteo.com/v1/forecast"
        om_params = (
            f"latitude={lat}&longitude={lon}"
            f"&current=temperature_2m,apparent_temperature,relative_humidity_2m,"
            f"precipitation,weather_code,wind_speed_10m,wind_gusts_10m"
            f"&hourly=temperature_2m,precipitation_probability,weather_code,wind_gusts_10m"
            f"&forecast_days=1&timezone=auto&models=ecmwf_ifs025"
        )
        async with aiohttp.ClientSession() as session:
            async with session.get(f"{om_url}?{om_params}",
                                   timeout=aiohttp.ClientTimeout(total=8)) as r:
                if r.status != 200:
                    return f"Erro ao obter tempo (HTTP {r.status})."
                data = await r.json()
        cur = data["current"]
        temp = cur["temperature_2m"]
        feels = cur["apparent_temperature"]
        humidity = cur["relative_humidity_2m"]
        wind = cur["wind_speed_10m"]
        gusts = cur["wind_gusts_10m"]
        wcode = cur["weather_code"]
        desc = WMO_CODES.get(wcode, f"código {wcode}")
        result = (
            f"🌍 {name}\n"
            f"🌡️ {temp:.0f}°C (sensação {feels:.0f}°C)\n"
            f"🌤️ {desc.capitalize()}\n"
            f"💧 Humidade: {humidity}%\n"
            f"💨 Vento: {wind:.0f} km/h (rajadas até {gusts:.0f} km/h)"
        )
        # Next few hours
        hours = data["hourly"]["time"]
        temps = data["hourly"]["temperature_2m"]
        probs = data["hourly"]["precipitation_probability"]
        wcodes = data["hourly"]["weather_code"]
        # Use UTC offset from Open-Meteo response to get correct local hour
        utc_offset = data.get("utc_offset_seconds", 0)
        now_local_hour = (datetime.datetime.now(datetime.timezone.utc) +
                          datetime.timedelta(seconds=utc_offset)).hour
        upcoming = [(h, t, p, w) for h, t, p, w in zip(hours, temps, probs, wcodes)
                    if int(h[11:13]) > now_local_hour][:4]
        if upcoming:
            result += "\n\n📅 Próximas horas:"
            for h, t, p, w in upcoming:
                d = WMO_CODES.get(w, "")
                result += f"\n {h[11:16]}{t:.0f}°C, {p}% chuva, {d}"
        return result
    except Exception as e:
        log.warning(f"Open-Meteo error: {e}")
        return "Serviço meteorológico indisponível agora."
async def web_search(query: str) -> str:
    """Route weather queries to Open-Meteo, news to NewsData.io, rest to DuckDuckGo."""
    if is_weather_query(query):
        weather = await weather_search(query)
        if weather is not None:
            return weather
        return "No location found. Please answer using weather data already in conversation context."

    # Use NewsData.io for news queries, DuckDuckGo for everything else
    if is_news_query(query) and NEWSDATA_API_KEY:
        try:
            # Detect if query is about Portugal specifically
            pt_keywords = ["portugal", "nacional", "nacionais", "portuguesa", "português",
                           "porto", "lisboa", "governo", "assembleia", "república"]
            if any(kw in query.lower() for kw in pt_keywords):
                # Portuguese national news — Portuguese language, Portugal only
                params = {
                    "apikey": NEWSDATA_API_KEY,
                    "language": "pt",
                    "country": "pt",
                    "size": 6,
                }
            else:
                # International news — English, no country filter, use query keywords
                params = {
                    "apikey": NEWSDATA_API_KEY,
                    "language": "en",
                    "size": 6,
                    "q": query,
                }
            async with aiohttp.ClientSession() as session:
                async with session.get("https://newsdata.io/api/1/news",
                                       params=params,
                                       timeout=aiohttp.ClientTimeout(total=8)) as resp:
                    data = await resp.json()
            if data.get("status") == "success" and data.get("results"):
                lines = []
                for art in data["results"][:5]:
                    title = art.get("title", "").strip()
                    source = art.get("source_name") or art.get("source_id", "")
                    source = source.strip() if source else ""
                    pubdate = art.get("pubDate", "")[:10]
                    if title:
                        lines.append(f"{title}" + (f" ({source}, {pubdate})" if source else ""))
                return "\n".join(lines) if lines else "Sem notícias encontradas."
            return "Sem notícias encontradas."
        except Exception as e:
            # Fall through to DuckDuckGo below on any NewsData failure.
            log.warning(f"NewsData.io error: {e}")

    # DuckDuckGo for non-news queries
    url = f"https://api.duckduckgo.com/?q={quote_plus(query)}&format=json&no_redirect=1&no_html=1&skip_disambig=1"
    try:
        async with aiohttp.ClientSession() as session:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=8)) as resp:
                data = await resp.json(content_type=None)
        results = []
        if data.get("AbstractText"):
            results.append(data["AbstractText"])
        if data.get("AbstractURL"):
            results.append("Fonte: " + data["AbstractURL"])
        for topic in data.get("RelatedTopics", [])[:4]:
            if isinstance(topic, dict) and topic.get("Text"):
                # NOTE(review): the "" prefix is a no-op — likely a bullet char
                # lost in export; kept as-is to preserve behavior.
                results.append("" + topic["Text"])
        return "\n".join(results) if results else "Sem resultados. Responderei com base no meu conhecimento."
    except Exception as e:
        log.warning(f"Web search error: {e}")
        return "Pesquisa web indisponível agora."
# ══════════════════════════════════════════════
# TRIGGER CHECK
# ══════════════════════════════════════════════
def is_mentioned(text: str) -> bool:
    """True when the trigger word appears anywhere in `text`, case-insensitively."""
    pattern = re.escape(BOT_TRIGGER)
    return re.search(pattern, text, re.IGNORECASE) is not None
# ══════════════════════════════════════════════
# GROQ — tool definitions
# ══════════════════════════════════════════════
# OpenAI-style tool schemas advertised to the chat-completion API.
# Dispatched by name in chat_with_groq's tool-call loop.
TOOLS = [
# Generic web search — weather/news routing happens inside web_search().
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web for current info: weather, news, prices, general questions.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string", "description": "The search query"}
},
"required": ["query"]
}
}
},
# Semantic (pgvector) search over stored chat history + saved memories.
{
"type": "function",
"function": {
"name": "search_memory",
"description": (
"Semantic search through the full group chat history and long-term memories. "
"Use for past preferences, facts, or anything said more than a few messages ago. "
"Returns the most relevant past messages by meaning, not just keywords."
),
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Natural language description of what to look for"
}
},
"required": ["query"]
}
}
},
# Persist a long-term fact about a user (memories table).
{
"type": "function",
"function": {
"name": "save_memory",
"description": "Save an important fact about a user for future conversations.",
"parameters": {
"type": "object",
"properties": {
"user_id": {"type": "string", "description": "Telegram user_id of the person"},
"memory": {"type": "string", "description": "The fact to save"}
},
"required": ["user_id", "memory"]
}
}
}
]
# ══════════════════════════════════════════════
# GROQ — main chat function
# ══════════════════════════════════════════════
async def chat_with_groq(chat_id: str, user_id: str, user_first_name: str, user_message: str) -> str:
    """Run one LLM turn: build context, optionally force tools, execute them, reply.

    Returns the assistant's final text (may be empty). API errors propagate to
    the caller (handle_message), which reports them to the chat.

    Fix: collapsed the redundant ``not reply.strip() or reply.strip() == ""``
    check — the second disjunct is implied by the first.
    """
    memories = get_memories(user_id)
    history = get_history_for_llm(chat_id)
    system = build_system_prompt(user_first_name, memories)
    messages = (
        [{"role": "system", "content": system}]
        + history
        + [{"role": "user", "content": f"[{user_first_name}] {user_message}"}]
    )
    # Force web_search tool when weather, timezone or news keywords detected
    if is_weather_query(user_message):
        tool_choice = {"type": "function", "function": {"name": "web_search"}}
        log.info("Forcing web_search for weather query")
    elif is_timezone_query(user_message):
        tool_choice = {"type": "function", "function": {"name": "web_search"}}
        log.info("Forcing web_search for timezone query")
    elif is_news_query(user_message):
        tool_choice = {"type": "function", "function": {"name": "web_search"}}
        log.info("Forcing web_search for news query")
        # Inject intent into the system prompt so the model generates the right query
        intl_keywords = ["internacionais", "internacional", "mundo", "world", "international"]
        if any(kw in user_message.lower() for kw in intl_keywords):
            messages[0]["content"] += "\n\nNEXT TOOL CALL: Search for INTERNATIONAL world news in English. Do NOT include Portugal in query."
        else:
            messages[0]["content"] += "\n\nNEXT TOOL CALL: Search for Portuguese national news. Use query: noticias portugal hoje."
    else:
        tool_choice = "auto"

    response = llm_client.chat.completions.create(
        model=LLM_MODEL,
        messages=messages,
        tools=TOOLS,
        tool_choice=tool_choice,
        max_tokens=1024,
        temperature=0.7,
    )
    msg = response.choices[0].message
    tool_calls = msg.tool_calls or []

    if not tool_calls:
        reply = msg.content or ""
        # If a tool was forced but the model produced neither a call nor text,
        # call the weather service directly and ask for a summary of its data.
        if not reply.strip():
            if is_weather_query(user_message):
                log.info("Model skipped forced tool — calling weather_search directly")
                weather_result = await weather_search(user_message)
                if weather_result:
                    messages.append({"role": "user", "content": f"[weather data]\n{weather_result}\n\nAnswer the user naturally in Portuguese based on this data."})
                    r2 = llm_client.chat.completions.create(
                        model=LLM_MODEL, messages=messages, max_tokens=1024, temperature=0.7
                    )
                    return r2.choices[0].message.content or ""
        return reply

    # Echo the assistant's tool-call message so the follow-up request can pair
    # each tool result with its call id.
    messages.append({
        "role": "assistant",
        "content": msg.content or "",
        "tool_calls": [
            {"id": tc.id, "type": "function",
             "function": {"name": tc.function.name, "arguments": tc.function.arguments}}
            for tc in tool_calls
        ]
    })
    for tc in tool_calls:
        fn = tc.function.name
        args = json.loads(tc.function.arguments)
        log.info(f"Tool: {fn}({args})")
        if fn == "web_search":
            result = await web_search(args["query"])
        elif fn == "search_memory":
            query = args["query"]
            found = []
            # ── 1. Semantic search over group history ──────────────
            query_emb = await get_embedding(query)
            if query_emb:
                rows = await asyncio.to_thread(
                    vector_search_sync, chat_id, query_emb, VECTOR_TOP_K
                )
                for row in rows:
                    sim = float(row["similarity"])
                    if sim < 0.3:  # skip very low similarity
                        continue
                    ts = row["created"].strftime("%Y-%m-%d %H:%M") if row.get("created") else "?"
                    found.append(
                        f"[{ts}] {row.get('username', '?')} (sim={sim:.2f}): {row['content']}"
                    )
            else:
                log.warning("search_memory: no embedding — skipping vector search")
            # ── 2. Always include saved long-term memories ─────────
            for mem in get_memories(user_id):
                found.append(f"[memory] {mem}")
            result = "\n".join(found) if found else "Nothing relevant found in memory."
        elif fn == "save_memory":
            target_uid = args.get("user_id", user_id)
            add_memory(target_uid, args["memory"])
            result = f"Memory saved: {args['memory']}"
        else:
            result = "Unknown tool."
        messages.append({"role": "tool", "content": result, "tool_call_id": tc.id})

    # Second round-trip: let the model compose the final answer from tool output.
    response2 = llm_client.chat.completions.create(
        model=LLM_MODEL,
        messages=messages,
        max_tokens=1024,
        temperature=0.7,
    )
    return response2.choices[0].message.content or ""
# ══════════════════════════════════════════════
# TELEGRAM HANDLERS
# ══════════════════════════════════════════════
async def cmd_start(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """/start — register the sender and show the command list."""
    sender = update.effective_user
    upsert_user(str(sender.id), sender.username or "", sender.full_name)
    owner_hint = "\n\n🔑 És o owner deste bot." if is_owner(update) else ""
    help_text = (
        f"👋 Olá {sender.first_name}! Sou o **{BOT_NAME}** 😊\n\n"
        f"Menciona o meu nome numa mensagem para eu responder!\n\n"
        f"Comandos:\n"
        f"/memorias ver as tuas memórias\n"
        f"/esquecer `<n>` apagar memória nº n\n"
        f"/ajuda mostrar esta mensagem\n"
        f"\n*Comandos de owner:*\n"
        f"/apagarmem `<user_id>` apagar memórias de um utilizador\n"
        f"/apagartudo apagar TODAS as memórias do bot\n"
        f"/novachat limpar histórico deste grupo"
    )
    await update.message.reply_text(help_text + owner_hint, parse_mode="Markdown")
async def cmd_help(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """/ajuda and /help — alias for /start."""
    await cmd_start(update, ctx)
async def cmd_memories(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """/memorias — list the caller's saved memories, numbered for /esquecer."""
    uid = str(update.effective_user.id)
    rows = get_all_memories_indexed(uid)
    if not rows:
        await update.message.reply_text("🧠 Ainda não tenho memórias tuas. Fala comigo!")
        return
    numbered = "\n".join(f"{n}. {memory}" for n, (_, memory) in enumerate(rows, start=1))
    text = f"🧠 **As tuas memórias ({BOT_NAME}):**\n\n" + numbered
    text += "\n\nUsa /esquecer `<número>` para apagar uma."
    await update.message.reply_text(text, parse_mode="Markdown")
async def cmd_forget(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """/esquecer <n> — delete the caller's n-th memory (1-based, per /memorias)."""
    uid = str(update.effective_user.id)
    args = ctx.args
    if not args or not args[0].isdigit():
        await update.message.reply_text("Uso: /esquecer `<número>` (obtém números com /memorias)", parse_mode="Markdown")
        return
    rows = get_all_memories_indexed(uid)
    idx = int(args[0]) - 1
    if not (0 <= idx < len(rows)):
        await update.message.reply_text("❌ Número inválido.")
        return
    delete_memory_by_id(rows[idx][0])
    await update.message.reply_text("✅ Memória apagada.")
@owner_only
async def cmd_clear_user_memories(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """/apagarmem <user_id> — owner-only: wipe one user's memories."""
    if not ctx.args:
        await update.message.reply_text("Uso: /apagarmem `<user_id>`", parse_mode="Markdown")
        return
    target = ctx.args[0]
    clear_memories(target)
    await update.message.reply_text(f"🗑️ Memórias do utilizador `{target}` apagadas.", parse_mode="Markdown")
@owner_only
async def cmd_clearall(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """/apagartudo — owner-only: wipe every memory of this bot."""
    clear_all_memories()
    await update.message.reply_text(f"🗑️ Todas as memórias do {BOT_NAME} foram apagadas.")
@owner_only
async def cmd_newchat(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """/novachat — owner-only: clear this group's history (memories kept)."""
    clear_history(str(update.effective_chat.id))
    await update.message.reply_text("🔄 Histórico do grupo limpo! (Memórias de longo prazo mantidas.)")
async def handle_message(update: Update, ctx: ContextTypes.DEFAULT_TYPE):
    """Store every text message; answer only when the trigger word appears."""
    if not update.message or not update.message.text:
        return
    user = update.effective_user
    uid = str(user.id)
    chat_id = str(update.effective_chat.id)
    text = update.message.text
    fname = user.first_name or "utilizador"
    upsert_user(uid, user.username or "", user.full_name or fname)

    # Always store + embed every message, even when the bot stays silent.
    await store_message(chat_id, uid, fname, "user", text)
    if not is_mentioned(text):
        log.debug(f"[{BOT_NAME}] Stored silently: {text[:60]}")
        return

    await update.message.chat.send_action("typing")
    failed = False
    try:
        reply = await chat_with_groq(chat_id, uid, fname, text)
    except Exception as e:
        log.error(f"Groq error: {e}\n{traceback.format_exc()}")
        reply = "⚠️ Ocorreu um erro. Tenta novamente."
        failed = True
    if not failed:
        # Only real replies enter history; error notices are not stored.
        await store_message(chat_id, None, BOT_NAME, "assistant", reply)

    # Telegram caps messages at 4096 chars — send in chunks.
    for start in range(0, len(reply), 4096):
        chunk = reply[start:start + 4096]
        if not chunk.strip():
            continue
        try:
            await update.message.chat.send_message(chunk, parse_mode="Markdown")
        except Exception:
            # Markdown parse failure → retry as plain text; give up quietly after.
            try:
                await update.message.chat.send_message(chunk)
            except Exception:
                pass
# ══════════════════════════════════════════════
# MAIN
# ══════════════════════════════════════════════
def main():
    """Validate configuration, initialize the database, and start long polling."""
    if not TELEGRAM_BOT_TOKEN:
        raise ValueError("TELEGRAM_BOT_TOKEN not set — check .env_credentials")
    if LLM_PROVIDER == "groq" and not GROQ_API_KEY:
        raise ValueError("GROQ_API_KEY not set — check .env_credentials")
    if LLM_PROVIDER == "openrouter" and not OPENROUTER_API_KEY:
        raise ValueError("OPENROUTER_API_KEY not set — check .env_credentials")
    if OWNER_ID == 0:
        log.warning("OWNER_TELEGRAM_ID not set — admin commands blocked for everyone.")

    init_db()
    log.info(f"Starting {BOT_NAME} | trigger='{BOT_TRIGGER}' | provider={LLM_PROVIDER} | model={LLM_MODEL} | embed={EMBED_MODEL}@{OLLAMA_URL}")

    app = Application.builder().token(TELEGRAM_BOT_TOKEN).build()
    handlers = [
        CommandHandler("start", cmd_start),
        CommandHandler("ajuda", cmd_help),
        CommandHandler("help", cmd_help),
        CommandHandler("memorias", cmd_memories),
        CommandHandler("esquecer", cmd_forget),
        CommandHandler("apagarmem", cmd_clear_user_memories),
        CommandHandler("apagartudo", cmd_clearall),
        CommandHandler("novachat", cmd_newchat),
        # Plain text (non-command) messages all flow through handle_message.
        MessageHandler(filters.TEXT & ~filters.COMMAND, handle_message),
    ]
    for handler in handlers:
        app.add_handler(handler)
    app.run_polling(allowed_updates=Update.ALL_TYPES)


if __name__ == "__main__":
    main()