#!/usr/bin/env python3
"""Print a table of Docker volumes with their disk usage and the containers using them.

The script talks to the Docker Engine REST API over the local Unix socket and
measures each volume's mountpoint with ``du -hs``.
"""
import http.client
import json
import os
import socket
import subprocess
import sys
import urllib.parse

DOCKER_SOCKET_PATHS = [
    '/var/run/docker.sock',  # Default path
    '$HOME/.colima/default/docker.sock',  # Colima
]

# Placeholder printed when a value (size, container list) is unavailable.
PLACEHOLDER = '---'

# Width of the volume-name column; longer names are truncated to fit.
MAX_VOLUME_NAME_LENGTH = 30

# Cache for the socket path so the filesystem is probed only once per run.
_docker_socket = None


def find_docker_socket():
    """Return the path of the first existing Docker Unix socket.

    Each entry of ``DOCKER_SOCKET_PATHS`` is expanded with
    ``os.path.expandvars`` and checked in order. The first hit is cached in
    the module-level ``_docker_socket`` and returned on every later call.

    Raises:
        FileNotFoundError: if none of the candidate paths exists.
    """
    global _docker_socket
    if _docker_socket:
        return _docker_socket
    for path in DOCKER_SOCKET_PATHS:
        expanded_path = os.path.expandvars(path)
        if os.path.exists(expanded_path):
            _docker_socket = expanded_path
            return _docker_socket
    raise FileNotFoundError("Docker Unix socket not found")


def shorten_volume_name(volume_name):
    """Return ``volume_name`` truncated to at most 30 characters.

    Names longer than 30 characters are cut to 27 characters followed by
    ``'...'``; shorter names are returned unchanged.
    """
    if len(volume_name) > MAX_VOLUME_NAME_LENGTH:
        return volume_name[:MAX_VOLUME_NAME_LENGTH - 3] + '...'
    return volume_name


def get_volume_size(mountpoint):
    """Return the human-readable size of ``mountpoint`` as printed by ``du -hs``.

    This is deliberately best-effort: any failure (missing or empty
    mountpoint, ``du`` not runnable, non-zero exit status, empty output)
    yields ``"---"`` instead of an exception so that one unreadable volume
    does not abort the whole report.
    """
    if not mountpoint:
        return PLACEHOLDER
    try:
        result = subprocess.run(
            ['du', '-hs', mountpoint],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            text=True,
        )
    except (OSError, ValueError, subprocess.SubprocessError):
        # OSError: 'du' missing or not executable; ValueError: invalid
        # argument (e.g. embedded NUL) or undecodable output.
        return PLACEHOLDER
    if result.returncode != 0:
        return PLACEHOLDER
    # The output looks like '1.4G\t/path/to/volume'; keep only the size.
    fields = result.stdout.split()
    return fields[0] if fields else PLACEHOLDER


def docker_api_request(endpoint):
    """Send a GET request to the Docker REST API over the Unix socket.

    Args:
        endpoint: request path including any query string, e.g. ``"/volumes"``.

    Returns:
        The decoded JSON body on HTTP 200, otherwise ``None`` (after printing
        ``Error: <status> - <reason>`` to stderr).

    Raises:
        FileNotFoundError: if no Docker socket can be located.
        OSError: if connecting to the socket fails.
    """
    socket_path = find_docker_socket()
    conn = http.client.HTTPConnection("localhost")
    sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        sock.connect(socket_path)
        conn.sock = sock  # Use the Unix socket instead of opening a TCP one
        conn.request("GET", endpoint)
        response = conn.getresponse()
        if response.status != 200:
            print(f"Error: {response.status} - {response.reason}", file=sys.stderr)
            return None
        return json.loads(response.read())
    finally:
        # Release the connection on every path, including errors.
        conn.close()
        sock.close()


def containers_endpoint_for_volume(volume_name):
    """Return the API path listing all containers that use ``volume_name``.

    The ``filters`` parameter is a JSON object; it is serialized with
    ``json.dumps`` and percent-encoded so that any character in the volume
    name is transmitted safely.
    """
    filters = json.dumps({"volume": [volume_name]}, separators=(",", ":"))
    return "/containers/json?all=1&filters=" + urllib.parse.quote(filters, safe="")


def format_container_names(containers):
    """Return a comma-separated list of container names, or ``'---'`` if none.

    The first entry of each container's ``Names`` list is used with leading
    slashes removed. A container without any name falls back to the first
    12 characters of its ``Id``.
    """
    if not containers:
        return PLACEHOLDER
    labels = []
    for container in containers:
        names = container.get('Names') or []
        if names:
            labels.append(names[0].lstrip('/'))
        else:
            labels.append((container.get('Id') or '?')[:12])
    return ', '.join(labels)


def main():
    """Print one table row per Docker volume: name, size and using containers."""
    # Print the header
    print(f"{'Docker Volume Name':<30} | {'Size':<10} | {'Container'}")
    print('-' * 60)

    # Get the list of volumes
    volumes_data = docker_api_request("/volumes")
    if not volumes_data:
        return

    # 'Volumes' may be present but null, so fall back to an empty list.
    volumes = volumes_data.get('Volumes') or []
    for volume in sorted(volumes, key=lambda item: item['Name']):
        volume_name = volume['Name']
        volume_size = get_volume_size(volume.get('Mountpoint'))

        # Get containers (running or not) using the volume
        containers = docker_api_request(containers_endpoint_for_volume(volume_name))
        container_names = format_container_names(containers)

        # Print the volume information
        short_volume_name = shorten_volume_name(volume_name)
        print(f"{short_volume_name:<30} | {volume_size:<10} | {container_names}")


if __name__ == "__main__":
    main()