Compare commits
3 Commits
b4f5e3ebd4
...
556035bdd9
Author | SHA1 | Date | |
---|---|---|---|
556035bdd9 | |||
44bae3fa85 | |||
62c96fcbf6 |
38
build.py
38
build.py
@ -30,6 +30,22 @@ def get_webhook():
|
||||
return None
|
||||
|
||||
|
||||
def update_portainer_stack(webhook_id: str):
|
||||
print("Updating portainer stack...")
|
||||
resp = post(f"https://docker.cnml.de/api/stacks/webhooks/{webhook_id}")
|
||||
if not resp.ok:
|
||||
try:
|
||||
try:
|
||||
error = resp.json()
|
||||
except Exception:
|
||||
error = resp.content.decode()
|
||||
raise Exception(error)
|
||||
raise Exception(f"{error['message']} ({error['details']})")
|
||||
except Exception as e:
|
||||
print("Failed to update:", e)
|
||||
else:
|
||||
print("Stack successfully updated!")
|
||||
|
||||
if __name__ == '__main__':
|
||||
output_path = Path("dist")
|
||||
if output_path.exists():
|
||||
@ -42,22 +58,14 @@ if __name__ == '__main__':
|
||||
stdout=PIPE, check=True).stdout.decode().strip()
|
||||
short_sha = run(["git", "rev-parse", "--short", "HEAD"],
|
||||
stdout=PIPE, check=True).stdout.decode().strip()
|
||||
tags = [branch, short_sha]
|
||||
if branch == 'main':
|
||||
default_branch_tag = "latest"
|
||||
print(f"On default branch, also building {default_branch_tag} tag!")
|
||||
tags.append(default_branch_tag)
|
||||
platforms = ['linux/amd64', 'linux/arm/v6', 'linux/arm/v7',
|
||||
'linux/arm64/v8', 'linux/386', 'linux/ppc64le', 'linux/s390x']
|
||||
buildx("chenio/code2dfd", [branch, short_sha], platforms, dockerfile=dockerfile)
|
||||
buildx("chenio/code2dfd", tags, platforms, dockerfile=dockerfile)
|
||||
webhook_id = get_webhook()
|
||||
if webhook_id is not None:
|
||||
print("Updating portainer stack...")
|
||||
resp = post(f"https://docker.cnml.de/api/stacks/webhooks/{webhook_id}")
|
||||
if not resp.ok:
|
||||
try:
|
||||
try:
|
||||
error = resp.json()
|
||||
except Exception:
|
||||
error = resp.content.decode()
|
||||
raise Exception(error)
|
||||
raise Exception(f"{error['message']} ({error['details']})")
|
||||
except Exception as e:
|
||||
print("Failed to update:", e)
|
||||
else:
|
||||
print("Stack successfully updated!")
|
||||
update_portainer_stack(webhook_id)
|
||||
|
@ -4,7 +4,7 @@ import json
|
||||
import itertools
|
||||
import yaml
|
||||
import jsonschema
|
||||
from typing import Any, List, NotRequired, Optional, TypedDict
|
||||
from typing import Any, Dict, List, Literal, NotRequired, Optional, TypedDict
|
||||
import requests
|
||||
try:
|
||||
from yachalk import chalk
|
||||
@ -219,9 +219,12 @@ class Artifact(TypedDict):
|
||||
file: str
|
||||
lines: NotRequired[list[int]]
|
||||
repository: NotRequired[str]
|
||||
branch: NotRequired[str]
|
||||
|
||||
RuleStatus = Literal["disregarded", "observed", "not applicable", "unknown"]
|
||||
|
||||
class SecurityRule(TypedDict):
|
||||
status: str
|
||||
status: RuleStatus
|
||||
argument: str | list[str]
|
||||
artifacts: NotRequired[list[Artifact]]
|
||||
|
||||
@ -256,12 +259,14 @@ properties:
|
||||
type: string
|
||||
repository:
|
||||
type: string
|
||||
branch:
|
||||
type: string
|
||||
lines:
|
||||
type: array
|
||||
items:
|
||||
type: integer""")
|
||||
|
||||
def check_security_rules(security_rules: dict[Any, Any] | None) -> dict[int, SecurityRule]:
|
||||
def check_security_rules(model_id: str, security_rules: dict[Any, Any] | None) -> dict[int, SecurityRule]:
|
||||
if security_rules is None:
|
||||
raise Exception("Security rules file is empty!")
|
||||
for n in range(1, 19):
|
||||
@ -271,7 +276,7 @@ def check_security_rules(security_rules: dict[Any, Any] | None) -> dict[int, Sec
|
||||
jsonschema.validate(rule, rule_schema)
|
||||
rule: SecurityRule
|
||||
if rule["status"] == "unknown":
|
||||
warning(f"Rule {n} is still unknown!")
|
||||
warning(f"In model {model_id}: Rule {n} is still unknown!")
|
||||
except jsonschema.ValidationError as e:
|
||||
warning("Not checking further rules!")
|
||||
raise Exception("Security rule {n}: {msg} at $.{n}.{path}".format(n=n, msg=e.message, path=e.json_path)) from e
|
||||
@ -309,7 +314,9 @@ rule_names = {
|
||||
def artifact_to_string(info: ModelInformation, artifact: Artifact):
|
||||
file = Path(artifact['file'])
|
||||
filename = file.name
|
||||
file_url = f"https://github.com/{artifact.get('repository', info['slug'])}/blob/{info.get('branch', 'master')}/{artifact['file']}"
|
||||
project_branch = info.get("branch", "master")
|
||||
branch = artifact.get("branch", project_branch)
|
||||
file_url = f"https://github.com/{artifact.get('repository', info['slug'])}/blob/{branch}/{artifact['file']}"
|
||||
lines = artifact.get("lines")
|
||||
if lines is None:
|
||||
return f"- {filename}: [File]({file_url})"
|
||||
@ -322,7 +329,7 @@ def rule_to_string(info: ModelInformation, id: int, rule: SecurityRule | None):
|
||||
return ""
|
||||
argument = rule['argument']
|
||||
argument = argument if isinstance(argument, str) else "".join(f"\n1. {arg}" for arg in argument)
|
||||
text = f"""#### Rule {id}: {rule_names[id]}
|
||||
text = f"""#### Rule {id}: {rule_names[id]} {{#rule{id:02}}}
|
||||
|
||||
This rule is {rule['status']}: {argument}"""
|
||||
artifacts = rule.get("artifacts", [])
|
||||
@ -334,8 +341,18 @@ Artifacts:
|
||||
return text
|
||||
|
||||
def write_security_rules(info: ModelInformation, security_rules: dict[int, SecurityRule]):
|
||||
icons: Dict[RuleStatus | str, str] = {
|
||||
'disregarded': '<i class="fa fa-exclamation-circle" style="color: #d72b28;"></i>',
|
||||
'observed': '<i class="fa fa-check-square-o" style="color: #6be16d;"></i>',
|
||||
'not applicable': '<i class="fa fa-info-circle" style="color: #31708;"></i>',
|
||||
'unknown': '<i class="fa fa-warning" style="color: #bfc600;"></i>',
|
||||
}
|
||||
return f"""## Security Rules
|
||||
|
||||
{" | ".join(f"R{i}" for i in range(1, 19))}
|
||||
{" | ".join("--" for _ in range(1, 19))}
|
||||
{" | ".join(f'<a href="#rule{i:02}">{icons[security_rules.get(i, {"status": "unknown"})["status"]]}</a>' for i in range(1, 19))}
|
||||
|
||||
### Authentication / Authorization
|
||||
|
||||
{(chr(10)*2).join(rule_to_string(info, i, security_rules.get(i)) for i in range(1, 7))}
|
||||
@ -404,7 +421,7 @@ def write_model_readmes(dataset: Dataset):
|
||||
security_rules = None
|
||||
try:
|
||||
with security_rules_file.open('r') as f:
|
||||
security_rules = check_security_rules(yaml.safe_load(f))
|
||||
security_rules = check_security_rules(model_id, yaml.safe_load(f))
|
||||
except FileNotFoundError:
|
||||
warning("Security rules file not found at {}".format(security_rules_file))
|
||||
except Exception as e:
|
||||
@ -412,7 +429,7 @@ def write_model_readmes(dataset: Dataset):
|
||||
dir.mkdir(exist_ok=True)
|
||||
write_file_if_changed(readme, f"""---
|
||||
title: {slug}
|
||||
keywords: model TODO
|
||||
keywords: model
|
||||
tags: [{', '.join(get_tag_slug(tech) for tech in info['tech'])}]
|
||||
sidebar: datasetdoc_sidebar
|
||||
permalink: {model_id}.html
|
||||
@ -475,7 +492,6 @@ keywords: code2DFD introduction
|
||||
tags: [overview]
|
||||
sidebar: datasetdoc_sidebar
|
||||
permalink: index.html
|
||||
summary: Dataset of dataflow diagrams of microservice applications.
|
||||
toc: false
|
||||
---
|
||||
|
||||
|
@ -28,6 +28,7 @@
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
"repository": { "type": "string" },
|
||||
"branch": { "type": "string" },
|
||||
"file": { "type": "string" },
|
||||
"lines": { "type": "array", "items": { "type": "integer" } }
|
||||
},
|
||||
|
Loading…
Reference in New Issue
Block a user