首页 > 代码库 > atm+shopping

atm+shopping

技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang
import os,sys
# Make the directory that holds the ``Atm`` package importable when this file
# is run as a script (presumably Atm/bin/admin.py -- three levels up; confirm
# against the real layout).
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(BASE_DIR)

from Atm.core import main  # must come after the sys.path adjustment above

# Entry point for the administrator console.
if __name__ == "__main__":
    main.admin_run()
atm.bin.admin
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang
import os,sys


# Make the directory that holds the ``Atm`` package importable when this file
# is run as a script (presumably Atm/bin/user.py -- three levels up; confirm
# against the real layout).
BASE_DIR = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(BASE_DIR)


from Atm.core import main  # must come after the sys.path adjustment above

# Entry point for the ordinary-user console.
if __name__ == "__main__":
    main.run()
atm.bin.user
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang

import os,sys
import logging



# Directory one level above the folder that contains this settings module.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Where account records are stored.
DATABASE = {
    "engine": "file_storage",  # support mysql,postgresql in the future
    "name": "accounts",
    # os.path.join yields the same "\\db" suffix on Windows and a valid path elsewhere.
    "path": os.path.join(BASE_DIR, "db"),
}

# Level applied to every logger and handler built from these settings.
LOG_LEVEL = logging.INFO
# Log category -> file name inside the log directory.
LOG_TYPES = {
    "transaction": "transactions.log",
    "access": "access.log",
}


LOG_DATABASE = {
    "engine": "file_storage",  # support mysql,postgresql in the future
    "name": "accounts",
    "path": "%s/log" % BASE_DIR,
}

# Transaction kind -> how it moves the balance ("plus"/"minus") and the fee
# rate charged on the amount.
TRANSACTION_TYPE = {
    "repay": {"action": "plus", "interest": 0},  # repayment
    "receive": {"action": "plus", "interest": 0},  # incoming transfer
    "withdraw": {"action": "minus", "interest": 0.05},  # cash withdrawal
    "transfer": {"action": "minus", "interest": 0.05},  # outgoing transfer
    "pay": {"action": "minus", "interest": 0},  # payment
    "save": {"action": "plus", "interest": 0},  # deposit
}
atm.conf.setting
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang
from Atm.conf import setting
from Atm.core import accounts
from Atm.core import logger



def make_transaction(log_obj, account_data, type, amount, **others):
    """Apply one transaction to an account, persist it and log it.

    Args:
        log_obj: logger used to record the transaction.
        account_data: account record (dict); its ``balance`` is updated in place.
        type: a key of ``setting.TRANSACTION_TYPE`` (e.g. ``"repay"``).
        amount: transaction amount; converted with ``float``.
        **others: accepted for compatibility and ignored.

    Returns:
        The updated ``account_data`` on success; ``None`` when ``type`` is
        unknown or a "minus" transaction would make the balance negative.

    Raises:
        ValueError: if the configured action is neither "plus" nor "minus".
    """
    amount = float(amount)
    old_balance = account_data['balance']
    rule = setting.TRANSACTION_TYPE.get(type)
    if rule is None:
        return None

    interest = amount * rule['interest']
    if rule['action'] == 'plus':
        new_balance = old_balance + amount + interest
    elif rule['action'] == 'minus':
        new_balance = old_balance - amount - interest
        if new_balance < 0:
            # Refuse the transaction; nothing is written or logged.
            print('您的信用卡额度不足!当前余额为%s' % old_balance)
            return None
    else:
        # Previously this path fell through with new_balance unbound.
        raise ValueError("unsupported transaction action: %r" % (rule['action'],))

    account_data['balance'] = new_balance
    accounts.dump_account(account_data)
    log_obj.info("account:%s   action:%s    amount:%s   interest:%s" %
                 (account_data['id'], type, amount, interest))
    return account_data
atm.core.transaction
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang
import os,time,datetime,json
from Atm.core import auth
from Atm.core import logger
from Atm.core import transaction
from Atm.core import accounts


# Transaction log
trans_logger = logger.logger('transaction')
# Login (access) log
access_logger = logger.logger('access')

# In-memory session state: who is logged in, whether authentication
# succeeded, and the loaded account record.
user_data = {'auth_id': None,
             'auth_status': False,
             'auth_data': None
             }

