599 rivejä
20 KiB
Python
599 rivejä
20 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
#
"""
geo.gps (Geo-Positioning)
=========================

**Author:**

* Dirk Alders <sudo-dirk@mount-mockery.de>

**Description:**

This module is a submodule of :mod:`geo` and includes functions and classes for geographic issues (e.g. coordinate, area, tracks, ...).

**Contentlist:**

* :class:`geo.gps.area`
* :class:`geo.gps.coordinate`
* :class:`geo.gps.tracklist`
* :class:`geo.gps.track`

**Unittest:**

See also the :download:`unittest <../../geo/_testresults_/unittest.pdf>` documentation.
"""
|
|
|
|
import calendar
|
|
import math
|
|
import time
|
|
import xml.parsers.expat
|
|
|
|
|
|
class area(object):
    """
    :param coord1: Corner Coordinate 1
    :type coord1: coordinate
    :param coord2: Corner Coordinate 2
    :type coord2: coordinate

    Class to store a geographic area and support some calculations.

    The two given corners are normalised on creation: ``coord1`` always holds the smaller
    longitude and latitude, ``coord2`` the larger ones.

    **Example:**

    .. code-block:: python

        >>> import geo

        >>> ar = geo.gps.area(...)
    """
    def __init__(self, coord1, coord2):
        lons = (coord1[coordinate.LONGITUDE], coord2[coordinate.LONGITUDE])
        lats = (coord1[coordinate.LATITUDE], coord2[coordinate.LATITUDE])
        # New coordinate objects are created, so later changes made by extend_area()
        # never modify the coordinates passed in by the caller.
        self.coord1 = coordinate(lon=min(lons), lat=min(lats))
        self.coord2 = coordinate(lon=max(lons), lat=max(lats))

    def _max_lat(self):
        return self.coord2[coordinate.LATITUDE]

    def _max_lon(self):
        return self.coord2[coordinate.LONGITUDE]

    def _min_lat(self):
        return self.coord1[coordinate.LATITUDE]

    def _min_lon(self):
        return self.coord1[coordinate.LONGITUDE]

    def center_pos(self):
        """
        This Method calculates the arithmetic middle of the two stored corners.

        :returns: The center of the area.
        :rtype: coordinate
        """
        # Divide by a float so that integer coordinates are not floored on Python 2.
        clon = (self._min_lon() + self._max_lon()) / 2.0
        clat = (self._min_lat() + self._max_lat()) / 2.0
        return coordinate(lon=clon, lat=clat)

    def coordinate_in_area(self, coord):
        """
        This Method checks if a coordinate is inside the area (borders included).

        :param coord: The coordinate to check.
        :type coord: coordinate
        :returns: True, if the coordinate is inside the area or on its border.
        :rtype: bool
        """
        lon = coord[coordinate.LONGITUDE]
        lat = coord[coordinate.LATITUDE]
        return self._min_lon() <= lon <= self._max_lon() and self._min_lat() <= lat <= self._max_lat()

    def corner_coordinates(self):
        """
        This Method returns the two stored corners.

        :returns: The corner with the minimum values and the corner with the maximum values.
        :rtype: tuple of two coordinate
        """
        return self.coord1, self.coord2

    def extend_area(self, coord):
        """
        This Method grows the area (in place) so that it includes the given coordinate.

        :param coord: The coordinate which shall be inside the area afterwards.
        :type coord: coordinate
        """
        for key in (coordinate.LONGITUDE, coordinate.LATITUDE):
            value = coord[key]
            if value < self.coord1[key]:
                self.coord1[key] = value
            elif value > self.coord2[key]:
                self.coord2[key] = value

    def osm_map(self, map_code):
        """
        :param map_code: Map code as defined in :class:`geo.osm` (e.g. :class:`geo.osm.MAP_STANDARD`)

        This Method is intended to return a :class:`geo.osm.map` instance.

        .. warning:: Not implemented yet. The Method currently always returns None.
        """
        # TODO: needs to be implemented
        pass

    def __str__(self):
        # The corners are provided by corner_coordinates(); the formerly used
        # self.coordinates() does not exist and raised an AttributeError.
        return "%s / %s" % self.corner_coordinates()
