Skip to content

API Reference

core

BioCypher core module.

Interfaces with the user and distributes tasks to submodules.

BioCypher

Orchestration of BioCypher operations.

Instantiate this class to interact with BioCypher.


dbms (str): The database management system to use. For supported
    systems see SUPPORTED_DBMS.

offline (bool): Whether to run in offline mode. In offline mode
    the Knowledge Graph is written to files. In online mode, it
    is written to a database or held in memory.

strict_mode (bool): Whether to run in strict mode. If True, the
    translator will raise an error if a node or edge does not
    provide source, version, and licence information.

biocypher_config_path (str): Path to the BioCypher config file.

schema_config_path (str): Path to the user schema config
    file.

head_ontology (dict): The head ontology defined by URL ('url') and root
    node ('root_node').

tail_ontologies (dict): The tail ontologies defined by URL and
    join nodes for both head and tail ontology.

output_directory (str): Path to the output directory. If not
    provided, the default value 'biocypher-out' will be used.

cache_directory (str): Path to the cache directory.
Source code in biocypher/_core.py
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
class BioCypher:
    """Orchestration of BioCypher operations.

    Instantiate this class to interact with BioCypher.

    Args:
    ----
        dbms (str): The database management system to use. For supported
            systems see SUPPORTED_DBMS.

        offline (bool): Whether to run in offline mode. In offline mode
            the Knowledge Graph is written to files. In online mode, it
            is written to a database or held in memory.

        strict_mode (bool): Whether to run in strict mode. If True, the
            translator will raise an error if a node or edge does not
            provide source, version, and licence information.

        biocypher_config_path (str): Path to the BioCypher config file.

        schema_config_path (str): Path to the user schema config
            file.

        head_ontology (dict): The head ontology defined by URL ('url') and root
            node ('root_node').

        tail_ontologies (dict): The tail ontologies defined by URL and
            join nodes for both head and tail ontology.

        output_directory (str): Path to the output directory. If not
            provided, the default value 'biocypher-out' will be used.

        cache_directory (str): Path to the cache directory.

    """

    def __init__(
        self,
        dbms: str = None,
        offline: bool = None,
        strict_mode: bool = None,
        biocypher_config_path: str = None,
        schema_config_path: str = None,
        head_ontology: dict = None,
        tail_ontologies: dict = None,
        output_directory: str = None,
        cache_directory: str = None,
        # legacy params
        db_name: str = None,
    ):
        """Resolve the effective settings and reset all lazily built parts.

        Explicitly passed arguments take precedence over the values found in
        the ``biocypher`` section of the loaded configuration.

        Raises:
        ------
            ValueError: If a key listed in ``REQUIRED_CONFIG`` is missing, if
                the 'pandas'/'tabular' DBMS is combined with offline mode, or
                if the selected DBMS is not in ``SUPPORTED_DBMS``.

        """
        # A custom config file has to be merged in before any setting is read.
        if biocypher_config_path:
            _file_update(biocypher_config_path)

        if db_name:
            logger.warning(
                "The parameter `db_name` is deprecated. Please set the "
                "`database_name` setting in the `biocypher_config.yaml` file "
                "instead.",
            )
            # NOTE(review): the config section key used here is the database
            # name itself - confirm that this is the intended section.
            _config(**{db_name: {"database_name": db_name}})

        settings = _config("biocypher")
        self.base_config = settings

        # Fail on the first required key that the configuration lacks.
        for required_key in REQUIRED_CONFIG:
            if required_key not in settings:
                msg = f"Configuration key {required_key} is required."
                raise ValueError(msg)

        # Mandatory settings: argument first, configuration as fallback.
        self._dbms = dbms or settings["dbms"]
        self._offline = settings["offline"] if offline is None else offline

        # pandas/tabular only exist as in-memory (online) back ends.
        if self._offline and self._dbms.lower() in ["pandas", "tabular"]:
            msg = (
                f"The '{self._dbms}' DBMS is only available in online mode. "
                f"If you want to write CSV files, use 'csv' as the DBMS. "
                f"If you want to use pandas, set 'offline: false' in your configuration."
            )
            raise ValueError(msg)

        self._strict_mode = settings["strict_mode"] if strict_mode is None else strict_mode

        self._schema_config_path = schema_config_path or settings.get("schema_config_path")
        if self._schema_config_path:
            logger.info(
                f"Running BioCypher with schema configuration from {self._schema_config_path}.",
            )
        else:
            logger.warning("Running BioCypher without schema configuration.")

        self._head_ontology = head_ontology or settings["head_ontology"]

        # Optional settings: may legitimately resolve to None.
        self._output_directory = output_directory or settings.get("output_directory")
        self._cache_directory = cache_directory or settings.get("cache_directory")
        self._tail_ontologies = tail_ontologies or settings.get("tail_ontologies")

        if self._dbms not in SUPPORTED_DBMS:
            msg = f"DBMS {self._dbms} not supported. Please select from {SUPPORTED_DBMS}."
            raise ValueError(msg)

        # Components are created on demand by the `_get_*`/`_initialize_*`
        # helpers; start with all of them unset.
        self._ontology_mapping = None
        self._deduplicator = None
        self._translator = None
        self._downloader = None
        self._ontology = None
        self._writer = None
        self._driver = None
        self._in_memory_kg = None

        # Buffers filled by `add_nodes` / `add_edges`.
        self._nodes = None
        self._edges = None

    def _initialize_in_memory_kg(self) -> None:
        """Create in-memory KG instance.

        Set as instance variable `self._in_memory_kg`.
        """
        if not self._in_memory_kg:
            self._in_memory_kg = get_in_memory_kg(
                dbms=self._dbms,
                deduplicator=self._get_deduplicator(),
            )

    def add_nodes(self, nodes) -> None:
        """Add new nodes to the internal representation.

        Initially, receive nodes data from adaptor and create internal
        representation for nodes.

        Args:
        ----
            nodes(iterable): An iterable of nodes

        """
        if isinstance(nodes, list):
            self._nodes = list(itertools.chain(self._nodes, nodes))
        else:
            self._nodes = itertools.chain(self._nodes, nodes)

    def add_edges(self, edges) -> None:
        """Add new edges to the internal representation.

        Initially, receive edges data from adaptor and create internal
        representation for edges.

        Args:
        ----
             edges(iterable): An iterable of edges.

        """
        if isinstance(edges, list):
            self._edges = list(itertools.chain(self._edges, edges))
        else:
            self._edges = itertools.chain(self._edges, edges)

    def to_df(self):
        """Create DataFrame using internal representation.

        TODO: to_df implies data frame, should be specifically that use case
        """
        return self._to_KG()

    def to_networkx(self):
        """Create networkx using internal representation."""
        return self._to_KG()

    def _to_KG(self):
        """Convert the internal representation to knowledge graph.

        The knowledge graph is returned based on the `dbms` parameter in
        the biocypher configuration file.

        TODO: These conditionals are a hack, we need to refactor the in-memory
        KG to be generic, and simplify access and conversion to output formats.

        Returns
        -------
             Any: knowledge graph.

        """
        # If we're using an in-memory KG and it already exists, return it directly
        if self._in_memory_kg and self._is_online_and_in_memory():
            return self._in_memory_kg.get_kg()

        # Otherwise, initialize and populate the in-memory KG
        if not self._in_memory_kg:
            self._initialize_in_memory_kg()
        if not self._translator:
            self._get_translator()

        # These attributes might not exist when using in-memory KG directly
        if hasattr(self, "_nodes") and hasattr(self, "_edges"):
            tnodes = self._translator.translate_entities(self._nodes)
            tedges = self._translator.translate_entities(self._edges)
            self._in_memory_kg.add_nodes(tnodes)
            self._in_memory_kg.add_edges(tedges)

        return self._in_memory_kg.get_kg()

    def _get_deduplicator(self) -> Deduplicator:
        """Create deduplicator if not exists and return."""
        if not self._deduplicator:
            self._deduplicator = Deduplicator()

        return self._deduplicator

    def _get_ontology_mapping(self) -> OntologyMapping:
        """Create ontology mapping if not exists and return."""
        if not self._schema_config_path:
            self._ontology_mapping = OntologyMapping()

        if not self._ontology_mapping:
            self._ontology_mapping = OntologyMapping(
                config_file=self._schema_config_path,
            )

        return self._ontology_mapping

    def _get_ontology(self) -> Ontology:
        """Create ontology if not exists and return."""
        if not self._ontology:
            self._ontology = Ontology(
                ontology_mapping=self._get_ontology_mapping(),
                head_ontology=self._head_ontology,
                tail_ontologies=self._tail_ontologies,
            )

        return self._ontology

    def _get_translator(self) -> Translator:
        """Create translator if not exists and return."""
        if not self._translator:
            self._translator = Translator(
                ontology=self._get_ontology(),
                strict_mode=self._strict_mode,
            )

        return self._translator

    def _initialize_writer(self) -> None:
        """Create writer if not online.

        Set as instance variable `self._writer`.
        """
        if self._offline:

            def timestamp() -> str:
                return datetime.now().strftime("%Y%m%d%H%M%S")

            outdir = self._output_directory or os.path.join(
                "biocypher-out",
                timestamp(),
            )
            self._output_directory = os.path.abspath(outdir)

            self._writer = get_writer(
                dbms=self._dbms,
                translator=self._get_translator(),
                deduplicator=self._get_deduplicator(),
                output_directory=self._output_directory,
                strict_mode=self._strict_mode,
            )
        else:
            msg = "Cannot get writer in online mode."
            raise NotImplementedError(msg)

    def _get_driver(self):
        """Create driver if not exists.

        Set as instance variable `self._driver`.
        """
        if not self._offline:
            self._driver = get_connector(
                dbms=self._dbms,
                translator=self._get_translator(),
            )
        else:
            msg = "Cannot get driver in offline mode."
            raise NotImplementedError(msg)

        return self._driver

    def _get_in_memory_kg(self):
        """Create in-memory KG instance.

        Set as instance variable `self._in_memory_kg`.
        """
        if not self._in_memory_kg:
            self._in_memory_kg = get_in_memory_kg(
                dbms=self._dbms,
                deduplicator=self._get_deduplicator(),
            )

        return self._in_memory_kg

    def _add_nodes(
        self,
        nodes,
        batch_size: int = int(1e6),
        force: bool = False,
    ):
        """Add nodes to the BioCypher KG.

        First uses the `_translator` to translate the nodes to `BioCypherNode`
        objects. Depending on the configuration the translated nodes are then
        passed to the

        - `_writer`: if `_offline` is set to `False`

        - `_in_memory_kg`: if `_offline` is set to `False` and the `_dbms` is an
            `IN_MEMORY_DBMS`

        - `_driver`: if `_offline` is set to `True` and the `_dbms` is not an
            `IN_MEMORY_DBMS`

        """
        if not self._translator:
            self._get_translator()
        translated_nodes = self._translator.translate_entities(nodes)

        if self._offline:
            if not self._writer:
                self._initialize_writer()
            passed = self._writer.write_nodes(
                translated_nodes,
                batch_size=batch_size,
                force=force,
            )
        elif self._is_online_and_in_memory():
            passed = self._get_in_memory_kg().add_nodes(translated_nodes)
        else:
            passed = self._get_driver().add_biocypher_nodes(translated_nodes)

        return passed

    def _add_edges(self, edges, batch_size: int = int(1e6)):
        """Add edges to the BioCypher KG.

        First uses the `_translator` to translate the edges to `BioCypherEdge`
        objects. Depending on the configuration the translated edges are then
        passed to the

        - `_writer`: if `_offline` is set to `False`

        - `_in_memory_kg`: if `_offline` is set to `False` and the `_dbms` is an
            `IN_MEMORY_DBMS`

        - `_driver`: if `_offline` is set to `True` and the `_dbms` is not an
            `IN_MEMORY_DBMS`

        """
        if not self._translator:
            self._get_translator()
        translated_edges = self._translator.translate_entities(edges)

        if self._offline:
            if not self._writer:
                self._initialize_writer()
            passed = self._writer.write_edges(
                translated_edges,
                batch_size=batch_size,
            )
        elif self._is_online_and_in_memory():
            if not self._in_memory_kg:
                self._initialize_in_memory_kg()
            passed = self._in_memory_kg.add_edges(translated_edges)
        else:
            if not self._driver:
                self._initialize_driver()
            passed = self._driver.add_biocypher_nodes(translated_edges)

        return passed

    def _is_online_and_in_memory(self) -> bool:
        """Return True if in online mode and in-memory dbms is used."""
        return (not self._offline) & (self._dbms in IN_MEMORY_DBMS)

    def write_nodes(
        self,
        nodes,
        batch_size: int = int(1e6),
        force: bool = False,
    ) -> bool:
        """Write nodes to database.

        Either takes an iterable of tuples (if given, translates to
        ``BioCypherNode`` objects) or an iterable of ``BioCypherNode`` objects.

        Args:
        ----
            nodes (iterable): An iterable of nodes to write to the database.
            batch_size (int): The batch size to use when writing to disk.
            force (bool): Whether to force writing to the output directory even
                if the node type is not present in the schema config file.

        Returns:
        -------
            bool: True if successful.

        """
        return self._add_nodes(nodes, batch_size=batch_size, force=force)

    def write_edges(self, edges, batch_size: int = int(1e6)) -> bool:
        """Write edges to database.

        Either takes an iterable of tuples (if given, translates to
        ``BioCypherEdge`` objects) or an iterable of ``BioCypherEdge`` objects.

        Args:
        ----
            edges (iterable): An iterable of edges to write to the database.

        Returns:
        -------
            bool: True if successful.

        """
        return self._add_edges(edges, batch_size=batch_size)

    def add(self, entities) -> None:
        """Add entities to the in-memory database.

        Accepts an iterable of tuples (if given, translates to
        ``BioCypherNode`` or ``BioCypherEdge`` objects) or an iterable of
        ``BioCypherNode`` or ``BioCypherEdge`` objects.

        Args:
        ----
            entities (iterable): An iterable of entities to add to the database.
                Can be 3-tuples (nodes) or 5-tuples (edges); also accepts
                4-tuples for edges (deprecated).

        Returns:
        -------
            None

        """
        return self._add_nodes(entities)

    def merge_nodes(self, nodes) -> bool:
        """Merge nodes into database.

        Either takes an iterable of tuples (if given, translates to
        ``BioCypherNode`` objects) or an iterable of ``BioCypherNode`` objects.

        Args:
        ----
            nodes (iterable): An iterable of nodes to merge into the database.

        Returns:
        -------
            bool: True if successful.

        """
        return self._add_nodes(nodes)

    def merge_edges(self, edges) -> bool:
        """Merge edges into database.

        Either takes an iterable of tuples (if given, translates to
        ``BioCypherEdge`` objects) or an iterable of ``BioCypherEdge`` objects.

        Args:
        ----
            edges (iterable): An iterable of edges to merge into the database.

        Returns:
        -------
            bool: True if successful.

        """
        return self._add_edges(edges)

    def get_kg(self):
        """Get the in-memory KG instance.

        Depending on the specified `dbms` this could either be a list of Pandas
        dataframes or a NetworkX DiGraph.
        """
        if not self._is_online_and_in_memory():
            msg = (f"Getting the in-memory KG is only available in online mode for {IN_MEMORY_DBMS}.",)
            raise ValueError(msg)
        if not self._in_memory_kg:
            msg = "No in-memory KG instance found. Please call `add()` first."
            raise ValueError(msg)

        if not self._in_memory_kg:
            self._initialize_in_memory_kg()
        return self._in_memory_kg.get_kg()

    # DOWNLOAD AND CACHE MANAGEMENT METHODS ###

    def _get_downloader(self, cache_dir: str | None = None):
        """Create downloader if not exists."""
        if not self._downloader:
            self._downloader = Downloader(self._cache_directory)

    def download(self, *resources) -> None:
        """Download or load from cache the resources given by the adapter.

        Args:
        ----
            resources (iterable): An iterable of resources to download or load
                from cache.

        Returns:
        -------
            None

        """
        self._get_downloader()
        return self._downloader.download(*resources)

    # OVERVIEW AND CONVENIENCE METHODS ###

    def log_missing_input_labels(self) -> dict[str, list[str]] | None:
        """Log missing input labels.

        Get the set of input labels encountered without an entry in the
        `schema_config.yaml` and print them to the logger.

        Returns
        -------
            Optional[Dict[str, List[str]]]: A dictionary of Biolink types
            encountered without an entry in the `schema_config.yaml` file.

        """
        mt = self._translator.get_missing_biolink_types()

        if mt:
            msg = (
                "Input entities not accounted for due to them not being "
                f"present in the schema configuration file {self._schema_config_path} "
                "(this is not necessarily a problem, if you did not intend "
                "to include them in the database; see the log for details): \n"
            )
            for k, v in mt.items():
                msg += f"    {k}: {v} \n"

            logger.info(msg)
            return mt

        else:
            logger.info("No missing labels in input.")
            return None

    def log_duplicates(self) -> None:
        """Log duplicate nodes and edges.

        Get the set of duplicate nodes and edges encountered and print them to
        the logger.
        """
        dn = self._deduplicator.get_duplicate_nodes()

        if dn:
            ntypes = dn[0]
            nids = dn[1]

            msg = "Duplicate node types encountered (IDs in log): \n"
            for typ in ntypes:
                msg += f"    {typ}\n"

            logger.info(msg)

            idmsg = "Duplicate node IDs encountered: \n"
            for _id in nids:
                idmsg += f"    {_id}\n"

            logger.debug(idmsg)

        else:
            logger.info("No duplicate nodes in input.")

        de = self._deduplicator.get_duplicate_edges()

        if de:
            etypes = de[0]
            eids = de[1]

            msg = "Duplicate edge types encountered (IDs in log): \n"
            for typ in etypes:
                msg += f"    {typ}\n"

            logger.info(msg)

            idmsg = "Duplicate edge IDs encountered: \n"
            for _id in eids:
                idmsg += f"    {_id}\n"

            logger.debug(idmsg)

        else:
            logger.info("No duplicate edges in input.")

    def show_ontology_structure(self, **kwargs) -> None:
        """Show the ontology structure using treelib or write to GRAPHML file.

        Args:
        ----
            to_disk (str): If specified, the ontology structure will be saved
                to disk as a GRAPHML file, to be opened in your favourite
                graph visualisation tool.

            full (bool): If True, the full ontology structure will be shown,
                including all nodes and edges. If False, only the nodes and
                edges that are relevant to the extended schema will be shown.

        """
        if not self._ontology:
            self._get_ontology()

        return self._ontology.show_ontology_structure(**kwargs)

    def write_import_call(self) -> str:
        """Write a shell script to import the database.

        Shell script is written depending on the chosen DBMS.

        Returns
        -------
            str: path toward the file holding the import call.

        """
        if not self._offline:
            msg = "Cannot write import call in online mode."
            raise NotImplementedError(msg)
        else:
            if not self._writer:
                logger.warning(
                    "No edges or nodes were added, I'll try to continue, but you may want to double-check your data."
                )
                self._initialize_writer()

        return self._writer.write_import_call()

    def write_schema_info(self, as_node: bool = False) -> dict:
        """Write an extended schema info to file or node.

        Creates a YAML file or KG node that extends the `schema_config.yaml`
        with run-time information of the built KG. For instance, include
        information on whether something present in the actual knowledge graph,
        whether it is a relationship (which is important in the case of
        representing relationships as nodes) and the actual sources and
        targets of edges. Since this file can be used in place of the original
        `schema_config.yaml` file, it indicates that it is the extended schema
        by setting `is_schema_info` to `true`.

        We start by using the `extended_schema` dictionary from the ontology
        class instance, which contains all expanded entities and relationships.
        The information of whether something is a relationship can be gathered
        from the deduplicator instance, which keeps track of all entities that
        have been seen.

        Args:
        ----
            as_node (bool): If True, the schema info is additionally written
                as a KG node and the import call is rewritten. The YAML file
                is written in either case.

        Returns:
        -------
            dict: The extended schema that was written.

        Raises:
        ------
            NotImplementedError: If running online with a DBMS that is not an
                in-memory DBMS.

        """
        if (not self._offline) and self._dbms not in IN_MEMORY_DBMS:
            msg = "Cannot write schema info in online mode."
            raise NotImplementedError(msg)

        ontology = self._get_ontology()
        # NOTE(review): `.copy()` is assumed to be shallow (plain dict); if so
        # the nested entries edited below are shared with the ontology's
        # extended schema - confirm this side effect is intended.
        schema = ontology.mapping.extended_schema.copy()
        schema["is_schema_info"] = True

        deduplicator = self._get_deduplicator()
        # flag every entity type the deduplicator has seen as a present node
        for node in deduplicator.entity_types:
            if node in schema:
                schema[node]["present_in_knowledge_graph"] = True
                schema[node]["is_relationship"] = False
            else:
                logger.info(
                    f"Node {node} not present in extended schema. Skipping schema info.",
                )

        # find 'label_as_edge' cases in schema entries
        # (maps the edge label that was seen back to its schema key)
        changed_labels = {}
        for k, v in schema.items():
            # skip non-dict values such as the `is_schema_info` flag set above
            if not isinstance(v, dict):
                continue
            if "label_as_edge" in v:
                if v["label_as_edge"] in deduplicator.seen_relationships:
                    changed_labels[v["label_as_edge"]] = k

        for edge in deduplicator.seen_relationships:
            # look relabelled edges up under their original schema key
            if edge in changed_labels:
                edge = changed_labels[edge]
            if edge in schema:
                schema[edge]["present_in_knowledge_graph"] = True
                schema[edge]["is_relationship"] = True
                # TODO information about source and target nodes
            else:
                logger.info(
                    f"Edge {edge} not present in extended schema. Skipping schema info.",
                )

        # write to output directory as YAML file
        # NOTE(review): `self._output_directory` can still be None when no
        # directory is configured and no writer was initialised - confirm.
        path = os.path.join(self._output_directory, "schema_info.yaml")
        with open(path, "w") as f:
            f.write(yaml.dump(schema))

        if as_node:
            # write as node
            node = BioCypherNode(
                node_id="schema_info",
                node_label="schema_info",
                properties={"schema_info": json.dumps(schema)},
            )
            # force=True is passed so the node is written although it is not
            # part of the schema config
            self.write_nodes([node], force=True)

            # override import call with added schema info node
            self.write_import_call()

        return schema

    # TRANSLATION METHODS ###

    def translate_term(self, term: str) -> str:
        """Translate a term to its BioCypher equivalent.

        Args:
        ----
            term (str): The term to translate.

        Returns:
        -------
            str: The BioCypher equivalent of the term.

        """
        # instantiate adapter if not exists
        self.start_ontology()

        return self._translator.translate_term(term)

    def summary(self) -> None:
        """Call convenience and reporting methods.

        Shows ontology structure and logs duplicates and missing input types.
        """
        self.show_ontology_structure()
        self.log_duplicates()
        self.log_missing_input_labels()

    def reverse_translate_term(self, term: str) -> str:
        """Reverse translate a term from its BioCypher equivalent.

        Args:
        ----
            term (str): The BioCypher term to reverse translate.

        Returns:
        -------
            str: The original term.

        """
        # instantiate adapter if not exists
        self.start_ontology()

        return self._translator.reverse_translate_term(term)

    def translate_query(self, query: str) -> str:
        """Translate a query to its BioCypher equivalent.

        Args:
        ----
            query (str): The query to translate.

        Returns:
        -------
            str: The BioCypher equivalent of the query.

        """
        # instantiate adapter if not exists
        self.start_ontology()

        return self._translator.translate(query)

    def reverse_translate_query(self, query: str) -> str:
        """Reverse translate a query from its BioCypher equivalent.

        Args:
        ----
            query (str): The BioCypher query to reverse translate.

        Returns:
        -------
            str: The original query.

        """
        # instantiate adapter if not exists
        self.start_ontology()

        return self._translator.reverse_translate(query)

add(entities)

Add entities to the in-memory database.

Accepts an iterable of tuples (if given, translates to BioCypherNode or BioCypherEdge objects) or an iterable of BioCypherNode or BioCypherEdge objects.


entities (iterable): An iterable of entities to add to the database.
    Can be 3-tuples (nodes) or 5-tuples (edges); also accepts
    4-tuples for edges (deprecated).

None
Source code in biocypher/_core.py
def add(self, entities) -> None:
    """Hand entities over to ``_add_nodes``.

    Accepts an iterable of tuples (if given, translates to
    ``BioCypherNode`` or ``BioCypherEdge`` objects) or an iterable of
    ``BioCypherNode`` or ``BioCypherEdge`` objects.

    Args:
    ----
        entities (iterable): An iterable of entities to add to the database.
            Can be 3-tuples (nodes) or 5-tuples (edges); also accepts
            4-tuples for edges (deprecated).

    Returns:
    -------
        The result of ``self._add_nodes(entities)``, passed through unchanged.

    """
    outcome = self._add_nodes(entities)
    return outcome

add_edges(edges)

Add new edges to the internal representation.

Initially, receive edges data from adaptor and create internal representation for edges.


 edges(iterable): An iterable of edges.
Source code in biocypher/_core.py
def add_edges(self, edges) -> None:
    """Append edges to the internally stored edge collection.

    The new edges are chained after whatever ``self._edges`` already holds.

    Args:
    ----
        edges (iterable): An iterable of edges. When it is a ``list`` the
            combined result is stored as a ``list``; otherwise it is stored
            as a lazy ``itertools.chain``.

    """
    combined = itertools.chain(self._edges, edges)
    self._edges = list(combined) if isinstance(edges, list) else combined

add_nodes(nodes)

Add new nodes to the internal representation.

Initially, receive nodes data from adaptor and create internal representation for nodes.


nodes(iterable): An iterable of nodes
Source code in biocypher/_core.py
def add_nodes(self, nodes) -> None:
    """Append nodes to the internally stored node collection.

    The new nodes are chained after whatever ``self._nodes`` already holds.

    Args:
    ----
        nodes (iterable): An iterable of nodes. When it is a ``list`` the
            combined result is stored as a ``list``; otherwise it is stored
            as a lazy ``itertools.chain``.

    """
    combined = itertools.chain(self._nodes, nodes)
    self._nodes = list(combined) if isinstance(nodes, list) else combined

download(*resources)

Download or load from cache the resources given by the adapter.


resources (iterable): An iterable of resources to download or load
    from cache.

None
Source code in biocypher/_core.py
def download(self, *resources) -> None:
    """Download the given resources, or load them from cache.

    Args:
    ----
        resources (iterable): The resources to download or load from cache;
            forwarded positionally to the downloader.

    Returns:
    -------
        The result of the downloader's ``download`` call, passed through
        unchanged.

    """
    # _get_downloader() runs before self._downloader is read
    self._get_downloader()
    downloader = self._downloader
    return downloader.download(*resources)

get_kg()

Get the in-memory KG instance.

Depending on the specified dbms this could either be a list of Pandas dataframes or a NetworkX DiGraph.

Source code in biocypher/_core.py
def get_kg(self):
    """Get the in-memory KG instance.

    Depending on the specified `dbms` this could either be a list of Pandas
    dataframes or a NetworkX DiGraph.

    Returns
    -------
        The result of ``self._in_memory_kg.get_kg()``.

    Raises
    ------
        ValueError: If ``self._is_online_and_in_memory()`` is false, or if no
            in-memory KG instance exists yet (``add()`` was not called).

    """
    if not self._is_online_and_in_memory():
        # The message must be a plain string; a trailing comma here would
        # turn it into a 1-tuple.
        msg = f"Getting the in-memory KG is only available in online mode for {IN_MEMORY_DBMS}."
        raise ValueError(msg)
    if not self._in_memory_kg:
        msg = "No in-memory KG instance found. Please call `add()` first."
        raise ValueError(msg)

    return self._in_memory_kg.get_kg()

log_duplicates()

Log duplicate nodes and edges.

Get the set of duplicate nodes and edges encountered and print them to the logger.

Source code in biocypher/_core.py
def log_duplicates(self) -> None:
    """Report duplicate nodes and edges through the logger.

    Asks the deduplicator for duplicate nodes, then for duplicate edges. For
    each kind, the duplicated types are logged at INFO level and the
    duplicated IDs at DEBUG level; if nothing was found, a single INFO line
    says so.
    """
    node_dups = self._deduplicator.get_duplicate_nodes()

    if not node_dups:
        logger.info("No duplicate nodes in input.")
    else:
        # element 0 holds the types, element 1 the IDs
        node_types = node_dups[0]
        node_ids = node_dups[1]

        type_lines = "".join(f"    {typ}\n" for typ in node_types)
        logger.info("Duplicate node types encountered (IDs in log): \n" + type_lines)

        id_lines = "".join(f"    {_id}\n" for _id in node_ids)
        logger.debug("Duplicate node IDs encountered: \n" + id_lines)

    edge_dups = self._deduplicator.get_duplicate_edges()

    if not edge_dups:
        logger.info("No duplicate edges in input.")
    else:
        edge_types = edge_dups[0]
        edge_ids = edge_dups[1]

        type_lines = "".join(f"    {typ}\n" for typ in edge_types)
        logger.info("Duplicate edge types encountered (IDs in log): \n" + type_lines)

        id_lines = "".join(f"    {_id}\n" for _id in edge_ids)
        logger.debug("Duplicate edge IDs encountered: \n" + id_lines)

log_missing_input_labels()

Log missing input labels.

Get the set of input labels encountered without an entry in the schema_config.yaml and print them to the logger.

Returns
Optional[Dict[str, List[str]]]: A dictionary of Biolink types
encountered without an entry in the `schema_config.yaml` file.
Source code in biocypher/_core.py
def log_missing_input_labels(self) -> dict[str, list[str]] | None:
    """Report input labels that have no schema configuration entry.

    Fetches the missing types from the translator and writes them to the
    logger at INFO level, one ``key: value`` line per entry.

    Returns
    -------
        Optional[Dict[str, List[str]]]: The mapping returned by the
        translator's ``get_missing_biolink_types`` when it is non-empty,
        otherwise ``None``.

    """
    missing = self._translator.get_missing_biolink_types()

    if not missing:
        logger.info("No missing labels in input.")
        return None

    header = (
        "Input entities not accounted for due to them not being "
        f"present in the schema configuration file {self._schema_config_path} "
        "(this is not necessarily a problem, if you did not intend "
        "to include them in the database; see the log for details): \n"
    )
    details = "".join(f"    {k}: {v} \n" for k, v in missing.items())

    logger.info(header + details)
    return missing

merge_edges(edges)

Merge edges into database.

Either takes an iterable of tuples (if given, translates to BioCypherEdge objects) or an iterable of BioCypherEdge objects.


edges (iterable): An iterable of edges to merge into the database.

bool: True if successful.
Source code in biocypher/_core.py
def merge_edges(self, edges) -> bool:
    """Merge edges into the database by delegating to ``_add_edges``.

    Either takes an iterable of tuples (if given, translates to
    ``BioCypherEdge`` objects) or an iterable of ``BioCypherEdge`` objects.

    Args:
    ----
        edges (iterable): An iterable of edges to merge into the database.

    Returns:
    -------
        bool: The result of ``self._add_edges(edges)``; True if successful.

    """
    outcome = self._add_edges(edges)
    return outcome

merge_nodes(nodes)

Merge nodes into database.

Either takes an iterable of tuples (if given, translates to BioCypherNode objects) or an iterable of BioCypherNode objects.


nodes (iterable): An iterable of nodes to merge into the database.

bool: True if successful.
Source code in biocypher/_core.py
def merge_nodes(self, nodes) -> bool:
    """Merge nodes into the database by delegating to ``_add_nodes``.

    Either takes an iterable of tuples (if given, translates to
    ``BioCypherNode`` objects) or an iterable of ``BioCypherNode`` objects.

    Args:
    ----
        nodes (iterable): An iterable of nodes to merge into the database.

    Returns:
    -------
        bool: The result of ``self._add_nodes(nodes)``; True if successful.

    """
    outcome = self._add_nodes(nodes)
    return outcome

reverse_translate_query(query)

Reverse translate a query from its BioCypher equivalent.


query (str): The BioCypher query to reverse translate.

str: The original query.
Source code in biocypher/_core.py
def reverse_translate_query(self, query: str) -> str:
    """Map a BioCypher query back to its original form.

    Args:
    ----
        query (str): The BioCypher query to reverse translate.

    Returns:
    -------
        str: Whatever the translator's ``reverse_translate`` returns for
            ``query``.

    """
    # start_ontology() is called first so the adapter exists before the
    # translator is used
    self.start_ontology()

    translator = self._translator
    original_query = translator.reverse_translate(query)
    return original_query

reverse_translate_term(term)

Reverse translate a term from its BioCypher equivalent.


term (str): The BioCypher term to reverse translate.

str: The original term.
Source code in biocypher/_core.py
def reverse_translate_term(self, term: str) -> str:
    """Map a BioCypher term back to its original form.

    Args:
    ----
        term (str): The BioCypher term to reverse translate.

    Returns:
    -------
        str: Whatever the translator's ``reverse_translate_term`` returns
            for ``term``.

    """
    # start_ontology() is called first so the adapter exists before the
    # translator is used
    self.start_ontology()

    translator = self._translator
    original_term = translator.reverse_translate_term(term)
    return original_term

show_ontology_structure(**kwargs)

Show the ontology structure using treelib or write to GRAPHML file.


to_disk (str): If specified, the ontology structure will be saved
    to disk as a GRAPHML file, to be opened in your favourite
    graph visualisation tool.

full (bool): If True, the full ontology structure will be shown,
    including all nodes and edges. If False, only the nodes and
    edges that are relevant to the extended schema will be shown.
Source code in biocypher/_core.py
def show_ontology_structure(self, **kwargs) -> None:
    """Show the ontology structure using treelib or write to GRAPHML file.

    All keyword arguments are forwarded unchanged to the ontology's own
    ``show_ontology_structure``.

    Args:
    ----
        to_disk (str): If specified, the ontology structure will be saved
            to disk as a GRAPHML file, to be opened in your favourite
            graph visualisation tool.

        full (bool): If True, the full ontology structure will be shown,
            including all nodes and edges. If False, only the nodes and
            edges that are relevant to the extended schema will be shown.

    Returns:
    -------
        The result of the ontology's ``show_ontology_structure`` call.

    """
    ontology_missing = not self._ontology
    if ontology_missing:
        self._get_ontology()

    # read self._ontology only after _get_ontology() had its chance to run
    ontology = self._ontology
    return ontology.show_ontology_structure(**kwargs)

summary()

Call convenience and reporting methods.

Shows ontology structure and logs duplicates and missing input types.

Source code in biocypher/_core.py
def summary(self) -> None:
    """Run the reporting helpers one after another.

    Calls, in this order, ``show_ontology_structure``, ``log_duplicates``
    and ``log_missing_input_labels``. Their return values are discarded.
    """
    reports = (
        self.show_ontology_structure,
        self.log_duplicates,
        self.log_missing_input_labels,
    )
    for report in reports:
        report()

to_df()

Create DataFrame using internal representation.

TODO: to_df implies data frame, should be specifically that use case

Source code in biocypher/_core.py
def to_df(self):
    """Build the knowledge graph from the internal representation.

    Delegates to ``self._to_KG()`` and returns its result unchanged.

    TODO: to_df implies data frame, should be specifically that use case
    """
    knowledge_graph = self._to_KG()
    return knowledge_graph

to_networkx()

Create networkx using internal representation.

Source code in biocypher/_core.py
def to_networkx(self):
    """Build the knowledge graph from the internal representation.

    Delegates to ``self._to_KG()`` and returns its result unchanged.
    """
    knowledge_graph = self._to_KG()
    return knowledge_graph

translate_query(query)

Translate a query to its BioCypher equivalent.


query (str): The query to translate.

str: The BioCypher equivalent of the query.
Source code in biocypher/_core.py
def translate_query(self, query: str) -> str:
    """Convert a query into its BioCypher equivalent.

    Args:
    ----
        query (str): The query to translate.

    Returns:
    -------
        str: Whatever the translator's ``translate`` returns for ``query``.

    """
    # start_ontology() is called first so the adapter exists before the
    # translator is used
    self.start_ontology()

    translator = self._translator
    translated_query = translator.translate(query)
    return translated_query

translate_term(term)

Translate a term to its BioCypher equivalent.


term (str): The term to translate.

str: The BioCypher equivalent of the term.
Source code in biocypher/_core.py
def translate_term(self, term: str) -> str:
    """Convert a term into its BioCypher equivalent.

    Args:
    ----
        term (str): The term to translate.

    Returns:
    -------
        str: Whatever the translator's ``translate_term`` returns for
            ``term``.

    """
    # start_ontology() is called first so the adapter exists before the
    # translator is used
    self.start_ontology()

    translator = self._translator
    translated_term = translator.translate_term(term)
    return translated_term

write_edges(edges, batch_size=int(1000000.0))

Write edges to database.

Either takes an iterable of tuples (if given, translates to BioCypherEdge objects) or an iterable of BioCypherEdge objects.


edges (iterable): An iterable of edges to write to the database.

bool: True if successful.
Source code in biocypher/_core.py
def write_edges(self, edges, batch_size: int = int(1e6)) -> bool:
    """Write edges to the database by delegating to ``_add_edges``.

    Either takes an iterable of tuples (if given, translates to
    ``BioCypherEdge`` objects) or an iterable of ``BioCypherEdge`` objects.

    Args:
    ----
        edges (iterable): An iterable of edges to write to the database.
        batch_size (int): Forwarded to ``_add_edges`` as the ``batch_size``
            keyword argument.

    Returns:
    -------
        bool: The result of ``_add_edges``; True if successful.

    """
    options = {"batch_size": batch_size}
    return self._add_edges(edges, **options)

write_import_call()

Write a shell script to import the database.

Shell script is written depending on the chosen DBMS.

Returns
str: path toward the file holding the import call.
Source code in biocypher/_core.py
def write_import_call(self) -> str:
    """Write a shell script to import the database.

    Shell script is written depending on the chosen DBMS.

    Returns
    -------
        str: The result of the writer's ``write_import_call`` (path to the
        file holding the import call).

    Raises
    ------
        NotImplementedError: If the instance is not in offline mode.

    """
    if self._offline:
        if not self._writer:
            # no writer exists yet, so warn and create one before continuing
            logger.warning(
                "No edges or nodes were added, I'll try to continue, but you may want to double-check your data."
            )
            self._initialize_writer()
        return self._writer.write_import_call()

    msg = "Cannot write import call in online mode."
    raise NotImplementedError(msg)

write_nodes(nodes, batch_size=int(1000000.0), force=False)

Write nodes to database.

Either takes an iterable of tuples (if given, translates to BioCypherNode objects) or an iterable of BioCypherNode objects.


nodes (iterable): An iterable of nodes to write to the database.
batch_size (int): The batch size to use when writing to disk.
force (bool): Whether to force writing to the output directory even
    if the node type is not present in the schema config file.

bool: True if successful.
Source code in biocypher/_core.py
def write_nodes(
    self,
    nodes,
    batch_size: int = int(1e6),
    force: bool = False,
) -> bool:
    """Write nodes to the database by delegating to ``_add_nodes``.

    Either takes an iterable of tuples (if given, translates to
    ``BioCypherNode`` objects) or an iterable of ``BioCypherNode`` objects.

    Args:
    ----
        nodes (iterable): An iterable of nodes to write to the database.
        batch_size (int): The batch size to use when writing to disk.
        force (bool): Whether to force writing to the output directory even
            if the node type is not present in the schema config file.

    Returns:
    -------
        bool: The result of ``_add_nodes``; True if successful.

    """
    options = {"batch_size": batch_size, "force": force}
    return self._add_nodes(nodes, **options)

write_schema_info(as_node=False)

Write an extended schema info to file or node.

Creates a YAML file or KG node that extends the schema_config.yaml with run-time information of the built KG. For instance, it includes information on whether something is present in the actual knowledge graph, whether it is a relationship (which is important in the case of representing relationships as nodes) and the actual sources and targets of edges. Since this file can be used in place of the original schema_config.yaml file, it indicates that it is the extended schema by setting is_schema_info to true.

We start by using the extended_schema dictionary from the ontology class instance, which contains all expanded entities and relationships. The information of whether something is a relationship can be gathered from the deduplicator instance, which keeps track of all entities that have been seen.


as_node (bool): If True, the schema info is written as a KG node.
    If False, the schema info is written to a YAML file.
Source code in biocypher/_core.py
def write_schema_info(self, as_node: bool = False) -> dict:
    """Write an extended schema info to file or node.

    Creates a YAML file or KG node that extends the `schema_config.yaml`
    with run-time information of the built KG. For instance, it includes
    information on whether something is present in the actual knowledge
    graph, whether it is a relationship (which is important in the case of
    representing relationships as nodes) and the actual sources and
    targets of edges. Since this file can be used in place of the original
    `schema_config.yaml` file, it indicates that it is the extended schema
    by setting `is_schema_info` to `true`.

    We start by using the `extended_schema` dictionary from the ontology
    class instance, which contains all expanded entities and relationships.
    The information of whether something is a relationship can be gathered
    from the deduplicator instance, which keeps track of all entities that
    have been seen.

    Args:
    ----
        as_node (bool): If True, the schema info is additionally written as
            a KG node and the import call is rewritten. The YAML file
            `schema_info.yaml` is written to the output directory in
            either case.

    Returns:
    -------
        dict: The extended schema that was written.

    Raises:
    ------
        NotImplementedError: In online mode with a DBMS that is not one of
            `IN_MEMORY_DBMS`.

    """
    if (not self._offline) and self._dbms not in IN_MEMORY_DBMS:
        msg = "Cannot write schema info in online mode."
        raise NotImplementedError(msg)

    ontology = self._get_ontology()
    # NOTE(review): this is a shallow copy, so the per-entity dicts updated
    # below are shared with ontology.mapping.extended_schema -- confirm that
    # this is intended.
    schema = ontology.mapping.extended_schema.copy()
    schema["is_schema_info"] = True

    deduplicator = self._get_deduplicator()
    for entity_type in deduplicator.entity_types:
        if entity_type in schema:
            schema[entity_type]["present_in_knowledge_graph"] = True
            schema[entity_type]["is_relationship"] = False
        else:
            logger.info(
                f"Node {entity_type} not present in extended schema. Skipping schema info.",
            )

    # find 'label_as_edge' cases in schema entries: map the edge label that
    # was actually seen back to the schema key that declared it
    changed_labels = {}
    for key, entry in schema.items():
        if not isinstance(entry, dict):
            # e.g. the boolean `is_schema_info` flag set above
            continue
        if "label_as_edge" in entry and entry["label_as_edge"] in deduplicator.seen_relationships:
            changed_labels[entry["label_as_edge"]] = key

    for seen_edge in deduplicator.seen_relationships:
        edge = changed_labels.get(seen_edge, seen_edge)
        if edge in schema:
            schema[edge]["present_in_knowledge_graph"] = True
            schema[edge]["is_relationship"] = True
            # TODO information about source and target nodes
        else:
            logger.info(
                f"Edge {edge} not present in extended schema. Skipping schema info.",
            )

    # write to output directory as YAML file
    path = os.path.join(self._output_directory, "schema_info.yaml")
    with open(path, "w", encoding="utf-8") as f:
        f.write(yaml.dump(schema))

    if as_node:
        # write as node
        schema_node = BioCypherNode(
            node_id="schema_info",
            node_label="schema_info",
            properties={"schema_info": json.dumps(schema)},
        )
        self.write_nodes([schema_node], force=True)

        # override import call with added schema info node
        self.write_import_call()

    return schema

create

BioCypher 'create' module. Handles the creation of BioCypher node and edge dataclasses.

BioCypherEdge dataclass

Handoff class to represent biomedical relationships in Neo4j.

Has source and target ids, label, property dict; ids and label (in the Neo4j sense of a label, ie, the entity descriptor after the colon, such as ":TARGETS") are non-optional and called source_id, target_id, and relationship_label to avoid confusion with properties called "label", which usually denotes the human-readable form. Relationship labels are written in UPPERCASE and as verbs, as per Neo4j consensus.

Args:

source_id (string): consensus "best" id for biological entity

target_id (string): consensus "best" id for biological entity

relationship_label (string): type of interaction, UPPERCASE

properties (dict): collection of all other properties of the
respective edge
Source code in biocypher/_create.py
@dataclass(frozen=True)
class BioCypherEdge:
    """
    Handoff class to represent biomedical relationships in Neo4j.

    Has source and target ids, label, property dict; ids and label (in
    the Neo4j sense of a label, ie, the entity descriptor after the
    colon, such as ":TARGETS") are non-optional and called source_id,
    target_id, and relationship_label to avoid confusion with properties
    called "label", which usually denotes the human-readable form.
    Relationship labels are written in UPPERCASE and as verbs, as per
    Neo4j consensus.

    Args:

        source_id (string): consensus "best" id for biological entity

        target_id (string): consensus "best" id for biological entity

        relationship_label (string): type of interaction, UPPERCASE

        relationship_id (string): optional id of the relationship itself;
        defaults to None

        properties (dict): collection of all other properties of the
        respective edge

    """

    source_id: str
    target_id: str
    relationship_label: str
    relationship_id: Union[str, None] = None
    properties: dict = field(default_factory=dict)

    def __post_init__(self):
        """
        Check for reserved keywords.

        Every reserved keyword found in ``properties`` is removed (in
        place) and reported at DEBUG level.
        """
        reserved_keywords = (
            (":TYPE", "Keyword ':TYPE' is reserved for Neo4j. Removing from properties."),
            ("id", "Keyword 'id' is reserved for Neo4j. Removing from properties."),
            ("_ID", "Keyword '_ID' is reserved for Postgres. Removing from properties."),
        )

        # Check each keyword independently: several reserved keys may be
        # present at once and all of them have to go.
        for keyword, message in reserved_keywords:
            if keyword in self.properties:
                logger.debug(message)
                del self.properties[keyword]

    def get_id(self) -> Union[str, None]:
        """
        Returns primary relationship identifier or None.

        Returns:
            str: relationship_id
        """

        return self.relationship_id

    def get_source_id(self) -> str:
        """
        Returns primary node identifier of relationship source.

        Returns:
            str: source_id
        """
        return self.source_id

    def get_target_id(self) -> str:
        """
        Returns primary node identifier of relationship target.

        Returns:
            str: target_id
        """
        return self.target_id

    def get_label(self) -> str:
        """
        Returns relationship label.

        Returns:
            str: relationship_label
        """
        return self.relationship_label

    def get_type(self) -> str:
        """
        Returns relationship label.

        Returns:
            str: relationship_label
        """
        return self.relationship_label

    def get_properties(self) -> dict:
        """
        Returns all other relationship properties apart from primary ids
        and label as key-value pairs.

        Returns:
            dict: properties
        """
        return self.properties

    def get_dict(self) -> dict:
        """
        Return dict of ids, label, and properties.

        Returns:
            dict: relationship_id, source_id, target_id and
                relationship_label as top-level key-value pairs,
                properties as second-level dict.
        """
        return {
            "relationship_id": self.relationship_id or None,
            "source_id": self.source_id,
            "target_id": self.target_id,
            "relationship_label": self.relationship_label,
            "properties": self.properties,
        }

__post_init__()

Check for reserved keywords.

Source code in biocypher/_create.py
def __post_init__(self):
    """
    Check for reserved keywords.

    Every reserved keyword found in ``properties`` is removed (in place)
    and reported at DEBUG level.
    """
    reserved_keywords = (
        (":TYPE", "Keyword ':TYPE' is reserved for Neo4j. Removing from properties."),
        ("id", "Keyword 'id' is reserved for Neo4j. Removing from properties."),
        ("_ID", "Keyword '_ID' is reserved for Postgres. Removing from properties."),
    )

    # Check each keyword independently: several reserved keys may be present
    # at once and all of them have to go.
    for keyword, message in reserved_keywords:
        if keyword in self.properties:
            logger.debug(message)
            del self.properties[keyword]

get_dict()

Return dict of ids, label, and properties.

Returns:

Name Type Description
dict dict

source_id, target_id and relationship_label as top-level key-value pairs, properties as second-level dict.

Source code in biocypher/_create.py
def get_dict(self) -> dict:
    """
    Bundle ids, label, and properties into one dictionary.

    Returns:
        dict: relationship_id (None when falsy), source_id, target_id and
            relationship_label as top-level key-value pairs, properties
            as second-level dict.
    """
    relationship_id = self.relationship_id
    if not relationship_id:
        relationship_id = None

    as_dict = {"relationship_id": relationship_id}
    as_dict["source_id"] = self.source_id
    as_dict["target_id"] = self.target_id
    as_dict["relationship_label"] = self.relationship_label
    as_dict["properties"] = self.properties
    return as_dict

get_id()

Returns primary relationship identifier or None.

Returns:

Name Type Description
str Union[str, None]

relationship_id

Source code in biocypher/_create.py
def get_id(self) -> Union[str, None]:
    """
    Look up the identifier of the relationship itself.

    Returns:
        str: relationship_id, which may be None
    """
    relationship_id = self.relationship_id
    return relationship_id

get_label()

Returns relationship label.

Returns:

Name Type Description
str str

relationship_label

Source code in biocypher/_create.py
def get_label(self) -> str:
    """
    Look up the label of the relationship.

    Returns:
        str: relationship_label
    """
    label = self.relationship_label
    return label

get_properties()

Returns all other relationship properties apart from primary ids and label as key-value pairs.

Returns:

Name Type Description
dict dict

properties

Source code in biocypher/_create.py
def get_properties(self) -> dict:
    """
    Look up the relationship properties, i.e. everything apart from the
    primary ids and the label, as key-value pairs.

    Returns:
        dict: properties (the stored dict itself, not a copy)
    """
    properties = self.properties
    return properties

get_source_id()

Returns primary node identifier of relationship source.

Returns:

Name Type Description
str str

source_id

Source code in biocypher/_create.py
def get_source_id(self) -> str:
    """
    Look up the primary node identifier of the relationship source.

    Returns:
        str: source_id
    """
    source = self.source_id
    return source

get_target_id()

Returns primary node identifier of relationship target.

Returns:

Name Type Description
str str

target_id

Source code in biocypher/_create.py
def get_target_id(self) -> str:
    """
    Look up the primary node identifier of the relationship target.

    Returns:
        str: target_id
    """
    target = self.target_id
    return target

get_type()

Returns relationship label.

Returns:

Name Type Description
str str

relationship_label

Source code in biocypher/_create.py
def get_type(self) -> str:
    """
    Look up the type of the relationship, which is its label.

    Returns:
        str: relationship_label
    """
    relationship_type = self.relationship_label
    return relationship_type

BioCypherNode dataclass

Handoff class to represent biomedical entities as Neo4j nodes.

Has id, label, property dict; id and label (in the Neo4j sense of a label, ie, the entity descriptor after the colon, such as ":Protein") are non-optional and called node_id and node_label to avoid confusion with "label" properties. Node labels are written in PascalCase and as nouns, as per Neo4j consensus.

Parameters:

Name Type Description Default
node_id string

consensus "best" id for biological entity

required
node_label string

primary type of entity, capitalised

required
**properties kwargs

collection of all other properties to be passed to neo4j for the respective node (dict)

dict()
Todo
  • check and correct small inconsistencies such as capitalisation of ID names ("uniprot" vs "UniProt")
  • check for correct ID patterns (eg "ENSG" + string of numbers, uniprot length)
  • ID conversion using pypath translation facilities for now
Source code in biocypher/_create.py
@dataclass(frozen=True)
class BioCypherNode:
    """
    Handoff class to represent biomedical entities as Neo4j nodes.

    Has id, label, property dict; id and label (in the Neo4j sense of a
    label, ie, the entity descriptor after the colon, such as
    ":Protein") are non-optional and called node_id and node_label to
    avoid confusion with "label" properties. Node labels are written in
    PascalCase and as nouns, as per Neo4j consensus.

    Args:
        node_id (string): consensus "best" id for biological entity
        node_label (string): primary type of entity, capitalised
        preferred_id (string): stored in the properties under the key
            "preferred_id"; defaults to "id"
        properties (dict): collection of all other properties to be
            passed to neo4j for the respective node

    Todo:
        - check and correct small inconsistencies such as capitalisation
            of ID names ("uniprot" vs "UniProt")
        - check for correct ID patterns (eg "ENSG" + string of numbers,
            uniprot length)
        - ID conversion using pypath translation facilities for now
    """

    node_id: str
    node_label: str
    preferred_id: str = "id"
    properties: dict = field(default_factory=dict)

    @staticmethod
    def _strip_linebreaks(text: str) -> str:
        """Replace line separators in ``text`` with single spaces."""
        return text.replace(os.linesep, " ").replace("\n", " ").replace("\r", " ")

    def __post_init__(self):
        """
        Add id field to properties.

        Check for reserved keywords.

        Replace unwanted characters in properties.
        """
        self.properties["id"] = self.node_id
        self.properties["preferred_id"] = self.preferred_id or None
        # TODO actually make None possible here; as is, "id" is the default in
        # the dataclass as well as in the configuration file

        if ":TYPE" in self.properties:
            logger.warning(
                "Keyword ':TYPE' is reserved for Neo4j. Removing from properties.",
            )
            del self.properties[":TYPE"]

        # Only values of existing keys are reassigned, so the dict keeps its
        # size while it is being iterated.
        for key, value in self.properties.items():
            if isinstance(value, str):
                self.properties[key] = self._strip_linebreaks(value)

            elif isinstance(value, list):
                # Lists may hold non-string items (numbers, None, ...);
                # those are kept as they are instead of raising.
                self.properties[key] = [
                    self._strip_linebreaks(item) if isinstance(item, str) else item for item in value
                ]

    def get_id(self) -> str:
        """
        Returns primary node identifier.

        Returns:
            str: node_id
        """
        return self.node_id

    def get_label(self) -> str:
        """
        Returns primary node label.

        Returns:
            str: node_label
        """
        return self.node_label

    def get_type(self) -> str:
        """
        Returns primary node label.

        Returns:
            str: node_label
        """
        return self.node_label

    def get_preferred_id(self) -> str:
        """
        Returns preferred id.

        Returns:
            str: preferred_id
        """
        return self.preferred_id

    def get_properties(self) -> dict:
        """
        Returns all other node properties apart from primary id and
        label as key-value pairs.

        Returns:
            dict: properties
        """
        return self.properties

    def get_dict(self) -> dict:
        """
        Return dict of id, labels, and properties.

        Returns:
            dict: node_id and node_label as top-level key-value pairs,
            properties as second-level dict.
        """
        return {
            "node_id": self.node_id,
            "node_label": self.node_label,
            "properties": self.properties,
        }

__post_init__()

Add id field to properties.

Check for reserved keywords.

Replace unwanted characters in properties.

Source code in biocypher/_create.py
def __post_init__(self):
    """
    Add id fields to the properties and sanitise property values.

    - Copies ``node_id`` and ``preferred_id`` into the property dict
      (an empty ``preferred_id`` is stored as ``None``).
    - Removes the reserved Neo4j keyword ``:TYPE`` from the properties
      and logs a warning when it was present.
    - Replaces line breaks in string properties, and in the string
      elements of list properties, with single spaces. Non-string list
      elements (numbers, ``None``, ...) are kept unchanged.
    """
    self.properties["id"] = self.node_id
    self.properties["preferred_id"] = self.preferred_id or None
    # TODO actually make None possible here; as is, "id" is the default in
    # the dataclass as well as in the configuration file

    if ":TYPE" in self.properties:
        logger.warning(
            "Keyword ':TYPE' is reserved for Neo4j. Removing from properties.",
        )
        del self.properties[":TYPE"]

    def _strip_linebreaks(text):
        # os.linesep goes first so that a platform-specific multi-character
        # separator collapses into one space instead of several.
        return text.replace(os.linesep, " ").replace("\n", " ").replace("\r", " ")

    # Only values are reassigned for existing keys, so iterating over the
    # dict while updating it is safe.
    for k, v in self.properties.items():
        if isinstance(v, str):
            self.properties[k] = _strip_linebreaks(v)

        elif isinstance(v, list):
            # Lists may hold non-string values; calling ``replace`` on those
            # would raise AttributeError, so they are passed through.
            self.properties[k] = [_strip_linebreaks(val) if isinstance(val, str) else val for val in v]

get_dict()

Return dict of id, labels, and properties.

Returns:

Name Type Description
dict dict

node_id and node_label as top-level key-value pairs,

dict

properties as second-level dict.

Source code in biocypher/_create.py
def get_dict(self) -> dict:
    """
    Bundle id, label, and properties of this node into one dict.

    Returns:
        dict: ``node_id`` and ``node_label`` as top-level entries and the
        property dictionary nested under ``properties``.
    """
    summary = dict(
        node_id=self.node_id,
        node_label=self.node_label,
        properties=self.properties,
    )
    return summary

get_id()

Returns primary node identifier.

Returns:

Name Type Description
str str

node_id

Source code in biocypher/_create.py
def get_id(self) -> str:
    """
    Return the primary identifier of this node.

    Returns:
        str: the value stored in ``node_id``
    """
    identifier = self.node_id
    return identifier

get_label()

Returns primary node label.

Returns:

Name Type Description
str str

node_label

Source code in biocypher/_create.py
def get_label(self) -> str:
    """
    Return the primary label of this node.

    Returns:
        str: the value stored in ``node_label``
    """
    label = self.node_label
    return label

get_preferred_id()

Returns preferred id.

Returns:

Name Type Description
str str

preferred_id

Source code in biocypher/_create.py
def get_preferred_id(self) -> str:
    """
    Return the preferred identifier of this node.

    Returns:
        str: the value stored in ``preferred_id``
    """
    preferred = self.preferred_id
    return preferred

get_properties()

Returns all other node properties apart from primary id and label as key-value pairs.

Returns:

Name Type Description
dict dict

properties

Source code in biocypher/_create.py
def get_properties(self) -> dict:
    """
    Return the property dictionary of this node.

    The dictionary held by the node is returned as-is (not a copy).

    Returns:
        dict: properties as key-value pairs
    """
    props = self.properties
    return props

get_type()

Returns primary node label.

Returns:

Name Type Description
str str

node_label

Source code in biocypher/_create.py
def get_type(self) -> str:
    """
    Return the node type, which is the same value as the primary label.

    Returns:
        str: the value stored in ``node_label``
    """
    node_type = self.node_label
    return node_type

BioCypherRelAsNode dataclass

Class to represent relationships as nodes (with in- and outgoing edges) as a triplet of a BioCypherNode and two BioCypherEdges. Main usage in type checking (instances where the receiving function needs to check whether it receives a relationship as a single edge or as a triplet).

Args:

node (BioCypherNode): node representing the relationship

source_edge (BioCypherEdge): edge representing the source of the
    relationship

target_edge (BioCypherEdge): edge representing the target of the
    relationship
Source code in biocypher/_create.py
@dataclass(frozen=True)
class BioCypherRelAsNode:
    """
    Relationship modelled as a node: a triplet of one BioCypherNode and
    two BioCypherEdges (one towards the source, one towards the target).
    Mainly used for type checking, where a receiving function has to tell
    a plain edge apart from such a triplet.

    Args:

        node (BioCypherNode): node representing the relationship

        source_edge (BioCypherEdge): edge representing the source of the
            relationship

        target_edge (BioCypherEdge): edge representing the target of the
            relationship

    Raises:

        TypeError: if any of the three fields has the wrong type.

    """

    node: BioCypherNode
    source_edge: BioCypherEdge
    target_edge: BioCypherEdge

    def __post_init__(self):
        # Validated in field order; the first mismatch raises.
        expectations = (
            ("node", BioCypherNode, "BioCypherNode"),
            ("source_edge", BioCypherEdge, "BioCypherEdge"),
            ("target_edge", BioCypherEdge, "BioCypherEdge"),
        )
        for field_name, expected_type, type_name in expectations:
            value = getattr(self, field_name)
            if isinstance(value, expected_type):
                continue
            raise TypeError(
                f"BioCypherRelAsNode.{field_name} must be a {type_name}, not {type(value)}.",
            )

    def get_node(self) -> BioCypherNode:
        """Return the node representing the relationship."""
        return self.node

    def get_source_edge(self) -> BioCypherEdge:
        """Return the edge representing the source of the relationship."""
        return self.source_edge

    def get_target_edge(self) -> BioCypherEdge:
        """Return the edge representing the target of the relationship."""
        return self.target_edge

deduplicate

Deduplicator

Singleton class responsible for deduplicating BioCypher inputs. Maintains sets/dictionaries of node and edge types and their unique identifiers.

Node identifiers should be globally unique (represented as a set), while edge identifiers are only unique per edge type (represented as a dict of sets, keyed by edge type).

Stores collection of duplicate node and edge identifiers and types for troubleshooting and to avoid overloading the log.

Source code in biocypher/_deduplicate.py
class Deduplicator:
    """
    Keeps track of the nodes and edges that were already passed through
    BioCypher so that duplicates can be detected.

    Node identifiers are expected to be globally unique and are stored in
    one set. Edge identifiers only need to be unique per edge type and are
    stored in a dict of sets, keyed by edge type.

    Identifiers and types of detected duplicates are collected for
    troubleshooting. A warning is logged only the first time a given type
    produces a duplicate, to avoid overloading the log.
    """

    def __init__(self):
        # node bookkeeping
        self.seen_entity_ids = set()
        self.duplicate_entity_ids = set()
        self.entity_types = set()
        self.duplicate_entity_types = set()

        # edge bookkeeping (ids grouped by edge type)
        self.seen_relationships = {}
        self.duplicate_relationship_ids = set()
        self.duplicate_relationship_types = set()

    def node_seen(self, entity: BioCypherNode) -> bool:
        """
        Register a node and report whether its id was registered before.

        Args:
            entity: BioCypherNode to be added.

        Returns:
            True if the node has been seen before, False otherwise.
        """
        label = entity.get_label()
        node_id = entity.get_id()

        self.entity_types.add(label)

        if node_id not in self.seen_entity_ids:
            self.seen_entity_ids.add(node_id)
            return False

        self.duplicate_entity_ids.add(node_id)
        if label not in self.duplicate_entity_types:
            logger.warning(f"Duplicate node type {label} found. ")
            self.duplicate_entity_types.add(label)
        return True

    def edge_seen(self, relationship: BioCypherEdge) -> bool:
        """
        Register an edge and report whether it was registered before.

        Edges without an id are identified by their source and target ids
        joined with an underscore.

        Args:
            relationship: BioCypherEdge to be added.

        Returns:
            True if the edge has been seen before, False otherwise.
        """
        edge_type = relationship.get_type()
        known_ids = self.seen_relationships.setdefault(edge_type, set())

        # concatenate source and target if no id is present
        edge_id = relationship.get_id() or f"{relationship.get_source_id()}_{relationship.get_target_id()}"

        if edge_id not in known_ids:
            known_ids.add(edge_id)
            return False

        self.duplicate_relationship_ids.add(edge_id)
        if edge_type not in self.duplicate_relationship_types:
            logger.warning(f"Duplicate edge type {edge_type} found. ")
            self.duplicate_relationship_types.add(edge_type)
        return True

    def rel_as_node_seen(self, rel_as_node: BioCypherRelAsNode) -> bool:
        """
        Register a rel_as_node (one node and two edges) and report whether
        it was registered before. Only the node is used to identify the
        rel_as_node; it is tracked together with the edges, keyed by the
        node's label/type.

        Args:
            rel_as_node: BioCypherRelAsNode to be added.

        Returns:
            True if the rel_as_node has been seen before, False otherwise.
        """
        node = rel_as_node.get_node()

        self.seen_relationships.setdefault(node.get_label(), set())

        # rel as node always has an id
        rel_id = node.get_id()
        rel_type = node.get_type()
        known_ids = self.seen_relationships[rel_type]

        if rel_id not in known_ids:
            known_ids.add(rel_id)
            return False

        self.duplicate_relationship_ids.add(rel_id)
        if rel_type not in self.duplicate_relationship_types:
            logger.warning(f"Duplicate edge type {rel_type} found. ")
            self.duplicate_relationship_types.add(rel_type)
        return True

    def get_duplicate_nodes(self):
        """
        Return the duplicate nodes recorded so far.

        Returns:
            tuple: ``(duplicate node types, duplicate node ids)``, both
            sets, or None if no duplicate node type was recorded.
        """
        if not self.duplicate_entity_types:
            return None
        return (self.duplicate_entity_types, self.duplicate_entity_ids)

    def get_duplicate_edges(self):
        """
        Return the duplicate edges recorded so far.

        Returns:
            tuple: ``(duplicate edge types, duplicate edge ids)``, both
            sets, or None if no duplicate edge type was recorded.
        """
        if not self.duplicate_relationship_types:
            return None
        return (
            self.duplicate_relationship_types,
            self.duplicate_relationship_ids,
        )

edge_seen(relationship)

Adds an edge to the instance and checks if it has been seen before.

Parameters:

Name Type Description Default
edge

BioCypherEdge to be added.

required

Returns:

Type Description
bool

True if the edge has been seen before, False otherwise.

Source code in biocypher/_deduplicate.py
def edge_seen(self, relationship: BioCypherEdge) -> bool:
    """
    Register an edge and report whether it was registered before.

    Edges without an id are identified by their source and target ids
    joined with an underscore.

    Args:
        relationship: BioCypherEdge to be added.

    Returns:
        True if the edge has been seen before, False otherwise.
    """
    edge_type = relationship.get_type()
    known_ids = self.seen_relationships.setdefault(edge_type, set())

    # concatenate source and target if no id is present
    edge_id = relationship.get_id() or f"{relationship.get_source_id()}_{relationship.get_target_id()}"

    if edge_id not in known_ids:
        known_ids.add(edge_id)
        return False

    self.duplicate_relationship_ids.add(edge_id)
    if edge_type not in self.duplicate_relationship_types:
        logger.warning(f"Duplicate edge type {edge_type} found. ")
        self.duplicate_relationship_types.add(edge_type)
    return True

get_duplicate_edges()

Return the duplicate edge types and the duplicate edge ids as a tuple of two sets, or None if no duplicate edges were recorded.

Returns:

Name Type Description
list

list of duplicate edges

Source code in biocypher/_deduplicate.py
def get_duplicate_edges(self):
    """
    Return the duplicate edges recorded so far.

    Returns:
        tuple: ``(duplicate edge types, duplicate edge ids)``, both sets,
        or None if no duplicate edge type was recorded.
    """
    if not self.duplicate_relationship_types:
        return None
    return (
        self.duplicate_relationship_types,
        self.duplicate_relationship_ids,
    )

get_duplicate_nodes()

Return the duplicate node types and the duplicate node ids as a tuple of two sets, or None if no duplicate nodes were recorded.

Returns:

Name Type Description
list

list of duplicate nodes

Source code in biocypher/_deduplicate.py
def get_duplicate_nodes(self):
    """
    Return the duplicate nodes recorded so far.

    Returns:
        tuple: ``(duplicate node types, duplicate node ids)``, both sets,
        or None if no duplicate node type was recorded.
    """
    if not self.duplicate_entity_types:
        return None
    return (self.duplicate_entity_types, self.duplicate_entity_ids)

node_seen(entity)

Adds a node to the instance and checks if it has been seen before.

Parameters:

Name Type Description Default
node

BioCypherNode to be added.

required

Returns:

Type Description
bool

True if the node has been seen before, False otherwise.

Source code in biocypher/_deduplicate.py
def node_seen(self, entity: BioCypherNode) -> bool:
    """
    Register a node and report whether its id was registered before.

    Args:
        entity: BioCypherNode to be added.

    Returns:
        True if the node has been seen before, False otherwise.
    """
    label = entity.get_label()
    node_id = entity.get_id()

    self.entity_types.add(label)

    if node_id not in self.seen_entity_ids:
        self.seen_entity_ids.add(node_id)
        return False

    self.duplicate_entity_ids.add(node_id)
    if label not in self.duplicate_entity_types:
        logger.warning(f"Duplicate node type {label} found. ")
        self.duplicate_entity_types.add(label)
    return True

rel_as_node_seen(rel_as_node)

Adds a rel_as_node to the instance (one entity and two relationships) and checks if it has been seen before. Only the node is relevant for identifying the rel_as_node as a duplicate.

Parameters:

Name Type Description Default
rel_as_node BioCypherRelAsNode

BioCypherRelAsNode to be added.

required

Returns:

Type Description
bool

True if the rel_as_node has been seen before, False otherwise.

Source code in biocypher/_deduplicate.py
def rel_as_node_seen(self, rel_as_node: BioCypherRelAsNode) -> bool:
    """
    Register a rel_as_node (one node and two edges) and report whether it
    was registered before. Only the node is used to identify the
    rel_as_node; it is tracked together with the edges, keyed by the
    node's label/type.

    Args:
        rel_as_node: BioCypherRelAsNode to be added.

    Returns:
        True if the rel_as_node has been seen before, False otherwise.
    """
    node = rel_as_node.get_node()

    self.seen_relationships.setdefault(node.get_label(), set())

    # rel as node always has an id
    rel_id = node.get_id()
    rel_type = node.get_type()
    known_ids = self.seen_relationships[rel_type]

    if rel_id not in known_ids:
        known_ids.add(rel_id)
        return False

    self.duplicate_relationship_ids.add(rel_id)
    if rel_type not in self.duplicate_relationship_types:
        logger.warning(f"Duplicate edge type {rel_type} found. ")
        self.duplicate_relationship_types.add(rel_type)
    return True

get

BioCypher get module.

Used to download and cache data from external sources.

APIRequest

Bases: Resource

Source code in biocypher/_get.py
class APIRequest(Resource):
    def __init__(self, name: str, url_s: str | list[str], lifetime: int = 0):
        """Initialize an APIRequest object.

        Represents basic information for an API Request. All arguments are
        passed on unchanged to the ``Resource`` initializer.

        Args:
        ----
            name(str): The name of the API Request.

            url_s(str|list): The URL of the API endpoint, or a list of such
                URLs.

            lifetime(int): The lifetime of the API Request in days. If 0, the
                API Request is cached indefinitely.

        """
        super().__init__(name, url_s, lifetime)

__init__(name, url_s, lifetime=0)

Initialize an APIRequest object.

Represents basic information for an API Request.


name(str): The name of the API Request.

url_s(str|list): The URL of the API endpoint.

lifetime(int): The lifetime of the API Request in days. If 0, the
    API Request is cached indefinitely.
Source code in biocypher/_get.py
def __init__(self, name: str, url_s: str | list[str], lifetime: int = 0):
    """Initialize an APIRequest object.

    Represents basic information for an API Request. All arguments are
    passed on unchanged to the ``Resource`` initializer.

    Args:
    ----
        name(str): The name of the API Request.

        url_s(str|list): The URL of the API endpoint, or a list of such
            URLs.

        lifetime(int): The lifetime of the API Request in days. If 0, the
            API Request is cached indefinitely.

    """
    super().__init__(name, url_s, lifetime)

Downloader

Source code in biocypher/_get.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
class Downloader:
    def __init__(self, cache_dir: Optional[str] = None) -> None:
        """Initialize the Downloader.

        The Downloader is a class that manages resources that can be downloaded
        and cached locally. It manages the lifetime of downloaded resources by
        keeping a JSON record of the download date of each resource.

        Args:
        ----
            cache_dir (str): The directory where the resources are cached. If
                not given, a temporary directory is created.

        """
        self.cache_dir = cache_dir or TemporaryDirectory().name
        self.cache_file = os.path.join(self.cache_dir, "cache.json")
        self.cache_dict = self._load_cache_dict()

    def download(self, *resources: Resource):
        """Download one or multiple resources.

        Each resource is served from the cache when it was downloaded before
        and its cache entry is not expired; otherwise it is downloaded.

        Args:
        ----
            resources (Resource): The resource(s) to download or load from
                cache.

        Returns:
        -------
            list[str]: The path or paths to the resource(s) that were downloaded
                or loaded from cache.

        """
        paths = [self._download_or_cache(resource) for resource in resources]

        # flatten list if it is nested
        if is_nested(paths):
            paths = [path for group in paths for path in group]

        return paths

    def _download_or_cache(self, resource: Resource, cache: bool = True):
        """Download a resource if it is not cached or exceeded its lifetime.

        Args:
        ----
            resource (Resource): The resource to download.

        Returns:
        -------
            list[str]: The path or paths to the downloaded resource(s).

        """
        expired = self._is_cache_expired(resource)

        if expired or not cache:
            self._delete_expired_cache(resource)
            if isinstance(resource, FileDownload):
                logger.info(f"Asking for download of resource {resource.name}.")
                paths = self._download_files(cache, resource)
            elif isinstance(resource, APIRequest):
                logger.info(f"Asking for download of api request {resource.name}.")
                paths = self._download_api_request(resource)
            else:
                raise TypeError(f"Unknown resource type: {type(resource)}")
        else:
            paths = self.get_cached_version(resource)
        self._update_cache_record(resource)
        return paths

    def _is_cache_expired(self, resource: Resource) -> bool:
        """Check if resource or API request cache is expired.

        Args:
        ----
            resource (Resource): The resource to download.

        Returns:
        -------
            bool: cache is expired or not.

        """
        cache_record = self._get_cache_record(resource)
        if cache_record:
            download_time = datetime.strptime(cache_record.get("date_downloaded"), "%Y-%m-%d %H:%M:%S.%f")
            lifetime = timedelta(days=resource.lifetime)
            expired = download_time + lifetime < datetime.now()
        else:
            expired = True
        return expired

    def _delete_expired_cache(self, resource: Resource):
        cache_resource_path = self.cache_dir + "/" + resource.name
        if os.path.exists(cache_resource_path) and os.path.isdir(cache_resource_path):
            shutil.rmtree(cache_resource_path)

    def _download_files(self, cache, file_download: FileDownload) -> list[str]:
        """Download a resource given it is a file or a directory.

        Upon downloading, return the path(s).

        For a directory resource, the directory listing is fetched, the
        resource is rewritten in place into a list of file URLs
        (``url_s`` becomes a list, ``is_dir`` becomes False) and handed back
        to ``_download_or_cache``.

        Args:
        ----
            cache (bool): Whether to cache the resource or not.

            file_download (FileDownload): The resource to download.

        Returns:
        -------
            list[str]: The path or paths to the downloaded resource(s). The
                list is always flat, also when a retrieved archive yields
                several extracted files.

        """
        if file_download.is_dir:
            files = self._get_files(file_download)
            file_download.url_s = [file_download.url_s + "/" + file for file in files]
            file_download.is_dir = False
            return self._download_or_cache(file_download, cache)

        urls = file_download.url_s if isinstance(file_download.url_s, list) else [file_download.url_s]
        target_dir = os.path.join(self.cache_dir, file_download.name)

        paths = []
        for url in urls:
            retrieved = self._retrieve(
                url=url,
                fname=self._trim_filename(url),
                path=target_dir,
            )
            # sometimes a compressed file contains multiple files, in which
            # case a list of paths comes back; keep the result flat for both
            # single URLs and URL lists
            # TODO ask for a list of files in the archive to be used from the
            # adapter
            if isinstance(retrieved, list):
                paths.extend(retrieved)
            else:
                paths.append(retrieved)

        return paths

    def _download_api_request(self, api_request: APIRequest) -> list[str]:
        """Fetch the JSON response of each API URL and store it in the cache.

        Every response is written to
        ``<cache_dir>/<api_request.name>/<trimmed filename>.json``.

        Args:
        ----
            api_request(APIRequest): The API request result that is being
                cached.

        Returns:
        -------
            list[str]: The path to the cached API request.

        """
        endpoints = api_request.url_s
        if not isinstance(endpoints, list):
            endpoints = [endpoints]

        cached_files = []
        for endpoint in endpoints:
            fname = self._trim_filename(endpoint)
            logger.info(f"Asking for caching API of {api_request.name} {fname}.")
            response = requests.get(url=endpoint)

            if response.status_code != 200:
                response.raise_for_status()
            payload = response.json()
            api_path = os.path.join(self.cache_dir, api_request.name, f"{fname}.json")

            os.makedirs(os.path.dirname(api_path), exist_ok=True)
            with open(api_path, "w") as handle:
                json.dump(payload, handle)
                logger.info(f"Caching API request to {api_path}.")
            cached_files.append(api_path)
        return cached_files

    def get_cached_version(self, resource: Resource) -> list[str]:
        """Get the cached version of a resource.

        Args:
        ----
            resource(Resource): The resource to get the cached version of.

        Returns:
        -------
            list[str]: The paths to the cached resource(s).

        """
        cached_location = os.path.join(self.cache_dir, resource.name)
        logger.info(f"Use cached version from {cached_location}.")
        paths = []
        for file in os.listdir(cached_location):
            paths.append(os.path.join(cached_location, file))
        return paths

    def _retrieve(
        self,
        url: str,
        fname: str,
        path: str,
        known_hash: str = None,
    ) -> str:
        """Retrieve a file from a URL using Pooch.

        The file extension selects the Pooch processor: ``.zip`` is
        unzipped, ``.tar.gz`` is untarred, any other ``.gz`` is decompressed,
        and everything else is retrieved without a processor.

        Args:
        ----
            url (str): The URL to retrieve the file from.

            fname (str): The name of the file.

            path (str): The path to the file.

            known_hash (str): The known hash of the file.

        Returns:
        -------
            str: The path to the file.

        """
        # Checked in order, so ".tar.gz" wins over the plain ".gz" suffix.
        processors_by_suffix = (
            (".zip", pooch.Unzip),
            (".tar.gz", pooch.Untar),
            (".gz", pooch.Decompress),
        )

        retrieve_kwargs = {
            "url": url,
            "known_hash": known_hash,
            "fname": fname,
            "path": path,
        }
        for suffix, processor_cls in processors_by_suffix:
            if fname.endswith(suffix):
                retrieve_kwargs["processor"] = processor_cls()
                break
        retrieve_kwargs["progressbar"] = True

        return pooch.retrieve(**retrieve_kwargs)

    def _get_files(self, file_download: FileDownload) -> list[str]:
        """Get the files contained in a directory file.

        Args:
        ----
            file_download (FileDownload): The directory file.

        Returns:
        -------
            list[str]: The files contained in the directory.

        """
        if file_download.url_s.startswith("ftp://"):
            # remove protocol
            url = file_download.url_s[6:]
            # get base url
            url = url[: url.find("/")]
            # get directory (remove initial slash as well)
            dir = file_download.url_s[7 + len(url) :]
            # get files
            ftp = ftplib.FTP(url)
            ftp.login()
            ftp.cwd(dir)
            files = ftp.nlst()
            ftp.quit()
        else:
            msg = "Only FTP directories are supported at the moment."
            logger.error(msg)
            raise NotImplementedError(msg)

        return files

    def _load_cache_dict(self) -> dict:
        """Load the cache dictionary from the cache file.

        Create an empty cache file if it does not exist.

        Args:
        ----
            None.

        Returns:
        -------
            dict: The cache dictionary.

        """
        if not os.path.exists(self.cache_dir):
            logger.info(f"Creating cache directory {self.cache_dir}.")
            os.makedirs(self.cache_dir)

        if not os.path.exists(self.cache_file):
            logger.info(f"Creating cache file {self.cache_file}.")
            with open(self.cache_file, "w") as f:
                json.dump({}, f)

        with open(self.cache_file) as f:
            logger.info(f"Loading cache file {self.cache_file}.")
            return json.load(f)

    def _get_cache_record(self, resource: Resource) -> dict:
        """Get the cache record of a resource.

        Args:
        ----
            resource (Resource): The resource to get the cache record of.

        Returns:
        -------
            dict: The cache record of the resource.

        """
        return self.cache_dict.get(resource.name, {})

    def _update_cache_record(self, resource: Resource) -> None:
        """Store a fresh cache record for a resource and persist the cache.

        The record holds the resource URL(s), the current time as download
        date and the resource lifetime. The complete cache dictionary is then
        written to the cache file.

        Args:
        ----
            resource (Resource): The resource to update the cache record of.

        """
        self.cache_dict[resource.name] = {
            "url": to_list(resource.url_s),
            "date_downloaded": str(datetime.now()),
            "lifetime": resource.lifetime,
        }
        with open(self.cache_file, "w") as handle:
            json.dump(self.cache_dict, handle, default=str)

    def _trim_filename(self, url: str, max_length: int = 150) -> str:
        """Create a trimmed filename from a URL.

        If the URL exceeds max_length, create a hash of the filename.

        Args:
        ----
            url (str): The URL to generate a filename from
            max_length (int): Maximum filename length (default: 150)

        Returns:
        -------
            str: A valid filename derived from the URL, trimmed if necessary

        """
        # Extract the filename from the URL
        fname = url[url.rfind("/") + 1 :]

        # Remove query parameters if present
        if "?" in fname:
            fname = fname.split("?")[0]

        if len(fname) > max_length:
            import hashlib

            fname_trimmed = hashlib.md5(fname.encode()).hexdigest()
        else:
            fname_trimmed = fname

        return fname_trimmed

__init__(cache_dir=None)

Initialize the Downloader.

The Downloader is a class that manages resources that can be downloaded and cached locally. It manages the lifetime of downloaded resources by keeping a JSON record of the download date of each resource.


cache_dir (str): The directory where the resources are cached. If
    not given, a temporary directory is created.
Source code in biocypher/_get.py
def __init__(self, cache_dir: Optional[str] = None) -> None:
    """Initialize the Downloader.

    The Downloader is a class that manages resources that can be downloaded
    and cached locally. It manages the lifetime of downloaded resources by
    keeping a JSON record of the download date of each resource.

    Args:
    ----
        cache_dir (str): The directory where the resources are cached. If
            not given, a temporary directory is created.

    """
    self.cache_dir = cache_dir or TemporaryDirectory().name
    self.cache_file = os.path.join(self.cache_dir, "cache.json")
    self.cache_dict = self._load_cache_dict()

download(*resources)

Download one or multiple resources.

Load from cache if the resource is already downloaded and the cache is not expired.


resources (Resource): The resource(s) to download or load from
    cache.

list[str]: The path or paths to the resource(s) that were downloaded
    or loaded from cache.
Source code in biocypher/_get.py
def download(self, *resources: Resource):
    """Fetch every given resource, reusing the cache where possible.

    Each resource is handed to ``_download_or_cache``; the individual
    results are collected in call order.

    Args:
    ----
        resources (Resource): The resource(s) to download or load from
            cache.

    Returns:
    -------
        list[str]: The path or paths to the resource(s) that were downloaded
            or loaded from cache.

    """
    collected = [self._download_or_cache(item) for item in resources]

    # Nothing to unpack unless the collected results are nested.
    if not is_nested(collected):
        return collected

    # Collapse one level of nesting into a flat list.
    return [single for group in collected for single in group]

get_cached_version(resource)

Get the cached version of a resource.


resource(Resource): The resource to get the cached version of.

list[str]: The paths to the cached resource(s).
Source code in biocypher/_get.py
def get_cached_version(self, resource: Resource) -> list[str]:
    """List the files cached for a resource.

    The cache location is the sub-directory of ``self.cache_dir`` named
    after the resource; every entry found there is returned.

    Args:
    ----
        resource(Resource): The resource to get the cached version of.

    Returns:
    -------
        list[str]: The paths to the cached resource(s).

    """
    cached_location = os.path.join(self.cache_dir, resource.name)
    logger.info(f"Use cached version from {cached_location}.")
    return [os.path.join(cached_location, entry) for entry in os.listdir(cached_location)]

FileDownload

Bases: Resource

Source code in biocypher/_get.py
class FileDownload(Resource):
    """A ``Resource`` fetched as a file, or as a directory of files."""

    def __init__(
        self,
        name: str,
        url_s: str | list[str],
        lifetime: int = 0,
        is_dir: bool = False,
    ):
        """Create a FileDownload.

        Stores the directory flag and delegates name, URL(s) and lifetime to
        the ``Resource`` initializer.

        Args:
        ----
            name(str): The name of the File Download.

            url_s(str|list[str]): The URL(s) of the File Download.

            lifetime(int): The lifetime of the File Download in days. If 0, the
                File Download is cached indefinitely.

            is_dir (bool): Whether the URL points to a directory or not.

        """
        self.is_dir = is_dir
        super().__init__(name, url_s, lifetime)

__init__(name, url_s, lifetime=0, is_dir=False)

Initialize a FileDownload object.

Represents basic information for a File Download.


name(str): The name of the File Download.

url_s(str|list[str]): The URL(s) of the File Download.

lifetime(int): The lifetime of the File Download in days. If 0, the
    File Download is cached indefinitely.

is_dir (bool): Whether the URL points to a directory or not.
Source code in biocypher/_get.py
def __init__(
    self,
    name: str,
    url_s: str | list[str],
    lifetime: int = 0,
    is_dir: bool = False,
):
    """Create a FileDownload.

    Stores the directory flag and delegates name, URL(s) and lifetime to the
    parent initializer.

    Args:
    ----
        name(str): The name of the File Download.

        url_s(str|list[str]): The URL(s) of the File Download.

        lifetime(int): The lifetime of the File Download in days. If 0, the
            File Download is cached indefinitely.

        is_dir (bool): Whether the URL points to a directory or not.

    """
    self.is_dir = is_dir
    super().__init__(name, url_s, lifetime)

Resource

Bases: ABC

Source code in biocypher/_get.py
class Resource(ABC):
    def __init__(
        self,
        name: str,
        url_s: str | list[str],
        lifetime: int = 0,
    ):
        """Initialize a Resource.

        A Resource is a file, a list of files, an API request, or a list of API
        requests, any of which can be downloaded from the given URL(s) and
        cached locally. This class implements checks of the minimum requirements
        for a resource, to be implemented by a biocypher adapter.

        Args:
        ----
            name (str): The name of the resource.

            url_s (str | list[str]): The URL or URLs of the resource.

            lifetime (int): The lifetime of the resource in days. If 0, the
                resource is considered to be permanent.

        """
        self.name = name
        self.url_s = url_s
        self.lifetime = lifetime

__init__(name, url_s, lifetime=0)

Initialize a Resource.

A Resource is a file, a list of files, an API request, or a list of API requests, any of which can be downloaded from the given URL(s) and cached locally. This class implements checks of the minimum requirements for a resource, to be implemented by a biocypher adapter.


name (str): The name of the resource.

url_s (str | list[str]): The URL or URLs of the resource.

lifetime (int): The lifetime of the resource in days. If 0, the
    resource is considered to be permanent.
Source code in biocypher/_get.py
def __init__(
    self,
    name: str,
    url_s: str | list[str],
    lifetime: int = 0,
):
    """Initialize a Resource.

    A Resource is a file, a list of files, an API request, or a list of API
    requests, any of which can be downloaded from the given URL(s) and
    cached locally. This class implements checks of the minimum requirements
    for a resource, to be implemented by a biocypher adapter.

    Args:
    ----
        name (str): The name of the resource.

        url_s (str | list[str]): The URL or URLs of the resource.

        lifetime (int): The lifetime of the resource in days. If 0, the
            resource is considered to be permanent.

    """
    self.name = name
    self.url_s = url_s
    self.lifetime = lifetime

graph

Unified Graph representation for BioCypher.

This module provides a comprehensive Graph class that can represent various graph types including simple graphs, directed graphs, weighted graphs, multigraphs, and hypergraphs. The design focuses on simplicity and extensibility for knowledge representation.

TODOs: (1) examine the overlap with legacy BioCypher modules and consolidate where possible; (2) evaluate the generalised graph class as a consensus internal representation and technical intermediate for other output adapters; (3) validate new entities against the schema and roll back inconsistent operations; (4) support retrieval of subgraphs from existing databases.

Edge dataclass

Represents an edge in the graph.

Source code in biocypher/_graph.py
@dataclass
class Edge:
    """A typed connection from a source node to a target node."""

    id: str
    type: str
    source: str
    target: str
    properties: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        """Reject any of the four identifying fields that is not a string.

        Raises:
            ValueError: For the first of ``id``, ``type``, ``source`` and
                ``target`` (checked in that order) that is not a ``str``.
        """
        required_strings = (
            ("id", "Edge ID must be a string"),
            ("type", "Edge type must be a string"),
            ("source", "Edge source must be a string"),
            ("target", "Edge target must be a string"),
        )
        for attr, message in required_strings:
            if not isinstance(getattr(self, attr), str):
                raise ValueError(message)

    def to_dict(self) -> dict[str, Any]:
        """Return the edge as a plain dictionary (properties are not copied)."""
        exported = ("id", "type", "source", "target", "properties")
        return {key: getattr(self, key) for key in exported}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Edge":
        """Build an edge from a dictionary; ``properties`` defaults to ``{}``."""
        kwargs = {key: data[key] for key in ("id", "type", "source", "target")}
        kwargs["properties"] = data.get("properties", {})
        return cls(**kwargs)

from_dict(data) classmethod

Create edge from dictionary representation.

Source code in biocypher/_graph.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Edge":
    """Build an edge from a dictionary; ``properties`` defaults to ``{}``."""
    kwargs = {key: data[key] for key in ("id", "type", "source", "target")}
    kwargs["properties"] = data.get("properties", {})
    return cls(**kwargs)

to_dict()

Convert edge to dictionary representation.

Source code in biocypher/_graph.py
def to_dict(self) -> dict[str, Any]:
    """Return the edge as a plain dictionary (properties are not copied)."""
    exported = ("id", "type", "source", "target", "properties")
    return {key: getattr(self, key) for key in exported}

EdgeType

Bases: Enum

Types of edges in the graph.

Source code in biocypher/_graph.py
class EdgeType(Enum):
    """Types of edges in the graph.

    Each member's value is the lowercase string form of its name.
    """

    # NOTE(review): the precise semantics of each kind are not defined in
    # this listing; the descriptions below are inferred from the names and
    # should be confirmed against the callers.
    SIMPLE = "simple"  # presumably a plain, undirected edge
    DIRECTED = "directed"  # presumably an edge with a source -> target direction
    WEIGHTED = "weighted"  # presumably an edge carrying a weight
    HYPEREDGE = "hyperedge"  # presumably an edge joining more than two nodes

Graph

Unified graph representation supporting various graph types.

This class provides a comprehensive graph representation that can handle simple undirected graphs, directed graphs, weighted graphs, multigraphs (multiple edges between the same nodes), hypergraphs (edges connecting multiple nodes), and property graphs (nodes and edges with properties).

The design prioritizes simplicity and extensibility for knowledge representation.

Source code in biocypher/_graph.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
class Graph:
    """Unified graph representation supporting various graph types.

    This class provides a comprehensive graph representation that can handle:
    - Simple undirected graphs
    - Directed graphs
    - Weighted graphs
    - Multigraphs (multiple edges between same nodes)
    - Hypergraphs (edges connecting multiple nodes)
    - Property graphs (nodes and edges with properties)

    The design prioritizes simplicity and extensibility for knowledge representation.

    Nodes, edges and hyperedges are stored in dictionaries keyed by ID. Type
    indexes (type -> set of IDs) and adjacency indexes (node ID -> set of
    outgoing / incoming edge IDs) are kept in sync by the mutating methods.
    The ``directed`` flag is stored and serialized; adjacency is always
    recorded as source -> target.
    """

    def __init__(self, name: str = "graph", directed: bool = True):
        """Initialize a new graph.

        Args:
            name: Name of the graph
            directed: Whether the graph is directed (default: True)
        """
        self.name = name
        self.directed = directed

        # Core data structures
        self._nodes: dict[str, Node] = {}
        self._edges: dict[str, Edge] = {}
        self._hyperedges: dict[str, HyperEdge] = {}

        # Indexes for efficient querying
        self._node_types: dict[str, set[str]] = defaultdict(set)
        self._edge_types: dict[str, set[str]] = defaultdict(set)
        self._hyperedge_types: dict[str, set[str]] = defaultdict(set)

        # Adjacency indexes
        self._outgoing: dict[str, set[str]] = defaultdict(set)
        self._incoming: dict[str, set[str]] = defaultdict(set)

        # Statistics
        self._stats = self._empty_stats()

    # ==================== INTERNAL HELPERS ====================

    @staticmethod
    def _empty_stats() -> dict[str, int]:
        """Return a fresh statistics dictionary with every counter at zero."""
        return {"nodes": 0, "edges": 0, "hyperedges": 0, "node_types": 0, "edge_types": 0, "hyperedge_types": 0}

    def _sync_stats(self) -> None:
        """Recompute every counter in ``_stats`` from the actual containers."""
        self._stats["nodes"] = len(self._nodes)
        self._stats["edges"] = len(self._edges)
        self._stats["hyperedges"] = len(self._hyperedges)
        self._stats["node_types"] = len(self._node_types)
        self._stats["edge_types"] = len(self._edge_types)
        self._stats["hyperedge_types"] = len(self._hyperedge_types)

    def _load_records(self, data: dict[str, Any]) -> None:
        """Insert the nodes, edges and hyperedges of a serialized graph.

        Shared by ``from_dict`` and ``from_json``. Records are inserted
        directly into the containers and indexes (no existence validation),
        then the statistics are recomputed.
        """
        for node_data in data["nodes"]:
            node = Node.from_dict(node_data)
            self._nodes[node.id] = node
            self._node_types[node.type].add(node.id)

        for edge_data in data["edges"]:
            edge = Edge.from_dict(edge_data)
            self._edges[edge.id] = edge
            self._edge_types[edge.type].add(edge.id)
            self._outgoing[edge.source].add(edge.id)
            self._incoming[edge.target].add(edge.id)

        for hyperedge_data in data["hyperedges"]:
            hyperedge = HyperEdge.from_dict(hyperedge_data)
            self._hyperedges[hyperedge.id] = hyperedge
            self._hyperedge_types[hyperedge.type].add(hyperedge.id)

        self._sync_stats()

    # ==================== NODE OPERATIONS ====================

    def add_node(self, node_id: str, node_type: str, properties: dict[str, Any] | None = None) -> bool:
        """Add a node to the graph.

        Args:
            node_id: Unique identifier for the node
            node_type: Type/category of the node
            properties: Optional properties dictionary

        Returns:
            bool: True if node was added, False if it already exists
        """
        if node_id in self._nodes:
            return False

        node = Node(id=node_id, type=node_type, properties=properties or {})

        self._nodes[node_id] = node
        self._node_types[node_type].add(node_id)
        self._stats["nodes"] += 1
        self._stats["node_types"] = len(self._node_types)

        return True

    def get_node(self, node_id: str) -> Node | None:
        """Get a node by ID.

        Args:
            node_id: Node identifier

        Returns:
            Node object or None if not found
        """
        return self._nodes.get(node_id)

    def has_node(self, node_id: str) -> bool:
        """Check if a node exists.

        Args:
            node_id: Node identifier

        Returns:
            bool: True if node exists
        """
        return node_id in self._nodes

    def remove_node(self, node_id: str) -> bool:
        """Remove a node and all its connected edges.

        Hyperedges are not touched by this method.

        Args:
            node_id: Node identifier

        Returns:
            bool: True if node was removed, False if not found
        """
        if node_id not in self._nodes:
            return False

        node = self._nodes[node_id]

        # Remove from type index, dropping the type once it has no members
        members = self._node_types[node.type]
        members.discard(node_id)
        if not members:
            del self._node_types[node.type]

        # The adjacency indexes already list every incident edge, so there is
        # no need to scan all edges. Build a new set first because
        # remove_edge mutates the index sets while we iterate.
        incident = set(self._outgoing.get(node_id, ())) | set(self._incoming.get(node_id, ()))
        for edge_id in incident:
            self.remove_edge(edge_id)

        # NOTE(review): hyperedges that reference this node are left as they
        # are; decide whether they should be removed or shrunk.

        # Remove from adjacency indexes
        self._outgoing.pop(node_id, None)
        self._incoming.pop(node_id, None)

        # Remove node
        del self._nodes[node_id]
        self._stats["nodes"] -= 1
        self._stats["node_types"] = len(self._node_types)

        return True

    def get_nodes(self, node_type: str | None = None) -> list[Node]:
        """Get all nodes, optionally filtered by type.

        Args:
            node_type: Optional filter by node type

        Returns:
            List of Node objects
        """
        if node_type is None:
            return list(self._nodes.values())

        # .get() avoids creating an empty entry in the defaultdict index
        node_ids = self._node_types.get(node_type, set())
        return [self._nodes[node_id] for node_id in node_ids]

    def get_node_ids(self, node_type: str | None = None) -> set[str]:
        """Get all node IDs, optionally filtered by type.

        Args:
            node_type: Optional filter by node type

        Returns:
            Set of node IDs (a copy; safe to mutate)
        """
        if node_type is None:
            return set(self._nodes.keys())

        return self._node_types.get(node_type, set()).copy()

    # ==================== EDGE OPERATIONS ====================

    def add_edge(
        self, edge_id: str, edge_type: str, source: str, target: str, properties: dict[str, Any] | None = None
    ) -> bool:
        """Add an edge to the graph.

        Args:
            edge_id: Unique identifier for the edge
            edge_type: Type/category of the edge
            source: Source node ID
            target: Target node ID
            properties: Optional properties dictionary

        Returns:
            bool: True if edge was added, False if it already exists

        Raises:
            ValueError: If the source or target node does not exist
        """
        if edge_id in self._edges:
            return False

        # Check if nodes exist
        if source not in self._nodes:
            raise ValueError(f"Source node '{source}' does not exist")
        if target not in self._nodes:
            raise ValueError(f"Target node '{target}' does not exist")

        edge = Edge(id=edge_id, type=edge_type, source=source, target=target, properties=properties or {})

        self._edges[edge_id] = edge
        self._edge_types[edge_type].add(edge_id)

        # Update adjacency indexes
        self._outgoing[source].add(edge_id)
        self._incoming[target].add(edge_id)

        self._stats["edges"] += 1
        self._stats["edge_types"] = len(self._edge_types)

        return True

    def get_edge(self, edge_id: str) -> Edge | None:
        """Get an edge by ID.

        Args:
            edge_id: Edge identifier

        Returns:
            Edge object or None if not found
        """
        return self._edges.get(edge_id)

    def has_edge(self, edge_id: str) -> bool:
        """Check if an edge exists.

        Args:
            edge_id: Edge identifier

        Returns:
            bool: True if edge exists
        """
        return edge_id in self._edges

    def remove_edge(self, edge_id: str) -> bool:
        """Remove an edge from the graph.

        Args:
            edge_id: Edge identifier

        Returns:
            bool: True if edge was removed, False if not found
        """
        if edge_id not in self._edges:
            return False

        edge = self._edges[edge_id]

        # Remove from type index, dropping the type once it has no members
        members = self._edge_types[edge.type]
        members.discard(edge_id)
        if not members:
            del self._edge_types[edge.type]

        # Remove from adjacency indexes; .get() avoids creating empty
        # defaultdict entries for endpoints that are not indexed.
        for index, endpoint in ((self._outgoing, edge.source), (self._incoming, edge.target)):
            bucket = index.get(endpoint)
            if bucket is not None:
                bucket.discard(edge_id)

        # Remove edge
        del self._edges[edge_id]
        self._stats["edges"] -= 1
        self._stats["edge_types"] = len(self._edge_types)

        return True

    def get_edges(self, edge_type: str | None = None) -> list[Edge]:
        """Get all edges, optionally filtered by type.

        Args:
            edge_type: Optional filter by edge type

        Returns:
            List of Edge objects
        """
        if edge_type is None:
            return list(self._edges.values())

        edge_ids = self._edge_types.get(edge_type, set())
        return [self._edges[edge_id] for edge_id in edge_ids]

    def get_edges_between(self, source: str, target: str, edge_type: str | None = None) -> list[Edge]:
        """Get edges between two nodes.

        Only edges recorded as source -> target are returned.

        Args:
            source: Source node ID
            target: Target node ID
            edge_type: Optional filter by edge type

        Returns:
            List of Edge objects
        """
        candidates = (self._edges[edge_id] for edge_id in self._outgoing.get(source, set()))
        return [
            edge for edge in candidates if edge.target == target and (edge_type is None or edge.type == edge_type)
        ]

    # ==================== HYPEREDGE OPERATIONS ====================

    def add_hyperedge(
        self, hyperedge_id: str, hyperedge_type: str, nodes: set[str], properties: dict[str, Any] | None = None
    ) -> bool:
        """Add a hyperedge to the graph.

        Args:
            hyperedge_id: Unique identifier for the hyperedge
            hyperedge_type: Type/category of the hyperedge
            nodes: Set of node IDs to connect
            properties: Optional properties dictionary

        Returns:
            bool: True if hyperedge was added, False if it already exists

        Raises:
            ValueError: If any listed node does not exist, or if fewer than
                two nodes are given
        """
        if hyperedge_id in self._hyperedges:
            return False

        # Check if all nodes exist
        for node_id in nodes:
            if node_id not in self._nodes:
                raise ValueError(f"Node '{node_id}' does not exist")

        if len(nodes) < 2:
            raise ValueError("Hyperedge must connect at least 2 nodes")

        hyperedge = HyperEdge(id=hyperedge_id, type=hyperedge_type, nodes=nodes, properties=properties or {})

        self._hyperedges[hyperedge_id] = hyperedge
        self._hyperedge_types[hyperedge_type].add(hyperedge_id)

        self._stats["hyperedges"] += 1
        self._stats["hyperedge_types"] = len(self._hyperedge_types)

        return True

    def get_hyperedge(self, hyperedge_id: str) -> HyperEdge | None:
        """Get a hyperedge by ID.

        Args:
            hyperedge_id: Hyperedge identifier

        Returns:
            HyperEdge object or None if not found
        """
        return self._hyperedges.get(hyperedge_id)

    def has_hyperedge(self, hyperedge_id: str) -> bool:
        """Check if a hyperedge exists.

        Args:
            hyperedge_id: Hyperedge identifier

        Returns:
            bool: True if hyperedge exists
        """
        return hyperedge_id in self._hyperedges

    def get_hyperedges(self, hyperedge_type: str | None = None) -> list[HyperEdge]:
        """Get all hyperedges, optionally filtered by type.

        Args:
            hyperedge_type: Optional filter by hyperedge type

        Returns:
            List of HyperEdge objects
        """
        if hyperedge_type is None:
            return list(self._hyperedges.values())

        hyperedge_ids = self._hyperedge_types.get(hyperedge_type, set())
        return [self._hyperedges[hyperedge_id] for hyperedge_id in hyperedge_ids]

    # ==================== GRAPH TRAVERSAL ====================
    # These methods are placeholders. I am not sure it is useful to focus on traversal,
    # retrieval, analysis, etc. in this module. May be better to focus on the agentic
    # creation of the graph here, and figure out if graph traversal is needed in the
    # use cases we want to support.

    def get_neighbors(self, node_id: str, direction: str = "both") -> set[str]:
        """Get neighboring nodes.

        Args:
            node_id: Node identifier
            direction: "in", "out", or "both"; any other value yields an
                empty set

        Returns:
            Set of neighboring node IDs (empty if the node does not exist)
        """
        if node_id not in self._nodes:
            return set()

        neighbors = set()

        if direction in ["out", "both"]:
            for edge_id in self._outgoing.get(node_id, set()):
                neighbors.add(self._edges[edge_id].target)

        if direction in ["in", "both"]:
            for edge_id in self._incoming.get(node_id, set()):
                neighbors.add(self._edges[edge_id].source)

        return neighbors

    def get_connected_edges(self, node_id: str, direction: str = "both") -> list[Edge]:
        """Get edges connected to a node.

        Args:
            node_id: Node identifier
            direction: "in", "out", or "both"

        Returns:
            List of connected Edge objects (outgoing first, then incoming)
        """
        edges = []

        if direction in ["out", "both"]:
            edges.extend(self._edges[edge_id] for edge_id in self._outgoing.get(node_id, set()))

        if direction in ["in", "both"]:
            edges.extend(self._edges[edge_id] for edge_id in self._incoming.get(node_id, set()))

        return edges

    def find_paths(self, source: str, target: str, max_length: int = 3) -> list[list[Edge]]:
        """Find all paths between two nodes.

        Paths follow outgoing edges only, are explored breadth-first, and end
        the first time they reach ``target``. A path may revisit other nodes.
        Every path of at most ``max_length`` edges is returned, so the cost
        can grow exponentially with ``max_length`` on dense graphs.

        Args:
            source: Source node ID
            target: Target node ID
            max_length: Maximum path length (number of edges)

        Returns:
            List of paths, each path is a list of Edge objects
        """
        if source not in self._nodes or target not in self._nodes:
            return []

        paths = []
        queue = deque([([], source)])

        # No (node, depth) pruning here: two different paths that reach the
        # same intermediate node at the same depth are both valid prefixes,
        # and pruning the second one would drop real paths. Termination is
        # guaranteed by the max_length bound.
        while queue:
            path, current = queue.popleft()

            # A non-empty path that arrives at the target is complete
            if path and current == target:
                paths.append(path)
                continue

            if len(path) >= max_length:
                continue

            # Explore outgoing edges
            for edge_id in self._outgoing.get(current, ()):
                edge = self._edges[edge_id]
                queue.append((path + [edge], edge.target))

        return paths

    # ==================== GRAPH ANALYSIS ====================
    # These methods are placeholders. Similar to the traversal methods, not fully clear if
    # these are needed.

    def get_statistics(self) -> dict[str, Any]:
        """Get comprehensive graph statistics.

        Also refreshes the internal counters from the actual containers.

        Returns:
            Dictionary with the basic counters, the per-type distributions of
            nodes, edges and hyperedges, and the isolated/connected node counts
        """
        # Ensure we have the latest counts by recalculating from actual data
        self._sync_stats()
        actual_nodes = len(self._nodes)

        node_type_dist = {node_type: len(ids) for node_type, ids in self._node_types.items()}
        edge_type_dist = {edge_type: len(ids) for edge_type, ids in self._edge_types.items()}
        hyperedge_type_dist = {
            hyperedge_type: len(ids) for hyperedge_type, ids in self._hyperedge_types.items()
        }

        # A node is isolated when it has neither outgoing nor incoming edges;
        # hyperedge membership is not considered.
        isolated_nodes = sum(
            1 for node_id in self._nodes if not self._outgoing.get(node_id) and not self._incoming.get(node_id)
        )

        return {
            "basic": self._stats.copy(),
            "node_types": node_type_dist,
            "edge_types": edge_type_dist,
            "hyperedge_types": hyperedge_type_dist,
            "connectivity": {"isolated_nodes": isolated_nodes, "connected_nodes": actual_nodes - isolated_nodes},
        }

    def get_subgraph(self, node_ids: set[str], include_edges: bool = True) -> "Graph":
        """Extract a subgraph containing specified nodes.

        Unknown node IDs are ignored. Hyperedges are not copied. Property
        dictionaries are shallow-copied so that editing the subgraph does not
        modify this graph.

        Args:
            node_ids: Set of node IDs to include
            include_edges: Whether to include edges between included nodes

        Returns:
            New Graph object containing the subgraph
        """
        subgraph = Graph(name=f"{self.name}_subgraph", directed=self.directed)

        # Add nodes
        for node_id in node_ids:
            if node_id in self._nodes:
                node = self._nodes[node_id]
                subgraph.add_node(node.id, node.type, dict(node.properties))

        # Add edges (if requested)
        if include_edges:
            for edge in self._edges.values():
                if edge.source in node_ids and edge.target in node_ids:
                    subgraph.add_edge(edge.id, edge.type, edge.source, edge.target, dict(edge.properties))

        return subgraph

    # ==================== SERIALIZATION ====================
    # Placeholder methods, as serialisation should probably be handled by the corresponding
    # legacy BioCypher modules.

    def to_dict(self) -> dict[str, Any]:
        """Convert graph to dictionary representation.

        Returns:
            Dictionary representation of the graph
        """
        return {
            "name": self.name,
            "directed": self.directed,
            "nodes": [node.to_dict() for node in self._nodes.values()],
            "edges": [edge.to_dict() for edge in self._edges.values()],
            "hyperedges": [hyperedge.to_dict() for hyperedge in self._hyperedges.values()],
            "statistics": self.get_statistics(),
        }

    def to_json(self) -> str:
        """Convert graph to JSON string.

        Returns:
            JSON string representation of the graph
        """
        return json.dumps(self.to_dict(), indent=2)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Graph":
        """Create graph from dictionary representation.

        Args:
            data: Dictionary representation of the graph; must contain the
                keys "name", "directed", "nodes", "edges" and "hyperedges"

        Returns:
            Graph object
        """
        graph = cls(name=data["name"], directed=data["directed"])
        graph._load_records(data)
        return graph

    @classmethod
    def from_json_string(cls, json_str: str) -> "Graph":
        """Create graph from JSON string.

        Args:
            json_str: JSON string representation of the graph

        Returns:
            Graph object
        """
        data = json.loads(json_str)
        return cls.from_dict(data)

    def from_json(self, json_str: str) -> None:
        """Load graph data from JSON string into this graph instance.

        This method clears the existing graph and loads new data from JSON.
        The string is parsed before anything is cleared, so invalid JSON
        leaves the graph unchanged.

        Args:
            json_str: JSON string representation of the graph
        """
        data = json.loads(json_str)

        # Clear existing data
        self.clear()

        # Update graph properties
        self.name = data["name"]
        self.directed = data["directed"]

        self._load_records(data)

    # ==================== UTILITY METHODS ====================

    def clear(self) -> None:
        """Clear all nodes, edges, and hyperedges from the graph."""
        self._nodes.clear()
        self._edges.clear()
        self._hyperedges.clear()
        self._node_types.clear()
        self._edge_types.clear()
        self._hyperedge_types.clear()
        self._outgoing.clear()
        self._incoming.clear()
        self._stats = self._empty_stats()

    def __len__(self) -> int:
        """Return the number of nodes in the graph."""
        return len(self._nodes)

    def __contains__(self, node_id: str) -> bool:
        """Check if a node exists in the graph."""
        return node_id in self._nodes

    def __iter__(self) -> Iterator[Node]:
        """Iterate over all nodes in the graph."""
        return iter(self._nodes.values())

    def __str__(self) -> str:
        """String representation of the graph."""
        # Read the sizes directly; the full statistics pass is not needed
        # just to print three counts.
        return (
            f"Graph(name='{self.name}', nodes={len(self._nodes)}, "
            f"edges={len(self._edges)}, hyperedges={len(self._hyperedges)})"
        )

    def __repr__(self) -> str:
        return self.__str__()

__contains__(node_id)

Check if a node exists in the graph.

Source code in biocypher/_graph.py
def __contains__(self, node_id: str) -> bool:
    """Tell whether ``node_id`` identifies a node stored in the graph."""
    is_known = node_id in self._nodes
    return is_known

__init__(name='graph', directed=True)

Initialize a new graph.

Parameters:

Name Type Description Default
name str

Name of the graph

'graph'
directed bool

Whether the graph is directed (default: True)

True
Source code in biocypher/_graph.py
def __init__(self, name: str = "graph", directed: bool = True):
    """Set up an empty graph.

    Args:
        name: Name of the graph
        directed: Whether the graph is directed (default: True)
    """
    self.name = name
    self.directed = directed

    # Primary stores, keyed by element ID
    self._nodes: dict[str, Node] = {}
    self._edges: dict[str, Edge] = {}
    self._hyperedges: dict[str, HyperEdge] = {}

    # Type indexes: type name -> IDs of the elements of that type
    self._node_types: dict[str, set[str]] = defaultdict(set)
    self._edge_types: dict[str, set[str]] = defaultdict(set)
    self._hyperedge_types: dict[str, set[str]] = defaultdict(set)

    # Adjacency indexes: node ID -> IDs of edges leaving / entering it
    self._outgoing: dict[str, set[str]] = defaultdict(set)
    self._incoming: dict[str, set[str]] = defaultdict(set)

    # Cached counters, all starting at zero
    counter_names = ("nodes", "edges", "hyperedges", "node_types", "edge_types", "hyperedge_types")
    self._stats = dict.fromkeys(counter_names, 0)

__iter__()

Iterate over all nodes in the graph.

Source code in biocypher/_graph.py
def __iter__(self) -> Iterator[Node]:
    """Return an iterator over the stored node objects (not their IDs)."""
    stored_nodes = self._nodes.values()
    return iter(stored_nodes)

__len__()

Return the number of nodes in the graph.

Source code in biocypher/_graph.py
def __len__(self) -> int:
    """Report the size of the graph, measured as its node count."""
    node_count = len(self._nodes)
    return node_count

__str__()

String representation of the graph.

Source code in biocypher/_graph.py
def __str__(self) -> str:
    """Summarise the graph as ``Graph(name=..., nodes=..., edges=..., hyperedges=...)``.

    The counts are taken from the ``"basic"`` section of ``get_statistics()``.
    """
    stats = self.get_statistics()
    head = f"Graph(name='{self.name}', nodes={stats['basic']['nodes']}, "
    tail = f"edges={stats['basic']['edges']}, hyperedges={stats['basic']['hyperedges']})"
    return head + tail

add_edge(edge_id, edge_type, source, target, properties=None)

Add an edge to the graph.

Parameters:

Name Type Description Default
edge_id str

Unique identifier for the edge

required
edge_type str

Type/category of the edge

required
source str

Source node ID

required
target str

Target node ID

required
properties dict[str, Any] | None

Optional properties dictionary

None

Returns:

Name Type Description
bool bool

True if edge was added, False if it already exists

Source code in biocypher/_graph.py
def add_edge(
    self, edge_id: str, edge_type: str, source: str, target: str, properties: dict[str, Any] | None = None
) -> bool:
    """Insert a new edge between two existing nodes.

    Args:
        edge_id: Unique identifier for the edge
        edge_type: Type/category of the edge
        source: Source node ID
        target: Target node ID
        properties: Optional properties dictionary

    Returns:
        bool: True if edge was added, False if it already exists

    Raises:
        ValueError: If the source or target node is not in the graph.
    """
    # A known ID is reported as "not added"; the endpoints are not checked then.
    if edge_id in self._edges:
        return False

    known_nodes = self._nodes
    if source not in known_nodes:
        raise ValueError(f"Source node '{source}' does not exist")
    if target not in known_nodes:
        raise ValueError(f"Target node '{target}' does not exist")

    new_edge = Edge(id=edge_id, type=edge_type, source=source, target=target, properties=properties or {})

    # Register the edge in the store, the type index and both adjacency indexes
    self._edges[edge_id] = new_edge
    self._edge_types[edge_type].add(edge_id)
    self._outgoing[source].add(edge_id)
    self._incoming[target].add(edge_id)

    # Keep the cached counters in step
    self._stats["edges"] += 1
    self._stats["edge_types"] = len(self._edge_types)

    return True

add_hyperedge(hyperedge_id, hyperedge_type, nodes, properties=None)

Add a hyperedge to the graph.

Parameters:

Name Type Description Default
hyperedge_id str

Unique identifier for the hyperedge

required
hyperedge_type str

Type/category of the hyperedge

required
nodes set[str]

Set of node IDs to connect

required
properties dict[str, Any] | None

Optional properties dictionary

None

Returns:

Name Type Description
bool bool

True if hyperedge was added, False if it already exists

Source code in biocypher/_graph.py
def add_hyperedge(
    self, hyperedge_id: str, hyperedge_type: str, nodes: set[str], properties: dict[str, Any] | None = None
) -> bool:
    """Insert a new hyperedge connecting several existing nodes.

    Args:
        hyperedge_id: Unique identifier for the hyperedge
        hyperedge_type: Type/category of the hyperedge
        nodes: Set of node IDs to connect
        properties: Optional properties dictionary

    Returns:
        bool: True if hyperedge was added, False if it already exists

    Raises:
        ValueError: If any member node is not in the graph, or if fewer
            than two nodes are given.
    """
    if hyperedge_id in self._hyperedges:
        return False

    # Membership is validated before the size, so an unknown node is
    # reported even when the set is too small.
    known_nodes = self._nodes
    for node_id in nodes:
        if node_id not in known_nodes:
            raise ValueError(f"Node '{node_id}' does not exist")

    if len(nodes) < 2:
        raise ValueError("Hyperedge must connect at least 2 nodes")

    new_hyperedge = HyperEdge(id=hyperedge_id, type=hyperedge_type, nodes=nodes, properties=properties or {})

    # Register in the store and the type index
    self._hyperedges[hyperedge_id] = new_hyperedge
    self._hyperedge_types[hyperedge_type].add(hyperedge_id)

    # Keep the cached counters in step
    self._stats["hyperedges"] += 1
    self._stats["hyperedge_types"] = len(self._hyperedge_types)

    return True

add_node(node_id, node_type, properties=None)

Add a node to the graph.

Parameters:

Name Type Description Default
node_id str

Unique identifier for the node

required
node_type str

Type/category of the node

required
properties dict[str, Any] | None

Optional properties dictionary

None

Returns:

Name Type Description
bool bool

True if node was added, False if it already exists

Source code in biocypher/_graph.py
def add_node(self, node_id: str, node_type: str, properties: dict[str, Any] | None = None) -> bool:
    """Insert a new node into the graph.

    Args:
        node_id: Unique identifier for the node
        node_type: Type/category of the node
        properties: Optional properties dictionary

    Returns:
        bool: True if node was added, False if it already exists
    """
    # An existing ID is left untouched and reported as "not added".
    if node_id in self._nodes:
        return False

    new_node = Node(id=node_id, type=node_type, properties=properties or {})

    # Register in the store and the type index
    self._nodes[node_id] = new_node
    self._node_types[node_type].add(node_id)

    # Keep the cached counters in step
    self._stats["nodes"] += 1
    self._stats["node_types"] = len(self._node_types)

    return True

clear()

Clear all nodes, edges, and hyperedges from the graph.

Source code in biocypher/_graph.py
def clear(self) -> None:
    """Reset the graph to an empty state.

    Empties the node, edge and hyperedge stores along with the type and
    adjacency indexes, then rebinds the statistics to a fresh dictionary of
    zeroed counters.
    """
    container_attrs = (
        "_nodes",
        "_edges",
        "_hyperedges",
        "_node_types",
        "_edge_types",
        "_hyperedge_types",
        "_outgoing",
        "_incoming",
    )
    for attr in container_attrs:
        getattr(self, attr).clear()

    # The counters are replaced by a new dict rather than reset in place.
    counter_names = ("nodes", "edges", "hyperedges", "node_types", "edge_types", "hyperedge_types")
    self._stats = dict.fromkeys(counter_names, 0)

find_paths(source, target, max_length=3)

Find all paths between two nodes.

Parameters:

Name Type Description Default
source str

Source node ID

required
target str

Target node ID

required
max_length int

Maximum path length

3

Returns:

Type Description
list[list[Edge]]

List of paths, each path is a list of Edge objects

Source code in biocypher/_graph.py
def find_paths(self, source: str, target: str, max_length: int = 3) -> list[list[Edge]]:
    """Find all paths between two nodes.

    Performs a breadth-first expansion along outgoing edges, starting at
    ``source``. Every edge sequence of at most ``max_length`` edges that ends
    the first time it reaches ``target`` is returned; a sequence is not
    extended beyond ``target``.

    No pruning is shared between different partial paths. Earlier versions
    skipped any partial path that reached a node at a depth at which that
    node had already been expanded, which silently dropped distinct paths
    running through a common intermediate node (and made the surviving path
    depend on set iteration order). The search is bounded by ``max_length``
    alone, so it always terminates; note that a sequence may pass through
    the same node more than once if the graph contains cycles.

    Args:
        source: Source node ID
        target: Target node ID
        max_length: Maximum path length (number of edges)

    Returns:
        List of paths, each path is a list of Edge objects. Empty if either
        node is unknown or no path exists within ``max_length`` edges.
    """
    if source not in self._nodes or target not in self._nodes:
        return []

    paths = []
    # Each queue entry pairs the edges walked so far with the node reached.
    queue = deque([([], source)])

    while queue:
        path, current = queue.popleft()

        # ``path`` is empty only for the start entry, so ``source == target``
        # is reported only via a non-empty walk back to the start node.
        if current == target and path:
            paths.append(path)
            continue

        # Depth limit: this is what guarantees termination on cyclic graphs.
        if len(path) >= max_length:
            continue

        for edge_id in self._outgoing.get(current, set()):
            edge = self._edges[edge_id]
            queue.append((path + [edge], edge.target))

    return paths

from_dict(data) classmethod

Create graph from dictionary representation.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary representation of the graph

required

Returns:

Type Description
Graph

Graph object

Source code in biocypher/_graph.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Graph":
    """Build a new graph from its dictionary representation.

    Args:
        data: Dictionary representation of the graph; the "name",
            "directed", "nodes", "edges" and "hyperedges" keys are read.

    Returns:
        Graph object
    """
    graph = cls(name=data["name"], directed=data["directed"])

    # Nodes go into the store and the node type index
    for raw_node in data["nodes"]:
        node = Node.from_dict(raw_node)
        graph._nodes[node.id] = node
        graph._node_types[node.type].add(node.id)

    # Edges additionally feed both adjacency indexes
    for raw_edge in data["edges"]:
        edge = Edge.from_dict(raw_edge)
        edge_key = edge.id
        graph._edges[edge_key] = edge
        graph._edge_types[edge.type].add(edge_key)
        graph._outgoing[edge.source].add(edge_key)
        graph._incoming[edge.target].add(edge_key)

    # Hyperedges go into the store and the hyperedge type index
    for raw_hyperedge in data["hyperedges"]:
        hyperedge = HyperEdge.from_dict(raw_hyperedge)
        graph._hyperedges[hyperedge.id] = hyperedge
        graph._hyperedge_types[hyperedge.type].add(hyperedge.id)

    # Derive the counters from what was actually loaded
    counter_sources = (
        ("nodes", graph._nodes),
        ("edges", graph._edges),
        ("hyperedges", graph._hyperedges),
        ("node_types", graph._node_types),
        ("edge_types", graph._edge_types),
        ("hyperedge_types", graph._hyperedge_types),
    )
    for counter, container in counter_sources:
        graph._stats[counter] = len(container)

    return graph

from_json(json_str)

Load graph data from JSON string into this graph instance.

This method clears the existing graph and loads new data from JSON.

Parameters:

Name Type Description Default
json_str str

JSON string representation of the graph

required
Source code in biocypher/_graph.py
def from_json(self, json_str: str) -> None:
    """Load graph data from JSON string into this graph instance.

    On success this method clears the existing graph and loads the new data
    from JSON. The document is parsed and converted into graph elements
    *before* the current contents are touched, so a payload that is not
    valid JSON, lacks a required key, or contains an element its
    ``from_dict`` constructor rejects raises without wiping or partially
    replacing the existing graph.

    Args:
        json_str: JSON string representation of the graph; the "name",
            "directed", "nodes", "edges" and "hyperedges" keys are read.

    Raises:
        json.JSONDecodeError: If ``json_str`` is not valid JSON.
        KeyError: If one of the required top-level keys is missing.
    """
    data = json.loads(json_str)

    # Stage everything first; any failure here leaves ``self`` unchanged.
    name = data["name"]
    directed = data["directed"]
    nodes = [Node.from_dict(node_data) for node_data in data["nodes"]]
    edges = [Edge.from_dict(edge_data) for edge_data in data["edges"]]
    hyperedges = [HyperEdge.from_dict(hyperedge_data) for hyperedge_data in data["hyperedges"]]

    # Commit: drop the old content, then install the staged elements.
    self.clear()
    self.name = name
    self.directed = directed

    for node in nodes:
        self._nodes[node.id] = node
        self._node_types[node.type].add(node.id)

    for edge in edges:
        self._edges[edge.id] = edge
        self._edge_types[edge.type].add(edge.id)
        self._outgoing[edge.source].add(edge.id)
        self._incoming[edge.target].add(edge.id)

    for hyperedge in hyperedges:
        self._hyperedges[hyperedge.id] = hyperedge
        self._hyperedge_types[hyperedge.type].add(hyperedge.id)

    # Update statistics
    self._stats["nodes"] = len(self._nodes)
    self._stats["edges"] = len(self._edges)
    self._stats["hyperedges"] = len(self._hyperedges)
    self._stats["node_types"] = len(self._node_types)
    self._stats["edge_types"] = len(self._edge_types)
    self._stats["hyperedge_types"] = len(self._hyperedge_types)

from_json_string(json_str) classmethod

Create graph from JSON string.

Parameters:

Name Type Description Default
json_str str

JSON string representation of the graph

required

Returns:

Type Description
Graph

Graph object

Source code in biocypher/_graph.py
@classmethod
def from_json_string(cls, json_str: str) -> "Graph":
    """Build a new graph from a JSON document.

    The string is decoded with ``json.loads`` and the result is handed to
    ``from_dict``.

    Args:
        json_str: JSON string representation of the graph

    Returns:
        Graph object
    """
    return cls.from_dict(json.loads(json_str))

get_connected_edges(node_id, direction='both')

Get edges connected to a node.

Parameters:

Name Type Description Default
node_id str

Node identifier

required
direction str

"in", "out", or "both"

'both'

Returns:

Type Description
list[Edge]

List of connected Edge objects

Source code in biocypher/_graph.py
def get_connected_edges(self, node_id: str, direction: str = "both") -> list[Edge]:
    """Collect the edges attached to a node.

    Outgoing edges are listed before incoming ones. Any ``direction`` other
    than the three documented values selects nothing.

    Args:
        node_id: Node identifier
        direction: "in", "out", or "both"

    Returns:
        List of connected Edge objects
    """
    want_outgoing = direction in ["out", "both"]
    want_incoming = direction in ["in", "both"]

    connected = []
    if want_outgoing:
        connected.extend(self._edges[edge_id] for edge_id in self._outgoing.get(node_id, set()))
    if want_incoming:
        connected.extend(self._edges[edge_id] for edge_id in self._incoming.get(node_id, set()))
    return connected

get_edge(edge_id)

Get an edge by ID.

Parameters:

Name Type Description Default
edge_id str

Edge identifier

required

Returns:

Type Description
Edge | None

Edge object or None if not found

Source code in biocypher/_graph.py
def get_edge(self, edge_id: str) -> Edge | None:
    """Look up a single edge.

    Args:
        edge_id: Edge identifier

    Returns:
        Edge object or None if not found
    """
    found = self._edges.get(edge_id, None)
    return found

get_edges(edge_type=None)

Get all edges, optionally filtered by type.

Parameters:

Name Type Description Default
edge_type str | None

Optional filter by edge type

None

Returns:

Type Description
list[Edge]

List of Edge objects

Source code in biocypher/_graph.py
def get_edges(self, edge_type: str | None = None) -> list[Edge]:
    """List the edges of the graph, optionally restricted to one type.

    Args:
        edge_type: Optional filter by edge type

    Returns:
        List of Edge objects
    """
    if edge_type is not None:
        # Resolve the IDs through the type index; unknown types match nothing.
        matching_ids = self._edge_types.get(edge_type, set())
        return [self._edges[edge_id] for edge_id in matching_ids]

    return list(self._edges.values())

get_edges_between(source, target, edge_type=None)

Get edges between two nodes.

Parameters:

Name Type Description Default
source str

Source node ID

required
target str

Target node ID

required
edge_type str | None

Optional filter by edge type

None

Returns:

Type Description
list[Edge]

List of Edge objects

Source code in biocypher/_graph.py
def get_edges_between(self, source: str, target: str, edge_type: str | None = None) -> list[Edge]:
    """Collect the edges that run from ``source`` to ``target``.

    Only the outgoing edges of ``source`` are inspected, so edges running
    from ``target`` to ``source`` are not reported.

    Args:
        source: Source node ID
        target: Target node ID
        edge_type: Optional filter by edge type

    Returns:
        List of Edge objects
    """
    candidates = (self._edges[edge_id] for edge_id in self._outgoing.get(source, set()))
    return [
        edge
        for edge in candidates
        if edge.target == target and (edge_type is None or edge.type == edge_type)
    ]

get_hyperedge(hyperedge_id)

Get a hyperedge by ID.

Parameters:

Name Type Description Default
hyperedge_id str

Hyperedge identifier

required

Returns:

Type Description
HyperEdge | None

HyperEdge object or None if not found

Source code in biocypher/_graph.py
def get_hyperedge(self, hyperedge_id: str) -> HyperEdge | None:
    """Look up a single hyperedge.

    Args:
        hyperedge_id: Hyperedge identifier

    Returns:
        HyperEdge object or None if not found
    """
    found = self._hyperedges.get(hyperedge_id, None)
    return found

get_hyperedges(hyperedge_type=None)

Get all hyperedges, optionally filtered by type.

Parameters:

Name Type Description Default
hyperedge_type str | None

Optional filter by hyperedge type

None

Returns:

Type Description
list[HyperEdge]

List of HyperEdge objects

Source code in biocypher/_graph.py
def get_hyperedges(self, hyperedge_type: str | None = None) -> list[HyperEdge]:
    """List the hyperedges of the graph, optionally restricted to one type.

    Args:
        hyperedge_type: Optional filter by hyperedge type

    Returns:
        List of HyperEdge objects
    """
    if hyperedge_type is not None:
        # Resolve the IDs through the type index; unknown types match nothing.
        matching_ids = self._hyperedge_types.get(hyperedge_type, set())
        return [self._hyperedges[hyperedge_id] for hyperedge_id in matching_ids]

    return list(self._hyperedges.values())

get_neighbors(node_id, direction='both')

Get neighboring nodes.

Parameters:

Name Type Description Default
node_id str

Node identifier

required
direction str

"in", "out", or "both"

'both'

Returns:

Type Description
set[str]

Set of neighboring node IDs

Source code in biocypher/_graph.py
def get_neighbors(self, node_id: str, direction: str = "both") -> set[str]:
    """Collect the IDs of the nodes adjacent to a node.

    "out" follows outgoing edges to their targets, "in" follows incoming
    edges back to their sources, and "both" combines the two. An unknown
    node, or any other ``direction`` value, yields an empty set.

    Args:
        node_id: Node identifier
        direction: "in", "out", or "both"

    Returns:
        Set of neighboring node IDs
    """
    adjacent = set()
    if node_id not in self._nodes:
        return adjacent

    if direction in ["out", "both"]:
        adjacent.update(self._edges[edge_id].target for edge_id in self._outgoing.get(node_id, set()))

    if direction in ["in", "both"]:
        adjacent.update(self._edges[edge_id].source for edge_id in self._incoming.get(node_id, set()))

    return adjacent

get_node(node_id)

Get a node by ID.

Parameters:

Name Type Description Default
node_id str

Node identifier

required

Returns:

Type Description
Node | None

Node object or None if not found

Source code in biocypher/_graph.py
def get_node(self, node_id: str) -> Node | None:
    """Look up a single node.

    Args:
        node_id: Node identifier

    Returns:
        Node object or None if not found
    """
    found = self._nodes.get(node_id, None)
    return found

get_node_ids(node_type=None)

Get all node IDs, optionally filtered by type.

Parameters:

Name Type Description Default
node_type str | None

Optional filter by node type

None

Returns:

Type Description
set[str]

Set of node IDs

Source code in biocypher/_graph.py
def get_node_ids(self, node_type: str | None = None) -> set[str]:
    """Get all node IDs, optionally filtered by type.

    Args:
        node_type: Optional filter by node type

    Returns:
        Set of node IDs
    """
    if node_type is None:
        return set(self._nodes.keys())

    return self._node_types.get(node_type, set()).copy()

get_nodes(node_type=None)

Get all nodes, optionally filtered by type.

Parameters:

Name Type Description Default
node_type str | None

Optional filter by node type

None

Returns:

Type Description
list[Node]

List of Node objects

Source code in biocypher/_graph.py
def get_nodes(self, node_type: str | None = None) -> list[Node]:
    """List the nodes of the graph, optionally restricted to one type.

    Args:
        node_type: Optional filter by node type

    Returns:
        List of Node objects
    """
    if node_type is not None:
        # Resolve the IDs through the type index; unknown types match nothing.
        matching_ids = self._node_types.get(node_type, set())
        return [self._nodes[node_id] for node_id in matching_ids]

    return list(self._nodes.values())

get_statistics()

Get comprehensive graph statistics.

Returns:

Type Description
dict[str, Any]

Dictionary with graph statistics

Source code in biocypher/_graph.py
def get_statistics(self) -> dict[str, Any]:
    """Assemble a statistics report for the graph.

    The cached counters are first refreshed from the live containers. The
    report then adds per-type element counts and a connectivity summary in
    which a node counts as isolated when ``get_neighbors`` returns nothing
    for it.

    Returns:
        Dictionary with graph statistics, holding the sections "basic",
        "node_types", "edge_types", "hyperedge_types" and "connectivity".
    """
    node_total = len(self._nodes)

    # Refresh the cached counters so they reflect the actual contents
    counters = self._stats
    counters["nodes"] = node_total
    counters["edges"] = len(self._edges)
    counters["hyperedges"] = len(self._hyperedges)
    counters["node_types"] = len(self._node_types)
    counters["edge_types"] = len(self._edge_types)
    counters["hyperedge_types"] = len(self._hyperedge_types)

    # Nodes for which no neighbor is reported in either direction
    isolated_total = sum(1 for node_id in self._nodes if not self.get_neighbors(node_id))

    report = {"basic": self._stats.copy()}
    # Per-type distributions: type name -> number of elements of that type
    report["node_types"] = {name: len(members) for name, members in self._node_types.items()}
    report["edge_types"] = {name: len(members) for name, members in self._edge_types.items()}
    report["hyperedge_types"] = {name: len(members) for name, members in self._hyperedge_types.items()}
    report["connectivity"] = {"isolated_nodes": isolated_total, "connected_nodes": node_total - isolated_total}
    return report

get_subgraph(node_ids, include_edges=True)

Extract a subgraph containing specified nodes.

Parameters:

Name Type Description Default
node_ids set[str]

Set of node IDs to include

required
include_edges bool

Whether to include edges between included nodes

True

Returns:

Type Description
Graph

New Graph object containing the subgraph

Source code in biocypher/_graph.py
def get_subgraph(self, node_ids: set[str], include_edges: bool = True) -> "Graph":
    """Build a new graph restricted to the given nodes.

    IDs that are not present in this graph are ignored. The new graph is
    named after this one with a ``_subgraph`` suffix and keeps its
    ``directed`` flag. Hyperedges are not carried over.

    Args:
        node_ids: Set of node IDs to include
        include_edges: Whether to include edges between included nodes

    Returns:
        New Graph object containing the subgraph
    """
    subgraph = Graph(name=f"{self.name}_subgraph", directed=self.directed)

    # Copy over the requested nodes that actually exist here
    for node_id in node_ids:
        if node_id not in self._nodes:
            continue
        selected = self._nodes[node_id]
        subgraph.add_node(selected.id, selected.type, selected.properties)

    if not include_edges:
        return subgraph

    # Keep only edges whose two endpoints were both requested
    for edge in self._edges.values():
        if not (edge.source in node_ids and edge.target in node_ids):
            continue
        subgraph.add_edge(edge.id, edge.type, edge.source, edge.target, edge.properties)

    return subgraph

has_edge(edge_id)

Check if an edge exists.

Parameters:

Name Type Description Default
edge_id str

Edge identifier

required

Returns:

Name Type Description
bool bool

True if edge exists

Source code in biocypher/_graph.py
def has_edge(self, edge_id: str) -> bool:
    """Tell whether an edge with the given ID is stored in the graph.

    Args:
        edge_id: Edge identifier

    Returns:
        bool: True if edge exists
    """
    is_known = edge_id in self._edges
    return is_known

has_hyperedge(hyperedge_id)

Check if a hyperedge exists.

Parameters:

Name Type Description Default
hyperedge_id str

Hyperedge identifier

required

Returns:

Name Type Description
bool bool

True if hyperedge exists

Source code in biocypher/_graph.py
def has_hyperedge(self, hyperedge_id: str) -> bool:
    """Tell whether a hyperedge with the given ID is stored in the graph.

    Args:
        hyperedge_id: Hyperedge identifier

    Returns:
        bool: True if hyperedge exists
    """
    is_known = hyperedge_id in self._hyperedges
    return is_known

has_node(node_id)

Check if a node exists.

Parameters:

Name Type Description Default
node_id str

Node identifier

required

Returns:

Name Type Description
bool bool

True if node exists

Source code in biocypher/_graph.py
def has_node(self, node_id: str) -> bool:
    """Tell whether a node with the given ID is stored in the graph.

    Args:
        node_id: Node identifier

    Returns:
        bool: True if node exists
    """
    is_known = node_id in self._nodes
    return is_known

remove_edge(edge_id)

Remove an edge from the graph.

Parameters:

Name Type Description Default
edge_id str

Edge identifier

required

Returns:

Name Type Description
bool bool

True if edge was removed, False if not found

Source code in biocypher/_graph.py
def remove_edge(self, edge_id: str) -> bool:
    """Delete an edge and unregister it from every index.

    Args:
        edge_id: Edge identifier

    Returns:
        bool: True if edge was removed, False if not found
    """
    if edge_id not in self._edges:
        return False

    doomed = self._edges[edge_id]

    # Type index: drop the ID, and the whole entry once it becomes empty so
    # that the number of index keys equals the number of types in use.
    same_type_ids = self._edge_types[doomed.type]
    same_type_ids.discard(edge_id)
    if not same_type_ids:
        del self._edge_types[doomed.type]

    # Adjacency indexes of both endpoints
    self._outgoing[doomed.source].discard(edge_id)
    self._incoming[doomed.target].discard(edge_id)

    # Finally the edge itself, then the cached counters
    del self._edges[edge_id]
    self._stats["edges"] -= 1
    self._stats["edge_types"] = len(self._edge_types)

    return True

remove_node(node_id)

Remove a node and all its connected edges.

Parameters:

Name Type Description Default
node_id str

Node identifier

required

Returns:

Name Type Description
bool bool

True if node was removed, False if not found

Source code in biocypher/_graph.py
def remove_node(self, node_id: str) -> bool:
    """Remove a node and all its connected edges.

    The incident edges are taken from the adjacency indexes rather than by
    scanning every edge in the graph, so the cost depends on the node's
    degree instead of the total number of edges.

    Args:
        node_id: Node identifier

    Returns:
        bool: True if node was removed, False if not found
    """
    if node_id not in self._nodes:
        return False

    node = self._nodes[node_id]

    # Remove from type index; drop the entry once no node of that type is left
    self._node_types[node.type].discard(node_id)
    if not self._node_types[node.type]:
        del self._node_types[node.type]

    # Collect incident edge IDs into a new set first: remove_edge() mutates
    # the index sets, which must not happen while they are being iterated.
    connected_edge_ids = set(self._outgoing.get(node_id, ())) | set(self._incoming.get(node_id, ()))
    for edge_id in connected_edge_ids:
        self.remove_edge(edge_id)

    # Remove the node's own (now empty) adjacency entries
    self._outgoing.pop(node_id, None)
    self._incoming.pop(node_id, None)

    # NOTE(review): hyperedges that list this node are not touched here and
    # keep referring to its ID — confirm whether that is intended.

    # Remove node and refresh the cached counters
    del self._nodes[node_id]
    self._stats["nodes"] -= 1
    self._stats["node_types"] = len(self._node_types)

    return True

to_dict()

Convert graph to dictionary representation.

Returns:

Type Description
dict[str, Any]

Dictionary representation of the graph

Source code in biocypher/_graph.py
def to_dict(self) -> dict[str, Any]:
    """Serialise the graph into a plain dictionary.

    Each node, edge and hyperedge is converted with its own ``to_dict``; the
    output of ``get_statistics()`` is included under "statistics".

    Returns:
        Dictionary representation of the graph
    """
    serialised = {"name": self.name, "directed": self.directed}
    serialised["nodes"] = [node.to_dict() for node in self._nodes.values()]
    serialised["edges"] = [edge.to_dict() for edge in self._edges.values()]
    serialised["hyperedges"] = [hyperedge.to_dict() for hyperedge in self._hyperedges.values()]
    serialised["statistics"] = self.get_statistics()
    return serialised

to_json()

Convert graph to JSON string.

Returns:

Type Description
str

JSON string representation of the graph

Source code in biocypher/_graph.py
def to_json(self) -> str:
    """Serialise the graph to JSON text, indented by two spaces.

    Returns:
        JSON string representation of the graph
    """
    payload = self.to_dict()
    return json.dumps(payload, indent=2)

HyperEdge dataclass

Represents a hyperedge connecting multiple nodes.

Source code in biocypher/_graph.py
@dataclass
class HyperEdge:
    """A hyperedge: one typed relation joining two or more nodes."""

    id: str
    type: str
    nodes: set[str]
    properties: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Field type checks, in declaration order; each failure is reported
        # as a ValueError.
        type_checks = (
            (self.id, str, "HyperEdge ID must be a string"),
            (self.type, str, "HyperEdge type must be a string"),
            (self.nodes, set, "HyperEdge nodes must be a set"),
        )
        for value, expected_type, message in type_checks:
            if not isinstance(value, expected_type):
                raise ValueError(message)

        # A hyperedge needs at least two members.
        if len(self.nodes) < 2:
            raise ValueError("HyperEdge must connect at least 2 nodes")

    def to_dict(self) -> dict[str, Any]:
        """Serialise the hyperedge; the node set is emitted as a list."""
        member_list = list(self.nodes)
        return {"id": self.id, "type": self.type, "nodes": member_list, "properties": self.properties}

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "HyperEdge":
        """Rebuild a hyperedge from a dictionary; "properties" is optional."""
        identifier = data["id"]
        kind = data["type"]
        members = set(data["nodes"])
        return cls(id=identifier, type=kind, nodes=members, properties=data.get("properties", {}))

from_dict(data) classmethod

Create hyperedge from dictionary representation.

Source code in biocypher/_graph.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "HyperEdge":
    """Rebuild a hyperedge from a dictionary; "properties" is optional."""
    identifier = data["id"]
    kind = data["type"]
    members = set(data["nodes"])
    return cls(id=identifier, type=kind, nodes=members, properties=data.get("properties", {}))

to_dict()

Convert hyperedge to dictionary representation.

Source code in biocypher/_graph.py
def to_dict(self) -> dict[str, Any]:
    """Serialise the hyperedge; the node set is emitted as a list."""
    member_list = list(self.nodes)
    return {"id": self.id, "type": self.type, "nodes": member_list, "properties": self.properties}

Node dataclass

Represents a node in the graph.

Source code in biocypher/_graph.py
@dataclass
class Node:
    """A typed graph node carrying an ID and a free-form property mapping."""

    id: str
    type: str
    properties: dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Field type checks, in declaration order; each failure is reported
        # as a ValueError.
        type_checks = (
            (self.id, "Node ID must be a string"),
            (self.type, "Node type must be a string"),
        )
        for value, message in type_checks:
            if not isinstance(value, str):
                raise ValueError(message)

    def to_dict(self) -> dict[str, Any]:
        """Serialise the node into a plain dictionary."""
        serialised = {"id": self.id, "type": self.type}
        serialised["properties"] = self.properties
        return serialised

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> "Node":
        """Rebuild a node from a dictionary; "properties" is optional."""
        identifier = data["id"]
        kind = data["type"]
        return cls(id=identifier, type=kind, properties=data.get("properties", {}))

from_dict(data) classmethod

Create node from dictionary representation.

Source code in biocypher/_graph.py
@classmethod
def from_dict(cls, data: dict[str, Any]) -> "Node":
    """Rebuild a node from a dictionary; "properties" is optional."""
    identifier = data["id"]
    kind = data["type"]
    return cls(id=identifier, type=kind, properties=data.get("properties", {}))

to_dict()

Convert node to dictionary representation.

Source code in biocypher/_graph.py
def to_dict(self) -> dict[str, Any]:
    """Serialise the node into a plain dictionary."""
    serialised = {"id": self.id, "type": self.type}
    serialised["properties"] = self.properties
    return serialised

logger

Configuration of the module logger.

get_logger(name='biocypher')

Access the module logger, creating a new one if it does not exist yet.

Method providing central logger instance to main module. Is called only from main submodule, :mod:biocypher.driver. In child modules, the standard Python logging facility is called (using logging.getLogger(__name__)), automatically inheriting the handlers from the central logger.

The file handler creates a log file named after the current date and time. Levels to output to file and console can be set here.

Parameters:

Name Type Description Default
name str

Name of the logger instance.

'biocypher'

Returns:

Type Description
Logger

An instance of Python's `logging.Logger`.

Source code in biocypher/_logger.py
def get_logger(name: str = "biocypher") -> logging.Logger:
    """
    Return the module logger, configuring it the first time it is requested.

    This is the central logger instance of the main module; the original
    documentation states it is called only from the main submodule,
    :mod:`biocypher.driver`. Child modules use the standard Python logging
    facility (``logging.getLogger(__name__)``) and thereby inherit the
    handlers set up here.

    A stream handler is always attached. If the ``log_to_disk`` setting of
    the ``biocypher`` configuration is truthy, a file handler writing to a
    file named after the current date and time is attached as well; its
    level is DEBUG when the ``debug`` setting is truthy and INFO otherwise.

    Args:
        name:
            Name of the logger instance.

    Returns:
        An instance of the Python :py:mod:`logging.Logger`.
    """

    logger = logging.getLogger(name)

    # handlers already present (on this logger or on an ancestor): the
    # logger is handed back untouched
    if logger.hasHandlers():
        return logger

    logger.setLevel(logging.DEBUG)
    logger.propagate = True

    # time stamp that becomes part of the log file name
    date_time = datetime.now().strftime("%Y%m%d-%H%M%S")

    log_to_disk = _config.config("biocypher").get("log_to_disk")

    if log_to_disk:
        logdir = _config.config("biocypher").get("log_directory") or "biocypher-log"
        os.makedirs(logdir, exist_ok=True)
        logfile = os.path.join(logdir, f"biocypher-{date_time}.log")

        # file handler, attached first
        file_handler = logging.FileHandler(logfile)
        verbose = _config.config("biocypher").get("debug")
        file_handler.setLevel(logging.DEBUG if verbose else logging.INFO)
        file_handler.setFormatter(
            logging.Formatter(
                "%(asctime)s\t%(levelname)s\tmodule:%(module)s\n%(message)s",
            ),
        )
        logger.addHandler(file_handler)

    # stream handler, always present
    stdout_handler = logging.StreamHandler()
    stdout_handler.setLevel(logging.INFO)
    stdout_handler.setFormatter(logging.Formatter("%(levelname)s -- %(message)s"))
    logger.addHandler(stdout_handler)

    # startup message
    logger.info(f"This is BioCypher v{__version__}.")
    if log_to_disk:
        logger.info(f"Logging into `{logfile}`.")
    else:
        logger.info("Logging into stdout.")

    return logger

log()

Browse the log file.

Source code in biocypher/_logger.py
def log():
    """
    Browse the log file.

    Reads the whole file returned by ``logfile()`` and hands its content to
    the ``pydoc`` pager.
    """

    log_path = logfile()

    with open(log_path) as stream:
        pydoc.pager(stream.read())

logfile()

Path to the log file.

Source code in biocypher/_logger.py
def logfile() -> str:
    """
    Path to the log file.

    Looks through the handlers of the module logger and returns the
    ``baseFilename`` of the first one that has such an attribute (i.e. a
    file handler), instead of assuming that the file handler is the first
    handler in the list.

    Returns:
        Path of the file the logger writes to.

    Raises:
        AttributeError: If no handler of the logger writes to a file, for
            instance because the ``log_to_disk`` setting is disabled.
    """

    for handler in get_logger().handlers:
        path = getattr(handler, "baseFilename", None)

        if path is not None:
            return path

    raise AttributeError(
        "The BioCypher logger has no file handler, so there is no log file. "
        "Enable `log_to_disk` in the BioCypher configuration to log to disk.",
    )

mapping

BioCypher 'mapping' module. Handles the mapping of user-defined schema to the underlying ontology.

OntologyMapping

Class to store the ontology mapping and extensions.

Source code in biocypher/_mapping.py
class OntologyMapping:
    """
    Class to store the ontology mapping and extensions.

    The schema configuration as read from file is kept in ``schema``; the
    schema with inheritance applied and virtual leaves added is kept in
    ``extended_schema``.
    """

    def __init__(self, config_file: Optional[str] = None):
        """
        Read the schema configuration and derive the extended schema.

        Args:
            config_file:
                Local path or URL (starting with ``http``) of the schema
                configuration YAML file. If None, an empty schema is used.
        """
        self.schema = self._read_config(config_file)

        self.extended_schema = self._extend_schema()

    def _read_config(self, config_file: Optional[str] = None):
        """
        Read the configuration file holding the ontology mapping and extensions.

        Args:
            config_file:
                Local path or URL (starting with ``http``) of the YAML file.

        Returns:
            The parsed YAML content. An empty dict is returned if no file is
            given or if the file contains no document.
        """
        if config_file is None:
            return {}

        # load yaml file from web
        if config_file.startswith("http"):
            with urlopen(config_file) as f:
                schema_config = yaml.safe_load(f)

        # assume file is local; it is read as bytes so that the YAML parser
        # determines the encoding itself (as it does for the URL branch)
        # instead of depending on the platform's default text encoding
        else:
            with open(config_file, "rb") as f:
                schema_config = yaml.safe_load(f)

        # an empty document parses to None; treat it like a missing config
        return {} if schema_config is None else schema_config

    def _extend_schema(self, d: Optional[dict] = None) -> dict:
        """
        Get leaves of the tree hierarchy from the data structure dict
        contained in the `schema_config.yaml`. Creates virtual leaves
        (as children) from entries that provide more than one preferred
        id type (and corresponding inputs).

        Entries of ``d`` are modified in place (``preferred_id`` default,
        inherited properties).

        Args:
            d:
                Data structure dict from yaml file. Defaults to
                ``self.schema``.

        Returns:
            The extended schema: entities of ``d`` plus virtual leaves.
        """

        d = d or self.schema

        extended_schema = dict()

        # first pass: get parent leaves with direct representation in ontology
        for k, v in d.items():
            # k is not an entity
            if "represented_as" not in v:
                continue

            # preferred_id optional: if not provided, use `id`
            if not v.get("preferred_id"):
                v["preferred_id"] = "id"

            # k is an entity that is present in the ontology
            if "is_a" not in v:
                extended_schema[k] = v

        # second pass: "vertical" inheritance
        d = self._vertical_property_inheritance(d)
        for k, v in d.items():
            if "is_a" in v:
                # prevent loops
                if k == v["is_a"]:
                    logger.warning(
                        f"Loop detected in ontology mapping: {k} -> {v}. "
                        "Removing item. Please fix the inheritance if you want "
                        "to use this item."
                    )
                    continue

                extended_schema[k] = v

        # "horizontal" inheritance: create siblings for multiple identifiers or
        # sources -> virtual leaves or implicit children; a list of preferred
        # ids takes precedence over a list of sources
        for k, v in d.items():
            # k is not an entity
            if "represented_as" not in v:
                continue

            if isinstance(v.get("preferred_id"), list):
                extended_schema.update(self._horizontal_inheritance_pid(k, v))

            elif isinstance(v.get("source"), list):
                extended_schema.update(self._horizontal_inheritance_source(k, v))

        return extended_schema

    def _vertical_property_inheritance(self, d):
        """
        Inherit properties from parents to children and update `d` accordingly.

        Only entities that define ``is_a`` and set ``inherit_properties`` are
        touched. The parent is looked up in ``self.schema``; for a list of
        parents only the first one (the direct ancestor) is used.

        Args:
            d:
                Data structure dict from yaml file; modified in place.

        Returns:
            The same dict ``d``.
        """
        for k, v in d.items():
            # k is not an entity
            if "represented_as" not in v:
                continue

            # k is an entity that is present in the ontology
            if "is_a" not in v:
                continue

            # "vertical" inheritance: inherit properties from parent
            if v.get("inherit_properties", False):
                # get direct ancestor
                if isinstance(v["is_a"], list):
                    parent = v["is_a"][0]
                else:
                    parent = v["is_a"]

                # ensure child has properties and exclude_properties
                v.setdefault("properties", {})
                # NOTE(review): this treats exclude_properties as a mapping;
                # a list-valued entry would fail on `.update` below - confirm
                # against the schema format
                v.setdefault("exclude_properties", {})

                # update properties of child; on a name clash the parent's
                # entry replaces the child's
                parent_props = self.schema[parent].get("properties", {})
                if parent_props:
                    v["properties"].update(parent_props)

                parent_excl_props = self.schema[parent].get("exclude_properties", {})
                if parent_excl_props:
                    v["exclude_properties"].update(parent_excl_props)

                # update schema (d)
                d[k] = v

        return d

    @staticmethod
    def _virtual_leaf(key, value, split_field, split_value, label, rep):
        """
        Build the schema entry of one virtual leaf of the entity `key`.

        Args:
            key:
                Name of the entity the leaf is derived from.
            value:
                Schema entry of that entity.
            split_field:
                Field the leaves are created for (``"preferred_id"`` or
                ``"source"``).
            split_value:
                Value of ``split_field`` for this leaf.
            label:
                Input label of this leaf.
            rep:
                Representation (``represented_as``) of this leaf.

        Returns:
            The schema entry of the leaf.
        """
        leaf = {
            split_field: split_value,
            "input_label": label,
            "represented_as": rep,
            # mark as virtual
            "virtual": True,
        }

        # inherit is_a if exists
        if "is_a" in value:
            # treat as multiple inheritance
            if isinstance(value["is_a"], list):
                leaf["is_a"] = [key, *value["is_a"]]
            else:
                leaf["is_a"] = [key, value["is_a"]]
        else:
            # set parent as is_a
            leaf["is_a"] = key

        # inherit everything except core attributes
        core_attributes = {
            "is_a",
            split_field,
            "input_label",
            "label_in_input",
            "represented_as",
        }
        for k, v in value.items():
            if k not in core_attributes:
                leaf[k] = v

        return leaf

    def _horizontal_inheritance_pid(self, key, value):
        """
        Create virtual leaves for multiple preferred id types.

        One leaf named ``<preferred id>.<key>`` is created per preferred id.
        Preferred id, input label and representation given as a single string
        are repeated for every leaf; given as lists, they are matched by
        position.

        Args:
            key:
                Name of the entity.
            value:
                Schema entry of the entity.

        Returns:
            Dict of the virtual leaves, keyed by leaf name.
        """
        preferred_id = value["preferred_id"]
        input_label = value.get("input_label") or value["label_in_input"]
        represented_as = value["represented_as"]

        # adjust lengths
        max_l = max(
            len(_misc.to_list(preferred_id)),
            len(_misc.to_list(input_label)),
            len(_misc.to_list(represented_as)),
        )

        # adjust pid length if necessary
        pids = [preferred_id] * max_l if isinstance(preferred_id, str) else preferred_id

        # adjust label length if necessary; without this, zipping over a
        # single string label would yield its characters one by one
        labels = [input_label] * max_l if isinstance(input_label, str) else input_label

        # adjust rep length if necessary
        reps = [represented_as] * max_l if isinstance(represented_as, str) else represented_as

        return {
            pid + "." + key: self._virtual_leaf(key, value, "preferred_id", pid, lab, rep)
            for pid, lab, rep in zip(pids, labels, reps)
        }

    def _horizontal_inheritance_source(self, key, value):
        """
        Create virtual leaves for multiple sources.

        One leaf named ``<source>.<key>`` is created per source. Input label
        and representation given as a single string are repeated for every
        leaf; given as lists, they are matched by position.

        Args:
            key:
                Name of the entity.
            value:
                Schema entry of the entity.

        Returns:
            Dict of the virtual leaves, keyed by leaf name.
        """
        source = value["source"]
        input_label = value.get("input_label") or value["label_in_input"]
        represented_as = value["represented_as"]

        # adjust lengths
        src_l = len(source)

        # adjust label length if necessary
        labels = [input_label] * src_l if isinstance(input_label, str) else input_label

        # adjust rep length if necessary
        reps = [represented_as] * src_l if isinstance(represented_as, str) else represented_as

        return {
            src + "." + key: self._virtual_leaf(key, value, "source", src, lab, rep)
            for src, lab, rep in zip(source, labels, reps)
        }

metadata

Package metadata (version, authors, etc).

get_metadata()

Basic package metadata.

Retrieves package metadata from the current project directory or from the installed package.

Source code in biocypher/_metadata.py
def get_metadata():
    """
    Basic package metadata.

    Retrieves package metadata from the current project directory or from
    the installed package.

    The ``pyproject.toml`` next to this module, or one directory above, is
    used if it exists and can be parsed. Otherwise the metadata of the
    installed package named after this module's directory is used. The
    version falls back to ``_VERSION`` if neither provides one.

    Returns:
        Dict of metadata; it always contains the key ``"version"``. When read
        from ``pyproject.toml`` it holds ``"name"``, ``"version"``,
        ``"author"``, ``"license"`` and ``"full_metadata"``.
    """

    here = pathlib.Path(__file__).parent
    pyproj_toml = "pyproject.toml"
    meta = {}

    for project_dir in (here, here.parent):
        toml_path = str(project_dir.joinpath(pyproj_toml).absolute())

        # `toml` is compared against None - presumably the parser is an
        # optional import; verify against the module's import block
        if os.path.exists(toml_path) and toml is not None:
            try:
                pyproject = toml.load(toml_path)
            except Exception:
                # deliberate best effort: if toml parsing fails, skip this
                # file and use the next candidate or the fallback
                continue

            # Use modern PEP 621 format (uv/hatchling)
            if "project" in pyproject:
                project = pyproject["project"]

                # `license` is either a table (whose `text` entry is used,
                # as before) or a plain string holding a licence expression;
                # calling `.get` on the string form raised AttributeError
                license_info = project.get("license")
                if isinstance(license_info, dict):
                    license_info = license_info.get("text")

                meta = {
                    "name": project.get("name"),
                    "version": project.get("version"),
                    "author": project.get("authors", []),
                    "license": license_info,
                    "full_metadata": pyproject,
                }
            elif "tool" in pyproject and "poetry" in pyproject["tool"]:
                # Legacy Poetry format fallback (for backward compatibility)
                poetry = pyproject["tool"]["poetry"]
                meta = {
                    "name": poetry.get("name"),
                    "version": poetry.get("version"),
                    "author": poetry.get("authors", []),
                    "license": poetry.get("license"),
                    "full_metadata": pyproject,
                }

            break

    if not meta:
        try:
            meta = {k.lower(): v for k, v in importlib.metadata.metadata(here.name).items()}

        except importlib.metadata.PackageNotFoundError:
            # neither a project file nor an installed package: only the
            # version fallback below is reported
            pass

    meta["version"] = meta.get("version", None) or _VERSION

    return meta

misc

Handy functions for use in various places.

create_tree_visualisation(inheritance_graph)

Create a visualisation of the inheritance tree using treelib.

Source code in biocypher/_misc.py
def create_tree_visualisation(inheritance_graph: dict | nx.Graph) -> Tree:
    """Create a visualisation of the inheritance tree using treelib.

    Classes are attached to the tree in passes: a class is added once its
    parent is part of the tree, and added classes are removed from the list
    of classes that remain to be placed.

    Args:
    ----
        inheritance_graph: The inheritance structure, as a dict or as a
            networkx graph.

    Returns:
    -------
        The treelib tree, rooted at the root node of the inheritance
        structure.

    Raises:
    ------
        ValueError: If some classes can never be attached because their
            parent does not become part of the tree. Without this check the
            loop below would never terminate.

    """
    inheritance_tree = _get_inheritance_tree(inheritance_graph)
    classes, root = _find_root_node(inheritance_tree)

    tree = Tree()
    tree.create_node(root, root)
    while classes:
        remaining_before = len(classes)

        for child in classes:
            parent = inheritance_tree[child]
            # the root is part of the tree from the start, so membership in
            # the tree is the only condition needed
            if parent in tree.nodes.keys():
                tree.create_node(child, child, parent=parent)

        for node in tree.nodes.keys():
            if node in classes:
                classes.remove(node)

        # a pass that places nothing leaves tree and classes unchanged, so
        # every further pass would be identical
        if len(classes) == remaining_before:
            msg = f"Cannot attach the following classes to the inheritance tree rooted at '{root}': {list(classes)}."
            raise ValueError(msg)

    return tree

ensure_iterable(value)

Return iterables, except strings, wrap simple types into tuple.

Source code in biocypher/_misc.py
def ensure_iterable(value: Any) -> Iterable:
    """Pass list-like values through and wrap anything else in a 1-tuple.

    "List-like" means an instance of one of the ``LIST_LIKE`` types.
    """
    if isinstance(value, LIST_LIKE):
        return value

    return (value,)

is_nested(lst)

Check if a list is nested.


lst (list): The list to check.

bool: True if the list is nested, False otherwise.
Source code in biocypher/_misc.py
def is_nested(lst: list) -> bool:
    """Tell whether a list has at least one list among its items.

    Only the top level is inspected, and only items that are ``list``
    instances count (a tuple item does not make the list nested).

    Args:
    ----
        lst (list): The list to check.

    Returns:
    -------
        bool: True if the list is nested, False otherwise.

    """
    return any(isinstance(element, list) for element in lst)

pascalcase_to_sentencecase(s)

Convert PascalCase to sentence case.


s: Input string in PascalCase

string in sentence case form
Source code in biocypher/_misc.py
def pascalcase_to_sentencecase(s: str) -> str:
    """Turn a PascalCase string into sentence case.

    The conversion is delegated to ``from_pascal`` with a single space as
    word separator.

    Args:
    ----
        s: Input string in PascalCase

    Returns:
    -------
        string in sentence case form

    """
    sentence = from_pascal(s, sep=" ")
    return sentence

sentencecase_to_pascalcase(s, sep='\\s')

Convert sentence case to PascalCase.


s: Input string in sentence case
sep: Separator for the words in the input string

string in PascalCase form
Source code in biocypher/_misc.py
def sentencecase_to_pascalcase(s: str, sep: str = r"\s") -> str:
    """Turn a sentence case string into PascalCase.

    Every ASCII letter that starts the string or directly follows a
    separator is upper-cased, and the separator in front of it is dropped.
    Separators followed by anything else are left in place.

    Args:
    ----
        s: Input string in sentence case
        sep: Separator for the words in the input string; inserted into a
            regular expression character class

    Returns:
    -------
        string in PascalCase form

    """
    word_start = re.compile(r"(?:^|[" + sep + "])([a-zA-Z])")

    def _capitalise(found):
        # the replacement is the letter alone, so the separator disappears
        return found.group(1).upper()

    return word_start.sub(_capitalise, s)

sentencecase_to_snakecase(s)

Convert sentence case to snake_case.


s: Input string in sentence case

string in snake_case form
Source code in biocypher/_misc.py
def sentencecase_to_snakecase(s: str) -> str:
    """Turn a sentence case string into snake_case.

    The string is lower-cased and its whitespace-separated words are joined
    with underscores; leading, trailing and repeated whitespace is dropped.

    Args:
    ----
        s: Input string in sentence case

    Returns:
    -------
        string in snake_case form

    """
    words = s.lower().split()
    return "_".join(words)

snakecase_to_sentencecase(s)

Convert snake_case to sentence case.


s: Input string in snake_case

string in sentence case form
Source code in biocypher/_misc.py
def snakecase_to_sentencecase(s: str) -> str:
    """Turn a snake_case string into sentence case.

    The string is split at underscores, every part is lower-cased, and the
    parts are joined with single spaces (so consecutive underscores produce
    consecutive spaces).

    Args:
    ----
        s: Input string in snake_case

    Returns:
    -------
        string in sentence case form

    """
    parts = s.split("_")
    return " ".join(map(str.lower, parts))

to_list(value)

Ensure that value is a list.

Source code in biocypher/_misc.py
def to_list(value: Any) -> list:
    """Return ``value`` as a list.

    List-like values (instances of the ``LIST_LIKE`` types) are converted
    with ``list``; any other value becomes the only item of a new list.
    """
    return list(value) if isinstance(value, LIST_LIKE) else [value]

to_lower_sentence_case(s)

Convert any string to lower sentence case.

Works with snake_case, PascalCase, and sentence case.


s: Input string

string in lower sentence case form
Source code in biocypher/_misc.py
def to_lower_sentence_case(s: str) -> str:
    """Convert any string to lower sentence case.

    Works with snake_case, PascalCase, and sentence case. The input format
    is detected in this order: a string containing an underscore is treated
    as snake_case, a string containing a space as sentence case, and a
    string starting with an upper case character as PascalCase. Anything
    else, including the empty string, is returned unchanged.

    Args:
    ----
        s: Input string

    Returns:
    -------
        string in lower sentence case form

    """
    if "_" in s:
        return snakecase_to_sentencecase(s)

    if " " in s:
        return s.lower()

    # slicing instead of indexing: an empty string has no first character,
    # and `s[0]` would raise an IndexError
    if s[:1].isupper():
        return pascalcase_to_sentencecase(s)

    return s

ontology

BioCypher 'ontology' module to parse and represent ontologies.

Also performs ontology hybridisation and other advanced operations.

Ontology

A class that represents the ontological "backbone" of a KG.

The ontology can be built from a single resource, or hybridised from a combination of resources, with one resource being the "head" ontology, while an arbitrary number of other resources can become "tail" ontologies at arbitrary fusion points inside the "head" ontology.

Source code in biocypher/_ontology.py
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
class Ontology:
    """A class that represents the ontological "backbone" of a KG.

    The ontology can be built from a single resource, or hybridised from a
    combination of resources, with one resource being the "head" ontology, while
    an arbitrary number of other resources can become "tail" ontologies at
    arbitrary fusion points inside the "head" ontology.
    """

    def __init__(
        self,
        head_ontology: dict,
        ontology_mapping: Optional["OntologyMapping"] = None,
        tail_ontologies: dict | None = None,
    ):
        """Set up the ontology and build it right away.

        Args:
        ----
            head_ontology (dict): Description of the head ontology; its
                entries are used to create the head ontology adapter.

            ontology_mapping (OntologyMapping): The schema mapping whose
                extended schema supplies user extensions and properties.
                Defaults to None, in which case neither is added.

            tail_ontologies (dict): Descriptions of the tail ontologies that
                will be joined to the head ontology, one entry per tail
                ontology. Defaults to None.

        """
        # filled while the ontology is built
        self._nx_graph = None
        self._tail_ontologies = None

        # keep track of nodes that have been extended
        self._extended_nodes = set()

        # user input
        self.mapping = ontology_mapping
        self._head_ontology_meta = head_ontology
        self._tail_ontology_meta = tail_ontologies

        self._main()

    def _main(self) -> None:
        """Build the ontology graph.

        Loads the ontologies and joins every tail ontology to the head
        ontology; without tail ontologies, the graph of the head ontology is
        used as is. If a mapping is given, user extensions and properties
        from the mapping are added afterwards.
        """
        self._load_ontologies()

        if not self._tail_ontologies:
            self._nx_graph = self._head_ontology.get_nx_graph()
        else:
            for adapter in self._tail_ontologies.values():
                join_node = self._get_head_join_node(adapter)
                self._join_ontologies(adapter, join_node)

        if not self.mapping:
            return

        self._extend_ontology()

        # experimental: add connections of disjoint classes to entity
        # self._connect_biolink_classes()

        self._add_properties()

    def _load_ontologies(self) -> None:
        """Create the OntologyAdapter objects for all ontologies.

        The head adapter is stored as an instance variable, the tail
        adapters in an instance dictionary under the keys of the tail
        ontology description. ``format`` defaults to None, and
        ``switch_label_and_id`` and ``merge_nodes`` default to True, where
        the description does not provide them.
        """
        logger.info("Loading ontologies...")

        head_meta = self._head_ontology_meta
        self._head_ontology = OntologyAdapter(
            ontology_file=head_meta["url"],
            root_label=head_meta["root_node"],
            ontology_file_format=head_meta.get("format", None),
            switch_label_and_id=head_meta.get("switch_label_and_id", True),
        )

        if not self._tail_ontology_meta:
            return

        self._tail_ontologies = {}
        for tail_name, tail_meta in self._tail_ontology_meta.items():
            self._tail_ontologies[tail_name] = OntologyAdapter(
                ontology_file=tail_meta["url"],
                root_label=tail_meta["tail_join_node"],
                head_join_node_label=tail_meta["head_join_node"],
                ontology_file_format=tail_meta.get("format", None),
                merge_nodes=tail_meta.get("merge_nodes", True),
                switch_label_and_id=tail_meta.get("switch_label_and_id", True),
            )

    def _get_head_join_node(self, adapter: OntologyAdapter) -> str:
        """Try to find the head join node of the given ontology adapter.

        Find the node in the head ontology that is the head join node. If the
        join node is not found, the method will raise an error.

        The label defined by the user is converted to lower sentence case
        first. If the head ontology switches label and id, that label is the
        node itself; otherwise the first node carrying it as its ``label``
        attribute is used.

        Args:
        ----
            adapter (OntologyAdapter): The ontology adapter of which to find the
                join node in the head ontology.

        Returns:
        -------
            str: The head join node in the head ontology.

        Raises:
        ------
            ValueError: If the head join node is not found in the head ontology.

        """
        head_graph = self._head_ontology.get_nx_graph()
        user_defined_head_join_node_label = adapter.get_head_join_node()
        head_join_node_label_in_bc_format = to_lower_sentence_case(user_defined_head_join_node_label.replace("_", " "))

        if self._head_ontology._switch_label_and_id:
            head_join_node = head_join_node_label_in_bc_format
        else:
            # nodes are ids here: search for the node labelled accordingly
            head_join_node = next(
                (
                    node_id
                    for node_id, data in head_graph.nodes(data=True)
                    if data.get("label") == head_join_node_label_in_bc_format
                ),
                None,
            )

        if head_join_node not in head_graph.nodes:
            head_ontology = self._head_ontology._rdf_to_nx(
                self._head_ontology.get_rdf_graph(),
                self._head_ontology._root_label,
                self._head_ontology._switch_label_and_id,
                rename_nodes=False,
            )
            # if the search by label found nothing, report the label that
            # was searched for rather than 'None'
            not_found = head_join_node_label_in_bc_format if head_join_node is None else head_join_node
            msg = (
                f"Head join node '{not_found}' not found in head ontology. "
                f"The head ontology contains the following nodes: {head_ontology.nodes}."
            )
            logger.error(msg)
            raise ValueError(msg)
        return head_join_node

    def _join_ontologies(self, adapter: OntologyAdapter, head_join_node) -> None:
        """Attach one tail ontology to the hybrid ontology graph.

        The subtree of the tail ontology at its join (root) node is added to
        the graph. With node merging, the tail join node takes the name of
        the head join node; without it, the tail join node is connected to
        the head join node as its parent.

        Args:
        ----
            adapter (OntologyAdapter): The ontology adapter of the tail ontology
                to be added to the head ontology.

            head_join_node: The node of the head ontology at which the tail
                ontology is attached.

        """
        if not self._nx_graph:
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        tail_root = adapter.get_root_node()
        tail_graph = adapter.get_nx_graph()

        # subtree of tail ontology at join node
        subtree = nx.dfs_tree(tail_graph.reverse(), tail_root).reverse()

        # the subtree starts out without node attributes: copy them over
        # from the tail ontology
        for member in subtree.nodes:
            subtree.nodes[member].update(tail_graph.nodes[member])

        if adapter._merge_nodes:
            # rename tail join node to match head join node if necessary
            if tail_root != head_join_node:
                subtree = nx.relabel_nodes(subtree, {tail_root: head_join_node})
        else:
            # add head join node from head ontology to tail ontology subtree
            # as parent of tail join node
            subtree.add_node(
                head_join_node,
                **self._head_ontology.get_nx_graph().nodes[head_join_node],
            )
            subtree.add_edge(tail_root, head_join_node)

        # combine head ontology and tail subtree
        self._nx_graph = nx.compose(self._nx_graph, subtree)

    def _extend_ontology(self) -> None:
        """Add the user extensions from the mapping to the ontology graph.

        For every class of the extended schema that defines ``is_a``, the
        class and its parents are added to the graph where missing, with a
        directed edge from child to parent. Several parents are chained:
        each one becomes the child of the next. Nodes created here are
        flagged as user extensions.

        Raises
        ------
            ValueError: If a class without ``is_a`` is neither a synonym of
                an existing node nor present in the graph.

        """
        if not self._nx_graph:
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        def _register_extension(node) -> None:
            # only nodes missing from the graph are created and flagged
            if node in self._nx_graph.nodes:
                return

            self._nx_graph.add_node(node)
            self._nx_graph.nodes[node]["label"] = sentencecase_to_pascalcase(node)

            # mark node as user extension
            self._nx_graph.nodes[node]["user_extension"] = True
            self._extended_nodes.add(node)

        for key, value in self.mapping.extended_schema.items():
            # It is not a root: link it to its chain of parents.
            if value.get("is_a"):
                child = key

                for parent in to_list(value.get("is_a")):
                    _register_extension(parent)
                    _register_extension(child)

                    self._nx_graph.add_edge(child, parent)

                    child = parent

                continue

            # This class is either a root or a synonym.
            # If it is a synonym.
            if self._nx_graph.has_node(value.get("synonym_for")):
                continue

            # It is a root and it is in the loaded vocabulary.
            if self._nx_graph.has_node(key):
                continue

            # This class is in the schema, but not in the loaded vocabulary.
            msg = (
                f"Node {key} not found in ontology, but also has no inheritance definition. Please check your "
                "schema for spelling errors, first letter not in lower case, use of underscores, a missing "
                "`is_a` definition (SubClassOf a root node), or missing labels in class or super-classes."
            )
            logger.error(msg)
            raise ValueError(msg)

    def _connect_biolink_classes(self) -> None:
        """Experimental: Link the disjoint classes to the entity node.

        Does nothing if the graph has no ``entity`` node. Otherwise an edge
        to ``entity`` is added from each class in a fixed list; classes for
        which the graph holds no node attributes are (re-)added and
        labelled first.
        """
        if not self._nx_graph:
            self._nx_graph = self._head_ontology.get_nx_graph().copy()

        if "entity" not in self._nx_graph.nodes:
            return

        # biolink classes that are disjoint from entity
        disjoint_classes = (
            "frequency qualifier mixin",
            "chemical entity to entity association mixin",
            "ontology class",
            "relationship quantifier",
            "physical essence or occurrent",
            "gene or gene product",
            "subject of investigation",
        )

        for class_name in disjoint_classes:
            known_attributes = self._nx_graph.nodes.get(class_name)

            if not known_attributes:
                self._nx_graph.add_node(class_name)
                self._nx_graph.nodes[class_name]["label"] = sentencecase_to_pascalcase(class_name)

            self._nx_graph.add_edge(class_name, "entity")

    def _add_properties(self) -> None:
        """Transfer the schema entries of the mapping to the ontology graph.

        The schema entry of every entity that is a node of the graph is
        merged into the attributes of that node. For entities declared as
        ``synonym_for`` another node, that node is renamed to the entity.

        Raises
        ------
            ValueError: If the node named by ``synonym_for`` is not part of
                the graph.

        """
        for key, value in self.mapping.extended_schema.items():
            if key in self._nx_graph.nodes:
                self._nx_graph.nodes[key].update(value)

            synonym_target = value.get("synonym_for")
            if not synonym_target:
                continue

            if synonym_target not in self._nx_graph.nodes:
                msg = f"Node {synonym_target} not found in ontology."
                logger.error(msg)
                raise ValueError(msg)

            # change node label to synonym
            self._nx_graph = nx.relabel_nodes(self._nx_graph, {synonym_target: key})

    def get_ancestors(self, node_label: str) -> "nx.DiGraph":
        """Get the ancestors of a node in the ontology.

        The result is not a plain list: it is the graph object produced by
        a depth-first search over the ontology graph that starts at the
        given node.

        Args:
        ----
            node_label (str): The label of the node in the ontology.

        Returns:
        -------
            nx.DiGraph: The depth-first search tree rooted at ``node_label``.
                Its nodes are the given node and everything reachable from
                it along the edges of the ontology graph.

        """
        return nx.dfs_tree(self._nx_graph, node_label)

    def show_ontology_structure(self, to_disk: str | None = None, full: bool = False):
        """Show the ontology structure using treelib or write to GRAPHML file.

        Args:
        ----
            to_disk (str): If specified, the ontology structure will be saved
                to disk as a GRAPHML file at the location (directory) specified
                by the `to_disk` string, to be opened in your favourite graph
                visualisation tool.

            full (bool): If True, the full ontology structure will be shown,
                including all nodes and edges. If False, only the nodes and
                edges that are relevant to the extended schema will be shown.

        Returns:
        -------
            The tree built by `create_tree_visualisation` when `to_disk` is
            not given; otherwise `True` once the GRAPHML file has been written.

        Raises:
        ------
            ValueError: If a partial view is requested without an extended
                schema, or if no ontology graph has been loaded.

        """
        if not full and not self.mapping.extended_schema:
            # Must stay a plain string: a trailing comma inside the
            # parentheses would silently turn the message into a tuple.
            msg = (
                "You are attempting to visualise a subset of the loaded "
                "ontology, but have not provided a schema configuration. "
                "To display a partial ontology graph, please provide a schema "
                "configuration file; to visualise the full graph, please use "
                "the parameter `full = True`."
            )
            logger.error(msg)
            raise ValueError(msg)

        if not self._nx_graph:
            msg = "Ontology not loaded."
            logger.error(msg)
            raise ValueError(msg)

        if not self._tail_ontologies:
            msg = f"Showing ontology structure based on {self._head_ontology._ontology_file}"

        else:
            msg = f"Showing ontology structure based on {len(self._tail_ontology_meta) + 1} ontologies: "

        logger.info(msg)

        if not full:
            # set of leaves and their intermediate parents up to the root
            filter_nodes = set(self.mapping.extended_schema)

            for node in self.mapping.extended_schema:
                filter_nodes.update(self.get_ancestors(node).nodes)

            # filter graph
            G = self._nx_graph.subgraph(filter_nodes)

        else:
            G = self._nx_graph

        if not to_disk:
            # create tree
            tree = create_tree_visualisation(G)

            # add synonym information
            for node in self.mapping.extended_schema:
                node_config = self.mapping.extended_schema[node]
                if not isinstance(node_config, dict):
                    continue
                synonym = node_config.get("synonym_for")
                if synonym:
                    tree.nodes[node].tag = f"{node} = {synonym}"

            logger.info(f"\n{tree}")

            return tree

        # Rename every node to its "label" attribute (falling back to the node
        # id) in a single pass. `relabel_nodes` returns a copy by default, so
        # the ontology graph held by this object is left untouched.
        new_names = {node: attrs.get("label") or node for node, attrs in G.nodes(data=True)}
        export_graph = nx.relabel_nodes(G, new_names)

        for former_id, new_id in new_names.items():
            node_attrs = export_graph.nodes[new_id]
            # keep the former id available as the label
            node_attrs["label"] = former_id

            # convert lists/dicts to strings for vis only
            for attrib, attrib_value in list(node_attrs.items()):
                if isinstance(attrib_value, (list, dict)):
                    node_attrs[attrib] = str(attrib_value)

        path = os.path.join(to_disk, "ontology_structure.graphml")

        logger.info(f"Writing ontology structure to {path}.")

        nx.write_graphml(export_graph, path)

        return True

    def get_dict(self) -> dict:
        """Describe the ontology as a BioCypher-node-shaped dictionary.

        The result has the keys ``node_id`` (the current session version id),
        ``node_label`` (always ``"BioCypher"``) and ``properties``, so it can
        be handed to the Neo4j driver like any other BioCypher node.
        """
        # NOTE(review): the "schema" property is a literal string, not the
        # extended schema itself - confirm whether that is intended.
        properties = {"schema": "self.ontology_mapping.extended_schema"}

        return {
            "node_id": self._get_current_id(),
            "node_label": "BioCypher",
            "properties": properties,
        }

    def _get_current_id(self):
        """Build a version id for the current session from the local time.

        The id has the form ``vYYYYMMDD-HHMMSS``. This is plain datetime-based
        versioning; incremental versioning, versioning from the config file or
        a manually supplied id could replace it later.
        """
        return f"v{datetime.now():%Y%m%d-%H%M%S}"

    def get_rdf_graph(self):
        """Return the merged RDF graph.

        Return the merged graph of all loaded ontologies (head and tails).
        Without tail ontologies the head ontology's own RDF graph is returned
        as-is; with tail ontologies a new graph holding the union is built, so
        the head adapter's graph is not modified.

        Raises
        ------
            TypeError: If a tail ontology is not an OntologyAdapter.

        """
        graph = self._head_ontology.get_rdf_graph()
        if not self._tail_ontologies:
            return graph

        for onto in self._tail_ontologies.values():
            if not isinstance(onto, OntologyAdapter):
                msg = f"Tail ontology must be an OntologyAdapter, got {type(onto).__name__}."
                logger.error(msg)
                raise TypeError(msg)
            # RDFlib uses the + operator for merging. `a + b` creates a new
            # graph, whereas `a += b` would add the tail triples in place to
            # the graph owned by the head adapter.
            graph = graph + onto.get_rdf_graph()
        return graph

__init__(head_ontology, ontology_mapping=None, tail_ontologies=None)

Initialize the Ontology class.


head_ontology (dict): Dictionary describing the head ontology (the signature declares `dict`, not an OntologyAdapter instance).

tail_ontologies (dict | None): Dictionary describing the tail ontologies that will be
    added to the head ontology. Defaults to None.
Source code in biocypher/_ontology.py
def __init__(
    self,
    head_ontology: dict,
    ontology_mapping: Optional["OntologyMapping"] = None,
    tail_ontologies: dict | None = None,
):
    """Initialize the Ontology class.

    Args:
    ----
        head_ontology (OntologyAdapter): The head ontology.

        tail_ontologies (list): A list of OntologyAdapters that will be
            added to the head ontology. Defaults to None.

    """
    self._head_ontology_meta = head_ontology
    self.mapping = ontology_mapping
    self._tail_ontology_meta = tail_ontologies

    self._tail_ontologies = None
    self._nx_graph = None

    # keep track of nodes that have been extended
    self._extended_nodes = set()

    self._main()

get_ancestors(node_label)

Get the ancestors of a node in the ontology.


node_label (str): The label of the node in the ontology.

nx.DiGraph: The depth-first search tree rooted at the node (as returned by `nx.dfs_tree`); its nodes are the node itself and its ancestors.
Source code in biocypher/_ontology.py
def get_ancestors(self, node_label: str) -> "nx.DiGraph":
    """Get the ancestors of a node in the ontology.

    Args:
    ----
        node_label (str): The label of the node in the ontology.

    Returns:
    -------
        nx.DiGraph: The depth-first search tree of the ontology graph
            rooted at ``node_label``, as produced by ``nx.dfs_tree``. This
            is a graph, not a list; callers read its ``nodes`` to obtain
            the starting node together with everything reachable from it.

    """
    return nx.dfs_tree(self._nx_graph, node_label)

get_dict()

Return a dictionary representation of the ontology.

The dictionary is compatible with a BioCypher node for compatibility with the Neo4j driver.

Source code in biocypher/_ontology.py
def get_dict(self) -> dict:
    """Describe the ontology as a BioCypher-node-shaped dictionary.

    The result has the keys ``node_id`` (the current session version id),
    ``node_label`` (always ``"BioCypher"``) and ``properties``, so it can
    be handed to the Neo4j driver like any other BioCypher node.
    """
    # NOTE(review): the "schema" property is a literal string, not the
    # extended schema itself - confirm whether that is intended.
    properties = {"schema": "self.ontology_mapping.extended_schema"}

    return {
        "node_id": self._get_current_id(),
        "node_label": "BioCypher",
        "properties": properties,
    }

get_rdf_graph()

Return the merged RDF graph.

Return the merged graph of all loaded ontologies (head and tails).

Source code in biocypher/_ontology.py
def get_rdf_graph(self):
    """Return the merged RDF graph.

    Return the merged graph of all loaded ontologies (head and tails).
    Without tail ontologies the head ontology's own RDF graph is returned
    as-is; with tail ontologies a new graph holding the union is built, so
    the head adapter's graph is not modified.

    Raises
    ------
        TypeError: If a tail ontology is not an OntologyAdapter.

    """
    graph = self._head_ontology.get_rdf_graph()
    if not self._tail_ontologies:
        return graph

    for onto in self._tail_ontologies.values():
        if not isinstance(onto, OntologyAdapter):
            msg = f"Tail ontology must be an OntologyAdapter, got {type(onto).__name__}."
            logger.error(msg)
            raise TypeError(msg)
        # RDFlib uses the + operator for merging. `a + b` creates a new
        # graph, whereas `a += b` would add the tail triples in place to
        # the graph owned by the head adapter.
        graph = graph + onto.get_rdf_graph()
    return graph

show_ontology_structure(to_disk=None, full=False)

Show the ontology structure using treelib or write to GRAPHML file.


to_disk (str): If specified, the ontology structure will be saved
    to disk as a GRAPHML file at the location (directory) specified
    by the `to_disk` string, to be opened in your favourite graph
    visualisation tool.

full (bool): If True, the full ontology structure will be shown,
    including all nodes and edges. If False, only the nodes and
    edges that are relevant to the extended schema will be shown.
Source code in biocypher/_ontology.py
def show_ontology_structure(self, to_disk: str | None = None, full: bool = False):
    """Show the ontology structure using treelib or write to GRAPHML file.

    Args:
    ----
        to_disk (str): If specified, the ontology structure will be saved
            to disk as a GRAPHML file at the location (directory) specified
            by the `to_disk` string, to be opened in your favourite graph
            visualisation tool.

        full (bool): If True, the full ontology structure will be shown,
            including all nodes and edges. If False, only the nodes and
            edges that are relevant to the extended schema will be shown.

    Returns:
    -------
        The tree built by `create_tree_visualisation` when `to_disk` is
        not given; otherwise `True` once the GRAPHML file has been written.

    Raises:
    ------
        ValueError: If a partial view is requested without an extended
            schema, or if no ontology graph has been loaded.

    """
    if not full and not self.mapping.extended_schema:
        # Must stay a plain string: a trailing comma inside the
        # parentheses would silently turn the message into a tuple.
        msg = (
            "You are attempting to visualise a subset of the loaded "
            "ontology, but have not provided a schema configuration. "
            "To display a partial ontology graph, please provide a schema "
            "configuration file; to visualise the full graph, please use "
            "the parameter `full = True`."
        )
        logger.error(msg)
        raise ValueError(msg)

    if not self._nx_graph:
        msg = "Ontology not loaded."
        logger.error(msg)
        raise ValueError(msg)

    if not self._tail_ontologies:
        msg = f"Showing ontology structure based on {self._head_ontology._ontology_file}"

    else:
        msg = f"Showing ontology structure based on {len(self._tail_ontology_meta) + 1} ontologies: "

    logger.info(msg)

    if not full:
        # set of leaves and their intermediate parents up to the root
        filter_nodes = set(self.mapping.extended_schema)

        for node in self.mapping.extended_schema:
            filter_nodes.update(self.get_ancestors(node).nodes)

        # filter graph
        G = self._nx_graph.subgraph(filter_nodes)

    else:
        G = self._nx_graph

    if not to_disk:
        # create tree
        tree = create_tree_visualisation(G)

        # add synonym information
        for node in self.mapping.extended_schema:
            node_config = self.mapping.extended_schema[node]
            if not isinstance(node_config, dict):
                continue
            synonym = node_config.get("synonym_for")
            if synonym:
                tree.nodes[node].tag = f"{node} = {synonym}"

        logger.info(f"\n{tree}")

        return tree

    # Rename every node to its "label" attribute (falling back to the node
    # id) in a single pass. `relabel_nodes` returns a copy by default, so
    # the ontology graph held by this object is left untouched.
    new_names = {node: attrs.get("label") or node for node, attrs in G.nodes(data=True)}
    export_graph = nx.relabel_nodes(G, new_names)

    for former_id, new_id in new_names.items():
        node_attrs = export_graph.nodes[new_id]
        # keep the former id available as the label
        node_attrs["label"] = former_id

        # convert lists/dicts to strings for vis only
        for attrib, attrib_value in list(node_attrs.items()):
            if isinstance(attrib_value, (list, dict)):
                node_attrs[attrib] = str(attrib_value)

    path = os.path.join(to_disk, "ontology_structure.graphml")

    logger.info(f"Writing ontology structure to {path}.")

    nx.write_graphml(export_graph, path)

    return True

OntologyAdapter

Class that represents an ontology to be used in the Biocypher framework.

Can read from a variety of formats, including OWL, OBO, and RDF/XML. The ontology is represented by a networkx.DiGraph object; an RDFlib graph is also kept. By default, the DiGraph reverses the label and identifier of the nodes, such that the node name in the graph is the human-readable label. The edges are oriented from child to parent. Labels are formatted in lower sentence case and underscores are replaced by spaces. Identifiers are taken as defined and the prefixes are removed by default.

Source code in biocypher/_ontology.py
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
class OntologyAdapter:
    """Class that represents an ontology to be used in the Biocypher framework.

    Can read from a variety of formats, including OWL, OBO, and RDF/XML. The
    ontology is represented by a networkx.DiGraph object; an RDFlib graph is
    also kept. By default, the DiGraph reverses the label and identifier of the
    nodes, such that the node name in the graph is the human-readable label. The
    edges are oriented from child to parent. Labels are formatted in lower
    sentence case and underscores are replaced by spaces. Identifiers are taken
    as defined and the prefixes are removed by default.
    """

    # Short format names (explicit `ontology_file_format` values and file
    # extensions alike) mapped to the format string handed to rdflib's parser.
    # "obo" is recognised separately and rejected as not yet supported.
    _RDFLIB_FORMATS = {
        "owl": "application/rdf+xml",
        "rdf": "application/rdf+xml",
        "ttl": "ttl",
    }

    def __init__(
        self,
        ontology_file: str,
        root_label: str,
        ontology_file_format: str | None = None,
        head_join_node_label: str | None = None,
        merge_nodes: bool | None = True,
        switch_label_and_id: bool = True,
        remove_prefixes: bool = True,
    ):
        """Initialize the OntologyAdapter class.

        Args:
        ----
            ontology_file (str): Path to the ontology file. Can be local or
                remote.

            root_label (str): The label of the root node in the ontology. In
                case of a tail ontology, this is the tail join node.

            ontology_file_format (str): Short name of the ontology file
                format: "owl" or "rdf" (both parsed as "application/rdf+xml")
                or "ttl". "obo" is recognised but raises NotImplementedError;
                any other value raises ValueError. If format is not passed, it
                is determined from the file extension.

            head_join_node_label (str): Optional variable to store the label of the
                node in the head ontology that should be used to join to the
                root node of the tail ontology. Defaults to None.

            merge_nodes (bool): If True, head and tail join nodes will be
                merged, using the label of the head join node. If False, the
                tail join node will be attached as a child of the head join
                node.

            switch_label_and_id (bool): If True, the node names in the graph will be
                the human-readable labels. If False, the node names will be the
                identifiers. Defaults to True.

            remove_prefixes (bool): If True, the prefixes of the identifiers will
                be removed. Defaults to True.

        """
        logger.info(f"Instantiating OntologyAdapter class for {ontology_file}.")

        self._ontology_file = ontology_file
        self._root_label = root_label
        self._format = ontology_file_format
        self._merge_nodes = merge_nodes
        self._head_join_node = head_join_node_label
        self._switch_label_and_id = switch_label_and_id
        self._remove_prefixes = remove_prefixes

        self._rdf_graph = self._load_rdf_graph(ontology_file)

        self._nx_graph = self._rdf_to_nx(self._rdf_graph, root_label, switch_label_and_id)

    def _rdf_to_nx(
        self,
        _rdf_graph: rdflib.Graph,
        root_label: str,
        switch_label_and_id: bool,
        rename_nodes: bool = True,
    ) -> nx.DiGraph:
        """Convert the RDF graph into the networkx graph used by BioCypher.

        Runs the conversion steps in order: collect the inheritance triples,
        build a networkx graph from them, attach labels, rename the nodes and
        finally restrict the graph to the root node and the nodes that have a
        path to it.

        Args:
        ----
            _rdf_graph (rdflib.Graph): The RDF graph to convert
            root_label (str): The label of the root node in the ontology
            switch_label_and_id (bool): If True, id and label are switched
            rename_nodes (bool): If True, the nodes are renamed

        Returns:
        -------
            nx.DiGraph: A new DiGraph built from the filtered result

        """
        one_to_one_triples, one_to_many_dict = self._get_relevant_rdf_triples(_rdf_graph)
        nx_graph = self._convert_to_nx(one_to_one_triples, one_to_many_dict)
        nx_graph = self._add_labels_to_nodes(nx_graph, switch_label_and_id)
        nx_graph = self._change_nodes_to_biocypher_format(nx_graph, switch_label_and_id, rename_nodes)
        nx_graph = self._get_all_ancestors(nx_graph, root_label, switch_label_and_id, rename_nodes)
        return nx.DiGraph(nx_graph)

    def _get_relevant_rdf_triples(self, g: rdflib.Graph) -> tuple:
        """Collect the inheritance information needed for the conversion.

        Args:
        ----
            g (rdflib.Graph): The RDF graph

        Returns:
        -------
            tuple: The one to one inheritance graph and the multiple
                inheritance dictionary, in that order

        """
        one_to_one_inheritance_graph = self._get_one_to_one_inheritance_triples(g)
        intersection = self._get_multiple_inheritance_dict(g)
        return one_to_one_inheritance_graph, intersection

    def _get_one_to_one_inheritance_triples(self, g: rdflib.Graph) -> rdflib.Graph:
        """Get the one to one inheritance triples from the RDF graph.

        Only triples whose subject has a label are kept.

        Args:
        ----
            g (rdflib.Graph): The RDF graph

        Returns:
        -------
            rdflib.Graph: The one to one inheritance graph

        """
        one_to_one_inheritance_graph = Graph()
        for s, p, o in chain(
            g.triples((None, rdflib.RDFS.subClassOf, None)),  # Node classes
            g.triples((None, rdflib.RDF.type, rdflib.RDFS.Class)),  # Root classes
            g.triples((None, rdflib.RDFS.subPropertyOf, None)),  # OWL "edges" classes
            g.triples((None, rdflib.RDF.type, rdflib.OWL.ObjectProperty)),  # OWL "edges" root classes
        ):
            if self.has_label(s, g):
                one_to_one_inheritance_graph.add((s, p, o))
        return one_to_one_inheritance_graph

    def _get_multiple_inheritance_dict(self, g: rdflib.Graph) -> dict:
        """Get the multiple inheritance dictionary from the RDF graph.

        Every `owl:intersectionOf` node is mapped to the child that refers to
        it and to the members of the intersection list (the parents).
        Intersection nodes for which no child can be found are left out.

        Args:
        ----
            g (rdflib.Graph): The RDF graph

        Returns:
        -------
            dict: The multiple inheritance dictionary, keyed by intersection
                node, with the entries "child_name" and "parent_node_names"

        """
        multiple_inheritance = g.triples((None, rdflib.OWL.intersectionOf, None))
        intersection = {}
        for node, _, first_node_of_intersection_list in multiple_inheritance:
            parents = self._retrieve_rdf_linked_list(first_node_of_intersection_list)
            child_name = None
            # If several subjects point at the node, the last one found wins.
            for s_, _, _ in chain(
                g.triples((None, rdflib.RDFS.subClassOf, node)),
                g.triples((None, rdflib.RDFS.subPropertyOf, node)),
            ):
                child_name = s_

            # Handle Snomed CT post coordinated expressions
            if not child_name:
                for s_, _, _ in g.triples((None, rdflib.OWL.equivalentClass, node)):
                    child_name = s_

            if child_name:
                intersection[node] = {
                    "child_name": child_name,
                    "parent_node_names": parents,
                }
        return intersection

    def has_label(self, node: rdflib.URIRef, g: rdflib.Graph) -> bool:
        """Check if the node has a label in the graph.

        Args:
        ----
            node (rdflib.URIRef): The node to check
            g (rdflib.Graph): The graph to check in

        Returns:
        -------
            bool: True if the node has a label, False otherwise

        """
        return (node, rdflib.RDFS.label, None) in g

    def _retrieve_rdf_linked_list(self, subject: rdflib.URIRef) -> list:
        """Recursively retrieve a linked list from RDF.

        Example RDF list with the items [item1, item2]:
        list_node - first -> item1
        list_node - rest -> list_node2
        list_node2 - first -> item2
        list_node2 - rest -> nil

        The list is read from the adapter's own RDF graph.

        Args:
        ----
            subject (rdflib.URIRef): One list_node of the RDF list

        Returns:
        -------
            list: The items of the RDF list

        """
        g = self._rdf_graph
        rdf_list = []
        for _, _, item in g.triples((subject, rdflib.RDF.first, None)):
            rdf_list.append(item)
        for _, _, next_node in g.triples((subject, rdflib.RDF.rest, None)):
            if next_node != rdflib.RDF.nil:
                rdf_list.extend(self._retrieve_rdf_linked_list(next_node))
        return rdf_list

    def _convert_to_nx(self, one_to_one: rdflib.Graph, one_to_many: dict) -> nx.DiGraph:
        """Convert the one to one and one to many inheritance graphs to networkx.

        Args:
        ----
            one_to_one (rdflib.Graph): The one to one inheritance graph
            one_to_many (dict): The one to many inheritance dictionary

        Returns:
        -------
            nx.DiGraph: The networkx graph

        """
        nx_graph = rdflib_to_networkx_digraph(one_to_one, edge_attrs=lambda s, p, o: {}, calc_weights=False)
        for key, value in one_to_many.items():
            nx_graph.add_edges_from([(value["child_name"], parent) for parent in value["parent_node_names"]])
            # The intersection node itself is replaced by the direct
            # child -> parent edges added above.
            if key in nx_graph.nodes:
                nx_graph.remove_node(key)
        return nx_graph

    def _add_labels_to_nodes(self, nx_graph: nx.DiGraph, switch_label_and_id: bool) -> nx.DiGraph:
        """Add labels to the nodes in the networkx graph.

        Nodes whose computed id is the string "none" are removed from the
        graph instead of being labelled.

        Args:
        ----
            nx_graph (nx.DiGraph): The networkx graph
            switch_label_and_id (bool): If True, id and label are switched

        Returns:
        -------
            nx.DiGraph: The networkx graph with labels

        """
        # Iterate over a snapshot because nodes may be removed on the way.
        for node in list(nx_graph.nodes):
            nx_id, nx_label = self._get_nx_id_and_label(node, switch_label_and_id)
            if nx_id == "none":
                # remove node if it has no id
                nx_graph.remove_node(node)
                continue

            nx_graph.nodes[node]["label"] = nx_label
        return nx_graph

    def _change_nodes_to_biocypher_format(
        self,
        nx_graph: nx.DiGraph,
        switch_label_and_id: bool,
        rename_nodes: bool = True,
    ) -> nx.DiGraph:
        """Change the nodes in the networkx graph to BioCypher format.

        This involves:
            - removing the prefix of the identifier
            - switching the id and label if requested
            - adapting the labels (replace _ with space and convert to lower
                sentence case)

        The graph is relabelled in place (`copy=False`).

        Args:
        ----
            nx_graph (nx.DiGraph): The networkx graph
            switch_label_and_id (bool): If True, id and label are switched
            rename_nodes (bool): If True, the nodes are renamed

        Returns:
        -------
            nx.DiGraph: The networkx ontology graph in BioCypher format

        """
        mapping = {
            node: self._get_nx_id_and_label(node, switch_label_and_id, rename_nodes)[0] for node in nx_graph.nodes
        }
        renamed = nx.relabel_nodes(nx_graph, mapping, copy=False)
        return renamed

    def _get_all_ancestors(
        self,
        renamed: nx.DiGraph,
        root_label: str,
        switch_label_and_id: bool,
        rename_nodes: bool = True,
    ) -> nx.DiGraph:
        """Get all ancestors of the root node in the networkx graph.

        "Ancestors" is meant in the graph sense (`nx.ancestors`): every node
        that has a path to the root. The root itself is kept as well.

        Args:
        ----
            renamed (nx.DiGraph): The renamed networkx graph
            root_label (str): The label of the root node in the ontology
            switch_label_and_id (bool): If True, id and label are switched
            rename_nodes (bool): If True, the nodes are renamed

        Returns:
        -------
            nx.DiGraph: The filtered networkx graph

        """
        root = self._get_nx_id_and_label(
            self._find_root_label(self._rdf_graph, root_label),
            switch_label_and_id,
            rename_nodes,
        )[0]
        ancestors = nx.ancestors(renamed, root)
        ancestors.add(root)
        filtered_graph = renamed.subgraph(ancestors)
        return filtered_graph

    def _get_nx_id_and_label(self, node, switch_id_and_label: bool, rename_nodes: bool = True) -> tuple[str, str]:
        """Rename node id and label for nx graph.

        Args:
        ----
            node (str): The node to rename
            switch_id_and_label (bool): If True, switch id and label
            rename_nodes (bool): If True, underscores in the label are
                replaced by spaces and the label is converted to lower
                sentence case

        Returns:
        -------
            tuple[str, str]: The renamed node id and label

        """
        node_id_str = self._remove_prefix(str(node))
        # The label lookup result is stringified as-is, also when the lookup
        # yields no label.
        node_label_str = str(self._rdf_graph.value(node, rdflib.RDFS.label))
        if rename_nodes:
            node_label_str = node_label_str.replace("_", " ")
            node_label_str = to_lower_sentence_case(node_label_str)
        nx_id = node_label_str if switch_id_and_label else node_id_str
        nx_label = node_id_str if switch_id_and_label else node_label_str
        return nx_id, nx_label

    def _find_root_label(self, g, root_label):
        """Find the RDF node whose label equals the given root label.

        Args:
        ----
            g (rdflib.Graph): The RDF graph to search
            root_label (str): The label of the root node in the ontology

        Returns:
        -------
            The subject of the first label triple whose label matches

        Raises:
        ------
            ValueError: If no label in the ontology equals `root_label`

        """
        # Loop through all labels in the ontology
        for label_subject, _, label_in_ontology in g.triples((None, rdflib.RDFS.label, None)):
            # If the label is the root label, its subject is the root node
            if str(label_in_ontology) == root_label:
                return label_subject

        # No match: list every available label to help fixing the config.
        labels_in_ontology = [str(label) for _, _, label in g.triples((None, rdflib.RDFS.label, None))]
        msg = (
            f"Could not find root node with label '{root_label}'. "
            f"The ontology contains the following labels: {labels_in_ontology}"
        )
        logger.error(msg)
        raise ValueError(msg)

    def _remove_prefix(self, uri: str) -> str:
        """Remove the prefix of a URI.

        URIs can contain either "#" or "/" as a separator between the prefix
        and the local name. The prefix is everything before the last separator.

        Args:
        ----
            uri (str): The URI to remove the prefix from

        Returns:
        -------
            str: The URI without the prefix, or the unchanged URI if prefix
                removal is switched off

        """
        if not self._remove_prefixes:
            return uri
        return uri.rsplit("#", 1)[-1].rsplit("/", 1)[-1]

    def _load_rdf_graph(self, ontology_file):
        """Load the ontology into an RDFlib graph.

        The ontology file can be in OWL, OBO, or RDF/XML format.

        Args:
        ----
            ontology_file (str): The path to the ontology file

        Returns:
        -------
            rdflib.Graph: The RDFlib graph

        """
        g = rdflib.Graph()
        g.parse(ontology_file, format=self._get_format(ontology_file))
        return g

    def _get_format(self, ontology_file):
        """Get the format of the ontology file.

        An explicitly configured format takes precedence; otherwise the format
        is derived from the file extension (".owl", ".obo", ".rdf", ".ttl").

        Args:
        ----
            ontology_file (str): The path to the ontology file

        Returns:
        -------
            str: The format string to pass to the RDFlib parser

        Raises:
        ------
            NotImplementedError: If the format is OBO
            ValueError: If the format is unknown or cannot be determined

        """
        if self._format:
            format_name = self._format
        else:
            format_name = next(
                (ext for ext in ("owl", "obo", "rdf", "ttl") if ontology_file.endswith(f".{ext}")),
                None,
            )

        if format_name == "obo":
            msg = "OBO format not yet supported"
            logger.error(msg)
            raise NotImplementedError(msg)

        rdflib_format = self._RDFLIB_FORMATS.get(format_name)
        if rdflib_format is None:
            msg = f"Could not determine format of ontology file {ontology_file}"
            logger.error(msg)
            raise ValueError(msg)
        return rdflib_format

    def get_nx_graph(self):
        """Get the networkx graph representing the ontology."""
        return self._nx_graph

    def get_rdf_graph(self):
        """Get the RDFlib graph representing the ontology."""
        return self._rdf_graph

    def get_root_node(self):
        """Get root node in the ontology.

        Returns
        -------
            root_node: If _switch_label_and_id is True, the root node label is
                returned, otherwise the root node id is returned. In the
                latter case None is returned when no node carries the root
                label.

        """
        root_label = to_lower_sentence_case(self._root_label.replace("_", " "))

        if self._switch_label_and_id:
            return root_label

        # Node names are identifiers here, so search the "label" attributes.
        for node, data in self.get_nx_graph().nodes(data=True):
            if "label" in data and data["label"] == root_label:
                return node

        return None

    def get_ancestors(self, node_label):
        """Get the ancestors of a node in the ontology.

        Returns the result of a depth-first preorder traversal of the ontology
        graph starting at `node_label` (an iterator over nodes, not a list).
        """
        return nx.dfs_preorder_nodes(self._nx_graph, node_label)

    def get_head_join_node(self):
        """Get the head join node of the ontology."""
        return self._head_join_node

__init__(ontology_file, root_label, ontology_file_format=None, head_join_node_label=None, merge_nodes=True, switch_label_and_id=True, remove_prefixes=True)

Initialize the OntologyAdapter class.


ontology_file (str): Path to the ontology file. Can be local or
    remote.

root_label (str): The label of the root node in the ontology. In
    case of a tail ontology, this is the tail join node.

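ontology_file_format (str): Short name of the ontology file format: "owl" or "rdf" (both parsed as "application/rdf+xml") or "ttl"; "obo" is recognised but not yet supported.
    If format is not passed, it is determined automatically from the file extension.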

head_join_node_label (str): Optional variable to store the label of the
    node in the head ontology that should be used to join to the
    root node of the tail ontology. Defaults to None.

merge_nodes (bool): If True, head and tail join nodes will be
    merged, using the label of the head join node. If False, the
    tail join node will be attached as a child of the head join
    node.

switch_label_and_id (bool): If True, the node names in the graph will be
    the human-readable labels. If False, the node names will be the
    identifiers. Defaults to True.

remove_prefixes (bool): If True, the prefixes of the identifiers will
    be removed. Defaults to True.
Source code in biocypher/_ontology.py
def __init__(
    self,
    ontology_file: str,
    root_label: str,
    ontology_file_format: str | None = None,
    head_join_node_label: str | None = None,
    merge_nodes: bool | None = True,
    switch_label_and_id: bool = True,
    remove_prefixes: bool = True,
):
    """Initialize the OntologyAdapter class.

    Args:
    ----
        ontology_file (str): Path to the ontology file. Can be local or
            remote.

        root_label (str): The label of the root node in the ontology. In
            case of a tail ontology, this is the tail join node.

        ontology_file_format (str): The format of the ontology file (e.g. "application/rdf+xml")
            If format is not passed, it is determined automatically.

        head_join_node_label (str): Optional variable to store the label of the
            node in the head ontology that should be used to join to the
            root node of the tail ontology. Defaults to None.

        merge_nodes (bool): If True, head and tail join nodes will be
            merged, using the label of the head join node. If False, the
            tail join node will be attached as a child of the head join
            node.

        switch_label_and_id (bool): If True, the node names in the graph will be
            the human-readable labels. If False, the node names will be the
            identifiers. Defaults to True.

        remove_prefixes (bool): If True, the prefixes of the identifiers will
            be removed. Defaults to True.

    """
    logger.info(f"Instantiating OntologyAdapter class for {ontology_file}.")

    self._ontology_file = ontology_file
    self._root_label = root_label
    self._format = ontology_file_format
    self._merge_nodes = merge_nodes
    self._head_join_node = head_join_node_label
    self._switch_label_and_id = switch_label_and_id
    self._remove_prefixes = remove_prefixes

    self._rdf_graph = self._load_rdf_graph(ontology_file)

    self._nx_graph = self._rdf_to_nx(self._rdf_graph, root_label, switch_label_and_id)

get_ancestors(node_label)

Get the ancestors of a node in the ontology.

Source code in biocypher/_ontology.py
def get_ancestors(self, node_label):
    """Walk the ontology graph depth-first, starting at ``node_label``.

    Args:
    ----
        node_label: The node of the stored networkx graph at which the
            traversal starts.

    Returns:
    -------
        The result of ``nx.dfs_preorder_nodes`` for the stored graph and
        ``node_label``.

    """
    # NOTE(review): the traversed nodes are only "ancestors" if edges point
    # from child to parent - confirm against `_rdf_to_nx`.
    ontology_graph = self._nx_graph
    return nx.dfs_preorder_nodes(ontology_graph, node_label)

get_head_join_node()

Get the head join node of the ontology.

Source code in biocypher/_ontology.py
def get_head_join_node(self):
    """Return the head join node stored on this adapter.

    Returns
    -------
        The value of ``self._head_join_node``.

    """
    return self._head_join_node

get_nx_graph()

Get the networkx graph representing the ontology.

Source code in biocypher/_ontology.py
def get_nx_graph(self):
    """Return the networkx representation of the ontology.

    Returns
    -------
        The graph object stored in ``self._nx_graph``.

    """
    return self._nx_graph

get_rdf_graph()

Get the RDFlib graph representing the ontology.

Source code in biocypher/_ontology.py
def get_rdf_graph(self):
    """Return the RDFlib representation of the ontology.

    Returns
    -------
        The graph object stored in ``self._rdf_graph``.

    """
    return self._rdf_graph

get_root_node()

Get root node in the ontology.

Returns
root_node: If _switch_label_and_id is True, the root node label is
    returned, otherwise the root node id is returned.
Source code in biocypher/_ontology.py
def get_root_node(self):
    """Return the node that represents the root of the ontology.

    The configured root label has its underscores replaced by spaces and is
    passed through ``to_lower_sentence_case`` before it is used.

    Returns
    -------
        root_node: If _switch_label_and_id is True, the converted root label
            itself. Otherwise the first node of the networkx graph whose
            ``label`` attribute equals the converted root label, or None if
            no node matches.

    """
    target_label = to_lower_sentence_case(self._root_label.replace("_", " "))

    # Nodes are named by label: the converted label is the node name.
    if self._switch_label_and_id:
        return target_label

    # Nodes are named by identifier: search for the node carrying the label.
    labelled_nodes = self.get_nx_graph().nodes(data=True)
    return next(
        (node for node, data in labelled_nodes if "label" in data and data["label"] == target_label),
        None,
    )

has_label(node, g)

Check if the node has a label in the graph.


node (rdflib.URIRef): The node to check
g (rdflib.Graph): The graph to check in

Returns: bool: True if the node has a label, False otherwise

Source code in biocypher/_ontology.py
def has_label(self, node: rdflib.URIRef, g: rdflib.Graph) -> bool:
    """Tell whether ``node`` carries an ``rdfs:label`` in graph ``g``.

    Args:
    ----
        node (rdflib.URIRef): The node to check
        g (rdflib.Graph): The graph to check in

    Returns:
    -------
        bool: True if the node has a label, False otherwise

    """
    # None as the object acts as a wildcard in the membership test.
    label_pattern = (node, rdflib.RDFS.label, None)
    return label_pattern in g

translate

BioCypher 'translation' module.

Responsible for translating between the raw input data and the BioCypherNode and BioCypherEdge objects.

Translator

Class responsible for executing the translation process.

Translation is configured in the schema_config.yaml file. Creates a mapping dictionary from that file, and, given nodes and edges, translates them into BioCypherNodes and BioCypherEdges. During this process, can also filter the properties of the entities if the schema_config.yaml file specifies a property whitelist or blacklist.

Provides utility functions for translating between input and output labels and cypher queries.

Source code in biocypher/_translate.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
class Translator:
    """Class responsible for exacting the translation process.

    Translation is configured in the schema_config.yaml file. Creates a mapping
    dictionary from that file, and, given nodes and edges, translates them into
    BioCypherNodes and BioCypherEdges. During this process, can also filter the
    properties of the entities if the schema_config.yaml file specifies a property
    whitelist or blacklist.

    Provides utility functions for translating between input and output labels
    and cypher queries.
    """

    def __init__(self, ontology: "Ontology", strict_mode: bool = False):
        """Initialise the translator.

        Args:
        ----
            ontology (Ontology): An Ontology object providing schema and mapping details.
            strict_mode:
                strict_mode (bool, optional): If True, enforces that every node and edge carries
                the required 'source', 'licence', and 'version' properties. Raises ValueError
                if these are missing. Defaults to False.


        """
        self.ontology = ontology
        self.strict_mode = strict_mode

        # record nodes without biolink type configured in schema_config.yaml
        self.notype = {}

        # mapping functionality for translating terms and queries
        self.mappings = {}
        self.reverse_mappings = {}

        self._update_ontology_types()

    def translate_entities(self, entities):
        entities = peekable(entities)
        if isinstance(entities.peek(), BioCypherEdge | BioCypherNode | BioCypherRelAsNode):
            translated_entities = entities
        elif len(entities.peek()) < 4:
            translated_entities = self.translate_nodes(entities)
        else:
            translated_entities = self.translate_edges(entities)
        return translated_entities

    def translate_nodes(
        self,
        node_tuples: Iterable,
    ) -> Generator[BioCypherNode, None, None]:
        """Translate input node representation.

        Translate the node tuples to a representation that conforms to the
        schema of the given BioCypher graph. For now requires explicit
        statement of node type on pass.

        Args:
        ----
            node_tuples (list of tuples): collection of tuples
                representing individual nodes by their unique id and a type
                that is translated from the original database notation to
                the corresponding BioCypher notation.

        """
        self._log_begin_translate(node_tuples, "nodes")

        for _id, _type, _props in node_tuples:
            # check for strict mode requirements
            required_props = ["source", "licence", "version"]

            if self.strict_mode:
                # rename 'license' to 'licence' in _props
                if _props.get("license"):
                    _props["licence"] = _props.pop("license")

                for prop in required_props:
                    if prop not in _props:
                        msg = (
                            f"Property `{prop}` missing from node {_id}. "
                            "Strict mode is enabled, so this is not allowed.",
                        )
                        logger.error(msg)
                        raise ValueError(msg)

            # find the node in leaves that represents ontology node type
            _ontology_class = self._get_ontology_mapping(_type)

            if _ontology_class:
                # filter properties for those specified in schema_config if any
                _filtered_props = self._filter_props(_ontology_class, _props)

                # preferred id
                _preferred_id = self._get_preferred_id(_ontology_class)

                yield BioCypherNode(
                    node_id=_id,
                    node_label=_ontology_class,
                    preferred_id=_preferred_id,
                    properties=_filtered_props,
                )

            else:
                self._record_no_type(_type, _id)

        self._log_finish_translate("nodes")

    def _get_preferred_id(self, _bl_type: str) -> str:
        """Return the preferred id for the given Biolink type.

        If the preferred id is not specified in the schema_config.yaml file,
        return "id".
        """
        return (
            self.ontology.mapping.extended_schema[_bl_type]["preferred_id"]
            if "preferred_id" in self.ontology.mapping.extended_schema.get(_bl_type, {})
            else "id"
        )

    def _filter_props(self, bl_type: str, props: dict) -> dict:
        """Filter properties for those specified in schema_config if any.

        If the properties are not specified in the schema_config.yaml file,
        return the original properties.
        """
        filter_props = self.ontology.mapping.extended_schema[bl_type].get("properties", {})

        # strict mode: add required properties (only if there is a whitelist)
        if self.strict_mode and filter_props:
            filter_props.update(
                {"source": "str", "licence": "str", "version": "str"},
            )

        exclude_props = self.ontology.mapping.extended_schema[bl_type].get("exclude_properties", [])

        if isinstance(exclude_props, str):
            exclude_props = [exclude_props]

        if filter_props and exclude_props:
            filtered_props = {k: v for k, v in props.items() if (k in filter_props.keys() and k not in exclude_props)}

        elif filter_props:
            filtered_props = {k: v for k, v in props.items() if k in filter_props.keys()}

        elif exclude_props:
            filtered_props = {k: v for k, v in props.items() if k not in exclude_props}

        else:
            return props

        missing_props = [k for k in filter_props.keys() if k not in filtered_props.keys()]
        # add missing properties with default values
        for k in missing_props:
            filtered_props[k] = None

        return filtered_props

    def translate_edges(
        self,
        edge_tuples: Iterable,
    ) -> Generator[BioCypherEdge | BioCypherRelAsNode, None, None]:
        """Translate input edge representation.

        Translate the edge tuples to a representation that conforms to the
        schema of the given BioCypher graph. For now requires explicit
        statement of edge type on pass.

        Args:
        ----
            edge_tuples (list of tuples):

                collection of tuples representing source and target of
                an interaction via their unique ids as well as the type
                of interaction in the original database notation, which
                is translated to BioCypher notation using the `leaves`.
                Can optionally possess its own ID.

        """
        self._log_begin_translate(edge_tuples, "edges")

        # legacy: deal with 4-tuples (no edge id)
        # TODO remove for performance reasons once safe
        edge_tuples = peekable(edge_tuples)
        if len(edge_tuples.peek()) == 4:
            edge_tuples = [(None, src, tar, typ, props) for src, tar, typ, props in edge_tuples]

        for _id, _src, _tar, _type, _props in edge_tuples:
            # check for strict mode requirements
            if self.strict_mode:
                if "source" not in _props:
                    msg = (
                        f"Edge {_id if _id else (_src, _tar)} does not have a `source` property."
                        " This is required in strict mode.",
                    )
                    logger.error(msg)
                    raise ValueError(msg)
                if "licence" not in _props:
                    msg = (
                        f"Edge {_id if _id else (_src, _tar)} does not have a `licence` property."
                        " This is required in strict mode.",
                    )
                    logger.error(msg)
                    raise ValueError(msg)

            # match the input label (_type) to
            # an ontology label from schema_config
            bl_type = self._get_ontology_mapping(_type)

            if bl_type:
                # filter properties for those specified in schema_config if any
                _filtered_props = self._filter_props(bl_type, _props)

                rep = self.ontology.mapping.extended_schema[bl_type]["represented_as"]

                if rep == "node":
                    if _id:
                        # if it brings its own ID, use it
                        node_id = _id

                    else:
                        # source target concat
                        node_id = str(_src) + "_" + str(_tar) + "_" + "_".join(str(v) for v in _filtered_props.values())

                    n = BioCypherNode(
                        node_id=node_id,
                        node_label=bl_type,
                        properties=_filtered_props,
                    )

                    # directionality check TODO generalise to account for
                    # different descriptions of directionality or find a
                    # more consistent solution for indicating directionality
                    if _filtered_props.get("directed") == True:  # noqa: E712 (seems to not work without '== True')
                        l1 = "IS_SOURCE_OF"
                        l2 = "IS_TARGET_OF"

                    elif _filtered_props.get(
                        "src_role",
                    ) and _filtered_props.get("tar_role"):
                        l1 = _filtered_props.get("src_role")
                        l2 = _filtered_props.get("tar_role")

                    else:
                        l1 = l2 = "IS_PART_OF"

                    e_s = BioCypherEdge(
                        source_id=_src,
                        target_id=node_id,
                        relationship_label=l1,
                        # additional here
                    )

                    e_t = BioCypherEdge(
                        source_id=_tar,
                        target_id=node_id,
                        relationship_label=l2,
                        # additional here
                    )

                    yield BioCypherRelAsNode(n, e_s, e_t)

                else:
                    edge_label = self.ontology.mapping.extended_schema[bl_type].get("label_as_edge")

                    if edge_label is None:
                        edge_label = bl_type

                    yield BioCypherEdge(
                        relationship_id=_id,
                        source_id=_src,
                        target_id=_tar,
                        relationship_label=edge_label,
                        properties=_filtered_props,
                    )

            else:
                self._record_no_type(_type, (_src, _tar))

        self._log_finish_translate("edges")

    def _record_no_type(self, _type: Any, what: Any) -> None:
        """Record the type of a non-represented node or edge.

        In case of an entity that is not represented in the schema_config,
        record the type and the entity.
        """
        logger.error(f"No ontology type defined for `{_type}`: {what}")

        if self.notype.get(_type, None):
            self.notype[_type] += 1

        else:
            self.notype[_type] = 1

    def get_missing_biolink_types(self) -> dict:
        """Return a dictionary of non-represented types.

        The dictionary contains the type as the key and the number of
        occurrences as the value.
        """
        return self.notype

    @staticmethod
    def _log_begin_translate(_input: Iterable, what: str):
        n = f"{len(_input)} " if hasattr(_input, "__len__") else ""

        logger.debug(f"Translating {n}{what} to BioCypher")

    @staticmethod
    def _log_finish_translate(what: str):
        logger.debug(f"Finished translating {what} to BioCypher.")

    def _update_ontology_types(self):
        """Create a dictionary to translate from input to ontology labels.

        If multiple input labels, creates mapping for each.
        """
        self._ontology_mapping = {}

        for key, value in self.ontology.mapping.extended_schema.items():
            labels = value.get("input_label") or value.get("label_in_input")

            if isinstance(labels, str):
                self._ontology_mapping[labels] = key

            elif isinstance(labels, list):
                for label in labels:
                    self._ontology_mapping[label] = key

            if value.get("label_as_edge"):
                self._add_translation_mappings(labels, value["label_as_edge"])

            else:
                self._add_translation_mappings(labels, key)

    def _get_ontology_mapping(self, label: str) -> str | None:
        """Find the ontology class for the given input type.

        For each given input type ("input_label" or "label_in_input"), find the
        corresponding ontology class in the leaves dictionary (from the
        `schema_config.yam`).

        Args:
        ----
            label:
                The input type to find (`input_label` or `label_in_input` in
                `schema_config.yaml`).

        """
        # FIXME does not seem like a necessary function.
        # commented out until behaviour of _update_bl_types is fixed
        return self._ontology_mapping.get(label, None)

    def translate_term(self, term):
        """Translate a single term."""
        return self.mappings.get(term, None)

    def reverse_translate_term(self, term):
        """Reverse translate a single term."""
        return self.reverse_mappings.get(term, None)

    def translate(self, query):
        """Translate a cypher query.

        Only translates labels as of now.
        """
        for key in self.mappings:
            query = query.replace(":" + key, ":" + self.mappings[key])
        return query

    def reverse_translate(self, query):
        """Reverse translate a cypher query.

        Only translates labels as of now.
        """
        for key in self.reverse_mappings:
            a = ":" + key + ")"
            b = ":" + key + "]"
            # TODO this conditional probably does not cover all cases
            if a in query or b in query:
                if isinstance(self.reverse_mappings[key], list):
                    msg = (
                        "Reverse translation of multiple inputs not "
                        "implemented yet. Many-to-one mappings are "
                        "not reversible. "
                        f"({key} -> {self.reverse_mappings[key]})",
                    )
                    logger.error(msg)
                    raise NotImplementedError(msg)
                else:
                    query = query.replace(
                        a,
                        ":" + self.reverse_mappings[key] + ")",
                    ).replace(b, ":" + self.reverse_mappings[key] + "]")
        return query

    def _add_translation_mappings(self, original_name, biocypher_name):
        """Add translation mappings for a label and name.

        We use here the PascalCase version of the BioCypher name, since
        sentence case is not useful for Cypher queries.
        """
        if isinstance(original_name, list):
            for on in original_name:
                self.mappings[on] = self.name_sentence_to_pascal(
                    biocypher_name,
                )
        else:
            self.mappings[original_name] = self.name_sentence_to_pascal(
                biocypher_name,
            )

        if isinstance(biocypher_name, list):
            for bn in biocypher_name:
                self.reverse_mappings[
                    self.name_sentence_to_pascal(
                        bn,
                    )
                ] = original_name
        else:
            self.reverse_mappings[
                self.name_sentence_to_pascal(
                    biocypher_name,
                )
            ] = original_name

    @staticmethod
    def name_sentence_to_pascal(name: str) -> str:
        """Convert a name in sentence case to pascal case."""
        # split on dots if dot is present
        if "." in name:
            return ".".join(
                [_misc.sentencecase_to_pascalcase(n) for n in name.split(".")],
            )
        else:
            return _misc.sentencecase_to_pascalcase(name)

__init__(ontology, strict_mode=False)

Initialise the translator.


ontology (Ontology): An Ontology object providing schema and mapping details.
strict_mode (bool, optional): If True, enforces that every node and edge carries
    the required 'source', 'licence', and 'version' properties. Raises ValueError
    if these are missing. Defaults to False.
Source code in biocypher/_translate.py
def __init__(self, ontology: "Ontology", strict_mode: bool = False):
    """Set up the translator state and build the label mappings.

    Args:
    ----
        ontology (Ontology): An Ontology object providing schema and mapping details.
        strict_mode (bool, optional): If True, enforces that every node and edge carries
            the required 'source', 'licence', and 'version' properties. Raises ValueError
            if these are missing. Defaults to False.

    """
    # lookup tables for translating terms and queries
    self.mappings = {}
    self.reverse_mappings = {}

    # counts input types that have no biolink type configured in
    # schema_config.yaml
    self.notype = {}

    self.strict_mode = strict_mode
    self.ontology = ontology

    # everything above must be in place before the mappings are built
    self._update_ontology_types()

get_missing_biolink_types()

Return a dictionary of non-represented types.

The dictionary contains the type as the key and the number of occurrences as the value.

Source code in biocypher/_translate.py
def get_missing_biolink_types(self) -> dict:
    """Return the input types that could not be mapped.

    Returns
    -------
        dict: ``self.notype``, which maps each non-represented type to the
            number of times it occurred.

    """
    return self.notype

name_sentence_to_pascal(name) staticmethod

Convert a name in sentence case to pascal case.

Source code in biocypher/_translate.py
@staticmethod
def name_sentence_to_pascal(name: str) -> str:
    """Convert a sentence-case name to PascalCase.

    A name containing dots is converted segment by segment; the dots
    themselves are kept.
    """
    to_pascal = _misc.sentencecase_to_pascalcase

    if "." not in name:
        return to_pascal(name)

    # convert each dot-separated segment on its own
    segments = [to_pascal(segment) for segment in name.split(".")]
    return ".".join(segments)

reverse_translate(query)

Reverse translate a cypher query.

Only translates labels as of now.

Source code in biocypher/_translate.py
def reverse_translate(self, query):
    """Reverse translate a cypher query.

    Only translates labels as of now, and only where the label is directly
    followed by `)` or `]`.

    Args:
    ----
        query: The cypher query string to reverse translate.

    Returns:
    -------
        The query with every matched label replaced by its entry in
        `self.reverse_mappings`.

    Raises:
    ------
        NotImplementedError: If a label found in the query maps back to
            several input labels.

    """
    for key in self.reverse_mappings:
        a = ":" + key + ")"
        b = ":" + key + "]"
        # TODO this conditional probably does not cover all cases
        if a in query or b in query:
            if isinstance(self.reverse_mappings[key], list):
                # no trailing comma here: the message must be a string, not
                # a 1-tuple
                msg = (
                    "Reverse translation of multiple inputs not "
                    "implemented yet. Many-to-one mappings are "
                    "not reversible. "
                    f"({key} -> {self.reverse_mappings[key]})"
                )
                logger.error(msg)
                raise NotImplementedError(msg)
            else:
                query = query.replace(
                    a,
                    ":" + self.reverse_mappings[key] + ")",
                ).replace(b, ":" + self.reverse_mappings[key] + "]")
    return query

reverse_translate_term(term)

Reverse translate a single term.

Source code in biocypher/_translate.py
def reverse_translate_term(self, term):
    """Look up ``term`` in the reverse mappings.

    Returns
    -------
        The entry stored for ``term`` in ``self.reverse_mappings``, or None
        if there is none.

    """
    reverse_lookup = self.reverse_mappings
    return reverse_lookup.get(term)

translate(query)

Translate a cypher query.

Only translates labels as of now.

Source code in biocypher/_translate.py
def translate(self, query):
    """Replace input labels in a cypher query by their mapped labels.

    Only labels are translated: for every entry of ``self.mappings``, each
    occurrence of ``:<key>`` in the query becomes ``:<mapped value>``.

    Args:
    ----
        query: The cypher query string to translate.

    Returns:
    -------
        The query string after all replacements.

    """
    translated = query
    for input_label, mapped_label in self.mappings.items():
        translated = translated.replace(":" + input_label, ":" + mapped_label)
    return translated

translate_edges(edge_tuples)

Translate input edge representation.

Translate the edge tuples to a representation that conforms to the schema of the given BioCypher graph. For now requires explicit statement of edge type on pass.


edge_tuples (list of tuples):

    collection of tuples representing source and target of
    an interaction via their unique ids as well as the type
    of interaction in the original database notation, which
    is translated to BioCypher notation using the `leaves`.
    Can optionally possess its own ID.
Source code in biocypher/_translate.py
def translate_edges(
    self,
    edge_tuples: Iterable,
) -> Generator[BioCypherEdge | BioCypherRelAsNode, None, None]:
    """Translate input edge representation.

    Translate the edge tuples to a representation that conforms to the
    schema of the given BioCypher graph. For now requires explicit
    statement of edge type on pass.

    Args:
    ----
        edge_tuples (list of tuples):

            collection of tuples representing source and target of
            an interaction via their unique ids as well as the type
            of interaction in the original database notation, which
            is translated to BioCypher notation using the `leaves`.
            Can optionally possess its own ID.

    Yields:
    ------
        BioCypherEdge | BioCypherRelAsNode: One entity per input tuple whose
            type is mapped in the schema; unmapped types are recorded and
            skipped.

    Raises:
    ------
        ValueError: In strict mode, if an edge lacks the `source` or the
            `licence` property.

    """
    self._log_begin_translate(edge_tuples, "edges")

    edge_tuples = peekable(edge_tuples)

    # Peek with a default: inside a generator, the StopIteration raised by a
    # bare `peek()` on empty input would surface as a RuntimeError.
    nothing = object()
    first_edge = edge_tuples.peek(nothing)
    if first_edge is nothing:
        self._log_finish_translate("edges")
        return

    # legacy: deal with 4-tuples (no edge id)
    # TODO remove for performance reasons once safe
    if len(first_edge) == 4:
        # lazily prepend the missing id so that the input keeps streaming
        edge_tuples = ((None, src, tar, typ, props) for src, tar, typ, props in edge_tuples)

    for _id, _src, _tar, _type, _props in edge_tuples:
        # check for strict mode requirements
        # NOTE(review): only `source` and `licence` are checked here;
        # `version` is not - confirm that this is intended.
        if self.strict_mode:
            for prop in ("source", "licence"):
                if prop not in _props:
                    # no trailing comma here: the message must be a string,
                    # not a 1-tuple
                    msg = (
                        f"Edge {_id if _id else (_src, _tar)} does not have a `{prop}` property."
                        " This is required in strict mode."
                    )
                    logger.error(msg)
                    raise ValueError(msg)

        # match the input label (_type) to
        # an ontology label from schema_config
        bl_type = self._get_ontology_mapping(_type)

        if bl_type:
            # filter properties for those specified in schema_config if any
            _filtered_props = self._filter_props(bl_type, _props)

            rep = self.ontology.mapping.extended_schema[bl_type]["represented_as"]

            if rep == "node":
                if _id:
                    # if it brings its own ID, use it
                    node_id = _id

                else:
                    # source target concat
                    node_id = str(_src) + "_" + str(_tar) + "_" + "_".join(str(v) for v in _filtered_props.values())

                n = BioCypherNode(
                    node_id=node_id,
                    node_label=bl_type,
                    properties=_filtered_props,
                )

                # directionality check TODO generalise to account for
                # different descriptions of directionality or find a
                # more consistent solution for indicating directionality
                if _filtered_props.get("directed") == True:  # noqa: E712 (seems to not work without '== True')
                    l1 = "IS_SOURCE_OF"
                    l2 = "IS_TARGET_OF"

                elif _filtered_props.get(
                    "src_role",
                ) and _filtered_props.get("tar_role"):
                    l1 = _filtered_props.get("src_role")
                    l2 = _filtered_props.get("tar_role")

                else:
                    l1 = l2 = "IS_PART_OF"

                e_s = BioCypherEdge(
                    source_id=_src,
                    target_id=node_id,
                    relationship_label=l1,
                    # additional here
                )

                e_t = BioCypherEdge(
                    source_id=_tar,
                    target_id=node_id,
                    relationship_label=l2,
                    # additional here
                )

                yield BioCypherRelAsNode(n, e_s, e_t)

            else:
                edge_label = self.ontology.mapping.extended_schema[bl_type].get("label_as_edge")

                if edge_label is None:
                    edge_label = bl_type

                yield BioCypherEdge(
                    relationship_id=_id,
                    source_id=_src,
                    target_id=_tar,
                    relationship_label=edge_label,
                    properties=_filtered_props,
                )

        else:
            self._record_no_type(_type, (_src, _tar))

    self._log_finish_translate("edges")

translate_nodes(node_tuples)

Translate input node representation.

Translate the node tuples to a representation that conforms to the schema of the given BioCypher graph. For now requires explicit statement of node type on pass.


node_tuples (list of tuples): collection of tuples
    representing individual nodes by their unique id and a type
    that is translated from the original database notation to
    the corresponding BioCypher notation.
Source code in biocypher/_translate.py
def translate_nodes(
    self,
    node_tuples: Iterable,
) -> Generator[BioCypherNode, None, None]:
    """Translate input node representation.

    Translate the node tuples to a representation that conforms to the
    schema of the given BioCypher graph. For now, the node type must be
    stated explicitly in every tuple.

    Args:
    ----
        node_tuples (list of tuples): collection of ``(id, type, properties)``
            tuples representing individual nodes by their unique id and a
            type that is translated from the original database notation to
            the corresponding BioCypher notation.

    Yields:
    ------
        BioCypherNode: one node for every input tuple whose type has an
            ontology mapping. Tuples without a mapping are handed to
            ``self._record_no_type`` and produce no output.

    Raises:
    ------
        ValueError: in strict mode, if a node lacks one of the `source`,
            `licence` or `version` properties.

    """
    self._log_begin_translate(node_tuples, "nodes")

    # properties every node must carry when strict mode is enabled
    required_props = ("source", "licence", "version")

    for _id, _type, _props in node_tuples:
        # check for strict mode requirements
        if self.strict_mode:
            # rename 'license' to 'licence' in _props (done in place on the
            # dictionary supplied by the caller)
            if _props.get("license"):
                _props["licence"] = _props.pop("license")

            for prop in required_props:
                if prop not in _props:
                    # Keep this a plain string: a trailing comma inside the
                    # parentheses would turn the message into a 1-tuple.
                    msg = (
                        f"Property `{prop}` missing from node {_id}. "
                        "Strict mode is enabled, so this is not allowed."
                    )
                    logger.error(msg)
                    raise ValueError(msg)

        # find the node in leaves that represents ontology node type
        _ontology_class = self._get_ontology_mapping(_type)

        if _ontology_class:
            # filter properties for those specified in schema_config if any
            _filtered_props = self._filter_props(_ontology_class, _props)

            # preferred id
            _preferred_id = self._get_preferred_id(_ontology_class)

            yield BioCypherNode(
                node_id=_id,
                node_label=_ontology_class,
                preferred_id=_preferred_id,
                properties=_filtered_props,
            )

        else:
            self._record_no_type(_type, _id)

    self._log_finish_translate("nodes")

translate_term(term)

Translate a single term.

Source code in biocypher/_translate.py
def translate_term(self, term):
    """Return the entry stored for ``term`` in ``self.mappings``, or None if absent."""
    lookup = self.mappings.get
    return lookup(term, None)

workflow

Unified BioCypher Workflow API for knowledge graph workflows.

This module provides a streamlined interface for creating and managing knowledge graphs using the unified Graph representation, with optional schema and ontology support. Designed for both agentic and deterministic workflows.

Design Philosophy

This API is designed with the following principles:

  1. Agentic-First: Optimized for LLM agent workflows with computable functions
  2. Zero Dependencies: Pure Python implementation for maximum compatibility
  3. Future-Proof: Native BioCypher objects enable advanced agentic features
  4. Migration-Ready: Wrapper methods provide compatibility with existing tools
  5. Progressive Validation: Optional validation and deduplication with flexible modes

Validation and Deduplication

Unlike legacy BioCypher, which enforces strict validation and deduplication, this API provides progressive validation with three modes:

  • "none" (default): No validation or deduplication - maximum flexibility for agents
  • "warn": Log warnings for schema violations and duplicates but continue processing
  • "strict": Enforce schema validation and deduplication - fail fast on violations

This approach allows:

  • Agents to work with maximum flexibility (no validation overhead)
  • Deterministic workflows to use validation when needed
  • Gradual migration from legacy BioCypher (start with "none", add validation later)

Future Migration Path

This module represents the future direction of BioCypher's in-memory graph functionality. The plan is to:

  1. Phase 1 (Current): Keep separate from legacy code, provide compatibility wrappers
  2. Phase 2: Replace legacy in-memory implementations (PandasKG, NetworkxKG)
  3. Phase 3: Add advanced agentic features (computable functions, decision logging)
  4. Phase 4: Integrate with main BioCypher class as unified interface

Agentic Features (Future)

  • Computable functions attached to nodes/edges
  • Decision logging and reasoning traces
  • Counterfactual inference capabilities
  • MCP (Model Context Protocol) interface integration
  • Local graph computation for agent workflows

BioCypherWorkflow

Unified BioCypher interface for knowledge graph workflows.

This class provides a clean, simple API for creating and managing knowledge graphs with optional schema and ontology support. Designed for both agentic and deterministic workflows.

Source code in biocypher/_workflow.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
class BioCypherWorkflow:
    """Unified BioCypher interface for knowledge graph workflows.

    This class provides a clean, simple API for creating and managing
    knowledge graphs with optional schema and ontology support. Designed
    for both agentic and deterministic workflows.
    """

    def __init__(
        self,
        name: str = "workflow_graph",
        directed: bool = True,
        schema: dict[str, Any] | None = None,
        schema_file: str | None = None,
        head_ontology_url: str | None = None,
        validation_mode: str = "none",
        deduplication: bool = False,
    ):
        """Set up an empty graph and store the workflow configuration.

        Args:
            name: Name of the knowledge graph
            directed: Whether the graph is directed (default: True)
            schema: Dictionary defining the knowledge graph schema
            schema_file: Path to YAML schema file; when given, it is loaded
                instead of ``schema``
            head_ontology_url: URL to ontology file (defaults to Biolink model)
            validation_mode: Validation level ("none", "warn", "strict")
            deduplication: Whether to enable deduplication (default: False)
        """
        self.graph = Graph(name=name, directed=directed)

        # Plain configuration, stored as passed in
        self.name = name
        self.head_ontology_url = head_ontology_url
        self.validation_mode = validation_mode
        self.deduplication = deduplication
        self.schema_file = schema_file
        self.schema = schema

        # IDs (nodes) and (id, type) pairs (edges) already accepted;
        # only consulted when deduplication is enabled
        self._seen_nodes = set()
        self._seen_edges = set()

        # A schema file takes precedence over a schema dictionary
        if schema_file:
            self._load_schema_from_file(schema_file)
            return
        if schema:
            self._load_schema_from_dict(schema)

    def _load_schema_from_file(self, schema_file: str) -> None:
        """Load schema from YAML file."""
        try:
            with open(schema_file, "r") as f:
                self.schema = yaml.safe_load(f)
            logger.info(f"Loaded schema from {schema_file}")
        except Exception as e:
            logger.warning(f"Could not load schema from {schema_file}: {e}")

    def _load_schema_from_dict(self, schema: dict[str, Any]) -> None:
        """Load schema from dictionary."""
        self.schema = schema
        logger.info("Loaded schema from dictionary")

    # ==================== NODE OPERATIONS ====================

    def add_node(self, node_id: str, node_type: str, **properties) -> bool:
        """Add a node to the knowledge graph.

        Args:
            node_id: Unique identifier for the node
            node_type: Type/category of the node
            **properties: Node properties as keyword arguments

        Returns:
            bool: True if node was added, False if it already exists

        Example:
            workflow.add_node("protein_1", "protein", name="TP53", function="tumor_suppressor")
        """
        # Check for duplicates if deduplication is enabled
        if self.deduplication:
            if node_id in self._seen_nodes:
                if self.validation_mode == "warn":
                    logger.warning(f"Duplicate node ID '{node_id}' detected")
                elif self.validation_mode == "strict":
                    raise ValueError(f"Duplicate node ID '{node_id}' not allowed in strict mode")
                return False
            self._seen_nodes.add(node_id)

        # Validate against schema if validation is enabled
        if self.validation_mode in ["warn", "strict"]:
            is_valid = self.validate_against_schema(node_type, properties)
            if not is_valid:
                if self.validation_mode == "strict":
                    raise ValueError(f"Node '{node_id}' of type '{node_type}' failed schema validation")
                elif self.validation_mode == "warn":
                    logger.warning(f"Node '{node_id}' of type '{node_type}' failed schema validation")

        # Try to add node to graph (Graph class handles its own deduplication)
        result = self.graph.add_node(node_id, node_type, properties)

        # If deduplication is enabled and we're tracking, update our tracking
        if self.deduplication and result:
            self._seen_nodes.add(node_id)

        return result

    def get_node(self, node_id: str) -> Node | None:
        """Get a node by ID.

        Args:
            node_id: Node identifier

        Returns:
            Node object or None if not found
        """
        return self.graph.get_node(node_id)

    def get_nodes(self, node_type: str | None = None) -> list[Node]:
        """Get all nodes, optionally filtered by type.

        Args:
            node_type: Optional filter by node type

        Returns:
            List of Node objects
        """
        return self.graph.get_nodes(node_type)

    def has_node(self, node_id: str) -> bool:
        """Check if a node exists.

        Args:
            node_id: Node identifier

        Returns:
            bool: True if node exists
        """
        return self.graph.has_node(node_id)

    def remove_node(self, node_id: str) -> bool:
        """Remove a node and all its connected edges.

        Args:
            node_id: Node identifier

        Returns:
            bool: True if node was removed, False if not found
        """
        return self.graph.remove_node(node_id)

    # ==================== EDGE OPERATIONS ====================

    def add_edge(self, edge_id: str, edge_type: str, source: str, target: str, **properties) -> bool:
        """Add an edge to the knowledge graph.

        Args:
            edge_id: Unique identifier for the edge
            edge_type: Type/category of the edge
            source: Source node ID
            target: Target node ID
            **properties: Edge properties as keyword arguments

        Returns:
            bool: True if edge was added, False if it already exists

        Example:
            workflow.add_edge("interaction_1", "interaction", "protein_1", "protein_2",
                          confidence=0.8, method="yeast_two_hybrid")
        """
        # Check for duplicates if deduplication is enabled
        if self.deduplication:
            edge_key = (edge_id, edge_type)
            if edge_key in self._seen_edges:
                if self.validation_mode == "warn":
                    logger.warning(f"Duplicate edge ID '{edge_id}' of type '{edge_type}' detected")
                elif self.validation_mode == "strict":
                    raise ValueError(f"Duplicate edge ID '{edge_id}' not allowed in strict mode")
                return False
            self._seen_edges.add(edge_key)

        # Validate against schema if validation is enabled
        if self.validation_mode in ["warn", "strict"]:
            is_valid = self.validate_against_schema(edge_type, properties)
            if not is_valid:
                if self.validation_mode == "strict":
                    raise ValueError(f"Edge '{edge_id}' of type '{edge_type}' failed schema validation")
                elif self.validation_mode == "warn":
                    logger.warning(f"Edge '{edge_id}' of type '{edge_type}' failed schema validation")

        # Try to add edge to graph (Graph class handles its own deduplication)
        result = self.graph.add_edge(edge_id, edge_type, source, target, properties)

        # If deduplication is enabled and we're tracking, update our tracking
        if self.deduplication and result:
            edge_key = (edge_id, edge_type)
            self._seen_edges.add(edge_key)

        return result

    def get_edge(self, edge_id: str) -> Edge | None:
        """Get an edge by ID.

        Args:
            edge_id: Edge identifier

        Returns:
            Edge object or None if not found
        """
        return self.graph.get_edge(edge_id)

    def get_edges(self, edge_type: str | None = None) -> list[Edge]:
        """Get all edges, optionally filtered by type.

        Args:
            edge_type: Optional filter by edge type

        Returns:
            List of Edge objects
        """
        return self.graph.get_edges(edge_type)

    def get_edges_between(self, source: str, target: str, edge_type: str | None = None) -> list[Edge]:
        """Get edges between two nodes.

        Args:
            source: Source node ID
            target: Target node ID
            edge_type: Optional filter by edge type

        Returns:
            List of Edge objects
        """
        return self.graph.get_edges_between(source, target, edge_type)

    def has_edge(self, edge_id: str) -> bool:
        """Check if an edge exists.

        Args:
            edge_id: Edge identifier

        Returns:
            bool: True if edge exists
        """
        return self.graph.has_edge(edge_id)

    def remove_edge(self, edge_id: str) -> bool:
        """Remove an edge from the graph.

        Args:
            edge_id: Edge identifier

        Returns:
            bool: True if edge was removed, False if not found
        """
        return self.graph.remove_edge(edge_id)

    # ==================== HYPEREDGE OPERATIONS ====================

    def add_hyperedge(self, hyperedge_id: str, hyperedge_type: str, nodes: set[str], **properties) -> bool:
        """Add a hyperedge connecting multiple nodes.

        Args:
            hyperedge_id: Unique identifier for the hyperedge
            hyperedge_type: Type/category of the hyperedge
            nodes: Set of node IDs to connect
            **properties: Hyperedge properties as keyword arguments

        Returns:
            bool: True if hyperedge was added, False if it already exists

        Example:
            workflow.add_hyperedge("complex_1", "protein_complex", {"protein_1", "protein_2", "protein_3"},
                               name="transcription_factor_complex")
        """
        return self.graph.add_hyperedge(hyperedge_id, hyperedge_type, nodes, properties)

    def get_hyperedge(self, hyperedge_id: str) -> HyperEdge | None:
        """Get a hyperedge by ID.

        Args:
            hyperedge_id: Hyperedge identifier

        Returns:
            HyperEdge object or None if not found
        """
        return self.graph.get_hyperedge(hyperedge_id)

    def get_hyperedges(self, hyperedge_type: str | None = None) -> list[HyperEdge]:
        """Get all hyperedges, optionally filtered by type.

        Args:
            hyperedge_type: Optional filter by hyperedge type

        Returns:
            List of HyperEdge objects
        """
        return self.graph.get_hyperedges(hyperedge_type)

    def has_hyperedge(self, hyperedge_id: str) -> bool:
        """Check if a hyperedge exists.

        Args:
            hyperedge_id: Hyperedge identifier

        Returns:
            bool: True if hyperedge exists
        """
        return self.graph.has_hyperedge(hyperedge_id)

    # ==================== GRAPH TRAVERSAL ====================

    def get_neighbors(self, node_id: str, direction: str = "both") -> set[str]:
        """Get neighboring nodes.

        Args:
            node_id: Node identifier
            direction: "in", "out", or "both"

        Returns:
            Set of neighboring node IDs
        """
        return self.graph.get_neighbors(node_id, direction)

    def get_connected_edges(self, node_id: str, direction: str = "both") -> list[Edge]:
        """Get edges connected to a node.

        Args:
            node_id: Node identifier
            direction: "in", "out", or "both"

        Returns:
            List of connected Edge objects
        """
        return self.graph.get_connected_edges(node_id, direction)

    def find_paths(self, source: str, target: str, max_length: int = 3) -> list[list[Edge]]:
        """Find all paths between two nodes.

        Args:
            source: Source node ID
            target: Target node ID
            max_length: Maximum path length

        Returns:
            List of paths, each path is a list of Edge objects
        """
        return self.graph.find_paths(source, target, max_length)

    # ==================== QUERY INTERFACE ====================

    def query_nodes(self, node_type: str | None = None) -> list[dict[str, Any]]:
        """Query nodes in the knowledge graph.

        Args:
            node_type: Optional filter by node type

        Returns:
            List of node dictionaries
        """
        nodes = self.graph.get_nodes(node_type)
        return [node.to_dict() for node in nodes]

    def query_edges(self, edge_type: str | None = None) -> list[dict[str, Any]]:
        """Query edges in the knowledge graph.

        Args:
            edge_type: Optional filter by edge type

        Returns:
            List of edge dictionaries
        """
        edges = self.graph.get_edges(edge_type)
        return [edge.to_dict() for edge in edges]

    def query_hyperedges(self, hyperedge_type: str | None = None) -> list[dict[str, Any]]:
        """Query hyperedges in the knowledge graph.

        Args:
            hyperedge_type: Optional filter by hyperedge type

        Returns:
            List of hyperedge dictionaries
        """
        hyperedges = self.graph.get_hyperedges(hyperedge_type)
        return [hyperedge.to_dict() for hyperedge in hyperedges]

    def find_connected_components(self, node_id: str, max_depth: int = 2) -> dict[str, Any]:
        """Find connected components around a node.

        Args:
            node_id: Starting node ID
            max_depth: Maximum depth to explore

        Returns:
            Dictionary with nodes and edges in the component
        """
        if not self.graph.has_node(node_id):
            return {"nodes": [], "edges": [], "hyperedges": []}

        # Collect nodes within max_depth
        component_nodes = {node_id}
        current_level = {node_id}

        for depth in range(max_depth):
            next_level = set()
            for node in current_level:
                neighbors = self.graph.get_neighbors(node)
                next_level.update(neighbors)
            current_level = next_level - component_nodes
            component_nodes.update(current_level)

            if not current_level:
                break

        # Get subgraph
        subgraph = self.graph.get_subgraph(component_nodes)

        return {
            "nodes": [node.to_dict() for node in subgraph.get_nodes()],
            "edges": [edge.to_dict() for edge in subgraph.get_edges()],
            "hyperedges": [hyperedge.to_dict() for hyperedge in subgraph.get_hyperedges()],
            "statistics": subgraph.get_statistics(),
        }

    # ==================== GRAPH ANALYSIS ====================

    def get_statistics(self) -> dict[str, Any]:
        """Get comprehensive graph statistics.

        Returns:
            Dictionary with graph statistics
        """
        return self.graph.get_statistics()

    def get_summary(self) -> dict[str, Any]:
        """Get a human-readable summary of the graph.

        Returns:
            Dictionary with graph summary
        """
        stats = self.graph.get_statistics()

        # Get top node types
        node_types = stats["node_types"]
        top_node_types = sorted(node_types.items(), key=lambda x: x[1], reverse=True)[:5]

        # Get top edge types
        edge_types = stats["edge_types"]
        top_edge_types = sorted(edge_types.items(), key=lambda x: x[1], reverse=True)[:5]

        return {
            "name": self.name,
            "total_nodes": stats["basic"]["nodes"],
            "total_edges": stats["basic"]["edges"],
            "total_hyperedges": stats["basic"]["hyperedges"],
            "top_node_types": top_node_types,
            "top_edge_types": top_edge_types,
            "connectivity": stats["connectivity"],
        }

    # ==================== SCHEMA AND ONTOLOGY SUPPORT ====================

    def get_schema(self) -> dict[str, Any] | None:
        """Get the current schema configuration.

        Returns:
            Dictionary representing the schema or None if no schema
        """
        return self.schema

    def export_schema(self, filepath: str) -> None:
        """Export the current schema to a YAML file.

        Args:
            filepath: Path to save the schema file
        """
        if self.schema:
            with open(filepath, "w") as f:
                yaml.dump(self.schema, f, default_flow_style=False)
            logger.info(f"Schema exported to {filepath}")
        else:
            logger.warning("No schema to export")

    def validate_against_schema(self, node_type: str, properties: dict[str, Any]) -> bool:
        """Validate node properties against schema (if available).

        Args:
            node_type: Type of node to validate
            properties: Properties to validate

        Returns:
            bool: True if valid, False otherwise
        """
        if not self.schema or node_type not in self.schema:
            return True  # No schema or type not in schema, assume valid

        schema_entry = self.schema[node_type]
        if "properties" not in schema_entry:
            return True  # No property constraints

        required_properties = schema_entry["properties"]

        # Check if all required properties are present and have correct types
        for prop_name, prop_type in required_properties.items():
            if prop_name not in properties:
                logger.warning(f"Missing required property '{prop_name}' for node type '{node_type}'")
                return False

            # Check property type
            actual_value = properties[prop_name]
            if not self._validate_property_type(actual_value, prop_type):
                logger.warning(
                    f"Property '{prop_name}' has wrong type. Expected {prop_type}, got {type(actual_value).__name__}"
                )
                return False

        return True

    def _validate_property_type(self, value: Any, expected_type: str) -> bool:
        """Validate that a property value matches the expected type.

        Args:
            value: The actual value
            expected_type: The expected type as string (e.g., 'str', 'int', 'float')

        Returns:
            bool: True if type matches, False otherwise
        """
        type_mapping = {
            "str": str,
            "int": int,
            "float": float,
            "bool": bool,
            "list": list,
            "dict": dict,
        }

        if expected_type not in type_mapping:
            return True  # Unknown type, assume valid

        expected_python_type = type_mapping[expected_type]
        return isinstance(value, expected_python_type)

    # ==================== SERIALIZATION ====================

    def to_json(self) -> str:
        """Export the knowledge graph to JSON format.

        Returns:
            JSON string representation of the graph
        """
        return self.graph.to_json()

    def from_json(self, json_data: str) -> None:
        """Import knowledge graph from JSON format.

        The current graph is replaced by the imported one and the workflow
        takes over its name. The deduplication tracking is reset as well,
        because it described the replaced graph; with deduplication enabled
        it is rebuilt from the imported nodes and edges.

        Args:
            json_data: JSON string containing graph data
        """
        data = json.loads(json_data)
        self.graph = Graph.from_dict(data)
        self.name = self.graph.name

        if self.deduplication:
            self._seen_nodes = {node.id for node in self.graph.get_nodes(None)}
            self._seen_edges = {(edge.id, edge.type) for edge in self.graph.get_edges(None)}
        else:
            self._seen_nodes = set()
            self._seen_edges = set()

    def save(self, filepath: str) -> None:
        """Save the graph to a file.

        Args:
            filepath: Path to save the graph
        """
        with open(filepath, "w") as f:
            f.write(self.to_json())
        logger.info(f"Graph saved to {filepath}")

    def load(self, filepath: str) -> None:
        """Read a JSON file and import the graph it contains.

        Args:
            filepath: Path to load the graph from
        """
        with open(filepath, "r") as source:
            contents = source.read()

        # The file is closed again before the import starts
        self.from_json(contents)
        logger.info(f"Graph loaded from {filepath}")

    # ==================== UTILITY METHODS ====================

    def clear(self) -> None:
        """Clear all nodes and edges from the graph.

        The graph is replaced by a new empty one with the same name and
        directedness. The deduplication tracking is emptied too, so IDs
        used before the call can be added again.
        """
        self.graph = Graph(name=self.name, directed=self.graph.directed)
        self._seen_nodes = set()
        self._seen_edges = set()
        logger.info("Graph cleared")

    def copy(self) -> "BioCypherWorkflow":
        """Create a copy of the workflow and its graph.

        The graph is duplicated through a JSON round trip. The copy keeps
        the validation mode, the deduplication setting and the
        deduplication tracking of the original. The schema object is shared
        between both workflows, not duplicated.

        Returns:
            New BioCypherWorkflow instance
        """
        new_workflow = BioCypherWorkflow(
            name=self.name,
            directed=self.graph.directed,
            schema=self.schema,
            head_ontology_url=self.head_ontology_url,
            validation_mode=self.validation_mode,
            deduplication=self.deduplication,
        )
        # Recorded for reference only: passing the path to the constructor
        # would re-read the file, while the loaded schema is already handed
        # over above
        new_workflow.schema_file = self.schema_file

        new_workflow.from_json(self.to_json())

        # Carry the tracking over explicitly so the copy rejects the same
        # duplicates as the original
        new_workflow._seen_nodes = set(self._seen_nodes)
        new_workflow._seen_edges = set(self._seen_edges)
        return new_workflow

    def get_graph(self) -> Graph:
        """Get the underlying Graph object.

        Returns:
            Graph object
        """
        return self.graph

    def __len__(self) -> int:
        """Return the number of nodes in the graph."""
        return len(self.graph)

    def __contains__(self, node_id: str) -> bool:
        """Check if a node exists in the graph."""
        return node_id in self.graph

    def __str__(self) -> str:
        """String representation of the workflow."""
        stats = self.get_statistics()
        return (
            f"BioCypherWorkflow(name='{self.name}', "
            f"nodes={stats['basic']['nodes']}, edges={stats['basic']['edges']}, "
            f"hyperedges={stats['basic']['hyperedges']})"
        )

    def __repr__(self) -> str:
        return self.__str__()

    # ==================== COMPATIBILITY WRAPPER METHODS ====================

    def to_networkx(self):
        """Convert to NetworkX graph for compatibility with existing tools.

        Returns:
            networkx.DiGraph: NetworkX representation of the graph

        Note:
            This method provides compatibility with existing NetworkX-based
            tools while maintaining the native BioCypher object structure.
            Future versions may use this as the primary backend.
        """
        try:
            import networkx as nx
        except ImportError:
            raise ImportError("NetworkX is required for to_networkx() conversion. Install with: pip install networkx")

        g = nx.DiGraph() if self.graph.directed else nx.Graph()

        # Add nodes with properties
        for node in self.graph._nodes.values():
            attrs = node.properties.copy()
            attrs["node_type"] = node.type
            g.add_node(node.id, **attrs)

        # Add edges with properties
        for edge in self.graph._edges.values():
            attrs = edge.properties.copy()
            attrs["edge_type"] = edge.type
            g.add_edge(edge.source, edge.target, **attrs)

        return g

    def to_pandas(self):
        """Convert to Pandas DataFrames for compatibility with existing tools.

        Returns:
            dict[str, pd.DataFrame]: Dictionary of DataFrames, one per node/edge type

        Note:
            This method provides compatibility with existing Pandas-based
            tools while maintaining the native BioCypher object structure.
            Future versions may use this as the primary backend.
        """
        try:
            import pandas as pd
        except ImportError:
            raise ImportError("Pandas is required for to_pandas() conversion. Install with: pip install pandas")

        dfs = {}

        # Create node DataFrames by type
        for node_type, node_ids in self.graph._node_types.items():
            nodes = [self.graph._nodes[node_id] for node_id in node_ids]
            data = []
            for node in nodes:
                row = {"node_id": node.id, "node_type": node.type}
                row.update(node.properties)
                data.append(row)
            dfs[node_type] = pd.DataFrame(data)

        # Create edge DataFrames by type
        for edge_type, edge_ids in self.graph._edge_types.items():
            edges = [self.graph._edges[edge_id] for edge_id in edge_ids]
            data = []
            for edge in edges:
                row = {"edge_id": edge.id, "edge_type": edge.type, "source_id": edge.source, "target_id": edge.target}
                row.update(edge.properties)
                data.append(row)
            dfs[edge_type] = pd.DataFrame(data)

        return dfs

__contains__(node_id)

Check if a node exists in the graph.

Source code in biocypher/_workflow.py
def __contains__(self, node_id: str) -> bool:
    """Support ``node_id in workflow`` by testing membership in the graph.

    Args:
        node_id: Identifier of the node to look for

    Returns:
        bool: Outcome of ``node_id in self.graph``
    """
    is_member = node_id in self.graph
    return is_member

__init__(name='workflow_graph', directed=True, schema=None, schema_file=None, head_ontology_url=None, validation_mode='none', deduplication=False)

Initialize the workflow with a unified graph.

Parameters:

Name Type Description Default
name str

Name of the knowledge graph

'workflow_graph'
directed bool

Whether the graph is directed (default: True)

True
schema dict[str, Any] | None

Dictionary defining the knowledge graph schema

None
schema_file str | None

Path to YAML schema file

None
head_ontology_url str | None

URL to ontology file (defaults to Biolink model)

None
validation_mode str

Validation level ("none", "warn", "strict")

'none'
deduplication bool

Whether to enable deduplication (default: False)

False
Source code in biocypher/_workflow.py
def __init__(
    self,
    name: str = "workflow_graph",
    directed: bool = True,
    schema: dict[str, Any] | None = None,
    schema_file: str | None = None,
    head_ontology_url: str | None = None,
    validation_mode: str = "none",
    deduplication: bool = False,
):
    """Set up a workflow backed by a fresh, empty graph.

    Args:
        name: Name of the knowledge graph
        directed: Whether the graph is directed (default: True)
        schema: Dictionary defining the knowledge graph schema
        schema_file: Path to YAML schema file; takes precedence over
            ``schema`` when both are given
        head_ontology_url: URL to ontology file (defaults to Biolink model)
        validation_mode: Validation level ("none", "warn", "strict")
        deduplication: Whether to enable deduplication (default: False)
    """
    self.graph = Graph(name=name, directed=directed)

    # Keep the configuration exactly as passed in
    self.name = name
    self.schema = schema
    self.schema_file = schema_file
    self.head_ontology_url = head_ontology_url
    self.validation_mode = validation_mode
    self.deduplication = deduplication

    # Bookkeeping consulted by add_node/add_edge when deduplication is on
    self._seen_nodes = set()
    self._seen_edges = set()

    # A schema file wins over an inline schema dictionary
    if schema_file:
        self._load_schema_from_file(schema_file)
        return
    if schema:
        self._load_schema_from_dict(schema)

__len__()

Return the number of nodes in the graph.

Source code in biocypher/_workflow.py
def __len__(self) -> int:
    """Return ``len()`` of the underlying graph (its node count per the original docs)."""
    size = len(self.graph)
    return size

__str__()

String representation of the workflow.

Source code in biocypher/_workflow.py
def __str__(self) -> str:
    """Render the workflow name together with its node, edge and hyperedge counts."""
    basic = self.get_statistics()["basic"]
    counts = f"nodes={basic['nodes']}, edges={basic['edges']}, hyperedges={basic['hyperedges']}"
    return f"BioCypherWorkflow(name='{self.name}', {counts})"

add_edge(edge_id, edge_type, source, target, **properties)

Add an edge to the knowledge graph.

Parameters:

Name Type Description Default
edge_id str

Unique identifier for the edge

required
edge_type str

Type/category of the edge

required
source str

Source node ID

required
target str

Target node ID

required
**properties

Edge properties as keyword arguments

{}

Returns:

Name Type Description
bool bool

True if edge was added, False if it already exists

Example

workflow.add_edge("interaction_1", "interaction", "protein_1", "protein_2", confidence=0.8, method="yeast_two_hybrid")

Source code in biocypher/_workflow.py
def add_edge(self, edge_id: str, edge_type: str, source: str, target: str, **properties) -> bool:
    """Add an edge to the knowledge graph.

    Args:
        edge_id: Unique identifier for the edge
        edge_type: Type/category of the edge
        source: Source node ID
        target: Target node ID
        **properties: Edge properties as keyword arguments

    Returns:
        bool: True if edge was added, False if it already exists

    Raises:
        ValueError: In "strict" validation mode, when the edge is a known
            duplicate (deduplication enabled) or fails schema validation.

    Example:
        workflow.add_edge("interaction_1", "interaction", "protein_1", "protein_2",
                      confidence=0.8, method="yeast_two_hybrid")
    """
    edge_key = (edge_id, edge_type)

    # Reject edges this workflow has already stored when deduplication is on
    if self.deduplication and edge_key in self._seen_edges:
        if self.validation_mode == "warn":
            logger.warning(f"Duplicate edge ID '{edge_id}' of type '{edge_type}' detected")
        elif self.validation_mode == "strict":
            raise ValueError(f"Duplicate edge ID '{edge_id}' not allowed in strict mode")
        return False

    # Validate against schema if validation is enabled
    if self.validation_mode in ("warn", "strict"):
        if not self.validate_against_schema(edge_type, properties):
            if self.validation_mode == "strict":
                raise ValueError(f"Edge '{edge_id}' of type '{edge_type}' failed schema validation")
            logger.warning(f"Edge '{edge_id}' of type '{edge_type}' failed schema validation")

    # Try to add edge to graph (Graph class handles its own deduplication)
    result = self.graph.add_edge(edge_id, edge_type, source, target, properties)

    # Record the key only after the graph accepted the edge. Recording it
    # earlier would make an edge rejected by strict validation or by the
    # graph look like a duplicate on every later attempt.
    if self.deduplication and result:
        self._seen_edges.add(edge_key)

    return result

add_hyperedge(hyperedge_id, hyperedge_type, nodes, **properties)

Add a hyperedge connecting multiple nodes.

Parameters:

Name Type Description Default
hyperedge_id str

Unique identifier for the hyperedge

required
hyperedge_type str

Type/category of the hyperedge

required
nodes set[str]

Set of node IDs to connect

required
**properties

Hyperedge properties as keyword arguments

{}

Returns:

Name Type Description
bool bool

True if hyperedge was added, False if it already exists

Example

workflow.add_hyperedge("complex_1", "protein_complex", {"protein_1", "protein_2", "protein_3"}, name="transcription_factor_complex")

Source code in biocypher/_workflow.py
def add_hyperedge(self, hyperedge_id: str, hyperedge_type: str, nodes: set[str], **properties) -> bool:
    """Register a hyperedge that links several nodes at once.

    The call is forwarded to the underlying graph; unlike ``add_node`` and
    ``add_edge`` no workflow-level deduplication or schema validation runs.

    Args:
        hyperedge_id: Unique identifier for the hyperedge
        hyperedge_type: Type/category of the hyperedge
        nodes: Set of node IDs to connect
        **properties: Hyperedge properties as keyword arguments

    Returns:
        bool: True if hyperedge was added, False if it already exists

    Example:
        workflow.add_hyperedge("complex_1", "protein_complex", {"protein_1", "protein_2", "protein_3"},
                           name="transcription_factor_complex")
    """
    was_added = self.graph.add_hyperedge(hyperedge_id, hyperedge_type, nodes, properties)
    return was_added

add_node(node_id, node_type, **properties)

Add a node to the knowledge graph.

Parameters:

Name Type Description Default
node_id str

Unique identifier for the node

required
node_type str

Type/category of the node

required
**properties

Node properties as keyword arguments

{}

Returns:

Name Type Description
bool bool

True if node was added, False if it already exists

Example

workflow.add_node("protein_1", "protein", name="TP53", function="tumor_suppressor")

Source code in biocypher/_workflow.py
def add_node(self, node_id: str, node_type: str, **properties) -> bool:
    """Add a node to the knowledge graph.

    Args:
        node_id: Unique identifier for the node
        node_type: Type/category of the node
        **properties: Node properties as keyword arguments

    Returns:
        bool: True if node was added, False if it already exists

    Raises:
        ValueError: In "strict" validation mode, when the node is a known
            duplicate (deduplication enabled) or fails schema validation.

    Example:
        workflow.add_node("protein_1", "protein", name="TP53", function="tumor_suppressor")
    """
    # Reject nodes this workflow has already stored when deduplication is on
    if self.deduplication and node_id in self._seen_nodes:
        if self.validation_mode == "warn":
            logger.warning(f"Duplicate node ID '{node_id}' detected")
        elif self.validation_mode == "strict":
            raise ValueError(f"Duplicate node ID '{node_id}' not allowed in strict mode")
        return False

    # Validate against schema if validation is enabled
    if self.validation_mode in ("warn", "strict"):
        if not self.validate_against_schema(node_type, properties):
            if self.validation_mode == "strict":
                raise ValueError(f"Node '{node_id}' of type '{node_type}' failed schema validation")
            logger.warning(f"Node '{node_id}' of type '{node_type}' failed schema validation")

    # Try to add node to graph (Graph class handles its own deduplication)
    result = self.graph.add_node(node_id, node_type, properties)

    # Record the ID only after the graph accepted the node. Recording it
    # earlier would make a node rejected by strict validation or by the
    # graph look like a duplicate on every later attempt.
    if self.deduplication and result:
        self._seen_nodes.add(node_id)

    return result

clear()

Clear all nodes and edges from the graph.

Source code in biocypher/_workflow.py
def clear(self) -> None:
    """Clear all nodes and edges from the graph.

    The graph is replaced by a new empty one with the same name and
    directedness, and the deduplication bookkeeping is reset so that IDs
    used before the clear can be added again.
    """
    self.graph = Graph(name=self.name, directed=self.graph.directed)
    # Without this reset, add_node/add_edge would keep rejecting every ID
    # seen before the clear whenever deduplication is enabled.
    self._seen_nodes = set()
    self._seen_edges = set()
    logger.info("Graph cleared")

copy()

Create a copy of the workflow and its graph.

Returns:

Type Description
BioCypherWorkflow

New BioCypherWorkflow instance

Source code in biocypher/_workflow.py
def copy(self) -> "BioCypherWorkflow":
    """Create a copy of the workflow and its graph.

    The graph is duplicated through a JSON round trip. Validation mode,
    deduplication flag and deduplication bookkeeping are carried over so
    the copy behaves like the original on later ``add_node``/``add_edge``
    calls.

    Returns:
        New BioCypherWorkflow instance
    """
    new_workflow = BioCypherWorkflow(
        name=self.name,
        directed=self.graph.directed,
        schema=self.schema,
        head_ontology_url=self.head_ontology_url,
        validation_mode=self.validation_mode,
        deduplication=self.deduplication,
    )
    new_workflow.from_json(self.to_json())
    # Copy the sets rather than share them, so the two workflows track
    # their own additions independently from here on.
    new_workflow._seen_nodes = set(self._seen_nodes)
    new_workflow._seen_edges = set(self._seen_edges)
    return new_workflow

export_schema(filepath)

Export the current schema to a YAML file.

Parameters:

Name Type Description Default
filepath str

Path to save the schema file

required
Source code in biocypher/_workflow.py
def export_schema(self, filepath: str) -> None:
    """Write the current schema to a YAML file.

    When no schema is set (``self.schema`` is falsy) nothing is written and
    a warning is logged instead.

    Args:
        filepath: Path to save the schema file
    """
    if not self.schema:
        logger.warning("No schema to export")
        return

    with open(filepath, "w") as handle:
        yaml.dump(self.schema, handle, default_flow_style=False)
    logger.info(f"Schema exported to {filepath}")

find_connected_components(node_id, max_depth=2)

Find connected components around a node.

Parameters:

Name Type Description Default
node_id str

Starting node ID

required
max_depth int

Maximum depth to explore

2

Returns:

Type Description
dict[str, Any]

Dictionary with nodes and edges in the component

Source code in biocypher/_workflow.py
def find_connected_components(self, node_id: str, max_depth: int = 2) -> dict[str, Any]:
    """Collect the neighbourhood of a node up to a given depth.

    Starting from ``node_id``, neighbours are gathered level by level for at
    most ``max_depth`` rounds; the visited nodes are then turned into a
    subgraph whose contents are returned as dictionaries.

    Args:
        node_id: Starting node ID
        max_depth: Maximum depth to explore

    Returns:
        Dictionary with "nodes", "edges", "hyperedges" and "statistics" of
        the subgraph. If ``node_id`` is not in the graph, the lists are
        empty and the "statistics" key is absent.
    """
    graph = self.graph
    if not graph.has_node(node_id):
        return {"nodes": [], "edges": [], "hyperedges": []}

    # Level-by-level expansion; `frontier` holds the nodes found last round
    visited = {node_id}
    frontier = {node_id}
    for _ in range(max_depth):
        reached = set()
        for member in frontier:
            reached.update(graph.get_neighbors(member))
        frontier = reached - visited
        visited |= frontier

        # Nothing new was discovered, so further rounds cannot add nodes
        if not frontier:
            break

    subgraph = graph.get_subgraph(visited)
    node_dicts = [node.to_dict() for node in subgraph.get_nodes()]
    edge_dicts = [edge.to_dict() for edge in subgraph.get_edges()]
    hyperedge_dicts = [hyperedge.to_dict() for hyperedge in subgraph.get_hyperedges()]

    return {
        "nodes": node_dicts,
        "edges": edge_dicts,
        "hyperedges": hyperedge_dicts,
        "statistics": subgraph.get_statistics(),
    }

find_paths(source, target, max_length=3)

Find all paths between two nodes.

Parameters:

Name Type Description Default
source str

Source node ID

required
target str

Target node ID

required
max_length int

Maximum path length

3

Returns:

Type Description
list[list[Edge]]

List of paths, each path is a list of Edge objects

Source code in biocypher/_workflow.py
def find_paths(self, source: str, target: str, max_length: int = 3) -> list[list[Edge]]:
    """Look up paths from one node to another via the underlying graph.

    Args:
        source: Source node ID
        target: Target node ID
        max_length: Maximum path length

    Returns:
        List of paths, each path is a list of Edge objects
    """
    paths = self.graph.find_paths(source, target, max_length)
    return paths

from_json(json_data)

Import knowledge graph from JSON format.

Parameters:

Name Type Description Default
json_data str

JSON string containing graph data

required
Source code in biocypher/_workflow.py
def from_json(self, json_data: str) -> None:
    """Import knowledge graph from JSON format.

    The current graph is replaced by the one described in ``json_data``,
    the workflow name is updated to the loaded graph's name, and the
    deduplication bookkeeping is rebuilt from the loaded content.

    Args:
        json_data: JSON string containing graph data
    """
    data = json.loads(json_data)
    self.graph = Graph.from_dict(data)
    self.name = self.graph.name

    # The old bookkeeping described the replaced graph; keeping it would
    # make add_node/add_edge reject IDs that no longer exist. Rebuild it
    # with the same key shapes those methods use.
    self._seen_nodes = {node.id for node in self.graph.get_nodes()}
    self._seen_edges = {(edge.id, edge.type) for edge in self.graph.get_edges()}

get_connected_edges(node_id, direction='both')

Get edges connected to a node.

Parameters:

Name Type Description Default
node_id str

Node identifier

required
direction str

"in", "out", or "both"

'both'

Returns:

Type Description
list[Edge]

List of connected Edge objects

Source code in biocypher/_workflow.py
def get_connected_edges(self, node_id: str, direction: str = "both") -> list[Edge]:
    """Fetch the edges attached to a node from the underlying graph.

    Args:
        node_id: Node identifier
        direction: "in", "out", or "both"

    Returns:
        List of connected Edge objects
    """
    attached = self.graph.get_connected_edges(node_id, direction)
    return attached

get_edge(edge_id)

Get an edge by ID.

Parameters:

Name Type Description Default
edge_id str

Edge identifier

required

Returns:

Type Description
Edge | None

Edge object or None if not found

Source code in biocypher/_workflow.py
def get_edge(self, edge_id: str) -> Edge | None:
    """Fetch a single edge from the underlying graph.

    Args:
        edge_id: Edge identifier

    Returns:
        Edge object or None if not found
    """
    edge = self.graph.get_edge(edge_id)
    return edge

get_edges(edge_type=None)

Get all edges, optionally filtered by type.

Parameters:

Name Type Description Default
edge_type str | None

Optional filter by edge type

None

Returns:

Type Description
list[Edge]

List of Edge objects

Source code in biocypher/_workflow.py
def get_edges(self, edge_type: str | None = None) -> list[Edge]:
    """Fetch edges from the underlying graph, optionally narrowed to one type.

    Args:
        edge_type: Optional filter by edge type

    Returns:
        List of Edge objects
    """
    matching = self.graph.get_edges(edge_type)
    return matching

get_edges_between(source, target, edge_type=None)

Get edges between two nodes.

Parameters:

Name Type Description Default
source str

Source node ID

required
target str

Target node ID

required
edge_type str | None

Optional filter by edge type

None

Returns:

Type Description
list[Edge]

List of Edge objects

Source code in biocypher/_workflow.py
def get_edges_between(self, source: str, target: str, edge_type: str | None = None) -> list[Edge]:
    """Fetch the edges linking two nodes from the underlying graph.

    Args:
        source: Source node ID
        target: Target node ID
        edge_type: Optional filter by edge type

    Returns:
        List of Edge objects
    """
    linking = self.graph.get_edges_between(source, target, edge_type)
    return linking

get_graph()

Get the underlying Graph object.

Returns:

Type Description
Graph

Graph object

Source code in biocypher/_workflow.py
def get_graph(self) -> Graph:
    """Expose the Graph instance this workflow wraps.

    Returns:
        Graph object (the same instance held by the workflow, not a copy)
    """
    underlying = self.graph
    return underlying

get_hyperedge(hyperedge_id)

Get a hyperedge by ID.

Parameters:

Name Type Description Default
hyperedge_id str

Hyperedge identifier

required

Returns:

Type Description
HyperEdge | None

HyperEdge object or None if not found

Source code in biocypher/_workflow.py
def get_hyperedge(self, hyperedge_id: str) -> HyperEdge | None:
    """Fetch a single hyperedge from the underlying graph.

    Args:
        hyperedge_id: Hyperedge identifier

    Returns:
        HyperEdge object or None if not found
    """
    hyperedge = self.graph.get_hyperedge(hyperedge_id)
    return hyperedge

get_hyperedges(hyperedge_type=None)

Get all hyperedges, optionally filtered by type.

Parameters:

Name Type Description Default
hyperedge_type str | None

Optional filter by hyperedge type

None

Returns:

Type Description
list[HyperEdge]

List of HyperEdge objects

Source code in biocypher/_workflow.py
def get_hyperedges(self, hyperedge_type: str | None = None) -> list[HyperEdge]:
    """Fetch hyperedges from the underlying graph, optionally narrowed to one type.

    Args:
        hyperedge_type: Optional filter by hyperedge type

    Returns:
        List of HyperEdge objects
    """
    matching = self.graph.get_hyperedges(hyperedge_type)
    return matching

get_neighbors(node_id, direction='both')

Get neighboring nodes.

Parameters:

Name Type Description Default
node_id str

Node identifier

required
direction str

"in", "out", or "both"

'both'

Returns:

Type Description
set[str]

Set of neighboring node IDs

Source code in biocypher/_workflow.py
def get_neighbors(self, node_id: str, direction: str = "both") -> set[str]:
    """Fetch the IDs of nodes adjacent to a node from the underlying graph.

    Args:
        node_id: Node identifier
        direction: "in", "out", or "both"

    Returns:
        Set of neighboring node IDs
    """
    adjacent = self.graph.get_neighbors(node_id, direction)
    return adjacent

get_node(node_id)

Get a node by ID.

Parameters:

Name Type Description Default
node_id str

Node identifier

required

Returns:

Type Description
Node | None

Node object or None if not found

Source code in biocypher/_workflow.py
def get_node(self, node_id: str) -> Node | None:
    """Fetch a single node from the underlying graph.

    Args:
        node_id: Node identifier

    Returns:
        Node object or None if not found
    """
    node = self.graph.get_node(node_id)
    return node

get_nodes(node_type=None)

Get all nodes, optionally filtered by type.

Parameters:

Name Type Description Default
node_type str | None

Optional filter by node type

None

Returns:

Type Description
list[Node]

List of Node objects

Source code in biocypher/_workflow.py
def get_nodes(self, node_type: str | None = None) -> list[Node]:
    """Fetch nodes from the underlying graph, optionally narrowed to one type.

    Args:
        node_type: Optional filter by node type

    Returns:
        List of Node objects
    """
    matching = self.graph.get_nodes(node_type)
    return matching

get_schema()

Get the current schema configuration.

Returns:

Type Description
dict[str, Any] | None

Dictionary representing the schema or None if no schema

Source code in biocypher/_workflow.py
def get_schema(self) -> dict[str, Any] | None:
    """Return the schema currently attached to the workflow.

    Returns:
        Dictionary representing the schema or None if no schema
        (the stored object itself, not a copy)
    """
    current = self.schema
    return current

get_statistics()

Get comprehensive graph statistics.

Returns:

Type Description
dict[str, Any]

Dictionary with graph statistics

Source code in biocypher/_workflow.py
def get_statistics(self) -> dict[str, Any]:
    """Return the statistics reported by the underlying graph.

    Returns:
        Dictionary with graph statistics
    """
    stats = self.graph.get_statistics()
    return stats

get_summary()

Get a human-readable summary of the graph.

Returns:

Type Description
dict[str, Any]

Dictionary with graph summary

Source code in biocypher/_workflow.py
def get_summary(self) -> dict[str, Any]:
    """Condense the graph statistics into a short overview.

    Returns:
        Dictionary with the workflow name, total node/edge/hyperedge counts,
        the five most frequent node and edge types as ``(type, count)``
        pairs in descending count order, and the connectivity statistics.
    """

    def most_common(counts):
        # Highest counts first, capped at five entries
        ranked = sorted(counts.items(), key=lambda pair: pair[1], reverse=True)
        return ranked[:5]

    stats = self.graph.get_statistics()
    top_node_types = most_common(stats["node_types"])
    top_edge_types = most_common(stats["edge_types"])
    totals = stats["basic"]

    return {
        "name": self.name,
        "total_nodes": totals["nodes"],
        "total_edges": totals["edges"],
        "total_hyperedges": totals["hyperedges"],
        "top_node_types": top_node_types,
        "top_edge_types": top_edge_types,
        "connectivity": stats["connectivity"],
    }

has_edge(edge_id)

Check if an edge exists.

Parameters:

Name Type Description Default
edge_id str

Edge identifier

required

Returns:

Name Type Description
bool bool

True if edge exists

Source code in biocypher/_workflow.py
def has_edge(self, edge_id: str) -> bool:
    """Ask the underlying graph whether it holds an edge with this ID.

    Args:
        edge_id: Edge identifier

    Returns:
        bool: True if edge exists
    """
    exists = self.graph.has_edge(edge_id)
    return exists

has_hyperedge(hyperedge_id)

Check if a hyperedge exists.

Parameters:

Name Type Description Default
hyperedge_id str

Hyperedge identifier

required

Returns:

Name Type Description
bool bool

True if hyperedge exists

Source code in biocypher/_workflow.py
def has_hyperedge(self, hyperedge_id: str) -> bool:
    """Ask the underlying graph whether it holds a hyperedge with this ID.

    Args:
        hyperedge_id: Hyperedge identifier

    Returns:
        bool: True if hyperedge exists
    """
    exists = self.graph.has_hyperedge(hyperedge_id)
    return exists

has_node(node_id)

Check if a node exists.

Parameters:

Name Type Description Default
node_id str

Node identifier

required

Returns:

Name Type Description
bool bool

True if node exists

Source code in biocypher/_workflow.py
def has_node(self, node_id: str) -> bool:
    """Ask the underlying graph whether it holds a node with this ID.

    Args:
        node_id: Node identifier

    Returns:
        bool: True if node exists
    """
    exists = self.graph.has_node(node_id)
    return exists

load(filepath)

Load the graph from a file.

Parameters:

Name Type Description Default
filepath str

Path to load the graph from

required
Source code in biocypher/_workflow.py
def load(self, filepath: str) -> None:
    """Read a JSON file and replace the current graph with its content.

    The file's text is handed to ``from_json``; the file is opened in text
    mode with the platform's default encoding.

    Args:
        filepath: Path to load the graph from
    """
    with open(filepath, "r") as handle:
        serialized = handle.read()

    self.from_json(serialized)
    logger.info(f"Graph loaded from {filepath}")

query_edges(edge_type=None)

Query edges in the knowledge graph.

Parameters:

Name Type Description Default
edge_type str | None

Optional filter by edge type

None

Returns:

Type Description
list[dict[str, Any]]

List of edge dictionaries

Source code in biocypher/_workflow.py
def query_edges(self, edge_type: str | None = None) -> list[dict[str, Any]]:
    """Return edges as plain dictionaries, optionally narrowed to one type.

    Args:
        edge_type: Optional filter by edge type

    Returns:
        List of edge dictionaries (each produced by the edge's ``to_dict``)
    """
    serialized = []
    for edge in self.graph.get_edges(edge_type):
        serialized.append(edge.to_dict())
    return serialized

query_hyperedges(hyperedge_type=None)

Query hyperedges in the knowledge graph.

Parameters:

Name Type Description Default
hyperedge_type str | None

Optional filter by hyperedge type

None

Returns:

Type Description
list[dict[str, Any]]

List of hyperedge dictionaries

Source code in biocypher/_workflow.py
def query_hyperedges(self, hyperedge_type: str | None = None) -> list[dict[str, Any]]:
    """Return hyperedges as plain dictionaries, optionally narrowed to one type.

    Args:
        hyperedge_type: Optional filter by hyperedge type

    Returns:
        List of hyperedge dictionaries (each produced by the hyperedge's
        ``to_dict``)
    """
    serialized = []
    for hyperedge in self.graph.get_hyperedges(hyperedge_type):
        serialized.append(hyperedge.to_dict())
    return serialized

query_nodes(node_type=None)

Query nodes in the knowledge graph.

Parameters:

Name Type Description Default
node_type str | None

Optional filter by node type

None

Returns:

Type Description
list[dict[str, Any]]

List of node dictionaries

Source code in biocypher/_workflow.py
def query_nodes(self, node_type: str | None = None) -> list[dict[str, Any]]:
    """Return nodes as plain dictionaries, optionally narrowed to one type.

    Args:
        node_type: Optional filter by node type

    Returns:
        List of node dictionaries (each produced by the node's ``to_dict``)
    """
    serialized = []
    for node in self.graph.get_nodes(node_type):
        serialized.append(node.to_dict())
    return serialized

remove_edge(edge_id)

Remove an edge from the graph.

Parameters:

Name Type Description Default
edge_id str

Edge identifier

required

Returns:

Name Type Description
bool bool

True if edge was removed, False if not found

Source code in biocypher/_workflow.py
def remove_edge(self, edge_id: str) -> bool:
    """Delete an edge through the underlying graph.

    Only the graph is modified; the workflow's deduplication bookkeeping
    is not touched by this method.

    Args:
        edge_id: Edge identifier

    Returns:
        bool: True if edge was removed, False if not found
    """
    was_removed = self.graph.remove_edge(edge_id)
    return was_removed

remove_node(node_id)

Remove a node and all its connected edges.

Parameters:

Name Type Description Default
node_id str

Node identifier

required

Returns:

Name Type Description
bool bool

True if node was removed, False if not found

Source code in biocypher/_workflow.py
def remove_node(self, node_id: str) -> bool:
    """Delete a node (and, per the graph's contract, its connected edges).

    The work is delegated to the underlying graph; the workflow's
    deduplication bookkeeping is not touched by this method.

    Args:
        node_id: Node identifier

    Returns:
        bool: True if node was removed, False if not found
    """
    was_removed = self.graph.remove_node(node_id)
    return was_removed

save(filepath)

Save the graph to a file.

Parameters:

Name Type Description Default
filepath str

Path to save the graph

required
Source code in biocypher/_workflow.py
def save(self, filepath: str) -> None:
    """Write the graph's JSON representation to a file.

    The file is opened in text mode with the platform's default encoding
    and overwritten if it already exists.

    Args:
        filepath: Path to save the graph
    """
    with open(filepath, "w") as handle:
        handle.write(self.to_json())

    logger.info(f"Graph saved to {filepath}")

to_json()

Export the knowledge graph to JSON format.

Returns:

Type Description
str

JSON string representation of the graph

Source code in biocypher/_workflow.py
def to_json(self) -> str:
    """Serialize the knowledge graph via the underlying graph's ``to_json``.

    Returns:
        JSON string representation of the graph
    """
    serialized = self.graph.to_json()
    return serialized

to_networkx()

Convert to NetworkX graph for compatibility with existing tools.

Returns:

Type Description

networkx.DiGraph: NetworkX representation of the graph

Note

This method provides compatibility with existing NetworkX-based tools while maintaining the native BioCypher object structure. Future versions may use this as the primary backend.

Source code in biocypher/_workflow.py
def to_networkx(self):
    """Build a NetworkX graph mirroring the nodes and edges of this workflow.

    Node properties become node attributes plus a ``node_type`` attribute;
    edge properties become edge attributes plus an ``edge_type`` attribute.
    A property with one of those names is overwritten by the type. Edge IDs
    and hyperedges are not transferred.

    Returns:
        networkx.DiGraph: NetworkX representation of the graph
        (``networkx.Graph`` when the workflow graph is undirected)

    Raises:
        ImportError: If NetworkX is not installed.
    """
    try:
        import networkx as nx
    except ImportError:
        raise ImportError("NetworkX is required for to_networkx() conversion. Install with: pip install networkx")

    source_graph = self.graph
    if source_graph.directed:
        nx_graph = nx.DiGraph()
    else:
        nx_graph = nx.Graph()

    # Nodes first, each carrying its properties and its type
    for node in source_graph._nodes.values():
        nx_graph.add_node(node.id, **{**node.properties, "node_type": node.type})

    # NOTE(review): DiGraph/Graph hold one edge per node pair, so parallel
    # edges between the same endpoints would be merged - confirm whether
    # such edges are expected here.
    for edge in source_graph._edges.values():
        nx_graph.add_edge(edge.source, edge.target, **{**edge.properties, "edge_type": edge.type})

    return nx_graph

to_pandas()

Convert to Pandas DataFrames for compatibility with existing tools.

Returns:

Type Description

dict[str, pd.DataFrame]: Dictionary of DataFrames, one per node/edge type

Note

This method provides compatibility with existing Pandas-based tools while maintaining the native BioCypher object structure. Future versions may use this as the primary backend.

Source code in biocypher/_workflow.py
def to_pandas(self):
    """Export the graph as a collection of Pandas DataFrames.

    One DataFrame is built per node type and one per edge type. Node frames
    start with ``node_id`` and ``node_type`` columns; edge frames start with
    ``edge_id``, ``edge_type``, ``source_id`` and ``target_id``. Entity
    properties are merged in afterwards, so a property sharing one of those
    names replaces the built-in value.

    Returns:
        dict[str, pd.DataFrame]: DataFrames keyed by type name. Node and
        edge frames share one dictionary, so an edge type named like a node
        type replaces that node type's frame.

    Raises:
        ImportError: If Pandas is not installed.
    """
    try:
        import pandas as pd
    except ImportError:
        raise ImportError("Pandas is required for to_pandas() conversion. Install with: pip install pandas")

    graph = self.graph
    frames = {}

    # Node tables, one per registered node type
    for type_name, member_ids in graph._node_types.items():
        members = [graph._nodes[member_id] for member_id in member_ids]
        records = [
            {"node_id": member.id, "node_type": member.type, **member.properties}
            for member in members
        ]
        frames[type_name] = pd.DataFrame(records)

    # Edge tables, one per registered edge type
    for type_name, member_ids in graph._edge_types.items():
        members = [graph._edges[member_id] for member_id in member_ids]
        records = [
            {
                "edge_id": member.id,
                "edge_type": member.type,
                "source_id": member.source,
                "target_id": member.target,
                **member.properties,
            }
            for member in members
        ]
        frames[type_name] = pd.DataFrame(records)

    return frames

validate_against_schema(node_type, properties)

Validate node or edge properties against the schema (if available).

Parameters:

Name Type Description Default
node_type str

Type of node to validate

required
properties dict[str, Any]

Properties to validate

required

Returns:

Name Type Description
bool bool

True if valid, False otherwise

Source code in biocypher/_workflow.py
def validate_against_schema(self, node_type: str, properties: dict[str, Any]) -> bool:
    """Validate entity properties against schema (if available).

    Although the parameter is called ``node_type``, ``add_edge`` passes
    edge types through the same check.

    Args:
        node_type: Type of node (or edge) to validate
        properties: Properties to validate

    Returns:
        bool: True if valid, False otherwise. Also True when there is no
        schema, the type is not part of it, or the schema entry declares
        no properties.
    """
    if not self.schema or node_type not in self.schema:
        return True  # No schema or type not in schema, assume valid

    schema_entry = self.schema[node_type]
    # An entry or a "properties" key left empty in YAML loads as None;
    # treat that as "no constraints" instead of failing on it.
    if not schema_entry or "properties" not in schema_entry:
        return True  # No property constraints

    required_properties = schema_entry["properties"]
    if not required_properties:
        return True  # Declared but empty, nothing to check

    # Check if all required properties are present and have correct types
    for prop_name, prop_type in required_properties.items():
        if prop_name not in properties:
            logger.warning(f"Missing required property '{prop_name}' for node type '{node_type}'")
            return False

        # Check property type
        actual_value = properties[prop_name]
        if not self._validate_property_type(actual_value, prop_type):
            logger.warning(
                f"Property '{prop_name}' has wrong type. Expected {prop_type}, got {type(actual_value).__name__}"
            )
            return False

    return True

create_workflow(name='knowledge_graph', directed=True, schema=None, schema_file=None, head_ontology_url=None, validation_mode='none', deduplication=False)

Create a new knowledge graph workflow.

Parameters:

Name Type Description Default
name str

Name of the knowledge graph

'knowledge_graph'
directed bool

Whether the graph is directed

True
schema dict[str, Any] | None

Dictionary defining the knowledge graph schema

None
schema_file str | None

Path to YAML schema file

None
head_ontology_url str | None

URL to ontology file

None
validation_mode str

Validation level ("none", "warn", "strict")

'none'
deduplication bool

Whether to enable deduplication

False

Returns:

Type Description
BioCypherWorkflow

BioCypherWorkflow instance

Source code in biocypher/_workflow.py
def create_workflow(
    name: str = "knowledge_graph",
    directed: bool = True,
    schema: dict[str, Any] | None = None,
    schema_file: str | None = None,
    head_ontology_url: str | None = None,
    validation_mode: str = "none",
    deduplication: bool = False,
) -> BioCypherWorkflow:
    """Convenience factory that builds a BioCypherWorkflow.

    All arguments are passed to the BioCypherWorkflow constructor unchanged;
    only the default graph name differs ('knowledge_graph').

    Args:
        name: Name of the knowledge graph
        directed: Whether the graph is directed
        schema: Dictionary defining the knowledge graph schema
        schema_file: Path to YAML schema file
        head_ontology_url: URL to ontology file
        validation_mode: Validation level ("none", "warn", "strict")
        deduplication: Whether to enable deduplication

    Returns:
        BioCypherWorkflow instance
    """
    settings = {
        "name": name,
        "directed": directed,
        "schema": schema,
        "schema_file": schema_file,
        "head_ontology_url": head_ontology_url,
        "validation_mode": validation_mode,
        "deduplication": deduplication,
    }
    return BioCypherWorkflow(**settings)