def account_info(user_data):
    """Print every field of the logged-in account except the password.

    Args:
        user_data: session dict whose ``auth_data`` entry is the account record.
    """
    print('\033[32;1m您的帐户信息为:\033[0m')
    record = user_data['auth_data']
    for k in record:
        if k == 'password':
            # Never echo the stored password.
            continue
        print('\033[32;1m{}:{}\033[0m'.format(k, record.get(k)))

def repay(user_data):
    """Interactive repayment: show the account summary, then accept amounts.

    Loops until the user enters ``b``. Each all-digit amount is applied as a
    ``repay`` transaction.

    Args:
        user_data: session dict whose ``auth_data`` entry is the account record.
    """
    account_data = user_data['auth_data']
    # Amount currently owed = credit limit minus remaining balance.
    cost = account_data['credit'] - account_data['balance']
    balance_message = '''--------当前帐户财产情况--------\n额度:%s\n剩余额度:%s\n当前应还:%s                  ''' % (
        account_data['credit'], account_data['balance'], cost)
    print(balance_message)
    tsg = False
    while not tsg:
        amount = input('请输入您的还款金额(按"b"返回):')
        if amount.isdigit():
            new_balance = transaction.make_transaction(trans_logger, account_data, 'repay', amount)
            time.sleep(0.1)  # work around a display-ordering problem
            if new_balance:
                print('''\033[42;1m现在账户余额为%s\033[0m''' % (new_balance['balance']))
        elif amount == 'b':
            tsg = True
        else:
            print('请输入正确的还款金额(数字)!')

def withdraw(user_data):
    """Interactive cash withdrawal: show the account summary, then accept amounts.

    Loops until the user enters ``b``. Each all-digit amount is applied as a
    ``withdraw`` transaction.

    Args:
        user_data: session dict whose ``auth_data`` entry is the account record.
    """
    account_data = user_data['auth_data']
    # Amount currently owed = credit limit minus remaining balance.
    cost = account_data['credit'] - account_data['balance']
    balance_message = '''--------当前帐户财产情况--------\n额度:%s\n剩余额度:%s\n当前应还:%s                  ''' % (
        account_data['credit'], account_data['balance'], cost)
    print(balance_message)
    tsg = False
    while not tsg:
        amount = input('请输入您的取款金额(按"b"返回):')
        if amount.isdigit():
            new_balance = transaction.make_transaction(trans_logger, account_data, 'withdraw', amount)
            time.sleep(0.1)  # work around a display-ordering problem
            if new_balance:
                print('''\033[42;1m现在账户余额为%s\033[0m''' % (new_balance['balance']))
        elif amount == 'b':
            tsg = True
        else:
            print('请输入正确的取款金额(数字)!')

def transfer(user_data):
    """Interactive transfer from the logged-in account to another account.

    Repeatedly asks for a receiver id (``b`` leaves), validates it, then asks
    for an amount. The receiver is credited only after the sender's debit has
    succeeded, so a refused debit can no longer create money.

    Args:
        user_data: session dict with ``auth_id`` and ``auth_data``.
    """
    account_data = user_data['auth_data']
    # Amount currently owed = credit limit minus remaining balance.
    cost = account_data['credit'] - account_data['balance']
    balance_message = '''--------当前帐户财产情况--------\n额度:%s\n剩余额度:%s\n当前应还:%s                  ''' % (
        account_data['credit'], account_data['balance'], cost)
    print(balance_message)
    tsg = False
    while not tsg:
        receiver = input('请输入被转账人id(按"b"返回):')
        if receiver == user_data["auth_id"]:  # transferring to oneself is not allowed
            print("\033[31;1m用户为自己\033[0m")
            continue
        if receiver == "b":
            break

        receiver_account_data = auth.acc_check(receiver)
        if not receiver_account_data:
            print('用户不存在!')
            continue
        if receiver_account_data['status'] != 0:
            # Only status-0 accounts can receive; say so instead of silently
            # re-prompting.
            print('该用户不能接收转账!')
            continue

        amount = input('请输入您的转账金额(按"b"返回):')
        if amount.isdigit():
            new_balance = transaction.make_transaction(trans_logger, account_data, 'transfer', amount)
            if new_balance:
                # Credit the receiver only when the debit went through.
                transaction.make_transaction(trans_logger, receiver_account_data, 'receive', amount)
            time.sleep(0.1)  # work around a display-ordering problem
            if new_balance:
                print('''转账成功,\033[42;1m现在账户余额为%s\033[0m''' % (new_balance['balance']))
        elif amount == 'b':
            tsg = True
        else:
            print('请输入正确的金额(数字)!')

