Skip to content

Turn Detection

Turn Detection (Pham)

This algorithm aims to detect turns using accelerometer and gyroscope data collected from a lower back inertial measurement unit (IMU) sensor.

The core of the algorithm lies in the detect method, where turns are identified using accelerometer and gyroscope data. The method first processes the gyro data, converting it to rad/s if needed and computing the variance to identify periods of low variance, which may indicate bias. It then calculates the gyro bias and subtracts it from the original gyro signal to remove any biases. Next, the yaw angle is computed by integrating the vertical component of the gyro data, and zero-crossings indices are found to detect turns. Then, turns are identified based on significant changes in the yaw angle.

The algorithm also accounts for hesitations, which are brief pauses or fluctuations in the signal that may occur within a turn. Hesitations are marked based on specific conditions related to the magnitude and continuity of the yaw angle changes.

Then, the detected turns are characterized by their onset and duration. Turns with angles equal to or greater than 90 degrees and durations between 0.5 and 10 seconds are selected for further analysis. Finally, the detected turns along with their characteristics (onset, duration, etc.) are stored in a pandas DataFrame (turns_ attribute).

In addition, spatial-temporal parameters are calculated using detected turns and their characteristics by the spatio_temporal_parameters method. As a return, the turn id along with its spatial-temporal parameters including direction (left or right), angle of turn and peak angular velocity are stored in a pandas DataFrame (parameters_ attribute).

Optionally, if plot_results is set to True, the algorithm generates a plot visualizing the accelerometer and gyroscope data alongside the detected turns. This visualization aids in the qualitative assessment of the algorithm's performance and provides insights into the dynamics of the detected turns.

Methods:

Name Description
detect

Detects turns using accelerometer and gyro signals.

Returns: PhamTurnDetection: an instance of the class with the detected turns stored in the 'turns_' attribute.

spatio_temporal_parameters

Extracts spatio-temporal parameters of the detected turns.

Examples:

>>> pham = PhamTurnDetection()
>>> pham.detect(
        accel_data=accel_data,
        gyro_data=gyro_data,
        gyro_vertical="pelvis_GYRO_x",
        sampling_freq_Hz=200.0,
        tracking_system="imu",
        tracked_point="LowerBack",
        plot_results=False
        )
>>> print(pham.turns_)
        onset   duration   event_type   tracking_systems    tracked_points
    0   4.04    3.26       turn         imu                 LowerBack
    1   9.44    3.35       turn         imu                 LowerBack
>>> pham.spatio_temporal_parameters()
>>> print(pham.parameters_)
        direction_of_turn   angle_of_turn   peak_angular_velocity
    0   left               -197.55          159.45
    1   right               199.69          144.67
References

[1] Pham et al. (2017). Algorithm for Turning Detection and Analysis Validated under Home-Like Conditions... https://doi.org/10.3389/fneur.2017.00135

