ftb-docker/jsongetcache.py
2022-11-21 16:33:29 +01:00

55 lines
1.8 KiB
Python

from datetime import datetime, timedelta
from json import dump, load
from pathlib import Path
from typing import Any, Callable, Generic, TypeVar, TypedDict
from requests import RequestException, get
T = TypeVar('T')
class Cached(TypedDict, Generic[T]):
loaded: int
data: T
def get_resource(url: str):
with get(url) as request:
request.raise_for_status()
return request.json()
def load_cached(url: str, cache_file: Path, loader: Callable[[str], T] = get_resource, max_age: timedelta = timedelta(hours=1), reload: bool = False, encoding: str = "utf-8") -> T:
if reload:
# print(f"Forced cache reload for {cache_file}!")
pass
else:
try:
with cache_file.open("r", encoding=encoding) as f:
cached = load(f)
data: T = cached["data"]
time = datetime.fromtimestamp(cached["loaded"])
cache_age = datetime.now() - time
if cache_age <= max_age:
# print(f"Loaded cache file with age {cache_age}.")
return data
# print(f"Cache is older than {max_age} and will be reloaded.")
except FileNotFoundError:
# print(f"File {cache_file} not cached, will be reloaded!")
pass
data = loader(url)
data_dict_tmp: Any = data
data_dict: dict[str, Any] = data_dict_tmp
status: None | str = data_dict.get("status", None)
if status is not None and status.lower() == "error":
raise RequestException(data_dict.get("message", "No error message!"))
# Save to cache
cached: Cached[T] = {"data": data, "loaded": int(
datetime.now().timestamp())}
cache_file.parent.mkdir(parents=True, exist_ok=True)
with cache_file.open("w", encoding=encoding) as f:
dump(cached, f)
return data