traci._trafficlight

  1# -*- coding: utf-8 -*-
  2# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
  3# Copyright (C) 2011-2026 German Aerospace Center (DLR) and others.
  4# This program and the accompanying materials are made available under the
  5# terms of the Eclipse Public License 2.0 which is available at
  6# https://www.eclipse.org/legal/epl-2.0/
  7# This Source Code may also be made available under the following Secondary
  8# Licenses when the conditions for such availability set forth in the Eclipse
  9# Public License 2.0 are satisfied: GNU General Public License, version 2
 10# or later which is available at
 11# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
 12# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
 13
 14# @file    _trafficlight.py
 15# @author  Michael Behrisch
 16# @date    2011-03-16
 17
 18from __future__ import absolute_import
 19from sumolib.net import Phase
 20from .domain import Domain
 21from . import constants as tc
 22from .exceptions import TraCIException, deprecated, alias_param
 23
 24
 25class Logic:
 26
 27    def __init__(self, programID, type, currentPhaseIndex, phases=None, subParameter=None):
 28        """
 29        Constructs a new Logic object with the following arguments:
 30          programID (string) - arbitrary
 31          type (integer) - one of
 32             tc.TRAFFICLIGHT_TYPE_STATIC
 33             tc.TRAFFICLIGHT_TYPE_ACTUATED
 34             tc.TRAFFICLIGHT_TYPE_NEMA
 35             tc.TRAFFICLIGHT_TYPE_DELAYBASED
 36          currentPhaseIndex (integer) - must be in range [0, len(phases) - 1]
 37          phases (list of Phase objects)
 38          subParameter (dictl) - corresponds to generic <param> objects
 39        """
 40        self.programID = programID
 41        self.type = type
 42        self.currentPhaseIndex = currentPhaseIndex
 43        self.phases = phases if phases is not None else []
 44        self.subParameter = subParameter if subParameter is not None else {}
 45
 46    def getPhases(self):
 47        return self.phases
 48
 49    def getSubID(self):
 50        return self.programID
 51
 52    def getType(self):
 53        return self.type
 54
 55    def getParameters(self):
 56        return self.subParameter
 57
 58    def getParameter(self, key, default=None):
 59        return self.subParameter.get(key, default)
 60
 61    def __repr__(self):
 62        return ("Logic(programID='%s', type=%s, currentPhaseIndex=%s, phases=%s, subParameter=%s)" %
 63                (self.programID, self.type, self.currentPhaseIndex, self.phases, self.subParameter))
 64
 65
 66def _readLogics(result):
 67    numLogics = result.readInt()
 68    logics = []
 69    for _ in range(numLogics):
 70        result.readCompound(5)
 71        programID = result.readTypedString()
 72        type = result.readTypedInt()
 73        currentPhaseIndex = result.readTypedInt()
 74        numPhases = result.readCompound()
 75        phases = []
 76        for __ in range(numPhases):
 77            result.readCompound(6)
 78            duration = result.readTypedDouble()
 79            state = result.readTypedString()
 80            minDur = result.readTypedDouble()
 81            maxDur = result.readTypedDouble()
 82            numNext = result.readCompound()
 83            next = tuple([result.readTypedInt() for ___ in range(numNext)])
 84            name = result.readTypedString()
 85            phases.append(Phase(duration, state, minDur, maxDur, next, name))
 86        logic = Logic(programID, type, currentPhaseIndex, tuple(phases))
 87        numParams = result.readCompound()
 88        for __ in range(numParams):
 89            key, value = result.readTypedStringList()
 90            logic.subParameter[key] = value
 91        logics.append(logic)
 92    return tuple(logics)
 93
 94
 95class Constraint:
 96    def __init__(self, signalId, tripId, foeId, foeSignal, limit, type, mustWait, active=True, param={}):
 97        self.signalId = signalId
 98        self.tripId = tripId
 99        self.foeId = foeId
100        self.foeSignal = foeSignal
101        self.limit = limit
102        self.type = type
103        self.mustWait = mustWait
104        self.active = active
105        self.param = param
106
107    def getParameters(self):
108        return self.param
109
110    def getParameter(self, key, default=None):
111        return self.param.get(key, default)
112
113    def __repr__(self):
114        param = ""
115        if self.param:
116            kvs = ["%s=%s" % (k, self.param[k]) for k in sorted(self.param.keys())]
117            param = " param=%s" % '|'.join(kvs)
118        return ("Constraint(signalId=%s tripId=%s, foeId=%s, foeSignal=%s, limit=%s, type=%s, mustWait=%s, active=%s%s)" %  # noqa
119                (self.signalId, self.tripId, self.foeId, self.foeSignal,
120                 self.limit, self.type, self.mustWait, self.active, param))
121
122
123def _readLinks(result):
124    result.readLength()
125    numSignals = result.readInt()  # Length
126    signals = []
127    for _ in range(numSignals):
128        # Type of Number of Controlled Links
129        result.read("!B")
130        # Number of Controlled Links
131        nbControlledLinks = result.read("!i")[0]
132        controlledLinks = []
133        for __ in range(nbControlledLinks):
134            result.read("!B")                       # Type of Link j
135            link = result.readStringList()          # Link j
136            controlledLinks.append(link)
137        signals.append(tuple(controlledLinks))
138    return tuple(signals)
139
140
def _readConstraints(result):
    """Decodes a list of rail signal constraints from the given result reader.

    After the length and the number of constraints, each constraint is read as
    four strings (signalId, tripId, foeId, foeSignal), two ints (limit, type),
    two bytes interpreted as booleans (mustWait, active) and a flat string list
    of alternating parameter keys and values.
    Returns a tuple of Constraint objects.
    """
    result.readLength()
    count = result.readInt()
    parsed = []
    for _ in range(count):
        signalId = result.readTypedString()
        tripId = result.readTypedString()
        foeId = result.readTypedString()
        foeSignal = result.readTypedString()
        limit = result.readTypedInt()
        constraintType = result.readTypedInt()
        mustWait = bool(result.readTypedByte())
        active = bool(result.readTypedByte())
        flat = result.readTypedStringList()
        # entries alternate key, value, key, value, ...
        param = {flat[i]: flat[i + 1] for i in range(0, len(flat), 2)}
        parsed.append(Constraint(signalId, tripId, foeId, foeSignal, limit,
                                 constraintType, mustWait, active, param))
    return tuple(parsed)
