Skip to content

API

collimator.dashboard.project

Project

Represents a project in the Collimator dashboard.

Source code in collimator/dashboard/project.py
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
class Project:
    """
    Represents a project in the Collimator dashboard.

    An instance holds the project summary together with the models, files,
    reference submodels and init script variables supplied at construction,
    and offers helpers to load models and to save models, submodels and files
    back to the dashboard.
    """

    def __init__(
        self,
        summary: ProjectSummary,
        models: dict[str, model_json.Model],
        files: list[str],
        submodels: dict[str, str],
        init_scripts: dict[str, InitScriptVariables] = None,
    ):
        """
        Initialize a new Project instance.

        Args:
            summary (ProjectSummary): The summary of the project.
            models (dict[str, model_json.Model]): A dictionary of models associated with the project.
                The keys are the model names and the values are the json representations of the models.
            files (list[str]): A list of file paths associated with the project.
            submodels (dict[str, str]): A dictionary of submodels associated with the project.
                The keys are the submodel names and the values are the reference IDs of the submodels.
            init_scripts (dict[str, InitScriptVariables], optional): A dictionary of initialization scripts
                associated with the project. Defaults to None, in which case an empty dict is stored.

        """
        self._models = models
        self._submodels = submodels
        self._summary = summary
        self._files = files
        # None (or an empty mapping) is replaced by a fresh dict.
        self._init_scripts = init_scripts or {}

    def get_model(self, name: str) -> "SimulationContext | None":
        """
        Retrieves a model from the project.

        The variables of the model's first init script (if that script is known
        to this project) are passed as the namespace used to load the model.

        Args:
            name (str): The name of the model to retrieve.

        Returns:
            SimulationContext | None: The simulation context for the retrieved model,
                or None when the model diagram has no nodes or loading failed
                (the failure is logged).

        Raises:
            KeyError: If the specified model name is not found.
        """
        if name not in self._models:
            raise KeyError(f"Model '{name}' not found")

        model = self._models[name]
        init_script = None
        if model.configuration.workspace.init_scripts:
            # Only the first init script of the model is considered.
            init_script = model.configuration.workspace.init_scripts[0].file_name
        _globals = {}
        if init_script in self._init_scripts:
            _globals = self._init_scripts[init_script].as_dict()

        if not model.diagram.nodes:
            return None

        logger.info('Loading model "%s"', model.name)
        try:
            return from_model_json.loads_model(model.to_json(), namespace=_globals)
        except Exception as e:
            # Deliberately best-effort: log and return None. Catching Exception
            # (not BaseException) lets KeyboardInterrupt/SystemExit propagate.
            logger.error(
                "Failed to load model %s: %s", model.name, e, exc_info=True
            )
            return None

    def create_submodel_instance(
        self,
        name: str,
        instance_name: str,
        instance_parameters: dict[str, Any] = None,
        **kwargs,
    ) -> Diagram:
        """
        Creates an instance of a submodel.

        Args:
            name (str): The name of the submodel.
            instance_name (str): The name of the instance.
            instance_parameters (dict[str, Any], optional): Parameters for the instance. Defaults to None.
            **kwargs: Additional keyword arguments, forwarded to
                ReferenceSubdiagram.create_diagram.

        Returns:
            Diagram: The created submodel instance.

        Raises:
            KeyError: If the specified submodel is not found.
            ValueError: If the specified submodel is found but has a None reference ID.
        """
        if name not in self._submodels:
            raise KeyError(
                f"Submodel '{name}' not found. Available submodels: {list(self._submodels.keys())}"
            )
        ref_id = self._submodels[name]
        if ref_id is None:
            raise ValueError(f"Submodel '{name}' has no reference ID")
        return ReferenceSubdiagram.create_diagram(
            ref_id,
            instance_name,
            instance_parameters=instance_parameters,
            **kwargs,
        )

    def _save_single_submodel(
        self,
        model: model_json.Model,
        submodel_uuid: str = None,
    ):
        """
        Creates or updates one reference submodel in this project.

        An existing submodel is looked up by UUID first, then by the name under
        which that UUID is registered in this project. If none is found a new
        one is created. In both cases the submodel is then updated with the
        content of ``model``.

        Args:
            model (model_json.Model): The json representation of the submodel.
            submodel_uuid (str, optional): The UUID (reference ID) of the submodel. Defaults to None.

        Returns:
            str: The UUID of the created or updated submodel.
        """
        project_uuid = self.summary.uuid
        submodel = None
        if submodel_uuid:
            submodel = model_api.get_reference_submodel(project_uuid, submodel_uuid)
        if submodel is None:
            # Fall back to a lookup by the name registered for this reference ID.
            for submodel_name, ref_id in self._submodels.items():
                if ref_id == submodel_uuid:
                    submodel = model_api.get_reference_submodel_by_name(
                        project_uuid, submodel_name
                    )
                    break

        if submodel is None:
            logger.info("Creating submodel '%s' (%s)", model.name, submodel_uuid)
            response = model_api.create_reference_submodel(
                project_uuid,
                model,
                model.parameter_definitions,
                model_uuid=submodel_uuid,
            )
            submodel_uuid = response["uuid"]
            edit_id = response["edit_id"]
        else:
            edit_id = submodel.edit_id
            submodel_uuid = submodel.uuid
            # NOTE(review): presumably carries the UI properties of the existing
            # submodel over to the new content -- confirm against uiprops.copy.
            uiprops.copy(submodel, model)
            model.name = submodel.name
            logger.info("Updating submodel '%s' (%s)", model.name, submodel_uuid)
        model_api.update_reference_submodel(
            project_uuid,
            submodel_uuid,
            model,
            edit_id,
            parameter_definitions=model.parameter_definitions,
        )
        return submodel_uuid

    def _save_submodel(self, diagram: Diagram):
        """
        Saves a submodel into the project.

        The diagram is converted to its json representation; every reference
        submodel returned by the conversion is saved first, then the submodel
        itself under ``diagram.ref_id``.

        Args:
            diagram (Diagram): The diagram object representing the submodel.

        Returns:
            None
        """
        model, ref_submodels = to_model_json.convert(diagram)

        for ref_id, ref_submodel in ref_submodels.items():
            self._save_single_submodel(ref_submodel, submodel_uuid=ref_id)

        self._save_single_submodel(model, submodel_uuid=diagram.ref_id)

    def _save_model(
        self,
        diagram: Diagram,
        configuration: model_json.Configuration = None,
    ) -> str:
        """
        Updates or creates a model in the dashboard based on the provided diagram.

        An existing model is looked up by ``diagram.ui_id`` first and by
        ``diagram.name`` second; when neither lookup finds one, a new model is
        created. The model is then updated with the converted diagram.

        Args:
            diagram (Diagram): The diagram object representing the model.
            configuration (model_json.Configuration, optional): The configuration object for the model. Defaults to None.

        Returns:
            str: The UUID of the updated or created model.
        """
        project_uuid = self.summary.uuid
        # Named model_data so the local does not shadow the model_json module.
        model_data, ref_submodels = to_model_json.convert(
            diagram,
            configuration=configuration,
        )

        for ref_id, ref_submodel in ref_submodels.items():
            self._save_single_submodel(ref_submodel, submodel_uuid=ref_id)

        model_uuid = diagram.ui_id

        model = None
        if model_uuid:
            model = model_api.get_model(model_uuid)

        if model is None:
            model = _get_model_by_name(project_uuid, diagram.name)
            model_uuid = model.uuid if model else None

        if model is None:
            response = model_api.create_model(project_uuid, model_data)
            model_uuid = response["uuid"]
            logger.info("Creating model '%s' (%s)", model_data.name, model_uuid)
        else:
            logger.info("Updating model '%s' (%s)", model_data.name, model_uuid)
            model_data.version = model.version
            uiprops.copy(model, model_data)

        model_api.update_model(model_uuid, model_data)

        return model_uuid

    def save_model(
        self, diagram: Diagram, configuration: model_json.Configuration = None
    ) -> str:
        """
        Save the given diagram as a model. If the diagram already exists, it will be updated.

        When the project summary lists a model with the same name as the
        diagram, ``diagram.ui_id`` is set to that model's UUID before saving.

        Args:
            diagram (Diagram): The diagram to be saved as a model.
            configuration (model_json.Configuration, optional): The configuration for the model. Defaults to None.

        Returns:
            The UUID of the saved model.
        """
        models = {m.name: m for m in self.summary.models}
        if diagram.name in models:
            diagram.ui_id = models[diagram.name].uuid
        return self._save_model(diagram, configuration=configuration)

    def save_submodel(
        self,
        constructor: Callable,
        name: str,
        default_parameters: list[Parameter] = None,
    ) -> str:
        """
        Registers a submodel constructor and saves the submodel under the given name.

        If the project already has a reference submodel with this name, its
        UUID is reused as the reference ID for the registration.

        Args:
            constructor (Callable): The constructor function for the submodel.
            name (str): The name of the submodel.
            default_parameters (list[Parameter], optional): A list of default parameters for the submodel. Defaults to None.

        Returns:
            str: The reference ID of the saved submodel.
        """
        submodel = model_api.get_reference_submodel_by_name(self.summary.uuid, name)
        ref_id = submodel.uuid if submodel else None
        ref_id = ReferenceSubdiagram.register(
            constructor, default_parameters, ref_id=ref_id
        )

        submodel = ReferenceSubdiagram.create_diagram(ref_id, name)
        self._save_submodel(submodel)
        self._submodels[submodel.name] = ref_id
        return ref_id

    def upload_file(self, name: str, file: str, overwrite=True):
        """
        Uploads a file to a project.

        A presigned upload URL is requested from the API, the file content is
        sent to it with an HTTP PUT, and the API is then asked to process the
        uploaded file.

        Args:
            name (str): The name of the file.
            file (str): The path to the file to be uploaded.
            overwrite (bool, optional): Flag indicating whether to overwrite an existing file with the same name.
                Defaults to True.

        Returns:
            dict: The "summary" entry of the API response to the process request.

        Raises:
            api.CollimatorApiError: If the file upload fails.
        """

        project_uuid = self.summary.uuid

        mime_type, _ = mimetypes.guess_type(name)
        mime_type = mime_type or "application/octet-stream"

        with open(file, "rb") as fp:
            size = os.fstat(fp.fileno()).st_size

            logger.info(
                "Uploading file %s (type: %s, size: %d) to project %s...",
                name,
                mime_type,
                size,
                project_uuid,
            )
            body = {
                "name": name,
                "content_type": mime_type,
                "overwrite": overwrite,
                "size": size,
            }

            try:
                put_url_response = api.post(
                    f"/projects/{project_uuid}/files", body=body
                )
            except api.CollimatorApiError as e:
                logger.error("Failed to upload file %s: %s", name, e)
                raise

            s3_presigned_url = put_url_response["put_presigned_url"]
            # TLS certificate verification is left at the requests default
            # (enabled): the presigned URL and the file content must not be
            # exposed to a man-in-the-middle. Environments with a private CA
            # can point REQUESTS_CA_BUNDLE at their bundle.
            s3_response = requests.put(
                s3_presigned_url,
                headers={"Content-Type": mime_type, "Content-Length": str(size)},
                # An empty file is sent as an empty bytes body instead of the file object.
                data=fp if size > 0 else b"",
            )

        if s3_response.status_code != 200:
            logger.error("s3 upload failed: %s", s3_response.text)
            raise api.CollimatorApiError(
                f"Failed to upload file {name} to project {project_uuid}"
            )
        file_uuid = put_url_response["summary"]["uuid"]
        process_response = api.call(
            f"/projects/{project_uuid}/files/{file_uuid}/process", "POST"
        )
        logger.info("Finished uploading file %s", name)

        return process_response["summary"]

    def upload_files(self, files: list[str], overwrite=True):
        """
        Uploads multiple files to a project.

        Each file is uploaded under its base name using a thread pool. The
        first exception raised by an upload is re-raised to the caller.

        Args:
            files (list[str]): A list of file paths to be uploaded.
            overwrite (bool, optional): Flag indicating whether to overwrite existing files.
                Defaults to True.
        """
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = []
            for file in files:
                name = os.path.basename(file)
                futures.append(executor.submit(self.upload_file, name, file, overwrite))
            for future in concurrent.futures.as_completed(futures):
                future.result()

    def upload_directory(
        self, directory: str, overwrite=False, sync=False, target_dir=""
    ):
        """
        Uploads a directory of files to a project, keeping the directory structure.

        This function will ignore files specified in the .gitignore or .collimatorignore file in the directory
        (as reported by utils.get_ignored_files).

        Args:
            directory (str): The path to the directory containing the files to be uploaded.
            overwrite (bool, optional): Flag indicating whether to overwrite existing files. Defaults to False.
            sync (bool, optional): Flag indicating whether to synchronize the directory with the project. Defaults to False.
                This will delete files in the project that are not present in the directory,
                and implies overwriting existing files.
            target_dir (str, optional): The target directory in the project. Defaults to "".

        Raises:
            ValueError: If ``directory`` is not an existing directory.
        """
        if not os.path.isdir(directory):
            raise ValueError(f"Directory '{directory}' does not exist")

        ignored_files = utils.get_ignored_files(directory)

        if sync:
            # Project file names start with the directory's own name, so they
            # are resolved against the parent of `directory`.
            root = os.path.dirname(directory)
            for file in self.summary.files:
                remote_name = file.name
                if remote_name.startswith(target_dir):
                    # Drop the separator left after removing the prefix: a
                    # leading separator would make os.path.join discard `root`,
                    # so the file would look missing and be deleted.
                    remote_name = remote_name[len(target_dir) :].lstrip("/" + os.sep)
                file_path = os.path.join(root, remote_name)
                rel_path = os.path.relpath(file_path, directory)
                if not os.path.exists(file_path) or rel_path in ignored_files:
                    logger.info("Deleting file %s from project", file.name)
                    api.call(
                        f"/projects/{self.summary.uuid}/files/{file.uuid}", "DELETE"
                    )

        dirname = os.path.basename(directory)
        futures = []
        with concurrent.futures.ThreadPoolExecutor() as executor:
            for root, _, files in os.walk(directory):
                for file in files:
                    file_path = os.path.join(root, file)
                    rel_path = os.path.relpath(file_path, directory)
                    if ignored_files and rel_path in ignored_files:
                        continue
                    name = os.path.join(target_dir, dirname, rel_path)
                    futures.append(
                        executor.submit(
                            self.upload_file,
                            name,
                            file_path,
                            overwrite=overwrite or sync,
                        )
                    )

            for future in concurrent.futures.as_completed(futures):
                future.result()

    @property
    def uuid(self) -> str:
        """The UUID of the project, taken from its summary."""
        return self._summary.uuid

    @property
    def summary(self) -> ProjectSummary:
        """The summary this project was created with."""
        return self._summary

    @property
    def init_scripts(self) -> dict[str, InitScriptVariables]:
        """The init script variables of the project, keyed as passed to the constructor."""
        return self._init_scripts

