RPA / Robocorp

Data: monitor / Notify on changes

Notify on changes via e-mail

tasks.py

import os
import hashlib
import time
import smtplib
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from datetime import datetime
from pathlib import Path
import json

from robocorp.tasks import task
from robocorp import browser

# Configuration - modify these values
WEBSITE_URL = "https://example.com/page-to-monitor"  # The URL to monitor
CSS_SELECTOR = "div.content"  # The specific part of the page to monitor
CHECK_INTERVAL = 3600  # Seconds between regular checks (one hour)
RETRY_INTERVAL = 300  # Seconds to wait before retrying a failed check (5 minutes)
DATA_FOLDER = "output/monitoring"  # Folder holding the last hash and the check log
MAX_ATTEMPTS = 3  # Maximum number of attempts per check

# Email configuration
# The SMTP password is taken from the WEBSITE_MONITOR_SMTP_PASSWORD environment
# variable so the secret does not have to be committed with the script. When
# the variable is unset the password stays empty and notifications are skipped.
EMAIL_CONFIG = {
    "smtp_server": "smtp.gmail.com",
    "smtp_port": 587,
    "username": "[email protected]",  # Update with your email
    "password": os.environ.get("WEBSITE_MONITOR_SMTP_PASSWORD", ""),  # Use an app password for Gmail
    "sender": "[email protected]",  # Update with your email
    "recipient": "[email protected]",  # Update with recipient email
}

@task
def monitor_website_for_changes():
    """
    Monitor a website for changes and send email notifications when changes are detected.

    This robot:
    1. Checks a specific part of a webpage at regular intervals
    2. Detects if content has changed since the last check
    3. Sends an email notification when changes are found
    4. Keeps a log of previous checks, including failed ones

    The function loops forever: after a successful check it sleeps for
    CHECK_INTERVAL seconds, after a failed one for RETRY_INTERVAL seconds.
    """
    # Create data directory if it doesn't exist
    data_path = Path(DATA_FOLDER)
    data_path.mkdir(parents=True, exist_ok=True)
    hash_file = data_path / "previous_hash.txt"
    log_file = data_path / "monitoring_log.json"

    # Load previous hash if exists
    previous_hash = None
    if hash_file.exists():
        previous_hash = hash_file.read_text().strip()
        print(f"Loaded previous hash: {previous_hash}")

    # Initialize or load log
    log = _load_log(log_file)

    while True:
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        print(f"Checking website at {timestamp}")

        # Check if content has changed
        try:
            current_hash = get_content_hash(WEBSITE_URL, CSS_SELECTOR)

            # Log the check
            check_entry = {
                "timestamp": timestamp,
                "status": "success",
                "hash": current_hash,
            }

            # Compare with previous hash
            if previous_hash and current_hash != previous_hash:
                print("Content change detected!")
                check_entry["change_detected"] = True

                # Save the new hash
                hash_file.write_text(current_hash)

                # Send notification email
                if EMAIL_CONFIG["password"]:  # Only if password is set
                    email_subject = f"Website Change Detected: {WEBSITE_URL}"
                    email_body = _build_change_email_body(timestamp)
                    send_email_notification(email_subject, email_body)
                else:
                    print("Email notification skipped: No password configured")
            else:
                print("No changes detected")
                check_entry["change_detected"] = False

                # Save the hash if this is the first check
                if not previous_hash:
                    hash_file.write_text(current_hash)

            previous_hash = current_hash
            wait_seconds = CHECK_INTERVAL

        except Exception as e:
            print(f"Error checking website: {e}")
            check_entry = {
                "timestamp": timestamp,
                "status": "error",
                "error": str(e)
            }
            # Wait shorter time before retrying
            wait_seconds = RETRY_INTERVAL

        # Update the log. This runs for failed checks too, so that the error
        # entry built above is actually persisted instead of being dropped.
        log.append(check_entry)

        # Keep only the last 100 entries
        if len(log) > 100:
            log = log[-100:]

        log_file.write_text(json.dumps(log, indent=2))

        if check_entry["status"] == "error":
            print(f"Retrying in {RETRY_INTERVAL} seconds...")
        else:
            # Wait for the next check
            print(f"Next check in {CHECK_INTERVAL} seconds...")
        time.sleep(wait_seconds)