160
161
# Lookup table from traffic light variable constants to the reader function
# defined in this module; it is handed to Domain.__init__ by TrafficLightDomain.
_RETURN_VALUE_FUNC = {
    tc.TL_COMPLETE_DEFINITION_RYG: _readLogics,
    tc.TL_CONTROLLED_LINKS: _readLinks,
    tc.TL_CONSTRAINT: _readConstraints,
    tc.TL_CONSTRAINT_BYFOE: _readConstraints,
    tc.TL_CONSTRAINT_SWAP: _readConstraints,
}
168
169
class TrafficLightDomain(Domain):
    """Domain giving access to traffic light (and rail signal) values."""

    # exposed on the domain so that both classes are reachable through it
    Phase = Phase
    Logic = Logic

    def __init__(self, name="trafficlight", deprecatedFor=None):
        Domain.__init__(self, name, tc.CMD_GET_TL_VARIABLE, tc.CMD_SET_TL_VARIABLE,
                        tc.CMD_SUBSCRIBE_TL_VARIABLE, tc.RESPONSE_SUBSCRIBE_TL_VARIABLE,
                        tc.CMD_SUBSCRIBE_TL_CONTEXT, tc.RESPONSE_SUBSCRIBE_TL_CONTEXT,
                        _RETURN_VALUE_FUNC, deprecatedFor)

    def getRedYellowGreenState(self, tlsID):
        """getRedYellowGreenState(string) -> string

        Returns the named tl's state as a string of light definitions from
        rugGyYoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream
        has to decelerate.
        """
        return self._getUniversal(tc.TL_RED_YELLOW_GREEN_STATE, tlsID)

    def getAllProgramLogics(self, tlsID):
        """getAllProgramLogics(string) -> tuple(Logic)

        Returns a tuple of Logic objects.
        Each Logic encodes a traffic light program for the given tlsID.
        """
        return self._getUniversal(tc.TL_COMPLETE_DEFINITION_RYG, tlsID)

    getCompleteRedYellowGreenDefinition = deprecated("getCompleteRedYellowGreenDefinition")(getAllProgramLogics)

    def getControlledLanes(self, tlsID):
        """getControlledLanes(string) -> tuple(string)

        Returns the tuple of lanes which are controlled by the named traffic light.
        """
        return self._getUniversal(tc.TL_CONTROLLED_LANES, tlsID)

    def getControlledLinks(self, tlsID):
        """getControlledLinks(string) -> tuple(tuple(tuple(string)))

        Returns the links controlled by the traffic light, sorted by the signal index and described by giving
        the incoming, outgoing, and via lane.
        """
        return self._getUniversal(tc.TL_CONTROLLED_LINKS, tlsID)

    def getProgram(self, tlsID):
        """getProgram(string) -> string

        Returns the id of the current program.
        """
        return self._getUniversal(tc.TL_CURRENT_PROGRAM, tlsID)

    def getPhase(self, tlsID):
        """getPhase(string) -> integer

        Returns the index of the current phase within the list of all phases of
        the current program.
        """
        return self._getUniversal(tc.TL_CURRENT_PHASE, tlsID)

    def getPhaseName(self, tlsID):
        """getPhaseName(string) -> string

        Returns the name of the current phase.
        """
        return self._getUniversal(tc.VAR_NAME, tlsID)

    def getNextSwitch(self, tlsID):
        """getNextSwitch(string) -> double

        Returns the absolute simulation time at which the traffic light is
        scheduled to switch to the next phase (in seconds).
        """
        return self._getUniversal(tc.TL_NEXT_SWITCH, tlsID)

    def getSpentDuration(self, tlsID):
        """getSpentDuration(string) -> double

        Returns the time in seconds for which the current phase has been active
        """
        return self._getUniversal(tc.TL_SPENT_DURATION, tlsID)

    def getPhaseDuration(self, tlsID):
        """getPhaseDuration(string) -> double

        Returns the total duration of the current phase (in seconds). This value
        is not affected by the elapsed or remaining duration of the current phase.
        """
        return self._getUniversal(tc.TL_PHASE_DURATION, tlsID)

    def getServedPersonCount(self, tlsID, index):
        """getServedPersonCount(string, int) -> int

        Returns the number of persons that would be served in the given phase
        """
        return self._getUniversal(tc.VAR_PERSON_NUMBER, tlsID, "i", index)

    def getBlockingVehicles(self, tlsID, linkIndex):
        """getBlockingVehicles(string, int) -> tuple(string)

        Returns the tuple of vehicles that are blocking the subsequent block for
        the given tls-linkIndex
        """
        return self._getUniversal(tc.TL_BLOCKING_VEHICLES, tlsID, "i", linkIndex)

    def getRivalVehicles(self, tlsID, linkIndex):
        """getRivalVehicles(string, int) -> tuple(string)

        Returns the tuple of vehicles that also wish to enter the subsequent block for
        the given tls-linkIndex (regardless of priority)
        """
        return self._getUniversal(tc.TL_RIVAL_VEHICLES, tlsID, "i", linkIndex)

    def getPriorityVehicles(self, tlsID, linkIndex):
        """getPriorityVehicles(string, int) -> tuple(string)

        Returns the tuple of vehicles that also wish to enter the subsequent block for
        the given tls-linkIndex (only those with higher priority)
        """
        return self._getUniversal(tc.TL_PRIORITY_VEHICLES, tlsID, "i", linkIndex)

    def getConstraints(self, tlsID, tripId=""):
        """getConstraints(string, string) -> tuple(Constraint)

        Returns the tuple of rail signal constraints for the given rail signal.
        If tripId is not "", only constraints with the given tripId are
        returned. Otherwise, all constraints are returned
        """
        return self._getUniversal(tc.TL_CONSTRAINT, tlsID, "s", tripId)

    def getConstraintsByFoe(self, foeSignal, foeId=""):
        """getConstraintsByFoe(string, string) -> tuple(Constraint)

        Returns the tuple of rail signal constraints that have the given rail
        signal id as their foeSignal.
        If foeId is not "", only constraints with the given foeId are
        returned. Otherwise, all constraints are returned
        """
        return self._getUniversal(tc.TL_CONSTRAINT_BYFOE, foeSignal, "s", foeId)

    def setRedYellowGreenState(self, tlsID, state):
        """setRedYellowGreenState(string, string) -> None

        Sets the named tl's state as a string of light definitions from
        rugGyYuoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has
        to decelerate.
        """
        self._setCmd(tc.TL_RED_YELLOW_GREEN_STATE, tlsID, "s", state)

    def setLinkState(self, tlsID, tlsLinkIndex, state):
        """setLinkState(string, int, string) -> None

        Sets the state for the given tls and link index. The state must be one
        of rRgGyYoOu for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has
        to decelerate.
        The link index is shown in the GUI when setting the appropriate junction
        visualization option.
        Raises a TraCIException if tlsLinkIndex is negative or beyond the
        last index of the current state.
        """
        fullState = list(self.getRedYellowGreenState(tlsID))
        # Negative values are rejected as well: python list indexing would
        # otherwise silently address a link counted from the end.
        if tlsLinkIndex < 0 or tlsLinkIndex >= len(fullState):
            raise TraCIException("Invalid tlsLinkIndex %s for tls '%s' with maximum index %s." % (
                tlsLinkIndex, tlsID, len(fullState) - 1))
        fullState[tlsLinkIndex] = state
        self.setRedYellowGreenState(tlsID, ''.join(fullState))

    def setPhase(self, tlsID, index):
        """setPhase(string, integer) -> None

        Switches to the phase with the given index in the list of all phases for
        the current program.
        """
        self._setCmd(tc.TL_PHASE_INDEX, tlsID, "i", index)

    def setPhaseName(self, tlsID, name):
        """setPhaseName(string, string) -> None

        Sets the name of the current phase within the current program
        """
        self._setCmd(tc.VAR_NAME, tlsID, "s", name)

    def setProgram(self, tlsID, programID):
        """setProgram(string, string) -> None

        Switches to the program with the given programID. The program must have
        been loaded earlier. The special value 'off' can always be used to
        switch off the traffic light.
        """
        self._setCmd(tc.TL_PROGRAM, tlsID, "s", programID)

    def getNemaPhaseCalls(self, tlsID):
        """getNemaPhaseCalls(string) -> tuple(string)

        Get the vehicle calls for the phases.
        The output is vehicle calls (coming from the detectors) for the phases.
        The value of the parameter "NEMA.phaseCall" is split at each comma.
        """
        vehCallStr = self.getParameter(tlsID, "NEMA.phaseCall")
        return tuple(vehCallStr.split(","))

    def setNemaSplits(self, tlsID, splits):
        """setNemaSplits(string, list(string)) -> None

        Set a new set of splits to the given NEMA-controller.
        This function is only effective for NEMA type of controllers.
        The new splits will be implemented in the next cycle of the control.
        The value must be a space-separated list of 8 numbers with each number
        being the time in seconds for NEMA-phases 1 to 8; a list or tuple of
        values is joined into that form automatically.
        Time 0 must be used if the phase does not exist.
        Example: "11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0" (gives total cycle length of 80s)
        """
        if isinstance(splits, (list, tuple)):
            splits = ' '.join(map(str, splits))
        self.setParameter(tlsID, "NEMA.splits", splits)

    def setNemaMaxGreens(self, tlsID, maxGreens):
        """setNemaMaxGreens(string, list(string)) -> None

        Set a new set of max greens to the given NEMA-controller.
        This function is only effective for NEMA type of controllers.
        The new max green will be implemented in the next cycle of the control.
        The value must be a space-separated list of 8 numbers with each number
        being the time in seconds for NEMA-phases 1 to 8; a list or tuple of
        values is joined into that form automatically.
        Time 0 must be used if the phase does not exist.
        Example: "11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0"
        """
        if isinstance(maxGreens, (list, tuple)):
            maxGreens = ' '.join(map(str, maxGreens))
        self.setParameter(tlsID, "NEMA.maxGreens", maxGreens)

    def setNemaCycleLength(self, tlsID, cycleLength):
        """setNemaCycleLength(string, double) -> None

        Set a new cycle length to the given NEMA-controller.
        This function is only effective for NEMA type of controllers.
        The new cycle length will be implemented in the next cycle of the control.
        This function should be used with setNemaSplits or setNemaMaxGreens.
        """
        self.setParameter(tlsID, "NEMA.cycleLength", str(cycleLength))

    def setNemaOffset(self, tlsID, offset):
        """setNemaOffset(string, double) -> None

        Set a new offset to the given NEMA-controller.
        This function is only effective for NEMA type controllers when operating under coordinated mode.
        The new offset will be implemented in the next cycle of the control by adjusting
        the actual green time of the coordinated phase.
        There is no transition implemented in the NEMA-controller for changing the offset.
        It's expected that the users will control the change of the offset in each cycle
        to implement their own transition algorithm.
        """
        self.setParameter(tlsID, "NEMA.offset", str(offset))

    def setPhaseDuration(self, tlsID, phaseDuration):
        """setPhaseDuration(string, double) -> None

        Set the remaining phase duration of the current phase in seconds.
        This value has no effect on subsequent repetitions of this phase.
        """
        self._setCmd(tc.TL_PHASE_DURATION, tlsID, "d", phaseDuration)

    @alias_param("logic", "tls")
    def setProgramLogic(self, tlsID, logic):
        """setProgramLogic(string, Logic) -> None

        Sets a new program for the given tlsID from a Logic object.
        See getAllProgramLogics which returns a tuple of Logic objects.
        """
        # The format string and the value list are built in lockstep:
        # every format character describes exactly one entry of values.
        fmt = "tsiit"
        values = [5, logic.programID, logic.type, logic.currentPhaseIndex, len(logic.phases)]
        for p in logic.phases:
            fmt += "tdsddt"
            values += [6, p.duration, p.state, p.minDur, p.maxDur, len(p.next)]
            for n in p.next:
                fmt += "i"
                values += [n]
            fmt += "s"
            values += [p.name]
        # subparams: the count followed by one (key, value) pair per parameter
        fmt += "t"
        values += [len(logic.subParameter)]
        for par in logic.subParameter.items():
            fmt += "l"
            values += [par]
        self._setCmd(tc.TL_COMPLETE_PROGRAM_RYG, tlsID, fmt, *values)

    setCompleteRedYellowGreenDefinition = deprecated("setCompleteRedYellowGreenDefinition")(setProgramLogic)

    def addConstraint(self, tlsID, tripId, foeSignal, foeId, type=0, limit=0):
        """addConstraint(string, string, string, string, int, int) -> None

        Add the given constraint.
        """
        self._setCmd(tc.TL_CONSTRAINT_ADD, tlsID, "tsssii", 5, tripId, foeSignal, foeId, type, limit)

    def swapConstraints(self, tlsID, tripId, foeSignal, foeId):
        """swapConstraints(string, string, string, string) -> tuple(Constraint)

        Reverse the given constraint and return a tuple of new constraints that
        were created (by swapping) to avoid deadlock.
        """
        return self._getUniversal(tc.TL_CONSTRAINT_SWAP, tlsID, "tsss", 3, tripId, foeSignal, foeId)

    def removeConstraints(self, tlsID, tripId, foeSignal, foeId):
        """removeConstraints(string, string, string, string) -> None

        Remove constraints with the given values. Any combination of inputs may
        be set to "" to act as a wildcard filter
        """
        self._setCmd(tc.TL_CONSTRAINT_REMOVE, tlsID, "tsss", 3, tripId, foeSignal, foeId)

    def updateConstraints(self, vehID, tripId=""):
        """updateConstraints(string, string) -> None

        Removes all constraints that can no longer be met because the route of
        vehID does not pass traffic light of the constraint with the given tripId.
        This includes constraints on tripId as well as constraints where tripId
        is the foeId.
        If tripId is "", the current tripId of vehID is used.
        """
        self._setCmd(tc.TL_CONSTRAINT_UPDATE, vehID, "s", tripId)
