基于Python的矢量与卫星影像重叠区域提取

基于Python的矢量与卫星影像重叠区域提取
ytkz原始版本
功能概述
该脚本通过以下步骤实现矢量范围与卫星影像的有效区域叠加分析:
1.数据输入:读取矢量边界(.shp
)与卫星影像(.tif
)。
2.栅格矢量化:提取卫星影像的非空值区域(有效覆盖范围),将其转为多边形矢量。
3.坐标系对齐:统一矢量与栅格的坐标参考系统(CRS)。
4.空间叠加分析:计算矢量与影像有效区域的空间交集(Overlay Intersection)。
5.成果输出:将重叠区域保存为新的矢量文件。
代码
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2025/2/26 20:24
# @File : overlay_tif_shp.py
'''
'''
from osgeo import gdal, ogr
import numpy as np
import rasterio
from rasterio.features import shapes
import geopandas as gpd
from shapely.geometry import shape
import numpy as np
tif_file = "F:\20240618_L1A13918818001-PAN1_uint8.tif" # 替换为你的 TIFF 文件路径
shp_file = r"F:\ansu.shp" # 替换为你的 SHP 文件路径
output_shp = r"F:\8001-PAN1_uint8.shp" # 输出 SHP 文件路径
# 1. 读取 SHP 文件
gdf_shp = gpd.read_file(shp_file)
# 2. 读取 TIFF 文件并转换为矢量多边形
with rasterio.open(tif_file) as src:
# 获取栅格数据和 NoData 值
raster_data = src.read(1) # 读取第一个波段
nodata = src.nodata
# 创建掩膜:非 NoData 区域为 1,其他为 0
mask = (raster_data != nodata).astype(np.uint8)
# 将掩膜转换为多边形
raster_polygons = []
for geom, value in shapes(mask, transform=src.transform):
if value == 1: # 只保留有效区域
raster_polygons.append(shape(geom))
# 3. 将栅格多边形转为 GeoDataFrame
gdf_raster = gpd.GeoDataFrame(geometry=raster_polygons, crs=src.crs)
# 4. 确保两个数据的坐标系一致
if gdf_shp.crs != gdf_raster.crs:
gdf_shp = gdf_shp.to_crs(gdf_raster.crs)
# 5. 计算重叠区域(交集)
overlap = gdf_shp.overlay(gdf_raster, how="intersection")
# 6. 保存结果到新的 SHP 文件
overlap.to_file(output_shp)
print(f"重叠区域已保存至: {output_shp}")
改进
然而,我的经验告诉我,尽量少使用geopandas.
from osgeo import gdal, ogr, osr
import numpy as np
def overlay_tif_shp(tif_path: str, shp_path: str, output_shp_path: str, block_size: int = 1024):
"""
使用 GDAL/OGR 和 NumPy 将 TIFF 栅格文件和 SHP 矢量文件叠加,生成重叠区域的 SHP 文件。
按块处理以减少内存使用。
参数:
tif_path (str): 输入 TIFF 文件路径
shp_path (str): 输入 SHP 文件路径
output_shp_path (str): 输出重叠区域 SHP 文件路径
block_size (int): 分块处理的大小(默认 1024x1024 像素)
"""
# 1. 打开 TIFF 文件
raster_ds = gdal.Open(tif_path)
if raster_ds is None:
raise ValueError(f"无法打开 TIFF 文件: {tif_path}")
band = raster_ds.GetRasterBand(1)
nodata = band.GetNoDataValue() or 0 # 默认 0,适用于 uint8
# 获取 TIFF 尺寸
width, height = raster_ds.RasterXSize, raster_ds.RasterYSize
# 2. 创建临时内存图层
mem_driver = ogr.GetDriverByName("Memory")
temp_ds = mem_driver.CreateDataSource("temp")
raster_srs = osr.SpatialReference()
raster_wkt = raster_ds.GetProjectionRef()
if raster_wkt:
raster_srs.ImportFromWkt(raster_wkt)
else:
raise ValueError(f"TIFF 文件 {tif_path} 缺少空间参考")
# 计算总块数
total_blocks = ((height + block_size - 1) // block_size) * ((width + block_size - 1) // block_size)
processed_blocks = 0
temp_layer = temp_ds.CreateLayer("temp", srs=raster_srs, geom_type=ogr.wkbPolygon)
temp_layer.CreateField(ogr.FieldDefn("Value", ogr.OFTInteger))
# 3. 分块读取和矢量化
for yoff in range(0, height, block_size):
ysize = min(block_size, height - yoff)
for xoff in range(0, width, block_size):
xsize = min(block_size, width - xoff)
# 读取块数据
block_data = band.ReadAsArray(xoff, yoff, xsize, ysize)
if block_data is None:
continue
# 更新进度
processed_blocks += 1
progress = (processed_blocks / total_blocks) * 100
print(f"\r处理进度: {progress:.2f}% ({processed_blocks}/{total_blocks})", end="", flush=True)
# 创建掩膜
mask = (block_data != nodata).astype(np.uint8)
# 创建临时栅格数据集用于 Polygonize
mem_raster = gdal.GetDriverByName("MEM").Create("", xsize, ysize, 1, gdal.GDT_Byte)
mem_raster.SetGeoTransform((
raster_ds.GetGeoTransform()[0] + xoff * raster_ds.GetGeoTransform()[1],
raster_ds.GetGeoTransform()[1],
0,
raster_ds.GetGeoTransform()[3] + yoff * raster_ds.GetGeoTransform()[5],
0,
raster_ds.GetGeoTransform()[5]
))
mem_raster.SetProjection(raster_wkt)
mem_band = mem_raster.GetRasterBand(1)
mem_band.WriteArray(mask)
# 矢量化当前块
gdal.Polygonize(mem_band, None, temp_layer, 0, [], callback=None)
mem_raster = None # 释放内存
# 4. 打开输入 SHP 文件
shp_ds = ogr.Open(shp_path)
if shp_ds is None:
raise ValueError(f"无法打开 SHP 文件: {shp_path}")
shp_layer = shp_ds.GetLayer()
shp_srs = shp_layer.GetSpatialRef()
if shp_srs is None:
raise ValueError(f"SHP 文件 {shp_path} 缺少空间参考")
# 5. 创建输出 SHP 文件
out_driver = ogr.GetDriverByName("ESRI Shapefile")
out_ds = out_driver.CreateDataSource(output_shp_path)
out_layer = out_ds.CreateLayer("overlap", srs=shp_srs, geom_type=ogr.wkbPolygon)
shp_layer_defn = shp_layer.GetLayerDefn()
for i in range(shp_layer_defn.GetFieldCount()):
out_layer.CreateField(shp_layer_defn.GetFieldDefn(i))
# 6. 计算重叠区域
temp_layer.ResetReading()
shp_layer.ResetReading()
for raster_feat in temp_layer:
if raster_feat.GetField("Value") != 1:
continue
raster_geom = raster_feat.GetGeometryRef()
if raster_geom is None:
continue
for shp_feat in shp_layer:
shp_geom = shp_feat.GetGeometryRef()
if shp_geom is None:
continue
if shp_srs.IsSame(raster_srs) == 0:
transform = osr.CoordinateTransformation(shp_srs, raster_srs)
shp_geom_clone = shp_geom.Clone()
shp_geom_clone.Transform(transform)
else:
shp_geom_clone = shp_geom
intersection = raster_geom.Intersection(shp_geom_clone)
if intersection and not intersection.IsEmpty():
out_feat = ogr.Feature(out_layer.GetLayerDefn())
out_feat.SetGeometry(intersection)
for i in range(shp_feat.GetFieldCount()):
out_feat.SetField(shp_feat.GetFieldDefnRef(i).GetName(), shp_feat.GetField(i))
out_layer.CreateFeature(out_feat)
out_feat = None
shp_layer.ResetReading()
# 7. 清理
temp_ds = None
shp_ds = None
out_ds = None
print(f"重叠区域已保存至: {output_shp_path}")
# 示例调用
if __name__ == "__main__":
tif_file = r"F:\uint8.tif"
shp_file = r"F:\ansu.shp"
output_shp = r"F:\uint8_.shp"
overlay_tif_shp(tif_file, shp_file, output_shp)