def save(user_data):
    """Interactive deposit: show the account summary, then accept amounts.

    Loops until the user enters ``b``. Each all-digit amount is applied as a
    ``save`` transaction.

    Args:
        user_data: session dict whose ``auth_data`` entry is the account record.
    """
    account_data = user_data['auth_data']
    # Amount currently owed = credit limit minus remaining balance.
    cost = account_data['credit'] - account_data['balance']
    balance_message = '''--------当前帐户财产情况--------\n额度:%s\n剩余额度:%s\n当前应还:%s                  ''' % (
        account_data['credit'], account_data['balance'], cost)
    print(balance_message)
    tsg = False
    while not tsg:
        amount = input('请输入您的存款金额(按"b"返回):')
        if amount.isdigit():
            new_balance = transaction.make_transaction(trans_logger, account_data, 'save', amount)
            time.sleep(0.1)  # work around a display-ordering problem
            if new_balance:
                print('''\033[42;1m现在账户余额为%s\033[0m''' % (new_balance['balance']))
        elif amount == 'b':
            tsg = True
        else:
            print('请输入正确的存款金额(数字)!')

def logout(user_data):
    """Terminate the program with a goodbye message for the current user.

    Raises:
        SystemExit: always; the exit payload is the goodbye message.
    """
    # Same effect as exit(msg), without relying on the ``site`` helper.
    raise SystemExit('用户%s退出成功!' % user_data['auth_id'])

def sign_up(user_data):
    """Menu entry: show the registration banner and start account sign-up.

    Args:
        user_data: session dict; unused here, present so every menu handler
            has the same signature.
    """
    print('----------欢迎来到用户注册界面!---------')
    auth.acc_sign_up()

def lock(user_data):
    """Interactively lock or unlock an account by moving its expiry date.

    An account whose ``expire_date`` is in the past counts as locked.
    Unlocking sets the date 3650 days ahead of now; locking sets it 3650 days
    back. ``b`` returns to the menu.

    Args:
        user_data: session dict; unused, kept for the menu-handler signature.
    """
    while True:
        acc_id = input('请输入您要修改的用户id(按“b”返回):')
        if acc_id == 'b':
            print('返回成功!')
            break
        if not accounts.exist_account(acc_id):
            print('用户不存在!')
            continue

        acc_data = accounts.load_account(acc_id)
        now = datetime.datetime.now()
        times = datetime.timedelta(days=3650)
        exp_time_stamp = time.mktime(time.strptime(acc_data['expire_date'], '%Y-%m-%d'))
        current = time.time()
        if current > exp_time_stamp:
            lock_type = input('此帐户已经锁定!是否解锁(Y/N):')
            if lock_type == 'Y':
                acc_data['expire_date'] = (now + times).strftime('%Y-%m-%d')
                accounts.dump_account(acc_data)
                print('解锁成功!')
            else:
                print('未进行任何操作')
        elif current < exp_time_stamp:
            lock_type = input('此帐户正常!是否锁定(Y/N):')
            if lock_type == 'Y':
                acc_data['expire_date'] = (now - times).strftime('%Y-%m-%d')
                accounts.dump_account(acc_data)
                print('锁定成功!')
            else:
                print('未进行任何操作')

def pay_log(user_data):
    """Print the shopping log of the logged-in user until they choose to return.

    The prompt advertises ``b`` as the return key but only ``q`` used to be
    accepted; both are accepted now.

    Args:
        user_data: session dict; ``auth_id`` selects the log file.
    """
    base_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
    # <base>/shopping/log/<id>_shopping.log, built portably.
    file = os.path.join(base_dir, 'shopping', 'log', '%s_shopping.log' % user_data['auth_id'])
    while True:
        if os.path.exists(file):
            with open(file, 'r') as f:
                print('该用户流水账单如下:')
                print('------------------------------')
                print(f.read())
                print('------------------------------')
        else:
            print('该用户没有消费记录!')
        choise = input('按“b”返回:')
        if choise in ('b', 'q'):
            break

def improve(user_data):
    """Interactively change the credit limit of an account.

    Asks for an account id (``b`` returns), then for the new limit, which
    must be all digits, and saves the record.

    Args:
        user_data: session dict; unused, kept for the menu-handler signature.
    """
    while True:
        acc_id = input('请输入您要调整额度的用户id(按“b”返回):')
        if acc_id == 'b':
            print('返回成功!')
            break
        if not accounts.exist_account(acc_id):
            print('用户不存在!')
            continue

        acc_data = accounts.load_account(acc_id)
        credit = input('请输入需要调整的额度:')
        if credit.isdigit():
            acc_data['credit'] = int(credit)
            accounts.dump_account(acc_data)
            print('修改成功!')
        else:
            print('请输入数字!')