class Logic:
26class Logic:
27
28    def __init__(self, programID, type, currentPhaseIndex, phases=None, subParameter=None):
29        """
30        Constructs a new Logic object with the following arguments:
31          programID (string) - arbitrary
32          type (integer) - one of
33             tc.TRAFFICLIGHT_TYPE_STATIC
34             tc.TRAFFICLIGHT_TYPE_ACTUATED
35             tc.TRAFFICLIGHT_TYPE_NEMA
36             tc.TRAFFICLIGHT_TYPE_DELAYBASED
37          currentPhaseIndex (integer) - must be in range [0, len(phases) - 1]
38          phases (list of Phase objects)
39          subParameter (dictl) - corresponds to generic <param> objects
40        """
41        self.programID = programID
42        self.type = type
43        self.currentPhaseIndex = currentPhaseIndex
44        self.phases = phases if phases is not None else []
45        self.subParameter = subParameter if subParameter is not None else {}
46
47    def getPhases(self):
48        return self.phases
49
50    def getSubID(self):
51        return self.programID
52
53    def getType(self):
54        return self.type
55
56    def getParameters(self):
57        return self.subParameter
58
59    def getParameter(self, key, default=None):
60        return self.subParameter.get(key, default)
61
62    def __repr__(self):
63        return ("Logic(programID='%s', type=%s, currentPhaseIndex=%s, phases=%s, subParameter=%s)" %
64                (self.programID, self.type, self.currentPhaseIndex, self.phases, self.subParameter))
Logic(programID, type, currentPhaseIndex, phases=None, subParameter=None)
28    def __init__(self, programID, type, currentPhaseIndex, phases=None, subParameter=None):
29        """
30        Constructs a new Logic object with the following arguments:
31          programID (string) - arbitrary
32          type (integer) - one of
33             tc.TRAFFICLIGHT_TYPE_STATIC
34             tc.TRAFFICLIGHT_TYPE_ACTUATED
35             tc.TRAFFICLIGHT_TYPE_NEMA
36             tc.TRAFFICLIGHT_TYPE_DELAYBASED
37          currentPhaseIndex (integer) - must be in range [0, len(phases) - 1]
38          phases (list of Phase objects)
39          subParameter (dictl) - corresponds to generic <param> objects
40        """
41        self.programID = programID
42        self.type = type
43        self.currentPhaseIndex = currentPhaseIndex
44        self.phases = phases if phases is not None else []
45        self.subParameter = subParameter if subParameter is not None else {}

Constructs a new Logic object with the following arguments: programID (string) - arbitrary; type (integer) - one of tc.TRAFFICLIGHT_TYPE_STATIC, tc.TRAFFICLIGHT_TYPE_ACTUATED, tc.TRAFFICLIGHT_TYPE_NEMA, tc.TRAFFICLIGHT_TYPE_DELAYBASED; currentPhaseIndex (integer) - must be in range [0, len(phases) - 1]; phases (list of Phase objects); subParameter (dict) - corresponds to generic <param> objects

programID
type
currentPhaseIndex
phases
subParameter
def getPhases(self):
47    def getPhases(self):
48        return self.phases
def getSubID(self):
50    def getSubID(self):
51        return self.programID
def getType(self):
53    def getType(self):
54        return self.type
def getParameters(self):
56    def getParameters(self):
57        return self.subParameter
def getParameter(self, key, default=None):
59    def getParameter(self, key, default=None):
60        return self.subParameter.get(key, default)
class Constraint:
 96class Constraint:
 97    def __init__(self, signalId, tripId, foeId, foeSignal, limit, type, mustWait, active=True, param={}):
 98        self.signalId = signalId
 99        self.tripId = tripId
