Initial Contact Detection

Initial Contact Detection (Paraschiv-Ionescu)

The Paraschiv-Ionescu initial contact detection algorithm identifies initial contacts in accelerometer data collected from a low back IMU sensor. The purpose of the algorithm is to identify and characterize initial contacts within walking bouts.

The algorithm takes accelerometer data and the name of the vertical acceleration column as input, and processes each specified gait sequence independently. The vertical acceleration signal is first detrended and then low-pass filtered. The resulting signal is numerically integrated and then differentiated using a Gaussian continuous wavelet transformation. The initial contact (IC) events are identified as the positive maximal peaks between successive zero-crossings.
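The processing chain described above can be sketched with NumPy alone. This is a simplified illustration, not the KielMAT implementation: the function name `detect_ic_sketch`, the moving-average low-pass filter, the Gaussian width, and all default parameter values are assumptions chosen for readability. Convolving with the first derivative of a Gaussian stands in for the Gaussian wavelet differentiation.

```python
import numpy as np


def detect_ic_sketch(acc_v, fs, smooth_s=0.05, sigma_s=0.1):
    """Illustrative IC detection on a vertical acceleration signal (hypothetical helper)."""
    acc_v = np.asarray(acc_v, dtype=float)
    t = np.arange(acc_v.size) / fs

    # 1) Detrend: subtract the least-squares line
    slope, intercept = np.polyfit(t, acc_v, 1)
    x = acc_v - (slope * t + intercept)

    # 2) Low-pass filter: a moving average as a simple stand-in (assumption)
    win = max(1, int(round(smooth_s * fs)))
    x = np.convolve(x, np.ones(win) / win, mode="same")

    # 3) Numerical integration (cumulative sum scaled by the sample period)
    x_int = np.cumsum(x) / fs

    # 4) Differentiate by convolving with the first derivative of a Gaussian
    sigma = sigma_s * fs
    half = int(round(4 * sigma))
    k = np.arange(-half, half + 1)
    gauss = np.exp(-0.5 * (k / sigma) ** 2)
    gauss /= gauss.sum()
    kernel = -k / sigma**2 * gauss
    padded = np.pad(x_int, half, mode="edge")  # avoid edge spikes
    d = np.convolve(padded, kernel, mode="valid") * fs

    # 5) Positive maximum between each pair of successive zero-crossings
    sign = np.sign(d)
    crossings = np.flatnonzero(sign[:-1] * sign[1:] < 0) + 1
    ics = []
    for a, b in zip(crossings[:-1], crossings[1:]):
        segment = d[a:b]
        if segment.size and segment.max() > 0:
            ics.append((a + int(np.argmax(segment))) / fs)
    return np.array(ics)  # IC times in seconds, relative to the signal start


# Synthetic example: a 2 Hz oscillation sampled at 100 Hz for 5 s
fs = 100.0
t = np.arange(0, 5, 1 / fs)
ics = detect_ic_sketch(np.sin(2 * np.pi * 2 * t), fs)
```

On this synthetic signal the detected events are spaced roughly one oscillation period (0.5 s) apart. Real trunk acceleration is far less regular, so the filter settings above should not be read as recommended values.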

Finally, the initial contact information is provided as a DataFrame with columns onset, event_type, duration, and tracking_systems.
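As an illustration of working with a table of this shape, the sketch below builds a small DataFrame with the documented columns and derives the time between consecutive initial contacts. The onset values are made up for the example, and the variable names `events` and `step_times` are hypothetical.

```python
import pandas as pd

# Hypothetical result table with the documented columns; values are invented
events = pd.DataFrame(
    {
        "onset": [5.0, 5.6, 6.1, 6.7],
        "event_type": "initial contact",
        "duration": 0,
        "tracking_systems": "SU",
    }
)

# Time between consecutive initial contacts, in seconds
step_times = events["onset"].diff().dropna()
print(step_times.round(2).tolist())
```

Because every event has a duration of 0, the onset column alone carries the timing information.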

Methods:

Name    Description
detect  Detects initial contacts on the accelerometer signal.

Examples:

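Find initial contacts in the acceleration data (vertical_column holds the name of the vertical acceleration column, which detect requires):

>>> icd = ParaschivIonescuInitialContactDetection()
>>> icd = icd.detect(accel_data=acceleration_data, sampling_freq_Hz=100, v_acc_col_name=vertical_column)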
>>> print(icd.initial_contacts_)
        onset   event_type       duration   tracking_systems
    0   5       initial contact  0          SU
    1   5.6     initial contact  0          SU
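
The example above leaves the inputs undefined. A minimal sketch of inputs with the shapes and types that detect checks for could look like the following; the column names, the random data, and the start timestamp are all assumptions made for illustration, not values taken from KielMAT.

```python
import numpy as np
import pandas as pd

sampling_freq_Hz = 100
n_samples = 10 * sampling_freq_Hz  # 10 s of data

# (N, 3) acceleration in m/s^2; column names are hypothetical
rng = np.random.default_rng(0)
acceleration_data = pd.DataFrame(
    rng.normal(0.0, 0.5, size=(n_samples, 3)) + [9.81, 0.0, 0.0],
    columns=["acc_vertical", "acc_ml", "acc_ap"],
)
vertical_column = "acc_vertical"

# Optional: one datetime per sample, same length as the acceleration data
dt_data = pd.Series(
    pd.date_range(
        "2024-01-01 08:00:00",
        periods=n_samples,
        freq=pd.Timedelta(seconds=1 / sampling_freq_Hz),
    )
)

# Optional: gait sequences with onset and duration in seconds
gait_sequences = pd.DataFrame({"onset": [1.0, 6.0], "duration": [3.0, 2.5]})
```

Random noise will not produce meaningful initial contacts; the sketch only shows what each argument is expected to look like.
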
References

[1] Paraschiv-Ionescu et al. (2019). Locomotion and cadence detection using a single trunk-fixed accelerometer...

[2] Paraschiv-Ionescu et al. (2020). Real-world speed estimation using single trunk IMU: methodological challenges...

Source code in kielmat/modules/icd/_paraschiv.py
class ParaschivIonescuInitialContactDetection:
    """
    The Paraschiv-Ionescu initial contact detection algorithm identifies initial contacts in accelerometer data
    collected from a low back IMU sensor. The purpose of the algorithm is to identify and characterize initial
    contacts within walking bouts.

    The algorithm takes accelerometer data and the name of the vertical acceleration column as input, and processes
    each specified gait sequence independently. The vertical acceleration signal is first detrended and then
    low-pass filtered. The resulting signal is numerically integrated and then differentiated using a Gaussian
    continuous wavelet transformation. The initial contact (IC) events are identified as the positive maximal peaks
    between successive zero-crossings.

    Finally, the initial contact information is provided as a DataFrame with columns `onset`, `event_type`,
    `duration`, and `tracking_systems`.

    Methods:
        detect(accel_data, sampling_freq_Hz, v_acc_col_name, gait_sequences, dt_data, tracking_system):
            Detects initial contacts on the accelerometer signal.

    Examples:
        Find initial contacts in the acceleration data (vertical_column holds the name of the vertical
        acceleration column, which detect requires)

        >>> icd = ParaschivIonescuInitialContactDetection()
        >>> icd = icd.detect(accel_data=acceleration_data, sampling_freq_Hz=100, v_acc_col_name=vertical_column)
        >>> print(icd.initial_contacts_)
                onset   event_type       duration   tracking_systems
            0   5       initial contact  0          SU
            1   5.6     initial contact  0          SU

    References:
        [1] Paraschiv-Ionescu et al. (2019). Locomotion and cadence detection using a single trunk-fixed accelerometer...

        [2] Paraschiv-Ionescu et al. (2020). Real-world speed estimation using single trunk IMU: methodological challenges...
    """

    def __init__(
        self,
    ):
        """
        Initializes the ParaschivIonescuInitialContactDetection instance.
        """
        self.initial_contacts_ = None

    def detect(
        self,
        accel_data: pd.DataFrame,
        sampling_freq_Hz: float,
        v_acc_col_name: str,
        gait_sequences: Optional[pd.DataFrame] = None,
        dt_data: Optional[pd.Series] = None,
        tracking_system: Optional[str] = None,
    ) -> "ParaschivIonescuInitialContactDetection":
        """
        Detects initial contacts based on the input accelerometer data.

        Args:
            accel_data (pd.DataFrame): Input accelerometer data (N, 3) for x, y, and z axes.
            sampling_freq_Hz (float): Sampling frequency of the accelerometer data.
            v_acc_col_name (str): The column name that corresponds to the vertical acceleration.
            gait_sequences (pd.DataFrame, optional): A dataframe of detected gait sequences. If not provided, the entire acceleration time series will be used for detecting initial contacts.
            dt_data (pd.Series, optional): Original datetime in the input data. If original datetime is provided, the output onset will be based on that.
            tracking_system (str, optional): Tracking system the data is from to be used for events df. Default is None.

        Returns:
            ParaschivIonescuInitialContactDetection: Returns an instance of the class.
                The initial contacts information is stored in the 'initial_contacts_' attribute,
                which is a pandas DataFrame in BIDS format with the following columns:
                    - onset: Onsets of the initial contacts.
                    - event_type: Type of the event ('initial contact').
                    - duration: Duration of the event (always 0).
                    - tracking_systems: Tracking system the events are derived from.
        """
        # Check if data is empty
        if accel_data.empty:
            self.initial_contacts_ = pd.DataFrame()
            return self  # Return without performing further processing

        # check if dt_data is a pandas Series with datetime values
        if dt_data is not None and (
            not isinstance(dt_data, pd.Series)
            or not pd.api.types.is_datetime64_any_dtype(dt_data)
        ):
            raise ValueError("dt_data must be a pandas Series with datetime values")

        # check if tracking_system is a string
        if tracking_system is not None and not isinstance(tracking_system, str):
            raise ValueError("tracking_system must be a string")

        # check if dt_data is provided and if it is a series with the same length as data
        if dt_data is not None and len(dt_data) != len(accel_data):
            raise ValueError("dt_data must be a series with the same length as data")

        # Extract the vertical acceleration by column name and convert it from
        # "m/s^2" to "g" without modifying the caller's DataFrame in place
        acc_vertical = accel_data[v_acc_col_name] / 9.81

        # Initialize an empty list to store the processed output
        processed_output = []

        # Initialize an empty list to store all onsets
        all_onsets = []

        # Process each gait sequence
        if gait_sequences is None:
            gait_sequences = pd.DataFrame(
                {"onset": [0], "duration": [len(accel_data) / sampling_freq_Hz]}
            )
        for _, gait_seq in gait_sequences.iterrows():
            # Calculate start and stop indices for the current gait sequence
            start_index = int(sampling_freq_Hz * gait_seq["onset"])
            stop_index = int(
                sampling_freq_Hz * (gait_seq["onset"] + gait_seq["duration"])
            )
            accv_gait_seq = acc_vertical[start_index:stop_index].to_numpy()

            try:
                # Perform Signal Decomposition Algorithm for Initial Contacts (ICs)
                initial_contacts_rel, _ = preprocessing.signal_decomposition_algorithm(
                    accv_gait_seq, sampling_freq_Hz
                )
                initial_contacts = gait_seq["onset"] + initial_contacts_rel

                gait_seq["IC"] = initial_contacts.tolist()

                # Append onsets to the all_onsets list
                all_onsets.extend(initial_contacts)

            except Exception as e:
                print(
                    "Signal decomposition algorithm did not run successfully. Returning an empty vector of initial contacts"
                )
                print(f"Error: {e}")
                initial_contacts = []
                gait_seq["IC"] = []

            # Append the information to the processed_output list
            processed_output.append(gait_seq)

        # Check if processed_output is empty (no gait sequences were processed)
        if not processed_output:
            print("No initial contacts detected.")
            self.initial_contacts_ = pd.DataFrame()
            return self

        # Create a DataFrame from the processed_output list
        initial_contacts_ = pd.DataFrame(processed_output)

        # Create a BIDS-compatible DataFrame with all onsets
        self.initial_contacts_ = pd.DataFrame(
            {
                "onset": all_onsets,
                "event_type": "initial contact",
                "duration": 0,
                "tracking_systems": tracking_system,
            }
        )

        # If original datetime is available, update the 'onset' column
        if dt_data is not None:
            # Onsets are in seconds; convert them to integer sample indices
            sample_indices = (
                self.initial_contacts_["onset"]
                .astype(float)
                .mul(sampling_freq_Hz)
                .round()
                .astype(int)
            )
            is_valid = sample_indices < len(dt_data)
            invalid_indices = int((~is_valid).sum())

            if invalid_indices > 0:
                print(f"Warning: {invalid_indices} invalid index/indices found.")

            # Keep only the events whose sample index falls inside dt_data
            self.initial_contacts_ = self.initial_contacts_[is_valid].reset_index(
                drop=True
            )

            # Update the 'onset' column with the matching datetimes
            self.initial_contacts_["onset"] = dt_data.iloc[
                sample_indices[is_valid].to_numpy()
            ].reset_index(drop=True)

        return self

__init__()

Initializes the ParaschivIonescuInitialContactDetection instance.


detect(accel_data, sampling_freq_Hz, v_acc_col_name, gait_sequences=None, dt_data=None, tracking_system=None)

Detects initial contacts based on the input accelerometer data.

Parameters:

    accel_data (DataFrame, required): Input accelerometer data (N, 3) for x, y, and z axes.
    sampling_freq_Hz (float, required): Sampling frequency of the accelerometer data.
    v_acc_col_name (str, required): The column name that corresponds to the vertical acceleration.
    gait_sequences (DataFrame, default None): A dataframe of detected gait sequences. If not provided, the entire acceleration time series will be used for detecting initial contacts.
    dt_data (Series, default None): Original datetime in the input data. If original datetime is provided, the output onset will be based on that.
    tracking_system (str, default None): Tracking system the data is from to be used for events df.
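
How a gait_sequences table selects parts of the recording can be sketched as follows. The slicing mirrors the onset and duration columns (in seconds) used in the source listing on this page, while the helper name `slice_gait_sequences` and the sample data are hypothetical.

```python
import numpy as np
import pandas as pd


def slice_gait_sequences(signal, gait_sequences, sampling_freq_Hz):
    """Yield (onset in seconds, samples) for each gait sequence (hypothetical helper)."""
    signal = np.asarray(signal)
    for _, seq in gait_sequences.iterrows():
        # Convert onset and end time from seconds to sample indices
        start = int(sampling_freq_Hz * seq["onset"])
        stop = int(sampling_freq_Hz * (seq["onset"] + seq["duration"]))
        yield seq["onset"], signal[start:stop]


sampling_freq_Hz = 100
signal = np.arange(1000)  # stand-in for 10 s of vertical acceleration

gait_sequences = pd.DataFrame({"onset": [1.0, 6.0], "duration": [3.0, 2.5]})
segments = list(slice_gait_sequences(signal, gait_sequences, sampling_freq_Hz))

# Without gait sequences, a single sequence spanning the whole recording is used
whole = pd.DataFrame({"onset": [0], "duration": [len(signal) / sampling_freq_Hz]})
full_segments = list(slice_gait_sequences(signal, whole, sampling_freq_Hz))
```

Each segment is processed on its own, and the sequence onset is added back to the detected times so that the reported onsets refer to the start of the recording.
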

Returns:

    ParaschivIonescuInitialContactDetection: Returns an instance of the class. The initial contacts information is stored in the 'initial_contacts_' attribute, which is a pandas DataFrame in BIDS format with the following columns:
        - onset: Onsets of the initial contacts.
        - event_type: Type of the event ('initial contact').
        - duration: Duration of the event (always 0).
        - tracking_systems: Tracking system the events are derived from.
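
When dt_data is supplied, onsets are reported as datetimes rather than seconds. One way to perform that mapping is sketched below, assuming onsets are in seconds from the start of the recording and dt_data holds one timestamp per sample. The helper `onsets_to_datetime` is hypothetical and is not part of KielMAT.

```python
import pandas as pd


def onsets_to_datetime(onsets_s, dt_data, sampling_freq_Hz):
    """Map onsets in seconds to the timestamp of the nearest sample (hypothetical helper)."""
    # Seconds -> integer sample indices
    idx = (pd.Series(onsets_s, dtype=float) * sampling_freq_Hz).round().astype(int)
    # Drop onsets that fall outside the recording
    idx = idx[idx < len(dt_data)]
    return dt_data.iloc[idx.to_numpy()].reset_index(drop=True)


# 10 s of timestamps at 100 Hz (invented start time)
dt_data = pd.Series(pd.date_range("2024-01-01 08:00:00", periods=1000, freq="10ms"))

# The last onset lies beyond the recording and is discarded
stamps = onsets_to_datetime([5.0, 5.6, 12.0], dt_data, 100)
```

Rounding to the nearest sample keeps the result within half a sample period of the onset in seconds.
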
