PyModbus
PyModbusDocs

Creating a Modbus TCP Client

Build robust Modbus TCP clients with PyModbus. This guide covers connection handling, timeouts, retry logic, and connection-pooling best practices.

Modbus TCP Client

Connect to Modbus devices over Ethernet. Handle connections, timeouts, and errors properly.

Basic TCP Client

from pymodbus.client import ModbusTcpClient

# Create client
# NOTE(review): retry_on_empty, close_comm_on_error and strict appear to have
# been removed in later PyModbus 3.x releases — confirm against the installed
# version and drop them if the constructor rejects them.
client = ModbusTcpClient(
    host='192.168.1.100',  # IP address
    port=502,               # Default Modbus port
    timeout=3,              # Socket timeout
    retries=3,              # Retry count
    retry_on_empty=False,   # Retry on empty response
    close_comm_on_error=False,
    strict=True,            # Strict timing
    source_address=None     # Source IP (optional)
)

# Connect
if client.connect():
    print("Connected")
    try:
        # Read 10 holding registers starting at address 0.
        # count is passed by keyword, which works across PyModbus 3.x.
        result = client.read_holding_registers(0, count=10)
        if result.isError():
            print(f"Modbus error: {result}")
    finally:
        # Release the socket even if the read raised.
        client.close()
else:
    print("Connection failed")

Connection Management

Auto-reconnect Client

class ReconnectingModbusClient:
    """TCP client wrapper that (re)connects on demand before each read.

    A new ``ModbusTcpClient`` is created whenever no client exists or the
    current client's socket is no longer open.
    """

    def __init__(self, host, port=502, timeout=3):
        """Store connection settings and attempt an initial connection.

        The result of the initial attempt is ignored; a failed connection is
        retried by the first read.
        """
        self.host = host
        self.port = port
        self.timeout = timeout
        self.client = None
        self._connect()

    def _connect(self):
        """Create a fresh client and connect it.

        Any previous client is closed first so its socket is not leaked.

        Returns:
            The value returned by the client's ``connect()``.
        """
        if self.client is not None:
            self.client.close()
        self.client = ModbusTcpClient(
            self.host,
            port=self.port,
            timeout=self.timeout
        )
        return self.client.connect()

    def _ensure_connected(self):
        """Reconnect if there is no client or its socket is closed.

        Raises:
            ConnectionError: if the reconnect attempt fails.
        """
        if self.client is None or not self.client.is_socket_open():
            print(f"Reconnecting to {self.host}:{self.port}")
            if not self._connect():
                raise ConnectionError(f"Cannot connect to {self.host}:{self.port}")

    def read_holding_registers(self, address, count, slave=1):
        """Read holding registers, reconnecting first if necessary.

        Args:
            address: starting register address.
            count: number of registers to read.
            slave: unit identifier of the target device.

        Returns:
            The response object from the underlying client. Error responses
            are printed and returned, not raised.

        Raises:
            ConnectionError: if the connection cannot be (re)established.
            Exception: anything raised by the underlying read is re-raised
                after the connection has been dropped.
        """
        self._ensure_connected()
        try:
            # Keyword arguments work across PyModbus 3.x releases.
            result = self.client.read_holding_registers(
                address, count=count, slave=slave
            )
            if result.isError():
                print(f"Modbus error: {result}")
            return result
        except Exception as e:
            print(f"Read failed: {e}")
            # Drop the connection so the next call starts from a clean state.
            self.close()
            raise

    def close(self):
        """Close the connection, if any. Safe to call more than once."""
        if self.client is not None:
            self.client.close()
            # Forget the closed client so it is never reused or closed twice.
            self.client = None

import time

# Use it
client = ReconnectingModbusClient('192.168.1.100')
try:
    # Poll once per second; the wrapper reconnects by itself when needed.
    for _ in range(100):
        result = client.read_holding_registers(0, 10)
        time.sleep(1)
finally:
    # Release the socket even if a read raised.
    client.close()

Context Manager

from contextlib import contextmanager

@contextmanager
def modbus_client(host, port=502):
    """Yield a connected ModbusTcpClient and close it when the block exits.

    Raises:
        ConnectionError: if the client's connect() call reports failure.
    """
    conn = ModbusTcpClient(host, port=port)
    try:
        connected = conn.connect()
        if connected:
            yield conn
        else:
            raise ConnectionError(f"Cannot connect to {host}:{port}")
    finally:
        # Runs on success, on a failed connect, and on errors in the with-body.
        conn.close()

# Use it
with modbus_client('192.168.1.100') as client:
    result = client.read_holding_registers(0, count=10)
    # Check for an error response before touching .registers.
    if result.isError():
        print(f"Modbus error: {result}")
    else:
        print(result.registers)