@auth.login_auth  # decorator: only ordinary (status 0) users may enter
def interactive(user_data):
    """Show the ordinary-user menu and dispatch choices until the program exits.

    The loop has no exit of its own; option 7 (``logout``) ends the process.

    Args:
        user_data: authenticated session dict passed to every handler.
    """
    menu = u'''
        ------- ATM---------\033[32;1m
        1.  账户信息
        2.  还款
        3.  取款
        4.  转账
        5.  存款
        6.  消费流水
        7.  退出
        \033[0m'''
    # Keys are strings because input() returns strings.
    menu_dic = {
        '1': account_info,
        '2': repay,
        '3': withdraw,
        '4': transfer,
        '5': save,
        '6': pay_log,
        '7': logout,
    }
    while True:
        print(menu)
        user_choise = input('>>:').strip()
        handler = menu_dic.get(user_choise)
        if handler is None:
            print("选择不存在")
        else:
            handler(user_data)



@auth.login_auth_admin   # decorator: only administrator (status 1) users may enter
def admin_interactive(user_data):
    """Show the administrator menu and dispatch choices until the program exits.

    The loop has no exit of its own; option 5 (``logout``) ends the process.

    Args:
        user_data: authenticated session dict passed to every handler.
    """
    menu = u'''
            ------- 管理员模式---------\033[32;1m
            1.  账户信息
            2.  注册用户
            3.  解锁以及锁定用户
            4.  调整额度
            5.  退出
            \033[0m'''
    # Keys are strings because input() returns strings.
    menu_dic = {
        '1': account_info,
        '2': sign_up,
        '3': lock,
        '4': improve,
        '5': logout,
    }
    while True:
        print(menu)
        user_choise = input('>>:').strip()
        handler = menu_dic.get(user_choise)
        if handler is None:
            print("选择不存在")
        else:
            handler(user_data)

def get_user_data():
    """Run the login dialogue and return the populated session dict.

    Returns:
        The module-level ``user_data`` with ``auth_data`` filled in when
        authentication succeeded, otherwise ``None``.
    """
    account_data = auth.acc_login(user_data, access_logger)
    if not user_data['auth_status']:
        return None
    user_data['auth_data'] = account_data
    return user_data

def run():
    """Entry point for ordinary users: greet, authenticate, open the menu."""
    print('欢迎来到ATM!')
    user_data = get_user_data()
    if user_data is None:
        # Authentication did not succeed; there is no session to show a menu for.
        return
    interactive(user_data)

def admin_run():
    """Entry point for administrators: greet, authenticate, open the admin menu."""
    print('欢迎管理员登录ATM!')
    user_data = get_user_data()
    if user_data is None:
        # Authentication did not succeed; there is no session to show a menu for.
        return
    admin_interactive(user_data)
atm.core.main
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang

import logging
from Atm.conf import setting

def logger(log_type):
    """Return the logger for ``log_type``, writing to console and to a file.

    The file is ``<BASE_DIR>/log/<LOG_TYPES[log_type]>``. Handlers are attached
    only the first time a given ``log_type`` is requested, so repeated calls
    do not duplicate every log line.

    Args:
        log_type: key of ``setting.LOG_TYPES`` (also used as the logger name).

    Returns:
        The configured ``logging.Logger``.
    """
    import os  # local import: this module does not import os at file level

    # create logger
    logger = logging.getLogger(log_type)
    logger.setLevel(setting.LOG_LEVEL)
    if logger.handlers:
        # Already configured by an earlier call.
        return logger

    # console handler
    ch = logging.StreamHandler()
    ch.setLevel(setting.LOG_LEVEL)

    # file handler; the old "%s\log\%s" literal relied on invalid escape sequences
    log_file = os.path.join(setting.BASE_DIR, "log", setting.LOG_TYPES[log_type])
    fh = logging.FileHandler(log_file)
    fh.setLevel(setting.LOG_LEVEL)

    # one shared formatter for both handlers
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)

    logger.addHandler(ch)
    logger.addHandler(fh)

    return logger
