403Webshell
Server IP : 172.67.134.114  /  Your IP : 104.23.197.122
Web Server : Apache/2.4.37
System : Linux almalinux.duckdns.org 4.18.0-553.111.1.el8_10.x86_64 #1 SMP Sun Mar 8 20:06:07 EDT 2026 x86_64
User : ricodeal ( 1046)
PHP Version : 7.4.33
Disable Function : NONE
MySQL : OFF  |  cURL : ON  |  WGET : ON  |  Perl : ON  |  Python : ON  |  Sudo : ON  |  Pkexec : ON
Directory :  /usr/local/bin/

Upload File :
current_dir [ Writeable ] document_root [ Writeable ]

 

Command :


[ Back ]     

Current File : /usr/local/bin/cloudflare-ddns
#!/usr/bin/env python
import argparse
import json
import os
import re
import requests
import stat
import subprocess
import configparser
from tld import get_fld

# Per-user configuration file (INI format) holding the credentials and the domain list
CONFIGURATION_FILE = os.path.expanduser('~/') + '.cloudflare-ddns'

# Service queried for the public IPv4 address; the JSON response carries it under the 'ip' key
EXTERNAL_IP_QUERY_API = 'https://api.ipify.org/?format=json'
# CloudFlare v4 API endpoints; the trailing comment names the HTTP method used with each one
CLOUDFLARE_ZONE_QUERY_API = 'https://api.cloudflare.com/client/v4/zones'  # GET
CLOUDFLARE_ZONE_DNS_RECORDS_QUERY_API = 'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records'  # GET
CLOUDFLARE_ZONE_DNS_RECORDS_UPDATE_API = 'https://api.cloudflare.com/client/v4/zones/{zone_id}/dns_records/{dns_record_id}'  # PATCH

# Backwards compatible with Python 2: alias input to raw_input when raw_input exists.
# On Python 3 the name lookup raises NameError and the built-in input is kept as is.
try:
    input = raw_input
except NameError:
    pass


def load_arguments():
    """
    Parses the command-line flags understood by the program.

    Every flag is a boolean switch that defaults to False.

    :return: An object exposing the flags as the attributes configure, update_now and debug
    """
    switches = (
        ('--configure', 'Interactively configure the account and domain for the DDNS updates.'),
        ('--update-now', 'Update DNS records right now.'),
        ('--debug', 'Print detailed debug output.'),
    )
    cli = argparse.ArgumentParser()
    for flag, description in switches:
        cli.add_argument(flag, action='store_true', help=description)
    return cli.parse_args()


# Parsed once at import time so that every function can consult the flags (e.g. --debug)
START_ARGS = load_arguments()


def load_configuration():
    """
    Loads the configuration file from disk and validates it.

    A configuration is considered complete when its 'Cloudflare DDNS' section contains
    'domains', an 'auth_type' of either 'token' or 'key', and the credentials that auth
    type needs ('api_token', or 'email' plus 'api_key').

    :return: The 'Cloudflare DDNS' section (a mapping of option names to strings) when the
             configuration is complete, or an empty dictionary if the file is missing,
             cannot be parsed, or lacks required parameters.
    """
    # Credentials each supported auth_type must provide
    required_credentials = {
        'token': ('api_token',),
        'key': ('email', 'api_key'),
    }

    try:
        config = configparser.ConfigParser()
        # read() silently skips a missing file; the absence then surfaces as a KeyError below
        config.read(CONFIGURATION_FILE)
        section = config['Cloudflare DDNS']
        auth_type = section.get('auth_type')
    except (KeyError, configparser.Error):
        print('Configuration file {config_file} not found or invalid! Did you run cloudflare-ddns --configure?'
              .format(config_file=CONFIGURATION_FILE))
        return {}

    credentials = required_credentials.get(auth_type)
    if 'domains' in section and credentials is not None and all(key in section for key in credentials):
        return section

    # Covers a missing 'domains', an unknown auth_type and missing credentials alike
    print('Configuration file {config_file} is missing parameters. Run cloudflare-ddns --configure'
          ' to set the configuration.'.format(config_file=CONFIGURATION_FILE))
    return {}


