# CG_FTP.py
# CG FTP - 跨平台 FTP/FTPS 客戶端(Tkinter 單檔 MVP、中文介面)
# 需求:Python 3.10+、tkinter(內建)、tkinterdnd2(用於外部拖曳)
# 安裝拖曳套件:python -m pip install tkinterdnd2
import os
import sys
import io
import ssl
import json
import time
import math
import queue
import shutil
import threading
import traceback
import pathlib
from dataclasses import dataclass, field
from typing import Optional, List, Tuple, Dict
import tkinter as tk
from tkinter import ttk, messagebox, simpledialog, filedialog
# --- Optional: drag-and-drop support ---
# HAS_DND records whether tkinterdnd2 could be imported; the rest of the module
# only touches DND_FILES / TkinterDnD when it is True.
try:
    from tkinterdnd2 import DND_FILES, TkinterDnD
except Exception:
    # Deliberately broad: any failure while loading the optional package
    # simply disables external drag-and-drop instead of aborting startup.
    HAS_DND = False
else:
    HAS_DND = True
# --- Basic configuration ---
APP_NAME = "CG FTP"

# Per-user settings live in ~/.cg_ftp/settings.json.
SETTINGS_DIR = os.path.join(os.path.expanduser("~"), ".cg_ftp")
SETTINGS_PATH = os.path.join(SETTINGS_DIR, "settings.json")

# Values used when the settings file is missing or lacks a key.
DEFAULT_SETTINGS = {
    # UI theme: "dark" or "light".
    "theme": "dark",
    # Saved sites, each shaped like
    # {"name","host","port","user","passive":true,"ftps":false,"remote_dir":"/"}.
    "sites": [],
    "last_local_dir": str(pathlib.Path.home()),
    "last_remote_dir": "/",
}
def ensure_settings():
    """Create the settings directory and, if absent, a default settings file.

    Raises:
        OSError: if the directory or file cannot be created.
    """
    os.makedirs(SETTINGS_DIR, exist_ok=True)
    if not os.path.exists(SETTINGS_PATH):
        with open(SETTINGS_PATH, "w", encoding="utf-8") as f:
            json.dump(DEFAULT_SETTINGS, f, indent=2, ensure_ascii=False)
