Skip to content

get_cipher_info

certmonitor.core.CertMonitor.get_cipher_info

get_cipher_info() -> Dict[str, Any]

Retrieve and structure the cipher information of the SSL/TLS connection.

Source code in certmonitor/core.py
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
def get_cipher_info(self) -> Dict[str, Any]:
    """Build a structured description of the connection's cipher.

    Returns:
        Dict[str, Any]: One of

        - the error dictionary returned by ``_fetch_raw_cipher`` (passed
          through untouched),
        - the error handler's ``CipherInfoError`` result when the raw value is
          not a 3-tuple, or
        - a dictionary with ``cipher_suite``, ``protocol_version`` and
          ``key_bit_length`` entries.
    """
    raw = self._fetch_raw_cipher()

    # An error reported by the fetch step is handed straight back.
    if isinstance(raw, dict) and "error" in raw:
        return raw

    # Only a 3-tuple can be unpacked below; anything else is reported.
    is_triple = isinstance(raw, tuple) and len(raw) == 3
    if not is_triple:
        return self.error_handler.handle_error(
            "CipherInfoError", "Unexpected cipher info format", self.host, self.port
        )

    suite_name, tls_version, bits = raw
    parts: Dict[str, str] = parse_cipher_suite(suite_name)

    suite: Dict[str, Any] = {
        "name": suite_name,
        "encryption_algorithm": parts["encryption"],
        "message_authentication_code": parts["mac"],
    }
    # For TLS 1.3 a fixed note is reported instead of the parsed key exchange.
    suite["key_exchange_algorithm"] = (
        "Not applicable (TLS 1.3 uses ephemeral key exchange by default)"
        if tls_version == "TLSv1.3"
        else parts["key_exchange"]
    )

    return {
        "cipher_suite": suite,
        "protocol_version": tls_version,
        "key_bit_length": bits,
    }

API Reference: CertMonitor

certmonitor.core.CertMonitor

CertMonitor(host: str, port: int = 443, enabled_validators: Optional[List[str]] = None)

Class for monitoring and retrieving certificate details from a given host.

Initialize the CertMonitor with the specified host and port.

Source code in certmonitor/core.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
def __init__(
    self,
    host: str,
    port: int = 443,
    enabled_validators: Optional[List[str]] = None,
):
    """Store the target and initialise every cached field to its empty state.

    Args:
        host: Target host; also passed to ``_is_ip_address`` to set ``is_ip``.
        port: Target port (defaults to 443).
        enabled_validators: Validator names enabled for this instance. When
            ``None``, ``config.ENABLED_VALIDATORS`` is used instead.
    """
    # Target endpoint.
    self.host = host
    self.port = port
    self.is_ip = self._is_ip_address(host)

    # Certificate and public-key material; nothing is known yet.
    self.der = None
    self.pem = None
    self.cert_info = None
    self.cert_data: Dict[str, Any] = {}
    self.public_key_der = None
    self.public_key_pem = None

    # Validator registry plus the per-instance selection.
    self.validators = VALIDATORS
    if enabled_validators is None:
        self.enabled_validators = config.ENABLED_VALIDATORS
    else:
        self.enabled_validators = enabled_validators

    # Connection state.
    self.error_handler = ErrorHandler()
    self.handler: Optional[BaseProtocolHandler] = None
    self.protocol: Optional[str] = None
    self.connected = False

cert_data instance-attribute

cert_data: Dict[str, Any] = {}

cert_info instance-attribute

cert_info = None

connected instance-attribute

connected = False

der instance-attribute

der = None

enabled_validators instance-attribute

enabled_validators = enabled_validators if enabled_validators is not None else ENABLED_VALIDATORS

error_handler instance-attribute

error_handler = ErrorHandler()

handler instance-attribute

handler: Optional[BaseProtocolHandler] = None

host instance-attribute

host = host

is_ip instance-attribute

is_ip = _is_ip_address(host)

pem instance-attribute

pem = None

port instance-attribute

port = port

protocol instance-attribute

protocol: Optional[str] = None

public_key_der instance-attribute

public_key_der = None

public_key_pem instance-attribute

public_key_pem = None

validators instance-attribute

validators = VALIDATORS

__enter__

__enter__() -> CertMonitor

Enter the runtime context related to this object.

