diff --git a/src/taskgraph/transforms/chunking.py b/src/taskgraph/transforms/chunking.py index 6547708bd..ba596d7d7 100644 --- a/src/taskgraph/transforms/chunking.py +++ b/src/taskgraph/transforms/chunking.py @@ -2,30 +2,29 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. import copy -from typing import Optional from taskgraph.transforms.base import TransformSequence from taskgraph.util.schema import Schema from taskgraph.util.templates import substitute - -class ChunkConfig(Schema): - # The total number of chunks to split the task into. - total_chunks: int - # A list of fields that need to have `{this_chunk}` and/or - # `{total_chunks}` replaced in them. - substitution_fields: list[str] = [] - - -#: Schema for chunking transforms -class ChunkSchema(Schema, forbid_unknown_fields=False, kw_only=True): - # `chunk` can be used to split one task into `total-chunks` - # tasks, substituting `this_chunk` and `total_chunks` into any - # fields in `substitution-fields`. - chunk: Optional[ChunkConfig] = None - - -CHUNK_SCHEMA = ChunkSchema +CHUNK_SCHEMA = Schema.from_dict( + { + # `chunk` can be used to split one task into `total-chunks` + # tasks, substituting `this_chunk` and `total_chunks` into any + # fields in `substitution-fields`. + "chunk": Schema.from_dict( + { + # The total number of chunks to split the task into. + "total-chunks": int, + # A list of fields that need to have `{this_chunk}` and/or + # `{total_chunks}` replaced in them. 
+ "substitution-fields": (list[str], []), + }, + optional=True, + ), + }, + forbid_unknown_fields=False, +) transforms = TransformSequence() transforms.add_validate(CHUNK_SCHEMA) diff --git a/src/taskgraph/transforms/docker_image.py b/src/taskgraph/transforms/docker_image.py index 643be20e5..b1d94afe0 100644 --- a/src/taskgraph/transforms/docker_image.py +++ b/src/taskgraph/transforms/docker_image.py @@ -5,7 +5,6 @@ import logging import os import re -from typing import Optional import taskgraph from taskgraph.transforms.base import TransformSequence @@ -27,31 +26,32 @@ transforms = TransformSequence() - #: Schema for docker_image transforms -class DockerImageSchema(Schema): - # Name of the docker image. - name: str - # Name of the parent docker image. - parent: Optional[str] = None - # Treeherder symbol. - symbol: Optional[str] = None - # Relative path (from config.path) to the file the docker image was defined in. - task_from: Optional[str] = None - # Arguments to use for the Dockerfile. - args: Optional[dict[str, str]] = None - # Name of the docker image definition under taskcluster/docker, when - # different from the docker image name. - definition: Optional[str] = None - # List of package tasks this docker image depends on. - packages: Optional[list[str]] = None - # Information for indexing this build so its artifacts can be discovered. - index: Optional[IndexSchema] = None - # Whether this image should be cached based on inputs. - cache: Optional[bool] = None - - -docker_image_schema = DockerImageSchema +DOCKER_IMAGE_SCHEMA = Schema.from_dict( + { + # Name of the docker image. + "name": str, + # Name of the parent docker image. + "parent": (str, None), + # Treeherder symbol. + "symbol": (str, None), + # Relative path (from config.path) to the file the docker image was defined in. + "task-from": (str, None), + # Arguments to use for the Dockerfile. 
+ "args": (dict[str, str], None), + # Name of the docker image definition under taskcluster/docker, when + # different from the docker image name. + "definition": (str, None), + # List of package tasks this docker image depends on. + "packages": (list[str], None), + # Information for indexing this build so its artifacts can be discovered. + "index": (IndexSchema, None), + # Whether this image should be cached based on inputs. + "cache": (bool, None), + }, +) + +docker_image_schema = DOCKER_IMAGE_SCHEMA transforms.add_validate(docker_image_schema) diff --git a/src/taskgraph/transforms/fetch.py b/src/taskgraph/transforms/fetch.py index 2258cb85a..e3508542f 100644 --- a/src/taskgraph/transforms/fetch.py +++ b/src/taskgraph/transforms/fetch.py @@ -21,36 +21,38 @@ CACHE_TYPE = "content.v1" - -class FetchSubSchema(Schema, forbid_unknown_fields=False, kw_only=True): - # The fetch type - type: str - +_FETCH_SUB_SCHEMA = Schema.from_dict( + { + # The fetch type + "type": str, + }, + forbid_unknown_fields=False, +) #: Schema for fetch transforms -class FetchSchema(Schema): - # Name of the task. - name: str - # Description of the task. - description: str - # The fetch configuration - fetch: FetchSubSchema - # Relative path (from config.path) to the file the task was defined - # in. - task_from: Optional[str] = None - expires_after: Optional[str] = None - docker_image: Optional[object] = None - # An alias that can be used instead of the real fetch task name in - # fetch stanzas for tasks. - fetch_alias: Optional[str] = None - # The prefix of the taskcluster artifact being uploaded. - # Defaults to `public/`; if it starts with something other than - # `public/` the artifact will require scopes to access. - artifact_prefix: Optional[str] = None - attributes: Optional[dict[str, object]] = None - - -FETCH_SCHEMA = FetchSchema +FETCH_SCHEMA = Schema.from_dict( + { + # Name of the task. + "name": str, + # Description of the task. 
+ "description": str, + # The fetch configuration + "fetch": _FETCH_SUB_SCHEMA, + # Relative path (from config.path) to the file the task was defined + # in. + "task-from": (str, None), + "expires-after": (str, None), + "docker-image": (object, None), + # An alias that can be used instead of the real fetch task name in + # fetch stanzas for tasks. + "fetch-alias": (str, None), + # The prefix of the taskcluster artifact being uploaded. + # Defaults to `public/`; if it starts with something other than + # `public/` the artifact will require scopes to access. + "artifact-prefix": (str, None), + "attributes": (dict[str, object], None), + }, +) # define a collection of payload builders, depending on the worker implementation fetch_builders = {} @@ -173,42 +175,48 @@ def make_task(config, tasks): yield task_desc -class GpgSignatureConfig(Schema): - # URL where GPG signature document can be obtained. Can contain the - # value ``{url}``, which will be substituted with the value from - # ``url``. - sig_url: str - # Path to file containing GPG public key(s) used to validate - # download. - key_path: str - - -class StaticUrlFetchSchema(Schema, forbid_unknown_fields=False, kw_only=True): - type: Literal["static-url"] - # The URL to download. - url: str - # The SHA-256 of the downloaded content. - sha256: str - # Size of the downloaded entity, in bytes. - size: int - # GPG signature verification. - gpg_signature: Optional[GpgSignatureConfig] = None - # The name to give to the generated artifact. Defaults to the file - # portion of the URL. Using a different extension converts the - # archive to the given type. Only conversion to .tar.zst is - # supported. - artifact_name: Optional[str] = None - # Strip the given number of path components at the beginning of - # each file entry in the archive. - # Requires an artifact-name ending with .tar.zst. - strip_components: Optional[int] = None - # Add the given prefix to each file entry in the archive. 
- # Requires an artifact-name ending with .tar.zst. - add_prefix: Optional[str] = None - # Headers to pass alongside the request. - headers: Optional[dict[str, str]] = None - # IMPORTANT: when adding anything that changes the behavior of the task, - # it is important to update the digest data used to compute cache hits. +_GPG_SIGNATURE_SCHEMA = Schema.from_dict( + { + # URL where GPG signature document can be obtained. Can contain the + # value ``{url}``, which will be substituted with the value from + # ``url``. + "sig-url": str, + # Path to file containing GPG public key(s) used to validate + # download. + "key-path": str, + }, +) + +StaticUrlFetchSchema = Schema.from_dict( + { + "type": Literal["static-url"], + # The URL to download. + "url": str, + # The SHA-256 of the downloaded content. + "sha256": str, + # Size of the downloaded entity, in bytes. + "size": int, + # GPG signature verification. + "gpg-signature": (_GPG_SIGNATURE_SCHEMA, None), + # The name to give to the generated artifact. Defaults to the file + # portion of the URL. Using a different extension converts the + # archive to the given type. Only conversion to .tar.zst is + # supported. + "artifact-name": (str, None), + # Strip the given number of path components at the beginning of + # each file entry in the archive. + # Requires an artifact-name ending with .tar.zst. + "strip-components": (int, None), + # Add the given prefix to each file entry in the archive. + # Requires an artifact-name ending with .tar.zst. + "add-prefix": (str, None), + # Headers to pass alongside the request. + "headers": (dict[str, str], None), + # IMPORTANT: when adding anything that changes the behavior of the task, + # it is important to update the digest data used to compute cache hits. 
+ }, + forbid_unknown_fields=False, +) @fetch_builder("static-url", schema=StaticUrlFetchSchema) @@ -274,18 +282,22 @@ def create_fetch_url_task(config, name, fetch): } -class GitFetchSchema(Schema, forbid_unknown_fields=False, kw_only=True): - type: Literal["git"] - repo: str - revision: str - include_dot_git: Optional[bool] = None - artifact_name: Optional[str] = None - path_prefix: Optional[str] = None - # ssh-key is a taskcluster secret path (e.g. project/civet/github-deploy-key) - # In the secret dictionary, the key should be specified as - # "ssh_privkey": "-----BEGIN OPENSSH PRIVATE KEY-----\nkfksnb3jc..." - # n.b. The OpenSSH private key file format requires a newline at the end of the file. - ssh_key: Optional[str] = None +GitFetchSchema = Schema.from_dict( + { + "type": Literal["git"], + "repo": str, + "revision": str, + "include-dot-git": (bool, None), + "artifact-name": (str, None), + "path-prefix": (str, None), + # ssh-key is a taskcluster secret path (e.g. project/civet/github-deploy-key) + # In the secret dictionary, the key should be specified as + # "ssh_privkey": "-----BEGIN OPENSSH PRIVATE KEY-----\nkfksnb3jc..." + # n.b. The OpenSSH private key file format requires a newline at the end of the file. + "ssh-key": (str, None), + }, + forbid_unknown_fields=False, +) @fetch_builder("git", schema=GitFetchSchema) diff --git a/src/taskgraph/transforms/from_deps.py b/src/taskgraph/transforms/from_deps.py index 101579394..70bda2730 100644 --- a/src/taskgraph/transforms/from_deps.py +++ b/src/taskgraph/transforms/from_deps.py @@ -22,46 +22,46 @@ from taskgraph.util.schema import Schema, validate_schema from taskgraph.util.set_name import SET_NAME_MAP - -class FromDepsConfig(Schema): - # Limit dependencies to specified kinds (defaults to all kinds in - # `kind-dependencies`). - # - # The first kind in the list is the "primary" kind. The - # dependency of this kind will be used to derive the label - # and copy attributes (if `copy-attributes` is True). 
- kinds: Optional[list[str]] = None - # Set-name function (dynamic: validated at runtime against SET_NAME_MAP). - set_name: Optional[Union[bool, str, dict[str, object]]] = None - # Limit dependencies to tasks whose attributes match - # using :func:`~taskgraph.util.attributes.attrmatch`. - with_attributes: Optional[dict[str, Union[list, str]]] = None - # Group cross-kind dependencies using the given group-by - # function. One task will be created for each group. If not - # specified, the 'single' function will be used which creates - # a new task for each individual dependency. - group_by: Optional[Union[str, dict[str, object]]] = None - # If True, copy attributes from the dependency matching the - # first kind in the `kinds` list (whether specified explicitly - # or taken from `kind-dependencies`). - copy_attributes: Optional[bool] = None - # If true (the default), there must be only a single unique task - # for each kind in a dependency group. Setting this to false - # disables that requirement. - unique_kinds: Optional[bool] = None - # If present, a `fetches` entry will be added for each task - # dependency. Attributes of the upstream task may be used as - # substitution values in the `artifact` or `dest` values of the - # `fetches` entry. - fetches: Optional[dict[str, list[FetchesEntrySchema]]] = None - - -#: Schema for from_deps transforms -class FromDepsSchema(Schema, forbid_unknown_fields=False, kw_only=True): - from_deps: FromDepsConfig - - -FROM_DEPS_SCHEMA = FromDepsSchema +FROM_DEPS_SCHEMA = Schema.from_dict( + { + "from-deps": Schema.from_dict( + { + # Limit dependencies to specified kinds (defaults to all kinds in + # `kind-dependencies`). + # + # The first kind in the list is the "primary" kind. The + # dependency of this kind will be used to derive the label + # and copy attributes (if `copy-attributes` is True). + "kinds": Optional[list[str]], + # Set-name function (dynamic: validated at runtime against SET_NAME_MAP). 
+ "set-name": Optional[Union[bool, str, dict[str, object]]], + # Limit dependencies to tasks whose attributes match + # using :func:`~taskgraph.util.attributes.attrmatch`. + "with-attributes": Optional[dict[str, Union[list, str]]], + # Group cross-kind dependencies using the given group-by + # function. One task will be created for each group. If not + # specified, the 'single' function will be used which creates + # a new task for each individual dependency. + "group-by": Optional[Union[str, dict[str, object]]], + # If True, copy attributes from the dependency matching the + # first kind in the `kinds` list (whether specified explicitly + # or taken from `kind-dependencies`). + "copy-attributes": Optional[bool], + # If true (the default), there must be only a single unique task + # for each kind in a dependency group. Setting this to false + # disables that requirement. + "unique-kinds": Optional[bool], + # If present, a `fetches` entry will be added for each task + # dependency. Attributes of the upstream task may be used as + # substitution values in the `artifact` or `dest` values of the + # `fetches` entry. 
+ "fetches": Optional[dict[str, list[FetchesEntrySchema]]], + }, + ), + }, + name="FromDepsSchema", + forbid_unknown_fields=False, +) transforms = TransformSequence() transforms.add_validate(FROM_DEPS_SCHEMA) @@ -151,9 +151,9 @@ def from_deps(config, tasks): else: raise Exception("Could not detect primary kind!") - new_task.setdefault("attributes", {})["primary-kind-dependency"] = ( - primary_kind - ) + new_task.setdefault("attributes", {})[ + "primary-kind-dependency" + ] = primary_kind primary_dep = [dep for dep in group if dep.kind == primary_kind][0] new_task["attributes"]["primary-dependency-label"] = primary_dep.label diff --git a/src/taskgraph/transforms/matrix.py b/src/taskgraph/transforms/matrix.py index b3ac53f16..2a591f2e9 100644 --- a/src/taskgraph/transforms/matrix.py +++ b/src/taskgraph/transforms/matrix.py @@ -8,42 +8,43 @@ """ from copy import deepcopy -from typing import Optional from taskgraph.transforms.base import TransformSequence from taskgraph.util.schema import Schema from taskgraph.util.templates import substitute_task_fields - -class MatrixConfig(Schema, forbid_unknown_fields=False, kw_only=True): - # Exclude the specified combination(s) of matrix values from the - # final list of tasks. - # - # If only a subset of the possible rows are present in the - # exclusion rule, then *all* combinations including that subset - # subset will be excluded. - exclude: Optional[list[dict[str, str]]] = None - # Sets the task name to the specified format string. - # - # Useful for cases where the default of joining matrix values by - # a dash is not desired. - set_name: Optional[str] = None - # List of fields in the task definition to substitute matrix values into. - # - # If not specified, all fields in the task definition will be - # substituted. - substitution_fields: Optional[list[str]] = None - # Extra dimension keys (e.g. 
"platform": ["linux", "win"]) allowed - # via forbid_unknown_fields=False - - -#: Schema for matrix transforms -class MatrixSchema(Schema, forbid_unknown_fields=False, kw_only=True): - name: str - matrix: Optional[MatrixConfig] = None - - -MATRIX_SCHEMA = MatrixSchema +MATRIX_SCHEMA = Schema.from_dict( + { + "name": str, + # `matrix` holds the configuration for splitting tasks. + "matrix": Schema.from_dict( + { + # Exclude the specified combination(s) of matrix values from the + # final list of tasks. + # + # If only a subset of the possible rows are present in the + # exclusion rule, then *all* combinations including that subset + # subset will be excluded. + "exclude": (list[dict[str, str]], None), + # Sets the task name to the specified format string. + # + # Useful for cases where the default of joining matrix values by + # a dash is not desired. + "set-name": (str, None), + # List of fields in the task definition to substitute matrix values into. + # + # If not specified, all fields in the task definition will be + # substituted. + "substitution-fields": (list[str], None), + # Extra dimension keys (e.g. 
"platform": ["linux", "win"]) allowed + # via forbid_unknown_fields=False + }, + optional=True, + forbid_unknown_fields=False, + ), + }, + forbid_unknown_fields=False, +) transforms = TransformSequence() transforms.add_validate(MATRIX_SCHEMA) diff --git a/src/taskgraph/transforms/notify.py b/src/taskgraph/transforms/notify.py index 9f73d8d04..b986212eb 100644 --- a/src/taskgraph/transforms/notify.py +++ b/src/taskgraph/transforms/notify.py @@ -23,33 +23,86 @@ "on-running", ] - -class EmailRecipient(Schema, tag_field="type", tag="email", kw_only=True): - address: optionally_keyed_by("project", "level", str, use_msgspec=True) # type: ignore - status_type: Optional[StatusType] = None - - -class MatrixRoomRecipient(Schema, tag_field="type", tag="matrix-room", kw_only=True): - room_id: str - status_type: Optional[StatusType] = None - - -class PulseRecipient(Schema, tag_field="type", tag="pulse", kw_only=True): - routing_key: str - status_type: Optional[StatusType] = None - - -class SlackChannelRecipient( - Schema, tag_field="type", tag="slack-channel", kw_only=True -): - channel_id: str - status_type: Optional[StatusType] = None - - Recipient = Union[ - EmailRecipient, MatrixRoomRecipient, PulseRecipient, SlackChannelRecipient + Schema.from_dict( + { + "address": optionally_keyed_by("project", "level", str, use_msgspec=True), + "status-type": Optional[StatusType], + }, + name="EmailRecipient", + tag_field="type", + tag="email", + ), # type: ignore [invalid-type-form] + Schema.from_dict( + { + "room-id": str, + "status-type": Optional[StatusType], + }, + name="MatrixRoomRecipient", + tag_field="type", + tag="matrix-room", + ), # type: ignore [invalid-type-form] + Schema.from_dict( + { + "routing-key": str, + "status-type": Optional[StatusType], + }, + name="PulseRecipient", + tag_field="type", + tag="pulse", + ), # type: ignore [invalid-type-form] + Schema.from_dict( + { + "channel-id": str, + "status-type": Optional[StatusType], + }, + name="SlackChannelRecipient", + 
tag_field="type", + tag="slack-channel", + ), # type: ignore [invalid-type-form] ] +Content = Schema.from_dict( + { + "email": Schema.from_dict( + { + "subject": Optional[str], + "content": Optional[str], + "link": Schema.from_dict( + { + "text": str, + "href": str, + }, + optional=True, + ), + }, + name="EmailContent", + optional=True, + ), + "matrix": Schema.from_dict( + { + "body": Optional[str], + "formatted-body": Optional[str], + "format": Optional[str], + "msg-type": Optional[str], + }, + name="MatrixContent", + optional=True, + ), + "slack": Schema.from_dict( + { + "text": Optional[str], + "blocks": Optional[list], + "attachments": Optional[list], + }, + name="SlackContent", + optional=True, + ), + }, + optional=True, + name="NotifyContentConfig", +) + _route_keys = { "email": "address", @@ -60,60 +113,32 @@ class SlackChannelRecipient( """Map each type to its primary key that will be used in the route.""" -class EmailLinkContent(Schema): - text: str - href: str - - -class EmailContent(Schema): - subject: Optional[str] = None - content: Optional[str] = None - link: Optional[EmailLinkContent] = None - - -class MatrixContent(Schema): - body: Optional[str] = None - formatted_body: Optional[str] = None - format: Optional[str] = None - msg_type: Optional[str] = None - - -class SlackContent(Schema): - text: Optional[str] = None - blocks: Optional[list] = None - attachments: Optional[list] = None - - -class NotifyContentConfig(Schema): - email: Optional[EmailContent] = None - matrix: Optional[MatrixContent] = None - slack: Optional[SlackContent] = None - - -class NotifyConfig(Schema): - recipients: list[Recipient] - content: Optional[NotifyContentConfig] = None - - -class LegacyNotificationsConfig(Schema): - # Continue supporting the legacy schema for backwards compat. 
- emails: optionally_keyed_by("project", "level", list[str], use_msgspec=True) # type: ignore - subject: str - message: Optional[str] = None - status_types: Optional[list[StatusType]] = None - - #: Schema for notify transforms -class NotifySchema(Schema, forbid_unknown_fields=False, kw_only=True): - notify: Optional[NotifyConfig] = None - notifications: Optional[LegacyNotificationsConfig] = None - - def __post_init__(self): - if self.notify is not None and self.notifications is not None: - raise ValueError("'notify' and 'notifications' are mutually exclusive") - - -NOTIFY_SCHEMA = NotifySchema +NOTIFY_SCHEMA = Schema.from_dict( + { + "notify": Schema.from_dict( + { + "recipients": list[Recipient], + "content": Content, + }, + optional=True, + ), + # Continue supporting the legacy schema for backwards compat. + "notifications": Schema.from_dict( + { + "emails": optionally_keyed_by( + "project", "level", list[str], use_msgspec=True + ), + "subject": str, + "message": Optional[str], + "status-types": Optional[list[StatusType]], + }, + optional=True, + ), + }, + exclusive=[("notify", "notifications")], + forbid_unknown_fields=False, +) transforms = TransformSequence() transforms.add_validate(NOTIFY_SCHEMA) diff --git a/src/taskgraph/transforms/run/__init__.py b/src/taskgraph/transforms/run/__init__.py index e4ad08716..3322ee62b 100644 --- a/src/taskgraph/transforms/run/__init__.py +++ b/src/taskgraph/transforms/run/__init__.py @@ -33,25 +33,36 @@ # Fetches may be accepted in other transforms and eventually passed along # to a `task` (eg: from_deps). Defining this here allows them to reuse # the schema and avoid duplication. -class FetchesEntrySchema(Schema): - artifact: str - dest: Optional[str] = None - extract: Optional[bool] = None - verify_hash: Optional[bool] = None - - -class WhenConfig(Schema): - # This task only needs to be run if a file matching one of the given - # patterns has changed in the push. 
The patterns use the mozpack - # match function (python/mozbuild/mozpack/path.py). - files_changed: Optional[list[str]] = None +FetchesEntrySchema = Schema.from_dict( + { + "artifact": str, + "dest": Optional[str], + "extract": Optional[bool], + "verify-hash": Optional[bool], + }, + name="FetchesEntrySchema", +) +WhenConfig = Schema.from_dict( + { + # This task only needs to be run if a file matching one of the given + # patterns has changed in the push. The patterns use the mozpack + # match function (python/mozbuild/mozpack/path.py). + "files-changed": Optional[list[str]], + }, + name="WhenConfig", +) -class RunConfig(Schema, forbid_unknown_fields=False, kw_only=True): - # The key to a run implementation in a peer module to this one. - using: str - # Base work directory used to set up the task. - workdir: Optional[str] = None +RunConfig = Schema.from_dict( + { + # The key to a run implementation in a peer module to this one. + "using": str, + # Base work directory used to set up the task. + "workdir": Optional[str], + }, + name="RunConfig", + forbid_unknown_fields=False, +) #: Schema for a run transforms @@ -405,8 +416,10 @@ def wrap(func): return wrap -class AlwaysOptimizedRunSchema(Schema): - using: Literal["always-optimized"] +AlwaysOptimizedRunSchema = Schema.from_dict( + {"using": Literal["always-optimized"]}, + name="AlwaysOptimizedRunSchema", +) @run_task_using("always-optimized", "always-optimized", AlwaysOptimizedRunSchema) diff --git a/src/taskgraph/transforms/run/index_search.py b/src/taskgraph/transforms/run/index_search.py index fd3ada672..0b86dc644 100644 --- a/src/taskgraph/transforms/run/index_search.py +++ b/src/taskgraph/transforms/run/index_search.py @@ -18,14 +18,15 @@ #: Schema for run.using index-search -class IndexSearchRunSchema(Schema): - using: Literal["index-search"] - # A list of indexes in decreasing order of priority at which to lookup for this - # task. This is interpolated with the graph parameters. 
- index_search: list[str] - - -run_task_schema = IndexSearchRunSchema +run_task_schema = Schema.from_dict( + { + "using": Literal["index-search"], + # A list of indexes in decreasing order of priority at which to lookup for this + # task. This is interpolated with the graph parameters. + "index-search": list[str], + }, + name="IndexSearchRunSchema", +) @run_task_using("always-optimized", "index-search", schema=run_task_schema) diff --git a/src/taskgraph/transforms/run/run_task.py b/src/taskgraph/transforms/run/run_task.py index 7c3ccb7a6..3cafaf7b7 100644 --- a/src/taskgraph/transforms/run/run_task.py +++ b/src/taskgraph/transforms/run/run_task.py @@ -27,36 +27,38 @@ #: Schema for run.using run_task -class RunTaskRunSchema(Schema, forbid_unknown_fields=False, kw_only=True): - # Specifies the task type. Must be 'run-task'. - using: Literal["run-task"] - # The command arguments to pass to the `run-task` script, after the checkout - # arguments. If a list, it will be passed directly; otherwise it will be - # included in a single argument to the command specified by `exec-with`. - command: Union[list[taskref_or_string_msgspec], taskref_or_string_msgspec] - # If true (the default), perform a checkout on the worker. Can also be a - # dictionary specifying explicit checkouts. - checkout: Union[bool, dict[str, dict]] - # Base work directory used to set up the task. - workdir: str - # Specifies which caches to use. May take a boolean in which case either all - # (True) or no (False) caches will be used. Alternatively, it can accept a - # list of caches to enable. Defaults to only the checkout cache enabled. - use_caches: Optional[Union[bool, list[CacheType]]] = None - # Path to run command in. If a checkout is present, the path to the checkout - # will be interpolated with the key `checkout`. - cwd: Optional[str] = None - # Specifies what to execute the command with in the event the command is a - # string. 
- exec_with: Optional[ExecWith] = None - # Command used to invoke the `run-task` script. Can be used if the script - # or Python installation is in a non-standard location on the workers. - run_task_command: Optional[list] = None - # Whether to run as root. Defaults to False. - run_as_root: Optional[bool] = None - - -run_task_schema = RunTaskRunSchema +run_task_schema = Schema.from_dict( + { + # Specifies the task type. Must be 'run-task'. + "using": Literal["run-task"], + # The command arguments to pass to the `run-task` script, after the checkout + # arguments. If a list, it will be passed directly; otherwise it will be + # included in a single argument to the command specified by `exec-with`. + "command": Union[list[taskref_or_string_msgspec], taskref_or_string_msgspec], + # If true (the default), perform a checkout on the worker. Can also be a + # dictionary specifying explicit checkouts. + "checkout": Union[bool, dict[str, dict]], + # Base work directory used to set up the task. + "workdir": str, + # Specifies which caches to use. May take a boolean in which case either all + # (True) or no (False) caches will be used. Alternatively, it can accept a + # list of caches to enable. Defaults to only the checkout cache enabled. + "use-caches": Optional[Union[bool, list[CacheType]]], + # Path to run command in. If a checkout is present, the path to the checkout + # will be interpolated with the key `checkout`. + "cwd": Optional[str], + # Specifies what to execute the command with in the event the command is a + # string. + "exec-with": Optional[ExecWith], + # Command used to invoke the `run-task` script. Can be used if the script + # or Python installation is in a non-standard location on the workers. + "run-task-command": Optional[list], + # Whether to run as root. Defaults to False. 
+ "run-as-root": Optional[bool], + }, + name="RunTaskRunSchema", + forbid_unknown_fields=False, +) def common_setup(config, task, taskdesc, command): diff --git a/src/taskgraph/transforms/run/toolchain.py b/src/taskgraph/transforms/run/toolchain.py index 44067801f..3800d0643 100644 --- a/src/taskgraph/transforms/run/toolchain.py +++ b/src/taskgraph/transforms/run/toolchain.py @@ -23,29 +23,31 @@ #: Schema for run.using toolchain -class ToolchainRunSchema(Schema, forbid_unknown_fields=False, kw_only=True): - # Specifies the run type. Must be "toolchain-script". - using: Literal["toolchain-script"] - # The script (in taskcluster/scripts/misc) to run. - script: str - # Path to the artifact produced by the toolchain task. - toolchain_artifact: str - # Base work directory used to set up the task. - workdir: str - # Arguments to pass to the script. - arguments: Optional[list[str]] = None - # Paths/patterns pointing to files that influence the outcome of - # a toolchain build. - resources: Optional[list[str]] = None - # An alias that can be used instead of the real toolchain task name in - # fetch stanzas for tasks. - toolchain_alias: Optional[Union[str, list[str]]] = None - # Additional env variables to add to the worker when using this - # toolchain. - toolchain_env: Optional[dict[str, object]] = None - - -toolchain_run_schema = ToolchainRunSchema +toolchain_run_schema = Schema.from_dict( + { + # Specifies the run type. Must be "toolchain-script". + "using": Literal["toolchain-script"], + # The script (in taskcluster/scripts/misc) to run. + "script": str, + # Path to the artifact produced by the toolchain task. + "toolchain-artifact": str, + # Base work directory used to set up the task. + "workdir": str, + # Arguments to pass to the script. + "arguments": Optional[list[str]], + # Paths/patterns pointing to files that influence the outcome of + # a toolchain build. 
+ "resources": Optional[list[str]], + # An alias that can be used instead of the real toolchain task name in + # fetch stanzas for tasks. + "toolchain-alias": Optional[Union[str, list[str]]], + # Additional env variables to add to the worker when using this + # toolchain. + "toolchain-env": Optional[dict[str, object]], + }, + name="ToolchainRunSchema", + forbid_unknown_fields=False, +) def get_digest_data(config, run, taskdesc): diff --git a/src/taskgraph/transforms/task.py b/src/taskgraph/transforms/task.py index 53995a8ad..2c8192e03 100644 --- a/src/taskgraph/transforms/task.py +++ b/src/taskgraph/transforms/task.py @@ -50,9 +50,13 @@ def run_task_suffix(): return hash_path(RUN_TASK)[0:20] -class WorkerSchema(Schema, forbid_unknown_fields=False, kw_only=True): - # The worker implementation type. - implementation: str +WORKER_SCHEMA = Schema.from_dict( + { + # The worker implementation type. + "implementation": str, + }, + forbid_unknown_fields=False, +) #: Schema for the task transforms @@ -134,7 +138,7 @@ class TaskDescriptionSchema(Schema): # the release promotion phase that this task belongs to. shipping_phase: Optional[Literal["build", "promote", "push", "ship"]] = None # Information specific to the worker implementation that will run this task. - worker: Optional[WorkerSchema] = None + worker: Optional[WORKER_SCHEMA] = None def __post_init__(self): if self.dependencies: @@ -241,58 +245,66 @@ def verify_index(config, index): DockerImage = Union[str, dict[str, str]] -class DockerWorkerCacheEntry(Schema): - # only one type is supported by any of the workers right now - type: Literal["persistent"] = "persistent" - # name of the cache, allowing reuse by subsequent tasks naming the same cache - name: Optional[str] = None - # location in the task image where the cache will be mounted - mount_point: Optional[str] = None - # Whether the cache is not used in untrusted environments (like the Try repo). 
- skip_untrusted: Optional[bool] = None - - -class DockerWorkerArtifact(Schema): - # type of artifact -- simple file, or recursive directory, or a volume mounted directory. - type: Optional[Literal["file", "directory", "volume"]] = None - # task image path from which to read artifact - path: Optional[str] = None - # name of the produced artifact (root of the names for type=directory) - name: Optional[str] = None - - -class DockerWorkerPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True): - implementation: Literal["docker-worker"] - os: Literal["linux"] - # For tasks that will run in docker-worker, this is the name of the docker - # image or in-tree docker image to run the task in. - docker_image: DockerImage - # worker features that should be enabled - relengapi_proxy: bool - chain_of_trust: bool - taskcluster_proxy: bool - allow_ptrace: bool - loopback_video: bool - # environment variables - env: dict[str, taskref_or_string_msgspec] - # the maximum time to run, in seconds - max_run_time: int - # Paths to Docker volumes. 
- volumes: Optional[list[str]] = None - # caches to set up for the task - caches: Optional[list[DockerWorkerCacheEntry]] = None - # artifacts to extract from the task image after completion - artifacts: Optional[list[DockerWorkerArtifact]] = None - # the command to run; if not given, docker-worker will default to the - # command in the docker image - command: Optional[list[taskref_or_string_msgspec]] = None - # the exit status code(s) that indicates the task should be retried - retry_exit_status: Optional[list[int]] = None - # the exit status code(s) that indicates the caches used by the task - # should be purged - purge_caches_exit_status: Optional[list[int]] = None - # Whether any artifacts are assigned to this worker - skip_artifacts: Optional[bool] = None +DOCKER_WORKER_CACHE_ENTRY = Schema.from_dict( + { + # only one type is supported by any of the workers right now + "type": (Literal["persistent"], "persistent"), + # name of the cache, allowing reuse by subsequent tasks naming the same cache + "name": (str, None), + # location in the task image where the cache will be mounted + "mount-point": (str, None), + # Whether the cache is not used in untrusted environments (like the Try repo). + "skip-untrusted": (bool, None), + }, +) + +DOCKER_WORKER_ARTIFACT = Schema.from_dict( + { + # type of artifact -- simple file, or recursive directory, or a volume mounted directory. + "type": (Optional[Literal["file", "directory", "volume"]], None), + # task image path from which to read artifact + "path": (str, None), + # name of the produced artifact (root of the names for type=directory) + "name": (str, None), + }, +) + +DockerWorkerPayloadSchema = Schema.from_dict( + { + "implementation": Literal["docker-worker"], + "os": Literal["linux"], + # For tasks that will run in docker-worker, this is the name of the docker + # image or in-tree docker image to run the task in. 
+ "docker-image": DockerImage, + # worker features that should be enabled + "relengapi-proxy": bool, + "chain-of-trust": bool, + "taskcluster-proxy": bool, + "allow-ptrace": bool, + "loopback-video": bool, + # environment variables + "env": dict[str, taskref_or_string_msgspec], + # the maximum time to run, in seconds + "max-run-time": int, + # Paths to Docker volumes. + "volumes": (list[str], None), + # caches to set up for the task + "caches": (list[DOCKER_WORKER_CACHE_ENTRY], None), + # artifacts to extract from the task image after completion + "artifacts": (list[DOCKER_WORKER_ARTIFACT], None), + # the command to run; if not given, docker-worker will default to the + # command in the docker image + "command": (list[taskref_or_string_msgspec], None), + # the exit status code(s) that indicates the task should be retried + "retry-exit-status": (list[int], None), + # the exit status code(s) that indicates the caches used by the task + # should be purged + "purge-caches-exit-status": (list[int], None), + # Whether any artifacts are assigned to this worker + "skip-artifacts": (bool, None), + }, + forbid_unknown_fields=False, +) @payload_builder("docker-worker", schema=DockerWorkerPayloadSchema) @@ -497,69 +509,78 @@ def build_docker_worker_payload(config, task, task_def): payload["capabilities"] = capabilities -class GenericWorkerArtifact(Schema): - # type of artifact -- simple file, or recursive directory - type: Literal["file", "directory"] - # filesystem path from which to read artifact - path: str - # if not specified, path is used for artifact name - name: Optional[str] = None - - -class MountContentSchema(Schema): - # Artifact name that contains the content. - artifact: Optional[str] = None - # Task ID that has the artifact that contains the content. - task_id: Optional[taskref_or_string_msgspec] = None - # URL that supplies the content in response to an unauthenticated GET request. 
- url: Optional[str] = None - - -class MountSchema(Schema): - # A unique name for the cache volume. - cache_name: Optional[str] = None - # Optional content for pre-loading cache, or mandatory content for - # read-only file or directory. - content: Optional[MountContentSchema] = None - # If mounting a cache or read-only directory. - directory: Optional[str] = None - # If mounting a file. - file: Optional[str] = None - # Archive format of the content. - format: Optional[Literal["rar", "tar.bz2", "tar.gz", "zip"]] = None - - -class GenericWorkerPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True): - implementation: Literal["generic-worker"] - os: Literal["windows", "macosx", "linux", "linux-bitbar"] - # command is a list of commands to run, sequentially - # On Windows, each command is a string; on Linux/OS X, each command is a string array - command: list - # environment variables - env: dict[str, taskref_or_string_msgspec] - # the maximum time to run, in seconds - max_run_time: int - # optional features - chain_of_trust: bool - # artifacts to extract from the task image after completion - artifacts: Optional[list[GenericWorkerArtifact]] = None - # Directories and/or files to be mounted. 
- mounts: Optional[list[MountSchema]] = None - # the exit status code(s) that indicates the task should be retried - retry_exit_status: Optional[list[int]] = None - # the exit status code(s) that indicates the caches used by the task - # should be purged - purge_caches_exit_status: Optional[list[int]] = None - # os user groups for test task workers - os_groups: Optional[list[str]] = None - # feature for test task to run as administrator - run_as_administrator: Optional[bool] = None - # feature for task to run as current OS user - run_task_as_current_user: Optional[bool] = None - taskcluster_proxy: Optional[bool] = None - hide_cmd_window: Optional[bool] = None - # Whether any artifacts are assigned to this worker - skip_artifacts: Optional[bool] = None +GENERIC_WORKER_ARTIFACT = Schema.from_dict( + { + # type of artifact -- simple file, or recursive directory + "type": Literal["file", "directory"], + # filesystem path from which to read artifact + "path": str, + # if not specified, path is used for artifact name + "name": (str, None), + }, +) + +MOUNT_SCHEMA = Schema.from_dict( + { + # A unique name for the cache volume. + "cache-name": (str, None), + # Optional content for pre-loading cache, or mandatory content for + # read-only file or directory. + "content": Schema.from_dict( + { + # Artifact name that contains the content. + "artifact": (str, None), + # Task ID that has the artifact that contains the content. + "task-id": (taskref_or_string_msgspec, None), + # URL that supplies the content in response to an unauthenticated GET request. + "url": (str, None), + }, + optional=True, + ), + # If mounting a cache or read-only directory. + "directory": (str, None), + # If mounting a file. + "file": (str, None), + # Archive format of the content. 
+ "format": (Optional[Literal["rar", "tar.bz2", "tar.gz", "zip"]], None), + }, +) + +GenericWorkerPayloadSchema = Schema.from_dict( + { + "implementation": Literal["generic-worker"], + "os": Literal["windows", "macosx", "linux", "linux-bitbar"], + # command is a list of commands to run, sequentially + # On Windows, each command is a string; on Linux/OS X, each command is a string array + "command": list, + # environment variables + "env": dict[str, taskref_or_string_msgspec], + # the maximum time to run, in seconds + "max-run-time": int, + # optional features + "chain-of-trust": bool, + # artifacts to extract from the task image after completion + "artifacts": (list[GENERIC_WORKER_ARTIFACT], None), + # Directories and/or files to be mounted. + "mounts": (list[MOUNT_SCHEMA], None), + # the exit status code(s) that indicates the task should be retried + "retry-exit-status": (list[int], None), + # the exit status code(s) that indicates the caches used by the task + # should be purged + "purge-caches-exit-status": (list[int], None), + # os user groups for test task workers + "os-groups": (list[str], None), + # feature for test task to run as administrator + "run-as-administrator": (bool, None), + # feature for task to run as current OS user + "run-task-as-current-user": (bool, None), + "taskcluster-proxy": (bool, None), + "hide-cmd-window": (bool, None), + # Whether any artifacts are assigned to this worker + "skip-artifacts": (bool, None), + }, + forbid_unknown_fields=False, +) @payload_builder("generic-worker", schema=GenericWorkerPayloadSchema) @@ -677,13 +698,16 @@ def build_generic_worker_payload(config, task, task_def): task_def["payload"]["features"] = features -class ReleaseProperties(Schema): - app_name: Optional[str] = None - app_version: Optional[str] = None - branch: Optional[str] = None - build_id: Optional[str] = None - hash_type: Optional[str] = None - platform: Optional[str] = None +RELEASE_PROPERTIES = Schema.from_dict( + { + "app-name": (str, None), + 
"app-version": (str, None), + "branch": (str, None), + "build-id": (str, None), + "hash-type": (str, None), + "platform": (str, None), + }, +) class UpstreamArtifact(Schema, rename="camel"): @@ -697,18 +721,22 @@ class UpstreamArtifact(Schema, rename="camel"): locale: str -class BeetmoverPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True): - implementation: Literal["beetmover"] - # the maximum time to run, in seconds - max_run_time: int - # release properties - release_properties: ReleaseProperties - # list of artifact URLs for the artifacts that should be beetmoved - upstream_artifacts: list[UpstreamArtifact] - # locale key, if this is a locale beetmover task - locale: Optional[str] = None - partner_public: Optional[bool] = None - artifact_map: Optional[object] = None +BeetmoverPayloadSchema = Schema.from_dict( + { + "implementation": Literal["beetmover"], + # the maximum time to run, in seconds + "max-run-time": int, + # release properties + "release-properties": RELEASE_PROPERTIES, + # list of artifact URLs for the artifacts that should be beetmoved + "upstream-artifacts": list[UpstreamArtifact], + # locale key, if this is a locale beetmover task + "locale": (str, None), + "partner-public": (bool, None), + "artifact-map": (object, None), + }, + forbid_unknown_fields=False, +) @payload_builder("beetmover", schema=BeetmoverPayloadSchema) @@ -737,10 +765,14 @@ def build_beetmover_payload(config, task, task_def): task_def["payload"]["is_partner_repack_public"] = worker["partner-public"] -class InvalidPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True): - # an invalid task is one which should never actually be created; this is used in - # release automation on branches where the task just doesn't make sense - implementation: Literal["invalid"] +InvalidPayloadSchema = Schema.from_dict( + { + # an invalid task is one which should never actually be created; this is used in + # release automation on branches where the task just doesn't make 
sense + "implementation": Literal["invalid"], + }, + forbid_unknown_fields=False, +) @payload_builder("invalid", schema=InvalidPayloadSchema) @@ -748,12 +780,18 @@ def build_invalid_payload(config, task, task_def): task_def["payload"] = "invalid task - should never be created" -class AlwaysOptimizedPayloadSchema(Schema, forbid_unknown_fields=False, kw_only=True): - implementation: Literal["always-optimized"] - +AlwaysOptimizedPayloadSchema = Schema.from_dict( + { + "implementation": Literal["always-optimized"], + }, + forbid_unknown_fields=False, +) -class SucceedPayloadSchema(Schema): - implementation: Literal["succeed"] +SucceedPayloadSchema = Schema.from_dict( + { + "implementation": Literal["succeed"], + }, +) @payload_builder("always-optimized", schema=AlwaysOptimizedPayloadSchema) diff --git a/src/taskgraph/transforms/task_context.py b/src/taskgraph/transforms/task_context.py index 5911bd52c..aff763f5f 100644 --- a/src/taskgraph/transforms/task_context.py +++ b/src/taskgraph/transforms/task_context.py @@ -1,55 +1,55 @@ -from typing import Optional, Union +from typing import Union from taskgraph.transforms.base import TransformSequence from taskgraph.util.schema import Schema from taskgraph.util.templates import deep_get, substitute_task_fields from taskgraph.util.yaml import load_yaml - -class TaskContextConfig(Schema): - # A list of fields in the task to substitute the provided values - # into. - substitution_fields: list[str] - # Retrieve task context values from parameters. A single - # parameter may be provided or a list of parameters in - # priority order. The latter can be useful in implementing a - # "default" value if some other parameter is not provided. - from_parameters: Optional[dict[str, Union[list[str], str]]] = None - # Retrieve task context values from a yaml file. 
The provided - # file should usually only contain top level keys and values - # (eg: nested objects will not be interpolated - they will be - # substituted as text representations of the object). - from_file: Optional[str] = None - # Key/value pairs to be used as task context - from_object: Optional[object] = None - - -#: Schema for the task_context transforms -class TaskContextSchema(Schema, forbid_unknown_fields=False, kw_only=True): - name: Optional[str] = None - # `task-context` can be used to substitute values into any field in a - # task with data that is not known until `taskgraph` runs. - # - # This data can be provided via `from-parameters` or `from-file`, - # which can pull in values from parameters and a defined yml file - # respectively. - # - # Data may also be provided directly in the `from-object` section of - # `task-context`. This can be useful in `kinds` that define most of - # their contents in `task-defaults`, but have some values that may - # differ for various concrete `tasks` in the `kind`. - # - # If the same key is found in multiple places the order of precedence - # is as follows: - # - Parameters - # - `from-object` keys - # - File - # - # That is to say: parameters will always override anything else. - task_context: Optional[TaskContextConfig] = None - - -SCHEMA = TaskContextSchema +SCHEMA = Schema.from_dict( + { + "name": (str, None), + # `task-context` can be used to substitute values into any field in a + # task with data that is not known until `taskgraph` runs. + # + # This data can be provided via `from-parameters` or `from-file`, + # which can pull in values from parameters and a defined yml file + # respectively. + # + # Data may also be provided directly in the `from-object` section of + # `task-context`. This can be useful in `kinds` that define most of + # their contents in `task-defaults`, but have some values that may + # differ for various concrete `tasks` in the `kind`. 
+ # + # If the same key is found in multiple places the order of precedence + # is as follows: + # - Parameters + # - `from-object` keys + # - File + # + # That is to say: parameters will always override anything else. + "task-context": Schema.from_dict( + { + # A list of fields in the task to substitute the provided values + # into. + "substitution-fields": list[str], + # Retrieve task context values from parameters. A single + # parameter may be provided or a list of parameters in + # priority order. The latter can be useful in implementing a + # "default" value if some other parameter is not provided. + "from-parameters": (dict[str, Union[list[str], str]], None), + # Retrieve task context values from a yaml file. The provided + # file should usually only contain top level keys and values + # (eg: nested objects will not be interpolated - they will be + # substituted as text representations of the object). + "from-file": (str, None), + # Key/value pairs to be used as task context + "from-object": (object, None), + }, + optional=True, + ), + }, + forbid_unknown_fields=False, +) transforms = TransformSequence() transforms.add_validate(SCHEMA) diff --git a/src/taskgraph/util/schema.py b/src/taskgraph/util/schema.py index ad9a93e5a..45bfcf4ae 100644 --- a/src/taskgraph/util/schema.py +++ b/src/taskgraph/util/schema.py @@ -2,6 +2,7 @@ # License, v. 2.0. If a copy of the MPL was not distributed with this # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
+import inspect
 import pprint
 import re
 import threading
@@ -318,6 +319,11 @@ def __getitem__(self, item):
         return self.schema[item]  # type: ignore
 
 
+def _caller_module_name(depth=1):
+    frame = inspect.stack()[depth + 1].frame
+    return frame.f_globals.get("__name__", "schema")
+
+
 class Schema(
     msgspec.Struct,
     kw_only=True,
@@ -345,6 +351,11 @@ class MySchema(Schema, forbid_unknown_fields=False, kw_only=True):
             foo: str
     """
 
