
Dataclass

In the following, the KielMAT dataclass is described. The dataclass is used to store motion data in a standardized way. We provide a small set of import functions, each of which returns a pandas.DataFrame or a dict. Users should also be able to write their own import functions to get their data into the provided dataclass (this step might take some thinking). Once the data is in the dataclass, running the toolbox's functions on it is straightforward.
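
For instance, a custom import function could look like the following minimal sketch. The file layout, the column names, and the tracking-system key 'imu' are made up for this example, and the channel table is assumed to follow the BIDS-motion-style column set (name, component, type, tracked_point, units, sampling_frequency) that the dataclass validates against:

import pandas as pd

from kielmat.utils.kielmat_dataclass import KielMATRecording


def import_my_csv(file_path: str) -> KielMATRecording:
    """Hypothetical importer: load raw accelerometer data from a CSV file."""
    raw = pd.read_csv(file_path)  # assumed columns: acc_x, acc_y, acc_z
    data = {"imu": raw}
    channels = {
        "imu": pd.DataFrame(
            {
                "name": ["acc_x", "acc_y", "acc_z"],
                "component": ["x", "y", "z"],
                "type": ["ACCEL"] * 3,
                "tracked_point": ["lower_back"] * 3,
                "units": ["m/s^2"] * 3,
                "sampling_frequency": [100.0] * 3,
            }
        )
    }
    # Constructing the recording runs validate_channels() on the channel table
    return KielMATRecording(data=data, channels=channels)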

KielMAT data class

classDiagram
   class KielMATRecording {
      data: dict[str, pd.DataFrame]
      channels: dict[str, pd.DataFrame]
      info: None | dict[str, Any] = None
      events: None | dict[str, pd.DataFrame] = None
      events_info: None | dict[str, Any] = None
      add_events(tracking_system, new_events)
      add_info(key, value)
      export_events(file_path, tracking_system=None, file_name=None, bids_compatible_fname=False)
   }

A recording consists of the motion data from one or more tracking systems, where each tracking system may contain motion data from one or more tracked points. Therefore, the motion data (KielMATRecording.data) are organized as a dictionary whose keys refer to the tracking systems and whose values hold the actual (raw) data as a pandas.DataFrame. The channel descriptions (KielMATRecording.channels) are available as a dictionary with the same keys, whose values describe the data channels of the respective system.
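
With a recording constructed as in the sketch above, both dictionaries are accessed with the same tracking-system key:

recording = import_my_csv("my_data.csv")  # hypothetical importer from above

imu_data = recording.data["imu"]          # raw data as a pandas.DataFrame
imu_channels = recording.channels["imu"]  # matching channel descriptions

print(imu_data.head())
print(imu_channels[["name", "type", "units"]])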

KielMATRecording dataclass

Dataclass to hold any data and associated infos for a KielMAT recording.

Attributes:

data (dict): The data, stored as a pandas DataFrame for each unique tracking system.
channels (dict): The channel descriptions, stored as a pandas DataFrame for each unique tracking system.
info (dict): Information on the subject, task, and more, stored as a nested dictionary.
events (dict): The events, stored as a pandas DataFrame for each unique tracking system.
events_info (dict): The event information, stored as a nested dictionary.

Source code in kielmat/utils/kielmat_dataclass.py
@dataclass(kw_only=True)
class KielMATRecording:
    """Dataclass to hold any data and associated infos for a KielMAT recording.

    Attributes:
        data (dict): The data, stored as a pandas DataFrame for each unique tracking system.
        channels (dict): The channel descriptions, stored as a pandas DataFrame for each unique tracking system.
        info (dict): Information on the subject, task, and more, stored as a nested dictionary.
        events (dict): The events, stored as a pandas DataFrame for each unique tracking system.
        events_info (dict): The event information, stored as a nested dictionary.
    """

    data: dict[str, pd.DataFrame]
    channels: dict[str, pd.DataFrame]
    info: None | dict[str, Any] = None
    events: None | dict[str, pd.DataFrame] = None
    events_info: None | dict[str, Any] = None

    def __post_init__(self):
        # Validate channels when an instance is created
        self.validate_channels()

    def validate_channels(self):
        """
        Validates the channel dataframes for each system.

        This function checks if the channel dataframes have the required columns in the correct order,
        and if the data types of the columns are valid. It also performs additional value checks for
        optional columns.

        Raises:
            ValueError: If the channel dataframe does not have the required columns in the correct order,
                or if the 'component' column contains invalid values, or if the 'type' column is not
                uppercase strings, or if the 'status' column contains invalid values.
            TypeError: If the 'name' column is not of type string.

        Returns:
            str: A message indicating that all channel dataframes are valid.
        """
        for system_name, df in self.channels.items():
            # Check required columns and their order
            if df.columns.tolist()[:6] != REQUIRED_COLUMNS:
                raise ValueError(
                    f"Channels dataframe for '{system_name}' does not have the required columns in correct order. The correct order is: {REQUIRED_COLUMNS}."
                )

            # Check data types
            if not all(isinstance(name, str) for name in df["name"]):
                raise TypeError(
                    f"Column 'name' in '{system_name}' must be of type string."
                )
            invalid_components = {
                item
                for item in df["component"]
                if item not in VALID_COMPONENT_TYPES and not pd.isna(item)
            }
            if invalid_components:
                raise ValueError(
                    f"Column 'component' in '{system_name}' contains invalid values: {invalid_components}."
                )
            if not all(isinstance(typ, str) and typ.isupper() for typ in df["type"]):
                raise ValueError(
                    f"Column 'type' in '{system_name}' must be uppercase strings."
                )

            # Additional value checks for optional columns
            if "status" in df.columns and not all(
                s in VALID_CHANNEL_STATUS_VALUES for s in df["status"] if s != "n/a"
            ):
                raise ValueError(
                    f"Column 'status' in '{system_name}' contains invalid values."
                )

        return "All channel dataframes are valid."

    def add_events(self, tracking_system: str, new_events: pd.DataFrame) -> None:
        """Add events to the recording for a specific tracking system.

        Args:
            tracking_system (str): Tracking system for which events are to be added.
            new_events (pd.DataFrame): Events to be added in BIDS format.
        """
        if self.events is None:
            self.events = {}

        if tracking_system not in self.events:
            self.events[tracking_system] = new_events
        else:
            existing_events = self.events[tracking_system]
            self.events[tracking_system] = pd.concat(
                [existing_events, new_events], ignore_index=True
            )

    def add_info(self, key: str, value: Any) -> None:
        """Add information to the info dictionary. Valid keys are : 'Subject', 'Session', 'Task'.

        Args:
            key (str): The key for the information.
            value (Any): The value of the information.

        Note:
            If the provided 'key' is not one of the valid info keys, a warning is printed
            and the value is stored anyway.

        Examples:
            >>> recording.add_info("Subject", "01")
        """
        if self.info is None:
            self.info = {}

        # Check if the key belongs to a list of keywords
        if key not in VALID_INFO_KEYS:
            print(
                f"Warning: Invalid info key '{key}'. Valid info keys are: {VALID_INFO_KEYS}"
            )

        # add the key-value pair to the info dictionary
        self.info[key] = value

        # If the value is a string, normalize it: convert to lower case and
        # strip underscores and spaces, warning about each change
        if isinstance(value, str):
            if value != value.lower():
                print(
                    f"Warning: The value of the key '{key}' should be lower case. Converted to lower case."
                )
            if "_" in value or " " in value:
                print(
                    f"Warning: The value of the key '{key}' should not contain underscore or space. Removed underscore and space."
                )
            self.info[key] = value.lower().replace("_", "").replace(" ", "")

    def export_events(
        self,
        file_path: str,
        tracking_system: Optional[str] = None,
        file_name: Optional[str] = None,
        bids_compatible_fname: bool = False,
    ) -> None:
        """Export events for a specific tracking system to a file.

        Args:
            file_path (str): Path to the directory where the file should be saved.
            tracking_system (Optional[str]): Tracking system for which events are to be exported.
                If None, events from all tracking systems will be exported (default is None).
            file_name (Optional[str]): Name of the file to be exported. If None, a default name will be used.
            bids_compatible_fname (bool): Flag indicating whether the exported filename should be BIDS compatible (default is False).
        """
        if self.events is not None:
            if tracking_system is None:
                all_events = pd.concat(
                    self.events.values(),
                    keys=self.events.keys(),
                    names=["tracking_system"],
                )
                if file_name is None:
                    file_name = "all_events.csv"
                if bids_compatible_fname:
                    # Construct the filename using subject ID and task name
                    subject_id = self.info.get("Subject")
                    task_name = self.info.get("Task")
                    # check that subject ID and task name are present in the info dictionary
                    if subject_id is None or task_name is None:
                        raise ValueError(
                            "Subject ID and Task Name should be specified in the info dictionary."
                        )
                    # BIDS events files are tab-separated and end in "_events.tsv"
                    file_name = f"sub-{subject_id}_task-{task_name}_events.tsv"
                    # include the session, if present in the info dictionary
                    session = self.info.get("Session")
                    if session is not None:
                        file_name = f"sub-{subject_id}_ses-{session}_task-{task_name}_events.tsv"
                    file_path = Path(file_path).joinpath(file_name)
                    all_events.to_csv(file_path, sep="\t", index=False)
                else:
                    file_path = Path(file_path).joinpath(file_name)
                    all_events.to_csv(file_path, index=False)
            elif tracking_system in self.events:
                if file_name is None:
                    file_name = f"{tracking_system}_events.csv"
                if bids_compatible_fname:
                    # default name already ends in "_events"; just switch to tab-separated .tsv
                    file_name = file_name.replace(".csv", ".tsv")
                    file_path = Path(file_path).joinpath(file_name)
                    self.events[tracking_system].to_csv(
                        file_path, sep="\t", index=False
                    )
                else:
                    file_path = Path(file_path).joinpath(file_name)
                    self.events[tracking_system].to_csv(file_path, index=False)

            # check if the resulting file path is BIDS compatible
            if bids_compatible_fname:
                # validate the file path; is_bids() returns True for compatible paths
                validator = BIDSValidator()
                if not validator.is_bids(str(file_path)):
                    raise ValueError(f"File path '{file_path}' is not BIDS compatible.")

add_events(tracking_system, new_events)

Add events to the recording for a specific tracking system.

Parameters:

tracking_system (str): Tracking system for which events are to be added. [required]
new_events (DataFrame): Events to be added in BIDS format. [required]
Source code in kielmat/utils/kielmat_dataclass.py
def add_events(self, tracking_system: str, new_events: pd.DataFrame) -> None:
    """Add events to the recording for a specific tracking system.

    Args:
        tracking_system (str): Tracking system for which events are to be added.
        new_events (pd.DataFrame): Events to be added in BIDS format.
    """
    if self.events is None:
        self.events = {}

    if tracking_system not in self.events:
        self.events[tracking_system] = new_events
    else:
        existing_events = self.events[tracking_system]
        self.events[tracking_system] = pd.concat(
            [existing_events, new_events], ignore_index=True
        )
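
A brief usage sketch: BIDS events tables carry at least 'onset' and 'duration' columns (in seconds); the tracking-system key 'imu' and the 'event_type' column are assumptions for this example:

import pandas as pd

new_events = pd.DataFrame(
    {
        "onset": [1.52, 2.08],   # event onsets in seconds
        "duration": [0.0, 0.0],  # instantaneous events
        "event_type": ["initial contact", "final contact"],
    }
)

recording.add_events(tracking_system="imu", new_events=new_events)
print(recording.events["imu"])

Calling add_events again for the same tracking system appends the new rows to the existing events dataframe rather than replacing it.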

add_info(key, value)

Add information to the info dictionary. Valid keys are: 'Subject', 'Session', 'Task'.

Parameters:

key (str): The key for the information. [required]
value (Any): The value of the information. [required]

Note:

If the provided 'key' is not one of the valid info keys, a warning is printed and the value is stored anyway.

Examples:

>>> recording.add_info("Subject", "01")
Source code in kielmat/utils/kielmat_dataclass.py
def add_info(self, key: str, value: Any) -> None:
    """Add information to the info dictionary. Valid keys are : 'Subject', 'Session', 'Task'.

    Args:
        key (str): The key for the information.
        value (Any): The value of the information.

    Note:
        If the provided 'key' is not one of the valid info keys, a warning is printed
        and the value is stored anyway.

    Examples:
        >>> recording.add_info("Subject", "01")
    """
    if self.info is None:
        self.info = {}

    # Check if the key belongs to a list of keywords
    if key not in VALID_INFO_KEYS:
        print(
            f"Warning: Invalid info key '{key}'. Valid info keys are: {VALID_INFO_KEYS}"
        )

    # add the key-value pair to the info dictionary
    self.info[key] = value

    # If the value is a string, normalize it: convert to lower case and
    # strip underscores and spaces, warning about each change
    if isinstance(value, str):
        if value != value.lower():
            print(
                f"Warning: The value of the key '{key}' should be lower case. Converted to lower case."
            )
        if "_" in value or " " in value:
            print(
                f"Warning: The value of the key '{key}' should not contain underscore or space. Removed underscore and space."
            )
        self.info[key] = value.lower().replace("_", "").replace(" ", "")
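
For example, a value containing upper-case letters or spaces is normalized before being stored:

recording.add_info("Subject", "01")         # stored as-is
recording.add_info("Task", "Walking Test")  # warns, stored as "walkingtest"
print(recording.info)                       # {'Subject': '01', 'Task': 'walkingtest'}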

export_events(file_path, tracking_system=None, file_name=None, bids_compatible_fname=False)

Export events for a specific tracking system to a file.

Parameters:

file_path (str): Path to the directory where the file should be saved. [required]
tracking_system (Optional[str]): Tracking system for which events are to be exported. If None, events from all tracking systems will be exported. [default: None]
file_name (Optional[str]): Name of the file to be exported. If None, a default name will be used. [default: None]
bids_compatible_fname (bool): Flag indicating whether the exported filename should be BIDS compatible. [default: False]
Source code in kielmat/utils/kielmat_dataclass.py
def export_events(
    self,
    file_path: str,
    tracking_system: Optional[str] = None,
    file_name: Optional[str] = None,
    bids_compatible_fname: bool = False,
) -> None:
    """Export events for a specific tracking system to a file.

    Args:
        file_path (str): Path to the directory where the file should be saved.
        tracking_system (Optional[str]): Tracking system for which events are to be exported.
            If None, events from all tracking systems will be exported (default is None).
        file_name (Optional[str]): Name of the file to be exported. If None, a default name will be used.
        bids_compatible_fname (bool): Flag indicating whether the exported filename should be BIDS compatible (default is False).
    """
    if self.events is not None:
        if tracking_system is None:
            all_events = pd.concat(
                self.events.values(),
                keys=self.events.keys(),
                names=["tracking_system"],
            )
            if file_name is None:
                file_name = "all_events.csv"
            if bids_compatible_fname:
                # Construct the filename using subject ID and task name
                subject_id = self.info.get("Subject")
                task_name = self.info.get("Task")
                # check that subject ID and task name are present in the info dictionary
                if subject_id is None or task_name is None:
                    raise ValueError(
                        "Subject ID and Task Name should be specified in the info dictionary."
                    )
                # BIDS events files are tab-separated and end in "_events.tsv"
                file_name = f"sub-{subject_id}_task-{task_name}_events.tsv"
                # include the session, if present in the info dictionary
                session = self.info.get("Session")
                if session is not None:
                    file_name = f"sub-{subject_id}_ses-{session}_task-{task_name}_events.tsv"
                file_path = Path(file_path).joinpath(file_name)
                all_events.to_csv(file_path, sep="\t", index=False)
            else:
                file_path = Path(file_path).joinpath(file_name)
                all_events.to_csv(file_path, index=False)
        elif tracking_system in self.events:
            if file_name is None:
                file_name = f"{tracking_system}_events.csv"
            if bids_compatible_fname:
                # default name already ends in "_events"; just switch to tab-separated .tsv
                file_name = file_name.replace(".csv", ".tsv")
                file_path = Path(file_path).joinpath(file_name)
                self.events[tracking_system].to_csv(
                    file_path, sep="\t", index=False
                )
            else:
                file_path = Path(file_path).joinpath(file_name)
                self.events[tracking_system].to_csv(file_path, index=False)

        # check if the resulting file path is BIDS compatible
        if bids_compatible_fname:
            # validate the file path; is_bids() returns True for compatible paths
            validator = BIDSValidator()
            if not validator.is_bids(str(file_path)):
                raise ValueError(f"File path '{file_path}' is not BIDS compatible.")

validate_channels()

Validates the channel dataframes for each system.

This function checks if the channel dataframes have the required columns in the correct order, and if the data types of the columns are valid. It also performs additional value checks for optional columns.

Raises:

ValueError: If the channel dataframe does not have the required columns in the correct order, or if the 'component' column contains invalid values, or if the 'type' column is not uppercase strings, or if the 'status' column contains invalid values.
TypeError: If the 'name' column is not of type string.

Returns:

str: A message indicating that all channel dataframes are valid.

Source code in kielmat/utils/kielmat_dataclass.py
def validate_channels(self):
    """
    Validates the channel dataframes for each system.

    This function checks if the channel dataframes have the required columns in the correct order,
    and if the data types of the columns are valid. It also performs additional value checks for
    optional columns.

    Raises:
        ValueError: If the channel dataframe does not have the required columns in the correct order,
            or if the 'component' column contains invalid values, or if the 'type' column is not
            uppercase strings, or if the 'status' column contains invalid values.
        TypeError: If the 'name' column is not of type string.

    Returns:
        str: A message indicating that all channel dataframes are valid.
    """
    for system_name, df in self.channels.items():
        # Check required columns and their order
        if df.columns.tolist()[:6] != REQUIRED_COLUMNS:
            raise ValueError(
                f"Channels dataframe for '{system_name}' does not have the required columns in correct order. The correct order is: {REQUIRED_COLUMNS}."
            )

        # Check data types
        if not all(isinstance(name, str) for name in df["name"]):
            raise TypeError(
                f"Column 'name' in '{system_name}' must be of type string."
            )
        invalid_components = {
            item
            for item in df["component"]
            if item not in VALID_COMPONENT_TYPES and not pd.isna(item)
        }
        if invalid_components:
            raise ValueError(
                f"Column 'component' in '{system_name}' contains invalid values: {invalid_components}."
            )
        if not all(isinstance(typ, str) and typ.isupper() for typ in df["type"]):
            raise ValueError(
                f"Column 'type' in '{system_name}' must be uppercase strings."
            )

        # Additional value checks for optional columns
        if "status" in df.columns and not all(
            s in VALID_CHANNEL_STATUS_VALUES for s in df["status"] if s != "n/a"
        ):
            raise ValueError(
                f"Column 'status' in '{system_name}' contains invalid values."
            )

    return "All channel dataframes are valid."