__init__(summary, models, files, submodels, init_scripts=None)

Initialize a new Project instance.

Parameters:

Name Type Description Default
summary ProjectSummary

The summary of the project.

required
models dict[str, Model]

A dictionary of models associated with the project. The keys are the model names and the values are the json representations of the models.

required
files list[str]

A list of file paths associated with the project.

required
submodels dict[str, str]

A dictionary of submodels associated with the project. The keys are the submodel names and the values are the reference IDs of the submodels.

required
init_scripts dict[str, InitScriptVariables]

A dictionary of initialization scripts associated with the project. Defaults to None.

None
Source code in collimator/dashboard/project.py
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def __init__(
    self,
    summary: ProjectSummary,
    models: dict[str, model_json.Model],
    files: list[str],
    submodels: dict[str, str],
    init_scripts: dict[str, InitScriptVariables] = None,
):
    """
    Build a Project from data that has already been collected.

    Args:
        summary (ProjectSummary): The summary of the project.
        models (dict[str, model_json.Model]): Models of the project, keyed by model name;
            the values are the json representations of the models.
        files (list[str]): File paths associated with the project.
        submodels (dict[str, str]): Submodels of the project, mapping each submodel name
            to its reference ID.
        init_scripts (dict[str, InitScriptVariables], optional): Initialization scripts
            associated with the project. Defaults to None, which is stored as an empty dict.

    """
    # A missing or empty mapping is replaced by a fresh dict.
    script_variables = init_scripts if init_scripts else {}

    self._summary = summary
    self._files = files
    self._models = models
    self._submodels = submodels
    self._init_scripts = script_variables

