import collections
import numbers
import copy
import array
import numpy as np
from numpy.random import uniform
try:
import matplotlib.pyplot as plt
from matplotlib.animation import FuncAnimation
from matplotlib.collections import LineCollection
plt.style.use('ggplot')
HAS_MATPLOTLIB = True
except ImportError:
HAS_MATPLOTLIB = False
from queueing_tool.graph import _prepare_graph
from queueing_tool.queues import (
NullQueue,
QueueServer,
LossQueue
)
from queueing_tool.network.priority_queue import PriorityQueue
EPS = np.float64(1e-7)
v_pens = [
[0.5, 0.5, 0.5, 0.5],
[0, 0.5, 1, 1],
[0.133, 0.545, 0.133, 1],
[0.282, 0.239, 0.545, 1],
[1, 0.135, 0, 1]
]
[docs]class QueueNetwork(object):
"""A class that simulates a network of queues.
Takes a networkx :any:`DiGraph<networkx.DiGraph>` and places queues
on each edge of the graph. The simulations are event based, and
this class handles the scheduling of events.
Each edge on the graph has a *type*, and this *type* is used to
specify the subclass of :class:`.QueueServer` and arguments used
when creating the queue that sits on each edge.
Parameters
----------
g : :any:`networkx.DiGraph`, :class:`numpy.ndarray`, dict, \
``None``, etc.
Any object that networkx can turn into a
:any:`DiGraph<networkx.DiGraph>`. The graph specifies the
network, and the queues sit on top of the edges.
q_classes : dict (optional)
Used to specify the :class:`.QueueServer` class for each edge
type. The keys are integers for the edge types, and the values
are classes.
q_args : dict (optional)
Used to specify the class arguments for each type of
:class:`.QueueServer`. The keys are integers for the edge types
and the values are dictionarys holding the arguments that are
passed when instantiating each :class:`.QueueServer` created
with that edge type.
seed : int (optional)
An integer used to initialize numpy's psuedo-random number
generator.
colors : dict (optional)
A dictionary of RGBA colors used to color the graph. The keys
are specified in the Notes section. If this parameter is
supplied but a particular key is missing, then the default
value for that key is used.
max_agents : int (optional, default: 1000)
The maximum number of agents that can be in the network at any
time.
blocking : ``{'BAS', 'RS'}`` (optional, default: ``'BAS'``)
Specifies the blocking behavior for the system. If `blocking
is not ``'RS'``, then it is assumed to be ``'BAS'``.
``'BAS'``
*Blocking After Service*: when an agent attempts to enter a
:class:`.LossQueue` that is at capacity the agent is forced
to wait at his current queue until an agent departs from
the target queue.
``'RS'``
*Repetitive Service Blocking*: when an agent attempts to
enter a :class:`.LossQueue` that is at capacity, the agent
is forced to receive another service from the queue it is
departing from. After the agent receives the service, she
then checks to see if the desired queue is still at
capacity, and if it is this process is repeated, otherwise
she enters the queue.
adjust_graph : bool (optional, default: ``True``)
Specifies whether the graph will be adjusted to make sure
terminal nodes do not cause any issues when simulating. For
most cases, this should be set to ``True``.
Attributes
----------
blocking : str
Specifies whether the system's blocking behavior is either
Blocking After Service (BAS) or Repetitive Service Blocking (RS).
colors : dict
A dictionary of colors used when drawing a graph. See the notes
for the defaults.
current_time : float
The time of the last event.
edge2queue : list
A list of queues where the ``edge2queue[k]`` returns the queue
on the edge with edge index ``k``.
g : :class:`.QueueNetworkDiGraph`
The graph for the network.
default_classes : dict
Specifies the default queue classes for each edge type.
default_colors : dict
Specifies various default colors.
default_q_colors : dict
Specifies the default colors used by the queues.
in_edges : list
A mapping between vertex indices and the in-edges at that
vertex. Specifically, ``in_edges[v]`` returns a list containing
the edge index for all edges with the head of the edge at
``v``, where ``v`` is the vertex's index number.
max_agents : int
The maximum number of agents that can be in the network at any
time.
num_agents : :class:`~numpy.ndarray`
A one-dimensional array where the ``k``-th entry corresponds to
the total number of agents in the :class:`.QueueServer` with
edge index ``k``. This number includes agents that are
scheduled to arrive at the queue at some future time but
haven't yet.
num_edges : int
The number of edges in the graph.
num_events : int
The number of events that have occurred thus far. Every arrival
from outside the network counts as one event, but the departure
of an agent from a queue and the arrival of that same agent to
another queue counts as one event.
num_vertices : int
The number of vertices in the graph.
num_nodes : int
The number of vertices in the graph.
out_edges : list
A mapping between vertex indices and the out-edges at that
vertex. Specifically, ``out_edges[v]`` returns a list
containing the edge index for all edges with the tail of the
edge at ``v``, where ``v`` is the vertex's index number.
time : float
The time of the next event.
Raises
------
TypeError
Raised when the parameter ``g`` is not of a type that can be
made into a :any:`networkx.DiGraph`, or when ``g`` is not
``None``.
Notes
-----
* If only :class:`Agents<.Agent>` enter the network, then the
``QueueNetwork`` instance is a `Jackson network`_. The default
transition probabilities at any vertex ``v`` is
``1 / g.out_degree(v)`` for each adjacent vertex.
* This class must be initialized before any simulations can take
place. To initialize, call the :meth:`~initialize` method.
* When simulating the network, the departure of an agent from one
queue coincides with their arrival to another. There is no time
lag between these events.
* When defining your ``q_classes`` you should not assign queues
with edge type ``0`` to anything other than the
:class:`.NullQueue` class. Edges with edge type ``0`` are
treated by ``QueueNetwork`` as terminal edges (edges that point
to a terminal vertex).
* If an edge type is used in your network but not given in
``q_classes`` parameter then the defaults are used, which are:
>>> default_classes = { # doctest: +SKIP
... 0: qt.NullQueue,
... 1: qt.QueueServer,
... 2: qt.LossQueue,
... 3: qt.LossQueue,
... 4: qt.LossQueue
... }
For example, if your network has type ``0``, ``1``, and ``2``
edges but your ``q_classes`` parameter looks like:
>>> my_classes = {1 : qt.ResourceQueue} # doctest: +SKIP
then each type ``0`` and type ``2`` edge is a :class:`.NullQueue`
and :class:`.LossQueue` respectively.
* The following properties are assigned as a node or edge attribute
to the graph; their default values for each edge or node is shown:
* ``vertex_pen_width``: ``1.1``,
* ``vertex_size``: ``8``,
* ``edge_control_points``: ``[]``
* ``edge_marker_size``: ``8``
* ``edge_pen_width``: ``1.25``
There are also property maps created for graph visualization,
they are ``vertex_color``, ``vertex_fill_color``, ``pos``, and
``edge_color``. The default colors, which are used by various
methods, are:
>>> default_colors = {
... 'vertex_fill_color': [0.9, 0.9, 0.9, 1.0],
... 'vertex_color': [0.0, 0.5, 1.0, 1.0],
... 'vertex_highlight': [0.5, 0.5, 0.5, 1.0],
... 'edge_departure': [0, 0, 0, 1],
... 'vertex_active': [0.1, 1.0, 0.5, 1.0],
... 'vertex_inactive': [0.9, 0.9, 0.9, 0.8],
... 'edge_active': [0.1, 0.1, 0.1, 1.0],
... 'edge_inactive': [0.8, 0.8, 0.8, 0.3],
... 'bgcolor': [1, 1, 1, 1]
... }
.. _Jackson network: http://en.wikipedia.org/wiki/Jackson_network
Examples
--------
The following creates a queueing network with the Moebius-Kantor
graph. Each queue has 5 servers, and the same arrival and service
distributions:
>>> import queueing_tool as qt
>>> import networkx as nx
>>> import numpy as np
>>>
>>> g = nx.moebius_kantor_graph()
>>> q_cl = {1: qt.QueueServer}
>>> def arr(t):
... return t + np.random.gamma(4, 0.0025)
>>> def ser(t):
... return t + np.random.exponential(0.025)
>>> q_ar = {
... 1: {
... 'arrival_f': arr,
... 'service_f': ser,
... 'num_servers': 5
... }
... }
>>> net = qt.QueueNetwork(g, q_classes=q_cl, q_args=q_ar, seed=13)
To specify that arrivals enter from type 1 edges and simulate run:
>>> net.initialize(edge_type=1)
>>> net.simulate(n=100)
Now we'd like to see how many agents are in type 1 edges:
>>> nA = [(q.num_system, q.edge[2]) for q in net.edge2queue if q.edge[3] == 1]
>>> nA.sort(reverse=True)
>>> nA[:5] # doctest: +SKIP
[(4, 37), (4, 34), (3, 43), (3, 32), (3, 30)]
To view the state of the network do the following (note, you need
to have pygraphviz installed and your graph may be rotated):
>>> net.simulate(n=500)
>>> pos = nx.nx_agraph.graphviz_layout(g.to_undirected(), prog='neato') # doctest: +SKIP
>>> net.draw(pos=pos) # doctest: +SKIP
<...>
.. figure:: my_network1.png
:align: center
"""
default_colors = {
'vertex_fill_color': [0.95, 0.95, 0.95, 1.0],
'vertex_color': [0.0, 0.5, 1.0, 1.0],
'vertex_highlight': [0.5, 0.5, 0.5, 1.0],
'edge_departure': [0, 0, 0, 1],
'vertex_active': [0.1, 1.0, 0.5, 1.0],
'vertex_inactive': [0.95, 0.95, 0.95, 1.0],
'edge_active': [0.1, 0.1, 0.1, 1.0],
'edge_inactive': [0.8, 0.8, 0.8, 0.3],
'bgcolor': [1, 1, 1, 1]
}
default_classes = {
0: NullQueue,
1: QueueServer,
2: LossQueue,
3: LossQueue,
4: LossQueue
}
default_q_colors = {
k: {'edge_loop_color': [0, 0, 0, 0],
'edge_color': [0.8, 0.8, 0.8, 1.0],
'vertex_fill_color': [0.95, 0.95, 0.95, 1.0],
'vertex_color': v_pens[k]}
for k in range(5)
}
def __init__(self, g, q_classes=None, q_args=None, seed=None, colors=None,
max_agents=1000, blocking='BAS', adjust_graph=True):
if not isinstance(blocking, str):
raise TypeError("blocking must be a string")
self._t = 0
self.num_events = 0
self.max_agents = max_agents
self._initialized = False
self._prev_edge = None
self._fancy_heap = PriorityQueue()
self._blocking = True if blocking.lower() != 'rs' else False
if colors is None:
colors = {}
for key, value in self.default_colors.items():
if key not in colors:
colors[key] = value
self.colors = colors
if q_classes is None:
q_classes = self.default_classes
else:
for k in set(self.default_classes.keys()) - set(q_classes.keys()):
q_classes[k] = self.default_classes[k]
if q_args is None:
q_args = {k: {} for k in range(5)}
else:
for k in set(q_classes.keys()) - set(q_args.keys()):
q_args[k] = {}
for key, args in q_args.items():
if 'colors' not in args:
args['colors'] = self.default_q_colors.get(key, self.default_q_colors[1])
if isinstance(seed, numbers.Integral):
np.random.seed(seed)
if g is not None:
g, qs = _prepare_graph(g, self.colors, q_classes, q_args, adjust_graph)
self.nV = g.number_of_nodes()
self.nE = g.number_of_edges()
self.edge2queue = qs
self.num_agents = np.zeros(g.number_of_edges(), int)
self.out_edges = [0 for v in range(self.nV)]
self.in_edges = [0 for v in range(self.nV)]
self._route_probs = [0 for v in range(self.nV)]
for v in g.nodes():
vod = g.out_degree(v)
probs = array.array('d', [1. / vod for i in range(vod)])
self.out_edges[v] = [g.edge_index[e] for e in sorted(g.out_edges(v))]
self.in_edges[v] = [g.edge_index[e] for e in sorted(g.in_edges(v))]
self._route_probs[v] = probs
g.freeze()
self.g = g
def __repr__(self):
the_string = 'QueueNetwork. # nodes: {0}, edges: {1}, agents: {2}'
return the_string.format(self.nV, self.nE, np.sum(self.num_agents))
@property
def blocking(self):
return 'BAS' if self._blocking else 'RS'
@blocking.setter
def blocking(self, tmp):
if not isinstance(tmp, str):
raise TypeError("blocking must be a string")
self._blocking = True if tmp.lower() != 'rs' else False
@property
def num_vertices(self):
return self.nV
@property
def num_nodes(self):
return self.nV
@property
def num_edges(self):
return self.nE
@property
def current_time(self):
return self._t
@property
def time(self):
if self._fancy_heap.size > 0:
e = self._fancy_heap.array_edges[0]
t = self.edge2queue[e]._time
else:
t = np.infty
return t
[docs] def animate(self, out=None, t=None, line_kwargs=None,
scatter_kwargs=None, **kwargs):
"""Animates the network as it's simulating.
The animations can be saved to disk or viewed in interactive
mode. Closing the window ends the animation if viewed in
interactive mode. This method calls
:meth:`~matplotlib.axes.scatter`, and
:class:`~matplotlib.collections.LineCollection`, and any
keyword arguments they accept can be passed to them.
Parameters
----------
out : str (optional)
The location where the frames for the images will be saved.
If this parameter is not given, then the animation is shown
in interactive mode.
t : float (optional)
The amount of simulation time to simulate forward. If
given, and ``out`` is given, ``t`` is used instead of
``n``.
line_kwargs : dict (optional, default: None)
Any keyword arguments accepted by
:class:`~matplotlib.collections.LineCollection`.
scatter_kwargs : dict (optional, default: None)
Any keyword arguments accepted by
:meth:`~matplotlib.axes.Axes.scatter`.
bgcolor : list (optional, keyword only)
A list with 4 floats representing a RGBA color. The
default is defined in ``self.colors['bgcolor']``.
figsize : tuple (optional, keyword only, default: ``(7, 7)``)
The width and height of the figure in inches.
**kwargs :
This method calls
:class:`~matplotlib.animation.FuncAnimation` and
optionally :meth:`.matplotlib.animation.FuncAnimation.save`.
Any keyword that can be passed to these functions are
passed via ``kwargs``.
Notes
-----
There are several parameters automatically set and passed to
matplotlib's :meth:`~matplotlib.axes.Axes.scatter`,
:class:`~matplotlib.collections.LineCollection`, and
:class:`~matplotlib.animation.FuncAnimation` by default.
These include:
* :class:`~matplotlib.animation.FuncAnimation`: Uses the
defaults for that function. Saving the animation is done
by passing the 'filename' keyword argument to this method.
This method also accepts any keyword arguments accepted
by :meth:`~matplotlib.animation.FuncAnimation.save`.
* :class:`~matplotlib.collections.LineCollection`: The default
arguments are taken from
:meth:`.QueueNetworkDiGraph.lines_scatter_args`.
* :meth:`~matplotlib.axes.Axes.scatter`: The default
arguments are taken from
:meth:`.QueueNetworkDiGraph.lines_scatter_args`.
Raises
------
QueueingToolError
Will raise a :exc:`.QueueingToolError` if the
``QueueNetwork`` has not been initialized. Call
:meth:`.initialize` before running.
Examples
--------
This function works similarly to ``QueueNetwork's``
:meth:`.draw` method.
>>> import queueing_tool as qt
>>> g = qt.generate_pagerank_graph(100, seed=13)
>>> net = qt.QueueNetwork(g, seed=13)
>>> net.initialize()
>>> net.animate(figsize=(4, 4)) # doctest: +SKIP
To stop the animation just close the window. If you want to
write the animation to disk run something like the following:
>>> kwargs = {
... 'filename': 'test.mp4',
... 'frames': 300,
... 'fps': 30,
... 'writer': 'mencoder',
... 'figsize': (4, 4),
... 'vertex_size': 15
... }
>>> net.animate(**kwargs) # doctest: +SKIP
"""
if not self._initialized:
msg = ("Network has not been initialized. "
"Call '.initialize()' first.")
raise QueueingToolError(msg)
if not HAS_MATPLOTLIB:
msg = "Matplotlib is necessary to animate a simulation."
raise ImportError(msg)
self._update_all_colors()
kwargs.setdefault('bgcolor', self.colors['bgcolor'])
fig = plt.figure(figsize=kwargs.get('figsize', (7, 7)))
ax = fig.gca()
mpl_kwargs = {
'line_kwargs': line_kwargs,
'scatter_kwargs': scatter_kwargs,
'pos': kwargs.get('pos')
}
line_args, scat_args = self.g.lines_scatter_args(**mpl_kwargs)
lines = LineCollection(**line_args)
lines = ax.add_collection(lines)
scatt = ax.scatter(**scat_args)
t = np.infty if t is None else t
now = self._t
def update(frame_number):
if t is not None:
if self._t > now + t:
return False
self._simulate_next_event(slow=True)
lines.set_color(line_args['colors'])
scatt.set_edgecolors(scat_args['edgecolors'])
scatt.set_facecolor(scat_args['c'])
if hasattr(ax, 'set_facecolor'):
ax.set_facecolor(kwargs['bgcolor'])
else:
ax.set_axis_bgcolor(kwargs['bgcolor'])
ax.get_xaxis().set_visible(False)
ax.get_yaxis().set_visible(False)
animation_args = {
'fargs': None,
'event_source': None,
'init_func': None,
'frames': None,
'blit': False,
'interval': 10,
'repeat': None,
'func': update,
'repeat_delay': None,
'fig': fig,
'save_count': None,
}
for key, value in kwargs.items():
if key in animation_args:
animation_args[key] = value
animation = FuncAnimation(**animation_args)
if 'filename' not in kwargs:
plt.ioff()
plt.show()
else:
save_args = {
'filename': None,
'writer': None,
'fps': None,
'dpi': None,
'codec': None,
'bitrate': None,
'extra_args': None,
'metadata': None,
'extra_anim': None,
'savefig_kwargs': None
}
for key, value in kwargs.items():
if key in save_args:
save_args[key] = value
animation.save(**save_args)
[docs] def clear(self):
"""Resets the queue to its initial state.
The attributes ``t``, ``num_events``, ``num_agents`` are set to
zero, :meth:`.reset_colors` is called, and the
:meth:`.QueueServer.clear` method is called for each queue in
the network.
Notes
-----
``QueueNetwork`` must be re-initialized before any simulations
can run.
"""
self._t = 0
self.num_events = 0
self.num_agents = np.zeros(self.nE, int)
self._fancy_heap = PriorityQueue()
self._prev_edge = None
self._initialized = False
self.reset_colors()
for q in self.edge2queue:
q.clear()
[docs] def clear_data(self, queues=None, edge=None, edge_type=None):
"""Clears data from all queues.
If none of the parameters are given then every queue's data is
cleared.
Parameters
----------
queues : int or an iterable of int (optional)
The edge index (or an iterable of edge indices) identifying
the :class:`QueueServer(s)<.QueueServer>` whose data will
be cleared.
edge : 2-tuple of int or *array_like* (optional)
Explicitly specify which queues' data to clear. Must be
either:
* A 2-tuple of the edge's source and target vertex
indices, or
* An iterable of 2-tuples of the edge's source and
target vertex indices.
edge_type : int or an iterable of int (optional)
A integer, or a collection of integers identifying which
edge types will have their data cleared.
"""
queues = _get_queues(self.g, queues, edge, edge_type)
for k in queues:
self.edge2queue[k].data = {}
[docs] def copy(self):
"""Returns a deep copy of itself."""
net = QueueNetwork(None)
net.g = self.g.copy()
net.max_agents = copy.deepcopy(self.max_agents)
net.nV = copy.deepcopy(self.nV)
net.nE = copy.deepcopy(self.nE)
net.num_agents = copy.deepcopy(self.num_agents)
net.num_events = copy.deepcopy(self.num_events)
net._t = copy.deepcopy(self._t)
net._initialized = copy.deepcopy(self._initialized)
net._prev_edge = copy.deepcopy(self._prev_edge)
net._blocking = copy.deepcopy(self._blocking)
net.colors = copy.deepcopy(self.colors)
net.out_edges = copy.deepcopy(self.out_edges)
net.in_edges = copy.deepcopy(self.in_edges)
net.edge2queue = copy.deepcopy(self.edge2queue)
net._route_probs = copy.deepcopy(self._route_probs)
if net._initialized:
keys = [q._key() for q in net.edge2queue if q._time < np.infty]
net._fancy_heap = PriorityQueue(keys, net.nE)
return net
[docs] def draw(self, update_colors=True, line_kwargs=None,
scatter_kwargs=None, **kwargs):
"""Draws the network. The coloring of the network corresponds
to the number of agents at each queue.
Parameters
----------
update_colors : ``bool`` (optional, default: ``True``).
Specifies whether all the colors are updated.
line_kwargs : dict (optional, default: None)
Any keyword arguments accepted by
:class:`~matplotlib.collections.LineCollection`
scatter_kwargs : dict (optional, default: None)
Any keyword arguments accepted by
:meth:`~matplotlib.axes.Axes.scatter`.
bgcolor : list (optional, keyword only)
A list with 4 floats representing a RGBA color. The
default is defined in ``self.colors['bgcolor']``.
figsize : tuple (optional, keyword only, default: ``(7, 7)``)
The width and height of the canvas in inches.
**kwargs
Any parameters to pass to
:meth:`.QueueNetworkDiGraph.draw_graph`.
Notes
-----
This method relies heavily on
:meth:`.QueueNetworkDiGraph.draw_graph`. Also, there is a
parameter that sets the background color of the canvas, which
is the ``bgcolor`` parameter.
Examples
--------
To draw the current state of the network, call:
>>> import queueing_tool as qt
>>> g = qt.generate_pagerank_graph(100, seed=13)
>>> net = qt.QueueNetwork(g, seed=13)
>>> net.initialize(100)
>>> net.simulate(1200)
>>> net.draw() # doctest: +SKIP
If you specify a file name and location, the drawing will be
saved to disk. For example, to save the drawing to the current
working directory do the following:
>>> net.draw(fname="state.png", scatter_kwargs={'s': 40}) # doctest: +SKIP
.. figure:: current_state1.png
:align: center
The shade of each edge depicts how many agents are located at
the corresponding queue. The shade of each vertex is determined
by the total number of inbound agents. Although loops are not
visible by default, the vertex that corresponds to a loop shows
how many agents are in that loop.
There are several additional parameters that can be passed --
all :meth:`.QueueNetworkDiGraph.draw_graph` parameters are
valid. For example, to show the edges as dashed lines do the
following.
>>> net.draw(line_kwargs={'linestyle': 'dashed'}) # doctest: +SKIP
"""
if not HAS_MATPLOTLIB:
raise ImportError("matplotlib is necessary to draw the network.")
if update_colors:
self._update_all_colors()
if 'bgcolor' not in kwargs:
kwargs['bgcolor'] = self.colors['bgcolor']
self.g.draw_graph(line_kwargs=line_kwargs,
scatter_kwargs=scatter_kwargs, **kwargs)
[docs] def get_agent_data(self, queues=None, edge=None, edge_type=None, return_header=False):
"""Gets data from queues and organizes it by agent.
If none of the parameters are given then data from every
:class:`.QueueServer` is retrieved.
Parameters
----------
queues : int or *array_like* (optional)
The edge index (or an iterable of edge indices) identifying
the :class:`QueueServer(s)<.QueueServer>` whose data will
be retrieved.
edge : 2-tuple of int or *array_like* (optional)
Explicitly specify which queues to retrieve agent data
from. Must be either:
* A 2-tuple of the edge's source and target vertex
indices, or
* An iterable of 2-tuples of the edge's source and
target vertex indices.
edge_type : int or an iterable of int (optional)
A integer, or a collection of integers identifying which
edge types to retrieve agent data from.
return_header : bool (optonal, default: False)
Determines whether the column headers are returned.
Returns
-------
dict
Returns a ``dict`` where the keys are the
:class:`Agent's<.Agent>` ``agent_id`` and the values are
:class:`ndarrays<~numpy.ndarray>` for that
:class:`Agent's<.Agent>` data. The columns of this array
are as follows:
* First: The arrival time of an agent.
* Second: The service start time of an agent.
* Third: The departure time of an agent.
* Fourth: The length of the queue upon the agents arrival.
* Fifth: The total number of :class:`Agents<.Agent>` in the
:class:`.QueueServer`.
* Sixth: the :class:`QueueServer's<.QueueServer>` id
(its edge index).
headers : str (optional)
A comma seperated string of the column headers. Returns
``'arrival,service,departure,num_queued,num_total,q_id'``
"""
queues = _get_queues(self.g, queues, edge, edge_type)
data = {}
for qid in queues:
for agent_id, dat in self.edge2queue[qid].data.items():
datum = np.zeros((len(dat), 6))
datum[:, :5] = np.array(dat)
datum[:, 5] = qid
if agent_id in data:
data[agent_id] = np.vstack((data[agent_id], datum))
else:
data[agent_id] = datum
dType = [
('a', float),
('s', float),
('d', float),
('q', float),
('n', float),
('id', float)
]
for agent_id, dat in data.items():
datum = np.array([tuple(d) for d in dat.tolist()], dtype=dType)
datum = np.sort(datum, order='a')
data[agent_id] = np.array([tuple(d) for d in datum])
if return_header:
return data, 'arrival,service,departure,num_queued,num_total,q_id'
return data
[docs] def get_queue_data(self, queues=None, edge=None, edge_type=None, return_header=False):
"""Gets data from all the queues.
If none of the parameters are given then data from every
:class:`.QueueServer` is retrieved.
Parameters
----------
queues : int or an *array_like* of int, (optional)
The edge index (or an iterable of edge indices) identifying
the :class:`QueueServer(s)<.QueueServer>` whose data will
be retrieved.
edge : 2-tuple of int or *array_like* (optional)
Explicitly specify which queues to retrieve data from. Must
be either:
* A 2-tuple of the edge's source and target vertex
indices, or
* An iterable of 2-tuples of the edge's source and
target vertex indices.
edge_type : int or an iterable of int (optional)
A integer, or a collection of integers identifying which
edge types to retrieve data from.
return_header : bool (optonal, default: False)
Determines whether the column headers are returned.
Returns
-------
out : :class:`~numpy.ndarray`
* 1st: The arrival time of an agent.
* 2nd: The service start time of an agent.
* 3rd: The departure time of an agent.
* 4th: The length of the queue upon the agents arrival.
* 5th: The total number of :class:`Agents<.Agent>` in the
:class:`.QueueServer`.
* 6th: The :class:`QueueServer's<.QueueServer>` edge index.
out : str (optional)
A comma seperated string of the column headers. Returns
``'arrival,service,departure,num_queued,num_total,q_id'```
Examples
--------
Data is not collected by default. Before simulating, by sure to
turn it on (as well as initialize the network). The following
returns data from queues with ``edge_type`` 1 or 3:
>>> import queueing_tool as qt
>>> g = qt.generate_pagerank_graph(100, seed=13)
>>> net = qt.QueueNetwork(g, seed=13)
>>> net.start_collecting_data()
>>> net.initialize(10)
>>> net.simulate(2000)
>>> data = net.get_queue_data(edge_type=(1, 3))
To get data from an edge connecting two vertices do the
following:
>>> data = net.get_queue_data(edge=(1, 50))
To get data from several edges do the following:
>>> data = net.get_queue_data(edge=[(1, 50), (10, 91), (99, 99)])
You can specify the edge indices as well:
>>> data = net.get_queue_data(queues=(20, 14, 0, 4))
"""
queues = _get_queues(self.g, queues, edge, edge_type)
data = np.zeros((0, 6))
for q in queues:
dat = self.edge2queue[q].fetch_data()
if len(dat) > 0:
data = np.vstack((data, dat))
if return_header:
return data, 'arrival,service,departure,num_queued,num_total,q_id'
return data
[docs] def initialize(self, nActive=1, queues=None, edges=None, edge_type=None):
"""Prepares the ``QueueNetwork`` for simulation.
Each :class:`.QueueServer` in the network starts inactive,
which means they do not accept arrivals from outside the
network, and they have no agents in their system. This method
sets queues to active, which then allows agents to arrive from
outside the network.
Parameters
----------
nActive : int (optional, default: ``1``)
The number of queues to set as active. The queues are
selected randomly.
queues : int *array_like* (optional)
The edge index (or an iterable of edge indices) identifying
the :class:`QueueServer(s)<.QueueServer>` to make active by.
edges : 2-tuple of int or *array_like* (optional)
Explicitly specify which queues to make active. Must be
either:
* A 2-tuple of the edge's source and target vertex
indices, or
* An iterable of 2-tuples of the edge's source and
target vertex indices.
edge_type : int or an iterable of int (optional)
A integer, or a collection of integers identifying which
edge types will be set active.
Raises
------
ValueError
If ``queues``, ``egdes``, and ``edge_type`` are all ``None``
and ``nActive`` is an integer less than 1
:exc:`~ValueError` is raised.
TypeError
If ``queues``, ``egdes``, and ``edge_type`` are all ``None``
and ``nActive`` is not an integer then a :exc:`~TypeError`
is raised.
QueueingToolError
Raised if all the queues specified are
:class:`NullQueues<.NullQueue>`.
Notes
-----
:class:`NullQueues<.NullQueue>` cannot be activated, and are
sifted out if they are specified. More specifically, every edge
with edge type 0 is sifted out.
"""
if queues is None and edges is None and edge_type is None:
if nActive >= 1 and isinstance(nActive, numbers.Integral):
qs = [q.edge[2] for q in self.edge2queue if q.edge[3] != 0]
n = min(nActive, len(qs))
queues = np.random.choice(qs, size=n, replace=False)
elif not isinstance(nActive, numbers.Integral):
msg = "If queues is None, then nActive must be an integer."
raise TypeError(msg)
else:
msg = ("If queues is None, then nActive must be a "
"positive int.")
raise ValueError(msg)
else:
queues = _get_queues(self.g, queues, edges, edge_type)
queues = [e for e in queues if self.edge2queue[e].edge[3] != 0]
if len(queues) == 0:
raise QueueingToolError("There were no queues to initialize.")
if len(queues) > self.max_agents:
queues = queues[:self.max_agents]
for ei in queues:
self.edge2queue[ei].set_active()
self.num_agents[ei] = self.edge2queue[ei]._num_total
keys = [q._key() for q in self.edge2queue if q._time < np.infty]
self._fancy_heap = PriorityQueue(keys, self.nE)
self._initialized = True
[docs] def next_event_description(self):
"""Returns whether the next event is an arrival or a departure
and the queue the event is accuring at.
Returns
-------
des : str
Indicates whether the next event is an arrival, a
departure, or nothing; returns ``'Arrival'``,
``'Departure'``, or ``'Nothing'``.
edge : int or ``None``
The edge index of the edge that this event will occur at.
If there are no events then ``None`` is returned.
"""
if self._fancy_heap.size == 0:
event_type = 'Nothing'
edge_index = None
else:
s = [q._key() for q in self.edge2queue]
s.sort()
e = s[0][1]
q = self.edge2queue[e]
event_type = 'Arrival' if q.next_event_description() == 1 else 'Departure'
edge_index = q.edge[2]
return event_type, edge_index
[docs] def reset_colors(self):
"""Resets all edge and vertex colors to their default values."""
for k, e in enumerate(self.g.edges()):
self.g.set_ep(e, 'edge_color', self.edge2queue[k].colors['edge_color'])
for v in self.g.nodes():
self.g.set_vp(v, 'vertex_fill_color', self.colors['vertex_fill_color'])
[docs] def set_transitions(self, mat):
"""Change the routing transitions probabilities for the
network.
Parameters
----------
mat : dict or :class:`~numpy.ndarray`
A transition routing matrix or transition dictionary. If
passed a dictionary, the keys are source vertex indices and
the values are dictionaries with target vertex indicies
as the keys and the probabilities of routing from the
source to the target as the values.
Raises
------
ValueError
A :exc:`.ValueError` is raised if: the keys in the dict
don't match with a vertex index in the graph; or if the
:class:`~numpy.ndarray` is passed with the wrong shape,
must be (``num_vertices``, ``num_vertices``); or the values
passed are not probabilities (for each vertex they are
positive and sum to 1);
TypeError
A :exc:`.TypeError` is raised if mat is not a dict or
:class:`~numpy.ndarray`.
Examples
--------
The default transition matrix is every out edge being equally
likely:
>>> import queueing_tool as qt
>>> adjacency = {
... 0: [2],
... 1: [2, 3],
... 2: [0, 1, 2, 4],
... 3: [1],
... 4: [2],
... }
>>> g = qt.adjacency2graph(adjacency)
>>> net = qt.QueueNetwork(g)
>>> net.transitions(False) # doctest: +ELLIPSIS
... # doctest: +NORMALIZE_WHITESPACE
{0: {2: 1.0},
1: {2: 0.5, 3: 0.5},
2: {0: 0.25, 1: 0.25, 2: 0.25, 4: 0.25},
3: {1: 1.0},
4: {2: 1.0}}
If you want to change only one vertex's transition
probabilities, you can do so with the following:
>>> net.set_transitions({1 : {2: 0.75, 3: 0.25}})
>>> net.transitions(False) # doctest: +ELLIPSIS
... # doctest: +NORMALIZE_WHITESPACE
{0: {2: 1.0},
1: {2: 0.75, 3: 0.25},
2: {0: 0.25, 1: 0.25, 2: 0.25, 4: 0.25},
3: {1: 1.0},
4: {2: 1.0}}
One can generate a transition matrix using
:func:`.generate_transition_matrix`. You can change all
transition probabilities with an :class:`~numpy.ndarray`:
>>> mat = qt.generate_transition_matrix(g, seed=10)
>>> net.set_transitions(mat)
>>> net.transitions(False) # doctest: +ELLIPSIS
... # doctest: +NORMALIZE_WHITESPACE
{0: {2: 1.0},
1: {2: 0.962..., 3: 0.037...},
2: {0: 0.301..., 1: 0.353..., 2: 0.235..., 4: 0.108...},
3: {1: 1.0},
4: {2: 1.0}}
See Also
--------
:meth:`.transitions` : Return the current routing
probabilities.
:func:`.generate_transition_matrix` : Generate a random routing
matrix.
"""
if isinstance(mat, dict):
for key, value in mat.items():
probs = list(value.values())
if key not in self.g.nodes:
msg = "One of the keys don't correspond to a vertex."
raise ValueError(msg)
elif len(self.out_edges[key]) > 0 and not np.isclose(sum(probs), 1):
msg = "Sum of transition probabilities at a vertex was not 1."
raise ValueError(msg)
elif (np.array(probs) < 0).any():
msg = "Some transition probabilities were negative."
raise ValueError(msg)
for k, e in enumerate(sorted(self.g.out_edges(key))):
self._route_probs[key][k] = value.get(e[1], 0)
elif isinstance(mat, np.ndarray):
non_terminal = np.array([self.g.out_degree(v) > 0 for v in self.g.nodes()])
if mat.shape != (self.nV, self.nV):
msg = ("Matrix is the wrong shape, should "
"be {0} x {1}.").format(self.nV, self.nV)
raise ValueError(msg)
elif not np.allclose(np.sum(mat[non_terminal, :], axis=1), 1):
msg = "Sum of transition probabilities at a vertex was not 1."
raise ValueError(msg)
elif (mat < 0).any():
raise ValueError("Some transition probabilities were negative.")
for k in range(self.nV):
for j, e in enumerate(sorted(self.g.out_edges(k))):
self._route_probs[k][j] = mat[k, e[1]]
else:
raise TypeError("mat must be a numpy array or a dict.")
[docs] def show_active(self, **kwargs):
"""Draws the network, highlighting active queues.
The colored vertices represent vertices that have at least one
queue on an in-edge that is active. Dark edges represent
queues that are active, light edges represent queues that are
inactive.
Parameters
----------
**kwargs
Any additional parameters to pass to :meth:`.draw`, and
:meth:`.QueueNetworkDiGraph.draw_graph`.
Notes
-----
Active queues are :class:`QueueServers<.QueueServer>` that
accept arrivals from outside the network. The colors are
defined by the class attribute ``colors``. The relevant keys
are ``vertex_active``, ``vertex_inactive``, ``edge_active``,
and ``edge_inactive``.
"""
g = self.g
for v in g.nodes():
self.g.set_vp(v, 'vertex_color', [0, 0, 0, 0.9])
is_active = False
my_iter = g.in_edges(v) if g.is_directed() else g.out_edges(v)
for e in my_iter:
ei = g.edge_index[e]
if self.edge2queue[ei]._active:
is_active = True
break
if is_active:
self.g.set_vp(v, 'vertex_fill_color', self.colors['vertex_active'])
else:
self.g.set_vp(v, 'vertex_fill_color', self.colors['vertex_inactive'])
for e in g.edges():
ei = g.edge_index[e]
if self.edge2queue[ei]._active:
self.g.set_ep(e, 'edge_color', self.colors['edge_active'])
else:
self.g.set_ep(e, 'edge_color', self.colors['edge_inactive'])
self.draw(update_colors=False, **kwargs)
self._update_all_colors()
[docs] def show_type(self, edge_type, **kwargs):
"""Draws the network, highlighting queues of a certain type.
The colored vertices represent self loops of type ``edge_type``.
Dark edges represent queues of type ``edge_type``.
Parameters
----------
edge_type : int
The type of vertices and edges to be shown.
**kwargs
Any additional parameters to pass to :meth:`.draw`, and
:meth:`.QueueNetworkDiGraph.draw_graph`
Notes
-----
The colors are defined by the class attribute ``colors``. The
relevant colors are ``vertex_active``, ``vertex_inactive``,
``vertex_highlight``, ``edge_active``, and ``edge_inactive``.
Examples
--------
The following code highlights all edges with edge type ``2``.
If the edge is a loop then the vertex is highlighted as well.
In this case all edges with edge type ``2`` happen to be loops.
>>> import queueing_tool as qt
>>> g = qt.generate_pagerank_graph(100, seed=13)
>>> net = qt.QueueNetwork(g, seed=13)
>>> fname = 'edge_type_2.png'
>>> net.show_type(2, fname=fname) # doctest: +SKIP
.. figure:: edge_type_2-1.png
:align: center
"""
for v in self.g.nodes():
e = (v, v)
if self.g.is_edge(e) and self.g.ep(e, 'edge_type') == edge_type:
ei = self.g.edge_index[e]
self.g.set_vp(v, 'vertex_fill_color', self.colors['vertex_highlight'])
self.g.set_vp(v, 'vertex_color', self.edge2queue[ei].colors['vertex_color'])
else:
self.g.set_vp(v, 'vertex_fill_color', self.colors['vertex_inactive'])
self.g.set_vp(v, 'vertex_color', [0, 0, 0, 0.9])
for e in self.g.edges():
if self.g.ep(e, 'edge_type') == edge_type:
self.g.set_ep(e, 'edge_color', self.colors['edge_active'])
else:
self.g.set_ep(e, 'edge_color', self.colors['edge_inactive'])
self.draw(update_colors=False, **kwargs)
self._update_all_colors()
[docs] def simulate(self, n=1, t=None):
"""Simulates the network forward.
Simulates either a specific number of events or for a specified
amount of simulation time.
Parameters
----------
n : int (optional, default: 1)
The number of events to simulate. If ``t`` is not given
then this parameter is used.
t : float (optional)
The amount of simulation time to simulate forward. If
given, ``t`` is used instead of ``n``.
Raises
------
QueueingToolError
Will raise a :exc:`.QueueingToolError` if the
``QueueNetwork`` has not been initialized. Call
:meth:`.initialize` before calling this method.
Examples
--------
Let ``net`` denote your instance of a ``QueueNetwork``. Before
you simulate, you need to initialize the network, which allows
arrivals from outside the network. To initialize with 2 (random
chosen) edges accepting arrivals run:
>>> import queueing_tool as qt
>>> g = qt.generate_pagerank_graph(100, seed=50)
>>> net = qt.QueueNetwork(g, seed=50)
>>> net.initialize(2)
To simulate the network 50000 events run:
>>> net.num_events
0
>>> net.simulate(50000)
>>> net.num_events
50000
To simulate the network for at least 75 simulation time units
run:
>>> t0 = net.current_time
>>> net.simulate(t=75)
>>> t1 = net.current_time
>>> t1 - t0 # doctest: +ELLIPSIS
75...
"""
if not self._initialized:
msg = ("Network has not been initialized. "
"Call '.initialize()' first.")
raise QueueingToolError(msg)
if t is None:
for dummy in range(n):
self._simulate_next_event(slow=False)
else:
now = self._t
while self._t < now + t:
self._simulate_next_event(slow=False)
def _simulate_next_event(self, slow=True):
if self._fancy_heap.size == 0:
self._t = np.infty
return
q1k = self._fancy_heap.pop()
q1t = q1k[0]
q1 = self.edge2queue[q1k[1]]
e1 = q1.edge[2]
event = q1.next_event_description()
self._t = q1t
self._qkey = q1k
self.num_events += 1
if event == 2: # This is a departure
e2 = q1._departures[0].desired_destination(self, q1.edge)
q2 = self.edge2queue[e2]
q2k = q2._key()
if q2.at_capacity() and e2 != e1:
q2.num_blocked += 1
q1._departures[0].blocked += 1
if self._blocking:
t = q2._departures[0]._time + EPS * uniform(0.33, 0.66)
q1.delay_service(t)
else:
q1.delay_service()
else:
agent = q1.next_event()
agent._time = q1t
q2._add_arrival(agent)
self.num_agents[e1] = q1._num_total
self.num_agents[e2] = q2._num_total
if slow:
self._update_graph_colors(qedge=q1.edge)
self._prev_edge = q1.edge
if q2._active and self.max_agents < np.infty and \
np.sum(self.num_agents) > self.max_agents - 1:
q2._active = False
q2.next_event()
self.num_agents[e2] = q2._num_total
if slow:
self._update_graph_colors(qedge=q2.edge)
self._prev_edge = q2.edge
new_q1k = q1._key()
new_q2k = q2._key()
if new_q2k[0] != q2k[0]:
self._fancy_heap.push(*new_q2k)
if new_q1k[0] < np.infty and new_q1k != new_q2k:
self._fancy_heap.push(*new_q1k)
else:
if new_q1k[0] < np.infty:
self._fancy_heap.push(*new_q1k)
elif event == 1: # This is an arrival
if q1._active and self.max_agents < np.infty and \
np.sum(self.num_agents) > self.max_agents - 1:
q1._active = False
q1.next_event()
self.num_agents[e1] = q1._num_total
if slow:
self._update_graph_colors(qedge=q1.edge)
self._prev_edge = q1.edge
new_q1k = q1._key()
if new_q1k[0] < np.infty:
self._fancy_heap.push(*new_q1k)
[docs] def start_collecting_data(self, queues=None, edge=None, edge_type=None):
"""Tells the queues to collect data on agents' arrival, service
start, and departure times.
If none of the parameters are given then every
:class:`.QueueServer` will start collecting data.
Parameters
----------
queues : :any:`int`, *array_like* (optional)
The edge index (or an iterable of edge indices) identifying
the :class:`QueueServer(s)<.QueueServer>` that will start
collecting data.
edge : 2-tuple of int or *array_like* (optional)
Explicitly specify which queues will collect data. Must be
either:
* A 2-tuple of the edge's source and target vertex
indices, or
* An iterable of 2-tuples of the edge's source and
target vertex indices.
edge_type : int or an iterable of int (optional)
A integer, or a collection of integers identifying which
edge types will be set active.
"""
queues = _get_queues(self.g, queues, edge, edge_type)
for k in queues:
self.edge2queue[k].collect_data = True
[docs] def stop_collecting_data(self, queues=None, edge=None, edge_type=None):
"""Tells the queues to stop collecting data on agents.
If none of the parameters are given then every
:class:`.QueueServer` will stop collecting data.
Parameters
----------
queues : int, *array_like* (optional)
The edge index (or an iterable of edge indices) identifying
the :class:`QueueServer(s)<.QueueServer>` that will stop
collecting data.
edge : 2-tuple of int or *array_like* (optional)
Explicitly specify which queues will stop collecting data.
Must be either:
* A 2-tuple of the edge's source and target vertex
indices, or
* An iterable of 2-tuples of the edge's source and
target vertex indices.
edge_type : int or an iterable of int (optional)
A integer, or a collection of integers identifying which
edge types will stop collecting data.
"""
queues = _get_queues(self.g, queues, edge, edge_type)
for k in queues:
self.edge2queue[k].collect_data = False
[docs] def transitions(self, return_matrix=True):
"""Returns the routing probabilities for each vertex in the
graph.
Parameters
----------
return_matrix : bool (optional, the default is ``True``)
Specifies whether an :class:`~numpy.ndarray` is returned.
If ``False``, a dict is returned instead.
Returns
-------
out : a dict or :class:`~numpy.ndarray`
The transition probabilities for each vertex in the graph.
If ``out`` is an :class:`~numpy.ndarray`, then
``out[v, u]`` returns the probability of a transition from
vertex ``v`` to vertex ``u``. If ``out`` is a dict
then ``out_edge[v][u]`` is the probability of moving from
vertex ``v`` to the vertex ``u``.
Examples
--------
Lets change the routing probabilities:
>>> import queueing_tool as qt
>>> import networkx as nx
>>> g = nx.sedgewick_maze_graph()
>>> net = qt.QueueNetwork(g)
Below is an adjacency list for the graph ``g``.
>>> ans = qt.graph2dict(g, False)
>>> {k: sorted(v) for k, v in ans.items()}
... # doctest: +NORMALIZE_WHITESPACE
{0: [2, 5, 7],
1: [7],
2: [0, 6],
3: [4, 5],
4: [3, 5, 6, 7],
5: [0, 3, 4],
6: [2, 4],
7: [0, 1, 4]}
The default transition matrix is every out edge being equally
likely:
>>> net.transitions(False) # doctest: +ELLIPSIS
... # doctest: +NORMALIZE_WHITESPACE
{0: {2: 0.333..., 5: 0.333..., 7: 0.333...},
1: {7: 1.0},
2: {0: 0.5, 6: 0.5},
3: {4: 0.5, 5: 0.5},
4: {3: 0.25, 5: 0.25, 6: 0.25, 7: 0.25},
5: {0: 0.333..., 3: 0.333..., 4: 0.333...},
6: {2: 0.5, 4: 0.5},
7: {0: 0.333..., 1: 0.333..., 4: 0.333...}}
Now we will generate a random routing matrix:
>>> mat = qt.generate_transition_matrix(g, seed=96)
>>> net.set_transitions(mat)
>>> net.transitions(False) # doctest: +ELLIPSIS
... # doctest: +NORMALIZE_WHITESPACE
{0: {2: 0.112..., 5: 0.466..., 7: 0.420...},
1: {7: 1.0},
2: {0: 0.561..., 6: 0.438...},
3: {4: 0.545..., 5: 0.454...},
4: {3: 0.374..., 5: 0.381..., 6: 0.026..., 7: 0.217...},
5: {0: 0.265..., 3: 0.460..., 4: 0.274...},
6: {2: 0.673..., 4: 0.326...},
7: {0: 0.033..., 1: 0.336..., 4: 0.630...}}
What this shows is the following: when an :class:`.Agent` is at
vertex ``2`` they will transition to vertex ``0`` with
probability ``0.561`` and route to vertex ``6`` probability
``0.438``, when at vertex ``6`` they will transition back to
vertex ``2`` with probability ``0.673`` and route vertex ``4``
probability ``0.326``, etc.
"""
if return_matrix:
mat = np.zeros((self.nV, self.nV))
for v in self.g.nodes():
ind = [e[1] for e in sorted(self.g.out_edges(v))]
mat[v, ind] = self._route_probs[v]
else:
mat = {
k: {e[1]: p for e, p in zip(sorted(self.g.out_edges(k)), value)}
for k, value in enumerate(self._route_probs)
}
return mat
def _update_all_colors(self):
do = [True for v in range(self.nV)]
for q in self.edge2queue:
e = q.edge[:2]
v = q.edge[1]
if q.edge[0] == q.edge[1]:
self.g.set_ep(e, 'edge_color', q._current_color(1))
self.g.set_vp(v, 'vertex_color', q._current_color(2))
if q.edge[3] != 0:
self.g.set_vp(v, 'vertex_fill_color', q._current_color())
do[v] = False
else:
self.g.set_ep(e, 'edge_color', q._current_color())
if do[v]:
self._update_vertex_color(v)
do[v] = False
if do[q.edge[0]]:
self._update_vertex_color(q.edge[0])
do[q.edge[0]] = False
def _update_vertex_color(self, v):
ee = (v, v)
ee_is_edge = self.g.is_edge(ee)
eei = self.g.edge_index[ee] if ee_is_edge else 0
if not ee_is_edge or (ee_is_edge and self.edge2queue[eei].edge[3] == 0):
nSy = 0
cap = 0
for ei in self.in_edges[v]:
nSy += self.edge2queue[ei].num_system
cap += self.edge2queue[ei].num_servers
div = (2 * cap) + 2.
tmp = 1. - min(nSy / div, 1.)
color = [i * tmp for i in self.colors['vertex_fill_color']]
color[3] = 1.0
self.g.set_vp(v, 'vertex_fill_color', color)
if not ee_is_edge:
self.g.set_vp(v, 'vertex_color', self.colors['vertex_color'])
def _update_graph_colors(self, qedge):
e = qedge[:2]
v = qedge[1]
if self._prev_edge is not None:
pe = self._prev_edge[:2]
pv = self._prev_edge[1]
q = self.edge2queue[self._prev_edge[2]]
if pe[0] == pe[1]:
self.g.set_ep(pe, 'edge_color', q._current_color(1))
self.g.set_vp(pv, 'vertex_color', q._current_color(2))
if q.edge[3] != 0:
self.g.set_vp(v, 'vertex_fill_color', q._current_color())
else:
self.g.set_ep(pe, 'edge_color', q._current_color())
self._update_vertex_color(pv)
q = self.edge2queue[qedge[2]]
if qedge[0] == qedge[1]:
self.g.set_ep(e, 'edge_color', q._current_color(1))
self.g.set_vp(v, 'vertex_color', q._current_color(2))
if q.edge[3] != 0:
self.g.set_vp(v, 'vertex_fill_color', q._current_color())
else:
self.g.set_ep(e, 'edge_color', q._current_color())
self._update_vertex_color(v)
def _get_queues(g, queues, edge, edge_type):
"""Used to specify edge indices from different types of arguments."""
INT = numbers.Integral
if isinstance(queues, INT):
queues = [queues]
elif queues is None:
if edge is not None:
if isinstance(edge, tuple):
if isinstance(edge[0], INT) and isinstance(edge[1], INT):
queues = [g.edge_index[edge]]
elif isinstance(edge[0], collections.abc.Iterable):
if np.array([len(e) == 2 for e in edge]).all():
queues = [g.edge_index[e] for e in edge]
else:
queues = [g.edge_index[edge]]
elif edge_type is not None:
if isinstance(edge_type, collections.abc.Iterable):
edge_type = set(edge_type)
else:
edge_type = set([edge_type])
tmp = []
for e in g.edges():
if g.ep(e, 'edge_type') in edge_type:
tmp.append(g.edge_index[e])
queues = np.array(tmp, int)
if queues is None:
queues = range(g.number_of_edges())
return queues