Skip to content

Helpers

diamond_miner.format

format_ipv6(addr)

Convert an IPv6 UInt128 to a string.

Examples:

>>> from diamond_miner.format import format_ipv6
>>> format_ipv6(281470816487432)
'::ffff:808:808'

Source code in diamond_miner/format.py
def format_ipv6(addr: int) -> str:
    """
    Render the integer form of an IPv6 address (UInt128) as text.

    Args:
        addr: The IPv6 address as an integer.

    Returns:
        The result of `str()` applied to the corresponding `IPv6Address`.

    Examples:
        >>> from diamond_miner.format import format_ipv6
        >>> format_ipv6(281470816487432)
        '::ffff:808:808'
    """
    address = IPv6Address(addr)
    return str(address)

format_probe(dst_addr_v6, src_port, dst_port, ttl, protocol)

Create a Caracal probe string.

Examples:

>>> from diamond_miner.format import format_probe
>>> format_probe(281470816487432, 24000, 33434, 1, "icmp")
'::ffff:808:808,24000,33434,1,icmp'

Source code in diamond_miner/format.py
def format_probe(
    dst_addr_v6: int, src_port: int, dst_port: int, ttl: int, protocol: str
) -> str:
    """
    Build the comma-separated probe line understood by Caracal.

    The fields are emitted in this order: destination address (formatted with
    `format_ipv6`), source port, destination port, TTL and protocol.

    Args:
        dst_addr_v6: Destination address as an IPv6 integer.
        src_port: Source port.
        dst_port: Destination port.
        ttl: TTL of the probe.
        protocol: Protocol name, written as-is in the last field.

    Examples:
        >>> from diamond_miner.format import format_probe
        >>> format_probe(281470816487432, 24000, 33434, 1, "icmp")
        '::ffff:808:808,24000,33434,1,icmp'
    """
    dst_addr = format_ipv6(dst_addr_v6)
    fields = (dst_addr, src_port, dst_port, ttl, protocol)
    return ",".join(f"{field}" for field in fields)

diamond_miner.insert