100        self.foeId = foeId
101        self.foeSignal = foeSignal
102        self.limit = limit
103        self.type = type
104        self.mustWait = mustWait
105        self.active = active
106        self.param = param
107
108    def getParameters(self):
109        return self.param
110
111    def getParameter(self, key, default=None):
112        return self.param.get(key, default)
113
114    def __repr__(self):
115        param = ""
116        if self.param:
117            kvs = ["%s=%s" % (k, self.param[k]) for k in sorted(self.param.keys())]
118            param = " param=%s" % '|'.join(kvs)
119        return ("Constraint(signalId=%s tripId=%s, foeId=%s, foeSignal=%s, limit=%s, type=%s, mustWait=%s, active=%s%s)" %  # noqa
120                (self.signalId, self.tripId, self.foeId, self.foeSignal,
121                 self.limit, self.type, self.mustWait, self.active, param))
Constraint( signalId, tripId, foeId, foeSignal, limit, type, mustWait, active=True, param={})
 97    def __init__(self, signalId, tripId, foeId, foeSignal, limit, type, mustWait, active=True, param={}):
 98        self.signalId = signalId
 99        self.tripId = tripId
100        self.foeId = foeId
101        self.foeSignal = foeSignal
102        self.limit = limit
103        self.type = type
104        self.mustWait = mustWait
105        self.active = active
106        self.param = param
signalId
tripId
foeId
foeSignal
limit
type
mustWait
active
param
def getParameters(self):
108    def getParameters(self):
109        return self.param
def getParameter(self, key, default=None):
111    def getParameter(self, key, default=None):
112        return self.param.get(key, default)
class TrafficLightDomain(traci.domain.Domain):
171class TrafficLightDomain(Domain):
172
173    Phase = Phase
174    Logic = Logic
175
176    def __init__(self, name="trafficlight", deprecatedFor=None):
177        Domain.__init__(self, name, tc.CMD_GET_TL_VARIABLE, tc.CMD_SET_TL_VARIABLE,
178                        tc.CMD_SUBSCRIBE_TL_VARIABLE, tc.RESPONSE_SUBSCRIBE_TL_VARIABLE,
179                        tc.CMD_SUBSCRIBE_TL_CONTEXT, tc.RESPONSE_SUBSCRIBE_TL_CONTEXT,
180                        _RETURN_VALUE_FUNC, deprecatedFor)
181
    def getRedYellowGreenState(self, tlsID):
        """getRedYellowGreenState(string) -> string

        Returns the named tl's state as a string of light definitions from
        rugGyYoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream
        has to decelerate.
        """
        return self._getUniversal(tc.TL_RED_YELLOW_GREEN_STATE, tlsID)
190
191    def getAllProgramLogics(self, tlsID):
192        """getAllProgramLogics(string) -> tuple(Logic)
193
194        Returns a tuple of Logic objects.
195        Each Logic encodes a traffic light program for the given tlsID.
196        """
197        return self._getUniversal(tc.TL_COMPLETE_DEFINITION_RYG, tlsID)
198
199    getCompleteRedYellowGreenDefinition = deprecated("getCompleteRedYellowGreenDefinition")(getAllProgramLogics)
200
201    def getControlledLanes(self, tlsID):
202        """getControlledLanes(string) -> tuple(string)
203
204        Returns the tuple of lanes which are controlled by the named traffic light.
205        """
206        return self._getUniversal(tc.TL_CONTROLLED_LANES, tlsID)
207
208    def getControlledLinks(self, tlsID):
209        """getControlledLinks(string) -> tuple(tuple(tuple(string)))
210
211        Returns the links controlled by the traffic light, sorted by the signal index and described by giving
212        the incoming, outgoing, and via lane.
213        """
214        return self._getUniversal(tc.TL_CONTROLLED_LINKS, tlsID)
215
216    def getProgram(self, tlsID):
217        """getProgram(string) -> string
218
219        Returns the id of the current program.
220        """
221        return self._getUniversal(tc.TL_CURRENT_PROGRAM, tlsID)
222
223    def getPhase(self, tlsID):
224        """getPhase(string) -> integer
225
226        Returns the index of the current phase within the list of all phases of
227        the current program.
228        """
229        return self._getUniversal(tc.TL_CURRENT_PHASE, tlsID)
230
231    def getPhaseName(self, tlsID):
232        """getPhaseName(string) -> string
233        Returns the name of the current phase.
234        """
235        return self._getUniversal(tc.VAR_NAME, tlsID)
236
    def getNextSwitch(self, tlsID):
        """getNextSwitch(string) -> double

        Returns the absolute simulation time at which the traffic light is
        scheduled to switch to the next phase (in seconds).
        """
        return self._getUniversal(tc.TL_NEXT_SWITCH, tlsID)
