Hardware Signal Analysis Report

SANS Holiday Hack Challenge 2025 - Signal Decoding Challenge


πŸ“‹ Executive Summary

This report documents the complete analysis and decoding of three different hardware communication protocols:

All signals were captured from WebSocket endpoints and decoded to reveal hidden messages encrypted with XOR ciphers.


πŸ”Œ Protocol 1: 1-Wire (DQ Signal)

Overview

The 1-Wire protocol is a device communication bus system using a single data line for bidirectional communication. The DQ (Data Queue) line carries both data and timing information.

Signal Capture

Decoding Method

Timing Analysis

The protocol uses pulse-width modulation to encode bits:

Bit Reading Strategy

Bits are read from HIGH→LOW transitions:

  1. Detect when voltage goes from HIGH (1) to LOW (0)
  2. Measure the duration the line stays LOW
  3. Duration < 20 ΞΌs = bit 1, duration > 20 ΞΌs = bit 0

Byte Assembly

Tools Used

Decoded Message

read and decrypt the SPI bus data using the XOR key: icy

Key Findings


⚑ Protocol 2: SPI (Serial Peripheral Interface)

Overview

SPI is a synchronous serial communication protocol using separate clock and data lines. Data is sampled on clock edges.

Signal Capture

Decoding Method

Clock-Data Synchronization

  1. Identify Rising Edges: Find SCK transitions from 0β†’1
  2. Sample MOSI: Read MOSI value at each rising clock edge
  3. Assemble Bits: Collect sampled bits in sequence

Byte Conversion

XOR Decryption

key = b"icy"
decrypted[i] = encrypted[i] ^ key[i % len(key)]

Tools Used

Decoded Message

read and decrypt the I2C bus data using the XOR key: bananza. 
the temperature sensor address is 0x3C

Key Findings


πŸ”— Protocol 3: I2C (Inter-Integrated Circuit)

Overview

I2C is a multi-master, multi-slave serial communication protocol using two lines: SCL (clock) and SDA (data). Devices are addressed, and data includes ACK/NACK bits.

Signal Capture

Decoding Method

Transaction Detection

Data Sampling

  1. Sample SDA on SCL rising edges (marked as *-sample)
  2. First 8 bits = Address byte (7-bit address + R/W bit)
  3. Following bits = Data bytes (8 bits + 1 ACK bit repeated)

Address Filtering

Address Byte Format: [A6 A5 A4 A3 A2 A1 A0 R/W]
Target Address: 0x3C (0011 1100)

Byte Parsing

XOR Decryption

key = b"bananza"
decrypted[i] = encrypted[i] ^ key[i % len(key)]

Tools Used

Decoded Message

The sensor transmitted environmental readings:

Sensor Data from Address 0x3C:

Key Findings


πŸ› οΈ Technical Tools Summary

Python Scripts Created

Script Name Protocol Function
websocket_downloader.py DQ Capture WebSocket signals to file
simple_websocket.py DQ Simplified capture script
dq_decoder_final.py DQ Decode 1-Wire timing protocol
capture_spi_signals.py SPI Capture SCK and MOSI signals
spi_decoder.py SPI Decode and decrypt SPI data
spi_capture_and_decode.py SPI All-in-one capture + decode
capture_i2c_signals.py I2C Capture SCL and SDA signals
i2c_decoder.py I2C Decode and decrypt I2C data
i2c_capture_and_decode.py I2C All-in-one capture + decode

Dependencies

pip install websockets

πŸ“Š Results Timeline

β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚  DQ Signal  β”‚ β†’ "read and decrypt the SPI bus data using the XOR key: icy"
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜
       β”‚
       ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ SPI Signal  β”‚ β†’ "read and decrypt the I2C bus data using the XOR key: bananza.
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    the temperature sensor address is 0x3C"
       β”‚
       ↓
β”Œβ”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”
β”‚ I2C Signal  β”‚ β†’ Temperature: 32.84Β°C, Pressure: 1013 hPa
β””β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”€β”˜    Humidity: 45%/85%, Light: 3450/3850 lux

πŸ” Encryption Details

XOR Cipher Implementation

All three protocols used XOR encryption with repeating keys:

def xor_decrypt(data, key):
    """XOR decrypt with repeating key"""
    key_bytes = key.encode('utf-8')
    result = bytearray()
    for i, byte in enumerate(data):
        result.append(byte ^ key_bytes[i % len(key)])
    return bytes(result)

Keys Used


πŸ“ˆ Signal Statistics

DQ (1-Wire)

SPI

I2C


🎯 Key Observations

Protocol Characteristics

Protocol Wires Clock Bidirectional Addressing
1-Wire 1 No Yes No
SPI 2+ Yes No No
I2C 2 Yes Yes Yes (7-bit)

Decoding Challenges

  1. 1-Wire: Required identifying correct bit timing and bit order (LSB vs MSB)
  2. SPI: Needed proper clock-data synchronization and bit order determination
  3. I2C: Complex protocol with START/STOP conditions, addressing, and ACK bits

Success Factors


πŸ“ Output Files

All decoded messages and scripts are available in /mnt/user-data/outputs/:


🏁 Conclusion

Successfully decoded three layers of hardware communication protocols:

  1. 1-Wire provided the key to decrypt SPI
  2. SPI provided the key and address to decrypt I2C
  3. I2C revealed environmental sensor data from a temperature sensor

The challenge demonstrated proficiency in:

Total Time: Progressive decoding through three protocol layers
Tools Created: 9 Python scripts for capture and analysis
Final Result: Environmental sensor readings successfully extracted


Report Generated: December 2025
Challenge: SANS Holiday Hack Challenge 2025
Analyst: Hardware Signal Analysis Team


πŸ“œ Complete Script Repository

Script Index