Source code in certmonitor/core.py
51
52
53
54
def __enter__(self) -> "CertMonitor":
    """Enter the runtime context: call ``connect()`` and return this instance.

    The value returned by ``connect()`` is discarded here, so a failed
    connection attempt is not surfaced by the ``with`` statement itself.
    """
    self.connect()
    return self

__exit__

__exit__(exc_type: Any, exc_value: Any, traceback: Any) -> None

Exit the runtime context related to this object.

Source code in certmonitor/core.py
56
57
58
def __exit__(self, exc_type: Any, exc_value: Any, traceback: Any) -> None:
    """Exit the runtime context by calling ``close()``.

    The exception details are ignored and ``None`` is returned, so an
    exception raised inside the ``with`` block is not suppressed.
    """
    self.close()

close

close() -> None

Close the connection and reset the handler.

Source code in certmonitor/core.py
 96
 97
 98
 99
100
def close(self) -> None:
    """Close the connection and reset the connection state.

    Closes the current handler (if any), then clears ``self.handler`` and sets
    ``self.connected`` back to ``False``.

    ``connect()`` returns early while ``self.connected`` is true, so the flag
    must be cleared here; otherwise a later ``connect()`` would be skipped
    even though no handler is attached any more.

    Any exception raised by the handler's ``close()`` still propagates, but
    the state is reset first so the instance is never left pointing at a
    half-closed handler.
    """
    handler = self.handler
    try:
        if handler:
            handler.close()
    finally:
        self.handler = None
        self.connected = False

connect

connect() -> Optional[Dict[str, Any]]

Establishes a connection to the host if not already connected.

Source code in certmonitor/core.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
def connect(self) -> Optional[Dict[str, Any]]:
    """Open a connection to the host unless one is already marked as open.

    Returns:
        ``None`` when the instance is already connected or the connection
        succeeds; otherwise the error dictionary from protocol detection, from
        the error handler (unsupported protocol) or from the handler's own
        ``connect()``.
    """
    if self.connected:
        logging.debug("Already connected, skipping connection attempt")
        return None

    detected = self.detect_protocol()
    if isinstance(detected, dict) and "error" in detected:
        return detected

    # Past the error check the detection result is the protocol name.
    self.protocol = cast(str, detected)

    if self.protocol not in ("ssl", "ssh"):
        unsupported = self.error_handler.handle_error(
            "ProtocolError",
            f"Unsupported protocol: {self.protocol}",
            self.host,
            self.port,
        )
        return cast(Dict[str, Any], unsupported)

    handler_cls = SSLHandler if self.protocol == "ssl" else SSHHandler
    self.handler = handler_cls(self.host, self.port, self.error_handler)

    # The handler signals failure by returning something other than None.
    failure = self.handler.connect()
    if failure is not None:
        return failure

    self.connected = True
    logging.debug(f"Successfully connected to {self.host}:{self.port}")
    return None

describe_validators

describe_validators() -> Dict[str, Dict[str, Any]]

Describe every registered validator and the user args it accepts.

Reads each validator's cached _user_params (built by BaseCertValidator.__init_subclass__ / BaseCipherValidator.__init_subclass__ at class definition time) and renders a serializable description suitable for printing, logging, or feeding into a CLI --help page.

Returns:

Name Type Description
dict Dict[str, Dict[str, Any]]

Keyed by validator name. Each value contains:

  • validator_type: "cert" or "cipher".
  • doc: the validator class docstring (first line).
  • args: dict keyed by user arg name, each with annotation (string), default (the literal default value), and required (always False — every user arg must declare a default).
Example
with CertMonitor("example.com") as monitor:
    for name, info in monitor.describe_validators().items():
        print(name, info["args"])