244
245    def getSpentDuration(self, tlsID):
246        """getSpentDuration(string) -> double
247
248        Returns the time in seconds for which the current phase has been active
249        """
250        return self._getUniversal(tc.TL_SPENT_DURATION, tlsID)
251
252    def getPhaseDuration(self, tlsID):
253        """getPhaseDuration(string) -> double
254
255        Returns the total duration of the current phase (in seconds). This value
256        is not affected by the elapsed or remaining duration of the current phase.
257        """
258        return self._getUniversal(tc.TL_PHASE_DURATION, tlsID)
259
260    def getServedPersonCount(self, tlsID, index):
261        """getServedPersonCount(string, int) -> int
262        Returns the number of persons that would be served in the given phase
263        """
264        return self._getUniversal(tc.VAR_PERSON_NUMBER, tlsID, "i", index)
265
266    def getBlockingVehicles(self, tlsID, linkIndex):
267        """getBlockingVehicles(string, int) -> tuple(string)
268        Returns the tuple of vehicles that are blocking the subsequent block for
269        the given tls-linkIndex
270        """
271        return self._getUniversal(tc.TL_BLOCKING_VEHICLES, tlsID, "i", linkIndex)
272
273    def getRivalVehicles(self, tlsID, linkIndex):
274        """getRivalVehicles(string, int) -> tuple(string)
275        Returns the tuple of vehicles that also wish to enter the subsequent block for
276        the given tls-linkIndex (regardless of priority)
277        """
278        return self._getUniversal(tc.TL_RIVAL_VEHICLES, tlsID, "i", linkIndex)
279
280    def getPriorityVehicles(self, tlsID, linkIndex):
281        """getPriorityVehicles(string, int) -> tuple(string)
282        Returns the tuple of vehicles that also wish to enter the subsequent block for
283        the given tls-linkIndex (only those with higher priority)
284        """
285        return self._getUniversal(tc.TL_PRIORITY_VEHICLES, tlsID, "i", linkIndex)
286
287    def getConstraints(self, tlsID, tripId=""):
288        """getConstraints(string, string) -> tuple(Constraint)
289        Returns the tuple of rail signal constraints for the given rail signal.
290        If tripId is not "", only constraints with the given tripId are
291        returned. Otherwise, all constraints are returned
292        """
293        return self._getUniversal(tc.TL_CONSTRAINT, tlsID, "s", tripId)
294
295    def getConstraintsByFoe(self, foeSignal, foeId=""):
296        """getConstraintsByFoe(string, string) -> tuple(Constraint)
297        Returns the tuple of rail signal constraints that have the given rail
298        signal id as their foeSignal.
299        If foeId is not "", only constraints with the given foeId are
300        returned. Otherwise, all constraints are returned
301        """
302        return self._getUniversal(tc.TL_CONSTRAINT_BYFOE, foeSignal, "s", foeId)
303
304    def setRedYellowGreenState(self, tlsID, state):
305        """setRedYellowGreenState(string, string) -> None
306
307        Sets the named tl's state as a tuple of light definitions from
308        rugGyYuoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has
309        to decelerate.
310        """
311        self._setCmd(tc.TL_RED_YELLOW_GREEN_STATE, tlsID, "s", state)
312
313    def setLinkState(self, tlsID, tlsLinkIndex, state):
314        """setLinkState(string, string, int, string) -> None
315        Sets the state for the given tls and link index. The state must be one
316        of rRgGyYoOu for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has
317        to decelerate.
318        The link index is shown in the GUI when setting the appropriate junction
319        visualization option.
320        """
321        fullState = list(self.getRedYellowGreenState(tlsID))
322        if tlsLinkIndex >= len(fullState):
323            raise TraCIException("Invalid tlsLinkIndex %s for tls '%s' with maximum index %s." % (
324                tlsLinkIndex, tlsID, len(fullState) - 1))
325        else:
326            fullState[tlsLinkIndex] = state
327            self.setRedYellowGreenState(tlsID, ''.join(fullState))
328
329    def setPhase(self, tlsID, index):
330        """setPhase(string, integer) -> None
331
332        Switches to the phase with the given index in the list of all phases for
333        the current program.
334        """
335        self._setCmd(tc.TL_PHASE_INDEX, tlsID, "i", index)
336
337    def setPhaseName(self, tlsID, name):
338        """setPhaseName(string, string) -> None
339
340        Sets the name of the current phase within the current program
341        """
342        self._setCmd(tc.VAR_NAME, tlsID, "s", name)
343
344    def setProgram(self, tlsID, programID):
345        """setProgram(string, string) -> None
346
347        Switches to the program with the given programID. The program must have
348        been loaded earlier. The special value 'off' can always be used to
349        switch off the traffic light.
350        """
351        self._setCmd(tc.TL_PROGRAM, tlsID, "s", programID)
352
353    def getNemaPhaseCalls(self, tlsID):
354        """getNemaPhaseCalls(string) -> tuple(string)
355        Get the vehicle calls for the phases.
356        The output is vehicle calls (coming from the detectors) for the phases.
357        """
358        vehCallStr = self.getParameter(tlsID, "NEMA.phaseCall")
359        return tuple(vehCallStr.split(","))
360
361    def setNemaSplits(self, tlsID, splits):
362        """setNemaSplits(string, list(string)) -> None
363        Set a new set of splits to the given NEMA-controller.
364        This function is only effective for NEMA type of controllers.
365        The new splits will be implemented in the next cycle of the control.
366        The value must be a space-separated list of 8 numbers with each number
367        being the time in seconds for NEMA-phases 1 to 8.
368        Time 0 must be used of the phase does not exists.
369        Example: “11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0" (gives total cycle length of 80s)
370        """
371        if isinstance(splits, list):
372            splits = ' '.join(map(str, splits))
373        self.setParameter(tlsID, "NEMA.splits", splits)
374
375    def setNemaMaxGreens(self, tlsID, maxGreens):
376        """setNemaMaxGreens(string, list(string)) -> None
377        Set a new set of splits to the given NEMA-controller.
378        This function is only effective for NEMA type of controllers.
379        The new max green will be implemented in the next cycle of the control.
380        The value must be a space-separated list of 8 numbers with each number
381        being the time in seconds for NEMA-phases 1 to 8.
382        Time 0 must be used of the phase does not exists.
383        Example: “11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0"
384        """
385        if isinstance(maxGreens, list):
386            maxGreens = ' '.join(map(str, maxGreens))
387        self.setParameter(tlsID, "NEMA.maxGreens", maxGreens)
388
    def setNemaCycleLength(self, tlsID, cycleLength):
        """setNemaCycleLength(string, double) -> None
        Set a new cycle length to the given NEMA-controller.
        This function is only effective for NEMA type of controllers.
        The new cycle length will be implemented in the next cycle of the control.
        This function should be used with setNemaSplits or setNemaMaxGreens.
        The value is converted with str() before it is stored as parameter.
        """
        self.setParameter(tlsID, "NEMA.cycleLength", str(cycleLength))
397
398    def setNemaOffset(self, tlsID, offset):
399        """setNemaOffset(string, double) -> None
400        Set a new offset to the given NEMA-controller.
401        This function is only effective for NEMA type controllers when operating under coordinated mode.
402        The new offset will be implemented in the next cycle of the control by adjusting
403        the actual green time of the coordinated phase.
404        There is no transition implemented in the NEMA-controller for changing the offset.
405        It’s expected that the users will control the change of the offset in each cycle
406        to implement their own transition algorithm.
407        """
408        self.setParameter(tlsID, "NEMA.offset", str(offset))
409
    def setPhaseDuration(self, tlsID, phaseDuration):
        """setPhaseDuration(string, double) -> None

        Set the remaining phase duration of the current phase in seconds.
        This value has no effect on subsequent repetitions of this phase.
        """
        self._setCmd(tc.TL_PHASE_DURATION, tlsID, "d", phaseDuration)
417
418    @alias_param("logic", "tls")
419    def setProgramLogic(self, tlsID, logic):
420        """setProgramLogic(string, Logic) -> None
421
422        Sets a new program for the given tlsID from a Logic object.
423        See getAllProgramLogics which returns a tuple of Logic objects.
424        """
425        format = "tsiit"
426        values = [5, logic.programID, logic.type, logic.currentPhaseIndex, len(logic.phases)]
427        for p in logic.phases:
428            format += "tdsddt"
429            values += [6, p.duration, p.state, p.minDur, p.maxDur, len(p.next)]
430            for n in p.next:
431                format += "i"
432                values += [n]
433            format += "s"
434            values += [p.name]
435        # subparams
436        format += "t"
437        values += [len(logic.subParameter)]
438        for par in logic.subParameter.items():
439            format += "l"
440            values += [par]
441        self._setCmd(tc.TL_COMPLETE_PROGRAM_RYG, tlsID, format, *values)
442
443    setCompleteRedYellowGreenDefinition = deprecated("setCompleteRedYellowGreenDefinition")(setProgramLogic)
444
445    def addConstraint(self, tlsID, tripId, foeSignal, foeId, type=0, limit=0):
446        """addConstraint(string, string, string, string, int, int) -> None
447        Add the given constraint.
448        """
449        self._setCmd(tc.TL_CONSTRAINT_ADD, tlsID, "tsssii", 5, tripId, foeSignal, foeId, type, limit)
450
451    def swapConstraints(self, tlsID, tripId, foeSignal, foeId):
452        """swapConstraints(string, string, string, string) -> tuple(Constraint)
453        Reverse the given constraint and return a tuple of new constraints that
454        were created (by swapping) to avoid deadlock.
455        """
456        return self._getUniversal(tc.TL_CONSTRAINT_SWAP, tlsID, "tsss", 3, tripId, foeSignal, foeId)
457
458    def removeConstraints(self, tlsID, tripId, foeSignal, foeId):
459        """removeConstraints(string, string, string, string)
460        remove constraints with the given values. Any combination of inputs may
461        be set to "" to act as a wildcard filter """
462        self._setCmd(tc.TL_CONSTRAINT_REMOVE, tlsID, "tsss", 3, tripId, foeSignal, foeId)
463
    def updateConstraints(self, vehID, tripId=""):
        """updateConstraints(string, string) -> None
        Removes all constraints that can no longer be met because the route of
        vehID does not pass traffic light of the constraint with the given tripId.
        This includes constraints on tripId as well as constraints where tripId
        is the foeId.
        If tripId is "", the current tripId of vehID is used.
        """
        self._setCmd(tc.TL_CONSTRAINT_UPDATE, vehID, "s", tripId)
TrafficLightDomain(name='trafficlight', deprecatedFor=None)
176    def __init__(self, name="trafficlight", deprecatedFor=None):
177        Domain.__init__(self, name, tc.CMD_GET_TL_VARIABLE, tc.CMD_SET_TL_VARIABLE,
178                        tc.CMD_SUBSCRIBE_TL_VARIABLE, tc.RESPONSE_SUBSCRIBE_TL_VARIABLE,
179                        tc.CMD_SUBSCRIBE_TL_CONTEXT, tc.RESPONSE_SUBSCRIBE_TL_CONTEXT,
180                        _RETURN_VALUE_FUNC, deprecatedFor)
def getRedYellowGreenState(self, tlsID):
182    def getRedYellowGreenState(self, tlsID):
183        """getRedYellowGreenState(string) -> string
184
185        Returns the named tl's state as a tuple of light definitions from
186        rugGyYoO, for red, yed-yellow, green, yellow, off, where lower case letters mean that the stream
187        has to decelerate.
188        """
189        return self._getUniversal(tc.TL_RED_YELLOW_GREEN_STATE, tlsID)

