traci._trafficlight
# -*- coding: utf-8 -*-
# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
# Copyright (C) 2011-2026 German Aerospace Center (DLR) and others.
# This program and the accompanying materials are made available under the
# terms of the Eclipse Public License 2.0 which is available at
# https://www.eclipse.org/legal/epl-2.0/
# This Source Code may also be made available under the following Secondary
# Licenses when the conditions for such availability set forth in the Eclipse
# Public License 2.0 are satisfied: GNU General Public License, version 2
# or later which is available at
# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later

# @file    _trafficlight.py
# @author  Michael Behrisch
# @date    2011-03-16

from __future__ import absolute_import
from sumolib.net import Phase
from .domain import Domain
from . import constants as tc
from .exceptions import TraCIException, deprecated, alias_param


class Logic:
    """Plain container describing one traffic light program."""

    def __init__(self, programID, type, currentPhaseIndex, phases=None, subParameter=None):
        """
        Constructs a new Logic object with the following arguments:
          programID (string) - arbitrary
          type (integer) - one of
             tc.TRAFFICLIGHT_TYPE_STATIC
             tc.TRAFFICLIGHT_TYPE_ACTUATED
             tc.TRAFFICLIGHT_TYPE_NEMA
             tc.TRAFFICLIGHT_TYPE_DELAYBASED
          currentPhaseIndex (integer) - must be in range [0, len(phases) - 1]
          phases (list of Phase objects) - a new empty list if omitted
          subParameter (dict) - corresponds to generic <param> objects,
             a new empty dict if omitted
        """
        self.programID = programID
        self.type = type
        self.currentPhaseIndex = currentPhaseIndex
        self.phases = phases if phases is not None else []
        self.subParameter = subParameter if subParameter is not None else {}

    def getPhases(self):
        """Returns the stored phases."""
        return self.phases

    def getSubID(self):
        """Returns the programID."""
        return self.programID

    def getType(self):
        """Returns the stored type value."""
        return self.type

    def getParameters(self):
        """Returns the subParameter dict itself (not a copy)."""
        return self.subParameter

    def getParameter(self, key, default=None):
        """Returns the sub-parameter stored under key or default if there is none."""
        return self.subParameter.get(key, default)

    def __repr__(self):
        return ("Logic(programID='%s', type=%s, currentPhaseIndex=%s, phases=%s, subParameter=%s)" %
                (self.programID, self.type, self.currentPhaseIndex, self.phases, self.subParameter))


def _readLogics(result):
    """Reads the reply to TL_COMPLETE_DEFINITION_RYG and returns a tuple of Logic objects.

    Every logic is read as a compound of programID, type, current phase index,
    a list of phases and a list of key/value parameters. Every phase is read
    as a compound of duration, state, minDur, maxDur, the successor indices
    and the name.
    """
    numLogics = result.readInt()
    logics = []
    for _ in range(numLogics):
        result.readCompound(5)
        programID = result.readTypedString()
        logicType = result.readTypedInt()
        currentPhaseIndex = result.readTypedInt()
        numPhases = result.readCompound()
        phases = []
        for __ in range(numPhases):
            result.readCompound(6)
            duration = result.readTypedDouble()
            state = result.readTypedString()
            minDur = result.readTypedDouble()
            maxDur = result.readTypedDouble()
            numNext = result.readCompound()
            nextPhases = tuple([result.readTypedInt() for ___ in range(numNext)])
            name = result.readTypedString()
            phases.append(Phase(duration, state, minDur, maxDur, nextPhases, name))
        logic = Logic(programID, logicType, currentPhaseIndex, tuple(phases))
        numParams = result.readCompound()
        for __ in range(numParams):
            # every parameter arrives as a string list holding key and value
            key, value = result.readTypedStringList()
            logic.subParameter[key] = value
        logics.append(logic)
    return tuple(logics)


class Constraint:
    """Plain container describing one rail signal constraint."""

    def __init__(self, signalId, tripId, foeId, foeSignal, limit, type, mustWait, active=True, param=None):
        """
        Stores the given values as attributes of the same name.
        If param is omitted (or None) every instance receives its own empty
        dict, so that changes made through getParameters() never leak into
        other Constraint objects.
        """
        self.signalId = signalId
        self.tripId = tripId
        self.foeId = foeId
        self.foeSignal = foeSignal
        self.limit = limit
        self.type = type
        self.mustWait = mustWait
        self.active = active
        self.param = param if param is not None else {}

    def getParameters(self):
        """Returns the param dict itself (not a copy)."""
        return self.param

    def getParameter(self, key, default=None):
        """Returns the parameter stored under key or default if there is none."""
        return self.param.get(key, default)

    def __repr__(self):
        param = ""
        if self.param:
            # sorted keys give a stable representation
            kvs = ["%s=%s" % (k, self.param[k]) for k in sorted(self.param.keys())]
            param = " param=%s" % '|'.join(kvs)
        return ("Constraint(signalId=%s tripId=%s, foeId=%s, foeSignal=%s, limit=%s, type=%s, mustWait=%s, active=%s%s)" %  # noqa
                (self.signalId, self.tripId, self.foeId, self.foeSignal,
                 self.limit, self.type, self.mustWait, self.active, param))


def _readLinks(result):
    """Reads the reply to TL_CONTROLLED_LINKS.

    Returns a tuple with one entry per signal, each entry being a tuple of
    the string lists read for the links of that signal.
    """
    result.readLength()
    numSignals = result.readInt()  # Length
    signals = []
    for _ in range(numSignals):
        # Type of Number of Controlled Links
        result.read("!B")
        # Number of Controlled Links
        nbControlledLinks = result.read("!i")[0]
        controlledLinks = []
        for __ in range(nbControlledLinks):
            result.read("!B")  # Type of Link j
            link = result.readStringList()  # Link j
            controlledLinks.append(link)
        signals.append(tuple(controlledLinks))
    return tuple(signals)


def _readConstraints(result):
    """Reads a list of constraints from the reply and returns a tuple of Constraint objects.

    The parameters of every constraint arrive as one flat string list of
    alternating keys and values.
    """
    result.readLength()
    num = result.readInt()  # Length
    constraints = []
    for _ in range(num):
        signalId = result.readTypedString()
        tripId = result.readTypedString()
        foeId = result.readTypedString()
        foeSignal = result.readTypedString()
        limit = result.readTypedInt()
        constraintType = result.readTypedInt()
        mustWait = bool(result.readTypedByte())
        active = bool(result.readTypedByte())
        paramItems = result.readTypedStringList()
        param = {}
        for i in range(0, len(paramItems), 2):
            param[paramItems[i]] = paramItems[i + 1]
        constraints.append(Constraint(signalId, tripId, foeId, foeSignal, limit,
                                      constraintType, mustWait, active, param))
    return tuple(constraints)


# maps the variables with a non-generic reply layout to their reader function
_RETURN_VALUE_FUNC = {tc.TL_COMPLETE_DEFINITION_RYG: _readLogics,
                      tc.TL_CONTROLLED_LINKS: _readLinks,
                      tc.TL_CONSTRAINT: _readConstraints,
                      tc.TL_CONSTRAINT_BYFOE: _readConstraints,
                      tc.TL_CONSTRAINT_SWAP: _readConstraints,
                      }


