224 lines
7.4 KiB
Python
224 lines
7.4 KiB
Python
import json
import os
from pathlib import Path
from typing import Any

import jsonschema
import requests
import yaml
|
|
# yachalk is an optional dependency: when it is missing, error() and warning()
# fall back to plain-text prefixes instead of coloured output.
try:
    from yachalk import chalk
except ModuleNotFoundError:
    yachalk_imported = False
else:
    yachalk_imported = True
|
|
|
|
# Directory that contains one sub-directory per model.
dataset_path = Path("dataset")
# JSON index of all models; read by open_dataset() and written by save_dataset().
dataset_info = dataset_path.joinpath("dataset.json")
|
|
# GitHub API token used by get_json(). It is taken from the GITHUB_TOKEN
# environment variable so that no credential is stored in the source tree;
# the value is an empty string when the variable is not set.
token = os.environ.get("GITHUB_TOKEN", "")
|
|
|
|
def error(msg: str):
    """Print *msg* as an error.

    With yachalk available the text is printed in red; otherwise it is
    printed with an ``Error: `` prefix.
    """
    text = chalk.red(msg) if yachalk_imported else "Error: {}".format(msg)
    print(text)
|
|
|
|
def warning(msg: str):
    """Print *msg* as a warning.

    With yachalk available the text is printed in yellow; otherwise it is
    printed with a ``Warning: `` prefix.
    """
    text = chalk.yellow(msg) if yachalk_imported else "Warning: {}".format(msg)
    print(text)
|
|
|
|
def open_dataset() -> dict[str, Any]:
    """Load and return the dataset index stored at ``dataset_info``.

    Returns:
        The decoded JSON document.

    Raises:
        FileNotFoundError: if the index file does not exist.
        json.JSONDecodeError: if the file is not valid JSON.
    """
    # JSON is UTF-8 by specification; do not depend on the platform's
    # locale encoding (which is not UTF-8 on every system).
    with open(dataset_info, 'r', encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
def save_dataset(dataset: dict[str, Any]):
    """Write *dataset* to ``dataset_info`` as JSON indented by four spaces.

    Raises:
        TypeError: if *dataset* contains a value that JSON cannot represent;
            in that case the existing file is left untouched.
    """
    # Serialize first: opening with 'w' truncates the file, so a
    # serialization error must surface while the old contents still exist.
    serialized = json.dumps(dataset, indent=4)
    with open(dataset_info, 'w', encoding="utf-8") as f:
        f.write(serialized)
|
|
|
|
def get_json(uri: str, timeout: float = 30.0):
    """Send an HTTP GET to *uri* and return the decoded JSON body.

    The request carries the module-level ``token`` as a bearer token when one
    is configured. The URI and the response object are printed as progress
    output.

    Args:
        uri: Absolute URL to fetch.
        timeout: Seconds to wait for the server before giving up.

    Returns:
        The parsed JSON payload of a successful response.

    Raises:
        Exception: if the response status is not OK; the message contains the
            ``message`` field of the JSON error body when there is one, and
            the raw response text otherwise.
        requests.exceptions.RequestException: on connection errors/timeouts.
    """
    print(uri)
    # Skip the header entirely when no token is configured instead of
    # sending a malformed "Bearer " value.
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    # Without a timeout requests waits indefinitely on a stalled connection.
    resp = requests.get(url=uri, headers=headers, timeout=timeout)
    print(resp)
    if not resp.ok:
        try:
            resp_error = resp.json()['message']
        except (ValueError, KeyError, TypeError):
            # Body is not JSON, or is JSON without a "message" entry.
            resp_error = resp.text
        raise Exception(f"Invalid response: {resp_error}")
    return resp.json()
|
|
|
|
def get_repo(slug: str):
    """Return the GitHub API document for the repository *slug*.

    *slug* is appended unchanged to ``https://api.github.com/repos/``.
    """
    endpoint = "https://api.github.com/repos/" + slug
    return get_json(endpoint)
|
|
|
|
def get_user(name: str):
    """Return the GitHub API document for the user *name*.

    *name* is appended unchanged to ``https://api.github.com/users/``.
    """
    endpoint = "https://api.github.com/users/" + name
    return get_json(endpoint)
|
|
|
|
def get_file(slug: str, path: str):
    """Return the GitHub contents-API document for *path* in repository *slug*.

    Both values are inserted into the URL as given (no URL quoting is applied).
    """
    endpoint = "https://api.github.com/repos/{}/contents/{}".format(slug, path)
    return get_json(endpoint)
|
|
|
|
def plural(amount: int, name: str, plural: str = 's'):
    """Return ``"<amount> <name>"`` with the plural suffix unless amount is 1.

    Args:
        amount: The count to display.
        name: Singular form of the noun.
        plural: Suffix appended for every amount other than 1.

    Returns:
        For example ``"1 star"``, ``"0 stars"`` or ``"2 boxes"``.
    """
    # The whole suffix is appended, so multi-character suffixes such as
    # "es" are never truncated.
    suffix = '' if amount == 1 else plural
    return f"{amount} {name}{suffix}"
|
|
|
|
from typing import NamedTuple
|
|
|
|
class Artifact(NamedTuple):
    """A file reference made of a file name and a list of line numbers."""

    # Mirrors the "file" string of an artifact entry in the rule schema.
    file: str
    # Mirrors the "lines" integer array of an artifact entry.
    lines: list[int]
|
|
|
|
class SecurityRule(NamedTuple):
    """One security-rule entry: a status, an argument and optional artifacts."""

    # Status string of the rule.
    status: str
    # Free-text argument given for the status.
    argument: str
    # Artifacts attached to the rule, or None.
    artifacts: list[Artifact] | None
|
|
|
|
# JSON Schema that each individual security-rule entry is validated against
# (see check_security_rules). Only "status" and "argument" are mandatory and
# no properties beyond the three listed ones are accepted.
rule_schema = {
    "type": "object",
    "additionalProperties": False,
    "required": ["status", "argument"],
    "properties": {
        "status": {
            "type": "string",
            "enum": ["disregarded", "not applicable", "unknown"],
        },
        "argument": {"type": "string"},
        "artifacts": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "file": {"type": "string"},
                    "lines": {
                        "type": "array",
                        "items": {"type": "integer"},
                    },
                },
            },
        },
    },
}
|
|
|
|
def check_security_rules(security_rules: dict[Any, Any]) -> dict[int, SecurityRule]:
    """Validate the entries for rules 1 to 18 against ``rule_schema``.

    Validation stops at the first entry that violates the schema: the problem
    is reported through error() and warning(), and later rules are not
    checked.

    Args:
        security_rules: Mapping from rule number to the rule's entry.

    Returns:
        The *security_rules* mapping that was passed in, unchanged (entries
        are not converted to ``SecurityRule`` instances).

    Raises:
        Exception: if a rule number between 1 and 18 has no entry.
    """
    for n in range(1, 19):
        rule = security_rules.get(n)
        if rule is None:
            raise Exception('No result for rule {}'.format(n))
        try:
            jsonschema.validate(rule, rule_schema)
        except jsonschema.ValidationError as e:
            # e.json_path is already rooted at "$" (e.g. "$.status"); drop
            # that root so the rule number can be spliced in: "$.3.status".
            path = "$.{n}{rest}".format(n=n, rest=e.json_path[1:])
            error("Security rule {n}: {msg} at {path}".format(n=n, msg=e.message, path=path))
            warning("Not checking further rules!")
            break
    return security_rules
