基于Python的矢量与卫星影像重叠区域提取

原始版本

功能概述

该脚本通过以下步骤实现矢量范围与卫星影像的有效区域叠加分析:

1.数据输入:读取矢量边界(.shp)与卫星影像(.tif)。

2.栅格矢量化:提取卫星影像的非空值区域(有效覆盖范围),将其转为多边形矢量。

3.坐标系对齐:统一矢量与栅格的坐标参考系统(CRS)。

4.空间叠加分析:计算矢量与影像有效区域的空间交集(Overlay Intersection)。

5.成果输出:将重叠区域保存为新的矢量文件。

代码

#!/usr/bin/env python
# -*- coding: utf-8 -*- 
# @Time : 2025/2/26 20:24 
# @File : overlay_tif_shp.py 
'''

'''
from osgeo import gdal, ogr
import numpy as np

import rasterio
from rasterio.features import shapes
import geopandas as gpd
from shapely.geometry import shape
import numpy as np

tif_file = "F:\20240618_L1A13918818001-PAN1_uint8.tif"  # 替换为你的 TIFF 文件路径
shp_file = r"F:\ansu.shp"  # 替换为你的 SHP 文件路径
output_shp = r"F:\8001-PAN1_uint8.shp"  # 输出 SHP 文件路径

# 1. 读取 SHP 文件
gdf_shp = gpd.read_file(shp_file)

# 2. 读取 TIFF 文件并转换为矢量多边形
with rasterio.open(tif_file) as src:
    # 获取栅格数据和 NoData 值
    raster_data = src.read(1)  # 读取第一个波段
    nodata = src.nodata

    # 创建掩膜:非 NoData 区域为 1,其他为 0
    mask = (raster_data != nodata).astype(np.uint8)

    # 将掩膜转换为多边形
    raster_polygons = []
    for geom, value in shapes(mask, transform=src.transform):
        if value == 1:  # 只保留有效区域
            raster_polygons.append(shape(geom))

# 3. 将栅格多边形转为 GeoDataFrame
gdf_raster = gpd.GeoDataFrame(geometry=raster_polygons, crs=src.crs)

# 4. 确保两个数据的坐标系一致
if gdf_shp.crs != gdf_raster.crs:
    gdf_shp = gdf_shp.to_crs(gdf_raster.crs)

# 5. 计算重叠区域(交集)
overlap = gdf_shp.overlay(gdf_raster, how="intersection")

# 6. 保存结果到新的 SHP 文件
overlap.to_file(output_shp)
print(f"重叠区域已保存至: {output_shp}")

改进

然而,我的经验告诉我,尽量少使用geopandas.

from osgeo import gdal, ogr, osr
import numpy as np