def initialize_configuration():
    """
    Initializes the configuration file via an interactive shell.

    Asks for the authentication type (repeating the question until a recognised answer is
    given), the matching credentials and the comma-delimited domain list. Surrounding
    whitespace is removed from every answer and empty domain entries are dropped.

    The file holds credentials, so it is created readable and writable by its owner only,
    and the permissions of a pre-existing file are tightened before anything is written.

    :return: None, but writes the data to CONFIGURATION_FILE
    """
    config = configparser.ConfigParser()
    config['Cloudflare DDNS'] = {}
    settings = config['Cloudflare DDNS']
    print('=============Configuring CloudFlare automatic DDNS update client=============')
    print('You may rerun this at any time with cloudflare-ddns --configure')
    print('Quit and cancel at any time with Ctrl-C\n')

    # Keep asking: an unrecognised answer would otherwise produce a file without auth_type
    while True:
        auth_type = input('Use API token or API key to authenticate?\nSee https://dash.cloudflare.com/profile/api-tokens'
                          ' for more info.\nEnter [T]oken or [K]ey: ').strip().lower()
        if auth_type in ('t', 'token'):
            settings['auth_type'] = 'token'
            api_token = input('Enter the API token you created at https://dash.cloudflare.com/profile/api-tokens.\n'
                              'Required permissions are READ Account.Access: Organizations, Identity Providers, and Groups; '
                              'READ Zone.Zone; EDIT Zone.DNS\nExample: XhtcWvLmWBFGRW-YSK52WBghKtfC40rtuysLAETs\n'
                              'CloudFlare API token: ')
            settings['api_token'] = api_token.strip()
            break
        if auth_type in ('k', 'key'):
            settings['auth_type'] = 'key'
            email = input('\nEnter the email address associated with your CloudFlare account.\nExample: [email protected]\nEmail: ')
            api_key = input('\nEnter the API key associated with your CloudFlare account. You can find your API key at '
                            'https://dash.cloudflare.com/profile/api-tokens\n'
                            'Example: 7d9dfl2fid74lsg50saa9j2dbqm67zn39v673\nCloudFlare API key: ')
            settings['email'] = email.strip()
            settings['api_key'] = api_key.strip()
            break
        print('Unrecognized choice "{choice}". Please answer T or K.\n'.format(choice=auth_type))

    domains = input('Enter the domains for which you would like to automatically update the DNS records, '
                    'delimited by a single comma.\nExample: kevinlin.info,cloudflaremanager.com\n'
                    'Comma-delimited domains: ')
    # Normalise "a.com, b.com," to "a.com,b.com" so that later splitting on ',' is exact
    settings['domains'] = ','.join(entry.strip() for entry in domains.split(',') if entry.strip())

    owner_only = stat.S_IREAD | stat.S_IWRITE
    # Create the file with restrictive permissions from the start instead of chmod-ing after the write
    descriptor = os.open(CONFIGURATION_FILE, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, owner_only)
    with os.fdopen(descriptor, 'w') as config_file:
        # The creation mode is ignored when the file already existed, so tighten it explicitly
        os.chmod(CONFIGURATION_FILE, owner_only)
        config.write(config_file)
    print('\nConfiguration file written to {config_file} successfully.'.format(config_file=CONFIGURATION_FILE))


def get_external_ip():
    """
    Get the external IP of the network where the script is being executed.

    Queries EXTERNAL_IP_QUERY_API and reads the 'ip' key of the JSON response. Every failure
    (service unreachable, timeout, invalid JSON, missing key) is reported on standard output
    instead of being raised.

    :return: A string representing the network's external IP address, or None on failure
    """
    try:
        r = requests.get(EXTERNAL_IP_QUERY_API, timeout=6)
    except requests.exceptions.RequestException:
        # Base class of all requests errors: also covers read timeouts, which are neither
        # ConnectTimeout nor ConnectionError
        print('Cannot fetch your external ip. {} not reachable.'.format(EXTERNAL_IP_QUERY_API))
        return None

    try:
        return r.json()['ip']
    except ValueError:
        # Every JSON decoding error (json, simplejson, requests) derives from ValueError
        print('Cannot fetch your external ip. Data provided by {} is invalid.\nRaw:\n{}'
              .format(EXTERNAL_IP_QUERY_API, r.text))
        return None
    except (KeyError, TypeError):
        # TypeError: the body is valid JSON but not an object (e.g. a list or a string)
        print('Cannot fetch your external ip. No ip provided by {}.\nRaw:\n{}'
              .format(EXTERNAL_IP_QUERY_API, r.text))
        return None


