123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181 |
- // Copyright 2021 The Gitea Authors. All rights reserved.
- // Use of this source code is governed by a MIT-style
- // license that can be found in the LICENSE file.
-
- package integrations
-
- import (
- "bytes"
- "fmt"
- "io"
- "mime/multipart"
- "net/http"
- "regexp"
- "strings"
- "testing"
-
- "code.gitea.io/gitea/models/db"
- "code.gitea.io/gitea/models/packages"
- "code.gitea.io/gitea/models/unittest"
- user_model "code.gitea.io/gitea/models/user"
- "code.gitea.io/gitea/modules/packages/pypi"
-
- "github.com/stretchr/testify/assert"
- )
-
- func TestPackagePyPI(t *testing.T) {
- defer prepareTestEnv(t)()
- user := unittest.AssertExistsAndLoadBean(t, &user_model.User{ID: 2}).(*user_model.User)
-
- packageName := "test-package"
- packageVersion := "1.0.1"
- packageAuthor := "KN4CK3R"
- packageDescription := "Test Description"
-
- content := "test"
- hashSHA256 := "9f86d081884c7d659a2feaa0c55ad015a3bf4f1b2b0b822cd15d6c15b0f00a08"
-
- root := fmt.Sprintf("/api/packages/%s/pypi", user.Name)
-
- uploadFile := func(t *testing.T, filename, content string, expectedStatus int) {
- body := &bytes.Buffer{}
- writer := multipart.NewWriter(body)
- part, _ := writer.CreateFormFile("content", filename)
- _, _ = io.Copy(part, strings.NewReader(content))
-
- writer.WriteField("name", packageName)
- writer.WriteField("version", packageVersion)
- writer.WriteField("author", packageAuthor)
- writer.WriteField("summary", packageDescription)
- writer.WriteField("description", packageDescription)
- writer.WriteField("sha256_digest", hashSHA256)
- writer.WriteField("requires_python", "3.6")
-
- _ = writer.Close()
-
- req := NewRequestWithBody(t, "POST", root, body)
- req.Header.Add("Content-Type", writer.FormDataContentType())
- req = AddBasicAuthHeader(req, user.Name)
- MakeRequest(t, req, expectedStatus)
- }
-
- t.Run("Upload", func(t *testing.T) {
- defer PrintCurrentTest(t)()
-
- filename := "test.whl"
- uploadFile(t, filename, content, http.StatusCreated)
-
- pvs, err := packages.GetVersionsByPackageType(db.DefaultContext, user.ID, packages.TypePyPI)
- assert.NoError(t, err)
- assert.Len(t, pvs, 1)
-
- pd, err := packages.GetPackageDescriptor(db.DefaultContext, pvs[0])
- assert.NoError(t, err)
- assert.NotNil(t, pd.SemVer)
- assert.IsType(t, &pypi.Metadata{}, pd.Metadata)
- assert.Equal(t, packageName, pd.Package.Name)
- assert.Equal(t, packageVersion, pd.Version.Version)
-
- pfs, err := packages.GetFilesByVersionID(db.DefaultContext, pvs[0].ID)
- assert.NoError(t, err)
- assert.Len(t, pfs, 1)
- assert.Equal(t, filename, pfs[0].Name)
- assert.True(t, pfs[0].IsLead)
-
- pb, err := packages.GetBlobByID(db.DefaultContext, pfs[0].BlobID)
- assert.NoError(t, err)
- assert.Equal(t, int64(4), pb.Size)
- })
-
- t.Run("UploadAddFile", func(t *testing.T) {
- defer PrintCurrentTest(t)()
-
- filename := "test.tar.gz"
- uploadFile(t, filename, content, http.StatusCreated)
-
- pvs, err := packages.GetVersionsByPackageType(db.DefaultContext, user.ID, packages.TypePyPI)
- assert.NoError(t, err)
- assert.Len(t, pvs, 1)
-
- pd, err := packages.GetPackageDescriptor(db.DefaultContext, pvs[0])
- assert.NoError(t, err)
- assert.NotNil(t, pd.SemVer)
- assert.IsType(t, &pypi.Metadata{}, pd.Metadata)
- assert.Equal(t, packageName, pd.Package.Name)
- assert.Equal(t, packageVersion, pd.Version.Version)
-
- pfs, err := packages.GetFilesByVersionID(db.DefaultContext, pvs[0].ID)
- assert.NoError(t, err)
- assert.Len(t, pfs, 2)
-
- pf, err := packages.GetFileForVersionByName(db.DefaultContext, pvs[0].ID, filename, packages.EmptyFileKey)
- assert.NoError(t, err)
- assert.Equal(t, filename, pf.Name)
- assert.True(t, pf.IsLead)
-
- pb, err := packages.GetBlobByID(db.DefaultContext, pf.BlobID)
- assert.NoError(t, err)
- assert.Equal(t, int64(4), pb.Size)
- })
-
- t.Run("UploadHashMismatch", func(t *testing.T) {
- defer PrintCurrentTest(t)()
-
- filename := "test2.whl"
- uploadFile(t, filename, "dummy", http.StatusBadRequest)
- })
-
- t.Run("UploadExists", func(t *testing.T) {
- defer PrintCurrentTest(t)()
-
- uploadFile(t, "test.whl", content, http.StatusBadRequest)
- uploadFile(t, "test.tar.gz", content, http.StatusBadRequest)
- })
-
- t.Run("Download", func(t *testing.T) {
- defer PrintCurrentTest(t)()
-
- downloadFile := func(filename string) {
- req := NewRequest(t, "GET", fmt.Sprintf("%s/files/%s/%s/%s", root, packageName, packageVersion, filename))
- req = AddBasicAuthHeader(req, user.Name)
- resp := MakeRequest(t, req, http.StatusOK)
-
- assert.Equal(t, []byte(content), resp.Body.Bytes())
- }
-
- downloadFile("test.whl")
- downloadFile("test.tar.gz")
-
- pvs, err := packages.GetVersionsByPackageType(db.DefaultContext, user.ID, packages.TypePyPI)
- assert.NoError(t, err)
- assert.Len(t, pvs, 1)
- assert.Equal(t, int64(2), pvs[0].DownloadCount)
- })
-
- t.Run("PackageMetadata", func(t *testing.T) {
- defer PrintCurrentTest(t)()
-
- req := NewRequest(t, "GET", fmt.Sprintf("%s/simple/%s", root, packageName))
- req = AddBasicAuthHeader(req, user.Name)
- resp := MakeRequest(t, req, http.StatusOK)
-
- htmlDoc := NewHTMLParser(t, resp.Body)
- nodes := htmlDoc.doc.Find("a").Nodes
- assert.Len(t, nodes, 2)
-
- hrefMatcher := regexp.MustCompile(fmt.Sprintf(`%s/files/%s/%s/test\..+#sha256-%s`, root, packageName, packageVersion, hashSHA256))
-
- for _, a := range nodes {
- for _, att := range a.Attr {
- switch att.Key {
- case "href":
- assert.Regexp(t, hrefMatcher, att.Val)
- case "data-requires-python":
- assert.Equal(t, "3.6", att.Val)
- default:
- t.Fail()
- }
- }
- }
- })
- }
|