getRedYellowGreenState(string) -> string

Returns the named tl's state as a string of light definitions from rugGyYoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has to decelerate.

def getAllProgramLogics(self, tlsID):
191    def getAllProgramLogics(self, tlsID):
192        """getAllProgramLogics(string) -> tuple(Logic)
193
194        Returns a tuple of Logic objects.
195        Each Logic encodes a traffic light program for the given tlsID.
196        """
197        return self._getUniversal(tc.TL_COMPLETE_DEFINITION_RYG, tlsID)

getAllProgramLogics(string) -> tuple(Logic)

Returns a tuple of Logic objects. Each Logic encodes a traffic light program for the given tlsID.

def getCompleteRedYellowGreenDefinition(self, tlsID):
191    def getAllProgramLogics(self, tlsID):
192        """getAllProgramLogics(string) -> tuple(Logic)
193
194        Returns a tuple of Logic objects.
195        Each Logic encodes a traffic light program for the given tlsID.
196        """
197        return self._getUniversal(tc.TL_COMPLETE_DEFINITION_RYG, tlsID)

getAllProgramLogics(string) -> tuple(Logic)

Returns a tuple of Logic objects. Each Logic encodes a traffic light program for the given tlsID.

def getControlledLanes(self, tlsID):
201    def getControlledLanes(self, tlsID):
202        """getControlledLanes(string) -> tuple(string)
203
204        Returns the tuple of lanes which are controlled by the named traffic light.
205        """
206        return self._getUniversal(tc.TL_CONTROLLED_LANES, tlsID)

getControlledLanes(string) -> tuple(string)

Returns the tuple of lanes which are controlled by the named traffic light.

def getProgram(self, tlsID):
216    def getProgram(self, tlsID):
217        """getProgram(string) -> string
218
219        Returns the id of the current program.
220        """
221        return self._getUniversal(tc.TL_CURRENT_PROGRAM, tlsID)

getProgram(string) -> string

Returns the id of the current program.

def getPhase(self, tlsID):
223    def getPhase(self, tlsID):
224        """getPhase(string) -> integer
225
226        Returns the index of the current phase within the list of all phases of
227        the current program.
228        """
229        return self._getUniversal(tc.TL_CURRENT_PHASE, tlsID)

getPhase(string) -> integer

Returns the index of the current phase within the list of all phases of the current program.

def getPhaseName(self, tlsID):
231    def getPhaseName(self, tlsID):
232        """getPhaseName(string) -> string
233        Returns the name of the current phase.
234        """
235        return self._getUniversal(tc.VAR_NAME, tlsID)

getPhaseName(string) -> string Returns the name of the current phase.

def getNextSwitch(self, tlsID):
237    def getNextSwitch(self, tlsID):
238        """getNextSwitch(string) -> double
239
240        Returns the absolute simulation time at which the traffic light is
241        schedule to switch to the next phase (in seconds).
242        """
243        return self._getUniversal(tc.TL_NEXT_SWITCH, tlsID)

getNextSwitch(string) -> double

Returns the absolute simulation time at which the traffic light is scheduled to switch to the next phase (in seconds).

def getSpentDuration(self, tlsID):
245    def getSpentDuration(self, tlsID):
246        """getSpentDuration(string) -> double
247
248        Returns the time in seconds for which the current phase has been active
249        """
250        return self._getUniversal(tc.TL_SPENT_DURATION, tlsID)

getSpentDuration(string) -> double

Returns the time in seconds for which the current phase has been active

def getPhaseDuration(self, tlsID):
252    def getPhaseDuration(self, tlsID):
253        """getPhaseDuration(string) -> double
254
255        Returns the total duration of the current phase (in seconds). This value
256        is not affected by the elapsed or remaining duration of the current phase.
257        """
258        return self._getUniversal(tc.TL_PHASE_DURATION, tlsID)

getPhaseDuration(string) -> double

Returns the total duration of the current phase (in seconds). This value is not affected by the elapsed or remaining duration of the current phase.

def getServedPersonCount(self, tlsID, index):
260    def getServedPersonCount(self, tlsID, index):
261        """getServedPersonCount(string, int) -> int
262        Returns the number of persons that would be served in the given phase
263        """
264        return self._getUniversal(tc.VAR_PERSON_NUMBER, tlsID, "i", index)

getServedPersonCount(string, int) -> int Returns the number of persons that would be served in the given phase

def getBlockingVehicles(self, tlsID, linkIndex):
266    def getBlockingVehicles(self, tlsID, linkIndex):
267        """getBlockingVehicles(string, int) -> tuple(string)
268        Returns the tuple of vehicles that are blocking the subsequent block for
269        the given tls-linkIndex
270        """
271        return self._getUniversal(tc.TL_BLOCKING_VEHICLES, tlsID, "i", linkIndex)

getBlockingVehicles(string, int) -> tuple(string) Returns the tuple of vehicles that are blocking the subsequent block for the given tls-linkIndex

def getRivalVehicles(self, tlsID, linkIndex):
273    def getRivalVehicles(self, tlsID, linkIndex):
274        """getRivalVehicles(string, int) -> tuple(string)
275        Returns the tuple of vehicles that also wish to enter the subsequent block for
276        the given tls-linkIndex (regardless of priority)
277        """
278        return self._getUniversal(tc.TL_RIVAL_VEHICLES, tlsID, "i", linkIndex)

getRivalVehicles(string, int) -> tuple(string) Returns the tuple of vehicles that also wish to enter the subsequent block for the given tls-linkIndex (regardless of priority)

def getPriorityVehicles(self, tlsID, linkIndex):
280    def getPriorityVehicles(self, tlsID, linkIndex):
281        """getPriorityVehicles(string, int) -> tuple(string)
282        Returns the tuple of vehicles that also wish to enter the subsequent block for
283        the given tls-linkIndex (only those with higher priority)
284        """
285        return self._getUniversal(tc.TL_PRIORITY_VEHICLES, tlsID, "i", linkIndex)

getPriorityVehicles(string, int) -> tuple(string) Returns the tuple of vehicles that also wish to enter the subsequent block for the given tls-linkIndex (only those with higher priority)

def getConstraints(self, tlsID, tripId=''):
287    def getConstraints(self, tlsID, tripId=""):
288        """getConstraints(string, string) -> tuple(Constraint)
289        Returns the tuple of rail signal constraints for the given rail signal.
290        If tripId is not "", only constraints with the given tripId are
291        returned. Otherwise, all constraints are returned
292        """
293        return self._getUniversal(tc.TL_CONSTRAINT, tlsID, "s", tripId)

getConstraints(string, string) -> tuple(Constraint) Returns the tuple of rail signal constraints for the given rail signal. If tripId is not "", only constraints with the given tripId are returned. Otherwise, all constraints are returned

def getConstraintsByFoe(self, foeSignal, foeId=''):
295    def getConstraintsByFoe(self, foeSignal, foeId=""):
296        """getConstraintsByFoe(string, string) -> tuple(Constraint)
297        Returns the tuple of rail signal constraints that have the given rail
298        signal id as their foeSignal.
299        If foeId is not "", only constraints with the given foeId are
300        returned. Otherwise, all constraints are returned
301        """
302        return self._getUniversal(tc.TL_CONSTRAINT_BYFOE, foeSignal, "s", foeId)

getConstraintsByFoe(string, string) -> tuple(Constraint) Returns the tuple of rail signal constraints that have the given rail signal id as their foeSignal. If foeId is not "", only constraints with the given foeId are returned. Otherwise, all constraints are returned