|
|
|
|
|
|
class coordinate(dict):
    """
    :param lon: Longitude
    :type lon: float
    :param lat: Latitude
    :type lat: float
    :param height: Height
    :type height: float
    :param time: Time (Seconds since 1970)
    :type time: int

    Class to store a geographic coordinate and support some calculations.

    The class is a :class:`dict`. All keyword arguments are stored as dictionary items. Use the
    class constants :const:`LATITUDE`, :const:`LONGITUDE`, :const:`HIGHT` and :const:`TIME` as keys.

    **Example:**

    .. code-block:: python

        >>> import geo

        >>> ab = geo.gps.coordinate(lat=49.976596,lon=9.1481443)
        >>> gb = geo.gps.coordinate(lat=53.6908298,lon=12.1583252)
        >>> ab.dist_to(gb) / 1000
        462.3182843470017
        >>> ab.angle_to(gb) / math.pi * 180
        39.02285256685333
    """
    LATITUDE = 'lat'
    LONGITUDE = 'lon'
    HIGHT = 'hight'
    TIME = 'time'

    def __init__(self, **kwargs):
        dict.__init__(self, **kwargs)

    def __str__(self):
        def to_string(lon_or_lat, plus_minus=('N', 'S')):
            # The hemisphere has to be derived from the value itself. Deriving it from the
            # truncated degrees would report e.g. -0.5 as northern/eastern (int(-0.5) == 0).
            pm = 0 if lon_or_lat >= 0 else 1
            value = abs(lon_or_lat)
            degrees = int(value)
            minutes = (value - degrees) * 60
            return "%d°%.4f%s" % (degrees, minutes, plus_minus[pm])

        lon = self.get(self.LONGITUDE)
        lat = self.get(self.LATITUDE)
        if lon is None or lat is None:
            # __str__ must return a string (returning None makes str() raise a TypeError),
            # so fall back to the plain dictionary representation.
            return dict.__repr__(self)
        return to_string(lat) + ' ' + to_string(lon, ['E', 'W'])

    def angle_to(self, coord):
        """
        This Method calculates the geographic direction in radians from this to the given coordinate.

        .. note:: North is 0 (turning right). That means east is :class:`math.pi`/2.

        .. note:: The direction is calculated directly from the differences of latitude and longitude (in degrees).

        :param coord: Target coordinate.
        :type coord: coordinate
        :returns: The geographic direction in radians (0 <= angle <= 2 * :class:`math.pi`) or None, if
                  latitude or longitude are missing or both coordinates are at the same position.
        :rtype: int or float
        """
        lat1 = coord.get(self.LATITUDE)
        lon1 = coord.get(self.LONGITUDE)
        lat2 = self.get(self.LATITUDE)
        lon2 = self.get(self.LONGITUDE)
        if lat1 is None or lat2 is None or lon1 is None or lon2 is None:
            return None

        dlon = lon1 - lon2
        dlat = lat1 - lat2
        if dlon == 0 and dlat == 0:
            # same point
            return None

        # atan2 covers all four quadrants including exactly east / west (dlat == 0)
        # and does not need a division (no integer division issue on Python 2).
        angle = math.atan2(dlon, dlat)
        if angle < 0:
            angle += 2 * math.pi
        return angle

    def dist_to(self, coord):
        """
        This Method calculates the distance from this coordinate to a given coordinate (haversine formula).

        :param coord: Target coordinate.
        :type coord: coordinate
        :return: The distance between two coordinates in meters or None, if latitude or longitude are missing.
        :rtype: int or float
        :raises: -
        """
        lat1 = coord.get(self.LATITUDE)
        lon1 = coord.get(self.LONGITUDE)
        lat2 = self.get(self.LATITUDE)
        lon2 = self.get(self.LONGITUDE)
        if lat1 is None or lat2 is None or lon1 is None or lon2 is None:
            return None

        R = 6378140  # earth radius in meters used for the calculation
        dLat = math.radians(lat2 - lat1)
        dLon = math.radians(lon2 - lon1)
        sin_dlat = math.sin(dLat / 2)
        sin_dlon = math.sin(dLon / 2)
        a = sin_dlat * sin_dlat + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * sin_dlon * sin_dlon
        # Rounding errors must not push a above 1, otherwise sqrt(1 - a) raises a ValueError.
        a = min(a, 1.0)
        c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
        return R * c
