ollama_request.conventions.definitions

  1from datetime import datetime
  2from enum import Enum
  3
  4from msgspec import Struct, field as m_field
  5from aaambos.std.communications.utils import create_promise_and_unit, create_in_out_com_feature
  6from aaambos.std.communications.categories import MSGSPEC_STRUCT_TYPE            
  7
  8OLLAMA_REQUEST = "OllamaRequest"
  9
 10class OllamaRequestType(Enum):
 11    GENERATE = "generate"
 12    CHAT = "chat"
 13
 14class OllamaRequest(Struct):
 15    """OllamaRequest datastructure"""
 16    # TODO add attributes of the msg
 17    request_id: str
 18    model: str
 19    prompt: str | None = None
 20    system: str | None = None
 21    format: str | None = None
 22    request_type: OllamaRequestType = OllamaRequestType.GENERATE
 23    ollama_host: str | None = None
 24    messages: list[dict] | None = None
 25    time: datetime = m_field(default_factory=lambda: datetime.now())
 26
 27
 28    def to_generate_kwargs(self):
 29        return {f: getattr(self, f) for f in self.__struct_fields__ if f not in ["time", "ollama_host", "request_type", "request_id", "messages"]}
 30    
 31    def to_chat_kwargs(self):
 32        return {f: getattr(self, f) for f in self.__struct_fields__ if
 33                f in ["model", "messages", "format", "tools", "keep_alive", "options"]}
 34
 35
 36OllamaRequestPromise, OllamaRequestUnit = create_promise_and_unit(
 37    name=OLLAMA_REQUEST,
 38    payload_wrapper=OllamaRequest,
 39    unit_description="__description of the content of the msg__",
 40    frequency=1,
 41    version_str="0.1.0",
 42    required_service_categories=[{MSGSPEC_STRUCT_TYPE}],
 43)
 44
 45OllamaRequestComFeatureIn, OllamaRequestComFeatureOut = create_in_out_com_feature(
 46    name=OLLAMA_REQUEST, promise=OllamaRequestPromise, version_str="0.1.0", requirements=[], version_compatibility_str=">=0.1.0"
 47)
 48
 49# # add the features to your modules
 50# # modules that send the feature (provide out-feature (and require that some module provides the in-feature)):
 51# # inside the dict of the provides_features method:
 52# OllamaRequestComFeatureOut.name: (OllamaRequestComFeatureOut, SimpleFeatureNecessity.Required),
 53# # inside the dict of the requires_features method:
 54# OllamaRequestComFeatureIn.name: (OllamaRequestComFeatureIn, SimpleFeatureNecessity.Required)
 55
 56# # modules that receive the feature (provide in-feature (and require that some module provides the out-feature)):
 57# # inside the dict of the provides_features method:
 58# OllamaRequestComFeatureIn.name: (OllamaRequestComFeatureIn, SimpleFeatureNecessity.Required)
 59# # inside the dict of the requires_features method:
 60# OllamaRequestComFeatureOut.name: (OllamaRequestComFeatureOut, SimpleFeatureNecessity.Required),
 61# # imports: from aaambos.core.module.feature import FeatureNecessity, Feature, SimpleFeatureNecessity
 62# # and the OllamaRequestComFeatureOut and OllamaRequestComFeatureIn based on their location.
 63
 64class OllamaResponseStatus(Enum):
 65    SUCCESS = "success"
 66    ERROR = "error"
 67
 68
 69OLLAMA_RESPONSE = "OllamaResponse"
 70
 71class OllamaResponse(Struct):
 72    """OllamaResponse datastructure"""
 73    # TODO add attributes of the msg
 74    request_id: str
 75    response: dict
 76    status: OllamaResponseStatus
 77    time: datetime = m_field(default_factory=lambda: datetime.now())
 78
 79
 80OllamaResponsePromise, OllamaResponseUnit = create_promise_and_unit(
 81    name=OLLAMA_RESPONSE,
 82    payload_wrapper=OllamaResponse,
 83    unit_description="__description of the content of the msg__",
 84    frequency=1,
 85    version_str="0.1.0",
 86    required_service_categories=[{MSGSPEC_STRUCT_TYPE}],
 87)
 88
 89OllamaResponseComFeatureIn, OllamaResponseComFeatureOut = create_in_out_com_feature(
 90    name=OLLAMA_RESPONSE, promise=OllamaResponsePromise, version_str="0.1.0", requirements=[], version_compatibility_str=">=0.1.0"
 91)
 92
 93# # add the features to your modules
 94# # modules that send the feature (provide out-feature (and require that some module provides the in-feature)):
 95# # inside the dict of the provides_features method:
 96# OllamaResponseComFeatureOut.name: (OllamaResponseComFeatureOut, SimpleFeatureNecessity.Required),
 97# # inside the dict of the requires_features method:
 98# OllamaResponseComFeatureIn.name: (OllamaResponseComFeatureIn, SimpleFeatureNecessity.Required)
 99