create_submodel_instance(name, instance_name, instance_parameters=None, **kwargs)

Creates an instance of a submodel.

Parameters:

Name Type Description Default
name str

The name of the submodel.

required
instance_name str

The name of the instance.

required
instance_parameters dict[str, Any]

Parameters for the instance. Defaults to None.

None
**kwargs

Additional keyword arguments.

{}

Returns:

Name Type Description
Diagram Diagram

The created submodel instance.

Raises:

Type Description
KeyError

If the specified submodel is not found.

ValueError

If the specified submodel is found but has a None reference ID.

Source code in collimator/dashboard/project.py
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
def create_submodel_instance(
    self,
    name: str,
    instance_name: str,
    instance_parameters: dict[str, Any] = None,
    **kwargs,
) -> Diagram:
    """
    Creates an instance of a submodel.

    Args:
        name (str): The name of the submodel.
        instance_name (str): The name of the instance.
        instance_parameters (dict[str, Any], optional): Parameters for the instance. Defaults to None.
        **kwargs: Additional keyword arguments, forwarded to
            ReferenceSubdiagram.create_diagram.

    Returns:
        Diagram: The created submodel instance.

    Raises:
        KeyError: If the specified submodel is not found.
        ValueError: If the specified submodel is found but has a None reference ID.
    """
    if name not in self._submodels:
        raise KeyError(
            f"Submodel '{name}' not found. Available submodels: {list(self._submodels.keys())}"
        )
    ref_id = self._submodels[name]
    if ref_id is None:
        # The submodel is registered, so "not found" would be misleading here.
        raise ValueError(f"Submodel '{name}' has no reference ID")
    return ReferenceSubdiagram.create_diagram(
        ref_id,
        instance_name,
        instance_parameters=instance_parameters,
        **kwargs,
    )