class TrafficLightDomain(Domain):
    """TraCI domain giving access to traffic lights and rail signal constraints."""

    Phase = Phase
    Logic = Logic

    def __init__(self, name="trafficlight", deprecatedFor=None):
        Domain.__init__(self, name, tc.CMD_GET_TL_VARIABLE, tc.CMD_SET_TL_VARIABLE,
                        tc.CMD_SUBSCRIBE_TL_VARIABLE, tc.RESPONSE_SUBSCRIBE_TL_VARIABLE,
                        tc.CMD_SUBSCRIBE_TL_CONTEXT, tc.RESPONSE_SUBSCRIBE_TL_CONTEXT,
                        _RETURN_VALUE_FUNC, deprecatedFor)

    def getRedYellowGreenState(self, tlsID):
        """getRedYellowGreenState(string) -> string

        Returns the named tl's state as a string of light definitions from
        rugGyYoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream
        has to decelerate.
        """
        return self._getUniversal(tc.TL_RED_YELLOW_GREEN_STATE, tlsID)

    def getAllProgramLogics(self, tlsID):
        """getAllProgramLogics(string) -> tuple(Logic)

        Returns a tuple of Logic objects.
        Each Logic encodes a traffic light program for the given tlsID.
        """
        return self._getUniversal(tc.TL_COMPLETE_DEFINITION_RYG, tlsID)

    getCompleteRedYellowGreenDefinition = deprecated("getCompleteRedYellowGreenDefinition")(getAllProgramLogics)

    def getControlledLanes(self, tlsID):
        """getControlledLanes(string) -> tuple(string)

        Returns the tuple of lanes which are controlled by the named traffic light.
        """
        return self._getUniversal(tc.TL_CONTROLLED_LANES, tlsID)

    def getControlledLinks(self, tlsID):
        """getControlledLinks(string) -> tuple(tuple(tuple(string)))

        Returns the links controlled by the traffic light, sorted by the signal index and described by giving
        the incoming, outgoing, and via lane.
        """
        return self._getUniversal(tc.TL_CONTROLLED_LINKS, tlsID)

    def getProgram(self, tlsID):
        """getProgram(string) -> string

        Returns the id of the current program.
        """
        return self._getUniversal(tc.TL_CURRENT_PROGRAM, tlsID)

    def getPhase(self, tlsID):
        """getPhase(string) -> integer

        Returns the index of the current phase within the list of all phases of
        the current program.
        """
        return self._getUniversal(tc.TL_CURRENT_PHASE, tlsID)

    def getPhaseName(self, tlsID):
        """getPhaseName(string) -> string

        Returns the name of the current phase.
        """
        return self._getUniversal(tc.VAR_NAME, tlsID)

    def getNextSwitch(self, tlsID):
        """getNextSwitch(string) -> double

        Returns the absolute simulation time at which the traffic light is
        scheduled to switch to the next phase (in seconds).
        """
        return self._getUniversal(tc.TL_NEXT_SWITCH, tlsID)

    def getSpentDuration(self, tlsID):
        """getSpentDuration(string) -> double

        Returns the time in seconds for which the current phase has been active
        """
        return self._getUniversal(tc.TL_SPENT_DURATION, tlsID)

    def getPhaseDuration(self, tlsID):
        """getPhaseDuration(string) -> double

        Returns the total duration of the current phase (in seconds). This value
        is not affected by the elapsed or remaining duration of the current phase.
        """
        return self._getUniversal(tc.TL_PHASE_DURATION, tlsID)

    def getServedPersonCount(self, tlsID, index):
        """getServedPersonCount(string, int) -> int

        Returns the number of persons that would be served in the given phase
        """
        return self._getUniversal(tc.VAR_PERSON_NUMBER, tlsID, "i", index)

    def getBlockingVehicles(self, tlsID, linkIndex):
        """getBlockingVehicles(string, int) -> tuple(string)

        Returns the tuple of vehicles that are blocking the subsequent block for
        the given tls-linkIndex
        """
        return self._getUniversal(tc.TL_BLOCKING_VEHICLES, tlsID, "i", linkIndex)

    def getRivalVehicles(self, tlsID, linkIndex):
        """getRivalVehicles(string, int) -> tuple(string)

        Returns the tuple of vehicles that also wish to enter the subsequent block for
        the given tls-linkIndex (regardless of priority)
        """
        return self._getUniversal(tc.TL_RIVAL_VEHICLES, tlsID, "i", linkIndex)

    def getPriorityVehicles(self, tlsID, linkIndex):
        """getPriorityVehicles(string, int) -> tuple(string)

        Returns the tuple of vehicles that also wish to enter the subsequent block for
        the given tls-linkIndex (only those with higher priority)
        """
        return self._getUniversal(tc.TL_PRIORITY_VEHICLES, tlsID, "i", linkIndex)

    def getConstraints(self, tlsID, tripId=""):
        """getConstraints(string, string) -> tuple(Constraint)

        Returns the tuple of rail signal constraints for the given rail signal.
        If tripId is not "", only constraints with the given tripId are
        returned. Otherwise, all constraints are returned
        """
        return self._getUniversal(tc.TL_CONSTRAINT, tlsID, "s", tripId)

    def getConstraintsByFoe(self, foeSignal, foeId=""):
        """getConstraintsByFoe(string, string) -> tuple(Constraint)

        Returns the tuple of rail signal constraints that have the given rail
        signal id as their foeSignal.
        If foeId is not "", only constraints with the given foeId are
        returned. Otherwise, all constraints are returned
        """
        return self._getUniversal(tc.TL_CONSTRAINT_BYFOE, foeSignal, "s", foeId)

    def setRedYellowGreenState(self, tlsID, state):
        """setRedYellowGreenState(string, string) -> None

        Sets the named tl's state as a string of light definitions from
        rugGyYuoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has
        to decelerate.
        """
        self._setCmd(tc.TL_RED_YELLOW_GREEN_STATE, tlsID, "s", state)

    def setLinkState(self, tlsID, tlsLinkIndex, state):
        """setLinkState(string, int, string) -> None

        Sets the state for the given tls and link index. The state must be one
        of rRgGyYoOu for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has
        to decelerate.
        The link index is shown in the GUI when setting the appropriate junction
        visualization option.
        The current state is retrieved, modified at the given index and sent
        back as a whole. A TraCIException is raised if the index does not
        address a position of the current state.
        """
        fullState = list(self.getRedYellowGreenState(tlsID))
        # negative indices that python list indexing accepts keep working,
        # everything outside is reported as TraCIException (never as IndexError)
        if not -len(fullState) <= tlsLinkIndex < len(fullState):
            raise TraCIException("Invalid tlsLinkIndex %s for tls '%s' with maximum index %s." % (
                tlsLinkIndex, tlsID, len(fullState) - 1))
        fullState[tlsLinkIndex] = state
        self.setRedYellowGreenState(tlsID, ''.join(fullState))

    def setPhase(self, tlsID, index):
        """setPhase(string, integer) -> None

        Switches to the phase with the given index in the list of all phases for
        the current program.
        """
        self._setCmd(tc.TL_PHASE_INDEX, tlsID, "i", index)

    def setPhaseName(self, tlsID, name):
        """setPhaseName(string, string) -> None

        Sets the name of the current phase within the current program
        """
        self._setCmd(tc.VAR_NAME, tlsID, "s", name)

    def setProgram(self, tlsID, programID):
        """setProgram(string, string) -> None

        Switches to the program with the given programID. The program must have
        been loaded earlier. The special value 'off' can always be used to
        switch off the traffic light.
        """
        self._setCmd(tc.TL_PROGRAM, tlsID, "s", programID)

    def getNemaPhaseCalls(self, tlsID):
        """getNemaPhaseCalls(string) -> tuple(string)

        Get the vehicle calls for the phases.
        The output is vehicle calls (coming from the detectors) for the phases.
        The value of the parameter "NEMA.phaseCall" is split at ",".
        """
        vehCallStr = self.getParameter(tlsID, "NEMA.phaseCall")
        return tuple(vehCallStr.split(","))

    def setNemaSplits(self, tlsID, splits):
        """setNemaSplits(string, list(string)) -> None

        Set a new set of splits to the given NEMA-controller.
        This function is only effective for NEMA type of controllers.
        The new splits will be implemented in the next cycle of the control.
        The value must be a space-separated list of 8 numbers with each number
        being the time in seconds for NEMA-phases 1 to 8.
        Time 0 must be used if the phase does not exist.
        Example: "11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0" (gives total cycle length of 80s)
        A list or tuple of values is joined with spaces before sending.
        """
        if isinstance(splits, (list, tuple)):
            splits = ' '.join(map(str, splits))
        self.setParameter(tlsID, "NEMA.splits", splits)

    def setNemaMaxGreens(self, tlsID, maxGreens):
        """setNemaMaxGreens(string, list(string)) -> None

        Set a new set of max greens to the given NEMA-controller.
        This function is only effective for NEMA type of controllers.
        The new max green will be implemented in the next cycle of the control.
        The value must be a space-separated list of 8 numbers with each number
        being the time in seconds for NEMA-phases 1 to 8.
        Time 0 must be used if the phase does not exist.
        Example: "11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0"
        A list or tuple of values is joined with spaces before sending.
        """
        if isinstance(maxGreens, (list, tuple)):
            maxGreens = ' '.join(map(str, maxGreens))
        self.setParameter(tlsID, "NEMA.maxGreens", maxGreens)

    def setNemaCycleLength(self, tlsID, cycleLength):
        """setNemaCycleLength(string, double) -> None

        Set a new cycle length to the given NEMA-controller.
        This function is only effective for NEMA type of controllers.
        The new cycle length will be implemented in the next cycle of the control.
        This function should be used with setNemaSplits or setNemaMaxGreens.
        """
        self.setParameter(tlsID, "NEMA.cycleLength", str(cycleLength))

    def setNemaOffset(self, tlsID, offset):
        """setNemaOffset(string, double) -> None

        Set a new offset to the given NEMA-controller.
        This function is only effective for NEMA type controllers when operating under coordinated mode.
        The new offset will be implemented in the next cycle of the control by adjusting
        the actual green time of the coordinated phase.
        There is no transition implemented in the NEMA-controller for changing the offset.
        It is expected that the users will control the change of the offset in each cycle
        to implement their own transition algorithm.
        """
        self.setParameter(tlsID, "NEMA.offset", str(offset))

    def setPhaseDuration(self, tlsID, phaseDuration):
        """setPhaseDuration(string, double) -> None

        Set the remaining phase duration of the current phase in seconds.
        This value has no effect on subsequent repetitions of this phase.
        """
        self._setCmd(tc.TL_PHASE_DURATION, tlsID, "d", phaseDuration)

    @alias_param("logic", "tls")
    def setProgramLogic(self, tlsID, logic):
        """setProgramLogic(string, Logic) -> None

        Sets a new program for the given tlsID from a Logic object.
        See getAllProgramLogics which returns a tuple of Logic objects.
        """
        # the format pieces are collected in a list and joined once at the end
        formatParts = ["tsiit"]
        values = [5, logic.programID, logic.type, logic.currentPhaseIndex, len(logic.phases)]
        for p in logic.phases:
            formatParts.append("tdsddt")
            values += [6, p.duration, p.state, p.minDur, p.maxDur, len(p.next)]
            for n in p.next:
                formatParts.append("i")
                values.append(n)
            formatParts.append("s")
            values.append(p.name)
        # subparams, every (key, value) pair is sent as one string list
        formatParts.append("t")
        values.append(len(logic.subParameter))
        for par in logic.subParameter.items():
            formatParts.append("l")
            values.append(par)
        self._setCmd(tc.TL_COMPLETE_PROGRAM_RYG, tlsID, "".join(formatParts), *values)

    setCompleteRedYellowGreenDefinition = deprecated("setCompleteRedYellowGreenDefinition")(setProgramLogic)

    def addConstraint(self, tlsID, tripId, foeSignal, foeId, type=0, limit=0):
        """addConstraint(string, string, string, string, int, int) -> None

        Add the given constraint.
        """
        self._setCmd(tc.TL_CONSTRAINT_ADD, tlsID, "tsssii", 5, tripId, foeSignal, foeId, type, limit)

    def swapConstraints(self, tlsID, tripId, foeSignal, foeId):
        """swapConstraints(string, string, string, string) -> tuple(Constraint)

        Reverse the given constraint and return a tuple of new constraints that
        were created (by swapping) to avoid deadlock.
        """
        return self._getUniversal(tc.TL_CONSTRAINT_SWAP, tlsID, "tsss", 3, tripId, foeSignal, foeId)

    def removeConstraints(self, tlsID, tripId, foeSignal, foeId):
        """removeConstraints(string, string, string, string) -> None

        Remove constraints with the given values. Any combination of inputs may
        be set to "" to act as a wildcard filter
        """
        self._setCmd(tc.TL_CONSTRAINT_REMOVE, tlsID, "tsss", 3, tripId, foeSignal, foeId)

    def updateConstraints(self, vehID, tripId=""):
        """updateConstraints(string, string) -> None

        Removes all constraints that can no longer be met because the route of
        vehID does not pass traffic light of the constraint with the given tripId.
        This includes constraints on tripId as well as constraints where tripId
        is the foeId.
        If tripId is "", the current tripId of vehID is used.
        """
        self._setCmd(tc.TL_CONSTRAINT_UPDATE, vehID, "s", tripId)
