Skip to main content
Skip table of contents

Python client example

This Python script demonstrates a simple client implementation for the Breeze Runtime software, allowing communication with the Breeze Runtime server for executing workflows and managing real-time data processing tasks. The client performs key operations such as connecting to the server, sending commands, listening for events, and streaming data. Below is an overview of the key functionalities:

  1. Server Connection: The client connects to the Breeze Runtime server using designated command, event, and stream ports, allowing communication for both command execution and event handling.

  2. Command Execution: It sends various commands to the server (e.g., initializing the camera, loading a workflow, and starting/stopping predictions). Each command is sent with a unique identifier, and the client handles the server's responses, ensuring correct execution.

  3. Event Stream Listening: The client listens for server events, such as prediction object data, and processes these in real-time. Event data is received in JSON format, parsed, and key information such as classification results is extracted and printed.

    • The field CameraId can be used to detect the source of the event if multiple sensors are being used

  4. Data Stream Listening: A separate thread listens for data streams from the server. The client parses the stream header to extract details such as stream type, frame number, and timestamps, allowing further processing of streamed data.

    • Each sensor (if multiple cameras are being used) has its own data stream starting at Data Stream Port and adding one for each camera, e.g,

      • First Camera (CameraId = 0) - Data Stream Port = 3000

      • Second Camera (CameraId = 1) - Data Stream Port = 3001

  5. Workflow Management: The script loads a specific workflow (e.g., the Runtime Classification of nuts), takes dark and white references, and starts the prediction process on spectral data. The workflow path is dynamically constructed from the server's workspace property.

This client offers a foundation for integrating with the Breeze Runtime software, demonstrating how to handle real-time event and data stream processing for spectral analysis workflows.

Untitled-20240815-080623.png

Breeze Runtime streams

PY
import socket
import json
import uuid
import threading
from datetime import datetime, timedelta
from dateutil import tz  # python-dateutil - for time zone adjustment of dates

# Server host and port configuration
HOST = '127.0.0.1'
COMMAND_PORT = 2000
EVENT_PORT = 2500
DATA_STREAM_PORT = 3000

# Flag to control thread execution
stop_event = threading.Event()


def start_command_client():
    soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    soc.connect((HOST, COMMAND_PORT))
    soc.settimeout(120)
    return soc


def send_command(command_socket, command):
    command_id = uuid.uuid4().hex[:8]
    print(f"Sending command '{command.get('Command')}' with id {command_id}")
    command['Id'] = command_id
    message = json.dumps(command, separators=(',', ':')) + '\r\n'

    command_socket.sendall(message.encode('utf-8'))

    message_buffer = ""

    while True:
        try:
            part = command_socket.recv(1024).decode('utf-8')
            if not part:
                break  # Socket closed by the server

            message_buffer += part

            while '\r\n' in message_buffer:
                full_response_str, message_buffer = message_buffer.split('\r\n', 1)

                try:
                    response_json = json.loads(full_response_str.strip())

                    if response_json.get('Id') == command_id:
                        return response_json

                except json.JSONDecodeError:
                    print(f"Invalid JSON received: {full_response_str}")
                    continue

        except socket.timeout:
            print("Request timed out")
            return None

    return None


def listen_for_events():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as event_socket:
        event_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        event_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        event_socket.connect((HOST, EVENT_PORT))

        message_buffer = ""

        while not stop_event.is_set():
            event_socket.settimeout(1)
            try:
                data = event_socket.recv(1024).decode('utf-8')
                if not data:
                    break

                message_buffer += data

                while '\r\n' in message_buffer:
                    message, message_buffer = message_buffer.split('\r\n', 1)
                    try:
                        message_json = json.loads(message)
                        event = message_json.get('Event', '')
                        inner_message = json.loads(message_json.get('Message', '{}'))
                        if event == "PredictionObject":
                            descriptors = inner_message.get('Descriptors', [])

                            start_date = convert_ticks_to_datetime(inner_message.get('StartTime', 0))
                            end_date = convert_ticks_to_datetime(inner_message.get('EndTime', 0))

                            start_line = inner_message.get('StartLine', 0)
                            end_line = inner_message.get('EndLine', 0)

                            camera_id = inner_message.get('CameraId', 0) # If multiple cameras are used
                            print(
                                f"start line:{start_line} end line:{end_line} start time:{start_date} end time:{end_date} classification:{descriptors[0]} cameraId:{camera_id}")

                            if (shape := inner_message.get('Shape')) is not None:
                                center_of_object = [int(coord) for coord in shape.get('Center', [])] # [X,Y]
                                border_of_object = [[int(coord) for coord in point] for point in shape.get('Border', [])] # [[X,Y]..]
                                # print(f"shape - center:{center_of_object} border:{border_of_object}")
                                # NOTE: Quite a verbose output

                        else:
                            print(f"event:{event} message:{inner_message}")
                    except json.JSONDecodeError:
                        print("Invalid JSON received")
            except socket.timeout:
                continue


