ollama_request.modules.ollama_request_module
1from __future__ import annotations 2 3import traceback 4from typing import Type 5 6from aaambos.core.supervision.run_time_manager import ControlMsg 7from attrs import define 8from ollama import AsyncClient 9 10from aaambos.core.configuration.module_config import ModuleConfig 11from aaambos.core.module.base import Module, ModuleInfo 12from aaambos.core.module.feature import FeatureNecessity, Feature, SimpleFeatureNecessity 13 14from ollama_request.conventions.definitions import OllamaResponseComFeatureOut, OllamaRequestComFeatureIn, \ 15 OllamaRequestComFeatureOut, OllamaResponseComFeatureIn, OLLAMA_REQUEST, OllamaRequest, OllamaRequestType, \ 16 OLLAMA_RESPONSE, OllamaResponse, OllamaResponseStatus 17from ollama_request.utils import get_response 18 19 20class OllamaRequestModule(Module, ModuleInfo): 21 config: OllamaRequestModuleConfig 22 23 @classmethod 24 def provides_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]: 25 return { 26 OllamaResponseComFeatureOut.name: (OllamaResponseComFeatureOut, SimpleFeatureNecessity.Required), 27 OllamaRequestComFeatureIn.name: (OllamaRequestComFeatureIn, SimpleFeatureNecessity.Required) 28 } 29 30 @classmethod 31 def requires_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]: 32 return { 33 OllamaResponseComFeatureIn.name: (OllamaResponseComFeatureIn, SimpleFeatureNecessity.Required), 34 OllamaRequestComFeatureOut.name: (OllamaRequestComFeatureOut, SimpleFeatureNecessity.Required) 35 } 36 37 @classmethod 38 def get_module_config_class(cls) -> Type[ModuleConfig]: 39 return OllamaRequestModuleConfig 40 41 async def initialize(self): 42 self.ollama_instance_clients = {} 43 self.ollama_base_url = f"http:{'s' if self.config.use_ssl else ''}//{self.config.ollama_ip}:{self.config.ollama_port}" 44 45 await self.com.register_callback_for_promise(self.prm[OLLAMA_REQUEST], self.handle_ollama_request) 46 47 def get_client(self, host: str | None) -> AsyncClient: 48 if not host: 49 host = self.ollama_base_url 50 51 if host not in self.ollama_instance_clients: 52 self.ollama_instance_clients[host] = AsyncClient(host=host) 53 return self.ollama_instance_clients[host] 54 55 async def handle_ollama_request(self, topic, request: OllamaRequest): 56 client = self.get_client(request.ollama_host) 57 response = await get_response(client, request) 58 await self.com.send(self.tpc[OLLAMA_RESPONSE], response) 59 60 async def step(self): 61 pass 62 63 def terminate(self, control_msg: ControlMsg = None) -> int: 64 exit_code = super().terminate() 65 return exit_code 66 67 68@define(kw_only=True) 69class OllamaRequestModuleConfig(ModuleConfig): 70 module_path: str = "ollama_request.modules.ollama_request_module" 71 module_info: Type[ModuleInfo] = OllamaRequestModule 72 restart_after_failure: bool = True 73 expected_start_up_time: float | int = 0 74 ollama_ip: str = "localhost" 75 ollama_port: int = 11434 76 use_ssl: bool = False 77 78 79def provide_module(): 80 return OllamaRequestModule
21class OllamaRequestModule(Module, ModuleInfo): 22 config: OllamaRequestModuleConfig 23 24 @classmethod 25 def provides_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]: 26 return { 27 OllamaResponseComFeatureOut.name: (OllamaResponseComFeatureOut, SimpleFeatureNecessity.Required), 28 OllamaRequestComFeatureIn.name: (OllamaRequestComFeatureIn, SimpleFeatureNecessity.Required) 29 } 30 31 @classmethod 32 def requires_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]: 33 return { 34 OllamaResponseComFeatureIn.name: (OllamaResponseComFeatureIn, SimpleFeatureNecessity.Required), 35 OllamaRequestComFeatureOut.name: (OllamaRequestComFeatureOut, SimpleFeatureNecessity.Required) 36 } 37 38 @classmethod 39 def get_module_config_class(cls) -> Type[ModuleConfig]: 40 return OllamaRequestModuleConfig 41 42 async def initialize(self): 43 self.ollama_instance_clients = {} 44 self.ollama_base_url = f"http:{'s' if self.config.use_ssl else ''}//{self.config.ollama_ip}:{self.config.ollama_port}" 45 46 await self.com.register_callback_for_promise(self.prm[OLLAMA_REQUEST], self.handle_ollama_request) 47 48 def get_client(self, host: str | None) -> AsyncClient: 49 if not host: 50 host = self.ollama_base_url 51 52 if host not in self.ollama_instance_clients: 53 self.ollama_instance_clients[host] = AsyncClient(host=host) 54 return self.ollama_instance_clients[host] 55 56 async def handle_ollama_request(self, topic, request: OllamaRequest): 57 client = self.get_client(request.ollama_host) 58 response = await get_response(client, request) 59 await self.com.send(self.tpc[OLLAMA_RESPONSE], response) 60 61 async def step(self): 62 pass 63 64 def terminate(self, control_msg: ControlMsg = None) -> int: 65 exit_code = super().terminate() 66 return exit_code
The base class for a module.
A module is part of the architecture / system that handles a specific part, It communicates via a communications service with other modules. Features define the requirements and what the module provides for the other modules. It can have extensions (defined via features).
Attributes: name: com: (CommunicationService) ext: all extensions of the module in a dict keyed by the extension names. Extensions can also be passed via kwarg to the init if the attribute in the ExtensionFeature is set. log: tpc: wpr:
Examples:
24 @classmethod 25 def provides_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]: 26 return { 27 OllamaResponseComFeatureOut.name: (OllamaResponseComFeatureOut, SimpleFeatureNecessity.Required), 28 OllamaRequestComFeatureIn.name: (OllamaRequestComFeatureIn, SimpleFeatureNecessity.Required) 29 }
Define/Provide the features that the module provides (com output, etc.)
Args: config: the module config for config related feature turn on / off.
Returns: the features keyed by their name in a dict with the feature necessity (can it be turned on or off).
31 @classmethod 32 def requires_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]: 33 return { 34 OllamaResponseComFeatureIn.name: (OllamaResponseComFeatureIn, SimpleFeatureNecessity.Required), 35 OllamaRequestComFeatureOut.name: (OllamaRequestComFeatureOut, SimpleFeatureNecessity.Required) 36 }
Define/Provide the features that the module requires (com input, extensions, etc.)
Args: config: the module config for config related feature turn on / off.
Returns: the features keyed by their name in a dict with the feature necessity (can it be turned on or off).
38 @classmethod 39 def get_module_config_class(cls) -> Type[ModuleConfig]: 40 return OllamaRequestModuleConfig
Provide the config class. Overwrite when the module requires another ModuleConfig (additional parameters).
Returns: the module config class.
42 async def initialize(self): 43 self.ollama_instance_clients = {} 44 self.ollama_base_url = f"http:{'s' if self.config.use_ssl else ''}//{self.config.ollama_ip}:{self.config.ollama_port}" 45 46 await self.com.register_callback_for_promise(self.prm[OLLAMA_REQUEST], self.handle_ollama_request)
Implement this method that should happen during initialization (callback registration, etc.)
Returns: None
Is called every step of the module. Can be a Generator method but can also be a normal method.
Implement it in your module.
Returns/Yields: None
64 def terminate(self, control_msg: ControlMsg = None) -> int: 65 exit_code = super().terminate() 66 return exit_code
Is called during termination. Clean up etc. Is not an async method.
Args: control_msg: can contain information about the reason for termination.
Returns: the exit code of the module.
69@define(kw_only=True) 70class OllamaRequestModuleConfig(ModuleConfig): 71 module_path: str = "ollama_request.modules.ollama_request_module" 72 module_info: Type[ModuleInfo] = OllamaRequestModule 73 restart_after_failure: bool = True 74 expected_start_up_time: float | int = 0 75 ollama_ip: str = "localhost" 76 ollama_port: int = 11434 77 use_ssl: bool = False
2def __init__(self, *, module_import_delay=attr_dict['module_import_delay'].default, module_start_delay=attr_dict['module_start_delay'].default, mean_frequency_step=attr_dict['mean_frequency_step'].default, general_run_config=attr_dict['general_run_config'].default, name=attr_dict['name'].default, ft_info=attr_dict['ft_info'].default, com_info=attr_dict['com_info'].default, module_extension_setups=attr_dict['module_extension_setups'].default, logging=attr_dict['logging'].default, extra_delay=attr_dict['extra_delay'].default, module_path=attr_dict['module_path'].default, module_info=attr_dict['module_info'].default, restart_after_failure=attr_dict['restart_after_failure'].default, expected_start_up_time=attr_dict['expected_start_up_time'].default, ollama_ip=attr_dict['ollama_ip'].default, ollama_port=attr_dict['ollama_port'].default, use_ssl=attr_dict['use_ssl'].default): 3 self.module_import_delay = module_import_delay 4 self.module_start_delay = module_start_delay 5 self.mean_frequency_step = mean_frequency_step 6 self.general_run_config = general_run_config 7 self.name = name 8 self.ft_info = ft_info 9 self.com_info = com_info 10 self.module_extension_setups = module_extension_setups 11 self.logging = logging 12 self.extra_delay = extra_delay 13 self.module_path = module_path 14 self.module_info = module_info 15 self.restart_after_failure = restart_after_failure 16 self.expected_start_up_time = expected_start_up_time 17 self.ollama_ip = ollama_ip 18 self.ollama_port = ollama_port 19 self.use_ssl = use_ssl
Method generated by attrs for class OllamaRequestModuleConfig.