569 lines
16 KiB
Python
569 lines
16 KiB
Python
import itertools
import json
import os
from io import StringIO
from pathlib import Path
from typing import Any, List, NotRequired, Optional, TypedDict

import jsonschema
import requests
import yaml

try:
    from yachalk import chalk
    yachalk_imported = True
except ModuleNotFoundError:
    yachalk_imported = False
|
|
|
|
# Directory holding the dataset index and one sub-directory per model.
dataset_path = Path('dataset')
# Root directory below which the generated documentation pages are written.
output_path = Path('pages')
# JSON index describing every model of the dataset.
dataset_info = dataset_path / Path('dataset.json')
# GitHub API token used for the Authorization header of API requests.
# SECURITY: credentials must never live in source control, so the token is
# read from the GITHUB_TOKEN environment variable. The token that used to be
# hard-coded here has been exposed and must be treated as compromised
# (revoke it on GitHub). Empty string when the variable is not set.
token = os.environ.get("GITHUB_TOKEN", "")
|
|
|
|
def error(msg: str) -> Exception:
    """Print *msg* as an error and return an ``Exception`` carrying it.

    The message is printed in red when yachalk is available, otherwise with
    an ``Error:`` prefix. The exception is returned rather than raised;
    raising it is left to the caller.
    """
    if yachalk_imported:
        rendered = chalk.red(msg)
    else:
        rendered = "Error: {}".format(msg)
    print(rendered)
    return Exception(msg)
|
|
|
|
def warning(msg: str):
    """Print *msg* as a warning.

    The message is printed in yellow when yachalk is available, otherwise
    with a ``Warning:`` prefix.
    """
    if yachalk_imported:
        rendered = chalk.yellow(msg)
    else:
        rendered = "Warning: {}".format(msg)
    print(rendered)
|
|
|
|
class License(TypedDict):
    """License entry nested in a repository record (the ``license`` field of
    ``GithubRepositoryInformation``)."""
    key: str
    name: str
    spdx_id: str
    url: str
    node_id: str
|
|
|
|
|
|
class Permissions(TypedDict):
    """Boolean permission flags nested in a repository record (the
    ``permissions`` field of ``GithubRepositoryInformation``)."""
    admin: bool
    maintain: bool
    push: bool
    triage: bool
    pull: bool
|
|
|
|
|
|
class Owner(TypedDict):
    """GitHub account record.

    Used for the ``owner`` / ``organization`` fields of a repository record
    and for the owner document that ``write_model_readmes`` fetches from the
    repository owner's ``url``.
    """
    login: str  # account name; stored as ``owner_slug`` on a model entry
    id: int
    node_id: str
    avatar_url: str
    gravatar_id: str
    url: str  # API URL of the account; fetched to obtain the full owner record
    html_url: str
    followers_url: str
    following_url: str
    gists_url: str
    starred_url: str
    subscriptions_url: str
    organizations_url: str
    repos_url: str
    events_url: str
    received_events_url: str
    type: str
    site_admin: bool
    # The keys below are optional.
    # NOTE(review): presumably only present on a full account document, not on
    # the abbreviated owner embedded in a repository record — confirm.
    name: NotRequired[str]  # display name; write_model_readmes requires it to be non-empty
    company: NotRequired[Optional[str]]
    blog: NotRequired[str]
    location: NotRequired[Optional[str]]
    email: NotRequired[Optional[str]]
    hireable: NotRequired[Optional[bool]]
    bio: NotRequired[Optional[str]]
    twitter_username: NotRequired[Optional[str]]
    public_repos: NotRequired[int]
    public_gists: NotRequired[int]
    followers: NotRequired[int]
    following: NotRequired[int]
    created_at: NotRequired[str]
    updated_at: NotRequired[str]