atm.core.logger
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang
import os,json
from Atm.conf import setting


def file_execute(sql, **kwargs):
    """Resolve a ``select ... where username=<name>`` query against the JSON files.

    Args:
        sql: pseudo-SQL string; only ``select`` statements with a
            ``where username=<value>`` clause are handled.
        **kwargs: accepted for compatibility and ignored.

    Returns:
        The account record (dict) loaded from ``<db>/<name>/<value>.json``,
        or ``None`` when the statement is not of the supported shape.

    Raises:
        SystemExit: when the account file does not exist.
    """
    conn_params = setting.DATABASE
    db_path = os.path.join(conn_params['path'], conn_params['name'])
    parts = sql.split('where')
    if not (parts[0].strip().startswith('select') and len(parts) > 1):
        return None

    # Split on the first "=" only, so a value containing "=" cannot break unpacking.
    column, val = parts[1].strip().split('=', 1)
    if column != 'username':
        return None

    # NOTE(review): val comes from user input and is used directly in a file
    # name -- consider validating it against path separators.
    account_file = os.path.join(db_path, '%s.json' % val)
    if not os.path.isfile(account_file):
        raise SystemExit('用户不存在')
    with open(account_file, 'r') as f:
        return json.load(f)




def file_db_handle(conn_params):
    """Return the query function for the file-storage backend.

    Args:
        conn_params: connection settings; not consulted here.
    """
    executor = file_execute
    return executor


def db_handler():
    """Return the query function matching the configured storage engine.

    Returns:
        A callable taking a pseudo-SQL string.

    Raises:
        NotImplementedError: for any engine other than ``file_storage``.
            Previously ``None`` was returned and the caller failed later when
            trying to call it.
    """
    conn_params = setting.DATABASE
    if conn_params['engine'] == 'file_storage':
        return file_db_handle(conn_params)
    raise NotImplementedError("unsupported database engine: %r" % (conn_params['engine'],))
atm.core.dbhandler
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang

import json,time,os
from Atm.core import db_handler
from Atm.conf import setting
from Atm.core import accounts





def login_auth(func):
    """Decorator: run ``func`` only for ordinary users (account status 0).

    Args:
        func: callable taking the session dict.

    Returns:
        A wrapper that calls ``func(user_data)`` and returns its result when
        ``user_data['auth_data']['status'] == 0``; otherwise it prints a
        notice and returns ``None``.
    """
    import functools  # local import keeps the module's import list untouched

    @functools.wraps(func)
    def wapper(user_data):
        if user_data['auth_data']['status'] == 0:
            return func(user_data)
        print('您是管理员用户,请重新登录!')
        return None
    return wapper
def login_auth_admin(func):
    """Decorator: run ``func`` only for administrators (account status 1).

    Args:
        func: callable taking the session dict.

    Returns:
        A wrapper that calls ``func(user_data)`` and returns its result when
        ``user_data['auth_data']['status'] == 1``; otherwise it prints a
        notice naming the user and returns ``None``.
    """
    import functools  # local import keeps the module's import list untouched

    @functools.wraps(func)
    def wapper(user_data):
        if user_data['auth_data']['status'] == 1:
            return func(user_data)
        print('%s您好,您不是管理员用户请重新登录!' % user_data['auth_id'])
        return None
    return wapper



def acc_count(username, password):
    """Check a username/password pair against the stored account.

    Args:
        username: account id to look up.
        password: password entered by the user (compared as plain text).

    Returns:
        The account record when the password matches and the account has not
        expired; ``None`` otherwise (a message is printed in both failure
        cases).
    """
    db_api = db_handler.db_handler()
    data = db_api('select * from accounts where username=%s' % username)
    if data['password'] != password:
        print('用户名密码错误,请重新输入!')
        return None

    exp_time_stamp = time.mktime(time.strptime(data['expire_date'], '%Y-%m-%d'))
    if time.time() > exp_time_stamp:
        print('账户过期')
        return None
    return data

def acc_check(id):
    """Load an account record by id.

    Args:
        id: account id; the record is read from ``<db>/<name>/<id>.json``.

    Returns:
        The account record (dict) if the file exists, otherwise ``False``.
    """
    conn_params = setting.DATABASE
    account_file = os.path.join(conn_params['path'], conn_params['name'], '%s.json' % id)
    if not os.path.isfile(account_file):
        return False
    with open(account_file, 'r') as f:
        return json.load(f)






