Skip to content

Postural Transition Detection

Postural Transition Detection (Pham)

This algorithm aims to detect postural transitions (e.g., sit to stand or stand to sit movements) using accelerometer and gyroscope data collected from a lower back inertial measurement unit (IMU) sensor.

The algorithm is designed to be robust in detecting postural transitions using inertial sensor data and provides detailed information about these transitions. It starts by loading the accelerometer and gyro data, which includes three columns corresponding to the acceleration and gyro signals across the x, y, and z axes, along with the sampling frequency of the data. It first checks the validity of the input data. Then, it calculates the sampling period, selects accelerometer and gyro data. Tilt angle estimation is performed using gyro data in lateral or anteroposterior direction which represent movements or rotations in the mediolateral direction. The tilt angle is decomposed using wavelet transformation to identify stationary periods. Stationary periods are detected using accelerometer variance and gyro variance. Then, peaks in the wavelet-transformed tilt signal are detected as potential postural transition events.

If there's enough stationary data, further processing is done to estimate the orientation using quaternions and to identify the beginning and end of postural transitions using gyro data. Otherwise, if there's insufficient stationary data, direction changes in gyro data are used to infer postural transitions. Finally, the detected postural transitions along with their characteristics (onset, duration, etc.) are stored in a pandas DataFrame (postural_transitions_ attribute).

In addition, spatial-temporal parameters are calculated using detected postural transitions and their characteristics by the spatio_temporal_parameters method. As a return, the postural transition id along with its spatial-temporal parameters including type of postural transition (sit to stand or stand to sit), angle of postural transition, maximum flexion velocity, and maximum extension velocity are stored in a pandas DataFrame (parameters_ attribute).

If requested (plot_results set to True), it generates plots of the accelerometer and gyroscope data along with the detected postural transitions.

Methods:

Name Description
detect

Detects sit to stand and stand to sit using accelerometer and gyro signals.

spatio_temporal_parameters

Extracts spatio-temporal parameters of the detected turns.

Examples:

>>> pham = PhamPosturalTransitionDetection()
>>> pham.detect(
        data=input_data,
        gyro_mediolateral="pelvis_GYRO_y",
        accel_unit="g",
        gyro_unit="deg/s",
        sampling_freq_Hz=200.0,
        tracking_system="imu",
        tracked_point="LowerBack",
        plot_results=False
        )
>>> print(pham.postural_transitions_)
        onset      duration     event_type              tracking_systems    tracked_points
    0   17.895     1.8          postural transition     imu                 LowerBack
    1   54.655     1.9          postural transition     imu                 LowerBack
>>> pham.spatio_temporal_parameters()
>>> print(pham.parameters_)
        type of postural transition    angle of postural transition     maximum flexion velocity    maximum extension velocity
    0   sit to stand                   53.26                            79                          8
    1   stand to sit                   47.12                            91                      120
References

[1] Pham et al. (2018). Validation of a Lower Back "Wearable"-Based Sit-to-Stand and Stand-to-Sit Algorithm... https://doi.org/10.3389/fneur.2018.00652

