2022-11-22 16:26:31 +01:00
import itertools
import json
import os
from pathlib import Path
from typing import Any, List, NotRequired, Optional, TypedDict

import jsonschema
import requests
import yaml
2022-11-22 16:27:23 +01:00
try :
from yachalk import chalk
yachalk_imported = True
except ModuleNotFoundError :
yachalk_imported = False
2022-11-22 16:26:31 +01:00
# Root folder of the input dataset and root folder of the generated pages.
dataset_path = Path('dataset')
output_path = Path('pages')
# JSON index describing every model of the dataset.
dataset_info = dataset_path / 'dataset.json'

# GitHub API token used for the "Authorization: Bearer ..." header.
# Credentials must never be committed to source control, so the value is read
# from the GITHUB_TOKEN environment variable; an empty string means "no token".
token = os.environ.get("GITHUB_TOKEN", "")
2022-11-22 16:30:07 +01:00
def error(msg: str) -> Exception:
    """Print *msg* as an error and return an exception carrying it.

    The message is printed in red when ``yachalk`` could be imported and with
    an ``Error: `` prefix otherwise.  The exception is returned, not raised,
    so the caller decides whether to ``raise`` it.
    """
    if yachalk_imported:
        rendered = chalk.red(msg)
    else:
        rendered = f"Error: {msg}"
    print(rendered)
    return Exception(msg)
2022-11-22 16:27:23 +01:00
def warning(msg: str):
    """Print *msg* as a warning.

    The message is printed in yellow when ``yachalk`` could be imported and
    with a ``Warning: `` prefix otherwise.
    """
    if yachalk_imported:
        rendered = chalk.yellow(msg)
    else:
        rendered = f"Warning: {msg}"
    print(rendered)
2022-11-22 16:27:23 +01:00
2022-11-23 13:00:38 +01:00
class License(TypedDict):
    """License summary embedded in a GitHub repository payload."""

    key: str
    name: str
    # GitHub reports null here for licenses it cannot map to an SPDX entry.
    spdx_id: Optional[str]
    # Null when the license has no page in the GitHub licenses API.
    url: Optional[str]
    node_id: str
class Permissions ( TypedDict ) :
admin : bool
maintain : bool
push : bool
triage : bool
pull : bool
class Owner ( TypedDict ) :
login : str
id : int
node_id : str
avatar_url : str
gravatar_id : str
url : str
html_url : str
followers_url : str
following_url : str
gists_url : str
starred_url : str
subscriptions_url : str
organizations_url : str
repos_url : str
events_url : str
received_events_url : str
type : str
site_admin : bool
name : NotRequired [ str ]
company : NotRequired [ Optional [ str ] ]
blog : NotRequired [ str ]
location : NotRequired [ Optional [ str ] ]
email : NotRequired [ Optional [ str ] ]
hireable : NotRequired [ Optional [ bool ] ]
bio : NotRequired [ Optional [ str ] ]
twitter_username : NotRequired [ Optional [ str ] ]
public_repos : NotRequired [ int ]
public_gists : NotRequired [ int ]
followers : NotRequired [ int ]
following : NotRequired [ int ]
created_at : NotRequired [ str ]
updated_at : NotRequired [ str ]
class GithubRepositoryInformation(TypedDict):
    """Repository object as returned by ``GET /repos/{owner}/{repo}``."""

    id: int
    node_id: str
    name: str
    full_name: str
    private: bool
    owner: Owner
    html_url: str
    description: Optional[str]
    fork: bool
    url: str
    forks_url: str
    keys_url: str
    collaborators_url: str
    teams_url: str
    hooks_url: str
    issue_events_url: str
    events_url: str
    assignees_url: str
    branches_url: str
    tags_url: str
    blobs_url: str
    git_tags_url: str
    git_refs_url: str
    trees_url: str
    statuses_url: str
    languages_url: str
    stargazers_url: str
    contributors_url: str
    subscribers_url: str
    subscription_url: str
    commits_url: str
    git_commits_url: str
    comments_url: str
    issue_comment_url: str
    contents_url: str
    compare_url: str
    merges_url: str
    archive_url: str
    downloads_url: str
    issues_url: str
    pulls_url: str
    milestones_url: str
    notifications_url: str
    labels_url: str
    releases_url: str
    deployments_url: str
    created_at: str
    updated_at: str
    pushed_at: str
    git_url: str
    ssh_url: str
    clone_url: str
    svn_url: str
    homepage: Optional[str]
    size: int
    stargazers_count: int
    watchers_count: int
    # Null for repositories in which GitHub detects no language.
    language: Optional[str]
    has_issues: bool
    has_projects: bool
    has_downloads: bool
    has_wiki: bool
    has_pages: bool
    forks_count: int
    # Null unless the repository is a mirror; then it is the mirrored URL.
    mirror_url: Optional[str]
    archived: bool
    disabled: bool
    open_issues_count: int
    license: Optional[License]
    allow_forking: bool
    is_template: bool
    web_commit_signoff_required: bool
    topics: List[str]
    visibility: str
    forks: int
    open_issues: int
    watchers: int
    default_branch: str
    # Both keys depend on the credentials used for the request.
    permissions: NotRequired[Permissions]
    temp_clone_token: NotRequired[Optional[str]]
    organization: NotRequired[Owner]
    network_count: int
    subscribers_count: int
