winファイルの振動データをPythonに取り込む必要に迫られました。
もともとは 既存の GUI プログラムでテキストに変換していたのですが、これだと CSV で保存してから Python で読みに行くといった無駄な時間が発生します。
C の共有メモリに Python でアクセスする方法が最も速いのですが、これは私には荷が重い。
ということで、刻々書き出される win ファイルを Python で読むことにしました。
といっても、今まで win format を読んだことがありません。確認すると、これがフクザツ。入れ子状態で説明が書かれています。
ということで、GPT-o1 の出番。
win format の解説と wdisk のソースをすべて読ませてから、読み込みスクリプトを作ってもらいました。
フムフム、ナルホド、コウイウコトダッタノカ。
- import pandas as pd
- from datetime import datetime, timedelta
-
- def bcd_byte_to_int(b: int) -> int:
- """1 バイトBCDを整数に変換 (0x23 -> 23など)"""
- hi = (b & 0xF0) >> 4
- lo = (b & 0x0F)
- return hi * 10 + lo
-
- def decode_bcd_time(bcd6: bytes) -> datetime:
- """
- 秒ヘッダー6バイト (BCD: YY, MM, DD, HH, mm, SS) を datetime に変換。
- ここでは 0~49 => 2000~2049, 50~99 => 1950~1999 の例。
- """
- if len(bcd6) < 6:
- return None
-
- year = bcd_byte_to_int(bcd6[0]) # 0 ~ 99
- month = bcd_byte_to_int(bcd6[1]) # 1 ~ 12
- day = bcd_byte_to_int(bcd6[2]) # 1 ~ 31
- hour = bcd_byte_to_int(bcd6[3]) # 0 ~ 23
- minu = bcd_byte_to_int(bcd6[4]) # 0 ~ 59
- sec = bcd_byte_to_int(bcd6[5]) # 0 ~ 59
-
- # 年の補正 (例: 年<50 => 2000年代, それ以外 => 1900年代)
- if year < 50:
- year_full = 2000 + year
- else:
- year_full = 1900 + year
-
- try:
- return datetime(year_full, month, day, hour, minu, sec)
- except ValueError:
- # 不正な日時など
- return None
-
- def read_4bit_2s_comp_no_cross(data: bytes, bit_offset: int) -> int:
- """
- data バッファの bit_offset ビット目から 4 ビットを取り出し、
- 2 の補数(4bit)として返す (バイト境界は跨がない前提)。
- - 0x0~0x7 => 0~7, 0x8~0xF => -8 ~ -1
- """
- byte_index = bit_offset // 8
- bit_in_byte = bit_offset % 8 # 0~7
-
- val = data[byte_index] if byte_index < len(data) else 0
- shift_amount = (8 - 4) - bit_in_byte
- nibble = (val >> shift_amount) & 0x0F
-
- if nibble >= 8:
- nibble -= 16 # => -8 ~ -1
- return nibble
-
-
- def parse_channel_block(data: bytes, offset: int):
- """
- 1つのチャネルブロックをパースして情報を返す。
- 戻り値: (消費バイト数, dict(channel=<ch_num>,
- sample_size_info=<info>,
- sampling_rate=<rate>,
- samples=[<int値>, ...]))
- """
- if offset + 4 > len(data):
- return (0, None)
- chan_header = data[offset : offset+4]
-
- # チャネル番号
- ch_num_raw = int.from_bytes(chan_header[0:2], 'big', signed=False)
- ch_num = f"{ch_num_raw:04X}" # 90EA のような16進大文字表記
- # サンプリングレート等
- last2 = int.from_bytes(chan_header[2:4], 'big', signed=False)
- sample_size_info = (last2 >> 12) & 0x0F
- sampling_rate = last2 & 0x0FFF
-
- pos = offset + 4
-
- # 先頭サンプル 4バイト
- if pos + 4 > len(data):
- return (4, {
- 'channel': ch_num,
- 'sample_size_info': sample_size_info,
- 'sampling_rate': sampling_rate,
- 'samples': [],
- })
- first_sample_bytes = data[pos : pos+4]
- pos += 4
-
- current_value = int.from_bytes(first_sample_bytes, 'big', signed=True)
- samples = [current_value]
-
- diff_count = sampling_rate - 1
- if sample_size_info == 0:
- # 4bit(0.5バイト) 差分
- bit_offset = pos * 8
- for _ in range(diff_count):
- #nibble = read_4bit_2s_comp(data, bit_offset)
- nibble = read_4bit_2s_comp_no_cross(data, bit_offset)
- bit_offset += 4
- current_value += nibble
- samples.append(current_value)
- pos = (bit_offset + 7)//8
-
- elif sample_size_info in [1, 2, 3, 4]:
- # 差分
- sample_len = sample_size_info
- for _ in range(diff_count):
- if pos + sample_len > len(data):
- break
- diff_val = int.from_bytes(data[pos : pos+sample_len], 'big', signed=True)
- pos += sample_len
- current_value += diff_val
- samples.append(current_value)
-
- elif sample_size_info == 5:
- # 4バイト絶対値
- sample_len = 4
- for _ in range(diff_count):
- if pos + sample_len > len(data):
- break
- abs_val = int.from_bytes(data[pos : pos+sample_len], 'big', signed=True)
- pos += sample_len
- current_value = abs_val
- samples.append(current_value)
-
- ch_info = {
- 'channel': ch_num,
- 'sample_size_info': sample_size_info,
- 'sampling_rate': sampling_rate,
- 'samples': samples
- }
- consumed = pos - offset
- return (consumed, ch_info)
-
- def read_win_file_as_dataframe(filename: str) -> pd.DataFrame:
- """
- ディスクファイル上の WINフォーマットを読み込み、
- ・行: 「各サンプル」の時刻 (秒ヘッダ + サンプルオフセット)
- ・列: チャネル番号
- ・値: 各サンプルの値
- として1つの DataFrame を返す
- """
- # "各サンプル"を行にするため、時刻単位(秒 + fraction)で紐づける。
- # 複数チャネルを同一時刻にマージするため、以下のような手順:
- # - data_dict: { pd.Timestamp: { ch_name: sampleVal, ...}, ... }
- # - 最後に data_dict -> DataFrame へ変換
-
- data_dict = {}
-
- with open(filename, 'rb') as f:
- while True:
- # 1) ブロックサイズ
- size_buf = f.read(4)
- if len(size_buf) < 4:
- break
- block_size = int.from_bytes(size_buf, 'big', signed=False)
- if block_size < 4:
- break
-
- # 2) 秒ブロック本体読み込み
- sec_block = f.read(block_size - 4)
- if len(sec_block) < (block_size - 4):
- break
-
- # a) BCD時刻 (先頭6バイト)
- if len(sec_block) < 6:
- continue
- bcd_part = sec_block[0:6]
- base_dt = decode_bcd_time(bcd_part) # datetime
- if base_dt is None:
- continue
-
- # b) チャネルブロックを読み込む
- offset = 6
- channel_results = []
- while offset < len(sec_block):
- consumed, ch_info = parse_channel_block(sec_block, offset)
- if not ch_info or consumed <= 0:
- break
- offset += consumed
- channel_results.append(ch_info)
-
- # c) チャネル結果 ⇒ サンプルの時刻と値を data_dict に格納
- for ch_info in channel_results:
- ch_num = ch_info['channel']
- sr = ch_info['sampling_rate']
- samples = ch_info['samples']
- # 1秒ブロック内で sr 個のサンプルがあると仮定(不足分は途中でbreakしてる場合もあり)
- # 各サンプル i 番は base_dt + i*(1/sr) 秒後とみなす
- # → Timedelta等で補正
- # カラム名は16進表記でも10進表記でもOK
- ch_name = ch_num
-
- for i, val in enumerate(samples):
- # サンプル時刻
- if sr > 1:
- frac_sec = i / sr
- else:
- frac_sec = 0.0
-
- sample_dt = base_dt + timedelta(seconds=frac_sec)
-
- # data_dict[sample_dt] に ch_name: val を入れる
- if sample_dt not in data_dict:
- data_dict[sample_dt] = {}
- data_dict[sample_dt][ch_name] = val
-
- # 辞書 -> DataFrame
- df = pd.DataFrame.from_dict(data_dict, orient='index')
- # 時刻順にソート
- df.sort_index(inplace=True)
- return df,sr
-
- fname = "input/25012123.36"
- df, sr = read_win_file_as_dataframe(fname)
-
-
これでA/D値を取り込めます。テストしたファイルは sample_size_info = 2 のみでしたが、これまでのプログラムと出力が一致していることを確認しました。他は追々。
data frame に格納したので、物理量への変換はもちろん指定時刻から前〇〇秒とか、欠測秒をNanで埋めるとか後で容易に加工できます。
コードを読むことでファイル構造を理解するという逆の手順ですが、GPTの性能向上によって可能となりました。ラクナジダイニナッタ。