RPA / Robocorp
Data: monitor / Notify on changes
Notify on changes via e-mail
tasks.py
import os
import hashlib
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime
from pathlib import Path
import json
from robocorp.tasks import task
from robocorp import browser
# Configuration - modify these values
WEBSITE_URL = "https://example.com/page-to-monitor" # The URL to monitor
CSS_SELECTOR = "div.content" # The specific part of the page to monitor
CHECK_INTERVAL = 3600 # Check every hour (in seconds)
RETRY_INTERVAL = 300 # If check fails, retry after 5 minutes
DATA_FOLDER = "output/monitoring"
MAX_ATTEMPTS = 3 # Maximum number of attempts per check
# Email configuration
EMAIL_CONFIG = {
"smtp_server": "smtp.gmail.com",
"smtp_port": 587,
"username": "[email protected]", # Update with your email
"password": "", # Use an app password for Gmail
"sender": "[email protected]", # Update with your email
"recipient": "[email protected]", # Update with recipient email
}
@task
def monitor_website_for_changes():
"""
Monitor a website for changes and send email notifications when changes are detected.
This robot:
1. Checks a specific part of a webpage at regular intervals
2. Detects if content has changed since the last check
3. Sends an email notification when changes are found
4. Keeps a log of previous checks
"""
# Create data directory if it doesn't exist
data_path = Path(DATA_FOLDER)
data_path.mkdir(parents=True, exist_ok=True)
hash_file = data_path / "previous_hash.txt"
log_file = data_path / "monitoring_log.json"
# Load previous hash if exists
previous_hash = None
if hash_file.exists():
previous_hash = hash_file.read_text().strip()
print(f"Loaded previous hash: {previous_hash}")
# Initialize or load log
log = []
if log_file.exists():
try:
log = json.loads(log_file.read_text())
except json.JSONDecodeError:
print("Error reading log file, starting with empty log")
while True:
timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
print(f"Checking website at {timestamp}")
# Check if content has changed
try:
current_hash = get_content_hash(WEBSITE_URL, CSS_SELECTOR)
# Log the check
check_entry = {
"timestamp": timestamp,
"status": "success",
"hash": current_hash,
}
# Compare with previous hash
if previous_hash and current_hash != previous_hash:
print("Content change detected!")
check_entry["change_detected"] = True
# Save the new hash
hash_file.write_text(current_hash)
# Send notification email
if EMAIL_CONFIG["password"]: # Only if password is set
email_subject = f"Website Change Detected: {WEBSITE_URL}"
email_body = f"""
Website Change Detected
A change was detected on the monitored website.
URL: {WEBSITE_URL}
Time: {timestamp}
Element monitored: {CSS_SELECTOR}
Please visit the website to view the changes.
"""
send_email_notification(email_subject, email_body)
else:
print("Email notification skipped: No password configured")
else:
print("No changes detected")
check_entry["change_detected"] = False
# Save the hash if this is the first check
if not previous_hash:
hash_file.write_text(current_hash)
previous_hash = current_hash
except Exception as e:
print(f"Error checking website: {e}")
check_entry = {
"timestamp": timestamp,
"status": "error",
"error": str(e)
}
# Wait shorter time before retrying
print(f"Retrying in {RETRY_INTERVAL} seconds...")
time.sleep(RETRY_INTERVAL)
continue
# Update the log
log.append(check_entry)
# Keep only the last 100 entries
if len(log) > 100:
log = log[-100:]
log_file.write_text(json.dumps(log, indent=2))
# Wait for the next check
print(f"Next check in {CHECK_INTERVAL} seconds...")
time.sleep(CHECK_INTERVAL)
def get_content_hash(url, selector, max_attempts=MAX_ATTEMPTS):
"""Get a hash of the content from a specific part of a webpage."""
for attempt in range(max_attempts):
try:
# Open the website
page = browser.goto(url)
# Wait for the content to load
page.wait_for_selector(selector, state="visible", timeout=30000)
# Get the text content
element = page.locator(selector)
content = element.text_content()
# Close the browser
browser.close_all_browsers()
# Generate hash of the content
return hashlib.md5(content.encode()).hexdigest()
except Exception as e:
print(f"Attempt {attempt+1}/{max_attempts} failed: {e}")
if attempt+1 < max_attempts:
# Wait before retrying
time.sleep(RETRY_INTERVAL)
else:
raise
def send_email_notification(subject, body):
"""Send an email notification."""
config = EMAIL_CONFIG
try:
# Create message
msg = MIMEMultipart()
msg['From'] = config["sender"]
msg['To'] = config["recipient"]
msg['Subject'] = subject
# Attach HTML content
msg.attach(MIMEText(body, 'html'))
# Connect to SMTP server
server = smtplib.SMTP(config["smtp_server"], config["smtp_port"])
server.starttls() # Secure the connection
server.login(config["username"], config["password"])
# Send email
server.send_message(msg)
server.quit()
print("Email notification sent successfully")
return True
except Exception as e:
print(f"Failed to send email: {e}")
return False