class Logic:
    """Plain container describing one traffic light program."""

    def __init__(self, programID, type, currentPhaseIndex, phases=None, subParameter=None):
        """
        Creates a Logic from the given values:
          programID (string) - arbitrary
          type (integer) - one of
             tc.TRAFFICLIGHT_TYPE_STATIC
             tc.TRAFFICLIGHT_TYPE_ACTUATED
             tc.TRAFFICLIGHT_TYPE_NEMA
             tc.TRAFFICLIGHT_TYPE_DELAYBASED
          currentPhaseIndex (integer) - must be in range [0, len(phases) - 1]
          phases (list of Phase objects) - a new empty list if omitted
          subParameter (dict) - corresponds to generic <param> objects,
             a new empty dict if omitted
        """
        if phases is None:
            phases = []
        if subParameter is None:
            subParameter = {}
        self.programID = programID
        self.type = type
        self.currentPhaseIndex = currentPhaseIndex
        self.phases = phases
        self.subParameter = subParameter

    def getPhases(self):
        """Returns the stored phases."""
        return self.phases

    def getSubID(self):
        """Returns the programID."""
        return self.programID

    def getType(self):
        """Returns the stored type value."""
        return self.type

    def getParameters(self):
        """Returns the subParameter dict itself (not a copy)."""
        return self.subParameter

    def getParameter(self, key, default=None):
        """Returns the sub-parameter stored under key or default if there is none."""
        params = self.subParameter
        return params.get(key, default)

    def __repr__(self):
        fields = (self.programID, self.type, self.currentPhaseIndex, self.phases, self.subParameter)
        template = "Logic(programID='%s', type=%s, currentPhaseIndex=%s, phases=%s, subParameter=%s)"
        return template % fields
def __init__(self, programID, type, currentPhaseIndex, phases=None, subParameter=None):
    """
    Creates a Logic from the given values:
      programID (string) - arbitrary
      type (integer) - one of
         tc.TRAFFICLIGHT_TYPE_STATIC
         tc.TRAFFICLIGHT_TYPE_ACTUATED
         tc.TRAFFICLIGHT_TYPE_NEMA
         tc.TRAFFICLIGHT_TYPE_DELAYBASED
      currentPhaseIndex (integer) - must be in range [0, len(phases) - 1]
      phases (list of Phase objects) - a new empty list if omitted
      subParameter (dict) - corresponds to generic <param> objects,
         a new empty dict if omitted
    """
    # resolve the omitted containers first, then store everything
    if phases is None:
        phases = []
    if subParameter is None:
        subParameter = {}
    self.programID = programID
    self.type = type
    self.currentPhaseIndex = currentPhaseIndex
    self.phases = phases
    self.subParameter = subParameter