def overlay_tif_shp(tif_path: str, shp_path: str, output_shp_path: str, block_size: int = 1024):
    """
    使用 GDAL/OGR 和 NumPy 将 TIFF 栅格文件和 SHP 矢量文件叠加,生成重叠区域的 SHP 文件。
    按块处理以减少内存使用。

    参数:
        tif_path (str): 输入 TIFF 文件路径
        shp_path (str): 输入 SHP 文件路径
        output_shp_path (str): 输出重叠区域 SHP 文件路径
        block_size (int): 分块处理的大小(默认 1024x1024 像素)
    """
    # 1. 打开 TIFF 文件
    raster_ds = gdal.Open(tif_path)
    if raster_ds is None:
        raise ValueError(f"无法打开 TIFF 文件: {tif_path}")

    band = raster_ds.GetRasterBand(1)
    nodata = band.GetNoDataValue() or 0  # 默认 0,适用于 uint8

    # 获取 TIFF 尺寸
    width, height = raster_ds.RasterXSize, raster_ds.RasterYSize

    # 2. 创建临时内存图层
    mem_driver = ogr.GetDriverByName("Memory")
    temp_ds = mem_driver.CreateDataSource("temp")
    raster_srs = osr.SpatialReference()
    raster_wkt = raster_ds.GetProjectionRef()
    if raster_wkt:
        raster_srs.ImportFromWkt(raster_wkt)
    else:
        raise ValueError(f"TIFF 文件 {tif_path} 缺少空间参考")
        # 计算总块数
    total_blocks = ((height + block_size - 1) // block_size) * ((width + block_size - 1) // block_size)
    processed_blocks = 0
    temp_layer = temp_ds.CreateLayer("temp", srs=raster_srs, geom_type=ogr.wkbPolygon)
    temp_layer.CreateField(ogr.FieldDefn("Value", ogr.OFTInteger))

    # 3. 分块读取和矢量化
    for yoff in range(0, height, block_size):
        ysize = min(block_size, height - yoff)
        for xoff in range(0, width, block_size):
            xsize = min(block_size, width - xoff)

            # 读取块数据
            block_data = band.ReadAsArray(xoff, yoff, xsize, ysize)
            if block_data is None:
                continue

                # 更新进度
            processed_blocks += 1
            progress = (processed_blocks / total_blocks) * 100
            print(f"\r处理进度: {progress:.2f}% ({processed_blocks}/{total_blocks})", end="", flush=True)
            # 创建掩膜
            mask = (block_data != nodata).astype(np.uint8)

            # 创建临时栅格数据集用于 Polygonize
            mem_raster = gdal.GetDriverByName("MEM").Create("", xsize, ysize, 1, gdal.GDT_Byte)
            mem_raster.SetGeoTransform((
                raster_ds.GetGeoTransform()[0] + xoff * raster_ds.GetGeoTransform()[1],
                raster_ds.GetGeoTransform()[1],
                0,
                raster_ds.GetGeoTransform()[3] + yoff * raster_ds.GetGeoTransform()[5],
                0,
                raster_ds.GetGeoTransform()[5]
            ))
            mem_raster.SetProjection(raster_wkt)
            mem_band = mem_raster.GetRasterBand(1)
            mem_band.WriteArray(mask)

            # 矢量化当前块
            gdal.Polygonize(mem_band, None, temp_layer, 0, [], callback=None)
            mem_raster = None  # 释放内存

    # 4. 打开输入 SHP 文件
    shp_ds = ogr.Open(shp_path)
    if shp_ds is None:
        raise ValueError(f"无法打开 SHP 文件: {shp_path}")
    shp_layer = shp_ds.GetLayer()
    shp_srs = shp_layer.GetSpatialRef()
    if shp_srs is None:
        raise ValueError(f"SHP 文件 {shp_path} 缺少空间参考")

    # 5. 创建输出 SHP 文件
    out_driver = ogr.GetDriverByName("ESRI Shapefile")
    out_ds = out_driver.CreateDataSource(output_shp_path)
    out_layer = out_ds.CreateLayer("overlap", srs=shp_srs, geom_type=ogr.wkbPolygon)

    shp_layer_defn = shp_layer.GetLayerDefn()
    for i in range(shp_layer_defn.GetFieldCount()):
        out_layer.CreateField(shp_layer_defn.GetFieldDefn(i))

    # 6. 计算重叠区域
    temp_layer.ResetReading()
    shp_layer.ResetReading()

    for raster_feat in temp_layer:
        if raster_feat.GetField("Value") != 1:
            continue
        raster_geom = raster_feat.GetGeometryRef()
        if raster_geom is None:
            continue

        for shp_feat in shp_layer:
            shp_geom = shp_feat.GetGeometryRef()
            if shp_geom is None:
                continue

            if shp_srs.IsSame(raster_srs) == 0:
                transform = osr.CoordinateTransformation(shp_srs, raster_srs)
                shp_geom_clone = shp_geom.Clone()
                shp_geom_clone.Transform(transform)
            else:
                shp_geom_clone = shp_geom

            intersection = raster_geom.Intersection(shp_geom_clone)
            if intersection and not intersection.IsEmpty():
                out_feat = ogr.Feature(out_layer.GetLayerDefn())
                out_feat.SetGeometry(intersection)
                for i in range(shp_feat.GetFieldCount()):
                    out_feat.SetField(shp_feat.GetFieldDefnRef(i).GetName(), shp_feat.GetField(i))
                out_layer.CreateFeature(out_feat)
                out_feat = None

        shp_layer.ResetReading()

    # 7. 清理
    temp_ds = None
    shp_ds = None
    out_ds = None

    print(f"重叠区域已保存至: {output_shp_path}")


# 示例调用
if __name__ == "__main__":
    tif_file = r"F:\uint8.tif"
    shp_file = r"F:\ansu.shp"
    output_shp = r"F:\uint8_.shp"

    overlay_tif_shp(tif_file, shp_file, output_shp)