{ "name": "AnimeKai Streamer", "version": "1.3.0", "author": "Animex", "description": "Resolves AnimeKai.to streams using backend encrypted APIs via Cloud Tunnel. Supports Sub and Dub. No Selenium.", "type": ["ANIME_STREAMER"], "requirements": ["httpx", "beautifulsoup4"] } --- import re import httpx import asyncio import json import urllib.parse from typing import Optional, Dict, List # ========================= # CONSTANTS # ========================= BASE_URL = "https://animekai.to" ENC_API = "https://enc-dec.app/api" DB_URL = "https://enc-dec.app/db/kai/find" HEADERS = { "User-Agent": ( "Mozilla/5.0 (Windows NT 10.0; Win64; x64) " "AppleWebKit/537.36 (KHTML, like Gecko) " "Chrome/137.0.0.0 Safari/537.36" ), "Referer": BASE_URL } # ========================= # ENCRYPT / DECRYPT HELPERS # ========================= async def enc_kai(text: str) -> str: r = await httpx.get( f"{ENC_API}/enc-kai", params={"text": text}, timeout=10, headers=HEADERS ) r.raise_for_status() return r.json()["result"] async def dec_kai(text: str) -> str: r = await httpx.post( f"{ENC_API}/dec-kai", json={"text": text}, timeout=10, headers=HEADERS ) r.raise_for_status() return r.json()["result"] async def dec_mega(text: str) -> Dict: r = await httpx.post( f"{ENC_API}/dec-mega", json={ "text": text, "agent": HEADERS["User-Agent"] }, timeout=15, headers=HEADERS ) r.raise_for_status() return r.json() # ========================= # INTERNAL LOGIC # ========================= async def get_anime_id(slug: str) -> tuple[str, dict]: # slug is actually mal_id (int or str) resp = await httpx.get(f"{DB_URL}?mal_id={slug}", headers=HEADERS) resp.raise_for_status() data = resp.json() if not data or not data[0].get("info", {}).get("kai_id"): raise RuntimeError("Anime ID not found in JSON response") return data[0]["info"]["kai_id"], data[0] async def get_episode_token(data: dict, episode: int, dub: bool) -> str: # "1" = Sub, "2" = Dub in this specific database structure primary_key = "2" if dub else "1" fallback_key = "1" if dub else "2" ep_str = str(episode) episodes_data = data.get("episodes", {}) # Try requested language first if primary_key in episodes_data and ep_str in episodes_data[primary_key]: return episodes_data[primary_key][ep_str]["token"] # Fallback to the other language if requested isn't available if fallback_key in episodes_data and ep_str in episodes_data[fallback_key]: return episodes_data[fallback_key][ep_str]["token"] raise RuntimeError(f"Episode token not found for episode {episode}") async def get_server_id(ep_token: str, dub: bool) -> List[str]: enc = await enc_kai(ep_token) html = await httpx.get( f"{BASE_URL}/ajax/links/list", params={"token": ep_token, "_": enc}, headers=HEADERS ) # The actual HTML is in the 'result' field of the JSON response try: html_content = html.json()["result"] except Exception: html_content = html.text # Determine which language group to use # Default: sub (Hard Sub), or dub if requested lang_order = ["dub", "sub"] if dub else ["sub", "softsub", "dub"] all_server_ids = [] for lang in lang_order: group_pattern = rf'
]*>(.*?)
' group_match = re.search(group_pattern, html_content, re.DOTALL) if group_match: group_html = group_match.group(1) server_pattern = r']+data-lid="([^"]+)"[^>]*>Server \d+' server_ids = re.findall(server_pattern, group_html) if server_ids: all_server_ids.extend(server_ids) if not all_server_ids: raise RuntimeError("No servers found in any language group") return all_server_ids async def resolve_stream_url(server_id: str) -> dict: enc = await enc_kai(server_id) view = await httpx.get( f"{BASE_URL}/ajax/links/view", params={"id": server_id, "_": enc}, headers=HEADERS ) decrypted = await dec_kai(view.json()["result"]) print(f"decrypted (dec_kai): {decrypted}") skip = {} media_url = "" if isinstance(decrypted, dict): skip = decrypted.get("skip", {}) media_url = decrypted.get("url", "").replace("/e/", "/media/") # Extract referer from media_url (main domain) referer = "" if media_url: parsed = urllib.parse.urlparse(media_url) referer = f"{parsed.scheme}://{parsed.netloc}" media = await httpx.get(media_url, headers=HEADERS) mega = await dec_mega(media.json()["result"]) print(mega) sources = mega.get("result", {}).get("sources", []) tracks = mega.get("result", {}).get("tracks", []) # Find captions and thumbnails captions = [t for t in tracks if t.get("kind") == "captions"] thumbnails = [t for t in tracks if t.get("kind") == "thumbnails"] if not sources: raise RuntimeError("No stream sources found") # Return both the file URL and skip times return { "url": sources[0]["file"], "skip": skip, "captions": captions, "thumbnails": thumbnails, "referer": referer } # ========================= # PUBLIC MODULE API # ========================= async def get_iframe_source(mal_id: int, episode: int, dub: bool) -> Optional[str]: """ Returns resolved m3u8 stream URL integrated into the player link """ try: ani_id, anime_json = await get_anime_id(str(mal_id)) print(f"ani_id: {ani_id}") # Pass the dub parameter so it checks for the correct sub/dub token ep_token = await get_episode_token(anime_json, episode, dub) print(f"ep_token: {ep_token}") server_ids = await get_server_id(ep_token, dub) print(f"server_ids: {server_ids}") for sid in server_ids: try: print(f"Trying server_id: {sid}") result = await resolve_stream_url(sid) if result: url = result["url"] skip = result.get("skip", {}) # Build new player URL player_url = "/video_player.html?stream=true&full=true" # Add skip_times, captions, thumbnails as separate params if "intro" in skip or "outro" in skip: skip_times_param = {} if "intro" in skip: skip_times_param["intro"] = skip["intro"] if "outro" in skip: skip_times_param["outro"] = skip["outro"] player_url += f"&skip_times={urllib.parse.quote(json.dumps(skip_times_param))}" if result.get("captions"): player_url += f"&captions={urllib.parse.quote(json.dumps(result['captions']))}" if result.get("thumbnails"): player_url += f"&thumbnails={urllib.parse.quote(json.dumps(result['thumbnails']))}" if result.get("referer"): player_url += f"&referer={urllib.parse.quote(result['referer'])}" player_url += f"&id={mal_id}&episode={episode}&video={urllib.parse.quote(url)}" return player_url except Exception as e: print(f"Failed to resolve server {sid}: {e}") print("All servers failed.") return None except Exception as e: import traceback print(f"Exception in get_iframe_source: {type(e).__name__}: {e}") traceback.print_exc() return None async def get_download_link(mal_id: int, episode: int, dub: bool, quality: str) -> Optional[str]: """ Alias for get_iframe_source (AnimeKai uses adaptive m3u8) """ return await get_iframe_source(mal_id, episode, dub)