Source code in kielmat/modules/td/_pham.py
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
class PhamTurnDetection:
    """
    This algorithm aims to detect turns using accelerometer and gyroscope data collected from a lower back
    inertial measurement unit (IMU) sensor.

    The core of the algorithm lies in the detect method, where turns are identified using accelerometer and
    gyroscope data. The method first processes the gyro data, converting it to rad/s if needed and computing
    the variance to identify periods of low variance, which may indicate bias. It then calculates the gyro bias
    and subtracts it from the original gyro signal to remove any biases. Next, the yaw angle is computed by
    integrating the vertical component of the gyro data, and zero-crossings indices are found to detect turns.
    Then, turns are identified based on significant changes in the yaw angle.

    The algorithm also accounts for hesitations, which are brief pauses or fluctuations in the signal that may
    occur within a turn. Hesitations are marked based on specific conditions related to the magnitude and
    continuity of the yaw angle changes.

    Then, the detected turns are characterized by their onset and duration. Turns with angles equal to or greater
    than 90 degrees and durations between 0.5 and 10 seconds are selected for further analysis. Finally, the detected
    turns along with their characteristics (onset, duration, etc.) are stored in a pandas DataFrame
    (turns_ attribute).

    In addition, spatial-temporal parameters are calculated using detected turns and their characteristics by
    the spatio_temporal_parameters method. As a return, the turn id along with its spatial-temporal parameters
    including direction (left or right), angle of turn and peak angular velocity are stored in a pandas DataFrame
    (parameters_ attribute).

    Optionally, if plot_results is set to True, the algorithm generates a plot visualizing the accelerometer
    and gyroscope data alongside the detected turns. This visualization aids in the qualitative assessment of
    the algorithm's performance and provides insights into the dynamics of the detected turns.

    Methods:
        detect(accel_data, gyro_data, gyro_vertical, sampling_freq_Hz, dt_data, tracking_system, tracked_point, plot_results):
            Detects turns using accelerometer and gyro signals.

            Returns:
                PhamTurnDetection: an instance of the class with the detected turns
                stored in the 'turns_' attribute.

        spatio_temporal_parameters():
            Extracts spatio-temporal parameters of the detected turns.

    Examples:
        >>> pham = PhamTurnDetection()
        >>> pham.detect(
                accel_data=accel_data,
                gyro_data=gyro_data,
                gyro_vertical="pelvis_GYRO_x",
                sampling_freq_Hz=200.0,
                tracking_system="imu",
                tracked_point="LowerBack",
                plot_results=False
                )
        >>> print(pham.turns_)
                onset   duration   event_type   tracking_systems    tracked_points
            0   4.04    3.26       turn         imu                 LowerBack
            1   9.44    3.35       turn         imu                 LowerBack

        >>> pham.spatio_temporal_parameters()
        >>> print(pham.parameters_)
                direction_of_turn   angle_of_turn   peak_angular_velocity
            0   left               -197.55          159.45
            1   right               199.69          144.67

    References:
        [1] Pham et al. (2017). Algorithm for Turning Detection and Analysis Validated under Home-Like Conditions... https://doi.org/10.3389/fneur.2017.00135
    """

    def __init__(
        self,
        thr_gyro_var: float = 2e-4,
        min_turn_duration_s: float = 0.5,
        max_turn_duration_s: float = 10,
        min_turn_angle_deg: float = 90,
    ):
        """
        Initializes the PhamTurnDetection instance.

        Args:
            thr_gyro_var (float): Threshold value for identifying periods where the variance is low. Default is 2e-4.
            min_turn_duration_s (float): Minimum duration of a turn in seconds. Default is 0.5.
            max_turn_duration_s (float): Maximum duration of a turn in seconds. Default is 10.
            min_turn_angle_deg (float): Minimum angle of a turn in degrees. Default is 90.
        """
        self.thr_gyro_var = thr_gyro_var
        self.min_turn_duration_s = min_turn_duration_s
        self.max_turn_duration_s = max_turn_duration_s
        self.min_turn_angle_deg = min_turn_angle_deg

    def detect(
        self,
        accel_data: pd.DataFrame,
        gyro_data: pd.DataFrame,
        gyro_vertical: str,
        sampling_freq_Hz: float,
        dt_data: Optional[pd.Series] = None,
        tracking_system: Optional[str] = None,
        tracked_point: Optional[str] = None,
        plot_results: bool = False,
    ) -> pd.DataFrame:
        """
        Detects truns based on the input accelerometer and gyro data.

        Args:
            accel_data (pd.DataFrame): Input accelerometer data (N, 3) for x, y, and z axes.
            gyro_data (pd.DataFrame): Input gyro data (N, 3) for x, y, and z axes.
            gyro_vertical (str): The column name that corresponds to the vertical component gyro.
            sampling_freq_Hz (float): Sampling frequency of the input data in Hz.
            dt_data (pd.Series, optional): Original datetime in the input data. If original datetime is provided, the output onset will be based on that.
            tracking_system (str, optional): Tracking systems.
            tracked_point (str, optional): Tracked points on the body.
            plot_results (bool, optional): If True, generates a plot. Default is False.

        Returns:
            The turns information is stored in the 'turns_' attribute, which is a pandas DataFrame in BIDS format with the following information:

                - onset: Start time of the turn in second.
                - duration: Duration of the turn in second.
                - event_type: Type of the event (turn).
                - tracking_systems: Name of the tracking systems.
                - tracked_points: Name of the tracked points on the body.
        """
        # check if dt_data is a pandas Series with datetime values
        if dt_data is not None:
            # Ensure dt_data is a pandas Series
            if not isinstance(dt_data, pd.Series):
                raise ValueError("dt_data must be a pandas Series with datetime values")

            # Ensure dt_data has datetime values
            if not pd.api.types.is_datetime64_any_dtype(dt_data):
                raise ValueError("dt_data must be a pandas Series with datetime values")

            # Ensure dt_data has the same length as input data
            if len(dt_data) != len(accel_data):
                raise ValueError(
                    "dt_data must be a series with the same length as data"
                )

        # Check if data is a DataFrame
        if not isinstance(accel_data, pd.DataFrame):
            raise ValueError("Acceleration data must be a pandas DataFrame")

        if not isinstance(gyro_data, pd.DataFrame):
            raise ValueError("Gyro data must be a pandas DataFrame")

        # Error handling for invalid input data
        if not isinstance(accel_data, pd.DataFrame) or accel_data.shape[1] != 3:
            raise ValueError(
                "Accelerometer data must be a DataFrame with 3 columns for x, y, and z axes."
            )

        if not isinstance(gyro_data, pd.DataFrame) or gyro_data.shape[1] != 3:
            raise ValueError(
                "Gyro data must be a DataFrame with 3 columns for x, y, and z axes."
            )

        # Check if sampling frequency is positive
        if sampling_freq_Hz <= 0:
            raise ValueError("Sampling frequency must be positive")

        # Check if plot_results is a boolean
        if not isinstance(plot_results, bool):
            raise ValueError("plot_results must be a boolean value")

        # Select acceleration data and convert it to numpy array format
        accel = accel_data.to_numpy()

        # Select gyro data and convert it to numpy array format
        gyro = gyro_data.to_numpy()
        self.gyro = gyro

        # Convert acceleration data from "m/s^2" to "g"
        accel /= 9.81

        # Convert gyro data from deg/s to rad/s
        gyro = np.deg2rad(gyro)

        # Compute the variance of the moving window of gyro signal
        gyro_vars = []

        for i in range(3):
            gyro_var = preprocessing.moving_var(gyro[:, i], sampling_freq_Hz)
            gyro_vars.append(gyro_var)
        gyro_vars = np.array(gyro_vars)
        gyro_vars = np.mean(gyro_vars, axis=0)

        # Find bias period
        bias_periods = np.where(gyro_vars < self.thr_gyro_var)[0]

        # Remove the last sampling_freq_Hz samples from bias_periods to avoid edge effects
        bias_periods = bias_periods[bias_periods < (len(gyro_vars) - sampling_freq_Hz)]

        # Compute gyro bias (mean of gyro signal during bias periods)
        self.gyro_bias = np.mean(gyro[bias_periods, :], axis=0)

        # Subtract gyro bias from the original gyro signal
        gyro_unbiased = gyro - self.gyro_bias

        # Get the index of the vertical component of gyro from data
        gyro_vertical_index = [
            i for i, col in enumerate(gyro_data) if gyro_vertical in col
        ][0]

        # Integrate x component of the gyro signal to get yaw angle (also convert gyro unit to deg/s)
        self.yaw = (
            scipy.integrate.cumulative_trapezoid(
                np.rad2deg(gyro_unbiased[:, gyro_vertical_index]), initial=0
            )
            / sampling_freq_Hz
        )

        # Find zero-crossings indices
        self.index_zero_crossings = np.where(
            np.diff(np.sign(gyro[:, gyro_vertical_index]))
        )[0]

        # Calculate turns from yaw angle
        self.turns_all = (
            self.yaw[self.index_zero_crossings[1:]]
            - self.yaw[self.index_zero_crossings[:-1]]
        )

        # Marks hesitations in the signal
        # Initialize an array to mark hesitations
        hesitation_markers = np.zeros(len(self.turns_all))

        # Loop through each index in the turns_all array
        for i in range(len(self.turns_all)):
            # Check if the absolute value of the turn angle at index i is greater than or equal to 10
            if abs(self.turns_all[i]) >= 10:
                # Loop to search for potential hesitations
                for j in range(i + 1, len(self.turns_all)):
                    # Check if the difference between current index and i exceeds 4, or if the time between zero crossings exceeds half a second
                    if (j - i) > 4 or (
                        self.index_zero_crossings[j] - self.index_zero_crossings[i + 1]
                        > (sampling_freq_Hz / 2)
                    ):
                        # Break the loop if the conditions for hesitation are not met
                        break
                    else:
                        # Check conditions for hesitation:
                        # - Absolute values of both turns are greater than or equal to 10
                        # - The relative change in yaw angle is less than 20% of the minimum turn angle
                        # - The signs of both turns are the same
                        if (
                            abs(self.turns_all[i]) >= 10
                            and abs(self.turns_all[j]) >= 10
                            and abs(
                                self.yaw[self.index_zero_crossings[i + 1]]
                                - self.yaw[self.index_zero_crossings[j]]
                            )
                            / min(abs(self.turns_all[i]), abs(self.turns_all[j]))
                            < 0.2
                            and np.sign(self.turns_all[i]) == np.sign(self.turns_all[j])
                        ):
                            # Mark the range between i and j (inclusive) as a hesitation
                            hesitation_markers[i : j + 1] = 1
                            # Break the inner loop as the hesitation condition is met
                            break

        # Initialize variables to store data related to turns without hesitation
        sum_temp = 0  # Temporary sum for accumulating turn angles
        turns_no_hesitation = []  # List to store turn angles without hesitation
        flags_start_no_hesitation = (
            []
        )  # List to store start indices of turns without hesitation
        flags_end_no_hesitation = (
            []
        )  # List to store end indices of turns without hesitation
        durations_no_hesitation = (
            []
        )  # List to store durations of turns without hesitation
        z = 1  # Index for keeping track of the current turn

        # Iterate through each index in the hesitation_markers array
        for i in range(len(hesitation_markers)):
            # Check if there is a hesitation marker at the current index
            if hesitation_markers[i] == 1:
                # Check if sum_temp is zero, indicating the start of a new turn
                if sum_temp == 0:
                    f1 = self.index_zero_crossings[
                        i
                    ]  # Store the start index of the turn

                # Check if the absolute value of the turn angle is greater than or equal to 10
                if abs(self.turns_all[i]) >= 10:
                    try:
                        # Check if the next index also has a hesitation marker
                        if hesitation_markers[i + 1] != 0:
                            # Iterate through subsequent indices to find the end of the turn
                            for j in range(i + 1, len(hesitation_markers)):
                                # Check if the absolute value of the turn angle is greater than or equal to 10
                                if abs(self.turns_all[j]) >= 10:
                                    # Check if the signs of the turn angles are the same
                                    if np.sign(self.turns_all[j]) == np.sign(
                                        self.turns_all[i]
                                    ):
                                        sum_temp += self.turns_all[
                                            i
                                        ]  # Accumulate the turn angle
                                    else:
                                        f2 = hesitation_markers[
                                            i + 1
                                        ]  # Store the end index of the turn
                                        sum_temp += self.turns_all[
                                            i
                                        ]  # Accumulate the turn angle
                                        turns_no_hesitation.append(
                                            sum_temp
                                        )  # Store the turn angle without hesitation
                                        flags_start_no_hesitation.append(
                                            f1
                                        )  # Store the start index of the turn
                                        flags_end_no_hesitation.append(
                                            f2
                                        )  # Store the end index of the turn
                                        durations_no_hesitation.append(
                                            (f2 - f1) / sampling_freq_Hz
                                        )  # Calculate and store the duration of the turn
                                        z += 1  # Increment the turn index
                                        sum_temp = 0  # Reset the temporary sum
                                        del (
                                            f1,
                                            f2,
                                        )  # Delete stored indices to avoid conflicts
                                    break  # Exit the loop once the turn is processed
                        else:
                            f2 = self.index_zero_crossings[
                                i + 1
                            ]  # Store the end index of the turn
                            sum_temp += self.turns_all[i]  # Accumulate the turn angle
                            turns_no_hesitation.append(
                                sum_temp
                            )  # Store the turn angle without hesitation
                            flags_start_no_hesitation.append(
                                f1
                            )  # Store the start index of the turn
                            flags_end_no_hesitation.append(
                                f2
                            )  # Store the end index of the turn
                            durations_no_hesitation.append(
                                (f2 - f1) / sampling_freq_Hz
                            )  # Calculate and store the duration of the turn
                            z += 1  # Increment the turn index
                            del f1, f2  # Delete stored indices to avoid conflicts
                            sum_temp = 0  # Reset the temporary sum
                    except:
                        f2 = self.index_zero_crossings[
                            i + 1
                        ]  # Store the end index of the turn
                        sum_temp += self.turns_all[i]  # Accumulate the turn angle
                        turns_no_hesitation.append(
                            sum_temp
                        )  # Store the turn angle without hesitation
                        flags_start_no_hesitation.append(
                            f1
                        )  # Store the start index of the turn
                        flags_end_no_hesitation.append(
                            f2
                        )  # Store the end index of the turn
                        durations_no_hesitation.append(
                            (f2 - f1) / sampling_freq_Hz
                        )  # Calculate and store the duration of the turn
                        z += 1
                        del f1, f2
                        sum_temp = 0
                else:
                    sum_temp += self.turns_all[
                        i
                    ]  # Accumulate the turn angle if it's smaller than 10 degrees
            else:  # If there's no hesitation marker at the current index
                turns_no_hesitation.append(self.turns_all[i])
                flags_start_no_hesitation.append(self.index_zero_crossings[i])
                flags_end_no_hesitation.append(self.index_zero_crossings[i + 1])
                durations_no_hesitation.append(
                    (self.index_zero_crossings[i + 1] - self.index_zero_crossings[i])
                    / sampling_freq_Hz
                )  # Calculate and store the duration of the turn
                z += 1  # Increment the turn index

        # Initialize lists to store information about turns >= 90 degrees
        turns_90 = []
        flags_start_90 = []
        flags_end_90 = []

        # Iterate through each turn without hesitation
        for k in range(len(turns_no_hesitation)):
            # Check if the turn angle is greater than or equal to 90 degrees
            # and if the duration of the turn is between 0.5 and 10 seconds
            if (
                abs(turns_no_hesitation[k]) >= self.min_turn_angle_deg
                and durations_no_hesitation[k] >= self.min_turn_duration_s
                and durations_no_hesitation[k] < self.max_turn_duration_s
            ):
                # If conditions are met, store information about the turn
                turns_90.append(turns_no_hesitation[k])
                flags_start_90.append(flags_start_no_hesitation[k])
                flags_end_90.append(flags_end_no_hesitation[k])

        # Initialize lists to store additional information about >= 90 degree turns
        duration_90 = []

        # Assign detected truns attribute
        self.turns_90 = turns_90
        self.flags_start_90 = flags_start_90
        self.flags_end_90 = flags_end_90

        # Assign sampling frequency to the attribute
        self.sampling_freq_Hz = sampling_freq_Hz

        # Compute duration of the turn in seconds
        for k in range(len(flags_start_90)):
            # Compute duration of the turn in seconds
            duration_nsamples = self.flags_end_90[k] - self.flags_start_90[k]
            duration_90.append(duration_nsamples / sampling_freq_Hz)

        # Create a DataFrame with postural transition information
        self.turns_ = pd.DataFrame(
            {
                "onset": np.array(flags_start_90) / sampling_freq_Hz,
                "duration": duration_90,
                "event_type": "turn",
                "tracking_systems": tracking_system,
                "tracked_points": tracked_point,
            }
        )

        # If original datetime is available, update the 'onset' and 'duration'
        if dt_data is not None:
            # Update the 'onset' based on the original datetime information
            self.turns_["onset"] = dt_data.iloc[flags_start_90].reset_index(drop=True)

            # Update the 'duration' based on the difference between end and start indices
            self.turns_["duration"] = dt_data.iloc[flags_end_90].reset_index(
                drop=True
            ) - dt_data.iloc[flags_start_90].reset_index(drop=True)

        # If Plot_results set to true
        if plot_results:
            viz_utils.plot_turns(accel, gyro, self.turns_, sampling_freq_Hz)

        # Return an instance of the class
        return self

    def spatio_temporal_parameters(self) -> pd.DataFrame:
        """
        Extracts spatio-temporal parameters of the detected turns.

        Returns:
            The spatio-temporal parameter information is stored in the 'spatio_temporal_parameters' attribute, which is a pandas DataFrame as:

                - direction_of_turn: Direction of turn which is either "left" or "right".
                - angle_of_turn: Angle of the turn in degrees.
                - peak_angular_velocity: Peak angular velocity during turn in deg/s.
        """
        if self.turns_ is None:
            raise ValueError("No turns detected. Please run the detect method first.")

        # Calculate additional information for each >= 90 degree turn
        peak_angular_velocities = []
        diff_yaw = np.diff(self.yaw)

        for k in range(len(self.flags_start_90)):
            # Calculate peak angular velocity during the turn
            diff_vector = abs(
                diff_yaw[(self.flags_start_90[k] - 1) : (self.flags_end_90[k] - 1)]
            )
            peak_angular_velocities.append(np.max(diff_vector) * self.sampling_freq_Hz)

        # Determine direction of the turn (left or right)
        direction_of_turns = []

        for turn_angle in self.turns_90:
            if turn_angle < 0:
                direction_of_turns.append("left")
            else:
                direction_of_turns.append("right")

        # Create a DataFrame with the calculated spatio-temporal parameters
        self.parameters_ = pd.DataFrame(
            {
                "direction_of_turn": direction_of_turns,
                "angle_of_turn": self.turns_90,
                "peak_angular_velocity": peak_angular_velocities,
            }
        )

        # Set the index name to 'turn id'
        self.parameters_.index.name = "turn id"

