
Curator

This page documents experimental interfaces for working with the Synapse Python Client. Unless otherwise noted, these interfaces are subject to change at any time. Use at your own risk.

API reference

synapseclient.models.CurationTask dataclass

Bases: CurationTaskSynchronousProtocol

The CurationTask provides instructions for a Data Contributor on how data or metadata of a specific type should be both added to a project and curated.

Represents a Synapse CurationTask.

ATTRIBUTE DESCRIPTION
task_id

The unique identifier issued to this task when it was created

TYPE: Optional[int]

data_type

Will match the data type that a contributor plans to contribute. The data type must be unique within a project

TYPE: Optional[str]

project_id

The synId of the project

TYPE: Optional[str]

instructions

Instructions to the data contributor

TYPE: Optional[str]

task_properties

The properties of a CurationTask. This can be either FileBasedMetadataTaskProperties or RecordBasedMetadataTaskProperties.

TYPE: Optional[Union[FileBasedMetadataTaskProperties, RecordBasedMetadataTaskProperties]]

etag

Synapse employs an Optimistic Concurrency Control (OCC) scheme to handle concurrent updates. Since the E-Tag changes every time an entity is updated, it is used to detect when a client's current representation of an entity is out-of-date.

TYPE: Optional[str]

created_on

(Read Only) The date this task was created

TYPE: Optional[str]

modified_on

(Read Only) The date this task was last modified

TYPE: Optional[str]

created_by

(Read Only) The ID of the user that created this task

TYPE: Optional[str]

modified_by

(Read Only) The ID of the user that last modified this task

TYPE: Optional[str]
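The etag attribute above drives the OCC scheme: every update issues a new etag, and an update submitted with a stale etag is rejected. The mechanism can be illustrated with a small self-contained sketch (a toy in-memory store, not the Synapse API; all names here are illustrative):

```python
import uuid


class ConflictError(Exception):
    """Raised when an update is attempted with a stale etag."""


class InMemoryStore:
    """Toy server: each stored record carries an etag that changes on every update."""

    def __init__(self):
        self._records = {}

    def create(self, task_id, data):
        record = dict(data, etag=str(uuid.uuid4()))
        self._records[task_id] = record
        return dict(record)

    def update(self, task_id, data, etag):
        current = self._records[task_id]
        if etag != current["etag"]:
            # The client's view is out of date; it must re-fetch before retrying.
            raise ConflictError("etag does not match; re-fetch the task and retry")
        record = dict(data, etag=str(uuid.uuid4()))
        self._records[task_id] = record
        return dict(record)


store = InMemoryStore()
v1 = store.create(1, {"instructions": "upload files"})
v2 = store.update(1, {"instructions": "upload files and metadata"}, etag=v1["etag"])
try:
    # Reusing v1's etag after v2 was written simulates a concurrent, conflicting edit.
    store.update(1, {"instructions": "stale edit"}, etag=v1["etag"])
except ConflictError as err:
    print(f"conflict: {err}")
```

This is why `get()`/`get_async()` populate `etag` before a subsequent `store()`: the freshly fetched etag is what allows the update to pass the server's OCC check.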

Complete curation task workflow

 

from synapseclient import Synapse
from synapseclient.models import CurationTask, FileBasedMetadataTaskProperties

syn = Synapse()
syn.login()

# Create a new file-based curation task
file_properties = FileBasedMetadataTaskProperties(
    upload_folder_id="syn1234567",
    file_view_id="syn2345678"
)

task = CurationTask(
    project_id="syn9876543",
    data_type="genomics_data",
    instructions="Upload your genomics files and complete metadata",
    task_properties=file_properties
)
task = task.store()
print(f"Created task: {task.task_id}")

# Later, retrieve and update the task
existing_task = CurationTask(task_id=task.task_id).get()
existing_task.instructions = "Updated instructions with new requirements"
existing_task.store()

# List all tasks in the project
for project_task in CurationTask.list(project_id="syn9876543"):
    print(f"Task: {project_task.data_type} - {project_task.task_id}")
Source code in synapseclient/models/curation.py
@dataclass
@async_to_sync
class CurationTask(CurationTaskSynchronousProtocol):
    """
    The CurationTask provides instructions for a Data Contributor on how data or metadata
    of a specific type should be both added to a project and curated.

    Represents a [Synapse CurationTask](https://rest-docs.synapse.org/org/sagebionetworks/repo/model/curation/CurationTask.html).

    Attributes:
        task_id: The unique identifier issued to this task when it was created
        data_type: Will match the data type that a contributor plans to contribute
        project_id: The synId of the project
        instructions: Instructions to the data contributor
        task_properties: The properties of a CurationTask. This can be either
            FileBasedMetadataTaskProperties or RecordBasedMetadataTaskProperties.
        etag: Synapse employs an Optimistic Concurrency Control (OCC) scheme to handle
            concurrent updates. Since the E-Tag changes every time an entity is updated
            it is used to detect when a client's current representation of an entity is
            out-of-date.
        created_on: (Read Only) The date this task was created
        modified_on: (Read Only) The date this task was last modified
        created_by: (Read Only) The ID of the user that created this task
        modified_by: (Read Only) The ID of the user that last modified this task

    Example: Complete curation task workflow
         

        ```python
        from synapseclient import Synapse
        from synapseclient.models import CurationTask, FileBasedMetadataTaskProperties

        syn = Synapse()
        syn.login()

        # Create a new file-based curation task
        file_properties = FileBasedMetadataTaskProperties(
            upload_folder_id="syn1234567",
            file_view_id="syn2345678"
        )

        task = CurationTask(
            project_id="syn9876543",
            data_type="genomics_data",
            instructions="Upload your genomics files and complete metadata",
            task_properties=file_properties
        )
        task = task.store()
        print(f"Created task: {task.task_id}")

        # Later, retrieve and update the task
        existing_task = CurationTask(task_id=task.task_id).get()
        existing_task.instructions = "Updated instructions with new requirements"
        existing_task.store()

        # List all tasks in the project
        for project_task in CurationTask.list(project_id="syn9876543"):
            print(f"Task: {project_task.data_type} - {project_task.task_id}")
        ```
    """

    task_id: Optional[int] = None
    """The unique identifier issued to this task when it was created"""

    data_type: Optional[str] = None
    """Will match the data type that a contributor plans to contribute. The dataType must be unique within a project"""

    project_id: Optional[str] = None
    """The synId of the project"""

    instructions: Optional[str] = None
    """Instructions to the data contributor"""

    task_properties: Optional[
        Union[FileBasedMetadataTaskProperties, RecordBasedMetadataTaskProperties]
    ] = None
    """The properties of a CurationTask"""

    etag: Optional[str] = None
    """Synapse employs an Optimistic Concurrency Control (OCC) scheme to handle concurrent updates. Since the E-Tag changes every time an entity is updated it is used to detect when a client's current representation of an entity is out-of-date"""

    created_on: Optional[str] = None
    """(Read Only) The date this task was created"""

    modified_on: Optional[str] = None
    """(Read Only) The date this task was last modified"""

    created_by: Optional[str] = None
    """(Read Only) The ID of the user that created this task"""

    modified_by: Optional[str] = None
    """(Read Only) The ID of the user that last modified this task"""

    _last_persistent_instance: Optional["CurationTask"] = field(
        default=None, repr=False, compare=False
    )
    """The last persistent instance of this object. This is used to determine if the
    object has been changed and needs to be updated in Synapse."""

    @property
    def has_changed(self) -> bool:
        """Determines if the object has been changed and needs to be updated in Synapse."""
        return (
            not self._last_persistent_instance or self._last_persistent_instance != self
        )

    def _set_last_persistent_instance(self) -> None:
        """Stash the last time this object interacted with Synapse. This is used to
        determine if the object has been changed and needs to be updated in Synapse."""
        del self._last_persistent_instance
        self._last_persistent_instance = replace(self)

    def fill_from_dict(
        self, synapse_response: Union[Dict[str, Any], Any]
    ) -> "CurationTask":
        """
        Converts a response from the REST API into this dataclass.

        Arguments:
            synapse_response: The response from the REST API.

        Returns:
            The CurationTask object.
        """
        self.task_id = (
            int(synapse_response.get("taskId", None))
            if synapse_response.get("taskId", None)
            else None
        )
        self.data_type = synapse_response.get("dataType", None)
        self.project_id = synapse_response.get("projectId", None)
        self.instructions = synapse_response.get("instructions", None)
        self.etag = synapse_response.get("etag", None)
        self.created_on = synapse_response.get("createdOn", None)
        self.modified_on = synapse_response.get("modifiedOn", None)
        self.created_by = synapse_response.get("createdBy", None)
        self.modified_by = synapse_response.get("modifiedBy", None)

        task_properties_dict = synapse_response.get("taskProperties", None)
        if task_properties_dict:
            self.task_properties = _create_task_properties_from_dict(
                task_properties_dict
            )

        return self

    def to_synapse_request(self) -> Dict[str, Any]:
        """
        Converts this dataclass to a dictionary suitable for a Synapse REST API request.

        Returns:
            A dictionary representation of this object for API requests.
        """
        request_dict = {}
        request_dict["taskId"] = self.task_id
        request_dict["dataType"] = self.data_type
        request_dict["projectId"] = self.project_id
        request_dict["instructions"] = self.instructions
        request_dict["etag"] = self.etag
        request_dict["createdOn"] = self.created_on
        request_dict["modifiedOn"] = self.modified_on
        request_dict["createdBy"] = self.created_by
        request_dict["modifiedBy"] = self.modified_by

        if self.task_properties is not None:
            request_dict["taskProperties"] = self.task_properties.to_synapse_request()

        delete_none_keys(request_dict)
        return request_dict

    async def get_async(
        self, *, synapse_client: Optional[Synapse] = None
    ) -> "CurationTask":
        """
        Gets a CurationTask from Synapse by ID.

        Arguments:
            synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

        Returns:
            CurationTask: The CurationTask object.

        Raises:
            ValueError: If the CurationTask object does not have a task_id.

        Example: Get a curation task asynchronously
             

            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import CurationTask

            syn = Synapse()
            syn.login()

            async def main():
                task = await CurationTask(task_id=123).get_async()
                print(f"Data type: {task.data_type}")
                print(f"Instructions: {task.instructions}")

            asyncio.run(main())
            ```
        """
        if not self.task_id:
            raise ValueError("task_id is required to get a CurationTask")

        trace.get_current_span().set_attributes(
            {
                "synapse.task_id": str(self.task_id),
            }
        )

        task_result = await get_curation_task(
            task_id=self.task_id, synapse_client=synapse_client
        )
        self.fill_from_dict(synapse_response=task_result)
        self._set_last_persistent_instance()
        return self

    async def delete_async(self, *, synapse_client: Optional[Synapse] = None) -> None:
        """
        Deletes a CurationTask from Synapse.

        Arguments:
            synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

        Raises:
            ValueError: If the CurationTask object does not have a task_id.

        Example: Delete a curation task asynchronously
             

            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import CurationTask

            syn = Synapse()
            syn.login()

            async def main():
                task = CurationTask(task_id=123)
                await task.delete_async()
                print("Task deleted successfully")

            asyncio.run(main())
            ```
        """
        if not self.task_id:
            raise ValueError("task_id is required to delete a CurationTask")

        trace.get_current_span().set_attributes(
            {
                "synapse.task_id": str(self.task_id),
            }
        )

        await delete_curation_task(task_id=self.task_id, synapse_client=synapse_client)

    async def store_async(
        self, *, synapse_client: Optional[Synapse] = None
    ) -> "CurationTask":
        """
        Creates a new CurationTask or updates an existing one on Synapse.

        This method implements non-destructive updates. If a CurationTask with the same
        project_id and data_type exists and this instance hasn't been retrieved from
        Synapse before, it will merge the existing task data with the current instance
        before updating.

        Arguments:
            synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

        Returns:
            CurationTask: The CurationTask object.

        Example: Create a new curation task asynchronously
             

            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import CurationTask, FileBasedMetadataTaskProperties

            syn = Synapse()
            syn.login()

            async def main():
                # Create file-based task properties
                file_properties = FileBasedMetadataTaskProperties(
                    upload_folder_id="syn1234567",
                    file_view_id="syn2345678"
                )

                # Create and store the curation task
                task = CurationTask(
                    project_id="syn9876543",
                    data_type="genomics_data",
                    instructions="Upload your genomics files to the specified folder",
                    task_properties=file_properties
                )
                task = await task.store_async()
                print(f"Created task with ID: {task.task_id}")

            asyncio.run(main())
            ```
        """
        if not self.project_id:
            raise ValueError("project_id is required")
        if not self.data_type:
            raise ValueError("data_type is required")

        trace.get_current_span().set_attributes(
            {
                "synapse.data_type": self.data_type or "",
                "synapse.project_id": self.project_id or "",
                "synapse.task_id": str(self.task_id) if self.task_id else "",
            }
        )

        if (
            not self._last_persistent_instance
            and not self.task_id
            and (
                existing_task_id := await _get_existing_curation_task_id(
                    project_id=self.project_id,
                    data_type=self.data_type,
                    synapse_client=synapse_client,
                )
            )
            and (
                existing_task := await CurationTask(task_id=existing_task_id).get_async(
                    synapse_client=synapse_client
                )
            )
        ):
            merge_dataclass_entities(source=existing_task, destination=self)

        if self.task_id:
            task_result = await update_curation_task(
                task_id=self.task_id,
                curation_task=self.to_synapse_request(),
                synapse_client=synapse_client,
            )
            self.fill_from_dict(synapse_response=task_result)
            self._set_last_persistent_instance()
            return self
        else:
            if not self.project_id:
                raise ValueError("project_id is required to create a CurationTask")
            if not self.data_type:
                raise ValueError("data_type is required to create a CurationTask")
            if not self.instructions:
                raise ValueError("instructions is required to create a CurationTask")
            if not self.task_properties:
                raise ValueError("task_properties is required to create a CurationTask")

            task_result = await create_curation_task(
                curation_task=self.to_synapse_request(),
                synapse_client=synapse_client,
            )
            self.fill_from_dict(synapse_response=task_result)
            self._set_last_persistent_instance()
            return self

    @skip_async_to_sync
    @classmethod
    async def list_async(
        cls,
        project_id: str,
        *,
        synapse_client: Optional[Synapse] = None,
    ) -> AsyncGenerator["CurationTask", None]:
        """
        Generator that yields CurationTasks for a project as they become available.

        Arguments:
            project_id: The synId of the project.
            synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

        Yields:
            CurationTask objects as they are retrieved from the API.

        Example: List all curation tasks in a project asynchronously
             

            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import CurationTask

            syn = Synapse()
            syn.login()

            async def main():
                # List all curation tasks in the project
                async for task in CurationTask.list_async(project_id="syn9876543"):
                    print(f"Task ID: {task.task_id}")
                    print(f"Data Type: {task.data_type}")
                    print(f"Instructions: {task.instructions}")
                    print("---")

            asyncio.run(main())
            ```
        """
        trace.get_current_span().set_attributes(
            {
                "synapse.project_id": project_id,
            }
        )

        async for task_dict in list_curation_tasks(
            project_id=project_id, synapse_client=synapse_client
        ):
            task = cls().fill_from_dict(synapse_response=task_dict)
            yield task
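The `has_changed` / `_set_last_persistent_instance` pair above implements snapshot-based change detection: after each round-trip to Synapse, `replace(self)` stashes a copy, and `has_changed` compares the live object against it. The same pattern in isolation (a toy dataclass, not part of the client):

```python
from dataclasses import dataclass, field, replace
from typing import Optional


@dataclass
class Tracked:
    """Toy model using the same snapshot pattern as CurationTask."""

    instructions: Optional[str] = None
    # compare=False excludes the snapshot itself from equality checks,
    # so holding a snapshot never counts as a change.
    _last_persistent_instance: Optional["Tracked"] = field(
        default=None, repr=False, compare=False
    )

    @property
    def has_changed(self) -> bool:
        # Changed if never persisted, or if any compared field differs from the snapshot.
        return (
            not self._last_persistent_instance
            or self._last_persistent_instance != self
        )

    def _set_last_persistent_instance(self) -> None:
        self._last_persistent_instance = replace(self)


task = Tracked(instructions="v1")
print(task.has_changed)  # True: never persisted
task._set_last_persistent_instance()
print(task.has_changed)  # False: matches the snapshot
task.instructions = "v2"
print(task.has_changed)  # True: differs from the snapshot
```

Because `_last_persistent_instance` is excluded from comparison, only the real model fields (here, `instructions`) participate in the equality check.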

Functions

get_async async

get_async(*, synapse_client: Optional[Synapse] = None) -> CurationTask

Gets a CurationTask from Synapse by ID.

PARAMETER DESCRIPTION
synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
CurationTask

The CurationTask object.

TYPE: CurationTask

RAISES DESCRIPTION
ValueError

If the CurationTask object does not have a task_id.

Get a curation task asynchronously

 

import asyncio
from synapseclient import Synapse
from synapseclient.models import CurationTask

syn = Synapse()
syn.login()

async def main():
    task = await CurationTask(task_id=123).get_async()
    print(f"Data type: {task.data_type}")
    print(f"Instructions: {task.instructions}")

asyncio.run(main())
Source code in synapseclient/models/curation.py
async def get_async(
    self, *, synapse_client: Optional[Synapse] = None
) -> "CurationTask":
    """
    Gets a CurationTask from Synapse by ID.

    Arguments:
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Returns:
        CurationTask: The CurationTask object.

    Raises:
        ValueError: If the CurationTask object does not have a task_id.

    Example: Get a curation task asynchronously
         

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import CurationTask

        syn = Synapse()
        syn.login()

        async def main():
            task = await CurationTask(task_id=123).get_async()
            print(f"Data type: {task.data_type}")
            print(f"Instructions: {task.instructions}")

        asyncio.run(main())
        ```
    """
    if not self.task_id:
        raise ValueError("task_id is required to get a CurationTask")

    trace.get_current_span().set_attributes(
        {
            "synapse.task_id": str(self.task_id),
        }
    )

    task_result = await get_curation_task(
        task_id=self.task_id, synapse_client=synapse_client
    )
    self.fill_from_dict(synapse_response=task_result)
    self._set_last_persistent_instance()
    return self

delete_async async

delete_async(*, synapse_client: Optional[Synapse] = None) -> None

Deletes a CurationTask from Synapse.

PARAMETER DESCRIPTION
synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RAISES DESCRIPTION
ValueError

If the CurationTask object does not have a task_id.

Delete a curation task asynchronously

 

import asyncio
from synapseclient import Synapse
from synapseclient.models import CurationTask

syn = Synapse()
syn.login()

async def main():
    task = CurationTask(task_id=123)
    await task.delete_async()
    print("Task deleted successfully")

asyncio.run(main())
Source code in synapseclient/models/curation.py
async def delete_async(self, *, synapse_client: Optional[Synapse] = None) -> None:
    """
    Deletes a CurationTask from Synapse.

    Arguments:
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Raises:
        ValueError: If the CurationTask object does not have a task_id.

    Example: Delete a curation task asynchronously
         

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import CurationTask

        syn = Synapse()
        syn.login()

        async def main():
            task = CurationTask(task_id=123)
            await task.delete_async()
            print("Task deleted successfully")

        asyncio.run(main())
        ```
    """
    if not self.task_id:
        raise ValueError("task_id is required to delete a CurationTask")

    trace.get_current_span().set_attributes(
        {
            "synapse.task_id": str(self.task_id),
        }
    )

    await delete_curation_task(task_id=self.task_id, synapse_client=synapse_client)

store_async async

store_async(*, synapse_client: Optional[Synapse] = None) -> CurationTask

Creates a new CurationTask or updates an existing one on Synapse.

This method implements non-destructive updates. If a CurationTask with the same project_id and data_type exists and this instance hasn't been retrieved from Synapse before, it will merge the existing task data with the current instance before updating.
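A rough sketch of these merge semantics (a simplified stand-in for the internal `merge_dataclass_entities` helper, whose actual behavior may differ): fields the existing stored task has set are copied into the local instance only where the local instance left them unset, so local edits win.

```python
from dataclasses import dataclass, fields
from typing import Optional


@dataclass
class Task:
    """Minimal stand-in for CurationTask, for illustration only."""

    task_id: Optional[int] = None
    instructions: Optional[str] = None
    etag: Optional[str] = None


def merge(source: "Task", destination: "Task") -> "Task":
    """Copy fields from the stored task into the local one, but only where
    the local instance has not set a value. Local (non-None) values win."""
    for f in fields(Task):
        if getattr(destination, f.name) is None:
            setattr(destination, f.name, getattr(source, f.name))
    return destination


existing = Task(task_id=42, instructions="old instructions", etag="abc")
local = Task(instructions="new instructions")  # no task_id: looks like a create
merge(source=existing, destination=local)
print(local.task_id)       # 42: reuses the existing task instead of duplicating it
print(local.instructions)  # "new instructions": local edits are preserved
print(local.etag)          # "abc": needed so the update passes the OCC check
```

The net effect is an upsert keyed on (project_id, data_type): a `store_async()` on a fresh instance updates the matching existing task rather than failing or clobbering fields it did not set.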

PARAMETER DESCRIPTION
synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
CurationTask

The CurationTask object.

TYPE: CurationTask

Create a new curation task asynchronously

 

import asyncio
from synapseclient import Synapse
from synapseclient.models import CurationTask, FileBasedMetadataTaskProperties

syn = Synapse()
syn.login()

async def main():
    # Create file-based task properties
    file_properties = FileBasedMetadataTaskProperties(
        upload_folder_id="syn1234567",
        file_view_id="syn2345678"
    )

    # Create and store the curation task
    task = CurationTask(
        project_id="syn9876543",
        data_type="genomics_data",
        instructions="Upload your genomics files to the specified folder",
        task_properties=file_properties
    )
    task = await task.store_async()
    print(f"Created task with ID: {task.task_id}")

asyncio.run(main())
Source code in synapseclient/models/curation.py
async def store_async(
    self, *, synapse_client: Optional[Synapse] = None
) -> "CurationTask":
    """
    Creates a new CurationTask or updates an existing one on Synapse.

    This method implements non-destructive updates. If a CurationTask with the same
    project_id and data_type exists and this instance hasn't been retrieved from
    Synapse before, it will merge the existing task data with the current instance
    before updating.

    Arguments:
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Returns:
        CurationTask: The CurationTask object.

    Example: Create a new curation task asynchronously
         

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import CurationTask, FileBasedMetadataTaskProperties

        syn = Synapse()
        syn.login()

        async def main():
            # Create file-based task properties
            file_properties = FileBasedMetadataTaskProperties(
                upload_folder_id="syn1234567",
                file_view_id="syn2345678"
            )

            # Create and store the curation task
            task = CurationTask(
                project_id="syn9876543",
                data_type="genomics_data",
                instructions="Upload your genomics files to the specified folder",
                task_properties=file_properties
            )
            task = await task.store_async()
            print(f"Created task with ID: {task.task_id}")

        asyncio.run(main())
        ```
    """
    if not self.project_id:
        raise ValueError("project_id is required")
    if not self.data_type:
        raise ValueError("data_type is required")

    trace.get_current_span().set_attributes(
        {
            "synapse.data_type": self.data_type or "",
            "synapse.project_id": self.project_id or "",
            "synapse.task_id": str(self.task_id) if self.task_id else "",
        }
    )

    if (
        not self._last_persistent_instance
        and not self.task_id
        and (
            existing_task_id := await _get_existing_curation_task_id(
                project_id=self.project_id,
                data_type=self.data_type,
                synapse_client=synapse_client,
            )
        )
        and (
            existing_task := await CurationTask(task_id=existing_task_id).get_async(
                synapse_client=synapse_client
            )
        )
    ):
        merge_dataclass_entities(source=existing_task, destination=self)

    if self.task_id:
        task_result = await update_curation_task(
            task_id=self.task_id,
            curation_task=self.to_synapse_request(),
            synapse_client=synapse_client,
        )
        self.fill_from_dict(synapse_response=task_result)
        self._set_last_persistent_instance()
        return self
    else:
        if not self.project_id:
            raise ValueError("project_id is required to create a CurationTask")
        if not self.data_type:
            raise ValueError("data_type is required to create a CurationTask")
        if not self.instructions:
            raise ValueError("instructions is required to create a CurationTask")
        if not self.task_properties:
            raise ValueError("task_properties is required to create a CurationTask")

        task_result = await create_curation_task(
            curation_task=self.to_synapse_request(),
            synapse_client=synapse_client,
        )
        self.fill_from_dict(synapse_response=task_result)
        self._set_last_persistent_instance()
        return self

list_async async classmethod

list_async(project_id: str, *, synapse_client: Optional[Synapse] = None) -> AsyncGenerator[CurationTask, None]

Generator that yields CurationTasks for a project as they become available.

PARAMETER DESCRIPTION
project_id

The synId of the project.

TYPE: str

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

YIELDS DESCRIPTION
AsyncGenerator[CurationTask, None]

CurationTask objects as they are retrieved from the API.
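The streaming behavior described above ("as they become available") is the usual async-generator pagination pattern: each page is awaited, then its items are yielded one at a time so callers can start processing before the full listing is fetched. A self-contained sketch (the paginated source here is a toy list, not the real REST endpoint):

```python
import asyncio
from typing import AsyncGenerator, Dict, List

# Toy paginated source standing in for the REST endpoint (an assumption, not the real API).
PAGES: List[List[Dict[str, int]]] = [[{"taskId": 1}, {"taskId": 2}], [{"taskId": 3}]]


async def list_tasks() -> AsyncGenerator[Dict[str, int], None]:
    """Yield tasks one at a time as each page arrives."""
    for page in PAGES:
        await asyncio.sleep(0)  # stand-in for network latency between pages
        for task in page:
            yield task


async def main() -> None:
    ids = [t["taskId"] async for t in list_tasks()]
    print(ids)  # [1, 2, 3]


asyncio.run(main())
```

This is why `list_async` must be consumed with `async for` (or an async comprehension) inside a running event loop, as in the example below.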

List all curation tasks in a project asynchronously

 

import asyncio
from synapseclient import Synapse
from synapseclient.models import CurationTask

syn = Synapse()
syn.login()

async def main():
    # List all curation tasks in the project
    async for task in CurationTask.list_async(project_id="syn9876543"):
        print(f"Task ID: {task.task_id}")
        print(f"Data Type: {task.data_type}")
        print(f"Instructions: {task.instructions}")
        print("---")

asyncio.run(main())
Source code in synapseclient/models/curation.py
@skip_async_to_sync
@classmethod
async def list_async(
    cls,
    project_id: str,
    *,
    synapse_client: Optional[Synapse] = None,
) -> AsyncGenerator["CurationTask", None]:
    """
    Generator that yields CurationTasks for a project as they become available.

    Arguments:
        project_id: The synId of the project.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Yields:
        CurationTask objects as they are retrieved from the API.

    Example: List all curation tasks in a project asynchronously
         

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import CurationTask

        syn = Synapse()
        syn.login()

        async def main():
            # List all curation tasks in the project
            async for task in CurationTask.list_async(project_id="syn9876543"):
                print(f"Task ID: {task.task_id}")
                print(f"Data Type: {task.data_type}")
                print(f"Instructions: {task.instructions}")
                print("---")

        asyncio.run(main())
        ```
    """
    trace.get_current_span().set_attributes(
        {
            "synapse.project_id": project_id,
        }
    )

    async for task_dict in list_curation_tasks(
        project_id=project_id, synapse_client=synapse_client
    ):
        task = cls().fill_from_dict(synapse_response=task_dict)
        yield task
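The generator above yields each task as its page arrives rather than buffering the full result set. A self-contained sketch of that paginated async-generator pattern, using a hypothetical in-memory `fetch_page` in place of the real `list_curation_tasks` HTTP calls:

```python
import asyncio
from typing import AsyncGenerator, Dict, List, Optional, Tuple

# Hypothetical in-memory "API" returning one page of results per call;
# a real client would issue an HTTP request carrying the next-page token.
_FAKE_PAGES: Dict[Optional[str], Tuple[List[dict], Optional[str]]] = {
    None: ([{"taskId": 1}, {"taskId": 2}], "page-2"),
    "page-2": ([{"taskId": 3}], None),
}


async def fetch_page(token: Optional[str]) -> Tuple[List[dict], Optional[str]]:
    await asyncio.sleep(0)  # stand-in for network latency
    return _FAKE_PAGES[token]


async def list_tasks() -> AsyncGenerator[dict, None]:
    # Yield each item as soon as its page arrives, instead of collecting
    # every page into one list first.
    token: Optional[str] = None
    while True:
        items, token = await fetch_page(token)
        for item in items:
            yield item
        if token is None:  # no further pages
            break


async def main() -> List[int]:
    return [t["taskId"] async for t in list_tasks()]


print(asyncio.run(main()))  # [1, 2, 3]
```

Because consumers iterate with `async for`, they can start processing the first task while later pages are still in flight.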

synapseclient.models.RecordSet dataclass

Bases: RecordSetSynchronousProtocol, AccessControllable, BaseJSONSchema

A RecordSet within Synapse.

ATTRIBUTE DESCRIPTION
id

The unique immutable ID for this file. A new ID will be generated for new Files. Once issued, this ID is guaranteed to never change or be re-issued.

TYPE: Optional[str]

name

The name of this entity. Must be 256 characters or less. Names may only contain: letters, numbers, spaces, underscores, hyphens, periods, plus signs, apostrophes, and parentheses. If not specified, the name will be derived from the file name.

TYPE: Optional[str]

path

The path to the file on disk. Using shorthand ~ will be expanded to the user's home directory.

This is used during a get operation to specify where to download the file to. It should be pointing to a directory.

This is also used during a store operation to specify the file to upload. It should be pointing to a file.

TYPE: Optional[str]

description

The description of this file. Must be 1000 characters or less.

TYPE: Optional[str]

parent_id

The ID of the Entity that is the parent of this Entity. Setting this to a new value and storing it will move this File under the new parent.

TYPE: Optional[str]

version_label

The version label for this entity. Updates to the entity will increment the version number.

TYPE: Optional[str]

version_comment

The version comment for this entity.

TYPE: Optional[str]

data_file_handle_id

ID of the file handle associated with this entity. You may define an existing data_file_handle_id to use the existing data_file_handle_id. The creator of the file must also be the owner of the data_file_handle_id to have permission to store the file.

TYPE: Optional[str]

activity

The Activity model represents the main record of Provenance in Synapse. It is analogous to the Activity defined in the W3C Specification on Provenance. Activity cannot be removed during a store operation by setting it to None. You must use: synapseclient.models.Activity.delete_async or synapseclient.models.Activity.disassociate_from_entity_async.

TYPE: Optional[Activity]

annotations

Additional metadata associated with the entity. The key is the name of your desired annotations. The value is an object containing a list of values (use empty list to represent no values for key) and the value type associated with all values in the list. To remove all annotations set this to an empty dict {}.

TYPE: Optional[Dict[str, Union[List[str], List[bool], List[float], List[int], List[date], List[datetime]]]]

upsert_keys

One or more column names that define the upsert key for this set. This key is used to determine whether a new record should be treated as an update or an insert.

TYPE: Optional[List[str]]

csv_descriptor

The description of a CSV for upload or download.

TYPE: Optional[CsvTableDescriptor]

validation_summary

Summary statistics for the JSON schema validation results for the children of an Entity container (Project or Folder).

TYPE: Optional[ValidationSummary]

file_name_override

An optional replacement for the name of the uploaded file. This is distinct from the entity name. If omitted the file will retain its original name.

TYPE: Optional[str]

content_type

(New Upload Only) Used to manually specify Content-type header, for example 'application/png' or 'application/json; charset=UTF-8'. If not specified, the content type will be derived from the file extension.

This can be specified only during the initial store of this file. In order to change this after the File has been created use synapseclient.models.File.change_metadata.

TYPE: Optional[str]

content_size

(New Upload Only) The size of the file in bytes. This can be specified only during the initial creation of the File. This is also only applicable to files not uploaded to Synapse. ie: synapse_store is False.

TYPE: Optional[int]

content_md5

(Store only) The MD5 of the file, if known. If not supplied this will be computed in the client if possible. If supplied for a file entity already stored in Synapse it will be calculated again to check if a new upload needs to occur. This will not be filled in during a read for data. It is only used during a store operation. To retrieve the MD5 of the file after a read from Synapse use the .file_handle.content_md5 attribute.

TYPE: Optional[str]

create_or_update

(Store only) Indicates whether the method should automatically perform an update if the file conflicts with an existing Synapse object.

TYPE: bool

force_version

(Store only) Indicates whether the method should increment the version of the object if something within the entity has changed. For example updating the description or name. You may set this to False and an update to the entity will not increment the version.

Updating the version_label attribute will also cause a version update regardless of this flag.

An update to the MD5 of the file will force a version update regardless of this flag.

TYPE: bool

is_restricted

(Store only) If set to true, an email will be sent to the Synapse access control team to start the process of adding terms-of-use or review board approval for this entity. You will be contacted with regards to the specific data being restricted and the requirements of access.

This may be used only by an administrator of the specified file.

TYPE: bool

merge_existing_annotations

(Store only) Works in conjunction with create_or_update in that this is only evaluated if create_or_update is True. If this entity exists in Synapse with annotations that are not present in a store operation, those annotations will be added to the entity. If this is False any annotations that are not present within a store operation will be removed from this entity. This allows one to complete a destructive update of annotations on an entity.

TYPE: bool

associate_activity_to_new_version

(Store only) Works in conjunction with create_or_update in that this is only evaluated if create_or_update is True. When true an activity already attached to the current version of this entity will be associated with the new version during a store operation if the version was updated. This is useful if you are updating the entity and want to ensure that the activity is persisted onto the new version of the entity.

When this is False the activity will not be associated to the new version of the entity during a store operation.

Regardless of this setting, if you have an Activity object on the entity it will be persisted onto the new version. This is only used when you don't have an Activity object on the entity.

TYPE: bool

synapse_store

(Store only) Whether the File should be uploaded or if false: only the path should be stored when synapseclient.models.File.store is called.

TYPE: bool

download_file

(Get only) If True the file will be downloaded.

TYPE: bool

if_collision

(Get only) Determines how to handle file collisions. Defaults to "keep.both". May be:

  • overwrite.local
  • keep.local
  • keep.both

TYPE: str

synapse_container_limit

(Get only) A Synapse ID used to limit the search in Synapse if the file is specified as a local file. That is, if the file is stored in multiple locations in Synapse only the ones in the specified folder/project will be returned.

TYPE: Optional[str]

etag

(Read Only) Synapse employs an Optimistic Concurrency Control (OCC) scheme to handle concurrent updates. Since the E-Tag changes every time an entity is updated it is used to detect when a client's current representation of an entity is out-of-date.

TYPE: Optional[str]

created_on

(Read Only) The date this entity was created.

TYPE: Optional[str]

modified_on

(Read Only) The date this entity was last modified.

TYPE: Optional[str]

created_by

(Read Only) The ID of the user that created this entity.

TYPE: Optional[str]

modified_by

(Read Only) The ID of the user that last modified this entity.

TYPE: Optional[str]

version_number

(Read Only) The version number issued to this version on the object.

TYPE: Optional[int]

is_latest_version

(Read Only) If this is the latest version of the object.

TYPE: Optional[bool]

file_handle

(Read Only) The file handle associated with this entity.

TYPE: Optional[FileHandle]
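The upsert_keys semantics described above (match on the key columns: update on a match, insert otherwise) can be illustrated with plain dictionaries. This is a sketch of the concept, not the client's implementation; `upsert`, `sample_id`, and `tissue` are hypothetical names:

```python
from typing import Dict, List, Tuple


def upsert(existing: List[dict], incoming: List[dict], upsert_keys: List[str]) -> List[dict]:
    """Merge incoming records into existing ones, matching on upsert_keys.

    A record whose key values match an existing record updates it in place;
    otherwise it is appended as a new record.
    """
    def key_of(record: dict) -> Tuple:
        return tuple(record[k] for k in upsert_keys)

    index: Dict[Tuple, dict] = {key_of(r): r for r in existing}
    for record in incoming:
        k = key_of(record)
        if k in index:
            index[k].update(record)   # key already present -> update
        else:
            existing.append(record)   # new key -> insert
            index[k] = record
    return existing


records = [{"sample_id": "s1", "tissue": "liver"}]
new = [
    {"sample_id": "s1", "tissue": "lung"},   # matches s1 -> update
    {"sample_id": "s2", "tissue": "heart"},  # no match  -> insert
]
result = upsert(records, new, upsert_keys=["sample_id"])
print(result)
```

With a multi-column key (e.g. `["sample_id", "assay"]`), records must match on every listed column before they are treated as an update.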

Source code in synapseclient/models/recordset.py
@dataclass()
@async_to_sync
class RecordSet(RecordSetSynchronousProtocol, AccessControllable, BaseJSONSchema):
    """A RecordSet within Synapse.

    Attributes:
        id: The unique immutable ID for this file. A new ID will be generated for new
            Files. Once issued, this ID is guaranteed to never change or be re-issued.
        name: The name of this entity. Must be 256 characters or less. Names may only
            contain: letters, numbers, spaces, underscores, hyphens, periods, plus
            signs, apostrophes, and parentheses. If not specified, the name will be
            derived from the file name.
        path: The path to the file on disk. Using shorthand `~` will be expanded to
            the user's home directory.

            This is used during a `get` operation to specify where to download the
            file to. It should be pointing to a directory.

            This is also used during a `store` operation to specify the file to
            upload. It should be pointing to a file.
        description: The description of this file. Must be 1000 characters or less.
        parent_id: The ID of the Entity that is the parent of this Entity. Setting
            this to a new value and storing it will move this File under the new
            parent.
        version_label: The version label for this entity. Updates to the entity will
            increment the version number.
        version_comment: The version comment for this entity.
        data_file_handle_id: ID of the file handle associated with this entity. You
            may define an existing data_file_handle_id to use the existing
            data_file_handle_id. The creator of the file must also be the owner of
            the data_file_handle_id to have permission to store the file.
        activity: The Activity model represents the main record of Provenance in
            Synapse. It is analogous to the Activity defined in the
            [W3C Specification](https://www.w3.org/TR/prov-n/) on Provenance.
            Activity cannot be removed during a store operation by setting it to None.
            You must use: [synapseclient.models.Activity.delete_async][] or
            [synapseclient.models.Activity.disassociate_from_entity_async][].
        annotations: Additional metadata associated with the entity. The key is the
            name of your desired annotations. The value is an object containing a list
            of values (use empty list to represent no values for key) and the value
            type associated with all values in the list. To remove all annotations
            set this to an empty dict `{}`.
        upsert_keys: One or more column names that define the upsert key for this
            set. This key is used to determine whether a new record should be treated
            as an update or an insert.
        csv_descriptor: The description of a CSV for upload or download.
        validation_summary: Summary statistics for the JSON schema validation results
            for the children of an Entity container (Project or Folder).
        file_name_override: An optional replacement for the name of the uploaded
            file. This is distinct from the entity name. If omitted the file will
            retain its original name.
        content_type: (New Upload Only) Used to manually specify Content-type header,
            for example 'application/png' or 'application/json; charset=UTF-8'. If not
            specified, the content type will be derived from the file extension.

            This can be specified only during the initial store of this file. In order
            to change this after the File has been created use
            [synapseclient.models.File.change_metadata][].
        content_size: (New Upload Only) The size of the file in bytes. This can be
            specified only during the initial creation of the File. This is also only
            applicable to files not uploaded to Synapse. ie: `synapse_store` is False.
        content_md5: (Store only) The MD5 of the file, if known. If not supplied this
            will be computed in the client if possible. If supplied for a file entity
            already stored in Synapse it will be calculated again to check if a new
            upload needs to occur. This will not be filled in during a read for data.
            It is only used during a store operation. To retrieve the MD5 of the file
            after a read from Synapse use the `.file_handle.content_md5` attribute.
        create_or_update: (Store only) Indicates whether the method should
            automatically perform an update if the file conflicts with an existing
            Synapse object.
        force_version: (Store only) Indicates whether the method should increment the
            version of the object if something within the entity has changed. For
            example updating the description or name. You may set this to False and
            an update to the entity will not increment the version.

            Updating the `version_label` attribute will also cause a version update
            regardless of this flag.

            An update to the MD5 of the file will force a version update regardless
            of this flag.
        is_restricted: (Store only) If set to true, an email will be sent to the
            Synapse access control team to start the process of adding terms-of-use
            or review board approval for this entity. You will be contacted with
            regards to the specific data being restricted and the requirements of
            access.

            This may be used only by an administrator of the specified file.
        merge_existing_annotations: (Store only) Works in conjunction with
            `create_or_update` in that this is only evaluated if `create_or_update`
            is True. If this entity exists in Synapse with annotations that are
            not present in a store operation, those annotations will be added to the
            entity. If this is False any annotations that are not present within a
            store operation will be removed from this entity. This allows one to
            complete a destructive update of annotations on an entity.
        associate_activity_to_new_version: (Store only) Works in conjunction with
            `create_or_update` in that this is only evaluated if `create_or_update`
            is True. When true an activity already attached to the current version of
            this entity will be associated with the new version during a store
            operation if the version was updated. This is useful if you are updating
            the entity and want to ensure that the activity is persisted onto the new
            version of the entity.

            When this is False the activity will not be associated to the new version
            of the entity during a store operation.

            Regardless of this setting, if you have an Activity object on the entity
            it will be persisted onto the new version. This is only used when you
            don't have an Activity object on the entity.
        synapse_store: (Store only) Whether the File should be uploaded or if false:
            only the path should be stored when [synapseclient.models.File.store][]
            is called.
        download_file: (Get only) If True the file will be downloaded.
        if_collision: (Get only) Determines how to handle file collisions. Defaults
            to "keep.both". May be:

            - `overwrite.local`
            - `keep.local`
            - `keep.both`
        synapse_container_limit: (Get only) A Synapse ID used to limit the search in
            Synapse if the file is specified as a local file. That is, if the file is
            stored in multiple locations in Synapse only the ones in the specified
            folder/project will be returned.
        etag: (Read Only) Synapse employs an Optimistic Concurrency Control (OCC)
            scheme to handle concurrent updates. Since the E-Tag changes every time
            an entity is updated it is used to detect when a client's current
            representation of an entity is out-of-date.
        created_on: (Read Only) The date this entity was created.
        modified_on: (Read Only) The date this entity was last modified.
        created_by: (Read Only) The ID of the user that created this entity.
        modified_by: (Read Only) The ID of the user that last modified this entity.
        version_number: (Read Only) The version number issued to this version on the
            object.
        is_latest_version: (Read Only) If this is the latest version of the object.
        file_handle: (Read Only) The file handle associated with this entity.
    """

    id: Optional[str] = None
    """The unique immutable ID for this file. A new ID will be generated for new Files.
    Once issued, this ID is guaranteed to never change or be re-issued."""

    name: Optional[str] = None
    """
    The name of this entity. Must be 256 characters or less.
    Names may only contain: letters, numbers, spaces, underscores, hyphens, periods,
    plus signs, apostrophes, and parentheses. If not specified, the name will be
    derived from the file name.
    """

    path: Optional[str] = field(default=None, compare=False)
    """The path to the file on disk. Using shorthand `~` will be expanded to the user's
    home directory.

    This is used during a `get` operation to specify where to download the file to. It
    should be pointing to a directory.

    This is also used during a `store` operation to specify the file to upload. It
    should be pointing to a file."""

    description: Optional[str] = None
    """The description of this file. Must be 1000 characters or less."""

    parent_id: Optional[str] = None
    """The ID of the Entity that is the parent of this Entity. Setting this to a new
    value and storing it will move this File under the new parent."""

    version_label: Optional[str] = None
    """The version label for this entity. Updates to the entity will increment the
    version number."""

    version_comment: Optional[str] = None
    """The version comment for this entity."""

    data_file_handle_id: Optional[str] = None
    """
    ID of the file handle associated with this entity. You may define an existing
    data_file_handle_id to use the existing data_file_handle_id. The creator of the
    file must also be the owner of the data_file_handle_id to have permission to
    store the file.
    """

    activity: Optional[Activity] = field(default=None, compare=False)
    """The Activity model represents the main record of Provenance in Synapse. It is
    analogous to the Activity defined in the
    [W3C Specification](https://www.w3.org/TR/prov-n/) on Provenance. Activity cannot
    be removed during a store operation by setting it to None. You must use:
    [synapseclient.models.Activity.delete_async][] or
    [synapseclient.models.Activity.disassociate_from_entity_async][].
    """

    annotations: Optional[
        Dict[
            str,
            Union[
                List[str],
                List[bool],
                List[float],
                List[int],
                List[date],
                List[datetime],
            ],
        ]
    ] = field(default_factory=dict, compare=False)
    """Additional metadata associated with the entity. The key is the name of your
    desired annotations. The value is an object containing a list of values
    (use empty list to represent no values for key) and the value type associated with
    all values in the list. To remove all annotations set this to an empty dict `{}`."""

    upsert_keys: Optional[List[str]] = field(default_factory=list)
    """One or more column names that define the upsert key for this set. This key is
    used to determine whether a new record should be treated as an update or an insert.
    """

    csv_descriptor: Optional["CsvTableDescriptor"] = field(default=None)
    """The description of a CSV for upload or download."""

    validation_summary: Optional[ValidationSummary] = field(default=None, compare=False)
    """Summary statistics for the JSON schema validation results for the children of
    an Entity container (Project or Folder)"""

    file_name_override: Optional[str] = None
    """An optional replacement for the name of the uploaded file. This is distinct from
    the entity name. If omitted the file will retain its original name.
    """

    content_type: Optional[str] = None
    """
    (New Upload Only)
    Used to manually specify Content-type header, for example 'application/png'
    or 'application/json; charset=UTF-8'. If not specified, the content type will be
    derived from the file extension.

    This can be specified only during the initial store of this file. In order to change
    this after the File has been created use
    [synapseclient.models.File.change_metadata][].
    """

    content_size: Optional[int] = None
    """
    (New Upload Only)
    The size of the file in bytes. This can be specified only during the initial
    creation of the File. This is also only applicable to files not uploaded to Synapse.
    ie: `synapse_store` is False.
    """

    content_md5: Optional[str] = field(default=None, compare=False)
    """
    (Store only)
    The MD5 of the file, if known. If not supplied this will be computed in the client
    if possible. If supplied for a file entity already stored in Synapse it will be
    calculated again to check if a new upload needs to occur. This will not be filled
    in during a read for data. It is only used during a store operation. To retrieve
    the MD5 of the file after a read from Synapse use the `.file_handle.content_md5`
    attribute.
    """

    create_or_update: bool = field(default=True, repr=False, compare=False)
    """
    (Store only)

    Indicates whether the method should automatically perform an update if the file
    conflicts with an existing Synapse object.
    """

    force_version: bool = field(default=True, repr=False, compare=False)
    """
    (Store only)

    Indicates whether the method should increment the version of the object if something
    within the entity has changed. For example updating the description or name.
    You may set this to False and an update to the entity will not increment the
    version.

    Updating the `version_label` attribute will also cause a version update regardless
    of this flag.

    An update to the MD5 of the file will force a version update regardless of this
    flag.
    """

    is_restricted: bool = field(default=False, repr=False)
    """
    (Store only)

    If set to true, an email will be sent to the Synapse access control team to start
    the process of adding terms-of-use or review board approval for this entity.
    You will be contacted with regards to the specific data being restricted and the
    requirements of access.

    This may be used only by an administrator of the specified file.
    """

    merge_existing_annotations: bool = field(default=True, repr=False, compare=False)
    """
    (Store only)

    Works in conjunction with `create_or_update` in that this is only evaluated if
    `create_or_update` is True. If this entity exists in Synapse with annotations
    that are not present in a store operation, those annotations will be added to the
    entity. If this is False any annotations that are not present within a store
    operation will be removed from this entity. This allows one to complete a
    destructive update of annotations on an entity.
    """

    associate_activity_to_new_version: bool = field(
        default=False, repr=False, compare=False
    )
    """
    (Store only)

    Works in conjunction with `create_or_update` in that this is only evaluated if
    `create_or_update` is True. When true an activity already attached to the current
    version of this entity will be associated with the new version during a store
    operation if the version was updated. This is useful if you are updating the
    entity and want to ensure that the activity is persisted onto the new version of
    the entity.

    When this is False the activity will not be associated to the new version of the
    entity during a store operation.

    Regardless of this setting, if you have an Activity object on the entity it will be
    persisted onto the new version. This is only used when you don't have an Activity
    object on the entity.
    """

    synapse_store: bool = field(default=True, repr=False)
    """
    (Store only)

    Whether the File should be uploaded or if false: only the path should be stored when
    [synapseclient.models.File.store][] is called.
    """

    download_file: bool = field(default=True, repr=False, compare=False)
    """
    (Get only)

    If True the file will be downloaded."""

    if_collision: str = field(default="keep.both", repr=False, compare=False)
    """
    (Get only)

    Determines how to handle file collisions. Defaults to "keep.both".
    May be one of:

    - `overwrite.local`
    - `keep.local`
    - `keep.both`

    synapse_container_limit: Optional[str] = field(
        default=None, repr=False, compare=False
    )
    """A Synanpse ID used to limit the search in Synapse if file is specified as a local
    file. That is, if the file is stored in multiple locations in Synapse only the
    ones in the specified folder/project will be returned."""

    etag: Optional[str] = field(default=None, compare=False)
    """
    (Read Only)
    Synapse employs an Optimistic Concurrency Control (OCC) scheme to handle
    concurrent updates. Since the E-Tag changes every time an entity is updated it is
    used to detect when a client's current representation of an entity is out-of-date.
    """

    created_on: Optional[str] = field(default=None, compare=False)
    """(Read Only) The date this entity was created."""

    modified_on: Optional[str] = field(default=None, compare=False)
    """(Read Only) The date this entity was last modified."""

    created_by: Optional[str] = field(default=None, compare=False)
    """(Read Only) The ID of the user that created this entity."""

    modified_by: Optional[str] = field(default=None, compare=False)
    """(Read Only) The ID of the user that last modified this entity."""

    version_number: Optional[int] = field(default=None, compare=False)
    """(Read Only) The version number issued to this version on the object."""

    is_latest_version: Optional[bool] = field(default=None, compare=False)
    """(Read Only) If this is the latest version of the object."""

    file_handle: Optional["FileHandle"] = field(default=None, compare=False)
    """(Read Only) The file handle associated with this entity."""

    _last_persistent_instance: Optional["RecordSet"] = field(
        default=None, repr=False, compare=False
    )
    """The last persistent instance of this object. This is used to determine if the
    object has been changed and needs to be updated in Synapse."""

    @property
    def has_changed(self) -> bool:
        """
        Determines if the object has been changed and needs to be updated in Synapse."""
        return (
            not self._last_persistent_instance or self._last_persistent_instance != self
        )

    def _set_last_persistent_instance(self) -> None:
        """Stash the last time this object interacted with Synapse. This is used to
        determine if the object has been changed and needs to be updated in Synapse."""
        del self._last_persistent_instance
        self._last_persistent_instance = dataclasses.replace(self)
        self._last_persistent_instance.activity = (
            dataclasses.replace(self.activity) if self.activity else None
        )
        self._last_persistent_instance.annotations = (
            deepcopy(self.annotations) if self.annotations else {}
        )

    def _fill_from_file_handle(self) -> None:
        """Fill the file object from the file handle."""
        if self.file_handle:
            self.data_file_handle_id = self.file_handle.id
            self.content_type = self.file_handle.content_type
            self.content_size = self.file_handle.content_size

    def fill_from_dict(
        self,
        entity: Dict[str, Union[bool, str, int]],
        set_annotations: bool = True,
    ) -> "RecordSet":
        """
        Converts a response from the REST API into this dataclass.

        This method populates the RecordSet instance with data from a Synapse REST API
        response or a dictionary containing RecordSet information. It handles the
        conversion from Synapse API field names (camelCase) to Python attribute names
        (snake_case) and processes nested objects appropriately.

        Arguments:
            entity: The response from the REST API or a dictionary containing
                RecordSet data, with string keys and various value types, keyed by
                the camelCase field names used by the API.
            set_annotations: Whether to set the annotations from the response.
                If True, annotations will be populated from the API response.

        Returns:
            The RecordSet object with updated attributes from the API response.
        """
        self.id = entity.get("id", None)
        self.name = entity.get("name", None)
        self.description = entity.get("description", None)
        self.etag = entity.get("etag", None)
        self.created_on = entity.get("createdOn", None)
        self.modified_on = entity.get("modifiedOn", None)
        self.created_by = entity.get("createdBy", None)
        self.modified_by = entity.get("modifiedBy", None)
        self.parent_id = entity.get("parentId", None)
        self.version_number = entity.get("versionNumber", None)
        self.version_label = entity.get("versionLabel", None)
        self.version_comment = entity.get("versionComment", None)
        self.is_latest_version = entity.get("isLatestVersion", False)
        self.data_file_handle_id = entity.get("dataFileHandleId", None)
        self.path = entity.get("path", self.path)
        self.file_name_override = entity.get("fileNameOverride", None)
        csv_descriptor = entity.get("csvDescriptor", None)
        if csv_descriptor:
            from synapseclient.models import CsvTableDescriptor

            self.csv_descriptor = CsvTableDescriptor().fill_from_dict(csv_descriptor)

        validation_summary = entity.get("validationSummary", None)

        if validation_summary:
            self.validation_summary = ValidationSummary(
                container_id=validation_summary.get("containerId", None),
                total_number_of_children=validation_summary.get(
                    "totalNumberOfChildren", None
                ),
                number_of_valid_children=validation_summary.get(
                    "numberOfValidChildren", None
                ),
                number_of_invalid_children=validation_summary.get(
                    "numberOfInvalidChildren", None
                ),
                number_of_unknown_children=validation_summary.get(
                    "numberOfUnknownChildren", None
                ),
                generated_on=validation_summary.get("generatedOn", None),
            )

        self.upsert_keys = entity.get("upsertKey", [])

        synapse_file_handle = entity.get("_file_handle", None)
        if synapse_file_handle:
            from synapseclient.models import FileHandle

            file_handle = self.file_handle or FileHandle()
            self.file_handle = file_handle.fill_from_dict(
                synapse_instance=synapse_file_handle
            )
            self._fill_from_file_handle()

        if set_annotations:
            self.annotations = Annotations.from_dict(entity.get("annotations", {}))
        return self

    def _cannot_store(self) -> bool:
        """
        Determines, based on guard conditions, whether a store operation is blocked.
        Returns True when the identifiers required for storing are missing.
        """
        return (
            not (
                self.id is not None
                and (self.path is not None or self.data_file_handle_id is not None)
            )
            and not (self.path is not None and self.parent_id is not None)
            and not (
                self.parent_id is not None and self.data_file_handle_id is not None
            )
        )

    async def _load_local_md5(self) -> None:
        """
        Load the MD5 hash of a local file if it exists and hasn't been loaded yet.

        This method computes and sets the content_md5 attribute for local files
        that exist on disk. It only performs the calculation if the content_md5
        is not already set and the path points to an existing file.
        """
        if not self.content_md5 and self.path and os.path.isfile(self.path):
            self.content_md5 = utils.md5_for_file_hex(filename=self.path)

    async def _find_existing_entity(
        self, *, synapse_client: Optional[Synapse] = None
    ) -> Union["RecordSet", None]:
        """
        Determines if the RecordSet already exists in Synapse.

        This method searches for an existing RecordSet in Synapse that matches the
        current instance. If found, it returns the existing RecordSet object, otherwise
        it returns None. This is used to determine if the RecordSet should be updated
        or created during a store operation.

        Arguments:
            synapse_client: If not passed in and caching was not disabled, this will
                use the last created instance from the Synapse class constructor.

        Returns:
            The existing RecordSet object if it exists in Synapse, None otherwise.
        """

        async def get_entity(existing_id: str) -> "RecordSet":
            """Small wrapper to retrieve a file instance without raising an error if it
            does not exist.

            Arguments:
                existing_id: The ID of the file to retrieve.

            Returns:
                The file object if it exists, otherwise None.
            """
            try:
                entity_copy = RecordSet(
                    id=existing_id,
                    download_file=False,
                    version_number=self.version_number,
                    synapse_container_limit=self.synapse_container_limit,
                    parent_id=self.parent_id,
                )
                return await entity_copy.get_async(
                    synapse_client=synapse_client,
                    include_activity=self.activity is not None
                    or self.associate_activity_to_new_version,
                )
            except SynapseFileNotFoundError:
                return None

        if (
            self.create_or_update
            and not self._last_persistent_instance
            and (
                existing_entity_id := await get_id(
                    entity=self,
                    failure_strategy=None,
                    synapse_client=synapse_client,
                )
            )
            and (existing_file := await get_entity(existing_entity_id))
        ):
            return existing_file
        return None

    def _determine_fields_to_ignore_in_merge(self) -> List[str]:
        """
        Determine which fields should not be merged when merging two entities.

        This method returns a list of field names that should be ignored during
        entity merging operations. This allows for fine-tuned destructive updates
        of an entity based on the current configuration settings.

        The method has special handling for manifest uploads where specific fields
        are provided in the manifest and should take precedence over existing
        entity values.

        Returns:
            A list of field names that should not be merged from the existing entity.
        """
        fields_to_not_merge = []
        if not self.merge_existing_annotations:
            fields_to_not_merge.append("annotations")

        if not self.associate_activity_to_new_version:
            fields_to_not_merge.append("activity")

        return fields_to_not_merge

    def to_synapse_request(self) -> Dict[str, Any]:
        """
        Converts this dataclass to a dictionary suitable for a Synapse REST API request.

        This method transforms the RecordSet object into a dictionary format that
        matches the structure expected by the Synapse REST API. It handles the
        conversion of Python snake_case attribute names to the camelCase format
        used by the API, and ensures that nested objects are properly serialized.

        Returns:
            A dictionary representation of this object formatted for API requests.
            None values are automatically removed from the dictionary.

        Example: Converting a RecordSet for API submission
            This method is used internally when storing or updating RecordSets:

            ```python
            from synapseclient.models import RecordSet

            record_set = RecordSet(
                name="My RecordSet",
                description="A test record set",
                parent_id="syn123456"
            )
            api_dict = record_set.to_synapse_request()
            # api_dict contains properly formatted data for the REST API
            ```
        """

        entity = {
            "concreteType": concrete_types.RECORD_SET_ENTITY,
            "name": self.name,
            "description": self.description,
            "id": self.id,
            "etag": self.etag,
            "createdOn": self.created_on,
            "modifiedOn": self.modified_on,
            "createdBy": self.created_by,
            "modifiedBy": self.modified_by,
            "parentId": self.parent_id,
            "versionNumber": self.version_number,
            "versionLabel": self.version_label,
            "versionComment": self.version_comment,
            "isLatestVersion": self.is_latest_version,
            "dataFileHandleId": self.data_file_handle_id,
            "upsertKey": self.upsert_keys,
            "csvDescriptor": self.csv_descriptor.to_synapse_request()
            if self.csv_descriptor
            else None,
            "validationSummary": {
                "containerId": self.validation_summary.container_id,
                "totalNumberOfChildren": self.validation_summary.total_number_of_children,
                "numberOfValidChildren": self.validation_summary.number_of_valid_children,
                "numberOfInvalidChildren": self.validation_summary.number_of_invalid_children,
                "numberOfUnknownChildren": self.validation_summary.number_of_unknown_children,
                "generatedOn": self.validation_summary.generated_on,
            }
            if self.validation_summary
            else None,
            "fileNameOverride": self.file_name_override,
        }
        delete_none_keys(entity)

        return entity

    async def store_async(
        self,
        parent: Optional[Union["Folder", "Project"]] = None,
        *,
        synapse_client: Optional[Synapse] = None,
    ) -> "RecordSet":
        """
        Store the RecordSet in Synapse.

        This method uploads or updates a RecordSet in Synapse. It can handle both
        creating new RecordSets and updating existing ones based on the
        `create_or_update` flag. The method supports file uploads, metadata updates,
        and merging with existing entities when appropriate.

        Arguments:
            parent: The parent Folder or Project for this RecordSet. If provided,
                this will override the `parent_id` attribute.
            synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)`, this will use the last created
                instance from the Synapse class constructor.

        Returns:
            The RecordSet object with updated metadata from Synapse after the
            store operation.

        Raises:
            ValueError: If the RecordSet does not have the required information
                for storing. Must have either: (ID with path or data_file_handle_id),
                or (path with parent_id), or (data_file_handle_id with parent_id).

        Example: Storing a new RecordSet
            Creating and storing a new RecordSet in Synapse:

            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import RecordSet

            async def main():
                syn = Synapse()
                syn.login()

                record_set = RecordSet(
                    name="My RecordSet",
                    description="A dataset for analysis",
                    parent_id="syn123456",
                    path="/path/to/data.csv"
                )
                stored_record_set = await record_set.store_async()
                print(f"Stored RecordSet with ID: {stored_record_set.id}")

            asyncio.run(main())
            ```

            Updating an existing RecordSet:
            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import RecordSet

            async def main():
                syn = Synapse()
                syn.login()

                record_set = await RecordSet(id="syn789012").get_async()
                record_set.description = "Updated description"
                updated_record_set = await record_set.store_async()

            asyncio.run(main())
            ```
        """
        self.parent_id = parent.id if parent else self.parent_id
        if self._cannot_store():
            raise ValueError(
                "The file must have an (ID with a (path or `data_file_handle_id`)), or a "
                "(path with a (`parent_id` or parent with an id)), or a "
                "(data_file_handle_id with a (`parent_id` or parent with an id)) to store."
            )
        self.name = self.name or (guess_file_name(self.path) if self.path else None)
        client = Synapse.get_client(synapse_client=synapse_client)

        if existing_file := await self._find_existing_entity(synapse_client=client):
            merge_dataclass_entities(
                source=existing_file,
                destination=self,
                fields_to_ignore=self._determine_fields_to_ignore_in_merge(),
            )

        if self.id:
            trace.get_current_span().set_attributes(
                {
                    "synapse.id": self.id,
                }
            )

        if self.path:
            self.path = os.path.expanduser(self.path)
            async with client._get_parallel_file_transfer_semaphore(
                asyncio_event_loop=asyncio.get_running_loop()
            ):
                from synapseclient.models.file import _upload_file

                await _upload_file(entity_to_upload=self, synapse_client=client)
        elif self.data_file_handle_id:
            self.path = client.cache.get(file_handle_id=self.data_file_handle_id)

        if self.has_changed:
            entity = await store_entity(
                resource=self, entity=self.to_synapse_request(), synapse_client=client
            )

            self.fill_from_dict(entity=entity, set_annotations=False)

        re_read_required = await store_entity_components(
            root_resource=self, synapse_client=client
        )
        if re_read_required:
            before_download_file = self.download_file
            self.download_file = False
            await self.get_async(
                synapse_client=client,
            )
            self.download_file = before_download_file

        self._set_last_persistent_instance()

        client.logger.debug(f"Stored File {self.name}, id: {self.id}: {self.path}")
        # Clear the content_md5 so that it is recalculated if the file is updated
        self.content_md5 = None
        return self

    async def get_async(
        self,
        include_activity: bool = False,
        *,
        synapse_client: Optional[Synapse] = None,
    ) -> "RecordSet":
        """
        Get the RecordSet from Synapse.

        This method retrieves a RecordSet entity from Synapse. You may retrieve
        a RecordSet by either its ID or path. If you specify both, the ID will
        take precedence.

        If you specify the path and the RecordSet is stored in multiple locations
        in Synapse, only the first one found will be returned. The other matching
        RecordSets will be printed to the console.

        You may also specify a `version_number` to get a specific version of the
        RecordSet.

        Arguments:
            include_activity: If True, the activity will be included in the RecordSet
                if it exists.
            synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)`, this will use the last created
                instance from the Synapse class constructor.

        Returns:
            The RecordSet object with data populated from Synapse.

        Raises:
            ValueError: If the RecordSet does not have an ID or path to retrieve.

        Example: Retrieving a RecordSet by ID
            Get an existing RecordSet from Synapse:

            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import RecordSet

            async def main():
                syn = Synapse()
                syn.login()

                record_set = await RecordSet(id="syn123").get_async()
                print(f"RecordSet name: {record_set.name}")

            asyncio.run(main())
            ```

            Downloading a RecordSet to a specific directory:
            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import RecordSet

            async def main():
                syn = Synapse()
                syn.login()

                record_set = await RecordSet(
                    id="syn123",
                    path="/path/to/download/directory"
                ).get_async()

            asyncio.run(main())
            ```

            Including activity information:
            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import RecordSet

            async def main():
                syn = Synapse()
                syn.login()

                record_set = await RecordSet(id="syn123").get_async(include_activity=True)
                if record_set.activity:
                    print(f"Activity: {record_set.activity.name}")

            asyncio.run(main())
            ```
        """
        if not self.id and not self.path:
            raise ValueError("The file must have an ID or path to get.")
        syn = Synapse.get_client(synapse_client=synapse_client)

        await self._load_local_md5()

        await get_from_entity_factory(
            entity_to_update=self,
            synapse_id_or_path=self.id or self.path,
            version=self.version_number,
            if_collision=self.if_collision,
            limit_search=self.synapse_container_limit or self.parent_id,
            download_file=self.download_file,
            download_location=os.path.dirname(self.path)
            if self.path and os.path.isfile(self.path)
            else self.path,
            md5=self.content_md5,
            synapse_client=syn,
        )

        if (
            self.data_file_handle_id
            and (not self.path or (self.path and not os.path.isfile(self.path)))
            and (cached_path := syn.cache.get(file_handle_id=self.data_file_handle_id))
        ):
            self.path = cached_path

        if include_activity:
            self.activity = await Activity.from_parent_async(
                parent=self, synapse_client=synapse_client
            )

        self._set_last_persistent_instance()
        Synapse.get_client(synapse_client=synapse_client).logger.debug(
            f"Got file {self.name}, id: {self.id}, path: {self.path}"
        )
        return self

    async def delete_async(
        self,
        version_only: Optional[bool] = False,
        *,
        synapse_client: Optional[Synapse] = None,
    ) -> None:
        """
        Delete the RecordSet from Synapse using its ID.

        This method removes a RecordSet entity from Synapse. You can choose to
        delete either a specific version or the entire RecordSet including all
        its versions.

        Arguments:
            version_only: If True, only the version specified in the `version_number`
                attribute of the RecordSet will be deleted. If False, the entire
                RecordSet including all versions will be deleted.
            synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)`, this will use the last created
                instance from the Synapse class constructor.

        Returns:
            None

        Raises:
            ValueError: If the RecordSet does not have an ID to delete.
            ValueError: If the RecordSet does not have a version number to delete a
                specific version, and `version_only` is True.

        Example: Deleting a RecordSet
            Delete an entire RecordSet and all its versions:

            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import RecordSet

            async def main():
                syn = Synapse()
                syn.login()

                await RecordSet(id="syn123").delete_async()

            asyncio.run(main())
            ```

            Delete only a specific version:
            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import RecordSet

            async def main():
                syn = Synapse()
                syn.login()

                record_set = RecordSet(id="syn123", version_number=2)
                await record_set.delete_async(version_only=True)

            asyncio.run(main())
            ```
        """
        if not self.id:
            raise ValueError("The file must have an ID to delete.")
        if version_only and not self.version_number:
            raise ValueError("The file must have a version number to delete a version.")

        loop = asyncio.get_event_loop()
        await loop.run_in_executor(
            None,
            lambda: Synapse.get_client(synapse_client=synapse_client).delete(
                obj=self.id,
                version=self.version_number if version_only else None,
            ),
        )
        Synapse.get_client(synapse_client=synapse_client).logger.debug(
            f"Deleted file {self.id}"
        )

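The snapshot-based change tracking behind `has_changed` and `_set_last_persistent_instance` can be modeled in isolation. The sketch below is a hypothetical, simplified stand-in for `RecordSet` (the class and field names here are illustrative, not part of the client): a `compare=False` stashed copy is excluded from equality, so comparing the live object against its last persisted snapshot detects local edits.

```python
from dataclasses import dataclass, field, replace
from typing import Optional


@dataclass
class TrackedRecord:
    """Hypothetical model of the RecordSet change-tracking pattern."""

    name: Optional[str] = None
    description: Optional[str] = None
    # The stashed snapshot; compare=False keeps it out of __eq__ so the
    # snapshot never compares against itself.
    _last_persistent_instance: Optional["TrackedRecord"] = field(
        default=None, repr=False, compare=False
    )

    @property
    def has_changed(self) -> bool:
        # Changed if never persisted, or if any compared field differs
        # from the last persisted snapshot.
        return (
            self._last_persistent_instance is None
            or self._last_persistent_instance != self
        )

    def mark_persisted(self) -> None:
        # `replace` makes a shallow copy; the real client additionally
        # deep-copies mutable fields such as annotations.
        self._last_persistent_instance = replace(self)


record = TrackedRecord(name="My RecordSet")
assert record.has_changed          # never stored yet
record.mark_persisted()
assert not record.has_changed      # snapshot matches
record.description = "updated"
assert record.has_changed          # local edit detected
```

The same pattern is why `store_async` only issues an update request when `has_changed` is True: an unmodified object round-trips without touching Synapse.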
Functions

get_async async

get_async(include_activity: bool = False, *, synapse_client: Optional[Synapse] = None) -> RecordSet

Get the RecordSet from Synapse.

This method retrieves a RecordSet entity from Synapse. You may retrieve a RecordSet by either its ID or path. If you specify both, the ID will take precedence.

If you specify the path and the RecordSet is stored in multiple locations in Synapse, only the first one found will be returned. The other matching RecordSets will be printed to the console.

You may also specify a version_number to get a specific version of the RecordSet.
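The ID-over-path precedence described above reduces to a small guard. The helper below is a hypothetical sketch of that rule (not part of the client API): at least one of the two lookup keys is required, and the ID wins when both are given.

```python
from typing import Optional


def resolve_lookup(synapse_id: Optional[str], path: Optional[str]) -> str:
    """Return the key used to retrieve the entity; ID takes precedence."""
    if not synapse_id and not path:
        raise ValueError("The RecordSet must have an ID or path to get.")
    return synapse_id or path


assert resolve_lookup("syn123", "/data/records.csv") == "syn123"  # ID wins
assert resolve_lookup(None, "/data/records.csv") == "/data/records.csv"
```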

PARAMETER DESCRIPTION
include_activity

If True, the activity will be included in the RecordSet if it exists.

TYPE: bool DEFAULT: False

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False), this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
RecordSet

The RecordSet object with data populated from Synapse.

RAISES DESCRIPTION
ValueError

If the RecordSet does not have an ID or path to retrieve.

Retrieving a RecordSet by ID

Get an existing RecordSet from Synapse:

import asyncio
from synapseclient import Synapse
from synapseclient.models import RecordSet

async def main():
    syn = Synapse()
    syn.login()

    record_set = await RecordSet(id="syn123").get_async()
    print(f"RecordSet name: {record_set.name}")

asyncio.run(main())

Downloading a RecordSet to a specific directory:

import asyncio
from synapseclient import Synapse
from synapseclient.models import RecordSet

async def main():
    syn = Synapse()
    syn.login()

    record_set = await RecordSet(
        id="syn123",
        path="/path/to/download/directory"
    ).get_async()

asyncio.run(main())

Including activity information:

import asyncio
from synapseclient import Synapse
from synapseclient.models import RecordSet

async def main():
    syn = Synapse()
    syn.login()

    record_set = await RecordSet(id="syn123").get_async(include_activity=True)
    if record_set.activity:
        print(f"Activity: {record_set.activity.name}")

asyncio.run(main())

Source code in synapseclient/models/recordset.py

store_async async

store_async(parent: Optional[Union[Folder, Project]] = None, *, synapse_client: Optional[Synapse] = None) -> RecordSet

Store the RecordSet in Synapse.

This method uploads or updates a RecordSet in Synapse. It can handle both creating new RecordSets and updating existing ones based on the create_or_update flag. The method supports file uploads, metadata updates, and merging with existing entities when appropriate.

PARAMETER DESCRIPTION
parent

The parent Folder or Project for this RecordSet. If provided, this will override the parent_id attribute.

TYPE: Optional[Union[Folder, Project]] DEFAULT: None

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False), this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
RecordSet

The RecordSet object with updated metadata from Synapse after the store operation.

RAISES DESCRIPTION
ValueError

If the RecordSet does not have the required information for storing. Must have either: (ID with path or data_file_handle_id), or (path with parent_id), or (data_file_handle_id with parent_id).
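The three valid attribute combinations above can be expressed as a small predicate. This is an illustrative sketch of the documented rule only, not the client's internal validation check:

```python
from typing import Optional

def can_store(
    id: Optional[str] = None,
    path: Optional[str] = None,
    data_file_handle_id: Optional[str] = None,
    parent_id: Optional[str] = None,
) -> bool:
    """Mirror the documented storing requirement for a RecordSet."""
    return bool(
        (id and (path or data_file_handle_id))
        or (path and parent_id)
        or (data_file_handle_id and parent_id)
    )
```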

Storing a new RecordSet

Creating and storing a new RecordSet in Synapse:

```python
import asyncio
from synapseclient import Synapse
from synapseclient.models import RecordSet

async def main():
    syn = Synapse()
    syn.login()

    record_set = RecordSet(
        name="My RecordSet",
        description="A dataset for analysis",
        parent_id="syn123456",
        path="/path/to/data.csv"
    )
    stored_record_set = await record_set.store_async()
    print(f"Stored RecordSet with ID: {stored_record_set.id}")

asyncio.run(main())
```

Updating an existing RecordSet:

```python
import asyncio
from synapseclient import Synapse
from synapseclient.models import RecordSet

async def main():
    syn = Synapse()
    syn.login()

    record_set = await RecordSet(id="syn789012").get_async()
    record_set.description = "Updated description"
    updated_record_set = await record_set.store_async()

asyncio.run(main())
```

Source code in synapseclient/models/recordset.py
async def store_async(
    self,
    parent: Optional[Union["Folder", "Project"]] = None,
    *,
    synapse_client: Optional[Synapse] = None,
) -> "RecordSet":
    """
    Store the RecordSet in Synapse.

    This method uploads or updates a RecordSet in Synapse. It can handle both
    creating new RecordSets and updating existing ones based on the
    `create_or_update` flag. The method supports file uploads, metadata updates,
    and merging with existing entities when appropriate.

    Arguments:
        parent: The parent Folder or Project for this RecordSet. If provided,
            this will override the `parent_id` attribute.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)`, this will use the last created
            instance from the Synapse class constructor.

    Returns:
        The RecordSet object with updated metadata from Synapse after the
        store operation.

    Raises:
        ValueError: If the RecordSet does not have the required information
            for storing. Must have either: (ID with path or data_file_handle_id),
            or (path with parent_id), or (data_file_handle_id with parent_id).

    Example: Storing a new RecordSet
        Creating and storing a new RecordSet in Synapse:

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import RecordSet

        async def main():
            syn = Synapse()
            syn.login()

            record_set = RecordSet(
                name="My RecordSet",
                description="A dataset for analysis",
                parent_id="syn123456",
                path="/path/to/data.csv"
            )
            stored_record_set = await record_set.store_async()
            print(f"Stored RecordSet with ID: {stored_record_set.id}")

        asyncio.run(main())
        ```

        Updating an existing RecordSet:
        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import RecordSet

        async def main():
            syn = Synapse()
            syn.login()

            record_set = await RecordSet(id="syn789012").get_async()
            record_set.description = "Updated description"
            updated_record_set = await record_set.store_async()

        asyncio.run(main())
        ```
    """
    self.parent_id = parent.id if parent else self.parent_id
    if self._cannot_store():
        raise ValueError(
            "The file must have an (ID with a (path or `data_file_handle_id`)), or a "
            "(path with a (`parent_id` or parent with an id)), or a "
            "(data_file_handle_id with a (`parent_id` or parent with an id)) to store."
        )
    self.name = self.name or (guess_file_name(self.path) if self.path else None)
    client = Synapse.get_client(synapse_client=synapse_client)

    if existing_file := await self._find_existing_entity(synapse_client=client):
        merge_dataclass_entities(
            source=existing_file,
            destination=self,
            fields_to_ignore=self._determine_fields_to_ignore_in_merge(),
        )

    if self.id:
        trace.get_current_span().set_attributes(
            {
                "synapse.id": self.id,
            }
        )

    if self.path:
        self.path = os.path.expanduser(self.path)
        async with client._get_parallel_file_transfer_semaphore(
            asyncio_event_loop=asyncio.get_running_loop()
        ):
            from synapseclient.models.file import _upload_file

            await _upload_file(entity_to_upload=self, synapse_client=client)
    elif self.data_file_handle_id:
        self.path = client.cache.get(file_handle_id=self.data_file_handle_id)

    if self.has_changed:
        entity = await store_entity(
            resource=self, entity=self.to_synapse_request(), synapse_client=client
        )

        self.fill_from_dict(entity=entity, set_annotations=False)

    re_read_required = await store_entity_components(
        root_resource=self, synapse_client=client
    )
    if re_read_required:
        before_download_file = self.download_file
        self.download_file = False
        await self.get_async(
            synapse_client=client,
        )
        self.download_file = before_download_file

    self._set_last_persistent_instance()

    client.logger.debug(f"Stored File {self.name}, id: {self.id}: {self.path}")
    # Clear the content_md5 so that it is recalculated if the file is updated
    self.content_md5 = None
    return self

delete_async async

delete_async(version_only: Optional[bool] = False, *, synapse_client: Optional[Synapse] = None) -> None

Delete the RecordSet from Synapse using its ID.

This method removes a RecordSet entity from Synapse. You can choose to delete either a specific version or the entire RecordSet including all its versions.

PARAMETER DESCRIPTION
version_only

If True, only the version specified in the version_number attribute of the RecordSet will be deleted. If False, the entire RecordSet including all versions will be deleted.

TYPE: Optional[bool] DEFAULT: False

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False), this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
None

RAISES DESCRIPTION
ValueError

If the RecordSet does not have an ID to delete.

ValueError

If the RecordSet does not have a version number to delete a specific version, and version_only is True.

Deleting a RecordSet

Delete an entire RecordSet and all its versions:

```python
import asyncio
from synapseclient import Synapse
from synapseclient.models import RecordSet

async def main():
    syn = Synapse()
    syn.login()

    await RecordSet(id="syn123").delete_async()

asyncio.run(main())
```

Delete only a specific version:

```python
import asyncio
from synapseclient import Synapse
from synapseclient.models import RecordSet

async def main():
    syn = Synapse()
    syn.login()

    record_set = RecordSet(id="syn123", version_number=2)
    await record_set.delete_async(version_only=True)

asyncio.run(main())
```

Source code in synapseclient/models/recordset.py
async def delete_async(
    self,
    version_only: Optional[bool] = False,
    *,
    synapse_client: Optional[Synapse] = None,
) -> None:
    """
    Delete the RecordSet from Synapse using its ID.

    This method removes a RecordSet entity from Synapse. You can choose to
    delete either a specific version or the entire RecordSet including all
    its versions.

    Arguments:
        version_only: If True, only the version specified in the `version_number`
            attribute of the RecordSet will be deleted. If False, the entire
            RecordSet including all versions will be deleted.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)`, this will use the last created
            instance from the Synapse class constructor.

    Returns:
        None

    Raises:
        ValueError: If the RecordSet does not have an ID to delete.
        ValueError: If the RecordSet does not have a version number to delete a
            specific version, and `version_only` is True.

    Example: Deleting a RecordSet
        Delete an entire RecordSet and all its versions:

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import RecordSet

        async def main():
            syn = Synapse()
            syn.login()

            await RecordSet(id="syn123").delete_async()

        asyncio.run(main())
        ```

        Delete only a specific version:
        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import RecordSet

        async def main():
            syn = Synapse()
            syn.login()

            record_set = RecordSet(id="syn123", version_number=2)
            await record_set.delete_async(version_only=True)

        asyncio.run(main())
        ```
    """
    if not self.id:
        raise ValueError("The file must have an ID to delete.")
    if version_only and not self.version_number:
        raise ValueError("The file must have a version number to delete a version.")

    loop = asyncio.get_event_loop()
    await loop.run_in_executor(
        None,
        lambda: Synapse.get_client(synapse_client=synapse_client).delete(
            obj=self.id,
            version=self.version_number if version_only else None,
        ),
    )
    Synapse.get_client(synapse_client=synapse_client).logger.debug(
        f"Deleted file {self.id}"
    )

get_acl_async async

get_acl_async(principal_id: int = None, check_benefactor: bool = True, *, synapse_client: Optional[Synapse] = None) -> List[str]

Get the ACL that a user or group has on an Entity.

Note: If the entity does not have local sharing settings, or ACL set directly on it, this will look up the ACL on the benefactor of the entity. The benefactor is the entity that the current entity inherits its permissions from. The benefactor is usually the parent entity, but it can be any ancestor in the hierarchy. For example, a newly created Project will be its own benefactor, while a new FileEntity's benefactor will start off as its containing Project or Folder. If the entity already has local sharing settings, the benefactor would be itself.

PARAMETER DESCRIPTION
principal_id

Identifier of a user or group (defaults to PUBLIC users)

TYPE: int DEFAULT: None

check_benefactor

If True (default), check the benefactor for the entity to get the ACL. If False, only check the entity itself. This is useful for checking the ACL of an entity that has local sharing settings, but you want to check the ACL of the entity itself and not the benefactor it may inherit from.

TYPE: bool DEFAULT: True

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
List[str]

An array containing some combination of ['READ', 'UPDATE', 'CREATE', 'DELETE', 'DOWNLOAD', 'MODERATE', 'CHANGE_PERMISSIONS', 'CHANGE_SETTINGS'] or an empty array
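Because the result is a plain list of access-type strings, permission checks can be performed locally on the returned value. A minimal sketch follows; the helper name and the required set are illustrative, not part of the client:

```python
from typing import Iterable, Set

# Access types a downstream workflow might require (illustrative choice)
REQUIRED: Set[str] = {"READ", "DOWNLOAD"}

def missing_permissions(acl: Iterable[str]) -> Set[str]:
    """Return the required access types absent from an ACL result list."""
    return REQUIRED - set(acl)
```

For example, a list returned by `get_acl_async` containing only `'READ'` would report `'DOWNLOAD'` as missing.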

Source code in synapseclient/models/mixins/access_control.py
async def get_acl_async(
    self,
    principal_id: int = None,
    check_benefactor: bool = True,
    *,
    synapse_client: Optional[Synapse] = None,
) -> List[str]:
    """
    Get the [ACL][synapseclient.core.models.permission.Permissions.access_types]
    that a user or group has on an Entity.

    Note: If the entity does not have local sharing settings, or ACL set directly
    on it, this will look up the ACL on the benefactor of the entity. The
    benefactor is the entity that the current entity inherits its permissions from.
    The benefactor is usually the parent entity, but it can be any ancestor in the
    hierarchy. For example, a newly created Project will be its own benefactor,
    while a new FileEntity's benefactor will start off as its containing Project or
    Folder. If the entity already has local sharing settings, the benefactor would
    be itself.

    Arguments:
        principal_id: Identifier of a user or group (defaults to PUBLIC users)
        check_benefactor: If True (default), check the benefactor for the entity
            to get the ACL. If False, only check the entity itself.
            This is useful for checking the ACL of an entity that has local sharing
            settings, but you want to check the ACL of the entity itself and not
            the benefactor it may inherit from.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Returns:
        An array containing some combination of
            ['READ', 'UPDATE', 'CREATE', 'DELETE', 'DOWNLOAD', 'MODERATE',
            'CHANGE_PERMISSIONS', 'CHANGE_SETTINGS']
            or an empty array
    """
    return await get_entity_acl_list(
        entity_id=self.id,
        principal_id=str(principal_id) if principal_id is not None else None,
        check_benefactor=check_benefactor,
        synapse_client=synapse_client,
    )

get_permissions_async async

get_permissions_async(*, synapse_client: Optional[Synapse] = None) -> Permissions

Get the permissions that the caller has on an Entity.

PARAMETER DESCRIPTION
synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
Permissions

A Permissions object

Using this function:

Getting permissions for a Synapse Entity

```python
import asyncio
from synapseclient import Synapse
from synapseclient.models import File

syn = Synapse()
syn.login()

async def main():
    permissions = await File(id="syn123").get_permissions_async()

asyncio.run(main())
```

Getting access types list from the Permissions object

```
permissions.access_types
```
Source code in synapseclient/models/mixins/access_control.py
async def get_permissions_async(
    self,
    *,
    synapse_client: Optional[Synapse] = None,
) -> "Permissions":
    """
    Get the [permissions][synapseclient.core.models.permission.Permissions]
    that the caller has on an Entity.

    Arguments:
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Returns:
        A Permissions object


    Example: Using this function:
        Getting permissions for a Synapse Entity

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import File

        syn = Synapse()
        syn.login()

        async def main():
            permissions = await File(id="syn123").get_permissions_async()

        asyncio.run(main())
        ```

        Getting access types list from the Permissions object

        ```
        permissions.access_types
        ```
    """
    from synapseclient.core.models.permission import Permissions

    permissions_dict = await get_entity_permissions(
        entity_id=self.id,
        synapse_client=synapse_client,
    )
    return Permissions.from_dict(data=permissions_dict)

set_permissions_async async

set_permissions_async(principal_id: int = None, access_type: List[str] = None, modify_benefactor: bool = False, warn_if_inherits: bool = True, overwrite: bool = True, *, synapse_client: Optional[Synapse] = None) -> Dict[str, Union[str, list]]

Sets permission that a user or group has on an Entity. An Entity may have its own ACL or inherit its ACL from a benefactor.

PARAMETER DESCRIPTION
principal_id

Identifier of a user or group. 273948 is for all registered Synapse users and 273949 is for public access. None implies public access.

TYPE: int DEFAULT: None

access_type

Type of permission to be granted. One or more of CREATE, READ, DOWNLOAD, UPDATE, DELETE, CHANGE_PERMISSIONS.

Defaults to ['READ', 'DOWNLOAD']

TYPE: List[str] DEFAULT: None

modify_benefactor

Set as True when modifying a benefactor's ACL. The term 'benefactor' is used to indicate which Entity an Entity inherits its ACL from. For example, a newly created Project will be its own benefactor, while a new FileEntity's benefactor will start off as its containing Project. If the entity already has local sharing settings the benefactor would be itself. It may also be the immediate parent, somewhere in the parent tree, or the project itself.

TYPE: bool DEFAULT: False

warn_if_inherits

When modify_benefactor is True, this does not have any effect. When modify_benefactor is False, and warn_if_inherits is True, a warning log message is produced if the benefactor for the entity you passed into the function is not itself, i.e., it's the parent folder, or another entity in the parent tree.

TYPE: bool DEFAULT: True

overwrite

By default this function overwrites existing permissions for the specified user. Set this flag to False to add new permissions non-destructively.

TYPE: bool DEFAULT: True

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
Dict[str, Union[str, list]]

An Access Control List object matching <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/AccessControlList.html>.

Setting permissions

Grant all registered users download access

```python
import asyncio
from synapseclient import Synapse
from synapseclient.models import File

syn = Synapse()
syn.login()

async def main():
    await File(id="syn123").set_permissions_async(principal_id=273948, access_type=['READ','DOWNLOAD'])

asyncio.run(main())
```

Grant the public view access

```python
import asyncio
from synapseclient import Synapse
from synapseclient.models import File

syn = Synapse()
syn.login()

async def main():
    await File(id="syn123").set_permissions_async(principal_id=273949, access_type=['READ'])

asyncio.run(main())
```
Source code in synapseclient/models/mixins/access_control.py
async def set_permissions_async(
    self,
    principal_id: int = None,
    access_type: List[str] = None,
    modify_benefactor: bool = False,
    warn_if_inherits: bool = True,
    overwrite: bool = True,
    *,
    synapse_client: Optional[Synapse] = None,
) -> Dict[str, Union[str, list]]:
    """
    Sets permission that a user or group has on an Entity.
    An Entity may have its own ACL or inherit its ACL from a benefactor.

    Arguments:
        principal_id: Identifier of a user or group. `273948` is for all
            registered Synapse users and `273949` is for public access.
            None implies public access.
        access_type: Type of permission to be granted. One or more of CREATE,
            READ, DOWNLOAD, UPDATE, DELETE, CHANGE_PERMISSIONS.

            **Defaults to ['READ', 'DOWNLOAD']**
        modify_benefactor: Set as True when modifying a benefactor's ACL. The term
            'benefactor' is used to indicate which Entity an Entity inherits its
            ACL from. For example, a newly created Project will be its own
            benefactor, while a new FileEntity's benefactor will start off as its
            containing Project. If the entity already has local sharing settings
            the benefactor would be itself. It may also be the immediate parent,
            somewhere in the parent tree, or the project itself.
        warn_if_inherits: When `modify_benefactor` is True, this does not have any
            effect. When `modify_benefactor` is False, and `warn_if_inherits` is
            True, a warning log message is produced if the benefactor for the
            entity you passed into the function is not itself, i.e., it's the
            parent folder, or another entity in the parent tree.
        overwrite: By default this function overwrites existing permissions for
            the specified user. Set this flag to False to add new permissions
            non-destructively.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Returns:
        An Access Control List object matching <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/AccessControlList.html>.

    Example: Setting permissions
        Grant all registered users download access

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import File

        syn = Synapse()
        syn.login()

        async def main():
            await File(id="syn123").set_permissions_async(principal_id=273948, access_type=['READ','DOWNLOAD'])

        asyncio.run(main())
        ```

        Grant the public view access

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import File

        syn = Synapse()
        syn.login()

        async def main():
            await File(id="syn123").set_permissions_async(principal_id=273949, access_type=['READ'])

        asyncio.run(main())
        ```
    """
    if access_type is None:
        access_type = ["READ", "DOWNLOAD"]

    return await set_entity_permissions(
        entity_id=self.id,
        principal_id=str(principal_id) if principal_id is not None else None,
        access_type=access_type,
        modify_benefactor=modify_benefactor,
        warn_if_inherits=warn_if_inherits,
        overwrite=overwrite,
        synapse_client=synapse_client,
    )

delete_permissions_async async

delete_permissions_async(include_self: bool = True, include_container_content: bool = False, recursive: bool = False, target_entity_types: Optional[List[str]] = None, dry_run: bool = False, show_acl_details: bool = True, show_files_in_containers: bool = True, *, synapse_client: Optional[Synapse] = None, _benefactor_tracker: Optional[BenefactorTracker] = None) -> None

Delete the entire Access Control List (ACL) for a given Entity. This is not scoped to a specific user or group, but rather removes all permissions associated with the Entity. After this operation, the Entity will inherit permissions from its benefactor, which is typically its parent entity or the Project it belongs to.

In order to remove permissions for a specific user or group, you should use the set_permissions_async method with the access_type set to an empty list.

By default, Entities such as FileEntity and Folder inherit their permissions from their containing Project. For such Entities the Project is the Entity's 'benefactor'. This permission inheritance can be overridden by creating an ACL for the Entity. When this occurs the Entity becomes its own benefactor and all permissions are determined by its own ACL.

If the ACL of an Entity is deleted, then its benefactor will automatically be set to its parent's benefactor.

Special notice for Projects: The ACL for a Project cannot be deleted, you must individually update or revoke the permissions for each user or group.

PARAMETER DESCRIPTION
include_self

If True (default), delete the ACL of the current entity. If False, skip deleting the ACL of the current entity.

TYPE: bool DEFAULT: True

include_container_content

If True, delete ACLs from contents directly within containers (files and folders inside self). This must be set to True for recursive to have any effect. Defaults to False.

TYPE: bool DEFAULT: False

recursive

If True and the entity is a container (e.g., Project or Folder), recursively process child containers. Note that this must be used with include_container_content=True to have any effect. Setting recursive=True with include_container_content=False will raise a ValueError. Only works on classes that support the sync_from_synapse_async method.

TYPE: bool DEFAULT: False

target_entity_types

Specify which entity types to process when deleting ACLs. Allowed values are "folder", "file", "project", "table", "entityview", "materializedview", "virtualtable", "dataset", "datasetcollection", "submissionview" (case-insensitive). If None, defaults to ["folder", "file"]. This does not affect the entity type of the current entity, which is always processed if include_self=True.

TYPE: Optional[List[str]] DEFAULT: None

dry_run

If True, log the changes that would be made instead of actually performing the deletions. When enabled, all ACL deletion operations are simulated and logged at info level. Defaults to False.

TYPE: bool DEFAULT: False

show_acl_details

When dry_run=True, controls whether current ACL details are displayed for entities that will have their permissions changed. If True (default), shows detailed ACL information. If False, hides ACL details for cleaner output. Has no effect when dry_run=False.

TYPE: bool DEFAULT: True

show_files_in_containers

When dry_run=True, controls whether files within containers are displayed in the preview. If True (default), shows all files. If False, hides files when their only change is benefactor inheritance (but still shows files with local ACLs being deleted). Has no effect when dry_run=False.

TYPE: bool DEFAULT: True

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

_benefactor_tracker

Internal use tracker for managing benefactor relationships. Used for recursive functionality to track which entities will be affected

TYPE: Optional[BenefactorTracker] DEFAULT: None

RETURNS DESCRIPTION
None

RAISES DESCRIPTION
ValueError

If the entity does not have an ID or if an invalid entity type is provided.

SynapseHTTPError

If there are permission issues or if the entity already inherits permissions.

Exception

For any other errors that may occur during the process.

Note: The caller must be granted ACCESS_TYPE.CHANGE_PERMISSIONS on the Entity to call this method.

Delete permissions for a single entity
```python
import asyncio
from synapseclient import Synapse
from synapseclient.models import File

syn = Synapse()
syn.login()

async def main():
    await File(id="syn123").delete_permissions_async()

asyncio.run(main())
```
Delete permissions recursively for a folder and all its children
import asyncio
from synapseclient import Synapse
from synapseclient.models import Folder

syn = Synapse()
syn.login()

async def main():
    # Delete permissions for this folder only (does not affect children)
    await Folder(id="syn123").delete_permissions_async()

    # Delete permissions for all files and folders directly within this folder,
    # but not the folder itself
    await Folder(id="syn123").delete_permissions_async(
        include_self=False,
        include_container_content=True
    )

    # Delete permissions for all items in the entire hierarchy (folders and their files)
    # Both recursive and include_container_content must be True
    await Folder(id="syn123").delete_permissions_async(
        recursive=True,
        include_container_content=True
    )

    # Delete permissions only for folder entities within this folder recursively
    # and their contents
    await Folder(id="syn123").delete_permissions_async(
        recursive=True,
        include_container_content=True,
        target_entity_types=["folder"]
    )

    # Delete permissions only for files within this folder and all subfolders
    await Folder(id="syn123").delete_permissions_async(
        include_self=False,
        recursive=True,
        include_container_content=True,
        target_entity_types=["file"]
    )

    # Delete permissions for specific entity types (e.g., tables and views)
    await Folder(id="syn123").delete_permissions_async(
        recursive=True,
        include_container_content=True,
        target_entity_types=["table", "entityview", "materializedview"]
    )

    # Dry run example: Log what would be deleted without making changes
    await Folder(id="syn123").delete_permissions_async(
        recursive=True,
        include_container_content=True,
        dry_run=True
    )
asyncio.run(main())
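As the examples above show, `recursive=True` only has an effect when `include_container_content=True`; combining `recursive=True` with `include_container_content=False` raises a `ValueError`. That flag interaction can be sketched as a small standalone helper (hypothetical, mirroring the documented check; not part of the client):

```python
def validate_delete_flags(recursive: bool, include_container_content: bool) -> None:
    """Mirror the documented flag check in delete_permissions_async."""
    if recursive and not include_container_content:
        raise ValueError(
            "When recursive=True, include_container_content must also be True. "
            "Setting recursive=True with include_container_content=False has no effect."
        )

# Valid combinations pass silently; the invalid one raises.
validate_delete_flags(recursive=False, include_container_content=False)
validate_delete_flags(recursive=True, include_container_content=True)
try:
    validate_delete_flags(recursive=True, include_container_content=False)
except ValueError as err:
    print(f"Rejected: {err}")
```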
Source code in synapseclient/models/mixins/access_control.py
async def delete_permissions_async(
    self,
    include_self: bool = True,
    include_container_content: bool = False,
    recursive: bool = False,
    target_entity_types: Optional[List[str]] = None,
    dry_run: bool = False,
    show_acl_details: bool = True,
    show_files_in_containers: bool = True,
    *,
    synapse_client: Optional[Synapse] = None,
    _benefactor_tracker: Optional[BenefactorTracker] = None,
) -> None:
    """
    Delete the entire Access Control List (ACL) for a given Entity. This is not
    scoped to a specific user or group, but rather removes all permissions
    associated with the Entity. After this operation, the Entity will inherit
    permissions from its benefactor, which is typically its parent entity or
    the Project it belongs to.

    In order to remove permissions for a specific user or group, you
    should use the `set_permissions_async` method with the `access_type` set to
    an empty list.

    By default, Entities such as FileEntity and Folder inherit their permission from
    their containing Project. For such Entities the Project is the Entity's 'benefactor'.
    This permission inheritance can be overridden by creating an ACL for the Entity.
    When this occurs, the Entity becomes its own benefactor and all permissions are
    determined by its own ACL.

    If the ACL of an Entity is deleted, then its benefactor will automatically be set
    to its parent's benefactor.

    **Special notice for Projects:** The ACL for a Project cannot be deleted; you
    must individually update or revoke the permissions for each user or group.

    Arguments:
        include_self: If True (default), delete the ACL of the current entity.
            If False, skip deleting the ACL of the current entity.
        include_container_content: If True, delete ACLs from contents directly within
            containers (files and folders inside self). This must be set to
            True for recursive to have any effect. Defaults to False.
        recursive: If True and the entity is a container (e.g., Project or Folder),
            recursively process child containers. Note that this must be used with
            include_container_content=True to have any effect. Setting recursive=True
            with include_container_content=False will raise a ValueError.
            Only works on classes that support the `sync_from_synapse_async` method.
        target_entity_types: Specify which entity types to process when deleting ACLs.
            Allowed values are "folder", "file", "project", "table", "entityview",
            "materializedview", "virtualtable", "dataset", "datasetcollection",
            "submissionview" (case-insensitive). If None, defaults to ["folder", "file"].
            This does not affect the entity type of the current entity, which is always
            processed if `include_self=True`.
        dry_run: If True, log the changes that would be made instead of actually
            performing the deletions. When enabled, all ACL deletion operations are
            simulated and logged at info level. Defaults to False.
        show_acl_details: When dry_run=True, controls whether current ACL details are
            displayed for entities that will have their permissions changed. If True (default),
            shows detailed ACL information. If False, hides ACL details for cleaner output.
            Has no effect when dry_run=False.
        show_files_in_containers: When dry_run=True, controls whether files within containers
            are displayed in the preview. If True (default), shows all files. If False, hides
            files when their only change is benefactor inheritance (but still shows files with
            local ACLs being deleted). Has no effect when dry_run=False.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.
        _benefactor_tracker: Internal use tracker for managing benefactor relationships.
            Used for recursive functionality to track which entities will be affected

    Returns:
        None

    Raises:
        ValueError: If the entity does not have an ID or if an invalid entity type is provided.
        SynapseHTTPError: If there are permission issues or if the entity already inherits permissions.
        Exception: For any other errors that may occur during the process.

    Note: The caller must be granted ACCESS_TYPE.CHANGE_PERMISSIONS on the Entity to
    call this method.

    Example: Delete permissions for a single entity
        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import File

        syn = Synapse()
        syn.login()

        async def main():
            await File(id="syn123").delete_permissions_async()

        asyncio.run(main())
        ```

    Example: Delete permissions recursively for a folder and all its children
        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import Folder

        syn = Synapse()
        syn.login()

        async def main():
            # Delete permissions for this folder only (does not affect children)
            await Folder(id="syn123").delete_permissions_async()

            # Delete permissions for all files and folders directly within this folder,
            # but not the folder itself
            await Folder(id="syn123").delete_permissions_async(
                include_self=False,
                include_container_content=True
            )

            # Delete permissions for all items in the entire hierarchy (folders and their files)
            # Both recursive and include_container_content must be True
            await Folder(id="syn123").delete_permissions_async(
                recursive=True,
                include_container_content=True
            )

            # Delete permissions only for folder entities within this folder recursively
            # and their contents
            await Folder(id="syn123").delete_permissions_async(
                recursive=True,
                include_container_content=True,
                target_entity_types=["folder"]
            )

            # Delete permissions only for files within this folder and all subfolders
            await Folder(id="syn123").delete_permissions_async(
                include_self=False,
                recursive=True,
                include_container_content=True,
                target_entity_types=["file"]
            )

            # Delete permissions for specific entity types (e.g., tables and views)
            await Folder(id="syn123").delete_permissions_async(
                recursive=True,
                include_container_content=True,
                target_entity_types=["table", "entityview", "materializedview"]
            )

            # Dry run example: Log what would be deleted without making changes
            await Folder(id="syn123").delete_permissions_async(
                recursive=True,
                include_container_content=True,
                dry_run=True
            )
        asyncio.run(main())
        ```
    """
    if not self.id:
        raise ValueError("The entity must have an ID to delete permissions.")

    client = Synapse.get_client(synapse_client=synapse_client)

    if include_self and self.__class__.__name__.lower() == "project":
        client.logger.warning(
            "The ACL for a Project cannot be deleted, you must individually update or "
            "revoke the permissions for each user or group. Continuing without deleting "
            "the Project's ACL."
        )
        include_self = False

    normalized_types = self._normalize_target_entity_types(target_entity_types)

    is_top_level = not _benefactor_tracker
    benefactor_tracker = _benefactor_tracker or BenefactorTracker()

    should_process_children = (recursive or include_container_content) and hasattr(
        self, "sync_from_synapse_async"
    )
    all_entities = [self] if include_self else []

    custom_message = "Deleting ACLs [Dry Run]..." if dry_run else "Deleting ACLs..."
    with shared_download_progress_bar(
        file_size=1, synapse_client=client, custom_message=custom_message, unit=None
    ) as progress_bar:
        if progress_bar:
            progress_bar.update(1)  # Initial setup complete

        if should_process_children:
            if recursive and not include_container_content:
                raise ValueError(
                    "When recursive=True, include_container_content must also be True. "
                    "Setting recursive=True with include_container_content=False has no effect."
                )

            if progress_bar:
                progress_bar.total += 1
                progress_bar.refresh()

            all_entities = await self._collect_entities(
                client=client,
                target_entity_types=normalized_types,
                include_container_content=include_container_content,
                recursive=recursive,
                progress_bar=progress_bar,
            )
            if progress_bar:
                progress_bar.update(1)

            entity_ids = [entity.id for entity in all_entities if entity.id]
            if entity_ids:
                if progress_bar:
                    progress_bar.total += 1
                    progress_bar.refresh()
                await benefactor_tracker.track_entity_benefactor(
                    entity_ids=entity_ids,
                    synapse_client=client,
                    progress_bar=progress_bar,
                )
            else:
                if progress_bar:
                    progress_bar.total += 1
                    progress_bar.refresh()
                    progress_bar.update(1)

        if is_top_level:
            if progress_bar:
                progress_bar.total += 1
                progress_bar.refresh()
            await self._build_and_log_run_tree(
                client=client,
                benefactor_tracker=benefactor_tracker,
                collected_entities=all_entities,
                include_self=include_self,
                show_acl_details=show_acl_details,
                show_files_in_containers=show_files_in_containers,
                progress_bar=progress_bar,
                dry_run=dry_run,
            )

        if dry_run:
            return

        if include_self:
            if progress_bar:
                progress_bar.total += 1
                progress_bar.refresh()
            await self._delete_current_entity_acl(
                client=client,
                benefactor_tracker=benefactor_tracker,
                progress_bar=progress_bar,
            )

        if should_process_children:
            if include_container_content:
                if progress_bar:
                    progress_bar.total += 1
                    progress_bar.refresh()
                await self._process_container_contents(
                    client=client,
                    target_entity_types=normalized_types,
                    benefactor_tracker=benefactor_tracker,
                    progress_bar=progress_bar,
                    recursive=recursive,
                    include_container_content=include_container_content,
                )
                if progress_bar:
                    progress_bar.update(1)  # Process container contents complete

list_acl_async async

list_acl_async(recursive: bool = False, include_container_content: bool = False, target_entity_types: Optional[List[str]] = None, log_tree: bool = False, *, synapse_client: Optional[Synapse] = None, _progress_bar: Optional[tqdm] = None) -> AclListResult

List the Access Control Lists (ACLs) for this entity and optionally its children.

This function returns the local sharing settings for the entity and optionally its children. It provides a mapping of all ACLs for the given container/entity.

Important Note: This function returns the LOCAL sharing settings only, not the effective permissions that each Synapse User ID/Team has on the entities. More permissive permissions could be granted via a Team that the user has access to that has permissions on the entity, or through inheritance from parent entities.

PARAMETER DESCRIPTION
recursive

If True and the entity is a container (e.g., Project or Folder), recursively process child containers. Note that this must be used with include_container_content=True to have any effect. Setting recursive=True with include_container_content=False will raise a ValueError. Only works on classes that support the sync_from_synapse_async method.

TYPE: bool DEFAULT: False

include_container_content

If True, include ACLs from contents directly within containers (files and folders inside self). This must be set to True for recursive to have any effect. Defaults to False.

TYPE: bool DEFAULT: False

target_entity_types

Specify which entity types to process when listing ACLs. Allowed values are "folder", "file", "project", "table", "entityview", "materializedview", "virtualtable", "dataset", "datasetcollection", "submissionview" (case-insensitive). If None, defaults to ["folder", "file"].

TYPE: Optional[List[str]] DEFAULT: None

log_tree

If True, logs the ACL results to console in ASCII tree format showing entity hierarchies and their ACL permissions in a tree-like structure. Defaults to False.

TYPE: bool DEFAULT: False

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

_progress_bar

Internal parameter. Progress bar instance to use for updates when called recursively. Should not be used by external callers.

TYPE: Optional[tqdm] DEFAULT: None

RETURNS DESCRIPTION
AclListResult

An AclListResult object containing a structured representation of ACLs where:

  • entity_acls: A list of EntityAcl objects, each representing one entity's ACL
  • Each EntityAcl contains acl_entries (a list of AclEntry objects)
  • Each AclEntry contains the principal_id and their list of permissions
RAISES DESCRIPTION
ValueError

If the entity does not have an ID or if an invalid entity type is provided.

SynapseHTTPError

If there are permission issues accessing ACLs.

Exception

For any other errors that may occur during the process.

List ACLs for a single entity
import asyncio
from synapseclient import Synapse
from synapseclient.models import File

syn = Synapse()
syn.login()

async def main():
    acl_result = await File(id="syn123").list_acl_async()
    print(acl_result)

    # Access entity ACLs (entity_acls is a list, not a dict)
    for entity_acl in acl_result.all_entity_acls:
        if entity_acl.entity_id == "syn123":
            # Access individual ACL entries
            for acl_entry in entity_acl.acl_entries:
                if acl_entry.principal_id == "273948":
                    print(f"Principal 273948 has permissions: {acl_entry.permissions}")

    # You can also access the ACL for the file itself
    print(acl_result.entity_acl)

    print(acl_result)

asyncio.run(main())
List ACLs recursively for a folder and all its children
import asyncio
from synapseclient import Synapse
from synapseclient.models import Folder

syn = Synapse()
syn.login()

async def main():
    acl_result = await Folder(id="syn123").list_acl_async(
        recursive=True,
        include_container_content=True
    )

    # Access each entity's ACL (entity_acls is a list)
    for entity_acl in acl_result.all_entity_acls:
        print(f"Entity {entity_acl.entity_id} has ACL with {len(entity_acl.acl_entries)} principals")

    # You can also access the ACL for the folder itself
    print(acl_result.entity_acl)

    # List ACLs for only folder entities
    folder_acl_result = await Folder(id="syn123").list_acl_async(
        recursive=True,
        include_container_content=True,
        target_entity_types=["folder"]
    )

    # List ACLs for specific entity types (e.g., tables and views)
    table_view_acl_result = await Folder(id="syn123").list_acl_async(
        recursive=True,
        include_container_content=True,
        target_entity_types=["table", "entityview", "materializedview"]
    )

asyncio.run(main())
List ACLs with ASCII tree visualization

When log_tree=True, the ACLs will be logged in a tree format. Additionally, the ascii_tree attribute of the AclListResult will contain the ASCII tree representation of the ACLs.

import asyncio
from synapseclient import Synapse
from synapseclient.models import Folder

syn = Synapse()
syn.login()

async def main():
    acl_result = await Folder(id="syn123").list_acl_async(
        recursive=True,
        include_container_content=True,
        log_tree=True, # Enable ASCII tree logging
    )

    # The ASCII tree representation of the ACLs will also be available
    # in acl_result.ascii_tree
    print(acl_result.ascii_tree)

asyncio.run(main())
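The `AclListResult` shape described above (a list of `EntityAcl` objects, each holding `AclEntry` items with a `principal_id` and a list of `permissions`) is convenient to flatten into a plain dictionary for lookups. A sketch using minimal stand-in dataclasses (the real classes live in the client; these stand-ins are illustrative only):

```python
from dataclasses import dataclass, field
from typing import Dict, List

@dataclass
class AclEntry:
    principal_id: str
    permissions: List[str] = field(default_factory=list)

@dataclass
class EntityAcl:
    entity_id: str
    acl_entries: List[AclEntry] = field(default_factory=list)

def flatten_acls(entity_acls: List[EntityAcl]) -> Dict[str, Dict[str, List[str]]]:
    """Map entity_id -> principal_id -> permissions."""
    return {
        acl.entity_id: {e.principal_id: e.permissions for e in acl.acl_entries}
        for acl in entity_acls
    }

acls = [
    EntityAcl("syn123", [AclEntry("273948", ["READ", "DOWNLOAD"])]),
    EntityAcl("syn456", [AclEntry("999", ["CHANGE_PERMISSIONS"])]),
]
flat = flatten_acls(acls)
print(flat["syn123"]["273948"])  # ['READ', 'DOWNLOAD']
```

Remember these are the LOCAL sharing settings only; effective permissions may also come from team membership or inheritance.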
Source code in synapseclient/models/mixins/access_control.py
async def list_acl_async(
    self,
    recursive: bool = False,
    include_container_content: bool = False,
    target_entity_types: Optional[List[str]] = None,
    log_tree: bool = False,
    *,
    synapse_client: Optional[Synapse] = None,
    _progress_bar: Optional[tqdm] = None,  # Internal parameter for recursive calls
) -> AclListResult:
    """
    List the Access Control Lists (ACLs) for this entity and optionally its children.

    This function returns the local sharing settings for the entity and optionally
    its children. It provides a mapping of all ACLs for the given container/entity.

    **Important Note:** This function returns the LOCAL sharing settings only, not
    the effective permissions that each Synapse User ID/Team has on the entities.
    More permissive permissions could be granted via a Team that the user has access
    to that has permissions on the entity, or through inheritance from parent entities.

    Arguments:
        recursive: If True and the entity is a container (e.g., Project or Folder),
            recursively process child containers. Note that this must be used with
            include_container_content=True to have any effect. Setting recursive=True
            with include_container_content=False will raise a ValueError.
            Only works on classes that support the `sync_from_synapse_async` method.
        include_container_content: If True, include ACLs from contents directly within
            containers (files and folders inside self). This must be set to
            True for recursive to have any effect. Defaults to False.
        target_entity_types: Specify which entity types to process when listing ACLs.
            Allowed values are "folder", "file", "project", "table", "entityview",
            "materializedview", "virtualtable", "dataset", "datasetcollection",
            "submissionview" (case-insensitive). If None, defaults to ["folder", "file"].
        log_tree: If True, logs the ACL results to console in ASCII tree format showing
            entity hierarchies and their ACL permissions in a tree-like structure.
            Defaults to False.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.
        _progress_bar: Internal parameter. Progress bar instance to use for updates
            when called recursively. Should not be used by external callers.

    Returns:
        An AclListResult object containing a structured representation of ACLs where:
        - entity_acls: A list of EntityAcl objects, each representing one entity's ACL
        - Each EntityAcl contains acl_entries (a list of AclEntry objects)
        - Each AclEntry contains the principal_id and their list of permissions

    Raises:
        ValueError: If the entity does not have an ID or if an invalid entity type is provided.
        SynapseHTTPError: If there are permission issues accessing ACLs.
        Exception: For any other errors that may occur during the process.

    Example: List ACLs for a single entity
        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import File

        syn = Synapse()
        syn.login()

        async def main():
            acl_result = await File(id="syn123").list_acl_async()
            print(acl_result)

            # Access entity ACLs (entity_acls is a list, not a dict)
            for entity_acl in acl_result.all_entity_acls:
                if entity_acl.entity_id == "syn123":
                    # Access individual ACL entries
                    for acl_entry in entity_acl.acl_entries:
                        if acl_entry.principal_id == "273948":
                            print(f"Principal 273948 has permissions: {acl_entry.permissions}")

            # You can also access the ACL for the file itself
            print(acl_result.entity_acl)

            print(acl_result)

        asyncio.run(main())
        ```

    Example: List ACLs recursively for a folder and all its children
        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import Folder

        syn = Synapse()
        syn.login()

        async def main():
            acl_result = await Folder(id="syn123").list_acl_async(
                recursive=True,
                include_container_content=True
            )

            # Access each entity's ACL (entity_acls is a list)
            for entity_acl in acl_result.all_entity_acls:
                print(f"Entity {entity_acl.entity_id} has ACL with {len(entity_acl.acl_entries)} principals")

            # You can also access the ACL for the folder itself
            print(acl_result.entity_acl)

            # List ACLs for only folder entities
            folder_acl_result = await Folder(id="syn123").list_acl_async(
                recursive=True,
                include_container_content=True,
                target_entity_types=["folder"]
            )

            # List ACLs for specific entity types (e.g., tables and views)
            table_view_acl_result = await Folder(id="syn123").list_acl_async(
                recursive=True,
                include_container_content=True,
                target_entity_types=["table", "entityview", "materializedview"]
            )

        asyncio.run(main())
        ```

    Example: List ACLs with ASCII tree visualization
        When `log_tree=True`, the ACLs will be logged in a tree format. Additionally,
        the `ascii_tree` attribute of the AclListResult will contain the ASCII tree
        representation of the ACLs.

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import Folder

        syn = Synapse()
        syn.login()

        async def main():
            acl_result = await Folder(id="syn123").list_acl_async(
                recursive=True,
                include_container_content=True,
                log_tree=True, # Enable ASCII tree logging
            )

            # The ASCII tree representation of the ACLs will also be available
            # in acl_result.ascii_tree
            print(acl_result.ascii_tree)

        asyncio.run(main())
        ```
    """
    if not self.id:
        raise ValueError("The entity must have an ID to list ACLs.")

    normalized_types = self._normalize_target_entity_types(target_entity_types)
    client = Synapse.get_client(synapse_client=synapse_client)

    all_acls: Dict[str, Dict[str, List[str]]] = {}
    all_entities = []

    # Only update progress bar for self ACL if we're the top-level call (not recursive)
    # When _progress_bar is passed, it means this is a recursive call and the parent
    # is managing progress updates
    update_progress_for_self = _progress_bar is None
    acl = await self._get_current_entity_acl(
        client=client,
        progress_bar=_progress_bar if update_progress_for_self else None,
    )
    if acl is not None:
        all_acls[self.id] = acl
    all_entities.append(self)

    should_process_children = (recursive or include_container_content) and hasattr(
        self, "sync_from_synapse_async"
    )

    if should_process_children and (recursive and not include_container_content):
        raise ValueError(
            "When recursive=True, include_container_content must also be True. "
            "Setting recursive=True with include_container_content=False has no effect."
        )

    if should_process_children and _progress_bar is None:
        with shared_download_progress_bar(
            file_size=1,
            synapse_client=client,
            custom_message="Collecting ACLs...",
            unit=None,
        ) as progress_bar:
            await self._process_children_with_progress(
                client=client,
                normalized_types=normalized_types,
                include_container_content=include_container_content,
                recursive=recursive,
                all_entities=all_entities,
                all_acls=all_acls,
                progress_bar=progress_bar,
            )
            # Ensure progress bar reaches 100% completion
            if progress_bar:
                remaining = (
                    progress_bar.total - progress_bar.n
                    if progress_bar.total > progress_bar.n
                    else 0
                )
                if remaining > 0:
                    progress_bar.update(remaining)
    elif should_process_children:
        await self._process_children_with_progress(
            client=client,
            normalized_types=normalized_types,
            include_container_content=include_container_content,
            recursive=recursive,
            all_entities=all_entities,
            all_acls=all_acls,
            progress_bar=_progress_bar,
        )
    current_acl = all_acls.get(self.id)
    acl_result = AclListResult.from_dict(
        all_acl_dict=all_acls, current_acl_dict=current_acl
    )

    if log_tree:
        logged_tree = await self._log_acl_tree(acl_result, all_entities, client)
        acl_result.ascii_tree = logged_tree

    return acl_result

bind_schema_async async

bind_schema_async(json_schema_uri: str, *, enable_derived_annotations: bool = False, synapse_client: Optional[Synapse] = None) -> JSONSchemaBinding

Bind a JSON schema to the entity.

PARAMETER DESCRIPTION
json_schema_uri

The URI of the JSON schema to bind to the entity.

TYPE: str

enable_derived_annotations

If true, enable derived annotations. Defaults to False.

TYPE: bool DEFAULT: False

synapse_client

The Synapse client instance. If not provided, the last created instance from the Synapse class constructor will be used.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
JSONSchemaBinding

An object containing details about the JSON schema binding.

Using this function

Binding JSON schema to a folder or a file. This example expects that you have a Synapse project to use, and a file to upload. Set the PROJECT_NAME and FILE_PATH variables to your project name and file path respectively.

import asyncio
from synapseclient import Synapse
from synapseclient.models import File, Folder

syn = Synapse()
syn.login()

# Define Project and JSON schema info
PROJECT_NAME = "test_json_schema_project"  # replace with your project name
FILE_PATH = "~/Sample.txt"  # replace with your test file path

PROJECT_ID = syn.findEntityId(name=PROJECT_NAME)
ORG_NAME = "UniqueOrg"  # replace with your organization name
SCHEMA_NAME = "myTestSchema"  # replace with your schema name
FOLDER_NAME = "test_script_folder"
VERSION = "0.0.1"
SCHEMA_URI = f"{ORG_NAME}-{SCHEMA_NAME}-{VERSION}"

# Create organization (if not already created)
js = syn.service("json_schema")
all_orgs = js.list_organizations()
for org in all_orgs:
    if org["name"] == ORG_NAME:
        print(f"Organization {ORG_NAME} already exists: {org}")
        break
else:
    print(f"Creating organization {ORG_NAME}.")
    created_organization = js.create_organization(ORG_NAME)
    print(f"Created organization: {created_organization}")

my_test_org = js.JsonSchemaOrganization(ORG_NAME)
test_schema = my_test_org.get_json_schema(SCHEMA_NAME)

if not test_schema:
    # Create the schema (if not already created)
    schema_definition = {
        "$id": "mySchema",
        "type": "object",
        "properties": {
            "foo": {"type": "string"},
            "bar": {"type": "integer"},
        },
        "required": ["foo"]
    }
    test_schema = my_test_org.create_json_schema(schema_definition, SCHEMA_NAME, VERSION)
    print(f"Created new schema: {SCHEMA_NAME}")

async def main():
    # Create a test folder
    test_folder = Folder(name=FOLDER_NAME, parent_id=PROJECT_ID)
    await test_folder.store_async()
    print(f"Created test folder: {FOLDER_NAME}")

    # Bind JSON schema to the folder
    bound_schema = await test_folder.bind_schema_async(
        json_schema_uri=SCHEMA_URI,
        enable_derived_annotations=True
    )
    print(f"Result from binding schema to folder: {bound_schema}")

    # Create and bind schema to a file
    example_file = File(
        path=FILE_PATH,  # Replace with your test file path
        parent_id=test_folder.id,
    )
    await example_file.store_async()

    bound_schema_file = await example_file.bind_schema_async(
        json_schema_uri=SCHEMA_URI,
        enable_derived_annotations=True
    )
    print(f"Result from binding schema to file: {bound_schema_file}")

asyncio.run(main())
Source code in synapseclient/models/mixins/json_schema.py
async def bind_schema_async(
    self,
    json_schema_uri: str,
    *,
    enable_derived_annotations: bool = False,
    synapse_client: Optional["Synapse"] = None,
) -> JSONSchemaBinding:
    """
    Bind a JSON schema to the entity.

    Arguments:
        json_schema_uri: The URI of the JSON schema to bind to the entity.
        enable_derived_annotations: If true, enable derived annotations. Defaults to False.
        synapse_client: The Synapse client instance. If not provided,
            the last created instance from the Synapse class constructor will be used.

    Returns:
        An object containing details about the JSON schema binding.

    Example: Using this function
        Binding JSON schema to a folder or a file. This example expects that you
        have a Synapse project to use, and a file to upload. Set the `PROJECT_NAME`
        and `FILE_PATH` variables to your project name and file path respectively.

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import File, Folder

        syn = Synapse()
        syn.login()

        # Define Project and JSON schema info
        PROJECT_NAME = "test_json_schema_project"  # replace with your project name
        FILE_PATH = "~/Sample.txt"  # replace with your test file path

        PROJECT_ID = syn.findEntityId(name=PROJECT_NAME)
        ORG_NAME = "UniqueOrg"  # replace with your organization name
        SCHEMA_NAME = "myTestSchema"  # replace with your schema name
        FOLDER_NAME = "test_script_folder"
        VERSION = "0.0.1"
        SCHEMA_URI = f"{ORG_NAME}-{SCHEMA_NAME}-{VERSION}"

        # Create organization (if not already created)
        js = syn.service("json_schema")
        all_orgs = js.list_organizations()
        for org in all_orgs:
            if org["name"] == ORG_NAME:
                print(f"Organization {ORG_NAME} already exists: {org}")
                break
        else:
            print(f"Creating organization {ORG_NAME}.")
            created_organization = js.create_organization(ORG_NAME)
            print(f"Created organization: {created_organization}")

        my_test_org = js.JsonSchemaOrganization(ORG_NAME)
        test_schema = my_test_org.get_json_schema(SCHEMA_NAME)

        if not test_schema:
            # Create the schema (if not already created)
            schema_definition = {
                "$id": "mySchema",
                "type": "object",
                "properties": {
                    "foo": {"type": "string"},
                    "bar": {"type": "integer"},
                },
                "required": ["foo"]
            }
            test_schema = my_test_org.create_json_schema(schema_definition, SCHEMA_NAME, VERSION)
            print(f"Created new schema: {SCHEMA_NAME}")

        async def main():
            # Create a test folder
            test_folder = Folder(name=FOLDER_NAME, parent_id=PROJECT_ID)
            await test_folder.store_async()
            print(f"Created test folder: {FOLDER_NAME}")

            # Bind JSON schema to the folder
            bound_schema = await test_folder.bind_schema_async(
                json_schema_uri=SCHEMA_URI,
                enable_derived_annotations=True
            )
            print(f"Result from binding schema to folder: {bound_schema}")

            # Create and bind schema to a file
            example_file = File(
                path=FILE_PATH,  # Replace with your test file path
                parent_id=test_folder.id,
            )
            await example_file.store_async()

            bound_schema_file = await example_file.bind_schema_async(
                json_schema_uri=SCHEMA_URI,
                enable_derived_annotations=True
            )
            print(f"Result from binding schema to file: {bound_schema_file}")

        asyncio.run(main())
        ```
    """
    response = await bind_json_schema_to_entity(
        synapse_id=self.id,
        json_schema_uri=json_schema_uri,
        enable_derived_annotations=enable_derived_annotations,
        synapse_client=synapse_client,
    )
    json_schema_version = response.get("jsonSchemaVersionInfo", {})
    return JSONSchemaBinding(
        json_schema_version_info=JSONSchemaVersionInfo(
            organization_id=json_schema_version.get("organizationId", None),
            organization_name=json_schema_version.get("organizationName", None),
            schema_id=json_schema_version.get("schemaId", None),
            id=json_schema_version.get("$id", None),
            schema_name=json_schema_version.get("schemaName", None),
            version_id=json_schema_version.get("versionId", None),
            semantic_version=json_schema_version.get("semanticVersion", None),
            json_sha256_hex=json_schema_version.get("jsonSHA256Hex", None),
            created_on=json_schema_version.get("createdOn", None),
            created_by=json_schema_version.get("createdBy", None),
        ),
        object_id=response.get("objectId", None),
        object_type=response.get("objectType", None),
        created_on=response.get("createdOn", None),
        created_by=response.get("createdBy", None),
        enable_derived_annotations=response.get("enableDerivedAnnotations", None),
    )

get_schema_async async

get_schema_async(*, synapse_client: Optional[Synapse] = None) -> JSONSchemaBinding

Get the JSON schema bound to the entity.

PARAMETER DESCRIPTION
synapse_client

The Synapse client instance. If not provided, the last created instance from the Synapse class constructor will be used.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
JSONSchemaBinding

An object containing details about the bound JSON schema.

Using this function

Retrieving the bound JSON schema from a folder or file. This example demonstrates how to get existing schema bindings from entities that already have schemas bound. Set the PROJECT_NAME and FILE_PATH variables to your project name and file path respectively.

import asyncio
from synapseclient import Synapse
from synapseclient.models import File, Folder

syn = Synapse()
syn.login()

# Define Project and JSON schema info
PROJECT_NAME = "test_json_schema_project"  # replace with your project name
FILE_PATH = "~/Sample.txt"  # replace with your test file path

PROJECT_ID = syn.findEntityId(name=PROJECT_NAME)
ORG_NAME = "UniqueOrg"  # replace with your organization name
SCHEMA_NAME = "myTestSchema"  # replace with your schema name
FOLDER_NAME = "test_script_folder"
VERSION = "0.0.1"
SCHEMA_URI = f"{ORG_NAME}-{SCHEMA_NAME}-{VERSION}"

# Create organization (if not already created)
js = syn.service("json_schema")
all_orgs = js.list_organizations()
for org in all_orgs:
    if org["name"] == ORG_NAME:
        print(f"Organization {ORG_NAME} already exists: {org}")
        break
else:
    print(f"Creating organization {ORG_NAME}.")
    created_organization = js.create_organization(ORG_NAME)
    print(f"Created organization: {created_organization}")

my_test_org = js.JsonSchemaOrganization(ORG_NAME)
test_schema = my_test_org.get_json_schema(SCHEMA_NAME)

if not test_schema:
    # Create the schema (if not already created)
    schema_definition = {
        "$id": "mySchema",
        "type": "object",
        "properties": {
            "foo": {"type": "string"},
            "bar": {"type": "integer"},
        },
        "required": ["foo"]
    }
    test_schema = my_test_org.create_json_schema(schema_definition, SCHEMA_NAME, VERSION)
    print(f"Created new schema: {SCHEMA_NAME}")

async def main():
    # Create a test folder
    test_folder = Folder(name=FOLDER_NAME, parent_id=PROJECT_ID)
    await test_folder.store_async()
    print(f"Created test folder: {FOLDER_NAME}")

    # Bind JSON schema to the folder first
    bound_schema = await test_folder.bind_schema_async(
        json_schema_uri=SCHEMA_URI,
        enable_derived_annotations=True
    )
    print(f"Bound schema to folder: {bound_schema}")

    # Create and bind schema to a file
    example_file = File(
        path=FILE_PATH,  # Replace with your test file path
        parent_id=test_folder.id,
    )
    await example_file.store_async()

    bound_schema_file = await example_file.bind_schema_async(
        json_schema_uri=SCHEMA_URI,
        enable_derived_annotations=True
    )
    print(f"Bound schema to file: {bound_schema_file}")

    # Retrieve the bound schema from the folder
    bound_schema = await test_folder.get_schema_async()
    print(f"Retrieved schema from folder: {bound_schema}")

    # Retrieve the bound schema from the file
    bound_schema_file = await example_file.get_schema_async()
    print(f"Retrieved schema from file: {bound_schema_file}")

asyncio.run(main())
Source code in synapseclient/models/mixins/json_schema.py
async def get_schema_async(
    self, *, synapse_client: Optional["Synapse"] = None
) -> JSONSchemaBinding:
    """
    Get the JSON schema bound to the entity.

    Arguments:
        synapse_client: The Synapse client instance. If not provided,
            the last created instance from the Synapse class constructor will be used.

    Returns:
        An object containing details about the bound JSON schema.

    Example: Using this function
        Retrieving the bound JSON schema from a folder or file. This example demonstrates
        how to get existing schema bindings from entities that already have schemas bound.
        Set the `PROJECT_NAME` and `FILE_PATH` variables to your project name
        and file path respectively.

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import File, Folder

        syn = Synapse()
        syn.login()

        # Define Project and JSON schema info
        PROJECT_NAME = "test_json_schema_project"  # replace with your project name
        FILE_PATH = "~/Sample.txt"  # replace with your test file path

        PROJECT_ID = syn.findEntityId(name=PROJECT_NAME)
        ORG_NAME = "UniqueOrg"  # replace with your organization name
        SCHEMA_NAME = "myTestSchema"  # replace with your schema name
        FOLDER_NAME = "test_script_folder"
        VERSION = "0.0.1"
        SCHEMA_URI = f"{ORG_NAME}-{SCHEMA_NAME}-{VERSION}"

        # Create organization (if not already created)
        js = syn.service("json_schema")
        all_orgs = js.list_organizations()
        for org in all_orgs:
            if org["name"] == ORG_NAME:
                print(f"Organization {ORG_NAME} already exists: {org}")
                break
        else:
            print(f"Creating organization {ORG_NAME}.")
            created_organization = js.create_organization(ORG_NAME)
            print(f"Created organization: {created_organization}")

        my_test_org = js.JsonSchemaOrganization(ORG_NAME)
        test_schema = my_test_org.get_json_schema(SCHEMA_NAME)

        if not test_schema:
            # Create the schema (if not already created)
            schema_definition = {
                "$id": "mySchema",
                "type": "object",
                "properties": {
                    "foo": {"type": "string"},
                    "bar": {"type": "integer"},
                },
                "required": ["foo"]
            }
            test_schema = my_test_org.create_json_schema(schema_definition, SCHEMA_NAME, VERSION)
            print(f"Created new schema: {SCHEMA_NAME}")

        async def main():
            # Create a test folder
            test_folder = Folder(name=FOLDER_NAME, parent_id=PROJECT_ID)
            await test_folder.store_async()
            print(f"Created test folder: {FOLDER_NAME}")

            # Bind JSON schema to the folder first
            bound_schema = await test_folder.bind_schema_async(
                json_schema_uri=SCHEMA_URI,
                enable_derived_annotations=True
            )
            print(f"Bound schema to folder: {bound_schema}")

            # Create and bind schema to a file
            example_file = File(
                path=FILE_PATH,  # Replace with your test file path
                parent_id=test_folder.id,
            )
            await example_file.store_async()

            bound_schema_file = await example_file.bind_schema_async(
                json_schema_uri=SCHEMA_URI,
                enable_derived_annotations=True
            )
            print(f"Bound schema to file: {bound_schema_file}")

            # Retrieve the bound schema from the folder
            bound_schema = await test_folder.get_schema_async()
            print(f"Retrieved schema from folder: {bound_schema}")

            # Retrieve the bound schema from the file
            bound_schema_file = await example_file.get_schema_async()
            print(f"Retrieved schema from file: {bound_schema_file}")

        asyncio.run(main())
        ```
    """
    response = await get_json_schema_from_entity(
        synapse_id=self.id, synapse_client=synapse_client
    )
    json_schema_version_info = response.get("jsonSchemaVersionInfo", {})
    return JSONSchemaBinding(
        json_schema_version_info=JSONSchemaVersionInfo(
            organization_id=json_schema_version_info.get("organizationId", None),
            organization_name=json_schema_version_info.get(
                "organizationName", None
            ),
            schema_id=json_schema_version_info.get("schemaId", None),
            id=json_schema_version_info.get("$id", None),
            schema_name=json_schema_version_info.get("schemaName", None),
            version_id=json_schema_version_info.get("versionId", None),
            semantic_version=json_schema_version_info.get("semanticVersion", None),
            json_sha256_hex=json_schema_version_info.get("jsonSHA256Hex", None),
            created_on=json_schema_version_info.get("createdOn", None),
            created_by=json_schema_version_info.get("createdBy", None),
        ),
        object_id=response.get("objectId", None),
        object_type=response.get("objectType", None),
        created_on=response.get("createdOn", None),
        created_by=response.get("createdBy", None),
        enable_derived_annotations=response.get("enableDerivedAnnotations", None),
    )

unbind_schema_async async

unbind_schema_async(*, synapse_client: Optional[Synapse] = None) -> None

Unbind the JSON schema bound to the entity.

PARAMETER DESCRIPTION
synapse_client

The Synapse client instance. If not provided, the last created instance from the Synapse class constructor will be used.

TYPE: Optional[Synapse] DEFAULT: None

Using this function

Unbinding a JSON schema from a folder or file. This example demonstrates how to remove schema bindings from entities. Assumes entities already have schemas bound. Set the PROJECT_NAME and FILE_PATH variables to your project name and file path respectively.

import asyncio
from synapseclient import Synapse
from synapseclient.models import File, Folder

syn = Synapse()
syn.login()

# Define Project and JSON schema info
PROJECT_NAME = "test_json_schema_project"  # replace with your project name
FILE_PATH = "~/Sample.txt"  # replace with your test file path

PROJECT_ID = syn.findEntityId(name=PROJECT_NAME)
ORG_NAME = "UniqueOrg"  # replace with your organization name
SCHEMA_NAME = "myTestSchema"  # replace with your schema name
FOLDER_NAME = "test_script_folder"
VERSION = "0.0.1"
SCHEMA_URI = f"{ORG_NAME}-{SCHEMA_NAME}-{VERSION}"

# Create organization (if not already created)
js = syn.service("json_schema")
all_orgs = js.list_organizations()
for org in all_orgs:
    if org["name"] == ORG_NAME:
        print(f"Organization {ORG_NAME} already exists: {org}")
        break
else:
    print(f"Creating organization {ORG_NAME}.")
    created_organization = js.create_organization(ORG_NAME)
    print(f"Created organization: {created_organization}")

my_test_org = js.JsonSchemaOrganization(ORG_NAME)
test_schema = my_test_org.get_json_schema(SCHEMA_NAME)

if not test_schema:
    # Create the schema (if not already created)
    schema_definition = {
        "$id": "mySchema",
        "type": "object",
        "properties": {
            "foo": {"type": "string"},
            "bar": {"type": "integer"},
        },
        "required": ["foo"]
    }
    test_schema = my_test_org.create_json_schema(schema_definition, SCHEMA_NAME, VERSION)
    print(f"Created new schema: {SCHEMA_NAME}")

async def main():
    # Create a test folder
    test_folder = Folder(name=FOLDER_NAME, parent_id=PROJECT_ID)
    await test_folder.store_async()
    print(f"Created test folder: {FOLDER_NAME}")

    # Bind JSON schema to the folder first
    bound_schema = await test_folder.bind_schema_async(
        json_schema_uri=SCHEMA_URI,
        enable_derived_annotations=True
    )
    print(f"Bound schema to folder: {bound_schema}")

    # Create and bind schema to a file
    example_file = File(
        path=FILE_PATH,  # Replace with your test file path
        parent_id=test_folder.id,
    )
    await example_file.store_async()
    print(f"Created test file: {FILE_PATH}")

    bound_schema_file = await example_file.bind_schema_async(
        json_schema_uri=SCHEMA_URI,
        enable_derived_annotations=True
    )
    print(f"Bound schema to file: {bound_schema_file}")

    # Unbind the schema from the folder
    await test_folder.unbind_schema_async()
    print("Successfully unbound schema from folder")

    # Unbind the schema from the file
    await example_file.unbind_schema_async()
    print("Successfully unbound schema from file")

asyncio.run(main())
Source code in synapseclient/models/mixins/json_schema.py
async def unbind_schema_async(
    self, *, synapse_client: Optional["Synapse"] = None
) -> None:
    """
    Unbind the JSON schema bound to the entity.

    Arguments:
        synapse_client: The Synapse client instance. If not provided,
            the last created instance from the Synapse class constructor will be used.

    Example: Using this function
        Unbinding a JSON schema from a folder or file. This example demonstrates
        how to remove schema bindings from entities. Assumes entities already have
        schemas bound. Set the `PROJECT_NAME` and `FILE_PATH` variables to your
        project name and file path respectively.


        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import File, Folder

        syn = Synapse()
        syn.login()

        # Define Project and JSON schema info
        PROJECT_NAME = "test_json_schema_project"  # replace with your project name
        FILE_PATH = "~/Sample.txt"  # replace with your test file path

        PROJECT_ID = syn.findEntityId(name=PROJECT_NAME)
        ORG_NAME = "UniqueOrg"  # replace with your organization name
        SCHEMA_NAME = "myTestSchema"  # replace with your schema name
        FOLDER_NAME = "test_script_folder"
        VERSION = "0.0.1"
        SCHEMA_URI = f"{ORG_NAME}-{SCHEMA_NAME}-{VERSION}"

        # Create organization (if not already created)
        js = syn.service("json_schema")
        all_orgs = js.list_organizations()
        for org in all_orgs:
            if org["name"] == ORG_NAME:
                print(f"Organization {ORG_NAME} already exists: {org}")
                break
        else:
            print(f"Creating organization {ORG_NAME}.")
            created_organization = js.create_organization(ORG_NAME)
            print(f"Created organization: {created_organization}")

        my_test_org = js.JsonSchemaOrganization(ORG_NAME)
        test_schema = my_test_org.get_json_schema(SCHEMA_NAME)

        if not test_schema:
            # Create the schema (if not already created)
            schema_definition = {
                "$id": "mySchema",
                "type": "object",
                "properties": {
                    "foo": {"type": "string"},
                    "bar": {"type": "integer"},
                },
                "required": ["foo"]
            }
            test_schema = my_test_org.create_json_schema(schema_definition, SCHEMA_NAME, VERSION)
            print(f"Created new schema: {SCHEMA_NAME}")

        async def main():
            # Create a test folder
            test_folder = Folder(name=FOLDER_NAME, parent_id=PROJECT_ID)
            await test_folder.store_async()
            print(f"Created test folder: {FOLDER_NAME}")

            # Bind JSON schema to the folder first
            bound_schema = await test_folder.bind_schema_async(
                json_schema_uri=SCHEMA_URI,
                enable_derived_annotations=True
            )
            print(f"Bound schema to folder: {bound_schema}")

            # Create and bind schema to a file
            example_file = File(
                path=FILE_PATH,  # Replace with your test file path
                parent_id=test_folder.id,
            )
            await example_file.store_async()
            print(f"Created test file: {FILE_PATH}")

            bound_schema_file = await example_file.bind_schema_async(
                json_schema_uri=SCHEMA_URI,
                enable_derived_annotations=True
            )
            print(f"Bound schema to file: {bound_schema_file}")

            # Unbind the schema from the folder
            await test_folder.unbind_schema_async()
            print("Successfully unbound schema from folder")

            # Unbind the schema from the file
            await example_file.unbind_schema_async()
            print("Successfully unbound schema from file")

        asyncio.run(main())
        ```
    """
    return await delete_json_schema_from_entity(
        synapse_id=self.id, synapse_client=synapse_client
    )

validate_schema_async async

validate_schema_async(*, synapse_client: Optional[Synapse] = None) -> Union[JSONSchemaValidation, InvalidJSONSchemaValidation]

Validate the entity against the bound JSON schema.

PARAMETER DESCRIPTION
synapse_client

The Synapse client instance. If not provided, the last created instance from the Synapse class constructor will be used.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
Union[JSONSchemaValidation, InvalidJSONSchemaValidation]

The validation results.

Using this function

Validating a folder or file against the bound JSON schema. This example demonstrates how to validate entities with annotations against their bound schemas. Requires entities to have schemas already bound. Set the PROJECT_NAME and FILE_PATH variables to your project name and file path respectively.

import asyncio
import time
from synapseclient import Synapse
from synapseclient.models import File, Folder

syn = Synapse()
syn.login()

# Define Project and JSON schema info
PROJECT_NAME = "test_json_schema_project"  # replace with your project name
FILE_PATH = "~/Sample.txt"  # replace with your test file path

PROJECT_ID = syn.findEntityId(name=PROJECT_NAME)
ORG_NAME = "UniqueOrg"  # replace with your organization name
SCHEMA_NAME = "myTestSchema"  # replace with your schema name
FOLDER_NAME = "test_script_folder"
VERSION = "0.0.1"
SCHEMA_URI = f"{ORG_NAME}-{SCHEMA_NAME}-{VERSION}"

# Create organization (if not already created)
js = syn.service("json_schema")
all_orgs = js.list_organizations()
for org in all_orgs:
    if org["name"] == ORG_NAME:
        print(f"Organization {ORG_NAME} already exists: {org}")
        break
else:
    print(f"Creating organization {ORG_NAME}.")
    created_organization = js.create_organization(ORG_NAME)
    print(f"Created organization: {created_organization}")

my_test_org = js.JsonSchemaOrganization(ORG_NAME)
test_schema = my_test_org.get_json_schema(SCHEMA_NAME)

if not test_schema:
    # Create the schema (if not already created)
    schema_definition = {
        "$id": "mySchema",
        "type": "object",
        "properties": {
            "foo": {"type": "string"},
            "bar": {"type": "integer"},
        },
        "required": ["foo"]
    }
    test_schema = my_test_org.create_json_schema(schema_definition, SCHEMA_NAME, VERSION)
    print(f"Created new schema: {SCHEMA_NAME}")

async def main():
    # Create a test folder
    test_folder = Folder(name=FOLDER_NAME, parent_id=PROJECT_ID)
    await test_folder.store_async()
    print(f"Created test folder: {FOLDER_NAME}")

    # Bind JSON schema to the folder
    bound_schema = await test_folder.bind_schema_async(
        json_schema_uri=SCHEMA_URI,
        enable_derived_annotations=True
    )
    print(f"Bound schema to folder: {bound_schema}")

    # Create and bind schema to a file
    example_file = File(
        path=FILE_PATH,  # Replace with your test file path
        parent_id=test_folder.id,
    )
    await example_file.store_async()

    bound_schema_file = await example_file.bind_schema_async(
        json_schema_uri=SCHEMA_URI,
        enable_derived_annotations=True
    )
    print(f"Bound schema to file: {bound_schema_file}")

    # Validate the folder entity against the bound schema
    test_folder.annotations = {"foo": "test_value", "bar": 42}  # Example annotations
    await test_folder.store_async()
    print("Added annotations to folder and stored")
    time.sleep(2)  # Allow time for processing

    validation_response = await test_folder.validate_schema_async()
    print(f"Folder validation response: {validation_response}")

    # Validate the file entity against the bound schema
    example_file.annotations = {"foo": "test_value", "bar": 43}  # Example annotations
    await example_file.store_async()
    print("Added annotations to file and stored")
    time.sleep(2)  # Allow time for processing

    validation_response_file = await example_file.validate_schema_async()
    print(f"File validation response: {validation_response_file}")

asyncio.run(main())
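Because `validate_schema_async` returns a `Union[JSONSchemaValidation, InvalidJSONSchemaValidation]`, callers typically branch on the concrete type before reading error details. The sketch below illustrates that branching with stand-in dataclasses that only mirror the fields shown in this document; they are not the real client classes:

```python
from dataclasses import dataclass, field
from typing import List, Optional, Union

@dataclass
class JSONSchemaValidation:
    # Stand-in for the client's validation result (subset of fields)
    object_id: Optional[str] = None
    is_valid: Optional[bool] = None
    validated_on: Optional[str] = None

@dataclass
class InvalidJSONSchemaValidation:
    # Stand-in for the failure result: wraps the validation response
    # plus the error message(s) reported by the schema validator
    validation_response: Optional[JSONSchemaValidation] = None
    validation_error_message: Optional[str] = None
    all_validation_messages: List[str] = field(default_factory=list)

def summarize(result: Union[JSONSchemaValidation, InvalidJSONSchemaValidation]) -> str:
    # Branch on the concrete type to report validation status
    if isinstance(result, InvalidJSONSchemaValidation):
        messages = "; ".join(result.all_validation_messages)
        return f"INVALID: {result.validation_error_message} ({messages})"
    return f"VALID: {result.object_id} validated on {result.validated_on}"

print(summarize(JSONSchemaValidation(object_id="syn123", is_valid=True, validated_on="2024-01-01")))
print(summarize(InvalidJSONSchemaValidation(
    validation_error_message="#: required key [foo] not found",
    all_validation_messages=["#: required key [foo] not found"],
)))
```

The same `isinstance` check applies directly to the real objects returned by `validate_schema_async`.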
Source code in synapseclient/models/mixins/json_schema.py
async def validate_schema_async(
    self, *, synapse_client: Optional["Synapse"] = None
) -> Union[JSONSchemaValidation, InvalidJSONSchemaValidation]:
    """
    Validate the entity against the bound JSON schema.

    Arguments:
        synapse_client: The Synapse client instance. If not provided,
            the last created instance from the Synapse class constructor will be used.

    Returns:
        The validation results.

    Example: Using this function
        Validating a folder or file against the bound JSON schema. This example demonstrates
        how to validate entities with annotations against their bound schemas. Requires entities
        to have schemas already bound. Set the `PROJECT_NAME` and `FILE_PATH` variables to your project name
        and file path respectively.

        ```python
        import asyncio
        import time
        from synapseclient import Synapse
        from synapseclient.models import File, Folder

        syn = Synapse()
        syn.login()

        # Define Project and JSON schema info
        PROJECT_NAME = "test_json_schema_project"  # replace with your project name
        FILE_PATH = "~/Sample.txt"  # replace with your test file path

        PROJECT_ID = syn.findEntityId(name=PROJECT_NAME)
        ORG_NAME = "UniqueOrg"  # replace with your organization name
        SCHEMA_NAME = "myTestSchema"  # replace with your schema name
        FOLDER_NAME = "test_script_folder"
        VERSION = "0.0.1"
        SCHEMA_URI = f"{ORG_NAME}-{SCHEMA_NAME}-{VERSION}"

        # Create organization (if not already created)
        js = syn.service("json_schema")
        all_orgs = js.list_organizations()
        for org in all_orgs:
            if org["name"] == ORG_NAME:
                print(f"Organization {ORG_NAME} already exists: {org}")
                break
        else:
            print(f"Creating organization {ORG_NAME}.")
            created_organization = js.create_organization(ORG_NAME)
            print(f"Created organization: {created_organization}")

        my_test_org = js.JsonSchemaOrganization(ORG_NAME)
        test_schema = my_test_org.get_json_schema(SCHEMA_NAME)

        if not test_schema:
            # Create the schema (if not already created)
            schema_definition = {
                "$id": "mySchema",
                "type": "object",
                "properties": {
                    "foo": {"type": "string"},
                    "bar": {"type": "integer"},
                },
                "required": ["foo"]
            }
            test_schema = my_test_org.create_json_schema(schema_definition, SCHEMA_NAME, VERSION)
            print(f"Created new schema: {SCHEMA_NAME}")

        async def main():
            # Create a test folder
            test_folder = Folder(name=FOLDER_NAME, parent_id=PROJECT_ID)
            await test_folder.store_async()
            print(f"Created test folder: {FOLDER_NAME}")

            # Bind JSON schema to the folder
            bound_schema = await test_folder.bind_schema_async(
                json_schema_uri=SCHEMA_URI,
                enable_derived_annotations=True
            )
            print(f"Bound schema to folder: {bound_schema}")

            # Create and bind schema to a file
            example_file = File(
                path=FILE_PATH,  # Replace with your test file path
                parent_id=test_folder.id,
            )
            await example_file.store_async()

            bound_schema_file = await example_file.bind_schema_async(
                json_schema_uri=SCHEMA_URI,
                enable_derived_annotations=True
            )
            print(f"Bound schema to file: {bound_schema_file}")

            # Validate the folder entity against the bound schema
            test_folder.annotations = {"foo": "test_value", "bar": 42}  # Example annotations
            await test_folder.store_async()
            print("Added annotations to folder and stored")
            await asyncio.sleep(2)  # Allow time for processing without blocking the event loop

            validation_response = await test_folder.validate_schema_async()
            print(f"Folder validation response: {validation_response}")

            # Validate the file entity against the bound schema
            example_file.annotations = {"foo": "test_value", "bar": 43}  # Example annotations
            await example_file.store_async()
            print("Added annotations to file and stored")
            await asyncio.sleep(2)  # Allow time for processing without blocking the event loop

            validation_response_file = await example_file.validate_schema_async()
            print(f"File validation response: {validation_response_file}")

        asyncio.run(main())
        ```
    """
    response = await validate_entity_with_json_schema(
        synapse_id=self.id, synapse_client=synapse_client
    )
    if "validationException" in response:
        return InvalidJSONSchemaValidation(
            validation_response=JSONSchemaValidation(
                object_id=response.get("objectId", None),
                object_type=response.get("objectType", None),
                object_etag=response.get("objectEtag", None),
                id=response.get("schema$id", None),
                is_valid=response.get("isValid", None),
                validated_on=response.get("validatedOn", None),
            ),
            validation_error_message=response.get("validationErrorMessage", None),
            all_validation_messages=response.get("allValidationMessages", []),
            validation_exception=ValidationException(
                pointer_to_violation=response.get("validationException", {}).get(
                    "pointerToViolation", None
                ),
                message=response.get("validationException", {}).get(
                    "message", None
                ),
                schema_location=response.get("validationException", {}).get(
                    "schemaLocation", None
                ),
                causing_exceptions=[
                    CausingException(
                        keyword=ce.get("keyword", None),
                        pointer_to_violation=ce.get("pointerToViolation", None),
                        message=ce.get("message", None),
                        schema_location=ce.get("schemaLocation", None),
                        causing_exceptions=[
                            CausingException(
                                keyword=nce.get("keyword", None),
                                pointer_to_violation=nce.get(
                                    "pointerToViolation", None
                                ),
                                message=nce.get("message", None),
                                schema_location=nce.get("schemaLocation", None),
                            )
                            for nce in ce.get("causingExceptions", [])
                        ],
                    )
                    for ce in response.get("validationException", {}).get(
                        "causingExceptions", []
                    )
                ],
            ),
        )
    return JSONSchemaValidation(
        object_id=response.get("objectId", None),
        object_type=response.get("objectType", None),
        object_etag=response.get("objectEtag", None),
        id=response.get("schema$id", None),
        is_valid=response.get("isValid", None),
        validated_on=response.get("validatedOn", None),
    )
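The parsing above means callers can branch on the concrete type of the result: a valid entity comes back as `JSONSchemaValidation`, while violations come back as `InvalidJSONSchemaValidation` with the messages attached. A minimal sketch of consuming the result, using stand-in dataclasses since the real call requires a logged-in Synapse client and a bound schema:

```python
# Hypothetical stand-ins for the objects returned by
# validate_schema_async(); only the fields used below are modeled.
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class JSONSchemaValidation:
    object_id: Optional[str] = None
    is_valid: Optional[bool] = None


@dataclass
class InvalidJSONSchemaValidation:
    validation_response: Optional[JSONSchemaValidation] = None
    all_validation_messages: List[str] = field(default_factory=list)


def summarize(result) -> str:
    # Branch on the concrete type: an invalid entity is returned as
    # InvalidJSONSchemaValidation carrying the violation messages.
    if isinstance(result, InvalidJSONSchemaValidation):
        return "invalid: " + "; ".join(result.all_validation_messages)
    return "valid" if result.is_valid else "not yet validated"


print(summarize(JSONSchemaValidation(object_id="syn123", is_valid=True)))
print(summarize(InvalidJSONSchemaValidation(
    all_validation_messages=["#/bar: expected type: Integer"]
)))
```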

get_schema_derived_keys_async async

get_schema_derived_keys_async(*, synapse_client: Optional[Synapse] = None) -> JSONSchemaDerivedKeys

Retrieve derived JSON schema keys for the entity.

PARAMETER DESCRIPTION
synapse_client

The Synapse client instance. If not provided, the last created instance from the Synapse class constructor will be used.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
JSONSchemaDerivedKeys

An object containing the derived keys for the entity.

Using this function

Retrieving derived keys from a folder or file. This example demonstrates how to get derived annotation keys from schemas with constant values. Set the PROJECT_NAME variable to your project name.

import asyncio
from synapseclient import Synapse
from synapseclient.models import File, Folder

syn = Synapse()
syn.login()

# Define Project and JSON schema info
PROJECT_NAME = "test_json_schema_project"  # replace with your project name
FILE_PATH = "~/Sample.txt"  # replace with your test file path

PROJECT_ID = syn.findEntityId(name=PROJECT_NAME)
ORG_NAME = "UniqueOrg"  # replace with your organization name
DERIVED_TEST_SCHEMA_NAME = "myTestDerivedSchema"  # replace with your derived schema name
FOLDER_NAME = "test_script_folder"
VERSION = "0.0.1"
SCHEMA_URI = f"{ORG_NAME}-{DERIVED_TEST_SCHEMA_NAME}-{VERSION}"

# Create organization (if not already created)
js = syn.service("json_schema")
all_orgs = js.list_organizations()
for org in all_orgs:
    if org["name"] == ORG_NAME:
        print(f"Organization {ORG_NAME} already exists: {org}")
        break
else:
    print(f"Creating organization {ORG_NAME}.")
    created_organization = js.create_organization(ORG_NAME)
    print(f"Created organization: {created_organization}")

my_test_org = js.JsonSchemaOrganization(ORG_NAME)
test_schema = my_test_org.get_json_schema(DERIVED_TEST_SCHEMA_NAME)

if not test_schema:
    # Create the schema (if not already created)
    schema_definition = {
        "$id": "mySchema",
        "type": "object",
        "properties": {
            "foo": {"type": "string"},
            "baz": {"type": "string", "const": "example_value"},  # Example constant for derived annotation
            "bar": {"type": "integer"},
        },
        "required": ["foo"]
    }
    test_schema = my_test_org.create_json_schema(schema_definition, DERIVED_TEST_SCHEMA_NAME, VERSION)
    print(f"Created new derived schema: {DERIVED_TEST_SCHEMA_NAME}")

async def main():
    # Create a test folder
    test_folder = Folder(name=FOLDER_NAME, parent_id=PROJECT_ID)
    await test_folder.store_async()
    print(f"Created test folder: {FOLDER_NAME}")

    # Bind JSON schema to the folder
    bound_schema = await test_folder.bind_schema_async(
        json_schema_uri=SCHEMA_URI,
        enable_derived_annotations=True
    )
    print(f"Bound schema to folder with derived annotations: {bound_schema}")

    # Create and bind schema to a file
    example_file = File(
        path=FILE_PATH,  # Replace with your test file path
        parent_id=test_folder.id,
    )
    await example_file.store_async()

    bound_schema_file = await example_file.bind_schema_async(
        json_schema_uri=SCHEMA_URI,
        enable_derived_annotations=True
    )
    print(f"Bound schema to file with derived annotations: {bound_schema_file}")

    # Get the derived keys from the bound schema of the folder
    test_folder.annotations = {"foo": "test_value_new", "bar": 42}  # Example annotations
    await test_folder.store_async()
    print("Added annotations to folder and stored")

    derived_keys = await test_folder.get_schema_derived_keys_async()
    print(f"Derived keys from folder: {derived_keys}")

    # Get the derived keys from the bound schema of the file
    example_file.annotations = {"foo": "test_value_new", "bar": 43}  # Example annotations
    await example_file.store_async()
    print("Added annotations to file and stored")

    derived_keys_file = await example_file.get_schema_derived_keys_async()
    print(f"Derived keys from file: {derived_keys_file}")

asyncio.run(main())
Source code in synapseclient/models/mixins/json_schema.py
async def get_schema_derived_keys_async(
    self, *, synapse_client: Optional["Synapse"] = None
) -> JSONSchemaDerivedKeys:
    """
    Retrieve derived JSON schema keys for the entity.

    Arguments:
        synapse_client (Optional[Synapse], optional): The Synapse client instance. If not provided,
            the last created instance from the Synapse class constructor will be used.

    Returns:
        An object containing the derived keys for the entity.

    Example: Using this function
        Retrieving derived keys from a folder or file. This example demonstrates
        how to get derived annotation keys from schemas with constant values.
        Set the `PROJECT_NAME` variable to your project name.

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import File, Folder

        syn = Synapse()
        syn.login()

        # Define Project and JSON schema info
        PROJECT_NAME = "test_json_schema_project"  # replace with your project name
        FILE_PATH = "~/Sample.txt"  # replace with your test file path

        PROJECT_ID = syn.findEntityId(name=PROJECT_NAME)
        ORG_NAME = "UniqueOrg"  # replace with your organization name
        DERIVED_TEST_SCHEMA_NAME = "myTestDerivedSchema"  # replace with your derived schema name
        FOLDER_NAME = "test_script_folder"
        VERSION = "0.0.1"
        SCHEMA_URI = f"{ORG_NAME}-{DERIVED_TEST_SCHEMA_NAME}-{VERSION}"

        # Create organization (if not already created)
        js = syn.service("json_schema")
        all_orgs = js.list_organizations()
        for org in all_orgs:
            if org["name"] == ORG_NAME:
                print(f"Organization {ORG_NAME} already exists: {org}")
                break
        else:
            print(f"Creating organization {ORG_NAME}.")
            created_organization = js.create_organization(ORG_NAME)
            print(f"Created organization: {created_organization}")

        my_test_org = js.JsonSchemaOrganization(ORG_NAME)
        test_schema = my_test_org.get_json_schema(DERIVED_TEST_SCHEMA_NAME)

        if not test_schema:
            # Create the schema (if not already created)
            schema_definition = {
                "$id": "mySchema",
                "type": "object",
                "properties": {
                    "foo": {"type": "string"},
                    "baz": {"type": "string", "const": "example_value"},  # Example constant for derived annotation
                    "bar": {"type": "integer"},
                },
                "required": ["foo"]
            }
            test_schema = my_test_org.create_json_schema(schema_definition, DERIVED_TEST_SCHEMA_NAME, VERSION)
            print(f"Created new derived schema: {DERIVED_TEST_SCHEMA_NAME}")

        async def main():
            # Create a test folder
            test_folder = Folder(name=FOLDER_NAME, parent_id=PROJECT_ID)
            await test_folder.store_async()
            print(f"Created test folder: {FOLDER_NAME}")

            # Bind JSON schema to the folder
            bound_schema = await test_folder.bind_schema_async(
                json_schema_uri=SCHEMA_URI,
                enable_derived_annotations=True
            )
            print(f"Bound schema to folder with derived annotations: {bound_schema}")

            # Create and bind schema to a file
            example_file = File(
                path=FILE_PATH,  # Replace with your test file path
                parent_id=test_folder.id,
            )
            await example_file.store_async()

            bound_schema_file = await example_file.bind_schema_async(
                json_schema_uri=SCHEMA_URI,
                enable_derived_annotations=True
            )
            print(f"Bound schema to file with derived annotations: {bound_schema_file}")

            # Get the derived keys from the bound schema of the folder
            test_folder.annotations = {"foo": "test_value_new", "bar": 42}  # Example annotations
            await test_folder.store_async()
            print("Added annotations to folder and stored")

            derived_keys = await test_folder.get_schema_derived_keys_async()
            print(f"Derived keys from folder: {derived_keys}")

            # Get the derived keys from the bound schema of the file
            example_file.annotations = {"foo": "test_value_new", "bar": 43}  # Example annotations
            await example_file.store_async()
            print("Added annotations to file and stored")

            derived_keys_file = await example_file.get_schema_derived_keys_async()
            print(f"Derived keys from file: {derived_keys_file}")

        asyncio.run(main())
        ```
    """
    response = await get_json_schema_derived_keys(
        synapse_id=self.id, synapse_client=synapse_client
    )
    return JSONSchemaDerivedKeys(keys=response["keys"])
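The returned object simply wraps the list of derived annotation keys. A minimal stand-in sketch of filtering them (the real object comes from `get_schema_derived_keys_async()` on an entity bound to a schema with `const` properties, such as the `"baz"` constant in the example above):

```python
# Stand-in for JSONSchemaDerivedKeys; the real object wraps the same
# `keys` list returned by the REST API.
from dataclasses import dataclass
from typing import List


@dataclass
class JSONSchemaDerivedKeys:
    keys: List[str]


derived = JSONSchemaDerivedKeys(keys=["baz"])
# Derived keys are annotation keys whose values come from schema
# constants rather than being set directly by the user.
user_set_keys = {"foo", "bar"}
derived_only = [k for k in derived.keys if k not in user_set_keys]
print(derived_only)
```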

synapseclient.models.RecordBasedMetadataTaskProperties dataclass

A CurationTaskProperties for record-based metadata.

Represents a Synapse RecordBasedMetadataTaskProperties.

ATTRIBUTE DESCRIPTION
record_set_id

The synId of the RecordSet that will contain all record-based metadata

TYPE: Optional[str]
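For reference, `to_synapse_request` builds a payload in which `recordSetId` is only included when set. This standalone sketch mirrors that behavior so it runs without a Synapse connection; the `RECORD_BASED_METADATA_TASK_PROPERTIES` constant is internal to synapseclient, so a placeholder string is used here:

```python
# Standalone sketch of the request payload built by
# RecordBasedMetadataTaskProperties.to_synapse_request().
from typing import Any, Dict, Optional

# Placeholder; the real concreteType constant lives inside synapseclient.
RECORD_BASED_METADATA_TASK_PROPERTIES = "<concreteType placeholder>"


def to_synapse_request(record_set_id: Optional[str]) -> Dict[str, Any]:
    request: Dict[str, Any] = {
        "concreteType": RECORD_BASED_METADATA_TASK_PROPERTIES
    }
    # recordSetId is only included when set, matching the dataclass.
    if record_set_id is not None:
        request["recordSetId"] = record_set_id
    return request


print(to_synapse_request("syn1234567"))
```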

Source code in synapseclient/models/curation.py
@dataclass
class RecordBasedMetadataTaskProperties:
    """
    A CurationTaskProperties for record-based metadata.

    Represents a [Synapse RecordBasedMetadataTaskProperties](https://rest-docs.synapse.org/org/sagebionetworks/repo/model/curation/metadata/RecordBasedMetadataTaskProperties.html).

    Attributes:
        record_set_id: The synId of the RecordSet that will contain all record-based metadata
    """

    record_set_id: Optional[str] = None
    """The synId of the RecordSet that will contain all record-based metadata"""

    def fill_from_dict(
        self, synapse_response: Union[Dict[str, Any], Any]
    ) -> "RecordBasedMetadataTaskProperties":
        """
        Converts a response from the REST API into this dataclass.

        Arguments:
            synapse_response: The response from the REST API.

        Returns:
            The RecordBasedMetadataTaskProperties object.
        """
        self.record_set_id = synapse_response.get("recordSetId", None)
        return self

    def to_synapse_request(self) -> Dict[str, Any]:
        """
        Converts this dataclass to a dictionary suitable for a Synapse REST API request.

        Returns:
            A dictionary representation of this object for API requests.
        """
        request_dict = {"concreteType": RECORD_BASED_METADATA_TASK_PROPERTIES}
        if self.record_set_id is not None:
            request_dict["recordSetId"] = self.record_set_id
        return request_dict

Attributes

record_set_id class-attribute instance-attribute

record_set_id: Optional[str] = None

The synId of the RecordSet that will contain all record-based metadata


synapseclient.models.FileBasedMetadataTaskProperties dataclass

A CurationTaskProperties for file-based data, describing where data is uploaded and a view which contains the annotations.

Represents a Synapse FileBasedMetadataTaskProperties.

ATTRIBUTE DESCRIPTION
upload_folder_id

The synId of the folder where data files of this type are to be uploaded

TYPE: Optional[str]

file_view_id

The synId of the FileView that shows all data of this type

TYPE: Optional[str]
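`fill_from_dict` maps the camelCase REST keys onto the snake_case attributes. A standalone sketch of that mapping, using a stand-in class so it runs without synapseclient installed:

```python
# Stand-in mirroring FileBasedMetadataTaskProperties.fill_from_dict():
# camelCase REST keys map onto snake_case dataclass attributes.
from dataclasses import dataclass
from typing import Any, Dict, Optional


@dataclass
class FileBasedMetadataTaskProperties:
    upload_folder_id: Optional[str] = None
    file_view_id: Optional[str] = None

    def fill_from_dict(
        self, synapse_response: Dict[str, Any]
    ) -> "FileBasedMetadataTaskProperties":
        self.upload_folder_id = synapse_response.get("uploadFolderId")
        self.file_view_id = synapse_response.get("fileViewId")
        return self


props = FileBasedMetadataTaskProperties().fill_from_dict(
    {"uploadFolderId": "syn111", "fileViewId": "syn222"}
)
print(props.upload_folder_id, props.file_view_id)
```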

Source code in synapseclient/models/curation.py
@dataclass
class FileBasedMetadataTaskProperties:
    """
    A CurationTaskProperties for file-based data, describing where data is uploaded
    and a view which contains the annotations.

    Represents a [Synapse FileBasedMetadataTaskProperties](https://rest-docs.synapse.org/org/sagebionetworks/repo/model/curation/metadata/FileBasedMetadataTaskProperties.html).

    Attributes:
        upload_folder_id: The synId of the folder where data files of this type are to be uploaded
        file_view_id: The synId of the FileView that shows all data of this type
    """

    upload_folder_id: Optional[str] = None
    """The synId of the folder where data files of this type are to be uploaded"""

    file_view_id: Optional[str] = None
    """The synId of the FileView that shows all data of this type"""

    def fill_from_dict(
        self, synapse_response: Union[Dict[str, Any], Any]
    ) -> "FileBasedMetadataTaskProperties":
        """
        Converts a response from the REST API into this dataclass.

        Arguments:
            synapse_response: The response from the REST API.

        Returns:
            The FileBasedMetadataTaskProperties object.
        """
        self.upload_folder_id = synapse_response.get("uploadFolderId", None)
        self.file_view_id = synapse_response.get("fileViewId", None)
        return self

    def to_synapse_request(self) -> Dict[str, Any]:
        """
        Converts this dataclass to a dictionary suitable for a Synapse REST API request.

        Returns:
            A dictionary representation of this object for API requests.
        """
        request_dict = {"concreteType": FILE_BASED_METADATA_TASK_PROPERTIES}
        if self.upload_folder_id is not None:
            request_dict["uploadFolderId"] = self.upload_folder_id
        if self.file_view_id is not None:
            request_dict["fileViewId"] = self.file_view_id
        return request_dict

Attributes

upload_folder_id class-attribute instance-attribute

upload_folder_id: Optional[str] = None

The synId of the folder where data files of this type are to be uploaded

file_view_id class-attribute instance-attribute

file_view_id: Optional[str] = None

The synId of the FileView that shows all data of this type


synapseclient.models.Grid dataclass

Bases: GridSynchronousProtocol

A GridSession provides functionality to create and manage grid sessions in Synapse. Grid sessions are used for curation workflows where data can be edited in a grid format and then exported back to record sets.

ATTRIBUTE DESCRIPTION
record_set_id

The synId of the RecordSet to use for initializing the grid

TYPE: Optional[str]

initial_query

Initialize a grid session from an EntityView. Mutually exclusive with record_set_id.

TYPE: Optional[Query]

session_id

The unique sessionId that identifies the grid session

TYPE: Optional[str]

started_by

The user that started this session

TYPE: Optional[str]

started_on

The date-time when the session was started

TYPE: Optional[str]

etag

Changes when the session changes

TYPE: Optional[str]

modified_on

The date-time when the session was last changed

TYPE: Optional[str]

last_replica_id_client

The last replica ID issued to a client

TYPE: Optional[int]

last_replica_id_service

The last replica ID issued to a service

TYPE: Optional[int]

grid_json_schema_id

The $id of the JSON schema used for model validation

TYPE: Optional[str]

source_entity_id

The synId of the table/view/csv that this grid was cloned from

TYPE: Optional[str]

record_set_version_number

The version number of the exported record set

TYPE: Optional[int]

validation_summary_statistics

Summary statistics for validation results

TYPE: Optional[ValidationSummary]

Create and manage a grid session workflow

 

from synapseclient import Synapse
from synapseclient.models import Grid

syn = Synapse()
syn.login()

# Create a new grid session from a record set
grid = Grid(record_set_id="syn1234567")
grid = grid.create()
print(f"Created grid session: {grid.session_id}")

# Later, export the modified data back to the record set
grid = grid.export_to_record_set()
print(f"Exported to version: {grid.record_set_version_number}")

# Clean up by deleting the session when done
grid.delete()
Working with grid sessions using queries

 

from synapseclient import Synapse
from synapseclient.models import Grid
from synapseclient.models.table_components import Query

syn = Synapse()
syn.login()

# Create a grid from an entity view query
query = Query(sql="SELECT * FROM syn1234567")
grid = Grid(initial_query=query)
grid = grid.create()

# Work with the grid session...
# Export when ready
grid = grid.export_to_record_set()
Source code in synapseclient/models/curation.py
@dataclass
@async_to_sync
class Grid(GridSynchronousProtocol):
    """
    A GridSession provides functionality to create and manage grid sessions in Synapse.
    Grid sessions are used for curation workflows where data can be edited in a grid format
    and then exported back to record sets.

    Attributes:
        record_set_id: The synId of the RecordSet to use for initializing the grid
        initial_query: Initialize a grid session from an EntityView.
            Mutually exclusive with record_set_id.
        session_id: The unique sessionId that identifies the grid session
        started_by: The user that started this session
        started_on: The date-time when the session was started
        etag: Changes when the session changes
        modified_on: The date-time when the session was last changed
        last_replica_id_client: The last replica ID issued to a client
        last_replica_id_service: The last replica ID issued to a service
        grid_json_schema_id: The $id of the JSON schema used for model validation
        source_entity_id: The synId of the table/view/csv that this grid was cloned from
        record_set_version_number: The version number of the exported record set
        validation_summary_statistics: Summary statistics for validation results

    Example: Create and manage a grid session workflow
        &nbsp;

        ```python
        from synapseclient import Synapse
        from synapseclient.models import Grid

        syn = Synapse()
        syn.login()

        # Create a new grid session from a record set
        grid = Grid(record_set_id="syn1234567")
        grid = grid.create()
        print(f"Created grid session: {grid.session_id}")

        # Later, export the modified data back to the record set
        grid = grid.export_to_record_set()
        print(f"Exported to version: {grid.record_set_version_number}")

        # Clean up by deleting the session when done
        grid.delete()
        ```

    Example: Working with grid sessions using queries
        &nbsp;

        ```python
        from synapseclient import Synapse
        from synapseclient.models import Grid
        from synapseclient.models.table_components import Query

        syn = Synapse()
        syn.login()

        # Create a grid from an entity view query
        query = Query(sql="SELECT * FROM syn1234567")
        grid = Grid(initial_query=query)
        grid = grid.create()

        # Work with the grid session...
        # Export when ready
        grid = grid.export_to_record_set()
        ```
    """

    record_set_id: Optional[str] = None
    """The synId of the RecordSet to use for initializing the grid"""

    initial_query: Optional[Query] = None
    """Initialize a grid session from an EntityView.
    Mutually exclusive with record_set_id."""

    session_id: Optional[str] = None
    """The unique sessionId that identifies the grid session"""

    started_by: Optional[str] = None
    """The user that started this session"""

    started_on: Optional[str] = None
    """The date-time when the session was started"""

    etag: Optional[str] = None
    """Changes when the session changes"""

    modified_on: Optional[str] = None
    """The date-time when the session was last changed"""

    last_replica_id_client: Optional[int] = None
    """The last replica ID issued to a client. Client replica IDs are incremented."""

    last_replica_id_service: Optional[int] = None
    """The last replica ID issued to a service. Service replica IDs are decremented."""

    grid_json_schema_id: Optional[str] = None
    """The $id of the JSON schema that will be used for model validation in this grid session"""

    source_entity_id: Optional[str] = None
    """The synId of the table/view/csv that this grid was cloned from"""

    record_set_version_number: Optional[int] = None
    """The version number of the exported record set"""

    validation_summary_statistics: Optional[ValidationSummary] = None
    """Summary statistics for validation results"""

    async def create_async(
        self,
        attach_to_previous_session: bool = True,
        *,
        timeout: int = 120,
        synapse_client: Optional[Synapse] = None,
    ) -> "Grid":
        """
        Creates a new grid session from a `record_set_id` or `initial_query`.

        When using `record_set_id`, first checks for existing active sessions that match
        the record set before creating a new one. When using `initial_query`, always
        creates a new session due to the complexity of matching query parameters.

        Arguments:
            attach_to_previous_session: If True and using `record_set_id`, will attach
                to an existing active session if one exists. Defaults to True.
            timeout: The number of seconds to wait for the job to complete or progress
                before raising a SynapseTimeoutError. Defaults to 120.
            synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

        Returns:
            Grid: The Grid object with populated session_id.

        Raises:
            ValueError: If `record_set_id` or `initial_query` is not provided.

        Example: Create a grid session asynchronously
            &nbsp;

            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import Grid

            syn = Synapse()
            syn.login()

            async def main():
                # Create a grid session from a record set
                grid = Grid(record_set_id="syn1234567")
                grid = await grid.create_async()
                print(f"Created grid session: {grid.session_id}")

            asyncio.run(main())
            ```
        """
        if not self.record_set_id and not self.initial_query:
            raise ValueError(
                "record_set_id or initial_query is required to create a GridSession"
            )

        trace.get_current_span().set_attributes(
            {
                "synapse.record_set_id": self.record_set_id or "",
                "synapse.session_id": self.session_id or "",
            }
        )

        # Check for existing active sessions only when using record_set_id
        # For initial_query, always create a new session due to complexity of matching
        if self.record_set_id and attach_to_previous_session:
            # Look for existing active sessions for this record set
            async for existing_session in self.list_async(
                source_id=self.record_set_id, synapse_client=synapse_client
            ):
                # Found an existing session, populate this object with its data and return
                self.session_id = existing_session.session_id
                self.started_by = existing_session.started_by
                self.started_on = existing_session.started_on
                self.etag = existing_session.etag
                self.modified_on = existing_session.modified_on
                self.last_replica_id_client = existing_session.last_replica_id_client
                self.last_replica_id_service = existing_session.last_replica_id_service
                self.grid_json_schema_id = existing_session.grid_json_schema_id
                self.source_entity_id = existing_session.source_entity_id
                return self

        # No existing session found, create a new one
        create_request = CreateGridRequest(
            record_set_id=self.record_set_id, initial_query=self.initial_query
        )
        result = await create_request.send_job_and_wait_async(
            timeout=timeout, synapse_client=synapse_client
        )

        # Fill this GridSession with the grid session data from the async job response
        result.fill_grid_session_from_response(self)

        return self

    async def export_to_record_set_async(
        self, *, timeout: int = 120, synapse_client: Optional[Synapse] = None
    ) -> "Grid":
        """
        Exports the grid session data back to a record set. This will create a new version
        of the original record set with the modified data from the grid session.

        Arguments:
            timeout: The number of seconds to wait for the job to complete or progress
                before raising a SynapseTimeoutError. Defaults to 120.
            synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

        Returns:
            Grid: The Grid object with export information populated.

        Raises:
            ValueError: If session_id is not provided.

        Example: Export grid session data back to record set asynchronously
            &nbsp;

            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import Grid

            syn = Synapse()
            syn.login()

            async def main():
                # Export modified grid data back to the record set
                grid = Grid(session_id="abc-123-def")
                grid = await grid.export_to_record_set_async()
                print(f"Exported to record set: {grid.record_set_id}")
                print(f"Version number: {grid.record_set_version_number}")
                if grid.validation_summary_statistics:
                    print(f"Valid records: {grid.validation_summary_statistics.number_of_valid_children}")

            asyncio.run(main())
            ```
        """
        if not self.session_id:
            raise ValueError("session_id is required to export a GridSession")

        trace.get_current_span().set_attributes(
            {
                "synapse.session_id": self.session_id or "",
            }
        )

        # Create and send the export request
        export_request = GridRecordSetExportRequest(session_id=self.session_id)
        result = await export_request.send_job_and_wait_async(
            timeout=timeout, synapse_client=synapse_client
        )

        self.record_set_id = result.response_record_set_id
        self.record_set_version_number = result.record_set_version_number
        self.validation_summary_statistics = result.validation_summary_statistics

        return self

    def fill_from_dict(self, synapse_response: Dict[str, Any]) -> "Grid":
        """Converts a response from the REST API into this dataclass."""
        self.session_id = synapse_response.get("sessionId", None)
        self.started_by = synapse_response.get("startedBy", None)
        self.started_on = synapse_response.get("startedOn", None)
        self.etag = synapse_response.get("etag", None)
        self.modified_on = synapse_response.get("modifiedOn", None)
        self.last_replica_id_client = synapse_response.get("lastReplicaIdClient", None)
        self.last_replica_id_service = synapse_response.get(
            "lastReplicaIdService", None
        )
        self.grid_json_schema_id = synapse_response.get("gridJsonSchema$Id", None)
        self.source_entity_id = synapse_response.get("sourceEntityId", None)
        return self

    @skip_async_to_sync
    @classmethod
    async def list_async(
        cls,
        source_id: Optional[str] = None,
        *,
        synapse_client: Optional[Synapse] = None,
    ) -> AsyncGenerator["Grid", None]:
        """
        Generator to get a list of active grid sessions for the user.

        Arguments:
            source_id: Optional. When provided, only sessions with this synId will be returned.
            synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

        Yields:
            Grid objects representing active grid sessions.

        Example: List all active grid sessions asynchronously
            &nbsp;

            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import Grid

            syn = Synapse()
            syn.login()

            async def main():
                # List all active grid sessions for the user
                async for grid in Grid.list_async():
                    print(f"Session ID: {grid.session_id}")
                    print(f"Source Entity: {grid.source_entity_id}")
                    print(f"Started: {grid.started_on}")
                    print("---")

                # List grid sessions for a specific source
                async for grid in Grid.list_async(source_id="syn1234567"):
                    print(f"Session ID: {grid.session_id}")
                    print(f"Modified: {grid.modified_on}")

            asyncio.run(main())
            ```
        """
        async for session_dict in list_grid_sessions(
            source_id=source_id, synapse_client=synapse_client
        ):
            # Convert the dictionary to a Grid object
            grid = cls()
            grid.fill_from_dict(session_dict)
            yield grid

    @classmethod
    def list(
        cls,
        source_id: Optional[str] = None,
        *,
        synapse_client: Optional[Synapse] = None,
    ) -> Generator["Grid", None, None]:
        """
        Generator to get a list of active grid sessions for the user.

        Arguments:
            source_id: Optional. When provided, only sessions with this synId will be returned.
            synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

        Yields:
            Grid objects representing active grid sessions.
        """
        return wrap_async_generator_to_sync_generator(
            async_gen_func=cls.list_async,
            source_id=source_id,
            synapse_client=synapse_client,
        )

    async def delete_async(self, *, synapse_client: Optional[Synapse] = None) -> None:
        """
        Delete the grid session.

        Note: Only the user that created a grid session may delete it.

        Arguments:
            synapse_client: If not passed in and caching was not disabled by
                `Synapse.allow_client_caching(False)` this will use the last created
                instance from the Synapse class constructor.

        Returns:
            None

        Raises:
            ValueError: If session_id is not provided.

        Example: Delete a grid session asynchronously
            &nbsp;

            ```python
            import asyncio
            from synapseclient import Synapse
            from synapseclient.models import Grid

            syn = Synapse()
            syn.login()

            async def main():
                # Delete the grid session
                grid = Grid(session_id="abc-123-def")
                await grid.delete_async()
                print("Grid session deleted successfully")

            asyncio.run(main())
            ```
        """
        if not self.session_id:
            raise ValueError("session_id is required to delete a GridSession")

        trace.get_current_span().set_attributes(
            {
                "synapse.session_id": self.session_id or "",
            }
        )

        await delete_grid_session(
            session_id=self.session_id, synapse_client=synapse_client
        )

Functions

create_async async

create_async(attach_to_previous_session: bool = True, *, timeout: int = 120, synapse_client: Optional[Synapse] = None) -> Grid

Creates a new grid session from a record_set_id or initial_query.

When using record_set_id, first checks for existing active sessions that match the record set before creating a new one. When using initial_query, always creates a new session due to the complexity of matching query parameters.

PARAMETER DESCRIPTION
attach_to_previous_session

If True and using record_set_id, will attach to an existing active session if one exists. Defaults to True.

TYPE: bool DEFAULT: True

timeout

The number of seconds to wait for the job to complete or progress before raising a SynapseTimeoutError. Defaults to 120.

TYPE: int DEFAULT: 120

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
Grid

The Grid object with populated session_id.

TYPE: Grid

RAISES DESCRIPTION
ValueError

If neither record_set_id nor initial_query is provided.

Create a grid session asynchronously

 

import asyncio
from synapseclient import Synapse
from synapseclient.models import Grid

syn = Synapse()
syn.login()

async def main():
    # Create a grid session from a record set
    grid = Grid(record_set_id="syn1234567")
    grid = await grid.create_async()
    print(f"Created grid session: {grid.session_id}")

asyncio.run(main())
Source code in synapseclient/models/curation.py
async def create_async(
    self,
    attach_to_previous_session: bool = True,
    *,
    timeout: int = 120,
    synapse_client: Optional[Synapse] = None,
) -> "Grid":
    """
    Creates a new grid session from a `record_set_id` or `initial_query`.

    When using `record_set_id`, first checks for existing active sessions that match
    the record set before creating a new one. When using `initial_query`, always
    creates a new session due to the complexity of matching query parameters.

    Arguments:
        attach_to_previous_session: If True and using `record_set_id`, will attach
            to an existing active session if one exists. Defaults to True.
        timeout: The number of seconds to wait for the job to complete or progress
            before raising a SynapseTimeoutError. Defaults to 120.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Returns:
        Grid: The Grid object with populated session_id.

    Raises:
        ValueError: If neither `record_set_id` nor `initial_query` is provided.

    Example: Create a grid session asynchronously
        &nbsp;

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import Grid

        syn = Synapse()
        syn.login()

        async def main():
            # Create a grid session from a record set
            grid = Grid(record_set_id="syn1234567")
            grid = await grid.create_async()
            print(f"Created grid session: {grid.session_id}")

        asyncio.run(main())
        ```
    """
    if not self.record_set_id and not self.initial_query:
        raise ValueError(
            "record_set_id or initial_query is required to create a GridSession"
        )

    trace.get_current_span().set_attributes(
        {
            "synapse.record_set_id": self.record_set_id or "",
            "synapse.session_id": self.session_id or "",
        }
    )

    # Check for existing active sessions only when using record_set_id
    # For initial_query, always create a new session due to complexity of matching
    if self.record_set_id and attach_to_previous_session:
        # Look for existing active sessions for this record set
        async for existing_session in self.list_async(
            source_id=self.record_set_id, synapse_client=synapse_client
        ):
            # Found an existing session, populate this object with its data and return
            self.session_id = existing_session.session_id
            self.started_by = existing_session.started_by
            self.started_on = existing_session.started_on
            self.etag = existing_session.etag
            self.modified_on = existing_session.modified_on
            self.last_replica_id_client = existing_session.last_replica_id_client
            self.last_replica_id_service = existing_session.last_replica_id_service
            self.grid_json_schema_id = existing_session.grid_json_schema_id
            self.source_entity_id = existing_session.source_entity_id
            return self

    # No existing session found, create a new one
    create_request = CreateGridRequest(
        record_set_id=self.record_set_id, initial_query=self.initial_query
    )
    result = await create_request.send_job_and_wait_async(
        timeout=timeout, synapse_client=synapse_client
    )

    # Fill this GridSession with the grid session data from the async job response
    result.fill_grid_session_from_response(self)

    return self

export_to_record_set_async async

export_to_record_set_async(*, timeout: int = 120, synapse_client: Optional[Synapse] = None) -> Grid

Exports the grid session data back to a record set. This will create a new version of the original record set with the modified data from the grid session.

PARAMETER DESCRIPTION
timeout

The number of seconds to wait for the job to complete or progress before raising a SynapseTimeoutError. Defaults to 120.

TYPE: int DEFAULT: 120

synapse_client

If not passed in and caching was not disabled by Synapse.allow_client_caching(False) this will use the last created instance from the Synapse class constructor.

TYPE: Optional[Synapse] DEFAULT: None

RETURNS DESCRIPTION
Grid

The Grid object with export information populated.

TYPE: Grid

RAISES DESCRIPTION
ValueError

If session_id is not provided.

Export grid session data back to record set asynchronously

 

import asyncio
from synapseclient import Synapse
from synapseclient.models import Grid

syn = Synapse()
syn.login()

async def main():
    # Export modified grid data back to the record set
    grid = Grid(session_id="abc-123-def")
    grid = await grid.export_to_record_set_async()
    print(f"Exported to record set: {grid.record_set_id}")
    print(f"Version number: {grid.record_set_version_number}")
    if grid.validation_summary_statistics:
        print(f"Valid records: {grid.validation_summary_statistics.number_of_valid_children}")

asyncio.run(main())
Source code in synapseclient/models/curation.py
async def export_to_record_set_async(
    self, *, timeout: int = 120, synapse_client: Optional[Synapse] = None
) -> "Grid":
    """
    Exports the grid session data back to a record set. This will create a new version
    of the original record set with the modified data from the grid session.

    Arguments:
        timeout: The number of seconds to wait for the job to complete or progress
            before raising a SynapseTimeoutError. Defaults to 120.
        synapse_client: If not passed in and caching was not disabled by
            `Synapse.allow_client_caching(False)` this will use the last created
            instance from the Synapse class constructor.

    Returns:
        Grid: The Grid object with export information populated.

    Raises:
        ValueError: If session_id is not provided.

    Example: Export grid session data back to record set asynchronously
        &nbsp;

        ```python
        import asyncio
        from synapseclient import Synapse
        from synapseclient.models import Grid

        syn = Synapse()
        syn.login()

        async def main():
            # Export modified grid data back to the record set
            grid = Grid(session_id="abc-123-def")
            grid = await grid.export_to_record_set_async()
            print(f"Exported to record set: {grid.record_set_id}")
            print(f"Version number: {grid.record_set_version_number}")
            if grid.validation_summary_statistics:
                print(f"Valid records: {grid.validation_summary_statistics.number_of_valid_children}")

        asyncio.run(main())
        ```
    """
    if not self.session_id:
        raise ValueError("session_id is required to export a GridSession")

    trace.get_current_span().set_attributes(
        {
            "synapse.session_id": self.session_id or "",
        }
    )

    # Create and send the export request
    export_request = GridRecordSetExportRequest(session_id=self.session_id)
    result = await export_request.send_job_and_wait_async(
        timeout=timeout, synapse_client=synapse_client
    )

    self.record_set_id = result.response_record_set_id
    self.record_set_version_number = result.record_set_version_number
    self.validation_summary_statistics = result.validation_summary_statistics

    return self

synapseclient.models.Query dataclass

Represents a SQL query with optional parameters.

This result is modeled from: https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/Query.html

Source code in synapseclient/models/table_components.py
@dataclass
class Query:
    """
    Represents a SQL query with optional parameters.

    This result is modeled from: <https://rest-docs.synapse.org/rest/org/sagebionetworks/repo/model/table/Query.html>
    """

    sql: str
    """The SQL query string"""

    additional_filters: Optional[List[Dict[str, Any]]] = None
    """Appends additional filters to the SQL query. These are applied before facets.
    Filters within the list have an AND relationship. If a WHERE clause already exists
    on the SQL query or facets are selected, it will also be ANDed with the query
    generated by these additional filters."""
    """TODO: create QueryFilter dataclass: https://sagebionetworks.jira.com/browse/SYNPY-1651"""

    selected_facets: Optional[List[Dict[str, Any]]] = None
    """The selected facet filters"""
    """TODO: create FacetColumnRequest dataclass: https://sagebionetworks.jira.com/browse/SYNPY-1651"""

    include_entity_etag: Optional[bool] = False
    """Optional, default false. When true, a query results against views will include
    the Etag of each entity in the results. Note: The etag is necessary to update
    Entities in the view."""

    select_file_column: Optional[int] = None
    """The id of the column used to select file entities (e.g. to fetch the action
    required for download). The column needs to be an ENTITYID type column and be
    part of the schema of the underlying table/view."""

    select_file_version_column: Optional[int] = None
    """The id of the column used as the version for selecting file entities when required
    (e.g. to add a materialized view query to the download cart with version enabled).
    The column needs to be an INTEGER type column and be part of the schema of the
    underlying table/view."""

    offset: Optional[int] = None
    """The optional offset into the results"""

    limit: Optional[int] = None
    """The optional limit to the results"""

    sort: Optional[List[Dict[str, Any]]] = None
    """The sort order for the query results (ARRAY<SortItem>)"""
    """TODO: Add SortItem dataclass: https://sagebionetworks.jira.com/browse/SYNPY-1651 """

    def to_synapse_request(self) -> Dict[str, Any]:
        """Converts the Query object into a dictionary that can be passed into the REST API."""
        result = {
            "sql": self.sql,
            "additionalFilters": self.additional_filters,
            "selectedFacets": self.selected_facets,
            "includeEntityEtag": self.include_entity_etag,
            "selectFileColumn": self.select_file_column,
            "selectFileVersionColumn": self.select_file_version_column,
            "offset": self.offset,
            "limit": self.limit,
            "sort": self.sort,
        }
        delete_none_keys(result)
        return result

Attributes

sql instance-attribute

sql: str

The SQL query string

additional_filters class-attribute instance-attribute

additional_filters: Optional[List[Dict[str, Any]]] = None

Appends additional filters to the SQL query. These are applied before facets. Filters within the list have an AND relationship. If a WHERE clause already exists on the SQL query or facets are selected, it will also be ANDed with the query generated by these additional filters.

selected_facets class-attribute instance-attribute

selected_facets: Optional[List[Dict[str, Any]]] = None

The selected facet filters

include_entity_etag class-attribute instance-attribute

include_entity_etag: Optional[bool] = False

Optional, default false. When true, a query results against views will include the Etag of each entity in the results. Note: The etag is necessary to update Entities in the view.

select_file_column class-attribute instance-attribute

select_file_column: Optional[int] = None

The id of the column used to select file entities (e.g. to fetch the action required for download). The column needs to be an ENTITYID type column and be part of the schema of the underlying table/view.

select_file_version_column class-attribute instance-attribute

select_file_version_column: Optional[int] = None

The id of the column used as the version for selecting file entities when required (e.g. to add a materialized view query to the download cart with version enabled). The column needs to be an INTEGER type column and be part of the schema of the underlying table/view.

offset class-attribute instance-attribute

offset: Optional[int] = None

The optional offset into the results

limit class-attribute instance-attribute

limit: Optional[int] = None

The optional limit to the results

sort class-attribute instance-attribute

sort: Optional[List[Dict[str, Any]]] = None

The sort order for the query results (ARRAY<SortItem>)