Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CLI.md
Original file line number Diff line number Diff line change
Expand Up @@ -552,6 +552,12 @@ secops parser list --log-type "OKTA" --page-size 50 --filter "state=ACTIVE"
secops parser get --log-type "WINDOWS" --id "pa_12345"
```

#### Fetch parser candidates:

```bash
secops parser fetch-candidates --log-type "WINDOWS_DHCP" --parser-action "PARSER_ACTION_OPT_IN_TO_PREVIEW"
```

#### Create a new parser:

```bash
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -1739,6 +1739,9 @@ print(f"Parser content: {parser.get('text')}")
chronicle.activate_parser(log_type=log_type, id=parser_id)
chronicle.deactivate_parser(log_type=log_type, id=parser_id)

# Fetch parser candidates (unactivated prebuilt parsers)
candidates = chronicle.fetch_parser_candidates(log_type=log_type, parser_action="PARSER_ACTION_OPT_IN_TO_PREVIEW")

# Copy an existing parser as a starting point
copied_parser = chronicle.copy_parser(log_type=log_type, id="pa_existing_parser")

Expand Down
27 changes: 27 additions & 0 deletions src/secops/chronicle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
# limitations under the License.
#
"""Chronicle API client."""

import ipaddress
import re
from collections.abc import Iterator
Expand Down Expand Up @@ -193,6 +194,9 @@
from secops.chronicle.parser import deactivate_parser as _deactivate_parser
from secops.chronicle.parser import delete_parser as _delete_parser
from secops.chronicle.parser import get_parser as _get_parser
from secops.chronicle.parser import (
fetch_parser_candidates as _fetch_parser_candidates,
)
from secops.chronicle.parser import list_parsers as _list_parsers
from secops.chronicle.parser import run_parser as _run_parser
from secops.chronicle.parser_extension import ParserExtensionConfig
Expand Down Expand Up @@ -2679,6 +2683,29 @@ def get_parser(
"""
return _get_parser(self, log_type=log_type, id=id)

def fetch_parser_candidates(
self,
log_type: str,
parser_action: str,
) -> list[Any]:
"""Retrieves prebuilt parsers candidates.

Args:
log_type: Log type of the parser
parser_action: Action to perform

Returns:
List of candidate parsers

Raises:
APIError: If the API request fails
"""
return _fetch_parser_candidates(
self,
log_type=log_type,
parser_action=parser_action,
)

def list_parsers(
self,
log_type: str = "-",
Expand Down
32 changes: 32 additions & 0 deletions src/secops/chronicle/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,38 @@ def get_parser(
return response.json()


def fetch_parser_candidates(
client: "ChronicleClient",
log_type: str,
parser_action: str,
) -> list[Any]:
"""Retrieves prebuilt parsers candidates.

Args:
client: ChronicleClient instance
log_type: Log type of the parser
parser_action: Action to perform

Returns:
List of candidate parsers

Raises:
APIError: If the API request fails
"""
url = (
f"{client.base_url}/{client.instance_id}"
f"/logTypes/{log_type}/parsers:fetchParserCandidates"
)

response = client.session.get(url, params={"parserAction": parser_action})

if response.status_code != 200:
raise APIError(f"Failed to fetch parser candidates: {response.text}")

data = response.json()
return data.get("candidates", [])


def list_parsers(
client: "ChronicleClient",
log_type: str = "-",
Expand Down
29 changes: 29 additions & 0 deletions src/secops/cli/commands/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,23 @@ def setup_parser_command(subparsers):
)
list_parsers_sub.set_defaults(func=handle_parser_list_command)

# --- Fetch Parser Candidates Command ---
fetch_parser_candidates_sub = parser_subparsers.add_parser(
"fetch-candidates", help="Fetch unactivated prebuilt parsers."
)
fetch_parser_candidates_sub.add_argument(
"--log-type", type=str, required=True, help="Log type of the parser."
)
fetch_parser_candidates_sub.add_argument(
"--parser-action",
type=str,
required=True,
help="Action for the parser candidates (e.g., CLONE_PREBUILT).",
)
fetch_parser_candidates_sub.set_defaults(
func=handle_parser_fetch_candidates_command
)

# --- Run Parser Command ---
run_parser_sub = parser_subparsers.add_parser(
"run",
Expand Down Expand Up @@ -313,6 +330,18 @@ def handle_parser_delete_command(args, chronicle):
sys.exit(1)


def handle_parser_fetch_candidates_command(args, chronicle):
"""Handle parser fetch-candidates command."""
try:
result = chronicle.fetch_parser_candidates(
args.log_type, args.parser_action
)
output_formatter(result, args.output)
except Exception as e: # pylint: disable=broad-exception-caught
print(f"Error fetching parser candidates: {e}", file=sys.stderr)
sys.exit(1)


def handle_parser_get_command(args, chronicle):
"""Handle parser get command."""
try:
Expand Down
50 changes: 50 additions & 0 deletions tests/chronicle/test_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
activate_parser,
activate_release_candidate_parser,
copy_parser,
fetch_parser_candidates,
create_parser,
deactivate_parser,
delete_parser,
Expand Down Expand Up @@ -139,6 +140,55 @@ def test_activate_release_candidate_parser_error(
assert "Failed to activate parser: Error message" in str(exc_info.value)


# --- fetch_parser_candidates Tests ---
def test_fetch_parser_candidates_success(chronicle_client, mock_response):
"""Test fetch_parser_candidates function for success."""
log_type = "SOME_LOG_TYPE"
parser_action = "PARSER_ACTION_OPT_IN_TO_PREVIEW"
expected_parsers = []
mock_response.json.return_value = {"parsers": expected_parsers}

with patch.object(
chronicle_client.session, "get", return_value=mock_response
) as mock_get:
result = fetch_parser_candidates(chronicle_client, log_type, parser_action)

expected_url = f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logTypes/{log_type}/parsers:fetchParserCandidates"
mock_get.assert_called_once_with(expected_url, params={"parserAction": parser_action})
assert result == expected_parsers


def test_fetch_parser_candidates_empty(chronicle_client, mock_response):
"""Test fetch_parser_candidates function when no parsers are returned."""
log_type = "EMPTY_LOG_TYPE"
parser_action = "PARSER_ACTION_OPT_IN_TO_PREVIEW"
mock_response.json.return_value = {}

with patch.object(
chronicle_client.session, "get", return_value=mock_response
) as mock_get:
result = fetch_parser_candidates(chronicle_client, log_type, parser_action)

expected_url = f"{chronicle_client.base_url}/{chronicle_client.instance_id}/logTypes/{log_type}/parsers:fetchParserCandidates"
mock_get.assert_called_once_with(expected_url, params={"parserAction": parser_action})
assert result == []


def test_fetch_parser_candidates_error(chronicle_client, mock_error_response):
"""Test fetch_parser_candidates function for API error."""
log_type = "ERROR_LOG_TYPE"
parser_action = "CLONE_PREBUILT"

with patch.object(
chronicle_client.session, "get", return_value=mock_error_response
):
with pytest.raises(APIError) as exc_info:
fetch_parser_candidates(chronicle_client, log_type, parser_action)
assert "Failed to fetch parser candidates: Error message" in str(
exc_info.value
)


# --- copy_parser Tests ---
def test_copy_parser_success(chronicle_client, mock_response):
"""Test copy_parser function for success."""
Expand Down