+    def __init_subclass__(cls, exclusive=None, **kwargs):
+        super().__init_subclass__(**kwargs)
+        if exclusive is not None:
+            cls.exclusive = exclusive
+
     def __post_init__(self):
         if taskgraph.fast:
             return
@@ -370,6 +381,74 @@ def __post_init__(self):
 
             keyed_by.validate(obj)
 
+        # Validate mutually exclusive field groups.
+        for group in getattr(self, "exclusive", []):
+            set_fields = [f for f in group if getattr(self, f) is not None]
+            if len(set_fields) > 1:
+                raise ValueError(
+                    f"{' and '.join(repr(f) for f in set_fields)} are mutually exclusive"
+                )
+
+    @classmethod
+    def from_dict(
+        cls,
+        fields_dict: dict[str, Any],
+        name: Optional[str] = None,
+        optional: bool = False,
+        **kwargs,
+    ) -> Union[type[msgspec.Struct], type[Optional[msgspec.Struct]]]:
+        """Create a Schema subclass dynamically from a dict of field definitions.
+
+        Each key is a field name and each value is either a type annotation or a
+        ``(type, default)`` tuple. Fields typed as ``Optional[...]`` automatically
+        receive a default of ``None`` when no explicit default is provided.
+
+        Usage::
+
+            Schema.from_dict({
+                "required_field": str,
+                "optional_field": Optional[int],  # default None inferred
+                "explicit_default": (list[str], []),  # explicit default
+            }, name="MySchema")
+
+        Keyword arguments are forwarded to ``msgspec.defstruct`` (e.g.
+        ``forbid_unknown_fields=False``).
+        """
+        # Don't use `rename=kebab` by default as we can define kebab case
+        # properly in dicts.
+        kwargs.setdefault("rename", None)
+
+        # Ensure name and module are set correctly for error messages.
+ caller_module = _caller_module_name() + kwargs.setdefault("module", caller_module) + name = name or caller_module.rsplit(".", 1)[-1] + + fields = [] + for field_name, field_spec in fields_dict.items(): + python_name = field_name.replace("-", "_") + + if isinstance(field_spec, tuple): + typ, default = field_spec + else: + typ = field_spec + if get_origin(typ) is Union and type(None) in get_args(typ): + default = None + else: + default = msgspec.NODEFAULT + + if field_name != python_name: + # Use msgspec.field to preserve the kebab-case encoded name. + # Explicit field names take priority over the struct-level rename. + fields.append((python_name, typ, msgspec.field(name=field_name, default=default))) + else: + fields.append((python_name, typ, default)) + + exclusive = kwargs.pop("exclusive", None) + result = msgspec.defstruct(name, fields, bases=(cls,), **kwargs) + if exclusive: + result.exclusive = exclusive + return Optional[result] if optional else result # type: ignore[valid-type] + @classmethod def validate(cls, data): """Validate data against this schema."""