2025年1月24日金曜日

win ファイル読み込み Python

 winファイルの振動データをPythonに取り込む必要に迫られました。

もともとは 既存の GUI プログラムでテキストに変換していたのですが、これだと CSV で保存してから Python で読みに行くといった無駄な時間が発生します。
C の共有メモリに Python でアクセスする方法が最も速いのですが、これは私には荷が重い。

ということで、刻々書き出される win ファイルを Python で読むことにしました。

といっても、今まで win format を読んだことがありません。確認すると、これがフクザツ。入れ子状態で説明が書かれています。

ということで、GPT-o1 の出番。
win format の解説と wdisk のソースをすべて読ませてから、読み込みスクリプトを作ってもらいました。

フムフム、ナルホド、コウイウコトダッタノカ。

import pandas as pd
from datetime import datetime, timedelta

def bcd_byte_to_int(b: int) -> int:
    """1 バイトBCDを整数に変換 (0x23 -> 23など)"""
    hi = (b & 0xF0) >> 4
    lo = (b & 0x0F)
    return hi * 10 + lo

def decode_bcd_time(bcd6: bytes) -> datetime:
    """
    秒ヘッダー6バイト (BCD: YY, MM, DD, HH, mm, SS) を datetime に変換。
    ここでは 0~49 => 2000~2049, 50~99 => 1950~1999 の例。
    """
    if len(bcd6) < 6:
        return None

    year  = bcd_byte_to_int(bcd6[0])  # 0 ~ 99
    month = bcd_byte_to_int(bcd6[1])  # 1 ~ 12
    day   = bcd_byte_to_int(bcd6[2])  # 1 ~ 31
    hour  = bcd_byte_to_int(bcd6[3])  # 0 ~ 23
    minu  = bcd_byte_to_int(bcd6[4])  # 0 ~ 59
    sec   = bcd_byte_to_int(bcd6[5])  # 0 ~ 59

    # 年の補正 (例: 年<50 => 2000年代, それ以外 => 1900年代)
    if year < 50:
        year_full = 2000 + year
    else:
        year_full = 1900 + year

    try:
        return datetime(year_full, month, day, hour, minu, sec)
    except ValueError:
        # 不正な日時など
        return None

def read_4bit_2s_comp_no_cross(data: bytes, bit_offset: int) -> int:
    """
    data バッファの bit_offset ビット目から 4 ビットを取り出し、
    2 の補数(4bit)として返す (バイト境界は跨がない前提)。
     - 0x0~0x7 => 0~7,  0x8~0xF => -8 ~ -1
    """
    byte_index = bit_offset // 8
    bit_in_byte = bit_offset % 8  # 0~7

    val = data[byte_index] if byte_index < len(data) else 0
    shift_amount = (8 - 4) - bit_in_byte
    nibble = (val >> shift_amount) & 0x0F

    if nibble >= 8:
        nibble -= 16  # => -8 ~ -1
    return nibble


def parse_channel_block(data: bytes, offset: int):
    """
    1つのチャネルブロックをパースして情報を返す。
    戻り値: (消費バイト数, dict(channel=<ch_num>,
                                 sample_size_info=<info>,
                                 sampling_rate=<rate>,
                                 samples=[<int値>, ...]))
    """
    if offset + 4 > len(data):
        return (0, None)
    chan_header = data[offset : offset+4]
    
    # チャネル番号
    ch_num_raw = int.from_bytes(chan_header[0:2], 'big', signed=False)
    ch_num = f"{ch_num_raw:04X}"  # 90EA のような16進大文字表記
    # サンプリングレート等
    last2 = int.from_bytes(chan_header[2:4], 'big', signed=False)
    sample_size_info = (last2 >> 12) & 0x0F
    sampling_rate    = last2 & 0x0FFF
    
    pos = offset + 4
    
    # 先頭サンプル 4バイト
    if pos + 4 > len(data):
        return (4, {
            'channel': ch_num,
            'sample_size_info': sample_size_info,
            'sampling_rate': sampling_rate,
            'samples': [],
        })
    first_sample_bytes = data[pos : pos+4]
    pos += 4
    
    current_value = int.from_bytes(first_sample_bytes, 'big', signed=True)
    samples = [current_value]
    
    diff_count = sampling_rate - 1
    if sample_size_info == 0:
        # 4bit(0.5バイト) 差分
        bit_offset = pos * 8
        for _ in range(diff_count):
            #nibble = read_4bit_2s_comp(data, bit_offset)
            nibble = read_4bit_2s_comp_no_cross(data, bit_offset)
            bit_offset += 4
            current_value += nibble
            samples.append(current_value)
        pos = (bit_offset + 7)//8
        
    elif sample_size_info in [1, 2, 3, 4]:
        # 差分
        sample_len = sample_size_info
        for _ in range(diff_count):
            if pos + sample_len > len(data):
                break
            diff_val = int.from_bytes(data[pos : pos+sample_len], 'big', signed=True)
            pos += sample_len
            current_value += diff_val
            samples.append(current_value)
            
    elif sample_size_info == 5:
        # 4バイト絶対値
        sample_len = 4
        for _ in range(diff_count):
            if pos + sample_len > len(data):
                break
            abs_val = int.from_bytes(data[pos : pos+sample_len], 'big', signed=True)
            pos += sample_len
            current_value = abs_val
            samples.append(current_value)
    
    ch_info = {
        'channel': ch_num,
        'sample_size_info': sample_size_info,
        'sampling_rate': sampling_rate,
        'samples': samples
    }
    consumed = pos - offset
    return (consumed, ch_info)

