initialˆ

This commit is contained in:
2025-09-30 12:54:29 +08:00
commit acdf544b08
117 changed files with 20260 additions and 0 deletions

14
jm/.idea/jm.iml generated Normal file
View File

@@ -0,0 +1,14 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="PYTHON_MODULE" version="4">
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$">
<excludeFolder url="file://$MODULE_DIR$/.venv" />
</content>
<orderEntry type="jdk" jdkName="Python 3.9 (jm)" jdkType="Python SDK" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
<component name="PyDocumentationSettings">
<option name="format" value="PLAIN" />
<option name="myDocStringFormat" value="Plain" />
</component>
</module>

View File

@@ -0,0 +1,289 @@
from fastapi import FastAPI, HTTPException
from jmcomic import *
from typing import List, Union
import redis
from datetime import timedelta
from functools import wraps
import json
def cache_result(expire_time: timedelta):
    """Build a decorator that caches a function's JSON-serializable result in Redis.

    The cache key is made of the wrapped function's name plus its call
    arguments. Values are stored as JSON text through the module-level
    ``redis_client`` and expire after ``expire_time``.

    :param expire_time: how long a cached result stays valid
    :return: a decorator to apply to the function whose results are cached
    """
    ttl_seconds = int(expire_time.total_seconds())

    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            # Build the cache key. Positional arguments are part of the key so
            # calls that differ only positionally never share an entry, and
            # sort_keys makes the key independent of keyword order.
            # default=str is deliberate: it only affects the key text, never
            # the cached payload.
            key_payload = kwargs if not args else {"args": args, "kwargs": kwargs}
            cache_key = f"{func.__name__}:{json.dumps(key_payload, sort_keys=True, default=str)}"
            # Try the cache first.
            cached_data = redis_client.get(cache_key)
            if cached_data:
                return json.loads(cached_data)
            # Cache miss: compute the result, store it, then return it.
            result = func(*args, **kwargs)
            redis_client.setex(cache_key, ttl_seconds, json.dumps(result))
            return result
        return wrapper
    return decorator
# Configure the Redis connection used by cache_result (decode_responses=True makes get() return str)
redis_client = redis.StrictRedis(host='localhost', port=6379, db=0, decode_responses=True)
app = FastAPI()
# Initialise the shared jmcomic client from the default option
option = JmOption.default()
client = option.new_jm_client()
@app.post("/login/")
def login(username: str, password: str):
    """Log the shared jmcomic client in with the given credentials.

    :param username: account name
    :param password: account password
    :return: a confirmation message on success
    :raises HTTPException: status 400 carrying the underlying error text
    """
    # NOTE(review): plain ``str`` parameters are read from the query string by
    # FastAPI, so the password travels in the URL - confirm this is intended.
    try:
        client.login(username, password)
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
    else:
        return {"message": "Login successful"}
from datetime import timedelta
# Cached endpoint results stay valid for 3 days
cache_time = timedelta(days=3)
@app.get("/search/")
@cache_result(cache_time)
def search_site(search_query: str, page: int = 1):
    """Run a site search and return the albums found on one result page.

    :param search_query: text to search for
    :param page: result page number
    :return: list of ``{"album_id": ..., "title": ...}`` dicts
    :raises HTTPException: status 400 carrying the underlying error text
    """
    try:
        result_page = client.search_site(search_query=search_query, page=page)
        results = []
        for album_id, title in result_page:
            results.append({"album_id": album_id, "title": title})
        return results
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/album/{album_id}/")
@cache_result(cache_time)
def get_album_details(album_id: int):
    """Return an album's metadata together with every image URL of every chapter.

    The album is looked up by searching for its id; each chapter is then
    fetched to enumerate its images.

    :param album_id: id of the album
    :return: dict of album fields plus ``image_urls`` and ``nums`` (one entry per image)
    :raises HTTPException: status 400 carrying the underlying error text
    """
    # Album attributes copied verbatim into the response, in response order.
    album_fields = (
        "album_id", "scramble_id", "name", "page_count", "pub_date",
        "update_date", "likes", "views", "comment_count", "works",
        "actors", "authors", "tags", "related_list", "episode_list",
    )
    try:
        album = client.search_site(search_query=str(album_id)).single_album
        image_urls = []
        nums = []
        # Walk every chapter (photo) of the album ...
        for photo in album:
            photo_detail = client.get_photo_detail(photo.photo_id, False)
            # ... and every image of that chapter.
            for image in photo_detail:
                url = image.img_url
                image_urls.append(url)
                nums.append(JmImageTool.get_num_by_url(image.scramble_id, url))
        details = {field: getattr(album, field) for field in album_fields}
        details["image_urls"] = image_urls
        details["nums"] = nums
        return details
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/album/{album_id}/chapters/")
@cache_result(cache_time)
def get_album_chapters_paginated(album_id: int, page: int = 1, per_page: int = 5):
    """
    Return one page of an album's chapter list.

    :param album_id: album id
    :param page: page number, starting at 1
    :param per_page: number of chapters per page (must be at least 1)
    :return: dict with album id/name, paging information and the chapters of the page
    :raises HTTPException: 404 when ``page`` is out of range, 400 for an
        invalid ``per_page`` or any other failure
    """
    try:
        # Reject a non-positive page size up front instead of letting the
        # page-count arithmetic divide by zero or go negative.
        if per_page < 1:
            raise HTTPException(status_code=400, detail="per_page must be at least 1")
        page_result = client.search_site(search_query=str(album_id))
        album = page_result.single_album
        photos = album.photos
        # Paging arithmetic.
        total_chapters = len(photos)
        total_pages = (total_chapters + per_page - 1) // per_page  # ceiling division
        if page < 1 or page > total_pages:
            raise HTTPException(status_code=404, detail="Page out of range")
        # Index range of the chapters that belong to the requested page.
        start_index = (page - 1) * per_page
        end_index = min(start_index + per_page, total_chapters)
        chapters = []
        for i in range(start_index, end_index):
            photo = photos[i]
            chapters.append({
                "chapter_index": i,
                "chapter_id": photo.photo_id,
                "title": photo.name,
                "page_count": photo.page_count,
                "pub_date": photo.pub_date
            })
        return {
            "album_id": album.album_id,
            "album_name": album.name,
            "current_page": page,
            "per_page": per_page,
            "total_chapters": total_chapters,
            "total_pages": total_pages,
            "chapters": chapters
        }
    except HTTPException:
        # HTTPException is itself an Exception; re-raise it untouched so the
        # deliberate 404 above is not rewritten into a 400 by the handler below.
        raise
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/fast/{album_id}/")
# @cache_result(cache_time)
def get_album_details(album_id: int):
    """Return an album's metadata with only the first image of its first chapter.

    Lightweight variant of the full album endpoint: at most one chapter is
    fetched and at most one image of it is reported.

    :param album_id: id of the album
    :return: dict of album fields plus ``image_urls`` and ``nums`` (at most one entry each)
    :raises HTTPException: status 400 carrying the underlying error text
    """
    # NOTE(review): this function reuses the name of the "/album/{album_id}/"
    # handler defined earlier in the file and therefore rebinds that module
    # attribute; both routes are registered by their decorators.
    album_fields = (
        "album_id", "scramble_id", "name", "page_count", "pub_date",
        "update_date", "likes", "views", "comment_count", "works",
        "actors", "authors", "tags", "related_list", "episode_list",
    )
    try:
        album = client.search_site(search_query=str(album_id)).single_album
        image_urls = []
        nums = []
        for photo in album:
            photo_detail = client.get_photo_detail(photo.photo_id, False)
            for image in photo_detail:
                image_urls.append(image.img_url)
                nums.append(JmImageTool.get_num_by_url(image.scramble_id, image.img_url))
                break  # only the first image is wanted
            break  # only the first chapter is wanted
        details = {field: getattr(album, field) for field in album_fields}
        details["image_urls"] = image_urls
        details["nums"] = nums
        return details
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/favorites/")
@cache_result(cache_time)
def get_favorites(username: str):
    """Collect every album from all pages of a user's favorites.

    :param username: user whose favorites are listed
    :return: list of ``{"album_id": ..., "title": ...}`` dicts
    :raises HTTPException: status 400 carrying the underlying error text
    """
    try:
        return [
            {"album_id": aid, "title": atitle}
            for favorite_page in client.favorite_folder_gen(username=username)
            for aid, atitle in favorite_page
        ]
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/categories/")
@cache_result(cache_time)
def get_categories(page: int = 1, time: str = JmMagicConstants.TIME_ALL, category: str = JmMagicConstants.CATEGORY_ALL,
                   order_by: str = JmMagicConstants.ORDER_BY_LATEST):
    """Return one page of the category listing.

    :param page: page number
    :param time: time-range filter (a ``JmMagicConstants.TIME_*`` value)
    :param category: category filter (a ``JmMagicConstants.CATEGORY_*`` value)
    :param order_by: sort order (a ``JmMagicConstants.ORDER_BY_*`` value)
    :return: list of ``{"album_id": ..., "title": ...}`` dicts
    :raises HTTPException: status 400 carrying the underlying error text
    """
    try:
        category_page = client.categories_filter(page=page, time=time, category=category, order_by=order_by)
        results = []
        for aid, atitle in category_page:
            results.append({"album_id": aid, "title": atitle})
        return results
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/rankings/year")
@cache_result(cache_time)
def get_rankings_year(page: int):
    """Return one page of the category listing over all time, newest first.

    A fresh default client is created for every call.

    :param page: page number
    :return: list of ``{"album_id": ..., "title": ...}`` dicts
    :raises HTTPException: status 400 carrying the underlying error text
    """
    # NOTE(review): despite the route name, the query uses TIME_ALL and
    # ORDER_BY_LATEST (no yearly time range is visible among the constants) -
    # confirm whether a view-ordered ranking was intended.
    try:
        ranking_client = JmOption.default().new_jm_client()
        listing: JmCategoryPage = ranking_client.categories_filter(
            page=page,
            time=JmMagicConstants.TIME_ALL,
            category=JmMagicConstants.CATEGORY_ALL,
            order_by=JmMagicConstants.ORDER_BY_LATEST,
        )
        return [{"album_id": aid, "title": atitle} for aid, atitle in listing]
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/rankings/mouth")
@cache_result(cache_time)
def get_rankings_mouth(page: int):
    """Return one page of the monthly ranking.

    A fresh default client is created for every call. The route path keeps
    its historical spelling ("mouth") so existing callers keep working.

    :param page: page number
    :return: list of ``{"album_id": ..., "title": ...}`` dicts
    :raises HTTPException: status 400 carrying the underlying error text
    """
    try:
        ranking_client = JmOption.default().new_jm_client()
        ranking_page: JmCategoryPage = ranking_client.categories_filter(
            page=page,
            time=JmMagicConstants.TIME_MONTH,
            category=JmMagicConstants.CATEGORY_ALL,
            # A ranking is ordered by view count (this is what the library's
            # own month_ranking uses); ORDER_BY_LATEST merely lists the
            # newest albums of the month.
            order_by=JmMagicConstants.ORDER_BY_VIEW,
        )
        results = [{"album_id": aid, "title": atitle} for aid, atitle in ranking_page]
        return results
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
@app.get("/rankings/week")
@cache_result(cache_time)
def get_rankings_week(page: int):
    """Return one page of the weekly ranking.

    A fresh default client is created for every call.

    :param page: page number
    :return: list of ``{"album_id": ..., "title": ...}`` dicts
    :raises HTTPException: status 400 carrying the underlying error text
    """
    try:
        ranking_client = JmOption.default().new_jm_client()
        ranking_page: JmCategoryPage = ranking_client.categories_filter(
            page=page,
            time=JmMagicConstants.TIME_WEEK,
            category=JmMagicConstants.CATEGORY_ALL,
            # A ranking is ordered by view count (this is what the library's
            # own week_ranking uses); ORDER_BY_LATEST merely lists the
            # newest albums of the week.
            order_by=JmMagicConstants.ORDER_BY_VIEW,
        )
        results = [{"album_id": aid, "title": atitle} for aid, atitle in ranking_page]
        return results
    except Exception as e:
        raise HTTPException(status_code=400, detail=str(e))
# Start the API server when this module is run as a script (binds all interfaces, port 8982)
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8982)

29
jm/src/jmcomic/__init__.py Executable file
View File

@@ -0,0 +1,29 @@
# Module dependency order:
# depended-upon <--- user
# config <--- entity <--- toolkit <--- client <--- option <--- downloader
__version__ = '2.5.35'
from .api import *
from .jm_plugin import *
# Component registration (clients, plugins) starts here.
# gb maps name -> object for every class currently bound in this module's globals.
gb = dict(filter(lambda pair: isinstance(pair[1], type), globals().items()))
def register_jmcomic_component(variables: Dict[str, Any], method, valid_interface: type):
    """Call ``method`` on every strict subclass of ``valid_interface`` in ``variables``.

    :param variables: mapping whose values are classes
    :param method: callable invoked once per matching class
    :param valid_interface: base class; the base itself is not registered
    """
    for candidate in variables.values():
        # The interface class itself is never registered, only its subclasses.
        if candidate == valid_interface:
            continue
        if not issubclass(candidate, valid_interface):
            continue
        method(candidate)
# Register every client class (subclasses of JmcomicClient)
register_jmcomic_component(gb,
JmModuleConfig.register_client,
JmcomicClient,
)
# Register every plugin class (subclasses of JmOptionPlugin)
register_jmcomic_component(gb,
JmModuleConfig.register_plugin,
JmOptionPlugin,
)

131
jm/src/jmcomic/api.py Executable file
View File

@@ -0,0 +1,131 @@
from .jm_downloader import *
# Type alias used in the download API return annotations: (album entity, downloader)
__DOWNLOAD_API_RET = Tuple[JmAlbumDetail, JmDownloader]
def download_batch(download_api,
                   jm_id_iter: Union[Iterable, Generator],
                   option=None,
                   downloader=None,
                   ) -> Set[__DOWNLOAD_API_RET]:
    """
    Download several albums / photos in one call.

    Every id is handed to ``download_api`` through ``multi_thread_launcher``;
    all ids share the same option. Duplicate ids are downloaded once.

    :param download_api: download function to apply to each id
    :param jm_id_iter: iterable of jm ids (album_id or photo_id)
    :param option: download option shared by all ids; the default option is used when None
    :param downloader: downloader class
    :return: set of the tuples passed to each download's callback
    """
    from common import multi_thread_launcher

    if option is None:
        option = JmModuleConfig.option_class().default()

    collected = set()

    def collect(*ret):
        # Each download reports its result tuple here.
        collected.add(ret)

    def download_one(jm_id):
        return download_api(jm_id,
                            option,
                            downloader,
                            callback=collect,
                            )

    # Normalise the ids and drop duplicates before launching.
    unique_ids = {JmcomicText.parse_to_jm_id(jmid) for jmid in jm_id_iter}

    multi_thread_launcher(
        iter_objs=unique_ids,
        apply_each_obj_func=download_one,
        wait_finish=True
    )

    return collected
def download_album(jm_album_id,
                   option=None,
                   downloader=None,
                   callback=None,
                   check_exception=True,
                   ) -> Union[__DOWNLOAD_API_RET, Set[__DOWNLOAD_API_RET]]:
    """
    Download one album together with all of its chapters (photos).

    When ``jm_album_id`` is neither a str nor an int it is treated as a batch,
    which is the same as calling
    ``download_batch(download_album, jm_album_id, option, downloader)``.

    :param jm_album_id: album id
    :param option: download option
    :param downloader: downloader class
    :param callback: called with the album and the downloader after the download
    :param check_exception: when True, ``raise_if_has_exception`` is called on
        the downloader after the download
    :return: the album entity and the downloader; for a batch, the return
        value of download_batch
    """
    is_single_id = isinstance(jm_album_id, (str, int))
    if not is_single_id:
        return download_batch(download_album, jm_album_id, option, downloader)

    with new_downloader(option, downloader) as dler:
        album = dler.download_album(jm_album_id)

        if callback is not None:
            callback(album, dler)

        if check_exception:
            dler.raise_if_has_exception()

        return album, dler
def download_photo(jm_photo_id,
                   option=None,
                   downloader=None,
                   callback=None,
                   check_exception=True,
                   ):
    """
    Download one chapter (photo); parameters mirror download_album.

    When ``jm_photo_id`` is neither a str nor an int it is treated as a batch
    and forwarded to download_batch.

    :param jm_photo_id: photo id
    :param option: download option
    :param downloader: downloader class
    :param callback: called with the photo and the downloader after the download
    :param check_exception: when True, ``raise_if_has_exception`` is called on
        the downloader after the download
    :return: the photo entity and the downloader; for a batch, the return
        value of download_batch
    """
    if not isinstance(jm_photo_id, (str, int)):
        # Forward the caller's downloader class as well, as download_album
        # does; previously it was dropped and batches always used the default.
        return download_batch(download_photo, jm_photo_id, option, downloader)

    with new_downloader(option, downloader) as dler:
        photo = dler.download_photo(jm_photo_id)

        if callback is not None:
            callback(photo, dler)

        if check_exception:
            dler.raise_if_has_exception()

        return photo, dler
def new_downloader(option=None, downloader=None) -> JmDownloader:
    """Instantiate a downloader for ``option``.

    :param option: download option; the default option is used when None
    :param downloader: downloader class; the configured downloader class is used when None
    :return: ``downloader(option)``
    """
    resolved_option = JmModuleConfig.option_class().default() if option is None else option
    downloader_cls = JmModuleConfig.downloader_class() if downloader is None else downloader
    return downloader_cls(resolved_option)
def create_option_by_file(filepath):
    """Create an option object from the option file at ``filepath``."""
    option_cls = JmModuleConfig.option_class()
    return option_cls.from_file(filepath)
def create_option_by_env(env_name='JM_OPTION_PATH'):
    """Create an option object from the file path stored in an environment variable.

    :param env_name: name of the environment variable holding the option file path
    :return: the option built by create_option_by_file
    """
    from .cl import get_env

    option_path = get_env(env_name, None)
    has_path = option_path is not None
    ExceptionTool.require_true(has_path,
                               f'未配置环境变量: {env_name}请配置为option的文件路径')
    return create_option_by_file(option_path)
def create_option_by_str(text: str, mode=None):
    """Create an option object from option text.

    :param text: serialized option content
    :param mode: PackerUtil mode used to unpack ``text``; ``PackerUtil.mode_yml`` when None
    :return: the constructed option
    """
    unpack_mode = PackerUtil.mode_yml if mode is None else mode
    unpacked = PackerUtil.unpack_by_str(text, unpack_mode)
    return JmModuleConfig.option_class().construct(unpacked[0])
# Short alias: creating an option defaults to loading it from a file
create_option = create_option_by_file

121
jm/src/jmcomic/cl.py Executable file
View File

@@ -0,0 +1,121 @@
"""
command-line usage
for example, download album 123 456, photo 333:
$ jmcomic 123 456 p333 --option="D:/option.yml"
"""
import os.path
from typing import List, Optional
def get_env(name, default):
    """Read an environment variable, treating an unset or empty value as missing.

    :param name: environment variable name
    :param default: value returned when the variable is unset or empty
    :return: the variable's value, or ``default``
    """
    import os

    raw = os.getenv(name)
    if raw:
        return raw
    return default