def acc_login(user_data, log_obj):
    """Prompt for credentials, allowing up to three attempts.

    Args:
        user_data: session dict; ``auth_status`` and ``auth_id`` are set on
            success.
        log_obj: logger for success and lock-out events.

    Returns:
        The account record on success. If the session is already
        authenticated on entry, the existing ``auth_data`` is returned
        without prompting (previously this path hit an unbound ``username``
        and then exited).

    Raises:
        SystemExit: after three failed attempts.
    """
    if user_data['auth_status'] is True:
        return user_data['auth_data']

    username = None
    for _ in range(3):
        username = input('请输入用户名:').strip()
        password = input('请输入密码:').strip()
        acc_data = acc_count(username, password)
        if acc_data:
            user_data['auth_status'] = True
            user_data['auth_id'] = username
            print('登录成功!')
            log_obj.info("account [%s] login success" % username)
            return acc_data

    log_obj.error("account [%s] too many login attempts" % username)
    raise SystemExit()


def acc_sign_up(valid_days=3650):
    """Interactively create a new ordinary account.

    Prompts for a user name until a free, non-empty one is given (``b``
    cancels), then for a password, and writes the new record.

    Args:
        valid_days: how many days from today the account stays valid. The
            previous hard-coded expiry ``2021-01-01`` is in the past, so every
            new account was created already expired.
    """
    while True:
        username = input('请输入用户名(按b返回):').strip()
        if username == 'b':
            # Checked first so "b" always cancels, even if such an account exists.
            break
        if username == '':
            print('请不要输入空值!')
            continue
        if accounts.exist_account(username):
            print('用户已存在')
            continue

        password = input('请输入密码:').strip()
        expire_date = time.strftime("%Y-%m-%d", time.localtime(time.time() + valid_days * 86400))
        account_data = {"enroll_date": time.strftime("%Y-%m-%d"), "password": password, "id": username,
                        "credit": 15000, "status": 0, "balance": 15000.0, "expire_date": expire_date,
                        "pay_day": 22}
        accounts.dump_account(account_data)
        print('用户%s创建成功!' % username)
        break
atm.core.auth
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang

import json,os
from Atm.conf import setting

def exist_account(id):
    """Tell whether an account file exists for ``id``.

    Args:
        id: account id; the file checked is ``<db>/<name>/<id>.json``.

    Returns:
        ``True`` if the file exists, otherwise ``False`` (previously an
        implicit ``None``).
    """
    conn_params = setting.DATABASE
    account_file = os.path.join(conn_params['path'], conn_params['name'], '%s.json' % id)
    return os.path.exists(account_file)



def load_account(id):
    """Read and return the account record stored for ``id``.

    Args:
        id: account id; the record is read from ``<db>/<name>/<id>.json``.

    Returns:
        The decoded JSON record.
    """
    conn_params = setting.DATABASE
    account_file = os.path.join(conn_params['path'], conn_params['name'], '%s.json' % id)
    with open(account_file, 'r') as f:
        return json.load(f)



def dump_account(account_data):
    """Write an account record to its JSON file, replacing any previous content.

    Args:
        account_data: record to store; its ``id`` selects the file
            ``<db>/<name>/<id>.json``.

    Returns:
        ``True`` once the record has been written.
    """
    conn_params = setting.DATABASE
    account_file = os.path.join(conn_params['path'], conn_params['name'],
                                '%s.json' % account_data['id'])
    with open(account_file, 'w') as f:
        json.dump(account_data, f)
    return True
atm.core.accounts
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang


import os,sys


from Atm.core import accounts
def money(id):
    """Return the available balance of account ``id``.

    Returns:
        The stored ``balance`` if the account exists, otherwise ``False``.
    """
    if not accounts.exist_account(id):
        return False
    return accounts.load_account(id)['balance']

def password(id):
    """Return the stored password of account ``id``.

    Returns:
        The stored ``password`` if the account exists, otherwise ``False``.
    """
    if not accounts.exist_account(id):
        return False
    return accounts.load_account(id)['password']


def balance(id, money):
    """Overwrite the balance of account ``id`` and save the record.

    Args:
        id: account id to update.
        money: new balance value.
    """
    acc_data = accounts.load_account(id)
    acc_data['balance'] = money
    accounts.dump_account(acc_data)
atm.pay_api.pay_api
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang
import os,sys

