From bc7c22807bda0e1323b9da716b4f9de83978c715 Mon Sep 17 00:00:00 2001 From: Aryan-SINGH-GIT Date: Mon, 16 Feb 2026 05:23:51 +0530 Subject: [PATCH] Optimize CVSS parsing UI Signed-off-by: Aryan-SINGH-GIT --- .../commands/populate_severity_data.py | 28 +++++++ ...severity_scoring_elements_data_and_more.py | 23 ++++++ vulnerabilities/models.py | 42 +++++++++++ vulnerabilities/tests/test_cvss_storage.py | 31 ++++++++ vulnerabilities/views.py | 73 ++++++++++--------- 5 files changed, 163 insertions(+), 34 deletions(-) create mode 100644 vulnerabilities/management/commands/populate_severity_data.py create mode 100644 vulnerabilities/migrations/0113_advisoryseverity_scoring_elements_data_and_more.py create mode 100644 vulnerabilities/tests/test_cvss_storage.py diff --git a/vulnerabilities/management/commands/populate_severity_data.py b/vulnerabilities/management/commands/populate_severity_data.py new file mode 100644 index 000000000..f328030ee --- /dev/null +++ b/vulnerabilities/management/commands/populate_severity_data.py @@ -0,0 +1,28 @@ +from django.core.management.base import BaseCommand +from vulnerabilities.models import VulnerabilitySeverity, AdvisorySeverity + +class Command(BaseCommand): + help = "Populate scoring_elements_data for VulnerabilitySeverity and AdvisorySeverity" + + def handle(self, *args, **options): + self.stdout.write("Starting population of VulnerabilitySeverity...") + qs = VulnerabilitySeverity.objects.filter(scoring_elements__isnull=False) + count = qs.count() + self.stdout.write(f"Found {count} VulnerabilitySeverity records to process.") + + for i, severity in enumerate(qs.iterator(chunk_size=1000), start=1): + severity.save() + if i % 1000 == 0: + self.stdout.write(f"Processed {i}/{count} VulnerabilitySeverity records...") + + self.stdout.write("Starting population of AdvisorySeverity...") + qs = AdvisorySeverity.objects.filter(scoring_elements__isnull=False) + count = qs.count() + self.stdout.write(f"Found {count} AdvisorySeverity records to process.") + + for i, severity in enumerate(qs.iterator(chunk_size=1000), start=1): + severity.save() + if i % 1000 == 0: + self.stdout.write(f"Processed {i}/{count} AdvisorySeverity records...") + + self.stdout.write("Population completed.") diff --git a/vulnerabilities/migrations/0113_advisoryseverity_scoring_elements_data_and_more.py b/vulnerabilities/migrations/0113_advisoryseverity_scoring_elements_data_and_more.py new file mode 100644 index 000000000..1fb560fcc --- /dev/null +++ b/vulnerabilities/migrations/0113_advisoryseverity_scoring_elements_data_and_more.py @@ -0,0 +1,23 @@ +# Generated by Django 5.2.11 on 2026-02-15 20:41 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('vulnerabilities', '0112_alter_advisoryseverity_scoring_system_and_more'), + ] + + operations = [ + migrations.AddField( + model_name='advisoryseverity', + name='scoring_elements_data', + field=models.JSONField(blank=True, default=dict, help_text='Serialized scoring elements data.'), + ), + migrations.AddField( + model_name='vulnerabilityseverity', + name='scoring_elements_data', + field=models.JSONField(blank=True, default=dict, help_text='Serialized scoring elements data.'), + ), + ] diff --git a/vulnerabilities/models.py b/vulnerabilities/models.py index e64599c96..ef821ee35 100644 --- a/vulnerabilities/models.py +++ b/vulnerabilities/models.py @@ -210,6 +210,27 @@ class VulnerabilitySeverity(models.Model): "For example a CVSS vector string as used to compute a CVSS score.", ) + scoring_elements_data = models.JSONField( + default=dict, + blank=True, + help_text="Serialized scoring elements data.", + ) + + def save(self, *args, **kwargs): + if self.scoring_elements: + try: + scoring_system = SCORING_SYSTEMS.get(self.scoring_system) + if scoring_system: + self.scoring_elements_data = scoring_system.get(self.scoring_elements) + except ( + CVSS2MalformedError, + CVSS3MalformedError, + CVSS4MalformedError, + NotImplementedError, + ): + pass + super().save(*args, **kwargs) + published_at = models.DateTimeField( blank=True, null=True, help_text="UTC Date of publication of the vulnerability severity" ) @@ -2579,6 +2600,27 @@ class AdvisorySeverity(models.Model): "For example a CVSS vector string as used to compute a CVSS score.", ) + scoring_elements_data = models.JSONField( + default=dict, + blank=True, + help_text="Serialized scoring elements data.", + ) + + def save(self, *args, **kwargs): + if self.scoring_elements: + try: + scoring_system = SCORING_SYSTEMS.get(self.scoring_system) + if scoring_system: + self.scoring_elements_data = scoring_system.get(self.scoring_elements) + except ( + CVSS2MalformedError, + CVSS3MalformedError, + CVSS4MalformedError, + NotImplementedError, + ): + pass + super().save(*args, **kwargs) + published_at = models.DateTimeField( blank=True, null=True, help_text="UTC Date of publication of the vulnerability severity" ) diff --git a/vulnerabilities/tests/test_cvss_storage.py b/vulnerabilities/tests/test_cvss_storage.py new file mode 100644 index 000000000..8251a5384 --- /dev/null +++ b/vulnerabilities/tests/test_cvss_storage.py @@ -0,0 +1,31 @@ + +from django.test import TestCase +from vulnerabilities.models import VulnerabilitySeverity, AdvisorySeverity +from vulnerabilities.severity_systems import SCORING_SYSTEMS + +class TestSeverityStorage(TestCase): + def test_scoring_elements_data_population_vulnerability(self): + # CVSS v3.1 vector + vector = "CVSS:3.1/AV:N/AC:L/PR:N/UI:N/S:U/C:H/I:H/A:H" + severity = VulnerabilitySeverity.objects.create( + scoring_system="cvssv3.1", + scoring_elements=vector, + value="9.8" + ) + severity.refresh_from_db() + self.assertTrue(severity.scoring_elements_data) + self.assertEqual(severity.scoring_elements_data['version'], '3.1') + self.assertEqual(severity.scoring_elements_data['vectorString'], vector) + + def test_scoring_elements_data_population_advisory(self): + # CVSS v2 vector + vector = "AV:N/AC:L/Au:N/C:P/I:P/A:P" + severity = AdvisorySeverity.objects.create( + scoring_system="cvssv2", + scoring_elements=vector, + value="7.5" + ) + severity.refresh_from_db() + self.assertTrue(severity.scoring_elements_data) + self.assertEqual(severity.scoring_elements_data['version'], '2.0') + self.assertEqual(severity.scoring_elements_data['vectorString'], vector) diff --git a/vulnerabilities/views.py b/vulnerabilities/views.py index 8a867983e..b3f346623 100644 --- a/vulnerabilities/views.py +++ b/vulnerabilities/views.py @@ -314,7 +314,7 @@ def get_queryset(self): Prefetch( "severities", queryset=models.VulnerabilitySeverity.objects.only( - "scoring_system", "value", "url", "scoring_elements", "published_at" + "scoring_system", "value", "url", "scoring_elements", "scoring_elements_data", "published_at" ), ), Prefetch( @@ -347,21 +347,22 @@ def get_context_data(self, **kwargs): severity_vectors = [] for severity in valid_severities: - try: - vector_values_system = SCORING_SYSTEMS[severity.scoring_system] - if not vector_values_system: - logging.error(f"Unknown scoring system: {severity.scoring_system}") - continue - vector_values = vector_values_system.get(severity.scoring_elements) - if vector_values: - severity_vectors.append({"vector": vector_values, "origin": severity.url}) - except ( - CVSS2MalformedError, - CVSS3MalformedError, - CVSS4MalformedError, - NotImplementedError, - ): - logging.error(f"CVSSMalformedError for {severity.scoring_elements}") + vector_values = severity.scoring_elements_data + if not vector_values and severity.scoring_elements: + try: + vector_values_system = SCORING_SYSTEMS.get(severity.scoring_system) + if vector_values_system: + vector_values = vector_values_system.get(severity.scoring_elements) + except ( + CVSS2MalformedError, + CVSS3MalformedError, + CVSS4MalformedError, + NotImplementedError, + ): + logging.error(f"CVSSMalformedError for {severity.scoring_elements}") + + if vector_values: + severity_vectors.append({"vector": vector_values, "origin": severity.url}) epss_severity = vulnerability.severities.filter(scoring_system="epss").first() epss_data = None @@ -427,7 +428,7 @@ def get_queryset(self): Prefetch( "severities", queryset=models.AdvisorySeverity.objects.only( - "scoring_system", "value", "url", "scoring_elements", "published_at" + "scoring_system", "value", "url", "scoring_elements", "scoring_elements_data", "published_at" ), ), Prefetch( @@ -494,23 +495,27 @@ def get_context_data(self, **kwargs): severity_vectors = [] for severity in valid_severities: - try: - vector_values_system = SCORING_SYSTEMS.get(severity.scoring_system) - if not vector_values_system: - logging.error(f"Unknown scoring system: {severity.scoring_system}") - continue - if vector_values_system.identifier in ["cvssv3.1_qr"]: - continue - vector_values = vector_values_system.get(severity.scoring_elements) - if vector_values: - severity_vectors.append({"vector": vector_values, "origin": severity.url}) - except ( - CVSS2MalformedError, - CVSS3MalformedError, - CVSS4MalformedError, - NotImplementedError, - ): - logging.error(f"CVSSMalformedError for {severity.scoring_elements}") + vector_values_system = SCORING_SYSTEMS.get(severity.scoring_system) + if not vector_values_system: + logging.error(f"Unknown scoring system: {severity.scoring_system}") + continue + if vector_values_system.identifier in ["cvssv3.1_qr"]: + continue + + vector_values = severity.scoring_elements_data + if not vector_values and severity.scoring_elements: + try: + vector_values = vector_values_system.get(severity.scoring_elements) + except ( + CVSS2MalformedError, + CVSS3MalformedError, + CVSS4MalformedError, + NotImplementedError, + ): + logging.error(f"CVSSMalformedError for {severity.scoring_elements}") + + if vector_values: + severity_vectors.append({"vector": vector_values, "origin": severity.url}) def add_ssvc(ssvc): key = (ssvc.vector, ssvc.source_advisory_id)