class JmcomicUI:
    """Command-line front end: parses the ids to download and runs the downloads."""

    def __init__(self) -> None:
        # Absolute path of the option file, or None to use the default option.
        self.option_path: Optional[str] = None
        # Ids exactly as typed on the command line.
        self.raw_id_list: List[str] = []
        # Parsed ids, split by kind.
        self.album_id_list: List[str] = []
        self.photo_id_list: List[str] = []

    def parse_arg(self):
        """Parse the command line into option_path and the id lists."""
        import argparse

        parser = argparse.ArgumentParser(prog='python -m jmcomic', description='JMComic Command Line Downloader')
        parser.add_argument(
            'id_list',
            nargs='*',
            help='input all album/photo ids that you want to download, separating them by spaces. '
                 'Need add a "p" prefix to indicate a photo id, such as `123 456 p333`.',
            default=[],
        )

        parser.add_argument(
            '--option',
            help='path to the option file, you can also specify it by env `JM_OPTION_PATH`',
            type=str,
            default=get_env('JM_OPTION_PATH', ''),
        )

        args = parser.parse_args()
        option = args.option

        # An empty value, or a literal pair of single quotes, means "no option file".
        if len(option) == 0 or option == "''":
            self.option_path = None
        else:
            self.option_path = os.path.abspath(option)

        self.raw_id_list = args.id_list
        self.parse_raw_id()

    def parse_raw_id(self):
        """Sort raw ids into photo ids ("p" prefix) and album ids ("a" prefix or none)."""

        def parse(text):
            import sys
            from .jm_toolkit import JmcomicText

            try:
                return JmcomicText.parse_to_jm_id(text)
            except Exception as e:
                # Print the first argument when there is one; fall back to the
                # exception itself so an exception without args cannot raise
                # IndexError here.
                print(e.args[0] if e.args else e)
                # sys.exit is always available; the bare exit() helper is only
                # injected by the site module.
                sys.exit(1)

        for raw_id in self.raw_id_list:
            if raw_id.startswith('p'):
                self.photo_id_list.append(parse(raw_id[1:]))
            elif raw_id.startswith('a'):
                self.album_id_list.append(parse(raw_id[1:]))
            else:
                self.album_id_list.append(parse(raw_id))

    def main(self):
        """Parse arguments, log the plan, build the option and start downloading."""
        self.parse_arg()
        from .api import jm_log
        jm_log('command_line',
               f'start downloading...\n'
               f'- using option: [{self.option_path or "default"}]\n'
               f'to be downloaded: \n'
               f'- album: {self.album_id_list}\n'
               f'- photo: {self.photo_id_list}')

        from .api import create_option, JmOption
        if self.option_path is not None:
            option = create_option(self.option_path)
        else:
            option = JmOption.default()

        self.run(option)

    def run(self, option):
        """Download the parsed albums and photos with ``option``."""
        from .api import download_album, download_photo
        from common import MultiTaskLauncher

        if len(self.album_id_list) == 0:
            download_photo(self.photo_id_list, option)
        elif len(self.photo_id_list) == 0:
            download_album(self.album_id_list, option)
        else:
            # Both kinds requested: run the two batches as separate tasks.
            launcher = MultiTaskLauncher()

            launcher.create_task(
                target=download_album,
                args=(self.album_id_list, option)
            )
            launcher.create_task(
                target=download_photo,
                args=(self.photo_id_list, option)
            )

            launcher.wait_finish()
def main():
    """Command-line entry point: build the UI object and run it."""
    ui = JmcomicUI()
    ui.main()

1176
jm/src/jmcomic/jm_client_impl.py Executable file

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,606 @@
from .jm_toolkit import *
"""
Response Entity
"""
class JmResp:
    """Thin wrapper around a raw HTTP response object."""

    def __init__(self, resp):
        # Wrapping an already wrapped response is rejected.
        already_wrapped = isinstance(resp, JmResp)
        ExceptionTool.require_true(not already_wrapped, f'重复包装: {resp}')
        self.resp = resp

    @property
    def is_success(self) -> bool:
        """True when the status code is 200 and the body is not empty."""
        if self.http_code != 200:
            return False
        return len(self.content) != 0

    @property
    def is_not_success(self) -> bool:
        """Negation of is_success."""
        return not self.is_success

    @property
    def content(self):
        """Body of the wrapped response (``resp.content``)."""
        return self.resp.content

    @property
    def http_code(self):
        """Status code of the wrapped response (``resp.status_code``)."""
        return self.resp.status_code

    @property
    def text(self) -> str:
        """Text of the wrapped response (``resp.text``)."""
        return self.resp.text

    @property
    def url(self) -> str:
        """URL of the wrapped response (``resp.url``)."""
        return self.resp.url

    def require_success(self):
        """Report the response through ExceptionTool.raises_resp unless it succeeded."""
        if self.is_not_success:
            ExceptionTool.raises_resp(self.error_msg(), self)

    def error_msg(self):
        """Message used by require_success; the response text by default."""
        return self.text
class JmImageResp(JmResp):
    """Response wrapper for an image download."""

    def error_msg(self):
        """Describe why the image response is considered failed."""
        msg = f'禁漫图片获取失败: [{self.url}]'
        if self.http_code != 200:
            msg += f'http状态码={self.http_code}'
        if len(self.content) == 0:
            msg += f',响应数据为空'
        return msg

    def transfer_to(self,
                    path,
                    scramble_id,
                    decode_image=True,
                    img_url=None,
                    ):
        """Save the image carried by this response to ``path``.

        :param path: destination file path
        :param scramble_id: scramble id of the image's photo; when None the
            image is saved without decoding
        :param decode_image: when False the image is saved without decoding
        :param img_url: image URL; defaults to this response's URL
        """
        img_url = img_url or self.url

        if decode_image is False or scramble_id is None:
            # Save the file as received, without decoding.
            # Strip the query string before comparing suffixes. partition()
            # leaves a URL without "?" untouched, whereas slicing with
            # find("?") == -1 would chop off the URL's last character.
            url_without_query = img_url.partition('?')[0]
            JmImageTool.save_resp_img(
                self,
                path,
                need_convert=suffix_not_equal(url_without_query, path),
            )
        else:
            # Decode the image, then save it.
            JmImageTool.decode_and_save(
                JmImageTool.get_num_by_url(scramble_id, img_url),
                JmImageTool.open_image(self.content),
                path,
            )
class JmJsonResp(JmResp):
    """Response wrapper whose body is JSON."""

    @field_cache()
    def json(self) -> Dict:
        """Parse the body as JSON; parse failures are reported via ExceptionTool.raises_resp."""
        try:
            parsed = self.resp.json()
        except Exception as e:
            ExceptionTool.raises_resp(f'json解析失败: {e}', self, JsonResolveFailException)
        else:
            return parsed

    def model(self) -> AdvancedDict:
        """Return the parsed JSON wrapped in an AdvancedDict."""
        parsed = self.json()
        return AdvancedDict(parsed)
class JmApiResp(JmJsonResp):
    """JSON response of the mobile API, whose ``data`` field is encoded."""

    def __init__(self, resp, ts: str):
        super().__init__(resp)
        # Passed to JmCryptoTool.decode_resp_data when decoding the data field.
        self.ts = ts

    @property
    def is_success(self) -> bool:
        """True when the base check passes and the JSON ``code`` field equals 200."""
        if not super().is_success:
            return False
        return self.json()['code'] == 200

    @property
    @field_cache()
    def decoded_data(self) -> str:
        """The ``data`` field decoded with JmCryptoTool.decode_resp_data."""
        encoded = self.encoded_data
        return JmCryptoTool.decode_resp_data(encoded, self.ts)

    @property
    def encoded_data(self) -> str:
        """The raw ``data`` field of the JSON body."""
        return self.json()['data']

    @property
    def res_data(self) -> Any:
        """The decoded data parsed as JSON; requires a successful response."""
        self.require_success()
        from json import loads
        decoded = self.decoded_data
        return loads(decoded)

    @property
    def model_data(self) -> AdvancedDict:
        """res_data wrapped in an AdvancedDict; requires a successful response."""
        self.require_success()
        data = self.res_data
        return AdvancedDict(data)
# album-comment
class JmAlbumCommentResp(JmJsonResp):
    """JSON response of the album-comment request."""

    # is_success is a property on the base class. Overriding it with a plain
    # method made the inherited is_not_success evaluate ``not <bound method>``,
    # which is always False, so require_success could never report a failed
    # comment. Keeping it a property restores that check.
    # NOTE(review): callers that invoked ``resp.is_success()`` must now read
    # it as an attribute - verify call sites outside this file.
    @property
    def is_success(self) -> bool:
        """True when the base check passes and the JSON ``err`` field is False."""
        return super().is_success and self.json()['err'] is False
"""
Client Interface
"""
class JmDetailClient:
    """Interface for fetching album and photo (chapter) details."""

    def get_album_detail(self, album_id) -> JmAlbumDetail:
        """Fetch the detail of an album. Implemented by subclasses."""
        raise NotImplementedError

    def get_photo_detail(self,
                         photo_id,
                         fetch_album=True,
                         fetch_scramble_id=True,
                         ) -> JmPhotoDetail:
        """Fetch the detail of a photo (chapter). Implemented by subclasses."""
        raise NotImplementedError

    def check_photo(self, photo: JmPhotoDetail):
        """
        Make sure ``photo`` carries everything needed to download its images.

        A photo comes from one of two sources:
        1. album[?]
        2. client.get_photo_detail(?)

        Only [2] can contain the image url information. When ``photo`` looks
        like [1], [2] is requested and its fields are copied into ``photo``.

        :param photo: the JmPhotoDetail object to check
        """
        # Ensure the parent album is attached.
        if photo.from_album is None:
            photo.from_album = self.get_album_detail(photo.album_id)

        # Ensure page_arr and data_original_domain are present.
        needs_refresh = photo.page_arr is None or photo.data_original_domain is None
        if not needs_refresh:
            return

        fresh = self.get_photo_detail(photo.photo_id, False)
        fresh.from_album = photo.from_album
        photo.__dict__.update(fresh.__dict__)
class JmUserClient:
    """Interface for user-related operations: login, comments and favorites."""

    def login(self,
              username: str,
              password: str,
              ):
        """
        Log in. Implemented by subclasses.

        1. Returns the response object
        2. Guarantees that the current client holds the login cookies
        """
        raise NotImplementedError

    def album_comment(self,
                      video_id,
                      comment,
                      originator='',
                      status='true',
                      comment_id=None,
                      **kwargs,
                      ) -> JmAlbumCommentResp:
        """
        Comment on an album, or reply to a comment. Implemented by subclasses.

        :param video_id: album_id / photo_id
        :param comment: comment text
        :param status: whether the comment "contains spoilers"
        :param comment_id: id of the comment being replied to
        :param originator:
        :returns: a JmAlbumCommentResp object
        """
        raise NotImplementedError

    def favorite_folder(self,
                        page=1,
                        order_by=JmMagicConstants.ORDER_BY_LATEST,
                        folder_id='0',
                        username='',
                        ) -> JmFavoritePage:
        """
        Fetch favorited albums; the default folder is "all". Implemented by subclasses.

        :param folder_id: folder id
        :param page: page number
        :param order_by: sort order
        :param username: user name
        """
        raise NotImplementedError

    def add_favorite_album(self,
                           album_id,
                           folder_id='0',
                           ):
        """
        Add an album to a favorites folder. Implemented by subclasses.
        """
        raise NotImplementedError
class JmImageClient:
    """Interface for downloading images."""

    # -- image download --

    def download_image(self,
                       img_url: str,
                       img_save_path: str,
                       scramble_id: Optional[int] = None,
                       decode_image=True,
                       ):
        """
        Download an image and save it.

        :param img_url: image url
        :param img_save_path: where to save the image
        :param scramble_id: scramble_id of the photo the image belongs to
        :param decode_image: save the decoded image (True) or the original one (False)
        """
        # Request the image and insist on a successful response.
        image_resp = self.get_jm_image(img_url)
        image_resp.require_success()

        return self.save_image_resp(decode_image, img_save_path, img_url, image_resp, scramble_id)

    # noinspection PyMethodMayBeStatic
    def save_image_resp(self, decode_image, img_save_path, img_url, resp, scramble_id):
        """Write the image response to ``img_save_path`` via ``resp.transfer_to``."""
        resp.transfer_to(img_save_path, scramble_id, decode_image, img_url)

    def download_by_image_detail(self,
                                 image: JmImageDetail,
                                 img_save_path,
                                 decode_image=True,
                                 ):
        """Download the image described by ``image`` to ``img_save_path``."""
        scramble_id = int(image.scramble_id)
        return self.download_image(
            image.download_url,
            img_save_path,
            scramble_id,
            decode_image=decode_image,
        )

    def get_jm_image(self, img_url) -> JmImageResp:
        """Request the image at ``img_url``. Implemented by subclasses."""
        raise NotImplementedError

    @classmethod
    def img_is_not_need_to_decode(cls, data_original: str, _resp) -> bool:
        """Return True when the url, ignoring its query string, ends with ``.gif``."""
        # e.g. https://cdn-msp2.18comic.vip/media/photos/498976/00027.gif?v=1697541064
        #   -> https://cdn-msp2.18comic.vip/media/photos/498976/00027.gif
        url_without_query = data_original.partition('?')[0]
        return url_without_query.endswith('.gif')
class JmSearchAlbumClient:
    """
    Interface for searching albums.

    Query syntax of the site search, with A and B standing for keywords:

    [Inclusive search]
    "+A +B" shows only albums that match both A and B.

    [Exclusive search]
    "A -B" shows albums that match A, excluding those that match B.

    [Union search]
    "A B" shows all albums that contain A and B.
    """

    def search(self,
               search_query: str,
               page: int,
               main_tag: int,
               order_by: str,
               time: str,
               category: str,
               sub_category: Optional[str],
               ) -> JmSearchPage:
        """
        Search albums. Implemented by subclasses.

        The web client and the mobile client differ:
        - the mobile client does not support category and sub_category,
          while the web client supports every parameter
        """
        raise NotImplementedError

    def search_site(self,
                    search_query: str,
                    page: int = 1,
                    order_by: str = JmMagicConstants.ORDER_BY_LATEST,
                    time: str = JmMagicConstants.TIME_ALL,
                    category: str = JmMagicConstants.CATEGORY_ALL,
                    sub_category: Optional[str] = None,
                    ):
        """
        Site-wide search (main_tag 0).
        """
        main_tag = 0
        return self.search(search_query, page, main_tag, order_by, time, category, sub_category)

    def search_work(self,
                    search_query: str,
                    page: int = 1,
                    order_by: str = JmMagicConstants.ORDER_BY_LATEST,
                    time: str = JmMagicConstants.TIME_ALL,
                    category: str = JmMagicConstants.CATEGORY_ALL,
                    sub_category: Optional[str] = None,
                    ):
        """
        Search by an album's work (main_tag 1).
        """
        main_tag = 1
        return self.search(search_query, page, main_tag, order_by, time, category, sub_category)

    def search_author(self,
                      search_query: str,
                      page: int = 1,
                      order_by: str = JmMagicConstants.ORDER_BY_LATEST,
                      time: str = JmMagicConstants.TIME_ALL,
                      category: str = JmMagicConstants.CATEGORY_ALL,
                      sub_category: Optional[str] = None,
                      ):
        """
        Search by an album's author (main_tag 2).
        """
        main_tag = 2
        return self.search(search_query, page, main_tag, order_by, time, category, sub_category)

    def search_tag(self,
                   search_query: str,
                   page: int = 1,
                   order_by: str = JmMagicConstants.ORDER_BY_LATEST,
                   time: str = JmMagicConstants.TIME_ALL,
                   category: str = JmMagicConstants.CATEGORY_ALL,
                   sub_category: Optional[str] = None,
                   ):
        """
        Search by an album's tag (main_tag 3).
        """
        main_tag = 3
        return self.search(search_query, page, main_tag, order_by, time, category, sub_category)

    def search_actor(self,
                     search_query: str,
                     page: int = 1,
                     order_by: str = JmMagicConstants.ORDER_BY_LATEST,
                     time: str = JmMagicConstants.TIME_ALL,
                     category: str = JmMagicConstants.CATEGORY_ALL,
                     sub_category: Optional[str] = None,
                     ):
        """
        Search by an album's character / actor (main_tag 4).
        """
        main_tag = 4
        return self.search(search_query, page, main_tag, order_by, time, category, sub_category)
class JmCategoryClient:
    """
    Category listing interface. It can be seen as a ranking over all albums,
    and the popular rankings are derived from it:

    monthly ranking = category listing [time=month, order=views]
    weekly ranking  = category listing [time=week,  order=views]
    daily ranking   = category listing [time=today, order=views]
    """

    def categories_filter(self,
                          page: int,
                          time: str,
                          category: str,
                          order_by: str,
                          sub_category: Optional[str] = None,
                          ) -> JmCategoryPage:
        """
        Category listing. Implemented by subclasses.

        :param page: page number
        :param time: time range
        :param category: category
        :param sub_category: sub category; only the web client has this feature
        :param order_by: sort order
        """
        raise NotImplementedError

    def month_ranking(self,
                      page: int,
                      category: str = JmMagicConstants.CATEGORY_ALL,
                      ):
        """
        Monthly ranking = category listing [time=month, order=views]
        """
        time_range = JmMagicConstants.TIME_MONTH
        return self.categories_filter(page,
                                      time_range,
                                      category,
                                      JmMagicConstants.ORDER_BY_VIEW,
                                      )

    def week_ranking(self,
                     page: int,
                     category: str = JmMagicConstants.CATEGORY_ALL,
                     ):
        """
        Weekly ranking = category listing [time=week, order=views]
        """
        time_range = JmMagicConstants.TIME_WEEK
        return self.categories_filter(page,
                                      time_range,
                                      category,
                                      JmMagicConstants.ORDER_BY_VIEW,
                                      )

    def day_ranking(self,
                    page: int,
                    category: str = JmMagicConstants.CATEGORY_ALL,
                    ):
        """
        Daily ranking = category listing [time=today, order=views]
        """
        time_range = JmMagicConstants.TIME_TODAY
        return self.categories_filter(page,
                                      time_range,
                                      category,
                                      JmMagicConstants.ORDER_BY_VIEW,
                                      )
