""" This module defines the :class:`quantized_mesh_tile.terrain.TerrainTile`.
More information about the format specification can be found here:
https://github.com/AnalyticalGraphicsInc/quantized-mesh
Reference
---------
"""
from __future__ import absolute_import, division
import gzip
import io
import os
from builtins import map, object
from collections import OrderedDict
from future import standard_library
from past.builtins import xrange
from past.utils import old_div
from . import horizon_occlusion_point as occ
from .bbsphere import BoundingSphere
from .topology import TerrainTopology
from .utils import (decodeIndices, encodeIndices, gzipFileObject, octDecode,
octEncode, packEntry, packIndices, ungzipFileObject,
unpackEntry, zigZagDecode, zigZagEncode)
standard_library.install_aliases()
# For a tile of 256px * 256px
TILEPXS = 65536
[docs]def lerp(p, q, time):
return ((1.0 - time) * p) + (time * q)
[docs]class TerrainTile(object):
"""
The main class to read and write a terrain tile.
Constructor arguments:
``west``
The longitude at the western edge of the tile. Default is `-1.0`.
``east``
The longitude at the eastern edge of the tile. Default is `1.0`.
``south``
The latitude at the southern edge of the tile. Default is `-1.0`.
``north``
The latitude at the northern edge of the tile. Default is `1.0`.
``topology``
The topology of the mesh which but be an instance of
:class:`quantized_mesh_tile.topology.TerrainTopology`. Default is `None`.
``watermask``
A water mask list (Optional). Adds rendering water effect.
The water mask list is either one byte, `[0]` for land and `[255]` for
water, either a list of 256*256 values ranging from 0 to 255.
Values in the mask are defined from north-to-south and west-to-east.
Per default no watermask is applied. Note that the water mask effect depends on
the texture of the raster layer drapped over your terrain.
Default is `[]`.
Usage examples::
from quantized_mesh_tile.terrain import TerrainTile
from quantized_mesh_tile.topology import TerrainTopology
from quantized_mesh_tile.global_geodetic import GlobalGeodetic
# The tile coordinates
x = 533
y = 383
z = 9
geodetic = GlobalGeodetic(True)
[west, south, east, north] = geodetic.TileBounds(x, y, z)
# Read a terrain tile (unzipped)
tile = TerrainTile(west=west, south=south, east=east, north=north)
tile.fromFile('mytile.terrain')
# Write a terrain tile locally from scratch (lon/lat/height)
wkts = [
'POLYGON Z ((7.3828125 44.6484375 303.3, ' +
'7.3828125 45.0 320.2, ' +
'7.5585937 44.82421875 310.2, ' +
'7.3828125 44.6484375 303.3))',
'POLYGON Z ((7.3828125 44.6484375 303.3, ' +
'7.734375 44.6484375 350.3, ' +
'7.5585937 44.82421875 310.2, ' +
'7.3828125 44.6484375 303.3))',
'POLYGON Z ((7.734375 44.6484375 350.3, ' +
'7.734375 45.0 330.3, ' +
'7.5585937 44.82421875 310.2, ' +
'7.734375 44.6484375 350.3))',
'POLYGON Z ((7.734375 45.0 330.3, ' +
'7.5585937 44.82421875 310.2, ' +
'7.3828125 45.0 320.2, ' +
'7.734375 45.0 330.3))'
]
topology = TerrainTopology(geometries=wkts)
tile = TerrainTile(topology=topology)
tile.toFile('mytile.terrain')
"""
quantizedMeshHeader = OrderedDict([
['centerX', 'd'], # 8bytes
['centerY', 'd'],
['centerZ', 'd'],
['minimumHeight', 'f'], # 4bytes
['maximumHeight', 'f'],
['boundingSphereCenterX', 'd'],
['boundingSphereCenterY', 'd'],
['boundingSphereCenterZ', 'd'],
['boundingSphereRadius', 'd'],
['horizonOcclusionPointX', 'd'],
['horizonOcclusionPointY', 'd'],
['horizonOcclusionPointZ', 'd']
])
vertexData = OrderedDict([
# 4bytes -> determines the size of the 3 following arrays
['vertexCount', 'I'],
['uVertexCount', 'H'], # 2bytes, unsigned short
['vVertexCount', 'H'],
['heightVertexCount', 'H']
])
indexData16 = OrderedDict([
['triangleCount', 'I'],
['indices', 'H']
])
indexData32 = OrderedDict([
['triangleCount', 'I'],
['indices', 'I']
])
EdgeIndices16 = OrderedDict([
['westVertexCount', 'I'],
['westIndices', 'H'],
['southVertexCount', 'I'],
['southIndices', 'H'],
['eastVertexCount', 'I'],
['eastIndices', 'H'],
['northVertexCount', 'I'],
['northIndices', 'H']
])
EdgeIndices32 = OrderedDict([
['westVertexCount', 'I'],
['westIndices', 'I'],
['southVertexCount', 'I'],
['southIndices', 'I'],
['eastVertexCount', 'I'],
['eastIndices', 'I'],
['northVertexCount', 'I'],
['northIndices', 'I']
])
ExtensionHeader = OrderedDict([
['extensionId', 'B'],
['extensionLength', 'I']
])
OctEncodedVertexNormals = OrderedDict([
['xy', 'B']
])
WaterMask = OrderedDict([
['xy', 'B']
])
BYTESPLIT = 65636
# min and max quantized values for indices
MIN = 0.0
MAX = 32767.0
# Coordinates are given in lon/lat WSG84
def __init__(self, *args, **kwargs):
self._west = kwargs.get('west', -1.0)
self._east = kwargs.get('east', 1.0)
self._south = kwargs.get('south', -1.0)
self._north = kwargs.get('north', 1.0)
self._longs = []
self._lats = []
self._heights = []
self._triangles = []
self._workingUnitLongitude = None
self._workingUnitLatitude = None
self._deltaHeight = None
self.EPSG = 4326
# Extensions
self.vLight = []
self.watermask = kwargs.get('watermask', [])
self.hasWatermask = kwargs.get('hasWatermask', bool(self.watermask))
self.header = OrderedDict()
for k in TerrainTile.quantizedMeshHeader.keys():
self.header[k] = 0.0
self.u = []
self.v = []
self.h = []
self.indices = []
self.westI = []
self.southI = []
self.eastI = []
self.northI = []
topology = kwargs.get('topology')
if topology is not None:
self.fromTerrainTopology(topology)
def __repr__(self):
msg = 'Header: %s\n' % self.header
# Output intermediate structure
msg += '\nVertexCount: %s' % len(self.u)
msg += '\nuVertex: %s' % self.u
msg += '\nvVertex: %s' % self.v
msg += '\nhVertex: %s' % self.h
msg += '\nindexDataCount: %s' % len(self.indices)
msg += '\nindexData: %s' % self.indices
msg += '\nwestIndicesCount: %s' % len(self.westI)
msg += '\nwestIndices: %s' % self.westI
msg += '\nsouthIndicesCount: %s' % len(self.southI)
msg += '\nsouthIndices: %s' % self.southI
msg += '\neastIndicesCount: %s' % len(self.eastI)
msg += '\neastIndices: %s' % self.eastI
msg += '\nnorthIndicesCount: %s' % len(self.northI)
msg += '\nnorthIndices: %s\n' % self.northI
# Output coordinates
msg += '\nNumber of triangles: %s' % (old_div(len(self.indices), 3))
msg += '\nTriangles coordinates in EPSG %s' % self.EPSG
msg += '\n%s' % self.getTrianglesCoordinates()
return msg
[docs] def getContentType(self):
"""
A method to determine the content type of a tile.
"""
baseContent = 'application/vnd.quantized-mesh'
if self.hasLighting and self.hasWatermask:
return baseContent + ';extensions=octvertexnormals-watermask'
elif self.hasLighting:
return baseContent + ';extensions=octvertexnormals'
elif self.hasWatermask:
return baseContent + ';extensions=watermask'
else:
return baseContent
[docs] def getVerticesCoordinates(self):
"""
A method to retrieve the coordinates of the vertices in lon,lat,height.
"""
self._computeVerticesCoordinates()
coordinates = []
for i, lon in enumerate(self._longs):
coordinates.append((lon, self._lats[i], self._heights[i]))
return coordinates
[docs] def getTrianglesCoordinates(self):
"""
A method to retrieve triplet of coordinates representing the triangles
in lon,lat,height.
"""
self._computeVerticesCoordinates()
triangles = []
nbTriangles = len(self.indices)
if nbTriangles % 3 != 0:
raise Exception('Corrupted tile')
for i in xrange(0, nbTriangles - 1, 3):
vi1 = self.indices[i]
vi2 = self.indices[i + 1]
vi3 = self.indices[i + 2]
triangle = (
(self._longs[vi1],
self._lats[vi1],
self._heights[vi1]),
(self._longs[vi2],
self._lats[vi2],
self._heights[vi2]),
(self._longs[vi3],
self._lats[vi3],
self._heights[vi3])
)
triangles.append(triangle)
return triangles
[docs] def _computeVerticesCoordinates(self):
"""
A private method to compute the vertices coordinates.
"""
if not self._longs:
for u in self.u:
self._longs.append(
lerp(self._west, self._east, old_div(float(u), self.MAX)))
for v in self.v:
self._lats.append(
lerp(self._south, self._north, old_div(float(v), self.MAX)))
for h in self.h:
self._heights.append(
lerp(
self.header['minimumHeight'],
self.header['maximumHeight'],
old_div(float(h), self.MAX)
)
)
[docs] def fromBytesIO(self, f, hasLighting=False, hasWatermask=False):
"""
A method to read a terrain tile content.
Arguments:
``f``
An instance of io.BytesIO containing the terrain data. (Required)
``hasLighting``
Indicate if the tile contains lighting information. Default is ``False``.
``hasWatermask``
Indicate if the tile contains watermask information. Default is ``False``.
"""
self.hasLighting = hasLighting
self.hasWatermask = hasWatermask
# Header
for k, v in TerrainTile.quantizedMeshHeader.items():
self.header[k] = unpackEntry(f, v)
# Vertices
vertexCount = unpackEntry(f, TerrainTile.vertexData['vertexCount'])
for ud in self._iterUnpackAndDecodeVertices(
f, vertexCount, TerrainTile.vertexData['uVertexCount']):
self.u.append(ud)
for vd in self._iterUnpackAndDecodeVertices(
f, vertexCount, TerrainTile.vertexData['vVertexCount']):
self.v.append(vd)
for hd in self._iterUnpackAndDecodeVertices(
f, vertexCount, TerrainTile.vertexData['heightVertexCount']):
self.h.append(hd)
# Indices
meta = TerrainTile.indexData16
if vertexCount > TerrainTile.BYTESPLIT:
meta = TerrainTile.indexData32
triangleCount = unpackEntry(f, meta['triangleCount'])
ind = [
index for index
in self._iterUnpackIndices(f, triangleCount * 3, meta['indices'])]
self.indices = decodeIndices(ind)
meta = TerrainTile.EdgeIndices16
if vertexCount > TerrainTile.BYTESPLIT:
meta = TerrainTile.indexData32
# Edges (vertices on the edge of the tile)
westIndicesCount = unpackEntry(f, meta['westVertexCount'])
for wi in self._iterUnpackIndices(f, westIndicesCount, meta['westIndices']):
self.westI.append(wi)
southIndicesCount = unpackEntry(f, meta['southVertexCount'])
for si in self._iterUnpackIndices(f, southIndicesCount, meta['southIndices']):
self.southI.append(si)
eastIndicesCount = unpackEntry(f, meta['eastVertexCount'])
for ei in self._iterUnpackIndices(f, eastIndicesCount, meta['eastIndices']):
self.eastI.append(ei)
northIndicesCount = unpackEntry(f, meta['northVertexCount'])
for ni in self._iterUnpackIndices(f, northIndicesCount, meta['northIndices']):
self.northI.append(ni)
if self.hasLighting:
# One byte of padding
# Light extension header
meta = TerrainTile.ExtensionHeader
extensionId = unpackEntry(f, meta['extensionId'])
if extensionId == 1:
extensionLength = unpackEntry(f, meta['extensionLength'])
for xy in self._iterUnpackAndDecodeLight(
f, extensionLength, TerrainTile.OctEncodedVertexNormals['xy']):
self.vLight.append(xy)
if self.hasWatermask:
meta = TerrainTile.ExtensionHeader
extensionId = unpackEntry(f, meta['extensionId'])
if extensionId == 2:
extensionLength = unpackEntry(f, meta['extensionLength'])
for row in self._iterUnpackWatermaskRow(
f, extensionLength, TerrainTile.WaterMask['xy']):
self.watermask.append(row)
data = f.read(1)
if data:
raise Exception('Should have reached end of file, but didn\'t')
[docs] @staticmethod
def _iterUnpackAndDecodeVertices(f, vertexCount, structType):
"""
A private method to itertatively unpack and decode indices.
"""
i = 0
# Delta decoding
delta = 0
while i != vertexCount:
delta += zigZagDecode(unpackEntry(f, structType))
yield delta
i += 1
[docs] @staticmethod
def _iterUnpackIndices(f, indicesCount, structType):
"""
A private method to iteratively unpack indices
"""
i = 0
while i != indicesCount:
yield unpackEntry(f, structType)
i += 1
[docs] @staticmethod
def _iterUnpackAndDecodeLight(f, extensionLength, structType):
"""
A private method to iteratively unpack light vector.
"""
i = 0
xyCount = old_div(extensionLength, 2)
while i != xyCount:
yield octDecode(
unpackEntry(
f, structType),
unpackEntry(
f, structType)
)
i += 1
[docs] @staticmethod
def _iterUnpackWatermaskRow(f, extensionLength, structType):
"""
A private method to iteratively unpack watermask rows
"""
i = 0
xyCount = 0
row = []
while xyCount != extensionLength:
row.append(unpackEntry(f, structType))
if i == 255:
yield row
i = 0
row = []
else:
i += 1
xyCount += 1
if row:
yield row
[docs] def fromFile(self, filePath, hasLighting=False, hasWatermask=False, gzipped=False):
"""
A method to read a terrain tile file. It is assumed that the tile unzipped.
Arguments:
``filePath``
An absolute or relative path to a quantized-mesh terrain tile. (Required)
``hasLighting``
Indicate if the tile contains lighting information. Default is ``False``.
``hasWatermask``
Indicate if the tile contains watermask information. Default is ``False``.
``gzipped``
Indicate if the tile content is gzipped. Default is ``False``.
"""
with open(filePath, 'rb') as f:
if gzipped:
f = ungzipFileObject(f)
self.fromBytesIO(f, hasLighting=hasLighting,
hasWatermask=hasWatermask)
[docs] def toBytesIO(self, gzipped=False):
"""
A method to write the terrain tile data to a file-like object (a string buffer).
Arguments:
``gzipped``
Indicate if the content should be gzipped. Default is ``False``.
"""
f = io.BytesIO()
self._writeTo(f)
if gzipped:
f = gzipFileObject(f)
return f
[docs] def toFile(self, filePath, gzipped=False):
"""
A method to write the terrain tile data to a physical file.
Argument:
``filePath``
An absolute or relative path to write the terrain tile. (Required)
``gzipped``
Indicate if the content should be gzipped. Default is ``False``.
"""
if os.path.isfile(filePath):
raise IOError('File %s already exists' % filePath)
if not gzipped:
with open(filePath, 'wb') as f:
self._writeTo(f)
else:
with gzip.open(filePath, 'wb') as f:
self._writeTo(f)
[docs] def _getWorkingUnitLatitude(self):
if not self._workingUnitLatitude:
self._workingUnitLatitude = old_div(self.MAX, (self._north - self._south))
return self._workingUnitLatitude
[docs] def _getWorkingUnitLongitude(self):
if not self._workingUnitLongitude:
self._workingUnitLongitude = old_div(self.MAX, (self._east - self._west))
return self._workingUnitLongitude
[docs] def _getDeltaHeight(self):
if not self._deltaHeight:
maxHeight = self.header['maximumHeight']
minHeight = self.header['minimumHeight']
self._deltaHeight = maxHeight - minHeight
return self._deltaHeight
[docs] def _quantizeLatitude(self, latitude):
return int(round((latitude - self._south) *
self._getWorkingUnitLatitude()))
[docs] def _quantizeLongitude(self, longitude):
return int(round((longitude - self._west) *
self._getWorkingUnitLongitude()))
[docs] def _quantizeHeight(self, height):
deniv = self._getDeltaHeight()
# In case a tile is completely flat
if deniv == 0:
h = 0
else:
workingUnitHeight = old_div(self.MAX, deniv)
h = int(round((height - self.header['minimumHeight']) * workingUnitHeight))
return h
[docs] def _dequantizeHeight(self, h):
"""
Private helper method to convert quantized tile (h) values to real world height
values
:param h: the quantized height value
:return: the height in ground units (meter)
"""
return lerp(self.header['minimumHeight'],
self.header['maximumHeight'],
old_div(float(h), self.MAX))
[docs] def _writeTo(self, f):
"""
A private method to write the terrain tile to a file or file-like object.
"""
# Header
for k, v in TerrainTile.quantizedMeshHeader.items():
f.write(packEntry(v, self.header[k]))
# Delta decoding
vertexCount = len(self.u)
# Vertices
f.write(packEntry(TerrainTile.vertexData['vertexCount'], vertexCount))
# Move the initial value
f.write(
packEntry(
TerrainTile.vertexData['uVertexCount'], zigZagEncode(self.u[0]))
)
for i in xrange(0, vertexCount - 1):
ud = self.u[i + 1] - self.u[i]
f.write(
packEntry(TerrainTile.vertexData['uVertexCount'], zigZagEncode(ud)))
f.write(
packEntry(
TerrainTile.vertexData['uVertexCount'], zigZagEncode(self.v[0]))
)
for i in xrange(0, vertexCount - 1):
vd = self.v[i + 1] - self.v[i]
f.write(
packEntry(TerrainTile.vertexData['vVertexCount'], zigZagEncode(vd)))
f.write(
packEntry(
TerrainTile.vertexData['uVertexCount'], zigZagEncode(self.h[0]))
)
for i in xrange(0, vertexCount - 1):
hd = self.h[i + 1] - self.h[i]
f.write(
packEntry(
TerrainTile.vertexData['heightVertexCount'], zigZagEncode(hd))
)
# Indices
meta = TerrainTile.indexData16
if vertexCount > TerrainTile.BYTESPLIT:
meta = TerrainTile.indexData32
f.write(packEntry(meta['triangleCount'],
old_div(len(self.indices), 3)))
ind = encodeIndices(self.indices)
packIndices(f, meta['indices'], ind)
meta = TerrainTile.EdgeIndices16
if vertexCount > TerrainTile.BYTESPLIT:
meta = TerrainTile.EdgeIndices32
f.write(packEntry(meta['westVertexCount'], len(self.westI)))
for wi in self.westI:
f.write(packEntry(meta['westIndices'], wi))
f.write(packEntry(meta['southVertexCount'], len(self.southI)))
for si in self.southI:
f.write(packEntry(meta['southIndices'], si))
f.write(packEntry(meta['eastVertexCount'], len(self.eastI)))
for ei in self.eastI:
f.write(packEntry(meta['eastIndices'], ei))
f.write(packEntry(meta['northVertexCount'], len(self.northI)))
for ni in self.northI:
f.write(packEntry(meta['northIndices'], ni))
# Extension header for light
if len(self.vLight) > 0:
self.hasLighting = True
meta = TerrainTile.ExtensionHeader
# Extension header ID is 1 for lightening
f.write(packEntry(meta['extensionId'], 1))
# Unsigned char size len is 1
f.write(packEntry(meta['extensionLength'], 2 * vertexCount))
metaV = TerrainTile.OctEncodedVertexNormals
for i in xrange(0, vertexCount):
x, y = octEncode(self.vLight[i])
f.write(packEntry(metaV['xy'], x))
f.write(packEntry(metaV['xy'], y))
if self.watermask:
self.hasWatermask = True
# Extension header ID is 2 for watermark
meta = TerrainTile.ExtensionHeader
f.write(packEntry(meta['extensionId'], 2))
# Extension header meta
nbRows = len(self.watermask)
if nbRows > 1:
# Unsigned char size len is 1
f.write(packEntry(meta['extensionLength'], TILEPXS))
if nbRows != 256:
raise Exception(
'Unexpected number of rows for the watermask: %s' % nbRows
)
# From North to South
for i in xrange(0, nbRows):
x = self.watermask[i]
if len(x) != 256:
raise Exception(
'Unexpected number of columns for the watermask: %s' % len(
x)
)
# From West to East
for y in x:
f.write(packEntry(TerrainTile.WaterMask['xy'], int(y)))
else:
f.write(packEntry(meta['extensionLength'], 1))
if self.watermask[0][0] is None:
self.watermask[0][0] = 0
f.write(
packEntry(TerrainTile.WaterMask['xy'], int(self.watermask[0][0])))
[docs] def fromTerrainTopology(self, topology, bounds=None):
"""
A method to prepare a terrain tile data structure.
Arguments:
``topology``
The topology of the mesh which must be an instance of
:class:`quantized_mesh_tile.topology.TerrainTopology`. (Required)
``bounds``
The bounds of a the terrain tile. (west, south, east, north)
If not defined, the bounds defined during initialization will be used.
If no bounds are provided, then the bounds
are extracted from the topology object.
"""
if not isinstance(topology, TerrainTopology):
raise Exception(
'topology object must be an instance of TerrainTopology')
# If the bounds are not provided use
# topology extent instead
if bounds is not None:
self._west = bounds[0]
self._east = bounds[2]
self._south = bounds[1]
self._north = bounds[3]
elif set([self._west, self._south, self._east, self._north]).difference(
set([-1.0, -1.0, 1.0, 1.0])):
# Bounds already defined earlier
pass
else:
# Set tile bounds
self._west = topology.minLon
self._east = topology.maxLon
self._south = topology.minLat
self._north = topology.maxLat
bSphere = BoundingSphere()
bSphere.fromPoints(topology.cartesianVertices)
ecefMinX = topology.ecefMinX
ecefMinY = topology.ecefMinY
ecefMinZ = topology.ecefMinZ
ecefMaxX = topology.ecefMaxX
ecefMaxY = topology.ecefMaxY
ecefMaxZ = topology.ecefMaxZ
# Center of the bounding box 3d
centerCoords = [
ecefMinX + (ecefMaxX - ecefMinX) * 0.5,
ecefMinY + (ecefMaxY - ecefMinY) * 0.5,
ecefMinZ + (ecefMaxZ - ecefMinZ) * 0.5
]
occlusionPCoords = occ.fromPoints(topology.cartesianVertices, bSphere)
for k in TerrainTile.quantizedMeshHeader.keys():
if k == 'centerX':
self.header[k] = centerCoords[0]
elif k == 'centerY':
self.header[k] = centerCoords[1]
elif k == 'centerZ':
self.header[k] = centerCoords[2]
elif k == 'minimumHeight':
self.header[k] = topology.minHeight
elif k == 'maximumHeight':
self.header[k] = topology.maxHeight
elif k == 'boundingSphereCenterX':
self.header[k] = bSphere.center[0]
elif k == 'boundingSphereCenterY':
self.header[k] = bSphere.center[1]
elif k == 'boundingSphereCenterZ':
self.header[k] = bSphere.center[2]
elif k == 'boundingSphereRadius':
self.header[k] = bSphere.radius
elif k == 'horizonOcclusionPointX':
self.header[k] = occlusionPCoords[0]
elif k == 'horizonOcclusionPointY':
self.header[k] = occlusionPCoords[1]
elif k == 'horizonOcclusionPointZ':
self.header[k] = occlusionPCoords[2]
# High watermark encoding performed during toFile
self.u = list(map(self._quantizeLongitude, topology.uVertex))
self.v = list(map(self._quantizeLatitude, topology.vVertex))
self.h = list(map(self._quantizeHeight, topology.hVertex))
self.indices = topology.indexData
# List all the vertices on the edge of the tile
# Use quantized values to determine if an indice belong to a tile edge
for indice in self.indices:
x = self.u[indice]
y = self.v[indice]
if x == self.MIN and indice not in self.westI:
self.westI.append(indice)
elif x == self.MAX and indice not in self.eastI:
self.eastI.append(indice)
if y == self.MIN and indice not in self.southI:
self.southI.append(indice)
elif y == self.MAX and indice not in self.northI:
self.northI.append(indice)
self.hasLighting = topology.hasLighting
if self.hasLighting:
self.vLight = topology.verticesUnitVectors