Database queries

diamond_miner.queries

Wrappers around ClickHouse SQL queries.

The queries operate on different kinds of tables. Refer to the following superclasses for more information: Query, LinksQuery, PrefixesQuery, ProbesQuery, ResultsQuery.
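
All queries follow the same pattern: statement() (or statements() for queries that render several statements) produces the SQL for a given measurement ID, and execute() runs it against a ClickHouse client. A minimal sketch, using a hypothetical measurement named my_measurement:

>>> from diamond_miner.queries import GetNodes
>>> sql = GetNodes().statement("my_measurement")
>>> sql.strip().startswith("SELECT DISTINCT")
True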

Count dataclass

Bases: Query

Count the number of rows returned by a given query.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetLinks, GetNodes
>>> Count(query=GetNodes()).execute(client, 'test_nsdi_example')[0]["count()"]
7
>>> Count(query=GetLinks()).execute(client, 'test_nsdi_example')[0]["count()"]
8
Source code in diamond_miner/queries/count.py
@dataclass(frozen=True)
class Count(Query):
    """
    Count the number of rows returned by a given query.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetLinks, GetNodes
        >>> Count(query=GetNodes()).execute(client, 'test_nsdi_example')[0]["count()"]
        7
        >>> Count(query=GetLinks()).execute(client, 'test_nsdi_example')[0]["count()"]
        8
    """

    query: Query | None = None
    "The query for which to count the nodes."

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        # `query` must be typed `Optional` since it appears after arguments with default values.
        assert self.query is not None
        return f"SELECT count() FROM ({self.query.statement(measurement_id, subset)})"

query: Query | None = None class-attribute instance-attribute

The query for which to count the rows.

CountLinksPerPrefix dataclass

Bases: LinksQuery

Count the number of (non-distinct) links per prefix.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import CountLinksPerPrefix
>>> rows = CountLinksPerPrefix().execute(client, 'test_nsdi_example')
>>> sorted((row["prefix"], row["count"]) for row in rows)
[('::ffff:200.0.0.0', 58)]
Source code in diamond_miner/queries/count_rows.py
@dataclass(frozen=True)
class CountLinksPerPrefix(LinksQuery):
    """
    Count the number of (non-distinct) links per prefix.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import CountLinksPerPrefix
        >>> rows = CountLinksPerPrefix().execute(client, 'test_nsdi_example')
        >>> sorted((row["prefix"], row["count"]) for row in rows)
        [('::ffff:200.0.0.0', 58)]
    """

    prefix_len_v4: int = 16
    "The IPv4 prefix length to consider."

    prefix_len_v6: int = 8
    "The IPv6 prefix length to consider."

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        return f"""
        WITH {cut_ipv6('probe_dst_addr', self.prefix_len_v4, self.prefix_len_v6)} AS prefix
        SELECT prefix, count() AS count
        FROM {links_table(measurement_id)}
        WHERE {self.filters(subset)}
        GROUP BY prefix
        """

prefix_len_v4: int = 16 class-attribute instance-attribute

The IPv4 prefix length to consider.

prefix_len_v6: int = 8 class-attribute instance-attribute

The IPv6 prefix length to consider.

CountProbesPerPrefix dataclass

Bases: ProbesQuery

Count the number of probes sent per prefix.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import CountProbesPerPrefix
>>> rows = CountProbesPerPrefix(round_eq=1).execute(client, 'test_nsdi_example')
>>> sorted((row["prefix"], row["count"]) for row in rows)
[('::ffff:200.0.0.0', 24)]
Source code in diamond_miner/queries/count_rows.py
@dataclass(frozen=True)
class CountProbesPerPrefix(ProbesQuery):
    """
    Count the number of probes sent per prefix.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import CountProbesPerPrefix
        >>> rows = CountProbesPerPrefix(round_eq=1).execute(client, 'test_nsdi_example')
        >>> sorted((row["prefix"], row["count"]) for row in rows)
        [('::ffff:200.0.0.0', 24)]
    """

    prefix_len_v4: int = 16
    "The IPv4 prefix length to consider."

    prefix_len_v6: int = 8
    "The IPv6 prefix length to consider."

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        assert self.round_eq
        return f"""
        WITH {cut_ipv6('probe_dst_prefix', self.prefix_len_v4, self.prefix_len_v6)} AS prefix
        SELECT prefix, sum(cumulative_probes) AS count
        FROM {probes_table(measurement_id)}
        WHERE {self.filters(subset)}
        GROUP BY prefix
        """

prefix_len_v4: int = 16 class-attribute instance-attribute

The IPv4 prefix length to consider.

prefix_len_v6: int = 8 class-attribute instance-attribute

The IPv6 prefix length to consider.

CountResultsPerPrefix dataclass

Bases: ResultsQuery

Count the number of results per prefix.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import CountResultsPerPrefix
>>> rows = CountResultsPerPrefix(prefix_len_v4=8, prefix_len_v6=8).execute(client, 'test_count_replies')
>>> sorted((row["prefix"], row["count"]) for row in rows)
[('::ffff:1.0.0.0', 2), ('::ffff:2.0.0.0', 1), ('::ffff:204.0.0.0', 1)]
Source code in diamond_miner/queries/count_rows.py
@dataclass(frozen=True)
class CountResultsPerPrefix(ResultsQuery):
    """
    Count the number of results per prefix.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import CountResultsPerPrefix
        >>> rows = CountResultsPerPrefix(prefix_len_v4=8, prefix_len_v6=8).execute(client, 'test_count_replies')
        >>> sorted((row["prefix"], row["count"]) for row in rows)
        [('::ffff:1.0.0.0', 2), ('::ffff:2.0.0.0', 1), ('::ffff:204.0.0.0', 1)]
    """

    prefix_len_v4: int = 16
    "The IPv4 prefix length to consider."

    prefix_len_v6: int = 8
    "The IPv6 prefix length to consider."

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        return f"""
        WITH {cut_ipv6('probe_dst_addr', self.prefix_len_v4, self.prefix_len_v6)} AS prefix
        SELECT prefix, count() AS count
        FROM {results_table(measurement_id)}
        WHERE {self.filters(subset)}
        GROUP BY prefix
        """

prefix_len_v4: int = 16 class-attribute instance-attribute

The IPv4 prefix length to consider.

prefix_len_v6: int = 8 class-attribute instance-attribute

The IPv6 prefix length to consider.

CreateLinksTable dataclass

Bases: Query

Create the links table containing one line per (flow, link) pair.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import CreateLinksTable
>>> CreateLinksTable().execute(client, "test")
[]
Source code in diamond_miner/queries/create_links_table.py
@dataclass(frozen=True)
class CreateLinksTable(Query):
    """
    Create the links table containing one line per (flow, link) pair.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import CreateLinksTable
        >>> CreateLinksTable().execute(client, "test")
        []
    """

    storage_policy: StoragePolicy = StoragePolicy()
    "ClickHouse storage policy to use."

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        assert subset == UNIVERSE_SUBSET, "subset not allowed for this query"
        return f"""
        CREATE TABLE IF NOT EXISTS {links_table(measurement_id)}
        (
            probe_protocol    UInt8,
            probe_src_addr    IPv6,
            probe_dst_prefix  IPv6,
            probe_dst_addr    IPv6,
            probe_src_port    UInt16,
            probe_dst_port    UInt16,
            near_round        UInt8,
            far_round         UInt8,
            near_ttl          UInt8,
            far_ttl           UInt8,
            near_addr         IPv6,
            far_addr          IPv6,
            -- Materialized columns
            is_destination    UInt8 MATERIALIZED (near_addr = probe_dst_addr) OR (far_addr = probe_dst_addr),
            is_inter_round    UInt8 MATERIALIZED near_round != far_round,
            is_partial        UInt8 MATERIALIZED near_addr = toIPv6('::') OR far_addr = toIPv6('::'),
            is_virtual        UInt8 MATERIALIZED near_addr = toIPv6('::') AND far_addr = toIPv6('::')
        )
        ENGINE MergeTree
        ORDER BY (
            probe_protocol,
            probe_src_addr,
            probe_dst_prefix,
            probe_dst_addr,
            probe_src_port,
            probe_dst_port
        )
        TTL {date_time(self.storage_policy.archive_on)} TO VOLUME '{self.storage_policy.archive_to}'
        SETTINGS storage_policy = '{self.storage_policy.name}'
        """

storage_policy: StoragePolicy = StoragePolicy() class-attribute instance-attribute

ClickHouse storage policy to use.

CreatePrefixesTable dataclass

Bases: Query

Create the table containing (invalid) prefixes.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import CreatePrefixesTable
>>> CreatePrefixesTable().execute(client, "test")
[]
Source code in diamond_miner/queries/create_prefixes_table.py
@dataclass(frozen=True)
class CreatePrefixesTable(Query):
    """
    Create the table containing (invalid) prefixes.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import CreatePrefixesTable
        >>> CreatePrefixesTable().execute(client, "test")
        []
    """

    SORTING_KEY = "probe_protocol, probe_src_addr, probe_dst_prefix"
    "Columns by which the data is ordered."

    storage_policy: StoragePolicy = StoragePolicy()
    "ClickHouse storage policy to use."

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        return f"""
        CREATE TABLE IF NOT EXISTS {prefixes_table(measurement_id)}
        (
            probe_protocol         UInt8,
            probe_src_addr         IPv6,
            probe_dst_prefix       IPv6,
            has_amplification      UInt8,
            has_loops              UInt8
        )
        ENGINE MergeTree
        ORDER BY ({self.SORTING_KEY})
        TTL {date_time(self.storage_policy.archive_on)} TO VOLUME '{self.storage_policy.archive_to}'
        SETTINGS storage_policy = '{self.storage_policy.name}'
        """

SORTING_KEY = 'probe_protocol, probe_src_addr, probe_dst_prefix' class-attribute instance-attribute

Columns by which the data is ordered.

storage_policy: StoragePolicy = StoragePolicy() class-attribute instance-attribute

ClickHouse storage policy to use.

CreateProbesTable dataclass

Bases: Query

Create the table containing the cumulative number of probes sent over the rounds.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import CreateProbesTable
>>> CreateProbesTable().execute(client, "test")
[]
Source code in diamond_miner/queries/create_probes_table.py
@dataclass(frozen=True)
class CreateProbesTable(Query):
    """
    Create the table containing the cumulative number of probes sent over the rounds.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import CreateProbesTable
        >>> CreateProbesTable().execute(client, "test")
        []
    """

    SORTING_KEY = "probe_protocol, probe_dst_prefix, probe_ttl"
    "Columns by which the data is ordered."

    storage_policy: StoragePolicy = StoragePolicy()
    "ClickHouse storage policy to use."

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        return f"""
        CREATE TABLE IF NOT EXISTS {probes_table(measurement_id)}
        (
            probe_protocol         UInt8,
            probe_dst_prefix       IPv6,
            probe_ttl              UInt8,
            cumulative_probes      UInt32,
            round                  UInt8
        )
        ENGINE MergeTree
        ORDER BY ({self.SORTING_KEY})
        TTL {date_time(self.storage_policy.archive_on)} TO VOLUME '{self.storage_policy.archive_to}'
        SETTINGS storage_policy = '{self.storage_policy.name}'
        """

SORTING_KEY = 'probe_protocol, probe_dst_prefix, probe_ttl' class-attribute instance-attribute

Columns by which the data is ordered.

storage_policy: StoragePolicy = StoragePolicy() class-attribute instance-attribute

ClickHouse storage policy to use.

CreateResultsTable dataclass

Bases: Query

Create the table used to store the measurement results from the prober.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import CreateResultsTable
>>> CreateResultsTable().execute(client, "test")
[]
Source code in diamond_miner/queries/create_results_table.py
@dataclass(frozen=True)
class CreateResultsTable(Query):
    """
    Create the table used to store the measurement results from the prober.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import CreateResultsTable
        >>> CreateResultsTable().execute(client, "test")
        []
    """

    SORTING_KEY = "probe_protocol, probe_src_addr, probe_dst_prefix, probe_dst_addr, probe_src_port, probe_dst_port, probe_ttl"
    "Columns by which the data is ordered."

    prefix_len_v4: int = DEFAULT_PREFIX_LEN_V4
    "The prefix length used to compute the IPv4 prefix of an IP address."

    prefix_len_v6: int = DEFAULT_PREFIX_LEN_V6
    "The prefix length used to compute the IPv6 prefix of an IP address."

    storage_policy: StoragePolicy = StoragePolicy()
    "ClickHouse storage policy to use."

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        return f"""
        CREATE TABLE IF NOT EXISTS {results_table(measurement_id)}
        (
            -- Since we do not order by capture timestamp, this column compresses badly.
            -- To reduce its size, caracal outputs the timestamp with a one-second resolution (instead of one microsecond).
            -- This is sufficient to know if two replies were received close in time
            -- and avoid the inference of false links over many hours.
            capture_timestamp      DateTime CODEC(T64, ZSTD(1)),
            probe_protocol         UInt8,
            probe_src_addr         IPv6,
            probe_dst_addr         IPv6,
            probe_src_port         UInt16,
            probe_dst_port         UInt16,
            probe_ttl              UInt8,
            quoted_ttl             UInt8,
            reply_src_addr         IPv6,
            reply_protocol         UInt8,
            reply_icmp_type        UInt8,
            reply_icmp_code        UInt8,
            reply_ttl              UInt8,
            reply_size             UInt16,
            reply_mpls_labels      Array(Tuple(UInt32, UInt8, UInt8, UInt8)),
            -- The rtt column is the largest compressed column; we use T64 and ZSTD to reduce its size, see:
            -- https://altinity.com/blog/2019/7/new-encodings-to-improve-clickhouse
            -- https://clickhouse.tech/docs/en/sql-reference/statements/create/table/#codecs
            rtt                    UInt16 CODEC(T64, ZSTD(1)),
            round                  UInt8,
            -- Materialized columns
            probe_dst_prefix       IPv6 MATERIALIZED {cut_ipv6('probe_dst_addr', self.prefix_len_v4, self.prefix_len_v6)},
            reply_src_prefix       IPv6 MATERIALIZED {cut_ipv6('reply_src_addr', self.prefix_len_v4, self.prefix_len_v6)},
            -- https://en.wikipedia.org/wiki/Reserved_IP_addresses
            private_probe_dst_prefix UInt8 MATERIALIZED
                (probe_dst_prefix >= toIPv6('0.0.0.0')      AND probe_dst_prefix <= toIPv6('0.255.255.255'))   OR
                (probe_dst_prefix >= toIPv6('10.0.0.0')     AND probe_dst_prefix <= toIPv6('10.255.255.255'))  OR
                (probe_dst_prefix >= toIPv6('100.64.0.0')   AND probe_dst_prefix <= toIPv6('100.127.255.255')) OR
                (probe_dst_prefix >= toIPv6('127.0.0.0')    AND probe_dst_prefix <= toIPv6('127.255.255.255')) OR
                (probe_dst_prefix >= toIPv6('172.16.0.0')   AND probe_dst_prefix <= toIPv6('172.31.255.255'))  OR
                (probe_dst_prefix >= toIPv6('192.0.0.0')    AND probe_dst_prefix <= toIPv6('192.0.0.255'))     OR
                (probe_dst_prefix >= toIPv6('192.0.2.0')    AND probe_dst_prefix <= toIPv6('192.0.2.255'))     OR
                (probe_dst_prefix >= toIPv6('192.88.99.0')  AND probe_dst_prefix <= toIPv6('192.88.99.255'))   OR
                (probe_dst_prefix >= toIPv6('192.168.0.0')  AND probe_dst_prefix <= toIPv6('192.168.255.255')) OR
                (probe_dst_prefix >= toIPv6('198.18.0.0')   AND probe_dst_prefix <= toIPv6('198.19.255.255'))  OR
                (probe_dst_prefix >= toIPv6('198.51.100.0') AND probe_dst_prefix <= toIPv6('198.51.100.255'))  OR
                (probe_dst_prefix >= toIPv6('203.0.113.0')  AND probe_dst_prefix <= toIPv6('203.0.113.255'))   OR
                (probe_dst_prefix >= toIPv6('224.0.0.0')    AND probe_dst_prefix <= toIPv6('239.255.255.255')) OR
                (probe_dst_prefix >= toIPv6('233.252.0.0')  AND probe_dst_prefix <= toIPv6('233.252.0.255'))   OR
                (probe_dst_prefix >= toIPv6('240.0.0.0')    AND probe_dst_prefix <= toIPv6('255.255.255.255')) OR
                (probe_dst_prefix >= toIPv6('fd00::')       AND probe_dst_prefix <= toIPv6('fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')),
            private_reply_src_addr UInt8 MATERIALIZED
                (reply_src_addr >= toIPv6('0.0.0.0')        AND reply_src_addr <= toIPv6('0.255.255.255'))     OR
                (reply_src_addr >= toIPv6('10.0.0.0')       AND reply_src_addr <= toIPv6('10.255.255.255'))    OR
                (reply_src_addr >= toIPv6('100.64.0.0')     AND reply_src_addr <= toIPv6('100.127.255.255'))   OR
                (reply_src_addr >= toIPv6('127.0.0.0')      AND reply_src_addr <= toIPv6('127.255.255.255'))   OR
                (reply_src_addr >= toIPv6('172.16.0.0')     AND reply_src_addr <= toIPv6('172.31.255.255'))    OR
                (reply_src_addr >= toIPv6('192.0.0.0')      AND reply_src_addr <= toIPv6('192.0.0.255'))       OR
                (reply_src_addr >= toIPv6('192.0.2.0')      AND reply_src_addr <= toIPv6('192.0.2.255'))       OR
                (reply_src_addr >= toIPv6('192.88.99.0')    AND reply_src_addr <= toIPv6('192.88.99.255'))     OR
                (reply_src_addr >= toIPv6('192.168.0.0')    AND reply_src_addr <= toIPv6('192.168.255.255'))   OR
                (reply_src_addr >= toIPv6('198.18.0.0')     AND reply_src_addr <= toIPv6('198.19.255.255'))    OR
                (reply_src_addr >= toIPv6('198.51.100.0')   AND reply_src_addr <= toIPv6('198.51.100.255'))    OR
                (reply_src_addr >= toIPv6('203.0.113.0')    AND reply_src_addr <= toIPv6('203.0.113.255'))     OR
                (reply_src_addr >= toIPv6('224.0.0.0')      AND reply_src_addr <= toIPv6('239.255.255.255'))   OR
                (reply_src_addr >= toIPv6('233.252.0.0')    AND reply_src_addr <= toIPv6('233.252.0.255'))     OR
                (reply_src_addr >= toIPv6('240.0.0.0')      AND reply_src_addr <= toIPv6('255.255.255.255'))   OR
                (reply_src_addr >= toIPv6('fd00::')         AND reply_src_addr <= toIPv6('fdff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')),
            destination_host_reply   UInt8 MATERIALIZED probe_dst_addr = reply_src_addr,
            destination_prefix_reply UInt8 MATERIALIZED probe_dst_prefix = reply_src_prefix,
            -- ICMP: protocol 1, UDP: protocol 17, ICMPv6: protocol 58
            valid_probe_protocol   UInt8 MATERIALIZED probe_protocol IN [1, 17, 58],
            time_exceeded_reply    UInt8 MATERIALIZED (reply_protocol = 1 AND reply_icmp_type = 11) OR (reply_protocol = 58 AND reply_icmp_type = 3)
        )
        ENGINE MergeTree
        ORDER BY ({self.SORTING_KEY})
        TTL {date_time(self.storage_policy.archive_on)} TO VOLUME '{self.storage_policy.archive_to}'
        SETTINGS storage_policy = '{self.storage_policy.name}'
        """

SORTING_KEY = 'probe_protocol, probe_src_addr, probe_dst_prefix, probe_dst_addr, probe_src_port, probe_dst_port, probe_ttl' class-attribute instance-attribute

Columns by which the data is ordered.

prefix_len_v4: int = DEFAULT_PREFIX_LEN_V4 class-attribute instance-attribute

The prefix length used to compute the IPv4 prefix of an IP address.

prefix_len_v6: int = DEFAULT_PREFIX_LEN_V6 class-attribute instance-attribute

The prefix length used to compute the IPv6 prefix of an IP address.

storage_policy: StoragePolicy = StoragePolicy() class-attribute instance-attribute

ClickHouse storage policy to use.

CreateTables dataclass

Bases: Query

Create the tables necessary for a measurement.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import CreateTables
>>> CreateTables().execute(client, "test")
[]
Source code in diamond_miner/queries/create_tables.py
@dataclass(frozen=True)
class CreateTables(Query):
    """
    Create the tables necessary for a measurement.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import CreateTables
        >>> CreateTables().execute(client, "test")
        []
    """

    prefix_len_v4: int = DEFAULT_PREFIX_LEN_V4
    "The prefix length used to compute the IPv4 prefix of an IP address."

    prefix_len_v6: int = DEFAULT_PREFIX_LEN_V6
    "The prefix length used to compute the IPv6 prefix of an IP address."

    storage_policy: StoragePolicy = StoragePolicy()
    "ClickHouse storage policy to use."

    def statements(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> Sequence[str]:
        all_params = {field.name: getattr(self, field.name) for field in fields(self)}
        # Only CreateResultsTable accepts these parameters.
        params = {
            x: y
            for x, y in all_params.items()
            if x not in ["prefix_len_v4", "prefix_len_v6"]
        }
        return (
            *CreateResultsTable(**all_params).statements(measurement_id, subset),
            *CreateLinksTable(**params).statements(measurement_id, subset),
            *CreatePrefixesTable(**params).statements(measurement_id, subset),
            *CreateProbesTable(**params).statements(measurement_id, subset),
        )

prefix_len_v4: int = DEFAULT_PREFIX_LEN_V4 class-attribute instance-attribute

The prefix length used to compute the IPv4 prefix of an IP address.

prefix_len_v6: int = DEFAULT_PREFIX_LEN_V6 class-attribute instance-attribute

The prefix length used to compute the IPv6 prefix of an IP address.

storage_policy: StoragePolicy = StoragePolicy() class-attribute instance-attribute

ClickHouse storage policy to use.

DropTables dataclass

Bases: Query

Drop the tables associated with a measurement.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import DropTables
>>> DropTables().execute(client, "test")
[]
Source code in diamond_miner/queries/drop_tables.py
@dataclass(frozen=True)
class DropTables(Query):
    """
    Drop the tables associated with a measurement.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import DropTables
        >>> DropTables().execute(client, "test")
        []
    """

    def statements(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> Sequence[str]:
        return (
            f"DROP TABLE IF EXISTS {results_table(measurement_id)}",
            f"DROP TABLE IF EXISTS {links_table(measurement_id)}",
            f"DROP TABLE IF EXISTS {prefixes_table(measurement_id)}",
            f"DROP TABLE IF EXISTS {probes_table(measurement_id)}",
        )

GetInvalidPrefixes dataclass

Bases: PrefixesQuery

Return the prefixes with unexpected behavior (see GetPrefixesWithAmplification and GetPrefixesWithLoops).

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetInvalidPrefixes
>>> rows = GetInvalidPrefixes().execute(client, "test_invalid_prefixes")
>>> [x["probe_dst_prefix"] for x in rows]
['::ffff:201.0.0.0', '::ffff:202.0.0.0']
Source code in diamond_miner/queries/get_invalid_prefixes.py
@dataclass(frozen=True)
class GetInvalidPrefixes(PrefixesQuery):
    """
    Return the prefixes with unexpected behavior
    (see `GetPrefixesWithAmplification` and `GetPrefixesWithLoops`).

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetInvalidPrefixes
        >>> rows = GetInvalidPrefixes().execute(client, "test_invalid_prefixes")
        >>> [x["probe_dst_prefix"] for x in rows]
        ['::ffff:201.0.0.0', '::ffff:202.0.0.0']
    """

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        return f"""
        SELECT probe_dst_prefix
        FROM {prefixes_table(measurement_id)}
        WHERE {self.filters(subset)} AND (has_amplification OR has_loops)
        """

GetLinks dataclass

Bases: LinksQuery

Return the links pre-computed in the links table.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetLinks
>>> links = GetLinks(filter_invalid_prefixes=False).execute(client, 'test_invalid_prefixes')
>>> len(links)
3
>>> links = GetLinks(filter_invalid_prefixes=True).execute(client, 'test_invalid_prefixes')
>>> len(links)
1
>>> links = GetLinks(include_metadata=False).execute(client, 'test_nsdi_example')
>>> len(links)
8
>>> links = GetLinks(include_metadata=True).execute(client, 'test_nsdi_example')
>>> len(links)
8
>>> links = GetLinks(near_or_far_addr="150.0.6.1").execute(client, 'test_nsdi_example')
>>> len(links)
3
Source code in diamond_miner/queries/get_links.py
@dataclass(frozen=True)
class GetLinks(LinksQuery):
    """
    Return the links pre-computed in the links table.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetLinks
        >>> links = GetLinks(filter_invalid_prefixes=False).execute(client, 'test_invalid_prefixes')
        >>> len(links)
        3
        >>> links = GetLinks(filter_invalid_prefixes=True).execute(client, 'test_invalid_prefixes')
        >>> len(links)
        1
        >>> links = GetLinks(include_metadata=False).execute(client, 'test_nsdi_example')
        >>> len(links)
        8
        >>> links = GetLinks(include_metadata=True).execute(client, 'test_nsdi_example')
        >>> len(links)
        8
        >>> links = GetLinks(near_or_far_addr="150.0.6.1").execute(client, 'test_nsdi_example')
        >>> len(links)
        3
    """

    filter_invalid_prefixes: bool = False
    "If true, exclude links from prefixes with amplification or loops."

    include_metadata: bool = False
    "If true, include the TTLs at which `near_addr` and `far_addr` were seen."

    def columns(self) -> list[str]:
        columns = ["near_addr", "far_addr"]
        if self.include_metadata:
            columns = ["near_ttl", "far_ttl", *columns]
        return columns

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        if self.filter_invalid_prefixes:
            invalid_prefixes_query = GetInvalidPrefixes(
                **common_parameters(self, GetInvalidPrefixes)
            )
            prefix_filter = f"""
            probe_dst_prefix NOT IN ({invalid_prefixes_query.statement(measurement_id, subset)})
            """
        else:
            prefix_filter = "1"
        return f"""
        SELECT DISTINCT {','.join(self.columns())}
        FROM {links_table(measurement_id)}
        WHERE {self.filters(subset)} AND {prefix_filter}
        """

filter_invalid_prefixes: bool = False class-attribute instance-attribute

If true, exclude links from prefixes with amplification or loops.

include_metadata: bool = False class-attribute instance-attribute

If true, include the TTLs at which near_addr and far_addr were seen.

GetLinksFromResults dataclass

Bases: ResultsQuery

Compute the links from the results table. This returns one line per (flow, link) pair.

We do not emit a link in the case of a single reply in a traceroute. For example, * * node * * does not generate a link. However, * * node * * node' will generate (node, *) and (*, node').

We emit cross-round links. For example, if flow N sees node A at TTL 10 at round 1 and node B at TTL 11 at round 2, we will emit (1, 10, A) - (2, 11, B).

We assume that there exists a single reply per (flow, TTL) pair over all rounds (TODO: assert this).

If round_eq is None, compute the links per flow, across all rounds. Otherwise, compute the links per flow, for the specified round. This is useful if you want to update a links table round-by-round: such a table will contain only intra-round links but can be updated incrementally.
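
The per-flow construction can be sketched in plain Python (an illustration of the logic, not part of the library; None stands for '::', i.e. a missing reply, and the (None, None) entry is a virtual link):

>>> def links_for_flow(replies):
...     # replies maps TTL -> reply address for a single flow
...     first_ttl, last_ttl = min(replies), max(replies)
...     return [(replies.get(t), replies.get(t + 1)) for t in range(first_ttl, last_ttl)]
>>> links_for_flow({3: "node"})  # * * node * *
[]
>>> links_for_flow({3: "node", 6: "node2"})  # * * node * * node2
[('node', None), (None, None), (None, 'node2')]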

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetLinksFromResults
>>> links = GetLinksFromResults().execute(client, "test_nsdi_example")
>>> len(links)
58
Source code in diamond_miner/queries/get_links_from_results.py
@dataclass(frozen=True)
class GetLinksFromResults(ResultsQuery):
    """
    Compute the links from the results table.
    This returns one line per `(flow, link)` pair.

    We do not emit a link in the case of a single reply in a traceroute.
    For example, `* * node * *` does not generate a link.
    However, `* * node * * node'` will generate `(node, *)` and `(*, node')`.

    We emit cross-round links.
    For example, if flow N sees node A at TTL 10 at round 1 and node B at TTL 11 at round 2,
    we will emit `(1, 10, A) - (2, 11, B)`.

    We assume that there exists a single reply per (flow, TTL) pair over all rounds (TODO: assert this).

    If `round_eq` is None, compute the links per flow, across all rounds.
    Otherwise, compute the links per flow, for the specified round.
    This is useful if you want to update a `links` table round-by-round:
    such a table will contain only intra-round links but can be updated incrementally.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetLinksFromResults
        >>> links = GetLinksFromResults().execute(client, "test_nsdi_example")
        >>> len(links)
        58
    """

    ignore_invalid_prefixes: bool = True
    "If true, exclude invalid prefixes from links computation."

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        if self.ignore_invalid_prefixes:
            invalid_filter = f"""
            AND (probe_protocol, probe_src_addr, probe_dst_prefix)
            NOT IN (
                SELECT probe_protocol, probe_src_addr, probe_dst_prefix
                FROM {prefixes_table(measurement_id)}
                WHERE {ip_in('probe_dst_prefix', subset)}
                AND has_amplification
            )
            """
            # TODO: We currently do not drop prefixes with loops as this considerably
            # reduces the number of discoveries. As such, we send more probes than necessary.
            # A better way would be to detect the min/max TTL of a loop, and to ignore it
            # in the next round query.
        else:
            invalid_filter = ""

        return f"""
        WITH
            groupUniqArray((round, probe_ttl, reply_src_addr)) AS traceroute,
            arrayMap(x -> x.2, traceroute) AS ttls,
            arrayMap(x -> (x.1, x.3), traceroute) AS val,
            CAST((ttls, val), 'Map(UInt8, Tuple(UInt8, IPv6))') AS map,
            arrayMin(ttls) AS first_ttl,
            arrayMax(ttls) AS last_ttl,
            arrayMap(i -> (toUInt8(i), toUInt8(i + 1), map[toUInt8(i)], map[toUInt8(i + 1)]), range(first_ttl, last_ttl)) AS links,
            arrayJoin(links) AS link
        SELECT
            probe_protocol,
            probe_src_addr,
            probe_dst_prefix,
            probe_dst_addr,
            probe_src_port,
            probe_dst_port,
            -- Set the round number for partial links:
            -- The link (1, 10, A) -> (null, 11, *) becomes
            --          (1, 10, A) -> (1,    11, *)
            if(link.3.1 != 0, link.3.1, link.4.1) AS near_round,
            if(link.4.1 != 0, link.4.1, link.3.1) AS far_round,
            link.1 AS near_ttl,
            link.2 AS far_ttl,
            link.3.2 AS near_addr,
            link.4.2 AS far_addr
        FROM {results_table(measurement_id)}
        WHERE {self.filters(subset)}
        {invalid_filter}
        GROUP BY (
            probe_protocol,
            probe_src_addr,
            probe_dst_prefix,
            probe_dst_addr,
            probe_src_port,
            probe_dst_port
        )
        """

ignore_invalid_prefixes: bool = True class-attribute instance-attribute

If true, exclude invalid prefixes from links computation.

GetMDAProbes dataclass

Bases: LinksQuery

Return the number of probes to send per prefix and per TTL according to the Diamond-Miner algorithm.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetMDAProbes
>>> GetMDAProbes(round_leq=1).execute(client, "test_nsdi_lite")
[{'probe_protocol': 1, 'probe_dst_prefix': '::ffff:200.0.0.0', 'cumulative_probes': [12, 12, 12, 12], 'TTLs': [1, 2, 3, 4]}]
Source code in diamond_miner/queries/get_mda_probes.py
@dataclass(frozen=True)
class GetMDAProbes(LinksQuery):
    """
    Return the number of probes to send per prefix and per TTL according to the Diamond-Miner algorithm.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetMDAProbes
        >>> GetMDAProbes(round_leq=1).execute(client, "test_nsdi_lite")
        [{'probe_protocol': 1, 'probe_dst_prefix': '::ffff:200.0.0.0', 'cumulative_probes': [12, 12, 12, 12], 'TTLs': [1, 2, 3, 4]}]
    """

    adaptive_eps: bool = True

    dminer_lite: bool = True
    "If true, use an heuristic that requires less probes to handle nested load-balancers."

    target_epsilon: float = DEFAULT_FAILURE_RATE
    """
    The desired failure rate of the MDA algorithm, that is, the probability of not detecting
    all the outgoing edges of a load-balancer for a given prefix and TTL.
    """

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        if self.adaptive_eps:
            eps_fragment = """
            arrayMax(links_per_ttl) AS max_links,
            if(max_links == 0, target_epsilon, 1 - exp(log(1 - target_epsilon) / max_links))
                AS epsilon,
            """
        else:
            eps_fragment = """
            target_epsilon AS epsilon,
            """

        if self.dminer_lite:
            dm_fragment = """
            arrayMap(k -> toUInt32(ceil(ln(epsilon / (k + 1)) / ln((k + 1 - 1) / (k + 1)))), links_per_ttl) AS mda_flows,
            """
        else:
            # TODO: Implement by computing Dh(v)
            raise NotImplementedError

        return f"""
        WITH
            {self.target_epsilon} AS target_epsilon,
            -- 1) Compute the links
            --  x.1       x.2        x.3
            -- (near_ttl, near_addr, far_addr)
            groupUniqArray((near_ttl, near_addr, far_addr)) AS links,
            -- 2) Count the number of links per TTL
            -- extract only the TTLs; this greatly speeds up arrayCount
            arrayMap(x -> x.1, links) AS links_ttls,
            -- find the min/max TTLs
            -- we add +2 since range() is exclusive and we compute the max over the *near* TTL
            range(arrayMin(links_ttls), arrayMax(links_ttls) + 2) AS TTLs,
            -- count distinct links per TTL
            arrayMap(t -> countEqual(links_ttls, t), TTLs) AS links_per_ttl,
            -- 3) Compute MDA stopping points
            {eps_fragment}
            -- 4) Compute the number of probes to send during the next round
            {dm_fragment}
            -- compute the number of probes to send during the next round
            -- => max of probes to send over TTL t and t-1
            arrayMap(i -> arrayMax([mda_flows[i], mda_flows[i - 1]]), arrayEnumerate(TTLs)) AS cumulative_probes
            -- TODO: Cleanup/optimize/rewrite/... below
            -- do not send probes to TTLs where no replies have been received
            -- it is unlikely that we will discover more at this TTL if the first 6 flows have seen nothing
            -- (see GetNextRoundStateless)
        SELECT
            probe_protocol,
            probe_dst_prefix,
            cumulative_probes,
            TTLs
        FROM {links_table(measurement_id)} AS links_table
        WHERE {self.filters(subset)}
        GROUP BY (probe_protocol, probe_src_addr, probe_dst_prefix)
        """

dminer_lite: bool = True class-attribute instance-attribute

If true, use a heuristic that requires fewer probes to handle nested load-balancers.

target_epsilon: float = DEFAULT_FAILURE_RATE class-attribute instance-attribute

The desired failure rate of the MDA algorithm, that is, the probability of not detecting all the outgoing edges of a load-balancer for a given prefix and TTL.
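
For reference, the stopping rule in the SQL fragment above (the mda_flows expression) can be reproduced in a few lines of Python. This is an illustrative sketch, not library code; it uses epsilon = 0.05 for the example values, since the actual DEFAULT_FAILURE_RATE is defined elsewhere:

>>> from math import ceil, log
>>> def mda_flows(k, epsilon=0.05):
...     # Flows needed to detect more than k outgoing edges with failure
...     # probability epsilon: ceil(ln(eps / (k + 1)) / ln(k / (k + 1))).
...     # The k == 0 case mirrors the SQL, where ln(0) = -inf yields 0 probes.
...     return 0 if k == 0 else ceil(log(epsilon / (k + 1)) / log(k / (k + 1)))
>>> [mda_flows(k) for k in range(1, 4)]
[6, 11, 16]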

GetNodes dataclass

Bases: ResultsQuery

Return all the discovered nodes.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetNodes
>>> nodes = GetNodes(include_probe_ttl=True).execute(client, 'test_nsdi_example')
>>> sorted((node["probe_ttl"], node["reply_src_addr"]) for node in nodes)
[(1, '::ffff:150.0.1.1'), (2, '::ffff:150.0.2.1'), (2, '::ffff:150.0.3.1'), (3, '::ffff:150.0.4.1'), (3, '::ffff:150.0.5.1'), (3, '::ffff:150.0.7.1'), (4, '::ffff:150.0.6.1')]
>>> nodes = GetNodes(filter_invalid_prefixes=False).execute(client, 'test_invalid_prefixes')
>>> sorted(node["reply_src_addr"] for node in nodes)
['::ffff:150.0.0.1', '::ffff:150.0.0.2', '::ffff:150.0.1.1', '::ffff:150.0.1.2', '::ffff:150.0.2.1', '::ffff:150.0.2.2', '::ffff:150.0.2.3']
>>> nodes = GetNodes(filter_invalid_prefixes=True).execute(client, 'test_invalid_prefixes')
>>> sorted(node["reply_src_addr"] for node in nodes)
['::ffff:150.0.0.1', '::ffff:150.0.0.2', '::ffff:150.0.2.3']
Source code in diamond_miner/queries/get_nodes.py
@dataclass(frozen=True)
class GetNodes(ResultsQuery):
    """
    Return all the discovered nodes.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetNodes
        >>> nodes = GetNodes(include_probe_ttl=True).execute(client, 'test_nsdi_example')
        >>> sorted((node["probe_ttl"], node["reply_src_addr"]) for node in nodes)
        [(1, '::ffff:150.0.1.1'), (2, '::ffff:150.0.2.1'), (2, '::ffff:150.0.3.1'), (3, '::ffff:150.0.4.1'), (3, '::ffff:150.0.5.1'), (3, '::ffff:150.0.7.1'), (4, '::ffff:150.0.6.1')]
        >>> nodes = GetNodes(filter_invalid_prefixes=False).execute(client, 'test_invalid_prefixes')
        >>> sorted(node["reply_src_addr"] for node in nodes)
        ['::ffff:150.0.0.1', '::ffff:150.0.0.2', '::ffff:150.0.1.1', '::ffff:150.0.1.2', '::ffff:150.0.2.1', '::ffff:150.0.2.2', '::ffff:150.0.2.3']
        >>> nodes = GetNodes(filter_invalid_prefixes=True).execute(client, 'test_invalid_prefixes')
        >>> sorted(node["reply_src_addr"] for node in nodes)
        ['::ffff:150.0.0.1', '::ffff:150.0.0.2', '::ffff:150.0.2.3']
    """

    filter_invalid_prefixes: bool = False
    "If true, exclude nodes from prefixes with amplification or loops."

    include_probe_ttl: bool = False
    "If true, include the TTL at which `reply_src_addr` was seen."

    def columns(self) -> list[str]:
        columns = ["reply_src_addr"]
        if self.include_probe_ttl:
            columns.insert(0, "probe_ttl")
        return columns

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        if self.filter_invalid_prefixes:
            invalid_prefixes_query = GetInvalidPrefixes(
                **common_parameters(self, GetInvalidPrefixes)
            )
            prefix_filter = f"""
                    probe_dst_prefix NOT IN ({invalid_prefixes_query.statement(measurement_id, subset)})
                    """
        else:
            prefix_filter = "1"
        return f"""
        SELECT DISTINCT {','.join(self.columns())}
        FROM {results_table(measurement_id)}
        WHERE {self.filters(subset)} AND {prefix_filter}
        """

filter_invalid_prefixes: bool = False class-attribute instance-attribute

If true, exclude nodes from prefixes with amplification or loops.

include_probe_ttl: bool = False class-attribute instance-attribute

If true, include the TTL at which reply_src_addr was seen.

GetPrefixes dataclass

Bases: ResultsQuery

Return the destination prefixes for which replies have been received.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetPrefixes
>>> from ipaddress import ip_network
>>> rows = GetPrefixes().execute(client, 'test_nsdi_example')
>>> len(rows)
1
>>> rows = GetPrefixes().execute(client, 'test_invalid_prefixes')
>>> len(rows)
3
>>> rows = GetPrefixes(reply_src_addr_in=ip_network("150.0.1.0/24")).execute(client, 'test_invalid_prefixes')
>>> len(rows)
1
Source code in diamond_miner/queries/get_prefixes.py
@dataclass(frozen=True)
class GetPrefixes(ResultsQuery):
    """
    Return the destination prefixes for which replies have been received.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetPrefixes
        >>> from ipaddress import ip_network
        >>> rows = GetPrefixes().execute(client, 'test_nsdi_example')
        >>> len(rows)
        1
        >>> rows = GetPrefixes().execute(client, 'test_invalid_prefixes')
        >>> len(rows)
        3
        >>> rows = GetPrefixes(reply_src_addr_in=ip_network("150.0.1.0/24")).execute(client, 'test_invalid_prefixes')
        >>> len(rows)
        1
    """

    reply_src_addr_in: IPNetwork | None = None
    "If specified, keep only the replies from this network."

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        # The prefixes table doesn't contain network information, so we
        # need to join the results table for these filters.
        join_fragment = ""
        if self.reply_src_addr_in:
            join_fragment = f"""
            INNER JOIN (
                SELECT DISTINCT probe_protocol, probe_src_addr, probe_dst_prefix
                FROM {results_table(measurement_id)}
                WHERE {self.filters(subset)} AND {ip_in('reply_src_addr', self.reply_src_addr_in)}
            ) AS results
            ON  prefixes.probe_protocol   = results.probe_protocol
            AND prefixes.probe_src_addr   = results.probe_src_addr
            AND prefixes.probe_dst_prefix = results.probe_dst_prefix
            """
        return f"""
        SELECT probe_dst_prefix, has_amplification, has_loops
        FROM {prefixes_table(measurement_id)} AS prefixes
        {join_fragment}
        ORDER BY {CreatePrefixesTable.SORTING_KEY}
        """

reply_src_addr_in: IPNetwork | None = None class-attribute instance-attribute

If specified, keep only the replies from this network.

GetPrefixesWithAmplification dataclass

Bases: ResultsQuery

Return the prefixes for which we have more than one reply per (flow ID, TTL).

Important

This query assumes that a single probe is sent per (flow ID, TTL) pair.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetPrefixesWithAmplification
>>> rows = GetPrefixesWithAmplification().execute(client, "test_invalid_prefixes")
>>> [x["probe_dst_prefix"] for x in rows]
['::ffff:202.0.0.0']
Source code in diamond_miner/queries/get_invalid_prefixes.py
@dataclass(frozen=True)
class GetPrefixesWithAmplification(ResultsQuery):
    """
    Return the prefixes for which we have more than one reply per (flow ID, TTL).

    Important:
        This query assumes that a single probe is sent per (flow ID, TTL) pair.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetPrefixesWithAmplification
        >>> rows = GetPrefixesWithAmplification().execute(client, "test_invalid_prefixes")
        >>> [x["probe_dst_prefix"] for x in rows]
        ['::ffff:202.0.0.0']
    """

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        return f"""
        SELECT DISTINCT
            probe_protocol,
            probe_src_addr,
            probe_dst_prefix,
            -- This column is to simplify the InsertPrefixes query.
            1 AS has_amplification
        FROM {results_table(measurement_id)}
        WHERE {self.filters(subset)}
        GROUP BY (
            probe_protocol,
            probe_src_addr,
            probe_dst_prefix,
            probe_dst_addr,
            probe_src_port,
            probe_dst_port,
            probe_ttl
        )
        HAVING count() > 1
        """

GetPrefixesWithLoops dataclass

Bases: ResultsQuery

Return the prefixes for which an IP address appears multiple times for a single flow ID.

Important

Prefixes with amplification (multiple replies per probe) may trigger a false positive for this query, since we do not check that the IP appears at two different TTLs.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetPrefixesWithLoops
>>> GetPrefixesWithLoops().execute(client, "test_invalid_prefixes")
[{'probe_protocol': 1, 'probe_src_addr': '::ffff:100.0.0.1', 'probe_dst_prefix': '::ffff:201.0.0.0', 'has_loops': 1}]
Source code in diamond_miner/queries/get_invalid_prefixes.py
@dataclass(frozen=True)
class GetPrefixesWithLoops(ResultsQuery):
    """
    Return the prefixes for which an IP address appears multiple times for a single flow ID.

    Important:
        Prefixes with amplification (multiple replies per probe) may trigger a false positive
        for this query, since we do not check that the IP appears at two *different* TTLs.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetPrefixesWithLoops
        >>> GetPrefixesWithLoops().execute(client, "test_invalid_prefixes")
        [{'probe_protocol': 1, 'probe_src_addr': '::ffff:100.0.0.1', 'probe_dst_prefix': '::ffff:201.0.0.0', 'has_loops': 1}]
    """

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        return f"""
        SELECT DISTINCT
            probe_protocol,
            probe_src_addr,
            probe_dst_prefix,
            -- This column is to simplify the InsertPrefixes query.
            1 AS has_loops
        FROM {results_table(measurement_id)}
        WHERE {self.filters(subset)}
        GROUP BY (
            probe_protocol,
            probe_src_addr,
            probe_dst_prefix,
            probe_dst_addr,
            probe_src_port,
            probe_dst_port
        )
        HAVING uniqExact(reply_src_addr) < count()
        """

GetProbes

Bases: ProbesQuery

Return the cumulative number of probes sent at a specified round.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetProbes
>>> row = GetProbes(round_eq=1).execute(client, 'test_nsdi_example')[0]
>>> row["probe_protocol"]
1
>>> row["probe_dst_prefix"]
'::ffff:200.0.0.0'
>>> sorted(row["probes_per_ttl"])
[[1, 6], [2, 6], [3, 6], [4, 6]]
>>> row = GetProbes(round_eq=2).execute(client, 'test_nsdi_example')[0]
>>> sorted(row["probes_per_ttl"])
[[1, 11], [2, 18], [3, 18], [4, 18]]
>>> row = GetProbes(round_eq=3).execute(client, 'test_nsdi_example')[0]
>>> sorted(row["probes_per_ttl"])
[[1, 11], [2, 20], [3, 27], [4, 27]]
>>> row = GetProbes(round_eq=3, probe_ttl_geq=2, probe_ttl_leq=3).execute(client, 'test_nsdi_example')[0]
>>> sorted(row["probes_per_ttl"])
[[2, 20], [3, 27]]
>>> GetProbes(round_eq=4).execute(client, 'test_nsdi_example')
[]
Source code in diamond_miner/queries/get_probes.py
class GetProbes(ProbesQuery):
    """
    Return the cumulative number of probes sent at a specified round.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetProbes
        >>> row = GetProbes(round_eq=1).execute(client, 'test_nsdi_example')[0]
        >>> row["probe_protocol"]
        1
        >>> row["probe_dst_prefix"]
        '::ffff:200.0.0.0'
        >>> sorted(row["probes_per_ttl"])
        [[1, 6], [2, 6], [3, 6], [4, 6]]
        >>> row = GetProbes(round_eq=2).execute(client, 'test_nsdi_example')[0]
        >>> sorted(row["probes_per_ttl"])
        [[1, 11], [2, 18], [3, 18], [4, 18]]
        >>> row = GetProbes(round_eq=3).execute(client, 'test_nsdi_example')[0]
        >>> sorted(row["probes_per_ttl"])
        [[1, 11], [2, 20], [3, 27], [4, 27]]
        >>> row = GetProbes(round_eq=3, probe_ttl_geq=2, probe_ttl_leq=3).execute(client, 'test_nsdi_example')[0]
        >>> sorted(row["probes_per_ttl"])
        [[2, 20], [3, 27]]
        >>> GetProbes(round_eq=4).execute(client, 'test_nsdi_example')
        []
    """

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        assert self.round_eq, "`round_eq` must be specified."
        return f"""
        SELECT
            probe_protocol,
            probe_dst_prefix,
            groupArray((probe_ttl, cumulative_probes)) AS probes_per_ttl
        FROM {probes_table(measurement_id)}
        WHERE {self.filters(subset)}
        GROUP BY (probe_protocol, probe_dst_prefix)
        """

GetProbesDiff

Bases: ProbesQuery

Return the number of probes sent at a specific round and at the previous round.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetProbesDiff
>>> row = GetProbesDiff(round_eq=1).execute(client, 'test_nsdi_example')[0]
>>> row["probe_protocol"]
1
>>> row["probe_dst_prefix"]
'::ffff:200.0.0.0'
>>> sorted(row["probes_per_ttl"])
[[1, 6, 0], [2, 6, 0], [3, 6, 0], [4, 6, 0]]
>>> row = GetProbesDiff(round_eq=2).execute(client, 'test_nsdi_example')[0]
>>> sorted(row["probes_per_ttl"])
[[1, 11, 6], [2, 18, 6], [3, 18, 6], [4, 18, 6]]
>>> row = GetProbesDiff(round_eq=3).execute(client, 'test_nsdi_example')[0]
>>> sorted(row["probes_per_ttl"])
[[1, 11, 11], [2, 20, 18], [3, 27, 18], [4, 27, 18]]
>>> GetProbesDiff(round_eq=4).execute(client, 'test_nsdi_example')
[]
Source code in diamond_miner/queries/get_probes.py
class GetProbesDiff(ProbesQuery):
    """
    Return the number of probes sent at a specific round and at the previous round.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetProbesDiff
        >>> row = GetProbesDiff(round_eq=1).execute(client, 'test_nsdi_example')[0]
        >>> row["probe_protocol"]
        1
        >>> row["probe_dst_prefix"]
        '::ffff:200.0.0.0'
        >>> sorted(row["probes_per_ttl"])
        [[1, 6, 0], [2, 6, 0], [3, 6, 0], [4, 6, 0]]
        >>> row = GetProbesDiff(round_eq=2).execute(client, 'test_nsdi_example')[0]
        >>> sorted(row["probes_per_ttl"])
        [[1, 11, 6], [2, 18, 6], [3, 18, 6], [4, 18, 6]]
        >>> row = GetProbesDiff(round_eq=3).execute(client, 'test_nsdi_example')[0]
        >>> sorted(row["probes_per_ttl"])
        [[1, 11, 11], [2, 20, 18], [3, 27, 18], [4, 27, 18]]
        >>> GetProbesDiff(round_eq=4).execute(client, 'test_nsdi_example')
        []
    """

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        assert self.round_eq
        return f"""
        SELECT
            current.probe_protocol,
            current.probe_dst_prefix,
            groupArray((current.probe_ttl, current.cumulative_probes, previous.cumulative_probes)) AS probes_per_ttl
        FROM {probes_table(measurement_id)} AS current
        LEFT JOIN (
            SELECT
                probe_protocol,
                probe_dst_prefix,
                probe_ttl,
                cumulative_probes
            FROM {probes_table(measurement_id)}
            WHERE {ip_in("probe_dst_prefix", subset)} AND round = {self.round_eq - 1}
        ) AS previous
        ON current.probe_protocol = previous.probe_protocol
        AND current.probe_dst_prefix = previous.probe_dst_prefix
        AND current.probe_ttl = previous.probe_ttl
        WHERE {self.filters(subset)}
        GROUP BY (current.probe_protocol, current.probe_dst_prefix)
        """

GetResults dataclass

Bases: ResultsQuery

Return all the columns from the results table.

Examples:

>>> from diamond_miner.test import client
>>> from diamond_miner.queries import GetResults
>>> rows = GetResults().execute(client, 'test_nsdi_example')
>>> len(rows)
85
Source code in diamond_miner/queries/get_results.py
@dataclass(frozen=True)
class GetResults(ResultsQuery):
    """
    Return all the columns from the results table.

    Examples:
        >>> from diamond_miner.test import client
        >>> from diamond_miner.queries import GetResults
        >>> rows = GetResults().execute(client, 'test_nsdi_example')
        >>> len(rows)
        85
    """

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        return f"""
        SELECT *
        FROM {results_table(measurement_id)}
        WHERE {self.filters(subset)}
        """

GetSlidingPrefixes dataclass

Bases: ResultsQuery

Get the prefixes to probe for a given sliding window.

Source code in diamond_miner/queries/get_sliding_prefixes.py
@dataclass(frozen=True)
class GetSlidingPrefixes(ResultsQuery):
    """
    Get the prefixes to probe for a given sliding window.
    """

    stopping_condition: int = 0
    "Number of stars to allow."

    window_max_ttl: int = 0
    "Set to 0 to return every prefix."

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        assert self.filter_destination_host and self.time_exceeded_only
        return f"""
        SELECT
            probe_protocol,
            probe_src_addr,
            probe_dst_prefix
        FROM {results_table(measurement_id)}
        WHERE {self.filters(subset)}
        AND probe_ttl >= {self.window_max_ttl - self.stopping_condition}
        GROUP BY (
            probe_protocol,
            probe_src_addr,
            probe_dst_prefix
        )
        """

stopping_condition: int = 0 class-attribute instance-attribute

Number of stars to allow.

window_max_ttl: int = 0 class-attribute instance-attribute

Set to 0 to return every prefix.
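
No doctest accompanies this query; here is a minimal sketch that only renders the SQL. It assumes filter_destination_host and time_exceeded_only are fields inherited from ResultsQuery (the assertion in statement requires both), and my_measurement is a hypothetical measurement ID:

>>> from diamond_miner.queries import GetSlidingPrefixes
>>> query = GetSlidingPrefixes(
...     filter_destination_host=True,
...     time_exceeded_only=True,
...     window_max_ttl=10,
...     stopping_condition=3,
... )
>>> sql = query.statement("my_measurement")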

InsertLinks dataclass

Bases: ResultsQuery

Insert the results of the GetLinksFromResults query into the links table.

Source code in diamond_miner/queries/insert_links.py
@dataclass(frozen=True)
class InsertLinks(ResultsQuery):
    """
    Insert the results of the `GetLinksFromResults` query into the links table.
    """

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        links_query = GetLinksFromResults(**asdict(self)).statement(
            measurement_id, subset
        )
        return f"""
        INSERT INTO {links_table(measurement_id)}
        SELECT * FROM ({links_query})
        """

InsertMDAProbes dataclass

Bases: GetMDAProbes

Insert the results of the GetMDAProbes query into the probes table.

Source code in diamond_miner/queries/insert_mda_probes.py
@dataclass(frozen=True)
class InsertMDAProbes(GetMDAProbes):
    """
    Insert the results of the `GetMDAProbes` query
    into the probes table.
    """

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        assert self.round_leq
        return f"""
        INSERT INTO {probes_table(measurement_id)}
        WITH
            arrayJoin(arrayZip(TTLs, cumulative_probes)) AS ttl_probe
        SELECT
            probe_protocol,
            probe_dst_prefix,
            ttl_probe.1,
            ttl_probe.2,
            {self.round_leq + 1}
        FROM ({super().statement(measurement_id, subset)})
        """

InsertPrefixes dataclass

Bases: ResultsQuery

Insert the results of the GetPrefixesWithAmplification and GetPrefixesWithLoops queries into the prefixes table.

Source code in diamond_miner/queries/insert_prefixes.py
@dataclass(frozen=True)
class InsertPrefixes(ResultsQuery):
    """
    Insert the results of the `GetPrefixesWithAmplification` and `GetPrefixesWithLoops` queries into the prefixes table.
    """

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        amplification_query = GetPrefixesWithAmplification(**asdict(self)).statement(
            measurement_id, subset
        )
        loops_query = GetPrefixesWithLoops(**asdict(self)).statement(
            measurement_id, subset
        )
        return f"""
        INSERT INTO {prefixes_table(measurement_id)}
        SELECT
            prefixes.probe_protocol,
            prefixes.probe_src_addr,
            prefixes.probe_dst_prefix,
            amplification.has_amplification,
            loops.has_loops
        FROM (
            SELECT DISTINCT probe_protocol, probe_src_addr, probe_dst_prefix
            FROM {results_table(measurement_id)}
            WHERE {self.filters(subset)}
        ) AS prefixes
        FULL OUTER JOIN ({amplification_query}) AS amplification
        ON  prefixes.probe_protocol   = amplification.probe_protocol
        AND prefixes.probe_src_addr   = amplification.probe_src_addr
        AND prefixes.probe_dst_prefix = amplification.probe_dst_prefix
        FULL OUTER JOIN ({loops_query}) AS loops
        ON  prefixes.probe_protocol   = loops.probe_protocol
        AND prefixes.probe_src_addr   = loops.probe_src_addr
        AND prefixes.probe_dst_prefix = loops.probe_dst_prefix
        WHERE prefixes.probe_dst_prefix != toIPv6('::')
        """

InsertResults dataclass

Bases: Query

Insert measurement results from a CSV file.

Source code in diamond_miner/queries/insert_results.py
@dataclass(frozen=True)
class InsertResults(Query):
    """
    Insert measurement results from a CSV file.
    """

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        return f"INSERT INTO {results_table(measurement_id)} FORMAT CSVWithNames"

LinksQuery dataclass

Bases: Query

Base class for queries on the links table.

Source code in diamond_miner/queries/query.py
@dataclass(frozen=True)
class LinksQuery(Query):
    """Base class for queries on the links table."""

    filter_inter_round: bool = False
    "If true, exclude links inferred across rounds."

    filter_partial: bool = False
    "If true, exclude partial links: `('::', node)` and `(node, '::')`."

    filter_virtual: bool = False
    "If true, exclude virtual links: `('::', '::')`."

    near_or_far_addr: str | None = None
    "If specified, keep only the links that contains this IP address."

    probe_protocol: int | None = None
    "If specified, keep only the links inferred from probes sent with this protocol."

    probe_src_addr: str | None = None
    """
    If specified, keep only the links inferred from probes sent by this address.
    This filter is relatively costly (IPv6 comparison on each row).
    """

    round_eq: int | None = None
    "If specified, keep only the links from this round."

    round_leq: int | None = None
    "If specified, keep only the links from this round or before."

    def filters(self, subset: IPNetwork) -> str:
        """`WHERE` clause common to all queries on the links table."""
        s = []
        if subset != UNIVERSE_SUBSET:
            s += [ip_in("probe_dst_prefix", subset)]
        if self.probe_protocol:
            s += [eq("probe_protocol", self.probe_protocol)]
        if self.probe_src_addr:
            s += [ip_eq("probe_src_addr", self.probe_src_addr)]
        if self.round_eq:
            s += [eq("near_round", self.round_eq), eq("far_round", self.round_eq)]
        if self.round_leq:
            s += [leq("near_round", self.round_leq), leq("far_round", self.round_leq)]
        if self.near_or_far_addr:
            s += [
                or_(
                    ip_eq("near_addr", self.near_or_far_addr),
                    ip_eq("far_addr", self.near_or_far_addr),
                )
            ]
        if self.filter_inter_round:
            s += [not_("is_inter_round")]
        if self.filter_partial:
            s += [not_("is_partial")]
        if self.filter_virtual:
            s += [not_("is_virtual")]
        return reduce(and_, s or ["1"])

filter_inter_round: bool = False class-attribute instance-attribute

If true, exclude links inferred across rounds.

filter_partial: bool = False class-attribute instance-attribute

If true, exclude partial links: ('::', node) and (node, '::').

filter_virtual: bool = False class-attribute instance-attribute

If true, exclude virtual links: ('::', '::').

near_or_far_addr: str | None = None class-attribute instance-attribute

If specified, keep only the links that contain this IP address.

probe_protocol: int | None = None class-attribute instance-attribute

If specified, keep only the links inferred from probes sent with this protocol.

probe_src_addr: str | None = None class-attribute instance-attribute

If specified, keep only the links inferred from probes sent by this address. This filter is relatively costly (IPv6 comparison on each row).

round_eq: int | None = None class-attribute instance-attribute

If specified, keep only the links from this round.

round_leq: int | None = None class-attribute instance-attribute

If specified, keep only the links from this round or before.

filters(subset)

WHERE clause common to all queries on the links table.

Source code in diamond_miner/queries/query.py
def filters(self, subset: IPNetwork) -> str:
    """`WHERE` clause common to all queries on the links table."""
    s = []
    if subset != UNIVERSE_SUBSET:
        s += [ip_in("probe_dst_prefix", subset)]
    if self.probe_protocol:
        s += [eq("probe_protocol", self.probe_protocol)]
    if self.probe_src_addr:
        s += [ip_eq("probe_src_addr", self.probe_src_addr)]
    if self.round_eq:
        s += [eq("near_round", self.round_eq), eq("far_round", self.round_eq)]
    if self.round_leq:
        s += [leq("near_round", self.round_leq), leq("far_round", self.round_leq)]
    if self.near_or_far_addr:
        s += [
            or_(
                ip_eq("near_addr", self.near_or_far_addr),
                ip_eq("far_addr", self.near_or_far_addr),
            )
        ]
    if self.filter_inter_round:
        s += [not_("is_inter_round")]
    if self.filter_partial:
        s += [not_("is_partial")]
    if self.filter_virtual:
        s += [not_("is_virtual")]
    return reduce(and_, s or ["1"])
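
The flags compose into a single `WHERE` clause. A sketch that inspects the generated SQL for one subset, using the standard `ipaddress` module for the `IPNetwork` argument (the prefix is hypothetical):

from ipaddress import ip_network
from diamond_miner.queries import LinksQuery
q = LinksQuery(filter_partial=True, filter_virtual=True, round_eq=1)
# Prints the conjunction of the subset, round, and link-type predicates.
print(q.filters(ip_network("::ffff:200.0.0.0/104")))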

PrefixesQuery dataclass

Bases: Query

Base class for queries on the prefixes table.

Source code in diamond_miner/queries/query.py
@dataclass(frozen=True)
class PrefixesQuery(Query):
    """Base class for queries on the prefixes table."""

    probe_protocol: int | None = None
    "If specified, keep only the links inferred from probes sent with this protocol."

    probe_src_addr: str | None = None
    """
    If specified, keep only the links inferred from probes sent by this address.
    This filter is relatively costly (IPv6 comparison on each row).
    """

    def filters(self, subset: IPNetwork) -> str:
        """`WHERE` clause common to all queries on the prefixes table."""
        s = []
        if subset != UNIVERSE_SUBSET:
            s += [ip_in("probe_dst_prefix", subset)]
        if self.probe_protocol:
            s += [eq("probe_protocol", self.probe_protocol)]
        if self.probe_src_addr:
            s += [ip_eq("probe_src_addr", self.probe_src_addr)]
        return reduce(and_, s or ["1"])

probe_protocol: int | None = None class-attribute instance-attribute

If specified, keep only the links inferred from probes sent with this protocol.

probe_src_addr: str | None = None class-attribute instance-attribute

If specified, keep only the links inferred from probes sent by this address. This filter is relatively costly (IPv6 comparison on each row).

filters(subset)

WHERE clause common to all queries on the prefixes table.

Source code in diamond_miner/queries/query.py
def filters(self, subset: IPNetwork) -> str:
    """`WHERE` clause common to all queries on the prefixes table."""
    s = []
    if subset != UNIVERSE_SUBSET:
        s += [ip_in("probe_dst_prefix", subset)]
    if self.probe_protocol:
        s += [eq("probe_protocol", self.probe_protocol)]
    if self.probe_src_addr:
        s += [ip_eq("probe_src_addr", self.probe_src_addr)]
    return reduce(and_, s or ["1"])
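
The same pattern applies here; for instance, restricting a query to one vantage point (the source address is hypothetical):

from ipaddress import ip_network
from diamond_miner.queries import PrefixesQuery
# Costly filter: IPv6 comparison on each row.
q = PrefixesQuery(probe_src_addr="::ffff:192.0.2.1")
print(q.filters(ip_network("::/0")))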

ProbesQuery dataclass

Bases: Query

Base class for queries on the probes table.

Source code in diamond_miner/queries/query.py
@dataclass(frozen=True)
class ProbesQuery(Query):
    """Base class for queries on the probes table."""

    probe_protocol: int | None = None
    "If specified, keep only probes sent with this protocol."

    probe_ttl_geq: int | None = None
    "If specified, keep only the probes with TTL >= this value."

    probe_ttl_leq: int | None = None
    "If specified, keep only the probes with TTL <= this value."

    round_eq: int | None = None
    "If specified, keep only the probes from this round."

    round_geq: int | None = None
    "If specified, keep only the probes from this round or after."

    round_leq: int | None = None
    "If specified, keep only the probes from this round or before."

    round_lt: int | None = None
    "If specified, keep only the probes from before this round."

    def filters(self, subset: IPNetwork) -> str:
        """`WHERE` clause common to all queries on the probes table."""
        s = []
        if subset != UNIVERSE_SUBSET:
            s += [ip_in("probe_dst_prefix", subset)]
        if self.probe_protocol:
            s += [eq("probe_protocol", self.probe_protocol)]
        if self.probe_ttl_geq:
            s += [geq("probe_ttl", self.probe_ttl_geq)]
        if self.probe_ttl_leq:
            s += [leq("probe_ttl", self.probe_ttl_leq)]
        if self.round_eq:
            s += [eq("round", self.round_eq)]
        if self.round_geq:
            s += [geq("round", self.round_geq)]
        if self.round_lt:
            s += [lt("round", self.round_lt)]
        if self.round_leq:
            s += [leq("round", self.round_leq)]
        return reduce(and_, s or ["1"])

probe_protocol: int | None = None class-attribute instance-attribute

If specified, keep only probes sent with this protocol.

probe_ttl_geq: int | None = None class-attribute instance-attribute

If specified, keep only the probes with TTL >= this value.

probe_ttl_leq: int | None = None class-attribute instance-attribute

If specified, keep only the probes with TTL <= this value.

round_eq: int | None = None class-attribute instance-attribute

If specified, keep only the probes from this round.

round_geq: int | None = None class-attribute instance-attribute

If specified, keep only the probes from this round or after.

round_leq: int | None = None class-attribute instance-attribute

If specified, keep only the probes from this round or before.

round_lt: int | None = None class-attribute instance-attribute

If specified, keep only the probes from before this round.

filters(subset)

WHERE clause common to all queries on the probes table.

Source code in diamond_miner/queries/query.py
def filters(self, subset: IPNetwork) -> str:
    """`WHERE` clause common to all queries on the probes table."""
    s = []
    if subset != UNIVERSE_SUBSET:
        s += [ip_in("probe_dst_prefix", subset)]
    if self.probe_protocol:
        s += [eq("probe_protocol", self.probe_protocol)]
    if self.probe_ttl_geq:
        s += [geq("probe_ttl", self.probe_ttl_geq)]
    if self.probe_ttl_leq:
        s += [leq("probe_ttl", self.probe_ttl_leq)]
    if self.round_eq:
        s += [eq("round", self.round_eq)]
    if self.round_geq:
        s += [geq("round", self.round_geq)]
    if self.round_lt:
        s += [lt("round", self.round_lt)]
    if self.round_leq:
        s += [leq("round", self.round_leq)]
    return reduce(and_, s or ["1"])
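
A sketch combining the round and TTL bounds (the values are arbitrary):

from ipaddress import ip_network
from diamond_miner.queries import ProbesQuery
# Probes from rounds 1-2 with TTLs between 2 and 32.
q = ProbesQuery(round_geq=1, round_leq=2, probe_ttl_geq=2, probe_ttl_leq=32)
print(q.filters(ip_network("::/0")))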

Query dataclass

Base class for every query.

Source code in diamond_miner/queries/query.py
@dataclass(frozen=True)
class Query:
    """Base class for every query."""

    @property
    def name(self) -> str:
        return self.__class__.__name__

    def statement(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> str:
        # As a query user, prefer calling `statements` instead of `statement`, as there
        # is no guarantee that the query implements this method and returns a single statement.
        raise NotImplementedError

    def statements(
        self, measurement_id: str, subset: IPNetwork = UNIVERSE_SUBSET
    ) -> Sequence[str]:
        # Override this method if you want your query to return multiple statements.
        return (self.statement(measurement_id, subset),)

    def execute(
        self,
        client: ClickHouseClient,
        measurement_id: str,
        *,
        data: Any | None = None,
        limit: tuple[int, int] | None = None,
        subsets: Iterable[IPNetwork] = (UNIVERSE_SUBSET,),
    ) -> list[dict]:
        """
        Execute the query and return each row as a dict.

        Args:
            client: ClickHouse client.
            measurement_id: Measurement id.
            data: str or bytes iterator containing data to send.
            limit: (limit, offset) tuple.
            subsets: Iterable of IP networks on which to execute the query independently.
        """
        rows = []
        for subset in subsets:
            for i, statement in enumerate(self.statements(measurement_id, subset)):
                with LoggingTimer(
                    logger,
                    f"query={self.name}#{i} measurement_id={measurement_id} subset={subset} limit={limit}",
                ):
                    settings = dict(
                        limit=limit[0] if limit else 0,
                        offset=limit[1] if limit else 0,
                    )
                    rows += client.json(statement, data=data, settings=settings)
        return rows

    def execute_iter(
        self,
        client: ClickHouseClient,
        measurement_id: str,
        *,
        data: Any | None = None,
        limit: tuple[int, int] | None = None,
        subsets: Iterable[IPNetwork] = (UNIVERSE_SUBSET,),
    ) -> Iterator[dict]:
        """
        Execute the query and return each row as a dict, as they are received from the database.
        """
        for subset in subsets:
            for i, statement in enumerate(self.statements(measurement_id, subset)):
                with LoggingTimer(
                    logger,
                    f"query={self.name}#{i} measurement_id={measurement_id} subset={subset} limit={limit}",
                ):
                    settings = dict(
                        limit=limit[0] if limit else 0,
                        offset=limit[1] if limit else 0,
                    )
                    yield from client.iter_json(statement, data=data, settings=settings)

    def execute_concurrent(
        self,
        client: ClickHouseClient,
        measurement_id: str,
        *,
        subsets: Iterable[IPNetwork] = (UNIVERSE_SUBSET,),
        limit: tuple[int, int] | None = None,
        concurrent_requests: int = max(available_cpus() // 8, 1),
    ) -> None:
        """
        Execute the query concurrently on the specified subsets.
        """
        logger.info("query=%s concurrent_requests=%s", self.name, concurrent_requests)
        with ThreadPoolExecutor(concurrent_requests) as executor:
            futures = [
                executor.submit(
                    self.execute,
                    client=client,
                    measurement_id=measurement_id,
                    subsets=(subset,),
                    limit=limit,
                )
                for subset in subsets
            ]
            for future in as_completed(futures):
                future.result()

execute(client, measurement_id, *, data=None, limit=None, subsets=(UNIVERSE_SUBSET,))

Execute the query and return each row as a dict.

Args:
    client: ClickHouse client.
    measurement_id: Measurement id.
    data: str or bytes iterator containing data to send.
    limit: (limit, offset) tuple.
    subsets: Iterable of IP networks on which to execute the query independently.

Source code in diamond_miner/queries/query.py
def execute(
    self,
    client: ClickHouseClient,
    measurement_id: str,
    *,
    data: Any | None = None,
    limit: tuple[int, int] | None = None,
    subsets: Iterable[IPNetwork] = (UNIVERSE_SUBSET,),
) -> list[dict]:
    """
    Execute the query and return each row as a dict.

    Args:
        client: ClickHouse client.
        measurement_id: Measurement id.
        data: str or bytes iterator containing data to send.
        limit: (limit, offset) tuple.
        subsets: Iterable of IP networks on which to execute the query independently.
    """
    rows = []
    for subset in subsets:
        for i, statement in enumerate(self.statements(measurement_id, subset)):
            with LoggingTimer(
                logger,
                f"query={self.name}#{i} measurement_id={measurement_id} subset={subset} limit={limit}",
            ):
                settings = dict(
                    limit=limit[0] if limit else 0,
                    offset=limit[1] if limit else 0,
                )
                rows += client.json(statement, data=data, settings=settings)
    return rows

execute_concurrent(client, measurement_id, *, subsets=(UNIVERSE_SUBSET,), limit=None, concurrent_requests=max(available_cpus() // 8, 1))

Execute the query concurrently on the specified subsets.

Source code in diamond_miner/queries/query.py
def execute_concurrent(
    self,
    client: ClickHouseClient,
    measurement_id: str,
    *,
    subsets: Iterable[IPNetwork] = (UNIVERSE_SUBSET,),
    limit: tuple[int, int] | None = None,
    concurrent_requests: int = max(available_cpus() // 8, 1),
) -> None:
    """
    Execute the query concurrently on the specified subsets.
    """
    logger.info("query=%s concurrent_requests=%s", self.name, concurrent_requests)
    with ThreadPoolExecutor(concurrent_requests) as executor:
        futures = [
            executor.submit(
                self.execute,
                client=client,
                measurement_id=measurement_id,
                subsets=(subset,),
                limit=limit,
            )
            for subset in subsets
        ]
        for future in as_completed(futures):
            future.result()

execute_iter(client, measurement_id, *, data=None, limit=None, subsets=(UNIVERSE_SUBSET,))

Execute the query and return each row as a dict, as they are received from the database.

Source code in diamond_miner/queries/query.py
def execute_iter(
    self,
    client: ClickHouseClient,
    measurement_id: str,
    *,
    data: Any | None = None,
    limit: tuple[int, int] | None = None,
    subsets: Iterable[IPNetwork] = (UNIVERSE_SUBSET,),
) -> Iterator[dict]:
    """
    Execute the query and return each row as a dict, as they are received from the database.
    """
    for subset in subsets:
        for i, statement in enumerate(self.statements(measurement_id, subset)):
            with LoggingTimer(
                logger,
                f"query={self.name}#{i} measurement_id={measurement_id} subset={subset} limit={limit}",
            ):
                settings = dict(
                    limit=limit[0] if limit else 0,
                    offset=limit[1] if limit else 0,
                )
                yield from client.iter_json(statement, data=data, settings=settings)
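
To bound memory on large measurements, split the work into subsets and stream the rows. A hedged sketch reusing `GetResults` from above (`process` is a hypothetical per-row callback, and the subsets are arbitrary):

from ipaddress import ip_network
from diamond_miner.queries import GetResults
subsets = [ip_network("::ffff:0.0.0.0/96"), ip_network("2001::/16")]
# Rows are yielded as they are received, one subset after the other.
for row in GetResults().execute_iter(client, "test_nsdi_example", subsets=subsets):
    process(row)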

ResultsQuery dataclass

Bases: Query

Base class for queries on the results table.

Source code in diamond_miner/queries/query.py
@dataclass(frozen=True)
class ResultsQuery(Query):
    """Base class for queries on the results table."""

    filter_destination_host: bool = True
    "If true, ignore the replies from the destination host."

    filter_destination_prefix: bool = True
    "If true, ignore the replies from the destination prefix."

    filter_private: bool = True
    "If true, ignore the replies from private IP addresses."

    filter_invalid_probe_protocol: bool = True
    "If true, ignore the replies with probe protocol ≠ ICMP, ICMPv6 or UDP."

    time_exceeded_only: bool = True
    "If true, ignore non ICMP time exceeded replies."

    probe_protocol: int | None = None
    "If specified, keep only the replies to probes sent with this protocol."

    probe_src_addr: str | None = None
    """
    If specified, keep only the replies to probes sent by this address.
    This filter is relatively costly (IPv6 comparison on each row).
    """

    round_eq: int | None = None
    "If specified, keep only the replies from this round."

    round_leq: int | None = None
    "If specified, keep only the replies from this round or before."

    def filters(self, subset: IPNetwork) -> str:
        """`WHERE` clause common to all queries on the results table."""
        s = []
        if subset != UNIVERSE_SUBSET:
            s += [ip_in("probe_dst_prefix", subset)]
        if self.probe_protocol:
            s += [eq("probe_protocol", self.probe_protocol)]
        if self.probe_src_addr:
            s += [ip_eq("probe_src_addr", self.probe_src_addr)]
        if self.round_eq:
            s += [eq("round", self.round_eq)]
        if self.round_leq:
            s += [leq("round", self.round_leq)]
        if self.filter_destination_host:
            s += [not_("destination_host_reply")]
        if self.filter_destination_prefix:
            s += [not_("destination_prefix_reply")]
        if self.filter_private:
            s += [not_("private_probe_dst_prefix"), not_("private_reply_src_addr")]
        if self.time_exceeded_only:
            s += ["time_exceeded_reply"]
        if self.filter_invalid_probe_protocol:
            s += ["valid_probe_protocol"]
        return reduce(and_, s or ["1"])

filter_destination_host: bool = True class-attribute instance-attribute

If true, ignore the replies from the destination host.

filter_destination_prefix: bool = True class-attribute instance-attribute

If true, ignore the replies from the destination prefix.

filter_invalid_probe_protocol: bool = True class-attribute instance-attribute

If true, ignore the replies with probe protocol ≠ ICMP, ICMPv6 or UDP.

filter_private: bool = True class-attribute instance-attribute

If true, ignore the replies from private IP addresses.

probe_protocol: int | None = None class-attribute instance-attribute

If specified, keep only the replies to probes sent with this protocol.

probe_src_addr: str | None = None class-attribute instance-attribute

If specified, keep only the replies to probes sent by this address. This filter is relatively costly (IPv6 comparison on each row).

round_eq: int | None = None class-attribute instance-attribute

If specified, keep only the replies from this round.

round_leq: int | None = None class-attribute instance-attribute

If specified, keep only the replies from this round or before.

time_exceeded_only: bool = True class-attribute instance-attribute

If true, ignore replies that are not ICMP time exceeded.

filters(subset)

WHERE clause common to all queries on the results table.

Source code in diamond_miner/queries/query.py
def filters(self, subset: IPNetwork) -> str:
    """`WHERE` clause common to all queries on the results table."""
    s = []
    if subset != UNIVERSE_SUBSET:
        s += [ip_in("probe_dst_prefix", subset)]
    if self.probe_protocol:
        s += [eq("probe_protocol", self.probe_protocol)]
    if self.probe_src_addr:
        s += [ip_eq("probe_src_addr", self.probe_src_addr)]
    if self.round_eq:
        s += [eq("round", self.round_eq)]
    if self.round_leq:
        s += [leq("round", self.round_leq)]
    if self.filter_destination_host:
        s += [not_("destination_host_reply")]
    if self.filter_destination_prefix:
        s += [not_("destination_prefix_reply")]
    if self.filter_private:
        s += [not_("private_probe_dst_prefix"), not_("private_reply_src_addr")]
    if self.time_exceeded_only:
        s += ["time_exceeded_reply"]
    if self.filter_invalid_probe_protocol:
        s += ["valid_probe_protocol"]
    return reduce(and_, s or ["1"])
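
The defaults select clean traceroute-style replies; relaxing them is a matter of flipping the flags. A sketch using `GetResults` from above:

from diamond_miner.queries import GetResults
# Also keep replies from the destination itself and non-time-exceeded replies.
query = GetResults(filter_destination_host=False, time_exceeded_only=False)
rows = query.execute(client, "test_nsdi_example")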

StoragePolicy dataclass

Source code in diamond_miner/queries/query.py
@dataclass(frozen=True)
class StoragePolicy:
    """
    - [TTL for Columns and Tables](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree/#table_engine-mergetree-ttl)
    - [Using Multiple Block Devices for Data Storage](https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/mergetree/#table_engine-mergetree-multiple-volumes)
    """

    name: str = "default"
    """Name of the ClickHouse storage policy to use for the table."""
    archive_to: str = "default"
    """Name of the ClickHouse archive volume."""
    archive_on: datetime = datetime(2100, 1, 1)
    """Date at which the table will be moved to the archive volume."""

archive_on: datetime = datetime(2100, 1, 1) class-attribute instance-attribute

Date at which the table will be moved to the archive volume.

archive_to: str = 'default' class-attribute instance-attribute

Name of the ClickHouse archive volume.

name: str = 'default' class-attribute instance-attribute

Name of the ClickHouse storage policy to use for the table.
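
A construction sketch with hypothetical policy and volume names (they must match the ClickHouse server's storage configuration), assuming `StoragePolicy` is exported alongside the queries:

from datetime import datetime
from diamond_miner.queries import StoragePolicy
# Move the table to the "cold" volume of the "hot_cold" policy on 2030-01-01.
policy = StoragePolicy(name="hot_cold", archive_to="cold", archive_on=datetime(2030, 1, 1))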

links_table(measurement_id)

Returns the name of the links table.

Source code in diamond_miner/queries/query.py
def links_table(measurement_id: str) -> str:
    """Returns the name of the links table."""
    return f"links__{measurement_id}".replace("-", "_")

prefixes_table(measurement_id)

Returns the name of the prefixes table.

Source code in diamond_miner/queries/query.py
def prefixes_table(measurement_id: str) -> str:
    """Returns the name of the prefixes table."""
    return f"prefixes__{measurement_id}".replace("-", "_")

probes_table(measurement_id)

Returns the name of the probes table.

Source code in diamond_miner/queries/query.py
def probes_table(measurement_id: str) -> str:
    """Returns the name of the probes table."""
    return f"probes__{measurement_id}".replace("-", "_")

results_table(measurement_id)

Returns the name of the results table.

Source code in diamond_miner/queries/query.py
def results_table(measurement_id: str) -> str:
    """Returns the name of the results table."""
    return f"results__{measurement_id}".replace("-", "_")