Compare commits

...

2 Commits

Author SHA1 Message Date
e623b960a6
Embedded python model 2022-11-23 13:07:21 +01:00
94545afb28
Added typing for Dataset
Added security rules on model pages
Hide toc for long security rules
Use datatable for dataset table
2022-11-23 13:00:38 +01:00

View File

@ -3,7 +3,7 @@ import json
import itertools
import yaml
import jsonschema
from typing import Any, TypedDict
from typing import Any, List, NotRequired, Optional, TypedDict
import requests
try:
from yachalk import chalk
@ -23,11 +23,168 @@ def error(msg: str) -> Exception:
def warning(msg: str):
    """Print *msg* as a warning, colorized yellow when yachalk is available."""
    if yachalk_imported:
        print(chalk.yellow(msg))
    else:
        print("Warning: {}".format(msg))
def open_dataset() -> dict[str, Any]:
# Shape of the GitHub API "license" sub-object (functional TypedDict form).
License = TypedDict(
    "License",
    {
        "key": str,
        "name": str,
        "spdx_id": str,
        "url": str,
        "node_id": str,
    },
)
# Permission flags of the authenticated user on a repository
# (functional TypedDict form).
Permissions = TypedDict(
    "Permissions",
    {
        "admin": bool,
        "maintain": bool,
        "push": bool,
        "triage": bool,
        "pull": bool,
    },
)
class Owner(TypedDict):
    """GitHub user/organization object as returned by the GitHub REST API.

    The NotRequired fields only appear on a full user object (e.g. from
    /users/{login}), not on the abbreviated "owner" sub-object embedded in a
    repository response — presumably why they are optional; verify against
    the API responses actually stored in the dataset.
    """
    login: str
    id: int
    node_id: str
    avatar_url: str
    gravatar_id: str
    url: str
    html_url: str
    followers_url: str
    following_url: str
    gists_url: str
    starred_url: str
    subscriptions_url: str
    organizations_url: str
    repos_url: str
    events_url: str
    received_events_url: str
    # Account kind, e.g. "User" or "Organization" — TODO confirm values.
    type: str
    site_admin: bool
    # Extended profile fields (full user object only); several are nullable.
    name: NotRequired[str]
    company: NotRequired[Optional[str]]
    blog: NotRequired[str]
    location: NotRequired[Optional[str]]
    email: NotRequired[Optional[str]]
    hireable: NotRequired[Optional[bool]]
    bio: NotRequired[Optional[str]]
    twitter_username: NotRequired[Optional[str]]
    public_repos: NotRequired[int]
    public_gists: NotRequired[int]
    followers: NotRequired[int]
    following: NotRequired[int]
    created_at: NotRequired[str]
    updated_at: NotRequired[str]
class GithubRepositoryInformation(TypedDict):
    """Repository object as returned by the GitHub REST API ("Get a repository").

    The field set mirrors the API response; the many *_url fields are API/HTML
    endpoint templates for the repository's sub-resources.
    """
    id: int
    node_id: str
    name: str
    full_name: str
    private: bool
    owner: Owner
    html_url: str
    description: Optional[str]
    fork: bool
    url: str
    forks_url: str
    keys_url: str
    collaborators_url: str
    teams_url: str
    hooks_url: str
    issue_events_url: str
    events_url: str
    assignees_url: str
    branches_url: str
    tags_url: str
    blobs_url: str
    git_tags_url: str
    git_refs_url: str
    trees_url: str
    statuses_url: str
    languages_url: str
    stargazers_url: str
    contributors_url: str
    subscribers_url: str
    subscription_url: str
    commits_url: str
    git_commits_url: str
    comments_url: str
    issue_comment_url: str
    contents_url: str
    compare_url: str
    merges_url: str
    archive_url: str
    downloads_url: str
    issues_url: str
    pulls_url: str
    milestones_url: str
    notifications_url: str
    labels_url: str
    releases_url: str
    deployments_url: str
    created_at: str
    updated_at: str
    pushed_at: str
    git_url: str
    ssh_url: str
    clone_url: str
    svn_url: str
    homepage: Optional[str]
    size: int
    stargazers_count: int
    watchers_count: int
    # null when GitHub has not detected a primary language (was plain `str`).
    language: Optional[str]
    has_issues: bool
    has_projects: bool
    has_downloads: bool
    has_wiki: bool
    has_pages: bool
    forks_count: int
    # Was annotated as bare `None` (only ever null); the GitHub API documents
    # this as a nullable string, so widen to Optional[str] — backward compatible.
    mirror_url: Optional[str]
    archived: bool
    disabled: bool
    open_issues_count: int
    license: Optional[License]
    allow_forking: bool
    is_template: bool
    web_commit_signoff_required: bool
    topics: List[str]
    visibility: str
    forks: int
    open_issues: int
    watchers: int
    default_branch: str
    permissions: Permissions
    temp_clone_token: str
    organization: NotRequired[Owner]
    network_count: int
    subscribers_count: int
