"""
About: ComNetsEmu Network Module.
"""

import http.server
import json
import os
import os.path
import shutil
import threading
from functools import partial
from time import sleep

import docker

from comnetsemu.cli import spawnXtermDocker
from comnetsemu.exceptions import InvalidDockerArgs
from comnetsemu.node import APPContainer, DockerHost
from mininet.log import debug, error, info
from mininet.net import Mininet
from mininet.term import cleanUpScreens, makeTerms
from mininet.util import BaseString

# ComNetsEmu version: should be consistent with README and LICENSE
VERSION = "0.1.12"

# Directory on the host OS that APPContainerManager creates on start-up,
# offers as a read-write bind mount to application containers and deletes
# again in stop().
# NOTE(review): "MANGER" (sic) is part of both the constant name and the path;
# presumably kept for compatibility with existing users - confirm before
# renaming.
APPCONTAINERMANGER_MOUNTED_DIR = "/tmp/comnetsemu/appcontainermanger"


class Containernet(Mininet):
    """Network emulation with containerized network nodes."""

    def __init__(self, **params):
        """Create a Containernet object with the same parameters provided by
        Mininet.

        :param params: Keyword arguments forwarded unchanged to Mininet.
        """
        # Names of the application containers registered by an
        # APPContainerManager. Initialised before Mininet.__init__ is called.
        # NOTE(review): presumably because the base constructor may already
        # build the network - confirm before reordering.
        self._appcontainers = list()
        Mininet.__init__(self, **params)

    def addDockerHost(self, name: str, **params):  # pragma: no cover
        """Wrapper for addHost method that adds a Docker container as a host.

        :param name: Name of the host.
        :type name: str
        :param params: Additional keyword arguments forwarded to addHost.
        :return: The value returned by addHost.
        """
        return self.addHost(name, cls=DockerHost, **params)

    def startTerms(self):  # pragma: no cover
        "Start a terminal for each node."
        if "DISPLAY" not in os.environ:
            error("Error starting terms: Cannot connect to display\n")
            return
        info("*** Running terms on %s\n" % os.environ["DISPLAY"])
        cleanUpScreens()
        self.terms += makeTerms(self.controllers, "controller")
        self.terms += makeTerms(self.switches, "switch")

        # DockerHosts get their terminal from spawnXtermDocker, every other
        # host goes through Mininet's makeTerms. Partition the hosts with one
        # isinstance test each instead of a quadratic "not in" scan.
        docker_hosts = []
        other_hosts = []
        for host in self.hosts:
            if isinstance(host, DockerHost):
                docker_hosts.append(host)
            else:
                other_hosts.append(host)

        for dhost in docker_hosts:
            self.terms.append(spawnXtermDocker(dhost.name))
        self.terms += makeTerms(other_hosts, "host")

    def addLinkNamedIfce(self, src, dst, *args, **kwargs):  # pragma: no cover
        """Add a link with two named interfaces.

           - Name of interface 1: src-dst
           - Name of interface 2: dst-src

        :param src: Source node object or its name.
        :param dst: Destination node object or its name.
        :param args: Extra positional arguments forwarded to addLink.
        :param kwargs: Extra keyword arguments forwarded to addLink.
        :return: The value returned by addLink (previously discarded).
        """
        # Accept node objects or names
        src = self[src] if isinstance(src, BaseString) else src
        dst = self[dst] if isinstance(dst, BaseString) else dst
        return self.addLink(
            src,
            dst,
            *args,
            intfName1="-".join((src.name, dst.name)),
            intfName2="-".join((dst.name, src.name)),
            **kwargs,
        )


