From 044f0537209f2e1329f1d9261b70fb0f6ee119f7 Mon Sep 17 00:00:00 2001 From: Michael Ulianich Date: Tue, 7 Feb 2023 10:21:28 +0200 Subject: [PATCH] Add AbstractCollection --- pymilvus/orm/collection.py | 50 +++++++++++++++++++++++++------------- 1 file changed, 33 insertions(+), 17 deletions(-) diff --git a/pymilvus/orm/collection.py b/pymilvus/orm/collection.py index 288a7347c..00a9a33cd 100644 --- a/pymilvus/orm/collection.py +++ b/pymilvus/orm/collection.py @@ -12,10 +12,11 @@ import copy import json +import typing from typing import List import pandas -from .connections import connections +from .connections import connections, AbstractConnections, Connections as SyncConnections from .schema import ( CollectionSchema, FieldSchema, @@ -46,8 +47,11 @@ from ..client.configs import DefaultConfigs +ConnectionsT = typing.TypeVar('ConnectionsT', bound=AbstractConnections) + +class AbstractCollection(typing.Generic[ConnectionsT]): + connections: ConnectionsT -class Collection: def __init__(self, name: str, schema: CollectionSchema=None, using: str="default", shards_num: int=2, **kwargs): """ Constructs a collection by name, schema and other parameters. @@ -91,17 +95,38 @@ def __init__(self, name: str, schema: CollectionSchema=None, using: str="default self._using = using self._shards_num = shards_num self._kwargs = kwargs + self._init_schema = schema + self._init() + + def _init(self): + raise NotImplementedError + + def _get_connection(self): + return self.connections._fetch_handler(self._using) + + @property + def name(self) -> str: + """str: the name of the collection. """ + return self._name + + +class Collection(AbstractCollection[SyncConnections]): + connections = connections + + def _init(self): + assert not (hasattr(self, '_schema') or hasattr(self, '_schema_dict')), "_init() must be called only once" + schema = self._init_schema + kwargs = self._kwargs conn = self._get_connection() has = conn.has_collection(self._name, **kwargs) if has: resp = conn.describe_collection(self._name, **kwargs) - s_consistency_level = resp.get("consistency_level", DEFAULT_CONSISTENCY_LEVEL) - arg_consistency_level = kwargs.get("consistency_level", s_consistency_level) - if not cmp_consistency_level(s_consistency_level, arg_consistency_level): + consistency_level = resp.get("consistency_level", DEFAULT_CONSISTENCY_LEVEL) + arg_consistency_level = kwargs.get("consistency_level", consistency_level) + if not cmp_consistency_level(consistency_level, arg_consistency_level): raise SchemaNotReadyException(message=ExceptionsMessage.ConsistencyLevelInconsistent) server_schema = CollectionSchema.construct_from_dict(resp) - self._consistency_level = s_consistency_level if schema is None: self._schema = server_schema else: @@ -113,18 +138,17 @@ def __init__(self, name: str, schema: CollectionSchema=None, using: str="default else: if schema is None: - raise SchemaNotReadyException(message=ExceptionsMessage.CollectionNotExistNoSchema % name) + raise SchemaNotReadyException(message=ExceptionsMessage.CollectionNotExistNoSchema % self._name) if isinstance(schema, CollectionSchema): check_schema(schema) consistency_level = get_consistency_level(kwargs.get("consistency_level", DEFAULT_CONSISTENCY_LEVEL)) conn.create_collection(self._name, schema, shards_num=self._shards_num, **kwargs) self._schema = schema - self._consistency_level = consistency_level else: raise SchemaNotReadyException(message=ExceptionsMessage.SchemaType) self._schema_dict = self._schema.to_dict() - self._schema_dict["consistency_level"] = self._consistency_level + self._schema_dict["consistency_level"] = consistency_level def __repr__(self): _dict = { @@ -139,9 +163,6 @@ def __repr__(self): r.append(s.format(k, v)) return "".join(r) - def _get_connection(self): - return connections._fetch_handler(self._using) - @classmethod def construct_from_dataframe(cls, name, dataframe, **kwargs): if dataframe is None: @@ -212,11 +233,6 @@ def description(self) -> str: """str: a text description of the collection. """ return self._schema.description - @property - def name(self) -> str: - """str: the name of the collection. """ - return self._name - @property def is_empty(self) -> bool: """bool: whether the collection is empty or not."""