Source code in kielmat/modules/ptd/_pham.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
class PhamPosturalTransitionDetection:
    """
    This algorithm aims to detect postural transitions (e.g., sit to stand or stand to sit movements)
    using accelerometer and gyroscope data collected from a lower back inertial measurement unit (IMU)
    sensor.

    The algorithm is designed to be robust in detecting postural transitions using inertial sensor data
    and provides detailed information about these transitions. It starts by loading the accelerometer and
    gyro data, which includes three columns corresponding to the acceleration and gyro signals across
    the x, y, and z axes, along with the sampling frequency of the data. It first checks the validity of
    the input data. Then, it calculates the sampling period, selects accelerometer and gyro data. Tilt angle
    estimation is performed using gyro data in lateral or anteroposterior direction which represent movements
    or rotations in the mediolateral direction. The tilt angle is decomposed using wavelet transformation to
    identify stationary periods. Stationary periods are detected using accelerometer variance and gyro variance.
    Then, peaks in the wavelet-transformed tilt signal are detected as potential postural transition events.

    If there's enough stationary data, further processing is done to estimate the orientation using
    quaternions and to identify the beginning and end of postural transitions using gyro data. Otherwise,
    if there's insufficient stationary data, direction changes in gyro data are used to infer postural
    transitions. Finally, the detected postural transitions along with their characteristics (onset, duration, etc.)
    are stored in a pandas DataFrame (postural_transitions_ attribute).

    In addition, spatial-temporal parameters are calculated using detected postural transitions and their
    characteristics by the spatio_temporal_parameters method. As a return, the postural transition id along
    with its spatial-temporal parameters including type of postural transition (sit to stand or stand to sit),
    angle of postural transition, maximum flexion velocity, and maximum extension velocity are stored in a pandas
    DataFrame (parameters_ attribute).

    If requested (plot_results set to True), it generates plots of the accelerometer and gyroscope data
    along with the detected postural transitions.

    Methods:
        detect(data, gyro_mediolateral, accel_unit, gyro_unit, sampling_freq_Hz, dt_data, tracking_system, tracked_point, plot_results):
            Detects  sit to stand and stand to sit using accelerometer and gyro signals.

        spatio_temporal_parameters():
            Extracts spatio-temporal parameters of the detected turns.

    Examples:
        >>> pham = PhamPosturalTransitionDetection()
        >>> pham.detect(
                data=input_data,
                gyro_mediolateral="pelvis_GYRO_y",
                accel_unit="g",
                gyro_unit="deg/s",
                sampling_freq_Hz=200.0,
                tracking_system="imu",
                tracked_point="LowerBack",
                plot_results=False
                )
        >>> print(pham.postural_transitions_)
                onset      duration     event_type              tracking_systems    tracked_points
            0   17.895     1.8          postural transition     imu                 LowerBack
            1   54.655     1.9          postural transition     imu                 LowerBack

        >>> pham.spatio_temporal_parameters()
        >>> print(pham.parameters_)
                type of postural transition    angle of postural transition     maximum flexion velocity    maximum extension velocity
            0   sit to stand                   53.26                            79                          8
            1   stand to sit                   47.12                            91                      120

    References:
        [1] Pham et al. (2018). Validation of a Lower Back "Wearable"-Based Sit-to-Stand and Stand-to-Sit Algorithm... https://doi.org/10.3389/fneur.2018.00652
    """

    def __init__(
        self,
        cutoff_freq_hz: float = 5.0,
        thr_accel_var: float = 0.05,
        thr_gyro_var: float = 10e-2,
        min_postural_transition_angle_deg: float = 15.0,
    ):
        """
        Initializes the PhamPosturalTransitionDetection instance.

        Args:
            cutoff_freq_hz (float, optional): Cutoff frequency for low-pass Butterworth filer. Default is 5.0.
            thr_accel_var (float): Threshold value for identifying periods where the acceleartion variance is low. Default is 0.5.
            thr_gyro_var (float): Threshold value for identifying periods where the gyro variance is low. Default is 2e-4.
            min_turn_angle_deg (float): Minimum angle which is considered as postural transition in degrees. Default is 15.0.
        """
        self.cutoff_freq_hz = cutoff_freq_hz
        self.thr_accel_var = thr_accel_var
        self.thr_gyro_var = thr_gyro_var
        self.min_postural_transition_angle_deg = min_postural_transition_angle_deg

    def detect(
        self,
        data: pd.DataFrame,
        gyro_mediolateral: str,
        accel_unit: str,
        gyro_unit: str,
        sampling_freq_Hz: float,
        dt_data: Optional[pd.Series] = None,
        tracking_system: Optional[str] = None,
        tracked_point: Optional[str] = None,
        plot_results: bool = False,
    ) -> "PhamPosturalTransitionDetection":
        """
        Detects postural transitions based on the input accelerometer and gyro data.

        Args:
            data (pd.DataFrame): Input accelerometer and gyro data (N, 6) for x, y, and z axes.
            gyro_mediolateral (str): It corresponds to the gyroscope component representing movements or rotations in the mediolateral direction. This could correspond to rotation around the y-axis in your coordinate system.
            accel_unit (str): Unit of acceleration data.
            gyro_unit (str): Unit of gyro data.
            sampling_freq_Hz (float): Sampling frequency of the input data.
            dt_data (pd.Series, optional): Original datetime in the input data. If original datetime is provided, the output onset will be based on that.
            tracking_system (str, optional): Tracking systems.
            tracked_point (str, optional): Tracked points on the body.
            plot_results (bool, optional): If True, generates a plot. Default is False.

        Returns:
            The postural transition information is stored in the 'postural_transitions_' attribute,
            which is a pandas DataFrame in BIDS format with the following columns:
                - onset: Start time of the postural transition in second.
                - duration: Duration of the postural transition in second.
                - event_type: Type of the event which is postural transition.
                - tracking_systems: Name of the tracking systems.
                - tracked_points: Name of the tracked points on the body.
        """
        # check if dt_data is a pandas Series with datetime values
        if dt_data is not None and (
            not isinstance(dt_data, pd.Series)
            or not pd.api.types.is_datetime64_any_dtype(dt_data)
        ):
            raise ValueError("dt_data must be a pandas Series with datetime values")

        # check if dt_data is provided and if it is a series with the same length as data
        if dt_data is not None and len(dt_data) != len(data):
            raise ValueError("dt_data must be a series with the same length as data")

        # Check if data is a DataFrame
        if not isinstance(data, pd.DataFrame):
            raise ValueError("Input data must be a pandas DataFrame")

        # Error handling for invalid input data
        if not isinstance(data, pd.DataFrame) or data.shape[1] != 6:
            raise ValueError(
                "Input accelerometer and gyro data must be a DataFrame with 6 columns for x, y, and z axes."
            )

        # Check if sampling frequency is positive
        if sampling_freq_Hz <= 0:
            raise ValueError("Sampling frequency must be positive")

        # Check if plot_results is a boolean
        if not isinstance(plot_results, bool):
            raise ValueError("plot_results must be a boolean value")

        # Calculate sampling period
        sampling_period = 1 / sampling_freq_Hz

        # Identify the columns in the DataFrame that correspond to accelerometer data
        accel_columns = [col for col in data.columns if "ACCEL" in col]

        # Identify the columns in the DataFrame that correspond to gyroscope data
        gyro_columns = [col for col in data.columns if "GYRO" in col]

        # Ensure that there are exactly 3 columns each for accelerometer and gyroscope data
        if len(accel_columns) != 3 or len(gyro_columns) != 3:
            raise ValueError(
                "Data must contain 3 accelerometer and 3 gyroscope columns."
            )

        # Select acceleration data and convert it to numpy array format
        accel = data[accel_columns].copy().to_numpy()

        # Select gyro data and convert it to numpy array format
        gyro = data[gyro_columns].copy().to_numpy()
        self.gyro = gyro

        # Extract mediolateral gyro data using the specified index
        self.gyro_mediolateral = data[gyro_mediolateral].to_numpy()

        # Convert variations of acceleration unit to "m/s^2"
        if accel_unit in ["meters/s^2", "meter/s^2"]:
            accel_unit = "m/s^2"

        # Check unit of acceleration data if it is in "m/s^2" or "g"
        if accel_unit == "m/s^2":
            # Convert acceleration data from "m/s^2" to "g"
            accel /= 9.81
        elif accel_unit in ["g", "G"]:
            pass  # No conversion needed
        else:
            raise ValueError(
                "Invalid unit for acceleration data. Must be 'm/s^2' or 'g'"
            )

        # Check unit of gyro data if it is in deg/s or rad/s
        if gyro_unit in ["rad/s", "radians per second"]:
            # Convert gyro data from rad/s to deg/s (if not already is in deg/s)
            self.gyro = np.rad2deg(self.gyro)
            self.gyro_mediolateral = np.rad2deg(self.gyro_mediolateral)

        # Check different variations of gyro unit
        elif gyro_unit in ["degrees per second", "°/s"]:
            gyro_unit = "deg/s"

        elif gyro_unit == "deg/s":
            pass  # Gyro data is already in deg/s
        else:
            raise ValueError("Invalid unit for gyro data. Must be 'deg/s' or 'rad/s'")

        # Calculate timestamps to use in next calculation
        time = np.arange(1, len(accel[:, 0]) + 1) * sampling_period

        # Estimate tilt angle in deg
        tilt_angle_deg = preprocessing.tilt_angle_estimation(
            data=self.gyro_mediolateral, sampling_frequency_hz=sampling_freq_Hz
        )

        # Convert tilt angle to rad
        tilt_angle_rad = np.deg2rad(tilt_angle_deg)

        # Calculate sine of the tilt angle in radians
        tilt_sin = np.sin(tilt_angle_rad)

        # Apply wavelet decomposition with level of 3
        tilt_dwt_3 = preprocessing.wavelet_decomposition(
            data=tilt_sin, level=3, wavetype="coif5"
        )

        # Apply wavelet decomposition with level of 10
        tilt_dwt_10 = preprocessing.wavelet_decomposition(
            data=tilt_sin, level=10, wavetype="coif5"
        )

        # Calculate difference
        tilt_dwt = tilt_dwt_3 - tilt_dwt_10

        # Find peaks in denoised tilt signal
        # Peaks of the tilt_dwt signal with magnitude and prominence >0.2 were defined as postural transition events
        # Separate positive and negative peak detection
        positive_peaks, _ = scipy.signal.find_peaks(
            tilt_dwt, height=-0.2, prominence=0.2
        )
        negative_peaks, _ = scipy.signal.find_peaks(
            tilt_dwt, height=0.2, prominence=0.2
        )

        # Merge, sort, and eliminate redundant peaks
        self.local_peaks = np.unique(np.concatenate((positive_peaks, negative_peaks)))

        # Calculate the norm of acceleration
        accel_norm = np.sqrt(accel[:, 0] ** 2 + accel[:, 1] ** 2 + accel[:, 2] ** 2)

        # Calculate absolute value of the acceleration signal
        accel_norm = np.abs(accel_norm)

        # Detect stationary parts of the signal based on the deifned threshold
        stationary_1 = accel_norm < self.thr_accel_var
        stationary_1 = (stationary_1).astype(int)

        # Compute the variance of the moving window acceleration
        accel_var = preprocessing.moving_var(data=accel_norm, window=sampling_freq_Hz)

        # Calculate stationary_2 from acceleration variance
        stationary_2 = (accel_var <= self.thr_gyro_var).astype(int)

        # Convert unit of the gyro data from deg/s to rad/s
        self.gyro = np.deg2rad(self.gyro)

        # Calculate stationary of gyro variance
        gyro_norm = np.sqrt(
            self.gyro[:, 0] ** 2 + self.gyro[:, 1] ** 2 + self.gyro[:, 2] ** 2
        )

        # Compute the variance of the moving window of gyro
        gyro_var = preprocessing.moving_var(data=gyro_norm, window=sampling_freq_Hz)

        # Calculate stationary of gyro variance
        stationary_3 = (gyro_var <= self.thr_gyro_var).astype(int)

        # Perform stationarity checks
        stationary = stationary_1 & stationary_2 & stationary_3

        # Remove consecutive True values in the stationary array
        for i in range(len(stationary) - 1):
            if stationary[i] == 1:
                if stationary[i + 1] == 0:
                    stationary[i] = 0

        # Set initial period and check if enough stationary data is available
        # Stationary periods are defined as the episodes when the lower back of the participant was almost not moving and not rotating.
        # The thresholds are determined based on the training data set.
        init_period = 0.1

        if (
            np.sum(stationary[: int(sampling_freq_Hz * init_period)]) >= 200
        ):  # If the process is stationary in the first 2s
            # If there is enough stationary data, perform sensor fusion using accelerometer and gyro data
            (
                postural_transition_onset,
                postural_transition_type,
                self.postural_transition_angle,
                duration,
                self.flexion_max_vel,
                self.extension_max_vel,
            ) = preprocessing.process_postural_transitions_stationary_periods(
                time,
                accel,
                self.gyro,
                stationary,
                tilt_angle_deg,
                sampling_period,
                sampling_freq_Hz,
                init_period,
                self.local_peaks,
            )

            # Create a DataFrame with postural transition information
            postural_transitions_ = pd.DataFrame(
                {
                    "onset": postural_transition_onset,
                    "duration": duration,
                    "event_type": postural_transition_type,
                    "tracking_systems": tracking_system,
                    "tracked_points": tracked_point,
                }
            )

        else:
            # Handle cases where there is not enough stationary data
            # Find indices where the product of consecutive changes sign, indicating a change in direction
            iZeroCr = np.where(
                (self.gyro_mediolateral[:-1] * self.gyro_mediolateral[1:]) < 0
            )[0]

            # Calculate the difference between consecutive values
            gyro_y_diff = np.diff(self.gyro_mediolateral, axis=0)

            # Initialize arrays to store left and right indices for each local peak
            # Initialize left side indices with ones
            self.left_side = np.ones_like(self.local_peaks)

            # Initialize right side indices with length of gyro data
            self.right_side = len(self.gyro_mediolateral) * np.ones_like(
                self.local_peaks
            )

            # Loop through each local peak
            for i in range(len(self.local_peaks)):
                # Get the index of the current local peak
                postural_transitions = self.local_peaks[i]

                # Calculate distances to all zero-crossing points relative to the peak
                dist2peak = iZeroCr - postural_transitions

                # Extract distances to zero-crossing points on the left side of the peak
                dist2peak_left_side = dist2peak[dist2peak < 0]

                # Extract distances to zero-crossing points on the right side of the peak
                dist2peak_right_side = dist2peak[dist2peak > 0]

                # Iterate over distances to zero-crossing points on the left side of the peak (in reverse order)
                for j in range(len(dist2peak_left_side) - 1, -1, -1):
                    # Check if slope is down and the left side not too close to the peak (more than 200ms)
                    if gyro_y_diff[
                        postural_transitions + dist2peak_left_side[j]
                    ] < 0 and -dist2peak_left_side[j] > (0.2 * sampling_freq_Hz):
                        if j > 0:
                            # If the left side peak is far enough or small enough
                            if (
                                dist2peak_left_side[j] - dist2peak_left_side[j - 1]
                            ) >= (0.2 * sampling_freq_Hz) or (
                                tilt_angle_deg[
                                    postural_transitions + dist2peak_left_side[j - 1]
                                ]
                                - tilt_angle_deg[
                                    postural_transitions + dist2peak_left_side[j]
                                ]
                            ) > 1:
                                # Store the index of the left side
                                self.left_side[i] = (
                                    postural_transitions + dist2peak_left_side[j]
                                )
                                break
                            else:
                                self.left_side[i] = (
                                    postural_transitions + dist2peak_left_side[j]
                                )
                                break
                for j in range(len(dist2peak_right_side)):
                    if gyro_y_diff[
                        postural_transitions + dist2peak_right_side[j]
                    ] < 0 and dist2peak_right_side[j] > (0.2 * sampling_freq_Hz):
                        self.right_side[i] = (
                            postural_transitions + dist2peak_right_side[j]
                        )
                        break

                # Calculate postural transition angle
                self.postural_transition_angle = np.abs(
                    tilt_angle_deg[self.local_peaks] - tilt_angle_deg[self.left_side]
                )
                if self.left_side[0] == 1:
                    self.postural_transition_angle[0] = abs(
                        tilt_angle_deg[self.local_peaks[0]]
                        - tilt_angle_deg[self.right_side[0]]
                    )

                # Calculate duration of each postural transition
                duration = (self.right_side - self.left_side) / sampling_freq_Hz

                # Convert peak times to integers
                postural_transition_onset = time[self.local_peaks]

        # Remove too small postural transitions
        i = self.postural_transition_angle >= self.min_postural_transition_angle_deg
        postural_transition_onset = self.left_side[i] / sampling_freq_Hz
        duration = duration[i]

        # Check if dt_data is provided for datetime conversion
        if dt_data is not None:
            # Convert onset times to datetime format
            starting_datetime = dt_data.iloc[
                0
            ]  # Assuming dt_data is aligned with the signal data
            postural_transition_onset = [
                starting_datetime + pd.Timedelta(seconds=t)
                for t in postural_transition_onset
            ]

        # Create a DataFrame with postural transition information
        postural_transitions_ = pd.DataFrame(
            {
                "onset": postural_transition_onset,
                "duration": duration,
                "event_type": "postural transition",
                "tracking_systems": tracking_system,
                "tracked_points": tracked_point,
            }
        )

        # Assign the DataFrame to the 'postural_transitions_' attribute
        self.postural_transitions_ = postural_transitions_

        # If Plot_results set to true
        if plot_results:
            viz_utils.plot_postural_transitions(
                accel,
                self.gyro,
                accel_unit,
                gyro_unit,
                postural_transitions_,
                sampling_freq_Hz,
            )

        # Return an instance of the class
        return self

    def spatio_temporal_parameters(self) -> None:
        """
        Extracts spatio-temporal parameters of the detected postural transitions.

        Returns:
            The spatio-temporal parameter information is stored in the 'spatio_temporal_parameters'
            attribute, which is a pandas DataFrame as:
                - type_of_postural_transition: Type of postural transition which is either "sit to stand" or "stand to sit".
                - angel_of_postural_transition: Angle of the postural transition in degrees.
                - maximum_flexion_velocity: Maximum flexion velocity in deg/s.
                - maximum_extension_velocity: Maximum extension velocity in deg/s.
        """
        if self.postural_transitions_ is None:
            raise ValueError(
                "No postural transition detected. Please run the detect method first."
            )

        # Initialize list for postural transition types
        postural_transition_type = []

        # Distinguish between different types of postural transitions
        for i in range(len(self.local_peaks)):
            gyro_temp = self.gyro_mediolateral[self.left_side[i] : self.right_side[i]]
            min_peak = np.min(gyro_temp)
            max_peak = np.max(gyro_temp)
            if (abs(max_peak) - abs(min_peak)) > 0.5:
                postural_transition_type.append("sit to stand")
            else:
                postural_transition_type.append("stand to sit")

        # Postural transition type and angle determination
        i = self.postural_transition_angle >= self.min_postural_transition_angle_deg
        postural_transition_type = [
            postural_transition_type[idx]
            for idx, val in enumerate(postural_transition_type)
            if i[idx]
        ]
        self.postural_transition_angle = self.postural_transition_angle[i]

        # Calculate maximum flexion velocity and maximum extension velocity
        flexion_max_vel = np.zeros_like(self.local_peaks)
        extension_max_vel = np.zeros_like(self.local_peaks)

        for id in range(len(self.local_peaks)):
            flexion_max_vel[id] = max(
                abs(self.gyro_mediolateral[self.left_side[id] : self.local_peaks[id]])
            )
            extension_max_vel[id] = max(
                abs(self.gyro_mediolateral[self.local_peaks[id] : self.right_side[id]])
            )

        # Filter the velocities based on valid indices
        flexion_max_vel = [
            flexion_max_vel[idx] for idx, val in enumerate(flexion_max_vel) if i[idx]
        ]

        # Calculate maximum extension velocity
        extension_max_vel = [
            extension_max_vel[idx]
            for idx, val in enumerate(extension_max_vel)
            if i[idx]
        ]

        # Create a DataFrame with the calculated spatio-temporal parameters
        self.parameters_ = pd.DataFrame(
            {
                "type_of_postural_transition": postural_transition_type,
                "angle_of_postural_transition": self.postural_transition_angle,
                "maximum_flexion_velocity": flexion_max_vel,
                "maximum_extension_velocity": extension_max_vel,
            }
        )

        # Set the index name to 'postural transition id'
        self.parameters_.index.name = "postural transition id"

