{ "name": "AnimeKai Streamer", "version": "1.3.0", "author": "Animex", "description": "Resolves AnimeKai.to streams using backend encrypted APIs via Cloud Tunnel. Supports Sub and Dub. No Selenium.", "type": ["ANIME_STREAMER"], "requirements": ["httpx", "beautifulsoup4"] } --- import re import httpx import asyncio import json import urllib.parse from typing import Optional, Dict, List # ========================= # CONSTANTS # ========================= BASE_URL = "https://animekai.to" ENC_API = "https://enc-dec.app/api" DB_URL = "https://enc-dec.app/db/kai/find" HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/137.0.0.0 Safari/537.36" ), "Referer": BASE_URL } # ========================= # ENCRYPT / DECRYPT HELPERS # ========================= async def enc_kai(text: str) -> str: r = await httpx.get( f"{ENC_API}/enc-kai", params={"text": text}, timeout=10, headers=HEADERS ) r.raise_for_status() return r.json()["result"] async def dec_kai(text: str) -> str: r = await httpx.post( f"{ENC_API}/dec-kai", json={"text": text}, timeout=10, headers=HEADERS ) r.raise_for_status() return r.json()["result"] async def dec_mega(text: str) -> Dict: r = await httpx.post( f"{ENC_API}/dec-mega", json={ "text": text, "agent": HEADERS["User-Agent"] }, timeout=15, headers=HEADERS ) r.raise_for_status() return r.json() # ========================= # INTERNAL LOGIC # ========================= async def get_anime_id(slug: str) -> tuple[str, dict]: # slug is actually mal_id (int or str) resp = await httpx.get(f"{DB_URL}?mal_id={slug}", headers=HEADERS) resp.raise_for_status() data = resp.json() if not data or not data[0].get("info", {}).get("kai_id"): raise RuntimeError("Anime ID not found in JSON response") return data[0]["info"]["kai_id"], data[0] async def get_episode_token(data: dict, episode: int, dub: bool) -> str: # "1" = Sub, "2" = Dub in this specific database structure primary_key = "2" if dub else "1" fallback_key = "1" if dub else "2" ep_str = str(episode) episodes_data = data.get("episodes", {}) # Try requested language first if primary_key in episodes_data and ep_str in episodes_data[primary_key]: return episodes_data[primary_key][ep_str]["token"] # Fallback to the other language if requested isn't available if fallback_key in episodes_data and ep_str in episodes_data[fallback_key]: return episodes_data[fallback_key][ep_str]["token"] raise RuntimeError(f"Episode token not found for episode {episode}") async def get_server_id(ep_token: str, dub: bool) -> List[str]: enc = await enc_kai(ep_token) html = await httpx.get( f"{BASE_URL}/ajax/links/list", params={"token": ep_token, "_": enc}, headers=HEADERS ) # The actual HTML is in the 'result' field of the JSON response try: html_content = html.json()["result"] except Exception: html_content = html.text # Determine which language group to use # Default: sub (Hard Sub), or dub if requested lang_order = ["dub", "sub"] if dub else ["sub", "softsub", "dub"] all_server_ids = [] for lang in lang_order: group_pattern = rf'