遥感影像黑边去除

使用Python和GDAL去除遥感影像黑边

遥感影像常因传感器限制或预处理过程而在边缘区域出现黑色或无效像素。这些黑边通常表现为多个波段的负值或零值,会干扰后续的分析、建模或可视化工作。本文将介绍一个基于 PythonGDAL 库的脚本,用于处理遥感影像的边缘区域,将满足条件的负值像素置为零,仅针对指定宽度的边界区域进行操作,从而保留影像主体数据的完整性。

问题背景

遥感影像(如卫星影像)在边缘区域常出现无效像素,这些像素在多个波段中可能表现为负值或零值。黑边不仅影响影像的美观,还可能导致统计分析偏差或机器学习模型性能下降。我们的目标是识别边界区域内至少有指定波段数量(如2个波段)为负值的像素,并将其在所有波段的值置为零,同时仅处理边缘区域,避免影响影像内部的有效数据。下图是一个含有边缘黑边的遥感影像。

image-20250418101710894

解决方案

本脚本使用 GDAL 处理地理空间数据,并结合 NumPy 进行高效的数组操作。它采用分块处理方式以应对大尺寸影像,仅针对边界区域(例如100像素宽)进行处理,并根据负值波段数量应用置零规则。主要功能包括:

  • 分块处理:通过分块读取和处理影像,高效处理大尺寸数据。
  • 边缘区域限定:仅处理指定边界宽度内的像素(如100像素)。
  • 条件置零:当某像素位置至少有 min_neg_bands 个波段为负值时,将其在所有波段置为零。
  • 进度跟踪:支持进度回调函数,提供处理进度的实时更新。

以下是完整的脚本代码及其详细解析。

#!/usr/bin/env python
# -*- coding: utf-8 -*- 
'''
去除遥感影像边缘黑边,将边界区域内满足条件的负值像素置为零
'''
import sys
from osgeo import gdal
import numpy as np
import os