def read_win_file_as_dataframe(filename: str) -> pd.DataFrame:
    """
    ディスクファイル上の WINフォーマットを読み込み、
    ・行: 「各サンプル」の時刻 (秒ヘッダ + サンプルオフセット)
    ・列: チャネル番号
    ・値: 各サンプルの値
    として1つの DataFrame を返す
    """
    # "各サンプル"を行にするため、時刻単位(秒 + fraction)で紐づける。
    # 複数チャネルを同一時刻にマージするため、以下のような手順:
    #   - data_dict: { pd.Timestamp: { ch_name: sampleVal, ...}, ... }
    #   - 最後に data_dict -> DataFrame へ変換
    
    data_dict = {}
    
    with open(filename, 'rb') as f:
        while True:
            # 1) ブロックサイズ
            size_buf = f.read(4)
            if len(size_buf) < 4:
                break
            block_size = int.from_bytes(size_buf, 'big', signed=False)
            if block_size < 4:
                break
            
            # 2) 秒ブロック本体読み込み
            sec_block = f.read(block_size - 4)
            if len(sec_block) < (block_size - 4):
                break
            
            # a) BCD時刻 (先頭6バイト)
            if len(sec_block) < 6:
                continue
            bcd_part = sec_block[0:6]
            base_dt = decode_bcd_time(bcd_part)  # datetime
            if base_dt is None:
                continue
            
            # b) チャネルブロックを読み込む
            offset = 6
            channel_results = []
            while offset < len(sec_block):
                consumed, ch_info = parse_channel_block(sec_block, offset)
                if not ch_info or consumed <= 0:
                    break
                offset += consumed
                channel_results.append(ch_info)
            
            # c) チャネル結果 ⇒ サンプルの時刻と値を data_dict に格納
            for ch_info in channel_results:
                ch_num = ch_info['channel']
                sr = ch_info['sampling_rate']
                samples = ch_info['samples']
                # 1秒ブロック内で sr 個のサンプルがあると仮定(不足分は途中でbreakしてる場合もあり)
                # 各サンプル i 番は base_dt + i*(1/sr) 秒後とみなす
                #   → Timedelta等で補正
                # カラム名は16進表記でも10進表記でもOK
                ch_name = ch_num
                
                for i, val in enumerate(samples):
                    # サンプル時刻
                    if sr > 1:
                        frac_sec = i / sr
                    else:
                        frac_sec = 0.0
                    
                    sample_dt = base_dt + timedelta(seconds=frac_sec)
                    
                    # data_dict[sample_dt] に ch_name: val を入れる
                    if sample_dt not in data_dict:
                        data_dict[sample_dt] = {}
                    data_dict[sample_dt][ch_name] = val
    
    # 辞書 -> DataFrame
    df = pd.DataFrame.from_dict(data_dict, orient='index')
    # 時刻順にソート
    df.sort_index(inplace=True)
    return df,sr

fname = "input/25012123.36"
df, sr = read_win_file_as_dataframe(fname)


これでA/D値を取り込めます。テストしたファイルは sample_size_info = 2 のみでしたが、これまでのプログラムと出力が一致していることを確認しました。他は追々。

data frame に格納したので、物理量への変換はもちろん指定時刻から前〇〇秒とか、欠測秒をNanで埋めるとか後で容易に加工できます。

コードを読むことでファイル構造を理解するという逆の手順ですが、GPTの性能向上によって可能となりました。ラクナジダイニナッタ。

0 件のコメント:

コメントを投稿