__init__(cutoff_freq_hz=5.0, thr_accel_var=0.05, thr_gyro_var=0.1, min_postural_transition_angle_deg=15.0)

Initializes the PhamPosturalTransitionDetection instance.

Parameters:

Name Type Description Default
cutoff_freq_hz float

Cutoff frequency for low-pass Butterworth filer. Default is 5.0.

5.0
thr_accel_var float

Threshold value for identifying periods where the acceleartion variance is low. Default is 0.5.

0.05
thr_gyro_var float

Threshold value for identifying periods where the gyro variance is low. Default is 2e-4.

0.1
min_turn_angle_deg float

Minimum angle which is considered as postural transition in degrees. Default is 15.0.

required
Source code in kielmat/modules/ptd/_pham.py
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def __init__(
    self,
    cutoff_freq_hz: float = 5.0,
    thr_accel_var: float = 0.05,
    thr_gyro_var: float = 10e-2,
    min_postural_transition_angle_deg: float = 15.0,
):
    """
    Initializes the PhamPosturalTransitionDetection instance.

    Args:
        cutoff_freq_hz (float, optional): Cutoff frequency for low-pass Butterworth filer. Default is 5.0.
        thr_accel_var (float): Threshold value for identifying periods where the acceleartion variance is low. Default is 0.5.
        thr_gyro_var (float): Threshold value for identifying periods where the gyro variance is low. Default is 2e-4.
        min_turn_angle_deg (float): Minimum angle which is considered as postural transition in degrees. Default is 15.0.
    """
    self.cutoff_freq_hz = cutoff_freq_hz
    self.thr_accel_var = thr_accel_var
    self.thr_gyro_var = thr_gyro_var
    self.min_postural_transition_angle_deg = min_postural_transition_angle_deg

