2025年1月25日土曜日

win format

winformat
https://wwweic.eri.u-tokyo.ac.jp/WIN/man.ja/winformat.html

win file
├1秒目
│├─ブロックサイズ(4B)
│└─秒ブロック(可変長)
│    ├─秒ヘッダー(6B)
│    ├─第1チャネルブロック(1秒分)
│    │  ├─チャネルヘッダー(4B)
│    │  │  ├─チャネル番号 (2B)
│    │  │  ├─サンプルサイズ情報(0.5B=4bit)
│    │  │  └─サンプリングレート(1.5B=12bit)
│    │  └─データブロック 1チャネル・1秒分(可変長)
│    │       ├─先頭サンプル(4B)
│    │       └─差分(または生値) x (サンプリングレート-1)個
│    ├─・・・・
│    └─最終チャネルブロック(1秒分)
├2秒目
├・・・・
└最終秒


●ディスク/テープ上の win フォーマット
[ ブロックサイズ(4B) | 秒ブロック(可変長) ]
 ブロックサイズ(4B) | 秒ブロック(可変長) ]
     ... (秒数分だけ繰り返し) ...

●ブロックサイズ(4Byte)
ex.
1バイト目: 0x00
2バイト目: 0x00
3バイト目: 0x0B
4バイト目: 0xD4

big-endian として無符号整数に変換
val = int.from_bytes(b'\x00\x00\x0b\xd4', byteorder='big')
0x00000BD4 (16進数) = 3028 (10進数) 

0BD4 = 11 × 16^2 + 13 × 16^1 + 4 × 16^0
= 11 × 256 + 13 × 16 + 4 × 1
= 2816 + 208 + 4
= 3028
(3028-4)Byteが秒ブロック長

●秒ブロック
[ 秒ヘッダー(6B)| 第1チャネルブロック(1秒分) | ... | 最終チャネルブロック(1秒分) ]

●秒ヘッダー(6Byte)のBCD(Binary Coded Decimal)形式
ex.
項目 バイト数 BCD格納
1B 0x20 → 「20」 (2020年)
1B 0x10 → 「10」
1B 0x31 → 「31」
1B 0x13 → 「13」
1B 0x59 → 「59」
1B 0x08 → 「08」

●チャネルブロック(1秒分)
[ チャネルヘッダー(4B) | データブロック 1チャネル・1秒分(可変長) ]

●チャンネルヘッダー
[ チャネル番号 (2B) | サンプルサイズ情報(0.5B=4bit) | サンプリングレート(Hz)(1.5B=12bit)]

サンプルサイズ情報 サンプルサイズ(B) 差分or生値
0 0.5 差分値
1 1 差分値
2 2 差分値
3 3 差分値
4 4 差分値
5 4 生値

sample_size_info = (last2 >> 12) & 0x0F  # 上位4bit
sampling_rate    =  last2        & 0x0FFF # 下位12bit

(last2 >> 12) & 0x0F
まず last2 を右に12bitシフトして、最上位4bitを右端へ持ってくる。
& 0x0F で、その4ビットだけを取り出す (残りは 0)。
16進数:0x0F、10進数: 15、2進数: 0000 1111

last2 & 0x0FFF
0x0FFF は下位12ビットが1、それより上位4bitが0のパターン (16進定数)。
これで、last2 の下位12bitだけを取り出す。

●データブロック
[ 先頭サンプル(4B) | 差分(または生値) x (サンプリングレート-1)個 ]
すべて2の補数表示。
int.from_bytes(b, 'big', signed=True) とすることで、バイト列 b は「ビッグエンディアンの符号付き2の補数値」として解釈され、Python の整数 (int) に変換される。


2025年1月24日金曜日

win ファイル読み込み Python

 winファイルの振動データをPythonに取り込む必要に迫られました。

もともとは 既存の GUI プログラムでテキストに変換していたのですが、これだと CSV で保存してから Python で読みに行くといった無駄な時間が発生します。
C の共有メモリに Python でアクセスする方法が最も速いのですが、これは私には荷が重い。

ということで、刻々書き出される win ファイルを Python で読むことにしました。

といっても、今まで win format を読んだことがありません。確認すると、これがフクザツ。入れ子状態で説明が書かれています。

ということで、GPT-o1 の出番。
win format の解説と wdisk のソースをすべて読ませてから、読み込みスクリプトを作ってもらいました。

フムフム、ナルホド、コウイウコトダッタノカ。

import pandas as pd
from datetime import datetime, timedelta

