dms-client/data_meta/GetMetaInfo.py
2022-02-28 13:51:30 +08:00

2025 lines
98 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import io
from xml.dom import minidom
from osgeo import gdal
import h5py
from PIL import Image
import numpy as np
import re
import os
import sys
import time
import datetime
import tarfile
import zipfile
def exe_path():
    """Return the directory that holds the running program.

    For a frozen interpreter (``sys.frozen`` is set, e.g. by PyInstaller) this
    is the directory of ``sys.executable``; otherwise it is the directory of
    this source file with symlinks resolved.

    :return: directory path as ``str``
    """
    if not hasattr(sys, 'frozen'):
        return os.path.dirname(os.path.realpath(__file__))
    # Handles PyInstaller-style frozen bundles.
    return os.path.dirname(sys.executable)


# Module-level side effect: PROJ_LIB is set to the "PROJ" folder next to the program.
os.environ["PROJ_LIB"] = exe_path() + "/PROJ"
def GetTimestamp(date):
    """Convert a ``YYYY-mm-dd HH:MM:SS`` string into a timestamp.

    The text is parsed with ``time.strptime`` and converted with
    ``time.mktime``, i.e. it is interpreted as local time.

    :param date: date-time text in the ``%Y-%m-%d %H:%M:%S`` layout
    :return: seconds since the epoch as ``float``
    """
    layout = "%Y-%m-%d %H:%M:%S"
    return time.mktime(time.strptime(date, layout))
def uint16to8(bands, lower_percent=0.001, higher_percent=99.999):
    """Percentile-stretch a band stack to 8-bit.

    Each band is stretched linearly so that its ``lower_percent`` percentile
    maps to 0 and its ``higher_percent`` percentile maps to 255; values
    outside that range are clipped.

    :param bands: 3-D array indexed as ``[band, row, col]``
    :param lower_percent: lower percentile of the stretch
    :param higher_percent: upper percentile of the stretch
    :return: ``uint8`` array with the same shape as ``bands``
    """
    out = np.zeros_like(bands, dtype=np.uint8)
    out_min, out_max = 0, 255
    for i in range(bands.shape[0]):
        band = bands[i, :, :]
        low = np.percentile(band, lower_percent)
        high = np.percentile(band, higher_percent)
        # A flat band (or NaN percentiles) has no usable range: stretching it
        # would divide by zero and cast NaN/inf to uint8, so leave it at 0.
        if not (high - low) > 0:
            continue
        stretched = out_min + (band - low) * (out_max - out_min) / (high - low)
        out[i, :, :] = np.clip(stretched, out_min, out_max)
    return out
def createXML(metadata, xlm_file):
    """Write a metadata mapping to an XML file.

    The document has a single ``ProductMetaData`` root; every key of
    ``metadata`` becomes a child element whose text is ``str(value)``.

    :param metadata: mapping of element name to value
    :param xlm_file: path of the XML file to write (UTF-8)
    :return: None
    """
    doc = minidom.Document()
    root = doc.createElement('ProductMetaData')
    doc.appendChild(root)
    # One child element per metadata entry.
    for tag, value in metadata.items():
        element = doc.createElement(tag)
        element.appendChild(doc.createTextNode(str(value)))
        root.appendChild(element)
    # The context manager closes the file; no explicit close() is required.
    with open(xlm_file, 'w', encoding='utf-8') as handle:
        doc.writexml(handle, indent='\t', newl='\n', addindent='\t', encoding='utf-8')
def _gfpms_find_member(names, suffix):
    """Return the first archive member whose name ends with *suffix*, else ``None``."""
    return next((name for name in names if name.endswith(suffix)), None)


def _gfpms_sensor_metadata(tar_file, xml_member, xml_path, thumbnail_file, thumbnail_name):
    """Extract one sensor's XML and build its metadata dictionary.

    :param tar_file: open ``tarfile.TarFile`` containing *xml_member*
    :param xml_member: archive member name of the sensor XML
    :param xml_path: directory the XML file is extracted into
    :param thumbnail_file: path of the already extracted thumbnail
    :param thumbnail_name: archive member name of the thumbnail
    :return: metadata dictionary for that sensor
    """
    tar_file.extract(xml_member, xml_path)
    meta_content = tar_file.extractfile(xml_member).read()
    dom = minidom.parseString(meta_content.decode("utf-8"))

    def text(tag):
        # Text content of the first element named *tag*.
        return dom.getElementsByTagName(tag)[0].firstChild.data

    # Closed ring: top-left, top-right, bottom-right, bottom-left, top-left.
    ring = [(text(corner + 'Longitude'), text(corner + 'Latitude'))
            for corner in ('TopLeft', 'TopRight', 'BottomRight', 'BottomLeft')]
    ring.append(ring[0])
    boundaryGeomStr = 'POLYGON((' + ','.join(f'{lon} {lat}' for lon, lat in ring) + '))'

    return {"ProduceTime": text('ProduceTime'),
            "StartTime": text('StartTime'),
            "EndTime": text('EndTime'),
            "CloudPercent": text('CloudPercent'),
            "boundaryGeomStr": boundaryGeomStr,
            "bands": text('Bands'),
            "ImageGSD": text('ImageGSD'),
            "ProjectedCoordinates": "",
            "CollectionCode": "",
            "ThumbnailPath": thumbnail_file,
            "ThumbnailName": thumbnail_name,
            "xmlPath": xml_path + "/" + xml_member,
            "xmlFileName": xml_member,
            "DirectoryDepth": "month"}


def GetGFPMSData(in_file, xml_path, thumbnail_ath):
    """Read metadata of a Gaofen PMS scene archive.

    The tar archive must contain the multispectral and panchromatic
    thumbnails (``*MSS2_thumb.jpg``, ``*PAN2_thumb.jpg``) and XML files
    (``*MSS2.xml``, ``*PAN2.xml``). Members are located by suffix, so their
    order inside the archive does not matter. Thumbnails are extracted to
    ``thumbnail_ath`` and XML files to ``xml_path``.

    NOTE(review): member names come from the archive and are passed to
    ``TarFile.extract`` unchanged; confirm archives are trusted.

    :param in_file: path of the tar archive
    :param xml_path: directory receiving the extracted XML files
    :param thumbnail_ath: directory receiving the extracted thumbnails
    :return: ``(mss_dict, pan_dict)`` on success, otherwise
        ``{"code": -1, "msg": ...}``
    """
    try:
        with tarfile.open(in_file, mode='r') as tar_file:
            names = tar_file.getnames()

            # Multispectral thumbnail.
            mss_thumb = _gfpms_find_member(names, 'MSS2_thumb.jpg')
            if mss_thumb is None:
                return {"code": -1, "msg": "找不到指定文件..."}
            tar_file.extract(mss_thumb, thumbnail_ath)

            # Panchromatic thumbnail.
            pan_thumb = _gfpms_find_member(names, 'PAN2_thumb.jpg')
            if pan_thumb is None:
                return {"code": -1, "msg": "找不到指定文件..."}
            tar_file.extract(pan_thumb, thumbnail_ath)

            # Multispectral XML. The thumbnail path points at the directory
            # the thumbnail was actually extracted to.
            mss_xml = _gfpms_find_member(names, 'MSS2.xml')
            if mss_xml is None:
                return {"code": -1, "msg": "找不到指定文件..."}
            gf_mss_dict = _gfpms_sensor_metadata(
                tar_file, mss_xml, xml_path,
                thumbnail_ath + "/" + mss_thumb, mss_thumb)

            # Panchromatic XML.
            pan_xml = _gfpms_find_member(names, 'PAN2.xml')
            if pan_xml is None:
                return {"code": -1, "msg": "找不到指定文件..."}
            gf_pan_dict = _gfpms_sensor_metadata(
                tar_file, pan_xml, xml_path,
                thumbnail_ath + "/" + pan_thumb, pan_thumb)
        return gf_mss_dict, gf_pan_dict
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def _gf3_find_member(names, suffix):
    """Return the first archive member whose name ends with *suffix*, else ``None``."""
    return next((name for name in names if name.endswith(suffix)), None)