# Make the directory that holds the ``shopping`` package importable when this
# file is run as a script (presumably shopping/bin/shop.py -- three levels up;
# confirm against the real layout).
bash_dir = os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
sys.path.append(bash_dir)
from shopping.core import main  # must come after the sys.path adjustment above


# Entry point for the shopping mall.
if __name__ == "__main__":
    main.run()
shopping.bin.shop
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang
import os,logging

# Directory one level above the folder that contains this settings module.
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

# Where shopping records are stored.
DATABASE = {
    "engine": "file_storage",
    "name": "accounts",
    # os.path.join yields the same "\\db" suffix on Windows and a valid path elsewhere.
    "path": os.path.join(BASE_DIR, "db"),
}


# Level applied to every logger and handler built from these settings.
LOG_LEVEL = logging.INFO
# Log category -> file name (suffix) inside the log directory.
LOG_TYPES = {
    "shopping": "shopping.log",
}
shopping.conf.setting
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang

import json,os
from shopping.conf import setting

def exist_account(id):
    """Tell whether a shopping record exists for ``id``.

    Args:
        id: user id; the file checked is ``<db>/<name>/<id>.json``.

    Returns:
        ``True`` if the file exists, otherwise ``False`` (previously an
        implicit ``None``).
    """
    conn_params = setting.DATABASE
    account_file = os.path.join(conn_params['path'], conn_params['name'], '%s.json' % id)
    return os.path.exists(account_file)



def load_account(id):
    """Read and return the shopping record stored for ``id``.

    Args:
        id: user id; the record is read from ``<db>/<name>/<id>.json``.

    Returns:
        The decoded JSON record.
    """
    conn_params = setting.DATABASE
    account_file = os.path.join(conn_params['path'], conn_params['name'], '%s.json' % id)
    with open(account_file, 'r') as f:
        return json.load(f)



def dump_account(account_data):
    """Write a shopping record to its JSON file, replacing any previous content.

    Args:
        account_data: record to store; its ``user_id`` selects the file
            ``<db>/<name>/<user_id>.json``.

    Returns:
        ``True`` once the record has been written.
    """
    conn_params = setting.DATABASE
    account_file = os.path.join(conn_params['path'], conn_params['name'],
                                '%s.json' % account_data['user_id'])
    with open(account_file, 'w') as f:
        json.dump(account_data, f)
    return True
shopping.core.shopping_accounts
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang
from Atm.pay_api import pay_api
from shopping.core import shopping_accouts
from shopping.core import logger




# Cart of the current shopping session: (goods, price) tuples.
shopping_list = []
# Session state: logged-in user id, spendable money and goods bought so far.
user_data = {
    'auth_id': None,
    'money': None,
    'auth_goods': None
}


def recory():
    """Load the user's purchase history, optionally show it, then start shopping.

    Reads the logged-in user from the module-level ``user_data``. If the user
    declines to shop, the program exits.

    Raises:
        SystemExit: when the user answers anything other than ``Y`` to the
            shopping prompt.
    """
    user_id = user_data['auth_id']
    if shopping_accouts.exist_account(user_id):
        acc_data = shopping_accouts.load_account(user_id)
        user_data['auth_goods'] = acc_data['goods']
        ask = input('是否查询之前的消费记录?(Y/N):')
        if ask == 'Y':
            print('您之前购买的商品为%s' % user_data['auth_goods'])
    else:
        # First-time shopper: start with an empty history.
        user_data['auth_goods'] = []

    ask_two = input('是否进入购物?(Y/N):')
    if ask_two != 'Y':
        raise SystemExit('退出成功!')

    log_type = "shopping"
    shopping_logger = logger.logger(log_type, user_id)
    shopping(shopping_logger, user_data)


