taskgraph.transforms package

Contents

taskgraph.transforms package#

Subpackages#

Submodules#

taskgraph.transforms.base module#

class taskgraph.transforms.base.RepoConfig(prefix: str, name: str, base_repository: str, head_repository: str, head_ref: str, type: str, path: str = '', head_rev: str | None = None, ssh_secret_name: str | None = None)#

Bases: object

base_repository: str#
head_ref: str#
head_repository: str#
head_rev: str | None = None#
name: str#
path: str = ''#
prefix: str#
ssh_secret_name: str | None = None#
type: str#
class taskgraph.transforms.base.TransformConfig(kind: str, path: str, config: dict, params: Parameters, kind_dependencies_tasks: dict[str, Task], graph_config: GraphConfig, write_artifacts: bool)#

Bases: object

A container for configuration affecting transforms. The config argument to transforms is an instance of this class.

config: dict#
graph_config: GraphConfig#
kind: str#
kind_dependencies_tasks: dict[str, Task]#
params: Parameters#
path: str#
property repo_configs#
write_artifacts: bool#
class taskgraph.transforms.base.TransformSequence(_transforms: list = <factory>)#

Bases: object

Container for a sequence of transforms. Each transform is represented as a callable taking (config, items) and returning a generator which will yield transformed items. The resulting sequence has the same interface.

This is convenient to use in a file full of transforms, as it provides a decorator, @transforms.add, that will add the decorated function to the sequence.

add(func)#
add_validate(schema)#
class taskgraph.transforms.base.ValidateSchema(schema: taskgraph.util.schema.Schema)#

Bases: object

schema: Schema#

taskgraph.transforms.from_deps module#

Transforms used to create tasks based on the kind dependencies, filtering on common attributes like the build-type.

These transforms are useful when follow-up tasks are needed for some indeterminate subset of existing tasks. For example, running a signing task after each build task, whatever builds may exist.

taskgraph.transforms.from_deps.FROM_DEPS_SCHEMA#

alias of FromDepsSchema

taskgraph.transforms.from_deps.from_deps(config, tasks)#

taskgraph.transforms.cached_tasks module#

taskgraph.transforms.cached_tasks.cache_task(config, tasks)#
taskgraph.transforms.cached_tasks.format_task_digest(cached_task)#
taskgraph.transforms.cached_tasks.order_tasks(config, tasks)#

Iterate image tasks in an order where parent tasks come first.

taskgraph.transforms.chunking module#

taskgraph.transforms.chunking.CHUNK_SCHEMA#

alias of ChunkSchema

class taskgraph.transforms.chunking.ChunkConfig(total_chunks: int, substitution_fields: list[str] = <factory>)#

Bases: Schema

substitution_fields: list[str]#
total_chunks: int#
class taskgraph.transforms.chunking.ChunkSchema(*, chunk: ChunkConfig | None = None)#

Bases: Schema

chunk: ChunkConfig | None#
taskgraph.transforms.chunking.chunk_tasks(config, tasks)#

taskgraph.transforms.code_review module#

Add soft dependencies and configuration to code-review tasks.

taskgraph.transforms.code_review.add_dependencies(config, tasks)#

taskgraph.transforms.docker_image module#

class taskgraph.transforms.docker_image.DockerImageSchema(name: str, parent: str | None = None, symbol: str | None = None, task_from: str | None = None, args: dict[str, str] | None = None, definition: str | None = None, packages: list[str] | None = None, index: IndexSchema | None = None, cache: bool | None = None)#

Bases: Schema

args: dict[str, str] | None#
cache: bool | None#
definition: str | None#
index: IndexSchema | None#
name: str#
packages: list[str] | None#
parent: str | None#
symbol: str | None#
task_from: str | None#
taskgraph.transforms.docker_image.docker_image_schema#

alias of DockerImageSchema

taskgraph.transforms.docker_image.fill_template(config, tasks)#

taskgraph.transforms.fetch module#

taskgraph.transforms.fetch.FETCH_SCHEMA#

alias of FetchSchema

class taskgraph.transforms.fetch.FetchBuilder(schema: taskgraph.util.schema.Schema, builder: Callable)#

Bases: object

builder: Callable#
schema: Schema#
class taskgraph.transforms.fetch.FetchSchema(name: str, description: str, fetch: FetchSubSchema, task_from: str | None = None, expires_after: str | None = None, docker_image: object | None = None, fetch_alias: str | None = None, artifact_prefix: str | None = None, attributes: dict[str, object] | None = None)#

Bases: Schema