class ModelInformation(TypedDict):
    """One entry of the dataset index (``dataset.json``)."""

    title: NotRequired[str]
    # GitHub "owner/name" slug of the modelled repository.
    slug: str
    # Branch used for links into the repository; 'master' is assumed if absent.
    branch: NotRequired[str]
    # Cached GitHub payloads.  write_model_readmes() fetches and stores them
    # when they are missing, so they are not guaranteed to be present.
    data: NotRequired[GithubRepositoryInformation]
    owner: NotRequired[Owner]
    # Values derived from the cached payloads by write_model_readmes().
    stars: int
    forks: int
    owner_name: str
    owner_slug: str
    # DFD statistics, as labelled in the generated statistics table.
    s: int  # services
    e: int  # external entities
    i: int  # information flows
    a: int  # annotations
    t: int  # total items
    l: int  # lines of code of the modelled application
    # Names of the technologies used by the application.
    tech: List[str]


# The dataset index maps a model id to its information record.
Dataset = dict[str, ModelInformation]
def open_dataset() -> Dataset:
    """Load and return the dataset index stored at ``dataset_info``.

    The file is decoded as UTF-8 explicitly so that the result does not
    depend on the platform's default encoding.

    Raises:
        OSError: if the index file cannot be opened.
        json.JSONDecodeError: if the file does not contain valid JSON.
    """
    with open(dataset_info, 'r', encoding='utf-8') as f:
        return json.load(f)
2022-11-23 13:00:38 +01:00
def save_dataset(dataset: Dataset):
    """Write *dataset* back to ``dataset_info`` as JSON indented by 4 spaces.

    The file is written as UTF-8 explicitly, matching the other writers in
    this module instead of relying on the platform's default encoding.
    """
    with open(dataset_info, 'w', encoding='utf-8') as f:
        json.dump(dataset, f, indent=4)
def get_json(uri: str, timeout: float = 30.0):
    """GET *uri* and return the decoded JSON body.

    Args:
        uri: Absolute URL to request.
        timeout: Seconds to wait for the server before giving up.  Without a
            timeout ``requests`` may block forever on a stalled connection.

    Returns:
        The parsed JSON document of the response.

    Raises:
        Exception: if the server answers with an error status; the message
            holds the ``message`` field of the JSON error body, or the raw
            response text when there is no such field.
        requests.RequestException: on connection problems or timeouts.
    """
    print(uri)
    # Only authenticate when a token is configured; an empty bearer token
    # would turn an otherwise valid anonymous request into an error.
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    resp = requests.get(url=uri, headers=headers, timeout=timeout)
    print(resp)
    if not resp.ok:
        try:
            resp_error = resp.json()['message']
        except (ValueError, KeyError, TypeError):
            # Body is not JSON, or is JSON without a "message" entry.
            resp_error = resp.text
        raise Exception(f"Invalid response: {resp_error}")
    return resp.json()
