Added typing for Dataset

Added security rules on model pages
Hide toc for long security rules
Use datatable for dataset table
This commit is contained in:
Michael Chen 2022-11-23 13:00:38 +01:00
parent 57829c67f5
commit 94545afb28
Signed by: cnml
GPG Key ID: 5845BF3F82D5F629

View File

@ -3,7 +3,7 @@ import json
import itertools import itertools
import yaml import yaml
import jsonschema import jsonschema
from typing import Any, TypedDict from typing import Any, List, NotRequired, Optional, TypedDict
import requests import requests
try: try:
from yachalk import chalk from yachalk import chalk
@ -23,11 +23,168 @@ def error(msg: str) -> Exception:
def warning(msg: str): def warning(msg: str):
print(chalk.yellow(msg) if yachalk_imported else "Warning: {}".format(msg)) print(chalk.yellow(msg) if yachalk_imported else "Warning: {}".format(msg))
class License(TypedDict):
    """Repository license object, referenced by GithubRepositoryInformation.license."""
    key: str
    name: str
    spdx_id: str
    url: str
    node_id: str
class Permissions(TypedDict):
    """Access flags of the authenticated user on a repository.

    Referenced by GithubRepositoryInformation.permissions.
    """
    admin: bool
    maintain: bool
    push: bool
    triage: bool
    pull: bool
class Owner(TypedDict):
    """GitHub account (user or organization).

    The fields marked NotRequired may be absent; the required fields are the
    short account summary embedded in repository responses.
    """
    login: str
    id: int
    node_id: str
    avatar_url: str
    gravatar_id: str
    url: str
    html_url: str
    followers_url: str
    following_url: str
    gists_url: str
    starred_url: str
    subscriptions_url: str
    organizations_url: str
    repos_url: str
    events_url: str
    received_events_url: str
    type: str
    site_admin: bool
    # Optional profile fields below; several of them may also be null.
    name: NotRequired[str]
    company: NotRequired[Optional[str]]
    blog: NotRequired[str]
    location: NotRequired[Optional[str]]
    email: NotRequired[Optional[str]]
    hireable: NotRequired[Optional[bool]]
    bio: NotRequired[Optional[str]]
    twitter_username: NotRequired[Optional[str]]
    public_repos: NotRequired[int]
    public_gists: NotRequired[int]
    followers: NotRequired[int]
    following: NotRequired[int]
    created_at: NotRequired[str]
    updated_at: NotRequired[str]
class GithubRepositoryInformation(TypedDict):
    """Typed shape of a GitHub repository API response as stored in the dataset.

    NOTE(review): field list appears hand-transcribed from observed responses;
    confirm against the GitHub REST API "Get a repository" schema before
    relying on optionality/nullability of individual fields.
    """
    id: int
    node_id: str
    name: str
    full_name: str
    private: bool
    owner: Owner
    html_url: str
    description: Optional[str]
    fork: bool
    url: str
    forks_url: str
    keys_url: str
    collaborators_url: str
    teams_url: str
    hooks_url: str
    issue_events_url: str
    events_url: str
    assignees_url: str
    branches_url: str
    tags_url: str
    blobs_url: str
    git_tags_url: str
    git_refs_url: str
    trees_url: str
    statuses_url: str
    languages_url: str
    stargazers_url: str
    contributors_url: str
    subscribers_url: str
    subscription_url: str
    commits_url: str
    git_commits_url: str
    comments_url: str
    issue_comment_url: str
    contents_url: str
    compare_url: str
    merges_url: str
    archive_url: str
    downloads_url: str
    issues_url: str
    pulls_url: str
    milestones_url: str
    notifications_url: str
    labels_url: str
    releases_url: str
    deployments_url: str
    # Timestamps are ISO-8601 strings as returned by the API.
    created_at: str
    updated_at: str
    pushed_at: str
    git_url: str
    ssh_url: str
    clone_url: str
    svn_url: str
    homepage: Optional[str]
    size: int
    stargazers_count: int
    watchers_count: int
    # NOTE(review): GitHub can return null for language; typed str here — confirm.
    language: str
    has_issues: bool
    has_projects: bool
    has_downloads: bool
    has_wiki: bool
    has_pages: bool
    forks_count: int
    # NOTE(review): typed as literal None — presumably always null in observed
    # data, likely Optional[str] in general; confirm.
    mirror_url: None
    archived: bool
    disabled: bool
    open_issues_count: int
    license: Optional[License]
    allow_forking: bool
    is_template: bool
    web_commit_signoff_required: bool
    topics: List[str]
    visibility: str
    forks: int
    open_issues: int
    watchers: int
    default_branch: str
    permissions: Permissions
    temp_clone_token: str
    # Present only when the repository belongs to an organization.
    organization: NotRequired[Owner]
    network_count: int
    subscribers_count: int