Constructs a new Logic object with the following arguments:
programID (string) - arbitrary;
type (integer) - one of tc.TRAFFICLIGHT_TYPE_STATIC, tc.TRAFFICLIGHT_TYPE_ACTUATED, tc.TRAFFICLIGHT_TYPE_NEMA, tc.TRAFFICLIGHT_TYPE_DELAYBASED;
currentPhaseIndex (integer) - must be in range [0, len(phases) - 1];
phases (list of Phase objects);
subParameter (dict) - corresponds to generic <param> objects
class Constraint:
    """Plain container describing one rail signal constraint."""

    def __init__(self, signalId, tripId, foeId, foeSignal, limit, type, mustWait, active=True, param=None):
        """
        Stores the given values as attributes of the same name.
        If param is omitted (or None) every instance receives its own empty
        dict, so that changes made through getParameters() never leak into
        other Constraint objects.
        """
        self.signalId = signalId
        self.tripId = tripId
        self.foeId = foeId
        self.foeSignal = foeSignal
        self.limit = limit
        self.type = type
        self.mustWait = mustWait
        self.active = active
        # a literal {} default would be shared between all instances
        self.param = param if param is not None else {}

    def getParameters(self):
        """Returns the param dict itself (not a copy)."""
        return self.param

    def getParameter(self, key, default=None):
        """Returns the parameter stored under key or default if there is none."""
        return self.param.get(key, default)

    def __repr__(self):
        param = ""
        if self.param:
            # sorted keys give a stable representation
            kvs = ["%s=%s" % (k, self.param[k]) for k in sorted(self.param.keys())]
            param = " param=%s" % '|'.join(kvs)
        return ("Constraint(signalId=%s tripId=%s, foeId=%s, foeSignal=%s, limit=%s, type=%s, mustWait=%s, active=%s%s)" %  # noqa
                (self.signalId, self.tripId, self.foeId, self.foeSignal,
                 self.limit, self.type, self.mustWait, self.active, param))
def __init__(self, signalId, tripId, foeId, foeSignal, limit, type, mustWait, active=True, param=None):
    """
    Stores the given values as attributes of the same name.
    If param is omitted (or None) every instance receives its own empty
    dict, so that later changes to it never leak into other Constraint
    objects.
    """
    self.signalId = signalId
    self.tripId = tripId
    self.foeId = foeId
    self.foeSignal = foeSignal
    self.limit = limit
    self.type = type
    self.mustWait = mustWait
    self.active = active
    # a literal {} default would be shared between all instances
    self.param = param if param is not None else {}