def remove_black_edges_border_only(input_path, output_path, block_size=2000, edge_width=100, min_neg_bands=2, progress_callback=None):
    """
    处理遥感影像边缘区域,将所有波段中对应位置满足条件的负值像素统一置为0。
    如果边缘区域某位置至少有指定数量波段(默认2个)为负值,则所有波段的该位置置为0。

    参数:
        input_path (str): 输入影像文件路径
        output_path (str): 输出影像文件路径
        block_size (int): 分块处理的大小(默认2000像素)
        edge_width (int): 处理的边界宽度(单位:像素,默认100)
        min_neg_bands (int): 最小负值波段数,触发置零的条件(默认2)
        progress_callback (callable, optional): 进度更新回调函数,接收百分比(0-100)

    返回:
        bool: 处理成功返回 True,否则抛出异常
    """
    # 打开输入影像(只读模式)
    ds = gdal.Open(input_path, gdal.GA_ReadOnly)
    if ds is None:
        raise FileNotFoundError(f"无法打开输入文件:{input_path}")

    cols = ds.RasterXSize
    rows = ds.RasterYSize
    bands = ds.RasterCount
    geotransform = ds.GetGeoTransform()
    projection = ds.GetProjection()
    dtype = ds.GetRasterBand(1).DataType

    # 检查波段数是否足够
    if bands < min_neg_bands:
        raise ValueError(f"波段数 ({bands}) 小于最小负值波段数 ({min_neg_bands}),无法满足条件")

    # 创建输出影像
    driver = gdal.GetDriverByName('GTiff')
    out_ds = driver.Create(output_path, cols, rows, bands, dtype)
    if out_ds is None:
        raise RuntimeError(f"无法创建输出文件:{output_path}")
    out_ds.SetGeoTransform(geotransform)
    out_ds.SetProjection(projection)

    # 计算总块数,用于进度更新
    total_blocks = ((rows + block_size - 1) // block_size) * ((cols + block_size - 1) // block_size)
    current_block = 0

    # 分块处理
    for y in range(0, rows, block_size):
        block_height = min(block_size, rows - y)
        for x in range(0, cols, block_size):
            block_width = min(block_size, cols - x)

            # 读取所有波段的当前块数据
            block_data = np.zeros((bands, block_height, block_width), dtype=np.float32)
            for band_idx in range(bands):
                in_band = ds.GetRasterBand(band_idx + 1)
                block_data[band_idx] = in_band.ReadAsArray(x, y, block_width, block_height).astype(np.float32)

            # 判断当前块是否在边缘区域
            is_border = (
                x < edge_width or  # 左边缘
                y < edge_width or  # 上边缘
                (x + block_width) > (cols - edge_width) or  # 右边缘
                (y + block_height) > (rows - edge_width)    # 下边缘
            )

            # 如果在边缘区域,处理负值
            if is_border:
                # 计算每个位置负值的波段数
                neg_count = np.sum(block_data < 0, axis=0)  # 沿波段轴统计负值数量
                # 创建掩膜:负值波段数 >= min_neg_bands
                neg_mask = neg_count >= min_neg_bands
                # 对所有波段应用掩膜
                block_data[:, neg_mask] = 0

            # 将处理后的数据写回每个波段
            for band_idx in range(bands):
                out_band = out_ds.GetRasterBand(band_idx + 1)
                out_band.WriteArray(block_data[band_idx], xoff=x, yoff=y)

            # 更新进度
            current_block += 1
            if progress_callback:
                progress_percent = int((current_block / total_blocks) * 100)
                try:
                    progress_callback(progress_percent)
                except Exception as e:
                    print(f"进度回调函数调用失败: {e}")

            print(f"处理块: x={x}, y={y}, 宽度={block_width}, 高度={block_height}, 是否边缘={is_border}")

    # 确保所有数据写入磁盘
    out_ds.FlushCache()

    # 关闭数据集并清理
    ds = None
    out_ds = None
    print(f"分块处理完成,输出文件保存在:{output_path}")
    return True

结果展示

image-20250418101932120

代码解析

1. 输入与输出

  • 输入:一个多波段的遥感影像文件(支持GDAL可读取的格式,如GeoTIFF)。
  • 输出:处理后的影像文件,保留原始的地理参考信息(投影、变换参数)。

2. 分块处理

为了处理大尺寸影像,脚本将影像分成小块(默认2000x2000像素)。每次循环处理一块区域,减少内存占用:

for y in range(0, rows, block_size):
    block_height = min(block_size, rows - y)
    for x in range(0, cols, block_size):
        block_width = min(block_size, cols - x)

3. 边缘区域判定

通过以下条件判断当前块是否在边缘区域(默认宽度100像素):

is_border = (
    x < edge_width or  # 左边缘
    y < edge_width or  # 上边缘
    (x + block_width) > (cols - edge_width) or  # 右边缘
    (y + block_height) > (rows - edge_width)    # 下边缘
)

4. 负值像素处理

在边缘区域内,脚本统计每个像素位置的负值波段数量:

neg_count = np.sum(block_data < 0, axis=0)  # 沿波段轴统计负值数量
neg_mask = neg_count >= min_neg_bands  # 负值波段数 >= 阈值
block_data[:, neg_mask] = 0  # 将满足条件的像素置为零

5. 进度更新

通过 progress_callback 函数提供处理进度(0-100%),便于在大型任务中监控进度:

progress_percent = int((current_block / total_blocks) * 100)
progress_callback(progress_percent)

6. 输出保存

处理后的块数据逐波段写回输出文件,并保留原始影像的地理参考信息:

out_ds.SetGeoTransform(geotransform)
out_ds.SetProjection(projection)
out_band.WriteArray(block_data[band_idx], xoff=x, yoff=y)

使用方法

  1. 安装依赖
    确保安装了 GDALNumPy

    pip install gdal numpy
  2. 运行脚本
    修改主函数部分,指定输入和输出路径:

    if __name__ == '__main__':
        input_path = "input.tif"
        output_path = "output.tif"
        remove_black_edges_border_only(input_path, output_path, min_neg_bands=2)
  3. 参数调整

    • block_size:根据内存大小调整分块大小。
    • edge_width:设置处理的边界宽度。
    • min_neg_bands:调整触发置零的负值波段数量。

注意事项

  • 波段数验证:脚本会检查影像的波段数是否满足 min_neg_bands 要求。

  • 数据类型:像素值转换为 float32 进行处理,确保数值计算的准确性。

  • 错误处理:脚本包含文件打开失败、输出创建失败等异常处理,确保鲁棒性。

应用场景

  • 影像预处理:为机器学习或深度学习模型准备干净的遥感数据集。
  • 可视化优化:去除黑边以提高影像展示效果。
  • 数据分析:消除边缘无效像素对统计分析的干扰。

总结

通过结合 GDALNumPy,我们实现了一个高效的遥感影像黑边去除工具。该脚本通过分块处理和边缘区域限定,既保证了处理效率,又避免了对影像主体数据的破坏。开发者可以根据具体需求调整参数(如边界宽度或负值波段阈值),以适应不同类型的遥感影像处理任务。

周六去爬山,周日有空把这个功能集成到rstool中。