Connection Pool

from queue import Queue
import threading

class ModbusConnectionPool:
    """Fixed-size pool of ModbusTcpClient objects shared through a Queue.

    Clients are checked out with ``get_client`` and must be handed back with
    ``return_client``.
    """

    def __init__(self, host, port=502, pool_size=5):
        """Create ``pool_size`` clients and try to connect each one.

        A client whose initial connect fails is still pooled, so the pool
        always holds ``pool_size`` clients; ``get_client`` retries the
        connection when that client is checked out.
        """
        self.host = host
        self.port = port
        self.pool = Queue(maxsize=pool_size)
        # Retained for backward compatibility; the Queue does its own locking.
        self.lock = threading.Lock()

        # Create initial connections
        for _ in range(pool_size):
            client = ModbusTcpClient(host, port=port)
            client.connect()  # best effort; retried lazily in get_client
            self.pool.put(client)

    def get_client(self, timeout=10):
        """Check a connected client out of the pool.

        Args:
            timeout: seconds to wait for a free client.

        Returns:
            A client whose socket is open.

        Raises:
            queue.Empty: if no client becomes free within ``timeout``.
            ConnectionError: if the client had to be reconnected and that
                failed. The client is put back in the pool first.
        """
        client = self.pool.get(timeout=timeout)

        # Reconnect if the socket dropped while the client sat in the pool.
        if not client.is_socket_open() and not client.connect():
            # Return it so a failed reconnect does not shrink the pool.
            self.pool.put(client)
            raise ConnectionError(f"Cannot connect to {self.host}:{self.port}")

        return client

    def return_client(self, client):
        """Hand a previously checked-out client back to the pool."""
        self.pool.put(client)

    def close_all(self):
        """Close every client currently sitting in the pool."""
        from queue import Empty

        # get_nowait avoids the empty()/get() race, where another thread could
        # take the last client and leave a blocking get() waiting forever.
        while True:
            try:
                client = self.pool.get_nowait()
            except Empty:
                break
            client.close()

# Use connection pool
pool = ModbusConnectionPool('192.168.1.100', pool_size=3)

def worker():
    """Borrow a client, read ten registers, and always return the client."""
    client = pool.get_client()
    try:
        result = client.read_holding_registers(0, count=10)
        # Check for an error response before touching .registers.
        if result.isError():
            print(f"Modbus error: {result}")
        else:
            print(result.registers)
    finally:
        pool.return_client(client)