def shopping(log_obj, user_data):
    """Run the interactive shopping loop for the logged-in user.

    The user picks products by number until entering ``q``. Only items that
    were actually paid for go into the cart; on ``q`` the cart is printed,
    appended to the user's history, saved, and the remaining money is written
    back to the ATM account.

    Fixes over the earlier flow: an out-of-range number no longer raises
    ``KeyError``; unaffordable items are not recorded as purchases; the record
    is written once instead of once per item; the module-level cart is emptied
    after saving so a later session does not re-record it.

    Args:
        log_obj: logger that records each successful purchase.
        user_data: session dict with ``auth_id``, ``money`` and ``auth_goods``.
    """
    salary = user_data['money']
    # Keys are strings because input() returns strings.
    product_list = {
        '1': ['Iphone', 5800],
        '2': ['Mac Pro', 9800],
        '3': ['Bike', 800],
        '4': ['Watch', 10600],
        '5': ['Coffee', 31],
        '6': ['Alex Python', 120],
    }

    while True:
        print('--------------------------------')
        for k, v in product_list.items():  # print the catalogue
            print(k, v)
        choise = input("请输入你要买的商品编号(按q返回登录界面):")
        if choise == 'q':
            print('以下是您买的商品')
            for item in shopping_list:
                print(item)
            if shopping_list:
                # Persist once, after the whole cart has been listed.
                user_data['money'] = salary
                user_data['auth_goods'].extend(shopping_list)
                shopping_data = {'user_id': user_data['auth_id'], 'goods': user_data['auth_goods']}
                shopping_accouts.dump_account(shopping_data)
                pay_api.balance(user_data['auth_id'], user_data['money'])
                del shopping_list[:]  # the cart is module-global; reset it
            break

        if choise not in product_list:
            # Covers non-numeric input and numbers outside the catalogue.
            print('请输入商品编号!(按q返回登录界面)')
            continue

        goods, value = product_list[choise]
        if value <= salary:
            salary -= value
            shopping_list.append((goods, value))  # only paid items enter the cart
            print('购买成功,您所购买的商品为%s,所剩下的余额为\033[51;1m%s\033[0m\n' % (goods, salary))
            log_obj.info("account:%s action:%s product_number:%s goods:%s cost:%s" %
                         (user_data['auth_id'], "shopping", int(choise), goods, value))
        else:
            print('\033[21;1m你的钱不够,请选择其他商品。\033[0m\n')

def interactive():
    """Login loop of the shopping mall.

    Asks for a user name (``q`` quits) and password, checks them against the
    ATM account through ``pay_api``, and on success records the user and
    available credit in ``user_data`` before entering ``recory``.
    """
    while True:
        username = input('请输入用户名(按"q"退出):')
        if username == 'q':
            print('退出成功!')
            break

        passwd = pay_api.password(username)
        if passwd is False:
            # pay_api.password returns False for an unknown account.
            print('用户不存在,请协调管理员注册!')
            continue

        password = input('请输入密码:')
        if password != passwd:
            print('密码不正确!')
            continue

        money = pay_api.money(username)
        user_data['auth_id'] = username
        user_data['money'] = money
        print('\033[32;1m您的信用卡可用额度为:%s\033[0m' % money)
        print('登录成功!')
        recory()


def run():
    """Entry point of the shopping mall: print the banner and start the login loop."""
    print('\033[32;1m欢迎来到购物商城\033[0m'.center(30, '-'))
    interactive()
shopping.core.main
技术分享
#!/usr/bin/env python
# coding=utf-8
#Author:yang

import logging
from shopping.conf import setting

def logger(log_type, *user_name):
    """Return the logger for ``log_type``, writing to console and to a file.

    The file is ``<BASE_DIR>/log/<user>_<LOG_TYPES[log_type]>`` when a user
    name is given, otherwise ``<BASE_DIR>/log/<LOG_TYPES[log_type]>``.

    The logger object is shared per ``log_type``, so handlers from a previous
    call are removed first. Otherwise each call added another pair, which
    duplicated every line and kept writing to earlier users' files.

    Args:
        log_type: key of ``setting.LOG_TYPES`` (also used as the logger name).
        *user_name: optional; the first value prefixes the log file name.

    Returns:
        The configured ``logging.Logger``.
    """
    # create logger
    logger = logging.getLogger(log_type)
    logger.setLevel(setting.LOG_LEVEL)

    # drop handlers left over from an earlier call for this log_type
    for stale in list(logger.handlers):
        logger.removeHandler(stale)
        stale.close()

    # console handler
    ch = logging.StreamHandler()
    ch.setLevel(setting.LOG_LEVEL)

    # file handler
    if user_name:
        log_file = "%s/log/%s_%s" % (setting.BASE_DIR, user_name[0], setting.LOG_TYPES[log_type])
    else:
        log_file = "%s/log/%s" % (setting.BASE_DIR, setting.LOG_TYPES[log_type])
    fh = logging.FileHandler(log_file)
    fh.setLevel(setting.LOG_LEVEL)

    # one shared formatter for both handlers
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    ch.setFormatter(formatter)
    fh.setFormatter(formatter)

    logger.addHandler(ch)
    logger.addHandler(fh)

    return logger
shopping.core.logger

 

atm+shopping