def bcd_byte_to_int(b: int) -> int:
    """1 バイトBCDを整数に変換 (0x23 -> 23など)"""
    hi = (b & 0xF0) >> 4
    lo = (b & 0x0F)
    return hi * 10 + lo

def decode_bcd_time(bcd6: bytes) -> datetime:
    """
    秒ヘッダー6バイト (BCD: YY, MM, DD, HH, mm, SS) を datetime に変換。
    ここでは 0~49 => 2000~2049, 50~99 => 1950~1999 の例。
    """
    if len(bcd6) < 6:
        return None

    year  = bcd_byte_to_int(bcd6[0])  # 0 ~ 99
    month = bcd_byte_to_int(bcd6[1])  # 1 ~ 12
    day   = bcd_byte_to_int(bcd6[2])  # 1 ~ 31
    hour  = bcd_byte_to_int(bcd6[3])  # 0 ~ 23
    minu  = bcd_byte_to_int(bcd6[4])  # 0 ~ 59
    sec   = bcd_byte_to_int(bcd6[5])  # 0 ~ 59

    # 年の補正 (例: 年<50 => 2000年代, それ以外 => 1900年代)
    if year < 50:
        year_full = 2000 + year
    else:
        year_full = 1900 + year

    try:
        return datetime(year_full, month, day, hour, minu, sec)
    except ValueError:
        # 不正な日時など
        return None

def read_4bit_2s_comp_no_cross(data: bytes, bit_offset: int) -> int:
    """
    data バッファの bit_offset ビット目から 4 ビットを取り出し、
    2 の補数(4bit)として返す (バイト境界は跨がない前提)。
     - 0x0~0x7 => 0~7,  0x8~0xF => -8 ~ -1
    """
    byte_index = bit_offset // 8
    bit_in_byte = bit_offset % 8  # 0~7

    val = data[byte_index] if byte_index < len(data) else 0
    shift_amount = (8 - 4) - bit_in_byte
    nibble = (val >> shift_amount) & 0x0F

    if nibble >= 8:
        nibble -= 16  # => -8 ~ -1
    return nibble


def parse_channel_block(data: bytes, offset: int):
    """
    1つのチャネルブロックをパースして情報を返す。
    戻り値: (消費バイト数, dict(channel=<ch_num>,
                                 sample_size_info=<info>,
                                 sampling_rate=<rate>,
                                 samples=[<int値>, ...]))
    """
    if offset + 4 > len(data):
        return (0, None)
    chan_header = data[offset : offset+4]
    
    # チャネル番号
    ch_num_raw = int.from_bytes(chan_header[0:2], 'big', signed=False)
    ch_num = f"{ch_num_raw:04X}"  # 90EA のような16進大文字表記
    # サンプリングレート等
    last2 = int.from_bytes(chan_header[2:4], 'big', signed=False)
    sample_size_info = (last2 >> 12) & 0x0F
    sampling_rate    = last2 & 0x0FFF
    
    pos = offset + 4
    
    # 先頭サンプル 4バイト
    if pos + 4 > len(data):
        return (4, {
            'channel': ch_num,
            'sample_size_info': sample_size_info,
            'sampling_rate': sampling_rate,
            'samples': [],
        })
    first_sample_bytes = data[pos : pos+4]
    pos += 4
    
    current_value = int.from_bytes(first_sample_bytes, 'big', signed=True)
    samples = [current_value]
    
    diff_count = sampling_rate - 1
    if sample_size_info == 0:
        # 4bit(0.5バイト) 差分
        bit_offset = pos * 8
        for _ in range(diff_count):
            #nibble = read_4bit_2s_comp(data, bit_offset)
            nibble = read_4bit_2s_comp_no_cross(data, bit_offset)
            bit_offset += 4
            current_value += nibble
            samples.append(current_value)
        pos = (bit_offset + 7)//8
        
    elif sample_size_info in [1, 2, 3, 4]:
        # 差分
        sample_len = sample_size_info
        for _ in range(diff_count):
            if pos + sample_len > len(data):
                break
            diff_val = int.from_bytes(data[pos : pos+sample_len], 'big', signed=True)
            pos += sample_len
            current_value += diff_val
            samples.append(current_value)
            
    elif sample_size_info == 5:
        # 4バイト絶対値
        sample_len = 4
        for _ in range(diff_count):
            if pos + sample_len > len(data):
                break
            abs_val = int.from_bytes(data[pos : pos+sample_len], 'big', signed=True)
            pos += sample_len
            current_value = abs_val
            samples.append(current_value)
    
    ch_info = {
        'channel': ch_num,
        'sample_size_info': sample_size_info,
        'sampling_rate': sampling_rate,
        'samples': samples
    }
    consumed = pos - offset
    return (consumed, ch_info)