def get_repo(slug: str):
    """Fetch the GitHub API description of the repository *slug* (``owner/name``)."""
    endpoint = "https://api.github.com/repos/{}".format(slug)
    return get_json(endpoint)
def get_user(name: str):
    """Fetch the GitHub API description of the user with login *name*."""
    endpoint = "https://api.github.com/users/{}".format(name)
    return get_json(endpoint)
def get_file(slug: str, path: str):
    """Fetch the GitHub "contents" API entry for *path* in repository *slug*.

    Args:
        slug: Repository in ``owner/name`` form.
        path: Path of the file inside the repository, using ``/`` separators.

    The path is percent-encoded (slashes are kept) so that names containing
    spaces, ``#`` or ``?`` do not corrupt the request URL.
    """
    from urllib.parse import quote

    return get_json(f"https://api.github.com/repos/{slug}/contents/{quote(path)}")
def plural(amount: int, name: str, plural: str = 's'):
    """Format a count together with its noun, e.g. ``plural(2, 'star')``.

    Args:
        amount: The count to display.
        name: Singular form of the noun.
        plural: Suffix appended to *name* whenever *amount* is not exactly 1.

    Returns:
        ``"<amount> <name>"`` for an amount of 1, otherwise
        ``"<amount> <name><plural>"``.
    """
    # An explicit comparison instead of slicing with ``amount ^ 1``: the XOR
    # trick cut multi-character suffixes short for 0 and broke on negatives.
    suffix = '' if amount == 1 else plural
    return f"{amount} {name}{suffix}"
2022-11-22 16:31:20 +01:00
from typing import TypedDict
2022-11-22 16:27:23 +01:00
2022-11-22 16:31:20 +01:00
class Artifact(TypedDict):
    """Evidence for a security rule: a file and the relevant line numbers."""

    # Path of the file inside the analysed repository.
    file: str
    # Line numbers within that file.
    lines: list[int]
2022-11-22 16:31:20 +01:00
class SecurityRule(TypedDict):
    """Evaluation of a single security rule for one model."""

    # Outcome of the evaluation; the allowed values are listed in rule_schema.
    status: str
    # Free-text justification of the status.
    argument: str
    # Optional code locations backing the evaluation.
    artifacts: NotRequired[list[Artifact]]
2022-11-22 16:27:23 +01:00
# JSON schema every entry of a security_rules.yaml file has to satisfy.
# It is written as a plain dict; the values equal what the YAML form of the
# schema parses to (``additionalProperties: no`` is the boolean False).
rule_schema = {
    "type": "object",
    "additionalProperties": False,
    "required": ["status", "argument"],
    "properties": {
        "status": {
            "type": "string",
            "enum": ["disregarded", "observed", "not applicable", "unknown"],
        },
        "argument": {"type": "string"},
        "artifacts": {
            "type": "array",
            "items": {
                "type": "object",
                # The renderer indexes both keys unconditionally, so an
                # artifact lacking one must be rejected during validation
                # instead of failing later with a KeyError.
                "required": ["file", "lines"],
                "properties": {
                    "file": {"type": "string"},
                    "lines": {
                        "type": "array",
                        "items": {"type": "integer"},
                    },
                },
            },
        },
    },
}
2022-11-22 16:30:44 +01:00
def check_security_rules(security_rules: dict[Any, Any] | None) -> dict[int, SecurityRule]:
    """Validate the parsed content of a security rules file.

    Rules 1 to 18 must all be present and each must satisfy ``rule_schema``.
    A rule whose status is ``unknown`` is valid but triggers a warning.

    Args:
        security_rules: Parsed YAML document, or ``None`` for an empty file.

    Returns:
        The rules as a new dict ordered by rule number.

    Raises:
        Exception: if the document is empty, is not a mapping, or if a rule
            is missing or violates the schema.  Validation stops at the first
            offending rule; the original ``ValidationError`` is chained.
    """
    if security_rules is None:
        raise Exception("Security rules file is empty!")
    if not isinstance(security_rules, dict):
        # A list or scalar document would otherwise fail with an opaque
        # AttributeError on ``.get`` below.
        raise Exception("Security rules file does not contain a mapping of rules!")

    for n in range(1, 19):
        try:
            rule = security_rules.get(n)
            if rule is None:
                raise jsonschema.ValidationError(f"Rule {n} is not evaluated")
            jsonschema.validate(rule, rule_schema)
            if rule["status"] == "unknown":
                warning(f"Rule {n} is still unknown!")
        except jsonschema.ValidationError as e:
            warning("Not checking further rules!")
            # json_path is rooted at "$" relative to the rule; drop that root
            # so the reported location reads "$.<n>.<path>" and not
            # "$.<n>.$.<path>".
            location = e.json_path.removeprefix("$")
            raise Exception(f"Security rule {n}: {e.message} at $.{n}{location}") from e

    return dict(sorted(security_rules.items()))
