Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 42 additions & 14 deletions src/secops/chronicle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -1500,18 +1500,25 @@ def list_parser_extensions(
log_type: str,
page_size: int | None = None,
page_token: str | None = None,
) -> dict[str, Any]:
as_list: bool = False,
) -> dict[str, Any] | list[Any]:
"""List parser extensions.

Args:
log_type: The log type to list parser extensions for
page_size: Maximum number of parser extensions to return
page_token: Token for pagination
as_list: If True, return only the list of parser extensions.
If False, return dict with metadata and pagination tokens.

Returns:
Dict containing list of parser extensions and next page token if any
If as_list is True: List of parser extensions.
If as_list is False: Dict with parserExtensions list and
pagination metadata.
"""
return _list_parser_extensions(self, log_type, page_size, page_token)
return _list_parser_extensions(
self, log_type, page_size, page_token, as_list
)

def activate_parser_extension(
self, log_type: str, extension_id: str
Expand Down Expand Up @@ -1776,22 +1783,27 @@ def list_log_processing_pipelines(
page_size: int | None = None,
page_token: str | None = None,
filter_expr: str | None = None,
) -> dict[str, Any]:
as_list: bool = False,
) -> dict[str, Any] | list[Any]:
"""Lists log processing pipelines.

Args:
page_size: Maximum number of pipelines to return.
page_token: Page token for pagination.
filter_expr: Filter expression to restrict results.
as_list: If True, return only the list of pipelines.
If False, return dict with metadata and pagination tokens.

Returns:
Dictionary containing pipelines and pagination info.
If as_list is True: List of log processing pipelines.
If as_list is False: Dict with logProcessingPipelines list and
pagination metadata.

Raises:
APIError: If the API request fails.
"""
return _list_log_processing_pipelines(
self, page_size, page_token, filter_expr
self, page_size, page_token, filter_expr, as_list
)

def get_log_processing_pipeline(self, pipeline_id: str) -> dict[str, Any]:
Expand Down Expand Up @@ -2024,7 +2036,8 @@ def list_investigations(
page_token: str | None = None,
filter_expr: str | None = None,
order_by: str | None = None,
) -> dict[str, Any]:
as_list: bool = False,
) -> dict[str, Any] | list[Any]:
"""Lists investigations.

Args:
Expand All @@ -2036,16 +2049,19 @@ def list_investigations(
order_by: Ordering of investigations. Default is create time
descending. Supported fields: "startTime", "endTime",
"displayName".
as_list: If True, return only the list of investigations.
If False, return dict with metadata and pagination tokens.

Returns:
Dictionary containing investigations, next page token, and
total size.
If as_list is True: List of investigations.
If as_list is False: Dict with investigations list,
nextPageToken, and totalSize.

Raises:
APIError: If the API request fails.
"""
return _list_investigations(
self, page_size, page_token, filter_expr, order_by
self, page_size, page_token, filter_expr, order_by, as_list
)

def trigger_investigation(self, alert_id: str) -> dict[str, Any]:
Expand Down Expand Up @@ -2400,7 +2416,8 @@ def list_detections(
alert_state: str | None = None,
page_size: int | None = None,
page_token: str | None = None,
) -> dict[str, Any]:
as_list: bool = False,
) -> dict[str, Any] | list[Any]:
"""List detections for a rule.

Args:
Expand All @@ -2421,9 +2438,13 @@ def list_detections(
- "ALERTING"
page_size: If provided, maximum number of detections to return
page_token: If provided, continuation token for pagination
as_list: If True, return only the list of detections.
If False, return dict with metadata and pagination tokens.

Returns:
Dictionary containing detection information
If as_list is True: List of detections.
If as_list is False: Dict with detections list and
pagination metadata.

Raises:
APIError: If the API request fails
Expand All @@ -2438,6 +2459,7 @@ def list_detections(
alert_state,
page_size,
page_token,
as_list,
)

def list_errors(self, rule_id: str) -> dict[str, Any]:
Expand Down Expand Up @@ -3789,16 +3811,21 @@ def list_data_export(
filters: str | None = None,
page_size: int | None = None,
page_token: str | None = None,
) -> dict[str, Any]:
as_list: bool = False,
) -> dict[str, Any] | list[Any]:
"""List data export jobs.

Args:
filters: Filter string
page_size: Page size
page_token: Page token
as_list: If True, return only the list of data exports.
If False, return dict with metadata and pagination tokens.

Returns:
Dictionary containing data export list
If as_list is True: List of data exports.
If as_list is False: Dict with dataExports list and
pagination metadata.

Raises:
APIError: If the API request fails
Expand All @@ -3813,6 +3840,7 @@ def list_data_export(
filters=filters,
page_size=page_size,
page_token=page_token,
as_list=as_list,
)

# Data Table methods
Expand Down
36 changes: 14 additions & 22 deletions src/secops/chronicle/dashboard_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from typing import Any

from secops.chronicle.models import InputInterval
from secops.chronicle.utils.request_utils import chronicle_request
from secops.exceptions import APIError


Expand All @@ -43,8 +44,6 @@ def execute_query(
Returns:
Dictionary containing query results
"""
url = f"{client.base_url}/{client.instance_id}/dashboardQueries:execute"

try:
if isinstance(interval, str):
interval = json.loads(interval)
Expand All @@ -67,15 +66,13 @@ def execute_query(
if filters:
payload["filters"] = filters

response = client.session.post(url, json=payload)

if response.status_code != 200:
raise APIError(
f"Failed to execute query: Status {response.status_code}, "
f"Response: {response.text}"
)

return response.json()
return chronicle_request(
client,
method="POST",
endpoint_path="dashboardQueries:execute",
json=payload,
error_message="Failed to execute query",
)


def get_execute_query(client, query_id: str) -> dict[str, Any]:
Expand All @@ -91,14 +88,9 @@ def get_execute_query(client, query_id: str) -> dict[str, Any]:
if query_id.startswith("projects/"):
query_id = query_id.split("/")[-1]

url = f"{client.base_url}/{client.instance_id}/dashboardQueries/{query_id}"

response = client.session.get(url)

if response.status_code != 200:
raise APIError(
f"Failed to get query: Status {response.status_code}, "
f"Response: {response.text}"
)

return response.json()
return chronicle_request(
client,
method="GET",
endpoint_path=f"dashboardQueries/{query_id}",
error_message="Failed to get query",
)
Loading