get_model(name)

Retrieves a model from the project.

Parameters:

Name Type Description Default
name str

The name of the model to retrieve.

required

Returns:

Name Type Description
SimulationContext SimulationContext

The simulation context for the retrieved model.

Raises:

Type Description
KeyError

If the specified model name is not found.

Source code in collimator/dashboard/project.py
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
def get_model(self, name: str) -> "SimulationContext | None":
    """
    Retrieves a model from the project.

    The variables of the model's first init script (if that script is known to
    this project) are passed as the namespace used to load the model.

    Args:
        name (str): The name of the model to retrieve.

    Returns:
        SimulationContext | None: The simulation context for the retrieved model,
            or None when the model diagram has no nodes or loading failed
            (the failure is logged).

    Raises:
        KeyError: If the specified model name is not found.
    """
    if name not in self._models:
        raise KeyError(f"Model '{name}' not found")

    model = self._models[name]
    init_script = None
    if model.configuration.workspace.init_scripts:
        # Only the first init script of the model is considered.
        init_script = model.configuration.workspace.init_scripts[0].file_name
    _globals = {}
    if init_script in self._init_scripts:
        _globals = self._init_scripts[init_script].as_dict()

    if not model.diagram.nodes:
        return None

    logger.info('Loading model "%s"', model.name)
    try:
        return from_model_json.loads_model(model.to_json(), namespace=_globals)
    except Exception as e:
        # Deliberately best-effort: log and return None. Catching Exception
        # (not BaseException) lets KeyboardInterrupt/SystemExit propagate.
        logger.error(
            "Failed to load model %s: %s", model.name, e, exc_info=True
        )
        return None

save_model(diagram, configuration=None)

Save the given diagram as a model. If the diagram already exists, it will be updated.

Parameters:

Name Type Description Default
diagram Diagram

The diagram to be saved as a model.

required
configuration Configuration

The configuration for the model. Defaults to None.

None

Returns:

Type Description
str

The UUID of the saved model.

Source code in collimator/dashboard/project.py
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
def save_model(
    self, diagram: Diagram, configuration: model_json.Configuration = None
) -> str:
    """
    Store a diagram as a model of this project, updating it if it already exists.

    When the project summary lists a model with the same name as the diagram,
    ``diagram.ui_id`` is set to that model's UUID before the save is delegated
    to ``_save_model``.

    Args:
        diagram (Diagram): The diagram to be saved as a model.
        configuration (model_json.Configuration, optional): The configuration for the model. Defaults to None.

    Returns:
        The UUID of the saved model.
    """
    # Later entries win when several summary models share a name.
    known_by_name = {}
    for listed in self.summary.models:
        known_by_name[listed.name] = listed

    if diagram.name in known_by_name:
        same_name_model = known_by_name[diagram.name]
        diagram.ui_id = same_name_model.uuid

    saved_uuid = self._save_model(diagram, configuration=configuration)
    return saved_uuid

save_submodel(constructor, name, default_parameters=None)

Registers the given constructor as a reference submodel and saves it to the project under the given name.

Parameters:

Name Type Description Default
constructor Callable

The constructor function for the submodel.

required
name str

The name of the submodel.

required
default_parameters list[Parameter]

A list of default parameters for the submodel. Defaults to None.

None

Returns:

Name Type Description
str str

The reference ID of the saved submodel.

Source code in collimator/dashboard/project.py
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
def save_submodel(
    self,
    constructor: Callable,
    name: str,
    default_parameters: list[Parameter] = None,
) -> str:
    """
    Register a submodel constructor and save the submodel under the given name.

    If the project already has a reference submodel with this name, its UUID
    is reused as the reference ID for the registration.

    Args:
        constructor (Callable): The constructor function for the submodel.
        name (str): The name of the submodel.
        default_parameters (list[Parameter], optional): A list of default parameters for the submodel. Defaults to None.

    Returns:
        str: The reference ID of the saved submodel.
    """
    existing = model_api.get_reference_submodel_by_name(self.summary.uuid, name)
    if existing:
        known_ref_id = existing.uuid
    else:
        known_ref_id = None

    ref_id = ReferenceSubdiagram.register(
        constructor, default_parameters, ref_id=known_ref_id
    )

    # Instantiate the registered submodel once so it can be converted and saved.
    diagram = ReferenceSubdiagram.create_diagram(ref_id, name)
    self._save_submodel(diagram)
    self._submodels[diagram.name] = ref_id
    return ref_id

upload_directory(directory, overwrite=False, sync=False, target_dir='')

Uploads a directory of files to a project, keeping the directory structure.

This function will ignore files specified in the .gitignore or .collimatorignore file in the directory.

Parameters:

Name Type Description Default
directory str

The path to the directory containing the files to be uploaded.

required
overwrite bool

Flag indicating whether to overwrite existing files. Defaults to False.

False
sync bool

Flag indicating whether to synchronize the directory with the project. Defaults to False. This will delete files in the project that are not present in the directory.

False
target_dir str

The target directory in the project. Defaults to "".

