# ftb-docker/jsongetcache.py
#
# Snapshot metadata from the repository viewer:
# 55 lines, 1.8 KiB, Python, 2022-11-21 16:33:29 +01:00
from datetime import datetime, timedelta
from json import dump, load
from pathlib import Path
from typing import Any, Callable, Generic, TypeVar, TypedDict
from requests import RequestException, get
T = TypeVar('T')
class Cached(TypedDict, Generic[T]):
loaded: int
data: T
def get_resource(url: str, timeout: float | None = 30.0) -> Any:
    """Fetch *url* with an HTTP GET and return the decoded JSON body.

    Args:
        url: Address of the JSON resource.
        timeout: Seconds to wait for the server (connect / between bytes)
            before giving up. ``None`` waits forever, which was the old
            behaviour and can hang the caller on a stalled connection.

    Returns:
        Whatever ``response.json()`` yields (dict, list, str, number, ...).

    Raises:
        requests.RequestException: On connection problems, timeouts, or a
            4xx/5xx status (via ``raise_for_status``).
    """
    # The context manager releases the connection even if parsing fails.
    with get(url, timeout=timeout) as response:
        response.raise_for_status()
        return response.json()
def load_cached(
    url: str,
    cache_file: Path,
    loader: Callable[[str], T] = get_resource,
    max_age: timedelta = timedelta(hours=1),
    reload: bool = False,
    encoding: str = "utf-8",
) -> T:
    """Return the resource at *url*, served from a JSON cache file when fresh.

    The cache file holds ``{"data": <payload>, "loaded": <epoch seconds>}``.
    If it exists, is readable and is no older than *max_age*, its payload is
    returned without calling *loader*. Otherwise *loader* is called, the
    result is written to *cache_file* (creating parent directories) and
    returned.

    Args:
        url: Passed unchanged to *loader*.
        cache_file: Location of the JSON cache file.
        loader: Callable that fetches the payload for a URL.
        max_age: Maximum age at which a cache entry is still served.
        reload: If true, skip the cache lookup and always call *loader*.
        encoding: Text encoding used to read and write the cache file.

    Returns:
        The cached or freshly loaded payload.

    Raises:
        RequestException: If the loaded payload is a dict whose ``"status"``
            is the string ``"error"`` (case-insensitive). The payload's
            ``"message"`` is used as the exception text; nothing is cached.
    """
    if not reload:
        try:
            with cache_file.open("r", encoding=encoding) as f:
                cached = load(f)
            cached_data: T = cached["data"]
            # Compare epoch seconds rather than naive local datetimes, so a
            # DST change between write and read cannot skew the age.
            age_seconds = datetime.now().timestamp() - cached["loaded"]
        except (FileNotFoundError, ValueError, KeyError, TypeError):
            # Missing, undecodable (JSONDecodeError/UnicodeDecodeError are
            # ValueErrors) or mis-shaped cache file: treat it as a cache
            # miss and fall through to a reload that overwrites it.
            pass
        else:
            if age_seconds <= max_age.total_seconds():
                return cached_data
            # Entry is older than max_age: fall through and reload.

    data = loader(url)

    # Some APIs report failures inside a 200 response body. Only dict
    # payloads can carry such a marker; lists, strings etc. are accepted
    # as-is, and a non-string status (e.g. a number) is not an error flag.
    if isinstance(data, dict):
        status = data.get("status")
        if isinstance(status, str) and status.lower() == "error":
            raise RequestException(data.get("message", "No error message!"))

    # Save to cache
    entry: Cached[T] = {
        "data": data,
        "loaded": int(datetime.now().timestamp()),
    }
    cache_file.parent.mkdir(parents=True, exist_ok=True)
    with cache_file.open("w", encoding=encoding) as f:
        dump(entry, f)
    return data