API Reference

firexkit.argument_conversion module

exception firexkit.argument_conversion.ArgumentConversionException[source]

Bases: Exception

An exception occurred while executing a converter

exception firexkit.argument_conversion.CircularDependencyException[source]

Bases: firexkit.argument_conversion.ConverterRegistrationException

A converter was registered with a dependency that is itself directly or indirectly dependent on it.

class firexkit.argument_conversion.ConverterRegister[source]

Bases: object

Converters are a practical mechanism for altering the input values passed into microservices. They can also be used for upfront validation of the inputs.

convert(pre_task=True, **kwargs) → dict[source]

Run all registered converters.

  • pre_task – Converters can be registered to run before or after a task runs.

classmethod get_register(task_name)[source]

Provide the converter register for a given task.

classmethod list_converters(task_name, pre_task=True)[source]

Provide a list of all converters in order of execution, accounting for dependencies.

register(*args)[source]

Register a converter function.


args – A collection of optional arguments, the function of which is based on its type:

  • callable (only once): A function that will be called to convert arguments

  • boolean (only once): At which point should this converter be called? True is pre (before the task), False is post (after the task).

  • str: Dependencies. A dependency of the current converter on the converter named in the string.
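The string dependencies above determine the order in which registered converters execute, and a circular dependency is a registration error. As a self-contained sketch of that ordering idea (a mimic, not firexkit's implementation; `register` and `convert` here are hypothetical stand-ins), the execution order can be derived with the standard library's topological sorter:

```python
from graphlib import TopologicalSorter  # Python 3.9+

# name -> (converter function, names of converters it must run after)
converters = {}

def register(name, func, *dependencies):
    converters[name] = (func, dependencies)

def convert(**kwargs):
    # Run converters in dependency order; a circular dependency makes
    # TopologicalSorter raise CycleError (cf. CircularDependencyException).
    graph = {name: set(deps) for name, (_, deps) in converters.items()}
    for name in TopologicalSorter(graph).static_order():
        func, _ = converters[name]
        kwargs = func(kwargs)
    return kwargs

register("strip", lambda kw: {k: v.strip() for k, v in kw.items()})
register("upper", lambda kw: {k: v.upper() for k, v in kw.items()}, "strip")

print(convert(message="  hello "))  # {'message': 'HELLO'}
```

Because "upper" names "strip" as a dependency, "strip" always runs first regardless of registration order.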

classmethod register_for_task(task: celery.local.PromiseProxy, pre_task=True, *args)[source]

Register a converter function for a given task.

  • task – A microservice signature against which to register the converter

  • pre_task – At which point should this converter be called? True is pre (before the task), False is post (after the task).

  • args – A collection of optional arguments, the function of which is based on its type:

  • callable (only once): A function that will be called to convert arguments

  • str: Dependencies. A dependency of the current converter on the converter named in the string.

classmethod task_convert(task_name: str, pre_task=True, **kwargs) → dict[source]

Run the argument conversion for a given task.

  • task_name – the short name of the task. If the long name is given, it will be reduced to the short name

  • pre_task – Converters can be registered to run before or after a task runs

  • kwargs – the argument dict to be converted

exception firexkit.argument_conversion.ConverterRegistrationException[source]

Bases: Exception

A coding error in the registration of the converter

exception firexkit.argument_conversion.MissingConverterDependencyError[source]

Bases: firexkit.argument_conversion.ConverterRegistrationException

A converter was registered with a dependency that does not exist.

exception firexkit.argument_conversion.NameDuplicationException[source]

Bases: firexkit.argument_conversion.ConverterRegistrationException

A converter was registered with the same name as another converter. This creates conflicts during the dependency check, and is not allowed.

class firexkit.argument_conversion.SingleArgDecorator(*args)[source]

Bases: object

Decorator to simplify a common use case for argument converters, in which a single argument in the bag of goodies needs to be validated or converted. Converter is only called if the argument is in kwargs.


@ConverterRegister.register_for_task(BirthdayCake)
@SingleArgDecorator("message")
def yell_loud(arg_value):
    return arg_value.upper()
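The same "only run when the argument is present" behaviour can be sketched as a plain decorator. This is a hypothetical stand-in for SingleArgDecorator, not the library code:

```python
import functools

def single_arg(arg_name):
    """Mimic of the SingleArgDecorator idea: wrap a single-value converter
    so it only runs when arg_name is present in the kwargs bag."""
    def decorator(func):
        @functools.wraps(func)
        def converter(kwargs):
            if arg_name in kwargs:
                kwargs = dict(kwargs)  # avoid mutating the caller's dict
                kwargs[arg_name] = func(kwargs[arg_name])
            return kwargs
        return converter
    return decorator

@single_arg("message")
def yell_loud(arg_value):
    return arg_value.upper()

print(yell_loud({"message": "happy birthday"}))  # {'message': 'HAPPY BIRTHDAY'}
print(yell_loud({"other": 1}))                   # unchanged: {'other': 1}
```

The converter receives only the single value it cares about, while the wrapper handles the presence check and reassembly of the argument dict.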

firexkit.bag_of_goodies module

class firexkit.bag_of_goodies.BagOfGoodies(sig: inspect.Signature, args, kwargs)[source]

Bases: object

get_bag() → {}[source]
pop(k, *default)[source]
update(updates: {})[source]
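BagOfGoodies is constructed from an inspect.Signature plus args and kwargs, suggesting it merges them into one flat argument dict. A simplified, hypothetical approximation of that merge using only the standard library (the real class handles more cases, such as values flowing between chained tasks):

```python
import inspect

def build_bag(func, args, kwargs):
    # Bind positional and keyword arguments against the signature,
    # fill in defaults, and return one flat dict "bag".
    sig = inspect.signature(func)
    bound = sig.bind_partial(*args, **kwargs)
    bound.apply_defaults()
    return dict(bound.arguments)

def bake_cake(flavour, layers=2, message="happy birthday"):
    pass

bag = build_bag(bake_cake, ("chocolate",), {"message": "congrats"})
print(bag)  # {'flavour': 'chocolate', 'layers': 2, 'message': 'congrats'}
```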

firexkit.chain module

class firexkit.chain.InjectArgs(**kwargs)[source]

Bases: object

exception firexkit.chain.InvalidChainArgsException(msg, wrong_args: dict = None)[source]

Bases: Exception

firexkit.chain.get_label(sig: celery.canvas.Signature)[source]

The decorator is used to allow us to specify the keys of the dict that the task returns.

This is used only to signal to the user the inputs and outputs of a task, and deduce what arguments are required for a chain.

firexkit.chain.set_execution_options(sig: celery.canvas.Signature, **options)[source]

Set arbitrary execution options in every task in the sig

firexkit.chain.set_label(sig: celery.canvas.Signature, label)[source]
firexkit.chain.set_priority(sig: celery.canvas.Signature, priority: int)[source]

Set the priority execution option in every task in sig

firexkit.chain.set_queue(sig: celery.canvas.Signature, queue)[source]

Set the queue execution option in every task in sig

firexkit.chain.set_soft_time_limit(sig: celery.canvas.Signature, soft_time_limit)[source]

Set the soft_time_limit execution option in every task in sig

firexkit.chain.verify_chain_arguments(sig: celery.canvas.Signature)[source]

Verifies that the chain is not missing any parameters. Asserts if any parameters are missing, or if a reference parameter (@something) has no provider.
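A minimal sketch of the reference-parameter check described above (a hypothetical helper, not the library's verify_chain_arguments, which works on Celery signatures):

```python
def verify_reference_args(chain_kwargs, available_keys):
    """Every value of the form "@name" must refer to a key that some
    earlier task in the chain provides; otherwise the chain is invalid."""
    missing = [v for v in chain_kwargs.values()
               if isinstance(v, str) and v.startswith("@")
               and v[1:] not in available_keys]
    if missing:
        raise AssertionError(f"Reference parameters with no provider: {missing}")
    return True

# "@cake_id" is satisfied because an upstream task returns "cake_id".
print(verify_reference_args({"order": "@cake_id"}, {"cake_id"}))  # True
```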

firexkit.result module

exception firexkit.result.ChainException[source]

Bases: Exception

exception firexkit.result.ChainInterruptedException(task_id=None, task_name=None, cause=None)[source]

Bases: firexkit.result.ChainException

MESSAGE = 'The chain has been interrupted by a failure in microservice '
exception firexkit.result.ChainRevokedException(task_id=None, task_name=None)[source]

Bases: firexkit.result.ChainException

MESSAGE = 'The chain has been interrupted by the revocation of microservice '
exception firexkit.result.ChainRevokedPreRunException(task_id=None, task_name=None)[source]

Bases: firexkit.result.ChainRevokedException

exception firexkit.result.MultipleFailuresException(task_ids=('UNKNOWN'))[source]

Bases: firexkit.result.ChainInterruptedException

MESSAGE = 'The chain has been interrupted by multiple failing microservices: %s'
class firexkit.result.WaitLoopCallBack(func, frequency, kwargs)

Bases: tuple

property frequency

Alias for field number 1

property func

Alias for field number 0

property kwargs

Alias for field number 2

exception firexkit.result.WaitOnChainTimeoutError[source]

Bases: Exception

firexkit.result.disable_async_result(result: celery.result.AsyncResult)[source]
firexkit.result.find_all_unsuccessful(result: celery.result.AsyncResult, ignore_non_ready=False, depth=0) → {}[source]
firexkit.result.find_unsuccessful_in_chain(result: celery.result.AsyncResult) → {}[source]
firexkit.result.get_result_logging_name(result: celery.result.AsyncResult, name=None)[source]
firexkit.result.get_results(result: celery.result.AsyncResult, return_keys: Union[str, tuple] = (), return_keys_only: bool = True, merge_children_results: bool = False) → Union[tuple, dict][source]

Extract and return task results.


  • result – The AsyncResult to extract actual returned results from.

  • return_keys – A single return-key string, or a tuple of keys to extract from the AsyncResult. The default (an empty tuple) will return a dictionary of key/value pairs for the returned results.

  • return_keys_only – If True (default), only return results for keys specified by the task’s @returns decorator or returns attribute. If False, results will include key/value pairs from the bag of goodies.

  • merge_children_results – If True, traverse the children of result and merge the results they produced. The default of False will not collect results from the children.


Returns: if the return_keys parameter was specified, a tuple of the results in the same order as the return_keys; otherwise, a dictionary of the key/value pairs of the returned results.
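The key-ordering contract above can be illustrated with a small stand-in for results2tuple (a sketch; the real function's handling of missing keys may differ):

```python
from typing import Union

def results_to_tuple(results: dict, return_keys: Union[str, tuple]) -> tuple:
    # A single key string is treated like a one-element tuple; values come
    # back in the order of return_keys, with None for keys not present.
    if isinstance(return_keys, str):
        return_keys = (return_keys,)
    return tuple(results.get(key) for key in return_keys)

results = {"cake": "chocolate", "candles": 9}
print(results_to_tuple(results, ("candles", "cake")))  # (9, 'chocolate')
print(results_to_tuple(results, "cake"))               # ('chocolate',)
```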

firexkit.result.get_results_upto_parent(result: celery.result.AsyncResult, return_keys=(), parent_id: str = None, **kwargs)[source]
firexkit.result.get_task_results(results: dict) → dict[source]
firexkit.result.is_result_ready(result: celery.result.AsyncResult, timeout=None, retry_delay=0.1)[source]

Protect against the broker being temporarily unreachable and throwing a TimeoutError
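That protection amounts to retrying a readiness probe until it answers or an overall timeout elapses. The sketch below shows the pattern; probe, flaky_probe, and is_ready_with_retries are hypothetical stand-ins, not firexkit APIs:

```python
import time

def is_ready_with_retries(probe, timeout=5.0, retry_delay=0.1):
    """Keep polling `probe` (which may raise TimeoutError while the broker
    is unreachable) until it answers or the overall timeout expires."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return probe()
        except TimeoutError:
            if time.monotonic() >= deadline:
                raise
            time.sleep(retry_delay)

# A probe that fails twice before the broker "recovers".
attempts = {"n": 0}
def flaky_probe():
    attempts["n"] += 1
    if attempts["n"] < 3:
        raise TimeoutError("broker unreachable")
    return True

print(is_ready_with_retries(flaky_probe))  # True (after two retries)
```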

firexkit.result.populate_task_name(task_id, task, args, kwargs, **donotcare)[source]
firexkit.result.results2tuple(results: dict, return_keys: Union[str, tuple]) → tuple[source]
firexkit.result.wait_for_any_results(results, max_wait=None, poll_max_wait=0.1, **kwargs)[source]
firexkit.result.wait_on_async_results(*args, **kwargs)[source]
firexkit.result.wait_on_async_results_and_maybe_raise(results, raise_exception_on_failure=True, caller_task=None, **kwargs)[source]

firexkit.revoke module

class firexkit.revoke.RevokedRequests(timer_expiry_secs=60, skip_first_cycle=True)[source]

Bases: object

The app must be inspected for the revoked requests because the AsyncResult.state of a task that was revoked before being de-queued and executed by a worker is PENDING (i.e., the REVOKED state is only updated upon executing a task). This makes wait_for_results wait on such “revoked” tasks, which is why this workaround was required.

classmethod get_revoked_list_from_app()[source]
classmethod instance(existing_instance=None)[source]
is_revoked(result_id, timer_expiry_secs=None)[source]
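The caching-with-expiry behaviour implied by timer_expiry_secs can be sketched as follows. This is a hypothetical mimic of the RevokedRequests idea with the broker query stubbed out, not the library class:

```python
import time

class ExpiringRevokedCache:
    """Cache the broker's revoked-id list and only re-fetch it once the
    expiry timer has elapsed, to avoid hammering the app inspector."""
    def __init__(self, fetch_revoked, timer_expiry_secs=60):
        self._fetch = fetch_revoked            # e.g. queries the Celery app
        self._expiry = timer_expiry_secs
        self._revoked = set()
        self._last_fetch = float("-inf")       # force a fetch on first use

    def is_revoked(self, result_id):
        now = time.monotonic()
        if now - self._last_fetch >= self._expiry:
            self._revoked = set(self._fetch())
            self._last_fetch = now
        return result_id in self._revoked

cache = ExpiringRevokedCache(lambda: ["task-1", "task-2"], timer_expiry_secs=60)
print(cache.is_revoked("task-1"))  # True
print(cache.is_revoked("task-9"))  # False
```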
firexkit.revoke.get_chain_head(parent, child)[source]
firexkit.revoke.revoke_nodes_up_to_parent(starting_node, parent)[source]
firexkit.revoke.revoke_recursively(results, depth=1, terminate=True, wait=False, timeout=None)[source]

firexkit.task module

exception firexkit.task.DyanmicReturnsNotADict[source]

Bases: Exception

class firexkit.task.FireXTask[source]

Bases: celery.app.task.Task

Task object that facilitates passing of arguments and return values from one task to another, to be used in chains

RETURN_KEYS_KEY = '__task_return_keys'
property abog
property all_args
property args
property bag
property bound_args
property called_as_orig
classmethod convert_returns_to_dict(return_keys, result) → dict[source]
property default_bound_args
enqueue_child(chain: celery.canvas.Signature, add_to_enqueued_children: bool = True, block: bool = False, raise_exception_on_failure: bool = None, apply_async_epilogue: Callable[[celery.result.AsyncResult], None] = None, apply_async_options=None, **kwargs) → celery.result.AsyncResult[source]

Schedule a child task to run

property enqueued_children
property file_logging_dirpath
property from_plugin
static get_short_name(task_name)[source]
classmethod get_task_logfile(task_logging_dirpath, task_name, uuid)[source]
static get_task_logfilename(task_name, uuid)[source]
static get_task_logging_dirpath(file_logging_dirpath, hostname)[source]
get_task_return_keys() → tuple[source]
handle_exception(e, logging_extra=None, raise_exception=True)[source]
ignore_result = False
classmethod is_dynamic_return(value)[source]
property kwargs
map_args(*args, **kwargs) → dict[source]
map_input_args_kwargs(*args, **kwargs)[source]
property name_without_orig
property optional_args

dict of optional arguments to the microservice, and their values.

property pending_enqueued_children
abstract post_task_run(results, extra_events: Optional[dict] = None)[source]

Overrideable method to allow subclasses to do something with the BagOfGoodies after the task has been run

abstract pre_task_run(extra_events: Optional[dict] = None)[source]

Overrideable method to allow subclasses to do something with the BagOfGoodies before the task is run

print_precall_header(bound_args, default_bound_args)[source]
priority = None
rate_limit = None
reject_on_worker_lost = None
property request_soft_time_limit
request_stack = <celery.utils.threads._LocalStack object>
property required_args

list of required arguments to the microservice.

retry(*args, **kwargs)[source]

Retry the task, adding it to the back of the queue.

>>> from imaginary_twitter_lib import Twitter
>>> from proj.celery import app
>>> @app.task(bind=True)
... def tweet(self, auth, message):
...     twitter = Twitter(oauth=auth)
...     try:
...         twitter.post_status_update(message)
...     except twitter.FailWhale as exc:
...         # Retry in 5 minutes.
...         raise self.retry(countdown=60 * 5, exc=exc)

Although the task will never return above as retry raises an exception to notify the worker, we use raise in front of the retry to convey that the rest of the block won’t be executed.


args (Tuple): Positional arguments to retry with. kwargs (Dict): Keyword arguments to retry with. exc (Exception): Custom exception to report when the max retry

limit has been exceeded (default: @MaxRetriesExceededError).

If this argument is set and retry is called while an exception was raised (sys.exc_info() is set) it will attempt to re-raise the current exception.

If no exception was raised it will raise the exc argument provided.

countdown (float): Time in seconds to delay the retry for. eta (~datetime.datetime): Explicit time and date to run the

retry at.

max_retries (int): If set, overrides the default retry limit for

this execution. Changes to this parameter don’t propagate to subsequent task retry attempts. A value of None, means “use the default”, so if you want infinite retries you’d have to set the max_retries attribute of the task to None first.

time_limit (int): If set, overrides the default time limit. soft_time_limit (int): If set, overrides the default soft

time limit.

throw (bool): If this is False, don’t raise the

@Retry exception, that tells the worker to mark the task as being retried. Note that this means the task will be marked as failed if the task raises an exception, or successful if it returns after the retry call.

**options (Any): Extra options to pass on to apply_async().



To tell the worker that the task has been re-sent for retry. This always happens, unless the throw keyword argument has been explicitly set to False, and is considered normal operation.

revoke_child(result: celery.result.AsyncResult, **kwargs)[source]
property root_logger
property root_logger_file_handler
run(*args, **kwargs)[source]

The body of the task executed by workers.

send_event(*args, **kwargs)[source]

Send monitoring event message.

This can be used to add custom event types in :pypi:`Flower` and other monitors.


type_ (str): Type of event, e.g. "task-failed".

Keyword Arguments:

  • retry (bool): Retry sending the message if the connection is lost. Default is taken from the :setting:`task_publish_retry` setting.

  • retry_policy (Mapping): Retry settings. Default is taken from the :setting:`task_publish_retry_policy` setting.

  • **fields (Any): Map containing information about the event. Must be JSON serializable.

serializer = 'json'
property short_name
property short_name_without_orig
store_errors_even_if_ignored = False
static strip_orig_from_name(task_name)[source]
property task_label

Returns a label for this task, e.g.:

  • 8345379a-e536-4566-b5c9-3d515ec5936a

  • 8345379a-e536-4566-b5c9-3d515ec5936a_2 (if it was the second retry)

  • microservices.testsuites_tasks.CreateWorkerConfigFromTestsuites (if there was no request id yet)

property task_logfile
property task_logging_dirpath
track_started = False
typing = True
wait_for_any_children(pending_only=True, **kwargs)[source]

Wait for any of the enqueued child tasks to run and complete

wait_for_children(pending_only=True, **kwargs)[source]

Wait for all enqueued child tasks to run and complete

wait_for_specific_children(child_results, **kwargs: dict)[source]

Wait for the explicitly provided child_results to run and complete

property worker_log_file
class firexkit.task.PendingChildStrategy(value)[source]

Bases: enum.Enum

Available strategies for handling remaining pending child tasks upon successful completion of the parent microservice.

Block = (0, 'Default')
Continue = 2
Revoke = 1
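What each strategy implies for the parent's pending children can be sketched with a small dispatcher. This is a hypothetical illustration; the wait and revoke callables stand in for FireXTask machinery:

```python
from enum import Enum

class PendingChildStrategy(Enum):
    # Mirrors the documented values: on successful completion the parent
    # either blocks on, revokes, or ignores still-pending children.
    Block = (0, "Default")
    Revoke = 1
    Continue = 2

def on_parent_success(strategy, pending_children, wait, revoke):
    """Dispatch the chosen strategy over the pending children."""
    if strategy is PendingChildStrategy.Block:
        wait(pending_children)
    elif strategy is PendingChildStrategy.Revoke:
        revoke(pending_children)
    # Continue: leave the children running.

log = []
on_parent_success(PendingChildStrategy.Revoke, ["child-1"],
                  wait=log.extend, revoke=lambda c: log.append(("revoked", c)))
print(log)  # [('revoked', ['child-1'])]
```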
exception firexkit.task.ReturnsCodingException[source]

Bases: Exception

class firexkit.task.TaskContext[source]

Bases: object

firexkit.task.banner(text, ch='=', length=78, content='')[source]
firexkit.task.convert_to_serializable(obj, depth=0)[source]
firexkit.task.get_attr_unwrapped(fun: callable, attr_name, *default_value)[source]

Unwraps a function and returns an attribute of the root function

firexkit.task.is_jsonable(obj) → bool[source]

Returns True if obj can be serialized via JSON, otherwise returns False

firexkit.task.parse_signature(sig: inspect.Signature) -> (<class 'set'>, <class 'dict'>)[source]

Parse the run function of a microservice and return required and optional arguments
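The required/optional split can be approximated directly with inspect. This sketch mirrors the description above but ignores details the real function may handle (e.g. *args/**kwargs parameters or self):

```python
import inspect

def parse_signature(sig: inspect.Signature):
    """Split a signature into required argument names and a dict of
    optional arguments with their default values."""
    required, optional = set(), {}
    for name, param in sig.parameters.items():
        if param.default is inspect.Parameter.empty:
            required.add(name)
        else:
            optional[name] = param.default
    return required, optional

def bake(flavour, layers=2, frosting=None):
    pass

required, optional = parse_signature(inspect.signature(bake))
print(required)  # {'flavour'}
print(optional)  # {'layers': 2, 'frosting': None}
```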

firexkit.task.task_prerequisite(pre_req_task: celery.local.PromiseProxy, key: str = None, trigger: callable = <class 'bool'>) → callable[source]
Register a prerequisite to a microservice.

  • pre_req_task – microservice to be invoked if trigger returns False

  • key – key in kwargs to pass to the trigger. If None, all kwargs are passed

  • trigger – a function returning a bool. When False is returned, pre_req_task is enqueued

When adding multiple prerequisites, they must be added in reverse order (i.e. the last one to run should be added first).

Returns: the original function that was used to create the microservice
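The trigger semantics can be sketched as a plain-Python mimic. make_prerequisite below is hypothetical; the real task_prerequisite wires this check into the converter machinery and enqueues an actual microservice:

```python
def make_prerequisite(pre_req_task, trigger=bool, key=None):
    """Before a task runs, evaluate `trigger` on its kwargs (or on a single
    key of them) and run pre_req_task first when the trigger returns False."""
    def maybe_run(kwargs):
        value = kwargs if key is None else kwargs.get(key)
        ran_prereq = False
        if not trigger(value):
            pre_req_task(kwargs)     # stand-in for enqueuing the microservice
            ran_prereq = True
        return ran_prereq
    return maybe_run

calls = []
check = make_prerequisite(lambda kw: calls.append("setup"), key="token")

print(check({"token": None}))   # True  -> prerequisite ran
print(check({"token": "abc"}))  # False -> trigger satisfied, skipped
print(calls)                    # ['setup']
```

With the default trigger of bool, the prerequisite runs exactly when the inspected value is falsy, matching the "invoked if trigger returns False" contract above.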