def setRedYellowGreenState(self, tlsID, state):
304    def setRedYellowGreenState(self, tlsID, state):
305        """setRedYellowGreenState(string, string) -> None
306
307        Sets the named tl's state as a tuple of light definitions from
308        rugGyYuoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has
309        to decelerate.
310        """
311        self._setCmd(tc.TL_RED_YELLOW_GREEN_STATE, tlsID, "s", state)

setRedYellowGreenState(string, string) -> None

Sets the named tl's state as a tuple of light definitions from rugGyYuoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has to decelerate.

def setLinkState(self, tlsID, tlsLinkIndex, state):
313    def setLinkState(self, tlsID, tlsLinkIndex, state):
314        """setLinkState(string, string, int, string) -> None
315        Sets the state for the given tls and link index. The state must be one
316        of rRgGyYoOu for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has
317        to decelerate.
318        The link index is shown in the GUI when setting the appropriate junction
319        visualization option.
320        """
321        fullState = list(self.getRedYellowGreenState(tlsID))
322        if tlsLinkIndex >= len(fullState):
323            raise TraCIException("Invalid tlsLinkIndex %s for tls '%s' with maximum index %s." % (
324                tlsLinkIndex, tlsID, len(fullState) - 1))
325        else:
326            fullState[tlsLinkIndex] = state
327            self.setRedYellowGreenState(tlsID, ''.join(fullState))

setLinkState(string, int, string) -> None Sets the state for the given tls and link index. The state must be one of rRgGyYoOu for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has to decelerate. The link index is shown in the GUI when setting the appropriate junction visualization option.

def setPhase(self, tlsID, index):
329    def setPhase(self, tlsID, index):
330        """setPhase(string, integer) -> None
331
332        Switches to the phase with the given index in the list of all phases for
333        the current program.
334        """
335        self._setCmd(tc.TL_PHASE_INDEX, tlsID, "i", index)

setPhase(string, integer) -> None

Switches to the phase with the given index in the list of all phases for the current program.

def setPhaseName(self, tlsID, name):
337    def setPhaseName(self, tlsID, name):
338        """setPhaseName(string, string) -> None
339
340        Sets the name of the current phase within the current program
341        """
342        self._setCmd(tc.VAR_NAME, tlsID, "s", name)

setPhaseName(string, string) -> None

Sets the name of the current phase within the current program

def setProgram(self, tlsID, programID):
344    def setProgram(self, tlsID, programID):
345        """setProgram(string, string) -> None
346
347        Switches to the program with the given programID. The program must have
348        been loaded earlier. The special value 'off' can always be used to
349        switch off the traffic light.
350        """
351        self._setCmd(tc.TL_PROGRAM, tlsID, "s", programID)

setProgram(string, string) -> None

Switches to the program with the given programID. The program must have been loaded earlier. The special value 'off' can always be used to switch off the traffic light.

def getNemaPhaseCalls(self, tlsID):
353    def getNemaPhaseCalls(self, tlsID):
354        """getNemaPhaseCalls(string) -> tuple(string)
355        Get the vehicle calls for the phases.
356        The output is vehicle calls (coming from the detectors) for the phases.
357        """
358        vehCallStr = self.getParameter(tlsID, "NEMA.phaseCall")
359        return tuple(vehCallStr.split(","))

getNemaPhaseCalls(string) -> tuple(string) Get the vehicle calls for the phases. The output is vehicle calls (coming from the detectors) for the phases.

def setNemaSplits(self, tlsID, splits):
361    def setNemaSplits(self, tlsID, splits):
362        """setNemaSplits(string, list(string)) -> None
363        Set a new set of splits to the given NEMA-controller.
364        This function is only effective for NEMA type of controllers.
365        The new splits will be implemented in the next cycle of the control.
366        The value must be a space-separated list of 8 numbers with each number
367        being the time in seconds for NEMA-phases 1 to 8.
368        Time 0 must be used of the phase does not exists.
369        Example: “11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0" (gives total cycle length of 80s)
370        """
371        if isinstance(splits, list):
372            splits = ' '.join(map(str, splits))
373        self.setParameter(tlsID, "NEMA.splits", splits)

setNemaSplits(string, list(string)) -> None Set a new set of splits to the given NEMA-controller. This function is only effective for NEMA type of controllers. The new splits will be implemented in the next cycle of the control. The value must be a space-separated list of 8 numbers with each number being the time in seconds for NEMA-phases 1 to 8. Time 0 must be used if the phase does not exist. Example: "11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0" (gives total cycle length of 80s)

def setNemaMaxGreens(self, tlsID, maxGreens):
375    def setNemaMaxGreens(self, tlsID, maxGreens):
376        """setNemaMaxGreens(string, list(string)) -> None
377        Set a new set of splits to the given NEMA-controller.
378        This function is only effective for NEMA type of controllers.
379        The new max green will be implemented in the next cycle of the control.
380        The value must be a space-separated list of 8 numbers with each number
381        being the time in seconds for NEMA-phases 1 to 8.
382        Time 0 must be used of the phase does not exists.
383        Example: “11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0"
384        """
385        if isinstance(maxGreens, list):
386            maxGreens = ' '.join(map(str, maxGreens))
387        self.setParameter(tlsID, "NEMA.maxGreens", maxGreens)

setNemaMaxGreens(string, list(string)) -> None Set a new set of max greens to the given NEMA-controller. This function is only effective for NEMA type of controllers. The new max green will be implemented in the next cycle of the control. The value must be a space-separated list of 8 numbers with each number being the time in seconds for NEMA-phases 1 to 8. Time 0 must be used if the phase does not exist. Example: "11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0"

def setNemaCycleLength(self, tlsID, cycleLength):
389    def setNemaCycleLength(self, tlsID, cycleLength):
390        """setNemaCycleLength(string, double) -> None
391        Set a new cycle length to the given NEMA-controller.
392        This function is only effective for NEMA type of controllers.
393        The new cycle length will be implemented in the next cycle of the control.
394        This function should be used with setNemaSplits or setNemaMaxGreen.
395        """
396        self.setParameter(tlsID, "NEMA.cycleLength", str(cycleLength))

setNemaCycleLength(string, double) -> None Set a new cycle length to the given NEMA-controller. This function is only effective for NEMA type of controllers. The new cycle length will be implemented in the next cycle of the control. This function should be used with setNemaSplits or setNemaMaxGreens.

def setNemaOffset(self, tlsID, offset):
398    def setNemaOffset(self, tlsID, offset):
399        """setNemaOffset(string, double) -> None
400        Set a new offset to the given NEMA-controller.
401        This function is only effective for NEMA type controllers when operating under coordinated mode.
402        The new offset will be implemented in the next cycle of the control by adjusting
403        the actual green time of the coordinated phase.
404        There is no transition implemented in the NEMA-controller for changing the offset.
405        It’s expected that the users will control the change of the offset in each cycle
406        to implement their own transition algorithm.
407        """
408        self.setParameter(tlsID, "NEMA.offset", str(offset))

setNemaOffset(string, double) -> None Set a new offset to the given NEMA-controller. This function is only effective for NEMA type controllers when operating under coordinated mode. The new offset will be implemented in the next cycle of the control by adjusting the actual green time of the coordinated phase. There is no transition implemented in the NEMA-controller for changing the offset. It’s expected that the users will control the change of the offset in each cycle to implement their own transition algorithm.

def setPhaseDuration(self, tlsID, phaseDuration):
410    def setPhaseDuration(self, tlsID, phaseDuration):
411        """setPhaseDuration(string, double) -> None
412
413        Set the remaining phase duration of the current phase in seconds.
414        This value has no effect on subsquent repetitions of this phase.
415        """
416        self._setCmd(tc.TL_PHASE_DURATION, tlsID, "d", phaseDuration)

setPhaseDuration(string, double) -> None

Set the remaining phase duration of the current phase in seconds. This value has no effect on subsequent repetitions of this phase.

