Boakye I. Ababio

HDFS: Top-3 IPs for each hour of an IP stream


To show the top-3 IPs for each hour:


First, run the counting job. It writes the number of hits per hour and IP to intermediate_results.txt:

cat ../../mapreduce-test-data/access.log | python mapper.py | sort -k1,1 | python reducer.py > intermediate_results.txt

Then run the ranking job. It reads those counts and prints the top three IPs for each hour:

cat intermediate_results.txt | python mapper11.py | sort -k1,1 | python reducer11.py
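To check results on a small log without the shell pipeline, you can collapse both jobs into one in-memory Python function. This is a sketch for local testing and is not part of the project. The names top3_per_hour and LOG_RE are hypothetical, and the code assumes the same access-log timestamp format that mapper.py parses.

```python
import heapq
import re
from collections import Counter, defaultdict

# Hypothetical helper: same fields as mapper.py's pattern, with the dots escaped.
LOG_RE = re.compile(r'(?P<ip>\d+\.\d+\.\d+\.\d+).*?\[\d+/[a-zA-Z]+/\d{4}:(?P<hour>\d{2}):\d{2}:\d{2}')


def top3_per_hour(log_lines):
    """Return {hour: [(count, ip), ...]} with at most three entries per hour."""
    # Job 1 (mapper.py | sort | reducer.py): count hits per (hour, ip).
    counts = Counter()
    for line in log_lines:
        match = LOG_RE.search(line)
        if match:
            counts[(match.group("hour"), match.group("ip"))] += 1

    # Job 2 (mapper11.py | sort | reducer11.py): keep the three largest per hour.
    # Ties on count are broken by the larger IP string, as with reducer11.py.
    by_hour = defaultdict(list)
    for (hour, ip), count in counts.items():
        by_hour[hour].append((count, ip))
    return {hour: heapq.nlargest(3, pairs) for hour, pairs in sorted(by_hour.items())}
```

For example, with open("access.log") as f: print(top3_per_hour(f)) gives a dictionary to compare against the Hadoop output on the same file.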

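To restrict the analysis to a range of hours, run run_script.py with the range start_hour-end_hour. The end hour is exclusive, so 1-2 selects hour 01 only. run_script.py calls mappers/mapper1.py and reducers/reducer1.py by relative path, so run it from the mapreduce/ directory:

 python scripts/run_script.py 1-2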

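If the log contains no entries in the specified hour range, the pipeline prints nothing. It finishes without a Python error only if each reducer guards its final print (for example with if current_key is not None:). An unguarded final comparison refers to a loop variable that is never assigned on empty input, so it raises a NameError.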
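The following sketch parses and validates the range argument more strictly than run_script.py does: it rejects non-integer input and hours outside 0 to 24. It also lists which hours mapper1.py's test from_hour <= hour < to_hour keeps. parse_hour_range and selected_hours are illustrative names. The 0 to 24 bound is an assumption based on the two-digit hour field in the log.

```python
def parse_hour_range(arg):
    """Parse 'start_hour-end_hour' into (from_hour, to_hour); the end is exclusive.

    Hypothetical helper; assumes log hours run from 0 to 23.
    """
    parts = arg.split("-")
    if len(parts) != 2:
        raise ValueError("expected 'start_hour-end_hour', e.g. '0-1'")
    from_hour, to_hour = int(parts[0]), int(parts[1])  # ValueError if not integers
    if not 0 <= from_hour < to_hour <= 24:
        raise ValueError("need 0 <= start_hour < end_hour <= 24")
    return from_hour, to_hour


def selected_hours(from_hour, to_hour):
    """Hours that pass mapper1.py's half-open test from_hour <= hour < to_hour."""
    return [h for h in range(24) if from_hour <= h < to_hour]
```

With this convention, 0-24 covers a whole day. A negative start such as -1-2 is rejected because it splits into three parts.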

To run the range-filtered pipeline without run_script.py, first export the environment variables FROM_HOUR and TO_HOUR in the shell, then run both stages:

export FROM_HOUR=0
export TO_HOUR=1
cat ../../mapreduce-test-data/access.log | python mapper1.py | sort -k1,1 | python reducer1.py > intermediate_results.txt
cat intermediate_results.txt | python reducer2.py
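A Python driver can also pass the variables only to the pipeline it launches, instead of exporting them into your shell. This sketch uses the standard subprocess module. The function names pipeline_env and run_range are hypothetical, and the script paths assume you run it from the directory that holds mapper1.py, reducer1.py and reducer2.py.

```python
import os
import shlex
import subprocess


def pipeline_env(from_hour, to_hour):
    """Copy of the current environment plus the variables mapper1.py and reducer2.py read."""
    env = dict(os.environ)  # copy, so os.environ itself is left untouched
    env["FROM_HOUR"] = str(from_hour)
    env["TO_HOUR"] = str(to_hour)
    return env


def run_range(log_path, from_hour, to_hour, intermediate="intermediate_results.txt"):
    """Run both stages with FROM_HOUR/TO_HOUR set only for these child processes."""
    env = pipeline_env(from_hour, to_hour)
    stage1 = ("cat %s | python mapper1.py | sort -k1,1 | python reducer1.py > %s"
              % (shlex.quote(log_path), shlex.quote(intermediate)))
    stage2 = "cat %s | python reducer2.py" % shlex.quote(intermediate)
    # check=True raises CalledProcessError if a pipeline's exit status
    # (that of its last command) is non-zero.
    subprocess.run(stage1, shell=True, env=env, check=True)
    result = subprocess.run(stage2, shell=True, env=env, check=True,
                            capture_output=True, text=True)
    return result.stdout
```

For example, print(run_range("../../mapreduce-test-data/access.log", 0, 1)) prints what reducer2.py outputs for hour 00.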

Overview of the project:


Files in the directory:

mapreduce/
├── mappers/
│   ├── mapper.py
│   ├── mapper1.py
│   ├── mapper11.py
├── reducers/
│   ├── reducer.py
│   ├── reducer1.py
│   ├── reducer11.py
│   ├── reducer2.py
├── scripts/
│   ├── run_script.py
├── data/
│   ├── access.log
│   ├── intermediate_results.txt
├── screenshots/
│   ├── screen_shot1.png
│   ├── screen_shot2.png
└── readme.md

Structured Code

Here is the code for each file in the project structure above.

mappers/mapper.py

#!/usr/bin/python
import sys
import re

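# Capture the client IP and the two-digit hour from each access-log line.
# Raw string with escaped dots, so only dotted-quad IPs match.
pattern = re.compile(r'(?P<ip>\d+\.\d+\.\d+\.\d+).*?\[(\d+)/[a-zA-Z]+/\d{4}:(?P<hour>\d{2}):\d{2}:\d{2}.*')

for line in sys.stdin:
    match = pattern.search(line)
    if match:
        # After sorting, the key "[hour]ip" groups records by hour, then by IP.
        print('%s\t%s' % ('[' + match.group('hour') + ']' + match.group('ip'), 1))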
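Escaping the dots in the IP part of the pattern matters. An unescaped "." matches any character, so \d+.\d+.\d+.\d+ also accepts strings that are not IP addresses. The sketch below compares the two forms on two made-up log lines. LOOSE, STRICT and map_line are illustrative names, not project code.

```python
import re

# Unescaped dots: '.' matches any character.
LOOSE = re.compile(r'(?P<ip>\d+.\d+.\d+.\d+).*?\[(\d+)/[a-zA-Z]+/\d{4}:(?P<hour>\d{2}):\d{2}:\d{2}.*')
# Escaped dots: only digits separated by a literal '.' are accepted as the IP.
STRICT = re.compile(r'(?P<ip>\d+\.\d+\.\d+\.\d+).*?\[(\d+)/[a-zA-Z]+/\d{4}:(?P<hour>\d{2}):\d{2}:\d{2}.*')


def map_line(pattern, line):
    """Return the '[hour]ip<TAB>1' record the mapper would print, or None."""
    match = pattern.search(line)
    if match is None:
        return None
    return '[%s]%s\t1' % (match.group('hour'), match.group('ip'))


good = '10.0.0.1 - - [10/Oct/2023:13:55:36 -0700] "GET / HTTP/1.1" 200 2326'
bad = '1a2b3c4 - - [10/Oct/2023:13:55:36 -0700] "GET / HTTP/1.1" 200 2326'
print(repr(map_line(LOOSE, bad)))   # '[13]1a2b3c4\t1'
print(repr(map_line(STRICT, bad)))  # None
```

Both patterns produce the same record for the well-formed line. Only the escaped one rejects the malformed address.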

mappers/mapper1.py

#!/usr/bin/python
import sys
import re
import os

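# Hour range [FROM_HOUR, TO_HOUR), set by run_script.py or exported manually.
from_hour = int(os.environ['FROM_HOUR'])
to_hour = int(os.environ['TO_HOUR'])

# Raw string with escaped dots, so only dotted-quad IPs match.
pattern = re.compile(r'(?P<ip>\d+\.\d+\.\d+\.\d+).*?\[(\d+)/[a-zA-Z]+/\d{4}:(?P<hour>\d{2}):\d{2}:\d{2}.*')

for line in sys.stdin:
    match = pattern.search(line)
    if match:
        hour = int(match.group('hour'))
        # Emit only entries whose hour falls inside the requested range.
        if from_hour <= hour < to_hour:
            print('%s\t%s' % ('[' + match.group('hour') + ']' + match.group('ip'), 1))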

mappers/mapper11.py

#!/usr/bin/env python
import sys

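for line in sys.stdin:
    line = line.strip()
    if line:
        # Pass each "[hour]ip<TAB>count" record through unchanged. Appending an
        # extra "\t1" would make reducer11.py's rsplit() read 1 as the count.
        print(line)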

reducers/reducer.py

#!/usr/bin/python
import sys

current_key = None
current_count = 0

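for line in sys.stdin:
    line = line.strip()
    if '\t' not in line:
        continue  # skip blank or malformed lines
    key, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_key == key:
        current_count += count
    else:
        # Input is sorted by key, so a new key means the previous one is complete.
        if current_key is not None:
            print('%s\t%s' % (current_key, current_count))
        current_key = key
        current_count = count

# Flush the last key; the guard keeps empty input from raising NameError.
if current_key is not None:
    print('%s\t%s' % (current_key, current_count))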
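reducer.py relies on all lines for one key arriving consecutively. Hadoop Streaming sorts map output by key before the reduce phase, and sort -k1,1 does the same in the local pipeline. The same sum-per-key step can be written with itertools.groupby, which also handles the final key without a separate flush after the loop. The following is a sketch with the hypothetical function names parse_counts and sum_sorted.

```python
from itertools import groupby


def parse_counts(lines):
    """Yield (key, count) from 'key<TAB>count' lines, skipping malformed ones."""
    for line in lines:
        line = line.strip()
        if '\t' not in line:
            continue
        key, count = line.split('\t', 1)
        try:
            count = int(count)
        except ValueError:
            continue
        yield key, count


def sum_sorted(lines):
    """Sum counts per key; the input must already be sorted (grouped) by key."""
    for key, group in groupby(parse_counts(lines), key=lambda kv: kv[0]):
        yield '%s\t%s' % (key, sum(count for _, count in group))
```

Inside a reducer script, the main loop reduces to: for record in sum_sorted(sys.stdin): print(record).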

reducers/reducer1.py

#!/usr/bin/python
import sys

current_key = None
current_count = 0

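for line in sys.stdin:
    line = line.strip()
    if '\t' not in line:
        continue  # skip blank or malformed lines
    key, count = line.split('\t', 1)
    try:
        count = int(count)
    except ValueError:
        continue
    if current_key == key:
        current_count += count
    else:
        # Input is sorted by key, so a new key means the previous one is complete.
        if current_key is not None:
            print('%s\t%s' % (current_key, current_count))
        current_key = key
        current_count = count

# Flush the last key; the guard keeps empty input from raising NameError.
if current_key is not None:
    print('%s\t%s' % (current_key, current_count))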

reducers/reducer11.py

#!/usr/bin/python
import sys
import heapq

current_hour = None
ip_count_heap = []

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue  # skip blank lines
    # Each record looks like "[hour]ip<TAB>count".
    hour_ip, count = line.rsplit("\t", 1)
    hour, ip = hour_ip[1:].split("]", 1)
    count = int(count)

    if hour != current_hour:
        # New hour: print the top 3 (count, ip) pairs of the previous hour.
        if current_hour is not None:
            top_ip_counts = heapq.nlargest(3, ip_count_heap)
            for ic in top_ip_counts:
                print('%s\t%s:%s' % (current_hour, ic[1], ic[0]))
        current_hour = hour
        ip_count_heap = []

    heapq.heappush(ip_count_heap, (count, ip))

# Flush the last hour; the guard keeps empty input from raising NameError.
if current_hour is not None:
    top_ip_counts = heapq.nlargest(3, ip_count_heap)
    for ic in top_ip_counts:
        print('%s\t%s:%s' % (current_hour, ic[1], ic[0]))
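reducer11.py keeps every IP seen in the current hour and calls heapq.nlargest when the hour changes. An alternative is the hypothetical top_k below. It keeps a min-heap of at most three entries, so memory per hour stays constant no matter how many distinct IPs appear. As with nlargest on (count, ip) tuples, ties on count go to the larger IP string.

```python
import heapq


def top_k(ip_counts, k=3):
    """Return the k (count, ip) pairs with the largest counts, largest first."""
    heap = []  # min-heap of at most k items; heap[0] is the smallest one kept
    for ip, count in ip_counts:
        item = (count, ip)
        if len(heap) < k:
            heapq.heappush(heap, item)
        elif item > heap[0]:
            # Drop the current smallest and insert the new item in one step.
            heapq.heapreplace(heap, item)
    return sorted(heap, reverse=True)
```

Each update costs O(log k), and the heap never holds more than k items.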

reducers/reducer2.py

#!/usr/bin/python
import sys
import heapq
import os

from_hour = int(os.environ['FROM_HOUR'])
to_hour = int(os.environ['TO_HOUR'])

current_hour = None
ip_count_heap = []

for line in sys.stdin:
    line = line.strip()
    if not line:
        continue  # skip blank lines
    hour_ip, count = line.split('\t', 1)
    hour, ip = hour_ip[1:].split(']', 1)
    hour = int(hour)
    count = int(count)

    # Keep only hours in the half-open range [FROM_HOUR, TO_HOUR).
    if from_hour <= hour < to_hour:
        if hour != current_hour:
            if current_hour is not None:
                top_ip_counts = heapq.nlargest(3, ip_count_heap)
                for ic in top_ip_counts:
                    print('%s\t%s:%s' % (current_hour, ic[1], ic[0]))
            current_hour = hour
            ip_count_heap = []
        heapq.heappush(ip_count_heap, (count, ip))

# Flush the last in-range hour. Testing current_hour (not the last line's hour)
# keeps it from being lost when trailing lines fall outside the range.
if current_hour is not None:
    top_ip_counts = heapq.nlargest(3, ip_count_heap)
    for ic in top_ip_counts:
        print('%s\t%s:%s' % (current_hour, ic[1], ic[0]))
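reducer2.py has to repeat its ranking logic after the loop to handle the last hour. Grouping by hour with itertools.groupby handles every hour inside the loop and produces nothing on empty input. The following is a sketch with the hypothetical names parse_records and top3_in_range. As in the pipeline, it assumes intermediate_results.txt is sorted by its "[hour]ip" key, so each hour's lines are contiguous.

```python
import heapq
from itertools import groupby


def parse_records(lines):
    """Yield (hour, ip, count) from '[hour]ip<TAB>count' lines; skip blank lines."""
    for line in lines:
        line = line.strip()
        if not line:
            continue
        hour_ip, count = line.rsplit('\t', 1)
        hour, ip = hour_ip[1:].split(']', 1)
        yield int(hour), ip, int(count)


def top3_in_range(lines, from_hour, to_hour):
    """Yield 'hour<TAB>ip:count' for the top three IPs of each hour in [from_hour, to_hour)."""
    records = (r for r in parse_records(lines) if from_hour <= r[0] < to_hour)
    for hour, group in groupby(records, key=lambda r: r[0]):
        for count, ip in heapq.nlargest(3, ((c, ip) for _, ip, c in group)):
            yield '%s\t%s:%s' % (hour, ip, count)
```

The output format matches reducer2.py, which prints the hour as an integer (for example 1 rather than 01).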

scripts/run_script.py

#!/usr/bin/python
import os
import sys

# Expect exactly one argument of the form "start_hour-end_hour", e.g. "0-1".
args = sys.argv
if len(args) != 2 or len(args[1].split("-")) != 2:
    print("Incorrect usage. Please provide the range in the format 'start_hour-end_hour', where start_hour < end_hour. Example: '0-1'")
    sys.exit(1)

try:
    from_hour, to_hour = map(int, args[1].split("-"))
except ValueError:
    print("Hours must be integers, e.g. '0-1'.")
    sys.exit(1)

if from_hour >= to_hour:
    print("Invalid range. Ensure start_hour < end_hour.")
    sys.exit(1)

# Child processes started by os.system() inherit these variables.
os.environ["FROM_HOUR"] = str(from_hour)
os.environ["TO_HOUR"] = str(to_hour)

# Run MapReduce. Paths are relative to the working directory; point the first
# one at wherever access.log lives.
os.system("cat ../../data/access.log | python mappers/mapper1.py | sort -k1,1 | python reducers/reducer1.py > data/intermediate_results.txt")
os.system("cat data/intermediate_results.txt | python reducers/reducer2.py")

mapper.py: Mapper for the first job. It reads access.log and, for each matching line, emits the key "[hour]ip" with a count of 1.

reducer.py: Reducer for the first job. It reads the sorted mapper output and sums the counts for each "[hour]ip" key, giving the number of hits per IP in each hour.

mapper1.py: Same as mapper.py, but it only emits entries whose hour falls in the range [FROM_HOUR, TO_HOUR) taken from the environment.

reducer1.py: Identical to reducer.py. It sums the counts for each "[hour]ip" key emitted by mapper1.py.

mapper11.py: Mapper for the second job. It reads intermediate_results.txt and passes each record on to the sort and to reducer11.py.

reducer11.py: Reducer for the second job. It groups the records by hour and prints the top 3 IP addresses for each hour as "hour<TAB>ip:count".

reducer2.py: Used by run_script.py in place of mapper11.py and reducer11.py. It reads intermediate_results.txt directly and prints the top 3 IP addresses for each hour in [FROM_HOUR, TO_HOUR).

run_script.py: Validates the start_hour-end_hour argument, sets FROM_HOUR and TO_HOUR, and runs mapper1.py, reducer1.py and reducer2.py in sequence.

intermediate_results.txt: Output of the first job, with one "[hour]ip<TAB>count" line per hour and IP. It is the input to the second job.