# noinspection PyAbstractClass
class JmcomicClient(
    JmImageClient,
    JmDetailClient,
    JmUserClient,
    JmSearchAlbumClient,
    JmCategoryClient,
    Postman,
):
    """Complete client interface: the union of all client interfaces plus Postman."""

    # Only declared here, no value is assigned; is_given_type compares it between clients.
    client_key: None

    def get_domain_list(self) -> List[str]:
        """
        Get the domain configuration of this client. Implemented by subclasses.
        """
        raise NotImplementedError

    def set_domain_list(self, domain_list: List[str]):
        """
        Set the domain configuration of this client. Implemented by subclasses.
        """
        raise NotImplementedError

    def set_cache_dict(self, cache_dict: Optional[Dict]):
        """Set the cache dict of this client. Implemented by subclasses."""
        raise NotImplementedError

    def get_cache_dict(self) -> Optional[Dict]:
        """Get the cache dict of this client. Implemented by subclasses."""
        raise NotImplementedError

    def of_api_url(self, api_path, domain):
        """Build an api url from ``api_path`` and ``domain``. Implemented by subclasses."""
        raise NotImplementedError

    def get_html_domain(self):
        """Delegate to JmModuleConfig.get_html_domain with this client's root postman."""
        root_postman = self.get_root_postman()
        return JmModuleConfig.get_html_domain(root_postman)

    def get_html_domain_all(self):
        """Delegate to JmModuleConfig.get_html_domain_all with this client's root postman."""
        root_postman = self.get_root_postman()
        return JmModuleConfig.get_html_domain_all(root_postman)

    def get_html_domain_all_via_github(self):
        """Delegate to JmModuleConfig.get_html_domain_all_via_github with this client's root postman."""
        root_postman = self.get_root_postman()
        return JmModuleConfig.get_html_domain_all_via_github(root_postman)

    # noinspection PyMethodMayBeStatic
    def do_page_iter(self, params: dict, page: int, get_page_method):
        """Generator that yields successive pages obtained from ``get_page_method(**params)``.

        After each yield, a plain ``next()`` moves to the following page and
        stops once the page count reported by the last page is exceeded.
        Sending a dict merges it into ``params`` (a ``page`` key selects the
        next page to fetch) and lifts the page limit until the next page has
        been fetched.

        :param params: keyword arguments for ``get_page_method``; updated in place
        :param page: first page to fetch
        :param get_page_method: callable returning one page content
        """
        from math import inf

        total = inf
        while page <= total:
            params['page'] = page
            page_content = get_page_method(**params)
            sent = yield page_content

            if sent is None:
                # Plain iteration: advance and bound by the reported page count.
                page, total = page + 1, page_content.page_count
                continue

            ExceptionTool.require_true(isinstance(sent, dict), 'require dict params')
            # Update params and page from the values sent by the caller.
            page = sent.get('page', page)
            params.update(sent)
            total = inf

    def favorite_folder_gen(self,
                            page=1,
                            order_by=JmMagicConstants.ORDER_BY_LATEST,
                            folder_id='0',
                            username='',
                            ) -> Generator[JmFavoritePage, Dict, None]:
        """
        Generator over favorites pages; see search_gen.
        """
        params = dict(
            order_by=order_by,
            folder_id=folder_id,
            username=username,
        )

        yield from self.do_page_iter(params, page, self.favorite_folder)

    def search_gen(self,
                   search_query: str,
                   main_tag=0,
                   page: int = 1,
                   order_by: str = JmMagicConstants.ORDER_BY_LATEST,
                   time: str = JmMagicConstants.TIME_ALL,
                   category: str = JmMagicConstants.CATEGORY_ALL,
                   sub_category: Optional[str] = None,
                   ) -> Generator[JmSearchPage, Dict, None]:
        """
        Generator over search result pages. It supports plain iteration:

        ```
        for page in self.search_gen('keyword'):
            # every iteration, page is the result of the next page
            pass
        ```

        It also accepts parameters through send(), which changes the search
        settings, for example:

        ```
        gen = client.search_gen('MANA')
        for i, page in enumerate(gen):
            print(page.page_count)
            page = gen.send({
                'search_query': '+MANA +keyword',
                'page': 1
            })
            print(page.page_count)
            break
        ```
        """
        params = dict(
            search_query=search_query,
            main_tag=main_tag,
            order_by=order_by,
            time=time,
            category=category,
            sub_category=sub_category,
        )

        yield from self.do_page_iter(params, page, self.search)

    def categories_filter_gen(self,
                              page: int = 1,
                              time: str = JmMagicConstants.TIME_ALL,
                              category: str = JmMagicConstants.CATEGORY_ALL,
                              order_by: str = JmMagicConstants.ORDER_BY_LATEST,
                              sub_category: Optional[str] = None,
                              ) -> Generator[JmCategoryPage, Dict, None]:
        """
        Generator over category listing pages; see search_gen.
        """
        params = dict(
            time=time,
            category=category,
            order_by=order_by,
            sub_category=sub_category,
        )

        yield from self.do_page_iter(params, page, self.categories_filter)

    def is_given_type(self, ctype: Type['JmcomicClient']) -> bool:
        """
        Tell whether this client is of the given client type.

        On a client proxy this method is routed to the wrapped client, so:
        ClientProxy(AClient()).is_given_type(AClient) is True
        but: ClientProxy(AClient()).client_key != AClient.client_key
        """
        same_type = isinstance(self, ctype)
        if same_type or self.client_key == ctype.client_key:
            return True
        return False

497
jm/src/jmcomic/jm_config.py Executable file
View File

@@ -0,0 +1,497 @@
from common import time_stamp, field_cache, ProxyBuilder
def shuffled(lines):
    """Convert ``lines`` to a list with ``str_to_list`` and return it in random order."""
    import random
    from common import str_to_list

    items = str_to_list(lines)
    random.shuffle(items)
    return items
def default_jm_logging(topic: str, msg: str):
    """Default log function: print a line with timestamp, thread name, topic and message.

    :param topic: log topic, printed inside full-width brackets
    :param msg: log message
    """
    from common import format_ts, current_thread
    # The topic gets its closing bracket so it no longer runs straight into
    # the message text.
    print('[{}] [{}]:【{}】{}'.format(format_ts(), current_thread().name, topic, msg))
# JM constants
class JmMagicConstants:
    """Literal values used as request parameters and secrets."""

    # Search parameter: sort order
    ORDER_BY_LATEST = 'mr'
    ORDER_BY_VIEW = 'mv'
    ORDER_BY_PICTURE = 'mp'
    ORDER_BY_LIKE = 'tf'

    ORDER_MONTH_RANKING = 'mv_m'
    ORDER_WEEK_RANKING = 'mv_w'
    ORDER_DAY_RANKING = 'mv_t'

    # Search parameter: time range
    TIME_TODAY = 't'
    TIME_WEEK = 'w'
    TIME_MONTH = 'm'
    TIME_ALL = 'a'

    # Category parameter: the category of the API
    CATEGORY_ALL = '0'  # all
    CATEGORY_DOUJIN = 'doujin'  # doujin
    CATEGORY_SINGLE = 'single'  # single volume
    CATEGORY_SHORT = 'short'  # short story
    CATEGORY_ANOTHER = 'another'  # other
    CATEGORY_HANMAN = 'hanman'  # Korean comics
    CATEGORY_MEIMAN = 'meiman'  # American comics
    CATEGORY_DOUJIN_COSPLAY = 'doujin_cosplay'  # cosplay
    CATEGORY_3D = '3D'  # 3D
    CATEGORY_ENGLISH_SITE = 'english_site'  # English site

    # Sub categories
    SUB_CHINESE = 'chinese'  # Chinese translation, common sub category
    SUB_JAPANESE = 'japanese'  # Japanese, common sub category

    # Sub categories of "other" (CATEGORY_ANOTHER)
    SUB_ANOTHER_OTHER = 'other'  # other comics
    SUB_ANOTHER_3D = '3d'  # 3D
    SUB_ANOTHER_COSPLAY = 'cosplay'  # cosplay

    # Sub categories of doujin (CATEGORY_DOUJIN)
    SUB_DOUJIN_CG = 'CG'  # CG
    SUB_DOUJIN_CHINESE = SUB_CHINESE
    SUB_DOUJIN_JAPANESE = SUB_JAPANESE

    # Sub categories of short story (CATEGORY_SHORT)
    SUB_SHORT_CHINESE = SUB_CHINESE
    SUB_SHORT_JAPANESE = SUB_JAPANESE

    # Sub categories of single volume (CATEGORY_SINGLE)
    SUB_SINGLE_CHINESE = SUB_CHINESE
    SUB_SINGLE_JAPANESE = SUB_JAPANESE
    SUB_SINGLE_YOUTH = 'youth'

    # Image split parameters
    SCRAMBLE_220980 = 220980
    SCRAMBLE_268850 = 268850
    SCRAMBLE_421926 = 421926  # the image split algorithm changed after 2023-02-08

    # Mobile API secrets
    APP_TOKEN_SECRET = '18comicAPP'
    APP_TOKEN_SECRET_2 = '18comicAPPContent'
    APP_DATA_SECRET = '185Hcomic3PAPP7R'
    APP_VERSION = '1.7.9'
# 模块级别共用配置
class JmModuleConfig:
# 网站相关
PROT = "https://"
JM_REDIRECT_URL = f'{PROT}jm365.work/3YeBdF' # 永久網域,怕走失的小伙伴收藏起来
JM_PUB_URL = f'{PROT}jmcomic-fb.vip'
JM_CDN_IMAGE_URL_TEMPLATE = PROT + 'cdn-msp.{domain}/media/photos/{photo_id}/{index:05}{suffix}' # index 从1开始
JM_IMAGE_SUFFIX = ['.jpg', '.webp', '.png', '.gif']
# JM的异常网页内容
JM_ERROR_RESPONSE_TEXT = {
"Could not connect to mysql! Please check your database settings!": "禁漫服务器内部报错",
"Restricted Access!": "禁漫拒绝你所在ip地区的访问你可以选择: 换域名/换代理",
}
# JM的异常网页code
JM_ERROR_STATUS_CODE = {
403: 'ip地区禁止访问/爬虫被识别',
500: '500: 禁漫服务器内部异常可能是服务器过载可以切换ip或稍后重试',
520: '520: Web server is returning an unknown error (禁漫服务器内部报错)',
524: '524: The origin web server timed out responding to this request. (禁漫服务器处理超时)',
}
# 分页大小
PAGE_SIZE_SEARCH = 80
PAGE_SIZE_FAVORITE = 20
# 图片分隔相关
SCRAMBLE_CACHE = {}
# 当本子没有作者名字时,顶替作者名字
DEFAULT_AUTHOR = 'default_author'
# cookies目前只在移动端使用因为移动端请求接口须携带但不会校验cookies的内容。
APP_COOKIES = None
# 移动端图片域名
DOMAIN_IMAGE_LIST = shuffled('''
cdn-msp.jmapiproxy1.cc
cdn-msp.jmapiproxy2.cc
cdn-msp2.jmapiproxy2.cc
cdn-msp3.jmapiproxy2.cc
cdn-msp.jmapinodeudzn.net
cdn-msp3.jmapinodeudzn.net
''')
# 移动端API域名
DOMAIN_API_LIST = shuffled('''
www.cdnmhwscc.vip
www.cdnblackmyth.club
www.cdnmhws.cc
www.cdnuc.vip
''')
APP_HEADERS_TEMPLATE = {
'Accept-Encoding': 'gzip, deflate',
'user-agent': 'Mozilla/5.0 (Linux; Android 9; V1938CT Build/PQ3A.190705.11211812; wv) AppleWebKit/537.36 (KHTML, '
'like Gecko) Version/4.0 Chrome/91.0.4472.114 Safari/537.36',
}
APP_HEADERS_IMAGE = {
'Accept': 'image/avif,image/webp,image/apng,image/svg+xml,image/*,*/*;q=0.8',
'X-Requested-With': 'com.jiaohua_browser',
'Referer': PROT + DOMAIN_API_LIST[0],
'Accept-Language': 'zh-CN,zh;q=0.9,en-US;q=0.8,en;q=0.7',
}
# 网页端headers
HTML_HEADERS_TEMPLATE = {
'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,'
'application/signed-exchange;v=b3;q=0.7',
'accept-language': 'zh-CN,zh;q=0.9',
'cache-control': 'no-cache',
'dnt': '1',
'pragma': 'no-cache',
'priority': 'u=0, i',
'referer': 'https://18comic.vip/',
'sec-ch-ua': '"Chromium";v="124", "Google Chrome";v="124", "Not-A.Brand";v="99"',
'sec-ch-ua-mobile': '?0',
'sec-ch-ua-platform': '"Windows"',
'sec-fetch-dest': 'document',
'sec-fetch-mode': 'navigate',
'sec-fetch-site': 'none',
'sec-fetch-user': '?1',
'upgrade-insecure-requests': '1',
'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/124.0.0.0 '
'Safari/537.36',
}
# 网页端域名配置
# 无需配置默认为None需要的时候会发起请求获得
# 使用优先级:
# 1. DOMAIN_HTML_LIST
# 2. [DOMAIN_HTML]
DOMAIN_HTML = None
DOMAIN_HTML_LIST = None
# 模块级别的可重写类配置
CLASS_DOWNLOADER = None
CLASS_OPTION = None
CLASS_ALBUM = None
CLASS_PHOTO = None
CLASS_IMAGE = None
# 客户端注册表
REGISTRY_CLIENT = {}
# 插件注册表
REGISTRY_PLUGIN = {}
# 异常监听器
# key: 异常类
# value: 函数,参数只有异常对象,无需返回值
# 这个异常类或者这个异常的子类的实例将要被raise前你的listener方法会被调用
REGISTRY_EXCEPTION_LISTENER = {}
# 执行log的函数
EXECUTOR_LOG = default_jm_logging
# 使用固定时间戳
FLAG_USE_FIX_TIMESTAMP = True
# 移动端Client初始化cookies
FLAG_API_CLIENT_REQUIRE_COOKIES = True
# log开关标记
FLAG_ENABLE_JM_LOG = True
# log时解码url
FLAG_DECODE_URL_WHEN_LOGGING = True
# 当内置的版本号落后时使用最新的禁漫app版本号
FLAG_USE_VERSION_NEWER_IF_BEHIND = True
# 关联dir_rule的自定义字段与对应的处理函数
# 例如:
# Amyname -> JmModuleConfig.AFIELD_ADVICE['myname'] = lambda album: "自定义名称"
AFIELD_ADVICE = dict()
PFIELD_ADVICE = dict()
# 当发生 oserror: [Errno 36] File name too long 时,
# 把文件名限制在指定个字符以内
VAR_FILE_NAME_LENGTH_LIMIT = 100
@classmethod
def downloader_class(cls):
if cls.CLASS_DOWNLOADER is not None:
return cls.CLASS_DOWNLOADER
from .jm_downloader import JmDownloader
return JmDownloader
@classmethod
def option_class(cls):
if cls.CLASS_OPTION is not None:
return cls.CLASS_OPTION
from .jm_option import JmOption
return JmOption
@classmethod
def album_class(cls):
if cls.CLASS_ALBUM is not None:
return cls.CLASS_ALBUM
from .jm_entity import JmAlbumDetail
return JmAlbumDetail
@classmethod
def photo_class(cls):
    """
    Return the photo (chapter) entity class to use.

    CLASS_PHOTO wins when it is set; the default is the lazily imported JmPhotoDetail.
    """
    custom = cls.CLASS_PHOTO
    if custom is None:
        from .jm_entity import JmPhotoDetail
        return JmPhotoDetail
    return custom
@classmethod
def image_class(cls):
    """
    Return the image entity class to use.

    CLASS_IMAGE wins when it is set; the default is the lazily imported JmImageDetail.
    """
    custom = cls.CLASS_IMAGE
    if custom is None:
        from .jm_entity import JmImageDetail
        return JmImageDetail
    return custom
@classmethod
def client_impl_class(cls, client_key: str):
    """
    Look up the client implementation class registered under ``client_key``.

    :param client_key: key used in REGISTRY_CLIENT
    :returns: the registered class; when none is registered,
              ExceptionTool.raises is invoked with a "not found" message
    """
    impl = cls.REGISTRY_CLIENT.get(client_key, None)
    if impl is not None:
        return impl

    from .jm_toolkit import ExceptionTool
    ExceptionTool.raises(f'not found client impl class for key: "{client_key}"')
    return impl
@classmethod
@field_cache("DOMAIN_HTML")
def get_html_domain(cls, postman=None):
    """
    Resolve a currently usable web domain.

    The site's domain changes often, so it is obtained at runtime from
    get_html_url() and parsed with JmcomicText.parse_to_jm_domain.
    NOTE(review): the field_cache("DOMAIN_HTML") decorator presumably stores
    the result as the module default domain - confirm against field_cache.

    :param postman: optional postman forwarded to get_html_url()
    """
    from .jm_toolkit import JmcomicText
    html_url = cls.get_html_url(postman)
    return JmcomicText.parse_to_jm_domain(html_url)
@classmethod
def get_html_url(cls, postman=None):
    """
    Request the permanent redirect URL to obtain a usable site URL.

    :param postman: optional postman; a new session postman is created when falsy
    :returns: the URL obtained from the redirect, e.g. https://jm-comic2.cc
    """
    if not postman:
        postman = cls.new_postman(session=True)

    redirect_catcher = postman.with_redirect_catching()
    url = redirect_catcher.get(cls.JM_REDIRECT_URL)
    cls.jm_log('module.html_url', f'获取禁漫网页URL: [{cls.JM_REDIRECT_URL}] → [{url}]')
    return url
@classmethod
@field_cache("DOMAIN_HTML_LIST")
def get_html_domain_all(cls, postman=None):
    """
    Request the publish page and extract every web domain listed there.

    :param postman: optional postman; a new session postman is created when falsy
    :returns: e.g. ['18comic.vip', ..., 'jm365.xyz/ZNPJam'];
              the last entry is the app download link
    """
    from .jm_toolkit import ExceptionTool, JmcomicText

    if not postman:
        postman = cls.new_postman(session=True)

    resp = postman.get(cls.JM_PUB_URL)
    status = resp.status_code
    if status != 200:
        ExceptionTool.raises_resp(f'请求失败访问禁漫发布页获取所有域名HTTP状态码为: {status}', resp)

    domain_list = JmcomicText.analyse_jm_pub_html(resp.text)
    cls.jm_log('module.html_domain_all', f'获取禁漫网页全部域名: [{resp.url}] → {domain_list}')
    return domain_list
@classmethod
def get_html_domain_all_via_github(cls,
                                   postman=None,
                                   template='https://jmcmomic.github.io/go/{}.html',
                                   index_range=(300, 309)
                                   ):
    """
    Collect the newest domains from the pages of the github repo
    https://github.com/jmcmomic/jmcmomic.github.io

    :param postman: optional postman; one with github-like headers is created when falsy
    :param template: page URL template, formatted with each index
    :param index_range: arguments passed to range() to produce the page indexes
    :returns: set of domains; entries starting with 'jm365' are left out
    """
    from common import multi_thread_launcher
    from .jm_toolkit import JmcomicText

    if not postman:
        github_headers = {
            'authority': 'github.com',
            'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 '
                          'Safari/537.36'
        }
        postman = cls.new_postman(headers=github_headers)

    domain_set = set()

    def fetch_domain(url):
        # each page is fetched without following redirects and then parsed
        html = postman.get(url, allow_redirects=False).text
        for domain in JmcomicText.analyse_jm_pub_html(html):
            if not domain.startswith('jm365'):
                domain_set.add(domain)

    page_urls = [template.format(i) for i in range(*index_range)]
    multi_thread_launcher(
        iter_objs=page_urls,
        apply_each_obj_func=fetch_domain,
    )
    return domain_set
@classmethod
def new_html_headers(cls, domain='18comic.vip'):
    """
    Build request headers for the web (HTML) client.

    :param domain: domain written into the authority / origin / referer headers
    :returns: a new dict based on HTML_HEADERS_TEMPLATE; the template is not modified
    """
    site = f'https://{domain}'
    return {
        **cls.HTML_HEADERS_TEMPLATE,
        'authority': domain,
        'origin': site,
        'referer': site,
    }
@classmethod
@field_cache()
def get_fix_ts_token_tokenparam(cls):
    """
    Produce a timestamp together with the token and tokenparam derived from it.

    NOTE(review): wrapped in field_cache(), so the triple is presumably computed
    once and then reused - confirm against field_cache.

    :returns: (ts, token, tokenparam)
    """
    timestamp = time_stamp()
    from .jm_toolkit import JmCryptoTool
    token, tokenparam = JmCryptoTool.token_and_tokenparam(timestamp)
    return timestamp, token, tokenparam
# noinspection PyUnusedLocal
@classmethod
def jm_log(cls, topic: str, msg: str):
    """
    Forward a log record to EXECUTOR_LOG, but only while FLAG_ENABLE_JM_LOG is True.

    :param topic: log topic
    :param msg: log message
    """
    if cls.FLAG_ENABLE_JM_LOG is not True:
        return
    cls.EXECUTOR_LOG(topic, msg)
@classmethod
def disable_jm_log(cls):
    """Switch logging off by setting FLAG_ENABLE_JM_LOG to False."""
    setattr(cls, 'FLAG_ENABLE_JM_LOG', False)
@classmethod
def new_postman(cls, session=False, **kwargs):
    """
    Create a postman (request sender) through common.Postmans.

    Defaults are filled in for 'impersonate', 'headers' and 'proxies' when the
    caller did not pass them.

    :param session: when exactly True, Postmans.new_session is used,
                    otherwise Postmans.new_postman
    :param kwargs: forwarded to the Postmans factory
    """
    defaults = (
        ('impersonate', 'chrome110'),
        ('headers', JmModuleConfig.new_html_headers()),
        ('proxies', JmModuleConfig.DEFAULT_PROXIES),
    )
    for key, value in defaults:
        kwargs.setdefault(key, value)

    from common import Postmans
    factory = Postmans.new_session if session is True else Postmans.new_postman
    return factory(**kwargs)
