Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 0 additions & 16 deletions .basedpyright/baseline.json
Original file line number Diff line number Diff line change
Expand Up @@ -19468,22 +19468,6 @@
"endColumn": 21,
"lineCount": 1
}
},
{
"code": "reportPossiblyUnboundVariable",
"range": {
"startColumn": 26,
"endColumn": 33,
"lineCount": 1
}
},
{
"code": "reportArgumentType",
"range": {
"startColumn": 39,
"endColumn": 49,
"lineCount": 1
}
}
],
"./monitoring/uss_qualifier/scenarios/astm/utm/off_nominal_planning/down_uss_equal_priority_not_permitted.py": [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ class DownUSS(TestScenario):
scenario_execution_max_extents: Volume4D | None = None
"""Actual bounding extent of the flight intents created by the USS qualifier acting as a virtual USS during the scenario execution."""

estimated_max_extents: Volume4D | None = None
"""Estimated bounding extent of the flight intents created by the USS qualifier acting as a virtual USS during the scenario execution."""

tested_uss: FlightPlannerClient
dss_resource: DSSInstanceResource
dss: DSSInstance
Expand Down Expand Up @@ -136,15 +139,15 @@ def run(self, context: ExecutionContext):
self.end_test_scenario()

def _setup(self):
estimated_max_extents = estimate_scenario_execution_max_extents(
self.estimated_max_extents = estimate_scenario_execution_max_extents(
self.time_context, self.flight_intents_templates
)

self.begin_test_step("Resolve USS ID of virtual USS")
with self.check("Successful dummy query", [self.dss.participant_id]) as check:
try:
_, dummy_query = self.dss.find_op_intent(
estimated_max_extents.to_f3548v21()
self.estimated_max_extents.to_f3548v21()
)
self.record_query(dummy_query)
except QueryError as e:
Expand Down Expand Up @@ -174,7 +177,7 @@ def _setup(self):
validate_clear_area(
self,
self.dss,
[estimated_max_extents],
[self.estimated_max_extents],
ignore_self=False,
)
self.end_test_step()
Expand Down Expand Up @@ -299,38 +302,39 @@ def _clear_op_intents(self):
with self.check(
"Successful operational intents cleanup", [self.dss.participant_id]
) as check:
if self.scenario_execution_max_extents is None:
return

try:
oi_refs, find_query = self.dss.find_op_intent(
self.scenario_execution_max_extents.to_f3548v21()
)
self.record_query(find_query)
except QueryError as e:
self.record_queries(e.queries)
find_query = e.queries[0]
check.record_failed(
summary=f"Failed to query operational intent references from DSS in {self.scenario_execution_max_extents} for cleanup",
details=f"DSS responded code {find_query.status_code}; {e}",
query_timestamps=[find_query.request.timestamp],
)

for oi_ref in oi_refs:
if oi_ref.manager == self.uss_qualifier_sub:
try:
del_oi, _, del_query = self.dss.delete_op_intent(
oi_ref.id, oi_ref.ovn
)
self.record_query(del_query)
except QueryError as e:
self.record_queries(e.queries)
del_query = e.queries[0]
check.record_failed(
summary=f"Failed to delete op intent {oi_ref.id} from DSS",
details=f"DSS responded code {del_query.status_code}; {e}",
query_timestamps=[del_query.request.timestamp],
)
for area in [
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

By always potentially clearing according to both areas, we're probably being a little less clear (scenario_execution_max_extents will never be relevant when calling this function from the setup step) and a little more broad than necessary (we don't need to clear estimated_max_extents in the end-of-scenario cleanup if nothing actually happened during the scenario). If we were still going to do this, I think we should still make just one query as queries are far, far more expensive than a tiny bit of local math:

            areas = Volume4DCollection([])
            if self.scenario_execution_max_extents:
                areas.append(self.scenario_execution_max_extents)
            if self.estimated_max_extents:
                areas.append(self.estimated_max_extents)
            if not areas:
                return
            area = areas.bounding_volume

But, it seems like we could instead simply have _clear_op_intents accept a Volume4D and call it with estimated_max_extents in setup and scenario_execution_max_extents in end-of-scenario cleanup.

self.estimated_max_extents,
self.scenario_execution_max_extents,
]:
if area is None:
continue
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a pre-existing problem, but we're initiating the check too early. Ideally, we should complete the action to collect data before we initiate the check to evaluate that data. But, it would be inconvenient to need to repeat the with initiation statement in two different places, so I think practically it's ok to initiate the check before performing the action (find_op_intent) since starting to perform the action necessarily means we're going to perform the check one way or another. But, we should not initiate the check if we're not actually going to perform the action (and therefore definitely not perform the check).

If area is None, currently the check will indicate as passing, but that is incorrect as we haven't checked anything. Instead, we should make sure we're actually going to check something before initiating the check since initiating a check and not recording a failure is defined as meaning that the check has positively passed.


try:
oi_refs, find_query = self.dss.find_op_intent(area.to_f3548v21())
self.record_query(find_query)
except QueryError as e:
self.record_queries(e.queries)
find_query = e.queries[0]
check.record_failed(
summary=f"Failed to query operational intent references from DSS in {area} for cleanup",
details=f"DSS responded code {find_query.status_code}; {e}",
query_timestamps=[find_query.request.timestamp],
)
continue

for oi_ref in oi_refs:
if oi_ref.manager == self.uss_qualifier_sub and (ovn := oi_ref.ovn):
try:
_, _, del_query = self.dss.delete_op_intent(oi_ref.id, ovn)
self.record_query(del_query)
except QueryError as e:
self.record_queries(e.queries)
del_query = e.queries[0]
check.record_failed(
summary=f"Failed to delete op intent {oi_ref.id} from DSS",
details=f"DSS responded code {del_query.status_code}; {e}",
query_timestamps=[del_query.request.timestamp],
)

def cleanup(self):
self.begin_cleanup()
Expand Down
Loading