From 179d35173a6848113addbf9318a23ee495248573 Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Wed, 30 Nov 2016 18:28:21 -0800 Subject: [PATCH 01/11] Add a directed acyclic graph (DAG) implementation. --- stacker/dag/__init__.py | 217 ++++++++++++++++++++++++++++++++++++++ stacker/tests/test_dag.py | 125 ++++++++++++++++++++++ 2 files changed, 342 insertions(+) create mode 100644 stacker/dag/__init__.py create mode 100644 stacker/tests/test_dag.py diff --git a/stacker/dag/__init__.py b/stacker/dag/__init__.py new file mode 100644 index 000000000..ee1b71118 --- /dev/null +++ b/stacker/dag/__init__.py @@ -0,0 +1,217 @@ +from copy import copy, deepcopy +from collections import deque + +try: + from collections import OrderedDict +except: + from ordereddict import OrderedDict + + +class DAGValidationError(Exception): + pass + + +class DAG(object): + """ Directed acyclic graph implementation. """ + + def __init__(self): + """ Construct a new DAG with no nodes or edges. """ + self.reset_graph() + + def add_node(self, node_name, graph=None): + """ Add a node if it does not exist yet, or error out. """ + if not graph: + graph = self.graph + if node_name in graph: + raise KeyError('node %s already exists' % node_name) + graph[node_name] = set() + + def add_node_if_not_exists(self, node_name, graph=None): + try: + self.add_node(node_name, graph=graph) + except KeyError: + pass + + def delete_node(self, node_name, graph=None): + """ Deletes this node and all edges referencing it. """ + if not graph: + graph = self.graph + if node_name not in graph: + raise KeyError('node %s does not exist' % node_name) + graph.pop(node_name) + + for node, edges in graph.iteritems(): + if node_name in edges: + edges.remove(node_name) + + def delete_node_if_exists(self, node_name, graph=None): + try: + self.delete_node(node_name, graph=graph) + except KeyError: + pass + + def add_edge(self, ind_node, dep_node, graph=None): + """ Add an edge (dependency) between the specified nodes. """ + if not graph: + graph = self.graph + if ind_node not in graph or dep_node not in graph: + raise KeyError('one or more nodes do not exist in graph') + test_graph = deepcopy(graph) + test_graph[ind_node].add(dep_node) + is_valid, message = self.validate(test_graph) + if is_valid: + graph[ind_node].add(dep_node) + else: + raise DAGValidationError() + + def delete_edge(self, ind_node, dep_node, graph=None): + """ Delete an edge from the graph. """ + if not graph: + graph = self.graph + if dep_node not in graph.get(ind_node, []): + raise KeyError('this edge does not exist in graph') + graph[ind_node].remove(dep_node) + + def walk(self, walk_func, graph=None, reverse=True): + """ Walks each node of the graph in reverse topological order. + + This can be used to perform a set of operations, where the next + operation depends on the previous operation. It's important to note + that walking happens serially, and is not paralellized. + """ + if not graph: + graph = self.graph + nodes = self.topological_sort(graph=graph) + if reverse: + nodes.reverse() + for n in nodes: + walk_func(n) + + def rename_edges(self, old_task_name, new_task_name, graph=None): + """ Change references to a task in existing edges. 
""" + if not graph: + graph = self.graph + for node, edges in graph.items(): + + if node == old_task_name: + graph[new_task_name] = copy(edges) + del graph[old_task_name] + + else: + if old_task_name in edges: + edges.remove(old_task_name) + edges.add(new_task_name) + + def predecessors(self, node, graph=None): + """ Returns a list of all predecessors of the given node """ + if graph is None: + graph = self.graph + return [key for key in graph if node in graph[key]] + + def downstream(self, node, graph=None): + """ Returns a list of all nodes this node has edges towards. """ + if graph is None: + graph = self.graph + if node not in graph: + raise KeyError('node %s is not in graph' % node) + return list(graph[node]) + + def all_downstreams(self, node, graph=None): + """Returns a list of all nodes ultimately downstream + of the given node in the dependency graph, in + topological order.""" + if graph is None: + graph = self.graph + nodes = [node] + nodes_seen = set() + i = 0 + while i < len(nodes): + downstreams = self.downstream(nodes[i], graph) + for downstream_node in downstreams: + if downstream_node not in nodes_seen: + nodes_seen.add(downstream_node) + nodes.append(downstream_node) + i += 1 + return filter(lambda node: node in nodes_seen, self.topological_sort(graph=graph)) + + def all_leaves(self, graph=None): + """ Return a list of all leaves (nodes with no downstreams) """ + if graph is None: + graph = self.graph + return [key for key in graph if not graph[key]] + + def from_dict(self, graph_dict): + """ Reset the graph and build it from the passed dictionary. + + The dictionary takes the form of {node_name: [directed edges]} + """ + + self.reset_graph() + for new_node in graph_dict.iterkeys(): + self.add_node(new_node) + for ind_node, dep_nodes in graph_dict.iteritems(): + if not isinstance(dep_nodes, list): + raise TypeError('dict values must be lists') + for dep_node in dep_nodes: + self.add_edge(ind_node, dep_node) + + def reset_graph(self): + """ Restore the graph to an empty state. """ + self.graph = OrderedDict() + + def ind_nodes(self, graph=None): + """ Returns a list of all nodes in the graph with no dependencies. """ + if graph is None: + graph = self.graph + + dependent_nodes = set(node for dependents in graph.itervalues() for node in dependents) + return [node for node in graph.keys() if node not in dependent_nodes] + + def validate(self, graph=None): + """ Returns (Boolean, message) of whether DAG is valid. """ + graph = graph if graph is not None else self.graph + if len(self.ind_nodes(graph)) == 0: + return (False, 'no independent nodes detected') + try: + self.topological_sort(graph) + except ValueError: + return (False, 'failed topological sort') + return (True, 'valid') + + def topological_sort(self, graph=None): + """ Returns a topological ordering of the DAG. + + Raises an error if this is not possible (graph is not valid). 
+ """ + if graph is None: + graph = self.graph + + in_degree = {} + for u in graph: + in_degree[u] = 0 + + for u in graph: + for v in graph[u]: + in_degree[v] += 1 + + queue = deque() + for u in in_degree: + if in_degree[u] == 0: + queue.appendleft(u) + + l = [] + while queue: + u = queue.pop() + l.append(u) + for v in graph[u]: + in_degree[v] -= 1 + if in_degree[v] == 0: + queue.appendleft(v) + + if len(l) == len(graph): + return l + else: + raise ValueError('graph is not acyclic') + + def size(self): + return len(self.graph) diff --git a/stacker/tests/test_dag.py b/stacker/tests/test_dag.py new file mode 100644 index 000000000..55e5f5edc --- /dev/null +++ b/stacker/tests/test_dag.py @@ -0,0 +1,125 @@ +""" Tests on the DAG implementation """ + +from nose import with_setup +from nose.tools import nottest, raises +from stacker.dag import DAG, DAGValidationError + +dag = None + +@nottest +def blank_setup(): + global dag + dag = DAG() + +@nottest +def start_with_graph(): + global dag + dag = DAG() + dag.from_dict({'a': ['b', 'c'], + 'b': ['d'], + 'c': ['d'], + 'd': []}) + +@with_setup(blank_setup) +def test_add_node(): + dag.add_node('a') + assert dag.graph == {'a': set()} + +@with_setup(blank_setup) +def test_add_edge(): + dag.add_node('a') + dag.add_node('b') + dag.add_edge('a', 'b') + assert dag.graph == {'a': set('b'), 'b': set()} + +@with_setup(blank_setup) +def test_from_dict(): + dag.from_dict({'a': ['b', 'c'], + 'b': ['d'], + 'c': ['d'], + 'd': []}) + assert dag.graph == {'a': set(['b', 'c']), + 'b': set('d'), + 'c': set('d'), + 'd': set()} + +@with_setup(blank_setup) +def test_reset_graph(): + dag.add_node('a') + assert dag.graph == {'a': set()} + dag.reset_graph() + assert dag.graph == {} + +@with_setup(start_with_graph) +def test_walk(): + nodes = [] + def walk_func(n): + nodes.append(n) + dag.walk(walk_func) + assert nodes == ['d', 'b', 'c', 'a'] + +def test_walk_noreverse(): + nodes = [] + def walk_func(n): + nodes.append(n) + dag.walk(walk_func, reverse=False) + assert nodes == ['a', 'c', 'b', 'd'] + +@with_setup(start_with_graph) +def test_ind_nodes(): + assert dag.ind_nodes(dag.graph) == ['a'] + +@with_setup(blank_setup) +def test_topological_sort(): + dag.from_dict({'a': [], + 'b': ['a'], + 'c': ['b']}) + assert dag.topological_sort() == ['c', 'b', 'a'] + +@with_setup(start_with_graph) +def test_successful_validation(): + assert dag.validate()[0] == True + +@raises(DAGValidationError) +@with_setup(blank_setup) +def test_failed_validation(): + dag.from_dict({'a': ['b'], + 'b': ['a']}) + +@with_setup(start_with_graph) +def test_downstream(): + assert set(dag.downstream('a', dag.graph)) == set(['b', 'c']) + +@with_setup(start_with_graph) +def test_all_downstreams(): + assert dag.all_downstreams('a') == ['c', 'b', 'd'] + assert dag.all_downstreams('b') == ['d'] + assert dag.all_downstreams('d') == [] + +@with_setup(start_with_graph) +def test_all_downstreams_pass_graph(): + dag2 = DAG() + dag2.from_dict({'a': ['c'], + 'b': ['d'], + 'c': ['d'], + 'd': []}) + assert dag.all_downstreams('a', dag2.graph) == ['c', 'd'] + assert dag.all_downstreams('b', dag2.graph) == ['d'] + assert dag.all_downstreams('d', dag2.graph) == [] + +@with_setup(start_with_graph) +def test_predecessors(): + assert set(dag.predecessors('a')) == set([]) + assert set(dag.predecessors('b')) == set(['a']) + assert set(dag.predecessors('c')) == set(['a']) + assert set(dag.predecessors('d')) == set(['b', 'c']) + +@with_setup(start_with_graph) +def test_all_leaves(): + assert dag.all_leaves() == ['d'] + 
+@with_setup(start_with_graph) +def test_size(): + assert dag.size() == 4 + dag.delete_node('a') + assert dag.size() == 3 From 3b5ab8a33f0f7e420162ee7e3812181dff293377 Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Wed, 30 Nov 2016 20:25:00 -0800 Subject: [PATCH 02/11] Use a graph to build the plan --- stacker/actions/base.py | 41 ---- stacker/actions/build.py | 22 +- stacker/actions/destroy.py | 29 +-- stacker/dag/__init__.py | 8 +- stacker/exceptions.py | 9 + stacker/plan.py | 303 ++++++++------------------ stacker/tests/actions/test_build.py | 50 ++--- stacker/tests/actions/test_destroy.py | 92 +++----- stacker/tests/blueprints/test_base.py | 1 + stacker/tests/test_dag.py | 24 +- stacker/tests/test_plan.py | 257 +++++----------------- 11 files changed, 238 insertions(+), 598 deletions(-) diff --git a/stacker/actions/base.py b/stacker/actions/base.py index 4989cbbe5..110029950 100644 --- a/stacker/actions/base.py +++ b/stacker/actions/base.py @@ -1,4 +1,3 @@ -import copy import logging import sys @@ -150,43 +149,3 @@ def _get_all_stack_names(self, dependencies): dependencies.keys() + [item for items in dependencies.values() for item in items] ) - - def get_stack_execution_order(self, dependencies): - """Return the order in which the stacks should be executed. - - Args: - - dependencies (dict): a dictionary where each key should be the - fully qualified name of a stack whose value is an array of - fully qualified stack names that the stack depends on. This is - used to generate the order in which the stacks should be - executed. - - Returns: - array: An array of stack names in the order which they should be - executed. - - """ - # copy the dependencies since we pop items out of it to get the - # execution order, we don't want to mutate the one passed in - dependencies = copy.deepcopy(dependencies) - pending_steps = [] - executed_steps = [] - stack_names = self._get_all_stack_names(dependencies) - for stack_name in stack_names: - requirements = dependencies.get(stack_name, None) - if not requirements: - dependencies.pop(stack_name, None) - pending_steps.append(stack_name) - - while dependencies: - for step in pending_steps: - for stack_name, requirements in dependencies.items(): - if step in requirements: - requirements.remove(step) - - if not requirements: - dependencies.pop(stack_name) - pending_steps.append(stack_name) - pending_steps.remove(step) - executed_steps.append(step) - return executed_steps + pending_steps diff --git a/stacker/actions/build.py b/stacker/actions/build.py index 4505630e0..62a5e1313 100644 --- a/stacker/actions/build.py +++ b/stacker/actions/build.py @@ -271,26 +271,10 @@ def _handle_missing_parameters(self, params, required_params, return params.items() def _generate_plan(self, tail=False): - plan_kwargs = {} - if tail: - plan_kwargs["watch_func"] = self.provider.tail_stack - plan = Plan(description="Create/Update stacks", **plan_kwargs) - stacks = self.context.get_stacks_dict() - dependencies = self._get_dependencies() - for stack_name in self.get_stack_execution_order(dependencies): - plan.add( - stacks[stack_name], - run_func=self._launch_stack, - requires=dependencies.get(stack_name), - ) + plan = Plan(description="Create/Update stacks") + plan.build(self.context.get_stacks()) return plan - def _get_dependencies(self): - dependencies = {} - for stack in self.context.get_stacks(): - dependencies[stack.fqn] = stack.requires - return dependencies - def pre_run(self, outline=False, *args, **kwargs): """Any steps that need to be taken prior to running the 
action.""" pre_build = self.context.config.get("pre_build") @@ -308,7 +292,7 @@ def run(self, outline=False, tail=False, dump=False, *args, **kwargs): if not outline and not dump: plan.outline(logging.DEBUG) logger.debug("Launching stacks: %s", ", ".join(plan.keys())) - plan.execute() + plan.execute(self._launch_stack) else: if outline: plan.outline() diff --git a/stacker/actions/destroy.py b/stacker/actions/destroy.py index 26aa40dd7..9ec5392fa 100644 --- a/stacker/actions/destroy.py +++ b/stacker/actions/destroy.py @@ -31,32 +31,9 @@ class Action(BaseAction): """ - def _get_dependencies(self, stacks_dict): - dependencies = {} - for stack_name, stack in stacks_dict.iteritems(): - required_stacks = stack.requires - if not required_stacks: - if stack_name not in dependencies: - dependencies[stack_name] = required_stacks - continue - - for requirement in required_stacks: - dependencies.setdefault(requirement, set()).add(stack_name) - return dependencies - def _generate_plan(self, tail=False): - plan_kwargs = {} - if tail: - plan_kwargs["watch_func"] = self.provider.tail_stack - plan = Plan(description="Destroy stacks", **plan_kwargs) - stacks_dict = self.context.get_stacks_dict() - dependencies = self._get_dependencies(stacks_dict) - for stack_name in self.get_stack_execution_order(dependencies): - plan.add( - stacks_dict[stack_name], - run_func=self._destroy_stack, - requires=dependencies.get(stack_name), - ) + plan = Plan(description="Destroy stacks", reverse=True) + plan.build(self.context.get_stacks()) return plan def _destroy_stack(self, stack, **kwargs): @@ -100,7 +77,7 @@ def run(self, force, tail=False, *args, **kwargs): # steps to COMPLETE in order to log them debug_plan = self._generate_plan() debug_plan.outline(logging.DEBUG) - plan.execute() + plan.execute(self._destroy_stack) else: plan.outline(message="To execute this plan, run with \"--force\" " "flag.") diff --git a/stacker/dag/__init__.py b/stacker/dag/__init__.py index ee1b71118..41903023d 100644 --- a/stacker/dag/__init__.py +++ b/stacker/dag/__init__.py @@ -132,7 +132,9 @@ def all_downstreams(self, node, graph=None): nodes_seen.add(downstream_node) nodes.append(downstream_node) i += 1 - return filter(lambda node: node in nodes_seen, self.topological_sort(graph=graph)) + return filter( + lambda node: node + in nodes_seen, self.topological_sort(graph=graph)) def all_leaves(self, graph=None): """ Return a list of all leaves (nodes with no downstreams) """ @@ -164,7 +166,9 @@ def ind_nodes(self, graph=None): if graph is None: graph = self.graph - dependent_nodes = set(node for dependents in graph.itervalues() for node in dependents) + dependent_nodes = set( + node for dependents + in graph.itervalues() for node in dependents) return [node for node in graph.keys() if node not in dependent_nodes] def validate(self, graph=None): diff --git a/stacker/exceptions.py b/stacker/exceptions.py index 71ee2d879..ebc666a3d 100644 --- a/stacker/exceptions.py +++ b/stacker/exceptions.py @@ -137,3 +137,12 @@ def __init__(self, variable, validator, value, exception=None): def __str__(self): return self.message + + +class CyclicDependencyError(Exception): + """Raised when there are cyclic dependencies between stacks.""" + + def __init__(self, fqn, *args, **kwargs): + self.fqn = fqn + message = "Cyclic dependency detected in %s" % (fqn) + super(CyclicDependencyError, self).__init__(message, *args, **kwargs) diff --git a/stacker/plan.py b/stacker/plan.py index 406f1b739..0ed2495f4 100644 --- a/stacker/plan.py +++ b/stacker/plan.py @@ -1,6 +1,4 
@@ -from collections import OrderedDict import logging -import multiprocessing import os import time import uuid @@ -8,14 +6,13 @@ from colorama.ansi import Fore from .exceptions import ( - CancelExecution, - ImproperlyConfigured, + CyclicDependencyError, ) +from .dag import DAG, DAGValidationError from .actions.base import stack_template_key_name from .status import ( - Status, PENDING, SUBMITTED, COMPLETE, @@ -25,24 +22,20 @@ logger = logging.getLogger(__name__) +def sleep(): + time.sleep(5) + + class Step(object): """State machine for executing generic actions related to stacks. - Args: stack (:class:`stacker.stack.Stack`): the stack associated with this step - run_func (func): the function to be run for the given stack - requires (list, optional): List of stacks this step depends on being - completed before running. This step will not be executed unless the - required stacks have either completed or skipped. - """ - def __init__(self, stack, run_func, requires=None): + def __init__(self, stack): self.stack = stack self.status = PENDING - self.requires = requires or [] - self._run_func = run_func self.last_updated = time.time() def __repr__(self): @@ -68,12 +61,8 @@ def submitted(self): """Returns True if the step is SUBMITTED, COMPLETE, or SKIPPED.""" return self.status >= SUBMITTED - def run(self): - return self._run_func(self.stack, status=self.status) - def set_status(self, status): """Sets the current step's status. - Args: status (:class:`Status ` object): The status to set the step to. @@ -97,174 +86,79 @@ def submit(self): self.set_status(SUBMITTED) -class Plan(OrderedDict): - """A collection of :class:`Step` objects to execute. - - The :class:`Plan` helps organize the steps needed to execute a particular - action for a set of :class:`stacker.stack.Stack` objects. It will run the - steps in the order they are added to the `Plan` via the :func:`Plan.add` - function. If a `Step` specifies requirements, the `Plan` will wait until - the required stacks have completed before executing that `Step`. - - Args: - description (str): description of the plan - sleep_time (int, optional): the amount of time that will be passed to - the `wait_func`. Default: 5 seconds. - wait_func (func, optional): the function to be called after each pass - of running stacks. This defaults to :func:`time.sleep` and will - sleep for the given `sleep_time` before starting the next pass. - Default: :func:`time.sleep` - - """ - - def __init__(self, description, sleep_time=5, wait_func=None, - watch_func=None, *args, **kwargs): +class Plan(): + def __init__(self, description, reverse=False, sleep_func=sleep): self.description = description - self.sleep_time = sleep_time - if wait_func is not None: - if not callable(wait_func): - raise ImproperlyConfigured(self.__class__, - "\"wait_func\" must be a callable") - self._wait_func = wait_func - else: - self._wait_func = time.sleep - - self._watchers = {} - self._watch_func = watch_func + self._dag = None + self._steps = {} + self._reverse = reverse + self._sleep_func = sleep_func self.id = uuid.uuid4() - super(Plan, self).__init__(*args, **kwargs) - - def add(self, stack, run_func, requires=None): - """Add a new step to the plan. - - Args: - stack (:class:`stacker.stack.Stack`): The stack to add to the plan. - run_func (function): The function to call when the step is ran. - requires (list, optional): A list of other stacks that are required - to be complete before this step is started. 
- """ - self[stack.fqn] = Step( - stack=stack, - run_func=run_func, - requires=requires, - ) - def list_status(self, status): - """Returns a list of steps in the given status. - - Args: - status (:class:`Status`): The status to match steps against. - - Returns: - list: A list of :class:`Step` objects that are in the given status. + def build(self, stacks): + """ Builds an internal dag from the stacks and their dependencies """ + dag = DAG() + + for stack in stacks: + fqn = stack.fqn + dag.add_node(fqn) + self._steps[fqn] = Step( + stack=stack) + + for stack in stacks: + for dep in stack.requires: + try: + dag.add_edge(stack.fqn, dep) + except DAGValidationError: + raise CyclicDependencyError(stack.fqn) + + self._dag = dag + return None + + def execute(self, fn): + """ Executes the plan by walking the graph and executing dependencies + first. """ - return [step for step in self.iteritems() if step[1].status == status] - - def list_completed(self): - """A shortcut for list_status(COMPLETE)""" - return self.list_status(COMPLETE) - - def list_submitted(self): - """A shortcut for list_status(SUBMITTED)""" - return self.list_status(SUBMITTED) - - def list_skipped(self): - """A shortcut for list_status(SKIPPED)""" - return self.list_status(SKIPPED) - - def list_pending(self): - """Pending is any task that isn't COMPLETE or SKIPPED. """ - return [step for step in self.iteritems() if ( - step[1].status != COMPLETE and - step[1].status != SKIPPED - )] - - @property - def completed(self): - """True if there are no more pending steps.""" - if self.list_pending(): - return False + check_point = self._check_point + sleep_func = self._sleep_func + + # This function is called for each step in the graph, it's responsible + # for managing the lifecycle of the step until completion. + def step_func(step): + while not step.done: + check_point() + status = fn(step.stack, status=step.status) + step.set_status(status) + check_point() + if sleep_func: + sleep_func() + + self._walk_steps(step_func) return True - def _single_run(self): - """Executes a single run through the plan, touching each step.""" - for step_name, step in self.list_pending(): - waiting_on = [] - for required_stack in step.requires: - if not self[required_stack].completed and \ - not self[required_stack].skipped: - waiting_on.append(required_stack) - - if waiting_on: - logger.debug( - "Stack: \"%s\" waiting on required stacks: %s", - step.stack.name, - ", ".join(waiting_on), - ) - continue - - # Kick off watchers - used for tailing the stack - if ( - not step.done and - self._watch_func and - step_name not in self._watchers - ): - process = multiprocessing.Process( - target=self._watch_func, - args=(step.stack,) - ) - self._watchers[step_name] = process - process.start() - - status = step.run() - if not isinstance(status, Status): - raise ValueError( - "Step run_func must return a valid Status object. " - "(Returned type: %s)" % (type(status))) - step.set_status(status) - - # Terminate any watchers when step completes - if step.done and step_name in self._watchers: - self._terminate_watcher(self._watchers[step_name]) - - return self.completed - - def _terminate_watcher(self, watcher): - if watcher.is_alive(): - watcher.terminate() - watcher.join() - - def execute(self): - """Execute the plan. - - This will run through all of the steps registered with the plan and - submit them in parallel based on their dependencies. 
- """ + def dump(self, directory): + logger.info("Dumping \"%s\"...", self.description) + directory = os.path.expanduser(directory) + if not os.path.exists(directory): + os.makedirs(directory) - attempts = 0 - try: - while not self.completed: - if not attempts % 10: - self._check_point() + def step_func(step): + blueprint = step.stack.blueprint + filename = stack_template_key_name(blueprint) + path = os.path.join(directory, filename) + logger.info("Writing stack \"%s\" -> %s", step.stack.fqn, path) + with open(path, "w") as f: + f.write(blueprint.rendered) - attempts += 1 - if not self._single_run(): - self._wait_func(self.sleep_time) - except CancelExecution: - logger.info("Cancelling execution") - return - finally: - for watcher in self._watchers.values(): - self._terminate_watcher(watcher) + self._walk_steps(step_func) - self._check_point() + def keys(self): + return self._steps.keys() def outline(self, level=logging.INFO, message=""): """Print an outline of the actions the plan is going to take. - The outline will represent the rough ordering of the steps that will be taken. - Args: level (int, optional): a valid log level that should be used to log the outline @@ -273,49 +167,32 @@ def outline(self, level=logging.INFO, message=""): """ steps = 1 logger.log(level, "Plan \"%s\":", self.description) - while not self.completed: - step_name, step = self.list_pending()[0] + + def step_func(step): logger.log( level, - " - step: %s: target: \"%s\", action: \"%s\"", + " - step: %s: target: \"%s\"", steps, - step_name, - step._run_func.__name__, + step.stack.fqn, ) - # Set the status to COMPLETE directly so we don't call the - # completion func - step.status = COMPLETE - steps += 1 + + self._walk_steps(step_func) if message: logger.log(level, message) - self.reset() - - def reset(self): - for _, step in self.iteritems(): - step.status = PENDING + def _walk_steps(self, step_func): + steps = self._steps - def dump(self, directory): - steps = 1 - logger.info("Dumping \"%s\"...", self.description) - directory = os.path.expanduser(directory) - if not os.path.exists(directory): - os.makedirs(directory) - - while not self.completed: - step_name, step = self.list_pending()[0] - blueprint = step.stack.blueprint - filename = stack_template_key_name(blueprint) - path = os.path.join(directory, filename) - logger.info("Writing stack \"%s\" -> %s", step_name, path) - with open(path, "w") as f: - f.write(blueprint.rendered) + def walk_func(fqn): + step = steps[fqn] + step_func(step) - step.status = COMPLETE - steps += 1 + reverse = True + if self._reverse: + reverse = False - self.reset() + return self._dag.walk(walk_func, reverse=reverse) def _check_point(self): """Outputs the current status of all steps in the plan.""" @@ -325,22 +202,26 @@ def _check_point(self): } logger.info("Plan Status:", extra={"reset": True, "loop": self.id}) - longest = 0 + class local: + longest = 0 messages = [] - for step_name, step in self.iteritems(): - length = len(step_name) - if length > longest: - longest = length - msg = "%s: %s" % (step_name, step.status.name) + def step_func(step): + length = len(step.stack.fqn) + if length > local.longest: + local.longest = length + + msg = "%s: %s" % (step.stack.fqn, step.status.name) if step.status.reason: msg += " (%s)" % (step.status.reason) messages.append((msg, step)) + self._walk_steps(step_func) + for msg, step in messages: parts = msg.split(' ', 1) - fmt = "\t{0: <%d}{1}" % (longest + 2,) + fmt = "\t{0: <%d}{1}" % (local.longest + 2,) color = 
status_to_color.get(step.status.code, Fore.WHITE) logger.info(fmt.format(*parts), extra={ 'loop': self.id, diff --git a/stacker/tests/actions/test_build.py b/stacker/tests/actions/test_build.py index 6ca4d375a..bf7afc8a7 100644 --- a/stacker/tests/actions/test_build.py +++ b/stacker/tests/actions/test_build.py @@ -7,6 +7,7 @@ from stacker.actions import build from stacker.actions.build import resolve_parameters from stacker.context import Context +from stacker.plan import Step from stacker.exceptions import StackDidNotChange from stacker.providers.base import BaseProvider from stacker.status import ( @@ -104,37 +105,17 @@ def test_stack_params_dont_override_given_params(self): required, stack) self.assertEqual(result, def_params.items()) - def test_get_dependencies(self): - context = self._get_context() - build_action = build.Action(context) - dependencies = build_action._get_dependencies() - self.assertEqual( - dependencies[context.get_fqn("bastion")], - set([context.get_fqn("vpc")]), - ) - self.assertEqual( - dependencies[context.get_fqn("db")], - set([context.get_fqn(s) for s in ["vpc", "bastion"]]), - ) - self.assertFalse(dependencies[context.get_fqn("other")]) - - def test_get_stack_execution_order(self): - context = self._get_context() - build_action = build.Action(context) - dependencies = build_action._get_dependencies() - execution_order = build_action.get_stack_execution_order(dependencies) - self.assertEqual( - execution_order, - [context.get_fqn(s) for s in ["other", "vpc", "bastion", "db"]], - ) - def test_generate_plan(self): context = self._get_context() build_action = build.Action(context) plan = build_action._generate_plan() self.assertEqual( - plan.keys(), - [context.get_fqn(s) for s in ["other", "vpc", "bastion", "db"]], + { + 'namespace-db': set(['namespace-bastion', 'namespace-vpc']), + 'namespace-bastion': set(['namespace-vpc']), + 'namespace-other': set([]), + 'namespace-vpc': set([])}, + plan._dag.graph ) def test_dont_execute_plan_when_outline_specified(self): @@ -159,10 +140,9 @@ def test_launch_stack_step_statuses(self): context = self._get_context() build_action = build.Action(context, provider=mock_provider) - plan = build_action._generate_plan() - _, step = plan.list_pending()[0] - step.stack = mock.MagicMock() - step.stack.locked = False + stack = mock.MagicMock() + stack.locked = False + step = Step(stack=mock.MagicMock()) # mock provider shouldn't return a stack at first since it hasn't been # launched @@ -171,7 +151,7 @@ def test_launch_stack_step_statuses(self): # initial status should be PENDING self.assertEqual(step.status, PENDING) # initial run should return SUBMITTED since we've passed off to CF - status = step.run() + status = build_action._launch_stack(step.stack, status=step.status) step.set_status(status) self.assertEqual(status, SUBMITTED) self.assertEqual(status.reason, "creating new stack") @@ -181,7 +161,7 @@ def test_launch_stack_step_statuses(self): # simulate that we're still in progress mock_provider.is_stack_in_progress.return_value = True mock_provider.is_stack_completed.return_value = False - status = step.run() + status = build_action._launch_stack(step.stack, status=step.status) step.set_status(status) # status should still be SUBMITTED since we're waiting for it to # complete @@ -190,7 +170,7 @@ def test_launch_stack_step_statuses(self): # simulate completed stack mock_provider.is_stack_completed.return_value = True mock_provider.is_stack_in_progress.return_value = False - status = step.run() + status = 
build_action._launch_stack(step.stack, status=step.status) step.set_status(status) self.assertEqual(status, COMPLETE) self.assertEqual(status.reason, "creating new stack") @@ -199,7 +179,7 @@ def test_launch_stack_step_statuses(self): mock_provider.is_stack_completed.return_value = False mock_provider.is_stack_in_progress.return_value = False mock_provider.update_stack.side_effect = StackDidNotChange - status = step.run() + status = build_action._launch_stack(step.stack, status=step.status) step.set_status(status) self.assertEqual(status, SKIPPED) self.assertEqual(status.reason, "nochange") @@ -208,7 +188,7 @@ def test_launch_stack_step_statuses(self): mock_provider.reset_mock() mock_provider.update_stack.side_effect = None step.set_status(PENDING) - status = step.run() + status = build_action._launch_stack(step.stack, status=step.status) step.set_status(status) self.assertEqual(status, SUBMITTED) self.assertEqual(status.reason, "updating existing stack") diff --git a/stacker/tests/actions/test_destroy.py b/stacker/tests/actions/test_destroy.py index d348bfd97..055e5e3b5 100644 --- a/stacker/tests/actions/test_destroy.py +++ b/stacker/tests/actions/test_destroy.py @@ -4,6 +4,7 @@ from stacker.actions import destroy from stacker.context import Context +from stacker.plan import Step from stacker.exceptions import StackDoesNotExist from stacker.status import ( COMPLETE, @@ -39,10 +40,25 @@ def setUp(self): def test_generate_plan(self): plan = self.action._generate_plan() - stacks = ["other", "db", "instance", "bastion", "vpc"] self.assertEqual( - [self.context.get_fqn(s) for s in stacks], - plan.keys(), + { + 'namespace-db': set( + [ + 'namespace-instance', + 'namespace-bastion', + 'namespace-vpc']), + 'namespace-instance': set( + [ + 'namespace-bastion', + 'namespace-vpc']), + 'namespace-bastion': set( + [ + 'namespace-vpc']), + 'namespace-other': set( + [ + 'namespace-db']), + 'namespace-vpc': set([])}, + plan._dag.graph, ) def test_only_execute_plan_when_forced(self): @@ -71,63 +87,6 @@ def test_destroy_stack_complete_if_state_submitted(self): # we successfully deleted it self.assertEqual(status, COMPLETE) - def test_destroy_stack_in_parallel(self): - count = {"_count": 0} - mock_provider = mock.MagicMock() - self.context.config = { - "stacks": [ - {"name": "vpc"}, - {"name": "bastion", "requires": ["vpc"]}, - {"name": "instance", "requires": ["vpc"]}, - {"name": "db", "requies": ["vpc"]}, - ], - } - stacks_dict = self.context.get_stacks_dict() - - def get_stack(stack_name): - return stacks_dict.get(stack_name) - - def get_stack_staggered(stack_name): - count["_count"] += 1 - if not count["_count"] % 2: - raise StackDoesNotExist(stack_name) - return stacks_dict.get(stack_name) - - def wait_func(*args): - # we want "get_stack" above to return staggered results for the - # stack being "deleted" to simulate waiting on stacks to complete - mock_provider.get_stack.side_effect = get_stack_staggered - - plan = self.action._generate_plan() - plan._wait_func = wait_func - - # swap for mock provider - plan.provider = mock_provider - self.action.provider = mock_provider - - # we want "get_stack" to return the mock stacks above on the first - # pass. 
"wait_func" will simulate the stack being deleted every second - # pass - mock_provider.get_stack.side_effect = get_stack - mock_provider.is_stack_destroyed.return_value = False - mock_provider.is_stack_in_progress.return_value = True - - independent_stacks = filter(lambda x: x.name != "vpc", - self.context.get_stacks()) - while not plan._single_run(): - # vpc should be the last stack that is deleted - if plan["namespace-vpc"].completed: - self.assertFalse(plan.list_pending()) - - # all of the independent stacks should be submitted at the same - # time - submitted_stacks = [ - plan[stack.fqn].submitted for stack in independent_stacks - ] - if any(submitted_stacks): - self.assertTrue(all(submitted_stacks)) - wait_func() - def test_destroy_stack_step_statuses(self): mock_provider = mock.MagicMock() stacks_dict = self.context.get_stacks_dict() @@ -135,8 +94,9 @@ def test_destroy_stack_step_statuses(self): def get_stack(stack_name): return stacks_dict.get(stack_name) - plan = self.action._generate_plan() - _, step = plan.list_pending()[0] + stack = mock.MagicMock() + stack.locked = False + step = Step(stack=mock.MagicMock()) # we need the AWS provider to generate the plan, but swap it for # the mock one to make the test easier self.action.provider = mock_provider @@ -144,20 +104,20 @@ def get_stack(stack_name): # simulate stack doesn't exist and we haven't submitted anything for # deletion mock_provider.get_stack.side_effect = StackDoesNotExist("mock") - status = step.run() + status = self.action._destroy_stack(step.stack, status=step.status) self.assertEqual(status, SKIPPED) # simulate stack getting successfully deleted mock_provider.get_stack.side_effect = get_stack mock_provider.is_stack_destroyed.return_value = False mock_provider.is_stack_in_progress.return_value = False - status = step.run() + status = self.action._destroy_stack(step.stack, status=step.status) self.assertEqual(status, SUBMITTED) mock_provider.is_stack_destroyed.return_value = False mock_provider.is_stack_in_progress.return_value = True - status = step.run() + status = self.action._destroy_stack(step.stack, status=step.status) self.assertEqual(status, SUBMITTED) mock_provider.is_stack_destroyed.return_value = True mock_provider.is_stack_in_progress.return_value = False - status = step.run() + status = self.action._destroy_stack(step.stack, status=step.status) self.assertEqual(status, COMPLETE) diff --git a/stacker/tests/blueprints/test_base.py b/stacker/tests/blueprints/test_base.py index 23366b2a8..1fd2fee20 100644 --- a/stacker/tests/blueprints/test_base.py +++ b/stacker/tests/blueprints/test_base.py @@ -37,6 +37,7 @@ def mock_lookup_handler(value, provider=None, context=None, fqn=False, **kwargs): return value + register_lookup_handler("mock", mock_lookup_handler) diff --git a/stacker/tests/test_dag.py b/stacker/tests/test_dag.py index 55e5f5edc..dc99be085 100644 --- a/stacker/tests/test_dag.py +++ b/stacker/tests/test_dag.py @@ -6,11 +6,13 @@ dag = None + @nottest def blank_setup(): global dag dag = DAG() + @nottest def start_with_graph(): global dag @@ -20,11 +22,13 @@ def start_with_graph(): 'c': ['d'], 'd': []}) + @with_setup(blank_setup) def test_add_node(): dag.add_node('a') assert dag.graph == {'a': set()} + @with_setup(blank_setup) def test_add_edge(): dag.add_node('a') @@ -32,6 +36,7 @@ def test_add_edge(): dag.add_edge('a', 'b') assert dag.graph == {'a': set('b'), 'b': set()} + @with_setup(blank_setup) def test_from_dict(): dag.from_dict({'a': ['b', 'c'], @@ -43,6 +48,7 @@ def test_from_dict(): 'c': set('d'), 
'd': set()} + @with_setup(blank_setup) def test_reset_graph(): dag.add_node('a') @@ -50,25 +56,33 @@ def test_reset_graph(): dag.reset_graph() assert dag.graph == {} + @with_setup(start_with_graph) def test_walk(): nodes = [] + def walk_func(n): nodes.append(n) + dag.walk(walk_func) assert nodes == ['d', 'b', 'c', 'a'] + def test_walk_noreverse(): nodes = [] + def walk_func(n): nodes.append(n) + dag.walk(walk_func, reverse=False) assert nodes == ['a', 'c', 'b', 'd'] + @with_setup(start_with_graph) def test_ind_nodes(): assert dag.ind_nodes(dag.graph) == ['a'] + @with_setup(blank_setup) def test_topological_sort(): dag.from_dict({'a': [], @@ -76,9 +90,11 @@ def test_topological_sort(): 'c': ['b']}) assert dag.topological_sort() == ['c', 'b', 'a'] + @with_setup(start_with_graph) def test_successful_validation(): - assert dag.validate()[0] == True + assert dag.validate()[0] == True # noqa: E712 + @raises(DAGValidationError) @with_setup(blank_setup) @@ -86,16 +102,19 @@ def test_failed_validation(): dag.from_dict({'a': ['b'], 'b': ['a']}) + @with_setup(start_with_graph) def test_downstream(): assert set(dag.downstream('a', dag.graph)) == set(['b', 'c']) + @with_setup(start_with_graph) def test_all_downstreams(): assert dag.all_downstreams('a') == ['c', 'b', 'd'] assert dag.all_downstreams('b') == ['d'] assert dag.all_downstreams('d') == [] + @with_setup(start_with_graph) def test_all_downstreams_pass_graph(): dag2 = DAG() @@ -107,6 +126,7 @@ def test_all_downstreams_pass_graph(): assert dag.all_downstreams('b', dag2.graph) == ['d'] assert dag.all_downstreams('d', dag2.graph) == [] + @with_setup(start_with_graph) def test_predecessors(): assert set(dag.predecessors('a')) == set([]) @@ -114,10 +134,12 @@ def test_predecessors(): assert set(dag.predecessors('c')) == set(['a']) assert set(dag.predecessors('d')) == set(['b', 'c']) + @with_setup(start_with_graph) def test_all_leaves(): assert dag.all_leaves() == ['d'] + @with_setup(start_with_graph) def test_size(): assert dag.size() == 4 diff --git a/stacker/tests/test_plan.py b/stacker/tests/test_plan.py index f719f928d..d47599b1f 100644 --- a/stacker/tests/test_plan.py +++ b/stacker/tests/test_plan.py @@ -3,15 +3,13 @@ import mock from stacker.context import Context -from stacker.exceptions import ImproperlyConfigured +from stacker.exceptions import CyclicDependencyError from stacker.plan import ( Step, Plan, ) from stacker.status import ( COMPLETE, - SKIPPED, - SUBMITTED, ) from stacker.stack import Stack @@ -28,10 +26,7 @@ def setUp(self): definition=generate_definition("vpc", 1), context=self.context, ) - self.step = Step( - stack=stack, - run_func=lambda x, y: (x, y), - ) + self.step = Step(stack=stack) def test_status(self): self.assertFalse(self.step.submitted) @@ -45,216 +40,88 @@ def test_status(self): class TestPlan(unittest.TestCase): - def setUp(self): self.count = 0 self.environment = {"namespace": "namespace"} self.context = Context(self.environment) - def _run_func(self, stack, **kwargs): - self.count += 1 - if not self.count % 2: - return COMPLETE - elif self.count == 9: - return SKIPPED - return SUBMITTED - - def test_execute_plan(self): - plan = Plan(description="Test", sleep_time=0) - previous_stack = None - for i in range(5): - overrides = {} - if previous_stack: - overrides["requires"] = [previous_stack.fqn] - stack = Stack( - definition=generate_definition("vpc", i, **overrides), - context=self.context, - ) - previous_stack = stack - plan.add( - stack=stack, - run_func=self._run_func, - requires=stack.requires, - ) + def 
test_build_plan(self): + vpc = Stack( + definition=generate_definition('vpc', 1), + context=self.context) + bastion = Stack( + definition=generate_definition('bastion', 1, requires=[vpc.fqn]), + context=self.context) - plan.execute() - self.assertEqual(self.count, 9) - self.assertEqual(len(plan.list_skipped()), 1) + plan = Plan(description="Test", sleep_func=None) + plan.build([vpc, bastion]) - @mock.patch("stacker.plan.multiprocessing") - def test_execute_plan_with_watchers(self, patched_multiprocessing): - watch_func = mock.MagicMock() - plan = Plan(description="Test", sleep_time=0, watch_func=watch_func) - previous_stack = None - for i in range(5): - overrides = {} - if previous_stack: - overrides["requires"] = [previous_stack.fqn] - stack = Stack( - definition=generate_definition("vpc", i, **overrides), - context=self.context, - ) - previous_stack = stack - plan.add( - stack=stack, - run_func=self._run_func, - requires=stack.requires, - ) + self.assertEqual(plan._dag.graph, { + 'namespace-bastion.1': set(['namespace-vpc.1']), + 'namespace-vpc.1': set([])}) - plan.execute() - self.assertEqual(self.count, 9) - self.assertEqual(len(plan.list_skipped()), 1) - self.assertEqual(patched_multiprocessing.Process().start.call_count, 5) - # verify we terminate the process when the stack is finished and also - # redundantly terminate the process after execution - self.assertEqual( - patched_multiprocessing.Process().terminate.call_count, 10) + def test_build_plan_cyclic_dependencies(self): + vpc = Stack( + definition=generate_definition( + 'vpc', 1, requires=['namespace-bastion.1']), + context=self.context) + bastion = Stack( + definition=generate_definition( + 'bastion', 1, requires=['namespace-vpc.1']), + context=self.context) - def test_step_must_return_status(self): - plan = Plan(description="Test", sleep_time=0) - stack = Stack(definition=generate_definition("vpc", 1), - context=mock.MagicMock()) - plan.add( - stack=stack, - run_func=lambda x, **kwargs: (x), - ) - with self.assertRaises(ValueError): - plan.execute() + plan = Plan(description="Test", sleep_func=None) - def test_execute_plan_ensure_parallel_builds(self): - # key: stack_name, value: current iteration - work_states = {} - submitted_state = 0 - # It takes 4 iterations for each task to finish - finished_state = 3 + with self.assertRaises(CyclicDependencyError): + plan.build([vpc, bastion]) - def _run_func(stack, *args, **kwargs): - if stack.name not in work_states: - work_states[stack.name] = submitted_state - return SUBMITTED + def test_execute_plan(self): + vpc = Stack( + definition=generate_definition('vpc', 1), + context=self.context) + bastion = Stack( + definition=generate_definition('bastion', 1, requires=[vpc.fqn]), + context=self.context) - if work_states[stack.name] == finished_state: - return COMPLETE + plan = Plan(description="Test", sleep_func=None) + plan.build([vpc, bastion]) - work_states[stack.name] += 1 - return SUBMITTED + steps = [] - vpc_stack = Stack(definition=generate_definition("vpc", 1), - context=self.context) - web_stack = Stack( - definition=generate_definition("web", 2, requires=[vpc_stack.fqn]), - context=self.context, - ) - db_stack = Stack( - definition=generate_definition("db", 3, requires=[vpc_stack.fqn]), - context=self.context, - ) + def fn(stack, status=None): + steps.append(stack.fqn) + return COMPLETE - plan = Plan(description="Test", sleep_time=0) - for stack in [vpc_stack, web_stack, db_stack]: - plan.add( - stack=stack, - run_func=_run_func, - requires=stack.requires, - ) + 
plan.execute(fn) - parallel_success = False - while not plan._single_run(): - vpc_step = plan[vpc_stack.fqn] - web_step = plan[web_stack.fqn] - db_step = plan[db_stack.fqn] - if not vpc_step.completed: - self.assertFalse(web_step.submitted) - self.assertFalse(db_step.submitted) - else: - # If the vpc step is complete, and we see both the web & db - # steps submitted during the same run, then parallel running - # works - if web_step.status == SUBMITTED and \ - db_step.status == SUBMITTED: - parallel_success = True - self.assertTrue(parallel_success) + self.assertEqual(steps, ['namespace-vpc.1', 'namespace-bastion.1']) - def test_plan_wait_func_must_be_function(self): - with self.assertRaises(ImproperlyConfigured): - Plan(description="Test", wait_func="invalid") + def test_execute_plan_reverse(self): + vpc = Stack( + definition=generate_definition('vpc', 1), + context=self.context) + bastion = Stack( + definition=generate_definition('bastion', 1, requires=[vpc.fqn]), + context=self.context) - def test_plan_steps_listed_with_fqn(self): - plan = Plan(description="Test", sleep_time=0) - stack = Stack(definition=generate_definition("vpc", 1), - context=self.context) - plan.add(stack=stack, run_func=lambda x, y: (x, y)) - steps = plan.list_pending() - self.assertEqual(steps[0][0], stack.fqn) + plan = Plan(description="Test", reverse=True, sleep_func=None) + plan.build([vpc, bastion]) - def test_execute_plan_wait_func_not_called_if_complete(self): - wait_func = mock.MagicMock() - plan = Plan(description="Test", wait_func=wait_func) + steps = [] - def run_func(*args, **kwargs): + def fn(stack, status=None): + steps.append(stack.fqn) return COMPLETE - for i in range(2): - stack = Stack(definition=generate_definition("vpc", i), - context=self.context) - plan.add( - stack=stack, - run_func=run_func, - requires=stack.requires, - ) - - plan.execute() - self.assertEqual(wait_func.call_count, 0) + plan.execute(fn) - def test_reset_plan(self): - plan = Plan(description="Test", sleep_time=0) - previous_stack = None - for i in range(5): - overrides = {} - if previous_stack: - overrides["requires"] = [previous_stack.fqn] - stack = Stack( - definition=generate_definition("vpc", i, **overrides), - context=self.context, - ) - previous_stack = stack - plan.add( - stack=stack, - run_func=self._run_func, - requires=stack.requires, - ) - - plan.execute() - self.assertEqual(self.count, 9) - self.assertEqual(len(plan.list_skipped()), 1) - plan.reset() - self.assertEqual(len(plan.list_pending()), len(plan)) - - def test_reset_after_outline(self): - plan = Plan(description="Test", sleep_time=0) - previous_stack = None - for i in range(5): - overrides = {} - if previous_stack: - overrides["requires"] = [previous_stack.fqn] - stack = Stack( - definition=generate_definition("vpc", i, **overrides), - context=self.context, - ) - previous_stack = stack - plan.add( - stack=stack, - run_func=self._run_func, - requires=stack.requires, - ) - - plan.outline() - self.assertEqual(len(plan.list_pending()), len(plan)) + self.assertEqual(steps, ['namespace-bastion.1', 'namespace-vpc.1']) @mock.patch("stacker.plan.os") @mock.patch("stacker.plan.open", mock.mock_open(), create=True) - def test_reset_after_dump(self, *args): - plan = Plan(description="Test", sleep_time=0) + def test_dump(self, *args): + plan = Plan(description="Test", sleep_func=None) + stacks = [] previous_stack = None for i in range(5): overrides = {} @@ -265,11 +132,7 @@ def test_reset_after_dump(self, *args): context=self.context, ) previous_stack = stack - plan.add( 
- stack=stack, - run_func=self._run_func, - requires=stack.requires, - ) + stacks.append(stack) + plan.build(stacks) plan.dump("test") - self.assertEqual(len(plan.list_pending()), len(plan)) From d4686846ba2838fbbaf583b3d03d535a493cea13 Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Thu, 1 Dec 2016 16:19:52 -0800 Subject: [PATCH 03/11] Better error when graph fails validation --- Makefile | 6 ++++-- stacker/dag/__init__.py | 6 +++--- stacker/exceptions.py | 17 +++++++++++------ stacker/plan.py | 6 +++--- stacker/tests/test_plan.py | 22 ++++++++++++++++------ 5 files changed, 37 insertions(+), 20 deletions(-) diff --git a/Makefile b/Makefile index 712bab3e3..5ea1df2c8 100644 --- a/Makefile +++ b/Makefile @@ -1,9 +1,11 @@ -.PHONY: build +.PHONY: build test lint build: docker build -t remind101/stacker . -test: +lint: flake8 --exclude stacker/tests/ stacker flake8 --ignore N802 stacker/tests # ignore setUp naming + +test: lint python setup.py test diff --git a/stacker/dag/__init__.py b/stacker/dag/__init__.py index 41903023d..cc07740a6 100644 --- a/stacker/dag/__init__.py +++ b/stacker/dag/__init__.py @@ -62,7 +62,7 @@ def add_edge(self, ind_node, dep_node, graph=None): if is_valid: graph[ind_node].add(dep_node) else: - raise DAGValidationError() + raise DAGValidationError(message) def delete_edge(self, ind_node, dep_node, graph=None): """ Delete an edge from the graph. """ @@ -178,8 +178,8 @@ def validate(self, graph=None): return (False, 'no independent nodes detected') try: self.topological_sort(graph) - except ValueError: - return (False, 'failed topological sort') + except ValueError as e: + return (False, e.message) return (True, 'valid') def topological_sort(self, graph=None): diff --git a/stacker/exceptions.py b/stacker/exceptions.py index ebc666a3d..a0fdae9c6 100644 --- a/stacker/exceptions.py +++ b/stacker/exceptions.py @@ -139,10 +139,15 @@ def __str__(self): return self.message -class CyclicDependencyError(Exception): - """Raised when there are cyclic dependencies between stacks.""" +class GraphError(Exception): + """Raised when the graph is invalid (e.g. 
acyclic dependencies) + """ - def __init__(self, fqn, *args, **kwargs): - self.fqn = fqn - message = "Cyclic dependency detected in %s" % (fqn) - super(CyclicDependencyError, self).__init__(message, *args, **kwargs) + def __init__(self, exception, stack=None, dependency=None): + self.stack = stack + self.dependency = dependency + self.exception = exception + message = ("Error detected when adding '%s' " + "as a dependency of '%s': %s") % ( + dependency, stack, exception.message) + super(GraphError, self).__init__(message) diff --git a/stacker/plan.py b/stacker/plan.py index 0ed2495f4..5d6a9bde1 100644 --- a/stacker/plan.py +++ b/stacker/plan.py @@ -6,7 +6,7 @@ from colorama.ansi import Fore from .exceptions import ( - CyclicDependencyError, + GraphError, ) from .dag import DAG, DAGValidationError @@ -109,8 +109,8 @@ def build(self, stacks): for dep in stack.requires: try: dag.add_edge(stack.fqn, dep) - except DAGValidationError: - raise CyclicDependencyError(stack.fqn) + except DAGValidationError as e: + raise GraphError(e, stack.fqn, dep) self._dag = dag return None diff --git a/stacker/tests/test_plan.py b/stacker/tests/test_plan.py index d47599b1f..f7d32d8bb 100644 --- a/stacker/tests/test_plan.py +++ b/stacker/tests/test_plan.py @@ -3,7 +3,7 @@ import mock from stacker.context import Context -from stacker.exceptions import CyclicDependencyError +from stacker.exceptions import GraphError from stacker.plan import ( Step, Plan, @@ -63,17 +63,27 @@ def test_build_plan(self): def test_build_plan_cyclic_dependencies(self): vpc = Stack( definition=generate_definition( - 'vpc', 1, requires=['namespace-bastion.1']), + 'vpc', 1), context=self.context) - bastion = Stack( + db = Stack( + definition=generate_definition( + 'db', 1, requires=['namespace-app.1']), + context=self.context) + app = Stack( definition=generate_definition( - 'bastion', 1, requires=['namespace-vpc.1']), + 'app', 1, requires=['namespace-db.1']), context=self.context) plan = Plan(description="Test", sleep_func=None) - with self.assertRaises(CyclicDependencyError): - plan.build([vpc, bastion]) + try: + plan.build([vpc, db, app]) + self.assertFail() + except GraphError as e: + message = ("Error detected when adding 'namespace-db.1' " + "as a dependency of 'namespace-app.1': graph is " + "not acyclic") + self.assertEqual(e.message, message) def test_execute_plan(self): vpc = Stack( From 6d8ab4e6d58ec51572a28969e121d1463ad21acd Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Thu, 1 Dec 2016 16:42:34 -0800 Subject: [PATCH 04/11] Include the name of the node that didn't exist in the graph --- stacker/dag/__init__.py | 6 ++++-- stacker/plan.py | 2 ++ stacker/tests/test_plan.py | 17 +++++++++++++++++ 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/stacker/dag/__init__.py b/stacker/dag/__init__.py index cc07740a6..9857ccc3e 100644 --- a/stacker/dag/__init__.py +++ b/stacker/dag/__init__.py @@ -54,8 +54,10 @@ def add_edge(self, ind_node, dep_node, graph=None): """ Add an edge (dependency) between the specified nodes. 
""" if not graph: graph = self.graph - if ind_node not in graph or dep_node not in graph: - raise KeyError('one or more nodes do not exist in graph') + if ind_node not in graph: + raise KeyError('node %s does not exist' % ind_node) + if dep_node not in graph: + raise KeyError('node %s does not exist' % dep_node) test_graph = deepcopy(graph) test_graph[ind_node].add(dep_node) is_valid, message = self.validate(test_graph) diff --git a/stacker/plan.py b/stacker/plan.py index 5d6a9bde1..be905dd1f 100644 --- a/stacker/plan.py +++ b/stacker/plan.py @@ -109,6 +109,8 @@ def build(self, stacks): for dep in stack.requires: try: dag.add_edge(stack.fqn, dep) + except KeyError as e: + raise GraphError(e, stack.fqn, dep) except DAGValidationError as e: raise GraphError(e, stack.fqn, dep) diff --git a/stacker/tests/test_plan.py b/stacker/tests/test_plan.py index f7d32d8bb..1bcb843be 100644 --- a/stacker/tests/test_plan.py +++ b/stacker/tests/test_plan.py @@ -60,6 +60,23 @@ def test_build_plan(self): 'namespace-bastion.1': set(['namespace-vpc.1']), 'namespace-vpc.1': set([])}) + def test_build_plan_missing_dependency(self): + bastion = Stack( + definition=generate_definition( + 'bastion', 1, requires=['namespace-vpc.1']), + context=self.context) + + plan = Plan(description="Test", sleep_func=None) + + try: + plan.build([bastion]) + self.assertFail() + except GraphError as e: + message = ("Error detected when adding 'namespace-vpc.1' " + "as a dependency of 'namespace-bastion.1': node " + "namespace-vpc.1 does not exist") + self.assertEqual(e.message, message) + def test_build_plan_cyclic_dependencies(self): vpc = Stack( definition=generate_definition( From efb33e934e8c342853d5844e4ff5e420ae561182 Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Thu, 1 Dec 2016 17:34:14 -0800 Subject: [PATCH 05/11] Docstrings and small tweaks --- stacker/dag/__init__.py | 5 +++-- stacker/plan.py | 34 ++++++++++++++++++++++++++++++++-- 2 files changed, 35 insertions(+), 4 deletions(-) diff --git a/stacker/dag/__init__.py b/stacker/dag/__init__.py index 9857ccc3e..8b20a9645 100644 --- a/stacker/dag/__init__.py +++ b/stacker/dag/__init__.py @@ -94,7 +94,6 @@ def rename_edges(self, old_task_name, new_task_name, graph=None): if not graph: graph = self.graph for node, edges in graph.items(): - if node == old_task_name: graph[new_task_name] = copy(edges) del graph[old_task_name] @@ -175,7 +174,9 @@ def ind_nodes(self, graph=None): def validate(self, graph=None): """ Returns (Boolean, message) of whether DAG is valid. """ - graph = graph if graph is not None else self.graph + if graph is None: + graph = self.graph + if len(self.ind_nodes(graph)) == 0: return (False, 'no independent nodes detected') try: diff --git a/stacker/plan.py b/stacker/plan.py index be905dd1f..86a171927 100644 --- a/stacker/plan.py +++ b/stacker/plan.py @@ -86,7 +86,26 @@ def submit(self): self.set_status(SUBMITTED) -class Plan(): +class Plan(object): + """A collection of :class:`Step` objects to execute. + The :class:`Plan` helps organize the steps needed to execute a particular + action for a set of :class:`stacker.stack.Stack` objects. Once a plan is + initialized, :func:`Plan.build` should be called with a list of stacks + to build the dependency graph. After the plan has been built, it can be + executed via :func:`Plan.execute`. + + Args: + description (str): description of the plan + reverse (bool, optional): by default, the plan will be run in + topological order based on each stacks dependencies. 
Put + more simply, the stacks with no dependencies will be ran + first. When this flag is set, the plan will be executed + in reverse order. This can be useful for destroy actions. + sleep_func (func, optional): when executing the plan, the + provided function may be called multiple times. This + controls the wait time between successive calls. + """ + def __init__(self, description, reverse=False, sleep_func=sleep): self.description = description self._dag = None @@ -96,7 +115,12 @@ def __init__(self, description, reverse=False, sleep_func=sleep): self.id = uuid.uuid4() def build(self, stacks): - """ Builds an internal dag from the stacks and their dependencies """ + """ Builds an internal dag from the stacks and their dependencies. + + Args: + stacks (list): a list of :class:`stacker.stack.Stack` objects + to build the plan with. + """ dag = DAG() for stack in stacks: @@ -120,6 +144,12 @@ def build(self, stacks): def execute(self, fn): """ Executes the plan by walking the graph and executing dependencies first. + + Args: + fn (func): a function that will be executed for each step. The + function will be called multiple times until the step is + `done`. The function should return a + :class:`stacker.status.Status` each time it is called. """ check_point = self._check_point sleep_func = self._sleep_func From 74de5e8a50810b92f6bdde4bb4c21654fdb3a0ca Mon Sep 17 00:00:00 2001 From: "Eric J. Holmes" Date: Thu, 1 Dec 2016 18:42:06 -0800 Subject: [PATCH 06/11] Add a DAG.transpose function, and use that to walk the graph in reverse order. --- stacker/dag/__init__.py | 19 ++++++++++++++++--- stacker/plan.py | 6 +++--- stacker/tests/test_dag.py | 19 +++++++++---------- 3 files changed, 28 insertions(+), 16 deletions(-) diff --git a/stacker/dag/__init__.py b/stacker/dag/__init__.py index 8b20a9645..4c8f2d3ba 100644 --- a/stacker/dag/__init__.py +++ b/stacker/dag/__init__.py @@ -74,7 +74,20 @@ def delete_edge(self, ind_node, dep_node, graph=None): raise KeyError('this edge does not exist in graph') graph[ind_node].remove(dep_node) - def walk(self, walk_func, graph=None, reverse=True): + def transpose(self, graph=None): + """ Builds a new graph with the edges reversed. """ + if not graph: + graph = self.graph + transposed = DAG() + for node, edges in graph.items(): + transposed.add_node(node) + for node, edges in graph.items(): + # for each edge A -> B, transpose it so that B -> A + for edge in edges: + transposed.add_edge(edge, node) + return transposed + + def walk(self, walk_func, graph=None): """ Walks each node of the graph in reverse topological order. This can be used to perform a set of operations, where the next @@ -84,8 +97,8 @@ def walk(self, walk_func, graph=None, reverse=True): if not graph: graph = self.graph nodes = self.topological_sort(graph=graph) - if reverse: - nodes.reverse() + # Reverse so we start with nodes that have no dependencies. 
From 74de5e8a50810b92f6bdde4bb4c21654fdb3a0ca Mon Sep 17 00:00:00 2001
From: "Eric J. Holmes"
Date: Thu, 1 Dec 2016 18:42:06 -0800
Subject: [PATCH 06/11] Add a DAG.transpose function, and use that to walk the
 graph in reverse order.

---
 stacker/dag/__init__.py   | 19 ++++++++++++++++---
 stacker/plan.py           |  6 +++---
 stacker/tests/test_dag.py | 19 +++++++++----------
 3 files changed, 28 insertions(+), 16 deletions(-)

diff --git a/stacker/dag/__init__.py b/stacker/dag/__init__.py
index 8b20a9645..4c8f2d3ba 100644
--- a/stacker/dag/__init__.py
+++ b/stacker/dag/__init__.py
@@ -74,7 +74,20 @@ def delete_edge(self, ind_node, dep_node, graph=None):
             raise KeyError('this edge does not exist in graph')
         graph[ind_node].remove(dep_node)
 
-    def walk(self, walk_func, graph=None, reverse=True):
+    def transpose(self, graph=None):
+        """ Builds a new graph with the edges reversed. """
+        if not graph:
+            graph = self.graph
+        transposed = DAG()
+        for node, edges in graph.items():
+            transposed.add_node(node)
+        for node, edges in graph.items():
+            # for each edge A -> B, transpose it so that B -> A
+            for edge in edges:
+                transposed.add_edge(edge, node)
+        return transposed
+
+    def walk(self, walk_func, graph=None):
         """ Walks each node of the graph in reverse topological order.
 
         This can be used to perform a set of operations, where the next
@@ -84,8 +97,8 @@ def walk(self, walk_func, graph=None, reverse=True):
         if not graph:
             graph = self.graph
         nodes = self.topological_sort(graph=graph)
-        if reverse:
-            nodes.reverse()
+        # Reverse so we start with nodes that have no dependencies.
+        nodes.reverse()
         for n in nodes:
             walk_func(n)
diff --git a/stacker/plan.py b/stacker/plan.py
index 86a171927..12c8c7d4b 100644
--- a/stacker/plan.py
+++ b/stacker/plan.py
@@ -220,11 +220,11 @@ def walk_func(fqn):
             step = steps[fqn]
             step_func(step)
 
-        reverse = True
+        dag = self._dag
         if self._reverse:
-            reverse = False
+            dag = dag.transpose()
 
-        return self._dag.walk(walk_func, reverse=reverse)
+        return dag.walk(walk_func)
 
     def _check_point(self):
         """Outputs the current status of all steps in the plan."""
diff --git a/stacker/tests/test_dag.py b/stacker/tests/test_dag.py
index dc99be085..4d4eb66c1 100644
--- a/stacker/tests/test_dag.py
+++ b/stacker/tests/test_dag.py
@@ -29,6 +29,15 @@ def test_add_node():
     assert dag.graph == {'a': set()}
 
 
+@with_setup(start_with_graph)
+def test_transpose():
+    transposed = dag.transpose()
+    assert transposed.graph == {'d': set(['c', 'b']),
+                                'c': set(['a']),
+                                'b': set(['a']),
+                                'a': set([])}
+
+
 @with_setup(blank_setup)
 def test_add_edge():
     dag.add_node('a')
@@ -68,16 +77,6 @@ def walk_func(n):
     assert nodes == ['d', 'b', 'c', 'a']
 
 
-def test_walk_noreverse():
-    nodes = []
-
-    def walk_func(n):
-        nodes.append(n)
-
-    dag.walk(walk_func, reverse=False)
-    assert nodes == ['a', 'c', 'b', 'd']
-
-
 @with_setup(start_with_graph)
 def test_ind_nodes():
     assert dag.ind_nodes(dag.graph) == ['a']
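
A quick sketch of what transpose() does to the four-node graph used throughout test_dag.py (every edge A -> B becomes B -> A, which is how a destroy plan reverses the walk order):

    from stacker.dag import DAG

    dag = DAG()
    dag.from_dict({'a': ['b', 'c'],
                   'b': ['d'],
                   'c': ['d'],
                   'd': []})

    transposed = dag.transpose()
    # 'd' now points at 'b' and 'c', and 'a' has no outgoing edges,
    # matching the expectation in test_transpose().
    print(transposed.graph)
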
""" if not graph: diff --git a/stacker/plan.py b/stacker/plan.py index 12c8c7d4b..d5eac8a13 100644 --- a/stacker/plan.py +++ b/stacker/plan.py @@ -2,6 +2,7 @@ import os import time import uuid +import threading from colorama.ansi import Fore @@ -113,6 +114,8 @@ def __init__(self, description, reverse=False, sleep_func=sleep): self._reverse = reverse self._sleep_func = sleep_func self.id = uuid.uuid4() + # Manages synchronization around calling `check_point`. + self._check_point_lock = threading.Lock() def build(self, stacks): """ Builds an internal dag from the stacks and their dependencies. @@ -165,7 +168,7 @@ def step_func(step): if sleep_func: sleep_func() - self._walk_steps(step_func) + self._walk_steps(step_func, parallel=True) return True def dump(self, directory): @@ -213,7 +216,7 @@ def step_func(step): if message: logger.log(level, message) - def _walk_steps(self, step_func): + def _walk_steps(self, step_func, parallel=False): steps = self._steps def walk_func(fqn): @@ -224,9 +227,15 @@ def walk_func(fqn): if self._reverse: dag = dag.transpose() - return dag.walk(walk_func) + walk = dag.walk + if parallel: + walk = dag.walk_parallel + + return walk(walk_func) def _check_point(self): + lock = self._check_point_lock + lock.acquire() """Outputs the current status of all steps in the plan.""" status_to_color = { SUBMITTED.code: Fore.YELLOW, @@ -260,3 +269,4 @@ def step_func(step): 'color': color, 'last_updated': step.last_updated, }) + lock.release() diff --git a/stacker/tests/test_dag.py b/stacker/tests/test_dag.py index 4d4eb66c1..a1363f403 100644 --- a/stacker/tests/test_dag.py +++ b/stacker/tests/test_dag.py @@ -3,6 +3,7 @@ from nose import with_setup from nose.tools import nottest, raises from stacker.dag import DAG, DAGValidationError +import threading dag = None @@ -77,6 +78,36 @@ def walk_func(n): assert nodes == ['d', 'b', 'c', 'a'] +@with_setup(blank_setup) +def test_walk_parallel(): + dag = DAG() + + # b and c should be executed at the same time. + dag.from_dict({'a': ['b', 'c'], + 'b': ['d'], + 'c': ['d'], + 'd': []}) + + lock = threading.Lock() # Protects nodes from concurrent access + nodes = [] + + c_submitted = threading.Event() + + def walk_func(n): + # Wait for c to get submitted first + if n == 'b': + c_submitted.wait() + + lock.acquire() + nodes.append(n) + if n == 'c': + c_submitted.set() + lock.release() + + dag.walk_parallel(walk_func) + assert nodes == ['d', 'c', 'b', 'a'] + + @with_setup(start_with_graph) def test_ind_nodes(): assert dag.ind_nodes(dag.graph) == ['a'] From 0159925d38f3c77e9962d80c576cc8969706882e Mon Sep 17 00:00:00 2001 From: "Eric J. 
Holmes" Date: Thu, 1 Dec 2016 19:51:31 -0800 Subject: [PATCH 08/11] Allow providers to disable parallel execution --- stacker/actions/build.py | 4 +++- stacker/actions/destroy.py | 4 +++- stacker/plan.py | 13 ++++++++----- stacker/providers/aws/default.py | 4 ++++ stacker/providers/aws/interactive.py | 4 ++++ stacker/tests/actions/test_build.py | 3 ++- 6 files changed, 24 insertions(+), 8 deletions(-) diff --git a/stacker/actions/build.py b/stacker/actions/build.py index 62a5e1313..ba1032e91 100644 --- a/stacker/actions/build.py +++ b/stacker/actions/build.py @@ -292,7 +292,9 @@ def run(self, outline=False, tail=False, dump=False, *args, **kwargs): if not outline and not dump: plan.outline(logging.DEBUG) logger.debug("Launching stacks: %s", ", ".join(plan.keys())) - plan.execute(self._launch_stack) + plan.execute( + self._launch_stack, + parallel=self.provider.supports_parallel) else: if outline: plan.outline() diff --git a/stacker/actions/destroy.py b/stacker/actions/destroy.py index 9ec5392fa..75d3692a6 100644 --- a/stacker/actions/destroy.py +++ b/stacker/actions/destroy.py @@ -77,7 +77,9 @@ def run(self, force, tail=False, *args, **kwargs): # steps to COMPLETE in order to log them debug_plan = self._generate_plan() debug_plan.outline(logging.DEBUG) - plan.execute(self._destroy_stack) + plan.execute( + self._destroy_stack, + parallel=self.provider.supports_parallel) else: plan.outline(message="To execute this plan, run with \"--force\" " "flag.") diff --git a/stacker/plan.py b/stacker/plan.py index d5eac8a13..9502fc629 100644 --- a/stacker/plan.py +++ b/stacker/plan.py @@ -144,7 +144,7 @@ def build(self, stacks): self._dag = dag return None - def execute(self, fn): + def execute(self, fn, parallel=True): """ Executes the plan by walking the graph and executing dependencies first. @@ -157,18 +157,21 @@ def execute(self, fn): check_point = self._check_point sleep_func = self._sleep_func + check_point() + # This function is called for each step in the graph, it's responsible # for managing the lifecycle of the step until completion. 
From e8794c3579189958bce2eaa27b533e97db42eb57 Mon Sep 17 00:00:00 2001
From: "Eric J. Holmes"
Date: Thu, 1 Dec 2016 21:00:57 -0800
Subject: [PATCH 09/11] Cancel parallel execution if a step fails

---
 stacker/dag/__init__.py   | 31 ++++++++++++++++++++++++++-----
 stacker/tests/test_dag.py | 34 +++++++++++++++++++++++++++++++++-
 2 files changed, 59 insertions(+), 6 deletions(-)

diff --git a/stacker/dag/__init__.py b/stacker/dag/__init__.py
index b79430644..dad79651b 100644
--- a/stacker/dag/__init__.py
+++ b/stacker/dag/__init__.py
@@ -122,35 +122,56 @@ def walk_parallel(self, walk_func, graph=None):
         completed = {}
         for node in nodes:
             completed[node] = threading.Event()
+        cancel = threading.Event()
+
+        # Blocks until all dependencies have been satisfied, or another
+        # thread errored. Returns True if all deps have been satisfied.
+        def wait_for_deps(deps):
+            while len(deps) != 0:
+                for i, dep in enumerate(deps):
+                    # If there was an error in another thread, cancel this one.
+                    if cancel.wait(0.1):
+                        return False
+                    if completed[dep].wait(0.1):
+                        del deps[i]
+            return True
 
         threads = []
         for node in nodes:
             def fn(n, deps):
                 if deps:
                     logger.debug(
                         "%s waiting for %s to complete",
                         n,
                         ", ".join(deps))
 
-                for dep in deps:
-                    completed[dep].wait()
+                # Block until all dependencies have been satisfied, or another
+                # thread errored.
+                if not wait_for_deps(deps):
+                    return False
 
                 logger.debug("%s starting", n)
 
-                walk_func(n)
+                try:
+                    walk_func(n)
+                except:
+                    cancel.set()
+                    raise
+
                 completed[n].set()
 
                 logger.debug("%s completed", n)
 
             deps = self.all_downstreams(node, graph=graph)
-            thread = threading.Thread(target=fn, args=(node, deps))
+            thread = threading.Thread(target=fn, args=(node, deps), name=node)
             thread.start()
             threads.append(thread)
 
         for thread in threads:
             thread.join()
 
+        return not cancel.wait(0)
+
     def rename_edges(self, old_task_name, new_task_name, graph=None):
         """ Change references to a task in existing edges. """
         if not graph:
diff --git a/stacker/tests/test_dag.py b/stacker/tests/test_dag.py
index a1363f403..a00cd8d87 100644
--- a/stacker/tests/test_dag.py
+++ b/stacker/tests/test_dag.py
@@ -4,6 +4,7 @@
 from nose.tools import nottest, raises
 from stacker.dag import DAG, DAGValidationError
 import threading
+import time
 
 dag = None
 
@@ -94,6 +95,7 @@ def test_walk_parallel():
     c_submitted = threading.Event()
 
     def walk_func(n):
+        time.sleep(0.2)
         # Wait for c to get submitted first
         if n == 'b':
             c_submitted.wait()
@@ -104,10 +106,40 @@ def walk_func(n):
             c_submitted.set()
         lock.release()
 
-    dag.walk_parallel(walk_func)
+    ok = dag.walk_parallel(walk_func)
+    assert ok == True  # noqa: E712
     assert nodes == ['d', 'c', 'b', 'a']
 
 
+@with_setup(blank_setup)
+def test_walk_parallel_exception():
+    dag = DAG()
+
+    # b and c should be executed at the same time.
+    dag.from_dict({'a': ['b', 'c'],
+                   'b': ['d'],
+                   'c': ['d'],
+                   'd': []})
+
+    lock = threading.Lock()  # Protects nodes from concurrent access
+    nodes = []
+
+    def walk_func(n):
+        lock.acquire()
+        nodes.append(n)
+        lock.release()
+        # Fail the walk as soon as d runs
+        if n == 'd':
+            raise Exception('well shit')
+
+    ok = dag.walk_parallel(walk_func)
+
+    # Only 1 should have been hit. The rest are canceled because they depend
+    # on the success of d.
+    assert ok == False  # noqa: E712
+    assert nodes == ['d']
+
+
 @with_setup(start_with_graph)
 def test_ind_nodes():
     assert dag.ind_nodes(dag.graph) == ['a']
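
With this change a failing walk_func no longer hangs the other threads: they observe the internal cancel event and return early, and walk_parallel() itself reports failure via its return value. A small sketch of that behavior, mirroring test_walk_parallel_exception():

    from stacker.dag import DAG

    dag = DAG()
    dag.from_dict({'a': ['b', 'c'],
                   'b': ['d'],
                   'c': ['d'],
                   'd': []})

    def walk_func(node):
        # The very first node fails, so everything downstream is skipped.
        if node == 'd':
            raise Exception('boom')

    ok = dag.walk_parallel(walk_func)
    print(ok)  # False: 'd' failed, so 'b', 'c' and 'a' were cancelled
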
From fe6ea29a23825e9bc47fce793e681556f7d9ca69 Mon Sep 17 00:00:00 2001
From: "Eric J. Holmes"
Date: Fri, 2 Dec 2016 17:45:33 -0800
Subject: [PATCH 10/11] Trap SIGINT and SIGTERM and signal threads to stop.

---
 stacker/actions/base.py    | 16 ++++++++++++++++
 stacker/actions/build.py   |  5 +++--
 stacker/actions/destroy.py |  5 +++--
 stacker/dag/__init__.py    |  7 ++++---
 stacker/plan.py            |  8 ++++----
 stacker/tests/test_dag.py  | 30 ++++++++++++++++++++++++++++++
 6 files changed, 60 insertions(+), 11 deletions(-)

diff --git a/stacker/actions/base.py b/stacker/actions/base.py
index 110029950..06088768e 100644
--- a/stacker/actions/base.py
+++ b/stacker/actions/base.py
@@ -1,5 +1,7 @@
 import logging
 import sys
+import threading
+import signal
 
 import boto3
 import botocore.exceptions
@@ -7,6 +9,20 @@
 logger = logging.getLogger(__name__)
 
 
+def cancel():
+    """Returns a threading.Event() that will get set when SIGTERM, or
+    SIGINT are triggered. This can be used to cancel execution of threads.
+    """
+    cancel = threading.Event()
+
+    def cancel_execution(signum, frame):
+        cancel.set()
+
+    signal.signal(signal.SIGINT, cancel_execution)
+    signal.signal(signal.SIGTERM, cancel_execution)
+    return cancel
+
+
 def stack_template_key_name(blueprint):
     """Given a blueprint, produce an appropriate key name.
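
For reference, a sketch of how the new cancel() helper is meant to be used: the returned Event is handed to Plan.execute() (wired up in the action diffs below), and a Ctrl-C (SIGINT) or SIGTERM simply sets the event so in-flight threads can stop cleanly.

    from stacker.actions.base import cancel

    cancel_event = cancel()  # installs SIGINT/SIGTERM handlers
    # ... later, inside an action:
    # plan.execute(self._launch_stack,
    #              parallel=self.provider.supports_parallel,
    #              cancel=cancel_event)
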
diff --git a/stacker/actions/build.py b/stacker/actions/build.py
index ba1032e91..015ccf6cd 100644
--- a/stacker/actions/build.py
+++ b/stacker/actions/build.py
@@ -1,6 +1,6 @@
 import logging
 
-from .base import BaseAction
+from .base import BaseAction, cancel
 from .. import exceptions, util
 from ..exceptions import StackDidNotChange
 from ..plan import Plan
@@ -294,7 +294,8 @@ def run(self, outline=False, tail=False, dump=False, *args, **kwargs):
             logger.debug("Launching stacks: %s", ", ".join(plan.keys()))
             plan.execute(
                 self._launch_stack,
-                parallel=self.provider.supports_parallel)
+                parallel=self.provider.supports_parallel,
+                cancel=cancel())
         else:
             if outline:
                 plan.outline()
diff --git a/stacker/actions/destroy.py b/stacker/actions/destroy.py
index 75d3692a6..ac9115e37 100644
--- a/stacker/actions/destroy.py
+++ b/stacker/actions/destroy.py
@@ -1,6 +1,6 @@
 import logging
 
-from .base import BaseAction
+from .base import BaseAction, cancel
 from ..exceptions import StackDoesNotExist
 from .. import util
 from ..status import (
@@ -79,7 +79,8 @@ def run(self, force, tail=False, *args, **kwargs):
             debug_plan.outline(logging.DEBUG)
             plan.execute(
                 self._destroy_stack,
-                parallel=self.provider.supports_parallel)
+                parallel=self.provider.supports_parallel,
+                cancel=cancel())
         else:
             plan.outline(message="To execute this plan, run with \"--force\" "
                                  "flag.")
diff --git a/stacker/dag/__init__.py b/stacker/dag/__init__.py
index dad79651b..d22e595d6 100644
--- a/stacker/dag/__init__.py
+++ b/stacker/dag/__init__.py
@@ -91,7 +91,7 @@ def transpose(self, graph=None):
                 transposed.add_edge(edge, node)
         return transposed
 
-    def walk(self, walk_func, graph=None):
+    def walk(self, walk_func, graph=None, cancel=None):
         """ Walks each node of the graph in reverse topological order.
 
         This can be used to perform a set of operations, where the next
@@ -106,7 +106,7 @@ def walk(self, walk_func, graph=None, cancel=None):
         for n in nodes:
             walk_func(n)
 
-    def walk_parallel(self, walk_func, graph=None):
+    def walk_parallel(self, walk_func, graph=None, cancel=None):
         """ Walks each node of the graph, in parallel if it can.
 
         The walk_func is only called when the node's dependencies have been
@@ -122,7 +122,8 @@ def walk_parallel(self, walk_func, graph=None, cancel=None):
         completed = {}
         for node in nodes:
             completed[node] = threading.Event()
-        cancel = threading.Event()
+        if not cancel:
+            cancel = threading.Event()
 
         # Blocks until all dependencies have been satisfied, or another
         # thread errored. Returns True if all deps have been satisfied.
diff --git a/stacker/plan.py b/stacker/plan.py
index 9502fc629..435789db9 100644
--- a/stacker/plan.py
+++ b/stacker/plan.py
@@ -144,7 +144,7 @@ def build(self, stacks):
         self._dag = dag
         return None
 
-    def execute(self, fn, parallel=True):
+    def execute(self, fn, parallel=True, cancel=None):
         """ Executes the plan by walking the graph and executing dependencies
         first.
@@ -171,7 +171,7 @@ def step_func(step):
                 if sleep_func and not step.done:
                     sleep_func()
 
-        self._walk_steps(step_func, parallel=parallel)
+        self._walk_steps(step_func, parallel=parallel, cancel=cancel)
         return True
 
     def dump(self, directory):
@@ -219,7 +219,7 @@ def step_func(step):
         if message:
             logger.log(level, message)
 
-    def _walk_steps(self, step_func, parallel=False):
+    def _walk_steps(self, step_func, parallel=False, cancel=None):
         steps = self._steps
 
         def walk_func(fqn):
@@ -234,7 +234,7 @@ def walk_func(fqn):
         if parallel:
             walk = dag.walk_parallel
 
-        return walk(walk_func)
+        return walk(walk_func, cancel=cancel)
 
     def _check_point(self):
         lock = self._check_point_lock
diff --git a/stacker/tests/test_dag.py b/stacker/tests/test_dag.py
index a00cd8d87..fa8e7b49f 100644
--- a/stacker/tests/test_dag.py
+++ b/stacker/tests/test_dag.py
@@ -140,6 +140,36 @@ def walk_func(n):
     assert nodes == ['d']
 
 
+@with_setup(blank_setup)
+def test_walk_parallel_cancel():
+    dag = DAG()
+
+    # b and c should be executed at the same time.
+    dag.from_dict({'a': ['b', 'c'],
+                   'b': ['d'],
+                   'c': ['d'],
+                   'd': []})
+
+    cancel = threading.Event()
+    lock = threading.Lock()  # Protects nodes from concurrent access
+    nodes = []
+
+    def walk_func(n):
+        lock.acquire()
+        nodes.append(n)
+        lock.release()
+        # Cancel the walk as soon as d has run
+        if n == 'd':
+            cancel.set()
+
+    ok = dag.walk_parallel(walk_func, cancel=cancel)
+
+    # Only 1 should have been hit. The rest are canceled because they depend
+    # on the success of d.
+    assert ok == False  # noqa: E712
+    assert nodes == ['d']
+
+
 @with_setup(start_with_graph)
 def test_ind_nodes():
     assert dag.ind_nodes(dag.graph) == ['a']
From 68ff4a7bbbc726d4f2c26d1b1157254e861b40a2 Mon Sep 17 00:00:00 2001
From: "Eric J. Holmes"
Date: Fri, 2 Dec 2016 20:08:14 -0800
Subject: [PATCH 11/11] Synchronize around calls to `fn` within `execute`

---
 stacker/plan.py | 11 ++++++-----
 1 file changed, 6 insertions(+), 5 deletions(-)

diff --git a/stacker/plan.py b/stacker/plan.py
index 435789db9..e03d6a7df 100644
--- a/stacker/plan.py
+++ b/stacker/plan.py
@@ -114,8 +114,8 @@ def __init__(self, description, reverse=False, sleep_func=sleep):
         self._reverse = reverse
         self._sleep_func = sleep_func
         self.id = uuid.uuid4()
-        # Manages synchronization around calling `check_point`.
-        self._check_point_lock = threading.Lock()
+        # Manages synchronization around calling `fn` within `execute`.
+        self._lock = threading.Lock()
 
     def build(self, stacks):
         """ Builds an internal dag from the stacks and their dependencies.
@@ -156,6 +156,7 @@ def execute(self, fn, parallel=True, cancel=None):
         """
         check_point = self._check_point
         sleep_func = self._sleep_func
+        lock = self._lock
 
         check_point()
 
@@ -163,11 +164,14 @@ def execute(self, fn, parallel=True, cancel=None):
         # This function is called for each step in the graph, it's responsible
         # for managing the lifecycle of the step until completion.
         def step_func(step):
             while not step.done:
+                lock.acquire()
                 current_status = step.status
                 status = fn(step.stack, status=step.status)
                 step.set_status(status)
                 if status != current_status:
                     check_point()
+                lock.release()
+
                 if sleep_func and not step.done:
                     sleep_func()
 
@@ -237,8 +241,6 @@ def walk_func(fqn):
         return walk(walk_func, cancel=cancel)
 
     def _check_point(self):
-        lock = self._check_point_lock
-        lock.acquire()
         """Outputs the current status of all steps in the plan."""
         status_to_color = {
             SUBMITTED.code: Fore.YELLOW,
@@ -272,4 +274,3 @@ def step_func(step):
             'color': color,
             'last_updated': step.last_updated,
         })
-        lock.release()
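
Taken together, the series ends with a DAG walker that can run steps in parallel, stop early when a step fails, and stop early on an external signal. A final standalone sketch of the external-cancel path, mirroring test_walk_parallel_cancel():

    import threading
    from stacker.dag import DAG

    dag = DAG()
    dag.from_dict({'a': ['b', 'c'],
                   'b': ['d'],
                   'c': ['d'],
                   'd': []})

    cancel = threading.Event()

    def walk_func(node):
        if node == 'd':
            # Simulate an operator hitting Ctrl-C while the first step runs.
            cancel.set()

    ok = dag.walk_parallel(walk_func, cancel=cancel)
    print(ok)  # False: the walk was cancelled
    # Only 'd' ran; 'b', 'c' and 'a' were never executed.
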