PyModbus
PyModbusDocs

Error Handling Best Practices

Handle Modbus errors properly with PyModbus. Exception codes, timeouts, retries, logging, and recovery strategies explained.

Error Handling in PyModbus

Handle all the things that go wrong: timeouts, exceptions, connection failures, and invalid data.

Exception Types

from pymodbus.exceptions import (
    ModbusException,          # Base exception
    ModbusIOException,        # I/O errors
    ParameterException,       # Invalid parameters
    NoSuchSlaveException,     # Slave doesn't exist
    NotImplementedException, # Function not supported
    ConnectionException,      # Connection issues
    InvalidMessageReceivedException  # Corrupt message
)

try:
    result = client.read_holding_registers(0, 10)
except ModbusIOException as e:
    print(f"I/O error: {e}")
except ConnectionException as e:
    print(f"Connection error: {e}")
except ModbusException as e:
    print(f"Modbus error: {e}")
except Exception as e:
    print(f"Unexpected error: {e}")

Modbus Exception Codes

def decode_exception(result):
    """Decode Modbus exception response."""
    if not result.isError():
        return None
    
    exceptions = {
        0x01: "Illegal Function - Function code not supported",
        0x02: "Illegal Data Address - Address not allowed",
        0x03: "Illegal Data Value - Value out of range",
        0x04: "Slave Device Failure - Device error",
        0x05: "Acknowledge - Request accepted, processing",
        0x06: "Slave Device Busy - Try again later",
        0x08: "Memory Parity Error - Device memory error",
        0x0A: "Gateway Path Unavailable - Gateway error",
        0x0B: "Gateway Target Failed - Target not responding"
    }
    
    code = getattr(result, 'exception_code', None)
    if code:
        return exceptions.get(code, f"Unknown exception: {code}")
    return str(result)

# Use it
result = client.read_holding_registers(9999, 10)
error = decode_exception(result)
if error:
    print(f"Error: {error}")

Retry Logic

import time
from functools import wraps

def retry_on_error(max_retries=3, delay=1.0, backoff=2.0):
    """Decorator for automatic retry."""
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            current_delay = delay
            
            while retries < max_retries:
                try:
                    result = func(*args, **kwargs)
                    
                    # Check if Modbus error
                    if hasattr(result, 'isError') and result.isError():
                        raise ModbusException(f"Modbus error: {result}")
                    
                    return result
                    
                except Exception as e:
                    retries += 1
                    if retries >= max_retries:
                        print(f"Failed after {max_retries} retries: {e}")
                        raise
                    
                    print(f"Retry {retries}/{max_retries} after {current_delay}s")
                    time.sleep(current_delay)
                    current_delay *= backoff
            
        return wrapper
    return decorator

# Use decorator
@retry_on_error(max_retries=3, delay=0.5)
def read_temperature(client):
    result = client.read_holding_registers(100, 1)
    return result.registers[0] / 10.0

# Will retry automatically
temp = read_temperature(client)

Connection Recovery

class ResilientModbusClient:
    """Client that recovers from connection failures."""
    
    def __init__(self, host, port=502):
        self.host = host
        self.port = port
        self.client = None
        self.connected = False
        self.reconnect_delay = 1.0
        self.max_reconnect_delay = 30.0
    
    def connect(self):
        """Connect with exponential backoff."""
        delay = self.reconnect_delay
        
        while not self.connected:
            try:
                if self.client:
                    self.client.close()
                
                self.client = ModbusTcpClient(self.host, self.port)
                self.connected = self.client.connect()
                
                if self.connected:
                    print(f"Connected to {self.host}")
                    self.reconnect_delay = 1.0  # Reset delay
                    return True
                
            except Exception as e:
                print(f"Connection failed: {e}")
            
            print(f"Retrying in {delay}s...")
            time.sleep(delay)
            delay = min(delay * 2, self.max_reconnect_delay)
        
        return False
    
    def read(self, address, count, slave=1):
        """Read with automatic reconnection."""
        if not self.connected:
            self.connect()
        
        try:
            result = self.client.read_holding_registers(address, count, slave)
            
            if result.isError():
                # Check if connection error
                if result.exception_code in [0x0A, 0x0B]:
                    self.connected = False
                    return self.read(address, count, slave)  # Retry
                
            return result
            
        except Exception as e:
            print(f"Read error: {e}")
            self.connected = False
            return self.read(address, count, slave)  # Retry

Timeout Handling

import signal
from contextlib import contextmanager

class TimeoutError(Exception):
    pass

@contextmanager
def timeout(seconds):
    """Context manager for operation timeout."""
    def timeout_handler(signum, frame):
        raise TimeoutError(f"Operation timed out after {seconds}s")
    
    # Set alarm
    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    
    try:
        yield
    finally:
        signal.alarm(0)  # Cancel alarm

# Use timeout
try:
    with timeout(5):
        # This must complete within 5 seconds
        result = client.read_holding_registers(0, 100)
        print(result.registers)
except TimeoutError as e:
    print(f"Timeout: {e}")

Logging Errors

import logging
from datetime import datetime

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('modbus_errors.log'),
        logging.StreamHandler()
    ]
)

logger = logging.getLogger('modbus')