''
Source code in collimator/dashboard/project.py
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
def upload_directory(
    self, directory: str, overwrite=False, sync=False, target_dir=""
):
    """
    Uploads a directory of files to a project, keeping the directory structure.

    This function will ignore files specified in the .gitignore or .collimatorignore file in the directory
    (as reported by utils.get_ignored_files).

    Args:
        directory (str): The path to the directory containing the files to be uploaded.
        overwrite (bool, optional): Flag indicating whether to overwrite existing files. Defaults to False.
        sync (bool, optional): Flag indicating whether to synchronize the directory with the project. Defaults to False.
            This will delete files in the project that are not present in the directory,
            and implies overwriting existing files.
        target_dir (str, optional): The target directory in the project. Defaults to "".

    Raises:
        ValueError: If ``directory`` is not an existing directory.
    """
    if not os.path.isdir(directory):
        raise ValueError(f"Directory '{directory}' does not exist")

    ignored_files = utils.get_ignored_files(directory)

    if sync:
        # Project file names start with the directory's own name, so they are
        # resolved against the parent of `directory`.
        root = os.path.dirname(directory)
        for file in self.summary.files:
            remote_name = file.name
            if remote_name.startswith(target_dir):
                # Drop the separator left after removing the prefix: a leading
                # separator would make os.path.join discard `root`, so the
                # file would look missing and be deleted from the project.
                remote_name = remote_name[len(target_dir) :].lstrip("/" + os.sep)
            file_path = os.path.join(root, remote_name)
            rel_path = os.path.relpath(file_path, directory)
            if not os.path.exists(file_path) or rel_path in ignored_files:
                logger.info("Deleting file %s from project", file.name)
                api.call(
                    f"/projects/{self.summary.uuid}/files/{file.uuid}", "DELETE"
                )

    dirname = os.path.basename(directory)
    futures = []
    with concurrent.futures.ThreadPoolExecutor() as executor:
        for root, _, files in os.walk(directory):
            for file in files:
                file_path = os.path.join(root, file)
                rel_path = os.path.relpath(file_path, directory)
                if ignored_files and rel_path in ignored_files:
                    continue
                # Uploaded names keep the directory's own name as a prefix.
                name = os.path.join(target_dir, dirname, rel_path)
                futures.append(
                    executor.submit(
                        self.upload_file,
                        name,
                        file_path,
                        overwrite=overwrite or sync,
                    )
                )

        # result() re-raises the first exception raised by an upload.
        for future in concurrent.futures.as_completed(futures):
            future.result()

upload_file(name, file, overwrite=True)

Uploads a file to a project.

Parameters:

Name Type Description Default
name str

The name of the file.

required
file str

The path to the file to be uploaded.

required
overwrite bool

Flag indicating whether to overwrite an existing file with the same name. Defaults to True.

True

Returns:

Name Type Description
dict

A dictionary containing the summary of the uploaded file.

Raises:

Type Description
CollimatorApiError

If the file upload fails.

Source code in collimator/dashboard/project.py
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
def upload_file(self, name: str, file: str, overwrite=True):
    """
    Uploads a file to a project.

    A presigned upload URL is requested from the API, the file content is sent
    to it with an HTTP PUT, and the API is then asked to process the uploaded
    file.

    Args:
        name (str): The name of the file.
        file (str): The path to the file to be uploaded.
        overwrite (bool, optional): Flag indicating whether to overwrite an existing file with the same name.
            Defaults to True.

    Returns:
        dict: The "summary" entry of the API response to the process request.

    Raises:
        api.CollimatorApiError: If the file upload fails.
    """

    project_uuid = self.summary.uuid

    mime_type, _ = mimetypes.guess_type(name)
    mime_type = mime_type or "application/octet-stream"

    with open(file, "rb") as fp:
        size = os.fstat(fp.fileno()).st_size

        logger.info(
            "Uploading file %s (type: %s, size: %d) to project %s...",
            name,
            mime_type,
            size,
            project_uuid,
        )
        body = {
            "name": name,
            "content_type": mime_type,
            "overwrite": overwrite,
            "size": size,
        }

        try:
            put_url_response = api.post(
                f"/projects/{project_uuid}/files", body=body
            )
        except api.CollimatorApiError as e:
            logger.error("Failed to upload file %s: %s", name, e)
            raise

        s3_presigned_url = put_url_response["put_presigned_url"]
        # TLS certificate verification is left at the requests default
        # (enabled): the presigned URL and the file content must not be exposed
        # to a man-in-the-middle. Environments with a private CA can point
        # REQUESTS_CA_BUNDLE at their bundle.
        s3_response = requests.put(
            s3_presigned_url,
            headers={"Content-Type": mime_type, "Content-Length": str(size)},
            # An empty file is sent as an empty bytes body instead of the file object.
            data=fp if size > 0 else b"",
        )

    if s3_response.status_code != 200:
        logger.error("s3 upload failed: %s", s3_response.text)
        raise api.CollimatorApiError(
            f"Failed to upload file {name} to project {project_uuid}"
        )
    file_uuid = put_url_response["summary"]["uuid"]
    process_response = api.call(
        f"/projects/{project_uuid}/files/{file_uuid}/process", "POST"
    )
    logger.info("Finished uploading file %s", name)

    return process_response["summary"]

upload_files(files, overwrite=True)

Uploads multiple files to a project.

Parameters:

Name Type Description Default
files list[str]

A list of file paths to be uploaded.

required
overwrite bool

Flag indicating whether to overwrite existing files. Defaults to True.

True
Source code in collimator/dashboard/project.py
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
def upload_files(self, files: list[str], overwrite=True):
    """
    Upload several local files to this project concurrently.

    Every path is handed to ``upload_file`` on a thread pool, using the
    path's base name as the name of the uploaded file. The call returns once
    all uploads have finished; an exception raised by an upload is re-raised
    here.

    Args:
        files (list[str]): Paths of the local files to upload.
        overwrite (bool, optional): Passed through to ``upload_file`` for every
            file. Defaults to True.
    """
    with concurrent.futures.ThreadPoolExecutor() as pool:
        pending = [
            pool.submit(self.upload_file, os.path.basename(path), path, overwrite)
            for path in files
        ]
        for finished in concurrent.futures.as_completed(pending):
            # result() re-raises whatever the worker raised, if anything
            finished.result()

