ollama_request.modules.ollama_request_module

 1from __future__ import annotations
 2
 3import traceback
 4from typing import Type
 5
 6from aaambos.core.supervision.run_time_manager import ControlMsg
 7from attrs import define
 8from ollama import AsyncClient
 9
10from aaambos.core.configuration.module_config import ModuleConfig
11from aaambos.core.module.base import Module, ModuleInfo
12from aaambos.core.module.feature import FeatureNecessity, Feature, SimpleFeatureNecessity
13
14from ollama_request.conventions.definitions import OllamaResponseComFeatureOut, OllamaRequestComFeatureIn, \
15    OllamaRequestComFeatureOut, OllamaResponseComFeatureIn, OLLAMA_REQUEST, OllamaRequest, OllamaRequestType, \
16    OLLAMA_RESPONSE, OllamaResponse, OllamaResponseStatus
17from ollama_request.utils import get_response
18
19
20class OllamaRequestModule(Module, ModuleInfo):
21    config: OllamaRequestModuleConfig
22
23    @classmethod
24    def provides_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]:
25        return {
26            OllamaResponseComFeatureOut.name: (OllamaResponseComFeatureOut, SimpleFeatureNecessity.Required),
27            OllamaRequestComFeatureIn.name: (OllamaRequestComFeatureIn, SimpleFeatureNecessity.Required)
28        }
29    
30    @classmethod
31    def requires_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]:
32        return {
33            OllamaResponseComFeatureIn.name: (OllamaResponseComFeatureIn, SimpleFeatureNecessity.Required),
34            OllamaRequestComFeatureOut.name: (OllamaRequestComFeatureOut, SimpleFeatureNecessity.Required)
35        }
36    
37    @classmethod
38    def get_module_config_class(cls) -> Type[ModuleConfig]:
39        return OllamaRequestModuleConfig
40
41    async def initialize(self):
42        self.ollama_instance_clients = {}
43        self.ollama_base_url = f"http:{'s' if self.config.use_ssl else ''}//{self.config.ollama_ip}:{self.config.ollama_port}"
44        
45        await self.com.register_callback_for_promise(self.prm[OLLAMA_REQUEST], self.handle_ollama_request)
46    
47    def get_client(self, host: str | None) -> AsyncClient:
48        if not host:
49            host = self.ollama_base_url
50            
51        if host not in self.ollama_instance_clients:
52            self.ollama_instance_clients[host] = AsyncClient(host=host)
53        return self.ollama_instance_clients[host]
54    
55    async def handle_ollama_request(self, topic, request: OllamaRequest):
56        client = self.get_client(request.ollama_host)
57        response = await get_response(client, request)
58        await self.com.send(self.tpc[OLLAMA_RESPONSE], response)
59    
60    async def step(self):
61        pass
62
63    def terminate(self, control_msg: ControlMsg = None) -> int:
64        exit_code = super().terminate()
65        return exit_code
66
67
68@define(kw_only=True)
69class OllamaRequestModuleConfig(ModuleConfig):
70    module_path: str = "ollama_request.modules.ollama_request_module"
71    module_info: Type[ModuleInfo] = OllamaRequestModule
72    restart_after_failure: bool = True
73    expected_start_up_time: float | int = 0
74    ollama_ip: str = "localhost"
75    ollama_port: int = 11434
76    use_ssl: bool = False
77
78
79def provide_module():
80    return OllamaRequestModule
class OllamaRequestModule(aaambos.core.module.base.Module, aaambos.core.module.base.ModuleInfo):
21class OllamaRequestModule(Module, ModuleInfo):
22    config: OllamaRequestModuleConfig
23
24    @classmethod
25    def provides_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]:
26        return {
27            OllamaResponseComFeatureOut.name: (OllamaResponseComFeatureOut, SimpleFeatureNecessity.Required),
28            OllamaRequestComFeatureIn.name: (OllamaRequestComFeatureIn, SimpleFeatureNecessity.Required)
29        }
30    
31    @classmethod
32    def requires_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]:
33        return {
34            OllamaResponseComFeatureIn.name: (OllamaResponseComFeatureIn, SimpleFeatureNecessity.Required),
35            OllamaRequestComFeatureOut.name: (OllamaRequestComFeatureOut, SimpleFeatureNecessity.Required)
36        }
37    
38    @classmethod
39    def get_module_config_class(cls) -> Type[ModuleConfig]:
40        return OllamaRequestModuleConfig
41
42    async def initialize(self):
43        self.ollama_instance_clients = {}
44        self.ollama_base_url = f"http:{'s' if self.config.use_ssl else ''}//{self.config.ollama_ip}:{self.config.ollama_port}"
45        
46        await self.com.register_callback_for_promise(self.prm[OLLAMA_REQUEST], self.handle_ollama_request)
47    
48    def get_client(self, host: str | None) -> AsyncClient:
49        if not host:
50            host = self.ollama_base_url
51            
52        if host not in self.ollama_instance_clients:
53            self.ollama_instance_clients[host] = AsyncClient(host=host)
54        return self.ollama_instance_clients[host]
55    
56    async def handle_ollama_request(self, topic, request: OllamaRequest):
57        client = self.get_client(request.ollama_host)
58        response = await get_response(client, request)
59        await self.com.send(self.tpc[OLLAMA_RESPONSE], response)
60    
61    async def step(self):
62        pass
63
64    def terminate(self, control_msg: ControlMsg = None) -> int:
65        exit_code = super().terminate()
66        return exit_code

