Skip to content

Probe generators

diamond_miner.generators

probe_generator(prefixes, flow_ids, ttls, *, prefix_len_v4=DEFAULT_PREFIX_LEN_V4, prefix_len_v6=DEFAULT_PREFIX_LEN_V6, probe_src_port=DEFAULT_PROBE_SRC_PORT, probe_dst_port=DEFAULT_PROBE_DST_PORT, mapper_v4=SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V4), mapper_v6=SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V6), seed=None)

Generate a probe for each prefix, flow ID and TTL, in a random order.

Parameters:

Name Type Description Default
prefixes Sequence[tuple[str, str]]

A list of (prefix, protocol) tuples. The protocol can be icmp, icmp6 or udp.

required
flow_ids Sequence[int]

The flow IDs to probe.

required
ttls Sequence[int]

The TTLs to probe.

required
prefix_len_v4 int

The prefix length to which the IPv4 prefixes will be split.

DEFAULT_PREFIX_LEN_V4
prefix_len_v6 int

The prefix length to which the IPv6 prefixes will be split.

DEFAULT_PREFIX_LEN_V6
probe_src_port int

The minimum source port of the probes (can be incremented by the flow mapper).

DEFAULT_PROBE_SRC_PORT
probe_dst_port int

The destination port of the probes (constant).

DEFAULT_PROBE_DST_PORT
mapper_v4 FlowMapper

The flow mapper for IPv4 probes.

SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V4)
mapper_v6 FlowMapper

The flow mapper for IPv6 probes.

SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V6)
seed int | None

The seed of the random permutation (two calls with the same seed will yield the probes in the same order).

None

Examples:

This function is very versatile: it can generate Tokyo-Ping[@pelsser2013paris], Paris-Traceroute[@augustin2006avoiding] or Yarrp-like[@beverly2016yarrp] probes.

For ICMP probes, the source port is encoded by caracal in the checksum field of the ICMP header which is generally used by routers for per-flow load-balancing.

ICMP ping towards Google DNS servers with 2 flows per address and a TTL of 32
prefixes = [("8.8.8.8/32", "icmp"), ("2001:4860:4860::8888/128", "icmp6")]
prefix_len_v4 = 32
prefix_len_v6 = 128
flow_ids = range(2)
ttls = [32]

# When given a prefix size of 1, the sequential flow mapper will only vary the port.
mapper_v4 = SequentialFlowMapper(prefix_size=1)
mapper_v6 = SequentialFlowMapper(prefix_size=1)
ICMP ping towards 1.0.0.0/24 with 1 flow per address and a TTL of 32
prefixes = [("1.0.0.0/24", "icmp")]
prefix_len_v4 = 32 # The generator will cut the /24 in 256 /32.
flow_ids = range(1)
ttls = [32]
# Same flow mappers as above.
UDP traceroute towards 1.0.0.0/24 with 2 flows per address
# 256 addresses * 2 flows * 30 TTLs = 15,360 probes.
prefixes = [("1.0.0.0/24", "udp")]
prefix_len_v4 = 32 # The generator will cut the /24 in 256 /32.
flow_ids = range(2)
ttls = range(2, 32)
# Same flow mappers as above.
UDP traceroute towards 1.0.0.0/24 with 6 flows **per prefix**
# 1 prefix * 6 flows * 30 TTLs = 180 probes.
prefixes = [("1.0.0.0/24", "udp")]
prefix_len_v4 = 24 # We want to target the prefix, not its individual addresses.
flow_ids = range(6)
ttls = range(2, 32)

# The random flow mapper will assign a random destination (in the /24) to each flow.
mapper_v4 = RandomFlowMapper(prefix_size=256)
Source code in diamond_miner/generators/standalone.py
def probe_generator(
    prefixes: Sequence[tuple[str, str]],  # /32 or / 128 if nothing specified
    flow_ids: Sequence[int],
    ttls: Sequence[int],
    *,
    prefix_len_v4: int = DEFAULT_PREFIX_LEN_V4,
    prefix_len_v6: int = DEFAULT_PREFIX_LEN_V6,
    probe_src_port: int = DEFAULT_PROBE_SRC_PORT,
    probe_dst_port: int = DEFAULT_PROBE_DST_PORT,
    mapper_v4: FlowMapper = SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V4),
    mapper_v6: FlowMapper = SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V6),
    seed: int | None = None,
) -> Iterator[Probe]:
    """
    Yield one probe per (prefix, flow ID, TTL) combination, in a random order.

    Args:
        prefixes: A list of (prefix, protocol) tuples. The protocol can be ``icmp``, ``icmp6`` or ``udp``.
        flow_ids: The flow IDs to probe.
        ttls: The TTLs to probe.
        prefix_len_v4: The prefix length to which the IPv4 prefixes will be split.
        prefix_len_v6: The prefix length to which the IPv6 prefixes will be split.
        probe_src_port: The minimum source port of the probes (can be incremented by the flow mapper).
        probe_dst_port: The destination port of the probes (constant).
        mapper_v4: The flow mapper for IPv4 probes.
        mapper_v6: The flow mapper for IPv6 probes.
        seed: The seed of the random permutation (two calls with the same seed will yield the probes in the same order).

    Examples:
        This function is very versatile: it can generate Tokyo-Ping[@pelsser2013paris],
        Paris-Traceroute[@augustin2006avoiding] or Yarrp-like[@beverly2016yarrp] probes.

        For ICMP probes, the source port is encoded by caracal in the checksum field of the ICMP header
        which is generally used by routers for per-flow load-balancing.

        ```python title="ICMP ping towards Google DNS servers with 2 flows per address and a TTL of 32"
        prefixes = [("8.8.8.8/32", "icmp"), ("2001:4860:4860::8888/128", "icmp6")]
        prefix_len_v4 = 32
        prefix_len_v6 = 128
        flow_ids = range(2)
        ttls = [32]

        # When given a prefix size of 1, the sequential flow mapper will only vary the port.
        mapper_v4 = SequentialFlowMapper(prefix_size=1)
        mapper_v6 = SequentialFlowMapper(prefix_size=1)
        ```

        ```python title="ICMP ping towards 1.0.0.0/24 with 1 flow per address and a TTL of 32"
        prefixes = [("1.0.0.0/24", "icmp")]
        prefix_len_v4 = 32 # The generator will cut the /24 in 256 /32.
        flow_ids = range(1)
        ttls = [32]
        # Same flow mappers as above.
        ```

        ```python title="UDP traceroute towards 1.0.0.0/24 with 2 flows per address"
        # 256 addresses * 2 flows * 30 TTLs = 15,360 probes.
        prefixes = [("1.0.0.0/24", "udp")]
        prefix_len_v4 = 32 # The generator will cut the /24 in 256 /32.
        flow_ids = range(2)
        ttls = range(2, 32)
        # Same flow mappers as above.
        ```

        ```python title="UDP traceroute towards 1.0.0.0/24 with 6 flows **per prefix**"
        # 1 prefix * 6 flows * 30 TTLs = 180 probes.
        prefixes = [("1.0.0.0/24", "udp")]
        prefix_len_v4 = 24 # We want to target the prefix, not its individual addresses.
        flow_ids = range(6)
        ttls = range(2, 32)

        # The random flow mapper will assign a random destination (in the /24) to each flow.
        mapper_v4 = RandomFlowMapper(prefix_size=256)
        ```
    """
    # Split every input prefix and tag each resulting sub-prefix with the
    # protocol of the prefix it came from.
    targets: list[tuple[int, int, int, str]] = [
        (family, network, network_size, protocol)
        for cidr, protocol in prefixes
        for family, network, network_size in split_prefix(
            cidr, prefix_len_v4, prefix_len_v6
        )
    ]

    # Shuffled cartesian product of (sub-prefix, TTL, flow ID).
    shuffled = ParameterGrid(targets, ttls, flow_ids).shuffled(seed=seed)

    for target, ttl, flow_id in shuffled:
        family, network, _, protocol = target
        # Anything that is not address family 4 goes through the IPv6 mapper.
        flow_mapper = mapper_v6 if family != 4 else mapper_v4
        addr_offset, port_offset = flow_mapper.offset(flow_id, network)
        dst_addr = network + addr_offset
        src_port = probe_src_port + port_offset
        yield dst_addr, src_port, probe_dst_port, ttl, protocol

probe_generator_by_flow(prefixes, flow_ids, *, prefix_len_v4=DEFAULT_PREFIX_LEN_V4, prefix_len_v6=DEFAULT_PREFIX_LEN_V6, probe_src_port=DEFAULT_PROBE_SRC_PORT, probe_dst_port=DEFAULT_PROBE_DST_PORT, mapper_v4=SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V4), mapper_v6=SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V6), seed=None)

Generate a probe for each prefix, flow id and TTL, in a random order. This function differs from probe_generator in two aspects:

  • The TTLs are specified for each prefix, and not globally.
  • All the probes for a given prefix and flow id are generated sequentially.

The parameters and output are identical to probe_generator, except for prefixes, which is a list of (prefix, protocol, TTLs) tuples, and the absence of the ttls parameter.

Parameters:

Name Type Description Default
prefixes Iterable[tuple[str, str, Iterable[int]]]

A list of (prefix, protocol, TTLs) tuples.

required
Source code in diamond_miner/generators/standalone.py
def probe_generator_by_flow(
    prefixes: Iterable[
        tuple[str, str, Iterable[int]]
    ],  # /32 or / 128 if nothing specified
    flow_ids: Sequence[int],
    *,
    prefix_len_v4: int = DEFAULT_PREFIX_LEN_V4,
    prefix_len_v6: int = DEFAULT_PREFIX_LEN_V6,
    probe_src_port: int = DEFAULT_PROBE_SRC_PORT,
    probe_dst_port: int = DEFAULT_PROBE_DST_PORT,
    mapper_v4: FlowMapper = SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V4),
    mapper_v6: FlowMapper = SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V6),
    seed: int | None = None,
) -> Iterator[Probe]:
    """
    Generate a probe for each prefix, flow id and TTL, in a random order.
    This function differs from [probe_generator][diamond_miner.generators.probe_generator] in two aspects:

    - The TTLs are specified for each prefix, and not globally.
    - All the probes for a given prefix and flow id are generated sequentially.

    The parameters and output are identical to [probe_generator][diamond_miner.generators.probe_generator],
    except for `prefixes`, which is a list of (prefix, protocol, TTLs) tuples,
    and the absence of the `ttls` parameter.

    Args:
        prefixes: An iterable of (prefix, protocol, TTLs) tuples. The TTLs of each tuple are
            copied once up front, so a one-shot iterator (e.g. a generator) is accepted.
        flow_ids: The flow IDs to probe.
        prefix_len_v4: The prefix length to which the IPv4 prefixes will be split.
        prefix_len_v6: The prefix length to which the IPv6 prefixes will be split.
        probe_src_port: The minimum source port of the probes (can be incremented by the flow mapper).
        probe_dst_port: The destination port of the probes (constant).
        mapper_v4: The flow mapper for IPv4 probes.
        mapper_v6: The flow mapper for IPv6 probes.
        seed: The seed of the random permutation of the (sub-prefix, flow ID) pairs.
    """
    prefixes_: list[tuple[int, int, int, str, tuple[int, ...]]] = []
    for prefix, protocol, ttls in prefixes:
        # The same TTLs are replayed for every sub-prefix and every flow ID.
        # Freeze them here: a one-shot iterator would otherwise be exhausted
        # after the first (sub-prefix, flow ID) pair and silently yield nothing
        # for all the following ones.
        ttls_ = tuple(ttls)
        for af, subprefix, subprefix_size in split_prefix(
            prefix, prefix_len_v4, prefix_len_v6
        ):
            prefixes_.append((af, subprefix, subprefix_size, protocol, ttls_))

    grid = ParameterGrid(prefixes_, flow_ids).shuffled(seed=seed)

    for (af, subprefix, _, protocol, ttls_), flow_id in grid:
        if not ttls_:
            continue
        mapper = mapper_v4 if af == 4 else mapper_v6
        # The offsets depend only on the flow ID and the sub-prefix, so they
        # are computed once per flow rather than once per TTL.
        # NOTE(review): assumes `mapper.offset` is deterministic — confirm for
        # custom flow mappers.
        addr_offset, port_offset = mapper.offset(flow_id, subprefix)
        dst_addr = subprefix + addr_offset
        src_port = probe_src_port + port_offset
        for ttl in ttls_:
            yield dst_addr, src_port, probe_dst_port, ttl, protocol

probe_generator_from_database(client, measurement_id, round_, *, mapper_v4=SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V4), mapper_v6=SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V6), probe_src_port=DEFAULT_PROBE_SRC_PORT, probe_dst_port=DEFAULT_PROBE_DST_PORT, probe_ttl_geq=None, probe_ttl_leq=None, subsets=(UNIVERSE_SUBSET,))

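Generate the probes to send for a given round from the probe counts stored in the database. Note that this function doesn't randomize the probes. (TODO: Doctest.)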

Examples:

>>> from ipaddress import ip_address
>>> from diamond_miner.insert import insert_probe_counts
>>> from diamond_miner.test import client, create_tables
>>> create_tables(client, "test_probe_gen")
>>> insert_probe_counts(client, "test_probe_gen", 1, [("8.8.0.0/23", "icmp", [1, 2], 2)])
>>> probes = list(probe_generator_from_database(client, "test_probe_gen", 1))
>>> len(probes)
8
>>> (str(ip_address(probes[0][0])), *probes[0][1:])
('::ffff:808:100', 24000, 33434, 1, 'icmp')
Source code in diamond_miner/generators/database.py
def probe_generator_from_database(
    client: ClickHouseClient,
    measurement_id: str,
    round_: int,
    *,
    mapper_v4: FlowMapper = SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V4),
    mapper_v6: FlowMapper = SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V6),
    probe_src_port: int = DEFAULT_PROBE_SRC_PORT,
    probe_dst_port: int = DEFAULT_PROBE_DST_PORT,
    probe_ttl_geq: int | None = None,
    probe_ttl_leq: int | None = None,
    subsets: Iterable[IPNetwork] = (UNIVERSE_SUBSET,),
) -> Iterator[Probe]:
    """
    Yield the probes to send for a round, as returned by the `GetProbesDiff` query.
    Note that this doesn't randomize probes.

    The module-level `max_probes` caps the port offset of the generated probes:
    when it is 0, it is set to 4095 on first use, and for a given prefix and TTL
    the generation stops at the first flow whose port offset exceeds it.

    TODO: Doctest.

    Args:
        client: ClickHouse client used to run the query.
        measurement_id: Measurement id passed to the query.
        round_: Round for which to generate the probes.
        mapper_v4: The flow mapper for IPv4 probes (prefixes starting with ``::ffff:``).
        mapper_v6: The flow mapper for IPv6 probes.
        probe_src_port: The minimum source port of the probes (can be incremented by the flow mapper).
        probe_dst_port: The destination port of the probes (constant).
        probe_ttl_geq: Forwarded to `GetProbesDiff`.
        probe_ttl_leq: Forwarded to `GetProbesDiff`.
        subsets: Forwarded to the query execution.

    Examples:
        >>> from ipaddress import ip_address
        >>> from diamond_miner.insert import insert_probe_counts
        >>> from diamond_miner.test import client, create_tables
        >>> create_tables(client, "test_probe_gen")
        >>> insert_probe_counts(client, "test_probe_gen", 1, [("8.8.0.0/23", "icmp", [1, 2], 2)])
        >>> probes = list(probe_generator_from_database(client, "test_probe_gen", 1))
        >>> len(probes)
        8
        >>> (str(ip_address(probes[0][0])), *probes[0][1:])
        ('::ffff:808:100', 24000, 33434, 1, 'icmp')
    """
    global max_probes
    if max_probes == 0:
        max_probes = 4095  # XXX make this a parameter
        logger.info("capping the number of probes to send at %d", max_probes)

    query = GetProbesDiff(
        round_eq=round_, probe_ttl_geq=probe_ttl_geq, probe_ttl_leq=probe_ttl_leq
    )
    for row in query.execute_iter(client, measurement_id, subsets=subsets):
        prefix_text = row["probe_dst_prefix"]
        base_addr = int(IPv6Address(prefix_text))
        # IPv4 prefixes are stored as IPv4-mapped IPv6 addresses.
        if prefix_text.startswith("::ffff:"):
            flow_mapper = mapper_v4
        else:
            flow_mapper = mapper_v6
        protocol_name = PROTOCOLS[row["probe_protocol"]]

        for ttl, wanted, sent in row["probes_per_ttl"]:
            # Only the flows that have not been sent yet are generated.
            for flow_id in range(sent, wanted):
                addr_offset, port_offset = flow_mapper.offset(flow_id, base_addr)
                # Note that port_offset is actually the number of probes sent after having already sent 256 probes.
                if port_offset > max_probes:
                    logger.warning(
                        "not probing %s after having already sent %d probes",
                        row,
                        max_probes + 256,
                    )
                    break
                yield (  # type: ignore
                    base_addr + addr_offset,
                    probe_src_port + port_offset,
                    probe_dst_port,
                    ttl,
                    protocol_name,
                )

probe_generator_parallel(filepath, client, measurement_id, round_, *, mapper_v4=SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V4), mapper_v6=SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V6), probe_src_port=DEFAULT_PROBE_SRC_PORT, probe_dst_port=DEFAULT_PROBE_DST_PORT, probe_ttl_geq=None, probe_ttl_leq=None, max_open_files=8192, n_workers=max(available_cpus() // 8, 1))

Compute the probes to send given the previously discovered links. This function shuffles the probes on disk; see "External-memory shuffling in linear time?".

Parameters:

Name Type Description Default
filepath Path

Output file (Zstd-compressed CSV file); will be overwritten.

required
client ClickHouseClient

ClickHouse client.

required
measurement_id str

Measurement id.

required
round_ int

Number of the round for which to generate the probes.

required
mapper_v4 FlowMapper

The flow mapper for IPv4 probes.

SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V4)
mapper_v6 FlowMapper

The flow mapper for IPv6 probes.

SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V6)
probe_src_port int

The minimum source port of the probes (can be incremented by the flow mapper).

DEFAULT_PROBE_SRC_PORT
probe_dst_port int

The destination port of the probes (constant).

DEFAULT_PROBE_DST_PORT
Source code in diamond_miner/generators/parallel.py
def probe_generator_parallel(
    filepath: Path,
    client: ClickHouseClient,
    measurement_id: str,
    round_: int,
    *,
    mapper_v4: FlowMapper = SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V4),
    mapper_v6: FlowMapper = SequentialFlowMapper(DEFAULT_PREFIX_SIZE_V6),
    probe_src_port: int = DEFAULT_PROBE_SRC_PORT,
    probe_dst_port: int = DEFAULT_PROBE_DST_PORT,
    probe_ttl_geq: int | None = None,
    probe_ttl_leq: int | None = None,
    max_open_files: int = 8192,
    n_workers: int = max(available_cpus() // 8, 1),
) -> int:
    """
    Compute the probes to send given the previously discovered links.
    This function shuffles the probes on-disk:
    [External-memory shuffling in linear time?](https://lemire.me/blog/2010/03/15/external-memory-shuffling-in-linear-time/)

    Args:
        filepath: Output file (Zstd-compressed CSV file); will be overwritten.
        client: ClickHouse client.
        measurement_id: Measurement id.
        round_: Number of the round for which to generate the probes.
        mapper_v4: The flow mapper for IPv4 probes.
        mapper_v6: The flow mapper for IPv6 probes.
        probe_src_port: The minimum source port of the probes (can be incremented by the flow mapper).
        probe_dst_port: The destination port of the probes (constant).
        probe_ttl_geq: Forwarded to the `GetProbesDiff` query and to the workers.
        probe_ttl_leq: Forwarded to the `GetProbesDiff` query and to the workers.
        max_open_files: Budget of temporary files, divided evenly between the subsets
            (each subset always gets at least one file).
        n_workers: Number of processes in the worker pool.

    Returns:
        The sum of the counts returned by the workers, or 0 when there is no subset
        to process (in which case `filepath` is not touched).
    """
    # TODO: These subsets are sub-optimal, `CountProbesPerPrefix` should count
    # the actual number of probes to be sent, not the total number of probes sent.
    subsets = subsets_for(
        GetProbesDiff(
            round_eq=round_, probe_ttl_geq=probe_ttl_geq, probe_ttl_leq=probe_ttl_leq
        ),
        client,
        measurement_id,
    )

    if not subsets:
        return 0

    # With more subsets than `max_open_files` the integer division yields 0,
    # which would leave the workers without any output file; always give each
    # subset at least one.
    n_files_per_subset = max(max_open_files // len(subsets), 1)

    logger.info(
        "mda_probes n_workers=%s n_subsets=%s n_files_per_subset=%s",
        n_workers,
        len(subsets),
        n_files_per_subset,
    )

    # The temporary files live next to the output file and are removed when
    # the block exits.
    with TemporaryDirectory(dir=filepath.parent) as temp_dir:
        with ProcessPoolExecutor(n_workers) as executor:
            futures = [
                executor.submit(
                    worker,
                    Path(temp_dir) / f"subset_{i}",
                    # The client configuration is handed over rather than the
                    # client object itself.
                    client.config,
                    measurement_id,
                    round_,
                    mapper_v4,
                    mapper_v6,
                    probe_src_port,
                    probe_dst_port,
                    probe_ttl_geq,
                    probe_ttl_leq,
                    subset,
                    n_files_per_subset,
                )
                for i, subset in enumerate(subsets)
            ]
            # `result()` re-raises any exception raised in a worker.
            n_probes = sum(future.result() for future in as_completed(futures))

        # Concatenate the per-subset files in a random order.
        files = list(Path(temp_dir).glob("subset_*.csv.zst"))
        random.shuffle(files)

        logger.info("mda_probes status=merging n_files=%s", len(files))
        with filepath.open("wb") as out:
            for f in files:
                with f.open("rb") as inp:
                    shutil.copyfileobj(inp, out)

    return n_probes