def get_ipv6():
    """
    Finds a global, non-deprecated IPv6 address of this machine by parsing the output of
    the ``ip`` command.

    Based on: https://gist.github.com/corny/7a07f5ac901844bd20c9

    A missing or failing ``ip`` command is reported on standard output and treated as
    "no IPv6 address" so that the IPv4 update can still proceed.

    :return: A string representing one of the network's IPv6 addresses, or None if none was found
    """
    inet6_finder = re.compile(r'^    inet6 ([0-9a-f:]+)')
    try:
        output = subprocess.check_output(['ip', '-6', 'addr', 'list', 'scope', 'global', '-deprecated'])
    except (OSError, subprocess.CalledProcessError) as error:
        # OSError: the ip binary is not available; CalledProcessError: it exited with an error
        print('Cannot determine your IPv6 address: {error}'.format(error=error))
        return None

    for line in output.decode('utf-8').split('\n'):
        match = inet6_finder.match(line)
        if match is not None:
            # Multiple address might be present, assuming the first one is the best
            # Maybe add flag for preventing temporary addresses?
            return match.group(1)
    return None


def update_dns_record(auth, zone_id, record, ip_address):
    """
    Points a single DNS record at the given IP address, unless it already has that content.

    Does nothing when either the record or the address is unknown. Failures (network error,
    non-JSON response, unsuccessful API result) are reported on standard output instead of
    being raised.

    :param auth: Dictionary of API authentication headers
    :param zone_id: ID of the zone the record belongs to
    :param record: DNS record dictionary (its 'id', 'type', 'name' and 'content' keys are read), or None
    :param ip_address: New content for the record, or None
    :return: None
    """
    if record is None or ip_address is None:
        return
    print('Updating the {type} record (ID {dns_record_id}) of (sub)domain {subdomain} (ID {zone_id}) to {ip_address}.'
          .format(type=record['type'], dns_record_id=record['id'], zone_id=zone_id, subdomain=record['name'], ip_address=ip_address))
    if record['content'] == ip_address:
        print('DNS record is already up-to-date; taking no action')
        return

    headers = dict(auth)
    headers['Content-Type'] = 'application/json'
    try:
        update_resp = requests.patch(
            CLOUDFLARE_ZONE_DNS_RECORDS_UPDATE_API.format(zone_id=zone_id, dns_record_id=record['id']),
            headers=headers,
            data=json.dumps({'content': ip_address}),
            timeout=6,
        )
        # Decode once; every JSON decoding error derives from ValueError
        result = update_resp.json()
    except (requests.exceptions.RequestException, ValueError) as error:
        print('DNS record failed to update: {error}'.format(error=error))
        return

    if result.get('success'):
        print('DNS record updated successfully!')
    else:
        print('DNS record failed to update.\nCloudFlare returned the following errors: {errors}.\n\n'
              'CloudFlare returned the following messages: {messages}'.format(errors=result.get('errors'),
                                                                              messages=result.get('messages')))


def _report_failed_request(response):
    """Prints diagnostics for a CloudFlare API response whose status code is not 200."""
    try:
        errors = [x.get('message') for x in response.json()['errors']]
    except (KeyError, TypeError, ValueError, AttributeError):
        # The body is not the usual JSON error envelope
        print('Authentication error: make sure your email and API key are correct. '
              'To set new values, run cloudflare-ddns --configure')
        return
    print('===================================')
    print('Request failed.')
    print('Check your permissions and API key.')
    if START_ARGS.debug:
        print('Status code: {}'.format(response.status_code))
        print('Text: "{}"'.format(response.text))
    else:
        print('Use --debug for more details')
    for message in errors:
        if message:
            print(message)  # this will print detailed info about missing permission etc.
            # e.g. "requires permission 'com.cloudflare.api.account.zone.list' to list zones"


def _cloudflare_get(url, auth, params):
    """Performs a GET on the CloudFlare API; returns the decoded JSON body, or None after reporting a failure."""
    try:
        response = requests.get(url, headers=auth, params=params, timeout=6)
    except requests.exceptions.RequestException as error:
        print('Request to {url} failed: {error}'.format(url=url, error=error))
        return None
    if response.status_code != 200:
        _report_failed_request(response)
        return None
    try:
        return response.json()
    except ValueError:
        print('Request to {url} returned data that is not valid JSON.'.format(url=url))
        return None