def listen_for_data_stream():
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as stream_socket:
        stream_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        stream_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
        stream_socket.connect((HOST, DATA_STREAM_PORT))

        expected_header_size = 25  # 1 + 8 + 8 + 4 + 4 bytes = 25 bytes

        while not stop_event.is_set():
            stream_socket.settimeout(1)
            try:
                header = b""

                # Ensure that the full header is received
                while len(header) < expected_header_size:
                    chunk = stream_socket.recv(expected_header_size - len(header))
                    if not chunk:
                        break  # connection closed
                    header += chunk

                if len(header) != expected_header_size:
                    print("Incomplete header received")
                    continue

                # Manually parse the header
                stream_type = header[0]
                frame_number = int.from_bytes(header[1:9], byteorder='little', signed=True)
                timestamp = int.from_bytes(header[9:17], byteorder='little', signed=False)
                metadata_size = int.from_bytes(header[17:21], byteorder='little', signed=False)
                data_body_size = int.from_bytes(header[21:25], byteorder='little', signed=False)

                # Print the parsed values
                print(f"Stream Type: {stream_type}, Frame Number: {frame_number}, "
                      f"Timestamp: {timestamp}, Metadata Size: {metadata_size}, Data Body Size: {data_body_size}")

                # Skip metadata and data body based on their sizes
                stream_socket.recv(metadata_size)
                stream_socket.recv(data_body_size)

            except socket.timeout:
                continue


def convert_ticks_to_datetime(ticks):
    return (datetime(1, 1, 1) + timedelta(microseconds=ticks // 10)).replace(tzinfo=tz.tzutc()).astimezone(tz.tzlocal())


def handle_response(response):
    if not response:
        raise ValueError(f"No response or incorrect response ID received: {response}")

    message = response.get('Message', '')
    if not response.get("Success", False):
        raise RuntimeError(f"Command not successful: {message}")

    print(f"Id: {response.get('Id')} successfully received message body: '{message[:100]}{"..." if len(message) > 100 else ""}'")

    return message


def main():
    with start_command_client() as command_socket:
        handle_response(send_command(command_socket, {"Command": "InitializeCamera"}))
        # Get current Breeze workspace path
        ws = handle_response(send_command(command_socket, {"Command": "GetProperty", "Property": "WorkspacePath"}))
        # This is using the tutorial default workflow from nuts tutorial
        # see https://help.prediktera.com/breeze-runtime/runtime-classification-of-nuts
        workflow_path = f"{ws}/Data/Runtime/Nuts_Classification.xml"
        handle_response(send_command(command_socket, {"Command": "LoadWorkflow", "FilePath": workflow_path}))
        handle_response(send_command(command_socket, {"Command": "TakeDarkReference"}))
        handle_response(send_command(command_socket, {"Command": "TakeWhiteReference"}))
        handle_response(send_command(command_socket, {"Command": "StartPredict", "IncludeObjectShape": True}))

        event_listener_thread = threading.Thread(target=listen_for_events)
        data_stream_listener_thread = threading.Thread(target=listen_for_data_stream)

        event_listener_thread.start()
        data_stream_listener_thread.start()

        input("Press Enter to stop prediction...\n")
        try:
            response = send_command(command_socket, {"Command": "StopPredict"})
            handle_response(response)
        except (ValueError, RuntimeError) as e:
            print(f"Error during stop prediction: {e}")
        finally:
            stop_event.set()
            event_listener_thread.join()
            data_stream_listener_thread.join()

        print("Done")


if __name__ == '__main__':
    main()

JavaScript errors detected

Please note, these errors can depend on your browser setup.

If this problem persists, please contact our support.