Python Apply Changes example
This Python script demonstrates a Apply Changes implementation for the Breeze Runtime software, allowing communication with the Breeze Runtime server for executing workflows and managing real-time data processing tasks. The client performs essential operations such as connecting to the server, sending commands, listening for events, and handling progress updates. Below is an overview of the key functionalities:
Server Connection: The client connects to the Breeze Runtime server using designated command, event, and stream ports, allowing communication for both command execution and event handling.
Command Execution: The client sends commands to the server, such as applying changes using specific XML configurations and raw data files. Each command is assigned a unique identifier, and the client manages the server's responses, ensuring that commands are executed correctly.
Event Stream Listening: The client listens for server events, processes these in real-time, and updates a console-based progress bar accordingly. Event data, typically received in JSON format, is parsed to extract relevant information such as progress updates or informational messages.
Workflow Management: The script loads a specific workflow (e.g., the Runtime Classification of nuts), takes dark and white references, and starts the prediction process on spectral data. The workflow path is dynamically constructed from the server's workspace property.
import socket
import json
import uuid
import threading
from tqdm import tqdm # pip install tqdm
# Server host and port configuration
HOST = '127.0.0.1'
COMMAND_PORT = 2000
EVENT_PORT = 2500
# Flag to control thread execution
stop_event = threading.Event()
def start_client():
soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
soc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
soc.connect((HOST, COMMAND_PORT))
soc.settimeout(120)
return soc
def send_command(command_socket, command):
command_id = uuid.uuid4().hex[:8]
print(f"Sending command '{command.get('Command')}' with id {command_id}")
command['Id'] = command_id
message = json.dumps(command, separators=(',', ':')) + '\r\n'
command_socket.sendall(message.encode('utf-8'))
full_response = ''
while True:
try:
part = command_socket.recv(1024).decode('utf-8')
full_response += part
if '\r\n' in part:
break
except socket.timeout:
print("Request timed out")
return None
try:
response_json = json.loads(full_response)
if response_json.get('Id') == command_id:
return response_json
except json.JSONDecodeError:
print(f"Invalid JSON received: {full_response}")
return None
return None
def listen_for_events():
progress_bar = None
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as event_socket:
event_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
event_socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
event_socket.connect((HOST, EVENT_PORT))
message_buffer = ""
while not stop_event.is_set():
event_socket.settimeout(1)
try:
data = event_socket.recv(1024).decode('utf-8')
if not data:
break
message_buffer += data
while '\r\n' in message_buffer:
message, message_buffer = message_buffer.split('\r\n', 1)
try:
message_json = json.loads(message.strip())
event_message = message_json.get('Message', '')
event_code = message_json.get('Code', 0)
if event_code == 5000: # Progress update
if progress_bar is not None:
progress_bar.update(int(event_message))
elif event_code == 5002: # Info message
info_text = event_message
if progress_bar is not None:
progress_bar.set_description(info_text)
tqdm.write(f"Info: {info_text}")
elif event_code == 5003: # Progress initialization
if progress_bar is not None:
progress_bar.close()
total = int(event_message)
progress_bar = tqdm(total=total, leave=False)
except json.JSONDecodeError:
continue
except socket.timeout:
continue
def handle_response(response):
if not response:
tqdm.write(f"No response or incorrect response ID received: {response}")
if not response.get("Success", False):
tqdm.write(f"Command not successful: {response.get('Message', response)}")
message = str(response.get('Message'))
tqdm.write(f"Id: {response.get('Id')} successfully received: {message[:100]}" + ("..." if len(message) > 100 else ""))
def main():
with start_client() as s:
event_listener_thread = threading.Thread(target=listen_for_events)
event_listener_thread.start()
handle_response(send_command(s, {"Command": "ApplyChanges",
"XmlFile": "[Path to workflow xml]",
"Files": [
"ENVI file absolut path"
]}))
stop_event.set()
event_listener_thread.join()
print("Done")
if __name__ == '__main__':
main()