class TrafficLightDomain(Domain):
    """TraCI domain giving access to traffic lights and rail signal constraints."""

    Phase = Phase
    Logic = Logic

    def __init__(self, name="trafficlight", deprecatedFor=None):
        Domain.__init__(self, name, tc.CMD_GET_TL_VARIABLE, tc.CMD_SET_TL_VARIABLE,
                        tc.CMD_SUBSCRIBE_TL_VARIABLE, tc.RESPONSE_SUBSCRIBE_TL_VARIABLE,
                        tc.CMD_SUBSCRIBE_TL_CONTEXT, tc.RESPONSE_SUBSCRIBE_TL_CONTEXT,
                        _RETURN_VALUE_FUNC, deprecatedFor)

    def getRedYellowGreenState(self, tlsID):
        """getRedYellowGreenState(string) -> string

        Returns the named tl's state as a string of light definitions from
        rugGyYoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream
        has to decelerate.
        """
        return self._getUniversal(tc.TL_RED_YELLOW_GREEN_STATE, tlsID)

    def getAllProgramLogics(self, tlsID):
        """getAllProgramLogics(string) -> tuple(Logic)

        Returns a tuple of Logic objects.
        Each Logic encodes a traffic light program for the given tlsID.
        """
        return self._getUniversal(tc.TL_COMPLETE_DEFINITION_RYG, tlsID)

    getCompleteRedYellowGreenDefinition = deprecated("getCompleteRedYellowGreenDefinition")(getAllProgramLogics)

    def getControlledLanes(self, tlsID):
        """getControlledLanes(string) -> tuple(string)

        Returns the tuple of lanes which are controlled by the named traffic light.
        """
        return self._getUniversal(tc.TL_CONTROLLED_LANES, tlsID)

    def getControlledLinks(self, tlsID):
        """getControlledLinks(string) -> tuple(tuple(tuple(string)))

        Returns the links controlled by the traffic light, sorted by the signal index and described by giving
        the incoming, outgoing, and via lane.
        """
        return self._getUniversal(tc.TL_CONTROLLED_LINKS, tlsID)

    def getProgram(self, tlsID):
        """getProgram(string) -> string

        Returns the id of the current program.
        """
        return self._getUniversal(tc.TL_CURRENT_PROGRAM, tlsID)

    def getPhase(self, tlsID):
        """getPhase(string) -> integer

        Returns the index of the current phase within the list of all phases of
        the current program.
        """
        return self._getUniversal(tc.TL_CURRENT_PHASE, tlsID)

    def getPhaseName(self, tlsID):
        """getPhaseName(string) -> string

        Returns the name of the current phase.
        """
        return self._getUniversal(tc.VAR_NAME, tlsID)

    def getNextSwitch(self, tlsID):
        """getNextSwitch(string) -> double

        Returns the absolute simulation time at which the traffic light is
        scheduled to switch to the next phase (in seconds).
        """
        return self._getUniversal(tc.TL_NEXT_SWITCH, tlsID)

    def getSpentDuration(self, tlsID):
        """getSpentDuration(string) -> double

        Returns the time in seconds for which the current phase has been active
        """
        return self._getUniversal(tc.TL_SPENT_DURATION, tlsID)

    def getPhaseDuration(self, tlsID):
        """getPhaseDuration(string) -> double

        Returns the total duration of the current phase (in seconds). This value
        is not affected by the elapsed or remaining duration of the current phase.
        """
        return self._getUniversal(tc.TL_PHASE_DURATION, tlsID)

    def getServedPersonCount(self, tlsID, index):
        """getServedPersonCount(string, int) -> int

        Returns the number of persons that would be served in the given phase
        """
        return self._getUniversal(tc.VAR_PERSON_NUMBER, tlsID, "i", index)

    def getBlockingVehicles(self, tlsID, linkIndex):
        """getBlockingVehicles(string, int) -> tuple(string)

        Returns the tuple of vehicles that are blocking the subsequent block for
        the given tls-linkIndex
        """
        return self._getUniversal(tc.TL_BLOCKING_VEHICLES, tlsID, "i", linkIndex)

    def getRivalVehicles(self, tlsID, linkIndex):
        """getRivalVehicles(string, int) -> tuple(string)

        Returns the tuple of vehicles that also wish to enter the subsequent block for
        the given tls-linkIndex (regardless of priority)
        """
        return self._getUniversal(tc.TL_RIVAL_VEHICLES, tlsID, "i", linkIndex)

    def getPriorityVehicles(self, tlsID, linkIndex):
        """getPriorityVehicles(string, int) -> tuple(string)

        Returns the tuple of vehicles that also wish to enter the subsequent block for
        the given tls-linkIndex (only those with higher priority)
        """
        return self._getUniversal(tc.TL_PRIORITY_VEHICLES, tlsID, "i", linkIndex)

    def getConstraints(self, tlsID, tripId=""):
        """getConstraints(string, string) -> tuple(Constraint)

        Returns the tuple of rail signal constraints for the given rail signal.
        If tripId is not "", only constraints with the given tripId are
        returned. Otherwise, all constraints are returned
        """
        return self._getUniversal(tc.TL_CONSTRAINT, tlsID, "s", tripId)

    def getConstraintsByFoe(self, foeSignal, foeId=""):
        """getConstraintsByFoe(string, string) -> tuple(Constraint)

        Returns the tuple of rail signal constraints that have the given rail
        signal id as their foeSignal.
        If foeId is not "", only constraints with the given foeId are
        returned. Otherwise, all constraints are returned
        """
        return self._getUniversal(tc.TL_CONSTRAINT_BYFOE, foeSignal, "s", foeId)

    def setRedYellowGreenState(self, tlsID, state):
        """setRedYellowGreenState(string, string) -> None

        Sets the named tl's state as a string of light definitions from
        rugGyYuoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has
        to decelerate.
        """
        self._setCmd(tc.TL_RED_YELLOW_GREEN_STATE, tlsID, "s", state)

    def setLinkState(self, tlsID, tlsLinkIndex, state):
        """setLinkState(string, int, string) -> None

        Sets the state for the given tls and link index. The state must be one
        of rRgGyYoOu for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has
        to decelerate.
        The link index is shown in the GUI when setting the appropriate junction
        visualization option.
        The current state is retrieved, modified at the given index and sent
        back as a whole. A TraCIException is raised if the index does not
        address a position of the current state.
        """
        fullState = list(self.getRedYellowGreenState(tlsID))
        # negative indices that python list indexing accepts keep working,
        # everything outside is reported as TraCIException (never as IndexError)
        if not -len(fullState) <= tlsLinkIndex < len(fullState):
            raise TraCIException("Invalid tlsLinkIndex %s for tls '%s' with maximum index %s." % (
                tlsLinkIndex, tlsID, len(fullState) - 1))
        fullState[tlsLinkIndex] = state
        self.setRedYellowGreenState(tlsID, ''.join(fullState))

    def setPhase(self, tlsID, index):
        """setPhase(string, integer) -> None

        Switches to the phase with the given index in the list of all phases for
        the current program.
        """
        self._setCmd(tc.TL_PHASE_INDEX, tlsID, "i", index)

    def setPhaseName(self, tlsID, name):
        """setPhaseName(string, string) -> None

        Sets the name of the current phase within the current program
        """
        self._setCmd(tc.VAR_NAME, tlsID, "s", name)

    def setProgram(self, tlsID, programID):
        """setProgram(string, string) -> None

        Switches to the program with the given programID. The program must have
        been loaded earlier. The special value 'off' can always be used to
        switch off the traffic light.
        """
        self._setCmd(tc.TL_PROGRAM, tlsID, "s", programID)

    def getNemaPhaseCalls(self, tlsID):
        """getNemaPhaseCalls(string) -> tuple(string)

        Get the vehicle calls for the phases.
        The output is vehicle calls (coming from the detectors) for the phases.
        The value of the parameter "NEMA.phaseCall" is split at ",".
        """
        vehCallStr = self.getParameter(tlsID, "NEMA.phaseCall")
        return tuple(vehCallStr.split(","))

    def setNemaSplits(self, tlsID, splits):
        """setNemaSplits(string, list(string)) -> None

        Set a new set of splits to the given NEMA-controller.
        This function is only effective for NEMA type of controllers.
        The new splits will be implemented in the next cycle of the control.
        The value must be a space-separated list of 8 numbers with each number
        being the time in seconds for NEMA-phases 1 to 8.
        Time 0 must be used if the phase does not exist.
        Example: "11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0" (gives total cycle length of 80s)
        A list or tuple of values is joined with spaces before sending.
        """
        if isinstance(splits, (list, tuple)):
            splits = ' '.join(map(str, splits))
        self.setParameter(tlsID, "NEMA.splits", splits)

    def setNemaMaxGreens(self, tlsID, maxGreens):
        """setNemaMaxGreens(string, list(string)) -> None

        Set a new set of max greens to the given NEMA-controller.
        This function is only effective for NEMA type of controllers.
        The new max green will be implemented in the next cycle of the control.
        The value must be a space-separated list of 8 numbers with each number
        being the time in seconds for NEMA-phases 1 to 8.
        Time 0 must be used if the phase does not exist.
        Example: "11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0"
        A list or tuple of values is joined with spaces before sending.
        """
        if isinstance(maxGreens, (list, tuple)):
            maxGreens = ' '.join(map(str, maxGreens))
        self.setParameter(tlsID, "NEMA.maxGreens", maxGreens)

    def setNemaCycleLength(self, tlsID, cycleLength):
        """setNemaCycleLength(string, double) -> None

        Set a new cycle length to the given NEMA-controller.
        This function is only effective for NEMA type of controllers.
        The new cycle length will be implemented in the next cycle of the control.
        This function should be used with setNemaSplits or setNemaMaxGreens.
        """
        self.setParameter(tlsID, "NEMA.cycleLength", str(cycleLength))

    def setNemaOffset(self, tlsID, offset):
        """setNemaOffset(string, double) -> None

        Set a new offset to the given NEMA-controller.
        This function is only effective for NEMA type controllers when operating under coordinated mode.
        The new offset will be implemented in the next cycle of the control by adjusting
        the actual green time of the coordinated phase.
        There is no transition implemented in the NEMA-controller for changing the offset.
        It is expected that the users will control the change of the offset in each cycle
        to implement their own transition algorithm.
        """
        self.setParameter(tlsID, "NEMA.offset", str(offset))

    def setPhaseDuration(self, tlsID, phaseDuration):
        """setPhaseDuration(string, double) -> None

        Set the remaining phase duration of the current phase in seconds.
        This value has no effect on subsequent repetitions of this phase.
        """
        self._setCmd(tc.TL_PHASE_DURATION, tlsID, "d", phaseDuration)

    @alias_param("logic", "tls")
    def setProgramLogic(self, tlsID, logic):
        """setProgramLogic(string, Logic) -> None

        Sets a new program for the given tlsID from a Logic object.
        See getAllProgramLogics which returns a tuple of Logic objects.
        """
        # the format pieces are collected in a list and joined once at the end
        formatParts = ["tsiit"]
        values = [5, logic.programID, logic.type, logic.currentPhaseIndex, len(logic.phases)]
        for p in logic.phases:
            formatParts.append("tdsddt")
            values += [6, p.duration, p.state, p.minDur, p.maxDur, len(p.next)]
            for n in p.next:
                formatParts.append("i")
                values.append(n)
            formatParts.append("s")
            values.append(p.name)
        # subparams, every (key, value) pair is sent as one string list
        formatParts.append("t")
        values.append(len(logic.subParameter))
        for par in logic.subParameter.items():
            formatParts.append("l")
            values.append(par)
        self._setCmd(tc.TL_COMPLETE_PROGRAM_RYG, tlsID, "".join(formatParts), *values)

    setCompleteRedYellowGreenDefinition = deprecated("setCompleteRedYellowGreenDefinition")(setProgramLogic)

    def addConstraint(self, tlsID, tripId, foeSignal, foeId, type=0, limit=0):
        """addConstraint(string, string, string, string, int, int) -> None

        Add the given constraint.
        """
        self._setCmd(tc.TL_CONSTRAINT_ADD, tlsID, "tsssii", 5, tripId, foeSignal, foeId, type, limit)

    def swapConstraints(self, tlsID, tripId, foeSignal, foeId):
        """swapConstraints(string, string, string, string) -> tuple(Constraint)

        Reverse the given constraint and return a tuple of new constraints that
        were created (by swapping) to avoid deadlock.
        """
        return self._getUniversal(tc.TL_CONSTRAINT_SWAP, tlsID, "tsss", 3, tripId, foeSignal, foeId)

    def removeConstraints(self, tlsID, tripId, foeSignal, foeId):
        """removeConstraints(string, string, string, string) -> None

        Remove constraints with the given values. Any combination of inputs may
        be set to "" to act as a wildcard filter
        """
        self._setCmd(tc.TL_CONSTRAINT_REMOVE, tlsID, "tsss", 3, tripId, foeSignal, foeId)

    def updateConstraints(self, vehID, tripId=""):
        """updateConstraints(string, string) -> None

        Removes all constraints that can no longer be met because the route of
        vehID does not pass traffic light of the constraint with the given tripId.
        This includes constraints on tripId as well as constraints where tripId
        is the foeId.
        If tripId is "", the current tripId of vehID is used.
        """
        self._setCmd(tc.TL_CONSTRAINT_UPDATE, vehID, "s", tripId)
def __init__(self, name="trafficlight", deprecatedFor=None):
    """Set up the traffic light domain by delegating to Domain.__init__.

    name (string) - the domain name handed to the base class
    deprecatedFor - forwarded unchanged to the base class
    """
    # Positional arguments in exactly the order the base initializer receives them.
    domainArgs = (
        name,
        tc.CMD_GET_TL_VARIABLE,
        tc.CMD_SET_TL_VARIABLE,
        tc.CMD_SUBSCRIBE_TL_VARIABLE,
        tc.RESPONSE_SUBSCRIBE_TL_VARIABLE,
        tc.CMD_SUBSCRIBE_TL_CONTEXT,
        tc.RESPONSE_SUBSCRIBE_TL_CONTEXT,
        _RETURN_VALUE_FUNC,
        deprecatedFor,
    )
    Domain.__init__(self, *domainArgs)