def _load_log(log_file):
    """Return the list of check entries stored in log_file, or [] if unusable."""
    if not log_file.exists():
        return []

    try:
        entries = json.loads(log_file.read_text())
    except json.JSONDecodeError:
        print("Error reading log file, starting with empty log")
        return []

    # Valid JSON that is not a list (e.g. an object) cannot be appended to.
    if not isinstance(entries, list):
        print("Error reading log file, starting with empty log")
        return []

    return entries


def _build_change_email_body(timestamp):
    """Build the HTML body of the change notification email."""
    import html  # local import: only needed when a notification is built

    # The body is attached as 'html', so it needs real markup to keep its
    # paragraphs, and interpolated values must be escaped (a CSS selector
    # such as "div > p" would otherwise break the document).
    url = html.escape(WEBSITE_URL)
    selector = html.escape(CSS_SELECTOR)
    when = html.escape(timestamp)

    return f"""
<html>
  <body>
    <h2>Website Change Detected</h2>
    <p>A change was detected on the monitored website.</p>
    <p><strong>URL:</strong> <a href="{url}">{url}</a></p>
    <p><strong>Time:</strong> {when}</p>
    <p><strong>Element monitored:</strong> {selector}</p>
    <p>Please visit the website to view the changes.</p>
  </body>
</html>
"""


def get_content_hash(url, selector, max_attempts=MAX_ATTEMPTS):
    """Get a hash of the content from a specific part of a webpage.

    Args:
        url: Address of the page to open.
        selector: CSS selector of the element whose text content is hashed.
        max_attempts: Number of tries before giving up; must be at least 1.

    Returns:
        The hexadecimal MD5 digest of the element's text content. The digest
        is only used to detect changes, not for any security purpose.

    Raises:
        ValueError: If max_attempts is less than 1.
        Exception: Whatever the last failed attempt raised.
    """
    if max_attempts < 1:
        # Without this the loop body never runs and None would be returned.
        raise ValueError("max_attempts must be at least 1")

    for attempt in range(1, max_attempts + 1):
        try:
            # Open the website
            page = browser.goto(url)

            # Wait for the content to load
            page.wait_for_selector(selector, state="visible", timeout=30000)

            # Get the text content
            content = page.locator(selector).text_content()

            # Close the browser
            # NOTE(review): confirm that robocorp.browser exposes
            # close_all_browsers(); if it does not, every attempt fails here.
            browser.close_all_browsers()

            # Generate hash of the content (missing text is hashed as "")
            return hashlib.md5((content or "").encode()).hexdigest()

        except Exception as e:
            print(f"Attempt {attempt}/{max_attempts} failed: {e}")
            if attempt >= max_attempts:
                raise
            # Wait before retrying
            time.sleep(RETRY_INTERVAL)


def send_email_notification(subject, body):
    """Send an email notification.

    Args:
        subject: Subject line of the message.
        body: Message body; it is attached as HTML.

    Returns:
        True if the message was handed to the SMTP server, False if any step
        failed (the error is printed, never raised).
    """
    config = EMAIL_CONFIG

    try:
        # Create message
        msg = MIMEMultipart()
        msg['From'] = config["sender"]
        msg['To'] = config["recipient"]
        msg['Subject'] = subject

        # Attach HTML content
        msg.attach(MIMEText(body, 'html'))

        # Connect to SMTP server. The context manager closes the connection
        # even when login or sending fails, and the timeout keeps an
        # unresponsive server from blocking the monitoring loop forever.
        with smtplib.SMTP(config["smtp_server"], config["smtp_port"], timeout=30) as server:
            server.starttls()  # Secure the connection
            server.login(config["username"], config["password"])

            # Send email
            server.send_message(msg)

        print("Email notification sent successfully")
        return True

    except Exception as e:
        print(f"Failed to send email: {e}")
        return False