2022-11-22 16:27:23 +01:00
2022-11-22 16:26:31 +01:00
# When true, main() writes the dataset (including the values that
# write_model_readmes() stores in each entry) back via save_dataset().
update_dataset = False
def get_name(slug: str):
    """Return the part of *slug* after its first ``/``.

    A slug without any ``/`` is returned unchanged.
    """
    pieces = slug.split('/', 1)
    return pieces[-1]
2022-11-22 16:30:07 +01:00
def get_tag_slug(tag: str) -> str:
    """Turn a technology name into a tag slug: lower case, spaces as ``_``."""
    lowered = tag.lower()
    return '_'.join(lowered.split(' '))
2022-11-23 13:00:38 +01:00
# Display names of the security rules, keyed by rule number.  The names are
# used as headings in the generated model pages.
rule_names = {
    1: "API Gateway",
    2: "Mutual Authentication",
    3: "Decoupled Authentication",
    4: "Internal Identity Representation",  # fixed typo: was "Represenation"
    5: "Authentication Token Validation",
    6: "Login Rate Limiting",
    7: "Edge Encryption",
    8: "Internal Encryption",
    9: "Central Logging Subsystem",
    10: "Local Logging Agent",
    11: "Log Sanitization",
    12: "Log Message Broker",
    13: "Circuit Breaker",
    14: "Load Balancing",
    15: "Service Mesh Usage Limits",
    16: "Service Registry Deployment",
    17: "Service Registry Validation",
    18: "Secret Manager",
}
# Render one artifact as a Markdown list item: the base name of the file,
# followed by one link per listed line that points into the file on GitHub.
#   info     -- model entry; supplies the repository slug and, optionally, the branch
#   artifact -- the file path and line numbers to link to
# Returns the rendered item as a string.
def artifact_to_string ( info : ModelInformation , artifact : Artifact ) :
file = Path ( artifact [ ' file ' ] )
filename = file . name
# Links use the branch stored in info and fall back to 'master' when none is set.
file_url = f " https://github.com/ { info [ ' slug ' ] } /blob/ { info . get ( ' branch ' , ' master ' ) } / { artifact [ ' file ' ] } "
# The slice bound len(lines) ^ 1 is 0 only when exactly one line is listed, which
# drops the plural suffix after "Line" in that case and keeps it otherwise.
# NOTE(review): the file path is inserted into the URL unescaped -- confirm paths never need quoting.
return f " - { filename } : Line { ' s ' [ : len ( artifact [ ' lines ' ] ) ^ 1 ] } : { ' , ' . join ( f ' [ { line } ]( { file_url } #L { line } ) ' for line in artifact [ ' lines ' ] ) } "
# Render the evaluation of one security rule as a Markdown fragment.
#   info -- model entry, passed on to artifact_to_string() for link building
#   id   -- rule number; used for the heading and to look up the name in rule_names
#   rule -- the evaluated rule, or None when the model has no entry for this rule
# Returns a heading with status and argument, followed by an "Artifacts" list when
# the rule has at least one artifact. For a missing rule only the placeholder string
# of the early return is produced (the warning for that case is still disabled).
def rule_to_string ( info : ModelInformation , id : int , rule : SecurityRule | None ) :
if rule is None :
# warning(f"Rule {id} is missing!") # TODO Enable warning
return " "
text = f """ #### Rule { id } : { rule_names [ id ] }
This rule is { rule [ ' status ' ] } : { rule [ ' argument ' ] } """
# "artifacts" is optional in a rule; treat a missing key like an empty list.
artifacts = rule . get ( " artifacts " , [ ] )
if len ( artifacts ) > 0 :
text = text + f """
Artifacts :
{ chr ( 10 ) . join ( artifact_to_string ( info , artifact ) for artifact in artifacts ) } """
return text
# Build the "Security Rules" section of a model page.
#   info           -- model entry, passed on to rule_to_string()
#   security_rules -- evaluated rules keyed by rule number; may be empty
# Rules are grouped under fixed headings by number: 1-6, 7-8, 9-12, 13-15, 16-17
# and 18. Within a group the rendered rules are joined by chr(10) * 2, i.e. two
# newline characters. Rules missing from security_rules are passed to
# rule_to_string() as None.
# Returns the whole section as one string; despite the name nothing is written here.
def write_security_rules ( info : ModelInformation , security_rules : dict [ int , SecurityRule ] ) :
return f """ ## Security Rules
### Authentication / Authorization
{ ( chr ( 10 ) * 2 ) . join ( rule_to_string ( info , i , security_rules . get ( i ) ) for i in range ( 1 , 7 ) ) }
### Encryption
{ ( chr ( 10 ) * 2 ) . join ( rule_to_string ( info , i , security_rules . get ( i ) ) for i in range ( 7 , 9 ) ) }
### Logging
{ ( chr ( 10 ) * 2 ) . join ( rule_to_string ( info , i , security_rules . get ( i ) ) for i in range ( 9 , 13 ) ) }
### Availability
{ ( chr ( 10 ) * 2 ) . join ( rule_to_string ( info , i , security_rules . get ( i ) ) for i in range ( 13 , 16 ) ) }
### Service Registry
{ ( chr ( 10 ) * 2 ) . join ( rule_to_string ( info , i , security_rules . get ( i ) ) for i in range ( 16 , 18 ) ) }
### Secret Management
{ ( chr ( 10 ) * 2 ) . join ( rule_to_string ( info , i , security_rules . get ( i ) ) for i in range ( 18 , 19 ) ) } """
# Write one Markdown page per model of the dataset.
#   dataset -- mapping of model id to model entry; the entries are modified in place
# For every model the function
#   1. makes sure the GitHub repository and owner payloads are stored in the entry,
#      requesting them from the API when they are missing,
#   2. stores stars, forks, owner name and owner login in the entry,
#   3. reads the model source file and the security rules of the model,
#   4. writes the page <model_id>.md into the 'dataset' folder below output_path.
# Raises Exception when the repository payload has no owner URL or the owner has no name.
def write_model_readmes ( dataset : Dataset ) :
2022-11-22 16:26:31 +01:00
for model_id , info in dataset . items ( ) :
2022-11-22 16:30:07 +01:00
dir = output_path / ' dataset '
readme = dir / f ' { model_id } .md '
2022-11-23 13:00:38 +01:00
slug = info [ ' slug ' ]
2022-11-22 16:26:31 +01:00
# Use the repository payload cached in the entry; fetch and cache it when absent.
data = info . get ( ' data ' )
if not data :
data = get_repo ( slug )
info [ ' data ' ] = data
owner_url = data . get ( ' owner ' , { } ) . get ( ' url ' )
if not owner_url :
raise Exception ( f ' No owner in repo { slug } ! ' )
# Same caching scheme for the owner payload, requested from the owner's API URL.
owner = info . get ( ' owner ' )
if not owner :
owner = get_json ( owner_url )
info [ ' owner ' ] = owner
owner_name = owner . get ( ' name ' )
if not owner_name :
raise Exception ( f ' No owner name in repo { slug } ! ' )
# Copy the values shown on the pages into the entry itself.
stars = data [ ' stargazers_count ' ]
forks = data [ ' forks ' ]
owner_slug = owner [ ' login ' ]
info [ ' stars ' ] = stars
info [ ' forks ' ] = forks
info [ ' owner_name ' ] = owner_name
info [ ' owner_slug ' ] = owner_slug
2022-11-23 13:07:21 +01:00
# Inputs of the model live in a folder named after the model id below dataset_path.
model_path = dataset_path / model_id
security_rules_file = model_path / ' security_rules.yaml '
model_file = model_path / f " { model_id } .py "
# NOTE(review): the model file is read with the platform default encoding while the
# page is written as UTF-8 -- confirm the model files are plain ASCII or pass an encoding.
with model_file . open ( " r " ) as f :
model = f . read ( )
2022-11-22 16:27:23 +01:00
# A missing or invalid rules file is not fatal: a warning is printed and the page
# is generated with an empty rule set.
try :
2022-11-23 13:07:21 +01:00
with security_rules_file . open ( ' r ' ) as f :
2022-11-22 16:27:23 +01:00
security_rules = yaml . safe_load ( f )
security_rules = check_security_rules ( security_rules )
except FileNotFoundError :
warning ( " Security rules file not found at {} " . format ( security_rules_file ) )
security_rules = { }
2022-11-22 16:30:44 +01:00
except Exception as e :
warning ( " Security rules file at {} is invalid: {} " . format ( security_rules_file , e ) )
security_rules = { }
2022-11-22 16:26:31 +01:00
print ( f " Writing readme file { readme } " )
2022-11-22 16:30:07 +01:00
# NOTE(review): mkdir is called without parents=True, so output_path itself has to
# exist already -- write_tag_readme() passes parents=True for its folders.
dir . mkdir ( exist_ok = True )
2022-11-23 13:07:21 +01:00
with readme . open ( ' w ' , encoding = " utf-8 " ) as f :
2022-11-22 16:30:07 +01:00
f . write ( f """ ---
title : { slug }
keywords : model TODO
tags : [ { ' , ' . join ( get_tag_slug ( tech ) for tech in info [ ' tech ' ] ) } ]
sidebar : datasetdoc_sidebar
permalink : { model_id } . html
2022-11-23 13:00:38 +01:00
toc : false
2022-11-22 16:30:07 +01:00
- - -
2022-11-22 16:26:31 +01:00
## Repository Information
Repository : [ GitHub ] ( https : / / github . com / { slug } )
Owner : [ { owner_name } ] ( https : / / github . com / { owner_slug } )
The repository has { plural ( stars , ' star ' ) } and was forked { plural ( forks , ' time ' ) } . The codebase consists of { plural ( info [ ' l ' ] , ' line ' ) } of code and makes use of the following technologies :
2022-11-23 11:48:09 +01:00
{ chr ( 10 ) . join ( f ' <a class= " btn btn-primary " style= " margin-bottom: 5px " role= " button " href= " tag_ { get_tag_slug ( tech ) } .html " > { tech } </a> ' for tech in info [ ' tech ' ] ) }
2022-11-22 16:26:31 +01:00
## Data Flow Diagram
2022-11-23 11:48:09 +01:00
### DFD Model
{ { % include note . html content = " Download the [model file](../../dataset/ {model_id} / {model_id} .py) " % } }
The images below were generated by executing the model file . The DFD is represented as a CodeableModels file .
2022-11-23 13:07:21 +01:00
` ` ` python
{ model }
` ` `
2022-11-22 16:26:31 +01:00
### Statistics
The Application consists of a total of { plural ( info [ ' t ' ] , ' element ' ) } :
Element | Count
- - | - -
Services | { info [ ' s ' ] }
External Entities | { info [ ' e ' ] }
Information Flows | { info [ ' i ' ] }
Annotations | { info [ ' a ' ] }
Total Items | { info [ ' t ' ] }
### Diagram
Formats :
2022-11-22 16:30:44 +01:00
- [ PlantUML Model ] ( . . / . . / dataset / { model_id } / { model_id } / { model_id } . txt )
- [ SVG Vector Image ] ( . . / . . / dataset / { model_id } / { model_id } / { model_id } . svg )
- [ PNG Raster Image ] ( . . / . . / dataset / { model_id } / { model_id } / { model_id } . png )
2022-11-22 16:26:31 +01:00
2022-11-23 11:49:26 +01:00
! [ Data Flow Diagram ] ( . . / . . / dataset / { model_id } / { model_id } / { model_id } . svg )
2022-11-23 13:00:38 +01:00
{ write_security_rules ( info , security_rules ) }
2022-11-23 11:49:26 +01:00
""" )
2022-11-22 16:26:31 +01:00
2022-11-23 13:00:38 +01:00
# Write the two top-level pages: the landing page (index.md) and the dataset
# overview (dataset.md). Both paths are relative, so the files are created in
# the current working directory, not below output_path.
#   dataset -- mapping of model id to model entry; used for the model count and
#              for one table row per model on the overview page
# NOTE(review): the table reads info['stars'] and info['forks'], which
# write_model_readmes() assigns, but main() calls this function first --
# presumably the values are already stored in the dataset file; confirm.
def write_root_readme ( dataset : Dataset ) :
2022-11-22 16:26:31 +01:00
print ( f " Writing main readme file " )
2022-11-22 16:30:07 +01:00
# Landing page: static text; the f-string contains no values taken from the dataset.
with open ( ' index.md ' , ' w ' , encoding = " utf-8 " ) as f :
f . write ( f """ ---
2022-11-23 11:48:09 +01:00
title : code2DFD Documentation
keywords : code2DFD introduction
tags : [ ]
2022-11-22 16:30:07 +01:00
sidebar : datasetdoc_sidebar
permalink : index . html
summary : Dataset of dataflow diagrams of microservice applications .
- - -
2022-11-23 11:48:09 +01:00
# code2DFD
{ { % include image . html file = " TUHH_logo-wortmarke_en_rgb.svg " alt = " TUHH Logo " max - width = " 350 " % } }
This project is developed by the Institute of Software Security at Hamburg University of Technology .
{ { % include image . html file = " company_logo_big.png " alt = " SoftSec Institute Logo " max - width = " 350 " % } }
This is a description . Duis proident aliqua laborum reprehenderit duis nostrud sint duis anim Lorem anim ut .
## DFD Items
Do culpa deserunt est excepteur amet . Non pariatur ea elit ad eiusmod veniam exercitation nulla . Commodo do adipisicing amet et . Voluptate laboris commodo dolor eu mollit ipsum . Amet reprehenderit velit eu culpa amet exercitation . Elit esse ullamco duis mollit quis . Eiusmod qui reprehenderit sunt cupidatat Lorem anim occaecat enim sint eiusmod tempor .
## Use-Cases
Veniam culpa nostrud id laborum deserunt consectetur consectetur voluptate . Sint aute cupidatat velit irure elit laboris anim labore esse labore . Quis ullamco ut consequat amet . Enim sit laboris deserunt veniam duis aliqua irure proident .
""" )
print ( f " Writing models readme file " )
# Overview page: introduction plus a table with one row per model showing slug,
# GitHub link, lines of code, stars, forks, item count and number of technologies.
with open ( ' dataset.md ' , ' w ' , encoding = " utf-8 " ) as f :
f . write ( f """ ---
title : code2DFD Dataset
keywords : dataset models
tags : [ ]
sidebar : datasetdoc_sidebar
permalink : dataset . html
summary : Dataset of dataflow diagrams of microservice applications .
2022-11-23 13:00:38 +01:00
datatable : true
2022-11-23 11:48:09 +01:00
- - -
2022-11-22 16:30:07 +01:00
# Dataset of Dataflow Diagrams
2022-11-22 16:26:31 +01:00
2022-11-22 16:30:07 +01:00
This repository contains of { len ( dataset ) } manually created dataflow diagrams ( DFDs ) of microservice applications found on GitHub . The dataset is published as an additional contribution to " Automatic Extraction of Security-Rich Dataflow Diagrams for Microservice Applications written in Java " [ Simon Schneider , Riccardo Scandariato ] . Each folder in the [ ` dataset ` ] ( dataset / ) directory contains one DFD in a [ CodeableModels ] ( https : / / github . com / uzdun / CodeableModels ) - format that can be executed to generate PNG , SVG and TXT files for the DFD . Each model refers to stereotypes and metaclasses from the [ metamodel ] ( microservice_dfds_metamodel . py ) which needs to be imported . This repository already contains rendered versions for each model , thus setup and rendering is only necessary once changes to the models are made .
2022-11-22 16:26:31 +01:00
## Models
2022-11-22 16:27:23 +01:00
< div class = " datatable-begin " > < / div >
2022-11-22 16:26:31 +01:00
Name | Source | LoC | Stars | Forks | DFD Items | Technologies
- - | - - | - - | - - | - - | - - | - -
2022-11-22 16:30:07 +01:00
{ chr ( 10 ) . join ( f " [ { info [ ' slug ' ] } ]( { model_id } .html) | [GitHub](https://github.com/ { info [ ' slug ' ] } ) | { info [ ' l ' ] } | { info [ ' stars ' ] } | { info [ ' forks ' ] } | { info [ ' t ' ] } | { len ( info [ ' tech ' ] ) } " for model_id , info in dataset . items ( ) ) }
2022-11-22 16:27:23 +01:00
< div class = " datatable-end " > < / div >
2022-11-22 16:26:31 +01:00
""" )
2022-11-23 13:00:38 +01:00
# Register every technology of the dataset as a tag and write one page per tag.
#   dataset -- mapping of model id to model entry; only the 'tech' lists are read
# Step 1 merges the slugs of all technologies into the 'allowed-tags' list of the
# tags data file and rewrites that file in place.
# Step 2 writes a page tag_<slug>.md for every technology into the 'tags' folder
# below output_path.
def write_tag_readme ( dataset : Dataset ) :
2022-11-22 16:30:07 +01:00
tag_dir = output_path / ' tags '
# Distinct technology names across all models.
known_tech = set ( tech for model in dataset . values ( ) for tech in model [ ' tech ' ] )
print ( f " Writing tag data file " )
2022-11-23 11:48:09 +01:00
tags_data_path = Path ( ' _data ' )
tags_data_file = tags_data_path / ' tags.yml '
tags_data_path . mkdir ( exist_ok = True , parents = True )
# NOTE(review): mode 'r+' requires the tags file to exist already; the mkdir above
# only creates the folder. safe_load() also returns None for an empty file, which
# would make the subscript below fail -- confirm the file is always present and filled.
with tags_data_file . open ( ' r+ ' ) as f :
2022-11-22 16:30:07 +01:00
tags = yaml . safe_load ( f )
# Union of the tags already listed and the slugs of all known technologies, sorted.
tags [ ' allowed-tags ' ] = list ( sorted ( set ( itertools . chain ( tags [ ' allowed-tags ' ] , ( get_tag_slug ( tech ) for tech in known_tech ) ) ) ) )
# Rewind, overwrite from the start and cut off whatever remains of the old content.
f . seek ( 0 )
yaml . dump ( tags , f )
f . truncate ( )
for tech in known_tech :
slug = get_tag_slug ( tech )
info_file = tag_dir / f ' tag_ { slug } .md '
print ( f " Writing tag file for { tech } " )
2022-11-23 11:48:09 +01:00
tag_dir . mkdir ( exist_ok = True , parents = True )
2022-11-22 16:30:07 +01:00
with open ( info_file , ' w ' , encoding = " utf-8 " ) as f :
f . write ( f """ ---
title : " {tech} "
tagName : { slug }
search : exclude
permalink : tag_ { slug } . html
sidebar : datasetdoc_sidebar
2022-11-23 11:48:09 +01:00
hide_sidebar : true
2022-11-22 16:30:07 +01:00
folder : tags
- - -
{ { % include taglogic . html % } }
{ { % include links . html % } }
""" )
2022-11-22 16:26:31 +01:00
def main():
    """Generate all documentation pages from the dataset index.

    Loads the dataset, then writes the tag pages, the top-level pages and the
    per-model pages, in that order.  The dataset is written back to disk only
    when the module-level ``update_dataset`` flag is set.
    """
    # The former ``global known_tags`` declaration was dropped: no such name
    # is assigned or read here.  A stray no-op ``yaml.dump`` expression that
    # followed the function was removed as well.
    dataset = open_dataset()
    write_tag_readme(dataset)
    write_root_readme(dataset)
    write_model_readmes(dataset)
    if update_dataset:
        save_dataset(dataset)


if __name__ == '__main__':
    main()