def getRedYellowGreenState(self, tlsID):
    """getRedYellowGreenState(string) -> string

    Returns the named tl's state as a tuple of light definitions from
    rugGyYoO, for red, red-yellow, green, yellow, off, where lower case
    letters mean that the stream has to decelerate.
    """
    state = self._getUniversal(tc.TL_RED_YELLOW_GREEN_STATE, tlsID)
    return state
getRedYellowGreenState(string) -> string
Returns the named tl's state as a tuple of light definitions from rugGyYoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has to decelerate.
def getAllProgramLogics(self, tlsID):
    """getAllProgramLogics(string) -> tuple(Logic)

    Returns a tuple of Logic objects, one for each traffic light program
    of the given tlsID.
    """
    logics = self._getUniversal(tc.TL_COMPLETE_DEFINITION_RYG, tlsID)
    return logics
getAllProgramLogics(string) -> tuple(Logic)
Returns a tuple of Logic objects. Each Logic encodes a traffic light program for the given tlsID.
def getAllProgramLogics(self, tlsID):
    """getAllProgramLogics(string) -> tuple(Logic)

    Returns a tuple of Logic objects, one for each traffic light program
    of the given tlsID.
    """
    logics = self._getUniversal(tc.TL_COMPLETE_DEFINITION_RYG, tlsID)
    return logics
getAllProgramLogics(string) -> tuple(Logic)
Returns a tuple of Logic objects. Each Logic encodes a traffic light program for the given tlsID.
def getControlledLanes(self, tlsID):
    """getControlledLanes(string) -> tuple(string)

    Returns the tuple of lanes that the named traffic light controls.
    """
    lanes = self._getUniversal(tc.TL_CONTROLLED_LANES, tlsID)
    return lanes
getControlledLanes(string) -> tuple(string)
Returns the tuple of lanes which are controlled by the named traffic light.
def getControlledLinks(self, tlsID):
    """getControlledLinks(string) -> tuple(tuple(tuple(string)))

    Returns the links controlled by the traffic light, sorted by the signal
    index. Each link is described by its incoming, outgoing, and via lane.
    """
    links = self._getUniversal(tc.TL_CONTROLLED_LINKS, tlsID)
    return links
getControlledLinks(string) -> tuple(tuple(tuple(string)))
Returns the links controlled by the traffic light, sorted by the signal index and described by giving the incoming, outgoing, and via lane.
def getProgram(self, tlsID):
    """getProgram(string) -> string

    Returns the id of the program that is currently active.
    """
    programID = self._getUniversal(tc.TL_CURRENT_PROGRAM, tlsID)
    return programID
getProgram(string) -> string
Returns the id of the current program.
def getPhase(self, tlsID):
    """getPhase(string) -> integer

    Returns the index of the current phase within the list of all phases
    of the current program.
    """
    phaseIndex = self._getUniversal(tc.TL_CURRENT_PHASE, tlsID)
    return phaseIndex
getPhase(string) -> integer
Returns the index of the current phase within the list of all phases of the current program.
def getPhaseName(self, tlsID):
    """getPhaseName(string) -> string

    Returns the name of the current phase.
    """
    phaseName = self._getUniversal(tc.VAR_NAME, tlsID)
    return phaseName
getPhaseName(string) -> string Returns the name of the current phase.
def getNextSwitch(self, tlsID):
    """getNextSwitch(string) -> double

    Returns the absolute simulation time (in seconds) at which the traffic
    light is scheduled to switch to the next phase.
    """
    switchTime = self._getUniversal(tc.TL_NEXT_SWITCH, tlsID)
    return switchTime
getNextSwitch(string) -> double
Returns the absolute simulation time at which the traffic light is scheduled to switch to the next phase (in seconds).
def getSpentDuration(self, tlsID):
    """getSpentDuration(string) -> double

    Returns the time in seconds for which the current phase has been active.
    """
    spent = self._getUniversal(tc.TL_SPENT_DURATION, tlsID)
    return spent
getSpentDuration(string) -> double
Returns the time in seconds for which the current phase has been active
def getPhaseDuration(self, tlsID):
    """getPhaseDuration(string) -> double

    Returns the total duration of the current phase (in seconds). The
    value is not affected by the elapsed or remaining duration of the
    current phase.
    """
    duration = self._getUniversal(tc.TL_PHASE_DURATION, tlsID)
    return duration
getPhaseDuration(string) -> double
Returns the total duration of the current phase (in seconds). This value is not affected by the elapsed or remaining duration of the current phase.
def getServedPersonCount(self, tlsID, index):
    """getServedPersonCount(string, int) -> int

    Returns the number of persons that would be served in the given phase.
    """
    personCount = self._getUniversal(tc.VAR_PERSON_NUMBER, tlsID, "i", index)
    return personCount
getServedPersonCount(string, int) -> int Returns the number of persons that would be served in the given phase
def getBlockingVehicles(self, tlsID, linkIndex):
    """getBlockingVehicles(string, int) -> tuple(string)

    Returns the tuple of vehicles that are blocking the subsequent block
    for the given tls-linkIndex.
    """
    blockers = self._getUniversal(tc.TL_BLOCKING_VEHICLES, tlsID, "i", linkIndex)
    return blockers
getBlockingVehicles(string, int) -> tuple(string) Returns the tuple of vehicles that are blocking the subsequent block for the given tls-linkIndex
def getRivalVehicles(self, tlsID, linkIndex):
    """getRivalVehicles(string, int) -> tuple(string)

    Returns the tuple of vehicles that also wish to enter the subsequent
    block for the given tls-linkIndex (regardless of priority).
    """
    rivals = self._getUniversal(tc.TL_RIVAL_VEHICLES, tlsID, "i", linkIndex)
    return rivals
getRivalVehicles(string, int) -> tuple(string) Returns the tuple of vehicles that also wish to enter the subsequent block for the given tls-linkIndex (regardless of priority)
def getPriorityVehicles(self, tlsID, linkIndex):
    """getPriorityVehicles(string, int) -> tuple(string)

    Returns the tuple of vehicles that also wish to enter the subsequent
    block for the given tls-linkIndex (only those with higher priority).
    """
    prioritized = self._getUniversal(tc.TL_PRIORITY_VEHICLES, tlsID, "i", linkIndex)
    return prioritized
getPriorityVehicles(string, int) -> tuple(string) Returns the tuple of vehicles that also wish to enter the subsequent block for the given tls-linkIndex (only those with higher priority)
def getConstraints(self, tlsID, tripId=""):
    """getConstraints(string, string) -> tuple(Constraint)

    Returns the tuple of rail signal constraints for the given rail signal.
    If tripId is not "", only constraints with the given tripId are
    returned. Otherwise, all constraints are returned.
    """
    constraints = self._getUniversal(tc.TL_CONSTRAINT, tlsID, "s", tripId)
    return constraints
getConstraints(string, string) -> tuple(Constraint) Returns the tuple of rail signal constraints for the given rail signal. If tripId is not "", only constraints with the given tripId are returned. Otherwise, all constraints are returned
def getConstraintsByFoe(self, foeSignal, foeId=""):
    """getConstraintsByFoe(string, string) -> tuple(Constraint)

    Returns the tuple of rail signal constraints that have the given rail
    signal id as their foeSignal.
    If foeId is not "", only constraints with the given foeId are
    returned. Otherwise, all constraints are returned.
    """
    constraints = self._getUniversal(tc.TL_CONSTRAINT_BYFOE, foeSignal, "s", foeId)
    return constraints
getConstraintsByFoe(string, string) -> tuple(Constraint) Returns the tuple of rail signal constraints that have the given rail signal id as their foeSignal. If foeId is not "", only constraints with the given foeId are returned. Otherwise, all constraints are returned
def setRedYellowGreenState(self, tlsID, state):
    """setRedYellowGreenState(string, string) -> None

    Sets the named tl's state as a tuple of light definitions from
    rugGyYuoO, for red, red-yellow, green, yellow, off, where lower case
    letters mean that the stream has to decelerate.
    """
    variable = tc.TL_RED_YELLOW_GREEN_STATE
    self._setCmd(variable, tlsID, "s", state)