Source code in certmonitor/core.py
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
def describe_validators(self) -> Dict[str, Dict[str, Any]]:
    """Summarise each registered validator and the user arguments it takes.

    The information comes from each validator's cached ``_user_params``
    mapping (a missing or empty mapping yields no args) and from the
    docstring of the validator's class. The result is meant for printing,
    logging, or generating a CLI ``--help`` page.

    Returns:
        dict: Keyed by validator name. Each value contains:

            - ``validator_type``: the validator's ``validator_type``
              attribute, ``"cert"`` when it has none.
            - ``doc``: first line of the validator class docstring, or an
              empty string.
            - ``args``: dict keyed by user arg name, each with ``annotation``
              (string), ``default`` (the parameter's default value) and
              ``required`` (always ``False``).

    Example:
        ```python
        with CertMonitor("example.com") as monitor:
            for name, info in monitor.describe_validators().items():
                print(name, info["args"])
        ```
    """
    import inspect

    def _render(annotation: Any) -> str:
        # ``str()`` handles both plain classes and parameterized generics;
        # plain classes come out as ``<class 'X'>`` and are unwrapped to ``X``.
        text = str(annotation)
        if text.startswith("<class '") and text.endswith("'>"):
            text = text[len("<class '") : -len("'>")]
        return text.replace("typing.", "")

    catalogue: Dict[str, Dict[str, Any]] = {}
    for key, validator in self.validators.items():
        params = getattr(validator, "_user_params", {}) or {}
        arg_table: Dict[str, Dict[str, Any]] = {
            arg_name: {
                "annotation": _render(spec.annotation),
                "default": spec.default,
                "required": False,
            }
            for arg_name, spec in params.items()
        }

        docstring = inspect.getdoc(validator.__class__) or ""
        summary = docstring.splitlines()[0] if docstring else ""
        catalogue[key] = {
            "validator_type": getattr(validator, "validator_type", "cert"),
            "doc": summary,
            "args": arg_table,
        }
    return catalogue

detect_protocol

detect_protocol() -> Union[str, Dict[str, Any]]

Detect the protocol used by the host.

Source code in certmonitor/core.py
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
def detect_protocol(self) -> Union[str, Dict[str, Any]]:
    """Detect the protocol used by the host.

    Opens a TCP connection (10 second connect timeout) and peeks at up to
    four bytes without consuming them and without waiting for them:

    * data starting with ``b"SSH-"`` -> ``"ssh"``
    * first byte 22, 128 or 160 -> ``"ssl"``
    * nothing readable yet (the non-blocking ``recv`` raises) -> ``"ssl"``
    * empty read, i.e. the peer already closed -> ``ConnectionError`` result
    * any other data -> ``ProtocolDetectionError`` result

    Returns:
        ``"ssh"`` or ``"ssl"`` on success, otherwise the value produced by
        ``self.error_handler.handle_error``. Any exception raised while
        connecting or probing is converted into a ``ConnectionError`` result.
    """
    try:
        with socket.create_connection((self.host, self.port), timeout=10) as sock:
            sock.setblocking(False)
            try:
                data = sock.recv(4, socket.MSG_PEEK)
            except socket.error:
                # If no data is received, assume it's SSL
                # NOTE(review): the peek does not wait, so a server whose
                # banner has not arrived yet is also reported as "ssl" --
                # confirm this is acceptable for SSH targets.
                return "ssl"
            finally:
                sock.setblocking(True)

            if not data:
                # An empty read means the peer closed the connection. Report
                # that explicitly instead of letting ``data[0]`` raise an
                # IndexError that surfaces as "index out of range".
                return cast(
                    Dict[str, Any],
                    self.error_handler.handle_error(
                        "ConnectionError",
                        "Connection closed by remote host before any data was received",
                        self.host,
                        self.port,
                    ),
                )

            if data.startswith(b"SSH-"):
                return "ssh"
            if data[0] in (22, 128, 160):  # Common first bytes for SSL/TLS
                return "ssl"
            return cast(
                Dict[str, Any],
                self.error_handler.handle_error(
                    "ProtocolDetectionError",
                    f"Unable to determine protocol. First bytes: {data.hex()}",
                    self.host,
                    self.port,
                ),
            )
    except Exception as e:
        return cast(
            Dict[str, Any],
            self.error_handler.handle_error(
                "ConnectionError", str(e), self.host, self.port
            ),
        )

get_cert_info

get_cert_info() -> Dict[str, Any]

Retrieves and structures the certificate details.