detect(data, gyro_mediolateral, accel_unit, gyro_unit, sampling_freq_Hz, dt_data=None, tracking_system=None, tracked_point=None, plot_results=False)

Detects postural transitions based on the input accelerometer and gyro data.

Parameters:

Name Type Description Default
data DataFrame

Input accelerometer and gyro data (N, 6) for x, y, and z axes.

required
gyro_mediolateral str

It corresponds to the gyroscope component representing movements or rotations in the mediolateral direction. This could correspond to rotation around the y-axis in your coordinate system.

required
accel_unit str

Unit of acceleration data.

required
gyro_unit str

Unit of gyro data.

required
sampling_freq_Hz float

Sampling frequency of the input data.

required
dt_data Series

Original datetime in the input data. If original datetime is provided, the output onset will be based on that.

None
tracking_system str

Tracking systems.

None
tracked_point str

Tracked points on the body.

None
plot_results bool

If True, generates a plot. Default is False.

False

Returns:

Type Description
PhamPosturalTransitionDetection

The postural transition information is stored in the 'postural_transitions_' attribute,

PhamPosturalTransitionDetection

which is a pandas DataFrame in BIDS format with the following columns: - onset: Start time of the postural transition in second. - duration: Duration of the postural transition in second. - event_type: Type of the event which is postural transition. - tracking_systems: Name of the tracking systems. - tracked_points: Name of the tracked points on the body.

