Usage
The diamond-miner
library contains three principal components:
- Database queries that implement most of the algorithms in ClickHouse SQL.
- Flow mappers, to map between flow IDs and (address, port) offsets.
- Probe generators, to generate randomized probes on-the-fly.
These components can be pieced together to conduct various kinds of topology measurements.
How to run the examples
To run the examples below, you need a running ClickHouse server:
You also need pycaracal
and pych-client
.
We recommend that you install them in a virtual environment:
Yarrp
Yarrp is a high-speed single-path traceroute tool. Since Diamond-Miner is a generalization of Yarrp, it is easy to re-implement Yarrp with this library.
import logging
from pathlib import Path
from uuid import uuid4
from pycaracal import Probe, prober
from pych_client import ClickHouseClient
from diamond_miner.format import format_ipv6
from diamond_miner.generators import probe_generator
from diamond_miner.queries import (
CreateTables,
GetLinks,
InsertLinks,
InsertPrefixes,
InsertResults,
)
# Configuration
# Connection settings, passed as keyword arguments to `ClickHouseClient`.
credentials = {
    "base_url": "http://localhost:8123",
    "database": "default",
    "username": "default",
    "password": "",
}
# A random identifier, so that each run uses a distinct measurement ID.
measurement_id = str(uuid4())
# CSV file given to pycaracal as its output file;
# its content is inserted into the database afterwards.
results_filepath = Path("results.csv")

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    # Configure pycaracal
    config = prober.Config()
    config.set_output_file_csv(str(results_filepath))
    config.set_probing_rate(10_000)
    config.set_sniffer_wait_time(1)

    # Generate ICMP probes towards every /24 in 1.0.0.0/22,
    # with a single flow per prefix between TTLs 2-32.
    gen = probe_generator(
        prefixes=[("1.0.0.0/22", "icmp")],
        flow_ids=range(1),
        ttls=range(2, 33),
    )

    # Convert tuples output by `probe_generator` to pycaracal probes.
    # This is a generator expression: nothing is produced until
    # `prober.probe` iterates over it.
    probes = (
        Probe(format_ipv6(dst_addr), src_port, dst_port, ttl, protocol, 0)
        for dst_addr, src_port, dst_port, ttl, protocol in gen
    )

    # Send the probes.
    # Note that the probes are randomized and sent on-the-fly,
    # without being buffered in memory.
    prober_stats, sniffer_stats, pcap_stats = prober.probe(config, probes)

    # Display some statistics from pycaracal.
    print(f"{prober_stats.read} probes read")
    print(f"{sniffer_stats.received_count} probes received")

    with ClickHouseClient(**credentials) as client:
        # Insert the results into the database
        CreateTables().execute(client, measurement_id)
        InsertResults().execute(
            client, measurement_id, data=results_filepath.read_bytes()
        )
        InsertPrefixes().execute(client, measurement_id)
        InsertLinks().execute(client, measurement_id)

        # Query the results
        links = GetLinks().execute(client, measurement_id)
        print(f"{len(links)} links discovered")
Diamond-Miner
Diamond-Miner needs to remember how many probes were sent to each TTL. As such, instead of generating the probes on-the-fly as in the Yarrp example, we first store in the database the number of probes to send at each round, and we then generate a probes file containing one line per probe. This file is given as an input to pycaracal.
import logging
from pathlib import Path
from uuid import uuid4
from pycaracal import prober
from pych_client import ClickHouseClient
from diamond_miner.generators import probe_generator_parallel
from diamond_miner.insert import insert_mda_probe_counts, insert_probe_counts
from diamond_miner.queries import (
CreateTables,
GetLinks,
InsertLinks,
InsertPrefixes,
InsertResults,
)
# Configuration
# Connection settings, passed as keyword arguments to `ClickHouseClient`.
credentials = {
    "base_url": "http://localhost:8123",
    "database": "default",
    "username": "default",
    "password": "",
}
# A random identifier, so that each run uses a distinct measurement ID.
measurement_id = str(uuid4())
# Probes file written by `probe_generator_parallel` and given to pycaracal.
probes_filepath = Path("probes.csv.zst")
# CSV file given to pycaracal as its output file;
# its content is inserted into the database at the start of the next round.
results_filepath = Path("results.csv")