|
|
|
|
# Module-level switch; nothing in the visible code reads it.
# NOTE(review): confirm whether this flag is still needed.
update_dataset: bool = False
|
|
|
|
def get_name(slug: str):
    """Return the part of *slug* after its first ``/``.

    A slug without any ``/`` is returned unchanged.
    """
    _owner, separator, remainder = slug.partition('/')
    return remainder if separator else slug
|
|
|
|
def _fetch_repo_metadata(info: dict[str, Any]) -> None:
    """Populate *info* with repository and owner data, reusing cached copies."""
    slug: str = info['slug']

    # The raw API documents are cached in the entry itself so that later runs
    # do not have to query GitHub again.
    data = info.get('data')
    if not data:
        data = get_repo(slug)
        info['data'] = data

    owner_url = data.get('owner', {}).get('url')
    if not owner_url:
        raise Exception(f'No owner in repo {slug}!')

    owner = info.get('owner')
    if not owner:
        owner = get_json(owner_url)
        info['owner'] = owner

    owner_name = owner.get('name')
    if not owner_name:
        raise Exception(f'No owner name in repo {slug}!')

    # Read every value before storing any of them, so a missing key leaves
    # the derived fields of the entry untouched.
    stars = data['stargazers_count']
    forks = data['forks']
    owner_slug = owner['login']

    info['stars'] = stars
    info['forks'] = forks
    info['owner_name'] = owner_name
    info['owner_slug'] = owner_slug


def _validate_security_rules(rules_file: Path) -> None:
    """Validate the security rules in *rules_file*; only warn if it is missing."""
    try:
        with open(rules_file, 'r', encoding="utf-8") as f:
            loaded = yaml.safe_load(f)
    except FileNotFoundError:
        warning("Security rules file not found at {}".format(rules_file))
        return
    # An empty YAML document loads as None; pass an empty mapping instead so
    # that validation reports the missing rule rather than failing on None.
    check_security_rules(loaded if loaded is not None else {})