artifact_prefix: str | None#
attributes: dict[str, object] | None#
description: str#
docker_image: object | None#
expires_after: str | None#
fetch: FetchSubSchema#
fetch_alias: str | None#
name: str#
task_from: str | None#
class taskgraph.transforms.fetch.FetchSubSchema(*, type: str)#

Bases: Schema

type: str#
class taskgraph.transforms.fetch.GitFetchSchema(*, type: Literal['git'], repo: str, revision: str, include_dot_git: bool | None = None, artifact_name: str | None = None, path_prefix: str | None = None, ssh_key: str | None = None)#

Bases: Schema

artifact_name: str | None#
include_dot_git: bool | None#
path_prefix: str | None#
repo: str#
revision: str#
ssh_key: str | None#
type: Literal['git']#
class taskgraph.transforms.fetch.GpgSignatureConfig(sig_url: str, key_path: str)#

Bases: Schema

key_path: str#
sig_url: str#
class taskgraph.transforms.fetch.StaticUrlFetchSchema(*, type: Literal['static-url'], url: str, sha256: str, size: int, gpg_signature: GpgSignatureConfig | None = None, artifact_name: str | None = None, strip_components: int | None = None, add_prefix: str | None = None, headers: dict[str, str] | None = None)#

Bases: Schema

add_prefix: str | None#
artifact_name: str | None#
gpg_signature: GpgSignatureConfig | None#
headers: dict[str, str] | None#
sha256: str#
size: int#
strip_components: int | None#
type: Literal['static-url']#
url: str#
taskgraph.transforms.fetch.configure_fetch(config, typ, name, fetch)#
taskgraph.transforms.fetch.create_fetch_url_task(config, name, fetch)#
taskgraph.transforms.fetch.create_git_fetch_task(config, name, fetch)#
taskgraph.transforms.fetch.fetch_builder(name, schema)#
taskgraph.transforms.fetch.make_task(config, tasks)#
taskgraph.transforms.fetch.process_fetch_task(config, tasks)#

taskgraph.transforms.matrix module#

Transforms used to split one task definition into many tasks, governed by a matrix defined in the definition.

taskgraph.transforms.matrix.MATRIX_SCHEMA#

alias of MatrixSchema

class taskgraph.transforms.matrix.MatrixConfig(*, exclude: list[dict[str, str]] | None = None, set_name: str | None = None, substitution_fields: list[str] | None = None)#

Bases: Schema

exclude: list[dict[str, str]] | None#
set_name: str | None#
substitution_fields: list[str] | None#
class taskgraph.transforms.matrix.MatrixSchema(*, name: str, matrix: MatrixConfig | None = None)#

Bases: Schema

matrix: MatrixConfig | None#
name: str#
taskgraph.transforms.matrix.split_matrix(config, tasks)#

taskgraph.transforms.notify module#

Add notifications to tasks via Taskcluster’s notify service.

See https://docs.taskcluster.net/docs/reference/core/notify/usage for more information.

class taskgraph.transforms.notify.EmailContent(subject: str | None = None, content: str | None = None, link: EmailLinkContent | None = None)#

Bases: Schema

content: str | None#
subject: str | None#
class taskgraph.transforms.notify.EmailLinkContent(text: str, href: str)#

Bases: Schema

href: str#
text: str#
class taskgraph.transforms.notify.EmailRecipient(*, address: Annotated[Any, <taskgraph.util.schema.OptionallyKeyedBy object at 0x7dc1437f3800>], status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None = None)#

Bases: Schema