create_project(name)

Creates a new project with the given name.

Parameters:

Name Type Description Default
name str

The name of the project.

required

Returns:

Name Type Description
Project Project

The newly created project.

Source code in collimator/dashboard/project.py
738
739
740
741
742
743
744
745
746
747
748
749
750
751
def create_project(name: str) -> Project:
    """
    Creates a new, empty project through the API.

    Args:
        name (str): The title given to the new project.

    Returns:
        Project: The new project, built from the API response with no models,
            files or submodels attached.
    """
    logger.info("Creating project %s...", name)
    payload = {"title": name}
    created = api.call("/projects", "POST", body=payload)
    return Project(
        summary=_parse_project_summary(created),
        models={},
        files=[],
        submodels={},
    )

delete_project(project_uuid)

Deletes a project with the given UUID.

Parameters:

Name Type Description Default
project_uuid str

The UUID of the project to delete.

required
Source code in collimator/dashboard/project.py
771
772
773
774
775
776
777
778
779
def delete_project(project_uuid: str):
    """
    Issues a DELETE request for the project identified by ``project_uuid``.

    Args:
        project_uuid (str): The UUID of the project to delete.
    """
    endpoint = f"/projects/{project_uuid}"
    logger.info("Deleting project %s...", project_uuid)
    api.call(endpoint, "DELETE")

get_or_create_project(name, **kwargs)

Retrieves a project by its name or creates a new one if it doesn't exist.

Parameters:

Name Type Description Default
name str

The name of the project.

required
**kwargs

Additional keyword arguments. See get_project_by_uuid() for details.

{}

Returns:

Name Type Description
Project Project

The retrieved or newly created project.

Source code in collimator/dashboard/project.py
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
def get_or_create_project(name: str, **kwargs) -> Project:
    """
    Returns the project called ``name``, creating it when no such project exists.

    Args:
        name (str): The name of the project.
        **kwargs: Forwarded to get_project_by_name() (and from there to
            get_project_by_uuid()). They are not used when a new project
            has to be created.

    Returns:
        Project: The existing project, or the newly created one.
    """
    try:
        project = get_project_by_name(name, **kwargs)
    except api.CollimatorNotFoundError:
        # Nothing carries this name yet: fall back to creating it.
        project = create_project(name)
    return project

get_project_by_name(project_name, **kwargs)

Retrieves a project by its name.

Parameters:

Name Type Description Default
project_name str

The name of the project to retrieve.

required
**kwargs

Additional keyword arguments. See get_project_by_uuid() for details.

{}

Returns:

Name Type Description
Project Project

The project object.

Raises:

Type Description
CollimatorNotFoundError

If the project with the specified name is not found.

CollimatorApiError

If multiple projects with the same name are found.

Source code in collimator/dashboard/project.py
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
def get_project_by_name(project_name: str, **kwargs) -> Project:
    """
    Retrieves a project by its name.

    The list of projects is fetched from the API and filtered on an exact
    title match. A single match is then loaded with get_project_by_uuid().

    Args:
        project_name (str): The name of the project to retrieve.
        **kwargs: Additional keyword arguments. See get_project_by_uuid() for details.

    Returns:
        Project: The project object.

    Raises:
        CollimatorNotFoundError: If the project with the specified name is not found.
        CollimatorApiError: If multiple projects with the same name are found.
            The message lists the UUID of every candidate.

    """

    response = api.get("/projects")
    matches = [p for p in response["projects"] if p["title"] == project_name]

    if not matches:
        raise api.CollimatorNotFoundError(f"Project '{project_name}' not found")

    if len(matches) > 1:
        # The user profile is only needed to label the ambiguous candidates,
        # so it is fetched here instead of on every lookup.
        user_profile = api.get("/user/profile")

        labelled = []
        for p in matches:
            if p["is_default"]:
                labelled.append(f"- {p['uuid']} (public project)")
            elif p["owner_uuid"] == user_profile["uuid"]:
                labelled.append(f"- {p['uuid']} (private project)")
            else:
                labelled.append(f"- {p['uuid']} (shared with me)")
        uuids = "\n".join(labelled)

        raise api.CollimatorApiError(
            f"Multiple projects found with name '{project_name}':\n{uuids}\n"
            "Please use get_project_by_uuid() instead."
        )

    project_uuid = matches[0]["uuid"]
    return get_project_by_uuid(project_uuid, **kwargs)

get_project_by_uuid(project_uuid, overwrite=False, download_files=True)

Retrieves a project with the given UUID and downloads its files.

Parameters:

Name Type Description Default
project_uuid str

The UUID of the project to retrieve.

required
overwrite bool

Flag indicating whether to overwrite local files. Defaults to False.

False
download_files bool

Flag indicating whether to download project files. Defaults to True.

True

Returns:

Name Type Description
Project Project

The downloaded project, including its models and files.