The base class for a module.

A module is part of the architecture / system that handles a specific part, It communicates via a communications service with other modules. Features define the requirements and what the module provides for the other modules. It can have extensions (defined via features).

Attributes: name: com: (CommunicationService) ext: all extensions of the module in a dict keyed by the extension names. Extensions can also be passed via kwarg to the init if the attribute in the ExtensionFeature is set. log: tpc: wpr:

Examples:

the module config

@classmethod
def provides_features( cls, config: aaambos.core.configuration.module_config.ModuleConfig = Ellipsis) -> dict[<member 'name' of 'Feature' objects>, tuple[aaambos.core.module.feature.Feature, aaambos.core.module.feature.FeatureNecessity]]:
24    @classmethod
25    def provides_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]:
26        return {
27            OllamaResponseComFeatureOut.name: (OllamaResponseComFeatureOut, SimpleFeatureNecessity.Required),
28            OllamaRequestComFeatureIn.name: (OllamaRequestComFeatureIn, SimpleFeatureNecessity.Required)
29        }

Define/Provide the features that the module provides (com output, etc.)

Args: config: the module config for config related feature turn on / off.

Returns: the features keyed by their name in a dict with the feature necessity (can it be turned on or off).

@classmethod
def requires_features( cls, config: aaambos.core.configuration.module_config.ModuleConfig = Ellipsis) -> dict[<member 'name' of 'Feature' objects>, tuple[aaambos.core.module.feature.Feature, aaambos.core.module.feature.FeatureNecessity]]:
31    @classmethod
32    def requires_features(cls, config: ModuleConfig = ...) -> dict[Feature.name, tuple[Feature, FeatureNecessity]]:
33        return {
34            OllamaResponseComFeatureIn.name: (OllamaResponseComFeatureIn, SimpleFeatureNecessity.Required),
35            OllamaRequestComFeatureOut.name: (OllamaRequestComFeatureOut, SimpleFeatureNecessity.Required)
36        }

Define/Provide the features that the module requires (com input, extensions, etc.)

Args: config: the module config for config related feature turn on / off.

Returns: the features keyed by their name in a dict with the feature necessity (can it be turned on or off).

@classmethod
def get_module_config_class(cls) -> Type[aaambos.core.configuration.module_config.ModuleConfig]:
38    @classmethod
39    def get_module_config_class(cls) -> Type[ModuleConfig]:
40        return OllamaRequestModuleConfig

Provide the config class. Overwrite when the module requires another ModuleConfig (additional parameters).

Returns: the module config class.

async def initialize(self):
42    async def initialize(self):
43        self.ollama_instance_clients = {}
44        self.ollama_base_url = f"http:{'s' if self.config.use_ssl else ''}//{self.config.ollama_ip}:{self.config.ollama_port}"
45        
46        await self.com.register_callback_for_promise(self.prm[OLLAMA_REQUEST], self.handle_ollama_request)

Implement this method that should happen during initialization (callback registration, etc.)

Returns: None