address: Annotated[Any, <taskgraph.util.schema.OptionallyKeyedBy object at 0x7dc1437f3800>]#
status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None#
class taskgraph.transforms.notify.LegacyNotificationsConfig(emails: Annotated[Any, <taskgraph.util.schema.OptionallyKeyedBy object at 0x7dc14381d6a0>], subject: str, message: str | None = None, status_types: list[Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running']] | None = None)#

Bases: Schema

emails: Annotated[Any, <taskgraph.util.schema.OptionallyKeyedBy object at 0x7dc14381d6a0>]#
message: str | None#
status_types: list[Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running']] | None#
subject: str#
class taskgraph.transforms.notify.MatrixContent(body: str | None = None, formatted_body: str | None = None, format: str | None = None, msg_type: str | None = None)#

Bases: Schema

body: str | None#
format: str | None#
formatted_body: str | None#
msg_type: str | None#
class taskgraph.transforms.notify.MatrixRoomRecipient(*, room_id: str, status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None = None)#

Bases: Schema

room_id: str#
status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None#
taskgraph.transforms.notify.NOTIFY_SCHEMA#

alias of NotifySchema

class taskgraph.transforms.notify.NotifyConfig(recipients: list[EmailRecipient | MatrixRoomRecipient | PulseRecipient | SlackChannelRecipient], content: NotifyContentConfig | None = None)#

Bases: Schema

content: NotifyContentConfig | None#
recipients: list[EmailRecipient | MatrixRoomRecipient | PulseRecipient | SlackChannelRecipient]#
class taskgraph.transforms.notify.NotifyContentConfig(email: EmailContent | None = None, matrix: MatrixContent | None = None, slack: SlackContent | None = None)#

Bases: Schema

email: EmailContent | None#
matrix: MatrixContent | None#
slack: SlackContent | None#
class taskgraph.transforms.notify.NotifySchema(*, notify: NotifyConfig | None = None, notifications: LegacyNotificationsConfig | None = None)#

Bases: Schema

notifications: LegacyNotificationsConfig | None#
notify: NotifyConfig | None#
class taskgraph.transforms.notify.PulseRecipient(*, routing_key: str, status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None = None)#

Bases: Schema

routing_key: str#
status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None#
class taskgraph.transforms.notify.SlackChannelRecipient(*, channel_id: str, status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None = None)#

Bases: Schema

channel_id: str#
status_type: Literal['on-completed', 'on-defined', 'on-exception', 'on-failed', 'on-pending', 'on-resolved', 'on-running'] | None#
class taskgraph.transforms.notify.SlackContent(text: str | None = None, blocks: list | None = None, attachments: list | None = None)#

Bases: Schema

attachments: list | None#
blocks: list | None#
text: str | None#
taskgraph.transforms.notify.add_notifications(config, tasks)#

taskgraph.transforms.task module#

These transformations take a task description and turn it into a Taskcluster task definition (along with attributes, label, etc.). The input to these transformations is generic to any kind of task, but abstracts away some of the complexities of worker implementations, scopes, and treeherder annotations.

class taskgraph.transforms.task.AlwaysOptimizedPayloadSchema(*, implementation: Literal['always-optimized'])#

Bases: Schema

implementation: Literal['always-optimized']#
class taskgraph.transforms.task.BeetmoverPayloadSchema(*, implementation: Literal['beetmover'], max_run_time: int, release_properties: ReleaseProperties, upstream_artifacts: list[UpstreamArtifact], locale: str | None = None, partner_public: bool | None = None, artifact_map: object | None = None)#

Bases: Schema

artifact_map: object | None#
implementation: Literal['beetmover']#
locale: str | None#
max_run_time: int#
partner_public: bool | None#
release_properties: ReleaseProperties#
upstream_artifacts: list[UpstreamArtifact]#
class taskgraph.transforms.task.DockerWorkerArtifact(type: Literal['file', 'directory', 'volume'] | None = None, path: str | None = None, name: str | None = None)#

Bases: Schema

name: str | None#
path: str | None#
type: Literal['file', 'directory', 'volume'] | None#
class taskgraph.transforms.task.DockerWorkerCacheEntry(type: Literal['persistent'] = 'persistent', name: str | None = None, mount_point: str | None = None, skip_untrusted: bool | None = None)#

Bases: Schema

mount_point: str | None#
name: str | None#
skip_untrusted: bool | None#
type: Literal['persistent']#
class taskgraph.transforms.task.DockerWorkerPayloadSchema(*, implementation: Literal['docker-worker'], os: Literal['linux'], docker_image: str | dict[str, str], chain_of_trust: bool, taskcluster_proxy: bool, allow_ptrace: bool, loopback_video: bool, env: dict[str, str | TaskRefTypeSchema], max_run_time: int, volumes: list[str] | None = None, caches: list[DockerWorkerCacheEntry] | None = None, artifacts: list[DockerWorkerArtifact] | None = None, command: list[str | TaskRefTypeSchema] | None = None, retry_exit_status: list[int] | None = None, purge_caches_exit_status: list[int] | None = None, skip_artifacts: bool | None = None)#

Bases: Schema

allow_ptrace: bool#
artifacts: list[DockerWorkerArtifact] | None#
caches: list[DockerWorkerCacheEntry] | None#
chain_of_trust: bool#
command: list[str | TaskRefTypeSchema] | None#
docker_image: str | dict[str, str]#
env: dict[str, str | TaskRefTypeSchema]#
implementation: Literal['docker-worker']#
loopback_video: bool#
max_run_time: int#
os: Literal['linux']#
purge_caches_exit_status: list[int] | None#
retry_exit_status: list[int] | None#
skip_artifacts: bool | None#
taskcluster_proxy: bool#
volumes: list[str] | None#
class taskgraph.transforms.task.GenericWorkerArtifact(type: Literal['file', 'directory'], path: str, name: str | None = None)#

Bases: Schema

name: str | None#
path: str#
type: Literal['file', 'directory']#
class taskgraph.transforms.task.GenericWorkerPayloadSchema(*, implementation: Literal['generic-worker'], os: Literal['windows', 'macosx', 'linux', 'linux-bitbar'], command: list, env: dict[str, str | TaskRefTypeSchema], max_run_time: int, chain_of_trust: bool, artifacts: list[GenericWorkerArtifact] | None = None, mounts: list[MountSchema] | None = None, retry_exit_status: list[int] | None = None, purge_caches_exit_status: list[int] | None = None, os_groups: list[str] | None = None, run_as_administrator: bool | None = None, run_task_as_current_user: bool | None = None, taskcluster_proxy: bool | None = None, hide_cmd_window: bool | None = None, skip_artifacts: bool | None = None)#

Bases: Schema

artifacts: list[GenericWorkerArtifact] | None#
chain_of_trust: bool#
command: list#
env: dict[str, str | TaskRefTypeSchema]#
hide_cmd_window: bool | None#
implementation: Literal['generic-worker']#
max_run_time: int#
mounts: list[MountSchema] | None#
os: Literal['windows', 'macosx', 'linux', 'linux-bitbar']#
os_groups: list[str] | None#
purge_caches_exit_status: list[int] | None#
retry_exit_status: list[int] | None#
run_as_administrator: bool | None#
run_task_as_current_user: bool | None#
skip_artifacts: bool | None#
taskcluster_proxy: bool | None#
class taskgraph.transforms.task.InvalidPayloadSchema(*, implementation: Literal['invalid'])#

Bases: Schema

implementation: Literal['invalid']#
class taskgraph.transforms.task.MountContentSchema(artifact: str | None = None, task_id: str | TaskRefTypeSchema | None = None, url: str | None = None)#

Bases: Schema

artifact: str | None#
task_id: str | TaskRefTypeSchema | None#
url: str | None#
class taskgraph.transforms.task.MountSchema(cache_name: str | None = None, content: MountContentSchema | None = None, directory: str | None = None, file: str | None = None, format: Literal['rar', 'tar.bz2', 'tar.gz', 'zip'] | None = None)#

Bases: Schema

cache_name: str | None#
content: MountContentSchema | None#
directory: str | None#
file: str | None#
format: Literal['rar', 'tar.bz2', 'tar.gz', 'zip'] | None#
class taskgraph.transforms.task.PayloadBuilder(schema: object, builder: Callable)#

Bases: object

builder: Callable#
schema: object#
class taskgraph.transforms.task.ReleaseProperties(app_name: str | None = None, app_version: str | None = None, branch: str | None = None, build_id: str | None = None, hash_type: str | None = None, platform: str | None = None)#

Bases: Schema

app_name: str | None#
app_version: str | None#
branch: str | None#
build_id: str | None#
hash_type: str | None#
platform: str | None#
class taskgraph.transforms.task.SucceedPayloadSchema(implementation: Literal['succeed'])#

Bases: Schema

implementation: Literal['succeed']#
class taskgraph.transforms.task.TaskDescriptionSchema(label: str, description: str, always_target: bool, optimization: None | OptimizationTypeSchema, worker_type: str, needs_sccache: bool, attributes: dict[str, object] | None = None, task_from: str | None = None, dependencies: dict[str, object] | None = None, priority: Literal['highest', 'very-high', 'high', 'medium', 'low', 'very-low', 'lowest'] | None = None, soft_dependencies: list[str] | None = None, if_dependencies: list[str] | None = None, requires: Literal['all-completed', 'all-resolved'] | None = None, expires_after: str | None = None, deadline_after: str | None = None, routes: list[str] | None = None, scopes: list[str] | None = None, tags: dict[str, str] | None = None, extra: dict[str, object] | None = None, treeherder: bool | TreeherderConfig | None = None, index: IndexSchema | None = None, run_on_projects: Annotated[Any, <taskgraph.util.schema.OptionallyKeyedBy object at 0x7dc144869ee0>] | None = None, run_on_tasks_for: list[str] | None = None, run_on_git_branches: list[str] | None = None, shipping_phase: Literal['build', 'promote', 'push', 'ship'] | None = None, worker: WorkerSchema | None = None)#

Bases: Schema

always_target: bool#
attributes: dict[str, object] | None#
deadline_after: str | None#
dependencies: dict[str, object] | None#
description: str#
expires_after: str | None#
extra: dict[str, object] | None#
if_dependencies: list[str] | None#
index: IndexSchema | None#
label: str#
needs_sccache: bool#
optimization: None | OptimizationTypeSchema#
priority: Literal['highest', 'very-high', 'high', 'medium', 'low', 'very-low', 'lowest'] | None#
requires: Literal['all-completed', 'all-resolved'] | None#
routes: list[str] | None#
run_on_git_branches: list[str] | None#
run_on_projects: Annotated[Any, <taskgraph.util.schema.OptionallyKeyedBy object at 0x7dc144869ee0>] | None#
run_on_tasks_for: list[str] | None#
scopes: list[str] | None#
shipping_phase: Literal['build', 'promote', 'push', 'ship'] | None#
soft_dependencies: list[str] | None#
tags: dict[str, str] | None#
task_from: str | None#
treeherder: bool | TreeherderConfig | None#
worker: WorkerSchema | None#
worker_type: str#
class taskgraph.transforms.task.UpstreamArtifact(task_id: str | TaskRefTypeSchema, task_type: str, paths: list[str], locale: str)#

Bases: Schema

locale: str#
paths: list[str]#
task_id: str | TaskRefTypeSchema#
task_type: str#
class taskgraph.transforms.task.WorkerSchema(*, implementation: str)#

Bases: Schema

implementation: str#
taskgraph.transforms.task.add_generic_index_routes(config, task)#
taskgraph.transforms.task.add_github_checks(config, tasks)#

For git repositories, add checks route to all tasks.

This will be replaced by a configurable option in the future.

taskgraph.transforms.task.add_index_routes(config, tasks)#
taskgraph.transforms.task.build_beetmover_payload(config, task, task_def)#
taskgraph.transforms.task.build_docker_worker_payload(config, task, task_def)#
taskgraph.transforms.task.build_dummy_payload(config, task, task_def)#
taskgraph.transforms.task.build_generic_worker_payload(config, task, task_def)#
taskgraph.transforms.task.build_invalid_payload(config, task, task_def)#
taskgraph.transforms.task.build_task(config, tasks)#
taskgraph.transforms.task.chain_of_trust(config, tasks)#
taskgraph.transforms.task.get_branch_rev(config)#
taskgraph.transforms.task.get_default_deadline(graph_config, project)#
taskgraph.transforms.task.get_default_priority(graph_config, project)#
taskgraph.transforms.task.index_builder(name)#
taskgraph.transforms.task.payload_builder(name, schema)#
taskgraph.transforms.task.process_treeherder_metadata(config, tasks)#
taskgraph.transforms.task.run_task_suffix()#

String to append to cache names under control of run-task.

taskgraph.transforms.task.set_defaults(config, tasks)#
taskgraph.transforms.task.set_implementation(config, tasks)#

Set the worker implementation based on the worker-type alias.

taskgraph.transforms.task.task_description_schema#

alias of TaskDescriptionSchema

taskgraph.transforms.task.task_name_from_label(config, tasks)#
taskgraph.transforms.task.validate(config, tasks)#
taskgraph.transforms.task.verify_index(config, index)#

taskgraph.transforms.task_context module#

class taskgraph.transforms.task_context.ResolveKeyedByConfigOptions(defer: list[str] | None = None, enforce_single_match: bool | None = None)#

Bases: Schema

defer: list[str] | None#
enforce_single_match: bool | None#
taskgraph.transforms.task_context.SCHEMA#

alias of TaskContextSchema

class taskgraph.transforms.task_context.TaskContextConfig(substitution_fields: list[str], from_parameters: dict[str, list[str] | str] | None = None, from_file: str | None = None, from_object: object | None = None, resolve_keyed_by_options: ResolveKeyedByConfigOptions | None = None)#

Bases: Schema

from_file: str | None#
from_object: object | None#
from_parameters: dict[str, list[str] | str] | None#
resolve_keyed_by_options: ResolveKeyedByConfigOptions | None#
substitution_fields: list[str]#
class taskgraph.transforms.task_context.TaskContextSchema(*, name: str | None = None, task_context: TaskContextConfig | None = None)#

Bases: Schema

name: str | None#
task_context: TaskContextConfig | None#
taskgraph.transforms.task_context.render_task(config, tasks)#

Module contents#