Example code¶
The first step is to define the operations that we will be combining. In combtest these are called Actions. Each is a descendant class of Action. Such classes require a run function that defines the actual operation. Acceptable parameters for passing to the function can be provided via the OPTIONS class member.
A test case is represented as a sequence of Action instances. These cases are Walk instances, which are generated by combining the Actions the user provides. Walks are simply executables that execute the run() functions of their Actions in order, passing a shared object called state to each.
Example:
from combtest.action import Action

# Any exception type works here; ScaryError is just illustrative.
class ScaryError(Exception):
    pass

class MyAction(Action):
    # Each value in this list can be passed to run(). When we
    # generate test cases we will use this option list, and the option
    # list of all other Actions, and generate all possible combinations.
    OPTIONS = [True, False]

    def run(self, param, state=None):
        # The 'state' is an optional object passed among all instances in a
        # ``Walk``. It can be anything the user wants to have shared across
        # those ``Actions``. A copy is made for each generated ``Walk``.
        something = state['something']
        something.method(param)
        # Raising an Exception during execution fails the case, and no
        # later ``Actions`` in the case will be run.
        if something.value is False:
            raise ScaryError(":(")
If you prefer to generate the options of an Action in some more interesting way, or want to return an iterator of options, you can override the get_option_set() classmethod:
from combtest.action import Action, OptionSet
import my_lib

class MyAction(Action):
    def run(self, param, state):
        # ... perform the actual operation here ...
        pass

    @classmethod
    def get_option_set(cls):
        # An OptionSet is an iterator that produces, in this case, ``cls``
        # instances, one for each value produced by the provided iterator.
        return OptionSet(my_lib.some_iterator, cls)
That is it; once you have defined your Action classes, you can pass them to the py-combtest framework to be run.
Generating and Running Tests¶
Once you have written your Action classes, running tests is as simple as:
from combtest.runner import run_tests

action_order = [
    MyAction1,
    MyAction2,
    MyAction3,
    MyAction4,
]

results = run_tests(action_order)
print("Ran %d walks in %0.2fs with %d errors" % (results.walk_count,
                                                 results.elapsed,
                                                 results.error_count))
Tests will be generated automatically given the options that your Actions provide.
run_tests() has many options, which are worth reading about in the docs. Here are a few examples.
Providing a custom state object:
# You can provide some instance to serve as the state passed around during
# the tests. There are two important details to know about this:
#  * The state must be JSON-ifiable, but py-combtest provides a convenience
#    pattern to help with that. See ``Notes on Serialization`` below.
#  * Shallow copies of the state will be made, via copy.copy(), since each
#    test owns its own copy. You may want to e.g. override __copy__ if
#    the details of the copy are important to you.
# Here is an example of a silly state class for simplicity of illustration:
class MyStateClass(dict):
    @property
    def A(self):
        return self.get('A', None)

    @A.setter
    def A(self, value):
        self['A'] = value

    @property
    def B(self):
        return self.get('B', None)

    @B.setter
    def B(self, value):
        self['B'] = value

class MyActionA(Action):
    OPTIONS = [1, 2, 3]

    def run(self, param, state):
        state.A = param

class MyActionB(MyActionA):
    def run(self, param, state):
        state.B = param

my_state = MyStateClass()

# gather_states=True means that the resulting states are interesting to
# us, so we want the framework to pull the states back to us after the
# test run.
results = run_tests([MyActionA,
                     MyActionB],
                    state=my_state,
                    gather_states=True)
print("Resulting states: ", results.states)
The output for this case looks like this:
Resulting states:  [[{u'A': 1, u'B': 1}, {u'A': 2, u'B': 1}, {u'A': 3, u'B': 1}], [{u'A': 1, u'B': 2}, {u'A': 2, u'B': 2}, {u'A': 3, u'B': 2}], [{u'A': 3, u'B': 3}, {u'A': 1, u'B': 3}, {u'A': 2, u'B': 3}]]
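Because the test cases are generated as all combinations of the options, the number of Walks grows multiplicatively. A minimal sketch of that arithmetic, assuming each Action exposes its options via the OPTIONS class member (here MyActionB inherits MyActionA's list; Actions that override get_option_set() would need their option set iterated instead):
# Each Walk picks one option per Action, so the count is the product of the
# option-list lengths: here 3 (MyActionA) * 3 (MyActionB) = 9 Walks.
expected_walk_count = 1
for action_class in [MyActionA, MyActionB]:
    expected_walk_count *= len(action_class.OPTIONS)
print("Expecting %d walks" % expected_walk_count)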
You can vary the log directory and verbosity of the logs, and can record replays of each Walk so you can re-run them later.
# Minimalist logging. Verbose can be 0, 1, or 2, with increasing log verbosity.
run_tests(...,
          verbose=0,
          ...
          )

# At verbose=2 we will produce an additional debug-level log.
# If log_dir is provided we will also record a replay for every
# ``Walk`` that is run. Paths to the log files will be available in
# the Results object returned from ``run_tests()``.
run_tests(...,
          verbose=2,
          log_dir='/my/log/dir',
          ...
          )
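As a quick follow-up sketch, reusing the action_order list from the first example (the results attribute names follow the other examples on this page): after a run with log_dir set, you can report the outcome and hold on to the trace logs for the Replay section below.
results = run_tests(action_order,
                    verbose=2,
                    log_dir='/my/log/dir')
print("Ran %d walks with %d errors" % (results.walk_count,
                                       results.error_count))
# results.logs is what the replay module accepts (see ``Replay`` below).
trace_logs = results.logs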
Serial Actions¶
Sometimes we need to perform an operation that affects multiple tests that we want to run in parallel. py-combtest provides a feature for accomplishing that.
By way of example: suppose we are performing a system test, and that the system has some global config setting that affects all tests. First, we need all tests to finish executing up to the point where the config change should happen. At that point we perform the config change once and for all, and then release the next part of the tests to once again run in parallel.
The operation that needs to run serially, once for all tests, is represented with a SerialAction. The user provides their SerialAction in exactly the same way they provide other Actions.
from combtest.action import SerialAction
import my_system

class ChangeConfigSerialAction(SerialAction):
    # A single option means we will be running this once for all
    # tests, but it is perfectly fine to have multiple options.
    OPTIONS = [True,]

    def run(self, param, state):
        # Change some global setting
        my_system.set_some_global_config(1)

run_tests([Action1,
           Action2,
           ChangeConfigSerialAction,
           Action3],
          ...)
Notice above that the SerialAction is treated like any other. The framework will run all Action1 and Action2 instances in parallel, wait for them to finish, run ChangeConfigSerialAction once, then run all Action3 instances in parallel. If the SerialAction had more than one option, each option would be run one at a time, serially.
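For illustration, here is a minimal sketch of the multi-option case (SweepConfigSerialAction is an illustrative name, and my_system.set_some_global_config is the same hypothetical setter used above):
class SweepConfigSerialAction(SerialAction):
    # With two options, the config change is applied once per option, and
    # those applications run one at a time, serially.
    OPTIONS = [1, 2]

    def run(self, param, state):
        my_system.set_some_global_config(param)

run_tests([Action1,
           Action2,
           SweepConfigSerialAction,
           Action3],
          ...)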
Notes on Distributed Test Execution¶
py-combtest provides a number of ways to execute tests in a distributed + parallel fashion. The mechanisms for that are as follows:
- A generalized thread pool implementation that can execute multiple callables in parallel in a given process (ThreadPool)
- An rpyc-based service which receives lists of tests (or portions of tests) to run, dispatches them to a thread pool for execution, and collects outputs, statistics, etc. (CoordinatorService)
- A client class that bootstraps test executors, potentially across many machines, and dispatches tests to them for running (ServiceGroup)
- Customizable mechanisms for bootstrapping the services, e.g. via SSH or multiprocessing (ServiceHandler)
For most purposes the user should not need to inherit or override any of these, but they are designed to make that possible. For convenience, all overrides can be made at the run_tests() call. Example:
class MyTestService(MultistageWalkRunningService):
    ...

...

run_tests(..., runner_class=MyTestService)
Using SSH To Bootstrap Test Executors Across Machines¶
The user is free to specify how test executors are bootstrapped, but two convenience implementations are provided:
- A multiprocessing-based method, which spins up multiple processes on the local machine. This is the default implementation.
- An optional paramiko-based SSH implementation, which makes it relatively simple to bootstrap test executors across machines.
The latter is used by simply installing paramiko and passing the handler class to run_tests():
import combtest.ssh_handle as ssh
...
run_tests(..., service_handler_class=ssh.ServiceHandler_SSH)
The one catch is that you need to provide authentication. Authentication credentials can be specified using the config module (see below); for example, a path to an rsakey file can be provided. paramiko also supports username/password authentication, among other methods.
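For instance, a minimal combtest.cfg along these lines (the path is just a placeholder; see the Config section below for a fuller example):
[NET]
rsakey: /path/to/my/rsakey/file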
NOTE: paramiko is not installed by default with py-combtest. You will need to ensure it is installed to use ssh_handle.
Custom Service Bootstrapping Methods¶
The user is free to specify other ways of bootstrapping test executors, though that will not be required for most users. See the interface provided by ServiceHandler for reference.
Notes on Serialization¶
Several objects need to be serialized to work properly, such as custom state objects. py-combtest uses a callback pattern to make it easy for the user to provide JSON serializers for any classes they are using. The serialization looks like this:
class MyClass(object):
    def __init__(self, a, b):
        self.a = a
        self.b = b

    def to_json(self):
        return (self.a, self.b)

    @classmethod
    def from_json(cls, obj):
        return cls(obj[0], obj[1])
The user provides the pair of methods to_json and from_json. The former returns some JSON-ifiable object. py-combtest will recursively call to_json on anything contained in that object, so if self.a is another custom class with a to_json method, the above example will still work. The latter receives a copy of that object during deserialization and returns a deserialized instance of the given class. That is: it maps the thing returned by to_json back onto the class's constructor.
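Tying this back to the custom state example above, here is a minimal sketch (MyRunState is an illustrative name, not a py-combtest class) of a state class that follows this pattern so it can be shipped to the test executors and gathered back:
class MyRunState(object):
    def __init__(self, a=None, b=None):
        self.a = a
        self.b = b

    def to_json(self):
        # Return something JSON-ifiable describing this state
        return {'a': self.a, 'b': self.b}

    @classmethod
    def from_json(cls, obj):
        # Rebuild the state from whatever to_json() returned
        return cls(a=obj['a'], b=obj['b'])

# An instance can now be provided as the shared state, e.g.:
#   run_tests([...], state=MyRunState(), gather_states=True)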
Logging and Replay¶
There are three main types of logging combtest uses:
- Standard Python logging, which the user can leverage as normal.
- An extension to Python logging, called central_logger; the user can treat this like standard Python logging, but logs will automatically be sent between the test execution services and the coordinating process (where run_tests() was called).
- Human-readable representations of test cases; these serve as traces of the cases which were run, and can be fed back to the replay module to be re-run individually.
Distributed Logging¶
To log using central_logger, simply leverage the logger it exposes instead of using a logger from the standard logging lib:
from combtest.central_logger import logger
...
logger.error("We are having trouble here.")
These log lines will automatically be dispatched back to the coordinating process, where they may be printed to e.g. stdout and/or to a log file, depending on the verbosity settings the user chose.
Replay¶
We collect human-readable traces of the tests we run whenever the user provides a log directory to run_tests(). After the tests run, we can pass the logs, log string, or Walk to the replay module to run an individual case.
Running a single Walk:
import combtest.replay as replay
import combtest.walk as walk
import my_test

actions = [
    my_test.ActionClass1(1),
    my_test.ActionClass2(True),
    my_test.SerialActionClass1("qqq"),
]
my_walk = walk.Walk(actions)
my_state = {"some": "stuff"}
replay.replay_walk(my_walk, state=my_state)
Re-running a Walk from a trace file:
# Suppose we run some tests, and some of them failed:
results = run_tests(...)

# We can replay one of the failed tests from the trace logs:
my_state = {"some": "thing"}
replay.replay_walk_by_id(results.logs, results.failed_tests[17],
                         state=my_state)
You can actually run any Walk from the trace, not just failed Walks. The second argument to replay_walk_by_id() is an integer identifier of the Walk, which can be found in the trace file.
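Continuing that example, a small sketch (assuming results.failed_tests can be iterated to yield the failing Walk ids, as in the indexing above) that replays every failed case instead of a single one:
# Replay each failed Walk from the trace logs, one at a time.
for walk_id in results.failed_tests:
    replay.replay_walk_by_id(results.logs, walk_id, state=my_state)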
Config¶
py-combtest uses a minimalist config for parameterizing things like thread counts and authentication information for machines where test executors should run. The values can be set at runtime or by config file.
By default, config will look for a file called combtest.cfg in the current working directory and attempt to load it. It could be used, for example, to specify the maximum number of worker threads the test executors are allowed. When using SSH to bootstrap the test executors, config can be used to provide authentication information. Example file:
[NET]
rsakey: /path/to/my/rsakey/file
[WORKER]
max_thread_count: 10
The values can also be overridden programmatically by the user:
import combtest.config as config
# Default port we will bind our rpyc instance to on a given machine. If we
# bind more than one on a given machine, this will be the first port, and
# all ports will be consecutive.
config.set_service_port(6009)