sumolib.output.convert.fcdfilter

 1# Eclipse SUMO, Simulation of Urban MObility; see https://eclipse.dev/sumo
 2# Copyright (C) 2017-2026 German Aerospace Center (DLR) and others.
 3# This program and the accompanying materials are made available under the
 4# terms of the Eclipse Public License 2.0 which is available at
 5# https://www.eclipse.org/legal/epl-2.0/
 6# This Source Code may also be made available under the following Secondary
 7# Licenses when the conditions for such availability set forth in the Eclipse
 8# Public License 2.0 are satisfied: GNU General Public License, version 2
 9# or later which is available at
10# https://www.gnu.org/licenses/old-licenses/gpl-2.0-standalone.html
11# SPDX-License-Identifier: EPL-2.0 OR GPL-2.0-or-later
12
13# @file    fcdfilter.py
14# @author  Evamarie Wiessner
15# @author  Michael Behrisch
16# @date    2017-08-15
17
18from __future__ import print_function
19from __future__ import absolute_import
20
21
22class Filter:
23    def __init__(self, opts, begin, end, bbox, outfile):
24        self._begin = float(begin)
25        self._end = float(end)
26        self._bbox = [float(b) for b in bbox.split(",")]
27        self._out = open(outfile, "w")
28        if opts.get("comment") is not None:
29            k, v = opts.get("comment").split(":")
30            self._out.write("""<!--
31    <time>
32        <begin value="%s"/>
33        <end value="%s"/>
34        <%s value="%s"/>
35    </time>
36-->
37""" % (begin, end, k, v))
38        self._out.write("<fcd-export>\n")
39        self._type = opts.get("type")
40        self._active = False
41
42    def done(self, t):
43        if t >= self._end:
44            self.close()
45            return True
46        return False
47
48    def checkTime(self, t):
49        if self._active:
50            self._out.write('    </timestep>\n')
51        self._active = (t >= self._begin and t < self._end)
52        if self._active:
53            self._out.write('    <timestep time="%s">\n' % t)
54        return self._active
55
56    def write(self, v):
57        lon, lat = float(v.x), float(v.y)
58        if lon >= self._bbox[0] and lat >= self._bbox[1] and lon < self._bbox[2] and lat < self._bbox[3]:
59            if self._type is not None:
60                v.type = self._type
61            self._out.write(v.toXML(8 * " "))
62
63    def close(self):
64        if self._active:
65            self._out.write('    </timestep>\n')
66        self._out.write("</fcd-export>\n")
67        self._out.close()
68
69
70def fcdfilter(inpFCD, outSTRM, further):
71    filters = []
72    with open(further["filter"]) as ff:
73        for line in ff:
74            filters.append(Filter(further, *line.strip().split(";")))
75    for timestep in inpFCD:
76        t = float(timestep.time)
77        filters = [f for f in filters if not f.done(t)]
78        if not filters:
79            break
80        active = [f for f in filters if f.checkTime(t)]
81        if timestep.vehicle and active:
82            for v in timestep.vehicle:
83                for f in active:
84                    f.write(v)
85    for f in filters:
86        f.close()
class Filter:
23class Filter:
24    def __init__(self, opts, begin, end, bbox, outfile):
25        self._begin = float(begin)
26        self._end = float(end)
27        self._bbox = [float(b) for b in bbox.split(",")]
28        self._out = open(outfile, "w")
29        if opts.get("comment") is not None:
30            k, v = opts.get("comment").split(":")
31            self._out.write("""<!--
32    <time>
33        <begin value="%s"/>
34        <end value="%s"/>
35        <%s value="%s"/>
36    </time>
37-->
38""" % (begin, end, k, v))
39        self._out.write("<fcd-export>\n")
40        self._type = opts.get("type")
41        self._active = False
42
43    def done(self, t):
44        if t >= self._end:
45            self.close()
46            return True
47        return False
48
49    def checkTime(self, t):
50        if self._active:
51            self._out.write('    </timestep>\n')
52        self._active = (t >= self._begin and t < self._end)
53        if self._active:
54            self._out.write('    <timestep time="%s">\n' % t)
55        return self._active
56
57    def write(self, v):
58        lon, lat = float(v.x), float(v.y)
59        if lon >= self._bbox[0] and lat >= self._bbox[1] and lon < self._bbox[2] and lat < self._bbox[3]:
60            if self._type is not None:
61                v.type = self._type
62            self._out.write(v.toXML(8 * " "))
63
64    def close(self):
65        if self._active:
66            self._out.write('    </timestep>\n')
67        self._out.write("</fcd-export>\n")
68        self._out.close()
Filter(opts, begin, end, bbox, outfile)
24    def __init__(self, opts, begin, end, bbox, outfile):
25        self._begin = float(begin)
26        self._end = float(end)
27        self._bbox = [float(b) for b in bbox.split(",")]
28        self._out = open(outfile, "w")
29        if opts.get("comment") is not None:
30            k, v = opts.get("comment").split(":")
31            self._out.write("""<!--
32    <time>
33        <begin value="%s"/>
34        <end value="%s"/>
35        <%s value="%s"/>
36    </time>
37-->
38""" % (begin, end, k, v))
39        self._out.write("<fcd-export>\n")
40        self._type = opts.get("type")
41        self._active = False
def done(self, t):
43    def done(self, t):
44        if t >= self._end:
45            self.close()
46            return True
47        return False
def checkTime(self, t):
49    def checkTime(self, t):
50        if self._active:
51            self._out.write('    </timestep>\n')
52        self._active = (t >= self._begin and t < self._end)
53        if self._active:
54            self._out.write('    <timestep time="%s">\n' % t)
55        return self._active
def write(self, v):
57    def write(self, v):
58        lon, lat = float(v.x), float(v.y)
59        if lon >= self._bbox[0] and lat >= self._bbox[1] and lon < self._bbox[2] and lat < self._bbox[3]:
60            if self._type is not None:
61                v.type = self._type
62            self._out.write(v.toXML(8 * " "))
def close(self):
64    def close(self):
65        if self._active:
66            self._out.write('    </timestep>\n')
67        self._out.write("</fcd-export>\n")
68        self._out.close()
def fcdfilter(inpFCD, outSTRM, further):
71def fcdfilter(inpFCD, outSTRM, further):
72    filters = []
73    with open(further["filter"]) as ff:
74        for line in ff:
75            filters.append(Filter(further, *line.strip().split(";")))
76    for timestep in inpFCD:
77        t = float(timestep.time)
78        filters = [f for f in filters if not f.done(t)]
79        if not filters:
80            break
81        active = [f for f in filters if f.checkTime(t)]
82        if timestep.vehicle and active:
83            for v in timestep.vehicle:
84                for f in active:
85                    f.write(v)
86    for f in filters:
87        f.close()