Skip to content

Commit

Permalink
Add AbstractCollection
Browse files Browse the repository at this point in the history
  • Loading branch information
belkka committed Feb 7, 2023
1 parent a3e457e commit 044f053
Showing 1 changed file with 33 additions and 17 deletions.
50 changes: 33 additions & 17 deletions pymilvus/orm/collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@

import copy
import json
import typing
from typing import List
import pandas

from .connections import connections
from .connections import connections, AbstractConnections, Connections as SyncConnections
from .schema import (
CollectionSchema,
FieldSchema,
Expand Down Expand Up @@ -46,8 +47,11 @@
from ..client.configs import DefaultConfigs


ConnectionsT = typing.TypeVar('ConnectionsT', bound=AbstractConnections)

class AbstractCollection(typing.Generic[ConnectionsT]):
connections: ConnectionsT

class Collection:
def __init__(self, name: str, schema: CollectionSchema=None, using: str="default", shards_num: int=2, **kwargs):
""" Constructs a collection by name, schema and other parameters.
Expand Down Expand Up @@ -91,17 +95,38 @@ def __init__(self, name: str, schema: CollectionSchema=None, using: str="default
self._using = using
self._shards_num = shards_num
self._kwargs = kwargs
self._init_schema = schema
self._init()

def _init(self):
raise NotImplementedError

def _get_connection(self):
return self.connections._fetch_handler(self._using)

@property
def name(self) -> str:
"""str: the name of the collection. """
return self._name


class Collection(AbstractCollection[SyncConnections]):
connections = connections

def _init(self):
assert not (hasattr(self, '_schema') or hasattr(self, '_schema_dict')), "_init() must be called only once"
schema = self._init_schema
kwargs = self._kwargs
conn = self._get_connection()

has = conn.has_collection(self._name, **kwargs)
if has:
resp = conn.describe_collection(self._name, **kwargs)
s_consistency_level = resp.get("consistency_level", DEFAULT_CONSISTENCY_LEVEL)
arg_consistency_level = kwargs.get("consistency_level", s_consistency_level)
if not cmp_consistency_level(s_consistency_level, arg_consistency_level):
consistency_level = resp.get("consistency_level", DEFAULT_CONSISTENCY_LEVEL)
arg_consistency_level = kwargs.get("consistency_level", consistency_level)
if not cmp_consistency_level(consistency_level, arg_consistency_level):
raise SchemaNotReadyException(message=ExceptionsMessage.ConsistencyLevelInconsistent)
server_schema = CollectionSchema.construct_from_dict(resp)
self._consistency_level = s_consistency_level
if schema is None:
self._schema = server_schema
else:
Expand All @@ -113,18 +138,17 @@ def __init__(self, name: str, schema: CollectionSchema=None, using: str="default

else:
if schema is None:
raise SchemaNotReadyException(message=ExceptionsMessage.CollectionNotExistNoSchema % name)
raise SchemaNotReadyException(message=ExceptionsMessage.CollectionNotExistNoSchema % self._name)
if isinstance(schema, CollectionSchema):
check_schema(schema)
consistency_level = get_consistency_level(kwargs.get("consistency_level", DEFAULT_CONSISTENCY_LEVEL))
conn.create_collection(self._name, schema, shards_num=self._shards_num, **kwargs)
self._schema = schema
self._consistency_level = consistency_level
else:
raise SchemaNotReadyException(message=ExceptionsMessage.SchemaType)

self._schema_dict = self._schema.to_dict()
self._schema_dict["consistency_level"] = self._consistency_level
self._schema_dict["consistency_level"] = consistency_level

def __repr__(self):
_dict = {
Expand All @@ -139,9 +163,6 @@ def __repr__(self):
r.append(s.format(k, v))
return "".join(r)

def _get_connection(self):
return connections._fetch_handler(self._using)

@classmethod
def construct_from_dataframe(cls, name, dataframe, **kwargs):
if dataframe is None:
Expand Down Expand Up @@ -212,11 +233,6 @@ def description(self) -> str:
"""str: a text description of the collection. """
return self._schema.description

@property
def name(self) -> str:
"""str: the name of the collection. """
return self._name

@property
def is_empty(self) -> bool:
"""bool: whether the collection is empty or not."""
Expand Down

0 comments on commit 044f053

Please sign in to comment.