setRedYellowGreenState(string, string) -> None
Sets the named tl's state as a tuple of light definitions from rugGyYuoO, for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has to decelerate.
def setLinkState(self, tlsID, tlsLinkIndex, state):
    """setLinkState(string, int, string) -> None

    Sets the state for the given tls and link index. The state must be one
    of rRgGyYoOu for red, red-yellow, green, yellow, off, where lower case
    letters mean that the stream has to decelerate.
    The link index is shown in the GUI when setting the appropriate junction
    visualization option.

    The current state string is fetched, the entry at tlsLinkIndex is
    replaced and the complete string is written back.

    Raises TraCIException if tlsLinkIndex is negative or not smaller than
    the length of the current state string.
    """
    fullState = list(self.getRedYellowGreenState(tlsID))
    # Negative values must be rejected explicitly: list indexing would
    # otherwise wrap around and silently modify a link counted from the end.
    if not 0 <= tlsLinkIndex < len(fullState):
        raise TraCIException("Invalid tlsLinkIndex %s for tls '%s' with maximum index %s." % (
            tlsLinkIndex, tlsID, len(fullState) - 1))
    fullState[tlsLinkIndex] = state
    self.setRedYellowGreenState(tlsID, ''.join(fullState))
setLinkState(string, int, string) -> None Sets the state for the given tls and link index. The state must be one of rRgGyYoOu for red, red-yellow, green, yellow, off, where lower case letters mean that the stream has to decelerate. The link index is shown in the GUI when setting the appropriate junction visualization option.
def setPhase(self, tlsID, index):
    """setPhase(string, integer) -> None

    Switches to the phase with the given index in the list of all phases
    for the current program.
    """
    variable = tc.TL_PHASE_INDEX
    self._setCmd(variable, tlsID, "i", index)
setPhase(string, integer) -> None
Switches to the phase with the given index in the list of all phases for the current program.
def setPhaseName(self, tlsID, name):
    """setPhaseName(string, string) -> None

    Sets the name of the current phase within the current program.
    """
    variable = tc.VAR_NAME
    self._setCmd(variable, tlsID, "s", name)
setPhaseName(string, string) -> None
Sets the name of the current phase within the current program
def setProgram(self, tlsID, programID):
    """setProgram(string, string) -> None

    Switches to the program with the given programID. The program must
    have been loaded earlier. The special value 'off' can always be used
    to switch off the traffic light.
    """
    variable = tc.TL_PROGRAM
    self._setCmd(variable, tlsID, "s", programID)
setProgram(string, string) -> None
Switches to the program with the given programID. The program must have been loaded earlier. The special value 'off' can always be used to switch off the traffic light.
def getNemaPhaseCalls(self, tlsID):
    """getNemaPhaseCalls(string) -> tuple(string)

    Get the vehicle calls (coming from the detectors) for the phases.
    The "NEMA.phaseCall" parameter is read and split at each comma.
    """
    calls = self.getParameter(tlsID, "NEMA.phaseCall").split(",")
    return tuple(calls)
getNemaPhaseCalls(string) -> tuple(string) Get the vehicle calls for the phases. The output is vehicle calls (coming from the detectors) for the phases.
def setNemaSplits(self, tlsID, splits):
    """setNemaSplits(string, list(string)) -> None

    Set a new set of splits to the given NEMA-controller.
    This function is only effective for NEMA type of controllers.
    The new splits will be implemented in the next cycle of the control.
    The value must be a space-separated list of 8 numbers with each number
    being the time in seconds for NEMA-phases 1 to 8.
    Time 0 must be used if the phase does not exist.
    Example: "11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0" (gives total cycle length of 80s)

    splits may be given as a list or tuple, in which case the elements are
    converted with str() and joined by single spaces. Any other value is
    passed on unchanged.
    """
    # Tuples are accepted as well as lists, otherwise a tuple would be
    # forwarded unconverted as the parameter value.
    if isinstance(splits, (list, tuple)):
        splits = ' '.join(map(str, splits))
    self.setParameter(tlsID, "NEMA.splits", splits)
setNemaSplits(string, list(string)) -> None Set a new set of splits to the given NEMA-controller. This function is only effective for NEMA type of controllers. The new splits will be implemented in the next cycle of the control. The value must be a space-separated list of 8 numbers with each number being the time in seconds for NEMA-phases 1 to 8. Time 0 must be used if the phase does not exist. Example: "11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0" (gives total cycle length of 80s)
def setNemaMaxGreens(self, tlsID, maxGreens):
    """setNemaMaxGreens(string, list(string)) -> None

    Set a new set of max greens to the given NEMA-controller.
    This function is only effective for NEMA type of controllers.
    The new max green will be implemented in the next cycle of the control.
    The value must be a space-separated list of 8 numbers with each number
    being the time in seconds for NEMA-phases 1 to 8.
    Time 0 must be used if the phase does not exist.
    Example: "11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0"

    maxGreens may be given as a list or tuple, in which case the elements
    are converted with str() and joined by single spaces. Any other value
    is passed on unchanged.
    """
    # Tuples are accepted as well as lists, otherwise a tuple would be
    # forwarded unconverted as the parameter value.
    if isinstance(maxGreens, (list, tuple)):
        maxGreens = ' '.join(map(str, maxGreens))
    self.setParameter(tlsID, "NEMA.maxGreens", maxGreens)
setNemaMaxGreens(string, list(string)) -> None Set a new set of max greens to the given NEMA-controller. This function is only effective for NEMA type of controllers. The new max green will be implemented in the next cycle of the control. The value must be a space-separated list of 8 numbers with each number being the time in seconds for NEMA-phases 1 to 8. Time 0 must be used if the phase does not exist. Example: "11.0 34.0 15.0 20.0 11.0 34.0 15.0 20.0"
def setNemaCycleLength(self, tlsID, cycleLength):
    """setNemaCycleLength(string, double) -> None

    Set a new cycle length to the given NEMA-controller.
    This function is only effective for NEMA type of controllers.
    The new cycle length will be implemented in the next cycle of the control.
    This function should be used with setNemaSplits or setNemaMaxGreens.
    """
    # The value is handed over in its string representation.
    encoded = str(cycleLength)
    self.setParameter(tlsID, "NEMA.cycleLength", encoded)
setNemaCycleLength(string, double) -> None Set a new cycle length to the given NEMA-controller. This function is only effective for NEMA type of controllers. The new cycle length will be implemented in the next cycle of the control. This function should be used with setNemaSplits or setNemaMaxGreens.
def setNemaOffset(self, tlsID, offset):
    """setNemaOffset(string, double) -> None

    Set a new offset to the given NEMA-controller.
    This function is only effective for NEMA type controllers when operating
    under coordinated mode.
    The new offset will be implemented in the next cycle of the control by
    adjusting the actual green time of the coordinated phase.
    There is no transition implemented in the NEMA-controller for changing
    the offset. It's expected that the users will control the change of the
    offset in each cycle to implement their own transition algorithm.
    """
    # The value is handed over in its string representation.
    encoded = str(offset)
    self.setParameter(tlsID, "NEMA.offset", encoded)
setNemaOffset(string, double) -> None Set a new offset to the given NEMA-controller. This function is only effective for NEMA type controllers when operating under coordinated mode. The new offset will be implemented in the next cycle of the control by adjusting the actual green time of the coordinated phase. There is no transition implemented in the NEMA-controller for changing the offset. It’s expected that the users will control the change of the offset in each cycle to implement their own transition algorithm.
def setPhaseDuration(self, tlsID, phaseDuration):
    """setPhaseDuration(string, double) -> None

    Set the remaining phase duration of the current phase in seconds.
    This value has no effect on subsequent repetitions of this phase.
    """
    variable = tc.TL_PHASE_DURATION
    self._setCmd(variable, tlsID, "d", phaseDuration)
