""" Simple and navie mDNS implementation. Low level socket management to listen to mDNS queries and respond to _http._tcp.local service queries with constant name. """ import socket from dnslib import DNSRecord, DNSHeader, QTYPE, A, SRV, TXT, RR, PTR import dns.message multicast_group = "224.0.0.251" multicast_port = 5353 interface_ip = "0.0.0.0" SERVICE_TO_ANNOUNCE = "toto" def main(): sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.setsockopt(socket.IPPROTO_IP, socket.IP_MULTICAST_TTL, 32) # See man socket(7) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) # sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1) sock.bind(("", multicast_port)) # sock.setsockopt(socket.SOL_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(multicast_group) + socket.inet_aton(interface_ip)) sock.setsockopt( socket.IPPROTO_IP, socket.IP_MULTICAST_IF, socket.inet_aton(interface_ip) ) sock.setsockopt( socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, socket.inet_aton(multicast_group) + socket.inet_aton(interface_ip), ) while True: received, sender = sock.recvfrom(1500) print(f"Received packet of {len(received)} bytes from {sender}") try: msg = dns.message.from_wire(received) except dns.exception.FormError: print("Not a valid DNS message") continue if msg.opcode() == dns.opcode.QUERY: for question in msg.question: if str(question.name) == "_http._tcp.local.": print(f"question name {type(msg)}") unicast_response = question.rdclass == 0x8001 response = DNSRecord(DNSHeader(id=0, bitmap=0x8400)) ptr = PTR(SERVICE_TO_ANNOUNCE + "._http._tcp.local") srv = SRV(target="toto.local", priority=10, port=80, weight=100) txt = TXT("path=/") a = A("192.168.1.21") response.add_answer( RR("_http._tcp.local", QTYPE.PTR, rdata=ptr, ttl=120) ) response.add_ar( RR( SERVICE_TO_ANNOUNCE + "._http._tcp.local", QTYPE.SRV, rdata=srv, ttl=120, ) ) response.add_ar( RR( SERVICE_TO_ANNOUNCE + "._http._tcp.local", QTYPE.TXT, rdata=txt, ttl=120, ) ) response.add_ar( RR(SERVICE_TO_ANNOUNCE + ".local", QTYPE.A, rdata=a, ttl=120) ) print(str(response)) if unicast_response: print("Unicast response") sock.sendto(response.pack(), sender) else: print("Multicast response") sock.sendto(response.pack(), (multicast_group, multicast_port)) print(f"Send response to {sender}") if __name__ == "__main__": main()