@alias_param('logic', 'tls')
def setProgramLogic(self, tlsID, logic):
418    @alias_param("logic", "tls")
419    def setProgramLogic(self, tlsID, logic):
420        """setProgramLogic(string, Logic) -> None
421
422        Sets a new program for the given tlsID from a Logic object.
423        See getAllProgramLogics which returns a tuple of Logic objects.
424        """
425        format = "tsiit"
426        values = [5, logic.programID, logic.type, logic.currentPhaseIndex, len(logic.phases)]
427        for p in logic.phases:
428            format += "tdsddt"
429            values += [6, p.duration, p.state, p.minDur, p.maxDur, len(p.next)]
430            for n in p.next:
431                format += "i"
432                values += [n]
433            format += "s"
434            values += [p.name]
435        # subparams
436        format += "t"
437        values += [len(logic.subParameter)]
438        for par in logic.subParameter.items():
439            format += "l"
440            values += [par]
441        self._setCmd(tc.TL_COMPLETE_PROGRAM_RYG, tlsID, format, *values)

setProgramLogic(string, Logic) -> None

Sets a new program for the given tlsID from a Logic object. See getAllProgramLogics which returns a tuple of Logic objects.

@alias_param('logic', 'tls')
def setCompleteRedYellowGreenDefinition(self, tlsID, logic):
418    @alias_param("logic", "tls")
419    def setProgramLogic(self, tlsID, logic):
420        """setProgramLogic(string, Logic) -> None
421
422        Sets a new program for the given tlsID from a Logic object.
423        See getAllProgramLogics which returns a tuple of Logic objects.
424        """
425        format = "tsiit"
426        values = [5, logic.programID, logic.type, logic.currentPhaseIndex, len(logic.phases)]
427        for p in logic.phases:
428            format += "tdsddt"
429            values += [6, p.duration, p.state, p.minDur, p.maxDur, len(p.next)]
430            for n in p.next:
431                format += "i"
432                values += [n]
433            format += "s"
434            values += [p.name]
435        # subparams
436        format += "t"
437        values += [len(logic.subParameter)]
438        for par in logic.subParameter.items():
439            format += "l"
440            values += [par]
441        self._setCmd(tc.TL_COMPLETE_PROGRAM_RYG, tlsID, format, *values)

setProgramLogic(string, Logic) -> None

Sets a new program for the given tlsID from a Logic object. See getAllProgramLogics which returns a tuple of Logic objects.

def addConstraint(self, tlsID, tripId, foeSignal, foeId, type=0, limit=0):
445    def addConstraint(self, tlsID, tripId, foeSignal, foeId, type=0, limit=0):
446        """addConstraint(string, string, string, string, int, int) -> None
447        Add the given constraint.
448        """
449        self._setCmd(tc.TL_CONSTRAINT_ADD, tlsID, "tsssii", 5, tripId, foeSignal, foeId, type, limit)

addConstraint(string, string, string, string, int, int) -> None Add the given constraint.

def swapConstraints(self, tlsID, tripId, foeSignal, foeId):
451    def swapConstraints(self, tlsID, tripId, foeSignal, foeId):
452        """swapConstraints(string, string, string, string) -> tuple(Constraint)
453        Reverse the given constraint and return a tuple of new constraints that
454        were created (by swapping) to avoid deadlock.
455        """
456        return self._getUniversal(tc.TL_CONSTRAINT_SWAP, tlsID, "tsss", 3, tripId, foeSignal, foeId)

swapConstraints(string, string, string, string) -> tuple(Constraint) Reverse the given constraint and return a tuple of new constraints that were created (by swapping) to avoid deadlock.

def removeConstraints(self, tlsID, tripId, foeSignal, foeId):
458    def removeConstraints(self, tlsID, tripId, foeSignal, foeId):
459        """removeConstraints(string, string, string, string)
460        remove constraints with the given values. Any combination of inputs may
461        be set to "" to act as a wildcard filter """
462        self._setCmd(tc.TL_CONSTRAINT_REMOVE, tlsID, "tsss", 3, tripId, foeSignal, foeId)

removeConstraints(string, string, string, string) remove constraints with the given values. Any combination of inputs may be set to "" to act as a wildcard filter

def updateConstraints(self, vehID, tripId=''):
464    def updateConstraints(self, vehID, tripId=""):
465        """getConstraints(string, string)
466        Removes all constraints that can no longer be met because the route of
467        vehID does not pass traffic light of the constraint with the given tripId.
468        This includes constraints on tripId as well as constraints where tripId
469        is the foeId.
470        If tripId is "", the current tripId of vehID is used.
471        """
472        self._setCmd(tc.TL_CONSTRAINT_UPDATE, vehID, "s", tripId)

updateConstraints(string, string) Removes all constraints that can no longer be met because the route of vehID does not pass the traffic light of the constraint with the given tripId. This includes constraints on tripId as well as constraints where tripId is the foeId. If tripId is "", the current tripId of vehID is used.

class TrafficLightDomain.Phase:
class Phase:
    """A single phase of a traffic light program."""

    def __init__(self, duration, state, minDur=None, maxDur=None, next=tuple(), name="", earlyTarget=""):
        """
        Constructs a traffic light phase
        duration (float): the duration of the phase in seconds
        state (string): the state codes for each controlled link
        minDur (float): the minimum duration (ignored by static tls)
        maxDur (float): the maximum duration (ignored by static tls)
        next (intList): possible successor phase (optional)
        name (string): the name of the phase
        earlyTarget (string): early switching to phase with the given index(es)
        """
        # a bound that was not given falls back to the plain duration
        if minDur is None:
            minDur = duration
        if maxDur is None:
            maxDur = duration
        self.duration = duration
        self.state = state
        self.minDur = minDur
        self.maxDur = maxDur
        self.next = next
        self.name = name
        self.earlyTarget = earlyTarget

    def __repr__(self):
        # optional attributes are listed only when they are set (truthy)
        optional = []
        if self.name:
            optional.append(", name='%s'" % self.name)
        if self.next:
            optional.append(", next='%s'" % str(self.next))
        if self.earlyTarget:
            optional.append(", earlyTarget='%s'" % self.earlyTarget)
        mandatory = ("duration=%s, state='%s', minDur=%s, maxDur=%s"
                     % (self.duration, self.state, self.minDur, self.maxDur))
        return "Phase(" + mandatory + "".join(optional) + ")"
class TrafficLightDomain.Logic:
class Logic:
    """A traffic light program: its id, type, current phase index, phases and parameters."""

    def __init__(self, programID, type, currentPhaseIndex, phases=None, subParameter=None):
        """
        Constructs a new Logic object with the following arguments:
          programID (string) - arbitrary
          type (integer) - one of
             tc.TRAFFICLIGHT_TYPE_STATIC
             tc.TRAFFICLIGHT_TYPE_ACTUATED
             tc.TRAFFICLIGHT_TYPE_NEMA
             tc.TRAFFICLIGHT_TYPE_DELAYBASED
          currentPhaseIndex (integer) - must be in range [0, len(phases) - 1]
          phases (list of Phase objects)
          subParameter (dict) - corresponds to generic <param> objects
        """
        # every instance gets its own fresh containers when none are passed
        if phases is None:
            phases = []
        if subParameter is None:
            subParameter = {}
        self.programID = programID
        self.type = type
        self.currentPhaseIndex = currentPhaseIndex
        self.phases = phases
        self.subParameter = subParameter

    def getPhases(self):
        """Return the phases of this program."""
        return self.phases

    def getSubID(self):
        """Return the programID of this program."""
        return self.programID

    def getType(self):
        """Return the type of this program."""
        return self.type

    def getParameters(self):
        """Return all sub-parameters of this program."""
        return self.subParameter

    def getParameter(self, key, default=None):
        """Return the sub-parameter stored under key, or default if there is none."""
        allParams = self.subParameter
        return allParams.get(key, default)

    def __repr__(self):
        template = "Logic(programID='%s', type=%s, currentPhaseIndex=%s, phases=%s, subParameter=%s)"
        fields = (self.programID, self.type, self.currentPhaseIndex, self.phases, self.subParameter)
        return template % fields