Source code in certmonitor/core.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
def get_cert_info(self) -> Dict[str, Any]:
    """Retrieves and structures the certificate details."""
    if not self.cert_info:
        try:
            connection_result = self._ensure_connection()
            if connection_result is not None:  # Connection failed
                return connection_result

            cert_data = self._fetch_raw_cert()

            if isinstance(cert_data, dict) and "error" in cert_data:
                logging.error(f"Error in fetching raw certificate: {cert_data}")
                return cert_data

            # The _fetch_raw_cert already sets self.cert_data, self.public_key_der, self.public_key_pem
            # We just need to structure the cert_info part
            self.cert_info = self._to_structured_dict(cert_data["cert_info"])
            # Update the cert_data with the structured version
            if not hasattr(self, "cert_data") or not self.cert_data:
                self.cert_data = {}
            self.cert_data["cert_info"] = self.cert_info
            logging.debug("Certificate info retrieved and structured")
        except Exception as e:
            logging.error(f"Error while getting certificate info: {e}")
            return cast(
                Dict[str, Any],
                self.error_handler.handle_error(
                    "UnknownError", str(e), self.host, self.port
                ),
            )

    return self.cert_info if self.cert_info is not None else {}

get_cipher_info

get_cipher_info() -> Dict[str, Any]

Retrieve and structure the cipher information of the SSL/TLS connection.

Source code in certmonitor/core.py
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
def get_cipher_info(self) -> Dict[str, Any]:
    """Build a structured description of the connection's cipher.

    Returns:
        Dict[str, Any]: One of

        - the error dictionary returned by ``_fetch_raw_cipher`` (passed
          through untouched),
        - the error handler's ``CipherInfoError`` result when the raw value is
          not a 3-tuple, or
        - a dictionary with ``cipher_suite``, ``protocol_version`` and
          ``key_bit_length`` entries.
    """
    raw = self._fetch_raw_cipher()

    # An error reported by the fetch step is handed straight back.
    if isinstance(raw, dict) and "error" in raw:
        return raw

    # Only a 3-tuple can be unpacked below; anything else is reported.
    is_triple = isinstance(raw, tuple) and len(raw) == 3
    if not is_triple:
        return self.error_handler.handle_error(
            "CipherInfoError", "Unexpected cipher info format", self.host, self.port
        )

    suite_name, tls_version, bits = raw
    parts: Dict[str, str] = parse_cipher_suite(suite_name)

    suite: Dict[str, Any] = {
        "name": suite_name,
        "encryption_algorithm": parts["encryption"],
        "message_authentication_code": parts["mac"],
    }
    # For TLS 1.3 a fixed note is reported instead of the parsed key exchange.
    suite["key_exchange_algorithm"] = (
        "Not applicable (TLS 1.3 uses ephemeral key exchange by default)"
        if tls_version == "TLSv1.3"
        else parts["key_exchange"]
    )

    return {
        "cipher_suite": suite,
        "protocol_version": tls_version,
        "key_bit_length": bits,
    }

get_enabled_validators

get_enabled_validators() -> List[str]

Get the list of validators enabled for this CertMonitor instance.

Returns:

Type Description
List[str]

List[str]: A list of enabled validator names for this instance.

Source code in certmonitor/core.py
710
711
712
713
714
715
716
717
718
719
def get_enabled_validators(self) -> List[str]:
    """
    Report which validators are enabled on this CertMonitor instance.

    Returns:
        List[str]: A copy of the instance's enabled validator names, so
        changes made by the caller do not affect the instance.
    """
    snapshot = self.enabled_validators.copy()
    return snapshot

get_public_key_der

get_public_key_der() -> Union[bytes, Dict[str, Any], None]

Return the public key in DER format.

Source code in certmonitor/core.py
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
def get_public_key_der(self) -> Union[bytes, Dict[str, Any], None]:
    """Return the public key in DER form, fetching the certificate if needed.

    Returns:
        The cached ``self.public_key_der`` value, or an error dictionary when
        the protocol is not ``"ssl"``, the connection cannot be ensured, or
        the certificate fetch reports an error.
    """
    if self.protocol != "ssl":
        return self.error_handler.handle_error(
            "ProtocolError",
            "Public key extraction is only available for SSL/TLS connections",
            self.host,
            self.port,
        )

    failure = self._ensure_connection()
    if failure is not None:  # Connection failed
        return failure

    if self.public_key_der is not None:
        return self.public_key_der

    # Fetching the certificate is relied on to populate the public key
    # attributes as a side effect.
    fetched = self._fetch_raw_cert()
    if isinstance(fetched, dict) and "error" in fetched:
        return fetched

    return self.public_key_der

get_public_key_pem

get_public_key_pem() -> Union[str, Dict[str, Any], None]

Return the public key in PEM format.

