Component Interaction

This is the IRIS Framework.

IRIS Framework

The components inside of IRIS represent a production.

You have 3 main components: - BusinessService: This component aims to recieve requests from external applications. - BusinessProcess: This component is responsible for processing the requests and orchestrating the business logic. - BusinessOperation: This component is responsible for executing the technical operations that are needed to fulfill the requests.

The composite applications will give us access to the production through external applications like REST services.

The arrows between them all of this components are messages.

The messages are the way that the components interact with each other.

How to exchange messages

To exchange messages between components, you can use the following methods:

  • send_request_sync: This method is used to send a request synchronously. It will wait for a response before continuing.
  • send_request_async: This method is used to send a request asynchronously. It will not wait for a response before continuing.
  • send_request_async_ng: Same as send_request_async, but with an ayncio implementation.
  • send_multi_request_sync: This method is used to send multiple requests synchronously. It will wait for all responses before continuing.
  • send_generator_request: This method is used to send a request synchronously and return a generator.

send_request_sync

This method is used to send a request synchronously. It will wait for a response before continuing.

Function signature

def send_request_sync(self, target: str, request: Union[Message, Any], 
                        timeout: int = -1, description: Optional[str] = None) -> Any:
    """Send message synchronously to target component.

    Args:
        target: Name of target component
        request: Message to send
        timeout: Timeout in seconds, -1 means wait forever 
        description: Optional description for logging

    Returns:
        Response from target component

    Raises:
        TypeError: If request is invalid type
    """
    ...

Example usage

from iop import BusinessProcess
from msg import MyMessage

class MyBP(BusinessProcess):

    def on_message(self, request):
        msg = MyMessage(message="Hello World")
        # Send a synchronous request to the target component
        response = self.send_request_sync("Python.MyBO", msg)
        self.log_info(f"Received response: {response}")

send_request_async

This method is used to send a request asynchronously. It will not wait for a response before continuing.

Function signature (for BusinessService and BusinessOperation)

def send_request_async(self, target: str, request: Union[Message, Any], 
                        description: Optional[str] = None) -> None:
    """Send message asynchronously to target component.

    Args:
        target: Name of target component
        request: Message to send
        description: Optional description for logging

    Raises:
        TypeError: If request is invalid type
    """
    ...

Example usage (for BusinessService and BusinessOperation)

from iop import BusinessService
from msg import MyMessage

class MyBS(BusinessService):

    def on_message(self, request):
        msg = MyMessage(message="Hello World")
        # Send an asynchronous request to the target component
        self.send_request_async("Python.MyBO", msg)
        self.log_info("Request sent asynchronously")

Function signature (for BusinessProcess)

def send_request_async(self, target: str, request: Any, description: Optional[str]=None, completion_key: Optional[str]=None, response_required: bool=True) -> None:
    """Send the specified message to the target business process or business operation asynchronously.

    Args:
        target: The name of the business process or operation to receive the request
        request: The message to send to the target
        description: An optional description property in the message header
        completion_key: A token to identify the completion of the request
        response_required: Whether a response is required

    Raises:
        TypeError: If request is not of type Message or IRISObject
    """
    ...

Example usage (for BusinessProcess)

from iop import BusinessProcess
from msg import MyMessage

class MyBP(BusinessProcess):

    def on_message(self, request):
        msg_one = MyMessage(message="Message1")
        msg_two = MyMessage(message="Message2")

        self.send_request_async("Python.MyBO", msg_one,completion_key="1")
        self.send_request_async("Python.MyBO", msg_two,completion_key="2")

    def on_response(self, request, response, call_request, call_response, completion_key):
        if completion_key == "1":
            self.response_one = call_response
        elif completion_key == "2":
            self.response_two = call_response

    def on_complete(self, request, response):
        self.log_info(f"Received response one: {self.response_one.message}")
        self.log_info(f"Received response two: {self.response_two.message}")

send_request_async_ng