class ModelInformation(TypedDict):
    """One dataset entry describing a modeled microservice repository."""
    title: NotRequired[str]
    # "owner/repo" path on GitHub; used to build github.com URLs.
    slug: str
    # Git branch for source links; code elsewhere falls back to 'master' when absent.
    branch: NotRequired[str]
    # Cached GitHub repository API response (fetched via get_repo when missing).
    data: GithubRepositoryInformation
    owner: Owner
    stars: int
    forks: int
    owner_name: str
    owner_slug: str
    # NOTE(review): single-letter metrics below are not explained anywhere in
    # this file — presumably DFD item counts (the dataset page tabulates
    # "LoC | Stars | Forks | DFD Items"); confirm their exact meaning.
    s: int
    e: int
    i: int
    a: int
    t: int
    l: int
    # Technology tags; rendered as page tags via get_tag_slug.
    tech: List[str]
# Maps a model id to its metadata record.
Dataset = dict[str, ModelInformation]


def open_dataset() -> Dataset:
    """Load and return the dataset from the module-level `dataset_info` JSON path."""
    with open(dataset_info, 'r') as f:
        return json.load(f)
def save_dataset(dataset: Dataset):
    """Write `dataset` back to the module-level `dataset_info` path, pretty-printed."""
    with open(dataset_info, 'w') as f:
        json.dump(dataset, f, indent=4)
@ -64,7 +221,7 @@ class Artifact(TypedDict):
class SecurityRule(TypedDict):
    """Assessment of one security rule for a model."""
    status: str  # validation code compares against "unknown"; other values not visible here
    argument: str  # prose justification for the status
    artifacts: NotRequired[list[Artifact]]  # supporting code locations; may be absent