# Default configuration related to option.
# In general, customizing through an option config file is recommended;
# to change only a few simple, common settings, the DEFAULT_XXX attributes below can be used instead.
JM_OPTION_VER = '2.1'
DEFAULT_CLIENT_IMPL = 'api' # default Client implementation type. NOTE(review): the original comment said "web client", which does not match the value 'api' - confirm
DEFAULT_CLIENT_CACHE = None # Client cache is off by default. For cache configuration see CacheRegistry
DEFAULT_PROXIES = ProxyBuilder.system_proxy() # use the system proxy by default
DEFAULT_OPTION_DICT: dict = {
'log': None,
'dir_rule': {'rule': 'Bd_Pname', 'base_dir': None},
'download': {
'cache': True,
'image': {'decode': True, 'suffix': None},
'threading': {
'image': 30,
'photo': None,
},
},
'client': {
'cache': None, # see CacheRegistry
'domain': [],
'postman': {
'type': 'cffi',
'meta_data': {
'impersonate': 'chrome110',
'headers': None,
'proxies': None,
}
},
'impl': None,
'retry_times': 5,
},
'plugins': {
# If a plugin raises a parameter-validation exception, only log it. This global setting can be overridden by a plugin's local setting
# Allowed values: ignore (ignore), log (print a log), raise (raise the exception)
'valid': 'log',
},
}
@classmethod
def option_default_dict(cls) -> dict:
    """
    Return the default configuration dict used by JmOption.default().

    It is a deep copy of DEFAULT_OPTION_DICT in which every entry that is still
    None is filled from the module configuration. Going through this method
    lets callers override the default option dict themselves.
    """
    import os
    from copy import deepcopy

    def fill_if_none(section: dict, key: str, supplier):
        # the supplier is only called when the entry is still unset
        if section[key] is None:
            section[key] = supplier()

    option_dict = deepcopy(cls.DEFAULT_OPTION_DICT)
    client = option_dict['client']

    # log
    fill_if_none(option_dict, 'log', lambda: cls.FLAG_ENABLE_JM_LOG)
    # dir_rule.base_dir -> current working directory
    fill_if_none(option_dict['dir_rule'], 'base_dir', os.getcwd)
    # client cache
    fill_if_none(client, 'cache', lambda: cls.DEFAULT_CLIENT_CACHE)
    # client impl
    fill_if_none(client, 'impl', lambda: cls.DEFAULT_CLIENT_IMPL)
    # postman proxies: use system proxy by default
    fill_if_none(client['postman']['meta_data'], 'proxies', lambda: cls.DEFAULT_PROXIES)
    # threading photo -> number of CPUs
    fill_if_none(option_dict['download']['threading'], 'photo', os.cpu_count)

    return option_dict
@classmethod
def register_plugin(cls, plugin_class):
    """
    Register ``plugin_class`` in REGISTRY_PLUGIN under its ``plugin_key``.

    A missing / None ``plugin_key`` is reported through ExceptionTool.require_true.
    """
    from .jm_toolkit import ExceptionTool

    has_key = getattr(plugin_class, 'plugin_key', None) is not None
    ExceptionTool.require_true(has_key, f'未配置plugin_key, class: {plugin_class}')

    registry = cls.REGISTRY_PLUGIN
    registry[plugin_class.plugin_key] = plugin_class
@classmethod
def register_client(cls, client_class):
    """
    Register ``client_class`` in REGISTRY_CLIENT under its ``client_key``.

    A missing / None ``client_key`` is reported through ExceptionTool.require_true.
    """
    from .jm_toolkit import ExceptionTool

    has_key = getattr(client_class, 'client_key', None) is not None
    ExceptionTool.require_true(has_key, f'未配置client_key, class: {client_class}')

    registry = cls.REGISTRY_CLIENT
    registry[client_class.client_key] = client_class
@classmethod
def register_exception_listener(cls, etype, listener):
    """
    Register ``listener`` for exceptions of type ``etype``.

    An existing listener for the same type is replaced.
    """
    listeners = cls.REGISTRY_EXCEPTION_LISTENER
    listeners[etype] = listener
# Module-level shortcuts for the JmModuleConfig logging helpers
jm_log = JmModuleConfig.jm_log
disable_jm_log = JmModuleConfig.disable_jm_log

350
jm/src/jmcomic/jm_downloader.py Executable file
View File

@@ -0,0 +1,350 @@
from .jm_option import *
def catch_exception(func):
    """
    Decorator for downloader methods that records download failures.

    When the wrapped method raises, the failing entity is appended to
    ``self.download_failed_image`` or ``self.download_failed_photo``
    (depending on ``is_image()`` / ``is_photo()``) and the original exception
    is re-raised unchanged.

    The entity is taken from the first positional argument, or from the first
    keyword argument when it was passed by keyword. Arguments that are not
    entities are ignored, so the bookkeeping never masks the original error.
    The same exception object is recorded only once, even if decorated methods
    are nested (e.g. an override calling ``super()``).
    """
    from functools import wraps

    def already_recorded(failures, error):
        return any(recorded is error for _, recorded in failures)

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception as e:
            detail = args[0] if args else next(iter(kwargs.values()), None)
            is_image = getattr(detail, 'is_image', None)
            is_photo = getattr(detail, 'is_photo', None)

            if callable(is_image) and is_image():
                failures = self.download_failed_image
                if not already_recorded(failures, e):
                    jm_log('image.failed', f'图片下载失败: [{detail.download_url}], 异常: [{e}]')
                    failures.append((detail, e))
            elif callable(is_photo) and is_photo():
                failures = self.download_failed_photo
                if not already_recorded(failures, e):
                    jm_log('photo.failed', f'章节下载失败: [{detail.id}], 异常: [{e}]')
                    failures.append((detail, e))

            raise

    return wrapper
# noinspection PyMethodMayBeStatic
class DownloadCallback:
    """Callbacks fired around album / photo / image downloads; each one writes a log record."""

    def before_album(self, album: JmAlbumDetail):
        summary = (
            f'本子获取成功: [{album.id}], '
            f'作者: [{album.author}], '
            f'章节数: [{len(album)}], '
            f'总页数: [{album.page_count}], '
            f'标题: [{album.name}], '
            f'关键词: {album.tags}'
        )
        jm_log('album.before', summary)

    def after_album(self, album: JmAlbumDetail):
        message = f'本子下载完成: [{album.id}]'
        jm_log('album.after', message)

    def before_photo(self, photo: JmPhotoDetail):
        summary = (
            f'开始下载章节: {photo.id} ({photo.album_id}[{photo.index}/{len(photo.from_album)}]), '
            f'标题: [{photo.name}], '
            f'图片数为[{len(photo)}]'
        )
        jm_log('photo.before', summary)

    def after_photo(self, photo: JmPhotoDetail):
        message = f'章节下载完成: [{photo.id}] ({photo.album_id}[{photo.index}/{len(photo.from_album)}])'
        jm_log('photo.after', message)

    def before_image(self, image: JmImageDetail, img_save_path):
        # the message depends on whether the image file is already on disk
        if image.exists:
            message = f'图片已存在: {image.tag} ← [{img_save_path}]'
        else:
            message = f'图片准备下载: {image.tag}, [{image.img_url}] → [{img_save_path}]'
        jm_log('image.before', message)

    def after_image(self, image: JmImageDetail, img_save_path):
        message = f'图片下载完成: {image.tag}, [{image.img_url}] → [{img_save_path}]'
        jm_log('image.after', message)
class JmDownloader(DownloadCallback):
    """
    JmDownloader = JmOption + scheduling logic
    """

    def __init__(self, option: JmOption) -> None:
        self.option = option
        self.client = option.build_jm_client()
        # record of successful downloads: album -> photo -> [(save_path, image), ...]
        self.download_success_dict: Dict[JmAlbumDetail, Dict[JmPhotoDetail, List[Tuple[str, JmImageDetail]]]] = {}
        # records of failed downloads: (entity, exception)
        self.download_failed_image: List[Tuple[JmImageDetail, BaseException]] = []
        self.download_failed_photo: List[Tuple[JmPhotoDetail, BaseException]] = []

    def download_album(self, album_id):
        """Fetch the album detail for ``album_id``, download it and return the detail."""
        album = self.client.get_album_detail(album_id)
        self.download_by_album_detail(album)
        return album

    def download_by_album_detail(self, album: JmAlbumDetail):
        """Download every photo of ``album`` unless ``album.skip`` is set after before_album."""
        self.before_album(album)
        if album.skip:
            return
        self.execute_on_condition(
            iter_objs=album,
            apply=self.download_by_photo_detail,
            count_batch=self.option.decide_photo_batch_count(album)
        )
        self.after_album(album)

    def download_photo(self, photo_id):
        """Fetch the photo detail for ``photo_id``, download it and return the detail."""
        photo = self.client.get_photo_detail(photo_id)
        self.download_by_photo_detail(photo)
        return photo

    @catch_exception
    def download_by_photo_detail(self, photo: JmPhotoDetail):
        """Download every image of ``photo`` unless ``photo.skip`` is set after before_photo."""
        self.client.check_photo(photo)

        self.before_photo(photo)
        if photo.skip:
            return
        self.execute_on_condition(
            iter_objs=photo,
            apply=self.download_by_image_detail,
            count_batch=self.option.decide_image_batch_count(photo)
        )
        self.after_photo(photo)

    @catch_exception
    def download_by_image_detail(self, image: JmImageDetail):
        """Download one image to the path decided by the option."""
        img_save_path = self.option.decide_image_filepath(image)

        image.save_path = img_save_path
        image.exists = file_exists(img_save_path)

        self.before_image(image, img_save_path)

        if image.skip:
            return

        # let option decide use_cache and decode_image
        use_cache = self.option.decide_download_cache(image)
        decode_image = self.option.decide_download_image_decode(image)

        # skip download
        if use_cache is True and image.exists:
            return

        self.client.download_by_image_detail(
            image,
            img_save_path,
            decode_image=decode_image,
        )

        self.after_image(image, img_save_path)

    def execute_on_condition(self,
                             iter_objs: DetailEntity,
                             apply: Callable,
                             count_batch: int,
                             ):
        """
        Schedule the download of an album's photos / a photo's images.

        :param iter_objs: album or photo; filtered through do_filter first
        :param apply: function called for each remaining element
        :param count_batch: batch size; when it covers all elements one thread
                            per element is launched, otherwise a thread pool
                            with ``count_batch`` workers is used
        """
        iter_objs = self.do_filter(iter_objs)
        count_real = len(iter_objs)

        if count_real == 0:
            return

        if count_batch >= count_real:
            # one thread per image / photo
            multi_thread_launcher(
                iter_objs=iter_objs,
                apply_each_obj_func=apply,
            )
        else:
            # a thread pool with `count_batch` workers
            thread_pool_executor(
                iter_objs=iter_objs,
                apply_each_obj_func=apply,
                max_workers=count_batch,
            )

    # noinspection PyMethodMayBeStatic
    def do_filter(self, detail: DetailEntity):
        """
        Hook for filtering an album's photos / a photo's images.
        The default implementation filters nothing.

        Examples:
        to download only the newest photo of an album, return [album[-1]];
        to download only the first 10 images of a photo, return photo[:10]

        :param detail: an album or a photo; tell them apart with isinstance / detail.is_xxx
        :returns: the photos of the album, or the images of the photo, that should be downloaded
        """
        return detail

    @property
    def all_success(self) -> bool:
        """
        Whether every image was downloaded successfully.

        Only meaningful after all download_xxx methods of the downloader finished.

        Note: when a filter is used (e.g. only 3 images pass the filter),
        all_success is False as well, because recorded counts are compared
        with the full length of each album / photo.
        """
        if self.has_download_failures:
            return False

        for album, photo_dict in self.download_success_dict.items():
            if len(album) != len(photo_dict):
                return False

            for photo, image_list in photo_dict.items():
                if len(photo) != len(image_list):
                    return False

        return True

    @property
    def has_download_failures(self):
        """True when at least one image or photo failure was recorded."""
        return len(self.download_failed_image) != 0 or len(self.download_failed_photo) != 0

    # callback methods below: each runs the logging callback, does bookkeeping and calls plugins

    def before_album(self, album: JmAlbumDetail):
        super().before_album(album)
        self.download_success_dict.setdefault(album, {})
        self.option.call_all_plugin(
            'before_album',
            album=album,
            downloader=self,
        )

    def after_album(self, album: JmAlbumDetail):
        super().after_album(album)
        self.option.call_all_plugin(
            'after_album',
            album=album,
            downloader=self,
        )

    def before_photo(self, photo: JmPhotoDetail):
        super().before_photo(photo)
        self.download_success_dict.setdefault(photo.from_album, {}).setdefault(photo, [])
        self.option.call_all_plugin(
            'before_photo',
            photo=photo,
            downloader=self,
        )

    def after_photo(self, photo: JmPhotoDetail):
        super().after_photo(photo)
        self.option.call_all_plugin(
            'after_photo',
            photo=photo,
            downloader=self,
        )

    def before_image(self, image: JmImageDetail, img_save_path):
        super().before_image(image, img_save_path)
        self.option.call_all_plugin(
            'before_image',
            image=image,
            downloader=self,
        )

    def after_image(self, image: JmImageDetail, img_save_path):
        super().after_image(image, img_save_path)

        photo = image.from_photo
        album = photo.from_album

        # setdefault instead of chained get(): the entries may be missing when
        # the image is downloaded without before_photo having registered them
        success_list = self.download_success_dict.setdefault(album, {}).setdefault(photo, [])
        success_list.append((img_save_path, image))

        self.option.call_all_plugin(
            'after_image',
            image=image,
            downloader=self,
        )

    def raise_if_has_exception(self):
        """Raise PartialDownloadFailedException (via ExceptionTool) when any failure was recorded."""
        if not self.has_download_failures:
            return
        msg_ls = ['部分下载失败', '', '']

        if len(self.download_failed_photo) != 0:
            msg_ls[1] = f'{len(self.download_failed_photo)}个章节下载失败: {self.download_failed_photo}'

        if len(self.download_failed_image) != 0:
            msg_ls[2] = f'{len(self.download_failed_image)}个图片下载失败: {self.download_failed_image}'

        ExceptionTool.raises(
            '\n'.join(msg_ls),
            {'downloader': self},
            PartialDownloadFailedException,
        )

    # support for the `with` statement below

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # the exception is only logged, never suppressed
        if exc_type is not None:
            jm_log('dler.exception',
                   f'{self.__class__.__name__} Exit with exception: {exc_type, exc_val}'
                   )

    @classmethod
    def use(cls, *args, **kwargs):
        """
        Make this class replace JmModuleConfig.CLASS_DOWNLOADER
        """
        JmModuleConfig.CLASS_DOWNLOADER = cls
class DoNotDownloadImage(JmDownloader):
    """
    A downloader that never downloads any image; meant for testing.
    """

    def download_by_image_detail(self, image: JmImageDetail):
        # only ask the option for the file path (ensure make dir), nothing is fetched
        option = self.option
        option.decide_image_filepath(image)
class JustDownloadSpecificCountImage(JmDownloader):
    """
    A downloader that downloads only a given number of images; meant for testing.
    """
    from threading import Lock

    count_lock = Lock()
    count = 0

    @catch_exception
    def download_by_image_detail(self, image: JmImageDetail):
        # ensure make dir
        self.option.decide_image_filepath(image)

        if not self.try_countdown():
            return None
        return super().download_by_image_detail(image)

    def try_countdown(self):
        """Consume one unit of the remaining count; False once it is used up."""
        # cheap check without the lock, repeated under the lock before decrementing
        if self.count < 0:
            return False

        with self.count_lock:
            if self.count < 0:
                return False

            self.count -= 1
            remaining = self.count

            return remaining >= 0

    @classmethod
    def use(cls, count):
        """Set the number of images to download and install this class as the downloader."""
        cls.count = count
        super().use()

680
jm/src/jmcomic/jm_entity.py Executable file
View File

@@ -0,0 +1,680 @@
from functools import lru_cache
from common import *
from .jm_config import *
class Downloadable:
    """Mixin holding the download state of an entity."""

    def __init__(self):
        # flag that makes the downloader skip this entity
        self.skip: bool = False
        # whether the target file already exists
        self.exists: bool = False
        # where the entity is saved
        self.save_path: str = ''
class JmBaseEntity:
    """Base of all entities; the is_xxx() probes all answer False here and are overridden by subclasses."""

    def to_file(self, filepath):
        """Serialize this entity to ``filepath`` through common.PackerUtil.pack."""
        from common import PackerUtil as packer
        packer.pack(self, filepath)

    @classmethod
    def is_image(cls):
        return False

    @classmethod
    def is_photo(cls):
        return False

    @classmethod
    def is_album(cls):
        return False

    @classmethod
    def is_page(cls):
        return False
class IndexedEntity:
    """
    Mixin giving sequence-like access (indexing, slicing, iteration) to
    classes that implement ``getindex`` and ``__len__``.
    """

    def getindex(self, index: int):
        """Return the element at ``index``; must be implemented by subclasses."""
        raise NotImplementedError

    def __len__(self):
        raise NotImplementedError

    def __getitem__(self, item) -> Any:
        """
        Support ``entity[i]`` and ``entity[a:b:c]``.

        Slices follow normal sequence semantics (negative bounds, negative
        steps, ``[:0]`` and out-of-range bounds are clamped) by resolving
        them with ``slice.indices``.

        :raises TypeError: when ``item`` is neither an int nor a slice
        """
        if isinstance(item, slice):
            return [self.getindex(index) for index in range(*item.indices(len(self)))]

        elif isinstance(item, int):
            return self.getindex(item)

        else:
            raise TypeError(f"Invalid item type for {self.__class__}")

    def __iter__(self):
        for index in range(len(self)):
            yield self.getindex(index)
class DetailEntity(JmBaseEntity, IndexedEntity):
    """Common base of the album / photo detail entities."""

    @property
    def id(self) -> str:
        raise NotImplementedError

    @property
    def title(self) -> str:
        """The entity's ``name`` attribute."""
        return getattr(self, 'name')

    @property
    def author(self):
        raise NotImplementedError

    @property
    def oname(self) -> str:
        """
        oname = original name

        Example:
        title: "喂我吃吧 老師! [欶瀾漢化組] [BLVEFO9] たべさせて、せんせい! (ブルーアーカイブ) [中國翻譯] [無修正]"
        oname: "喂我吃吧 老師!"

        :return: the original name parsed from the title; the full title
                 (after logging) when it cannot be extracted
        """
        from .jm_toolkit import JmcomicText

        parsed = JmcomicText.parse_orig_album_name(self.title)
        if parsed is None:
            jm_log('entity', f'无法提取出原album名字: {self.title}')
            return self.title
        return parsed

    @property
    def authoroname(self):
        """
        authoroname = author + oname

        A name intended to be easy to recognise: author directly followed by
        the original name.
        NOTE(review): the previous docstring described the format as
        '【author】oname', but the code concatenates both parts without any
        brackets - confirm which one is intended.

        :return: author and original name concatenated
        """
        author, oname = self.author, self.oname
        return f'{author}{oname}'

    @property
    def idoname(self):
        """
        Similar to authoroname

        :return: '[id] oname'
        """
        entity_id, oname = self.id, self.oname
        return f'[{entity_id}] {oname}'

    def __str__(self):
        return f'''{self.__class__.__name__}({self.__alias__()}-{self.id}: "{self.title}")'''

    __repr__ = __str__

    @classmethod
    def __alias__(cls):
        # "JmAlbumDetail" -> "album"
        # "JmPhotoDetail" -> "photo"
        cls_name = cls.__name__
        start = cls_name.index("m") + 1
        end = cls_name.rfind("Detail")
        return cls_name[start:end].lower()

    @classmethod
    def get_dirname(cls, detail: 'DetailEntity', ref: str) -> str:
        """
        Called by DirRule to produce the folder name for one level.

        Typical call:
        Atitle -> ref = 'title' -> DetailEntity.get_dirname(album, 'title')

        A custom function registered under ``ref`` in
        JmModuleConfig.AFIELD_ADVICE (albums) or PFIELD_ADVICE (everything
        else) takes precedence; otherwise getattr(detail, ref) is returned.
        Users may override this method to customise folder names.

        :param detail: album / photo instance
        :param ref: field name
        :returns: folder name
        """
        if isinstance(detail, JmAlbumDetail):
            advice_registry = JmModuleConfig.AFIELD_ADVICE
        else:
            advice_registry = JmModuleConfig.PFIELD_ADVICE

        advice_func = advice_registry.get(ref, None)
        if advice_func is None:
            return getattr(detail, ref)
        return advice_func(detail)