def get_client(self, host: str | None) -> ollama._client.AsyncClient:
48    def get_client(self, host: str | None) -> AsyncClient:
49        if not host:
50            host = self.ollama_base_url
51            
52        if host not in self.ollama_instance_clients:
53            self.ollama_instance_clients[host] = AsyncClient(host=host)
54        return self.ollama_instance_clients[host]
async def handle_ollama_request( self, topic, request: ollama_request.conventions.definitions.OllamaRequest):
56    async def handle_ollama_request(self, topic, request: OllamaRequest):
57        client = self.get_client(request.ollama_host)
58        response = await get_response(client, request)
59        await self.com.send(self.tpc[OLLAMA_RESPONSE], response)
async def step(self):
61    async def step(self):
62        pass

Is called every step of the module. Can be a Generator method but can also be a normal method.

Implement it in your module.

Returns/Yields: None

def terminate( self, control_msg: aaambos.core.supervision.run_time_manager.ControlMsg = None) -> int:
64    def terminate(self, control_msg: ControlMsg = None) -> int:
65        exit_code = super().terminate()
66        return exit_code

Is called during termination. Clean up etc. Is not an async method.

Args: control_msg: can contain information about the reason for termination.

Returns: the exit code of the module.

@define(kw_only=True)
class OllamaRequestModuleConfig(aaambos.core.configuration.module_config.ModuleConfig):
69@define(kw_only=True)
70class OllamaRequestModuleConfig(ModuleConfig):
71    module_path: str = "ollama_request.modules.ollama_request_module"
72    module_info: Type[ModuleInfo] = OllamaRequestModule
73    restart_after_failure: bool = True
74    expected_start_up_time: float | int = 0
75    ollama_ip: str = "localhost"
76    ollama_port: int = 11434
77    use_ssl: bool = False
OllamaRequestModuleConfig( *, module_import_delay: float | int = 0, module_start_delay: float | int = 0, mean_frequency_step: float = 2, general_run_config: 'GeneralRunPart' = None, name: 'ModuleName' = None, ft_info: 'ModuleFeatureInfo | None' = None, com_info: 'ModuleCommunicationInfo | None' = None, module_extension_setups: 'dict[str, tuple[ExtensionSetup, ExtensionFeature]] | None' = None, logging: 'LoggingConfig | None' = None, extra_delay: float | int = 0, module_path: str = 'ollama_request.modules.ollama_request_module', module_info: Type[aaambos.core.module.base.ModuleInfo] = <class 'OllamaRequestModule'>, restart_after_failure: bool = True, expected_start_up_time: float | int = 0, ollama_ip: str = 'localhost', ollama_port: int = 11434, use_ssl: bool = False)
 2def __init__(self, *, module_import_delay=attr_dict['module_import_delay'].default, module_start_delay=attr_dict['module_start_delay'].default, mean_frequency_step=attr_dict['mean_frequency_step'].default, general_run_config=attr_dict['general_run_config'].default, name=attr_dict['name'].default, ft_info=attr_dict['ft_info'].default, com_info=attr_dict['com_info'].default, module_extension_setups=attr_dict['module_extension_setups'].default, logging=attr_dict['logging'].default, extra_delay=attr_dict['extra_delay'].default, module_path=attr_dict['module_path'].default, module_info=attr_dict['module_info'].default, restart_after_failure=attr_dict['restart_after_failure'].default, expected_start_up_time=attr_dict['expected_start_up_time'].default, ollama_ip=attr_dict['ollama_ip'].default, ollama_port=attr_dict['ollama_port'].default, use_ssl=attr_dict['use_ssl'].default):
 3    self.module_import_delay = module_import_delay
 4    self.module_start_delay = module_start_delay
 5    self.mean_frequency_step = mean_frequency_step
 6    self.general_run_config = general_run_config
 7    self.name = name
 8    self.ft_info = ft_info
 9    self.com_info = com_info
10    self.module_extension_setups = module_extension_setups
11    self.logging = logging
12    self.extra_delay = extra_delay
13    self.module_path = module_path
14    self.module_info = module_info
15    self.restart_after_failure = restart_after_failure
16    self.expected_start_up_time = expected_start_up_time
17    self.ollama_ip = ollama_ip
18    self.ollama_port = ollama_port
19    self.use_ssl = use_ssl

Method generated by attrs for class OllamaRequestModuleConfig.

module_path: str
module_info: Type[aaambos.core.module.base.ModuleInfo]
restart_after_failure: bool
expected_start_up_time: float | int
ollama_ip: str
ollama_port: int
use_ssl: bool
def provide_module():
80def provide_module():
81    return OllamaRequestModule