清理日志信息 | AI生成和翻译

Home 2025.03

import sys
import argparse
from difflib import SequenceMatcher

def clean_log(input_path=None, output_path=None, similarity_threshold=1.0, lines_to_compare=1):
    """
    读取日志文件,基于相似度移除重复的连续标准日志行,
    并将清理后的日志写入指定文件,默认覆盖输入文件。

    参数:
        input_path (str, 可选): 输入日志文件路径。如果为None,则从stdin读取。
        output_path (str, 可选): 输出日志文件路径。如果为None,则覆盖输入文件。
        similarity_threshold (float, 可选): 判定行重复的相似度比率(0.0到1.0)。默认为1.0(精确匹配)。
        lines_to_compare (int, 可选): 要比较的连续行数。默认为1。
    """

    if not isinstance(lines_to_compare, int) or lines_to_compare < 1:
        raise ValueError("lines_to_compare必须为大于等于1的整数。")

    # 确定输入源
    if input_path:
        try:
            with open(input_path, 'r') as infile:
                lines = infile.readlines()
        except FileNotFoundError:
            print(f"错误: 在路径未找到文件: {input_path}", file=sys.stderr)
            sys.exit(1)
    else:
        lines = sys.stdin.readlines()  # 从stdin读取所有行

    # 确定输出目标
    if output_path:
        try:
            outfile = open(output_path, 'w')
        except IOError:
            print(f"错误: 无法打开文件进行写入: {output_path}", file=sys.stderr)
            sys.exit(1)
    elif input_path:
        try:
            outfile = open(input_path, 'w')  # 覆盖输入文件
        except IOError:
            print(f"错误: 无法打开文件进行写入: {input_path}", file=sys.stderr)
            sys.exit(1)
    else:
        outfile = sys.stdout  # 如果没有input_path,默认输出到stdout

    num_lines = len(lines)
    i = 0
    removed_lines = 0
    while i < num_lines:
        # 收集'lines_to_compare'行,如果剩余行数不足则收集剩余行
        current_lines = lines[i:min(i + lines_to_compare, num_lines)]

        # 只有在有足够行数进行比较时才处理
        if len(current_lines) == lines_to_compare:
            # 从第一组行中提取标准信息
            current_standards = []
            all_standard = True
            for line in current_lines:
                parts = line.split(" | ", 3)
                if len(parts) == 4:
                    level, _, thread, message = parts
                    current_standards.append((thread, message))
                else:
                    print(f"非标准行: {line.strip()}")
                    print(line, end='', file=outfile)
                    all_standard = False
                    break  # 如果找到非标准行,停止处理该组

            if all_standard:
                # 从第二组行中提取标准信息(如果可用)
                next_lines_start_index = i + lines_to_compare
                next_lines_end_index = min(next_lines_start_index + lines_to_compare, num_lines)
                next_lines = lines[next_lines_start_index:next_lines_end_index]

                if len(next_lines) == lines_to_compare:
                    next_standards = []
                    for line in next_lines:
                        parts = line.split(" | ", 3)
                        if len(parts) == 4:
                            level, _, thread, message = parts
                            next_standards.append((thread, message))
                        else:
                            # 如果任何下一行是非标准的,则将下一组行视为非标准
                            next_standards = None
                            break

                    if next_standards:
                        similarity = SequenceMatcher(None, ' '.join([' '.join(x) for x in current_standards]), ' '.join([' '.join(x) for x in next_standards])).ratio()
                        print(f"相似度: {similarity:.4f}, 阈值: {similarity_threshold:.4f}")

                        if similarity < similarity_threshold:
                            for line in current_lines:
                                print(line, end='', file=outfile)
                        else:
                            print(f"跳过重复行: { ''.join([line.strip() for line in current_lines])}")
                            removed_lines += len(current_lines)
                    else:
                        for line in current_lines:
                            print(line, end='', file=outfile)
                else:
                    for line in current_lines:
                        print(line, end='', file=outfile)
            i += lines_to_compare  # 移动到下一组行
        else:
            # 处理剩余行(少于'lines_to_compare'行)
            for line in current_lines:
                parts = line.split(" | ", 3)
                if len(parts) == 4:
                    print(line, end='', file=outfile)
                else:
                    print(f"非标准行: {line.strip()}")
                    print(line, end='', file=outfile)
            i += len(current_lines)

    if output_path or input_path:
        outfile.close()

    print(f"移除了{removed_lines}个重复行。")


def is_valid_similarity_threshold(value):
    """
    检查给定值是否为有效的相似度阈值。
    """
    try:
        value = float(value)
    except ValueError:
        raise argparse.ArgumentTypeError("相似度阈值必须为浮点数。")
    if 0.0 <= value <= 1.0:
        return value
    else:
        raise argparse.ArgumentTypeError("相似度阈值必须在0.0到1.0之间。")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="从文件或stdin中清理重复日志行并写入文件,默认覆盖输入文件。")
    parser.add_argument("input_path", nargs="?", type=str, help="输入日志文件路径(可选,默认为stdin)")
    parser.add_argument("-o", "--output_path", type=str, help="输出日志文件路径(可选,默认为覆盖输入文件)")
    parser.add_argument("-s", "--similarity", type=is_valid_similarity_threshold, default=1.0, help="判定行重复的相似度阈值(0.0-1.0)(默认: 1.0)")
    parser.add_argument("-l", "--lines", type=int, default=1, help="要比较的连续行数(默认: 1)")

    args = parser.parse_args()

    clean_log(args.input_path, args.output_path, args.similarity, args.lines)

这个Python脚本clean_log.py旨在从文件或标准输入中移除重复的日志行。它使用相似度阈值来确定连续的日志行是否足够相似以被视为重复。

以下是代码的详细说明:

1. 导入模块:

2. clean_log函数:

3. is_valid_similarity_threshold函数:

4. if __name__ == "__main__":块:

总之,该脚本提供了一种灵活的方式来清理日志文件,通过基于可配置的相似度阈值和要比较的行数来移除重复行。它支持从stdin读取、写入stdout以及覆盖输入文件。


Back Donate