Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions nodescraper/base/regexanalyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,24 @@
from nodescraper.models.event import Event


def _coerce_event_priority_from_dict(value: Union[str, int, EventPriority]) -> EventPriority:
    """Normalize a priority taken from a dict-style regex definition.

    Args:
        value: An ``EventPriority`` member (returned unchanged), an integer passed to the
            ``EventPriority`` constructor, or a member name that is upper-cased before lookup.

    Returns:
        The ``EventPriority`` member selected by ``value``.

    Raises:
        TypeError: If ``value`` is not an ``EventPriority``, ``int`` or ``str``.
            Errors raised by the ``EventPriority`` lookup itself are not caught here.
    """
    if not isinstance(value, (EventPriority, int, str)):
        raise TypeError(f"Invalid event_priority: {value!r}")
    # Members are checked before the plain int/str cases so they pass through untouched.
    if isinstance(value, EventPriority):
        return value
    if isinstance(value, str):
        # Lookup is by member name, using the upper-cased input.
        return EventPriority[value.upper()]
    # Only the integer case remains: lookup is by value.
    return EventPriority(value)


class ErrorRegex(BaseModel):
regex: re.Pattern
message: str
Expand Down Expand Up @@ -135,13 +153,13 @@ def _convert_and_extend_error_regex(
if isinstance(item, ErrorRegex):
converted_regex.append(item)
elif isinstance(item, dict):
# Convert dict to ErrorRegex
item["regex"] = re.compile(item["regex"])
if "event_category" in item:
item["event_category"] = EventCategory(item["event_category"])
if "event_priority" in item:
item["event_priority"] = EventPriority(item["event_priority"])
converted_regex.append(ErrorRegex(**item))
d = dict(item)
d["regex"] = re.compile(d["regex"])
if "event_category" in d:
d["event_category"] = EventCategory(d["event_category"])
if "event_priority" in d:
d["event_priority"] = _coerce_event_priority_from_dict(d["event_priority"])
converted_regex.append(ErrorRegex(**d))

return converted_regex + list(base_regex)

Expand Down
51 changes: 43 additions & 8 deletions nodescraper/cli/dynamicparserbuilder.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,40 @@
from nodescraper.typeutils import TypeUtils


def _help_from_annotated(anno: object) -> str:
"""Pull CLI help from ``Annotated[T, metadata...]`` (string or ``Field(description=...)``)."""
if anno is None or get_origin(anno) is not Annotated:
return ""
for meta in get_args(anno)[1:]:
if isinstance(meta, str):
return meta
desc = getattr(meta, "description", None)
if isinstance(desc, str) and desc.strip():
return desc
return ""


def _get_run_arg_help(plugin_class: Type[PluginInterface], arg: str) -> str:
    """Get help text for a run() parameter from typing.Annotated metadata on the parameter.

    The raw ``__annotations__`` entry of the ``run`` definition is inspected first.
    Only when that yields no help are the hints resolved with ``get_type_hints``
    (presumably so help survives when a hint cannot be resolved -- confirm).

    Args:
        plugin_class: Plugin class whose ``run`` callable is inspected.
        arg: Name of the ``run`` parameter to look up.

    Returns:
        The help text, or ``""`` when the parameter carries none or the lookup fails.
    """
    try:
        # Read ``run`` from the defining class's ``__dict__`` so that staticmethod /
        # classmethod wrappers are seen as-is and can be unwrapped below.
        run_obj = next(
            (cls.__dict__["run"] for cls in plugin_class.__mro__ if "run" in cls.__dict__),
            None,
        )
        if run_obj is None:
            run_obj = plugin_class.run
        if isinstance(run_obj, (staticmethod, classmethod)):
            run_fn = run_obj.__func__
        else:
            run_fn = run_obj

        # First choice: the annotation exactly as written on the function.
        raw = getattr(run_fn, "__annotations__", {}).get(arg)
        text = _help_from_annotated(raw)
        if text:
            return text

        # Fallback: fully resolved hints, keeping the Annotated metadata.
        hints = get_type_hints(run_fn, include_extras=True)
        return _help_from_annotated(hints.get(arg))
    except Exception:
        # Deliberate best effort: any failure while introspecting is treated as
        # "no help text" rather than propagated to the caller.
        return ""
Expand Down Expand Up @@ -167,12 +192,22 @@ def add_argument(

if list in type_class_map:
type_class = type_class_map[list]
inner = type_class.inner_type
if inner is dict or get_origin(inner) is dict:
elt_type = dict_arg
metavar = META_VAR_MAP[dict]
elif inner is not None:
elt_type = inner
metavar = META_VAR_MAP.get(inner, "STRING")
else:
elt_type = str
metavar = "STRING"
self.parser.add_argument(
f"--{arg_name}",
nargs="*",
type=type_class.inner_type if type_class.inner_type else str,
type=elt_type,
required=required,
metavar=META_VAR_MAP.get(type_class.inner_type, "STRING"),
metavar=metavar,
**add_kw,
)
elif bool in type_class_map:
Expand Down
3 changes: 2 additions & 1 deletion nodescraper/cli/helper.py
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,8 @@ def process_args(
else:
cur_plugin = None
for arg in plugin_args:
if not arg.startswith("-") and "," in arg:
# Only split on commas before a plugin context is set (e.g. "P1,P2").
if not arg.startswith("-") and "," in arg and cur_plugin is None:
for potential_plugin in arg.split(","):
potential_plugin = potential_plugin.strip()
if potential_plugin in plugin_names:
Expand Down
9 changes: 8 additions & 1 deletion nodescraper/interfaces/dataplugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
from pathlib import Path
from typing import Annotated, Any, Generic, Optional, Type, Union

from pydantic import Field

from nodescraper.enums import EventPriority, ExecutionStatus, SystemInteractionLevel
from nodescraper.generictypes import TAnalyzeArg, TCollectArg, TDataModel
from nodescraper.interfaces.dataanalyzertask import DataAnalyzer
Expand Down Expand Up @@ -313,7 +315,12 @@ def run(
preserve_connection: bool = False,
data: Annotated[
Optional[Union[str, dict, TDataModel]],
"Path to pre-collected data (file or directory). Load this data instead of collecting; use with --collection False to run only the analyzer.",
Field(
description=(
"Path to pre-collected data"
"; use with --collection False to run the analyzer only."
),
),
] = None,
collection_args: Optional[Union[TCollectArg, dict]] = None,
analysis_args: Optional[Union[TAnalyzeArg, dict]] = None,
Expand Down
28 changes: 28 additions & 0 deletions nodescraper/plugins/regex_search/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
###############################################################################
#
# MIT License
#
# Copyright (c) 2026 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
###############################################################################
from .regex_search_plugin import RegexSearchPlugin

__all__ = ["RegexSearchPlugin"]
50 changes: 50 additions & 0 deletions nodescraper/plugins/regex_search/analyzer_args.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
###############################################################################
#
# MIT License
#
# Copyright (c) 2026 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
###############################################################################
from typing import Any, Optional

from pydantic import Field

from nodescraper.models import AnalyzerArgs


class RegexSearchAnalyzerArgs(AnalyzerArgs):
    """Options accepted by RegexSearchAnalyzer.

    Each ``error_regex`` item is a plain dict in the Dmesg-style error_regex shape.
    """

    # Patterns to search for; None (the default) means no patterns were supplied.
    error_regex: Optional[list[dict[str, Any]]] = Field(
        description=(
            "Regex patterns to search for; "
            "each dict may include regex (str), message, event_category, event_priority "
            "(same as Dmesg analyzer error_regex). "
        ),
        default=None,
    )
    # Window, in seconds, used to collapse repeated events.
    interval_to_collapse_event: int = Field(
        description="Seconds within which repeated events are collapsed into one.",
        default=60,
    )
    # How many timestamps are reported for each event.
    num_timestamps: int = Field(
        description="Number of timestamps to include per event in output.",
        default=3,
    )
102 changes: 102 additions & 0 deletions nodescraper/plugins/regex_search/regex_search_analyzer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
###############################################################################
#
# MIT License
#
# Copyright (c) 2026 Advanced Micro Devices, Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
###############################################################################
import os
from typing import Optional, Union

from nodescraper.base.regexanalyzer import ErrorRegex, RegexAnalyzer, RegexEvent
from nodescraper.enums import ExecutionStatus
from nodescraper.models import TaskResult

from .analyzer_args import RegexSearchAnalyzerArgs
from .regex_search_data import RegexSearchData


class RegexSearchAnalyzer(RegexAnalyzer[RegexSearchData, RegexSearchAnalyzerArgs]):
    """Apply caller-supplied regexes to text loaded from --data (file or directory)."""

    DATA_MODEL = RegexSearchData

    # Left empty: analyze_data builds its pattern list from the analyzer args only.
    ERROR_REGEX: list[ErrorRegex] = []

    def _build_regex_event(
        self, regex_obj: ErrorRegex, match: Union[str, list[str]], source: str
    ) -> RegexEvent:
        """Build the base event and append the source path to its description.

        Args:
            regex_obj: Rule that produced the match; its ``message`` is reused in the description.
            match: Matched text, forwarded unchanged to the base implementation.
            source: Label of the scanned input. An empty value or the ``"regex_search"``
                placeholder leaves the base description untouched.

        Returns:
            The event from the base implementation, with ``description`` replaced by
            ``"<message> [file: <source>]"`` for any other source.
        """
        event = super()._build_regex_event(regex_obj, match, source)
        if not source or source == "regex_search":
            return event
        event.description = f"{regex_obj.message} [file: {source}]"
        return event

    def analyze_data(
        self,
        data: RegexSearchData,
        args: Optional[RegexSearchAnalyzerArgs] = None,
    ) -> TaskResult:
        """Run the configured patterns over the loaded text and collect the resulting events.

        Args:
            data: Loaded input; ``files`` (per-file text keyed by relative path) is used when
                non-empty, otherwise the single ``content`` value is scanned.
            args: Pattern list plus ``num_timestamps`` / ``interval_to_collapse_event`` settings.

        Returns:
            ``self.result``, marked ``NOT_RAN`` when ``args`` is missing or has no patterns,
            otherwise extended with the events found for every scanned input.
        """
        if args is None or not args.error_regex:
            self.result.status = ExecutionStatus.NOT_RAN
            self.result.message = "Analysis args need to be provided for the analyzer to run"
            return self.result

        final_regex = self._convert_and_extend_error_regex(args.error_regex, [])

        if data.files:
            # Files are visited in sorted key order; each source is the normalized join of
            # data_root and the relative path.
            # NOTE(review): os.path.join raises TypeError if data_root is None -- confirm
            # RegexSearchData always provides a string here.
            targets = (
                (data.files[name], os.path.normpath(os.path.join(data.data_root, name)))
                for name in sorted(data.files)
            )
        else:
            # No per-file text: scan the aggregate content under a single label.
            targets = iter([(data.content, data.data_root or "regex_search")])

        for text, origin in targets:
            self.result.events += self.check_all_regexes(
                content=text,
                source=origin,
                error_regex=final_regex,
                num_timestamps=args.num_timestamps,
                interval_to_collapse_event=args.interval_to_collapse_event,
            )
        return self.result
Loading
Loading