setPhaseDuration(string, double) -> None
Set the remaining phase duration of the current phase in seconds. This value has no effect on subsequent repetitions of this phase.
@alias_param("logic", "tls")
def setProgramLogic(self, tlsID, logic):
    """setProgramLogic(string, Logic) -> None

    Sets a new program for the given tlsID from a Logic object.
    See getAllProgramLogics which returns a tuple of Logic objects.

    The logic is flattened into a format string and a parallel list of
    values (one format character per value) and sent with a single command.
    """
    # header: program id, type, current phase index and the number of phases
    layout = ["tsiit"]
    payload = [5, logic.programID, logic.type, logic.currentPhaseIndex, len(logic.phases)]
    for phase in logic.phases:
        successors = phase.next
        # fixed phase fields, then one "i" per successor index, then the name
        layout.append("tdsddt" + "i" * len(successors) + "s")
        payload.extend([6, phase.duration, phase.state, phase.minDur, phase.maxDur, len(successors)])
        payload.extend(successors)
        payload.append(phase.name)
    # subparams: their count followed by one (key, value) pair per entry
    subParams = logic.subParameter
    layout.append("t" + "l" * len(subParams))
    payload.append(len(subParams))
    payload.extend(subParams.items())
    self._setCmd(tc.TL_COMPLETE_PROGRAM_RYG, tlsID, "".join(layout), *payload)
setProgramLogic(string, Logic) -> None
Sets a new program for the given tlsID from a Logic object. See getAllProgramLogics which returns a tuple of Logic objects.
@alias_param("logic", "tls")
def setProgramLogic(self, tlsID, logic):
    """setProgramLogic(string, Logic) -> None

    Sets a new program for the given tlsID from a Logic object.
    See getAllProgramLogics which returns a tuple of Logic objects.

    The logic is flattened into a format string and a parallel list of
    values (one format character per value) and sent with a single command.
    """
    # header: program id, type, current phase index and the number of phases
    layout = ["tsiit"]
    payload = [5, logic.programID, logic.type, logic.currentPhaseIndex, len(logic.phases)]
    for phase in logic.phases:
        successors = phase.next
        # fixed phase fields, then one "i" per successor index, then the name
        layout.append("tdsddt" + "i" * len(successors) + "s")
        payload.extend([6, phase.duration, phase.state, phase.minDur, phase.maxDur, len(successors)])
        payload.extend(successors)
        payload.append(phase.name)
    # subparams: their count followed by one (key, value) pair per entry
    subParams = logic.subParameter
    layout.append("t" + "l" * len(subParams))
    payload.append(len(subParams))
    payload.extend(subParams.items())
    self._setCmd(tc.TL_COMPLETE_PROGRAM_RYG, tlsID, "".join(layout), *payload)
setProgramLogic(string, Logic) -> None
Sets a new program for the given tlsID from a Logic object. See getAllProgramLogics which returns a tuple of Logic objects.
def addConstraint(self, tlsID, tripId, foeSignal, foeId, type=0, limit=0):
    """addConstraint(string, string, string, string, int, int) -> None

    Add the given constraint.
    """
    # leading 5 followed by the five constraint fields, matching "tsssii"
    payload = (5, tripId, foeSignal, foeId, type, limit)
    self._setCmd(tc.TL_CONSTRAINT_ADD, tlsID, "tsssii", *payload)
addConstraint(string, string, string, string, int, int) -> None Add the given constraint.
def swapConstraints(self, tlsID, tripId, foeSignal, foeId):
    """swapConstraints(string, string, string, string) -> tuple(Constraint)

    Reverse the given constraint and return a tuple of new constraints
    that were created (by swapping) to avoid deadlock.
    """
    # leading 3 followed by the three constraint fields, matching "tsss"
    payload = (3, tripId, foeSignal, foeId)
    created = self._getUniversal(tc.TL_CONSTRAINT_SWAP, tlsID, "tsss", *payload)
    return created
swapConstraints(string, string, string, string) -> tuple(Constraint) Reverse the given constraint and return a tuple of new constraints that were created (by swapping) to avoid deadlock.
def removeConstraints(self, tlsID, tripId, foeSignal, foeId):
    """removeConstraints(string, string, string, string) -> None

    Remove constraints with the given values. Any combination of inputs
    may be set to "" to act as a wildcard filter.
    """
    # leading 3 followed by the three filter fields, matching "tsss"
    payload = (3, tripId, foeSignal, foeId)
    self._setCmd(tc.TL_CONSTRAINT_REMOVE, tlsID, "tsss", *payload)
removeConstraints(string, string, string, string) remove constraints with the given values. Any combination of inputs may be set to "" to act as a wildcard filter
def updateConstraints(self, vehID, tripId=""):
    """updateConstraints(string, string) -> None

    Removes all constraints that can no longer be met because the route of
    vehID does not pass traffic light of the constraint with the given tripId.
    This includes constraints on tripId as well as constraints where tripId
    is the foeId.
    If tripId is "", the current tripId of vehID is used.
    """
    variable = tc.TL_CONSTRAINT_UPDATE
    self._setCmd(variable, vehID, "s", tripId)
updateConstraints(string, string) -> None Removes all constraints that can no longer be met because the route of vehID does not pass traffic light of the constraint with the given tripId. This includes constraints on tripId as well as constraints where tripId is the foeId. If tripId is "", the current tripId of vehID is used.
class Phase:

    def __init__(self, duration, state, minDur=None, maxDur=None, next=tuple(), name="", earlyTarget=""):
        """
        Constructs a traffic light phase
        duration (float): the duration of the phase in seconds
        state (string): the state codes for each controlled link
        minDur (float): the minimum duration (ignored by static tls),
            falls back to duration when None
        maxDur (float): the maximum duration (ignored by static tls),
            falls back to duration when None
        next (intList): possible successor phases (optional)
        name (string): the name of the phase
        earlyTarget (string): early switching to phase with the given index(es)
        """
        # unset bounds default to the nominal duration (only relevant for actuated tls)
        if minDur is None:
            minDur = duration
        if maxDur is None:
            maxDur = duration
        self.duration = duration
        self.state = state
        self.minDur = minDur
        self.maxDur = maxDur
        self.next = next
        self.name = name
        self.earlyTarget = earlyTarget

    def __repr__(self):
        # optional attributes only show up when they hold a truthy value
        namePart = ""
        if self.name:
            namePart = ", name='%s'" % self.name
        nextPart = ""
        if self.next:
            nextPart = ", next='%s'" % str(self.next)
        earlyPart = ""
        if self.earlyTarget:
            earlyPart = ", earlyTarget='%s'" % self.earlyTarget
        fields = (self.duration, self.state, self.minDur, self.maxDur, namePart, nextPart, earlyPart)
        return "Phase(duration=%s, state='%s', minDur=%s, maxDur=%s%s%s%s)" % fields
Inherited Members
class Logic:

    def __init__(self, programID, type, currentPhaseIndex, phases=None, subParameter=None):
        """
        Constructs a new Logic object with the following arguments:
          programID (string) - arbitrary
          type (integer) - one of
             tc.TRAFFICLIGHT_TYPE_STATIC
             tc.TRAFFICLIGHT_TYPE_ACTUATED
             tc.TRAFFICLIGHT_TYPE_NEMA
             tc.TRAFFICLIGHT_TYPE_DELAYBASED
          currentPhaseIndex (integer) - must be in range [0, len(phases) - 1]
          phases (list of Phase objects) - defaults to a new empty list
          subParameter (dict) - corresponds to generic <param> objects,
             defaults to a new empty dict
        """
        # fresh containers per instance so that defaults are never shared
        if phases is None:
            phases = []
        if subParameter is None:
            subParameter = {}
        self.programID = programID
        self.type = type
        self.currentPhaseIndex = currentPhaseIndex
        self.phases = phases
        self.subParameter = subParameter

    def getPhases(self):
        """Returns the list of phases."""
        return self.phases

    def getSubID(self):
        """Returns the programID."""
        return self.programID

    def getType(self):
        """Returns the type."""
        return self.type

    def getParameters(self):
        """Returns the dict of sub parameters."""
        return self.subParameter

    def getParameter(self, key, default=None):
        """Returns the sub parameter stored under key or default if it is not set."""
        params = self.subParameter
        return params.get(key, default)

    def __repr__(self):
        fields = (self.programID, self.type, self.currentPhaseIndex, self.phases, self.subParameter)
        return "Logic(programID='%s', type=%s, currentPhaseIndex=%s, phases=%s, subParameter=%s)" % fields