100# # modules that receive the feature (provide in-feature (and require that some module provides the out-feature)):
101# # inside the dict of the provides_features method:
102# OllamaResponseComFeatureIn.name: (OllamaResponseComFeatureIn, SimpleFeatureNecessity.Required)
103# # inside the dict of the requires_features method:
104# OllamaResponseComFeatureOut.name: (OllamaResponseComFeatureOut, SimpleFeatureNecessity.Required),
105# # imports: from aaambos.core.module.feature import FeatureNecessity, Feature, SimpleFeatureNecessity
106# # and the OllamaResponseComFeatureOut and OllamaResponseComFeatureIn based on their location.
OLLAMA_REQUEST = 'OllamaRequest'
class OllamaRequestType(enum.Enum):
11class OllamaRequestType(Enum):
12    GENERATE = "generate"
13    CHAT = "chat"

An enumeration.

GENERATE = <OllamaRequestType.GENERATE: 'generate'>
CHAT = <OllamaRequestType.CHAT: 'chat'>
class OllamaRequest(msgspec.Struct):
15class OllamaRequest(Struct):
16    """OllamaRequest datastructure"""
17    # TODO add attributes of the msg
18    request_id: str
19    model: str
20    prompt: str | None = None
21    system: str | None = None
22    format: str | None = None
23    request_type: OllamaRequestType = OllamaRequestType.GENERATE
24    ollama_host: str | None = None
25    messages: list[dict] | None = None
26    time: datetime = m_field(default_factory=lambda: datetime.now())
27
28
29    def to_generate_kwargs(self):
30        return {f: getattr(self, f) for f in self.__struct_fields__ if f not in ["time", "ollama_host", "request_type", "request_id", "messages"]}
31    
32    def to_chat_kwargs(self):
33        return {f: getattr(self, f) for f in self.__struct_fields__ if
34                f in ["model", "messages", "format", "tools", "keep_alive", "options"]}

OllamaRequest datastructure

request_id: str
model: str
prompt: str | None
system: str | None
format: str | None
request_type: OllamaRequestType
ollama_host: str | None
messages: list[dict] | None
time: datetime.datetime
def to_generate_kwargs(self):
29    def to_generate_kwargs(self):
30        return {f: getattr(self, f) for f in self.__struct_fields__ if f not in ["time", "ollama_host", "request_type", "request_id", "messages"]}
def to_chat_kwargs(self):
32    def to_chat_kwargs(self):
33        return {f: getattr(self, f) for f in self.__struct_fields__ if
34                f in ["model", "messages", "format", "tools", "keep_alive", "options"]}
class OllamaResponseStatus(enum.Enum):
65class OllamaResponseStatus(Enum):
66    SUCCESS = "success"
67    ERROR = "error"

An enumeration.

SUCCESS = <OllamaResponseStatus.SUCCESS: 'success'>
ERROR = <OllamaResponseStatus.ERROR: 'error'>
OLLAMA_RESPONSE = 'OllamaResponse'
class OllamaResponse(msgspec.Struct):
72class OllamaResponse(Struct):
73    """OllamaResponse datastructure"""
74    # TODO add attributes of the msg
75    request_id: str
76    response: dict
77    status: OllamaResponseStatus
78    time: datetime = m_field(default_factory=lambda: datetime.now())

OllamaResponse datastructure

request_id: str
response: dict
time: datetime.datetime