__init__(thr_gyro_var=0.0002, min_turn_duration_s=0.5, max_turn_duration_s=10, min_turn_angle_deg=90)

Initializes the PhamTurnDetection instance.

Parameters:

Name Type Description Default
thr_gyro_var float

Threshold value for identifying periods where the variance is low. Default is 2e-4.

0.0002
min_turn_duration_s float

Minimum duration of a turn in seconds. Default is 0.5.

0.5
max_turn_duration_s float

Maximum duration of a turn in seconds. Default is 10.

10
min_turn_angle_deg float

Minimum angle of a turn in degrees. Default is 90.

90
Source code in kielmat/modules/td/_pham.py
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
def __init__(
    self,
    thr_gyro_var: float = 2e-4,
    min_turn_duration_s: float = 0.5,
    max_turn_duration_s: float = 10,
    min_turn_angle_deg: float = 90,
):
    """
    Initializes the PhamTurnDetection instance.

    Args:
        thr_gyro_var (float): Threshold value for identifying periods where the variance is low. Default is 2e-4.
        min_turn_duration_s (float): Minimum duration of a turn in seconds. Default is 0.5.
        max_turn_duration_s (float): Maximum duration of a turn in seconds. Default is 10.
        min_turn_angle_deg (float): Minimum angle of a turn in degrees. Default is 90.
    """
    self.thr_gyro_var = thr_gyro_var
    self.min_turn_duration_s = min_turn_duration_s
    self.max_turn_duration_s = max_turn_duration_s
    self.min_turn_angle_deg = min_turn_angle_deg