|
|
|
|
|
|
class GithubRepositoryInformation(TypedDict):
    """Repository record as returned by ``get_repo`` and cached under the
    ``data`` key of a model entry.

    Only ``owner``, ``stargazers_count`` and ``forks`` are read by this
    script; the remaining keys document the shape of the stored record.
    """
    id: int
    node_id: str
    name: str
    full_name: str
    private: bool
    owner: Owner  # its ``url`` is fetched to obtain the full owner record
    html_url: str
    description: Optional[str]
    fork: bool
    url: str
    forks_url: str
    keys_url: str
    collaborators_url: str
    teams_url: str
    hooks_url: str
    issue_events_url: str
    events_url: str
    assignees_url: str
    branches_url: str
    tags_url: str
    blobs_url: str
    git_tags_url: str
    git_refs_url: str
    trees_url: str
    statuses_url: str
    languages_url: str
    stargazers_url: str
    contributors_url: str
    subscribers_url: str
    subscription_url: str
    commits_url: str
    git_commits_url: str
    comments_url: str
    issue_comment_url: str
    contents_url: str
    compare_url: str
    merges_url: str
    archive_url: str
    downloads_url: str
    issues_url: str
    pulls_url: str
    milestones_url: str
    notifications_url: str
    labels_url: str
    releases_url: str
    deployments_url: str
    created_at: str
    updated_at: str
    pushed_at: str
    git_url: str
    ssh_url: str
    clone_url: str
    svn_url: str
    homepage: Optional[str]
    size: int
    stargazers_count: int  # copied to ``stars`` on the model entry
    watchers_count: int
    # NOTE(review): may be null for repositories without a detected
    # language — confirm against the GitHub REST API schema.
    language: str
    has_issues: bool
    has_projects: bool
    has_downloads: bool
    has_wiki: bool
    has_pages: bool
    forks_count: int
    # Was annotated as plain ``None``; widened so a mirror URL string also
    # type-checks. NOTE(review): confirm against the GitHub REST API schema.
    mirror_url: Optional[str]
    archived: bool
    disabled: bool
    open_issues_count: int
    license: Optional[License]
    allow_forking: bool
    is_template: bool
    web_commit_signoff_required: bool
    topics: List[str]
    visibility: str
    forks: int  # copied to ``forks`` on the model entry
    open_issues: int
    watchers: int
    default_branch: str
    permissions: Permissions
    temp_clone_token: str
    organization: NotRequired[Owner]
    network_count: int
    subscribers_count: int
|
|
|
|
|
|
class ModelInformation(TypedDict):
    """One entry of the dataset index (``dataset.json``), keyed by model id."""
    title: NotRequired[str]
    slug: str  # GitHub "owner/repository" slug; used in API and page URLs
    branch: NotRequired[str]  # branch used for artifact links; 'master' when absent
    # Cached repository record; fetched via get_repo() when missing or empty.
    data: NotRequired[GithubRepositoryInformation]
    # Cached owner record; fetched from the repository owner's URL when
    # missing or empty.
    owner: NotRequired[Owner]
    stars: int  # refreshed from data['stargazers_count'] by write_model_readmes
    forks: int  # refreshed from data['forks'] by write_model_readmes
    owner_name: str  # refreshed from owner['name'] by write_model_readmes
    owner_slug: str  # refreshed from owner['login'] by write_model_readmes
    s: int  # rendered as "Services" in the statistics table
    e: int  # rendered as "External Entities"
    i: int  # rendered as "Information Flows"
    a: int  # rendered as "Annotations"
    t: int  # rendered as "Total Items" / "DFD Items"
    l: int  # lines of code of the codebase ("LoC" column)
    tech: List[str]  # technology names; each becomes a tag page