# Run multiple workers (more workers than pooled clients; extras wait their turn)
threads = [threading.Thread(target=worker) for _ in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

pool.close_all()

Timeout Handling

import socket
from pymodbus.exceptions import ConnectionException, ModbusIOException

def read_with_timeout(host, address, count, timeout=5, port=502):
    """Read holding registers once and report failures as a message.

    A new client is created for the call and always closed before returning.

    Args:
        host: device address.
        address: starting register address.
        count: number of registers to read.
        timeout: timeout in seconds handed to the client constructor.
        port: TCP port of the device.

    Returns:
        ``(registers, None)`` on success, or ``(None, message)`` when the
        connection, the read, or the Modbus response failed.
    """
    # The timeout is set through the constructor; no later override is needed.
    client = ModbusTcpClient(
        host,
        port=port,
        timeout=timeout
    )

    try:
        if not client.connect():
            return None, "Connection failed"

        result = client.read_holding_registers(address, count=count)

        if result.isError():
            return None, f"Modbus error: {result}"

        return result.registers, None

    except socket.timeout:
        return None, f"Timeout after {timeout} seconds"
    except ConnectionException as e:
        return None, f"Connection error: {e}"
    except ModbusIOException as e:
        return None, f"I/O error: {e}"
    finally:
        client.close()

# Use it: the helper hands back a (registers, error_message) pair
data, error = read_with_timeout('192.168.1.100', 0, 10, timeout=2)
print(f"Failed: {error}" if error else f"Data: {data}")

Multiple Device Management

class ModbusDeviceManager:
    """Registry of named Modbus TCP devices with connect-on-demand reads."""

    def __init__(self):
        # name -> {'host', 'port', 'slave', 'client'}
        self.devices = {}

    def add_device(self, name, host, port=502, slave=1):
        """Register a device under ``name``; no connection is made yet."""
        self.devices[name] = {
            'host': host,
            'port': port,
            'slave': slave,
            'client': None
        }

    def connect(self, name):
        """Connect (or reconnect) the named device.

        Any client already stored for the device is closed first so its
        socket is not leaked.

        Returns:
            True on success; False if the name is unknown or connecting fails.
        """
        device = self.devices.get(name)
        if device is None:
            return False

        previous = device['client']
        if previous is not None:
            previous.close()
            device['client'] = None

        client = ModbusTcpClient(device['host'], port=device['port'])

        if client.connect():
            device['client'] = client
            return True
        client.close()
        return False

    def read_registers(self, name, address, count):
        """Read holding registers from the named device.

        Connects first when the device has no client or its socket is closed.

        Returns:
            The client's response object, or None if the name is unknown or
            the device cannot be connected.
        """
        device = self.devices.get(name)
        if device is None:
            return None

        client = device['client']
        if client is None or not client.is_socket_open():
            if not self.connect(name):
                return None
            client = device['client']

        # Keyword arguments work across PyModbus 3.x releases.
        return client.read_holding_registers(
            address, count=count, slave=device['slave']
        )

    def disconnect_all(self):
        """Close and forget the client of every registered device."""
        for device in self.devices.values():
            if device['client'] is not None:
                device['client'].close()
                device['client'] = None

# Use it
manager = ModbusDeviceManager()
manager.add_device('plc1', '192.168.1.10', slave=1)
manager.add_device('plc2', '192.168.1.11', slave=1)
manager.add_device('meter', '192.168.1.20', slave=2)

try:
    # Read from different devices (each result is None if the device is unreachable)
    plc1_data = manager.read_registers('plc1', 0, 10)
    plc2_data = manager.read_registers('plc2', 100, 5)
    meter_data = manager.read_registers('meter', 0, 20)
finally:
    # Close every open connection even if one of the reads raised.
    manager.disconnect_all()

TLS/SSL Secure Connection

import ssl

from pymodbus.client import ModbusTlsClient

# Create SSL context
ssl_context = ssl.create_default_context()
# WARNING: the next two lines turn off hostname and certificate verification.
# Use them only on an isolated test network; in production keep verification
# on and load the device's CA with ssl_context.load_verify_locations(...).
ssl_context.check_hostname = False
ssl_context.verify_mode = ssl.CERT_NONE

# Optional client certificate/key: remove this line if the server does not
# require client authentication.
# NOTE(review): PyModbus appears to ignore certfile/keyfile arguments when an
# explicit sslctx is supplied, so the pair is loaded into the context itself —
# confirm against the installed version.
ssl_context.load_cert_chain(certfile='client.crt', keyfile='client.key')

# Create secure client. ModbusTlsClient wraps the socket with the SSL context;
# the plain ModbusTcpClient does not perform a TLS handshake.
client = ModbusTlsClient(
    '192.168.1.100',
    port=802,  # Common secure Modbus port
    sslctx=ssl_context,
)

if client.connect():
    try:
        result = client.read_holding_registers(0, count=10)
    finally:
        # Release the socket even if the read raised.
        client.close()

Performance Optimization

Keep-Alive Connection

import time


class PersistentModbusClient:
    """Client that keeps one connection open and probes it after idle periods."""

    def __init__(self, host, port=502):
        self.client = ModbusTcpClient(host, port=port)
        self.connected = False
        self.last_activity = 0  # time.time() of the last successful exchange
        self.keepalive_interval = 30  # seconds

    def _keepalive(self):
        """Probe the link with a one-register read if it has been idle too long.

        On failure ``self.connected`` is set to False so the caller can
        reconnect.
        """
        if time.time() - self.last_activity <= self.keepalive_interval:
            return
        try:
            # Read one register as keepalive
            self.client.read_holding_registers(0, count=1)
            self.last_activity = time.time()
        except Exception:
            self.connected = False

    def _connect(self):
        """Connect the client and record the outcome in ``self.connected``."""
        self.connected = self.client.connect()
        if self.connected:
            # A fresh connection needs no keepalive probe.
            self.last_activity = time.time()
        return self.connected

    def read(self, address, count):
        """Read holding registers over the persistent connection.

        Connects on first use, and reconnects once if the keepalive probe
        finds the link dead.

        Returns:
            The client's response object, or None if no connection could be
            established.

        Raises:
            Exception: anything raised by the read itself is re-raised after
                the connection has been marked as lost.
        """
        if not self.connected and not self._connect():
            return None

        self._keepalive()
        # The probe failed: try once to re-establish the link before reading.
        if not self.connected and not self._connect():
            return None

        try:
            result = self.client.read_holding_registers(address, count=count)
        except Exception:
            # Force a reconnect on the next call.
            self.connected = False
            raise
        self.last_activity = time.time()
        return result

Batch Operations

def batch_read(client, operations):
    """Run a series of holding-register reads and collect per-read results.

    Args:
        client: connected client exposing ``read_holding_registers``.
        operations: iterable of ``(name, address, count)`` tuples.

    Returns:
        Dict mapping each name to ``{'data', 'error', 'time'}``. ``data``
        holds the registers (None on an error response), ``error`` holds the
        stringified error response (None on success), and ``time`` is the
        elapsed seconds for that read.
    """
    import time

    results = {}

    for name, address, count in operations:
        # perf_counter is monotonic, so the interval cannot go negative when
        # the wall clock is adjusted.
        start = time.perf_counter()
        result = client.read_holding_registers(address, count=count)
        elapsed = time.perf_counter() - start

        failed = result.isError()
        results[name] = {
            'data': None if failed else result.registers,
            'error': str(result) if failed else None,
            'time': elapsed
        }

    return results

# Define batch operations as (name, start address, register count)
operations = [
    ('temperature', 100, 1),
    ('pressure', 101, 1),
    ('flow_rate', 102, 2),
    ('status', 200, 1),
]

# Execute batch
with modbus_client('192.168.1.100') as client:
    results = batch_read(client, operations)

    for name, data in results.items():
        # Decide on the error field: an empty register list is falsy but is
        # still a successful read.
        if data['error'] is None:
            print(f"{name}: {data['data']} ({data['time']:.3f}s)")
        else:
            print(f"{name}: ERROR - {data['error']}")

Network Diagnostics

import ping3

def diagnose_connection(host, port=502):
    """Check reachability of a Modbus TCP device step by step.

    Runs an ICMP ping, then a raw TCP connect to ``port``, then a Modbus
    connect and a one-register read, printing the outcome of each step.

    Returns:
        True if the Modbus connection was established (even if the test read
        came back as an error response); False if any earlier step failed.
    """
    import socket

    print(f"Diagnosing {host}:{port}")

    # Check if host is reachable.
    # ping3.ping returns None on timeout and False on errors such as an
    # unresolvable host; both mean the ping failed.
    ping_time = ping3.ping(host)
    if ping_time is None or ping_time is False:
        print("✗ Host unreachable")
        return False
    print(f"✓ Ping: {ping_time*1000:.1f}ms")

    # Check if port is open; the with-block closes the socket on every path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(3)
        result = sock.connect_ex((host, port))

    if result != 0:
        print(f"✗ Port {port} closed")
        return False
    print(f"✓ Port {port} open")

    # Try Modbus connection
    client = ModbusTcpClient(host, port=port, timeout=3)
    try:
        if not client.connect():
            print("✗ Modbus connection failed")
            return False
        print("✓ Modbus connection successful")

        # Try reading
        result = client.read_holding_registers(0, count=1)
        if not result.isError():
            print("✓ Modbus communication working")
        else:
            print(f"⚠ Modbus error: {result}")

        return True
    finally:
        # Release the socket even if the test read raised.
        client.close()

# Run the step-by-step diagnostics against the device on the default Modbus port
diagnose_connection(host='192.168.1.100', port=502)

Most industrial networks use static IPs. Always document IP addresses and port numbers.

Error Recovery

class RobustModbusClient:
    """Client whose reads never raise: failures are printed and yield None.

    The connection is created lazily and dropped after ``max_errors``
    consecutive error responses or after any exception.
    """

    def __init__(self, host, port=502):
        self.host = host
        self.port = port
        self.client = None
        self.error_count = 0  # consecutive error responses
        self.max_errors = 3

    def read_safe(self, address, count, slave=1):
        """Read holding registers, recovering from every failure.

        Args:
            address: starting register address.
            count: number of registers to read.
            slave: unit identifier of the target device.

        Returns:
            The list of registers, or None on an error response, a failed
            connection, or any exception.
        """
        try:
            # Ensure connected
            if self.client is None:
                self.client = ModbusTcpClient(self.host, port=self.port)
                if not self.client.connect():
                    raise ConnectionError("Cannot connect")

            # Read data (keyword arguments work across PyModbus 3.x releases)
            result = self.client.read_holding_registers(
                address, count=count, slave=slave
            )

            # Check result
            if result.isError():
                self.error_count += 1
                if self.error_count >= self.max_errors:
                    # Too many errors, reset connection
                    print(f"Error: Too many errors: {result}")
                    self.reset()
                return None

            # Success, reset error count
            self.error_count = 0
            return result.registers

        except Exception as e:
            print(f"Error: {e}")
            self.reset()
            return None

    def reset(self):
        """Close and discard the connection and clear the error counter."""
        if self.client is not None:
            try:
                self.client.close()
            except Exception as e:
                # Best effort: a failing close must not block recovery.
                print(f"Error closing connection: {e}")
            self.client = None
        self.error_count = 0

Next Steps