def _render_model_readme(model_id: str, info: dict[str, Any]) -> str:
    """Return the markdown text of the README for one model."""
    slug = info['slug']
    technologies = '\n'.join(f'- {tech}' for tech in info['tech'])
    return f"""# {slug}

## Repository Information

Repository: [GitHub](https://github.com/{slug})

Owner: [{info['owner_name']}](https://github.com/{info['owner_slug']})

The repository has {plural(info['stars'], 'star')} and was forked {plural(info['forks'], 'time')}. The codebase consists of {plural(info['l'], 'line')} of code and makes use of the following technologies:

{technologies}

## Data Flow Diagram

### Statistics

The Application consists of a total of {plural(info['t'], 'element')}:

Element | Count
-- | --
Services | {info['s']}
External Entities | {info['e']}
Information Flows | {info['i']}
Annotations | {info['a']}
Total Items | {info['t']}

### Diagram

The below diagram is generated from the corresponding [model file]({model_id}.py).

Formats:
- [PlantUML Model]({model_id}/{model_id}.txt)
- [SVG Vector Image]({model_id}/{model_id}.svg)
- [PNG Raster Image]({model_id}/{model_id}.png)

![Data Flow Diagram]({model_id}/{model_id}.svg)"""


def write_model_readmes(dataset: dict[str, Any]):
    """Write ``README.md`` into the directory of every model in *dataset*.

    For each entry the function
    1. fetches the repository and owner documents from GitHub unless they are
       already cached under ``data`` / ``owner``, and stores ``stars``,
       ``forks``, ``owner_name`` and ``owner_slug`` in the entry,
    2. validates ``security_rules.yaml`` in the model directory (a missing
       file only produces a warning),
    3. renders the README and writes it as UTF-8.

    Args:
        dataset: Mapping from model id to that model's entry. Entries are
            modified in place.

    Raises:
        Exception: if the repository has no owner URL, the owner has no
            name, or a security rule is missing; also when a GitHub request
            fails.
        KeyError: if an entry lacks one of the keys used by the template.
    """
    for model_id, info in dataset.items():
        model_dir = dataset_path / model_id
        readme = model_dir / 'README.md'

        _fetch_repo_metadata(info)
        _validate_security_rules(model_dir / 'security_rules.yaml')

        print(f"Writing readme file {readme}")
        # Render before opening the file: opening with 'w' truncates it, so a
        # failure while rendering must not leave an empty README behind.
        content = _render_model_readme(model_id, info)
        with open(readme, 'w', encoding="utf-8") as f:
            f.write(content)
|
|
|
|
def write_root_readme(dataset: dict[str, Any]):
    """Write the top-level ``README.md`` into the current working directory.

    The file holds a fixed introduction, a table with one row per entry of
    *dataset* and fixed closing sections. Each entry must provide the keys
    ``slug``, ``l``, ``stars``, ``forks``, ``t`` and ``tech``.
    """
    print("Writing main readme file")
    with open('README.md', 'w', encoding="utf-8") as readme:
        # One table row per model, in the iteration order of the dataset.
        rows = []
        for model_id, info in dataset.items():
            slug = info['slug']
            rows.append(
                f"[{slug}](dataset/{model_id}/README.md) | [GitHub](https://github.com/{slug}) | "
                f"{info['l']} | {info['stars']} | {info['forks']} | {info['t']} | {len(info['tech'])}"
            )
        table_body = "\n".join(rows)

        readme.write(f"""# Dataset of Dataflow Diagrams

This repository contains of 17 manually created dataflow diagrams (DFDs) of microservice applications found on GitHub. The dataset is published as an additional contribution to "Automatic Extraction of Security-Rich Dataflow Diagrams for Microservice Applications written in Java" [Simon Schneider, Riccardo Scandariato]. Each folder in the [`dataset`](dataset/) directory contains one DFD in a [CodeableModels](https://github.com/uzdun/CodeableModels)-format that can be executed to generate PNG, SVG and TXT files for the DFD. Each model refers to stereotypes and metaclasses from the [metamodel](microservice_dfds_metamodel.py) which needs to be imported. This repository already contains rendered versions for each model, thus setup and rendering is only necessary once changes to the models are made.

## Models

<div class="datatable-begin"></div>

Name | Source | LoC | Stars | Forks | DFD Items | Technologies
-- | -- | -- | -- | -- | -- | --
{table_body}

<div class="datatable-end"></div>

## DFD Items

Do culpa deserunt est excepteur amet. Non pariatur ea elit ad eiusmod veniam exercitation nulla. Commodo do adipisicing amet et. Voluptate laboris commodo dolor eu mollit ipsum. Amet reprehenderit velit eu culpa amet exercitation. Elit esse ullamco duis mollit quis. Eiusmod qui reprehenderit sunt cupidatat Lorem anim occaecat enim sint eiusmod tempor.

## Use-Cases

Veniam culpa nostrud id laborum deserunt consectetur consectetur voluptate. Sint aute cupidatat velit irure elit laboris anim labore esse labore. Quis ullamco ut consequat amet. Enim sit laboris deserunt veniam duis aliqua irure proident.
""")
|
|
|
|
def main():
    """Regenerate all README files and store the updated dataset index.

    Steps: load the index, write the per-model READMEs, write the root
    README, save the index.
    """
    dataset = open_dataset()
    # The per-model step must run first: it stores the 'stars' and 'forks'
    # values in each entry, and the root README table reads exactly those.
    write_model_readmes(dataset)
    write_root_readme(dataset)
    save_dataset(dataset)
|
|
|
|
# Run the generator only when this file is executed as a script.
if __name__ == "__main__":
    main()
|