 class DenseEmbedder(BaseEmbedder):
-    """Singleton dense embedder using OpenAI text-embedding-3-large (3072 dims).
+    """OpenAI dense embedder with dynamic model selection (non-singleton).
+
+    IMPORTANT: No longer a singleton! Each collection may use different embedding models,
+    so we create fresh instances with the correct model for each sync/search operation.

     Features:
-    - Singleton shared across all syncs in pod
+    - Dynamic model selection based on vector_size (3072 or 1536)
     - Batch processing with OpenAI limits (2048 texts/request, 300K tokens/request)
     - 5 concurrent requests max
-    - Rate limiting with OpenAIRateLimiter singleton
+    - Rate limiting with OpenAIRateLimiter singleton (shared across instances)
     - Automatic retry on transient errors (via AsyncOpenAI client)
     - Fail-fast on any API errors (no silent failures)
     """

-    MODEL_NAME = "text-embedding-3-large"
-    VECTOR_DIMENSIONS = 3072
     MAX_TOKENS_PER_TEXT = 8192  # OpenAI limit per text
     MAX_BATCH_SIZE = 2048  # OpenAI limit per request
     MAX_TOKENS_PER_REQUEST = 300000  # OpenAI limit
     MAX_CONCURRENT_REQUESTS = 5

-    def __init__(self):
-        """Initialize OpenAI embedder (once per pod)."""
-        if self._initialized:
-            return
+    def __new__(cls, vector_size: int = None):
+        """Override singleton pattern from BaseEmbedder - create fresh instances."""
+        return object.__new__(cls)
+
+    def __init__(self, vector_size: int = None):
+        """Initialize OpenAI embedder for specific vector dimensions.

+        Args:
+            vector_size: Vector dimensions to determine model:
+                - 3072: text-embedding-3-large
+                - 1536: text-embedding-3-small
+                - None: defaults to 3072 (large model)
+        """
         if not settings.OPENAI_API_KEY:
             raise SyncFailureError("OPENAI_API_KEY required for dense embeddings")

+        # Fail-fast: vector_size should always be provided from collection
+        # Only allow None for backward compatibility, but warn
+        if vector_size is None:
+            # Fallback to large model but this shouldn't happen
+            self.MODEL_NAME = "text-embedding-3-large"
+            self.VECTOR_DIMENSIONS = 3072
+        else:
+            # Select model and dimensions based on vector_size
+            from airweave.platform.destinations.collection_strategy import (
+                get_openai_embedding_model_for_vector_size,
+            )
+
+            self.MODEL_NAME = get_openai_embedding_model_for_vector_size(vector_size)
+            self.VECTOR_DIMENSIONS = vector_size
+
+        # Create fresh client instance
         self._client = AsyncOpenAI(
             api_key=settings.OPENAI_API_KEY,
             timeout=1200.0,  # 20 min timeout for high concurrency
             max_retries=2,
         )
-        self._rate_limiter = OpenAIRateLimiter()  # Singleton
+        self._rate_limiter = OpenAIRateLimiter()  # This singleton is still OK (shared rate limit)
         self._tokenizer = tiktoken.get_encoding("cl100k_base")
-        self._initialized = True

     async def embed_many(self, texts: List[str], sync_context: SyncContext) -> List[List[float]]:
         """Embed batch of texts using OpenAI text-embedding-3-large.
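
Note: the helper imported inside __init__ is not part of this hunk. Judging from the docstring's mapping (3072 -> text-embedding-3-large, 1536 -> text-embedding-3-small), it resolves a model name from the collection's vector size. A minimal sketch of that assumed mapping follows; the real implementation in airweave.platform.destinations.collection_strategy may differ.

# Assumed shape of the helper imported in __init__; not shown in this diff, so this
# is an illustrative sketch only.
_MODEL_BY_VECTOR_SIZE = {
    3072: "text-embedding-3-large",
    1536: "text-embedding-3-small",
}


def get_openai_embedding_model_for_vector_size(vector_size: int) -> str:
    """Return the OpenAI embedding model matching a collection's vector size."""
    try:
        return _MODEL_BY_VECTOR_SIZE[vector_size]
    except KeyError:
        raise ValueError(f"Unsupported vector size: {vector_size}") from None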
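With the singleton behaviour removed, each sync or search constructs its own embedder sized to the target collection. An illustrative, hypothetical call site (names are examples; OPENAI_API_KEY must be configured for the client to initialize):

# Hypothetical call sites: the collection's configured vector size selects the model.
large = DenseEmbedder(vector_size=3072)  # uses text-embedding-3-large
small = DenseEmbedder(vector_size=1536)  # uses text-embedding-3-small

# Unlike the old singleton, these are independent instances with their own clients;
# only the OpenAIRateLimiter remains shared across them.
assert large is not small
assert large.MODEL_NAME != small.MODEL_NAME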