Tutorial 4 - DNS & Wireshark

Submission process:

  • Submission deadline is December 11, 14:00 CET (before the lecture).
  • Commit and push your solution as separate notebook files per subtask via git as ./tutorial/tutorial4/tutorial4_1.ipynb. Please take care of the correct subfolder/filename since submission is denied otherwise.
  • During the first lecture after the deadline we will discuss a sample solution in class.
  • Afterwards, you have time until December 16, 16:00 CET (before the lecture) to submit a corrected version of your submission:
    1. Rework your solution according to our discussion in class.
    2. Commit and push the corrected version as a separate file per subtask via git as ./tutorial/tutorial4/tutorial4_1.ipynb. Please take care of the correct filename since submission is denied otherwise.

Remarks:

  • Grading is done based on both versions of your submission.
  • If the first submission is missing or contains major flaws, a deduction of up to 50% of the achieved points will be applied.
  • A sample solution is provided after December 16, 16:00 CET eventually.
  • Do NOT duplicate cells or change the type of cells; otherwise, you might receive zero points for your submission.
  • Please use acn@net.in.tum.de for questions regarding lecture, tutorial, and project of ACN.

This cell is used to verify the structure of the submission

DO NOT DELETE OR CHANGE THIS CELL, otherwise the task will be graded with 0 points

Problem 1 DNS Parsing (7.15 credits)

The DNS message format was defined in RFC 1035. Normally, our DNS software parses DNS messages. In this exercise, it is your task to parse provided DNS messages.

Attention: You may only use the Python standard libraries and modules to parse the DNS messages. Do not use any third-party modules which do the full parsing for you.

In [1]:
from typing import List, Tuple
from ctypes import *
from struct import *
from dataclasses import dataclass
from ipaddress import IPv4Address, IPv6Address
import json

type Domain = List[bytes]
type ByteLike = bytes | bytearray | memoryview
In [2]:
# Download DNS test messages from ACN website
!wget -N https://acn.net.in.tum.de/exercise/dns-messages.json
--2026-01-13 22:54:57--  https://acn.net.in.tum.de/exercise/dns-messages.json
Resolving acn.net.in.tum.de (acn.net.in.tum.de)... 
188.95.232.11, 2a00:4700:0:9:f::1
Connecting to acn.net.in.tum.de (acn.net.in.tum.de)|188.95.232.11|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 7350 (7.2K) [application/json]
Saving to: ‘dns-messages.json’


dns-messages.json     0%[                    ]       0  --.-KB/s               
dns-messages.json   100%[===================>]   7.18K  --.-KB/s    in 0s      

2026-01-13 22:54:57 (256 MB/s) - ‘dns-messages.json’ saved [7350/7350]

In [3]:
with open('dns-messages.json', 'r') as f:
    dns_messages_hex = json.load(f)
dns_messages = [bytes.fromhex(m) for m in dns_messages_hex]
print(f"Downloaded and loaded dns_messages: {len(dns_messages)} messages")
print(f"Example message 1: {dns_messages[1]}")
ex_one = dns_messages[1] 
Downloaded and loaded dns_messages: 22 messages
Example message 1: b'\xdaM\x81\x80\x00\x01\x00\x01\x00\x04\x00\x06\x03acn\x03net\x03cit\x03tum\x02de\x00\x00\x01\x00\x01\xc0\x0c\x00\x01\x00\x01\x00\x00\x01i\x00\x04\xbc_\xe8\x0b\xc0\x10\x00\x02\x00\x01\x00\x00\x01i\x00\x07\x04dns2\xc0\x10\xc0\x10\x00\x02\x00\x01\x00\x00\x01i\x00\x07\x04dns1\xc0\x10\xc0\x10\x00\x02\x00\x01\x00\x00\x01i\x00\n\x03ns1\x03rbg\xc0\x18\xc0\x10\x00\x02\x00\x01\x00\x00\x01i\x00\x06\x03ns2\xc0j\xc0f\x00\x01\x00\x01\x00\x01\xdf\xfc\x00\x04\x83\x9f\x00\x01\xc0|\x00\x01\x00\x01\x00\x01\xc3\xc6\x00\x04\x83\x9f\x00Q\xc0S\x00\x01\x00\x01\x00\x00\x00\x9c\x00\x04\x83\x9f\x0e\xc5\xc0S\x00\x1c\x00\x01\x00\x00\x00\x9c\x00\x10 \x01L\xa0 \x01\x00\x17\x00\x00\x00\x00\x00\x00\x01\x97\xc0@\x00\x01\x00\x01\x00\x00\x00\x9c\x00\x04\x83\x9f\x0f\x0f\xc0@\x00\x1c\x00\x01\x00\x00\x00\x9c\x00\x10 \x01L\xa0 \x01\x00\x13\x02\x16>\xff\xfe+\xa24'

a) [0.5 credits] Write a class DNSHeader, which parses a DNS header.

Use a ctypes Structure or one of its subclasses as superclass. It should parse the header into an int property for each header field. The properties should be named as defined in RFC 1035, in lower case.

In [4]:
class DNSHeader:
    pass

# Override the dummy class
# begin insert code
class DNSHeader(BigEndianStructure):
    _pack_ = 1  # byte-aligned; ctypes expects an integer here
    _fields_ = [
        ('id', c_uint16),
        ('qr', c_uint8, 1),
        ('opcode', c_uint8, 4),
        ('aa', c_uint8, 1),
        ('tc', c_uint8, 1),
        ('rd', c_uint8, 1),
        ('ra', c_uint8, 1),
        ('z', c_uint8, 3),
        ('rcode', c_uint8, 4),
        ('qdcount', c_uint16),
        ('ancount', c_uint16),
        ('nscount', c_uint16),
        ('arcount', c_uint16)
    ]

# end insert code
In [5]:
ex_header = DNSHeader.from_buffer_copy(ex_one)
# Print the fields
for f in ex_header._fields_:
    print(f"{f[0]}\t{str(getattr(ex_header, f[0]))}")
id	55885
qr	1
opcode	0
aa	0
tc	0
rd	1
ra	1
z	0
rcode	0
qdcount	1
ancount	1
nscount	4
arcount	6
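
As a cross-check of the bit-field layout, the same 12 header bytes can also be decoded with struct and explicit shifts. This is only a sketch for comparison (the task itself asks for a ctypes Structure); the helper name parse_header_fields is hypothetical and not part of the exercise template.

```python
from struct import unpack


def parse_header_fields(message: bytes) -> dict:
    """Decode the 12-byte DNS header (RFC 1035, section 4.1.1) into a dict."""
    if len(message) < 12:
        raise ValueError('a DNS header needs at least 12 bytes')
    # Six big-endian 16-bit words: ID, flags, and the four section counts
    ident, flags, qdcount, ancount, nscount, arcount = unpack('!6H', bytes(message[:12]))
    return {
        'id': ident,
        'qr': (flags >> 15) & 0x1,
        'opcode': (flags >> 11) & 0xF,
        'aa': (flags >> 10) & 0x1,
        'tc': (flags >> 9) & 0x1,
        'rd': (flags >> 8) & 0x1,
        'ra': (flags >> 7) & 0x1,
        'z': (flags >> 4) & 0x7,
        'rcode': flags & 0xF,
        'qdcount': qdcount,
        'ancount': ancount,
        'nscount': nscount,
        'arcount': arcount,
    }


# First 12 bytes of example message 1 shown above
header_fields = parse_header_fields(bytes.fromhex('da4d81800001000100040006'))
print(header_fields)
```

The values should agree field by field with the output of the DNSHeader structure above.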

b) [2.5 credits] Write a function parse_name.

The function receives a bytes-like object containing a valid DNS message body, an offset and the full DNS message as arguments. It should return the offset and the sequence of labels as a list of bytes. Make sure that your implementation satisfies the following requirements.

  • Support both (ordinary) labels and pointers
  • Raise a ValueError if you encounter an unknown label type or a loop
  • The content argument might not be a full DNS message, e.g., when printing an rdata containing a domain name
  • The offset argument is the offset of content at which to start parsing
  • The returned offset should be the offset in the content after the first pointer or the last (ordinary) label if no pointer exists
  • If message is not None, use this bytes-like object as the DNS message content once a pointer has been followed, since pointer offsets refer to the full message.
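
The distinction between ordinary labels and pointers rests on the two most significant bits of the first octet (RFC 1035, section 4.1.4). The following minimal sketch shows only that decoding step; the function names classify_octet and pointer_target are hypothetical helpers, not part of the template.

```python
def classify_octet(octet: int) -> str:
    """Classify the first octet of a label by its two most significant bits."""
    kind = octet >> 6
    if kind == 0b00:
        return 'label'      # remaining 6 bits hold the label length (0..63)
    if kind == 0b11:
        return 'pointer'    # remaining 14 bits hold an offset into the message
    raise ValueError(f'unsupported label type {kind:#04b}')


def pointer_target(two_octets: bytes) -> int:
    """Return the 14-bit message offset encoded in a compression pointer."""
    if len(two_octets) != 2:
        raise ValueError('a compression pointer is exactly two octets long')
    value = int.from_bytes(two_octets, 'big')
    if value >> 14 != 0b11:
        raise ValueError('not a compression pointer')
    return value & 0x3FFF


print(classify_octet(0x03), classify_octet(0xC0))
print(pointer_target(b'\xc0\x0c'), pointer_target(b'\xc0\x10'))
```

In example message 1, the pointer b'\xc0\x0c' refers to offset 12 (the question name) and b'\xc0\x10' to offset 16 (the suffix net.cit.tum.de.).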
In [6]:
def parse_name(content: ByteLike, offset: int, message: ByteLike) -> Tuple[int, Domain]:
    # begin insert code
    offsets = set()
    name = list()
    offset_after_first_pointer = None
    encountered_pointer = False
    while offset not in offsets:
        offsets.add(offset)
        label_byte = content[offset]
        # Shift right by 6 bits so that only the two label-type bits remain
        label_type = label_byte >> 6
        match label_type:
            case 0:
                # (Ordinary) label
                length = label_byte & 0b00111111
                lcontent = content[offset + 1:offset + 1 + length]
                name.append(bytes(lcontent))
                offset  += 1 + length
                if not encountered_pointer:
                    offset_after_first_pointer = offset
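                if length == 0:
                    # Empty label -> root -> end of domain name.
                    # Return here: the loop check after the while would
                    # misfire if offset + 1 had been visited before.
                    return offset_after_first_pointer, name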
            case 3:
                # Pointer
                if not encountered_pointer:
                    # First pointer encountered -> save the offset to be returned
                    offset_after_first_pointer = offset + 2
                    encountered_pointer = True
                pointer_offset, = unpack('!H', bytes(content[offset:offset+2]))
                # remove identifying bits
                pointer_offset -= int.from_bytes(b'\xc0\x00', signed=False, byteorder='big')
                # Continue parsing at pointer offset
                offset = pointer_offset
                if message is not None:
                    content = message
            case _:
                raise ValueError(f'Does not support label type {label_type}')
    if offset in offsets:
        raise ValueError(f'We encountered a loop at offset {offset}')
    return offset_after_first_pointer, name
    # end insert code
    return 0, []
In [7]:
t = parse_name(ex_one, 12, ex_one)
print(t)
assert t == (32, [b'acn', b'net', b'cit', b'tum', b'de', b''])
(32, [b'acn', b'net', b'cit', b'tum', b'de', b''])
In [8]:
t = parse_name(ex_one[64:71], 0, ex_one)
print(t)
assert t == (7, [b'dns2', b'net', b'cit', b'tum', b'de', b''])
(7, [b'dns2', b'net', b'cit', b'tum', b'de', b''])
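
The two asserts above only cover well-formed names. To see the loop requirement in action, the sketch below walks a name in a single buffer and is fed hand-crafted messages whose pointers refer back to themselves. It is a simplified illustration, assuming the whole message is available in one buffer; follow_name and the test messages are made up for this example.

```python
def follow_name(message: bytes, offset: int) -> list[bytes]:
    """Collect the labels of a name; raise ValueError on a pointer loop."""
    visited = set()
    labels = []
    while True:
        if offset in visited:
            raise ValueError(f'compression loop at offset {offset}')
        if offset >= len(message):
            raise ValueError(f'offset {offset} is outside the message')
        visited.add(offset)
        octet = message[offset]
        kind = octet >> 6
        if kind == 0b11:
            if offset + 2 > len(message):
                raise ValueError('truncated compression pointer')
            # Jump to the 14-bit target offset
            offset = int.from_bytes(message[offset:offset + 2], 'big') & 0x3FFF
        elif kind == 0b00:
            labels.append(bytes(message[offset + 1:offset + 1 + octet]))
            if octet == 0:
                return labels   # root label terminates the name
            offset += 1 + octet
        else:
            raise ValueError(f'unsupported label type {kind}')


padding = bytes(12)                                   # stands in for a header
good = padding + b'\x02de\x00' + b'\x03tum\xc0\x0c'   # "tum" + pointer to "de."
self_loop = padding + b'\xc0\x0c'                     # pointer to itself
two_hop_loop = padding + b'\xc0\x0e\xc0\x0c'          # 12 -> 14 -> 12

print(follow_name(good, 16))
for crafted in (self_loop, two_hop_loop):
    try:
        follow_name(crafted, 12)
    except ValueError as err:
        print('rejected:', err)
```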

c) [0.25 credits] Write a function label_sequence_to_str.

The function receives a list of bytes representing the labels observed. It should return the FQDN as an ASCII string.

In [9]:
def label_sequence_to_str(labels: Domain) -> str:
    # begin insert code

    if len(labels) == 1 and labels[0] == b'':
        return '.'
    else:
        return '.'.join(map(lambda x: x.decode('ascii'), labels))
    # end insert code
    return ''
In [10]:
label_sequence_to_str([b'acn', b'net', b'cit', b'tum', b'de', b''])
Out[10]:
'acn.net.cit.tum.de.'
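
Labels are arbitrary octet strings, so decoding them as ASCII fails for bytes above 0x7F, and a literal dot inside a label would be indistinguishable from a separator. One possible extension, sketched below, escapes such octets in the style of the RFC 1035 master file format (a backslash before the character, or a backslash followed by three decimal digits). The helpers escape_label and labels_to_text are hypothetical and go beyond what the task requires.

```python
def escape_label(label: bytes) -> str:
    """Render one label, escaping dots, backslashes and non-printable octets."""
    parts = []
    for octet in label:
        char = chr(octet)
        if char in '.\\':
            parts.append('\\' + char)        # \. and \\
        elif 0x21 <= octet <= 0x7E:
            parts.append(char)               # printable ASCII, kept as-is
        else:
            parts.append(f'\\{octet:03d}')   # \DDD decimal escape
    return ''.join(parts)


def labels_to_text(labels: list[bytes]) -> str:
    """Join labels into an FQDN; a lone empty label is the root '.'."""
    if labels == [b'']:
        return '.'
    return '.'.join(escape_label(label) for label in labels)


print(labels_to_text([b'acn', b'net', b'cit', b'tum', b'de', b'']))
print(labels_to_text([b'a.b', b'\xff', b'']))
print(labels_to_text([b'']))
```

The special case for the root is needed because joining a single empty string yields an empty string rather than a dot.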

d) [0.4 credits] Override the to_text method and the type_name and type_value variables in the subclasses for the following DNS types.

  • A
  • NS
  • CNAME
  • AAAA

You may use the ipaddress module. The to_text method should return the text representation of these DNS types. These classes are used to print the parsed DNS types later.
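
For the two address types, the rdata is simply the packed address, which the ipaddress classes accept directly as bytes. The sketch below illustrates this with an added length check; address_rdata_to_text is a hypothetical stand-alone helper, not one of the classes requested above.

```python
from ipaddress import IPv4Address, IPv6Address

# Type value -> (expected rdata length in bytes, address class)
ADDRESS_TYPES = {
    1: (4, IPv4Address),     # A
    28: (16, IPv6Address),   # AAAA
}


def address_rdata_to_text(rtype: int, rdata: bytes) -> str:
    """Format A or AAAA rdata as text, rejecting rdata of the wrong length."""
    if rtype not in ADDRESS_TYPES:
        raise ValueError(f'type {rtype} does not carry an address')
    expected_length, address_cls = ADDRESS_TYPES[rtype]
    if len(rdata) != expected_length:
        raise ValueError(f'expected {expected_length} bytes, got {len(rdata)}')
    return str(address_cls(bytes(rdata)))


print(address_rdata_to_text(1, bytes.fromhex('bc5fe80b')))
print(address_rdata_to_text(28, bytes.fromhex('20014ca0200100170000000000000197')))
```

Both byte strings are taken from example message 1 (the answer record and the AAAA record of dns1.net.cit.tum.de.).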

In [11]:
class DNSType:
    type_name: str = None
    type_value: int = None

    @classmethod
    def __str__(cls):
        return str(cls.type_name)

    @classmethod
    def to_text(cls, rdata, content=None) -> str:
        """Return the text representation of a DNS type for given rdata.

        Arguments:
        rdata   -- the rdata to format according to the DNS type
        content -- the complete DNS message
        """
        return f"\\# {len(rdata)} {rdata.hex()}"

    @classmethod
    def from_type(cls, type_value: int):
        """Based on the type_value, return the correct subclass"""
        lookup = {c.type_value : c for c in DNSType.__subclasses__()}
        return lookup.get(type_value, __class__)
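
The fallback in DNSType.to_text follows the generic notation for unknown record types from RFC 3597: the token \# followed by the rdata length and its hex dump, with TYPEnnn and CLASSnnn as generic mnemonics. A small stand-alone sketch of that notation is shown below; unknown_rr_to_text is a hypothetical helper, and unlike the method above it omits the hex part for empty rdata.

```python
def unknown_rr_to_text(rtype: int, rclass: int, rdata: bytes) -> str:
    """Render class, type and rdata in the generic RFC 3597 notation."""
    rdata = bytes(rdata)
    generic_rdata = f'\\# {len(rdata)}'
    if rdata:
        generic_rdata += ' ' + rdata.hex()
    return f'CLASS{rclass}\tTYPE{rtype}\t{generic_rdata}'


print(unknown_rr_to_text(731, 32, bytes.fromhex('abcdef012345')))
print(unknown_rr_to_text(62347, 1, b''))
```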
In [12]:
class AType(DNSType):
    # begin insert code
    type_name = 'A'
    type_value = 1
    # end insert code

    @classmethod
    def to_text(cls, rdata, content=None) -> str:
        # begin insert code
        return str(IPv4Address(rdata))
        # end insert code
        return ''
In [13]:
class NSType(DNSType):
    # begin insert code
    type_name = 'NS'
    type_value = 2
    # end insert code

    @classmethod
    def to_text(cls, rdata, content=None) -> str:
        # begin insert code
        return label_sequence_to_str(parse_name(rdata, 0, content)[1])
        # end insert code
        return ''
In [14]:
class CNAMEType(DNSType):
    # begin insert code
    type_name = 'CNAME'
    type_value = 5
    # end insert code
    
    @classmethod
    def to_text(cls, rdata, content=None) -> str:
        # begin insert code
        return label_sequence_to_str(parse_name(rdata, 0, content)[1])
        # end insert code
        return ''
In [15]:
class AAAAType(DNSType):
    # begin insert code
    type_name = 'AAAA'
    type_value = 28
    # end insert code
    
    @classmethod
    def to_text(cls, rdata, content=None) -> str:
        # begin insert code
        return str(IPv6Address(rdata))
        # end insert code
        return ''

e) [0.75 credits] Write a function parse_query_section.

The function receives a bytes-like object containing a valid DNS message body and optionally a question count. It should return the offset after the question section and a list of QuestionRecords.

In [16]:
@dataclass
class QuestionRecord:
    name: Domain
    rtype: int
    rclass: int

    @classmethod
    def _class_mnemonics(cls):
        return {
            1: 'IN',
            2: 'CS',
            3: 'CH',
            4: 'HS'
        }

    def to_text(self):
        qtype_cls = DNSType.from_type(self.rtype)
        if not qtype_cls:
            raise ValueError(f"Unknown type {self.rtype}")
        return f"{label_sequence_to_str(self.name)}\t{__class__._class_mnemonics().get(self.rclass, 'CLASS'+str(self.rclass))}\t{qtype_cls.type_name or 'TYPE'+str(self.rtype)}"

@dataclass
class ResourceRecord(QuestionRecord):
    name: Domain
    rtype: int
    rclass: int
    ttl: int
    rdlength: int
    rdata: bytes

    def to_text(self, content=None):
        qtype_cls = DNSType.from_type(self.rtype)       
        return f"{label_sequence_to_str(self.name)}\t{self.ttl}\t{__class__._class_mnemonics().get(self.rclass, 'CLASS'+str(self.rclass))}\t{qtype_cls.type_name or 'TYPE'+str(self.rtype)}\t{qtype_cls.to_text(self.rdata, content)}"
In [17]:
def parse_query_section(content, qdcount: int = 1):
    """Parse a query section of a DNS message

    Arguments:
    content -- bytes-like object (e.g. bytes, bytearray, memoryview, ...)
    Keyword Arguments:
    qdcount -- the count of questions, normally 1
    """
    # begin insert code
    # Parse query section
    offset = 12
    question = []
    for i in range(qdcount):
        of, qname = parse_name(content, offset, content)
        qtype, qclass = unpack('!HH', content[of:of+4])
        #question.append((qname, qtype, qclass))
        question.append(QuestionRecord(name=qname, rclass=qclass, rtype=qtype))
        offset = of + 4
    return offset, question
    # end insert code
    return 0, []
In [18]:
r = parse_query_section(ex_one)
print(r)
for q in r[1]:
    print(q.to_text())
(36, [QuestionRecord(name=[b'acn', b'net', b'cit', b'tum', b'de', b''], rtype=1, rclass=1)])
acn.net.cit.tum.de.	IN	A
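
It can help to look at the opposite direction as well: a question entry is the encoded name followed by a 16-bit QTYPE and a 16-bit QCLASS. The sketch below builds such a query from scratch, without name compression; encode_name and build_query are hypothetical helpers and the ID and flag values are arbitrary.

```python
from struct import pack


def encode_name(name: str) -> bytes:
    """Encode a dotted name as length-prefixed labels ending in the root label."""
    encoded = bytearray()
    for label in name.rstrip('.').split('.'):
        if not label:
            continue                     # the root name '.' has no labels
        raw = label.encode('ascii')
        if len(raw) > 63:
            raise ValueError(f'label too long: {label!r}')
        encoded.append(len(raw))
        encoded += raw
    encoded.append(0)                    # empty root label terminates the name
    return bytes(encoded)


def build_query(ident: int, flags: int, name: str, qtype: int = 1, qclass: int = 1) -> bytes:
    """Build a DNS message with one question and empty record sections."""
    header = pack('!6H', ident, flags, 1, 0, 0, 0)
    return header + encode_name(name) + pack('!HH', qtype, qclass)


# 0x0100 sets only the RD (recursion desired) bit
query_bytes = build_query(0x1234, 0x0100, 'acn.net.cit.tum.de.')
print(len(query_bytes), query_bytes.hex())
```

The resulting message is 36 bytes long, which matches the offset after the question section reported by parse_query_section above.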

f) [0.75 credits] Write a function parse_rrset.

The function receives a bytes-like object containing a valid DNS message body, the offset at which the section starts, and a resource record count. It should return the offset after the section and a list of ResourceRecords.

In [19]:
def parse_rrset(content, offset: int, rr_count: int):
    rrset = []
    # begin insert code
    # Parse an rrset
    for i in range(rr_count):
        of, name = parse_name(content, offset, content)
        rtype, rclass, ttl, rdlength = unpack('!HHIH', content[of:of+10])
        # rrset.append((name, rtype, rclass, ttl, rdlength, bytes(content[of+10:of+10+rdlength])))
        rrset.append(ResourceRecord(name=name, rtype=rtype, rclass=rclass, ttl=ttl, rdlength=rdlength, rdata=bytes(content[of+10:of+10+rdlength])))
        offset = of + 10 + rdlength
    # end insert code
    return offset, rrset
In [20]:
offset, qs = parse_query_section(ex_one)
for q in parse_rrset(ex_one, offset, 1)[1]:
    print(q.to_text(ex_one))
acn.net.cit.tum.de.	361	IN	A	188.95.232.11
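
After the owner name, every resource record has ten fixed bytes (TYPE, CLASS, TTL, RDLENGTH) followed by RDLENGTH bytes of rdata. The sample solution trusts RDLENGTH; a more defensive variant could check it against the buffer size, as in the following sketch. read_rr_fields is a hypothetical helper and the checks are an addition, not something the task demands.

```python
from struct import unpack


def read_rr_fields(content: bytes, offset: int):
    """Read the fields following an owner name; offset points at TYPE.

    Returns (rtype, rclass, ttl, rdata, offset_after_record).
    """
    fixed_end = offset + 10
    if fixed_end > len(content):
        raise ValueError('truncated resource record header')
    rtype, rclass, ttl, rdlength = unpack('!HHIH', bytes(content[offset:fixed_end]))
    rdata_end = fixed_end + rdlength
    if rdata_end > len(content):
        raise ValueError('RDLENGTH exceeds the remaining message')
    return rtype, rclass, ttl, bytes(content[fixed_end:rdata_end]), rdata_end


# Answer record of example message 1: pointer to offset 12, then the fixed fields
record = bytes.fromhex('c00c00010001000001690004bc5fe80b')
print(read_rr_fields(record, 2))
```

The owner name here is a two-byte pointer, so the fixed fields start at offset 2 of this excerpt.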

g) [1 credit] Extend the constructor of the DNSMessage class to parse the DNS message into the corresponding variables.

Reuse the functions you wrote above!

In [21]:
OPCODE = {
    0: 'QUERY',
    1: 'IQUERY',
    2: 'STATUS'
}

RCODE = {
    0: 'NoError',
    1: 'FormErr',
    2: 'ServFail',
    3: 'NXDomain',
    4: 'NotImp',
    5: 'Refused'
}
In [22]:
class DNSMessage:
    
    def __init__(self, c: bytes):
        self._question: List[QuestionRecord] = []
        self._answer: List[ResourceRecord] = []
        self._authority: List[ResourceRecord] = []
        self._additional: List[ResourceRecord] = []
        
        self._content = memoryview(bytearray(c))
        # begin insert code
        self._header = DNSHeader.from_buffer(self._content)

        offset, self._question = parse_query_section(self._content, qdcount=self._header.qdcount)
        #print(f'Offset after query: {offset}')
        offset, self._answer = parse_rrset(self._content, offset, self._header.ancount)
        #print(f'Offset after answer: {offset}')
        offset, self._authority = parse_rrset(self._content, offset, self._header.nscount)
        #print(f'Offset after authority: {offset}')
        offset, self._additional = parse_rrset(self._content, offset, self._header.arcount)
        #print(f'Offset after additional: {offset}')
        # end insert code

    def __str__(self):
        """Return the text representation of the DNSMessage"""
        header = {field: getattr(self._header, field) for field in ['id', 'qr', 'opcode', 'aa', 'tc', 'rd', 'ra', 'z', 'rcode']}
        flags = filter(lambda x: x[1] >= 1 and len(x[0]) == 2 and x[0] != 'id', header.items())
        flags = list(map(lambda x: x[0], flags))
        
        header = f"""
;; ->>HEADER<<- opcode: {OPCODE.get(self._header.opcode, 'FutureUse')}; status: {RCODE.get(self._header.rcode)}; id: {self._header.id}
;; Flags: {' '.join(flags)}; QUERY: {self._header.qdcount}; ANSWER: {self._header.ancount}; AUTHORITY: {self._header.nscount}; ADDITIONAL: {self._header.arcount}
"""
        question_s = "\n;; QUESTION SECTION:"
        questions = [
            ';; ' + q.to_text()
            for q in self._question
        ]

        answer_s = "\n;; ANSWER SECTION:"
        answers = [
            a.to_text(self._content)
            for a in self._answer
        ]

        authority_s = "\n;; AUTHORITY SECTION:"
        authorities = [
            a.to_text(self._content)
            for a in self._authority
        ]

        additional_s = "\n;; ADDITIONAL SECTION:"
        additionals = [
            a.to_text(self._content)
            for a in self._additional
        ]
        result = [header, question_s] + questions + [answer_s] + answers + [authority_s] + authorities + [additional_s] + additionals
        return '\n'.join(result)
In [23]:
print(DNSMessage(ex_one))
;; ->>HEADER<<- opcode: QUERY; status: NoError; id: 55885
;; Flags: qr rd ra; QUERY: 1; ANSWER: 1; AUTHORITY: 4; ADDITIONAL: 6


;; QUESTION SECTION:
;; acn.net.cit.tum.de.	IN	A

;; ANSWER SECTION:
acn.net.cit.tum.de.	361	IN	A	188.95.232.11

;; AUTHORITY SECTION:
net.cit.tum.de.	361	IN	NS	dns2.net.cit.tum.de.
net.cit.tum.de.	361	IN	NS	dns1.net.cit.tum.de.
net.cit.tum.de.	361	IN	NS	ns1.rbg.tum.de.
net.cit.tum.de.	361	IN	NS	ns2.rbg.tum.de.

;; ADDITIONAL SECTION:
ns1.rbg.tum.de.	122876	IN	A	131.159.0.1
ns2.rbg.tum.de.	115654	IN	A	131.159.0.81
dns1.net.cit.tum.de.	156	IN	A	131.159.14.197
dns1.net.cit.tum.de.	156	IN	AAAA	2001:4ca0:2001:17::197
dns2.net.cit.tum.de.	156	IN	A	131.159.15.15
dns2.net.cit.tum.de.	156	IN	AAAA	2001:4ca0:2001:13:216:3eff:fe2b:a234
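
The "Flags:" line above lists the single-bit header fields that are set. The __str__ method selects them by checking that the field name has two characters; a more explicit alternative is to name the flag fields directly, as in this small sketch. FLAG_FIELDS and active_flags are hypothetical names, and the input is a plain dict standing in for the header structure.

```python
# Single-bit header fields in the order they appear in the header
FLAG_FIELDS = ('qr', 'aa', 'tc', 'rd', 'ra')


def active_flags(header_fields: dict) -> list[str]:
    """Return the names of all flag bits that are set."""
    return [name for name in FLAG_FIELDS if header_fields.get(name, 0)]


example_header = {
    'id': 55885, 'qr': 1, 'opcode': 0, 'aa': 0, 'tc': 0,
    'rd': 1, 'ra': 1, 'z': 0, 'rcode': 0,
}
print(' '.join(active_flags(example_header)))
```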

h) [1 credit] Given the query and answer below, explain what type of resolver was queried.

  • Nawrocki et al.'s paper defines the following types of resolvers:
    • Recursive Resolver: answers recursive queries by iteratively resolving the query and answering with the final answer
    • Recursive Forwarder: answers recursive queries by forwarding the query to another resolver which performs the iterative resolution and returns the answer to the recursive forwarder. The forwarder then returns the answer to the client initiating the query.
    • Transparent Forwarder: takes incoming recursive queries and forwards them to another resolver with a spoofed source IP address (the IP address of the client), so the answer reaches the client directly from that other resolver
  • The query was sent to IP address 131.159.14.221 (ITO resolver)
  • The answer was sent from IP address 131.159.14.221
  • The authoritative name server for query.mirror.measr.net. returns the following RRs when queried:
    • The fixed check-IP address 10.77.77.77
    • The IP address of the resolver performing the actual DNS query to the authoritative name server
In [24]:
query = bytes.fromhex("cd3101200001000000000000057175657279066d6972726f72056d65617372036e65740000010001")
answer = bytes.fromhex("cd31818000010002000d000a057175657279066d6972726f72056d65617372036e65740000010001c00c0001000100000ada0004839f0135c00c0001000100000ada00040a4d4d4d000002000100004f0d001101660c726f6f742d73657276657273c01f000002000100004f0d0004016dc055000002000100004f0d00040163c055000002000100004f0d0004016bc055000002000100004f0d0004016cc055000002000100004f0d00040167c055000002000100004f0d00040169c055000002000100004f0d0004016ac055000002000100004f0d00040165c055000002000100004f0d00040162c055000002000100004f0d00040161c055000002000100004f0d00040168c055000002000100004f0d00040164c055c0f600010001000741920004c6290004c0f6001c000100081a6c001020010503ba3e00000000000000020030c0e700010001000564610004aaf7aa02c0e7001c0001000592dd0010280101b800100000000000000000000bc07e00010001000022a30004c021040cc07e001c00010005646100102001050000020000000000000000000cc11400010001000022a30004c7075b0dc114001c000100076d8a001020010500002d0000000000000000000dc0d800010001000022a30004c0cbe60ac0d8001c0001000592dd00102001050000a80000000000000000000e")
In [25]:
print(DNSMessage(query))
;; ->>HEADER<<- opcode: QUERY; status: NoError; id: 52529
;; Flags: rd; QUERY: 1; ANSWER: 0; AUTHORITY: 0; ADDITIONAL: 0


;; QUESTION SECTION:
;; query.mirror.measr.net.	IN	A

;; ANSWER SECTION:

;; AUTHORITY SECTION:

;; ADDITIONAL SECTION:
In [26]:
print(DNSMessage(answer))
;; ->>HEADER<<- opcode: QUERY; status: NoError; id: 52529
;; Flags: qr rd ra; QUERY: 1; ANSWER: 2; AUTHORITY: 13; ADDITIONAL: 10


;; QUESTION SECTION:
;; query.mirror.measr.net.	IN	A

;; ANSWER SECTION:
query.mirror.measr.net.	2778	IN	A	131.159.1.53
query.mirror.measr.net.	2778	IN	A	10.77.77.77

;; AUTHORITY SECTION:
.	20237	IN	NS	f.root-servers.net.
.	20237	IN	NS	m.root-servers.net.
.	20237	IN	NS	c.root-servers.net.
.	20237	IN	NS	k.root-servers.net.
.	20237	IN	NS	l.root-servers.net.
.	20237	IN	NS	g.root-servers.net.
.	20237	IN	NS	i.root-servers.net.
.	20237	IN	NS	j.root-servers.net.
.	20237	IN	NS	e.root-servers.net.
.	20237	IN	NS	b.root-servers.net.
.	20237	IN	NS	a.root-servers.net.
.	20237	IN	NS	h.root-servers.net.
.	20237	IN	NS	d.root-servers.net.

;; ADDITIONAL SECTION:
a.root-servers.net.	475538	IN	A	198.41.0.4
a.root-servers.net.	531052	IN	AAAA	2001:503:ba3e::2:30
b.root-servers.net.	353377	IN	A	170.247.170.2
b.root-servers.net.	365277	IN	AAAA	2801:1b8:10::b
c.root-servers.net.	8867	IN	A	192.33.4.12
c.root-servers.net.	353377	IN	AAAA	2001:500:2::c
d.root-servers.net.	8867	IN	A	199.7.91.13
d.root-servers.net.	486794	IN	AAAA	2001:500:2d::d
e.root-servers.net.	8867	IN	A	192.203.230.10
e.root-servers.net.	365277	IN	AAAA	2001:500:a8::e
  • The resolver 131.159.14.221 appears to be a recursive forwarder.
  • The answer contains 131.159.1.53 next to the check IP address 10.77.77.77, so the query to the authoritative name server was performed from 131.159.1.53 and not from 131.159.14.221.
  • Since the answer is nevertheless returned from 131.159.14.221, the address that was queried, it cannot be a transparent forwarder.
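
The reasoning above can be written down as a small decision procedure. The sketch below is a simplification under stated assumptions: it expects exactly one address besides the check IP in the answer, and it treats any mismatch between the queried address and the resolving address as forwarding. classify_resolver is a hypothetical helper, not taken from the paper.

```python
from ipaddress import ip_address

CHECK_IP = ip_address('10.77.77.77')   # fixed check address from the task


def classify_resolver(queried_ip: str, response_source_ip: str, answer_ips: list[str]) -> str:
    """Classify the queried host from the addresses observed in one exchange."""
    queried = ip_address(queried_ip)
    source = ip_address(response_source_ip)
    resolving = [ip_address(a) for a in answer_ips if ip_address(a) != CHECK_IP]
    if source != queried:
        # The answer comes from a host we never contacted
        return 'transparent forwarder'
    if len(resolving) != 1:
        raise ValueError('expected exactly one resolver address in the answer')
    if resolving[0] == queried:
        return 'recursive resolver'
    return 'recursive forwarder'


print(classify_resolver('131.159.14.221', '131.159.14.221',
                        ['131.159.1.53', '10.77.77.77']))
```

Note that a recursive resolver which sends its upstream queries from a different interface or address than the one it listens on would also be reported as a forwarder by this rule, so the result is an indication rather than a proof.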

Advanced Computer Networking by Prof. Dr.-Ing. Georg Carle

Teaching assistants: Christian Dietze, Sebastian Gallenmüller, Marcel Kempf, Lorenz Lehle, Nikolas Gauder, Patrick Dirks