class ModelInformation(TypedDict):
    """Per-model entry of the dataset mapping."""
    title: NotRequired[str]
    # "owner/repo" GitHub slug of the source repository.
    slug: str
    # Git branch the model's files refer to; code elsewhere in this file
    # falls back to 'master' when absent.
    branch: NotRequired[str]
    # Raw repository object fetched from the GitHub API.
    data: GithubRepositoryInformation
    owner: Owner
    stars: int
    forks: int
    owner_name: str
    owner_slug: str
    # Single-letter element counters; `t` is the total number of DFD elements
    # (rendered as "a total of {t} elements" in the model readme). The exact
    # meaning of s/e/i/a/l is not evident from this file — TODO confirm.
    s: int
    e: int
    i: int
    a: int
    t: int
    l: int
    # Technology names, used to build tag slugs and tag pages.
    tech: List[str]
# The dataset maps model ids to their collected information.
Dataset = dict[str, ModelInformation]


def open_dataset() -> Dataset:
    """Load and return the dataset from the dataset_info JSON file."""
    with open(dataset_info) as handle:
        return json.load(handle)
def save_dataset(dataset: dict[str, Any]):
def save_dataset(dataset: Dataset):
    """Serialize *dataset* to the dataset_info JSON file (4-space indented)."""
    with open(dataset_info, 'w') as handle:
        json.dump(dataset, handle, indent=4)
@ -64,7 +221,7 @@ class Artifact(TypedDict):
class SecurityRule(TypedDict):
    """Evaluation result of one security rule for a model."""
    # Rule verdict; "unknown" triggers a warning in check_security_rules.
    status: str
    # Free-text justification for the verdict.
    argument: str
    # Optional code locations backing the argument (absent when none given).
    artifacts: NotRequired[list[Artifact]]