def read_win_file_as_dataframe(filename: str) -> pd.DataFrame:
    """
    ディスクファイル上の WINフォーマットを読み込み、
    ・行: 「各サンプル」の時刻 (秒ヘッダ + サンプルオフセット)
    ・列: チャネル番号
    ・値: 各サンプルの値
    として1つの DataFrame を返す
    """
    # "各サンプル"を行にするため、時刻単位(秒 + fraction)で紐づける。
    # 複数チャネルを同一時刻にマージするため、以下のような手順:
    #   - data_dict: { pd.Timestamp: { ch_name: sampleVal, ...}, ... }
    #   - 最後に data_dict -> DataFrame へ変換
    
    data_dict = {}
    
    with open(filename, 'rb') as f:
        while True:
            # 1) ブロックサイズ
            size_buf = f.read(4)
            if len(size_buf) < 4:
                break
            block_size = int.from_bytes(size_buf, 'big', signed=False)
            if block_size < 4:
                break
            
            # 2) 秒ブロック本体読み込み
            sec_block = f.read(block_size - 4)
            if len(sec_block) < (block_size - 4):
                break
            
            # a) BCD時刻 (先頭6バイト)
            if len(sec_block) < 6:
                continue
            bcd_part = sec_block[0:6]
            base_dt = decode_bcd_time(bcd_part)  # datetime
            if base_dt is None:
                continue
            
            # b) チャネルブロックを読み込む
            offset = 6
            channel_results = []
            while offset < len(sec_block):
                consumed, ch_info = parse_channel_block(sec_block, offset)
                if not ch_info or consumed <= 0:
                    break
                offset += consumed
                channel_results.append(ch_info)
            
            # c) チャネル結果 ⇒ サンプルの時刻と値を data_dict に格納
            for ch_info in channel_results:
                ch_num = ch_info['channel']
                sr = ch_info['sampling_rate']
                samples = ch_info['samples']
                # 1秒ブロック内で sr 個のサンプルがあると仮定(不足分は途中でbreakしてる場合もあり)
                # 各サンプル i 番は base_dt + i*(1/sr) 秒後とみなす
                #   → Timedelta等で補正
                # カラム名は16進表記でも10進表記でもOK
                ch_name = ch_num
                
                for i, val in enumerate(samples):
                    # サンプル時刻
                    if sr > 1:
                        frac_sec = i / sr
                    else:
                        frac_sec = 0.0
                    
                    sample_dt = base_dt + timedelta(seconds=frac_sec)
                    
                    # data_dict[sample_dt] に ch_name: val を入れる
                    if sample_dt not in data_dict:
                        data_dict[sample_dt] = {}
                    data_dict[sample_dt][ch_name] = val
    
    # 辞書 -> DataFrame
    df = pd.DataFrame.from_dict(data_dict, orient='index')
    # 時刻順にソート
    df.sort_index(inplace=True)
    return df,sr

fname = "input/25012123.36"
df, sr = read_win_file_as_dataframe(fname)


これでA/D値を取り込めます。テストしたファイルは sample_size_info = 2 のみでしたが、これまでのプログラムと出力が一致していることを確認しました。他は追々。

data frame に格納したので、物理量への変換はもちろん指定時刻から前〇〇秒とか、欠測秒をNanで埋めるとか後で容易に加工できます。

コードを読むことでファイル構造を理解するという逆の手順ですが、GPTの性能向上によって可能となりました。ラクナジダイニナッタ。

2025年1月22日水曜日

LSMに使用する地形

手を動かして気づいたのがLSMに使用すべき地形(DEM)です。

LSMで「地すべり地形分布図」のポリゴンを正解として使用する場合、地すべり発生前の地形データは存在しないので、地すべり発生後の地形(DEM)に対し特徴量を抽出することになります。そうすると出来上がるLSMは、過去に地すべりが発生した可能性が高い場所を地形以外の特徴量を加味して危険度として表現したものになります。

一方、LSMで表現したいのは既に発生した場所ではなく、今後地すべりや崩壊が発生する危険度です。この場合、発生した場所での特徴量を発生前の地形等から求め、他の未発生の地域に展開し危険度を表すという作業になります。こちらの方が需要は高いでしょう。

LSM作成の目的によって使用するデータの選択が必要です。
正解データ(すべり・崩壊ポリゴン)と地形(DEM)には、十分に留意が必要です。