rule_schema = yaml.safe_load("""type: object rule_schema = yaml.safe_load("""type: object
additionalProperties: no additionalProperties: no
@ -105,10 +262,9 @@ def check_security_rules(security_rules: dict[Any, Any] | None) -> dict[int, Sec
if rule["status"] == "unknown": if rule["status"] == "unknown":
warning(f"Rule {n} is still unknown!") warning(f"Rule {n} is still unknown!")
except jsonschema.ValidationError as e: except jsonschema.ValidationError as e:
error("Security rule {n}: {msg} at $.{n}.{path}".format(n=n, msg=e.message, path=e.json_path))
warning("Not checking further rules!") warning("Not checking further rules!")
break raise Exception("Security rule {n}: {msg} at $.{n}.{path}".format(n=n, msg=e.message, path=e.json_path)) from e
return security_rules return dict(sorted(security_rules.items()))
update_dataset = False update_dataset = False
@ -118,11 +274,81 @@ def get_name(slug: str):
def get_tag_slug(tag: str) -> str:
    """Turn a technology tag into a URL-safe slug: lower-case, spaces become underscores."""
    return '_'.join(tag.lower().split(' '))
# Display names for the 18 security rules, keyed by rule id.
# Category ranges used when rendering: 1-6 authentication/authorization,
# 7-8 encryption, 9-12 logging, 13-15 availability, 16-17 service registry,
# 18 secret management.
rule_names = {
    1: "API Gateway",
    2: "Mutual Authentication",
    3: "Decoupled Authentication",
    # Fixed typo: was "Represenation" (user-visible in generated readmes).
    4: "Internal Identity Representation",
    5: "Authentication Token Validation",
    6: "Login Rate Limiting",
    7: "Edge Encryption",
    8: "Internal Encryption",
    9: "Central Logging Subsystem",
    10: "Local Logging Agent",
    11: "Log Sanitization",
    12: "Log Message Broker",
    13: "Circuit Breaker",
    14: "Load Balancing",
    15: "Service Mesh Usage Limits",
    16: "Service Registry Deployment",
    17: "Service Registry Validation",
    18: "Secret Manager",
}
def artifact_to_string(info: ModelInformation, artifact: Artifact):
    """Render one artifact as a markdown bullet with per-line links into GitHub."""
    path = Path(artifact['file'])
    # NOTE(review): `filename` is computed but never used, while the literal
    # "(unknown)" below looks like a placeholder for it — confirm intent.
    filename = path.name
    branch = info.get('branch', 'master')
    base_url = f"https://github.com/{info['slug']}/blob/{branch}/{artifact['file']}"
    line_links = ', '.join(f'[{line}]({base_url}#L{line})' for line in artifact['lines'])
    # 's'[:n ^ 1] is '' when there is exactly one line, 's' otherwise (slice clamps).
    plural = 's'[:len(artifact['lines']) ^ 1]
    return f"- (unknown): Line{plural}: {line_links}"
def rule_to_string(info: ModelInformation, id: int, rule: SecurityRule | None):
    """Render one security rule as a markdown '#### Rule N' section.

    Returns "" when the rule is absent. NOTE: parameter `id` shadows the builtin.
    """
    if rule is None:
        # warning(f"Rule {id} is missing!") # TODO Enable warning
        return ""
    # Heading plus verdict sentence. NOTE(review): no blank line between the
    # heading and the body is visible in this view — confirm markdown renders
    # as intended.
    text = f"""#### Rule {id}: {rule_names[id]}
This rule is {rule['status']}: {rule['argument']}"""
    # Append the artifact bullet list only when the rule carries evidence;
    # chr(10) is '\n' (backslashes are not allowed inside f-string expressions
    # on the Python versions this targets).
    artifacts = rule.get("artifacts", [])
    if len(artifacts) > 0:
        text = text + f"""
Artifacts:
{chr(10).join(artifact_to_string(info, artifact) for artifact in artifacts)}"""
    return text
def write_security_rules(info: ModelInformation, security_rules: dict[int, SecurityRule]):
    """Render all security rules as one markdown block, grouped into six categories.

    Rule ids per category: 1-6 authentication/authorization, 7-8 encryption,
    9-12 logging, 13-15 availability, 16-17 service registry, 18 secret
    management. Missing rules render as empty strings (rule_to_string returns
    "" for None). (chr(10)*2) joins sections with a blank line.
    """
    return f"""## Security Rules
### Authentication / Authorization
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(1, 7))}
### Encryption
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(7, 9))}
### Logging
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(9, 13))}
### Availability
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(13, 16))}
### Service Registry
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(16, 18))}
### Secret Management
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(18, 19))}"""
def write_model_readmes(dataset: Dataset):
for model_id, info in dataset.items(): for model_id, info in dataset.items():
dir = output_path / 'dataset' dir = output_path / 'dataset'
readme = dir / f'{model_id}.md' readme = dir / f'{model_id}.md'
slug: str = info['slug'] slug = info['slug']
data = info.get('data') data = info.get('data')
if not data: if not data:
data = get_repo(slug) data = get_repo(slug)
@ -164,6 +390,7 @@ keywords: model TODO
tags: [{', '.join(get_tag_slug(tech) for tech in info['tech'])}] tags: [{', '.join(get_tag_slug(tech) for tech in info['tech'])}]
sidebar: datasetdoc_sidebar sidebar: datasetdoc_sidebar
permalink: {model_id}.html permalink: {model_id}.html
toc: false
--- ---
## Repository Information ## Repository Information
@ -204,9 +431,11 @@ Formats:
- [PNG Raster Image](../../dataset/{model_id}/{model_id}/{model_id}.png) - [PNG Raster Image](../../dataset/{model_id}/{model_id}/{model_id}.png)
![Data Flow Diagram](../../dataset/{model_id}/{model_id}/{model_id}.svg) ![Data Flow Diagram](../../dataset/{model_id}/{model_id}/{model_id}.svg)
{write_security_rules(info, security_rules)}
""") """)
def write_root_readme(dataset: dict[str, Any]): def write_root_readme(dataset: Dataset):
print(f"Writing main readme file") print(f"Writing main readme file")
with open('index.md', 'w', encoding="utf-8") as f: with open('index.md', 'w', encoding="utf-8") as f:
f.write(f"""--- f.write(f"""---
@ -245,6 +474,7 @@ tags: []
sidebar: datasetdoc_sidebar sidebar: datasetdoc_sidebar
permalink: dataset.html permalink: dataset.html
summary: Dataset of dataflow diagrams of microservice applications. summary: Dataset of dataflow diagrams of microservice applications.
datatable: true
--- ---
# Dataset of Dataflow Diagrams # Dataset of Dataflow Diagrams
@ -262,7 +492,7 @@ Name | Source | LoC | Stars | Forks | DFD Items | Technologies
<div class="datatable-end"></div> <div class="datatable-end"></div>
""") """)
def write_tag_readme(dataset: dict[str, Any]): def write_tag_readme(dataset: Dataset):
tag_dir = output_path / 'tags' tag_dir = output_path / 'tags'
known_tech = set(tech for model in dataset.values() for tech in model['tech']) known_tech = set(tech for model in dataset.values() for tech in model['tech'])
print(f"Writing tag data file") print(f"Writing tag data file")