rule_schema = yaml.safe_load("""type: object
additionalProperties: no
@ -105,10 +262,9 @@ def check_security_rules(security_rules: dict[Any, Any] | None) -> dict[int, Sec
if rule["status"] == "unknown":
warning(f"Rule {n} is still unknown!")
except jsonschema.ValidationError as e:
error("Security rule {n}: {msg} at $.{n}.{path}".format(n=n, msg=e.message, path=e.json_path))
warning("Not checking further rules!")
break
return security_rules
raise Exception("Security rule {n}: {msg} at $.{n}.{path}".format(n=n, msg=e.message, path=e.json_path)) from e
return dict(sorted(security_rules.items()))
update_dataset = False
@ -118,11 +274,81 @@ def get_name(slug: str):
def get_tag_slug(tag: str) -> str:
    """Normalize a tag into a URL-friendly slug: lowercase, spaces -> underscores."""
    lowered = tag.lower()
    return '_'.join(lowered.split(' '))
def write_model_readmes(dataset: dict[str, Any]):
# Human-readable names of the 18 security rules, keyed by rule id.
# Categories (see write_security_rules): 1-6 authentication/authorization,
# 7-8 encryption, 9-12 logging, 13-15 availability, 16-17 service registry,
# 18 secret management.
rule_names = {
    1: "API Gateway",
    2: "Mutual Authentication",
    3: "Decoupled Authentication",
    4: "Internal Identity Representation",  # typo fix: was "Represenation"
    5: "Authentication Token Validation",
    6: "Login Rate Limiting",
    7: "Edge Encryption",
    8: "Internal Encryption",
    9: "Central Logging Subsystem",
    10: "Local Logging Agent",
    11: "Log Sanitization",
    12: "Log Message Broker",
    13: "Circuit Breaker",
    14: "Load Balancing",
    15: "Service Mesh Usage Limits",
    16: "Service Registry Deployment",
    17: "Service Registry Validation",
    18: "Secret Manager",
}
def artifact_to_string(info: ModelInformation, artifact: Artifact):
    """Render one artifact as a markdown bullet with per-line GitHub links."""
    file = Path(artifact['file'])
    # NOTE(review): `filename` is never used; the literal "(unknown)" in the
    # returned string looks like it should be the file name / a file link —
    # verify against the rendered readmes.
    filename = file.name
    # Deep link into the repo at the model's branch (defaults to 'master').
    file_url = f"https://github.com/{info['slug']}/blob/{info.get('branch', 'master')}/{artifact['file']}"
    # 's'[:n^1] pluralizes "Line": empty when exactly one line, "s" otherwise.
    return f"- (unknown): Line{'s'[:len(artifact['lines'])^1]}: {', '.join(f'[{line}]({file_url}#L{line})' for line in artifact['lines'])}"
def rule_to_string(info: ModelInformation, id: int, rule: SecurityRule | None):
    """Render one security rule as a markdown block; empty string when missing."""
    if rule is None:
        # warning(f"Rule {id} is missing!") # TODO Enable warning
        return ""
    text = f"""#### Rule {id}: {rule_names[id]}
This rule is {rule['status']}: {rule['argument']}"""
    # Append a bullet list only when the rule documents code locations.
    artifacts = rule.get("artifacts", [])
    if len(artifacts) > 0:
        text = text + f"""
Artifacts:
{chr(10).join(artifact_to_string(info, artifact) for artifact in artifacts)}"""
    return text
def write_security_rules(info: ModelInformation, security_rules: dict[int, SecurityRule]):
    """Render the markdown "Security Rules" chapter, grouped by category.

    Rule ids are partitioned by the range() calls below: 1-6 authentication /
    authorization, 7-8 encryption, 9-12 logging, 13-15 availability, 16-17
    service registry, 18 secret management. Absent rules render as "".
    """
    return f"""## Security Rules
### Authentication / Authorization
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(1, 7))}
### Encryption
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(7, 9))}
### Logging
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(9, 13))}
### Availability
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(13, 16))}
### Service Registry
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(16, 18))}
### Secret Management
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(18, 19))}"""
def write_model_readmes(dataset: Dataset):
for model_id, info in dataset.items():
dir = output_path / 'dataset'
readme = dir / f'{model_id}.md'
slug: str = info['slug']
slug = info['slug']
data = info.get('data')
if not data:
data = get_repo(slug)
@ -144,9 +370,13 @@ def write_model_readmes(dataset: dict[str, Any]):
info['forks'] = forks
info['owner_name'] = owner_name
info['owner_slug'] = owner_slug
security_rules_file = dataset_path / model_id / 'security_rules.yaml'
model_path = dataset_path / model_id
security_rules_file = model_path / 'security_rules.yaml'
model_file = model_path / f"{model_id}.py"
with model_file.open("r") as f:
model = f.read()
try:
with open(security_rules_file, 'r') as f:
with security_rules_file.open('r') as f:
security_rules = yaml.safe_load(f)
security_rules = check_security_rules(security_rules)
except FileNotFoundError:
@ -157,13 +387,14 @@ def write_model_readmes(dataset: dict[str, Any]):
security_rules = {}
print(f"Writing readme file {readme}")
dir.mkdir(exist_ok=True)
with open(readme, 'w', encoding="utf-8") as f:
with readme.open('w', encoding="utf-8") as f:
f.write(f"""---
title: {slug}
keywords: model TODO
tags: [{', '.join(get_tag_slug(tech) for tech in info['tech'])}]
sidebar: datasetdoc_sidebar
permalink: {model_id}.html
toc: false
---
## Repository Information
@ -184,6 +415,10 @@ The repository has {plural(stars, 'star')} and was forked {plural(forks, 'time')
The images below were generated by executing the model file. The DFD is represented as a CodeableModels file.
```python
{model}
```
### Statistics
The Application consists of a total of {plural(info['t'], 'element')}:
@ -204,9 +439,11 @@ Formats:
- [PNG Raster Image](../../dataset/{model_id}/{model_id}/{model_id}.png)
![Data Flow Diagram](../../dataset/{model_id}/{model_id}/{model_id}.svg)
{write_security_rules(info, security_rules)}
""")
def write_root_readme(dataset: dict[str, Any]):
def write_root_readme(dataset: Dataset):
print(f"Writing main readme file")
with open('index.md', 'w', encoding="utf-8") as f:
f.write(f"""---
@ -245,6 +482,7 @@ tags: []
sidebar: datasetdoc_sidebar
permalink: dataset.html
summary: Dataset of dataflow diagrams of microservice applications.
datatable: true
---
# Dataset of Dataflow Diagrams
@ -262,7 +500,7 @@ Name | Source | LoC | Stars | Forks | DFD Items | Technologies
<div class="datatable-end"></div>
""")
def write_tag_readme(dataset: dict[str, Any]):
def write_tag_readme(dataset: Dataset):
tag_dir = output_path / 'tags'
known_tech = set(tech for model in dataset.values() for tech in model['tech'])
print(f"Writing tag data file")