This method is used to send a request asynchronously using an asyncio implementation.

Function signature

async def send_request_async_ng(self, target: str, request: Union[Message, Any], 
                                   timeout: int = -1, description: Optional[str] = None) -> Any:
    """Send message asynchronously to target component with asyncio.

    Args:
        target: Name of target component
        request: Message to send
        timeout: Timeout in seconds, -1 means wait forever 
        description: Optional description for logging

    Returns:
        Response from target component
    """
    ...

Example usage

import asyncio
import random

from iop import BusinessProcess
from msg import MyMessage


class MyAsyncNGBP(BusinessProcess):

    def on_message(self, request):

        results = asyncio.run(self.await_response(request))

        for result in results:
            self.logger.info(f"Received response: {result.message}")

        return MyMessage(message="All responses received")

    async def await_response(self, request):
        # create 1 to 10 messages
        tasks = []
        for i in range(random.randint(1, 10)):
            tasks.append(self.send_request_async_ng("Python.MyAsyncNGBO",
                                                    MyMessage(message=f"Message {i}")))

        return await asyncio.gather(*tasks)

send_multi_request_sync

This method is used to send multiple requests synchronously. It will wait for all responses before continuing.

Function signature

def send_multi_request_sync(self, target_request: List[Tuple[str, Union[Message, Any]]], 
                               timeout: int = -1, description: Optional[str] = None) -> List[Tuple[str, Union[Message, Any], Any, int]]:
    """Send multiple messages synchronously to target components.

    Args:
        target_request: List of tuples (target, request) to send
        timeout: Timeout in seconds, -1 means wait forever 
        description: Optional description for logging

    Returns:
        List of tuples (target, request, response, status)

    Raises:
        TypeError: If target_request is not a list of tuples
        ValueError: If target_request is empty
    """
...

Example usage

from iop import BusinessProcess
from msg import MyMessage


class MyMultiBP(BusinessProcess):

    def on_message(self, request):
        msg_one = MyMessage(message="Message1")
        msg_two = MyMessage(message="Message2")

        tuple_responses = self.send_multi_request_sync([("Python.MyMultiBO", msg_one),
                                                        ("Python.MyMultiBO", msg_two)])

        self.log_info("All requests have been processed")
        for target,request,response,status in tuple_responses:
            self.log_info(f"Received response: {response.message}")

send_generator_request

This method is used to send a request synchronously and return a generator.

Function signature

    def send_generator_request(self, target: str, request: Union[Message, Any], 
                              timeout: int = -1, description: Optional[str] = None) -> _GeneratorRequest:
    """Send message as a generator request to target component.
    Args:
        target: Name of target component
        request: Message to send
        timeout: Timeout in seconds, -1 means wait forever
        description: Optional description for logging
    Returns:
        _GeneratorRequest: An instance of _GeneratorRequest to iterate over responses
    Raises:
        TypeError: If request is not of type Message
    """
    ...

Example usage

from typing import Any
from iop import BusinessProcess,BusinessOperation

from msg import MyGenerator,MyGeneratorResponse

class MyGeneratorProcess(BusinessProcess):

    def on_request(self, request: Any) -> Any:
        gen = self.send_generator_request(
            target="User.MyGeneratorOperation",
            request=MyGenerator(my_string="Hello, World!"),
            timeout=10,
            description="My generator request")

        for response in gen:
            self.log_info(f"Received response: {response}")

class MyGeneratorOperation(BusinessOperation):

    def on_private_session_started(self, request: MyGenerator) -> Any:
        # return the generator to the process that called it
        return self.my_generator(request)

    def my_generator(self, request: Any) -> Any:
        self.log_info(f"Processing request: {request}")
        # Simulate some processing and yield responses
        for i in range(5):
            response = f"Response {i} from MyGeneratorOperation"
            self.log_info(response)
            yield MyGeneratorResponse(my_other_string=response) # notice that we yield a response here