2025年1月24日金曜日

win ファイル読み込み Python

 winファイルの振動データをPythonに取り込む必要に迫られました。

もともとは 既存の GUI プログラムでテキストに変換していたのですが、これだと CSV で保存してから Python で読みに行くといった無駄な時間が発生します。
C の共有メモリに Python でアクセスする方法が最も速いのですが、これは私には荷が重い。

ということで、刻々書き出される win ファイルを Python で読むことにしました。

といっても、今まで win format を読んだことがありません。確認すると、これがフクザツ。入れ子状態で説明が書かれています。

ということで、GPT-o1 の出番。
win format の解説と wdisk のソースをすべて読ませてから、読み込みスクリプトを作ってもらいました。

フムフム、ナルホド、コウイウコトダッタノカ。

  1. import pandas as pd
  2. from datetime import datetime, timedelta
  3.  
  4. def bcd_byte_to_int(b: int) -> int:
  5. """1 バイトBCDを整数に変換 (0x23 -> 23など)"""
  6. hi = (b & 0xF0) >> 4
  7. lo = (b & 0x0F)
  8. return hi * 10 + lo
  9.  
  10. def decode_bcd_time(bcd6: bytes) -> datetime:
  11. """
  12. 秒ヘッダー6バイト (BCD: YY, MM, DD, HH, mm, SS) を datetime に変換。
  13. ここでは 0~49 => 2000~2049, 50~99 => 1950~1999 の例。
  14. """
  15. if len(bcd6) < 6:
  16. return None
  17.  
  18. year = bcd_byte_to_int(bcd6[0]) # 0 ~ 99
  19. month = bcd_byte_to_int(bcd6[1]) # 1 ~ 12
  20. day = bcd_byte_to_int(bcd6[2]) # 1 ~ 31
  21. hour = bcd_byte_to_int(bcd6[3]) # 0 ~ 23
  22. minu = bcd_byte_to_int(bcd6[4]) # 0 ~ 59
  23. sec = bcd_byte_to_int(bcd6[5]) # 0 ~ 59
  24.  
  25. # 年の補正 (例: 年<50 => 2000年代, それ以外 => 1900年代)
  26. if year < 50:
  27. year_full = 2000 + year
  28. else:
  29. year_full = 1900 + year
  30.  
  31. try:
  32. return datetime(year_full, month, day, hour, minu, sec)
  33. except ValueError:
  34. # 不正な日時など
  35. return None
  36.  
  37. def read_4bit_2s_comp_no_cross(data: bytes, bit_offset: int) -> int:
  38. """
  39. data バッファの bit_offset ビット目から 4 ビットを取り出し、
  40. 2 の補数(4bit)として返す (バイト境界は跨がない前提)。
  41. - 0x0~0x7 => 0~7, 0x8~0xF => -8 ~ -1
  42. """
  43. byte_index = bit_offset // 8
  44. bit_in_byte = bit_offset % 8 # 0~7
  45.  
  46. val = data[byte_index] if byte_index < len(data) else 0
  47. shift_amount = (8 - 4) - bit_in_byte
  48. nibble = (val >> shift_amount) & 0x0F
  49.  
  50. if nibble >= 8:
  51. nibble -= 16 # => -8 ~ -1
  52. return nibble
  53.  
  54.  
  55. def parse_channel_block(data: bytes, offset: int):
  56. """
  57. 1つのチャネルブロックをパースして情報を返す。
  58. 戻り値: (消費バイト数, dict(channel=<ch_num>,
  59. sample_size_info=<info>,
  60. sampling_rate=<rate>,
  61. samples=[<int値>, ...]))
  62. """
  63. if offset + 4 > len(data):
  64. return (0, None)
  65. chan_header = data[offset : offset+4]
  66. # チャネル番号
  67. ch_num_raw = int.from_bytes(chan_header[0:2], 'big', signed=False)
  68. ch_num = f"{ch_num_raw:04X}" # 90EA のような16進大文字表記
  69. # サンプリングレート等
  70. last2 = int.from_bytes(chan_header[2:4], 'big', signed=False)
  71. sample_size_info = (last2 >> 12) & 0x0F
  72. sampling_rate = last2 & 0x0FFF
  73. pos = offset + 4
  74. # 先頭サンプル 4バイト
  75. if pos + 4 > len(data):
  76. return (4, {
  77. 'channel': ch_num,
  78. 'sample_size_info': sample_size_info,
  79. 'sampling_rate': sampling_rate,
  80. 'samples': [],
  81. })
  82. first_sample_bytes = data[pos : pos+4]
  83. pos += 4
  84. current_value = int.from_bytes(first_sample_bytes, 'big', signed=True)
  85. samples = [current_value]
  86. diff_count = sampling_rate - 1
  87. if sample_size_info == 0:
  88. # 4bit(0.5バイト) 差分
  89. bit_offset = pos * 8
  90. for _ in range(diff_count):
  91. #nibble = read_4bit_2s_comp(data, bit_offset)
  92. nibble = read_4bit_2s_comp_no_cross(data, bit_offset)
  93. bit_offset += 4
  94. current_value += nibble
  95. samples.append(current_value)
  96. pos = (bit_offset + 7)//8
  97. elif sample_size_info in [1, 2, 3, 4]:
  98. # 差分
  99. sample_len = sample_size_info
  100. for _ in range(diff_count):
  101. if pos + sample_len > len(data):
  102. break
  103. diff_val = int.from_bytes(data[pos : pos+sample_len], 'big', signed=True)
  104. pos += sample_len
  105. current_value += diff_val
  106. samples.append(current_value)
  107. elif sample_size_info == 5:
  108. # 4バイト絶対値
  109. sample_len = 4
  110. for _ in range(diff_count):
  111. if pos + sample_len > len(data):
  112. break
  113. abs_val = int.from_bytes(data[pos : pos+sample_len], 'big', signed=True)
  114. pos += sample_len
  115. current_value = abs_val
  116. samples.append(current_value)
  117. ch_info = {
  118. 'channel': ch_num,
  119. 'sample_size_info': sample_size_info,
  120. 'sampling_rate': sampling_rate,
  121. 'samples': samples
  122. }
  123. consumed = pos - offset
  124. return (consumed, ch_info)
  125.  
  126. def read_win_file_as_dataframe(filename: str) -> pd.DataFrame:
  127. """
  128. ディスクファイル上の WINフォーマットを読み込み、
  129. ・行: 「各サンプル」の時刻 (秒ヘッダ + サンプルオフセット)
  130. ・列: チャネル番号
  131. ・値: 各サンプルの値
  132. として1つの DataFrame を返す
  133. """
  134. # "各サンプル"を行にするため、時刻単位(秒 + fraction)で紐づける。
  135. # 複数チャネルを同一時刻にマージするため、以下のような手順:
  136. # - data_dict: { pd.Timestamp: { ch_name: sampleVal, ...}, ... }
  137. # - 最後に data_dict -> DataFrame へ変換
  138. data_dict = {}
  139. with open(filename, 'rb') as f:
  140. while True:
  141. # 1) ブロックサイズ
  142. size_buf = f.read(4)
  143. if len(size_buf) < 4:
  144. break
  145. block_size = int.from_bytes(size_buf, 'big', signed=False)
  146. if block_size < 4:
  147. break
  148. # 2) 秒ブロック本体読み込み
  149. sec_block = f.read(block_size - 4)
  150. if len(sec_block) < (block_size - 4):
  151. break
  152. # a) BCD時刻 (先頭6バイト)
  153. if len(sec_block) < 6:
  154. continue
  155. bcd_part = sec_block[0:6]
  156. base_dt = decode_bcd_time(bcd_part) # datetime
  157. if base_dt is None:
  158. continue
  159. # b) チャネルブロックを読み込む
  160. offset = 6
  161. channel_results = []
  162. while offset < len(sec_block):
  163. consumed, ch_info = parse_channel_block(sec_block, offset)
  164. if not ch_info or consumed <= 0:
  165. break
  166. offset += consumed
  167. channel_results.append(ch_info)
  168. # c) チャネル結果 ⇒ サンプルの時刻と値を data_dict に格納
  169. for ch_info in channel_results:
  170. ch_num = ch_info['channel']
  171. sr = ch_info['sampling_rate']
  172. samples = ch_info['samples']
  173. # 1秒ブロック内で sr 個のサンプルがあると仮定(不足分は途中でbreakしてる場合もあり)
  174. # 各サンプル i 番は base_dt + i*(1/sr) 秒後とみなす
  175. # → Timedelta等で補正
  176. # カラム名は16進表記でも10進表記でもOK
  177. ch_name = ch_num
  178. for i, val in enumerate(samples):
  179. # サンプル時刻
  180. if sr > 1:
  181. frac_sec = i / sr
  182. else:
  183. frac_sec = 0.0
  184. sample_dt = base_dt + timedelta(seconds=frac_sec)
  185. # data_dict[sample_dt] に ch_name: val を入れる
  186. if sample_dt not in data_dict:
  187. data_dict[sample_dt] = {}
  188. data_dict[sample_dt][ch_name] = val
  189. # 辞書 -> DataFrame
  190. df = pd.DataFrame.from_dict(data_dict, orient='index')
  191. # 時刻順にソート
  192. df.sort_index(inplace=True)
  193. return df,sr
  194.  
  195. fname = "input/25012123.36"
  196. df, sr = read_win_file_as_dataframe(fname)
  197.  
  198.  

これでA/D値を取り込めます。テストしたファイルは sample_size_info = 2 のみでしたが、これまでのプログラムと出力が一致していることを確認しました。他は追々。

data frame に格納したので、物理量への変換はもちろん指定時刻から前〇〇秒とか、欠測秒をNanで埋めるとか後で容易に加工できます。

コードを読むことでファイル構造を理解するという逆の手順ですが、GPTの性能向上によって可能となりました。ラクナジダイニナッタ。

0 件のコメント:

コメントを投稿