def update_dns(subdomain, auth, ipv4_address, ipv6_address):
    """
    Updates the specified domain with the given IP address, given authentication parameters.

    Lists every zone of the account (following pagination) to find the zone of the
    registered domain, looks up the A and AAAA records named exactly like the (sub)domain
    and updates each record for which an address is available. Problems are reported on
    standard output; the function then returns without updating anything.

    :param subdomain: String representing domain to update
    :param auth: Dictionary of API authentication credentials
    :param ipv4_address: IPv4 address with which to update the A record
    :param ipv6_address: IPv6 address with which to update the AAAA record
    :return: None
    """
    # Tolerate "a.com, b.com" style lists and empty entries left by a trailing comma
    subdomain = subdomain.strip()
    if not subdomain:
        print('Skipping an empty domain entry.')
        return

    # Extract the domain
    domain = get_fld(subdomain, fix_protocol=True, fail_silently=True)
    if domain is None:
        print('Could not determine the registered domain of "{subdomain}"; skipping it.'.format(subdomain=subdomain))
        return

    # Find the zone ID corresponding to the domain
    cur_page = 1
    zone_names_to_ids = {}
    print('Listing all zones.')
    while True:
        data = _cloudflare_get(CLOUDFLARE_ZONE_QUERY_API, auth, {'per_page': 50, 'page': cur_page})
        if data is None:
            return
        for zone in data['result']:
            zone_names_to_ids[zone['name']] = zone['id']
        if cur_page >= data['result_info']['total_pages']:
            break
        cur_page += 1

    if domain not in zone_names_to_ids:
        # Join the names: formatting a map() object would print "<map object at 0x...>" on Python 3
        print('The domain {domain} doesn\'t appear to be one of your CloudFlare domains. We only found {domain_list}.'
              .format(domain=domain, domain_list=', '.join(sorted(zone_names_to_ids))))
        return
    zone_id = zone_names_to_ids[domain]

    # Find DNS records
    print('Finding all DNS records.')
    records = _cloudflare_get(CLOUDFLARE_ZONE_DNS_RECORDS_QUERY_API.format(zone_id=zone_id), auth, {'name': subdomain})
    if records is None:
        return
    record_a = None
    record_aaaa = None
    for dns_record in records['result']:
        if dns_record['type'] == 'A':
            record_a = dns_record
        elif dns_record['type'] == 'AAAA':
            record_aaaa = dns_record

    # Update the record as necessary
    update_dns_record(auth, zone_id, record_a, ipv4_address)
    update_dns_record(auth, zone_id, record_aaaa, ipv6_address)


def main():
    """
    Main program: either make the configuration file or update the DNS

    With --configure the interactive configuration is run. With --update-now the stored
    configuration is loaded, the current IPv4 and IPv6 addresses are looked up and every
    configured domain is updated. Without either flag a usage hint is printed.

    :return: None
    :raises RuntimeError: if the configuration is missing, incomplete or names an unknown auth_type
    """
    if START_ARGS.configure:
        initialize_configuration()
        return
    if not START_ARGS.update_now:
        print('No arguments passed; exiting.')
        print('Try cloudflare-ddns --help.')
        return

    config_problem = ('There was a problem with the configuration file {config_file}! '
                      'Try running cloudflare-ddns --configure'.format(config_file=CONFIGURATION_FILE))
    config = load_configuration()
    if not config:
        raise RuntimeError(config_problem)

    if config['auth_type'] == 'token':
        auth = {'Authorization': 'Bearer {token}'.format(token=config['api_token'])}
    elif config['auth_type'] == 'key':
        auth = {'X-Auth-Email': config['email'], 'X-Auth-Key': config['api_key']}
    else:
        raise RuntimeError(config_problem)

    try:
        raw_domains = config['domains']
    except KeyError:
        raise RuntimeError(config_problem)
    # Tolerate spaces around the commas and drop empty entries (e.g. from a trailing comma)
    domains = [entry.strip() for entry in raw_domains.split(',') if entry.strip()]

    external_ip = get_external_ip()
    if external_ip:
        print('Found external IPv4: "{}"'.format(str(external_ip)))
    ipv6 = get_ipv6()
    if ipv6:
        print('Found external IPv6: "{}"'.format(str(ipv6)))
    if not external_ip and not ipv6:
        # Without any address no record can be updated, so spare the API calls
        print('Could not determine any external IP address; nothing to update.')
        return

    for domain in domains:
        update_dns(domain, auth, external_ip, ipv6)


if __name__ == '__main__':
    main()

Youez - 2016 - github.com/yon3zu
LinuXploit