class JmImageDetail(JmBaseEntity, Downloadable):
    """A single image of a photo (chapter)."""

    def __init__(self,
                 aid,
                 scramble_id,
                 img_url,
                 img_file_name,
                 img_file_suffix,
                 from_photo=None,
                 query_params=None,
                 index=-1,
                 ):
        """
        :param aid: id written in front of the file name in ``tag``
        :param scramble_id: must not be None or an empty string
        :param img_url: image URL without query parameters
        :param img_file_name: file name without suffix
        :param img_file_suffix: file suffix including the dot
        :param from_photo: the photo this image belongs to, if any
        :param query_params: query string appended to the download URL, if any
        :param index: position of the image, starting from 1
        """
        super().__init__()
        if scramble_id is None or (isinstance(scramble_id, str) and scramble_id == ''):
            from .jm_toolkit import ExceptionTool
            ExceptionTool.raises('图片的scramble_id不能为空')

        self.aid: str = str(aid)
        self.scramble_id: str = str(scramble_id)
        self.img_url: str = img_url
        self.img_file_name: str = img_file_name  # without suffix
        self.img_file_suffix: str = img_file_suffix

        self.from_photo: Optional[JmPhotoDetail] = from_photo
        self.query_params: Optional[str] = query_params
        self.index = index  # starts from 1

    @property
    def filename_without_suffix(self):
        return self.img_file_name

    @property
    def filename(self):
        return self.img_file_name + self.img_file_suffix

    @property
    def is_gif(self):
        return self.img_file_suffix == '.gif'

    @property
    def download_url(self) -> str:
        """
        The URL used for downloading the image.

        Differs from self.img_url only in that ?{self.query_params} is
        appended when query_params is set.
        """
        if self.query_params is None:
            return self.img_url

        return f'{self.img_url}?{self.query_params}'

    @classmethod
    def of(cls,
           photo_id: str,
           scramble_id: str,
           data_original: str,
           from_photo=None,
           query_params=None,
           index=-1,
           ) -> 'JmImageDetail':
        """
        Create an image entity from its URL.

        The instance is created with ``cls`` so that a subclass configured via
        JmModuleConfig.CLASS_IMAGE is honoured.
        """
        # /xxx.yyy
        # ↑   ↑
        # x   y
        x = data_original.rfind('/')
        y = data_original.rfind('.')
        if y <= x:
            # no suffix after the last '/': the whole last segment is the name
            y = len(data_original)

        return cls(
            aid=photo_id,
            scramble_id=scramble_id,
            img_url=data_original,
            img_file_name=data_original[x + 1:y],
            img_file_suffix=data_original[y:],
            from_photo=from_photo,
            query_params=query_params,
            index=index,
        )

    @property
    def tag(self) -> str:
        """
        this tag is used to print pretty info when logging
        """
        # from_photo is optional; show '?' as the total when it is unknown
        total = len(self.from_photo) if self.from_photo is not None else '?'
        return f'{self.aid}/{self.img_file_name}{self.img_file_suffix} [{self.index}/{total}]'

    @classmethod
    def is_image(cls):
        return True

    def __str__(self):
        return f'''{self.__class__.__name__}(image-[{self.download_url}])'''

    __repr__ = __str__
class JmPhotoDetail(DetailEntity, Downloadable):
    """A photo (chapter) of an album; indexing / iterating it yields its images."""

    def __init__(self,
                 photo_id,
                 name,
                 series_id,
                 sort,
                 tags='',
                 scramble_id='',
                 page_arr=None,
                 data_original_domain=None,
                 data_original_0=None,
                 author=None,
                 from_album=None,
                 ):
        import json

        super().__init__()
        self.photo_id: str = str(photo_id)
        self.scramble_id: str = str(scramble_id)
        self.name: str = str(name).strip()
        self.sort: int = int(sort)
        self._tags: str = tags
        self._series_id: int = int(series_id)

        self._author: Optional[str] = author
        self.from_album: Optional[JmAlbumDetail] = from_album
        self.index = self.album_index

        # the attributes below are related to the image URLs

        # page_arr holds the file names (img_name) of all images of this photo;
        # a str value is decoded as JSON first
        decoded_pages = json.loads(page_arr) if isinstance(page_arr, str) else page_arr
        self.page_arr: List[str] = decoded_pages
        # cdn domain of the images
        self.data_original_domain: Optional[str] = data_original_domain
        # URL of the first image
        self.data_original_0 = data_original_0

        # 2023-07-14
        # The site added a parameter `v` to image URLs; without it the image comes back empty.
        # Properties of `v`:
        # 1. the value seems to be the timestamp of the photo's update time,
        #    so all images share the same value
        # 2. on the web side it currently only appears in the data-original
        #    attribute of the image tags on the photo page
        # The idea was to take the data-original of the first image tag and
        # pass its query string on to JmImageDetail as data_original_query_params
        # self.data_original_query_params = self.get_data_original_query_params(data_original_0)
        self.data_original_query_params = None

    @property
    def is_single_album(self) -> bool:
        return self._series_id == 0

    @property
    def tags(self) -> List[str]:
        album = self.from_album
        if album is not None:
            return album.tags

        raw = self._tags
        # comma separated (html) or whitespace separated (api)
        return raw.split(',') if ',' in raw else raw.split()

    @property
    def indextitle(self):
        index, name = self.album_index, self.name
        return f'{index}{name}'

    @property
    def album_id(self) -> str:
        if self.is_single_album:
            return self.photo_id
        return str(self._series_id)

    @property
    def album_index(self) -> int:
        """
        Index of this photo inside its album, starting from 1
        """
        # For a single-photo album JM gives sort == 2;
        # returning 1 matches the meaning better
        if self.sort == 2 and self.is_single_album:
            return 1

        return self.sort

    @property
    def author(self) -> str:
        # prefer from_album
        album = self.from_album
        if album is not None:
            return album.author

        own = self._author
        if own is None or own == '':
            # fall back to the default
            return JmModuleConfig.DEFAULT_AUTHOR
        return own.strip()

    def create_image_detail(self, index) -> JmImageDetail:
        """Build the image entity for position ``index`` (0-based) of page_arr."""
        # validate the argument
        length = len(self.page_arr)
        if index >= length:
            raise IndexError(f'image index out of range for photo-{self.photo_id}: {index} >= {length}')

        img_name = self.page_arr[index]
        data_original = self.get_img_data_original(img_name)

        image_cls = JmModuleConfig.image_class()
        return image_cls.of(
            self.photo_id,
            self.scramble_id,
            data_original,
            from_photo=self,
            query_params=self.data_original_query_params,
            index=index + 1,
        )

    def get_img_data_original(self, img_name: str) -> str:
        """
        Build the full request URL of an image from its file name

        Example: img_name = 01111.webp
        returns: https://cdn-msp2.18comic.org/media/photos/147643/01111.webp
        """
        from .jm_toolkit import ExceptionTool

        domain = self.data_original_domain
        ExceptionTool.require_true(domain is not None, f'图片域名为空: {domain}')

        return f'{JmModuleConfig.PROT}{domain}/media/photos/{self.photo_id}/{img_name}'

    # noinspection PyMethodMayBeStatic
    def get_data_original_query_params(self, data_original_0: Optional[str]) -> str:
        """
        Return the query string of ``data_original_0``;
        'v=<current timestamp>' when it is None or has no '?'.
        """
        if data_original_0 is not None:
            _, question_mark, query = data_original_0.rpartition('?')
            if question_mark:
                return query

        return f'v={time_stamp()}'

    @property
    def id(self):
        return self.photo_id

    @lru_cache(None)
    def getindex(self, index) -> JmImageDetail:
        return self.create_image_detail(index)

    def __getitem__(self, item) -> Union[JmImageDetail, List[JmImageDetail]]:
        return super().__getitem__(item)

    def __len__(self):
        return len(self.page_arr)

    def __iter__(self) -> Generator[JmImageDetail, None, None]:
        return super().__iter__()

    @classmethod
    def is_photo(cls):
        return True
class JmAlbumDetail(DetailEntity, Downloadable):
    """An album; indexing / iterating it yields its photos (chapters)."""

    def __init__(self,
                 album_id,
                 scramble_id,
                 name,
                 episode_list,
                 page_count,
                 pub_date,
                 update_date,
                 likes,
                 views,
                 comment_count,
                 works,
                 actors,
                 authors,
                 tags,
                 related_list=None,
                 ):
        super().__init__()
        self.album_id: str = str(album_id)
        self.scramble_id: str = str(scramble_id)
        self.name: str = str(name).strip()
        self.page_count: int = int(page_count)  # total number of pages
        self.pub_date: str = pub_date  # publish date
        self.update_date: str = update_date  # update date

        self.likes: str = likes  # [1K] likes
        self.views: str = views  # [40K] views
        self.comment_count: int = int(comment_count)  # number of comments
        self.works: List[str] = works  # works
        self.actors: List[str] = actors  # characters
        self.tags: List[str] = tags  # tags
        self.authors: List[str] = authors  # authors

        # Some albums have no chapters; such an album is its own single chapter.
        if not episode_list:
            # photo_id, photo_index, photo_title
            episode_list = [(album_id, "1", name)]
        else:
            episode_list = self.distinct_episode(episode_list)

        self.episode_list = episode_list
        self.related_list = related_list

    @property
    def author(self):
        """
        The author.
        An album may carry several author tags; this is the first one
        (JmModuleConfig.DEFAULT_AUTHOR when there is none).
        Use self.authors for the full list.
        """
        if len(self.authors) >= 1:
            return self.authors[0]

        return JmModuleConfig.DEFAULT_AUTHOR

    @property
    def id(self):
        return self.album_id

    @staticmethod
    def distinct_episode(episode_list: list):
        """
        Sort the episodes by photo_index and drop entries that repeat an index.

        Each episode is a tuple whose item 1 is the photo_index
        (photo_id, photo_index, photo_title, ...). The input list is not
        modified; an empty input yields an empty list.

        :returns: new list, ordered by int(photo_index), keeping the first
                  episode of every index
        """
        ordered = sorted(episode_list, key=lambda e: int(e[1]))  # sort by photo_index

        ret = []
        for episode in ordered:
            if not ret or ret[-1][1] != episode[1]:
                ret.append(episode)

        return ret

    def create_photo_detail(self, index) -> JmPhotoDetail:
        """Build the photo entity for position ``index`` (0-based) of episode_list."""
        # validate the argument
        length = len(self.episode_list)

        if index >= length:
            raise IndexError(f'photo index out of range for album-{self.album_id}: {index} >= {length}')

        # e.g. ('212214', '81', '94 突然打來', '2020-08-29')
        # only the first three items are used, a trailing publish date is tolerated
        pid, pindex, pname = self.episode_list[index][:3]

        photo = JmModuleConfig.photo_class()(
            photo_id=pid,
            scramble_id=self.scramble_id,
            name=pname,
            series_id=self.album_id,
            sort=pindex,
            from_album=self,
        )

        return photo

    @lru_cache(None)
    def getindex(self, item) -> JmPhotoDetail:
        return self.create_photo_detail(item)

    def __getitem__(self, item) -> Union[JmPhotoDetail, List[JmPhotoDetail]]:
        return super().__getitem__(item)

    def __len__(self):
        return len(self.episode_list)

    def __iter__(self) -> Generator[JmPhotoDetail, None, None]:
        return super().__iter__()

    @classmethod
    def is_album(cls):
        return True
class JmPageContent(JmBaseEntity, IndexedEntity):
    """One page of results: a list of (album_id, info dict) items plus the total result count."""

    ContentItem = Tuple[str, Dict[str, Any]]

    def __init__(self, content: List[ContentItem], total: int):
        """
        content:
        [
          album_id, {title, tags, ...}
        ]

        :param content: data of this page
        :param total: total number of results
        """
        self.total = total
        self.content = content

    @property
    def page_count(self) -> int:
        """
        Number of pages
        """
        size = self.page_size
        import math
        pages = int(self.total) / size
        return math.ceil(pages)

    @property
    def page_size(self) -> int:
        """
        Size of one page
        """
        raise NotImplementedError

    def iter_id(self) -> Generator[str, None, None]:
        """
        Iterator over album_id
        """
        for aid, _info in self.content:
            yield aid

    def iter_id_title(self) -> Generator[Tuple[str, str], None, None]:
        """
        Iterator over (album_id, album_title)
        """
        for aid, info in self.content:
            title = info['name']
            yield aid, title

    def iter_id_title_tag(self) -> Generator[Tuple[str, str, List[str]], None, None]:
        """
        Iterator over (album_id, album_title, album_tags)
        """
        for aid, info in self.content:
            # a missing 'tags' entry is stored as an empty list in the info dict
            info.setdefault('tags', [])
            yield aid, info['name'], info['tags']

    # the methods below provide convenient element access

    def __len__(self):
        return len(self.content)

    def __iter__(self):
        return self.iter_id_title()

    def __getitem__(self, item) -> Union[ContentItem, List[ContentItem]]:
        return super().__getitem__(item)

    def getindex(self, index: int):
        return self.content[index]

    @classmethod
    def is_page(cls):
        return True
class JmSearchPage(JmPageContent):
    """A page of search results; can also wrap a single album."""

    @property
    def page_size(self) -> int:
        return JmModuleConfig.PAGE_SIZE_SEARCH

    # the members below wrap a single album

    @property
    def is_single_album(self):
        """True when this page was created by wrap_single_album."""
        return hasattr(self, 'album')

    @property
    def single_album(self) -> JmAlbumDetail:
        return getattr(self, 'album')

    @classmethod
    def wrap_single_album(cls, album: JmAlbumDetail) -> 'JmSearchPage':
        """Create a one-item page (total 1) describing ``album`` and attach the album to it."""
        info_fields = (
            'name',
            'tags',
            'scramble_id',
            'page_count',
            'pub_date',
            'update_date',
            'likes',
            'views',
            'comment_count',
            'works',
            'actors',
            'authors',
            'related_list',
        )
        info = {field: getattr(album, field) for field in info_fields}

        page = JmSearchPage([(album.album_id, info)], 1)
        page.album = album
        return page
# JmCategoryPage is an alias of JmSearchPage
JmCategoryPage = JmSearchPage
class JmFavoritePage(JmPageContent):
    """One page of a favorites folder."""

    def __init__(self, content, folder_list, total):
        """
        :param content: data of one favorites page
        :param folder_list: information about all favorites folders
        :param total: total number of favorites
        """
        super().__init__(content, total)
        self.folder_list = folder_list

    @property
    def page_size(self) -> int:
        return JmModuleConfig.PAGE_SIZE_FAVORITE

    def iter_folder_id_name(self) -> Generator[Tuple[str, str], None, None]:
        """
        Iterator over the user's folders as (folder id, folder name)
        """
        for folder in self.folder_list:
            yield folder['FID'], folder['name']

191
jm/src/jmcomic/jm_exception.py Executable file
View File

@@ -0,0 +1,191 @@
# 该文件存放jmcomic的异常机制设计和实现
from .jm_entity import *
class JmcomicException(Exception):
    """Base exception of the jmcomic module; carries a message and a context dict."""

    description = 'jmcomic 模块异常'

    def __init__(self, msg: str, context: dict):
        self.context = context
        self.msg = msg

    def from_context(self, key):
        """Return the context value stored under ``key`` (KeyError when absent)."""
        context = self.context
        return context[key]

    def __str__(self):
        return self.msg
class ResponseUnexpectedException(JmcomicException):
    """Exception for a response that is not as expected."""

    description = '响应不符合预期异常'

    @property
    def resp(self):
        """The response object stored in the context."""
        resp_key = ExceptionTool.CONTEXT_KEY_RESP
        return self.from_context(resp_key)
class RegularNotMatchException(JmcomicException):
    """Exception for a regular expression that did not match."""

    description = '正则表达式不匹配异常'

    @property
    def resp(self):
        """
        May be None
        """
        context = self.context
        return context.get(ExceptionTool.CONTEXT_KEY_RESP, None)

    @property
    def error_text(self):
        """The text stored in the context under CONTEXT_KEY_HTML."""
        html_key = ExceptionTool.CONTEXT_KEY_HTML
        return self.from_context(html_key)

    @property
    def pattern(self):
        """The pattern stored in the context under CONTEXT_KEY_RE_PATTERN."""
        pattern_key = ExceptionTool.CONTEXT_KEY_RE_PATTERN
        return self.from_context(pattern_key)
class JsonResolveFailException(ResponseUnexpectedException):
    """Exception for a JSON parsing failure."""

    description = 'Json解析异常'
class MissingAlbumPhotoException(ResponseUnexpectedException):
    """Exception for an album or photo that does not exist."""

    description = '不存在本子或章节异常'

    @property
    def error_jmid(self) -> str:
        """The id stored in the context under CONTEXT_KEY_MISSING_JM_ID."""
        jmid_key = ExceptionTool.CONTEXT_KEY_MISSING_JM_ID
        return self.from_context(jmid_key)
class RequestRetryAllFailException(JmcomicException):
    """Exception for a request whose retries all failed."""

    description = '请求重试全部失败异常'
class PartialDownloadFailedException(JmcomicException):
    """Exception for a download in which some photos or images failed."""

    description = '部分章节或图片下载失败异常'

    @property
    def downloader(self):
        """The downloader stored in the context under CONTEXT_KEY_DOWNLOADER."""
        downloader_key = ExceptionTool.CONTEXT_KEY_DOWNLOADER
        return self.from_context(downloader_key)