def GetGF3MDJData(in_file, xml_path, thumbnail_path):
    """Read metadata of a Gaofen-3 (GF-3 MDJ) scene archive.

    The tar archive must contain a ``*.thumb.jpg`` thumbnail and a
    ``*meta.xml`` file. Both are located by suffix, independent of their
    position in the archive, and are extracted to ``thumbnail_path`` and
    ``xml_path`` respectively.

    NOTE(review): member names come from the archive and are passed to
    ``TarFile.extract`` unchanged; confirm archives are trusted.

    :param in_file: path of the tar archive
    :param xml_path: directory receiving the extracted XML file
    :param thumbnail_path: directory receiving the extracted thumbnail
    :return: metadata dictionary on success, otherwise
        ``{"code": -1, "msg": ...}``
    """
    try:
        with tarfile.open(in_file, mode='r') as tar_file:
            names = tar_file.getnames()

            # Thumbnail.
            thumb_member = _gf3_find_member(names, '.thumb.jpg')
            if thumb_member is None:
                return {"code": -1, "msg": "找不到指定文件..."}
            tar_file.extract(thumb_member, thumbnail_path)

            # Metadata XML.
            xml_member = _gf3_find_member(names, 'meta.xml')
            if xml_member is None:
                return {"code": -1, "msg": "找不到指定文件..."}
            tar_file.extract(xml_member, xml_path)
            meta_content = tar_file.extractfile(xml_member).read()
            dom = minidom.parseString(meta_content.decode("utf-8"))

            def text(*path):
                # Follow *path* taking the first matching element at each
                # level and return the text of the last one.
                node = dom
                for tag in path:
                    node = node.getElementsByTagName(tag)[0]
                return node.firstChild.data

            # Closed ring: top-left, top-right, bottom-right, bottom-left, top-left.
            ring = [(text(corner, "longitude"), text(corner, "latitude"))
                    for corner in ("topLeft", "topRight", "bottomRight", "bottomLeft")]
            ring.append(ring[0])
            boundaryGeomStr = 'POLYGON((' + ','.join(f'{lon} {lat}' for lon, lat in ring) + '))'

            gf3_dict = {"ProduceTime": text('productGentime'),
                        "StartTime": text("imagingTime", "start"),
                        "EndTime": text("imagingTime", "end"),
                        "CloudPercent": "",
                        "boundaryGeomStr": boundaryGeomStr,
                        "Bands": "1,2",
                        "ImageGSD": text('NominalResolution'),
                        "ProjectedCoordinates": text('ProjectModel'),
                        'CollectionCode': "GF3_MDJ",
                        "ThumbnailPath": thumbnail_path + "/" + thumb_member,
                        "ThumbnailName": thumb_member,
                        "xmlPath": xml_path + "/" + xml_member,
                        "xmlFileName": xml_member,
                        "DirectoryDepth": "month"}
        return gf3_dict
    except Exception as e:
        return {"code": -1, "msg": str(e)}
def _gf4_find_member(names, prefix, suffix):
    """Return the first member starting with *prefix* and ending with *suffix*, else ``None``."""
    return next((name for name in names
                 if name.startswith(prefix) and name.endswith(suffix)), None)


def _gf4_sensor_metadata(tar_file, xml_member, xml_path, collection_code,
                         thumbnail_file, thumbnail_name):
    """Extract one GF-4 sensor XML and build its metadata dictionary.

    :param tar_file: open ``tarfile.TarFile`` containing *xml_member*
    :param xml_member: archive member name of the sensor XML
    :param xml_path: directory the XML file is extracted into
    :param collection_code: value stored under ``CollectionCode``
    :param thumbnail_file: path of the already extracted thumbnail
    :param thumbnail_name: archive member name of the thumbnail
    :return: metadata dictionary for that sensor
    """
    tar_file.extract(xml_member, xml_path)
    meta_content = tar_file.extractfile(xml_member).read()
    dom = minidom.parseString(meta_content.decode("utf-8"))

    def text(tag):
        # Text content of the first element named *tag*.
        return dom.getElementsByTagName(tag)[0].firstChild.data

    # Closed ring: top-left, top-right, bottom-right, bottom-left, top-left.
    ring = [(text(corner + 'Longitude'), text(corner + 'Latitude'))
            for corner in ('TopLeft', 'TopRight', 'BottomRight', 'BottomLeft')]
    ring.append(ring[0])
    boundaryGeomStr = 'POLYGON((' + ','.join(f'{lon} {lat}' for lon, lat in ring) + '))'

    return {"ProduceTime": text('ProduceTime'),
            "StartTime": text('StartTime'),
            "EndTime": text('EndTime'),
            "CloudPercent": text('CloudPercent'),
            "boundaryGeomStr": boundaryGeomStr,
            "Bands": text('Bands'),
            "ImageGSD": text('ImageGSD'),
            "ProjectedCoordinates": "",  # the projection is not read from the XML
            'CollectionCode': collection_code,
            "ThumbnailName": thumbnail_name,
            "ThumbnailPath": thumbnail_file,
            "xmlFileName": xml_member,
            "xmlPath": xml_path + "/" + xml_member,
            "DirectoryDepth": "month"}


