{ "name": "AnimeKai Streamer", "version": "1.2.1", "author": "Animex", "description": "Resolves AnimeKai.to streams using backend encrypted APIs. Fully optimized for Cloud Tunneling.", "type": ["ANIME_STREAMER"], "requirements":["httpx", "beautifulsoup4"] } --- import re import httpx import asyncio import json import urllib.parse from typing import Optional, Dict, List # ========================= # CONSTANTS # ========================= BASE_URL = "https://animekai.to" ENC_API = "https://enc-dec.app/api" DB_URL = "https://enc-dec.app/db/kai/find" HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/137.0.0.0 Safari/537.36" ), "Referer": BASE_URL } # ========================= # ENCRYPT / DECRYPT HELPERS # ========================= async def enc_kai(client, text: str) -> str: r = await client.get( f"{ENC_API}/enc-kai", params={"text": text}, headers=HEADERS, timeout=10 ) r.raise_for_status() return r.json()["result"] async def dec_kai(client, text: str) -> str: r = await client.post( f"{ENC_API}/dec-kai", json={"text": text}, headers=HEADERS, timeout=10 ) r.raise_for_status() return r.json()["result"] async def dec_mega(client, text: str) -> Dict: r = await client.post( f"{ENC_API}/dec-mega", json={ "text": text, "agent": HEADERS["User-Agent"] }, headers=HEADERS, timeout=15 ) r.raise_for_status() return r.json() # ========================= # INTERNAL LOGIC # ========================= async def get_anime_id(client, slug: str) -> str: # slug is actually mal_id (int) resp = await client.get(f"{DB_URL}?mal_id={slug}", headers=HEADERS) resp.raise_for_status() data = resp.json() if not data or not data[0].get("info", {}).get("kai_id"): raise RuntimeError("Anime ID not found in JSON response") return data[0]["info"]["kai_id"], data[0] async def get_episode_token(data: dict, episode: int) -> str: # Always fetch the sub ("1") token for the episode try: token = data["episodes"]["1"][str(episode)]["token"] except Exception: raise RuntimeError("Episode token not found in JSON response") return token async def get_server_id(client, ep_token: str, dub: bool) -> str: enc = await enc_kai(client, ep_token) html = await client.get( f"{BASE_URL}/ajax/links/list", params={"token": ep_token, "_": enc}, headers=HEADERS ) # The actual HTML is in the 'result' field of the JSON response try: html_content = html.json()["result"] except Exception: html_content = html.text # Determine which language group to use # Default: sub (Hard Sub), or dub if requested lang_order = ["dub", "sub"] if dub else ["sub", "softsub", "dub"] all_server_ids =[] for lang in lang_order: group_pattern = rf'
]*>(.*?)
' group_match = re.search(group_pattern, html_content, re.DOTALL) if group_match: group_html = group_match.group(1) server_pattern = r']+data-lid="([^"]+)"[^>]*>Server \d+' server_ids = re.findall(server_pattern, group_html) if server_ids: all_server_ids.extend(server_ids) if not all_server_ids: raise RuntimeError("No servers found in any language group") return all_server_ids async def resolve_stream_url(client, server_id: str) -> dict: enc = await enc_kai(client, server_id) view = await client.get( f"{BASE_URL}/ajax/links/view", params={"id": server_id, "_": enc}, headers=HEADERS ) decrypted = await dec_kai(client, view.json()["result"]) print(f"decrypted (dec_kai): {decrypted}") skip = {} media_url = "" if isinstance(decrypted, dict): skip = decrypted.get("skip", {}) media_url = decrypted.get("url", "").replace("/e/", "/media/") # Extract referer from media_url (main domain) referer = "" if media_url: parsed = urllib.parse.urlparse(media_url) referer = f"{parsed.scheme}://{parsed.netloc}" media = await client.get(media_url, headers=HEADERS) mega = await dec_mega(client, media.json()["result"]) print(mega) sources = mega.get("result", {}).get("sources",[]) tracks = mega.get("result", {}).get("tracks",[]) # Find captions and thumbnails captions =[t for t in tracks if t.get("kind") == "captions"] thumbnails = [t for t in tracks if t.get("kind") == "thumbnails"] if not sources: raise RuntimeError("No stream sources found") # Return both the file URL and skip times return { "url": sources[0]["file"], "skip": skip, "captions": captions, "thumbnails": thumbnails, "referer": referer } # ========================= # PUBLIC MODULE API # ========================= async def get_iframe_source( mal_id: int, episode: int, dub: bool ) -> Optional[str]: """ Returns resolved m3u8 stream URL using the injected Tunneling HybridClient """ # The 'httpx' variable resolves to the injected HybridClient dynamically # replaced during module processing in app.py's `get_iframe_source` endpoint. client = httpx try: ani_id, anime_json = await get_anime_id(client, mal_id) print(f"ani_id: {ani_id}") ep_token = await get_episode_token(anime_json, episode) print(f"ep_token: {ep_token}") server_ids = await get_server_id(client, ep_token, dub) print(f"server_ids: {server_ids}") for sid in server_ids: try: print(f"Trying server_id: {sid}") result = await resolve_stream_url(client, sid) if result: url = result["url"] skip = result.get("skip", {}) # Build new player URL player_url = "/video_player.html?stream=true&full=true" # Add skip_times, captions, thumbnails as separate params if "intro" in skip or "outro" in skip: skip_times_param = {} if "intro" in skip: skip_times_param["intro"] = skip["intro"] if "outro" in skip: skip_times_param["outro"] = skip["outro"] player_url += f"&skip_times={urllib.parse.quote(json.dumps(skip_times_param))}" if result.get("captions"): player_url += f"&captions={urllib.parse.quote(json.dumps(result['captions']))}" if result.get("thumbnails"): player_url += f"&thumbnails={urllib.parse.quote(json.dumps(result['thumbnails']))}" if result.get("referer"): player_url += f"&referer={urllib.parse.quote(result['referer'])}" player_url += f"&id={mal_id}&episode={episode}&video={urllib.parse.quote(url)}" return player_url except Exception as e: print(f"Failed to resolve server {sid}: {e}") print("All servers failed.") return None except Exception as e: import traceback print(f"Exception in get_iframe_source: {type(e).__name__}: {e}") traceback.print_exc() return None async def get_download_link( mal_id: int, episode: int, dub: bool, quality: str ) -> Optional[str]: """ Alias for get_iframe_source (AnimeKai uses adaptive m3u8) """ return await get_iframe_source(mal_id, episode, dub)