class ExceptionTool:
    """
    Helper for raising exceptions.

    1: simplifies writing if-raise statements
    2: offers a better way of passing context information
    """

    CONTEXT_KEY_RESP = 'resp'
    CONTEXT_KEY_HTML = 'html'
    CONTEXT_KEY_RE_PATTERN = 'pattern'
    CONTEXT_KEY_MISSING_JM_ID = 'missing_jm_id'
    CONTEXT_KEY_DOWNLOADER = 'downloader'

    @classmethod
    def raises(cls,
               msg: str,
               context: dict = None,
               etype: Optional[Type[Exception]] = None,
               ):
        """
        Raise an exception.

        :param msg: exception message
        :param context: exception context data
        :param etype: exception type, JmcomicException by default
        """
        if context is None:
            context = {}

        if etype is None:
            etype = JmcomicException

        # the exception object
        e = etype(msg, context)

        # let the registered listeners see it before it is raised
        cls.notify_all_listeners(e)

        raise e

    @classmethod
    def raises_regex(cls,
                     msg: str,
                     html: str,
                     pattern: Pattern,
                     ):
        """Raise RegularNotMatchException carrying the text and the pattern."""
        cls.raises(
            msg,
            {
                cls.CONTEXT_KEY_HTML: html,
                cls.CONTEXT_KEY_RE_PATTERN: pattern,
            },
            RegularNotMatchException,
        )

    @classmethod
    def raises_resp(cls,
                    msg: str,
                    resp,
                    etype=ResponseUnexpectedException
                    ):
        """Raise ``etype`` (ResponseUnexpectedException by default) carrying the response."""
        cls.raises(
            msg, {
                cls.CONTEXT_KEY_RESP: resp
            },
            etype,
        )

    @classmethod
    def raise_missing(cls,
                      resp,
                      jmid: str,
                      ):
        """
        Raise the exception for a missing album / photo.

        :param resp: response object
        :param jmid: album / photo id
        """
        from .jm_toolkit import JmcomicText
        url = JmcomicText.format_album_url(jmid)

        req_type = "本子" if "album" in url else "章节"
        cls.raises(
            (
                f'请求的{req_type}不存在!({url})\n'
                '原因可能为:\n'
                f'1. id有误检查你的{req_type}id\n'
                '2. 该漫画只对登录用户可见请配置你的cookies或者使用移动端Clientapi\n'
            ),
            {
                cls.CONTEXT_KEY_RESP: resp,
                cls.CONTEXT_KEY_MISSING_JM_ID: jmid,
            },
            MissingAlbumPhotoException,
        )

    @classmethod
    def require_true(cls, case: bool, msg: str):
        """Raise via ``raises(msg)`` unless ``case`` is truthy."""
        if case:
            return

        cls.raises(msg)

    @classmethod
    def replace_old_exception_executor(cls, raises: Callable[[Callable, str, dict], None]):
        """
        Replace ``ExceptionTool.raises`` with a custom executor.

        :param raises: called as ``raises(old, msg, context)``. ``old`` is the
                       previous executor; calling ``old(msg, context)`` raises
                       with the exception type originally requested by the
                       caller, and a different type may be passed as third
                       argument.
        """
        old = cls.raises

        def new(msg, context=None, _etype=None):
            if context is None:
                context = {}

            def old_keeping_etype(old_msg, old_context=None, etype=None):
                # keep the originally requested type unless the executor overrides it,
                # so specific exceptions are not degraded to JmcomicException
                return old(old_msg, old_context, _etype if etype is None else etype)

            raises(old_keeping_etype, msg, context)

        cls.raises = new

    @classmethod
    def notify_all_listeners(cls, e):
        """Call every listener registered for a type that ``e`` is an instance of."""
        registry: Dict[Type, Callable] = JmModuleConfig.REGISTRY_EXCEPTION_LISTENER
        if not registry:
            return None

        for accept_type, listener in registry.items():
            if isinstance(e, accept_type):
                listener(e)

670
jm/src/jmcomic/jm_option.py Executable file
View File

@@ -0,0 +1,670 @@
from .jm_client_impl import *
class CacheRegistry:
    """Registry of cache dicts, keyed either by option or by client."""

    REGISTRY = {}

    @classmethod
    def level_option(cls, option, _client):
        """Return the cache dict registered for ``option``, creating it when missing."""
        return cls.REGISTRY.setdefault(option, {})

    @classmethod
    def level_client(cls, _option, client):
        """Return the cache dict registered for ``client``, creating it when missing."""
        return cls.REGISTRY.setdefault(client, {})

    @classmethod
    def enable_client_cache_on_condition(cls,
                                         option: 'JmOption',
                                         client: JmcomicClient,
                                         cache: Union[None, bool, str, Callable],
                                         ):
        """
        cache parameter

        if None: no cache

        if bool:
            true: level_option

            false: no cache

        if str:
            (invoke corresponding Cache class method)

        otherwise the value itself is called with (option, client)

        :param option: JmOption
        :param client: JmcomicClient
        :param cache: config dsl
        """
        if cache is None or cache is False:
            return

        if cache is True:
            resolver = cls.level_option
        elif isinstance(cache, str):
            resolver = getattr(cls, cache, None)
            ExceptionTool.require_true(resolver is not None, f'未实现的cache配置名: {cache}')
        else:
            resolver = cache

        client.set_cache_dict(resolver(option, client))
class DirRule:
rule_sample = [
# base dir / album id / photo index /
'Bd_Aid_Pindex', # default download layout of the JM website
# base dir / album author / album title / photo index /
'Bd_Aauthor_Atitle_Pindex',
# base dir / photo index & title /
'Bd_Pindextitle',
# base dir / photo custom class attribute /
'Bd_Aauthor_Atitle_Pcustomfield',
# JmModuleConfig.CLASS_ALBUM / CLASS_PHOTO must be replaced for custom attributes to take effect
]
Detail = Union[JmAlbumDetail, JmPhotoDetail, None]
RuleFunc = Callable[[Detail], str]
RuleSolver = Tuple[str, RuleFunc, str]
RuleSolverList = List[RuleSolver]
def __init__(self, rule: str, base_dir=None):
    """
    :param rule: path rule DSL, e.g. 'Bd_Aid_Pindex'
    :param base_dir: base directory text; it is passed through
        ``JmcomicText.parse_to_abspath`` before being stored
    """
    resolved_dir = JmcomicText.parse_to_abspath(base_dir)

    self.rule_dsl = rule
    self.base_dir = resolved_dir
    self.solver_list = self.get_role_solver_list(rule, resolved_dir)
def decide_image_save_dir(self,
                          album: JmAlbumDetail,
                          photo: JmPhotoDetail,
                          ) -> str:
    """
    Build the directory in which the images of ``photo`` are saved by
    applying every rule solver in order and joining the results with '/'.

    A solver failure is logged and then re-raised.
    """
    segments = []

    for rule_solver in self.solver_list:
        try:
            segment = self.apply_rule_solver(album, photo, rule_solver)
        except BaseException as err:
            jm_log('dir_rule', f'路径规则"{rule_solver[2]}"的解析出错: {err}, album={album}, photo={photo}')
            raise

        segments.append(str(segment))

    return fix_filepath('/'.join(segments), is_dir=True)
def decide_album_root_dir(self, album: JmAlbumDetail) -> str:
    """
    Build the album-level directory: only the 'Bd' and 'A' solvers take
    part, photo ('P') solvers are skipped.

    A solver failure is logged and then re-raised.
    """
    segments = []

    for rule_solver in self.solver_list:
        key, _, rule = rule_solver
        if key not in ('Bd', 'A'):
            continue

        try:
            segment = self.apply_rule_solver(album, None, rule_solver)
        except BaseException as err:
            jm_log('dir_rule', f'路径规则"{rule}"的解析出错: {err}, album={album}')
            raise

        segments.append(str(segment))

    return fix_filepath('/'.join(segments), is_dir=True)
def get_role_solver_list(self, rule_dsl: str, base_dir: str) -> RuleSolverList:
    """
    Parse the download-path DSL into an ordered list of rule solvers.

    'Bd' becomes a solver that returns ``base_dir``; every other segment is
    resolved by ``get_rule_solver`` and reported via ``ExceptionTool.raises``
    when it is not supported.
    """
    solvers = []

    for segment in self.split_rule_dsl(rule_dsl):
        segment = segment.strip()

        if segment == 'Bd':
            solver = ('Bd', lambda _: base_dir, 'Bd')
        else:
            solver = self.get_rule_solver(segment)
            if solver is None:
                ExceptionTool.raises(f'不支持的dsl: "{segment}" in "{rule_dsl}"')

        solvers.append(solver)

    return solvers
# noinspection PyMethodMayBeStatic
def split_rule_dsl(self, rule_dsl: str) -> List[str]:
    """
    Split a rule DSL into its segments.

    'Bd' alone is returned as a single segment; otherwise the text is split
    on '/' if it contains one, else on '_'. Anything else is reported via
    ``ExceptionTool.raises``.
    """
    if rule_dsl == 'Bd':
        return [rule_dsl]

    # '/' takes precedence over '_' when both appear
    for separator in ('/', '_'):
        if separator in rule_dsl:
            return rule_dsl.split(separator)

    ExceptionTool.raises(f'不支持的rule配置: "{rule_dsl}"')
@classmethod
def get_rule_solver(cls, rule: str) -> Optional[RuleSolver]:
    """
    Build the solver tuple ``(key, func, rule)`` for an 'A...' or 'P...' rule.

    :returns: None when the rule does not start with 'A' or 'P'
    """
    # validate the dsl prefix
    if not rule.startswith(('A', 'P')):
        return None

    field_name = rule[1:]

    def solve_func(detail):
        dirname = DetailEntity.get_dirname(detail, field_name)
        return fix_windir_name(str(dirname)).strip()

    return rule[0], solve_func, rule
@classmethod
def apply_rule_solver(cls, album, photo, rule_solver: RuleSolver) -> str:
"""
应用规则解析器(RuleSolver)
:param album: JmAlbumDetail
:param photo: JmPhotoDetail
:param rule_solver: Ptitle
:returns: photo.title
"""
def choose_detail(key):
if key == 'Bd':
return None
if key == 'A':
return album
if key == 'P':
return photo
key, func, _ = rule_solver
detail = choose_detail(key)
return func(detail)
@classmethod
def apply_rule_directly(cls, album, photo, rule: str) -> str:
    """
    Resolve a single 'A...' / 'P...' rule against the given album / photo.

    :param album: JmAlbumDetail
    :param photo: JmPhotoDetail
    :param rule: one rule segment, e.g. 'Ptitle'
    :returns: the resolved path segment
    """
    rule_solver = cls.get_rule_solver(rule)
    if rule_solver is None:
        # get_rule_solver returns None for anything not starting with 'A'/'P';
        # report it explicitly instead of failing on tuple-unpacking None
        ExceptionTool.raises(f'不支持的dsl: "{rule}"')

    return cls.apply_rule_solver(album, photo, rule_solver)
class JmOption:
def __init__(self,
             dir_rule: Dict,
             download: Dict,
             client: Dict,
             plugins: Dict,
             filepath=None,
             call_after_init_plugin=True,
             ):
    """
    :param dir_rule: keyword arguments for ``DirRule``
    :param download: download configuration
    :param client: client configuration
    :param plugins: plugin configuration
    :param filepath: file this option was loaded from / is saved to
    :param call_after_init_plugin: whether to run the 'after_init' plugin group
    """
    # path rule configuration
    self.dir_rule = DirRule(**dir_rule)

    # client / download / plugin configuration
    self.client = AdvancedDict(client)
    self.download = AdvancedDict(download)
    self.plugins = AdvancedDict(plugins)

    # other configuration
    self.filepath = filepath

    # plugins the main thread has to wait for
    self.need_wait_plugins = []

    if call_after_init_plugin:
        self.call_all_plugin('after_init', safe=True)
def copy_option(self):
    """
    Create another option of the same class from this option's configuration.

    The ``src_dict`` objects are handed to the constructor as-is (no copy is
    made here), and the 'after_init' plugins are not run again.
    """
    rule = self.dir_rule
    init_kwargs = dict(
        dir_rule={'rule': rule.rule_dsl, 'base_dir': rule.base_dir},
        download=self.download.src_dict,
        client=self.client.src_dict,
        plugins=self.plugins.src_dict,
        filepath=self.filepath,
        call_after_init_plugin=False,
    )
    return self.__class__(**init_kwargs)
"""
下面是decide系列方法为了支持重写和增加程序动态性。
"""
# noinspection PyUnusedLocal
def decide_image_batch_count(self, photo: JmPhotoDetail):
    """Return the configured ``download.threading.image`` value; ``photo`` is unused here."""
    threading_conf = self.download.threading
    return threading_conf.image
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def decide_photo_batch_count(self, album: JmAlbumDetail):
    """Return the configured ``download.threading.photo`` value; ``album`` is unused here."""
    threading_conf = self.download.threading
    return threading_conf.photo
# noinspection PyMethodMayBeStatic
def decide_image_filename(self, image: JmImageDetail) -> str:
    """
    Return the image's file name without suffix.

    By default this is the file name used by the site, e.g. 00001 (.jpg).
    """
    name = image.filename_without_suffix
    return name
def decide_image_suffix(self, image: JmImageDetail) -> str:
    """
    Return the suffix the image is saved with. A suffix different from the
    original one means the image format gets converted.
    """
    original_suffix = image.img_file_suffix

    # animated images always keep their original suffix
    if image.is_gif:
        return original_suffix

    # otherwise the configured suffix wins, falling back to the original
    configured_suffix = self.download.image.suffix
    return configured_suffix or original_suffix
def decide_image_save_dir(self, photo, ensure_exists=True) -> str:
    """
    Decide the directory the images of ``photo`` are saved in, using
    ``self.dir_rule``.

    :param photo: photo whose ``from_album`` supplies the album
    :param ensure_exists: when true, the directory is created through
        ``JmcomicText.try_mkdir`` and the path it returns is used
    """
    save_dir = self.dir_rule.decide_image_save_dir(photo.from_album, photo)
    return JmcomicText.try_mkdir(save_dir) if ensure_exists else save_dir
def decide_image_filepath(self, image: JmImageDetail, consider_custom_suffix=True) -> str:
    """
    Compose the full path of the image file from the save directory, the
    suffix and the file name without suffix.

    :param consider_custom_suffix: when false, the image's original suffix is used
    """
    save_dir = self.decide_image_save_dir(image.from_photo)

    if consider_custom_suffix:
        suffix = self.decide_image_suffix(image)
    else:
        suffix = image.img_file_suffix

    filename = fix_windir_name(self.decide_image_filename(image))
    return os.path.join(save_dir, filename + suffix)
def decide_download_cache(self, _image: JmImageDetail) -> bool:
    """Return the configured ``download.cache`` value; the image is not consulted."""
    download_conf = self.download
    return download_conf.cache
def decide_download_image_decode(self, image: JmImageDetail) -> bool:
    """Return whether the image should be decoded: never for gifs, else ``download.image.decode``."""
    # .gif file needn't be decoded
    return False if image.is_gif else self.download.image.decode
"""
下面是创建对象相关方法
"""
@classmethod
def default_dict(cls) -> Dict:
    """Return the default option dict supplied by ``JmModuleConfig.option_default_dict``."""
    defaults = JmModuleConfig.option_default_dict()
    return defaults
@classmethod
def default(cls) -> 'JmOption':
    """Build an option from an empty user dict, i.e. the default JmOption."""
    user_dict = {}
    return cls.construct(user_dict)
@classmethod
def construct(cls, origdic: Dict, cover_default=True) -> 'JmOption':
    """
    Build an option from a plain dict.

    :param origdic: user configuration
    :param cover_default: merge ``origdic`` over the default dict first;
        when false ``origdic`` itself is used (and 'log' / 'version' are
        popped from it)
    """
    dic = cls.merge_default_dict(origdic) if cover_default else origdic

    # log
    if dic.pop('log', True) is False:
        disable_jm_log()

    # version
    version = dic.pop('version', None)
    # noinspection PyTypeChecker
    is_current = version is not None and float(version) >= float(JmModuleConfig.JM_OPTION_VER)

    # only options from an older (or unknown) version need the compatibility pass
    if not is_current:
        cls.compatible_with_old_versions(dic)

    return cls(**dic)
@classmethod
def compatible_with_old_versions(cls, dic):
"""
兼容旧的option版本
"""
# 1: 并发配置项
dt: dict = dic['download']['threading']
if 'batch_count' in dt:
batch_count = dt.pop('batch_count')
dt['image'] = batch_count
# 2: 插件配置项 plugin -> plugins
if 'plugin' in dic:
dic['plugins'] = dic.pop('plugin')
def deconstruct(self) -> Dict:
    """Return this option as a plain dict (the counterpart of ``construct``)."""
    rule = self.dir_rule
    dir_rule_conf = {
        'rule': rule.rule_dsl,
        'base_dir': rule.base_dir,
    }

    return {
        'version': JmModuleConfig.JM_OPTION_VER,
        'log': JmModuleConfig.FLAG_ENABLE_JM_LOG,
        'dir_rule': dir_rule_conf,
        'download': self.download.src_dict,
        'client': self.client.src_dict,
        'plugins': self.plugins.src_dict
    }
"""
下面是文件IO方法
"""
@classmethod
def from_file(cls, filepath: str) -> 'JmOption':
    """
    Load an option from ``filepath`` via ``PackerUtil.unpack``.

    ``filepath`` is recorded in the option unless the file already defines
    a 'filepath' key.
    """
    unpacked = PackerUtil.unpack(filepath)
    dic: dict = unpacked[0]
    dic.setdefault('filepath', filepath)
    return cls.construct(dic)
def to_file(self, filepath=None):
    """
    Save this option via ``PackerUtil.pack``.

    :param filepath: target path; defaults to ``self.filepath``. Having
        neither is reported through ``ExceptionTool.require_true``.
    """
    target = self.filepath if filepath is None else filepath
    ExceptionTool.require_true(target is not None, "未指定JmOption的保存路径")
    PackerUtil.pack(self.deconstruct(), target)
"""
下面是创建客户端的相关方法
"""
@field_cache()
def build_jm_client(self, **kwargs):
    """
    Return this option's client.

    The first call creates the JmcomicClient and stores it on ``self``;
    later calls do NOT create a new one.
    """
    client = self.new_jm_client(**kwargs)
    return client
def new_jm_client(self, domain_list=None, impl=None, cache=None, **kwargs) -> Union[JmHtmlClient, JmApiClient]:
    """
    Create a new client; meta data is not shared between clients.

    :param domain_list: list, multi-line str, or dict keyed by client key;
        falls back to ``self.client.domain`` and then ``decide_client_domain``
    :param impl: client key or client class; defaults to ``self.client.impl``
    :param cache: cache setting, see ``CacheRegistry``; defaults to ``self.client.cache``
    :param kwargs: entries that overwrite the postman's meta_data
    :raises NotImplementedError: the resolved class is not a concrete
        ``AbstractJmClient`` subclass
    """
    from copy import deepcopy

    # every self.client setting that is needed
    client_conf = self.client
    postman_conf: dict = deepcopy(client_conf.postman.src_dict)  # postman dsl
    meta_data: dict = postman_conf['meta_data']
    retry_times: int = client_conf.retry_times
    if cache is None:
        cache = client_conf.cache
    impl = impl or client_conf.impl

    if isinstance(impl, type):
        # eg: impl = JmHtmlClient
        # noinspection PyUnresolvedReferences
        impl = impl.client_key

    def resolve_domains(candidates):
        if candidates is None:
            candidates = client_conf.domain

        if not isinstance(candidates, (list, str)):
            # dict keyed by client key
            candidates = candidates.get(impl, [])

        if isinstance(candidates, str):
            # multi-lines text
            candidates = str_to_list(candidates)

        # nothing configured -> defaults for this client type
        if len(candidates) == 0:
            candidates = self.decide_client_domain(impl)

        return candidates

    # kwargs overwrite meta_data
    if kwargs:
        meta_data.update(kwargs)

    postman = Postmans.create(data=postman_conf)

    clazz = JmModuleConfig.client_impl_class(impl)
    if clazz == AbstractJmClient or not issubclass(clazz, AbstractJmClient):
        raise NotImplementedError(clazz)

    client: AbstractJmClient = clazz(
        postman=postman,
        domain_list=resolve_domains(domain_list),
        retry_times=retry_times,
    )

    CacheRegistry.enable_client_cache_on_condition(self, client, cache)

    # noinspection PyTypeChecker
    return client
def update_cookies(self, cookies: dict):
    """
    Merge ``cookies`` into the postman meta data of the client config.

    If cookies are already stored they are updated in place; otherwise the
    given dict itself becomes the stored value.
    """
    metadata: dict = self.client.postman.meta_data.src_dict
    stored = metadata.get('cookies', None)

    if stored is not None:
        stored.update(cookies)
        cookies = stored

    metadata['cookies'] = cookies