Source code in collimator/dashboard/project.py
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
def get_project_by_uuid(
    project_uuid: str, overwrite: bool = False, download_files: bool = True
) -> Project:
    """
    Retrieves a project with the given UUID and downloads its files.

    Besides calling the API, this function has local side effects:

    - project files are downloaded into the current working directory;
    - that directory is passed to ``utils.add_py_init_file`` and appended to
      ``sys.path``;
    - reference submodels are registered with ``from_model_json``;
    - a model's init script, when present on disk, is executed with ``exec``.

    Args:
        project_uuid (str): The UUID of the project to retrieve.
        overwrite (bool, optional): Flag indicating whether to overwrite local files. Defaults to False.
        download_files (bool, optional): Flag indicating whether to download project files. Defaults to True.

    Returns:
        Project: The downloaded project, including its models and files.

    Raises:
        NotImplementedError: If a model declares more than one init script.
    """

    logger.info("Downloading project %s...", project_uuid)

    project_response = api.get(f"/projects/{project_uuid}")
    project_summary = _parse_project_summary(project_response)

    # Files are downloaded next to wherever the caller is running from.
    project_dir = os.getcwd()
    logger.info("Project dir: %s", project_dir)

    # download project files
    files = []
    if download_files:
        for file in project_summary.files:
            if file.status == "processing_completed":
                dst = os.path.join(project_dir, file.name)
                _download_file(file, project_uuid, dst, overwrite=overwrite)
                files.append(dst)
            else:
                logger.warning(
                    "File %s is not ready to be downloaded (status: %s)",
                    file.name,
                    file.status,
                )

    # For loading custom leaf systems
    utils.add_py_init_file(project_dir)
    if project_dir not in sys.path:
        sys.path.append(project_dir)

    # Must first register submodels
    submodels_response = api.get(f"/project/{project_uuid}/submodels")
    # `visited` is shared across all _get_reference_submodels() calls below.
    visited = set()
    ref_submodels: dict[str, model_json.Model] = {}
    submodels = {}

    # Submodels listed by the project, plus whatever they reference.
    for model_summary in submodels_response["submodels"]:
        submodel = model_api.get_reference_submodel(project_uuid, model_summary["uuid"])
        if submodel is None:
            logger.warning(
                "Could not find submodel %s (%s)",
                model_summary["name"],
                model_summary["uuid"],
            )
            continue
        ref_submodels.update(
            {
                model_summary["uuid"]: submodel,
                **_get_reference_submodels(project_uuid, submodel, visited),
            }
        )
    # Submodels referenced from the project's models.
    for model_summary in project_summary.models:
        model = model_api.get_model(model_summary.uuid)
        if model is None:
            logger.warning(
                "Could not find model %s (%s)", model_summary.name, model_summary.uuid
            )
            continue
        ref_submodels.update(_get_reference_submodels(project_uuid, model, visited))
    for submodel_uuid, submodel in ref_submodels.items():
        logger.info("Registering submodel %s", submodel.name)
        from_model_json.register_reference_submodel(submodel_uuid, submodel)
        submodels[submodel.name] = submodel_uuid

    # Load models
    models = {}
    init_scripts_vars = {}
    for model_summary in project_summary.models:
        if model_summary.kind == ModelKind.MODEL:
            model = model_api.get_model(model_summary.uuid)
            if model is None:
                # Same guard as in the submodel pass above: without it the
                # attribute access below fails with an AttributeError.
                logger.warning(
                    "Skipping model %s (%s): it could not be loaded",
                    model_summary.name,
                    model_summary.uuid,
                )
                continue
            init_scripts = model.configuration.workspace.init_scripts
            if len(init_scripts) > 1:
                raise NotImplementedError("Only one init script is supported")
            elif len(init_scripts) == 1 and init_scripts[0]:
                filename = init_scripts[0].file_name
                init_script_path = os.path.join(project_dir, filename)
                # The script is only present locally if it was downloaded
                # above or already existed in the working directory.
                if os.path.exists(init_script_path):
                    logger.info("Evaluating %s", init_scripts[0])
                    with open(init_script_path, "r") as f:
                        import numpy as np

                        # NOTE(security): exec() runs the project's init script
                        # with a copy of this module's globals. Only load
                        # projects whose files you trust.
                        _globals = {**globals(), "np": np}
                        exec(f.read(), _globals)
                        init_scripts_vars[filename] = InitScriptVariables(_globals)

            models[model_summary.name] = model

    return Project(
        summary=project_summary,
        models=models,
        files=files,
        submodels=submodels,
        init_scripts=init_scripts_vars,
    )

simulate(model_uuid, timeout=None, ignore_cache=False, parameters=None, num_simulations=1, recorded_signals=None) async

Runs a simulation for the specified model UUID.

Parameters:

Name Type Description Default
model_uuid str

The UUID of the model to run the simulation for.

required
timeout int

The maximum time to wait for the simulation to complete, in seconds. Defaults to None.

None
ignore_cache bool

Flag indicating whether to ignore the cache. Defaults to False.

False
parameters dict[str, SweepValues | RandomDistribution | Parameter]

A dictionary of parameters to sweep. Defaults to None.

None
num_simulations int

The number of simulations to run, used in the case of monte-carlo. Defaults to 1.

1
recorded_signals list[str]

A list of signals to record. Must be specified in the case of ensemble simulations. Defaults to None.

None

Returns:

Type Description
SimulationResults | list[SimulationResults]

A single SimulationResults for a regular run, or a list of SimulationResults (one per run) for an ensemble simulation.

Raises:

Type Description
TimeoutError

If the simulation does not complete within the specified timeout.

SimulationFailedError

If the simulation fails.

