
Usage

The diamond-miner library contains three principal components:

  • Database queries that implement most of the algorithms in ClickHouse SQL.
  • Flow mappers, to map between flow IDs and (address, port) offsets.
  • Probe generators, to generate randomized probes on-the-fly.

These components can be pieced together to conduct various kinds of topology measurements.
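
For instance, a flow mapper translates an abstract flow ID into an (address, port) offset inside a prefix: the first flow IDs vary the destination address, and once the prefix is exhausted the remaining IDs vary the destination port. The sketch below is a minimal, hypothetical illustration of that idea; it does not use the library's own mapper classes, whose names and signatures may differ.

PREFIX_SIZE = 256  # number of addresses in a /24


def offsets(flow_id: int) -> tuple[int, int]:
    """Hypothetical sketch: map a flow ID to an (address, port) offset pair."""
    if flow_id < PREFIX_SIZE:
        return flow_id, 0  # vary the destination address first
    return PREFIX_SIZE - 1, flow_id - PREFIX_SIZE + 1  # then vary the port


assert offsets(0) == (0, 0)
assert offsets(255) == (255, 0)
assert offsets(256) == (255, 1)  # addresses exhausted: bump the port offset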

How to run the examples

To run the examples below, you need a running ClickHouse server:

docker run --rm -d -p 8123:8123 clickhouse/clickhouse-server:22.8

You also need pycaracal and pych-client. We recommend that you install them in a virtual environment:

python -m venv venv
source venv/bin/activate
pip install diamond-miner pycaracal pych-client

Yarrp

Yarrp is a high-speed single-path traceroute tool. Since Diamond-Miner is a generalization of Yarrp, it is easy to re-implement Yarrp with this library.

python examples/yarrp.py
# 2 links discovered
examples/yarrp.py
import logging
from pathlib import Path
from uuid import uuid4

from pycaracal import Probe, prober
from pych_client import ClickHouseClient

from diamond_miner.format import format_ipv6
from diamond_miner.generators import probe_generator
from diamond_miner.queries import (
    CreateTables,
    GetLinks,
    InsertLinks,
    InsertPrefixes,
    InsertResults,
)

# Configuration
credentials = {
    "base_url": "http://localhost:8123",
    "database": "default",
    "username": "default",
    "password": "",
}
measurement_id = str(uuid4())
results_filepath = Path("results.csv")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Configure pycaracal
    config = prober.Config()
    config.set_output_file_csv(str(results_filepath))
    config.set_probing_rate(10_000)
    config.set_sniffer_wait_time(1)

    # Generate ICMP probes towards every /24 in 1.0.0.0/22,
    # with a single flow per prefix between TTLs 2-32.
    gen = probe_generator(
        prefixes=[("1.0.0.0/22", "icmp")],
        flow_ids=range(1),
        ttls=range(2, 33),
    )

    # Convert tuples output by `probe_generator` to pycaracal probes.
    probes = (
        Probe(format_ipv6(dst_addr), src_port, dst_port, ttl, protocol, 0)
        for dst_addr, src_port, dst_port, ttl, protocol in gen
    )

    # Send the probes.
    # Note that the probes are randomized and sent on-the-fly,
    # without being buffered in memory.
    prober_stats, sniffer_stats, pcap_stats = prober.probe(config, probes)

    # Display some statistics from pycaracal.
    print(f"{prober_stats.read} probes read")
    print(f"{sniffer_stats.received_count} probes received")

    with ClickHouseClient(**credentials) as client:
        # Create the tables and insert the results into the database
        CreateTables().execute(client, measurement_id)
        InsertResults().execute(
            client, measurement_id, data=results_filepath.read_bytes()
        )
        InsertPrefixes().execute(client, measurement_id)
        InsertLinks().execute(client, measurement_id)

        # Query the results
        links = GetLinks().execute(client, measurement_id)
        print(f"{len(links)} links discovered")

Diamond-Miner

Diamond-Miner needs to remember how many probes were sent at each TTL. As such, instead of generating the probes on-the-fly as in the Yarrp example, we first store in the database the number of probes to send at each round, and we then generate a probes file containing one line per probe. This file is given as input to pycaracal.

python examples/diamond-miner.py
# 8 links discovered
examples/diamond-miner.py
import logging
from pathlib import Path
from uuid import uuid4

from pycaracal import prober
from pych_client import ClickHouseClient

from diamond_miner.generators import probe_generator_parallel
from diamond_miner.insert import insert_mda_probe_counts, insert_probe_counts
from diamond_miner.queries import (
    CreateTables,
    GetLinks,
    InsertLinks,
    InsertPrefixes,
    InsertResults,
)

# Configuration
credentials = {
    "base_url": "http://localhost:8123",
    "database": "default",
    "username": "default",
    "password": "",
}
measurement_id = str(uuid4())
probes_filepath = Path("probes.csv.zst")
results_filepath = Path("results.csv")

# ICMP traceroute towards every /24 in 1.0.0.0/22, starting with 6 flows per prefix between TTLs 2-32
prefixes = [("1.0.0.0/22", "icmp", range(2, 33), 6)]

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    with ClickHouseClient(**credentials) as client:
        CreateTables().execute(client, measurement_id)
        for round_ in range(1, 10):
            logging.info("round=%s", round_)
            if round_ == 1:
                # Compute the initial probes
                insert_probe_counts(
                    client=client,
                    measurement_id=measurement_id,
                    round_=1,
                    prefixes=prefixes,
                )
            else:
                # Insert results from the previous round
                InsertResults().execute(
                    client, measurement_id, data=results_filepath.read_bytes()
                )
                InsertPrefixes().execute(client, measurement_id)
                InsertLinks().execute(client, measurement_id)
                # Compute subsequent probes
                insert_mda_probe_counts(
                    client=client,
                    measurement_id=measurement_id,
                    previous_round=round_ - 1,
                )

            # Write the probes to a file
            n_probes = probe_generator_parallel(
                filepath=probes_filepath,
                client=client,
                measurement_id=measurement_id,
                round_=round_,
            )
            logging.info("n_probes=%s", n_probes)
            if n_probes == 0:
                break

            # Send the probes
            config = prober.Config()
            config.set_output_file_csv(str(results_filepath))
            config.set_probing_rate(10_000)
            config.set_sniffer_wait_time(1)
            prober.probe(config, str(probes_filepath))

        links = GetLinks().execute(client, measurement_id)
        print(f"{len(links)} links discovered")

Scaling Diamond-Miner

You may find that the previous example runs slowly for a large number of prefixes and/or results.

  • To speed up InsertResults, you can first split the input file into multiple parts and run this query in parallel over each part (see the sketch after this list).
  • To speed up InsertPrefixes and InsertLinks, you can run these queries in parallel over subsets of the probing space. For example:

from diamond_miner.queries import InsertPrefixes
from diamond_miner.subsets import subsets_for
from pych_client import ClickHouseClient

measurement_id = "..."  # the measurement ID used when creating the tables

with ClickHouseClient() as client:
    query = InsertPrefixes()
    # First option: define subsets manually
    subsets = ["1.0.0.0/23", "1.0.2.0/23"]
    # Second option: compute subsets automatically with `subsets_for`
    subsets = subsets_for(query, client, measurement_id)
    query.execute_concurrent(client, measurement_id, subsets=subsets, concurrent_requests=8)

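As a complement, here is a minimal sketch of the first point: splitting the results file into parts and running InsertResults over each part concurrently. The split_lines helper, the thread pool, and the header handling are illustrative assumptions, not part of the library.

from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

from pych_client import ClickHouseClient

from diamond_miner.queries import InsertResults

measurement_id = "..."  # reuse the measurement ID from the examples above


def split_lines(filepath: Path, n_parts: int) -> list[bytes]:
    # Illustrative helper: split the file into at most n_parts groups of
    # whole lines. If your results file starts with a header line, strip or
    # replicate it as required by your input format (assumption).
    lines = filepath.read_bytes().splitlines(keepends=True)
    size = len(lines) // n_parts + 1
    return [b"".join(lines[i : i + size]) for i in range(0, len(lines), size)]


def insert_part(part: bytes) -> None:
    # One client per thread, since a shared client is not assumed to be
    # thread-safe; credentials are read from the environment, as above.
    with ClickHouseClient() as client:
        InsertResults().execute(client, measurement_id, data=part)


with ThreadPoolExecutor(max_workers=4) as executor:
    list(executor.map(insert_part, split_lines(Path("results.csv"), 4)))
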
You can see such techniques implemented in the Iris source code.

Alternative probing tools

This library is designed to work with pycaracal as the probing tool. However, you can use the tool of your choice, such as scamper, as long as you can convert its results to the following format:

capture_timestamp,probe_protocol,probe_src_addr,probe_dst_addr,probe_src_port,probe_dst_port,probe_ttl,quoted_ttl,reply_src_addr,reply_protocol,reply_icmp_type,reply_icmp_code,reply_ttl,reply_size,reply_mpls_labels,rtt,round
1658244381,1,::ffff:132.227.78.108,::ffff:1.0.0.1,24000,0,3,1,::ffff:134.157.254.124,1,11,0,253,56,"[]",30,1
1658244381,1,::ffff:132.227.78.108,::ffff:1.0.0.5,24000,0,3,1,::ffff:134.157.254.124,1,11,0,253,56,"[]",57,1
...
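
As an illustration, here is a minimal sketch of such a conversion, assuming a hypothetical parser that yields one dict per reply, keyed by the column names above. The write_results function and its inputs are assumptions for the sketch, not part of the library.

import csv

COLUMNS = [
    "capture_timestamp", "probe_protocol", "probe_src_addr", "probe_dst_addr",
    "probe_src_port", "probe_dst_port", "probe_ttl", "quoted_ttl",
    "reply_src_addr", "reply_protocol", "reply_icmp_type", "reply_icmp_code",
    "reply_ttl", "reply_size", "reply_mpls_labels", "rtt", "round",
]


def write_results(replies, filepath="results.csv"):
    # `replies` is an iterable of dicts keyed by the column names above,
    # produced by your own parser for scamper or any other tool.
    with open(filepath, "w", newline="") as f:
        writer = csv.DictWriter(f, fieldnames=COLUMNS)
        writer.writeheader()  # the sample above starts with a header line
        for reply in replies:
            # Addresses must be IPv6-mapped, e.g. ::ffff:1.0.0.1 for IPv4,
            # and reply_mpls_labels is a list rendered as a string, e.g. "[]".
            writer.writerow(reply)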