Fix login
This commit is contained in:
parent
588c9105cc
commit
66a635f917
@ -10,7 +10,12 @@ class Download:
|
|||||||
quality: str
|
quality: str
|
||||||
filename: str
|
filename: str
|
||||||
|
|
||||||
def download(self, cookies: dict[str, str], destination_dir: Path):
|
def download(
|
||||||
|
self,
|
||||||
|
cookies: dict[str, str],
|
||||||
|
destination_dir: Path,
|
||||||
|
filename: str | None = None,
|
||||||
|
):
|
||||||
"""Downloads the episode into the directory @destination_dir. The cookies from @cookies are used
|
"""Downloads the episode into the directory @destination_dir. The cookies from @cookies are used
|
||||||
during the request."""
|
during the request."""
|
||||||
with requests.get(
|
with requests.get(
|
||||||
@ -23,6 +28,7 @@ class Download:
|
|||||||
) as r:
|
) as r:
|
||||||
r.raise_for_status()
|
r.raise_for_status()
|
||||||
|
|
||||||
with open(destination_dir / self.filename, "wb") as f:
|
fname = filename or self.filename
|
||||||
|
with open(destination_dir / fname, "wb") as f:
|
||||||
for chunk in r.iter_content(chunk_size=8196):
|
for chunk in r.iter_content(chunk_size=8196):
|
||||||
f.write(chunk)
|
f.write(chunk)
|
||||||
|
@ -5,53 +5,10 @@ import re
|
|||||||
import requests
|
import requests
|
||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
|
|
||||||
from akibapass_downloader.const import LOGIN_URL
|
from akibapass_downloader.const import LOGIN_URL, BASE_URL
|
||||||
|
|
||||||
|
|
||||||
def login(email: str, password: str) -> dict[str, str] | None:
|
def _check_login(soup: BeautifulSoup) -> bool:
|
||||||
"""Performs a login on the Akibapass site and returns a set of cookies
|
|
||||||
to use while performing other requests."""
|
|
||||||
login_page_req = requests.get(LOGIN_URL)
|
|
||||||
if login_page_req.status_code != 200:
|
|
||||||
return None
|
|
||||||
|
|
||||||
page = BeautifulSoup(login_page_req.text, "html.parser")
|
|
||||||
# Find the CSRF token's value.
|
|
||||||
csrf_token = page.find("meta", attrs={"name": "csrf-token"}).attrs["content"]
|
|
||||||
# Find the CSRD token's name.
|
|
||||||
csrf_param = page.find("meta", attrs={"name": "csrf-param"}).attrs["content"]
|
|
||||||
|
|
||||||
r = requests.post(
|
|
||||||
LOGIN_URL,
|
|
||||||
cookies={
|
|
||||||
# Use the session we received on the login page.
|
|
||||||
"_session": login_page_req.cookies["_session"],
|
|
||||||
},
|
|
||||||
data={
|
|
||||||
"email": email,
|
|
||||||
"password": password,
|
|
||||||
csrf_param: csrf_token,
|
|
||||||
"utf8": "✓",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
if r.status_code != 200:
|
|
||||||
return None
|
|
||||||
|
|
||||||
return {
|
|
||||||
"_session": r.cookies["_session"],
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def is_logged_in(cookies: dict[str, str]) -> bool:
|
|
||||||
"""Checks if the cookies in @cookies are stil valid."""
|
|
||||||
r = requests.get(
|
|
||||||
LOGIN_URL,
|
|
||||||
cookies=cookies,
|
|
||||||
)
|
|
||||||
if r.status_code != 200:
|
|
||||||
return False
|
|
||||||
|
|
||||||
soup = BeautifulSoup(r.text, "html.parser")
|
|
||||||
scripts = soup.find_all("script")
|
scripts = soup.find_all("script")
|
||||||
user_data_matcher = re.compile(r".*window._current_user\s*=\s*(\{.*?\}).*")
|
user_data_matcher = re.compile(r".*window._current_user\s*=\s*(\{.*?\}).*")
|
||||||
for script in scripts:
|
for script in scripts:
|
||||||
@ -77,3 +34,62 @@ def is_logged_in(cookies: dict[str, str]) -> bool:
|
|||||||
return data != {} and data.get("id") is not None
|
return data != {} and data.get("id") is not None
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
|
def login(email: str, password: str) -> dict[str, str] | None:
|
||||||
|
"""Performs a login on the Akibapass site and returns a set of cookies
|
||||||
|
to use while performing other requests."""
|
||||||
|
s = requests.Session()
|
||||||
|
login_page_req = s.get(LOGIN_URL)
|
||||||
|
if login_page_req.status_code != 200:
|
||||||
|
return None
|
||||||
|
|
||||||
|
page = BeautifulSoup(login_page_req.text, "html.parser")
|
||||||
|
# Find the CSRF token's value.
|
||||||
|
csrf_token = page.find("meta", attrs={"name": "csrf-token"}).attrs["content"]
|
||||||
|
# Find the CSRD token's name.
|
||||||
|
csrf_param = page.find("meta", attrs={"name": "csrf-param"}).attrs["content"]
|
||||||
|
|
||||||
|
r = s.post(
|
||||||
|
LOGIN_URL,
|
||||||
|
# cookies={
|
||||||
|
# # Use the session we received on the login page.
|
||||||
|
# "_session": login_page_req.cookies["_session"],
|
||||||
|
# "__cf_bm": login_page_req.cookies["__cf_bm"],
|
||||||
|
# },
|
||||||
|
headers={
|
||||||
|
"Referer": LOGIN_URL,
|
||||||
|
"Content-Type": "application/x-www-form-urlencoded",
|
||||||
|
"Origin": BASE_URL,
|
||||||
|
"DNT": "1",
|
||||||
|
},
|
||||||
|
data={
|
||||||
|
"email": email,
|
||||||
|
"password": password,
|
||||||
|
csrf_param: csrf_token,
|
||||||
|
"utf8": "✓",
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if r.status_code != 200:
|
||||||
|
return None
|
||||||
|
if not _check_login(BeautifulSoup(r.text, "html.parser")):
|
||||||
|
return None
|
||||||
|
|
||||||
|
return {
|
||||||
|
"_session": r.cookies["_session"],
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def is_logged_in(cookies: dict[str, str]) -> dict[str, str]:
|
||||||
|
"""Checks if the cookies in @cookies are stil valid."""
|
||||||
|
r = requests.get(
|
||||||
|
LOGIN_URL,
|
||||||
|
cookies=cookies,
|
||||||
|
headers={
|
||||||
|
"Referer": LOGIN_URL,
|
||||||
|
},
|
||||||
|
)
|
||||||
|
if r.status_code != 200:
|
||||||
|
return False
|
||||||
|
|
||||||
|
return _check_login(BeautifulSoup(r.text, "html.parser"))
|
||||||
|
Loading…
Reference in New Issue
Block a user