From 94545afb28300c3e14446af666d2734f72766323 Mon Sep 17 00:00:00 2001 From: Michael Chen Date: Wed, 23 Nov 2022 13:00:38 +0100 Subject: [PATCH] Added typing for Dataset Added security rules on model pages Hide toc for long security rules Use datatable for dataset table --- createreadmes.py | 252 ++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 241 insertions(+), 11 deletions(-) diff --git a/createreadmes.py b/createreadmes.py index 58880f0..04fddb2 100644 --- a/createreadmes.py +++ b/createreadmes.py @@ -3,7 +3,7 @@ import json import itertools import yaml import jsonschema -from typing import Any, TypedDict +from typing import Any, List, NotRequired, Optional, TypedDict import requests try: from yachalk import chalk @@ -23,11 +23,168 @@ def error(msg: str) -> Exception: def warning(msg: str): print(chalk.yellow(msg) if yachalk_imported else "Warning: {}".format(msg)) -def open_dataset() -> dict[str, Any]: +class License(TypedDict): + key: str + name: str + spdx_id: str + url: str + node_id: str + + +class Permissions(TypedDict): + admin: bool + maintain: bool + push: bool + triage: bool + pull: bool + + +class Owner(TypedDict): + login: str + id: int + node_id: str + avatar_url: str + gravatar_id: str + url: str + html_url: str + followers_url: str + following_url: str + gists_url: str + starred_url: str + subscriptions_url: str + organizations_url: str + repos_url: str + events_url: str + received_events_url: str + type: str + site_admin: bool + name: NotRequired[str] + company: NotRequired[Optional[str]] + blog: NotRequired[str] + location: NotRequired[Optional[str]] + email: NotRequired[Optional[str]] + hireable: NotRequired[Optional[bool]] + bio: NotRequired[Optional[str]] + twitter_username: NotRequired[Optional[str]] + public_repos: NotRequired[int] + public_gists: NotRequired[int] + followers: NotRequired[int] + following: NotRequired[int] + created_at: NotRequired[str] + updated_at: NotRequired[str] + + +class GithubRepositoryInformation(TypedDict): + id: int + node_id: str + name: str + full_name: str + private: bool + owner: Owner + html_url: str + description: Optional[str] + fork: bool + url: str + forks_url: str + keys_url: str + collaborators_url: str + teams_url: str + hooks_url: str + issue_events_url: str + events_url: str + assignees_url: str + branches_url: str + tags_url: str + blobs_url: str + git_tags_url: str + git_refs_url: str + trees_url: str + statuses_url: str + languages_url: str + stargazers_url: str + contributors_url: str + subscribers_url: str + subscription_url: str + commits_url: str + git_commits_url: str + comments_url: str + issue_comment_url: str + contents_url: str + compare_url: str + merges_url: str + archive_url: str + downloads_url: str + issues_url: str + pulls_url: str + milestones_url: str + notifications_url: str + labels_url: str + releases_url: str + deployments_url: str + created_at: str + updated_at: str + pushed_at: str + git_url: str + ssh_url: str + clone_url: str + svn_url: str + homepage: Optional[str] + size: int + stargazers_count: int + watchers_count: int + language: str + has_issues: bool + has_projects: bool + has_downloads: bool + has_wiki: bool + has_pages: bool + forks_count: int + mirror_url: None + archived: bool + disabled: bool + open_issues_count: int + license: Optional[License] + allow_forking: bool + is_template: bool + web_commit_signoff_required: bool + topics: List[str] + visibility: str + forks: int + open_issues: int + watchers: int + default_branch: str + permissions: Permissions + temp_clone_token: str + organization: NotRequired[Owner] + network_count: int + subscribers_count: int + + +class ModelInformation(TypedDict): + title: NotRequired[str] + slug: str + branch: NotRequired[str] + data: GithubRepositoryInformation + owner: Owner + stars: int + forks: int + owner_name: str + owner_slug: str + s: int + e: int + i: int + a: int + t: int + l: int + tech: List[str] + +Dataset = dict[str, ModelInformation] + +def open_dataset() -> Dataset: with open(dataset_info, 'r') as f: return json.load(f) -def save_dataset(dataset: dict[str, Any]): +def save_dataset(dataset: Dataset): with open(dataset_info, 'w') as f: json.dump(dataset, f, indent=4) @@ -64,7 +221,7 @@ class Artifact(TypedDict): class SecurityRule(TypedDict): status: str argument: str - artifacts: None | list[Artifact] + artifacts: NotRequired[list[Artifact]] rule_schema = yaml.safe_load("""type: object additionalProperties: no @@ -105,10 +262,9 @@ def check_security_rules(security_rules: dict[Any, Any] | None) -> dict[int, Sec if rule["status"] == "unknown": warning(f"Rule {n} is still unknown!") except jsonschema.ValidationError as e: - error("Security rule {n}: {msg} at $.{n}.{path}".format(n=n, msg=e.message, path=e.json_path)) warning("Not checking further rules!") - break - return security_rules + raise Exception("Security rule {n}: {msg} at $.{n}.{path}".format(n=n, msg=e.message, path=e.json_path)) from e + return dict(sorted(security_rules.items())) update_dataset = False @@ -118,11 +274,81 @@ def get_name(slug: str): def get_tag_slug(tag: str) -> str: return tag.lower().replace(' ', '_') -def write_model_readmes(dataset: dict[str, Any]): +rule_names = { + 1: "API Gateway", + 2: "Mutual Authentication", + 3: "Decoupled Authentication", + 4: "Internal Identity Represenation", + 5: "Authentication Token Validation", + 6: "Login Rate Limiting", + 7: "Edge Encryption", + 8: "Internal Encryption", + 9: "Central Logging Subsystem", + 10: "Local Logging Agent", + 11: "Log Sanitization", + 12: "Log Message Broker", + 13: "Circuit Breaker", + 14: "Load Balancing", + 15: "Service Mesh Usage Limits", + 16: "Service Registry Deployment", + 17: "Service Registry Validation", + 18: "Secret Manager", +} + +def artifact_to_string(info: ModelInformation, artifact: Artifact): + file = Path(artifact['file']) + filename = file.name + file_url = f"https://github.com/{info['slug']}/blob/{info.get('branch', 'master')}/{artifact['file']}" + return f"- {filename}: Line{'s'[:len(artifact['lines'])^1]}: {', '.join(f'[{line}]({file_url}#L{line})' for line in artifact['lines'])}" + + +def rule_to_string(info: ModelInformation, id: int, rule: SecurityRule | None): + if rule is None: + # warning(f"Rule {id} is missing!") # TODO Enable warning + return "" + text = f"""#### Rule {id}: {rule_names[id]} + +This rule is {rule['status']}: {rule['argument']}""" + artifacts = rule.get("artifacts", []) + if len(artifacts) > 0: + text = text + f""" + +Artifacts: +{chr(10).join(artifact_to_string(info, artifact) for artifact in artifacts)}""" + return text + +def write_security_rules(info: ModelInformation, security_rules: dict[int, SecurityRule]): + return f"""## Security Rules + +### Authentication / Authorization + +{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(1, 7))} + +### Encryption + +{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(7, 9))} + +### Logging + +{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(9, 13))} + +### Availability + +{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(13, 16))} + +### Service Registry + +{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(16, 18))} + +### Secret Management + +{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(18, 19))}""" + +def write_model_readmes(dataset: Dataset): for model_id, info in dataset.items(): dir = output_path / 'dataset' readme = dir / f'{model_id}.md' - slug: str = info['slug'] + slug = info['slug'] data = info.get('data') if not data: data = get_repo(slug) @@ -164,6 +390,7 @@ keywords: model TODO tags: [{', '.join(get_tag_slug(tech) for tech in info['tech'])}] sidebar: datasetdoc_sidebar permalink: {model_id}.html +toc: false --- ## Repository Information @@ -204,9 +431,11 @@ Formats: - [PNG Raster Image](../../dataset/{model_id}/{model_id}/{model_id}.png) ![Data Flow Diagram](../../dataset/{model_id}/{model_id}/{model_id}.svg) + +{write_security_rules(info, security_rules)} """) -def write_root_readme(dataset: dict[str, Any]): +def write_root_readme(dataset: Dataset): print(f"Writing main readme file") with open('index.md', 'w', encoding="utf-8") as f: f.write(f"""--- @@ -245,6 +474,7 @@ tags: [] sidebar: datasetdoc_sidebar permalink: dataset.html summary: Dataset of dataflow diagrams of microservice applications. +datatable: true --- # Dataset of Dataflow Diagrams @@ -262,7 +492,7 @@ Name | Source | LoC | Stars | Forks | DFD Items | Technologies
""") -def write_tag_readme(dataset: dict[str, Any]): +def write_tag_readme(dataset: Dataset): tag_dir = output_path / 'tags' known_tech = set(tech for model in dataset.values() for tech in model['tech']) print(f"Writing tag data file")