insert_mda_probe_counts(client, measurement_id, previous_round, adaptive_eps=False, target_epsilon=DEFAULT_FAILURE_RATE, concurrent_requests=max(available_cpus() // 8, 1))

Run the Diamond-Miner algorithm and insert the resulting probes into the probes table.

Parameters:

Name Type Description Default
client ClickHouseClient

ClickHouse client.

required
measurement_id str

Measurement id.

required
previous_round int

Round on which to run the Diamond-Miner algorithm.

required
adaptive_eps bool

Set to True to handle nested load-balancers.

False
target_epsilon float

Target failure rate of the MDA algorithm.

DEFAULT_FAILURE_RATE
concurrent_requests int

Maximum number of requests to execute concurrently.

max(available_cpus() // 8, 1)
Source code in diamond_miner/insert.py
def insert_mda_probe_counts(
    client: ClickHouseClient,
    measurement_id: str,
    previous_round: int,
    adaptive_eps: bool = False,
    target_epsilon: float = DEFAULT_FAILURE_RATE,
    concurrent_requests: int = max(available_cpus() // 8, 1),
) -> None:
    """
    Run the Diamond-Miner algorithm and insert the resulting probes into the probes table.

    An `InsertMDAProbes` query is built for all rounds up to `previous_round`,
    the subsets to process are computed with `subsets_for`, and the query is
    then executed concurrently over those subsets.

    Args:
        client: ClickHouse client.
        measurement_id: Measurement id.
        previous_round: Round on which to run the Diamond-Miner algorithm.
        adaptive_eps: Set to `True` to handle nested load-balancers.
        target_epsilon: Target failure rate of the MDA algorithm.
        concurrent_requests: Maximum number of requests to execute concurrently.
    """
    # TODO: set filter_partial and filter_virtual to false?
    mda_query = InsertMDAProbes(
        round_leq=previous_round,
        adaptive_eps=adaptive_eps,
        target_epsilon=target_epsilon,
        filter_partial=True,
        filter_virtual=True,
        filter_inter_round=True,
    )
    prefix_subsets = subsets_for(mda_query, client, measurement_id)
    mda_query.execute_concurrent(
        client,
        measurement_id,
        subsets=prefix_subsets,
        concurrent_requests=concurrent_requests,
    )

insert_probe_counts(client, measurement_id, round_, prefixes, prefix_len_v4=DEFAULT_PREFIX_LEN_V4, prefix_len_v6=DEFAULT_PREFIX_LEN_V6)

Insert the probe counts specified by prefixes into the probes table.

Parameters:

Name Type Description Default
client ClickHouseClient

ClickHouse client.

required
measurement_id str

Measurement id.

required
round_ int

Round number for which to insert the probe counts.

required
prefixes Iterable[tuple[str, str, Iterable[int], int]]

A list of (prefix, protocol, ttls, n_probes) tuples. /32 or /128 is assumed if not specified.

required
prefix_len_v4 int

The prefix length to which the IPv4 prefixes will be split.

DEFAULT_PREFIX_LEN_V4
prefix_len_v6 int

The prefix length to which the IPv6 prefixes will be split.

DEFAULT_PREFIX_LEN_V6

Examples:

>>> from diamond_miner.test import client, create_tables
>>> from diamond_miner.queries import GetProbes
>>> create_tables(client, "test_probe_counts")
>>> insert_probe_counts(client, "test_probe_counts", 1, [("8.8.0.0/22", "icmp", range(2, 5), 6)])
>>> rows = sorted(GetProbes(round_eq=1).execute(client, "test_probe_counts"), key=lambda x: x["probe_dst_prefix"])
>>> len(rows)
4
>>> row = rows[0]
>>> row["probe_dst_prefix"]
'::ffff:8.8.0.0'
>>> sorted(row["probes_per_ttl"])
[[2, 6], [3, 6], [4, 6]]
>>> row = rows[1]
>>> row["probe_dst_prefix"]
'::ffff:8.8.1.0'
>>> sorted(row["probes_per_ttl"])
[[2, 6], [3, 6], [4, 6]]
Source code in diamond_miner/insert.py
def insert_probe_counts(
    client: ClickHouseClient,
    measurement_id: str,
    round_: int,
    prefixes: Iterable[tuple[str, str, Iterable[int], int]],
    prefix_len_v4: int = DEFAULT_PREFIX_LEN_V4,
    prefix_len_v6: int = DEFAULT_PREFIX_LEN_V6,
) -> None:
    """
    Insert the probe counts specified by `prefixes` into the probes table.

    Every prefix is split with `split_prefix` according to `prefix_len_v4` and
    `prefix_len_v6`; for each resulting sub-prefix, one row per TTL is generated
    and the rows are streamed to an `InsertProbes` query.

    Args:
        client: ClickHouse client.
        measurement_id: Measurement id.
        round_: Round number for which to insert the probe counts.
        prefixes: A list of `(prefix, protocol, ttls, n_probes)` tuples. /32 or /128 is assumed if not specified.
            `ttls` may be any iterable, including a one-shot iterator.
        prefix_len_v4: The prefix length to which the IPv4 prefixes will be split.
        prefix_len_v6: The prefix length to which the IPv6 prefixes will be split.

    Examples:
        >>> from diamond_miner.test import client, create_tables
        >>> from diamond_miner.queries import GetProbes
        >>> create_tables(client, "test_probe_counts")
        >>> insert_probe_counts(client, "test_probe_counts", 1, [("8.8.0.0/22", "icmp", range(2, 5), 6)])
        >>> rows = sorted(GetProbes(round_eq=1).execute(client, "test_probe_counts"), key=lambda x: x["probe_dst_prefix"])
        >>> len(rows)
        4
        >>> row = rows[0]
        >>> row["probe_dst_prefix"]
        '::ffff:8.8.0.0'
        >>> sorted(row["probes_per_ttl"])
        [[2, 6], [3, 6], [4, 6]]
        >>> row = rows[1]
        >>> row["probe_dst_prefix"]
        '::ffff:8.8.1.0'
        >>> sorted(row["probes_per_ttl"])
        [[2, 6], [3, 6], [4, 6]]
    """

    def gen() -> Iterator[bytes]:
        for prefix, protocol, ttls, n_probes in prefixes:
            protocol_value = PROTOCOLS[protocol]
            # `ttls` is re-used for every sub-prefix: materialize it once so that
            # a one-shot iterator (e.g. a generator) is not exhausted after the
            # first sub-prefix, which would leave the others without any row.
            ttl_values = tuple(ttls)
            for _af, subprefix, _subprefix_size in split_prefix(
                prefix, prefix_len_v4, prefix_len_v6
            ):
                # One chunk per sub-prefix, one newline-separated row per TTL.
                yield "\n".join(
                    f'[{protocol_value},"{format_ipv6(subprefix)}",{ttl},{n_probes},{round_}]'
                    for ttl in ttl_values
                ).encode()

    InsertProbes().execute(client, measurement_id, data=gen())

diamond_miner.mda

stopping_point(k, eps=DEFAULT_FAILURE_RATE)

Return the number n_k of probes that guarantees that the probability of not detecting k outgoing load-balanced edges is lower than eps [@veitch2009failure; @jacquet2018collecter].

Examples:

>>> stopping_point(1, 0.05)
0
>>> stopping_point(2, 0.05)
6
>>> stopping_point(3, 0.05)
11
>>> stopping_point(11, 0.05)
57
>>> stopping_point(101, 0.05)
765
Note

There is a typo in the D-Miner paper: n(101) = 765, not 757.

Source code in diamond_miner/mda.py
def stopping_point(k: int, eps: float = DEFAULT_FAILURE_RATE) -> int:
    """
    Compute the MDA stopping point `n_k`: the number of probes that guarantees that
    the probability of not detecting `k` outgoing load-balanced edges is lower than
    `eps`[@veitch2009failure;@jacquet2018collecter].

    Args:
        k: Number of outgoing load-balanced edges, must be at least 1.
        eps: Failure probability, must lie in `[0, 1]`.

    Returns:
        `0` when `k` is 1, otherwise `ceil(log(eps / k) / log((k - 1) / k))`.

    Examples:
        >>> stopping_point(1, 0.05)
        0
        >>> stopping_point(2, 0.05)
        6
        >>> stopping_point(3, 0.05)
        11
        >>> stopping_point(11, 0.05)
        57
        >>> stopping_point(101, 0.05)
        765

    Note:
        There is a typo in the D-Miner paper: n(101) = 765, not 757.
    """
    assert k >= 1
    assert 0 <= eps <= 1
    if k > 1:
        numerator = log(eps / k)
        denominator = log((k - 1) / k)
        return ceil(numerator / denominator)
    # With a single edge there is nothing more to discover.
    return 0

diamond_miner.subsets

addr_to_network(addr, prefix_len_v4, prefix_len_v6)

Examples:

>>> addr_to_network("::ffff:8.8.8.0", 24, 64)
IPv6Network('::ffff:808:800/120')
>>> addr_to_network("2001:4860:4860:1234::", 24, 64)
IPv6Network('2001:4860:4860:1234::/64')
Source code in diamond_miner/subsets.py
def addr_to_network(addr: str, prefix_len_v4: int, prefix_len_v6: int) -> IPv6Network:
    """
    Build the IPv6 network that starts at `addr`.

    Addresses beginning with `::ffff:` are treated as IPv4-mapped and get a
    prefix length of `96 + prefix_len_v4`; every other address gets
    `prefix_len_v6`.

    Args:
        addr: An IPv6 address string (IPv4 addresses must be IPv4-mapped).
        prefix_len_v4: Prefix length, in IPv4 terms, for IPv4-mapped addresses.
        prefix_len_v6: Prefix length for the other IPv6 addresses.

    Examples:
        >>> addr_to_network("::ffff:8.8.8.0", 24, 64)
        IPv6Network('::ffff:808:800/120')
        >>> addr_to_network("2001:4860:4860:1234::", 24, 64)
        IPv6Network('2001:4860:4860:1234::/64')
    """
    assert ":" in addr, "`addr` must be an (IPv4-mapped) IPv6 address."
    is_v4_mapped = addr.startswith("::ffff:")
    # The IPv4 part of a mapped address occupies the last 32 of the 128 bits.
    prefix_len = 96 + prefix_len_v4 if is_v4_mapped else prefix_len_v6
    return IPv6Network(f"{addr}/{prefix_len}")

is_subnet_of(a, b)

A faster version of IPv6Network.subnet_of(other).

Examples:

>>> is_subnet_of(IPv6Network("1000::/16"), IPv6Network("1000::/16"))
True
>>> is_subnet_of(IPv6Network("1000::/17"), IPv6Network("1000::/16"))
True
>>> is_subnet_of(IPv6Network("1000::/15"), IPv6Network("1000::/16"))
False
>>> is_subnet_of(IPv6Network("1000::/16"), IPv6Network("2000::/16"))
False
Source code in diamond_miner/subsets.py
def is_subnet_of(a: IPv6Network, b: IPv6Network) -> bool:
    """
    A faster version of IPv6Network.subnet_of(other).

    The check works on the integer values of the networks: `a` is a subnet of
    `b` when `b` does not start after `a` and does not end before `a`.

    Args:
        a: The candidate subnet.
        b: The candidate supernet.

    Returns:
        `True` if `a` is contained in (or equal to) `b`, `False` otherwise.

    Examples:
        >>> is_subnet_of(IPv6Network("1000::/16"), IPv6Network("1000::/16"))
        True
        >>> is_subnet_of(IPv6Network("1000::/17"), IPv6Network("1000::/16"))
        True
        >>> is_subnet_of(IPv6Network("1000::/15"), IPv6Network("1000::/16"))
        False
        >>> is_subnet_of(IPv6Network("1000::/16"), IPv6Network("2000::/16"))
        False
    """
    a_first = a.network_address._ip  # type: ignore
    b_first = b.network_address._ip  # type: ignore
    if b_first > a_first:
        # `b` starts after `a`, so it cannot contain it.
        return False
    # Netmask XOR ALL_ONES_V6 flips the mask bits; OR-ing the result with the
    # network address gives the last address of the network (assuming
    # ALL_ONES_V6 is the 128-bit all-ones value).
    a_last = a_first | (a.netmask._ip ^ ALL_ONES_V6)  # type: ignore
    b_last = b_first | (b.netmask._ip ^ ALL_ONES_V6)  # type: ignore
    return b_last >= a_last  # type: ignore

n_items(counts, subset)

Examples:

>>> counts = {IPv6Network("1000::/16"): 2, IPv6Network("8000::/16"): 10}
>>> n_items(counts, IPv6Network("0000::/1"))
2
>>> n_items(counts, IPv6Network("8000::/1"))
10
>>> n_items(counts, IPv6Network("::/0"))
12
Source code in diamond_miner/subsets.py
def n_items(counts: Counts, subset: IPv6Network) -> int:
    """
    Sum the counts of all the networks of `counts` that are contained in `subset`.

    Args:
        counts: Mapping from network to number of items.
        subset: The network in which to count the items.

    Examples:
        >>> counts = {IPv6Network("1000::/16"): 2, IPv6Network("8000::/16"): 10}
        >>> n_items(counts, IPv6Network("0000::/1"))
        2
        >>> n_items(counts, IPv6Network("8000::/1"))
        10
        >>> n_items(counts, IPv6Network("::/0"))
        12
    """
    return sum(
        count for network, count in counts.items() if is_subnet_of(network, subset)
    )

split(counts, max_items_per_subset)

Return the IP networks such that there are no more than max_items_per_subset items per network.

Parameters:

Name Type Description Default
counts Counts

Number of items per prefix in the database table.

required
max_items_per_subset int

Maximum number of items per network.

required

Examples:

>>> counts = {IPv6Network("::ffff:8.8.4.0/120"): 10, IPv6Network("::ffff:8.8.8.0/120"): 5}
>>> split(counts, 15)
[IPv6Network('::/0')]
>>> split(counts, 10)
[IPv6Network('::ffff:808:0/117'), IPv6Network('::ffff:808:800/117')]
>>> split(counts, 1) # Impossible case, should return the minimal feasible networks.
[IPv6Network('::ffff:808:400/120'), IPv6Network('::ffff:808:800/120')]
>>> split({}, 10)
[]
Source code in diamond_miner/subsets.py
def split(counts: Counts, max_items_per_subset: int) -> list[IPv6Network]:
    """
    Return the IP networks such that there are no more than `max_items_per_subset`
    items per network.

    The search starts from `::/0` and recursively halves every network that holds
    too many items. Networks without any item are dropped. When a network holds
    too many items but cannot be divided further (its halves contain none of the
    prefixes of `counts`, or it is already a single address), it is returned
    as-is.

    Args:
        counts: Number of items per prefix in the database table.
        max_items_per_subset: Maximum number of items per network.

    Returns:
        The sorted list of selected networks.

    Examples:
        >>> counts = {IPv6Network("::ffff:8.8.4.0/120"): 10, IPv6Network("::ffff:8.8.8.0/120"): 5}
        >>> split(counts, 15)
        [IPv6Network('::/0')]
        >>> split(counts, 10)
        [IPv6Network('::ffff:808:0/117'), IPv6Network('::ffff:808:800/117')]
        >>> split(counts, 1) # Impossible case, should return the minimal feasible networks.
        [IPv6Network('::ffff:808:400/120'), IPv6Network('::ffff:808:800/120')]
        >>> split({IPv6Network("::1/128"): 10}, 1) # A single address cannot be split further.
        [IPv6Network('::1/128')]
        >>> split({}, 10)
        []
    """
    root = IPv6Network("::/0")
    candidates = [(root, n_items(counts, root))]
    subsets = []

    while candidates:
        candidate, n_replies = candidates.pop()
        if n_replies <= 0:
            # Nothing to process in this part of the address space.
            continue
        if n_replies <= max_items_per_subset:
            subsets.append(candidate)
            continue
        if candidate.prefixlen == candidate.max_prefixlen:
            # `subnets()` only yields the network itself for a maximum-length
            # prefix, so it cannot be halved: keep it as a minimal feasible network.
            subsets.append(candidate)
            continue
        a, b = candidate.subnets(prefixlen_diff=1)
        n_items_a = n_items(counts, a)
        n_items_b = n_items(counts, b)
        if n_items_a + n_items_b == 0:
            # The prefixes of `counts` are no longer contained in the halves:
            # the candidate is the smallest network that still holds the items.
            subsets.append(candidate)
        else:
            candidates.append((a, n_items_a))
            candidates.append((b, n_items_b))

    return sorted(subsets)

subsets_for(query, client, measurement_id, *, max_items_per_subset=8000000)

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetLinks, GetProbes, GetResults
>>> subsets_for(GetLinks(), client, 'test_nsdi_example', max_items_per_subset=1)
[IPv6Network('::ffff:c800:0/112')]
>>> subsets_for(GetProbes(round_eq=1), client, 'test_nsdi_example', max_items_per_subset=1)
[IPv6Network('::ffff:c800:0/112')]
>>> subsets_for(GetResults(), client, 'test_nsdi_example', max_items_per_subset=1)
[IPv6Network('::ffff:c800:0/112')]
Source code in diamond_miner/subsets.py
def subsets_for(
    query: LinksQuery | ProbesQuery | ResultsQuery,
    client: ClickHouseClient,
    measurement_id: str,
    *,
    max_items_per_subset: int = 8_000_000,
) -> list[IPv6Network]:
    """
    Compute the subsets (networks) on which `query` can be run so that each
    subset holds at most `max_items_per_subset` items.

    The per-prefix counting query matching the type of `query` is executed, its
    rows are converted to networks with `addr_to_network`, and the resulting
    counts are handed to `split`.

    Args:
        query: The links, probes or results query for which to compute the subsets.
        client: ClickHouse client.
        measurement_id: Measurement id.
        max_items_per_subset: Maximum number of items per subset.

    Raises:
        NotImplementedError: If `query` is not a links, probes or results query.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetLinks, GetProbes, GetResults
        >>> subsets_for(GetLinks(), client, 'test_nsdi_example', max_items_per_subset=1)
        [IPv6Network('::ffff:c800:0/112')]
        >>> subsets_for(GetProbes(round_eq=1), client, 'test_nsdi_example', max_items_per_subset=1)
        [IPv6Network('::ffff:c800:0/112')]
        >>> subsets_for(GetResults(), client, 'test_nsdi_example', max_items_per_subset=1)
        [IPv6Network('::ffff:c800:0/112')]
    """
    # Checked in order: the first matching query type selects the counting query.
    counters = (
        (LinksQuery, CountLinksPerPrefix),
        (ProbesQuery, CountProbesPerPrefix),
        (ResultsQuery, CountResultsPerPrefix),
    )
    for query_type, counter_type in counters:
        if isinstance(query, query_type):
            count_query = counter_type(**common_parameters(query, query_type))  # type: ignore
            break
    else:
        raise NotImplementedError

    counts = {}
    for row in count_query.execute_iter(client, measurement_id):
        network = addr_to_network(
            row["prefix"], count_query.prefix_len_v4, count_query.prefix_len_v6
        )
        counts[network] = row["count"]
    return split(counts, max_items_per_subset)