Source code in certmonitor/core.py
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
def get_public_key_pem(self) -> Union[str, Dict[str, Any], None]:
    """Return the public key in PEM form, fetching the certificate if needed.

    Returns:
        The cached ``self.public_key_pem`` value, or an error dictionary when
        the protocol is not ``"ssl"``, the connection cannot be ensured, or
        the certificate fetch reports an error.
    """
    if self.protocol != "ssl":
        return self.error_handler.handle_error(
            "ProtocolError",
            "Public key extraction is only available for SSL/TLS connections",
            self.host,
            self.port,
        )

    failure = self._ensure_connection()
    if failure is not None:  # Connection failed
        return failure

    if self.public_key_pem is not None:
        return self.public_key_pem

    # Fetching the certificate is relied on to populate the public key
    # attributes as a side effect.
    fetched = self._fetch_raw_cert()
    if isinstance(fetched, dict) and "error" in fetched:
        return fetched

    return self.public_key_pem

get_raw_der

get_raw_der() -> Union[bytes, Dict[str, Any]]

Return the raw DER format of the certificate.

Source code in certmonitor/core.py
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
def get_raw_der(self) -> Union[bytes, Dict[str, Any]]:
    """Return the certificate in raw DER form, fetching it on first use.

    Returns:
        The DER bytes (``b""`` when the handler's data has no ``"der"``
        entry), or an error dictionary when the protocol is not ``"ssl"``,
        the connection cannot be ensured, no handler is attached, or the
        handler's fetch reports an error.
    """
    if self.protocol != "ssl":
        wrong_protocol = self.error_handler.handle_error(
            "ProtocolError",
            "DER format is only available for SSL/TLS connections",
            self.host,
            self.port,
        )
        return cast(Dict[str, Any], wrong_protocol)

    failure = self._ensure_connection()
    if failure is not None:  # Connection failed
        return failure

    if self.der is None:
        handler = self.handler
        if handler is None:
            no_handler = self.error_handler.handle_error(
                "ConnectionError",
                "Handler is not initialized",
                self.host,
                self.port,
            )
            return cast(Dict[str, Any], no_handler)

        fetched = handler.fetch_raw_cert()
        if isinstance(fetched, dict) and "error" in fetched:
            return fetched
        self.der = fetched.get("der")

    # Fall back to empty bytes when no DER data is available.
    return b"" if self.der is None else self.der

get_raw_pem

get_raw_pem() -> Union[str, Dict[str, Any]]

Return the raw PEM format of the certificate.

Source code in certmonitor/core.py
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def get_raw_pem(self) -> Union[str, Dict[str, Any]]:
    """Return the certificate in raw PEM form, fetching it on first use.

    Returns:
        The PEM text (``""`` when the handler's data has no ``"pem"`` entry),
        or an error dictionary when the protocol is not ``"ssl"``, the
        connection cannot be ensured, no handler is attached, or the
        handler's fetch reports an error.
    """
    if self.protocol != "ssl":
        wrong_protocol = self.error_handler.handle_error(
            "ProtocolError",
            "PEM format is only available for SSL/TLS connections",
            self.host,
            self.port,
        )
        return cast(Dict[str, Any], wrong_protocol)

    failure = self._ensure_connection()
    if failure is not None:  # Connection failed
        return failure

    if self.pem is None:
        handler = self.handler
        if handler is None:
            no_handler = self.error_handler.handle_error(
                "ConnectionError",
                "Handler is not initialized",
                self.host,
                self.port,
            )
            return cast(Dict[str, Any], no_handler)

        fetched = handler.fetch_raw_cert()
        if isinstance(fetched, dict) and "error" in fetched:
            return fetched
        self.pem = fetched.get("pem")

    # Fall back to an empty string when no PEM data is available.
    return "" if self.pem is None else self.pem

list_validators

list_validators() -> List[str]

Get a list of all available validators that can be used.

Returns:

Type Description
List[str]

List[str]: A list of all registered validator names.

Source code in certmonitor/core.py
721
722
723
724
725
726
727
728
729
730
def list_validators(self) -> List[str]:
    """
    Report the names of all validators available for use.

    Returns:
        List[str]: The value returned by the ``validators`` package's
        ``list_validators`` function.
    """
    # Imported when the method is called rather than at module import time.
    from .validators import list_validators as registry_names

    names = registry_names()
    return names

validate