detect(accel_data, gyro_data, gyro_vertical, sampling_freq_Hz, dt_data=None, tracking_system=None, tracked_point=None, plot_results=False)

Detects truns based on the input accelerometer and gyro data.

Parameters:

Name Type Description Default
accel_data DataFrame

Input accelerometer data (N, 3) for x, y, and z axes.

required
gyro_data DataFrame

Input gyro data (N, 3) for x, y, and z axes.

required
gyro_vertical str

The column name that corresponds to the vertical component gyro.

required
sampling_freq_Hz float

Sampling frequency of the input data in Hz.

required
dt_data Series

Original datetime in the input data. If original datetime is provided, the output onset will be based on that.

None
tracking_system str

Tracking systems.

None
tracked_point str

Tracked points on the body.

None
plot_results bool

If True, generates a plot. Default is False.

False

Returns:

Type Description
DataFrame

The turns information is stored in the 'turns_' attribute, which is a pandas DataFrame in BIDS format with the following information:

  • onset: Start time of the turn in second.
  • duration: Duration of the turn in second.
  • event_type: Type of the event (turn).
  • tracking_systems: Name of the tracking systems.
  • tracked_points: Name of the tracked points on the body.
Source code in kielmat/modules/td/_pham.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
def detect(
    self,
    accel_data: pd.DataFrame,
    gyro_data: pd.DataFrame,
    gyro_vertical: str,
    sampling_freq_Hz: float,
    dt_data: Optional[pd.Series] = None,
    tracking_system: Optional[str] = None,
    tracked_point: Optional[str] = None,
    plot_results: bool = False,
) -> pd.DataFrame:
    """
    Detects truns based on the input accelerometer and gyro data.

    Args:
        accel_data (pd.DataFrame): Input accelerometer data (N, 3) for x, y, and z axes.
        gyro_data (pd.DataFrame): Input gyro data (N, 3) for x, y, and z axes.
        gyro_vertical (str): The column name that corresponds to the vertical component gyro.
        sampling_freq_Hz (float): Sampling frequency of the input data in Hz.
        dt_data (pd.Series, optional): Original datetime in the input data. If original datetime is provided, the output onset will be based on that.
        tracking_system (str, optional): Tracking systems.
        tracked_point (str, optional): Tracked points on the body.
        plot_results (bool, optional): If True, generates a plot. Default is False.

    Returns:
        The turns information is stored in the 'turns_' attribute, which is a pandas DataFrame in BIDS format with the following information:

            - onset: Start time of the turn in second.
            - duration: Duration of the turn in second.
            - event_type: Type of the event (turn).
            - tracking_systems: Name of the tracking systems.
            - tracked_points: Name of the tracked points on the body.
    """
    # check if dt_data is a pandas Series with datetime values
    if dt_data is not None:
        # Ensure dt_data is a pandas Series
        if not isinstance(dt_data, pd.Series):
            raise ValueError("dt_data must be a pandas Series with datetime values")

        # Ensure dt_data has datetime values
        if not pd.api.types.is_datetime64_any_dtype(dt_data):
            raise ValueError("dt_data must be a pandas Series with datetime values")

        # Ensure dt_data has the same length as input data
        if len(dt_data) != len(accel_data):
            raise ValueError(
                "dt_data must be a series with the same length as data"
            )

    # Check if data is a DataFrame
    if not isinstance(accel_data, pd.DataFrame):
        raise ValueError("Acceleration data must be a pandas DataFrame")

    if not isinstance(gyro_data, pd.DataFrame):
        raise ValueError("Gyro data must be a pandas DataFrame")

    # Error handling for invalid input data
    if not isinstance(accel_data, pd.DataFrame) or accel_data.shape[1] != 3:
        raise ValueError(
            "Accelerometer data must be a DataFrame with 3 columns for x, y, and z axes."
        )

    if not isinstance(gyro_data, pd.DataFrame) or gyro_data.shape[1] != 3:
        raise ValueError(
            "Gyro data must be a DataFrame with 3 columns for x, y, and z axes."
        )

    # Check if sampling frequency is positive
    if sampling_freq_Hz <= 0:
        raise ValueError("Sampling frequency must be positive")

    # Check if plot_results is a boolean
    if not isinstance(plot_results, bool):
        raise ValueError("plot_results must be a boolean value")

    # Select acceleration data and convert it to numpy array format
    accel = accel_data.to_numpy()

    # Select gyro data and convert it to numpy array format
    gyro = gyro_data.to_numpy()
    self.gyro = gyro

    # Convert acceleration data from "m/s^2" to "g"
    accel /= 9.81

    # Convert gyro data from deg/s to rad/s
    gyro = np.deg2rad(gyro)

    # Compute the variance of the moving window of gyro signal
    gyro_vars = []

    for i in range(3):
        gyro_var = preprocessing.moving_var(gyro[:, i], sampling_freq_Hz)
        gyro_vars.append(gyro_var)
    gyro_vars = np.array(gyro_vars)
    gyro_vars = np.mean(gyro_vars, axis=0)

    # Find bias period
    bias_periods = np.where(gyro_vars < self.thr_gyro_var)[0]

    # Remove the last sampling_freq_Hz samples from bias_periods to avoid edge effects
    bias_periods = bias_periods[bias_periods < (len(gyro_vars) - sampling_freq_Hz)]

    # Compute gyro bias (mean of gyro signal during bias periods)
    self.gyro_bias = np.mean(gyro[bias_periods, :], axis=0)

    # Subtract gyro bias from the original gyro signal
    gyro_unbiased = gyro - self.gyro_bias

    # Get the index of the vertical component of gyro from data
    gyro_vertical_index = [
        i for i, col in enumerate(gyro_data) if gyro_vertical in col
    ][0]

    # Integrate x component of the gyro signal to get yaw angle (also convert gyro unit to deg/s)
    self.yaw = (
        scipy.integrate.cumulative_trapezoid(
            np.rad2deg(gyro_unbiased[:, gyro_vertical_index]), initial=0
        )
        / sampling_freq_Hz
    )

    # Find zero-crossings indices
    self.index_zero_crossings = np.where(
        np.diff(np.sign(gyro[:, gyro_vertical_index]))
    )[0]

    # Calculate turns from yaw angle
    self.turns_all = (
        self.yaw[self.index_zero_crossings[1:]]
        - self.yaw[self.index_zero_crossings[:-1]]
    )

    # Marks hesitations in the signal
    # Initialize an array to mark hesitations
    hesitation_markers = np.zeros(len(self.turns_all))

    # Loop through each index in the turns_all array
    for i in range(len(self.turns_all)):
        # Check if the absolute value of the turn angle at index i is greater than or equal to 10
        if abs(self.turns_all[i]) >= 10:
            # Loop to search for potential hesitations
            for j in range(i + 1, len(self.turns_all)):
                # Check if the difference between current index and i exceeds 4, or if the time between zero crossings exceeds half a second
                if (j - i) > 4 or (
                    self.index_zero_crossings[j] - self.index_zero_crossings[i + 1]
                    > (sampling_freq_Hz / 2)
                ):
                    # Break the loop if the conditions for hesitation are not met
                    break
                else:
                    # Check conditions for hesitation:
                    # - Absolute values of both turns are greater than or equal to 10
                    # - The relative change in yaw angle is less than 20% of the minimum turn angle
                    # - The signs of both turns are the same
                    if (
                        abs(self.turns_all[i]) >= 10
                        and abs(self.turns_all[j]) >= 10
                        and abs(
                            self.yaw[self.index_zero_crossings[i + 1]]
                            - self.yaw[self.index_zero_crossings[j]]
                        )
                        / min(abs(self.turns_all[i]), abs(self.turns_all[j]))
                        < 0.2
                        and np.sign(self.turns_all[i]) == np.sign(self.turns_all[j])
                    ):
                        # Mark the range between i and j (inclusive) as a hesitation
                        hesitation_markers[i : j + 1] = 1
                        # Break the inner loop as the hesitation condition is met
                        break

    # Initialize variables to store data related to turns without hesitation
    sum_temp = 0  # Temporary sum for accumulating turn angles
    turns_no_hesitation = []  # List to store turn angles without hesitation
    flags_start_no_hesitation = (
        []
    )  # List to store start indices of turns without hesitation
    flags_end_no_hesitation = (
        []
    )  # List to store end indices of turns without hesitation
    durations_no_hesitation = (
        []
    )  # List to store durations of turns without hesitation
    z = 1  # Index for keeping track of the current turn

    # Iterate through each index in the hesitation_markers array
    for i in range(len(hesitation_markers)):
        # Check if there is a hesitation marker at the current index
        if hesitation_markers[i] == 1:
            # Check if sum_temp is zero, indicating the start of a new turn
            if sum_temp == 0:
                f1 = self.index_zero_crossings[
                    i
                ]  # Store the start index of the turn

            # Check if the absolute value of the turn angle is greater than or equal to 10
            if abs(self.turns_all[i]) >= 10:
                try:
                    # Check if the next index also has a hesitation marker
                    if hesitation_markers[i + 1] != 0:
                        # Iterate through subsequent indices to find the end of the turn
                        for j in range(i + 1, len(hesitation_markers)):
                            # Check if the absolute value of the turn angle is greater than or equal to 10
                            if abs(self.turns_all[j]) >= 10:
                                # Check if the signs of the turn angles are the same
                                if np.sign(self.turns_all[j]) == np.sign(
                                    self.turns_all[i]
                                ):
                                    sum_temp += self.turns_all[
                                        i
                                    ]  # Accumulate the turn angle
                                else:
                                    f2 = hesitation_markers[
                                        i + 1
                                    ]  # Store the end index of the turn
                                    sum_temp += self.turns_all[
                                        i
                                    ]  # Accumulate the turn angle
                                    turns_no_hesitation.append(
                                        sum_temp
                                    )  # Store the turn angle without hesitation
                                    flags_start_no_hesitation.append(
                                        f1
                                    )  # Store the start index of the turn
                                    flags_end_no_hesitation.append(
                                        f2
                                    )  # Store the end index of the turn
                                    durations_no_hesitation.append(
                                        (f2 - f1) / sampling_freq_Hz
                                    )  # Calculate and store the duration of the turn
                                    z += 1  # Increment the turn index
                                    sum_temp = 0  # Reset the temporary sum
                                    del (
                                        f1,
                                        f2,
                                    )  # Delete stored indices to avoid conflicts
                                break  # Exit the loop once the turn is processed
                    else:
                        f2 = self.index_zero_crossings[
                            i + 1
                        ]  # Store the end index of the turn
                        sum_temp += self.turns_all[i]  # Accumulate the turn angle
                        turns_no_hesitation.append(
                            sum_temp
                        )  # Store the turn angle without hesitation
                        flags_start_no_hesitation.append(
                            f1
                        )  # Store the start index of the turn
                        flags_end_no_hesitation.append(
                            f2
                        )  # Store the end index of the turn
                        durations_no_hesitation.append(
                            (f2 - f1) / sampling_freq_Hz
                        )  # Calculate and store the duration of the turn
                        z += 1  # Increment the turn index
                        del f1, f2  # Delete stored indices to avoid conflicts
                        sum_temp = 0  # Reset the temporary sum
                except:
                    f2 = self.index_zero_crossings[
                        i + 1
                    ]  # Store the end index of the turn
                    sum_temp += self.turns_all[i]  # Accumulate the turn angle
                    turns_no_hesitation.append(
                        sum_temp
                    )  # Store the turn angle without hesitation
                    flags_start_no_hesitation.append(
                        f1
                    )  # Store the start index of the turn
                    flags_end_no_hesitation.append(
                        f2
                    )  # Store the end index of the turn
                    durations_no_hesitation.append(
                        (f2 - f1) / sampling_freq_Hz
                    )  # Calculate and store the duration of the turn
                    z += 1
                    del f1, f2
                    sum_temp = 0
            else:
                sum_temp += self.turns_all[
                    i
                ]  # Accumulate the turn angle if it's smaller than 10 degrees
        else:  # If there's no hesitation marker at the current index
            turns_no_hesitation.append(self.turns_all[i])
            flags_start_no_hesitation.append(self.index_zero_crossings[i])
            flags_end_no_hesitation.append(self.index_zero_crossings[i + 1])
            durations_no_hesitation.append(
                (self.index_zero_crossings[i + 1] - self.index_zero_crossings[i])
                / sampling_freq_Hz
            )  # Calculate and store the duration of the turn
            z += 1  # Increment the turn index

    # Initialize lists to store information about turns >= 90 degrees
    turns_90 = []
    flags_start_90 = []
    flags_end_90 = []

    # Iterate through each turn without hesitation
    for k in range(len(turns_no_hesitation)):
        # Check if the turn angle is greater than or equal to 90 degrees
        # and if the duration of the turn is between 0.5 and 10 seconds
        if (
            abs(turns_no_hesitation[k]) >= self.min_turn_angle_deg
            and durations_no_hesitation[k] >= self.min_turn_duration_s
            and durations_no_hesitation[k] < self.max_turn_duration_s
        ):
            # If conditions are met, store information about the turn
            turns_90.append(turns_no_hesitation[k])
            flags_start_90.append(flags_start_no_hesitation[k])
            flags_end_90.append(flags_end_no_hesitation[k])

    # Initialize lists to store additional information about >= 90 degree turns
    duration_90 = []

    # Assign detected truns attribute
    self.turns_90 = turns_90
    self.flags_start_90 = flags_start_90
    self.flags_end_90 = flags_end_90

    # Assign sampling frequency to the attribute
    self.sampling_freq_Hz = sampling_freq_Hz

    # Compute duration of the turn in seconds
    for k in range(len(flags_start_90)):
        # Compute duration of the turn in seconds
        duration_nsamples = self.flags_end_90[k] - self.flags_start_90[k]
        duration_90.append(duration_nsamples / sampling_freq_Hz)

    # Create a DataFrame with postural transition information
    self.turns_ = pd.DataFrame(
        {
            "onset": np.array(flags_start_90) / sampling_freq_Hz,
            "duration": duration_90,
            "event_type": "turn",
            "tracking_systems": tracking_system,
            "tracked_points": tracked_point,
        }
    )

    # If original datetime is available, update the 'onset' and 'duration'
    if dt_data is not None:
        # Update the 'onset' based on the original datetime information
        self.turns_["onset"] = dt_data.iloc[flags_start_90].reset_index(drop=True)

        # Update the 'duration' based on the difference between end and start indices
        self.turns_["duration"] = dt_data.iloc[flags_end_90].reset_index(
            drop=True
        ) - dt_data.iloc[flags_start_90].reset_index(drop=True)

    # If Plot_results set to true
    if plot_results:
        viz_utils.plot_turns(accel, gyro, self.turns_, sampling_freq_Hz)

    # Return an instance of the class
    return self