# noinspection PyMethodMayBeStatic
def decide_client_domain(self, client_key: str) -> List[str]:
    """
    Pick the default domain list for a client key that has no configured domain.

    An unrecognised client type is reported through ``ExceptionTool.raises``.
    """
    # mobile (api) client
    if self.client_key_is_given_type(client_key, JmApiClient):
        return JmModuleConfig.DOMAIN_API_LIST

    # web (html) client
    if self.client_key_is_given_type(client_key, JmHtmlClient):
        configured = JmModuleConfig.DOMAIN_HTML_LIST
        if configured is None:
            return [JmModuleConfig.get_html_domain()]
        return configured

    ExceptionTool.raises(f'没有配置域名且是无法识别的client类型: {client_key}')
@classmethod
def client_key_is_given_type(cls, client_key, ctype: Type[JmcomicClient]):
    """
    Tell whether ``client_key`` denotes ``ctype``: either the key equals
    ``ctype.client_key`` or the class registered for the key subclasses ``ctype``.
    """
    if client_key == ctype.client_key:
        return True

    registered = JmModuleConfig.client_impl_class(client_key)
    return issubclass(registered, ctype)
@classmethod
def merge_default_dict(cls, user_dict, default_dict=None):
"""
深度合并两个字典
"""
if default_dict is None:
default_dict = cls.default_dict()
for key, value in user_dict.items():
if isinstance(value, dict) and isinstance(default_dict.get(key), dict):
default_dict[key] = cls.merge_default_dict(value, default_dict[key])
else:
default_dict[key] = value
return default_dict
# The methods below offer an object-oriented way of calling the api functions
def download_album(self,
                   album_id,
                   downloader=None,
                   callback=None,
                   ):
    """Call ``api.download_album`` with this option; the result is not returned."""
    from .api import download_album as api_download_album
    api_download_album(album_id, self, downloader, callback)
def download_photo(self,
                   photo_id,
                   downloader=None,
                   callback=None
                   ):
    """Call ``api.download_photo`` with this option; the result is not returned."""
    from .api import download_photo as api_download_photo
    api_download_photo(photo_id, self, downloader, callback)
# The methods below support invoking plugins
def call_all_plugin(self, group: str, safe=True, **extra):
    """
    Invoke every plugin configured under ``group``.

    :param group: plugin group name, e.g. 'after_init'
    :param safe: when exactly True, an exception from a plugin is printed
        and the remaining plugins still run; otherwise it is re-raised
    :param extra: extra keyword arguments merged over each plugin's kwargs
    """
    plugin_list: List[dict] = self.plugins.get(group, [])
    if plugin_list is None or len(plugin_list) == 0:
        return

    # make sure jm_plugin.py is loaded
    from .jm_plugin import JmOptionPlugin

    registry = JmModuleConfig.REGISTRY_PLUGIN
    for pinfo in plugin_list:
        key = pinfo['plugin']
        kwargs = pinfo.get('kwargs', None)  # may be None

        pclass: Optional[Type[JmOptionPlugin]] = registry.get(key, None)
        ExceptionTool.require_true(pclass is not None, f'[{group}] 未注册的plugin: {key}')

        try:
            self.invoke_plugin(pclass, kwargs, extra, pinfo)
        except BaseException:
            if safe is not True:
                raise
            traceback_print_exec()
def invoke_plugin(self, pclass, kwargs: Optional[Dict], extra: dict, pinfo: dict):
    """
    Build one plugin and invoke it.

    :param pclass: plugin class
    :param kwargs: the plugin's configured kwargs (may be None)
    :param extra: extra data; it overrides entries of ``kwargs``
    :param pinfo: the plugin's config entry ('log', 'valid', ... are read from it)

    Exceptions are routed to the matching ``handle_plugin_*`` method.
    """
    # normalise the kwargs, then let extra override them
    kwargs = self.fix_kwargs(kwargs)
    if extra:
        kwargs.update(extra)

    # make sure jm_plugin.py is loaded
    from .jm_plugin import JmOptionPlugin, PluginValidationException

    plugin: Optional[JmOptionPlugin] = None

    try:
        # build the plugin object
        plugin = pclass.build(self)

        # logging switch
        if pinfo.get('log', True) is not True:
            plugin.log_enable = False

        jm_log('plugin.invoke', f'调用插件: [{pclass.plugin_key}]')

        # run the plugin
        plugin.invoke(**kwargs)

    except PluginValidationException as err:
        # parameter validation error raised by the plugin
        self.handle_plugin_valid_exception(err, pinfo, kwargs, plugin, pclass)

    except JmcomicException as err:
        # module exception, e.g. a failed Client request made by the plugin
        self.handle_plugin_jmcomic_exception(err, pinfo, kwargs, plugin, pclass)

    except BaseException as err:
        # last resort: everything else
        self.handle_plugin_unexpected_error(err, pinfo, kwargs, plugin, pclass)
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def handle_plugin_valid_exception(self, e, pinfo: dict, kwargs: dict, _plugin, _pclass):
    """
    React to a plugin's parameter validation error according to the 'valid'
    mode (from ``pinfo``, else ``self.plugins.valid``): 'ignore', 'log' or
    'raise'. Other modes do nothing here and can be added by overriding.
    """
    from .jm_plugin import PluginValidationException
    e: PluginValidationException

    mode = pinfo.get('valid', self.plugins.valid)

    if mode == 'raise':
        raise e

    if mode == 'log':
        jm_log('plugin.validation',
               f'插件 [{e.plugin.plugin_key}] 参数校验异常:{e.msg}'
               )

    # 'ignore' and unknown modes: nothing to do
    return
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def handle_plugin_unexpected_error(self, e, pinfo: dict, kwargs: dict, _plugin, pclass):
    """Log an exception the plugin did not catch, then re-raise it."""
    jm_log('plugin.error', f'插件 [{pclass.plugin_key}],运行遇到未捕获异常,异常信息: [{e!s}]')
    raise e
# noinspection PyMethodMayBeStatic,PyUnusedLocal
def handle_plugin_jmcomic_exception(self, e, pinfo: dict, kwargs: dict, _plugin, pclass):
    """Log a module exception raised while the plugin ran, then re-raise it."""
    jm_log('plugin.exception', f'插件 [{pclass.plugin_key}] 调用失败,异常信息: [{e!s}]')
    raise e
# noinspection PyMethodMayBeStatic
def fix_kwargs(self, kwargs: Optional[Dict]) -> Dict[str, Any]:
    """
    Normalise plugin kwargs so they can be passed as keyword arguments.

    * None becomes an empty dict; a non-dict is reported via ``ExceptionTool``.
    * str values go through ``JmcomicText.parse_dsl_text``.
    * int / float keys are converted to str (and logged); any other
      non-str key is reported via ``ExceptionTool.raises``.

    :returns: a new dict, the input is not modified
    """
    if kwargs is None:
        kwargs = {}
    else:
        ExceptionTool.require_true(
            isinstance(kwargs, dict),
            f'插件的kwargs参数必须为dict类型而不能是类型: {type(kwargs)}'
        )

    fixed: Dict[str, Any] = {}

    for k, v in kwargs.items():
        if isinstance(v, str):
            v = JmcomicText.parse_dsl_text(v)

        if isinstance(k, str):
            fixed[k] = v
        elif isinstance(k, (int, float)):
            newk = str(k)
            jm_log('plugin.kwargs', f'插件参数类型转换: {k} ({type(k)}) -> {newk} ({type(newk)})')
            fixed[newk] = v
        else:
            ExceptionTool.raises(
                f'插件kwargs参数类型有误'
                f'字段: {k}预期类型为str实际类型为{type(k)}'
            )

    return fixed
def wait_all_plugins_finish(self):
    """Call ``wait_until_finish`` on every plugin in ``self.need_wait_plugins``, in order."""
    from .jm_plugin import JmOptionPlugin

    pending: List[JmOptionPlugin] = self.need_wait_plugins
    for waiting_plugin in pending:
        waiting_plugin.wait_until_finish()

1222
jm/src/jmcomic/jm_plugin.py Executable file

File diff suppressed because it is too large Load Diff

927
jm/src/jmcomic/jm_toolkit.py Executable file
View File

@@ -0,0 +1,927 @@
from PIL import Image
from .jm_exception import *
class JmcomicText:
pattern_jm_domain = compile(r'https://([\w.-]+)')
pattern_jm_pa_id = [
(compile(r'(photos?|albums?)/(\d+)'), 2),
(compile(r'id=(\d+)'), 1),
]
pattern_html_jm_pub_domain = compile(r'[\w-]+\.\w+/?\w+')
pattern_html_photo_photo_id = compile(r'<meta property="og:url" content=".*?/photo/(\d+)/?.*?">')
pattern_html_photo_scramble_id = compile(r'var scramble_id = (\d+);')
pattern_html_photo_name = compile(r'<title>([\s\S]*?)\|.*</title>')
# pattern_html_photo_data_original_list = compile(r'data-original="(.*?)" id="album_photo_.+?"')
pattern_html_photo_data_original_domain = compile(r'src="https://(.*?)/media/albums/blank')
pattern_html_photo_data_original_0 = compile(r'data-original="(.*?)"[^>]*?id="album_photo[^>]*?data-page="0"')
pattern_html_photo_tags = compile(r'<meta name="keywords"[\s\S]*?content="(.*?)"')
pattern_html_photo_series_id = compile(r'var series_id = (\d+);')
pattern_html_photo_sort = compile(r'var sort = (\d+);')
pattern_html_photo_page_arr = compile(r'var page_arr = (.*?);')
pattern_html_album_album_id = compile(r'<span class="number">.*?JM(\d+)</span>')
pattern_html_album_scramble_id = compile(r'var scramble_id = (\d+);')
pattern_html_album_name = compile(r'<h1 class="book-name" id="book-name">([\s\S]*?)</h1>')
pattern_html_album_episode_list = compile(r'data-album="(\d+)"[^>]*>\s*?<li.*?>\s*?第(\d+)[话話]([\s\S]*?)<[\s\S]*?>')
pattern_html_album_page_count = compile(r'<span class="pagecount">.*?:(\d+)</span>')
pattern_html_album_pub_date = compile(r'>上架日期 : (.*?)</span>')
pattern_html_album_update_date = compile(r'>更新日期 : (.*?)</span>')
pattern_html_tag_a = compile(r'<a[^>]*?>\s*(\S*)\s*</a>')
# 作品
pattern_html_album_works = [
compile(r'<span itemprop="author" data-type="works">([\s\S]*?)</span>'),
pattern_html_tag_a,
]
# 登場人物
pattern_html_album_actors = [
compile(r'<span itemprop="author" data-type="actor">([\s\S]*?)</span>'),
pattern_html_tag_a,
]
# 标签
pattern_html_album_tags = [
compile(r'<span itemprop="genre" data-type="tags">([\s\S]*?)</span>'),
pattern_html_tag_a,
]
# 作者
pattern_html_album_authors = [
compile(r'作者: *<span itemprop="author" data-type="author">([\s\S]*?)</span>'),
pattern_html_tag_a,
]
# 點擊喜歡
pattern_html_album_likes = compile(r'<span id="albim_likes_\d+">(.*?)</span>')
# 觀看
pattern_html_album_views = compile(r'<span>(.*?)</span>\n *<span>(次觀看|观看次数|次观看次数)</span>')
# 評論(div)
pattern_html_album_comment_count = compile(r'<div class="badge"[^>]*?id="total_video_comments">(\d+)</div>'), 0
# 提取接口返回值信息
pattern_ajax_favorite_msg = compile(r'</button>(.*?)</div>')
@classmethod
def parse_to_jm_domain(cls, text: str):
    """
    Return the domain part of a URL that starts with ``JmModuleConfig.PROT``;
    any other text is returned unchanged.
    """
    if not text.startswith(JmModuleConfig.PROT):
        return text

    found = cls.pattern_jm_domain.search(text)
    return found[1]
@classmethod
def parse_to_jm_id(cls, text) -> str:
    """
    Extract the jm id from an int, a digit string, a 'JM...' string or a URL.

    Examples: 43210, '43210', 'JM43210', 'https://xxx/photo/412038',
    'https://xxx/album/?id=412038'.
    Unparseable input is reported through ``ExceptionTool``.
    """
    if isinstance(text, int):
        return str(text)

    ExceptionTool.require_true(isinstance(text, str), f"无法解析jm车号, 参数类型为: {type(text)}")

    # 43210
    if text.isdigit():
        return text

    ExceptionTool.require_true(len(text) >= 2, f"无法解析jm车号, 文本太短: {text}")

    # JM12341 / jm12341: everything after the two-letter prefix
    first, second = text[0], text[1]
    if first in ('J', 'j') and second in ('M', 'm'):
        return text[2:]

    # https://xxx/photo/412038
    # https://xxx/album/?id=412038
    for pattern, group_index in cls.pattern_jm_pa_id:
        found = pattern.search(text)
        if found is not None:
            return found[group_index]

    ExceptionTool.raises(f"无法解析jm车号, 文本为: {text}")
@classmethod
def analyse_jm_pub_html(cls, html: str, domain_keyword=('jm', 'comic')) -> List[str]:
domain_ls = cls.pattern_html_jm_pub_domain.findall(html)
return list(filter(
lambda domain: any(kw in domain for kw in domain_keyword),
domain_ls
))
@classmethod
def analyse_jm_photo_html(cls, html: str) -> JmPhotoDetail:
    """Build a photo entity from the photo page html using the ``pattern_html_photo_*`` patterns."""
    photo_class = JmModuleConfig.photo_class()
    return cls.reflect_new_instance(html, "pattern_html_photo_", photo_class)
@classmethod
def analyse_jm_album_html(cls, html: str) -> JmAlbumDetail:
    """Build an album entity from the album page html using the ``pattern_html_album_*`` patterns."""
    album_class = JmModuleConfig.album_class()
    return cls.reflect_new_instance(html, "pattern_html_album_", album_class)
@classmethod
def reflect_new_instance(cls, html: str, cls_field_prefix: str, clazz: type):
def match_field(field_name: str, pattern: Union[Pattern, List[Pattern]], text):
if isinstance(pattern, list):
# 如果是 pattern 是 List[re.Pattern]
# 取最后一个 pattern 用于 match field
# 其他的 pattern 用来给文本缩小范围(相当于多次正则匹配)
last_pattern = pattern[len(pattern) - 1]
# 缩小文本
for i in range(0, len(pattern) - 1):
match: Match = pattern[i].search(text)
if match is None:
return None
text = match[0]
return last_pattern.findall(text)
if field_name.endswith("_list"):
return pattern.findall(text)
else:
match = pattern.search(text)
if match is not None:
return match[1]
return None
field_dict = {}
pattern_name: str
for pattern_name, pattern in cls.__dict__.items():
if not pattern_name.startswith(cls_field_prefix):
continue
# 支持如果不匹配,使用默认值
if isinstance(pattern, tuple):
pattern, default = pattern
else:
default = None
# 获取字段名和值
field_name = pattern_name[pattern_name.index(cls_field_prefix) + len(cls_field_prefix):]
field_value = match_field(field_name, pattern, html)
if field_value is None:
if default is None:
ExceptionTool.raises_regex(
f"文本没有匹配上字段:字段名为'{field_name}'pattern: [{pattern}]"
+ (f"\n响应文本=[{html}]" if len(html) < 200 else
f'响应文本过长(len={len(html)}),不打印'
),
html=html,
pattern=pattern,
)
else:
field_value = default
# 保存字段
field_dict[field_name] = field_value
return clazz(**field_dict)
@classmethod
def format_url(cls, path, domain):
    """
    Join ``domain`` and ``path`` into a URL, prepending
    ``JmModuleConfig.PROT`` unless the domain already starts with it.
    """
    ExceptionTool.require_true(isinstance(domain, str) and len(domain) != 0, '域名为空')

    prefix = '' if domain.startswith(JmModuleConfig.PROT) else JmModuleConfig.PROT
    return f'{prefix}{domain}{path}'
@classmethod
def format_album_url(cls, aid, domain='18comic.vip'):
    """
    Turn an album id into a visitable URL, handy for printing and then
    opening in a browser.
    """
    album_path = f'/album/{aid}/'
    return cls.format_url(album_path, domain)
class DSLReplacer:
    """Applies registered (regex, replacer) pairs to a text, in registration order."""

    def __init__(self):
        # compiled pattern -> function producing the replacement for a match
        self.dsl_dict: Dict[Pattern, Callable[[Match], str]] = {}

    def parse_dsl_text(self, text) -> str:
        """Return ``text`` after substituting every registered pattern."""
        result = text
        for dsl_pattern, make_replacement in self.dsl_dict.items():
            result = dsl_pattern.sub(make_replacement, result)
        return result

    def add_dsl_and_replacer(self, dsl: str, replacer: Callable[[Match], str]):
        """Register the regex ``dsl`` together with its ``replacer``."""
        self.dsl_dict[compile(dsl)] = replacer
@classmethod
def match_os_env(cls, match: Match) -> str:
    """
    Replacer for the environment-variable dsl: return the value of the
    variable named by group 1; an unset variable is reported through
    ``ExceptionTool.require_true``.
    """
    env_name = match[1]
    env_value = os.getenv(env_name, None)
    ExceptionTool.require_true(env_value is not None, f'未配置环境变量: {env_name}')
    return env_value
dsl_replacer = DSLReplacer()
@classmethod
def parse_to_abspath(cls, dsl_text: str) -> str:
    """Resolve the dsl in ``dsl_text`` and return the result as an absolute path."""
    resolved = cls.parse_dsl_text(dsl_text)
    return os.path.abspath(resolved)
@classmethod
def parse_dsl_text(cls, dsl_text: str) -> str:
    """Run ``dsl_text`` through the class-level ``dsl_replacer``."""
    replacer = cls.dsl_replacer
    return replacer.parse_dsl_text(dsl_text)
# Opening bracket -> matching closing bracket, used by tokenize() and
# parse_orig_album_name(). Covers the ASCII pairs and the full-width CJK
# pairs; every key must be exactly one character, because lookups are done
# with single characters of the title.
bracket_map = {'(': ')',
               '[': ']',
               '【': '】',
               '（': '）',
               }
@classmethod
def parse_orig_album_name(cls, name: str, default=None):
word_list = cls.tokenize(name)
for word in word_list:
if word[0] in cls.bracket_map:
continue
return word
return default
@classmethod
def tokenize(cls, title: str) -> List[str]:
"""
繞道#2 [暴碧漢化組] [えーすけ123] よりみち#2 (COMIC 快樂天 2024年1月號) [中國翻譯] [DL版]
:return: ['繞道#2', '[暴碧漢化組]', '[えーすけ123]', 'よりみち#2', '(COMIC 快樂天 2024年1月號)', '[中國翻譯]', '[DL版]']
"""
title = title.strip()
ret = []
bracket_map = cls.bracket_map
char_list = []
i = 0
length = len(title)
def add(w=None):
if w is None:
w = ''.join(char_list).strip()
if w == '':
return
ret.append(w)
char_list.clear()
def find_right_pair(left_pair, i):
stack = [left_pair]
j = i + 1
while j < length and len(stack) != 0:
c = title[j]
if c in bracket_map:
stack.append(c)
elif c == bracket_map[stack[-1]]:
stack.pop()
j += 1
if len(stack) == 0:
return j
else:
return -1
while i < length:
c = title[i]
if c in bracket_map:
# 上一个单词结束
add()
# 定位右括号
j = find_right_pair(c, i)
if j == -1:
# 括号未闭合
char_list.append(c)
i += 1
continue
# 整个括号的单词结束
add(title[i:j])
# 移动指针
i = j
else:
char_list.append(c)
i += 1
add()
return ret
@classmethod
def to_zh_cn(cls, s):
    """Convert ``s`` with ``zhconv.convert(s, 'zh-cn')``; zhconv is imported on first use."""
    from zhconv import convert
    return convert(s, 'zh-cn')
