Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 45 additions & 49 deletions src/secops/chronicle/case.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,10 @@
CaseList,
CasePriority,
)
from secops.chronicle.utils.format_utils import (
format_resource_id,
remove_none_values,
)
from secops.chronicle.utils.request_utils import (
chronicle_paginated_request,
chronicle_request,
Expand Down Expand Up @@ -58,32 +62,29 @@ def get_cases(
Raises:
APIError: If the API request fails
"""
params: dict[str, Any] = {"pageSize": str(page_size)}

if page_token:
params["pageToken"] = page_token
params = remove_none_values(
{
"pageSize": str(page_size),
"pageToken": page_token,
"tenantId": tenant_id,
}
)

if start_time:
params["createTime.startTime"] = start_time.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
)

if end_time:
params["createTime.endTime"] = end_time.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
)

if case_ids:
for case_id in case_ids:
params["caseId"] = case_id

if asset_identifiers:
for asset in asset_identifiers:
params["assetId"] = asset

if tenant_id:
params["tenantId"] = tenant_id

return chronicle_request(
client,
method="GET",
Expand Down Expand Up @@ -296,17 +297,15 @@ def execute_bulk_close(
f"Valid values: {valid_values}"
) from ve

body: dict[str, Any] = {
"casesIds": case_ids,
"closeReason": close_reason,
}

if root_cause is not None:
body["rootCause"] = root_cause
if close_comment is not None:
body["closeComment"] = close_comment
if dynamic_parameters is not None:
body["dynamicParameters"] = dynamic_parameters
body = remove_none_values(
{
"casesIds": case_ids,
"closeReason": close_reason,
"rootCause": root_cause,
"closeComment": close_comment,
"dynamicParameters": dynamic_parameters,
}
)

return chronicle_request(
client,
Expand Down Expand Up @@ -363,21 +362,20 @@ def get_case(client, case_name: str, expand: str | None = None) -> Case:
Raises:
APIError: If the API request fails
"""
if not case_name.startswith("projects/"):
endpoint_path = f"cases/{case_name}"
else:
endpoint_path = case_name
endpoint_path = format_resource_id(case_name)

params: dict[str, Any] = {}
if expand:
params["expand"] = expand
params = remove_none_values(
{
"expand": expand,
}
)

data = chronicle_request(
client,
method="GET",
endpoint_path=endpoint_path,
endpoint_path=f"cases/{endpoint_path}",
api_version=APIVersion.V1BETA,
params=params,
params=params or None,
error_message="Failed to get case",
)

Expand Down Expand Up @@ -418,15 +416,14 @@ def list_cases(
Raises:
APIError: If the API request fails
"""
extra_params: dict[str, Any] = {}
if filter_query:
extra_params["filter"] = filter_query
if order_by:
extra_params["orderBy"] = order_by
if expand:
extra_params["expand"] = expand
if distinct_by:
extra_params["distinctBy"] = distinct_by
extra_params = remove_none_values(
{
"filter": filter_query,
"orderBy": order_by,
"expand": expand,
"distinctBy": distinct_by,
}
)

return chronicle_paginated_request(
client,
Expand All @@ -435,7 +432,7 @@ def list_cases(
items_key="cases",
page_size=page_size,
page_token=page_token,
extra_params=extra_params if extra_params else None,
extra_params=extra_params or None,
as_list=as_list,
)

Expand Down Expand Up @@ -500,10 +497,7 @@ def patch_case(
APIError: If the API request fails
ValueError: If an invalid priority value is provided
"""
if not case_name.startswith("projects/"):
endpoint_path = f"cases/{case_name}"
else:
endpoint_path = case_name
endpoint_path = format_resource_id(case_name)

if "priority" in case_data and isinstance(case_data["priority"], str):
case_priority = case_data["priority"]
Expand All @@ -519,17 +513,19 @@ def patch_case(
f"Valid values: {valid_values}"
) from ve

params: dict[str, Any] = {}
if update_mask:
params["updateMask"] = update_mask
params = remove_none_values(
{
"updateMask": update_mask,
}
)

data = chronicle_request(
client,
method="PATCH",
endpoint_path=endpoint_path,
endpoint_path=f"cases/{endpoint_path}",
api_version=APIVersion.V1BETA,
json=case_data,
params=params if params else None,
params=params or None,
error_message="Failed to patch case",
)

Expand Down
1 change: 1 addition & 0 deletions src/secops/chronicle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
#
"""Chronicle API client."""

import ipaddress
import re
from collections.abc import Iterator
Expand Down
30 changes: 16 additions & 14 deletions src/secops/chronicle/dashboard.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from secops.chronicle.utils.format_utils import (
format_resource_id,
parse_json_list,
remove_none_values,
)

if TYPE_CHECKING:
Expand Down Expand Up @@ -105,21 +106,22 @@ def create_dashboard(
if charts is not None:
charts = parse_json_list(charts, "charts")

definition = {}
if filters is not None:
definition["filters"] = filters
if charts is not None:
definition["charts"] = charts

payload = {
"displayName": display_name,
"definition": definition,
"access": access_type.value,
"type": "CUSTOM",
}
definition = remove_none_values(
{
"filters": filters,
"charts": charts,
}
)

if description is not None:
payload["description"] = description
payload = remove_none_values(
{
"displayName": display_name,
"definition": definition,
"access": access_type.value,
"type": "CUSTOM",
"description": description,
}
)

return chronicle_request(
client,
Expand Down
30 changes: 16 additions & 14 deletions src/secops/chronicle/dashboard_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,14 @@
import json
from typing import Any

from secops.exceptions import APIError
from secops.chronicle.models import InputInterval
from secops.chronicle.utils.request_utils import chronicle_request
from secops.exceptions import APIError
from secops.chronicle.utils.format_utils import (
format_resource_id,
parse_json_list,
remove_none_values,
)


def execute_query(
Expand All @@ -47,10 +52,6 @@ def execute_query(
try:
if isinstance(interval, str):
interval = json.loads(interval)
if filters and isinstance(filters, str):
filters = json.loads(filters)
if not isinstance(filters, list):
filters = [filters]
except ValueError as e:
raise APIError(
f"Failed to parse JSON. Must be a valid JSON string: {e}"
Expand All @@ -59,12 +60,16 @@ def execute_query(
if isinstance(interval, dict):
interval = InputInterval.from_dict(interval)

payload = {"query": {"query": query, "input": interval.to_dict()}}

if clear_cache is not None:
payload["clearCache"] = clear_cache
if filters:
payload["filters"] = filters
filters = parse_json_list(filters, "filters")

payload = remove_none_values(
{
"query": {"query": query, "input": interval.to_dict()},
"clearCache": clear_cache,
"filters": filters if filters else None,
}
)

return chronicle_request(
client,
Expand All @@ -85,12 +90,9 @@ def get_execute_query(client, query_id: str) -> dict[str, Any]:
Returns:
Dictionary containing query details
"""
if query_id.startswith("projects/"):
query_id = query_id.split("/")[-1]

return chronicle_request(
client,
method="GET",
endpoint_path=f"dashboardQueries/{query_id}",
endpoint_path=f"dashboardQueries/{format_resource_id(query_id)}",
error_message="Failed to get query",
)
11 changes: 7 additions & 4 deletions src/secops/chronicle/data_export.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
from datetime import datetime
from typing import Any

from secops.chronicle.utils.format_utils import remove_none_values
from secops.chronicle.utils.request_utils import (
chronicle_request,
chronicle_paginated_request,
Expand Down Expand Up @@ -438,16 +439,18 @@ def list_data_export(
export = chronicle.list_data_export()
```
"""
extra_params = {}
if filters:
extra_params["filter"] = filters
extra_params = remove_none_values(
{
"filter": filters,
}
)

return chronicle_paginated_request(
client,
path="dataExports",
items_key="dataExports",
page_size=page_size,
page_token=page_token,
extra_params=extra_params if extra_params else None,
extra_params=extra_params or None,
as_list=as_list,
)
Loading