Source code in kielmat/modules/ptd/_pham.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
def detect(
    self,
    data: pd.DataFrame,
    gyro_mediolateral: str,
    accel_unit: str,
    gyro_unit: str,
    sampling_freq_Hz: float,
    dt_data: Optional[pd.Series] = None,
    tracking_system: Optional[str] = None,
    tracked_point: Optional[str] = None,
    plot_results: bool = False,
) -> "PhamPosturalTransitionDetection":
    """
    Detects postural transitions based on the input accelerometer and gyro data.

    Args:
        data (pd.DataFrame): Input accelerometer and gyro data (N, 6) for x, y, and z axes.
        gyro_mediolateral (str): It corresponds to the gyroscope component representing movements or rotations in the mediolateral direction. This could correspond to rotation around the y-axis in your coordinate system.
        accel_unit (str): Unit of acceleration data.
        gyro_unit (str): Unit of gyro data.
        sampling_freq_Hz (float): Sampling frequency of the input data.
        dt_data (pd.Series, optional): Original datetime in the input data. If original datetime is provided, the output onset will be based on that.
        tracking_system (str, optional): Tracking systems.
        tracked_point (str, optional): Tracked points on the body.
        plot_results (bool, optional): If True, generates a plot. Default is False.

    Returns:
        The postural transition information is stored in the 'postural_transitions_' attribute,
        which is a pandas DataFrame in BIDS format with the following columns:
            - onset: Start time of the postural transition in second.
            - duration: Duration of the postural transition in second.
            - event_type: Type of the event which is postural transition.
            - tracking_systems: Name of the tracking systems.
            - tracked_points: Name of the tracked points on the body.
    """
    # check if dt_data is a pandas Series with datetime values
    if dt_data is not None and (
        not isinstance(dt_data, pd.Series)
        or not pd.api.types.is_datetime64_any_dtype(dt_data)
    ):
        raise ValueError("dt_data must be a pandas Series with datetime values")

    # check if dt_data is provided and if it is a series with the same length as data
    if dt_data is not None and len(dt_data) != len(data):
        raise ValueError("dt_data must be a series with the same length as data")

    # Check if data is a DataFrame
    if not isinstance(data, pd.DataFrame):
        raise ValueError("Input data must be a pandas DataFrame")

    # Error handling for invalid input data
    if not isinstance(data, pd.DataFrame) or data.shape[1] != 6:
        raise ValueError(
            "Input accelerometer and gyro data must be a DataFrame with 6 columns for x, y, and z axes."
        )

    # Check if sampling frequency is positive
    if sampling_freq_Hz <= 0:
        raise ValueError("Sampling frequency must be positive")

    # Check if plot_results is a boolean
    if not isinstance(plot_results, bool):
        raise ValueError("plot_results must be a boolean value")

    # Calculate sampling period
    sampling_period = 1 / sampling_freq_Hz

    # Identify the columns in the DataFrame that correspond to accelerometer data
    accel_columns = [col for col in data.columns if "ACCEL" in col]

    # Identify the columns in the DataFrame that correspond to gyroscope data
    gyro_columns = [col for col in data.columns if "GYRO" in col]

    # Ensure that there are exactly 3 columns each for accelerometer and gyroscope data
    if len(accel_columns) != 3 or len(gyro_columns) != 3:
        raise ValueError(
            "Data must contain 3 accelerometer and 3 gyroscope columns."
        )

    # Select acceleration data and convert it to numpy array format
    accel = data[accel_columns].copy().to_numpy()

    # Select gyro data and convert it to numpy array format
    gyro = data[gyro_columns].copy().to_numpy()
    self.gyro = gyro

    # Extract mediolateral gyro data using the specified index
    self.gyro_mediolateral = data[gyro_mediolateral].to_numpy()

    # Convert variations of acceleration unit to "m/s^2"
    if accel_unit in ["meters/s^2", "meter/s^2"]:
        accel_unit = "m/s^2"

    # Check unit of acceleration data if it is in "m/s^2" or "g"
    if accel_unit == "m/s^2":
        # Convert acceleration data from "m/s^2" to "g"
        accel /= 9.81
    elif accel_unit in ["g", "G"]:
        pass  # No conversion needed
    else:
        raise ValueError(
            "Invalid unit for acceleration data. Must be 'm/s^2' or 'g'"
        )

    # Check unit of gyro data if it is in deg/s or rad/s
    if gyro_unit in ["rad/s", "radians per second"]:
        # Convert gyro data from rad/s to deg/s (if not already is in deg/s)
        self.gyro = np.rad2deg(self.gyro)
        self.gyro_mediolateral = np.rad2deg(self.gyro_mediolateral)

    # Check different variations of gyro unit
    elif gyro_unit in ["degrees per second", "°/s"]:
        gyro_unit = "deg/s"

    elif gyro_unit == "deg/s":
        pass  # Gyro data is already in deg/s
    else:
        raise ValueError("Invalid unit for gyro data. Must be 'deg/s' or 'rad/s'")

    # Calculate timestamps to use in next calculation
    time = np.arange(1, len(accel[:, 0]) + 1) * sampling_period

    # Estimate tilt angle in deg
    tilt_angle_deg = preprocessing.tilt_angle_estimation(
        data=self.gyro_mediolateral, sampling_frequency_hz=sampling_freq_Hz
    )

    # Convert tilt angle to rad
    tilt_angle_rad = np.deg2rad(tilt_angle_deg)

    # Calculate sine of the tilt angle in radians
    tilt_sin = np.sin(tilt_angle_rad)

    # Apply wavelet decomposition with level of 3
    tilt_dwt_3 = preprocessing.wavelet_decomposition(
        data=tilt_sin, level=3, wavetype="coif5"
    )

    # Apply wavelet decomposition with level of 10
    tilt_dwt_10 = preprocessing.wavelet_decomposition(
        data=tilt_sin, level=10, wavetype="coif5"
    )

    # Calculate difference
    tilt_dwt = tilt_dwt_3 - tilt_dwt_10

    # Find peaks in denoised tilt signal
    # Peaks of the tilt_dwt signal with magnitude and prominence >0.2 were defined as postural transition events
    # Separate positive and negative peak detection
    positive_peaks, _ = scipy.signal.find_peaks(
        tilt_dwt, height=-0.2, prominence=0.2
    )
    negative_peaks, _ = scipy.signal.find_peaks(
        tilt_dwt, height=0.2, prominence=0.2
    )

    # Merge, sort, and eliminate redundant peaks
    self.local_peaks = np.unique(np.concatenate((positive_peaks, negative_peaks)))

    # Calculate the norm of acceleration
    accel_norm = np.sqrt(accel[:, 0] ** 2 + accel[:, 1] ** 2 + accel[:, 2] ** 2)

    # Calculate absolute value of the acceleration signal
    accel_norm = np.abs(accel_norm)

    # Detect stationary parts of the signal based on the deifned threshold
    stationary_1 = accel_norm < self.thr_accel_var
    stationary_1 = (stationary_1).astype(int)

    # Compute the variance of the moving window acceleration
    accel_var = preprocessing.moving_var(data=accel_norm, window=sampling_freq_Hz)

    # Calculate stationary_2 from acceleration variance
    stationary_2 = (accel_var <= self.thr_gyro_var).astype(int)

    # Convert unit of the gyro data from deg/s to rad/s
    self.gyro = np.deg2rad(self.gyro)

    # Calculate stationary of gyro variance
    gyro_norm = np.sqrt(
        self.gyro[:, 0] ** 2 + self.gyro[:, 1] ** 2 + self.gyro[:, 2] ** 2
    )

    # Compute the variance of the moving window of gyro
    gyro_var = preprocessing.moving_var(data=gyro_norm, window=sampling_freq_Hz)

    # Calculate stationary of gyro variance
    stationary_3 = (gyro_var <= self.thr_gyro_var).astype(int)

    # Perform stationarity checks
    stationary = stationary_1 & stationary_2 & stationary_3

    # Remove consecutive True values in the stationary array
    for i in range(len(stationary) - 1):
        if stationary[i] == 1:
            if stationary[i + 1] == 0:
                stationary[i] = 0

    # Set initial period and check if enough stationary data is available
    # Stationary periods are defined as the episodes when the lower back of the participant was almost not moving and not rotating.
    # The thresholds are determined based on the training data set.
    init_period = 0.1

    if (
        np.sum(stationary[: int(sampling_freq_Hz * init_period)]) >= 200
    ):  # If the process is stationary in the first 2s
        # If there is enough stationary data, perform sensor fusion using accelerometer and gyro data
        (
            postural_transition_onset,
            postural_transition_type,
            self.postural_transition_angle,
            duration,
            self.flexion_max_vel,
            self.extension_max_vel,
        ) = preprocessing.process_postural_transitions_stationary_periods(
            time,
            accel,
            self.gyro,
            stationary,
            tilt_angle_deg,
            sampling_period,
            sampling_freq_Hz,
            init_period,
            self.local_peaks,
        )

        # Create a DataFrame with postural transition information
        postural_transitions_ = pd.DataFrame(
            {
                "onset": postural_transition_onset,
                "duration": duration,
                "event_type": postural_transition_type,
                "tracking_systems": tracking_system,
                "tracked_points": tracked_point,
            }
        )

    else:
        # Handle cases where there is not enough stationary data
        # Find indices where the product of consecutive changes sign, indicating a change in direction
        iZeroCr = np.where(
            (self.gyro_mediolateral[:-1] * self.gyro_mediolateral[1:]) < 0
        )[0]

        # Calculate the difference between consecutive values
        gyro_y_diff = np.diff(self.gyro_mediolateral, axis=0)

        # Initialize arrays to store left and right indices for each local peak
        # Initialize left side indices with ones
        self.left_side = np.ones_like(self.local_peaks)

        # Initialize right side indices with length of gyro data
        self.right_side = len(self.gyro_mediolateral) * np.ones_like(
            self.local_peaks
        )

        # Loop through each local peak
        for i in range(len(self.local_peaks)):
            # Get the index of the current local peak
            postural_transitions = self.local_peaks[i]

            # Calculate distances to all zero-crossing points relative to the peak
            dist2peak = iZeroCr - postural_transitions

            # Extract distances to zero-crossing points on the left side of the peak
            dist2peak_left_side = dist2peak[dist2peak < 0]

            # Extract distances to zero-crossing points on the right side of the peak
            dist2peak_right_side = dist2peak[dist2peak > 0]

            # Iterate over distances to zero-crossing points on the left side of the peak (in reverse order)
            for j in range(len(dist2peak_left_side) - 1, -1, -1):
                # Check if slope is down and the left side not too close to the peak (more than 200ms)
                if gyro_y_diff[
                    postural_transitions + dist2peak_left_side[j]
                ] < 0 and -dist2peak_left_side[j] > (0.2 * sampling_freq_Hz):
                    if j > 0:
                        # If the left side peak is far enough or small enough
                        if (
                            dist2peak_left_side[j] - dist2peak_left_side[j - 1]
                        ) >= (0.2 * sampling_freq_Hz) or (
                            tilt_angle_deg[
                                postural_transitions + dist2peak_left_side[j - 1]
                            ]
                            - tilt_angle_deg[
                                postural_transitions + dist2peak_left_side[j]
                            ]
                        ) > 1:
                            # Store the index of the left side
                            self.left_side[i] = (
                                postural_transitions + dist2peak_left_side[j]
                            )
                            break
                        else:
                            self.left_side[i] = (
                                postural_transitions + dist2peak_left_side[j]
                            )
                            break
            for j in range(len(dist2peak_right_side)):
                if gyro_y_diff[
                    postural_transitions + dist2peak_right_side[j]
                ] < 0 and dist2peak_right_side[j] > (0.2 * sampling_freq_Hz):
                    self.right_side[i] = (
                        postural_transitions + dist2peak_right_side[j]
                    )
                    break

            # Calculate postural transition angle
            self.postural_transition_angle = np.abs(
                tilt_angle_deg[self.local_peaks] - tilt_angle_deg[self.left_side]
            )
            if self.left_side[0] == 1:
                self.postural_transition_angle[0] = abs(
                    tilt_angle_deg[self.local_peaks[0]]
                    - tilt_angle_deg[self.right_side[0]]
                )

            # Calculate duration of each postural transition
            duration = (self.right_side - self.left_side) / sampling_freq_Hz

            # Convert peak times to integers
            postural_transition_onset = time[self.local_peaks]

    # Remove too small postural transitions
    i = self.postural_transition_angle >= self.min_postural_transition_angle_deg
    postural_transition_onset = self.left_side[i] / sampling_freq_Hz
    duration = duration[i]

    # Check if dt_data is provided for datetime conversion
    if dt_data is not None:
        # Convert onset times to datetime format
        starting_datetime = dt_data.iloc[
            0
        ]  # Assuming dt_data is aligned with the signal data
        postural_transition_onset = [
            starting_datetime + pd.Timedelta(seconds=t)
            for t in postural_transition_onset
        ]

    # Create a DataFrame with postural transition information
    postural_transitions_ = pd.DataFrame(
        {
            "onset": postural_transition_onset,
            "duration": duration,
            "event_type": "postural transition",
            "tracking_systems": tracking_system,
            "tracked_points": tracked_point,
        }
    )

    # Assign the DataFrame to the 'postural_transitions_' attribute
    self.postural_transitions_ = postural_transitions_

    # If Plot_results set to true
    if plot_results:
        viz_utils.plot_postural_transitions(
            accel,
            self.gyro,
            accel_unit,
            gyro_unit,
            postural_transitions_,
            sampling_freq_Hz,
        )

    # Return an instance of the class
    return self