@classmethod
def try_mkdir(cls, save_dir: str):
    """
    Create ``save_dir`` if it does not exist and return the path that was
    actually created.

    When the OS rejects the path as too long, the path is cut down to
    ``JmModuleConfig.VAR_FILE_NAME_LENGTH_LIMIT`` characters and creation is
    retried with the shortened path.

    :param save_dir: directory path to create
    :returns: ``save_dir``, or its shortened form when shortening was needed
    :raises OSError: for any other failure, or when the path is too long but
        shortening would not change it
    """
    import errno

    try:
        mkdir_if_not_exists(save_dir)
    except OSError as e:
        # 36 is ENAMETOOLONG on Linux; the number differs on other platforms
        # (e.g. 63 on macOS), so the symbolic constant is checked as well.
        if e.errno not in (36, errno.ENAMETOOLONG):
            raise

        # name too long
        limit = JmModuleConfig.VAR_FILE_NAME_LENGTH_LIMIT
        shortened = save_dir[0:limit]
        if shortened == save_dir:
            # Shortening is a no-op: retrying would recurse forever.
            raise

        jm_log('error', f'目录名过长,无法创建目录,强制缩短到{limit}个字符并重试')
        return cls.try_mkdir(shortened)

    return save_dir
# Supported dsl: ${???} -> os.getenv(???)
JmcomicText.dsl_replacer.add_dsl_and_replacer(r'\$\{(.*?)\}', JmcomicText.match_os_env)
class PatternTool:
    """Small helpers around ``pattern.search`` with uniform failure reporting."""

    @classmethod
    def match_or_default(cls, html: str, pattern: Pattern, default):
        """Return group 1 of the first match, or ``default`` when nothing matches."""
        found = pattern.search(html)
        if found is None:
            return default
        return found[1]

    @classmethod
    def require_match(cls, html: str, pattern: Pattern, msg, rindex=1):
        """
        Return group ``rindex`` of the first match (the match object itself
        when ``rindex`` is None); a missing match is reported through
        ``ExceptionTool.raises_regex`` with ``msg``.
        """
        found = pattern.search(html)

        if found is None:
            ExceptionTool.raises_regex(
                msg,
                html=html,
                pattern=pattern,
            )
            return

        if rindex is None:
            return found
        return found[rindex]

    @classmethod
    def require_not_match(cls, html: str, pattern: Pattern, *, msg_func):
        """
        Do nothing when the pattern does not match; otherwise report through
        ``ExceptionTool.raises_regex`` with the message ``msg_func(match)``.
        """
        found = pattern.search(html)

        if found is not None:
            ExceptionTool.raises_regex(
                msg_func(found),
                html=html,
                pattern=pattern,
            )
class JmPageTool:
# 用来缩减html的长度
pattern_html_search_shorten_for = compile(r'<div class="well well-sm">([\s\S]*)<div class="row">')
# 用来提取搜索页面的album的信息
pattern_html_search_album_info_list = compile(
r'<a href="/album/(\d+)/[\s\S]*?title="(.*?)"([\s\S]*?)<div class="title-truncate tags .*>([\s\S]*?)</div>'
)
# 用来提取分类页面的album的信息
pattern_html_category_album_info_list = compile(
r'<a href="/album/(\d+)/[^>]*>[^>]*?'
r'title="(.*?)"[^>]*>[ \n]*</a>[ \n]*'
r'<div class="label-loveicon">([\s\S]*?)'
r'<div class="clearfix">'
)
# 用来查找tag列表
pattern_html_search_tags = compile(r'<a[^>]*?>(.*?)</a>')
# 查找错误,例如 [错误,關鍵字過短,請至少輸入兩個字以上。]
pattern_html_search_error = compile(r'<fieldset>\n<legend>(.*?)</legend>\n<div class=.*?>\n(.*?)\n</div>\n</fieldset>')
pattern_html_search_total = compile(r'class="text-white">(\d+)</span> A漫.'), 0
# 收藏页面的本子结果
pattern_html_favorite_content = compile(
r'<div id="favorites_album_[^>]*?>[\s\S]*?'
r'<a href="/album/(\d+)/[^"]*">[\s\S]*?'
r'<div class="video-title title-truncate">([^<]*?)'
r'</div>'
)
# 收藏夹的收藏总数
pattern_html_favorite_total = compile(r' : (\d+)[^/]*/\D*(\d+)')
# 所有的收藏夹
pattern_html_favorite_folder_list = [
compile(r'<select class="user-select" name="movefolder-fid">([\s\S]*)</select>'),
compile(r'<option value="(\d+)">([^<]*?)</option>')
]
@classmethod
def parse_html_to_search_page(cls, html: str) -> JmSearchPage:
    """
    Parse the html of a search result page.

    The page's error box (if present) is reported first; then the html is
    narrowed to the result area and the albums are extracted from it.
    """
    # 1. fail if the page carries an error message
    PatternTool.require_not_match(
        html,
        cls.pattern_html_search_error,
        msg_func=lambda match: '{}: {}'.format(match[1], match[2])
    )

    # 2. narrow the text
    html = PatternTool.require_match(
        html,
        cls.pattern_html_search_shorten_for,
        msg='未匹配到搜索结果',
    )

    # 3. extract the results
    total = int(PatternTool.match_or_default(html, *cls.pattern_html_search_total))  # total result count

    # The third group could yield label-category / label-sub; it is not
    # parsed because it is of no use here.
    # 'name' (rather than 'title') keeps the item shape of the api parser;
    # "content" is named after the api search response.
    content = [
        (album_id, {
            'name': title,
            'tags': cls.pattern_html_search_tags.findall(tag_text)
        })
        for album_id, title, _label_category_text, tag_text
        in cls.pattern_html_search_album_info_list.findall(html)
    ]

    return JmSearchPage(content, total)
@classmethod
def parse_html_to_category_page(cls, html: str) -> JmSearchPage:
    """Parse the html of a category page into a ``JmSearchPage``."""
    total = int(PatternTool.match_or_default(html, *cls.pattern_html_search_total))

    # 'name' (rather than 'title') keeps the item shape of the api parser
    content = [
        (album_id, {
            'name': title,
            'tags': cls.pattern_html_search_tags.findall(tag_text)
        })
        for album_id, title, tag_text
        in cls.pattern_html_category_album_info_list.findall(html)
    ]

    return JmSearchPage(content, total)
@classmethod
def parse_html_to_favorite_page(cls, html: str) -> JmFavoritePage:
    """
    Parse the html of a favorites page: total count, the albums on the page
    and the list of favorite folders.
    """
    total_text = PatternTool.require_match(
        html,
        cls.pattern_html_favorite_total,
        '未匹配到收藏夹的本子总数',
    )
    total = int(total_text)

    # albums in this folder page
    content = [
        (aid, {'name': atitle})
        for aid, atitle in cls.pattern_html_favorite_content.findall(html)
    ]

    # folder list: first narrow to the <select>, then read its options
    select_pattern, option_pattern = cls.pattern_html_favorite_folder_list
    select_text = PatternTool.require_match(html, select_pattern, '未匹配到收藏夹列表')
    folder_list = [
        {'name': fname, 'FID': fid}
        for fid, fname in option_pattern.findall(select_text)
    ]

    return JmFavoritePage(content, folder_list, total)
@classmethod
def parse_api_to_search_page(cls, data: AdvancedDict) -> JmSearchPage:
    """
    Convert the api search response into a ``JmSearchPage``.

    model_data: {
      "search_query": "MANA",
      "total": "177",
      "content": [
        {
          "id": "441923",
          "author": "MANA",
          "description": "",
          "name": "[MANA] 神里绫华5",
          "image": "",
          "category": {"id": "1", "title": "同人"},
          "category_sub": {"id": "1", "title": "同人"}
        }
      ]
    }
    """
    raw_total = data.total or 0  # 2024.1.5: data.total may be None
    total: int = int(raw_total)
    return JmSearchPage(cls.adapt_content(data.content), total)
@classmethod
def parse_api_to_favorite_page(cls, data: AdvancedDict) -> JmFavoritePage:
    """
    Convert the api favorites response into a ``JmFavoritePage``.

    {
      "list": [
        {
          "id": "363859",
          "author": "紺菓",
          "description": "",
          "name": "[無邪氣漢化組] (C99) [紺色果實 (紺菓)] サレンの樂しい夢 (プリンセスコネクト!Re:Dive) [中國翻譯]",
          "latest_ep": null,
          "latest_ep_aid": null,
          "image": "",
          "category": {"id": "1", "title": "同人"},
          "category_sub": {"id": "1", "title": "同人"}
        }
      ],
      "folder_list": [
        {
          "0": "123", "FID": "123",
          "1": "456", "UID": "456",
          "2": "收藏夹名", "name": "收藏夹名"
        }
      ],
      "total": "87",
      "count": 20
    }
    """
    total: int = int(data.total)
    # data.count is available as well but is not used
    albums = cls.adapt_content(data.list)
    folders = data.get('folder_list', [])
    return JmFavoritePage(albums, folders, total)
@classmethod
def adapt_content(cls, content):
    """
    Turn api items into ``(id, item_dict)`` pairs; each item dict is the
    item's ``src_dict`` with a 'tags' key defaulted to an empty list.
    """

    def as_plain_dict(item: AdvancedDict):
        plain: dict = item.src_dict
        plain.setdefault('tags', [])
        return plain

    return [(item.id, as_plain_dict(item)) for item in content]
class JmApiAdaptTool:
"""
本类负责把移动端的api返回值适配为标准的实体类
# album
{
"id": 123,
"name": "[狗野叉漢化]",
"author": [
"AREA188"
],
"images": [
"00004.webp"
],
"description": null,
"total_views": "41314",
"likes": "918",
"series": [],
"series_id": "0",
"comment_total": "5",
"tags": [
"全彩",
"中文"
],
"works": [],
"actors": [],
"related_list": [
{
"id": "333718",
"author": "been",
"description": "",
"name": "[been]The illusion of lies1[中國語][無修正][全彩]",
"image": ""
}
],
"liked": false,
"is_favorite": false
}
# photo
{
"id": 413446,
"series": [
{
"id": "487043",
"name": "第48話",
"sort": "48"
}
],
"tags": "慾望 調教 NTL 地鐵 戲劇",
"name": "癡漢成癮-第2話",
"images": [
"00047.webp"
],
"series_id": "400222",
"is_favorite": false,
"liked": false
}
"""
field_adapter = {
JmAlbumDetail: [
'likes',
'tags',
'works',
'actors',
'related_list',
'name',
('id', 'album_id'),
('author', 'authors'),
('total_views', 'views'),
('comment_total', 'comment_count'),
],
JmPhotoDetail: [
'name',
'series_id',
'tags',
('id', 'photo_id'),
('images', 'page_arr'),
]
}
@classmethod
def parse_entity(cls, data: dict, clazz: type):
    """
    Build an entity of ``clazz`` from an api response dict.

    The adapter of ``clazz`` lists the fields to copy: a plain name is
    copied as-is, a ``(source, target)`` tuple is copied under the new
    name. Album / photo specific fields are added afterwards.
    """
    fields = {}

    for entry in cls.get_adapter(clazz):
        if isinstance(entry, str):
            source_key = target_key = entry
        elif isinstance(entry, tuple):
            source_key, target_key = entry
        else:
            continue
        fields[target_key] = data[source_key]

    post_adapt = cls.post_adapt_album if issubclass(clazz, JmAlbumDetail) else cls.post_adapt_photo
    post_adapt(data, clazz, fields)

    return clazz(**fields)
@classmethod
def get_adapter(cls, clazz: type):
    """
    Return the field adapter of the first registered entity class that
    ``clazz`` is a subclass of; an unsupported class is reported through
    ``ExceptionTool.raises``.
    """
    for entity_class, adapter in cls.field_adapter.items():
        if not issubclass(clazz, entity_class):
            continue
        return adapter

    ExceptionTool.raises(f'不支持的类型: {clazz}')
@classmethod
def post_adapt_album(cls, data: dict, _clazz: type, fields: dict):
    """
    Add the album-only fields to ``fields``: 'episode_list' built from
    ``data['series']`` as (id, sort, name) tuples, and the placeholder '0'
    for the fields listed below.
    """
    chapters = [AdvancedDict(chapter) for chapter in data['series']]

    # photo_id, photo_index, photo_title
    fields['episode_list'] = [
        (chapter.id, chapter.sort, chapter.name)
        for chapter in chapters
    ]

    placeholder_fields = ('scramble_id', 'page_count', 'pub_date', 'update_date')
    fields.update(dict.fromkeys(placeholder_fields, '0'))
@classmethod
def post_adapt_photo(cls, data: dict, _clazz: type, fields: dict):
    """
    Add the photo-only fields to ``fields``: 'sort' (taken from the entry of
    ``data['series']`` whose id equals the photo id, default 1) and a
    randomly chosen 'data_original_domain'.
    """
    import random

    # 1. the sort field; sort values in data['series'] start at 1
    series: list = data['series']
    matching_sorts = (
        chapter.sort
        for chapter in map(AdvancedDict, series)
        if int(chapter.id) == int(data['id'])
    )
    fields['sort'] = next(matching_sorts, 1)

    # 2. the image domain
    fields['data_original_domain'] = random.choice(JmModuleConfig.DOMAIN_IMAGE_LIST)
class JmImageTool:
@classmethod
def save_resp_img(cls, resp: Any, filepath: str, need_convert=True):
    """
    Save the image carried by an HTTP response to a file.

    To change the image's file format (e.g. .jpg -> .png) pass
    need_convert=True. When the format stays the same, need_convert=False
    skips parsing the image with PIL.

    :param resp: JmImageResp
    :param filepath: image file path
    :param need_convert: whether the image goes through PIL before saving
    """
    if need_convert is False:
        cls.save_directly(resp, filepath)
        return

    image = cls.open_image(resp.content)
    cls.save_image(image, filepath)
@classmethod
def save_image(cls, image: Image, filepath: str):
    """
    Save an image.

    :param image: PIL.Image object
    :param filepath: path of the file to write
    """
    target_path = filepath
    image.save(target_path)
@classmethod
def save_directly(cls, resp, filepath):
    """Write the response content to ``filepath`` via ``common.save_resp_content``."""
    import common
    common.save_resp_content(resp, filepath)
@classmethod
def decode_and_save(cls,
                    num: int,
                    img_src: Image,
                    decoded_save_path: str
                    ) -> None:
    """
    Decode (unscramble) an image and save it.

    The source is cut into ``num`` horizontal strips which are pasted into
    a new image in reverse vertical order; the ``h % num`` remainder rows
    belong to the strip that ends up at the top.

    :param num: number of strips, as computed by ``cls.get_num``; 0 means
        the image is saved as it is
    :param img_src: the original image
    :param decoded_save_path: where the decoded image is saved
    """
    # nothing to decode, save directly
    if num == 0:
        cls.save_image(img_src, decoded_save_path)
        return

    w, h = img_src.size
    strip_height, over = divmod(h, num)

    # the new, decoded image
    img_decode = Image.new("RGB", (w, h))

    for i in range(num):
        y_src = h - strip_height * (i + 1) - over

        if i == 0:
            # the first strip carries the remainder rows
            height, y_dst = strip_height + over, 0
        else:
            height, y_dst = strip_height, strip_height * i + over

        strip = img_src.crop((0, y_src, w, y_src + height))
        img_decode.paste(strip, (0, y_dst, w, y_dst + height))

    # save to the new, decoded file
    cls.save_image(img_decode, decoded_save_path)
@classmethod
def open_image(cls, fp: Union[str, bytes]):
    """Open an image with PIL from a path (str) or from raw data (wrapped in BytesIO)."""
    from io import BytesIO

    if not isinstance(fp, str):
        fp = BytesIO(fp)
    return Image.open(fp)
@classmethod
def get_num(cls, scramble_id, aid, filename: str) -> int:
"""
获得图片分割数
"""
scramble_id = int(scramble_id)
aid = int(aid)
if aid < scramble_id:
return 0
elif aid < JmMagicConstants.SCRAMBLE_268850:
return 10
else:
import hashlib
x = 10 if aid < JmMagicConstants.SCRAMBLE_421926 else 8
s = f"{aid}{filename}" # 拼接
s = s.encode()
s = hashlib.md5(s).hexdigest()
num = ord(s[-1])
num %= x
num = num * 2 + 2
return num
@classmethod
def get_num_by_url(cls, scramble_id, url) -> int:
"""
获得图片分割数
"""
return cls.get_num(
scramble_id,
aid=JmcomicText.parse_to_jm_id(url),
filename=of_file_name(url, True),
)
@classmethod
def get_num_by_detail(cls, detail: JmImageDetail) -> int:
"""
获得图片分割数
"""
return cls.get_num(detail.scramble_id, detail.aid, detail.img_file_name)
class JmCryptoTool:
    """
    Encryption / decryption logic for the JM API.
    """

    @classmethod
    def token_and_tokenparam(cls,
                             ts,
                             ver=None,
                             secret=None,
                             ):
        """
        Compute the ``token`` and ``tokenparam`` request-header values.

        :param ts: timestamp
        :param ver: app version; JmMagicConstants.APP_VERSION when None
        :param secret: secret; JmMagicConstants.APP_TOKEN_SECRET when None
        :return: (token, tokenparam)
        """
        app_ver = JmMagicConstants.APP_VERSION if ver is None else ver
        token_secret = JmMagicConstants.APP_TOKEN_SECRET if secret is None else secret

        # tokenparam looks like: 1700566805,1.6.3
        header_tokenparam = f'{ts},{app_ver}'

        # token looks like: 81498a20feea7fbb7149c637e49702e3
        header_token = cls.md5hex(f'{ts}{token_secret}')

        return header_token, header_tokenparam

    @classmethod
    def decode_resp_data(cls,
                         data: str,
                         ts,
                         secret=None,
                         ) -> str:
        """
        Decrypt the ``data`` field of an API response.

        :param data: resp.json()['data'] (base64 text)
        :param ts: timestamp that was used for the request
        :param secret: secret; JmMagicConstants.APP_DATA_SECRET when None
        :return: the decrypted payload as a JSON-formatted string
        """
        data_secret = JmMagicConstants.APP_DATA_SECRET if secret is None else secret

        # Step 1: undo the base64 transport encoding.
        import base64
        cipher_bytes = base64.b64decode(data)

        # Step 2: AES-ECB decrypt; the key is the md5 hex digest of ts + secret.
        aes_key = cls.md5hex(f'{ts}{data_secret}').encode('utf-8')
        from Crypto.Cipher import AES
        plain_bytes = AES.new(aes_key, AES.MODE_ECB).decrypt(cipher_bytes)

        # Step 3: the last byte holds the padding length; drop that many bytes.
        pad_len = plain_bytes[-1]
        payload = plain_bytes[:-pad_len]

        # Step 4: decode to text (json).
        return payload.decode('utf-8')

    @classmethod
    def md5hex(cls, key: str):
        """
        Return the hexadecimal md5 digest of ``key`` (UTF-8 encoded).

        ``key`` is checked to be a ``str`` via ``ExceptionTool.require_true``.
        """
        ExceptionTool.require_true(isinstance(key, str), 'key参数需为字符串')

        import hashlib
        return hashlib.md5(key.encode("utf-8")).hexdigest()

8
jm/src/pixiv/__init__.py Executable file
View File

@@ -0,0 +1,8 @@
from pixivpy3 import AppPixivAPI
# Module-level Pixiv client; everything below runs when this package is imported.
# NOTE(review): no api.auth(...) call is visible before the request below —
# confirm whether authentication is expected to happen elsewhere.
api = AppPixivAPI()
# Fetch recommended illustrations
json_result = api.illust_recommended()
print(json_result)
# Take the first recommended illustration.
# NOTE(review): presumably `illusts` is non-empty on success; an empty or
# error response would fail here — verify.
illust = json_result.illusts[0]
print(f">>> {illust.title}, origin url: {illust.image_urls.large}")