def GetGF4PMIData(in_file, xml_path, thumbnail_path):
    """Read metadata of a Gaofen-4 (GF-4 PMI) scene archive.

    Per the original notes: PMS is visible/near-infrared (5 bands, 50 m) and
    IRS is mid-wave infrared (1 band, 400 m). The tar archive must contain,
    for each of the ``GF4_PMS_`` and ``GF4_IRS_`` prefixes, a ``*_thumb.jpg``
    thumbnail and a ``*.xml`` file. Members are located by prefix and suffix,
    independent of their position in the archive.

    NOTE(review): member names come from the archive and are passed to
    ``TarFile.extract`` unchanged; confirm archives are trusted.

    :param in_file: path of the tar archive
    :param xml_path: directory receiving the extracted XML files
    :param thumbnail_path: directory receiving the extracted thumbnails
    :return: ``(pms_dict, irs_dict)`` on success, otherwise
        ``{"code": -1, "msg": ...}``
    """
    try:
        with tarfile.open(in_file, mode='r') as tar_file:
            names = tar_file.getnames()

            # PMS thumbnail.
            pms_thumb = _gf4_find_member(names, 'GF4_PMS_', '_thumb.jpg')
            if pms_thumb is None:
                return {"code": -1, "msg": "找不到指定文件..."}
            tar_file.extract(pms_thumb, thumbnail_path)

            # IRS thumbnail.
            irs_thumb = _gf4_find_member(names, 'GF4_IRS_', '_thumb.jpg')
            if irs_thumb is None:
                return {"code": -1, "msg": "找不到指定文件..."}
            tar_file.extract(irs_thumb, thumbnail_path)

            # PMS XML.
            pms_xml = _gf4_find_member(names, 'GF4_PMS_', '.xml')
            if pms_xml is None:
                return {"code": -1, "msg": "找不到指定文件..."}
            gf4_pms_dict = _gf4_sensor_metadata(
                tar_file, pms_xml, xml_path, "GF4_PMS",
                thumbnail_path + "/" + pms_thumb, pms_thumb)

            # IRS XML.
            irs_xml = _gf4_find_member(names, 'GF4_IRS_', '.xml')
            if irs_xml is None:
                return {"code": -1, "msg": "找不到指定文件..."}
            gf4_irs_dict = _gf4_sensor_metadata(
                tar_file, irs_xml, xml_path, "GF4_IRS",
                thumbnail_path + "/" + irs_thumb, irs_thumb)
        return gf4_pms_dict, gf4_irs_dict
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetH08Data(in_file, xml_path, thumbnail_path):
    """Read metadata of a Himawari-8 file and create its thumbnail and XML.

    Band ids are read with ``h5py``; the file metadata and the three
    sub-datasets used for the thumbnail are read with GDAL. The metadata is
    dumped to ``<stem>.xml`` in ``xml_path`` and the thumbnail is written to
    ``<stem>_thumb.jpg`` in ``thumbnail_path``.

    :param in_file: path of the input file
    :param xml_path: directory receiving the generated XML file
    :param thumbnail_path: directory receiving the generated thumbnail
    :return: metadata dictionary on success, otherwise
        ``{"code": -1, "msg": ...}``
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)

        # NOTE(review): the file was also read for 'start_time' / 'end_time',
        # but those values were never used; StartTime / EndTime stay empty.
        with h5py.File(in_file, mode='r') as f:
            band_id = f['band_id'][:]
        bands = ','.join(str(i) for i in band_id)
        ImageGSD = '1000, 500, 2000'

        # Build the thumbnail from sub-datasets 7, 6 and 5.
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        meta_data = in_datasets.GetMetadata()
        datasets = in_datasets.GetSubDatasets()
        img_data = np.array([gdal.Open(datasets[k][0]).ReadAsArray() for k in (7, 6, 5)])
        img_data = uint16to8(img_data)
        # NOTE(review): the channel axis is reversed before the array is
        # handed to PIL; confirm the resulting channel order is intended.
        img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])

        # Shorter side is capped at 512 px and the other side follows the
        # aspect ratio. Tall images previously used the inverse ratio.
        rows, cols = img_data.shape[1], img_data.shape[2]
        if rows > cols:
            width = 512
            height = int(width / cols * rows)
        else:
            height = 512
            width = int(height / rows * cols)
        img.thumbnail((width, height))
        # NOTE(review): PNG data is written under a ".jpg" name; confirm
        # which of the two is intended.
        img.save(ThumbnailPath, "PNG")
        # Drop the references to the dataset and the large arrays.
        del in_datasets
        del img_data
        del img

        # Dump the GDAL metadata to XML.
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)
        createXML(meta_data, xmlPath)

        date_created = meta_data['date_created']
        # Footprint: north-west corner from the metadata (longitude shifted
        # by -180); east edge and south edge are fixed values.
        north_latitude = meta_data['upper_left_latitude']
        west_longitude = int(meta_data['upper_left_longitude']) - 180
        east_longitude = 200 - 180
        south_latitude = -60
        boundaryGeomStr = f'POLYGON(({west_longitude} {north_latitude},' \
                          f'{east_longitude} {north_latitude},' \
                          f'{east_longitude} {south_latitude},' \
                          f'{west_longitude} {south_latitude},' \
                          f'{west_longitude} {north_latitude}))'

        return {"ProduceTime": date_created,
                "StartTime": "",
                "EndTime": "",
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": bands,
                "ImageGSD": ImageGSD,
                "ProjectedCoordinates": "",
                "CollectionCode": "",
                "ThumbnailPath": ThumbnailPath,
                "ThumbnailName": ThumbnailName,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "day"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetJPSSData(in_file, xml_path, thumbnail_path):
    """Read metadata of a JPSS-1 (NOAA-20) file and create its thumbnail and XML.

    File metadata and the three sub-datasets used for the thumbnail are read
    with GDAL. The metadata is dumped to ``<stem>.xml`` in ``xml_path`` and
    the thumbnail is written to ``<stem>_thumb.jpg`` in ``thumbnail_path``.

    :param in_file: path of the input file
    :param xml_path: directory receiving the generated XML file
    :param thumbnail_path: directory receiving the generated thumbnail
    :return: metadata dictionary on success, otherwise
        ``{"code": -1, "msg": ...}``
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)

        # Build the thumbnail from sub-datasets 0, 3 and 9.
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        meta_data = in_datasets.GetMetadata()
        datasets = in_datasets.GetSubDatasets()
        img_data = np.array([gdal.Open(datasets[k][0]).ReadAsArray() for k in (0, 3, 9)])
        img_data = uint16to8(img_data)
        # NOTE(review): the channel axis is reversed before the array is
        # handed to PIL; confirm the resulting channel order is intended.
        img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])

        # Shorter side is capped at 512 px and the other side follows the
        # aspect ratio. Tall images previously used the inverse ratio.
        rows, cols = img_data.shape[1], img_data.shape[2]
        if rows > cols:
            width = 512
            height = int(width / cols * rows)
        else:
            height = 512
            width = int(height / rows * cols)
        img.thumbnail((width, height))
        # NOTE(review): PNG data is written under a ".jpg" name; confirm
        # which of the two is intended.
        img.save(ThumbnailPath, "PNG")
        # Drop the references to the dataset and the large arrays.
        del in_datasets
        del img_data
        del img

        # Dump the GDAL metadata to XML.
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)
        createXML(meta_data, xmlPath)

        ProductionTime = meta_data['ProductionTime']
        StartTime = meta_data['StartTime']
        EndTime = meta_data['EndTime']
        # Last word of LongName / second word of title.
        ImageGSD = str(meta_data['LongName']).split(" ")[-1]
        Bands = str(meta_data['title']).split(" ")[1]

        # Footprint is the bounding box given by the metadata.
        north = meta_data['NorthBoundingCoordinate']
        south = meta_data['SouthBoundingCoordinate']
        west = meta_data['WestBoundingCoordinate']
        east = meta_data['EastBoundingCoordinate']
        boundaryGeomStr = f'POLYGON(({west} {north},' \
                          f'{east} {north},' \
                          f'{east} {south},' \
                          f'{west} {south},' \
                          f'{west} {north}))'

        return {"ProduceTime": ProductionTime,
                "StartTime": StartTime,
                "EndTime": EndTime,
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": Bands,
                "ImageGSD": ImageGSD,
                "ProjectedCoordinates": "",
                "CollectionCode": "",
                "ThumbnailPath": ThumbnailPath,
                "ThumbnailName": ThumbnailName,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "day"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetSNPPData(in_file, xml_path, thumbnail_path):
    """Read metadata of a Suomi NPP (SNPP) file and create its thumbnail and XML.

    File metadata and the three sub-datasets used for the thumbnail are read
    with GDAL. The metadata is dumped to ``<stem>.xml`` in ``xml_path`` and
    the thumbnail is written to ``<stem>_thumb.jpg`` in ``thumbnail_path``.

    The production time is returned under ``"ProduceTime"`` (the key used by
    the other extractors in this module) and, for backward compatibility,
    also under the historical ``"ProductionTime"`` key.

    :param in_file: path of the input file
    :param xml_path: directory receiving the generated XML file
    :param thumbnail_path: directory receiving the generated thumbnail
    :return: metadata dictionary on success, otherwise
        ``{"code": -1, "msg": ...}``
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)

        # Build the thumbnail from sub-datasets 0, 3 and 9.
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        meta_data = in_datasets.GetMetadata()
        datasets = in_datasets.GetSubDatasets()
        img_data = np.array([gdal.Open(datasets[k][0]).ReadAsArray() for k in (0, 3, 9)])
        img_data = uint16to8(img_data)
        # NOTE(review): the channel axis is reversed before the array is
        # handed to PIL; confirm the resulting channel order is intended.
        img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])

        # Shorter side is capped at 512 px and the other side follows the
        # aspect ratio. Tall images previously used the inverse ratio.
        rows, cols = img_data.shape[1], img_data.shape[2]
        if rows > cols:
            width = 512
            height = int(width / cols * rows)
        else:
            height = 512
            width = int(height / rows * cols)
        img.thumbnail((width, height))
        # NOTE(review): PNG data is written under a ".jpg" name; confirm
        # which of the two is intended.
        img.save(ThumbnailPath, "PNG")
        # Drop the references to the dataset and the large arrays.
        del in_datasets
        del img_data
        del img

        # Dump the GDAL metadata to XML.
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)
        createXML(meta_data, xmlPath)

        ProductionTime = meta_data['ProductionTime']
        StartTime = meta_data['StartTime']
        EndTime = meta_data['EndTime']
        # Last word of LongName without its final character / full title.
        ImageGSD = str(meta_data['LongName']).split(" ")[-1][:-1]
        Bands = str(meta_data['title'])

        # Footprint is the bounding box given by the metadata.
        north = meta_data['NorthBoundingCoordinate']
        south = meta_data['SouthBoundingCoordinate']
        west = meta_data['WestBoundingCoordinate']
        east = meta_data['EastBoundingCoordinate']
        boundaryGeomStr = f'POLYGON(({west} {north},' \
                          f'{east} {north},' \
                          f'{east} {south},' \
                          f'{west} {south},' \
                          f'{west} {north}))'

        return {"ProduceTime": ProductionTime,
                "ProductionTime": ProductionTime,  # historical key, kept for existing callers
                "StartTime": StartTime,
                "EndTime": EndTime,
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": Bands,
                "ImageGSD": ImageGSD,
                "ProjectedCoordinates": "",
                "CollectionCode": "",
                "ThumbnailPath": ThumbnailPath,
                "ThumbnailName": ThumbnailName,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "day"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
# Archive members of interest inside a Sentinel-1 product. Dots are escaped
# so they only match a literal ".", and the mission letter is not limited to "a".
_S1_ANNOTATION_RE = re.compile(
    r'[0-9a-zA-Z_]+\.SAFE/annotation/s1[a-z]-iw-grd-vv[0-9a-z\-]+\.xml')
_S1_OVERLAY_RE = re.compile(r'[0-9a-zA-Z_]+\.SAFE/preview/map-overlay\.kml')
_S1_QUICKLOOK_RE = re.compile(r'[0-9a-zA-Z_]+\.SAFE/preview/quick-look\.png')


def GetSentinel1Data(in_file, xml_path, thumbnail_path):
    """Read metadata of a Sentinel-1 product zip.

    Three members are used: the IW GRD VV annotation XML (times; also copied
    to ``<stem>.xml`` in ``xml_path``), ``preview/map-overlay.kml`` (footprint)
    and ``preview/quick-look.png`` (copied byte-for-byte to
    ``<stem>_thumb.jpg`` in ``thumbnail_path``). The annotation and the KML
    are required; the quick-look is optional.

    :param in_file: path of the product zip
    :param xml_path: directory receiving the annotation XML copy
    :param thumbnail_path: directory receiving the quick-look copy
    :return: metadata dictionary on success, otherwise
        ``{"code": -1, "msg": ...}``
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        xmlFileName = stem + ".xml"
        xmlPath = os.path.join(xml_path, xmlFileName)
        ThumbnailName = stem + "_thumb.jpg"
        ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)

        times = None
        boundaryGeomStr = None
        with zipfile.ZipFile(in_file, mode='r') as zip_file:
            for member in zip_file.namelist():
                if _S1_ANNOTATION_RE.match(member):
                    # Keep a copy of the annotation and read the times from it.
                    meta_data = zip_file.read(member)
                    with open(xmlPath, "wb") as fout:
                        fout.write(meta_data)
                    dom = minidom.parseString(meta_data)
                    quality = dom.getElementsByTagName('qualityInformation')[
                        0].getElementsByTagName('qualityDataList')[
                        0].getElementsByTagName('qualityData')[0]
                    header = dom.getElementsByTagName('adsHeader')[0]
                    times = (
                        quality.getElementsByTagName('azimuthTime')[0].firstChild.data,
                        header.getElementsByTagName('startTime')[0].firstChild.data,
                        header.getElementsByTagName('stopTime')[0].firstChild.data,
                    )
                elif _S1_OVERLAY_RE.match(member):
                    dom = minidom.parseString(zip_file.read(member))
                    coordinates = dom.getElementsByTagName('coordinates')[0].firstChild.data
                    # "lon,lat" pairs separated by whitespace; split() also
                    # tolerates leading/trailing whitespace and line breaks.
                    ring = [pair.split(',')[:2] for pair in coordinates.split()[:4]]
                    if len(ring) < 4:
                        raise ValueError("map-overlay.kml: expected 4 coordinate pairs")
                    ring.append(ring[0])
                    boundaryGeomStr = 'POLYGON((' + ','.join(
                        f'{lon} {lat}' for lon, lat in ring) + '))'
                elif _S1_QUICKLOOK_RE.match(member):
                    # The quick-look bytes are stored unchanged.
                    with open(ThumbnailPath, "wb") as fout:
                        fout.write(zip_file.read(member))

        # Without annotation or footprint there is nothing to report.
        if times is None or boundaryGeomStr is None:
            return {"code": -1, "msg": "找不到指定文件..."}
        ProduceTime, StartTime, StopTime = times

        return {"ProduceTime": ProduceTime,
                "StartTime": StartTime,
                "StopTime": StopTime,
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": "Amplitude_VH,Intensity_VH,Amplitude_VV,Intensity_VV",
                "ImageGSD": "10",
                "ProjectedCoordinates": '',
                "CollectionCode": '',
                "ThumbnailName": ThumbnailName,
                "ThumbnailPath": ThumbnailPath,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetSentinel2Data(in_file, xml_path, thumbnail_path):
    """Read metadata of a Sentinel-2 L1C product zip and create its thumbnail.

    The B02/B03/B04 JPEG 2000 members are read through GDAL's ``/vsizip/``
    and combined into the thumbnail ``<stem>_thumb.jpg`` in
    ``thumbnail_path``. The product metadata member (``MTD_MSIL1C``, with or
    without an ``.xml`` extension) is copied to ``<stem>.xml`` in ``xml_path``
    and parsed. Members are selected by suffix, not by list position.

    :param in_file: path of the product zip
    :param xml_path: directory receiving the metadata XML copy
    :param thumbnail_path: directory receiving the generated thumbnail
    :return: metadata dictionary on success, otherwise
        ``{"code": -1, "msg": ...}``
    """
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        with zipfile.ZipFile(in_file, 'r') as zip_file:
            names = sorted(zip_file.namelist())
            band_files = [name for name in names
                          if name.endswith(('_B02.jp2', '_B03.jp2', '_B04.jp2'))]
            meta_files = [name for name in names
                          if name.endswith(('.SAFE/MTD_MSIL1C', '.SAFE/MTD_MSIL1C.xml'))]
            if len(band_files) < 3 or not meta_files:
                return {"code": -1, "msg": "找不到指定文件..."}

            # Thumbnail from the first three band files in sorted order.
            ThumbnailName = stem + "_thumb.jpg"
            ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)
            band_arrays = [gdal.Open('/vsizip/%s/%s' % (in_file, member)).ReadAsArray()
                           for member in band_files[:3]]
            img_data = np.array([band_arrays[2], band_arrays[1], band_arrays[0]])
            img_data = uint16to8(img_data)
            # NOTE(review): the channel axis is reversed before the array is
            # handed to PIL; confirm the resulting channel order is intended.
            img = Image.fromarray(np.transpose(img_data, (1, 2, 0))[:, :, ::-1])

            # Shorter side is capped at 512 px and the other side follows the
            # aspect ratio. Tall images previously used the inverse ratio.
            rows, cols = img_data.shape[1], img_data.shape[2]
            if rows > cols:
                width = 512
                height = int(width / cols * rows)
            else:
                height = 512
                width = int(height / rows * cols)
            img.thumbnail((width, height))
            # NOTE(review): PNG data is written under a ".jpg" name; confirm
            # which of the two is intended.
            img.save(ThumbnailPath, "PNG")
            # Drop the references to the large arrays.
            del band_arrays
            del img_data
            del img

            # Copy the product metadata and parse it.
            meta_member = meta_files[0]
            xmlFileName = stem + ".xml"
            xmlPath = os.path.join(xml_path, xmlFileName)
            meta_data = zip_file.read(meta_member)
            with open(xmlPath, "wb") as fout:
                fout.write(meta_data)
            dom = minidom.parseString(meta_data)

            def text(node, *path):
                # Follow *path* taking the first matching descendant at each
                # level and return the text of the last element.
                for tag in path:
                    node = node.getElementsByTagName(tag)[0]
                return node.firstChild.data

            cloud_percent = text(dom, 'n1:Quality_Indicators_Info', 'Cloud_Coverage_Assessment')
            ImageGSD = '10, 20, 60'
            ProjectedCoordinates = text(dom, 'n1:Geometric_Info',
                                        'Coordinate_Reference_System', 'GEO_TABLES')
            ProduceTime = text(dom, 'n1:General_Info', 'Product_Info', 'GENERATION_TIME')
            StartTime = text(dom, 'n1:General_Info', 'Product_Info', 'PRODUCT_START_TIME')
            StopTime = text(dom, 'n1:General_Info', 'Product_Info', 'PRODUCT_STOP_TIME')

            # EXT_POS_LIST is read as whitespace-separated "lat lon" values;
            # the first four pairs form the ring.
            lon_lat = text(dom, 'n1:Geometric_Info', 'Product_Footprint', 'Product_Footprint',
                           'Global_Footprint', 'EXT_POS_LIST')
            coords = lon_lat.split()
            ring = [(coords[i + 1], coords[i]) for i in (0, 2, 4, 6)]
            ring.append(ring[0])
            boundaryGeomStr = 'POLYGON((' + ','.join(f'{lon} {lat}' for lon, lat in ring) + '))'

            S2_dict = {"ProduceTime": ProduceTime,
                       "StartTime": StartTime,
                       "StopTime": StopTime,
                       "CloudPercent": cloud_percent,
                       "boundaryGeomStr": boundaryGeomStr,
                       "bands": "1,2,3,4,5,6,7,8,9,10,11,12",
                       "ImageGSD": ImageGSD,
                       "ProjectedCoordinates": ProjectedCoordinates,
                       "CollectionCode": '',
                       "ThumbnailName": ThumbnailName,
                       "ThumbnailPath": ThumbnailPath,
                       "xmlPath": xmlPath,
                       "xmlFileName": xmlFileName,
                       "DirectoryDepth": "month"}
        return S2_dict
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetSentinel3OLData(in_file, xml_path, thumbnail_path):
    """
    Collect metadata for a zipped Sentinel-3 OLCI product.

    The Oa03/Oa05/Oa08 radiance bands are rendered into a thumbnail, the
    ``xfdumanifest.xml`` manifest is copied into ``xml_path`` and the
    creation / receiving times are read from it.

    :param in_file: path of the product zip archive
    :param xml_path: directory that receives the copied manifest
    :param thumbnail_path: directory that receives the thumbnail
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    band_suffixes = ('Oa03_radiance.nc', 'Oa05_radiance.nc', 'Oa08_radiance.nc')
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        with zipfile.ZipFile(in_file, 'r') as zip_file:
            # Select members by suffix so the result does not depend on the
            # order in which the archive lists its entries.
            names = sorted(zip_file.namelist())
            band_files = [next((name for name in names if name.endswith(suffix)), None)
                          for suffix in band_suffixes]
            manifest = next((name for name in names if name.endswith('xfdumanifest.xml')), None)
            if manifest is None or None in band_files:
                return {"code": -1, "msg": "没有满足条件的数据字典..."}

            # Build the thumbnail.
            ThumbnailName = stem + "_thumb.jpg"
            ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)
            rgb_list = []
            for band_file in band_files:
                sub_dataset = gdal.Open('/vsizip/%s/%s' % (in_file, band_file))
                if sub_dataset is None:
                    raise RuntimeError('GDAL failed to open %s' % band_file)
                rgb_list.append(sub_dataset.ReadAsArray())
                sub_dataset = None  # drop the reference so GDAL can close the dataset
            img_data = np.array([rgb_list[2], rgb_list[1], rgb_list[0]])
            img_data = uint16to8(img_data)
            # (bands, rows, cols) -> (rows, cols, bands), then reverse the band axis.
            # NOTE(review): the two reversals cancel out, so the image channels end
            # up in (Oa03, Oa05, Oa08) order - confirm this is the intended colour order.
            img_data2 = np.transpose(img_data, (1, 2, 0))
            img_data2 = img_data2[:, :, ::-1]
            img = Image.fromarray(img_data2)
            # Bounding box handed to PIL; thumbnail() keeps the aspect ratio.
            if img_data.shape[1] > img_data.shape[2]:
                width = 512
                height = int(width / img_data.shape[1] * img_data.shape[2])
            else:
                height = 512
                width = int(height / img_data.shape[1] * img_data.shape[2])
            img.thumbnail((width, height))
            # NOTE(review): the data is written as PNG although the name ends in .jpg.
            img.save(ThumbnailPath, "PNG")
            # Free the large arrays before continuing.
            del rgb_list, img_data, img_data2, img

            # Copy the manifest next to the other exported XML files.
            xmlFileName = stem + ".xml"
            xmlPath = os.path.join(xml_path, xmlFileName)
            meta_data = zip_file.read(manifest)
            with open(xmlPath, "wb") as fout:
                fout.write(meta_data)

        # Parse the bytes already read instead of opening a second zip handle.
        dom = minidom.parseString(meta_data)
        ProduceTime = dom.getElementsByTagName('sentinel3:creationTime')[0].firstChild.data
        StartTime = dom.getElementsByTagName('sentinel3:receivingStartTime')[0].firstChild.data
        StopTime = dom.getElementsByTagName('sentinel3:receivingStopTime')[0].firstChild.data

        # The footprint is intentionally left empty: no boundary polygon is
        # derived from this manifest yet.
        return {"ProduceTime": ProduceTime,
                "StartTime": StartTime,
                "StopTime": StopTime,
                "CloudPercent": "",
                "boundaryGeomStr": "",
                "bands": "Oa01,Oa02,Oa03,Oa04,Oa05,Oa06,Oa07,Oa08,Oa09,Oa10,Oa11,Oa12,Oa13,Oa14,Oa15,Oa16,"
                         "Oa17,Oa18,Oa19,Oa20,Oa21",
                "ImageGSD": "270,294",
                "ProjectedCoordinates": "",
                "CollectionCode": "Sentinel3_OLCI_L1",
                "ThumbnailName": ThumbnailName,
                "ThumbnailPath": ThumbnailPath,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetHJ1Data(in_file, xml_path, thumbnail_path):
    """
    Collect metadata for an HJ-1 product stored as a tar archive.

    The ``*THUMB.JPG`` member is extracted into ``thumbnail_path`` and the
    ``*.XML`` member into ``xml_path``; the XML is then parsed for times,
    resolution, projection, bands and the four product corners.

    :param in_file: path of the product tar archive
    :param xml_path: directory that receives the extracted XML
    :param thumbnail_path: directory that receives the extracted thumbnail
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        ThumbnailPath = ThumbnailName = None
        xmlPath = xmlFileName = None
        dom = None
        with tarfile.open(in_file, mode='r') as tar_file:
            for member in tar_file.getnames():
                if member.endswith("THUMB.JPG"):
                    # Extract the thumbnail.
                    tar_file.extract(member, thumbnail_path)
                    ThumbnailPath = thumbnail_path + "/" + member
                    # Last path component; works for flat and nested member names.
                    ThumbnailName = member.rsplit('/', 1)[-1]
                elif member.endswith(".XML"):
                    # Extract the XML and parse its content.
                    tar_file.extract(member, xml_path)
                    xmlPath = xml_path + "/" + member
                    xmlFileName = member.rsplit('/', 1)[-1]
                    with tar_file.extractfile(member) as meta_file:
                        meta_content = meta_file.read()
                    dom = minidom.parseString(meta_content.decode("utf-8"))

        # Both the thumbnail and the XML are required to describe the product.
        if dom is None or ThumbnailPath is None:
            return {"code": -1, "msg": "没有满足条件的数据字典..."}

        def text_of(tag):
            # Text of the first element named ``tag``.
            return dom.getElementsByTagName(tag)[0].firstChild.data

        # Corner coordinates as (longitude, latitude), clockwise from upper left.
        corners = [(text_of(prefix + 'Long'), text_of(prefix + 'Lat'))
                   for prefix in ('productUpperLeft', 'productUpperRight',
                                  'productLowerRight', 'productLowerLeft')]
        ring = corners + corners[:1]  # repeat the first corner to close the ring
        boundaryGeomStr = 'POLYGON((' + ','.join(f'{lon} {lat}' for lon, lat in ring) + '))'

        # NOTE(review): this dictionary uses "ProductTime" while the sibling
        # readers use "ProduceTime" - confirm which key consumers expect.
        return {"ProductTime": text_of('productDate'),
                "StartTime": text_of('imagingStartTime'),
                "EndTime": text_of('imagingStopTime'),
                "CloudPercent": "",
                "boundaryGeomStr": boundaryGeomStr,
                "bands": text_of('bands'),
                "ImageGSD": text_of('pixelSpacing'),
                "ProjectedCoordinates": text_of('mapProjection'),
                "CollectionCode": "",
                "ThumbnailPath": ThumbnailPath,
                "ThumbnailName": ThumbnailName,
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def _zy02c_metadata_dict(dom, thumbnail_file, thumbnail_name, xml_file, xml_name):
    """Build one ZY-02C metadata dictionary from a parsed sensor XML document."""

    def text_of(tag):
        # Text of the first element named ``tag``.
        return dom.getElementsByTagName(tag)[0].firstChild.data

    # Corner coordinates as (longitude, latitude), clockwise from top left.
    corners = [(text_of(prefix + 'Longitude'), text_of(prefix + 'Latitude'))
               for prefix in ('TopLeft', 'TopRight', 'BottomRight', 'BottomLeft')]
    ring = corners + corners[:1]  # repeat the first corner to close the ring
    boundaryGeomStr = 'POLYGON((' + ','.join(f'{lon} {lat}' for lon, lat in ring) + '))'
    return {"ProduceTime": text_of('ProduceTime'),
            "StartTime": text_of('StartTime'),
            "EndTime": text_of('EndTime'),
            "CloudPercent": text_of('CloudPercent'),
            "boundaryGeomStr": boundaryGeomStr,
            "bands": text_of('Bands'),
            "ImageGSD": text_of('ImageGSD'),
            "ProjectedCoordinates": text_of('MapProjection'),
            'CollectionCode': "",
            "ThumbnailPath": thumbnail_file,
            "ThumbnailName": thumbnail_name,
            "xmlPath": xml_file,
            "xmlFileName": xml_name,
            "DirectoryDepth": "day"}


def GetZY02CData(in_file, xml_path, thumbnail_path):
    """
    Collect metadata for a ZY-02C product stored as a tar archive.

    The archive is expected to hold a thumbnail (``*MUX_thumb.jpg`` /
    ``*PAN_thumb.jpg``) and an XML file (``*MUX.xml`` / ``*PAN.xml``) for
    the multispectral and the panchromatic sensor. Thumbnails are extracted
    into ``thumbnail_path`` and XML files into ``xml_path``. The dictionaries
    are assembled only after the whole archive has been scanned, so the
    order of the members inside the archive does not matter.

    :param in_file: path of the product tar archive
    :param xml_path: directory that receives the extracted XML files
    :param thumbnail_path: directory that receives the extracted thumbnails
    :return: ``(mux_dict, pan_dict)`` on success,
             ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        # Per sensor: thumbnail member, XML member and parsed XML document.
        found = {"MUX": {}, "PAN": {}}
        with tarfile.open(in_file, mode='r') as tar_file:
            for member in tar_file.getnames():
                for sensor, info in found.items():
                    if member.endswith(sensor + "_thumb.jpg"):
                        # Extract the sensor thumbnail.
                        tar_file.extract(member, thumbnail_path)
                        info["thumb"] = member
                    elif member.endswith(sensor + ".xml"):
                        # Extract the sensor XML and parse its content.
                        tar_file.extract(member, xml_path)
                        with tar_file.extractfile(member) as meta_file:
                            meta_content = meta_file.read()
                        info["xml"] = member
                        info["dom"] = minidom.parseString(meta_content.decode("utf-8"))

        results = []
        for sensor in ("MUX", "PAN"):
            info = found[sensor]
            # Both sensors need their thumbnail and their XML.
            if "thumb" not in info or "dom" not in info:
                return {"code": -1, "msg": "没有满足条件的数据字典..."}
            results.append(_zy02c_metadata_dict(info["dom"],
                                                thumbnail_path + "/" + info["thumb"],
                                                info["thumb"],
                                                xml_path + "/" + info["xml"],
                                                info["xml"]))
        zy2_mux_dict, zy2_pan_dict = results
        return zy2_mux_dict, zy2_pan_dict
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetZY3Data(in_file, xml_path, thumbnail_path):
    """
    Collect metadata for a ZY-3 product stored as a tar archive.

    The ``*thumb.jpg`` member is extracted into ``thumbnail_path`` and the
    metadata XML (any ``*.xml`` member except ``*order.xml``) into
    ``xml_path``. The dictionary is assembled after the whole archive has
    been scanned, so the thumbnail may appear before or after the XML.
    When several members match, the last one of each kind is used.

    :param in_file: path of the product tar archive
    :param xml_path: directory that receives the extracted XML
    :param thumbnail_path: directory that receives the extracted thumbnail
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        thumb_member = None
        xml_member = None
        dom = None
        with tarfile.open(in_file, mode='r') as tar_file:
            for member in tar_file.getnames():
                if member.endswith("thumb.jpg"):
                    # Extract the thumbnail.
                    tar_file.extract(member, thumbnail_path)
                    thumb_member = member
                elif member.endswith('.xml') and not member.endswith('order.xml'):
                    # Extract the metadata XML and parse its content.
                    tar_file.extract(member, xml_path)
                    with tar_file.extractfile(member) as meta_file:
                        meta_content = meta_file.read()
                    xml_member = member
                    dom = minidom.parseString(meta_content.decode("utf-8"))

        # Both the thumbnail and the XML are required to describe the product.
        if dom is None or thumb_member is None:
            return {"code": -1, "msg": "没有满足条件的数据字典..."}

        def text_of(tag):
            # Text of the first element named ``tag``.
            return dom.getElementsByTagName(tag)[0].firstChild.data

        # Corner coordinates as (longitude, latitude), clockwise from top left.
        corners = [(text_of(prefix + 'Longitude'), text_of(prefix + 'Latitude'))
                   for prefix in ('TopLeft', 'TopRight', 'BottomRight', 'BottomLeft')]
        ring = corners + corners[:1]  # repeat the first corner to close the ring
        boundaryGeomStr = 'POLYGON((' + ','.join(f'{lon} {lat}' for lon, lat in ring) + '))'

        return {"ProduceTime": text_of('ProduceTime'),
                "StartTime": text_of('StartTime'),
                "EndTime": text_of('EndTime'),
                "CloudPercent": text_of('CloudPercent'),
                "boundaryGeomStr": boundaryGeomStr,
                "bands": text_of('Bands'),
                "ImageGSD": text_of('ImageGSD'),
                "ProjectedCoordinates": text_of('MapProjection'),
                "CollectionCode": "",
                "ThumbnailPath": thumbnail_path + "/" + thumb_member,
                "ThumbnailName": thumb_member,
                "xmlPath": xml_path + "/" + xml_member,
                "xmlFileName": xml_member,
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetLandsatData(in_file, thumbnail_path, txt_path):
    """
    Collect metadata for a Landsat product stored as a tar archive.

    The ``_B2``/``_B3``/``_B4`` GeoTIFF bands are rendered into a thumbnail,
    the ``_MTL.txt`` file is extracted into ``txt_path`` and parsed as
    ``KEY = VALUE`` lines.

    :param in_file: path of the product tar archive
    :param thumbnail_path: directory that receives the thumbnail
    :param txt_path: directory that receives the extracted MTL file
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    band_suffixes = ('_B2.TIF', '_B3.TIF', '_B4.TIF')
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        with tarfile.open(in_file, mode='r') as tar_file:
            # Select members by suffix rather than by position in the listing.
            names = sorted(tar_file.getnames())
            band_files = [next((name for name in names if name.endswith(suffix)), None)
                          for suffix in band_suffixes]
            mtl_name = next((name for name in names if name.endswith('_MTL.txt')), None)
            if mtl_name is None or None in band_files:
                return {"code": -1, "msg": "没有满足条件的数据字典..."}

            # Build the thumbnail.
            ThumbnailName = stem + "_thumb.jpg"
            ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)
            rgb_list = []
            for band_file in band_files:
                sub_dataset = gdal.Open('/vsitar/%s/%s' % (in_file, band_file))
                if sub_dataset is None:
                    raise RuntimeError('GDAL failed to open %s' % band_file)
                rgb_list.append(sub_dataset.ReadAsArray())
                sub_dataset = None  # drop the reference so GDAL can close the dataset
            img_data = np.array([rgb_list[2], rgb_list[1], rgb_list[0]])
            img_data = uint16to8(img_data)
            # (bands, rows, cols) -> (rows, cols, bands), then reverse the band axis.
            # NOTE(review): the two reversals cancel out, so the image channels end
            # up in (B2, B3, B4) order - confirm this is the intended colour order.
            img_data2 = np.transpose(img_data, (1, 2, 0))
            img_data2 = img_data2[:, :, ::-1]
            img = Image.fromarray(img_data2)
            # Bounding box handed to PIL; thumbnail() keeps the aspect ratio.
            if img_data.shape[1] > img_data.shape[2]:
                width = 512
                height = int(width / img_data.shape[1] * img_data.shape[2])
            else:
                height = 512
                width = int(height / img_data.shape[1] * img_data.shape[2])
            img.thumbnail((width, height))
            # NOTE(review): the data is written as PNG although the name ends in .jpg.
            img.save(ThumbnailPath, "PNG")
            # Free the large arrays before continuing.
            del rgb_list, img_data, img_data2, img

            # Extract the MTL file next to the other exported metadata.
            TxTPath = os.path.join(txt_path, mtl_name)
            tar_file.extract(mtl_name, txt_path)

        # Parse "KEY = VALUE" lines; values keep any surrounding quotes.
        dic = {}
        with open(TxTPath, 'r') as fr:
            for line in fr:
                key, sep, value = line.partition('=')
                if sep:
                    dic[key.strip()] = value.strip()

        # Production date: "YYYY-MM-DDTHH:MM:SSZ" -> "YYYY-MM-DD HH:MM:SS".
        FILE_DATE = dic['FILE_DATE'].split("Z")[0].replace("T", " ")
        ProduceTime = str(GetTimestamp(FILE_DATE))

        # Corner coordinates as (longitude, latitude), clockwise from upper left.
        corners = [(dic['CORNER_%s_LON_PRODUCT' % corner], dic['CORNER_%s_LAT_PRODUCT' % corner])
                   for corner in ('UL', 'UR', 'LR', 'LL')]
        ring = corners + corners[:1]  # repeat the first corner to close the ring
        boundaryGeomStr = 'POLYGON((' + ','.join(f'{lon} {lat}' for lon, lat in ring) + '))'

        return {"ProduceTime": ProduceTime,
                "StartTime": "",
                "StopTime": "",
                "CloudPercent": dic['CLOUD_COVER'],
                "boundaryGeomStr": boundaryGeomStr,
                "ImageGSD": '30',
                "ThumbnailName": ThumbnailName,
                "ThumbnailPath": ThumbnailPath,
                "xmlPath": TxTPath,
                "xmlFileName": mtl_name,
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetSentinel2Data(in_file, xml_path, thumbnail_path):
    """
    Collect metadata for a zipped Sentinel-2 L1C product.

    The B02/B03/B04 JPEG2000 bands are rendered into a thumbnail, the
    ``MTD_MSIL1C.xml`` file is copied into ``xml_path`` and parsed for the
    cloud coverage, the product times and the footprint.

    :param in_file: path of the product zip archive
    :param xml_path: directory that receives the copied metadata XML
    :param thumbnail_path: directory that receives the thumbnail
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    band_suffixes = ('_B02.jp2', '_B03.jp2', '_B04.jp2')
    try:
        stem = os.path.splitext(os.path.basename(in_file))[0]
        with zipfile.ZipFile(in_file, 'r') as zip_file:
            # Select members by suffix rather than by position in the listing.
            names = sorted(zip_file.namelist())
            band_files = [next((name for name in names if name.endswith(suffix)), None)
                          for suffix in band_suffixes]
            metadata_name = next((name for name in names if name.endswith('MTD_MSIL1C.xml')), None)
            if metadata_name is None or None in band_files:
                return {"code": -1, "msg": "没有满足条件的数据字典..."}

            # Build the thumbnail.
            ThumbnailName = stem + "_thumb.jpg"
            ThumbnailPath = os.path.join(thumbnail_path, ThumbnailName)
            rgb_list = []
            for band_file in band_files:
                sub_dataset = gdal.Open('/vsizip/%s/%s' % (in_file, band_file))
                if sub_dataset is None:
                    raise RuntimeError('GDAL failed to open %s' % band_file)
                rgb_list.append(sub_dataset.ReadAsArray())
                sub_dataset = None  # drop the reference so GDAL can close the dataset
            img_data = np.array([rgb_list[2], rgb_list[1], rgb_list[0]])
            img_data = uint16to8(img_data)
            # (bands, rows, cols) -> (rows, cols, bands), then reverse the band axis.
            # NOTE(review): the two reversals cancel out, so the image channels end
            # up in (B02, B03, B04) order - confirm this is the intended colour order.
            img_data2 = np.transpose(img_data, (1, 2, 0))
            img_data2 = img_data2[:, :, ::-1]
            img = Image.fromarray(img_data2)
            # Bounding box handed to PIL; thumbnail() keeps the aspect ratio.
            if img_data.shape[1] > img_data.shape[2]:
                width = 512
                height = int(width / img_data.shape[1] * img_data.shape[2])
            else:
                height = 512
                width = int(height / img_data.shape[1] * img_data.shape[2])
            img.thumbnail((width, height))
            # NOTE(review): the data is written as PNG although the name ends in .jpg.
            img.save(ThumbnailPath, "PNG")
            # Free the large arrays before continuing.
            del rgb_list, img_data, img_data2, img

            # Copy the metadata XML next to the other exported XML files.
            xmlFileName = stem + ".xml"
            xmlPath = os.path.join(xml_path, xmlFileName)
            meta_data = zip_file.read(metadata_name)
            with open(xmlPath, "wb") as fout:
                fout.write(meta_data)

        # Parse the bytes already read instead of opening a second zip handle.
        dom = minidom.parseString(meta_data)
        cloud_percent = dom.getElementsByTagName('n1:Quality_Indicators_Info')[
            0].getElementsByTagName('Cloud_Coverage_Assessment')[0].firstChild.data
        product_info = dom.getElementsByTagName('n1:General_Info')[
            0].getElementsByTagName('Product_Info')[0]

        def timestamp_of(tag):
            # "YYYY-MM-DDTHH:MM:SS.fffZ" -> "YYYY-MM-DD HH:MM:SS" -> timestamp string.
            raw = product_info.getElementsByTagName(tag)[0].firstChild.data
            return str(GetTimestamp(raw.split(".")[0].replace("T", " ")))

        ProduceTime = timestamp_of('GENERATION_TIME')
        StartTime = timestamp_of('PRODUCT_START_TIME')
        StopTime = timestamp_of('PRODUCT_STOP_TIME')

        # Footprint: whitespace-separated "lat lon" pairs; the first four pairs
        # are used as the top-left, top-right, bottom-right and bottom-left corners.
        lon_lat = dom.getElementsByTagName('n1:Geometric_Info')[0].getElementsByTagName('Product_Footprint')[
            0].getElementsByTagName('Product_Footprint')[0].getElementsByTagName('Global_Footprint')[
            0].getElementsByTagName('EXT_POS_LIST')[0].firstChild.data
        coords = lon_lat.split()
        corners = [(coords[i + 1], coords[i]) for i in range(0, 8, 2)]
        ring = corners + corners[:1]  # repeat the first corner to close the ring
        boundaryGeomStr = 'POLYGON((' + ','.join(f'{lon} {lat}' for lon, lat in ring) + '))'

        return {"ProduceTime": ProduceTime,
                "StartTime": StartTime,
                "StopTime": StopTime,
                "CloudPercent": cloud_percent,
                "boundaryGeomStr": boundaryGeomStr,
                "ImageGSD": '10, 20, 60',
                "ThumbnailName": ThumbnailName,
                "ThumbnailPath": ThumbnailPath,
                # Path of the XML copy written above, not the archive member name.
                "xmlPath": xmlPath,
                "xmlFileName": xmlFileName,
                "DirectoryDepth": "month"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetModisData(in_file):
    """
    Collect metadata for a MODIS product readable by GDAL.

    Production / range times and the four ``GRINGPOINT`` corners are read
    from the dataset-level metadata.

    :param in_file: path of the MODIS file
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        datasets = gdal.Open(in_file)
        if datasets is None:
            raise RuntimeError('GDAL failed to open %s' % in_file)
        # Dataset-level metadata of the HDF file.
        Metadata = datasets.GetMetadata()
        datasets = None  # drop the reference so GDAL can close the dataset

        def timestamp_of(text):
            # Drop fractional seconds and the ISO "T" separator before converting.
            return str(GetTimestamp(text.split(".")[0].replace("T", " ")))

        ProductionTime = timestamp_of(Metadata["PRODUCTIONDATETIME"])
        StartTime = timestamp_of(Metadata["RANGEBEGINNINGDATE"] + " " + Metadata["RANGEBEGINNINGTIME"])
        EndTime = timestamp_of(Metadata["RANGEENDINGDATE"] + " " + Metadata["RANGEENDINGTIME"])

        # Comma-separated corner latitudes / longitudes; the four values are
        # used as top-left, top-right, bottom-right and bottom-left.
        LatitudesList = [part.strip() for part in Metadata["GRINGPOINTLATITUDE.1"].split(",")]
        LongitudeList = [part.strip() for part in Metadata["GRINGPOINTLONGITUDE.1"].split(",")]
        corners = [(LongitudeList[i], LatitudesList[i]) for i in range(4)]
        ring = corners + corners[:1]  # repeat the first corner to close the ring
        boundaryGeomStr = 'POLYGON((' + ','.join(f'{lon} {lat}' for lon, lat in ring) + '))'

        # NOTE(review): "DirectoryDepth" carries the DAYNIGHTFLAG metadata value
        # here, while the sibling readers use "day" / "month" - confirm intent.
        DirectoryDepth = Metadata["DAYNIGHTFLAG"]
        return {"ProduceTime": ProductionTime,
                "StartTime": StartTime,
                "EndTime": EndTime,
                "ImageGSD": "",
                "CloudPercent": "0",
                'boundaryGeomStr': boundaryGeomStr,
                "ThumbnailPath": "",
                "ThumbnailName": "",
                "xmlPath": "",
                "xmlFileName": "",
                "DirectoryDepth": DirectoryDepth}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def GetGOCIData(in_file):
    """
    Collect metadata for a GOCI product readable by GDAL.

    Scene times, pixel spacing and the four scene corners are read from the
    ``HDFEOS_POINTS_*`` dataset-level metadata.

    :param in_file: path of the GOCI file
    :return: metadata dict on success, ``{"code": -1, "msg": ...}`` on failure
    """
    try:
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        if in_datasets is None:
            raise RuntimeError('GDAL failed to open %s' % in_file)
        meta_data = in_datasets.GetMetadata()
        in_datasets = None  # drop the reference so GDAL can close the dataset

        def timestamp_of(key):
            # "DD-Mon-YYYY HH:MM:SS.fff" -> "YYYY-MM-DD HH:MM:SS" -> timestamp string.
            # NOTE(review): "%b" parses month names using the current locale.
            raw = meta_data[key].split(".")[0]
            parsed = datetime.datetime.strptime(raw, "%d-%b-%Y %H:%M:%S")
            return str(GetTimestamp(str(parsed)))

        # The scene centre time is reported as the production time.
        ProductionTime = timestamp_of('HDFEOS_POINTS_Ephemeris_Scene_center_time')
        StartTime = timestamp_of('HDFEOS_POINTS_Ephemeris_Scene_Start_time')
        EndTime = timestamp_of('HDFEOS_POINTS_Ephemeris_Scene_end_time')
        ImageGSD = meta_data['HDFEOS_POINTS_Scene_Header_pixel_spacing'].split(" ")[0]

        # Corner coordinates as (longitude, latitude), clockwise from upper left.
        header = 'HDFEOS_POINTS_Scene_Header_Scene_'
        corners = [(meta_data[header + corner + '_longitude'], meta_data[header + corner + '_latitude'])
                   for corner in ('upper-left', 'upper-right', 'lower-right', 'lower-left')]
        ring = corners + corners[:1]  # repeat the first corner to close the ring
        boundaryGeomStr = 'POLYGON((' + ','.join(f'{lon} {lat}' for lon, lat in ring) + '))'

        return {"ProduceTime": ProductionTime,
                "StartTime": StartTime,
                "EndTime": EndTime,
                "CloudPercent": "0",
                "boundaryGeomStr": boundaryGeomStr,
                "ImageGSD": ImageGSD,
                "ThumbnailPath": "",
                "ThumbnailName": "",
                "xmlPath": "",
                "xmlFileName": "",
                "DirectoryDepth": "day"}
    except Exception as e:
        print(str(e))
        return {"code": -1, "msg": str(e)}
def _gk2b_format_time(raw_time):
    """
    Reformat a GK2B compact time attribute as ``YYYY-MM-DD HH:MM:SS``.

    Underscores are stripped from ``raw_time`` and the remaining characters
    are sliced positionally (4 year, 2 month, 2 day, 2 hour, 2 minute,
    2 second). Nothing is validated here; a malformed value only fails later,
    when the result is handed to ``GetTimestamp``.

    :param raw_time: attribute text as stored in the file metadata
    :return: the reformatted time string
    """
    digits = raw_time.replace("_", "")
    return (f"{digits[0:4]}-{digits[4:6]}-{digits[6:8]} "
            f"{digits[8:10]}:{digits[10:12]}:{digits[12:14]}")


def GetGK2BData(in_file):
    """
    Read the metadata of a GK2B GOCI satellite product.

    :param in_file: path of the product file, opened with ``gdal.Open``
    :return: metadata dictionary on success; ``{"code": -1, "msg": <error text>}``
             when the file cannot be opened, an attribute is missing, or any
             other exception occurs
    """
    try:
        # Let GDAL interpret the file name as UTF-8.
        gdal.SetConfigOption("GDAL_FILENAME_IS_UTF8", "YES")
        in_datasets = gdal.Open(in_file)
        # gdal.Open signals failure by returning None unless GDAL exceptions
        # are enabled; raise so the handler below reports a clear message.
        if in_datasets is None:
            raise RuntimeError(f"GDAL could not open file: {in_file}")
        meta_data = in_datasets.GetMetadata()

        # Acquisition / observation times, converted through GetTimestamp.
        ProductionTime = str(GetTimestamp(
            _gk2b_format_time(meta_data['NC_GLOBAL#slot_acquisition_time'])))
        StartTime = str(GetTimestamp(
            _gk2b_format_time(meta_data['NC_GLOBAL#observation_start_time'])))
        EndTime = str(GetTimestamp(
            _gk2b_format_time(meta_data['NC_GLOBAL#observation_end_time'])))

        # Resolution: first space-separated token of the attribute value.
        ImageGSD = meta_data['NC_GLOBAL#geospatial_lat_resolution'].split(" ")[0]

        # Only the upper-left and lower-right corners are read from the
        # metadata; the upper-right and lower-left corners of the footprint
        # are derived by combining their coordinates.
        upper_lat = meta_data['NC_GLOBAL#image_upperleft_latitude']
        left_lon = meta_data['NC_GLOBAL#image_upperleft_longitude']
        lower_lat = meta_data['NC_GLOBAL#image_lowerright_latitude']
        right_lon = meta_data['NC_GLOBAL#image_lowerright_longitude']

        # Closed WKT ring: upper-left, upper-right, lower-right, lower-left,
        # and back to upper-left.
        boundaryGeomStr = f'POLYGON(({left_lon} {upper_lat},' \
                          f'{right_lon} {upper_lat},' \
                          f'{right_lon} {lower_lat},' \
                          f'{left_lon} {lower_lat},' \
                          f'{left_lon} {upper_lat}))'

        # The literal below always has content, so it is returned directly
        # (the former "dictionary is empty" check could never trigger).
        return {"ProduceTime": ProductionTime,
                "StartTime": StartTime,
                "EndTime": EndTime,
                "CloudPercent": "0",
                "boundaryGeomStr": boundaryGeomStr,
                "ImageGSD": ImageGSD,
                "ThumbnailPath": "",
                "ThumbnailName": "",
                "xmlPath": "",
                "xmlFileName": "",
                "DirectoryDepth": "day"}
    except Exception as e:
        # Best-effort contract: report the failure instead of propagating it.
        print(str(e))
        return {"code": -1, "msg": str(e)}
if __name__ == '__main__':
    # Manual smoke test: feed local sample products to a few metadata readers
    # and print what each one returns. Every path below is specific to the
    # machine the script was written on.
    #
    # Disabled samples for the other readers (uncomment a path together with
    # its call to try one):
    #
    # HJ-1
    # HJ1FilePath = r"Y:\不同传感器数据\HJ-1\HJ1A-CCD2-450-80-20090501-L20000106616.tar.gz"
    # hj1_dic = GetHJ1Data(HJ1FilePath)
    # print(hj1_dic)
    #
    # JPSS
    # JPSSFilePath = r"Y:\不同传感器数据\JPSS\VJ102IMG.A2021159.0542.002.2021159094907.nc"
    # jpss_dic = GetJPSSData(JPSSFilePath)
    # print(jpss_dic)
    #
    # ZY2
    # ZY2FilePath = r"Y:\不同传感器数据\ZY-2\ZY02C_PMS_E115.9_N36.2_20120422_L2C0000391981.tar.gz"
    # zy2_mux_dic, zy2_pan_dic = GetZY02CData(ZY2FilePath)
    # print(zy2_mux_dic)
    # print(zy2_pan_dic)
    #
    # ZY3
    # ZY3FilePath = r"Y:\不同传感器数据\ZY-3\ZY3_MUX_E83.3_N43.3_20120405_L2A0000301226.tar.gz"
    # zy3_dic = GetZY3Data(ZY3FilePath)
    # print(zy3_dic)
    #
    # GF PMS
    # GF1PMSPath = r'Y:\不同传感器数据\GF-1\GF1_PMS2_E104.1_N36.6_20210308_L1A0005524847.tar.gz'
    # pms_mss_dic, pms_pan_dic = GetGFPMSData(GF1PMSPath)
    # print(pms_mss_dic)
    # print(pms_pan_dic)
    #
    # Himawari-8
    # H08FilePath = r"Y:\不同传感器数据\葵花8\NC_H08_20210802_2010_R21_FLDK.06001_06001.nc"
    # h8_dic = GetH08Data(H08FilePath)
    # print(h8_dic)
    #
    # Sentinel-2 (older sample)
    # S2FilePath = r"D:\1Company\Python\RS_data_Dowload\Google_Download_New\data\S2B_MSIL1C_20210113T024049_N0209_R089_T51STB_20210113T041228.zip"
    # xml_path = r"D:\1Company\Python\RS_data_Dowload\Google_Download_New\data"
    # thumbnail_path = r"D:\1Company\Python\RS_data_Dowload\Google_Download_New\data"
    # s2_dic = GetSentinel2Data(S2FilePath, xml_path, thumbnail_path)
    # print(s2_dic)
    #
    # Sentinel-1
    # S1FilePath = r'Y:\不同传感器数据\SENTINEL-1\S1A_IW_GRDH_1SDV_20210407T095634_20210407T095659_037343_046675_8E66.zip'
    # s1_dic = GetSentinel1Data(S1FilePath)
    # print(s1_dic)
    #
    # SNPP
    # SNPPFilePath = r"Y:\不同传感器数据\VIIRS\VNP02IMG.A2021182.0418.001.2021182100800.nc"
    # snpp_dic = GetSNPPData(SNPPFilePath)
    # print(snpp_dic)
    #
    # GF3
    # GF3MDJPath = r'Y:\不同传感器数据\GF-3\GF3_MDJ_SS_024986_E120.8_N35.6_20210509_L1A_VHVV_L10005638033.tar.gz'
    # gf3_dic = GetGF3MDJData(GF3MDJPath)
    # print(gf3_dic)
    #
    # GF4
    # GF4PMIPath = r'Y:\不同传感器数据\GF-4\GF4_PMI_E119.8_N35.3_20210908_L1A0000417337.tar.gz'
    # gf4_pms_dic, gf4_irs_dic = GetGF4PMIData(GF4PMIPath)
    # print(gf4_pms_dic)
    # print(gf4_irs_dic)
    #
    # Sentinel-3 OL
    # S3OLFilePath = r'Y:\不同传感器数据\SENTINEL-3' \
    #                r'\S3B_OL_1_EFR____20210910T022645_20210910T022945_20210911T064342_0179_056_374_2340_LN1_O_NT_002.zip'
    # s3ol_dic = GetSentinel3OLData(S3OLFilePath)
    # print(s3ol_dic)
    #
    # Sentinel-3 SL
    # S3SLFilePath = r'Y:\不同传感器数据\SENTINEL-3' \
    #                r'\S3A_SL_1_RBT____20210916T020956_20210916T021256_20210917T120953_0179_076_217_2340_LN2_O_NT_004.zip'
    # s3sl_dic = GetSentinel3SLData(S3SLFilePath)
    # print(s3sl_dic)

    # Sentinel-2 inputs: product archive plus the xml and thumbnail paths.
    S2FilePath = r"F:\test\Sentinel\S2A_MSIL1C_20220102T031131_N0301_R075_T50SLG_20220102T050158.SAFE.zip"
    xml_path = r"F:\test\Sentinel"
    thumbnail_path = r"F:\test\Sentinel"

    # Landsat 8 inputs: the txt path reuses the thumbnail path.
    LandsatFilePath = r"F:\test\USGS_data\landsat_8_c1\LC81220342021355LGN00.tar.gz"
    thumbnail_path1 = r"F:\test\USGS_data\landsat_8_c1"
    txt_path = thumbnail_path1

    # MODIS input (a MOD11A1 product).
    MODIS_path = r"F:\test\MODIS\MOD11A1\MOD11A1.A2022038.h26v05.006.2022039112456.hdf"

    # GOCI input.
    GOCI_FilePath = r"F:\test\GOCI_Data\GOCI_L2\2020-01-01\COMS_GOCI_L2A_GA_20200101001642.CDOM.he5"

    # GK2B GOCI input.
    GK2B_FilePath = r"F:\test\GOCI_Data\GK2_GC2_L2\AC\GK2B_GOCI2_L2_20220208_011530_LA_S010_AC.nc"

    # Readers are run one after another, in this order, each result being
    # printed before the next reader starts.
    sample_runs = (
        (GetSentinel2Data, (S2FilePath, xml_path, thumbnail_path)),
        (GetLandsatData, (LandsatFilePath, thumbnail_path1, txt_path)),
        (GetModisData, (MODIS_path,)),
        (GetGOCIData, (GOCI_FilePath,)),
        (GetGK2BData, (GK2B_FilePath,)),
    )
    for reader, reader_args in sample_runs:
        print(reader(*reader_args))