spatio_temporal_parameters()

Extracts spatio-temporal parameters of the detected postural transitions.

Returns:

Type Description
None

The spatio-temporal parameter information is stored in the 'spatio_temporal_parameters'

None

attribute, which is a pandas DataFrame as: - type_of_postural_transition: Type of postural transition which is either "sit to stand" or "stand to sit". - angel_of_postural_transition: Angle of the postural transition in degrees. - maximum_flexion_velocity: Maximum flexion velocity in deg/s. - maximum_extension_velocity: Maximum extension velocity in deg/s.

Source code in kielmat/modules/ptd/_pham.py
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
def spatio_temporal_parameters(self) -> None:
    """
    Extracts spatio-temporal parameters of the detected postural transitions.

    Returns:
        The spatio-temporal parameter information is stored in the 'spatio_temporal_parameters'
        attribute, which is a pandas DataFrame as:
            - type_of_postural_transition: Type of postural transition which is either "sit to stand" or "stand to sit".
            - angel_of_postural_transition: Angle of the postural transition in degrees.
            - maximum_flexion_velocity: Maximum flexion velocity in deg/s.
            - maximum_extension_velocity: Maximum extension velocity in deg/s.
    """
    if self.postural_transitions_ is None:
        raise ValueError(
            "No postural transition detected. Please run the detect method first."
        )

    # Initialize list for postural transition types
    postural_transition_type = []

    # Distinguish between different types of postural transitions
    for i in range(len(self.local_peaks)):
        gyro_temp = self.gyro_mediolateral[self.left_side[i] : self.right_side[i]]
        min_peak = np.min(gyro_temp)
        max_peak = np.max(gyro_temp)
        if (abs(max_peak) - abs(min_peak)) > 0.5:
            postural_transition_type.append("sit to stand")
        else:
            postural_transition_type.append("stand to sit")

    # Postural transition type and angle determination
    i = self.postural_transition_angle >= self.min_postural_transition_angle_deg
    postural_transition_type = [
        postural_transition_type[idx]
        for idx, val in enumerate(postural_transition_type)
        if i[idx]
    ]
    self.postural_transition_angle = self.postural_transition_angle[i]

    # Calculate maximum flexion velocity and maximum extension velocity
    flexion_max_vel = np.zeros_like(self.local_peaks)
    extension_max_vel = np.zeros_like(self.local_peaks)

    for id in range(len(self.local_peaks)):
        flexion_max_vel[id] = max(
            abs(self.gyro_mediolateral[self.left_side[id] : self.local_peaks[id]])
        )
        extension_max_vel[id] = max(
            abs(self.gyro_mediolateral[self.local_peaks[id] : self.right_side[id]])
        )

    # Filter the velocities based on valid indices
    flexion_max_vel = [
        flexion_max_vel[idx] for idx, val in enumerate(flexion_max_vel) if i[idx]
    ]

    # Calculate maximum extension velocity
    extension_max_vel = [
        extension_max_vel[idx]
        for idx, val in enumerate(extension_max_vel)
        if i[idx]
    ]

    # Create a DataFrame with the calculated spatio-temporal parameters
    self.parameters_ = pd.DataFrame(
        {
            "type_of_postural_transition": postural_transition_type,
            "angle_of_postural_transition": self.postural_transition_angle,
            "maximum_flexion_velocity": flexion_max_vel,
            "maximum_extension_velocity": extension_max_vel,
        }
    )

    # Set the index name to 'postural transition id'
    self.parameters_.index.name = "postural transition id"