winファイルの振動データをPythonに取り込む必要に迫られました。
もともとは 既存の GUI プログラムでテキストに変換していたのですが、これだと CSV で保存してから Python で読みに行くといった無駄な時間が発生します。
C の共有メモリに Python でアクセスする方法が最も速いのですが、これは私には荷が重い。
ということで、刻々書き出される win ファイルを Python で読むことにしました。
といっても、今まで win format を読んだことがありません。確認すると、これがフクザツ。入れ子状態で説明が書かれています。
ということで、GPT-o1 の出番。
win format の解説と wdisk のソースをすべて読ませてから、読み込みスクリプトを作ってもらいました。
フムフム、ナルホド、コウイウコトダッタノカ。
import pandas as pd
from datetime import datetime, timedelta
def bcd_byte_to_int(b: int) -> int:
"""1 バイトBCDを整数に変換 (0x23 -> 23など)"""
hi = (b & 0xF0) >> 4
lo = (b & 0x0F)
return hi * 10 + lo
def decode_bcd_time(bcd6: bytes) -> datetime:
"""
秒ヘッダー6バイト (BCD: YY, MM, DD, HH, mm, SS) を datetime に変換。
ここでは 0~49 => 2000~2049, 50~99 => 1950~1999 の例。
"""
if len(bcd6) < 6:
return None
year = bcd_byte_to_int(bcd6[0]) # 0 ~ 99
month = bcd_byte_to_int(bcd6[1]) # 1 ~ 12
day = bcd_byte_to_int(bcd6[2]) # 1 ~ 31
hour = bcd_byte_to_int(bcd6[3]) # 0 ~ 23
minu = bcd_byte_to_int(bcd6[4]) # 0 ~ 59
sec = bcd_byte_to_int(bcd6[5]) # 0 ~ 59
# 年の補正 (例: 年<50 => 2000年代, それ以外 => 1900年代)
if year < 50:
year_full = 2000 + year
else:
year_full = 1900 + year
try:
return datetime(year_full, month, day, hour, minu, sec)
except ValueError:
# 不正な日時など
return None
def read_4bit_2s_comp_no_cross(data: bytes, bit_offset: int) -> int:
"""
data バッファの bit_offset ビット目から 4 ビットを取り出し、
2 の補数(4bit)として返す (バイト境界は跨がない前提)。
- 0x0~0x7 => 0~7, 0x8~0xF => -8 ~ -1
"""
byte_index = bit_offset // 8
bit_in_byte = bit_offset % 8 # 0~7
val = data[byte_index] if byte_index < len(data) else 0
shift_amount = (8 - 4) - bit_in_byte
nibble = (val >> shift_amount) & 0x0F
if nibble >= 8:
nibble -= 16 # => -8 ~ -1
return nibble
def parse_channel_block(data: bytes, offset: int):
"""
1つのチャネルブロックをパースして情報を返す。
戻り値: (消費バイト数, dict(channel=<ch_num>,
sample_size_info=<info>,
sampling_rate=<rate>,
samples=[<int値>, ...]))
"""
if offset + 4 > len(data):
return (0, None)
chan_header = data[offset : offset+4]
# チャネル番号
ch_num_raw = int.from_bytes(chan_header[0:2], 'big', signed=False)
ch_num = f"{ch_num_raw:04X}" # 90EA のような16進大文字表記
# サンプリングレート等
last2 = int.from_bytes(chan_header[2:4], 'big', signed=False)
sample_size_info = (last2 >> 12) & 0x0F
sampling_rate = last2 & 0x0FFF
pos = offset + 4
# 先頭サンプル 4バイト
if pos + 4 > len(data):
return (4, {
'channel': ch_num,
'sample_size_info': sample_size_info,
'sampling_rate': sampling_rate,
'samples': [],
})
first_sample_bytes = data[pos : pos+4]
pos += 4
current_value = int.from_bytes(first_sample_bytes, 'big', signed=True)
samples = [current_value]
diff_count = sampling_rate - 1
if sample_size_info == 0:
# 4bit(0.5バイト) 差分
bit_offset = pos * 8
for _ in range(diff_count):
#nibble = read_4bit_2s_comp(data, bit_offset)
nibble = read_4bit_2s_comp_no_cross(data, bit_offset)
bit_offset += 4
current_value += nibble
samples.append(current_value)
pos = (bit_offset + 7)//8
elif sample_size_info in [1, 2, 3, 4]:
# 差分
sample_len = sample_size_info
for _ in range(diff_count):
if pos + sample_len > len(data):
break
diff_val = int.from_bytes(data[pos : pos+sample_len], 'big', signed=True)
pos += sample_len
current_value += diff_val
samples.append(current_value)
elif sample_size_info == 5:
# 4バイト絶対値
sample_len = 4
for _ in range(diff_count):
if pos + sample_len > len(data):
break
abs_val = int.from_bytes(data[pos : pos+sample_len], 'big', signed=True)
pos += sample_len
current_value = abs_val
samples.append(current_value)
ch_info = {
'channel': ch_num,
'sample_size_info': sample_size_info,
'sampling_rate': sampling_rate,
'samples': samples
}
consumed = pos - offset
return (consumed, ch_info)
def read_win_file_as_dataframe(filename: str) -> pd.DataFrame:
"""
ディスクファイル上の WINフォーマットを読み込み、
・行: 「各サンプル」の時刻 (秒ヘッダ + サンプルオフセット)
・列: チャネル番号
・値: 各サンプルの値
として1つの DataFrame を返す
"""
# "各サンプル"を行にするため、時刻単位(秒 + fraction)で紐づける。
# 複数チャネルを同一時刻にマージするため、以下のような手順:
# - data_dict: { pd.Timestamp: { ch_name: sampleVal, ...}, ... }
# - 最後に data_dict -> DataFrame へ変換
data_dict = {}
with open(filename, 'rb') as f:
while True:
# 1) ブロックサイズ
size_buf = f.read(4)
if len(size_buf) < 4:
break
block_size = int.from_bytes(size_buf, 'big', signed=False)
if block_size < 4:
break
# 2) 秒ブロック本体読み込み
sec_block = f.read(block_size - 4)
if len(sec_block) < (block_size - 4):
break
# a) BCD時刻 (先頭6バイト)
if len(sec_block) < 6:
continue
bcd_part = sec_block[0:6]
base_dt = decode_bcd_time(bcd_part) # datetime
if base_dt is None:
continue
# b) チャネルブロックを読み込む
offset = 6
channel_results = []
while offset < len(sec_block):
consumed, ch_info = parse_channel_block(sec_block, offset)
if not ch_info or consumed <= 0:
break
offset += consumed
channel_results.append(ch_info)
# c) チャネル結果 ⇒ サンプルの時刻と値を data_dict に格納
for ch_info in channel_results:
ch_num = ch_info['channel']
sr = ch_info['sampling_rate']
samples = ch_info['samples']
# 1秒ブロック内で sr 個のサンプルがあると仮定(不足分は途中でbreakしてる場合もあり)
# 各サンプル i 番は base_dt + i*(1/sr) 秒後とみなす
# → Timedelta等で補正
# カラム名は16進表記でも10進表記でもOK
ch_name = ch_num
for i, val in enumerate(samples):
# サンプル時刻
if sr > 1:
frac_sec = i / sr
else:
frac_sec = 0.0
sample_dt = base_dt + timedelta(seconds=frac_sec)
# data_dict[sample_dt] に ch_name: val を入れる
if sample_dt not in data_dict:
data_dict[sample_dt] = {}
data_dict[sample_dt][ch_name] = val
# 辞書 -> DataFrame
df = pd.DataFrame.from_dict(data_dict, orient='index')
# 時刻順にソート
df.sort_index(inplace=True)
return df,sr
fname = "input/25012123.36"
df, sr = read_win_file_as_dataframe(fname)
これでA/D値を取り込めます。テストしたファイルは sample_size_info = 2 のみでしたが、これまでのプログラムと出力が一致していることを確認しました。他は追々。
data frame に格納したので、物理量への変換はもちろん指定時刻から前〇〇秒とか、欠測秒をNanで埋めるとか後で容易に加工できます。
コードを読むことでファイル構造を理解するという逆の手順ですが、GPTの性能向上によって可能となりました。ラクナジダイニナッタ。
0 件のコメント:
コメントを投稿