|
|
|
|
# The whole dataset index: maps a model id (also the name of the model's
# directory, model file and generated page) to its metadata.
Dataset = dict[str, ModelInformation]
|
|
|
|
def open_dataset() -> Dataset:
    """Load and return the dataset index from ``dataset_info``.

    The file is decoded as UTF-8 explicitly so the result does not depend on
    the platform's locale encoding.

    Raises:
        FileNotFoundError: If the index file does not exist.
        json.JSONDecodeError: If the file is not valid JSON.
    """
    with open(dataset_info, 'r', encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
def save_dataset(dataset: Dataset):
    """Write *dataset* back to ``dataset_info`` as JSON indented by 4 spaces.

    The document is serialized completely before the file is opened, so a
    serialization error cannot leave a truncated, half-written index behind.
    """
    serialized = json.dumps(dataset, indent=4)
    with open(dataset_info, 'w', encoding="utf-8") as f:
        f.write(serialized)
|
|
|
|
def get_json(uri: str, timeout: float = 30.0):
    """Request *uri* and return the decoded JSON body.

    The URI and the response object are printed as progress output.

    Args:
        uri: Absolute URL to request.
        timeout: Seconds to wait for the server. Without a timeout a stalled
            connection would block the script indefinitely.

    Returns:
        The parsed JSON document of the response.

    Raises:
        Exception: If the response status is not OK. The message carries the
            ``message`` field of the JSON error body when there is one,
            otherwise the raw response text.
        requests.RequestException: On connection failures and timeouts.
    """
    print(uri)
    # Send credentials only when a token is configured; an empty bearer
    # token is never a valid credential.
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    resp = requests.get(url=uri, headers=headers, timeout=timeout)
    print(resp)
    if not resp.ok:
        try:
            resp_error = resp.json()['message']
        except (ValueError, KeyError, TypeError):
            # Body is not JSON, or not an object with a "message" entry.
            resp_error = resp.text
        raise Exception(f"Invalid response: {resp_error}")
    return resp.json()
|
|
|
|
def get_repo(slug: str):
    """Fetch the GitHub API repository record for *slug* via ``get_json``."""
    endpoint = f"https://api.github.com/repos/{slug}"
    return get_json(endpoint)
|
|
|
|
def get_user(name: str):
    """Fetch the GitHub API user record for account *name* via ``get_json``."""
    endpoint = f"https://api.github.com/users/{name}"
    return get_json(endpoint)
|
|
|
|
def get_file(slug: str, path: str):
    """Fetch the GitHub API contents record for *path* in repository *slug*."""
    endpoint = f"https://api.github.com/repos/{slug}/contents/{path}"
    return get_json(endpoint)
|
|
|
|
def plural(amount: int, name: str, plural: str = 's'):
    """Return ``"<amount> <name>"`` with *plural* appended unless amount is 1.

    Args:
        amount: The count to display.
        name: Singular form of the counted noun.
        plural: Suffix appended for every amount other than exactly 1.

    The suffix is appended as a whole. The previous slicing trick
    (``plural[:amount ^ 1]``) cut multi-character suffixes short for an
    amount of 0, e.g. ``plural(0, 'box', 'es')`` produced ``"0 boxe"``.
    """
    suffix = '' if amount == 1 else plural
    return f"{amount} {name}{suffix}"
|
|
|
|
from typing import TypedDict
|
|
|
|
class Artifact(TypedDict):
    """Pointer to a file of the analysed repository that backs a rule verdict."""
    file: str  # path appended to the repository's blob URL when linking
    lines: NotRequired[list[int]]  # line numbers, each linked as ``#L<line>``
|
|
|
|
class SecurityRule(TypedDict):
    """Evaluation of one security rule, as validated against ``rule_schema``."""
    status: str  # one of the values enumerated in rule_schema
    argument: str | list[str]  # justification: one text or a list of points
    artifacts: NotRequired[list[Artifact]]  # supporting files/lines, if any
|
|
|
|
rule_schema = yaml.safe_load("""type: object
|
|
additionalProperties: no
|
|
required:
|
|
- status
|
|
- argument
|
|
properties:
|
|
status:
|
|
type: string
|
|
enum:
|
|
- disregarded
|
|
- observed
|
|
- not applicable
|
|
- unknown
|
|
argument:
|
|
anyOf:
|
|
- type: string
|
|
- type: array
|
|
items:
|
|
type: string
|
|
artifacts:
|
|
type: array
|
|
items:
|
|
additionalProperties: no
|
|
required:
|
|
- file
|
|
type: object
|
|
properties:
|
|
file:
|
|
type: string
|
|
lines:
|
|
type: array
|
|
items:
|
|
type: integer""")
|
|
|
|
def check_security_rules(security_rules: dict[Any, Any] | None) -> dict[int, SecurityRule]:
    """Validate a parsed security rules document and return it sorted by key.

    Rules 1 through 18 must all be present and must each satisfy
    ``rule_schema``. A rule whose status is ``unknown`` only produces a
    warning. Validation stops at the first invalid or missing rule.

    Args:
        security_rules: Mapping from rule number to rule entry, or ``None``
            when the rules file was empty.

    Returns:
        The same entries as a new dict ordered by rule number.

    Raises:
        Exception: If the document is ``None``, or if a rule is missing or
            violates the schema (chained to the ``ValidationError``).
    """
    if security_rules is None:
        raise Exception("Security rules file is empty!")
    for n in range(1, 19):
        rule = security_rules.get(n)
        try:
            if rule is None:
                raise jsonschema.ValidationError(f"Rule {n} is not evaluated")
            jsonschema.validate(rule, rule_schema)
        except jsonschema.ValidationError as e:
            warning("Not checking further rules!")
            # json_path is rooted at the rule and already starts with "$".
            # Drop that root marker before prefixing the rule number, so the
            # location reads "$.3.status" rather than "$.3.$.status".
            location = f"$.{n}{e.json_path[1:]}"
            raise Exception(f"Security rule {n}: {e.message} at {location}") from e
        rule: SecurityRule
        if rule["status"] == "unknown":
            warning(f"Rule {n} is still unknown!")
    return dict(sorted(security_rules.items()))
|
|
|
|
# When True, main() writes the dataset (including the fields filled in while
# generating the pages) back to the dataset index file.
update_dataset = False
|
|
|
|
def get_name(slug: str):
    """Return the part of *slug* after its first ``/``.

    A slug without any ``/`` is returned unchanged.
    """
    pieces = slug.split('/', 1)
    return pieces[-1]
|
|
|
|
def get_tag_slug(tag: str) -> str:
    """Turn a tag name into its slug: lower-cased, spaces become underscores."""
    lowered = tag.lower()
    return "_".join(lowered.split(' '))
|
|
|
|
# Display names of the security rules, keyed by rule number. The names are
# used as section headings ("Rule <n>: <name>") in the generated model pages.
# Fixed the misspelled heading of rule 4 ("Represenation").
rule_names = {
    1: "API Gateway",
    2: "Mutual Authentication",
    3: "Decoupled Authentication",
    4: "Internal Identity Representation",
    5: "Authentication Token Validation",
    6: "Login Rate Limiting",
    7: "Edge Encryption",
    8: "Internal Encryption",
    9: "Central Logging Subsystem",
    10: "Local Logging Agent",
    11: "Log Sanitization",
    12: "Log Message Broker",
    13: "Circuit Breaker",
    14: "Load Balancing",
    15: "Service Mesh Usage Limits",
    16: "Service Registry Deployment",
    17: "Service Registry Validation",
    18: "Secret Manager",
}
|
|
|
|
def artifact_to_string(info: ModelInformation, artifact: Artifact):
    """Render one artifact as a Markdown bullet linking into the repository.

    Links point to ``https://github.com/<slug>/blob/<branch>/<file>``, where
    the branch is ``info['branch']`` or ``'master'`` when none is given.

    Args:
        info: Model entry providing the repository slug and branch.
        artifact: File (and optional line numbers) to link.

    Returns:
        ``- <filename>: [File](<url>)`` when no lines are listed, otherwise
        ``- <filename>: Line(s): [n](<url>#Ln), ...``.
    """
    filename = Path(artifact['file']).name
    file_url = f"https://github.com/{info['slug']}/blob/{info.get('branch', 'master')}/{artifact['file']}"
    lines = artifact.get("lines")
    if not lines:
        # A missing and an empty line list are treated alike; an empty list
        # used to render a dangling "Lines: " with nothing after it.
        return f"- {filename}: [File]({file_url})"
    label = "Line" if len(lines) == 1 else "Lines"
    links = ', '.join(f'[{line}]({file_url}#L{line})' for line in lines)
    return f"- {filename}: {label}: {links}"
|
|
|
|
|
|
def rule_to_string(info: ModelInformation, id: int, rule: SecurityRule | None):
    """Render the Markdown section for security rule *id*.

    A missing rule (``None``) emits a warning and renders as an empty string.
    A list-valued argument is rendered as a numbered Markdown list; when the
    rule has artifacts, an "Artifacts:" bullet list is appended.
    """
    if rule is None:
        warning(f"Rule {id} is missing!")
        return ""

    reasoning = rule['argument']
    if not isinstance(reasoning, str):
        reasoning = "".join(f"\n1. {point}" for point in reasoning)

    rendered = f"#### Rule {id}: {rule_names[id]}\n\nThis rule is {rule['status']}: {reasoning}"

    evidence = rule.get("artifacts", [])
    if len(evidence) > 0:
        bullets = chr(10).join(artifact_to_string(info, entry) for entry in evidence)
        rendered = rendered + f"\n\nArtifacts:\n{bullets}"
    return rendered
|
|
|
|
def write_security_rules(info: ModelInformation, security_rules: dict[int, SecurityRule]):
    """Render the "Security Rules" chapter of a model page as Markdown.

    The rules are grouped under six fixed headings; within a group each rule
    is rendered by ``rule_to_string`` and the results are separated by a
    blank line.
    """
    # (heading, rule numbers belonging to that heading), in page order.
    groups = (
        ("Authentication / Authorization", range(1, 7)),
        ("Encryption", range(7, 9)),
        ("Logging", range(9, 13)),
        ("Availability", range(13, 16)),
        ("Service Registry", range(16, 18)),
        ("Secret Management", range(18, 19)),
    )
    blank_line = chr(10) * 2
    chunks = ["## Security Rules"]
    for heading, rule_ids in groups:
        rendered_rules = blank_line.join(
            rule_to_string(info, number, security_rules.get(number)) for number in rule_ids
        )
        chunks.append(f"### {heading}{blank_line}{rendered_rules}")
    return blank_line.join(chunks)
|
|
|
|
def write_file_if_changed(file: Path, content: str, encoding: str = "utf-8"):
    """Write *content* to *file* unless the file already holds exactly that.

    Skipping identical writes keeps timestamps of unchanged pages stable.

    Args:
        file: Destination path. Missing parent directories are created.
        content: Full text the file should contain.
        encoding: Text encoding used for both the comparison and the write.
    """
    if file.exists():
        with file.open('r', encoding=encoding) as f:
            if f.read() == content:
                return
    print(f"Writing changed file: {file}")
    # Callers do not always create the target directory first; opening the
    # file for writing would then fail with FileNotFoundError.
    file.parent.mkdir(parents=True, exist_ok=True)
    with file.open('w', encoding=encoding) as f:
        f.write(content)
|
|
|
|
def write_model_readmes(dataset: Dataset):
    """Generate one Markdown page per model below ``output_path / 'dataset'``.

    For every model the cached repository and owner records are used, or
    fetched from the GitHub API when missing, and the derived ``stars``,
    ``forks``, ``owner_name`` and ``owner_slug`` fields are written back
    into the entry. The page embeds the model source file, statistics,
    diagram links and, when a valid security rules file exists, the
    security rule evaluation.

    Args:
        dataset: Dataset index; its entries are updated in place.

    Raises:
        Exception: If a repository has no owner URL or the owner has no name.
        FileNotFoundError: If a model's ``<model_id>.py`` file is missing.
    """
    readme_dir = output_path / 'dataset'

    for model_id, info in dataset.items():
        readme = readme_dir / f'{model_id}.md'
        slug = info['slug']

        # Repository record: use the cached copy, fetch it otherwise.
        data = info.get('data')
        if not data:
            data = get_repo(slug)
            info['data'] = data
        owner_url = data.get('owner', {}).get('url')
        if not owner_url:
            raise Exception(f'No owner in repo {slug}!')

        # Owner record: use the cached copy, fetch it otherwise.
        owner = info.get('owner')
        if not owner:
            owner = get_json(owner_url)
            info['owner'] = owner
        owner_name = owner.get('name')
        if not owner_name:
            raise Exception(f'No owner name in repo {slug}!')

        stars = data['stargazers_count']
        forks = data['forks']
        owner_slug = owner['login']
        info['stars'] = stars
        info['forks'] = forks
        info['owner_name'] = owner_name
        info['owner_slug'] = owner_slug

        model_path = dataset_path / model_id
        security_rules_file = model_path / 'security_rules.yaml'
        model_file = model_path / f"{model_id}.py"
        # Decode explicitly as UTF-8 (the encoding the pages are written in)
        # instead of depending on the platform's locale encoding.
        with model_file.open("r", encoding="utf-8") as f:
            model = f.read()

        # A missing or invalid rules file only drops the security chapter.
        security_rules = None
        try:
            with security_rules_file.open('r', encoding="utf-8") as f:
                security_rules = check_security_rules(yaml.safe_load(f))
        except FileNotFoundError:
            warning("Security rules file not found at {}".format(security_rules_file))
        except Exception as e:
            warning("Security rules file at {} is invalid: {}".format(security_rules_file, e))

        # parents=True: the output root may not exist yet on a fresh checkout.
        readme_dir.mkdir(parents=True, exist_ok=True)

        tag_list = ', '.join(get_tag_slug(tech) for tech in info['tech'])
        tech_buttons = chr(10).join(
            f'<a class="btn btn-primary" style="margin-bottom: 5px" role="button" href="tag_{get_tag_slug(tech)}.html">{tech}</a>'
            for tech in info['tech']
        )
        security_section = "" if security_rules is None else write_security_rules(info, security_rules)

        write_file_if_changed(readme, f"""---
title: {slug}
keywords: model TODO
tags: [{tag_list}]
sidebar: datasetdoc_sidebar
permalink: {model_id}.html
toc: false
---

## Repository Information

Repository: [GitHub](https://github.com/{slug})

Owner: [{owner_name}](https://github.com/{owner_slug})

The repository has {plural(stars, 'star')} and was forked {plural(forks, 'time')}. The codebase consists of {plural(info['l'], 'line')} of code and makes use of the following technologies:

{tech_buttons}

## Data Flow Diagram

### DFD Model

{{% include note.html content="Download the [model file](../../dataset/{model_id}/{model_id}.py)" %}}

The images below were generated by executing the model file. The DFD is represented as a CodeableModels file.

```python
{model}
```

### Statistics

The Application consists of a total of {plural(info['t'], 'element')}:

Element | Count
-- | --
Services | {info['s']}
External Entities | {info['e']}
Information Flows | {info['i']}
Annotations | {info['a']}
Total Items | {info['t']}

### Diagram

Formats:
- [PlantUML Model](../../dataset/{model_id}/{model_id}/{model_id}.txt)
- [SVG Vector Image](../../dataset/{model_id}/{model_id}/{model_id}.svg)
- [PNG Raster Image](../../dataset/{model_id}/{model_id}/{model_id}.png)

![Data Flow Diagram](../../dataset/{model_id}/{model_id}/{model_id}.svg)

{security_section}
""")
|
|
|
|
def write_root_readme(dataset: Dataset):
    """Generate the landing page (``index.md``) and the models overview page.

    ``index.md`` is written relative to the working directory; the overview
    table goes to ``output_path / 'overview' / 'models.md'`` and lists one
    row per model with its size and popularity figures.

    Args:
        dataset: Dataset index; every entry must already carry ``stars`` and
            ``forks``.
    """
    overview_dir = output_path / 'overview'
    index_file = Path('index.md')

    # NOTE(review): the text below speaks of 17 best-practices while
    # rule_names lists 18 rules — confirm which number is intended.
    write_file_if_changed(index_file, f"""---
title: code2DFD Documentation
keywords: code2DFD introduction
tags: [overview]
sidebar: datasetdoc_sidebar
permalink: index.html
summary: Dataset of dataflow diagrams of microservice applications.
toc: false
---

## DaFD

{{% include image.html file="TUHH_logo-wortmarke_en_rgb.svg" alt="TUHH Logo" max-width="350" %}}
{{% include image.html file="company_logo_big.png" alt="SoftSec Institute Logo" max-width="350" %}}

This is DaFD, a dataset containing Dataflow Diagrams (DFDs) of microservices written in Java. The models correspond to actual implementation code of open-source applications found on GitHub.
The DFDs are presented in multiple formats and contain full traceability of all model items to code, indicating the evidence for their implementation. Additionally to the models themselves, we present a mapping to a list of 17 architectural security best-practices, i.e. a table indicating whether each rule is followed or not. For those that are not followed, we created model variants that do follow the rule. These variants were crafted purely on the model-level and the added items do not correspond to code anymore. All artifacts were created manually by researchers of the Institute of Software Security at Hamburg University of Technology.

## Table of Contents

- [Overview](index.html)
- [Dataflow Diagrams](dfds.html)
- [Use-Cases](usecases.html)
- [Models](models.html)
""")

    models_file = overview_dir / 'models.md'
    # Nothing else creates the overview directory; without it the write
    # below fails on a fresh output tree.
    overview_dir.mkdir(parents=True, exist_ok=True)
    table_rows = chr(10).join(
        f"[{info['slug']}]({model_id}.html) | [GitHub](https://github.com/{info['slug']}) | {info['l']} | {info['stars']} | {info['forks']} | {info['t']} | {len(info['tech'])}"
        for model_id, info in dataset.items()
    )
    write_file_if_changed(models_file, f"""---
title: Models
keywords: dataset models
tags: [overview]
sidebar: datasetdoc_sidebar
permalink: models.html
summary: Dataset of dataflow diagrams of microservice applications.
datatable: true
---

The following table presents the models in this dataset. It shows some properties about their popularity and size of the models. Column `Source` links directly to the corresponding repository on GitHub. If you click on the name of an entry, you will be referred to the model and all artifacts.

Please select a model in column `Name`

<div class="datatable-begin"></div>

Name | Source | LoC | Stars | Forks | DFD Items | Technologies
-- | -- | -- | -- | -- | -- | --
{table_rows}

<div class="datatable-end"></div>
""")
|
|
|
|
def write_tag_readme(dataset: Dataset):
    """Update the allowed-tags data file and write one page per technology.

    The slugs of all technologies used by any model are merged into the
    ``allowed-tags`` list of ``_data/tags.yml`` (other keys of that file are
    preserved), and a ``tag_<slug>.md`` page is written to
    ``output_path / 'tags'`` for each technology.

    Args:
        dataset: Dataset index whose ``tech`` lists define the tags.
    """
    tag_dir = output_path / 'tags'
    known_tech = {tech for model in dataset.values() for tech in model['tech']}

    tags_data_path = Path('_data')
    tags_data_file = tags_data_path / 'tags.yml'
    tags: dict[Any, Any] = {}
    if tags_data_file.exists():
        with tags_data_file.open('r', encoding="utf-8") as f:
            # An empty YAML document loads as None; treat it as "no tags yet".
            tags = yaml.safe_load(f) or {}
    # The directory is needed precisely when the file does not exist yet
    # (it used to be created only when the file was already there).
    tags_data_path.mkdir(exist_ok=True, parents=True)

    allowed = set(tags.get('allowed-tags') or [])
    allowed.update(get_tag_slug(tech) for tech in known_tech)
    tags['allowed-tags'] = sorted(allowed)

    # yaml.dump returns the document as a string when no stream is given.
    write_file_if_changed(tags_data_file, yaml.dump(tags))

    tag_dir.mkdir(exist_ok=True, parents=True)
    # Sorted so pages are written (and reported) in a stable order.
    for tech in sorted(known_tech):
        slug = get_tag_slug(tech)
        info_file = tag_dir / f'tag_{slug}.md'
        write_file_if_changed(info_file, f"""---
title: "{tech}"
tagName: {slug}
search: exclude
permalink: tag_{slug}.html
sidebar: datasetdoc_sidebar
hide_sidebar: true
folder: tags
---
{{% include taglogic.html %}}

{{% include links.html %}}
""")
|
|
|
|
def main():
    """Regenerate all documentation pages from the dataset index.

    Loads the dataset, writes the tag pages, the per-model pages and the
    root/overview pages, and finally saves the dataset back when
    ``update_dataset`` is enabled.
    """
    dataset = open_dataset()
    write_tag_readme(dataset)
    # Model pages go before the overview: write_model_readmes fills in the
    # 'stars' and 'forks' fields that the overview table reads.
    write_model_readmes(dataset)
    write_root_readme(dataset)
    if update_dataset:
        save_dataset(dataset)
|
|
|
|
# Script entry point. (A stray no-op expression statement `yaml.dump` that
# preceded the guard was removed; it had no effect.)
if __name__ == '__main__':
    main()
|