Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions api_module_mapping.md
Original file line number Diff line number Diff line change
Expand Up @@ -285,6 +285,8 @@ Following shows mapping between SecOps [REST Resource](https://cloud.google.com/
|logTypes.getLogTypeSetting |v1alpha| | |
|logTypes.legacySubmitParserExtension |v1alpha| | |
|logTypes.list |v1alpha| | |
|logTypes.getParserAnalysisReport |v1alpha|chronicle.parser_validation.get_analysis_report |secops log-type get-analysis-report |
|logTypes.triggerGitHubChecks |v1alpha|chronicle.parser_validation.trigger_github_checks |secops log-type trigger-checks |
|logTypes.logs.export |v1alpha| | |
|logTypes.logs.get |v1alpha| | |
|logTypes.logs.import |v1alpha|chronicle.log_ingest.ingest_log |secops log ingest |
Expand Down
42 changes: 42 additions & 0 deletions src/secops/chronicle/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,10 @@
create_watchlist as _create_watchlist,
update_watchlist as _update_watchlist,
)
from secops.chronicle.parser_validation import (
get_analysis_report as _get_analysis_report,
trigger_github_checks as _trigger_github_checks,
)
from secops.exceptions import SecOpsError


Expand Down Expand Up @@ -761,6 +765,44 @@ def update_watchlist(
update_mask,
)

def get_analysis_report(self, name: str) -> dict[str, Any]:
"""Get a parser analysis report.
Args:
name: The full resource name of the analysis report.
Returns:
Dictionary containing the analysis report.
Raises:
APIError: If the API request fails.
"""
return _get_analysis_report(self, name)

def trigger_github_checks(
self,
associated_pr: str,
log_type: str,
customer_id: str | None = None,
) -> dict[str, Any]:
"""Trigger GitHub checks for a parser.

Args:
associated_pr: The PR string (e.g., "owner/repo/pull/123").
log_type: The string name of the LogType enum.
customer_id: The customer UUID string.

Returns:
Dictionary containing the response details.

Raises:
SecOpsError: If gRPC modules or client stub are not available.
APIError: If the gRPC API request fails.
"""
return _trigger_github_checks(
self,
associated_pr=associated_pr,
log_type=log_type,
customer_id=customer_id,
)

