WS

AI-агент как Telegram-бот за час

Деплой AI-агента через Telegram с расширенными возможностями: file upload, streaming ответов, inline клавиатура.

📊 Средний⏱ 60 мин

# 1. АРХИТЕКТУРА С STREAMING

# Два режима получения обновлений от Telegram:
#
# 1. LONG POLLING (для разработки и простых ботов):
#    ▸ Бот постоянно опрашивает Telegram: "есть новые сообщения?"
#    ▸ Простота: не нужен домен и HTTPS
#    ▸ Минус: задержка до 1-2 секунд, сложнее масштабировать
#
# 2. WEBHOOK (для продакшена):
#    ▸ Telegram сам отправляет обновления на ваш URL
#    ▸ Мгновенная доставка, легко масштабировать
#    ▸ Требуется: домен + HTTPS (SSL)

# Streaming ответов: отправляем токены по мере генерации
curl -N http://localhost:11434/api/generate \
  -d '{"model":"llama3.1","prompt":"Напиши код на Python"}'
# -N флаг отключает буферизацию → токены приходят по одному

# 2. РАСШИРЕННЫЙ БОТ

from telegram import Update, InlineKeyboardButton, InlineKeyboardMarkup
from telegram.ext import Application, CommandHandler, CallbackQueryHandler

# Команды бота
async def task(update: Update, context):
    """Команда /task — создать новую задачу."""
    task_text = " ".join(context.args)
    if not task_text:
        await update.message.reply_text(
            "Использование: /task <описание задачи>")
        return
    await update.message.reply_text(
        f"📝 Задача создана: {task_text[:200]}",
        reply_markup=_task_keyboard()
    )

# Inline клавиатура с кнопками действий
def _task_keyboard():
    keyboard = [
        [InlineKeyboardButton("▶️ Старт", callback_data="start_task")],
        [InlineKeyboardButton("📊 Статус", callback_data="status"),
         InlineKeyboardButton("📋 Результат", callback_data="result")],
        [InlineKeyboardButton("❌ Отмена", callback_data="cancel")],
    ]
    return InlineKeyboardMarkup(keyboard)

async def button_handler(update: Update, context):
    """Обработка нажатий на кнопки."""
    query = update.callback_query
    await query.answer()
    data = query.data
    if data == "start_task":
        # Запуск задачи асинхронно
        await query.edit_message_text("🔄 Выполняю...")
    elif data == "cancel":
        await query.edit_message_text("❌ Задача отменена.")

# 3. FILE HANDLING

import os
import aiohttp
from telegram.ext import MessageHandler, filters

async def handle_document(update: Update, context):
    """Загрузка файла из Telegram → обработка агентом."""
    doc = update.message.document
    file = await context.bot.get_file(doc.file_id)

    # Скачивание во временную директорию
    tmp_dir = f"/tmp/agent_files/{update.effective_user.id}"
    os.makedirs(tmp_dir, exist_ok=True)
    filepath = os.path.join(tmp_dir, doc.file_name)
    await file.download_to_drive(filepath)

    # Передаём агенту на обработку
    result = await process_with_agent(filepath, doc.mime_type)

    # Отправка результата обратно файлом
    with open(result, 'rb') as f:
        await update.message.reply_document(
            document=f,
            filename=f"result_{doc.file_name}",
            caption="✅ Обработано агентом"
        )

# 4. STREAMING ОТВЕТОВ

# Streaming: токены приходят → обновляем сообщение в Telegram
import asyncio
import aiohttp

async def stream_to_telegram(prompt: str, chat_id: int, bot):
    """Потоковая отправка ответа LLM в Telegram."""

    # 1. Отправляем пустое сообщение — будем редактировать
    msg = await bot.send_message(chat_id=chat_id, text="🤔 Думаю...")

    # 2. Показываем индикатор печати
    await bot.send_chat_action(chat_id=chat_id, action="typing")

    # 3. Стримим ответ от Ollama
    buffer = ""
    token_count = 0
    async with aiohttp.ClientSession() as session:
        async with session.post(
            "http://localhost:11434/api/generate",
            json={"model": "llama3.1", "prompt": prompt}
        ) as resp:
            async for line in resp.content:
                token = json.loads(line).get("response", "")
                buffer += token
                token_count += 1

                # Обновляем сообщение каждые 5 токенов
                if token_count % 5 == 0:
                    try:
                        await msg.edit_text(buffer + " ▍")
                    except Exception:
                        pass  # игнорируем ошибки редактирования

    # 4. Финальное сообщение без курсора
    await msg.edit_text(buffer)

# 5. PERSISTENT SESSIONS

import sqlite3
import json
from datetime import datetime

class SessionStore:
    """SQLite-хранилище сессий пользователей."""

    def __init__(self, db_path="sessions.db"):
        self.conn = sqlite3.connect(db_path)
        self._init_db()

    def _init_db(self):
        self.conn.execute("""CREATE TABLE IF NOT EXISTS sessions (
            user_id INTEGER PRIMARY KEY,
            context TEXT NOT NULL DEFAULT '[]',
            model TEXT DEFAULT 'llama3.1',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
        )""")
        self.conn.commit()

    def save_context(self, user_id: int, messages: list):
        """Сохранение контекста диалога (последние N сообщений)."""
        context_json = json.dumps(messages[-50:])
        self.conn.execute(
            """INSERT INTO sessions (user_id, context, updated_at)
               VALUES (?, ?, ?)
               ON CONFLICT(user_id) DO UPDATE SET
               context=excluded.context,
               updated_at=excluded.updated_at""",
            (user_id, context_json, datetime.now())
        )
        self.conn.commit()

    def load_context(self, user_id: int) -> list:
        """Загрузка контекста диалога пользователя."""
        row = self.conn.execute(
            "SELECT context FROM sessions WHERE user_id=?", (user_id,)
        ).fetchone()
        if row:
            return json.loads(row[0])
        return []

    def clear_session(self, user_id: int):
        """Сброс сессии пользователя."""
        self.conn.execute(
            "DELETE FROM sessions WHERE user_id=?", (user_id,))
        self.conn.commit()

# 6. МУЛЬТИ-ПОЛЬЗОВАТЕЛЬСКАЯ АРХИТЕКТУРА

import asyncio
from collections import deque

class TaskQueue:
    """Очередь задач с пулом воркеров."""

    def __init__(self, max_workers=4):
        self.queue = asyncio.Queue(maxsize=100)
        self.workers = []
        self.max_workers = max_workers
        self._running = True

    async def start(self):
        """Запуск пула worker'ов."""
        self.workers = [asyncio.create_task(self._worker(i))
                         for i in range(self.max_workers)]

    async def _worker(self, worker_id):
        while self._running:
            try:
                task, callback = await asyncio.wait_for(
                    self.queue.get(), timeout=5.0)
                result = await execute_in_sandbox(task)
                await callback(result)
            except asyncio.TimeoutError:
                continue

    async def submit(self, task, callback):
        await self.queue.put((task, callback))

    async def shutdown(self):
        """Graceful shutdown: завершаем текущие задачи."""
        self._running = False
        for w in self.workers:
            w.cancel()
        await asyncio.gather(*self.workers, return_exceptions=True)

🔗 Полезные ссылки

📖 python-telegram-bot📖 asyncio📖 SQLite📖 aiohttp📖 Telegram Bot API