All scripts are included below with complete source code for reproducibility.

1-Wire (DQ) Scripts

SPI Scripts

I2C Scripts


1-Wire (DQ) Signal Scripts

websocket_downloader.py

#!/usr/bin/env python3
"""
WebSocket Data Downloader
Connects to a WebSocket and saves all received data
"""

import asyncio
import websockets
import json
from datetime import datetime
import sys

# WebSocket URL
WS_URL = "wss://signals.holidayhackchallenge.com/wire/dq"

# Headers to match your request
HEADERS = {
    "Host": "signals.holidayhackchallenge.com",
    "Connection": "Upgrade",
    "Pragma": "no-cache",
    "Cache-Control": "no-cache",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36",
    "Upgrade": "websocket",
    "Origin": "https://signals.holidayhackchallenge.com",
    "Sec-WebSocket-Version": "13",
    "Accept-Encoding": "gzip, deflate, br, zstd",
    "Accept-Language": "en-US,en;q=0.9,es;q=0.8",
}

async def download_websocket_data(output_file="websocket_data.txt"):
    """
    Connect to WebSocket and save all received messages
    """
    print(f"[{datetime.now()}] Connecting to {WS_URL}...")
    
    try:
        async with websockets.connect(
            WS_URL,
            extra_headers=HEADERS,
            compression="deflate"  # For permessage-deflate extension
        ) as websocket:
            print(f"[{datetime.now()}] Connected successfully!")
            print(f"[{datetime.now()}] Receiving data... (Press Ctrl+C to stop)")
            
            message_count = 0
            
            with open(output_file, 'w', encoding='utf-8') as f:
                f.write(f"WebSocket Data Capture - Started at {datetime.now()}\n")
                f.write("=" * 80 + "\n\n")
                
                try:
                    async for message in websocket:
                        message_count += 1
                        timestamp = datetime.now()
                        
                        # Print to console
                        print(f"\n[{timestamp}] Message #{message_count}")
                        print(f"Type: {type(message)}")
                        
                        # Write to file
                        f.write(f"[{timestamp}] Message #{message_count}\n")
                        f.write("-" * 80 + "\n")
                        
                        # Handle different message types
                        if isinstance(message, bytes):
                            # Binary data
                            print(f"Binary data received ({len(message)} bytes)")
                            f.write(f"Binary data ({len(message)} bytes):\n")
                            f.write(message.hex() + "\n")
                            
                            # Try to decode as text
                            try:
                                text = message.decode('utf-8')
                                print(f"Decoded text: {text[:100]}...")
                                f.write(f"Decoded text:\n{text}\n")
                            except:
                                pass
                        else:
                            # Text data
                            print(f"Text data: {message[:100]}...")
                            f.write(f"Text data:\n{message}\n")
                            
                            # Try to parse as JSON
                            try:
                                data = json.loads(message)
                                print(f"Parsed JSON: {json.dumps(data, indent=2)[:200]}...")
                                f.write(f"Parsed JSON:\n{json.dumps(data, indent=2)}\n")
                            except:
                                pass
                        
                        f.write("\n")
                        f.flush()  # Ensure data is written immediately
                        
                except asyncio.CancelledError:
                    print(f"\n[{datetime.now()}] Stopping...")
                    
            print(f"\n[{datetime.now()}] Total messages received: {message_count}")
            print(f"[{datetime.now()}] Data saved to: {output_file}")
            
    except websockets.exceptions.WebSocketException as e:
        print(f"[{datetime.now()}] WebSocket error: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"[{datetime.now()}] Error: {e}")
        sys.exit(1)

def main():
    """Main entry point"""
    output_file = "websocket_data.txt"
    
    if len(sys.argv) > 1:
        output_file = sys.argv[1]
    
    print("WebSocket Data Downloader")
    print("=" * 80)
    print(f"Target: {WS_URL}")
    print(f"Output: {output_file}")
    print("=" * 80)
    
    try:
        asyncio.run(download_websocket_data(output_file))
    except KeyboardInterrupt:
        print(f"\n[{datetime.now()}] Interrupted by user")
    except Exception as e:
        print(f"\n[{datetime.now()}] Fatal error: {e}")
        sys.exit(1)

if __name__ == "__main__":
    main()

simple_websocket.py

#!/usr/bin/env python3
"""
Simple WebSocket Data Capture - saves all data to a file
"""

import asyncio
import websockets

WS_URL = "wss://signals.holidayhackchallenge.com/wire/dq"

async def capture_data():
    headers = {
        "Origin": "https://signals.holidayhackchallenge.com",
        "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"
    }
    
    print("Connecting to WebSocket...")
    async with websockets.connect(WS_URL, extra_headers=headers) as ws:
        print("Connected! Receiving data (Ctrl+C to stop)...\n")
        
        with open("output.txt", "w") as f:
            count = 0
            async for message in ws:
                count += 1
                print(f"Message {count}: {message[:100]}...")
                f.write(f"{message}\n")
                f.flush()
        
        print(f"\nReceived {count} messages")

if __name__ == "__main__":
    try:
        asyncio.run(capture_data())
    except KeyboardInterrupt:
        print("\nStopped by user")

dq_decoder_final.py

#!/usr/bin/env python3
"""
DQ Signal Decoder - Final Working Version
Decodes 1-Wire protocol timing into ASCII message

Usage: python3 dq_decoder_final.py <input_file>
"""

import json
import sys

def decode_dq_signal(filename):
    """
    Decode DQ signal from JSON file
    
    The signal uses 1-Wire protocol:
    - Bits are read from HIGH→LOW transitions
    - Short low pulse (~6 ΞΌs) = bit 1
    - Long low pulse (~60 ΞΌs) = bit 0
    - Bytes are LSB first (Least Significant Bit first)
    """
    
    # Load signals from file
    signals = []
    with open(filename, 'r') as f:
        for line in f:
            line = line.strip()
            if line:
                try:
                    data = json.loads(line)
                    if 'v' in data and 't' in data:
                        signals.append(data)
                except json.JSONDecodeError:
                    continue
    
    print(f"Loaded {len(signals)} signal transitions")
    
    # Find start of data (after presence marker)
    start_idx = 0
    for i, sig in enumerate(signals):
        if 'marker' in sig and sig['marker'] == 'presence':
            start_idx = i + 1
            break
    
    print(f"Starting data decode at index {start_idx}")
    
    # Decode bits from HIGH→LOW transitions
    bits = []
    i = start_idx
    
    while i < len(signals) - 1:
        # Look for HIGH to LOW transition (start of bit slot)
        if signals[i]['v'] == 1 and signals[i+1]['v'] == 0:
            # Find the next LOW to HIGH transition (end of low pulse)
            for j in range(i+1, len(signals)):
                if signals[j]['v'] == 1:
                    # Measure low pulse duration
                    low_duration = signals[j]['t'] - signals[i+1]['t']
                    
                    # Decode bit based on pulse width
                    if low_duration < 20:
                        bits.append('1')  # Short pulse = 1
                    elif low_duration < 70:
                        bits.append('0')  # Long pulse = 0
                    
                    i = j
                    break
            else:
                break
        else:
            i += 1
    
    bit_string = ''.join(bits)
    print(f"Decoded {len(bit_string)} bits ({len(bit_string)//8} bytes)")
    
    # Convert bits to ASCII (LSB first)
    chars = []
    for i in range(0, len(bit_string) - 7, 8):
        # Take 8 bits and reverse them (LSB first)
        byte_bits = bit_string[i:i+8][::-1]
        byte_val = int(byte_bits, 2)
        chars.append(chr(byte_val))
    
    return ''.join(chars)


def main():
    if len(sys.argv) < 2:
        print("Usage: python3 dq_decoder_final.py <input_file>")
        print("\nUsing default file: /mnt/user-data/uploads/dq.out")
        filename = "/mnt/user-data/uploads/dq.out"
    else:
        filename = sys.argv[1]
    
    print("=" * 80)
    print("DQ Signal Decoder")
    print("=" * 80)
    print()
    
    try:
        message = decode_dq_signal(filename)
        
        print()
        print("=" * 80)
        print("DECODED MESSAGE:")
        print("=" * 80)
        print(message)
        print("=" * 80)
        
        # Save to output file
        output_file = "dq_decoded_message.txt"
        with open(output_file, 'w') as f:
            f.write("DQ Signal Decoded Message\n")
            f.write("=" * 80 + "\n\n")
            f.write(message + "\n\n")
            f.write("=" * 80 + "\n")
        
        print(f"\nMessage saved to: {output_file}")
        
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()
        sys.exit(1)


if __name__ == "__main__":
    main()

SPI Signal Scripts

capture_spi_signals.py

#!/usr/bin/env python3
"""
SPI Signal Capture - Captures SCK (clock) and MOSI (data) signals
"""

import asyncio
import websockets
import json
from datetime import datetime

SCK_URL = "wss://signals.holidayhackchallenge.com/wire/sck"
MOSI_URL = "wss://signals.holidayhackchallenge.com/wire/mosi"

HEADERS = {
    "Origin": "https://signals.holidayhackchallenge.com",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/142.0.0.0 Safari/537.36"
}

async def capture_signal(url, signal_name, output_file):
    """Capture a single signal"""
    print(f"[{signal_name}] Connecting to {url}...")
    
    try:
        async with websockets.connect(url, extra_headers=HEADERS) as ws:
            print(f"[{signal_name}] Connected! Receiving data...")
            
            with open(output_file, 'w') as f:
                count = 0
                async for message in ws:
                    f.write(f"{message}\n")
                    f.flush()
                    count += 1
                    
                    if count % 100 == 0:
                        print(f"[{signal_name}] Received {count} messages...")
                    
    except Exception as e:
        print(f"[{signal_name}] Error: {e}")

async def capture_both():
    """Capture both signals simultaneously"""
    print("=" * 80)
    print("SPI Signal Capture - SCK and MOSI")
    print("=" * 80)
    print()
    
    # Create tasks for both signals
    sck_task = asyncio.create_task(capture_signal(SCK_URL, "SCK", "sck.out"))
    mosi_task = asyncio.create_task(capture_signal(MOSI_URL, "MOSI", "mosi.out"))
    
    # Wait for both to complete (or Ctrl+C)
    try:
        await asyncio.gather(sck_task, mosi_task)
    except KeyboardInterrupt:
        print("\n\nStopping capture...")
        sck_task.cancel()
        mosi_task.cancel()
        
        try:
            await asyncio.gather(sck_task, mosi_task)
        except asyncio.CancelledError:
            pass
    
    print("\n" + "=" * 80)
    print("Capture complete!")
    print("Files saved: sck.out, mosi.out")
    print("=" * 80)

def main():
    try:
        asyncio.run(capture_both())
    except KeyboardInterrupt:
        print("\nInterrupted by user")

if __name__ == "__main__":
    main()

spi_decoder.py

#!/usr/bin/env python3
"""
SPI Signal Decoder
Decodes SPI data using SCK (clock) and MOSI (data) signals
Then applies XOR decryption with key "icy"
"""

import json
import sys

def load_signal(filename):
    """Load signal data from file"""
    signals = []
    with open(filename, 'r') as f:
        for line in f:
            line = line.strip()
            if line:
                try:
                    data = json.loads(line)
                    if 'v' in data and 't' in data:
                        signals.append(data)
                except json.JSONDecodeError:
                    continue
    return signals

def decode_spi(sck_signals, mosi_signals):
    """
    Decode SPI data by sampling MOSI on SCK rising edges
    
    SPI protocol:
    - Data is sampled on clock edges (usually rising edge)
    - MOSI = Master Out Slave In (data line)
    - SCK = Serial Clock
    """
    
    # Find rising edges of clock (0β†’1 transitions)
    clock_rising_edges = []
    for i in range(len(sck_signals) - 1):
        if sck_signals[i]['v'] == 0 and sck_signals[i+1]['v'] == 1:
            clock_rising_edges.append(sck_signals[i+1]['t'])
    
    print(f"Found {len(clock_rising_edges)} clock rising edges")
    
    # Sample MOSI data at each clock rising edge
    bits = []
    mosi_idx = 0
    
    for edge_time in clock_rising_edges:
        # Find MOSI value at this time
        # Look for the MOSI signal state at or just before this time
        while mosi_idx < len(mosi_signals) - 1 and mosi_signals[mosi_idx + 1]['t'] <= edge_time:
            mosi_idx += 1
        
        # Sample the MOSI bit value
        bit_value = str(mosi_signals[mosi_idx]['v'])
        bits.append(bit_value)
    
    print(f"Sampled {len(bits)} bits")
    
    return ''.join(bits)

def bits_to_bytes(bit_string, msb_first=True):
    """Convert bit string to bytes"""
    bytes_list = []
    
    for i in range(0, len(bit_string) - 7, 8):
        byte_bits = bit_string[i:i+8]
        
        if not msb_first:
            byte_bits = byte_bits[::-1]  # Reverse for LSB first
        
        byte_val = int(byte_bits, 2)
        bytes_list.append(byte_val)
    
    return bytes(bytes_list)

def xor_decrypt(data, key):
    """XOR decrypt data with repeating key"""
    key_bytes = key.encode('utf-8')
    decrypted = bytearray()
    
    for i, byte in enumerate(data):
        key_byte = key_bytes[i % len(key_bytes)]
        decrypted.append(byte ^ key_byte)
    
    return bytes(decrypted)

def main():
    if len(sys.argv) < 3:
        print("Usage: python3 spi_decoder.py <sck_file> <mosi_file>")
        print("\nUsing default files...")
        sck_file = "sck.out"
        mosi_file = "mosi.out"
    else:
        sck_file = sys.argv[1]
        mosi_file = sys.argv[2]
    
    print("=" * 80)
    print("SPI Signal Decoder")
    print("=" * 80)
    print()
    
    # Load signals
    print(f"Loading SCK from {sck_file}...")
    sck_signals = load_signal(sck_file)
    print(f"  Loaded {len(sck_signals)} SCK transitions")
    
    print(f"Loading MOSI from {mosi_file}...")
    mosi_signals = load_signal(mosi_file)
    print(f"  Loaded {len(mosi_signals)} MOSI transitions")
    print()
    
    # Decode SPI
    print("Decoding SPI data...")
    bit_string = decode_spi(sck_signals, mosi_signals)
    print(f"First 80 bits: {bit_string[:80]}")
    print()
    
    # Try both MSB and LSB first
    for bit_order_name, msb_first in [("MSB first", True), ("LSB first", False)]:
        print(f"\n{'='*80}")
        print(f"Trying {bit_order_name}")
        print(f"{'='*80}")
        
        # Convert to bytes
        data_bytes = bits_to_bytes(bit_string, msb_first=msb_first)
        print(f"Raw bytes ({len(data_bytes)}): {data_bytes[:50]}...")
        
        # Decrypt with XOR key "icy"
        decrypted = xor_decrypt(data_bytes, "icy")
        
        # Try to decode as text
        try:
            message = decrypted.decode('utf-8', errors='ignore')
            print(f"\nDecrypted message:")
            print("-" * 80)
            print(message)
            print("-" * 80)
            
            # Check if it looks like readable text
            printable_count = sum(1 for c in message if c.isprintable())
            print(f"Printable characters: {printable_count}/{len(message)} ({100*printable_count/len(message):.1f}%)")
            
            # If this looks good, save it
            if printable_count / len(message) > 0.8:
                output_file = "spi_decrypted_message.txt"
                with open(output_file, 'w') as f:
                    f.write("SPI Decrypted Message\n")
                    f.write("=" * 80 + "\n\n")
                    f.write(f"Bit order: {bit_order_name}\n")
                    f.write(f"XOR key: icy\n\n")
                    f.write(message + "\n\n")
                    f.write("=" * 80 + "\n")
                print(f"\nβœ“ Message saved to {output_file}")
        
        except Exception as e:
            print(f"Decoding error: {e}")

if __name__ == "__main__":
    main()

spi_capture_and_decode.py

#!/usr/bin/env python3
"""
Complete SPI Capture and Decode Pipeline
1. Captures SCK and MOSI signals from WebSockets
2. Decodes SPI data
3. Applies XOR decryption with key "icy"
"""

import asyncio
import websockets
import json

SCK_URL = "wss://signals.holidayhackchallenge.com/wire/sck"
MOSI_URL = "wss://signals.holidayhackchallenge.com/wire/mosi"

HEADERS = {
    "Origin": "https://signals.holidayhackchallenge.com",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}

class SignalCapture:
    def __init__(self):
        self.sck_signals = []
        self.mosi_signals = []
    
    async def capture_signal(self, url, signal_list, name):
        """Capture signal into list"""
        print(f"[{name}] Connecting...")
        async with websockets.connect(url, extra_headers=HEADERS) as ws:
            print(f"[{name}] Connected! Capturing...")
            
            count = 0
            async for message in ws:
                try:
                    data = json.loads(message)
                    if 'v' in data and 't' in data:
                        signal_list.append(data)
                        count += 1
                        
                        if count % 100 == 0:
                            print(f"[{name}] Captured {count} signals...")
                except:
                    pass
    
    async def capture_both(self, duration=10):
        """Capture both signals for specified duration (seconds)"""
        print("=" * 80)
        print("Starting SPI Capture...")
        print(f"Will capture for {duration} seconds (or press Ctrl+C)")
        print("=" * 80)
        print()
        
        sck_task = asyncio.create_task(
            self.capture_signal(SCK_URL, self.sck_signals, "SCK")
        )
        mosi_task = asyncio.create_task(
            self.capture_signal(MOSI_URL, self.mosi_signals, "MOSI")
        )
        
        try:
            await asyncio.wait_for(
                asyncio.gather(sck_task, mosi_task),
                timeout=duration
            )
        except asyncio.TimeoutError:
            print(f"\n{duration} seconds elapsed, stopping capture...")
            sck_task.cancel()
            mosi_task.cancel()
        except KeyboardInterrupt:
            print("\nStopping capture...")
            sck_task.cancel()
            mosi_task.cancel()
        
        print(f"\nCaptured {len(self.sck_signals)} SCK and {len(self.mosi_signals)} MOSI signals")

def decode_spi(sck_signals, mosi_signals):
    """Decode SPI by sampling MOSI on SCK rising edges"""
    
    # Find rising edges
    rising_edges = []
    for i in range(len(sck_signals) - 1):
        if sck_signals[i]['v'] == 0 and sck_signals[i+1]['v'] == 1:
            rising_edges.append(sck_signals[i+1]['t'])
    
    # Sample MOSI at each edge
    bits = []
    mosi_idx = 0
    
    for edge_time in rising_edges:
        while mosi_idx < len(mosi_signals) - 1 and mosi_signals[mosi_idx + 1]['t'] <= edge_time:
            mosi_idx += 1
        bits.append(str(mosi_signals[mosi_idx]['v']))
    
    return ''.join(bits)

def bits_to_bytes(bit_string, msb_first=True):
    """Convert bits to bytes"""
    bytes_list = []
    for i in range(0, len(bit_string) - 7, 8):
        byte_bits = bit_string[i:i+8]
        if not msb_first:
            byte_bits = byte_bits[::-1]
        bytes_list.append(int(byte_bits, 2))
    return bytes(bytes_list)

def xor_decrypt(data, key):
    """XOR decrypt with repeating key"""
    key_bytes = key.encode('utf-8')
    result = bytearray()
    for i, byte in enumerate(data):
        result.append(byte ^ key_bytes[i % len(key_bytes)])
    return bytes(result)

async def main():
    # Capture signals
    capture = SignalCapture()
    await capture.capture_both(duration=10)
    
    if not capture.sck_signals or not capture.mosi_signals:
        print("Error: No signals captured!")
        return
    
    print("\n" + "=" * 80)
    print("Decoding SPI data...")
    print("=" * 80)
    
    # Decode
    bit_string = decode_spi(capture.sck_signals, capture.mosi_signals)
    print(f"Decoded {len(bit_string)} bits")
    
    # Try both bit orders
    for order_name, msb in [("MSB first", True), ("LSB first", False)]:
        print(f"\n{'='*80}")
        print(f"{order_name}")
        print(f"{'='*80}")
        
        data = bits_to_bytes(bit_string, msb_first=msb)
        decrypted = xor_decrypt(data, "icy")
        
        try:
            message = decrypted.decode('utf-8', errors='ignore')
            printable = sum(1 for c in message if c.isprintable())
            
            print(message[:200])
            print(f"\nPrintable: {printable}/{len(message)} ({100*printable/len(message):.1f}%)")
            
            if printable / len(message) > 0.8:
                with open('spi_message.txt', 'w') as f:
                    f.write(message)
                print(f"\nβœ“ Saved to spi_message.txt")
        except:
            print("Failed to decode as text")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nInterrupted")

I2C Signal Scripts

capture_i2c_signals.py

#!/usr/bin/env python3
"""
I2C Signal Capture - Captures SCL (clock) and SDA (data)
"""

import asyncio
import websockets
import json

SCL_URL = "wss://signals.holidayhackchallenge.com/wire/scl"
SDA_URL = "wss://signals.holidayhackchallenge.com/wire/sda"

HEADERS = {
    "Origin": "https://signals.holidayhackchallenge.com",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}

async def capture_signal(url, signal_name, output_file):
    """Capture a single signal"""
    print(f"[{signal_name}] Connecting to {url}...")
    
    try:
        async with websockets.connect(url, extra_headers=HEADERS) as ws:
            print(f"[{signal_name}] Connected! Receiving data...")
            
            with open(output_file, 'w') as f:
                count = 0
                async for message in ws:
                    f.write(f"{message}\n")
                    f.flush()
                    count += 1
                    
                    if count % 100 == 0:
                        print(f"[{signal_name}] Received {count} messages...")
                    
    except Exception as e:
        print(f"[{signal_name}] Error: {e}")

async def capture_both():
    """Capture both signals simultaneously"""
    print("=" * 80)
    print("I2C Signal Capture - SCL and SDA")
    print("=" * 80)
    print()
    
    scl_task = asyncio.create_task(capture_signal(SCL_URL, "SCL", "scl.out"))
    sda_task = asyncio.create_task(capture_signal(SDA_URL, "SDA", "sda.out"))
    
    try:
        await asyncio.gather(scl_task, sda_task)
    except KeyboardInterrupt:
        print("\n\nStopping capture...")
        scl_task.cancel()
        sda_task.cancel()
        
        try:
            await asyncio.gather(scl_task, sda_task)
        except asyncio.CancelledError:
            pass
    
    print("\n" + "=" * 80)
    print("Capture complete!")
    print("Files saved: scl.out, sda.out")
    print("=" * 80)

def main():
    try:
        asyncio.run(capture_both())
    except KeyboardInterrupt:
        print("\nInterrupted by user")

if __name__ == "__main__":
    main()

i2c_decoder.py

#!/usr/bin/env python3
"""
I2C Signal Decoder
Decodes I2C data from SCL (clock) and SDA (data) signal files
Applies XOR decryption with key "bananza"
Filters for temperature sensor at address 0x3C
"""

import json
import sys

def load_signal(filename):
    """Load signal data from file"""
    signals = []
    with open(filename, 'r') as f:
        for line in f:
            line = line.strip()
            if line:
                try:
                    data = json.loads(line)
                    if 'v' in data and 't' in data:
                        signals.append(data)
                except json.JSONDecodeError:
                    continue
    return signals

def find_i2c_transactions(scl_signals, sda_signals):
    """
    Find I2C transactions using START and STOP conditions
    START: SDA falls (1β†’0) while SCL is high
    STOP: SDA rises (0β†’1) while SCL is high
    """
    
    transactions = []
    
    # Helper to get SCL value at a given time
    def get_scl_at_time(t):
        scl_val = 0
        for scl in scl_signals:
            if scl['t'] <= t:
                scl_val = scl['v']
            else:
                break
        return scl_val
    
    # Find START conditions
    for i in range(len(sda_signals) - 1):
        if sda_signals[i]['v'] == 1 and sda_signals[i+1]['v'] == 0:
            start_time = sda_signals[i+1]['t']
            
            if get_scl_at_time(start_time) == 1:
                # Found START, now find STOP
                for j in range(i+1, len(sda_signals) - 1):
                    if sda_signals[j]['v'] == 0 and sda_signals[j+1]['v'] == 1:
                        stop_time = sda_signals[j+1]['t']
                        
                        if get_scl_at_time(stop_time) == 1:
                            transactions.append({
                                'start': start_time,
                                'stop': stop_time
                            })
                            break
    
    return transactions

def decode_i2c_transaction(scl_signals, sda_signals, start_time, stop_time):
    """
    Decode bits from an I2C transaction
    Sample SDA on SCL rising edges
    """
    
    bits = []
    
    # Find SCL rising edges in this time window
    for i in range(len(scl_signals) - 1):
        t = scl_signals[i+1]['t']
        
        if start_time < t < stop_time:
            # Check for rising edge
            if scl_signals[i]['v'] == 0 and scl_signals[i+1]['v'] == 1:
                # Sample SDA at this time
                sda_val = 0
                for sda in sda_signals:
                    if sda['t'] <= t:
                        sda_val = sda['v']
                    else:
                        break
                
                bits.append(str(sda_val))
    
    return ''.join(bits)

def parse_i2c_bytes(bit_string):
    """
    Parse I2C bits into bytes
    Format: [8 data bits][1 ACK bit] repeated
    """
    
    bytes_list = []
    i = 0
    
    while i + 8 <= len(bit_string):
        # Extract 8 bits (MSB first)
        byte_bits = bit_string[i:i+8]
        byte_val = int(byte_bits, 2)
        bytes_list.append(byte_val)
        
        # Skip ACK/NACK bit
        i += 9
    
    return bytes_list

def xor_decrypt(data, key):
    """XOR decrypt with repeating key"""
    key_bytes = key.encode('utf-8')
    result = bytearray()
    for i, byte in enumerate(data):
        result.append(byte ^ key_bytes[i % len(key_bytes)])
    return bytes(result)

def main():
    if len(sys.argv) < 3:
        print("Usage: python3 i2c_decoder.py <scl_file> <sda_file>")
        print("\nUsing default files...")
        scl_file = "scl.out"
        sda_file = "sda.out"
    else:
        scl_file = sys.argv[1]
        sda_file = sys.argv[2]
    
    print("=" * 80)
    print("I2C Signal Decoder")
    print("=" * 80)
    print()
    
    # Load signals
    print(f"Loading SCL from {scl_file}...")
    scl_signals = load_signal(scl_file)
    print(f"  Loaded {len(scl_signals)} SCL transitions")
    
    print(f"Loading SDA from {sda_file}...")
    sda_signals = load_signal(sda_file)
    print(f"  Loaded {len(sda_signals)} SDA transitions")
    print()
    
    # Find transactions
    print("Finding I2C transactions...")
    transactions = find_i2c_transactions(scl_signals, sda_signals)
    print(f"Found {len(transactions)} transactions")
    print()
    
    # Decode each transaction
    all_data_bytes = []
    
    for idx, trans in enumerate(transactions):
        print(f"Transaction {idx + 1}:")
        print(f"  Time: {trans['start']} β†’ {trans['stop']}")
        
        # Decode bits
        bits = decode_i2c_transaction(scl_signals, sda_signals, 
                                      trans['start'], trans['stop'])
        
        if not bits:
            print("  No bits decoded")
            continue
        
        print(f"  Bits ({len(bits)}): {bits[:72]}{'...' if len(bits) > 72 else ''}")
        
        # Parse bytes
        trans_bytes = parse_i2c_bytes(bits)
        
        if not trans_bytes:
            print("  No bytes parsed")
            continue
        
        # First byte contains address (7 bits) + R/W bit
        addr_byte = trans_bytes[0]
        address = addr_byte >> 1
        read_write = "READ" if (addr_byte & 1) else "WRITE"
        
        print(f"  Address: 0x{address:02X}, Operation: {read_write}")
        print(f"  Data: {[f'0x{b:02x}' for b in trans_bytes[1:]]}")
        
        # Filter for sensor at 0x3C
        if address == 0x3C and len(trans_bytes) > 1:
            all_data_bytes.extend(trans_bytes[1:])
            print(f"  βœ“ From sensor 0x3C - collected {len(trans_bytes)-1} data bytes")
        
        print()
    
    # Decrypt collected data
    if all_data_bytes:
        print("=" * 80)
        print(f"Collected {len(all_data_bytes)} data bytes from sensor 0x3C")
        print("=" * 80)
        print()
        
        encrypted = bytes(all_data_bytes)
        print(f"Encrypted (hex): {encrypted.hex()}")
        print()
        
        # Decrypt with XOR key "bananza"
        decrypted = xor_decrypt(encrypted, "bananza")
        
        try:
            message = decrypted.decode('utf-8', errors='ignore')
            
            print("=" * 80)
            print("DECRYPTED MESSAGE:")
            print("=" * 80)
            print(message)
            print("=" * 80)
            
            # Save to file
            with open('i2c_decrypted.txt', 'w') as f:
                f.write("I2C Decrypted Message\n")
                f.write("=" * 80 + "\n\n")
                f.write(f"Sensor Address: 0x3C\n")
                f.write(f"XOR Key: bananza\n")
                f.write(f"Data bytes: {len(all_data_bytes)}\n\n")
                f.write(message + "\n\n")
                f.write("=" * 80 + "\n")
            
            print(f"\nβœ“ Saved to i2c_decrypted.txt")
        
        except Exception as e:
            print(f"Error decoding as text: {e}")
            print(f"Raw bytes: {decrypted}")
    else:
        print("No data bytes collected from sensor 0x3C")

if __name__ == "__main__":
    main()

i2c_capture_and_decode.py

#!/usr/bin/env python3
"""
I2C Signal Capture and Decoder
Captures SCL (clock) and SDA (data) signals
Decodes I2C protocol and applies XOR decryption with key "bananza"
Temperature sensor address: 0x3C
"""

import asyncio
import websockets
import json

SCL_URL = "wss://signals.holidayhackchallenge.com/wire/scl"
SDA_URL = "wss://signals.holidayhackchallenge.com/wire/sda"

HEADERS = {
    "Origin": "https://signals.holidayhackchallenge.com",
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36"
}

class I2CCapture:
    def __init__(self):
        self.scl_signals = []
        self.sda_signals = []
    
    async def capture_signal(self, url, signal_list, name):
        """Capture signal into list"""
        print(f"[{name}] Connecting...")
        async with websockets.connect(url, extra_headers=HEADERS) as ws:
            print(f"[{name}] Connected! Capturing...")
            
            count = 0
            async for message in ws:
                try:
                    data = json.loads(message)
                    if 'v' in data and 't' in data:
                        signal_list.append(data)
                        count += 1
                        
                        if count % 100 == 0:
                            print(f"[{name}] Captured {count} signals...")
                except:
                    pass
    
    async def capture_both(self, duration=10):
        """Capture both signals for specified duration"""
        print("=" * 80)
        print("I2C Signal Capture - SCL (clock) and SDA (data)")
        print(f"Duration: {duration} seconds (or press Ctrl+C)")
        print("=" * 80)
        print()
        
        scl_task = asyncio.create_task(
            self.capture_signal(SCL_URL, self.scl_signals, "SCL")
        )
        sda_task = asyncio.create_task(
            self.capture_signal(SDA_URL, self.sda_signals, "SDA")
        )
        
        try:
            await asyncio.wait_for(
                asyncio.gather(scl_task, sda_task),
                timeout=duration
            )
        except asyncio.TimeoutError:
            print(f"\n{duration} seconds elapsed, stopping...")
            scl_task.cancel()
            sda_task.cancel()
        except KeyboardInterrupt:
            print("\nStopping...")
            scl_task.cancel()
            sda_task.cancel()
        
        print(f"\nCaptured {len(self.scl_signals)} SCL and {len(self.sda_signals)} SDA signals")

def find_i2c_transactions(scl_signals, sda_signals):
    """
    Find I2C transactions by detecting START and STOP conditions
    START: SDA falls while SCL is high
    STOP: SDA rises while SCL is high
    """
    
    transactions = []
    
    # Find START conditions (SDA 1β†’0 while SCL=1)
    for i in range(len(sda_signals) - 1):
        if sda_signals[i]['v'] == 1 and sda_signals[i+1]['v'] == 0:
            start_time = sda_signals[i+1]['t']
            
            # Check if SCL is high at this time
            scl_high = False
            for scl in scl_signals:
                if scl['t'] <= start_time:
                    scl_high = (scl['v'] == 1)
                else:
                    break
            
            if scl_high:
                # Find corresponding STOP (SDA 0β†’1 while SCL=1)
                for j in range(i+1, len(sda_signals) - 1):
                    if sda_signals[j]['v'] == 0 and sda_signals[j+1]['v'] == 1:
                        stop_time = sda_signals[j+1]['t']
                        
                        # Check if SCL is high at STOP time
                        scl_high_stop = False
                        for scl in scl_signals:
                            if scl['t'] <= stop_time:
                                scl_high_stop = (scl['v'] == 1)
                            else:
                                break
                        
                        if scl_high_stop:
                            transactions.append({
                                'start': start_time,
                                'stop': stop_time
                            })
                            break
    
    return transactions

def decode_i2c_bits(scl_signals, sda_signals, start_time, stop_time):
    """
    Decode I2C bits between START and STOP
    Bits are sampled on SCL rising edge (or high level)
    """
    
    bits = []
    
    # Find SCL rising edges within the transaction
    for i in range(len(scl_signals) - 1):
        if scl_signals[i]['t'] >= start_time and scl_signals[i+1]['t'] <= stop_time:
            # SCL rising edge (0β†’1)
            if scl_signals[i]['v'] == 0 and scl_signals[i+1]['v'] == 1:
                sample_time = scl_signals[i+1]['t']
                
                # Sample SDA at this time
                sda_value = 0
                for sda in sda_signals:
                    if sda['t'] <= sample_time:
                        sda_value = sda['v']
                    else:
                        break
                
                bits.append(str(sda_value))
    
    return ''.join(bits)

def parse_i2c_bytes(bit_string):
    """
    Parse I2C bit string into bytes
    I2C format: 8 data bits + 1 ACK bit (repeated)
    """
    
    bytes_list = []
    i = 0
    
    while i + 8 < len(bit_string):
        # Get 8 data bits (MSB first)
        byte_bits = bit_string[i:i+8]
        byte_val = int(byte_bits, 2)
        bytes_list.append(byte_val)
        
        # Skip ACK/NACK bit
        i += 9
    
    return bytes_list

def xor_decrypt(data, key):
    """XOR decrypt with repeating key"""
    key_bytes = key.encode('utf-8')
    result = bytearray()
    for i, byte in enumerate(data):
        result.append(byte ^ key_bytes[i % len(key_bytes)])
    return bytes(result)

def decode_i2c_data(scl_signals, sda_signals):
    """Main I2C decoding function"""
    
    print("\n" + "=" * 80)
    print("Decoding I2C Data")
    print("=" * 80)
    
    # Find transactions
    transactions = find_i2c_transactions(scl_signals, sda_signals)
    print(f"Found {len(transactions)} I2C transactions")
    
    all_bytes = []
    
    for idx, trans in enumerate(transactions):
        print(f"\nTransaction {idx + 1}:")
        print(f"  Start: {trans['start']}, Stop: {trans['stop']}")
        
        # Decode bits
        bits = decode_i2c_bits(scl_signals, sda_signals, trans['start'], trans['stop'])
        print(f"  Bits: {bits[:80]}{'...' if len(bits) > 80 else ''}")
        
        # Parse into bytes
        if bits:
            trans_bytes = parse_i2c_bytes(bits)
            
            # First byte is address + R/W bit
            if trans_bytes:
                addr = trans_bytes[0] >> 1  # Address is upper 7 bits
                rw = trans_bytes[0] & 1     # R/W is LSB
                print(f"  Address: 0x{addr:02X}, R/W: {'READ' if rw else 'WRITE'}")
                print(f"  Data bytes: {trans_bytes[1:]}")
                
                # If this is from our sensor (0x3C)
                if addr == 0x3C:
                    all_bytes.extend(trans_bytes[1:])  # Skip address byte
    
    return all_bytes

async def main():
    # Capture signals
    capture = I2CCapture()
    await capture.capture_both(duration=10)
    
    if not capture.scl_signals or not capture.sda_signals:
        print("Error: No signals captured!")
        return
    
    # Decode I2C
    data_bytes = decode_i2c_data(capture.scl_signals, capture.sda_signals)
    
    if not data_bytes:
        print("\nNo data bytes found from sensor 0x3C")
        return
    
    print("\n" + "=" * 80)
    print("Decryption")
    print("=" * 80)
    
    # Decrypt with key "bananza"
    encrypted = bytes(data_bytes)
    print(f"\nEncrypted data ({len(encrypted)} bytes):")
    print(f"Hex: {encrypted.hex()}")
    
    decrypted = xor_decrypt(encrypted, "bananza")
    
    # Try to decode as text
    try:
        message = decrypted.decode('utf-8', errors='ignore')
        
        print(f"\n{'='*80}")
        print("DECRYPTED MESSAGE:")
        print(f"{'='*80}")
        print(message)
        print(f"{'='*80}")
        
        # Save to file
        with open('i2c_decrypted_message.txt', 'w') as f:
            f.write("I2C Decrypted Message\n")
            f.write("=" * 80 + "\n\n")
            f.write(f"Sensor Address: 0x3C\n")
            f.write(f"XOR Key: bananza\n\n")
            f.write(message + "\n\n")
            f.write("=" * 80 + "\n")
        
        print(f"\nβœ“ Message saved to i2c_decrypted_message.txt")
    
    except Exception as e:
        print(f"Error decoding: {e}")
        print(f"Raw decrypted bytes: {decrypted}")

if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        print("\nInterrupted")

πŸ“¦ Requirements

Python Dependencies

Create a requirements.txt file:

websockets>=12.0

Install with:

pip install -r requirements.txt

Or install directly:

pip install websockets

πŸš€ Quick Start Guide

Decode 1-Wire (DQ) Signal

# Capture the signal
python3 simple_websocket.py

# Decode the captured data
python3 dq_decoder_final.py dq.out

Decode SPI Signal

# Option 1: All-in-one
python3 spi_capture_and_decode.py

# Option 2: Separate capture and decode
python3 capture_spi_signals.py  # Press Ctrl+C after ~10 seconds
python3 spi_decoder.py sck.out mosi.out

Decode I2C Signal

# Option 1: All-in-one
python3 i2c_capture_and_decode.py

# Option 2: Separate capture and decode
python3 capture_i2c_signals.py  # Press Ctrl+C after ~10 seconds
python3 i2c_decoder.py scl.out sda.out

πŸ’‘ Usage Tips

WebSocket Capture

Decoding

Troubleshooting


πŸ“Š Expected Output

1-Wire (DQ)

DECODED MESSAGE:
read and decrypt the SPI bus data using the XOR key: icy

SPI

DECRYPTED MESSAGE:
read and decrypt the I2C bus data using the XOR key: bananza. 
the temperature sensor address is 0x3C

I2C

SENSOR READINGS:
Temperature: 32.84Β°C
Pressure: 1013 hPa
Humidity: 45% and 85%
Light: 3450 lux and 3850 lux

Complete Technical Report with Full Source Code
Generated: December 2025
SANS Holiday Hack Challenge 2025


← Back to all posts