def load_settings() -> dict:
    """Load the settings file, filling in any missing keys from the defaults.

    Returns:
        The settings dict. If the file cannot be created, read or parsed, or
        its top-level value is not a JSON object, a fresh copy of
        DEFAULT_SETTINGS is returned instead.
    """
    import copy

    # Deep copy so mutable defaults (the "sites" list) are never shared
    # between the module-level template and a live settings dict.
    defaults = copy.deepcopy(DEFAULT_SETTINGS)
    try:
        ensure_settings()
        with open(SETTINGS_PATH, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception:
        # Best effort: an unreadable or corrupt file must not block startup.
        return defaults
    if not isinstance(data, dict):
        return defaults
    for key, value in defaults.items():
        data.setdefault(key, value)
    return data
def save_settings(s: dict):
    """Persist the settings dict *s* as UTF-8 JSON.

    The data is written to a sibling temporary file and then moved over the
    real file, so an interrupted write cannot leave a truncated settings.json.

    Raises:
        OSError: if the settings directory or file cannot be written.
    """
    ensure_settings()
    tmp_path = SETTINGS_PATH + ".tmp"
    with open(tmp_path, "w", encoding="utf-8") as f:
        json.dump(s, f, indent=2, ensure_ascii=False)
    os.replace(tmp_path, SETTINGS_PATH)
def human_size(n: int) -> str:
    """Format a byte count using binary units (B, KB, ... PB).

    Args:
        n: Number of bytes, or None.

    Returns:
        "" for None, "0 B" for zero or negative values, otherwise the value
        with two decimals and the largest unit that keeps it >= 1 (capped at
        PB), e.g. "1.50 KB".
    """
    if n is None:
        return ""
    if n <= 0:
        return "0 B"
    units = ["B", "KB", "MB", "GB", "TB", "PB"]
    # Every 10 bits is one factor of 1024. Integer arithmetic avoids the
    # floating-point rounding of math.log(n, 1024) near exact powers.
    exponent = max(0, (int(n).bit_length() - 1) // 10)
    i = min(exponent, len(units) - 1)
    return f"{n/1024**i:.2f} {units[i]}"
def eta_str(seconds: float) -> str:
    """Format a remaining-time estimate as MM:SS or HH:MM:SS.

    Args:
        seconds: Estimated seconds remaining; may be None, non-positive,
            infinite or NaN when no estimate is available.

    Returns:
        "--:--" when there is no usable estimate, otherwise the zero-padded
        time; the hour field is shown only when it is non-zero.
    """
    # isfinite() rejects both infinity and NaN; int(nan) would raise.
    if seconds is None or not math.isfinite(seconds) or seconds <= 0:
        return "--:--"
    m, s = divmod(int(seconds), 60)
    h, m = divmod(m, 60)
    return f"{h:02d}:{m:02d}:{s:02d}" if h else f"{m:02d}:{s:02d}"
# --- FTP 包裝 ---
class FTPClient:
    """Thin wrapper around ftplib for FTP / explicit FTPS.

    Directory listings prefer MLSD and fall back to parsing Unix-style LIST
    output. All operations on the single underlying connection are serialized
    with a re-entrant lock, because ftplib connections cannot be shared safely
    between threads. A running transfer therefore blocks other commands until
    it finishes or is cancelled.
    """

    def __init__(self, logger):
        """Create a disconnected client.

        Args:
            logger: Callable taking one message string, or a falsy value to
                disable logging.
        """
        self.ftp = None
        self.is_ftps = False
        self.logger = logger
        self.encoding = "utf-8"
        # Guards every command sent over self.ftp.
        self._lock = threading.RLock()

    def log(self, msg: str):
        """Forward *msg* to the configured logger, if any."""
        if self.logger:
            self.logger(msg)

    def connect(self, host: str, port: int, user: str, password: str,
                passive: bool = True, use_ftps: bool = False, timeout: int = 30,
                verify_cert: bool = False):
        """Close any existing session, then connect and log in.

        Args:
            host: Server host name or address.
            port: Server port.
            user: Login name.
            password: Login password.
            passive: Use passive (PASV) data connections when True.
            use_ftps: Use explicit TLS (FTP_TLS) and protect the data channel.
            timeout: Socket timeout in seconds.
            verify_cert: When True, keep the default certificate and host name
                verification. The default False preserves the relaxed MVP
                behaviour (no verification).

        Raises:
            Exception: whatever ftplib/ssl/socket raise on failure. In that
                case the half-open connection is discarded and self.ftp stays
                None, so callers do not mistake it for a live session.
        """
        import ftplib

        with self._lock:
            self.close()
            self.is_ftps = use_ftps
            if use_ftps:
                context = ssl.create_default_context()
                if verify_cert:
                    self.log(f"連線 FTPS {host}:{port}(Explicit TLS)")
                else:
                    # SECURITY NOTE: verification disabled; the session is
                    # encrypted but the server identity is not checked.
                    context.check_hostname = False
                    context.verify_mode = ssl.CERT_NONE
                    self.log(f"連線 FTPS {host}:{port}(Explicit TLS,驗證放寬)")
                ftp = ftplib.FTP_TLS(timeout=timeout, context=context)
            else:
                ftp = ftplib.FTP(timeout=timeout)
                self.log(f"連線 FTP {host}:{port}")
            try:
                ftp.connect(host, port)
                self.log(ftp.getwelcome())
                ftp.login(user=user, passwd=password)
                if use_ftps:
                    ftp.prot_p()
                ftp.set_pasv(passive)
                try:
                    ftp.sendcmd("OPTS UTF8 ON")
                    self.encoding = "utf-8"
                except ftplib.all_errors:
                    # Optional command; servers that reject it are still usable.
                    pass
            except Exception:
                try:
                    ftp.close()
                except Exception:
                    pass
                raise
            self.ftp = ftp
            self.log("登入成功。")

    def close(self):
        """Close the session if one is open; never raises.

        If another thread is in the middle of a command (for example a
        transfer), QUIT is skipped and the sockets are dropped so that the
        caller, typically the window-close handler, does not block.
        """
        ftp = self.ftp
        if not ftp:
            return
        if self._lock.acquire(blocking=False):
            try:
                try:
                    ftp.quit()
                except Exception:
                    try:
                        ftp.close()
                    except Exception:
                        pass
            finally:
                self._lock.release()
        else:
            try:
                ftp.close()
            except Exception:
                pass
        self.ftp = None

    def pwd(self) -> str:
        """Return the server's current working directory."""
        with self._lock:
            return self.ftp.pwd()

    def cwd(self, path: str):
        """Change the server's current working directory to *path*."""
        with self._lock:
            self.ftp.cwd(path)

    def mlsd_supported(self) -> bool:
        """Return True if the server accepts the OPTS MLST command."""
        with self._lock:
            try:
                self.ftp.sendcmd("OPTS MLST type;size;modify;perm;")
                return True
            except Exception:
                return False

    def listdir(self, path: Optional[str] = None) -> List[dict]:
        """List a directory.

        Args:
            path: Directory to change into first; None or "" lists the current
                directory.

        Returns:
            A list of dicts with keys name, is_dir, size, modify and perm.
            size is None for directories; modify is None in the LIST fallback.
        """
        with self._lock:
            if path:
                self.cwd(path)
            try:
                return self._list_mlsd()
            except Exception:
                # MLSD unsupported or unusable: fall back to LIST. Nothing
                # gathered from the failed attempt is kept, so entries cannot
                # be duplicated.
                pass
            return self._list_unix()

    def _list_mlsd(self) -> List[dict]:
        """Build the listing from MLSD facts, skipping cdir/pdir and dot entries."""
        entries = []
        for name, facts in self.ftp.mlsd():
            kind = facts.get("type", "").lower()
            if kind in ("cdir", "pdir") or name in (".", ".."):
                continue
            is_dir = kind == "dir"
            size = None
            if not is_dir:
                try:
                    size = int(facts.get("size", "0"))
                except (TypeError, ValueError):
                    size = None
            entries.append({
                "name": name,
                "is_dir": is_dir,
                "size": size,
                "modify": facts.get("modify"),
                "perm": facts.get("perm"),
            })
        return entries

    def _list_unix(self) -> List[dict]:
        """Build the listing by parsing Unix-like LIST output."""
        lines = []
        self.ftp.retrlines("LIST", lines.append)
        entries = []
        for line in lines:
            parts = line.split(maxsplit=8)
            if len(parts) < 6:
                # Summary lines such as "total 12".
                continue
            perms = parts[0]
            is_dir = perms.startswith("d")
            name = parts[-1]
            if perms.startswith("l") and " -> " in name:
                # Symlinks are printed as "name -> target"; keep the name only.
                name = name.split(" -> ", 1)[0]
            if name in (".", ".."):
                continue
            size = None
            if not is_dir:
                try:
                    size = int(parts[4])
                except ValueError:
                    pass
            entries.append({
                "name": name,
                "is_dir": is_dir,
                "size": size,
                "modify": None,
                "perm": perms,
            })
        return entries

    def _drain_reply(self):
        """Read and discard the reply left pending by an aborted transfer.

        Raising from a transfer callback skips ftplib's final voidresp(); the
        server's completion/abort reply would otherwise be taken as the answer
        to the next command.
        """
        try:
            self.ftp.voidresp()
        except Exception:
            pass

    def download_file(self, remote_path: str, local_path: str, progress_cb=None, stop_flag=None):
        """Download *remote_path* to *local_path* in binary mode.

        Data is written to "<local_path>.part" and moved into place only after
        a complete transfer, so an existing local file survives a failed or
        cancelled download and no truncated file is left behind.

        Args:
            remote_path: Remote file path passed to RETR.
            local_path: Destination path on the local file system.
            progress_cb: Optional callable receiving the size of each chunk.
            stop_flag: Optional event-like object; when is_set() is true the
                transfer is aborted.

        Raises:
            RuntimeError: if the transfer was cancelled through *stop_flag*.
            Exception: errors from ftplib or the local file system.
        """
        if stop_flag and stop_flag.is_set():
            raise RuntimeError("已取消")
        part_path = f"{local_path}.part"
        cancelled = False

        def writer(chunk):
            nonlocal cancelled
            if stop_flag and stop_flag.is_set():
                cancelled = True
                raise RuntimeError("已取消")
            f.write(chunk)
            if progress_cb:
                progress_cb(len(chunk))

        with self._lock:
            try:
                with open(part_path, "wb") as f:
                    self.ftp.retrbinary(f"RETR {remote_path}", writer, blocksize=64*1024)
                os.replace(part_path, local_path)
            except Exception:
                if cancelled:
                    self._drain_reply()
                try:
                    os.remove(part_path)
                except OSError:
                    pass
                raise

    def upload_file(self, local_path: str, remote_path: str, progress_cb=None, stop_flag=None):
        """Upload *local_path* to *remote_path* in binary mode.

        Args:
            local_path: Source file on the local file system.
            remote_path: Remote file path passed to STOR.
            progress_cb: Optional callable receiving the size of each chunk.
            stop_flag: Optional event-like object; when is_set() is true the
                transfer is aborted (a partial remote file may remain).

        Raises:
            RuntimeError: if the transfer was cancelled through *stop_flag*.
            Exception: errors from ftplib or the local file system.
        """
        if stop_flag and stop_flag.is_set():
            raise RuntimeError("已取消")
        cancelled = False

        def cb(data):
            nonlocal cancelled
            if stop_flag and stop_flag.is_set():
                cancelled = True
                raise RuntimeError("已取消")
            if progress_cb:
                progress_cb(len(data))

        with self._lock:
            with open(local_path, "rb") as rf:
                try:
                    self.ftp.storbinary(f"STOR {remote_path}", rf, blocksize=64*1024, callback=cb)
                except Exception:
                    if cancelled:
                        self._drain_reply()
                    raise

    def mkdir(self, name: str):
        """Create the remote directory *name*."""
        with self._lock:
            self.ftp.mkd(name)

    def rmdir(self, path: str):
        """Remove the remote directory *path* (the server requires it to be empty)."""
        with self._lock:
            self.ftp.rmd(path)

    def rename(self, old: str, new: str):
        """Rename the remote entry *old* to *new*."""
        with self._lock:
            self.ftp.rename(old, new)

    def delete(self, path: str):
        """Delete the remote file *path*."""
        with self._lock:
            self.ftp.delete(path)
# --- 傳輸佇列 ---
@dataclass
class TransferItem:
    """State of one queued, running or finished file transfer."""

    tid: int
    direction: str  # "上傳" (upload) / "下載" (download)
    src: str
    dst: str
    size: Optional[int] = None
    transferred: int = 0
    start_ts: float = field(default_factory=time.time)
    # One of: 佇列中 (queued) / 進行中 (running) / 完成 (done) /
    # 錯誤 (error) / 已取消 (cancelled)
    status: str = "佇列中"
    error: Optional[str] = None
    # Set to request cancellation of this transfer.
    stop_flag: threading.Event = field(default_factory=threading.Event)
class TransferManager:
    """Runs queued TransferItems on a small pool of daemon worker threads.

    Workers call app.ftp_client.download_file / upload_file and report progress
    through app.on_transfer_updated(tid) and app.log(msg). Those callbacks are
    invoked from worker threads.
    """

    # Minimum seconds between two progress notifications for the same item.
    _PROGRESS_INTERVAL = 0.1

    def __init__(self, app, max_workers=2):
        """Start *max_workers* worker threads serving a shared queue.

        Args:
            app: Object providing ftp_client, on_transfer_updated(tid) and
                log(msg).
            max_workers: Number of worker threads to start.
        """
        self.app = app
        self.q = queue.Queue()
        self.items: Dict[int, "TransferItem"] = {}
        self.lock = threading.Lock()
        self.workers = []
        self.max_workers = max_workers
        self.running = True
        for _ in range(max_workers):
            t = threading.Thread(target=self.worker, daemon=True)
            t.start()
            self.workers.append(t)

    def add(self, item: "TransferItem"):
        """Register *item* and queue it for transfer."""
        with self.lock:
            self.items[item.tid] = item
        # Announce the item before a worker can pick it up, so the first UI
        # update always comes from the caller's thread.
        self.app.on_transfer_updated(item.tid)
        self.q.put(item.tid)

    def stop(self):
        """Ask all workers to exit once they finish their current item."""
        self.running = False
        for _ in self.workers:
            self.q.put(None)

    def worker(self):
        """Worker loop: take ids from the queue until stopped or a None sentinel arrives."""
        while self.running:
            tid = self.q.get()
            if tid is None:
                break
            with self.lock:
                item = self.items.get(tid)
            if not item:
                continue
            if item.stop_flag.is_set():
                # Cancelled while still queued: never start it, so no local
                # file is truncated and no remote file is created.
                item.status = "已取消"
                self.app.on_transfer_updated(tid)
                continue
            try:
                item.status = "進行中"
                item.transferred = 0
                # Measure speed from the actual start, not from enqueue time.
                item.start_ts = time.time()
                self.app.on_transfer_updated(tid)
                last_notice = time.monotonic()

                def progress(delta):
                    nonlocal last_notice
                    item.transferred += delta
                    now = time.monotonic()
                    # Throttle so a fast transfer does not flood the UI.
                    if now - last_notice >= self._PROGRESS_INTERVAL:
                        last_notice = now
                        self.app.on_transfer_updated(tid)

                if item.direction == "下載":
                    self.app.ftp_client.download_file(
                        item.src, item.dst, progress_cb=progress, stop_flag=item.stop_flag)
                else:
                    self.app.ftp_client.upload_file(
                        item.src, item.dst, progress_cb=progress, stop_flag=item.stop_flag)
                item.status = "完成"
                self.app.on_transfer_updated(tid)
            except Exception as e:
                if item.stop_flag.is_set():
                    item.status = "已取消"
                    item.error = None
                    self.app.log(f"[取消] 傳輸 {item.direction} {item.src} -> {item.dst}")
                else:
                    item.status = "錯誤"
                    item.error = str(e)
                    self.app.log(f"[錯誤] 傳輸 {item.direction} {item.src} -> {item.dst}: {e}")
                self.app.on_transfer_updated(tid)

    def totals(self) -> Tuple[float, float, float]:
        """Return (total speed in B/s, remaining bytes, overall ETA in seconds).

        Only items whose status is "進行中" are counted. The ETA is math.inf
        when the total speed is zero.
        """
        now = time.time()
        total_speed = 0.0
        remaining = 0.0
        with self.lock:
            snapshot = list(self.items.values())
        for it in snapshot:
            if it.status != "進行中":
                continue
            # Floor the elapsed time so a just-started item does not report an
            # absurd speed.
            elapsed = max(0.5, now - it.start_ts)
            size = it.size or 0
            total_speed += it.transferred / elapsed
            if size > 0:
                remaining += max(0, size - it.transferred)
        eta = remaining / total_speed if total_speed > 0 else math.inf
        return total_speed, remaining, eta
# --- 連線對話框(修正:使用 self.parent 取設定) ---
class ConnectDialog(simpledialog.Dialog):
    """Modal dialog asking for the connection parameters.

    After the dialog closes with OK, ``result`` is a dict with the keys host,
    port, user, password, passive and ftps. It stays None when cancelled.
    """

    def __init__(self, parent):
        """Open the dialog.

        Args:
            parent: The application window; its ``settings`` dict supplies the
                values used to pre-fill the form.
        """
        # Keep the App reference before Dialog.__init__ runs, because body()
        # is called from inside it and reads self.parent.settings.
        self.parent = parent
        super().__init__(parent, title="連線到伺服器")

    def body(self, master):
        """Build the form, pre-filled from the first saved site, and return the focus widget."""
        sites = self.parent.settings.get("sites") or []
        last = sites[0] if sites else {
            "host": "", "port": 21, "user": "", "passive": True, "ftps": False, "remote_dir": "/"
        }

        def add_entry(row, label, value, width, **entry_opts):
            # One labelled entry per grid row.
            ttk.Label(master, text=label).grid(row=row, column=0, sticky="e", padx=4, pady=4)
            entry = ttk.Entry(master, width=width, **entry_opts)
            if value:
                entry.insert(0, value)
            entry.grid(row=row, column=1, sticky="w")
            return entry

        self.e_host = add_entry(0, "主機位址:", last.get("host", ""), 32)
        self.e_port = add_entry(1, "連接埠:", str(last.get("port", 21)), 8)
        self.e_user = add_entry(2, "帳號:", last.get("user", ""), 24)
        # The password is never pre-filled.
        self.e_pass = add_entry(3, "密碼:", "", 24, show="*")

        self.var_passive = tk.BooleanVar(value=last.get("passive", True))
        self.var_ftps = tk.BooleanVar(value=last.get("ftps", False))
        ttk.Checkbutton(master, text="被動模式(PASV)", variable=self.var_passive).grid(row=4, column=1, sticky="w")
        ttk.Checkbutton(master, text="FTPS(Explicit TLS)", variable=self.var_ftps).grid(row=5, column=1, sticky="w")
        return self.e_host  # initial focus

    @staticmethod
    def _parse_port(text: str) -> Optional[int]:
        """Return the port in *text* (blank means 21), or None if it is invalid."""
        text = text.strip() or "21"
        try:
            port = int(text)
        except ValueError:
            return None
        return port if 1 <= port <= 65535 else None

    def validate(self):
        """Keep the dialog open (return False) until host and port are usable."""
        if not self.e_host.get().strip():
            messagebox.showwarning("連線到伺服器", "請輸入主機位址。", parent=self)
            return False
        if self._parse_port(self.e_port.get()) is None:
            messagebox.showwarning("連線到伺服器", "連接埠必須是 1 到 65535 之間的整數。", parent=self)
            return False
        return True

    def apply(self):
        """Store the entered values in ``self.result``."""
        port = self._parse_port(self.e_port.get())
        self.result = {
            "host": self.e_host.get().strip(),
            # validate() has already rejected invalid ports; 21 is a last-resort fallback.
            "port": port if port is not None else 21,
            "user": self.e_user.get().strip(),
            "password": self.e_pass.get(),
            "passive": self.var_passive.get(),
            "ftps": self.var_ftps.get(),
        }
# --- 主應用 ---
class App(TkinterDnD.Tk if HAS_DND else tk.Tk):
    """Main window: local and remote file panes, transfer queue and log.

    Threading: TransferManager workers call log() and on_transfer_updated()
    from background threads. Tk widgets are only touched from the thread that
    created the window; calls arriving from other threads are queued and
    applied by a periodic pump running in the UI thread.
    """

    # Milliseconds between two flushes of queued worker-thread events.
    _PUMP_INTERVAL_MS = 100

    def __init__(self):
        """Build the window, load settings and start the periodic timers."""
        super().__init__()
        self.title(APP_NAME)
        self.geometry("1200x720")
        self.minsize(1000, 600)

        # Must exist before the FTP client / transfer manager can call back.
        self._ui_thread_id = threading.get_ident()
        self._pending_logs = queue.Queue()
        self._pending_tx = queue.Queue()
        self._tx_rows: Dict[int, str] = {}  # transfer id -> tree row id
        self._active_pane = "local"         # pane that last had focus

        self.settings = load_settings()
        self.theme = self.settings.get("theme", "dark")
        self.ftp_client = FTPClient(logger=self.log)
        self.transfer_mgr = TransferManager(self, max_workers=2)
        self._tid_counter = 0

        self._build_style()
        self._build_menu()
        self._build_toolbar()
        self._build_panes()
        self._build_bottom()
        self._apply_theme(recursive=True)

        self.after(500, self._tick_totals)
        self.after(self._PUMP_INTERVAL_MS, self._pump_worker_events)
        self.protocol("WM_DELETE_WINDOW", self.on_close)
        self.refresh_local()

    # --- Style / theme ---
    def _build_style(self):
        """Create the ttk style object and the two colour palettes."""
        self.style = ttk.Style(self)
        self.dark_colors = {
            "bg": "#1e1e1e", "fg": "#e6e6e6", "sel": "#264f78",
            "border": "#2a2a2a", "accent": "#3a7bd5", "textbg": "#1e1e1e"
        }
        self.light_colors = {
            "bg": "#ffffff", "fg": "#000000", "sel": "#cde6ff",
            "border": "#d9d9d9", "accent": "#2b7cd3", "textbg": "#ffffff"
        }

    def _apply_theme(self, recursive=False):
        """Apply the current theme's colours.

        Args:
            recursive: Also set the background of every descendant widget that
                accepts a ``bg`` option (ttk widgets reject it and are skipped).
        """
        c = self.dark_colors if self.theme == "dark" else self.light_colors
        self.configure(bg=c["bg"])
        # Common ttk widgets
        self.style.configure("TFrame", background=c["bg"])
        self.style.configure("TLabel", background=c["bg"], foreground=c["fg"])
        self.style.configure("TButton", padding=4)
        self.style.configure("TEntry", fieldbackground=c["bg"], foreground=c["fg"])
        self.style.configure("Treeview",
                             background=c["bg"], foreground=c["fg"],
                             fieldbackground=c["bg"])
        self.style.configure("TNotebook", background=c["bg"])
        self.style.configure("TNotebook.Tab", background=c["bg"], foreground=c["fg"])
        self.style.configure("Horizontal.TProgressbar", background=c["accent"])
        # Log text widget (may not exist yet during construction)
        if hasattr(self, "log_txt"):
            try:
                self.log_txt.configure(bg=c["textbg"], fg=c["fg"], insertbackground=c["fg"])
            except tk.TclError:
                pass
        if recursive:
            def apply_bg(widget):
                try:
                    widget.configure(bg=c["bg"])
                except tk.TclError:
                    pass
                for ch in widget.winfo_children():
                    apply_bg(ch)
            apply_bg(self)

    def toggle_theme(self):
        """Switch between dark and light, persist the choice and re-apply it."""
        self.theme = "light" if self.theme == "dark" else "dark"
        self.settings["theme"] = self.theme
        save_settings(self.settings)
        self._apply_theme(recursive=True)

    # --- Menu / toolbar ---
    def _build_menu(self):
        """Create the menu bar (File, View, Help)."""
        menubar = tk.Menu(self)
        filem = tk.Menu(menubar, tearoff=0)
        filem.add_command(label="連線…", command=self.on_connect)
        filem.add_separator()
        filem.add_command(label="結束", command=self.on_close)
        menubar.add_cascade(label="檔案", menu=filem)
        viewm = tk.Menu(menubar, tearoff=0)
        viewm.add_command(label="切換主題(深/淺)", command=self.toggle_theme)
        viewm.add_command(label="重新整理(本機/遠端)", command=self.refresh_both)
        menubar.add_cascade(label="檢視", menu=viewm)
        helpm = tk.Menu(menubar, tearoff=0)
        helpm.add_command(label="關於", command=self.show_about)
        menubar.add_cascade(label="說明", menu=helpm)
        self.config(menu=menubar)

    def _build_toolbar(self):
        """Create the toolbar buttons and the drag-and-drop status label."""
        tb = ttk.Frame(self)
        tb.pack(side="top", fill="x")
        btns = [
            ("連線", self.on_connect),
            ("重新整理", self.refresh_both),
            ("返回上一層(本機)", self.local_up_dir),
            ("返回上一層(遠端)", self.remote_up_dir),
            ("新資料夾(本機)", self.mkdir_local),
            ("新資料夾(遠端)", self.mkdir_remote),
            ("刪除", self.delete_selected),
            ("重新命名", self.rename_selected),
        ]
        for text, cmd in btns:
            ttk.Button(tb, text=text, command=cmd).pack(side="left", padx=2, pady=2)
        ttk.Label(tb, text=f"拖曳:{'啟用' if HAS_DND else '未啟用'}").pack(side="right", padx=8)

    def _make_file_tree(self, parent, cols, headers):
        """Create and pack one four-column file list."""
        tree = ttk.Treeview(parent, columns=cols, show="headings", selectmode="extended")
        for c in cols:
            tree.heading(c, text=headers[c])
            tree.column(c, width=160 if c == "name" else 110, anchor="w")
        tree.pack(fill="both", expand=True)
        return tree

    def _set_active_pane(self, pane: str):
        """Remember which pane ("local" or "remote") last received focus."""
        self._active_pane = pane

    def _build_panes(self):
        """Create the side-by-side local and remote file panes."""
        sp = ttk.Panedwindow(self, orient="horizontal")
        sp.pack(side="top", fill="both", expand=True)
        cols = ("name", "size", "type", "modified")
        headers = {"name": "名稱", "size": "大小", "type": "類型", "modified": "修改時間"}
        # On macOS the secondary mouse button is reported as Button-2.
        context_events = ("<Button-2>", "<Button-3>") if sys.platform == "darwin" else ("<Button-3>",)

        # Local pane
        lf = ttk.Frame(sp)
        sp.add(lf, weight=1)
        ttk.Label(lf, text="本機").pack(anchor="w")
        self.local_path_var = tk.StringVar(value=self.settings.get("last_local_dir", str(pathlib.Path.home())))
        p = ttk.Frame(lf)
        p.pack(fill="x")
        ttk.Entry(p, textvariable=self.local_path_var).pack(side="left", fill="x", expand=True)
        ttk.Button(p, text="前往", command=self.refresh_local).pack(side="left", padx=4)
        self.local_tv = self._make_file_tree(lf, cols, headers)
        self.local_tv.bind("<Double-1>", lambda e: self.on_local_double())
        self.local_tv.bind("<FocusIn>", lambda e: self._set_active_pane("local"), add="+")
        for ev in context_events:
            self.local_tv.bind(ev, self.on_local_context)
        if HAS_DND:
            self.local_tv.drop_target_register(DND_FILES)
            self.local_tv.dnd_bind("<<Drop>>", self.on_external_drop_local)

        # Remote pane
        rf = ttk.Frame(sp)
        sp.add(rf, weight=1)
        ttk.Label(rf, text="遠端").pack(anchor="w")
        self.remote_path_var = tk.StringVar(value="/")
        rp = ttk.Frame(rf)
        rp.pack(fill="x")
        ttk.Entry(rp, textvariable=self.remote_path_var, state="readonly").pack(side="left", fill="x", expand=True)
        ttk.Button(rp, text="前往", command=self.refresh_remote).pack(side="left", padx=4)
        self.remote_tv = self._make_file_tree(rf, cols, headers)
        self.remote_tv.bind("<Double-1>", lambda e: self.on_remote_double())
        self.remote_tv.bind("<FocusIn>", lambda e: self._set_active_pane("remote"), add="+")
        for ev in context_events:
            self.remote_tv.bind(ev, self.on_remote_context)
        if HAS_DND:
            self.remote_tv.drop_target_register(DND_FILES)
            self.remote_tv.dnd_bind("<<Drop>>", self.on_external_drop_remote)

    def _build_bottom(self):
        """Create the bottom notebook with the transfer queue and log tabs."""
        nb = ttk.Notebook(self)
        nb.pack(side="bottom", fill="both", pady=2, ipady=2)
        # Transfer queue tab
        tf = ttk.Frame(nb)
        nb.add(tf, text="傳輸佇列")
        cols = ("id", "direction", "name", "size", "progress", "speed", "eta", "status")
        self.tx_tv = ttk.Treeview(tf, columns=cols, show="headings", height=8)
        headers = {
            "id": "編號", "direction": "方向", "name": "名稱", "size": "大小",
            "progress": "進度", "speed": "速度", "eta": "剩餘時間", "status": "狀態"
        }
        for c in cols:
            self.tx_tv.heading(c, text=headers[c])
            self.tx_tv.column(c, width=100 if c != "name" else 240, anchor="w")
        self.tx_tv.pack(fill="both", expand=True)
        bar = ttk.Frame(tf)
        bar.pack(fill="x")
        self.total_speed_var = tk.StringVar(value="總速度:0 B/s")
        self.total_eta_var = tk.StringVar(value="總剩餘時間:--:--")
        ttk.Label(bar, textvariable=self.total_speed_var).pack(side="left", padx=8)
        ttk.Label(bar, textvariable=self.total_eta_var).pack(side="left", padx=16)
        ttk.Button(bar, text="取消所選", command=self.cancel_selected_transfers).pack(side="right", padx=8)
        # Log tab
        lf = ttk.Frame(nb)
        nb.add(lf, text="日誌")
        self.log_txt = tk.Text(lf, height=10, wrap="none")
        self.log_txt.pack(fill="both", expand=True)

    # --- Log ---
    def log(self, msg: str):
        """Append a timestamped line to the log tab; safe to call from any thread."""
        ts = time.strftime("%H:%M:%S")
        line = f"[{ts}] {msg}\n"
        if threading.get_ident() != self._ui_thread_id:
            # Worker thread: hand the line to the UI-thread pump.
            self._pending_logs.put(line)
            return
        self._append_log(line)

    def _append_log(self, text: str):
        """Insert *text* into the log widget (UI thread only)."""
        if not hasattr(self, "log_txt"):
            return
        self.log_txt.insert("end", text)
        self.log_txt.see("end")

    # --- About ---
    def show_about(self):
        """Show the About box."""
        # TODO: replace with the final About text (the first line may stay).
        content = (
            "CG FTP\n"
            "版本:1.0.0\n"
            "作者:Kelvin Huang \n"
            "說明:FTP/FTPS 客戶端,提供檔案拖曳、顯示佇列與進度功能。\n"
        )
        messagebox.showinfo("關於", content)

    # --- Connect / refresh ---
    def _require_connection(self) -> bool:
        """Return True when connected; otherwise warn the user and return False."""
        if self.ftp_client.ftp:
            return True
        messagebox.showwarning("尚未連線", "請先連線到 FTP 伺服器。", parent=self)
        return False

    def on_connect(self):
        """Ask for connection parameters, connect and remember the site."""
        dlg = ConnectDialog(self)
        if not getattr(dlg, "result", None):
            return
        info = dlg.result
        try:
            self.ftp_client.connect(
                host=info["host"], port=info["port"], user=info["user"],
                password=info["password"], passive=info["passive"], use_ftps=info["ftps"]
            )
            self.remote_path_var.set(self.ftp_client.pwd())
            # Remember the most recent site, but never write the password to disk.
            site = {k: v for k, v in info.items() if k != "password"}
            site["remote_dir"] = self.remote_path_var.get()
            self.settings["sites"] = [site]
            save_settings(self.settings)
            self.refresh_remote()
            self.log(f"已連線:{info['host']}(PASV={info['passive']} FTPS={info['ftps']})")
            if info["ftps"]:
                self.log("FTPS 憑證驗證於 MVP 放寬;後續版本可提供嚴格驗證選項。")
        except Exception as e:
            self.log(f"[錯誤] 連線失敗:{e}")
            messagebox.showerror("連線失敗", str(e))

    def refresh_both(self):
        """Reload both panes."""
        self.refresh_local()
        self.refresh_remote()

    def refresh_local(self):
        """Reload the local pane from the path entry and remember the directory."""
        path = self.local_path_var.get().strip() or str(pathlib.Path.home())

        def is_dir(entry):
            try:
                return entry.is_dir()
            except OSError:
                return False

        try:
            p = pathlib.Path(path)
            if not p.exists():
                raise FileNotFoundError(path)
            rows = []
            if p.parent != p:
                rows.append(("..", "", "資料夾", ""))
            children = [(ch, is_dir(ch)) for ch in p.iterdir()]
            children.sort(key=lambda pair: (not pair[1], pair[0].name.lower()))
            for ch, directory in children:
                typ = "資料夾" if directory else "檔案"
                try:
                    st = ch.stat()
                except OSError:
                    # Broken link or unreadable entry: list it without details
                    # rather than failing the whole refresh.
                    rows.append((ch.name, "", typ, ""))
                    continue
                size = "" if directory else human_size(st.st_size)
                mtime = time.strftime("%Y-%m-%d %H:%M", time.localtime(st.st_mtime))
                rows.append((ch.name, size, typ, mtime))
            self.local_tv.delete(*self.local_tv.get_children())
            for row in rows:
                self.local_tv.insert("", "end", values=row)
            self.settings["last_local_dir"] = str(p)
            save_settings(self.settings)
        except Exception as e:
            self.log(f"[錯誤] 本機刷新:{e}")
            messagebox.showerror("本機錯誤", str(e))

    @staticmethod
    def _format_remote_time(stamp) -> str:
        """Render an MLSD ``modify`` value (YYYYMMDDHHMMSS...) as "YYYY-MM-DD HH:MM".

        Values that do not start with 12 digits are returned unchanged. The
        time is shown as sent by the server (RFC 3659 specifies UTC).
        """
        text = str(stamp or "")
        if len(text) >= 12 and text[:12].isdigit():
            return f"{text[0:4]}-{text[4:6]}-{text[6:8]} {text[8:10]}:{text[10:12]}"
        return text

    def refresh_remote(self):
        """Reload the remote pane from the server's current directory (no-op when disconnected)."""
        if not self.ftp_client.ftp:
            return
        try:
            cur = self.ftp_client.pwd()
            self.remote_path_var.set(cur)
            items = self.ftp_client.listdir()
            rows = [("..", "", "資料夾", "")]
            for it in sorted(items, key=lambda x: (not x["is_dir"], x["name"].lower())):
                size = "" if it["is_dir"] else human_size(it["size"] or 0)
                rows.append((it["name"], size, "資料夾" if it["is_dir"] else "檔案",
                             self._format_remote_time(it.get("modify"))))
            self.remote_tv.delete(*self.remote_tv.get_children())
            for r in rows:
                self.remote_tv.insert("", "end", values=r)
        except Exception as e:
            self.log(f"[錯誤] 遠端刷新:{e}")
            messagebox.showerror("遠端錯誤", str(e))

    # --- Navigation ---
    def on_local_double(self):
        """Enter the double-clicked local directory (".." goes up)."""
        sel = self.local_tv.selection()
        if not sel:
            return
        name = str(self.local_tv.item(sel[0], "values")[0])
        cur = pathlib.Path(self.local_path_var.get())
        newp = cur.parent if name == ".." else (cur / name)
        if newp.is_dir():
            self.local_path_var.set(str(newp))
            self.refresh_local()

    def on_remote_double(self):
        """Enter the double-clicked remote directory, or download it if it is a file."""
        sel = self.remote_tv.selection()
        if not sel:
            return
        name = str(self.remote_tv.item(sel[0], "values")[0])
        if name == "..":
            try:
                self.ftp_client.cwd("..")
                self.refresh_remote()
            except Exception as e:
                self.log(f"[錯誤] 遠端返回上一層:{e}")
            return
        try:
            self.ftp_client.cwd(name)
        except Exception:
            # Not enterable: treat it as a file.
            self.download_selected_remote()
            return
        self.refresh_remote()

    def local_up_dir(self):
        """Go to the parent of the current local directory."""
        try:
            p = pathlib.Path(self.local_path_var.get()).parent
            self.local_path_var.set(str(p))
            self.refresh_local()
        except Exception as e:
            self.log(f"[錯誤] 本機上一層:{e}")

    def remote_up_dir(self):
        """Go to the parent of the current remote directory."""
        if not self._require_connection():
            return
        try:
            self.ftp_client.cwd("..")
            self.refresh_remote()
        except Exception as e:
            self.log(f"[錯誤] 遠端上一層:{e}")

    # --- Context menus ---
    def _show_context_menu(self, event, tree, entries):
        """Pop up a menu at the pointer.

        The row under the pointer is selected first (unless it already is part
        of the selection) so the menu acts on what the user clicked.

        Args:
            event: The mouse event that triggered the menu.
            tree: The Treeview that was clicked.
            entries: Sequence of (label, command) pairs; None adds a separator.
        """
        row = tree.identify_row(event.y)
        if row and row not in tree.selection():
            tree.selection_set(row)
        tree.focus_set()
        menu = tk.Menu(self, tearoff=0)
        for entry in entries:
            if entry is None:
                menu.add_separator()
            else:
                menu.add_command(label=entry[0], command=entry[1])
        try:
            menu.tk_popup(event.x_root, event.y_root)
        finally:
            menu.grab_release()

    def on_local_context(self, event):
        """Show the context menu of the local pane."""
        self._show_context_menu(event, self.local_tv, [
            ("上傳到遠端", self.upload_selected_local),
            None,
            ("新資料夾", self.mkdir_local),
            ("重新命名", self.rename_local),
            ("刪除", self.delete_local),
        ])

    def on_remote_context(self, event):
        """Show the context menu of the remote pane."""
        self._show_context_menu(event, self.remote_tv, [
            ("下載到本機", self.download_selected_remote),
            None,
            ("新資料夾", self.mkdir_remote),
            ("重新命名", self.rename_remote),
            ("刪除", self.delete_remote),
        ])

    # --- Local operations ---
    def mkdir_local(self):
        """Ask for a name and create that directory in the current local directory."""
        base = pathlib.Path(self.local_path_var.get())
        name = simpledialog.askstring("新資料夾(本機)", "請輸入資料夾名稱:", parent=self)
        if not name:
            return
        try:
            (base / name).mkdir(parents=False, exist_ok=False)
            self.refresh_local()
        except Exception as e:
            messagebox.showerror("本機新資料夾", str(e))

    def rename_local(self):
        """Ask for a new name and rename the first selected local entry."""
        sel = self.local_tv.selection()
        if not sel:
            return
        name = str(self.local_tv.item(sel[0], "values")[0])
        if name == "..":
            return  # the parent-directory row is not a real entry
        base = pathlib.Path(self.local_path_var.get())
        new = simpledialog.askstring("重新命名(本機)", f"請輸入「{name}」的新名稱:", parent=self)
        if not new:
            return
        try:
            (base / name).rename(base / new)
            self.refresh_local()
        except Exception as e:
            messagebox.showerror("本機重新命名", str(e))

    def delete_local(self):
        """Delete the selected local entries after confirmation (directories recursively)."""
        sels = self.local_tv.selection()
        if not sels:
            return
        base = pathlib.Path(self.local_path_var.get())
        names = [str(self.local_tv.item(s, "values")[0]) for s in sels]
        names = [n for n in names if n != ".."]
        if not names:
            return
        if not messagebox.askyesno("刪除(本機)", f"確定刪除 {len(names)} 個項目?", parent=self):
            return
        for n in names:
            p = base / n
            try:
                if p.is_dir():
                    shutil.rmtree(p)
                else:
                    p.unlink()
            except Exception as e:
                self.log(f"[錯誤] 本機刪除 {n}:{e}")
        self.refresh_local()

    # --- Remote operations ---
    def mkdir_remote(self):
        """Ask for a name and create that directory in the current remote directory."""
        if not self._require_connection():
            return
        name = simpledialog.askstring("新資料夾(遠端)", "請輸入資料夾名稱:", parent=self)
        if not name:
            return
        try:
            self.ftp_client.mkdir(name)
            self.refresh_remote()
        except Exception as e:
            messagebox.showerror("遠端新資料夾", str(e))

    def rename_remote(self):
        """Ask for a new name and rename the first selected remote entry."""
        if not self._require_connection():
            return
        sel = self.remote_tv.selection()
        if not sel:
            return
        name = str(self.remote_tv.item(sel[0], "values")[0])
        if name == "..":
            return
        new = simpledialog.askstring("重新命名(遠端)", f"請輸入「{name}」的新名稱:", parent=self)
        if not new:
            return
        try:
            self.ftp_client.rename(name, new)
            self.refresh_remote()
        except Exception as e:
            messagebox.showerror("遠端重新命名", str(e))

    def delete_remote(self):
        """Delete the selected remote entries after confirmation.

        Each entry is first deleted as a file; if that fails it is removed as
        an (empty) directory.
        """
        if not self._require_connection():
            return
        sels = self.remote_tv.selection()
        if not sels:
            return
        names = [str(self.remote_tv.item(s, "values")[0]) for s in sels]
        names = [n for n in names if n != ".."]
        if not names:
            return
        if not messagebox.askyesno("刪除(遠端)", f"確定刪除 {len(names)} 個項目?", parent=self):
            return
        for n in names:
            try:
                try:
                    self.ftp_client.delete(n)
                except Exception:
                    self.ftp_client.rmdir(n)
            except Exception as e:
                self.log(f"[錯誤] 遠端刪除 {n}:{e}")
        self.refresh_remote()

    # --- Upload / download ---
    def upload_selected_local(self):
        """Queue the selected local files and directories for upload to the current remote directory."""
        if not self._require_connection():
            return
        sels = self.local_tv.selection()
        if not sels:
            return
        base = pathlib.Path(self.local_path_var.get())
        remote_dir = self.remote_path_var.get()
        for s in sels:
            name = str(self.local_tv.item(s, "values")[0])
            if name == "..":
                continue
            src = base / name
            if src.is_dir():
                self._upload_dir(src, remote_dir)
            else:
                self._enqueue_upload_file(src, remote_dir)

    def _enqueue_upload_file(self, src_path: pathlib.Path, remote_dir: str):
        """Queue one local file for upload into *remote_dir*."""
        dst = f"{remote_dir.rstrip('/')}/{src_path.name}"
        try:
            size = src_path.stat().st_size
        except OSError as e:
            self.log(f"[錯誤] 傳輸 上傳 {src_path} -> {dst}: {e}")
            return
        self._tid_counter += 1
        item = TransferItem(
            tid=self._tid_counter, direction="上傳",
            src=str(src_path), dst=dst, size=size
        )
        self.transfer_mgr.add(item)

    def _ensure_remote_dir(self, rdir: str) -> bool:
        """Make sure *rdir* exists on the server, creating missing components.

        Changes the remote working directory as a side effect. Returns False
        (after logging) if the directory could not be entered or created.
        """
        try:
            self.ftp_client.cwd(rdir)
            return True
        except Exception:
            pass
        try:
            self.ftp_client.cwd("/")
            for part in rdir.strip("/").split("/"):
                if not part:
                    continue
                try:
                    self.ftp_client.cwd(part)
                except Exception:
                    self.ftp_client.mkdir(part)
                    self.ftp_client.cwd(part)
            return True
        except Exception as e:
            self.log(f"[錯誤] 遠端建立資料夾 {rdir}:{e}")
            return False

    def _upload_dir(self, src_dir: pathlib.Path, remote_dir: str):
        """Recreate *src_dir* under *remote_dir* and queue all its files.

        The remote working directory is restored afterwards, also on failure.
        Files of a directory that could not be created remotely are skipped.
        """
        base_remote = f"{remote_dir.rstrip('/')}/{src_dir.name}"
        try:
            start_cwd = self.ftp_client.pwd()
        except Exception:
            start_cwd = None
        try:
            for root, _dirs, files in os.walk(src_dir):
                rel = os.path.relpath(root, start=str(src_dir))
                rdir = base_remote if rel == "." else f"{base_remote}/{rel.replace(os.sep, '/')}"
                if not self._ensure_remote_dir(rdir):
                    continue
                for fn in files:
                    self._enqueue_upload_file(pathlib.Path(root) / fn, rdir)
        finally:
            if start_cwd:
                try:
                    self.ftp_client.cwd(start_cwd)
                except Exception as e:
                    self.log(f"[錯誤] 遠端刷新:{e}")
            self.refresh_remote()

    @staticmethod
    def _safe_local_name(remote_name) -> Optional[str]:
        """Reduce a server-supplied name to a plain file name.

        Directory separators are stripped so a hostile listing cannot write
        outside the chosen local directory. Returns None for names that are
        empty, "." or "..".
        """
        name = str(remote_name).replace("\\", "/").rsplit("/", 1)[-1]
        if name in ("", ".", ".."):
            return None
        return name

    def download_selected_remote(self):
        """Queue the selected remote files and directories for download to the current local directory."""
        if not self._require_connection():
            return
        sels = self.remote_tv.selection()
        if not sels:
            return
        local_base = pathlib.Path(self.local_path_var.get())
        try:
            remote_dir = self.ftp_client.pwd()
        except Exception as e:
            self.log(f"[錯誤] 遠端刷新:{e}")
            return
        for s in sels:
            values = self.remote_tv.item(s, "values")
            name, typ = str(values[0]), values[2]
            if name == "..":
                continue
            if typ == "資料夾":
                self._download_dir(name, local_base)
            else:
                self._enqueue_download_file(name, local_base, remote_dir=remote_dir)

    def _enqueue_download_file(self, remote_name: str, local_base: pathlib.Path,
                               remote_dir: Optional[str] = None, size: Optional[int] = None):
        """Queue one remote file for download into *local_base*.

        Args:
            remote_name: File name as listed by the server.
            local_base: Local destination directory.
            remote_dir: Remote directory containing the file; defaults to the
                server's current directory. The queued source path is made
                absolute because the working directory may have changed by the
                time a worker starts the transfer.
            size: File size if already known; otherwise the server is asked.
        """
        local_name = self._safe_local_name(remote_name)
        if local_name is None:
            self.log(f"[錯誤] 略過不安全的遠端檔名:{remote_name}")
            return
        if remote_name.startswith("/"):
            remote_path = remote_name
        else:
            if remote_dir is None:
                try:
                    remote_dir = self.ftp_client.pwd()
                except Exception as e:
                    self.log(f"[錯誤] 傳輸 下載 {remote_name} -> {local_base / local_name}: {e}")
                    return
            remote_path = f"{remote_dir.rstrip('/')}/{remote_name}"
        if size is None:
            try:
                size = int(self.ftp_client.ftp.size(remote_path))
            except Exception:
                size = None  # SIZE unsupported or refused: progress shows "?"
        self._tid_counter += 1
        item = TransferItem(
            tid=self._tid_counter, direction="下載",
            src=remote_path, dst=str(local_base / local_name), size=size
        )
        self.transfer_mgr.add(item)

    def _download_dir(self, remote_dir: str, local_base: pathlib.Path):
        """Recursively queue the contents of *remote_dir* for download under *local_base*.

        *remote_dir* is resolved relative to the server's current directory,
        which is restored afterwards, also on failure.
        """
        local_name = self._safe_local_name(remote_dir)
        if local_name is None:
            self.log(f"[錯誤] 略過不安全的遠端檔名:{remote_dir}")
            return
        dest = local_base / local_name
        try:
            dest.mkdir(exist_ok=True)
            cur = self.ftp_client.pwd()
        except Exception as e:
            self.log(f"[錯誤] 下載資料夾 {remote_dir}:{e}")
            return
        try:
            self.ftp_client.cwd(remote_dir)
            here = self.ftp_client.pwd()
            for it in self.ftp_client.listdir():
                if it["name"] in (".", ".."):
                    continue
                if it["is_dir"]:
                    self._download_dir(it["name"], dest)
                else:
                    self._enqueue_download_file(it["name"], dest, remote_dir=here, size=it.get("size"))
        except Exception as e:
            self.log(f"[錯誤] 下載資料夾 {remote_dir}:{e}")
        finally:
            try:
                self.ftp_client.cwd(cur)
            except Exception as e:
                self.log(f"[錯誤] 下載資料夾 {remote_dir}:{e}")

    # --- External drag-and-drop ---
    def on_external_drop_local(self, event):
        """Ignore drops on the local pane (copying into the local folder is intentionally unsupported)."""
        pass

    def on_external_drop_remote(self, event):
        """Upload files and directories dropped onto the remote pane."""
        if not self._require_connection():
            return
        remote_dir = self.remote_path_var.get()
        paths = self._parse_dnd_paths(event.data)
        for p in paths:
            pth = pathlib.Path(p)
            if pth.is_dir():
                self._upload_dir(pth, remote_dir)
            else:
                self._enqueue_upload_file(pth, remote_dir)
        self.refresh_remote()

    def _parse_dnd_paths(self, data: str) -> List[str]:
        """Split a drop payload (a Tcl list; paths with spaces are braced) into paths."""
        try:
            # Tcl's own list parser handles braces and escapes correctly.
            return [str(p) for p in self.tk.splitlist(data)]
        except Exception:
            pass
        # Fallback: minimal brace-aware splitter.
        out, buf, in_brace = [], "", False
        for ch in data:
            if ch == "{":
                in_brace = True
                buf = ""
            elif ch == "}":
                in_brace = False
                out.append(buf)
                buf = ""
            elif ch == " " and not in_brace:
                if buf:
                    out.append(buf)
                    buf = ""
            else:
                buf += ch
        if buf:
            out.append(buf)
        return out

    # --- Transfer list updates / totals ---
    def on_transfer_updated(self, tid: int):
        """Refresh the queue row of transfer *tid*; safe to call from any thread."""
        if threading.get_ident() != self._ui_thread_id:
            self._pending_tx.put(tid)
            return
        self._render_transfer_row(tid)

    def _render_transfer_row(self, tid: int):
        """Insert or update the queue row of transfer *tid* (UI thread only)."""
        it = self.transfer_mgr.items.get(tid)
        if not it:
            return
        name = os.path.basename(it.src)
        elapsed = max(0.1, time.time() - it.start_ts)
        speed = it.transferred / elapsed if it.transferred and elapsed > 0 else 0.0
        eta = math.inf
        if it.size and it.size > 0 and speed > 0:
            eta = (it.size - it.transferred) / speed
        progress = f"{human_size(it.transferred)}/{human_size(it.size) if it.size else '?'}"
        row = (
            it.tid, it.direction, name,
            human_size(it.size) if it.size else "",
            progress,
            f"{human_size(int(speed))}/s",
            eta_str(eta),
            it.status if not it.error else f"{it.status}: {it.error}"
        )
        node = self._tx_rows.get(tid)
        if node is not None and self.tx_tv.exists(node):
            self.tx_tv.item(node, values=row)
        else:
            self._tx_rows[tid] = self.tx_tv.insert("", "end", values=row)

    def _pump_worker_events(self):
        """Apply log lines and row updates queued by worker threads, then reschedule."""
        try:
            lines = []
            while True:
                try:
                    lines.append(self._pending_logs.get_nowait())
                except queue.Empty:
                    break
            if lines:
                self._append_log("".join(lines))
            dirty = {}
            while True:
                try:
                    dirty[self._pending_tx.get_nowait()] = None
                except queue.Empty:
                    break
            # Each transfer is redrawn once no matter how many updates queued up.
            for tid in dirty:
                self._render_transfer_row(tid)
        finally:
            self.after(self._PUMP_INTERVAL_MS, self._pump_worker_events)

    def cancel_selected_transfers(self):
        """Request cancellation of the selected queued or running transfers."""
        sels = self.tx_tv.selection()
        for s in sels:
            vals = self.tx_tv.item(s, "values")
            if not vals:
                continue
            tid = int(vals[0])
            it = self.transfer_mgr.items.get(tid)
            if it and it.status in ("佇列中", "進行中"):
                it.stop_flag.set()
                it.status = "已取消"
                self.on_transfer_updated(tid)

    def _tick_totals(self):
        """Update the total speed / ETA labels twice per second."""
        speed, remain, eta = self.transfer_mgr.totals()
        self.total_speed_var.set(f"總速度:{human_size(int(speed))}/s")
        self.total_eta_var.set(f"總剩餘時間:{eta_str(eta)}")
        self.after(500, self._tick_totals)

    # --- Toolbar actions that follow the active pane ---
    def _remote_pane_active(self) -> bool:
        """Return True if toolbar actions should target the remote pane.

        The currently focused list wins; if focus is elsewhere (for example on
        the toolbar button just clicked), the pane that last had focus is used.
        """
        try:
            focused = self.focus_get()
        except Exception:
            focused = None
        if focused is self.remote_tv:
            return True
        if focused is self.local_tv:
            return False
        return self._active_pane == "remote"

    def delete_selected(self):
        """Delete the selection of the active pane."""
        if self._remote_pane_active():
            self.delete_remote()
        else:
            self.delete_local()

    def rename_selected(self):
        """Rename the selection of the active pane."""
        if self._remote_pane_active():
            self.rename_remote()
        else:
            self.rename_local()

    def on_close(self):
        """Stop the workers, close the FTP session and destroy the window."""
        try:
            self.transfer_mgr.stop()
            self.ftp_client.close()
        except Exception:
            pass
        self.destroy()
def main():
    """Create the application window and run the Tk event loop."""
    app = App()
    app.title(APP_NAME)  # window title: CG FTP
    app.mainloop()


if __name__ == "__main__":
    main()