{ "name": "AnimeKai Streamer", "version": "1.4.0", "author": "Animex", "description": "Cloud-optimized AnimeKai resolver. Handles Tunnel routing and deep decryption.", "type": ["ANIME_STREAMER"], "requirements": ["httpx", "beautifulsoup4"] } --- import re import json import httpx import urllib.parse from typing import Optional, Dict, List, Any # Constants BASE_URL = "https://animekai.to" ENC_API = "https://enc-dec.app/api" DB_URL = "https://enc-dec.app/db/kai/find" UA = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36" async def get_iframe_source(mal_id: int, episode: int, dub: bool) -> Optional[str]: """ Core resolver function. 'httpx' here is the HybridClient injected by app.py. """ try: # 1. Resolve MAL ID to AnimeKai metadata meta_res = await httpx.get(f"{DB_URL}?mal_id={mal_id}") meta_res.raise_for_status() meta_data = meta_res.json() if not meta_data: return None # 2. Locate the specific episode token lang_key = "2" if dub else "1" ep_entry = meta_data[0].get("episodes", {}).get(lang_key, {}).get(str(episode)) if not ep_entry: # Fallback to sub if dub is missing ep_entry = meta_data[0].get("episodes", {}).get("1", {}).get(str(episode)) if not ep_entry: return None token = ep_entry["token"] # 3. Get Server List (requires an encrypted version of the token) enc_token_req = await httpx.get(f"{ENC_API}/enc-kai", params={"text": token}) enc_token = enc_token_req.json().get("result") servers_res = await httpx.get(f"{BASE_URL}/ajax/links/list", params={"token": token, "_": enc_token}) servers_html = servers_res.json().get("result", "") # Extract server data-lids server_ids = re.findall(r'data-lid="([^"]+)"', servers_html) if not server_ids: return None # 4. Iterate through servers to find a working stream for sid in server_ids: try: # Encrypt SID for the view-link request enc_sid_req = await httpx.get(f"{ENC_API}/enc-kai", params={"text": sid}) enc_sid = enc_sid_req.json().get("result") view_res = await httpx.get(f"{BASE_URL}/ajax/links/view", params={"id": sid, "_": enc_sid}) # Decrypt the view response to get the embed host URL and skip times dec_view_req = await httpx.post(f"{ENC_API}/dec-kai", json_data={"text": view_res.json().get("result")}) dec_view = dec_view_req.json().get("result") if not dec_view or "url" not in dec_view: continue # Convert embed URL to media API URL (e.g., megaup.nl/e/... -> megaup.nl/media/...) media_url = dec_view["url"].replace("/e/", "/media/") # Fetch the media page (this is where the Home Agent's User-Agent is crucial) media_page = await httpx.get(media_url, headers={"User-Agent": UA, "Referer": BASE_URL}) # 5. Robust Extraction of the encrypted "result" blob # We use regex directly on the HTML to find the result string blob_match = re.search(r'"result"\s*:\s*"([^"]+)"', media_page.text) if not blob_match: continue encrypted_blob = blob_match.group(1) # 6. Final decryption of the stream sources final_dec_req = await httpx.post(f"{ENC_API}/dec-mega", json_data={"text": encrypted_blob, "agent": UA}) final_data = final_dec_req.json().get("result", {}) sources = final_data.get("sources", []) if not sources: continue video_url = sources[0].get("file") # 7. Construct the final player URL for the frontend # Extract the host for the referer header parsed_uri = urllib.parse.urlparse(media_url) referer_host = f"{parsed_uri.scheme}://{parsed_uri.netloc}" params = { "video": video_url, "referer": referer_host, "id": mal_id, "episode": episode, "stream": "true", "full": "true" } # Add skip times (Intro/Outro) if available if dec_view.get("skip"): params["skip_times"] = json.dumps(dec_view["skip"]) # Add subtitles if available tracks = final_data.get("tracks", []) subs = [t for t in tracks if t.get("kind") == "captions"] if subs: params["captions"] = json.dumps(subs) return f"/video_player.html?{urllib.parse.urlencode(params)}" except Exception as e: print(f"[AnimeKai] Sid {sid} failed: {str(e)}") continue return None except Exception as e: print(f"[AnimeKai] Global Module Error: {str(e)}") return None async def get_download_link(mal_id: int, episode: int, dub: bool, quality: str) -> Optional[str]: """ Returns the direct m3u8 link. """ source_url = await get_iframe_source(mal_id, episode, dub) if source_url: parsed = urllib.parse.urlparse(source_url) qs = urllib.parse.parse_qs(parsed.query) return qs.get("video", [None])[0] return None