|
|
|
|
|
|
class tracklist(list):
    """
    Class to store a list of tracks and parse xml files created by a navigation system like Etrax Vista.

    The parser evaluates the elements ``trk``, ``trkpt`` (attributes ``lon`` and ``lat``), ``name``,
    ``ele`` and ``time``. Every non-empty ``trk`` results in one :class:`geo.gps.track` in this list.

    **Example:**

    .. code-block:: python

        >>> import geo

        >>> ...
    """
    def __init__(self):
        list.__init__(self)
        self.__xml_track_on_read = None     # track which is currently parsed (None outside of <trk>)
        self.__xml_data_on_read = ''        # character data collected since the last start tag

    def __xml_current_point(self):
        # Returns the most recently added point of the track on read or None, if there is none.
        current = self.__xml_track_on_read
        if current is None or len(current) == 0:
            return None
        return current[-1]

    def __xml_start_element(self, name, attrs):
        self.__xml_data_on_read = ''
        if name == 'trk':
            # new track found in file
            self.__xml_track_on_read = track()
        elif name == 'trkpt':
            # new waypoint to append (ignored, if it is not part of a track)
            if self.__xml_track_on_read is not None and 'lon' in attrs and 'lat' in attrs:
                self.__xml_track_on_read.append(coordinate(lon=float(attrs['lon']), lat=float(attrs['lat'])))

    def __xml_end_element(self, name):
        if name == 'trk':
            if self.__xml_track_on_read is not None and len(self.__xml_track_on_read) > 0:
                self.append(self.__xml_track_on_read)
            self.__xml_track_on_read = None
        elif name == 'name':
            # <name> also exists outside of tracks (e.g. in <metadata> or <wpt>); only the
            # name given inside of a <trk> is used.
            if self.__xml_track_on_read is not None and self.__xml_data_on_read != '':
                self.__xml_track_on_read.set_name(self.__xml_data_on_read)
        elif name == 'ele':
            # <ele> outside of a track or before the first track point has no point to belong to
            c = self.__xml_current_point()
            if c is not None:
                c[c.HIGHT] = float(self.__xml_data_on_read)
        elif name == 'time':
            # <time> outside of a track (e.g. in <metadata>) has no point to belong to
            c = self.__xml_current_point()
            if c is not None:
                # the timestamp is interpreted as UTC ('Z') and stored as seconds since 1970
                c[c.TIME] = int(calendar.timegm(time.strptime(self.__xml_data_on_read, '%Y-%m-%dT%H:%M:%SZ')))

    def __xml_char_data(self, data):
        # expat may deliver the text of one element in several pieces
        self.__xml_data_on_read += data

    def load_from_file(self, xmlfilehandle):
        """
        This Method parses a xml-file (gpx) and appends all tracks with at least one track point to this list.

        :param xmlfilehandle: An open file (or any object with a ``read`` method) or the name of a file.
                              A given filehandle is not closed by this method.
        :type xmlfilehandle: file or str

        .. note:: A filehandle is passed to expat's ``ParseFile``. On Python 3 it has to deliver bytes
                  (file opened in binary mode).
        """
        # start with a clean parser state (e.g. after a previously aborted parsing)
        self.__xml_track_on_read = None
        self.__xml_data_on_read = ''
        # parse xml-handle
        p = xml.parsers.expat.ParserCreate()
        p.StartElementHandler = self.__xml_start_element
        p.EndElementHandler = self.__xml_end_element
        p.CharacterDataHandler = self.__xml_char_data
        if hasattr(xmlfilehandle, 'read'):
            p.ParseFile(xmlfilehandle)
        else:
            # a filename was given
            with open(xmlfilehandle, 'rb') as fh:
                p.ParseFile(fh)