class APPContainerManagerRequestHandler(http.server.BaseHTTPRequestHandler):
    """Basic implementation of a REST API for app containers.

    Python's built-in http server only does basic security checks and this class
    has basic and limited sanity checks on the requests. Designed only for
    teaching.

    Supported requests:

    - ``GET /containers``: JSON list with the names of all containers.
    - ``POST /containers``: create a container from a JSON object.
    - ``DELETE /containers/<name>``: remove the container with that name.

    Every other request, and every malformed one, is answered with 400.
    """

    _container_resource_path = "/containers"

    def __init__(self, appcontainermanager, enable_log=True, *args, **kargs):
        """Bind the handler to a manager and process the request.

        :param appcontainermanager: Manager object used to serve the requests.
        :param enable_log: Forward log messages to the base class if True.
        :type enable_log: bool
        """
        # The base class constructor already handles the request, so the
        # attributes used by the do_* methods must be set before calling it.
        self.mgr = appcontainermanager
        self.enable_log = enable_log
        super().__init__(*args, **kargs)

    def _send_bad_request(self):
        """Answer the current request with an empty 400 response."""
        self.send_response(400)
        self.end_headers()

    def log_message(self, format, *args):
        """Log a message through the base class unless logging is disabled."""
        if not self.enable_log:
            return
        super().log_message(format, *args)  # pragma no cover

    def do_GET(self):
        """Return the names of all containers as a JSON list."""
        if self.path != self._container_resource_path:
            self._send_bad_request()
            return

        # Build the body first so that a failure in the manager does not leave
        # a half-sent 200 response behind.
        body = json.dumps(self.mgr.getAllContainers()).encode("utf-8")
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    @staticmethod
    def _post_sanity_check(post_dict):
        """Check that a decoded POST payload can describe a new container.

        :param post_dict: The decoded JSON payload.
        :return: True if the payload is a dict containing all essential keys.
        :rtype: bool
        """
        # Valid JSON is not necessarily an object: a list holding the key names
        # would pass a plain membership test and then break the ** expansion.
        if not isinstance(post_dict, dict):
            return False
        essential_keys = ("name", "dhost", "dimage", "dcmd", "docker_args")
        return all(key in post_dict for key in essential_keys)

    def do_POST(self):
        """Create a new APP container."""
        if self.path != self._container_resource_path:
            self._send_bad_request()
            return

        try:
            content_length = int(self.headers.get("content-length", 0))
        except ValueError:
            content_length = 0
        # A negative length would make rfile.read() wait for EOF.
        if content_length <= 0:
            self._send_bad_request()
            return

        try:
            post_data = self.rfile.read(content_length).decode("utf-8")
            container_para = json.loads(post_data)
        except ValueError:
            # Covers both UnicodeDecodeError and json.JSONDecodeError.
            self._send_bad_request()
            return

        if not self._post_sanity_check(container_para):
            self._send_bad_request()
            return

        self.mgr.addContainer(**container_para)
        self.send_response(200)
        self.end_headers()

    def _delete_sanity_check(self, container_name):
        """Return True if the manager knows a container with the given name."""
        # Check if container exists.
        return bool(self.mgr.getContainerInstance(container_name, None))

    def do_DELETE(self):
        """Remove the container named by the last component of the path."""
        resource, container_name = os.path.split(self.path)
        if resource != self._container_resource_path:
            self._send_bad_request()
            return
        if not self._delete_sanity_check(container_name):
            self._send_bad_request()
            return

        self.mgr.removeContainer(container_name)
        self.send_response(200)
        self.end_headers()


