taskgraph.transforms package#
Subpackages#
- taskgraph.transforms.run package
- Submodules
- taskgraph.transforms.run.common module
- taskgraph.transforms.run.index_search module
- taskgraph.transforms.run.run_task module
- taskgraph.transforms.run.toolchain module
- Module contents
AlwaysOptimizedRunSchemaFetchesEntrySchemaRunConfigRunDescriptionSchemaRunDescriptionSchema.always_targetRunDescriptionSchema.attributesRunDescriptionSchema.deadline_afterRunDescriptionSchema.dependenciesRunDescriptionSchema.descriptionRunDescriptionSchema.expires_afterRunDescriptionSchema.extraRunDescriptionSchema.fetchesRunDescriptionSchema.if_dependenciesRunDescriptionSchema.indexRunDescriptionSchema.labelRunDescriptionSchema.nameRunDescriptionSchema.needs_sccacheRunDescriptionSchema.optimizationRunDescriptionSchema.priorityRunDescriptionSchema.requiresRunDescriptionSchema.routesRunDescriptionSchema.runRunDescriptionSchema.run_on_git_branchesRunDescriptionSchema.run_on_projectsRunDescriptionSchema.run_on_tasks_forRunDescriptionSchema.scopesRunDescriptionSchema.shipping_phaseRunDescriptionSchema.soft_dependenciesRunDescriptionSchema.tagsRunDescriptionSchema.task_fromRunDescriptionSchema.treeherderRunDescriptionSchema.whenRunDescriptionSchema.workerRunDescriptionSchema.worker_type
WhenConfigalways_optimized()configure_taskdesc_for_run()get_attribute()make_task_description()rewrite_when_to_optimization()run_description_schemarun_task_using()set_implementation()set_label()use_fetches()
Submodules#
taskgraph.transforms.base module#
- class taskgraph.transforms.base.RepoConfig(prefix: str, name: str, base_repository: str, head_repository: str, head_ref: str, type: str, path: str = '', head_rev: str | None = None, ssh_secret_name: str | None = None)#
Bases:
object- base_repository: str#
- head_ref: str#
- head_repository: str#
- head_rev: str | None = None#
- name: str#
- path: str = ''#
- prefix: str#
- ssh_secret_name: str | None = None#
- type: str#
- class taskgraph.transforms.base.TransformConfig(kind: str, path: str, config: dict, params: Parameters, kind_dependencies_tasks: dict[str, Task], graph_config: GraphConfig, write_artifacts: bool)#
Bases:
objectA container for configuration affecting transforms. The config argument to transforms is an instance of this class.
- config: dict#
- graph_config: GraphConfig#
- kind: str#
- params: Parameters#
- path: str#
- property repo_configs#
- write_artifacts: bool#
- class taskgraph.transforms.base.TransformSequence(_transforms: list = <factory>)#
Bases:
objectContainer for a sequence of transforms. Each transform is represented as a callable taking (config, items) and returning a generator which will yield transformed items. The resulting sequence has the same interface.
This is convenient to use in a file full of transforms, as it provides a decorator, @transforms.add, that will add the decorated function to the sequence.
- add(func)#
- add_validate(schema)#
- class taskgraph.transforms.base.ValidateSchema(schema: taskgraph.util.schema.Schema)#
Bases:
object
taskgraph.transforms.from_deps module#
Transforms used to create tasks based on the kind dependencies, filtering on
common attributes like the build-type.
These transforms are useful when follow-up tasks are needed for some indeterminate subset of existing tasks. For example, running a signing task after each build task, whatever builds may exist.
- taskgraph.transforms.from_deps.FROM_DEPS_SCHEMA#
alias of
FromDepsSchema
- class taskgraph.transforms.from_deps.FromDepsConfig(kinds: list[str] | None = None, set_name: bool | str | dict[str, object] | None = None, with_attributes: dict[str, list | str] | None = None, group_by: str | dict[str, object] | None = None, copy_attributes: bool | None = None, unique_kinds: bool | None = None, fetches: dict[str, list[FetchesEntrySchema]] | None = None)#
Bases:
Schema- copy_attributes: bool | None#
- fetches: dict[str, list[FetchesEntrySchema]] | None#
- group_by: str | dict[str, object] | None#
- kinds: list[str] | None#
- set_name: bool | str | dict[str, object] | None#
- unique_kinds: bool | None#
- with_attributes: dict[str, list | str] | None#
- class taskgraph.transforms.from_deps.FromDepsSchema(*, from_deps: FromDepsConfig)#
Bases:
Schema- from_deps: FromDepsConfig#
- taskgraph.transforms.from_deps.from_deps(config, tasks)#
taskgraph.transforms.cached_tasks module#
- taskgraph.transforms.cached_tasks.cache_task(config, tasks)#
- taskgraph.transforms.cached_tasks.format_task_digest(cached_task)#
- taskgraph.transforms.cached_tasks.order_tasks(config, tasks)#
Iterate image tasks in an order where parent tasks come first.
taskgraph.transforms.chunking module#
- taskgraph.transforms.chunking.CHUNK_SCHEMA#
alias of
ChunkSchema
- class taskgraph.transforms.chunking.ChunkConfig(total_chunks: int, substitution_fields: list[str] = <factory>)#
Bases:
Schema- substitution_fields: list[str]#
- total_chunks: int#
- class taskgraph.transforms.chunking.ChunkSchema(*, chunk: ChunkConfig | None = None)#
Bases:
Schema- chunk: ChunkConfig | None#
- taskgraph.transforms.chunking.chunk_tasks(config, tasks)#
taskgraph.transforms.code_review module#
Add soft dependencies and configuration to code-review tasks.
- taskgraph.transforms.code_review.add_dependencies(config, tasks)#
taskgraph.transforms.docker_image module#
- class taskgraph.transforms.docker_image.DockerImageSchema(name: str, parent: str | None = None, symbol: str | None = None, task_from: str | None = None, args: dict[str, str] | None = None, definition: str | None = None, packages: list[str] | None = None, index: IndexSchema | None = None, cache: bool | None = None)#
Bases:
Schema- args: dict[str, str] | None#
- cache: bool | None#
- definition: str | None#
- index: IndexSchema | None#
- name: str#
- packages: list[str] | None#
- parent: str | None#
- symbol: str | None#
- task_from: str | None#
- taskgraph.transforms.docker_image.docker_image_schema#
alias of
DockerImageSchema
- taskgraph.transforms.docker_image.fill_template(config, tasks)#
taskgraph.transforms.fetch module#
- taskgraph.transforms.fetch.FETCH_SCHEMA#
alias of
FetchSchema
- class taskgraph.transforms.fetch.FetchBuilder(schema: taskgraph.util.schema.Schema, builder: Callable)#
Bases:
object- builder: Callable#
- class taskgraph.transforms.fetch.FetchSchema(name: str, description: str, fetch: FetchSubSchema, task_from: str | None = None, expires_after: str | None = None, docker_image: object | None = None, fetch_alias: str | None = None, artifact_prefix: str | None = None, attributes: dict[str, object] | None = None)#
Bases:
Schema- artifact_prefix: str | None#
- attributes: dict[str, object] | None#
- description: str#
- docker_image: object | None#
- expires_after: str | None#
- fetch: FetchSubSchema#
- fetch_alias: str | None#
- name: str#
- task_from: str | None#
- class taskgraph.transforms.fetch.GitFetchSchema(*, type: Literal['git'], repo: str, revision: str, include_dot_git: bool | None = None, artifact_name: str | None = None, path_prefix: str | None = None, ssh_key: str | None = None)#
Bases:
Schema- artifact_name: str | None#
- include_dot_git: bool | None#
- path_prefix: str | None#
- repo: str#
- revision: str#
- ssh_key: str | None#
- type: Literal['git']#
- class taskgraph.transforms.fetch.GpgSignatureConfig(sig_url: str, key_path: str)#
Bases:
Schema- key_path: str#
- sig_url: str#
- class taskgraph.transforms.fetch.StaticUrlFetchSchema(*, type: Literal['static-url'], url: str, sha256: str, size: int, gpg_signature: GpgSignatureConfig | None = None, artifact_name: str | None = None, strip_components: int | None = None, add_prefix: str | None = None, headers: dict[str, str] | None = None)#
Bases:
Schema- add_prefix: str | None#
- artifact_name: str | None#
- gpg_signature: GpgSignatureConfig | None#
- headers: dict[str, str] | None#
- sha256: str#
- size: int#
- strip_components: int | None#
- type: Literal['static-url']#
- url: str#
- taskgraph.transforms.fetch.configure_fetch(config, typ, name, fetch)#
- taskgraph.transforms.fetch.create_fetch_url_task(config, name, fetch)#
- taskgraph.transforms.fetch.create_git_fetch_task(config, name, fetch)#
- taskgraph.transforms.fetch.fetch_builder(name, schema)#
- taskgraph.transforms.fetch.make_task(config, tasks)#
- taskgraph.transforms.fetch.process_fetch_task(config, tasks)#
taskgraph.transforms.matrix module#
Transforms used to split one task definition into many tasks, governed by a matrix defined in the definition.
- taskgraph.transforms.matrix.MATRIX_SCHEMA#
alias of
MatrixSchema
- class taskgraph.transforms.matrix.MatrixConfig(*, exclude: list[dict[str, str]] | None = None, set_name: str | None = None, substitution_fields: list[str] | None = None)#
Bases:
Schema- exclude: list[dict[str, str]] | None#
- set_name: str | None#
- substitution_fields: list[str] | None#
- class taskgraph.transforms.matrix.MatrixSchema(*, name: str, matrix: MatrixConfig | None = None)#
Bases:
Schema- matrix: MatrixConfig | None#
- name: str#
- taskgraph.transforms.matrix.split_matrix(config, tasks)#
taskgraph.transforms.notify module#
Add notifications to tasks via Taskcluster’s notify service.
See https://docs.taskcluster.net/docs/reference/core/notify/usage for more information.
- class taskgraph.transforms.notify.EmailContent(subject: str | None = None, content: str | None = None, link: EmailLinkContent | None = None)#
Bases:
Schema- content: str | None#
- link: EmailLinkContent | None#
- subject: str | None#
- class taskgraph.transforms.notify.EmailLinkContent(text: str, href: str)#
Bases:
Schema- href: str#
- text: str#
- class taskgraph.transforms.notify.EmailRecipient(*, address: ~typing.Annotated[~typing.Any, <taskgraph.util.schema.OptionallyKeyedBy object at 0x770a75692a80>], status_type: ~typing.Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None = None)#
Bases:
Schema- address: OptionallyKeyedBy object at 0x770a75692a80>]#
- status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None#
- class taskgraph.transforms.notify.LegacyNotificationsConfig(emails: ~typing.Annotated[~typing.Any, <taskgraph.util.schema.OptionallyKeyedBy object at 0x770a75628170>], subject: str, message: str | None = None, status_types: list[~typing.Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running']] | None = None)#
Bases:
Schema- emails: OptionallyKeyedBy object at 0x770a75628170>]#
- message: str | None#
- status_types: list[Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running']] | None#
- subject: str#
- class taskgraph.transforms.notify.MatrixContent(body: str | None = None, formatted_body: str | None = None, format: str | None = None, msg_type: str | None = None)#
Bases:
Schema- body: str | None#
- format: str | None#
- formatted_body: str | None#
- msg_type: str | None#
- class taskgraph.transforms.notify.MatrixRoomRecipient(*, room_id: str, status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None = None)#
Bases:
Schema- room_id: str#
- status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None#
- taskgraph.transforms.notify.NOTIFY_SCHEMA#
alias of
NotifySchema
- class taskgraph.transforms.notify.NotifyConfig(recipients: list[EmailRecipient | MatrixRoomRecipient | PulseRecipient | SlackChannelRecipient], content: NotifyContentConfig | None = None)#
Bases:
Schema- content: NotifyContentConfig | None#
- recipients: list[EmailRecipient | MatrixRoomRecipient | PulseRecipient | SlackChannelRecipient]#
- class taskgraph.transforms.notify.NotifyContentConfig(email: EmailContent | None = None, matrix: MatrixContent | None = None, slack: SlackContent | None = None)#
Bases:
Schema- email: EmailContent | None#
- matrix: MatrixContent | None#
- slack: SlackContent | None#
- class taskgraph.transforms.notify.NotifySchema(*, notify: NotifyConfig | None = None, notifications: LegacyNotificationsConfig | None = None)#
Bases:
Schema- notifications: LegacyNotificationsConfig | None#
- notify: NotifyConfig | None#
- class taskgraph.transforms.notify.PulseRecipient(*, routing_key: str, status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None = None)#
Bases:
Schema- routing_key: str#
- status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None#
- class taskgraph.transforms.notify.SlackChannelRecipient(*, channel_id: str, status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None = None)#
Bases:
Schema- channel_id: str#
- status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None#
- class taskgraph.transforms.notify.SlackContent(text: str | None = None, blocks: list | None = None, attachments: list | None = None)#
Bases:
Schema- attachments: list | None#
- blocks: list | None#
- text: str | None#
- taskgraph.transforms.notify.add_notifications(config, tasks)#
taskgraph.transforms.task module#
These transformations take a task description and turn it into a TaskCluster task definition (along with attributes, label, etc.). The input to these transformations is generic to any kind of task, but abstracts away some of the complexities of worker implementations, scopes, and treeherder annotations.
- class taskgraph.transforms.task.AlwaysOptimizedPayloadSchema(*, implementation: Literal['always-optimized'])#
Bases:
Schema- implementation: Literal['always-optimized']#
- class taskgraph.transforms.task.BeetmoverPayloadSchema(*, implementation: Literal['beetmover'], max_run_time: int, release_properties: ReleaseProperties, upstream_artifacts: list[UpstreamArtifact], locale: str | None = None, partner_public: bool | None = None, artifact_map: object | None = None)#
Bases:
Schema- artifact_map: object | None#
- implementation: Literal['beetmover']#
- locale: str | None#
- max_run_time: int#
- partner_public: bool | None#
- release_properties: ReleaseProperties#
- upstream_artifacts: list[UpstreamArtifact]#
- class taskgraph.transforms.task.DockerWorkerArtifact(type: Literal['file', 'directory', 'volume'] | None = None, path: str | None = None, name: str | None = None)#
Bases:
Schema- name: str | None#
- path: str | None#
- type: Literal['file', 'directory', 'volume'] | None#
- class taskgraph.transforms.task.DockerWorkerCacheEntry(type: Literal['persistent'] = 'persistent', name: str | None = None, mount_point: str | None = None, skip_untrusted: bool | None = None)#
Bases:
Schema- mount_point: str | None#
- name: str | None#
- skip_untrusted: bool | None#
- type: Literal['persistent']#
- class taskgraph.transforms.task.DockerWorkerPayloadSchema(*, implementation: Literal['docker-worker'], os: Literal['linux'], docker_image: str | dict[str, str], relengapi_proxy: bool, chain_of_trust: bool, taskcluster_proxy: bool, allow_ptrace: bool, loopback_video: bool, env: dict[str, str | TaskRefTypeSchema], max_run_time: int, volumes: list[str] | None = None, caches: list[DockerWorkerCacheEntry] | None = None, artifacts: list[DockerWorkerArtifact] | None = None, command: list[str | TaskRefTypeSchema] | None = None, retry_exit_status: list[int] | None = None, purge_caches_exit_status: list[int] | None = None, skip_artifacts: bool | None = None)#
Bases:
Schema- allow_ptrace: bool#
- artifacts: list[DockerWorkerArtifact] | None#
- caches: list[DockerWorkerCacheEntry] | None#
- chain_of_trust: bool#
- command: list[str | TaskRefTypeSchema] | None#
- docker_image: str | dict[str, str]#
- env: dict[str, str | TaskRefTypeSchema]#
- implementation: Literal['docker-worker']#
- loopback_video: bool#
- max_run_time: int#
- os: Literal['linux']#
- purge_caches_exit_status: list[int] | None#
- relengapi_proxy: bool#
- retry_exit_status: list[int] | None#
- skip_artifacts: bool | None#
- taskcluster_proxy: bool#
- volumes: list[str] | None#
- class taskgraph.transforms.task.GenericWorkerArtifact(type: Literal['file', 'directory'], path: str, name: str | None = None)#
Bases:
Schema- name: str | None#
- path: str#
- type: Literal['file', 'directory']#
- class taskgraph.transforms.task.GenericWorkerPayloadSchema(*, implementation: Literal['generic-worker'], os: Literal['windows', 'macosx', 'linux', 'linux-bitbar'], command: list, env: dict[str, str | TaskRefTypeSchema], max_run_time: int, chain_of_trust: bool, artifacts: list[GenericWorkerArtifact] | None = None, mounts: list[MountSchema] | None = None, retry_exit_status: list[int] | None = None, purge_caches_exit_status: list[int] | None = None, os_groups: list[str] | None = None, run_as_administrator: bool | None = None, run_task_as_current_user: bool | None = None, taskcluster_proxy: bool | None = None, hide_cmd_window: bool | None = None, skip_artifacts: bool | None = None)#
Bases:
Schema- artifacts: list[GenericWorkerArtifact] | None#
- chain_of_trust: bool#
- command: list#
- env: dict[str, str | TaskRefTypeSchema]#
- hide_cmd_window: bool | None#
- implementation: Literal['generic-worker']#
- max_run_time: int#
- mounts: list[MountSchema] | None#
- os: Literal['windows', 'macosx', 'linux', 'linux-bitbar']#
- os_groups: list[str] | None#
- purge_caches_exit_status: list[int] | None#
- retry_exit_status: list[int] | None#
- run_as_administrator: bool | None#
- run_task_as_current_user: bool | None#
- skip_artifacts: bool | None#
- taskcluster_proxy: bool | None#
- class taskgraph.transforms.task.InvalidPayloadSchema(*, implementation: Literal['invalid'])#
Bases:
Schema- implementation: Literal['invalid']#
- class taskgraph.transforms.task.MountContentSchema(artifact: str | None = None, task_id: str | TaskRefTypeSchema | None = None, url: str | None = None)#
Bases:
Schema- artifact: str | None#
- task_id: str | TaskRefTypeSchema | None#
- url: str | None#
- class taskgraph.transforms.task.MountSchema(cache_name: str | None = None, content: MountContentSchema | None = None, directory: str | None = None, file: str | None = None, format: Literal['rar', 'tar.bz2', 'tar.gz', 'zip'] | None = None)#
Bases:
Schema- cache_name: str | None#
- content: MountContentSchema | None#
- directory: str | None#
- file: str | None#
- format: Literal['rar', 'tar.bz2', 'tar.gz', 'zip'] | None#
- class taskgraph.transforms.task.PayloadBuilder(schema: object, builder: Callable)#
Bases:
object- builder: Callable#
- schema: object#
- class taskgraph.transforms.task.ReleaseProperties(app_name: str | None = None, app_version: str | None = None, branch: str | None = None, build_id: str | None = None, hash_type: str | None = None, platform: str | None = None)#
Bases:
Schema- app_name: str | None#
- app_version: str | None#
- branch: str | None#
- build_id: str | None#
- hash_type: str | None#
- platform: str | None#
- class taskgraph.transforms.task.SucceedPayloadSchema(implementation: Literal['succeed'])#
Bases:
Schema- implementation: Literal['succeed']#
- class taskgraph.transforms.task.TaskDescriptionSchema(label: str, description: str, always_target: bool, optimization: None | OptimizationTypeSchema, worker_type: str, needs_sccache: bool, attributes: dict[str, object] | None=None, task_from: str | None = None, dependencies: dict[str, object] | None=None, priority: Literal['highest', 'very-high', 'high', 'medium', 'low', 'very-low', 'lowest'] | None=None, soft_dependencies: list[str] | None = None, if_dependencies: list[str] | None = None, requires: Literal['all-completed', 'all-resolved'] | None=None, expires_after: str | None = None, deadline_after: str | None = None, routes: list[str] | None = None, scopes: list[str] | None = None, tags: dict[str, str] | None=None, extra: dict[str, object] | None=None, treeherder: bool | TreeherderConfig | None = None, index: IndexSchema | None = None, run_on_projects: Any, <taskgraph.util.schema.OptionallyKeyedBy object at 0x770a76e08530>] | None=None, run_on_tasks_for: list[str] | None = None, run_on_git_branches: list[str] | None = None, shipping_phase: Literal['build', 'promote', 'push', 'ship'] | None=None, worker: WorkerSchema | None = None)#
Bases:
Schema- always_target: bool#
- attributes: dict[str, object] | None#
- deadline_after: str | None#
- dependencies: dict[str, object] | None#
- description: str#
- expires_after: str | None#
- extra: dict[str, object] | None#
- if_dependencies: list[str] | None#
- index: IndexSchema | None#
- label: str#
- needs_sccache: bool#
- optimization: None | OptimizationTypeSchema#
- priority: Literal['highest', 'very-high', 'high', 'medium', 'low', 'very-low', 'lowest'] | None#
- requires: Literal['all-completed', 'all-resolved'] | None#
- routes: list[str] | None#
- run_on_git_branches: list[str] | None#
- run_on_projects: OptionallyKeyedBy object at 0x770a76e08530>] | None#
- run_on_tasks_for: list[str] | None#
- scopes: list[str] | None#
- shipping_phase: Literal['build', 'promote', 'push', 'ship'] | None#
- soft_dependencies: list[str] | None#
- tags: dict[str, str] | None#
- task_from: str | None#
- treeherder: bool | TreeherderConfig | None#
- worker: WorkerSchema | None#
- worker_type: str#
- class taskgraph.transforms.task.UpstreamArtifact(task_id: str | TaskRefTypeSchema, task_type: str, paths: list[str], locale: str)#
Bases:
Schema- locale: str#
- paths: list[str]#
- task_id: str | TaskRefTypeSchema#
- task_type: str#
- class taskgraph.transforms.task.WorkerSchema(*, implementation: str)#
Bases:
Schema- implementation: str#
- taskgraph.transforms.task.add_generic_index_routes(config, task)#
- taskgraph.transforms.task.add_github_checks(config, tasks)#
For git repositories, add checks route to all tasks.
This will be replaced by a configurable option in the future.
- taskgraph.transforms.task.add_index_routes(config, tasks)#
- taskgraph.transforms.task.build_beetmover_payload(config, task, task_def)#
- taskgraph.transforms.task.build_docker_worker_payload(config, task, task_def)#
- taskgraph.transforms.task.build_dummy_payload(config, task, task_def)#
- taskgraph.transforms.task.build_generic_worker_payload(config, task, task_def)#
- taskgraph.transforms.task.build_invalid_payload(config, task, task_def)#
- taskgraph.transforms.task.build_task(config, tasks)#
- taskgraph.transforms.task.chain_of_trust(config, tasks)#
- taskgraph.transforms.task.get_branch_rev(config)#
- taskgraph.transforms.task.get_default_deadline(graph_config, project)#
- taskgraph.transforms.task.get_default_priority(graph_config, project)#
- taskgraph.transforms.task.index_builder(name)#
- taskgraph.transforms.task.payload_builder(name, schema)#
- taskgraph.transforms.task.process_treeherder_metadata(config, tasks)#
- taskgraph.transforms.task.run_task_suffix()#
String to append to cache names under control of run-task.
- taskgraph.transforms.task.set_defaults(config, tasks)#
- taskgraph.transforms.task.set_implementation(config, tasks)#
Set the worker implementation based on the worker-type alias.
- taskgraph.transforms.task.task_description_schema#
alias of
TaskDescriptionSchema
- taskgraph.transforms.task.task_name_from_label(config, tasks)#
- taskgraph.transforms.task.validate(config, tasks)#
- taskgraph.transforms.task.verify_index(config, index)#
taskgraph.transforms.task_context module#
- taskgraph.transforms.task_context.SCHEMA#
alias of
TaskContextSchema
- class taskgraph.transforms.task_context.TaskContextConfig(substitution_fields: list[str], from_parameters: dict[str, list[str] | str] | None = None, from_file: str | None = None, from_object: object | None = None)#
Bases:
Schema- from_file: str | None#
- from_object: object | None#
- from_parameters: dict[str, list[str] | str] | None#
- substitution_fields: list[str]#
- class taskgraph.transforms.task_context.TaskContextSchema(*, name: str | None = None, task_context: TaskContextConfig | None = None)#
Bases:
Schema- name: str | None#
- task_context: TaskContextConfig | None#
- taskgraph.transforms.task_context.render_task(config, tasks)#