|
|
|
|
|
|
class track(list):
    """
    Class to store a track (list of :class:`geo.gps.coordinate`) and support some calculations.

    Calculated values are cached. The cache is cleared by :func:`append`, :func:`extend` and
    :func:`insert`. Other modifying list operations are not overridden and therefore do not clear
    the cache.

    **Example:**

    .. code-block:: python

        >>> import geo

        >>> ...
    """
    def __init__(self):
        self._name = None
        list.__init__(self)
        self.__init_state_variables()

    def __init_state_variables(self):
        # Reset all cached calculation results (None means "not calculated yet").
        self._area = None
        self._average_speed = None
        self._hightcharacteristic = None
        self._end_date = None
        self._optimized_track = None
        self._passed_hight = None
        self._speedcharacteristic = None
        self._start_date = None
        self._total_distance = None
        self._total_time = None

    def append(self, coord):
        """
        This Method appends a coordinate to the track and clears all cached values.

        :param coord: The coordinate to append.
        :type coord: coordinate
        """
        list.append(self, coord)
        self.__init_state_variables()

    def extend(self, *args, **kwargs):
        """
        This Method extends the track (see :func:`list.extend`) and clears all cached values.
        """
        self.__init_state_variables()
        return list.extend(self, *args, **kwargs)

    def insert(self, index, coord):
        """
        This Method inserts a coordinate (see :func:`list.insert`) and clears all cached values.

        :param index: The position to insert the coordinate.
        :type index: int
        :param coord: The coordinate to insert.
        :type coord: coordinate
        """
        list.insert(self, index, coord)
        self.__init_state_variables()

    def set_name(self, name):
        """
        This Method sets the name of the track.

        :param name: The name of the track.
        """
        self._name = name

    def area(self):
        """
        This Method returns the area which includes all coordinates of the track.

        :returns: The area or None, if the track has less than two coordinates.
        :rtype: geo.gps.area or None
        """
        if self._area is None and len(self) > 1:
            # geo.gps.area needs two corners. Start with the first point and grow from there.
            # (Inside of this method the name 'area' refers to the module level class.)
            bounds = area(self[0], self[0])
            for c in self[1:]:
                bounds.extend_area(c)
            self._area = bounds
        return self._area

    def average_speed(self):
        """
        This Method returns the average speed (total distance divided by total time).

        :returns: The average speed in meters per second or None, if distance or time are not
                  available or the total time is 0.
        :rtype: float or None
        """
        if self._average_speed is None:
            distance = self.total_distance()
            duration = self.total_time()
            # duration is None without time information and 0 e.g. for a single point
            if distance is not None and duration:
                self._average_speed = distance / duration
        return self._average_speed

    def hightcharacteristic(self):
        """
        .. warning:: Not implemented yet. The Method currently always returns None.
        """
        if self._hightcharacteristic is None:
            pass    # TODO: implement functionality
        return self._hightcharacteristic

    def end_date(self):
        """
        This Method returns the time of the last coordinate.

        :returns: The time of the last coordinate or None, if the track is empty or has no time information.
        """
        if self._end_date is None and len(self) > 0:
            self._end_date = self[-1].get(coordinate.TIME)
        return self._end_date

    def name(self):
        """
        This Method returns the name of the track.

        :returns: The name set by :func:`set_name` or None.
        """
        return self._name

    def optimized_track(self):
        """
        This Method returns a copy of the track without coordinates which seem to be implausible.

        A coordinate is taken over, if it is more than 15m away from the previously accepted
        coordinate, the value distance / time_difference ** 2 is below 0.5g and the direction is not
        (about) the opposite of the previous direction. If 30 coordinates in a row were rejected
        because of acceleration or direction, they are taken over anyway.

        :returns: The optimized track (cached).
        :rtype: geo.gps.track
        """
        # TODO: REWORK TRACK OPTIMIZATION
        DIST_MOVED = 15.                    # 15m
        ACCELERATION_MAX = 0.5 * 9.81       # 0.5g
        ANGLE_DIF_FOR_BACKWARDS = 10        # +/- 5deg
        MAX_DELETED_POINTS = 30

        def backwards_direction(l_angle, t_angle):
            # True, if both directions (in degrees) differ by 180deg +/- (ANGLE_DIF_FOR_BACKWARDS / 2)
            dang = l_angle - t_angle
            if dang < 0:
                dang += math.degrees(math.pi * 2)
            half_turn = math.degrees(math.pi)
            tolerance = ANGLE_DIF_FOR_BACKWARDS / 2.
            return half_turn - tolerance < dang < half_turn + tolerance

        def angle_in_degrees(start, target):
            # Direction from start to target in degrees or None (e.g. same position)
            angle = start.angle_to(target)
            return None if angle is None else math.degrees(angle)

        if self._optimized_track is None:
            optimized = track()
            optimized.set_name(self._name)
            del_lst = []
            for coord in self:
                if len(optimized) == 0:
                    # first item has to be added always
                    optimized.append(coord)
                    continue

                last = optimized[-1]
                dist = coord.dist_to(last)
                try:
                    acc = dist / ((coord[coordinate.TIME] - last[coordinate.TIME]) ** 2)
                except (KeyError, TypeError, ZeroDivisionError):
                    # no (usable) time information or identical timestamps: don't judge the acceleration
                    acc = 0.0

                if len(optimized) > 1:
                    last_angle = angle_in_degrees(optimized[-2], last)
                    this_angle = angle_in_degrees(last, coord)
                else:
                    last_angle = this_angle = None

                if dist is not None and dist > DIST_MOVED:
                    # distance ok.
                    direction_ok = this_angle is None or last_angle is None or not backwards_direction(last_angle, this_angle)
                    if acc < ACCELERATION_MAX and direction_ok:
                        # acceleration and direction ok
                        del_lst = []
                        optimized.append(coord)
                    else:
                        del_lst.append(coord)

                if len(del_lst) >= MAX_DELETED_POINTS:
                    # Too many rejected points in a row: keep them instead of losing a part of the track.
                    # The position is reported as the distance of the optimized track so far
                    # (the formerly called self.get_total_distance() does not exist).
                    print("Moeglicherweise ist die Optimierung des Tracks noch nicht ausgereift genug.")
                    print("    Bei %.1f km gab es %d Koordinaten die aussortiert worden waeren. Optimierung ausgelassen." % (optimized.total_distance() / 1000., MAX_DELETED_POINTS))
                    optimized.extend(del_lst)
                    del_lst = []
            self._optimized_track = optimized
        return self._optimized_track

    def osm_map(self, map_code):
        """
        :param map_code: Map code as defined in :class:`geo.osm` (e.g. :class:`geo.osm.MAP_STANDARD`)

        This Method returns the result of :func:`geo.gps.area.osm_map` for the area of this track.

        :returns: The map or None, if the track has no area (less than two coordinates).
        """
        bounds = self.area()
        if bounds is None:
            return None
        return bounds.osm_map(map_code)

    def passed_hight(self):
        """
        This Method returns the sum of all ascents of the track. A hysteresis of 1 meter is used.

        :returns: The passed hight or None, if the track is empty. The result is 0.0, if the first
                  coordinate has no hight information. Coordinates without hight information are ignored.
        :rtype: float or None
        """
        if self._passed_hight is None and len(self) > 0:
            self._passed_hight = 0.0
            hight = self[0].get(coordinate.HIGHT)
            if hight is not None:
                for c in self:
                    c_hight = c.get(coordinate.HIGHT)
                    if c_hight is None:
                        continue
                    last_hight = hight
                    # hysteresis of 1 meter: the median keeps the tracked hight
                    # inside of [c_hight - 1, c_hight + 1]
                    hight = sorted([c_hight - 1, hight, c_hight + 1])[1]
                    if hight > last_hight:
                        self._passed_hight += hight - last_hight
        return self._passed_hight

    def speedcharacteristic(self):
        """
        .. warning:: Not implemented yet. The Method currently always returns None.
        """
        if self._speedcharacteristic is None:
            pass    # TODO: implement functionality
        return self._speedcharacteristic

    def start_date(self):
        """
        This Method returns the time of the first coordinate.

        :returns: The time of the first coordinate or None, if the track is empty or has no time information.
        """
        if self._start_date is None and len(self) > 0:
            self._start_date = self[0].get(coordinate.TIME)
        return self._start_date

    def total_distance(self):
        """
        This Method returns the sum of the distances between all consecutive coordinates.

        :returns: The total distance in meters or None, if the track is empty.
        :rtype: float or None
        """
        if self._total_distance is None and len(self) > 0:
            total = 0.
            for i in range(len(self) - 1):
                total += self[i].dist_to(self[i + 1])
            self._total_distance = total
        return self._total_distance

    def total_time(self):
        """
        This Method returns the difference between :func:`end_date` and :func:`start_date`.

        :returns: The total time or None, if start or end time are not available.
        """
        if self._total_time is None:
            try:
                self._total_time = self.end_date() - self.start_date()
            except TypeError:
                pass    # doing nothing will return None as needed, if calculation is not possible
        return self._total_time
