#!/usr/bin/perl -w

#
# message_to_rssacint
#
# Copyright (C) 2016-2023 University of Southern California.
# All rights reserved.                                            
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License,
# version 2, as published by the Free Software Foundation.
# 
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
# 
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
# 

=head1 NAME

message_to_rssacint - convert dnsanon messages to RSSAC intermediate format

=head1 SYNOPSIS

message_to_rssacint < foo.message.fsdb > foo.rssacint

=head1 DESCRIPTION

This program reads dnsanon message (or message_question) files and produces RSSAC
intermediate format, a simple key-value format that is all one needs
to compute RSSAC-002 statistics.

It runs in linear time and has constant memory usage.

The output format ("rssacint") is documented in L<rssacint_reduce(1)>.

=head1 OPTIONS

=over

=item B<--file-seqno>

File sequence number to pass along in output.  (Default: none.)
Format is either an integer (1), or site:integer (like lax:1)
or "comment".  If given as "comment", it extracts site:seqno from a comment
that matches "#.*dnsanon.*-f \d+-\d+-(\d+)\.([\-\w]+)"
or "#.*dnstapmq.* \d+-\d+-(\d+)\.([\-\w]+)".

=item B<--queries-each-second>

Report number of queries and responses, binned in each second.
(Default: off.)

=item B<--no-active-ranges>

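Disable reporting of the time ranges (the B<-rt> record) in which traffic
is received.  Ignored when B<--queries-each-second> is given, since that
option needs active ranges.  (Default: reported.)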

=item B<--transactions>

Report individual queries and responses (one B<q...> record each, keyed by
client address, client port, and DNS ID), to estimate transaction time.
(Default: off.)

=item B<--no-cache-easy>

Disable caching of "easy" keys.  (Default: they are cached.)

=item B<--no-prefixes>

Disable emission of +6a records for v6 /64s and +6b records for v4 /24s.
(Default: they are emitted.)

=item B<-d>

Enable debugging output.

=item B<-v>

Enable verbose output.

=item B<--help>

Show help.

=item B<--man>

Show full manual.

=back


=head1 OUTPUT

By default, we output the following information:

=over

=item B<+64:192.0.2.0>

How many queries are sent from a given IPv4 address.
IPv4 addresses must be a consistent format
(typically dotted-quad format with no leading zeros).
(Per-IP data for num-sources-ipv4 from RSSAC002v2 section 2.6.)

=item B<+66:2001:db8::1>

How many queries are sent from a given IPv6 address.
IPv6 addresses must be a consistent format
(typically IPv6 preferred form with zero replacement, form 2 in RFC-1884).
(Per-IP data for num-sources-ipv6 from RSSAC002v2 section 2.6.)

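=item B<+6a:2001:db8::>

How many queries are sent from a given IPv6 /64 prefix.
IPv6 prefixes must be a consistent format
(typically IPv6 preferred form with zero replacement for the low 64 bits,
form 2 in RFC-1884).
(Per-IP-prefix data for num-sources-ipv6-aggregate RSSAC002v2 section 2.6.)
Not emitted with B<--no-prefixes>.

=item B<+6b:192.0.2.>

How many queries are sent from a given IPv4 /24 prefix,
written as the first three octets followed by a trailing dot.
Not emitted with B<--no-prefixes>.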


=item B<+74:2001:db8::>, B<+76:192.0.2.>

How many unique IPs are sent from the given v4 or v6 prefix.

=item B<+3t04>, B<+3t14>,  B<+3t06>, B<+3t16>, B<+3u04>, B<+3u14>,  B<+3u06>, B<+3u16>

How many TCP (t), UDP (u), TLS (s), or HTTPS (h)
queries were received (0) or responses were sent (1)
with IPv4 (4) or IPv6 (6).
(RSSAC002v2 section 2.3.)

=item B<+4t0:100>, B<+4t1:100>, B<+4u0:100>, B<+4u1:100>

How many TCP (t), UDP (u), TLS (s), or HTTPS (h)
queries were received (0) or responses were sent (1)
of a given message length (the number after the :),
counted across both IPv4 and IPv6.
We process all sizes, and leave binning to L<rssacfin_to_rssacyaml(1)>.
(RSSAC002v2 section 2.4.)

=item B<+50:2>, B<+51:2>

How many
queries were received (0) or responses were sent (1)
with a given DNS reply code (the value after the colon).
If the input's edns_extended_rcode field is neither C<-> nor 0,
that value is counted in place of the header rcode.
(RSSAC002v2 section 2.5.)

=item B<E<lt>ts>

The earliest timestamp (in Unix epoch seconds, as given in the input) seen.
(Extra, not in RSSAC002.)

=item B<E<gt>te>

The latest timestamp (in Unix epoch seconds, as given in the input) seen.
(Extra, not in RSSAC002.)

=item B<-rt>

A range list (like 1451192632-1451192640,1451192652) of all seconds
(in Unix epoch seconds) that see traffic.
Not emitted with B<--no-active-ranges>.
(Extra, not in RSSAC002.)

=item B<=rfileseqno:site>

A list of all file sequence numbers (given as integers)
seen for a given site.
(Extra, not in RSSAC002.)

=item B<+s:192.0.2.0>

The number of queries going to service IP address 192.0.2.0.
(Extra, not in RSSAC002.)

=back



=head1 SAMPLE USAGE

=head2 Input

	#fsdb -F t msgid time srcip srcport dstip dstport protocol id qr opcode aa tc rd ra z ad cd rcode qdcount ancount nscount arcount edns_present edns_udp_size edns_extended_rcode edns_version edns_z msglen 
	1	1451192632.254226	128.9.168.85	39142	192.228.79.201	53	udp	21461	0	0	0	0	1	0	0	0	1	0	1	0	0	0	1	4096	0	0	0	40
	2	1451192632.255816	192.228.79.201	53	128.9.168.85	39142	udp	21461	1	0	0	0	1	0	0	0	0	0	1	0	13	15	1	4096	0	0	0	528
	3	1451192637.645691	128.9.168.85	59556	192.228.79.201	53	udp	63206	0	0	0	0	1	0	0	0	1	0	1	0	0	0	1	4096	0	0	0	40
	4	1451192637.647542	192.228.79.201	53	128.9.168.85	59556	udp	63206	1	0	0	0	1	0	0	0	0	0	1	0	13	15	1	4096	0	0	0	525
	5	1451192652.006988	128.9.168.85	42548	192.228.79.201	53	tcp	33197	0	0	0	0	1	0	0	0	1	0	1	0	0	0	1	4096	0	0	0	32
	6	1451192652.008452	192.228.79.201	53	128.9.168.85	42548	tcp	33197	1	0	0	0	1	0	0	0	0	0	1	0	6	12	1	4096	0	0	0	434

(Input may have additional fields: ipttl, rtt, name, type, class.  If
so, they are ignored.  However, parsing does I<not> use proper fsdb
libraries, so the order of the fields in the input schema is
mandatory.)

=head2 Command

    ./message_to_rssacint --file-seqno=1

=head2 Output

	#fsdb -F t key count
	+64:128.9.168.85	1
	+6b:128.9.168.	1
	+64:128.9.168.85	1
	+6b:128.9.168.	1
	+64:128.9.168.85	1
	+6b:128.9.168.	1
	+3t04	1
	+3t14	1
	+3u04	2
	+3u14	2
	+4t0:32	1
	+4t1:434	1
	+4u0:40	2
	+4u1:525	1
	+4u1:528	1
	+50:0	3
	+51:0	3
	+s:192.228.79.201	3
	-rt	1451192632,1451192637,1451192652
	<ts	1451192632.254226
	=rfileseqno:-	1
	>te	1451192652.008452
	# message_to_rssacint.pl --file-seqno=1


=cut

use strict;
use Pod::Usage;
use Getopt::Long;
use Net::IP;

Getopt::Long::Configure("bundling");
# A lone "-?" asks for brief usage before any option parsing happens.
pod2usage(2) if (@ARGV && $ARGV[0] eq '-?');
# Snapshot the command line now (GetOptions consumes @ARGV);
# it is echoed in the trailer comment of the output.
my(@orig_argv) = @ARGV;
my($prog) = $0;

# Option settings and their defaults.
my $debug = undef;                  # -d: debugging output (repeatable)
my $verbose = undef;                # -v: verbose output (repeatable)
my $file_seqno = undef;             # --file-seqno: N, site:N, or "comment"
my $queries_each_second = undef;    # --queries-each-second
my $transactions = undef;           # --transactions
my $active_ranges = 1;              # --[no-]active-ranges
my $cache_easy = 1;                 # --[no-]cache-easy
my $emit_prefixes = 1;              # --[no-]prefixes

GetOptions(
    'active-ranges!'       => \$active_ranges,
    'cache-easy!'          => \$cache_easy,
    'file-seqno=s'         => \$file_seqno,
    'queries-each-second!' => \$queries_each_second,
    'transactions!'        => \$transactions,
    'prefixes!'            => \$emit_prefixes,
    'help|?'               => sub { pod2usage(1); },
    'man'                  => sub { pod2usage(-verbose => 2); },
    'd|debug+'             => \$debug,
    'v|verbose+'           => \$verbose,
) or pod2usage(2);

# Per-second query counts use the second computed while tracking active
# ranges, so asking for them forces active-range tracking on.
$active_ranges = 1 if ($queries_each_second);

# Expected input schema.  Input is only checked for this prefix, so
# additional trailing fields are accepted.  (A variant naming the last
# EDNS fields edns_size/ends_version/ends_flags does not match.)
my($in_schema) = "#fsdb -F t msgid time srcip srcport dstip dstport protocol id qr opcode aa tc rd ra z ad cd rcode qdcount ancount nscount arcount edns_present edns_udp_size edns_extended_rcode edns_version edns_z msglen";
# Output schema: rssacint key / count pairs.
my($out_schema) = "#fsdb -F t key count";

# Tally of reduce_pair operators that were not recognized.
my %unknown_ops;

# All output is UTF-8; the output schema header comes first.
binmode(STDOUT, ":utf8");
print "$out_schema\n";

# Bookkeeping for source addresses that could not be turned into prefixes.
my($failed_ips) = 0;
my $MAX_FAILED_IPS = 1000;

# Cache of "easy" (aggregatable) keys: key => running reduced value.
# Caching pays off: on one file the range merge saw 147 fastpath new
# seconds, 6165597 fastpath same updates, and only 21074 slow paths.
my(%easy) = ();

#
# Convert an IP address string to a string of hex digits.
# Dotted-quad IPv4 becomes 8 hex digits; IPv6 becomes (for well-formed
# input) 32 hex digits, with each group zero-padded to 4 digits and "::"
# expanded into the missing zeros.  Returns undef when a non-IPv6 input
# does not have exactly four dotted parts.  IPv6 input is not validated.
# Hand-rolled because Net::IP is way too slow (copied from
# message_question_to_load).
#
sub ip_to_hexip($) {
    my($ip) = @_;
    if ($ip !~ /:/) {
        # v4
        my(@octets) = split(/\./, $ip);
        return undef if (scalar(@octets) != 4);
        return sprintf("%02x%02x%02x%02x", @octets);
    };
    # v6: $halves[0] gathers groups before the "::", $halves[1] those after.
    my(@halves) = ('', '');
    my $side = 0;
    foreach my $group (split(/:/, $ip)) {
        if ($side == 0 && $group eq '') {
            # the first empty field marks where "::" was
            $side = 1;
            next;
        };
        $halves[$side] .= substr("0000" . $group, -4);
    };
    # "::" stands for however many zero digits bring the total to 32
    my $fill = "0" x (32 - length($halves[0]) - length($halves[1]));
    return $halves[0] . $fill . $halves[1];
}


#
# Truncate an IPv6 address to its /64 prefix, in compressed text form
# (e.g. 2001:db8:1:2:3:4:5:6 -> 2001:db8:1:2::).
# Returns undef if the address cannot be converted to 32 hex digits,
# matching how truncate_ip4 reports a malformed IPv4 address.
#
sub truncate_ip6($) {
    my($srcip) = @_;
    # Net::IP could do this, but constructing its objects is too slow for
    # per-query use, so work on the 32-hex-digit form instead.
    my($srcip_hex) = ip_to_hexip($srcip);
    # Unparsable address: report failure rather than emitting warnings and
    # a bogus "::" prefix.
    return undef if (!defined($srcip_hex) || length($srcip_hex) != 32);
    # truncate to /64: zero the low 16 hex digits (64 bits)
    substr($srcip_hex, 16) = "0" x 16;
    # renormalize: strip leading zeros from each 16-bit group...
    my @parts;
    foreach my $group (unpack("(A4)*", $srcip_hex)) {
        (my $trimmed = $group) =~ s/^0+//;
        push(@parts, ($trimmed eq '') ? '0' : $trimmed);
    };
    # ...then compress the trailing run of zero groups to "::".  After the
    # /64 truncation that run covers at least the last four groups, so it
    # is always the longest zero run (the one RFC 5952 says to compress).
    pop(@parts) while (@parts && $parts[-1] eq '0');
    return "::" if (!@parts);
    return join(":", @parts) . "::";
}

#
# Truncate a dotted-quad IPv4 address to its /24 prefix, written as the
# first three octets plus a trailing dot (192.0.2.77 -> "192.0.2.").
# Returns undef unless the input splits into exactly four dotted parts.
#
sub truncate_ip4($) {
    my($addr) = @_;
    # IPv4 is much easier than IPv6; assume dotted-quad input.
    my(@octets) = split(/\./, $addr);
    return undef unless (scalar(@octets) == 4);
    return join('.', @octets[0 .. 2]) . '.';
}

######################################################################
# Rangelist code.
# same code in message_to_rssacinc.pl and rssacint_reduce.pl

#
# Split a range list string such as "1-2,4,6-7" into two parallel array
# references: range starts (1,4,6) and range ends (2,4,7).  A single
# number is a range that starts and ends on itself.  Dies on an empty
# range entry.
#
sub decompose_rangelist($) {
    my($rangelist) = $_[0];
    my(@starts, @ends);
    for my $range (split(/,/, $rangelist)) {
        my($start, $end) = split(/-/, $range);
        die "unparsable range $range\n" unless (defined($start));
        push(@starts, $start);
        push(@ends, $end // $start);
    };
    return (\@starts, \@ends);
}

# Numeric minimum of two values (the second one wins ties).
sub min($$) {
    my($left, $right) = @_;
    return $left if ($left < $right);
    return $right;
}

#
# Take two range lists (format like: 1-2,4,6-7) and merge them into one
# sorted range list, coalescing ranges that overlap or abut.
#
# Arguments:
#   $_[0], $_[1]  the two range lists.  Either may end in a "/e" marker
#                 left by an earlier merge that detected overlap.
#   $_[2]         optional: if true, overlap marks the result with a
#                 trailing "/e"; if greater than 1, an ":eoverlapping-regions"
#                 record describing the overlap is also printed to STDOUT.
# Returns the merged range list, with "/e" appended if overlap was found
# now (when reporting is on) or either input already carried the marker.
#
sub merge_ranges($$;$) {
    my(@rangelists) = ($_[0], $_[1]);
    my($report_overlap_as_error) = $_[2];

    #
    # A previous overlapping merge may have left a "/e" marker.  Strip it
    # before parsing (otherwise it ends up inside the numeric ranges and
    # corrupts them) and carry it through to the result.
    #
    my($carried_error) = "";
    foreach my $i (0..1) {
        $carried_error = "/e" if ($rangelists[$i] =~ s{/e$}{});
    };

    #
    # fast path
    # optimize appending a simple other on a ranged one
    # merge_ranges("1-2", "3") => "1-3"
    #
    my($one, $other) = (undef, undef);
    if ($rangelists[1] =~ /^\d+$/) {
        ($one, $other) = (0, 1);
    } elsif ($rangelists[0] =~ /^\d+$/) {
        ($one, $other) = (1, 0);
    };
    if (defined($other) && $rangelists[$one] =~ /\-(\d+)$/) {
        # can try to fastpath
        # one:  1-2   (or more complex)
        # other:    3 (hopefully)
        my($one_e) = $1;
        my($other_s) = $rangelists[$other];
        if ($one_e == $other_s) {
            # already covered; no change needed
            # $easy{'x+fs'}++;
            if ($report_overlap_as_error) {
                print ":eoverlapping-regions\t$rangelists[$one]+$other_s\n" if ($report_overlap_as_error > 1);
                return $rangelists[$one] . "/e";
            } else {
                return $rangelists[$one] . $carried_error;
            };
        } elsif ($one_e + 1 == $other_s) {
            # directly follows: extend the last range's end
            # $easy{'x+fi'}++;
            $rangelists[$one] =~ s/(\D?)(\d+)$/$1$other_s/;
            return $rangelists[$one] . $carried_error;
        };
        # fall through for slow path
    };
    # $easy{'x+s'}++;

    #
    # slow path
    #
    # Decompose comma-separated list into array of ranges (start and ends).
    #
    my(@ss, @es);
    foreach (0..1) {
        ($ss[$_], $es[$_]) = decompose_rangelist($rangelists[$_]);
    };

    #
    # Count how many lists each range occurs in.
    # If there is overlap, make more intermediate ranges.
    #
    # On exit of this loop, we have ONE rangelist in an array, plus counts.
    #
    # (And ick: this code is ALL corner cases.)
    #
    my(@count, @s, @e);
  buildcount:
    while (1) {
        #
        # Check if either side has drained; if so, the rest of the other
        # side is copied over with a count of 1.
        #
        foreach $one (0, 1) {
            $other = 1 - $one;
            if ($#{$ss[$one]} == -1) {
                push(@count, (1) x ($#{$ss[$other]} + 1));
                push(@s, @{$ss[$other]});
                push(@e, @{$es[$other]});
                last buildcount;
            };
        };
        #
        # assert(have stuff left in both)
        #
        # Make $one be the one the starts first
        # (so we only have a million cases, not four million.)
        #
        my($new_count) = 1;
        if ($ss[0][0] < $ss[1][0]) {
            ($one, $other) = (0, 1);
        } elsif ($ss[0][0] > $ss[1][0]) {
            ($one, $other) = (1, 0);
        } else {
            # both start at same time
            $new_count = 2;
            # $one becomes the one that ends first
            if ($es[0][0] <= $es[1][0]) {
                ($one, $other) = (0, 1);
            } else {
                ($one, $other) = (1, 0);
            };
        };
        #
        # assert($lists[$one] starts first (or at same time))
        #
        my($consume_one) = undef;
        push(@count, $new_count);
        push(@s, $ss[$one][0]);
        if ($ss[$one][0] < $ss[$other][0]) {
            # one starts first
            if ($es[$one][0] < $ss[$other][0]) {
                # and ends before other
                # +----+
                #         +----+
                # or abutts other (in which case we will merge later)
                # +----+
                #       +----+
                push(@e, $es[$one][0]);
                $consume_one = 1;
            } elsif ($es[$one][0] >= $ss[$other][0]) {
                # and overlaps with other
                # +----+
                #      +----+
                # or
                # +----+
                #    +----+
                push(@e, $ss[$other][0]-1);
                $ss[$one][0] = $ss[$other][0];
                $consume_one = 0;
            } else {
                die "invariant violated: one $one starts first\n";
            };
        } elsif ($ss[$one][0] == $ss[$other][0]) {
            # start at same time
            push(@e, $es[$one][0]);
            $consume_one = 1;
            if ($es[$one][0] < $es[$other][0]) {
                # but one ends first
                # +----+
                # +--------+
                $ss[$other][0] = $es[$one][0]+1;
            } elsif ($es[$one][0] == $es[$other][0]) {
                # complete overlap
                # +----+
                # +----+
                #
                # so also consume other here:
                shift @{$ss[$other]};
                shift @{$es[$other]};
            } else {
                die "invariant violated: one $one and other $other start at same time and other ends first\n";
            };
        } else {
            die "invariant violated: one $one starts after other $other\n";
        };
        if ($consume_one) {
            shift @{$ss[$one]};
            shift @{$es[$one]};
        };
    };

    #
    # We now have a clean, single rangelist in an array, with counts.
    #
    # Now concatenate adjacent ranges and report overlap.
    # A marker carried in from the inputs is preserved.
    #
    my($out, $error_out) = ("", $carried_error);
    while ($#s != -1) {
        if ($count[0] == 2) {
            if ($report_overlap_as_error) {
                print ":eoverlapping-regions\t$s[0]-$e[0]\n" if ($report_overlap_as_error > 1);
                $error_out = "/e";
            };
        };
        # merge with the next range if they overlap or abut
        if ($#s >= 1) {
            if ($e[0]+1 >= $s[1]) {
                $s[1] = $s[0];
                shift @count;
                shift @s;
                shift @e;
                # no output
                next;
            };
        };
        $out .= "," if ($out ne "");
        $out .= ($s[0] == $e[0] ? $s[0] : $s[0] . "-" . $e[0]);
        shift @count;
        shift @s;
        shift @e;
    };
    return $out . $error_out;
}

######################################################################

#
# Combine two values observed for the same key, according to the key's
# operator.  (Same code is in message_to_rssacint.pl and rssacint_reduce.pl.)
#
#   $op          operator character: + sum, ! count changes of a unique
#                tag, - range-list merge, = range-list merge with overlap
#                reporting, < lexical minimum, > lexical maximum, q quiet
#   $matcher     the key being reduced (not used here)
#   $last_value  previously accumulated value
#   $value       newly observed value
#   $last_unique, $unique   uniqueness tags, used only by '!'
#
# Returns the combined value, or undef for 'q' and for unrecognized
# operators (which are tallied in %unknown_ops).
#
sub reduce_pair($$$$;$$) {
    my($op, $matcher, $last_value, $value, $last_unique, $unique) = @_;
    return $value + $last_value if ($op eq '+');
    if ($op eq '!') {
        die "internal error: undef unique\n" unless (defined($unique));
        return 1 unless (defined($last_unique));
        return (($unique ne $last_unique) ? $last_value + 1 : $last_value);
    };
    # range lists; '=' additionally reports overlaps
    return merge_ranges($last_value, $value) if ($op eq '-');
    return merge_ranges($last_value, $value, 2) if ($op eq '=');
    # lexical comparisons (not numeric)
    return (($last_value lt $value) ? $last_value : $value) if ($op eq '<');
    return (($last_value gt $value) ? $last_value : $value) if ($op eq '>');
    return undef if ($op eq 'q');
    # other operators are not reduced; just count them
    $unknown_ops{$op} = ($unknown_ops{$op} // 0) + 1;
    return undef;
}


######################################################################


#
# Record one (key, value) observation for aggregatable output.
# With --no-cache-easy it is printed immediately; otherwise it is folded
# into %easy with reduce_pair() using operator $op, and printed later by
# flush_cachable_output().
#
sub cachable_output($$$) {
    my($op, $key, $value) = @_;
    unless ($cache_easy) {
        print "$key\t$value\n";
        return;
    };
    my $previous = $easy{$key};
    $easy{$key} = defined($previous)
        ? reduce_pair($op, $key, $previous, $value)
        : $value;
}

#
# Print every cached key/value pair, sorted by key, and then empty the
# cache.  This runs at the end of each input file.  Emptying the cache
# means that, when several files are given, earlier files' aggregates are
# not printed again after later files (they would otherwise be summed a
# second time when '+' records are reduced).
#
sub flush_cachable_output() {
    foreach my $key (sort keys %easy) {
        print "$key\t$easy{$key}\n";
    };
    %easy = ();
}

#
# Convert one dnsanon message (or message_question) fsdb file into
# rssacint records on STDOUT.
#
#   $file  path of the file to read, or "-" for standard input.
#
# Per-query source records (+64/+66, +6a/+6b, +eip-fail) and transaction
# records (q...) are printed as they are seen.  All other records go
# through cachable_output() and are printed by flush_cachable_output()
# when the file is done.  Comment lines are copied to the output, except
# those consumed by --file-seqno=comment.  If the file cannot be opened,
# an +eopen-fail record is emitted instead.  Dies if the schema line does
# not start with $in_schema or if --file-seqno is malformed.
#
sub process_file($) {
    my($file) = @_;
    my($open_mode, $open_place) = ("<:utf8", $file);
    if ($file eq '-') {
        # reuse standard input (file descriptor 0)
        ($open_mode, $open_place) = ("<&=", 0);
    };
    my $in;
    if (!open($in, $open_mode, $open_place)) {
        print "+eopen-fail:$file\t1\n";
        return;
    };
    # Named files are decoded as UTF-8 by their open mode.  Do the same for
    # standard input; otherwise non-ASCII bytes passed through to the :utf8
    # STDOUT would be encoded a second time.
    binmode($in, ":utf8") if ($file eq '-');

    if (defined($file_seqno) && $file_seqno ne 'comment') {
        # "N" or "site:N"; a missing site is recorded as "-".
        my(@parts) = split(/:/, $file_seqno);
        unshift(@parts, "-") if ($#parts < 1);
        # the whole sequence number must be digits
        die ("--file-seqno ($parts[1] is not digits or site:digits)\n")
            if (!defined($parts[1]) || $parts[1] !~ /^\d+$/);
        # extra: keep track of what files we scanned, with site
        cachable_output("=", "=rfileseqno:$parts[0]", $parts[1]);
    };
    my($seqno_from_comment) = (defined($file_seqno) && $file_seqno eq 'comment');

    # A named line variable (rather than $_) avoids clobbering the caller's $_.
    while (my $line = <$in>) {
        chomp($line);
        # Blank lines carry no message; skip them rather than emit records
        # built from undefined fields.
        next if ($line =~ /^\s*$/);
        if ($line =~ /^#/) {
            if ($line =~ /^#fsdb/) {
                # prefix match: extra trailing fields are allowed
                die "unexpected schema: $line\n"
                    if ($line !~ /^\Q$in_schema\E/);
                next;
            };
            if ($seqno_from_comment) {
                my($seqno, $site);
                if ($line =~ /^#.*dnsanon .*-f \d+-\d+-(\d+)\.([\-\w]+)/) {
                    ($seqno, $site) = ($1, $2);
                } elsif ($line =~ /^#.*dnstapmq .*\d+-\d+-(\d+)\.([\-\w]+)/) {
                    ($seqno, $site) = ($1, $2);
                };
                if (defined($seqno)) {
                    $site //= '-';
                    cachable_output("=", "=rfileseqno:$site", $seqno);
                    next;
                };
            };
            # pass other comments through
            print "$line\n";
            next;
        };
        my($msgid, $time, $srcip, $srcport, $dstip, $dstport, $protocol, $id, $qr, $opcode, $aa, $tc, $rd, $ra, $z, $ad, $cd, $rcode, $qdcount, $ancount, $nscount, $arcount, $edns_present, $edns_udp_size, $edns_extended_rcode, $edns_version, $edns_z, $msglen) = split(/\s+/, $line);
        # extra: keep track of measurement period
        cachable_output("<", "<ts", $time);
        cachable_output(">", ">te", $time);
        # extra: keep track of which seconds have activity
        # (whole-second timestamps are accepted as well as fractional ones)
        my($time_secs) = undef;
        if ($active_ranges && $time =~ /^(\d+)(?:\.\d*)?$/) {
            $time_secs = $1;
            cachable_output("-", "-rt", $time_secs);
        };
        my($v46) = ($srcip =~ /\./ ? 4 : 6);
        # map TLS and DoT to "s" and DoH to "h" (via their first letters)
        if ($protocol eq "tls" || $protocol eq "dot") {
            $protocol = "ssl";
        } elsif ($protocol eq "doh") {
            $protocol = "https";
        };
        my($short_protocol) = ($protocol =~ /^(.)/);
        # rssac-002v2 section 2.3: number of queries
        cachable_output("+", "+3$short_protocol$qr$v46", 1);
        # extra: queries per second
        if ($queries_each_second && defined($time_secs)) {
            # we don't split out v4 / v6
            cachable_output("+", "+3S$qr:$time_secs", 1);
        };
        # rssac-002v2 section 2.4: query and response size distribution
        cachable_output("+", "+4$short_protocol$qr:$msglen", 1);
        # a non-zero EDNS extended rcode replaces the header rcode
        my($final_rcode) = ($edns_extended_rcode eq '-' || $edns_extended_rcode eq '0') ? $rcode : $edns_extended_rcode;
        # rssac-002v2 section 2.5: rcode distribution (responses only, but we do both)
        cachable_output("+", "+5$qr:$final_rcode", 1);
        # rssac-002v2 section 2.6: unique sources
        if ($qr == 0) {
            print "+6$v46:$srcip\t1\n";
            if ($emit_prefixes) {
                my($prefix) = ($v46 eq '6' ? truncate_ip6($srcip) : truncate_ip4($srcip));
                if ($prefix) {
                    print "+6" . ($v46 eq '6' ? 'a' : 'b') . ":$prefix\t1\n";
                } else {
                    # Unparsable source address: report the first
                    # $MAX_FAILED_IPS individually; past that limit only
                    # the total is reported, after the file is done.
                    $failed_ips++;
                    print "+eip-fail:$msgid,$time,$srcip\t1\n"
                        if ($failed_ips <= $MAX_FAILED_IPS);
                };
            };
        };
        # extra: transactions
        if ($transactions) {
            my($client_ip, $client_port) = ($qr == 0 ? ($srcip, $srcport) : ($dstip, $dstport));
            print "q$short_protocol$client_ip,$client_port,$id\t$qr,$time\n";
        };
        # extra: queries per service address
        if ($qr == 0) {
            cachable_output("+", "+s:$dstip", 1);
        };
    };
    close($in);
    flush_cachable_output();
    print "+eip-fail:too-many\t$failed_ips\n" if ($failed_ips > $MAX_FAILED_IPS);
    print "# message_to_rssacint.pl " . join(" ", @orig_argv) . "\n";
}

# Read standard input when no files are named on the command line.
@ARGV = ("-") unless (@ARGV);
foreach my $input_file (@ARGV) {
    process_file($input_file);
};

exit 0;

=head1 SEE ALSO

L<dnsanon(1)>,
L<message_to_rssacint(1)>,
L<rssacint_reduce(1)>,
L<rssacfin_to_rssacyaml(1)>


=head1 AUTHOR and COPYRIGHT

This program was written by John Heidemann.

Copyright (C) 2016-2023 University of Southern California.

This program is distributed under terms of the GNU general
public license, version 2.  See the file COPYING
with the distribution for details.

=cut


