Files
deploy-test/modules/animekai.module
2026-03-30 23:05:53 -05:00

255 lines
8.2 KiB
Plaintext

{
"name": "AnimeKai Streamer",
"version": "1.3.0",
"author": "Animex",
"description": "Resolves AnimeKai.to streams using backend encrypted APIs via Cloud Tunnel. Supports Sub and Dub. No Selenium.",
"type": ["ANIME_STREAMER"],
"requirements": ["httpx", "beautifulsoup4"]
}
---
import re
import httpx
import asyncio
import json
import urllib.parse
from typing import Optional, Dict, List
# =========================
# CONSTANTS
# =========================
# Scrape target and the third-party crypto helper service.
BASE_URL = "https://animekai.to"
ENC_API = "https://enc-dec.app/api"            # encrypt/decrypt endpoints
DB_URL = "https://enc-dec.app/db/kai/find"     # MAL-id -> AnimeKai record lookup
# Browser-like headers sent with every request.
# NOTE(review): the Referer is presumably required by AnimeKai's ajax
# endpoints — confirm before removing.
HEADERS = {
    "User-Agent": (
        "Mozilla/5.0 (Windows NT 10.0; Win64; x64) "
        "AppleWebKit/537.36 (KHTML, like Gecko) "
        "Chrome/137.0.0.0 Safari/537.36"
    ),
    "Referer": BASE_URL
}
# =========================
# ENCRYPT / DECRYPT HELPERS
# =========================
async def enc_kai(text: str) -> str:
    """Encrypt *text* with the enc-dec.app "kai" scheme.

    Returns the ``result`` field of the JSON response.
    Raises ``httpx.HTTPStatusError`` on a non-2xx response.
    """
    # BUG FIX: httpx.get() is the *synchronous* shortcut — it returns a
    # Response, and ``await httpx.get(...)`` raises TypeError at runtime.
    # Async requests must go through an AsyncClient.
    async with httpx.AsyncClient(headers=HEADERS, timeout=10) as client:
        r = await client.get(f"{ENC_API}/enc-kai", params={"text": text})
    r.raise_for_status()
    return r.json()["result"]
async def dec_kai(text: str):
    """Decrypt an AnimeKai payload via enc-dec.app.

    The ``result`` field may be a plain string or, for /ajax/links/view
    payloads, a dict carrying ``url`` and ``skip`` — callers type-check it,
    so the misleading ``-> str`` annotation of the original is dropped.

    Raises ``httpx.HTTPStatusError`` on a non-2xx response.
    """
    # BUG FIX: httpx.post() is synchronous and not awaitable; use an
    # AsyncClient for async requests.
    async with httpx.AsyncClient(headers=HEADERS, timeout=10) as client:
        r = await client.post(f"{ENC_API}/dec-kai", json={"text": text})
    r.raise_for_status()
    return r.json()["result"]
async def dec_mega(text: str) -> Dict:
    """Decrypt a MegaUp media payload; returns the full JSON response.

    Raises ``httpx.HTTPStatusError`` on a non-2xx response.
    """
    # BUG FIX: httpx.post() is synchronous and not awaitable; use an
    # AsyncClient for async requests.
    async with httpx.AsyncClient(headers=HEADERS, timeout=15) as client:
        r = await client.post(
            f"{ENC_API}/dec-mega",
            # NOTE(review): the service is sent our User-Agent — presumably
            # it participates in key derivation; confirm with the API docs.
            json={"text": text, "agent": HEADERS["User-Agent"]},
        )
    r.raise_for_status()
    return r.json()