class APPContainerManager:
    """Manager for application containers (sibling containers) deployed on
    Mininet hosts.

    - To keep it simple, it uses docker-py APIs to manage internal containers
      from the host system.

    - Internal methods (starting with an underscore) should be documented after
      tests and before stable releases.

    Ref:
        [1] https://docker-py.readthedocs.io/en/stable/containers.html
    """

    # Docker arguments that are set by the manager itself and therefore must
    # not be given by the user.
    reserved_docker_args = [
        "init",
        "tty",
        "detach",
        "labels",
        "security_opt",
        "cgroup_parent",
        "network_mode",
    ]

    docker_args_default = {
        "init": True,
        "tty": True,  # -t
        "detach": True,  # -d
        # Used for cleanups
        "labels": {"comnetsemu": "dockercontainer"},
        # Required for CRIU checkpoint
        "security_opt": ["seccomp:unconfined"],
    }

    docker_volumes_default = {
        # Shared directory in host OS
        APPCONTAINERMANGER_MOUNTED_DIR: {
            "bind": APPCONTAINERMANGER_MOUNTED_DIR,
            "mode": "rw",
        }
    }

    # Default delay between tries for Docker API
    retry_delay_secs = 0.1

    def __init__(self, net: Mininet):
        """Init the APPContainerManager.

        :param net: The mininet object, used to manage hosts via Mininet's API.
        :type net: Mininet
        """
        self.net = net
        self.dclt = docker.from_env()

        # Following resources can be shared by main and httpd threads.
        # A simple lock is used.
        self._container_queue_lock = threading.Lock()
        self._container_queue = list()
        # Fast search for added containers.
        self._name_container_map = dict()

        self._http_server_started = False
        self._http_server_thread = None

        os.makedirs(APPCONTAINERMANGER_MOUNTED_DIR, exist_ok=True)

    def _createContainer(self, name, dhost, dimage, dcmd, docker_args):
        """Create a Docker container.

        :param name: Name of the container.
        :param dhost: DockerHost object the container is attached to.
        :param dimage: Name of the docker image.
        :param dcmd: Command to run in the container.
        :param docker_args: User-given docker-py arguments. The dict is not
            modified; "volumes", if given, is expected in dict form.
        :return: The value returned by docker-py's containers.create.
        :raise InvalidDockerArgs: docker_args contains a reserved key.
        """
        # Work on copies. Writing the reserved defaults into the caller's dict
        # would make a second call with the same dict fail the check below.
        docker_args = dict(docker_args)
        # NOTE(review): the default volumes are only applied when the caller
        # passes "volumes" itself; without it the shared directory is not
        # mounted. Kept as is - confirm whether that is intended.
        if "volumes" in docker_args:
            debug(
                f"Update the default volumes {self.docker_volumes_default} to the already given volumes config.\n"
            )
            volumes = dict(docker_args["volumes"])
            volumes.update(self.docker_volumes_default)
            docker_args["volumes"] = volumes
        for key in self.reserved_docker_args:
            if key in docker_args:
                error(
                    f"Given argument: {key} is invalid. This key is reserved by the APPContainerManager for internal usage.\n"
                )
                raise InvalidDockerArgs

        docker_args.update(self.docker_args_default)
        docker_args["name"] = name
        docker_args["image"] = dimage
        docker_args["cgroup_parent"] = "/docker/{}".format(dhost.dins.id)
        docker_args["command"] = dcmd
        # Share the network namespace of the DockerHost's own container.
        docker_args["network_mode"] = "container:{}".format(dhost.dins.id)

        return self.dclt.containers.create(**docker_args)

    def _waitContainerStart(self, name):  # pragma: no cover
        """Wait for container to start up running"""
        dins = self._getDockerIns(name)
        while dins is None:
            debug("Failed to get container:%s\n" % (name))
            sleep(self.retry_delay_secs)
            dins = self._getDockerIns(name)

        while not dins.attrs["State"]["Running"]:
            sleep(self.retry_delay_secs)
            dins.reload()  # refresh information in 'attrs'

    def _waitContainerRemoved(self, name):  # pragma: no cover
        """Wait for container to be removed"""
        while self._getDockerIns(name):
            sleep(self.retry_delay_secs)

    def _getDockerIns(self, name):
        """Get the docker-py container object by name.

        :param name (str): Name of the container
        :return: The object returned by docker-py, or None if Docker does not
            know a container with this name.
        """
        try:
            return self.dclt.containers.get(name)
        except docker.errors.NotFound:
            return None

    def getContainerInstance(self, name: str, default=None) -> APPContainer:
        """Get the APPContainer instance with the given name.

        :param name: The name of the given container.
        :type name: str
        :param default: The default return value if not found.
        :rtype: APPContainer
        """
        # The map is maintained together with the queue, so a lookup replaces
        # the linear scan.
        with self._container_queue_lock:
            return self._name_container_map.get(name, default)

    def getContainersDhost(self, dhost: str) -> list:
        """Get containers deployed on the given DockerHost.

        :param dhost: Name of the DockerHost.
        :type dhost: str
        :return: A list with the names of the containers on the given
            DockerHost.
        :rtype: list
        """
        with self._container_queue_lock:
            return [c.name for c in self._container_queue if c.dhost == dhost]

    def getAllContainers(self) -> list:
        """Get a list of names of all containers in current container queue.

        :rtype: list
        """
        with self._container_queue_lock:
            return [c.name for c in self._container_queue]

    def addContainer(
        self,
        name: str,
        dhost: str,
        dimage: str,
        dcmd: str,
        docker_args: dict = None,
        wait: bool = True,
    ) -> APPContainer:
        """Create and run a new container inside a Mininet DockerHost.

        :param name: Name of the container.
        :type name: str
        :param dhost: Name of the host used for deployment.
        :type dhost: str
        :param dimage: Name of the docker image.
        :type dimage: str
        :param dcmd: Command to run after the creation.
        :type dcmd: str
        :param docker_args: All other keyword arguments supported by Docker-py.
            e.g. CPU and memory related limitations.
            Some parameters are overriden for APPContainerManager's functionalities.
            The given dict is not modified.
        :type docker_args: dict
        :param wait: Wait until the container has the running state if True.
        :type wait: bool

        Check cls.docker_args_default.

        :return: Added APPContainer instance.
        :rtype: APPContainer
        :raise InvalidDockerArgs: docker_args contains a reserved key.
        """
        if not docker_args:
            docker_args = dict()
        dhost = self.net.get(dhost)
        with self._container_queue_lock:
            dins = self._createContainer(name, dhost, dimage, dcmd, docker_args)
            dins.start()
            if wait:
                self._waitContainerStart(name)
            container = APPContainer(name, dhost.name, dimage, dins)
            self._container_queue.append(container)
            self._name_container_map[container.name] = container
            self.net._appcontainers.append(name)
            return container

    def removeContainer(self, name: str, wait: bool = True):
        """Remove the APP container with the given name.

        :param name: Name of the to be removed container.
        :type name: str
        :param wait: Wait until the container is fully removed if True.
        :type wait: bool
        :raise ValueError: No container with the given name is managed.
        """
        with self._container_queue_lock:
            container = self._name_container_map.get(name, None)
            # The container could be already removed by the user via CLI or
            # other approaches, raise the exception to let user handle this
            # situation.
            if not container:
                # Report the requested name; the looked-up value is None here.
                raise ValueError(f"Can not find container with name: {name}")

            container.dins.remove(force=True)
            if wait:
                self._waitContainerRemoved(container.name)
            self._container_queue.remove(container)
            self.net._appcontainers.remove(name)
            del self._name_container_map[name]

    @staticmethod
    def _calculate_cpu_percent(stats):
        """Calculate the CPU usage in percent with given stats JSON data

        :param stats: Decoded stats of one container; the "cpu_stats" and
            "precpu_stats" entries are read.
        :type stats: dict
        :return: CPU usage in percent, limited to 100.
        :rtype: float
        """
        cpu_stats = stats["cpu_stats"]
        precpu_stats = stats["precpu_stats"]

        # "percpu_usage" is not always part of the stats; the Docker Engine
        # API documents "online_cpus" as the alternative CPU count.
        percpu_usage = cpu_stats["cpu_usage"].get("percpu_usage")
        if percpu_usage:
            cpu_count = len(percpu_usage)
        else:
            cpu_count = cpu_stats["online_cpus"]

        cpu_delta = float(cpu_stats["cpu_usage"]["total_usage"]) - float(
            precpu_stats["cpu_usage"]["total_usage"]
        )
        system_delta = float(cpu_stats["system_cpu_usage"]) - float(
            precpu_stats["system_cpu_usage"]
        )

        cpu_percent = 0.0
        if system_delta > 0.0:
            cpu_percent = cpu_delta / system_delta * 100.0 * cpu_count

        return min(cpu_percent, 100.0)

    def monResourceStats(
        self, name: str, sample_num: int = 3, sample_period: float = 1.0
    ) -> list:
        """Monitor the resource stats of a container within the given name.
        This function measure the CPU and memory usages sample_num times and
        sleep for sample_period after each measurement. All measurement results
        are returned as a list.

        :param name: Name of the container
        :type name: str
        :param sample_num: Number of samples.
        :type sample_num: int
        :param sample_period: Sleep period for each sample
        :type sample_period: float

        :return: A list of resource usages. Each item is a tuple (cpu_usg, mem_usg)
            with the CPU usage in percent and the memory usage divided by
            1024 ** 2.
        :rtype: list
        :raise ValueError: container is not found
        """
        container = self._name_container_map.get(name, None)
        if not container:
            # Report the requested name; the looked-up value is None here.
            raise ValueError(f"Can not find container with name: {name}")

        usages = list()
        for _ in range(sample_num):
            stats = container.getCurrentStats()
            mem_usg = stats["memory_stats"]["usage"] / (1024 ** 2)
            cpu_usg = self._calculate_cpu_percent(stats)
            usages.append((cpu_usg, mem_usg))
            sleep(sample_period)

        return usages

    # BUG: Checkpoint inside container breaks the networking of outside
    # container if container networking mode is used.
    # def checkpoint(self, container: str) -> str:
    #     container = self._name_container_map.get(container, None)
    #     if not container:
    #         raise ValueError(f"Can not found container with name: {container}")
    #     ckpath = os.path.join(APPCONTAINERMANGER_MOUNTED_DIR, f"{container.name}")
    #     # MARK: Docker-py does not provide API for checkpoint and restore,
    #     # Docker CLI is directly used with subprocess as a temp workaround.
    #     subprocess.run(split(
    #         f"docker checkpoint create --checkpoint-dir={ckpath} {container.name} {container.name}"
    #     ),
    #                    check=True,
    #                    stdout=subprocess.DEVNULL,
    #                    stderr=subprocess.DEVNULL)

    #     return ckpath

    def _runHTTPServer(self, ip_addr, port, enable_log):
        """Generate HTTPServer with partial parameters and run it forever."""
        # The HTTP server instantiates the handler itself, so the manager and
        # the log switch are bound in advance.
        handler = partial(APPContainerManagerRequestHandler, self, enable_log)
        httpd = http.server.HTTPServer((ip_addr, port), handler)
        info(f"Start REST API server on address: {ip_addr}:{port}.\n")
        httpd.serve_forever()

    def runRESTServerThread(
        self, ip: str, port: int = 8000, enable_log: bool = True
    ) -> None:
        """Run the REST API server in a separate daemon thread.
        threading is used to avoid blocking the main thread in the emulation
        scripts. Since to be performed operation is more IO-bounded and this
        server should have no impact on the concurrency approach of main thread,
        the threading with a Lock is used instead of multi-processing or
        asyncio.

        :param ip: Listening IP address.
        :type ip: str
        :param port: Port number.
        :type port: int
        :param enable_log: Print logs using stdout if True.
        :type enable_log: bool
        """
        self._http_server_started = True
        self._http_server_thread = threading.Thread(
            target=self._runHTTPServer, args=(ip, port, enable_log)
        )
        # It will die if all non-daemon threads (including main) exit.
        self._http_server_thread.daemon = True
        self._http_server_thread.start()

    def stop(self):
        """Stop the APPContainerManager.

        Removes every container that is still managed, closes the Docker
        client and deletes the shared directory. Calling it again, or after
        another manager already deleted the shared directory, does not fail.
        """
        if self._container_queue:
            info(
                "Stop {} containers in the App container queue: {}\n".format(
                    len(self._container_queue),
                    ", ".join((c.name for c in self._container_queue)),
                )
            )

            # Avoid missing delete internal containers manually before stop
            for c in list(self._container_queue):
                c._terminate()
                try:
                    c.dins.remove(force=True)
                except docker.errors.NotFound:
                    # Already removed by the user via CLI or other approaches;
                    # carry on with the remaining containers.
                    debug(f"Container {c.name} is already removed.\n")
                if c.name in self.net._appcontainers:
                    self.net._appcontainers.remove(c.name)

            # Drop the bookkeeping of the containers removed above.
            self._container_queue.clear()
            self._name_container_map.clear()

        self.dclt.close()
        try:
            shutil.rmtree(APPCONTAINERMANGER_MOUNTED_DIR)
        except FileNotFoundError:
            # The directory is shared by all managers and may be gone already.
            pass


class VNFManager(APPContainerManager):
    """APPContainerManager under a VNF-specific name.

    Intended for application containers that run Virtualized Network
    Functions; it adds nothing to APPContainerManager.
    """