from pathlib import Path import json import itertools import yaml import jsonschema from typing import Any, List, NotRequired, Optional, TypedDict import requests try: from yachalk import chalk yachalk_imported = True except ModuleNotFoundError: yachalk_imported = False dataset_path = Path('dataset') output_path = Path('pages') dataset_info = dataset_path / Path('dataset.json') token = "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" def error(msg: str) -> Exception: print(chalk.red(msg) if yachalk_imported else "Error: {}".format(msg)) return Exception(msg) def warning(msg: str): print(chalk.yellow(msg) if yachalk_imported else "Warning: {}".format(msg)) class License(TypedDict): key: str name: str spdx_id: str url: str node_id: str class Permissions(TypedDict): admin: bool maintain: bool push: bool triage: bool pull: bool class Owner(TypedDict): login: str id: int node_id: str avatar_url: str gravatar_id: str url: str html_url: str followers_url: str following_url: str gists_url: str starred_url: str subscriptions_url: str organizations_url: str repos_url: str events_url: str received_events_url: str type: str site_admin: bool name: NotRequired[str] company: NotRequired[Optional[str]] blog: NotRequired[str] location: NotRequired[Optional[str]] email: NotRequired[Optional[str]] hireable: NotRequired[Optional[bool]] bio: NotRequired[Optional[str]] twitter_username: NotRequired[Optional[str]] public_repos: NotRequired[int] public_gists: NotRequired[int] followers: NotRequired[int] following: NotRequired[int] created_at: NotRequired[str] updated_at: NotRequired[str] class GithubRepositoryInformation(TypedDict): id: int node_id: str name: str full_name: str private: bool owner: Owner html_url: str description: Optional[str] fork: bool url: str forks_url: str keys_url: str collaborators_url: str teams_url: str hooks_url: str issue_events_url: str events_url: str assignees_url: str branches_url: str tags_url: str blobs_url: str git_tags_url: str git_refs_url: str trees_url: str statuses_url: str languages_url: str stargazers_url: str contributors_url: str subscribers_url: str subscription_url: str commits_url: str git_commits_url: str comments_url: str issue_comment_url: str contents_url: str compare_url: str merges_url: str archive_url: str downloads_url: str issues_url: str pulls_url: str milestones_url: str notifications_url: str labels_url: str releases_url: str deployments_url: str created_at: str updated_at: str pushed_at: str git_url: str ssh_url: str clone_url: str svn_url: str homepage: Optional[str] size: int stargazers_count: int watchers_count: int language: str has_issues: bool has_projects: bool has_downloads: bool has_wiki: bool has_pages: bool forks_count: int mirror_url: None archived: bool disabled: bool open_issues_count: int license: Optional[License] allow_forking: bool is_template: bool web_commit_signoff_required: bool topics: List[str] visibility: str forks: int open_issues: int watchers: int default_branch: str permissions: Permissions temp_clone_token: str organization: NotRequired[Owner] network_count: int subscribers_count: int class ModelInformation(TypedDict): title: NotRequired[str] slug: str branch: NotRequired[str] data: GithubRepositoryInformation owner: Owner stars: int forks: int owner_name: str owner_slug: str s: int e: int i: int a: int t: int l: int tech: List[str] Dataset = dict[str, ModelInformation] def open_dataset() -> Dataset: with open(dataset_info, 'r') as f: return json.load(f) def save_dataset(dataset: Dataset): with open(dataset_info, 'w') as f: json.dump(dataset, f, indent=4) def get_json(uri: str): print(uri) resp = requests.get(url=uri, headers={"Authorization": f"Bearer {token}"}) print(resp) if not resp.ok: try: resp_error = resp.json()['message'] except Exception: resp_error = resp.text raise Exception(f"Invalid response: {resp_error}") return resp.json() def get_repo(slug: str): return get_json(f"https://api.github.com/repos/{slug}") def get_user(name: str): return get_json(f"https://api.github.com/users/{name}") def get_file(slug: str, path: str): return get_json(f"https://api.github.com/repos/{slug}/contents/{path}") def plural(amount: int, name: str, plural: str = 's'): return f"{amount} {name}{plural[:amount^1]}" from typing import TypedDict class Artifact(TypedDict): file: str lines: list[int] class SecurityRule(TypedDict): status: str argument: str artifacts: NotRequired[list[Artifact]] rule_schema = yaml.safe_load("""type: object additionalProperties: no required: - status - argument properties: status: type: string enum: - disregarded - observed - not applicable - unknown argument: type: string artifacts: type: array items: type: object properties: file: type: string lines: type: array items: type: integer""") def check_security_rules(security_rules: dict[Any, Any] | None) -> dict[int, SecurityRule]: if security_rules is None: raise Exception("Security rules file is empty!") for n in range(1, 19): try: rule = security_rules.get(n, None) if rule is None: raise jsonschema.ValidationError(f"Rule {n} is not evaluated") jsonschema.validate(rule, rule_schema) rule: SecurityRule if rule["status"] == "unknown": warning(f"Rule {n} is still unknown!") except jsonschema.ValidationError as e: warning("Not checking further rules!") raise Exception("Security rule {n}: {msg} at $.{n}.{path}".format(n=n, msg=e.message, path=e.json_path)) from e return dict(sorted(security_rules.items())) update_dataset = False def get_name(slug: str): return slug[slug.find('/')+1:] def get_tag_slug(tag: str) -> str: return tag.lower().replace(' ', '_') rule_names = { 1: "API Gateway", 2: "Mutual Authentication", 3: "Decoupled Authentication", 4: "Internal Identity Represenation", 5: "Authentication Token Validation", 6: "Login Rate Limiting", 7: "Edge Encryption", 8: "Internal Encryption", 9: "Central Logging Subsystem", 10: "Local Logging Agent", 11: "Log Sanitization", 12: "Log Message Broker", 13: "Circuit Breaker", 14: "Load Balancing", 15: "Service Mesh Usage Limits", 16: "Service Registry Deployment", 17: "Service Registry Validation", 18: "Secret Manager", } def artifact_to_string(info: ModelInformation, artifact: Artifact): file = Path(artifact['file']) filename = file.name file_url = f"https://github.com/{info['slug']}/blob/{info.get('branch', 'master')}/{artifact['file']}" return f"- {filename}: Line{'s'[:len(artifact['lines'])^1]}: {', '.join(f'[{line}]({file_url}#L{line})' for line in artifact['lines'])}" def rule_to_string(info: ModelInformation, id: int, rule: SecurityRule | None): if rule is None: # warning(f"Rule {id} is missing!") # TODO Enable warning return "" text = f"""#### Rule {id}: {rule_names[id]} This rule is {rule['status']}: {rule['argument']}""" artifacts = rule.get("artifacts", []) if len(artifacts) > 0: text = text + f""" Artifacts: {chr(10).join(artifact_to_string(info, artifact) for artifact in artifacts)}""" return text def write_security_rules(info: ModelInformation, security_rules: dict[int, SecurityRule]): return f"""## Security Rules ### Authentication / Authorization {(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(1, 7))} ### Encryption {(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(7, 9))} ### Logging {(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(9, 13))} ### Availability {(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(13, 16))} ### Service Registry {(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(16, 18))} ### Secret Management {(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(18, 19))}""" def write_model_readmes(dataset: Dataset): for model_id, info in dataset.items(): dir = output_path / 'dataset' readme = dir / f'{model_id}.md' slug = info['slug'] data = info.get('data') if not data: data = get_repo(slug) info['data'] = data owner_url = data.get('owner', {}).get('url') if not owner_url: raise Exception(f'No owner in repo {slug}!') owner = info.get('owner') if not owner: owner = get_json(owner_url) info['owner'] = owner owner_name = owner.get('name') if not owner_name: raise Exception(f'No owner name in repo {slug}!') stars = data['stargazers_count'] forks = data['forks'] owner_slug = owner['login'] info['stars'] = stars info['forks'] = forks info['owner_name'] = owner_name info['owner_slug'] = owner_slug model_path = dataset_path / model_id security_rules_file = model_path / 'security_rules.yaml' model_file = model_path / f"{model_id}.py" with model_file.open("r") as f: model = f.read() try: with security_rules_file.open('r') as f: security_rules = yaml.safe_load(f) security_rules = check_security_rules(security_rules) except FileNotFoundError: warning("Security rules file not found at {}".format(security_rules_file)) security_rules = {} except Exception as e: warning("Security rules file at {} is invalid: {}".format(security_rules_file, e)) security_rules = {} print(f"Writing readme file {readme}") dir.mkdir(exist_ok=True) with readme.open('w', encoding="utf-8") as f: f.write(f"""--- title: {slug} keywords: model TODO tags: [{', '.join(get_tag_slug(tech) for tech in info['tech'])}] sidebar: datasetdoc_sidebar permalink: {model_id}.html toc: false --- ## Repository Information Repository: [GitHub](https://github.com/{slug}) Owner: [{owner_name}](https://github.com/{owner_slug}) The repository has {plural(stars, 'star')} and was forked {plural(forks, 'time')}. The codebase consists of {plural(info['l'], 'line')} of code and makes use of the following technologies: {chr(10).join(f'{tech}' for tech in info['tech'])} ## Data Flow Diagram ### DFD Model {{% include note.html content="Download the [model file](../../dataset/{model_id}/{model_id}.py)" %}} The images below were generated by executing the model file. The DFD is represented as a CodeableModels file. ```python {model} ``` ### Statistics The Application consists of a total of {plural(info['t'], 'element')}: Element | Count -- | -- Services | {info['s']} External Entities | {info['e']} Information Flows | {info['i']} Annotations | {info['a']} Total Items | {info['t']} ### Diagram Formats: - [PlantUML Model](../../dataset/{model_id}/{model_id}/{model_id}.txt) - [SVG Vector Image](../../dataset/{model_id}/{model_id}/{model_id}.svg) - [PNG Raster Image](../../dataset/{model_id}/{model_id}/{model_id}.png) ![Data Flow Diagram](../../dataset/{model_id}/{model_id}/{model_id}.svg) {write_security_rules(info, security_rules)} """) def write_root_readme(dataset: Dataset): print(f"Writing main readme file") with open('index.md', 'w', encoding="utf-8") as f: f.write(f"""--- title: code2DFD Documentation keywords: code2DFD introduction tags: [] sidebar: datasetdoc_sidebar permalink: index.html summary: Dataset of dataflow diagrams of microservice applications. --- # code2DFD {{% include image.html file="TUHH_logo-wortmarke_en_rgb.svg" alt="TUHH Logo" max-width="350" %}} This project is developed by the Institute of Software Security at Hamburg University of Technology. {{% include image.html file="company_logo_big.png" alt="SoftSec Institute Logo" max-width="350" %}} This is a description. Duis proident aliqua laborum reprehenderit duis nostrud sint duis anim Lorem anim ut. ## DFD Items Do culpa deserunt est excepteur amet. Non pariatur ea elit ad eiusmod veniam exercitation nulla. Commodo do adipisicing amet et. Voluptate laboris commodo dolor eu mollit ipsum. Amet reprehenderit velit eu culpa amet exercitation. Elit esse ullamco duis mollit quis. Eiusmod qui reprehenderit sunt cupidatat Lorem anim occaecat enim sint eiusmod tempor. ## Use-Cases Veniam culpa nostrud id laborum deserunt consectetur consectetur voluptate. Sint aute cupidatat velit irure elit laboris anim labore esse labore. Quis ullamco ut consequat amet. Enim sit laboris deserunt veniam duis aliqua irure proident. """) print(f"Writing models readme file") with open('dataset.md', 'w', encoding="utf-8") as f: f.write(f"""--- title: code2DFD Dataset keywords: dataset models tags: [] sidebar: datasetdoc_sidebar permalink: dataset.html summary: Dataset of dataflow diagrams of microservice applications. datatable: true --- # Dataset of Dataflow Diagrams This repository contains of {len(dataset)} manually created dataflow diagrams (DFDs) of microservice applications found on GitHub. The dataset is published as an additional contribution to "Automatic Extraction of Security-Rich Dataflow Diagrams for Microservice Applications written in Java" [Simon Schneider, Riccardo Scandariato]. Each folder in the [`dataset`](dataset/) directory contains one DFD in a [CodeableModels](https://github.com/uzdun/CodeableModels)-format that can be executed to generate PNG, SVG and TXT files for the DFD. Each model refers to stereotypes and metaclasses from the [metamodel](microservice_dfds_metamodel.py) which needs to be imported. This repository already contains rendered versions for each model, thus setup and rendering is only necessary once changes to the models are made. ## Models
Name | Source | LoC | Stars | Forks | DFD Items | Technologies -- | -- | -- | -- | -- | -- | -- {chr(10).join(f"[{info['slug']}]({model_id}.html) | [GitHub](https://github.com/{info['slug']}) | {info['l']} | {info['stars']} | {info['forks']} | {info['t']} | {len(info['tech'])}" for model_id, info in dataset.items())}
""") def write_tag_readme(dataset: Dataset): tag_dir = output_path / 'tags' known_tech = set(tech for model in dataset.values() for tech in model['tech']) print(f"Writing tag data file") tags_data_path = Path('_data') tags_data_file = tags_data_path / 'tags.yml' tags_data_path.mkdir(exist_ok=True, parents=True) with tags_data_file.open('r+') as f: tags = yaml.safe_load(f) tags['allowed-tags'] = list(sorted(set(itertools.chain(tags['allowed-tags'], (get_tag_slug(tech) for tech in known_tech))))) f.seek(0) yaml.dump(tags, f) f.truncate() for tech in known_tech: slug = get_tag_slug(tech) info_file = tag_dir / f'tag_{slug}.md' print(f"Writing tag file for {tech}") tag_dir.mkdir(exist_ok=True, parents=True) with open(info_file, 'w', encoding="utf-8") as f: f.write(f"""--- title: "{tech}" tagName: {slug} search: exclude permalink: tag_{slug}.html sidebar: datasetdoc_sidebar hide_sidebar: true folder: tags --- {{% include taglogic.html %}} {{% include links.html %}} """) def main(): global known_tags dataset = open_dataset() write_tag_readme(dataset) write_root_readme(dataset) write_model_readmes(dataset) if update_dataset: save_dataset(dataset) yaml.dump if __name__ == '__main__': main()