# =========================
# INTERNAL LOGIC
# =========================
async def get_anime_id(slug: str) -> tuple[str, dict]:
    """Resolve a MyAnimeList id to AnimeKai's internal id.

    ``slug`` is actually a MAL id (int or str). Returns ``(kai_id, record)``
    where ``record`` is the full db entry (it also carries episode tokens).

    Raises ``RuntimeError`` when no matching entry exists and
    ``httpx.HTTPStatusError`` on a non-2xx response.
    """
    # BUG FIX: ``await httpx.get(...)`` is invalid (sync shortcut); use an
    # AsyncClient. Also pass the id via params= so it is URL-encoded, and
    # add a timeout for consistency with the other helpers.
    async with httpx.AsyncClient(headers=HEADERS, timeout=10) as client:
        resp = await client.get(DB_URL, params={"mal_id": slug})
    resp.raise_for_status()
    data = resp.json()
    if not data or not data[0].get("info", {}).get("kai_id"):
        raise RuntimeError("Anime ID not found in JSON response")
    return data[0]["info"]["kai_id"], data[0]
async def get_episode_token(data: dict, episode: int, dub: bool) -> str:
    """Return the stream token for *episode* from a db record.

    Language group "1" holds Sub tokens and "2" holds Dub tokens in this
    database structure; the requested language is tried first, the other
    one serves as a fallback.

    Raises ``RuntimeError`` when neither group has the episode.
    """
    episodes = data.get("episodes", {})
    ep_key = str(episode)
    preference = ("2", "1") if dub else ("1", "2")
    for lang_key in preference:
        entry = episodes.get(lang_key, {}).get(ep_key)
        if entry is not None:
            return entry["token"]
    raise RuntimeError(f"Episode token not found for episode {episode}")
async def get_server_id(ep_token: str, dub: bool) -> List[str]:
    """Fetch the server list for an episode token and extract server ids.

    Server ids are scraped from language groups in preference order
    (dub first when *dub*, otherwise sub/softsub first).

    Raises ``RuntimeError`` when no group yields any server id.
    """
    enc = await enc_kai(ep_token)
    # BUG FIX: ``await httpx.get(...)`` is invalid (sync shortcut); use an
    # AsyncClient, and add a timeout for consistency with the other helpers.
    async with httpx.AsyncClient(headers=HEADERS, timeout=10) as client:
        resp = await client.get(
            f"{BASE_URL}/ajax/links/list",
            params={"token": ep_token, "_": enc},
        )
    # The actual HTML is in the 'result' field of the JSON response;
    # fall back to the raw body if the response isn't JSON.
    try:
        html_content = resp.json()["result"]
    except Exception:
        html_content = resp.text
    # Preferred language group order; default is sub (Hard Sub) first.
    lang_order = ["dub", "sub"] if dub else ["sub", "softsub", "dub"]
    all_server_ids: List[str] = []
    for lang in lang_order:
        group_pattern = rf'<div class="server-items lang-group" data-id="{lang}"[^>]*>(.*?)</div>'
        group_match = re.search(group_pattern, html_content, re.DOTALL)
        if not group_match:
            continue
        server_pattern = r'<span class="server"[^>]+data-lid="([^"]+)"[^>]*>Server \d+</span>'
        all_server_ids.extend(re.findall(server_pattern, group_match.group(1)))
    if not all_server_ids:
        raise RuntimeError("No servers found in any language group")
    return all_server_ids
