
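Technical Audit Document

This document is intended for security researchers, cryptography auditors, and developers. It describes precisely the implementation details, known limitations, and threat model of the security-related modules of Nephele Workshop.

All core source code is included inline for white-box review. Sensitive constants (such as the watermark password) have been redacted.

This document corresponds to the Nephele Workshop v0.4.0-beta codebase.

Note

All source snippets in this document are released under the MIT License. You may copy, modify, and use them commercially, provided the copyright notice is retained.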


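1. Audit Scope

1.1 Files Under Audit

Feature | File | Audit scope
Digital evidence | tools/rights/logic.py | File hashing, batch evidence, deep verification
 | tools/rights/utils.py | Merkle Tree
 | tools/rights/tsa_client.py | RFC 3161 TSA client
 | tools/rights/rights_packer.py | .nep container packing
Infringement evidence collection | tools/rights/url_evidence.py | URL evidence main flow, TLS, CAPTCHA
 | core/browser/session.py | Playwright session / screenshots
Invisible watermark | tools/packer/watermark_protection.py | Fixed-length encoding, round-trip verification, exception fallback
 | tools/packer/logic.py / agent_api.py | Business-layer calls
 | core/workers/watermark_worker.py | Background extraction thread
 | blind_watermark (PyPI) | DWT+DCT+SVD underlying algorithm
AI metadata detection | tools/validator/logic.py | Metadata reading, rule matching, evidence grading
 | tools/validator/c2pa_verifier.py | Official C2PA SDK adapter, trust-status parsing
 | core/workers/ai_detector_worker.py | Batch detection thread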

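1.2 Product Boundary

This audit does not cover:

  • Payment / licensing modules (core/license_manager.py, core/payment.py)
  • Authentication / JWT / CAPTCHA integration (core/auth/)
  • AI conversational agent and cloud inference (core/agent_loop.py, nephele-api/)
  • Client updates and SSL pinning (core/updater.py, core/ssl_pinning.py)

These modules each have their own security boundary and threat model and are outside the scope of this document.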


2. Digital Evidence Core Implementation

2.1 File Hash Computation

rights/logic.py · L352-L395
def calculate_file_hash(file_path: Path, algorithm: str = 'sha256') -> str:
    """
    Compute the hash of a file (minimal version, not for rights-attestation scenarios).

    Args:
        file_path: Path to the file
        algorithm: Hash algorithm, default 'sha256'

    Returns:
        Hex-encoded hash of the file

    Raises:
        RightsError: The file does not exist or cannot be read
    """
    if not file_path or not isinstance(file_path, Path):
        raise RightsError(f"无效的文件路径: {file_path}")

    if not file_path.exists():
        raise RightsError(f"文件不存在: {file_path}")

    if not file_path.is_file():
        raise RightsError(f"路径不是文件: {file_path}")

    try:
        file_size = file_path.stat().st_size
        if file_size > 10 * 1024 * 1024 * 1024:  # 10 GiB limit
            raise RightsError(f"文件过大(超过10GB): {file_path}")

        hash_obj = hashlib.new(algorithm)
        with open(file_path, 'rb') as f:
            chunk_size = 8192
            while True:
                chunk = f.read(chunk_size)
                if not chunk:
                    break
                hash_obj.update(chunk)

        return hash_obj.hexdigest()
    except PermissionError:
        raise RightsError(f"没有权限读取文件: {file_path}")
    except OSError as e:
        raise RightsError(f"读取文件失败: {file_path}, 错误: {str(e)}")
    except Exception as e:
        raise RightsError(f"计算文件哈希失败: {file_path}, 错误: {str(e)}")
tools/rights/logic.py:calculate_file_hash()

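Audit Points

  • Algorithm: SHA-256 by default (the algorithm parameter is passed to hashlib.new()); unsalted and unkeyed (not an HMAC)
  • Chunking: 8,192 bytes, streamed
  • Size limit: 10 GiB (10 × 1024³ bytes); larger files are rejected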
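
As a quick sanity check of the streaming approach, the sketch below hashes a file in 8,192-byte chunks and confirms that the result matches a one-shot hash of the same bytes. The helper names stream_sha256 and demo are hypothetical and not part of the codebase.

```python
import hashlib
import os
import tempfile
from pathlib import Path


def stream_sha256(path: Path, chunk_size: int = 8192) -> str:
    """Hash a file in fixed-size chunks so memory use stays constant."""
    digest = hashlib.sha256()
    with open(path, "rb") as f:
        # iter() with a sentinel stops at the first empty read (EOF).
        for chunk in iter(lambda: f.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def demo() -> bool:
    """Return True if chunked and one-shot hashing agree on a temp file."""
    payload = os.urandom(20_000)  # spans several 8 KiB chunks
    with tempfile.TemporaryDirectory() as tmp:
        path = Path(tmp) / "sample.bin"
        path.write_bytes(payload)
        return stream_sha256(path) == hashlib.sha256(payload).hexdigest()
```

Because the hash is unsalted and unkeyed, anyone holding the same bytes can reproduce the digest independently, which is what third-party verification relies on.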

2.2 Merkle Tree: Full Implementation

rights/utils.py · L13-L185
class MerkleTree:
    """
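    Merkle Tree implementation that aggregates the hashes of multiple files into a single root hash

    Advantages:
    - Supports batches of 100+ files
    - A single root hash represents the whole batch
    - Reduces TSA call cost (1 call instead of N)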

    Known limitation (second-preimage resistance):
        This implementation does NOT use domain separation prefixes for leaf vs
        internal nodes (i.e. b'\\x00' for leaves, b'\\x01' for internal nodes as
        recommended by RFC 6962 §2.1).  Adding prefixes would change the root hash
        computation and break backward compatibility with all existing .nep files
        and the verification website (verify.arisfusion.com).  A future tree_version
        bump can introduce domain separation; the current version is safe for our
        threat model (user-submitted files, not adversarial tree construction).
    """
    
    def __init__(self, hash_algorithm: str = 'sha256'):
        """
        初始化 Merkle Tree
        
        Args:
            hash_algorithm: 哈希算法,默认 'sha256'
        """
        self.hash_algorithm = hash_algorithm
        self.leaves: List[str] = []
        self.tree: List[List[str]] = []
        self.root_hash: Optional[str] = None
    
    def add_leaf(self, data: bytes) -> str:
        """
        添加叶子节点(文件哈希)
        
        Args:
            data: 文件数据或哈希值(bytes)
        
        Returns:
            叶子节点的哈希值
        """
        hash_obj = hashlib.new(self.hash_algorithm)
        hash_obj.update(data)
        leaf_hash = hash_obj.hexdigest()
        self.leaves.append(leaf_hash)
        return leaf_hash
    
    def add_file_hash(self, file_hash: str) -> None:
        """
        直接添加文件哈希值(已计算好的)
        
        Args:
            file_hash: 文件的十六进制哈希值
        """
        self.leaves.append(file_hash)
    
    def build(self) -> str:
        """
        构建 Merkle Tree 并返回根哈希
        
        Returns:
            根哈希值(十六进制字符串)
        """
        if not self.leaves:
            raise ValueError("Merkle Tree 没有叶子节点")
        
        # 如果只有一个叶子节点,直接返回
        if len(self.leaves) == 1:
            self.root_hash = self.leaves[0]
            return self.root_hash
        
        # 构建树:从叶子节点开始,逐层向上
        current_level = self.leaves.copy()
        self.tree = [current_level]
        
        while len(current_level) > 1:
            next_level = []
            
            # 成对处理节点
            for i in range(0, len(current_level), 2):
                if i + 1 < len(current_level):
                    # 两个节点:合并哈希
                    combined = current_level[i] + current_level[i + 1]
                else:
                    # 奇数个节点:最后一个节点复制后与自己合并
                    combined = current_level[i] + current_level[i]
                
                # 计算父节点哈希
                hash_obj = hashlib.new(self.hash_algorithm)
                hash_obj.update(combined.encode('utf-8'))
                parent_hash = hash_obj.hexdigest()
                next_level.append(parent_hash)
            
            self.tree.append(next_level)
            current_level = next_level
        
        # 根哈希是最后一层的唯一节点
        self.root_hash = current_level[0]
        return self.root_hash
    
    def get_proof(self, leaf_index: int) -> List[Dict]:
        """
        获取指定叶子节点的 Merkle Proof(用于验证)

        Args:
            leaf_index: 叶子节点索引

        Returns:
            Merkle Proof 路径,每个元素为 {'hash': str, 'position': 'left'|'right'}
            position 表示兄弟节点在合并时的位置
        """
        if not self.tree:
            self.build()

        if leaf_index >= len(self.leaves):
            raise IndexError(f"叶子节点索引超出范围: {leaf_index}")

        proof = []
        current_index = leaf_index
        current_level = 0

        while current_level < len(self.tree) - 1:
            level = self.tree[current_level]

            # 找到兄弟节点并记录位置
            if current_index % 2 == 0:
                # 当前是左节点,兄弟在右侧
                sibling_index = current_index + 1
                if sibling_index < len(level):
                    proof.append({'hash': level[sibling_index], 'position': 'right'})
                else:
                    # 奇数情况,兄弟是自己(已复制)
                    proof.append({'hash': level[current_index], 'position': 'right'})
            else:
                # 当前是右节点,兄弟在左侧
                sibling_index = current_index - 1
                proof.append({'hash': level[sibling_index], 'position': 'left'})

            # 移动到上一层
            current_index = current_index // 2
            current_level += 1

        return proof

    def verify_proof(self, leaf_hash: str, proof: List[Dict], root_hash: str) -> bool:
        """
        验证 Merkle Proof

        Args:
            leaf_hash: 叶子节点哈希
            proof: Merkle Proof 路径(由 get_proof 返回)
            root_hash: 根哈希

        Returns:
            验证是否通过
        """
        current_hash = leaf_hash

        for step in proof:
            sibling_hash = step['hash']
            position = step['position']

            # 按照 build() 相同的位置顺序合并:左 + 右
            if position == 'right':
                combined = current_hash + sibling_hash
            else:
                combined = sibling_hash + current_hash

            hash_obj = hashlib.new(self.hash_algorithm)
            hash_obj.update(combined.encode('utf-8'))
            current_hash = hash_obj.hexdigest()

        return current_hash == root_hash
tools/rights/utils.py:MerkleTree

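Known Security Limitation (Proactively Disclosed)

The current implementation does not use the domain separation prefixes recommended by RFC 6962 §2.1 (no \x00 prefix on leaf nodes, no \x01 prefix on internal nodes). As a result, a second-preimage construction is theoretically possible in extreme adversarial scenarios.

Practical Risk Assessment

  • Under the threat model "a user generates evidence for their own work", the risk is negligible
  • Under a threat model that requires resistance to maliciously constructed collisions, the current implementation does not meet that security level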
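
To make the limitation concrete, the following sketch reproduces the node-combining rule of build() (SHA-256 over the UTF-8 encoding of the two concatenated hex strings) and shows a two-file batch that yields the same root as a four-file batch. The function names are hypothetical. merkle_root_prefixed is an illustrative variant with \x00 / \x01 prefixes; it is not the project's code and does not reproduce the RFC 6962 tree shape (it still duplicates an odd trailing node).

```python
import hashlib
from typing import List


def sha256_hex(data: bytes) -> str:
    return hashlib.sha256(data).hexdigest()


def merkle_root(leaves: List[str]) -> str:
    """Root over hex leaf hashes, pairing nodes the way build() does."""
    level = list(leaves)
    while len(level) > 1:
        nxt = []
        for i in range(0, len(level), 2):
            right = level[i + 1] if i + 1 < len(level) else level[i]
            nxt.append(sha256_hex((level[i] + right).encode("utf-8")))
        level = nxt
    return level[0]


def merkle_root_prefixed(blobs: List[bytes]) -> str:
    """Hypothetical variant: 0x00 prefix for leaves, 0x01 for internal nodes."""
    level = [hashlib.sha256(b"\x00" + blob).digest() for blob in blobs]
    while len(level) > 1:
        nxt = []
        for i in range(0, len(level), 2):
            right = level[i + 1] if i + 1 < len(level) else level[i]
            nxt.append(hashlib.sha256(b"\x01" + level[i] + right).digest())
        level = nxt
    return level[0].hex()


# Four honest files, hashed the way add_file_hash() expects.
files = [b"file-a", b"file-b", b"file-c", b"file-d"]
leaves = [sha256_hex(f) for f in files]

# Two crafted files whose bytes are the ASCII hex of each sibling pair.
# Hashing them reproduces the internal nodes of the four-leaf tree.
forged = [(leaves[0] + leaves[1]).encode("utf-8"),
          (leaves[2] + leaves[3]).encode("utf-8")]

collision = merkle_root([sha256_hex(f) for f in forged]) == merkle_root(leaves)

# Same attack shape against the prefixed variant: concatenate the raw
# (prefixed) leaf digests and present them as two files.
raw = [hashlib.sha256(b"\x00" + f).digest() for f in files]
forged_prefixed = [raw[0] + raw[1], raw[2] + raw[3]]
prefixed_collision = (
    merkle_root_prefixed(forged_prefixed) == merkle_root_prefixed(files)
)
```

In this sketch collision evaluates to True and prefixed_collision to False. The forged files are 128-byte ASCII hex strings rather than plausible artwork, which is consistent with the risk assessment above: the construction matters only when the party assembling the batch is the adversary.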

2.3 TSA Client: Full Implementation

rights/tsa_client.py · L29-L514
class TSAClient:
    """
    RFC 3161 timestamp service client

    Supported services:
    - FreeTSA (https://freetsa.org/tsr) - free, no registration required
    - DigiCert (http://timestamp.digicert.com)
    - IdenTrust (http://timestamp.identrust.com)
    - Other RFC 3161 compatible services
    """

    # 预定义的 TSA 服务提供商
    PROVIDERS = {
        'freetsa': {
            'name': 'FreeTSA',
            'url': 'https://freetsa.org/tsr',
            'hashname': 'sha256',
            'description': '免费时间戳服务,国际标准 RFC 3161',
            'requires_auth': False,
            'legal_strength': 3,  # 1-5 评分
            'price': 0
        },
        'digicert': {
            'name': 'DigiCert',
            'url': 'http://timestamp.digicert.com',
            'hashname': 'sha256',
            'description': 'DigiCert 免费时间戳服务',
            'requires_auth': False,
            'legal_strength': 4,
            'price': 0
        },
        'identrust': {
            'name': 'IdenTrust',
            'url': 'http://timestamp.identrust.com',
            'hashname': 'sha256',
            'description': 'IdenTrust 免费时间戳服务',
            'requires_auth': False,
            'legal_strength': 4,
            'price': 0
        }
    }

    def __init__(
        self,
        provider: str = 'freetsa',
        custom_url: Optional[str] = None,
        hashname: str = 'sha256',
        timeout: int = 30
    ):
        """
        初始化 TSA 客户端

        Args:
            provider: 预定义的服务提供商名称 ('freetsa', 'digicert', 'identrust')
            custom_url: 自定义 TSA URL(如果指定,则忽略 provider)
            hashname: 哈希算法 ('sha256', 'sha512' 等)
            timeout: 请求超时时间(秒)
        """
        if not RFC3161_AVAILABLE:
            raise ImportError(
                "rfc3161ng 库未安装。请运行: pip install rfc3161ng"
            )

        if custom_url:
            self.url = custom_url
            self.provider_name = "Custom TSA"
            self.provider_key = None
        elif provider in self.PROVIDERS:
            config = self.PROVIDERS[provider]
            self.url = config['url']
            self.provider_name = config['name']
            self.provider_key = provider
            hashname = config['hashname']
        else:
            raise ValueError(
                f"未知的 TSA 提供商: {provider}。"
                f"支持的提供商: {', '.join(self.PROVIDERS.keys())}"
            )

        self.hashname = hashname
        self.timeout = timeout

        # 初始化 rfc3161ng 时间戳器
        # 注意:某些环境可能遇到 SSL 握手问题,这是正常的
        # 我们的设计会自动降级到本地哈希
        try:
            self.stamper = rfc3161ng.RemoteTimestamper(
                url=self.url,
                hashname=self.hashname,
                timeout=self.timeout
            )
        except Exception as e:
            # 如果初始化失败,记录错误但不抛出异常
            # 后续调用时会返回失败状态
            self.stamper = None
            self._init_error = str(e)

    FAILOVER_ORDER: List[str] = ['digicert', 'freetsa', 'identrust']

    def _call_with_retry(self, hash_bytes: bytes, max_retries: int = 3) -> bytes:
        """
        带指数退避和提供商故障转移的 TSA 调用

        Args:
            hash_bytes: 要签名的哈希值(二进制)
            max_retries: 每个提供商的最大重试次数

        Returns:
            TSR 令牌(二进制)

        Raises:
            Exception: 所有提供商均失败
        """
        # 构建尝试顺序:当前提供商优先,然后是其他提供商
        providers_to_try = []
        if self.provider_key:
            providers_to_try.append(self.provider_key)
            for p in self.FAILOVER_ORDER:
                if p != self.provider_key:
                    providers_to_try.append(p)
        else:
            # 自定义 URL,无故障转移
            providers_to_try = [None]

        last_error = None
        for provider_key in providers_to_try:
            if provider_key is not None:
                config = self.PROVIDERS[provider_key]
                url = config['url']
                hashname = config['hashname']
                provider_name = config['name']
            else:
                url = self.url
                hashname = self.hashname
                provider_name = self.provider_name

            for attempt in range(max_retries):
                try:
                    stamper = rfc3161ng.RemoteTimestamper(
                        url=url,
                        hashname=hashname,
                        timeout=self.timeout
                    )
                    tsr_token = stamper(digest=hash_bytes)
                    # 成功后更新当前提供商信息
                    if provider_key is not None:
                        self.provider_name = provider_name
                        self.provider_key = provider_key
                        self.url = url
                        self.stamper = stamper
                    return tsr_token
                except Exception as e:
                    last_error = e
                    if attempt < max_retries - 1:
                        time.sleep(2 ** attempt)

        raise Exception(
            f"所有 TSA 提供商均失败 (尝试: {', '.join(p or 'custom' for p in providers_to_try)}): {last_error}"
        )

    def timestamp_data(self, data: bytes) -> bytes:
        """
        对原始数据生成时间戳

        Args:
            data: 要加时间戳的数据(二进制)

        Returns:
            TSR (Time-Stamp Response) 二进制令牌

        Raises:
            Exception: 时间戳请求失败
        """
        try:
            # 计算数据的哈希值
            hash_obj = hashlib.new(self.hashname)
            hash_obj.update(data)
            data_hash = hash_obj.digest()

            # 调用 TSA 服务获取时间戳(带重试和故障转移)
            tsr_token = self._call_with_retry(data_hash)

            return tsr_token
        except Exception as e:
            raise Exception(f"时间戳请求失败: {str(e)}")

    def timestamp_file(self, file_path: Path, output_path: Optional[Path] = None) -> Dict:
        """
        为文件生成时间戳(流式处理,不将整个文件载入内存)

        Args:
            file_path: 要加时间戳的文件路径
            output_path: Output path for the TSR file (optional; defaults to <file stem>.tsr in the same directory)

        Returns:
            包含时间戳信息的字典
        """
        file_path = Path(file_path)

        if not file_path.exists():
            return {
                'success': False,
                'message': f"文件不存在: {file_path}"
            }

        try:
            # 流式计算文件哈希(8KB 分块,避免大文件内存溢出)
            hash_obj = hashlib.new(self.hashname)
            with open(file_path, 'rb') as f:
                while True:
                    chunk = f.read(8192)
                    if not chunk:
                        break
                    hash_obj.update(chunk)

            file_hash = hash_obj.hexdigest()

            # 确定输出路径
            if output_path is None:
                output_path = file_path.parent / f"{file_path.stem}.tsr"
            else:
                output_path = Path(output_path)

            # 委托给 timestamp_hash(处理 TSA 调用、重试和文件写入)
            result = self.timestamp_hash(file_hash, output_path)

            # 确保返回文件哈希
            if result.get('success'):
                result['hash'] = file_hash

            return result

        except Exception as e:
            return {
                'success': False,
                'message': f"时间戳生成失败: {str(e)}"
            }

    def timestamp_hash(self, hash_value: str, output_path: Path) -> Dict:
        """
        为已知的哈希值生成时间戳(用于 Merkle Root 等场景)

        Args:
            hash_value: 十六进制哈希值字符串
            output_path: TSR 文件输出路径

        Returns:
            包含时间戳信息的字典
        """
        try:
            # 将十六进制哈希值转换为二进制
            hash_bytes = bytes.fromhex(hash_value)

            # 获取时间戳令牌(带重试和故障转移)
            tsr_token = self._call_with_retry(hash_bytes)

            # 保存 TSR 文件
            output_path = Path(output_path)
            output_path.parent.mkdir(parents=True, exist_ok=True)
            with open(output_path, 'wb') as f:
                f.write(tsr_token)

            # 从 TSR 令牌中提取 TSA 认证时间(而非本机时钟)
            tsa_timestamp = datetime.now().isoformat()  # fallback
            tsa_issuer = self.provider_name
            try:
                from asn1crypto import tsp, cms
                # Try TimeStampResp first, then ContentInfo
                signed_data = None
                try:
                    ts_resp = tsp.TimeStampResp.load(tsr_token)
                    signed_data = ts_resp['time_stamp_token']['content']
                except (ValueError, KeyError, TypeError):
                    try:
                        ci = cms.ContentInfo.load(tsr_token)
                        if ci['content_type'].native == 'signed_data':
                            signed_data = ci['content']
                    except (ValueError, KeyError, TypeError):
                        pass

                if signed_data:
                    tst_info = signed_data['encap_content_info']['content'].parsed
                    gen_time = tst_info['gen_time'].native
                    if gen_time:
                        tsa_timestamp = gen_time.isoformat()

                    # Extract issuer CN from signer info
                    try:
                        signer_infos = signed_data['signer_infos']
                        if signer_infos:
                            sid = signer_infos[0]['sid']
                            if sid.name == 'issuer_and_serial_number':
                                for rdn in sid.chosen['issuer'].chosen:
                                    for attr in rdn:
                                        if attr['type'].dotted == '2.5.4.3':
                                            tsa_issuer = attr['value'].native
                                            break
                    except (KeyError, IndexError, ValueError):
                        pass
            except ImportError:
                pass  # asn1crypto not available, use fallback values

            return {
                'success': True,
                'timestamp': tsa_timestamp,
                'hash': hash_value,
                'issuer': tsa_issuer,
                'tsr_path': str(output_path),
                'algorithm': self.hashname.upper(),
                'message': f"时间戳生成成功,TSR 文件: {output_path.name}"
            }

        except Exception as e:
            return {
                'success': False,
                'message': f"时间戳生成失败: {str(e)}"
            }

    def verify_tsr(
        self,
        tsr_path: Path,
        data: Optional[bytes] = None,
        digest: Optional[bytes] = None,
    ) -> Dict:
        """
        验证 TSR 时间戳令牌

        Args:
            tsr_path: TSR 文件路径
            data: 原始数据(可选,库内部计算哈希后比对)
            digest: 预计算的哈希值(可选,直接与 TSR 中记录的哈希比对)
                    data 和 digest 二选一;都不传则仅验证 TSR 结构

        Returns:
            验证结果字典,包含 'valid', 'message', 'issuer' 等字段。
            当 data/digest 未提供时,结果包含 'partial_verification': True 标志。
        """
        tsr_path = Path(tsr_path)

        if not tsr_path.exists():
            return {
                'valid': False,
                'message': f"TSR 文件不存在: {tsr_path}"
            }

        try:
            with open(tsr_path, 'rb') as f:
                tsr_token = f.read()

            if digest is not None:
                # 直接用预计算的哈希比对(用于 Merkle Root 等场景)
                verified = rfc3161ng.check_timestamp(tsr_token, digest=digest)
                if verified:
                    return {
                        'valid': True,
                        'message': '时间戳验证通过(数据完整性已确认)',
                        'issuer': self._extract_issuer_from_tsr(tsr_token),
                    }
                else:
                    return {
                        'valid': False,
                        'message': '时间戳验证失败:哈希值不匹配'
                    }
            elif data is not None:
                # 传入原始数据,由库内部计算哈希后比对
                verified = rfc3161ng.check_timestamp(tsr_token, data=data)
                if verified:
                    return {
                        'valid': True,
                        'message': '时间戳验证通过(数据完整性已确认)',
                        'issuer': self._extract_issuer_from_tsr(tsr_token),
                    }
                else:
                    return {
                        'valid': False,
                        'message': '时间戳验证失败:哈希值不匹配'
                    }
            else:
                # 结构验证:无原始数据时,验证 TSR 文件结构有效性
                return self._verify_tsr_structure(tsr_token)

        except Exception as e:
            return {
                'valid': False,
                'message': f'时间戳验证失败: {str(e)}'
            }

    def _extract_issuer_from_tsr(self, tsr_token: bytes) -> str:
        """Extract the actual issuer CN from a TSR token using asn1crypto.

        Falls back to self.provider_name if parsing fails or asn1crypto
        is unavailable.  This ensures verify_tsr() reports the real issuer
        even after failover to a different TSA provider.
        """
        try:
            from asn1crypto import tsp, cms

            signed_data = None
            try:
                ts_resp = tsp.TimeStampResp.load(tsr_token)
                signed_data = ts_resp['time_stamp_token']['content']
            except (ValueError, KeyError, TypeError):
                try:
                    ci = cms.ContentInfo.load(tsr_token)
                    if ci['content_type'].native == 'signed_data':
                        signed_data = ci['content']
                except (ValueError, KeyError, TypeError):
                    pass

            if signed_data:
                signer_infos = signed_data['signer_infos']
                if signer_infos:
                    sid = signer_infos[0]['sid']
                    if sid.name == 'issuer_and_serial_number':
                        for rdn in sid.chosen['issuer'].chosen:
                            for attr in rdn:
                                if attr['type'].dotted == '2.5.4.3':  # CN
                                    return attr['value'].native
        except (ImportError, Exception):
            pass

        return self.provider_name

    def _verify_tsr_structure(self, tsr_token: bytes) -> Dict:
        """验证 TSR 令牌的 ASN.1 结构(不验证数据完整性)"""
        if len(tsr_token) < 20:
            return {'valid': False, 'message': 'TSR 文件过小,可能已损坏'}

        if tsr_token[0] != 0x30:
            return {'valid': False, 'message': 'TSR 文件不是有效的 ASN.1 DER 格式'}

        # 尝试 asn1crypto 深度结构验证
        try:
            from asn1crypto import tsp, cms

            # 格式 1: TimeStampResp
            try:
                ts_resp = tsp.TimeStampResp.load(tsr_token)
                status = ts_resp['status']['status'].native
                if status not in ('granted', 'granted_with_mods'):
                    return {
                        'valid': False,
                        'message': f'TSR 状态异常: {status}'
                    }
                return {
                    'valid': True,
                    'message': '时间戳结构验证通过(未验证数据完整性)',
                    'issuer': self.provider_name,
                    'partial_verification': True
                }
            except (ValueError, KeyError, TypeError):
                pass

            # 格式 2: ContentInfo/SignedData (DigiCert 等)
            try:
                content_info = cms.ContentInfo.load(tsr_token)
                if content_info['content_type'].native == 'signed_data':
                    return {
                        'valid': True,
                        'message': '时间戳结构验证通过(未验证数据完整性)',
                        'issuer': self.provider_name,
                        'partial_verification': True
                    }
            except (ValueError, KeyError, TypeError):
                pass

            return {'valid': False, 'message': '无法解析 TSR 结构'}

        except ImportError:
            # asn1crypto 不可用,基础结构检查已通过(size + 0x30 tag)
            return {
                'valid': True,
                'message': '时间戳基础结构检查通过(安装 asn1crypto 可获得深度验证)',
                'issuer': self.provider_name,
                'partial_verification': True
            }

    @classmethod
    def get_provider_info(cls, provider: str) -> Optional[Dict]:
        """获取预定义服务提供商的信息"""
        return cls.PROVIDERS.get(provider)

    @classmethod
    def list_providers(cls) -> Dict[str, Dict]:
        """列出所有预定义的服务提供商"""
        return cls.PROVIDERS.copy()
tools/rights/tsa_client.py:TSAClient

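Audit Points

  • Default constructor: provider='freetsa'
  • The UI calls batch_protect_works(tsa_provider='digicert'), so DigiCert is the provider users actually try first
  • Failover order: ['digicert', 'freetsa', 'identrust'] (the current provider is tried first, then the remaining entries in this order); a custom URL has no failover
  • Exponential backoff: sleep(2 ** attempt) between attempts, i.e. 1 s then 2 s with the default max_retries=3; there is no sleep after the final attempt
  • At most 3 attempts per provider (max_retries=3)
  • In timestamp_hash, tsa_timestamp initially falls back to datetime.now().isoformat() (local clock) and is replaced with the TSA-asserted time only after asn1crypto parses the token successfully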
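
The retry and failover behavior can be checked with a small model of the loop in _call_with_retry. This is a sketch for reasoning about timing only; backoff_delays and attempt_order are hypothetical helpers, not functions in tsa_client.py.

```python
from typing import List, Optional


def backoff_delays(max_retries: int = 3) -> List[int]:
    """Delays (seconds) slept between attempts for a single provider."""
    delays = []
    for attempt in range(max_retries):
        # Mirrors the loop above: sleep only if another attempt follows.
        if attempt < max_retries - 1:
            delays.append(2 ** attempt)
    return delays


def attempt_order(current: Optional[str], failover: List[str]) -> List[Optional[str]]:
    """Provider order: current first, then the remaining failover entries."""
    if current is None:
        return [None]  # custom URL: no failover
    return [current] + [p for p in failover if p != current]


FAILOVER_ORDER = ['digicert', 'freetsa', 'identrust']
order_for_default = attempt_order('freetsa', FAILOVER_ORDER)
worst_case_sleep = sum(backoff_delays()) * len(order_for_default)
```

With the defaults, a client constructed with provider='freetsa' tries FreeTSA, then DigiCert, then IdenTrust, and sleeps at most 3 seconds per provider, excluding the per-request timeout.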

2.4 Batch Evidence Main Flow

rights/logic.py · L660-L810
def batch_protect_works(
    file_paths: List[Path],
    author_name: str,
    inspiration: Optional[str] = None,
    output_dir: Optional[Path] = None,
    password: Optional[str] = None,
    progress_callback=None,
    tsa_provider: str = 'digicert',
    tsa_timeout: int = 30,
    cert_mode: str = 'simple',
) -> Dict:
    """
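    Batch-protect works (core digital evidence flow)

    Flow:
    1. Compute hashes of all files
    2. Build the Merkle Tree
    3. Generate manifest.json
    4. Generate the thumbnail collage
    5. Call the TSA for a timestamp (over the root hash)
    6. Generate the PDF report
    7. Pack into a .nep file

    Args:
        file_paths: List of file paths
        author_name: Author name
        inspiration: Creative inspiration (optional)
        output_dir: Output directory
        password: Password for the .nep file (optional)
        progress_callback: Progress callback (current, total, message)

    Returns:
        Dict containing the processing result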
    """
    from .utils import build_merkle_tree_from_files
    from .rights_packer import RightsPacker
    from .pdf_generator import PDFGenerator
    
    if not file_paths:
        raise RightsError("文件列表为空")
    
    if output_dir is None:
        output_dir = Path.cwd() / "digital_evidence"
    output_dir = Path(output_dir)
    output_dir.mkdir(parents=True, exist_ok=True)
    
    try:
        # 步骤 1: 计算文件哈希并构建 Merkle Tree
        if progress_callback:
            progress_callback(0, len(file_paths), "正在计算文件哈希...")
        
        def hash_progress(current, total):
            if progress_callback:
                progress_callback(current, total, f"计算哈希: {current}/{total}")
        
        tree = build_merkle_tree_from_files(file_paths, progress_callback=hash_progress)
        root_hash = tree.root_hash

        image_extensions = {'.jpg', '.jpeg', '.png', '.webp', '.bmp'}

        # 步骤 2: 收集文件哈希和作品信息 + 计算感知哈希
        # Reuse hashes already computed by Merkle Tree (avoid double I/O)
        file_hashes = dict(tree.file_hashes)
        works = []
        fingerprints = []  # Perceptual hashes for image files

        for file_path in file_paths:
            file_hash = file_hashes.get(str(file_path), calculate_file_hash(file_path))

            # Time recorded as creation_date (taken from st_mtime, i.e. last modification time)
            try:
                creation_time = datetime.fromtimestamp(file_path.stat().st_mtime)
            except (OSError, ValueError, OverflowError):
                creation_time = datetime.now()

            works.append({
                'title': file_path.stem,
                'creation_date': creation_time.isoformat(),
                'file_path': str(file_path),
                'file_hash': file_hash
            })

            # Compute perceptual hash for image files (non-blocking, best-effort)
            if file_path.suffix.lower() in image_extensions:
                try:
                    from .fingerprint import compute_fingerprint
                    fp = compute_fingerprint(file_path, file_sha256=file_hash)
                    fingerprints.append(fp)
                except Exception as e:
                    import logging as _logging
                    _logging.getLogger(__name__).debug(
                        "Skipping perceptual hash for %s: %s", file_path.name, e
                    )
        
        # 步骤 3: 生成 manifest.json
        if progress_callback:
            progress_callback(len(file_paths), len(file_paths), "正在生成清单...")
        
        packer = RightsPacker(output_dir / "evidence.nep", password=password)
        manifest_data = packer.create_manifest(
            author_name=author_name,
            inspiration=inspiration,
            works=works,
            file_hashes=file_hashes
        )
        
        # 步骤 4: 生成缩略图(仅图片文件)
        image_paths = [p for p in file_paths if p.suffix.lower() in image_extensions]
        
        thumbnail_path = None
        if image_paths:
            if progress_callback:
                progress_callback(len(file_paths), len(file_paths), "正在生成缩略图...")
            
            thumbnail_path = output_dir / "thumbnail.jpg"
            packer.generate_thumbnail(image_paths, thumbnail_path)
        
        # Step 5: request a real RFC 3161 timestamp from the configured TSA (tsa_provider, DigiCert by default)
        if progress_callback:
            progress_callback(len(file_paths), len(file_paths), "正在获取 DigiCert 时间戳...")

        tsa_binary_path = output_dir / "proof.tsa"    # RFC 3161 二进制
        local_json_path = output_dir / "proof.json"   # 本地降级 JSON
        timestamp_file = tsa_binary_path              # 最终实际写入的文件

        try:
            from .tsa_client import TSAClient

            # 初始化 TSA 客户端(可配置提供商)
            tsa_client = TSAClient(provider=tsa_provider, timeout=tsa_timeout)

            # 用 Merkle Root Hash 获取时间戳
            tsa_result = tsa_client.timestamp_hash(root_hash, tsa_binary_path)

            if tsa_result['success']:
                timestamp_file = tsa_binary_path
                timestamp_info = {
                    'timestamp': tsa_result['timestamp'],
                    'hash': root_hash,
                    'issuer': tsa_result['issuer'],
                    'algorithm': tsa_result['algorithm'],
                    'valid': True,
                    'tsr_path': str(tsa_binary_path)
                }
            else:
                # TSA 失败,使用本地时间戳(写 .json,不混淆 .tsa)
                timestamp_file = local_json_path
                timestamp_info = {
                    'timestamp': datetime.now().isoformat(),
                    'hash': root_hash,
                    'issuer': 'Nephele Workshop (本地)',
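tools/rights/logic.py:batch_protect_works() (core excerpt)

Key Behaviors

  • cert_mode defaults to "simple"; it is not upgraded automatically based on the presence of source files
  • The UI layer (PipelineWindow.qml) explicitly passes "full" when certifySourceFiles.length > 0
  • If rfc3161ng is not installed, the flow is forced down to a local timestamp and is not blocked
  • manifest_sha256 is computed with its own key excluded, to avoid a circular dependency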
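
The self-excluding digest can be illustrated as follows. The manifest code itself is not reproduced in this document, so this is only a sketch: the canonical form (sorted keys, compact separators, UTF-8) and the helper names manifest_digest, seal, and check are assumptions, not the project's implementation.

```python
import hashlib
import json


def manifest_digest(manifest: dict, key: str = "manifest_sha256") -> str:
    """SHA-256 over the manifest with its own digest field removed."""
    body = {k: v for k, v in manifest.items() if k != key}
    # Assumption: canonical form = sorted keys, compact separators, UTF-8.
    canonical = json.dumps(body, sort_keys=True, separators=(",", ":"),
                           ensure_ascii=False).encode("utf-8")
    return hashlib.sha256(canonical).hexdigest()


def seal(manifest: dict) -> dict:
    """Return a copy of the manifest with manifest_sha256 filled in."""
    sealed = dict(manifest)
    sealed["manifest_sha256"] = manifest_digest(manifest)
    return sealed


def check(manifest: dict) -> bool:
    """Recompute the digest (ignoring the stored field) and compare."""
    return manifest.get("manifest_sha256") == manifest_digest(manifest)


sealed = seal({"author": "demo", "works": []})
tampered = dict(sealed, author="someone else")
```

Excluding the field before hashing means the digest is the same whether or not the field is already present, so sealing and checking use one code path.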

2.5 .nep Packaging Implementation

rights/rights_packer.py · L196-L232
    def _write_zip_contents(zipf, manifest_data, thumbnail_path, timestamp_file,
                            pdf_report, additional_files, source_files):
        """Write the contents into the ZIP file (used internally by pack())"""
        # 1. Add manifest.json
        manifest_json = json.dumps(manifest_data, indent=2, ensure_ascii=False)
        zipf.writestr('manifest.json', manifest_json.encode('utf-8'))

        # 2. Add the thumbnail
        if thumbnail_path and thumbnail_path.exists():
            zipf.write(thumbnail_path, 'thumbnail.jpg')

        # 3. Add the timestamp file (keep its original extension: .tsa = RFC 3161 binary, .json = local fallback)
        if timestamp_file and timestamp_file.exists():
            archive_name = 'proof.tsa' if timestamp_file.suffix == '.tsa' else 'proof.json'
            zipf.write(timestamp_file, archive_name)

        # 4. Add the PDF report
        if pdf_report and pdf_report.exists():
            zipf.write(pdf_report, 'VerificationReport.pdf')

        # 5. Original work files → works/ directory
        # Use indexed filenames (e.g. works/000_photo.jpg) to avoid collisions
        # when multiple source files share the same basename.  Must match the
        # indexed keys in manifest_data['works_map'].
        if source_files:
            for idx, file_path in enumerate(source_files):
                if isinstance(file_path, str):
                    file_path = Path(file_path)
                if file_path.exists() and file_path.is_file():
                    zipf.write(file_path, f'works/{idx:03d}_{file_path.name}')

        # 6. Add additional files
        if additional_files:
            for file_path in additional_files:
                if file_path.exists():
                    zipf.write(file_path, f'additional/{file_path.name}')
tools/rights/rights_packer.py:RightsPacker._write_zip_contents()

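Password protection logic (pack() method):

  • pyzipper available and a password is set: AESZipFile + WZ_AES (AES-256)
  • pyzipper unavailable but a password is set: falls back to a standard ZIP, i.e. without AES encryption, and writes _warning into the manifest
  • No password: standard zipfile.ZipFile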
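
The three cases can be sketched as below. This is a hedged illustration, not the body of pack(): the function name `open_nep_archive` and the warning text are hypothetical, while `pyzipper.AESZipFile`, `pyzipper.WZ_AES` and `setpassword()` are the pyzipper calls named in the list above.

```python
import zipfile


def open_nep_archive(path, password=None):
    """Return (zip_handle, warning) following the three cases listed above."""
    if not password:
        # No password: plain standard-library ZIP.
        return zipfile.ZipFile(path, "w", zipfile.ZIP_DEFLATED), None
    try:
        import pyzipper  # optional third-party dependency
    except ImportError:
        # Password requested but AES is unavailable: fall back and say so.
        warning = "pyzipper not installed; archive is NOT encrypted"
        return zipfile.ZipFile(path, "w", zipfile.ZIP_DEFLATED), warning
    zf = pyzipper.AESZipFile(path, "w", compression=pyzipper.ZIP_DEFLATED,
                             encryption=pyzipper.WZ_AES)
    zf.setpassword(password.encode("utf-8"))
    return zf, None
```

Returning the warning alongside the handle lets the caller record it in the manifest, so a reader of the package can see that a password was requested but not applied.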

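2.6 Standalone Verifier (verify.arisfusion.com)

A standalone .nep verifier is publicly deployed at verify.arisfusion.com.

Deployment

  • Single-file HTML (2087 lines): no build tooling, no bundling step, no external dependencies
  • Pure client-side computation (SubtleCrypto plus a pure-JS ASN.1 parser)
  • The complete logic can be audited through the browser's "View Page Source"

Verification chain

  1. The user uploads a .nep file → it is unpacked locally in the browser
  2. SHA-256 is recomputed for works/ → a Merkle Tree is built in lexicographic file-name order
  3. proof.tsa (RFC 3161 TSR, ASN.1 structure) is parsed → messageImprint.hashedMessage is extracted
  4. The local Merkle Root is compared with the digest embedded in the TSR
  5. The genTime and tsa fields of the TSR are parsed to display the issuing authority and time

Trust boundary

  • The verifier issues no timestamps itself; it only reads the proof.tsa already present in the .nep
  • In the current version, cryptographic verification of the TSA signature (certificate chain + public-key chain) is limited to a structural comparison plus a TSA public-key fingerprint match; for full CA-chain validation, cross-check with openssl ts -verify or rfc3161ng
  • The source is MIT-licensed; anyone can host a mirror or use it offline (saving the HTML file is enough)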
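
Steps 2 and 4 of the verification chain can be sketched in Python as follows. The tree rules used here are assumptions made for illustration (leaf = SHA-256 of the file bytes, parent = SHA-256 of the two raw child digests concatenated, an odd node is paired with itself); the rules actually used live in tools/rights/utils.py and the verifier's HTML, and may differ. The function names are hypothetical.

```python
import hashlib


def merkle_root(named_files: dict) -> str:
    """named_files maps file name -> bytes; returns the hex root hash."""
    if not named_files:
        raise ValueError("no files")
    # Step 2: leaves are SHA-256 digests, ordered by file name.
    level = [hashlib.sha256(named_files[name]).digest()
             for name in sorted(named_files)]
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])  # assumed rule: duplicate the last node
        level = [hashlib.sha256(level[i] + level[i + 1]).digest()
                 for i in range(0, len(level), 2)]
    return level[0].hex()


def matches_tsr(root_hex: str, hashed_message: bytes) -> bool:
    """Step 4: compare the local root with the digest embedded in the TSR."""
    return bytes.fromhex(root_hex) == hashed_message
```

Sorting by name before hashing makes the root independent of the order in which the archive lists its entries.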

2.7 Deep Verification Implementation

rights/logic.py · L553-L660
def verify_evidence_package(
    tsa_path: Path,
    file_paths: List[Path],
) -> Tuple[bool, Dict]:
    """
    深度验证存证包:重算文件哈希 → 重建 Merkle Tree → 与 TSR 中签名的哈希比对。

    验证链路:
      文件列表 → SHA-256 → Merkle Tree → Root Hash → 与 TSR 内 messageImprint 比对

    Args:
        tsa_path: .tsa/.tsr 时间戳文件路径
        file_paths: 被存证的原始文件路径列表(顺序必须与存证时一致)

    Returns:
        (是否通过, 验证结果字典)
    """
    from .utils import build_merkle_tree_from_files

    if not tsa_path.exists():
        return False, {'valid': False, 'message': f"时间戳文件不存在: {tsa_path}"}

    missing = [str(p) for p in file_paths if not p.exists()]
    if missing:
        return False, {'valid': False, 'message': f"原始文件缺失: {', '.join(missing)}"}

    try:
        # Step 1: 重算文件哈希,重建 Merkle Tree
        tree = build_merkle_tree_from_files(file_paths)
        computed_root = tree.root_hash

        # Step 2: 检查是否为本地时间戳(无第三方签名,不具备证明力)
        suffix = tsa_path.suffix.lower()
        if suffix == '.json':
            tsr_result = parse_tsr(tsa_path)
            tsr_hash = tsr_result.get('hash', tsr_result.get('work_identity', ''))
            if computed_root.lower() == tsr_hash.lower():
                return False, {
                    'valid': False,
                    'message': "本地时间戳无第三方签名,不具备密码学证明力。文件哈希一致但无法证明时间。",
                    'root_hash': computed_root,
                    'local_only': True,
                    'file_count': len(file_paths),
                }
            else:
                return False, {
                    'valid': False,
                    'message': "验证失败:Merkle Root 与本地时间戳记录不匹配",
                    'computed_root': computed_root,
                    'file_count': len(file_paths),
                }

        # Step 3: RFC 3161 TSR — 用 rfc3161ng 做真正的密码学签名验证
        computed_digest = bytes.fromhex(computed_root)
        try:
            from .tsa_client import TSAClient
            tsa_client = TSAClient()
            verify_result = tsa_client.verify_tsr(tsa_path, digest=computed_digest)

            if verify_result.get('valid'):
                # 同时解析 TSR 获取时间戳详情(签发时间、签发方)
                tsr_result = parse_tsr(tsa_path)
                return True, {
                    'valid': True,
                    'message': f"深度验证通过:{len(file_paths)} 个文件的 Merkle Root 与 TSA 签名匹配(密码学验证)",
                    'timestamp': tsr_result.get('timestamp'),
                    'issuer': tsr_result.get('issuer'),
                    'root_hash': computed_root,
                    'file_count': len(file_paths),
                }
            else:
                return False, {
                    'valid': False,
                    'message': f"TSA 签名验证失败:{verify_result.get('message', 'unknown')}",
                    'root_hash': computed_root,
                    'file_count': len(file_paths),
                }

        except ImportError:
            # rfc3161ng 未安装,降级为结构性比对(明确标注)
            tsr_result = parse_tsr(tsa_path)
            tsr_hash_raw = tsr_result.get('hash', '')
            tsr_hash = tsr_hash_raw.split(':', 1)[1] if ':' in tsr_hash_raw else tsr_hash_raw

            if computed_root.lower() == tsr_hash.lower():
                return True, {
                    'valid': True,
                    'message': f"结构验证通过(安装 rfc3161ng 可启用密码学签名验证)",
                    'timestamp': tsr_result.get('timestamp'),
                    'issuer': tsr_result.get('issuer'),
                    'root_hash': computed_root,
                    'file_count': len(file_paths),
                    'partial_verification': True,
                }
            else:
                return False, {
                    'valid': False,
                    'message': "验证失败:Merkle Root 与 TSR 记录的哈希不匹配",
                    'computed_root': computed_root,
                    'tsr_hash': tsr_hash,
                    'file_count': len(file_paths),
                }

    except Exception as e:
        return False, {'valid': False, 'message': f"深度验证失败: {e}"}


tools/rights/logic.py:verify_evidence_package()
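
The function returns True at two different strengths (full signature verification, or a structural match when rfc3161ng is missing) and False for a hash-consistent local timestamp. A caller that only looks at the boolean loses that distinction. The sketch below shows one way to map the returned pair to an explicit level; the helper name and the level labels are hypothetical, while the keys `partial_verification` and `local_only` are taken from the excerpt above.

```python
def classify_verification(ok: bool, result: dict) -> str:
    """Map the (ok, result) pair to an explicit assurance level."""
    if ok and result.get("partial_verification"):
        return "structural-only"   # hash matched, signature not checked
    if ok:
        return "cryptographic"     # TSA signature verified via rfc3161ng
    if result.get("local_only"):
        return "local-timestamp"   # hashes match, no third-party proof of time
    return "failed"


assert classify_verification(True, {"valid": True}) == "cryptographic"
assert classify_verification(
    True, {"valid": True, "partial_verification": True}) == "structural-only"
assert classify_verification(
    False, {"valid": False, "local_only": True}) == "local-timestamp"
```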

3. Rights-Enforcement Evidence Capture: Core Implementation

3.1 Browser Screenshot

browser/session.py · L1057-L1071
    def screenshot(self, path: Optional[str] = None) -> dict:
        """截图保存"""
        if not self._page:
            return {"success": False, "message": "浏览器未打开"}
        try:
            return self._run(self._async_screenshot(path))
        except Exception as e:
            return {"success": False, "message": f"截图失败: {e}"}

    async def _async_screenshot(self, path: Optional[str]) -> dict:
        import tempfile
        if not path:
            path = os.path.join(tempfile.gettempdir(), "nephele_screenshot.png")
        await self._page.screenshot(path=path, full_page=False)
        return {"success": True, "message": f"截图已保存: {path}", "output_path": path}
core/browser/session.py:BrowserManager.screenshot / _async_screenshot

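Key fact: full_page=False. This is a viewport screenshot, not a scrolling full-page capture. On long pages, content below the viewport is not captured visually.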


3.2 Evidence Capture Main Flow

rights/url_evidence.py · L669-L803
    def capture(
        self,
        url: str,
        progress_callback=None,
    ) -> Dict:
        """
        Execute the full URL evidence capture pipeline.

        Args:
            url: Target URL to capture
            progress_callback: Optional (step, total, message) callback

        Returns:
            {
                "success": bool,
                "evidence_id": str,
                "output_dir": str,
                "manifest": dict,
                "timestamp_info": dict,
                "message": str,
            }
        """
        total_phases = 5
        self._record(f"Starting evidence capture for: {url}")

        try:
            # Phase 1: Environment + DNS + TLS certificate
            if progress_callback:
                progress_callback(1, total_phases, "Collecting environment info...")
            environment = self.collect_environment()
            dns_info = self.resolve_dns(url)
            tls_info = self.capture_tls_certificate(url)

            # Phase 2: Browser capture (navigate + screenshot + HTML + images)
            if progress_callback:
                progress_callback(2, total_phases, "Capturing page...")
            artifacts = self.capture_page(url, progress_callback=None)

            # Phase 3: Hash all artifacts (including TLS cert files)
            if progress_callback:
                progress_callback(3, total_phases, "Computing hashes...")
            file_hashes = self.hash_artifacts(artifacts)

            # Also hash TLS cert + response headers files
            for extra_key in ("der_path", "pem_path"):
                p = Path(tls_info.get(extra_key, ""))
                if p.exists():
                    file_hashes[p.name] = {
                        "sha256": self._sha256_file(p),
                        "size": p.stat().st_size,
                        "type": "tls_certificate",
                    }
            resp_headers_path = artifacts.get("response_headers_path")
            if resp_headers_path and Path(resp_headers_path).exists():
                p = Path(resp_headers_path)
                file_hashes[p.name] = {
                    "sha256": self._sha256_file(p),
                    "size": p.stat().st_size,
                    "type": "response_headers",
                }

            # Phase 4: Generate manifest (save log first so it's included)
            log_path = Path(self.save_log())
            self._log_committed = True  # Log content is now frozen for hashing
            file_hashes[log_path.name] = {
                "sha256": self._sha256_file(log_path),
                "size": log_path.stat().st_size,
                "type": "operation_log",
            }

            if progress_callback:
                progress_callback(4, total_phases, "Generating manifest...")
            manifest = self.generate_manifest(
                target_url=url,
                environment=environment,
                dns_info=dns_info,
                artifacts=artifacts,
                file_hashes=file_hashes,
                tls_info=tls_info,
            )

            # Phase 5: Timestamp
            if progress_callback:
                progress_callback(5, total_phases, "Requesting timestamp...")
            ts_info = self.timestamp_manifest(manifest)

            # DO NOT re-save operation log — the version already hashed in manifest
            # is the authoritative one. Any further _record() calls only live in memory.

            image_count = len(artifacts.get("images", []))

            # Phase 6: Package as .nep (tamper-proof archive)
            # Uses files on disk (which match manifest hashes)
            nep_path = self._package_nep()

            return {
                "success": True,
                "evidence_id": self._evidence_id,
                "output_dir": str(self._output_dir),
                "nep_path": str(nep_path),
                "manifest": manifest,
                "timestamp_info": ts_info,
                "page_title": artifacts.get("page_title", ""),
                "image_count": image_count,
                "file_count": len(file_hashes),
                "message": (
                    f"URL evidence captured: {len(file_hashes)} files, "
                    f"{image_count} images, timestamp by {ts_info.get('issuer', 'N/A')}"
                ),
            }

        except URLEvidenceError as e:
            self._record(f"FATAL: {e}")
            if not self._log_committed:
                self.save_log()
            return {
                "success": False,
                "evidence_id": self._evidence_id,
                "output_dir": str(self._output_dir),
                "message": str(e),
            }
        except Exception as e:
            self._record(f"UNEXPECTED ERROR: {e}")
            if not self._log_committed:
                self.save_log()
            logger.exception("URL evidence capture failed")
            return {
                "success": False,
                "evidence_id": self._evidence_id,
                "output_dir": str(self._output_dir),
                "message": f"Unexpected error: {e}",
            }

    # ===== Helpers =====
tools/rights/url_evidence.py:URLEvidenceCapture.capture()

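Log immutability guarantee

  • save_log() is called before the manifest is generated
  • After the write, self._log_committed = True is set
  • From then on, _record() only appends to the in-memory list and no longer writes to disk
  • file_hashes in the manifest includes the SHA-256 of the log file
  • The manifest hash therefore anchors the "frozen" state of the log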
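
The freeze-then-hash pattern described above can be reproduced in isolation. The class below is a simplified stand-in, not URLEvidenceCapture: the names `FrozenLog`, `record` and `save` are hypothetical, and here the commit guard sits inside `save()` rather than at the call sites.

```python
import hashlib
import tempfile
from pathlib import Path


class FrozenLog:
    def __init__(self, path: Path):
        self.path = path
        self.entries = []
        self.committed = False

    def record(self, message: str) -> None:
        # Always kept in memory; never flushed again once committed.
        self.entries.append(message)

    def save(self) -> str:
        """Write the log once and return its SHA-256."""
        if self.committed:
            raise RuntimeError("log already committed")
        self.path.write_text("\n".join(self.entries), encoding="utf-8")
        self.committed = True
        return hashlib.sha256(self.path.read_bytes()).hexdigest()


log = FrozenLog(Path(tempfile.mkdtemp()) / "operation_log.txt")
log.record("Starting evidence capture")
digest_in_manifest = log.save()
log.record("Requesting timestamp")  # stays in memory only
assert hashlib.sha256(log.path.read_bytes()).hexdigest() == digest_in_manifest
```

The trade-off is that anything recorded after the commit, such as the timestamp request, is absent from the log file shipped in the package.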

3.3 TLS Certificate Capture

rights/url_evidence.py · L98-L172
    def capture_tls_certificate(self, url: str) -> Dict:
        """
        Capture the server's TLS certificate chain.
        This proves the connection was made to the authentic server —
        you can't forge a CA-signed certificate.
        """
        import ssl
        from urllib.parse import urlparse

        self._record("Capturing TLS server certificate")
        parsed = urlparse(url if "://" in url else f"https://{url}")
        hostname = parsed.hostname or ""
        port = parsed.port or 443

        if not hostname:
            return {"error": "Invalid hostname"}

        try:
            ctx = ssl.create_default_context()
            with ctx.wrap_socket(
                socket.socket(socket.AF_INET, socket.SOCK_STREAM),
                server_hostname=hostname,
            ) as sock:
                sock.settimeout(10)
                sock.connect((hostname, port))
                cert = sock.getpeercert()
                cert_der = sock.getpeercert(binary_form=True)

            # Save DER certificate to file
            cert_path = self._output_dir / "server_certificate.der"
            cert_path.write_bytes(cert_der)

            # Also save human-readable PEM
            import base64
            pem_data = (
                "-----BEGIN CERTIFICATE-----\n"
                + base64.encodebytes(cert_der).decode()
                + "-----END CERTIFICATE-----\n"
            )
            pem_path = self._output_dir / "server_certificate.pem"
            pem_path.write_text(pem_data, encoding="utf-8")

            # Extract key fields
            subject = dict(x[0] for x in cert.get("subject", ()))
            issuer = dict(x[0] for x in cert.get("issuer", ()))
            cert_info = {
                "subject_cn": subject.get("commonName", ""),
                "issuer_cn": issuer.get("commonName", ""),
                "issuer_org": issuer.get("organizationName", ""),
                "not_before": cert.get("notBefore", ""),
                "not_after": cert.get("notAfter", ""),
                "serial_number": cert.get("serialNumber", ""),
                "san": [
                    entry[1]
                    for entry in cert.get("subjectAltName", ())
                    if entry[0] == "DNS"
                ],
                "der_path": str(cert_path),
                "pem_path": str(pem_path),
                "der_sha256": hashlib.sha256(cert_der).hexdigest(),
            }

            self._record(
                f"TLS cert captured: {cert_info['subject_cn']} "
                f"(issuer: {cert_info['issuer_org']}, "
                f"serial: {cert_info['serial_number'][:16]}...)"
            )
            return cert_info

        except Exception as e:
            self._record(f"TLS certificate capture failed: {e}")
            return {"error": str(e)}

    # ===== Step 2: DNS resolution =====
tools/rights/url_evidence.py:URLEvidenceCapture.capture_tls_certificate()

Failure scenarios: a self-signed certificate, ssl.SSLError, or a connection timeout → returns {"error": ...}; non-fatal.
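
On the success path, an auditor can cross-check the saved artifacts offline: the PEM file should decode to exactly the DER bytes, and the DER fingerprint should equal the der_sha256 value recorded by the capture. The sketch below assumes the three inputs have already been read from the evidence directory; the helper names are hypothetical. Note that Python's SSLSocket.getpeercert() returns the peer's own (leaf) certificate, so these files hold a single certificate rather than the chain.

```python
import base64
import hashlib


def pem_to_der(pem_text: str) -> bytes:
    """Strip the PEM armor lines and decode the base64 body."""
    body = [line for line in pem_text.splitlines()
            if line and not line.startswith("-----")]
    return base64.b64decode("".join(body))


def check_cert_files(der: bytes, pem_text: str, der_sha256: str) -> bool:
    """True when the PEM decodes to the same DER and the fingerprint matches."""
    return (pem_to_der(pem_text) == der
            and hashlib.sha256(der).hexdigest() == der_sha256.lower())
```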


3.4 CAPTCHA Detection and Handling

rights/url_evidence.py · L195-L208
    _CAPTCHA_KEYWORDS = (
        "验证码", "验证", "captcha", "verify", "challenge",
        "human verification", "robot", "机器人",
    )

    def _is_captcha_page(self, title: str, url: str) -> bool:
        """Detect if the loaded page is a CAPTCHA or anti-bot challenge."""
        text = (title or "").lower()
        url_lower = (url or "").lower()
        for kw in self._CAPTCHA_KEYWORDS:
            if kw in text or kw in url_lower:
                return True
        return False
tools/rights/url_evidence.py:URLEvidenceCapture._CAPTCHA_KEYWORDS / _is_captcha_page
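
Because the check is a plain substring match on both the title and the URL, it can be exercised in isolation. The sketch below is a standalone copy of the logic above (module-level names instead of class members) with example inputs that are made up for illustration; it shows that a URL merely containing a keyword is also classified as a challenge page.

```python
CAPTCHA_KEYWORDS = (
    "验证码", "验证", "captcha", "verify", "challenge",
    "human verification", "robot", "机器人",
)


def is_captcha_page(title: str, url: str) -> bool:
    text = (title or "").lower()
    url_lower = (url or "").lower()
    return any(kw in text or kw in url_lower for kw in CAPTCHA_KEYWORDS)


# A genuine challenge page is detected...
assert is_captcha_page("Just a moment - CAPTCHA", "https://example.com/")
# ...but so is any URL or title that merely contains a keyword.
assert is_captcha_page("Home", "https://example.com/robots-and-ai")
assert not is_captcha_page("Home", "https://example.com/gallery")
```

A false positive is not fatal in this flow, since the visible-browser wait simply times out and the current state is captured, but it does cost the polling time.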

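Processing flow:

  1. Navigate headless
  2. Match keywords against the page title and URL → decide whether it is a CAPTCHA page
  3. Close the headless browser and open a visible one
  4. Navigate again
  5. Poll with time.sleep(2), for at most 120 seconds
  6. On timeout, capture the current state

Blocking risk: polling uses time.sleep(2), which blocks the current thread.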
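
Steps 5 and 6 amount to a deadline-bounded polling loop. The following is a minimal sketch under stated assumptions: `wait_until_cleared` and its `is_blocked` callback are hypothetical names, and the `sleep` and `clock` parameters exist only so the loop can be tested without waiting; the 2-second interval and 120-second limit are the values given above.

```python
import time


def wait_until_cleared(is_blocked, timeout=120.0, interval=2.0,
                       sleep=time.sleep, clock=time.monotonic) -> bool:
    """Poll is_blocked() until it returns False or the timeout expires."""
    deadline = clock() + timeout
    while clock() < deadline:
        if not is_blocked():
            return True   # challenge solved; caller can capture normally
        sleep(interval)   # blocks the calling thread, as noted above
    return False          # timed out; caller captures the current state
```

Running such a loop on a worker thread rather than the UI thread would keep the interface responsive during the wait.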


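4. Invisible Watermark Technical Audit

4.1 Architecture Overview

The invisible watermark module is split into four layers:

| Layer | File | Responsibility |
|---|---|---|
| Underlying library | blind_watermark (PyPI) | DWT+DCT+SVD core embedding/extraction algorithm |
| Engine layer | tools/packer/watermark_protection.py | Wrapper: fixed-length encoding, round-trip verification, alpha preservation, exception fallback |
| Business layer | tools/packer/logic.py / agent_api.py | Orchestrates packing parameters; combines visible and invisible watermarks |
| Worker layer | core/workers/watermark_worker.py | Extraction on a background thread so the UI is not blocked |

Everything runs locally, with zero network dependency.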


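4.2 Underlying Library Algorithm (blind_watermark)

Nephele uses blind_watermark (github.com/guofei9987/blind_watermark) as its underlying library. It applies a three-stage hybrid-domain embedding strategy: DWT (discrete wavelet transform) → DCT (discrete cosine transform) → SVD (singular value decomposition).

4.2.1 The WaterMark Wrapper Class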

blind_watermark/blind_watermark.py

python
class WaterMark:
    def __init__(self, password_wm=1, password_img=1, block_shape=(4, 4), mode='common', processes=None):
        self.bwm_core = WaterMarkCore(password_img=password_img, mode=mode, processes=processes)
        self.password_wm = password_wm
        self.wm_bit = None
        self.wm_size = 0
 
    def read_img(self, filename=None, img=None):
        if img is None:
            img = cv2.imread(filename, flags=cv2.IMREAD_UNCHANGED)
        self.bwm_core.read_img_arr(img=img)
        return img
 
    def read_wm(self, wm_content, mode='img'):
        if mode == 'bit':
            self.wm_bit = np.array(wm_content)
        # ... img / str 模式省略 ...
 
        self.wm_size = self.wm_bit.size
        # 水印加密:用 password_wm 作为种子对 bit 序列做伪随机置乱
        np.random.RandomState(self.password_wm).shuffle(self.wm_bit)
        self.bwm_core.read_wm(self.wm_bit)
 
    def embed(self, filename=None, compression_ratio=None):
        embed_img = self.bwm_core.embed()
        if filename is not None:
            cv2.imwrite(filename=filename, img=embed_img)
        return embed_img
 
    def extract_decrypt(self, wm_avg):
        # 逆置乱:根据相同的 seed 生成相同的 shuffle index,再逆序还原
        wm_index = np.arange(self.wm_size)
        np.random.RandomState(self.password_wm).shuffle(wm_index)
        wm_avg[wm_index] = wm_avg.copy()
        return wm_avg
 
    def extract(self, filename=None, embed_img=None, wm_shape=None, mode='img'):
        if filename is not None:
            embed_img = cv2.imread(filename, flags=cv2.IMREAD_COLOR)
        self.wm_size = np.array(wm_shape).prod()
 
        if mode in ('str', 'bit'):
            wm_avg = self.bwm_core.extract_with_kmeans(img=embed_img, wm_shape=wm_shape)
        else:
            wm_avg = self.bwm_core.extract(img=embed_img, wm_shape=wm_shape)
 
        wm = self.extract_decrypt(wm_avg=wm_avg)
        return wm

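Key facts

  • password_wm seeds the pseudo-random shuffle of the watermark bit sequence (np.random.RandomState.shuffle)
  • password_img is passed to WaterMarkCore, where it seeds the shuffle of the DCT coefficients inside each block (random_strategy1)
  • The "encryption" is a deterministic shuffle from a known seed, not modern cryptographic encryption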
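
The property in the last bullet can be demonstrated without the library. The sketch below swaps in Python's `random.Random` for NumPy's `RandomState`, so the concrete permutation differs from what blind_watermark produces, but the behaviour is the same: anyone holding the seed can undo the shuffle, using the same index trick as extract_decrypt. The function names are hypothetical.

```python
import random


def scramble(bits, seed):
    out = list(bits)
    random.Random(seed).shuffle(out)
    return out


def unscramble(scrambled, seed):
    # Shuffle an index list with the same seed; index[k] is the original
    # position of the element that ended up at position k.
    index = list(range(len(scrambled)))
    random.Random(seed).shuffle(index)
    restored = [None] * len(scrambled)
    for k, original_pos in enumerate(index):
        restored[original_pos] = scrambled[k]
    return restored


bits = [1, 0, 1, 1, 0, 0, 1, 0]
assert scramble(bits, 1314) == scramble(bits, 1314)      # deterministic
assert unscramble(scramble(bits, 1314), 1314) == bits    # fully reversible
```

A shuffle also preserves the number of ones and zeros, so it hides only the ordering of the payload.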

4.2.2 The WaterMarkCore Engine

blind_watermark/bwm_core.py

python
class WaterMarkCore:
    def __init__(self, password_img=1, mode='common', processes=None):
        self.block_shape = np.array([4, 4])
        self.password_img = password_img
        self.d1, self.d2 = 36, 20  # 量化步长:越大鲁棒性越强,但失真越大
        self.pool = AutoPool(mode=mode, processes=processes)

Image preprocessing (read_img_arr)

python
    def read_img_arr(self, img):
        # 处理透明图
        self.alpha = None
        if img.shape[2] == 4:
            if img[:, :, 3].min() < 255:
                self.alpha = img[:, :, 3]
                img = img[:, :, :3]
 
        # BGR -> YUV,补白边使像素变偶数(DWT 要求)
        self.img = img.astype(np.float32)
        self.img_shape = self.img.shape[:2]
        self.img_YUV = cv2.copyMakeBorder(
            cv2.cvtColor(self.img, cv2.COLOR_BGR2YUV),
            0, self.img.shape[0] % 2, 0, self.img.shape[1] % 2,
            cv2.BORDER_CONSTANT, value=(0, 0, 0)
        )
 
        # 对 Y/U/V 三个通道分别做 1 级 Haar DWT
        self.ca_shape = [(i + 1) // 2 for i in self.img_shape]
        self.ca_block_shape = (
            self.ca_shape[0] // self.block_shape[0],
            self.ca_shape[1] // self.block_shape[1],
            self.block_shape[0], self.block_shape[1]
        )
 
        for channel in range(3):
            self.ca[channel], self.hvd[channel] = dwt2(
                self.img_YUV[:, :, channel], 'haar'
            )
            # 将 CA(近似系数)转为 4D 分块数组
            self.ca_block[channel] = np.lib.stride_tricks.as_strided(
                self.ca[channel].astype(np.float32),
                self.ca_block_shape,
                strides=4 * np.array([
                    self.ca_shape[1] * self.block_shape[0],
                    self.block_shape[1], self.ca_shape[1], 1
                ])
            )

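Audit points

  • Color space: BGR → YUV; the watermark is embedded in the DWT approximation (CA) subband of each of the Y, U and V channels
  • DWT levels: a single-level Haar transform, not a multi-level decomposition
  • Block size: fixed 4×4; the CA subband is cut into non-overlapping blocks

4.2.3 Block-Level Embedding (block_add_wm_slow)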

python
    def block_add_wm_slow(self, arg):
        block, shuffler, i = arg
        wm_1 = self.wm_bit[i % self.wm_size]
 
        # Step 1: 对 4x4 块做 DCT
        block_dct = dct(block)
 
        # Step 2: flatten 后按 shuffler 打乱顺序(块内置乱)
        block_dct_shuffled = block_dct.flatten()[shuffler].reshape(self.block_shape)
 
        # Step 3: SVD 分解
        u, s, v = svd(block_dct_shuffled)
 
        # Step 4: 在奇异值上嵌入 1 bit 水印
        # 量化公式:把 s[0] 量化到 d1 的整数倍,再根据 wm_1 偏移 1/4 个步长
        s[0] = (s[0] // self.d1 + 1/4 + 1/2 * wm_1) * self.d1
        if self.d2:
            s[1] = (s[1] // self.d2 + 1/4 + 1/2 * wm_1) * self.d2
 
        # Step 5: 逆 SVD
        block_dct_flatten = np.dot(u, np.dot(np.diag(s), v)).flatten()
 
        # Step 6: 逆置乱
        block_dct_flatten[shuffler] = block_dct_flatten.copy()
 
        # Step 7: 逆 DCT
        return idct(block_dct_flatten.reshape(self.block_shape))

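Mathematical principle

Embedding formula (using s[0] as the example):

text
s'[0] = (floor(s[0] / d1) + 1/4 + 1/2 * w) * d1

where w ∈ {0, 1} is the watermark bit. On extraction:

text
w = 1  if (s[0] mod d1) > (d1 / 2)  else 0

With d1 = 36, the quantization interval for each bit is 36, and the largest change to the singular value s[0] is about 0.75 × d1 = 27.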
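
The two formulas can be checked numerically with plain Python. The sketch below is a scalar model of the quantization only (no DCT or SVD); the function names are hypothetical and the sample values are arbitrary.

```python
import math

D1 = 36


def embed_bit(s: float, w: int, d: float = D1) -> float:
    """Quantize s to the d-grid, then offset by d/4 (w=0) or 3d/4 (w=1)."""
    return (math.floor(s / d) + 0.25 + 0.5 * w) * d


def extract_bit(s: float, d: float = D1) -> int:
    return 1 if (s % d) > d / 2 else 0


for w in (0, 1):
    for s in (0.0, 17.9, 36.0, 100.0, 251.3):
        s2 = embed_bit(s, w)
        assert extract_bit(s2) == w          # round trip recovers the bit
        assert abs(s2 - s) <= 0.75 * D1      # change never exceeds 27
```

After embedding, the residue s' mod d1 is either 9 or 27, so in this model a later disturbance of the singular value smaller than d1/4 = 9 does not flip the extracted bit.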


4.2.4 Block-Level Extraction (block_get_wm_slow)

python
    def block_get_wm_slow(self, args):
        block, shuffler = args
        block_dct_shuffled = dct(block).flatten()[shuffler].reshape(self.block_shape)
        u, s, v = svd(block_dct_shuffled)
 
        # 从 s[0] 提取 bit
        wm = (s[0] % self.d1 > self.d1 / 2) * 1
        if self.d2:
            # s[1] 作为辅助,加权平均
            tmp = (s[1] % self.d2 > self.d2 / 2) * 1
            wm = (wm * 3 + tmp * 1) / 4
        return wm

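Audit points

  • d2 (default 20) is the auxiliary quantization step; s[0] carries weight 3 and s[1] weight 1
  • With d2=0, extraction degrades to a single singular value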
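
The 3:1 weighting produces only four possible per-block values, which the short sketch below enumerates. The function name `block_vote` is hypothetical; the arithmetic is the expression from block_get_wm_slow.

```python
def block_vote(bit_s0: int, bit_s1: int) -> float:
    """Per-block soft value: s[0] weighted 3, s[1] weighted 1."""
    return (bit_s0 * 3 + bit_s1 * 1) / 4


table = {(a, b): block_vote(a, b) for a in (0, 1) for b in (0, 1)}
assert table == {(0, 0): 0.0, (0, 1): 0.25, (1, 0): 0.75, (1, 1): 1.0}

# Within a single block, s[1] can never overrule s[0] against a 0.5 threshold.
assert all((vote > 0.5) == bool(a) for (a, _b), vote in table.items())
```

The s[1] reading therefore only matters once the soft values of many blocks and channels are averaged, where it shifts the mean toward or away from the threshold.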

4.2.5 Global Embedding (embed)

python
    def embed(self):
        self.init_block_index()
        embed_ca = copy.deepcopy(self.ca)
 
        # Generate one shuffle index per block for its 16 DCT coefficients (within-block shuffle)
        self.idx_shuffle = random_strategy1(
            self.password_img, self.block_num,
            self.block_shape[0] * self.block_shape[1]
        )
 
        for channel in range(3):
            # 对每个块并行执行 block_add_wm
            tmp = self.pool.map(self.block_add_wm, [
                (self.ca_block[channel][self.block_index[i]], self.idx_shuffle[i], i)
                for i in range(self.block_num)
            ])
 
            # 写回 4D 数组
            for i in range(self.block_num):
                self.ca_block[channel][self.block_index[i]] = tmp[i]
 
            # 4D -> 2D,拼接回 CA 子带
            self.ca_part[channel] = np.concatenate(np.concatenate(self.ca_block[channel], 1), 1)
            embed_ca[channel][:self.part_shape[0], :self.part_shape[1]] = self.ca_part[channel]
 
            # 逆 DWT
            embed_YUV[channel] = idwt2((embed_ca[channel], self.hvd[channel]), "haar")
 
        # 合并三通道,YUV -> BGR,裁剪回原始尺寸
        embed_img_YUV = np.stack(embed_YUV, axis=2)
        embed_img_YUV = embed_img_YUV[:self.img_shape[0], :self.img_shape[1]]
        embed_img = cv2.cvtColor(embed_img_YUV, cv2.COLOR_YUV2BGR)
        embed_img = np.clip(embed_img, a_min=0, a_max=255)
 
        if self.alpha is not None:
            embed_img = cv2.merge([embed_img.astype(np.uint8), self.alpha])
        return embed_img

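Key facts

  • Cyclic embedding: the watermark bit sequence is repeated cyclically across the block_num blocks (wm_bit[i % wm_size])
  • Three independent channels: Y, U and V each carry a full copy of the watermark, and extraction averages them
  • Within-block shuffle (idx_shuffle[i]): the order of the 16 DCT coefficients inside each 4×4 block is shuffled
  • Cross-block order (block_index): blocks are traversed in a fixed row/column scan and are not shuffled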
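
The amount of repetition implied by the cyclic embedding follows from the shapes computed in read_img_arr. The sketch below is a back-of-the-envelope calculation, not library code: the helper name is hypothetical, and taking block_num as the product of the first two entries of ca_block_shape is an assumption inferred from the 4D block array.

```python
def embedding_redundancy(height, width, wm_bits=256, block=4):
    """Return (block_num, full_copies, bits_with_one_extra_copy) per channel."""
    # CA subband of a 1-level DWT is half the (even-padded) image size.
    ca_h, ca_w = (height + 1) // 2, (width + 1) // 2
    block_num = (ca_h // block) * (ca_w // block)
    full, extra = divmod(block_num, wm_bits)
    return block_num, full, extra


# 1920x1080 image with the 256-bit payload used by Nephele
assert embedding_redundancy(1080, 1920) == (32400, 126, 144)
```

For that example each bit is written 126 times per channel, and the first 144 bits once more, before the three channels are averaged on extraction.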

4.2.6 K-Means Binarization (one_dim_kmeans)

python
def one_dim_kmeans(inputs):
    threshold = 0
    e_tol = 10 ** (-6)
    center = [inputs.min(), inputs.max()]
    for i in range(300):
        threshold = (center[0] + center[1]) / 2
        is_class01 = inputs > threshold
        center = [inputs[~is_class01].mean(), inputs[is_class01].mean()]
        if np.abs((center[0] + center[1]) / 2 - threshold) < e_tol:
            threshold = (center[0] + center[1]) / 2
            break
    is_class01 = inputs > threshold
    return is_class01

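Used by extract_with_kmeans. Note that the extract() excerpt in 4.2.1 routes both mode="str" and mode="bit" through extract_with_kmeans, so Nephele's mode="bit" calls do pass through this binarization; what bit mode skips is any further conversion, and the output of extract_decrypt is returned as-is.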


4.2.7 Random Shuffle Strategy

python
def random_strategy1(seed, size, block_shape):
    return np.random.RandomState(seed) \
        .random(size=(size, block_shape)) \
        .argsort(axis=1)

This builds a size × block_shape random matrix and argsorts each row to obtain a per-row shuffle index. For password_img: size = block_num and block_shape = 16.


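4.3 Engine Wrapper Source

On top of the underlying library, Nephele adds fixed-length encoding, round-trip verification, alpha-channel preservation and exception fallback.

4.3.1 Constants and Utility Functions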

packer/watermark_protection.py · L1-L67
"""
Nephele Workshop - Watermark Protection Module (local)

Embeds/extracts invisible blind watermarks using blind_watermark locally.
No network dependency — works offline.

Uses file-based I/O + bit mode for reliable embedding/extraction
(numpy array mode has known issues with blind_watermark library).

Public API unchanged:
    protect_image(image, level, copyright_info) -> Image
    extract_watermark(image) -> str | None
    save_with_watermark(image, output_path) -> bool

Developer: ArisFusion Studio
"""

import logging
import tempfile
from enum import Enum
from pathlib import Path
from typing import Optional

import numpy as np
from PIL import Image

logger = logging.getLogger(__name__)

# Watermark payload: fixed 32 bytes (256 bits)
# Enough for 10 Chinese chars (UTF-8, 3 bytes each) or 32 ASCII chars
WATERMARK_BYTES = 32
WATERMARK_BITS = WATERMARK_BYTES * 8
_WM_PASSWORD_IMG = 2024
_WM_PASSWORD_WM = 1314


class ProtectionLevel(Enum):
    NONE = "none"
    INVISIBLE = "invisible"


LEVEL_ALIASES = {"maximum": "invisible"}


def _text_to_bits(text: str) -> list[int]:
    """Convert text to fixed-length bit array via UTF-8."""
    raw = text.encode("utf-8")[:WATERMARK_BYTES]
    padded = raw.ljust(WATERMARK_BYTES, b"\x00")
    bits = []
    for byte in padded:
        for i in range(7, -1, -1):
            bits.append((byte >> i) & 1)
    return bits


def _bits_to_text(bits: list) -> str:
    """Convert bit array back to text via UTF-8."""
    raw = bytearray()
    for i in range(0, len(bits), 8):
        chunk = bits[i:i+8]
        if len(chunk) < 8:
            break
        val = 0
        for b in chunk:
            val = (val << 1) | (1 if b > 0.5 else 0)
        raw.append(val)
    return raw.rstrip(b"\x00").decode("utf-8", errors="replace")
tools/packer/watermark_protection.py: constants & bit helpers

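Audit points

  • Fixed-length UTF-8 truncation: text longer than 32 bytes is silently truncated; the cut is made on bytes, so it can fall inside a multi-byte character
  • Threshold decision: on extraction, b > 0.5 is read as 1, which gives some tolerance to noise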
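
The truncation behaviour can be reproduced with a condensed copy of the two helpers. The sketch below keeps their logic but drops the leading underscores; the sample strings are chosen for illustration. It also shows that a byte-truncated payload no longer equals a character slice such as text[:WATERMARK_BYTES], which appears to be the comparison used by the round-trip check in embed (4.3.2), so multi-byte text over 32 bytes would log a verify mismatch even when embedding worked.

```python
WATERMARK_BYTES = 32


def text_to_bits(text):
    raw = text.encode("utf-8")[:WATERMARK_BYTES].ljust(WATERMARK_BYTES, b"\x00")
    return [(byte >> i) & 1 for byte in raw for i in range(7, -1, -1)]


def bits_to_text(bits):
    raw = bytearray()
    for i in range(0, len(bits) - 7, 8):
        val = 0
        for b in bits[i:i + 8]:
            val = (val << 1) | (1 if b > 0.5 else 0)
        raw.append(val)
    return bytes(raw).rstrip(b"\x00").decode("utf-8", errors="replace")


short = "ARIS"
assert bits_to_text(text_to_bits(short)) == short

long_cjk = "版权所有" * 3                        # 12 characters, 36 UTF-8 bytes
decoded = bits_to_text(text_to_bits(long_cjk))
assert decoded.startswith(long_cjk[:10])         # first 30 bytes survive intact
assert decoded != long_cjk[:WATERMARK_BYTES]     # character slice != byte slice
```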

4.3.2 Embedding Engine

packer/watermark_protection.py · L70-L122
class WatermarkEngine:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def embed(self, image: Image.Image, text: str) -> Image.Image:
        """Embed invisible watermark using file-based blind_watermark."""
        try:
            from blind_watermark import WaterMark

            rgb = image.convert("RGB")
            alpha = image.split()[3] if image.mode == "RGBA" else None

            with tempfile.TemporaryDirectory() as tmpdir:
                orig_path = str(Path(tmpdir) / "orig.png")
                wm_path = str(Path(tmpdir) / "watermarked.png")

                rgb.save(orig_path, format="PNG")

                bits = _text_to_bits(text)

                bwm = WaterMark(password_img=_WM_PASSWORD_IMG, password_wm=_WM_PASSWORD_WM)
                bwm.read_img(orig_path)
                bwm.read_wm(np.array(bits), mode="bit")
                bwm.embed(wm_path)

                result_img = Image.open(wm_path).convert("RGB")

                # Verify extraction round-trip
                extracted_bits = WaterMark(
                    password_img=_WM_PASSWORD_IMG, password_wm=_WM_PASSWORD_WM
                ).extract(wm_path, wm_shape=WATERMARK_BITS, mode="bit")
                extracted_text = _bits_to_text(extracted_bits)

                if extracted_text == text[:WATERMARK_BYTES]:
                    logger.info("[Watermark] Verified: '%s'", extracted_text)
                else:
                    logger.warning("[Watermark] Verify mismatch: '%s' -> '%s'",
                                   text[:WATERMARK_BYTES], extracted_text)

            if alpha:
                result_img = result_img.convert("RGBA")
                result_img.putalpha(alpha)

            return result_img

        except Exception as e:
            logger.error("Embed failed: %s", e, exc_info=True)
            return image
tools/packer/watermark_protection.py:WatermarkEngine.__new__ / embed

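Audit points

  • Singleton: WatermarkEngine is a singleton, but a new blind_watermark.WaterMark instance is created on every call
  • File-based I/O: works through TemporaryDirectory + temporary PNG files to avoid dtype/shape compatibility bugs in the numpy array mode
  • A failed verification only logs a warning; the watermarked image is still returned
  • Alpha channel: RGBA input is converted to RGB for embedding, and the alpha channel is restored afterwards
  • Exception fallback: any exception returns the original image, so the caller gets no signal that embedding failed

4.3.3 Extraction Engine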

packer/watermark_protection.py · L123-L145
    def extract(self, image: Image.Image) -> Optional[str]:
        """Extract invisible watermark using file-based blind_watermark."""
        try:
            from blind_watermark import WaterMark

            rgb = image.convert("RGB")
            bit_len = WATERMARK_BITS

            with tempfile.TemporaryDirectory() as tmpdir:
                img_path = str(Path(tmpdir) / "check.png")
                rgb.save(img_path, format="PNG")

                extracted_bits = WaterMark(
                    password_img=_WM_PASSWORD_IMG, password_wm=_WM_PASSWORD_WM
                ).extract(img_path, wm_shape=bit_len, mode="bit")

            text = _bits_to_text(extracted_bits)
            return text.strip() if text.strip() else None
        except Exception as e:
            logger.warning("Extract failed: %s", e)
            return None

tools/packer/watermark_protection.py:WatermarkEngine.extract

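Audit points

  • A failed extraction returns None, so "the image has no watermark" cannot be told apart from "the extraction itself failed"
  • An empty string (all-zero padding) also becomes None after strip()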
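
One way to remove this ambiguity is to return a small result object instead of Optional[str]. The sketch below is a hypothetical design, not part of the codebase: `ExtractStatus`, `ExtractResult` and `extract_with_status` are invented names, and the extraction itself is passed in as a callable so the example stays self-contained.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Callable, Optional


class ExtractStatus(Enum):
    FOUND = "found"
    EMPTY = "empty"    # extraction ran, payload decoded to an empty string
    ERROR = "error"    # extraction raised an exception


@dataclass
class ExtractResult:
    status: ExtractStatus
    text: Optional[str] = None
    error: Optional[str] = None


def extract_with_status(extract_fn: Callable[[], str]) -> ExtractResult:
    try:
        text = extract_fn().strip()
    except Exception as exc:
        return ExtractResult(ExtractStatus.ERROR, error=str(exc))
    if text:
        return ExtractResult(ExtractStatus.FOUND, text=text)
    return ExtractResult(ExtractStatus.EMPTY)
```

With this shape, a worker could log an error for ERROR and a plain "not detected" message for EMPTY, instead of treating both as the same outcome.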

4.3.4 Public API

packer/watermark_protection.py · L146-L167
def protect_image(
    image: Image.Image,
    level: str = "none",
    copyright_info: str = "ARIS"
) -> Image.Image:
    level = LEVEL_ALIASES.get(level, level)
    if level == "invisible":
        return WatermarkEngine().embed(image, copyright_info)
    return image


def extract_watermark(image: Image.Image) -> Optional[str]:
    return WatermarkEngine().extract(image)


def save_with_watermark(image: Image.Image, output_path: str) -> bool:
    try:
        image.save(output_path, format='PNG')
        return True
    except Exception as e:
        logger.warning("Save failed: %s", e)
        return False
tools/packer/watermark_protection.py:protect_image / extract_watermark / save_with_watermark

4.4 Business-Layer Invocation

packer/agent_api.py · L19-L76
def pack_image(
    input_path: str,
    watermark_path: Optional[str] = None,
    output_dir: Optional[str] = None,
    watermark_mode: str = "center",
    watermark_opacity: float = 0.3,
    preview_max_size: int = 1920,
    thumbnail_max_size: int = 500,
    protection_level: str = "none",
    copyright_info: str = "© ArisFusion Studio",
    output_folder_name: str = "Delivery_Pack",
) -> dict:
    """One-click image packing: HD + preview + thumbnail."""
    try:
        src = Path(input_path)
        if not src.exists():
            return api_err(f"文件不存在: {input_path}")

        mode_map = {"center": WatermarkMode.CENTER, "tile": WatermarkMode.TILE}
        wm_mode = mode_map.get(watermark_mode, WatermarkMode.CENTER)

        wm_path = Path(watermark_path) if watermark_path else None
        if wm_path and not wm_path.exists():
            return api_err(f"水印文件不存在: {watermark_path}")

        out_dir = Path(output_dir) if output_dir else None

        packer = DeliveryPacker(
            preview_max_size=preview_max_size,
            thumbnail_max_size=thumbnail_max_size,
            watermark_opacity=watermark_opacity,
            protection_level=protection_level,
            copyright_info=copyright_info,
            output_folder_name=output_folder_name,
        )

        result_dir, results = packer.process_image(
            input_path=src,
            watermark_path=wm_path,
            output_dir=out_dir,
            watermark_mode=wm_mode,
        )

        file_info = {k: str(v) for k, v in results.items()}
        return api_ok(
            f"打包完成,共生成 {len(results)} 个文件",
            output_path=str(result_dir),
            data={"files": file_info},
        )

    except PackerError as e:
        logger.error("打包失败: %s", e)
        return api_err(str(e))
    except Exception as e:
        logger.exception("打包时发生意外错误")
        return api_err(f"意外错误: {e}")

tools/packer/agent_api.py:pack_image()

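Key facts

  • protection_level defaults to "none", so the invisible watermark is disabled by default.
  • copyright_info is limited to 32 bytes; longer values are silently truncated.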

4.5 Worker-layer implementation

workers/watermark_worker.py · L1-L47
"""Watermark Extraction Worker"""
from pathlib import Path
from PySide6.QtCore import QThread, Signal, QCoreApplication

_tr = QCoreApplication.translate

from .._utils import ensure_src_path


class WatermarkExtractWorker(QThread):
    """Worker thread for watermark extraction (rivaGan model loading is slow)."""
    finished = Signal(str)  # watermark result or empty string
    logMessage = Signal(str, str)  # (message, level)

    def __init__(self, image_path: str):
        super().__init__()
        self.image_path = image_path

    def run(self):
        """Extract watermark in background thread."""
        try:
            ensure_src_path()

            from PIL import Image
            from tools.packer.watermark_protection import extract_watermark

            img_path = Path(self.image_path)
            if not img_path.exists():
                self.logMessage.emit(_tr("WatermarkExtractWorker", "文件不存在: %s") % self.image_path, "error")
                self.finished.emit("")
                return

            self.logMessage.emit(_tr("WatermarkExtractWorker", "正在提取水印: %s") % img_path.name, "info")

            image = Image.open(img_path)
            watermark = extract_watermark(image)

            if watermark:
                self.logMessage.emit(_tr("WatermarkExtractWorker", "提取成功: %s") % watermark, "success")
                self.finished.emit(watermark)
            else:
                self.logMessage.emit("未检测到隐形水印", "warning")
                self.finished.emit("")

        except Exception as e:
            self.logMessage.emit(f"提取失败: {str(e)}", "error")
            self.finished.emit("")
core/workers/watermark_worker.py (full file)

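4.6 Capacity and encoding

| Encoding | Bytes per character | Max characters |
| --- | --- | --- |
| ASCII | 1 | 32 |
| CJK (UTF-8) | 3 | 10 |
| Mixed | Varies | Depends on the specific characters |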
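
The capacity figures above follow from the 32-byte limit: 32 one-byte ASCII characters, or 10 three-byte CJK characters (30 bytes, with 2 bytes left unused). A minimal sketch of a byte-budget truncation that never splits a character is shown below. The helper name `truncate_utf8` is hypothetical, and the real truncation in tools/packer/watermark_protection.py may behave differently (for example, it may cut at a raw byte boundary).

```python
def truncate_utf8(text: str, max_bytes: int = 32) -> str:
    """Truncate text so its UTF-8 encoding fits in max_bytes.

    Hypothetical helper for illustration; not taken from the audited code.
    """
    encoded = text.encode("utf-8")
    if len(encoded) <= max_bytes:
        return text
    # errors="ignore" drops an incomplete trailing character, if the cut
    # landed in the middle of a multi-byte sequence.
    return encoded[:max_bytes].decode("utf-8", errors="ignore")


print(len(truncate_utf8("a" * 40)))   # ASCII: 1 byte per character
print(len(truncate_utf8("版" * 20)))  # CJK: 3 bytes per character
```

Because the truncation is silent, a caller that needs the full string to survive would have to check the encoded length before packing.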

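5. AI metadata detection: white-box source audit

This section audits the AI metadata / C2PA credential detection feature of Nephele Workshop. The feature reads machine-readable evidence already present in an image file, including C2PA Content Credentials, generator tool metadata, platform declarations, and export traces. It does not use a visual style classification model, and it does not interpret "nothing detected" as "not AI".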

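5.1 Architecture overview

AI metadata detection is split into four layers:

| Layer | File | Responsibility |
| --- | --- | --- |
| Rule layer | tools/validator/logic.py:MetaDataDetector | Metadata reading, rule matching, evidence grading, final status output |
| C2PA layer | tools/validator/c2pa_verifier.py | Official C2PA SDK adapter, manifest reading, signature chain and trust status parsing |
| Worker layer | core/workers/ai_detector_worker.py | Batch detection thread, error isolation, result signals |
| UI layer | gui/qml/views/AIValidatorView.qml | Maps raw evidence to user-friendly evidence labels |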

Data flow:

text
User selects an image
  -> core/workers/ai_detector_worker.py
  -> tools/validator/logic.py:MetaDataDetector.detect()
       ├── Pillow reads PNG/JPEG/WebP/TIFF metadata (PNG info / EXIF)
       ├── tools/validator/c2pa_verifier.py:verify_c2pa_file() (official SDK)
       └── raw byte scan (JUMBF / APP11 fallback)
  -> returns {status, reason, tool, evidence}
  -> UI maps evidence labels

Detection result structure:

python
{
    "status": "ai" | "unknown" | "human" | "error",
    "reason": str,
    "tool": str,
    "evidence": str,
}

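Note

status="human" is a legacy field name. The UI should not present it as conclusive proof of a human-made work; it should display "no credentials found" or "insufficient credentials" instead.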
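
A minimal sketch of how a UI layer might turn the result dict into a display label without overstating status="human". The function name `display_label` and every label string are hypothetical; only the status values and the two reason strings it inspects come from detect(). Matching on the reason text is fragile, so this is an illustration of the requirement rather than a recommended design.

```python
def display_label(result: dict) -> str:
    """Map a detect() result to a cautious display label (hypothetical UI helper)."""
    status = result.get("status")
    reason = result.get("reason", "")
    if status == "ai":
        return f"AI metadata found: {result.get('tool', '')}"
    if status == "unknown":
        return "Weak hints only, inconclusive"
    if status == "error":
        return "Could not read file"
    if status == "human":
        # "human" only means no AI declaration was found; never show it as proof.
        if reason.startswith("C2PA Content Credentials"):
            return "Credentials present, no AI declaration"
        return "No credentials found"
    return "Unrecognized status"


stripped = {
    "status": "human",
    "reason": "No known AI generation metadata detected",
    "tool": "",
    "evidence": "No metadata signatures found",
}
print(display_label(stripped))
```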


5.2 MetaDataDetector rule layer

tools/validator/logic.py:MetaDataDetector is the main rule-matching class. It holds no state of its own, and its __init__ is empty.

5.2.1 Rule constants

validator/logic.py · L20-L105
class MetaDataDetector:
    """
    Detects AI generation metadata from image files using heuristic analysis
    of EXIF, PNG info chunks, and generation parameters.
    """

    # 1. Explicit software signatures (strong features; proprietary strings are matched first)
    # Order: Midjourney comes before Gemini to avoid misattribution
    AI_SOFTWARE_SIGNATURES = {
        "Midjourney": [
            re.compile(r"job id:\s*[a-f0-9\-]+", re.IGNORECASE),  # proprietary Job ID
            re.compile(r"--ar\s*\d+:\d+", re.IGNORECASE),         # --ar parameter
            re.compile(r"--v\s*\d+", re.IGNORECASE),              # --v parameter
            re.compile(r"--stylize\s*\d+", re.IGNORECASE),        # --stylize
            re.compile(r"midjourney", re.IGNORECASE),             # explicit name
            re.compile(r"mj v", re.IGNORECASE),
            re.compile(r"mj_", re.IGNORECASE),
        ],
        "ComfyUI": [
            re.compile(r"comfyui", re.IGNORECASE),
            re.compile(r"comfyland", re.IGNORECASE),
            # workflow/prompt JSON is handled separately in detect()
        ],
        "Gemini (Google)": [
            re.compile(r"gemini", re.IGNORECASE),
            re.compile(r"google deepmind", re.IGNORECASE),
            re.compile(r"generated by google", re.IGNORECASE),
            re.compile(r"google imagen", re.IGNORECASE),          # must include "google"
            re.compile(r"imagen by google", re.IGNORECASE),
            re.compile(r"synthid", re.IGNORECASE),                # Google SynthID watermark
            re.compile(r"nano banana", re.IGNORECASE),            # tool identifier
            re.compile(r"nanobanana", re.IGNORECASE),
        ],
        "DALL-E": [re.compile(r"dall-e", re.IGNORECASE), re.compile(r"dalle", re.IGNORECASE)],
        "NovelAI": [re.compile(r"novelai", re.IGNORECASE), re.compile(r"nai-diffusion", re.IGNORECASE)],
        "InvokeAI": [re.compile(r"invokeai", re.IGNORECASE), re.compile(r"invoke ai", re.IGNORECASE)],
        "Fooocus": [re.compile(r"fooocus", re.IGNORECASE)],
        "Stable Diffusion": [re.compile(r"stable diffusion", re.IGNORECASE), re.compile(r"sd\.?next", re.IGNORECASE), re.compile(r"forge", re.IGNORECASE)],
        "Leonardo.ai": [re.compile(r"leonardo.ai", re.IGNORECASE)],
        "Adobe Firefly": [re.compile(r"adobe firefly", re.IGNORECASE)],
        "Bing Image Creator": [re.compile(r"bing image creator", re.IGNORECASE)],
    }

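    # 2. Generation parameter fingerprints (lower priority; used when no software name is present)
    # Gemini-specific entries removed; trainedAlgorithmicMedia is treated as a generic AI marker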
    GENERATION_PARAM_FINGERPRINTS = [
        (r"Steps:\s*\d+", "Stable Diffusion (Parameters)"),
        (r"CFG scale:\s*[\d\.]+", "Stable Diffusion (Parameters)"),
        (r"Sampler:\s*\w+", "Stable Diffusion (Parameters)"),
        (r"Seed:\s*\d+", "Stable Diffusion (Parameters)"),
        (r"Model hash:\s*[a-f0-9]+", "Stable Diffusion (Parameters)"),
        (r"Model:\s*[^,\n]+", "Stable Diffusion (Parameters)"),
        (r"Negative prompt:", "Stable Diffusion (Parameters)"),
        (r"Size:\s*\d+x\d+", "Stable Diffusion (Parameters)"),
        (r"Clip skip:\s*\d+", "Stable Diffusion (Parameters)"),
        (r"Schedule type:\s*[^,\n]+", "Stable Diffusion (Parameters)"),
        (r"Denoising strength:\s*[\d\.]+", "Stable Diffusion (Parameters)"),
        (r"Hires upscale:\s*[\d\.]+", "Stable Diffusion (Parameters)"),
        # Regex updated to match IPTC/XMP variants (including the URL form)
        (r"DigitalSourceType\s*[:=]\s*(?:http://cv\.iptc\.org/newscodes/digitalsourcetype/)?trainedAlgorithmicMedia", "Generative AI (IPTC/XMP Standard)"),
    ]

    RAW_METADATA_SCAN_LIMIT = 128 * 1024 * 1024

    C2PA_CONTAINER_MARKERS = [
        b"c2pa",
        b"jumbf",
        b"content credentials",
        b"contentcredentials",
        b"contentauth",
    ]

    C2PA_AI_MARKERS = [
        b"trainedalgorithmicmedia",
        b"compositewithtrainedalgorithmicmedia",
        b"algorithmicmedia",
        b"generated by ai",
        b"ai generated",
        b"adobe firefly",
        b"google imagen",
        b"synthid",
        b"dall-e",
        b"dalle",
        b"midjourney",
        b"stable diffusion",
    ]
tools/validator/logic.py:MetaDataDetector class header & rule constants

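Audit points

  • Rule hierarchy: software signature (strong) → parameter fingerprint (secondary) → C2PA container + AI marker (strong) → weak features (filename).
  • Order-sensitive: AI_SOFTWARE_SIGNATURES is a dict, and Python 3.7+ preserves insertion order. Midjourney is listed before Gemini (Google) so that a Midjourney image whose metadata mentions a Google tool is not misattributed to Gemini.
  • SynthID ambiguity: the string synthid is treated as strong Gemini evidence, but this is plain string matching; this module does not decode the SynthID pixel watermark.
  • IPTC generic marker: trainedAlgorithmicMedia is not Google-specific. It is attributed to Gemini only when Google evidence appears in the same text; otherwise it is reported as Generative AI (Unknown).
  • Raw scan limit: 128 MB. Larger files skip the byte-level fallback and rely on Pillow and the C2PA SDK only.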
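
The order sensitivity noted above can be reproduced with a reduced signature table. This is a sketch, not the audited code: `SIGNATURES` holds one illustrative pattern per tool and `first_match` is a hypothetical helper that mimics the first-hit-wins loop over the dict.

```python
import re
from typing import Optional

# Reduced, illustrative subset of AI_SOFTWARE_SIGNATURES (not the full table).
SIGNATURES = {
    "Midjourney": [re.compile(r"midjourney", re.IGNORECASE)],
    "Gemini (Google)": [re.compile(r"google imagen", re.IGNORECASE)],
}


def first_match(text: str, signatures: dict) -> Optional[str]:
    """Return the first tool whose pattern matches, in dict insertion order."""
    for tool, patterns in signatures.items():
        if any(p.search(text) for p in patterns):
            return tool
    return None


text = "Midjourney job, reference image made with Google Imagen"
print(first_match(text, SIGNATURES))

# Reversing the insertion order flips the attribution for the same text.
reversed_signatures = dict(reversed(list(SIGNATURES.items())))
print(first_match(text, reversed_signatures))
```

Any future reordering of the dict literal therefore changes attribution for images whose metadata mentions more than one tool.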

5.2.2 detect() main flow

validator/logic.py · L444-L657
    def detect(self, image_path: str) -> Dict[str, str]:
        """
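        Detect AI generation metadata in an image file.
        
        Args:
            image_path: path to the image file
            
        Returns:
            Result dict containing:
            - status: "ai" | "unknown" | "human" | "error"
            - reason: description of why the status was assigned
            - tool: name of the detected AI tool (when status is "ai")
            - evidence: evidence description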
        """
        img_path = Path(image_path)
        if not img_path.exists():
            return {"status": "error", "reason": "文件不存在", "tool": "", "evidence": ""}

        evidence_found = []
        weak_evidence_found = []
        context_evidence_found = []
        detected_tool = None
        extension_mismatch = False
        minimal_web_jpeg = False
        
        try:
            with Image.open(img_path) as img:
                img.load()  # Ensure header is loaded
                actual_format = (img.format or "").upper()
                suffix = img_path.suffix.lower()
                if actual_format == "JPEG" and suffix not in {".jpg", ".jpeg", ".jpe"}:
                    extension_mismatch = True
                    context_evidence_found.append(f"File extension mismatch: {suffix} file contains JPEG data")
                elif actual_format == "PNG" and suffix != ".png":
                    extension_mismatch = True
                    context_evidence_found.append(f"File extension mismatch: {suffix} file contains PNG data")

                minimal_jpeg_keys = {"jfif", "jfif_density", "jfif_unit", "jfif_version", "progression", "progressive"}
                if actual_format == "JPEG" and set(img.info.keys()).issubset(minimal_jpeg_keys):
                    minimal_web_jpeg = True
                    context_evidence_found.append("Minimal JPEG metadata only")

                # --- 1. Check PNG Info / tEXt chunks ---
                if hasattr(img, 'info') and img.info:
                    software_value = img.info.get("Software") or img.info.get("software")
                    if isinstance(software_value, bytes):
                        software_text = software_value.decode("utf-8", errors="ignore")
                    else:
                        software_text = str(software_value or "")
                    software_lower = software_text.lower()
                    if "celsys" in software_lower or "clip studio" in software_lower:
                        context_evidence_found.append("Edited/exported by CELSYS/Clip Studio Paint")

                    if not detected_tool:
                        for key, val in img.info.items():
                            if not isinstance(val, (str, bytes)):
                                continue
                            structured_res = self._detect_structured_generator_metadata(key, val)
                            if structured_res:
                                detected_tool, marker = structured_res
                                evidence_found.append(marker)
                                break

                    # Case A: A1111 / SD
                    if 'parameters' in img.info:
                        val = img.info['parameters']
                        if isinstance(val, str) and ("Steps:" in val or "Prompt" in val):
                            detected_tool = "Stable Diffusion (A1111)"
                            evidence_found.append("Stable Diffusion parameters chunk")
                            res = self._analyze_text(val)
                            if res: 
                                detected_tool, marker = res
                                evidence_found.append(f"Parameters: {marker}")

                    # Case B: ComfyUI (dedicated check)
                    if not detected_tool and ('workflow' in img.info or 'prompt' in img.info):
                        # Verify that the value is JSON
                        try:
                            if 'workflow' in img.info:
                                json.loads(img.info['workflow'])
                                evidence_found.append("Valid 'workflow' JSON")
                            if 'prompt' in img.info:
                                json.loads(img.info['prompt'])
                                evidence_found.append("Valid 'prompt' JSON")
                            detected_tool = "ComfyUI"
                        except json.JSONDecodeError:
                            pass  # not JSON; ignore

                    # Case C: Generic Scan (other info keys)
                    if not detected_tool:
                        for key, val in img.info.items():
                            if isinstance(val, (str, bytes)):
                                val_str = self._metadata_value_to_text(val)
                                res = self._analyze_text(val_str)
                                if res:
                                    detected_tool, marker = res
                                    evidence_found.append(f"PNG Info '{key}': {marker}")
                                    break

                    for key, val in img.info.items():
                        if not isinstance(val, (str, bytes)):
                            continue
                        val_str = val.decode("utf-8", errors="ignore") if isinstance(val, bytes) else str(val)
                        if "DigitalSourceType" in val_str and "trainedAlgorithmicMedia" in val_str:
                            marker = f"PNG Info '{key}': IPTC/XMP trainedAlgorithmicMedia"
                            if marker not in evidence_found:
                                evidence_found.append(marker)

                # --- 2. Check EXIF / XMP Data ---
                if not detected_tool:
                    exif = img.getexif()
                    if exif:
                        for tag_id, value in exif.items():
                            tag_name = ExifTags.TAGS.get(tag_id, str(tag_id))
                            
                            # Handle UserComment or other bytes
                            if isinstance(value, bytes):
                                try:
                                    value_str = value.decode('utf-8', errors='ignore')
                                except:
                                    continue
                            else:
                                value_str = str(value)

                            res = self._analyze_text(value_str)
                            if res:
                                detected_tool, marker = res
                                evidence_found.append(f"EXIF {tag_name}: {marker}")
                                break

        except Exception as e:
            return {"status": "error", "reason": f"读取错误: {str(e)}", "tool": "", "evidence": ""}

        # --- 3. Official C2PA manifest/signature verification ---
        c2pa_available = False
        c2pa_has_manifest = False
        c2pa_claim_generator = ""
        try:
            from tools.validator.c2pa_verifier import verify_c2pa_file

            c2pa_result = verify_c2pa_file(img_path)
            c2pa_available = c2pa_result.available
            if c2pa_result.has_manifest:
                c2pa_has_manifest = True
                c2pa_claim_generator = c2pa_result.claim_generator or ""
                evidence_found.append(c2pa_result.evidence_summary())
                if c2pa_result.ai_generated:
                    detected_tool = "Generative AI (C2PA Content Credentials)"
            elif c2pa_result.available and c2pa_result.error:
                evidence_found.append(f"C2PA verification error: {c2pa_result.error}")
            elif c2pa_result.available:
                evidence_found.append("No C2PA manifest found")
        except Exception as e:
            evidence_found.append(f"C2PA verification error: {e}")

        # --- 4. Raw metadata fallback: C2PA/JUMBF/XMP payloads ---
        if not detected_tool and not c2pa_has_manifest:
            res, has_c2pa_container = self._scan_raw_metadata(img_path)
            if res:
                detected_tool, marker = res
                prefix = "Raw metadata"
                if not c2pa_available and has_c2pa_container:
                    prefix = "Raw metadata (official C2PA verifier unavailable)"
                evidence_found.append(f"{prefix}: {marker}")
            elif has_c2pa_container:
                if c2pa_available:
                    evidence_found.append("C2PA Content Credentials found, no AI generation marker")
                else:
                    evidence_found.append("C2PA Content Credentials found, official verifier unavailable")

        # --- 5. Visible Google/Gemini watermark ---
        google_context = "google" in c2pa_claim_generator.lower()
        filename_lower = img_path.name.lower()
        filename_gemini_hint = "gemini" in filename_lower or "google" in filename_lower
        if not detected_tool and (google_context or filename_gemini_hint):
            if self._detect_google_visible_watermark(img_path):
                detected_tool = "Gemini (Google Visible Watermark)"
                evidence_found.append("Visible watermark: Google/Gemini sparkle mark")

        # --- 6. Final Fallback: Filename Check ---
        if not detected_tool:
            res = self._analyze_text(filename_lower, include_weak_markers=False)
            if res:
                weak_tool, marker = res
                if c2pa_has_manifest:
                    weak_evidence_found.append(f"Filename suggests {weak_tool}: {marker}")
                else:
                    weak_evidence_found.append(f"Filename suggests {weak_tool}: {marker}")

        if extension_mismatch and minimal_web_jpeg:
            context_evidence_found.append("Downloaded file appears re-encoded or metadata-stripped")

        # --- Result Construction ---
        if detected_tool:
            return {
                "status": "ai",
                "reason": f"检测到 {detected_tool} 元数据",
                "tool": detected_tool,
                "evidence": "; ".join(evidence_found + context_evidence_found)
            }
        else:
            if weak_evidence_found:
                return {
                    "status": "unknown",
                    "reason": "AI indicators found, but C2PA credentials do not declare AI generation",
                    "tool": "",
                    "evidence": "; ".join(evidence_found + context_evidence_found + weak_evidence_found)
                }
            reason = "C2PA Content Credentials do not declare AI generation" if c2pa_has_manifest else "No known AI generation metadata detected"
            return {
                "status": "human",
                "reason": reason,
                "tool": "",
                "evidence": "; ".join(evidence_found + context_evidence_found) if evidence_found or context_evidence_found else "No metadata signatures found"
            }
tools/validator/logic.py:MetaDataDetector.detect()

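Audit points (execution order)

  1. Open the Pillow handle and collect context evidence: format / suffix mismatch, minimal JPEG metadata, CELSYS export (none of these upgrades the status).
  2. PNG info: _detect_structured_generator_metadata() runs first over every key to identify InvokeAI / Fooocus / NovelAI / Stable Diffusion generation JSON; then parameters → A1111 (this check has no detected_tool guard, so it can overwrite an earlier match); then workflow / prompt JSON → ComfyUI; finally a generic _analyze_text() pass over the remaining values.
  3. EXIF: runs _analyze_text() on each tag value, only if no tool has been detected yet.
  4. Official C2PA SDK (verify_c2pa_file): when a manifest exists, its claim_generator and AI marker are captured.
  5. Raw byte fallback: runs only when no earlier step detected a tool and the SDK found no manifest. It reads the whole file (≤ 128 MB), extracts the metadata-bearing chunks, and searches them for c2pa / jumbf / contentauth container markers plus AI marker strings.
  6. Google visible watermark: runs only when no tool has been detected and either the claim_generator mentions Google or the filename contains gemini / google, to avoid a visual scan on every image.
  7. Filename weak hint: never triggers ai; at most it produces unknown.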

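Status convergence rules

| Evidence combination | status |
| --- | --- |
| detected_tool is set (any strong evidence hit) | ai |
| No strong evidence, but weak_evidence_found is non-empty (filename hint) | unknown |
| No evidence + C2PA manifest present but no AI declaration | human (reason: "C2PA Content Credentials do not declare AI generation") |
| No evidence + no manifest | human (reason: "No known AI generation metadata detected") |
| File does not exist, or Pillow raises an exception | error |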
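
The non-error rows of the table above reduce to a short decision function. This is a sketch under stated assumptions: `converge_status` is a hypothetical name, the real logic is inlined at the end of detect(), and the error row is omitted because detect() returns it earlier. The reason strings are copied from the source shown above.

```python
from typing import List, Optional, Tuple


def converge_status(
    detected_tool: Optional[str],
    weak_evidence: List[str],
    has_manifest: bool,
) -> Tuple[str, str]:
    """Return (status, reason) for the non-error cases (hypothetical helper)."""
    if detected_tool:
        return "ai", f"检测到 {detected_tool} 元数据"
    if weak_evidence:
        return "unknown", "AI indicators found, but C2PA credentials do not declare AI generation"
    if has_manifest:
        return "human", "C2PA Content Credentials do not declare AI generation"
    return "human", "No known AI generation metadata detected"


print(converge_status(None, ["Filename suggests Midjourney: midjourney"], False))
print(converge_status(None, [], True))
```

Note that the unknown reason mentions C2PA credentials even when no manifest exists, because the source uses the same string in both cases.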

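Note

"No metadata" and "C2PA manifest present but not declaring AI" both map to status human and can only be told apart through reason. The UI must read reason; otherwise an image whose metadata was stripped would be shown as "confirmed non-AI".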


5.2.3 Metadata parsing helpers

validator/logic.py · L110-L171
    def _decode_metadata_bytes(self, data: bytes) -> str:
        """Best-effort decoding for embedded XMP/C2PA text inside binary assets."""
        if not data:
            return ""
        text = data.decode("utf-8", errors="ignore")
        if len(text.strip()) < 8:
            text = data.decode("latin-1", errors="ignore")
        return text

    def _metadata_value_to_text(self, value: Any) -> str:
        if isinstance(value, bytes):
            return value.decode("utf-8", errors="ignore")
        return str(value)

    def _json_loads(self, value: str) -> Any:
        try:
            return json.loads(value)
        except Exception:
            return None

    def _json_has_keys(self, value: Any, keys: set[str]) -> bool:
        if isinstance(value, dict):
            lowered = {str(key).lower() for key in value.keys()}
            if lowered.intersection(keys):
                return True
            return any(self._json_has_keys(item, keys) for item in value.values())
        if isinstance(value, list):
            return any(self._json_has_keys(item, keys) for item in value)
        return False

    def _detect_structured_generator_metadata(self, key: str, value: Any) -> Optional[Tuple[str, str]]:
        text = self._metadata_value_to_text(value)
        lowered_key = key.lower()
        lowered_text = text.lower()
        parsed = self._json_loads(text)

        if "invoke" in lowered_key or "invokeai" in lowered_text or "invoke ai" in lowered_text:
            return "InvokeAI", f"PNG Info '{key}': InvokeAI metadata"

        if "fooocus" in lowered_key or "fooocus" in lowered_text:
            return "Fooocus", f"PNG Info '{key}': Fooocus metadata"

        if "novelai" in lowered_text or "nai-diffusion" in lowered_text:
            return "NovelAI", f"PNG Info '{key}': NovelAI metadata"

        if isinstance(parsed, dict):
            has_generation_keys = self._json_has_keys(
                parsed,
                {"sampler", "sampler_name", "steps", "scale", "cfg_scale", "seed", "model", "model_hash", "uc"},
            )
            has_novelai_shape = self._json_has_keys(parsed, {"uc"}) and self._json_has_keys(parsed, {"sampler", "steps", "scale"})
            if has_novelai_shape:
                return "NovelAI", f"PNG Info '{key}': NovelAI generation JSON"

            if "invoke" in lowered_key and has_generation_keys:
                return "InvokeAI", f"PNG Info '{key}': InvokeAI generation JSON"

            if lowered_key in {"sd-metadata", "sd_metadata", "generation_data", "generation_data_formatted"} and has_generation_keys:
                return "Stable Diffusion", f"PNG Info '{key}': Stable Diffusion generation JSON"

        return None
tools/validator/logic.py:MetaDataDetector helpers

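Audit points

  • NovelAI criterion: a JSON value containing uc together with at least one of sampler / steps / scale → NovelAI generation JSON. This shape-based criterion lets NovelAI output be recognized even after the explicit name has been stripped.
  • InvokeAI / SD JSON: the Stable Diffusion JSON branch requires both a known key name and generation fields, which keeps arbitrary JSON from being treated as generation metadata. For InvokeAI, the earlier string check already returns when the key contains invoke or the text contains invokeai / invoke ai, so the "key name + generation fields" JSON branch is never reached.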
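
The NovelAI shape criterion can be exercised in isolation. The sketch below re-implements the recursive key search as a standalone function (`json_has_keys` mirrors `_json_has_keys`; `has_novelai_shape` is a hypothetical wrapper around the two-part condition). The sample JSON values are invented for illustration, and the source additionally requires the parsed value to be a dict before applying the check.

```python
import json
from typing import Any, Set


def json_has_keys(value: Any, keys: Set[str]) -> bool:
    """Recursively check whether any dict inside value has a key (lowercased) in keys."""
    if isinstance(value, dict):
        if {str(k).lower() for k in value} & keys:
            return True
        return any(json_has_keys(v, keys) for v in value.values())
    if isinstance(value, list):
        return any(json_has_keys(v, keys) for v in value)
    return False


def has_novelai_shape(parsed: Any) -> bool:
    """uc must be present together with at least one of sampler / steps / scale."""
    return json_has_keys(parsed, {"uc"}) and json_has_keys(parsed, {"sampler", "steps", "scale"})


# Name stripped, shape intact: still recognized.
stripped = json.loads('{"uc": "lowres, bad anatomy", "steps": 28, "scale": 5.0}')
# Keys are matched case-insensitively and at any nesting depth.
nested = json.loads('{"meta": {"UC": "blurry"}, "params": [{"sampler": "k_euler"}]}')
# Generation fields without uc do not match the NovelAI shape.
generic = json.loads('{"steps": 20, "seed": 1}')

print(has_novelai_shape(stripped), has_novelai_shape(nested), has_novelai_shape(generic))
```

Because the two conditions are evaluated independently over the whole tree, uc and the generation field do not need to sit in the same object.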

5.2.4 Raw byte scan (C2PA/JUMBF fallback)

validator/logic.py · L172-L317
    def _analyze_c2pa_bytes(self, data: bytes) -> Optional[Tuple[str, str]]:
        """Detect AI signals embedded in C2PA/JUMBF Content Credentials payloads."""
        if not data:
            return None

        lowered = data.lower()
        has_c2pa_container = any(marker in lowered for marker in self.C2PA_CONTAINER_MARKERS)
        if not has_c2pa_container:
            return None

        for marker in self.C2PA_AI_MARKERS:
            if marker in lowered:
                return "Generative AI (C2PA Content Credentials)", marker.decode("ascii", errors="ignore")

        text = self._decode_metadata_bytes(data)
        res = self._analyze_text(text, include_weak_markers=False)
        if res:
            tool, marker = res
            return tool, f"C2PA payload: {marker}"

        return None

    # PNG ancillary chunks that may carry metadata text. Skip IDAT/PLTE/etc.
    PNG_METADATA_CHUNKS = {b"tEXt", b"iTXt", b"zTXt", b"eXIf", b"iCCP", b"caBX", b"jumb"}
    # WebP RIFF chunks that may carry metadata.
    WEBP_METADATA_CHUNKS = {b"EXIF", b"XMP ", b"ICCP", b"JUMB"}

    def _extract_png_metadata(self, data: bytes) -> bytes:
        out = bytearray()
        pos = 8  # skip signature
        n = len(data)
        iend_end = n
        while pos + 12 <= n:
            try:
                length = int.from_bytes(data[pos:pos + 4], "big")
                chunk_type = data[pos + 4:pos + 8]
            except Exception:
                break
            data_end = pos + 8 + length
            if length < 0 or data_end + 4 > n:
                break
            if chunk_type in self.PNG_METADATA_CHUNKS:
                out.extend(data[pos + 8:data_end])
                out.append(0)
            if chunk_type == b"IEND":
                iend_end = data_end + 4
                break
            pos = data_end + 4  # skip CRC
        # Some pipelines append C2PA/JUMBF payloads after IEND. Include any
        # trailing bytes verbatim — they cannot be pixel data.
        if iend_end < n:
            out.extend(data[iend_end:])
        return bytes(out)

    def _extract_jpeg_metadata(self, data: bytes) -> bytes:
        out = bytearray()
        pos = 2  # skip SOI (FFD8)
        n = len(data)
        while pos + 4 <= n:
            if data[pos] != 0xFF:
                break
            # Skip fill bytes
            while pos < n and data[pos] == 0xFF:
                pos += 1
            if pos >= n:
                break
            marker = data[pos]
            pos += 1
            # Standalone markers without length payload
            if marker == 0xD9:  # EOI
                break
            if marker == 0xDA:  # SOS — compressed image data starts here
                break
            if marker == 0x00 or marker == 0x01 or 0xD0 <= marker <= 0xD8:
                continue
            if pos + 2 > n:
                break
            seg_len = int.from_bytes(data[pos:pos + 2], "big")
            if seg_len < 2 or pos + seg_len > n:
                break
            seg_data = data[pos + 2:pos + seg_len]
            # APP0..APP15 (E0..EF) and COM (FE) carry text-style metadata
            if 0xE0 <= marker <= 0xEF or marker == 0xFE:
                out.extend(seg_data)
                out.append(0)
            pos += seg_len
        return bytes(out)

    def _extract_webp_metadata(self, data: bytes) -> bytes:
        out = bytearray()
        n = len(data)
        if n < 12:
            return b""
        pos = 12  # skip RIFF + size + WEBP
        while pos + 8 <= n:
            fourcc = data[pos:pos + 4]
            size = int.from_bytes(data[pos + 4:pos + 8], "little")
            if size < 0 or pos + 8 + size > n:
                break
            if fourcc in self.WEBP_METADATA_CHUNKS:
                out.extend(data[pos + 8:pos + 8 + size])
                out.append(0)
            pos += 8 + size + (size & 1)  # chunks are padded to even size
        return bytes(out)

    def _extract_metadata_blocks(self, data: bytes) -> bytes:
        """
        Return bytes from text/metadata-bearing containers only, skipping
        pixel/compressed payloads. This prevents random regex hits in raw
        IDAT / SOS data from being mistaken for AI markers.
        """
        if data.startswith(b"\x89PNG\r\n\x1a\n"):
            return self._extract_png_metadata(data)
        if data[:2] == b"\xff\xd8":
            return self._extract_jpeg_metadata(data)
        if data[:4] == b"RIFF" and data[8:12] == b"WEBP":
            return self._extract_webp_metadata(data)
        # Unknown format: keep legacy behavior (whole file).
        return data

    def _scan_raw_metadata(self, image_path: Path) -> Tuple[Optional[Tuple[str, str]], bool]:
        """
        Scan only metadata-bearing chunks/segments of an image, not pixel data.

        C2PA manifests live in PNG `caBX`/`jumb` chunks or JPEG APP11 segments.
        Pillow may not surface them through Image.info, so we parse the
        container ourselves and feed only those bytes to the text analyzer.
        """
        try:
            size = image_path.stat().st_size
            if size > self.RAW_METADATA_SCAN_LIMIT:
                return None, False
            data = image_path.read_bytes()
        except OSError:
            return None, False

        metadata_bytes = self._extract_metadata_blocks(data)
        lowered = metadata_bytes.lower()
        has_c2pa_container = any(marker in lowered for marker in self.C2PA_CONTAINER_MARKERS)
        c2pa_res = self._analyze_c2pa_bytes(metadata_bytes)
        if c2pa_res:
            return c2pa_res, has_c2pa_container

        text = self._decode_metadata_bytes(metadata_bytes)
        return self._analyze_text(text, include_weak_markers=False), has_c2pa_container
tools/validator/logic.py:_analyze_c2pa_bytes / metadata extractors / _scan_raw_metadata

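Audit points

  • Runs only when the official C2PA SDK finds no manifest and PNG info / EXIF produced no hit, which avoids a raw scan on every image.
  • The whole file is read into memory, but for PNG / JPEG / WebP only metadata-bearing chunks are scanned; unrecognized formats fall back to scanning the whole file. Cost scales with file size, and files > 128 MB are skipped outright.
  • Container found but no AI marker → has_c2pa_container=True; detect() attaches this to evidence as context and does not upgrade the status.
  • This path has no defense against maliciously forged C2PA strings; only the signature verification done by the official SDK (see §5.3) is a trusted path.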
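
To make the forgery point concrete, the sketch below builds a minimal PNG byte stream whose tEXt chunk contains both a container marker and an AI marker, then walks the chunks the way `_extract_png_metadata` does. `png_chunk` and `text_chunks` are hypothetical helpers, and the stream has no IDAT chunk, so Pillow would reject it before detect() reached the raw scan. The demonstration covers only the marker-matching step; a real forgery would append the same chunk to a valid image.

```python
import struct
import zlib

PNG_SIGNATURE = b"\x89PNG\r\n\x1a\n"


def png_chunk(chunk_type: bytes, payload: bytes) -> bytes:
    """Build one PNG chunk: length, type, payload, CRC32 over type + payload."""
    crc = zlib.crc32(chunk_type + payload) & 0xFFFFFFFF
    return struct.pack(">I", len(payload)) + chunk_type + payload + struct.pack(">I", crc)


def text_chunks(data: bytes) -> bytes:
    """Collect tEXt payloads by walking the chunk list (simplified extractor)."""
    out = bytearray()
    pos = len(PNG_SIGNATURE)
    while pos + 12 <= len(data):
        length = int.from_bytes(data[pos:pos + 4], "big")
        chunk_type = data[pos + 4:pos + 8]
        data_end = pos + 8 + length
        if data_end + 4 > len(data):
            break
        if chunk_type == b"tEXt":
            out.extend(data[pos + 8:data_end])
            out.append(0)
        if chunk_type == b"IEND":
            break
        pos = data_end + 4  # skip CRC
    return bytes(out)


# 1x1 grayscale header: width, height, bit depth, color type,
# compression, filter, interlace.
ihdr = struct.pack(">IIBBBBB", 1, 1, 8, 0, 0, 0, 0)
forged = (
    PNG_SIGNATURE
    + png_chunk(b"IHDR", ihdr)
    + png_chunk(b"tEXt", b"Comment\x00c2pa trainedAlgorithmicMedia")
    + png_chunk(b"IEND", b"")
)

lowered = text_chunks(forged).lower()
has_container = b"c2pa" in lowered
has_ai_marker = b"trainedalgorithmicmedia" in lowered
print(has_container, has_ai_marker)
```

No signature or manifest structure is involved: two plain substrings in a text chunk satisfy both conditions of `_analyze_c2pa_bytes`.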

5.2.5 Text rule analysis

validator/logic.py · L394-L443
    def _analyze_text(self, text: str, include_weak_markers: bool = True) -> Optional[Tuple[str, str]]:
        """
        Analyze a string for AI markers.
        Returns: (tool_name, found_marker) or None
        
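        Detection priority:
        1. Check explicit software signatures first (regex for stricter matching)
        2. Then check generation parameter fingerprints (for the IPTC marker, attribute to Gemini if Google evidence is present, otherwise generic)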
        """
        if not text:
            return None
        
        text_lower = text.lower()

        # 1. Check Explicit Software Names (in dict order, Midjourney first)
        weak_patterns = {r"mj_"}
        for tool, patterns in self.AI_SOFTWARE_SIGNATURES.items():
            for pattern in patterns:
                if not include_weak_markers and pattern.pattern in weak_patterns:
                    continue
                if pattern.search(text_lower):
                    return tool, pattern.pattern

        # 2. Check Generation Parameter Fingerprints
        match_count = 0
        evidence = []
        detected_tool = None
        weak_evidence_found = []
        for pattern, tool_name in self.GENERATION_PARAM_FINGERPRINTS:
            match = re.search(pattern, text, re.IGNORECASE)
            if match:
                if "IPTC/XMP Standard" in tool_name:
                    # Generic IPTC marker: check for Google evidence
                    if any(re.search(p, text_lower) for p in self.AI_SOFTWARE_SIGNATURES["Gemini (Google)"]):
                        return "Gemini (Google)", "IPTC/XMP with Google Evidence"
                    else:
                        detected_tool = "Generative AI (Unknown)"
                        evidence.append("IPTC/XMP Signature")
                else:
                    match_count += 1
                    evidence.append(pattern)
        
        if match_count >= 1:
            return "Stable Diffusion WebUI", "Generation Parameters Detected"

        if detected_tool:
            return detected_tool, "; ".join(evidence)

        return None
tools/validator/logic.py:MetaDataDetector._analyze_text

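Audit points

  • include_weak_markers=False is used for the filename scan and for the raw byte scan: the short prefix mj_ hits too easily by accident (for example mj_portrait.jpg), so it is suppressed in those contexts.
  • One parameter fingerprint is enough to report SD: a single Steps: or Sampler: match returns Stable Diffusion WebUI. This is permissive and can produce false positives on non-AI images where the user pasted SD parameters into a comment.
  • IPTC attribution branch: trainedAlgorithmicMedia combined with a Google keyword is attributed to Gemini; otherwise it is labeled Generative AI (Unknown). This branch never attributes to Midjourney / DALL-E. In practice, text containing a Gemini signature already returns from the software-signature pass in step 1, so the Google check inside this branch is not reached.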
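
The false-positive risk of the "one fingerprint is enough" rule can be shown with a reduced pattern list. In this sketch, `FINGERPRINTS` is a three-entry subset of GENERATION_PARAM_FINGERPRINTS, and the `threshold` parameter is an assumption added for comparison; the source hard-codes `match_count >= 1`.

```python
import re

# Reduced, illustrative subset of GENERATION_PARAM_FINGERPRINTS.
FINGERPRINTS = [r"Steps:\s*\d+", r"Sampler:\s*\w+", r"Seed:\s*\d+"]


def count_fingerprints(text: str) -> int:
    """Count how many distinct fingerprint patterns occur in text."""
    return sum(1 for pattern in FINGERPRINTS if re.search(pattern, text, re.IGNORECASE))


def flags_as_sd(text: str, threshold: int = 1) -> bool:
    """threshold=1 mirrors the source; higher values are a hypothetical alternative."""
    return count_fingerprints(text) >= threshold


real = "Steps: 30, Sampler: Euler a, CFG scale: 7, Seed: 12345"
note = "Painting log. Steps: 3 layers of glaze, then varnish"

print(count_fingerprints(real), count_fingerprints(note))
print(flags_as_sd(note), flags_as_sd(note, threshold=2))
```

Genuine A1111 parameter strings usually match several patterns at once, so a higher threshold would separate the two samples here, at the cost of missing images that carry only a single parameter.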

5.2.6 Visible Google/Gemini watermark

validator/logic.py · L318-L393
    def _detect_google_visible_watermark(self, image_path: Path) -> bool:
        """
        Detect the visible Google/Gemini sparkle mark often placed near the
        lower-right area of generated images. This is visual evidence, not a
        SynthID decoder.
        """
        try:
            with Image.open(image_path) as img:
                img = img.convert("RGB")
                scale = 512 / max(img.size)
                if scale < 1:
                    img = img.resize((round(img.width * scale), round(img.height * scale)))

                width, height = img.size
                pix = img.load()
                mask = set()
                for y in range(height // 2, height):
                    for x in range(width // 2, width):
                        r, g, b = pix[x, y]
                        saturation = max(r, g, b) - min(r, g, b)
                        luminance = (r * 299 + g * 587 + b * 114) // 1000
                        if 110 <= luminance <= 245 and saturation < 28:
                            if not (luminance > 235 and saturation < 8):
                                mask.add((x, y))

                seen = set()
                for pt in list(mask):
                    if pt in seen:
                        continue
                    stack = [pt]
                    seen.add(pt)
                    xs = []
                    ys = []
                    while stack:
                        x, y = stack.pop()
                        xs.append(x)
                        ys.append(y)
                        for nx in (x - 1, x, x + 1):
                            for ny in (y - 1, y, y + 1):
                                npt = (nx, ny)
                                if npt in mask and npt not in seen:
                                    seen.add(npt)
                                    stack.append(npt)

                    area = len(xs)
                    min_x, max_x = min(xs), max(xs)
                    min_y, max_y = min(ys), max(ys)
                    comp_w = max_x - min_x + 1
                    comp_h = max_y - min_y + 1
                    center_x = (min_x + max_x) / 2
                    center_y = (min_y + max_y) / 2
                    density = area / max(1, comp_w * comp_h)

                    if not (80 <= area <= 900):
                        continue
                    if not (14 <= comp_w <= 60 and 14 <= comp_h <= 80):
                        continue
                    if not (0.12 <= density <= 0.70):
                        continue
                    if center_x < width * 0.62 or center_y < height * 0.55:
                        continue

                    # A sparkle mark has a sparse center-heavy diamond shape.
                    mid_x = (min_x + max_x) / 2
                    mid_y = (min_y + max_y) / 2
                    near_center = sum(
                        1 for x, y in zip(xs, ys)
                        if abs(x - mid_x) <= comp_w * 0.25 and abs(y - mid_y) <= comp_h * 0.25
                    )
                    if near_center / area >= 0.18:
                        return True
        except Exception:
            return False

        return False
tools/validator/logic.py:MetaDataDetector._detect_google_visible_watermark

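Audit points

  • Not a SynthID decoder; the check is purely morphological: low saturation, medium-to-high luminance, lower-right quadrant, area of 80–900 px, bounding-box size limits (width 14–60 px, height 14–80 px), dense center
  • The long edge is downscaled to 512 px to normalize the thresholds; images whose long edge is already 512 px or less are not upscaled
  • Enabled only in a Google context (see step 5 of the flow in §5.2.2), which avoids running an O(W·H) scan over every image
  • Missed detections: white-background images, a cropped-out lower-right corner, recompression that fragments the sparkle's connected component
  • False positives: a low-saturation decorative element already present in the lower-right corner (moon, star, logo, etc.)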
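
The per-pixel condition that builds the candidate mask can be read in isolation. The sketch below restates it as a standalone function; the name `in_sparkle_mask` is hypothetical, while the thresholds are copied from the listing above. It shows which pixel values qualify, which helps when reasoning about why near-white regions contribute nothing to the mask.

```python
def in_sparkle_mask(r: int, g: int, b: int) -> bool:
    """Restate the per-pixel test used to build the candidate mask."""
    saturation = max(r, g, b) - min(r, g, b)
    luminance = (r * 299 + g * 587 + b * 114) // 1000
    # Keep only low-saturation pixels of medium-to-high luminance.
    if not (110 <= luminance <= 245 and saturation < 28):
        return False
    # Near-white, near-neutral pixels are excluded from the mask.
    return not (luminance > 235 and saturation < 8)
```

For example, a mid gray `(200, 200, 200)` passes, while `(240, 240, 240)` and pure white are rejected, as are saturated colors and dark pixels.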

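5.3 Official C2PA SDK adapter

tools/validator/c2pa_verifier.py isolates the optional c2pa-python dependency in a separate module. When the dependency is installed, the module reads the manifest through the official SDK and asks the SDK to validate the manifest and the signature chain.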

5.3.1 C2PAVerificationResult

validator/c2pa_verifier.py · L17-L87
AI_DIGITAL_SOURCE_MARKERS = {
    "trainedalgorithmicmedia",
    "compositewithtrainedalgorithmicmedia",
    "algorithmicmedia",
    "generated by ai",
    "ai generated",
    "adobe firefly",
    "google imagen",
    "synthid",
    "dall-e",
    "dalle",
    "midjourney",
    "stable diffusion",
}


@dataclass
class C2PAVerificationResult:
    available: bool
    has_manifest: bool = False
    verified: bool | None = None
    trusted: bool | None = None
    validation_state: str = ""
    validation_results: dict[str, Any] | None = None
    manifest_store: dict[str, Any] | None = None
    active_manifest: dict[str, Any] | None = None
    sdk_version: str = ""
    embedded: bool | None = None
    remote_url: str | None = None
    ai_markers: list[str] = field(default_factory=list)
    claim_generator: str = ""
    error: str = ""
    asset_format: str = ""
    extension_mismatch: bool = False
    validation_issues: list[str] = field(default_factory=list)

    @property
    def ai_generated(self) -> bool:
        return bool(self.ai_markers)

    def evidence_summary(self) -> str:
        if not self.has_manifest:
            if self.available:
                parts = ["No C2PA manifest found"]
                if self.asset_format:
                    parts.append(f"asset_format={self.asset_format}")
                if self.extension_mismatch:
                    parts.append("extension_mismatch=true")
                return "; ".join(parts)
            return self.error or "C2PA verifier unavailable"

        parts = ["C2PA manifest found"]
        if self.validation_state:
            parts.append(f"validation_state={self.validation_state}")
        if self.verified is not None:
            parts.append(f"signature_chain={'verified' if self.verified else 'failed'}")
        if self.trusted is not None:
            parts.append(f"trust={'trusted' if self.trusted else 'untrusted'}")
        if self.claim_generator:
            parts.append(f"claim_generator={self.claim_generator}")
        if self.ai_markers:
            parts.append("ai_markers=" + ",".join(self.ai_markers[:4]))
        if self.validation_issues:
            parts.append("validation_issues=" + ",".join(self.validation_issues[:4]))
        if self.remote_url:
            parts.append(f"remote_manifest={self.remote_url}")
        if self.sdk_version:
            parts.append(f"c2pa_sdk={self.sdk_version}")
        return "; ".join(parts)

tools/validator/c2pa_verifier.py:AI_DIGITAL_SOURCE_MARKERS / C2PAVerificationResult

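Audit points

  • verified / trusted are three-valued (bool | None): None means undetermined, not failed
  • ai_generated is a derived property: it is True whenever any string at any depth of the manifest store matches AI_DIGITAL_SOURCE_MARKERS; neither a valid signature nor trust is required
  • evidence_summary() is the UI summary; it lists at most 4 AI markers and 4 validation issues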
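
Because ai_generated ignores the signature and trust fields, a caller that wants to distinguish a bare claim from a cryptographically backed one has to combine the three values itself. The helper below is a hypothetical sketch of such a policy (the function name and the four labels are assumptions, not part of the audited module).

```python
from typing import Optional


def marker_backing(ai_markers: list[str],
                   verified: Optional[bool],
                   trusted: Optional[bool]) -> str:
    """Classify how strongly the AI markers are backed (hypothetical policy)."""
    if not ai_markers:
        return "no-markers"
    if verified is not True:
        # Covers both False (failed) and None (undetermined).
        return "unsigned-claim"
    if trusted is True:
        return "signed-trusted"
    # Signature verified, but trust is False or undetermined.
    return "signed-trust-unconfirmed"
```

Note that `verified is not True` treats None and False alike for this purpose only; the audited code keeps them distinct, and a UI may still want to word the two cases differently.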

5.3.2 verify_c2pa_file entry point and SDK configuration

validator/c2pa_verifier.py · L285-L388
def _read_json(reader: Any) -> dict[str, Any]:
    raw = reader.json()
    if isinstance(raw, bytes):
        raw = raw.decode("utf-8", errors="replace")
    return json.loads(raw)


def _make_context(c2pa_module: Any) -> Any:
    settings = {
        "verify": {
            "verify_after_reading": True,
            "verify_trust": True,
            "verify_timestamp_trust": True,
            "ocsp_fetch": True,
            "remote_manifest_fetch": True,
        },
        "trust": {
            "verify_trust_list": True,
        },
    }
    context_cls = getattr(c2pa_module, "Context", None)
    if context_cls and hasattr(context_cls, "from_dict"):
        return context_cls.from_dict(settings)
    if hasattr(c2pa_module, "load_settings"):
        c2pa_module.load_settings(settings)
    return None


def _detect_asset_format(path: Path) -> tuple[str, bool]:
    suffix = path.suffix.lower()
    try:
        header = path.read_bytes()[:16]
    except OSError:
        return "", False

    asset_format = ""
    if header.startswith(b"\x89PNG\r\n\x1a\n"):
        asset_format = "png"
    elif header.startswith(b"\xff\xd8\xff"):
        asset_format = "jpeg"
    elif header[:4] == b"RIFF" and header[8:12] == b"WEBP":
        asset_format = "webp"

    expected_suffixes = {
        "png": {".png"},
        "jpeg": {".jpg", ".jpeg", ".jpe"},
        "webp": {".webp"},
    }
    mismatch = bool(asset_format and suffix and suffix not in expected_suffixes.get(asset_format, set()))
    return asset_format, mismatch


def verify_c2pa_file(image_path: str | Path) -> C2PAVerificationResult:
    path = Path(image_path)

    try:
        import c2pa
    except Exception as exc:
        return C2PAVerificationResult(
            available=False,
            error=f"c2pa-python unavailable: {exc}",
        )

    result = C2PAVerificationResult(available=True)
    try:
        result.asset_format, result.extension_mismatch = _detect_asset_format(path)
        result.sdk_version = str(c2pa.sdk_version()) if hasattr(c2pa, "sdk_version") else ""
        context = _make_context(c2pa)
        if result.extension_mismatch and result.asset_format:
            with path.open("rb") as stream:
                try:
                    reader = c2pa.Reader.try_create(result.asset_format, stream, None, context)
                except TypeError:
                    reader = c2pa.Reader.try_create(result.asset_format, stream)
        else:
            try:
                reader = c2pa.Reader.try_create(str(path), None, None, context)
            except TypeError:
                reader = c2pa.Reader.try_create(str(path))

        if reader is None:
            return result

        with reader:
            result.has_manifest = True
            result.manifest_store = _read_json(reader)
            result.validation_state = str(reader.get_validation_state() or "")
            result.validation_results = reader.get_validation_results() or None
            result.active_manifest = reader.get_active_manifest() or None
            result.embedded = bool(reader.is_embedded())
            result.remote_url = reader.get_remote_url() or None

        result.verified, result.trusted = _verification_flags(
            result.validation_state,
            result.validation_results,
        )
        result.validation_issues = _collect_validation_issues(result.validation_results)
        result.ai_markers = _find_ai_markers(result.manifest_store or {})
        result.claim_generator = _extract_claim_generator(result.active_manifest)
        return result

    except Exception as exc:
        result.error = str(exc)
        return result
tools/validator/c2pa_verifier.py:_make_context / _detect_asset_format / verify_c2pa_file

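Audit points

  • SDK fallback on extension mismatch: if the file header says JPEG but the suffix is .png, the code switches to the stream interface Reader.try_create(format, stream, ...) and tells the SDK the real format. Passing the path directly would make the SDK infer the format from the suffix and fail.
  • SDK settings enabled: trust list, timestamp trust, OCSP revocation checking, and remote manifest fetching are all turned on
  • SDK version compatibility: both try_create signatures are attempted (with and without context) to support different c2pa-python versions
  • A None reader is not an error: has_manifest stays False, available=True, and the error field is empty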
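
The extension-mismatch case in the first audit point can be reproduced without the SDK. The sketch below is a compact restatement of the magic-byte check for illustration, not the audited function: the names `sniff_format` and `demo` are hypothetical, WebP is omitted for brevity, and it reads only the first 16 bytes instead of the whole file.

```python
import tempfile
from pathlib import Path

MAGIC = {
    "png": b"\x89PNG\r\n\x1a\n",
    "jpeg": b"\xff\xd8\xff",
}
SUFFIXES = {"png": {".png"}, "jpeg": {".jpg", ".jpeg", ".jpe"}}


def sniff_format(path: Path) -> tuple[str, bool]:
    """Return (format_from_header, suffix_disagrees_with_header)."""
    with path.open("rb") as handle:
        header = handle.read(16)
    suffix = path.suffix.lower()
    for name, magic in MAGIC.items():
        if header.startswith(magic):
            # An empty suffix is not counted as a mismatch.
            return name, bool(suffix) and suffix not in SUFFIXES[name]
    return "", False


def demo() -> tuple[str, bool]:
    """Write JPEG magic bytes into a file named .png and sniff it."""
    with tempfile.TemporaryDirectory() as tmp:
        disguised = Path(tmp) / "photo.png"
        disguised.write_bytes(b"\xff\xd8\xff\xe0" + b"\x00" * 12)
        return sniff_format(disguised)
```

Calling `demo()` yields the JPEG format with the mismatch flag set, which is the condition under which verify_c2pa_file switches to the stream-based reader.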

5.3.3 Separating signature chain from trust status

validator/c2pa_verifier.py · L141-L284
def _contains_failure(value: Any) -> bool:
    if isinstance(value, dict):
        for key, item in value.items():
            key_lower = str(key).lower()
            if key_lower in {"failure", "failures", "error", "errors"} and item:
                return True
            if _contains_failure(item):
                return True
    elif isinstance(value, list):
        for item in value:
            if _contains_failure(item):
                return True
    elif isinstance(value, str):
        lowered = value.lower()
        return any(token in lowered for token in ("invalid", "failure", "error", "untrusted"))
    return False


def _contains_trust_failure(value: Any) -> bool:
    if isinstance(value, dict):
        for key, item in value.items():
            key_lower = str(key).lower()
            if "trust" in key_lower and _contains_failure(item):
                return True
            if _contains_trust_failure(item):
                return True
    elif isinstance(value, list):
        for item in value:
            if _contains_trust_failure(item):
                return True
    elif isinstance(value, str):
        lowered = value.lower()
        return "trust" in lowered and any(token in lowered for token in ("invalid", "failure", "error", "untrusted"))
    return False


def _contains_non_trust_failure(value: Any) -> bool:
    if isinstance(value, dict):
        for key, item in value.items():
            key_lower = str(key).lower()
            if key_lower in {"failure", "failures", "error", "errors"} and item:
                if not _contains_only_trust_related(item):
                    return True
            if _contains_non_trust_failure(item):
                return True
    elif isinstance(value, list):
        for item in value:
            if _contains_non_trust_failure(item):
                return True
    elif isinstance(value, str):
        lowered = value.lower()
        if any(token in lowered for token in ("invalid", "failure", "error")):
            return "trust" not in lowered and "untrusted" not in lowered
    return False


def _contains_only_trust_related(value: Any) -> bool:
    strings = list(_iter_strings(value))
    if not strings:
        return False
    for text in strings:
        lowered = text.lower()
        if any(token in lowered for token in ("invalid", "failure", "error")):
            if "trust" not in lowered and "untrusted" not in lowered:
                return False
    return any("trust" in text.lower() or "untrusted" in text.lower() for text in strings)


def _contains_trust_signal(value: Any) -> bool:
    if isinstance(value, dict):
        for key, item in value.items():
            if "trust" in str(key).lower():
                return True
            if _contains_trust_signal(item):
                return True
    elif isinstance(value, list):
        for item in value:
            if _contains_trust_signal(item):
                return True
    elif isinstance(value, str):
        return "trust" in value.lower()
    return False


def _active_manifest_results(validation_results: dict[str, Any] | None) -> dict[str, Any] | None:
    if not isinstance(validation_results, dict):
        return None
    active = validation_results.get("activeManifest")
    return active if isinstance(active, dict) else None


def _has_validation_code(value: Any, code_fragment: str) -> bool:
    if isinstance(value, dict):
        code = value.get("code")
        if isinstance(code, str) and code_fragment in code:
            return True
        return any(_has_validation_code(item, code_fragment) for item in value.values())
    if isinstance(value, list):
        return any(_has_validation_code(item, code_fragment) for item in value)
    return False


def _collect_validation_issues(validation_results: dict[str, Any] | None) -> list[str]:
    issues: list[str] = []
    if not validation_results:
        return issues
    if _has_validation_code(validation_results, "ingredient.malformed"):
        issues.append("ingredient_malformed")
    if _has_validation_code(validation_results, "timeStamp.untrusted"):
        issues.append("timestamp_untrusted")
    return issues


def _verification_flags(validation_state: str, validation_results: dict[str, Any] | None) -> tuple[bool | None, bool | None]:
    state = (validation_state or "").lower()
    active_results = _active_manifest_results(validation_results) or validation_results
    has_failure = _contains_failure(active_results) if active_results else False
    has_non_trust_failure = _contains_non_trust_failure(active_results) if active_results else False
    has_trust_failure = _contains_trust_failure(active_results) if active_results else False
    has_trust_signal = _contains_trust_signal(active_results) if active_results else False
    active_signature_valid = _has_validation_code(active_results, "claimSignature.validated")
    active_data_hash_valid = _has_validation_code(active_results, "assertion.dataHash.match")

    verified: bool | None
    if active_signature_valid and active_data_hash_valid and not has_non_trust_failure:
        verified = True
    elif "valid" in state and "invalid" not in state and not has_non_trust_failure:
        verified = True
    elif "invalid" in state or (has_failure and has_non_trust_failure):
        verified = False
    else:
        verified = None

    trusted: bool | None
    if has_trust_failure:
        trusted = False
    elif verified is True and has_trust_signal:
        trusted = True
    else:
        trusted = None

    return verified, trusted

tools/validator/c2pa_verifier.py: failure / trust / verification helpers

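Audit points

  • Active manifest first: _active_manifest_results() first takes validation_results["activeManifest"] and falls back to the full tree only when that is unavailable
  • "Signature valid + data hash valid" together with "no non-trust failure" gives verified=True. Even if the validation_state field itself contains "invalid" (possibly caused by an ingredient-chain problem), it does not downgrade the signature verdict for the active manifest
  • Trust is evaluated independently: a trust failure does not set verified to False. The UI can therefore receive a combination such as signature_chain=verified; trust=untrusted, meaning the signature itself verifies but the signing certificate is not in the current trust list (common when Google or OpenAI certificates are not yet included in the default trust anchors)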
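
The branch order in _verification_flags is easier to audit when the tree-walking helpers are replaced by plain booleans. The sketch below does exactly that; the function name `decide` and its flat parameter list are assumptions made for illustration, while the branch logic follows the listing above.

```python
from typing import Optional


def decide(state: str, sig_ok: bool, hash_ok: bool,
           non_trust_failure: bool, any_failure: bool,
           trust_failure: bool, trust_signal: bool
           ) -> tuple[Optional[bool], Optional[bool]]:
    """Return (verified, trusted) from already-computed boolean inputs."""
    state = state.lower()
    if sig_ok and hash_ok and not non_trust_failure:
        verified = True   # active-manifest codes win over the state string
    elif "valid" in state and "invalid" not in state and not non_trust_failure:
        verified = True
    elif "invalid" in state or (any_failure and non_trust_failure):
        verified = False
    else:
        verified = None   # undetermined, not failed

    if trust_failure:
        trusted = False   # independent of the verified outcome
    elif verified is True and trust_signal:
        trusted = True
    else:
        trusted = None
    return verified, trusted
```

For instance, `decide("Invalid", True, True, False, True, True, True)` returns `(True, False)`: the signature verdict stays positive despite the Invalid state, and trust is reported separately as failed.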

5.3.4 Extracting AI markers and claim_generator

validator/c2pa_verifier.py · L88-L140
def _iter_strings(value: Any) -> Iterable[str]:
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for key, item in value.items():
            yield str(key)
            yield from _iter_strings(item)
    elif isinstance(value, list):
        for item in value:
            yield from _iter_strings(item)


def _find_ai_markers(manifest_store: dict[str, Any]) -> list[str]:
    found: list[str] = []
    seen = set()
    for text in _iter_strings(manifest_store):
        lowered = text.lower()
        for marker in AI_DIGITAL_SOURCE_MARKERS:
            if marker in lowered and marker not in seen:
                seen.add(marker)
                found.append(marker)
    return found


def _extract_claim_generator(active_manifest: dict[str, Any] | None) -> str:
    if not active_manifest:
        return ""

    claim_generator = active_manifest.get("claim_generator")
    if isinstance(claim_generator, str):
        return claim_generator
    if isinstance(claim_generator, dict):
        name = claim_generator.get("name") or claim_generator.get("identifier")
        version = claim_generator.get("version")
        if name and version:
            return f"{name} {version}"
        if name:
            return str(name)

    infos = active_manifest.get("claim_generator_info")
    if isinstance(infos, list) and infos:
        first = infos[0]
        if isinstance(first, dict):
            name = first.get("name") or first.get("identifier")
            version = first.get("version")
            if name and version:
                return f"{name} {version}"
            if name:
                return str(name)

    return ""

tools/validator/c2pa_verifier.py:_iter_strings / _find_ai_markers / _extract_claim_generator

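Audit points

  • Recursively traverses every string in the manifest store (including keys); matching is case-insensitive
  • C2PA / IPTC terms such as trainedalgorithmicmedia / algorithmicmedia / synthid are treated as strong evidence
  • Invisible SynthID watermarks are not decoded: synthid appearing in the manifest store means the C2PA claim states "the image contains SynthID"; it does not mean SynthID was decoded locally
  • claim_generator supports three shapes: a string, a dict ({name, version} / {identifier, version}), and the claim_generator_info array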
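
A small worked example makes two properties of the marker scan visible: dictionary keys are matched as well as values, and substring matching means one string can yield several markers. The manifest fragment below is hypothetical and its field layout is illustrative only; the marker set is a three-item subset of AI_DIGITAL_SOURCE_MARKERS, and the result is sorted here for determinism, whereas the audited function keeps first-seen order.

```python
from typing import Any, Iterable

MARKERS = {"trainedalgorithmicmedia", "algorithmicmedia", "synthid"}


def iter_strings(value: Any) -> Iterable[str]:
    """Yield every string in a nested dict/list structure, keys included."""
    if isinstance(value, str):
        yield value
    elif isinstance(value, dict):
        for key, item in value.items():
            yield str(key)
            yield from iter_strings(item)
    elif isinstance(value, list):
        for item in value:
            yield from iter_strings(item)


def find_markers(store: dict[str, Any]) -> list[str]:
    """Collect markers by case-insensitive substring match."""
    found = set()
    for text in iter_strings(store):
        lowered = text.lower()
        found.update(marker for marker in MARKERS if marker in lowered)
    return sorted(found)


# Hypothetical manifest fragment, for illustration only.
sample = {
    "assertions": [
        {"data": {"digitalSourceType":
                  "http://cv.iptc.org/newscodes/digitalsourcetype/trainedAlgorithmicMedia"}}
    ],
    "SynthID": {"present": True},
}
```

Here `find_markers(sample)` reports all three markers: the single IPTC URI matches both trainedalgorithmicmedia and its substring algorithmicmedia, and synthid is found only because it appears as a key.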

5.3.5 Validation issues

validator/c2pa_verifier.py · L232-L253
def _has_validation_code(value: Any, code_fragment: str) -> bool:
    if isinstance(value, dict):
        code = value.get("code")
        if isinstance(code, str) and code_fragment in code:
            return True
        return any(_has_validation_code(item, code_fragment) for item in value.values())
    if isinstance(value, list):
        return any(_has_validation_code(item, code_fragment) for item in value)
    return False


def _collect_validation_issues(validation_results: dict[str, Any] | None) -> list[str]:
    issues: list[str] = []
    if not validation_results:
        return issues
    if _has_validation_code(validation_results, "ingredient.malformed"):
        issues.append("ingredient_malformed")
    if _has_validation_code(validation_results, "timeStamp.untrusted"):
        issues.append("timestamp_untrusted")
    return issues

tools/validator/c2pa_verifier.py:_has_validation_code / _collect_validation_issues

Typical evidence summary output:

text
C2PA manifest found;
validation_state=Invalid;
signature_chain=verified;
trust=untrusted;
ai_markers=algorithmicmedia,trainedalgorithmicmedia,synthid;
validation_issues=ingredient_malformed,timestamp_untrusted

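Audit points

  • validation_state=Invalid does not necessarily mean the image data has been tampered with. The active manifest's signature and data hash can be valid while the ingredient chain has problems
  • The UI should present this as "signature chain valid / certificate not in the current trust list / chain issues present", not as "signature failed"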
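
One way to honor the wording recommendation above is to derive the display labels from the structured fields instead of from validation_state. The following is a minimal sketch under that assumption; `ui_labels` and the exact label strings are hypothetical and not part of the audited code.

```python
from typing import Optional


def ui_labels(verified: Optional[bool], trusted: Optional[bool],
              issues: list[str]) -> list[str]:
    """Build separate display labels for signature, trust, and chain issues."""
    labels = []
    if verified is True:
        labels.append("signature chain valid")
    elif verified is False:
        labels.append("signature chain failed")
    else:
        labels.append("signature chain undetermined")

    if trusted is False:
        labels.append("certificate not in current trust list")
    elif trusted is True:
        labels.append("certificate trusted")
    # trusted is None: say nothing rather than imply a failure.

    if issues:
        labels.append("chain issues: " + ", ".join(issues))
    return labels
```

For the typical summary shown earlier, `ui_labels(True, False, ["ingredient_malformed", "timestamp_untrusted"])` produces three separate labels and never the phrase "signature failed".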

5.4 Worker layer

workers/ai_detector_worker.py · L1-L63
"""AI Metadata Detection Worker"""
import logging
from pathlib import Path
from PySide6.QtCore import QThread, Signal, QCoreApplication

_tr = QCoreApplication.translate

logger = logging.getLogger(__name__)


class AIDetectorWorker(QThread):
    """Worker thread for AI metadata detection."""

    progress = Signal(int, int, str)  # (current, total, filename)
    item_finished = Signal(str, str, str, str, str)  # (path, status, reason, tool, evidence)
    all_finished = Signal()
    model_status = Signal(str)  # Status for DynamicIsland

    def __init__(self, file_paths: list):
        super().__init__()
        self.file_paths = file_paths

    def run(self):
        """Execute metadata detection in background thread."""
        try:
            from .._utils import ensure_src_path
            ensure_src_path()
            
            from tools.validator.logic import MetaDataDetector
            detector = MetaDataDetector()

            self.model_status.emit(_tr("AIDetectorWorker", "扫描元数据..."))

            total = len(self.file_paths)
            logger.info("开始检测 %d 个文件", total)

            for i, path in enumerate(self.file_paths):
                if self.isInterruptionRequested():
                    break

                filename = Path(path).name
                self.progress.emit(i + 1, total, filename)

                try:
                    res = detector.detect(path)
                    self.item_finished.emit(
                        path,
                        res["status"],
                        res["reason"],
                        res["tool"] or "",
                        res["evidence"] or ""
                    )
                except Exception as e:
                    logger.error("[MetaDetector] 检测文件出错 %s: %s", path, e)
                    self.item_finished.emit(path, "error", _tr("AIDetectorWorker", "检测出错: %s") % str(e), "", "")

            logger.info("[MetaDetector] 检测完成,共 %d 个文件", total)

        except Exception as e:
            logger.error("[MetaDetector] Worker error: %s", e, exc_info=True)
        finally:
            self.all_finished.emit()
core/workers/ai_detector_worker.py (full file)

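Audit points

  • Per-file error isolation: an exception on a single file is caught and converted into a status="error" signal, without affecting subsequent files
  • Interruptible: isInterruptionRequested() lets the UI cancel a batch job
  • Detector instance reuse: the whole batch shares one MetaDataDetector, but the class itself is stateless (__init__ is empty), so there is no risk of cross-file contamination
  • Signal payload: item_finished is emitted serially, once per image, which avoids a backlog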
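
The isolation and cancellation behavior can be exercised without Qt. The sketch below is a plain-Python analog, not the worker itself: `run_batch` and `fake_detect` are hypothetical names, and a `threading.Event` stands in for isInterruptionRequested().

```python
import threading
from typing import Callable, Iterable


def run_batch(paths: Iterable[str],
              detect: Callable[[str], dict],
              cancel: threading.Event) -> list[tuple[str, str]]:
    """Process files one by one; a failure on one file never stops the batch."""
    results = []
    for path in paths:
        if cancel.is_set():          # checked once per file, like the worker
            break
        try:
            results.append((path, detect(path)["status"]))
        except Exception:
            results.append((path, "error"))
    return results


def fake_detect(path: str) -> dict:
    """Stand-in detector that fails on one specific file."""
    if path == "broken.png":
        raise ValueError("unreadable")
    return {"status": "human"}
```

In the real worker the UI side would request cancellation through QThread.requestInterruption(); as in this sketch, the request takes effect only between files, so a file already being analyzed runs to completion.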

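5.5 Evidence grading table

Evidence | Level | Triggers ai?
C2PA ai_markers (trainedAlgorithmicMedia / synthid / ...) |  | 
ComfyUI workflow / prompt JSON |  | 
Stable Diffusion parameters chunk |  | 
NovelAI generation JSON (uc + sampler/steps/scale) |  | 
InvokeAI / Fooocus metadata |  | 
Midjourney Job ID / --ar / --v / --stylize |  | 
IPTC/XMP trainedAlgorithmicMedia |  | 
Google visible sparkle watermark (in a Google context) |  | 
Filename contains a platform keyword |  | No (at most unknown)
Extension mismatch | Context | 
Minimal JPEG metadata | Context | 
CELSYS / Clip Studio export marker | Context | 
No metadata at all | No evidence | 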

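5.6 Test coverage

tests/test_validator_c2pa.py currently covers:

  • C2PA raw payload AI marker
  • C2PA manifest without an AI marker does not produce a false positive
  • Detection driven by a mocked official C2PA verifier
  • Google C2PA without an AI marker suppresses the raw false positive
  • Google / Gemini visible-watermark context detection
  • Midjourney Job ID and IPTC AI source
  • Gemini in the filename alone yields only unknown
  • Platform re-encoding / JPEG with a .png suffix does not escalate to AI
  • A1111 Stable Diffusion parameters
  • NovelAI generation JSON
  • InvokeAI metadata
  • Fooocus metadata
  • Separate interpretation when the active C2PA signature is valid but the ingredient chain has an issue

We recommend continually adding real samples to the regression set: images downloaded from OpenAI, Gemini, Midjourney, ComfyUI, A1111, Forge, Fooocus, NovelAI, InvokeAI, Adobe Firefly, Tusi / Liblib / TensorArt, and similar platforms.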


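6. Threat model overview

6.1 Digital evidence preservation

Threat | Mitigation | Residual risk
User tampers with the original file | Merkle Tree root-hash verification | None (tampering is always detected)
TSA private key leak | Multi-provider failover | A leak at a single TSA does not affect verification of historical records
Local JSON modified | Optional AES-256 encryption | Modifiable when unencrypted, but file-hash verification still exposes the change
Merkle second-preimage | No domain-separation prefix | Does not withstand adversarially constructed collisions
Author identity forgery | User self-declaration | author_name has no third-party verification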
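
The Merkle second-preimage row records that the current tree has no domain-separation prefix. A common hardening, used for example in RFC 6962 (Certificate Transparency), is to prefix leaf hashes with 0x00 and interior-node hashes with 0x01 so that a leaf can never be reinterpreted as an interior node. The following is a minimal sketch of that idea, not the code in tools/rights/utils.py; the function names and the odd-node promotion rule are assumptions made for brevity.

```python
import hashlib


def leaf_hash(data: bytes) -> bytes:
    """Hash a leaf with the 0x00 domain prefix."""
    return hashlib.sha256(b"\x00" + data).digest()


def node_hash(left: bytes, right: bytes) -> bytes:
    """Hash an interior node with the 0x01 domain prefix."""
    return hashlib.sha256(b"\x01" + left + right).digest()


def merkle_root(leaves: list[bytes]) -> bytes:
    """Compute a root over the leaves; an odd node is promoted unchanged."""
    if not leaves:
        raise ValueError("at least one leaf is required")
    level = [leaf_hash(leaf) for leaf in leaves]
    while len(level) > 1:
        nxt = []
        for i in range(0, len(level) - 1, 2):
            nxt.append(node_hash(level[i], level[i + 1]))
        if len(level) % 2:
            nxt.append(level[-1])  # simplification; RFC 6962 splits differently
        level = nxt
    return level[0]
```

With these prefixes, a forged single leaf whose content is the concatenation of two child hashes no longer produces the same root as the honest two-leaf tree, which is the construction an unprefixed tree is exposed to.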

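6.2 Rights-enforcement evidence collection

Threat | Mitigation | Residual risk
Target page deleted | Immediate capture + RFC 3161 | No remedy if the page was deleted before capture
Locally forged web page | TLS certificate capture | Only the domain certificate is verified, not the authenticity of the content
Screenshot edited in Photoshop | manifest SHA-256 | The screenshot alone cannot prove it was "not Photoshopped"
Browser identified as a bot | stealth + visible fallback | Some platforms may still block it
HAR / certificate missing | Multi-source collection | A single failure does not invalidate the whole package
Very long page | Viewport screenshot | full_page=False; content below the viewport is not captured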

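6.3 Invisible watermark

Threat | Mitigation | Residual risk
Hard-coded passwords | Compile-time constants | All user instances share the same password pair; once recovered by reverse engineering, watermarks can be extracted in bulk
Unauthenticated watermark | Dual-password scheme | Cannot prove "I embedded this watermark", only that "the image contains this text"
Forged watermark | Password secrecy | Anyone who knows the passwords can embed arbitrary text and claim it came from Nephele
Watermark removal | Quantized embedding (d1=36) | Heavy compression, rotation, or large crops (> 50%) can destroy it
Silent truncation | Fixed-length 32-byte encoding | Over-long text is truncated silently; the user may believe it was embedded in full
Output despite failed verification | round-trip check | On mismatch only a warning is logged; the image is still written

Note

In Nephele the invisible watermark is positioned as an auxiliary tracing tool, not a digital signature in the cryptographic sense. Its core value is "raising the cost for an image thief to remove the watermark", not "providing an unforgeable proof of ownership". For legal-grade proof of rights, use the digital evidence preservation feature.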
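
The silent-truncation row suggests a pre-flight check that tells the caller when text will not fit. The sketch below is one hedged way to do that; `fit_payload` is a hypothetical helper, and UTF-8 is an assumed encoding that this section does not confirm.

```python
MAX_WATERMARK_BYTES = 32  # fixed payload length stated in the table above


def fit_payload(text: str, limit: int = MAX_WATERMARK_BYTES) -> tuple[bytes, bool]:
    """Return (payload, was_truncated) for an assumed UTF-8 encoding."""
    raw = text.encode("utf-8")
    if len(raw) <= limit:
        return raw, False
    # Trim on a character boundary so a multi-byte character is not split.
    trimmed = raw[:limit].decode("utf-8", errors="ignore").encode("utf-8")
    return trimmed, True
```

A caller could surface the `was_truncated` flag as a UI warning instead of embedding quietly. Note that with multi-byte text the usable length can be shorter than 32 bytes, because a partially cut character is dropped entirely.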

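6.4 AI metadata detection

Threat / scenario | Result
Original ComfyUI PNG | workflow / prompt detectable
Original A1111 PNG | parameters detectable
OpenAI / Google C2PA image | manifest, AI markers, signature chain, and trust status are readable
Midjourney with Job ID / XMP retained | Strong evidence detectable
Platform re-encoded image (downloaded from Weibo / Twitter / Xiaohongshu) | Can only report insufficient credentials; status=human (distinguished by reason)
Screenshot | Original metadata is usually lost; cannot be determined
Malicious metadata stripping | Deleted evidence cannot be recovered
Maliciously forged non-C2PA text metadata | No cryptographic authenticity guarantee; false positives possible
Only looks AI-generated visually | Not judged
Non-AI image where the user pasted SD parameters into a comment | May be misreported as ai
Non-AI image with a low-saturation decoration in the lower-right corner (in a Google context) | May trigger a sparkle false positive

Note

The audit conclusion for this feature: it is suitable as a screening tool for AI-generation credentials and metadata, and should not be marketed as a general-purpose AI image authenticity detector. "Not detected" does not mean "not AI-generated".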


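7. Dependency list and degradation behavior

Dependency | Purpose | Behavior when missing
rfc3161ng | TSA communication | TSA completely unavailable; forced downgrade to a local .json
asn1crypto | TSR parsing | Falls back to the local clock and provider_name
pyzipper | .nep AES-256 | Password has no effect; standard ZIP
Pillow | Thumbnails / image I/O / metadata reading | Evidence-preservation flow blocked / packing blocked / AI detection blocked
reportlab | PDF report | Evidence-preservation flow blocked
qrcode | PDF QR code | Falls back to a plain-text URL
playwright | Browser evidence collection | Feature completely unavailable
blind_watermark | Invisible watermark DWT embedding / extraction | Invisible watermark completely unavailable; the original image is returned
numpy | Invisible watermark bit-array conversion / image processing | Invisible watermark completely unavailable
pywt | Wavelet transform (transitive dependency of blind_watermark) | Invisible watermark completely unavailable
c2pa-python | Official C2PA SDK | Falls back to byte-level scanning (no signature verification); evidence notes "official C2PA verifier unavailable"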
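
For audit purposes it can be useful to see at a glance which degradation paths a given installation will take. The sketch below probes for the modules in the table without importing them; `probe_dependencies` is a hypothetical helper, and the mapping from distribution name to import name (for example Pillow to PIL, c2pa-python to c2pa) is stated here as an assumption about each package's top-level module.

```python
from importlib.util import find_spec

# Distribution name -> importable top-level module name (assumed mapping).
OPTIONAL_MODULES = {
    "rfc3161ng": "rfc3161ng",
    "asn1crypto": "asn1crypto",
    "pyzipper": "pyzipper",
    "Pillow": "PIL",
    "reportlab": "reportlab",
    "qrcode": "qrcode",
    "playwright": "playwright",
    "blind_watermark": "blind_watermark",
    "numpy": "numpy",
    "pywt": "pywt",
    "c2pa-python": "c2pa",
}


def probe_dependencies(modules: dict[str, str] = OPTIONAL_MODULES) -> dict[str, bool]:
    """Report which modules can be located, without importing them."""
    return {name: find_spec(module) is not None for name, module in modules.items()}
```

Locating a module does not guarantee that importing it succeeds (a native extension may still fail to load), which is why verify_c2pa_file wraps its import in a broad except. The probe is a diagnostic aid, not a replacement for that guard.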

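8. Privacy and network behavior

8.1 Default network paths

Feature | Network action | Can it be disabled?
Digital evidence preservation (TSA) | Sends a SHA-256 digest request to DigiCert / FreeTSA / IdenTrust | Can switch back to a purely local timestamp (degraded)
Rights-enforcement evidence collection | HTTPS requests, TLS handshake, and DNS resolution against the target site via Playwright | No (the feature is inherently online evidence collection)
AI metadata detection (C2PA SDK) | ocsp_fetch=True + remote_manifest_fetch=True | Currently hard-coded on through SDK settings; no UI switch
Invisible watermark |  | 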

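8.2 Offline behavior notes

  • Digital evidence preservation: only the hash value is sent to the TSA; the original file content is never sent.

  • Rights-enforcement evidence collection: by design it issues full requests to the target URL; that is the evidence collection itself.

  • AI metadata detection: the regular PNG info / EXIF / byte scan runs locally. While validating the signature chain, the C2PA SDK may:

    • Fetch a remote manifest (remote_manifest_fetch)
    • Check certificate revocation (ocsp_fetch)
    • Verify timestamp trust (verify_timestamp_trust)

    Product copy should therefore not claim in blanket terms that "C2PA verification never goes online". If users need a strict offline mode, a switch that disables remote manifest / OCSP fetching should be provided (not yet implemented).

  • Invisible watermark: fully local; neither embedding nor extraction touches the network.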
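
The strict offline mode mentioned above is not implemented, but its shape can be sketched from the settings dict that _make_context already builds. The function below is hypothetical: `build_verify_settings` and its `offline` parameter do not exist in the codebase, and whether the SDK performs no network access at all with these two keys disabled is an assumption that would need to be confirmed against the c2pa-python documentation.

```python
def build_verify_settings(offline: bool = False) -> dict:
    """Build the settings dict; offline=True turns off the two fetch options."""
    online = not offline
    return {
        "verify": {
            "verify_after_reading": True,
            "verify_trust": True,
            "verify_timestamp_trust": True,
            "ocsp_fetch": online,              # revocation check needs network
            "remote_manifest_fetch": online,   # remote manifests need network
        },
        "trust": {
            "verify_trust_list": True,
        },
    }
```

With `offline=True`, revocation status and remotely hosted manifests would simply be unavailable, so a UI built on this would need to report those checks as skipped instead of passed.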