def get_stats(
self,
query: str,
Expand Down
148 changes: 148 additions & 0 deletions src/secops/chronicle/parser_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""Chronicle parser validation functionality."""

from typing import TYPE_CHECKING, Any
import logging

from secops.exceptions import APIError, SecOpsError

if TYPE_CHECKING:
from secops.chronicle.client import ChronicleClient


def trigger_github_checks(
client: "ChronicleClient",
associated_pr: str,

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets add proper validations for the structure of this string, validate by breaking the path and number of substrings, valid pr number.

log_type: str,
customer_id: str | None = None,
timeout: int = 60,
) -> dict[str, Any]:
"""Trigger GitHub checks for a parser.

Args:
client: ChronicleClient instance
associated_pr: The PR string (e.g., "owner/repo/pull/123").
log_type: The string name of the LogType enum.
customer_id: Optional. The customer UUID string. Defaults to client
configured ID.
timeout: Optional RPC timeout in seconds (default: 60).

Returns:
Dictionary containing the response details.

Raises:
SecOpsError: If input is invalid.
APIError: If the API request fails.
"""
if not isinstance(log_type, str) or len(log_type.strip()) < 2:
raise SecOpsError("log_type must be a valid string of length >= 2")
if customer_id is not None:
if not isinstance(customer_id, str) or len(customer_id.strip()) < 2:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets use pattern matching for Customer Id as its a UUID

raise SecOpsError(
"customer_id must be a valid string of length >= 2"
)
if not isinstance(associated_pr, str) or not associated_pr.strip():
raise SecOpsError("associated_pr must be a non-empty string")
if not isinstance(timeout, int) or timeout < 0:
raise SecOpsError("timeout must be a non-negative integer")

eff_customer_id = customer_id or client.customer_id
instance_id = client.instance_id
if eff_customer_id and eff_customer_id != client.customer_id:
# Dev and staging use 'us' as the location
region = "us" if client.region in ["dev", "staging"] else client.region
instance_id = (
f"projects/{client.project_id}/locations/"
f"{region}/instances/{eff_customer_id}"
)

# The backend expects the resource name to be in the format:
# projects/*/locations/*/instances/*/logTypes/*/parsers/<UUID>
base_url = client.base_url(version="v1alpha")

# First get the list of parsers for this log_type to find a valid
# parser UUID
parsers_url = f"{base_url}/{instance_id}/logTypes/{log_type}/parsers"
parsers_resp = client.session.get(parsers_url, timeout=timeout)

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can't we use the some information from metadata.json file instead of fetching the parsers for this logtype and then using the first available name?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

we are expecting log_type from the cli command

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correct.

if not parsers_resp.ok:
raise APIError(
f"Failed to fetch parsers for log type {log_type}: "
f"{parsers_resp.text}"
)

parsers_data = parsers_resp.json()
parsers = parsers_data.get("parsers")
if not parsers:
logging.info(
"No parsers found for log type %s. Using fallback parser ID.",
log_type,
)
parser_name = f"{instance_id}/logTypes/{log_type}/parsers/-"
else:
if len(parsers) > 1:
logging.warning(
"Multiple parsers found for log type %s. Using the first one.",
log_type,
)

# Use the first parser's name (which includes the UUID)
parser_name = parsers[0]["name"]

url = f"{base_url}/{parser_name}:runAnalysis"
payload = {
"report_type": "GITHUB_PARSER_VALIDATION",
"pull_request": associated_pr,
}

response = client.session.post(url, json=payload, timeout=timeout)

if not response.ok:
raise APIError(f"API call failed: {response.text}")

return response.json()


def get_analysis_report(
client: "ChronicleClient",
name: str,
timeout: int = 60,
) -> dict[str, Any]:
"""Get a parser analysis report.
Args:
client: ChronicleClient instance
name: The full resource name of the analysis report.
timeout: Optional timeout in seconds (default: 60).
Returns:
Dictionary containing the analysis report.
Raises:
SecOpsError: If input is invalid.
APIError: If the API request fails.
"""
if not isinstance(name, str) or len(name.strip()) < 5:
raise SecOpsError("name must be a valid string")
if not isinstance(timeout, int) or timeout < 0:
raise SecOpsError("timeout must be a non-negative integer")

# The name includes 'projects/...', so we just append it to base_url
base_url = client.base_url(version="v1alpha")
url = f"{base_url}/{name}"

response = client.session.get(url, timeout=timeout)

if not response.ok:
raise APIError(f"API call failed: {response.text}")

return response.json()
2 changes: 2 additions & 0 deletions src/secops/cli/cli_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from secops.cli.commands.investigation import setup_investigation_command
from secops.cli.commands.iocs import setup_iocs_command
from secops.cli.commands.log import setup_log_command
from secops.cli.commands.log_type import setup_log_type_commands
from secops.cli.commands.log_processing import (
setup_log_processing_command,
)
Expand Down Expand Up @@ -168,6 +169,7 @@ def build_parser() -> argparse.ArgumentParser:
setup_investigation_command(subparsers)
setup_iocs_command(subparsers)
setup_log_command(subparsers)
setup_log_type_commands(subparsers)
setup_log_processing_command(subparsers)
setup_parser_command(subparsers)
setup_parser_extension_command(subparsers)
Expand Down
104 changes: 104 additions & 0 deletions src/secops/cli/commands/log_type.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
"""CLI for ParserValidationToolingService under Log Type command group"""

import sys

from secops.cli.utils.formatters import output_formatter
from secops.exceptions import APIError, SecOpsError


def setup_log_type_commands(subparsers):
"""Set up the log_type service commands for Parser Validation."""
log_type_parser = subparsers.add_parser(
"log-type", help="Log Type related operations (including Parser Validation)"
)

log_type_subparsers = log_type_parser.add_subparsers(
title="Log Type Commands",
dest="log_type_command",
help="Log Type sub-command to execute"
)

if sys.version_info >= (3, 7):
log_type_subparsers.required = True

log_type_parser.set_defaults(
func=lambda args, chronicle: log_type_parser.print_help()
)

# --- trigger-checks command ---
trigger_github_checks_parser = log_type_subparsers.add_parser(
"trigger-checks", help="Trigger GitHub checks for a parser"
)
trigger_github_checks_parser.add_argument(
"--associated-pr",
"--associated_pr",
required=True,
help='The PR string (e.g., "owner/repo/pull/123").'
)
trigger_github_checks_parser.add_argument(
"--log-type",
"--log_type",
required=True,
help='The string name of the LogType enum (e.g., "DUMMY_LOGTYPE").'
)
trigger_github_checks_parser.set_defaults(func=handle_trigger_checks_command)

# --- get-analysis-report command ---
get_report_parser = log_type_subparsers.add_parser(
"get-analysis-report", help="Get a parser analysis report"
)
get_report_parser.add_argument(
"--name",
required=True,
help="The full resource name of the analysis report."
)
get_report_parser.set_defaults(func=handle_get_analysis_report_command)


def handle_trigger_checks_command(args, chronicle):
"""Handle trigger checks command."""
try:
result = chronicle.trigger_github_checks(
associated_pr=args.associated_pr,
log_type=args.log_type,
)
output_formatter(result, args.output)
except APIError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
except SecOpsError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
except Exception as e: # pylint: disable=broad-exception-caught
print(f"Error triggering GitHub checks: {e}", file=sys.stderr)
sys.exit(1)


def handle_get_analysis_report_command(args, chronicle):
"""Handle get analysis report command."""
try:
result = chronicle.get_analysis_report(name=args.name)
output_formatter(result, args.output)
except APIError as e:
print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
except SecOpsError as e:

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When will this error come?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This SecOpsError is caught because chronicle.get_analysis_report(name=args.name) performs local input validation before sending the request. If the user passes an empty or invalid name
(e.g., less than 5 characters long), the internal client method raises a SecOpsError rather than an APIError.

print(f"Error: {e}", file=sys.stderr)
sys.exit(1)
except Exception as e: # pylint: disable=broad-exception-caught
print(f"Error fetching analysis report: {e}", file=sys.stderr)
sys.exit(1)
60 changes: 60 additions & 0 deletions tests/chronicle/test_client_parser_validation.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
"""Test parser validation methods on ChronicleClient."""

from unittest.mock import MagicMock
import pytest

from secops.chronicle.client import ChronicleClient


@pytest.fixture
def mock_client():
"""Create a mock ChronicleClient."""
client = ChronicleClient(
project_id="test-project",
customer_id="test-customer",
auth=MagicMock(),
)
# Mock the parser validation service stub
client.parser_validation_service_stub = MagicMock()
return client


def test_trigger_github_checks(mock_client, monkeypatch):
"""Test ChronicleClient.trigger_github_checks."""
# Mock the underlying implementation to avoid gRPC dependency in tests
mock_impl = MagicMock(return_value={"message": "Success", "details": "Started"})
monkeypatch.setattr(
"secops.chronicle.client._trigger_github_checks", mock_impl
)

result = mock_client.trigger_github_checks(
associated_pr="owner/repo/pull/123",
log_type="DUMMY_LOGTYPE",
)

assert result == {"message": "Success", "details": "Started"}
mock_impl.assert_called_once_with(
mock_client,
associated_pr="owner/repo/pull/123",
log_type="DUMMY_LOGTYPE",
customer_id=None,
)


def test_get_analysis_report(mock_client, monkeypatch):
"""Test ChronicleClient.get_analysis_report."""
# Mock the underlying implementation
mock_impl = MagicMock(return_value={"reportId": "123"})
monkeypatch.setattr(
"secops.chronicle.client._get_analysis_report", mock_impl
)

result = mock_client.get_analysis_report(
name="projects/test/locations/us/instances/test/logTypes/DEF/parsers/XYZ/analysisReports/123"
)

assert result == {"reportId": "123"}
mock_impl.assert_called_once_with(
mock_client,
"projects/test/locations/us/instances/test/logTypes/DEF/parsers/XYZ/analysisReports/123",
)
Loading