|
|
|
|
|
|
# NOTE(review): Everything below is a bare module level string literal, i.e. disabled code which is
# never executed. It contains a line based GPX manipulation helper (gpxmanipu) which uses Python 2
# print statements. The nesting shown here was reconstructed from the syntax - confirm before reuse.
'''
class gpxmanipu():
    debug=False
    class trackmanipu():
        def __init__(self):
            self.lines=[]
        def AddLine(self, line):
            self.lines.append(line)
        def SetName(self, name):
            SEARCHFORSTARTTAG=0
            SEARCHFORENDTAG=1
            state=SEARCHFORSTARTTAG
            for i in range(0, len(self.lines)):
                if state==SEARCHFORSTARTTAG:
                    if self.lines[i].find('<name>')>0:
                        newline=self.lines[i][:self.lines[i].find('<name>')+6]
                        newline+=name
                        state=SEARCHFORENDTAG
                if state==SEARCHFORENDTAG:
                    if self.lines[i].find('</name>')>0:
                        self.lines[i]=newline+self.lines[i][self.lines[i].find('</name>'):]
                    else:
                        self.lines.remove(self.lines[i])
        def GetTrack(self):
            rv=''
            for line in self.lines:
                rv+=line
            return rv
    def __init__(self, fh):
        HEADERSEARCH=0
        TRACKSEARCH=1
        state=HEADERSEARCH
        self.header=''
        self.tracks=[]
        for line in fh:
            if state==HEADERSEARCH:
                if line.lstrip().startswith('<trk>'):
                    state=TRACKSEARCH
                    track=self.trackmanipu()
                    track.AddLine(line)
                else:
                    self.header+=line
            elif state==TRACKSEARCH:
                if line.lstrip().startswith('</trk>'):
                    track.AddLine(line)
                    self.tracks.append(track)
                    track=self.trackmanipu()
                else:
                    track.AddLine(line)
        self.footer=track.GetTrack() # This was no track
        del(track)
        if self.debug:
            print "Header found:"
            print self.header[:20]+' ... '+self.header[-20:]
            print str(len(self.tracks))+' tracks found:'
            for track in self.tracks:
                track=track.GetTrack()
                print track[:20]+' ... '+track[-20:]
            print "Footer found:"
            print self.footer
    def GetGpx(self):
        rv=self.header
        for track in self.tracks:
            rv+=track.GetTrack()
        rv+=self.footer
        return rv
    def SetTrackname(self, number, name):
        if len(self.tracks)>number-1:
            self.tracks[number].SetName(name)
    def DeleteTrack(self, number):
        if len(self.tracks)>number-1:
            return self.tracks.pop(number)
        else:
            return None
'''
|