class LoggingModbusClient:
    """Client with comprehensive logging."""
    
    def __init__(self, host):
        self.host = host
        self.client = ModbusTcpClient(host)
        self.error_count = 0
        self.success_count = 0
    
    def read(self, address, count, slave=1):
        """Read with logging."""
        start_time = time.time()
        
        try:
            result = self.client.read_holding_registers(address, count, slave)
            elapsed = time.time() - start_time
            
            if result.isError():
                self.error_count += 1
                logger.error(
                    f"Read error: addr={address}, count={count}, "
                    f"slave={slave}, error={result}, time={elapsed:.3f}s"
                )
                return None
            
            self.success_count += 1
            logger.debug(
                f"Read success: addr={address}, count={count}, "
                f"slave={slave}, time={elapsed:.3f}s"
            )
            
            return result.registers
            
        except Exception as e:
            self.error_count += 1
            elapsed = time.time() - start_time
            logger.exception(
                f"Read exception: addr={address}, count={count}, "
                f"slave={slave}, error={e}, time={elapsed:.3f}s"
            )
            return None
    
    def get_stats(self):
        """Get error statistics."""
        total = self.success_count + self.error_count
        if total > 0:
            error_rate = (self.error_count / total) * 100
            logger.info(
                f"Stats: {self.success_count} success, "
                f"{self.error_count} errors ({error_rate:.1f}%)"
            )
        return self.success_count, self.error_count

Validation

def validate_register_value(value, min_val=None, max_val=None, name="Value"):
    """Validate register value."""
    if value is None:
        raise ValueError(f"{name} is None")
    
    if not isinstance(value, (int, float)):
        raise TypeError(f"{name} must be numeric, got {type(value)}")
    
    if min_val is not None and value < min_val:
        raise ValueError(f"{name} {value} below minimum {min_val}")
    
    if max_val is not None and value > max_val:
        raise ValueError(f"{name} {value} above maximum {max_val}")
    
    return True

def safe_read_temperature(client):
    """Read temperature with validation."""
    try:
        result = client.read_holding_registers(100, 1)
        
        if result.isError():
            raise ModbusException(f"Read error: {result}")
        
        raw_value = result.registers[0]
        temperature = raw_value / 10.0
        
        # Validate temperature range
        validate_register_value(
            temperature,
            min_val=-50,
            max_val=200,
            name="Temperature"
        )
        
        return temperature
        
    except (ModbusException, ValueError) as e:
        logger.error(f"Temperature read failed: {e}")
        return None

Circuit Breaker Pattern

class CircuitBreaker:
    """Circuit breaker for failing connections."""
    
    def __init__(self, failure_threshold=5, recovery_timeout=60):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout
        self.failure_count = 0
        self.last_failure = None
        self.state = 'CLOSED'  # CLOSED, OPEN, HALF_OPEN
    
    def call(self, func, *args, **kwargs):
        """Call function with circuit breaker."""
        if self.state == 'OPEN':
            if time.time() - self.last_failure > self.recovery_timeout:
                self.state = 'HALF_OPEN'
                print("Circuit breaker: Trying recovery")
            else:
                raise Exception("Circuit breaker is OPEN")
        
        try:
            result = func(*args, **kwargs)
            
            if self.state == 'HALF_OPEN':
                self.state = 'CLOSED'
                self.failure_count = 0
                print("Circuit breaker: Recovered")
            
            return result
            
        except Exception as e:
            self.failure_count += 1
            self.last_failure = time.time()
            
            if self.failure_count >= self.failure_threshold:
                self.state = 'OPEN'
                print(f"Circuit breaker: OPEN after {self.failure_count} failures")
            
            raise

# Use circuit breaker
breaker = CircuitBreaker(failure_threshold=3, recovery_timeout=30)

def read_with_breaker(client, address, count):
    try:
        return breaker.call(client.read_holding_registers, address, count)
    except Exception as e:
        print(f"Failed: {e}")
        return None

Health Check

class ModbusHealthCheck:
    """Monitor Modbus device health."""
    
    def __init__(self, client, check_register=0):
        self.client = client
        self.check_register = check_register
        self.consecutive_failures = 0
        self.max_failures = 3
    
    def is_healthy(self):
        """Check if device is responding."""
        try:
            result = self.client.read_holding_registers(
                self.check_register, 1
            )
            
            if result.isError():
                self.consecutive_failures += 1
            else:
                self.consecutive_failures = 0
                return True
                
        except Exception:
            self.consecutive_failures += 1
        
        if self.consecutive_failures >= self.max_failures:
            logger.warning(f"Device unhealthy: {self.consecutive_failures} failures")
            return False
        
        return True
    
    def wait_until_healthy(self, timeout=60):
        """Wait for device to become healthy."""
        start = time.time()
        
        while time.time() - start < timeout:
            if self.is_healthy():
                return True
            time.sleep(2)
        
        return False

Always implement proper error handling in production. Unhandled errors can crash your application or corrupt data.

Error Recovery Strategies

  1. Retry with backoff - Don't hammer failing devices
  2. Circuit breaker - Stop trying when device is down
  3. Fallback values - Use last known good value
  4. Graceful degradation - Continue with reduced functionality
  5. Alert on persistent errors - Notify operators

Next Steps