Source code in collimator/dashboard/project.py
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
async def simulate(
    model_uuid: str,
    timeout: int = None,
    ignore_cache: bool = False,
    parameters: dict[
        str,
        ensemble.SweepValues
        | ensemble.RandomDistribution
        | Parameter
        | float
        | int
        | bool
        | str,
    ] = None,
    num_simulations: int = 1,
    recorded_signals: list[str] = None,
) -> SimulationResults | list[SimulationResults]:
    """
    Runs a simulation for the specified model UUID.

    The simulation is started through the API and polled once per second
    until its status is "completed" or "failed". If any parameter is a
    SweepValues or RandomDistribution, the run is submitted as an ensemble.

    Args:
        model_uuid (str): The UUID of the model to run the simulation for.
        timeout (int, optional): The maximum time to wait for the simulation
            to complete, in seconds. Defaults to None.
        ignore_cache (bool, optional): Flag indicating whether to ignore the
            cache. Defaults to False.
        parameters (dict[str, SweepValues | RandomDistribution | Parameter | float | int | bool | str], optional):
            Parameter overrides, keyed by parameter name. SweepValues and
            RandomDistribution entries define an ensemble and cannot be mixed
            with each other. Defaults to None.
        num_simulations (int, optional): The number of simulations to run,
            used in the case of monte-carlo. Defaults to 1.
        recorded_signals (list[str], optional): A list of signals to record. Must be
            specified in the case of ensemble simulations. The list passed in
            is left unmodified. Defaults to None.

    Returns:
        SimulationResults | list[SimulationResults]: A single result for a
            regular run. For an ensemble, a list with one SimulationResults
            per run, each carrying the parameters parsed from its run id.

    Raises:
        ValueError: If a parameter name is None, or if both sweep values and
            random distributions are given.
        TimeoutError: If the simulation does not complete within the specified timeout.
        SimulationFailedError: If the simulation fails.

    """
    start_time = time.perf_counter()
    body = {"ignore_cache": ignore_cache}
    is_ensemble = False
    if parameters:
        has_random_distrib = any(
            isinstance(v, ensemble.RandomDistribution) for v in parameters.values()
        )
        has_sweep_values = any(
            isinstance(v, ensemble.SweepValues) for v in parameters.values()
        )
        is_ensemble = has_random_distrib or has_sweep_values

        if has_random_distrib and has_sweep_values:
            raise ValueError("Cannot have both sweep values and random distributions")

        body["parameters"] = {}

        # Sweeps and distributions go into the ensemble config; everything
        # else is sent as a plain parameter override.
        model_parameter_sweeps = []
        for param_name, param_value in parameters.items():
            if param_name is None:
                raise ValueError("Parameter name cannot be None")

            if isinstance(param_value, ensemble.SweepValues):
                model_parameter_sweeps.append(
                    {
                        "parameter_name": param_name,
                        "sweep_values": {
                            "sweep_kind": "array",
                            "values": str(param_value.values),
                        },
                    }
                )
            elif isinstance(param_value, ensemble.RandomDistribution):
                model_parameter_sweeps.append(
                    {
                        "parameter_name": param_name,
                        "sweep_values": {
                            "sweep_kind": "distribution",
                            "distribution_name": param_value.distribution,
                            "parameters": param_value.parameters,
                        },
                    }
                )
            elif isinstance(param_value, (float, int, bool, str)):
                body["parameters"][param_name] = model_json.Parameter(
                    value=str(param_value), is_string=False
                ).to_dict()
            else:
                expr, _ = param_value.value_as_api_param(
                    allow_param_name=False, allow_string_literal=False
                )
                json_param = model_json.Parameter(value=expr, is_string=False)
                body["parameters"][param_name] = json_param.to_dict()

    body["model_overrides"] = {}
    if recorded_signals:
        # Build a new list: `+=` would append "time" to the caller's own list
        # and make it grow on every call.
        recorded_signals = [*recorded_signals, "time"]
        body["model_overrides"]["recorded_signals"] = {"signal_ids": recorded_signals}

    if is_ensemble:
        body["save_npz"] = True
        body["target"] = "ensemble"
        body["model_overrides"]["ensemble_config"] = {
            "model_parameter_sweeps": model_parameter_sweeps,
            "sweep_strategy": "all_combinations" if has_sweep_values else "monte_carlo",
        }
        if has_random_distrib:
            body["model_overrides"]["ensemble_config"]["num_sims"] = num_simulations

    try:
        summary = api.post(f"/models/{model_uuid}/simulations", body=body)

        # wait for simulation completion
        while summary["status"] not in ("completed", "failed"):
            await asyncio.sleep(1)
            logger.info("Waiting for simulation to complete...")
            summary = api.get(f"/models/{model_uuid}/simulations/{summary['uuid']}")
            if timeout is not None and time.perf_counter() - start_time > timeout:
                # Ask for the simulation to be stopped before giving up on it.
                stop_simulation(model_uuid, summary["uuid"])
                raise TimeoutError(
                    f"Simulation did not complete within {timeout} seconds"
                )
    except asyncio.CancelledError:  # happens on KeyboardInterrupt
        logger.info("Simulation interrupted")
        stop_simulation(model_uuid, summary["uuid"])
        raise

    logs = api.get(f"/models/{model_uuid}/simulations/{summary['uuid']}/logs")
    logger.info(logs)

    if summary["status"] == "failed":
        raise SimulationFailedError(summary["fail_reason"])

    signals = results.get_signals(model_uuid, summary["uuid"], signals=recorded_signals)

    if is_ensemble:
        # Each signal is indexed by run id here; the run id is parsed as
        # space-separated "name=value" pairs.
        ensemble_results = []
        for run_id, time_signal in signals["time"].items():
            params = run_id.split(" ")
            params = {c.split("=")[0]: c.split("=")[1] for c in params}
            ensemble_results.append(
                SimulationResults(
                    context=None,
                    time=time_signal,
                    outputs={s: signals[s][run_id] for s in signals if s != "time"},
                    parameters=params,
                )
            )
        return ensemble_results

    return SimulationResults(
        context=None,
        time=signals["time"],
        outputs=signals,
    )

stop_simulation(model_uuid, simulation_uuid)

Stops a running simulation.

Parameters:

Name Type Description Default
model_uuid str

The UUID of the model for which the simulation is running.

required
simulation_uuid str

The UUID of the simulation to stop.

required

Returns:

Name Type Description
dict dict

The response from the API call.

Source code in collimator/dashboard/project.py
782
783
784
785
786
787
788
789
790
791
792
793
794
def stop_simulation(model_uuid: str, simulation_uuid: str) -> dict:
    """
    Requests that a running simulation be stopped.

    Args:
        model_uuid (str): The UUID of the model for which the simulation is running.
            It is not used to build the request.
        simulation_uuid (str): The UUID of the simulation to stop.

    Returns:
        dict: The response from the API call.

    """
    stop_endpoint = f"/jobs/{simulation_uuid}/stop"
    return api.post(stop_endpoint)