spatio_temporal_parameters()

Extracts spatio-temporal parameters of the detected turns.

Returns:

Type Description
DataFrame

The spatio-temporal parameter information is stored in the 'spatio_temporal_parameters' attribute, which is a pandas DataFrame as:

  • direction_of_turn: Direction of turn which is either "left" or "right".
  • angle_of_turn: Angle of the turn in degrees.
  • peak_angular_velocity: Peak angular velocity during turn in deg/s.
Source code in kielmat/modules/td/_pham.py
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
def spatio_temporal_parameters(self) -> pd.DataFrame:
    """
    Extracts spatio-temporal parameters of the detected turns.

    Returns:
        The spatio-temporal parameter information is stored in the 'spatio_temporal_parameters' attribute, which is a pandas DataFrame as:

            - direction_of_turn: Direction of turn which is either "left" or "right".
            - angle_of_turn: Angle of the turn in degrees.
            - peak_angular_velocity: Peak angular velocity during turn in deg/s.
    """
    if self.turns_ is None:
        raise ValueError("No turns detected. Please run the detect method first.")

    # Calculate additional information for each >= 90 degree turn
    peak_angular_velocities = []
    diff_yaw = np.diff(self.yaw)

    for k in range(len(self.flags_start_90)):
        # Calculate peak angular velocity during the turn
        diff_vector = abs(
            diff_yaw[(self.flags_start_90[k] - 1) : (self.flags_end_90[k] - 1)]
        )
        peak_angular_velocities.append(np.max(diff_vector) * self.sampling_freq_Hz)

    # Determine direction of the turn (left or right)
    direction_of_turns = []

    for turn_angle in self.turns_90:
        if turn_angle < 0:
            direction_of_turns.append("left")
        else:
            direction_of_turns.append("right")

    # Create a DataFrame with the calculated spatio-temporal parameters
    self.parameters_ = pd.DataFrame(
        {
            "direction_of_turn": direction_of_turns,
            "angle_of_turn": self.turns_90,
            "peak_angular_velocity": peak_angular_velocities,
        }
    )

    # Set the index name to 'turn id'
    self.parameters_.index.name = "turn id"