winファイルの振動データをPythonに取り込む必要に迫られました。
もともとは 既存の GUI プログラムでテキストに変換していたのですが、これだと CSV で保存してから Python で読みに行くといった無駄な時間が発生します。
C の共有メモリに Python でアクセスする方法が最も速いのですが、これは私には荷が重い。
ということで、刻々書き出される win ファイルを Python で読むことにしました。
といっても、今まで win format を読んだことがありません。確認すると、これがフクザツ。入れ子状態で説明が書かれています。
ということで、GPT-o1 の出番。
win format の解説と wdisk のソースをすべて読ませてから、読み込みスクリプトを作ってもらいました。
フムフム、ナルホド、コウイウコトダッタノカ。
import pandas as pd from datetime import datetime, timedelta def bcd_byte_to_int(b: int) -> int: """1 バイトBCDを整数に変換 (0x23 -> 23など)""" hi = (b & 0xF0) >> 4 lo = (b & 0x0F) return hi * 10 + lo def decode_bcd_time(bcd6: bytes) -> datetime: """ 秒ヘッダー6バイト (BCD: YY, MM, DD, HH, mm, SS) を datetime に変換。 ここでは 0~49 => 2000~2049, 50~99 => 1950~1999 の例。 """ if len(bcd6) < 6: return None year = bcd_byte_to_int(bcd6[0]) # 0 ~ 99 month = bcd_byte_to_int(bcd6[1]) # 1 ~ 12 day = bcd_byte_to_int(bcd6[2]) # 1 ~ 31 hour = bcd_byte_to_int(bcd6[3]) # 0 ~ 23 minu = bcd_byte_to_int(bcd6[4]) # 0 ~ 59 sec = bcd_byte_to_int(bcd6[5]) # 0 ~ 59 # 年の補正 (例: 年<50 => 2000年代, それ以外 => 1900年代) if year < 50: year_full = 2000 + year else: year_full = 1900 + year try: return datetime(year_full, month, day, hour, minu, sec) except ValueError: # 不正な日時など return None def read_4bit_2s_comp_no_cross(data: bytes, bit_offset: int) -> int: """ data バッファの bit_offset ビット目から 4 ビットを取り出し、 2 の補数(4bit)として返す (バイト境界は跨がない前提)。 - 0x0~0x7 => 0~7, 0x8~0xF => -8 ~ -1 """ byte_index = bit_offset // 8 bit_in_byte = bit_offset % 8 # 0~7 val = data[byte_index] if byte_index < len(data) else 0 shift_amount = (8 - 4) - bit_in_byte nibble = (val >> shift_amount) & 0x0F if nibble >= 8: nibble -= 16 # => -8 ~ -1 return nibble def parse_channel_block(data: bytes, offset: int): """ 1つのチャネルブロックをパースして情報を返す。 戻り値: (消費バイト数, dict(channel=<ch_num>, sample_size_info=<info>, sampling_rate=<rate>, samples=[<int値>, ...])) """ if offset + 4 > len(data): return (0, None) chan_header = data[offset : offset+4] # チャネル番号 ch_num_raw = int.from_bytes(chan_header[0:2], 'big', signed=False) ch_num = f"{ch_num_raw:04X}" # 90EA のような16進大文字表記 # サンプリングレート等 last2 = int.from_bytes(chan_header[2:4], 'big', signed=False) sample_size_info = (last2 >> 12) & 0x0F sampling_rate = last2 & 0x0FFF pos = offset + 4 # 先頭サンプル 4バイト if pos + 4 > len(data): return (4, { 'channel': ch_num, 'sample_size_info': sample_size_info, 'sampling_rate': sampling_rate, 'samples': [], }) first_sample_bytes = data[pos : pos+4] pos += 4 current_value = int.from_bytes(first_sample_bytes, 'big', signed=True) samples = [current_value] diff_count = sampling_rate - 1 if sample_size_info == 0: # 4bit(0.5バイト) 差分 bit_offset = pos * 8 for _ in range(diff_count): #nibble = read_4bit_2s_comp(data, bit_offset) nibble = read_4bit_2s_comp_no_cross(data, bit_offset) bit_offset += 4 current_value += nibble samples.append(current_value) pos = (bit_offset + 7)//8 elif sample_size_info in [1, 2, 3, 4]: # 差分 sample_len = sample_size_info for _ in range(diff_count): if pos + sample_len > len(data): break diff_val = int.from_bytes(data[pos : pos+sample_len], 'big', signed=True) pos += sample_len current_value += diff_val samples.append(current_value) elif sample_size_info == 5: # 4バイト絶対値 sample_len = 4 for _ in range(diff_count): if pos + sample_len > len(data): break abs_val = int.from_bytes(data[pos : pos+sample_len], 'big', signed=True) pos += sample_len current_value = abs_val samples.append(current_value) ch_info = { 'channel': ch_num, 'sample_size_info': sample_size_info, 'sampling_rate': sampling_rate, 'samples': samples } consumed = pos - offset return (consumed, ch_info) def read_win_file_as_dataframe(filename: str) -> pd.DataFrame: """ ディスクファイル上の WINフォーマットを読み込み、 ・行: 「各サンプル」の時刻 (秒ヘッダ + サンプルオフセット) ・列: チャネル番号 ・値: 各サンプルの値 として1つの DataFrame を返す """ # "各サンプル"を行にするため、時刻単位(秒 + fraction)で紐づける。 # 複数チャネルを同一時刻にマージするため、以下のような手順: # - data_dict: { pd.Timestamp: { ch_name: sampleVal, ...}, ... } # - 最後に data_dict -> DataFrame へ変換 data_dict = {} with open(filename, 'rb') as f: while True: # 1) ブロックサイズ size_buf = f.read(4) if len(size_buf) < 4: break block_size = int.from_bytes(size_buf, 'big', signed=False) if block_size < 4: break # 2) 秒ブロック本体読み込み sec_block = f.read(block_size - 4) if len(sec_block) < (block_size - 4): break # a) BCD時刻 (先頭6バイト) if len(sec_block) < 6: continue bcd_part = sec_block[0:6] base_dt = decode_bcd_time(bcd_part) # datetime if base_dt is None: continue # b) チャネルブロックを読み込む offset = 6 channel_results = [] while offset < len(sec_block): consumed, ch_info = parse_channel_block(sec_block, offset) if not ch_info or consumed <= 0: break offset += consumed channel_results.append(ch_info) # c) チャネル結果 ⇒ サンプルの時刻と値を data_dict に格納 for ch_info in channel_results: ch_num = ch_info['channel'] sr = ch_info['sampling_rate'] samples = ch_info['samples'] # 1秒ブロック内で sr 個のサンプルがあると仮定(不足分は途中でbreakしてる場合もあり) # 各サンプル i 番は base_dt + i*(1/sr) 秒後とみなす # → Timedelta等で補正 # カラム名は16進表記でも10進表記でもOK ch_name = ch_num for i, val in enumerate(samples): # サンプル時刻 if sr > 1: frac_sec = i / sr else: frac_sec = 0.0 sample_dt = base_dt + timedelta(seconds=frac_sec) # data_dict[sample_dt] に ch_name: val を入れる if sample_dt not in data_dict: data_dict[sample_dt] = {} data_dict[sample_dt][ch_name] = val # 辞書 -> DataFrame df = pd.DataFrame.from_dict(data_dict, orient='index') # 時刻順にソート df.sort_index(inplace=True) return df,sr fname = "input/25012123.36" df, sr = read_win_file_as_dataframe(fname)
これでA/D値を取り込めます。テストしたファイルは sample_size_info = 2 のみでしたが、これまでのプログラムと出力が一致していることを確認しました。他は追々。
data frame に格納したので、物理量への変換はもちろん指定時刻から前〇〇秒とか、欠測秒をNanで埋めるとか後で容易に加工できます。
コードを読むことでファイル構造を理解するという逆の手順ですが、GPTの性能向上によって可能となりました。ラクナジダイニナッタ。
0 件のコメント:
コメントを投稿