2025年1月18日土曜日

3次元すべり面推定→安定計算(逆算)→(順算)LSM作成

土質力学の視点からのLSM作成手順です。

文献に書かれている手法を組み合わせることで、複数の Landslide に対する3次元でのすべり面推定から、安定計算(逆算)によるc・φの確率分布推定、順算によるLSMの作成まで実施できました。

LSMの出来は、感覚的に良くはないが悪いとまでは言えない程度。今回は10mDEMを使用したので、数十mのすべり範囲ではスライス数が少なく精度がイマイチなのでしょう。日本全国を扱うとなると、精度と計算時間のトレードオフに悩まされるのでしょうね。
使った文献、手順は大別して3つです。

1.3次元でのすべり面推定: SLBL
文献:Structural analysis of Turtle Mountain (Alberta) using digital elevation model: Toward a progressive failure - ScienceDirect
日本の深層崩壊での適用例:ESurf - Testing a failure surface prediction and deposit reconstruction method for a landslide cluster that occurred during Typhoon Talas (Japan)

実装:GitHub - ngu/pySLBL: This ArcGIS toolbox uses the Sloping Local Base Level method (SLBL) to create a surface below a topography and converts the outputs for subsequent use in DAN3D

入力:DEM(Pre-Landslide)、崩壊・すべり範囲SHP
出力:崩壊・すべり面DEM

・崩壊・すべり前のDEM(TIF)が必要です。
・崩壊・すべり範囲ポリゴンには堆積域を含めません。
・複数のポリゴンを一つのSHPにまとめます。
・DEMとSHPの座標系を一致させます。
・個々のポリゴン毎にDEM作成する場合:Multi Area
・結果を1枚のDEMにまとめる場合:Single Area
・出力結果と崩壊・すべり発生後のDEMと見比べ、パラメータを調整します。

2.3次元安定計算(逆算)によるc・φの確率分布推定
文献:Geologic Trends in Shear Strength Properties Inferred Through Three‐Dimensional Back Analysis of Landslide Inventories - Bunn - 2020 - Journal of Geophysical Research: Earth Surface - Wiley Online Library

実装:GitHub - benalesh/Landslide-Forensicsを修正
https://phreeqc.blogspot.com/2025/01/three-dimensional-back-analysis_18.html

入力:DEM(Pre-Landslide)、崩壊・すべり範囲SHP、崩壊・すべり面DEM
出力:c・Φ確率分布MAT

・座標系は一致させておきます。
・DEM範外の崩壊・すべりポリゴンを消去しておきます。
・飽和度、水平震度、c固定、φ固定など、いくつかの仮定が必要になります。
・Catastrophic と Progressive で読み込む Tif が異なります。
・実装が古いため、MATLABの現行Verで扱うには修正が必要です。
・確率分布出力のコード追加が必要です。

3.3次元安定計算(順算)によるLSMの作成: RegionGrow3D
文献:RegionGrow3D: A deterministic analysis for characterizing discrete three-dimensional landslide source areas on a regional scale
土砂分布深度を地形から推定する場合:How well can hillslope evolution models “explain” topography? Simulating soil transport and production with high-resolution topographic dataHow well can hillslope evolution models “explain” topography? | GSA Bulletin | GeoScienceWorld

実装:ghsc / Landslide Hazards Program / RegionGrow3D · GitLab

入力:DEM(Pre-Landslide)、c・Φ確率分布MAT
出力:LSM(土質力学の視点から、すべり・崩壊確率として表現したマップ)

・飽和度、土砂分布深度、PGAなど、いくつかの仮定が必要になります。
・実装が古いため、MATLABの現行Verで扱うには修正が必要です。
・確率分布出力のコード追加が必要です。


全体を通して、1地域、数千のすべりを処理するのに数時間程度でした。案外速いので、5mDEMでも実用的な時間内で処理できそうです。
精度は落ちるものの大量に処理して確率分布としてc・φを扱う目的外でも、日本の深層崩壊例のように、ある崩壊に着目し精度よく3次元の安定性を計算するという目的にも使えそうです。

Three-Dimensional Back Analysis コード修正

文献:Geologic Trends in Shear Strength Properties Inferred Through Three‐Dimensional Back Analysis of Landslide Inventories - Bunn - 2020 - Journal of Geophysical Research: Earth Surface - Wiley Online Library

実装:GitHub - benalesh/Landslide-Forensics の修正箇所


BackAnalysis_3D.m