# ICMP traceroute towards every /24 in 1.0.0.0/22 starting with 6 flows per prefix between TTLs 2-32
prefixes = [("1.0.0.0/22", "icmp", range(2, 33), 6)]

if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)

    with ClickHouseClient(**credentials) as client:
        CreateTables().execute(client, measurement_id)

        # Run at most 9 rounds (1 to 9); stop earlier as soon as a round
        # has no probes to send.
        for round_ in range(1, 10):
            logging.info("round=%s", round_)

            if round_ == 1:
                # Compute the initial probes
                insert_probe_counts(
                    client=client,
                    measurement_id=measurement_id,
                    round_=1,
                    prefixes=prefixes,
                )
            else:
                # Insert results from the previous round
                InsertResults().execute(
                    client, measurement_id, data=results_filepath.read_bytes()
                )
                InsertPrefixes().execute(client, measurement_id)
                InsertLinks().execute(client, measurement_id)
                # Compute subsequent probes
                insert_mda_probe_counts(
                    client=client,
                    measurement_id=measurement_id,
                    previous_round=round_ - 1,
                )

            # Write the probes to a file
            n_probes = probe_generator_parallel(
                filepath=probes_filepath,
                client=client,
                measurement_id=measurement_id,
                round_=round_,
            )
            logging.info("n_probes=%s", n_probes)
            if n_probes == 0:
                break

            # Send the probes
            config = prober.Config()
            config.set_output_file_csv(str(results_filepath))
            config.set_probing_rate(10_000)
            config.set_sniffer_wait_time(1)
            prober.probe(config, str(probes_filepath))

        # NOTE: the results of a round are only inserted at the start of the
        # next round. If the loop ends without reaching the `break` above,
        # the results of the last round are not inserted before this query.
        links = GetLinks().execute(client, measurement_id)
        print(f"{len(links)} links discovered")
Scaling Diamond-Miner
You may find that the previous example runs slowly for a large number of prefixes and/or results.
- To speed up
InsertResults
, you can first split the input file into multiple parts, and run this query in parallel over each part.
- To speed up
InsertPrefixes
and InsertLinks
, you can run these queries in parallel over subsets of the probing space. For example:
from diamond_miner.queries import InsertPrefixes
from diamond_miner.subsets import subsets_for
from pych_client import ClickHouseClient

with ClickHouseClient() as client:
    query = InsertPrefixes()
    # First option: define subsets manually
    subsets = ["1.0.0.0/23", "1.0.2.0/23"]
    # Second option: compute subsets automatically with `subsets_for`
    subsets = subsets_for(query, client, measurement_id)
    query.execute_concurrent(client, measurement_id, subsets=subsets, concurrent_requests=8)
You can see such techniques implemented in the Iris source code:
Alternative probing tools
This library is designed to work with pycaracal
as the probing tool.
However, you can use the tool of your choice, such as scamper,
as long as you can convert the results to the following format:
capture_timestamp,probe_protocol,probe_src_addr,probe_dst_addr,probe_src_port,probe_dst_port,probe_ttl,quoted_ttl,reply_src_addr,reply_protocol,reply_icmp_type,reply_icmp_code,reply_ttl,reply_size,reply_mpls_labels,rtt,round
1658244381,1,::ffff:132.227.78.108,::ffff:1.0.0.1,24000,0,3,1,::ffff:134.157.254.124,1,11,0,253,56,"[]",30,1
1658244381,1,::ffff:132.227.78.108,::ffff:1.0.0.5,24000,0,3,1,::ffff:134.157.254.124,1,11,0,253,56,"[]",57,1
...