async def resolve_stream_url(server_id: str) -> dict:
    """Resolve one server id to a playable stream.

    Returns a dict with ``url`` (first source file), ``skip`` (intro/outro
    times when present), ``captions``, ``thumbnails`` and ``referer``
    (scheme://host of the media URL).

    Raises ``RuntimeError`` when the decrypted payload carries no media URL
    or no stream sources.
    """
    enc = await enc_kai(server_id)
    # BUG FIX: ``await httpx.get(...)`` is invalid (sync shortcut); route
    # both requests through one AsyncClient.
    async with httpx.AsyncClient(headers=HEADERS, timeout=15) as client:
        view = await client.get(
            f"{BASE_URL}/ajax/links/view",
            params={"id": server_id, "_": enc},
        )
        view.raise_for_status()
        decrypted = await dec_kai(view.json()["result"])
        print(f"decrypted (dec_kai): {decrypted}")
        skip: dict = {}
        media_url = ""
        if isinstance(decrypted, dict):
            skip = decrypted.get("skip", {})
            # /e/ is the embed page; /media/ serves the encrypted payload.
            media_url = decrypted.get("url", "").replace("/e/", "/media/")
        # BUG FIX: the original fetched media_url even when it was empty,
        # producing an unhelpful httpx error instead of a clear failure.
        if not media_url:
            raise RuntimeError("No media URL in decrypted link payload")
        # Referer for the player is the media host's main domain.
        parsed = urllib.parse.urlparse(media_url)
        referer = f"{parsed.scheme}://{parsed.netloc}"
        media = await client.get(media_url)
    mega = await dec_mega(media.json()["result"])
    print(mega)
    result = mega.get("result", {})
    sources = result.get("sources", [])
    tracks = result.get("tracks", [])
    if not sources:
        raise RuntimeError("No stream sources found")
    captions = [t for t in tracks if t.get("kind") == "captions"]
    thumbnails = [t for t in tracks if t.get("kind") == "thumbnails"]
    return {
        "url": sources[0]["file"],
        "skip": skip,
        "captions": captions,
        "thumbnails": thumbnails,
        "referer": referer,
    }
# =========================
# PUBLIC MODULE API
# =========================
async def get_iframe_source(mal_id: int, episode: int, dub: bool) -> Optional[str]:
    """
    Resolve an episode to a local player URL carrying the m3u8 stream,
    skip times, captions, thumbnails and referer; returns None on failure.
    """
    try:
        ani_id, anime_json = await get_anime_id(str(mal_id))
        print(f"ani_id: {ani_id}")
        # Pass the dub parameter so it checks for the correct sub/dub token
        ep_token = await get_episode_token(anime_json, episode, dub)
        print(f"ep_token: {ep_token}")
        server_ids = await get_server_id(ep_token, dub)
        print(f"server_ids: {server_ids}")
        # Walk the servers in order; first one that resolves wins.
        for sid in server_ids:
            try:
                print(f"Trying server_id: {sid}")
                result = await resolve_stream_url(sid)
                if result:
                    return _build_player_url(mal_id, episode, result)
            except Exception as e:
                print(f"Failed to resolve server {sid}: {e}")
        print("All servers failed.")
        return None
    except Exception as e:
        import traceback
        print(f"Exception in get_iframe_source: {type(e).__name__}: {e}")
        traceback.print_exc()
        return None


def _build_player_url(mal_id: int, episode: int, result: dict) -> str:
    """Assemble the /video_player.html link from a resolved stream dict."""
    parts = ["/video_player.html?stream=true&full=true"]
    skip = result.get("skip", {})
    # Forward intro/outro skip times, captions and thumbnails as
    # URL-encoded JSON parameters when present.
    skip_times = {k: skip[k] for k in ("intro", "outro") if k in skip}
    if skip_times:
        parts.append(f"&skip_times={urllib.parse.quote(json.dumps(skip_times))}")
    if result.get("captions"):
        parts.append(f"&captions={urllib.parse.quote(json.dumps(result['captions']))}")
    if result.get("thumbnails"):
        parts.append(f"&thumbnails={urllib.parse.quote(json.dumps(result['thumbnails']))}")
    if result.get("referer"):
        parts.append(f"&referer={urllib.parse.quote(result['referer'])}")
    parts.append(
        f"&id={mal_id}&episode={episode}&video={urllib.parse.quote(result['url'])}"
    )
    return "".join(parts)
async def get_download_link(mal_id: int, episode: int, dub: bool, quality: str) -> Optional[str]:
    """
    Return the same player URL as get_iframe_source.

    AnimeKai serves adaptive m3u8 streams, so *quality* has no effect;
    the parameter exists only for module-API compatibility.
    """
    return await get_iframe_source(mal_id, episode, dub)