% Read attributes from F
%ポリゴンに属性を与えていない場合の代替え処理
%asp = F(idx).aspect;% Mean aspect of all surficial pixels
%fail_type = F(idx).FAIL_TYPE;% Fail type: 'Catastrophic' or 'Progressive'
asp = 0; %すべり方向は後に繰り返し処理で決定する
fail_type = 'Progressive'

% Create binary mask from input polygons
%[x,y] = pixcenters(geotiffinfo);%現在は廃止された関数
%[X,Y] = meshgrid(x,y);
[X,Y] = worldGrid(R);

% Loop through all features in F (extent shapefile)
for idx = 1:length(F)
    try %エラー処理追加_
        繰り返し処理
  catch ME
  warning('Slide %d: Error occurred. Skipping...\n\t%s', idx, ME.message);
        % 必要に応じて、失敗スライド用の初期値を入れておく (任意)
        F(idx).cf = double(0);
        F(idx).phif = double(0);
        F(idx).rot = double(0);
     continue;
    end
end

% F 内のすべてのフィールド名を取得
fn = fieldnames(F);
% F の各属性に対してループ
for iF = 1:numel(F)
    for iField = 1:numel(fn)
        fieldName = fn{iField};
        val = F(iF).(fieldName);
        % 数値型かつ空配列なら NaN で埋める
        if isnumeric(val) && isempty(val)
            F(iF).(fieldName) = NaN;
        end
    end
end

%ヒストグラムの作成
phif_arr=[F.phif]
idx_valid  = (phif_arr > phi_thresh); % 論理索引(true/false配列)
phif_filt  = phif_arr(idx_valid); % 有効な値のみ抽出
figure;
h = histogram(phif_filt, 10, 'Normalization','probability');
xlabel('phif (deg)');
ylabel('Probability Density');
title('Distribution of phi');
% h.Values : 各ビンの高さ (確率)
% h.BinEdges : ビン境界(N+1個)
% ビンの中心を各隣り合う境界の平均で計算
center_phi = (h.BinEdges(1:end-1) + h.BinEdges(2:end)) / 2;
prob     = h.Values;
prob_phi = center_phi;
prob_coh  = repmat(c_thresh, 1, 10);
save(fullfile(path, 'shear_strength.mat'), 'prob', 'prob_coh', 'prob_phi');


SimpJanbu3D.m

    maxC   = 1000;     % c の上限 [kN/m^2]
    maxPhi = 90;       % φ の上限 [度]
    while FRy<FDy
        if st == 1
            phi = phi+phi_inc;
            if phi >= maxPhi
                break;  % 上限到達で打ち切り
            end
        else
            c = c+c_inc;
            if c >= maxC
                break;  % 上限到達で打ち切り
            end
        end

2025年1月13日月曜日

Three-Dimensional Back Analysis

数か月前に読んでいた RegionGrow3D を実際に動かしてみました。
https://phreeqc.blogspot.com/2024/09/landslide-susceptibility-map-generated.html

いくつか修正は必要でしたが、用意したDEMでも素直に動きました。どの程度の予測性能なのか、他の LSM と比較する方法が欲しいですね。

使い勝手は良いと思います。サクション、土層厚、強度の確率分布など、いくらでも細かく設定できそうですし、逆に簡略化も可能です。すべる、すべらないの2値分類でよければ、DEMのみでもよさそうです。

確率分布の設定に利用する土質強度の逆算方法は以下。
Geologic Trends in Shear Strength Properties Inferred Through Three-Dimensional Back Analysis of Landslide Inventories.pdf

3次元での逆算です。これもコードが公開されています。計算の流れは以下の通り。

BackAnalysis_3D.m
  1. 土の単位体積重量・想定する最小 c/Φ等の入力。
  2. DEM (すべり面・地表面) と地すべり範囲を示すSHPの指定。
  3. gradient_king 関数で、滑落面の傾斜角と方位を計算。
  4. 各地すべりのポリゴン領域に対して、マスクを作成。必要な部分だけ DEM 情報を抜き出し、土の重量や間隙水圧を計算。
  5. SimpJanbu3D.m:FS=1の c or Φの逆算:縦方向FS=1に加え、横方向FSを2ステップで計算。その後、崩壊方向(asp)を回転させながらFDx=0境界を探る。
  6. F(idx) に解析結果を格納し、SHP(ba_shp)へ書き出し。
今回は RG3D の文献で使用されていた確率分布を使ったので逆算まで行っていません。逆算する場合は、SLBLですべり面を求めてから動かすのが楽かな。