validate(validator_args: Optional[Dict[str, Any]] = None) -> Dict[str, Any]

Validates the target host by running all enabled validators.

This method:

1. Checks if all requested validators are implemented.
2. Separates validators into cert-based and cipher-based groups.
3. Fetches cert_info and cipher_info as needed.
4. Runs each validator with the appropriate arguments.
5. Returns a dictionary of validation results.

Parameters:

Name Type Description Default
validator_args dict

Additional arguments for specific validators. Example: { "subject_alt_names": ["example.com", "test.com"] }

None

Returns:

Name Type Description
dict Dict[str, Any]

A dictionary keyed by validator name, each value being the result of that validator.

Example

results = monitor.validate()
print(results["expiration"])   # Output for expiration validator
print(results["weak_cipher"])  # Output for weak cipher validator

Source code in certmonitor/core.py
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
def validate(
    self, validator_args: Optional[Dict[str, Any]] = None
) -> Dict[str, Any]:
    """
    Validates the target host by running all enabled validators.

    This method:
    1. Marks every enabled validator that is not registered as failed.
    2. Separates the remaining enabled validators into cert-based and
       cipher-based groups by their ``validator_type`` (``"cert"`` when the
       attribute is missing).
    3. Reads the certificate data cached on the instance (``self.cert_data``)
       and obtains cipher info from ``get_cipher_info()``, each only when a
       validator of that group is enabled.
    4. Runs each validator through ``_invoke_validator``.
    5. Returns a dictionary of validation results.

    Every enabled validator of type ``"cert"`` or ``"cipher"`` gets an entry.
    When the data a group needs is missing or is an error dictionary, each
    validator in that group is reported with ``is_valid`` set to ``False``
    and a ``reason``, so a retrieval failure is never read as a pass.

    Args:
        validator_args (dict, optional): Additional arguments for specific validators.
            Example:
            {
                "subject_alt_names": ["example.com", "test.com"]
            }

    Returns:
        dict: A dictionary keyed by validator name, each value being the result of that validator.

    Example:
        results = monitor.validate()
        print(results["expiration"])      # Output for expiration validator
        print(results["weak_cipher"])     # Output for weak cipher validator
    """
    results: Dict[str, Any] = {}

    # Check for unknown validators
    for requested_validator in self.enabled_validators:
        if requested_validator not in self.validators:
            results[requested_validator] = {
                "is_valid": False,
                "reason": f"Validator '{requested_validator}' is not implemented.",
            }

    # Registered validators that are enabled, in registry order. Their names
    # cannot collide with the unknown entries recorded above, which are by
    # definition absent from the registry.
    enabled = [
        validator
        for name, validator in self.validators.items()
        if name in self.enabled_validators
    ]
    cert_validators = [
        validator
        for validator in enabled
        if getattr(validator, "validator_type", "cert") == "cert"
    ]
    cipher_validators = [
        validator
        for validator in enabled
        if getattr(validator, "validator_type", "cert") == "cipher"
    ]

    # Certificate-based validations
    if cert_validators:
        cert_data = getattr(self, "cert_data", None)
        cert_failed = isinstance(cert_data, dict) and "error" in cert_data
        if not cert_data or cert_failed:
            error_reason = (
                cert_data["error"]
                if cert_failed
                else "Certificate data is missing due to a connection or retrieval error."
            )
            for validator in cert_validators:
                results[validator.name] = {
                    "is_valid": False,
                    "reason": f"Certificate-based validation could not be performed: {error_reason}",
                }
        else:
            for validator in cert_validators:
                results[validator.name] = self._invoke_validator(
                    validator, (cert_data, self.host, self.port), validator_args
                )

    # Cipher-based validations
    if cipher_validators:
        cipher_info = self.get_cipher_info()
        if isinstance(cipher_info, dict) and "error" in cipher_info:
            logging.error(
                "Skipping cipher-based validations due to cipher info retrieval error."
            )
            # Report the failure per validator, mirroring the certificate
            # branch, instead of silently omitting the requested validators.
            for validator in cipher_validators:
                results[validator.name] = {
                    "is_valid": False,
                    "reason": f"Cipher-based validation could not be performed: {cipher_info['error']}",
                }
        else:
            for validator in cipher_validators:
                results[validator.name] = self._invoke_validator(
                    validator, (cipher_info, self.host, self.port), validator_args
                )

    return results