diff --git a/src/secops/chronicle/case.py b/src/secops/chronicle/case.py index eb1ff896..0e78d989 100644 --- a/src/secops/chronicle/case.py +++ b/src/secops/chronicle/case.py @@ -24,6 +24,10 @@ CaseList, CasePriority, ) +from secops.chronicle.utils.format_utils import ( + format_resource_id, + remove_none_values, +) from secops.chronicle.utils.request_utils import ( chronicle_paginated_request, chronicle_request, @@ -58,32 +62,29 @@ def get_cases( Raises: APIError: If the API request fails """ - params: dict[str, Any] = {"pageSize": str(page_size)} - - if page_token: - params["pageToken"] = page_token + params = remove_none_values( + { + "pageSize": str(page_size), + "pageToken": page_token, + "tenantId": tenant_id, + } + ) if start_time: params["createTime.startTime"] = start_time.strftime( "%Y-%m-%dT%H:%M:%S.%fZ" ) - if end_time: params["createTime.endTime"] = end_time.strftime( "%Y-%m-%dT%H:%M:%S.%fZ" ) - if case_ids: for case_id in case_ids: params["caseId"] = case_id - if asset_identifiers: for asset in asset_identifiers: params["assetId"] = asset - if tenant_id: - params["tenantId"] = tenant_id - return chronicle_request( client, method="GET", @@ -296,17 +297,15 @@ def execute_bulk_close( f"Valid values: {valid_values}" ) from ve - body: dict[str, Any] = { - "casesIds": case_ids, - "closeReason": close_reason, - } - - if root_cause is not None: - body["rootCause"] = root_cause - if close_comment is not None: - body["closeComment"] = close_comment - if dynamic_parameters is not None: - body["dynamicParameters"] = dynamic_parameters + body = remove_none_values( + { + "casesIds": case_ids, + "closeReason": close_reason, + "rootCause": root_cause, + "closeComment": close_comment, + "dynamicParameters": dynamic_parameters, + } + ) return chronicle_request( client, @@ -363,21 +362,20 @@ def get_case(client, case_name: str, expand: str | None = None) -> Case: Raises: APIError: If the API request fails """ - if not case_name.startswith("projects/"): - endpoint_path = f"cases/{case_name}" - else: - endpoint_path = case_name + endpoint_path = format_resource_id(case_name) - params: dict[str, Any] = {} - if expand: - params["expand"] = expand + params = remove_none_values( + { + "expand": expand, + } + ) data = chronicle_request( client, method="GET", - endpoint_path=endpoint_path, + endpoint_path=f"cases/{endpoint_path}", api_version=APIVersion.V1BETA, - params=params, + params=params or None, error_message="Failed to get case", ) @@ -418,15 +416,14 @@ def list_cases( Raises: APIError: If the API request fails """ - extra_params: dict[str, Any] = {} - if filter_query: - extra_params["filter"] = filter_query - if order_by: - extra_params["orderBy"] = order_by - if expand: - extra_params["expand"] = expand - if distinct_by: - extra_params["distinctBy"] = distinct_by + extra_params = remove_none_values( + { + "filter": filter_query, + "orderBy": order_by, + "expand": expand, + "distinctBy": distinct_by, + } + ) return chronicle_paginated_request( client, @@ -435,7 +432,7 @@ def list_cases( items_key="cases", page_size=page_size, page_token=page_token, - extra_params=extra_params if extra_params else None, + extra_params=extra_params or None, as_list=as_list, ) @@ -500,10 +497,7 @@ def patch_case( APIError: If the API request fails ValueError: If an invalid priority value is provided """ - if not case_name.startswith("projects/"): - endpoint_path = f"cases/{case_name}" - else: - endpoint_path = case_name + endpoint_path = format_resource_id(case_name) if "priority" in case_data and isinstance(case_data["priority"], str): case_priority = case_data["priority"] @@ -519,17 +513,19 @@ def patch_case( f"Valid values: {valid_values}" ) from ve - params: dict[str, Any] = {} - if update_mask: - params["updateMask"] = update_mask + params = remove_none_values( + { + "updateMask": update_mask, + } + ) data = chronicle_request( client, method="PATCH", - endpoint_path=endpoint_path, + endpoint_path=f"cases/{endpoint_path}", api_version=APIVersion.V1BETA, json=case_data, - params=params if params else None, + params=params or None, error_message="Failed to patch case", ) diff --git a/src/secops/chronicle/client.py b/src/secops/chronicle/client.py index daf37376..7995a2b3 100644 --- a/src/secops/chronicle/client.py +++ b/src/secops/chronicle/client.py @@ -13,6 +13,7 @@ # limitations under the License. # """Chronicle API client.""" + import ipaddress import re from collections.abc import Iterator diff --git a/src/secops/chronicle/dashboard.py b/src/secops/chronicle/dashboard.py index 33223acb..f6518b6d 100644 --- a/src/secops/chronicle/dashboard.py +++ b/src/secops/chronicle/dashboard.py @@ -36,6 +36,7 @@ from secops.chronicle.utils.format_utils import ( format_resource_id, parse_json_list, + remove_none_values, ) if TYPE_CHECKING: @@ -105,21 +106,22 @@ def create_dashboard( if charts is not None: charts = parse_json_list(charts, "charts") - definition = {} - if filters is not None: - definition["filters"] = filters - if charts is not None: - definition["charts"] = charts - - payload = { - "displayName": display_name, - "definition": definition, - "access": access_type.value, - "type": "CUSTOM", - } + definition = remove_none_values( + { + "filters": filters, + "charts": charts, + } + ) - if description is not None: - payload["description"] = description + payload = remove_none_values( + { + "displayName": display_name, + "definition": definition, + "access": access_type.value, + "type": "CUSTOM", + "description": description, + } + ) return chronicle_request( client, diff --git a/src/secops/chronicle/dashboard_query.py b/src/secops/chronicle/dashboard_query.py index e7c8ce2a..02adac86 100644 --- a/src/secops/chronicle/dashboard_query.py +++ b/src/secops/chronicle/dashboard_query.py @@ -20,9 +20,14 @@ import json from typing import Any +from secops.exceptions import APIError from secops.chronicle.models import InputInterval from secops.chronicle.utils.request_utils import chronicle_request -from secops.exceptions import APIError +from secops.chronicle.utils.format_utils import ( + format_resource_id, + parse_json_list, + remove_none_values, +) def execute_query( @@ -47,10 +52,6 @@ def execute_query( try: if isinstance(interval, str): interval = json.loads(interval) - if filters and isinstance(filters, str): - filters = json.loads(filters) - if not isinstance(filters, list): - filters = [filters] except ValueError as e: raise APIError( f"Failed to parse JSON. Must be a valid JSON string: {e}" @@ -59,12 +60,16 @@ def execute_query( if isinstance(interval, dict): interval = InputInterval.from_dict(interval) - payload = {"query": {"query": query, "input": interval.to_dict()}} - - if clear_cache is not None: - payload["clearCache"] = clear_cache if filters: - payload["filters"] = filters + filters = parse_json_list(filters, "filters") + + payload = remove_none_values( + { + "query": {"query": query, "input": interval.to_dict()}, + "clearCache": clear_cache, + "filters": filters if filters else None, + } + ) return chronicle_request( client, @@ -85,12 +90,9 @@ def get_execute_query(client, query_id: str) -> dict[str, Any]: Returns: Dictionary containing query details """ - if query_id.startswith("projects/"): - query_id = query_id.split("/")[-1] - return chronicle_request( client, method="GET", - endpoint_path=f"dashboardQueries/{query_id}", + endpoint_path=f"dashboardQueries/{format_resource_id(query_id)}", error_message="Failed to get query", ) diff --git a/src/secops/chronicle/data_export.py b/src/secops/chronicle/data_export.py index d641f1d1..a1fa738b 100644 --- a/src/secops/chronicle/data_export.py +++ b/src/secops/chronicle/data_export.py @@ -22,6 +22,7 @@ from datetime import datetime from typing import Any +from secops.chronicle.utils.format_utils import remove_none_values from secops.chronicle.utils.request_utils import ( chronicle_request, chronicle_paginated_request, @@ -438,9 +439,11 @@ def list_data_export( export = chronicle.list_data_export() ``` """ - extra_params = {} - if filters: - extra_params["filter"] = filters + extra_params = remove_none_values( + { + "filter": filters, + } + ) return chronicle_paginated_request( client, @@ -448,6 +451,6 @@ def list_data_export( items_key="dataExports", page_size=page_size, page_token=page_token, - extra_params=extra_params if extra_params else None, + extra_params=extra_params or None, as_list=as_list, ) diff --git a/src/secops/chronicle/data_table.py b/src/secops/chronicle/data_table.py index 46cacc90..2eba2bf3 100644 --- a/src/secops/chronicle/data_table.py +++ b/src/secops/chronicle/data_table.py @@ -6,6 +6,7 @@ from itertools import islice from typing import Any +from secops.chronicle.utils.format_utils import remove_none_values from secops.chronicle.utils.request_utils import ( chronicle_paginated_request, chronicle_request, @@ -376,15 +377,17 @@ def list_data_tables( Raises: APIError: If the API request fails """ - extra_params = {} - if order_by: - extra_params["orderBy"] = order_by + extra_params = remove_none_values( + { + "orderBy": order_by, + } + ) return chronicle_paginated_request( client, path="dataTables", items_key="dataTables", - extra_params=extra_params if extra_params else None, + extra_params=extra_params or None, as_list=as_list, ) @@ -414,15 +417,17 @@ def list_data_table_rows( Raises: APIError: If the API request fails """ - extra_params = {} - if order_by: - extra_params["orderBy"] = order_by + extra_params = remove_none_values( + { + "orderBy": order_by, + } + ) return chronicle_paginated_request( client, path=f"dataTables/{name}/dataTableRows", items_key="dataTableRows", - extra_params=extra_params if extra_params else None, + extra_params=extra_params or None, as_list=as_list, ) @@ -460,24 +465,25 @@ def update_data_table( "numbers, and underscores, and has length < 256 characters." ) - # Prepare request body - body_payload = {} - if description is not None: - body_payload["description"] = description - if row_time_to_live is not None: - body_payload["row_time_to_live"] = row_time_to_live + body_payload = remove_none_values( + { + "description": description, + "row_time_to_live": row_time_to_live, + } + ) - # Prepare query parameters - params = {} - if update_mask: - params["updateMask"] = ",".join(update_mask) + params = remove_none_values( + { + "updateMask": ",".join(update_mask) if update_mask else None, + } + ) return chronicle_request( client, method="PATCH", endpoint_path=f"dataTables/{name}", - params=params if params else None, - json=body_payload, + params=params or None, + json=body_payload or None, error_message=f"Failed to update data table '{name}'", ) diff --git a/src/secops/chronicle/entity.py b/src/secops/chronicle/entity.py index 8e4af5df..192a919f 100644 --- a/src/secops/chronicle/entity.py +++ b/src/secops/chronicle/entity.py @@ -15,6 +15,7 @@ """ Provides entity search, analysis and summarization functionality for Chronicle. """ + import ipaddress import re from datetime import datetime @@ -35,6 +36,7 @@ TimelineBucket, WidgetMetadata, ) +from secops.chronicle.utils.format_utils import remove_none_values from secops.chronicle.utils.request_utils import chronicle_request from secops.exceptions import APIError @@ -169,17 +171,18 @@ def _summarize_entity_by_id( Raises: APIError: If API request fails. """ - params = { - "entityId": entity_id, - "timeRange.startTime": start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - "timeRange.endTime": end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - "returnAlerts": return_alerts, - "returnPrevalence": return_prevalence, - "includeAllUdmEventTypesForFirstLastSeen": include_all_udm_types, - "pageSize": page_size, - } - if page_token: - params["pageToken"] = page_token + params = remove_none_values( + { + "entityId": entity_id, + "timeRange.startTime": start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "timeRange.endTime": end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "returnAlerts": return_alerts, + "returnPrevalence": return_prevalence, + "includeAllUdmEventTypesForFirstLastSeen": include_all_udm_types, + "pageSize": page_size, + "pageToken": page_token, + } + ) return chronicle_request( client, diff --git a/src/secops/chronicle/feeds.py b/src/secops/chronicle/feeds.py index e8191635..f452122b 100644 --- a/src/secops/chronicle/feeds.py +++ b/src/secops/chronicle/feeds.py @@ -15,6 +15,7 @@ """ Provides ingestion feed management functionality for Chronicle. """ + import json import os import sys diff --git a/src/secops/chronicle/gemini.py b/src/secops/chronicle/gemini.py index 381dd51e..15009a0e 100644 --- a/src/secops/chronicle/gemini.py +++ b/src/secops/chronicle/gemini.py @@ -16,6 +16,7 @@ Provides access to Chronicle's Gemini conversational AI interface. """ + import re from typing import Any diff --git a/src/secops/chronicle/investigations.py b/src/secops/chronicle/investigations.py index 201b77e3..5ee8731e 100644 --- a/src/secops/chronicle/investigations.py +++ b/src/secops/chronicle/investigations.py @@ -17,6 +17,10 @@ from typing import Any from secops.chronicle.models import APIVersion, DetectionType +from secops.chronicle.utils.format_utils import ( + format_resource_id, + remove_none_values, +) from secops.chronicle.utils.request_utils import ( chronicle_paginated_request, chronicle_request, @@ -71,26 +75,22 @@ def fetch_associated_investigations( f'Valid values: {", ".join(valid)}' ) from ke - params: dict[str, Any] = {"detectionType": detection_type} - - if alert_ids is not None: - params["alertIds"] = alert_ids - - if case_ids is not None: - params["caseIds"] = case_ids - - if association_limit_per_detection is not None: - params["associationLimitPerDetection"] = association_limit_per_detection - - if order_by: - params["orderBy"] = order_by + params = remove_none_values( + { + "detectionType": detection_type, + "alertIds": alert_ids, + "caseIds": case_ids, + "associationLimitPerDetection": association_limit_per_detection, + "orderBy": order_by, + } + ) return chronicle_request( client, method="GET", endpoint_path="investigations:fetchAssociated", api_version=APIVersion.V1ALPHA, - params=params, + params=params or None, error_message="Failed to fetch associated investigations", ) @@ -111,15 +111,12 @@ def get_investigation( Raises: APIError: If the API request fails. """ - if not investigation_id.startswith("projects/"): - endpoint_path = f"investigations/{investigation_id}" - else: - endpoint_path = investigation_id + inv_id = format_resource_id(investigation_id) return chronicle_request( client, method="GET", - endpoint_path=endpoint_path, + endpoint_path=f"investigations/{inv_id}", api_version=APIVersion.V1ALPHA, error_message="Failed to get investigation", ) @@ -158,11 +155,12 @@ def list_investigations( Raises: APIError: If the API request fails. """ - extra_params: dict[str, Any] = {} - if filter_expr: - extra_params["filter"] = filter_expr - if order_by: - extra_params["orderBy"] = order_by + extra_params = remove_none_values( + { + "filter": filter_expr, + "orderBy": order_by, + } + ) return chronicle_paginated_request( client, @@ -171,7 +169,7 @@ def list_investigations( api_version=APIVersion.V1ALPHA, page_size=page_size, page_token=page_token, - extra_params=extra_params if extra_params else None, + extra_params=extra_params or None, as_list=as_list, ) diff --git a/src/secops/chronicle/log_processing_pipelines.py b/src/secops/chronicle/log_processing_pipelines.py index aa9b15a9..9a48f1b7 100644 --- a/src/secops/chronicle/log_processing_pipelines.py +++ b/src/secops/chronicle/log_processing_pipelines.py @@ -16,7 +16,10 @@ from typing import Any -from secops.chronicle.utils.format_utils import format_resource_id +from secops.chronicle.utils.format_utils import ( + format_resource_id, + remove_none_values, +) from secops.chronicle.utils.request_utils import ( chronicle_request, chronicle_paginated_request, @@ -50,9 +53,7 @@ def list_log_processing_pipelines( Raises: APIError: If the API request fails. """ - extra_params = {} - if filter_expr: - extra_params["filter"] = filter_expr + extra_params = remove_none_values({"filter": filter_expr}) return chronicle_paginated_request( client, @@ -60,7 +61,7 @@ def list_log_processing_pipelines( items_key="logProcessingPipelines", page_size=page_size, page_token=page_token, - extra_params=extra_params if extra_params else None, + extra_params=extra_params or None, as_list=as_list, ) @@ -115,15 +116,17 @@ def create_log_processing_pipeline( Raises: APIError: If the API request fails. """ - params: dict[str, Any] = {} - if pipeline_id: - params["logProcessingPipelineId"] = pipeline_id + params = remove_none_values( + { + "logProcessingPipelineId": pipeline_id, + } + ) return chronicle_request( client, method="POST", endpoint_path="logProcessingPipelines", - params=params if params else None, + params=params or None, json=pipeline, error_message="Failed to create log processing pipeline", ) @@ -153,17 +156,18 @@ def update_log_processing_pipeline( APIError: If the API request fails. """ extracted_pipeline_id = format_resource_id(pipeline_id) - endpoint_path = f"logProcessingPipelines/{extracted_pipeline_id}" - params: dict[str, Any] = {} - if update_mask: - params["updateMask"] = update_mask + params = remove_none_values( + { + "updateMask": update_mask, + } + ) return chronicle_request( client, method="PATCH", - endpoint_path=endpoint_path, - params=params if params else None, + endpoint_path=f"logProcessingPipelines/{extracted_pipeline_id}", + params=params or None, json=pipeline, error_message="Failed to patch log processing pipeline", ) @@ -188,16 +192,17 @@ def delete_log_processing_pipeline( """ extracted_pipeline_id = format_resource_id(pipeline_id) - endpoint_path = f"logProcessingPipelines/{extracted_pipeline_id}" - params: dict[str, Any] = {} - if etag: - params["etag"] = etag + params = remove_none_values( + { + "etag": etag, + } + ) return chronicle_request( client, method="DELETE", - endpoint_path=endpoint_path, + endpoint_path=f"logProcessingPipelines/{extracted_pipeline_id}", params=params if params else None, error_message="Failed to delete log processing pipeline", ) @@ -321,9 +326,12 @@ def fetch_sample_logs_by_streams( Raises: APIError: If the API request fails. """ - body = {"streams": streams} - if sample_logs_count is not None: - body["sampleLogsCount"] = sample_logs_count + body = remove_none_values( + { + "streams": streams, + "sampleLogsCount": sample_logs_count, + } + ) return chronicle_request( client, diff --git a/src/secops/chronicle/log_search.py b/src/secops/chronicle/log_search.py index 7d7a6797..25486b14 100644 --- a/src/secops/chronicle/log_search.py +++ b/src/secops/chronicle/log_search.py @@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, Any from secops.chronicle.models import APIVersion +from secops.chronicle.utils.format_utils import remove_none_values from secops.chronicle.utils.request_utils import chronicle_request if TYPE_CHECKING: @@ -55,28 +56,24 @@ def search_raw_logs( Raises: APIError: If the API request fails. """ - search_query: dict[str, Any] = { - "baselineQuery": query, - "baselineTimeRange": { - "startTime": start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - "endTime": end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), - }, - "caseSensitive": case_sensitive, - } - - if snapshot_query: - search_query["snapshotQuery"] = snapshot_query + search_query: dict[str, Any] = remove_none_values( + { + "baselineQuery": query, + "baselineTimeRange": { + "startTime": start_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + "endTime": end_time.strftime("%Y-%m-%dT%H:%M:%S.%fZ"), + }, + "caseSensitive": case_sensitive, + "snapshotQuery": snapshot_query, + "maxAggregationsPerField": max_aggregations_per_field, + "pageSize": page_size, + } + ) if log_types: # The API expects a list of LogType objects, filtering by displayName search_query["logTypes"] = [{"displayName": lt} for lt in log_types] - if max_aggregations_per_field is not None: - search_query["maxAggregationsPerField"] = max_aggregations_per_field - - if page_size is not None: - search_query["pageSize"] = page_size - return chronicle_request( client, method="POST", diff --git a/src/secops/chronicle/models.py b/src/secops/chronicle/models.py index 5db56d27..1d6938e8 100644 --- a/src/secops/chronicle/models.py +++ b/src/secops/chronicle/models.py @@ -13,6 +13,7 @@ # limitations under the License. # """Data models for Chronicle API responses.""" + import json import sys from dataclasses import asdict, dataclass, field diff --git a/src/secops/chronicle/parser.py b/src/secops/chronicle/parser.py index 67c46a7d..f5f9f412 100644 --- a/src/secops/chronicle/parser.py +++ b/src/secops/chronicle/parser.py @@ -18,6 +18,7 @@ import json from typing import Any +from secops.chronicle.utils.format_utils import remove_none_values from secops.chronicle.utils.request_utils import ( chronicle_paginated_request, chronicle_request, @@ -261,9 +262,11 @@ def list_parsers( Raises: APIError: If the API request fails """ - extra_params = {} - if filter: - extra_params["filter"] = filter + extra_params = remove_none_values( + { + "filter": filter, + } + ) # For backward compatibility: if page_size is None, force as_list to True effective_as_list = True if page_size is None else as_list diff --git a/src/secops/chronicle/rule_alert.py b/src/secops/chronicle/rule_alert.py index b68c6ef5..56eb0270 100644 --- a/src/secops/chronicle/rule_alert.py +++ b/src/secops/chronicle/rule_alert.py @@ -17,6 +17,7 @@ from datetime import datetime from typing import Any, Literal +from secops.chronicle.utils.format_utils import remove_none_values from secops.chronicle.utils.request_utils import chronicle_request @@ -149,29 +150,21 @@ def update_alert( raise ValueError("severity must be between 0 and 100") # Build feedback dictionary with only provided values - feedback = {} - if confidence_score is not None: - feedback["confidence_score"] = confidence_score - if reason: - feedback["reason"] = reason - if reputation: - feedback["reputation"] = reputation - if priority: - feedback["priority"] = priority - if status: - feedback["status"] = status - if verdict: - feedback["verdict"] = verdict - if risk_score is not None: - feedback["risk_score"] = risk_score - if disregarded is not None: - feedback["disregarded"] = disregarded - if severity is not None: - feedback["severity"] = severity - if comment is not None: # Accept empty string - feedback["comment"] = comment - if root_cause is not None: # Accept empty string - feedback["root_cause"] = root_cause + feedback = remove_none_values( + { + "confidence_score": confidence_score, + "reason": reason, + "reputation": reputation, + "priority": priority, + "status": status, + "verdict": verdict, + "risk_score": risk_score, + "disregarded": disregarded, + "severity": severity, + "comment": comment, + "root_cause": root_cause, + } + ) # Check if at least one property is provided if not feedback: @@ -321,12 +314,13 @@ def search_rule_alerts( """ _ = (rule_status,) - params = { - "timeRange.start_time": start_time.isoformat(), - "timeRange.end_time": end_time.isoformat(), - } - if page_size: - params["maxNumAlertsToReturn"] = page_size + params = remove_none_values( + { + "timeRange.start_time": start_time.isoformat(), + "timeRange.end_time": end_time.isoformat(), + "maxNumAlertsToReturn": page_size, + } + ) return chronicle_request( client, diff --git a/src/secops/chronicle/rule_exclusion.py b/src/secops/chronicle/rule_exclusion.py index 19d369ff..c2a13ee0 100644 --- a/src/secops/chronicle/rule_exclusion.py +++ b/src/secops/chronicle/rule_exclusion.py @@ -20,7 +20,10 @@ from datetime import datetime from typing import Annotated, Any -from secops.chronicle.utils.format_utils import format_resource_id +from secops.chronicle.utils.format_utils import ( + format_resource_id, + remove_none_values, +) from secops.chronicle.utils.request_utils import ( chronicle_paginated_request, chronicle_request, @@ -129,12 +132,11 @@ def get_rule_exclusion(client, exclusion_id: str) -> dict[str, Any]: APIError: If the API request fails """ exclusion_id = format_resource_id(exclusion_id) - endpoint_path = f"findingsRefinements/{exclusion_id}" return chronicle_request( client, method="GET", - endpoint_path=endpoint_path, + endpoint_path=f"findingsRefinements/{exclusion_id}", error_message="Failed to get rule exclusion", ) @@ -202,24 +204,25 @@ def patch_rule_exclusion( APIError: If the API request fails """ exclusion_id = format_resource_id(exclusion_id) - endpoint_path = f"findingsRefinements/{exclusion_id}" - body = {} - if display_name: - body["display_name"] = display_name - if refinement_type: - body["type"] = refinement_type - if query: - body["query"] = query + body = remove_none_values( + { + "display_name": display_name, + "type": refinement_type, + "query": query, + } + ) - params = {} - if update_mask: - params["updateMask"] = update_mask + params = remove_none_values( + { + "updateMask": update_mask, + } + ) return chronicle_request( client, method="PATCH", - endpoint_path=endpoint_path, + endpoint_path=f"findingsRefinements/{exclusion_id}", params=params, json=body, error_message="Failed to update rule exclusion", @@ -293,12 +296,11 @@ def get_rule_exclusion_deployment(client, exclusion_id: str) -> dict[str, Any]: APIError: If the API request fails """ exclusion_id = format_resource_id(exclusion_id) - endpoint_path = f"findingsRefinements/{exclusion_id}/deployment" return chronicle_request( client, method="GET", - endpoint_path=endpoint_path, + endpoint_path=f"findingsRefinements/{exclusion_id}/deployment", error_message="Failed to get rule exclusion deployment", ) diff --git a/src/secops/chronicle/stats.py b/src/secops/chronicle/stats.py index 99b46309..42e31aba 100644 --- a/src/secops/chronicle/stats.py +++ b/src/secops/chronicle/stats.py @@ -13,6 +13,7 @@ # limitations under the License. # """Statistics functionality for Chronicle searches.""" + from datetime import datetime from typing import Any, TYPE_CHECKING diff --git a/src/secops/chronicle/udm_search.py b/src/secops/chronicle/udm_search.py index f825d01f..b1418713 100644 --- a/src/secops/chronicle/udm_search.py +++ b/src/secops/chronicle/udm_search.py @@ -19,6 +19,7 @@ from secops.exceptions import APIError from secops.chronicle.models import APIVersion +from secops.chronicle.utils.format_utils import remove_none_values from secops.chronicle.utils.request_utils import chronicle_request if TYPE_CHECKING: @@ -92,9 +93,12 @@ def find_udm_field_values( Raises: APIError: If the API request fails """ - params = {"query": query} - if page_size is not None: - params["pageSize"] = page_size + params = remove_none_values( + { + "query": query, + "pageSize": page_size, + } + ) return chronicle_request( client, diff --git a/src/secops/chronicle/utils/format_utils.py b/src/secops/chronicle/utils/format_utils.py index b6567528..c31f8a17 100644 --- a/src/secops/chronicle/utils/format_utils.py +++ b/src/secops/chronicle/utils/format_utils.py @@ -65,3 +65,8 @@ def parse_json_list( except ValueError as e: raise APIError(f"Invalid {field_name} JSON") from e return value + + +def remove_none_values(d: dict) -> dict: + """Remove keys with None values from dictionary.""" + return {k: v for k, v in d.items() if v is not None} diff --git a/tests/chronicle/test_case.py b/tests/chronicle/test_case.py index 110bc65c..0e2873d9 100644 --- a/tests/chronicle/test_case.py +++ b/tests/chronicle/test_case.py @@ -447,7 +447,7 @@ def test_get_case_with_full_name(chronicle_client, mock_case_data): mock_request.assert_called_once() call_args = mock_request.call_args - assert call_args[1]["endpoint_path"] == full_name + assert call_args[1]["endpoint_path"] == "cases/12345" assert isinstance(result, Case) assert result.id == "12345" @@ -742,7 +742,7 @@ def test_patch_case_with_full_name(chronicle_client, mock_case_data): mock_request.assert_called_once() call_args = mock_request.call_args - assert call_args[1]["endpoint_path"] == full_name + assert call_args[1]["endpoint_path"] == "cases/12345" assert isinstance(result, Case) diff --git a/tests/chronicle/test_investigations.py b/tests/chronicle/test_investigations.py index 3ce52029..6d0cad34 100644 --- a/tests/chronicle/test_investigations.py +++ b/tests/chronicle/test_investigations.py @@ -253,7 +253,8 @@ def test_get_investigation_with_full_resource_name( ): """Test get_investigation with full resource name.""" full_name = ( - "projects/123/locations/us/instances/456/investigations/" + "projects/test-project/locations/us/instances/" + "test-customer/investigations/" "82fb18cb-bfc0-4d7f-acf2-80508e145da2" ) mock_response.json.return_value = { diff --git a/tests/chronicle/utils/test_format_utils.py b/tests/chronicle/utils/test_format_utils.py index c71bda40..410dde6a 100644 --- a/tests/chronicle/utils/test_format_utils.py +++ b/tests/chronicle/utils/test_format_utils.py @@ -20,6 +20,7 @@ from secops.chronicle.utils.format_utils import ( format_resource_id, parse_json_list, + remove_none_values, ) from secops.exceptions import APIError @@ -98,3 +99,61 @@ def test_parse_json_list_handles_empty_json_array() -> None: def test_parse_json_list_handles_empty_list_input() -> None: result = parse_json_list([], "filters") assert result == [] + + +# --------------------------------------------------------------------------- +# remove_none_values +# --------------------------------------------------------------------------- + + +def test_remove_none_values_removes_none_entries() -> None: + # Keys whose value is None should be absent from the result. + result = remove_none_values({"a": 1, "b": None, "c": "hello"}) + assert result == {"a": 1, "c": "hello"} + + +def test_remove_none_values_keeps_all_entries_when_no_nones() -> None: + d = {"x": 0, "y": False, "z": ""} + assert remove_none_values(d) == {"x": 0, "y": False, "z": ""} + + +def test_remove_none_values_returns_empty_dict_when_all_none() -> None: + assert remove_none_values({"a": None, "b": None}) == {} + + +def test_remove_none_values_returns_empty_dict_for_empty_input() -> None: + assert remove_none_values({}) == {} + + +def test_remove_none_values_keeps_falsy_non_none_values() -> None: + # 0, False, and "" are falsy but not None — they must be preserved. + result = remove_none_values({"zero": 0, "false": False, "empty": "", "none": None}) + assert result == {"zero": 0, "false": False, "empty": ""} + + +def test_remove_none_values_keeps_nested_dicts_intact() -> None: + # The function is shallow — nested dicts are kept as-is even if they + # contain None values inside them. + inner = {"nested_none": None} + result = remove_none_values({"outer": inner, "gone": None}) + assert result == {"outer": inner} + assert result["outer"]["nested_none"] is None + + +def test_remove_none_values_does_not_mutate_original_dict() -> None: + original = {"a": 1, "b": None} + _ = remove_none_values(original) + assert "b" in original # original must be unchanged + + +def test_remove_none_values_returns_new_dict_object() -> None: + original = {"a": 1} + result = remove_none_values(original) + assert result is not original + + +def test_remove_none_values_with_list_and_dict_values() -> None: + # Lists and dicts as values should be preserved. + result = remove_none_values({"lst": [1, 2], "dct": {"k": "v"}, "gone": None}) + assert result == {"lst": [1, 2], "dct": {"k": "v"}} +