py-combtest API reference¶
Submodules¶
action module¶
This module provides the core classes for specifying operations and systematically generating combinations of them.
-
class
combtest.action.
Action
(param)[source]¶ Bases:
combtest.action.BaseAction
Convenience form of BaseAction: the user provides a simple iterable of parameters, called OPTIONS.
-
OPTIONS
= None¶
-
classmethod
get_option_set
()[source]¶ Return the full parameterization space for this Action type. For example: if you are testing a GUI and this Action sets the state of a checkbox, then this might return cls(True) and cls(False), for ‘set the box to checked’ and ‘set the box to unchecked’ respectively.
Note: This returns cls instances, not the parameters themselves.
Returns: An iterable of instances of type cls
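The OPTIONS pattern can be sketched in a few lines. This is a minimal, self-contained illustration: the BaseAction and Action classes below are simplified stand-ins written for this example, not the library's implementation, and SetCheckbox and the dict-based state are hypothetical.

```python
class BaseAction(object):
    """Stand-in for combtest.action.BaseAction (illustrative only)."""

    def __init__(self, param):
        self.param = param

    def run(self, param, state=None):
        raise NotImplementedError("subclasses override run()")

    @classmethod
    def get_option_set(cls):
        raise NotImplementedError("subclasses override get_option_set()")


class Action(BaseAction):
    """Stand-in for combtest.action.Action: options come from OPTIONS."""

    OPTIONS = None

    @classmethod
    def get_option_set(cls):
        # Return instances of cls, one per parameter, not the raw parameters.
        return [cls(option) for option in cls.OPTIONS]


class SetCheckbox(Action):
    """Hypothetical Action that sets a checkbox to checked or unchecked."""

    OPTIONS = (True, False)

    def run(self, param, state=None):
        state["checkbox"] = param


actions = SetCheckbox.get_option_set()
state = {}
for action in actions:
    action.run(action.param, state=state)
```

Each element of actions is a SetCheckbox instance carrying one parameter, which matches the note above that get_option_set() returns instances rather than raw parameters.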
-
-
class
combtest.action.
BaseAction
(param)[source]¶ Bases:
object
An Action class bundles a function that performs an operation with all the different parameterizations of that operation. The run() method should be overridden; it performs the operation. The get_option_set() classmethod should be overridden to provide all the valid parameterizations of the operation.
If you won’t know until runtime whether an operation is valid, you can test for that in run() and raise a combtest.walk.CancelWalk exception to stop executing that Walk right away. You may want to cancel, for example, if the operation and parameter do not make sense given the current state of the feature. As a concrete example: suppose you are testing a GUI in which a control is hidden, and the operation is to interact with that control. In the run() method you can test for that condition and raise a combtest.walk.CancelWalk.
State is represented as an object provided by the user, which is passed to each run() call.
-
classmethod
get_option_set
()[source]¶ Return the full parameterization space for this Action type. For example: if you are testing a GUI and this Action sets the state of a checkbox, then this might return cls(True) and cls(False), for ‘set the box to checked’ and ‘set the box to unchecked’ respectively.
Note: This returns cls instances, not the parameters themselves.
Returns: An iterable of instances of type cls
-
param
¶ This instance’s parameters, as passed to the initializer.
-
run
(param, state=None)[source]¶ This is called to actually run the operation. It does not need to be invoked directly by the user.
Parameters: - param (object) – this represents the parameterization of the operation. For example: if you are testing a GUI and this Action represents “set the state of checkbox1”, then param would be the checkbox1 setting.
- state (object) – this object is passed along from Action-to-Action in a given Walk as the Actions execute. It represents any state that needs to be provided to the Action, or which may be modified by the Action. If you are testing a GUI for example, perhaps this is a handle to an instance of the GUI.
Raises: combtest.walk.CancelWalk – the operation can raise combtest.walk.CancelWalk to cancel the execution of a combtest.walk.Walk at this point.
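A minimal sketch of cancelling from run() follows. Everything here is a stand-in: CancelWalk is redefined locally, ClickControl is a hypothetical Action-like class, and execute() is an assumed, simplified executor loop, not how the library actually drives a Walk.

```python
class CancelWalk(RuntimeError):
    """Stand-in for combtest.walk.CancelWalk (illustrative only)."""


class ClickControl(object):
    """Hypothetical Action-like class; not derived from the real BaseAction."""

    def __init__(self, param):
        self.param = param

    def run(self, param, state=None):
        # Only known at runtime: is the control currently hidden?
        if param in state["hidden"]:
            raise CancelWalk("control %r is hidden" % (param,))
        state["clicked"].append(param)


def execute(actions, state):
    """Hypothetical executor: run Actions in order, stop on CancelWalk."""
    try:
        for action in actions:
            action.run(action.param, state=state)
    except CancelWalk:
        return False
    return True


state = {"hidden": {"advanced"}, "clicked": []}
walk = [ClickControl("ok"), ClickControl("advanced"), ClickControl("apply")]
completed = execute(walk, state)
```

In this sketch the second Action cancels the walk, so the third Action never runs and state records only the first click.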
-
should_cancel
(walk, idx)[source]¶ This method is called before a Walk containing this Action is run. This gives an opportunity to prevent the Walk from running if this combination just doesn’t make any sense.
Parameters: - walk (Walk) – the walk we are in
- idx (int) – the index we are at in the walk
Returns: True if the Walk should not be run, else False
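As an illustration of the pre-run check, the sketch below uses two hypothetical Action-like classes and an assumed helper, is_runnable(), that consults should_cancel() for every position in a walk. None of these names come from the library.

```python
class OpenDialog(object):
    """Hypothetical Action-like class: param says whether to open the dialog."""

    def __init__(self, param):
        self.param = param

    def should_cancel(self, walk, idx):
        return False


class ClickInDialog(object):
    """Hypothetical Action-like class that needs the dialog to be open."""

    def __init__(self, param):
        self.param = param

    def should_cancel(self, walk, idx):
        # Look only at the Actions that come before this one in the walk.
        opened = any(isinstance(action, OpenDialog) and action.param
                     for action in walk[:idx])
        return not opened


def is_runnable(walk):
    """Hypothetical pre-run check: no Action in the walk asks to cancel."""
    return not any(action.should_cancel(walk, idx)
                   for idx, action in enumerate(walk))


good_walk = [OpenDialog(True), ClickInDialog("ok")]
bad_walk = [OpenDialog(False), ClickInDialog("ok")]
```

Unlike raising CancelWalk inside run(), this kind of check needs no runtime state: it rejects a combination purely from the sequence of Actions.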
-
exception
combtest.action.
CancelWalk
[source]¶ Bases:
exceptions.RuntimeError
Raised to immediately stop the execution of a
combtest.walk.Walk
.
-
class
combtest.action.
OptionSet
(options, action_class)[source]¶ Bases:
object
An OptionSet is an iterable that provides instances of a given combtest.action.Action class. This is typically used as a return value from an Action’s get_option_set() method.
-
class
combtest.action.
SerialAction
(param)[source]¶ Bases:
combtest.action.Action
A SerialAction is a type of Action which cannot be run in parallel with others, e.g. for when we are running a bunch of ‘Walks’ in parallel. Thinking in terms of parallel algorithms: this represents a pinch point where an operation must be serialized with everything else. In terms of testing: it is part of a test where we can’t run other cases in parallel; e.g. if we are twiddling some global config option that affects all running test cases. We change the option once for all test cases after all outstanding work has finished.
Fundamentally, a SerialAction is run a single time for a given parameterization, regardless of how many Walks contain it. The part of the Walks prior to the SerialAction will be run first.
Example: suppose we have three Walks:
- [Action1(1), Action2(2), SerialAction1(1), Action3(3)]
- [Action1(1), Action2(3), SerialAction1(1), Action3(3)]
- [Action1(1), Action2(2), Action3(3), SerialAction1(1), Action4(4)]
We can run the first ‘segment’ of each Walk in parallel:
- [Action1(1), Action2(2)]
- [Action1(1), Action2(3)]
- [Action1(1), Action2(2), Action3(3)]
Then, in a single thread, we run SerialAction1(1), after which we can run in parallel:
- [Action3(3)]
- [Action3(3)]
- [Action4(4)]
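The segmentation in this example can be reproduced with a short sketch. Actions are modelled as plain (name, param) tuples, and split_at_serial() and is_serial() are hypothetical helpers written for illustration; the library's own scheduling is not shown here.

```python
def split_at_serial(walk, is_serial):
    """Split a walk into (prefix, serial_action, suffix) at the first serial action."""
    for idx, action in enumerate(walk):
        if is_serial(action):
            return walk[:idx], action, walk[idx + 1:]
    return walk, None, []


def is_serial(action):
    # Assumption for this sketch: serial actions are recognisable by name.
    return action[0].startswith("SerialAction")


walks = [
    [("Action1", 1), ("Action2", 2), ("SerialAction1", 1), ("Action3", 3)],
    [("Action1", 1), ("Action2", 3), ("SerialAction1", 1), ("Action3", 3)],
    [("Action1", 1), ("Action2", 2), ("Action3", 3), ("SerialAction1", 1),
     ("Action4", 4)],
]

parts = [split_at_serial(walk, is_serial) for walk in walks]
first_segments = [prefix for prefix, _, _ in parts]    # can run in parallel
serial_actions = {serial for _, serial, _ in parts}    # run once per parameterization
last_segments = [suffix for _, _, suffix in parts]     # can run in parallel afterwards
```

Collecting the serial actions into a set reflects the rule that a SerialAction runs a single time for a given parameterization, regardless of how many Walks contain it.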
-
combtest.action.
get_random_action_by_class
(action_class)[source]¶ Get an instance of the given Action, randomly chosen from the set its Action.get_option_set() produces.
Returns: An instance of action_class.
bootstrap module¶
We need some way to bootstrap the rpyc-based combtest.worker.CoordinatorService. That can include bootstrapping it on a remote node if we want to take advantage of a whole cluster. We don’t need to make too many assumptions to do so. This module provides a general interface for accomplishing this (ServiceHandleArray), and a base implementation for bootstrapping locally using multiprocessing (ServiceHandler_Local). The client can swap out ServiceHandlers to provide alternative bootstrapping logic. ServiceHandleArray and ServiceHandler simply provide the interface that a combtest.worker.ServiceGroup uses to bootstrap the service instances.
Note: A paramiko-based ServiceHandler class for SSH connections can be found in combtest.ssh_handle.ServiceHandler_SSH.
-
class
combtest.bootstrap.
ConnectionInfo
(ip, port, service_info)[source]¶ Bases:
object
This is a simple struct-like object meant to wrap all the stuff needed to bootstrap a service.
Parameters: - ip (str) – A string that can be passed as the first argument to rpyc.connect. Typically this will be an IP address for our purposes, but could also be a hostname.
- port (int) – The port number where the service can be found
- service_info (object) – Additional information your class may need for bootstrapping the service. For the SSH implementation, for example, this will be some authentication bits.
-
class
combtest.bootstrap.
ServiceHandleArray
(connection_infos, spawn=True)[source]¶ Bases:
object
A ServiceHandleArray is a collection of ServiceHandlers, together with dynamic foreach-style dispatch on attribute access. Example:
>>> sa = ServiceHandleArray(...)
>>> sa.q(1)
This will dispatch an access to the q attribute of each contained ServiceHandler in parallel.
Parameters: - connection_infos (iterable) – An iterable of ConnectionInfo which describe how we will start the services up.
- spawn (bool) – True if we should spawn the services during initialization, False otherwise. The user can later spawn them manually using spawn().
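Foreach-style dispatch on attribute access can be sketched with __getattr__. The ForEachArray and FakeHandler classes below are hypothetical stand-ins, and the use of concurrent.futures is an assumption made for the example; the sketch does not show how ServiceHandleArray is actually implemented.

```python
from concurrent.futures import ThreadPoolExecutor


class ForEachArray(object):
    """Hypothetical stand-in showing foreach-style dispatch on attribute access."""

    def __init__(self, members):
        self._members = list(members)

    def __getattr__(self, name):
        # Only reached for attributes not found normally, e.g. sa.q
        def dispatch(*args, **kwargs):
            workers = max(1, len(self._members))
            with ThreadPoolExecutor(max_workers=workers) as pool:
                futures = [pool.submit(getattr(member, name), *args, **kwargs)
                           for member in self._members]
                # Collect results in member order, not completion order.
                return [future.result() for future in futures]
        return dispatch


class FakeHandler(object):
    """Hypothetical member with a method named q."""

    def __init__(self, name):
        self.name = name

    def q(self, value):
        return (self.name, value)


sa = ForEachArray([FakeHandler("a"), FakeHandler("b")])
results = sa.q(1)
```

The caller writes sa.q(1) as if sa were a single handler, and the array fans the call out to every member.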
-
REMOTE_CONNECTION_CLASS
¶ alias of
ServiceHandler
-
is_alive
¶ Returns: True if we have any connections, False otherwise. This is not directly related to whether the services are up; e.g. they may have died for some reason.
-
shutdown
(hard=False)[source]¶ Shutdown all instances this array is tracking.
Parameters: hard (bool) – if False, we will assert that instances shut down cleanly, otherwise we won’t.
-
spawn
(connection_info)[source]¶ Spawn a single instance and attach it to this array.
Parameters: connection_info (ConnectionInfo) – The ConnectionInfo that describes how to start this instance.
-
spawn_many
(connection_infos=None)[source]¶ Spawn and attach a bunch of instances.
Parameters: connection_infos (iterable) – if None, we will spawn using the ConnectionInfos passed to the initializer. Otherwise, connection_infos should be an iterable of ConnectionInfos.
-
class
combtest.bootstrap.
ServiceHandler
(connection_info)[source]¶ Bases:
object
A ServiceHandler implements a way of bootstrapping an rpyc-based service. This base class defines the interface and should be overridden.
Parameters: connection_info (ConnectionInfo) – Specifies where the service should be contactable once it is bootstrapped, and any additional information needed to bootstrap it.
-
shutdown
()[source]¶ Called to shut the service down. Should be idempotent unless the class writer is very careful about how they shut their services down manually. The overridden method should call the super-method.
-
start_cmd
(service_class, service_run_func='combtest.worker.start_service_by_name')[source]¶ This method will be called to actually do the bootstrapping work. It should be overridden.
Parameters: - service_class (<str|rpyc.Service>) – A full __qualname__ of the service we are bootstrapping, or the class itself.
- service_run_func (str) – optional override of the function that will be used on the remote side to start the service up. Needs to be a __qualname__. Typically this can be left default.
-
-
class
combtest.bootstrap.
ServiceHandler_Local
(connection_info)[source]¶ Bases:
combtest.bootstrap.ServiceHandler
A ServiceHandler implementation for bootstrapping locally via a simple multiprocessing.Process call-out.
-
shutdown
()[source]¶ Called to shut the service down. Should be idempotent unless the class writer is very careful about how they shut their services down manually. The overridden method should call the super-method.
-
start_cmd
(service_class, service_run_func='combtest.worker.start_service_by_name')[source]¶ This method will be called to actually do the bootstrapping work. It should be overridden.
Parameters: - service_class (<str|rpyc.Service>) – A full __qualname__ of the service we are bootstrapping, or the class itself.
- service_run_func (str) – optional override of the function that will be used on the remote side to start the service up. Needs to be a __qualname__. Typically this can be left default.
-
config module¶
Getters, setters, and loaders of config. Config includes e.g. SSH authentication stuff, port numbers to use, etc.
There are two layers of config provided: config loaded from file via
refresh_cfg
, and runtime-provided overrides to those. The getters given
below will enforce that ordering. The user provides overrides via the setters.
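The two-layer lookup can be sketched as follows. The dictionaries, load_file_values() and set_service_port() are hypothetical names chosen for illustration; only get_service_port and the default port value appear in this reference, and the real module's internals may differ.

```python
DEFAULT_SERVICE_PORT = 6187   # default value listed in this reference

_file_cfg = {}     # layer 1: values loaded from a config file
_overrides = {}    # layer 2: values set at runtime; these win


def load_file_values(values):
    """Hypothetical stand-in for loading values from a config file."""
    _file_cfg.clear()
    _file_cfg.update(values)


def set_service_port(port):
    """Hypothetical setter: record a runtime override."""
    _overrides["service_port"] = port


def get_service_port():
    """Runtime override first, then the file value, then the default."""
    if "service_port" in _overrides:
        return _overrides["service_port"]
    return _file_cfg.get("service_port", DEFAULT_SERVICE_PORT)


port_default = get_service_port()
load_file_values({"service_port": 7000})
port_from_file = get_service_port()
set_service_port(7100)
port_overridden = get_service_port()
```

Keeping the two layers in separate containers means reloading the file never discards an override the user set at runtime.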
-
combtest.config.
CONFIG_LOC
= 'combtest.cfg'¶ Location of a config file to load, if the user wants to load config values that way
-
combtest.config.
DEFAULT_LOGGER_PORT
= 6186¶ Default port number for interproc communication for the logger
-
combtest.config.
DEFAULT_MAX_THREAD_COUNT
= 12¶ Default number of workers executing a
combtest.walk.Walk
at a time for a given service instance.
-
combtest.config.
DEFAULT_SERVICE_PORT
= 6187¶ Default port an rpyc service should listen on e.g. if we start up a remote service
-
combtest.config.
get_logger_port
()[source]¶ Get the port we will expose locally for remote loggers to connect to.
-
combtest.config.
get_max_thread_count
()[source]¶ Get the max thread count for
combtest.worker.ThreadPool
.
-
combtest.config.
get_service_port
()[source]¶ Get the port number where we should expect to find an rpyc service running, once it is bootstrapped.
-
combtest.config.
get_ssh_options
()[source]¶ Get all SSH authentication options in a single call.
Returns: a dict of options
-
combtest.config.
get_ssh_passwords
()[source]¶ Get passwords for SSH authentication.
Returns: password, map of ip->password
-
combtest.config.
get_ssh_rsa_keys
()[source]¶ Get paths to RSA keys that we can use for SSH authentication. This can include a single file, or one file per e.g. IP.
Returns: path, map of ip->path
encode module¶
Easy serialization/deserialization mechanism using JSON. It uses a pair of
methods: an instance-level to_json() method and a classmethod from_json(),
which are used, when available, to convert between an instance and JSON.
We use this mechanism to serialize combtest.action.Action
and
combtest.walk.Walk
instances for printing to a trace file, or other
logs where human readability is handy. The reason we don’t use a custom
encoder/decoder pair for those two classes is that the user can attach a
param which has no default JSON behavior. We let them write to_json()
etc. if they want param
to be a member of a custom class.
forkjoin module¶
Simple fork/join implementation, modelled after Java’s lib for the same. This doesn’t attempt to make any scheduling optimizations for e.g. the sake of minimizing makespan. Threads simply share a queue and pop work off of it until the queue is drained.
Note: This is not well suited to CPU-bound (bytecode-level) work. It is meant for I/O-heavy work such as performing network roundtrips or doing disk I/O. If you need to dodge the GIL, you’ll want to use something else.
-
class
combtest.forkjoin.
WorkItem
(func, *args, **kwargs)[source]¶ A simple container representing a work item to be done.
Parameters: - func (function) – A pointer to the work callback
- args – args to pass to the work callback
- kwargs – kwargs to pass to the work callback
-
combtest.forkjoin.
fork_join
(work, suppress_errors=False, max_spawn_threads=60)[source]¶ Poor man’s fork/join implementation. Run all work items in parallel, wait for all work to finish, then return the results. At most max_spawn_threads threads run in parallel for a given job.
Parameters: - work (iterable) – an iterable of WorkItems that supports direct indexing and len
- suppress_errors (bool) – if False, we will print any exceptions to stderr, otherwise we won’t. Does not affect the return value.
- max_spawn_threads (int) – The max number of threads running simultaneously.
Returns: A list of returned items, in 1:1 correspondence to the WorkItems in ‘work’. That is: the first result corresponds to the first WorkItem, etc. If nothing is explicitly returned by an item, its ‘return value’ in this list will be None.
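A simplified sketch of the technique: threads pop indices off a shared queue until it is drained, and each result is stored at the index of its work item. This is an illustration written for this reference, not the module's source; it omits the suppress_errors handling, and the WorkItem class here is a local stand-in.

```python
import queue
import threading


class WorkItem(object):
    """Stand-in container: a callback plus its arguments."""

    def __init__(self, func, *args, **kwargs):
        self.func = func
        self.args = args
        self.kwargs = kwargs


def fork_join(work, max_spawn_threads=60):
    """Run all work items on up to max_spawn_threads threads; keep result order."""
    results = [None] * len(work)
    pending = queue.Queue()
    for idx in range(len(work)):
        pending.put(idx)

    def worker():
        while True:
            try:
                idx = pending.get_nowait()
            except queue.Empty:
                return  # queue drained, this thread is done
            item = work[idx]
            results[idx] = item.func(*item.args, **item.kwargs)

    threads = [threading.Thread(target=worker)
               for _ in range(min(max_spawn_threads, len(work)))]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return results


items = [WorkItem(pow, 2, 3), WorkItem(len, "abc"), WorkItem(lambda: None)]
results = fork_join(items, max_spawn_threads=2)
```

Writing each result into a preallocated slot is what gives the 1:1 correspondence between results and work items, whichever thread happens to run each item.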
replay module¶
Methods for replaying a combtest.walk.Walk
. Used via function call, or
main. This includes the ability to replay the Walk
from a trace file
produced during a run.
-
combtest.replay.
load_from_master
(log_file, walk_id)[source]¶ Load a Walk from a master log file. A master log file provides paths to trace files which were produced by Walk-running services. This presumes the trace files are available locally, so if they were produced on a remote node, you’ll need to make them available by whatever method you prefer.
Parameters: - log_file (str) – a path to the master log file
- walk_id (int) – a walk_id which appears in the trace file
-
combtest.replay.
load_from_trace
(trace_file, walk_id)[source]¶ Load a Walk from a trace file.
Parameters: - trace_file (str) – a path to the trace file
- walk_id (int) – a walk_id which appears in the trace file
-
combtest.replay.
replay_walk
(walk_to_run, step=False, replay_func_qualname=None, state=None)[source]¶ Run a single combtest.walk.Walk.
Parameters: - walk_to_run (Walk) – the Walk to run
- step (bool) – if True, step Action-by-Action through the Walk; the user hits a key to proceed to the next Action.
- replay_func_qualname (str) – qualname of a replay function to use. Typically this will be combtest.runner.replay_walk(), but the user is free to provide their own.
- state (object) – state passed to the Walk for execution.
-
combtest.replay.
replay_walk_by_id
(log_file, walk_id, step=False, replay_func_qualname=None, state=None)[source]¶ Run a single combtest.walk.Walk. Load it by deserializing it from a trace file.
Parameters: - log_file (object) – ‘logs’ object returned from run_tests()
- walk_id (int) – a walk_id that appears in the trace file, or one of the trace files referenced by the master log file.
- step (bool) – if True, step Action-by-Action through the Walk; the user hits a key to proceed to the next Action.
- replay_func_qualname (str) – qualname of a replay function to use. Typically this will be combtest.runner.replay_walk(), but the user is free to provide their own.
- state (object) – state passed to the Walk for execution.
runner module¶
This module provides a set of tools for running Walks. This includes running Walks in stages using e.g. SerialActions, and ways to run Walks in parallel across a cluster.
-
class
combtest.runner.
ContinuingWalkServiceGroup
(*args, **kwargs)[source]¶ Bases:
combtest.worker.ServiceGroup
A ServiceGroup designed for running Walks in stages. We need to be able to run Walks in stages in order to satisfy the semantics of combtest.action.SerialAction during a run.
-
gather_all_runner_states
(worker_ids)[source]¶ Gather stats from remote workers. Users may want to extend this to include any new stats they think of.
Parameters: worker_ids (dict) – a mapping hostname/ip->iterable of worker_ids from which we should gather responses.
Returns: count of Walk segments run, count of Walk execution errors, count of Walks run, list of walk_ids of failed Walks
-
gather_all_states
(worker_ids, full=False)[source]¶ Gather states from all the given workers.
Parameters: worker_ids (dict) – a mapping (hostname/ip, port)->worker_id
Returns: a list of states in no particular order.
-
gather_runner_state
(connection, worker_id)[source]¶ Gather state from a remote runner for a given worker. This is an optional internal state object provided by WalkRunner.get_state().
Parameters: - connection (tuple) – str hostname/ip of the remote service, int port number
- worker_id (int) – id of remote worker, as returned when starting the work (see scatter_work()).
-
gather_state
(connection, worker_id, full=False)[source]¶ Gather state from a remote service for a given worker. A running Walk is free to mutate its state, and sometimes that is what really constitutes the “response” or “output” of a quantum of work.
Parameters: - connection (tuple) – str hostname/ip of the remote service, int port number
- worker_id (int) – id of remote worker, as returned when starting the work (see scatter_work()).
-
run
(work, state)[source]¶ Scatter work to all remote services, wait for it to finish executing, then return gathered states.
Parameters: - work (iterable) – iterable of work items
- state (object) – make sure it is picklable
Returns: a list of states, whose ordering is not particularly important.
-
scatter_work
(epoch, id_map=None, max_thread_count=None, state=None)[source]¶ Scatter some Walk segments out to the remote workers.
Parameters: - epoch (iterable) – iterable of (walk_id, branch_id, Walk)
- id_map (dict) – optional map walk_id->(ip, port) of service currently holding that Walk’s state.
Returns: an updated id_map
-
start_all_on
(work_item, shared_state)[source]¶ Start a single item of work running on all remote services.
Parameters: - work_item (callable) – a single work item
- shared_state (object) – make sure it is picklable. Will be shared across all threads on a given service.
Returns: a dict mapping (hostname/ip, port) -> worker_id
-
start_remote_logging
(ip, port, log_dir=None, log_namespace=None, verbose=1)[source]¶ Start logging on all remote services.
Parameters: - ip (str) – hostname or ip of local machine, where a log server is running
- port (int) – port number of local log server
- log_dir (str) – path to remote log directory the service should use
- log_namespace (str) – prefix for log files
- verbose (int) – 0-2 verbosity setting. At 2 an additional verbose level log will be produced.
Returns: a dict mapping remote service hostname/ip->remote log file locations
-
update_remote_states
(ip, worker_ids, walk_ids, state)[source]¶ Push an update to the states on the remote executors. This is sometimes useful when a combtest.action.SerialAction executes and affects the state of some set of Walks.
Parameters: - ip (str) – hostname or IP where the remote executor can be found
- worker_ids (iterable) – iterable of ints, corresponding to worker_ids that can be found on the remote executor.
- state (object) – something JSONifiable; the remote state will be updated via its update() method. The simplest way to support this is by using a dict as a state.
-
-
class
combtest.runner.
MultistageWalkRunner
(*args, **kwargs)[source]¶ Bases:
combtest.runner.WalkRunner
Similar to WalkRunner, but walks are sent with an id that allows them to reclaim state from a prior Walk. Typically this is used for Walks to be executed in multiple stages: the second stage is sent with an id matching the first.
Note: When we say “reclaim state” we are referring to the state.
-
class
combtest.runner.
MultistageWalkRunningService
[source]¶ Bases:
combtest.runner.WalkExecutorService
A WalkExecutorService for executing Walks in multiple stages. That is: we can execute Walks in parts/slices. A walk_id is used to refer to the Walks uniquely and is the same across all slices.
-
WALK_RUNNER_TYPE
¶ alias of
MultistageWalkRunner
-
exposed_gather_state
(worker_id)[source]¶ Return the state associated with the given worker.
Parameters: worker_id (int) – the worker_id
returned from the func used to start work
-
repack_ctx
(state, **repack_kwargs)[source]¶ Called back when a new worker/runner is made to pack up kwargs that we pass to the runner’s initializer.
-
work_repack
(work, shared_state, ctx=None, **repack_kwargs)[source]¶ This is called back to repackage each work item sent to the service. The call is an opportunity to e.g. do some deserialization, wrap the Walk in a WalkRunner, or anything else the user needs to prep the work for execution.
Parameters: - work (object) – the work to execute; typically this will be e.g. a JSONified Walk.
- shared_state (object) – a state copied in for executing the Walk
- ctx (object) – the object returned by repack_ctx before
-
-
class
combtest.runner.
Result
(walk_count, error_count, segment_count, elapsed, states, logs, failed_tests)¶ Bases:
tuple
-
elapsed
¶ Alias for field number 3
-
error_count
¶ Alias for field number 1
-
failed_tests
¶ Alias for field number 6
-
logs
¶ Alias for field number 5
-
segment_count
¶ Alias for field number 2
-
states
¶ Alias for field number 4
-
walk_count
¶ Alias for field number 0
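The field aliases listed here are consistent with a collections.namedtuple. The sketch below builds an equivalent type with the documented field order; all field values are made up for illustration, and the real type of each field (for example logs) is not specified in this section.

```python
from collections import namedtuple

# Field order follows the "Alias for field number N" entries in this section.
Result = namedtuple(
    "Result",
    ["walk_count", "error_count", "segment_count", "elapsed",
     "states", "logs", "failed_tests"],
)

# All values below are placeholders, not real output.
result = Result(walk_count=8, error_count=1, segment_count=16, elapsed=2.5,
                states=None, logs=None, failed_tests=[3])

# Fields can be read by name or by position.
by_name = result.elapsed
by_index = result[3]
```

Name-based access is the more robust choice in client code, since it keeps working if fields are ever appended.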
-
-
class
combtest.runner.
WalkExecutorService
[source]¶ Bases:
combtest.worker.CoordinatorService
Simplest rpyc-based service for running walks across the cluster: the user only needs to provide a list of Walks to their corresponding ServiceGroup. The rest is handled.
Note: The user should probably not be calling into this directly. They should be starting it up via rpyc.
-
WALK_RUNNER_TYPE
¶ Override the type of WalkRunner used, here or in a child class.
alias of WalkRunner
-
WORKER_TYPE
¶ Override the type of ThreadPool used for executing Walks, here or in a child class.
alias of WalkThreadPool
-
exposed_clean_worker
(worker_id)[source]¶ Toss all state related to a worker. Implies a join first. Gives the user full control over when we reap memory, since they may still want to retrieve/pickle results. It isn’t enough to wait until after they gather the response, since a.) they may not have “obtained” the response value yet, and b.) they may just be gathering intermediate results.
-
exposed_gather_state
(worker_id)[source]¶ Return the state associated with the given worker.
Parameters: worker_id (int) – the worker_id
returned from the func used to start work
-
repack_ctx
(state, **repack_kwargs)[source]¶ Called back when a new worker/runner is made to pack up kwargs that we pass to the runner’s initializer.
-
work_repack
(work, shared_state, ctx=None, **repack_kwargs)[source]¶ This is called back to repackage each work item sent to the service. The call is an opportunity to e.g. do some deserialization, wrap the Walk in a WalkRunner, or anything else the user needs to prep the work for execution.
Parameters: - work (object) – the work to execute; typically this will be e.g. a JSONified Walk.
- shared_state (object) – a state copied in for executing the Walk
- ctx (object) – the object returned by repack_ctx before
-
class
combtest.runner.
WalkRunner
(reporting_interval=10000)[source]¶ Bases:
object
A WalkRunner simply wraps a Walk’s execution method with some tracking for reporting stats. The user is free to inherit and add more stats as they see fit.
-
class
combtest.runner.
WalkThreadPool
(max_thread_count=None, work=None, **kwargs)[source]¶ Bases:
combtest.worker.ThreadPool
-
combtest.runner.
replay_walk
(walk_to_run, step=False, log_errors=True, state=None)[source]¶ Run a single
combtest.walk.Walk
Parameters: - walk_to_run (Walk) – the Walk to run
- step (bool) – if True, step Action-by-Action through the Walk; the user hits a key to proceed to the next Action.
- log_errors (bool) – log exceptions to the logger if True
- state (object) – state passed to the Walk for execution.
-
combtest.runner.
run_tests
(walk_order, state=None, verbose=1, logger_port=None, runner_class=<class 'combtest.runner.MultistageWalkRunningService'>, service_group_class=<class 'combtest.runner.ContinuingWalkServiceGroup'>, service_infos=None, service_handler_class=<class 'combtest.bootstrap.ServiceHandler_Local'>, max_thread_count=None, gather_states=False, log_dir=None, use_accountant=False)[source]¶ Run a collection of
combtest.walk.Walk. This should be the main way to execute Walks for most users. This is the only interface that supports correct execution of a combtest.action.SerialAction.
You can provide some instance to serve as the state passed around during the tests. There are two important details to know about this:
- The state must be JSON-ifiable, but py-combtest provides a convenience pattern to help with that. See encode().
- Shallow copies of the state will be made, via copy.copy(), since each test owns its own copy. You may want to e.g. override __copy__ if the details of the copy are important to you.
Parameters: - walk_order (iterable) – An iterable of iterables which produce combtest.action.Action. Example: a list of iterables produced by MyActionClass.get_option_set().
- state (object) – a state to copy and pass to the Walks when we execute them.
- verbose (int) – 0-2 verbosity setting. At 2 an additional verbose level log will be produced.
- logger_port (int) – the port number where our local logger should accept data.
- runner_class (combtest.worker.CoordinatorService) – the type of Walk execution service to use.
- service_group_class (combtest.worker.ServiceGroup) – the type of ServiceGroup we will use to coordinate remote executors
- service_infos (iterable) – An iterable of any extra infos we need to bootstrap the remote services. See combtest.bootstrap.ServiceHandleArray.
- service_handler_class (combtest.bootstrap.ServiceHandler) – Type of ServiceHandler to use to bootstrap the services.
- gather_states (bool) – if True or 1, gather and return all states from the remote services at the end of the run; they will be returned as a mapping ip->[state, …]. If 2, also gather extra info about the run of the walk, such as whether it was canceled. Otherwise the returned states will be None.
- max_thread_count (int) – Max number of Walk executing threads that each service will use.
- log_dir (str) – Directory where we will store traces, debug logs, etc. Remote services will also attempt to store logs to the same path. If Accountant is enabled, this argument will override any directory choice that Accountant may make automatically. Use with caution on the regression wheel, since this means Accountant won’t automatically point to a log server directory.
- use_accountant (bool) – if True, use Accountant to record results. This will also cause testdb report back where possible (e.g. during a regression wheel run). You can also provide a SimpleAccountant here, which we will gladly use. It must not already be in a testcase or suite context.
Raises: RuntimeError – when remote services can’t be established and connected to.
Returns: count of walks run, count of walk execution errors, count of walk segments run, total elapsed time, remote states if gather_states == True else None, the location of the master log file, where applicable.
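The shallow-copy detail for state can be illustrated with the standard copy module. GuiState is a hypothetical state class written for this sketch; it shows only the __copy__ override and ignores the separate requirement that the state be JSON-ifiable.

```python
import copy


class GuiState(object):
    """Hypothetical state object; 'history' must not be shared between tests."""

    def __init__(self, handle, history=None):
        self.handle = handle
        self.history = history if history is not None else []

    def __copy__(self):
        # Share the handle, but give each copy its own history list.
        return GuiState(self.handle, list(self.history))


original = GuiState(handle="gui-1")
per_test = copy.copy(original)      # calls GuiState.__copy__
per_test.history.append("clicked ok")
```

Without the override, a default shallow copy would share the same history list between the original and every copy, so one test's mutations would leak into the others.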
ssh_handle module¶
SSH-based connection for bootstrapping remote rpyc services. The user does not have to use this, but it is provided as a default implementation for those not wanting to write their own connection logic.
-
class
combtest.ssh_handle.
ServiceHandler_SSH
(connection_info)[source]¶ Provide an SSH-based method of bootstrapping our rpyc Walk executors.
Parameters: connection_info (ConnectionInfo) – a combtest.bootstrap.ConnectionInfo which optionally provides a service_info dictionary containing SSH authentication info. You can get such a dict from combtest.config.get_ssh_options()
utils module¶
Misc utils
-
class
combtest.utils.
RangeTree
(min_idx, max_idx, root_value=())[source]¶ A representation of a range, with the levels of the tree representing hierarchical splits of the range. O(N) mem in the number of sub-ranges. Example:
-->972
    -->486
        -->243
        -->243
    -->486
        -->243
        -->243
-
combtest.utils.
get_class_from_qualname
(name)[source]¶ Resolve a fully qualified class name to a class, attempting any imports that need to be done along the way.
Raises: ImportError or AttributeError if we can’t figure out how to import it
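One way to resolve a dotted name is to import the longest importable module prefix and then walk the remaining attributes. The function below, resolve_qualname(), is a hypothetical sketch built on importlib; it is not the library's implementation and its error behaviour may differ in detail.

```python
import importlib


def resolve_qualname(name):
    """Resolve 'package.module.Attr' to the object it names, importing as needed."""
    parts = name.split(".")
    # Try the longest module path first, then walk the remaining attributes.
    for split in range(len(parts) - 1, 0, -1):
        module_name = ".".join(parts[:split])
        try:
            obj = importlib.import_module(module_name)
        except ImportError:
            continue  # not a module; try a shorter prefix
        for attr in parts[split:]:
            obj = getattr(obj, attr)   # AttributeError if the name is wrong
        return obj
    raise ImportError("cannot import %r" % (name,))


ordered_dict_cls = resolve_qualname("collections.OrderedDict")
join_func = resolve_qualname("os.path.join")
```

Trying progressively shorter module prefixes lets the same function handle classes nested inside other classes as well as functions in submodules.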
-
combtest.utils.
get_class_qualname
(cls)[source]¶ Give a fully qualified name for a class (and therefore a function ref as well). Example: ‘combtest.action.Action’.
Note: Python3 has this, or something like it, called __qualname__. We implement our own here to retain control of it, and give us Python2 compat.
-
combtest.utils.
get_my_IP
()[source]¶ Get our best guess at one of our local IPs that others could use to contact us (e.g. to connect to a local log server).
walk module¶
This module provides ways of systematically combining operations, sets of operations, etc., and executing such sequences of operations.
-
class
combtest.walk.
Walk
(*elems)[source]¶ A Walk, named after the graph theory concept, is a list of combtest.action.Action to execute, together with a piece of state that the Actions can manipulate. The notion here is that each Action performs an operation that takes us through a transition in state/action space.
Parameters: elems (iterable) – An iterable of combtest.action.Action
-
exception
combtest.walk.
WalkFailedError
[source]¶ Raised when a combtest.walk.Walk failed to execute to completion for any reason (e.g. one of its operations raised an Exception).
-
class
combtest.walk.
WalkOpTracer
(log_dir, *namespace)[source]¶ Traces a Walk portion + its adjacent SerialAction and walk_id. The id is consistent across portions so that you can relate them back together later.
-
class
combtest.walk.
WalkOptions
(walk_order)[source]¶ A WalkOptions accepts a set of Action and SerialAction options and produces an iterable of Epoch. Each Epoch represents a set of Walk segments which can run in parallel, and an optional SerialAction which should be run before the Walk portions.
Not threadsafe.
Parameters: walk_order (iterable) – An iterable of Action types
-
sizes
¶ Returns: A tuple of the size of the option sets for each Action type in this WalkOptions.
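The idea of combining option sets, and what sizes reports, can be sketched with itertools.product. This is an assumption-laden illustration: options are modelled as plain (name, param) tuples, no SerialAction or Epoch handling is shown, and the sketch is not how WalkOptions is implemented.

```python
import itertools

# Two hypothetical option sets: one per Action type.
option_sets = [
    [("Checkbox", True), ("Checkbox", False)],
    [("Dropdown", "a"), ("Dropdown", "b"), ("Dropdown", "c")],
]

# Analogous to the sizes property: one entry per option set.
sizes = tuple(len(options) for options in option_sets)

# Every combination of one option per Action type, in order.
walks = [list(combo) for combo in itertools.product(*option_sets)]
```

The number of combinations is the product of the entries in sizes, which is why the sizes are a quick way to estimate how large a run will be.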
-
worker module¶
This module provides a set of mechanisms for dispatching “work” across a set of worker nodes. “Work” can mean “execute sets of arbitrary Python code”. This system is made of a few pieces:
- A basic thread pool implementation. There is a multiprocessing.pool.ThreadPool implementation in Python already, but I want the control and flexibility of having my own. This and its descendants are the “workers” that actually e.g. execute test cases or run fuzz+stress load (see ThreadPool). Thread counts can be chosen by the client or left at the default.
- An rpyc-based coordinator that deserializes work sent to it and sends it to a worker/executor of some sort, which will typically be a ThreadPool (see CoordinatorService below). Typically there will be one of these per node running in its own process, or at least one of a given “type” per node. The only constraint is that it needs to have a 1:1 ip/port mapping. Typically there will be one central coordinator (ServiceGroup) paired with one CoordinatorService per node.
- A central coordinator running in a single process that is passed work and sends it to all paired remote coordinators for execution (ServiceGroup).
Work is sent in some quantum defined by the client code and passed from
ServiceGroup->CoordinatorService->ThreadPool
. Responses can be sent back.
- The quantum should be large enough for a worker to keep its children busy
- The child may quit before all work is done if e.g. a failure limit is reached, or a fatal error hit.
- A parent can specify how many workers a child can have, otherwise a worker is free to decide.
- A parent can raise signal or other condition to a child to tell it to quit early. It must obey and send back any accumulated responses, or have its results abandoned.
- Parent must deal with errors where the child dies silently (e.g. interpreter crash hard enough that we don’t run atexit handlers)
-
class
combtest.worker.
CoordinatorService
[source]¶ Bases:
rpyc.core.service.Service
Receives work from a remote process via rpyc, starts the work running via
ThreadPool
or some descendant class.
-
DEFAULT_MAX_THREAD_COUNT
= 12¶ Override point for max number of threads for each worker to have. The other ways to set this are 1. via config.py, and 2. via an option passed at runtime (e.g. via
exposed_start_workers_on()
).
-
WORKER_TYPE
A ThreadPool class that workers should be instantiated from. The user can override this.
alias of
ThreadPool
-
exposed_add_work
(worker_id, work, repack_kwargs=None)[source]¶ Add work to an existing worker. The original state will be used. If the user wants work to be executed with a different state, they start a new worker up via
exposed_start_work()
.
Parameters: - worker_id (int) – the worker_id returned from the func used to start work
- work (object) – the work item
-
exposed_clean_worker
(worker_id)[source]¶ Toss all state related to a worker. Implies a join first. Gives the user full control over when we reap memory, since they may still want to retrieve/pickle results. It isn’t enough to wait until after they gather the response, since a.) they may not have “obtained” the response value yet, and b.) they may just be gathering intermediate results.
-
exposed_gather_state
(worker_id)[source]¶ Return the state associated with the given worker.
Parameters: worker_id (int) – the worker_id
returned from the func used to start work
-
exposed_join_workers
(worker_id)[source]¶ Join all working threads in a worker and toss the worker for further use. The responses and states will stay around until the client “cleans” the worker state. That means this is not a stateless service. The user needs to beware of leaking resources.
Parameters: worker_id (int) – the worker_id
returned from the func used to start work
-
exposed_run
(work, max_thread_count=None, state=None, repack_kwargs=None)[source]¶ Run work synchronously. Return a
worker_id
so that responses/states can be reclaimed.
-
exposed_signal_stop
(worker_id)[source]¶ Signal that a worker should stop. May not be immediate since this is intended to be a “clean” shutdown where workers can finish their current work item.
Parameters: worker_id (int) – the worker_id
returned from the func used to start work
-
exposed_start_remote_logging
(ip, port, **kwargs)[source]¶ Start sending logs to the log server at the given ip+port.
Parameters: - ip (str) – hostname or ip where the log server is running
- port (int) – port number where the log server is running
-
exposed_start_work
(work, state=None, max_thread_count=None, repack_kwargs=None)[source]¶ Called by remote side to start some work running asynchronously. The work and state are pickled and pulled through to this side in whole, rather than accessing via netrefs.
Returns: a worker_id to refer back to this work later for e.g. stopping it or getting responses.
-
exposed_start_workers_on
(work, max_thread_count=None, state=None, repack_kwargs=None)[source]¶ Similar to start_work, but a single item is sent, and all threads in the new worker will run that single item.
Returns: The worker_id
of the worker executing the work
-
on_connect
(conn)[source]¶ rpyc.Service
is designed for this to be the de facto __init__
function. So we are doing our initialization here.
-
repack_ctx
(state, **repack_kwargs)[source]¶ Called back when a new worker/runner is made to pack up kwargs that we pass to the runner’s initializer.
-
work_repack
(work, state, ctx=None, **repack_kwargs)[source]¶ This is called back to repackage each work item sent to the service. The call is an opportunity to e.g. do some deserialization, wrap the
Walk
in a WalkRunner
, or anything else the user needs to prep the work for execution.
Parameters: - work (object) – the work to execute; typically this will be e.g. a JSONified Walk.
- state (object) – a state copied in for executing the Walk
- ctx (object) – the object previously returned by repack_ctx
-
-
class
combtest.worker.
ServiceGroup
(service_name, service_infos=None, service_handler_class=<class 'combtest.bootstrap.ServiceHandler_Local'>, spawn_services=True, spawn_clients=True)[source]¶ Bases:
object
Handles to running service instances across the nodes via e.g. SSH, and rpyc-based clients connected to those. This has a number of functions for dispatching work to the execution services, and gathering responses and stats.
Warning
We spawn connections to all nodes. Between the SSH connections and the client connections, we may run into scalability issues on giant clusters. There are obvious optimizations we can do here, such as lazy connecting, but they would require some extra hooks. Please file a bug if and when we prove there is such a problem and a need for a fix.
-
DEFAULT_INSTANCE_COUNT
= 3¶ If the user does not provide a specific enumeration of ip/port where we should set services up running, this will be the default number of services to start up, which we will start locally.
-
SPAWN_TIMEOUT
= 30¶ Max time we spend waiting for a service spawn to succeed. We use this since e.g. starting services does not mean the service is immediately available, due to waiting for imports to happen. Hence the clients need to retry for a bit until the services come up. This timeout is measured in seconds.
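The retry-until-timeout behaviour described here can be sketched generically. The function `retry_until` and the simulated `flaky_connect` below are hypothetical and do not reflect how combtest implements its client spawning; they only show the pattern of retrying against a deadline.

```python
import time


def retry_until(connect, timeout, interval=0.01):
    """Call connect() until it succeeds or `timeout` seconds have elapsed."""
    deadline = time.monotonic() + timeout
    while True:
        try:
            return connect()
        except ConnectionError:
            # Give up only once the deadline has passed.
            if time.monotonic() >= deadline:
                raise
            time.sleep(interval)


attempts = []


def flaky_connect():
    # Fails twice, standing in for a service that is still importing modules.
    attempts.append(1)
    if len(attempts) < 3:
        raise ConnectionRefusedError("service not up yet")
    return "connected"


result = retry_until(flaky_connect, timeout=5)
```

Using a monotonic clock for the deadline keeps the timeout unaffected by wall-clock adjustments.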
-
WORK_QUANTUM_SIZE
= 1000¶ The maximum chunk of work we will send in a single RPC call. This is to keep the request size sane. If the user gives us an iterable that is generating a massive amount of work on-the-fly we may not be able to instantiate it all and hold it in memory. So we send batches that are this size at maximum.
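Sending bounded batches from a lazily generated iterable can be sketched with `itertools.islice`. The generator function `batches` is a hypothetical name used for illustration, and the sketch assumes nothing about how combtest actually slices its work.

```python
import itertools


def batches(work, quantum_size):
    """Yield lists of at most `quantum_size` items from any iterable."""
    iterator = iter(work)
    while True:
        # Only `quantum_size` items are materialized at any one time.
        batch = list(itertools.islice(iterator, quantum_size))
        if not batch:
            return
        yield batch


# A generator expression stands in for work produced on the fly.
chunks = list(batches((n * n for n in range(7)), quantum_size=3))
```

Because the source is consumed incrementally, the full set of work never needs to be held in memory at once.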
-
clients
¶ Returns a mapping (ip, port)->client “root” (see rpyc connect docs)
-
gather_all_states
(worker_ids)[source]¶ Gather states from all the given workers.
Parameters: worker_ids (dict) – a mapping (hostname/ip, port)->worker_id
Returns: a list of states in no particular order.
-
gather_state
(connection, worker_id)[source]¶ Gather
state
from a remote service for a given worker. A running Walk
is free to mutate its state
, and sometimes that is what really constitutes the “response” or “output” of a quantum of work.
Parameters: - connection (tuple) – str hostname/ip of the remote service, int port number
- worker_id (int) – id of remote worker, as returned when starting
the work (see
scatter_work()
).
-
classmethod
give_up
()[source]¶ Signal that all
ServiceGroup
should give up trying to connect to remote services, send work, etc. and bail immediately. We can use this e.g. if we receive a signal locally and don’t want to wait for a long-running scatter or gather.
-
join
(hard=False)[source]¶ Wait for all workers on all remote services to complete. We can have a finer-grained function later if it is helpful.
Parameters: hard (bool) – ignore errors if True
Raises: EOFError, ReferenceError, RuntimeError – if we have an issue while joining (e.g. the remote side died)
-
run
(work, state=None)[source]¶ Scatter work to all remote services, wait for it to finish executing, then return gathered states.
Parameters: - work (iterable) – iterable of work items
- state (object) – make sure it is picklable
Returns: a list of states, whose ordering is not particularly important.
-
scatter_work
(work, max_thread_count=None, state=None)[source]¶ Partition the provided iterable of work into roughly even-sized portions and send them to each of the remote services.
state
will be copied to each node independently. The user must handle the logic of retrieving results and stitching them together. See gather_state()
.
Parameters: - work (iterable) – iterable of work items
- max_thread_count (int) – override of how many threads each remote executor should have
- state (object) – make sure it is picklable
Returns: a dict mapping (hostname/ip, port) -> worker_id
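One simple way to obtain roughly even-sized portions is to deal items out round-robin. Whether combtest partitions work this way is not stated here, so treat the sketch as an assumption; the `partition` function and the addresses and port are made up for the example.

```python
def partition(work, service_keys):
    """Deal work items round-robin into one list per service."""
    portions = {key: [] for key in service_keys}
    for index, item in enumerate(work):
        key = service_keys[index % len(service_keys)]
        portions[key].append(item)
    return portions


# Hypothetical (hostname/ip, port) pairs identifying three services.
services = [("10.0.0.1", 6187), ("10.0.0.2", 6187), ("10.0.0.3", 6187)]
portions = partition(range(10), services)
portion_sizes = [len(portions[key]) for key in services]
```

With round-robin dealing, portion sizes differ by at most one item, and the input can be any iterable, including a generator.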
-
shutdown
(hard=False)[source]¶ Shut down both clients and services.
Parameters: hard (bool) – if hard, we will ignore any errors trying to shut down
-
shutdown_clients
(hard=False)[source]¶ Shut down all running clients.
Parameters: hard (bool) – if hard, we will ignore any errors trying to shut down
-
shutdown_services
(hard=False)[source]¶ Shut down all running services.
Parameters: hard (bool) – if hard, we will ignore any errors trying to shut down
-
spawn_clients
()[source]¶ Spawn rpyc clients to the remote services. Assumes remote services are up, or will be within
SPAWN_TIMEOUT
.
-
start_all_on
(work_item, shared_state=None)[source]¶ Start a single item of work running on all remote services.
Parameters: - work_item (callable) – a single work item
- shared_state (object) – make sure it is picklable. Will be shared across all threads on a given service.
Returns: a dict mapping (hostname/ip, port) -> worker_id
-
-
class
combtest.worker.
ThreadPool
(max_thread_count=None, work=None, **kwargs)[source]¶ Bases:
object
A set of threads doing work from a queue asynchronously to the caller. The caller can add additional work after the work has started. Child classes can add callbacks for e.g. when work starts/finishes, how to handle errors, etc.
-
add_work
(work)[source]¶ Add a work item to this pool. Will not kick threads into action if all prior work has finished. See
kick_workers()
.
-
kick_workers
()[source]¶ If some workers are dead, reap them and start up new ones to replace them. This is useful e.g. when adding work after the pool is already running, since some may have finished and exited.
-
on_start
(state)[source]¶ Called when threads start up to do work for the first time. Not re-run when they are “kicked”.
-
run
(state=None, all_on_next=False)[source]¶ Run all queued work all the way through to completion, then join.
Parameters: - state (object) – an optional single arg to pass to the work callbacks
- all_on_next (bool) – rather than each thread popping an item that it runs (and therefore no other thread runs), pop a single item and have all threads run it.
-
signal_stop
()[source]¶ Signal threads they should stop when they are done with their current unit of work. This means they will not stop as quickly as e.g. killing this proc with a signal. Gives them a chance to finish cleanly.
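The queue-plus-stop-flag pattern described for this class can be sketched in a few lines of Python 3. `ToyPool` is a hypothetical, much simplified stand-in: it has no callbacks, no worker reaping and no `all_on_next` mode, and it is not the library's implementation.

```python
import queue
import threading


class ToyPool(object):
    """Minimal pool: threads pull callables from a queue until it is empty."""

    def __init__(self, thread_count=4):
        self._queue = queue.Queue()
        self._stop = threading.Event()
        self._thread_count = thread_count

    def add_work(self, item):
        self._queue.put(item)

    def signal_stop(self):
        # Threads check this flag only between work items, so the item
        # currently running is always allowed to finish.
        self._stop.set()

    def _worker(self, state):
        while not self._stop.is_set():
            try:
                item = self._queue.get_nowait()
            except queue.Empty:
                return
            item(state)

    def run(self, state=None):
        threads = [threading.Thread(target=self._worker, args=(state,))
                   for _ in range(self._thread_count)]
        for thread in threads:
            thread.start()
        for thread in threads:
            thread.join()


results = []
lock = threading.Lock()


def make_item(n):
    def item(state):
        with lock:
            state.append(n * 2)
    return item


pool = ToyPool(thread_count=3)
for n in range(5):
    pool.add_work(make_item(n))
pool.run(state=results)
```

Since threads finish in no fixed order, the results should be treated as an unordered collection.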
-
-
combtest.worker.
start_remote_services
(service_class)[source]¶ Simplest method to start some services remotely.
Parameters: service_class (class) – a descendant of CoordinatorService
Raises: RuntimeError – on various issues with service start up
Returns: A ServiceGroup wrapping the started services
-
combtest.worker.
start_service
(service_class, port=None)[source]¶ Start an rpyc service given by the provided class. Port can be overridden.
Parameters: - service_class (rpyc.Service) – a child class of
rpyc.Service
.
- port (int) – the port the service should listen for requests on. If it isn’t provided by the caller, we get a value from combtest.config
Returns: a handle to the resulting ThreadedServer.
-
combtest.worker.
start_service_by_name
(service_name, port=None)[source]¶ Start an rpyc service given by the provided class qualname. Port can be overridden.
Parameters: - service_name (str) – a qualname of a child class of rpyc.Service.
- port (int) – the port the service should listen for requests on. If it isn’t provided by the caller, we get a value from combtest.config
Returns: a handle to the resulting ThreadedServer.