py-combtest API reference

Submodules

action module

This module provides the core classes for specifying operations and systematically generating combinations of them.

class combtest.action.Action(param)

Bases: combtest.action.BaseAction

Convenience form of BaseAction: let the user provide a simple iterable of parameters, called OPTIONS.

OPTIONS = None
classmethod get_option_set()

Return the full parameterization space for this Action type. For example: if you are testing a GUI and this Action sets the state of a checkbox, then perhaps this will return cls(True) and cls(False) for ‘set the box to checked’ and ‘set the box to unchecked’ respectively.

Note:
This returns cls instances, not the parameters themselves.
Returns: An iterable of instances of type cls.
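As an illustration of the OPTIONS convenience form, the sketch below models the pattern without importing combtest, so it runs standalone. MiniAction is a hypothetical stand-in for combtest.action.Action (with the real library you would subclass Action and set OPTIONS), and the dict used as state is an assumption made for the example.

```python
# MiniAction is a hypothetical stand-in for combtest.action.Action; it only
# mirrors the documented OPTIONS / get_option_set() / param / run() shape.


class MiniAction(object):
    OPTIONS = None  # subclasses supply an iterable of raw parameters

    def __init__(self, param):
        self._param = param

    @property
    def param(self):
        """This instance's parameter, as passed to the initializer."""
        return self._param

    @classmethod
    def get_option_set(cls):
        # Wrap each raw parameter in an instance of cls (lazily).
        return (cls(option) for option in cls.OPTIONS)

    def run(self, param, state=None):
        raise NotImplementedError("subclasses perform the operation here")


class SetCheckbox(MiniAction):
    """'Set the box to checked' / 'set the box to unchecked'."""

    OPTIONS = (True, False)

    def run(self, param, state=None):
        # state stands in for a handle to the GUI under test.
        state["checkbox1"] = param


actions = list(SetCheckbox.get_option_set())
print([a.param for a in actions])  # [True, False]

state = {}
actions[0].run(actions[0].param, state)
print(state)  # {'checkbox1': True}
```

Note that get_option_set() yields SetCheckbox instances rather than the bare True/False values.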
class combtest.action.BaseAction(param)

Bases: object

An Action class bundles a function that performs an operation with all the different parameterizations of that operation. The run() method should be overridden; it performs the operation. The get_option_set() classmethod should be overridden to provide all the valid parameterizations of the operation.

If you won’t know until runtime whether an operation is valid, you can test for that in run() and raise a combtest.walk.CancelWalk exception to stop executing that Walk right away. You may want to cancel, for example, if the operation and parameter do not make sense given the current state of the feature. As a concrete example: suppose you are testing a GUI that currently has a control hidden, and the operation is to interact with that control. In the run() method you can test for that condition and raise a combtest.walk.CancelWalk.

State is represented as an object provided by the user, which is passed to each run() call.

classmethod from_json(obj)
classmethod get_option_set()

Return the full parameterization space for this Action type. For example: if you are testing a GUI and this Action sets the state of a checkbox, then perhaps this will return cls(True) and cls(False) for ‘set the box to checked’ and ‘set the box to unchecked’ respectively.

Note:
This returns cls instances, not the parameters themselves.
Returns: An iterable of instances of type cls.
param

This instance’s parameters, as passed to the initializer.

run(param, state=None)

This is called to actually run the operation. It does not need to be invoked directly by the user.

Parameters:
  • param (object) – this represents the parameterization of the operation. For example: if you are testing a GUI and this Action represents “set the state of checkbox1”, then param would be the checkbox1 setting.
  • state (object) – this object is passed along from Action-to-Action in a given Walk as the Actions execute. It represents any state that needs to be provided to the Action, or which may be modified by the Action. If you are testing a GUI for example, perhaps this is a handle to an instance of the GUI.

Raises: combtest.walk.CancelWalk – the operation can raise this to cancel the execution of a combtest.walk.Walk at this point.
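The sketch below shows the cancel-from-run() pattern for the hidden-control example described for BaseAction. CancelWalk and ClickButton are hypothetical stand-ins defined locally so the snippet runs without combtest; in real code you would raise combtest.walk.CancelWalk from your Action subclass.

```python
# Hypothetical stand-ins: CancelWalk mimics combtest's exception and
# ClickButton mimics an Action whose run() may cancel the Walk.


class CancelWalk(RuntimeError):
    """Stand-in for combtest's CancelWalk."""


class ClickButton(object):
    def __init__(self, param):
        self.param = param  # which button to click

    def run(self, param, state=None):
        # state is a dict standing in for a handle to the GUI under test.
        if param in state["hidden"]:
            # This operation makes no sense in the current state: stop now.
            raise CancelWalk("control %r is hidden" % (param,))
        state["clicked"].append(param)


gui = {"hidden": {"advanced"}, "clicked": []}
ClickButton("ok").run("ok", gui)

cancel_message = None
try:
    ClickButton("advanced").run("advanced", gui)
except CancelWalk as exc:
    cancel_message = str(exc)

print(cancel_message)  # control 'advanced' is hidden
print(gui["clicked"])  # ['ok']
```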

should_cancel(walk, idx)

This method is called before a Walk containing this Action is run. This gives an opportunity to prevent the Walk from running if this combination just doesn’t make any sense.

Parameters:
  • walk (Walk) – the walk we are in
  • idx (int) – the index we are at in the walk
Returns:

True if the Walk should not be run, else False

to_json()

Return something JSONifiable that can uniquely represent this instance.

exception combtest.action.CancelWalk

Bases: exceptions.RuntimeError

Raised to immediately stop the execution of a combtest.walk.Walk.

class combtest.action.OptionSet(options, action_class)

Bases: object

An OptionSet is an iterable that provides instances of a given combtest.action.Action class. This is typically used as a return value from an Action’s get_option_set() method.

classmethod from_json(obj)
next()

Lazy iteration: the actual Actions are generated on the fly to minimize memory footprint.

Raises: StopIteration – when the underlying options iterator is exhausted.
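Here is a minimal sketch of that lazy iteration: an Action instance is built only when the iterator is advanced, and exhaustion of the underlying options surfaces as StopIteration. LazyOptionSet and Toggle are hypothetical names; the sketch defines both __next__ (Python 3) and next (the Python 2 spelling documented here).

```python
# LazyOptionSet is a hypothetical stand-in for combtest.action.OptionSet.


class Toggle(object):
    created = 0  # how many instances have been built so far

    def __init__(self, param):
        Toggle.created += 1
        self.param = param


class LazyOptionSet(object):
    def __init__(self, options, action_class):
        self._options = iter(options)
        self._action_class = action_class

    def __iter__(self):
        return self

    def __next__(self):
        # next() raises StopIteration once the options are exhausted,
        # which also ends iteration over this set.
        return self._action_class(next(self._options))

    next = __next__  # Python 2 spelling


big = LazyOptionSet(range(10 ** 6), Toggle)
first_three = [next(big).param for _ in range(3)]
print(first_three, Toggle.created)  # [0, 1, 2] 3

small = [a.param for a in LazyOptionSet([True, False], Toggle)]
print(small)  # [True, False]
```

Only three Toggle instances exist after sampling a million-option set, which is the memory benefit the lazy design is after.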

to_json()
class combtest.action.SerialAction(param)

Bases: combtest.action.Action

A SerialAction is a type of Action which cannot be run in parallel with others, e.g. when we are running a number of ‘Walks’ in parallel. Thinking in terms of parallel algorithms: this represents a pinch point where an operation must be serialized with everything else. In terms of testing: it is a part of a test during which we can’t run other cases in parallel; e.g. if we are twiddling some global config option that affects all running test cases. We change the option once, for all test cases, after all outstanding work has finished.

Fundamentally, a SerialAction is run a single time for a given parameterization, regardless of how many Walks contain it. The part of the Walks prior to the SerialAction will be run first.

Example: suppose we have three Walks:
  • [Action1(1), Action2(2), SerialAction1(1), Action3(3)],
  • [Action1(1), Action2(3), SerialAction1(1), Action3(3)],
  • [Action1(1), Action2(2), Action3(3), SerialAction1(1), Action4(4)]

We can run the first ‘segment’ of each Walk in parallel:
  • [Action1(1), Action2(2)],
  • [Action1(1), Action2(3)],
  • [Action1(1), Action2(2), Action3(3)]

Then, in a single thread, we will run SerialAction1(1) once, after which we can run the remaining segments in parallel:
  • [Action3(3)],
  • [Action3(3)],
  • [Action4(4)]
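The three-Walk example above can be reproduced with a small scheduling sketch. Actions are modelled as plain strings, split_at_serial() and run_segment() are hypothetical helpers, and a thread pool stands in for the parallel executors; combtest's own staging (see combtest.walk.WalkOptions and the runner module) is more general than this.

```python
from concurrent.futures import ThreadPoolExecutor

SERIAL = "SerialAction1(1)"

walks = [
    ["Action1(1)", "Action2(2)", SERIAL, "Action3(3)"],
    ["Action1(1)", "Action2(3)", SERIAL, "Action3(3)"],
    ["Action1(1)", "Action2(2)", "Action3(3)", SERIAL, "Action4(4)"],
]


def split_at_serial(walk, serial):
    """Return (segment before serial, segment after serial)."""
    idx = walk.index(serial)
    return walk[:idx], walk[idx + 1:]


def run_segment(segment):
    # Placeholder for executing each Action in the segment in order.
    return " -> ".join(segment)


before, after = map(list, zip(*(split_at_serial(w, SERIAL) for w in walks)))

with ThreadPoolExecutor(max_workers=len(walks)) as pool:
    # Stage 1: all leading segments in parallel. list() waits for every
    # result, so this acts as a barrier before the SerialAction.
    stage1 = list(pool.map(run_segment, before))
    # Stage 2: the SerialAction runs exactly once, on this single thread.
    serial_result = run_segment([SERIAL])
    # Stage 3: all trailing segments in parallel.
    stage3 = list(pool.map(run_segment, after))

print(stage1)
print(serial_result)  # SerialAction1(1)
print(stage3)         # ['Action3(3)', 'Action3(3)', 'Action4(4)']
```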

combtest.action.get_random_action_by_class(action_class)

Get an instance of the given Action class, randomly chosen from the set that its Action.get_option_set() produces.

Returns: An instance of action_class.

bootstrap module

We need some way to bootstrap the rpyc-based combtest.worker.CoordinatorService. That can include bootstrapping it on a remote node if we want to take advantage of a whole cluster. We don’t need to make too many assumptions to do so. Let’s provide a general interface for accomplishing this (ServiceHandleArray), and a base implementation for bootstrapping locally using multiprocessing (ServiceHandler_Local). The client can swap out ServiceHandlers to provide alternative bootstrapping logic. ServiceHandleArray and ServiceHandler simply provide the interface that a combtest.worker.ServiceGroup uses to bootstrap the service instances.

Note:
A paramiko-based ServiceHandler class for SSH connections can be found in combtest.ssh_handle.ServiceHandler_SSH.
class combtest.bootstrap.ConnectionInfo(ip, port, service_info)

Bases: object

This is a simple struct-like object meant to wrap all the stuff needed to bootstrap a service.

Parameters:
  • ip (str) – A string that can be passed as the first argument to rpyc.connect. Typically this will be an IP address for our purposes, but could also be a hostname.
  • port (int) – The port number where the service can be found
  • service_info (object) – Additional information your class may need for bootstrapping the service. For the SSH implementation for example, these will be some authentication bits.
class combtest.bootstrap.ServiceHandleArray(connection_infos, spawn=True)

Bases: object

A ServiceHandleArray is a collection of ServiceHandlers, together with dynamic foreach-style dispatch on attribute access. Example:

>>> sa = ServiceHandleArray(connection_infos)
>>> sa.q(1)

This dispatches the access to attribute q (here called with argument 1) to each contained ServiceHandler in parallel.

Parameters:
  • connection_infos (iterable) – An iterable of ConnectionInfo which describe how we will start the services up.
  • spawn (bool) – True if we should spawn the services during initialization, False otherwise. The user can later spawn them manually using spawn().
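For intuition about the foreach-style dispatch, here is a hypothetical sketch built on __getattr__ and a thread pool. HandleArray and FakeHandler are illustrative names; the real ServiceHandleArray also spawns, tracks, and shuts down rpyc services, and may collect results differently.

```python
from concurrent.futures import ThreadPoolExecutor


class FakeHandler(object):
    """Hypothetical handler exposing one method, q."""

    def __init__(self, name):
        self.name = name

    def q(self, x):
        return "%s:%d" % (self.name, x)


class HandleArray(object):
    def __init__(self, handlers):
        self._handlers = list(handlers)

    def __getattr__(self, attr):
        # Only called for attributes not found normally, e.g. "q".
        targets = [getattr(h, attr) for h in self._handlers]

        def dispatch(*args, **kwargs):
            # Call the attribute on every handler in parallel and return
            # the results in handler order.
            with ThreadPoolExecutor(max_workers=len(targets) or 1) as pool:
                futures = [pool.submit(t, *args, **kwargs) for t in targets]
                return [f.result() for f in futures]

        return dispatch


sa = HandleArray([FakeHandler("a"), FakeHandler("b")])
print(sa.q(1))  # ['a:1', 'b:1']
```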
REMOTE_CONNECTION_CLASS

alias of ServiceHandler

attach(connection_info, instance)

Attach a single instance to this array.

is_alive
Returns: True if we have any connections, False otherwise. This is not directly related to whether the services are up; e.g. they may have died for some reason.
shutdown(hard=False)

Shut down all instances this array is tracking.

Parameters: hard (bool) – if False, we will assert that instances shut down cleanly, otherwise we won’t.
classmethod shutdown_all()

Shut down all arrays which have not yet been shut down.

spawn(connection_info)

Spawn a single instance and attach it to this array.

Parameters: connection_info (ConnectionInfo) – The ConnectionInfo that describes how to start this instance.
spawn_many(connection_infos=None)

Spawn and attach a bunch of instances.

Parameters: connection_infos (iterable) – if None, we will spawn using the ConnectionInfos passed to the initializer. Otherwise, connection_infos should be an iterable of ConnectionInfos.
class combtest.bootstrap.ServiceHandler(connection_info)

Bases: object

A ServiceHandler implements a way of bootstrapping an rpyc-based service. This base class defines the interface and should be overridden.

Parameters: connection_info (ConnectionInfo) – Specifies where the service should be contactable once it is bootstrapped, and any additional information needed to bootstrap it.
shutdown()

Called to shut the service down. Should be idempotent unless the class writer is very careful about how they shut their services down manually. The overridden method should call the super-method.

start_cmd(service_class, service_run_func='combtest.worker.start_service_by_name')

This method will be called to actually do the bootstrapping work. It should be overridden.

Parameters:
  • service_class (<str|rpyc.Service>) – A full __qualname__ of the service we are bootstrapping, or the class itself.
  • service_run_func (str) – optional override of the function that will be used on the remote side to start the service up. Needs to be a __qualname__. Typically this can be left default.
class combtest.bootstrap.ServiceHandler_Local(connection_info)

Bases: combtest.bootstrap.ServiceHandler

A ServiceHandler implementation for bootstrapping locally via a simple multiprocessing.Process call-out.

shutdown()

Called to shut the service down. Should be idempotent unless the class writer is very careful about how they shut their services down manually. The overridden method should call the super-method.

start_cmd(service_class, service_run_func='combtest.worker.start_service_by_name')

This method will be called to actually do the bootstrapping work. It should be overridden.

Parameters:
  • service_class (<str|rpyc.Service>) – A full __qualname__ of the service we are bootstrapping, or the class itself.
  • service_run_func (str) – optional override of the function that will be used on the remote side to start the service up. Needs to be a __qualname__. Typically this can be left default.

config module

Getters, setters, and loaders of config. Config includes e.g. SSH authentication stuff, port numbers to use, etc.

There are two layers of config: config loaded from file via refresh_cfg, and runtime-provided overrides to those values. The getters given below enforce that ordering, so a runtime override takes precedence over the value loaded from file. The user provides overrides via the setters.

combtest.config.CONFIG_LOC = 'combtest.cfg'

Location of a config file to load, if the user wants to load config values that way

combtest.config.DEFAULT_LOGGER_PORT = 6186

Default port number for interproc communication for the logger

combtest.config.DEFAULT_MAX_THREAD_COUNT = 12

Default number of workers executing a combtest.walk.Walk at a time for a given service instance.

combtest.config.DEFAULT_SERVICE_PORT = 6187

Default port an rpyc service should listen on, e.g. if we start up a remote service.

combtest.config.get_logger_port()

Get the port we will expose locally for remote loggers to connect to.

combtest.config.get_machine_ips()

Get a list of IPs where we should try to set services up

combtest.config.get_max_thread_count()

Get the max thread count for combtest.worker.ThreadPool.

combtest.config.get_service_port()

Get the port number where we should expect to find an rpyc service running, once it is bootstrapped.

combtest.config.get_ssh_options()

Get all SSH authentication options in a single call.

Returns: a dict of options

combtest.config.get_ssh_passwords()

Get passwords for SSH authentication.

Returns: password, map of ip->password

combtest.config.get_ssh_rsa_keys()

Get paths to RSA keys that we can use for SSH authentication. This can include a single file, or one file per e.g. IP.

Returns: path, map of ip->path

combtest.config.get_ssh_usernames()

Get usernames for SSH authentication.

Returns: username, map of ip->username

combtest.config.refresh_cfg()

Call to refresh config loaded from file.

combtest.config.set_service_port(port)

Set the port number where we should expect to find an rpyc service running, once it is bootstrapped.

encode module

Easy serialization/deserialization mechanism using JSON. It uses a pair of methods: an instance-level to_json() method and a classmethod from_json(), which are used, when available, to convert between an instance and JSON.

We use this mechanism to serialize combtest.action.Action and combtest.walk.Walk instances for printing to a trace file, or other logs where human readability is handy. The reason we don’t use a custom encoder/decoder pair for those two classes is that the user can attach a param which has no default JSON behavior. We let them write to_json() etc. if they want param to be a member of a custom class.

combtest.encode.decode(json_str)

Decode a JSON string which was provided by encode(). Will leverage a cache for single-instancing Actions. An Action is defined by a single, immutable param, so we are safe to use interning.

combtest.encode.encode(obj)

Return JSON equivalent to the provided obj, if possible. Will raise TypeError if it isn’t possible.
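A minimal sketch of the to_json()/from_json() convention using only the standard json module. The "__class__" tag, the REGISTRY lookup, and the Point param type are assumptions made for this example; combtest's encode()/decode() may lay out the JSON differently.

```python
import json


class Point(object):
    """A hypothetical user param type with no default JSON behavior."""

    def __init__(self, x, y):
        self.x, self.y = x, y

    def to_json(self):
        return [self.x, self.y]

    @classmethod
    def from_json(cls, obj):
        return cls(*obj)


REGISTRY = {"Point": Point}  # hypothetical name -> class lookup


class ToJsonEncoder(json.JSONEncoder):
    def default(self, obj):
        # Called only for objects json cannot encode natively.
        if hasattr(obj, "to_json"):
            return {"__class__": type(obj).__name__, "value": obj.to_json()}
        return json.JSONEncoder.default(self, obj)  # raises TypeError


def decode_hook(d):
    cls = REGISTRY.get(d.get("__class__"))
    return cls.from_json(d["value"]) if cls else d


text = json.dumps({"param": Point(1, 2)}, cls=ToJsonEncoder)
print(text)  # {"param": {"__class__": "Point", "value": [1, 2]}}
back = json.loads(text, object_hook=decode_hook)
print(back["param"].x, back["param"].y)  # 1 2

try:
    json.dumps(object(), cls=ToJsonEncoder)
    unsupported_ok = True
except TypeError:
    unsupported_ok = False  # no to_json(): encoding fails with TypeError
```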

forkjoin module

Simple fork/join implementation, modelled after Java’s lib for the same. This doesn’t attempt any scheduling optimizations, e.g. for the sake of minimizing makespan. Threads simply share a queue and pop work off of it until the queue is drained.

Note:
This is not well suited to CPU-bound, bytecode-level work. It is meant for I/O-heavy work such as performing network round trips, doing disk I/O, etc. If you need to dodge the GIL, you’ll want to use something else.
class combtest.forkjoin.WorkItem(func, *args, **kwargs)

A simple container representing a work item to be done.

Parameters:
  • func (function) – A pointer to the work callback
  • args – args to pass to the work callback
  • kwargs – kwargs to pass to the work callback
combtest.forkjoin.fork_join(work, suppress_errors=False, max_spawn_threads=60)

Poor man’s fork/join implementation. Run all work items in parallel, with at most max_spawn_threads threads running simultaneously for a given job. Wait for all work to finish, then return the results.

Parameters:
  • work (iterable) – an iterable of WorkItems that supports direct indexing and len
  • suppress_errors (bool) – if False, we will print any exceptions to stderr, otherwise we won’t. Does not affect the return value.
  • max_spawn_threads (int) – The max number of threads running simultaneously.
Returns:

A list of returned items, in 1:1 correspondence to the WorkItems in ‘work’. That is: the first result corresponds to the first WorkItem, etc. If nothing is explicitly returned by an item, its ‘return value’ in this list will be None.
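The queue-draining design described above can be sketched with the standard library. This is an illustrative re-implementation under stated assumptions, not combtest's code; in particular, leaving None in the slot of an item that raised is this sketch's own choice.

```python
import queue
import sys
import threading
import traceback


class WorkItem(object):
    def __init__(self, func, *args, **kwargs):
        self.func, self.args, self.kwargs = func, args, kwargs


def fork_join(work, suppress_errors=False, max_spawn_threads=60):
    results = [None] * len(work)  # slot i holds the result of work[i]
    q = queue.Queue()
    for idx in range(len(work)):
        q.put(idx)

    def worker():
        while True:
            try:
                idx = q.get_nowait()
            except queue.Empty:
                return  # queue drained: this thread is done
            item = work[idx]
            try:
                results[idx] = item.func(*item.args, **item.kwargs)
            except Exception:
                # This sketch leaves None in the failed item's slot.
                if not suppress_errors:
                    traceback.print_exc(file=sys.stderr)

    threads = [threading.Thread(target=worker)
               for _ in range(min(max_spawn_threads, len(work)))]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results


print(fork_join([WorkItem(pow, 2, n) for n in range(5)]))  # [1, 2, 4, 8, 16]
```

Indexing results by position is what keeps the output in 1:1 order with the input, regardless of which thread finishes first.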

replay module

Methods for replaying a combtest.walk.Walk. Used via function call, or main. This includes the ability to replay the Walk from a trace file produced during a run.

combtest.replay.load_from_master(log_file, walk_id)

Load a Walk from a master log file. A master log file provides paths to trace files which were produced by Walk-running services. This presumes the trace files are available locally, so if they were produced on a remote node, you’ll need to make them available by whatever method you prefer.

Parameters:
  • log_file (str) – a path to the master log file
  • walk_id (int) – a walk_id which appears in the trace file
combtest.replay.load_from_trace(trace_file, walk_id)

Load a Walk from a trace file.

Parameters:
  • trace_file (str) – a path to the trace file
  • walk_id (int) – a walk_id which appears in the trace file
combtest.replay.replay_walk(walk_to_run, step=False, replay_func_qualname=None, state=None)

Run a single combtest.walk.Walk.

Parameters:
  • walk_to_run (Walk) – the Walk to run
  • step (bool) – if True, step Action-by-Action through the Walk; the user hits a key to proceed to the next Action.
  • replay_func_qualname (str) – qualname of a replay function to use. Typically this will be combtest.runner.replay_walk(), but the user is free to provide their own.
  • state (object) – state passed to the Walk for execution.
combtest.replay.replay_walk_by_id(log_file, walk_id, step=False, replay_func_qualname=None, state=None)

Run a single combtest.walk.Walk. Load it by deserializing it from a trace file.

Parameters:
  • log_file (object) – ‘logs’ object returned from run_tests()
  • walk_id (int) – a walk_id that appears in the trace file, or one of the trace files referenced by the master log file.
  • step (bool) – if True, step Action-by-Action through the Walk; the user hits a key to proceed to the next Action.
  • replay_func_qualname (str) – qualname of a replay function to use. Typically this will be combtest.runner.replay_walk(), but the user is free to provide their own.
  • state (object) – state passed to the Walk for execution.

runner module

This module provides a set of tools for running Walks. This includes running Walks in stages using e.g. SerialActions, and ways to run Walks in parallel across a cluster.

class combtest.runner.ContinuingWalkServiceGroup(*args, **kwargs)

Bases: combtest.worker.ServiceGroup

A ServiceGroup designed for running Walks in stages. We need to be able to run Walks in stages in order to satisfy the semantic of combtest.action.SerialAction during a run.

gather_all_runner_states(worker_ids)

Gather stats from remote workers. User may want to extend this to include any new stats they think of.

Parameters: worker_ids (dict) – a mapping hostname/ip->iterable of worker_ids from which we should gather responses.
Returns: count of Walk segments run, count of Walk execution errors, count of Walks run, list of walk_ids of failed Walks
gather_all_states(worker_ids, full=False)

Gather states from all the given workers.

Parameters: worker_ids (dict) – a mapping (hostname/ip, port)->worker_id
Returns: a list of states in no particular order.
gather_runner_state(connection, worker_id)

Gather state from a remote runner for a given worker. This is an optional internal state object provided by WalkRunner.get_state().

Parameters:
  • connection (tuple) – str hostname/ip of the remote service, int port number
  • worker_id (int) – id of remote worker, as returned when starting the work (see scatter_work()).
gather_state(connection, worker_id, full=False)

Gather state from a remote service for a given worker. A running Walk is free to mutate its state, and sometimes that is what really constitutes the “response” or “output” of a quantum of work.

Parameters:
  • connection (tuple) – str hostname/ip of the remote service, int port number
  • worker_id (int) – id of remote worker, as returned when starting the work (see scatter_work()).
run(work, state)

Scatter work to all remote services, wait for it to finish executing, then return gathered states.

Parameters:
  • work (iterable) – iterable of work items
  • state (object) – make sure it is picklable
Returns:

a list of states, whose ordering is not particularly important.

scatter_work(epoch, id_map=None, max_thread_count=None, state=None)

Scatter some Walk segments out to the remote workers.

Parameters:
  • epoch (iterable) – iterable of (walk_id, branch_id, Walk)
  • id_map (dict) – optional map walk_id->(ip, port) of service currently holding that Walk’s state.
Returns:

an updated id_map

start_all_on(work_item, shared_state)

Start a single item of work running on all remote services.

Parameters:
  • work_item (callable) – a single work item
  • shared_state (object) – make sure it is picklable. Will be shared across all threads on a given service.
Returns:

a dict mapping (hostname/ip, port) -> worker_id

start_remote_logging(ip, port, log_dir=None, log_namespace=None, verbose=1)

Start logging on all remote services.

Parameters:
  • ip (str) – hostname or ip of local machine, where a log server is running
  • port (int) – port number of local log server
  • log_dir (str) – path to remote log directory the service should use
  • log_namespace (str) – prefix for log files
  • verbose (int) – 0-2 verbosity setting. At 2, an additional verbose-level log will be produced.
Returns:

a dict mapping remote service hostname/ip->remote log file locations

update_remote_states(ip, worker_ids, walk_ids, state)

Push an update to the states on the remote executors. This is sometimes useful when a combtest.action.SerialAction executes and affects the state of some set of Walks.

Parameters:
  • ip (str) – hostname or IP where the remote executor can be found
  • worker_ids (iterable) – iterable of ints, corresponding to worker_ids that can be found on the remote executor.
  • state (object) – something JSONifiable; the remote state will be updated via its update() method. The simplest way to support this is by using a dict as a state.
class combtest.runner.MultistageWalkRunner(*args, **kwargs)

Bases: combtest.runner.WalkRunner

Similar to WalkRunner, but walks are sent with an id that allows them to reclaim state from a prior Walk. Typically this is used for Walks to be executed in multiple stages: the second stage is sent with an id matching the first.

Note:
When we say “reclaim state” we mean the state object left behind by the earlier stage of the same Walk.
prep_work_call(work)

Called to wrap a Walk into a callable that accepts a single argument: the state.

run_walk(walk_id, branch_id, walk_to_run, state, serial_action=None)

Called to run an individual Walk.

class combtest.runner.MultistageWalkRunningService

Bases: combtest.runner.WalkExecutorService

A WalkExecutorService for executing Walks in multiple stages. That is: we can execute Walks in parts/slices. A walk_id is used to refer to the Walks uniquely and is the same across all slices.

WALK_RUNNER_TYPE

alias of MultistageWalkRunner

exposed_gather_state(worker_id)

Return the state associated with the given worker.

Parameters: worker_id (int) – the worker_id returned from the func used to start work
repack_ctx(state, **repack_kwargs)

Called back when a new worker/runner is made to pack up kwargs that we pass to the runner’s initializer.

work_repack(work, shared_state, ctx=None, **repack_kwargs)

This is called back to repackage each work item sent to the service. The call is an opportunity to e.g. do some deserialization, wrap the Walk in a WalkRunner, or anything else the user needs to prep the work for execution.

Parameters:
  • work (object) – the work to execute; typically this will be e.g. a JSONified Walk.
  • shared_state (object) – a state copied in for executing the Walk
  • ctx (object) – the object previously returned by repack_ctx
class combtest.runner.Result(walk_count, error_count, segment_count, elapsed, states, logs, failed_tests)

Bases: tuple

elapsed

Alias for field number 3

error_count

Alias for field number 1

failed_tests

Alias for field number 6

logs

Alias for field number 5

segment_count

Alias for field number 2

states

Alias for field number 4

walk_count

Alias for field number 0

class combtest.runner.WalkExecutorService

Bases: combtest.worker.CoordinatorService

The simplest rpyc-based service for running Walks across the cluster: the user only needs to provide a list of Walks to the corresponding ServiceGroup. The rest is handled.

Note:
The user should probably not be calling into this directly. They should be starting it up via rpyc.
WALK_RUNNER_TYPE

Override the type of WalkRunner used, here or in a child class

alias of WalkRunner

WORKER_TYPE

Override the type of ThreadPool used for executing Walks, here or in a child class

alias of WalkThreadPool

exposed_clean_worker(worker_id)

Discard all state related to a worker. Implies a join first. This gives the user full control over when we reap memory, since they may still want to retrieve or pickle results. It isn’t enough to wait until after they gather the response, since (a) they may not have “obtained” the response value yet, and (b) they may just be gathering intermediate results.

exposed_gather_state(worker_id)

Return the state associated with the given worker.

Parameters: worker_id (int) – the worker_id returned from the func used to start work
repack_ctx(state, **repack_kwargs)

Called back when a new worker/runner is made to pack up kwargs that we pass to the runner’s initializer.

work_repack(work, shared_state, ctx=None, **repack_kwargs)

This is called back to repackage each work item sent to the service. The call is an opportunity to e.g. do some deserialization, wrap the Walk in a WalkRunner, or anything else the user needs to prep the work for execution.

Parameters:
  • work (object) – the work to execute; typically this will be e.g. a JSONified Walk.
  • shared_state (object) – a state copied in for executing the Walk
  • ctx (object) – the object previously returned by repack_ctx
class combtest.runner.WalkRunner(reporting_interval=10000)

Bases: object

A WalkRunner simply wraps a Walk’s execution method with some tracking for reporting stats. The user is free to inherit and add more stats as they see fit.

count_cancel()

Called when a Walk is canceled.

count_error()

Called on a Walk execution error.

count_total()

Called to count the number of Walks/Walk segments we started running.

prep_work_call(walk_to_run, walk_state)

Called to wrap a Walk into a callable that accepts a single argument: the state.

run_walk(walk_to_run, state)

Called to run an individual Walk.

class combtest.runner.WalkThreadPool(max_thread_count=None, work=None, **kwargs)

Bases: combtest.worker.ThreadPool

on_error_item(work_item, state, exc, tb)

Called if a work item errors out (via Exception).

combtest.runner.replay_walk(walk_to_run, step=False, log_errors=True, state=None)

Run a single combtest.walk.Walk.

Parameters:
  • walk_to_run (Walk) – the Walk to run
  • step (bool) – if True, step Action-by-Action through the Walk; the user hits a key to proceed to the next Action.
  • log_errors (bool) – log exceptions to the logger if True
  • state (object) – state passed to the Walk for execution.
combtest.runner.run_tests(walk_order, state=None, verbose=1, logger_port=None, runner_class=<class 'combtest.runner.MultistageWalkRunningService'>, service_group_class=<class 'combtest.runner.ContinuingWalkServiceGroup'>, service_infos=None, service_handler_class=<class 'combtest.bootstrap.ServiceHandler_Local'>, max_thread_count=None, gather_states=False, log_dir=None, use_accountant=False)

Run a collection of combtest.walk.Walk. This should be the main way to execute Walks for most users. This is the only interface that supports correct execution of a combtest.action.SerialAction.

You can provide some instance to serve as the state passed around during the tests. There are two important details to know about this:

  • The state must be JSON-ifiable, but py-combtest provides a convenience pattern to help with that. See encode().
  • Shallow copies of the state will be made, via copy.copy(), since each test owns its own copy. You may want to e.g. override __copy__ if the details of the copy are important to you.
Parameters:
  • walk_order (iterable) – An iterable of iterables which produce combtest.action.Action. Example: a list of iterables produced by MyActionClass.get_option_set().
  • state (object) – a state to copy and pass to the Walks when we execute them.
  • verbose (int) – 0-2 verbosity setting. At 2 an additional verbose level log will be produced.
  • logger_port (int) – the port number where our local logger should accept data.
  • runner_class (combtest.worker.CoordinatorService) – the type of Walk execution service to use.
  • service_group_class (combtest.worker.ServiceGroup) – the type of ServiceGroup we will use to coordinate remote executors
  • service_infos (iterable) – An iterable of any extra infos we need to bootstrap the remote services. See combtest.bootstrap.ServiceHandleArray.
  • service_handler_class (combtest.bootstrap.ServiceHandler) – Type of ServiceHandler to use to bootstrap the services.
  • gather_states (bool) – if True or 1, gather and return all states from the remote services at the end of the run, as a mapping ip->[state, …]. If 2, also gather extra info about the run of each Walk, such as whether it was canceled. Otherwise, the returned states will be None.
  • max_thread_count (int) – Max number of Walk executing threads that each service will use.
  • log_dir (str) – Directory where we will store traces, debug logs, etc. Remote services will also attempt to store logs to the same path. If Accountant is enabled, this argument will override any directory choice that Accountant may make automatically. Use with caution on the regression wheel, since this means Accountant won’t automatically point to a log server directory.
  • use_accountant (bool) – if True, use Accountant to record results. This will also cause testdb to report back where possible (e.g. during a regression wheel run). You can also provide a SimpleAccountant here, which we will gladly use. It must not already be in a testcase or suite context.
Raises:

RuntimeError – when remote services can’t be established and connected to.

Returns:

count of walks run, count of walk execution errors, count of walk segments run, total elapsed time, remote states if gather_states is set else None, and the location of the master log file, where applicable.
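Because each test receives a shallow copy.copy() of the state, a mutable member of the state would otherwise be shared by every copy. The hypothetical GuiState below overrides __copy__ so each copy gets its own log list; subclassing dict keeps it JSON-ifiable and gives it the update() method that update_remote_states() relies on. The keys used here are made up for the example.

```python
import copy
import json


class GuiState(dict):
    """Hypothetical test state: a dict, so JSON-ifiable and has update()."""

    def __copy__(self):
        new = GuiState(self)            # shallow copy of the top-level keys
        new["log"] = list(self["log"])  # but give each copy its own log list
        return new


base = GuiState(window="main", log=[])
a = copy.copy(base)
b = copy.copy(base)
a["log"].append("clicked ok")

print(base["log"], a["log"], b["log"])  # [] ['clicked ok'] []
print(json.dumps(a))  # {"window": "main", "log": ["clicked ok"]}
```

Without the __copy__ override, copy.copy() would hand every test the same log list, so one test's appends would leak into the others.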

ssh_handle module

SSH-based connection for bootstrapping remote rpyc services. The user does not have to use this, but it is provided as a default implementation for those not wanting to write their own connection logic.

class combtest.ssh_handle.ServiceHandler_SSH(connection_info)

Provide an SSH-based method of bootstrapping our rpyc Walk executors.

Parameters: connection_info (ConnectionInfo) – a combtest.bootstrap.ConnectionInfo which optionally provides a service_info dictionary containing SSH authentication info. You can get such a dict from combtest.config.get_ssh_options().

utils module

Misc utils

class combtest.utils.RangeTree(min_idx, max_idx, root_value=())

A representation of a range, with the levels of the tree representing hierarchical splits of the range. O(N) mem in the number of sub-ranges. Example:

-->972
  -->486
    -->243
    -->243
  -->486
    -->243
    -->243
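To make the diagram concrete, the hypothetical build_splits() helper below halves a range level by level and renders the same picture. It assumes a half-open range [min_idx, max_idx) and only illustrates the hierarchical split; it is not RangeTree's API.

```python
def build_splits(min_idx, max_idx, depth):
    """Return nested (size, children) tuples for [min_idx, max_idx)."""
    size = max_idx - min_idx
    if depth == 0 or size < 2:
        return (size, [])
    mid = min_idx + size // 2
    return (size, [build_splits(min_idx, mid, depth - 1),
                   build_splits(mid, max_idx, depth - 1)])


def render(node, indent=0):
    # One line per node, indented two spaces per level, as in the diagram.
    size, children = node
    lines = ["  " * indent + "-->" + str(size)]
    for child in children:
        lines.extend(render(child, indent + 1))
    return lines


print("\n".join(render(build_splits(0, 972, 2))))
```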
combtest.utils.get_class_from_qualname(name)

Resolve a fully qualified class name to a class, attempting any imports that need to be done along the way.

Raises: ImportError or AttributeError – if we can’t figure out how to import it.

combtest.utils.get_class_qualname(cls)

Give a fully qualified name for a class (and therefore a function ref as well). Example: ‘combtest.action.Action’.

Note:
Python3 has this, or something like it, called __qualname__. We implement our own here to retain control of it, and give us Python2 compat.
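A standard-library sketch of the qualified-name round trip these two helpers perform. get_qualname() and resolve_qualname() are hypothetical stand-ins: this sketch's strategy is to import the longest importable module prefix and then walk the remaining attributes, raising ImportError or AttributeError on failure.

```python
import collections
import importlib
import os.path


def get_qualname(obj):
    """Module path plus name, e.g. 'collections.OrderedDict'."""
    return obj.__module__ + "." + obj.__name__


def resolve_qualname(name):
    parts = name.split(".")
    # Try the longest module prefix first, e.g. 'os.path' before 'os'.
    for i in range(len(parts) - 1, 0, -1):
        try:
            target = importlib.import_module(".".join(parts[:i]))
        except ImportError:
            continue
        for attr in parts[i:]:
            target = getattr(target, attr)  # AttributeError if missing
        return target
    raise ImportError("cannot import %r" % (name,))


name = get_qualname(collections.OrderedDict)
print(name)  # collections.OrderedDict
print(resolve_qualname(name) is collections.OrderedDict)  # True
print(resolve_qualname("os.path.join") is os.path.join)  # True
```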
combtest.utils.get_my_IP()[source]

Get our best guess at one of our local IPs that others could use to contact us (e.g. to connect to a local log server).
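One common way to make such a guess is shown below as a sketch; it is not necessarily the technique get_my_IP uses. The probe address `8.8.8.8` is an arbitrary assumption and is never actually contacted: calling connect() on a UDP socket only asks the OS to pick a route and a matching local address.

```python
import socket


def guess_local_ip(probe_host="8.8.8.8", probe_port=80):
    """Return the local IPv4 address the OS would use to reach probe_host.

    No packet is sent. Falls back to loopback if no route is available.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.connect((probe_host, probe_port))
        return sock.getsockname()[0]
    except OSError:
        return "127.0.0.1"
    finally:
        sock.close()
```

On a multi-homed host the answer depends on the route to the probe address, which is one reason set_my_IP exists as an override.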

combtest.utils.set_my_IP(ip)[source]

Set the IP that the rest of the system should assume is our local IP address. This may work for a hostname as well, but that isn’t well tested.

Parameters:ip (str) – ip as a string
combtest.utils.standard_strftime(dt)[source]

A “standard” strftime conversion of a datetime.

walk module

This module provides ways of systematically combining operations, sets of operations, etc., and executing such sequences of operations.

class combtest.walk.Walk(*elems)[source]

A Walk, named after the graph theory concept, is a list of combtest.action.Action to execute, together with a piece of state that the Actions can manipulate. The notion here is that each Action performs an operation that takes us through a transition in state/action space.

Parameters:elems (iterable) – An iterable of combtest.action.Action
append(elem)[source]

Logically equivalent to list.append

execute(state, log_errors=True)[source]

Execute the Actions in order. If an Action raises CancelWalk, we will stop executing immediately.

Returns:True if the Walk was run successfully or CancelWalk was raised, False otherwise.
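The return-value rules of execute() can be modeled in a few lines. This is a hypothetical stand-in, not combtest code: plain callables taking the state play the role of Actions, and `CancelWalk` here is a local exception class standing in for combtest.walk.CancelWalk.

```python
class CancelWalk(Exception):
    """Local stand-in for combtest.walk.CancelWalk."""


class MiniWalk(object):
    """Hypothetical miniature Walk: an ordered list of callables taking state."""

    def __init__(self, *elems):
        self._elems = list(elems)

    def append(self, elem):
        self._elems.append(elem)

    def execute(self, state, log_errors=True):
        for action in self._elems:
            try:
                action(state)
            except CancelWalk:
                # Cancelling is not a failure: stop now and report success.
                return True
            except Exception as exc:
                if log_errors:
                    print("walk failed: %r" % (exc,))
                return False
        return True


def add_one(state):
    state.append(1)


def cancel(state):
    raise CancelWalk()


def boom(state):
    raise ValueError("boom")
```

With these, `MiniWalk(add_one, cancel, add_one).execute(s)` returns True and leaves a single 1 in `s` (the second add_one never runs), while `MiniWalk(add_one, boom).execute([])` returns False.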

exception combtest.walk.WalkFailedError[source]

Raised when a combtest.walk.Walk failed to execute to completion for any reason (e.g. one of its operations raised an Exception).

class combtest.walk.WalkOpTracer(log_dir, *namespace)[source]

Traces a Walk portion together with its adjacent SerialAction and walk_id. The id is consistent across portions so that you can relate them back together later.

class combtest.walk.WalkOptions(walk_order)[source]

A WalkOptions accepts a set of Action and SerialAction options and produces an iterable of Epoch. Each Epoch represents a set of Walk segments which can run in parallel, and an optional SerialAction which should be run before the Walk portions.

Not threadsafe.

Parameters:walk_order (iterable) – An iterable of Action types
sizes
Returns:A tuple of the size of the option sets for each Action type in this WalkOptions.
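Assuming each Walk picks exactly one option from each Action type in walk_order, the number of distinct Walks is the product of `sizes`. A sketch with plain lists standing in for the option sets (the option values are made up, and SerialActions and Epochs are ignored):

```python
import itertools
import operator
from functools import reduce

# Hypothetical option sets for two Action types (a checkbox and a dropdown).
checkbox_options = [True, False]
dropdown_options = ["small", "medium", "large"]
walk_order = [checkbox_options, dropdown_options]

sizes = tuple(len(opts) for opts in walk_order)
total = reduce(operator.mul, sizes, 1)

# Every Walk is one element of the cartesian product of the option sets.
walks = list(itertools.product(*walk_order))
print(sizes, total, len(walks))  # (2, 3) 6 6
```

This multiplicative growth is why the work is later split into quanta and scattered across services.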

worker module

This module provides a set of mechanisms for dispatching “work” across a set of worker nodes. “Work” can mean “execute sets of arbitrary Python code”. This system is made of a few pieces:

  • A basic thread pool implementation. Python already provides multiprocessing.pool.ThreadPool, but I want the control and flexibility of having my own. This and its descendants are the “workers” that actually e.g. execute test cases or run fuzz+stress load (see ThreadPool). Thread counts can be chosen by the client; otherwise a default is used.
  • An rpyc-based coordinator that deserializes work sent to it and sends it to a worker/executor of some sort, which will typically be a ThreadPool (see CoordinatorService below). Typically there will be one of these per node running in its own process, or at least one of a given “type” per node. The only constraint is that it needs to have a 1:1 ip/port mapping. Typically there will be one central coordinator (ServiceGroup) paired with one CoordinatorService per node.
  • A central coordinator running in a single process that is passed work and sends it to all paired remote coordinators for execution (ServiceGroup).

Work is sent in some quantum defined by the client code and passed from ServiceGroup->CoordinatorService->ThreadPool. Responses can be sent back.

  • The quantum should be large enough for a worker to keep its children busy
  • The child may quit before all work is done if e.g. a failure limit is reached, or a fatal error hit.
  • A parent can specify how many workers a child can have, otherwise a worker is free to decide.
  • A parent can raise a signal or other condition to a child to tell it to quit early. It must obey and send back any accumulated responses, or have its results abandoned.
  • Parent must deal with errors where the child dies silently (e.g. interpreter crash hard enough that we don’t run atexit handlers)
class combtest.worker.CoordinatorService[source]

Bases: rpyc.core.service.Service

Receives work from a remote process via rpyc, and starts the work running via ThreadPool or some descendant class.

DEFAULT_MAX_THREAD_COUNT = 12

Override point for max number of threads for each worker to have. The other ways to set this are 1. via config.py, and 2. via an option passed at runtime (e.g. via exposed_start_workers_on()).

WORKER_TYPE

The ThreadPool class that workers are instantiated from. Users can override this.

alias of ThreadPool

exposed_add_work(worker_id, work, repack_kwargs=None)[source]

Add work to an existing worker. The original state will be used. If the user wants work to be executed with a different state, they should start a new worker via exposed_start_work().

Parameters:
  • worker_id (int) – the worker_id returned from the func used to start work
  • work (object) – the work item
exposed_clean_worker(worker_id)[source]

Toss all state related to a worker. Implies a join first. Gives the user full control over when we reap memory, since they may still want to retrieve/pickle results. It isn’t enough to wait until after they gather the response, since a.) they may not have “obtained” the response value yet, and b.) they may just be gathering intermediate results.

exposed_gather_state(worker_id)[source]

Return the state associated with the given worker.

Parameters:worker_id (int) – the worker_id returned from the func used to start work
exposed_join_all_workers()[source]

Like join_workers, but tosses all outstanding workers.

exposed_join_workers(worker_id)[source]

Join all working threads in a worker and toss the worker for further use. The responses and states will stay around until the client “cleans” the worker state. That means this is not a stateless service. The user needs to beware of leaking resources.

Parameters:worker_id (int) – the worker_id returned from the func used to start work
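To illustrate why joining and cleaning are separate steps, here is a hypothetical, single-process model of the worker lifecycle with one thread per worker (`WorkerRegistry` is not part of combtest). The state outlives the join until it is explicitly cleaned, which is exactly where a caller can leak memory.

```python
import itertools
import threading


class WorkerRegistry(object):
    """Hypothetical local model of start / join / gather / clean."""

    def __init__(self):
        self._ids = itertools.count()
        self._threads = {}
        self._states = {}

    def start_work(self, work, state):
        worker_id = next(self._ids)

        def run_all():
            for item in work:
                item(state)

        thread = threading.Thread(target=run_all)
        thread.start()
        self._threads[worker_id] = thread
        self._states[worker_id] = state
        return worker_id

    def join_workers(self, worker_id):
        # Threads are reaped, but the state stays until clean_worker().
        self._threads.pop(worker_id).join()

    def gather_state(self, worker_id):
        return self._states[worker_id]

    def clean_worker(self, worker_id):
        # Implies a join first, then drops everything held for this worker.
        if worker_id in self._threads:
            self.join_workers(worker_id)
        del self._states[worker_id]
```

A caller would typically start, join, gather the state it needs, and only then clean.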
exposed_run(work, max_thread_count=None, state=None, repack_kwargs=None)[source]

Run work synchronously. Return a worker_id so that responses/states can be reclaimed.

exposed_signal_stop(worker_id)[source]

Signal that a worker should stop. May not be immediate since this is intended to be a “clean” shutdown where workers can finish their current work item.

Parameters:worker_id (int) – the worker_id returned from the func used to start work
exposed_start_remote_logging(ip, port, **kwargs)[source]

Start sending logs to the log server at the given ip+port.

Parameters:
  • ip (str) – hostname or ip where the log server is running
  • port (int) – port number where the log server is running
exposed_start_work(work, state=None, max_thread_count=None, repack_kwargs=None)[source]

Called by remote side to start some work running asynchronously. The work and state are pickled and pulled through to this side in whole, rather than accessing via netrefs.

Returns:a worker_id to refer back to this work later for e.g. stopping it or getting responses.
exposed_start_workers_on(work, max_thread_count=None, state=None, repack_kwargs=None)[source]

Similar to start_work, but a single item is sent, and all threads in the new worker will run that single item.

Returns:The worker_id of the worker executing the work
on_connect(conn)[source]

rpyc.Service is designed for this to be the de-facto __init__ function. So we are doing our initialization here.

repack_ctx(state, **repack_kwargs)[source]

Called back when a new worker/runner is made to pack up kwargs that we pass to the runner’s initializer.

work_repack(work, state, ctx=None, **repack_kwargs)[source]

This is called back to repackage each work item sent to the service. The call is an opportunity to e.g. do some deserialization, wrap the Walk in a WalkRunner, or anything else the user needs to prep the work for execution.

Parameters:
  • work (object) – the work to execute; typically this will be e.g. a JSONified Walk.
  • state (object) – a state copied in for executing the Walk
  • ctx (object) – the object returned by repack_ctx before
class combtest.worker.ServiceGroup(service_name, service_infos=None, service_handler_class=<class 'combtest.bootstrap.ServiceHandler_Local'>, spawn_services=True, spawn_clients=True)[source]

Bases: object

Handles to running service instances across the nodes via e.g. SSH, and rpyc-based clients connected to those. This has a number of functions for dispatching work to the execution services, and gathering responses and stats.

Warning

We spawn connections to all nodes. Between the SSH connections and client connections, we may run into scalability issues on giant clusters. There are obvious optimizations we can do here, such as lazy connecting, but they would require some extra hooks. Please file a bug if and when we prove there is such a problem and a need for a fix.

DEFAULT_INSTANCE_COUNT = 3

If the user does not provide a specific enumeration of ip/port where we should set services up running, this will be the default number of services to start up, which we will start locally.

SPAWN_TIMEOUT = 30

Max time we spend waiting for a service spawn to succeed. We use this since e.g. starting services does not mean the service is immediately available, due to waiting for imports to happen. Hence the clients need to retry for a bit until the services come up. This timeout is measured in seconds.
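A sketch of the kind of retry loop this timeout bounds. It uses a plain TCP connect in place of an rpyc client connection (an assumption for illustration); `connect_with_retry` and its `interval` parameter are hypothetical.

```python
import socket
import time


def connect_with_retry(host, port, timeout=30.0, interval=0.5):
    """Retry a TCP connect until it succeeds or `timeout` seconds elapse."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return socket.create_connection((host, port), timeout=interval)
        except OSError:
            # The service may still be importing modules; try again shortly.
            if time.monotonic() >= deadline:
                raise
            time.sleep(interval)
```

Raising the last connection error once the deadline passes gives the caller something concrete to report, much like the RuntimeError documented for failed remote setup.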

WORK_QUANTUM_SIZE = 1000

The maximum chunk of work we will send in a single RPC call. This is to keep the request size sane. If the user gives us an iterable that is generating a massive amount of work on-the-fly we may not be able to instantiate it all and hold it in memory. So we send batches that are this size at maximum.
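Batching of this kind can be done lazily with `itertools.islice`, so that only one quantum is materialized at a time even when the user's iterable generates work on the fly. A sketch (the `quanta` helper is hypothetical; its default size mirrors WORK_QUANTUM_SIZE):

```python
import itertools


def quanta(work, size=1000):
    """Lazily yield lists of at most `size` items from any iterable."""
    it = iter(work)
    while True:
        batch = list(itertools.islice(it, size))
        if not batch:
            return
        yield batch


# A generator of 2500 items becomes three batches.
print([len(b) for b in quanta(i for i in range(2500))])  # [1000, 1000, 500]
```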

clients

Returns a mapping (ip, port)->client “root” (see rpyc connect docs)

gather_all_states(worker_ids)[source]

Gather states from all the given workers.

Parameters:worker_ids (dict) – a mapping (hostname/ip, port)->worker_id
Returns:a list of states in no particular order.
gather_state(connection, worker_id)[source]

Gather state from a remote service for a given worker. A running Walk is free to mutate its state, and sometimes that is what really constitutes the “response” or “output” of a quantum of work.

Parameters:
  • connection (tuple) – str hostname/ip of the remote service, int port number
  • worker_id (int) – id of remote worker, as returned when starting the work (see scatter_work()).
classmethod give_up()[source]

Signal that all ServiceGroup instances should give up trying to connect to remote services, send work, etc. and bail immediately. We can use this e.g. if we receive a signal locally and don’t want to wait for a long-running scatter or gather.
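Because give_up is a classmethod, one plausible implementation is a class-level flag that every instance polls between units of work. A hypothetical sketch (`GroupSketch` and `send_batches` are illustrative names, not ServiceGroup's API):

```python
import threading


class GroupSketch(object):
    # Class-level, so a single give_up() call affects every instance.
    _give_up = threading.Event()

    @classmethod
    def give_up(cls):
        cls._give_up.set()

    def send_batches(self, batches):
        sent = 0
        for _batch in batches:
            if self._give_up.is_set():
                break  # bail out of a long-running scatter
            sent += 1
        return sent
```

A local signal handler could then call it, for example via `signal.signal(signal.SIGINT, lambda signum, frame: GroupSketch.give_up())`.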

join(hard=False)[source]

Wait for all workers on all remote services to complete. We can have a finer-grained function later if it is helpful.

Parameters:hard (bool) – ignore errors if True
Raises:EOFError, ReferenceError, RuntimeError – if we have an issue while joining (e.g. the remote side died)
run(work, state=None)[source]

Scatter work to all remote services, wait for it to finish executing, then return gathered states.

Parameters:
  • work (iterable) – iterable of work items
  • state (object) – make sure it is picklable
Returns:

a list of states, whose ordering is not particularly important.

scatter_work(work, max_thread_count=None, state=None)[source]

Partition the provided iterable of work into roughly even-sized portions and send them to each of the remote services. state will be copied to each node independently. The user must handle the logic of retrieving results and stitching them together. See gather_state().

Parameters:
  • work (iterable) – iterable of work items
  • max_thread_count (int) – override of how many threads each remote executor should have
  • state (object) – make sure it is picklable
Returns:

a dict mapping (hostname/ip, port) -> worker_id
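One simple way to get roughly even-sized portions from an arbitrary iterable is round-robin dealing, sketched below with made-up (ip, port) endpoints. This is an illustration only; scatter_work may partition differently.

```python
import itertools


def partition_round_robin(work, endpoints):
    """Deal work items to endpoints in turn; portion sizes differ by at most one."""
    portions = {endpoint: [] for endpoint in endpoints}
    # zip stops when `work` is exhausted, so the infinite cycle is safe.
    for endpoint, item in zip(itertools.cycle(endpoints), work):
        portions[endpoint].append(item)
    return portions


endpoints = [("10.0.0.1", 6000), ("10.0.0.2", 6000), ("10.0.0.3", 6000)]
portions = partition_round_robin(range(10), endpoints)
print([len(portions[e]) for e in endpoints])  # [4, 3, 3]
```

The result is keyed the same way as the dict scatter_work returns, which is the shape gather_all_states expects.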

shutdown(hard=False)[source]

Shut down both clients and services.

Parameters:hard (bool) – if hard, we will ignore any errors trying to shut down
shutdown_clients(hard=False)[source]

Shut down all running clients.

Parameters:hard (bool) – if hard, we will ignore any errors trying to shut down
shutdown_services(hard=False)[source]

Shut down all running services.

Parameters:hard (bool) – if hard, we will ignore any errors trying to shut down
spawn()[source]

Spawn both services and clients (in that order).

spawn_clients()[source]

Spawn rpyc clients to the remote services. Assumes remote services are up, or will be within SPAWN_TIMEOUT.

spawn_services()[source]

Spawn remote services via some bootstrap method.

start_all_on(work_item, shared_state=None)[source]

Start a single item of work running on all remote services.

Parameters:
  • work_item (callable) – a single work item
  • shared_state (object) – make sure it is picklable. Will be shared across all threads on a given service.
Returns:

a dict mapping (hostname/ip, port) -> worker_id

start_remote_logging(ip, port)[source]

Start logging on all remote services.

Parameters:
  • ip (str) – hostname or ip of local machine, where a log server is running
  • port (int) – port number of local log server
class combtest.worker.ThreadPool(max_thread_count=None, work=None, **kwargs)[source]

Bases: object

A set of threads doing work from a queue asynchronously to the caller. The caller can add additional work after the work has started. Child classes can add callbacks for e.g. when work starts/finishes, how to handle errors, etc.

add_work(work)[source]

Add a work item to this pool. Will not kick threads into action if all prior work has finished. See kick_workers().

join()[source]

Wait for all threads to finish, then call on_finish.

kick_workers()[source]

If some workers are dead, reap them and start up new ones to replace them. This is useful e.g. when adding work after the pool is already running, since some may have finished and exited.

on_error_item(work_item, state, exc, tb)[source]

Called if a work item errors out (via Exception).

on_finish()[source]

Called when threads are finished up, as indicated by a call to join().

on_finish_item(work_item, state)[source]

Called after a work item is finished running.

on_start(state)[source]

Called when threads start up to do work for the first time. Not re-run when they are “kicked”.

on_start_item(work_item, state)[source]

Called after a work item is dequeued and before it is run.

run(state=None, all_on_next=False)[source]

Run all queued work all the way through to completion, then join.

Parameters:
  • state (object) – an optional single arg to pass to the work callbacks
  • all_on_next (bool) – rather than each thread popping an item that it runs (and therefore no other thread runs), pop a single item and have all threads run it.
signal_stop()[source]

Signal threads they should stop when they are done with their current unit of work. This means they will not stop as quickly as e.g. killing this proc with a signal. Gives them a chance to finish cleanly.

start(state=None)[source]

Tell threads to start working on any work already queued.

Parameters:state (object) – an optional single arg to pass to the work callbacks
start_all_on_next(state=None)[source]

This gets all threads running the next work item. This means the item will potentially be run more than once, and potentially multiple times in parallel.

Parameters:state (object) – an optional single arg to pass to the work callbacks
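The general shape of such a pool can be sketched with the standard library's `queue` and `threading` modules. `MiniThreadPool` is hypothetical and deliberately smaller than ThreadPool: it omits on_start, on_finish, kick_workers and all_on_next. As with add_work above, its threads exit once the queue is empty, so work added after that point would need new threads. Whether on_finish_item also fires after an error is an assumption here (in this sketch it does not).

```python
import queue
import threading
import traceback


class MiniThreadPool(object):
    """Hypothetical queue-backed pool with a subset of the documented hooks."""

    def __init__(self, max_thread_count=4, work=None):
        self.max_thread_count = max_thread_count
        self._queue = queue.Queue()
        self._stop = threading.Event()
        self._threads = []
        for item in work or ():
            self.add_work(item)

    def add_work(self, work):
        self._queue.put(work)

    # Override points mirroring on_start_item / on_finish_item / on_error_item.
    def on_start_item(self, work_item, state):
        pass

    def on_finish_item(self, work_item, state):
        pass

    def on_error_item(self, work_item, state, exc, tb):
        pass

    def _worker(self, state):
        # Checked between items, so signal_stop() never interrupts an item.
        while not self._stop.is_set():
            try:
                work_item = self._queue.get_nowait()
            except queue.Empty:
                return  # queue drained; this thread exits
            self.on_start_item(work_item, state)
            try:
                work_item(state)
            except Exception as exc:
                self.on_error_item(work_item, state, exc, traceback.format_exc())
            else:
                self.on_finish_item(work_item, state)

    def start(self, state=None):
        for _ in range(self.max_thread_count):
            thread = threading.Thread(target=self._worker, args=(state,))
            thread.start()
            self._threads.append(thread)

    def signal_stop(self):
        self._stop.set()

    def join(self):
        for thread in self._threads:
            thread.join()

    def run(self, state=None):
        self.start(state)
        self.join()
```

A subclass would override the on_* hooks, for example to count failures and call signal_stop() once a failure limit is reached.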
combtest.worker.start_remote_services(service_class)[source]

Simplest method to start some services remotely.

Parameters:service_class (class) – a descendant of CoordinatorService
Raises:RuntimeError – on various issues with service start up
Returns:A ServiceGroup wrapping the started services
combtest.worker.start_service(service_class, port=None)[source]

Start an rpyc service given by the provided class. Port can be overridden.

Parameters:
  • service_class (rpyc.Service) – a child class of rpyc.Service.
  • port (int) – the port the service should listen for requests on. If it isn’t provided by the caller, we get a value from combtest.config
Returns:

a handle to the resulting ThreadedServer.

combtest.worker.start_service_by_name(service_name, port=None)[source]

Start an rpyc service given by the provided class qualname. Port can be overridden.

Parameters:
  • service_name (str) – a qualname of a child class of rpyc.Service.
  • port (int) – the port the service should listen for requests on. If it isn’t provided by the caller, we get a value from combtest.config
Returns:

a handle to the resulting ThreadedServer.